From 22459ee17a3ba0b4487f975b6ebe630cab2d9feb Mon Sep 17 00:00:00 2001 From: George Goldberg Date: Thu, 17 Aug 2017 15:05:17 +0100 Subject: PLT-7302: Aggregate Elasticsearch indexes over a certain age. (#7224) * PLT-7302: Aggregate Elasticsearch indexes over a certain age. This is done by a scheduled daily job, in order to keep the shard count to a sensible level in Elasticsearch. * Use map[string]string instead of StringMap --- api4/job_test.go | 2 +- config/config.json | 4 +++- einterfaces/jobs/elasticsearch.go | 15 ++++++++++++ i18n/en.json | 32 +++++++++++++++++++++++++ jobs/jobs.go | 6 ++--- jobs/jobs_watcher.go | 24 ++++++++++--------- jobs/schedulers.go | 23 +++++++++++++++++- jobs/workers.go | 29 +++++++++++++++++++---- model/config.go | 49 ++++++++++++++++++++++++++++----------- model/job.go | 24 ++++++++++--------- store/sql_job_store_test.go | 21 ++++++++++------- store/sql_supplier.go | 12 ++++++++++ 12 files changed, 187 insertions(+), 54 deletions(-) diff --git a/api4/job_test.go b/api4/job_test.go index 3dcdbe58b..511386810 100644 --- a/api4/job_test.go +++ b/api4/job_test.go @@ -18,7 +18,7 @@ func TestCreateJob(t *testing.T) { job := &model.Job{ Type: model.JOB_TYPE_DATA_RETENTION, - Data: map[string]interface{}{ + Data: map[string]string{ "thing": "stuff", }, } diff --git a/config/config.json b/config/config.json index 5acd7d177..b8657d8d2 100644 --- a/config/config.json +++ b/config/config.json @@ -296,7 +296,9 @@ "EnableSearching": false, "Sniff": true, "PostIndexReplicas": 1, - "PostIndexShards": 1 + "PostIndexShards": 1, + "AggregatePostsAfterDays": 365, + "PostsAggregatorJobStartTime": "03:00" }, "DataRetentionSettings": { "Enable": false diff --git a/einterfaces/jobs/elasticsearch.go b/einterfaces/jobs/elasticsearch.go index 6d6dbe893..ca05b2ef3 100644 --- a/einterfaces/jobs/elasticsearch.go +++ b/einterfaces/jobs/elasticsearch.go @@ -20,3 +20,18 @@ func RegisterElasticsearchIndexerInterface(newInterface ElasticsearchIndexerInte func GetElasticsearchIndexerInterface() ElasticsearchIndexerInterface { return theElasticsearchIndexerInterface } + +type ElasticsearchAggregatorInterface interface { + MakeWorker() model.Worker + MakeScheduler() model.Scheduler +} + +var theElasticsearchAggregatorInterface ElasticsearchAggregatorInterface + +func RegisterElasticsearchAggregatorInterface(newInterface ElasticsearchAggregatorInterface) { + theElasticsearchAggregatorInterface = newInterface +} + +func GetElasticsearchAggregatorInterface() ElasticsearchAggregatorInterface { + return theElasticsearchAggregatorInterface +} diff --git a/i18n/en.json b/i18n/en.json index 8a2d0d770..49f5c1310 100644 --- a/i18n/en.json +++ b/i18n/en.json @@ -3787,6 +3787,30 @@ "id": "ent.compliance.run_started.info", "translation": "Compliance export started for job '{{.JobName}}' at '{{.FilePath}}'" }, + { + "id": "ent.elasticsearch.aggregator_worker.get_indexes.error", + "translation": "Elasticsearch aggregator worker failed to get indexes" + }, + { + "id": "ent.elasticsearch.aggregator_worker.create_index_job.error", + "translation": "Elasticsearch aggregator worker failed to create the indexing job" + }, + { + "id": "ent.elasticsearch.aggregator_worker.delete_indexes.error", + "translation": "Elasticsearch aggregator worker failed to delete the indexes" + }, + { + "id": "ent.elasticsearch.aggregator_worker.index_job_failed.error", + "translation": "Elasticsearch aggregator worker failed due to the indexing job failing" + }, + { + "id": "ent.elasticsearch.indexer.do_job.parse_start_time.error", + 
"translation": "Elasticsearch indexing worker failed to parse the start time" + }, + { + "id": "ent.elasticsearch.indexer.do_job.parse_end_time.error", + "translation": "Elasticsearch indexing worker failed to parse the end time" + }, { "id": "ent.elasticsearch.create_client.connect_failed", "translation": "Setting up Elasticsearch Client Failed" @@ -4435,6 +4459,14 @@ "id": "model.config.is_valid.elastic_search.username.app_error", "translation": "Elastic Search Username setting must be provided when Elastic Search indexing is enabled." }, + { + "id": "model.config.is_valid.elastic_search.aggregate_posts_after_days.app_error", + "translation": "Elasticsearch AggregatePostsAfterDays setting must be a number greater than or equal to 1" + }, + { + "id": "model.config.is_valid.elastic_search.posts_aggregator_job_start_time.app_error", + "translation": "Elasticsearch PostsAggregatorJobStartTime setting must be a time in the format \"hh:mm\"" + }, { "id": "model.config.is_valid.email_batching_buffer_size.app_error", "translation": "Invalid email batching buffer size for email settings. Must be zero or a positive number." diff --git a/jobs/jobs.go b/jobs/jobs.go index 1986b22b6..e478c5a19 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -16,7 +16,7 @@ const ( CANCEL_WATCHER_POLLING_INTERVAL = 5000 ) -func CreateJob(jobType string, jobData map[string]interface{}) (*model.Job, *model.AppError) { +func CreateJob(jobType string, jobData map[string]string) (*model.Job, *model.AppError) { job := model.Job{ Id: model.NewId(), Type: jobType, @@ -70,9 +70,9 @@ func SetJobError(job *model.Job, jobError *model.AppError) *model.AppError { job.Status = model.JOB_STATUS_ERROR job.Progress = -1 if job.Data == nil { - job.Data = make(map[string]interface{}) + job.Data = make(map[string]string) } - job.Data["error"] = jobError + job.Data["error"] = jobError.Error() if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil { return result.Err diff --git a/jobs/jobs_watcher.go b/jobs/jobs_watcher.go index 9ba68e85e..83d4249eb 100644 --- a/jobs/jobs_watcher.go +++ b/jobs/jobs_watcher.go @@ -64,25 +64,27 @@ func (watcher *Watcher) PollAndNotify() { if result := <-Srv.Store.Job().GetAllByStatus(model.JOB_STATUS_PENDING); result.Err != nil { l4g.Error("Error occured getting all pending statuses: %v", result.Err.Error()) } else { - jobStatuses := result.Data.([]*model.Job) + jobs := result.Data.([]*model.Job) - for _, js := range jobStatuses { - j := model.Job{ - Type: js.Type, - Id: js.Id, - } - - if js.Type == model.JOB_TYPE_DATA_RETENTION { + for _, job := range jobs { + if job.Type == model.JOB_TYPE_DATA_RETENTION { if watcher.workers.DataRetention != nil { select { - case watcher.workers.DataRetention.JobChannel() <- j: + case watcher.workers.DataRetention.JobChannel() <- *job: default: } } - } else if js.Type == model.JOB_TYPE_ELASTICSEARCH_POST_INDEXING { + } else if job.Type == model.JOB_TYPE_ELASTICSEARCH_POST_INDEXING { if watcher.workers.ElasticsearchIndexing != nil { select { - case watcher.workers.ElasticsearchIndexing.JobChannel() <- j: + case watcher.workers.ElasticsearchIndexing.JobChannel() <- *job: + default: + } + } + } else if job.Type == model.JOB_TYPE_ELASTICSEARCH_POST_AGGREGATION { + if watcher.workers.ElasticsearchAggregation != nil { + select { + case watcher.workers.ElasticsearchAggregation.JobChannel() <- *job: default: } } diff --git a/jobs/schedulers.go b/jobs/schedulers.go index 73ec6661a..2f4e18001 100644 --- a/jobs/schedulers.go +++ 
b/jobs/schedulers.go @@ -16,7 +16,8 @@ import ( type Schedulers struct { startOnce sync.Once - DataRetention model.Scheduler + DataRetention model.Scheduler + ElasticsearchAggregation model.Scheduler listenerId string } @@ -28,6 +29,10 @@ func InitSchedulers() *Schedulers { schedulers.DataRetention = dataRetentionInterface.MakeScheduler() } + if elasticsearchAggregatorInterface := ejobs.GetElasticsearchAggregatorInterface(); elasticsearchAggregatorInterface != nil { + schedulers.ElasticsearchAggregation = elasticsearchAggregatorInterface.MakeScheduler() + } + return schedulers } @@ -38,6 +43,10 @@ func (schedulers *Schedulers) Start() *Schedulers { if schedulers.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable { go schedulers.DataRetention.Run() } + + if schedulers.ElasticsearchAggregation != nil && *utils.Cfg.ElasticsearchSettings.EnableIndexing { + go schedulers.ElasticsearchAggregation.Run() + } }) schedulers.listenerId = utils.AddConfigListener(schedulers.handleConfigChange) @@ -53,6 +62,14 @@ func (schedulers *Schedulers) handleConfigChange(oldConfig *model.Config, newCon schedulers.DataRetention.Stop() } } + + if schedulers.ElasticsearchAggregation != nil { + if !*oldConfig.ElasticsearchSettings.EnableIndexing && *newConfig.ElasticsearchSettings.EnableIndexing { + go schedulers.ElasticsearchAggregation.Run() + } else if *oldConfig.ElasticsearchSettings.EnableIndexing && !*newConfig.ElasticsearchSettings.EnableIndexing { + schedulers.ElasticsearchAggregation.Stop() + } + } } func (schedulers *Schedulers) Stop() *Schedulers { @@ -62,6 +79,10 @@ func (schedulers *Schedulers) Stop() *Schedulers { schedulers.DataRetention.Stop() } + if schedulers.ElasticsearchAggregation != nil && *utils.Cfg.ElasticsearchSettings.EnableIndexing { + schedulers.ElasticsearchAggregation.Stop() + } + l4g.Info("Stopped schedulers") return schedulers diff --git a/jobs/workers.go b/jobs/workers.go index 592c001fb..fe38641e7 100644 --- a/jobs/workers.go +++ b/jobs/workers.go @@ -16,16 +16,15 @@ type Workers struct { startOnce sync.Once watcher *Watcher - DataRetention model.Worker - ElasticsearchIndexing model.Worker + DataRetention model.Worker + ElasticsearchIndexing model.Worker + ElasticsearchAggregation model.Worker listenerId string } func InitWorkers() *Workers { - workers := &Workers{ - // SearchIndexing: MakeTestJob(s, "SearchIndexing"), - } + workers := &Workers{} workers.watcher = MakeWatcher(workers) if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil { @@ -36,6 +35,10 @@ func InitWorkers() *Workers { workers.ElasticsearchIndexing = elasticsearchIndexerInterface.MakeWorker() } + if elasticsearchAggregatorInterface := ejobs.GetElasticsearchAggregatorInterface(); elasticsearchAggregatorInterface != nil { + workers.ElasticsearchAggregation = elasticsearchAggregatorInterface.MakeWorker() + } + return workers } @@ -51,6 +54,10 @@ func (workers *Workers) Start() *Workers { go workers.ElasticsearchIndexing.Run() } + if workers.ElasticsearchAggregation != nil && *utils.Cfg.ElasticsearchSettings.EnableIndexing { + go workers.ElasticsearchAggregation.Run() + } + go workers.watcher.Start() }) @@ -75,6 +82,14 @@ func (workers *Workers) handleConfigChange(oldConfig *model.Config, newConfig *m workers.ElasticsearchIndexing.Stop() } } + + if workers.ElasticsearchAggregation != nil { + if !*oldConfig.ElasticsearchSettings.EnableIndexing && *newConfig.ElasticsearchSettings.EnableIndexing { + go workers.ElasticsearchAggregation.Run() + } else if 
*oldConfig.ElasticsearchSettings.EnableIndexing && !*newConfig.ElasticsearchSettings.EnableIndexing { + workers.ElasticsearchAggregation.Stop() + } + } } func (workers *Workers) Stop() *Workers { @@ -90,6 +105,10 @@ func (workers *Workers) Stop() *Workers { workers.ElasticsearchIndexing.Stop() } + if workers.ElasticsearchAggregation != nil && *utils.Cfg.ElasticsearchSettings.EnableIndexing { + workers.ElasticsearchAggregation.Stop() + } + l4g.Info("Stopped workers") return workers diff --git a/model/config.go b/model/config.go index 933c643f2..9906723e7 100644 --- a/model/config.go +++ b/model/config.go @@ -8,6 +8,7 @@ import ( "io" "net/http" "net/url" + "time" ) const ( @@ -123,11 +124,13 @@ const ( ANNOUNCEMENT_SETTINGS_DEFAULT_BANNER_COLOR = "#f2a93b" ANNOUNCEMENT_SETTINGS_DEFAULT_BANNER_TEXT_COLOR = "#333333" - ELASTICSEARCH_SETTINGS_DEFAULT_CONNECTION_URL = "" - ELASTICSEARCH_SETTINGS_DEFAULT_USERNAME = "" - ELASTICSEARCH_SETTINGS_DEFAULT_PASSWORD = "" - ELASTICSEARCH_SETTINGS_DEFAULT_POST_INDEX_REPLICAS = 1 - ELASTICSEARCH_SETTINGS_DEFAULT_POST_INDEX_SHARDS = 1 + ELASTICSEARCH_SETTINGS_DEFAULT_CONNECTION_URL = "" + ELASTICSEARCH_SETTINGS_DEFAULT_USERNAME = "" + ELASTICSEARCH_SETTINGS_DEFAULT_PASSWORD = "" + ELASTICSEARCH_SETTINGS_DEFAULT_POST_INDEX_REPLICAS = 1 + ELASTICSEARCH_SETTINGS_DEFAULT_POST_INDEX_SHARDS = 1 + ELASTICSEARCH_SETTINGS_DEFAULT_AGGREGATE_POSTS_AFTER_DAYS = 365 + ELASTICSEARCH_SETTINGS_DEFAULT_POSTS_AGGREGATOR_JOB_START_TIME = "03:00" ) type ServiceSettings struct { @@ -441,14 +444,16 @@ type WebrtcSettings struct { } type ElasticsearchSettings struct { - ConnectionUrl *string - Username *string - Password *string - EnableIndexing *bool - EnableSearching *bool - Sniff *bool - PostIndexReplicas *int - PostIndexShards *int + ConnectionUrl *string + Username *string + Password *string + EnableIndexing *bool + EnableSearching *bool + Sniff *bool + PostIndexReplicas *int + PostIndexShards *int + AggregatePostsAfterDays *int + PostsAggregatorJobStartTime *string } type DataRetentionSettings struct { @@ -1452,6 +1457,16 @@ func (o *Config) SetDefaults() { *o.ElasticsearchSettings.PostIndexShards = ELASTICSEARCH_SETTINGS_DEFAULT_POST_INDEX_SHARDS } + if o.ElasticsearchSettings.AggregatePostsAfterDays == nil { + o.ElasticsearchSettings.AggregatePostsAfterDays = new(int) + *o.ElasticsearchSettings.AggregatePostsAfterDays = ELASTICSEARCH_SETTINGS_DEFAULT_AGGREGATE_POSTS_AFTER_DAYS + } + + if o.ElasticsearchSettings.PostsAggregatorJobStartTime == nil { + o.ElasticsearchSettings.PostsAggregatorJobStartTime = new(string) + *o.ElasticsearchSettings.PostsAggregatorJobStartTime = ELASTICSEARCH_SETTINGS_DEFAULT_POSTS_AGGREGATOR_JOB_START_TIME + } + if o.DataRetentionSettings.Enable == nil { o.DataRetentionSettings.Enable = new(bool) *o.DataRetentionSettings.Enable = false @@ -1700,6 +1715,14 @@ func (o *Config) IsValid() *AppError { return NewLocAppError("Config.IsValid", "model.config.is_valid.elastic_search.enable_searching.app_error", nil, "") } + if *o.ElasticsearchSettings.AggregatePostsAfterDays < 1 { + return NewAppError("Config.IsValid", "model.config.is_valid.elastic_search.aggregate_posts_after_days.app_error", nil, "", http.StatusBadRequest) + } + + if _, err := time.Parse("03:04", *o.ElasticsearchSettings.PostsAggregatorJobStartTime); err != nil { + return NewAppError("Config.IsValid", "model.config.is_valid.elastic_search.posts_aggregator_job_start_time.app_error", nil, err.Error(), http.StatusBadRequest) + } + return nil } diff --git a/model/job.go b/model/job.go 
index 004331a1f..258fa2bd3 100644 --- a/model/job.go +++ b/model/job.go @@ -10,8 +10,9 @@ import ( ) const ( - JOB_TYPE_DATA_RETENTION = "data_retention" - JOB_TYPE_ELASTICSEARCH_POST_INDEXING = "elasticsearch_post_indexing" + JOB_TYPE_DATA_RETENTION = "data_retention" + JOB_TYPE_ELASTICSEARCH_POST_INDEXING = "elasticsearch_post_indexing" + JOB_TYPE_ELASTICSEARCH_POST_AGGREGATION = "elasticsearch_post_aggregation" JOB_STATUS_PENDING = "pending" JOB_STATUS_IN_PROGRESS = "in_progress" @@ -22,15 +23,15 @@ const ( ) type Job struct { - Id string `json:"id"` - Type string `json:"type"` - Priority int64 `json:"priority"` - CreateAt int64 `json:"create_at"` - StartAt int64 `json:"start_at"` - LastActivityAt int64 `json:"last_activity_at"` - Status string `json:"status"` - Progress int64 `json:"progress"` - Data map[string]interface{} `json:"data"` + Id string `json:"id"` + Type string `json:"type"` + Priority int64 `json:"priority"` + CreateAt int64 `json:"create_at"` + StartAt int64 `json:"start_at"` + LastActivityAt int64 `json:"last_activity_at"` + Status string `json:"status"` + Progress int64 `json:"progress"` + Data map[string]string `json:"data"` } func (j *Job) IsValid() *AppError { @@ -45,6 +46,7 @@ func (j *Job) IsValid() *AppError { switch j.Type { case JOB_TYPE_DATA_RETENTION: case JOB_TYPE_ELASTICSEARCH_POST_INDEXING: + case JOB_TYPE_ELASTICSEARCH_POST_AGGREGATION: default: return NewAppError("Job.IsValid", "model.job.is_valid.type.app_error", nil, "id="+j.Id, http.StatusBadRequest) } diff --git a/store/sql_job_store_test.go b/store/sql_job_store_test.go index 97e95ab92..394a09192 100644 --- a/store/sql_job_store_test.go +++ b/store/sql_job_store_test.go @@ -17,9 +17,9 @@ func TestJobSaveGet(t *testing.T) { Id: model.NewId(), Type: model.NewId(), Status: model.NewId(), - Data: map[string]interface{}{ - "Processed": 0, - "Total": 12345, + Data: map[string]string{ + "Processed": "0", + "Total": "12345", "LastProcessed": "abcd", }, } @@ -36,6 +36,8 @@ func TestJobSaveGet(t *testing.T) { t.Fatal(result.Err) } else if received := result.Data.(*model.Job); received.Id != job.Id { t.Fatal("received incorrect job after save") + } else if received.Data["Total"] != "12345" { + t.Fatal("data field was not retrieved successfully:", received.Data) } } @@ -184,6 +186,9 @@ func TestJobGetAllByStatus(t *testing.T) { Type: jobType, CreateAt: 1000, Status: status, + Data: map[string]string{ + "test": "data", + }, }, { Id: model.NewId(), @@ -214,10 +219,10 @@ func TestJobGetAllByStatus(t *testing.T) { t.Fatal(result.Err) } else if received := result.Data.([]*model.Job); len(received) != 3 { t.Fatal("received wrong number of jobs") - } else if received[0].Id != jobs[0].Id && received[1].Id != jobs[0].Id { - t.Fatal("should've received first jobs") - } else if received[0].Id != jobs[1].Id && received[1].Id != jobs[1].Id { - t.Fatal("should've received second jobs") + } else if received[0].Id != jobs[1].Id || received[1].Id != jobs[0].Id || received[2].Id != jobs[2].Id { + t.Fatal("should've received jobs ordered by CreateAt time") + } else if received[1].Data["test"] != "data" { + t.Fatal("should've received job data field back as saved") } } @@ -237,7 +242,7 @@ func TestJobUpdateOptimistically(t *testing.T) { job.LastActivityAt = model.GetMillis() job.Status = model.JOB_STATUS_IN_PROGRESS job.Progress = 50 - job.Data = map[string]interface{}{ + job.Data = map[string]string{ "Foo": "Bar", } diff --git a/store/sql_supplier.go b/store/sql_supplier.go index 5b9c268bb..f56a9f448 100644 --- 
a/store/sql_supplier.go +++ b/store/sql_supplier.go @@ -786,6 +786,8 @@ func (me mattermConverter) ToDb(val interface{}) (interface{}, error) { switch t := val.(type) { case model.StringMap: return model.MapToJson(t), nil + case map[string]string: + return model.MapToJson(model.StringMap(t)), nil case model.StringArray: return model.ArrayToJson(t), nil case model.StringInterface: @@ -809,6 +811,16 @@ func (me mattermConverter) FromDb(target interface{}) (gorp.CustomScanner, bool) return json.Unmarshal(b, target) } return gorp.CustomScanner{Holder: new(string), Target: target, Binder: binder}, true + case *map[string]string: + binder := func(holder, target interface{}) error { + s, ok := holder.(*string) + if !ok { + return errors.New(utils.T("store.sql.convert_string_map")) + } + b := []byte(*s) + return json.Unmarshal(b, target) + } + return gorp.CustomScanner{Holder: new(string), Target: target, Binder: binder}, true case *model.StringArray: binder := func(holder, target interface{}) error { s, ok := holder.(*string) -- cgit v1.2.3-1-g7c22
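
The aggregator worker and scheduler themselves live in the enterprise code, so this patch only carries their registration hooks (einterfaces/jobs/elasticsearch.go) and their i18n strings. Those strings outline the worker's flow: fetch the daily indexes older than AggregatePostsAfterDays, run an indexing job over the span they cover, and delete the originals once the aggregate index exists. A minimal sketch of that flow follows; every name in it (IndexInfo, getIndexesOlderThan, runIndexingJob, deleteIndexes) is a hypothetical stand-in, not the enterprise implementation.

package jobs

import (
	"fmt"
	"time"
)

// IndexInfo is a hypothetical description of one daily posts index.
type IndexInfo struct {
	Name    string
	StartAt int64 // earliest post timestamp in the index, in ms
	EndAt   int64 // latest post timestamp in the index, in ms
}

// aggregateOldIndexes sketches the job implied by the i18n keys:
// get_indexes -> create_index_job -> (wait for it) -> delete_indexes.
func aggregateOldIndexes(cutoffDays int,
	getIndexesOlderThan func(cutoff time.Time) ([]IndexInfo, error),
	runIndexingJob func(startAt, endAt int64) error,
	deleteIndexes func(names []string) error) error {

	cutoff := time.Now().AddDate(0, 0, -cutoffDays)

	indexes, err := getIndexesOlderThan(cutoff)
	if err != nil {
		// ent.elasticsearch.aggregator_worker.get_indexes.error
		return fmt.Errorf("failed to get indexes: %v", err)
	}
	if len(indexes) == 0 {
		return nil
	}

	// Reindex the whole span covered by the old daily indexes into one
	// aggregate index, which is what keeps the shard count sensible.
	startAt, endAt := indexes[0].StartAt, indexes[0].EndAt
	names := make([]string, 0, len(indexes))
	for _, index := range indexes {
		if index.StartAt < startAt {
			startAt = index.StartAt
		}
		if index.EndAt > endAt {
			endAt = index.EndAt
		}
		names = append(names, index.Name)
	}

	if err := runIndexingJob(startAt, endAt); err != nil {
		// ent.elasticsearch.aggregator_worker.index_job_failed.error
		return fmt.Errorf("indexing job failed: %v", err)
	}

	// Only once the aggregate index is in place are the daily indexes removed.
	if err := deleteIndexes(names); err != nil {
		// ent.elasticsearch.aggregator_worker.delete_indexes.error
		return fmt.Errorf("failed to delete indexes: %v", err)
	}
	return nil
}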
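
PostsAggregatorJobStartTime is a wall-clock "hh:mm" value (default "03:00") that tells the scheduler when to fire each day. Below is a sketch of the time arithmetic, assuming a 24-hour clock; note that Go's "15:04" reference layout parses 24-hour values, whereas the "03:04" layout the validator in model/config.go passes to time.Parse only accepts hours up to 12, so a start time after noon would fail validation there.

package jobs

import "time"

// nextRunTime is a minimal sketch: given an "hh:mm" start time such as the
// default "03:00", compute the next wall-clock moment the daily aggregation
// job should run. "15:04" is Go's 24-hour reference layout.
func nextRunTime(now time.Time, startTime string) (time.Time, error) {
	parsed, err := time.Parse("15:04", startTime)
	if err != nil {
		return time.Time{}, err
	}

	// Anchor today's date to the configured hour and minute.
	next := time.Date(now.Year(), now.Month(), now.Day(),
		parsed.Hour(), parsed.Minute(), 0, 0, now.Location())

	// If that moment has already passed today, run tomorrow instead.
	if !next.After(now) {
		next = next.AddDate(0, 0, 1)
	}
	return next, nil
}

A scheduler's Run loop would then sleep until the returned time (for example with time.After(next.Sub(now))), enqueue a JOB_TYPE_ELASTICSEARCH_POST_AGGREGATION job, and compute the following day's run.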
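
The sql_supplier.go changes teach the gorp converter to persist the job's new map[string]string Data field as a JSON column, mirroring the existing model.StringMap case. What the ToDb and CustomScanner paths amount to, sketched with encoding/json directly rather than the model helpers:

package store

import "encoding/json"

// Writing: the ToDb path serializes the map to a JSON string for the column.
func dataToDb(data map[string]string) (string, error) {
	b, err := json.Marshal(data)
	return string(b), err
}

// Reading: the CustomScanner binder unmarshals the stored string back
// into the *map[string]string target.
func dataFromDb(column string, target *map[string]string) error {
	return json.Unmarshal([]byte(column), target)
}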