diff options
Diffstat (limited to 'model')
-rw-r--r-- | model/cluster_discovery.go | 132 | ||||
-rw-r--r-- | model/cluster_discovery_test.go | 59 | ||||
-rw-r--r-- | model/cluster_info.go | 49 | ||||
-rw-r--r-- | model/cluster_info_test.go | 18 | ||||
-rw-r--r-- | model/cluster_message.go | 55 | ||||
-rw-r--r-- | model/cluster_message_test.go | 28 | ||||
-rw-r--r-- | model/config.go | 51 | ||||
-rw-r--r-- | model/utils.go | 19 | ||||
-rw-r--r-- | model/utils_test.go | 6 |
9 files changed, 348 insertions, 69 deletions
diff --git a/model/cluster_discovery.go b/model/cluster_discovery.go new file mode 100644 index 000000000..4b9269656 --- /dev/null +++ b/model/cluster_discovery.go @@ -0,0 +1,132 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package model + +import ( + "encoding/json" + "io" + "os" +) + +const ( + CDS_OFFLINE_AFTER_MILLIS = 1000 * 60 * 30 // 30 minutes + CDS_TYPE_APP = "mattermost_app" +) + +type ClusterDiscovery struct { + Id string `json:"id"` + Type string `json:"type"` + ClusterName string `json:"cluster_name"` + Hostname string `json:"hostname"` + GossipPort int32 `json:"gossip_port"` + Port int32 `json:"port"` + CreateAt int64 `json:"create_at"` + LastPingAt int64 `json:"last_ping_at"` +} + +func (o *ClusterDiscovery) PreSave() { + if o.Id == "" { + o.Id = NewId() + } + + if o.CreateAt == 0 { + o.CreateAt = GetMillis() + o.LastPingAt = o.CreateAt + } +} + +func (o *ClusterDiscovery) AutoFillHostname() { + // attempt to set the hostname from the OS + if len(o.Hostname) == 0 { + if hn, err := os.Hostname(); err == nil { + o.Hostname = hn + } + } +} + +func (o *ClusterDiscovery) AutoFillIpAddress() { + // attempt to set the hostname to the first non-local IP address + if len(o.Hostname) == 0 { + o.Hostname = GetServerIpAddress() + } +} + +func (o *ClusterDiscovery) IsEqual(in *ClusterDiscovery) bool { + if in == nil { + return false + } + + if o.Type != in.Type { + return false + } + + if o.ClusterName != in.ClusterName { + return false + } + + if o.Hostname != in.Hostname { + return false + } + + return true +} + +func FilterClusterDiscovery(vs []*ClusterDiscovery, f func(*ClusterDiscovery) bool) []*ClusterDiscovery { + copy := make([]*ClusterDiscovery, 0) + for _, v := range vs { + if f(v) { + copy = append(copy, v) + } + } + + return copy +} + +func (o *ClusterDiscovery) IsValid() *AppError { + if len(o.Id) != 26 { + return NewLocAppError("Channel.IsValid", "model.channel.is_valid.id.app_error", nil, "") + } + + if len(o.ClusterName) == 0 { + return NewLocAppError("ClusterDiscovery.IsValid", "ClusterName must be set", nil, "") + } + + if len(o.Type) == 0 { + return NewLocAppError("ClusterDiscovery.IsValid", "Type must be set", nil, "") + } + + if len(o.Hostname) == 0 { + return NewLocAppError("ClusterDiscovery.IsValid", "Hostname must be set", nil, "") + } + + if o.CreateAt == 0 { + return NewLocAppError("ClusterDiscovery.IsValid", "CreateAt must be set", nil, "") + } + + if o.LastPingAt == 0 { + return NewLocAppError("ClusterDiscovery.IsValid", "LastPingAt must be set", nil, "") + } + + return nil +} + +func (o *ClusterDiscovery) ToJson() string { + b, err := json.Marshal(o) + if err != nil { + return "" + } + + return string(b) +} + +func ClusterDiscoveryFromJson(data io.Reader) *ClusterDiscovery { + decoder := json.NewDecoder(data) + var me ClusterDiscovery + err := decoder.Decode(&me) + if err == nil { + return &me + } + + return nil +} diff --git a/model/cluster_discovery_test.go b/model/cluster_discovery_test.go new file mode 100644 index 000000000..bfbdbd303 --- /dev/null +++ b/model/cluster_discovery_test.go @@ -0,0 +1,59 @@ +// Copyright (c) 2016-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package model + +import ( + "strings" + "testing" +) + +func TestClusterDiscovery(t *testing.T) { + o := ClusterDiscovery{ + Type: "test_type", + ClusterName: "cluster_name", + Hostname: "test_hostname", + } + + json := o.ToJson() + result1 := ClusterDiscoveryFromJson(strings.NewReader(json)) + + if result1.ClusterName != "cluster_name" { + t.Fatal("should be set") + } + + result2 := ClusterDiscoveryFromJson(strings.NewReader(json)) + result3 := ClusterDiscoveryFromJson(strings.NewReader(json)) + + o.Id = "0" + result1.Id = "1" + result2.Id = "2" + result3.Id = "3" + result3.Hostname = "something_diff" + + if !o.IsEqual(result1) { + t.Fatal("Should be equal") + } + + list := make([]*ClusterDiscovery, 0) + list = append(list, &o) + list = append(list, result1) + list = append(list, result2) + list = append(list, result3) + + rlist := FilterClusterDiscovery(list, func(in *ClusterDiscovery) bool { + return !o.IsEqual(in) + }) + + if len(rlist) != 1 { + t.Fatal("should only have 1 result") + } + + o.AutoFillHostname() + o.Hostname = "" + o.AutoFillHostname() + + o.AutoFillIpAddress() + o.Hostname = "" + o.AutoFillIpAddress() +} diff --git a/model/cluster_info.go b/model/cluster_info.go index f76a03c0b..1e468044e 100644 --- a/model/cluster_info.go +++ b/model/cluster_info.go @@ -7,24 +7,16 @@ import ( "encoding/json" "io" "strings" - "sync" - "sync/atomic" ) type ClusterInfo struct { - Id string `json:"id"` - Version string `json:"version"` - ConfigHash string `json:"config_hash"` - InterNodeUrl string `json:"internode_url"` - Hostname string `json:"hostname"` - LastSuccessfulPing int64 `json:"last_ping"` - Alive int32 `json:"is_alive"` - Mutex sync.RWMutex `json:"-"` + Version string `json:"version"` + ConfigHash string `json:"config_hash"` + IpAddress string `json:"ipaddress"` + Hostname string `json:"hostname"` } func (me *ClusterInfo) ToJson() string { - me.Mutex.RLock() - defer me.Mutex.RUnlock() b, err := json.Marshal(me) if err != nil { return "" @@ -41,7 +33,6 @@ func (me *ClusterInfo) Copy() *ClusterInfo { func ClusterInfoFromJson(data io.Reader) *ClusterInfo { decoder := json.NewDecoder(data) var me ClusterInfo - me.Mutex = sync.RWMutex{} err := decoder.Decode(&me) if err == nil { return &me @@ -50,38 +41,6 @@ func ClusterInfoFromJson(data io.Reader) *ClusterInfo { } } -func (me *ClusterInfo) SetAlive(alive bool) { - if alive { - atomic.StoreInt32(&me.Alive, 1) - } else { - atomic.StoreInt32(&me.Alive, 0) - } -} - -func (me *ClusterInfo) IsAlive() bool { - return atomic.LoadInt32(&me.Alive) == 1 -} - -func (me *ClusterInfo) HaveEstablishedInitialContact() bool { - me.Mutex.RLock() - defer me.Mutex.RUnlock() - if me.Id != "" { - return true - } - - return false -} - -func (me *ClusterInfo) IdEqualTo(in string) bool { - me.Mutex.RLock() - defer me.Mutex.RUnlock() - if me.Id == in { - return true - } - - return false -} - func ClusterInfosToJson(objmap []*ClusterInfo) string { if b, err := json.Marshal(objmap); err != nil { return "" diff --git a/model/cluster_info_test.go b/model/cluster_info_test.go index 038927120..c019df40a 100644 --- a/model/cluster_info_test.go +++ b/model/cluster_info_test.go @@ -9,33 +9,23 @@ import ( ) func TestClusterInfoJson(t *testing.T) { - cluster := ClusterInfo{Id: NewId(), InterNodeUrl: NewId(), Hostname: NewId()} + cluster := ClusterInfo{IpAddress: NewId(), Hostname: NewId()} json := cluster.ToJson() result := ClusterInfoFromJson(strings.NewReader(json)) - if cluster.Id != result.Id { + if cluster.IpAddress != result.IpAddress { t.Fatal("Ids do not match") } - - cluster.SetAlive(true) - if !cluster.IsAlive() { - t.Fatal("should be live") - } - - cluster.SetAlive(false) - if cluster.IsAlive() { - t.Fatal("should be not live") - } } func TestClusterInfosJson(t *testing.T) { - cluster := ClusterInfo{Id: NewId(), InterNodeUrl: NewId(), Hostname: NewId()} + cluster := ClusterInfo{IpAddress: NewId(), Hostname: NewId()} clusterInfos := make([]*ClusterInfo, 1) clusterInfos[0] = &cluster json := ClusterInfosToJson(clusterInfos) result := ClusterInfosFromJson(strings.NewReader(json)) - if clusterInfos[0].Id != result[0].Id { + if clusterInfos[0].IpAddress != result[0].IpAddress { t.Fatal("Ids do not match") } diff --git a/model/cluster_message.go b/model/cluster_message.go new file mode 100644 index 000000000..a6dec2e7f --- /dev/null +++ b/model/cluster_message.go @@ -0,0 +1,55 @@ +// Copyright (c) 2016-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package model + +import ( + "encoding/json" + "io" +) + +const ( + CLUSTER_EVENT_PUBLISH = "publish" + CLUSTER_EVENT_UPDATE_STATUS = "update_status" + CLUSTER_EVENT_INVALIDATE_ALL_CACHES = "inv_all_caches" + CLUSTER_EVENT_INVALIDATE_CACHE_FOR_REACTIONS = "inv_reactions" + CLUSTER_EVENT_INVALIDATE_CACHE_FOR_WEBHOOK = "inv_webhook" + CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_POSTS = "inv_channel_posts" + CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_MEMBERS_NOTIFY_PROPS = "inv_channel_members_notify_props" + CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_MEMBERS = "inv_channel_members" + CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_BY_NAME = "inv_channel_name" + CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL = "inv_channel" + CLUSTER_EVENT_INVALIDATE_CACHE_FOR_USER = "inv_user" + CLUSTER_EVENT_CLEAR_SESSION_CACHE_FOR_USER = "clear_session_user" + + CLUSTER_SEND_BEST_EFFORT = "best_effort" + CLUSTER_SEND_RELIABLE = "reliable" +) + +type ClusterMessage struct { + Event string `json:"event"` + SendType string `json:"-"` + WaitForAllToSend bool `json:"-"` + Data string `json:"data,omitempty"` + Props map[string]string `json:"props,omitempty"` +} + +func (o *ClusterMessage) ToJson() string { + b, err := json.Marshal(o) + if err != nil { + return "" + } else { + return string(b) + } +} + +func ClusterMessageFromJson(data io.Reader) *ClusterMessage { + decoder := json.NewDecoder(data) + var o ClusterMessage + err := decoder.Decode(&o) + if err == nil { + return &o + } else { + return nil + } +} diff --git a/model/cluster_message_test.go b/model/cluster_message_test.go new file mode 100644 index 000000000..38603e577 --- /dev/null +++ b/model/cluster_message_test.go @@ -0,0 +1,28 @@ +// Copyright (c) 2016-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package model + +import ( + "strings" + "testing" +) + +func TestClusterMessage(t *testing.T) { + m := ClusterMessage{ + Event: CLUSTER_EVENT_PUBLISH, + SendType: CLUSTER_SEND_BEST_EFFORT, + Data: "hello", + } + json := m.ToJson() + result := ClusterMessageFromJson(strings.NewReader(json)) + + if result.Data != "hello" { + t.Fatal() + } + + badresult := ClusterMessageFromJson(strings.NewReader("junk")) + if badresult != nil { + t.Fatal("should not have parsed") + } +} diff --git a/model/config.go b/model/config.go index 4e3a3f7cc..f2b17bced 100644 --- a/model/config.go +++ b/model/config.go @@ -163,9 +163,14 @@ type ServiceSettings struct { } type ClusterSettings struct { - Enable *bool - InterNodeListenAddress *string - InterNodeUrls []string + Enable *bool + ClusterName *string + OverrideHostname *string + UseIpAddress *bool + UseExperimentalGossip *bool + ReadOnlyConfig *bool + GossipPort *int + StreamingPort *int } type MetricsSettings struct { @@ -1036,18 +1041,44 @@ func (o *Config) SetDefaults() { *o.ServiceSettings.PostEditTimeLimit = 300 } - if o.ClusterSettings.InterNodeListenAddress == nil { - o.ClusterSettings.InterNodeListenAddress = new(string) - *o.ClusterSettings.InterNodeListenAddress = ":8075" - } - if o.ClusterSettings.Enable == nil { o.ClusterSettings.Enable = new(bool) *o.ClusterSettings.Enable = false } - if o.ClusterSettings.InterNodeUrls == nil { - o.ClusterSettings.InterNodeUrls = []string{} + if o.ClusterSettings.ClusterName == nil { + o.ClusterSettings.ClusterName = new(string) + *o.ClusterSettings.ClusterName = "" + } + + if o.ClusterSettings.OverrideHostname == nil { + o.ClusterSettings.OverrideHostname = new(string) + *o.ClusterSettings.OverrideHostname = "" + } + + if o.ClusterSettings.UseIpAddress == nil { + o.ClusterSettings.UseIpAddress = new(bool) + *o.ClusterSettings.UseIpAddress = true + } + + if o.ClusterSettings.UseExperimentalGossip == nil { + o.ClusterSettings.UseExperimentalGossip = new(bool) + *o.ClusterSettings.UseExperimentalGossip = false + } + + if o.ClusterSettings.ReadOnlyConfig == nil { + o.ClusterSettings.ReadOnlyConfig = new(bool) + *o.ClusterSettings.ReadOnlyConfig = true + } + + if o.ClusterSettings.GossipPort == nil { + o.ClusterSettings.GossipPort = new(int) + *o.ClusterSettings.GossipPort = 8074 + } + + if o.ClusterSettings.StreamingPort == nil { + o.ClusterSettings.StreamingPort = new(int) + *o.ClusterSettings.StreamingPort = 8075 } if o.MetricsSettings.ListenAddress == nil { diff --git a/model/utils.go b/model/utils.go index d24540683..e7d8bfdac 100644 --- a/model/utils.go +++ b/model/utils.go @@ -17,6 +17,8 @@ import ( "strings" "time" + "net" + goi18n "github.com/nicksnyder/go-i18n/i18n" "github.com/pborman/uuid" ) @@ -264,6 +266,23 @@ func StringFromJson(data io.Reader) string { } } +func GetServerIpAddress() string { + if addrs, err := net.InterfaceAddrs(); err != nil { + return "" + } else { + for _, addr := range addrs { + + if ip, ok := addr.(*net.IPNet); ok && !ip.IP.IsLoopback() { + if ip.IP.To4() != nil { + return ip.IP.String() + } + } + } + } + + return "" +} + func IsLower(s string) bool { if strings.ToLower(s) == s { return true diff --git a/model/utils_test.go b/model/utils_test.go index 94ee55aa9..bc2aa6ce7 100644 --- a/model/utils_test.go +++ b/model/utils_test.go @@ -193,6 +193,12 @@ func TestIsValidAlphaNum(t *testing.T) { } } +func TestGetServerIpAddress(t *testing.T) { + if len(GetServerIpAddress()) == 0 { + t.Fatal("Should find local ip address") + } +} + func TestIsValidAlphaNumHyphenUnderscore(t *testing.T) { casesWithFormat := []struct { Input string |