mattermost-community-enterp.../vendor/github.com/splitio/go-toolkit/v5/asynctask/asynctasks.go
Claude ec1f89217a Merge: Complete Mattermost Server with Community Enterprise
Full Mattermost server source with integrated Community Enterprise features.
Includes vendor directory for offline/air-gapped builds.

Structure:
- enterprise-impl/: Enterprise feature implementations
- enterprise-community/: Init files that register implementations
- enterprise/: Bridge imports (community_imports.go)
- vendor/: All dependencies for offline builds

Build (online):
  go build ./cmd/mattermost

Build (offline/air-gapped):
  go build -mod=vendor ./cmd/mattermost

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-17 23:59:07 +09:00

151 lines
3.7 KiB
Go

package asynctask
import (
"fmt"
"time"
"github.com/splitio/go-toolkit/v5/logging"
"github.com/splitio/go-toolkit/v5/struct/traits/lifecycle"
)
// AsyncTask is a struct that wraps tasks that should run periodically and can be remotely stopped & started,
// as well as making it's status (running/stopped) available.
type AsyncTask struct {
lifecycle lifecycle.Manager
task func(l logging.LoggerInterface) error
name string
incoming chan int
period int
onInit func(l logging.LoggerInterface) error
onStop func(l logging.LoggerInterface)
logger logging.LoggerInterface
}
const (
taskMessageWakeup = iota
)
// Start initiates the task. It wraps the execution in a closure guarded by a call to recover() in order
// to prevent the main application from crashin if something goes wrong while the sdk interacts with the backend.
func (t *AsyncTask) Start() {
if !t.lifecycle.BeginInitialization() {
if t.logger != nil {
t.logger.Warning(fmt.Sprintf("Task %s is not idle. Aborting new execution.", t.name))
}
return
}
go func() {
defer func() {
if r := recover(); r != nil {
if t.logger != nil {
t.logger.Error(fmt.Sprintf(
"AsyncTask %s is panicking! shutting down. Consider restarting this instance and raising an issue",
t.name,
))
t.logger.Error(r)
}
}
}()
defer t.lifecycle.ShutdownComplete()
if !t.lifecycle.InitializationComplete() {
return
}
// If there's an initialization function, execute it
if t.onInit != nil {
err := t.onInit(t.logger)
if err != nil { // If something goes wrong during initialization, abort.
if t.logger != nil {
t.logger.Error(fmt.Sprintf("task '%s' initialization failed: %s", t.name, err.Error()))
}
t.lifecycle.AbnormalShutdown()
return
}
}
// Create timeout timer
idleDuration := time.Second * time.Duration(t.period)
taskTimer := time.NewTimer(idleDuration)
defer taskTimer.Stop()
if t.onStop != nil {
defer t.onStop(t.logger)
}
// Task execution
for {
select {
case <-t.lifecycle.ShutdownRequested():
return
case <-t.incoming: // wake up signal
case <-taskTimer.C: // Timedout
}
// Run the wrapped task and handle the returned error if any.
err := t.task(t.logger)
if err != nil && t.logger != nil {
t.logger.Error(fmt.Sprintf("task '%s' failed with error: %s", t.name, err.Error()))
}
// Resetting timer
taskTimer.Reset(idleDuration)
}
}()
}
func (t *AsyncTask) sendSignal(signal int) error {
select {
case t.incoming <- signal:
return nil
default:
return fmt.Errorf("Couldn't send message to task %s", t.name)
}
}
// Stop executes onStop hook if any, blocks until its done (if blocking = true) and prevents future executions of the task.
func (t *AsyncTask) Stop(blocking bool) error {
if !t.lifecycle.BeginShutdown() {
return fmt.Errorf("task '%s' not running", t.name)
}
if blocking {
t.lifecycle.AwaitShutdownComplete()
}
return nil
}
// WakeUp interrupts the task's sleep period and resumes execution
func (t *AsyncTask) WakeUp() error {
return t.sendSignal(taskMessageWakeup)
}
// IsRunning returns true if the task is currently running
func (t *AsyncTask) IsRunning() bool {
return t.lifecycle.IsRunning()
}
// NewAsyncTask creates a new task and returns a pointer to it
func NewAsyncTask(
name string,
task func(l logging.LoggerInterface) error,
period int,
onInit func(l logging.LoggerInterface) error,
onStop func(l logging.LoggerInterface),
logger logging.LoggerInterface,
) *AsyncTask {
t := AsyncTask{
name: name,
task: task,
period: period,
onInit: onInit,
onStop: onStop,
logger: logger,
incoming: make(chan int, 10),
}
t.lifecycle.Setup()
return &t
}