diff options
Diffstat (limited to 'store')
-rw-r--r-- | store/sql_job_status_store.go | 190 | ||||
-rw-r--r-- | store/sql_job_status_store_test.go | 151 | ||||
-rw-r--r-- | store/sql_store.go | 19 | ||||
-rw-r--r-- | store/store.go | 9 |
4 files changed, 369 insertions, 0 deletions
diff --git a/store/sql_job_status_store.go b/store/sql_job_status_store.go new file mode 100644 index 000000000..ef039d99a --- /dev/null +++ b/store/sql_job_status_store.go @@ -0,0 +1,190 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package store + +import ( + "database/sql" + "net/http" + + "github.com/mattermost/platform/model" +) + +type SqlJobStatusStore struct { + *SqlStore +} + +func NewSqlJobStatusStore(sqlStore *SqlStore) JobStatusStore { + s := &SqlJobStatusStore{sqlStore} + + for _, db := range sqlStore.GetAllConns() { + table := db.AddTableWithName(model.JobStatus{}, "JobStatuses").SetKeys(false, "Id") + table.ColMap("Id").SetMaxSize(26) + table.ColMap("Type").SetMaxSize(32) + table.ColMap("Status").SetMaxSize(32) + table.ColMap("Data").SetMaxSize(1024) + } + + return s +} + +func (jss SqlJobStatusStore) CreateIndexesIfNotExists() { + jss.CreateIndexIfNotExists("idx_jobstatuses_type", "JobStatuses", "Type") +} + +func (jss SqlJobStatusStore) SaveOrUpdate(status *model.JobStatus) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + if err := jss.GetReplica().SelectOne(&model.JobStatus{}, + `SELECT + * + FROM + JobStatuses + WHERE + Id = :Id`, map[string]interface{}{"Id": status.Id}); err == nil { + if _, err := jss.GetMaster().Update(status); err != nil { + result.Err = model.NewLocAppError("SqlJobStatusStore.SaveOrUpdate", + "store.sql_job_status.update.app_error", nil, "id="+status.Id+", "+err.Error()) + } + } else if err == sql.ErrNoRows { + if err := jss.GetMaster().Insert(status); err != nil { + result.Err = model.NewLocAppError("SqlJobStatusStore.SaveOrUpdate", + "store.sql_job_status.save.app_error", nil, "id="+status.Id+", "+err.Error()) + } + } else { + result.Err = model.NewLocAppError("SqlJobStatusStore.SaveOrUpdate", + "store.sql_job_status.save_or_update.app_error", nil, "id="+status.Id+", "+err.Error()) + } + + if result.Err == nil { + result.Data = status + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (jss SqlJobStatusStore) Get(id string) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + var status *model.JobStatus + + if err := jss.GetReplica().SelectOne(&status, + `SELECT + * + FROM + JobStatuses + WHERE + Id = :Id`, map[string]interface{}{"Id": id}); err != nil { + if err == sql.ErrNoRows { + result.Err = model.NewAppError("SqlJobStatusStore.Get", + "store.sql_job_status.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusNotFound) + } else { + result.Err = model.NewAppError("SqlJobStatusStore.Get", + "store.sql_job_status.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusInternalServerError) + } + } else { + result.Data = status + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (jss SqlJobStatusStore) GetAllByType(jobType string) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + var statuses []*model.JobStatus + + if _, err := jss.GetReplica().Select(&statuses, + `SELECT + * + FROM + JobStatuses + WHERE + Type = :Type`, map[string]interface{}{"Type": jobType}); err != nil { + result.Err = model.NewLocAppError("SqlJobStatusStore.GetAllByType", + "store.sql_job_status.get_all_by_type.app_error", nil, "Type="+jobType+", "+err.Error()) + } else { + result.Data = statuses + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (jss SqlJobStatusStore) GetAllByTypePage(jobType string, offset int, limit int) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + var statuses []*model.JobStatus + + if _, err := jss.GetReplica().Select(&statuses, + `SELECT + * + FROM + JobStatuses + WHERE + Type = :Type + ORDER BY + StartAt ASC + LIMIT + :Limit + OFFSET + :Offset`, map[string]interface{}{"Type": jobType, "Limit": limit, "Offset": offset}); err != nil { + result.Err = model.NewLocAppError("SqlJobStatusStore.GetAllByTypePage", + "store.sql_job_status.get_all_by_type_page.app_error", nil, "Type="+jobType+", "+err.Error()) + } else { + result.Data = statuses + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (jss SqlJobStatusStore) Delete(id string) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + if _, err := jss.GetReplica().Exec( + `DELETE FROM + JobStatuses + WHERE + Id = :Id`, map[string]interface{}{"Id": id}); err != nil { + result.Err = model.NewLocAppError("SqlJobStatusStore.DeleteByType", + "store.sql_job_status.delete.app_error", nil, "id="+id+", "+err.Error()) + } else { + result.Data = id + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} diff --git a/store/sql_job_status_store_test.go b/store/sql_job_status_store_test.go new file mode 100644 index 000000000..18c29e522 --- /dev/null +++ b/store/sql_job_status_store_test.go @@ -0,0 +1,151 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package store + +import ( + "testing" + + "github.com/mattermost/platform/model" +) + +func TestJobStatusSaveGetUpdate(t *testing.T) { + Setup() + + status := &model.JobStatus{ + Id: model.NewId(), + Type: model.NewId(), + Status: model.NewId(), + Data: map[string]interface{}{ + "Processed": 0, + "Total": 12345, + "LastProcessed": "abcd", + }, + } + + if result := <-store.JobStatus().SaveOrUpdate(status); result.Err != nil { + t.Fatal(result.Err) + } + + defer func() { + <-store.JobStatus().Delete(status.Id) + }() + + if result := <-store.JobStatus().Get(status.Id); result.Err != nil { + t.Fatal(result.Err) + } else if received := result.Data.(*model.JobStatus); received.Id != status.Id { + t.Fatal("received incorrect status after save") + } + + status.Status = model.NewId() + status.Data = map[string]interface{}{ + "Processed": 12345, + "Total": 12345, + "LastProcessed": "abcd", + } + + if result := <-store.JobStatus().SaveOrUpdate(status); result.Err != nil { + t.Fatal(result.Err) + } + + if result := <-store.JobStatus().Get(status.Id); result.Err != nil { + t.Fatal(result.Err) + } else if received := result.Data.(*model.JobStatus); received.Id != status.Id || received.Status != status.Status { + t.Fatal("received incorrect status after update") + } +} + +func TestJobStatusGetAllByType(t *testing.T) { + Setup() + + jobType := model.NewId() + + statuses := []*model.JobStatus{ + { + Id: model.NewId(), + Type: jobType, + }, + { + Id: model.NewId(), + Type: jobType, + }, + { + Id: model.NewId(), + Type: model.NewId(), + }, + } + + for _, status := range statuses { + Must(store.JobStatus().SaveOrUpdate(status)) + defer store.JobStatus().Delete(status.Id) + } + + if result := <-store.JobStatus().GetAllByType(jobType); result.Err != nil { + t.Fatal(result.Err) + } else if received := result.Data.([]*model.JobStatus); len(received) != 2 { + t.Fatal("received wrong number of statuses") + } else if received[0].Id != statuses[0].Id && received[1].Id != statuses[0].Id { + t.Fatal("should've received first status") + } else if received[0].Id != statuses[1].Id && received[1].Id != statuses[1].Id { + t.Fatal("should've received second status") + } +} + +func TestJobStatusGetAllByTypePage(t *testing.T) { + Setup() + + jobType := model.NewId() + + statuses := []*model.JobStatus{ + { + Id: model.NewId(), + Type: jobType, + StartAt: 1000, + }, + { + Id: model.NewId(), + Type: jobType, + StartAt: 999, + }, + { + Id: model.NewId(), + Type: jobType, + StartAt: 1001, + }, + } + + for _, status := range statuses { + Must(store.JobStatus().SaveOrUpdate(status)) + defer store.JobStatus().Delete(status.Id) + } + + if result := <-store.JobStatus().GetAllByTypePage(jobType, 0, 2); result.Err != nil { + t.Fatal(result.Err) + } else if received := result.Data.([]*model.JobStatus); len(received) != 2 { + t.Fatal("received wrong number of statuses") + } else if received[0].Id != statuses[1].Id { + t.Fatal("should've received newest job first") + } else if received[1].Id != statuses[0].Id { + t.Fatal("should've received second newest job second") + } + + if result := <-store.JobStatus().GetAllByTypePage(jobType, 2, 2); result.Err != nil { + t.Fatal(result.Err) + } else if received := result.Data.([]*model.JobStatus); len(received) != 1 { + t.Fatal("received wrong number of statuses") + } else if received[0].Id != statuses[2].Id { + t.Fatal("should've received oldest job last") + } +} + +func TestJobStatusDelete(t *testing.T) { + Setup() + + status := Must(store.JobStatus().SaveOrUpdate(&model.JobStatus{ + Id: model.NewId(), + })).(*model.JobStatus) + + if result := <-store.JobStatus().Delete(status.Id); result.Err != nil { + t.Fatal(result.Err) + } +} diff --git a/store/sql_store.go b/store/sql_store.go index f13fe2ec0..4261c849a 100644 --- a/store/sql_store.go +++ b/store/sql_store.go @@ -87,6 +87,7 @@ type SqlStore struct { status StatusStore fileInfo FileInfoStore reaction ReactionStore + jobStatus JobStatusStore SchemaVersion string rrCounter int64 srCounter int64 @@ -151,6 +152,7 @@ func NewSqlStore() Store { sqlStore.status = NewSqlStatusStore(sqlStore) sqlStore.fileInfo = NewSqlFileInfoStore(sqlStore) sqlStore.reaction = NewSqlReactionStore(sqlStore) + sqlStore.jobStatus = NewSqlJobStatusStore(sqlStore) err := sqlStore.master.CreateTablesIfNotExists() if err != nil { @@ -179,6 +181,7 @@ func NewSqlStore() Store { sqlStore.status.(*SqlStatusStore).CreateIndexesIfNotExists() sqlStore.fileInfo.(*SqlFileInfoStore).CreateIndexesIfNotExists() sqlStore.reaction.(*SqlReactionStore).CreateIndexesIfNotExists() + sqlStore.jobStatus.(*SqlJobStatusStore).CreateIndexesIfNotExists() sqlStore.preference.(*SqlPreferenceStore).DeleteUnusedFeatures() @@ -735,6 +738,10 @@ func (ss *SqlStore) Reaction() ReactionStore { return ss.reaction } +func (ss *SqlStore) JobStatus() JobStatusStore { + return ss.jobStatus +} + func (ss *SqlStore) DropAllTables() { ss.master.TruncateTables() } @@ -752,6 +759,8 @@ func (me mattermConverter) ToDb(val interface{}) (interface{}, error) { return encrypt([]byte(utils.Cfg.SqlSettings.AtRestEncryptKey), model.MapToJson(t)) case model.StringInterface: return model.StringInterfaceToJson(t), nil + case map[string]interface{}: + return model.StringInterfaceToJson(model.StringInterface(t)), nil } return val, nil @@ -805,6 +814,16 @@ func (me mattermConverter) FromDb(target interface{}) (gorp.CustomScanner, bool) return json.Unmarshal(b, target) } return gorp.CustomScanner{Holder: new(string), Target: target, Binder: binder}, true + case *map[string]interface{}: + binder := func(holder, target interface{}) error { + s, ok := holder.(*string) + if !ok { + return errors.New(utils.T("store.sql.convert_string_interface")) + } + b := []byte(*s) + return json.Unmarshal(b, target) + } + return gorp.CustomScanner{Holder: new(string), Target: target, Binder: binder}, true } return gorp.CustomScanner{}, false diff --git a/store/store.go b/store/store.go index acbeafdd6..cd7792ce1 100644 --- a/store/store.go +++ b/store/store.go @@ -47,6 +47,7 @@ type Store interface { Status() StatusStore FileInfo() FileInfoStore Reaction() ReactionStore + JobStatus() JobStatusStore MarkSystemRanUnitTests() Close() DropAllTables() @@ -371,3 +372,11 @@ type ReactionStore interface { GetForPost(postId string, allowFromCache bool) StoreChannel DeleteAllWithEmojiName(emojiName string) StoreChannel } + +type JobStatusStore interface { + SaveOrUpdate(status *model.JobStatus) StoreChannel + Get(id string) StoreChannel + GetAllByType(jobType string) StoreChannel + GetAllByTypePage(jobType string, offset int, limit int) StoreChannel + Delete(id string) StoreChannel +} |