From 4e79d2d4d037e7c33ec3e63d58110668106de222 Mon Sep 17 00:00:00 2001 From: Chris Date: Fri, 29 Sep 2017 04:29:29 -0500 Subject: remove jobs.Srv and other jobs-related globals (#7535) --- api4/apitestlib.go | 5 ++-- api4/job.go | 4 +-- app/admin.go | 3 +-- app/app.go | 45 +++++++++++++++++++++++++++++++++- app/job.go | 9 +++---- cmd/platform/jobserver.go | 15 ++++++------ cmd/platform/server.go | 11 ++++----- einterfaces/jobs/data_retention.go | 10 -------- einterfaces/jobs/elasticsearch.go | 20 --------------- einterfaces/jobs/ldap_sync.go | 10 -------- jobs/jobs.go | 50 +++++++++++++++++++------------------- jobs/jobs_watcher.go | 6 +++-- jobs/schedulers.go | 20 ++++++++------- jobs/server.go | 35 ++++++++++++++------------ jobs/testworker.go | 16 ++++++------ jobs/workers.go | 13 +++++----- 16 files changed, 140 insertions(+), 132 deletions(-) diff --git a/api4/apitestlib.go b/api4/apitestlib.go index fad066ff8..301e312da 100644 --- a/api4/apitestlib.go +++ b/api4/apitestlib.go @@ -23,7 +23,6 @@ import ( "github.com/mattermost/mattermost-server/utils" "github.com/mattermost/mattermost-server/wsapi" - "github.com/mattermost/mattermost-server/jobs" s3 "github.com/minio/minio-go" "github.com/minio/minio-go/pkg/credentials" ) @@ -76,8 +75,8 @@ func setupTestHelper(enterprise bool) *TestHelper { utils.License().Features.SetDefaults() } - if jobs.Srv.Store == nil { - jobs.Srv.Store = th.App.Srv.Store + if th.App.Jobs.Store == nil { + th.App.Jobs.Store = th.App.Srv.Store } th.Client = th.CreateClient() diff --git a/api4/job.go b/api4/job.go index 57c604361..138c76c8f 100644 --- a/api4/job.go +++ b/api4/job.go @@ -52,7 +52,7 @@ func createJob(c *Context, w http.ResponseWriter, r *http.Request) { return } - if job, err := app.CreateJob(job); err != nil { + if job, err := c.App.CreateJob(job); err != nil { c.Err = err return } else { @@ -109,7 +109,7 @@ func cancelJob(c *Context, w http.ResponseWriter, r *http.Request) { return } - if err := app.CancelJob(c.Params.JobId); err != nil { + if err := c.App.CancelJob(c.Params.JobId); err != nil { c.Err = err return } diff --git a/app/admin.go b/app/admin.go index 0d02c3b49..dab7e9759 100644 --- a/app/admin.go +++ b/app/admin.go @@ -14,7 +14,6 @@ import ( "net/http" l4g "github.com/alecthomas/log4go" - "github.com/mattermost/mattermost-server/jobs" "github.com/mattermost/mattermost-server/model" "github.com/mattermost/mattermost-server/store" "github.com/mattermost/mattermost-server/store/sqlstore" @@ -190,7 +189,7 @@ func (a *App) RecycleDatabaseConnection() { l4g.Warn(utils.T("api.admin.recycle_db_start.warn")) a.Srv.Store = store.NewLayeredStore(sqlstore.NewSqlSupplier(a.Metrics), a.Metrics, a.Cluster) - jobs.Srv.Store = a.Srv.Store + a.Jobs.Store = a.Srv.Store time.Sleep(20 * time.Second) oldStore.Close() diff --git a/app/app.go b/app/app.go index e85fa6342..26388d841 100644 --- a/app/app.go +++ b/app/app.go @@ -9,6 +9,8 @@ import ( "sync" "github.com/mattermost/mattermost-server/einterfaces" + ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs" + "github.com/mattermost/mattermost-server/jobs" "github.com/mattermost/mattermost-server/model" "github.com/mattermost/mattermost-server/plugin/pluginenv" "github.com/mattermost/mattermost-server/utils" @@ -25,6 +27,8 @@ type App struct { Hubs []*Hub HubsStopCheckingForDeadlock chan bool + Jobs *jobs.JobServer + AccountMigration einterfaces.AccountMigrationInterface Brand einterfaces.BrandInterface Cluster einterfaces.ClusterInterface @@ -36,7 +40,9 @@ type App struct { Saml einterfaces.SamlInterface } -var globalApp App +var globalApp App = App{ + Jobs: &jobs.JobServer{}, +} var initEnterprise sync.Once @@ -65,6 +71,30 @@ func RegisterComplianceInterface(f func(*App) einterfaces.ComplianceInterface) { complianceInterface = f } +var jobsDataRetentionInterface func(*App) ejobs.DataRetentionInterface + +func RegisterJobsDataRetentionInterface(f func(*App) ejobs.DataRetentionInterface) { + jobsDataRetentionInterface = f +} + +var jobsElasticsearchAggregatorInterface func(*App) ejobs.ElasticsearchAggregatorInterface + +func RegisterJobsElasticsearchAggregatorInterface(f func(*App) ejobs.ElasticsearchAggregatorInterface) { + jobsElasticsearchAggregatorInterface = f +} + +var jobsElasticsearchIndexerInterface func(*App) ejobs.ElasticsearchIndexerInterface + +func RegisterJobsElasticsearchIndexerInterface(f func(*App) ejobs.ElasticsearchIndexerInterface) { + jobsElasticsearchIndexerInterface = f +} + +var jobsLdapSyncInterface func(*App) ejobs.LdapSyncInterface + +func RegisterJobsLdapSyncInterface(f func(*App) ejobs.LdapSyncInterface) { + jobsLdapSyncInterface = f +} + var ldapInterface func(*App) einterfaces.LdapInterface func RegisterLdapInterface(f func(*App) einterfaces.LdapInterface) { @@ -121,6 +151,19 @@ func (a *App) initEnterprise() { a.Saml.ConfigureSP() }) } + + if jobsDataRetentionInterface != nil { + a.Jobs.DataRetention = jobsDataRetentionInterface(a) + } + if jobsElasticsearchAggregatorInterface != nil { + a.Jobs.ElasticsearchAggregator = jobsElasticsearchAggregatorInterface(a) + } + if jobsElasticsearchIndexerInterface != nil { + a.Jobs.ElasticsearchIndexer = jobsElasticsearchIndexerInterface(a) + } + if jobsLdapSyncInterface != nil { + a.Jobs.LdapSync = jobsLdapSyncInterface(a) + } } func (a *App) Config() *model.Config { diff --git a/app/job.go b/app/job.go index c1058880f..d80fe6262 100644 --- a/app/job.go +++ b/app/job.go @@ -4,7 +4,6 @@ package app import ( - "github.com/mattermost/mattermost-server/jobs" "github.com/mattermost/mattermost-server/model" ) @@ -40,10 +39,10 @@ func (a *App) GetJobsByType(jobType string, offset int, limit int) ([]*model.Job } } -func CreateJob(job *model.Job) (*model.Job, *model.AppError) { - return jobs.CreateJob(job.Type, job.Data) +func (a *App) CreateJob(job *model.Job) (*model.Job, *model.AppError) { + return a.Jobs.CreateJob(job.Type, job.Data) } -func CancelJob(jobId string) *model.AppError { - return jobs.RequestCancellation(jobId) +func (a *App) CancelJob(jobId string) *model.AppError { + return a.Jobs.RequestCancellation(jobId) } diff --git a/cmd/platform/jobserver.go b/cmd/platform/jobserver.go index 6d4e828bd..4f82a21ee 100644 --- a/cmd/platform/jobserver.go +++ b/cmd/platform/jobserver.go @@ -8,7 +8,6 @@ import ( "syscall" l4g "github.com/alecthomas/log4go" - "github.com/mattermost/mattermost-server/jobs" "github.com/mattermost/mattermost-server/store" "github.com/mattermost/mattermost-server/store/sqlstore" "github.com/spf13/cobra" @@ -37,18 +36,18 @@ func jobserverCmdF(cmd *cobra.Command, args []string) { } defer l4g.Close() - jobs.Srv.Store = store.NewLayeredStore(sqlstore.NewSqlSupplier(a.Metrics), a.Metrics, a.Cluster) - defer jobs.Srv.Store.Close() + a.Jobs.Store = store.NewLayeredStore(sqlstore.NewSqlSupplier(a.Metrics), a.Metrics, a.Cluster) + defer a.Jobs.Store.Close() - jobs.Srv.LoadLicense() + a.Jobs.LoadLicense() // Run jobs l4g.Info("Starting Mattermost job server") if !noJobs { - jobs.Srv.StartWorkers() + a.Jobs.StartWorkers() } if !noSchedule { - jobs.Srv.StartSchedulers() + a.Jobs.StartSchedulers() } var signalChan chan os.Signal = make(chan os.Signal) @@ -58,8 +57,8 @@ func jobserverCmdF(cmd *cobra.Command, args []string) { // Cleanup anything that isn't handled by a defer statement l4g.Info("Stopping Mattermost job server") - jobs.Srv.StopSchedulers() - jobs.Srv.StopWorkers() + a.Jobs.StopSchedulers() + a.Jobs.StopWorkers() l4g.Info("Stopped Mattermost job server") } diff --git a/cmd/platform/server.go b/cmd/platform/server.go index 7f5fbf6e8..ec753c837 100644 --- a/cmd/platform/server.go +++ b/cmd/platform/server.go @@ -13,7 +13,6 @@ import ( "github.com/mattermost/mattermost-server/api" "github.com/mattermost/mattermost-server/api4" "github.com/mattermost/mattermost-server/app" - "github.com/mattermost/mattermost-server/jobs" "github.com/mattermost/mattermost-server/manualtesting" "github.com/mattermost/mattermost-server/model" "github.com/mattermost/mattermost-server/utils" @@ -138,12 +137,12 @@ func runServer(configFileLocation string) { } } - jobs.Srv.Store = a.Srv.Store + a.Jobs.Store = a.Srv.Store if *utils.Cfg.JobSettings.RunJobs { - jobs.Srv.StartWorkers() + a.Jobs.StartWorkers() } if *utils.Cfg.JobSettings.RunScheduler { - jobs.Srv.StartSchedulers() + a.Jobs.StartSchedulers() } // wait for kill signal before attempting to gracefully shutdown @@ -160,8 +159,8 @@ func runServer(configFileLocation string) { a.Metrics.StopServer() } - jobs.Srv.StopSchedulers() - jobs.Srv.StopWorkers() + a.Jobs.StopSchedulers() + a.Jobs.StopWorkers() a.StopServer() } diff --git a/einterfaces/jobs/data_retention.go b/einterfaces/jobs/data_retention.go index 887ce561b..5910d6120 100644 --- a/einterfaces/jobs/data_retention.go +++ b/einterfaces/jobs/data_retention.go @@ -11,13 +11,3 @@ type DataRetentionInterface interface { MakeWorker() model.Worker MakeScheduler() model.Scheduler } - -var theDataRetentionInterface DataRetentionInterface - -func RegisterDataRetentionInterface(newInterface DataRetentionInterface) { - theDataRetentionInterface = newInterface -} - -func GetDataRetentionInterface() DataRetentionInterface { - return theDataRetentionInterface -} diff --git a/einterfaces/jobs/elasticsearch.go b/einterfaces/jobs/elasticsearch.go index 513a6c323..16e0d7697 100644 --- a/einterfaces/jobs/elasticsearch.go +++ b/einterfaces/jobs/elasticsearch.go @@ -11,27 +11,7 @@ type ElasticsearchIndexerInterface interface { MakeWorker() model.Worker } -var theElasticsearchIndexerInterface ElasticsearchIndexerInterface - -func RegisterElasticsearchIndexerInterface(newInterface ElasticsearchIndexerInterface) { - theElasticsearchIndexerInterface = newInterface -} - -func GetElasticsearchIndexerInterface() ElasticsearchIndexerInterface { - return theElasticsearchIndexerInterface -} - type ElasticsearchAggregatorInterface interface { MakeWorker() model.Worker MakeScheduler() model.Scheduler } - -var theElasticsearchAggregatorInterface ElasticsearchAggregatorInterface - -func RegisterElasticsearchAggregatorInterface(newInterface ElasticsearchAggregatorInterface) { - theElasticsearchAggregatorInterface = newInterface -} - -func GetElasticsearchAggregatorInterface() ElasticsearchAggregatorInterface { - return theElasticsearchAggregatorInterface -} diff --git a/einterfaces/jobs/ldap_sync.go b/einterfaces/jobs/ldap_sync.go index 97055bfcc..5565afe41 100644 --- a/einterfaces/jobs/ldap_sync.go +++ b/einterfaces/jobs/ldap_sync.go @@ -11,13 +11,3 @@ type LdapSyncInterface interface { MakeWorker() model.Worker MakeScheduler() model.Scheduler } - -var theLdapSyncInterface LdapSyncInterface - -func RegisterLdapSyncInterface(newInterface LdapSyncInterface) { - theLdapSyncInterface = newInterface -} - -func GetLdapSyncInterface() LdapSyncInterface { - return theLdapSyncInterface -} diff --git a/jobs/jobs.go b/jobs/jobs.go index 22d87e850..7e49b2f48 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -18,7 +18,7 @@ const ( CANCEL_WATCHER_POLLING_INTERVAL = 5000 ) -func CreateJob(jobType string, jobData map[string]string) (*model.Job, *model.AppError) { +func (srv *JobServer) CreateJob(jobType string, jobData map[string]string) (*model.Job, *model.AppError) { job := model.Job{ Id: model.NewId(), Type: jobType, @@ -31,23 +31,23 @@ func CreateJob(jobType string, jobData map[string]string) (*model.Job, *model.Ap return nil, err } - if result := <-Srv.Store.Job().Save(&job); result.Err != nil { + if result := <-srv.Store.Job().Save(&job); result.Err != nil { return nil, result.Err } return &job, nil } -func GetJob(id string) (*model.Job, *model.AppError) { - if result := <-Srv.Store.Job().Get(id); result.Err != nil { +func (srv *JobServer) GetJob(id string) (*model.Job, *model.AppError) { + if result := <-srv.Store.Job().Get(id); result.Err != nil { return nil, result.Err } else { return result.Data.(*model.Job), nil } } -func ClaimJob(job *model.Job) (bool, *model.AppError) { - if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_IN_PROGRESS); result.Err != nil { +func (srv *JobServer) ClaimJob(job *model.Job) (bool, *model.AppError) { + if result := <-srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_IN_PROGRESS); result.Err != nil { return false, result.Err } else { success := result.Data.(bool) @@ -55,25 +55,25 @@ func ClaimJob(job *model.Job) (bool, *model.AppError) { } } -func SetJobProgress(job *model.Job, progress int64) *model.AppError { +func (srv *JobServer) SetJobProgress(job *model.Job, progress int64) *model.AppError { job.Status = model.JOB_STATUS_IN_PROGRESS job.Progress = progress - if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil { + if result := <-srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil { return result.Err } else { return nil } } -func SetJobSuccess(job *model.Job) *model.AppError { - result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_SUCCESS) +func (srv *JobServer) SetJobSuccess(job *model.Job) *model.AppError { + result := <-srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_SUCCESS) return result.Err } -func SetJobError(job *model.Job, jobError *model.AppError) *model.AppError { +func (srv *JobServer) SetJobError(job *model.Job, jobError *model.AppError) *model.AppError { if jobError == nil { - result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_ERROR) + result := <-srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_ERROR) return result.Err } @@ -85,11 +85,11 @@ func SetJobError(job *model.Job, jobError *model.AppError) *model.AppError { jobError.Translate(utils.T) job.Data["error"] = jobError.Message + " (" + jobError.DetailedError + ")" - if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil { + if result := <-srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil { return result.Err } else { if !result.Data.(bool) { - if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil { + if result := <-srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil { return result.Err } else { if !result.Data.(bool) { @@ -102,19 +102,19 @@ func SetJobError(job *model.Job, jobError *model.AppError) *model.AppError { return nil } -func SetJobCanceled(job *model.Job) *model.AppError { - result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_CANCELED) +func (srv *JobServer) SetJobCanceled(job *model.Job) *model.AppError { + result := <-srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_CANCELED) return result.Err } -func RequestCancellation(jobId string) *model.AppError { - if result := <-Srv.Store.Job().UpdateStatusOptimistically(jobId, model.JOB_STATUS_PENDING, model.JOB_STATUS_CANCELED); result.Err != nil { +func (srv *JobServer) RequestCancellation(jobId string) *model.AppError { + if result := <-srv.Store.Job().UpdateStatusOptimistically(jobId, model.JOB_STATUS_PENDING, model.JOB_STATUS_CANCELED); result.Err != nil { return result.Err } else if result.Data.(bool) { return nil } - if result := <-Srv.Store.Job().UpdateStatusOptimistically(jobId, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil { + if result := <-srv.Store.Job().UpdateStatusOptimistically(jobId, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil { return result.Err } else if result.Data.(bool) { return nil @@ -123,7 +123,7 @@ func RequestCancellation(jobId string) *model.AppError { return model.NewAppError("Jobs.RequestCancellation", "jobs.request_cancellation.status.error", nil, "id="+jobId, http.StatusInternalServerError) } -func CancellationWatcher(ctx context.Context, jobId string, cancelChan chan interface{}) { +func (srv *JobServer) CancellationWatcher(ctx context.Context, jobId string, cancelChan chan interface{}) { for { select { case <-ctx.Done(): @@ -131,7 +131,7 @@ func CancellationWatcher(ctx context.Context, jobId string, cancelChan chan inte return case <-time.After(CANCEL_WATCHER_POLLING_INTERVAL * time.Millisecond): l4g.Debug("CancellationWatcher for Job: %v polling.", jobId) - if result := <-Srv.Store.Job().Get(jobId); result.Err == nil { + if result := <-srv.Store.Job().Get(jobId); result.Err == nil { jobStatus := result.Data.(*model.Job) if jobStatus.Status == model.JOB_STATUS_CANCEL_REQUESTED { close(cancelChan) @@ -152,16 +152,16 @@ func GenerateNextStartDateTime(now time.Time, nextStartTime time.Time) *time.Tim return &nextTime } -func CheckForPendingJobsByType(jobType string) (bool, *model.AppError) { - if result := <-Srv.Store.Job().GetCountByStatusAndType(model.JOB_STATUS_PENDING, jobType); result.Err != nil { +func (srv *JobServer) CheckForPendingJobsByType(jobType string) (bool, *model.AppError) { + if result := <-srv.Store.Job().GetCountByStatusAndType(model.JOB_STATUS_PENDING, jobType); result.Err != nil { return false, result.Err } else { return result.Data.(int64) > 0, nil } } -func GetLastSuccessfulJobByType(jobType string) (*model.Job, *model.AppError) { - if result := <-Srv.Store.Job().GetNewestJobByStatusAndType(model.JOB_STATUS_SUCCESS, jobType); result.Err != nil { +func (srv *JobServer) GetLastSuccessfulJobByType(jobType string) (*model.Job, *model.AppError) { + if result := <-srv.Store.Job().GetNewestJobByStatusAndType(model.JOB_STATUS_SUCCESS, jobType); result.Err != nil { return nil, result.Err } else { return result.Data.(*model.Job), nil diff --git a/jobs/jobs_watcher.go b/jobs/jobs_watcher.go index 56cf9eb2e..f9a958fe3 100644 --- a/jobs/jobs_watcher.go +++ b/jobs/jobs_watcher.go @@ -16,6 +16,7 @@ const ( ) type Watcher struct { + srv *JobServer workers *Workers stop chan bool @@ -23,12 +24,13 @@ type Watcher struct { pollingInterval int } -func MakeWatcher(workers *Workers, pollingInterval int) *Watcher { +func (srv *JobServer) MakeWatcher(workers *Workers, pollingInterval int) *Watcher { return &Watcher{ stop: make(chan bool, 1), stopped: make(chan bool, 1), pollingInterval: pollingInterval, workers: workers, + srv: srv, } } @@ -63,7 +65,7 @@ func (watcher *Watcher) Stop() { } func (watcher *Watcher) PollAndNotify() { - if result := <-Srv.Store.Job().GetAllByStatus(model.JOB_STATUS_PENDING); result.Err != nil { + if result := <-watcher.srv.Store.Job().GetAllByStatus(model.JOB_STATUS_PENDING); result.Err != nil { l4g.Error("Error occured getting all pending statuses: %v", result.Err.Error()) } else { jobs := result.Data.([]*model.Job) diff --git a/jobs/schedulers.go b/jobs/schedulers.go index cdf8d956d..2f1ae394f 100644 --- a/jobs/schedulers.go +++ b/jobs/schedulers.go @@ -9,7 +9,6 @@ import ( l4g "github.com/alecthomas/log4go" - ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs" "github.com/mattermost/mattermost-server/model" "github.com/mattermost/mattermost-server/utils" ) @@ -20,28 +19,31 @@ type Schedulers struct { configChanged chan *model.Config listenerId string startOnce sync.Once + jobs *JobServer schedulers []model.Scheduler nextRunTimes []*time.Time } -func InitSchedulers() *Schedulers { +func (srv *JobServer) InitSchedulers() *Schedulers { l4g.Debug("Initialising schedulers.") + schedulers := &Schedulers{ stop: make(chan bool), stopped: make(chan bool), configChanged: make(chan *model.Config), + jobs: srv, } - if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil { + if dataRetentionInterface := srv.DataRetention; dataRetentionInterface != nil { schedulers.schedulers = append(schedulers.schedulers, dataRetentionInterface.MakeScheduler()) } - if elasticsearchAggregatorInterface := ejobs.GetElasticsearchAggregatorInterface(); elasticsearchAggregatorInterface != nil { + if elasticsearchAggregatorInterface := srv.ElasticsearchAggregator; elasticsearchAggregatorInterface != nil { schedulers.schedulers = append(schedulers.schedulers, elasticsearchAggregatorInterface.MakeScheduler()) } - if ldapSyncInterface := ejobs.GetLdapSyncInterface(); ldapSyncInterface != nil { + if ldapSyncInterface := srv.LdapSync; ldapSyncInterface != nil { schedulers.schedulers = append(schedulers.schedulers, ldapSyncInterface.MakeScheduler()) } @@ -124,7 +126,7 @@ func (schedulers *Schedulers) setNextRunTime(cfg *model.Config, idx int, now tim scheduler := schedulers.schedulers[idx] if !pendingJobs { - if pj, err := CheckForPendingJobsByType(scheduler.JobType()); err != nil { + if pj, err := schedulers.jobs.CheckForPendingJobsByType(scheduler.JobType()); err != nil { l4g.Error("Failed to set next job run time: " + err.Error()) schedulers.nextRunTimes[idx] = nil return @@ -133,7 +135,7 @@ func (schedulers *Schedulers) setNextRunTime(cfg *model.Config, idx int, now tim } } - lastSuccessfulJob, err := GetLastSuccessfulJobByType(scheduler.JobType()) + lastSuccessfulJob, err := schedulers.jobs.GetLastSuccessfulJobByType(scheduler.JobType()) if err != nil { l4g.Error("Failed to set next job run time: " + err.Error()) schedulers.nextRunTimes[idx] = nil @@ -145,12 +147,12 @@ func (schedulers *Schedulers) setNextRunTime(cfg *model.Config, idx int, now tim } func (schedulers *Schedulers) scheduleJob(cfg *model.Config, scheduler model.Scheduler) (*model.Job, *model.AppError) { - pendingJobs, err := CheckForPendingJobsByType(scheduler.JobType()) + pendingJobs, err := schedulers.jobs.CheckForPendingJobsByType(scheduler.JobType()) if err != nil { return nil, err } - lastSuccessfulJob, err2 := GetLastSuccessfulJobByType(scheduler.JobType()) + lastSuccessfulJob, err2 := schedulers.jobs.GetLastSuccessfulJobByType(scheduler.JobType()) if err2 != nil { return nil, err } diff --git a/jobs/server.go b/jobs/server.go index 6c857e7dc..667d6c075 100644 --- a/jobs/server.go +++ b/jobs/server.go @@ -5,6 +5,8 @@ package jobs import ( l4g "github.com/alecthomas/log4go" + + ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs" "github.com/mattermost/mattermost-server/model" "github.com/mattermost/mattermost-server/store" "github.com/mattermost/mattermost-server/utils" @@ -14,13 +16,16 @@ type JobServer struct { Store store.Store Workers *Workers Schedulers *Schedulers -} -var Srv JobServer + DataRetention ejobs.DataRetentionInterface + ElasticsearchAggregator ejobs.ElasticsearchAggregatorInterface + ElasticsearchIndexer ejobs.ElasticsearchIndexerInterface + LdapSync ejobs.LdapSyncInterface +} -func (server *JobServer) LoadLicense() { +func (srv *JobServer) LoadLicense() { licenseId := "" - if result := <-server.Store.System().Get(); result.Err == nil { + if result := <-srv.Store.System().Get(); result.Err == nil { props := result.Data.(model.StringMap) licenseId = props[model.SYSTEM_ACTIVE_LICENSE_ID] } @@ -31,7 +36,7 @@ func (server *JobServer) LoadLicense() { // Lets attempt to load the file from disk since it was missing from the DB _, licenseBytes = utils.GetAndValidateLicenseFileFromDisk() } else { - if result := <-server.Store.License().Get(licenseId); result.Err == nil { + if result := <-srv.Store.License().Get(licenseId); result.Err == nil { record := result.Data.(*model.LicenseRecord) licenseBytes = []byte(record.Bytes) l4g.Info("License key valid unlocking enterprise features.") @@ -48,22 +53,22 @@ func (server *JobServer) LoadLicense() { } } -func (server *JobServer) StartWorkers() { - Srv.Workers = InitWorkers().Start() +func (srv *JobServer) StartWorkers() { + srv.Workers = srv.InitWorkers().Start() } -func (server *JobServer) StartSchedulers() { - Srv.Schedulers = InitSchedulers().Start() +func (srv *JobServer) StartSchedulers() { + srv.Schedulers = srv.InitSchedulers().Start() } -func (server *JobServer) StopWorkers() { - if Srv.Workers != nil { - Srv.Workers.Stop() +func (srv *JobServer) StopWorkers() { + if srv.Workers != nil { + srv.Workers.Stop() } } -func (server *JobServer) StopSchedulers() { - if Srv.Schedulers != nil { - Srv.Schedulers.Stop() +func (srv *JobServer) StopSchedulers() { + if srv.Schedulers != nil { + srv.Schedulers.Stop() } } diff --git a/jobs/testworker.go b/jobs/testworker.go index 29608e909..9cfc8614f 100644 --- a/jobs/testworker.go +++ b/jobs/testworker.go @@ -12,14 +12,16 @@ import ( ) type TestWorker struct { + srv *JobServer name string stop chan bool stopped chan bool jobs chan model.Job } -func MakeTestWorker(name string) *TestWorker { +func (srv *JobServer) MakeTestWorker(name string) *TestWorker { return &TestWorker{ + srv: srv, name: name, stop: make(chan bool, 1), stopped: make(chan bool, 1), @@ -48,7 +50,7 @@ func (worker *TestWorker) Run() { } func (worker *TestWorker) DoJob(job *model.Job) { - if claimed, err := ClaimJob(job); err != nil { + if claimed, err := worker.srv.ClaimJob(job); err != nil { l4g.Error("Job: %v: Error occurred while trying to claim job: %v", job.Id, err.Error()) return } else if !claimed { @@ -57,7 +59,7 @@ func (worker *TestWorker) DoJob(job *model.Job) { cancelCtx, cancelCancelWatcher := context.WithCancel(context.Background()) cancelWatcherChan := make(chan interface{}, 1) - go CancellationWatcher(cancelCtx, job.Id, cancelWatcherChan) + go worker.srv.CancellationWatcher(cancelCtx, job.Id, cancelWatcherChan) defer cancelCancelWatcher() @@ -66,13 +68,13 @@ func (worker *TestWorker) DoJob(job *model.Job) { select { case <-cancelWatcherChan: l4g.Debug("Job %v: Job has been canceled via CancellationWatcher.", job.Id) - if err := SetJobCanceled(job); err != nil { + if err := worker.srv.SetJobCanceled(job); err != nil { l4g.Error("Failed to mark job: %v as canceled. Error: %v", job.Id, err.Error()) } return case <-worker.stop: l4g.Debug("Job %v: Job has been canceled via Worker Stop.", job.Id) - if err := SetJobCanceled(job); err != nil { + if err := worker.srv.SetJobCanceled(job); err != nil { l4g.Error("Failed to mark job: %v as canceled. Error: %v", job.Id, err.Error()) } return @@ -80,12 +82,12 @@ func (worker *TestWorker) DoJob(job *model.Job) { counter++ if counter > 10 { l4g.Debug("Job %v: Job completed.", job.Id) - if err := SetJobSuccess(job); err != nil { + if err := worker.srv.SetJobSuccess(job); err != nil { l4g.Error("Failed to mark job: %v as succeeded. Error: %v", job.Id, err.Error()) } return } else { - if err := SetJobProgress(job, int64(counter*10)); err != nil { + if err := worker.srv.SetJobProgress(job, int64(counter*10)); err != nil { l4g.Error("Job: %v: an error occured while trying to set job progress: %v", job.Id, err.Error()) } } diff --git a/jobs/workers.go b/jobs/workers.go index 9f85adaf5..dfa150ff6 100644 --- a/jobs/workers.go +++ b/jobs/workers.go @@ -7,7 +7,6 @@ import ( "sync" l4g "github.com/alecthomas/log4go" - ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs" "github.com/mattermost/mattermost-server/model" "github.com/mattermost/mattermost-server/utils" ) @@ -24,23 +23,23 @@ type Workers struct { listenerId string } -func InitWorkers() *Workers { +func (srv *JobServer) InitWorkers() *Workers { workers := &Workers{} - workers.Watcher = MakeWatcher(workers, DEFAULT_WATCHER_POLLING_INTERVAL) + workers.Watcher = srv.MakeWatcher(workers, DEFAULT_WATCHER_POLLING_INTERVAL) - if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil { + if dataRetentionInterface := srv.DataRetention; dataRetentionInterface != nil { workers.DataRetention = dataRetentionInterface.MakeWorker() } - if elasticsearchIndexerInterface := ejobs.GetElasticsearchIndexerInterface(); elasticsearchIndexerInterface != nil { + if elasticsearchIndexerInterface := srv.ElasticsearchIndexer; elasticsearchIndexerInterface != nil { workers.ElasticsearchIndexing = elasticsearchIndexerInterface.MakeWorker() } - if elasticsearchAggregatorInterface := ejobs.GetElasticsearchAggregatorInterface(); elasticsearchAggregatorInterface != nil { + if elasticsearchAggregatorInterface := srv.ElasticsearchAggregator; elasticsearchAggregatorInterface != nil { workers.ElasticsearchAggregation = elasticsearchAggregatorInterface.MakeWorker() } - if ldapSyncInterface := ejobs.GetLdapSyncInterface(); ldapSyncInterface != nil { + if ldapSyncInterface := srv.LdapSync; ldapSyncInterface != nil { workers.LdapSync = ldapSyncInterface.MakeWorker() } -- cgit v1.2.3-1-g7c22