Full Mattermost server source with integrated Community Enterprise features. Includes vendor directory for offline/air-gapped builds. Structure: - enterprise-impl/: Enterprise feature implementations - enterprise-community/: Init files that register implementations - enterprise/: Bridge imports (community_imports.go) - vendor/: All dependencies for offline builds Build (online): go build ./cmd/mattermost Build (offline/air-gapped): go build -mod=vendor ./cmd/mattermost 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
81 lines
2.0 KiB
Go
81 lines
2.0 KiB
Go
package push
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
|
|
"github.com/splitio/go-split-commons/v7/dtos"
|
|
"github.com/splitio/go-toolkit/v5/logging"
|
|
"github.com/splitio/go-toolkit/v5/struct/traits/lifecycle"
|
|
)
|
|
|
|
type LargeSegmentUpdateWorker struct {
|
|
lsQueue chan dtos.LargeSegmentChangeUpdate
|
|
sync synchronizerInterface
|
|
logger logging.LoggerInterface
|
|
lifecycle lifecycle.Manager
|
|
}
|
|
|
|
func NewLargeSegmentUpdateWorker(
|
|
lsQueue chan dtos.LargeSegmentChangeUpdate,
|
|
synchronizer synchronizerInterface,
|
|
logger logging.LoggerInterface,
|
|
) (*LargeSegmentUpdateWorker, error) {
|
|
if cap(lsQueue) < 5000 {
|
|
return nil, errors.New("largeSegmentQueue capacity must be larger")
|
|
}
|
|
|
|
worker := &LargeSegmentUpdateWorker{
|
|
lsQueue: lsQueue,
|
|
sync: synchronizer,
|
|
logger: logger,
|
|
}
|
|
worker.lifecycle.Setup()
|
|
return worker, nil
|
|
}
|
|
|
|
// Start starts worker
|
|
func (s *LargeSegmentUpdateWorker) Start() {
|
|
if !s.lifecycle.BeginInitialization() {
|
|
s.logger.Info("Large Segment worker is already running")
|
|
return
|
|
}
|
|
|
|
go func() {
|
|
if !s.lifecycle.InitializationComplete() {
|
|
return
|
|
}
|
|
defer s.lifecycle.ShutdownComplete()
|
|
for {
|
|
select {
|
|
case lstUpdate := <-s.lsQueue:
|
|
s.logger.Debug("Received Large Segment updates and proceding to perform fetch")
|
|
for _, ls := range lstUpdate.LargeSegments {
|
|
s.logger.Debug(fmt.Sprintf("LargeSegmentName: %s. ChangeNumber: %d", ls.Name, lstUpdate.ChangeNumber()))
|
|
ls.ChangeNumber = lstUpdate.ChangeNumber()
|
|
err := s.sync.SynchronizeLargeSegmentUpdate(&ls)
|
|
if err != nil {
|
|
s.logger.Error(err)
|
|
}
|
|
}
|
|
case <-s.lifecycle.ShutdownRequested():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Stop stops worker
|
|
func (s *LargeSegmentUpdateWorker) Stop() {
|
|
if !s.lifecycle.BeginShutdown() {
|
|
s.logger.Debug("Large Segment worker not runnning. Ignoring.")
|
|
return
|
|
}
|
|
s.lifecycle.AwaitShutdownComplete()
|
|
}
|
|
|
|
// IsRunning indicates if worker is running or not
|
|
func (s *LargeSegmentUpdateWorker) IsRunning() bool {
|
|
return s.lifecycle.IsRunning()
|
|
}
|