diff options
author | Joram Wilander <jwawilander@gmail.com> | 2016-07-12 09:36:27 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-07-12 09:36:27 -0400 |
commit | ad343a0f4ad175053f7d0da12a0587bcbb396d1c (patch) | |
tree | 8e1be00202a1d3a037ec75879538eb0ba1f25c01 /api | |
parent | 06eacf30b97aacf6544552448635b7f078d2c90b (diff) | |
download | chat-ad343a0f4ad175053f7d0da12a0587bcbb396d1c.tar.gz chat-ad343a0f4ad175053f7d0da12a0587bcbb396d1c.tar.bz2 chat-ad343a0f4ad175053f7d0da12a0587bcbb396d1c.zip |
Added infrastructure for basic WebSocket API (#3432)
Diffstat (limited to 'api')
-rw-r--r-- | api/api.go | 4 | ||||
-rw-r--r-- | api/apitestlib.go | 4 | ||||
-rw-r--r-- | api/channel.go | 10 | ||||
-rw-r--r-- | api/command_expand_collapse.go | 2 | ||||
-rw-r--r-- | api/general.go | 6 | ||||
-rw-r--r-- | api/post.go | 12 | ||||
-rw-r--r-- | api/team.go | 4 | ||||
-rw-r--r-- | api/user.go | 23 | ||||
-rw-r--r-- | api/user_test.go | 82 | ||||
-rw-r--r-- | api/web_conn.go | 35 | ||||
-rw-r--r-- | api/web_hub.go | 21 | ||||
-rw-r--r-- | api/web_socket_test.go | 103 | ||||
-rw-r--r-- | api/webhook_test.go | 10 | ||||
-rw-r--r-- | api/websocket.go (renamed from api/web_socket.go) | 2 | ||||
-rw-r--r-- | api/websocket_handler.go | 42 | ||||
-rw-r--r-- | api/websocket_router.go | 59 | ||||
-rw-r--r-- | api/websocket_test.go | 144 |
17 files changed, 408 insertions, 155 deletions
diff --git a/api/api.go b/api/api.go index 37172260b..4cc11168c 100644 --- a/api/api.go +++ b/api/api.go @@ -48,6 +48,8 @@ type Routes struct { Public *mux.Router // 'api/v3/public' Emoji *mux.Router // 'api/v3/emoji' + + WebSocket *WebSocketRouter // websocket api } var BaseRoutes *Routes @@ -76,6 +78,8 @@ func InitApi() { BaseRoutes.Public = BaseRoutes.ApiRoot.PathPrefix("/public").Subrouter() BaseRoutes.Emoji = BaseRoutes.ApiRoot.PathPrefix("/emoji").Subrouter() + BaseRoutes.WebSocket = NewWebSocketRouter() + InitUser() InitTeam() InitChannel() diff --git a/api/apitestlib.go b/api/apitestlib.go index c6796a56c..ea0de4716 100644 --- a/api/apitestlib.go +++ b/api/apitestlib.go @@ -103,6 +103,10 @@ func (me *TestHelper) CreateClient() *model.Client { return model.NewClient("http://localhost" + utils.Cfg.ServiceSettings.ListenAddress) } +func (me *TestHelper) CreateWebSocketClient() (*model.WebSocketClient, *model.AppError) { + return model.NewWebSocketClient("ws://localhost"+utils.Cfg.ServiceSettings.ListenAddress, me.BasicClient.AuthToken) +} + func (me *TestHelper) CreateTeam(client *model.Client) *model.Team { id := model.NewId() team := &model.Team{ diff --git a/api/channel.go b/api/channel.go index 2e4eb2bb5..2a5b6f8b0 100644 --- a/api/channel.go +++ b/api/channel.go @@ -158,7 +158,7 @@ func CreateDirectChannel(userId string, otherUserId string) (*model.Channel, *mo return nil, result.Err } } else { - message := model.NewMessage("", channel.Id, userId, model.ACTION_DIRECT_ADDED) + message := model.NewWebSocketEvent("", channel.Id, userId, model.WEBSOCKET_EVENT_DIRECT_ADDED) message.Add("teammate_id", otherUserId) go Publish(message) @@ -587,7 +587,7 @@ func AddUserToChannel(user *model.User, channel *model.Channel) (*model.ChannelM go func() { InvalidateCacheForUser(user.Id) - message := model.NewMessage(channel.TeamId, channel.Id, user.Id, model.ACTION_USER_ADDED) + message := model.NewWebSocketEvent(channel.TeamId, channel.Id, user.Id, model.WEBSOCKET_EVENT_USER_ADDED) go Publish(message) }() @@ -772,7 +772,7 @@ func deleteChannel(c *Context, w http.ResponseWriter, r *http.Request) { go func() { InvalidateCacheForChannel(channel.Id) - message := model.NewMessage(c.TeamId, channel.Id, c.Session.UserId, model.ACTION_CHANNEL_DELETED) + message := model.NewWebSocketEvent(c.TeamId, channel.Id, c.Session.UserId, model.WEBSOCKET_EVENT_CHANNEL_DELETED) go Publish(message) post := &model.Post{ @@ -806,7 +806,7 @@ func updateLastViewedAt(c *Context, w http.ResponseWriter, r *http.Request) { Srv.Store.Preference().Save(&model.Preferences{preference}) - message := model.NewMessage(c.TeamId, id, c.Session.UserId, model.ACTION_CHANNEL_VIEWED) + message := model.NewWebSocketEvent(c.TeamId, id, c.Session.UserId, model.WEBSOCKET_EVENT_CHANNEL_VIEWED) message.Add("channel_id", id) go Publish(message) @@ -1032,7 +1032,7 @@ func RemoveUserFromChannel(userIdToRemove string, removerUserId string, channel InvalidateCacheForUser(userIdToRemove) - message := model.NewMessage(channel.TeamId, channel.Id, userIdToRemove, model.ACTION_USER_REMOVED) + message := model.NewWebSocketEvent(channel.TeamId, channel.Id, userIdToRemove, model.WEBSOCKET_EVENT_USER_REMOVED) message.Add("remover_id", removerUserId) go Publish(message) diff --git a/api/command_expand_collapse.go b/api/command_expand_collapse.go index 6015e8bc1..c56845a9e 100644 --- a/api/command_expand_collapse.go +++ b/api/command_expand_collapse.go @@ -69,7 +69,7 @@ func setCollapsePreference(c *Context, value string) *model.CommandResponse { return &model.CommandResponse{Text: c.T("api.command_expand_collapse.fail.app_error"), ResponseType: model.COMMAND_RESPONSE_TYPE_EPHEMERAL} } - socketMessage := model.NewMessage("", "", c.Session.UserId, model.ACTION_PREFERENCE_CHANGED) + socketMessage := model.NewWebSocketEvent("", "", c.Session.UserId, model.WEBSOCKET_EVENT_PREFERENCE_CHANGED) socketMessage.Add("preference", pref.ToJson()) go Publish(socketMessage) diff --git a/api/general.go b/api/general.go index fdf884d6b..4124d2e95 100644 --- a/api/general.go +++ b/api/general.go @@ -21,6 +21,7 @@ func InitGeneral() { BaseRoutes.General.Handle("/log_client", ApiAppHandler(logClient)).Methods("POST") BaseRoutes.General.Handle("/ping", ApiAppHandler(ping)).Methods("GET") + BaseRoutes.WebSocket.Handle("ping", ApiWebSocketHandler(webSocketPing)) } func getClientConfig(c *Context, w http.ResponseWriter, r *http.Request) { @@ -71,3 +72,8 @@ func ping(c *Context, w http.ResponseWriter, r *http.Request) { m["node_id"] = "" w.Write([]byte(model.MapToJson(m))) } + +func webSocketPing(req *model.WebSocketRequest, responseData map[string]interface{}) *model.AppError { + responseData["text"] = "pong" + return nil +} diff --git a/api/post.go b/api/post.go index 20363c80e..60ac11a2b 100644 --- a/api/post.go +++ b/api/post.go @@ -329,7 +329,7 @@ func makeDirectChannelVisible(teamId string, channelId string) { if saveResult := <-Srv.Store.Preference().Save(&model.Preferences{*preference}); saveResult.Err != nil { l4g.Error(utils.T("api.post.make_direct_channel_visible.save_pref.error"), member.UserId, otherUserId, saveResult.Err.Message) } else { - message := model.NewMessage(teamId, channelId, member.UserId, model.ACTION_PREFERENCE_CHANGED) + message := model.NewWebSocketEvent(teamId, channelId, member.UserId, model.WEBSOCKET_EVENT_PREFERENCE_CHANGED) message.Add("preference", preference.ToJson()) go Publish(message) @@ -344,7 +344,7 @@ func makeDirectChannelVisible(teamId string, channelId string) { if updateResult := <-Srv.Store.Preference().Save(&model.Preferences{preference}); updateResult.Err != nil { l4g.Error(utils.T("api.post.make_direct_channel_visible.update_pref.error"), member.UserId, otherUserId, updateResult.Err.Message) } else { - message := model.NewMessage(teamId, channelId, member.UserId, model.ACTION_PREFERENCE_CHANGED) + message := model.NewWebSocketEvent(teamId, channelId, member.UserId, model.WEBSOCKET_EVENT_PREFERENCE_CHANGED) message.Add("preference", preference.ToJson()) go Publish(message) @@ -627,7 +627,7 @@ func sendNotifications(c *Context, post *model.Post, team *model.Team, channel * } } - message := model.NewMessage(c.TeamId, post.ChannelId, post.UserId, model.ACTION_POSTED) + message := model.NewWebSocketEvent(c.TeamId, post.ChannelId, post.UserId, model.WEBSOCKET_EVENT_POSTED) message.Add("post", post.ToJson()) message.Add("channel_type", channel.Type) message.Add("channel_display_name", channel.DisplayName) @@ -905,7 +905,7 @@ func SendEphemeralPost(teamId, userId string, post *model.Post) { post.Filenames = []string{} } - message := model.NewMessage(teamId, post.ChannelId, userId, model.ACTION_EPHEMERAL_MESSAGE) + message := model.NewWebSocketEvent(teamId, post.ChannelId, userId, model.WEBSOCKET_EVENT_EPHEMERAL_MESSAGE) message.Add("post", post.ToJson()) go Publish(message) @@ -967,7 +967,7 @@ func updatePost(c *Context, w http.ResponseWriter, r *http.Request) { } else { rpost := result.Data.(*model.Post) - message := model.NewMessage(c.TeamId, rpost.ChannelId, c.Session.UserId, model.ACTION_POST_EDITED) + message := model.NewWebSocketEvent(c.TeamId, rpost.ChannelId, c.Session.UserId, model.WEBSOCKET_EVENT_POST_EDITED) message.Add("post", rpost.ToJson()) go Publish(message) @@ -1231,7 +1231,7 @@ func deletePost(c *Context, w http.ResponseWriter, r *http.Request) { return } - message := model.NewMessage(c.TeamId, post.ChannelId, c.Session.UserId, model.ACTION_POST_DELETED) + message := model.NewWebSocketEvent(c.TeamId, post.ChannelId, c.Session.UserId, model.WEBSOCKET_EVENT_POST_DELETED) message.Add("post", post.ToJson()) go Publish(message) diff --git a/api/team.go b/api/team.go index 7f8a421ce..702ea96d1 100644 --- a/api/team.go +++ b/api/team.go @@ -298,7 +298,7 @@ func JoinUserToTeam(team *model.Team, user *model.User) *model.AppError { InvalidateCacheForUser(user.Id) // This message goes to every channel, so the channelId is irrelevant - go Publish(model.NewMessage("", "", user.Id, model.ACTION_NEW_USER)) + go Publish(model.NewWebSocketEvent("", "", user.Id, model.WEBSOCKET_EVENT_NEW_USER)) return nil } @@ -348,7 +348,7 @@ func LeaveTeam(team *model.Team, user *model.User) *model.AppError { RemoveAllSessionsForUserId(user.Id) InvalidateCacheForUser(user.Id) - go Publish(model.NewMessage(team.Id, "", user.Id, model.ACTION_LEAVE_TEAM)) + go Publish(model.NewWebSocketEvent(team.Id, "", user.Id, model.WEBSOCKET_EVENT_LEAVE_TEAM)) return nil } diff --git a/api/user.go b/api/user.go index 84906eece..3666bfd7a 100644 --- a/api/user.go +++ b/api/user.go @@ -75,6 +75,8 @@ func InitUser() { BaseRoutes.Root.Handle("/login/sso/saml", AppHandlerIndependent(loginWithSaml)).Methods("GET") BaseRoutes.Root.Handle("/login/sso/saml", AppHandlerIndependent(completeSaml)).Methods("POST") + + BaseRoutes.WebSocket.Handle("user_typing", ApiWebSocketHandler(userTyping)) } func createUser(c *Context, w http.ResponseWriter, r *http.Request) { @@ -269,7 +271,7 @@ func CreateUser(user *model.User) (*model.User, *model.AppError) { ruser.Sanitize(map[string]bool{}) // This message goes to every channel, so the channelId is irrelevant - go Publish(model.NewMessage("", "", ruser.Id, model.ACTION_NEW_USER)) + go Publish(model.NewWebSocketEvent("", "", ruser.Id, model.WEBSOCKET_EVENT_NEW_USER)) return ruser, nil } @@ -2540,3 +2542,22 @@ func completeSaml(c *Context, w http.ResponseWriter, r *http.Request) { http.Redirect(w, r, GetProtocol(r)+"://"+r.Host, http.StatusFound) } } + +func userTyping(req *model.WebSocketRequest, responseData map[string]interface{}) *model.AppError { + var ok bool + var channelId string + if channelId, ok = req.Data["channel_id"].(string); !ok || len(channelId) != 26 { + return NewInvalidWebSocketParamError(req.Action, "channel_id") + } + + var parentId string + if parentId, ok = req.Data["parent_id"].(string); !ok { + parentId = "" + } + + event := model.NewWebSocketEvent("", channelId, req.Session.UserId, model.WEBSOCKET_EVENT_TYPING) + event.Add("parent_id", parentId) + go Publish(event) + + return nil +} diff --git a/api/user_test.go b/api/user_test.go index 7dabc8e9b..12390135e 100644 --- a/api/user_test.go +++ b/api/user_test.go @@ -1719,3 +1719,85 @@ func TestCheckMfa(t *testing.T) { // need to add more test cases when enterprise bits can be loaded into tests } + +func TestUserTyping(t *testing.T) { + th := Setup().InitBasic() + Client := th.BasicClient + WebSocketClient, err := th.CreateWebSocketClient() + if err != nil { + t.Fatal(err) + } + defer WebSocketClient.Close() + WebSocketClient.Listen() + + WebSocketClient.UserTyping("", "") + time.Sleep(300 * time.Millisecond) + if resp := <-WebSocketClient.ResponseChannel; resp.Error.Id != "api.websocket_handler.invalid_param.app_error" { + t.Fatal("should have been invalid param response") + } + + th.LoginBasic2() + Client.Must(Client.JoinChannel(th.BasicChannel.Id)) + + WebSocketClient2, err2 := th.CreateWebSocketClient() + if err2 != nil { + t.Fatal(err2) + } + defer WebSocketClient2.Close() + WebSocketClient2.Listen() + + WebSocketClient.UserTyping(th.BasicChannel.Id, "") + + time.Sleep(300 * time.Millisecond) + + stop := make(chan bool) + eventHit := false + + go func() { + for { + select { + case resp := <-WebSocketClient2.EventChannel: + if resp.Event == model.WEBSOCKET_EVENT_TYPING && resp.UserId == th.BasicUser.Id { + eventHit = true + } + case <-stop: + return + } + } + }() + + time.Sleep(300 * time.Millisecond) + + stop <- true + + if !eventHit { + t.Fatal("did not receive typing event") + } + + WebSocketClient.UserTyping(th.BasicChannel.Id, "someparentid") + + time.Sleep(300 * time.Millisecond) + + eventHit = false + + go func() { + for { + select { + case resp := <-WebSocketClient2.EventChannel: + if resp.Event == model.WEBSOCKET_EVENT_TYPING && resp.Data["parent_id"] == "someparentid" { + eventHit = true + } + case <-stop: + return + } + } + }() + + time.Sleep(300 * time.Millisecond) + + stop <- true + + if !eventHit { + t.Fatal("did not receive typing event") + } +} diff --git a/api/web_conn.go b/api/web_conn.go index 971cc8cb8..3f4414c5e 100644 --- a/api/web_conn.go +++ b/api/web_conn.go @@ -6,10 +6,12 @@ package api import ( "time" - l4g "github.com/alecthomas/log4go" - "github.com/gorilla/websocket" "github.com/mattermost/platform/model" "github.com/mattermost/platform/utils" + + l4g "github.com/alecthomas/log4go" + "github.com/gorilla/websocket" + goi18n "github.com/nicksnyder/go-i18n/i18n" ) const ( @@ -22,32 +24,36 @@ const ( type WebConn struct { WebSocket *websocket.Conn - Send chan *model.Message + Send chan model.WebSocketMessage SessionToken string UserId string + T goi18n.TranslateFunc + Locale string hasPermissionsToChannel map[string]bool hasPermissionsToTeam map[string]bool } -func NewWebConn(ws *websocket.Conn, userId string, sessionToken string) *WebConn { +func NewWebConn(c *Context, ws *websocket.Conn) *WebConn { go func() { - achan := Srv.Store.User().UpdateUserAndSessionActivity(userId, sessionToken, model.GetMillis()) - pchan := Srv.Store.User().UpdateLastPingAt(userId, model.GetMillis()) + achan := Srv.Store.User().UpdateUserAndSessionActivity(c.Session.UserId, c.Session.Token, model.GetMillis()) + pchan := Srv.Store.User().UpdateLastPingAt(c.Session.UserId, model.GetMillis()) if result := <-achan; result.Err != nil { - l4g.Error(utils.T("api.web_conn.new_web_conn.last_activity.error"), userId, sessionToken, result.Err) + l4g.Error(utils.T("api.web_conn.new_web_conn.last_activity.error"), c.Session.UserId, c.Session.Token, result.Err) } if result := <-pchan; result.Err != nil { - l4g.Error(utils.T("api.web_conn.new_web_conn.last_ping.error"), userId, result.Err) + l4g.Error(utils.T("api.web_conn.new_web_conn.last_ping.error"), c.Session.UserId, result.Err) } }() return &WebConn{ - Send: make(chan *model.Message, 64), + Send: make(chan model.WebSocketMessage, 64), WebSocket: ws, - UserId: userId, - SessionToken: sessionToken, + UserId: c.Session.UserId, + SessionToken: c.Session.Token, + T: c.T, + Locale: c.Locale, hasPermissionsToChannel: make(map[string]bool), hasPermissionsToTeam: make(map[string]bool), } @@ -73,12 +79,11 @@ func (c *WebConn) readPump() { }) for { - var msg model.Message - if err := c.WebSocket.ReadJSON(&msg); err != nil { + var req model.WebSocketRequest + if err := c.WebSocket.ReadJSON(&req); err != nil { return } else { - msg.UserId = c.UserId - go Publish(&msg) + BaseRoutes.WebSocket.ServeWebSocket(c, &req) } } } diff --git a/api/web_hub.go b/api/web_hub.go index 133bb162a..db0f31bb7 100644 --- a/api/web_hub.go +++ b/api/web_hub.go @@ -13,7 +13,7 @@ type Hub struct { connections map[*WebConn]bool register chan *WebConn unregister chan *WebConn - broadcast chan *model.Message + broadcast chan *model.WebSocketEvent stop chan string invalidateUser chan string invalidateChannel chan string @@ -23,13 +23,13 @@ var hub = &Hub{ register: make(chan *WebConn), unregister: make(chan *WebConn), connections: make(map[*WebConn]bool), - broadcast: make(chan *model.Message), + broadcast: make(chan *model.WebSocketEvent), stop: make(chan string), invalidateUser: make(chan string), invalidateChannel: make(chan string), } -func Publish(message *model.Message) { +func Publish(message *model.WebSocketEvent) { hub.Broadcast(message) } @@ -49,7 +49,7 @@ func (h *Hub) Unregister(webConn *WebConn) { h.unregister <- webConn } -func (h *Hub) Broadcast(message *model.Message) { +func (h *Hub) Broadcast(message *model.WebSocketEvent) { if message != nil { h.broadcast <- message } @@ -108,11 +108,10 @@ func (h *Hub) Start() { }() } -func shouldSendEvent(webCon *WebConn, msg *model.Message) bool { - +func shouldSendEvent(webCon *WebConn, msg *model.WebSocketEvent) bool { if webCon.UserId == msg.UserId { // Don't need to tell the user they are typing - if msg.Action == model.ACTION_TYPING { + if msg.Event == model.WEBSOCKET_EVENT_TYPING { return false } @@ -127,11 +126,11 @@ func shouldSendEvent(webCon *WebConn, msg *model.Message) bool { } } else { // Don't share a user's view or preference events with other users - if msg.Action == model.ACTION_CHANNEL_VIEWED { + if msg.Event == model.WEBSOCKET_EVENT_CHANNEL_VIEWED { return false - } else if msg.Action == model.ACTION_PREFERENCE_CHANGED { + } else if msg.Event == model.WEBSOCKET_EVENT_PREFERENCE_CHANGED { return false - } else if msg.Action == model.ACTION_EPHEMERAL_MESSAGE { + } else if msg.Event == model.WEBSOCKET_EVENT_EPHEMERAL_MESSAGE { // For now, ephemeral messages are sent directly to individual users return false } @@ -146,7 +145,7 @@ func shouldSendEvent(webCon *WebConn, msg *model.Message) bool { } // Only report events to users who are in the channel for the event execept deleted events - if len(msg.ChannelId) > 0 && msg.Action != model.ACTION_CHANNEL_DELETED { + if len(msg.ChannelId) > 0 && msg.Event != model.WEBSOCKET_EVENT_CHANNEL_DELETED { allowed := webCon.HasPermissionsToChannel(msg.ChannelId) if !allowed { diff --git a/api/web_socket_test.go b/api/web_socket_test.go deleted file mode 100644 index 7cb04e93e..000000000 --- a/api/web_socket_test.go +++ /dev/null @@ -1,103 +0,0 @@ -// Copyright (c) 2015 Mattermost, Inc. All Rights Reserved. -// See License.txt for license information. - -package api - -import ( - "github.com/gorilla/websocket" - "github.com/mattermost/platform/model" - "github.com/mattermost/platform/utils" - "net/http" - "testing" - "time" -) - -func TestSocket(t *testing.T) { - th := Setup().InitBasic() - Client := th.BasicClient - team := th.BasicTeam - channel1 := th.BasicChannel - channel2 := th.CreateChannel(Client, team) - Client.Must(Client.AddChannelMember(channel1.Id, th.BasicUser2.Id)) - - url := "ws://localhost" + utils.Cfg.ServiceSettings.ListenAddress + model.API_URL_SUFFIX + "/users/websocket" - - header1 := http.Header{} - header1.Set(model.HEADER_AUTH, "BEARER "+Client.AuthToken) - - c1, _, err := websocket.DefaultDialer.Dial(url, header1) - if err != nil { - t.Fatal(err) - } - - th.LoginBasic2() - - header2 := http.Header{} - header2.Set(model.HEADER_AUTH, "BEARER "+Client.AuthToken) - - c2, _, err := websocket.DefaultDialer.Dial(url, header2) - if err != nil { - t.Fatal(err) - } - - time.Sleep(300 * time.Millisecond) - - var rmsg model.Message - - // Test sending message without a channelId - m := model.NewMessage(team.Id, "", "", model.ACTION_TYPING) - m.Add("RootId", model.NewId()) - m.Add("ParentId", model.NewId()) - - c1.WriteJSON(m) - - if err := c2.ReadJSON(&rmsg); err != nil { - t.Fatal(err) - } - - t.Log(rmsg.ToJson()) - - if team.Id != rmsg.TeamId { - t.Fatal("Ids do not match") - } - - if m.Props["RootId"] != rmsg.Props["RootId"] { - t.Fatal("Ids do not match") - } - - // Test sending messsage to Channel you have access to - m = model.NewMessage(team.Id, channel1.Id, "", model.ACTION_TYPING) - m.Add("RootId", model.NewId()) - m.Add("ParentId", model.NewId()) - - c1.WriteJSON(m) - - if err := c2.ReadJSON(&rmsg); err != nil { - t.Fatal(err) - } - - if team.Id != rmsg.TeamId { - t.Fatal("Ids do not match") - } - - if m.Props["RootId"] != rmsg.Props["RootId"] { - t.Fatal("Ids do not match") - } - - // Test sending message to Channel you *do not* have access too - m = model.NewMessage("", channel2.Id, "", model.ACTION_TYPING) - m.Add("RootId", model.NewId()) - m.Add("ParentId", model.NewId()) - - c1.WriteJSON(m) - - go func() { - if err := c2.ReadJSON(&rmsg); err != nil { - t.Fatal(err) - } - - t.Fatal(err) - }() - - time.Sleep(2 * time.Second) -} diff --git a/api/webhook_test.go b/api/webhook_test.go index 95e4d92be..f2375fb19 100644 --- a/api/webhook_test.go +++ b/api/webhook_test.go @@ -8,7 +8,6 @@ import ( "github.com/mattermost/platform/model" "github.com/mattermost/platform/utils" "testing" - "time" ) func TestCreateIncomingHook(t *testing.T) { @@ -629,12 +628,3 @@ func TestIncomingWebhooks(t *testing.T) { t.Fatal("should have failed - webhooks turned off") } } - -func TestZZWebSocketTearDown(t *testing.T) { - // *IMPORTANT* - Kind of hacky - // This should be the last function in any test file - // that calls Setup() - // Should be in the last file too sorted by name - time.Sleep(2 * time.Second) - TearDown() -} diff --git a/api/web_socket.go b/api/websocket.go index 4c4a56c52..fe9fa0bf9 100644 --- a/api/web_socket.go +++ b/api/websocket.go @@ -33,7 +33,7 @@ func connect(c *Context, w http.ResponseWriter, r *http.Request) { return } - wc := NewWebConn(ws, c.Session.UserId, c.Session.Token) + wc := NewWebConn(c, ws) hub.Register(wc) go wc.writePump() wc.readPump() diff --git a/api/websocket_handler.go b/api/websocket_handler.go new file mode 100644 index 000000000..8abec6715 --- /dev/null +++ b/api/websocket_handler.go @@ -0,0 +1,42 @@ +// Copyright (c) 2016 Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package api + +import ( + l4g "github.com/alecthomas/log4go" + + "github.com/mattermost/platform/model" + "github.com/mattermost/platform/utils" +) + +func ApiWebSocketHandler(wh func(*model.WebSocketRequest, map[string]interface{}) *model.AppError) *webSocketHandler { + return &webSocketHandler{wh} +} + +type webSocketHandler struct { + handlerFunc func(*model.WebSocketRequest, map[string]interface{}) *model.AppError +} + +func (wh *webSocketHandler) ServeWebSocket(conn *WebConn, r *model.WebSocketRequest) { + l4g.Debug("/api/v3/users/websocket:%s", r.Action) + + r.Session = *GetSession(conn.SessionToken) + r.T = conn.T + r.Locale = conn.Locale + + data := make(map[string]interface{}) + + if err := wh.handlerFunc(r, data); err != nil { + l4g.Error(utils.T("api.web_socket_handler.log.error"), "/api/v3/users/websocket", r.Action, r.Seq, r.Session.UserId, err.SystemMessage(utils.T), err.DetailedError) + err.DetailedError = "" + conn.Send <- model.NewWebSocketError(r.Seq, err) + return + } + + conn.Send <- model.NewWebSocketResponse(model.STATUS_OK, r.Seq, data) +} + +func NewInvalidWebSocketParamError(action string, name string) *model.AppError { + return model.NewLocAppError("/api/v3/users/websocket:"+action, "api.websocket_handler.invalid_param.app_error", map[string]interface{}{"Name": name}, "") +} diff --git a/api/websocket_router.go b/api/websocket_router.go new file mode 100644 index 000000000..cd3ff4d1a --- /dev/null +++ b/api/websocket_router.go @@ -0,0 +1,59 @@ +// Copyright (c) 2016 Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package api + +import ( + l4g "github.com/alecthomas/log4go" + + "github.com/mattermost/platform/model" + "github.com/mattermost/platform/utils" +) + +type WebSocketRouter struct { + handlers map[string]*webSocketHandler +} + +func NewWebSocketRouter() *WebSocketRouter { + router := &WebSocketRouter{} + router.handlers = make(map[string]*webSocketHandler) + return router +} + +func (wr *WebSocketRouter) Handle(action string, handler *webSocketHandler) { + wr.handlers[action] = handler +} + +func (wr *WebSocketRouter) ServeWebSocket(conn *WebConn, r *model.WebSocketRequest) { + if r.Action == "" { + err := model.NewLocAppError("ServeWebSocket", "api.web_socket_router.no_action.app_error", nil, "") + wr.ReturnWebSocketError(conn, r, err) + return + } + + if r.Seq <= 0 { + err := model.NewLocAppError("ServeWebSocket", "api.web_socket_router.bad_seq.app_error", nil, "") + wr.ReturnWebSocketError(conn, r, err) + return + } + + var handler *webSocketHandler + if h, ok := wr.handlers[r.Action]; !ok { + err := model.NewLocAppError("ServeWebSocket", "api.web_socket_router.bad_action.app_error", nil, "") + wr.ReturnWebSocketError(conn, r, err) + return + } else { + handler = h + } + + handler.ServeWebSocket(conn, r) +} + +func (wr *WebSocketRouter) ReturnWebSocketError(conn *WebConn, r *model.WebSocketRequest, err *model.AppError) { + l4g.Error(utils.T("api.web_socket_router.log.error"), r.Seq, conn.UserId, err.SystemMessage(utils.T), err.DetailedError) + + err.DetailedError = "" + errorResp := model.NewWebSocketError(r.Seq, err) + + conn.Send <- errorResp +} diff --git a/api/websocket_test.go b/api/websocket_test.go new file mode 100644 index 000000000..b0dc1e955 --- /dev/null +++ b/api/websocket_test.go @@ -0,0 +1,144 @@ +// Copyright (c) 2016 Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package api + +import ( + "testing" + "time" + + "github.com/mattermost/platform/model" +) + +func TestWebSocket(t *testing.T) { + th := Setup().InitBasic() + WebSocketClient, err := th.CreateWebSocketClient() + if err != nil { + t.Fatal(err) + } + defer WebSocketClient.Close() + + time.Sleep(300 * time.Millisecond) + + // Test closing and reconnecting + WebSocketClient.Close() + if err := WebSocketClient.Connect(); err != nil { + t.Fatal(err) + } + + WebSocketClient.Listen() + + time.Sleep(300 * time.Millisecond) + + WebSocketClient.SendMessage("ping", nil) + time.Sleep(300 * time.Millisecond) + if resp := <-WebSocketClient.ResponseChannel; resp.Data["text"].(string) != "pong" { + t.Fatal("wrong response") + } + + WebSocketClient.SendMessage("", nil) + time.Sleep(300 * time.Millisecond) + if resp := <-WebSocketClient.ResponseChannel; resp.Error.Id != "api.web_socket_router.no_action.app_error" { + t.Fatal("should have been no action response") + } + + WebSocketClient.SendMessage("junk", nil) + time.Sleep(300 * time.Millisecond) + if resp := <-WebSocketClient.ResponseChannel; resp.Error.Id != "api.web_socket_router.bad_action.app_error" { + t.Fatal("should have been bad action response") + } + + req := &model.WebSocketRequest{} + req.Seq = 0 + req.Action = "ping" + WebSocketClient.Conn.WriteJSON(req) + time.Sleep(300 * time.Millisecond) + if resp := <-WebSocketClient.ResponseChannel; resp.Error.Id != "api.web_socket_router.bad_seq.app_error" { + t.Fatal("should have been bad action response") + } + + WebSocketClient.UserTyping("", "") + time.Sleep(300 * time.Millisecond) + if resp := <-WebSocketClient.ResponseChannel; resp.Error.Id != "api.websocket_handler.invalid_param.app_error" { + t.Fatal("should have been invalid param response") + } else { + if resp.Error.DetailedError != "" { + t.Fatal("detailed error not cleared") + } + } +} + +func TestWebSocketEvent(t *testing.T) { + th := Setup().InitBasic() + WebSocketClient, err := th.CreateWebSocketClient() + if err != nil { + t.Fatal(err) + } + defer WebSocketClient.Close() + + WebSocketClient.Listen() + + evt1 := model.NewWebSocketEvent(th.BasicTeam.Id, th.BasicChannel.Id, "somerandomid", model.WEBSOCKET_EVENT_TYPING) + go Publish(evt1) + time.Sleep(300 * time.Millisecond) + + stop := make(chan bool) + eventHit := false + + go func() { + for { + select { + case resp := <-WebSocketClient.EventChannel: + if resp.Event == model.WEBSOCKET_EVENT_TYPING && resp.UserId == "somerandomid" { + eventHit = true + } + case <-stop: + return + } + } + }() + + time.Sleep(300 * time.Millisecond) + + stop <- true + + if !eventHit { + t.Fatal("did not receive typing event") + } + + evt2 := model.NewWebSocketEvent(th.BasicTeam.Id, "somerandomid", "somerandomid", model.WEBSOCKET_EVENT_TYPING) + go Publish(evt2) + time.Sleep(300 * time.Millisecond) + + eventHit = false + + go func() { + for { + select { + case resp := <-WebSocketClient.EventChannel: + if resp.Event == model.WEBSOCKET_EVENT_TYPING { + eventHit = true + } + case <-stop: + return + } + } + }() + + time.Sleep(300 * time.Millisecond) + + stop <- true + + if eventHit { + t.Fatal("got typing event for bad channel id") + } +} + +func TestZZWebSocketTearDown(t *testing.T) { + // *IMPORTANT* - Kind of hacky + // This should be the last function in any test file + // that calls Setup() + // Should be in the last file too sorted by name + time.Sleep(2 * time.Second) + TearDown() +} |