diff options
author | Jonathan <jonfritz@gmail.com> | 2017-11-30 09:07:04 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-11-30 09:07:04 -0500 |
commit | 375c0632fab03e3fb54865e320585888499c076d (patch) | |
tree | ce6cba679337a82370d5a730c428bef21a9964bf /app | |
parent | d0d9ba4a7e43301697d1c9f495930e0c0179fdc3 (diff) | |
download | chat-375c0632fab03e3fb54865e320585888499c076d.tar.gz chat-375c0632fab03e3fb54865e320585888499c076d.tar.bz2 chat-375c0632fab03e3fb54865e320585888499c076d.zip |
PLT-7503: Create Message Export Scheduled Task and CLI Command (#7612)
* Created message export scheduled task
* Added CLI command to immediately kick off an export job
* Added email addresses for users joining and leaving the channel to the export
* Added support for both MySQL and PostgreSQL
* Fixing gofmt error
* Added a new ChannelMemberHistory store and associated tests
* Updating the ChannelMemberHistory channel as users create/join/leave channels
* Added user email to the message export object so it can be included in the actiance export xml
* Don't fail to log a leave event if a corresponding join event wasn't logged
* Adding copyright notices
* Adding message export settings to daily diagnostics report
* Added System Console integration for message export
* Cleaned up TODOs
* Made batch size configurable
* Added export from timestamp to CLI command
* Made ChannelMemberHistory table updates best effort
* Added a context-based timeout option to the message export CLI
* Minor PR updates/improvements
* Removed unnecessary fields from MessageExport object to reduce query overhead
* Removed JSON functions from the message export query in an effort to optimize performance
* Changed the way that channel member history queries and purges work to better account for edge cases
* Fixing a test I missed with the last refactor
* Added file copy functionality to file backend, improved config validation, added default config values
* Fixed file copy tests
* More concise use of the testing libraries
* Fixed context leak error
* Changed default export path to correctly place an 'export' directory under the 'data' directory
* Can't delete records from a read replica
* Fixed copy file tests
* Start job workers when license is applied, if configured to do so
* Suggestions from the PR
* Moar unit tests
* Fixed test imports
Diffstat (limited to 'app')
-rw-r--r-- | app/app.go | 19 | ||||
-rw-r--r-- | app/channel.go | 36 | ||||
-rw-r--r-- | app/channel_test.go | 166 | ||||
-rw-r--r-- | app/diagnostics.go | 8 | ||||
-rw-r--r-- | app/diagnostics_test.go | 1 | ||||
-rw-r--r-- | app/license.go | 10 |
6 files changed, 235 insertions, 5 deletions
diff --git a/app/app.go b/app/app.go index 7bd4c561b..fd313c9c9 100644 --- a/app/app.go +++ b/app/app.go @@ -48,6 +48,7 @@ type App struct { Elasticsearch einterfaces.ElasticsearchInterface Emoji einterfaces.EmojiInterface Ldap einterfaces.LdapInterface + MessageExport einterfaces.MessageExportInterface Metrics einterfaces.MetricsInterface Mfa einterfaces.MfaInterface Saml einterfaces.SamlInterface @@ -198,6 +199,12 @@ func RegisterJobsDataRetentionJobInterface(f func(*App) ejobs.DataRetentionJobIn jobsDataRetentionJobInterface = f } +var jobsMessageExportJobInterface func(*App) ejobs.MessageExportJobInterface + +func RegisterJobsMessageExportJobInterface(f func(*App) ejobs.MessageExportJobInterface) { + jobsMessageExportJobInterface = f +} + var jobsElasticsearchAggregatorInterface func(*App) ejobs.ElasticsearchAggregatorInterface func RegisterJobsElasticsearchAggregatorInterface(f func(*App) ejobs.ElasticsearchAggregatorInterface) { @@ -222,6 +229,12 @@ func RegisterLdapInterface(f func(*App) einterfaces.LdapInterface) { ldapInterface = f } +var messageExportInterface func(*App) einterfaces.MessageExportInterface + +func RegisterMessageExportInterface(f func(*App) einterfaces.MessageExportInterface) { + messageExportInterface = f +} + var metricsInterface func(*App) einterfaces.MetricsInterface func RegisterMetricsInterface(f func(*App) einterfaces.MetricsInterface) { @@ -267,6 +280,9 @@ func (a *App) initEnterprise() { } }) } + if messageExportInterface != nil { + a.MessageExport = messageExportInterface(a) + } if metricsInterface != nil { a.Metrics = metricsInterface(a) } @@ -289,6 +305,9 @@ func (a *App) initJobs() { if jobsDataRetentionJobInterface != nil { a.Jobs.DataRetentionJob = jobsDataRetentionJobInterface(a) } + if jobsMessageExportJobInterface != nil { + a.Jobs.MessageExportJob = jobsMessageExportJobInterface(a) + } if jobsElasticsearchAggregatorInterface != nil { a.Jobs.ElasticsearchAggregator = jobsElasticsearchAggregatorInterface(a) } diff --git a/app/channel.go b/app/channel.go index 16c5dd084..caaacea06 100644 --- a/app/channel.go +++ b/app/channel.go @@ -49,12 +49,19 @@ func (a *App) JoinDefaultChannels(teamId string, user *model.User, channelRole s } else { townSquare := result.Data.(*model.Channel) - cm := &model.ChannelMember{ChannelId: townSquare.Id, UserId: user.Id, - Roles: channelRole, NotifyProps: model.GetDefaultChannelNotifyProps()} + cm := &model.ChannelMember{ + ChannelId: townSquare.Id, + UserId: user.Id, + Roles: channelRole, + NotifyProps: model.GetDefaultChannelNotifyProps(), + } if cmResult := <-a.Srv.Store.Channel().SaveMember(cm); cmResult.Err != nil { err = cmResult.Err } + if result := <-a.Srv.Store.ChannelMemberHistory().LogJoinEvent(user.Id, townSquare.Id, model.GetMillis()); result.Err != nil { + l4g.Warn("Failed to update ChannelMemberHistory table %v", result.Err) + } if requestor == nil { if err := a.postJoinChannelMessage(user, townSquare); err != nil { @@ -74,12 +81,19 @@ func (a *App) JoinDefaultChannels(teamId string, user *model.User, channelRole s } else { offTopic := result.Data.(*model.Channel) - cm := &model.ChannelMember{ChannelId: offTopic.Id, UserId: user.Id, - Roles: channelRole, NotifyProps: model.GetDefaultChannelNotifyProps()} + cm := &model.ChannelMember{ + ChannelId: offTopic.Id, + UserId: user.Id, + Roles: channelRole, + NotifyProps: model.GetDefaultChannelNotifyProps(), + } if cmResult := <-a.Srv.Store.Channel().SaveMember(cm); cmResult.Err != nil { err = cmResult.Err } + if result := <-a.Srv.Store.ChannelMemberHistory().LogJoinEvent(user.Id, offTopic.Id, model.GetMillis()); result.Err != nil { + l4g.Warn("Failed to update ChannelMemberHistory table %v", result.Err) + } if requestor == nil { if err := a.postJoinChannelMessage(user, offTopic); err != nil { @@ -158,6 +172,9 @@ func (a *App) CreateChannel(channel *model.Channel, addMember bool) (*model.Chan if cmresult := <-a.Srv.Store.Channel().SaveMember(cm); cmresult.Err != nil { return nil, cmresult.Err } + if result := <-a.Srv.Store.ChannelMemberHistory().LogJoinEvent(channel.CreatorId, sc.Id, model.GetMillis()); result.Err != nil { + l4g.Warn("Failed to update ChannelMemberHistory table %v", result.Err) + } a.InvalidateCacheForUser(channel.CreatorId) } @@ -302,6 +319,9 @@ func (a *App) createGroupChannel(userIds []string, creatorId string) (*model.Cha if result := <-a.Srv.Store.Channel().SaveMember(cm); result.Err != nil { return nil, result.Err } + if result := <-a.Srv.Store.ChannelMemberHistory().LogJoinEvent(user.Id, channel.Id, model.GetMillis()); result.Err != nil { + l4g.Warn("Failed to update ChannelMemberHistory table %v", result.Err) + } } return channel, nil @@ -520,9 +540,12 @@ func (a *App) addUserToChannel(user *model.User, channel *model.Channel, teamMem l4g.Error("Failed to add member user_id=%v channel_id=%v err=%v", user.Id, channel.Id, result.Err) return nil, model.NewAppError("AddUserToChannel", "api.channel.add_user.to.channel.failed.app_error", nil, "", http.StatusInternalServerError) } - a.WaitForChannelMembership(channel.Id, user.Id) + if result := <-a.Srv.Store.ChannelMemberHistory().LogJoinEvent(user.Id, channel.Id, model.GetMillis()); result.Err != nil { + l4g.Warn("Failed to update ChannelMemberHistory table %v", result.Err) + } + a.InvalidateCacheForUser(user.Id) a.InvalidateCacheForChannelMembers(channel.Id) @@ -1069,6 +1092,9 @@ func (a *App) removeUserFromChannel(userIdToRemove string, removerUserId string, if cmresult := <-a.Srv.Store.Channel().RemoveMember(channel.Id, userIdToRemove); cmresult.Err != nil { return cmresult.Err } + if cmhResult := <-a.Srv.Store.ChannelMemberHistory().LogLeaveEvent(userIdToRemove, channel.Id, model.GetMillis()); cmhResult.Err != nil { + return cmhResult.Err + } a.InvalidateCacheForUser(userIdToRemove) a.InvalidateCacheForChannelMembers(channel.Id) diff --git a/app/channel_test.go b/app/channel_test.go index 374b20657..d44af467d 100644 --- a/app/channel_test.go +++ b/app/channel_test.go @@ -1,9 +1,14 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + package app import ( "testing" "github.com/mattermost/mattermost-server/model" + "github.com/mattermost/mattermost-server/store" + "github.com/stretchr/testify/assert" ) func TestPermanentDeleteChannel(t *testing.T) { @@ -104,3 +109,164 @@ func TestMoveChannel(t *testing.T) { t.Fatal(err) } } + +func TestJoinDefaultChannelsTownSquare(t *testing.T) { + th := Setup().InitBasic() + defer th.TearDown() + + // figure out the initial number of users in town square + townSquareChannelId := store.Must(th.App.Srv.Store.Channel().GetByName(th.BasicTeam.Id, "town-square", true)).(*model.Channel).Id + initialNumTownSquareUsers := len(store.Must(th.App.Srv.Store.ChannelMemberHistory().GetUsersInChannelDuring(model.GetMillis()-100, model.GetMillis()+100, townSquareChannelId)).([]*model.ChannelMemberHistory)) + + // create a new user that joins the default channels + user := th.CreateUser() + th.App.JoinDefaultChannels(th.BasicTeam.Id, user, model.CHANNEL_USER_ROLE_ID, "") + + // there should be a ChannelMemberHistory record for the user + histories := store.Must(th.App.Srv.Store.ChannelMemberHistory().GetUsersInChannelDuring(model.GetMillis()-100, model.GetMillis()+100, townSquareChannelId)).([]*model.ChannelMemberHistory) + assert.Len(t, histories, initialNumTownSquareUsers+1) + + found := false + for _, history := range histories { + if user.Id == history.UserId && townSquareChannelId == history.ChannelId { + found = true + break + } + } + assert.True(t, found) +} + +func TestJoinDefaultChannelsOffTopic(t *testing.T) { + th := Setup().InitBasic() + defer th.TearDown() + + // figure out the initial number of users in off-topic + offTopicChannelId := store.Must(th.App.Srv.Store.Channel().GetByName(th.BasicTeam.Id, "off-topic", true)).(*model.Channel).Id + initialNumTownSquareUsers := len(store.Must(th.App.Srv.Store.ChannelMemberHistory().GetUsersInChannelDuring(model.GetMillis()-100, model.GetMillis()+100, offTopicChannelId)).([]*model.ChannelMemberHistory)) + + // create a new user that joins the default channels + user := th.CreateUser() + th.App.JoinDefaultChannels(th.BasicTeam.Id, user, model.CHANNEL_USER_ROLE_ID, "") + + // there should be a ChannelMemberHistory record for the user + histories := store.Must(th.App.Srv.Store.ChannelMemberHistory().GetUsersInChannelDuring(model.GetMillis()-100, model.GetMillis()+100, offTopicChannelId)).([]*model.ChannelMemberHistory) + assert.Len(t, histories, initialNumTownSquareUsers+1) + + found := false + for _, history := range histories { + if user.Id == history.UserId && offTopicChannelId == history.ChannelId { + found = true + break + } + } + assert.True(t, found) +} + +func TestCreateChannelPublic(t *testing.T) { + th := Setup().InitBasic() + defer th.TearDown() + + // creates a public channel and adds basic user to it + publicChannel := th.createChannel(th.BasicTeam, model.CHANNEL_OPEN) + + // there should be a ChannelMemberHistory record for the user + histories := store.Must(th.App.Srv.Store.ChannelMemberHistory().GetUsersInChannelDuring(model.GetMillis()-100, model.GetMillis()+100, publicChannel.Id)).([]*model.ChannelMemberHistory) + assert.Len(t, histories, 1) + assert.Equal(t, th.BasicUser.Id, histories[0].UserId) + assert.Equal(t, publicChannel.Id, histories[0].ChannelId) +} + +func TestCreateChannelPrivate(t *testing.T) { + th := Setup().InitBasic() + defer th.TearDown() + + // creates a private channel and adds basic user to it + privateChannel := th.createChannel(th.BasicTeam, model.CHANNEL_PRIVATE) + + // there should be a ChannelMemberHistory record for the user + histories := store.Must(th.App.Srv.Store.ChannelMemberHistory().GetUsersInChannelDuring(model.GetMillis()-100, model.GetMillis()+100, privateChannel.Id)).([]*model.ChannelMemberHistory) + assert.Len(t, histories, 1) + assert.Equal(t, th.BasicUser.Id, histories[0].UserId) + assert.Equal(t, privateChannel.Id, histories[0].ChannelId) +} + +func TestCreateGroupChannel(t *testing.T) { + th := Setup().InitBasic() + defer th.TearDown() + + user1 := th.CreateUser() + user2 := th.CreateUser() + + groupUserIds := make([]string, 0) + groupUserIds = append(groupUserIds, user1.Id) + groupUserIds = append(groupUserIds, user2.Id) + groupUserIds = append(groupUserIds, th.BasicUser.Id) + + if channel, err := th.App.CreateGroupChannel(groupUserIds, th.BasicUser.Id); err != nil { + t.Fatal("Failed to create group channel. Error: " + err.Message) + } else { + // there should be a ChannelMemberHistory record for each user + histories := store.Must(th.App.Srv.Store.ChannelMemberHistory().GetUsersInChannelDuring(model.GetMillis()-100, model.GetMillis()+100, channel.Id)).([]*model.ChannelMemberHistory) + assert.Len(t, histories, 3) + + channelMemberHistoryUserIds := make([]string, 0) + for _, history := range histories { + assert.Equal(t, channel.Id, history.ChannelId) + channelMemberHistoryUserIds = append(channelMemberHistoryUserIds, history.UserId) + } + assert.Equal(t, groupUserIds, channelMemberHistoryUserIds) + } +} + +func TestAddUserToChannel(t *testing.T) { + th := Setup().InitBasic() + defer th.TearDown() + + // create a user and add it to a channel + user := th.CreateUser() + if _, err := th.App.AddTeamMember(th.BasicTeam.Id, user.Id); err != nil { + t.Fatal("Failed to add user to team. Error: " + err.Message) + } + + groupUserIds := make([]string, 0) + groupUserIds = append(groupUserIds, th.BasicUser.Id) + groupUserIds = append(groupUserIds, user.Id) + + channel := th.createChannel(th.BasicTeam, model.CHANNEL_OPEN) + if _, err := th.App.AddUserToChannel(user, channel); err != nil { + t.Fatal("Failed to add user to channel. Error: " + err.Message) + } + + // there should be a ChannelMemberHistory record for the user + histories := store.Must(th.App.Srv.Store.ChannelMemberHistory().GetUsersInChannelDuring(model.GetMillis()-100, model.GetMillis()+100, channel.Id)).([]*model.ChannelMemberHistory) + assert.Len(t, histories, 2) + channelMemberHistoryUserIds := make([]string, 0) + for _, history := range histories { + assert.Equal(t, channel.Id, history.ChannelId) + channelMemberHistoryUserIds = append(channelMemberHistoryUserIds, history.UserId) + } + assert.Equal(t, groupUserIds, channelMemberHistoryUserIds) +} + +func TestRemoveUserFromChannel(t *testing.T) { + th := Setup().InitBasic() + defer th.TearDown() + + // a user creates a channel + publicChannel := th.createChannel(th.BasicTeam, model.CHANNEL_OPEN) + histories := store.Must(th.App.Srv.Store.ChannelMemberHistory().GetUsersInChannelDuring(model.GetMillis()-100, model.GetMillis()+100, publicChannel.Id)).([]*model.ChannelMemberHistory) + assert.Len(t, histories, 1) + assert.Equal(t, th.BasicUser.Id, histories[0].UserId) + assert.Equal(t, publicChannel.Id, histories[0].ChannelId) + assert.Nil(t, histories[0].LeaveTime) + + // the user leaves that channel + if err := th.App.LeaveChannel(publicChannel.Id, th.BasicUser.Id); err != nil { + t.Fatal("Failed to remove user from channel. Error: " + err.Message) + } + histories = store.Must(th.App.Srv.Store.ChannelMemberHistory().GetUsersInChannelDuring(model.GetMillis()-100, model.GetMillis()+100, publicChannel.Id)).([]*model.ChannelMemberHistory) + assert.Len(t, histories, 1) + assert.Equal(t, th.BasicUser.Id, histories[0].UserId) + assert.Equal(t, publicChannel.Id, histories[0].ChannelId) + assert.NotNil(t, histories[0].LeaveTime) +} diff --git a/app/diagnostics.go b/app/diagnostics.go index 513cf11f5..3f37337ab 100644 --- a/app/diagnostics.go +++ b/app/diagnostics.go @@ -44,6 +44,7 @@ const ( TRACK_CONFIG_ELASTICSEARCH = "config_elasticsearch" TRACK_CONFIG_PLUGIN = "config_plugin" TRACK_CONFIG_DATA_RETENTION = "config_data_retention" + TRACK_CONFIG_MESSAGE_EXPORT = "config_message_export" TRACK_ACTIVITY = "activity" TRACK_LICENSE = "license" @@ -470,6 +471,13 @@ func (a *App) trackConfig() { "file_retention_days": *cfg.DataRetentionSettings.FileRetentionDays, "deletion_job_start_time": *cfg.DataRetentionSettings.DeletionJobStartTime, }) + + SendDiagnostic(TRACK_CONFIG_MESSAGE_EXPORT, map[string]interface{}{ + "enable_message_export": *cfg.MessageExportSettings.EnableExport, + "daily_run_time": *cfg.MessageExportSettings.DailyRunTime, + "default_export_from_timestamp": *cfg.MessageExportSettings.ExportFromTimestamp, + "batch_size": *cfg.MessageExportSettings.BatchSize, + }) } func trackLicense() { diff --git a/app/diagnostics_test.go b/app/diagnostics_test.go index 25bc75265..9b884fd43 100644 --- a/app/diagnostics_test.go +++ b/app/diagnostics_test.go @@ -135,6 +135,7 @@ func TestDiagnostics(t *testing.T) { TRACK_CONFIG_PLUGIN, TRACK_ACTIVITY, TRACK_SERVER, + TRACK_CONFIG_MESSAGE_EXPORT, TRACK_PLUGINS, } { if !strings.Contains(info, item) { diff --git a/app/license.go b/app/license.go index 18836c571..cacc71524 100644 --- a/app/license.go +++ b/app/license.go @@ -89,6 +89,16 @@ func (a *App) SaveLicense(licenseBytes []byte) (*model.License, *model.AppError) a.ReloadConfig() a.InvalidateAllCaches() + // start job server if necessary - this handles the edge case where a license file is uploaded, but the job server + // doesn't start until the server is restarted, which prevents the 'run job now' buttons in system console from + // functioning as expected + if *a.Config().JobSettings.RunJobs { + a.Jobs.StartWorkers() + } + if *a.Config().JobSettings.RunScheduler { + a.Jobs.StartSchedulers() + } + return license, nil } |