diff options
author | Corey Hulen <corey@hulen.com> | 2017-04-21 09:38:26 -0700 |
---|---|---|
committer | Joram Wilander <jwawilander@gmail.com> | 2017-04-21 12:38:26 -0400 |
commit | e62afeace04e2abd23fa78a0a54e0a5d2e17e0b7 (patch) | |
tree | ccc4feb2ac3c9dcb0e8766366854de0646443b63 /app/web_conn.go | |
parent | 81706b402dafd6ce0727ed6d65105092f76b118a (diff) | |
download | chat-e62afeace04e2abd23fa78a0a54e0a5d2e17e0b7.tar.gz chat-e62afeace04e2abd23fa78a0a54e0a5d2e17e0b7.tar.bz2 chat-e62afeace04e2abd23fa78a0a54e0a5d2e17e0b7.zip |
Adding slow pump detection to web_conn and better metrics (#6114)
* Adding slow pump detection to web_conn and better metrics
* Fixing bad merge
* Fixing typo
Diffstat (limited to 'app/web_conn.go')
-rw-r--r-- | app/web_conn.go | 64 |
1 files changed, 43 insertions, 21 deletions
diff --git a/app/web_conn.go b/app/web_conn.go index 000704791..2c1913e2b 100644 --- a/app/web_conn.go +++ b/app/web_conn.go @@ -17,6 +17,9 @@ import ( ) const ( + SEND_QUEUE_SIZE = 256 + SEND_SLOW_WARN = (SEND_QUEUE_SIZE * 50) / 100 + SEND_DEADLOCK_WARN = (SEND_QUEUE_SIZE * 95) / 100 WRITE_WAIT = 30 * time.Second PONG_WAIT = 100 * time.Second PING_PERIOD = (PONG_WAIT * 6) / 10 @@ -44,7 +47,7 @@ func NewWebConn(ws *websocket.Conn, session model.Session, t goi18n.TranslateFun } return &WebConn{ - Send: make(chan model.WebSocketMessage, 256), + Send: make(chan model.WebSocketMessage, SEND_QUEUE_SIZE), WebSocket: ws, UserId: session.UserId, SessionToken: session.Token, @@ -105,35 +108,54 @@ func (c *WebConn) WritePump() { return } - var msgBytes []byte - if evt, ok := msg.(*model.WebSocketEvent); ok { - cpyEvt := &model.WebSocketEvent{} - *cpyEvt = *evt - cpyEvt.Sequence = c.Sequence - msgBytes = []byte(cpyEvt.ToJson()) - c.Sequence++ - } else { - msgBytes = []byte(msg.ToJson()) + evt, evtOk := msg.(*model.WebSocketEvent) + + skipSend := false + if len(c.Send) >= SEND_SLOW_WARN { + // When the pump starts to get slow we'll drop non-critical messages + if msg.EventType() == model.WEBSOCKET_EVENT_TYPING || msg.EventType() == model.WEBSOCKET_EVENT_STATUS_CHANGE { + l4g.Info(fmt.Sprintf("websocket.slow: dropping message userId=%v type=%v channelId=%v", c.UserId, msg.EventType(), evt.Broadcast.ChannelId)) + skipSend = true + } } - c.WebSocket.SetWriteDeadline(time.Now().Add(WRITE_WAIT)) - if err := c.WebSocket.WriteMessage(websocket.TextMessage, msgBytes); err != nil { - // browsers will appear as CloseNoStatusReceived - if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) { - l4g.Debug(fmt.Sprintf("websocket.send: client side closed socket userId=%v", c.UserId)) + if !skipSend { + var msgBytes []byte + if evtOk { + cpyEvt := &model.WebSocketEvent{} + *cpyEvt = *evt + cpyEvt.Sequence = c.Sequence + msgBytes = []byte(cpyEvt.ToJson()) + c.Sequence++ } else { - l4g.Debug(fmt.Sprintf("websocket.send: closing websocket for userId=%v, error=%v", c.UserId, err.Error())) + msgBytes = []byte(msg.ToJson()) } - return - } + if len(c.Send) >= SEND_DEADLOCK_WARN { + if evtOk { + l4g.Error(fmt.Sprintf("websocket.full: message userId=%v type=%v channelId=%v size=%v", c.UserId, msg.EventType(), evt.Broadcast.ChannelId, len(msg.ToJson()))) + } else { + l4g.Error(fmt.Sprintf("websocket.full: message userId=%v type=%v size=%v", c.UserId, msg.EventType(), len(msg.ToJson()))) + } + } + + c.WebSocket.SetWriteDeadline(time.Now().Add(WRITE_WAIT)) + if err := c.WebSocket.WriteMessage(websocket.TextMessage, msgBytes); err != nil { + // browsers will appear as CloseNoStatusReceived + if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) { + l4g.Debug(fmt.Sprintf("websocket.send: client side closed socket userId=%v", c.UserId)) + } else { + l4g.Debug(fmt.Sprintf("websocket.send: closing websocket for userId=%v, error=%v", c.UserId, err.Error())) + } + + return + } - if msg.EventType() == model.WEBSOCKET_EVENT_POSTED { if einterfaces.GetMetricsInterface() != nil { - einterfaces.GetMetricsInterface().IncrementPostBroadcast() + go einterfaces.GetMetricsInterface().IncrementWebSocketBroadcast(msg.EventType()) } - } + } case <-ticker.C: c.WebSocket.SetWriteDeadline(time.Now().Add(WRITE_WAIT)) if err := c.WebSocket.WriteMessage(websocket.PingMessage, []byte{}); err != nil { |