diff options
Diffstat (limited to 'jobs')
-rw-r--r-- | jobs/jobs.go | 122 | ||||
-rw-r--r-- | jobs/jobs_watcher.go | 85 | ||||
-rw-r--r-- | jobs/jobserver/jobserver.go | 15 | ||||
-rw-r--r-- | jobs/schedulers.go | 68 | ||||
-rw-r--r-- | jobs/server.go | 31 | ||||
-rw-r--r-- | jobs/testjob.go | 54 | ||||
-rw-r--r-- | jobs/testscheduler.go | 58 | ||||
-rw-r--r-- | jobs/testworker.go | 104 | ||||
-rw-r--r-- | jobs/workers.go | 79 |
9 files changed, 511 insertions, 105 deletions
diff --git a/jobs/jobs.go b/jobs/jobs.go index 8c84f4eea..58c2f2f13 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -4,71 +4,111 @@ package jobs import ( - "sync" + "context" + "time" l4g "github.com/alecthomas/log4go" - ejobs "github.com/mattermost/platform/einterfaces/jobs" "github.com/mattermost/platform/model" - "github.com/mattermost/platform/store" - "github.com/mattermost/platform/utils" ) -type Jobs struct { - startOnce sync.Once +const ( + CANCEL_WATCHER_POLLING_INTERVAL = 5000 +) - DataRetention model.Job - // SearchIndexing model.Job +func CreateJob(jobType string, jobData map[string]interface{}) (*model.Job, *model.AppError) { + job := model.Job{ + Id: model.NewId(), + Type: jobType, + CreateAt: model.GetMillis(), + Status: model.JOB_STATUS_PENDING, + Data: jobData, + } - listenerId string + if result := <-Srv.Store.Job().Save(&job); result.Err != nil { + return nil, result.Err + } + + return &job, nil } -func InitJobs(s store.Store) *Jobs { - jobs := &Jobs{ - // SearchIndexing: MakeTestJob(s, "SearchIndexing"), +func ClaimJob(job *model.Job) (bool, *model.AppError) { + if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_IN_PROGRESS); result.Err != nil { + return false, result.Err + } else { + success := result.Data.(bool) + return success, nil } +} - if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil { - jobs.DataRetention = dataRetentionInterface.MakeJob(s) - } +func SetJobProgress(jobId string, progress int64) (bool, *model.AppError) { + var job *model.Job - return jobs -} + if result := <-Srv.Store.Job().Get(jobId); result.Err != nil { + return false, result.Err + } else { + job = result.Data.(*model.Job) + } -func (jobs *Jobs) Start() *Jobs { - l4g.Info("Starting jobs") + job.Status = model.JOB_STATUS_IN_PROGRESS + job.Progress = progress - jobs.startOnce.Do(func() { - if jobs.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable { - go jobs.DataRetention.Run() + if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil { + return false, result.Err + } else { + if !result.Data.(bool) { + return false, nil } + } - // go jobs.SearchIndexing.Run() - }) + return true, nil +} - jobs.listenerId = utils.AddConfigListener(jobs.handleConfigChange) +func SetJobSuccess(job *model.Job) *model.AppError { + result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_SUCCESS) + return result.Err +} - return jobs +func SetJobError(job *model.Job) *model.AppError { + result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_ERROR) + return result.Err } -func (jobs *Jobs) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) { - if jobs.DataRetention != nil { - if !*oldConfig.DataRetentionSettings.Enable && *newConfig.DataRetentionSettings.Enable { - go jobs.DataRetention.Run() - } else if *oldConfig.DataRetentionSettings.Enable && !*newConfig.DataRetentionSettings.Enable { - jobs.DataRetention.Stop() - } - } +func SetJobCanceled(job *model.Job) *model.AppError { + result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_CANCELED) + return result.Err } -func (jobs *Jobs) Stop() *Jobs { - utils.RemoveConfigListener(jobs.listenerId) +func RequestCancellation(job *model.Job) *model.AppError { + if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_CANCELED); result.Err != nil { + return result.Err + } else if result.Data.(bool) { + return nil + } - if jobs.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable { - jobs.DataRetention.Stop() + if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil { + return result.Err + } else if result.Data.(bool) { + return nil } - // jobs.SearchIndexing.Stop() - l4g.Info("Stopped jobs") + return model.NewLocAppError("Jobs.RequestCancellation", "jobs.request_cancellation.status.error", nil, "id=" + job.Id) +} - return jobs +func CancellationWatcher(ctx context.Context, jobId string, cancelChan chan interface{}) { + for { + select { + case <-ctx.Done(): + l4g.Debug("CancellationWatcher for Job: %v Aborting as job has finished.", jobId) + return + case <-time.After(CANCEL_WATCHER_POLLING_INTERVAL * time.Millisecond): + l4g.Debug("CancellationWatcher for Job: %v polling.", jobId) + if result := <-Srv.Store.Job().Get(jobId); result.Err == nil { + jobStatus := result.Data.(*model.Job) + if jobStatus.Status == model.JOB_STATUS_CANCEL_REQUESTED { + close(cancelChan) + return + } + } + } + } } diff --git a/jobs/jobs_watcher.go b/jobs/jobs_watcher.go new file mode 100644 index 000000000..ada957ccc --- /dev/null +++ b/jobs/jobs_watcher.go @@ -0,0 +1,85 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package jobs + +import ( + "math/rand" + "time" + + l4g "github.com/alecthomas/log4go" + "github.com/mattermost/platform/model" +) + +const ( + WATCHER_POLLING_INTERVAL = 15000 +) + +type Watcher struct { + workers *Workers + + stop chan bool + stopped chan bool +} + +func MakeWatcher(workers *Workers) *Watcher { + return &Watcher{ + stop: make(chan bool, 1), + stopped: make(chan bool, 1), + workers: workers, + } +} + +func (watcher *Watcher) Start() { + l4g.Debug("Watcher Started") + + // Delay for some random number of milliseconds before starting to ensure that multiple + // instances of the jobserver don't poll at a time too close to each other. + rand.Seed(time.Now().UTC().UnixNano()) + _ = <-time.After(time.Duration(rand.Intn(WATCHER_POLLING_INTERVAL)) * time.Millisecond) + + defer func(){ + l4g.Debug("Watcher Finished") + watcher.stopped <- true + }() + + for { + select { + case <-watcher.stop: + l4g.Debug("Watcher: Received stop signal") + return + case <-time.After(WATCHER_POLLING_INTERVAL * time.Millisecond): + watcher.PollAndNotify() + } + } +} + +func (watcher *Watcher) Stop() { + l4g.Debug("Watcher Stopping") + watcher.stop <- true + <-watcher.stopped +} + +func (watcher *Watcher) PollAndNotify() { + if result := <-Srv.Store.Job().GetAllByStatus(model.JOB_STATUS_PENDING); result.Err != nil { + l4g.Error("Error occured getting all pending statuses: %v", result.Err.Error()) + } else { + jobStatuses := result.Data.([]*model.Job) + + for _, js := range jobStatuses { + j := model.Job{ + Type: js.Type, + Id: js.Id, + } + + if js.Type == model.JOB_TYPE_DATA_RETENTION { + if watcher.workers.DataRetention != nil { + select { + case watcher.workers.DataRetention.JobChannel() <- j: + default: + } + } + } + } + } +} diff --git a/jobs/jobserver/jobserver.go b/jobs/jobserver/jobserver.go index 5f491a815..aabe5d3b2 100644 --- a/jobs/jobserver/jobserver.go +++ b/jobs/jobserver/jobserver.go @@ -16,22 +16,20 @@ import ( _ "github.com/mattermost/platform/imports" ) -var Srv jobs.JobServer - func main() { // Initialize utils.InitAndLoadConfig("config.json") defer l4g.Close() - Srv.Store = store.NewLayeredStore() - defer Srv.Store.Close() + jobs.Srv.Store = store.NewLayeredStore() + defer jobs.Srv.Store.Close() - Srv.LoadLicense() + jobs.Srv.LoadLicense() // Run jobs l4g.Info("Starting Mattermost job server") - Srv.Jobs = jobs.InitJobs(Srv.Store) - Srv.Jobs.Start() + jobs.Srv.StartWorkers() + jobs.Srv.StartSchedulers() var signalChan chan os.Signal = make(chan os.Signal) signal.Notify(signalChan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) @@ -40,7 +38,8 @@ func main() { // Cleanup anything that isn't handled by a defer statement l4g.Info("Stopping Mattermost job server") - Srv.Jobs.Stop() + jobs.Srv.StopSchedulers() + jobs.Srv.StopWorkers() l4g.Info("Stopped Mattermost job server") } diff --git a/jobs/schedulers.go b/jobs/schedulers.go new file mode 100644 index 000000000..73ec6661a --- /dev/null +++ b/jobs/schedulers.go @@ -0,0 +1,68 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package jobs + +import ( + "sync" + + l4g "github.com/alecthomas/log4go" + ejobs "github.com/mattermost/platform/einterfaces/jobs" + + "github.com/mattermost/platform/model" + "github.com/mattermost/platform/utils" +) + +type Schedulers struct { + startOnce sync.Once + + DataRetention model.Scheduler + + listenerId string +} + +func InitSchedulers() *Schedulers { + schedulers := &Schedulers{} + + if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil { + schedulers.DataRetention = dataRetentionInterface.MakeScheduler() + } + + return schedulers +} + +func (schedulers *Schedulers) Start() *Schedulers { + l4g.Info("Starting schedulers") + + schedulers.startOnce.Do(func() { + if schedulers.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable { + go schedulers.DataRetention.Run() + } + }) + + schedulers.listenerId = utils.AddConfigListener(schedulers.handleConfigChange) + + return schedulers +} + +func (schedulers *Schedulers) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) { + if schedulers.DataRetention != nil { + if !*oldConfig.DataRetentionSettings.Enable && *newConfig.DataRetentionSettings.Enable { + go schedulers.DataRetention.Run() + } else if *oldConfig.DataRetentionSettings.Enable && !*newConfig.DataRetentionSettings.Enable { + schedulers.DataRetention.Stop() + } + } +} + +func (schedulers *Schedulers) Stop() *Schedulers { + utils.RemoveConfigListener(schedulers.listenerId) + + if schedulers.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable { + schedulers.DataRetention.Stop() + } + + l4g.Info("Stopped schedulers") + + return schedulers +} diff --git a/jobs/server.go b/jobs/server.go index dd3448842..7920cb2d5 100644 --- a/jobs/server.go +++ b/jobs/server.go @@ -11,10 +11,13 @@ import ( ) type JobServer struct { - Store store.Store - Jobs *Jobs + Store store.Store + Workers *Workers + Schedulers *Schedulers } +var Srv JobServer + func (server *JobServer) LoadLicense() { licenseId := "" if result := <-server.Store.System().Get(); result.Err == nil { @@ -44,3 +47,27 @@ func (server *JobServer) LoadLicense() { l4g.Info(utils.T("mattermost.load_license.find.warn")) } } + +func (server *JobServer) StartWorkers() { + if *utils.Cfg.JobSettings.RunJobs { + Srv.Workers = InitWorkers().Start() + } +} + +func (server *JobServer) StartSchedulers() { + if *utils.Cfg.JobSettings.RunJobs { + Srv.Schedulers = InitSchedulers().Start() + } +} + +func (server *JobServer) StopWorkers() { + if Srv.Workers != nil { + Srv.Workers.Stop() + } +} + +func (server *JobServer) StopSchedulers() { + if Srv.Schedulers != nil { + Srv.Schedulers.Stop() + } +} diff --git a/jobs/testjob.go b/jobs/testjob.go deleted file mode 100644 index 59d5274e5..000000000 --- a/jobs/testjob.go +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. -// See License.txt for license information. - -package jobs - -import ( - "time" - - l4g "github.com/alecthomas/log4go" - "github.com/mattermost/platform/store" -) - -type TestJob struct { - store store.Store - - name string - stop chan bool - stopped chan bool -} - -func MakeTestJob(s store.Store, name string) *TestJob { - return &TestJob{ - store: s, - name: name, - stop: make(chan bool, 1), - stopped: make(chan bool, 1), - } -} - -func (job *TestJob) Run() { - l4g.Debug("Job %v: Started", job.name) - - running := true - for running { - l4g.Debug("Job %v: Tick", job.name) - - select { - case <-job.stop: - l4g.Debug("Job %v: Received stop signal", job.name) - running = false - case <-time.After(10 * time.Second): - continue - } - } - - l4g.Debug("Job %v: Finished", job.name) - job.stopped <- true -} - -func (job *TestJob) Stop() { - l4g.Debug("Job %v: Stopping", job.name) - job.stop <- true - <-job.stopped -} diff --git a/jobs/testscheduler.go b/jobs/testscheduler.go new file mode 100644 index 000000000..31b5d144c --- /dev/null +++ b/jobs/testscheduler.go @@ -0,0 +1,58 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package jobs + +import ( + "time" + + l4g "github.com/alecthomas/log4go" +) + +type TestScheduler struct { + name string + jobType string + stop chan bool + stopped chan bool +} + +func MakeTestScheduler(name string, jobType string) *TestScheduler { + return &TestScheduler{ + name: name, + jobType: jobType, + stop: make(chan bool, 1), + stopped: make(chan bool, 1), + } +} + +func (scheduler *TestScheduler) Run() { + l4g.Debug("Scheduler %v: Started", scheduler.name) + + defer func(){ + l4g.Debug("Scheduler %v: Finished", scheduler.name) + scheduler.stopped <- true + }() + + for { + select { + case <-scheduler.stop: + l4g.Debug("Scheduler %v: Received stop signal", scheduler.name) + return + case <-time.After(86400 * time.Second): + l4g.Debug("Scheduler: %v: Scheduling new job", scheduler.name) + scheduler.AddJob() + } + } +} + +func (scheduler *TestScheduler) AddJob() { + if _, err := CreateJob(scheduler.jobType, nil); err != nil { + l4g.Error("Scheduler %v: failed to create job: %v", scheduler.name, err) + } +} + +func (scheduler *TestScheduler) Stop() { + l4g.Debug("Scheduler %v: Stopping", scheduler.name) + scheduler.stop <- true + <-scheduler.stopped +} diff --git a/jobs/testworker.go b/jobs/testworker.go new file mode 100644 index 000000000..f1c8a07a3 --- /dev/null +++ b/jobs/testworker.go @@ -0,0 +1,104 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package jobs + +import ( + "context" + "time" + + l4g "github.com/alecthomas/log4go" + "github.com/mattermost/platform/model" +) + +type TestWorker struct { + name string + stop chan bool + stopped chan bool + jobs chan model.Job +} + +func MakeTestWorker(name string) *TestWorker { + return &TestWorker{ + name: name, + stop: make(chan bool, 1), + stopped: make(chan bool, 1), + jobs: make(chan model.Job), + } +} + +func (worker *TestWorker) Run() { + l4g.Debug("Worker %v: Started", worker.name) + + defer func() { + l4g.Debug("Worker %v: Finished", worker.name) + worker.stopped <- true + }() + + for { + select { + case <-worker.stop: + l4g.Debug("Worker %v: Received stop signal", worker.name) + return + case job := <-worker.jobs: + l4g.Debug("Worker %v: Received a new candidate job.", worker.name) + worker.DoJob(&job) + } + } +} + +func (worker *TestWorker) DoJob(job *model.Job) { + if claimed, err := ClaimJob(job); err != nil { + l4g.Error("Job: %v: Error occurred while trying to claim job: %v", job.Id, err.Error()) + return + } else if !claimed { + return + } + + cancelCtx, cancelCancelWatcher := context.WithCancel(context.Background()) + cancelWatcherChan := make(chan interface{}, 1) + go CancellationWatcher(cancelCtx, job.Id, cancelWatcherChan) + + defer cancelCancelWatcher() + + counter := 0 + for { + select { + case <-cancelWatcherChan: + l4g.Debug("Job %v: Job has been canceled via CancellationWatcher.", job.Id) + if err := SetJobCanceled(job); err != nil { + l4g.Error("Failed to mark job: %v as canceled. Error: %v", job.Id, err.Error()) + } + return + case <-worker.stop: + l4g.Debug("Job %v: Job has been canceled via Worker Stop.", job.Id) + if err := SetJobCanceled(job); err != nil { + l4g.Error("Failed to mark job: %v as canceled. Error: %v", job.Id, err.Error()) + } + return + case <-time.After(5 * time.Second): + counter++ + if counter > 10 { + l4g.Debug("Job %v: Job completed.", job.Id) + if err := SetJobSuccess(job); err != nil { + l4g.Error("Failed to mark job: %v as succeeded. Error: %v", job.Id, err.Error()) + } + return + } else { + if _, err := SetJobProgress(job.Id, int64(counter*10)); err != nil { + l4g.Error("Job: %v: an error occured while trying to set job progress: %v", job.Id, err.Error()) + } + } + } + } +} + +func (worker *TestWorker) Stop() { + l4g.Debug("Worker %v: Stopping", worker.name) + worker.stop <- true + <-worker.stopped +} + +func (worker *TestWorker) JobChannel() chan<- model.Job { + return worker.jobs +} diff --git a/jobs/workers.go b/jobs/workers.go new file mode 100644 index 000000000..a42ec4607 --- /dev/null +++ b/jobs/workers.go @@ -0,0 +1,79 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package jobs + +import ( + "sync" + + l4g "github.com/alecthomas/log4go" + ejobs "github.com/mattermost/platform/einterfaces/jobs" + "github.com/mattermost/platform/model" + "github.com/mattermost/platform/utils" +) + +type Workers struct { + startOnce sync.Once + watcher *Watcher + + DataRetention model.Worker + // SearchIndexing model.Job + + listenerId string +} + +func InitWorkers() *Workers { + workers := &Workers{ + // SearchIndexing: MakeTestJob(s, "SearchIndexing"), + } + workers.watcher = MakeWatcher(workers) + + if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil { + workers.DataRetention = dataRetentionInterface.MakeWorker() + } + + return workers +} + +func (workers *Workers) Start() *Workers { + l4g.Info("Starting workers") + + workers.startOnce.Do(func() { + if workers.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable { + go workers.DataRetention.Run() + } + + // go workers.SearchIndexing.Run() + + go workers.watcher.Start() + }) + + workers.listenerId = utils.AddConfigListener(workers.handleConfigChange) + + return workers +} + +func (workers *Workers) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) { + if workers.DataRetention != nil { + if !*oldConfig.DataRetentionSettings.Enable && *newConfig.DataRetentionSettings.Enable { + go workers.DataRetention.Run() + } else if *oldConfig.DataRetentionSettings.Enable && !*newConfig.DataRetentionSettings.Enable { + workers.DataRetention.Stop() + } + } +} + +func (workers *Workers) Stop() *Workers { + utils.RemoveConfigListener(workers.listenerId) + + workers.watcher.Stop() + + if workers.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable { + workers.DataRetention.Stop() + } + // workers.SearchIndexing.Stop() + + l4g.Info("Stopped workers") + + return workers +} |