From 1e5c432e1029601a664454388ae366ef69618d62 Mon Sep 17 00:00:00 2001 From: Christopher Speller Date: Mon, 25 Jun 2018 12:33:13 -0700 Subject: MM-10702 Moving plugins to use hashicorp go-plugin. (#8978) * Moving plugins to use hashicorp go-plugin. * Tweaks from feedback. --- plugin/client_rpc.go | 331 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 331 insertions(+) create mode 100644 plugin/client_rpc.go (limited to 'plugin/client_rpc.go') diff --git a/plugin/client_rpc.go b/plugin/client_rpc.go new file mode 100644 index 000000000..159d41201 --- /dev/null +++ b/plugin/client_rpc.go @@ -0,0 +1,331 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +//go:generate go run interface_generator/main.go + +package plugin + +import ( + "bytes" + "encoding/gob" + "encoding/json" + "io/ioutil" + "net/http" + "net/rpc" + "reflect" + + "github.com/hashicorp/go-plugin" + "github.com/mattermost/mattermost-server/mlog" + "github.com/mattermost/mattermost-server/model" +) + +var HookNameToId map[string]int = make(map[string]int) + +type HooksRPCClient struct { + client *rpc.Client + log *mlog.Logger + muxBroker *plugin.MuxBroker + apiImpl API + implemented [TotalHooksId]bool +} + +type HooksRPCServer struct { + impl interface{} + muxBroker *plugin.MuxBroker + apiRPCClient *APIRPCClient + log *mlog.Logger +} + +// Implements hashicorp/go-plugin/plugin.Plugin interface to connect the hooks of a plugin +type HooksPlugin struct { + hooks interface{} + apiImpl API + log *mlog.Logger +} + +func (p *HooksPlugin) Server(b *plugin.MuxBroker) (interface{}, error) { + return &HooksRPCServer{impl: p.hooks, muxBroker: b}, nil +} + +func (p *HooksPlugin) Client(b *plugin.MuxBroker, client *rpc.Client) (interface{}, error) { + return &HooksRPCClient{client: client, log: p.log, muxBroker: b, apiImpl: p.apiImpl}, nil +} + +type APIRPCClient struct { + client *rpc.Client + log *mlog.Logger +} + +type APIRPCServer struct { + impl API +} + +// Registering some types used by MM for encoding/gob used by rpc +func init() { + gob.Register([]*model.SlackAttachment{}) + gob.Register([]interface{}{}) + gob.Register(map[string]interface{}{}) +} + +// These enforce compile time checks to make sure types implement the interface +// If you are getting an error here, you probably need to run `make pluginapi` to +// autogenerate RPC glue code +var _ plugin.Plugin = &HooksPlugin{} +var _ Hooks = &HooksRPCClient{} + +// +// Below are specal cases for hooks or APIs that can not be auto generated +// + +func (g *HooksRPCClient) Implemented() (impl []string, err error) { + err = g.client.Call("Plugin.Implemented", struct{}{}, &impl) + for _, hookName := range impl { + if hookId, ok := HookNameToId[hookName]; ok { + g.implemented[hookId] = true + } + } + return +} + +// Implemented replies with the names of the hooks that are implemented. +func (s *HooksRPCServer) Implemented(args struct{}, reply *[]string) error { + ifaceType := reflect.TypeOf((*Hooks)(nil)).Elem() + implType := reflect.TypeOf(s.impl) + selfType := reflect.TypeOf(s) + var methods []string + for i := 0; i < ifaceType.NumMethod(); i++ { + method := ifaceType.Method(i) + if m, ok := implType.MethodByName(method.Name); !ok { + continue + } else if m.Type.NumIn() != method.Type.NumIn()+1 { + continue + } else if m.Type.NumOut() != method.Type.NumOut() { + continue + } else { + match := true + for j := 0; j < method.Type.NumIn(); j++ { + if m.Type.In(j+1) != method.Type.In(j) { + match = false + break + } + } + for j := 0; j < method.Type.NumOut(); j++ { + if m.Type.Out(j) != method.Type.Out(j) { + match = false + break + } + } + if !match { + continue + } + } + if _, ok := selfType.MethodByName(method.Name); !ok { + continue + } + methods = append(methods, method.Name) + } + *reply = methods + return nil +} + +type OnActivateArgs struct { + APIMuxId uint32 +} + +type OnActivateReturns struct { + A error +} + +func (g *HooksRPCClient) OnActivate() error { + muxId := g.muxBroker.NextId() + go g.muxBroker.AcceptAndServe(muxId, &APIRPCServer{ + impl: g.apiImpl, + }) + + _args := &OnActivateArgs{ + APIMuxId: muxId, + } + _returns := &OnActivateReturns{} + + if err := g.client.Call("Plugin.OnActivate", _args, _returns); err != nil { + g.log.Error("RPC call to OnActivate plugin failed.", mlog.Err(err)) + } + return _returns.A +} + +func (s *HooksRPCServer) OnActivate(args *OnActivateArgs, returns *OnActivateReturns) error { + connection, err := s.muxBroker.Dial(args.APIMuxId) + if err != nil { + return err // Where does this go? + } + + // Settings for this should come from the parent process, for now just set it up + // though stdout. + logger := mlog.NewLogger(&mlog.LoggerConfiguration{ + EnableConsole: true, + ConsoleJson: true, + ConsoleLevel: mlog.LevelDebug, + EnableFile: false, + }) + logger = logger.With(mlog.Bool("plugin_subprocess", true)) + + s.log = logger + + s.apiRPCClient = &APIRPCClient{ + client: rpc.NewClient(connection), + log: logger, + } + + if mmplugin, ok := s.impl.(interface { + SetAPI(api API) + OnConfigurationChange() error + }); !ok { + } else { + mmplugin.SetAPI(s.apiRPCClient) + mmplugin.OnConfigurationChange() + } + + if hook, ok := s.impl.(interface { + OnActivate() error + }); ok { + returns.A = hook.OnActivate() + } + return nil +} + +type LoadPluginConfigurationArgs struct { +} + +type LoadPluginConfigurationReturns struct { + A []byte +} + +func (g *APIRPCClient) LoadPluginConfiguration(dest interface{}) error { + _args := &LoadPluginConfigurationArgs{} + _returns := &LoadPluginConfigurationReturns{} + if err := g.client.Call("Plugin.LoadPluginConfiguration", _args, _returns); err != nil { + g.log.Error("RPC call to LoadPluginConfiguration API failed.", mlog.Err(err)) + } + return json.Unmarshal(_returns.A, dest) +} + +func (s *APIRPCServer) LoadPluginConfiguration(args *LoadPluginConfigurationArgs, returns *LoadPluginConfigurationReturns) error { + var config interface{} + if hook, ok := s.impl.(interface { + LoadPluginConfiguration(dest interface{}) error + }); ok { + if err := hook.LoadPluginConfiguration(&config); err != nil { + return err + } + } + b, err := json.Marshal(config) + if err != nil { + return err + } + returns.A = b + return nil +} + +func init() { + HookNameToId["ServeHTTP"] = ServeHTTPId +} + +type ServeHTTPArgs struct { + ResponseWriterStream uint32 + Request *http.Request + RequestBodyStream uint32 +} + +func (g *HooksRPCClient) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if !g.implemented[ServeHTTPId] { + http.NotFound(w, r) + return + } + + serveHTTPStreamId := g.muxBroker.NextId() + go func() { + connection, err := g.muxBroker.Accept(serveHTTPStreamId) + if err != nil { + g.log.Error("Plugin failed to ServeHTTP, muxBroker couldn't accept connection", mlog.Uint32("serve_http_stream_id", serveHTTPStreamId), mlog.Err(err)) + http.Error(w, "500 internal server error", http.StatusInternalServerError) + return + } + defer connection.Close() + + rpcServer := rpc.NewServer() + if err := rpcServer.RegisterName("Plugin", &HTTPResponseWriterRPCServer{w: w}); err != nil { + g.log.Error("Plugin failed to ServeHTTP, coulden't register RPC name", mlog.Err(err)) + http.Error(w, "500 internal server error", http.StatusInternalServerError) + return + } + rpcServer.ServeConn(connection) + }() + + requestBodyStreamId := uint32(0) + if r.Body != nil { + requestBodyStreamId = g.muxBroker.NextId() + go func() { + bodyConnection, err := g.muxBroker.Accept(requestBodyStreamId) + if err != nil { + g.log.Error("Plugin failed to ServeHTTP, muxBroker couldn't Accept request body connecion", mlog.Err(err)) + http.Error(w, "500 internal server error", http.StatusInternalServerError) + return + } + defer bodyConnection.Close() + ServeIOReader(r.Body, bodyConnection) + }() + } + + forwardedRequest := &http.Request{ + Method: r.Method, + URL: r.URL, + Proto: r.Proto, + ProtoMajor: r.ProtoMajor, + ProtoMinor: r.ProtoMinor, + Header: r.Header, + Host: r.Host, + RemoteAddr: r.RemoteAddr, + RequestURI: r.RequestURI, + } + + if err := g.client.Call("Plugin.ServeHTTP", ServeHTTPArgs{ + ResponseWriterStream: serveHTTPStreamId, + Request: forwardedRequest, + RequestBodyStream: requestBodyStreamId, + }, nil); err != nil { + mlog.Error("Plugin failed to ServeHTTP, RPC call failed", mlog.Err(err)) + http.Error(w, "500 internal server error", http.StatusInternalServerError) + } + return +} + +func (s *HooksRPCServer) ServeHTTP(args *ServeHTTPArgs, returns *struct{}) error { + connection, err := s.muxBroker.Dial(args.ResponseWriterStream) + if err != nil { + s.log.Debug("Can't connect to remote response writer stream", mlog.Err(err)) + return err + } + w := ConnectHTTPResponseWriter(connection) + defer w.Close() + + r := args.Request + if args.RequestBodyStream != 0 { + connection, err := s.muxBroker.Dial(args.RequestBodyStream) + if err != nil { + s.log.Debug("Can't connect to remote response writer stream", mlog.Err(err)) + return err + } + r.Body = ConnectIOReader(connection) + } else { + r.Body = ioutil.NopCloser(&bytes.Buffer{}) + } + defer r.Body.Close() + + if hook, ok := s.impl.(http.Handler); ok { + hook.ServeHTTP(w, r) + } else { + http.NotFound(w, r) + } + + return nil +} -- cgit v1.2.3-1-g7c22 From 83a3ac089cff0d05559e6ba5c2c60b09f5cae176 Mon Sep 17 00:00:00 2001 From: Christopher Speller Date: Tue, 3 Jul 2018 09:58:28 -0700 Subject: MM-11029 Adding plugin logging functionality. (#9034) * Capturing stdout, stderr of plugins in logs. * Cleanup go-plugin debug logs. * Adding logging to plugin API * Generating mocks. * godoc convention --- plugin/client_rpc.go | 29 +++++++++++------------------ 1 file changed, 11 insertions(+), 18 deletions(-) (limited to 'plugin/client_rpc.go') diff --git a/plugin/client_rpc.go b/plugin/client_rpc.go index 159d41201..f58bbd22b 100644 --- a/plugin/client_rpc.go +++ b/plugin/client_rpc.go @@ -9,9 +9,12 @@ import ( "bytes" "encoding/gob" "encoding/json" + "fmt" "io/ioutil" + "log" "net/http" "net/rpc" + "os" "reflect" "github.com/hashicorp/go-plugin" @@ -33,7 +36,6 @@ type HooksRPCServer struct { impl interface{} muxBroker *plugin.MuxBroker apiRPCClient *APIRPCClient - log *mlog.Logger } // Implements hashicorp/go-plugin/plugin.Plugin interface to connect the hooks of a plugin @@ -156,24 +158,11 @@ func (g *HooksRPCClient) OnActivate() error { func (s *HooksRPCServer) OnActivate(args *OnActivateArgs, returns *OnActivateReturns) error { connection, err := s.muxBroker.Dial(args.APIMuxId) if err != nil { - return err // Where does this go? + return err } - // Settings for this should come from the parent process, for now just set it up - // though stdout. - logger := mlog.NewLogger(&mlog.LoggerConfiguration{ - EnableConsole: true, - ConsoleJson: true, - ConsoleLevel: mlog.LevelDebug, - EnableFile: false, - }) - logger = logger.With(mlog.Bool("plugin_subprocess", true)) - - s.log = logger - s.apiRPCClient = &APIRPCClient{ client: rpc.NewClient(connection), - log: logger, } if mmplugin, ok := s.impl.(interface { @@ -185,6 +174,10 @@ func (s *HooksRPCServer) OnActivate(args *OnActivateArgs, returns *OnActivateRet mmplugin.OnConfigurationChange() } + // Capture output of standard logger because go-plugin + // redirects it. + log.SetOutput(os.Stderr) + if hook, ok := s.impl.(interface { OnActivate() error }); ok { @@ -293,7 +286,7 @@ func (g *HooksRPCClient) ServeHTTP(w http.ResponseWriter, r *http.Request) { Request: forwardedRequest, RequestBodyStream: requestBodyStreamId, }, nil); err != nil { - mlog.Error("Plugin failed to ServeHTTP, RPC call failed", mlog.Err(err)) + g.log.Error("Plugin failed to ServeHTTP, RPC call failed", mlog.Err(err)) http.Error(w, "500 internal server error", http.StatusInternalServerError) } return @@ -302,7 +295,7 @@ func (g *HooksRPCClient) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (s *HooksRPCServer) ServeHTTP(args *ServeHTTPArgs, returns *struct{}) error { connection, err := s.muxBroker.Dial(args.ResponseWriterStream) if err != nil { - s.log.Debug("Can't connect to remote response writer stream", mlog.Err(err)) + fmt.Fprintf(os.Stderr, "[ERROR] Can't connect to remote response writer stream, error: %v", err.Error()) return err } w := ConnectHTTPResponseWriter(connection) @@ -312,7 +305,7 @@ func (s *HooksRPCServer) ServeHTTP(args *ServeHTTPArgs, returns *struct{}) error if args.RequestBodyStream != 0 { connection, err := s.muxBroker.Dial(args.RequestBodyStream) if err != nil { - s.log.Debug("Can't connect to remote response writer stream", mlog.Err(err)) + fmt.Fprintf(os.Stderr, "[ERROR] Can't connect to remote request body stream, error: %v", err.Error()) return err } r.Body = ConnectIOReader(connection) -- cgit v1.2.3-1-g7c22 From 4c1ddcff10b359baf5728b334acb60cc3e1b1123 Mon Sep 17 00:00:00 2001 From: Christopher Speller Date: Fri, 6 Jul 2018 06:07:09 -0700 Subject: MM-10703 Adding blank request context to plugin hooks for future use. (#9043) * Adding blank request context to plugin hooks for future use. * Rename RequestContext to Context * Adding context to ServeHTTP and ExecuteCommand * Fixing import cycle in test. --- plugin/client_rpc.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) (limited to 'plugin/client_rpc.go') diff --git a/plugin/client_rpc.go b/plugin/client_rpc.go index f58bbd22b..39d91a3e7 100644 --- a/plugin/client_rpc.go +++ b/plugin/client_rpc.go @@ -226,10 +226,11 @@ func init() { type ServeHTTPArgs struct { ResponseWriterStream uint32 Request *http.Request + Context *Context RequestBodyStream uint32 } -func (g *HooksRPCClient) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (g *HooksRPCClient) ServeHTTP(c *Context, w http.ResponseWriter, r *http.Request) { if !g.implemented[ServeHTTPId] { http.NotFound(w, r) return @@ -282,6 +283,7 @@ func (g *HooksRPCClient) ServeHTTP(w http.ResponseWriter, r *http.Request) { } if err := g.client.Call("Plugin.ServeHTTP", ServeHTTPArgs{ + Context: c, ResponseWriterStream: serveHTTPStreamId, Request: forwardedRequest, RequestBodyStream: requestBodyStreamId, @@ -314,8 +316,10 @@ func (s *HooksRPCServer) ServeHTTP(args *ServeHTTPArgs, returns *struct{}) error } defer r.Body.Close() - if hook, ok := s.impl.(http.Handler); ok { - hook.ServeHTTP(w, r) + if hook, ok := s.impl.(interface { + ServeHTTP(c *Context, w http.ResponseWriter, r *http.Request) + }); ok { + hook.ServeHTTP(args.Context, w, r) } else { http.NotFound(w, r) } -- cgit v1.2.3-1-g7c22