// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. // See LICENSE.txt for license information. //go:generate go run interface_generator/main.go package plugin import ( "bytes" "database/sql" "database/sql/driver" "encoding/gob" "encoding/json" "fmt" "io" "log" "net/http" "net/rpc" "net/url" "os" "reflect" "sync" "github.com/hashicorp/go-plugin" "github.com/lib/pq" "github.com/mattermost/mattermost/server/public/model" "github.com/mattermost/mattermost/server/public/shared/mlog" ) var hookNameToId = make(map[string]int) type hooksRPCClient struct { client *rpc.Client log *mlog.Logger muxBroker *plugin.MuxBroker apiImpl API driver Driver implemented [TotalHooksID]bool doneWg sync.WaitGroup } type hooksRPCServer struct { impl any muxBroker *plugin.MuxBroker apiRPCClient *apiRPCClient } // Implements hashicorp/go-plugin/plugin.Plugin interface to connect the hooks of a plugin type hooksPlugin struct { hooks any apiImpl API driverImpl Driver log *mlog.Logger } func (p *hooksPlugin) Server(b *plugin.MuxBroker) (any, error) { return &hooksRPCServer{impl: p.hooks, muxBroker: b}, nil } func (p *hooksPlugin) Client(b *plugin.MuxBroker, client *rpc.Client) (any, error) { return &hooksRPCClient{ client: client, log: p.log, muxBroker: b, apiImpl: p.apiImpl, driver: p.driverImpl, }, nil } type apiRPCClient struct { client *rpc.Client muxBroker *plugin.MuxBroker } type apiRPCServer struct { impl API muxBroker *plugin.MuxBroker } // ErrorString is a fallback for sending unregistered implementations of the error interface across // rpc. For example, the errorString type from the github.com/pkg/errors package cannot be // registered since it is not exported, but this precludes common error handling paradigms. // ErrorString merely preserves the string description of the error, while satisfying the error // interface itself to allow other registered types (such as model.AppError) to be sent unmodified. type ErrorString struct { Code int // Code to map to various error variables Err string } func (e ErrorString) Error() string { return e.Err } func encodableError(err error) error { if err == nil { return nil } if _, ok := err.(*model.AppError); ok { return err } if _, ok := err.(*pq.Error); ok { return err } ret := &ErrorString{ Err: err.Error(), } switch err { case io.EOF: ret.Code = 1 case sql.ErrNoRows: ret.Code = 2 case sql.ErrConnDone: ret.Code = 3 case sql.ErrTxDone: ret.Code = 4 case driver.ErrSkip: ret.Code = 5 case driver.ErrBadConn: ret.Code = 6 case driver.ErrRemoveArgument: ret.Code = 7 } return ret } func decodableError(err error) error { if encErr, ok := err.(*ErrorString); ok { switch encErr.Code { case 1: return io.EOF case 2: return sql.ErrNoRows case 3: return sql.ErrConnDone case 4: return sql.ErrTxDone case 5: return driver.ErrSkip case 6: return driver.ErrBadConn case 7: return driver.ErrRemoveArgument } } return err } // Registering some types used by MM for encoding/gob used by rpc func init() { gob.Register([]*model.SlackAttachment{}) gob.Register([]any{}) gob.Register(map[string]any{}) gob.Register(&model.AppError{}) gob.Register(&pq.Error{}) gob.Register(&ErrorString{}) gob.Register(&model.AutocompleteDynamicListArg{}) gob.Register(&model.AutocompleteStaticListArg{}) gob.Register(&model.AutocompleteTextArg{}) gob.Register(&model.PreviewPost{}) gob.Register(model.PropertyOptions[*model.PluginPropertyOption]{}) } // These enforce compile time checks to make sure types implement the interface // If you are getting an error here, you probably need to run `make pluginapi` to // autogenerate RPC glue code var ( _ plugin.Plugin = &hooksPlugin{} _ Hooks = &hooksRPCClient{} ) // // Below are special cases for hooks or APIs that can not be auto generated // func (g *hooksRPCClient) Implemented() (impl []string, err error) { err = g.client.Call("Plugin.Implemented", struct{}{}, &impl) for _, hookName := range impl { if hookId, ok := hookNameToId[hookName]; ok { g.implemented[hookId] = true } } return } // Implemented replies with the names of the hooks that are implemented. func (s *hooksRPCServer) Implemented(args struct{}, reply *[]string) error { ifaceType := reflect.TypeFor[Hooks]() implType := reflect.TypeOf(s.impl) selfType := reflect.TypeFor[*hooksRPCServer]() var methods []string for i := 0; i < ifaceType.NumMethod(); i++ { method := ifaceType.Method(i) m, ok := implType.MethodByName(method.Name) if !ok { continue } else if m.Type.NumIn() != method.Type.NumIn()+1 { continue } else if m.Type.NumOut() != method.Type.NumOut() { continue } match := true for j := 0; j < method.Type.NumIn(); j++ { if m.Type.In(j+1) != method.Type.In(j) { match = false break } } for j := 0; j < method.Type.NumOut(); j++ { if m.Type.Out(j) != method.Type.Out(j) { match = false break } } if !match { continue } if _, ok := selfType.MethodByName(method.Name); !ok { continue } methods = append(methods, method.Name) } *reply = methods return encodableError(nil) } type Z_OnActivateArgs struct { APIMuxId uint32 DriverMuxId uint32 } type Z_OnActivateReturns struct { A error } func (g *hooksRPCClient) OnActivate() error { muxId := g.muxBroker.NextId() g.doneWg.Add(1) go func() { defer g.doneWg.Done() g.muxBroker.AcceptAndServe(muxId, &apiRPCServer{ impl: g.apiImpl, muxBroker: g.muxBroker, }) }() nextID := g.muxBroker.NextId() g.doneWg.Add(1) go func() { defer g.doneWg.Done() g.muxBroker.AcceptAndServe(nextID, &dbRPCServer{ dbImpl: g.driver, }) }() _args := &Z_OnActivateArgs{ APIMuxId: muxId, DriverMuxId: nextID, } _returns := &Z_OnActivateReturns{} if err := g.client.Call("Plugin.OnActivate", _args, _returns); err != nil { g.log.Error("RPC call to OnActivate plugin failed.", mlog.Err(err)) } return _returns.A } func (s *hooksRPCServer) OnActivate(args *Z_OnActivateArgs, returns *Z_OnActivateReturns) error { connection, err := s.muxBroker.Dial(args.APIMuxId) if err != nil { return err } conn2, err := s.muxBroker.Dial(args.DriverMuxId) if err != nil { return err } s.apiRPCClient = &apiRPCClient{ client: rpc.NewClient(connection), muxBroker: s.muxBroker, } dbClient := &dbRPCClient{ client: rpc.NewClient(conn2), } if mmplugin, ok := s.impl.(interface { SetAPI(api API) SetDriver(driver Driver) }); ok { mmplugin.SetAPI(s.apiRPCClient) mmplugin.SetDriver(dbClient) } if mmplugin, ok := s.impl.(interface { OnConfigurationChange() error }); ok { if err := mmplugin.OnConfigurationChange(); err != nil { fmt.Fprintf(os.Stderr, "[ERROR] call to OnConfigurationChange failed, error: %v", err.Error()) } } // Capture output of standard logger because go-plugin // redirects it. log.SetOutput(os.Stderr) if hook, ok := s.impl.(interface { OnActivate() error }); ok { returns.A = encodableError(hook.OnActivate()) } return nil } type Z_LoadPluginConfigurationArgsArgs struct{} type Z_LoadPluginConfigurationArgsReturns struct { A []byte } func (g *apiRPCClient) LoadPluginConfiguration(dest any) error { _args := &Z_LoadPluginConfigurationArgsArgs{} _returns := &Z_LoadPluginConfigurationArgsReturns{} if err := g.client.Call("Plugin.LoadPluginConfiguration", _args, _returns); err != nil { log.Printf("RPC call to LoadPluginConfiguration API failed: %s", err.Error()) } if err := json.Unmarshal(_returns.A, dest); err != nil { log.Printf("LoadPluginConfiguration API failed to unmarshal: %s", err.Error()) } return nil } func (s *apiRPCServer) LoadPluginConfiguration(args *Z_LoadPluginConfigurationArgsArgs, returns *Z_LoadPluginConfigurationArgsReturns) error { var config any if hook, ok := s.impl.(interface { LoadPluginConfiguration(dest any) error }); ok { if err := hook.LoadPluginConfiguration(&config); err != nil { return err } } b, err := json.Marshal(config) if err != nil { return err } returns.A = b return nil } func init() { hookNameToId["ServeHTTP"] = ServeHTTPID } // Using a subset of http.Request prevents a known incompatibility when decoding Go v1.23+ gob-encoded x509.Certificate // structs from Go v1.22 compiled plugins. These come from http.Request.TLS field (*tls.ConnectionState). type HTTPRequestSubset struct { Method string URL *url.URL Proto string ProtoMajor int ProtoMinor int Header http.Header Host string RemoteAddr string RequestURI string Body io.ReadCloser } func (r *HTTPRequestSubset) GetHTTPRequest() *http.Request { return &http.Request{ Method: r.Method, URL: r.URL, Proto: r.Proto, ProtoMajor: r.ProtoMajor, ProtoMinor: r.ProtoMinor, Header: r.Header, Host: r.Host, RemoteAddr: r.RemoteAddr, RequestURI: r.RequestURI, Body: r.Body, } } type Z_ServeHTTPArgs struct { ResponseWriterStream uint32 Request *HTTPRequestSubset Context *Context RequestBodyStream uint32 } func (g *hooksRPCClient) ServeHTTP(c *Context, w http.ResponseWriter, r *http.Request) { if !g.implemented[ServeHTTPID] { http.NotFound(w, r) return } serveHTTPStreamId := g.muxBroker.NextId() go func() { connection, err := g.muxBroker.Accept(serveHTTPStreamId) if err != nil { g.log.Error("Plugin failed to ServeHTTP, muxBroker couldn't accept connection", mlog.Uint("serve_http_stream_id", serveHTTPStreamId), mlog.Err(err)) return } defer connection.Close() rpcServer := rpc.NewServer() if err := rpcServer.RegisterName("Plugin", &httpResponseWriterRPCServer{w: w, log: g.log}); err != nil { g.log.Error("Plugin failed to ServeHTTP, couldn't register RPC name", mlog.Err(err)) return } rpcServer.ServeConn(connection) }() requestBodyStreamId := uint32(0) if r.Body != nil { requestBodyStreamId = g.muxBroker.NextId() go func() { bodyConnection, err := g.muxBroker.Accept(requestBodyStreamId) if err != nil { g.log.Error("Plugin failed to ServeHTTP, muxBroker couldn't Accept request body connection", mlog.Err(err)) return } defer bodyConnection.Close() serveIOReader(r.Body, bodyConnection) }() } forwardedRequest := &HTTPRequestSubset{ Method: r.Method, URL: r.URL, Proto: r.Proto, ProtoMajor: r.ProtoMajor, ProtoMinor: r.ProtoMinor, Header: r.Header, Host: r.Host, RemoteAddr: r.RemoteAddr, RequestURI: r.RequestURI, } if err := g.client.Call("Plugin.ServeHTTP", Z_ServeHTTPArgs{ Context: c, ResponseWriterStream: serveHTTPStreamId, Request: forwardedRequest, RequestBodyStream: requestBodyStreamId, }, nil); err != nil { g.log.Error("Plugin failed to ServeHTTP, RPC call failed", mlog.Err(err)) http.Error(w, "500 internal server error", http.StatusInternalServerError) } } func (s *hooksRPCServer) ServeHTTP(args *Z_ServeHTTPArgs, returns *struct{}) error { connection, err := s.muxBroker.Dial(args.ResponseWriterStream) if err != nil { fmt.Fprintf(os.Stderr, "[ERROR] Can't connect to remote response writer stream, error: %v", err.Error()) return err } w := connectHTTPResponseWriter(connection) defer w.Close() r := args.Request if args.RequestBodyStream != 0 { connection, err := s.muxBroker.Dial(args.RequestBodyStream) if err != nil { fmt.Fprintf(os.Stderr, "[ERROR] Can't connect to remote request body stream, error: %v", err.Error()) return err } r.Body = connectIOReader(connection) } else { r.Body = io.NopCloser(&bytes.Buffer{}) } defer r.Body.Close() httpReq := r.GetHTTPRequest() if hook, ok := s.impl.(interface { ServeHTTP(c *Context, w http.ResponseWriter, r *http.Request) }); ok { hook.ServeHTTP(args.Context, w, httpReq) } else { http.NotFound(w, httpReq) } return nil } // PluginHTTPStream - Streaming version of PluginHTTP that uses MuxBroker for streaming request/response bodies. // This avoids buffering large payloads in memory. // Legacy buffered structs (kept for backward compatibility with old servers) type Z_PluginHTTPArgs struct { Request *HTTPRequestSubset RequestBody []byte } type Z_PluginHTTPReturns struct { Response *http.Response ResponseBody []byte } // New streaming structs type Z_PluginHTTPStreamArgs struct { ResponseBodyStream uint32 Request *HTTPRequestSubset RequestBodyStream uint32 } type Z_PluginHTTPStreamReturns struct { StatusCode int Header http.Header } func (g *apiRPCClient) PluginHTTP(request *http.Request) *http.Response { // Try to use the streaming version first (if server supports it) // Fall back to buffered version if not available (signaled by nil) response, err := g.pluginHTTPStream(request) if err != nil { // If we error for some other reason other than stream not being // implemented just report and fail log.Print(err.Error()) return nil } if response != nil { return response } // Fallback to buffered version return g.pluginHTTPBuffered(request) } // pluginHTTPStream attempts to use the new streaming endpoint func (g *apiRPCClient) pluginHTTPStream(request *http.Request) (*http.Response, error) { // Set up request body stream requestBodyStreamId := uint32(0) if request.Body != nil { requestBodyStreamId = g.muxBroker.NextId() go func() { bodyConnection, err := g.muxBroker.Accept(requestBodyStreamId) if err != nil { log.Printf("Plugin failed to accept request body connection for PluginHTTPStream: %s", err.Error()) return } defer bodyConnection.Close() serveIOReader(request.Body, bodyConnection) }() } // Set up response body stream responseBodyStreamId := g.muxBroker.NextId() responsePipe := make(chan io.ReadCloser, 1) go func() { connection, err := g.muxBroker.Accept(responseBodyStreamId) if err != nil { log.Printf("Plugin failed to accept response body connection for PluginHTTPStream: %s", err.Error()) responsePipe <- nil return } // Don't close connection here - it will be closed when response body is read responsePipe <- connectIOReader(connection) }() forwardedRequest := &HTTPRequestSubset{ Method: request.Method, URL: request.URL, Proto: request.Proto, ProtoMajor: request.ProtoMajor, ProtoMinor: request.ProtoMinor, Header: request.Header, Host: request.Host, RemoteAddr: request.RemoteAddr, RequestURI: request.RequestURI, } _args := &Z_PluginHTTPStreamArgs{ ResponseBodyStream: responseBodyStreamId, Request: forwardedRequest, RequestBodyStream: requestBodyStreamId, } _returns := &Z_PluginHTTPStreamReturns{} if err := g.client.Call("Plugin.PluginHTTPStream", _args, _returns); err != nil { // If the method doesn't exist, return nil to trigger fallback if err.Error() == "rpc: can't find method Plugin.PluginHTTPStream" { return nil, nil } return nil, fmt.Errorf("RPC call to PluginHTTPStream API failed: %w", err) } // Wait for response body reader responseBody := <-responsePipe if responseBody == nil { return nil, fmt.Errorf("Failed to get response body stream for PluginHTTPStream") } // Create response with streamed body response := &http.Response{ StatusCode: _returns.StatusCode, Header: _returns.Header, Body: responseBody, Proto: request.Proto, ProtoMajor: request.ProtoMajor, ProtoMinor: request.ProtoMinor, } return response, nil } // pluginHTTPBuffered is the original buffered implementation func (g *apiRPCClient) pluginHTTPBuffered(request *http.Request) *http.Response { forwardedRequest := &HTTPRequestSubset{ Method: request.Method, URL: request.URL, Proto: request.Proto, ProtoMajor: request.ProtoMajor, ProtoMinor: request.ProtoMinor, Header: request.Header, Host: request.Host, RemoteAddr: request.RemoteAddr, RequestURI: request.RequestURI, } _args := &Z_PluginHTTPArgs{ Request: forwardedRequest, } if request.Body != nil { requestBody, err := io.ReadAll(request.Body) if err != nil { log.Printf("RPC call to PluginHTTP API failed: %s", err.Error()) return nil } request.Body.Close() request.Body = nil _args.RequestBody = requestBody } _returns := &Z_PluginHTTPReturns{} if err := g.client.Call("Plugin.PluginHTTP", _args, _returns); err != nil { log.Printf("RPC call to PluginHTTP API failed: %s", err.Error()) return nil } _returns.Response.Body = io.NopCloser(bytes.NewBuffer(_returns.ResponseBody)) return _returns.Response } func (s *apiRPCServer) PluginHTTPStream(args *Z_PluginHTTPStreamArgs, returns *Z_PluginHTTPStreamReturns) error { responseConnection, err := s.muxBroker.Dial(args.ResponseBodyStream) if err != nil { return encodableError(fmt.Errorf("can't connect to remote response body stream: %w", err)) } // Connect to request body stream r := args.Request if args.RequestBodyStream != 0 { requestConnection, err := s.muxBroker.Dial(args.RequestBodyStream) if err != nil { return encodableError(fmt.Errorf("can't connect to remote request body stream: %w", err)) } r.Body = connectIOReader(requestConnection) } else { r.Body = io.NopCloser(&bytes.Buffer{}) } httpReq := r.GetHTTPRequest() // Call the PluginHTTP implementation if hook, ok := s.impl.(interface { PluginHTTP(request *http.Request) *http.Response }); ok { response := hook.PluginHTTP(httpReq) if response != nil { returns.StatusCode = response.StatusCode returns.Header = response.Header // Connect to response body stream and stream the response body go func() { defer r.Body.Close() if response.Body != nil { // Stream the response body through the connection if _, err := io.Copy(responseConnection, response.Body); err != nil { log.Printf("error streaming response body: %s", err.Error()) } response.Body.Close() } responseConnection.Close() }() } else { r.Body.Close() } } else { r.Body.Close() return encodableError(fmt.Errorf("API PluginHTTP called but not implemented")) } return nil } // Server-side handler for old buffered PluginHTTP (for backward compatibility) func (s *apiRPCServer) PluginHTTP(args *Z_PluginHTTPArgs, returns *Z_PluginHTTPReturns) error { args.Request.Body = io.NopCloser(bytes.NewBuffer(args.RequestBody)) if hook, ok := s.impl.(interface { PluginHTTP(request *http.Request) *http.Response }); ok { response := hook.PluginHTTP(args.Request.GetHTTPRequest()) responseBody, err := io.ReadAll(response.Body) if err != nil { return encodableError(fmt.Errorf("RPC call to PluginHTTP API failed: %s", err.Error())) } response.Body.Close() response.Body = nil returns.Response = response returns.ResponseBody = responseBody } else { return encodableError(fmt.Errorf("API PluginHTTP called but not implemented")) } return nil } func init() { hookNameToId["FileWillBeUploaded"] = FileWillBeUploadedID } type Z_FileWillBeUploadedArgs struct { A *Context B *model.FileInfo UploadedFileStream uint32 ReplacementFileStream uint32 } type Z_FileWillBeUploadedReturns struct { A *model.FileInfo B string } func (g *hooksRPCClient) FileWillBeUploaded(c *Context, info *model.FileInfo, file io.Reader, output io.Writer) (*model.FileInfo, string) { if !g.implemented[FileWillBeUploadedID] { return info, "" } uploadedFileStreamId := g.muxBroker.NextId() go func() { uploadedFileConnection, err := g.muxBroker.Accept(uploadedFileStreamId) if err != nil { g.log.Error("Plugin failed to serve upload file stream. MuxBroker could not Accept connection", mlog.Err(err)) return } defer uploadedFileConnection.Close() serveIOReader(file, uploadedFileConnection) }() replacementDone := make(chan bool) replacementFileStreamId := g.muxBroker.NextId() go func() { defer close(replacementDone) replacementFileConnection, err := g.muxBroker.Accept(replacementFileStreamId) if err != nil { g.log.Error("Plugin failed to serve replacement file stream. MuxBroker could not Accept connection", mlog.Err(err)) return } defer replacementFileConnection.Close() if _, err := io.Copy(output, replacementFileConnection); err != nil { g.log.Error("Error reading replacement file.", mlog.Err(err)) } }() _args := &Z_FileWillBeUploadedArgs{c, info, uploadedFileStreamId, replacementFileStreamId} _returns := &Z_FileWillBeUploadedReturns{A: _args.B} if err := g.client.Call("Plugin.FileWillBeUploaded", _args, _returns); err != nil { g.log.Error("RPC call FileWillBeUploaded to plugin failed.", mlog.Err(err)) } // Ensure the io.Copy from the replacementFileConnection above completes. <-replacementDone return _returns.A, _returns.B } func (s *hooksRPCServer) FileWillBeUploaded(args *Z_FileWillBeUploadedArgs, returns *Z_FileWillBeUploadedReturns) error { uploadFileConnection, err := s.muxBroker.Dial(args.UploadedFileStream) if err != nil { fmt.Fprintf(os.Stderr, "[ERROR] Can't connect to remote upload file stream, error: %v", err.Error()) return err } defer uploadFileConnection.Close() fileReader := connectIOReader(uploadFileConnection) defer fileReader.Close() replacementFileConnection, err := s.muxBroker.Dial(args.ReplacementFileStream) if err != nil { fmt.Fprintf(os.Stderr, "[ERROR] Can't connect to remote replacement file stream, error: %v", err.Error()) return err } defer replacementFileConnection.Close() returnFileWriter := replacementFileConnection if hook, ok := s.impl.(interface { FileWillBeUploaded(c *Context, info *model.FileInfo, file io.Reader, output io.Writer) (*model.FileInfo, string) }); ok { returns.A, returns.B = hook.FileWillBeUploaded(args.A, args.B, fileReader, returnFileWriter) } else { return fmt.Errorf("hook FileWillBeUploaded called but not implemented") } return nil } // MessageWillBePosted is in this file because of the difficulty of identifying which fields need special behaviour. // The special behaviour needed is decoding the returned post into the original one to avoid the unintentional removal // of fields by older plugins. func init() { hookNameToId["MessageWillBePosted"] = MessageWillBePostedID } type Z_MessageWillBePostedArgs struct { A *Context B *model.Post } type Z_MessageWillBePostedReturns struct { A *model.Post B string } func (g *hooksRPCClient) MessageWillBePosted(c *Context, post *model.Post) (*model.Post, string) { _args := &Z_MessageWillBePostedArgs{c, post} _returns := &Z_MessageWillBePostedReturns{A: _args.B} if g.implemented[MessageWillBePostedID] { if err := g.client.Call("Plugin.MessageWillBePosted", _args, _returns); err != nil { g.log.Error("RPC call MessageWillBePosted to plugin failed.", mlog.Err(err)) } } return _returns.A, _returns.B } func (s *hooksRPCServer) MessageWillBePosted(args *Z_MessageWillBePostedArgs, returns *Z_MessageWillBePostedReturns) error { if hook, ok := s.impl.(interface { MessageWillBePosted(c *Context, post *model.Post) (*model.Post, string) }); ok { returns.A, returns.B = hook.MessageWillBePosted(args.A, args.B) } else { return encodableError(fmt.Errorf("hook MessageWillBePosted called but not implemented")) } return nil } // MessageWillBeUpdated is in this file because of the difficulty of identifying which fields need special behaviour. // The special behaviour needed is decoding the returned post into the original one to avoid the unintentional removal // of fields by older plugins. func init() { hookNameToId["MessageWillBeUpdated"] = MessageWillBeUpdatedID } type Z_MessageWillBeUpdatedArgs struct { A *Context B *model.Post C *model.Post } type Z_MessageWillBeUpdatedReturns struct { A *model.Post B string } func (g *hooksRPCClient) MessageWillBeUpdated(c *Context, newPost, oldPost *model.Post) (*model.Post, string) { _args := &Z_MessageWillBeUpdatedArgs{c, newPost, oldPost} _default_returns := &Z_MessageWillBeUpdatedReturns{A: _args.B} if g.implemented[MessageWillBeUpdatedID] { _returns := &Z_MessageWillBeUpdatedReturns{} if err := g.client.Call("Plugin.MessageWillBeUpdated", _args, _returns); err != nil { g.log.Error("RPC call MessageWillBeUpdated to plugin failed.", mlog.Err(err)) return _default_returns.A, _default_returns.B } return _returns.A, _returns.B } return _default_returns.A, _default_returns.B } func (s *hooksRPCServer) MessageWillBeUpdated(args *Z_MessageWillBeUpdatedArgs, returns *Z_MessageWillBeUpdatedReturns) error { if hook, ok := s.impl.(interface { MessageWillBeUpdated(c *Context, newPost, oldPost *model.Post) (*model.Post, string) }); ok { returns.A, returns.B = hook.MessageWillBeUpdated(args.A, args.B, args.C) } else { return encodableError(fmt.Errorf("hook MessageWillBeUpdated called but not implemented")) } return nil } // MessagesWillBeConsumed is in this file because of the difficulty of identifying which fields need special behaviour. // The special behaviour needed is decoding the returned post into the original one to avoid the unintentional removal // of fields by older plugins. func init() { hookNameToId["MessagesWillBeConsumed"] = MessagesWillBeConsumedID } type Z_MessagesWillBeConsumedArgs struct { A []*model.Post } type Z_MessagesWillBeConsumedReturns struct { A []*model.Post } func (g *hooksRPCClient) MessagesWillBeConsumed(posts []*model.Post) []*model.Post { _args := &Z_MessagesWillBeConsumedArgs{posts} _returns := &Z_MessagesWillBeConsumedReturns{} if g.implemented[MessagesWillBeConsumedID] { if err := g.client.Call("Plugin.MessagesWillBeConsumed", _args, _returns); err != nil { g.log.Error("RPC call MessagesWillBeConsumed to plugin failed.", mlog.Err(err)) } } return _returns.A } func (s *hooksRPCServer) MessagesWillBeConsumed(args *Z_MessagesWillBeConsumedArgs, returns *Z_MessagesWillBeConsumedReturns) error { if hook, ok := s.impl.(interface { MessagesWillBeConsumed(posts []*model.Post) []*model.Post }); ok { returns.A = hook.MessagesWillBeConsumed(args.A) } else { return encodableError(fmt.Errorf("hook MessagesWillBeConsumed called but not implemented")) } return nil } type Z_LogDebugArgs struct { A string B []any } type Z_LogDebugReturns struct{} func (g *apiRPCClient) LogDebug(msg string, keyValuePairs ...any) { stringifiedPairs := stringifyToObjects(keyValuePairs) _args := &Z_LogDebugArgs{msg, stringifiedPairs} _returns := &Z_LogDebugReturns{} if err := g.client.Call("Plugin.LogDebug", _args, _returns); err != nil { log.Printf("RPC call to LogDebug API failed: %s", err.Error()) } } func (s *apiRPCServer) LogDebug(args *Z_LogDebugArgs, returns *Z_LogDebugReturns) error { if hook, ok := s.impl.(interface { LogDebug(msg string, keyValuePairs ...any) }); ok { hook.LogDebug(args.A, args.B...) } else { return encodableError(fmt.Errorf("API LogDebug called but not implemented")) } return nil } type Z_LogInfoArgs struct { A string B []any } type Z_LogInfoReturns struct{} func (g *apiRPCClient) LogInfo(msg string, keyValuePairs ...any) { stringifiedPairs := stringifyToObjects(keyValuePairs) _args := &Z_LogInfoArgs{msg, stringifiedPairs} _returns := &Z_LogInfoReturns{} if err := g.client.Call("Plugin.LogInfo", _args, _returns); err != nil { log.Printf("RPC call to LogInfo API failed: %s", err.Error()) } } func (s *apiRPCServer) LogInfo(args *Z_LogInfoArgs, returns *Z_LogInfoReturns) error { if hook, ok := s.impl.(interface { LogInfo(msg string, keyValuePairs ...any) }); ok { hook.LogInfo(args.A, args.B...) } else { return encodableError(fmt.Errorf("API LogInfo called but not implemented")) } return nil } type Z_LogWarnArgs struct { A string B []any } type Z_LogWarnReturns struct{} func (g *apiRPCClient) LogWarn(msg string, keyValuePairs ...any) { stringifiedPairs := stringifyToObjects(keyValuePairs) _args := &Z_LogWarnArgs{msg, stringifiedPairs} _returns := &Z_LogWarnReturns{} if err := g.client.Call("Plugin.LogWarn", _args, _returns); err != nil { log.Printf("RPC call to LogWarn API failed: %s", err.Error()) } } func (s *apiRPCServer) LogWarn(args *Z_LogWarnArgs, returns *Z_LogWarnReturns) error { if hook, ok := s.impl.(interface { LogWarn(msg string, keyValuePairs ...any) }); ok { hook.LogWarn(args.A, args.B...) } else { return encodableError(fmt.Errorf("API LogWarn called but not implemented")) } return nil } type Z_LogErrorArgs struct { A string B []any } type Z_LogErrorReturns struct{} func (g *apiRPCClient) LogError(msg string, keyValuePairs ...any) { stringifiedPairs := stringifyToObjects(keyValuePairs) _args := &Z_LogErrorArgs{msg, stringifiedPairs} _returns := &Z_LogErrorReturns{} if err := g.client.Call("Plugin.LogError", _args, _returns); err != nil { log.Printf("RPC call to LogError API failed: %s", err.Error()) } } func (s *apiRPCServer) LogError(args *Z_LogErrorArgs, returns *Z_LogErrorReturns) error { if hook, ok := s.impl.(interface { LogError(msg string, keyValuePairs ...any) }); ok { hook.LogError(args.A, args.B...) } else { return encodableError(fmt.Errorf("API LogError called but not implemented")) } return nil } type Z_LogAuditRecArgs struct { A *model.AuditRecord } type Z_LogAuditRecReturns struct { } // Custom audit logging methods with gob safety checks func (g *apiRPCClient) LogAuditRec(rec *model.AuditRecord) { gobSafeRec := makeAuditRecordGobSafe(*rec) _args := &Z_LogAuditRecArgs{&gobSafeRec} _returns := &Z_LogAuditRecReturns{} if err := g.client.Call("Plugin.LogAuditRec", _args, _returns); err != nil { log.Printf("RPC call to LogAuditRec API failed: %s", err.Error()) } } func (s *apiRPCServer) LogAuditRec(args *Z_LogAuditRecArgs, returns *Z_LogAuditRecReturns) error { if hook, ok := s.impl.(interface { LogAuditRec(rec *model.AuditRecord) }); ok { hook.LogAuditRec(args.A) } else { return encodableError(fmt.Errorf("API LogAuditRec called but not implemented")) } return nil } type Z_LogAuditRecWithLevelArgs struct { A *model.AuditRecord B mlog.Level } type Z_LogAuditRecWithLevelReturns struct { } func (g *apiRPCClient) LogAuditRecWithLevel(rec *model.AuditRecord, level mlog.Level) { gobSafeRec := makeAuditRecordGobSafe(*rec) _args := &Z_LogAuditRecWithLevelArgs{&gobSafeRec, level} _returns := &Z_LogAuditRecWithLevelReturns{} if err := g.client.Call("Plugin.LogAuditRecWithLevel", _args, _returns); err != nil { log.Printf("RPC call to LogAuditRecWithLevel API failed: %s", err.Error()) } } func (s *apiRPCServer) LogAuditRecWithLevel(args *Z_LogAuditRecWithLevelArgs, returns *Z_LogAuditRecWithLevelReturns) error { if hook, ok := s.impl.(interface { LogAuditRecWithLevel(rec *model.AuditRecord, level mlog.Level) }); ok { hook.LogAuditRecWithLevel(args.A, args.B) } else { return encodableError(fmt.Errorf("API LogAuditRecWithLevel called but not implemented")) } return nil } type Z_InstallPluginArgs struct { PluginStreamID uint32 B bool } type Z_InstallPluginReturns struct { A *model.Manifest B *model.AppError } func (g *apiRPCClient) InstallPlugin(file io.Reader, replace bool) (*model.Manifest, *model.AppError) { pluginStreamID := g.muxBroker.NextId() go func() { uploadPluginConnection, err := g.muxBroker.Accept(pluginStreamID) if err != nil { log.Print("Plugin failed to upload plugin. MuxBroker could not Accept connection", mlog.Err(err)) return } defer uploadPluginConnection.Close() serveIOReader(file, uploadPluginConnection) }() _args := &Z_InstallPluginArgs{pluginStreamID, replace} _returns := &Z_InstallPluginReturns{} if err := g.client.Call("Plugin.InstallPlugin", _args, _returns); err != nil { log.Print("RPC call InstallPlugin to plugin failed.", mlog.Err(err)) } return _returns.A, _returns.B } func (s *apiRPCServer) InstallPlugin(args *Z_InstallPluginArgs, returns *Z_InstallPluginReturns) error { hook, ok := s.impl.(interface { InstallPlugin(file io.Reader, replace bool) (*model.Manifest, *model.AppError) }) if !ok { return encodableError(fmt.Errorf("API InstallPlugin called but not implemented")) } receivePluginConnection, err := s.muxBroker.Dial(args.PluginStreamID) if err != nil { fmt.Fprintf(os.Stderr, "[ERROR] Can't connect to remote plugin stream, error: %v", err.Error()) return err } pluginReader := connectIOReader(receivePluginConnection) defer pluginReader.Close() returns.A, returns.B = hook.InstallPlugin(pluginReader, args.B) return nil } type Z_UploadDataArgs struct { A *model.UploadSession PluginStreamID uint32 } type Z_UploadDataReturns struct { A *model.FileInfo B error } func (g *apiRPCClient) UploadData(us *model.UploadSession, rd io.Reader) (*model.FileInfo, error) { pluginStreamID := g.muxBroker.NextId() go func() { pluginConnection, err := g.muxBroker.Accept(pluginStreamID) if err != nil { log.Print("Failed to upload data. MuxBroker could not Accept connection", mlog.Err(err)) return } defer pluginConnection.Close() serveIOReader(rd, pluginConnection) }() _args := &Z_UploadDataArgs{us, pluginStreamID} _returns := &Z_UploadDataReturns{} if err := g.client.Call("Plugin.UploadData", _args, _returns); err != nil { log.Print("RPC call UploadData to plugin failed.", mlog.Err(err)) } return _returns.A, _returns.B } func (s *apiRPCServer) UploadData(args *Z_UploadDataArgs, returns *Z_UploadDataReturns) error { hook, ok := s.impl.(interface { UploadData(us *model.UploadSession, rd io.Reader) (*model.FileInfo, error) }) if !ok { return encodableError(fmt.Errorf("API UploadData called but not implemented")) } receivePluginConnection, err := s.muxBroker.Dial(args.PluginStreamID) if err != nil { fmt.Fprintf(os.Stderr, "[ERROR] Can't connect to remote plugin stream, error: %v", err.Error()) return err } pluginReader := connectIOReader(receivePluginConnection) defer pluginReader.Close() returns.A, returns.B = hook.UploadData(args.A, pluginReader) return nil } func init() { hookNameToId["ServeMetrics"] = ServeMetricsID } type Z_ServeMetricsArgs struct { ResponseWriterStream uint32 Request *HTTPRequestSubset Context *Context RequestBodyStream uint32 } func (g *hooksRPCClient) ServeMetrics(c *Context, w http.ResponseWriter, r *http.Request) { if !g.implemented[ServeMetricsID] { http.NotFound(w, r) return } serveMetricsStreamId := g.muxBroker.NextId() go func() { connection, err := g.muxBroker.Accept(serveMetricsStreamId) if err != nil { g.log.Error("Plugin failed to ServeMetrics, muxBroker couldn't accept connection", mlog.Uint("serve_http_stream_id", serveMetricsStreamId), mlog.Err(err)) return } defer connection.Close() rpcServer := rpc.NewServer() if err := rpcServer.RegisterName("Plugin", &httpResponseWriterRPCServer{w: w, log: g.log}); err != nil { g.log.Error("Plugin failed to ServeMetrics, couldn't register RPC name", mlog.Err(err)) return } rpcServer.ServeConn(connection) }() requestBodyStreamId := uint32(0) if r.Body != nil { requestBodyStreamId = g.muxBroker.NextId() go func() { bodyConnection, err := g.muxBroker.Accept(requestBodyStreamId) if err != nil { g.log.Error("Plugin failed to ServeMetrics, muxBroker couldn't Accept request body connection", mlog.Err(err)) return } defer bodyConnection.Close() serveIOReader(r.Body, bodyConnection) }() } forwardedRequest := &HTTPRequestSubset{ Method: r.Method, URL: r.URL, Proto: r.Proto, ProtoMajor: r.ProtoMajor, ProtoMinor: r.ProtoMinor, Header: r.Header, Host: r.Host, RemoteAddr: r.RemoteAddr, RequestURI: r.RequestURI, } if err := g.client.Call("Plugin.ServeMetrics", Z_ServeMetricsArgs{ Context: c, ResponseWriterStream: serveMetricsStreamId, Request: forwardedRequest, RequestBodyStream: requestBodyStreamId, }, nil); err != nil { g.log.Error("Plugin failed to ServeMetrics, RPC call failed", mlog.Err(err)) http.Error(w, "500 internal server error", http.StatusInternalServerError) } } func (s *hooksRPCServer) ServeMetrics(args *Z_ServeMetricsArgs, returns *struct{}) error { connection, err := s.muxBroker.Dial(args.ResponseWriterStream) if err != nil { fmt.Fprintf(os.Stderr, "[ERROR] Can't connect to remote response writer stream, error: %v", err.Error()) return err } w := connectHTTPResponseWriter(connection) defer w.Close() r := args.Request if args.RequestBodyStream != 0 { connection, err := s.muxBroker.Dial(args.RequestBodyStream) if err != nil { fmt.Fprintf(os.Stderr, "[ERROR] Can't connect to remote request body stream, error: %v", err.Error()) return err } r.Body = connectIOReader(connection) } else { r.Body = io.NopCloser(&bytes.Buffer{}) } defer r.Body.Close() httpReq := r.GetHTTPRequest() if hook, ok := s.impl.(interface { ServeMetrics(c *Context, w http.ResponseWriter, r *http.Request) }); ok { hook.ServeMetrics(args.Context, w, httpReq) } else { http.NotFound(w, httpReq) } return nil }