mattermost-community-enterp.../vendor/github.com/mattermost-community/enterprise/compliance/compliance.go
Claude ec1f89217a Merge: Complete Mattermost Server with Community Enterprise
Full Mattermost server source with integrated Community Enterprise features.
Includes vendor directory for offline/air-gapped builds.

Structure:
- enterprise-impl/: Enterprise feature implementations
- enterprise-community/: Init files that register implementations
- enterprise/: Bridge imports (community_imports.go)
- vendor/: All dependencies for offline builds

Build (online):
  go build ./cmd/mattermost

Build (offline/air-gapped):
  go build -mod=vendor ./cmd/mattermost

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-17 23:59:07 +09:00

275 lines
8.0 KiB
Go

// Copyright (c) 2024 Mattermost Community Enterprise
// Open source implementation of Mattermost Enterprise Compliance
package compliance
import (
"archive/zip"
"encoding/csv"
"fmt"
"os"
"path/filepath"
"strings"
"time"
"github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/public/shared/mlog"
"github.com/mattermost/mattermost/server/public/shared/request"
"github.com/mattermost/mattermost/server/v8/channels/store"
"github.com/mattermost/mattermost/server/v8/einterfaces"
)
// Tunables for the compliance export.
const (
// ExportBatchSize is the limit passed to the compliance store's
// ComplianceExport call for every batch fetched by RunComplianceJob.
ExportBatchSize = 10000
)
// ComplianceImpl is the compliance implementation returned by
// NewComplianceInterface. It runs compliance export jobs and owns the
// background goroutine that schedules the daily export.
//
// NOTE(review): isRunning and stopChan are read and written without any
// synchronization in this file — confirm callers never invoke
// StartComplianceDailyJob and Stop concurrently.
type ComplianceImpl struct {
// store gives access to the compliance store (Save, Update, ComplianceExport).
store store.Store
// config returns the configuration to use; it is called at the start of each run.
config func() *model.Config
// logger receives scheduler and job-status log messages.
logger mlog.LoggerIFace
// stopChan is closed by Stop to make the scheduler goroutine return.
stopChan chan struct{}
// isRunning is set by StartComplianceDailyJob and cleared by Stop.
isRunning bool
}
// ComplianceConfig bundles the dependencies that NewComplianceInterface copies
// into a new ComplianceImpl.
type ComplianceConfig struct {
// Store is the store used to persist and query compliance jobs.
Store store.Store
// Config returns the server configuration.
Config func() *model.Config
// Logger is the logger used by the compliance implementation.
Logger mlog.LoggerIFace
}
// NewComplianceInterface builds a ComplianceImpl from the store, config getter
// and logger carried by cfg, gives it a fresh stop channel, and returns it as
// an einterfaces.ComplianceInterface. cfg must not be nil.
func NewComplianceInterface(cfg *ComplianceConfig) einterfaces.ComplianceInterface {
	impl := new(ComplianceImpl)
	impl.store = cfg.Store
	impl.config = cfg.Config
	impl.logger = cfg.Logger
	impl.stopChan = make(chan struct{})
	return impl
}
// StartComplianceDailyJob starts the daily compliance job scheduler.
//
// It is a no-op when the scheduler is already marked as running. Otherwise it
// launches a goroutine that runs the daily export once immediately and then
// every 24 hours until the stop channel is closed.
//
// The scheduler can be started again after Stop: a stop channel that has
// already been closed (or was never created) is replaced with a fresh one
// before the goroutine is launched.
func (c *ComplianceImpl) StartComplianceDailyJob() {
	if c.isRunning {
		return
	}

	// Nothing is ever sent on stopChan; it is only closed. A successful
	// receive therefore means a previous Stop closed it. Reusing the closed
	// channel would make the new goroutine exit right after its first export
	// while isRunning stays true, and the next Stop would panic on closing an
	// already-closed channel.
	if c.stopChan == nil {
		c.stopChan = make(chan struct{})
	} else {
		select {
		case <-c.stopChan:
			c.stopChan = make(chan struct{})
		default:
		}
	}

	// The goroutine keeps its own reference so that a later restart, which may
	// replace c.stopChan, cannot detach it from the channel that stops it.
	stop := c.stopChan

	c.isRunning = true
	c.logger.Info("Starting compliance daily job scheduler")

	go func() {
		ticker := time.NewTicker(24 * time.Hour)
		defer ticker.Stop()

		// Run once at startup if enabled
		c.runDailyExport()

		for {
			select {
			case <-stop:
				c.logger.Info("Stopping compliance daily job scheduler")
				return
			case <-ticker.C:
				c.runDailyExport()
			}
		}
	}()
}
// runDailyExport saves and runs a "Daily Export" compliance job covering the
// previous calendar day (midnight to midnight in the location of time.Now()).
// It returns without doing anything unless both ComplianceSettings.Enable and
// ComplianceSettings.EnableDaily are set. Save and run failures are logged,
// not returned.
func (c *ComplianceImpl) runDailyExport() {
	settings := c.config().ComplianceSettings
	if !(*settings.Enable && *settings.EnableDaily) {
		return
	}

	c.logger.Info("Running daily compliance export")

	now := time.Now()
	year, month, day := now.Date()
	loc := now.Location()
	// time.Date normalizes day-1, so the first day of a month rolls back correctly.
	windowStart := time.Date(year, month, day-1, 0, 0, 0, 0, loc)
	windowEnd := time.Date(year, month, day, 0, 0, 0, 0, loc)

	dailyJob := &model.Compliance{
		Desc:    "Daily Export",
		Type:    model.ComplianceTypeDaily,
		StartAt: windowStart.UnixMilli(),
		EndAt:   windowEnd.UnixMilli(),
	}
	dailyJob.PreSave()

	// Persist the job record before exporting.
	saved, saveErr := c.store.Compliance().Save(dailyJob)
	if saveErr != nil {
		c.logger.Error("Failed to save daily compliance job", mlog.Err(saveErr))
		return
	}

	// Run the export with an empty request context bound to our logger.
	if runErr := c.RunComplianceJob(request.EmptyContext(c.logger), saved); runErr != nil {
		c.logger.Error("Failed to run daily compliance job", mlog.Err(runErr))
	}
}
// RunComplianceJob runs a compliance export job.
//
// It marks the job as running, then writes every post the compliance store
// returns for the job to "posts.csv" inside
// <ComplianceSettings.Directory>/compliance/<job.JobName()>.zip (the directory
// defaults to "./data/" when empty), followed by a "metadata.txt" summary.
// The archive is finalized and the file closed before the job is marked as
// finished with the exported post count, so a job is never reported finished
// while its archive is incomplete.
//
// If any step after the initial status update fails, the job is marked as
// failed, the incomplete archive is removed, and an AppError wrapping the
// underlying error is returned. It returns nil on success.
func (c *ComplianceImpl) RunComplianceJob(rctx request.CTX, job *model.Compliance) *model.AppError {
	config := c.config()

	// Update job status to running
	job.Status = model.ComplianceStatusRunning
	if _, err := c.store.Compliance().Update(job); err != nil {
		return model.NewAppError("RunComplianceJob", "ent.compliance.run.update.app_error", nil, "", 500).Wrap(err)
	}

	rctx.Logger().Info("Starting compliance export job",
		mlog.String("job_id", job.Id),
		mlog.String("job_type", job.Type),
	)

	// fail marks the job as failed and wraps err in an AppError with the given id.
	fail := func(id string, err error) *model.AppError {
		c.updateJobStatus(job, model.ComplianceStatusFailed)
		return model.NewAppError("RunComplianceJob", id, nil, "", 500).Wrap(err)
	}

	// Ensure compliance directory exists
	complianceDir := *config.ComplianceSettings.Directory
	if complianceDir == "" {
		complianceDir = "./data/"
	}
	complianceDir = filepath.Join(complianceDir, "compliance")
	if err := os.MkdirAll(complianceDir, 0750); err != nil {
		return fail("ent.compliance.run.mkdir.app_error", err)
	}

	// Create the zip file
	zipFilePath := filepath.Join(complianceDir, job.JobName()+".zip")
	zipFile, err := os.Create(zipFilePath)
	if err != nil {
		return fail("ent.compliance.run.create_zip.app_error", err)
	}
	zipWriter := zip.NewWriter(zipFile)

	// Until the archive has been finalized successfully, any return path must
	// release the file handle and remove the truncated archive so it cannot be
	// mistaken for a valid export. Errors are ignored here because the real
	// failure has already been reported by the returning code path.
	archiveComplete := false
	defer func() {
		if archiveComplete {
			return
		}
		_ = zipWriter.Close()
		_ = zipFile.Close()
		_ = os.Remove(zipFilePath)
	}()

	// Create CSV file inside zip
	csvWriter, err := zipWriter.Create("posts.csv")
	if err != nil {
		return fail("ent.compliance.run.create_csv.app_error", err)
	}
	writer := csv.NewWriter(csvWriter)

	// Write header
	if err := writer.Write(model.CompliancePostHeader()); err != nil {
		return fail("ent.compliance.run.write_header.app_error", err)
	}

	// Export posts in batches
	cursor := model.ComplianceExportCursor{}
	totalCount := 0
	for {
		posts, newCursor, err := c.store.Compliance().ComplianceExport(job, cursor, ExportBatchSize)
		if err != nil {
			return fail("ent.compliance.run.export.app_error", err)
		}

		for _, post := range posts {
			if err := writer.Write(post.Row()); err != nil {
				return fail("ent.compliance.run.write_row.app_error", err)
			}
			totalCount++
		}
		cursor = newCursor

		// Check if we're done
		if cursor.ChannelsQueryCompleted && cursor.DirectMessagesQueryCompleted {
			break
		}
		// Check if we got fewer posts than requested (means we're done)
		if len(posts) < ExportBatchSize {
			break
		}
	}

	// csv.Writer buffers internally; Flush pushes the remaining rows into the
	// zip entry and Error reports any write failure seen along the way.
	writer.Flush()
	if err := writer.Error(); err != nil {
		return fail("ent.compliance.run.flush.app_error", err)
	}

	// Create metadata file
	metaWriter, err := zipWriter.Create("metadata.txt")
	if err != nil {
		return fail("ent.compliance.run.create_meta.app_error", err)
	}
	metadata := c.generateMetadata(job, totalCount)
	if _, err := metaWriter.Write([]byte(metadata)); err != nil {
		return fail("ent.compliance.run.write_meta.app_error", err)
	}

	// Finalize the archive before reporting success: zip.Writer.Close writes
	// the central directory, without which the file is not a readable zip.
	if err := zipWriter.Close(); err != nil {
		return fail("ent.compliance.run.close_zip.app_error", err)
	}
	if err := zipFile.Close(); err != nil {
		return fail("ent.compliance.run.close_file.app_error", err)
	}
	archiveComplete = true

	// Update job status to finished
	job.Status = model.ComplianceStatusFinished
	job.Count = totalCount
	if _, err := c.store.Compliance().Update(job); err != nil {
		return model.NewAppError("RunComplianceJob", "ent.compliance.run.update_final.app_error", nil, "", 500).Wrap(err)
	}

	rctx.Logger().Info("Compliance export job completed",
		mlog.String("job_id", job.Id),
		mlog.Int("count", totalCount),
	)
	return nil
}
// updateJobStatus sets job.Status to status and persists the job through the
// compliance store. A failed update is logged with the job id and the
// requested status; it is not returned to the caller.
func (c *ComplianceImpl) updateJobStatus(job *model.Compliance, status string) {
	job.Status = status

	_, updateErr := c.store.Compliance().Update(job)
	if updateErr == nil {
		return
	}

	c.logger.Error("Failed to update compliance job status",
		mlog.String("job_id", job.Id),
		mlog.String("status", status),
		mlog.Err(updateErr),
	)
}
// generateMetadata renders the plain-text summary for an export: job id, type,
// description, creation/start/end times and the number of exported posts,
// followed by the keyword and email filters when they are non-empty and a
// generation timestamp. All times are formatted as RFC 3339.
func (c *ComplianceImpl) generateMetadata(job *model.Compliance, count int) string {
	// stamp formats a Unix-millisecond timestamp as RFC 3339.
	stamp := func(ms int64) string {
		return time.UnixMilli(ms).Format(time.RFC3339)
	}

	var out strings.Builder
	out.WriteString("Mattermost Compliance Export\n")
	out.WriteString("============================\n\n")

	fmt.Fprintf(&out, "Job ID: %s\n", job.Id)
	fmt.Fprintf(&out, "Job Type: %s\n", job.Type)
	fmt.Fprintf(&out, "Description: %s\n", job.Desc)
	fmt.Fprintf(&out, "Created At: %s\n", stamp(job.CreateAt))
	fmt.Fprintf(&out, "Start Time: %s\n", stamp(job.StartAt))
	fmt.Fprintf(&out, "End Time: %s\n", stamp(job.EndAt))
	fmt.Fprintf(&out, "Total Posts: %d\n", count)

	// Filters are listed only when the job actually uses them.
	if job.Keywords != "" {
		fmt.Fprintf(&out, "Keywords Filter: %s\n", job.Keywords)
	}
	if job.Emails != "" {
		fmt.Fprintf(&out, "Emails Filter: %s\n", job.Emails)
	}

	fmt.Fprintf(&out, "\nGenerated At: %s\n", time.Now().Format(time.RFC3339))
	out.WriteString("Generated By: Mattermost Community Enterprise\n")
	return out.String()
}
// Stop stops the compliance service. When the scheduler is marked as running
// it closes the stop channel, which makes the scheduler goroutine return, and
// clears the running flag; otherwise it does nothing.
func (c *ComplianceImpl) Stop() {
	if !c.isRunning {
		return
	}
	close(c.stopChan)
	c.isRunning = false
}