package trace
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk"
"go.opentelemetry.io/otel/sdk/internal/env"
"go.opentelemetry.io/otel/sdk/trace/internal/x"
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
"go.opentelemetry.io/otel/trace"
)
// Default configuration values for the batch span processor. Each one is the
// fallback handed to the matching env.BatchSpanProcessor* lookup in
// NewBatchSpanProcessor.
const (
// DefaultMaxQueueSize is the default capacity of the span queue.
DefaultMaxQueueSize = 2048
// DefaultScheduleDelay is the default batch timeout; NewBatchSpanProcessor
// interprets it in milliseconds.
DefaultScheduleDelay = 5000
// DefaultExportTimeout is the default export timeout; NewBatchSpanProcessor
// interprets it in milliseconds.
DefaultExportTimeout = 30000
// DefaultMaxExportBatchSize is the default number of spans that triggers an
// export of the current batch.
DefaultMaxExportBatchSize = 512
)
// queueFull is the error-type attribute value attached to the processed-span
// counter when a span is not enqueued (see enqueueBlockOnQueueFull and
// enqueueDrop).
var queueFull = otelconv .ErrorTypeAttr ("queue_full" )
// BatchSpanProcessorOption mutates a BatchSpanProcessorOptions value.
// NewBatchSpanProcessor applies options in the order they are passed.
type BatchSpanProcessorOption func (o *BatchSpanProcessorOptions )
// BatchSpanProcessorOptions holds the configuration of a batch span processor.
type BatchSpanProcessorOptions struct {
// MaxQueueSize is the capacity of the channel that buffers ended spans
// until they are batched.
MaxQueueSize int
// BatchTimeout is the duration of the timer that triggers an export of the
// current batch.
BatchTimeout time .Duration
// ExportTimeout bounds a single export call. It is only applied when it is
// greater than zero.
ExportTimeout time .Duration
// MaxExportBatchSize is the batch length at which the processor exports
// without waiting for the timer.
MaxExportBatchSize int
// BlockOnQueueFull selects the blocking enqueue path instead of the
// dropping one when a span ends.
BlockOnQueueFull bool
}
// batchSpanProcessor is the SpanProcessor returned by NewBatchSpanProcessor.
// It buffers ended spans in a queue and exports them in batches from a
// background goroutine.
type batchSpanProcessor struct {
// e receives the batches. It may be nil, in which case spans are not queued.
e SpanExporter
// o is the effective configuration.
o BatchSpanProcessorOptions
// queue buffers ended spans (and force-flush markers) until the background
// goroutine moves them into batch.
queue chan ReadOnlySpan
// dropped counts spans discarded by enqueueDrop; accessed atomically.
dropped uint32
// selfObservabilityEnabled is set by the constructor when
// x.SelfObservability is enabled.
selfObservabilityEnabled bool
// callbackRegistration is the registration returned by newBSPObs; it is
// unregistered in Shutdown.
callbackRegistration metric .Registration
// spansProcessedCounter records exported and rejected spans when
// self-observability is enabled.
spansProcessedCounter otelconv .SDKProcessorSpanProcessed
// componentNameAttr identifies this processor instance on its metrics.
componentNameAttr attribute .KeyValue
// batch holds the spans for the next export; guarded by batchMutex.
batch []ReadOnlySpan
batchMutex sync .Mutex
// timer is reset to o.BatchTimeout on every export and triggers an export
// when it fires.
timer *time .Timer
// stopWait tracks the background goroutine started by the constructor.
stopWait sync .WaitGroup
// stopOnce makes the body of Shutdown run at most once.
stopOnce sync .Once
// stopCh is closed by Shutdown to stop processQueue.
stopCh chan struct {}
// stopped is set by Shutdown; OnEnd and ForceFlush check it.
stopped atomic .Bool
}
// Compile-time check that *batchSpanProcessor implements SpanProcessor.
var _ SpanProcessor = (*batchSpanProcessor )(nil )
// NewBatchSpanProcessor creates a SpanProcessor that queues ended spans and
// hands them to exporter in batches from a background goroutine.
//
// The initial configuration comes from the env.BatchSpanProcessor* lookups
// (with the Default* constants as fallbacks) and is then modified by options
// in the order given. A nil exporter is accepted; OnEnd then ignores spans.
func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorOption) SpanProcessor {
	queueSize := env.BatchSpanProcessorMaxQueueSize(DefaultMaxQueueSize)
	batchSize := env.BatchSpanProcessorMaxExportBatchSize(DefaultMaxExportBatchSize)
	// Keep the environment-derived batch size within the queue size.
	if batchSize > queueSize {
		batchSize = min(DefaultMaxExportBatchSize, queueSize)
	}
	scheduleDelay := env.BatchSpanProcessorScheduleDelay(DefaultScheduleDelay)
	exportTimeout := env.BatchSpanProcessorExportTimeout(DefaultExportTimeout)

	cfg := BatchSpanProcessorOptions{
		MaxQueueSize:       queueSize,
		MaxExportBatchSize: batchSize,
		BatchTimeout:       time.Duration(scheduleDelay) * time.Millisecond,
		ExportTimeout:      time.Duration(exportTimeout) * time.Millisecond,
	}
	for _, apply := range options {
		apply(&cfg)
	}

	p := &batchSpanProcessor{
		e:      exporter,
		o:      cfg,
		batch:  make([]ReadOnlySpan, 0, cfg.MaxExportBatchSize),
		timer:  time.NewTimer(cfg.BatchTimeout),
		queue:  make(chan ReadOnlySpan, cfg.MaxQueueSize),
		stopCh: make(chan struct{}),
	}

	if x.SelfObservability.Enabled() {
		p.selfObservabilityEnabled = true
		p.componentNameAttr = componentName()
		queueLen := func() int64 { return int64(len(p.queue)) }
		counter, reg, err := newBSPObs(p.componentNameAttr, queueLen, int64(p.o.MaxQueueSize))
		p.spansProcessedCounter, p.callbackRegistration = counter, reg
		if err != nil {
			// Instrument setup problems are reported, not fatal.
			otel.Handle(err)
		}
	}

	// The worker batches spans until Shutdown closes stopCh, then empties
	// whatever is still queued.
	p.stopWait.Add(1)
	go func() {
		defer p.stopWait.Done()
		p.processQueue()
		p.drainQueue()
	}()

	return p
}
// processorIDCounter is the source of the IDs handed out by nextProcessorID.
var processorIDCounter atomic.Int64

// nextProcessorID returns the counter value from before the increment, so
// successive calls yield 0, 1, 2, ...
func nextProcessorID() int64 {
	afterIncrement := processorIDCounter.Add(1)
	return afterIncrement - 1
}
// componentName builds the component-name attribute for a new processor in
// the form "<batching span processor type>/<id>", taking the id from
// nextProcessorID.
func componentName() attribute.KeyValue {
	return semconv.OTelComponentName(
		fmt.Sprintf("%s/%d", otelconv.ComponentTypeBatchingSpanProcessor, nextProcessorID()),
	)
}
// newBSPObs creates the self-observability instruments for a batch span
// processor and registers a callback that reports queue size and capacity.
//
// cmpnt is the component-name attribute attached to the observations, qLen
// reports the current queue length, and qMax is the queue capacity. It
// returns the processed-span counter, the callback registration, and the
// join of every error encountered along the way (nil when there were none).
func newBSPObs(
	cmpnt attribute.KeyValue,
	qLen func() int64,
	qMax int64,
) (otelconv.SDKProcessorSpanProcessed, metric.Registration, error) {
	meter := otel.GetMeterProvider().Meter(
		selfObsScopeName,
		metric.WithInstrumentationVersion(sdk.Version()),
		metric.WithSchemaURL(semconv.SchemaURL),
	)

	capacity, err := otelconv.NewSDKProcessorSpanQueueCapacity(meter)

	size, sizeErr := otelconv.NewSDKProcessorSpanQueueSize(meter)
	err = errors.Join(err, sizeErr)

	processed, processedErr := otelconv.NewSDKProcessorSpanProcessed(meter)
	err = errors.Join(err, processedErr)

	obsAttrs := metric.WithAttributes(cmpnt, semconv.OTelComponentTypeBatchingSpanProcessor)
	report := func(_ context.Context, obs metric.Observer) error {
		obs.ObserveInt64(size.Inst(), qLen(), obsAttrs)
		obs.ObserveInt64(capacity.Inst(), qMax, obsAttrs)
		return nil
	}

	registration, regErr := meter.RegisterCallback(report, size.Inst(), capacity.Inst())
	err = errors.Join(err, regErr)

	return processed, registration, err
}
// OnStart does nothing; the processor only acts on spans when they end.
func (*batchSpanProcessor ) OnStart (context .Context , ReadWriteSpan ) {}
// OnEnd queues the ended span s for export. The span is ignored once the
// processor has been shut down or when no exporter is configured.
func (bsp *batchSpanProcessor) OnEnd(s ReadOnlySpan) {
	if bsp.stopped.Load() || bsp.e == nil {
		return
	}
	bsp.enqueue(s)
}
// Shutdown stops the background goroutine, lets it drain the queue, and then
// shuts down the exporter. Only the first call does any work; later calls
// return nil.
//
// If ctx is done before the work completes, Shutdown returns ctx.Err() while
// the remaining work continues in the background. Errors from the exporter's
// Shutdown are reported through otel.Handle rather than returned. When
// self-observability is enabled, an error from unregistering the metrics
// callback is joined into the result.
func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
	var err error
	bsp.stopOnce.Do(func() {
		bsp.stopped.Store(true)
		wait := make(chan struct{})
		go func() {
			close(bsp.stopCh)
			bsp.stopWait.Wait()
			if bsp.e != nil {
				if shutdownErr := bsp.e.Shutdown(ctx); shutdownErr != nil {
					otel.Handle(shutdownErr)
				}
			}
			close(wait)
		}()
		// Wait for the worker and exporter to finish, or for ctx to end.
		select {
		case <-wait:
		case <-ctx.Done():
			err = ctx.Err()
		}
		// The constructor keeps self-observability enabled even when
		// newBSPObs reported an error, so the registration may be nil here;
		// calling Unregister on a nil interface would panic.
		if bsp.selfObservabilityEnabled && bsp.callbackRegistration != nil {
			err = errors.Join(err, bsp.callbackRegistration.Unregister())
		}
	})
	return err
}
// forceFlushSpan is a marker placed on the queue by ForceFlush. It is never
// added to a batch: processQueue closes flushed when it dequeues the marker,
// and drainQueue discards it.
type forceFlushSpan struct {
ReadOnlySpan
// flushed is closed by processQueue once the marker has been dequeued.
flushed chan struct {}
}
// SpanContext returns a span context whose only set field is the sampled
// flag, so the marker passes the IsSampled check in the enqueue helpers.
func (forceFlushSpan) SpanContext() trace.SpanContext {
	sampled := trace.SpanContextConfig{TraceFlags: trace.FlagsSampled}
	return trace.NewSpanContext(sampled)
}
// ForceFlush exports the spans that were queued before the call.
//
// It returns ctx.Err() if ctx is already done, and nil if the processor has
// been shut down or has no exporter. Otherwise it places a marker on the
// queue, waits until the background goroutine has dequeued it (so everything
// queued earlier is in the batch), and then exports the batch. It returns the
// export error, or ctx.Err() if ctx ends first.
func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	if bsp.stopped.Load() || bsp.e == nil {
		return nil
	}

	marker := forceFlushSpan{flushed: make(chan struct{})}
	if bsp.enqueueBlockOnQueueFull(ctx, marker) {
		select {
		case <-bsp.stopCh:
			// Shutdown started; it takes over the remaining spans.
			return nil
		case <-marker.flushed:
			// Everything queued ahead of the marker has been batched.
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	// Export in a goroutine so a done ctx can end the wait. The buffered
	// channel lets that goroutine finish even if nobody receives.
	result := make(chan error, 1)
	go func() {
		result <- bsp.exportSpans(ctx)
	}()
	select {
	case err := <-result:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}
// WithMaxQueueSize returns an option that sets MaxQueueSize to size.
func WithMaxQueueSize(size int) BatchSpanProcessorOption {
	return func(cfg *BatchSpanProcessorOptions) {
		cfg.MaxQueueSize = size
	}
}
// WithMaxExportBatchSize returns an option that sets MaxExportBatchSize to
// size.
func WithMaxExportBatchSize(size int) BatchSpanProcessorOption {
	return func(cfg *BatchSpanProcessorOptions) {
		cfg.MaxExportBatchSize = size
	}
}
// WithBatchTimeout returns an option that sets BatchTimeout to delay.
func WithBatchTimeout(delay time.Duration) BatchSpanProcessorOption {
	return func(cfg *BatchSpanProcessorOptions) {
		cfg.BatchTimeout = delay
	}
}
// WithExportTimeout returns an option that sets ExportTimeout to timeout.
func WithExportTimeout(timeout time.Duration) BatchSpanProcessorOption {
	return func(cfg *BatchSpanProcessorOptions) {
		cfg.ExportTimeout = timeout
	}
}
// WithBlocking returns an option that sets BlockOnQueueFull to true, which
// makes enqueue use the blocking path instead of the dropping one.
func WithBlocking() BatchSpanProcessorOption {
	return func(cfg *BatchSpanProcessorOptions) {
		cfg.BlockOnQueueFull = true
	}
}
// exportSpans sends the current batch to the exporter and returns the
// exporter's error, or nil when the batch is empty.
//
// The batch timer is reset on every call. When ExportTimeout is positive the
// export runs under a context with that timeout. The batch is emptied after
// the export attempt whether or not it succeeded.
func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {
	bsp.timer.Reset(bsp.o.BatchTimeout)

	bsp.batchMutex.Lock()
	defer bsp.batchMutex.Unlock()

	if bsp.o.ExportTimeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeoutCause(ctx, bsp.o.ExportTimeout, errors.New("processor export timeout"))
		defer cancel()
	}

	count := len(bsp.batch)
	if count == 0 {
		return nil
	}

	global.Debug("exporting spans", "count", count, "total_dropped", atomic.LoadUint32(&bsp.dropped))
	if bsp.selfObservabilityEnabled {
		bsp.spansProcessedCounter.Add(ctx, int64(count),
			bsp.componentNameAttr,
			bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor))
	}

	exportErr := bsp.e.ExportSpans(ctx, bsp.batch)

	// Zero the elements before truncating so the reused backing array does
	// not keep references to the exported spans.
	clear(bsp.batch)
	bsp.batch = bsp.batch[:0]

	return exportErr
}
// processQueue is the main loop of the background goroutine. Until stopCh is
// closed it moves queued spans into the batch and exports the batch when the
// timer fires or when the batch reaches MaxExportBatchSize. Force-flush
// markers are acknowledged by closing their channel and are not batched.
// Export errors are reported through otel.Handle.
func (bsp *batchSpanProcessor) processQueue() {
	defer bsp.timer.Stop()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	export := func() {
		if err := bsp.exportSpans(ctx); err != nil {
			otel.Handle(err)
		}
	}

	for {
		select {
		case <-bsp.stopCh:
			return
		case <-bsp.timer.C:
			export()
		case sd := <-bsp.queue:
			if marker, isMarker := sd.(forceFlushSpan); isMarker {
				close(marker.flushed)
				continue
			}

			bsp.batchMutex.Lock()
			bsp.batch = append(bsp.batch, sd)
			full := len(bsp.batch) >= bsp.o.MaxExportBatchSize
			bsp.batchMutex.Unlock()
			if !full {
				continue
			}

			// If the timer already fired, discard the pending tick (if
			// any) so the reset inside exportSpans starts a clean cycle.
			if !bsp.timer.Stop() {
				select {
				case <-bsp.timer.C:
				default:
				}
			}
			export()
		}
	}
}
// drainQueue empties the queue without blocking. It runs on the background
// goroutine after processQueue returns. Spans are batched and exported each
// time the batch reaches MaxExportBatchSize; once the queue is empty the
// remaining batch is exported and the function returns. Force-flush markers
// are discarded. Export errors are reported through otel.Handle.
func (bsp *batchSpanProcessor) drainQueue() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	export := func() {
		if err := bsp.exportSpans(ctx); err != nil {
			otel.Handle(err)
		}
	}

	for {
		select {
		case sd := <-bsp.queue:
			if _, isMarker := sd.(forceFlushSpan); isMarker {
				continue
			}

			bsp.batchMutex.Lock()
			bsp.batch = append(bsp.batch, sd)
			full := len(bsp.batch) == bsp.o.MaxExportBatchSize
			bsp.batchMutex.Unlock()

			if full {
				export()
			}
		default:
			// Queue is empty: send what is left and stop.
			export()
			return
		}
	}
}
// enqueue puts sd on the queue using the blocking or the dropping strategy,
// as selected by BlockOnQueueFull.
func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) {
	ctx := context.TODO()
	if !bsp.o.BlockOnQueueFull {
		bsp.enqueueDrop(ctx, sd)
		return
	}
	bsp.enqueueBlockOnQueueFull(ctx, sd)
}
// enqueueBlockOnQueueFull puts sd on the queue, waiting for room until ctx is
// done. It reports whether sd was queued. Spans that are not sampled are
// rejected immediately. When ctx ends first and self-observability is
// enabled, the rejection is recorded with the queue_full error type.
func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd ReadOnlySpan) bool {
	if sampled := sd.SpanContext().IsSampled(); !sampled {
		return false
	}

	select {
	case bsp.queue <- sd:
		return true
	case <-ctx.Done():
	}

	if bsp.selfObservabilityEnabled {
		bsp.spansProcessedCounter.Add(ctx, 1,
			bsp.componentNameAttr,
			bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor),
			bsp.spansProcessedCounter.AttrErrorType(queueFull))
	}
	return false
}
// enqueueDrop puts sd on the queue without blocking and reports whether it
// was queued. Spans that are not sampled are rejected immediately. When the
// queue has no room, the span is dropped: the dropped counter is incremented
// and, with self-observability enabled, the drop is recorded with the
// queue_full error type.
func (bsp *batchSpanProcessor) enqueueDrop(ctx context.Context, sd ReadOnlySpan) bool {
	if sampled := sd.SpanContext().IsSampled(); !sampled {
		return false
	}

	select {
	case bsp.queue <- sd:
		return true
	default:
	}

	atomic.AddUint32(&bsp.dropped, 1)
	if bsp.selfObservabilityEnabled {
		bsp.spansProcessedCounter.Add(ctx, 1,
			bsp.componentNameAttr,
			bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor),
			bsp.spansProcessedCounter.AttrErrorType(queueFull))
	}
	return false
}
// MarshalLog returns the value used to represent the processor in logs: its
// type name, its exporter, and its configuration.
func (bsp *batchSpanProcessor) MarshalLog() any {
	var summary struct {
		Type         string
		SpanExporter SpanExporter
		Config       BatchSpanProcessorOptions
	}
	summary.Type = "BatchSpanProcessor"
	summary.SpanExporter = bsp.e
	summary.Config = bsp.o
	return summary
}
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PRs and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.