Full Mattermost server source with integrated Community Enterprise features. Includes vendor directory for offline/air-gapped builds. Structure: - enterprise-impl/: Enterprise feature implementations - enterprise-community/: Init files that register implementations - enterprise/: Bridge imports (community_imports.go) - vendor/: All dependencies for offline builds Build (online): go build ./cmd/mattermost Build (offline/air-gapped): go build -mod=vendor ./cmd/mattermost 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
180 lines
5.8 KiB
Go
180 lines
5.8 KiB
Go
package push
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
|
|
"github.com/splitio/go-split-commons/v7/dtos"
|
|
"github.com/splitio/go-split-commons/v7/storage"
|
|
"github.com/splitio/go-split-commons/v7/telemetry"
|
|
"github.com/splitio/go-toolkit/v5/common"
|
|
"github.com/splitio/go-toolkit/v5/logging"
|
|
)
|
|
|
|
const (
|
|
pri = "control_pri"
|
|
sec = "control_sec"
|
|
)
|
|
|
|
// StatusTracker keeps track of the status of the push subsystem and generates appropriate status change notifications.
|
|
type StatusTracker interface {
|
|
HandleOccupancy(*dtos.OccupancyMessage) *int64
|
|
HandleControl(*dtos.ControlUpdate) *int64
|
|
HandleAblyError(*dtos.AblyError) *int64
|
|
HandleDisconnection() *int64
|
|
NotifySSEShutdownExpected()
|
|
Reset()
|
|
}
|
|
|
|
// StatusTrackerImpl is a concrete implementation of the StatusTracker interface
|
|
type StatusTrackerImpl struct {
|
|
logger logging.LoggerInterface
|
|
mutex sync.Mutex
|
|
occupancy map[string]int64
|
|
lastControlTimestamp int64
|
|
lastOccupancyTimestamp int64
|
|
lastControlMessage string
|
|
lastStatusPropagated int64
|
|
shutdownExpected bool
|
|
runtimeTelemetry storage.TelemetryRuntimeProducer
|
|
}
|
|
|
|
// NotifySSEShutdownExpected should be called when we are forcefully closing the SSE client
|
|
func (p *StatusTrackerImpl) NotifySSEShutdownExpected() {
|
|
p.mutex.Lock()
|
|
defer p.mutex.Unlock()
|
|
p.shutdownExpected = true
|
|
}
|
|
|
|
// Reset should be called on initialization and when the a new connection is being established (to start from scratch)
|
|
func (p *StatusTrackerImpl) Reset() {
|
|
p.mutex.Lock()
|
|
defer p.mutex.Unlock()
|
|
p.occupancy = map[string]int64{pri: 2, sec: 2}
|
|
p.lastControlMessage = dtos.ControlTypeStreamingEnabled
|
|
p.lastStatusPropagated = StatusUp
|
|
p.shutdownExpected = false
|
|
}
|
|
|
|
// HandleOccupancy should be called for every occupancy notification received
|
|
func (p *StatusTrackerImpl) HandleOccupancy(message *dtos.OccupancyMessage) (newStatus *int64) {
|
|
p.mutex.Lock()
|
|
defer p.mutex.Unlock()
|
|
if p.shutdownExpected {
|
|
return nil // we don't care about occupancy if we're disconnecting
|
|
}
|
|
|
|
channel := message.ChannelWithoutPrefix()
|
|
if _, ok := p.occupancy[channel]; !ok {
|
|
p.logger.Warning(fmt.Sprintf("received occupancy on non-registered channel '%s'. Ignoring", channel))
|
|
return nil
|
|
}
|
|
|
|
p.lastOccupancyTimestamp = message.Timestamp()
|
|
p.occupancy[channel] = message.Publishers()
|
|
// Tracking OccupancyEvent
|
|
switch channel {
|
|
case pri:
|
|
p.runtimeTelemetry.RecordStreamingEvent(telemetry.GetStreamingEvent(telemetry.EventTypeOccupancyPri, message.Publishers()))
|
|
case sec:
|
|
p.runtimeTelemetry.RecordStreamingEvent(telemetry.GetStreamingEvent(telemetry.EventTypeOccupancySec, message.Publishers()))
|
|
}
|
|
return p.updateStatus()
|
|
}
|
|
|
|
// HandleAblyError should be called whenever an ably error is received
|
|
func (p *StatusTrackerImpl) HandleAblyError(errorEvent *dtos.AblyError) (newStatus *int64) {
|
|
p.mutex.Lock()
|
|
defer p.mutex.Unlock()
|
|
if p.shutdownExpected {
|
|
return nil // we don't care about occupancy if we're disconnecting
|
|
}
|
|
|
|
// Regardless of whether the error is retryable or not, we're going to close the connection
|
|
p.shutdownExpected = true
|
|
|
|
// Tracking ABLY_ERROR
|
|
p.runtimeTelemetry.RecordStreamingEvent(telemetry.GetStreamingEvent(telemetry.EventTypeAblyError, int64(errorEvent.Code())))
|
|
|
|
if errorEvent.IsRetryable() {
|
|
p.logger.Info("Received retryable error message. Restarting SSE connection with backoff")
|
|
return p.propagateStatus(StatusRetryableError)
|
|
}
|
|
|
|
p.logger.Info("Received non-retryable error message. Disabling streaming")
|
|
return p.propagateStatus(StatusNonRetryableError)
|
|
}
|
|
|
|
// HandleControl should be called whenever a control notification is received
|
|
func (p *StatusTrackerImpl) HandleControl(controlUpdate *dtos.ControlUpdate) *int64 {
|
|
p.mutex.Lock()
|
|
defer p.mutex.Unlock()
|
|
if p.shutdownExpected {
|
|
return nil // we don't care about occupancy if we're disconnecting
|
|
}
|
|
|
|
if p.lastControlTimestamp > controlUpdate.Timestamp() {
|
|
p.logger.Warning("Received an old control update. Ignoring")
|
|
return nil
|
|
}
|
|
|
|
p.lastControlMessage = controlUpdate.ControlType()
|
|
p.lastControlTimestamp = controlUpdate.Timestamp()
|
|
return p.updateStatus()
|
|
}
|
|
|
|
// HandleDisconnection should be called whenver the SSE client gets disconnected
|
|
func (p *StatusTrackerImpl) HandleDisconnection() *int64 {
|
|
p.mutex.Lock()
|
|
defer p.mutex.Unlock()
|
|
if !p.shutdownExpected {
|
|
p.runtimeTelemetry.RecordStreamingEvent(telemetry.GetStreamingEvent(telemetry.EventTypeConnectionError, telemetry.NonRequested))
|
|
return p.propagateStatus(StatusRetryableError)
|
|
}
|
|
p.runtimeTelemetry.RecordStreamingEvent(telemetry.GetStreamingEvent(telemetry.EventTypeConnectionError, telemetry.Requested))
|
|
return nil
|
|
}
|
|
|
|
// NewStatusTracker returns a new StatusTracker
|
|
func NewStatusTracker(logger logging.LoggerInterface, runtimeTelemetry storage.TelemetryRuntimeProducer) *StatusTrackerImpl {
|
|
tracker := &StatusTrackerImpl{logger: logger, runtimeTelemetry: runtimeTelemetry}
|
|
tracker.Reset()
|
|
return tracker
|
|
}
|
|
|
|
func (p *StatusTrackerImpl) occupancyOk() bool {
|
|
for _, v := range p.occupancy {
|
|
if v > 0 {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (p *StatusTrackerImpl) updateStatus() *int64 {
|
|
if p.lastStatusPropagated == StatusUp {
|
|
if !p.occupancyOk() || p.lastControlMessage == dtos.ControlTypeStreamingPaused {
|
|
return p.propagateStatus(StatusDown)
|
|
}
|
|
if p.lastControlMessage == dtos.ControlTypeStreamingDisabled {
|
|
return p.propagateStatus(StatusNonRetryableError)
|
|
}
|
|
}
|
|
if p.lastStatusPropagated == StatusDown {
|
|
if p.occupancyOk() && p.lastControlMessage == dtos.ControlTypeStreamingEnabled {
|
|
return p.propagateStatus(StatusUp)
|
|
}
|
|
if p.lastControlMessage == dtos.ControlTypeStreamingDisabled {
|
|
return p.propagateStatus(StatusNonRetryableError)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (p *StatusTrackerImpl) propagateStatus(newStatus int64) *int64 {
|
|
p.lastStatusPropagated = newStatus
|
|
return common.Int64Ref(newStatus)
|
|
}
|
|
|
|
var _ StatusTracker = &StatusTrackerImpl{}
|