diff options
-rw-r--r-- | api4/job.go | 14 | ||||
-rw-r--r-- | api4/job_test.go | 30 | ||||
-rw-r--r-- | app/admin.go | 3 | ||||
-rw-r--r-- | app/job.go | 16 | ||||
-rw-r--r-- | app/job_test.go | 18 | ||||
-rw-r--r-- | cmd/platform/server.go | 7 | ||||
-rw-r--r-- | config/config.json | 4 | ||||
-rw-r--r-- | einterfaces/jobs/data_retention.go | 4 | ||||
-rw-r--r-- | i18n/en.json | 24 | ||||
-rw-r--r-- | jobs/jobs.go | 122 | ||||
-rw-r--r-- | jobs/jobs_watcher.go | 85 | ||||
-rw-r--r-- | jobs/jobserver/jobserver.go | 15 | ||||
-rw-r--r-- | jobs/schedulers.go | 68 | ||||
-rw-r--r-- | jobs/server.go | 31 | ||||
-rw-r--r-- | jobs/testjob.go | 54 | ||||
-rw-r--r-- | jobs/testscheduler.go | 58 | ||||
-rw-r--r-- | jobs/testworker.go | 104 | ||||
-rw-r--r-- | jobs/workers.go | 79 | ||||
-rw-r--r-- | model/client4.go | 12 | ||||
-rw-r--r-- | model/config.go | 16 | ||||
-rw-r--r-- | model/job.go | 79 | ||||
-rw-r--r-- | model/job_status.go | 59 | ||||
-rw-r--r-- | store/layered_store.go | 4 | ||||
-rw-r--r-- | store/sql_job_status_store.go | 190 | ||||
-rw-r--r-- | store/sql_job_status_store_test.go | 151 | ||||
-rw-r--r-- | store/sql_job_store.go | 327 | ||||
-rw-r--r-- | store/sql_job_store_test.go | 341 | ||||
-rw-r--r-- | store/sql_store.go | 1 | ||||
-rw-r--r-- | store/sql_supplier.go | 10 | ||||
-rw-r--r-- | store/sql_upgrade.go | 3 | ||||
-rw-r--r-- | store/store.go | 10 |
31 files changed, 1363 insertions, 576 deletions
diff --git a/api4/job.go b/api4/job.go index 8610d9e74..e6c17c42d 100644 --- a/api4/job.go +++ b/api4/job.go @@ -14,11 +14,11 @@ import ( func InitJob() { l4g.Info("Initializing job API routes") - BaseRoutes.Jobs.Handle("/type/{job_type:[A-Za-z0-9_-]+}/statuses", ApiSessionRequired(getJobStatusesByType)).Methods("GET") - BaseRoutes.Jobs.Handle("/{job_id:[A-Za-z0-9]+}/status", ApiSessionRequired(getJobStatus)).Methods("GET") + BaseRoutes.Jobs.Handle("/type/{job_type:[A-Za-z0-9_-]+}/statuses", ApiSessionRequired(getJobsByType)).Methods("GET") + BaseRoutes.Jobs.Handle("/{job_id:[A-Za-z0-9]+}/status", ApiSessionRequired(getJob)).Methods("GET") } -func getJobStatus(c *Context, w http.ResponseWriter, r *http.Request) { +func getJob(c *Context, w http.ResponseWriter, r *http.Request) { c.RequireJobId() if c.Err != nil { return @@ -29,7 +29,7 @@ func getJobStatus(c *Context, w http.ResponseWriter, r *http.Request) { return } - if status, err := app.GetJobStatus(c.Params.JobId); err != nil { + if status, err := app.GetJob(c.Params.JobId); err != nil { c.Err = err return } else { @@ -37,7 +37,7 @@ func getJobStatus(c *Context, w http.ResponseWriter, r *http.Request) { } } -func getJobStatusesByType(c *Context, w http.ResponseWriter, r *http.Request) { +func getJobsByType(c *Context, w http.ResponseWriter, r *http.Request) { c.RequireJobType() if c.Err != nil { return @@ -48,10 +48,10 @@ func getJobStatusesByType(c *Context, w http.ResponseWriter, r *http.Request) { return } - if statuses, err := app.GetJobStatusesByTypePage(c.Params.JobType, c.Params.Page, c.Params.PerPage); err != nil { + if statuses, err := app.GetJobsByTypePage(c.Params.JobType, c.Params.Page, c.Params.PerPage); err != nil { c.Err = err return } else { - w.Write([]byte(model.JobStatusesToJson(statuses))) + w.Write([]byte(model.JobsToJson(statuses))) } } diff --git a/api4/job_test.go b/api4/job_test.go index 0f39fc306..8bbea83e1 100644 --- a/api4/job_test.go +++ b/api4/job_test.go @@ -16,30 +16,30 @@ func TestGetJobStatus(t *testing.T) { th := Setup().InitBasic().InitSystemAdmin() defer TearDown() - status := &model.JobStatus{ + status := &model.Job{ Id: model.NewId(), Status: model.NewId(), } - if result := <-app.Srv.Store.JobStatus().SaveOrUpdate(status); result.Err != nil { + if result := <-app.Srv.Store.Job().Save(status); result.Err != nil { t.Fatal(result.Err) } - defer app.Srv.Store.JobStatus().Delete(status.Id) + defer app.Srv.Store.Job().Delete(status.Id) - received, resp := th.SystemAdminClient.GetJobStatus(status.Id) + received, resp := th.SystemAdminClient.GetJob(status.Id) CheckNoError(t, resp) if received.Id != status.Id || received.Status != status.Status { t.Fatal("incorrect job status received") } - _, resp = th.SystemAdminClient.GetJobStatus("1234") + _, resp = th.SystemAdminClient.GetJob("1234") CheckBadRequestStatus(t, resp) - _, resp = th.Client.GetJobStatus(status.Id) + _, resp = th.Client.GetJob(status.Id) CheckForbiddenStatus(t, resp) - _, resp = th.SystemAdminClient.GetJobStatus(model.NewId()) + _, resp = th.SystemAdminClient.GetJob(model.NewId()) CheckNotFoundStatus(t, resp) } @@ -49,7 +49,7 @@ func TestGetJobStatusesByType(t *testing.T) { jobType := model.NewId() - statuses := []*model.JobStatus{ + statuses := []*model.Job{ { Id: model.NewId(), Type: jobType, @@ -68,11 +68,11 @@ func TestGetJobStatusesByType(t *testing.T) { } for _, status := range statuses { - store.Must(app.Srv.Store.JobStatus().SaveOrUpdate(status)) - defer app.Srv.Store.JobStatus().Delete(status.Id) + store.Must(app.Srv.Store.Job().Save(status)) + defer app.Srv.Store.Job().Delete(status.Id) } - received, resp := th.SystemAdminClient.GetJobStatusesByType(jobType, 0, 2) + received, resp := th.SystemAdminClient.GetJobsByType(jobType, 0, 2) CheckNoError(t, resp) if len(received) != 2 { @@ -83,7 +83,7 @@ func TestGetJobStatusesByType(t *testing.T) { t.Fatal("should've received second newest job second") } - received, resp = th.SystemAdminClient.GetJobStatusesByType(jobType, 1, 2) + received, resp = th.SystemAdminClient.GetJobsByType(jobType, 1, 2) CheckNoError(t, resp) if len(received) != 1 { @@ -92,12 +92,12 @@ func TestGetJobStatusesByType(t *testing.T) { t.Fatal("should've received oldest job last") } - _, resp = th.SystemAdminClient.GetJobStatusesByType("", 0, 60) + _, resp = th.SystemAdminClient.GetJobsByType("", 0, 60) CheckNotFoundStatus(t, resp) - _, resp = th.SystemAdminClient.GetJobStatusesByType(strings.Repeat("a", 33), 0, 60) + _, resp = th.SystemAdminClient.GetJobsByType(strings.Repeat("a", 33), 0, 60) CheckBadRequestStatus(t, resp) - _, resp = th.Client.GetJobStatusesByType(jobType, 0, 60) + _, resp = th.Client.GetJobsByType(jobType, 0, 60) CheckForbiddenStatus(t, resp) } diff --git a/app/admin.go b/app/admin.go index 8b7d64b53..6fbe150c4 100644 --- a/app/admin.go +++ b/app/admin.go @@ -16,6 +16,7 @@ import ( "github.com/mattermost/platform/model" "github.com/mattermost/platform/store" "github.com/mattermost/platform/utils" + "github.com/mattermost/platform/jobs" ) func GetLogs(page, perPage int) ([]string, *model.AppError) { @@ -187,6 +188,8 @@ func RecycleDatabaseConnection() { l4g.Warn(utils.T("api.admin.recycle_db_start.warn")) Srv.Store = store.NewLayeredStore() + jobs.Srv.Store = Srv.Store + time.Sleep(20 * time.Second) oldStore.Close() diff --git a/app/job.go b/app/job.go index 00439e4d2..c625ce15f 100644 --- a/app/job.go +++ b/app/job.go @@ -7,22 +7,22 @@ import ( "github.com/mattermost/platform/model" ) -func GetJobStatus(id string) (*model.JobStatus, *model.AppError) { - if result := <-Srv.Store.JobStatus().Get(id); result.Err != nil { +func GetJob(id string) (*model.Job, *model.AppError) { + if result := <-Srv.Store.Job().Get(id); result.Err != nil { return nil, result.Err } else { - return result.Data.(*model.JobStatus), nil + return result.Data.(*model.Job), nil } } -func GetJobStatusesByTypePage(jobType string, page int, perPage int) ([]*model.JobStatus, *model.AppError) { - return GetJobStatusesByType(jobType, page*perPage, perPage) +func GetJobsByTypePage(jobType string, page int, perPage int) ([]*model.Job, *model.AppError) { + return GetJobsByType(jobType, page*perPage, perPage) } -func GetJobStatusesByType(jobType string, offset int, limit int) ([]*model.JobStatus, *model.AppError) { - if result := <-Srv.Store.JobStatus().GetAllByTypePage(jobType, offset, limit); result.Err != nil { +func GetJobsByType(jobType string, offset int, limit int) ([]*model.Job, *model.AppError) { + if result := <-Srv.Store.Job().GetAllByTypePage(jobType, offset, limit); result.Err != nil { return nil, result.Err } else { - return result.Data.([]*model.JobStatus), nil + return result.Data.([]*model.Job), nil } } diff --git a/app/job_test.go b/app/job_test.go index 20e9dee8a..ced65788f 100644 --- a/app/job_test.go +++ b/app/job_test.go @@ -13,17 +13,17 @@ import ( func TestGetJobStatus(t *testing.T) { Setup() - status := &model.JobStatus{ + status := &model.Job{ Id: model.NewId(), Status: model.NewId(), } - if result := <-Srv.Store.JobStatus().SaveOrUpdate(status); result.Err != nil { + if result := <-Srv.Store.Job().Save(status); result.Err != nil { t.Fatal(result.Err) } - defer Srv.Store.JobStatus().Delete(status.Id) + defer Srv.Store.Job().Delete(status.Id) - if received, err := GetJobStatus(status.Id); err != nil { + if received, err := GetJob(status.Id); err != nil { t.Fatal(err) } else if received.Id != status.Id || received.Status != status.Status { t.Fatal("inccorrect job status received") @@ -35,7 +35,7 @@ func TestGetJobStatusesByType(t *testing.T) { jobType := model.NewId() - statuses := []*model.JobStatus{ + statuses := []*model.Job{ { Id: model.NewId(), Type: jobType, @@ -54,11 +54,11 @@ func TestGetJobStatusesByType(t *testing.T) { } for _, status := range statuses { - store.Must(Srv.Store.JobStatus().SaveOrUpdate(status)) - defer Srv.Store.JobStatus().Delete(status.Id) + store.Must(Srv.Store.Job().Save(status)) + defer Srv.Store.Job().Delete(status.Id) } - if received, err := GetJobStatusesByType(jobType, 0, 2); err != nil { + if received, err := GetJobsByType(jobType, 0, 2); err != nil { t.Fatal(err) } else if len(received) != 2 { t.Fatal("received wrong number of statuses") @@ -68,7 +68,7 @@ func TestGetJobStatusesByType(t *testing.T) { t.Fatal("should've received second newest job second") } - if received, err := GetJobStatusesByType(jobType, 2, 2); err != nil { + if received, err := GetJobsByType(jobType, 2, 2); err != nil { t.Fatal(err) } else if len(received) != 1 { t.Fatal("received wrong number of statuses") diff --git a/cmd/platform/server.go b/cmd/platform/server.go index 2eedbd54a..1edb6c2f3 100644 --- a/cmd/platform/server.go +++ b/cmd/platform/server.go @@ -126,7 +126,9 @@ func runServer(configFileLocation string) { } } - jobs := jobs.InitJobs(app.Srv.Store).Start() + jobs.Srv.Store = app.Srv.Store + jobs.Srv.StartWorkers() + jobs.Srv.StartSchedulers() // wait for kill signal before attempting to gracefully shutdown // the running service @@ -142,7 +144,8 @@ func runServer(configFileLocation string) { einterfaces.GetMetricsInterface().StopServer() } - jobs.Stop() + jobs.Srv.StopSchedulers() + jobs.Srv.StopWorkers() app.StopServer() } diff --git a/config/config.json b/config/config.json index 3401a5e4f..56bd3d9fa 100644 --- a/config/config.json +++ b/config/config.json @@ -289,5 +289,9 @@ }, "DataRetentionSettings": { "Enable": false + }, + "JobSettings": { + "RunJobs": true, + "RunScheduler": true } } diff --git a/einterfaces/jobs/data_retention.go b/einterfaces/jobs/data_retention.go index 340ed1b88..442f667fa 100644 --- a/einterfaces/jobs/data_retention.go +++ b/einterfaces/jobs/data_retention.go @@ -5,11 +5,11 @@ package jobs import ( "github.com/mattermost/platform/model" - "github.com/mattermost/platform/store" ) type DataRetentionInterface interface { - MakeJob(store store.Store) model.Job + MakeWorker() model.Worker + MakeScheduler() model.Scheduler } var theDataRetentionInterface DataRetentionInterface diff --git a/i18n/en.json b/i18n/en.json index 7d23a13c1..03e833fa3 100644 --- a/i18n/en.json +++ b/i18n/en.json @@ -3744,6 +3744,10 @@ "translation": "Page not found" }, { + "id": "jobs.request_cancellation.status.error", + "translation": "Could not request cancellation for job that is not in a cancelable state." + }, + { "id": "manaultesting.get_channel_id.no_found.debug", "translation": "Could not find channel: %v, %v possibilities searched" }, @@ -5292,24 +5296,24 @@ "translation": "We couldn't save or update the file info" }, { - "id": "store.sql_job_status.delete_by_type.app_error", - "translation": "We couldn't delete the job status" + "id": "store.sql_job.delete.app_error", + "translation": "We couldn't delete the job" }, { - "id": "store.sql_job_status.get.app_error", - "translation": "We couldn't get the job status" + "id": "store.sql_job.get.app_error", + "translation": "We couldn't get the job" }, { - "id": "store.sql_job_status.get_all.app_error", - "translation": "We couldn't get all job statuses" + "id": "store.sql_job.get_all.app_error", + "translation": "We couldn't get the jobs" }, { - "id": "store.sql_job_status.save.app_error", - "translation": "We couldn't save the job status" + "id": "store.sql_job.save.app_error", + "translation": "We couldn't save the job" }, { - "id": "store.sql_job_status.update.app_error", - "translation": "We couldn't update the job status" + "id": "store.sql_job.update.app_error", + "translation": "We couldn't update the job" }, { "id": "store.sql_license.get.app_error", diff --git a/jobs/jobs.go b/jobs/jobs.go index 8c84f4eea..58c2f2f13 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -4,71 +4,111 @@ package jobs import ( - "sync" + "context" + "time" l4g "github.com/alecthomas/log4go" - ejobs "github.com/mattermost/platform/einterfaces/jobs" "github.com/mattermost/platform/model" - "github.com/mattermost/platform/store" - "github.com/mattermost/platform/utils" ) -type Jobs struct { - startOnce sync.Once +const ( + CANCEL_WATCHER_POLLING_INTERVAL = 5000 +) - DataRetention model.Job - // SearchIndexing model.Job +func CreateJob(jobType string, jobData map[string]interface{}) (*model.Job, *model.AppError) { + job := model.Job{ + Id: model.NewId(), + Type: jobType, + CreateAt: model.GetMillis(), + Status: model.JOB_STATUS_PENDING, + Data: jobData, + } - listenerId string + if result := <-Srv.Store.Job().Save(&job); result.Err != nil { + return nil, result.Err + } + + return &job, nil } -func InitJobs(s store.Store) *Jobs { - jobs := &Jobs{ - // SearchIndexing: MakeTestJob(s, "SearchIndexing"), +func ClaimJob(job *model.Job) (bool, *model.AppError) { + if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_IN_PROGRESS); result.Err != nil { + return false, result.Err + } else { + success := result.Data.(bool) + return success, nil } +} - if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil { - jobs.DataRetention = dataRetentionInterface.MakeJob(s) - } +func SetJobProgress(jobId string, progress int64) (bool, *model.AppError) { + var job *model.Job - return jobs -} + if result := <-Srv.Store.Job().Get(jobId); result.Err != nil { + return false, result.Err + } else { + job = result.Data.(*model.Job) + } -func (jobs *Jobs) Start() *Jobs { - l4g.Info("Starting jobs") + job.Status = model.JOB_STATUS_IN_PROGRESS + job.Progress = progress - jobs.startOnce.Do(func() { - if jobs.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable { - go jobs.DataRetention.Run() + if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil { + return false, result.Err + } else { + if !result.Data.(bool) { + return false, nil } + } - // go jobs.SearchIndexing.Run() - }) + return true, nil +} - jobs.listenerId = utils.AddConfigListener(jobs.handleConfigChange) +func SetJobSuccess(job *model.Job) *model.AppError { + result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_SUCCESS) + return result.Err +} - return jobs +func SetJobError(job *model.Job) *model.AppError { + result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_ERROR) + return result.Err } -func (jobs *Jobs) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) { - if jobs.DataRetention != nil { - if !*oldConfig.DataRetentionSettings.Enable && *newConfig.DataRetentionSettings.Enable { - go jobs.DataRetention.Run() - } else if *oldConfig.DataRetentionSettings.Enable && !*newConfig.DataRetentionSettings.Enable { - jobs.DataRetention.Stop() - } - } +func SetJobCanceled(job *model.Job) *model.AppError { + result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_CANCELED) + return result.Err } -func (jobs *Jobs) Stop() *Jobs { - utils.RemoveConfigListener(jobs.listenerId) +func RequestCancellation(job *model.Job) *model.AppError { + if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_CANCELED); result.Err != nil { + return result.Err + } else if result.Data.(bool) { + return nil + } - if jobs.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable { - jobs.DataRetention.Stop() + if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil { + return result.Err + } else if result.Data.(bool) { + return nil } - // jobs.SearchIndexing.Stop() - l4g.Info("Stopped jobs") + return model.NewLocAppError("Jobs.RequestCancellation", "jobs.request_cancellation.status.error", nil, "id=" + job.Id) +} - return jobs +func CancellationWatcher(ctx context.Context, jobId string, cancelChan chan interface{}) { + for { + select { + case <-ctx.Done(): + l4g.Debug("CancellationWatcher for Job: %v Aborting as job has finished.", jobId) + return + case <-time.After(CANCEL_WATCHER_POLLING_INTERVAL * time.Millisecond): + l4g.Debug("CancellationWatcher for Job: %v polling.", jobId) + if result := <-Srv.Store.Job().Get(jobId); result.Err == nil { + jobStatus := result.Data.(*model.Job) + if jobStatus.Status == model.JOB_STATUS_CANCEL_REQUESTED { + close(cancelChan) + return + } + } + } + } } diff --git a/jobs/jobs_watcher.go b/jobs/jobs_watcher.go new file mode 100644 index 000000000..ada957ccc --- /dev/null +++ b/jobs/jobs_watcher.go @@ -0,0 +1,85 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package jobs + +import ( + "math/rand" + "time" + + l4g "github.com/alecthomas/log4go" + "github.com/mattermost/platform/model" +) + +const ( + WATCHER_POLLING_INTERVAL = 15000 +) + +type Watcher struct { + workers *Workers + + stop chan bool + stopped chan bool +} + +func MakeWatcher(workers *Workers) *Watcher { + return &Watcher{ + stop: make(chan bool, 1), + stopped: make(chan bool, 1), + workers: workers, + } +} + +func (watcher *Watcher) Start() { + l4g.Debug("Watcher Started") + + // Delay for some random number of milliseconds before starting to ensure that multiple + // instances of the jobserver don't poll at a time too close to each other. + rand.Seed(time.Now().UTC().UnixNano()) + _ = <-time.After(time.Duration(rand.Intn(WATCHER_POLLING_INTERVAL)) * time.Millisecond) + + defer func(){ + l4g.Debug("Watcher Finished") + watcher.stopped <- true + }() + + for { + select { + case <-watcher.stop: + l4g.Debug("Watcher: Received stop signal") + return + case <-time.After(WATCHER_POLLING_INTERVAL * time.Millisecond): + watcher.PollAndNotify() + } + } +} + +func (watcher *Watcher) Stop() { + l4g.Debug("Watcher Stopping") + watcher.stop <- true + <-watcher.stopped +} + +func (watcher *Watcher) PollAndNotify() { + if result := <-Srv.Store.Job().GetAllByStatus(model.JOB_STATUS_PENDING); result.Err != nil { + l4g.Error("Error occured getting all pending statuses: %v", result.Err.Error()) + } else { + jobStatuses := result.Data.([]*model.Job) + + for _, js := range jobStatuses { + j := model.Job{ + Type: js.Type, + Id: js.Id, + } + + if js.Type == model.JOB_TYPE_DATA_RETENTION { + if watcher.workers.DataRetention != nil { + select { + case watcher.workers.DataRetention.JobChannel() <- j: + default: + } + } + } + } + } +} diff --git a/jobs/jobserver/jobserver.go b/jobs/jobserver/jobserver.go index 5f491a815..aabe5d3b2 100644 --- a/jobs/jobserver/jobserver.go +++ b/jobs/jobserver/jobserver.go @@ -16,22 +16,20 @@ import ( _ "github.com/mattermost/platform/imports" ) -var Srv jobs.JobServer - func main() { // Initialize utils.InitAndLoadConfig("config.json") defer l4g.Close() - Srv.Store = store.NewLayeredStore() - defer Srv.Store.Close() + jobs.Srv.Store = store.NewLayeredStore() + defer jobs.Srv.Store.Close() - Srv.LoadLicense() + jobs.Srv.LoadLicense() // Run jobs l4g.Info("Starting Mattermost job server") - Srv.Jobs = jobs.InitJobs(Srv.Store) - Srv.Jobs.Start() + jobs.Srv.StartWorkers() + jobs.Srv.StartSchedulers() var signalChan chan os.Signal = make(chan os.Signal) signal.Notify(signalChan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) @@ -40,7 +38,8 @@ func main() { // Cleanup anything that isn't handled by a defer statement l4g.Info("Stopping Mattermost job server") - Srv.Jobs.Stop() + jobs.Srv.StopSchedulers() + jobs.Srv.StopWorkers() l4g.Info("Stopped Mattermost job server") } diff --git a/jobs/schedulers.go b/jobs/schedulers.go new file mode 100644 index 000000000..73ec6661a --- /dev/null +++ b/jobs/schedulers.go @@ -0,0 +1,68 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package jobs + +import ( + "sync" + + l4g "github.com/alecthomas/log4go" + ejobs "github.com/mattermost/platform/einterfaces/jobs" + + "github.com/mattermost/platform/model" + "github.com/mattermost/platform/utils" +) + +type Schedulers struct { + startOnce sync.Once + + DataRetention model.Scheduler + + listenerId string +} + +func InitSchedulers() *Schedulers { + schedulers := &Schedulers{} + + if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil { + schedulers.DataRetention = dataRetentionInterface.MakeScheduler() + } + + return schedulers +} + +func (schedulers *Schedulers) Start() *Schedulers { + l4g.Info("Starting schedulers") + + schedulers.startOnce.Do(func() { + if schedulers.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable { + go schedulers.DataRetention.Run() + } + }) + + schedulers.listenerId = utils.AddConfigListener(schedulers.handleConfigChange) + + return schedulers +} + +func (schedulers *Schedulers) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) { + if schedulers.DataRetention != nil { + if !*oldConfig.DataRetentionSettings.Enable && *newConfig.DataRetentionSettings.Enable { + go schedulers.DataRetention.Run() + } else if *oldConfig.DataRetentionSettings.Enable && !*newConfig.DataRetentionSettings.Enable { + schedulers.DataRetention.Stop() + } + } +} + +func (schedulers *Schedulers) Stop() *Schedulers { + utils.RemoveConfigListener(schedulers.listenerId) + + if schedulers.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable { + schedulers.DataRetention.Stop() + } + + l4g.Info("Stopped schedulers") + + return schedulers +} diff --git a/jobs/server.go b/jobs/server.go index dd3448842..7920cb2d5 100644 --- a/jobs/server.go +++ b/jobs/server.go @@ -11,10 +11,13 @@ import ( ) type JobServer struct { - Store store.Store - Jobs *Jobs + Store store.Store + Workers *Workers + Schedulers *Schedulers } +var Srv JobServer + func (server *JobServer) LoadLicense() { licenseId := "" if result := <-server.Store.System().Get(); result.Err == nil { @@ -44,3 +47,27 @@ func (server *JobServer) LoadLicense() { l4g.Info(utils.T("mattermost.load_license.find.warn")) } } + +func (server *JobServer) StartWorkers() { + if *utils.Cfg.JobSettings.RunJobs { + Srv.Workers = InitWorkers().Start() + } +} + +func (server *JobServer) StartSchedulers() { + if *utils.Cfg.JobSettings.RunJobs { + Srv.Schedulers = InitSchedulers().Start() + } +} + +func (server *JobServer) StopWorkers() { + if Srv.Workers != nil { + Srv.Workers.Stop() + } +} + +func (server *JobServer) StopSchedulers() { + if Srv.Schedulers != nil { + Srv.Schedulers.Stop() + } +} diff --git a/jobs/testjob.go b/jobs/testjob.go deleted file mode 100644 index 59d5274e5..000000000 --- a/jobs/testjob.go +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. -// See License.txt for license information. - -package jobs - -import ( - "time" - - l4g "github.com/alecthomas/log4go" - "github.com/mattermost/platform/store" -) - -type TestJob struct { - store store.Store - - name string - stop chan bool - stopped chan bool -} - -func MakeTestJob(s store.Store, name string) *TestJob { - return &TestJob{ - store: s, - name: name, - stop: make(chan bool, 1), - stopped: make(chan bool, 1), - } -} - -func (job *TestJob) Run() { - l4g.Debug("Job %v: Started", job.name) - - running := true - for running { - l4g.Debug("Job %v: Tick", job.name) - - select { - case <-job.stop: - l4g.Debug("Job %v: Received stop signal", job.name) - running = false - case <-time.After(10 * time.Second): - continue - } - } - - l4g.Debug("Job %v: Finished", job.name) - job.stopped <- true -} - -func (job *TestJob) Stop() { - l4g.Debug("Job %v: Stopping", job.name) - job.stop <- true - <-job.stopped -} diff --git a/jobs/testscheduler.go b/jobs/testscheduler.go new file mode 100644 index 000000000..31b5d144c --- /dev/null +++ b/jobs/testscheduler.go @@ -0,0 +1,58 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package jobs + +import ( + "time" + + l4g "github.com/alecthomas/log4go" +) + +type TestScheduler struct { + name string + jobType string + stop chan bool + stopped chan bool +} + +func MakeTestScheduler(name string, jobType string) *TestScheduler { + return &TestScheduler{ + name: name, + jobType: jobType, + stop: make(chan bool, 1), + stopped: make(chan bool, 1), + } +} + +func (scheduler *TestScheduler) Run() { + l4g.Debug("Scheduler %v: Started", scheduler.name) + + defer func(){ + l4g.Debug("Scheduler %v: Finished", scheduler.name) + scheduler.stopped <- true + }() + + for { + select { + case <-scheduler.stop: + l4g.Debug("Scheduler %v: Received stop signal", scheduler.name) + return + case <-time.After(86400 * time.Second): + l4g.Debug("Scheduler: %v: Scheduling new job", scheduler.name) + scheduler.AddJob() + } + } +} + +func (scheduler *TestScheduler) AddJob() { + if _, err := CreateJob(scheduler.jobType, nil); err != nil { + l4g.Error("Scheduler %v: failed to create job: %v", scheduler.name, err) + } +} + +func (scheduler *TestScheduler) Stop() { + l4g.Debug("Scheduler %v: Stopping", scheduler.name) + scheduler.stop <- true + <-scheduler.stopped +} diff --git a/jobs/testworker.go b/jobs/testworker.go new file mode 100644 index 000000000..f1c8a07a3 --- /dev/null +++ b/jobs/testworker.go @@ -0,0 +1,104 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package jobs + +import ( + "context" + "time" + + l4g "github.com/alecthomas/log4go" + "github.com/mattermost/platform/model" +) + +type TestWorker struct { + name string + stop chan bool + stopped chan bool + jobs chan model.Job +} + +func MakeTestWorker(name string) *TestWorker { + return &TestWorker{ + name: name, + stop: make(chan bool, 1), + stopped: make(chan bool, 1), + jobs: make(chan model.Job), + } +} + +func (worker *TestWorker) Run() { + l4g.Debug("Worker %v: Started", worker.name) + + defer func() { + l4g.Debug("Worker %v: Finished", worker.name) + worker.stopped <- true + }() + + for { + select { + case <-worker.stop: + l4g.Debug("Worker %v: Received stop signal", worker.name) + return + case job := <-worker.jobs: + l4g.Debug("Worker %v: Received a new candidate job.", worker.name) + worker.DoJob(&job) + } + } +} + +func (worker *TestWorker) DoJob(job *model.Job) { + if claimed, err := ClaimJob(job); err != nil { + l4g.Error("Job: %v: Error occurred while trying to claim job: %v", job.Id, err.Error()) + return + } else if !claimed { + return + } + + cancelCtx, cancelCancelWatcher := context.WithCancel(context.Background()) + cancelWatcherChan := make(chan interface{}, 1) + go CancellationWatcher(cancelCtx, job.Id, cancelWatcherChan) + + defer cancelCancelWatcher() + + counter := 0 + for { + select { + case <-cancelWatcherChan: + l4g.Debug("Job %v: Job has been canceled via CancellationWatcher.", job.Id) + if err := SetJobCanceled(job); err != nil { + l4g.Error("Failed to mark job: %v as canceled. Error: %v", job.Id, err.Error()) + } + return + case <-worker.stop: + l4g.Debug("Job %v: Job has been canceled via Worker Stop.", job.Id) + if err := SetJobCanceled(job); err != nil { + l4g.Error("Failed to mark job: %v as canceled. Error: %v", job.Id, err.Error()) + } + return + case <-time.After(5 * time.Second): + counter++ + if counter > 10 { + l4g.Debug("Job %v: Job completed.", job.Id) + if err := SetJobSuccess(job); err != nil { + l4g.Error("Failed to mark job: %v as succeeded. Error: %v", job.Id, err.Error()) + } + return + } else { + if _, err := SetJobProgress(job.Id, int64(counter*10)); err != nil { + l4g.Error("Job: %v: an error occured while trying to set job progress: %v", job.Id, err.Error()) + } + } + } + } +} + +func (worker *TestWorker) Stop() { + l4g.Debug("Worker %v: Stopping", worker.name) + worker.stop <- true + <-worker.stopped +} + +func (worker *TestWorker) JobChannel() chan<- model.Job { + return worker.jobs +} diff --git a/jobs/workers.go b/jobs/workers.go new file mode 100644 index 000000000..a42ec4607 --- /dev/null +++ b/jobs/workers.go @@ -0,0 +1,79 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package jobs + +import ( + "sync" + + l4g "github.com/alecthomas/log4go" + ejobs "github.com/mattermost/platform/einterfaces/jobs" + "github.com/mattermost/platform/model" + "github.com/mattermost/platform/utils" +) + +type Workers struct { + startOnce sync.Once + watcher *Watcher + + DataRetention model.Worker + // SearchIndexing model.Job + + listenerId string +} + +func InitWorkers() *Workers { + workers := &Workers{ + // SearchIndexing: MakeTestJob(s, "SearchIndexing"), + } + workers.watcher = MakeWatcher(workers) + + if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil { + workers.DataRetention = dataRetentionInterface.MakeWorker() + } + + return workers +} + +func (workers *Workers) Start() *Workers { + l4g.Info("Starting workers") + + workers.startOnce.Do(func() { + if workers.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable { + go workers.DataRetention.Run() + } + + // go workers.SearchIndexing.Run() + + go workers.watcher.Start() + }) + + workers.listenerId = utils.AddConfigListener(workers.handleConfigChange) + + return workers +} + +func (workers *Workers) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) { + if workers.DataRetention != nil { + if !*oldConfig.DataRetentionSettings.Enable && *newConfig.DataRetentionSettings.Enable { + go workers.DataRetention.Run() + } else if *oldConfig.DataRetentionSettings.Enable && !*newConfig.DataRetentionSettings.Enable { + workers.DataRetention.Stop() + } + } +} + +func (workers *Workers) Stop() *Workers { + utils.RemoveConfigListener(workers.listenerId) + + workers.watcher.Stop() + + if workers.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable { + workers.DataRetention.Stop() + } + // workers.SearchIndexing.Stop() + + l4g.Info("Stopped workers") + + return workers +} diff --git a/model/client4.go b/model/client4.go index da3dfacb7..996d9362c 100644 --- a/model/client4.go +++ b/model/client4.go @@ -2790,22 +2790,22 @@ func (c *Client4) OpenGraph(url string) (map[string]string, *Response) { // Jobs Section -// GetJobStatus gets the status of a single job. -func (c *Client4) GetJobStatus(id string) (*JobStatus, *Response) { +// GetJob gets a single job. +func (c *Client4) GetJob(id string) (*Job, *Response) { if r, err := c.DoApiGet(c.GetJobsRoute()+fmt.Sprintf("/%v/status", id), ""); err != nil { return nil, BuildErrorResponse(r, err) } else { defer closeBody(r) - return JobStatusFromJson(r.Body), BuildResponse(r) + return JobFromJson(r.Body), BuildResponse(r) } } -// GetJobStatusesByType gets the status of all jobs of a given type, sorted with the job that most recently started first. -func (c *Client4) GetJobStatusesByType(jobType string, page int, perPage int) ([]*JobStatus, *Response) { +// GetJobsByType gets all jobs of a given type, sorted with the job that most recently started first. +func (c *Client4) GetJobsByType(jobType string, page int, perPage int) ([]*Job, *Response) { if r, err := c.DoApiGet(c.GetJobsRoute()+fmt.Sprintf("/type/%v/statuses?page=%v&per_page=%v", jobType, page, perPage), ""); err != nil { return nil, BuildErrorResponse(r, err) } else { defer closeBody(r) - return JobStatusesFromJson(r.Body), BuildResponse(r) + return JobsFromJson(r.Body), BuildResponse(r) } } diff --git a/model/config.go b/model/config.go index b7526925f..3e98aa8f6 100644 --- a/model/config.go +++ b/model/config.go @@ -436,6 +436,11 @@ type DataRetentionSettings struct { Enable *bool } +type JobSettings struct { + RunJobs *bool + RunScheduler *bool +} + type Config struct { ServiceSettings ServiceSettings TeamSettings TeamSettings @@ -462,6 +467,7 @@ type Config struct { WebrtcSettings WebrtcSettings ElasticSearchSettings ElasticSearchSettings DataRetentionSettings DataRetentionSettings + JobSettings JobSettings } func (o *Config) ToJson() string { @@ -1380,6 +1386,16 @@ func (o *Config) SetDefaults() { *o.DataRetentionSettings.Enable = false } + if o.JobSettings.RunJobs == nil { + o.JobSettings.RunJobs = new(bool) + *o.JobSettings.RunJobs = true + } + + if o.JobSettings.RunScheduler == nil { + o.JobSettings.RunScheduler = new(bool) + *o.JobSettings.RunScheduler = true + } + o.defaultWebrtcSettings() } diff --git a/model/job.go b/model/job.go index d539b5bf9..b0567bf1a 100644 --- a/model/job.go +++ b/model/job.go @@ -3,7 +3,84 @@ package model -type Job interface { +import ( + "encoding/json" + "io" +) + +const ( + JOB_TYPE_DATA_RETENTION = "data_retention" + JOB_TYPE_SEARCH_INDEXING = "search_indexing" + + JOB_STATUS_PENDING = "pending" + JOB_STATUS_IN_PROGRESS = "in_progress" + JOB_STATUS_SUCCESS = "success" + JOB_STATUS_ERROR = "error" + JOB_STATUS_CANCEL_REQUESTED = "cancel_requested" + JOB_STATUS_CANCELED = "canceled" +) + +type Job struct { + Id string `json:"id"` + Type string `json:"type"` + Priority int64 `json:"priority"` + CreateAt int64 `json:"create_at"` + StartAt int64 `json:"start_at"` + LastActivityAt int64 `json:"last_activity_at"` + Status string `json:"status"` + Progress int64 `json:"progress"` + Data map[string]interface{} `json:"data"` +} + +func (js *Job) ToJson() string { + if b, err := json.Marshal(js); err != nil { + return "" + } else { + return string(b) + } +} + +func JobFromJson(data io.Reader) *Job { + var status Job + if err := json.NewDecoder(data).Decode(&status); err == nil { + return &status + } else { + return nil + } +} + +func JobsToJson(jobs []*Job) string { + if b, err := json.Marshal(jobs); err != nil { + return "" + } else { + return string(b) + } +} + +func JobsFromJson(data io.Reader) []*Job { + var jobs []*Job + if err := json.NewDecoder(data).Decode(&jobs); err == nil { + return jobs + } else { + return nil + } +} + +func (js *Job) DataToJson() string { + if b, err := json.Marshal(js.Data); err != nil { + return "" + } else { + return string(b) + } +} + +type Worker interface { + Run() + Stop() + JobChannel() chan<- Job +} + +type Scheduler interface { Run() Stop() } diff --git a/model/job_status.go b/model/job_status.go deleted file mode 100644 index cf490648f..000000000 --- a/model/job_status.go +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. -// See License.txt for license information. - -package model - -import ( - "encoding/json" - "io" -) - -const ( - JOB_TYPE_DATA_RETENTION = "data_retention" - JOB_TYPE_SEARCH_INDEXING = "search_indexing" -) - -type JobStatus struct { - Id string `json:"id"` - Type string `json:"type"` - StartAt int64 `json:"start_at"` - LastActivityAt int64 `json:"last_activity_at"` - LastRunStartedAt int64 `json:"last_run_started_at"` - LastRunCompletedAt int64 `json:"last_run_completed_at"` - Status string `json:"status"` - Data map[string]interface{} `json:"data"` -} - -func (js *JobStatus) ToJson() string { - if b, err := json.Marshal(js); err != nil { - return "" - } else { - return string(b) - } -} - -func JobStatusFromJson(data io.Reader) *JobStatus { - var status JobStatus - if err := json.NewDecoder(data).Decode(&status); err == nil { - return &status - } else { - return nil - } -} - -func JobStatusesToJson(statuses []*JobStatus) string { - if b, err := json.Marshal(statuses); err != nil { - return "" - } else { - return string(b) - } -} - -func JobStatusesFromJson(data io.Reader) []*JobStatus { - var statuses []*JobStatus - if err := json.NewDecoder(data).Decode(&statuses); err == nil { - return statuses - } else { - return nil - } -} diff --git a/store/layered_store.go b/store/layered_store.go index 58c9e5ca1..ab9859c80 100644 --- a/store/layered_store.go +++ b/store/layered_store.go @@ -119,8 +119,8 @@ func (s *LayeredStore) Reaction() ReactionStore { return s.DatabaseLayer.Reaction() } -func (s *LayeredStore) JobStatus() JobStatusStore { - return s.DatabaseLayer.JobStatus() +func (s *LayeredStore) Job() JobStore { + return s.DatabaseLayer.Job() } func (s *LayeredStore) MarkSystemRanUnitTests() { diff --git a/store/sql_job_status_store.go b/store/sql_job_status_store.go deleted file mode 100644 index a87b8267b..000000000 --- a/store/sql_job_status_store.go +++ /dev/null @@ -1,190 +0,0 @@ -// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. -// See License.txt for license information. - -package store - -import ( - "database/sql" - "net/http" - - "github.com/mattermost/platform/model" -) - -type SqlJobStatusStore struct { - SqlStore -} - -func NewSqlJobStatusStore(sqlStore SqlStore) JobStatusStore { - s := &SqlJobStatusStore{sqlStore} - - for _, db := range sqlStore.GetAllConns() { - table := db.AddTableWithName(model.JobStatus{}, "JobStatuses").SetKeys(false, "Id") - table.ColMap("Id").SetMaxSize(26) - table.ColMap("Type").SetMaxSize(32) - table.ColMap("Status").SetMaxSize(32) - table.ColMap("Data").SetMaxSize(1024) - } - - return s -} - -func (jss SqlJobStatusStore) CreateIndexesIfNotExists() { - jss.CreateIndexIfNotExists("idx_jobstatuses_type", "JobStatuses", "Type") -} - -func (jss SqlJobStatusStore) SaveOrUpdate(status *model.JobStatus) StoreChannel { - storeChannel := make(StoreChannel, 1) - - go func() { - result := StoreResult{} - - if err := jss.GetReplica().SelectOne(&model.JobStatus{}, - `SELECT - * - FROM - JobStatuses - WHERE - Id = :Id`, map[string]interface{}{"Id": status.Id}); err == nil { - if _, err := jss.GetMaster().Update(status); err != nil { - result.Err = model.NewLocAppError("SqlJobStatusStore.SaveOrUpdate", - "store.sql_job_status.update.app_error", nil, "id="+status.Id+", "+err.Error()) - } - } else if err == sql.ErrNoRows { - if err := jss.GetMaster().Insert(status); err != nil { - result.Err = model.NewLocAppError("SqlJobStatusStore.SaveOrUpdate", - "store.sql_job_status.save.app_error", nil, "id="+status.Id+", "+err.Error()) - } - } else { - result.Err = model.NewLocAppError("SqlJobStatusStore.SaveOrUpdate", - "store.sql_job_status.save_or_update.app_error", nil, "id="+status.Id+", "+err.Error()) - } - - if result.Err == nil { - result.Data = status - } - - storeChannel <- result - close(storeChannel) - }() - - return storeChannel -} - -func (jss SqlJobStatusStore) Get(id string) StoreChannel { - storeChannel := make(StoreChannel, 1) - - go func() { - result := StoreResult{} - - var status *model.JobStatus - - if err := jss.GetReplica().SelectOne(&status, - `SELECT - * - FROM - JobStatuses - WHERE - Id = :Id`, map[string]interface{}{"Id": id}); err != nil { - if err == sql.ErrNoRows { - result.Err = model.NewAppError("SqlJobStatusStore.Get", - "store.sql_job_status.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusNotFound) - } else { - result.Err = model.NewAppError("SqlJobStatusStore.Get", - "store.sql_job_status.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusInternalServerError) - } - } else { - result.Data = status - } - - storeChannel <- result - close(storeChannel) - }() - - return storeChannel -} - -func (jss SqlJobStatusStore) GetAllByType(jobType string) StoreChannel { - storeChannel := make(StoreChannel, 1) - - go func() { - result := StoreResult{} - - var statuses []*model.JobStatus - - if _, err := jss.GetReplica().Select(&statuses, - `SELECT - * - FROM - JobStatuses - WHERE - Type = :Type`, map[string]interface{}{"Type": jobType}); err != nil { - result.Err = model.NewLocAppError("SqlJobStatusStore.GetAllByType", - "store.sql_job_status.get_all_by_type.app_error", nil, "Type="+jobType+", "+err.Error()) - } else { - result.Data = statuses - } - - storeChannel <- result - close(storeChannel) - }() - - return storeChannel -} - -func (jss SqlJobStatusStore) GetAllByTypePage(jobType string, offset int, limit int) StoreChannel { - storeChannel := make(StoreChannel, 1) - - go func() { - result := StoreResult{} - - var statuses []*model.JobStatus - - if _, err := jss.GetReplica().Select(&statuses, - `SELECT - * - FROM - JobStatuses - WHERE - Type = :Type - ORDER BY - StartAt ASC - LIMIT - :Limit - OFFSET - :Offset`, map[string]interface{}{"Type": jobType, "Limit": limit, "Offset": offset}); err != nil { - result.Err = model.NewLocAppError("SqlJobStatusStore.GetAllByTypePage", - "store.sql_job_status.get_all_by_type_page.app_error", nil, "Type="+jobType+", "+err.Error()) - } else { - result.Data = statuses - } - - storeChannel <- result - close(storeChannel) - }() - - return storeChannel -} - -func (jss SqlJobStatusStore) Delete(id string) StoreChannel { - storeChannel := make(StoreChannel, 1) - - go func() { - result := StoreResult{} - - if _, err := jss.GetReplica().Exec( - `DELETE FROM - JobStatuses - WHERE - Id = :Id`, map[string]interface{}{"Id": id}); err != nil { - result.Err = model.NewLocAppError("SqlJobStatusStore.DeleteByType", - "store.sql_job_status.delete.app_error", nil, "id="+id+", "+err.Error()) - } else { - result.Data = id - } - - storeChannel <- result - close(storeChannel) - }() - - return storeChannel -} diff --git a/store/sql_job_status_store_test.go b/store/sql_job_status_store_test.go deleted file mode 100644 index 18c29e522..000000000 --- a/store/sql_job_status_store_test.go +++ /dev/null @@ -1,151 +0,0 @@ -// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. -// See License.txt for license information. - -package store - -import ( - "testing" - - "github.com/mattermost/platform/model" -) - -func TestJobStatusSaveGetUpdate(t *testing.T) { - Setup() - - status := &model.JobStatus{ - Id: model.NewId(), - Type: model.NewId(), - Status: model.NewId(), - Data: map[string]interface{}{ - "Processed": 0, - "Total": 12345, - "LastProcessed": "abcd", - }, - } - - if result := <-store.JobStatus().SaveOrUpdate(status); result.Err != nil { - t.Fatal(result.Err) - } - - defer func() { - <-store.JobStatus().Delete(status.Id) - }() - - if result := <-store.JobStatus().Get(status.Id); result.Err != nil { - t.Fatal(result.Err) - } else if received := result.Data.(*model.JobStatus); received.Id != status.Id { - t.Fatal("received incorrect status after save") - } - - status.Status = model.NewId() - status.Data = map[string]interface{}{ - "Processed": 12345, - "Total": 12345, - "LastProcessed": "abcd", - } - - if result := <-store.JobStatus().SaveOrUpdate(status); result.Err != nil { - t.Fatal(result.Err) - } - - if result := <-store.JobStatus().Get(status.Id); result.Err != nil { - t.Fatal(result.Err) - } else if received := result.Data.(*model.JobStatus); received.Id != status.Id || received.Status != status.Status { - t.Fatal("received incorrect status after update") - } -} - -func TestJobStatusGetAllByType(t *testing.T) { - Setup() - - jobType := model.NewId() - - statuses := []*model.JobStatus{ - { - Id: model.NewId(), - Type: jobType, - }, - { - Id: model.NewId(), - Type: jobType, - }, - { - Id: model.NewId(), - Type: model.NewId(), - }, - } - - for _, status := range statuses { - Must(store.JobStatus().SaveOrUpdate(status)) - defer store.JobStatus().Delete(status.Id) - } - - if result := <-store.JobStatus().GetAllByType(jobType); result.Err != nil { - t.Fatal(result.Err) - } else if received := result.Data.([]*model.JobStatus); len(received) != 2 { - t.Fatal("received wrong number of statuses") - } else if received[0].Id != statuses[0].Id && received[1].Id != statuses[0].Id { - t.Fatal("should've received first status") - } else if received[0].Id != statuses[1].Id && received[1].Id != statuses[1].Id { - t.Fatal("should've received second status") - } -} - -func TestJobStatusGetAllByTypePage(t *testing.T) { - Setup() - - jobType := model.NewId() - - statuses := []*model.JobStatus{ - { - Id: model.NewId(), - Type: jobType, - StartAt: 1000, - }, - { - Id: model.NewId(), - Type: jobType, - StartAt: 999, - }, - { - Id: model.NewId(), - Type: jobType, - StartAt: 1001, - }, - } - - for _, status := range statuses { - Must(store.JobStatus().SaveOrUpdate(status)) - defer store.JobStatus().Delete(status.Id) - } - - if result := <-store.JobStatus().GetAllByTypePage(jobType, 0, 2); result.Err != nil { - t.Fatal(result.Err) - } else if received := result.Data.([]*model.JobStatus); len(received) != 2 { - t.Fatal("received wrong number of statuses") - } else if received[0].Id != statuses[1].Id { - t.Fatal("should've received newest job first") - } else if received[1].Id != statuses[0].Id { - t.Fatal("should've received second newest job second") - } - - if result := <-store.JobStatus().GetAllByTypePage(jobType, 2, 2); result.Err != nil { - t.Fatal(result.Err) - } else if received := result.Data.([]*model.JobStatus); len(received) != 1 { - t.Fatal("received wrong number of statuses") - } else if received[0].Id != statuses[2].Id { - t.Fatal("should've received oldest job last") - } -} - -func TestJobStatusDelete(t *testing.T) { - Setup() - - status := Must(store.JobStatus().SaveOrUpdate(&model.JobStatus{ - Id: model.NewId(), - })).(*model.JobStatus) - - if result := <-store.JobStatus().Delete(status.Id); result.Err != nil { - t.Fatal(result.Err) - } -} diff --git a/store/sql_job_store.go b/store/sql_job_store.go new file mode 100644 index 000000000..c00e37d86 --- /dev/null +++ b/store/sql_job_store.go @@ -0,0 +1,327 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package store + +import ( + "database/sql" + "net/http" + + "github.com/mattermost/gorp" + "github.com/mattermost/platform/model" +) + +type SqlJobStore struct { + SqlStore +} + +func NewSqlJobStore(sqlStore SqlStore) JobStore { + s := &SqlJobStore{sqlStore} + + for _, db := range sqlStore.GetAllConns() { + table := db.AddTableWithName(model.Job{}, "Jobs").SetKeys(false, "Id") + table.ColMap("Id").SetMaxSize(26) + table.ColMap("Type").SetMaxSize(32) + table.ColMap("Status").SetMaxSize(32) + table.ColMap("Data").SetMaxSize(1024) + } + + return s +} + +func (jss SqlJobStore) CreateIndexesIfNotExists() { + jss.CreateIndexIfNotExists("idx_jobs_type", "Jobs", "Type") +} + +func (jss SqlJobStore) Save(job *model.Job) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + if err := jss.GetMaster().Insert(job); err != nil { + result.Err = model.NewLocAppError("SqlJobStore.Save", + "store.sql_job.save.app_error", nil, "id="+job.Id+", "+err.Error()) + } else { + result.Data = job + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (jss SqlJobStore) UpdateOptimistically(job *model.Job, currentStatus string) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + if sqlResult, err := jss.GetMaster().Exec( + `UPDATE + Jobs + SET + LastActivityAt = :LastActivityAt, + Status = :Status, + Progress = :Progress, + Data = :Data + WHERE + Id = :Id + AND + Status = :OldStatus`, + map[string]interface{}{ + "Id": job.Id, + "OldStatus": currentStatus, + "LastActivityAt": model.GetMillis(), + "Status": job.Status, + "Data": job.DataToJson(), + "Progress": job.Progress, + }); err != nil { + result.Err = model.NewLocAppError("SqlJobStore.UpdateOptimistically", + "store.sql_job.update.app_error", nil, "id="+job.Id+", "+err.Error()) + } else { + rows, err := sqlResult.RowsAffected() + + if err != nil { + result.Err = model.NewLocAppError("SqlJobStore.UpdateStatus", + "store.sql_job.update.app_error", nil, "id="+job.Id+", "+err.Error()) + } else { + if rows == 1 { + result.Data = true + } else { + result.Data = false + } + } + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (jss SqlJobStore) UpdateStatus(id string, status string) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + job := &model.Job{ + Id: id, + Status: status, + LastActivityAt: model.GetMillis(), + } + + if _, err := jss.GetMaster().UpdateColumns(func(col *gorp.ColumnMap) bool { + return col.ColumnName == "Status" || col.ColumnName == "LastActivityAt" + }, job); err != nil { + result.Err = model.NewLocAppError("SqlJobStore.UpdateStatus", + "store.sql_job.update.app_error", nil, "id="+id+", "+err.Error()) + } + + if result.Err == nil { + result.Data = job + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (jss SqlJobStore) UpdateStatusOptimistically(id string, currentStatus string, newStatus string) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + var startAtClause string + if newStatus == model.JOB_STATUS_IN_PROGRESS { + startAtClause = `StartAt = :StartAt,` + } + + if sqlResult, err := jss.GetMaster().Exec( + `UPDATE + Jobs + SET `+startAtClause+` + Status = :NewStatus, + LastActivityAt = :LastActivityAt + WHERE + Id = :Id + AND + Status = :OldStatus`, map[string]interface{}{"Id": id, "OldStatus": currentStatus, "NewStatus": newStatus, "StartAt": model.GetMillis(), "LastActivityAt": model.GetMillis()}); err != nil { + result.Err = model.NewLocAppError("SqlJobStore.UpdateStatus", + "store.sql_job.update.app_error", nil, "id="+id+", "+err.Error()) + } else { + rows, err := sqlResult.RowsAffected() + + if err != nil { + result.Err = model.NewLocAppError("SqlJobStore.UpdateStatus", + "store.sql_job.update.app_error", nil, "id="+id+", "+err.Error()) + } else { + if rows == 1 { + result.Data = true + } else { + result.Data = false + } + } + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (jss SqlJobStore) Get(id string) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + var status *model.Job + + if err := jss.GetReplica().SelectOne(&status, + `SELECT + * + FROM + Jobs + WHERE + Id = :Id`, map[string]interface{}{"Id": id}); err != nil { + if err == sql.ErrNoRows { + result.Err = model.NewAppError("SqlJobStore.Get", + "store.sql_job.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusNotFound) + } else { + result.Err = model.NewAppError("SqlJobStore.Get", + "store.sql_job.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusInternalServerError) + } + } else { + result.Data = status + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (jss SqlJobStore) GetAllByType(jobType string) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + var statuses []*model.Job + + if _, err := jss.GetReplica().Select(&statuses, + `SELECT + * + FROM + Jobs + WHERE + Type = :Type`, map[string]interface{}{"Type": jobType}); err != nil { + result.Err = model.NewLocAppError("SqlJobStore.GetAllByType", + "store.sql_job.get_all.app_error", nil, "Type="+jobType+", "+err.Error()) + } else { + result.Data = statuses + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (jss SqlJobStore) GetAllByTypePage(jobType string, offset int, limit int) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + var statuses []*model.Job + + if _, err := jss.GetReplica().Select(&statuses, + `SELECT + * + FROM + Jobs + WHERE + Type = :Type + ORDER BY + StartAt ASC + LIMIT + :Limit + OFFSET + :Offset`, map[string]interface{}{"Type": jobType, "Limit": limit, "Offset": offset}); err != nil { + result.Err = model.NewLocAppError("SqlJobStore.GetAllByTypePage", + "store.sql_job.get_all.app_error", nil, "Type="+jobType+", "+err.Error()) + } else { + result.Data = statuses + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (jss SqlJobStore) GetAllByStatus(status string) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + var statuses []*model.Job + + if _, err := jss.GetReplica().Select(&statuses, + `SELECT + * + FROM + Jobs + WHERE + Status = :Status + ORDER BY + CreateAt ASC`, map[string]interface{}{"Status": status}); err != nil { + result.Err = model.NewLocAppError("SqlJobStore.GetAllByStatus", + "store.sql_job.get_all.app_error", nil, "Status="+status+", "+err.Error()) + } else { + result.Data = statuses + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (jss SqlJobStore) Delete(id string) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + if _, err := jss.GetMaster().Exec( + `DELETE FROM + Jobs + WHERE + Id = :Id`, map[string]interface{}{"Id": id}); err != nil { + result.Err = model.NewLocAppError("SqlJobStore.DeleteByType", + "store.sql_job.delete.app_error", nil, "id="+id+", "+err.Error()) + } else { + result.Data = id + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} diff --git a/store/sql_job_store_test.go b/store/sql_job_store_test.go new file mode 100644 index 000000000..edf09a4c0 --- /dev/null +++ b/store/sql_job_store_test.go @@ -0,0 +1,341 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package store + +import ( + "testing" + + "github.com/mattermost/platform/model" + "time" +) + +func TestJobSaveGet(t *testing.T) { + Setup() + + job := &model.Job{ + Id: model.NewId(), + Type: model.NewId(), + Status: model.NewId(), + Data: map[string]interface{}{ + "Processed": 0, + "Total": 12345, + "LastProcessed": "abcd", + }, + } + + if result := <-store.Job().Save(job); result.Err != nil { + t.Fatal(result.Err) + } + + defer func() { + <-store.Job().Delete(job.Id) + }() + + if result := <-store.Job().Get(job.Id); result.Err != nil { + t.Fatal(result.Err) + } else if received := result.Data.(*model.Job); received.Id != job.Id { + t.Fatal("received incorrect job after save") + } +} + +func TestJobGetAllByType(t *testing.T) { + Setup() + + jobType := model.NewId() + + jobs := []*model.Job{ + { + Id: model.NewId(), + Type: jobType, + }, + { + Id: model.NewId(), + Type: jobType, + }, + { + Id: model.NewId(), + Type: model.NewId(), + }, + } + + for _, job := range jobs { + Must(store.Job().Save(job)) + defer store.Job().Delete(job.Id) + } + + if result := <-store.Job().GetAllByType(jobType); result.Err != nil { + t.Fatal(result.Err) + } else if received := result.Data.([]*model.Job); len(received) != 2 { + t.Fatal("received wrong number of jobs") + } else if received[0].Id != jobs[0].Id && received[1].Id != jobs[0].Id { + t.Fatal("should've received first jobs") + } else if received[0].Id != jobs[1].Id && received[1].Id != jobs[1].Id { + t.Fatal("should've received second jobs") + } +} + +func TestJobGetAllByTypePage(t *testing.T) { + Setup() + + jobType := model.NewId() + + jobs := []*model.Job{ + { + Id: model.NewId(), + Type: jobType, + StartAt: 1000, + }, + { + Id: model.NewId(), + Type: jobType, + StartAt: 999, + }, + { + Id: model.NewId(), + Type: jobType, + StartAt: 1001, + }, + } + + for _, job := range jobs { + Must(store.Job().Save(job)) + defer store.Job().Delete(job.Id) + } + + if result := <-store.Job().GetAllByTypePage(jobType, 0, 2); result.Err != nil { + t.Fatal(result.Err) + } else if received := result.Data.([]*model.Job); len(received) != 2 { + t.Fatal("received wrong number of jobs") + } else if received[0].Id != jobs[1].Id { + t.Fatal("should've received newest job first") + } else if received[1].Id != jobs[0].Id { + t.Fatal("should've received second newest job second") + } + + if result := <-store.Job().GetAllByTypePage(jobType, 2, 2); result.Err != nil { + t.Fatal(result.Err) + } else if received := result.Data.([]*model.Job); len(received) != 1 { + t.Fatal("received wrong number of jobs") + } else if received[0].Id != jobs[2].Id { + t.Fatal("should've received oldest job last") + } +} + +func TestJobGetAllByStatus(t *testing.T) { + jobType := model.NewId() + status := model.NewId() + + jobs := []*model.Job{ + { + Id: model.NewId(), + Type: jobType, + CreateAt: 1000, + Status: status, + }, + { + Id: model.NewId(), + Type: jobType, + CreateAt: 999, + Status: status, + }, + { + Id: model.NewId(), + Type: jobType, + CreateAt: 1001, + Status: status, + }, + { + Id: model.NewId(), + Type: jobType, + CreateAt: 1002, + Status: model.NewId(), + }, + } + + for _, job := range jobs { + Must(store.Job().Save(job)) + defer store.Job().Delete(job.Id) + } + + if result := <-store.Job().GetAllByStatus(status); result.Err != nil { + t.Fatal(result.Err) + } else if received := result.Data.([]*model.Job); len(received) != 3 { + t.Fatal("received wrong number of jobs") + } else if received[0].Id != jobs[0].Id && received[1].Id != jobs[0].Id { + t.Fatal("should've received first jobs") + } else if received[0].Id != jobs[1].Id && received[1].Id != jobs[1].Id { + t.Fatal("should've received second jobs") + } +} + +func TestJobUpdateOptimistically(t *testing.T) { + job := &model.Job{ + Id: model.NewId(), + Type: model.JOB_TYPE_DATA_RETENTION, + CreateAt: model.GetMillis(), + Status: model.JOB_STATUS_PENDING, + } + + if result := <-store.Job().Save(job); result.Err != nil { + t.Fatal(result.Err) + } + defer store.Job().Delete(job.Id) + + job.LastActivityAt = model.GetMillis() + job.Status = model.JOB_STATUS_IN_PROGRESS + job.Progress = 50 + job.Data = map[string]interface{}{ + "Foo": "Bar", + } + + if result := <-store.Job().UpdateOptimistically(job, model.JOB_STATUS_SUCCESS); result.Err != nil { + if result.Data.(bool) { + t.Fatal("should have failed due to incorrect old status") + } + } + + time.Sleep(2 * time.Millisecond) + + if result := <-store.Job().UpdateOptimistically(job, model.JOB_STATUS_PENDING); result.Err != nil { + t.Fatal(result.Err) + } else { + if !result.Data.(bool) { + t.Fatal("Should have successfully updated") + } + + var updatedJob *model.Job + + if result := <-store.Job().Get(job.Id); result.Err != nil { + t.Fatal(result.Err) + } else { + updatedJob = result.Data.(*model.Job) + } + + if updatedJob.Type != job.Type || updatedJob.CreateAt != job.CreateAt || updatedJob.Status != job.Status || updatedJob.LastActivityAt <= job.LastActivityAt || updatedJob.Progress != job.Progress || updatedJob.Data["Foo"] != job.Data["Foo"] { + t.Fatal("Some update property was not as expected") + } + } + +} + +func TestJobUpdateStatusUpdateStatusOptimistically(t *testing.T) { + job := &model.Job{ + Id: model.NewId(), + Type: model.JOB_TYPE_DATA_RETENTION, + CreateAt: model.GetMillis(), + Status: model.JOB_STATUS_SUCCESS, + } + + var lastUpdateAt int64 + if result := <-store.Job().Save(job); result.Err != nil { + t.Fatal(result.Err) + } else { + lastUpdateAt = result.Data.(*model.Job).LastActivityAt + } + + defer store.Job().Delete(job.Id) + + time.Sleep(2 * time.Millisecond) + + if result := <-store.Job().UpdateStatus(job.Id, model.JOB_STATUS_PENDING); result.Err != nil { + t.Fatal(result.Err) + } else { + received := result.Data.(*model.Job) + if received.Status != model.JOB_STATUS_PENDING { + t.Fatal("status wasn't updated") + } + if received.LastActivityAt <= lastUpdateAt { + t.Fatal("lastActivityAt wasn't updated") + } + lastUpdateAt = received.LastActivityAt + } + + time.Sleep(2 * time.Millisecond) + + if result := <-store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_SUCCESS); result.Err != nil { + t.Fatal(result.Err) + } else { + if result.Data.(bool) { + t.Fatal("should be false due to incorrect original status") + } + } + + if result := <-store.Job().Get(job.Id); result.Err != nil { + t.Fatal(result.Err) + } else { + received := result.Data.(*model.Job) + if received.Status != model.JOB_STATUS_PENDING { + t.Fatal("should still be pending") + } + if received.LastActivityAt != lastUpdateAt { + t.Fatal("last activity at shouldn't have changed") + } + } + + time.Sleep(2 * time.Millisecond) + + if result := <-store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_IN_PROGRESS); result.Err != nil { + t.Fatal(result.Err) + } else { + if !result.Data.(bool) { + t.Fatal("should have succeeded") + } + } + + var startAtSet int64 + if result := <-store.Job().Get(job.Id); result.Err != nil { + t.Fatal(result.Err) + } else { + received := result.Data.(*model.Job) + if received.Status != model.JOB_STATUS_IN_PROGRESS { + t.Fatal("should be in progress") + } + if received.StartAt == 0 { + t.Fatal("received should have start at set") + } + if received.LastActivityAt <= lastUpdateAt { + t.Fatal("lastActivityAt wasn't updated") + } + lastUpdateAt = received.LastActivityAt + startAtSet = received.StartAt + } + + time.Sleep(2 * time.Millisecond) + + if result := <-store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_SUCCESS); result.Err != nil { + t.Fatal(result.Err) + } else { + if !result.Data.(bool) { + t.Fatal("should have succeeded") + } + } + + if result := <-store.Job().Get(job.Id); result.Err != nil { + t.Fatal(result.Err) + } else { + received := result.Data.(*model.Job) + if received.Status != model.JOB_STATUS_SUCCESS { + t.Fatal("should be success status") + } + if received.StartAt != startAtSet { + t.Fatal("startAt should not have changed") + } + if received.LastActivityAt <= lastUpdateAt { + t.Fatal("lastActivityAt wasn't updated") + } + lastUpdateAt = received.LastActivityAt + } +} + +func TestJobDelete(t *testing.T) { + Setup() + + status := Must(store.Job().Save(&model.Job{ + Id: model.NewId(), + })).(*model.Job) + + if result := <-store.Job().Delete(status.Id); result.Err != nil { + t.Fatal(result.Err) + } +} diff --git a/store/sql_store.go b/store/sql_store.go index dc3b51d0c..a039401f3 100644 --- a/store/sql_store.go +++ b/store/sql_store.go @@ -79,4 +79,5 @@ type SqlStore interface { Status() StatusStore FileInfo() FileInfoStore Reaction() ReactionStore + Job() JobStore } diff --git a/store/sql_supplier.go b/store/sql_supplier.go index 6f51cbd09..0f4ab8380 100644 --- a/store/sql_supplier.go +++ b/store/sql_supplier.go @@ -82,7 +82,7 @@ type SqlSupplierOldStores struct { status StatusStore fileInfo FileInfoStore reaction ReactionStore - jobStatus JobStatusStore + job JobStore } type SqlSupplier struct { @@ -121,7 +121,7 @@ func NewSqlSupplier() *SqlSupplier { supplier.oldStores.status = NewSqlStatusStore(supplier) supplier.oldStores.fileInfo = NewSqlFileInfoStore(supplier) supplier.oldStores.reaction = NewSqlReactionStore(supplier) - supplier.oldStores.jobStatus = NewSqlJobStatusStore(supplier) + supplier.oldStores.job = NewSqlJobStore(supplier) err := supplier.GetMaster().CreateTablesIfNotExists() if err != nil { @@ -150,7 +150,7 @@ func NewSqlSupplier() *SqlSupplier { supplier.oldStores.status.(*SqlStatusStore).CreateIndexesIfNotExists() supplier.oldStores.fileInfo.(*SqlFileInfoStore).CreateIndexesIfNotExists() supplier.oldStores.reaction.(*SqlReactionStore).CreateIndexesIfNotExists() - supplier.oldStores.jobStatus.(*SqlJobStatusStore).CreateIndexesIfNotExists() + supplier.oldStores.job.(*SqlJobStore).CreateIndexesIfNotExists() supplier.oldStores.preference.(*SqlPreferenceStore).DeleteUnusedFeatures() @@ -752,8 +752,8 @@ func (ss *SqlSupplier) Reaction() ReactionStore { return ss.oldStores.reaction } -func (ss *SqlSupplier) JobStatus() JobStatusStore { - return ss.oldStores.jobStatus +func (ss *SqlSupplier) Job() JobStore { + return ss.oldStores.job } func (ss *SqlSupplier) DropAllTables() { diff --git a/store/sql_upgrade.go b/store/sql_upgrade.go index 5a6ed0ab5..a7b72124e 100644 --- a/store/sql_upgrade.go +++ b/store/sql_upgrade.go @@ -280,8 +280,9 @@ func UpgradeDatabaseToVersion40(sqlStore SqlStore) { } func UpgradeDatabaseToVersion41(sqlStore SqlStore) { - // TODO: Uncomment following condition when version 4.0.0 is released + // TODO: Uncomment following condition when version 4.1.0 is released // if shouldPerformUpgrade(sqlStore, VERSION_4_0_0, VERSION_4_1_0) { + sqlStore.RemoveTableIfExists("JobStatuses") // saveSchemaVersion(sqlStore, VERSION_4_1_0) // } } diff --git a/store/store.go b/store/store.go index 0007f495e..95496b609 100644 --- a/store/store.go +++ b/store/store.go @@ -48,7 +48,7 @@ type Store interface { Status() StatusStore FileInfo() FileInfoStore Reaction() ReactionStore - JobStatus() JobStatusStore + Job() JobStore MarkSystemRanUnitTests() Close() DropAllTables() @@ -384,10 +384,14 @@ type ReactionStore interface { DeleteAllWithEmojiName(emojiName string) StoreChannel } -type JobStatusStore interface { - SaveOrUpdate(status *model.JobStatus) StoreChannel +type JobStore interface { + Save(job *model.Job) StoreChannel + UpdateOptimistically(job *model.Job, currentStatus string) StoreChannel + UpdateStatus(id string, status string) StoreChannel + UpdateStatusOptimistically(id string, currentStatus string, newStatus string) StoreChannel Get(id string) StoreChannel GetAllByType(jobType string) StoreChannel GetAllByTypePage(jobType string, offset int, limit int) StoreChannel + GetAllByStatus(status string) StoreChannel Delete(id string) StoreChannel } |