From 375c0632fab03e3fb54865e320585888499c076d Mon Sep 17 00:00:00 2001 From: Jonathan Date: Thu, 30 Nov 2017 09:07:04 -0500 Subject: PLT-7503: Create Message Export Scheduled Task and CLI Command (#7612) * Created message export scheduled task * Added CLI command to immediately kick off an export job * Added email addresses for users joining and leaving the channel to the export * Added support for both MySQL and PostgreSQL * Fixing gofmt error * Added a new ChannelMemberHistory store and associated tests * Updating the ChannelMemberHistory channel as users create/join/leave channels * Added user email to the message export object so it can be included in the actiance export xml * Don't fail to log a leave event if a corresponding join event wasn't logged * Adding copyright notices * Adding message export settings to daily diagnostics report * Added System Console integration for message export * Cleaned up TODOs * Made batch size configurable * Added export from timestamp to CLI command * Made ChannelMemberHistory table updates best effort * Added a context-based timeout option to the message export CLI * Minor PR updates/improvements * Removed unnecessary fields from MessageExport object to reduce query overhead * Removed JSON functions from the message export query in an effort to optimize performance * Changed the way that channel member history queries and purges work to better account for edge cases * Fixing a test I missed with the last refactor * Added file copy functionality to file backend, improved config validation, added default config values * Fixed file copy tests * More concise use of the testing libraries * Fixed context leak error * Changed default export path to correctly place an 'export' directory under the 'data' directory * Can't delete records from a read replica * Fixed copy file tests * Start job workers when license is applied, if configured to do so * Suggestions from the PR * Moar unit tests * Fixed test imports --- app/app.go | 19 +++ app/channel.go | 36 ++++- app/channel_test.go | 166 +++++++++++++++++++ app/diagnostics.go | 8 + app/diagnostics_test.go | 1 + app/license.go | 10 ++ cmd/platform/mattermost.go | 2 +- cmd/platform/message_export.go | 79 +++++++++ cmd/platform/message_export_test.go | 65 ++++++++ config/default.json | 7 + einterfaces/jobs/message_export.go | 13 ++ einterfaces/message_export.go | 14 ++ i18n/en.json | 92 +++++++++++ jobs/jobs_watcher.go | 7 + jobs/schedulers.go | 4 + jobs/server.go | 1 + jobs/workers.go | 21 +++ model/channel_member_history.go | 12 ++ model/config.go | 66 ++++++++ model/config_test.go | 110 +++++++++++++ model/job.go | 2 + model/license.go | 6 + model/message_export.go | 18 +++ store/layered_store.go | 4 + store/sqlstore/channel_member_history_store.go | 102 ++++++++++++ .../sqlstore/channel_member_history_store_test.go | 14 ++ store/sqlstore/compliance_store.go | 33 ++++ store/sqlstore/supplier.go | 52 +++--- store/store.go | 9 ++ store/storetest/channel_member_history_store.go | 179 +++++++++++++++++++++ store/storetest/compliance_store.go | 117 ++++++++++++++ store/storetest/mocks/ChannelMemberHistoryStore.go | 77 +++++++++ store/storetest/mocks/ComplianceStore.go | 16 ++ store/storetest/mocks/LayeredStoreDatabaseLayer.go | 16 ++ store/storetest/mocks/Store.go | 16 ++ store/storetest/store.go | 63 ++++---- utils/file_backend.go | 1 + utils/file_backend_local.go | 7 + utils/file_backend_s3.go | 19 ++- utils/file_backend_test.go | 20 +++ utils/license.go | 1 + 41 files changed, 1446 insertions(+), 59 deletions(-) create mode 100644 cmd/platform/message_export.go create mode 100644 cmd/platform/message_export_test.go create mode 100644 einterfaces/jobs/message_export.go create mode 100644 einterfaces/message_export.go create mode 100644 model/channel_member_history.go create mode 100644 model/message_export.go create mode 100644 store/sqlstore/channel_member_history_store.go create mode 100644 store/sqlstore/channel_member_history_store_test.go create mode 100644 store/storetest/channel_member_history_store.go create mode 100644 store/storetest/mocks/ChannelMemberHistoryStore.go diff --git a/app/app.go b/app/app.go index 7bd4c561b..fd313c9c9 100644 --- a/app/app.go +++ b/app/app.go @@ -48,6 +48,7 @@ type App struct { Elasticsearch einterfaces.ElasticsearchInterface Emoji einterfaces.EmojiInterface Ldap einterfaces.LdapInterface + MessageExport einterfaces.MessageExportInterface Metrics einterfaces.MetricsInterface Mfa einterfaces.MfaInterface Saml einterfaces.SamlInterface @@ -198,6 +199,12 @@ func RegisterJobsDataRetentionJobInterface(f func(*App) ejobs.DataRetentionJobIn jobsDataRetentionJobInterface = f } +var jobsMessageExportJobInterface func(*App) ejobs.MessageExportJobInterface + +func RegisterJobsMessageExportJobInterface(f func(*App) ejobs.MessageExportJobInterface) { + jobsMessageExportJobInterface = f +} + var jobsElasticsearchAggregatorInterface func(*App) ejobs.ElasticsearchAggregatorInterface func RegisterJobsElasticsearchAggregatorInterface(f func(*App) ejobs.ElasticsearchAggregatorInterface) { @@ -222,6 +229,12 @@ func RegisterLdapInterface(f func(*App) einterfaces.LdapInterface) { ldapInterface = f } +var messageExportInterface func(*App) einterfaces.MessageExportInterface + +func RegisterMessageExportInterface(f func(*App) einterfaces.MessageExportInterface) { + messageExportInterface = f +} + var metricsInterface func(*App) einterfaces.MetricsInterface func RegisterMetricsInterface(f func(*App) einterfaces.MetricsInterface) { @@ -267,6 +280,9 @@ func (a *App) initEnterprise() { } }) } + if messageExportInterface != nil { + a.MessageExport = messageExportInterface(a) + } if metricsInterface != nil { a.Metrics = metricsInterface(a) } @@ -289,6 +305,9 @@ func (a *App) initJobs() { if jobsDataRetentionJobInterface != nil { a.Jobs.DataRetentionJob = jobsDataRetentionJobInterface(a) } + if jobsMessageExportJobInterface != nil { + a.Jobs.MessageExportJob = jobsMessageExportJobInterface(a) + } if jobsElasticsearchAggregatorInterface != nil { a.Jobs.ElasticsearchAggregator = jobsElasticsearchAggregatorInterface(a) } diff --git a/app/channel.go b/app/channel.go index 16c5dd084..caaacea06 100644 --- a/app/channel.go +++ b/app/channel.go @@ -49,12 +49,19 @@ func (a *App) JoinDefaultChannels(teamId string, user *model.User, channelRole s } else { townSquare := result.Data.(*model.Channel) - cm := &model.ChannelMember{ChannelId: townSquare.Id, UserId: user.Id, - Roles: channelRole, NotifyProps: model.GetDefaultChannelNotifyProps()} + cm := &model.ChannelMember{ + ChannelId: townSquare.Id, + UserId: user.Id, + Roles: channelRole, + NotifyProps: model.GetDefaultChannelNotifyProps(), + } if cmResult := <-a.Srv.Store.Channel().SaveMember(cm); cmResult.Err != nil { err = cmResult.Err } + if result := <-a.Srv.Store.ChannelMemberHistory().LogJoinEvent(user.Id, townSquare.Id, model.GetMillis()); result.Err != nil { + l4g.Warn("Failed to update ChannelMemberHistory table %v", result.Err) + } if requestor == nil { if err := a.postJoinChannelMessage(user, townSquare); err != nil { @@ -74,12 +81,19 @@ func (a *App) JoinDefaultChannels(teamId string, user *model.User, channelRole s } else { offTopic := result.Data.(*model.Channel) - cm := &model.ChannelMember{ChannelId: offTopic.Id, UserId: user.Id, - Roles: channelRole, NotifyProps: model.GetDefaultChannelNotifyProps()} + cm := &model.ChannelMember{ + ChannelId: offTopic.Id, + UserId: user.Id, + Roles: channelRole, + NotifyProps: model.GetDefaultChannelNotifyProps(), + } if cmResult := <-a.Srv.Store.Channel().SaveMember(cm); cmResult.Err != nil { err = cmResult.Err } + if result := <-a.Srv.Store.ChannelMemberHistory().LogJoinEvent(user.Id, offTopic.Id, model.GetMillis()); result.Err != nil { + l4g.Warn("Failed to update ChannelMemberHistory table %v", result.Err) + } if requestor == nil { if err := a.postJoinChannelMessage(user, offTopic); err != nil { @@ -158,6 +172,9 @@ func (a *App) CreateChannel(channel *model.Channel, addMember bool) (*model.Chan if cmresult := <-a.Srv.Store.Channel().SaveMember(cm); cmresult.Err != nil { return nil, cmresult.Err } + if result := <-a.Srv.Store.ChannelMemberHistory().LogJoinEvent(channel.CreatorId, sc.Id, model.GetMillis()); result.Err != nil { + l4g.Warn("Failed to update ChannelMemberHistory table %v", result.Err) + } a.InvalidateCacheForUser(channel.CreatorId) } @@ -302,6 +319,9 @@ func (a *App) createGroupChannel(userIds []string, creatorId string) (*model.Cha if result := <-a.Srv.Store.Channel().SaveMember(cm); result.Err != nil { return nil, result.Err } + if result := <-a.Srv.Store.ChannelMemberHistory().LogJoinEvent(user.Id, channel.Id, model.GetMillis()); result.Err != nil { + l4g.Warn("Failed to update ChannelMemberHistory table %v", result.Err) + } } return channel, nil @@ -520,9 +540,12 @@ func (a *App) addUserToChannel(user *model.User, channel *model.Channel, teamMem l4g.Error("Failed to add member user_id=%v channel_id=%v err=%v", user.Id, channel.Id, result.Err) return nil, model.NewAppError("AddUserToChannel", "api.channel.add_user.to.channel.failed.app_error", nil, "", http.StatusInternalServerError) } - a.WaitForChannelMembership(channel.Id, user.Id) + if result := <-a.Srv.Store.ChannelMemberHistory().LogJoinEvent(user.Id, channel.Id, model.GetMillis()); result.Err != nil { + l4g.Warn("Failed to update ChannelMemberHistory table %v", result.Err) + } + a.InvalidateCacheForUser(user.Id) a.InvalidateCacheForChannelMembers(channel.Id) @@ -1069,6 +1092,9 @@ func (a *App) removeUserFromChannel(userIdToRemove string, removerUserId string, if cmresult := <-a.Srv.Store.Channel().RemoveMember(channel.Id, userIdToRemove); cmresult.Err != nil { return cmresult.Err } + if cmhResult := <-a.Srv.Store.ChannelMemberHistory().LogLeaveEvent(userIdToRemove, channel.Id, model.GetMillis()); cmhResult.Err != nil { + return cmhResult.Err + } a.InvalidateCacheForUser(userIdToRemove) a.InvalidateCacheForChannelMembers(channel.Id) diff --git a/app/channel_test.go b/app/channel_test.go index 374b20657..d44af467d 100644 --- a/app/channel_test.go +++ b/app/channel_test.go @@ -1,9 +1,14 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + package app import ( "testing" "github.com/mattermost/mattermost-server/model" + "github.com/mattermost/mattermost-server/store" + "github.com/stretchr/testify/assert" ) func TestPermanentDeleteChannel(t *testing.T) { @@ -104,3 +109,164 @@ func TestMoveChannel(t *testing.T) { t.Fatal(err) } } + +func TestJoinDefaultChannelsTownSquare(t *testing.T) { + th := Setup().InitBasic() + defer th.TearDown() + + // figure out the initial number of users in town square + townSquareChannelId := store.Must(th.App.Srv.Store.Channel().GetByName(th.BasicTeam.Id, "town-square", true)).(*model.Channel).Id + initialNumTownSquareUsers := len(store.Must(th.App.Srv.Store.ChannelMemberHistory().GetUsersInChannelDuring(model.GetMillis()-100, model.GetMillis()+100, townSquareChannelId)).([]*model.ChannelMemberHistory)) + + // create a new user that joins the default channels + user := th.CreateUser() + th.App.JoinDefaultChannels(th.BasicTeam.Id, user, model.CHANNEL_USER_ROLE_ID, "") + + // there should be a ChannelMemberHistory record for the user + histories := store.Must(th.App.Srv.Store.ChannelMemberHistory().GetUsersInChannelDuring(model.GetMillis()-100, model.GetMillis()+100, townSquareChannelId)).([]*model.ChannelMemberHistory) + assert.Len(t, histories, initialNumTownSquareUsers+1) + + found := false + for _, history := range histories { + if user.Id == history.UserId && townSquareChannelId == history.ChannelId { + found = true + break + } + } + assert.True(t, found) +} + +func TestJoinDefaultChannelsOffTopic(t *testing.T) { + th := Setup().InitBasic() + defer th.TearDown() + + // figure out the initial number of users in off-topic + offTopicChannelId := store.Must(th.App.Srv.Store.Channel().GetByName(th.BasicTeam.Id, "off-topic", true)).(*model.Channel).Id + initialNumTownSquareUsers := len(store.Must(th.App.Srv.Store.ChannelMemberHistory().GetUsersInChannelDuring(model.GetMillis()-100, model.GetMillis()+100, offTopicChannelId)).([]*model.ChannelMemberHistory)) + + // create a new user that joins the default channels + user := th.CreateUser() + th.App.JoinDefaultChannels(th.BasicTeam.Id, user, model.CHANNEL_USER_ROLE_ID, "") + + // there should be a ChannelMemberHistory record for the user + histories := store.Must(th.App.Srv.Store.ChannelMemberHistory().GetUsersInChannelDuring(model.GetMillis()-100, model.GetMillis()+100, offTopicChannelId)).([]*model.ChannelMemberHistory) + assert.Len(t, histories, initialNumTownSquareUsers+1) + + found := false + for _, history := range histories { + if user.Id == history.UserId && offTopicChannelId == history.ChannelId { + found = true + break + } + } + assert.True(t, found) +} + +func TestCreateChannelPublic(t *testing.T) { + th := Setup().InitBasic() + defer th.TearDown() + + // creates a public channel and adds basic user to it + publicChannel := th.createChannel(th.BasicTeam, model.CHANNEL_OPEN) + + // there should be a ChannelMemberHistory record for the user + histories := store.Must(th.App.Srv.Store.ChannelMemberHistory().GetUsersInChannelDuring(model.GetMillis()-100, model.GetMillis()+100, publicChannel.Id)).([]*model.ChannelMemberHistory) + assert.Len(t, histories, 1) + assert.Equal(t, th.BasicUser.Id, histories[0].UserId) + assert.Equal(t, publicChannel.Id, histories[0].ChannelId) +} + +func TestCreateChannelPrivate(t *testing.T) { + th := Setup().InitBasic() + defer th.TearDown() + + // creates a private channel and adds basic user to it + privateChannel := th.createChannel(th.BasicTeam, model.CHANNEL_PRIVATE) + + // there should be a ChannelMemberHistory record for the user + histories := store.Must(th.App.Srv.Store.ChannelMemberHistory().GetUsersInChannelDuring(model.GetMillis()-100, model.GetMillis()+100, privateChannel.Id)).([]*model.ChannelMemberHistory) + assert.Len(t, histories, 1) + assert.Equal(t, th.BasicUser.Id, histories[0].UserId) + assert.Equal(t, privateChannel.Id, histories[0].ChannelId) +} + +func TestCreateGroupChannel(t *testing.T) { + th := Setup().InitBasic() + defer th.TearDown() + + user1 := th.CreateUser() + user2 := th.CreateUser() + + groupUserIds := make([]string, 0) + groupUserIds = append(groupUserIds, user1.Id) + groupUserIds = append(groupUserIds, user2.Id) + groupUserIds = append(groupUserIds, th.BasicUser.Id) + + if channel, err := th.App.CreateGroupChannel(groupUserIds, th.BasicUser.Id); err != nil { + t.Fatal("Failed to create group channel. Error: " + err.Message) + } else { + // there should be a ChannelMemberHistory record for each user + histories := store.Must(th.App.Srv.Store.ChannelMemberHistory().GetUsersInChannelDuring(model.GetMillis()-100, model.GetMillis()+100, channel.Id)).([]*model.ChannelMemberHistory) + assert.Len(t, histories, 3) + + channelMemberHistoryUserIds := make([]string, 0) + for _, history := range histories { + assert.Equal(t, channel.Id, history.ChannelId) + channelMemberHistoryUserIds = append(channelMemberHistoryUserIds, history.UserId) + } + assert.Equal(t, groupUserIds, channelMemberHistoryUserIds) + } +} + +func TestAddUserToChannel(t *testing.T) { + th := Setup().InitBasic() + defer th.TearDown() + + // create a user and add it to a channel + user := th.CreateUser() + if _, err := th.App.AddTeamMember(th.BasicTeam.Id, user.Id); err != nil { + t.Fatal("Failed to add user to team. Error: " + err.Message) + } + + groupUserIds := make([]string, 0) + groupUserIds = append(groupUserIds, th.BasicUser.Id) + groupUserIds = append(groupUserIds, user.Id) + + channel := th.createChannel(th.BasicTeam, model.CHANNEL_OPEN) + if _, err := th.App.AddUserToChannel(user, channel); err != nil { + t.Fatal("Failed to add user to channel. Error: " + err.Message) + } + + // there should be a ChannelMemberHistory record for the user + histories := store.Must(th.App.Srv.Store.ChannelMemberHistory().GetUsersInChannelDuring(model.GetMillis()-100, model.GetMillis()+100, channel.Id)).([]*model.ChannelMemberHistory) + assert.Len(t, histories, 2) + channelMemberHistoryUserIds := make([]string, 0) + for _, history := range histories { + assert.Equal(t, channel.Id, history.ChannelId) + channelMemberHistoryUserIds = append(channelMemberHistoryUserIds, history.UserId) + } + assert.Equal(t, groupUserIds, channelMemberHistoryUserIds) +} + +func TestRemoveUserFromChannel(t *testing.T) { + th := Setup().InitBasic() + defer th.TearDown() + + // a user creates a channel + publicChannel := th.createChannel(th.BasicTeam, model.CHANNEL_OPEN) + histories := store.Must(th.App.Srv.Store.ChannelMemberHistory().GetUsersInChannelDuring(model.GetMillis()-100, model.GetMillis()+100, publicChannel.Id)).([]*model.ChannelMemberHistory) + assert.Len(t, histories, 1) + assert.Equal(t, th.BasicUser.Id, histories[0].UserId) + assert.Equal(t, publicChannel.Id, histories[0].ChannelId) + assert.Nil(t, histories[0].LeaveTime) + + // the user leaves that channel + if err := th.App.LeaveChannel(publicChannel.Id, th.BasicUser.Id); err != nil { + t.Fatal("Failed to remove user from channel. Error: " + err.Message) + } + histories = store.Must(th.App.Srv.Store.ChannelMemberHistory().GetUsersInChannelDuring(model.GetMillis()-100, model.GetMillis()+100, publicChannel.Id)).([]*model.ChannelMemberHistory) + assert.Len(t, histories, 1) + assert.Equal(t, th.BasicUser.Id, histories[0].UserId) + assert.Equal(t, publicChannel.Id, histories[0].ChannelId) + assert.NotNil(t, histories[0].LeaveTime) +} diff --git a/app/diagnostics.go b/app/diagnostics.go index 513cf11f5..3f37337ab 100644 --- a/app/diagnostics.go +++ b/app/diagnostics.go @@ -44,6 +44,7 @@ const ( TRACK_CONFIG_ELASTICSEARCH = "config_elasticsearch" TRACK_CONFIG_PLUGIN = "config_plugin" TRACK_CONFIG_DATA_RETENTION = "config_data_retention" + TRACK_CONFIG_MESSAGE_EXPORT = "config_message_export" TRACK_ACTIVITY = "activity" TRACK_LICENSE = "license" @@ -470,6 +471,13 @@ func (a *App) trackConfig() { "file_retention_days": *cfg.DataRetentionSettings.FileRetentionDays, "deletion_job_start_time": *cfg.DataRetentionSettings.DeletionJobStartTime, }) + + SendDiagnostic(TRACK_CONFIG_MESSAGE_EXPORT, map[string]interface{}{ + "enable_message_export": *cfg.MessageExportSettings.EnableExport, + "daily_run_time": *cfg.MessageExportSettings.DailyRunTime, + "default_export_from_timestamp": *cfg.MessageExportSettings.ExportFromTimestamp, + "batch_size": *cfg.MessageExportSettings.BatchSize, + }) } func trackLicense() { diff --git a/app/diagnostics_test.go b/app/diagnostics_test.go index 25bc75265..9b884fd43 100644 --- a/app/diagnostics_test.go +++ b/app/diagnostics_test.go @@ -135,6 +135,7 @@ func TestDiagnostics(t *testing.T) { TRACK_CONFIG_PLUGIN, TRACK_ACTIVITY, TRACK_SERVER, + TRACK_CONFIG_MESSAGE_EXPORT, TRACK_PLUGINS, } { if !strings.Contains(info, item) { diff --git a/app/license.go b/app/license.go index 18836c571..cacc71524 100644 --- a/app/license.go +++ b/app/license.go @@ -89,6 +89,16 @@ func (a *App) SaveLicense(licenseBytes []byte) (*model.License, *model.AppError) a.ReloadConfig() a.InvalidateAllCaches() + // start job server if necessary - this handles the edge case where a license file is uploaded, but the job server + // doesn't start until the server is restarted, which prevents the 'run job now' buttons in system console from + // functioning as expected + if *a.Config().JobSettings.RunJobs { + a.Jobs.StartWorkers() + } + if *a.Config().JobSettings.RunScheduler { + a.Jobs.StartSchedulers() + } + return license, nil } diff --git a/cmd/platform/mattermost.go b/cmd/platform/mattermost.go index 6c015c6db..be2ff8164 100644 --- a/cmd/platform/mattermost.go +++ b/cmd/platform/mattermost.go @@ -36,7 +36,7 @@ func init() { resetCmd.Flags().Bool("confirm", false, "Confirm you really want to delete everything and a DB backup has been performed.") - rootCmd.AddCommand(serverCmd, versionCmd, userCmd, teamCmd, licenseCmd, importCmd, resetCmd, channelCmd, rolesCmd, testCmd, ldapCmd, configCmd, jobserverCmd, commandCmd) + rootCmd.AddCommand(serverCmd, versionCmd, userCmd, teamCmd, licenseCmd, importCmd, resetCmd, channelCmd, rolesCmd, testCmd, ldapCmd, configCmd, jobserverCmd, commandCmd, messageExportCmd) } var rootCmd = &cobra.Command{ diff --git a/cmd/platform/message_export.go b/cmd/platform/message_export.go new file mode 100644 index 000000000..fb1f4073b --- /dev/null +++ b/cmd/platform/message_export.go @@ -0,0 +1,79 @@ +// Copyright (c) 2016-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package main + +import ( + "errors" + + "context" + + "time" + + "github.com/mattermost/mattermost-server/model" + "github.com/spf13/cobra" +) + +var messageExportCmd = &cobra.Command{ + Use: "export", + Short: "Export data from Mattermost", + Long: "Export data from Mattermost in a format suitable for import into a third-party application", + Example: "export --format=actiance --exportFrom=12345", + RunE: messageExportCmdF, +} + +func init() { + messageExportCmd.Flags().String("format", "actiance", "The format to export data in") + messageExportCmd.Flags().Int64("exportFrom", -1, "The timestamp of the earliest post to export, expressed in seconds since the unix epoch.") + messageExportCmd.Flags().Int("timeoutSeconds", -1, "The maximum number of seconds to wait for the job to complete before timing out.") +} + +func messageExportCmdF(cmd *cobra.Command, args []string) error { + a, err := initDBCommandContextCobra(cmd) + if err != nil { + return err + } + + if !*a.Config().MessageExportSettings.EnableExport { + return errors.New("ERROR: The message export feature is not enabled") + } + + // for now, format is hard-coded to actiance. In time, we'll have to support other formats and inject them into job data + if format, err := cmd.Flags().GetString("format"); err != nil { + return errors.New("format flag error") + } else if format != "actiance" { + return errors.New("unsupported export format") + } + + startTime, err := cmd.Flags().GetInt64("exportFrom") + if err != nil { + return errors.New("exportFrom flag error") + } else if startTime < 0 { + return errors.New("exportFrom must be a positive integer") + } + + timeoutSeconds, err := cmd.Flags().GetInt("timeoutSeconds") + if err != nil { + return errors.New("timeoutSeconds error") + } else if timeoutSeconds < 0 { + return errors.New("timeoutSeconds must be a positive integer") + } + + if messageExportI := a.MessageExport; messageExportI != nil { + ctx := context.Background() + if timeoutSeconds > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, time.Second*time.Duration(timeoutSeconds)) + defer cancel() + } + + job, err := messageExportI.StartSynchronizeJob(ctx, startTime) + if err != nil || job.Status == model.JOB_STATUS_ERROR || job.Status == model.JOB_STATUS_CANCELED { + CommandPrintErrorln("ERROR: Message export job failed. Please check the server logs") + } else { + CommandPrettyPrintln("SUCCESS: Message export job complete") + } + } + + return nil +} diff --git a/cmd/platform/message_export_test.go b/cmd/platform/message_export_test.go new file mode 100644 index 000000000..211c1ca3c --- /dev/null +++ b/cmd/platform/message_export_test.go @@ -0,0 +1,65 @@ +// Copyright (c) 2016-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package main + +import ( + "testing" + + "io/ioutil" + "os" + "path/filepath" + + "github.com/mattermost/mattermost-server/model" + "github.com/mattermost/mattermost-server/utils" + "github.com/stretchr/testify/require" +) + +// There are no tests that actually run the Message Export job, because it can take a long time to complete depending +// on the size of the database that the config is pointing to. As such, these tests just ensure that the CLI command +// fails fast if invalid flags are supplied + +func TestMessageExportNotEnabled(t *testing.T) { + configPath := writeTempConfig(t, false) + defer os.RemoveAll(filepath.Dir(configPath)) + + // should fail fast because the feature isn't enabled + require.Error(t, runCommand(t, "--config", configPath, "export")) +} + +func TestMessageExportInvalidFormat(t *testing.T) { + configPath := writeTempConfig(t, true) + defer os.RemoveAll(filepath.Dir(configPath)) + + // should fail fast because format isn't supported + require.Error(t, runCommand(t, "--config", configPath, "--format", "not_actiance", "export")) +} + +func TestMessageExportNegativeExportFrom(t *testing.T) { + configPath := writeTempConfig(t, true) + defer os.RemoveAll(filepath.Dir(configPath)) + + // should fail fast because export from must be a valid timestamp + require.Error(t, runCommand(t, "--config", configPath, "--format", "actiance", "--exportFrom", "-1", "export")) +} + +func TestMessageExportNegativeTimeoutSeconds(t *testing.T) { + configPath := writeTempConfig(t, true) + defer os.RemoveAll(filepath.Dir(configPath)) + + // should fail fast because timeout seconds must be a positive int + require.Error(t, runCommand(t, "--config", configPath, "--format", "actiance", "--exportFrom", "0", "--timeoutSeconds", "-1", "export")) +} + +func writeTempConfig(t *testing.T, isMessageExportEnabled bool) string { + dir, err := ioutil.TempDir("", "") + require.NoError(t, err) + + utils.TranslationsPreInit() + config := utils.LoadGlobalConfig("config.json") + config.MessageExportSettings.EnableExport = model.NewBool(isMessageExportEnabled) + configPath := filepath.Join(dir, "foo.json") + require.NoError(t, ioutil.WriteFile(configPath, []byte(config.ToJson()), 0600)) + + return configPath +} diff --git a/config/default.json b/config/default.json index e6f9cab1b..1c13c0f09 100644 --- a/config/default.json +++ b/config/default.json @@ -334,6 +334,13 @@ "FileRetentionDays": 365, "DeletionJobStartTime": "02:00" }, + "MessageExportSettings": { + "EnableExport": false, + "DailyRunTime": "01:00", + "ExportFromTimestamp": 0, + "FileLocation": "export", + "BatchSize": 10000 + }, "JobSettings": { "RunJobs": true, "RunScheduler": true diff --git a/einterfaces/jobs/message_export.go b/einterfaces/jobs/message_export.go new file mode 100644 index 000000000..74b0df751 --- /dev/null +++ b/einterfaces/jobs/message_export.go @@ -0,0 +1,13 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package jobs + +import ( + "github.com/mattermost/mattermost-server/model" +) + +type MessageExportJobInterface interface { + MakeWorker() model.Worker + MakeScheduler() model.Scheduler +} diff --git a/einterfaces/message_export.go b/einterfaces/message_export.go new file mode 100644 index 000000000..ba498cdfb --- /dev/null +++ b/einterfaces/message_export.go @@ -0,0 +1,14 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package einterfaces + +import ( + "context" + + "github.com/mattermost/mattermost-server/model" +) + +type MessageExportInterface interface { + StartSynchronizeJob(ctx context.Context, exportFromTimestamp int64) (*model.Job, *model.AppError) +} diff --git a/i18n/en.json b/i18n/en.json index f252ebe08..60b289f88 100644 --- a/i18n/en.json +++ b/i18n/en.json @@ -1332,6 +1332,10 @@ "id": "api.file.move_file.configured.app_error", "translation": "File storage not configured properly. Please configure for either S3 or local server file storage." }, + { + "id": "api.file.move_file.copy_within_s3.app_error", + "translation": "Unable to copy file within S3." + }, { "id": "api.file.move_file.delete_from_s3.app_error", "translation": "Unable to delete file from S3." @@ -3850,6 +3854,18 @@ "id": "ent.data_retention.generic.license.error", "translation": "License does not support Data Retention." }, + { + "id": "ent.message_export.generic.license.error", + "translation": "License does not support Message Export." + }, + { + "id": "ent.elasticsearch.start.create_bulk_processor_failed.app_error", + "translation": "Failed to create Elasticsearch bulk processor" + }, + { + "id": "ent.elasticsearch.start.start_bulk_processor_failed.app_error", + "translation": "Failed to start Elasticsearch bulk processor" + }, { "id": "ent.elasticsearch.aggregator_worker.create_index_job.error", "translation": "Elasticsearch aggregator worker failed to create the indexing job" @@ -4378,6 +4394,54 @@ "id": "model.channel_member.is_valid.user_id.app_error", "translation": "Invalid user id" }, + { + "id": "model.channel_member_history.is_valid.channel_id.app_error", + "translation": "Invalid channel id" + }, + { + "id": "model.channel_member_history.is_valid.user_id.app_error", + "translation": "Invalid user id" + }, + { + "id": "model.channel_member_history.is_valid.user_email.app_error", + "translation": "Invalid user email" + }, + { + "id": "model.channel_member_history.is_valid.join_time.app_error", + "translation": "Invalid join time" + }, + { + "id": "model.channel_member_history.is_valid.leave_time.app_error", + "translation": "Invalid leave time" + }, + { + "id": "store.sql_channel_member_history.log_join_event.app_error", + "translation": "Failed to record channel member history" + }, + { + "id": "store.sql_channel_member_history.log_leave_event.select_error", + "translation": "Failed to record channel member history. No existing join record found" + }, + { + "id": "store.sql_channel_member_history.log_leave_event.update_error", + "translation": "Failed to record channel member history. Failed to update existing join record" + }, + { + "id": "store.sql_channel_member_history.get_users_in_channel_at.app_error", + "translation": "Failed to get users in channel at specified time" + }, + { + "id": "store.sql_channel_member_history.get_users_in_channel_during.app_error", + "translation": "Failed to get users in channel during specified time period" + }, + { + "id": "store.sql_channel_member_history.get_all.app_error", + "translation": "Failed to get records" + }, + { + "id": "store.sql_channel_member_history.purge_history_before.app_error", + "translation": "Failed to purge records" + }, { "id": "model.client.connecting.app_error", "translation": "We encountered an error while connecting to the server" @@ -4878,6 +4942,30 @@ "id": "model.config.is_valid.write_timeout.app_error", "translation": "Invalid value for write timeout." }, + { + "id": "model.config.is_valid.message_export.enable.app_error", + "translation": "Message export job EnableExport setting must be either true or false" + }, + { + "id": "model.config.is_valid.message_export.daily_runtime.app_error", + "translation": "Message export job DailyRuntime must be a 24-hour time stamp in the form HH:MM." + }, + { + "id": "model.config.is_valid.message_export.export_from.app_error", + "translation": "Message export job ExportFromTimestamp must be a timestamp (expressed in seconds since unix epoch). Only messages sent after this timestamp will be exported." + }, + { + "id": "model.config.is_valid.message_export.file_location.app_error", + "translation": "Message export job FileLocation must be a writable directory that export data will be written to" + }, + { + "id": "model.config.is_valid.message_export.file_location.relative", + "translation": "Message export job FileLocation must be a sub-directory of FileSettings.Directory" + }, + { + "id": "model.config.is_valid.message_export.batch_size.app_error", + "translation": "Message export job BatchSize must be a positive integer" + }, { "id": "model.emoji.create_at.app_error", "translation": "Create at must be a valid time" @@ -5474,6 +5562,10 @@ "id": "store.sql.upgraded.warn", "translation": "The database schema has been upgraded to version %v" }, + { + "id": "store.sql_compliance.message_export.app_error", + "translation": "Failed to select message export data" + }, { "id": "store.sql_audit.get.finding.app_error", "translation": "We encountered an error finding the audits" diff --git a/jobs/jobs_watcher.go b/jobs/jobs_watcher.go index b36a99051..f519e7cca 100644 --- a/jobs/jobs_watcher.go +++ b/jobs/jobs_watcher.go @@ -78,6 +78,13 @@ func (watcher *Watcher) PollAndNotify() { default: } } + } else if job.Type == model.JOB_TYPE_MESSAGE_EXPORT { + if watcher.workers.MessageExport != nil { + select { + case watcher.workers.MessageExport.JobChannel() <- *job: + default: + } + } } else if job.Type == model.JOB_TYPE_ELASTICSEARCH_POST_INDEXING { if watcher.workers.ElasticsearchIndexing != nil { select { diff --git a/jobs/schedulers.go b/jobs/schedulers.go index cbe5f1749..bec53a49b 100644 --- a/jobs/schedulers.go +++ b/jobs/schedulers.go @@ -39,6 +39,10 @@ func (srv *JobServer) InitSchedulers() *Schedulers { schedulers.schedulers = append(schedulers.schedulers, srv.DataRetentionJob.MakeScheduler()) } + if srv.MessageExportJob != nil { + schedulers.schedulers = append(schedulers.schedulers, srv.MessageExportJob.MakeScheduler()) + } + if elasticsearchAggregatorInterface := srv.ElasticsearchAggregator; elasticsearchAggregatorInterface != nil { schedulers.schedulers = append(schedulers.schedulers, elasticsearchAggregatorInterface.MakeScheduler()) } diff --git a/jobs/server.go b/jobs/server.go index 40cfb1f64..777b02a26 100644 --- a/jobs/server.go +++ b/jobs/server.go @@ -19,6 +19,7 @@ type JobServer struct { Schedulers *Schedulers DataRetentionJob ejobs.DataRetentionJobInterface + MessageExportJob ejobs.MessageExportJobInterface ElasticsearchAggregator ejobs.ElasticsearchAggregatorInterface ElasticsearchIndexer ejobs.ElasticsearchIndexerInterface LdapSync ejobs.LdapSyncInterface diff --git a/jobs/workers.go b/jobs/workers.go index b1d275658..3abd7131c 100644 --- a/jobs/workers.go +++ b/jobs/workers.go @@ -17,6 +17,7 @@ type Workers struct { Watcher *Watcher DataRetention model.Worker + MessageExport model.Worker ElasticsearchIndexing model.Worker ElasticsearchAggregation model.Worker LdapSync model.Worker @@ -34,6 +35,10 @@ func (srv *JobServer) InitWorkers() *Workers { workers.DataRetention = srv.DataRetentionJob.MakeWorker() } + if srv.MessageExportJob != nil { + workers.MessageExport = srv.MessageExportJob.MakeWorker() + } + if elasticsearchIndexerInterface := srv.ElasticsearchIndexer; elasticsearchIndexerInterface != nil { workers.ElasticsearchIndexing = elasticsearchIndexerInterface.MakeWorker() } @@ -57,6 +62,10 @@ func (workers *Workers) Start() *Workers { go workers.DataRetention.Run() } + if workers.MessageExport != nil && *workers.Config().MessageExportSettings.EnableExport { + go workers.MessageExport.Run() + } + if workers.ElasticsearchIndexing != nil && *workers.Config().ElasticsearchSettings.EnableIndexing { go workers.ElasticsearchIndexing.Run() } @@ -86,6 +95,14 @@ func (workers *Workers) handleConfigChange(oldConfig *model.Config, newConfig *m } } + if workers.MessageExport != nil { + if !*oldConfig.MessageExportSettings.EnableExport && *newConfig.MessageExportSettings.EnableExport { + go workers.MessageExport.Run() + } else if *oldConfig.MessageExportSettings.EnableExport && !*newConfig.MessageExportSettings.EnableExport { + workers.MessageExport.Stop() + } + } + if workers.ElasticsearchIndexing != nil { if !*oldConfig.ElasticsearchSettings.EnableIndexing && *newConfig.ElasticsearchSettings.EnableIndexing { go workers.ElasticsearchIndexing.Run() @@ -120,6 +137,10 @@ func (workers *Workers) Stop() *Workers { workers.DataRetention.Stop() } + if workers.MessageExport != nil && *workers.Config().MessageExportSettings.EnableExport { + workers.MessageExport.Stop() + } + if workers.ElasticsearchIndexing != nil && *workers.Config().ElasticsearchSettings.EnableIndexing { workers.ElasticsearchIndexing.Stop() } diff --git a/model/channel_member_history.go b/model/channel_member_history.go new file mode 100644 index 000000000..bc71b580a --- /dev/null +++ b/model/channel_member_history.go @@ -0,0 +1,12 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package model + +type ChannelMemberHistory struct { + ChannelId string + UserId string + UserEmail string `db:"Email"` + JoinTime int64 + LeaveTime *int64 +} diff --git a/model/config.go b/model/config.go index e2f05d72e..1f56eb4f5 100644 --- a/model/config.go +++ b/model/config.go @@ -8,6 +8,7 @@ import ( "io" "net/http" "net/url" + "path/filepath" "strings" "time" ) @@ -1508,6 +1509,36 @@ func (s *PluginSettings) SetDefaults() { } } +type MessageExportSettings struct { + EnableExport *bool + DailyRunTime *string + ExportFromTimestamp *int64 + FileLocation *string + BatchSize *int +} + +func (s *MessageExportSettings) SetDefaults() { + if s.EnableExport == nil { + s.EnableExport = NewBool(false) + } + + if s.FileLocation == nil { + s.FileLocation = NewString("export") + } + + if s.DailyRunTime == nil { + s.DailyRunTime = NewString("01:00") + } + + if s.ExportFromTimestamp == nil { + s.ExportFromTimestamp = NewInt64(0) + } + + if s.BatchSize == nil { + s.BatchSize = NewInt(10000) + } +} + type ConfigFunc func() *Config type Config struct { @@ -1538,6 +1569,7 @@ type Config struct { WebrtcSettings WebrtcSettings ElasticsearchSettings ElasticsearchSettings DataRetentionSettings DataRetentionSettings + MessageExportSettings MessageExportSettings JobSettings JobSettings PluginSettings PluginSettings } @@ -1617,6 +1649,7 @@ func (o *Config) SetDefaults() { o.LogSettings.SetDefaults() o.JobSettings.SetDefaults() o.WebrtcSettings.SetDefaults() + o.MessageExportSettings.SetDefaults() } func (o *Config) IsValid() *AppError { @@ -1680,6 +1713,10 @@ func (o *Config) IsValid() *AppError { return err } + if err := o.MessageExportSettings.isValid(o.FileSettings); err != nil { + return err + } + return nil } @@ -1998,6 +2035,35 @@ func (ls *LocalizationSettings) isValid() *AppError { return nil } +func (mes *MessageExportSettings) isValid(fs FileSettings) *AppError { + if mes.EnableExport == nil { + return NewAppError("Config.IsValid", "model.config.is_valid.message_export.enable.app_error", nil, "", http.StatusBadRequest) + } + if *mes.EnableExport { + if mes.ExportFromTimestamp == nil || *mes.ExportFromTimestamp < 0 || *mes.ExportFromTimestamp > time.Now().Unix() { + return NewAppError("Config.IsValid", "model.config.is_valid.message_export.export_from.app_error", nil, "", http.StatusBadRequest) + } else if mes.DailyRunTime == nil { + return NewAppError("Config.IsValid", "model.config.is_valid.message_export.daily_runtime.app_error", nil, "", http.StatusBadRequest) + } else if _, err := time.Parse("15:04", *mes.DailyRunTime); err != nil { + return NewAppError("Config.IsValid", "model.config.is_valid.message_export.daily_runtime.app_error", nil, err.Error(), http.StatusBadRequest) + } else if mes.FileLocation == nil { + return NewAppError("Config.IsValid", "model.config.is_valid.message_export.file_location.app_error", nil, "", http.StatusBadRequest) + } else if mes.BatchSize == nil || *mes.BatchSize < 0 { + return NewAppError("Config.IsValid", "model.config.is_valid.message_export.batch_size.app_error", nil, "", http.StatusBadRequest) + } else if *fs.DriverName != IMAGE_DRIVER_LOCAL { + if absFileDir, err := filepath.Abs(fs.Directory); err != nil { + return NewAppError("Config.IsValid", "model.config.is_valid.message_export.file_location.relative", nil, err.Error(), http.StatusBadRequest) + } else if absMessageExportDir, err := filepath.Abs(*mes.FileLocation); err != nil { + return NewAppError("Config.IsValid", "model.config.is_valid.message_export.file_location.relative", nil, err.Error(), http.StatusBadRequest) + } else if !strings.HasPrefix(absMessageExportDir, absFileDir) { + // configured export directory must be relative to data directory + return NewAppError("Config.IsValid", "model.config.is_valid.message_export.file_location.relative", nil, "", http.StatusBadRequest) + } + } + } + return nil +} + func (o *Config) GetSanitizeOptions() map[string]bool { options := map[string]bool{} options["fullname"] = o.PrivacySettings.ShowFullName diff --git a/model/config_test.go b/model/config_test.go index 86958458c..58f690165 100644 --- a/model/config_test.go +++ b/model/config_test.go @@ -5,6 +5,10 @@ package model import ( "testing" + + "os" + + "github.com/stretchr/testify/require" ) func TestConfigDefaultFileSettingsDirectory(t *testing.T) { @@ -33,3 +37,109 @@ func TestConfigDefaultFileSettingsS3SSE(t *testing.T) { t.Fatal("FileSettings.AmazonS3SSE should default to false") } } + +func TestMessageExportSettingsIsValidEnableExportNotSet(t *testing.T) { + fs := &FileSettings{} + mes := &MessageExportSettings{} + + // should fail fast because mes.EnableExport is not set + require.Error(t, mes.isValid(*fs)) +} + +func TestMessageExportSettingsIsValidEnableExportFalse(t *testing.T) { + fs := &FileSettings{} + mes := &MessageExportSettings{ + EnableExport: NewBool(false), + } + + // should fail fast because message export isn't enabled + require.Nil(t, mes.isValid(*fs)) +} + +func TestMessageExportSettingsIsValidExportFromTimestampInvalid(t *testing.T) { + fs := &FileSettings{} + mes := &MessageExportSettings{ + EnableExport: NewBool(true), + } + + // should fail fast because export from timestamp isn't set + require.Error(t, mes.isValid(*fs)) + + mes.ExportFromTimestamp = NewInt64(-1) + + // should fail fast because export from timestamp isn't valid + require.Error(t, mes.isValid(*fs)) + + mes.ExportFromTimestamp = NewInt64(GetMillis() + 10000) + + // should fail fast because export from timestamp is greater than current time + require.Error(t, mes.isValid(*fs)) +} + +func TestMessageExportSettingsIsValidDailyRunTimeInvalid(t *testing.T) { + fs := &FileSettings{} + mes := &MessageExportSettings{ + EnableExport: NewBool(true), + ExportFromTimestamp: NewInt64(0), + } + + // should fail fast because daily runtime isn't set + require.Error(t, mes.isValid(*fs)) + + mes.DailyRunTime = NewString("33:33:33") + + // should fail fast because daily runtime is invalid format + require.Error(t, mes.isValid(*fs)) +} + +func TestMessageExportSettingsIsValidBatchSizeInvalid(t *testing.T) { + fs := &FileSettings{ + DriverName: NewString("foo"), // bypass file location check + } + mes := &MessageExportSettings{ + EnableExport: NewBool(true), + ExportFromTimestamp: NewInt64(0), + DailyRunTime: NewString("15:04"), + FileLocation: NewString("foo"), + } + + // should fail fast because batch size isn't set + require.Error(t, mes.isValid(*fs)) +} + +func TestMessageExportSettingsIsValidFileLocationInvalid(t *testing.T) { + fs := &FileSettings{} + mes := &MessageExportSettings{ + EnableExport: NewBool(true), + ExportFromTimestamp: NewInt64(0), + DailyRunTime: NewString("15:04"), + BatchSize: NewInt(100), + } + + // should fail fast because FileLocation isn't set + require.Error(t, mes.isValid(*fs)) + + // if using the local file driver, there are more rules for FileLocation + fs.DriverName = NewString(IMAGE_DRIVER_LOCAL) + fs.Directory, _ = os.Getwd() + mes.FileLocation = NewString("") + + // should fail fast because file location is not relative to basepath + require.Error(t, mes.isValid(*fs)) +} + +func TestMessageExportSettingsIsValid(t *testing.T) { + fs := &FileSettings{ + DriverName: NewString("foo"), // bypass file location check + } + mes := &MessageExportSettings{ + EnableExport: NewBool(true), + ExportFromTimestamp: NewInt64(0), + DailyRunTime: NewString("15:04"), + FileLocation: NewString("foo"), + BatchSize: NewInt(100), + } + + // should pass because everything is valid + require.Nil(t, mes.isValid(*fs)) +} diff --git a/model/job.go b/model/job.go index 843d73fad..9a7566025 100644 --- a/model/job.go +++ b/model/job.go @@ -12,6 +12,7 @@ import ( const ( JOB_TYPE_DATA_RETENTION = "data_retention" + JOB_TYPE_MESSAGE_EXPORT = "message_export" JOB_TYPE_ELASTICSEARCH_POST_INDEXING = "elasticsearch_post_indexing" JOB_TYPE_ELASTICSEARCH_POST_AGGREGATION = "elasticsearch_post_aggregation" JOB_TYPE_LDAP_SYNC = "ldap_sync" @@ -50,6 +51,7 @@ func (j *Job) IsValid() *AppError { case JOB_TYPE_ELASTICSEARCH_POST_INDEXING: case JOB_TYPE_ELASTICSEARCH_POST_AGGREGATION: case JOB_TYPE_LDAP_SYNC: + case JOB_TYPE_MESSAGE_EXPORT: default: return NewAppError("Job.IsValid", "model.job.is_valid.type.app_error", nil, "id="+j.Id, http.StatusBadRequest) } diff --git a/model/license.go b/model/license.go index 3e42a2343..a81f882ca 100644 --- a/model/license.go +++ b/model/license.go @@ -55,6 +55,7 @@ type Features struct { ThemeManagement *bool `json:"theme_management"` EmailNotificationContents *bool `json:"email_notification_contents"` DataRetention *bool `json:"data_retention"` + MessageExport *bool `json:"message_export"` // after we enabled more features for webrtc we'll need to control them with this FutureFeatures *bool `json:"future_features"` @@ -76,6 +77,7 @@ func (f *Features) ToMap() map[string]interface{} { "elastic_search": *f.Elasticsearch, "email_notification_contents": *f.EmailNotificationContents, "data_retention": *f.DataRetention, + "message_export": *f.MessageExport, "future": *f.FutureFeatures, } } @@ -152,6 +154,10 @@ func (f *Features) SetDefaults() { if f.DataRetention == nil { f.DataRetention = NewBool(*f.FutureFeatures) } + + if f.MessageExport == nil { + f.MessageExport = NewBool(*f.FutureFeatures) + } } func (l *License) IsExpired() bool { diff --git a/model/message_export.go b/model/message_export.go new file mode 100644 index 000000000..b59b114d4 --- /dev/null +++ b/model/message_export.go @@ -0,0 +1,18 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package model + +type MessageExport struct { + ChannelId *string + ChannelDisplayName *string + + UserId *string + UserEmail *string + + PostId *string + PostCreateAt *int64 + PostMessage *string + PostType *string + PostFileIds StringArray +} diff --git a/store/layered_store.go b/store/layered_store.go index ecf02864c..65b4670c0 100644 --- a/store/layered_store.go +++ b/store/layered_store.go @@ -153,6 +153,10 @@ func (s *LayeredStore) UserAccessToken() UserAccessTokenStore { return s.DatabaseLayer.UserAccessToken() } +func (s *LayeredStore) ChannelMemberHistory() ChannelMemberHistoryStore { + return s.DatabaseLayer.ChannelMemberHistory() +} + func (s *LayeredStore) Plugin() PluginStore { return s.DatabaseLayer.Plugin() } diff --git a/store/sqlstore/channel_member_history_store.go b/store/sqlstore/channel_member_history_store.go new file mode 100644 index 000000000..20d0d3335 --- /dev/null +++ b/store/sqlstore/channel_member_history_store.go @@ -0,0 +1,102 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package sqlstore + +import ( + "net/http" + + l4g "github.com/alecthomas/log4go" + "github.com/mattermost/mattermost-server/model" + "github.com/mattermost/mattermost-server/store" +) + +type SqlChannelMemberHistoryStore struct { + SqlStore +} + +func NewSqlChannelMemberHistoryStore(sqlStore SqlStore) store.ChannelMemberHistoryStore { + s := &SqlChannelMemberHistoryStore{ + SqlStore: sqlStore, + } + + for _, db := range sqlStore.GetAllConns() { + table := db.AddTableWithName(model.ChannelMemberHistory{}, "ChannelMemberHistory").SetKeys(false, "ChannelId", "UserId", "JoinTime") + table.ColMap("ChannelId").SetMaxSize(26) + table.ColMap("UserId").SetMaxSize(26) + table.ColMap("JoinTime").SetNotNull(true) + } + + return s +} + +func (s SqlChannelMemberHistoryStore) LogJoinEvent(userId string, channelId string, joinTime int64) store.StoreChannel { + return store.Do(func(result *store.StoreResult) { + channelMemberHistory := &model.ChannelMemberHistory{ + UserId: userId, + ChannelId: channelId, + JoinTime: joinTime, + } + + if err := s.GetMaster().Insert(channelMemberHistory); err != nil { + result.Err = model.NewAppError("SqlChannelMemberHistoryStore.LogJoinEvent", "store.sql_channel_member_history.log_join_event.app_error", map[string]interface{}{"ChannelMemberHistory": channelMemberHistory}, err.Error(), http.StatusInternalServerError) + } + }) +} + +func (s SqlChannelMemberHistoryStore) LogLeaveEvent(userId string, channelId string, leaveTime int64) store.StoreChannel { + return store.Do(func(result *store.StoreResult) { + query := ` + UPDATE ChannelMemberHistory + SET LeaveTime = :LeaveTime + WHERE UserId = :UserId + AND ChannelId = :ChannelId + AND LeaveTime IS NULL` + + params := map[string]interface{}{"UserId": userId, "ChannelId": channelId, "LeaveTime": leaveTime} + if sqlResult, err := s.GetMaster().Exec(query, params); err != nil { + result.Err = model.NewAppError("SqlChannelMemberHistoryStore.LogLeaveEvent", "store.sql_channel_member_history.log_leave_event.update_error", nil, err.Error(), http.StatusInternalServerError) + } else if rows, err := sqlResult.RowsAffected(); err == nil && rows != 1 { + // there was no join event to update + l4g.Warn("Channel join event for user %v and channel %v not found", userId, channelId) + } + }) +} + +func (s SqlChannelMemberHistoryStore) GetUsersInChannelDuring(startTime int64, endTime int64, channelId string) store.StoreChannel { + return store.Do(func(result *store.StoreResult) { + query := ` + SELECT + cmh.*, + u.Email + FROM ChannelMemberHistory cmh + INNER JOIN Users u ON cmh.UserId = u.Id + WHERE cmh.ChannelId = :ChannelId + AND cmh.JoinTime <= :EndTime + AND (cmh.LeaveTime IS NULL OR cmh.LeaveTime >= :StartTime) + ORDER BY cmh.JoinTime ASC` + + params := map[string]interface{}{"ChannelId": channelId, "StartTime": startTime, "EndTime": endTime} + var histories []*model.ChannelMemberHistory + if _, err := s.GetReplica().Select(&histories, query, params); err != nil { + result.Err = model.NewAppError("SqlChannelMemberHistoryStore.GetUsersInChannelAt", "store.sql_channel_member_history.get_users_in_channel_during.app_error", params, err.Error(), http.StatusInternalServerError) + } else { + result.Data = histories + } + }) +} + +func (s SqlChannelMemberHistoryStore) PurgeHistoryBefore(time int64, channelId string) store.StoreChannel { + return store.Do(func(result *store.StoreResult) { + query := ` + DELETE FROM ChannelMemberHistory + WHERE ChannelId = :ChannelId + AND LeaveTime IS NOT NULL + AND LeaveTime <= :AtTime` + + params := map[string]interface{}{"AtTime": time, "ChannelId": channelId} + if _, err := s.GetMaster().Exec(query, params); err != nil { + result.Err = model.NewAppError("SqlChannelMemberHistoryStore.PurgeHistoryBefore", "store.sql_channel_member_history.purge_history_before.app_error", params, err.Error(), http.StatusInternalServerError) + } + }) +} diff --git a/store/sqlstore/channel_member_history_store_test.go b/store/sqlstore/channel_member_history_store_test.go new file mode 100644 index 000000000..c1119d227 --- /dev/null +++ b/store/sqlstore/channel_member_history_store_test.go @@ -0,0 +1,14 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package sqlstore + +import ( + "testing" + + "github.com/mattermost/mattermost-server/store/storetest" +) + +func TestChannelMemberHistoryStore(t *testing.T) { + StoreTest(t, storetest.TestChannelMemberHistoryStore) +} diff --git a/store/sqlstore/compliance_store.go b/store/sqlstore/compliance_store.go index 3d638b1fd..a25b01548 100644 --- a/store/sqlstore/compliance_store.go +++ b/store/sqlstore/compliance_store.go @@ -211,3 +211,36 @@ func (s SqlComplianceStore) ComplianceExport(job *model.Compliance) store.StoreC } }) } + +func (s SqlComplianceStore) MessageExport(after int64, limit int) store.StoreChannel { + return store.Do(func(result *store.StoreResult) { + props := map[string]interface{}{"StartTime": after, "Limit": limit} + query := + `SELECT + Posts.Id AS PostId, + Posts.CreateAt AS PostCreateAt, + Posts.Message AS PostMessage, + Posts.Type AS PostType, + Posts.FileIds AS PostFileIds, + Channels.Id AS ChannelId, + Channels.DisplayName AS ChannelDisplayName, + Users.Id AS UserId, + Users.Email AS UserEmail + FROM + Posts + LEFT OUTER JOIN Channels ON Posts.ChannelId = Channels.Id + LEFT OUTER JOIN Users ON Posts.UserId = Users.Id + WHERE + Posts.CreateAt > :StartTime AND + Posts.Type = '' + ORDER BY PostCreateAt + LIMIT :Limit` + + var cposts []*model.MessageExport + if _, err := s.GetReplica().Select(&cposts, query, props); err != nil { + result.Err = model.NewAppError("SqlComplianceStore.MessageExport", "store.sql_compliance.message_export.app_error", nil, err.Error(), http.StatusInternalServerError) + } else { + result.Data = cposts + } + }) +} diff --git a/store/sqlstore/supplier.go b/store/sqlstore/supplier.go index dbe4aa92c..3b9528578 100644 --- a/store/sqlstore/supplier.go +++ b/store/sqlstore/supplier.go @@ -62,29 +62,30 @@ const ( ) type SqlSupplierOldStores struct { - team store.TeamStore - channel store.ChannelStore - post store.PostStore - user store.UserStore - audit store.AuditStore - cluster store.ClusterDiscoveryStore - compliance store.ComplianceStore - session store.SessionStore - oauth store.OAuthStore - system store.SystemStore - webhook store.WebhookStore - command store.CommandStore - commandWebhook store.CommandWebhookStore - preference store.PreferenceStore - license store.LicenseStore - token store.TokenStore - emoji store.EmojiStore - status store.StatusStore - fileInfo store.FileInfoStore - reaction store.ReactionStore - job store.JobStore - userAccessToken store.UserAccessTokenStore - plugin store.PluginStore + team store.TeamStore + channel store.ChannelStore + post store.PostStore + user store.UserStore + audit store.AuditStore + cluster store.ClusterDiscoveryStore + compliance store.ComplianceStore + session store.SessionStore + oauth store.OAuthStore + system store.SystemStore + webhook store.WebhookStore + command store.CommandStore + commandWebhook store.CommandWebhookStore + preference store.PreferenceStore + license store.LicenseStore + token store.TokenStore + emoji store.EmojiStore + status store.StatusStore + fileInfo store.FileInfoStore + reaction store.ReactionStore + job store.JobStore + userAccessToken store.UserAccessTokenStore + plugin store.PluginStore + channelMemberHistory store.ChannelMemberHistoryStore } type SqlSupplier struct { @@ -130,6 +131,7 @@ func NewSqlSupplier(settings model.SqlSettings, metrics einterfaces.MetricsInter supplier.oldStores.fileInfo = NewSqlFileInfoStore(supplier, metrics) supplier.oldStores.job = NewSqlJobStore(supplier) supplier.oldStores.userAccessToken = NewSqlUserAccessTokenStore(supplier) + supplier.oldStores.channelMemberHistory = NewSqlChannelMemberHistoryStore(supplier) supplier.oldStores.plugin = NewSqlPluginStore(supplier) initSqlSupplierReactions(supplier) @@ -801,6 +803,10 @@ func (ss *SqlSupplier) UserAccessToken() store.UserAccessTokenStore { return ss.oldStores.userAccessToken } +func (ss *SqlSupplier) ChannelMemberHistory() store.ChannelMemberHistoryStore { + return ss.oldStores.channelMemberHistory +} + func (ss *SqlSupplier) Plugin() store.PluginStore { return ss.oldStores.plugin } diff --git a/store/store.go b/store/store.go index 3c950495d..c95888c22 100644 --- a/store/store.go +++ b/store/store.go @@ -63,6 +63,7 @@ type Store interface { Reaction() ReactionStore Job() JobStore UserAccessToken() UserAccessTokenStore + ChannelMemberHistory() ChannelMemberHistoryStore Plugin() PluginStore MarkSystemRanUnitTests() Close() @@ -160,6 +161,13 @@ type ChannelStore interface { GetChannelUnread(channelId, userId string) StoreChannel } +type ChannelMemberHistoryStore interface { + LogJoinEvent(userId string, channelId string, joinTime int64) StoreChannel + LogLeaveEvent(userId string, channelId string, leaveTime int64) StoreChannel + GetUsersInChannelDuring(startTime int64, endTime int64, channelId string) StoreChannel + PurgeHistoryBefore(time int64, channelId string) StoreChannel +} + type PostStore interface { Save(post *model.Post) StoreChannel Update(newPost *model.Post, oldPost *model.Post) StoreChannel @@ -276,6 +284,7 @@ type ComplianceStore interface { Get(id string) StoreChannel GetAll(offset, limit int) StoreChannel ComplianceExport(compliance *model.Compliance) StoreChannel + MessageExport(after int64, limit int) StoreChannel } type OAuthStore interface { diff --git a/store/storetest/channel_member_history_store.go b/store/storetest/channel_member_history_store.go new file mode 100644 index 000000000..c73a25f65 --- /dev/null +++ b/store/storetest/channel_member_history_store.go @@ -0,0 +1,179 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package storetest + +import ( + "testing" + + "github.com/mattermost/mattermost-server/model" + "github.com/mattermost/mattermost-server/store" + "github.com/stretchr/testify/assert" +) + +func TestChannelMemberHistoryStore(t *testing.T, ss store.Store) { + t.Run("Log Join Event", func(t *testing.T) { testLogJoinEvent(t, ss) }) + t.Run("Log Leave Event", func(t *testing.T) { testLogLeaveEvent(t, ss) }) + t.Run("Get Users In Channel At Time", func(t *testing.T) { testGetUsersInChannelAt(t, ss) }) + t.Run("Purge History", func(t *testing.T) { testPurgeHistoryBefore(t, ss) }) +} + +func testLogJoinEvent(t *testing.T, ss store.Store) { + // create a test channel + channel := model.Channel{ + TeamId: model.NewId(), + DisplayName: "Display " + model.NewId(), + Name: "zz" + model.NewId() + "b", + Type: model.CHANNEL_OPEN, + } + channel = *store.Must(ss.Channel().Save(&channel, -1)).(*model.Channel) + + // and a test user + user := model.User{ + Email: model.NewId() + "@mattermost.com", + Nickname: model.NewId(), + } + user = *store.Must(ss.User().Save(&user)).(*model.User) + + // log a join event + result := <-ss.ChannelMemberHistory().LogJoinEvent(user.Id, channel.Id, model.GetMillis()) + assert.Nil(t, result.Err) +} + +func testLogLeaveEvent(t *testing.T, ss store.Store) { + // create a test channel + channel := model.Channel{ + TeamId: model.NewId(), + DisplayName: "Display " + model.NewId(), + Name: "zz" + model.NewId() + "b", + Type: model.CHANNEL_OPEN, + } + channel = *store.Must(ss.Channel().Save(&channel, -1)).(*model.Channel) + + // and a test user + user := model.User{ + Email: model.NewId() + "@mattermost.com", + Nickname: model.NewId(), + } + user = *store.Must(ss.User().Save(&user)).(*model.User) + + // log a join event, followed by a leave event + result := <-ss.ChannelMemberHistory().LogJoinEvent(user.Id, channel.Id, model.GetMillis()) + assert.Nil(t, result.Err) + + result = <-ss.ChannelMemberHistory().LogLeaveEvent(user.Id, channel.Id, model.GetMillis()) + assert.Nil(t, result.Err) +} + +func testGetUsersInChannelAt(t *testing.T, ss store.Store) { + // create a test channel + channel := model.Channel{ + TeamId: model.NewId(), + DisplayName: "Display " + model.NewId(), + Name: "zz" + model.NewId() + "b", + Type: model.CHANNEL_OPEN, + } + channel = *store.Must(ss.Channel().Save(&channel, -1)).(*model.Channel) + + // and a test user + user := model.User{ + Email: model.NewId() + "@mattermost.com", + Nickname: model.NewId(), + } + user = *store.Must(ss.User().Save(&user)).(*model.User) + + // log a join event + leaveTime := model.GetMillis() + joinTime := leaveTime - 10000 + store.Must(ss.ChannelMemberHistory().LogJoinEvent(user.Id, channel.Id, joinTime)) + + // case 1: both start and end before join time + channelMembers := store.Must(ss.ChannelMemberHistory().GetUsersInChannelDuring(joinTime-500, joinTime-100, channel.Id)).([]*model.ChannelMemberHistory) + assert.Len(t, channelMembers, 0) + + // case 2: start before join time, no leave time + channelMembers = store.Must(ss.ChannelMemberHistory().GetUsersInChannelDuring(joinTime-100, joinTime+100, channel.Id)).([]*model.ChannelMemberHistory) + assert.Len(t, channelMembers, 1) + assert.Equal(t, channel.Id, channelMembers[0].ChannelId) + assert.Equal(t, user.Id, channelMembers[0].UserId) + assert.Equal(t, user.Email, channelMembers[0].UserEmail) + assert.Equal(t, joinTime, channelMembers[0].JoinTime) + assert.Nil(t, channelMembers[0].LeaveTime) + + // case 3: start after join time, no leave time + channelMembers = store.Must(ss.ChannelMemberHistory().GetUsersInChannelDuring(joinTime+100, joinTime+500, channel.Id)).([]*model.ChannelMemberHistory) + assert.Len(t, channelMembers, 1) + assert.Equal(t, channel.Id, channelMembers[0].ChannelId) + assert.Equal(t, user.Id, channelMembers[0].UserId) + assert.Equal(t, user.Email, channelMembers[0].UserEmail) + assert.Equal(t, joinTime, channelMembers[0].JoinTime) + assert.Nil(t, channelMembers[0].LeaveTime) + + // add a leave time for the user + store.Must(ss.ChannelMemberHistory().LogLeaveEvent(user.Id, channel.Id, leaveTime)) + + // case 4: start after join time, end before leave time + channelMembers = store.Must(ss.ChannelMemberHistory().GetUsersInChannelDuring(joinTime+100, leaveTime-100, channel.Id)).([]*model.ChannelMemberHistory) + assert.Len(t, channelMembers, 1) + assert.Equal(t, channel.Id, channelMembers[0].ChannelId) + assert.Equal(t, user.Id, channelMembers[0].UserId) + assert.Equal(t, user.Email, channelMembers[0].UserEmail) + assert.Equal(t, joinTime, channelMembers[0].JoinTime) + assert.Equal(t, leaveTime, *channelMembers[0].LeaveTime) + + // case 5: start before join time, end after leave time + channelMembers = store.Must(ss.ChannelMemberHistory().GetUsersInChannelDuring(joinTime-100, leaveTime+100, channel.Id)).([]*model.ChannelMemberHistory) + assert.Len(t, channelMembers, 1) + assert.Equal(t, channel.Id, channelMembers[0].ChannelId) + assert.Equal(t, user.Id, channelMembers[0].UserId) + assert.Equal(t, user.Email, channelMembers[0].UserEmail) + assert.Equal(t, joinTime, channelMembers[0].JoinTime) + assert.Equal(t, leaveTime, *channelMembers[0].LeaveTime) + + // case 6: start and end after leave time + channelMembers = store.Must(ss.ChannelMemberHistory().GetUsersInChannelDuring(leaveTime+100, leaveTime+200, channel.Id)).([]*model.ChannelMemberHistory) + assert.Len(t, channelMembers, 0) +} + +func testPurgeHistoryBefore(t *testing.T, ss store.Store) { + // create a test channel + channel := model.Channel{ + TeamId: model.NewId(), + DisplayName: "Display " + model.NewId(), + Name: "zz" + model.NewId() + "b", + Type: model.CHANNEL_OPEN, + } + channel = *store.Must(ss.Channel().Save(&channel, -1)).(*model.Channel) + + // and two test users + user := model.User{ + Email: model.NewId() + "@mattermost.com", + Nickname: model.NewId(), + } + user = *store.Must(ss.User().Save(&user)).(*model.User) + + user2 := model.User{ + Email: model.NewId() + "@mattermost.com", + Nickname: model.NewId(), + } + user2 = *store.Must(ss.User().Save(&user2)).(*model.User) + + // user1 joins and leaves the channel + leaveTime := model.GetMillis() + joinTime := leaveTime - 10000 + store.Must(ss.ChannelMemberHistory().LogJoinEvent(user.Id, channel.Id, joinTime)) + store.Must(ss.ChannelMemberHistory().LogLeaveEvent(user.Id, channel.Id, leaveTime)) + + // user2 joins the channel but never leaves + store.Must(ss.ChannelMemberHistory().LogJoinEvent(user2.Id, channel.Id, joinTime)) + + // in between the join time and the leave time, both users were members of the channel + channelMembers := store.Must(ss.ChannelMemberHistory().GetUsersInChannelDuring(joinTime+10, leaveTime-10, channel.Id)).([]*model.ChannelMemberHistory) + assert.Len(t, channelMembers, 2) + + // but if we purge the old data, only the user that didn't leave is left + store.Must(ss.ChannelMemberHistory().PurgeHistoryBefore(leaveTime, channel.Id)) + channelMembers = store.Must(ss.ChannelMemberHistory().GetUsersInChannelDuring(joinTime+10, leaveTime-10, channel.Id)).([]*model.ChannelMemberHistory) + assert.Len(t, channelMembers, 1) + assert.Equal(t, user2.Id, channelMembers[0].UserId) +} diff --git a/store/storetest/compliance_store.go b/store/storetest/compliance_store.go index 514910f6f..c5bd60f05 100644 --- a/store/storetest/compliance_store.go +++ b/store/storetest/compliance_store.go @@ -9,12 +9,14 @@ import ( "github.com/mattermost/mattermost-server/model" "github.com/mattermost/mattermost-server/store" + "github.com/stretchr/testify/assert" ) func TestComplianceStore(t *testing.T, ss store.Store) { t.Run("", func(t *testing.T) { testComplianceStore(t, ss) }) t.Run("ComplianceExport", func(t *testing.T) { testComplianceExport(t, ss) }) t.Run("ComplianceExportDirectMessages", func(t *testing.T) { testComplianceExportDirectMessages(t, ss) }) + t.Run("MessageExport", func(t *testing.T) { testComplianceMessageExport(t, ss) }) } func testComplianceStore(t *testing.T, ss store.Store) { @@ -316,3 +318,118 @@ func testComplianceExportDirectMessages(t *testing.T, ss store.Store) { } } } + +func testComplianceMessageExport(t *testing.T, ss store.Store) { + // get the starting number of message export entries + startTime := model.GetMillis() + var numMessageExports = 0 + if r1 := <-ss.Compliance().MessageExport(startTime-10, 10); r1.Err != nil { + t.Fatal(r1.Err) + } else { + messages := r1.Data.([]*model.MessageExport) + numMessageExports = len(messages) + } + + // need a team + team := &model.Team{ + DisplayName: "DisplayName", + Name: "zz" + model.NewId() + "b", + Email: model.NewId() + "@nowhere.com", + Type: model.TEAM_OPEN, + } + team = store.Must(ss.Team().Save(team)).(*model.Team) + + // and two users that are a part of that team + user1 := &model.User{ + Email: model.NewId(), + } + user1 = store.Must(ss.User().Save(user1)).(*model.User) + store.Must(ss.Team().SaveMember(&model.TeamMember{ + TeamId: team.Id, + UserId: user1.Id, + }, -1)) + + user2 := &model.User{ + Email: model.NewId(), + } + user2 = store.Must(ss.User().Save(user2)).(*model.User) + store.Must(ss.Team().SaveMember(&model.TeamMember{ + TeamId: team.Id, + UserId: user2.Id, + }, -1)) + + // need a public channel as well as a DM channel between the two users + channel := &model.Channel{ + TeamId: team.Id, + Name: model.NewId(), + DisplayName: "Channel2", + Type: model.CHANNEL_OPEN, + } + channel = store.Must(ss.Channel().Save(channel, -1)).(*model.Channel) + directMessageChannel := store.Must(ss.Channel().CreateDirectChannel(user1.Id, user2.Id)).(*model.Channel) + + // user1 posts twice in the public channel + post1 := &model.Post{ + ChannelId: channel.Id, + UserId: user1.Id, + CreateAt: startTime, + Message: "zz" + model.NewId() + "a", + } + post1 = store.Must(ss.Post().Save(post1)).(*model.Post) + + post2 := &model.Post{ + ChannelId: channel.Id, + UserId: user1.Id, + CreateAt: startTime + 10, + Message: "zz" + model.NewId() + "b", + } + post2 = store.Must(ss.Post().Save(post2)).(*model.Post) + + // user1 also sends a DM to user2 + post3 := &model.Post{ + ChannelId: directMessageChannel.Id, + UserId: user1.Id, + CreateAt: startTime + 20, + Message: "zz" + model.NewId() + "c", + } + post3 = store.Must(ss.Post().Save(post3)).(*model.Post) + + // fetch the message exports for all three posts that user1 sent + messageExportMap := map[string]model.MessageExport{} + if r1 := <-ss.Compliance().MessageExport(startTime-10, 10); r1.Err != nil { + t.Fatal(r1.Err) + } else { + messages := r1.Data.([]*model.MessageExport) + assert.Equal(t, numMessageExports+3, len(messages)) + + for _, v := range messages { + messageExportMap[*v.PostId] = *v + } + } + + // post1 was made by user1 in channel1 and team1 + assert.Equal(t, post1.Id, *messageExportMap[post1.Id].PostId) + assert.Equal(t, post1.CreateAt, *messageExportMap[post1.Id].PostCreateAt) + assert.Equal(t, post1.Message, *messageExportMap[post1.Id].PostMessage) + assert.Equal(t, channel.Id, *messageExportMap[post1.Id].ChannelId) + assert.Equal(t, channel.DisplayName, *messageExportMap[post1.Id].ChannelDisplayName) + assert.Equal(t, user1.Id, *messageExportMap[post1.Id].UserId) + assert.Equal(t, user1.Email, *messageExportMap[post1.Id].UserEmail) + + // post2 was made by user1 in channel1 and team1 + assert.Equal(t, post2.Id, *messageExportMap[post2.Id].PostId) + assert.Equal(t, post2.CreateAt, *messageExportMap[post2.Id].PostCreateAt) + assert.Equal(t, post2.Message, *messageExportMap[post2.Id].PostMessage) + assert.Equal(t, channel.Id, *messageExportMap[post2.Id].ChannelId) + assert.Equal(t, channel.DisplayName, *messageExportMap[post2.Id].ChannelDisplayName) + assert.Equal(t, user1.Id, *messageExportMap[post2.Id].UserId) + assert.Equal(t, user1.Email, *messageExportMap[post2.Id].UserEmail) + + // post3 is a DM between user1 and user2 + assert.Equal(t, post3.Id, *messageExportMap[post3.Id].PostId) + assert.Equal(t, post3.CreateAt, *messageExportMap[post3.Id].PostCreateAt) + assert.Equal(t, post3.Message, *messageExportMap[post3.Id].PostMessage) + assert.Equal(t, directMessageChannel.Id, *messageExportMap[post3.Id].ChannelId) + assert.Equal(t, user1.Id, *messageExportMap[post3.Id].UserId) + assert.Equal(t, user1.Email, *messageExportMap[post3.Id].UserEmail) +} diff --git a/store/storetest/mocks/ChannelMemberHistoryStore.go b/store/storetest/mocks/ChannelMemberHistoryStore.go new file mode 100644 index 000000000..4ac0967f9 --- /dev/null +++ b/store/storetest/mocks/ChannelMemberHistoryStore.go @@ -0,0 +1,77 @@ +// Code generated by mockery v1.0.0 + +// Regenerate this file using `make store-mocks`. + +package mocks + +import mock "github.com/stretchr/testify/mock" +import store "github.com/mattermost/mattermost-server/store" + +// ChannelMemberHistoryStore is an autogenerated mock type for the ChannelMemberHistoryStore type +type ChannelMemberHistoryStore struct { + mock.Mock +} + +// GetUsersInChannelDuring provides a mock function with given fields: startTime, endTime, channelId +func (_m *ChannelMemberHistoryStore) GetUsersInChannelDuring(startTime int64, endTime int64, channelId string) store.StoreChannel { + ret := _m.Called(startTime, endTime, channelId) + + var r0 store.StoreChannel + if rf, ok := ret.Get(0).(func(int64, int64, string) store.StoreChannel); ok { + r0 = rf(startTime, endTime, channelId) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(store.StoreChannel) + } + } + + return r0 +} + +// LogJoinEvent provides a mock function with given fields: userId, channelId, joinTime +func (_m *ChannelMemberHistoryStore) LogJoinEvent(userId string, channelId string, joinTime int64) store.StoreChannel { + ret := _m.Called(userId, channelId, joinTime) + + var r0 store.StoreChannel + if rf, ok := ret.Get(0).(func(string, string, int64) store.StoreChannel); ok { + r0 = rf(userId, channelId, joinTime) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(store.StoreChannel) + } + } + + return r0 +} + +// LogLeaveEvent provides a mock function with given fields: userId, channelId, leaveTime +func (_m *ChannelMemberHistoryStore) LogLeaveEvent(userId string, channelId string, leaveTime int64) store.StoreChannel { + ret := _m.Called(userId, channelId, leaveTime) + + var r0 store.StoreChannel + if rf, ok := ret.Get(0).(func(string, string, int64) store.StoreChannel); ok { + r0 = rf(userId, channelId, leaveTime) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(store.StoreChannel) + } + } + + return r0 +} + +// PurgeHistoryBefore provides a mock function with given fields: time, channelId +func (_m *ChannelMemberHistoryStore) PurgeHistoryBefore(time int64, channelId string) store.StoreChannel { + ret := _m.Called(time, channelId) + + var r0 store.StoreChannel + if rf, ok := ret.Get(0).(func(int64, string) store.StoreChannel); ok { + r0 = rf(time, channelId) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(store.StoreChannel) + } + } + + return r0 +} diff --git a/store/storetest/mocks/ComplianceStore.go b/store/storetest/mocks/ComplianceStore.go index b2208ead7..fb828cd4b 100644 --- a/store/storetest/mocks/ComplianceStore.go +++ b/store/storetest/mocks/ComplianceStore.go @@ -61,6 +61,22 @@ func (_m *ComplianceStore) GetAll(offset int, limit int) store.StoreChannel { return r0 } +// MessageExport provides a mock function with given fields: after, limit +func (_m *ComplianceStore) MessageExport(after int64, limit int) store.StoreChannel { + ret := _m.Called(after, limit) + + var r0 store.StoreChannel + if rf, ok := ret.Get(0).(func(int64, int) store.StoreChannel); ok { + r0 = rf(after, limit) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(store.StoreChannel) + } + } + + return r0 +} + // Save provides a mock function with given fields: compliance func (_m *ComplianceStore) Save(compliance *model.Compliance) store.StoreChannel { ret := _m.Called(compliance) diff --git a/store/storetest/mocks/LayeredStoreDatabaseLayer.go b/store/storetest/mocks/LayeredStoreDatabaseLayer.go index c3b8bbb60..9c66c4aac 100644 --- a/store/storetest/mocks/LayeredStoreDatabaseLayer.go +++ b/store/storetest/mocks/LayeredStoreDatabaseLayer.go @@ -46,6 +46,22 @@ func (_m *LayeredStoreDatabaseLayer) Channel() store.ChannelStore { return r0 } +// ChannelMemberHistory provides a mock function with given fields: +func (_m *LayeredStoreDatabaseLayer) ChannelMemberHistory() store.ChannelMemberHistoryStore { + ret := _m.Called() + + var r0 store.ChannelMemberHistoryStore + if rf, ok := ret.Get(0).(func() store.ChannelMemberHistoryStore); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(store.ChannelMemberHistoryStore) + } + } + + return r0 +} + // Close provides a mock function with given fields: func (_m *LayeredStoreDatabaseLayer) Close() { _m.Called() diff --git a/store/storetest/mocks/Store.go b/store/storetest/mocks/Store.go index 85ed10d35..40b50a554 100644 --- a/store/storetest/mocks/Store.go +++ b/store/storetest/mocks/Store.go @@ -44,6 +44,22 @@ func (_m *Store) Channel() store.ChannelStore { return r0 } +// ChannelMemberHistory provides a mock function with given fields: +func (_m *Store) ChannelMemberHistory() store.ChannelMemberHistoryStore { + ret := _m.Called() + + var r0 store.ChannelMemberHistoryStore + if rf, ok := ret.Get(0).(func() store.ChannelMemberHistoryStore); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(store.ChannelMemberHistoryStore) + } + } + + return r0 +} + // Close provides a mock function with given fields: func (_m *Store) Close() { _m.Called() diff --git a/store/storetest/store.go b/store/storetest/store.go index 55545decb..367c5f441 100644 --- a/store/storetest/store.go +++ b/store/storetest/store.go @@ -19,29 +19,30 @@ func NewStoreChannel(result store.StoreResult) store.StoreChannel { // Store can be used to provide mock stores for testing. type Store struct { - TeamStore mocks.TeamStore - ChannelStore mocks.ChannelStore - PostStore mocks.PostStore - UserStore mocks.UserStore - AuditStore mocks.AuditStore - ClusterDiscoveryStore mocks.ClusterDiscoveryStore - ComplianceStore mocks.ComplianceStore - SessionStore mocks.SessionStore - OAuthStore mocks.OAuthStore - SystemStore mocks.SystemStore - WebhookStore mocks.WebhookStore - CommandStore mocks.CommandStore - CommandWebhookStore mocks.CommandWebhookStore - PreferenceStore mocks.PreferenceStore - LicenseStore mocks.LicenseStore - TokenStore mocks.TokenStore - EmojiStore mocks.EmojiStore - StatusStore mocks.StatusStore - FileInfoStore mocks.FileInfoStore - ReactionStore mocks.ReactionStore - JobStore mocks.JobStore - UserAccessTokenStore mocks.UserAccessTokenStore - PluginStore mocks.PluginStore + TeamStore mocks.TeamStore + ChannelStore mocks.ChannelStore + PostStore mocks.PostStore + UserStore mocks.UserStore + AuditStore mocks.AuditStore + ClusterDiscoveryStore mocks.ClusterDiscoveryStore + ComplianceStore mocks.ComplianceStore + SessionStore mocks.SessionStore + OAuthStore mocks.OAuthStore + SystemStore mocks.SystemStore + WebhookStore mocks.WebhookStore + CommandStore mocks.CommandStore + CommandWebhookStore mocks.CommandWebhookStore + PreferenceStore mocks.PreferenceStore + LicenseStore mocks.LicenseStore + TokenStore mocks.TokenStore + EmojiStore mocks.EmojiStore + StatusStore mocks.StatusStore + FileInfoStore mocks.FileInfoStore + ReactionStore mocks.ReactionStore + JobStore mocks.JobStore + UserAccessTokenStore mocks.UserAccessTokenStore + PluginStore mocks.PluginStore + ChannelMemberHistoryStore mocks.ChannelMemberHistoryStore } func (s *Store) Team() store.TeamStore { return &s.TeamStore } @@ -67,12 +68,15 @@ func (s *Store) Reaction() store.ReactionStore { return &s.React func (s *Store) Job() store.JobStore { return &s.JobStore } func (s *Store) UserAccessToken() store.UserAccessTokenStore { return &s.UserAccessTokenStore } func (s *Store) Plugin() store.PluginStore { return &s.PluginStore } -func (s *Store) MarkSystemRanUnitTests() { /* do nothing */ } -func (s *Store) Close() { /* do nothing */ } -func (s *Store) DropAllTables() { /* do nothing */ } -func (s *Store) TotalMasterDbConnections() int { return 1 } -func (s *Store) TotalReadDbConnections() int { return 1 } -func (s *Store) TotalSearchDbConnections() int { return 1 } +func (s *Store) ChannelMemberHistory() store.ChannelMemberHistoryStore { + return &s.ChannelMemberHistoryStore +} +func (s *Store) MarkSystemRanUnitTests() { /* do nothing */ } +func (s *Store) Close() { /* do nothing */ } +func (s *Store) DropAllTables() { /* do nothing */ } +func (s *Store) TotalMasterDbConnections() int { return 1 } +func (s *Store) TotalReadDbConnections() int { return 1 } +func (s *Store) TotalSearchDbConnections() int { return 1 } func (s *Store) AssertExpectations(t mock.TestingT) bool { return mock.AssertExpectationsForObjects(t, @@ -98,6 +102,7 @@ func (s *Store) AssertExpectations(t mock.TestingT) bool { &s.ReactionStore, &s.JobStore, &s.UserAccessTokenStore, + &s.ChannelMemberHistoryStore, &s.PluginStore, ) } diff --git a/utils/file_backend.go b/utils/file_backend.go index 3469a63fb..c7a6c5591 100644 --- a/utils/file_backend.go +++ b/utils/file_backend.go @@ -13,6 +13,7 @@ type FileBackend interface { TestConnection() *model.AppError ReadFile(path string) ([]byte, *model.AppError) + CopyFile(oldPath, newPath string) *model.AppError MoveFile(oldPath, newPath string) *model.AppError WriteFile(f []byte, path string) *model.AppError RemoveFile(path string) *model.AppError diff --git a/utils/file_backend_local.go b/utils/file_backend_local.go index b5e67f8f0..1367ccc1e 100644 --- a/utils/file_backend_local.go +++ b/utils/file_backend_local.go @@ -40,6 +40,13 @@ func (b *LocalFileBackend) ReadFile(path string) ([]byte, *model.AppError) { } } +func (b *LocalFileBackend) CopyFile(oldPath, newPath string) *model.AppError { + if err := CopyFile(filepath.Join(b.directory, oldPath), filepath.Join(b.directory, newPath)); err != nil { + return model.NewAppError("copyFile", "api.file.move_file.rename.app_error", nil, err.Error(), http.StatusInternalServerError) + } + return nil +} + func (b *LocalFileBackend) MoveFile(oldPath, newPath string) *model.AppError { if err := os.MkdirAll(filepath.Dir(filepath.Join(b.directory, newPath)), 0774); err != nil { return model.NewAppError("moveFile", "api.file.move_file.rename.app_error", nil, err.Error(), http.StatusInternalServerError) diff --git a/utils/file_backend_s3.go b/utils/file_backend_s3.go index ed88dc70c..5512b64dc 100644 --- a/utils/file_backend_s3.go +++ b/utils/file_backend_s3.go @@ -95,6 +95,23 @@ func (b *S3FileBackend) ReadFile(path string) ([]byte, *model.AppError) { } } +func (b *S3FileBackend) CopyFile(oldPath, newPath string) *model.AppError { + s3Clnt, err := b.s3New() + if err != nil { + return model.NewAppError("copyFile", "api.file.write_file.s3.app_error", nil, err.Error(), http.StatusInternalServerError) + } + + source := s3.NewSourceInfo(b.bucket, oldPath, nil) + destination, err := s3.NewDestinationInfo(b.bucket, newPath, nil, s3CopyMetadata(b.encrypt)) + if err != nil { + return model.NewAppError("copyFile", "api.file.write_file.s3.app_error", nil, err.Error(), http.StatusInternalServerError) + } + if err = s3Clnt.CopyObject(destination, source); err != nil { + return model.NewAppError("copyFile", "api.file.move_file.copy_within_s3.app_error", nil, err.Error(), http.StatusInternalServerError) + } + return nil +} + func (b *S3FileBackend) MoveFile(oldPath, newPath string) *model.AppError { s3Clnt, err := b.s3New() if err != nil { @@ -107,7 +124,7 @@ func (b *S3FileBackend) MoveFile(oldPath, newPath string) *model.AppError { return model.NewAppError("moveFile", "api.file.write_file.s3.app_error", nil, err.Error(), http.StatusInternalServerError) } if err = s3Clnt.CopyObject(destination, source); err != nil { - return model.NewAppError("moveFile", "api.file.move_file.delete_from_s3.app_error", nil, err.Error(), http.StatusInternalServerError) + return model.NewAppError("moveFile", "api.file.move_file.copy_within_s3.app_error", nil, err.Error(), http.StatusInternalServerError) } if err = s3Clnt.RemoveObject(b.bucket, oldPath); err != nil { return model.NewAppError("moveFile", "api.file.move_file.delete_from_s3.app_error", nil, err.Error(), http.StatusInternalServerError) diff --git a/utils/file_backend_test.go b/utils/file_backend_test.go index 0989f783c..098f86bbd 100644 --- a/utils/file_backend_test.go +++ b/utils/file_backend_test.go @@ -86,6 +86,26 @@ func (s *FileBackendTestSuite) TestReadWriteFile() { s.EqualValues(readString, "test") } +func (s *FileBackendTestSuite) TestCopyFile() { + b := []byte("test") + path1 := "tests/" + model.NewId() + path2 := "tests/" + model.NewId() + + err := s.backend.WriteFile(b, path1) + s.Nil(err) + defer s.backend.RemoveFile(path1) + + err = s.backend.CopyFile(path1, path2) + s.Nil(err) + defer s.backend.RemoveFile(path2) + + _, err = s.backend.ReadFile(path1) + s.Nil(err) + + _, err = s.backend.ReadFile(path2) + s.Nil(err) +} + func (s *FileBackendTestSuite) TestMoveFile() { b := []byte("test") path1 := "tests/" + model.NewId() diff --git a/utils/license.go b/utils/license.go index f4775d338..54bad45b5 100644 --- a/utils/license.go +++ b/utils/license.go @@ -228,6 +228,7 @@ func getClientLicense(l *model.License) map[string]string { props["Company"] = l.Customer.Company props["PhoneNumber"] = l.Customer.PhoneNumber props["EmailNotificationContents"] = strconv.FormatBool(*l.Features.EmailNotificationContents) + props["MessageExport"] = strconv.FormatBool(*l.Features.MessageExport) } return props -- cgit v1.2.3-1-g7c22