diff options
author | =Corey Hulen <corey@hulen.com> | 2015-06-14 23:53:32 -0800 |
---|---|---|
committer | =Corey Hulen <corey@hulen.com> | 2015-06-14 23:53:32 -0800 |
commit | cf7a05f80f68b5b1c8bcc0089679dd497cec2506 (patch) | |
tree | 70007378570a6962d7c175ca96af732b71aeb6da /api/web_team_hub.go | |
download | chat-cf7a05f80f68b5b1c8bcc0089679dd497cec2506.tar.gz chat-cf7a05f80f68b5b1c8bcc0089679dd497cec2506.tar.bz2 chat-cf7a05f80f68b5b1c8bcc0089679dd497cec2506.zip |
first commit
Diffstat (limited to 'api/web_team_hub.go')
-rw-r--r-- | api/web_team_hub.go | 119 |
1 files changed, 119 insertions, 0 deletions
diff --git a/api/web_team_hub.go b/api/web_team_hub.go new file mode 100644 index 000000000..7c7981e76 --- /dev/null +++ b/api/web_team_hub.go @@ -0,0 +1,119 @@ +// Copyright (c) 2015 Spinpunch, Inc. All Rights Reserved. +// See License.txt for license information. + +package api + +import ( + l4g "code.google.com/p/log4go" + "github.com/mattermost/platform/model" + "github.com/mattermost/platform/store" + "strings" +) + +type TeamHub struct { + connections map[*WebConn]bool + broadcast chan *model.Message + register chan *WebConn + unregister chan *WebConn + stop chan bool + teamId string +} + +func NewTeamHub(teamId string) *TeamHub { + return &TeamHub{ + broadcast: make(chan *model.Message), + register: make(chan *WebConn), + unregister: make(chan *WebConn), + connections: make(map[*WebConn]bool), + stop: make(chan bool), + teamId: teamId, + } +} + +func (h *TeamHub) Register(webConn *WebConn) { + h.register <- webConn +} + +func (h *TeamHub) Unregister(webConn *WebConn) { + h.unregister <- webConn +} + +func (h *TeamHub) Stop() { + h.stop <- true +} + +func (h *TeamHub) Start() { + + pubsub := store.RedisClient().PubSub() + + go func() { + defer func() { + l4g.Debug("redis reader finished for teamId=%v", h.teamId) + hub.Stop(h.teamId) + }() + + l4g.Debug("redis reader starting for teamId=%v", h.teamId) + + err := pubsub.Subscribe(h.teamId) + if err != nil { + l4g.Error("Error while subscribing to redis %v %v", h.teamId, err) + return + } + + for { + if payload, err := pubsub.ReceiveTimeout(REDIS_WAIT); err != nil { + if strings.Contains(err.Error(), "i/o timeout") { + if len(h.connections) == 0 { + l4g.Debug("No active connections so sending stop %v", h.teamId) + return + } + } else { + return + } + } else { + msg := store.GetMessageFromPayload(payload) + if msg != nil { + h.broadcast <- msg + } + } + } + + }() + + go func() { + for { + select { + case webCon := <-h.register: + h.connections[webCon] = true + case webCon := <-h.unregister: + if _, ok := h.connections[webCon]; ok { + delete(h.connections, webCon) + close(webCon.Send) + } + case msg := <-h.broadcast: + for webCon := range h.connections { + if !(webCon.UserId == msg.UserId && msg.Action == model.ACTION_TYPING) { + select { + case webCon.Send <- msg: + default: + close(webCon.Send) + delete(h.connections, webCon) + } + } + } + case s := <-h.stop: + if s { + + l4g.Debug("team hub stopping for teamId=%v", h.teamId) + + for webCon := range h.connections { + webCon.WebSocket.Close() + } + + pubsub.Close() + return + } + } + } + }() +} |