From a06830b2f88a8d374c326a1191870cbc7cf7dac2 Mon Sep 17 00:00:00 2001 From: George Goldberg Date: Thu, 28 Sep 2017 17:11:13 +0100 Subject: PLT-7644: Improve job scheduler architecture. (#7532) --- jobs/schedulers.go | 174 +++++++++++++++++++++++++++++++++++------------------ 1 file changed, 114 insertions(+), 60 deletions(-) (limited to 'jobs/schedulers.go') diff --git a/jobs/schedulers.go b/jobs/schedulers.go index 1cb4a6f28..cdf8d956d 100644 --- a/jobs/schedulers.go +++ b/jobs/schedulers.go @@ -5,106 +5,160 @@ package jobs import ( "sync" + "time" l4g "github.com/alecthomas/log4go" - ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs" + ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs" "github.com/mattermost/mattermost-server/model" "github.com/mattermost/mattermost-server/utils" ) type Schedulers struct { - startOnce sync.Once - - DataRetention model.Scheduler - ElasticsearchAggregation model.Scheduler - LdapSync model.Scheduler - - listenerId string + stop chan bool + stopped chan bool + configChanged chan *model.Config + listenerId string + startOnce sync.Once + + schedulers []model.Scheduler + nextRunTimes []*time.Time } func InitSchedulers() *Schedulers { - schedulers := &Schedulers{} + l4g.Debug("Initialising schedulers.") + schedulers := &Schedulers{ + stop: make(chan bool), + stopped: make(chan bool), + configChanged: make(chan *model.Config), + } if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil { - schedulers.DataRetention = dataRetentionInterface.MakeScheduler() + schedulers.schedulers = append(schedulers.schedulers, dataRetentionInterface.MakeScheduler()) } if elasticsearchAggregatorInterface := ejobs.GetElasticsearchAggregatorInterface(); elasticsearchAggregatorInterface != nil { - schedulers.ElasticsearchAggregation = elasticsearchAggregatorInterface.MakeScheduler() + schedulers.schedulers = append(schedulers.schedulers, elasticsearchAggregatorInterface.MakeScheduler()) } - if ldaySyncInterface := ejobs.GetLdapSyncInterface(); ldaySyncInterface != nil { - schedulers.LdapSync = ldaySyncInterface.MakeScheduler() + if ldapSyncInterface := ejobs.GetLdapSyncInterface(); ldapSyncInterface != nil { + schedulers.schedulers = append(schedulers.schedulers, ldapSyncInterface.MakeScheduler()) } + schedulers.nextRunTimes = make([]*time.Time, len(schedulers.schedulers)) return schedulers } func (schedulers *Schedulers) Start() *Schedulers { - l4g.Info("Starting schedulers") - - schedulers.startOnce.Do(func() { - if schedulers.DataRetention != nil && (*utils.Cfg.DataRetentionSettings.EnableMessageDeletion || *utils.Cfg.DataRetentionSettings.EnableFileDeletion) { - go schedulers.DataRetention.Run() - } - - if schedulers.ElasticsearchAggregation != nil && *utils.Cfg.ElasticsearchSettings.EnableIndexing { - go schedulers.ElasticsearchAggregation.Run() - } + schedulers.listenerId = utils.AddConfigListener(schedulers.handleConfigChange) - if schedulers.LdapSync != nil && *utils.Cfg.LdapSettings.Enable { - go schedulers.LdapSync.Run() - } - }) + go func() { + schedulers.startOnce.Do(func() { + l4g.Info("Starting schedulers.") + + defer func() { + l4g.Info("Schedulers stopped.") + close(schedulers.stopped) + }() + + now := time.Now() + for idx, scheduler := range schedulers.schedulers { + if !scheduler.Enabled(utils.Cfg) { + schedulers.nextRunTimes[idx] = nil + } else { + schedulers.setNextRunTime(utils.Cfg, idx, now, false) + } + } + + for { + select { + case <-schedulers.stop: + l4g.Debug("Schedulers received stop signal.") + return + case now = <-time.After(1 * time.Minute): + cfg := utils.Cfg + + for idx, nextTime := range schedulers.nextRunTimes { + if nextTime == nil { + continue + } + + if time.Now().After(*nextTime) { + scheduler := schedulers.schedulers[idx] + if scheduler != nil { + if scheduler.Enabled(cfg) { + if _, err := schedulers.scheduleJob(cfg, scheduler); err != nil { + l4g.Warn("Failed to schedule job with scheduler: %v", scheduler.Name()) + l4g.Error(err) + } else { + schedulers.setNextRunTime(cfg, idx, now, true) + } + } + } + } + } + case newCfg := <-schedulers.configChanged: + for idx, scheduler := range schedulers.schedulers { + if !scheduler.Enabled(newCfg) { + schedulers.nextRunTimes[idx] = nil + } else { + schedulers.setNextRunTime(newCfg, idx, now, false) + } + } + } + } + }) + }() - schedulers.listenerId = utils.AddConfigListener(schedulers.handleConfigChange) + return schedulers +} +func (schedulers *Schedulers) Stop() *Schedulers { + l4g.Info("Stopping schedulers.") + close(schedulers.stop) + <-schedulers.stopped return schedulers } -func (schedulers *Schedulers) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) { - if schedulers.DataRetention != nil { - if (!*oldConfig.DataRetentionSettings.EnableMessageDeletion && !*oldConfig.DataRetentionSettings.EnableFileDeletion) && (*newConfig.DataRetentionSettings.EnableMessageDeletion || *newConfig.DataRetentionSettings.EnableFileDeletion) { - go schedulers.DataRetention.Run() - } else if (*oldConfig.DataRetentionSettings.EnableMessageDeletion || *oldConfig.DataRetentionSettings.EnableFileDeletion) && (!*newConfig.DataRetentionSettings.EnableMessageDeletion && !*newConfig.DataRetentionSettings.EnableFileDeletion) { - schedulers.DataRetention.Stop() - } - } +func (schedulers *Schedulers) setNextRunTime(cfg *model.Config, idx int, now time.Time, pendingJobs bool) { + scheduler := schedulers.schedulers[idx] - if schedulers.ElasticsearchAggregation != nil { - if !*oldConfig.ElasticsearchSettings.EnableIndexing && *newConfig.ElasticsearchSettings.EnableIndexing { - go schedulers.ElasticsearchAggregation.Run() - } else if *oldConfig.ElasticsearchSettings.EnableIndexing && !*newConfig.ElasticsearchSettings.EnableIndexing { - schedulers.ElasticsearchAggregation.Stop() + if !pendingJobs { + if pj, err := CheckForPendingJobsByType(scheduler.JobType()); err != nil { + l4g.Error("Failed to set next job run time: " + err.Error()) + schedulers.nextRunTimes[idx] = nil + return + } else { + pendingJobs = pj } } - if schedulers.LdapSync != nil { - if !*oldConfig.LdapSettings.Enable && *newConfig.LdapSettings.Enable { - go schedulers.LdapSync.Run() - } else if *oldConfig.LdapSettings.Enable && !*newConfig.LdapSettings.Enable { - schedulers.LdapSync.Stop() - } + lastSuccessfulJob, err := GetLastSuccessfulJobByType(scheduler.JobType()) + if err != nil { + l4g.Error("Failed to set next job run time: " + err.Error()) + schedulers.nextRunTimes[idx] = nil + return } -} -func (schedulers *Schedulers) Stop() *Schedulers { - utils.RemoveConfigListener(schedulers.listenerId) - - if schedulers.DataRetention != nil && (*utils.Cfg.DataRetentionSettings.EnableMessageDeletion || *utils.Cfg.DataRetentionSettings.EnableFileDeletion) { - schedulers.DataRetention.Stop() - } + schedulers.nextRunTimes[idx] = scheduler.NextScheduleTime(cfg, now, pendingJobs, lastSuccessfulJob) + l4g.Debug("Next run time for scheduler %v: %v", scheduler.Name(), schedulers.nextRunTimes[idx]) +} - if schedulers.ElasticsearchAggregation != nil && *utils.Cfg.ElasticsearchSettings.EnableIndexing { - schedulers.ElasticsearchAggregation.Stop() +func (schedulers *Schedulers) scheduleJob(cfg *model.Config, scheduler model.Scheduler) (*model.Job, *model.AppError) { + pendingJobs, err := CheckForPendingJobsByType(scheduler.JobType()) + if err != nil { + return nil, err } - if schedulers.LdapSync != nil && *utils.Cfg.LdapSettings.Enable { - schedulers.LdapSync.Stop() + lastSuccessfulJob, err2 := GetLastSuccessfulJobByType(scheduler.JobType()) + if err2 != nil { + return nil, err } - l4g.Info("Stopped schedulers") + return scheduler.ScheduleJob(cfg, pendingJobs, lastSuccessfulJob) +} - return schedulers +func (schedulers *Schedulers) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) { + l4g.Debug("Schedulers received config change.") + schedulers.configChanged <- newConfig } -- cgit v1.2.3-1-g7c22