mattermost-community-enterp.../vendor/github.com/splitio/go-split-commons/v7/tasks/segmentsync.go

package tasks

import (
	"errors"
	"fmt"
	"sync/atomic"

	healthcheck "github.com/splitio/go-split-commons/v7/healthcheck/application"
	"github.com/splitio/go-split-commons/v7/synchronizer/worker/segment"
	"github.com/splitio/go-toolkit/v5/asynctask"
	"github.com/splitio/go-toolkit/v5/logging"
	"github.com/splitio/go-toolkit/v5/workerpool"
)
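// updateSegments enqueues every known segment name onto the worker pool so
// each one gets re-fetched; a full queue is logged but does not fail the task.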
func updateSegments(
	fetcher segment.Updater,
	admin *workerpool.WorkerAdmin,
	logger logging.LoggerInterface,
) error {
	segmentList := fetcher.SegmentNames()
	for _, name := range segmentList {
		ok := admin.QueueMessage(name)
		if !ok {
			logger.Error(
				fmt.Sprintf("Segment %s could not be added because the job queue is full.\n", name),
				fmt.Sprintf(
					"You currently have %d segments and the queue size is %d.\n",
					len(segmentList),
					admin.QueueSize(),
				),
				"Please consider updating the segment queue size accordingly in the configuration options",
			)
		}
	}
	return nil
}
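
// Illustrative sketch (not part of the upstream file): WorkerAdmin.QueueMessage
// above behaves like a non-blocking send on a bounded queue. The channel-based
// helper below shows the same pattern using only the standard library: the
// select/default makes the send fail fast instead of blocking when the buffer
// is full, which is why updateSegments can report a full queue and move on.
func tryEnqueue(queue chan string, name string) bool {
	select {
	case queue <- name: // room in the buffer: enqueued for a worker to pick up
		return true
	default: // buffer full: report failure instead of blocking the sync loop
		return false
	}
}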
// NewFetchSegmentsTask creates a new segment fetching and storing task
func NewFetchSegmentsTask(
	fetcher segment.Updater,
	period int,
	workerCount int,
	queueSize int,
	logger logging.LoggerInterface,
	appMonitor healthcheck.MonitorProducerInterface,
) *asynctask.AsyncTask {
	// The WorkerAdmin is created lazily in onInit; storing it in an
	// atomic.Value makes the later reads in update and cleanup race-free.
	admin := atomic.Value{}
	// After all segments are in sync, add workers to the pool that will keep them up to date
	// periodically
	onInit := func(logger logging.LoggerInterface) error {
		admin.Store(workerpool.NewWorkerAdmin(queueSize, logger))
		for i := 0; i < workerCount; i++ {
			worker := NewSegmentWorker(
				fmt.Sprintf("SegmentWorker_%d", i),
				0,
				logger,
				// Each worker re-fetches a single segment by name, discarding
				// the update summary and surfacing only the error.
				func(n string, t *int64) error {
					_, err := fetcher.SynchronizeSegment(n, t)
					return err
				},
			)
			admin.Load().(*workerpool.WorkerAdmin).AddWorker(worker)
		}
		return nil
	}
	update := func(logger logging.LoggerInterface) error {
		// Record the sync attempt with the application monitor, then enqueue
		// every known segment for a refresh.
		appMonitor.NotifyEvent(healthcheck.Segments)
		wa, ok := admin.Load().(*workerpool.WorkerAdmin)
		if !ok || wa == nil {
			return errors.New("unable to type-assert worker manager")
		}
		return updateSegments(fetcher, wa, logger)
	}
	cleanup := func(logger logging.LoggerInterface) {
		wa, ok := admin.Load().(*workerpool.WorkerAdmin)
		if !ok || wa == nil {
			logger.Error("unable to type-assert worker manager")
			return
		}
		// Shut down every worker in the pool when the task is stopped.
		wa.StopAll(true)
	}
	return asynctask.NewAsyncTask("UpdateSegments", update, period, onInit, cleanup, logger)
}
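
// Usage sketch (an assumption for illustration; in the Split SDK this task is
// wired up by the synchronizer, not by application code). Assuming go-toolkit's
// AsyncTask Start/Stop(blocking) API, a caller would run the task roughly as:
//
//	task := NewFetchSegmentsTask(updater, 60, 4, 500, logger, monitor)
//	task.Start()
//	defer task.Stop(true) // blocks until cleanup (worker shutdown) runs
//
// The helper below is likewise a sketch, not upstream code: it factors out the
// comma-ok load that update and cleanup repeat above, turning an uninitialized
// or nil WorkerAdmin into an error instead of a panic.
func loadWorkerAdmin(v *atomic.Value) (*workerpool.WorkerAdmin, error) {
	wa, ok := v.Load().(*workerpool.WorkerAdmin)
	if !ok || wa == nil {
		return nil, errors.New("worker admin not initialized")
	}
	return wa, nil
}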