diff options
author | Joram Wilander <jwawilander@gmail.com> | 2017-04-01 11:39:13 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-04-01 11:39:13 -0400 |
commit | 95da05a8c97332d8eff90c7587ed17a41966c5f0 (patch) | |
tree | d50084481487988ad83deb5ab6af2be7d2a9f110 | |
parent | d39947f53933ee4beb4ed8ab614324edc36fba2d (diff) | |
download | chat-95da05a8c97332d8eff90c7587ed17a41966c5f0.tar.gz chat-95da05a8c97332d8eff90c7587ed17a41966c5f0.tar.bz2 chat-95da05a8c97332d8eff90c7587ed17a41966c5f0.zip |
PLT-5750 Add sequence number to websocket connections and events (#5907)
* Add sequence number to websocket connections and events
* Copy pointer instead of pass by value and use int64 over uint64
* Add more logging to missed events
-rw-r--r-- | app/web_conn.go | 15 | ||||
-rw-r--r-- | app/web_hub.go | 2 | ||||
-rw-r--r-- | app/websocket_router.go | 2 | ||||
-rw-r--r-- | model/websocket_message.go | 45 | ||||
-rw-r--r-- | webapp/actions/websocket_actions.jsx | 7 | ||||
-rw-r--r-- | webapp/client/websocket_client.jsx | 13 | ||||
-rw-r--r-- | wsapi/websocket_handler.go | 3 |
7 files changed, 41 insertions, 46 deletions
diff --git a/app/web_conn.go b/app/web_conn.go index da6330f5c..11290b67d 100644 --- a/app/web_conn.go +++ b/app/web_conn.go @@ -35,6 +35,7 @@ type WebConn struct { Locale string AllChannelMembers map[string]string LastAllChannelMembersTime int64 + Sequence int64 } func NewWebConn(ws *websocket.Conn, session model.Session, t goi18n.TranslateFunc, locale string) *WebConn { @@ -104,8 +105,19 @@ func (c *WebConn) WritePump() { return } + var msgBytes []byte + if evt, ok := msg.(*model.WebSocketEvent); ok { + cpyEvt := &model.WebSocketEvent{} + *cpyEvt = *evt + cpyEvt.Sequence = c.Sequence + msgBytes = []byte(cpyEvt.ToJson()) + c.Sequence++ + } else { + msgBytes = []byte(msg.ToJson()) + } + c.WebSocket.SetWriteDeadline(time.Now().Add(WRITE_WAIT)) - if err := c.WebSocket.WriteMessage(websocket.TextMessage, msg.GetPreComputeJson()); err != nil { + if err := c.WebSocket.WriteMessage(websocket.TextMessage, msgBytes); err != nil { // browsers will appear as CloseNoStatusReceived if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) { l4g.Debug(fmt.Sprintf("websocket.send: client side closed socket userId=%v", c.UserId)) @@ -179,7 +191,6 @@ func (webCon *WebConn) IsAuthenticated() bool { func (webCon *WebConn) SendHello() { msg := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_HELLO, "", "", webCon.UserId, nil) msg.Add("server_version", fmt.Sprintf("%v.%v.%v.%v", model.CurrentVersion, model.BuildNumber, utils.CfgHash, utils.IsLicensed)) - msg.DoPreComputeJson() webCon.Send <- msg } diff --git a/app/web_hub.go b/app/web_hub.go index 65d18481f..a0663459a 100644 --- a/app/web_hub.go +++ b/app/web_hub.go @@ -94,7 +94,6 @@ func Publish(message *model.WebSocketEvent) { metrics.IncrementWebsocketEvent(message.Event) } - message.DoPreComputeJson() for _, hub := range hubs { hub.Broadcast(message) } @@ -105,7 +104,6 @@ func Publish(message *model.WebSocketEvent) { } func PublishSkipClusterSend(message *model.WebSocketEvent) { - message.DoPreComputeJson() for _, hub := range hubs { hub.Broadcast(message) } diff --git a/app/websocket_router.go b/app/websocket_router.go index 30714a447..4569134b0 100644 --- a/app/websocket_router.go +++ b/app/websocket_router.go @@ -61,7 +61,6 @@ func (wr *WebSocketRouter) ServeWebSocket(conn *WebConn, r *model.WebSocketReque HubRegister(conn) resp := model.NewWebSocketResponse(model.STATUS_OK, r.Seq, nil) - resp.DoPreComputeJson() conn.Send <- resp } @@ -91,7 +90,6 @@ func ReturnWebSocketError(conn *WebConn, r *model.WebSocketRequest, err *model.A err.DetailedError = "" errorResp := model.NewWebSocketError(r.Seq, err) - errorResp.DoPreComputeJson() conn.Send <- errorResp } diff --git a/model/websocket_message.go b/model/websocket_message.go index 23820470b..6c47eb948 100644 --- a/model/websocket_message.go +++ b/model/websocket_message.go @@ -36,8 +36,6 @@ const ( type WebSocketMessage interface { ToJson() string IsValid() bool - DoPreComputeJson() - GetPreComputeJson() []byte EventType() string } @@ -49,10 +47,10 @@ type WebsocketBroadcast struct { } type WebSocketEvent struct { - Event string `json:"event"` - Data map[string]interface{} `json:"data"` - Broadcast *WebsocketBroadcast `json:"broadcast"` - PreComputeJson []byte `json:"-"` + Event string `json:"event"` + Data map[string]interface{} `json:"data"` + Broadcast *WebsocketBroadcast `json:"broadcast"` + Sequence int64 `json:"seq"` } func (m *WebSocketEvent) Add(key string, value interface{}) { @@ -72,19 +70,6 @@ func (o *WebSocketEvent) EventType() string { return o.Event } -func (o *WebSocketEvent) DoPreComputeJson() { - b, err := json.Marshal(o) - if err != nil { - o.PreComputeJson = []byte("") - } else { - o.PreComputeJson = b - } -} - -func (o *WebSocketEvent) GetPreComputeJson() []byte { - return o.PreComputeJson -} - func (o *WebSocketEvent) ToJson() string { b, err := json.Marshal(o) if err != nil { @@ -106,11 +91,10 @@ func WebSocketEventFromJson(data io.Reader) *WebSocketEvent { } type WebSocketResponse struct { - Status string `json:"status"` - SeqReply int64 `json:"seq_reply,omitempty"` - Data map[string]interface{} `json:"data,omitempty"` - Error *AppError `json:"error,omitempty"` - PreComputeJson []byte `json:"-"` + Status string `json:"status"` + SeqReply int64 `json:"seq_reply,omitempty"` + Data map[string]interface{} `json:"data,omitempty"` + Error *AppError `json:"error,omitempty"` } func (m *WebSocketResponse) Add(key string, value interface{}) { @@ -142,19 +126,6 @@ func (o *WebSocketResponse) ToJson() string { } } -func (o *WebSocketResponse) DoPreComputeJson() { - b, err := json.Marshal(o) - if err != nil { - o.PreComputeJson = []byte("") - } else { - o.PreComputeJson = b - } -} - -func (o *WebSocketResponse) GetPreComputeJson() []byte { - return o.PreComputeJson -} - func WebSocketResponseFromJson(data io.Reader) *WebSocketResponse { decoder := json.NewDecoder(data) var o WebSocketResponse diff --git a/webapp/actions/websocket_actions.jsx b/webapp/actions/websocket_actions.jsx index e36d11fde..e07e3e217 100644 --- a/webapp/actions/websocket_actions.jsx +++ b/webapp/actions/websocket_actions.jsx @@ -61,6 +61,13 @@ export function initialize() { WebSocketClient.setEventCallback(handleEvent); WebSocketClient.setFirstConnectCallback(handleFirstConnect); + WebSocketClient.setReconnectCallback(() => reconnect(false)); + WebSocketClient.setMissedEventCallback(() => { + if (global.window.mm_config.EnableDeveloper === 'true') { + Client.logClientError('missed websocket event seq=' + WebSocketClient.eventSequence); + } + reconnect(false); + }); WebSocketClient.setCloseCallback(handleClose); WebSocketClient.initialize(connUrl); } diff --git a/webapp/client/websocket_client.jsx b/webapp/client/websocket_client.jsx index 35be5c3df..1cf97b788 100644 --- a/webapp/client/websocket_client.jsx +++ b/webapp/client/websocket_client.jsx @@ -10,11 +10,13 @@ export default class WebSocketClient { this.conn = null; this.connectionUrl = null; this.sequence = 1; + this.eventSequence = 0; this.connectFailCount = 0; this.eventCallback = null; this.responseCallbacks = {}; this.firstConnectCallback = null; this.reconnectCallback = null; + this.missedEventCallback = null; this.errorCallback = null; this.closeCallback = null; } @@ -37,6 +39,8 @@ export default class WebSocketClient { this.connectionUrl = connectionUrl; this.conn.onopen = () => { + this.eventSequence = 0; + if (token) { this.sendMessage('authentication_challenge', {token}); } @@ -108,6 +112,11 @@ export default class WebSocketClient { Reflect.deleteProperty(this.responseCallbacks, msg.seq_reply); } } else if (this.eventCallback) { + if (msg.seq !== this.eventSequence && this.missedEventCallback) { + console.log('missed websocket event, act_seq=' + msg.seq + ' exp_seq=' + this.eventSequence); //eslint-disable-line no-console + this.missedEventCallback(); + } + this.eventSequence = msg.seq + 1; this.eventCallback(msg); } }; @@ -125,6 +134,10 @@ export default class WebSocketClient { this.reconnectCallback = callback; } + setMissedEventCallback(callback) { + this.missedEventCallback = callback; + } + setErrorCallback(callback) { this.errorCallback = callback; } diff --git a/wsapi/websocket_handler.go b/wsapi/websocket_handler.go index 193539242..8d78ece04 100644 --- a/wsapi/websocket_handler.go +++ b/wsapi/websocket_handler.go @@ -27,7 +27,6 @@ func (wh webSocketHandler) ServeWebSocket(conn *app.WebConn, r *model.WebSocketR l4g.Error(utils.T("api.web_socket_handler.log.error"), "/api/v3/users/websocket", r.Action, r.Seq, conn.UserId, sessionErr.SystemMessage(utils.T), sessionErr.Error()) sessionErr.DetailedError = "" errResp := model.NewWebSocketError(r.Seq, sessionErr) - errResp.DoPreComputeJson() conn.Send <- errResp return @@ -44,14 +43,12 @@ func (wh webSocketHandler) ServeWebSocket(conn *app.WebConn, r *model.WebSocketR l4g.Error(utils.T("api.web_socket_handler.log.error"), "/api/v3/users/websocket", r.Action, r.Seq, r.Session.UserId, err.SystemMessage(utils.T), err.DetailedError) err.DetailedError = "" errResp := model.NewWebSocketError(r.Seq, err) - errResp.DoPreComputeJson() conn.Send <- errResp return } resp := model.NewWebSocketResponse(model.STATUS_OK, r.Seq, data) - resp.DoPreComputeJson() conn.Send <- resp } |