diff options
-rw-r--r-- | api4/apitestlib.go | 9 | ||||
-rw-r--r-- | api4/job.go | 81 | ||||
-rw-r--r-- | api4/job_test.go | 182 | ||||
-rw-r--r-- | app/job.go | 21 | ||||
-rw-r--r-- | app/job_test.go | 14 | ||||
-rw-r--r-- | i18n/en.json | 24 | ||||
-rw-r--r-- | jobs/jobs.go | 16 | ||||
-rw-r--r-- | model/authorization.go | 7 | ||||
-rw-r--r-- | model/client4.go | 36 | ||||
-rw-r--r-- | model/job.go | 37 | ||||
-rw-r--r-- | store/sql_job_store.go | 38 | ||||
-rw-r--r-- | store/sql_job_store_test.go | 76 | ||||
-rw-r--r-- | store/store.go | 1 |
13 files changed, 471 insertions, 71 deletions
diff --git a/api4/apitestlib.go b/api4/apitestlib.go index 537d8610c..d70b9e5f6 100644 --- a/api4/apitestlib.go +++ b/api4/apitestlib.go @@ -24,6 +24,7 @@ import ( "github.com/mattermost/platform/wsapi" s3 "github.com/minio/minio-go" + "github.com/mattermost/platform/jobs" ) type TestHelper struct { @@ -68,6 +69,10 @@ func SetupEnterprise() *TestHelper { *utils.Cfg.TeamSettings.EnableOpenServer = true } + if jobs.Srv.Store == nil { + jobs.Srv.Store = app.Srv.Store + } + th := &TestHelper{} th.Client = th.CreateClient() th.SystemAdminClient = th.CreateClient() @@ -99,6 +104,10 @@ func Setup() *TestHelper { *utils.Cfg.TeamSettings.EnableOpenServer = true } + if jobs.Srv.Store == nil { + jobs.Srv.Store = app.Srv.Store + } + th := &TestHelper{} th.Client = th.CreateClient() th.SystemAdminClient = th.CreateClient() diff --git a/api4/job.go b/api4/job.go index e6c17c42d..941e5d543 100644 --- a/api4/job.go +++ b/api4/job.go @@ -14,8 +14,11 @@ import ( func InitJob() { l4g.Info("Initializing job API routes") - BaseRoutes.Jobs.Handle("/type/{job_type:[A-Za-z0-9_-]+}/statuses", ApiSessionRequired(getJobsByType)).Methods("GET") - BaseRoutes.Jobs.Handle("/{job_id:[A-Za-z0-9]+}/status", ApiSessionRequired(getJob)).Methods("GET") + BaseRoutes.Jobs.Handle("", ApiSessionRequired(getJobs)).Methods("GET") + BaseRoutes.Jobs.Handle("", ApiSessionRequired(createJob)).Methods("POST") + BaseRoutes.Jobs.Handle("/{job_id:[A-Za-z0-9]+}", ApiSessionRequired(getJob)).Methods("GET") + BaseRoutes.Jobs.Handle("/{job_id:[A-Za-z0-9]+}/cancel", ApiSessionRequired(cancelJob)).Methods("POST") + BaseRoutes.Jobs.Handle("/type/{job_type:[A-Za-z0-9_-]+}", ApiSessionRequired(getJobsByType)).Methods("GET") } func getJob(c *Context, w http.ResponseWriter, r *http.Request) { @@ -24,16 +27,55 @@ func getJob(c *Context, w http.ResponseWriter, r *http.Request) { return } - if !app.SessionHasPermissionTo(c.Session, model.PERMISSION_MANAGE_SYSTEM) { - c.SetPermissionError(model.PERMISSION_MANAGE_SYSTEM) + if !app.SessionHasPermissionTo(c.Session, model.PERMISSION_MANAGE_JOBS) { + c.SetPermissionError(model.PERMISSION_MANAGE_JOBS) return } - if status, err := app.GetJob(c.Params.JobId); err != nil { + if job, err := app.GetJob(c.Params.JobId); err != nil { c.Err = err return } else { - w.Write([]byte(status.ToJson())) + w.Write([]byte(job.ToJson())) + } +} + +func createJob(c *Context, w http.ResponseWriter, r *http.Request) { + job := model.JobFromJson(r.Body) + if job == nil { + c.SetInvalidParam("job") + return + } + + if !app.SessionHasPermissionTo(c.Session, model.PERMISSION_MANAGE_JOBS) { + c.SetPermissionError(model.PERMISSION_MANAGE_JOBS) + return + } + + if job, err := app.CreateJob(job); err != nil { + c.Err = err + return + } else { + w.WriteHeader(http.StatusCreated) + w.Write([]byte(job.ToJson())) + } +} + +func getJobs(c *Context, w http.ResponseWriter, r *http.Request) { + if c.Err != nil { + return + } + + if !app.SessionHasPermissionTo(c.Session, model.PERMISSION_MANAGE_JOBS) { + c.SetPermissionError(model.PERMISSION_MANAGE_JOBS) + return + } + + if jobs, err := app.GetJobsPage(c.Params.Page, c.Params.PerPage); err != nil { + c.Err = err + return + } else { + w.Write([]byte(model.JobsToJson(jobs))) } } @@ -43,15 +85,34 @@ func getJobsByType(c *Context, w http.ResponseWriter, r *http.Request) { return } - if !app.SessionHasPermissionTo(c.Session, model.PERMISSION_MANAGE_SYSTEM) { - c.SetPermissionError(model.PERMISSION_MANAGE_SYSTEM) + if !app.SessionHasPermissionTo(c.Session, model.PERMISSION_MANAGE_JOBS) { + c.SetPermissionError(model.PERMISSION_MANAGE_JOBS) return } - if statuses, err := app.GetJobsByTypePage(c.Params.JobType, c.Params.Page, c.Params.PerPage); err != nil { + if jobs, err := app.GetJobsByTypePage(c.Params.JobType, c.Params.Page, c.Params.PerPage); err != nil { c.Err = err return } else { - w.Write([]byte(model.JobsToJson(statuses))) + w.Write([]byte(model.JobsToJson(jobs))) } } + +func cancelJob(c *Context, w http.ResponseWriter, r *http.Request) { + c.RequireJobId() + if c.Err != nil { + return + } + + if !app.SessionHasPermissionTo(c.Session, model.PERMISSION_MANAGE_JOBS) { + c.SetPermissionError(model.PERMISSION_MANAGE_JOBS) + return + } + + if err := app.CancelJob(c.Params.JobId); err != nil { + c.Err = err + return + } + + ReturnStatusOK(w) +} diff --git a/api4/job_test.go b/api4/job_test.go index 8bbea83e1..3dcdbe58b 100644 --- a/api4/job_test.go +++ b/api4/job_test.go @@ -12,74 +12,157 @@ import ( "github.com/mattermost/platform/store" ) -func TestGetJobStatus(t *testing.T) { +func TestCreateJob(t *testing.T) { th := Setup().InitBasic().InitSystemAdmin() defer TearDown() - status := &model.Job{ + job := &model.Job{ + Type: model.JOB_TYPE_DATA_RETENTION, + Data: map[string]interface{}{ + "thing": "stuff", + }, + } + + received, resp := th.SystemAdminClient.CreateJob(job) + CheckNoError(t, resp) + + defer app.Srv.Store.Job().Delete(received.Id) + + job = &model.Job{ + Type: model.NewId(), + } + + _, resp = th.SystemAdminClient.CreateJob(job) + CheckBadRequestStatus(t, resp) + + _, resp = th.Client.CreateJob(job) + CheckForbiddenStatus(t, resp) +} + +func TestGetJob(t *testing.T) { + th := Setup().InitBasic().InitSystemAdmin() + defer TearDown() + + job := &model.Job{ Id: model.NewId(), - Status: model.NewId(), + Status: model.JOB_STATUS_PENDING, } - if result := <-app.Srv.Store.Job().Save(status); result.Err != nil { + if result := <-app.Srv.Store.Job().Save(job); result.Err != nil { t.Fatal(result.Err) } - defer app.Srv.Store.Job().Delete(status.Id) + defer app.Srv.Store.Job().Delete(job.Id) - received, resp := th.SystemAdminClient.GetJob(status.Id) + received, resp := th.SystemAdminClient.GetJob(job.Id) CheckNoError(t, resp) - if received.Id != status.Id || received.Status != status.Status { - t.Fatal("incorrect job status received") + if received.Id != job.Id || received.Status != job.Status { + t.Fatal("incorrect job received") } _, resp = th.SystemAdminClient.GetJob("1234") CheckBadRequestStatus(t, resp) - _, resp = th.Client.GetJob(status.Id) + _, resp = th.Client.GetJob(job.Id) CheckForbiddenStatus(t, resp) _, resp = th.SystemAdminClient.GetJob(model.NewId()) CheckNotFoundStatus(t, resp) } -func TestGetJobStatusesByType(t *testing.T) { +func TestGetJobs(t *testing.T) { + th := Setup().InitBasic().InitSystemAdmin() + defer TearDown() + + jobType := model.NewId() + + jobs := []*model.Job{ + { + Id: model.NewId(), + Type: jobType, + CreateAt: model.GetMillis() + 1, + }, + { + Id: model.NewId(), + Type: jobType, + CreateAt: model.GetMillis(), + }, + { + Id: model.NewId(), + Type: jobType, + CreateAt: model.GetMillis() + 2, + }, + } + + for _, job := range jobs { + store.Must(app.Srv.Store.Job().Save(job)) + defer app.Srv.Store.Job().Delete(job.Id) + } + + received, resp := th.SystemAdminClient.GetJobs(0, 2) + CheckNoError(t, resp) + + if len(received) != 2 { + t.Fatal("received wrong number of jobs") + } else if received[0].Id != jobs[2].Id { + t.Fatal("should've received newest job first") + } else if received[1].Id != jobs[0].Id { + t.Fatal("should've received second newest job second") + } + + received, resp = th.SystemAdminClient.GetJobs(1, 2) + CheckNoError(t, resp) + + if received[0].Id != jobs[1].Id { + t.Fatal("should've received oldest job last") + } + + _, resp = th.Client.GetJobs(0, 60) + CheckForbiddenStatus(t, resp) +} + +func TestGetJobsByType(t *testing.T) { th := Setup().InitBasic().InitSystemAdmin() defer TearDown() jobType := model.NewId() - statuses := []*model.Job{ + jobs := []*model.Job{ { - Id: model.NewId(), - Type: jobType, - StartAt: 1000, + Id: model.NewId(), + Type: jobType, + CreateAt: 1000, }, { - Id: model.NewId(), - Type: jobType, - StartAt: 999, + Id: model.NewId(), + Type: jobType, + CreateAt: 999, }, { - Id: model.NewId(), - Type: jobType, - StartAt: 1001, + Id: model.NewId(), + Type: jobType, + CreateAt: 1001, + }, + { + Id: model.NewId(), + Type: model.NewId(), + CreateAt: 1002, }, } - for _, status := range statuses { - store.Must(app.Srv.Store.Job().Save(status)) - defer app.Srv.Store.Job().Delete(status.Id) + for _, job := range jobs { + store.Must(app.Srv.Store.Job().Save(job)) + defer app.Srv.Store.Job().Delete(job.Id) } received, resp := th.SystemAdminClient.GetJobsByType(jobType, 0, 2) CheckNoError(t, resp) if len(received) != 2 { - t.Fatal("received wrong number of statuses") - } else if received[0].Id != statuses[1].Id { + t.Fatal("received wrong number of jobs") + } else if received[0].Id != jobs[2].Id { t.Fatal("should've received newest job first") - } else if received[1].Id != statuses[0].Id { + } else if received[1].Id != jobs[0].Id { t.Fatal("should've received second newest job second") } @@ -87,8 +170,8 @@ func TestGetJobStatusesByType(t *testing.T) { CheckNoError(t, resp) if len(received) != 1 { - t.Fatal("received wrong number of statuses") - } else if received[0].Id != statuses[2].Id { + t.Fatal("received wrong number of jobs") + } else if received[0].Id != jobs[1].Id { t.Fatal("should've received oldest job last") } @@ -101,3 +184,46 @@ func TestGetJobStatusesByType(t *testing.T) { _, resp = th.Client.GetJobsByType(jobType, 0, 60) CheckForbiddenStatus(t, resp) } + +func TestCancelJob(t *testing.T) { + th := Setup().InitBasic().InitSystemAdmin() + defer TearDown() + + jobs := []*model.Job{ + { + Id: model.NewId(), + Type: model.NewId(), + Status: model.JOB_STATUS_PENDING, + }, + { + Id: model.NewId(), + Type: model.NewId(), + Status: model.JOB_STATUS_IN_PROGRESS, + }, + { + Id: model.NewId(), + Type: model.NewId(), + Status: model.JOB_STATUS_SUCCESS, + }, + } + + for _, job := range jobs { + store.Must(app.Srv.Store.Job().Save(job)) + defer app.Srv.Store.Job().Delete(job.Id) + } + + _, resp := th.Client.CancelJob(jobs[0].Id) + CheckForbiddenStatus(t, resp) + + _, resp = th.SystemAdminClient.CancelJob(jobs[0].Id) + CheckNoError(t, resp) + + _, resp = th.SystemAdminClient.CancelJob(jobs[1].Id) + CheckNoError(t, resp) + + _, resp = th.SystemAdminClient.CancelJob(jobs[2].Id) + CheckInternalErrorStatus(t, resp) + + _, resp = th.SystemAdminClient.CancelJob(model.NewId()) + CheckInternalErrorStatus(t, resp) +} diff --git a/app/job.go b/app/job.go index c625ce15f..36c0b1992 100644 --- a/app/job.go +++ b/app/job.go @@ -5,6 +5,7 @@ package app import ( "github.com/mattermost/platform/model" + "github.com/mattermost/platform/jobs" ) func GetJob(id string) (*model.Job, *model.AppError) { @@ -15,6 +16,18 @@ func GetJob(id string) (*model.Job, *model.AppError) { } } +func GetJobsPage(page int, perPage int) ([]*model.Job, *model.AppError) { + return GetJobs(page*perPage, perPage) +} + +func GetJobs(offset int, limit int) ([]*model.Job, *model.AppError) { + if result := <-Srv.Store.Job().GetAllPage(offset, limit); result.Err != nil { + return nil, result.Err + } else { + return result.Data.([]*model.Job), nil + } +} + func GetJobsByTypePage(jobType string, page int, perPage int) ([]*model.Job, *model.AppError) { return GetJobsByType(jobType, page*perPage, perPage) } @@ -26,3 +39,11 @@ func GetJobsByType(jobType string, offset int, limit int) ([]*model.Job, *model. return result.Data.([]*model.Job), nil } } + +func CreateJob(job *model.Job) (*model.Job, *model.AppError) { + return jobs.CreateJob(job.Type, job.Data) +} + +func CancelJob(jobId string) *model.AppError { + return jobs.RequestCancellation(jobId) +} diff --git a/app/job_test.go b/app/job_test.go index ced65788f..8f068901a 100644 --- a/app/job_test.go +++ b/app/job_test.go @@ -10,7 +10,7 @@ import ( "github.com/mattermost/platform/store" ) -func TestGetJobStatus(t *testing.T) { +func TestGetJob(t *testing.T) { Setup() status := &model.Job{ @@ -30,7 +30,7 @@ func TestGetJobStatus(t *testing.T) { } } -func TestGetJobStatusesByType(t *testing.T) { +func TestGetJobByType(t *testing.T) { Setup() jobType := model.NewId() @@ -39,17 +39,17 @@ func TestGetJobStatusesByType(t *testing.T) { { Id: model.NewId(), Type: jobType, - StartAt: 1000, + CreateAt: 1000, }, { Id: model.NewId(), Type: jobType, - StartAt: 999, + CreateAt: 999, }, { Id: model.NewId(), Type: jobType, - StartAt: 1001, + CreateAt: 1001, }, } @@ -62,7 +62,7 @@ func TestGetJobStatusesByType(t *testing.T) { t.Fatal(err) } else if len(received) != 2 { t.Fatal("received wrong number of statuses") - } else if received[0].Id != statuses[1].Id { + } else if received[0].Id != statuses[2].Id { t.Fatal("should've received newest job first") } else if received[1].Id != statuses[0].Id { t.Fatal("should've received second newest job second") @@ -72,7 +72,7 @@ func TestGetJobStatusesByType(t *testing.T) { t.Fatal(err) } else if len(received) != 1 { t.Fatal("received wrong number of statuses") - } else if received[0].Id != statuses[2].Id { + } else if received[0].Id != statuses[1].Id { t.Fatal("should've received oldest job last") } } diff --git a/i18n/en.json b/i18n/en.json index 27e65c6ba..90a08f7dc 100644 --- a/i18n/en.json +++ b/i18n/en.json @@ -3372,6 +3372,14 @@ "translation": "Create Teams" }, { + "id": "authentication.permissions.manage_jobs.description", + "translation": "Ability to manage jobs" + }, + { + "id": "authentication.permissions.manage_jobs.name", + "translation": "Manage Jobs" + }, + { "id": "authentication.permissions.manage_team_roles.description", "translation": "Ability to change the roles of a team member" }, @@ -4472,6 +4480,22 @@ "translation": "Invalid user id" }, { + "id": "model.job.is_valid.id.app_error", + "translation": "Invalid job Id" + }, + { + "id": "model.job.is_valid.create_at.app_error", + "translation": "Create at must be a valid time" + }, + { + "id": "model.job.is_valid.type.app_error", + "translation": "Invalid job type" + }, + { + "id": "model.job.is_valid.status.app_error", + "translation": "Invalid job status" + }, + { "id": "model.oauth.is_valid.app_id.app_error", "translation": "Invalid app id" }, diff --git a/jobs/jobs.go b/jobs/jobs.go index 9247355d0..1986b22b6 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -25,6 +25,10 @@ func CreateJob(jobType string, jobData map[string]interface{}) (*model.Job, *mod Data: jobData, } + if err := job.IsValid(); err != nil { + return nil, err + } + if result := <-Srv.Store.Job().Save(&job); result.Err != nil { return nil, result.Err } @@ -41,7 +45,7 @@ func ClaimJob(job *model.Job) (bool, *model.AppError) { } } -func SetJobProgress(job *model.Job, progress int64) (*model.AppError) { +func SetJobProgress(job *model.Job, progress int64) *model.AppError { job.Status = model.JOB_STATUS_IN_PROGRESS job.Progress = progress @@ -78,7 +82,7 @@ func SetJobError(job *model.Job, jobError *model.AppError) *model.AppError { return result.Err } else { if !result.Data.(bool) { - return model.NewAppError("Jobs.SetJobError", "jobs.set_job_error.update.error", nil, "id=" + job.Id, http.StatusInternalServerError) + return model.NewAppError("Jobs.SetJobError", "jobs.set_job_error.update.error", nil, "id="+job.Id, http.StatusInternalServerError) } } } @@ -92,20 +96,20 @@ func SetJobCanceled(job *model.Job) *model.AppError { return result.Err } -func RequestCancellation(job *model.Job) *model.AppError { - if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_CANCELED); result.Err != nil { +func RequestCancellation(jobId string) *model.AppError { + if result := <-Srv.Store.Job().UpdateStatusOptimistically(jobId, model.JOB_STATUS_PENDING, model.JOB_STATUS_CANCELED); result.Err != nil { return result.Err } else if result.Data.(bool) { return nil } - if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil { + if result := <-Srv.Store.Job().UpdateStatusOptimistically(jobId, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil { return result.Err } else if result.Data.(bool) { return nil } - return model.NewAppError("Jobs.RequestCancellation", "jobs.request_cancellation.status.error", nil, "id=" + job.Id, http.StatusInternalServerError) + return model.NewAppError("Jobs.RequestCancellation", "jobs.request_cancellation.status.error", nil, "id="+jobId, http.StatusInternalServerError) } func CancellationWatcher(ctx context.Context, jobId string, cancelChan chan interface{}) { diff --git a/model/authorization.go b/model/authorization.go index 458ed1bdb..880d25e27 100644 --- a/model/authorization.go +++ b/model/authorization.go @@ -58,6 +58,7 @@ var PERMISSION_MANAGE_TEAM *Permission var PERMISSION_IMPORT_TEAM *Permission var PERMISSION_VIEW_TEAM *Permission var PERMISSION_LIST_USERS_WITHOUT_TEAM *Permission +var PERMISSION_MANAGE_JOBS *Permission // General permission that encompases all system admin functions // in the future this could be broken up to allow access to some @@ -292,6 +293,11 @@ func InitalizePermissions() { "authentication.permisssions.list_users_without_team.name", "authentication.permisssions.list_users_without_team.description", } + PERMISSION_MANAGE_JOBS = &Permission{ + "manage_jobs", + "authentication.permisssions.manage_jobs.name", + "authentication.permisssions.manage_jobs.description", + } } func InitalizeRoles() { @@ -405,6 +411,7 @@ func InitalizeRoles() { PERMISSION_CREATE_TEAM.Id, PERMISSION_ADD_USER_TO_TEAM.Id, PERMISSION_LIST_USERS_WITHOUT_TEAM.Id, + PERMISSION_MANAGE_JOBS.Id, }, ROLE_TEAM_USER.Permissions..., ), diff --git a/model/client4.go b/model/client4.go index feff9f8de..6f5eb03c6 100644 --- a/model/client4.go +++ b/model/client4.go @@ -2800,7 +2800,7 @@ func (c *Client4) OpenGraph(url string) (map[string]string, *Response) { // GetJob gets a single job. func (c *Client4) GetJob(id string) (*Job, *Response) { - if r, err := c.DoApiGet(c.GetJobsRoute()+fmt.Sprintf("/%v/status", id), ""); err != nil { + if r, err := c.DoApiGet(c.GetJobsRoute()+fmt.Sprintf("/%v", id), ""); err != nil { return nil, BuildErrorResponse(r, err) } else { defer closeBody(r) @@ -2808,12 +2808,42 @@ func (c *Client4) GetJob(id string) (*Job, *Response) { } } -// GetJobsByType gets all jobs of a given type, sorted with the job that most recently started first. +// Get all jobs, sorted with the job that was created most recently first. +func (c *Client4) GetJobs(page int, perPage int) ([]*Job, *Response) { + if r, err := c.DoApiGet(c.GetJobsRoute()+fmt.Sprintf("?page=%v&per_page=%v", page, perPage), ""); err != nil { + return nil, BuildErrorResponse(r, err) + } else { + defer closeBody(r) + return JobsFromJson(r.Body), BuildResponse(r) + } +} + +// GetJobsByType gets all jobs of a given type, sorted with the job that was created most recently first. func (c *Client4) GetJobsByType(jobType string, page int, perPage int) ([]*Job, *Response) { - if r, err := c.DoApiGet(c.GetJobsRoute()+fmt.Sprintf("/type/%v/statuses?page=%v&per_page=%v", jobType, page, perPage), ""); err != nil { + if r, err := c.DoApiGet(c.GetJobsRoute()+fmt.Sprintf("/type/%v?page=%v&per_page=%v", jobType, page, perPage), ""); err != nil { return nil, BuildErrorResponse(r, err) } else { defer closeBody(r) return JobsFromJson(r.Body), BuildResponse(r) } } + +// CreateJob creates a job based on the provided job struct. +func (c *Client4) CreateJob(job *Job) (*Job, *Response) { + if r, err := c.DoApiPost(c.GetJobsRoute(), job.ToJson()); err != nil { + return nil, BuildErrorResponse(r, err) + } else { + defer closeBody(r) + return JobFromJson(r.Body), BuildResponse(r) + } +} + +// CancelJob requests the cancellation of the job with the provided Id. +func (c *Client4) CancelJob(jobId string) (bool, *Response) { + if r, err := c.DoApiPost(c.GetJobsRoute()+fmt.Sprintf("/%v/cancel", jobId), ""); err != nil { + return false, BuildErrorResponse(r, err) + } else { + defer closeBody(r) + return CheckStatusOK(r), BuildResponse(r) + } +} diff --git a/model/job.go b/model/job.go index b0567bf1a..ebc849b30 100644 --- a/model/job.go +++ b/model/job.go @@ -6,6 +6,7 @@ package model import ( "encoding/json" "io" + "net/http" ) const ( @@ -32,6 +33,36 @@ type Job struct { Data map[string]interface{} `json:"data"` } +func (j *Job) IsValid() *AppError { + if len(j.Id) != 26 { + return NewAppError("Job.IsValid", "model.job.is_valid.id.app_error", nil, "id="+j.Id, http.StatusBadRequest) + } + + if j.CreateAt == 0 { + return NewAppError("Job.IsValid", "model.job.is_valid.create_at.app_error", nil, "id="+j.Id, http.StatusBadRequest) + } + + switch j.Type { + case JOB_TYPE_DATA_RETENTION: + case JOB_TYPE_SEARCH_INDEXING: + default: + return NewAppError("Job.IsValid", "model.job.is_valid.type.app_error", nil, "id="+j.Id, http.StatusBadRequest) + } + + switch j.Status { + case JOB_STATUS_PENDING: + case JOB_STATUS_IN_PROGRESS: + case JOB_STATUS_SUCCESS: + case JOB_STATUS_ERROR: + case JOB_STATUS_CANCEL_REQUESTED: + case JOB_STATUS_CANCELED: + default: + return NewAppError("Job.IsValid", "model.job.is_valid.status.app_error", nil, "id="+j.Id, http.StatusBadRequest) + } + + return nil +} + func (js *Job) ToJson() string { if b, err := json.Marshal(js); err != nil { return "" @@ -41,9 +72,9 @@ func (js *Job) ToJson() string { } func JobFromJson(data io.Reader) *Job { - var status Job - if err := json.NewDecoder(data).Decode(&status); err == nil { - return &status + var job Job + if err := json.NewDecoder(data).Decode(&job); err == nil { + return &job } else { return nil } diff --git a/store/sql_job_store.go b/store/sql_job_store.go index c00e37d86..e287edad6 100644 --- a/store/sql_job_store.go +++ b/store/sql_job_store.go @@ -210,6 +210,38 @@ func (jss SqlJobStore) Get(id string) StoreChannel { return storeChannel } +func (jss SqlJobStore) GetAllPage(offset int, limit int) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + var statuses []*model.Job + + if _, err := jss.GetReplica().Select(&statuses, + `SELECT + * + FROM + Jobs + ORDER BY + CreateAt DESC + LIMIT + :Limit + OFFSET + :Offset`, map[string]interface{}{"Limit": limit, "Offset": offset}); err != nil { + result.Err = model.NewLocAppError("SqlJobStore.GetAllPage", + "store.sql_job.get_all.app_error", nil, err.Error()) + } else { + result.Data = statuses + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + func (jss SqlJobStore) GetAllByType(jobType string) StoreChannel { storeChannel := make(StoreChannel, 1) @@ -224,7 +256,9 @@ func (jss SqlJobStore) GetAllByType(jobType string) StoreChannel { FROM Jobs WHERE - Type = :Type`, map[string]interface{}{"Type": jobType}); err != nil { + Type = :Type + ORDER BY + CreateAt DESC`, map[string]interface{}{"Type": jobType}); err != nil { result.Err = model.NewLocAppError("SqlJobStore.GetAllByType", "store.sql_job.get_all.app_error", nil, "Type="+jobType+", "+err.Error()) } else { @@ -254,7 +288,7 @@ func (jss SqlJobStore) GetAllByTypePage(jobType string, offset int, limit int) S WHERE Type = :Type ORDER BY - StartAt ASC + CreateAt DESC LIMIT :Limit OFFSET diff --git a/store/sql_job_store_test.go b/store/sql_job_store_test.go index edf09a4c0..97e95ab92 100644 --- a/store/sql_job_store_test.go +++ b/store/sql_job_store_test.go @@ -82,19 +82,24 @@ func TestJobGetAllByTypePage(t *testing.T) { jobs := []*model.Job{ { - Id: model.NewId(), - Type: jobType, - StartAt: 1000, + Id: model.NewId(), + Type: jobType, + CreateAt: 1000, + }, + { + Id: model.NewId(), + Type: jobType, + CreateAt: 999, }, { - Id: model.NewId(), - Type: jobType, - StartAt: 999, + Id: model.NewId(), + Type: jobType, + CreateAt: 1001, }, { - Id: model.NewId(), - Type: jobType, - StartAt: 1001, + Id: model.NewId(), + Type: model.NewId(), + CreateAt: 1002, }, } @@ -107,7 +112,7 @@ func TestJobGetAllByTypePage(t *testing.T) { t.Fatal(result.Err) } else if received := result.Data.([]*model.Job); len(received) != 2 { t.Fatal("received wrong number of jobs") - } else if received[0].Id != jobs[1].Id { + } else if received[0].Id != jobs[2].Id { t.Fatal("should've received newest job first") } else if received[1].Id != jobs[0].Id { t.Fatal("should've received second newest job second") @@ -117,7 +122,54 @@ func TestJobGetAllByTypePage(t *testing.T) { t.Fatal(result.Err) } else if received := result.Data.([]*model.Job); len(received) != 1 { t.Fatal("received wrong number of jobs") + } else if received[0].Id != jobs[1].Id { + t.Fatal("should've received oldest job last") + } +} + +func TestJobGetAllPage(t *testing.T) { + Setup() + + jobType := model.NewId() + + jobs := []*model.Job{ + { + Id: model.NewId(), + Type: jobType, + CreateAt: model.GetMillis() + 1, + }, + { + Id: model.NewId(), + Type: jobType, + CreateAt: model.GetMillis(), + }, + { + Id: model.NewId(), + Type: jobType, + CreateAt: model.GetMillis() + 2, + }, + } + + for _, job := range jobs { + Must(store.Job().Save(job)) + defer store.Job().Delete(job.Id) + } + + if result := <-store.Job().GetAllPage(0, 2); result.Err != nil { + t.Fatal(result.Err) + } else if received := result.Data.([]*model.Job); len(received) != 2 { + t.Fatal("received wrong number of jobs") } else if received[0].Id != jobs[2].Id { + t.Fatal("should've received newest job first") + } else if received[1].Id != jobs[0].Id { + t.Fatal("should've received second newest job second") + } + + if result := <-store.Job().GetAllPage(2, 2); result.Err != nil { + t.Fatal(result.Err) + } else if received := result.Data.([]*model.Job); len(received) < 1 { + t.Fatal("received wrong number of jobs") + } else if received[0].Id != jobs[1].Id { t.Fatal("should've received oldest job last") } } @@ -331,11 +383,11 @@ func TestJobUpdateStatusUpdateStatusOptimistically(t *testing.T) { func TestJobDelete(t *testing.T) { Setup() - status := Must(store.Job().Save(&model.Job{ + job := Must(store.Job().Save(&model.Job{ Id: model.NewId(), })).(*model.Job) - if result := <-store.Job().Delete(status.Id); result.Err != nil { + if result := <-store.Job().Delete(job.Id); result.Err != nil { t.Fatal(result.Err) } } diff --git a/store/store.go b/store/store.go index 062ed0fbd..ab3d97d9b 100644 --- a/store/store.go +++ b/store/store.go @@ -391,6 +391,7 @@ type JobStore interface { UpdateStatus(id string, status string) StoreChannel UpdateStatusOptimistically(id string, currentStatus string, newStatus string) StoreChannel Get(id string) StoreChannel + GetAllPage(offset int, limit int) StoreChannel GetAllByType(jobType string) StoreChannel GetAllByTypePage(jobType string, offset int, limit int) StoreChannel GetAllByStatus(status string) StoreChannel |