// Copyright (c) 2024 Mattermost Community Enterprise // Open source implementation of Mattermost Enterprise Compliance package compliance import ( "archive/zip" "encoding/csv" "fmt" "os" "path/filepath" "strings" "time" "github.com/mattermost/mattermost/server/public/model" "github.com/mattermost/mattermost/server/public/shared/mlog" "github.com/mattermost/mattermost/server/public/shared/request" "github.com/mattermost/mattermost/server/v8/channels/store" "github.com/mattermost/mattermost/server/v8/einterfaces" ) const ( ExportBatchSize = 10000 ) type ComplianceImpl struct { store store.Store config func() *model.Config logger mlog.LoggerIFace stopChan chan struct{} isRunning bool } type ComplianceConfig struct { Store store.Store Config func() *model.Config Logger mlog.LoggerIFace } func NewComplianceInterface(cfg *ComplianceConfig) einterfaces.ComplianceInterface { return &ComplianceImpl{ store: cfg.Store, config: cfg.Config, logger: cfg.Logger, stopChan: make(chan struct{}), } } // StartComplianceDailyJob starts the daily compliance job func (c *ComplianceImpl) StartComplianceDailyJob() { if c.isRunning { return } c.isRunning = true c.logger.Info("Starting compliance daily job scheduler") go func() { ticker := time.NewTicker(24 * time.Hour) defer ticker.Stop() // Run once at startup if enabled c.runDailyExport() for { select { case <-c.stopChan: c.logger.Info("Stopping compliance daily job scheduler") return case <-ticker.C: c.runDailyExport() } } }() } func (c *ComplianceImpl) runDailyExport() { config := c.config() if !*config.ComplianceSettings.Enable || !*config.ComplianceSettings.EnableDaily { return } c.logger.Info("Running daily compliance export") now := time.Now() startOfYesterday := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, now.Location()) endOfYesterday := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()) job := &model.Compliance{ Desc: "Daily Export", Type: model.ComplianceTypeDaily, StartAt: startOfYesterday.UnixMilli(), EndAt: endOfYesterday.UnixMilli(), } job.PreSave() // Save the job savedJob, err := c.store.Compliance().Save(job) if err != nil { c.logger.Error("Failed to save daily compliance job", mlog.Err(err)) return } // Run the export rctx := request.EmptyContext(c.logger) if appErr := c.RunComplianceJob(rctx, savedJob); appErr != nil { c.logger.Error("Failed to run daily compliance job", mlog.Err(appErr)) } } // RunComplianceJob runs a compliance export job func (c *ComplianceImpl) RunComplianceJob(rctx request.CTX, job *model.Compliance) *model.AppError { config := c.config() // Update job status to running job.Status = model.ComplianceStatusRunning if _, err := c.store.Compliance().Update(job); err != nil { return model.NewAppError("RunComplianceJob", "ent.compliance.run.update.app_error", nil, "", 500).Wrap(err) } rctx.Logger().Info("Starting compliance export job", mlog.String("job_id", job.Id), mlog.String("job_type", job.Type), ) // Ensure compliance directory exists complianceDir := *config.ComplianceSettings.Directory if complianceDir == "" { complianceDir = "./data/" } complianceDir = filepath.Join(complianceDir, "compliance") if err := os.MkdirAll(complianceDir, 0750); err != nil { c.updateJobStatus(job, model.ComplianceStatusFailed) return model.NewAppError("RunComplianceJob", "ent.compliance.run.mkdir.app_error", nil, "", 500).Wrap(err) } // Create the zip file zipFilePath := filepath.Join(complianceDir, job.JobName()+".zip") zipFile, err := os.Create(zipFilePath) if err != nil { c.updateJobStatus(job, model.ComplianceStatusFailed) return model.NewAppError("RunComplianceJob", "ent.compliance.run.create_zip.app_error", nil, "", 500).Wrap(err) } defer zipFile.Close() zipWriter := zip.NewWriter(zipFile) defer zipWriter.Close() // Create CSV file inside zip csvWriter, err := zipWriter.Create("posts.csv") if err != nil { c.updateJobStatus(job, model.ComplianceStatusFailed) return model.NewAppError("RunComplianceJob", "ent.compliance.run.create_csv.app_error", nil, "", 500).Wrap(err) } writer := csv.NewWriter(csvWriter) // Write header if err := writer.Write(model.CompliancePostHeader()); err != nil { c.updateJobStatus(job, model.ComplianceStatusFailed) return model.NewAppError("RunComplianceJob", "ent.compliance.run.write_header.app_error", nil, "", 500).Wrap(err) } // Export posts in batches cursor := model.ComplianceExportCursor{} totalCount := 0 for { posts, newCursor, err := c.store.Compliance().ComplianceExport(job, cursor, ExportBatchSize) if err != nil { c.updateJobStatus(job, model.ComplianceStatusFailed) return model.NewAppError("RunComplianceJob", "ent.compliance.run.export.app_error", nil, "", 500).Wrap(err) } for _, post := range posts { if err := writer.Write(post.Row()); err != nil { c.updateJobStatus(job, model.ComplianceStatusFailed) return model.NewAppError("RunComplianceJob", "ent.compliance.run.write_row.app_error", nil, "", 500).Wrap(err) } totalCount++ } cursor = newCursor // Check if we're done if cursor.ChannelsQueryCompleted && cursor.DirectMessagesQueryCompleted { break } // Check if we got fewer posts than requested (means we're done) if len(posts) < ExportBatchSize { break } } writer.Flush() if err := writer.Error(); err != nil { c.updateJobStatus(job, model.ComplianceStatusFailed) return model.NewAppError("RunComplianceJob", "ent.compliance.run.flush.app_error", nil, "", 500).Wrap(err) } // Create metadata file metaWriter, err := zipWriter.Create("metadata.txt") if err != nil { c.updateJobStatus(job, model.ComplianceStatusFailed) return model.NewAppError("RunComplianceJob", "ent.compliance.run.create_meta.app_error", nil, "", 500).Wrap(err) } metadata := c.generateMetadata(job, totalCount) if _, err := metaWriter.Write([]byte(metadata)); err != nil { c.updateJobStatus(job, model.ComplianceStatusFailed) return model.NewAppError("RunComplianceJob", "ent.compliance.run.write_meta.app_error", nil, "", 500).Wrap(err) } // Update job status to finished job.Status = model.ComplianceStatusFinished job.Count = totalCount if _, err := c.store.Compliance().Update(job); err != nil { return model.NewAppError("RunComplianceJob", "ent.compliance.run.update_final.app_error", nil, "", 500).Wrap(err) } rctx.Logger().Info("Compliance export job completed", mlog.String("job_id", job.Id), mlog.Int("count", totalCount), ) return nil } func (c *ComplianceImpl) updateJobStatus(job *model.Compliance, status string) { job.Status = status if _, err := c.store.Compliance().Update(job); err != nil { c.logger.Error("Failed to update compliance job status", mlog.String("job_id", job.Id), mlog.String("status", status), mlog.Err(err), ) } } func (c *ComplianceImpl) generateMetadata(job *model.Compliance, count int) string { var sb strings.Builder sb.WriteString("Mattermost Compliance Export\n") sb.WriteString("============================\n\n") sb.WriteString(fmt.Sprintf("Job ID: %s\n", job.Id)) sb.WriteString(fmt.Sprintf("Job Type: %s\n", job.Type)) sb.WriteString(fmt.Sprintf("Description: %s\n", job.Desc)) sb.WriteString(fmt.Sprintf("Created At: %s\n", time.UnixMilli(job.CreateAt).Format(time.RFC3339))) sb.WriteString(fmt.Sprintf("Start Time: %s\n", time.UnixMilli(job.StartAt).Format(time.RFC3339))) sb.WriteString(fmt.Sprintf("End Time: %s\n", time.UnixMilli(job.EndAt).Format(time.RFC3339))) sb.WriteString(fmt.Sprintf("Total Posts: %d\n", count)) if job.Keywords != "" { sb.WriteString(fmt.Sprintf("Keywords Filter: %s\n", job.Keywords)) } if job.Emails != "" { sb.WriteString(fmt.Sprintf("Emails Filter: %s\n", job.Emails)) } sb.WriteString(fmt.Sprintf("\nGenerated At: %s\n", time.Now().Format(time.RFC3339))) sb.WriteString("Generated By: Mattermost Community Enterprise\n") return sb.String() } // Stop stops the compliance service func (c *ComplianceImpl) Stop() { if c.isRunning { close(c.stopChan) c.isRunning = false } }