diff options
Diffstat (limited to 'jobs/jobs_watcher.go')
-rw-r--r-- | jobs/jobs_watcher.go | 26 |
1 files changed, 14 insertions, 12 deletions
diff --git a/jobs/jobs_watcher.go b/jobs/jobs_watcher.go index 5892e3b1c..83d4249eb 100644 --- a/jobs/jobs_watcher.go +++ b/jobs/jobs_watcher.go @@ -38,7 +38,7 @@ func (watcher *Watcher) Start() { rand.Seed(time.Now().UTC().UnixNano()) _ = <-time.After(time.Duration(rand.Intn(WATCHER_POLLING_INTERVAL)) * time.Millisecond) - defer func(){ + defer func() { l4g.Debug("Watcher Finished") watcher.stopped <- true }() @@ -64,25 +64,27 @@ func (watcher *Watcher) PollAndNotify() { if result := <-Srv.Store.Job().GetAllByStatus(model.JOB_STATUS_PENDING); result.Err != nil { l4g.Error("Error occured getting all pending statuses: %v", result.Err.Error()) } else { - jobStatuses := result.Data.([]*model.Job) + jobs := result.Data.([]*model.Job) - for _, js := range jobStatuses { - j := model.Job{ - Type: js.Type, - Id: js.Id, - } - - if js.Type == model.JOB_TYPE_DATA_RETENTION { + for _, job := range jobs { + if job.Type == model.JOB_TYPE_DATA_RETENTION { if watcher.workers.DataRetention != nil { select { - case watcher.workers.DataRetention.JobChannel() <- j: + case watcher.workers.DataRetention.JobChannel() <- *job: default: } } - } else if js.Type == model.JOB_TYPE_ELASTICSEARCH_POST_INDEXING { + } else if job.Type == model.JOB_TYPE_ELASTICSEARCH_POST_INDEXING { if watcher.workers.ElasticsearchIndexing != nil { select { - case watcher.workers.ElasticsearchIndexing.JobChannel() <- j: + case watcher.workers.ElasticsearchIndexing.JobChannel() <- *job: + default: + } + } + } else if job.Type == model.JOB_TYPE_ELASTICSEARCH_POST_AGGREGATION { + if watcher.workers.ElasticsearchAggregation != nil { + select { + case watcher.workers.ElasticsearchAggregation.JobChannel() <- *job: default: } } |