diff options
Diffstat (limited to 'app')
-rw-r--r-- | app/admin.go | 10 | ||||
-rw-r--r-- | app/app.go | 69 | ||||
-rw-r--r-- | app/app_test.go | 56 | ||||
-rw-r--r-- | app/apptestlib.go | 72 | ||||
-rw-r--r-- | app/diagnostics.go | 2 | ||||
-rw-r--r-- | app/import.go | 39 | ||||
-rw-r--r-- | app/notification.go | 4 | ||||
-rw-r--r-- | app/oauth.go | 12 | ||||
-rw-r--r-- | app/options.go | 59 | ||||
-rw-r--r-- | app/plugins.go | 10 | ||||
-rw-r--r-- | app/post.go | 11 | ||||
-rw-r--r-- | app/security_update_check.go | 3 | ||||
-rw-r--r-- | app/server.go | 124 | ||||
-rw-r--r-- | app/user.go | 6 | ||||
-rw-r--r-- | app/web_conn.go | 5 | ||||
-rw-r--r-- | app/webhook.go | 2 | ||||
-rw-r--r-- | app/webrtc.go | 42 | ||||
-rw-r--r-- | app/websocket_router.go | 7 |
18 files changed, 407 insertions, 126 deletions
diff --git a/app/admin.go b/app/admin.go index dab7e9759..5994fc826 100644 --- a/app/admin.go +++ b/app/admin.go @@ -15,7 +15,6 @@ import ( l4g "github.com/alecthomas/log4go" "github.com/mattermost/mattermost-server/model" - "github.com/mattermost/mattermost-server/store" "github.com/mattermost/mattermost-server/store/sqlstore" "github.com/mattermost/mattermost-server/utils" ) @@ -187,12 +186,13 @@ func (a *App) RecycleDatabaseConnection() { oldStore := a.Srv.Store l4g.Warn(utils.T("api.admin.recycle_db_start.warn")) - a.Srv.Store = store.NewLayeredStore(sqlstore.NewSqlSupplier(a.Metrics), a.Metrics, a.Cluster) - + a.Srv.Store = a.newStore() a.Jobs.Store = a.Srv.Store - time.Sleep(20 * time.Second) - oldStore.Close() + if a.Srv.Store != oldStore { + time.Sleep(20 * time.Second) + oldStore.Close() + } l4g.Warn(utils.T("api.admin.recycle_db_end.warn")) } diff --git a/app/app.go b/app/app.go index a250efe5c..34c0721a0 100644 --- a/app/app.go +++ b/app/app.go @@ -4,17 +4,19 @@ package app import ( - "io/ioutil" "net/http" "sync/atomic" l4g "github.com/alecthomas/log4go" + "github.com/gorilla/mux" "github.com/mattermost/mattermost-server/einterfaces" ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs" "github.com/mattermost/mattermost-server/jobs" "github.com/mattermost/mattermost-server/model" "github.com/mattermost/mattermost-server/plugin/pluginenv" + "github.com/mattermost/mattermost-server/store" + "github.com/mattermost/mattermost-server/store/sqlstore" "github.com/mattermost/mattermost-server/utils" ) @@ -44,44 +46,70 @@ type App struct { Metrics einterfaces.MetricsInterface Mfa einterfaces.MfaInterface Saml einterfaces.SamlInterface + + newStore func() store.Store + configOverride func(*model.Config) *model.Config } var appCount = 0 // New creates a new App. You must call Shutdown when you're done with it. // XXX: For now, only one at a time is allowed as some resources are still shared. -func New() *App { +func New(options ...Option) *App { appCount++ if appCount > 1 { panic("Only one App should exist at a time. Did you forget to call Shutdown()?") } + l4g.Info(utils.T("api.server.new_server.init.info")) + app := &App{ goroutineExitSignal: make(chan struct{}, 1), Jobs: &jobs.JobServer{}, + Srv: &Server{ + Router: mux.NewRouter(), + }, } app.initEnterprise() + + for _, option := range options { + option(app) + } + + if app.newStore == nil { + app.newStore = func() store.Store { + return store.NewLayeredStore(sqlstore.NewSqlSupplier(app.Config().SqlSettings, app.Metrics), app.Metrics, app.Cluster) + } + } + + app.Srv.Store = app.newStore() + app.Jobs.Store = app.Srv.Store + + app.Srv.Router.NotFoundHandler = http.HandlerFunc(app.Handle404) + + app.Srv.WebSocketRouter = &WebSocketRouter{ + app: app, + handlers: make(map[string]webSocketHandler), + } + return app } func (a *App) Shutdown() { appCount-- - if a.Srv != nil { - l4g.Info(utils.T("api.server.stop_server.stopping.info")) + l4g.Info(utils.T("api.server.stop_server.stopping.info")) - a.Srv.GracefulServer.Stop(TIME_TO_WAIT_FOR_CONNECTIONS_TO_CLOSE_ON_SERVER_SHUTDOWN) - <-a.Srv.GracefulServer.StopChan() - a.HubStop() + a.StopServer() + a.HubStop() - a.ShutDownPlugins() - a.WaitForGoroutines() + a.ShutDownPlugins() + a.WaitForGoroutines() - a.Srv.Store.Close() - a.Srv = nil + a.Srv.Store.Close() + a.Srv = nil - l4g.Info(utils.T("api.server.stop_server.stopped.info")) - } + l4g.Info(utils.T("api.server.stop_server.stopped.info")) } var accountMigrationInterface func(*App) einterfaces.AccountMigrationInterface @@ -206,6 +234,9 @@ func (a *App) initEnterprise() { } func (a *App) Config() *model.Config { + if a.configOverride != nil { + return a.configOverride(utils.Cfg) + } return utils.Cfg } @@ -232,9 +263,11 @@ func (a *App) WaitForGoroutines() { } } -func CloseBody(r *http.Response) { - if r.Body != nil { - ioutil.ReadAll(r.Body) - r.Body.Close() - } +func (a *App) Handle404(w http.ResponseWriter, r *http.Request) { + err := model.NewAppError("Handle404", "api.context.404.app_error", nil, "", http.StatusNotFound) + err.Translate(utils.T) + + l4g.Debug("%v: code=404 ip=%v", r.URL.Path, utils.GetIpAddress(r)) + + utils.RenderWebError(err, w, r) } diff --git a/app/app_test.go b/app/app_test.go new file mode 100644 index 000000000..6f2a3a23a --- /dev/null +++ b/app/app_test.go @@ -0,0 +1,56 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package app + +import ( + "flag" + "os" + "testing" + + l4g "github.com/alecthomas/log4go" + + "github.com/mattermost/mattermost-server/store/storetest" + "github.com/mattermost/mattermost-server/utils" +) + +func TestMain(m *testing.M) { + flag.Parse() + + // In the case where a dev just wants to run a single test, it's faster to just use the default + // store. + if filter := flag.Lookup("test.run").Value.String(); filter != "" && filter != "." { + utils.TranslationsPreInit() + utils.LoadConfig("config.json") + l4g.Info("-test.run used, not creating temporary containers") + os.Exit(m.Run()) + } + + utils.TranslationsPreInit() + utils.LoadConfig("config.json") + utils.InitTranslations(utils.Cfg.LocalizationSettings) + + status := 0 + + container, settings, err := storetest.NewMySQLContainer() + if err != nil { + panic(err) + } + + UseTestStore(container, settings) + + defer func() { + StopTestStore() + os.Exit(status) + }() + + status = m.Run() +} + +func TestAppRace(t *testing.T) { + for i := 0; i < 10; i++ { + a := New() + a.StartServer() + a.Shutdown() + } +} diff --git a/app/apptestlib.go b/app/apptestlib.go index 29139ac39..9c26e0bbb 100644 --- a/app/apptestlib.go +++ b/app/apptestlib.go @@ -7,6 +7,9 @@ import ( "time" "github.com/mattermost/mattermost-server/model" + "github.com/mattermost/mattermost-server/store" + "github.com/mattermost/mattermost-server/store/sqlstore" + "github.com/mattermost/mattermost-server/store/storetest" "github.com/mattermost/mattermost-server/utils" l4g "github.com/alecthomas/log4go" @@ -21,27 +24,58 @@ type TestHelper struct { BasicPost *model.Post } -func setupTestHelper(enterprise bool) *TestHelper { - th := &TestHelper{ - App: New(), +type persistentTestStore struct { + store.Store +} + +func (*persistentTestStore) Close() {} + +var testStoreContainer *storetest.RunningContainer +var testStore *persistentTestStore + +// UseTestStore sets the container and corresponding settings to use for tests. Once the tests are +// complete (e.g. at the end of your TestMain implementation), you should call StopTestStore. +func UseTestStore(container *storetest.RunningContainer, settings *model.SqlSettings) { + testStoreContainer = container + testStore = &persistentTestStore{store.NewLayeredStore(sqlstore.NewSqlSupplier(*settings, nil), nil, nil)} +} + +func StopTestStore() { + if testStoreContainer != nil { + testStoreContainer.Stop() + testStoreContainer = nil } +} - if th.App.Srv == nil { +func setupTestHelper(enterprise bool) *TestHelper { + if utils.T == nil { utils.TranslationsPreInit() - utils.LoadConfig("config.json") - utils.InitTranslations(utils.Cfg.LocalizationSettings) - *utils.Cfg.TeamSettings.MaxUsersPerTeam = 50 - *utils.Cfg.RateLimitSettings.Enable = false - utils.DisableDebugLogForTest() - th.App.NewServer() - th.App.InitStores() - th.App.StartServer() - utils.InitHTML() - utils.EnableDebugLogForTest() - th.App.Srv.Store.MarkSystemRanUnitTests() - - *utils.Cfg.TeamSettings.EnableOpenServer = true } + utils.LoadConfig("config.json") + utils.InitTranslations(utils.Cfg.LocalizationSettings) + + var options []Option + if testStore != nil { + options = append(options, StoreOverride(testStore)) + options = append(options, ConfigOverride(func(cfg *model.Config) { + cfg.ServiceSettings.ListenAddress = new(string) + *cfg.ServiceSettings.ListenAddress = ":0" + })) + } + + th := &TestHelper{ + App: New(options...), + } + + *utils.Cfg.TeamSettings.MaxUsersPerTeam = 50 + *utils.Cfg.RateLimitSettings.Enable = false + utils.DisableDebugLogForTest() + th.App.StartServer() + utils.InitHTML() + utils.EnableDebugLogForTest() + th.App.Srv.Store.MarkSystemRanUnitTests() + + *utils.Cfg.TeamSettings.EnableOpenServer = true utils.SetIsLicensed(enterprise) if enterprise { @@ -191,4 +225,8 @@ func (me *TestHelper) LinkUserToTeam(user *model.User, team *model.Team) { func (me *TestHelper) TearDown() { me.App.Shutdown() + if err := recover(); err != nil { + StopTestStore() + panic(err) + } } diff --git a/app/diagnostics.go b/app/diagnostics.go index 9e5742111..5f5ef35b2 100644 --- a/app/diagnostics.go +++ b/app/diagnostics.go @@ -204,6 +204,7 @@ func trackConfig() { "session_length_mobile_in_days": *utils.Cfg.ServiceSettings.SessionLengthMobileInDays, "session_length_sso_in_days": *utils.Cfg.ServiceSettings.SessionLengthSSOInDays, "session_cache_in_minutes": *utils.Cfg.ServiceSettings.SessionCacheInMinutes, + "session_idle_timeout_in_minutes": *utils.Cfg.ServiceSettings.SessionIdleTimeoutInMinutes, "isdefault_site_url": isDefault(*utils.Cfg.ServiceSettings.SiteURL, model.SERVICE_SETTINGS_DEFAULT_SITE_URL), "isdefault_tls_cert_file": isDefault(*utils.Cfg.ServiceSettings.TLSCertFile, model.SERVICE_SETTINGS_DEFAULT_TLS_CERT_FILE), "isdefault_tls_key_file": isDefault(*utils.Cfg.ServiceSettings.TLSKeyFile, model.SERVICE_SETTINGS_DEFAULT_TLS_KEY_FILE), @@ -442,6 +443,7 @@ func trackConfig() { "sniff": *utils.Cfg.ElasticsearchSettings.Sniff, "post_index_replicas": *utils.Cfg.ElasticsearchSettings.PostIndexReplicas, "post_index_shards": *utils.Cfg.ElasticsearchSettings.PostIndexShards, + "isdefault_index_prefix": isDefault(*utils.Cfg.ElasticsearchSettings.IndexPrefix, model.ELASTICSEARCH_SETTINGS_DEFAULT_INDEX_PREFIX), }) SendDiagnostic(TRACK_CONFIG_PLUGIN, map[string]interface{}{ diff --git a/app/import.go b/app/import.go index 6a309ad3e..f7f9cf144 100644 --- a/app/import.go +++ b/app/import.go @@ -644,28 +644,30 @@ func (a *App) ImportUser(data *UserImportData, dryRun bool) *model.AppError { } } + var err *model.AppError + var savedUser *model.User if user.Id == "" { - if _, err := a.createUser(user); err != nil { + if savedUser, err = a.createUser(user); err != nil { return err } } else { if hasUserChanged { - if _, err := a.UpdateUser(user, false); err != nil { + if savedUser, err = a.UpdateUser(user, false); err != nil { return err } } if hasUserRolesChanged { - if _, err := a.UpdateUserRoles(user.Id, roles); err != nil { + if savedUser, err = a.UpdateUserRoles(user.Id, roles); err != nil { return err } } if hasNotifyPropsChanged { - if _, err := a.UpdateUserNotifyProps(user.Id, user.NotifyProps); err != nil { + if savedUser, err = a.UpdateUserNotifyProps(user.Id, user.NotifyProps); err != nil { return err } } if len(password) > 0 { - if err := a.UpdatePassword(user, password); err != nil { + if err = a.UpdatePassword(user, password); err != nil { return err } } else { @@ -684,12 +686,16 @@ func (a *App) ImportUser(data *UserImportData, dryRun bool) *model.AppError { } } + if savedUser == nil { + savedUser = user + } + // Preferences. var preferences model.Preferences if data.Theme != nil { preferences = append(preferences, model.Preference{ - UserId: user.Id, + UserId: savedUser.Id, Category: model.PREFERENCE_CATEGORY_THEME, Name: "", Value: *data.Theme, @@ -698,7 +704,7 @@ func (a *App) ImportUser(data *UserImportData, dryRun bool) *model.AppError { if data.UseMilitaryTime != nil { preferences = append(preferences, model.Preference{ - UserId: user.Id, + UserId: savedUser.Id, Category: model.PREFERENCE_CATEGORY_DISPLAY_SETTINGS, Name: "use_military_time", Value: *data.UseMilitaryTime, @@ -707,7 +713,7 @@ func (a *App) ImportUser(data *UserImportData, dryRun bool) *model.AppError { if data.CollapsePreviews != nil { preferences = append(preferences, model.Preference{ - UserId: user.Id, + UserId: savedUser.Id, Category: model.PREFERENCE_CATEGORY_DISPLAY_SETTINGS, Name: "collapse_previews", Value: *data.CollapsePreviews, @@ -716,7 +722,7 @@ func (a *App) ImportUser(data *UserImportData, dryRun bool) *model.AppError { if data.MessageDisplay != nil { preferences = append(preferences, model.Preference{ - UserId: user.Id, + UserId: savedUser.Id, Category: model.PREFERENCE_CATEGORY_DISPLAY_SETTINGS, Name: "message_display", Value: *data.MessageDisplay, @@ -725,7 +731,7 @@ func (a *App) ImportUser(data *UserImportData, dryRun bool) *model.AppError { if data.ChannelDisplayMode != nil { preferences = append(preferences, model.Preference{ - UserId: user.Id, + UserId: savedUser.Id, Category: model.PREFERENCE_CATEGORY_DISPLAY_SETTINGS, Name: "channel_display_mode", Value: *data.ChannelDisplayMode, @@ -734,9 +740,9 @@ func (a *App) ImportUser(data *UserImportData, dryRun bool) *model.AppError { if data.TutorialStep != nil { preferences = append(preferences, model.Preference{ - UserId: user.Id, + UserId: savedUser.Id, Category: model.PREFERENCE_CATEGORY_TUTORIAL_STEPS, - Name: user.Id, + Name: savedUser.Id, Value: *data.TutorialStep, }) } @@ -747,19 +753,14 @@ func (a *App) ImportUser(data *UserImportData, dryRun bool) *model.AppError { } } - return a.ImportUserTeams(*data.Username, data.Teams) + return a.ImportUserTeams(savedUser, data.Teams) } -func (a *App) ImportUserTeams(username string, data *[]UserTeamImportData) *model.AppError { +func (a *App) ImportUserTeams(user *model.User, data *[]UserTeamImportData) *model.AppError { if data == nil { return nil } - user, err := a.GetUserByUsername(username) - if err != nil { - return err - } - for _, tdata := range *data { team, err := a.GetTeamByName(*tdata.Name) if err != nil { diff --git a/app/notification.go b/app/notification.go index 3df4a789f..2a8f9ff2e 100644 --- a/app/notification.go +++ b/app/notification.go @@ -7,7 +7,6 @@ import ( "fmt" "html" "html/template" - "io/ioutil" "net/http" "net/url" "path/filepath" @@ -701,8 +700,7 @@ func (a *App) sendToPushProxy(msg model.PushNotification, session *model.Session } else { pushResponse := model.PushResponseFromJson(resp.Body) if resp.Body != nil { - ioutil.ReadAll(resp.Body) - resp.Body.Close() + consumeAndClose(resp) } if pushResponse[model.PUSH_STATUS] == model.PUSH_STATUS_REMOVE { diff --git a/app/oauth.go b/app/oauth.go index 6e411138b..5a02f6238 100644 --- a/app/oauth.go +++ b/app/oauth.go @@ -7,7 +7,6 @@ import ( "bytes" b64 "encoding/base64" "io" - "io/ioutil" "net/http" "net/url" "strings" @@ -428,10 +427,7 @@ func (a *App) RevokeAccessToken(token string) *model.AppError { } func (a *App) CompleteOAuth(service string, body io.ReadCloser, teamId string, props map[string]string) (*model.User, *model.AppError) { - defer func() { - ioutil.ReadAll(body) - body.Close() - }() + defer body.Close() action := props["action"] @@ -688,11 +684,9 @@ func (a *App) AuthorizeOAuthUser(w http.ResponseWriter, r *http.Request, service if resp, err := utils.HttpClient(true).Do(req); err != nil { return nil, "", stateProps, model.NewAppError("AuthorizeOAuthUser", "api.user.authorize_oauth_user.token_failed.app_error", nil, err.Error(), http.StatusInternalServerError) } else { - bodyBytes, _ = ioutil.ReadAll(resp.Body) - resp.Body = ioutil.NopCloser(bytes.NewBuffer(bodyBytes)) - ar = model.AccessResponseFromJson(resp.Body) - defer CloseBody(resp) + consumeAndClose(resp) + if ar == nil { return nil, "", stateProps, model.NewAppError("AuthorizeOAuthUser", "api.user.authorize_oauth_user.bad_response.app_error", nil, "response_body="+string(bodyBytes), http.StatusInternalServerError) } diff --git a/app/options.go b/app/options.go new file mode 100644 index 000000000..121bbbf80 --- /dev/null +++ b/app/options.go @@ -0,0 +1,59 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package app + +import ( + "github.com/mattermost/mattermost-server/model" + "github.com/mattermost/mattermost-server/store" +) + +type Option func(a *App) + +// By default, the app will use a global configuration file. This allows you to override all or part +// of that configuration. +// +// The override parameter must be a *model.Config, func(*model.Config), or func(*model.Config) *model.Config. +// +// XXX: Most code will not respect this at the moment. (We need to eliminate utils.Cfg first.) +func ConfigOverride(override interface{}) Option { + return func(a *App) { + switch o := override.(type) { + case *model.Config: + a.configOverride = func(*model.Config) *model.Config { + return o + } + case func(*model.Config): + a.configOverride = func(cfg *model.Config) *model.Config { + ret := *cfg + o(&ret) + return &ret + } + case func(*model.Config) *model.Config: + a.configOverride = o + default: + panic("invalid ConfigOverride") + } + } +} + +// By default, the app will use the store specified by the configuration. This allows you to +// construct an app with a different store. +// +// The override parameter must be either a store.Store or func(App) store.Store. +func StoreOverride(override interface{}) Option { + return func(a *App) { + switch o := override.(type) { + case store.Store: + a.newStore = func() store.Store { + return o + } + case func(*App) store.Store: + a.newStore = func() store.Store { + return o(a) + } + default: + panic("invalid StoreOverride") + } + } +} diff --git a/app/plugins.go b/app/plugins.go index 2c87cee19..9826674f1 100644 --- a/app/plugins.go +++ b/app/plugins.go @@ -394,12 +394,14 @@ func (a *App) RemovePlugin(id string) *model.AppError { } func (a *App) InitPlugins(pluginPath, webappPath string) { - a.InitBuiltInPlugins() - if !utils.IsLicensed() || !*utils.License().Features.FutureFeatures || !*utils.Cfg.PluginSettings.Enable { return } + if a.PluginEnv != nil { + return + } + l4g.Info("Starting up plugins") err := os.Mkdir(pluginPath, 0744) @@ -485,9 +487,13 @@ func (a *App) ShutDownPlugins() { if a.PluginEnv == nil { return } + + l4g.Info("Shutting down plugins") + for _, err := range a.PluginEnv.Shutdown() { l4g.Error(err.Error()) } utils.RemoveConfigListener(a.PluginConfigListenerId) a.PluginConfigListenerId = "" + a.PluginEnv = nil } diff --git a/app/post.go b/app/post.go index fa929b844..da5661ae2 100644 --- a/app/post.go +++ b/app/post.go @@ -362,9 +362,6 @@ func (a *App) PatchPost(postId string, patch *model.PostPatch) (*model.Post, *mo return nil, err } - a.sendUpdatedPostEvent(updatedPost) - a.InvalidateCacheForChannelPosts(updatedPost.ChannelId) - return updatedPost, nil } @@ -620,6 +617,10 @@ func (a *App) SearchPostsInTeam(terms string, userId string, teamId string, isOr return postList, nil } else { + if !*utils.Cfg.ServiceSettings.EnablePostSearch { + return nil, model.NewAppError("SearchPostsInTeam", "store.sql_post.search.disabled", nil, fmt.Sprintf("teamId=%v userId=%v", teamId, userId), http.StatusNotImplemented) + } + channels := []store.StoreChannel{} for _, params := range paramsList { @@ -682,7 +683,7 @@ func GetOpenGraphMetadata(url string) *opengraph.OpenGraph { l4g.Error("GetOpenGraphMetadata request failed for url=%v with err=%v", url, err.Error()) return og } - defer CloseBody(res) + defer consumeAndClose(res) if err := og.ProcessHTML(res.Body); err != nil { l4g.Error("GetOpenGraphMetadata processing failed for url=%v with err=%v", url, err.Error()) @@ -718,7 +719,7 @@ func (a *App) DoPostAction(postId string, actionId string, userId string) *model if err != nil { return model.NewAppError("DoPostAction", "api.post.do_action.action_integration.app_error", nil, "err="+err.Error(), http.StatusBadRequest) } - defer resp.Body.Close() + defer consumeAndClose(resp) if resp.StatusCode != http.StatusOK { return model.NewAppError("DoPostAction", "api.post.do_action.action_integration.app_error", nil, fmt.Sprintf("status=%v", resp.StatusCode), http.StatusBadRequest) diff --git a/app/security_update_check.go b/app/security_update_check.go index 773556f5e..32d1f4d31 100644 --- a/app/security_update_check.go +++ b/app/security_update_check.go @@ -80,8 +80,7 @@ func (a *App) DoSecurityUpdateCheck() { } bulletins := model.SecurityBulletinsFromJson(res.Body) - ioutil.ReadAll(res.Body) - res.Body.Close() + consumeAndClose(res) for _, bulletin := range bulletins { if bulletin.AppliesToVersion == model.CurrentVersion { diff --git a/app/server.go b/app/server.go index 5f955dd65..d686c1f24 100644 --- a/app/server.go +++ b/app/server.go @@ -4,7 +4,10 @@ package app import ( + "context" "crypto/tls" + "io" + "io/ioutil" "net" "net/http" "strings" @@ -14,13 +17,11 @@ import ( "github.com/gorilla/handlers" "github.com/gorilla/mux" "github.com/rsc/letsencrypt" - "github.com/tylerb/graceful" "gopkg.in/throttled/throttled.v2" "gopkg.in/throttled/throttled.v2/store/memstore" "github.com/mattermost/mattermost-server/model" "github.com/mattermost/mattermost-server/store" - "github.com/mattermost/mattermost-server/store/sqlstore" "github.com/mattermost/mattermost-server/utils" ) @@ -28,7 +29,10 @@ type Server struct { Store store.Store WebSocketRouter *WebSocketRouter Router *mux.Router - GracefulServer *graceful.Server + Server *http.Server + ListenAddr *net.TCPAddr + + didFinishListen chan struct{} } var allowedMethods []string = []string{ @@ -78,16 +82,6 @@ func (cw *CorsWrapper) ServeHTTP(w http.ResponseWriter, r *http.Request) { const TIME_TO_WAIT_FOR_CONNECTIONS_TO_CLOSE_ON_SERVER_SHUTDOWN = time.Second -func (a *App) NewServer() { - l4g.Info(utils.T("api.server.new_server.init.info")) - - a.Srv = &Server{} -} - -func (a *App) InitStores() { - a.Srv.Store = store.NewLayeredStore(sqlstore.NewSqlSupplier(a.Metrics), a.Metrics, a.Cluster) -} - type VaryBy struct{} func (m *VaryBy) Key(r *http.Request) string { @@ -161,30 +155,45 @@ func (a *App) StartServer() { handler = httpRateLimiter.RateLimit(handler) } - a.Srv.GracefulServer = &graceful.Server{ - Timeout: TIME_TO_WAIT_FOR_CONNECTIONS_TO_CLOSE_ON_SERVER_SHUTDOWN, - Server: &http.Server{ - Addr: *utils.Cfg.ServiceSettings.ListenAddress, - Handler: handlers.RecoveryHandler(handlers.RecoveryLogger(&RecoveryLogger{}), handlers.PrintRecoveryStack(true))(handler), - ReadTimeout: time.Duration(*utils.Cfg.ServiceSettings.ReadTimeout) * time.Second, - WriteTimeout: time.Duration(*utils.Cfg.ServiceSettings.WriteTimeout) * time.Second, - }, + a.Srv.Server = &http.Server{ + Handler: handlers.RecoveryHandler(handlers.RecoveryLogger(&RecoveryLogger{}), handlers.PrintRecoveryStack(true))(handler), + ReadTimeout: time.Duration(*utils.Cfg.ServiceSettings.ReadTimeout) * time.Second, + WriteTimeout: time.Duration(*utils.Cfg.ServiceSettings.WriteTimeout) * time.Second, + } + + addr := *a.Config().ServiceSettings.ListenAddress + if addr == "" { + if *utils.Cfg.ServiceSettings.ConnectionSecurity == model.CONN_SECURITY_TLS { + addr = ":https" + } else { + addr = ":http" + } + } + + listener, err := net.Listen("tcp", addr) + if err != nil { + l4g.Critical(utils.T("api.server.start_server.starting.critical"), err) + return } - l4g.Info(utils.T("api.server.start_server.listening.info"), *utils.Cfg.ServiceSettings.ListenAddress) + a.Srv.ListenAddr = listener.Addr().(*net.TCPAddr) + + l4g.Info(utils.T("api.server.start_server.listening.info"), listener.Addr().String()) if *utils.Cfg.ServiceSettings.Forward80To443 { go func() { - listener, err := net.Listen("tcp", ":80") + redirectListener, err := net.Listen("tcp", ":80") if err != nil { + listener.Close() l4g.Error("Unable to setup forwarding") return } - defer listener.Close() + defer redirectListener.Close() - http.Serve(listener, http.HandlerFunc(redirectHTTPToHTTPS)) + http.Serve(redirectListener, http.HandlerFunc(redirectHTTPToHTTPS)) }() } + a.Srv.didFinishListen = make(chan struct{}) go func() { var err error if *utils.Cfg.ServiceSettings.ConnectionSecurity == model.CONN_SECURITY_TLS { @@ -198,16 +207,73 @@ func (a *App) StartServer() { tlsConfig.NextProtos = append(tlsConfig.NextProtos, "h2") - err = a.Srv.GracefulServer.ListenAndServeTLSConfig(tlsConfig) + a.Srv.Server.TLSConfig = tlsConfig + err = a.Srv.Server.ServeTLS(listener, "", "") } else { - err = a.Srv.GracefulServer.ListenAndServeTLS(*utils.Cfg.ServiceSettings.TLSCertFile, *utils.Cfg.ServiceSettings.TLSKeyFile) + err = a.Srv.Server.ServeTLS(listener, *utils.Cfg.ServiceSettings.TLSCertFile, *utils.Cfg.ServiceSettings.TLSKeyFile) } } else { - err = a.Srv.GracefulServer.ListenAndServe() + err = a.Srv.Server.Serve(listener) } - if err != nil { + if err != nil && err != http.ErrServerClosed { l4g.Critical(utils.T("api.server.start_server.starting.critical"), err) time.Sleep(time.Second) } + close(a.Srv.didFinishListen) }() } + +type tcpKeepAliveListener struct { + *net.TCPListener +} + +func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) { + tc, err := ln.AcceptTCP() + if err != nil { + return + } + tc.SetKeepAlive(true) + tc.SetKeepAlivePeriod(3 * time.Minute) + return tc, nil +} + +func (a *App) Listen(addr string) (net.Listener, error) { + if addr == "" { + addr = ":http" + } + ln, err := net.Listen("tcp", addr) + if err != nil { + return nil, err + } + return tcpKeepAliveListener{ln.(*net.TCPListener)}, nil +} + +func (a *App) StopServer() { + if a.Srv.Server != nil { + ctx, cancel := context.WithTimeout(context.Background(), TIME_TO_WAIT_FOR_CONNECTIONS_TO_CLOSE_ON_SERVER_SHUTDOWN) + defer cancel() + didShutdown := false + for a.Srv.didFinishListen != nil && !didShutdown { + if err := a.Srv.Server.Shutdown(ctx); err != nil { + l4g.Warn(err.Error()) + } + timer := time.NewTimer(time.Millisecond * 50) + select { + case <-a.Srv.didFinishListen: + didShutdown = true + case <-timer.C: + } + timer.Stop() + } + a.Srv.Server.Close() + a.Srv.Server = nil + } +} + +// This is required to re-use the underlying connection and not take up file descriptors +func consumeAndClose(r *http.Response) { + if r.Body != nil { + io.Copy(ioutil.Discard, r.Body) + r.Body.Close() + } +} diff --git a/app/user.go b/app/user.go index b98583f80..edb4961fc 100644 --- a/app/user.go +++ b/app/user.go @@ -438,7 +438,7 @@ func (a *App) GetUsersPage(page int, perPage int, asAdmin bool) ([]*model.User, } func (a *App) GetUsersEtag() string { - return (<-a.Srv.Store.User().GetEtagForAllProfiles()).Data.(string) + return fmt.Sprintf("%v.%v.%v", (<-a.Srv.Store.User().GetEtagForAllProfiles()).Data.(string), utils.Cfg.PrivacySettings.ShowFullName, utils.Cfg.PrivacySettings.ShowEmailAddress) } func (a *App) GetUsersInTeam(teamId string, offset int, limit int) ([]*model.User, *model.AppError) { @@ -492,11 +492,11 @@ func (a *App) GetUsersNotInTeamPage(teamId string, page int, perPage int, asAdmi } func (a *App) GetUsersInTeamEtag(teamId string) string { - return (<-a.Srv.Store.User().GetEtagForProfiles(teamId)).Data.(string) + return fmt.Sprintf("%v.%v.%v", (<-a.Srv.Store.User().GetEtagForProfiles(teamId)).Data.(string), utils.Cfg.PrivacySettings.ShowFullName, utils.Cfg.PrivacySettings.ShowEmailAddress) } func (a *App) GetUsersNotInTeamEtag(teamId string) string { - return (<-a.Srv.Store.User().GetEtagForProfilesNotInTeam(teamId)).Data.(string) + return fmt.Sprintf("%v.%v.%v", (<-a.Srv.Store.User().GetEtagForProfilesNotInTeam(teamId)).Data.(string), utils.Cfg.PrivacySettings.ShowFullName, utils.Cfg.PrivacySettings.ShowEmailAddress) } func (a *App) GetUsersInChannel(channelId string, offset int, limit int) ([]*model.User, *model.AppError) { diff --git a/app/web_conn.go b/app/web_conn.go index 92b54723a..1c74e65a5 100644 --- a/app/web_conn.go +++ b/app/web_conn.go @@ -59,7 +59,7 @@ func (a *App) NewWebConn(ws *websocket.Conn, session model.Session, t goi18n.Tra UserId: session.UserId, T: t, Locale: locale, - endWritePump: make(chan struct{}, 1), + endWritePump: make(chan struct{}, 2), pumpFinished: make(chan struct{}, 1), } @@ -111,13 +111,14 @@ func (c *WebConn) Pump() { ch <- struct{}{} }() c.readPump() + c.endWritePump <- struct{}{} <-ch + c.App.HubUnregister(c) c.pumpFinished <- struct{}{} } func (c *WebConn) readPump() { defer func() { - c.App.HubUnregister(c) c.WebSocket.Close() }() c.WebSocket.SetReadLimit(model.SOCKET_MAX_MESSAGE_SIZE_KB) diff --git a/app/webhook.go b/app/webhook.go index 1530ba94a..231fe1529 100644 --- a/app/webhook.go +++ b/app/webhook.go @@ -109,7 +109,7 @@ func (a *App) TriggerWebhook(payload *model.OutgoingWebhookPayload, hook *model. if resp, err := utils.HttpClient(false).Do(req); err != nil { l4g.Error(utils.T("api.post.handle_webhook_events_and_forget.event_post.error"), err.Error()) } else { - defer CloseBody(resp) + defer consumeAndClose(resp) webhookResp := model.OutgoingWebhookResponseFromJson(resp.Body) if webhookResp != nil && webhookResp.Text != nil { diff --git a/app/webrtc.go b/app/webrtc.go index d2bfffbe0..d8684f1fd 100644 --- a/app/webrtc.go +++ b/app/webrtc.go @@ -23,8 +23,9 @@ func GetWebrtcInfoForSession(sessionId string) (*model.WebrtcInfoResponse, *mode } result := &model.WebrtcInfoResponse{ - Token: token, - GatewayUrl: *utils.Cfg.WebrtcSettings.GatewayWebsocketUrl, + Token: token, + GatewayUrl: *utils.Cfg.WebrtcSettings.GatewayWebsocketUrl, + GatewayType: *utils.Cfg.WebrtcSettings.GatewayType, } if len(*utils.Cfg.WebrtcSettings.StunURI) > 0 { @@ -48,6 +49,16 @@ func GetWebrtcToken(sessionId string) (string, *model.AppError) { return "", model.NewAppError("WebRTC.getWebrtcToken", "api.webrtc.disabled.app_error", nil, "", http.StatusNotImplemented) } + switch strings.ToLower(*utils.Cfg.WebrtcSettings.GatewayType) { + case "kopano-webmeetings": + return GetKopanoWebmeetingsWebrtcToken(sessionId) + default: + // Default to Janus. + return GetJanusWebrtcToken(sessionId) + } +} + +func GetJanusWebrtcToken(sessionId string) (string, *model.AppError) { token := base64.StdEncoding.EncodeToString([]byte(sessionId)) data := make(map[string]string) @@ -62,10 +73,10 @@ func GetWebrtcToken(sessionId string) (string, *model.AppError) { if rp, err := utils.HttpClient(true).Do(rq); err != nil { return "", model.NewAppError("WebRTC.Token", "model.client.connecting.app_error", nil, err.Error(), http.StatusInternalServerError) } else if rp.StatusCode >= 300 { - defer CloseBody(rp) + defer consumeAndClose(rp) return "", model.AppErrorFromJson(rp.Body) } else { - janusResponse := model.GatewayResponseFromJson(rp.Body) + janusResponse := model.JanusGatewayResponseFromJson(rp.Body) if janusResponse.Status != "success" { return "", model.NewAppError("getWebrtcToken", "api.webrtc.register_token.app_error", nil, "", http.StatusInternalServerError) } @@ -74,6 +85,29 @@ func GetWebrtcToken(sessionId string) (string, *model.AppError) { return token, nil } +func GetKopanoWebmeetingsWebrtcToken(sessionId string) (string, *model.AppError) { + data := make(map[string]string) + data["type"] = "Token" + data["id"] = sessionId + + rq, _ := http.NewRequest("POST", *utils.Cfg.WebrtcSettings.GatewayAdminUrl+"/auth/tokens", strings.NewReader(model.MapToJson(data))) + rq.Header.Set("Content-Type", "application/json") + rq.Header.Set("Authorization", "Bearer "+*utils.Cfg.WebrtcSettings.GatewayAdminSecret) + + if rp, err := utils.HttpClient(true).Do(rq); err != nil { + return "", model.NewAppError("WebRTC.Token", "model.client.connecting.app_error", nil, err.Error(), http.StatusInternalServerError) + } else if rp.StatusCode >= 300 { + defer consumeAndClose(rp) + return "", model.AppErrorFromJson(rp.Body) + } else { + kwmResponse := model.KopanoWebmeetingsResponseFromJson(rp.Body) + if kwmResponse.Value == "" { + return "", model.NewAppError("getWebrtcToken", "api.webrtc.register_token.app_error", nil, "", http.StatusInternalServerError) + } + return kwmResponse.Value, nil + } +} + func GenerateTurnPassword(username string, secret string) string { key := []byte(secret) h := hmac.New(sha1.New, key) diff --git a/app/websocket_router.go b/app/websocket_router.go index cad53ade7..6bc3a6ff7 100644 --- a/app/websocket_router.go +++ b/app/websocket_router.go @@ -21,13 +21,6 @@ type WebSocketRouter struct { handlers map[string]webSocketHandler } -func (a *App) NewWebSocketRouter() *WebSocketRouter { - return &WebSocketRouter{ - app: a, - handlers: make(map[string]webSocketHandler), - } -} - func (wr *WebSocketRouter) Handle(action string, handler webSocketHandler) { wr.handlers[action] = handler } |