diff options
author | George Goldberg <george@gberg.me> | 2017-07-07 15:21:02 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-07-07 15:21:02 +0100 |
commit | 0495a519499d6cefa289982a94d8f42de541c1f0 (patch) | |
tree | 94b6145daa41ca4d1d4a172f030071076852a09a /jobs/jobs.go | |
parent | 6e0f5f096986dad11ef182ddb51d4bfb0e558860 (diff) | |
download | chat-0495a519499d6cefa289982a94d8f42de541c1f0.tar.gz chat-0495a519499d6cefa289982a94d8f42de541c1f0.tar.bz2 chat-0495a519499d6cefa289982a94d8f42de541c1f0.zip |
PLT-6916: Redesign the jobs package and Jobserver. (#6733)
This commit redesigns the jobserver to be based around an architecture
of "workers", which carry out jobs of a particular type, and "jobs"
which are a unit of work carried by a particular worker. It also
introduces "schedulers" which are responsible for scheduling jobs of a
particular type automatically (jobs can also be scheduled manually when
apropriate).
Workers may be run many times, either in instances of the platform
binary, or the standalone jobserver binary. In any mattermost cluster,
only one instance of platform OR jobserver must run the schedulers. At
the moment this is controlled by a config variable, but in future will
be controlled through the cluster leader election process.
Diffstat (limited to 'jobs/jobs.go')
-rw-r--r-- | jobs/jobs.go | 122 |
1 files changed, 81 insertions, 41 deletions
diff --git a/jobs/jobs.go b/jobs/jobs.go index 8c84f4eea..58c2f2f13 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -4,71 +4,111 @@ package jobs import ( - "sync" + "context" + "time" l4g "github.com/alecthomas/log4go" - ejobs "github.com/mattermost/platform/einterfaces/jobs" "github.com/mattermost/platform/model" - "github.com/mattermost/platform/store" - "github.com/mattermost/platform/utils" ) -type Jobs struct { - startOnce sync.Once +const ( + CANCEL_WATCHER_POLLING_INTERVAL = 5000 +) - DataRetention model.Job - // SearchIndexing model.Job +func CreateJob(jobType string, jobData map[string]interface{}) (*model.Job, *model.AppError) { + job := model.Job{ + Id: model.NewId(), + Type: jobType, + CreateAt: model.GetMillis(), + Status: model.JOB_STATUS_PENDING, + Data: jobData, + } - listenerId string + if result := <-Srv.Store.Job().Save(&job); result.Err != nil { + return nil, result.Err + } + + return &job, nil } -func InitJobs(s store.Store) *Jobs { - jobs := &Jobs{ - // SearchIndexing: MakeTestJob(s, "SearchIndexing"), +func ClaimJob(job *model.Job) (bool, *model.AppError) { + if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_IN_PROGRESS); result.Err != nil { + return false, result.Err + } else { + success := result.Data.(bool) + return success, nil } +} - if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil { - jobs.DataRetention = dataRetentionInterface.MakeJob(s) - } +func SetJobProgress(jobId string, progress int64) (bool, *model.AppError) { + var job *model.Job - return jobs -} + if result := <-Srv.Store.Job().Get(jobId); result.Err != nil { + return false, result.Err + } else { + job = result.Data.(*model.Job) + } -func (jobs *Jobs) Start() *Jobs { - l4g.Info("Starting jobs") + job.Status = model.JOB_STATUS_IN_PROGRESS + job.Progress = progress - jobs.startOnce.Do(func() { - if jobs.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable { - go jobs.DataRetention.Run() + if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil { + return false, result.Err + } else { + if !result.Data.(bool) { + return false, nil } + } - // go jobs.SearchIndexing.Run() - }) + return true, nil +} - jobs.listenerId = utils.AddConfigListener(jobs.handleConfigChange) +func SetJobSuccess(job *model.Job) *model.AppError { + result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_SUCCESS) + return result.Err +} - return jobs +func SetJobError(job *model.Job) *model.AppError { + result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_ERROR) + return result.Err } -func (jobs *Jobs) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) { - if jobs.DataRetention != nil { - if !*oldConfig.DataRetentionSettings.Enable && *newConfig.DataRetentionSettings.Enable { - go jobs.DataRetention.Run() - } else if *oldConfig.DataRetentionSettings.Enable && !*newConfig.DataRetentionSettings.Enable { - jobs.DataRetention.Stop() - } - } +func SetJobCanceled(job *model.Job) *model.AppError { + result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_CANCELED) + return result.Err } -func (jobs *Jobs) Stop() *Jobs { - utils.RemoveConfigListener(jobs.listenerId) +func RequestCancellation(job *model.Job) *model.AppError { + if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_CANCELED); result.Err != nil { + return result.Err + } else if result.Data.(bool) { + return nil + } - if jobs.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable { - jobs.DataRetention.Stop() + if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil { + return result.Err + } else if result.Data.(bool) { + return nil } - // jobs.SearchIndexing.Stop() - l4g.Info("Stopped jobs") + return model.NewLocAppError("Jobs.RequestCancellation", "jobs.request_cancellation.status.error", nil, "id=" + job.Id) +} - return jobs +func CancellationWatcher(ctx context.Context, jobId string, cancelChan chan interface{}) { + for { + select { + case <-ctx.Done(): + l4g.Debug("CancellationWatcher for Job: %v Aborting as job has finished.", jobId) + return + case <-time.After(CANCEL_WATCHER_POLLING_INTERVAL * time.Millisecond): + l4g.Debug("CancellationWatcher for Job: %v polling.", jobId) + if result := <-Srv.Store.Job().Get(jobId); result.Err == nil { + jobStatus := result.Data.(*model.Job) + if jobStatus.Status == model.JOB_STATUS_CANCEL_REQUESTED { + close(cancelChan) + return + } + } + } + } } |