Full Mattermost server source with integrated Community Enterprise features. Includes vendor directory for offline/air-gapped builds. Structure: - enterprise-impl/: Enterprise feature implementations - enterprise-community/: Init files that register implementations - enterprise/: Bridge imports (community_imports.go) - vendor/: All dependencies for offline builds Build (online): go build ./cmd/mattermost Build (offline/air-gapped): go build -mod=vendor ./cmd/mattermost 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
89 lines
2.4 KiB
Go
89 lines
2.4 KiB
Go
package tasks
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"sync/atomic"
|
|
|
|
healthcheck "github.com/splitio/go-split-commons/v7/healthcheck/application"
|
|
"github.com/splitio/go-split-commons/v7/synchronizer/worker/segment"
|
|
"github.com/splitio/go-toolkit/v5/asynctask"
|
|
"github.com/splitio/go-toolkit/v5/logging"
|
|
"github.com/splitio/go-toolkit/v5/workerpool"
|
|
)
|
|
|
|
func updateSegments(
|
|
fetcher segment.Updater,
|
|
admin *workerpool.WorkerAdmin,
|
|
logger logging.LoggerInterface,
|
|
) error {
|
|
segmentList := fetcher.SegmentNames()
|
|
for _, name := range segmentList {
|
|
ok := admin.QueueMessage(name)
|
|
if !ok {
|
|
logger.Error(
|
|
fmt.Sprintf("Segment %s could not be added because the job queue is full.\n", name),
|
|
fmt.Sprintf(
|
|
"You currently have %d segments and the queue size is %d.\n",
|
|
len(segmentList),
|
|
admin.QueueSize(),
|
|
),
|
|
"Please consider updating the segment queue size accordingly in the configuration options",
|
|
)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// NewFetchSegmentsTask creates a new segment fetching and storing task
|
|
func NewFetchSegmentsTask(
|
|
fetcher segment.Updater,
|
|
period int,
|
|
workerCount int,
|
|
queueSize int,
|
|
logger logging.LoggerInterface,
|
|
appMonitor healthcheck.MonitorProducerInterface,
|
|
) *asynctask.AsyncTask {
|
|
|
|
admin := atomic.Value{}
|
|
|
|
// After all segments are in sync, add workers to the pool that will keep them up to date
|
|
// periodically
|
|
onInit := func(logger logging.LoggerInterface) error {
|
|
admin.Store(workerpool.NewWorkerAdmin(queueSize, logger))
|
|
for i := 0; i < workerCount; i++ {
|
|
worker := NewSegmentWorker(
|
|
fmt.Sprintf("SegmentWorker_%d", i),
|
|
0,
|
|
logger,
|
|
func(n string, t *int64) error {
|
|
_, err := fetcher.SynchronizeSegment(n, t)
|
|
return err
|
|
},
|
|
)
|
|
admin.Load().(*workerpool.WorkerAdmin).AddWorker(worker)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
update := func(logger logging.LoggerInterface) error {
|
|
appMonitor.NotifyEvent(healthcheck.Segments)
|
|
wa, ok := admin.Load().(*workerpool.WorkerAdmin)
|
|
if !ok || wa == nil {
|
|
return errors.New("unable to type-assert worker manager")
|
|
}
|
|
return updateSegments(fetcher, wa, logger)
|
|
}
|
|
|
|
cleanup := func(logger logging.LoggerInterface) {
|
|
wa, ok := admin.Load().(*workerpool.WorkerAdmin)
|
|
if !ok || wa == nil {
|
|
logger.Error("unable to type-assert worker manager")
|
|
return
|
|
}
|
|
wa.StopAll(true)
|
|
}
|
|
|
|
return asynctask.NewAsyncTask("UpdateSegments", update, period, onInit, cleanup, logger)
|
|
}
|