mattermost-community-enterp.../enterprise/elasticsearch/opensearch/bulk.go
Claude ec1f89217a Merge: Complete Mattermost Server with Community Enterprise
Full Mattermost server source with integrated Community Enterprise features.
Includes vendor directory for offline/air-gapped builds.

Structure:
- enterprise-impl/: Enterprise feature implementations
- enterprise-community/: Init files that register implementations
- enterprise/: Bridge imports (community_imports.go)
- vendor/: All dependencies for offline builds

Build (online):
  go build ./cmd/mattermost

Build (offline/air-gapped):
  go build -mod=vendor ./cmd/mattermost

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-17 23:59:07 +09:00

192 lines
3.9 KiB
Go

// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.enterprise for license information.
package opensearch
import (
"bytes"
"context"
"encoding/json"
"sync"
"time"
"github.com/elastic/go-elasticsearch/v8/typedapi/types"
"github.com/mattermost/mattermost/server/public/shared/mlog"
"github.com/mattermost/mattermost/server/v8/enterprise/elasticsearch/common"
"github.com/opensearch-project/opensearch-go/v4/opensearchapi"
)
type Bulk struct {
mut sync.Mutex
buf *bytes.Buffer
client *opensearchapi.Client
bulkSettings common.BulkSettings
reqTimeout time.Duration
logger mlog.LoggerIFace
quitFlusher chan struct{}
quitFlusherWg sync.WaitGroup
pendingRequests int
}
func NewBulk(bulkSettings common.BulkSettings,
client *opensearchapi.Client,
reqTimeout time.Duration,
logger mlog.LoggerIFace,
) *Bulk {
b := &Bulk{
bulkSettings: bulkSettings,
reqTimeout: reqTimeout,
logger: logger,
client: client,
quitFlusher: make(chan struct{}),
buf: &bytes.Buffer{},
}
// Start the timer only if a flush interval was specified
if bulkSettings.FlushInterval > 0 {
b.quitFlusherWg.Add(1)
go b.periodicFlusher()
}
return b
}
// IndexOp is a helper function to add an IndexOperation to the current bulk request.
// doc argument can be a []byte, json.RawMessage or a struct.
func (r *Bulk) IndexOp(op *types.IndexOperation, doc any) error {
r.mut.Lock()
defer r.mut.Unlock()
operation := types.OperationContainer{Index: op}
header, err := json.Marshal(operation)
if err != nil {
return err
}
r.buf.Write(header)
r.buf.Write([]byte("\n"))
switch v := doc.(type) {
case []byte:
r.buf.Write(v)
case json.RawMessage:
r.buf.Write(v)
default:
body, err := json.Marshal(doc)
if err != nil {
return err
}
r.buf.Write(body)
}
r.buf.Write([]byte("\n"))
return r.flushIfNecessary()
}
// DeleteOp is a helper function to add a DeleteOperation to the current bulk request.
func (r *Bulk) DeleteOp(op *types.DeleteOperation) error {
r.mut.Lock()
defer r.mut.Unlock()
operation := types.OperationContainer{Delete: op}
header, err := json.Marshal(operation)
if err != nil {
return err
}
r.buf.Write(header)
r.buf.Write([]byte("\n"))
return r.flushIfNecessary()
}
// flushIfNecessary flushes the pending buffer if needed.
// It MUST be called with an already acquired mutex.
func (r *Bulk) flushIfNecessary() error {
// Check data threshold, only if specified
if r.bulkSettings.FlushBytes > 0 {
if r.buf.Len() >= r.bulkSettings.FlushBytes {
return r._flush()
}
}
r.pendingRequests++
// Check number of requests threshold, only if specified
if r.bulkSettings.FlushNumReqs > 0 {
if r.pendingRequests > r.bulkSettings.FlushNumReqs {
return r._flush()
}
}
return nil
}
func (r *Bulk) Stop() error {
r.mut.Lock()
defer r.mut.Unlock()
r.logger.Info("Stopping Bulk processor")
if r.pendingRequests > 0 {
return r._flush()
}
// Cleanup the timer if the flush interval was specified
if r.bulkSettings.FlushInterval > 0 {
close(r.quitFlusher)
r.quitFlusherWg.Wait()
}
return nil
}
func (r *Bulk) periodicFlusher() {
defer r.quitFlusherWg.Done()
for {
select {
case <-time.After(r.bulkSettings.FlushInterval):
r.mut.Lock()
if r.pendingRequests > 0 {
if err := r._flush(); err != nil {
r.logger.Warn("Error flushing live indexing buffer", mlog.Err(err))
}
}
r.mut.Unlock()
case <-r.quitFlusher:
return
}
}
}
// _flush MUST be called with an acquired lock.
func (r *Bulk) _flush() error {
if r.pendingRequests == 0 {
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), r.reqTimeout)
defer cancel()
_, err := r.client.Bulk(ctx, opensearchapi.BulkReq{
Body: bytes.NewReader(r.buf.Bytes()),
})
if err != nil {
return err
}
r.buf.Reset()
r.pendingRequests = 0
return nil
}
func (r *Bulk) Flush() error {
r.mut.Lock()
defer r.mut.Unlock()
return r._flush()
}