// Copyright The OpenTelemetry Authors// SPDX-License-Identifier: Apache-2.0package log // import "go.opentelemetry.io/otel/sdk/log"import ()// Exporter handles the delivery of log records to external receivers.typeExporterinterface {// Export transmits log records to a receiver. // // The deadline or cancellation of the passed context must be honored. An // appropriate error should be returned in these situations. // // All retry logic must be contained in this function. The SDK does not // implement any retry logic. All errors returned by this function are // considered unrecoverable and will be reported to a configured error // Handler. // // Implementations must not retain the records slice. // // Before modifying a Record, the implementation must use Record.Clone // to create a copy that shares no state with the original. // // Export should never be called concurrently with other Export calls. // However, it may be called concurrently with other methods.Export(ctx context.Context, records []Record) error// Shutdown is called when the SDK shuts down. Any cleanup or release of // resources held by the exporter should be done in this call. // // The deadline or cancellation of the passed context must be honored. An // appropriate error should be returned in these situations. // // After Shutdown is called, calls to Export, Shutdown, or ForceFlush // should perform no operation and return nil error. // // Shutdown may be called concurrently with itself or with other methods.Shutdown(ctx context.Context) error// ForceFlush exports log records to the configured Exporter that have not yet // been exported. // // The deadline or cancellation of the passed context must be honored. An // appropriate error should be returned in these situations. // // ForceFlush may be called concurrently with itself or with other methods.ForceFlush(ctx context.Context) error}var defaultNoopExporter = &noopExporter{}type noopExporter struct{}func (noopExporter) (context.Context, []Record) error { returnnil }func (noopExporter) (context.Context) error { returnnil }func (noopExporter) (context.Context) error { returnnil }// chunkExporter wraps an Exporter's Export method so it is called with// appropriately sized export payloads. Any payload larger than a defined size// is chunked into smaller payloads and exported sequentially.type chunkExporter struct {Exporter// size is the maximum batch size exported. size int}// newChunkExporter wraps exporter. Calls to the Export will have their records// payload chunked so they do not exceed size. If size is less than or equal// to 0, exporter is returned directly.func newChunkExporter( Exporter, int) Exporter {if <= 0 {return }return &chunkExporter{Exporter: , size: }}// Export exports records in chunks no larger than c.size.func ( chunkExporter) ( context.Context, []Record) error { := len()for , := 0, min(.size, ); < ; , = +.size, min(+.size, ) {if := .Exporter.Export(, [:]); != nil {return } }returnnil}// timeoutExporter wraps an Exporter and ensures any call to Export will have a// timeout for the context.type timeoutExporter struct {Exporter// timeout is the maximum time an export is attempted. timeout time.Duration}// newTimeoutExporter wraps exporter with an Exporter that limits the context// lifetime passed to Export to be timeout. If timeout is less than or equal to// zero, exporter will be returned directly.func newTimeoutExporter( Exporter, time.Duration) Exporter {if <= 0 {return }return &timeoutExporter{Exporter: , timeout: }}// Export sets the timeout of ctx before calling the Exporter e wraps.func ( *timeoutExporter) ( context.Context, []Record) error {// This only used by the batch processor, and it takes processor timeout config. // Thus, the error message points to the processor. So users know they should adjust the processor timeout. , := context.WithTimeoutCause(, .timeout, errors.New("processor export timeout"))defer ()return .Exporter.Export(, )}// exportSync exports all data from input using exporter in a spawned// goroutine. The returned chan will be closed when the spawned goroutine// completes.func exportSync( <-chanexportData, Exporter) ( chanstruct{}) { = make(chanstruct{})gofunc() {deferclose()for := range { .DoExport(.Export) } }()return}// exportData is data related to an export.type exportData struct { ctx context.Context records []Record// respCh is the channel any error returned from the export will be sent // on. If this is nil, and the export error is non-nil, the error will // passed to the OTel error handler. respCh chan<- error}// DoExport calls exportFn with the data contained in e. The error response// will be returned on e's respCh if not nil. The error will be handled by the// default OTel error handle if it is not nil and respCh is nil or full.func ( exportData) ( func(context.Context, []Record) error) {iflen(.records) == 0 { .respond(nil)return } .respond((.ctx, .records))}func ( exportData) ( error) {select {case .respCh<- :default:// e.respCh is nil or busy, default to otel.Handler.if != nil {otel.Handle() } }}// bufferExporter provides asynchronous and synchronous export functionality by// buffering export requests.type bufferExporter struct {Exporter input chanexportData inputMu sync.Mutex done chanstruct{} stopped atomic.Bool}// newBufferExporter returns a new bufferExporter that wraps exporter. The// returned bufferExporter will buffer at most size number of export requests.// If size is less than 1, 1 will be used.func newBufferExporter( Exporter, int) *bufferExporter {if < 1 { = 1 } := make(chanexportData, )return &bufferExporter{Exporter: ,input: ,done: exportSync(, ), }}func ( *bufferExporter) () bool {returnlen(.input) != cap(.input)}var errStopped = errors.New("exporter stopped")func ( *bufferExporter) ( context.Context, []Record, chan<- error) error { := exportData{, , } .inputMu.Lock()defer .inputMu.Unlock()// Check stopped before enqueueing now that e.inputMu is held. This // prevents sends on a closed chan when Shutdown is called concurrently.if .stopped.Load() {returnerrStopped }select {case .input<- :case<-.Done():return .Err() }returnnil}// EnqueueExport enqueues an export of records in the context of ctx to be// performed asynchronously. This will return true if the records are// successfully enqueued (or the bufferExporter is shut down), false otherwise.//// The passed records are held after this call returns.func ( *bufferExporter) ( []Record) bool {iflen() == 0 {// Nothing to enqueue, do not waste input space.returntrue } := exportData{ctx: context.Background(), records: } .inputMu.Lock()defer .inputMu.Unlock()// Check stopped before enqueueing now that e.inputMu is held. This // prevents sends on a closed chan when Shutdown is called concurrently.if .stopped.Load() {returntrue }select {case .input<- :returntruedefault:returnfalse }}// Export synchronously exports records in the context of ctx. This will not// return until the export has been completed.func ( *bufferExporter) ( context.Context, []Record) error {iflen() == 0 {returnnil } := make(chanerror, 1) := .enqueue(, , )if != nil {iferrors.Is(, errStopped) {returnnil }returnfmt.Errorf("%w: dropping %d records", , len()) }select {case := <-:returncase<-.Done():return .Err() }}// ForceFlush flushes buffered exports. Any existing exports that is buffered// is flushed before this returns.func ( *bufferExporter) ( context.Context) error { := make(chanerror, 1) := .enqueue(, nil, )if != nil {iferrors.Is(, errStopped) {returnnil }return }select {case<-:case<-.Done():return .Err() }return .Exporter.ForceFlush()}// Shutdown shuts down e.//// Any buffered exports are flushed before this returns.//// All calls to EnqueueExport or Exporter will return nil without any export// after this is called.func ( *bufferExporter) ( context.Context) error {if .stopped.Swap(true) {returnnil } .inputMu.Lock()defer .inputMu.Unlock()// No more sends will be made.close(.input)select {case<-.done:case<-.Done():returnerrors.Join(.Err(), .Exporter.Shutdown()) }return .Exporter.Shutdown()}
The pages are generated with Goldsv0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.