// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package trace // import "go.opentelemetry.io/otel/sdk/trace"

import (
	
	
	
	
	
	

	
	
	
	
	
	
	
	semconv 
	
	
)

// Default values for the fields of BatchSpanProcessorOptions.
const (
	// DefaultMaxQueueSize is the default number of spans the queue can hold.
	DefaultMaxQueueSize = 2048
	// DefaultMaxExportBatchSize is the default maximum number of spans sent
	// to the exporter in a single batch.
	DefaultMaxExportBatchSize = 512

	// DefaultScheduleDelay is the delay interval between two consecutive
	// exports, in milliseconds.
	DefaultScheduleDelay = 5000
	// DefaultExportTimeout is the duration after which an export is
	// cancelled, in milliseconds.
	DefaultExportTimeout = 30000
)

// queueFull is the error type attribute attached to the processed-spans
// counter when a span is counted without having been added to the queue.
var queueFull = otelconv.ErrorTypeAttr("queue_full")

// BatchSpanProcessorOption is a function that adjusts the settings held in a
// BatchSpanProcessorOptions value for a BatchSpanProcessor.
type BatchSpanProcessorOption func(opts *BatchSpanProcessorOptions)

// BatchSpanProcessorOptions is configuration settings for a
// BatchSpanProcessor.
type BatchSpanProcessorOptions struct {
	// MaxQueueSize is the maximum number of spans buffered for delayed
	// processing. When the queue is full, newly ended spans are dropped unless
	// BlockOnQueueFull is set.
	// The default value of MaxQueueSize is 2048.
	MaxQueueSize int

	// BatchTimeout is the maximum duration for constructing a batch. The
	// processor exports whatever spans it has collected once this timeout is
	// reached.
	// The default value of BatchTimeout is 5000 msec.
	BatchTimeout time.Duration

	// ExportTimeout specifies the maximum duration for exporting spans. If the
	// timeout is reached, the export will be cancelled. A value that is not
	// greater than zero leaves the export without a processor-imposed timeout.
	// The default value of ExportTimeout is 30000 msec.
	ExportTimeout time.Duration

	// MaxExportBatchSize is the maximum number of spans to process in a single batch.
	// If there is more than one batch worth of spans, the processor exports
	// the batches one after the other without any delay.
	// The default value of MaxExportBatchSize is 512.
	MaxExportBatchSize int

	// BlockOnQueueFull makes OnEnd wait for free space in the queue instead of
	// dropping the span when the queue is full. OnStart does no work and is
	// therefore unaffected.
	// Blocking should be used carefully as it can severely affect the
	// performance of an application.
	BlockOnQueueFull bool
}

// batchSpanProcessor is a SpanProcessor that batches asynchronously-received
// spans and sends them to a trace.Exporter when complete.
type batchSpanProcessor struct {
	// e is the exporter batches are sent to; when nil, spans are not enqueued.
	e SpanExporter
	// o holds the effective options the processor was created with.
	o BatchSpanProcessorOptions

	// queue is a buffered channel, with capacity o.MaxQueueSize, carrying
	// ended spans to the background goroutine.
	queue   chan ReadOnlySpan
	// dropped counts spans discarded because the queue was full; it is
	// accessed with the atomic package functions.
	dropped uint32

	// selfObservabilityEnabled is set at construction when the
	// x.SelfObservability feature is enabled; the three fields below are only
	// populated in that case.
	selfObservabilityEnabled bool
	// callbackRegistration is unregistered during Shutdown.
	callbackRegistration     metric.Registration
	// spansProcessedCounter counts exported spans and spans that failed to be
	// enqueued.
	spansProcessedCounter    otelconv.SDKProcessorSpanProcessed
	// componentNameAttr is attached to every self-observability measurement.
	componentNameAttr        attribute.KeyValue

	// batch accumulates spans for the next export; guarded by batchMutex.
	batch      []ReadOnlySpan
	batchMutex sync.Mutex
	// timer fires when BatchTimeout elapses and is reset on every export.
	timer      *time.Timer
	// stopWait tracks the background goroutine so Shutdown can wait for it.
	stopWait   sync.WaitGroup
	// stopOnce ensures the shutdown sequence runs a single time.
	stopOnce   sync.Once
	// stopCh is closed by Shutdown to stop the processing loop.
	stopCh     chan struct{}
	// stopped is set by Shutdown; once set, OnEnd and ForceFlush are no-ops.
	stopped    atomic.Bool
}

// Compile-time check that *batchSpanProcessor implements SpanProcessor.
var _ SpanProcessor = (*batchSpanProcessor)(nil)

// NewBatchSpanProcessor creates a new SpanProcessor that will send completed
// span batches to the exporter with the supplied options.
//
// If the exporter is nil, the span processor will perform no action.
func ( SpanExporter,  ...BatchSpanProcessorOption) SpanProcessor {
	 := env.BatchSpanProcessorMaxQueueSize(DefaultMaxQueueSize)
	 := env.BatchSpanProcessorMaxExportBatchSize(DefaultMaxExportBatchSize)

	if  >  {
		 = min(DefaultMaxExportBatchSize, )
	}

	 := BatchSpanProcessorOptions{
		BatchTimeout:       time.Duration(env.BatchSpanProcessorScheduleDelay(DefaultScheduleDelay)) * time.Millisecond,
		ExportTimeout:      time.Duration(env.BatchSpanProcessorExportTimeout(DefaultExportTimeout)) * time.Millisecond,
		MaxQueueSize:       ,
		MaxExportBatchSize: ,
	}
	for ,  := range  {
		(&)
	}
	 := &batchSpanProcessor{
		e:      ,
		o:      ,
		batch:  make([]ReadOnlySpan, 0, .MaxExportBatchSize),
		timer:  time.NewTimer(.BatchTimeout),
		queue:  make(chan ReadOnlySpan, .MaxQueueSize),
		stopCh: make(chan struct{}),
	}

	if x.SelfObservability.Enabled() {
		.selfObservabilityEnabled = true
		.componentNameAttr = componentName()

		var  error
		.spansProcessedCounter, .callbackRegistration,  = newBSPObs(
			.componentNameAttr,
			func() int64 { return int64(len(.queue)) },
			int64(.o.MaxQueueSize),
		)
		if  != nil {
			otel.Handle()
		}
	}

	.stopWait.Add(1)
	go func() {
		defer .stopWait.Done()
		.processQueue()
		.drainQueue()
	}()

	return 
}

// processorIDCounter holds the number of processor identifiers handed out so
// far.
var processorIDCounter atomic.Int64

// nextProcessorID hands out identifiers for batch span processors. The first
// call yields 0 and every later call yields one more than the previous one.
func nextProcessorID() int64 {
	issued := processorIDCounter.Add(1)
	return issued - 1
}

func componentName() attribute.KeyValue {
	 := nextProcessorID()
	 := fmt.Sprintf("%s/%d", otelconv.ComponentTypeBatchingSpanProcessor, )
	return semconv.OTelComponentName()
}

// newBSPObs creates and returns a new set of metrics instruments and a
// registration for a BatchSpanProcessor. It is the caller's responsibility
// to unregister the registration when it is no longer needed.
func newBSPObs(
	 attribute.KeyValue,
	 func() int64,
	 int64,
) (otelconv.SDKProcessorSpanProcessed, metric.Registration, error) {
	 := otel.GetMeterProvider().Meter(
		selfObsScopeName,
		metric.WithInstrumentationVersion(sdk.Version()),
		metric.WithSchemaURL(semconv.SchemaURL),
	)

	,  := otelconv.NewSDKProcessorSpanQueueCapacity()

	,  := otelconv.NewSDKProcessorSpanQueueSize()
	 = errors.Join(, )

	,  := otelconv.NewSDKProcessorSpanProcessed()
	 = errors.Join(, )

	 := semconv.OTelComponentTypeBatchingSpanProcessor
	 := metric.WithAttributes(, )

	,  := .RegisterCallback(
		func( context.Context,  metric.Observer) error {
			.ObserveInt64(.Inst(), (), )
			.ObserveInt64(.Inst(), , )
			return nil
		},
		.Inst(),
		.Inst(),
	)
	 = errors.Join(, )

	return , , 
}

// OnStart method does nothing.
func (*batchSpanProcessor) (context.Context, ReadWriteSpan) {}

// OnEnd method enqueues a ReadOnlySpan for later processing.
func ( *batchSpanProcessor) ( ReadOnlySpan) {
	// Do not enqueue spans after Shutdown.
	if .stopped.Load() {
		return
	}

	// Do not enqueue spans if we are just going to drop them.
	if .e == nil {
		return
	}
	.enqueue()
}

// Shutdown flushes the queue and waits until all spans are processed.
// It only executes once. Subsequent call does nothing.
func ( *batchSpanProcessor) ( context.Context) error {
	var  error
	.stopOnce.Do(func() {
		.stopped.Store(true)
		 := make(chan struct{})
		go func() {
			close(.stopCh)
			.stopWait.Wait()
			if .e != nil {
				if  := .e.Shutdown();  != nil {
					otel.Handle()
				}
			}
			close()
		}()
		// Wait until the wait group is done or the context is cancelled
		select {
		case <-:
		case <-.Done():
			 = .Err()
		}
		if .selfObservabilityEnabled {
			 = errors.Join(, .callbackRegistration.Unregister())
		}
	})
	return 
}

// forceFlushSpan is a marker sent through the span queue by ForceFlush. It is
// never added to a batch: the processing loop closes flushed when it dequeues
// the marker, and the drain loop discards it.
type forceFlushSpan struct {
	// ReadOnlySpan is embedded so the marker can travel on the span queue;
	// ForceFlush leaves it unset.
	ReadOnlySpan
	// flushed is closed once every span queued ahead of the marker has been
	// taken off the queue.
	flushed chan struct{}
}

func (forceFlushSpan) () trace.SpanContext {
	return trace.NewSpanContext(trace.SpanContextConfig{TraceFlags: trace.FlagsSampled})
}

// ForceFlush exports all ended spans that have not yet been exported.
func ( *batchSpanProcessor) ( context.Context) error {
	// Interrupt if context is already canceled.
	if  := .Err();  != nil {
		return 
	}

	// Do nothing after Shutdown.
	if .stopped.Load() {
		return nil
	}

	var  error
	if .e != nil {
		 := make(chan struct{})
		if .enqueueBlockOnQueueFull(, forceFlushSpan{flushed: }) {
			select {
			case <-.stopCh:
				// The batchSpanProcessor is Shutdown.
				return nil
			case <-:
				// Processed any items in queue prior to ForceFlush being called
			case <-.Done():
				return .Err()
			}
		}

		 := make(chan error, 1)
		go func() {
			 <- .exportSpans()
		}()
		// Wait until the export is finished or the context is cancelled/timed out
		select {
		case  = <-:
		case <-.Done():
			 = .Err()
		}
	}
	return 
}

// WithMaxQueueSize returns a BatchSpanProcessorOption that configures the
// maximum queue size allowed for a BatchSpanProcessor.
func ( int) BatchSpanProcessorOption {
	return func( *BatchSpanProcessorOptions) {
		.MaxQueueSize = 
	}
}

// WithMaxExportBatchSize returns a BatchSpanProcessorOption that configures
// the maximum export batch size allowed for a BatchSpanProcessor.
func ( int) BatchSpanProcessorOption {
	return func( *BatchSpanProcessorOptions) {
		.MaxExportBatchSize = 
	}
}

// WithBatchTimeout returns a BatchSpanProcessorOption that configures the
// maximum delay allowed for a BatchSpanProcessor before it will export any
// held span (whether the queue is full or not).
func ( time.Duration) BatchSpanProcessorOption {
	return func( *BatchSpanProcessorOptions) {
		.BatchTimeout = 
	}
}

// WithExportTimeout returns a BatchSpanProcessorOption that configures the
// amount of time a BatchSpanProcessor waits for an exporter to export before
// abandoning the export.
func ( time.Duration) BatchSpanProcessorOption {
	return func( *BatchSpanProcessorOptions) {
		.ExportTimeout = 
	}
}

// WithBlocking returns a BatchSpanProcessorOption that configures a
// BatchSpanProcessor to wait for enqueue operations to succeed instead of
// dropping data when the queue is full.
func () BatchSpanProcessorOption {
	return func( *BatchSpanProcessorOptions) {
		.BlockOnQueueFull = true
	}
}

// exportSpans is a subroutine of processing and draining the queue.
func ( *batchSpanProcessor) ( context.Context) error {
	.timer.Reset(.o.BatchTimeout)

	.batchMutex.Lock()
	defer .batchMutex.Unlock()

	if .o.ExportTimeout > 0 {
		var  context.CancelFunc
		,  = context.WithTimeoutCause(, .o.ExportTimeout, errors.New("processor export timeout"))
		defer ()
	}

	if  := len(.batch);  > 0 {
		global.Debug("exporting spans", "count", len(.batch), "total_dropped", atomic.LoadUint32(&.dropped))
		if .selfObservabilityEnabled {
			.spansProcessedCounter.Add(, int64(),
				.componentNameAttr,
				.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor))
		}
		 := .e.ExportSpans(, .batch)

		// A new batch is always created after exporting, even if the batch failed to be exported.
		//
		// It is up to the exporter to implement any type of retry logic if a batch is failing
		// to be exported, since it is specific to the protocol and backend being sent to.
		clear(.batch) // Erase elements to let GC collect objects
		.batch = .batch[:0]

		if  != nil {
			return 
		}
	}
	return nil
}

// processQueue removes spans from the `queue` channel until processor
// is shut down. It calls the exporter in batches of up to MaxExportBatchSize
// waiting up to BatchTimeout to form a batch.
func ( *batchSpanProcessor) () {
	defer .timer.Stop()

	,  := context.WithCancel(context.Background())
	defer ()
	for {
		select {
		case <-.stopCh:
			return
		case <-.timer.C:
			if  := .exportSpans();  != nil {
				otel.Handle()
			}
		case  := <-.queue:
			if ,  := .(forceFlushSpan);  {
				close(.flushed)
				continue
			}
			.batchMutex.Lock()
			.batch = append(.batch, )
			 := len(.batch) >= .o.MaxExportBatchSize
			.batchMutex.Unlock()
			if  {
				if !.timer.Stop() {
					// Handle both GODEBUG=asynctimerchan=[0|1] properly.
					select {
					case <-.timer.C:
					default:
					}
				}
				if  := .exportSpans();  != nil {
					otel.Handle()
				}
			}
		}
	}
}

// drainQueue awaits the any caller that had added to bsp.stopWait
// to finish the enqueue, then exports the final batch.
func ( *batchSpanProcessor) () {
	,  := context.WithCancel(context.Background())
	defer ()
	for {
		select {
		case  := <-.queue:
			if ,  := .(forceFlushSpan);  {
				// Ignore flush requests as they are not valid spans.
				continue
			}

			.batchMutex.Lock()
			.batch = append(.batch, )
			 := len(.batch) == .o.MaxExportBatchSize
			.batchMutex.Unlock()

			if  {
				if  := .exportSpans();  != nil {
					otel.Handle()
				}
			}
		default:
			// There are no more enqueued spans. Make final export.
			if  := .exportSpans();  != nil {
				otel.Handle()
			}
			return
		}
	}
}

func ( *batchSpanProcessor) ( ReadOnlySpan) {
	 := context.TODO()
	if .o.BlockOnQueueFull {
		.enqueueBlockOnQueueFull(, )
	} else {
		.enqueueDrop(, )
	}
}

func ( *batchSpanProcessor) ( context.Context,  ReadOnlySpan) bool {
	if !.SpanContext().IsSampled() {
		return false
	}

	select {
	case .queue <- :
		return true
	case <-.Done():
		if .selfObservabilityEnabled {
			.spansProcessedCounter.Add(, 1,
				.componentNameAttr,
				.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor),
				.spansProcessedCounter.AttrErrorType(queueFull))
		}
		return false
	}
}

func ( *batchSpanProcessor) ( context.Context,  ReadOnlySpan) bool {
	if !.SpanContext().IsSampled() {
		return false
	}

	select {
	case .queue <- :
		return true
	default:
		atomic.AddUint32(&.dropped, 1)
		if .selfObservabilityEnabled {
			.spansProcessedCounter.Add(, 1,
				.componentNameAttr,
				.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor),
				.spansProcessedCounter.AttrErrorType(queueFull))
		}
	}
	return false
}

// MarshalLog is the marshaling function used by the logging system to represent this Span Processor.
func ( *batchSpanProcessor) () any {
	return struct {
		         string
		 SpanExporter
		       BatchSpanProcessorOptions
	}{
		:         "BatchSpanProcessor",
		: .e,
		:       .o,
	}
}