// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package log // import "go.opentelemetry.io/otel/sdk/log"

import (
	"context"
	"errors"
	"slices"
	"sync"
	"sync/atomic"
	"time"

	"go.opentelemetry.io/otel/internal/global"
)

const (
	dfltMaxQSize        = 2048
	dfltExpInterval     = time.Second
	dfltExpTimeout      = 30 * time.Second
	dfltExpMaxBatchSize = 512
	dfltExpBufferSize   = 1

	envarMaxQSize        = "OTEL_BLRP_MAX_QUEUE_SIZE"
	envarExpInterval     = "OTEL_BLRP_SCHEDULE_DELAY"
	envarExpTimeout      = "OTEL_BLRP_EXPORT_TIMEOUT"
	envarExpMaxBatchSize = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE"
)

// Compile-time check BatchProcessor implements Processor.
var _ Processor = (*BatchProcessor)(nil)

// BatchProcessor is a processor that exports batches of log records.
//
// Use [NewBatchProcessor] to create a BatchProcessor. An empty BatchProcessor
// is shut down by default; no records will be batched or exported.
type BatchProcessor struct {
	// The BatchProcessor is designed to provide the highest throughput of
	// log records possible while being compatible with OpenTelemetry. The
	// entry point of log records is the OnEmit method. This method is designed
	// to receive records as fast as possible while still honoring shutdown
	// commands. All received records are enqueued to the queue.
	//
	// In order to block OnEmit as little as possible, a separate "poll"
	// goroutine is spawned at the creation of a BatchProcessor. This
	// goroutine is responsible for batching the queue at regular polled
	// intervals, or when it is directly signaled to.
	//
	// To keep the polling goroutine from backing up, all batches it makes are
	// exported with a bufferedExporter. This exporter allows the poll
	// goroutine to enqueue an export payload that will be handled in a
	// separate goroutine dedicated to the export. This asynchronous behavior
	// allows the poll goroutine to maintain accurate interval polling.
	//
	//   ___BatchProcessor____     __Poll Goroutine__     __Export Goroutine__
	// ||                     || ||                  || ||                    ||
	// ||          ********** || ||                  || ||     **********     ||
	// || Records=>* OnEmit * || ||   | - ticker     || ||     * export *     ||
	// ||          ********** || ||   | - trigger    || ||     **********     ||
	// ||             ||      || ||   |              || ||         ||         ||
	// ||             ||      || ||   |              || ||         ||         ||
	// ||   __________\/___   || ||   |***********   || ||   ______/\_______  ||
	// ||  (____queue______)>=||=||===|*  batch  *===||=||=>[_export_buffer_] ||
	// ||                     || ||   |***********   || ||                    ||
	// ||_____________________|| ||__________________|| ||____________________||
	//
	// The "release valve" in this processing is the record queue. This queue
	// is a ring buffer. It will overwrite the oldest records first when writes
	// to OnEmit are made faster than the queue can be flushed. If batches
	// cannot be flushed to the export buffer, the records will remain in the
	// queue.
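	//
	// For example, an illustrative trace of the overwrite behavior with a
	// queue capacity of 3:
	//
	//	Enqueue(a); Enqueue(b); Enqueue(c) // queue: [a b c]
	//	Enqueue(d)                         // queue: [b c d], a dropped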

	// exporter is the bufferedExporter all batches are exported with.
	exporter *bufferExporter

	// q is the active queue of records that have not yet been exported.
	q *queue
	// batchSize is the minimum number of records needed before an export is
	// triggered (unless the interval expires).
	batchSize int

	// pollTrigger triggers the poll goroutine to flush a batch from the queue.
	// This is sent to when it is known that the queue contains at least one
	// complete batch.
	//
	// When a send is made to the channel, the poll loop will be reset after
	// the flush. If there are still enough records in the queue for another
	// batch, the reset poll loop will automatically re-trigger itself.
	// There is no need for the original sender to monitor and resend.
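	//
	// For example, with a batchSize of 512 and 1200 queued records: a single
	// trigger flushes 512, the poll loop sees 688 remaining (>= 512) and
	// re-triggers itself to flush another 512, then waits for the next
	// interval with 176 left.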
	pollTrigger chan struct{}
	// pollKill kills the poll goroutine. This is only expected to be closed
	// once by the Shutdown method.
	pollKill chan struct{}
	// pollDone signals the poll goroutine has completed.
	pollDone chan struct{}

	// stopped holds the stopped state of the BatchProcessor.
	stopped atomic.Bool

	noCmp [0]func() //nolint: unused  // This is indeed used.
}

// NewBatchProcessor decorates the provided exporter
// so that the log records are batched before exporting.
//
// All of the exporter's methods are called synchronously.
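//
// A minimal usage sketch, assuming exp is an already constructed [Exporter]:
//
//	p := log.NewBatchProcessor(exp,
//		log.WithMaxQueueSize(4096),
//		log.WithExportInterval(5*time.Second),
//	)
//	provider := log.NewLoggerProvider(log.WithProcessor(p))
//	defer func() { _ = provider.Shutdown(context.Background()) }()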
func NewBatchProcessor(exporter Exporter, opts ...BatchProcessorOption) *BatchProcessor {
	cfg := newBatchConfig(opts)
	if exporter == nil {
		// Do not panic on a nil exporter.
		exporter = defaultNoopExporter
	}
	// Order is important here. Wrap the timeoutExporter with the chunkExporter
	// to ensure each export completes in timeout (instead of all chunked
	// exports).
	exporter = newTimeoutExporter(exporter, cfg.expTimeout.Value)
	// Use a chunkExporter to ensure ForceFlush and Shutdown calls are batched
	// appropriately on export.
	exporter = newChunkExporter(exporter, cfg.expMaxBatchSize.Value)

	b := &BatchProcessor{
		exporter: newBufferExporter(exporter, cfg.expBufferSize.Value),

		q:           newQueue(cfg.maxQSize.Value),
		batchSize:   cfg.expMaxBatchSize.Value,
		pollTrigger: make(chan struct{}, 1),
		pollKill:    make(chan struct{}),
	}
	b.pollDone = b.poll(cfg.expInterval.Value)
	return b
}

// poll spawns a goroutine to handle interval polling and batch exporting. The
// returned done chan is closed when the spawned goroutine completes.
func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) {
	done = make(chan struct{})

	ticker := time.NewTicker(interval)
	// TODO: investigate using a sync.Pool instead of cloning.
	buf := make([]Record, b.batchSize)
	go func() {
		defer close(done)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
			case <-b.pollTrigger:
				ticker.Reset(interval)
			case <-b.pollKill:
				return
			}

			if d := b.q.Dropped(); d > 0 {
				global.Warn("dropped log records", "dropped", d)
			}

			var qLen int
			// Don't copy data from the queue unless the exporter can accept
			// more; the copy is very expensive.
			if b.exporter.Ready() {
				qLen = b.q.TryDequeue(buf, func(r []Record) bool {
					ok := b.exporter.EnqueueExport(r)
					if ok {
						buf = slices.Clone(buf)
					}
					return ok
				})
			} else {
				qLen = b.q.Len()
			}

			if qLen >= b.batchSize {
				// There is another full batch ready. Immediately trigger
				// another export attempt.
				select {
				case b.pollTrigger <- struct{}{}:
				default:
					// Another flush signal already received.
				}
			}
		}
	}()
	return done
}

// OnEmit batches the provided log record.
func (b *BatchProcessor) OnEmit(_ context.Context, r *Record) error {
	if b.stopped.Load() || b.q == nil {
		return nil
	}
	// The record is cloned so that changes done by subsequent processors
	// do not lead to a data race.
	if n := b.q.Enqueue(r.Clone()); n >= b.batchSize {
		select {
		case b.pollTrigger <- struct{}{}:
		default:
			// Flush chan full. The poll goroutine will handle this by
			// re-sending any trigger until the queue has less than batchSize
			// records.
		}
	}
	return nil
}

// Shutdown flushes queued log records and shuts down the decorated exporter.
func (b *BatchProcessor) Shutdown(ctx context.Context) error {
	if b.stopped.Swap(true) || b.q == nil {
		return nil
	}

	// Stop the poll goroutine.
	close(b.pollKill)
	select {
	case <-b.pollDone:
	case <-ctx.Done():
		// Out of time.
		return errors.Join(ctx.Err(), b.exporter.Shutdown(ctx))
	}

	// Flush remaining queued records before the exporter shuts down.
	err := b.exporter.Export(ctx, b.q.Flush())
	return errors.Join(err, b.exporter.Shutdown(ctx))
}

var errPartialFlush = errors.New("partial flush: export buffer full")

// Used for testing.
var ctxErr = func(ctx context.Context) error {
	return ctx.Err()
}

// ForceFlush flushes queued log records and flushes the decorated exporter.
func (b *BatchProcessor) ForceFlush(ctx context.Context) error {
	if b.stopped.Load() || b.q == nil {
		return nil
	}

	buf := make([]Record, b.q.cap)
	notFlushed := func() bool {
		var flushed bool
		_ = b.q.TryDequeue(buf, func(r []Record) bool {
			flushed = b.exporter.EnqueueExport(r)
			return flushed
		})
		return !flushed
	}
	var err error
	// For as long as ctx allows, try to make a single flush of the queue.
	for notFlushed() {
		// Use ctxErr instead of calling ctx.Err directly so we can test
		// the partial error return.
		if e := ctxErr(ctx); e != nil {
			err = errors.Join(e, errPartialFlush)
			break
		}
	}
	return errors.Join(err, b.exporter.ForceFlush(ctx))
}

// queue holds a queue of logging records.
//
// When the queue becomes full, the oldest records in the queue are
// overwritten.
type queue struct {
	sync.Mutex

	dropped     atomic.Uint64
	cap, len    int
	read, write *ring
}

func newQueue(size int) *queue {
	r := newRing(size)
	return &queue{
		cap:   size,
		read:  r,
		write: r,
	}
}

func (q *queue) Len() int {
	q.Lock()
	defer q.Unlock()

	return q.len
}

// Dropped returns the number of Records dropped during enqueueing since the
// last time Dropped was called.
func (q *queue) Dropped() uint64 {
	return q.dropped.Swap(0)
}

// Enqueue adds r to the queue. The queue size, including the addition of r, is
// returned.
//
// If enqueueing r will exceed the capacity of q, the oldest Record held in q
// will be dropped and r retained.
func (q *queue) Enqueue(r Record) int {
	q.Lock()
	defer q.Unlock()

	q.write.Value = r
	q.write = q.write.Next()

	q.len++
	if q.len > q.cap {
		// Overflow. Advance read to be the new "oldest".
		q.len = q.cap
		q.read = q.read.Next()
		q.dropped.Add(1)
	}
	return q.len
}

// TryDequeue attempts to dequeue up to len(buf) Records. The available Records
// will be assigned into buf and passed to write. If write fails, returning
// false, the Records will not be removed from the queue. If write succeeds,
// returning true, the dequeued Records are removed from the queue. The number
// of Records remaining in the queue is returned.
//
// When write is called the lock of q is held. The write function must not call
// other methods of this q that acquire the lock.
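//
// A sketch of the intended call pattern, mirroring the poll goroutine's use
// (exp standing in for the processor's buffered exporter):
//
//	remaining := q.TryDequeue(buf, func(batch []Record) bool {
//		// Only remove the records from the queue if the
//		// export buffer accepted them.
//		return exp.EnqueueExport(batch)
//	})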
func (q *queue) TryDequeue(buf []Record, write func([]Record) bool) int {
	q.Lock()
	defer q.Unlock()

	origRead := q.read

	n := min(len(buf), q.len)
	for i := range n {
		buf[i] = q.read.Value
		q.read = q.read.Next()
	}

	if write(buf[:n]) {
		q.len -= n
	} else {
		q.read = origRead
	}
	return q.len
}

// Flush returns all the Records held in the queue and resets it to be
// empty.
func (q *queue) Flush() []Record {
	q.Lock()
	defer q.Unlock()

	out := make([]Record, q.len)
	for i := range out {
		out[i] = q.read.Value
		q.read = q.read.Next()
	}
	q.len = 0

	return out
}

type batchConfig struct {
	maxQSize        setting[int]
	expInterval     setting[time.Duration]
	expTimeout      setting[time.Duration]
	expMaxBatchSize setting[int]
	expBufferSize   setting[int]
}

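// newBatchConfig resolves the BatchProcessor configuration from options and
// the environment. For each setting, an explicitly passed option takes
// precedence over the corresponding environment variable, which takes
// precedence over the package default; a value less than one is cleared at
// each step. An illustrative resolution, assuming OTEL_BLRP_MAX_QUEUE_SIZE is
// set to 100:
//
//	newBatchConfig(nil)                                          // maxQSize: 100
//	newBatchConfig([]BatchProcessorOption{WithMaxQueueSize(10)}) // maxQSize: 10
//	newBatchConfig([]BatchProcessorOption{WithMaxQueueSize(-1)}) // maxQSize: 100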
func newBatchConfig(options []BatchProcessorOption) batchConfig {
	var c batchConfig
	for _, o := range options {
		c = o.apply(c)
	}

	c.maxQSize = c.maxQSize.Resolve(
		clearLessThanOne[int](),
		getenv[int](envarMaxQSize),
		clearLessThanOne[int](),
		fallback[int](dfltMaxQSize),
	)
	c.expInterval = c.expInterval.Resolve(
		clearLessThanOne[time.Duration](),
		getenv[time.Duration](envarExpInterval),
		clearLessThanOne[time.Duration](),
		fallback[time.Duration](dfltExpInterval),
	)
	c.expTimeout = c.expTimeout.Resolve(
		clearLessThanOne[time.Duration](),
		getenv[time.Duration](envarExpTimeout),
		clearLessThanOne[time.Duration](),
		fallback[time.Duration](dfltExpTimeout),
	)
	c.expMaxBatchSize = c.expMaxBatchSize.Resolve(
		clearLessThanOne[int](),
		getenv[int](envarExpMaxBatchSize),
		clearLessThanOne[int](),
		clampMax[int](c.maxQSize.Value),
		fallback[int](dfltExpMaxBatchSize),
	)
	c.expBufferSize = c.expBufferSize.Resolve(
		clearLessThanOne[int](),
		fallback[int](dfltExpBufferSize),
	)

	return c
}

// BatchProcessorOption applies a configuration to a [BatchProcessor].
type BatchProcessorOption interface {
	apply(batchConfig) batchConfig
}

type batchOptionFunc func(batchConfig) batchConfig

func (fn batchOptionFunc) apply(c batchConfig) batchConfig {
	return fn(c)
}

// WithMaxQueueSize sets the maximum queue size used by the [BatchProcessor].
// After the size is reached, log records are dropped.
//
// If the OTEL_BLRP_MAX_QUEUE_SIZE environment variable is set,
// and this option is not passed, that variable value will be used.
//
// By default, if an environment variable is not set, and this option is not
// passed, 2048 will be used.
// The default value is also used when the provided value is less than one.
func WithMaxQueueSize(size int) BatchProcessorOption {
	return batchOptionFunc(func(cfg batchConfig) batchConfig {
		cfg.maxQSize = newSetting(size)
		return cfg
	})
}

// WithExportInterval sets the maximum duration between batched exports.
//
// If the OTEL_BLRP_SCHEDULE_DELAY environment variable is set,
// and this option is not passed, that variable value will be used.
//
// By default, if an environment variable is not set, and this option is not
// passed, 1s will be used.
// The default value is also used when the provided value is less than one.
func WithExportInterval(d time.Duration) BatchProcessorOption {
	return batchOptionFunc(func(cfg batchConfig) batchConfig {
		cfg.expInterval = newSetting(d)
		return cfg
	})
}

// WithExportTimeout sets the duration after which a batched export is canceled.
//
// If the OTEL_BLRP_EXPORT_TIMEOUT environment variable is set,
// and this option is not passed, that variable value will be used.
//
// By default, if an environment variable is not set, and this option is not
// passed, 30s will be used.
// The default value is also used when the provided value is less than one.
func WithExportTimeout(d time.Duration) BatchProcessorOption {
	return batchOptionFunc(func(cfg batchConfig) batchConfig {
		cfg.expTimeout = newSetting(d)
		return cfg
	})
}

// WithExportMaxBatchSize sets the maximum batch size of every export.
// A batch will be split into multiple exports to not exceed this size.
//
// If the OTEL_BLRP_MAX_EXPORT_BATCH_SIZE environment variable is set,
// and this option is not passed, that variable value will be used.
//
// By default, if an environment variable is not set, and this option is not
// passed, 512 will be used.
// The default value is also used when the provided value is less than one.
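// The resolved value is also clamped to the maximum queue size; for example,
// WithExportMaxBatchSize(1024) combined with WithMaxQueueSize(512) yields an
// effective batch size of 512.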
func WithExportMaxBatchSize(size int) BatchProcessorOption {
	return batchOptionFunc(func(cfg batchConfig) batchConfig {
		cfg.expMaxBatchSize = newSetting(size)
		return cfg
	})
}

// WithExportBufferSize sets the batch buffer size.
// Batches will be temporarily kept in a memory buffer until they are exported.
//
// By default, a value of 1 will be used.
// The default value is also used when the provided value is less than one.
func WithExportBufferSize(size int) BatchProcessorOption {
	return batchOptionFunc(func(cfg batchConfig) batchConfig {
		cfg.expBufferSize = newSetting(size)
		return cfg
	})
}