diff options
-rw-r--r-- | einterfaces/jobs/elasticsearch.go | 22 | ||||
-rw-r--r-- | i18n/en.json | 16 | ||||
-rw-r--r-- | jobs/jobs.go | 52 | ||||
-rw-r--r-- | jobs/jobs_watcher.go | 7 | ||||
-rw-r--r-- | jobs/testworker.go | 2 | ||||
-rw-r--r-- | jobs/workers.go | 31 | ||||
-rw-r--r-- | model/post.go | 6 | ||||
-rw-r--r-- | store/sql_post_store.go | 41 | ||||
-rw-r--r-- | store/sql_post_store_test.go | 69 | ||||
-rw-r--r-- | store/store.go | 1 |
10 files changed, 216 insertions, 31 deletions
diff --git a/einterfaces/jobs/elasticsearch.go b/einterfaces/jobs/elasticsearch.go new file mode 100644 index 000000000..6d6dbe893 --- /dev/null +++ b/einterfaces/jobs/elasticsearch.go @@ -0,0 +1,22 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package jobs + +import ( + "github.com/mattermost/platform/model" +) + +type ElasticsearchIndexerInterface interface { + MakeWorker() model.Worker +} + +var theElasticsearchIndexerInterface ElasticsearchIndexerInterface + +func RegisterElasticsearchIndexerInterface(newInterface ElasticsearchIndexerInterface) { + theElasticsearchIndexerInterface = newInterface +} + +func GetElasticsearchIndexerInterface() ElasticsearchIndexerInterface { + return theElasticsearchIndexerInterface +} diff --git a/i18n/en.json b/i18n/en.json index d6eaffc7f..1225ee824 100644 --- a/i18n/en.json +++ b/i18n/en.json @@ -3516,19 +3516,19 @@ "translation": "Failed to decode search results" }, { - "id": "ent.elasticsearch.start.connect_failed", + "id": "ent.elasticsearch.create_client.connect_failed", "translation": "Setting up ElasticSearch Client Failed" }, { - "id": "ent.elasticsearch.start.index_create_failed", + "id": "ent.elasticsearch.create_index_if_not_exists.index_create_failed", "translation": "Failed to create ElasticSearch index" }, { - "id": "ent.elasticsearch.start.index_exists_failed", + "id": "ent.elasticsearch.create_index_if_not_exists.index_exists_failed", "translation": "Failed to establish whether ElasticSearch index exists" }, { - "id": "ent.elasticsearch.start.index_mapping_failed", + "id": "ent.elasticsearch.create_index_if_not_exists.index_mapping_failed", "translation": "Failed to setup ElasticSearch index mapping" }, { @@ -3764,6 +3764,10 @@ "translation": "Page not found" }, { + "id": "jobs.set_job_error.update.error", + "translation": "Failed to set job status to error" + }, + { "id": "jobs.request_cancellation.status.error", "translation": "Could not request cancellation for job that is not in a cancelable state." }, @@ -5484,6 +5488,10 @@ "translation": "We couldn't get the parent posts for the channel" }, { + "id": "store.sql_post.get_posts_batch_for_indexing.get.app_error", + "translation": "We couldn't get the posts batch for indexing" + }, + { "id": "store.sql_post.get_posts_by_ids.app_error", "translation": "We couldn't get the posts" }, diff --git a/jobs/jobs.go b/jobs/jobs.go index 58c2f2f13..9247355d0 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -9,6 +9,7 @@ import ( l4g "github.com/alecthomas/log4go" "github.com/mattermost/platform/model" + "net/http" ) const ( @@ -40,27 +41,15 @@ func ClaimJob(job *model.Job) (bool, *model.AppError) { } } -func SetJobProgress(jobId string, progress int64) (bool, *model.AppError) { - var job *model.Job - - if result := <-Srv.Store.Job().Get(jobId); result.Err != nil { - return false, result.Err - } else { - job = result.Data.(*model.Job) - } - +func SetJobProgress(job *model.Job, progress int64) (*model.AppError) { job.Status = model.JOB_STATUS_IN_PROGRESS job.Progress = progress if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil { - return false, result.Err + return result.Err } else { - if !result.Data.(bool) { - return false, nil - } + return nil } - - return true, nil } func SetJobSuccess(job *model.Job) *model.AppError { @@ -68,9 +57,34 @@ func SetJobSuccess(job *model.Job) *model.AppError { return result.Err } -func SetJobError(job *model.Job) *model.AppError { - result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_ERROR) - return result.Err +func SetJobError(job *model.Job, jobError *model.AppError) *model.AppError { + if jobError == nil { + result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_ERROR) + return result.Err + } + + job.Status = model.JOB_STATUS_ERROR + job.Progress = -1 + if job.Data == nil { + job.Data = make(map[string]interface{}) + } + job.Data["error"] = jobError + + if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil { + return result.Err + } else { + if !result.Data.(bool) { + if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil { + return result.Err + } else { + if !result.Data.(bool) { + return model.NewAppError("Jobs.SetJobError", "jobs.set_job_error.update.error", nil, "id=" + job.Id, http.StatusInternalServerError) + } + } + } + } + + return nil } func SetJobCanceled(job *model.Job) *model.AppError { @@ -91,7 +105,7 @@ func RequestCancellation(job *model.Job) *model.AppError { return nil } - return model.NewLocAppError("Jobs.RequestCancellation", "jobs.request_cancellation.status.error", nil, "id=" + job.Id) + return model.NewAppError("Jobs.RequestCancellation", "jobs.request_cancellation.status.error", nil, "id=" + job.Id, http.StatusInternalServerError) } func CancellationWatcher(ctx context.Context, jobId string, cancelChan chan interface{}) { diff --git a/jobs/jobs_watcher.go b/jobs/jobs_watcher.go index ada957ccc..5979d6207 100644 --- a/jobs/jobs_watcher.go +++ b/jobs/jobs_watcher.go @@ -79,6 +79,13 @@ func (watcher *Watcher) PollAndNotify() { default: } } + } else if js.Type == model.JOB_TYPE_SEARCH_INDEXING { + if watcher.workers.ElasticsearchIndexing != nil { + select { + case watcher.workers.ElasticsearchIndexing.JobChannel() <- j: + default: + } + } } } } diff --git a/jobs/testworker.go b/jobs/testworker.go index f1c8a07a3..385a2073b 100644 --- a/jobs/testworker.go +++ b/jobs/testworker.go @@ -85,7 +85,7 @@ func (worker *TestWorker) DoJob(job *model.Job) { } return } else { - if _, err := SetJobProgress(job.Id, int64(counter*10)); err != nil { + if err := SetJobProgress(job, int64(counter*10)); err != nil { l4g.Error("Job: %v: an error occured while trying to set job progress: %v", job.Id, err.Error()) } } diff --git a/jobs/workers.go b/jobs/workers.go index a42ec4607..bb80ad79a 100644 --- a/jobs/workers.go +++ b/jobs/workers.go @@ -13,13 +13,13 @@ import ( ) type Workers struct { - startOnce sync.Once - watcher *Watcher + startOnce sync.Once + watcher *Watcher - DataRetention model.Worker - // SearchIndexing model.Job + DataRetention model.Worker + ElasticsearchIndexing model.Worker - listenerId string + listenerId string } func InitWorkers() *Workers { @@ -32,6 +32,10 @@ func InitWorkers() *Workers { workers.DataRetention = dataRetentionInterface.MakeWorker() } + if elasticsearchIndexerInterface := ejobs.GetElasticsearchIndexerInterface(); elasticsearchIndexerInterface != nil { + workers.ElasticsearchIndexing = elasticsearchIndexerInterface.MakeWorker() + } + return workers } @@ -43,7 +47,9 @@ func (workers *Workers) Start() *Workers { go workers.DataRetention.Run() } - // go workers.SearchIndexing.Run() + if workers.ElasticsearchIndexing != nil && *utils.Cfg.ElasticSearchSettings.EnableIndexing { + go workers.ElasticsearchIndexing.Run() + } go workers.watcher.Start() }) @@ -61,6 +67,14 @@ func (workers *Workers) handleConfigChange(oldConfig *model.Config, newConfig *m workers.DataRetention.Stop() } } + + if workers.ElasticsearchIndexing != nil { + if !*oldConfig.ElasticSearchSettings.EnableIndexing && *newConfig.ElasticSearchSettings.EnableIndexing { + go workers.ElasticsearchIndexing.Run() + } else if *oldConfig.ElasticSearchSettings.EnableIndexing && !*newConfig.ElasticSearchSettings.EnableIndexing { + workers.ElasticsearchIndexing.Stop() + } + } } func (workers *Workers) Stop() *Workers { @@ -71,7 +85,10 @@ func (workers *Workers) Stop() *Workers { if workers.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable { workers.DataRetention.Stop() } - // workers.SearchIndexing.Stop() + + if workers.ElasticsearchIndexing != nil && *utils.Cfg.ElasticSearchSettings.EnableIndexing { + workers.ElasticsearchIndexing.Stop() + } l4g.Info("Stopped workers") diff --git a/model/post.go b/model/post.go index f5a398656..55e6f591d 100644 --- a/model/post.go +++ b/model/post.go @@ -62,6 +62,12 @@ type PostPatch struct { HasReactions *bool `json:"has_reactions"` } +type PostForIndexing struct { + Post + TeamId string `json:"team_id"` + ParentCreateAt *int64 `json:"parent_create_at"` +} + func (o *Post) ToJson() string { b, err := json.Marshal(o) if err != nil { diff --git a/store/sql_post_store.go b/store/sql_post_store.go index 6db2d5992..16142681c 100644 --- a/store/sql_post_store.go +++ b/store/sql_post_store.go @@ -1315,3 +1315,44 @@ func (s SqlPostStore) GetPostsByIds(postIds []string) StoreChannel { return storeChannel } + +func (s SqlPostStore) GetPostsBatchForIndexing(startTime int64, limit int) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + var posts []*model.PostForIndexing + _, err1 := s.GetSearchReplica().Select(&posts, + `(SELECT + Posts.*, + Channels.TeamId, + ParentPosts.CreateAt ParentCreateAt + FROM + Posts + LEFT JOIN + Channels + ON + Posts.ChannelId = Channels.Id + LEFT JOIN + Posts ParentPosts + ON + Posts.RootId = ParentPosts.Id + WHERE + Posts.CreateAt >= :StartTime + ORDER BY CreateAt ASC + LIMIT :NumPosts)`, + map[string]interface{}{"StartTime": startTime, "NumPosts": limit}) + + if err1 != nil { + result.Err = model.NewLocAppError("SqlPostStore.GetPostContext", "store.sql_post.get_posts_batch_for_indexing.get.app_error", nil, err1.Error()) + } else { + result.Data = posts + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} diff --git a/store/sql_post_store_test.go b/store/sql_post_store_test.go index 00d4185b4..27e816996 100644 --- a/store/sql_post_store_test.go +++ b/store/sql_post_store_test.go @@ -1592,3 +1592,72 @@ func TestPostStoreGetPostsByIds(t *testing.T) { t.Fatalf("Expected 2 posts in results. Got %v", len(ro5)) } } + +func TestPostStoreGetPostsBatchForIndexing(t *testing.T) { + Setup() + + c1 := &model.Channel{} + c1.TeamId = model.NewId() + c1.DisplayName = "Channel1" + c1.Name = "zz" + model.NewId() + "b" + c1.Type = model.CHANNEL_OPEN + c1 = (<-store.Channel().Save(c1)).Data.(*model.Channel) + + c2 := &model.Channel{} + c2.TeamId = model.NewId() + c2.DisplayName = "Channel2" + c2.Name = "zz" + model.NewId() + "b" + c2.Type = model.CHANNEL_OPEN + c2 = (<-store.Channel().Save(c2)).Data.(*model.Channel) + + o1 := &model.Post{} + o1.ChannelId = c1.Id + o1.UserId = model.NewId() + o1.Message = "zz" + model.NewId() + "AAAAAAAAAAA" + o1 = (<-store.Post().Save(o1)).Data.(*model.Post) + + o2 := &model.Post{} + o2.ChannelId = c2.Id + o2.UserId = model.NewId() + o2.Message = "zz" + model.NewId() + "CCCCCCCCC" + o2 = (<-store.Post().Save(o2)).Data.(*model.Post) + + o3 := &model.Post{} + o3.ChannelId = c1.Id + o3.UserId = model.NewId() + o3.ParentId = o1.Id + o3.RootId = o1.Id + o3.Message = "zz" + model.NewId() + "QQQQQQQQQQ" + o3 = (<-store.Post().Save(o3)).Data.(*model.Post) + + if r := Must(store.Post().GetPostsBatchForIndexing(o1.CreateAt, 100)).([]*model.PostForIndexing); len(r) != 3 { + t.Fatalf("Expected 3 posts in results. Got %v", len(r)) + } else { + for _, p := range r { + if p.Id == o1.Id { + if p.TeamId != c1.TeamId { + t.Fatalf("Unexpected team ID") + } + if p.ParentCreateAt != nil { + t.Fatalf("Unexpected parent create at") + } + } else if p.Id == o2.Id { + if p.TeamId != c2.TeamId { + t.Fatalf("Unexpected team ID") + } + if p.ParentCreateAt != nil { + t.Fatalf("Unexpected parent create at") + } + } else if p.Id == o3.Id { + if p.TeamId != c1.TeamId { + t.Fatalf("Unexpected team ID") + } + if *p.ParentCreateAt != o1.CreateAt { + t.Fatalf("Unexpected parent create at") + } + } else { + t.Fatalf("unexpected post returned") + } + } + } +} diff --git a/store/store.go b/store/store.go index 95496b609..062ed0fbd 100644 --- a/store/store.go +++ b/store/store.go @@ -168,6 +168,7 @@ type PostStore interface { GetPostsCreatedAt(channelId string, time int64) StoreChannel Overwrite(post *model.Post) StoreChannel GetPostsByIds(postIds []string) StoreChannel + GetPostsBatchForIndexing(startTime int64, limit int) StoreChannel } type UserStore interface { |