diff options
author | George Goldberg <george@gberg.me> | 2017-09-15 17:35:55 +0100 |
---|---|---|
committer | Christopher Speller <crspeller@gmail.com> | 2017-09-15 09:35:55 -0700 |
commit | 8195c80aa12136838ff4491fac989e0b946382b1 (patch) | |
tree | da24729af5acbd3349c75923d346cfa7aa9ad95c /store | |
parent | 2628022275ef64fde95545abe4634b4bd7177844 (diff) | |
download | chat-8195c80aa12136838ff4491fac989e0b946382b1.tar.gz chat-8195c80aa12136838ff4491fac989e0b946382b1.tar.bz2 chat-8195c80aa12136838ff4491fac989e0b946382b1.zip |
PLT-7639: Batch delete methods for data retention. (#7444)
Diffstat (limited to 'store')
-rw-r--r-- | store/layered_store.go | 6 | ||||
-rw-r--r-- | store/layered_store_supplier.go | 1 | ||||
-rw-r--r-- | store/local_cache_supplier_reactions.go | 6 | ||||
-rw-r--r-- | store/redis_supplier.go | 5 | ||||
-rw-r--r-- | store/sql_audit_store.go | 36 | ||||
-rw-r--r-- | store/sql_audit_store_test.go | 29 | ||||
-rw-r--r-- | store/sql_file_info_store.go | 33 | ||||
-rw-r--r-- | store/sql_file_info_store_test.go | 41 | ||||
-rw-r--r-- | store/sql_post_store.go | 33 | ||||
-rw-r--r-- | store/sql_post_store_test.go | 39 | ||||
-rw-r--r-- | store/sql_reaction_store_test.go | 55 | ||||
-rw-r--r-- | store/sql_supplier_reactions.go | 28 | ||||
-rw-r--r-- | store/store.go | 4 |
13 files changed, 316 insertions, 0 deletions
diff --git a/store/layered_store.go b/store/layered_store.go index ac0713f57..0c6a01125 100644 --- a/store/layered_store.go +++ b/store/layered_store.go @@ -199,3 +199,9 @@ func (s *LayeredReactionStore) DeleteAllWithEmojiName(emojiName string) StoreCha return supplier.ReactionDeleteAllWithEmojiName(s.TmpContext, emojiName) }) } + +func (s *LayeredReactionStore) PermanentDeleteBatch(endTime int64, limit int64) StoreChannel { + return s.RunQuery(func(supplier LayeredStoreSupplier) *LayeredStoreSupplierResult { + return supplier.ReactionPermanentDeleteBatch(s.TmpContext, endTime, limit) + }) +} diff --git a/store/layered_store_supplier.go b/store/layered_store_supplier.go index 35668c717..841b75a32 100644 --- a/store/layered_store_supplier.go +++ b/store/layered_store_supplier.go @@ -30,4 +30,5 @@ type LayeredStoreSupplier interface { ReactionDelete(ctx context.Context, reaction *model.Reaction, hints ...LayeredStoreHint) *LayeredStoreSupplierResult ReactionGetForPost(ctx context.Context, postId string, hints ...LayeredStoreHint) *LayeredStoreSupplierResult ReactionDeleteAllWithEmojiName(ctx context.Context, emojiName string, hints ...LayeredStoreHint) *LayeredStoreSupplierResult + ReactionPermanentDeleteBatch(ctx context.Context, endTime int64, limit int64, hints ...LayeredStoreHint) *LayeredStoreSupplierResult } diff --git a/store/local_cache_supplier_reactions.go b/store/local_cache_supplier_reactions.go index a67cff2e4..be32ab77e 100644 --- a/store/local_cache_supplier_reactions.go +++ b/store/local_cache_supplier_reactions.go @@ -45,3 +45,9 @@ func (s *LocalCacheSupplier) ReactionDeleteAllWithEmojiName(ctx context.Context, s.doClearCacheCluster(s.reactionCache) return s.Next().ReactionDeleteAllWithEmojiName(ctx, emojiName, hints...) } + +func (s *LocalCacheSupplier) ReactionPermanentDeleteBatch(ctx context.Context, endTime int64, limit int64, hints ...LayeredStoreHint) *LayeredStoreSupplierResult { + // Don't bother to clear the cache as the posts will be gone anyway and the reactions being deleted will + // expire from the cache in due course. + return s.Next().ReactionPermanentDeleteBatch(ctx, endTime, limit) +} diff --git a/store/redis_supplier.go b/store/redis_supplier.go index 195d2c496..167bafd6f 100644 --- a/store/redis_supplier.go +++ b/store/redis_supplier.go @@ -131,3 +131,8 @@ func (s *RedisSupplier) ReactionDeleteAllWithEmojiName(ctx context.Context, emoj // Ignoring this. It's probably OK to have the emoji slowly expire from Redis. return s.Next().ReactionDeleteAllWithEmojiName(ctx, emojiName, hints...) } + +func (s *RedisSupplier) ReactionPermanentDeleteBatch(ctx context.Context, endTime int64, limit int64, hints ...LayeredStoreHint) *LayeredStoreSupplierResult { + // Ignoring this. It's probably OK to have the emoji slowly expire from Redis. + return s.Next().ReactionPermanentDeleteBatch(ctx, endTime, limit, hints...) +} diff --git a/store/sql_audit_store.go b/store/sql_audit_store.go index d1ba65be1..1eb4e4819 100644 --- a/store/sql_audit_store.go +++ b/store/sql_audit_store.go @@ -4,7 +4,10 @@ package store import ( + "net/http" + "github.com/mattermost/mattermost-server/model" + "github.com/mattermost/mattermost-server/utils" ) type SqlAuditStore struct { @@ -109,3 +112,36 @@ func (s SqlAuditStore) PermanentDeleteByUser(userId string) StoreChannel { return storeChannel } + +func (s SqlAuditStore) PermanentDeleteBatch(endTime int64, limit int64) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + var query string + if *utils.Cfg.SqlSettings.DriverName == "postgres" { + query = "DELETE from Audits WHERE Id = any (array (SELECT Id FROM Audits WHERE CreateAt < :EndTime LIMIT :Limit))" + } else { + query = "DELETE from Audits WHERE CreateAt < :EndTime LIMIT :Limit" + } + + sqlResult, err := s.GetMaster().Exec(query, map[string]interface{}{"EndTime": endTime, "Limit": limit}) + if err != nil { + result.Err = model.NewAppError("SqlAuditStore.PermanentDeleteBatch", "store.sql_audit.permanent_delete_batch.app_error", nil, ""+err.Error(), http.StatusInternalServerError) + } else { + rowsAffected, err1 := sqlResult.RowsAffected() + if err1 != nil { + result.Err = model.NewAppError("SqlAuditStore.PermanentDeleteBatch", "store.sql_audit.permanent_delete_batch.app_error", nil, ""+err.Error(), http.StatusInternalServerError) + result.Data = int64(0) + } else { + result.Data = rowsAffected + } + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} diff --git a/store/sql_audit_store_test.go b/store/sql_audit_store_test.go index 2e83bf563..8db5b5506 100644 --- a/store/sql_audit_store_test.go +++ b/store/sql_audit_store_test.go @@ -58,3 +58,32 @@ func TestSqlAuditStore(t *testing.T) { t.Fatal(r2.Err) } } + +func TestAuditStorePermanentDeleteBatch(t *testing.T) { + Setup() + + a1 := &model.Audit{UserId: model.NewId(), IpAddress: "ipaddress", Action: "Action"} + Must(store.Audit().Save(a1)) + time.Sleep(10 * time.Millisecond) + a2 := &model.Audit{UserId: a1.UserId, IpAddress: "ipaddress", Action: "Action"} + Must(store.Audit().Save(a2)) + time.Sleep(10 * time.Millisecond) + cutoff := model.GetMillis() + time.Sleep(10 * time.Millisecond) + a3 := &model.Audit{UserId: a1.UserId, IpAddress: "ipaddress", Action: "Action"} + Must(store.Audit().Save(a3)) + + if r := <-store.Audit().Get(a1.UserId, 0, 100); len(r.Data.(model.Audits)) != 3 { + t.Fatal("Expected 3 audits. Got ", len(r.Data.(model.Audits))) + } + + Must(store.Audit().PermanentDeleteBatch(cutoff, 1000000)) + + if r := <-store.Audit().Get(a1.UserId, 0, 100); len(r.Data.(model.Audits)) != 1 { + t.Fatal("Expected 1 audit. Got ", len(r.Data.(model.Audits))) + } + + if r2 := <-store.Audit().PermanentDeleteByUser(a1.UserId); r2.Err != nil { + t.Fatal(r2.Err) + } +} diff --git a/store/sql_file_info_store.go b/store/sql_file_info_store.go index eab83992f..4cd574b13 100644 --- a/store/sql_file_info_store.go +++ b/store/sql_file_info_store.go @@ -281,3 +281,36 @@ func (fs SqlFileInfoStore) PermanentDelete(fileId string) StoreChannel { return storeChannel } + +func (s SqlFileInfoStore) PermanentDeleteBatch(endTime int64, limit int64) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + var query string + if *utils.Cfg.SqlSettings.DriverName == "postgres" { + query = "DELETE from FileInfo WHERE Id = any (array (SELECT Id FROM FileInfo WHERE CreateAt < :EndTime LIMIT :Limit))" + } else { + query = "DELETE from FileInfo WHERE CreateAt < :EndTime LIMIT :Limit" + } + + sqlResult, err := s.GetMaster().Exec(query, map[string]interface{}{"EndTime": endTime, "Limit": limit}) + if err != nil { + result.Err = model.NewAppError("SqlFileInfoStore.PermanentDeleteBatch", "store.sql_file_info.permanent_delete_batch.app_error", nil, ""+err.Error(), http.StatusInternalServerError) + } else { + rowsAffected, err1 := sqlResult.RowsAffected() + if err1 != nil { + result.Err = model.NewAppError("SqlFileInfoStore.PermanentDeleteBatch", "store.sql_file_info.permanent_delete_batch.app_error", nil, ""+err.Error(), http.StatusInternalServerError) + result.Data = int64(0) + } else { + result.Data = rowsAffected + } + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} diff --git a/store/sql_file_info_store_test.go b/store/sql_file_info_store_test.go index b62083136..c08bed7d4 100644 --- a/store/sql_file_info_store_test.go +++ b/store/sql_file_info_store_test.go @@ -256,3 +256,44 @@ func TestFileInfoPermanentDelete(t *testing.T) { t.Fatal(result.Err) } } + +func TestFileInfoPermanentDeleteBatch(t *testing.T) { + Setup() + + postId := model.NewId() + + Must(store.FileInfo().Save(&model.FileInfo{ + PostId: postId, + CreatorId: model.NewId(), + Path: "file.txt", + CreateAt: 1000, + })) + + Must(store.FileInfo().Save(&model.FileInfo{ + PostId: postId, + CreatorId: model.NewId(), + Path: "file.txt", + CreateAt: 1200, + })) + + Must(store.FileInfo().Save(&model.FileInfo{ + PostId: postId, + CreatorId: model.NewId(), + Path: "file.txt", + CreateAt: 2000, + })) + + if result := <-store.FileInfo().GetForPost(postId, true, false); result.Err != nil { + t.Fatal(result.Err) + } else if len(result.Data.([]*model.FileInfo)) != 3 { + t.Fatal("Expected 3 fileInfos") + } + + Must(store.FileInfo().PermanentDeleteBatch(1500, 1000)) + + if result := <-store.FileInfo().GetForPost(postId, true, false); result.Err != nil { + t.Fatal(result.Err) + } else if len(result.Data.([]*model.FileInfo)) != 1 { + t.Fatal("Expected 3 fileInfos") + } +} diff --git a/store/sql_post_store.go b/store/sql_post_store.go index 2aa862218..b300f9a59 100644 --- a/store/sql_post_store.go +++ b/store/sql_post_store.go @@ -1356,3 +1356,36 @@ func (s SqlPostStore) GetPostsBatchForIndexing(startTime int64, limit int) Store return storeChannel } + +func (s SqlPostStore) PermanentDeleteBatch(endTime int64, limit int64) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + var query string + if *utils.Cfg.SqlSettings.DriverName == "postgres" { + query = "DELETE from Posts WHERE Id = any (array (SELECT Id FROM Posts WHERE CreateAt < :EndTime LIMIT :Limit))" + } else { + query = "DELETE from Posts WHERE CreateAt < :EndTime LIMIT :Limit" + } + + sqlResult, err := s.GetMaster().Exec(query, map[string]interface{}{"EndTime": endTime, "Limit": limit}) + if err != nil { + result.Err = model.NewAppError("SqlPostStore.PermanentDeleteBatch", "store.sql_post.permanent_delete_batch.app_error", nil, ""+err.Error(), http.StatusInternalServerError) + } else { + rowsAffected, err1 := sqlResult.RowsAffected() + if err1 != nil { + result.Err = model.NewAppError("SqlPostStore.PermanentDeleteBatch", "store.sql_post.permanent_delete_batch.app_error", nil, ""+err.Error(), http.StatusInternalServerError) + result.Data = int64(0) + } else { + result.Data = rowsAffected + } + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} diff --git a/store/sql_post_store_test.go b/store/sql_post_store_test.go index 79892b5f5..304fb9f8a 100644 --- a/store/sql_post_store_test.go +++ b/store/sql_post_store_test.go @@ -1661,3 +1661,42 @@ func TestPostStoreGetPostsBatchForIndexing(t *testing.T) { } } } + +func TestPostStorePermanentDeleteBatch(t *testing.T) { + Setup() + + o1 := &model.Post{} + o1.ChannelId = model.NewId() + o1.UserId = model.NewId() + o1.Message = "zz" + model.NewId() + "AAAAAAAAAAA" + o1.CreateAt = 1000 + o1 = (<-store.Post().Save(o1)).Data.(*model.Post) + + o2 := &model.Post{} + o2.ChannelId = model.NewId() + o2.UserId = model.NewId() + o2.Message = "zz" + model.NewId() + "AAAAAAAAAAA" + o2.CreateAt = 1000 + o2 = (<-store.Post().Save(o2)).Data.(*model.Post) + + o3 := &model.Post{} + o3.ChannelId = model.NewId() + o3.UserId = model.NewId() + o3.Message = "zz" + model.NewId() + "AAAAAAAAAAA" + o3.CreateAt = 100000 + o3 = (<-store.Post().Save(o3)).Data.(*model.Post) + + Must(store.Post().PermanentDeleteBatch(2000, 1000)) + + if p := <-store.Post().Get(o1.Id); p.Err == nil { + t.Fatalf("Should have not found post 1 after purge") + } + + if p := <-store.Post().Get(o2.Id); p.Err == nil { + t.Fatalf("Should have not found post 2 after purge") + } + + if p := <-store.Post().Get(o3.Id); p.Err != nil { + t.Fatalf("Should have found post 3 after purge") + } +} diff --git a/store/sql_reaction_store_test.go b/store/sql_reaction_store_test.go index ac2590ea4..ebc09dc9b 100644 --- a/store/sql_reaction_store_test.go +++ b/store/sql_reaction_store_test.go @@ -294,3 +294,58 @@ func TestReactionDeleteAllWithEmojiName(t *testing.T) { t.Fatal("post shouldn't have reactions any more") } } + +func TestReactionStorePermanentDeleteBatch(t *testing.T) { + Setup() + + post := Must(store.Post().Save(&model.Post{ + ChannelId: model.NewId(), + UserId: model.NewId(), + })).(*model.Post) + + reactions := []*model.Reaction{ + { + UserId: model.NewId(), + PostId: post.Id, + EmojiName: "sad", + CreateAt: 1000, + }, + { + UserId: model.NewId(), + PostId: post.Id, + EmojiName: "sad", + CreateAt: 1500, + }, + { + UserId: model.NewId(), + PostId: post.Id, + EmojiName: "sad", + CreateAt: 2000, + }, + { + UserId: model.NewId(), + PostId: post.Id, + EmojiName: "sad", + CreateAt: 2000, + }, + } + + // Need to hang on to a reaction to delete later in order to clear the cache, as "allowFromCache" isn't honoured any more. + var lastReaction *model.Reaction + for _, reaction := range reactions { + lastReaction = Must(store.Reaction().Save(reaction)).(*model.Reaction) + } + + if returned := Must(store.Reaction().GetForPost(post.Id, false)).([]*model.Reaction); len(returned) != 4 { + t.Fatal("expected 4 reactions") + } + + Must(store.Reaction().PermanentDeleteBatch(1800, 1000)) + + // This is to force a clear of the cache. + Must(store.Reaction().Delete(lastReaction)) + + if returned := Must(store.Reaction().GetForPost(post.Id, false)).([]*model.Reaction); len(returned) != 1 { + t.Fatalf("expected 1 reaction. Got: %v", len(returned)) + } +} diff --git a/store/sql_supplier_reactions.go b/store/sql_supplier_reactions.go index 2293a2f88..94a980455 100644 --- a/store/sql_supplier_reactions.go +++ b/store/sql_supplier_reactions.go @@ -5,8 +5,10 @@ package store import ( "context" + "net/http" l4g "github.com/alecthomas/log4go" + "github.com/mattermost/gorp" "github.com/mattermost/mattermost-server/model" "github.com/mattermost/mattermost-server/utils" @@ -140,6 +142,32 @@ func (s *SqlSupplier) ReactionDeleteAllWithEmojiName(ctx context.Context, emojiN return result } +func (s *SqlSupplier) ReactionPermanentDeleteBatch(ctx context.Context, endTime int64, limit int64, hints ...LayeredStoreHint) *LayeredStoreSupplierResult { + result := NewSupplierResult() + + var query string + if *utils.Cfg.SqlSettings.DriverName == "postgres" { + query = "DELETE from Reactions WHERE Id = any (array (SELECT Id FROM Reactions WHERE CreateAt < :EndTime LIMIT :Limit))" + } else { + query = "DELETE from Reactions WHERE CreateAt < :EndTime LIMIT :Limit" + } + + sqlResult, err := s.GetMaster().Exec(query, map[string]interface{}{"EndTime": endTime, "Limit": limit}) + if err != nil { + result.Err = model.NewAppError("SqlReactionStore.PermanentDeleteBatch", "store.sql_reaction.permanent_delete_batch.app_error", nil, ""+err.Error(), http.StatusInternalServerError) + } else { + rowsAffected, err1 := sqlResult.RowsAffected() + if err1 != nil { + result.Err = model.NewAppError("SqlReactionStore.PermanentDeleteBatch", "store.sql_reaction.permanent_delete_batch.app_error", nil, ""+err.Error(), http.StatusInternalServerError) + result.Data = int64(0) + } else { + result.Data = rowsAffected + } + } + + return result +} + func saveReactionAndUpdatePost(transaction *gorp.Transaction, reaction *model.Reaction) error { if err := transaction.Insert(reaction); err != nil { return err diff --git a/store/store.go b/store/store.go index 49d395432..f7962fa4f 100644 --- a/store/store.go +++ b/store/store.go @@ -171,6 +171,7 @@ type PostStore interface { Overwrite(post *model.Post) StoreChannel GetPostsByIds(postIds []string) StoreChannel GetPostsBatchForIndexing(startTime int64, limit int) StoreChannel + PermanentDeleteBatch(endTime int64, limit int64) StoreChannel } type UserStore interface { @@ -242,6 +243,7 @@ type AuditStore interface { Save(audit *model.Audit) StoreChannel Get(user_id string, offset int, limit int) StoreChannel PermanentDeleteByUser(userId string) StoreChannel + PermanentDeleteBatch(endTime int64, limit int64) StoreChannel } type ClusterDiscoveryStore interface { @@ -387,6 +389,7 @@ type FileInfoStore interface { AttachToPost(fileId string, postId string) StoreChannel DeleteForPost(postId string) StoreChannel PermanentDelete(fileId string) StoreChannel + PermanentDeleteBatch(endTime int64, limit int64) StoreChannel } type ReactionStore interface { @@ -394,6 +397,7 @@ type ReactionStore interface { Delete(reaction *model.Reaction) StoreChannel GetForPost(postId string, allowFromCache bool) StoreChannel DeleteAllWithEmojiName(emojiName string) StoreChannel + PermanentDeleteBatch(endTime int64, limit int64) StoreChannel } type JobStore interface { |