diff options
author | George Goldberg <george@gberg.me> | 2017-07-07 15:21:02 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-07-07 15:21:02 +0100 |
commit | 0495a519499d6cefa289982a94d8f42de541c1f0 (patch) | |
tree | 94b6145daa41ca4d1d4a172f030071076852a09a /jobs/testworker.go | |
parent | 6e0f5f096986dad11ef182ddb51d4bfb0e558860 (diff) | |
download | chat-0495a519499d6cefa289982a94d8f42de541c1f0.tar.gz chat-0495a519499d6cefa289982a94d8f42de541c1f0.tar.bz2 chat-0495a519499d6cefa289982a94d8f42de541c1f0.zip |
PLT-6916: Redesign the jobs package and Jobserver. (#6733)
This commit redesigns the jobserver to be based around an architecture
of "workers", which carry out jobs of a particular type, and "jobs"
which are a unit of work carried by a particular worker. It also
introduces "schedulers" which are responsible for scheduling jobs of a
particular type automatically (jobs can also be scheduled manually when
apropriate).
Workers may be run many times, either in instances of the platform
binary, or the standalone jobserver binary. In any mattermost cluster,
only one instance of platform OR jobserver must run the schedulers. At
the moment this is controlled by a config variable, but in future will
be controlled through the cluster leader election process.
Diffstat (limited to 'jobs/testworker.go')
-rw-r--r-- | jobs/testworker.go | 104 |
1 files changed, 104 insertions, 0 deletions
diff --git a/jobs/testworker.go b/jobs/testworker.go new file mode 100644 index 000000000..f1c8a07a3 --- /dev/null +++ b/jobs/testworker.go @@ -0,0 +1,104 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package jobs + +import ( + "context" + "time" + + l4g "github.com/alecthomas/log4go" + "github.com/mattermost/platform/model" +) + +type TestWorker struct { + name string + stop chan bool + stopped chan bool + jobs chan model.Job +} + +func MakeTestWorker(name string) *TestWorker { + return &TestWorker{ + name: name, + stop: make(chan bool, 1), + stopped: make(chan bool, 1), + jobs: make(chan model.Job), + } +} + +func (worker *TestWorker) Run() { + l4g.Debug("Worker %v: Started", worker.name) + + defer func() { + l4g.Debug("Worker %v: Finished", worker.name) + worker.stopped <- true + }() + + for { + select { + case <-worker.stop: + l4g.Debug("Worker %v: Received stop signal", worker.name) + return + case job := <-worker.jobs: + l4g.Debug("Worker %v: Received a new candidate job.", worker.name) + worker.DoJob(&job) + } + } +} + +func (worker *TestWorker) DoJob(job *model.Job) { + if claimed, err := ClaimJob(job); err != nil { + l4g.Error("Job: %v: Error occurred while trying to claim job: %v", job.Id, err.Error()) + return + } else if !claimed { + return + } + + cancelCtx, cancelCancelWatcher := context.WithCancel(context.Background()) + cancelWatcherChan := make(chan interface{}, 1) + go CancellationWatcher(cancelCtx, job.Id, cancelWatcherChan) + + defer cancelCancelWatcher() + + counter := 0 + for { + select { + case <-cancelWatcherChan: + l4g.Debug("Job %v: Job has been canceled via CancellationWatcher.", job.Id) + if err := SetJobCanceled(job); err != nil { + l4g.Error("Failed to mark job: %v as canceled. Error: %v", job.Id, err.Error()) + } + return + case <-worker.stop: + l4g.Debug("Job %v: Job has been canceled via Worker Stop.", job.Id) + if err := SetJobCanceled(job); err != nil { + l4g.Error("Failed to mark job: %v as canceled. Error: %v", job.Id, err.Error()) + } + return + case <-time.After(5 * time.Second): + counter++ + if counter > 10 { + l4g.Debug("Job %v: Job completed.", job.Id) + if err := SetJobSuccess(job); err != nil { + l4g.Error("Failed to mark job: %v as succeeded. Error: %v", job.Id, err.Error()) + } + return + } else { + if _, err := SetJobProgress(job.Id, int64(counter*10)); err != nil { + l4g.Error("Job: %v: an error occured while trying to set job progress: %v", job.Id, err.Error()) + } + } + } + } +} + +func (worker *TestWorker) Stop() { + l4g.Debug("Worker %v: Stopping", worker.name) + worker.stop <- true + <-worker.stopped +} + +func (worker *TestWorker) JobChannel() chan<- model.Job { + return worker.jobs +} |