Source File
batch.go
Belonging Package
go.opentelemetry.io/otel/sdk/log
// Copyright The OpenTelemetry Authors// SPDX-License-Identifier: Apache-2.0package log // import "go.opentelemetry.io/otel/sdk/log"import ()const (dfltMaxQSize = 2048dfltExpInterval = time.SeconddfltExpTimeout = 30 * time.SeconddfltExpMaxBatchSize = 512dfltExpBufferSize = 1envarMaxQSize = "OTEL_BLRP_MAX_QUEUE_SIZE"envarExpInterval = "OTEL_BLRP_SCHEDULE_DELAY"envarExpTimeout = "OTEL_BLRP_EXPORT_TIMEOUT"envarExpMaxBatchSize = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE")// Compile-time check BatchProcessor implements Processor.var _ Processor = (*BatchProcessor)(nil)// BatchProcessor is a processor that exports batches of log records.//// Use [NewBatchProcessor] to create a BatchProcessor. An empty BatchProcessor// is shut down by default, no records will be batched or exported.type BatchProcessor struct {// The BatchProcessor is designed to provide the highest throughput of// log records possible while being compatible with OpenTelemetry. The// entry point of log records is the OnEmit method. This method is designed// to receive records as fast as possible while still honoring shutdown// commands. All records received are enqueued to queue.//// In order to block OnEmit as little as possible, a separate "poll"// goroutine is spawned at the creation of a BatchProcessor. This// goroutine is responsible for batching the queue at regular polled// intervals, or when it is directly signaled to.//// To keep the polling goroutine from backing up, all batches it makes are// exported with a bufferedExporter. This exporter allows the poll// goroutine to enqueue an export payload that will be handled in a// separate goroutine dedicated to the export. 
This asynchronous behavior// allows the poll goroutine to maintain accurate interval polling.//// ___BatchProcessor____ __Poll Goroutine__ __Export Goroutine__// || || || || || ||// || ********** || || || || ********** ||// || Records=>* OnEmit * || || | - ticker || || * export * ||// || ********** || || | - trigger || || ********** ||// || || || || | || || || ||// || || || || | || || || ||// || __________\/___ || || |*********** || || ______/\_______ ||// || (____queue______)>=||=||===|* batch *===||=||=>[_export_buffer_] ||// || || || |*********** || || ||// ||_____________________|| ||__________________|| ||____________________||////// The "release valve" in this processing is the record queue. This queue// is a ring buffer. It will overwrite the oldest records first when writes// to OnEmit are made faster than the queue can be flushed. If batches// cannot be flushed to the export buffer, the records will remain in the// queue.// exporter is the bufferedExporter all batches are exported with.exporter *bufferExporter// q is the active queue of records that have not yet been exported.q *queue// batchSize is the minimum number of records needed before an export is// triggered (unless the interval expires).batchSize int// pollTrigger triggers the poll goroutine to flush a batch from the queue.// This is sent to when it is known that the queue contains at least one// complete batch.//// When a send is made to the channel, the poll loop will be reset after// the flush. If there is still enough records in the queue for another// batch the reset of the poll loop will automatically re-trigger itself.// There is no need for the original sender to monitor and resend.pollTrigger chan struct{}// pollKill kills the poll goroutine. 
This is only expected to be closed// once by the Shutdown method.pollKill chan struct{}// pollDone signals the poll goroutine has completed.pollDone chan struct{}// stopped holds the stopped state of the BatchProcessor.stopped atomic.BoolnoCmp [0]func() //nolint: unused // This is indeed used.}// NewBatchProcessor decorates the provided exporter// so that the log records are batched before exporting.//// All of the exporter's methods are called synchronously.func ( Exporter, ...BatchProcessorOption) *BatchProcessor {:= newBatchConfig()if == nil {// Do not panic on nil export.= defaultNoopExporter}// Order is important here. Wrap the timeoutExporter with the chunkExporter// to ensure each export completes in timeout (instead of all chunked// exports).= newTimeoutExporter(, .expTimeout.Value)// Use a chunkExporter to ensure ForceFlush and Shutdown calls are batched// appropriately on export.= newChunkExporter(, .expMaxBatchSize.Value):= &BatchProcessor{exporter: newBufferExporter(, .expBufferSize.Value),q: newQueue(.maxQSize.Value),batchSize: .expMaxBatchSize.Value,pollTrigger: make(chan struct{}, 1),pollKill: make(chan struct{}),}.pollDone = .poll(.expInterval.Value)return}// poll spawns a goroutine to handle interval polling and batch exporting. The// returned done chan is closed when the spawned goroutine completes.func ( *BatchProcessor) ( time.Duration) ( chan struct{}) {= make(chan struct{}):= time.NewTicker()// TODO: investigate using a sync.Pool instead of cloning.:= make([]Record, .batchSize)go func() {defer close()defer .Stop()for {select {case <-.C:case <-.pollTrigger:.Reset()case <-.pollKill:return}if := .q.Dropped(); > 0 {global.Warn("dropped log records", "dropped", )}var int// Don't copy data from queue unless exporter can accept more, it is very expensive.if .exporter.Ready() {= .q.TryDequeue(, func( []Record) bool {:= .exporter.EnqueueExport()if {= slices.Clone()}return})} else {= .q.Len()}if >= .batchSize {// There is another full batch ready. 
Immediately trigger// another export attempt.select {case .pollTrigger <- struct{}{}:default:// Another flush signal already received.}}}}()return}// OnEmit batches provided log record.func ( *BatchProcessor) ( context.Context, *Record) error {if .stopped.Load() || .q == nil {return nil}// The record is cloned so that changes done by subsequent processors// are not going to lead to a data race.if := .q.Enqueue(.Clone()); >= .batchSize {select {case .pollTrigger <- struct{}{}:default:// Flush chan full. The poll goroutine will handle this by// re-sending any trigger until the queue has less than batchSize// records.}}return nil}// Shutdown flushes queued log records and shuts down the decorated exporter.func ( *BatchProcessor) ( context.Context) error {if .stopped.Swap(true) || .q == nil {return nil}// Stop the poll goroutine.close(.pollKill)select {case <-.pollDone:case <-.Done():// Out of time.return errors.Join(.Err(), .exporter.Shutdown())}// Flush remaining queued before exporter shutdown.:= .exporter.Export(, .q.Flush())return errors.Join(, .exporter.Shutdown())}var errPartialFlush = errors.New("partial flush: export buffer full")// Used for testing.var ctxErr = func( context.Context) error {return .Err()}// ForceFlush flushes queued log records and flushes the decorated exporter.func ( *BatchProcessor) ( context.Context) error {if .stopped.Load() || .q == nil {return nil}:= make([]Record, .q.cap):= func() bool {var bool_ = .q.TryDequeue(, func( []Record) bool {= .exporter.EnqueueExport()return})return !}var error// For as long as ctx allows, try to make a single flush of the queue.for () {// Use ctxErr instead of calling ctx.Err directly so we can test// the partial error return.if := ctxErr(); != nil {= errors.Join(, errPartialFlush)break}}return errors.Join(, .exporter.ForceFlush())}// queue holds a queue of logging records.//// When the queue becomes full, the oldest records in the queue are// overwritten.type queue struct {sync.Mutexdropped 
atomic.Uint64cap, len intread, write *ring}func newQueue( int) *queue {:= newRing()return &queue{cap: ,read: ,write: ,}}func ( *queue) () int {.Lock()defer .Unlock()return .len}// Dropped returns the number of Records dropped during enqueueing since the// last time Dropped was called.func ( *queue) () uint64 {return .dropped.Swap(0)}// Enqueue adds r to the queue. The queue size, including the addition of r, is// returned.//// If enqueueing r will exceed the capacity of q, the oldest Record held in q// will be dropped and r retained.func ( *queue) ( Record) int {.Lock()defer .Unlock().write.Value =.write = .write.Next().len++if .len > .cap {// Overflow. Advance read to be the new "oldest"..len = .cap.read = .read.Next().dropped.Add(1)}return .len}// TryDequeue attempts to dequeue up to len(buf) Records. The available Records// will be assigned into buf and passed to write. If write fails, returning// false, the Records will not be removed from the queue. If write succeeds,// returning true, the dequeued Records are removed from the queue. The number// of Records remaining in the queue are returned.//// When write is called the lock of q is held. 
The write function must not call// other methods of this q that acquire the lock.func ( *queue) ( []Record, func([]Record) bool) int {.Lock()defer .Unlock():= .read:= min(len(), .len)for := range {[] = .read.Value.read = .read.Next()}if ([:]) {.len -=} else {.read =}return .len}// Flush returns all the Records held in the queue and resets it to be// empty.func ( *queue) () []Record {.Lock()defer .Unlock():= make([]Record, .len)for := range {[] = .read.Value.read = .read.Next()}.len = 0return}type batchConfig struct {maxQSize setting[int]expInterval setting[time.Duration]expTimeout setting[time.Duration]expMaxBatchSize setting[int]expBufferSize setting[int]}func newBatchConfig( []BatchProcessorOption) batchConfig {var batchConfigfor , := range {= .apply()}.maxQSize = .maxQSize.Resolve(clearLessThanOne[int](),getenv[int](envarMaxQSize),clearLessThanOne[int](),fallback[int](dfltMaxQSize),).expInterval = .expInterval.Resolve(clearLessThanOne[time.Duration](),getenv[time.Duration](envarExpInterval),clearLessThanOne[time.Duration](),fallback[time.Duration](dfltExpInterval),).expTimeout = .expTimeout.Resolve(clearLessThanOne[time.Duration](),getenv[time.Duration](envarExpTimeout),clearLessThanOne[time.Duration](),fallback[time.Duration](dfltExpTimeout),).expMaxBatchSize = .expMaxBatchSize.Resolve(clearLessThanOne[int](),getenv[int](envarExpMaxBatchSize),clearLessThanOne[int](),clampMax[int](.maxQSize.Value),fallback[int](dfltExpMaxBatchSize),).expBufferSize = .expBufferSize.Resolve(clearLessThanOne[int](),fallback[int](dfltExpBufferSize),)return}// BatchProcessorOption applies a configuration to a [BatchProcessor].type BatchProcessorOption interface {apply(batchConfig) batchConfig}type batchOptionFunc func(batchConfig) batchConfigfunc ( batchOptionFunc) ( batchConfig) batchConfig {return ()}// WithMaxQueueSize sets the maximum queue size used by the Batcher.// After the size is reached log records are dropped.//// If the OTEL_BLRP_MAX_QUEUE_SIZE environment variable is 
set,// and this option is not passed, that variable value will be used.//// By default, if an environment variable is not set, and this option is not// passed, 2048 will be used.// The default value is also used when the provided value is less than one.func ( int) BatchProcessorOption {return batchOptionFunc(func( batchConfig) batchConfig {.maxQSize = newSetting()return})}// WithExportInterval sets the maximum duration between batched exports.//// If the OTEL_BLRP_SCHEDULE_DELAY environment variable is set,// and this option is not passed, that variable value will be used.//// By default, if an environment variable is not set, and this option is not// passed, 1s will be used.// The default value is also used when the provided value is less than one.func ( time.Duration) BatchProcessorOption {return batchOptionFunc(func( batchConfig) batchConfig {.expInterval = newSetting()return})}// WithExportTimeout sets the duration after which a batched export is canceled.//// If the OTEL_BLRP_EXPORT_TIMEOUT environment variable is set,// and this option is not passed, that variable value will be used.//// By default, if an environment variable is not set, and this option is not// passed, 30s will be used.// The default value is also used when the provided value is less than one.func ( time.Duration) BatchProcessorOption {return batchOptionFunc(func( batchConfig) batchConfig {.expTimeout = newSetting()return})}// WithExportMaxBatchSize sets the maximum batch size of every export.// A batch will be split into multiple exports to not exceed this size.//// If the OTEL_BLRP_MAX_EXPORT_BATCH_SIZE environment variable is set,// and this option is not passed, that variable value will be used.//// By default, if an environment variable is not set, and this option is not// passed, 512 will be used.// The default value is also used when the provided value is less than one.func ( int) BatchProcessorOption {return batchOptionFunc(func( batchConfig) batchConfig {.expMaxBatchSize = 
newSetting()return})}// WithExportBufferSize sets the batch buffer size.// Batches will be temporarily kept in a memory buffer until they are exported.//// By default, a value of 1 will be used.// The default value is also used when the provided value is less than one.func ( int) BatchProcessorOption {return batchOptionFunc(func( batchConfig) batchConfig {.expBufferSize = newSetting()return})}
![]() |
The pages are generated with Golds v0.8.2 (GOOS=linux GOARCH=amd64). Golds is a Go 101 project developed by Tapir Liu. PRs and bug reports are welcome and can be submitted to the issue list. Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.