diff options
Diffstat (limited to 'jobs/jobs.go')
-rw-r--r-- | jobs/jobs.go | 122 |
1 files changed, 81 insertions, 41 deletions
diff --git a/jobs/jobs.go b/jobs/jobs.go index 8c84f4eea..58c2f2f13 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -4,71 +4,111 @@ package jobs import ( - "sync" + "context" + "time" l4g "github.com/alecthomas/log4go" - ejobs "github.com/mattermost/platform/einterfaces/jobs" "github.com/mattermost/platform/model" - "github.com/mattermost/platform/store" - "github.com/mattermost/platform/utils" ) -type Jobs struct { - startOnce sync.Once +const ( + CANCEL_WATCHER_POLLING_INTERVAL = 5000 +) - DataRetention model.Job - // SearchIndexing model.Job +func CreateJob(jobType string, jobData map[string]interface{}) (*model.Job, *model.AppError) { + job := model.Job{ + Id: model.NewId(), + Type: jobType, + CreateAt: model.GetMillis(), + Status: model.JOB_STATUS_PENDING, + Data: jobData, + } - listenerId string + if result := <-Srv.Store.Job().Save(&job); result.Err != nil { + return nil, result.Err + } + + return &job, nil } -func InitJobs(s store.Store) *Jobs { - jobs := &Jobs{ - // SearchIndexing: MakeTestJob(s, "SearchIndexing"), +func ClaimJob(job *model.Job) (bool, *model.AppError) { + if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_IN_PROGRESS); result.Err != nil { + return false, result.Err + } else { + success := result.Data.(bool) + return success, nil } +} - if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil { - jobs.DataRetention = dataRetentionInterface.MakeJob(s) - } +func SetJobProgress(jobId string, progress int64) (bool, *model.AppError) { + var job *model.Job - return jobs -} + if result := <-Srv.Store.Job().Get(jobId); result.Err != nil { + return false, result.Err + } else { + job = result.Data.(*model.Job) + } -func (jobs *Jobs) Start() *Jobs { - l4g.Info("Starting jobs") + job.Status = model.JOB_STATUS_IN_PROGRESS + job.Progress = progress - jobs.startOnce.Do(func() { - if jobs.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable { - go jobs.DataRetention.Run() + if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil { + return false, result.Err + } else { + if !result.Data.(bool) { + return false, nil } + } - // go jobs.SearchIndexing.Run() - }) + return true, nil +} - jobs.listenerId = utils.AddConfigListener(jobs.handleConfigChange) +func SetJobSuccess(job *model.Job) *model.AppError { + result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_SUCCESS) + return result.Err +} - return jobs +func SetJobError(job *model.Job) *model.AppError { + result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_ERROR) + return result.Err } -func (jobs *Jobs) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) { - if jobs.DataRetention != nil { - if !*oldConfig.DataRetentionSettings.Enable && *newConfig.DataRetentionSettings.Enable { - go jobs.DataRetention.Run() - } else if *oldConfig.DataRetentionSettings.Enable && !*newConfig.DataRetentionSettings.Enable { - jobs.DataRetention.Stop() - } - } +func SetJobCanceled(job *model.Job) *model.AppError { + result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_CANCELED) + return result.Err } -func (jobs *Jobs) Stop() *Jobs { - utils.RemoveConfigListener(jobs.listenerId) +func RequestCancellation(job *model.Job) *model.AppError { + if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_CANCELED); result.Err != nil { + return result.Err + } else if result.Data.(bool) { + return nil + } - if jobs.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable { - jobs.DataRetention.Stop() + if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil { + return result.Err + } else if result.Data.(bool) { + return nil } - // jobs.SearchIndexing.Stop() - l4g.Info("Stopped jobs") + return model.NewLocAppError("Jobs.RequestCancellation", "jobs.request_cancellation.status.error", nil, "id=" + job.Id) +} - return jobs +func CancellationWatcher(ctx context.Context, jobId string, cancelChan chan interface{}) { + for { + select { + case <-ctx.Done(): + l4g.Debug("CancellationWatcher for Job: %v Aborting as job has finished.", jobId) + return + case <-time.After(CANCEL_WATCHER_POLLING_INTERVAL * time.Millisecond): + l4g.Debug("CancellationWatcher for Job: %v polling.", jobId) + if result := <-Srv.Store.Job().Get(jobId); result.Err == nil { + jobStatus := result.Data.(*model.Job) + if jobStatus.Status == model.JOB_STATUS_CANCEL_REQUESTED { + close(cancelChan) + return + } + } + } + } } |