package push

import (
	"errors"
	"fmt"
	"net/http"
	"sync"
	"time"

	"github.com/splitio/go-split-commons/v7/conf"
	"github.com/splitio/go-split-commons/v7/dtos"
	"github.com/splitio/go-split-commons/v7/service"
	"github.com/splitio/go-split-commons/v7/service/api/sse"
	"github.com/splitio/go-split-commons/v7/storage"
	"github.com/splitio/go-split-commons/v7/telemetry"
	"github.com/splitio/go-toolkit/v5/common"
	"github.com/splitio/go-toolkit/v5/logging"
	"github.com/splitio/go-toolkit/v5/struct/traits/lifecycle"
)

// Status update constants that will be propagated to the push manager's user
const (
	StatusUp = iota
	StatusDown
	StatusRetryableError
	StatusNonRetryableError
)

// ErrAlreadyRunning is the error to be returned when .Start() is called on an already running instance
var ErrAlreadyRunning = errors.New("push manager already running")

// ErrNotRunning is the error to be returned when .Stop() is called on a non-running instance
var ErrNotRunning = errors.New("push manager not running")

// Manager interface contains public methods for push manager
type Manager interface {
	Start() error
	Stop() error
	StopWorkers()
	StartWorkers()
	NextRefresh() time.Time
}

// ManagerImpl implements the manager interface
type ManagerImpl struct {
	parser            NotificationParser
	sseClient         sse.StreamingClient
	authAPI           service.AuthClient
	processor         Processor
	statusTracker     StatusTracker
	feedback          FeedbackLoop
	nextRefresh       *time.Timer
	nextRefreshAt     time.Time
	refreshTokenMutex sync.Mutex
	lifecycle         lifecycle.Manager
	logger            logging.LoggerInterface
	runtimeTelemetry  storage.TelemetryRuntimeProducer
}

// FeedbackLoop is a type alias for the type of chan that must be supplied for push status to be propagated
type FeedbackLoop = chan<- int64

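// A minimal sketch of how the owner of the feedback channel might consume the
// Status* constants defined above (the channel, goroutine, and reactions shown
// here are hypothetical; the real wiring lives in the synchronizer that owns
// the push manager):
//
//	pushStatus := make(chan int64, 1)
//	go func() {
//		for status := range pushStatus {
//			switch status {
//			case StatusUp:
//				// streaming is healthy: keep workers running, polling can back off
//			case StatusDown, StatusRetryableError:
//				// fall back to polling and retry the push connection later
//			case StatusNonRetryableError:
//				// give up on streaming for this session and keep polling
//			}
//		}
//	}()
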
// NewManager constructs a new push manager
func NewManager(
	logger logging.LoggerInterface,
	synchronizer synchronizerInterface,
	cfg *conf.AdvancedConfig,
	feedbackLoop chan<- int64,
	authAPI service.AuthClient,
	runtimeTelemetry storage.TelemetryRuntimeProducer,
	metadata dtos.Metadata,
	clientKey *string,
) (*ManagerImpl, error) {
	processor, err := NewProcessor(cfg.SplitUpdateQueueSize, cfg.SegmentUpdateQueueSize, synchronizer, logger, cfg.LargeSegment)
	if err != nil {
		return nil, fmt.Errorf("error instantiating processor: %w", err)
	}

	statusTracker := NewStatusTracker(logger, runtimeTelemetry)
	parser := NewNotificationParserImpl(logger, processor.ProcessSplitChangeUpdate, processor.ProcessSplitKillUpdate, processor.ProcessSegmentChangeUpdate,
		statusTracker.HandleControl, statusTracker.HandleOccupancy, statusTracker.HandleAblyError, processor.ProcessLargeSegmentChangeUpdate)

	manager := &ManagerImpl{
		authAPI:          authAPI,
		sseClient:        sse.NewStreamingClient(cfg, logger, metadata, clientKey),
		statusTracker:    statusTracker,
		feedback:         feedbackLoop,
		processor:        processor,
		parser:           parser,
		logger:           logger,
		runtimeTelemetry: runtimeTelemetry,
	}
	manager.lifecycle.Setup()
	return manager, nil
}

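// A rough usage sketch (variable names here are hypothetical; the synchronizer,
// auth client, and telemetry producer must satisfy the interfaces declared in
// this package and its dependencies):
//
//	mgr, err := NewManager(logger, sync, cfg, pushStatus, authClient, telemetryStorage, metadata, nil)
//	if err != nil {
//		// handle construction error
//	}
//	if err := mgr.Start(); err != nil {
//		// ErrAlreadyRunning if Start is called on a running instance
//	}
//	defer mgr.Stop()
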
// Start initiates the authentication flow and, if successful, triggers the SSE connection
func (m *ManagerImpl) Start() error {
	if !m.lifecycle.BeginInitialization() {
		return ErrAlreadyRunning
	}
	m.triggerConnectionFlow()
	return nil
}

// Stop stops the SSE client and its status monitoring goroutine
func (m *ManagerImpl) Stop() error {
	if !m.lifecycle.BeginShutdown() {
		return ErrNotRunning
	}
	m.statusTracker.NotifySSEShutdownExpected()
	m.withRefreshTokenLock(func() {
		if m.nextRefresh != nil {
			m.nextRefresh.Stop()
		}
	})
	m.StopWorkers()
	m.sseClient.StopStreaming()
	m.lifecycle.AwaitShutdownComplete()
	return nil
}

// StartWorkers starts the splits & segments workers
func (m *ManagerImpl) StartWorkers() {
	m.processor.StartWorkers()
}

// StopWorkers stops the splits & segments workers
func (m *ManagerImpl) StopWorkers() {
	m.processor.StopWorkers()
}

// NextRefresh returns the time when the next token refresh will happen
func (m *ManagerImpl) NextRefresh() time.Time {
	m.refreshTokenMutex.Lock()
	defer m.refreshTokenMutex.Unlock()
	return m.nextRefreshAt
}

func (m *ManagerImpl) performAuthentication() (*dtos.Token, *int64) {
	before := time.Now()
	token, err := m.authAPI.Authenticate()
	if err != nil {
		if errType, ok := err.(*dtos.HTTPError); ok {
			m.runtimeTelemetry.RecordSyncError(telemetry.TokenSync, errType.Code)
			if errType.Code >= http.StatusInternalServerError {
				m.logger.Error(fmt.Sprintf("Error authenticating: %s", err.Error()))
				return nil, common.Int64Ref(StatusRetryableError)
			}
			if errType.Code == http.StatusUnauthorized {
				m.runtimeTelemetry.RecordAuthRejections() // Only 401
			}
			return nil, common.Int64Ref(StatusNonRetryableError) // 400, 401, etc
		}
		// Not an HTTP error; most likely a TCP/connection failure. Should retry
		return nil, common.Int64Ref(StatusRetryableError)
	}
	m.runtimeTelemetry.RecordSyncLatency(telemetry.TokenSync, time.Since(before))
	if !token.PushEnabled {
		return nil, common.Int64Ref(StatusNonRetryableError)
	}
	m.runtimeTelemetry.RecordTokenRefreshes()
	m.runtimeTelemetry.RecordSuccessfulSync(telemetry.TokenSync, time.Now().UTC())
	return token, nil
}

func (m *ManagerImpl) eventHandler(e sse.IncomingMessage) {
	newStatus, err := m.parser.ParseAndForward(e)
	if newStatus != nil {
		m.feedback <- *newStatus
	} else if err != nil {
		m.logger.Error("error parsing message: ", err)
		m.logger.Debug("failed message: ", e)
		m.feedback <- StatusRetryableError
	}
}

func (m *ManagerImpl) triggerConnectionFlow() {
	token, status := m.performAuthentication()
	if status != nil {
		m.lifecycle.AbnormalShutdown()
		defer m.lifecycle.ShutdownComplete()
		m.feedback <- *status
		return
	}

	tokenList, err := token.ChannelList()
	if err != nil {
		m.logger.Error("error parsing channel list: ", err)
		m.lifecycle.AbnormalShutdown()
		defer m.lifecycle.ShutdownComplete()
		m.feedback <- StatusRetryableError
		return
	}

	m.statusTracker.Reset()
	sseStatus := make(chan int, 100)
	m.sseClient.ConnectStreaming(token.Token, sseStatus, tokenList, m.eventHandler)
	// Monitor the SSE status channel and propagate updates through the feedback loop
	go func() {
		defer m.lifecycle.ShutdownComplete()
		if !m.lifecycle.InitializationComplete() {
			return
		}
		for {
			message := <-sseStatus
			switch message {
			case sse.StatusFirstEventOk:
				when, err := token.CalculateNextTokenExpiration()
				if err != nil || when <= 0 {
					m.logger.Warning("Failed to calculate next token expiration time. Defaulting to 50 minutes")
					when = 50 * time.Minute
				}
				m.runtimeTelemetry.RecordStreamingEvent(telemetry.GetStreamingEvent(telemetry.EventTypeTokenRefresh, when.Milliseconds()))
				m.withRefreshTokenLock(func() {
					m.nextRefreshAt = time.Now().Add(when)
					m.nextRefresh = time.AfterFunc(when, func() {
						m.logger.Info("Refreshing SSE auth token.")
						m.Stop()
						m.Start()
					})
				})
				m.runtimeTelemetry.RecordStreamingEvent(telemetry.GetStreamingEvent(telemetry.EventTypeSSEConnectionEstablished, 0))
				m.feedback <- StatusUp
			case sse.StatusConnectionFailed:
				m.lifecycle.AbnormalShutdown()
				m.logger.Error("SSE Connection failed")
				m.feedback <- StatusRetryableError
				return
			case sse.StatusDisconnected:
				m.logger.Debug("propagating sse disconnection event")
				status := m.statusTracker.HandleDisconnection()
				if status != nil { // connection ended unexpectedly
					m.lifecycle.AbnormalShutdown()
					m.feedback <- *status
				}
				return
			case sse.StatusUnderlyingClientInUse:
				m.lifecycle.AbnormalShutdown()
				m.logger.Error("unexpected error in streaming. Switching to polling")
				m.feedback <- StatusNonRetryableError
				return
			}
		}
	}()
}

func (m *ManagerImpl) withRefreshTokenLock(f func()) {
	m.refreshTokenMutex.Lock()
	defer m.refreshTokenMutex.Unlock()
	f()
}

var _ Manager = (*ManagerImpl)(nil)