diff options
Diffstat (limited to 'store/sqlstore/job_store.go')
-rw-r--r-- | store/sqlstore/job_store.go | 349 |
1 files changed, 349 insertions, 0 deletions
diff --git a/store/sqlstore/job_store.go b/store/sqlstore/job_store.go new file mode 100644 index 000000000..c56f526af --- /dev/null +++ b/store/sqlstore/job_store.go @@ -0,0 +1,349 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package sqlstore + +import ( + "database/sql" + "net/http" + + "github.com/mattermost/gorp" + "github.com/mattermost/mattermost-server/model" + "github.com/mattermost/mattermost-server/store" +) + +type SqlJobStore struct { + SqlStore +} + +func NewSqlJobStore(sqlStore SqlStore) store.JobStore { + s := &SqlJobStore{sqlStore} + + for _, db := range sqlStore.GetAllConns() { + table := db.AddTableWithName(model.Job{}, "Jobs").SetKeys(false, "Id") + table.ColMap("Id").SetMaxSize(26) + table.ColMap("Type").SetMaxSize(32) + table.ColMap("Status").SetMaxSize(32) + table.ColMap("Data").SetMaxSize(1024) + } + + return s +} + +func (jss SqlJobStore) CreateIndexesIfNotExists() { + jss.CreateIndexIfNotExists("idx_jobs_type", "Jobs", "Type") +} + +func (jss SqlJobStore) Save(job *model.Job) store.StoreChannel { + storeChannel := make(store.StoreChannel, 1) + + go func() { + result := store.StoreResult{} + if err := jss.GetMaster().Insert(job); err != nil { + result.Err = model.NewAppError("SqlJobStore.Save", "store.sql_job.save.app_error", nil, "id="+job.Id+", "+err.Error(), http.StatusInternalServerError) + } else { + result.Data = job + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (jss SqlJobStore) UpdateOptimistically(job *model.Job, currentStatus string) store.StoreChannel { + storeChannel := make(store.StoreChannel, 1) + + go func() { + result := store.StoreResult{} + + if sqlResult, err := jss.GetMaster().Exec( + `UPDATE + Jobs + SET + LastActivityAt = :LastActivityAt, + Status = :Status, + Progress = :Progress, + Data = :Data + WHERE + Id = :Id + AND + Status = :OldStatus`, + map[string]interface{}{ + "Id": job.Id, + "OldStatus": currentStatus, + "LastActivityAt": model.GetMillis(), + "Status": job.Status, + "Data": job.DataToJson(), + "Progress": job.Progress, + }); err != nil { + result.Err = model.NewAppError("SqlJobStore.UpdateOptimistically", "store.sql_job.update.app_error", nil, "id="+job.Id+", "+err.Error(), http.StatusInternalServerError) + } else { + rows, err := sqlResult.RowsAffected() + + if err != nil { + result.Err = model.NewAppError("SqlJobStore.UpdateStatus", "store.sql_job.update.app_error", nil, "id="+job.Id+", "+err.Error(), http.StatusInternalServerError) + } else { + if rows == 1 { + result.Data = true + } else { + result.Data = false + } + } + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (jss SqlJobStore) UpdateStatus(id string, status string) store.StoreChannel { + storeChannel := make(store.StoreChannel, 1) + + go func() { + result := store.StoreResult{} + + job := &model.Job{ + Id: id, + Status: status, + LastActivityAt: model.GetMillis(), + } + + if _, err := jss.GetMaster().UpdateColumns(func(col *gorp.ColumnMap) bool { + return col.ColumnName == "Status" || col.ColumnName == "LastActivityAt" + }, job); err != nil { + result.Err = model.NewAppError("SqlJobStore.UpdateStatus", "store.sql_job.update.app_error", nil, "id="+id+", "+err.Error(), http.StatusInternalServerError) + } + + if result.Err == nil { + result.Data = job + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (jss SqlJobStore) UpdateStatusOptimistically(id string, currentStatus string, newStatus string) store.StoreChannel { + storeChannel := make(store.StoreChannel, 1) + + go func() { + result := store.StoreResult{} + + var startAtClause string + if newStatus == model.JOB_STATUS_IN_PROGRESS { + startAtClause = `StartAt = :StartAt,` + } + + if sqlResult, err := jss.GetMaster().Exec( + `UPDATE + Jobs + SET `+startAtClause+` + Status = :NewStatus, + LastActivityAt = :LastActivityAt + WHERE + Id = :Id + AND + Status = :OldStatus`, map[string]interface{}{"Id": id, "OldStatus": currentStatus, "NewStatus": newStatus, "StartAt": model.GetMillis(), "LastActivityAt": model.GetMillis()}); err != nil { + result.Err = model.NewAppError("SqlJobStore.UpdateStatus", "store.sql_job.update.app_error", nil, "id="+id+", "+err.Error(), http.StatusInternalServerError) + } else { + rows, err := sqlResult.RowsAffected() + + if err != nil { + result.Err = model.NewAppError("SqlJobStore.UpdateStatus", "store.sql_job.update.app_error", nil, "id="+id+", "+err.Error(), http.StatusInternalServerError) + } else { + if rows == 1 { + result.Data = true + } else { + result.Data = false + } + } + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (jss SqlJobStore) Get(id string) store.StoreChannel { + storeChannel := make(store.StoreChannel, 1) + + go func() { + result := store.StoreResult{} + + var status *model.Job + + if err := jss.GetReplica().SelectOne(&status, + `SELECT + * + FROM + Jobs + WHERE + Id = :Id`, map[string]interface{}{"Id": id}); err != nil { + if err == sql.ErrNoRows { + result.Err = model.NewAppError("SqlJobStore.Get", "store.sql_job.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusNotFound) + } else { + result.Err = model.NewAppError("SqlJobStore.Get", "store.sql_job.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusInternalServerError) + } + } else { + result.Data = status + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (jss SqlJobStore) GetAllPage(offset int, limit int) store.StoreChannel { + storeChannel := make(store.StoreChannel, 1) + + go func() { + result := store.StoreResult{} + + var statuses []*model.Job + + if _, err := jss.GetReplica().Select(&statuses, + `SELECT + * + FROM + Jobs + ORDER BY + CreateAt DESC + LIMIT + :Limit + OFFSET + :Offset`, map[string]interface{}{"Limit": limit, "Offset": offset}); err != nil { + result.Err = model.NewAppError("SqlJobStore.GetAllPage", "store.sql_job.get_all.app_error", nil, err.Error(), http.StatusInternalServerError) + } else { + result.Data = statuses + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (jss SqlJobStore) GetAllByType(jobType string) store.StoreChannel { + storeChannel := make(store.StoreChannel, 1) + + go func() { + result := store.StoreResult{} + + var statuses []*model.Job + + if _, err := jss.GetReplica().Select(&statuses, + `SELECT + * + FROM + Jobs + WHERE + Type = :Type + ORDER BY + CreateAt DESC`, map[string]interface{}{"Type": jobType}); err != nil { + result.Err = model.NewAppError("SqlJobStore.GetAllByType", "store.sql_job.get_all.app_error", nil, "Type="+jobType+", "+err.Error(), http.StatusInternalServerError) + } else { + result.Data = statuses + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (jss SqlJobStore) GetAllByTypePage(jobType string, offset int, limit int) store.StoreChannel { + storeChannel := make(store.StoreChannel, 1) + + go func() { + result := store.StoreResult{} + + var statuses []*model.Job + + if _, err := jss.GetReplica().Select(&statuses, + `SELECT + * + FROM + Jobs + WHERE + Type = :Type + ORDER BY + CreateAt DESC + LIMIT + :Limit + OFFSET + :Offset`, map[string]interface{}{"Type": jobType, "Limit": limit, "Offset": offset}); err != nil { + result.Err = model.NewAppError("SqlJobStore.GetAllByTypePage", "store.sql_job.get_all.app_error", nil, "Type="+jobType+", "+err.Error(), http.StatusInternalServerError) + } else { + result.Data = statuses + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (jss SqlJobStore) GetAllByStatus(status string) store.StoreChannel { + storeChannel := make(store.StoreChannel, 1) + + go func() { + result := store.StoreResult{} + + var statuses []*model.Job + + if _, err := jss.GetReplica().Select(&statuses, + `SELECT + * + FROM + Jobs + WHERE + Status = :Status + ORDER BY + CreateAt ASC`, map[string]interface{}{"Status": status}); err != nil { + result.Err = model.NewAppError("SqlJobStore.GetAllByStatus", "store.sql_job.get_all.app_error", nil, "Status="+status+", "+err.Error(), http.StatusInternalServerError) + } else { + result.Data = statuses + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (jss SqlJobStore) Delete(id string) store.StoreChannel { + storeChannel := make(store.StoreChannel, 1) + + go func() { + result := store.StoreResult{} + + if _, err := jss.GetMaster().Exec( + `DELETE FROM + Jobs + WHERE + Id = :Id`, map[string]interface{}{"Id": id}); err != nil { + result.Err = model.NewAppError("SqlJobStore.DeleteByType", "store.sql_job.delete.app_error", nil, "id="+id+", "+err.Error(), http.StatusInternalServerError) + } else { + result.Data = id + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} |