From 51bd710ecdca6628461c9fa2679737073e4d5059 Mon Sep 17 00:00:00 2001 From: George Goldberg Date: Mon, 14 May 2018 15:59:04 +0100 Subject: MM-9728: Online migration for advanced permissions phase 2 (#8744) * MM-9728: Online migration for advanced permissions phase 2 * Add unit tests for new store functions. * Move migration specific code to own file. * Add migration state function test. * Style fixes. * Add i18n strings. * Fix mocks. * Add TestMain to migrations package tests. * Fix typo. * Fix review comments. * Fix up the "Check if migration is done" check to actually work. --- api4/channel_test.go | 13 + api4/scheme_test.go | 121 ++++++++- api4/team_test.go | 11 + app/app.go | 10 + app/scheme.go | 8 +- i18n/en.json | 48 ++++ imports/placeholder.go | 4 + jobs/interfaces/migrations_interface.go | 11 + jobs/jobs.go | 7 + jobs/jobs_watcher.go | 7 + jobs/schedulers.go | 4 + jobs/server.go | 2 + jobs/workers.go | 13 + migrations/advanced_permissions_phase_2.go | 106 ++++++++ migrations/migrations.go | 63 +++++ migrations/migrations_test.go | 140 ++++++++++ migrations/migrationstestlib.go | 419 +++++++++++++++++++++++++++++ migrations/scheduler.go | 110 ++++++++ migrations/worker.go | 166 ++++++++++++ model/job.go | 2 + model/migration.go | 8 + store/sqlstore/channel_store.go | 68 +++++ store/sqlstore/team_store.go | 69 +++++ store/store.go | 2 + store/storetest/channel_store.go | 75 ++++++ store/storetest/mocks/ChannelStore.go | 16 ++ store/storetest/mocks/TeamStore.go | 16 ++ store/storetest/team_store.go | 71 +++++ 28 files changed, 1586 insertions(+), 4 deletions(-) create mode 100644 jobs/interfaces/migrations_interface.go create mode 100644 migrations/advanced_permissions_phase_2.go create mode 100644 migrations/migrations.go create mode 100644 migrations/migrations_test.go create mode 100644 migrations/migrationstestlib.go create mode 100644 migrations/scheduler.go create mode 100644 migrations/worker.go create mode 100644 model/migration.go diff --git a/api4/channel_test.go b/api4/channel_test.go index 11d313291..551a1a484 100644 --- a/api4/channel_test.go +++ b/api4/channel_test.go @@ -12,6 +12,8 @@ import ( "strings" "testing" + "github.com/stretchr/testify/assert" + "github.com/mattermost/mattermost-server/model" "github.com/mattermost/mattermost-server/utils" ) @@ -1890,6 +1892,17 @@ func TestUpdateChannelScheme(t *testing.T) { th.App.SetLicense(model.NewTestLicense("")) + // Mark the migration as done. + <-th.App.Srv.Store.System().PermanentDeleteByName(model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2) + res := <-th.App.Srv.Store.System().Save(&model.System{Name: model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2, Value: "true"}) + assert.Nil(t, res.Err) + + // Un-mark the migration at the end of the test. + defer func() { + res := <-th.App.Srv.Store.System().PermanentDeleteByName(model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2) + assert.Nil(t, res.Err) + }() + team := &model.Team{ DisplayName: "Name", Description: "Some description", diff --git a/api4/scheme_test.go b/api4/scheme_test.go index a0ea1e9b0..92cfa4d30 100644 --- a/api4/scheme_test.go +++ b/api4/scheme_test.go @@ -18,6 +18,11 @@ func TestCreateScheme(t *testing.T) { th.App.SetLicense(model.NewTestLicense("")) + // Mark the migration as done. + <-th.App.Srv.Store.System().PermanentDeleteByName(model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2) + res := <-th.App.Srv.Store.System().Save(&model.System{Name: model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2, Value: "true"}) + assert.Nil(t, res.Err) + // Basic test of creating a team scheme. scheme1 := &model.Scheme{ Name: model.NewId(), @@ -113,6 +118,21 @@ func TestCreateScheme(t *testing.T) { } _, r6 := th.SystemAdminClient.CreateScheme(scheme6) CheckNotImplementedStatus(t, r6) + + // Mark the migration as not done. + res = <-th.App.Srv.Store.System().PermanentDeleteByName(model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2) + assert.Nil(t, res.Err) + + th.LoginSystemAdmin() + th.App.SetLicense(model.NewTestLicense("")) + + scheme7 := &model.Scheme{ + Name: model.NewId(), + Description: model.NewId(), + Scope: model.SCHEME_SCOPE_TEAM, + } + _, r7 := th.SystemAdminClient.CreateScheme(scheme7) + CheckInternalErrorStatus(t, r7) } func TestGetScheme(t *testing.T) { @@ -128,9 +148,17 @@ func TestGetScheme(t *testing.T) { Scope: model.SCHEME_SCOPE_TEAM, } + // Mark the migration as done while we create the scheme. + <-th.App.Srv.Store.System().PermanentDeleteByName(model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2) + res := <-th.App.Srv.Store.System().Save(&model.System{Name: model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2, Value: "true"}) + assert.Nil(t, res.Err) + s1, r1 := th.SystemAdminClient.CreateScheme(scheme1) CheckNoError(t, r1) + res = <-th.App.Srv.Store.System().PermanentDeleteByName(model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2) + assert.Nil(t, res.Err) + assert.Equal(t, s1.Name, scheme1.Name) assert.Equal(t, s1.Description, scheme1.Description) assert.NotZero(t, s1.CreateAt) @@ -184,11 +212,19 @@ func TestGetSchemes(t *testing.T) { Scope: model.SCHEME_SCOPE_CHANNEL, } + // Mark the migration as done while we create the scheme. + <-th.App.Srv.Store.System().PermanentDeleteByName(model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2) + res := <-th.App.Srv.Store.System().Save(&model.System{Name: model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2, Value: "true"}) + assert.Nil(t, res.Err) + _, r1 := th.SystemAdminClient.CreateScheme(scheme1) CheckNoError(t, r1) _, r2 := th.SystemAdminClient.CreateScheme(scheme2) CheckNoError(t, r2) + res = <-th.App.Srv.Store.System().PermanentDeleteByName(model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2) + assert.Nil(t, res.Err) + l3, r3 := th.SystemAdminClient.GetSchemes("", 0, 100) CheckNoError(t, r3) @@ -226,6 +262,11 @@ func TestGetTeamsForScheme(t *testing.T) { th.App.SetLicense(model.NewTestLicense("")) + // Mark the migration as done while we create the scheme. + <-th.App.Srv.Store.System().PermanentDeleteByName(model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2) + res := <-th.App.Srv.Store.System().Save(&model.System{Name: model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2, Value: "true"}) + assert.Nil(t, res.Err) + scheme1 := &model.Scheme{ Name: model.NewId(), Description: model.NewId(), @@ -234,6 +275,9 @@ func TestGetTeamsForScheme(t *testing.T) { scheme1, r1 := th.SystemAdminClient.CreateScheme(scheme1) CheckNoError(t, r1) + res = <-th.App.Srv.Store.System().PermanentDeleteByName(model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2) + assert.Nil(t, res.Err) + team1 := &model.Team{ Name: GenerateTestUsername(), DisplayName: "A Test Team", @@ -294,6 +338,10 @@ func TestGetTeamsForScheme(t *testing.T) { _, ri4 := th.Client.GetTeamsForScheme(model.NewId(), 0, 100) CheckForbiddenStatus(t, ri4) + // Mark the migration as done again while we create a scheme. + res = <-th.App.Srv.Store.System().Save(&model.System{Name: model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2, Value: "true"}) + assert.Nil(t, res.Err) + scheme2 := &model.Scheme{ Name: model.NewId(), Description: model.NewId(), @@ -302,6 +350,9 @@ func TestGetTeamsForScheme(t *testing.T) { scheme2, rs2 := th.SystemAdminClient.CreateScheme(scheme2) CheckNoError(t, rs2) + res = <-th.App.Srv.Store.System().PermanentDeleteByName(model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2) + assert.Nil(t, res.Err) + _, ri5 := th.SystemAdminClient.GetTeamsForScheme(scheme2.Id, 0, 100) CheckBadRequestStatus(t, ri5) } @@ -312,6 +363,11 @@ func TestGetChannelsForScheme(t *testing.T) { th.App.SetLicense(model.NewTestLicense("")) + // Mark the migration as done while we create the scheme. + <-th.App.Srv.Store.System().PermanentDeleteByName(model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2) + res := <-th.App.Srv.Store.System().Save(&model.System{Name: model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2, Value: "true"}) + assert.Nil(t, res.Err) + scheme1 := &model.Scheme{ Name: model.NewId(), Description: model.NewId(), @@ -320,6 +376,9 @@ func TestGetChannelsForScheme(t *testing.T) { scheme1, r1 := th.SystemAdminClient.CreateScheme(scheme1) CheckNoError(t, r1) + res = <-th.App.Srv.Store.System().PermanentDeleteByName(model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2) + assert.Nil(t, res.Err) + channel1 := &model.Channel{ TeamId: model.NewId(), DisplayName: "A Name", @@ -382,6 +441,10 @@ func TestGetChannelsForScheme(t *testing.T) { _, ri4 := th.Client.GetChannelsForScheme(model.NewId(), 0, 100) CheckForbiddenStatus(t, ri4) + // Mark the migration as done again while we create a scheme. + res = <-th.App.Srv.Store.System().Save(&model.System{Name: model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2, Value: "true"}) + assert.Nil(t, res.Err) + scheme2 := &model.Scheme{ Name: model.NewId(), Description: model.NewId(), @@ -390,6 +453,9 @@ func TestGetChannelsForScheme(t *testing.T) { scheme2, rs2 := th.SystemAdminClient.CreateScheme(scheme2) CheckNoError(t, rs2) + res = <-th.App.Srv.Store.System().PermanentDeleteByName(model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2) + assert.Nil(t, res.Err) + _, ri5 := th.SystemAdminClient.GetChannelsForScheme(scheme2.Id, 0, 100) CheckBadRequestStatus(t, ri5) } @@ -400,6 +466,11 @@ func TestPatchScheme(t *testing.T) { th.App.SetLicense(model.NewTestLicense("")) + // Mark the migration as done. + <-th.App.Srv.Store.System().PermanentDeleteByName(model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2) + res := <-th.App.Srv.Store.System().Save(&model.System{Name: model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2, Value: "true"}) + assert.Nil(t, res.Err) + // Basic test of creating a team scheme. scheme1 := &model.Scheme{ Name: model.NewId(), @@ -480,6 +551,16 @@ func TestPatchScheme(t *testing.T) { th.App.SetLicense(nil) _, r11 := th.SystemAdminClient.PatchScheme(s6.Id, schemePatch) CheckNotImplementedStatus(t, r11) + + // Mark the migration as not done. + res = <-th.App.Srv.Store.System().PermanentDeleteByName(model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2) + assert.Nil(t, res.Err) + + th.LoginSystemAdmin() + th.App.SetLicense(model.NewTestLicense("")) + + _, r12 := th.SystemAdminClient.PatchScheme(s6.Id, schemePatch) + CheckInternalErrorStatus(t, r12) } func TestDeleteScheme(t *testing.T) { @@ -489,6 +570,17 @@ func TestDeleteScheme(t *testing.T) { t.Run("ValidTeamScheme", func(t *testing.T) { th.App.SetLicense(model.NewTestLicense("")) + // Mark the migration as done. + <-th.App.Srv.Store.System().PermanentDeleteByName(model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2) + res := <-th.App.Srv.Store.System().Save(&model.System{Name: model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2, Value: "true"}) + assert.Nil(t, res.Err) + + // Un-mark the migration at the end of the test. + defer func() { + res := <-th.App.Srv.Store.System().PermanentDeleteByName(model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2) + assert.Nil(t, res.Err) + }() + // Create a team scheme. scheme1 := &model.Scheme{ Name: model.NewId(), @@ -515,7 +607,7 @@ func TestDeleteScheme(t *testing.T) { assert.Zero(t, role4.DeleteAt) // Make sure this scheme is in use by a team. - res := <-th.App.Srv.Store.Team().Save(&model.Team{ + res = <-th.App.Srv.Store.Team().Save(&model.Team{ Name: model.NewId(), DisplayName: model.NewId(), Email: model.NewId() + "@nowhere.com", @@ -571,6 +663,17 @@ func TestDeleteScheme(t *testing.T) { t.Run("ValidChannelScheme", func(t *testing.T) { th.App.SetLicense(model.NewTestLicense("")) + // Mark the migration as done. + <-th.App.Srv.Store.System().PermanentDeleteByName(model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2) + res := <-th.App.Srv.Store.System().Save(&model.System{Name: model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2, Value: "true"}) + assert.Nil(t, res.Err) + + // Un-mark the migration at the end of the test. + defer func() { + res := <-th.App.Srv.Store.System().PermanentDeleteByName(model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2) + assert.Nil(t, res.Err) + }() + // Create a channel scheme. scheme1 := &model.Scheme{ Name: model.NewId(), @@ -591,7 +694,7 @@ func TestDeleteScheme(t *testing.T) { assert.Zero(t, role4.DeleteAt) // Make sure this scheme is in use by a team. - res := <-th.App.Srv.Store.Channel().Save(&model.Channel{ + res = <-th.App.Srv.Store.Channel().Save(&model.Channel{ TeamId: model.NewId(), DisplayName: model.NewId(), Name: model.NewId(), @@ -635,6 +738,11 @@ func TestDeleteScheme(t *testing.T) { t.Run("FailureCases", func(t *testing.T) { th.App.SetLicense(model.NewTestLicense("")) + // Mark the migration as done. + <-th.App.Srv.Store.System().PermanentDeleteByName(model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2) + res := <-th.App.Srv.Store.System().Save(&model.System{Name: model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2, Value: "true"}) + assert.Nil(t, res.Err) + scheme1 := &model.Scheme{ Name: model.NewId(), Description: model.NewId(), @@ -660,5 +768,14 @@ func TestDeleteScheme(t *testing.T) { th.App.SetLicense(nil) _, r5 := th.SystemAdminClient.DeleteScheme(s1.Id) CheckNotImplementedStatus(t, r5) + + // Test with migration not being done. + res = <-th.App.Srv.Store.System().PermanentDeleteByName(model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2) + assert.Nil(t, res.Err) + + th.App.SetLicense(model.NewTestLicense("")) + + _, r6 := th.SystemAdminClient.DeleteScheme(s1.Id) + CheckInternalErrorStatus(t, r6) }) } diff --git a/api4/team_test.go b/api4/team_test.go index 6df56f754..45d8e8f08 100644 --- a/api4/team_test.go +++ b/api4/team_test.go @@ -2059,6 +2059,17 @@ func TestUpdateTeamScheme(t *testing.T) { th.App.SetLicense(model.NewTestLicense("")) + // Mark the migration as done. + <-th.App.Srv.Store.System().PermanentDeleteByName(model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2) + res := <-th.App.Srv.Store.System().Save(&model.System{Name: model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2, Value: "true"}) + assert.Nil(t, res.Err) + + // Un-mark the migration at the end of the test. + defer func() { + res := <-th.App.Srv.Store.System().PermanentDeleteByName(model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2) + assert.Nil(t, res.Err) + }() + team := &model.Team{ DisplayName: "Name", Description: "Some description", diff --git a/app/app.go b/app/app.go index 2cdf333c1..d4a663e32 100644 --- a/app/app.go +++ b/app/app.go @@ -20,6 +20,7 @@ import ( "github.com/mattermost/mattermost-server/einterfaces" ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs" "github.com/mattermost/mattermost-server/jobs" + tjobs "github.com/mattermost/mattermost-server/jobs/interfaces" "github.com/mattermost/mattermost-server/mlog" "github.com/mattermost/mattermost-server/model" "github.com/mattermost/mattermost-server/plugin/pluginenv" @@ -319,6 +320,12 @@ func RegisterJobsLdapSyncInterface(f func(*App) ejobs.LdapSyncInterface) { jobsLdapSyncInterface = f } +var jobsMigrationsInterface func(*App) tjobs.MigrationsJobInterface + +func RegisterJobsMigrationsJobInterface(f func(*App) tjobs.MigrationsJobInterface) { + jobsMigrationsInterface = f +} + var ldapInterface func(*App) einterfaces.LdapInterface func RegisterLdapInterface(f func(*App) einterfaces.LdapInterface) { @@ -413,6 +420,9 @@ func (a *App) initJobs() { if jobsLdapSyncInterface != nil { a.Jobs.LdapSync = jobsLdapSyncInterface(a) } + if jobsMigrationsInterface != nil { + a.Jobs.Migrations = jobsMigrationsInterface(a) + } } func (a *App) DiagnosticId() string { diff --git a/app/scheme.go b/app/scheme.go index b43914eb8..a8eb9ef46 100644 --- a/app/scheme.go +++ b/app/scheme.go @@ -3,7 +3,9 @@ package app -import "github.com/mattermost/mattermost-server/model" +import ( + "github.com/mattermost/mattermost-server/model" +) func (a *App) GetScheme(id string) (*model.Scheme, *model.AppError) { if result := <-a.Srv.Store.Scheme().Get(id); result.Err != nil { @@ -109,7 +111,9 @@ func (a *App) GetChannelsForScheme(scheme *model.Scheme, offset int, limit int) } func (a *App) IsPhase2MigrationCompleted() *model.AppError { - // TODO: Actually check the Phase 2 migration has completed before permitting these actions. + if result := <-a.Srv.Store.System().GetByName(model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2); result.Err != nil { + return result.Err + } return nil } diff --git a/i18n/en.json b/i18n/en.json index 58a950921..0b954d814 100644 --- a/i18n/en.json +++ b/i18n/en.json @@ -47,6 +47,54 @@ "id": "September", "translation": "September" }, + { + "id": "migrations.worker.run_advanced_permissions_phase_2_migration.invalid_progress", + "translation": "Migration failed due to invalid progress data." + }, + { + "id": "migrations.worker.run_migration.unknown_key", + "translation": "Cannot run migration job due to unknown migration key." + }, + { + "id": "store.sql_channel.migrate_channel_members.open_transaction.app_error", + "translation": "Failed to open the database transaction" + }, + { + "id": "store.sql_channel.migrate_channel_members.select.app_error", + "translation": "Failed to select the batch of channel members" + }, + { + "id": "store.sql_channel.migrate_channel_members.rollback_transaction.app_error", + "translation": "Failed to roll back the database transaction" + }, + { + "id": "store.sql_channel.migrate_channel_members.update.app_error", + "translation": "Failed to update the channel member" + }, + { + "id": "store.sql_channel.migrate_channel_members.commit_transaction.app_error", + "translation": "Failed to commit the database transaction" + }, + { + "id": "store.sql_team.migrate_team_members.open_transaction.app_error", + "translation": "Failed to open the database transaction" + }, + { + "id": "store.sql_team.migrate_team_members.select.app_error", + "translation": " Failed to select the batch of team members" + }, + { + "id": "store.sql_team.migrate_team_members.rollback_transaction.app_error", + "translation": "Failed to roll back the database transaction" + }, + { + "id": "store.sql_team.migrate_team_members.update.app_error", + "translation": "Failed to update the team member" + }, + { + "id": "store.sql_team.migrate_team_members.commit_transaction.app_error", + "translation": "Failed to commit the database transaction" + }, { "id": "api.admin.add_certificate.no_file.app_error", "translation": "No file under 'certificate' in request." diff --git a/imports/placeholder.go b/imports/placeholder.go index 98e5decd5..b7a5d449c 100644 --- a/imports/placeholder.go +++ b/imports/placeholder.go @@ -4,3 +4,7 @@ package imports // This is a placeholder so this package can be imported in Team Edition when it will be otherwise empty + +import ( + _ "github.com/mattermost/mattermost-server/migrations" +) diff --git a/jobs/interfaces/migrations_interface.go b/jobs/interfaces/migrations_interface.go new file mode 100644 index 000000000..48dc9f579 --- /dev/null +++ b/jobs/interfaces/migrations_interface.go @@ -0,0 +1,11 @@ +// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package interfaces + +import "github.com/mattermost/mattermost-server/model" + +type MigrationsJobInterface interface { + MakeWorker() model.Worker + MakeScheduler() model.Scheduler +} diff --git a/jobs/jobs.go b/jobs/jobs.go index 850491403..ddbc4489b 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -106,6 +106,13 @@ func (srv *JobServer) SetJobCanceled(job *model.Job) *model.AppError { return result.Err } +func (srv *JobServer) UpdateInProgressJobData(job *model.Job) *model.AppError { + job.Status = model.JOB_STATUS_IN_PROGRESS + job.LastActivityAt = model.GetMillis() + result := <-srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS) + return result.Err +} + func (srv *JobServer) RequestCancellation(jobId string) *model.AppError { if result := <-srv.Store.Job().UpdateStatusOptimistically(jobId, model.JOB_STATUS_PENDING, model.JOB_STATUS_CANCELED); result.Err != nil { return result.Err diff --git a/jobs/jobs_watcher.go b/jobs/jobs_watcher.go index 07979442d..01d0a8d0f 100644 --- a/jobs/jobs_watcher.go +++ b/jobs/jobs_watcher.go @@ -107,6 +107,13 @@ func (watcher *Watcher) PollAndNotify() { default: } } + } else if job.Type == model.JOB_TYPE_MIGRATIONS { + if watcher.workers.Migrations != nil { + select { + case watcher.workers.Migrations.JobChannel() <- *job: + default: + } + } } } } diff --git a/jobs/schedulers.go b/jobs/schedulers.go index 2823036df..96aa2b635 100644 --- a/jobs/schedulers.go +++ b/jobs/schedulers.go @@ -50,6 +50,10 @@ func (srv *JobServer) InitSchedulers() *Schedulers { schedulers.schedulers = append(schedulers.schedulers, ldapSyncInterface.MakeScheduler()) } + if migrationsInterface := srv.Migrations; migrationsInterface != nil { + schedulers.schedulers = append(schedulers.schedulers, migrationsInterface.MakeScheduler()) + } + schedulers.nextRunTimes = make([]*time.Time, len(schedulers.schedulers)) return schedulers } diff --git a/jobs/server.go b/jobs/server.go index 01cf821dc..10ea9a46f 100644 --- a/jobs/server.go +++ b/jobs/server.go @@ -5,6 +5,7 @@ package jobs import ( ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs" + tjobs "github.com/mattermost/mattermost-server/jobs/interfaces" "github.com/mattermost/mattermost-server/model" "github.com/mattermost/mattermost-server/store" ) @@ -34,6 +35,7 @@ type JobServer struct { ElasticsearchAggregator ejobs.ElasticsearchAggregatorInterface ElasticsearchIndexer ejobs.ElasticsearchIndexerInterface LdapSync ejobs.LdapSyncInterface + Migrations tjobs.MigrationsJobInterface } func NewJobServer(configService ConfigService, store store.Store) *JobServer { diff --git a/jobs/workers.go b/jobs/workers.go index 57a255013..67ab43241 100644 --- a/jobs/workers.go +++ b/jobs/workers.go @@ -20,6 +20,7 @@ type Workers struct { ElasticsearchIndexing model.Worker ElasticsearchAggregation model.Worker LdapSync model.Worker + Migrations model.Worker listenerId string } @@ -50,6 +51,10 @@ func (srv *JobServer) InitWorkers() *Workers { workers.LdapSync = ldapSyncInterface.MakeWorker() } + if migrationsInterface := srv.Migrations; migrationsInterface != nil { + workers.Migrations = migrationsInterface.MakeWorker() + } + return workers } @@ -77,6 +82,10 @@ func (workers *Workers) Start() *Workers { go workers.LdapSync.Run() } + if workers.Migrations != nil { + go workers.Migrations.Run() + } + go workers.Watcher.Start() }) @@ -152,6 +161,10 @@ func (workers *Workers) Stop() *Workers { workers.LdapSync.Stop() } + if workers.Migrations != nil { + workers.Migrations.Stop() + } + mlog.Info("Stopped workers") return workers diff --git a/migrations/advanced_permissions_phase_2.go b/migrations/advanced_permissions_phase_2.go new file mode 100644 index 000000000..55b1876c4 --- /dev/null +++ b/migrations/advanced_permissions_phase_2.go @@ -0,0 +1,106 @@ +// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package migrations + +import ( + "encoding/json" + "io" + "net/http" + "strings" + + "github.com/mattermost/mattermost-server/model" +) + +type AdvancedPermissionsPhase2Progress struct { + CurrentTable string `json:"current_table"` + LastTeamId string `json:"last_team_id"` + LastChannelId string `json:"last_channel_id"` + LastUserId string `json:"last_user"` +} + +func (p *AdvancedPermissionsPhase2Progress) ToJson() string { + b, _ := json.Marshal(p) + return string(b) +} + +func AdvancedPermissionsPhase2ProgressFromJson(data io.Reader) *AdvancedPermissionsPhase2Progress { + var o *AdvancedPermissionsPhase2Progress + json.NewDecoder(data).Decode(&o) + return o +} + +func (p *AdvancedPermissionsPhase2Progress) IsValid() bool { + if len(p.LastChannelId) != 26 { + return false + } + + if len(p.LastTeamId) != 26 { + return false + } + + if len(p.LastUserId) != 26 { + return false + } + + switch p.CurrentTable { + case "TeamMembers": + case "ChannelMembers": + default: + return false + } + + return true +} + +func (worker *Worker) runAdvancedPermissionsPhase2Migration(lastDone string) (bool, string, *model.AppError) { + var progress *AdvancedPermissionsPhase2Progress + if len(lastDone) == 0 { + // Haven't started the migration yet. + progress = new(AdvancedPermissionsPhase2Progress) + progress.CurrentTable = "TeamMembers" + progress.LastChannelId = strings.Repeat("0", 26) + progress.LastTeamId = strings.Repeat("0", 26) + progress.LastUserId = strings.Repeat("0", 26) + } else { + progress = AdvancedPermissionsPhase2ProgressFromJson(strings.NewReader(lastDone)) + if !progress.IsValid() { + return false, "", model.NewAppError("MigrationsWorker.runAdvancedPermissionsPhase2Migration", "migrations.worker.run_advanced_permissions_phase_2_migration.invalid_progress", map[string]interface{}{"progress": progress.ToJson()}, "", http.StatusInternalServerError) + } + } + + if progress.CurrentTable == "TeamMembers" { + // Run a TeamMembers migration batch. + if result := <-worker.app.Srv.Store.Team().MigrateTeamMembers(progress.LastTeamId, progress.LastUserId); result.Err != nil { + return false, progress.ToJson(), result.Err + } else { + if result.Data == nil { + // We haven't progressed. That means that we've reached the end of this stage of the migration, and should now advance to the next stage. + progress.LastUserId = strings.Repeat("0", 26) + progress.CurrentTable = "ChannelMembers" + return false, progress.ToJson(), nil + } + + data := result.Data.(map[string]string) + progress.LastTeamId = data["TeamId"] + progress.LastUserId = data["UserId"] + } + } else if progress.CurrentTable == "ChannelMembers" { + // Run a ChannelMembers migration batch. + if result := <-worker.app.Srv.Store.Channel().MigrateChannelMembers(progress.LastChannelId, progress.LastUserId); result.Err != nil { + return false, progress.ToJson(), result.Err + } else { + if result.Data == nil { + // We haven't progressed. That means we've reached the end of this final stage of the migration. + + return true, progress.ToJson(), nil + } + + data := result.Data.(map[string]string) + progress.LastChannelId = data["ChannelId"] + progress.LastUserId = data["UserId"] + } + } + + return false, progress.ToJson(), nil +} diff --git a/migrations/migrations.go b/migrations/migrations.go new file mode 100644 index 000000000..940992839 --- /dev/null +++ b/migrations/migrations.go @@ -0,0 +1,63 @@ +// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package migrations + +import ( + "github.com/mattermost/mattermost-server/app" + tjobs "github.com/mattermost/mattermost-server/jobs/interfaces" + "github.com/mattermost/mattermost-server/model" + "github.com/mattermost/mattermost-server/store" +) + +const ( + MIGRATION_STATE_UNSCHEDULED = "unscheduled" + MIGRATION_STATE_IN_PROGRESS = "in_progress" + MIGRATION_STATE_COMPLETED = "completed" + + JOB_DATA_KEY_MIGRATION = "migration_key" + JOB_DATA_KEY_MIGRATION_LAST_DONE = "last_done" +) + +type MigrationsJobInterfaceImpl struct { + App *app.App +} + +func init() { + app.RegisterJobsMigrationsJobInterface(func(a *app.App) tjobs.MigrationsJobInterface { + return &MigrationsJobInterfaceImpl{a} + }) +} + +func MakeMigrationsList() []string { + return []string{ + model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2, + } +} + +func GetMigrationState(migration string, store store.Store) (string, *model.Job, *model.AppError) { + if result := <-store.System().GetByName(migration); result.Err == nil { + return MIGRATION_STATE_COMPLETED, nil, nil + } + + if result := <-store.Job().GetAllByType(model.JOB_TYPE_MIGRATIONS); result.Err != nil { + return "", nil, result.Err + } else { + for _, job := range result.Data.([]*model.Job) { + if key, ok := job.Data[JOB_DATA_KEY_MIGRATION]; ok { + if key != migration { + continue + } + + switch job.Status { + case model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_PENDING: + return MIGRATION_STATE_IN_PROGRESS, job, nil + default: + return MIGRATION_STATE_UNSCHEDULED, job, nil + } + } + } + } + + return MIGRATION_STATE_UNSCHEDULED, nil, nil +} diff --git a/migrations/migrations_test.go b/migrations/migrations_test.go new file mode 100644 index 000000000..308319430 --- /dev/null +++ b/migrations/migrations_test.go @@ -0,0 +1,140 @@ +// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package migrations + +import ( + "flag" + "os" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/mattermost/mattermost-server/mlog" + "github.com/mattermost/mattermost-server/model" + "github.com/mattermost/mattermost-server/store/storetest" + "github.com/mattermost/mattermost-server/utils" +) + +func TestMain(m *testing.M) { + flag.Parse() + + // Setup a global logger to catch tests logging outside of app context + // The global logger will be stomped by apps initalizing but that's fine for testing. Ideally this won't happen. + mlog.InitGlobalLogger(mlog.NewLogger(&mlog.LoggerConfiguration{ + EnableConsole: true, + ConsoleJson: true, + ConsoleLevel: "error", + EnableFile: false, + })) + + utils.TranslationsPreInit() + + // In the case where a dev just wants to run a single test, it's faster to just use the default + // store. + if filter := flag.Lookup("test.run").Value.String(); filter != "" && filter != "." { + mlog.Info("-test.run used, not creating temporary containers") + os.Exit(m.Run()) + } + + status := 0 + + container, settings, err := storetest.NewMySQLContainer() + if err != nil { + panic(err) + } + + UseTestStore(container, settings) + + defer func() { + StopTestStore() + os.Exit(status) + }() + + status = m.Run() +} + +func TestGetMigrationState(t *testing.T) { + th := Setup() + defer th.TearDown() + + migrationKey := model.NewId() + + th.DeleteAllJobsByTypeAndMigrationKey(model.JOB_TYPE_MIGRATIONS, migrationKey) + + // Test with no job yet. + state, job, err := GetMigrationState(migrationKey, th.App.Srv.Store) + assert.Nil(t, err) + assert.Nil(t, job) + assert.Equal(t, "unscheduled", state) + + // Test with the system table showing the migration as done. + system := model.System{ + Name: migrationKey, + Value: "true", + } + res1 := <-th.App.Srv.Store.System().Save(&system) + assert.Nil(t, res1.Err) + + state, job, err = GetMigrationState(migrationKey, th.App.Srv.Store) + assert.Nil(t, err) + assert.Nil(t, job) + assert.Equal(t, "completed", state) + + res2 := <-th.App.Srv.Store.System().PermanentDeleteByName(migrationKey) + assert.Nil(t, res2.Err) + + // Test with a job scheduled in "pending" state. + j1 := &model.Job{ + Id: model.NewId(), + CreateAt: model.GetMillis(), + Data: map[string]string{ + JOB_DATA_KEY_MIGRATION: migrationKey, + }, + Status: model.JOB_STATUS_PENDING, + Type: model.JOB_TYPE_MIGRATIONS, + } + + j1 = (<-th.App.Srv.Store.Job().Save(j1)).Data.(*model.Job) + + state, job, err = GetMigrationState(migrationKey, th.App.Srv.Store) + assert.Nil(t, err) + assert.Equal(t, j1.Id, job.Id) + assert.Equal(t, "in_progress", state) + + // Test with a job scheduled in "in progress" state. + j2 := &model.Job{ + Id: model.NewId(), + CreateAt: j1.CreateAt + 1, + Data: map[string]string{ + JOB_DATA_KEY_MIGRATION: migrationKey, + }, + Status: model.JOB_STATUS_IN_PROGRESS, + Type: model.JOB_TYPE_MIGRATIONS, + } + + j2 = (<-th.App.Srv.Store.Job().Save(j2)).Data.(*model.Job) + + state, job, err = GetMigrationState(migrationKey, th.App.Srv.Store) + assert.Nil(t, err) + assert.Equal(t, j2.Id, job.Id) + assert.Equal(t, "in_progress", state) + + // Test with a job scheduled in "error" state. + j3 := &model.Job{ + Id: model.NewId(), + CreateAt: j2.CreateAt + 1, + Data: map[string]string{ + JOB_DATA_KEY_MIGRATION: migrationKey, + }, + Status: model.JOB_STATUS_ERROR, + Type: model.JOB_TYPE_MIGRATIONS, + } + + j3 = (<-th.App.Srv.Store.Job().Save(j3)).Data.(*model.Job) + + state, job, err = GetMigrationState(migrationKey, th.App.Srv.Store) + assert.Nil(t, err) + assert.Equal(t, j3.Id, job.Id) + assert.Equal(t, "unscheduled", state) +} diff --git a/migrations/migrationstestlib.go b/migrations/migrationstestlib.go new file mode 100644 index 000000000..b52f7af79 --- /dev/null +++ b/migrations/migrationstestlib.go @@ -0,0 +1,419 @@ +// Copyright (c) 2016-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package migrations + +import ( + "encoding/json" + "io" + "io/ioutil" + "os" + "path/filepath" + "time" + + "github.com/mattermost/mattermost-server/app" + "github.com/mattermost/mattermost-server/einterfaces" + "github.com/mattermost/mattermost-server/mlog" + "github.com/mattermost/mattermost-server/model" + "github.com/mattermost/mattermost-server/plugin" + "github.com/mattermost/mattermost-server/plugin/pluginenv" + "github.com/mattermost/mattermost-server/store" + "github.com/mattermost/mattermost-server/store/sqlstore" + "github.com/mattermost/mattermost-server/store/storetest" + "github.com/mattermost/mattermost-server/utils" +) + +type TestHelper struct { + App *app.App + BasicTeam *model.Team + BasicUser *model.User + BasicUser2 *model.User + BasicChannel *model.Channel + BasicPost *model.Post + + SystemAdminUser *model.User + + tempConfigPath string + tempWorkspace string + pluginHooks map[string]plugin.Hooks +} + +type persistentTestStore struct { + store.Store +} + +func (*persistentTestStore) Close() {} + +var testStoreContainer *storetest.RunningContainer +var testStore *persistentTestStore +var testStoreSqlSupplier *sqlstore.SqlSupplier +var testClusterInterface *FakeClusterInterface + +// UseTestStore sets the container and corresponding settings to use for tests. Once the tests are +// complete (e.g. at the end of your TestMain implementation), you should call StopTestStore. +func UseTestStore(container *storetest.RunningContainer, settings *model.SqlSettings) { + testClusterInterface = &FakeClusterInterface{} + testStoreContainer = container + testStoreSqlSupplier = sqlstore.NewSqlSupplier(*settings, nil) + testStore = &persistentTestStore{store.NewLayeredStore(testStoreSqlSupplier, nil, testClusterInterface)} +} + +func StopTestStore() { + if testStoreContainer != nil { + testStoreContainer.Stop() + testStoreContainer = nil + } +} + +func setupTestHelper(enterprise bool) *TestHelper { + permConfig, err := os.Open(utils.FindConfigFile("config.json")) + if err != nil { + panic(err) + } + defer permConfig.Close() + tempConfig, err := ioutil.TempFile("", "") + if err != nil { + panic(err) + } + _, err = io.Copy(tempConfig, permConfig) + tempConfig.Close() + if err != nil { + panic(err) + } + + options := []app.Option{app.ConfigFile(tempConfig.Name()), app.DisableConfigWatch} + if testStore != nil { + options = append(options, app.StoreOverride(testStore)) + } + + a, err := app.New(options...) + if err != nil { + panic(err) + } + + th := &TestHelper{ + App: a, + pluginHooks: make(map[string]plugin.Hooks), + tempConfigPath: tempConfig.Name(), + } + + th.App.UpdateConfig(func(cfg *model.Config) { *cfg.TeamSettings.MaxUsersPerTeam = 50 }) + th.App.UpdateConfig(func(cfg *model.Config) { *cfg.RateLimitSettings.Enable = false }) + prevListenAddress := *th.App.Config().ServiceSettings.ListenAddress + if testStore != nil { + th.App.UpdateConfig(func(cfg *model.Config) { *cfg.ServiceSettings.ListenAddress = ":0" }) + } + serverErr := th.App.StartServer() + if serverErr != nil { + panic(serverErr) + } + + th.App.UpdateConfig(func(cfg *model.Config) { *cfg.ServiceSettings.ListenAddress = prevListenAddress }) + + th.App.DoAdvancedPermissionsMigration() + + th.App.Srv.Store.MarkSystemRanUnitTests() + + th.App.UpdateConfig(func(cfg *model.Config) { *cfg.TeamSettings.EnableOpenServer = true }) + + if enterprise { + th.App.SetLicense(model.NewTestLicense()) + } else { + th.App.SetLicense(nil) + } + + return th +} + +func SetupEnterprise() *TestHelper { + return setupTestHelper(true) +} + +func Setup() *TestHelper { + return setupTestHelper(false) +} + +func (me *TestHelper) InitBasic() *TestHelper { + me.BasicTeam = me.CreateTeam() + me.BasicUser = me.CreateUser() + me.LinkUserToTeam(me.BasicUser, me.BasicTeam) + me.BasicUser2 = me.CreateUser() + me.LinkUserToTeam(me.BasicUser2, me.BasicTeam) + me.BasicChannel = me.CreateChannel(me.BasicTeam) + me.BasicPost = me.CreatePost(me.BasicChannel) + + return me +} + +func (me *TestHelper) InitSystemAdmin() *TestHelper { + me.SystemAdminUser = me.CreateUser() + me.App.UpdateUserRoles(me.SystemAdminUser.Id, model.SYSTEM_USER_ROLE_ID+" "+model.SYSTEM_ADMIN_ROLE_ID, false) + me.SystemAdminUser, _ = me.App.GetUser(me.SystemAdminUser.Id) + + return me +} + +func (me *TestHelper) MakeEmail() string { + return "success_" + model.NewId() + "@simulator.amazonses.com" +} + +func (me *TestHelper) CreateTeam() *model.Team { + id := model.NewId() + team := &model.Team{ + DisplayName: "dn_" + id, + Name: "name" + id, + Email: "success+" + id + "@simulator.amazonses.com", + Type: model.TEAM_OPEN, + } + + utils.DisableDebugLogForTest() + var err *model.AppError + if team, err = me.App.CreateTeam(team); err != nil { + mlog.Error(err.Error()) + + time.Sleep(time.Second) + panic(err) + } + utils.EnableDebugLogForTest() + return team +} + +func (me *TestHelper) CreateUser() *model.User { + id := model.NewId() + + user := &model.User{ + Email: "success+" + id + "@simulator.amazonses.com", + Username: "un_" + id, + Nickname: "nn_" + id, + Password: "Password1", + EmailVerified: true, + } + + utils.DisableDebugLogForTest() + var err *model.AppError + if user, err = me.App.CreateUser(user); err != nil { + mlog.Error(err.Error()) + + time.Sleep(time.Second) + panic(err) + } + utils.EnableDebugLogForTest() + return user +} + +func (me *TestHelper) CreateChannel(team *model.Team) *model.Channel { + return me.createChannel(team, model.CHANNEL_OPEN) +} + +func (me *TestHelper) createChannel(team *model.Team, channelType string) *model.Channel { + id := model.NewId() + + channel := &model.Channel{ + DisplayName: "dn_" + id, + Name: "name_" + id, + Type: channelType, + TeamId: team.Id, + CreatorId: me.BasicUser.Id, + } + + utils.DisableDebugLogForTest() + var err *model.AppError + if channel, err = me.App.CreateChannel(channel, true); err != nil { + mlog.Error(err.Error()) + + time.Sleep(time.Second) + panic(err) + } + utils.EnableDebugLogForTest() + return channel +} + +func (me *TestHelper) CreateDmChannel(user *model.User) *model.Channel { + utils.DisableDebugLogForTest() + var err *model.AppError + var channel *model.Channel + if channel, err = me.App.CreateDirectChannel(me.BasicUser.Id, user.Id); err != nil { + mlog.Error(err.Error()) + + time.Sleep(time.Second) + panic(err) + } + utils.EnableDebugLogForTest() + return channel +} + +func (me *TestHelper) CreatePost(channel *model.Channel) *model.Post { + id := model.NewId() + + post := &model.Post{ + UserId: me.BasicUser.Id, + ChannelId: channel.Id, + Message: "message_" + id, + CreateAt: model.GetMillis() - 10000, + } + + utils.DisableDebugLogForTest() + var err *model.AppError + if post, err = me.App.CreatePost(post, channel, false); err != nil { + mlog.Error(err.Error()) + + time.Sleep(time.Second) + panic(err) + } + utils.EnableDebugLogForTest() + return post +} + +func (me *TestHelper) LinkUserToTeam(user *model.User, team *model.Team) { + utils.DisableDebugLogForTest() + + err := me.App.JoinUserToTeam(team, user, "") + if err != nil { + mlog.Error(err.Error()) + + time.Sleep(time.Second) + panic(err) + } + + utils.EnableDebugLogForTest() +} + +func (me *TestHelper) AddUserToChannel(user *model.User, channel *model.Channel) *model.ChannelMember { + utils.DisableDebugLogForTest() + + member, err := me.App.AddUserToChannel(user, channel) + if err != nil { + mlog.Error(err.Error()) + + time.Sleep(time.Second) + panic(err) + } + + utils.EnableDebugLogForTest() + + return member +} + +func (me *TestHelper) TearDown() { + me.App.Shutdown() + os.Remove(me.tempConfigPath) + if err := recover(); err != nil { + StopTestStore() + panic(err) + } + if me.tempWorkspace != "" { + os.RemoveAll(me.tempWorkspace) + } +} + +type mockPluginSupervisor struct { + hooks plugin.Hooks +} + +func (s *mockPluginSupervisor) Start(api plugin.API) error { + return s.hooks.OnActivate(api) +} + +func (s *mockPluginSupervisor) Stop() error { + return nil +} + +func (s *mockPluginSupervisor) Hooks() plugin.Hooks { + return s.hooks +} + +func (me *TestHelper) InstallPlugin(manifest *model.Manifest, hooks plugin.Hooks) { + if me.tempWorkspace == "" { + dir, err := ioutil.TempDir("", "apptest") + if err != nil { + panic(err) + } + me.tempWorkspace = dir + } + + pluginDir := filepath.Join(me.tempWorkspace, "plugins") + webappDir := filepath.Join(me.tempWorkspace, "webapp") + me.App.InitPlugins(pluginDir, webappDir, func(bundle *model.BundleInfo) (plugin.Supervisor, error) { + if hooks, ok := me.pluginHooks[bundle.Manifest.Id]; ok { + return &mockPluginSupervisor{hooks}, nil + } + return pluginenv.DefaultSupervisorProvider(bundle) + }) + + me.pluginHooks[manifest.Id] = hooks + + manifestCopy := *manifest + if manifestCopy.Backend == nil { + manifestCopy.Backend = &model.ManifestBackend{} + } + manifestBytes, err := json.Marshal(&manifestCopy) + if err != nil { + panic(err) + } + + if err := os.MkdirAll(filepath.Join(pluginDir, manifest.Id), 0700); err != nil { + panic(err) + } + + if err := ioutil.WriteFile(filepath.Join(pluginDir, manifest.Id, "plugin.json"), manifestBytes, 0600); err != nil { + panic(err) + } +} + +func (me *TestHelper) ResetRoleMigration() { + if _, err := testStoreSqlSupplier.GetMaster().Exec("DELETE from Roles"); err != nil { + panic(err) + } + + testClusterInterface.sendClearRoleCacheMessage() + + if _, err := testStoreSqlSupplier.GetMaster().Exec("DELETE from Systems where Name = :Name", map[string]interface{}{"Name": app.ADVANCED_PERMISSIONS_MIGRATION_KEY}); err != nil { + panic(err) + } +} + +func (me *TestHelper) DeleteAllJobsByTypeAndMigrationKey(jobType string, migrationKey string) { + if res := <-me.App.Srv.Store.Job().GetAllByType(model.JOB_TYPE_MIGRATIONS); res.Err != nil { + panic(res.Err) + } else { + jobs := res.Data.([]*model.Job) + + for _, job := range jobs { + if key, ok := job.Data[JOB_DATA_KEY_MIGRATION]; ok && key == migrationKey { + if res := <-me.App.Srv.Store.Job().Delete(job.Id); res.Err != nil { + panic(res.Err) + } + } + } + } +} + +type FakeClusterInterface struct { + clusterMessageHandler einterfaces.ClusterMessageHandler +} + +func (me *FakeClusterInterface) StartInterNodeCommunication() {} +func (me *FakeClusterInterface) StopInterNodeCommunication() {} +func (me *FakeClusterInterface) RegisterClusterMessageHandler(event string, crm einterfaces.ClusterMessageHandler) { + me.clusterMessageHandler = crm +} +func (me *FakeClusterInterface) GetClusterId() string { return "" } +func (me *FakeClusterInterface) IsLeader() bool { return false } +func (me *FakeClusterInterface) GetMyClusterInfo() *model.ClusterInfo { return nil } +func (me *FakeClusterInterface) GetClusterInfos() []*model.ClusterInfo { return nil } +func (me *FakeClusterInterface) SendClusterMessage(cluster *model.ClusterMessage) {} +func (me *FakeClusterInterface) NotifyMsg(buf []byte) {} +func (me *FakeClusterInterface) GetClusterStats() ([]*model.ClusterStats, *model.AppError) { + return nil, nil +} +func (me *FakeClusterInterface) GetLogs(page, perPage int) ([]string, *model.AppError) { + return []string{}, nil +} +func (me *FakeClusterInterface) ConfigChanged(previousConfig *model.Config, newConfig *model.Config, sendToOtherServer bool) *model.AppError { + return nil +} +func (me *FakeClusterInterface) sendClearRoleCacheMessage() { + me.clusterMessageHandler(&model.ClusterMessage{ + Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_ROLES, + }) +} diff --git a/migrations/scheduler.go b/migrations/scheduler.go new file mode 100644 index 000000000..8a7ac30d0 --- /dev/null +++ b/migrations/scheduler.go @@ -0,0 +1,110 @@ +// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package migrations + +import ( + "time" + + "github.com/mattermost/mattermost-server/app" + "github.com/mattermost/mattermost-server/mlog" + "github.com/mattermost/mattermost-server/model" + "github.com/mattermost/mattermost-server/store" +) + +const ( + MIGRATION_JOB_WEDGED_TIMEOUT_MILLISECONDS = 3600000 // 1 hour +) + +type Scheduler struct { + App *app.App + allMigrationsCompleted bool +} + +func (m *MigrationsJobInterfaceImpl) MakeScheduler() model.Scheduler { + return &Scheduler{m.App, false} +} + +func (scheduler *Scheduler) Name() string { + return "MigrationsScheduler" +} + +func (scheduler *Scheduler) JobType() string { + return model.JOB_TYPE_MIGRATIONS +} + +func (scheduler *Scheduler) Enabled(cfg *model.Config) bool { + return true +} + +func (scheduler *Scheduler) NextScheduleTime(cfg *model.Config, now time.Time, pendingJobs bool, lastSuccessfulJob *model.Job) *time.Time { + if scheduler.allMigrationsCompleted { + return nil + } + + nextTime := time.Now().Add(60 * time.Second) + return &nextTime +} + +func (scheduler *Scheduler) ScheduleJob(cfg *model.Config, pendingJobs bool, lastSuccessfulJob *model.Job) (*model.Job, *model.AppError) { + mlog.Debug("Scheduling Job", mlog.String("scheduler", scheduler.Name())) + + // Work through the list of migrations in order. Schedule the first one that isn't done (assuming it isn't in progress already). + for _, key := range MakeMigrationsList() { + state, job, err := GetMigrationState(key, scheduler.App.Srv.Store) + if err != nil { + mlog.Error("Failed to determine status of migration: ", mlog.String("scheduler", scheduler.Name()), mlog.String("migration_key", key), mlog.String("error", err.Error())) + return nil, nil + } + + if state == MIGRATION_STATE_IN_PROGRESS { + // Check the migration job isn't wedged. + if job != nil && job.LastActivityAt < model.GetMillis()-MIGRATION_JOB_WEDGED_TIMEOUT_MILLISECONDS { + mlog.Warn("Job appears to be wedged. Rescheduling another instance.", mlog.String("scheduler", scheduler.Name()), mlog.String("wedged_job_id", job.Id), mlog.String("migration_key", key)) + if err := scheduler.App.Jobs.SetJobError(job, nil); err != nil { + mlog.Error("Worker: Failed to set job error", mlog.String("scheduler", scheduler.Name()), mlog.String("job_id", job.Id), mlog.String("error", err.Error())) + } + return scheduler.createJob(key, job, scheduler.App.Srv.Store) + } + + return nil, nil + } + + if state == MIGRATION_STATE_COMPLETED { + // This migration is done. Continue to check the next. + continue + } + + if state == MIGRATION_STATE_UNSCHEDULED { + mlog.Debug("Scheduling a new job for migration.", mlog.String("scheduler", scheduler.Name()), mlog.String("migration_key", key)) + return scheduler.createJob(key, job, scheduler.App.Srv.Store) + } + + mlog.Error("Unknown migration state. Not doing anything.", mlog.String("migration_state", state)) + return nil, nil + } + + // If we reached here, then there aren't any migrations left to run. + scheduler.allMigrationsCompleted = true + mlog.Debug("All migrations are complete.", mlog.String("scheduler", scheduler.Name())) + + return nil, nil +} + +func (scheduler *Scheduler) createJob(migrationKey string, lastJob *model.Job, store store.Store) (*model.Job, *model.AppError) { + var lastDone string + if lastJob != nil { + lastDone = lastJob.Data[JOB_DATA_KEY_MIGRATION_LAST_DONE] + } + + data := map[string]string{ + JOB_DATA_KEY_MIGRATION: migrationKey, + JOB_DATA_KEY_MIGRATION_LAST_DONE: lastDone, + } + + if job, err := scheduler.App.Jobs.CreateJob(model.JOB_TYPE_MIGRATIONS, data); err != nil { + return nil, err + } else { + return job, nil + } +} diff --git a/migrations/worker.go b/migrations/worker.go new file mode 100644 index 000000000..7a64dd609 --- /dev/null +++ b/migrations/worker.go @@ -0,0 +1,166 @@ +// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package migrations + +import ( + "context" + "net/http" + "time" + + "github.com/mattermost/mattermost-server/app" + "github.com/mattermost/mattermost-server/jobs" + "github.com/mattermost/mattermost-server/mlog" + "github.com/mattermost/mattermost-server/model" +) + +const ( + TIME_BETWEEN_BATCHES = 100 +) + +type Worker struct { + name string + stop chan bool + stopped chan bool + jobs chan model.Job + jobServer *jobs.JobServer + app *app.App +} + +func (m *MigrationsJobInterfaceImpl) MakeWorker() model.Worker { + worker := Worker{ + name: "Migrations", + stop: make(chan bool, 1), + stopped: make(chan bool, 1), + jobs: make(chan model.Job), + jobServer: m.App.Jobs, + app: m.App, + } + + return &worker +} + +func (worker *Worker) Run() { + mlog.Debug("Worker started", mlog.String("worker", worker.name)) + + defer func() { + mlog.Debug("Worker finished", mlog.String("worker", worker.name)) + worker.stopped <- true + }() + + for { + select { + case <-worker.stop: + mlog.Debug("Worker received stop signal", mlog.String("worker", worker.name)) + return + case job := <-worker.jobs: + mlog.Debug("Worker received a new candidate job.", mlog.String("worker", worker.name)) + worker.DoJob(&job) + } + } +} + +func (worker *Worker) Stop() { + mlog.Debug("Worker stopping", mlog.String("worker", worker.name)) + worker.stop <- true + <-worker.stopped +} + +func (worker *Worker) JobChannel() chan<- model.Job { + return worker.jobs +} + +func (worker *Worker) DoJob(job *model.Job) { + if claimed, err := worker.jobServer.ClaimJob(job); err != nil { + mlog.Info("Worker experienced an error while trying to claim job", + mlog.String("worker", worker.name), + mlog.String("job_id", job.Id), + mlog.String("error", err.Error())) + return + } else if !claimed { + return + } + + cancelCtx, cancelCancelWatcher := context.WithCancel(context.Background()) + cancelWatcherChan := make(chan interface{}, 1) + go worker.app.Jobs.CancellationWatcher(cancelCtx, job.Id, cancelWatcherChan) + + defer cancelCancelWatcher() + + for { + select { + case <-cancelWatcherChan: + mlog.Debug("Worker: Job has been canceled via CancellationWatcher", mlog.String("worker", worker.name), mlog.String("job_id", job.Id)) + worker.setJobCanceled(job) + return + + case <-worker.stop: + mlog.Debug("Worker: Job has been canceled via Worker Stop", mlog.String("worker", worker.name), mlog.String("job_id", job.Id)) + worker.setJobCanceled(job) + return + + case <-time.After(TIME_BETWEEN_BATCHES * time.Millisecond): + done, progress, err := worker.runMigration(job.Data[JOB_DATA_KEY_MIGRATION], job.Data[JOB_DATA_KEY_MIGRATION_LAST_DONE]) + if err != nil { + mlog.Error("Worker: Failed to run migration", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error())) + worker.setJobError(job, err) + return + } else if done { + mlog.Info("Worker: Job is complete", mlog.String("worker", worker.name), mlog.String("job_id", job.Id)) + worker.setJobSuccess(job) + return + } else { + job.Data[JOB_DATA_KEY_MIGRATION_LAST_DONE] = progress + if err := worker.app.Jobs.UpdateInProgressJobData(job); err != nil { + mlog.Error("Worker: Failed to update migration status data for job", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error())) + worker.setJobError(job, err) + return + } + } + } + } +} + +func (worker *Worker) setJobSuccess(job *model.Job) { + if err := worker.app.Jobs.SetJobSuccess(job); err != nil { + mlog.Error("Worker: Failed to set success for job", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error())) + worker.setJobError(job, err) + } +} + +func (worker *Worker) setJobError(job *model.Job, appError *model.AppError) { + if err := worker.app.Jobs.SetJobError(job, appError); err != nil { + mlog.Error("Worker: Failed to set job error", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error())) + } +} + +func (worker *Worker) setJobCanceled(job *model.Job) { + if err := worker.app.Jobs.SetJobCanceled(job); err != nil { + mlog.Error("Worker: Failed to mark job as canceled", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error())) + } +} + +// Return parameters: +// - whether the migration is completed on this run (true) or still incomplete (false). +// - the updated lastDone string for the migration. +// - any error which may have occurred while running the migration. +func (worker *Worker) runMigration(key string, lastDone string) (bool, string, *model.AppError) { + var done bool + var progress string + var err *model.AppError + + switch key { + case model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2: + done, progress, err = worker.runAdvancedPermissionsPhase2Migration(lastDone) + default: + return false, "", model.NewAppError("MigrationsWorker.runMigration", "migrations.worker.run_migration.unknown_key", map[string]interface{}{"key": key}, "", http.StatusInternalServerError) + } + + if done { + if result := <-worker.app.Srv.Store.System().Save(&model.System{Name: key, Value: "true"}); result.Err != nil { + return false, "", result.Err + } + } + + return done, progress, err +} diff --git a/model/job.go b/model/job.go index e10ed1f5d..c16614958 100644 --- a/model/job.go +++ b/model/job.go @@ -16,6 +16,7 @@ const ( JOB_TYPE_ELASTICSEARCH_POST_INDEXING = "elasticsearch_post_indexing" JOB_TYPE_ELASTICSEARCH_POST_AGGREGATION = "elasticsearch_post_aggregation" JOB_TYPE_LDAP_SYNC = "ldap_sync" + JOB_TYPE_MIGRATIONS = "migrations" JOB_STATUS_PENDING = "pending" JOB_STATUS_IN_PROGRESS = "in_progress" @@ -52,6 +53,7 @@ func (j *Job) IsValid() *AppError { case JOB_TYPE_ELASTICSEARCH_POST_AGGREGATION: case JOB_TYPE_LDAP_SYNC: case JOB_TYPE_MESSAGE_EXPORT: + case JOB_TYPE_MIGRATIONS: default: return NewAppError("Job.IsValid", "model.job.is_valid.type.app_error", nil, "id="+j.Id, http.StatusBadRequest) } diff --git a/model/migration.go b/model/migration.go new file mode 100644 index 000000000..ead7acce2 --- /dev/null +++ b/model/migration.go @@ -0,0 +1,8 @@ +// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package model + +const ( + MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2 = "migration_advanced_permissions_phase_2" +) diff --git a/store/sqlstore/channel_store.go b/store/sqlstore/channel_store.go index beef1be80..dceebc92e 100644 --- a/store/sqlstore/channel_store.go +++ b/store/sqlstore/channel_store.go @@ -1739,3 +1739,71 @@ func (s SqlChannelStore) GetChannelsByScheme(schemeId string, offset int, limit } }) } + +// This function does the Advanced Permissions Phase 2 migration for ChannelMember objects. It performs the migration +// in batches as a single transaction per batch to ensure consistency but to also minimise execution time to avoid +// causing unnecessary table locks. **THIS FUNCTION SHOULD NOT BE USED FOR ANY OTHER PURPOSE.** Executing this function +// *after* the new Schemes functionality has been used on an installation will have unintended consequences. +func (s SqlChannelStore) MigrateChannelMembers(fromChannelId string, fromUserId string) store.StoreChannel { + return store.Do(func(result *store.StoreResult) { + var transaction *gorp.Transaction + var err error + + if transaction, err = s.GetMaster().Begin(); err != nil { + result.Err = model.NewAppError("SqlChannelStore.MigrateChannelMembers", "store.sql_channel.migrate_channel_members.open_transaction.app_error", nil, err.Error(), http.StatusInternalServerError) + return + } + + var channelMembers []channelMember + if _, err := transaction.Select(&channelMembers, "SELECT * from ChannelMembers WHERE (ChannelId, UserId) > (:FromChannelId, :FromUserId) ORDER BY ChannelId, UserId LIMIT 100", map[string]interface{}{"FromChannelId": fromChannelId, "FromUserId": fromUserId}); err != nil { + result.Err = model.NewAppError("SqlChannelStore.MigrateChannelMembers", "store.sql_channel.migrate_channel_members.select.app_error", nil, err.Error(), http.StatusInternalServerError) + return + } + + if len(channelMembers) == 0 { + // No more channel members in query result means that the migration has finished. + return + } + + for _, member := range channelMembers { + roles := strings.Fields(member.Roles) + var newRoles []string + member.SchemeAdmin = sql.NullBool{Bool: false, Valid: true} + member.SchemeUser = sql.NullBool{Bool: false, Valid: true} + for _, role := range roles { + if role == model.CHANNEL_ADMIN_ROLE_ID { + member.SchemeAdmin = sql.NullBool{Bool: true, Valid: true} + } else if role == model.CHANNEL_USER_ROLE_ID { + member.SchemeUser = sql.NullBool{Bool: true, Valid: true} + } else { + newRoles = append(newRoles, role) + } + } + member.Roles = strings.Join(newRoles, " ") + + if _, err := transaction.Update(&member); err != nil { + if err2 := transaction.Rollback(); err2 != nil { + result.Err = model.NewAppError("SqlChannelStore.MigrateChannelMembers", "store.sql_channel.migrate_channel_members.rollback_transaction.app_error", nil, err2.Error(), http.StatusInternalServerError) + return + } + result.Err = model.NewAppError("SqlChannelStore.MigrateChannelMembers", "store.sql_channel.migrate_channel_members.update.app_error", nil, err.Error(), http.StatusInternalServerError) + return + } + + } + + if err := transaction.Commit(); err != nil { + if err2 := transaction.Rollback(); err2 != nil { + result.Err = model.NewAppError("SqlChannelStore.MigrateChannelMembers", "store.sql_channel.migrate_channel_members.rollback_transaction.app_error", nil, err2.Error(), http.StatusInternalServerError) + return + } + result.Err = model.NewAppError("SqlChannelStore.MigrateChannelMembers", "store.sql_channel.migrate_channel_members.commit_transaction.app_error", nil, err.Error(), http.StatusInternalServerError) + return + } + + data := make(map[string]string) + data["ChannelId"] = channelMembers[len(channelMembers)-1].ChannelId + data["UserId"] = channelMembers[len(channelMembers)-1].UserId + result.Data = data + }) +} diff --git a/store/sqlstore/team_store.go b/store/sqlstore/team_store.go index 9e72cc82e..ea5f7fd1f 100644 --- a/store/sqlstore/team_store.go +++ b/store/sqlstore/team_store.go @@ -9,6 +9,7 @@ import ( "strconv" "strings" + "github.com/mattermost/gorp" "github.com/mattermost/mattermost-server/model" "github.com/mattermost/mattermost-server/store" ) @@ -725,3 +726,71 @@ func (s SqlTeamStore) GetTeamsByScheme(schemeId string, offset int, limit int) s } }) } + +// This function does the Advanced Permissions Phase 2 migration for TeamMember objects. It performs the migration +// in batches as a single transaction per batch to ensure consistency but to also minimise execution time to avoid +// causing unnecessary table locks. **THIS FUNCTION SHOULD NOT BE USED FOR ANY OTHER PURPOSE.** Executing this function +// *after* the new Schemes functionality has been used on an installation will have unintended consequences. +func (s SqlTeamStore) MigrateTeamMembers(fromTeamId string, fromUserId string) store.StoreChannel { + return store.Do(func(result *store.StoreResult) { + var transaction *gorp.Transaction + var err error + + if transaction, err = s.GetMaster().Begin(); err != nil { + result.Err = model.NewAppError("SqlTeamStore.MigrateTeamMembers", "store.sql_team.migrate_team_members.open_transaction.app_error", nil, err.Error(), http.StatusInternalServerError) + return + } + + var teamMembers []teamMember + if _, err := transaction.Select(&teamMembers, "SELECT * from TeamMembers WHERE (TeamId, UserId) > (:FromTeamId, :FromUserId) ORDER BY TeamId, UserId LIMIT 100", map[string]interface{}{"FromTeamId": fromTeamId, "FromUserId": fromUserId}); err != nil { + result.Err = model.NewAppError("SqlTeamStore.MigrateTeamMembers", "store.sql_team.migrate_team_members.select.app_error", nil, err.Error(), http.StatusInternalServerError) + return + } + + if len(teamMembers) == 0 { + // No more team members in query result means that the migration has finished. + return + } + + for _, member := range teamMembers { + roles := strings.Fields(member.Roles) + var newRoles []string + member.SchemeAdmin = sql.NullBool{Bool: false, Valid: true} + member.SchemeUser = sql.NullBool{Bool: false, Valid: true} + for _, role := range roles { + if role == model.TEAM_ADMIN_ROLE_ID { + member.SchemeAdmin = sql.NullBool{Bool: true, Valid: true} + } else if role == model.TEAM_USER_ROLE_ID { + member.SchemeUser = sql.NullBool{Bool: true, Valid: true} + } else { + newRoles = append(newRoles, role) + } + } + member.Roles = strings.Join(newRoles, " ") + + if _, err := transaction.Update(&member); err != nil { + if err2 := transaction.Rollback(); err2 != nil { + result.Err = model.NewAppError("SqlTeamStore.MigrateTeamMembers", "store.sql_team.migrate_team_members.rollback_transaction.app_error", nil, err2.Error(), http.StatusInternalServerError) + return + } + result.Err = model.NewAppError("SqlTeamStore.MigrateTeamMembers", "store.sql_team.migrate_team_members.update.app_error", nil, err.Error(), http.StatusInternalServerError) + return + } + + } + + if err := transaction.Commit(); err != nil { + if err2 := transaction.Rollback(); err2 != nil { + result.Err = model.NewAppError("SqlTeamStore.MigrateTeamMembers", "store.sql_team.migrate_team_members.rollback_transaction.app_error", nil, err2.Error(), http.StatusInternalServerError) + return + } + result.Err = model.NewAppError("SqlTeamStore.MigrateTeamMembers", "store.sql_team.migrate_team_members.commit_transaction.app_error", nil, err.Error(), http.StatusInternalServerError) + return + } + + data := make(map[string]string) + data["TeamId"] = teamMembers[len(teamMembers)-1].TeamId + data["UserId"] = teamMembers[len(teamMembers)-1].UserId + result.Data = data + }) +} diff --git a/store/store.go b/store/store.go index 2e85c0a68..bf2ac42f5 100644 --- a/store/store.go +++ b/store/store.go @@ -105,6 +105,7 @@ type TeamStore interface { RemoveAllMembersByUser(userId string) StoreChannel UpdateLastTeamIconUpdate(teamId string, curTime int64) StoreChannel GetTeamsByScheme(schemeId string, offset int, limit int) StoreChannel + MigrateTeamMembers(fromTeamId string, fromUserId string) StoreChannel } type ChannelStore interface { @@ -163,6 +164,7 @@ type ChannelStore interface { GetChannelUnread(channelId, userId string) StoreChannel ClearCaches() GetChannelsByScheme(schemeId string, offset int, limit int) StoreChannel + MigrateChannelMembers(fromChannelId string, fromUserId string) StoreChannel } type ChannelMemberHistoryStore interface { diff --git a/store/storetest/channel_store.go b/store/storetest/channel_store.go index d90a0ae1e..d044f3907 100644 --- a/store/storetest/channel_store.go +++ b/store/storetest/channel_store.go @@ -5,6 +5,7 @@ package storetest import ( "sort" + "strings" "testing" "time" @@ -52,6 +53,7 @@ func TestChannelStore(t *testing.T, ss store.Store) { t.Run("GetPinnedPosts", func(t *testing.T) { testChannelStoreGetPinnedPosts(t, ss) }) t.Run("MaxChannelsPerTeam", func(t *testing.T) { testChannelStoreMaxChannelsPerTeam(t, ss) }) t.Run("GetChannelsByScheme", func(t *testing.T) { testChannelStoreGetChannelsByScheme(t, ss) }) + t.Run("MigrateChannelMembers", func(t *testing.T) { testChannelStoreMigrateChannelMembers(t, ss) }) } @@ -2254,3 +2256,76 @@ func testChannelStoreGetChannelsByScheme(t *testing.T, ss store.Store) { d3 := res3.Data.(model.ChannelList) assert.Len(t, d3, 0) } + +func testChannelStoreMigrateChannelMembers(t *testing.T, ss store.Store) { + s1 := model.NewId() + c1 := &model.Channel{ + TeamId: model.NewId(), + DisplayName: "Name", + Name: model.NewId(), + Type: model.CHANNEL_OPEN, + SchemeId: &s1, + } + c1 = (<-ss.Channel().Save(c1, 100)).Data.(*model.Channel) + + cm1 := &model.ChannelMember{ + ChannelId: c1.Id, + UserId: model.NewId(), + ExplicitRoles: "channel_admin channel_user", + NotifyProps: model.GetDefaultChannelNotifyProps(), + } + cm2 := &model.ChannelMember{ + ChannelId: c1.Id, + UserId: model.NewId(), + ExplicitRoles: "channel_user", + NotifyProps: model.GetDefaultChannelNotifyProps(), + } + cm3 := &model.ChannelMember{ + ChannelId: c1.Id, + UserId: model.NewId(), + ExplicitRoles: "something_else", + NotifyProps: model.GetDefaultChannelNotifyProps(), + } + + cm1 = (<-ss.Channel().SaveMember(cm1)).Data.(*model.ChannelMember) + cm2 = (<-ss.Channel().SaveMember(cm2)).Data.(*model.ChannelMember) + cm3 = (<-ss.Channel().SaveMember(cm3)).Data.(*model.ChannelMember) + + lastDoneChannelId := strings.Repeat("0", 26) + lastDoneUserId := strings.Repeat("0", 26) + + for { + res := <-ss.Channel().MigrateChannelMembers(lastDoneChannelId, lastDoneUserId) + if assert.Nil(t, res.Err) { + if res.Data == nil { + break + } + data := res.Data.(map[string]string) + lastDoneChannelId = data["ChannelId"] + lastDoneUserId = data["UserId"] + } + } + + ss.Channel().ClearCaches() + + res1 := <-ss.Channel().GetMember(cm1.ChannelId, cm1.UserId) + assert.Nil(t, res1.Err) + cm1b := res1.Data.(*model.ChannelMember) + assert.Equal(t, "", cm1b.ExplicitRoles) + assert.True(t, cm1b.SchemeUser) + assert.True(t, cm1b.SchemeAdmin) + + res2 := <-ss.Channel().GetMember(cm2.ChannelId, cm2.UserId) + assert.Nil(t, res2.Err) + cm2b := res2.Data.(*model.ChannelMember) + assert.Equal(t, "", cm2b.ExplicitRoles) + assert.True(t, cm2b.SchemeUser) + assert.False(t, cm2b.SchemeAdmin) + + res3 := <-ss.Channel().GetMember(cm3.ChannelId, cm3.UserId) + assert.Nil(t, res3.Err) + cm3b := res3.Data.(*model.ChannelMember) + assert.Equal(t, "something_else", cm3b.ExplicitRoles) + assert.False(t, cm3b.SchemeUser) + assert.False(t, cm3b.SchemeAdmin) +} diff --git a/store/storetest/mocks/ChannelStore.go b/store/storetest/mocks/ChannelStore.go index ecc8b8768..8858e3d3b 100644 --- a/store/storetest/mocks/ChannelStore.go +++ b/store/storetest/mocks/ChannelStore.go @@ -583,6 +583,22 @@ func (_m *ChannelStore) IsUserInChannelUseCache(userId string, channelId string) return r0 } +// MigrateChannelMembers provides a mock function with given fields: fromChannelId, fromUserId +func (_m *ChannelStore) MigrateChannelMembers(fromChannelId string, fromUserId string) store.StoreChannel { + ret := _m.Called(fromChannelId, fromUserId) + + var r0 store.StoreChannel + if rf, ok := ret.Get(0).(func(string, string) store.StoreChannel); ok { + r0 = rf(fromChannelId, fromUserId) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(store.StoreChannel) + } + } + + return r0 +} + // PermanentDelete provides a mock function with given fields: channelId func (_m *ChannelStore) PermanentDelete(channelId string) store.StoreChannel { ret := _m.Called(channelId) diff --git a/store/storetest/mocks/TeamStore.go b/store/storetest/mocks/TeamStore.go index 51a968784..93cb84caf 100644 --- a/store/storetest/mocks/TeamStore.go +++ b/store/storetest/mocks/TeamStore.go @@ -301,6 +301,22 @@ func (_m *TeamStore) GetTotalMemberCount(teamId string) store.StoreChannel { return r0 } +// MigrateTeamMembers provides a mock function with given fields: fromTeamId, fromUserId +func (_m *TeamStore) MigrateTeamMembers(fromTeamId string, fromUserId string) store.StoreChannel { + ret := _m.Called(fromTeamId, fromUserId) + + var r0 store.StoreChannel + if rf, ok := ret.Get(0).(func(string, string) store.StoreChannel); ok { + r0 = rf(fromTeamId, fromUserId) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(store.StoreChannel) + } + } + + return r0 +} + // PermanentDelete provides a mock function with given fields: teamId func (_m *TeamStore) PermanentDelete(teamId string) store.StoreChannel { ret := _m.Called(teamId) diff --git a/store/storetest/team_store.go b/store/storetest/team_store.go index ff79650d5..726c17a99 100644 --- a/store/storetest/team_store.go +++ b/store/storetest/team_store.go @@ -4,6 +4,7 @@ package storetest import ( + "strings" "testing" "time" @@ -39,6 +40,7 @@ func TestTeamStore(t *testing.T, ss store.Store) { t.Run("GetChannelUnreadsForTeam", func(t *testing.T) { testGetChannelUnreadsForTeam(t, ss) }) t.Run("UpdateLastTeamIconUpdate", func(t *testing.T) { testUpdateLastTeamIconUpdate(t, ss) }) t.Run("GetTeamsByScheme", func(t *testing.T) { testGetTeamsByScheme(t, ss) }) + t.Run("MigrateTeamMembers", func(t *testing.T) { testTeamStoreMigrateTeamMembers(t, ss) }) } func testTeamStoreSave(t *testing.T, ss store.Store) { @@ -1098,3 +1100,72 @@ func testGetTeamsByScheme(t *testing.T, ss store.Store) { d3 := res3.Data.([]*model.Team) assert.Len(t, d3, 0) } + +func testTeamStoreMigrateTeamMembers(t *testing.T, ss store.Store) { + s1 := model.NewId() + t1 := &model.Team{ + DisplayName: "Name", + Name: "z-z-z" + model.NewId() + "b", + Email: model.NewId() + "@nowhere.com", + Type: model.TEAM_OPEN, + InviteId: model.NewId(), + SchemeId: &s1, + } + t1 = store.Must(ss.Team().Save(t1)).(*model.Team) + + tm1 := &model.TeamMember{ + TeamId: t1.Id, + UserId: model.NewId(), + ExplicitRoles: "team_admin team_user", + } + tm2 := &model.TeamMember{ + TeamId: t1.Id, + UserId: model.NewId(), + ExplicitRoles: "team_user", + } + tm3 := &model.TeamMember{ + TeamId: t1.Id, + UserId: model.NewId(), + ExplicitRoles: "something_else", + } + + tm1 = (<-ss.Team().SaveMember(tm1, -1)).Data.(*model.TeamMember) + tm2 = (<-ss.Team().SaveMember(tm2, -1)).Data.(*model.TeamMember) + tm3 = (<-ss.Team().SaveMember(tm3, -1)).Data.(*model.TeamMember) + + lastDoneTeamId := strings.Repeat("0", 26) + lastDoneUserId := strings.Repeat("0", 26) + + for { + res := <-ss.Team().MigrateTeamMembers(lastDoneTeamId, lastDoneUserId) + if assert.Nil(t, res.Err) { + if res.Data == nil { + break + } + data := res.Data.(map[string]string) + lastDoneTeamId = data["TeamId"] + lastDoneUserId = data["UserId"] + } + } + + res1 := <-ss.Team().GetMember(tm1.TeamId, tm1.UserId) + assert.Nil(t, res1.Err) + tm1b := res1.Data.(*model.TeamMember) + assert.Equal(t, "", tm1b.ExplicitRoles) + assert.True(t, tm1b.SchemeUser) + assert.True(t, tm1b.SchemeAdmin) + + res2 := <-ss.Team().GetMember(tm2.TeamId, tm2.UserId) + assert.Nil(t, res2.Err) + tm2b := res2.Data.(*model.TeamMember) + assert.Equal(t, "", tm2b.ExplicitRoles) + assert.True(t, tm2b.SchemeUser) + assert.False(t, tm2b.SchemeAdmin) + + res3 := <-ss.Team().GetMember(tm3.TeamId, tm3.UserId) + assert.Nil(t, res3.Err) + tm3b := res3.Data.(*model.TeamMember) + assert.Equal(t, "something_else", tm3b.ExplicitRoles) + assert.False(t, tm3b.SchemeUser) + assert.False(t, tm3b.SchemeAdmin) +} -- cgit v1.2.3-1-g7c22