mattermost-community-enterp.../channels/jobs/schedulers_test.go
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.

package jobs

import (
	"os"
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"github.com/mattermost/mattermost/server/public/model"
	"github.com/mattermost/mattermost/server/public/plugin/plugintest/mock"
	"github.com/mattermost/mattermost/server/public/shared/request"
	"github.com/mattermost/mattermost/server/v8/channels/store/storetest"
	"github.com/mattermost/mattermost/server/v8/channels/utils/testutils"
)
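
// MockScheduler is a stub Scheduler: always enabled, always schedules its
// next run 60 seconds out, and never actually creates a job.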
type MockScheduler struct {
	mock.Mock
}

func (scheduler *MockScheduler) Enabled(cfg *model.Config) bool {
	return true
}

func (scheduler *MockScheduler) NextScheduleTime(cfg *model.Config, now time.Time, pendingJobs bool, lastSuccessfulJob *model.Job) *time.Time {
	nextTime := time.Now().Add(60 * time.Second)
	return &nextTime
}

func (scheduler *MockScheduler) ScheduleJob(rctx request.CTX, cfg *model.Config, pendingJobs bool, lastSuccessfulJob *model.Job) (*model.Job, *model.AppError) {
	return nil, nil
}
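
// TestScheduler covers the scheduler lifecycle: starting and stopping,
// reacting to cluster leadership changes, and handling config reloads.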
func TestScheduler(t *testing.T) {
	if os.Getenv("ENABLE_FULLY_PARALLEL_TESTS") == "true" {
		t.Parallel()
	}

	mockStore := &storetest.Store{}
	defer mockStore.AssertExpectations(t)

	job := &model.Job{
		Id:       model.NewId(),
		CreateAt: model.GetMillis(),
		Status:   model.JobStatusPending,
		Type:     model.JobTypeMessageExport,
	}
	// The mock store returns a pending job for every status query, so the
	// schedulers never see a previously successful job and fall back to the
	// config for their next schedule time.
	mockStore.JobStore.On("GetNewestJobByStatusesAndType", mock.AnythingOfType("[]string"), mock.AnythingOfType("string")).Return(job, nil)
	mockStore.JobStore.On("GetCountByStatusAndType", mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(int64(1), nil)
	jobServer := &JobServer{
		Store: mockStore,
		ConfigService: &testutils.StaticConfigService{
			Cfg: &model.Config{
				DataRetentionSettings: model.DataRetentionSettings{
					EnableMessageDeletion: model.NewPointer(true),
				},
				MessageExportSettings: model.MessageExportSettings{
					EnableExport: model.NewPointer(true),
				},
			},
		},
	}
	jobServer.initSchedulers()
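
	// Register schedulers only (nil workers); these tests exercise job
	// scheduling, not job execution.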
	jobServer.RegisterJobType(model.JobTypeDataRetention, nil, new(MockScheduler))
	jobServer.RegisterJobType(model.JobTypeMessageExport, nil, new(MockScheduler))
t.Run("Base", func(t *testing.T) {
err := jobServer.StartSchedulers()
require.NoError(t, err)
time.Sleep(2 * time.Second)
err = jobServer.StopSchedulers()
require.NoError(t, err)
// They should be all on here
for _, element := range jobServer.schedulers.nextRunTimes {
assert.NotNil(t, element)
}
})
t.Run("ClusterLeaderChanged", func(t *testing.T) {
jobServer.initSchedulers()
err := jobServer.StartSchedulers()
require.NoError(t, err)
time.Sleep(2 * time.Second)
jobServer.HandleClusterLeaderChange(false)
err = jobServer.StopSchedulers()
require.NoError(t, err)
// They should be turned off
for _, element := range jobServer.schedulers.nextRunTimes {
assert.Nil(t, element)
}
})
t.Run("ClusterLeaderChangedBeforeStart", func(t *testing.T) {
jobServer.initSchedulers()
jobServer.HandleClusterLeaderChange(false)
err := jobServer.StartSchedulers()
require.NoError(t, err)
time.Sleep(2 * time.Second)
err = jobServer.StopSchedulers()
require.NoError(t, err)
for _, element := range jobServer.schedulers.nextRunTimes {
assert.Nil(t, element)
}
})
t.Run("DoubleClusterLeaderChangedBeforeStart", func(t *testing.T) {
jobServer.initSchedulers()
jobServer.HandleClusterLeaderChange(false)
jobServer.HandleClusterLeaderChange(true)
err := jobServer.StartSchedulers()
require.NoError(t, err)
time.Sleep(2 * time.Second)
err = jobServer.StopSchedulers()
require.NoError(t, err)
for _, element := range jobServer.schedulers.nextRunTimes {
assert.NotNil(t, element)
}
})
t.Run("ConfigChanged", func(t *testing.T) {
jobServer.initSchedulers()
err := jobServer.StartSchedulers()
require.NoError(t, err)
time.Sleep(2 * time.Second)
jobServer.HandleClusterLeaderChange(false)
// After running a config change, they should stay off
jobServer.schedulers.handleConfigChange(nil, nil)
err = jobServer.StopSchedulers()
require.NoError(t, err)
for _, element := range jobServer.schedulers.nextRunTimes {
assert.Nil(t, element)
}
})
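
	// Stop the schedulers while a config change is handled on another
	// goroutine, guarding against lock-ordering deadlocks between the two paths.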
t.Run("ConfigChangedDeadlock", func(t *testing.T) {
jobServer.initSchedulers()
err := jobServer.StartSchedulers()
require.NoError(t, err)
time.Sleep(2 * time.Second)
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
err := jobServer.StopSchedulers()
require.NoError(t, err)
}()
go func() {
defer wg.Done()
jobServer.schedulers.handleConfigChange(nil, nil)
}()
wg.Wait()
})
}
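
// TestRandomDelay checks that getRandomDelay always returns a delay strictly
// below the requested bound in milliseconds.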
func TestRandomDelay(t *testing.T) {
	if os.Getenv("ENABLE_FULLY_PARALLEL_TESTS") == "true" {
		t.Parallel()
	}

	cases := []int64{5, 10, 100}
	for _, c := range cases {
		out := getRandomDelay(c)
		require.Less(t, out.Milliseconds(), c)
	}
}