// SPDX-License-Identifier: Apache-2.0
//
// The OpenSearch Contributors require contributions made to
// this file be licensed under the Apache-2.0 license or a
// compatible open source license.
//
// Modifications Copyright OpenSearch Contributors. See
// GitHub history for details.

// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package opensearchutil

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"runtime"
	"sync"
	"sync/atomic"
	"time"

	"github.com/opensearch-project/opensearch-go/v4/opensearchapi"
)

const defaultFlushInterval = 30 * time.Second

// BulkIndexer represents a parallel, asynchronous, efficient indexer for OpenSearch.
type BulkIndexer interface {
	// Add adds an item to the indexer. It returns an error when the item cannot be added.
	// Use the OnSuccess and OnFailure callbacks to get the operation result for the item.
	//
	// You must call the Close() method after you're done adding items.
	//
	// It is safe for concurrent use. When it is called from goroutines,
	// they must finish before the call to Close, e.g. by using a sync.WaitGroup.
	Add(context.Context, BulkIndexerItem) error

	// Close waits until all added items are flushed and closes the indexer.
	Close(context.Context) error

	// Stats returns indexer statistics.
	Stats() BulkIndexerStats
}
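
// A minimal usage sketch (not part of the library): the client construction,
// index name, and document body below are illustrative assumptions.
//
//	indexer, err := opensearchutil.NewBulkIndexer(opensearchutil.BulkIndexerConfig{
//		Client: client,     // an *opensearchapi.Client created by the caller
//		Index:  "my-index", // hypothetical index name
//	})
//	if err != nil {
//		log.Fatal(err)
//	}
//	err = indexer.Add(context.Background(), opensearchutil.BulkIndexerItem{
//		Action:     "index",
//		DocumentID: "1",
//		Body:       strings.NewReader(`{"title":"hello"}`),
//	})
//	if err != nil {
//		log.Fatal(err)
//	}
//	if err := indexer.Close(context.Background()); err != nil {
//		log.Fatal(err)
//	}
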
// BulkIndexerConfig represents configuration of the indexer.
type BulkIndexerConfig struct {
	NumWorkers    int           // The number of workers. Defaults to runtime.NumCPU().
	FlushBytes    int           // The flush threshold in bytes. Defaults to 5MB.
	FlushInterval time.Duration // The flush threshold as duration. Defaults to 30sec.

	Client *opensearchapi.Client // The OpenSearch client.

	DebugLogger BulkIndexerDebugLogger // An optional logger for debugging.

	OnError      func(context.Context, error)          // Called for indexer errors.
	OnFlushStart func(context.Context) context.Context // Called when the flush starts.
	OnFlushEnd   func(context.Context)                 // Called when the flush ends.

	// Parameters of the Bulk API.
	Index               string
	ErrorTrace          bool
	Header              http.Header
	Human               bool
	Pipeline            string
	Pretty              bool
	Refresh             string
	Routing             string
	Source              []string
	SourceExcludes      []string
	SourceIncludes      []string
	Timeout             time.Duration
	WaitForActiveShards string
}
// BulkIndexerStats represents the indexer statistics.
type BulkIndexerStats struct {
	NumAdded    uint64
	NumFlushed  uint64
	NumFailed   uint64
	NumIndexed  uint64
	NumCreated  uint64
	NumUpdated  uint64
	NumDeleted  uint64
	NumRequests uint64
}

// BulkIndexerItem represents an indexer item.
type BulkIndexerItem struct {
	Index               string
	Action              string
	DocumentID          string
	Routing             *string
	Version             *int64
	VersionType         *string
	IfSeqNum            *int64
	IfPrimaryTerm       *int64
	WaitForActiveShards interface{}
	Refresh             *string
	RequireAlias        *bool
	Body                io.ReadSeeker
	RetryOnConflict     *int

	OnSuccess func(context.Context, BulkIndexerItem, opensearchapi.BulkRespItem)        // Per item
	OnFailure func(context.Context, BulkIndexerItem, opensearchapi.BulkRespItem, error) // Per item
}
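
// A sketch of per-item result callbacks; the failure counter is an
// illustrative assumption, not part of the library:
//
//	var failed uint64
//	item := opensearchutil.BulkIndexerItem{
//		Action: "index",
//		Body:   strings.NewReader(`{"title":"hello"}`),
//		OnSuccess: func(ctx context.Context, item opensearchutil.BulkIndexerItem, res opensearchapi.BulkRespItem) {
//			// res.Status carries the per-item HTTP status from the bulk response.
//		},
//		OnFailure: func(ctx context.Context, item opensearchutil.BulkIndexerItem, res opensearchapi.BulkRespItem, err error) {
//			// err is non-nil when the whole bulk request failed; otherwise
//			// inspect res.Error and res.Status for the per-item failure.
//			atomic.AddUint64(&failed, 1)
//		},
//	}
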
type bulkActionMetadata struct {
	Index               string      `json:"_index,omitempty"`
	DocumentID          string      `json:"_id,omitempty"`
	Routing             *string     `json:"routing,omitempty"`
	Version             *int64      `json:"version,omitempty"`
	VersionType         *string     `json:"version_type,omitempty"`
	IfSeqNum            *int64      `json:"if_seq_no,omitempty"`
	IfPrimaryTerm       *int64      `json:"if_primary_term,omitempty"`
	WaitForActiveShards interface{} `json:"wait_for_active_shards,omitempty"`
	Refresh             *string     `json:"refresh,omitempty"`
	RequireAlias        *bool       `json:"require_alias,omitempty"`
	RetryOnConflict     *int        `json:"retry_on_conflict,omitempty"`
}
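
// For reference, writeMeta serializes this struct as a single NDJSON action
// line keyed by the item's action, and writeBody appends the document source
// on the next line, e.g. (illustrative values):
//
//	{"index":{"_index":"my-index","_id":"1"}}
//	{"title":"hello"}
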
// BulkIndexerDebugLogger defines the interface for a debugging logger.
type BulkIndexerDebugLogger interface {
	Printf(string, ...interface{})
}

type bulkIndexer struct {
	wg      sync.WaitGroup
	queue   chan BulkIndexerItem
	workers []*worker
	ticker  *time.Ticker
	done    chan bool
	stats   *bulkIndexerStats
	config  BulkIndexerConfig
}

type bulkIndexerStats struct {
	numAdded    uint64
	numFlushed  uint64
	numFailed   uint64
	numIndexed  uint64
	numCreated  uint64
	numUpdated  uint64
	numDeleted  uint64
	numRequests uint64
}

// NewBulkIndexer creates a new bulk indexer.
func NewBulkIndexer(cfg BulkIndexerConfig) (BulkIndexer, error) {
	if cfg.Client == nil {
		var err error
		cfg.Client, err = opensearchapi.NewDefaultClient()
		if err != nil {
			return nil, err
		}
	}

	if cfg.NumWorkers == 0 {
		cfg.NumWorkers = runtime.NumCPU()
	}

	if cfg.FlushBytes == 0 {
		cfg.FlushBytes = 5e+6
	}

	if cfg.FlushInterval == 0 {
		cfg.FlushInterval = defaultFlushInterval
	}

	bi := bulkIndexer{
		config: cfg,
		done:   make(chan bool),
		stats:  &bulkIndexerStats{},
	}

	bi.init()

	return &bi, nil
}

// Add adds an item to the indexer.
//
// Adding an item after a call to Close() will panic.
func (bi *bulkIndexer) Add(ctx context.Context, item BulkIndexerItem) error {
	atomic.AddUint64(&bi.stats.numAdded, 1)

	select {
	case <-ctx.Done():
		if bi.config.OnError != nil {
			bi.config.OnError(ctx, ctx.Err())
		}
		return ctx.Err()
	case bi.queue <- item:
	}

	return nil
}
// Close stops the periodic flush, closes the indexer queue channel,
// notifies the done channel and flushes the remaining buffer of every worker.
func (bi *bulkIndexer) Close(ctx context.Context) error {
	bi.ticker.Stop()
	close(bi.queue)
	bi.done <- true

	select {
	case <-ctx.Done():
		if bi.config.OnError != nil {
			bi.config.OnError(ctx, ctx.Err())
		}
		return ctx.Err()
	default:
		bi.wg.Wait()
	}

	for _, w := range bi.workers {
		w.mu.Lock()
		if w.buf.Len() > 0 {
			if err := w.flush(ctx); err != nil {
				w.mu.Unlock()
				if bi.config.OnError != nil {
					bi.config.OnError(ctx, err)
				}
				continue
			}
		}
		w.mu.Unlock()
	}

	return nil
}

// Stats returns indexer statistics.
func (bi *bulkIndexer) Stats() BulkIndexerStats {
	return BulkIndexerStats{
		NumAdded:    atomic.LoadUint64(&bi.stats.numAdded),
		NumFlushed:  atomic.LoadUint64(&bi.stats.numFlushed),
		NumFailed:   atomic.LoadUint64(&bi.stats.numFailed),
		NumIndexed:  atomic.LoadUint64(&bi.stats.numIndexed),
		NumCreated:  atomic.LoadUint64(&bi.stats.numCreated),
		NumUpdated:  atomic.LoadUint64(&bi.stats.numUpdated),
		NumDeleted:  atomic.LoadUint64(&bi.stats.numDeleted),
		NumRequests: atomic.LoadUint64(&bi.stats.numRequests),
	}
}
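
// A sketch of inspecting the counters after Close; the log message is an
// illustrative assumption:
//
//	stats := indexer.Stats()
//	if stats.NumFailed > 0 {
//		log.Printf("flushed %d items, %d failed", stats.NumFlushed, stats.NumFailed)
//	}
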
// init initializes the bulk indexer.
func (bi *bulkIndexer) init() {
	bi.queue = make(chan BulkIndexerItem, bi.config.NumWorkers)

	for i := 1; i <= bi.config.NumWorkers; i++ {
		w := worker{
			id:  i,
			ch:  bi.queue,
			bi:  bi,
			buf: bytes.NewBuffer(make([]byte, 0, bi.config.FlushBytes)),
			//nolint:gomnd // Predefine the slice capacity
			aux: make([]byte, 0, 512),
		}
		w.run()
		bi.workers = append(bi.workers, &w)
	}
	bi.wg.Add(bi.config.NumWorkers)

	bi.ticker = time.NewTicker(bi.config.FlushInterval)
	go func() {
		ctx := context.Background()
		for {
			select {
			case <-bi.done:
				return
			case <-bi.ticker.C:
				if bi.config.DebugLogger != nil {
					bi.config.DebugLogger.Printf("[indexer] Auto-flushing workers after %s\n", bi.config.FlushInterval)
				}
				for _, w := range bi.workers {
					w.mu.Lock()
					if w.buf.Len() > 0 {
						if err := w.flush(ctx); err != nil {
							w.mu.Unlock()
							if bi.config.OnError != nil {
								bi.config.OnError(ctx, err)
							}
							continue
						}
					}
					w.mu.Unlock()
				}
			}
		}
	}()
}

// worker represents an indexer worker.
type worker struct {
	id    int
	ch    <-chan BulkIndexerItem
	mu    sync.Mutex
	bi    *bulkIndexer
	buf   *bytes.Buffer
	aux   []byte
	items []BulkIndexerItem
}

// run launches the worker in a goroutine.
func (w *worker) run() {
	go func() {
		ctx := context.Background()

		if w.bi.config.DebugLogger != nil {
			w.bi.config.DebugLogger.Printf("[worker-%03d] Started\n", w.id)
		}
		defer w.bi.wg.Done()

		for item := range w.ch {
			w.mu.Lock()

			if w.bi.config.DebugLogger != nil {
				w.bi.config.DebugLogger.Printf("[worker-%03d] Received item [%s:%s]\n", w.id, item.Action, item.DocumentID)
			}

			if err := w.writeMeta(item); err != nil {
				if item.OnFailure != nil {
					item.OnFailure(ctx, item, opensearchapi.BulkRespItem{}, err)
				}
				atomic.AddUint64(&w.bi.stats.numFailed, 1)
				w.mu.Unlock()
				continue
			}

			if err := w.writeBody(&item); err != nil {
				if item.OnFailure != nil {
					item.OnFailure(ctx, item, opensearchapi.BulkRespItem{}, err)
				}
				atomic.AddUint64(&w.bi.stats.numFailed, 1)
				w.mu.Unlock()
				continue
			}

			w.items = append(w.items, item)
			if w.buf.Len() >= w.bi.config.FlushBytes {
				if err := w.flush(ctx); err != nil {
					w.mu.Unlock()
					if w.bi.config.OnError != nil {
						w.bi.config.OnError(ctx, err)
					}
					continue
				}
			}
			w.mu.Unlock()
		}
	}()
}

// writeMeta formats and writes the item metadata to the buffer; it must be called under a lock.
func (w *worker) writeMeta(item BulkIndexerItem) error {
	var err error
	meta := bulkActionMetadata{
		Index:               item.Index,
		DocumentID:          item.DocumentID,
		Version:             item.Version,
		VersionType:         item.VersionType,
		Routing:             item.Routing,
		IfPrimaryTerm:       item.IfPrimaryTerm,
		IfSeqNum:            item.IfSeqNum,
		WaitForActiveShards: item.WaitForActiveShards,
		Refresh:             item.Refresh,
		RequireAlias:        item.RequireAlias,
		RetryOnConflict:     item.RetryOnConflict,
	}
	// A version or version type cannot be specified without a document ID.
	if meta.DocumentID == "" {
		meta.Version = nil
		meta.VersionType = nil
	}
	w.aux, err = json.Marshal(map[string]bulkActionMetadata{
		item.Action: meta,
	})
	if err != nil {
		return err
	}
	_, err = w.buf.Write(w.aux)
	if err != nil {
		return err
	}
	w.aux = w.aux[:0]
	_, err = w.buf.WriteRune('\n')
	if err != nil {
		return err
	}
	return nil
}

// writeBody writes the item body to the buffer; it must be called under a lock.
func (w *worker) writeBody(item *BulkIndexerItem) error {
	if item.Body == nil {
		return nil
	}

	if _, err := w.buf.ReadFrom(item.Body); err != nil {
		if w.bi.config.OnError != nil {
			w.bi.config.OnError(context.Background(), err)
		}
		return err
	}

	if _, err := item.Body.Seek(0, io.SeekStart); err != nil {
		if w.bi.config.OnError != nil {
			w.bi.config.OnError(context.Background(), err)
		}
		return err
	}

	w.buf.WriteRune('\n')
	return nil
}

// flush writes out the worker buffer; it must be called under a lock.
func (w *worker) flush(ctx context.Context) error {
	if w.bi.config.OnFlushStart != nil {
		ctx = w.bi.config.OnFlushStart(ctx)
	}

	if w.bi.config.OnFlushEnd != nil {
		defer func() { w.bi.config.OnFlushEnd(ctx) }()
	}

	if w.buf.Len() < 1 {
		if w.bi.config.DebugLogger != nil {
			w.bi.config.DebugLogger.Printf("[worker-%03d] Flush: Buffer empty\n", w.id)
		}
		return nil
	}

	var (
		err error
		blk *opensearchapi.BulkResp
	)

	defer func() {
		w.items = w.items[:0]
		w.buf.Reset()
	}()

	if w.bi.config.DebugLogger != nil {
		w.bi.config.DebugLogger.Printf("[worker-%03d] Flush: %s\n", w.id, w.buf.String())
	}

	atomic.AddUint64(&w.bi.stats.numRequests, 1)
	req := opensearchapi.BulkReq{
		Index: w.bi.config.Index,
		Body:  w.buf,
		Params: opensearchapi.BulkParams{
			Pipeline:            w.bi.config.Pipeline,
			Refresh:             w.bi.config.Refresh,
			Routing:             w.bi.config.Routing,
			Source:              w.bi.config.Source,
			SourceExcludes:      w.bi.config.SourceExcludes,
			SourceIncludes:      w.bi.config.SourceIncludes,
			Timeout:             w.bi.config.Timeout,
			WaitForActiveShards: w.bi.config.WaitForActiveShards,
			Pretty:              w.bi.config.Pretty,
			Human:               w.bi.config.Human,
			ErrorTrace:          w.bi.config.ErrorTrace,
		},
		Header: w.bi.config.Header,
	}

	blk, err = w.bi.config.Client.Bulk(ctx, req)
	if err != nil {
		return w.handleBulkError(ctx, fmt.Errorf("flush: %w", err))
	}

	for i, blkItem := range blk.Items {
		var (
			item BulkIndexerItem
			info opensearchapi.BulkRespItem
			op   string
		)
		item = w.items[i]
		// The OpenSearch bulk response contains an array of single-key maps:
		// [ { "index": { ... } }, { "create": { ... } }, ... ]
		// Range over the map to extract that key and value as "op" and "info".
		for k, v := range blkItem {
			op = k
			info = v
		}
		if info.Error != nil || info.Status > 201 {
			atomic.AddUint64(&w.bi.stats.numFailed, 1)
			if item.OnFailure != nil {
				item.OnFailure(ctx, item, info, nil)
			}
		} else {
			atomic.AddUint64(&w.bi.stats.numFlushed, 1)
			switch op {
			case "index":
				atomic.AddUint64(&w.bi.stats.numIndexed, 1)
			case "create":
				atomic.AddUint64(&w.bi.stats.numCreated, 1)
			case "delete":
				atomic.AddUint64(&w.bi.stats.numDeleted, 1)
			case "update":
				atomic.AddUint64(&w.bi.stats.numUpdated, 1)
			}
			if item.OnSuccess != nil {
				item.OnSuccess(ctx, item, info)
			}
		}
	}
	return err
}

func (w *worker) handleBulkError(ctx context.Context, err error) error {
	atomic.AddUint64(&w.bi.stats.numFailed, uint64(len(w.items)))

	// info (the response item) will be empty since the bulk request failed
	var info opensearchapi.BulkRespItem
	for i := range w.items {
		if item := w.items[i]; item.OnFailure != nil {
			item.OnFailure(ctx, item, info, err)
		}
	}
	return err
}