diff options
author | George Goldberg <george@gberg.me> | 2018-07-31 15:40:23 +0100 |
---|---|---|
committer | Christopher Speller <crspeller@gmail.com> | 2018-07-31 07:40:23 -0700 |
commit | 8766690c81fcefdbe0c9d85590de1eea07a908d7 (patch) | |
tree | ab37e369a2c8afc87a3238bbf028aa82ef1bc125 | |
parent | fcb4ee935ef97ca5c79c7433b2be2709fc62e87f (diff) | |
download | chat-8766690c81fcefdbe0c9d85590de1eea07a908d7.tar.gz chat-8766690c81fcefdbe0c9d85590de1eea07a908d7.tar.bz2 chat-8766690c81fcefdbe0c9d85590de1eea07a908d7.zip |
MM-10502: Only cluster master should run job schedulers. (#9174)
* MM-10502: Only cluster master should run job schedulers.
* Use sync.Map for thread safety.
* Fix tests.
-rw-r--r-- | app/app.go | 33 | ||||
-rw-r--r-- | app/cluster.go | 29 | ||||
-rw-r--r-- | jobs/schedulers.go | 38 | ||||
-rw-r--r-- | jobs/server.go | 4 |
4 files changed, 80 insertions, 24 deletions
diff --git a/app/app.go b/app/app.go index 6da16c28c..5cedca2ad 100644 --- a/app/app.go +++ b/app/app.go @@ -64,10 +64,11 @@ type App struct { Mfa einterfaces.MfaInterface Saml einterfaces.SamlInterface - config atomic.Value - envConfig map[string]interface{} - configFile string - configListeners map[string]func(*model.Config, *model.Config) + config atomic.Value + envConfig map[string]interface{} + configFile string + configListeners map[string]func(*model.Config, *model.Config) + clusterLeaderListeners sync.Map licenseValue atomic.Value clientLicenseValue atomic.Value @@ -79,14 +80,15 @@ type App struct { newStore func() store.Store - htmlTemplateWatcher *utils.HTMLTemplateWatcher - sessionCache *utils.Cache - configListenerId string - licenseListenerId string - logListenerId string - disableConfigWatch bool - configWatcher *utils.ConfigWatcher - asymmetricSigningKey *ecdsa.PrivateKey + htmlTemplateWatcher *utils.HTMLTemplateWatcher + sessionCache *utils.Cache + configListenerId string + licenseListenerId string + logListenerId string + clusterLeaderListenerId string + disableConfigWatch bool + configWatcher *utils.ConfigWatcher + asymmetricSigningKey *ecdsa.PrivateKey pluginCommands []*PluginCommand pluginCommandsLock sync.RWMutex @@ -218,6 +220,10 @@ func New(options ...Option) (outApp *App, outErr error) { app.initJobs() }) + app.clusterLeaderListenerId = app.AddClusterLeaderChangedListener(func() { + app.Jobs.Schedulers.HandleClusterLeaderChange(app.IsLeader()) + }) + subpath, err := utils.GetSubpathFromConfig(app.Config()) if err != nil { return nil, errors.Wrap(err, "failed to parse SiteURL subpath") @@ -270,6 +276,7 @@ func (a *App) Shutdown() { a.RemoveConfigListener(a.configListenerId) a.RemoveLicenseListener(a.licenseListenerId) a.RemoveConfigListener(a.logListenerId) + a.RemoveClusterLeaderChangedListener(a.clusterLeaderListenerId) mlog.Info("Server stopped") a.DisableConfigWatch() @@ -432,6 +439,8 @@ func (a *App) initJobs() { if jobsMigrationsInterface != nil { a.Jobs.Migrations = jobsMigrationsInterface(a) } + a.Jobs.Workers = a.Jobs.InitWorkers() + a.Jobs.Schedulers = a.Jobs.InitSchedulers() } func (a *App) DiagnosticId() string { diff --git a/app/cluster.go b/app/cluster.go new file mode 100644 index 000000000..020e57c61 --- /dev/null +++ b/app/cluster.go @@ -0,0 +1,29 @@ +// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package app + +import "github.com/mattermost/mattermost-server/model" + +// Registers a given function to be called when the cluster leader may have changed. Returns a unique ID for the +// listener which can later be used to remove it. If clustering is not enabled in this build, the callback will never +// be called. +func (a *App) AddClusterLeaderChangedListener(listener func()) string { + id := model.NewId() + a.clusterLeaderListeners.Store(id, listener) + return id +} + +// Removes a listener function by the unique ID returned when AddConfigListener was called +func (a *App) RemoveClusterLeaderChangedListener(id string) { + a.clusterLeaderListeners.Delete(id) +} + +func (a *App) InvokeClusterLeaderChangedListeners() { + a.Go(func() { + a.clusterLeaderListeners.Range(func(_, listener interface{}) bool { + listener.(func())() + return true + }) + }) +} diff --git a/jobs/schedulers.go b/jobs/schedulers.go index 96aa2b635..37a64bc22 100644 --- a/jobs/schedulers.go +++ b/jobs/schedulers.go @@ -13,12 +13,13 @@ import ( ) type Schedulers struct { - stop chan bool - stopped chan bool - configChanged chan *model.Config - listenerId string - startOnce sync.Once - jobs *JobServer + stop chan bool + stopped chan bool + configChanged chan *model.Config + clusterLeaderChanged chan bool + listenerId string + startOnce sync.Once + jobs *JobServer schedulers []model.Scheduler nextRunTimes []*time.Time @@ -28,10 +29,11 @@ func (srv *JobServer) InitSchedulers() *Schedulers { mlog.Debug("Initialising schedulers.") schedulers := &Schedulers{ - stop: make(chan bool), - stopped: make(chan bool), - configChanged: make(chan *model.Config), - jobs: srv, + stop: make(chan bool), + stopped: make(chan bool), + configChanged: make(chan *model.Config), + clusterLeaderChanged: make(chan bool), + jobs: srv, } if srv.DataRetentionJob != nil { @@ -114,6 +116,14 @@ func (schedulers *Schedulers) Start() *Schedulers { schedulers.setNextRunTime(newCfg, idx, now, false) } } + case isLeader := <-schedulers.clusterLeaderChanged: + for idx := range schedulers.schedulers { + if !isLeader { + schedulers.nextRunTimes[idx] = nil + } else { + schedulers.setNextRunTime(schedulers.jobs.Config(), idx, now, false) + } + } } } }) @@ -171,3 +181,11 @@ func (schedulers *Schedulers) handleConfigChange(oldConfig *model.Config, newCon mlog.Debug("Schedulers received config change.") schedulers.configChanged <- newConfig } + +func (schedulers *Schedulers) HandleClusterLeaderChange(isLeader bool) { + select { + case schedulers.clusterLeaderChanged <- isLeader: + default: + mlog.Debug("Did not send cluster leader change message to schedulers as no schedulers listening to notification channel.") + } +} diff --git a/jobs/server.go b/jobs/server.go index 10ea9a46f..cffc60da1 100644 --- a/jobs/server.go +++ b/jobs/server.go @@ -50,11 +50,11 @@ func (srv *JobServer) Config() *model.Config { } func (srv *JobServer) StartWorkers() { - srv.Workers = srv.InitWorkers().Start() + srv.Workers = srv.Workers.Start() } func (srv *JobServer) StartSchedulers() { - srv.Schedulers = srv.InitSchedulers().Start() + srv.Schedulers = srv.Schedulers.Start() } func (srv *JobServer) StopWorkers() { |