// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package log // import "go.opentelemetry.io/otel/sdk/log"

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"go.opentelemetry.io/otel"
)

// Exporter handles the delivery of log records to external receivers.
type Exporter interface {
	// Export transmits log records to a receiver.
	//
	// The deadline or cancellation of the passed context must be honored. An
	// appropriate error should be returned in these situations.
	//
	// All retry logic must be contained in this function. The SDK does not
	// implement any retry logic. All errors returned by this function are
	// considered unrecoverable and will be reported to a configured error
	// Handler.
	//
	// Implementations must not retain the records slice.
	//
	// Before modifying a Record, the implementation must use Record.Clone
	// to create a copy that shares no state with the original.
	//
	// Export should never be called concurrently with other Export calls.
	// However, it may be called concurrently with other methods.
	Export(ctx context.Context, records []Record) error

	// Shutdown is called when the SDK shuts down. Any cleanup or release of
	// resources held by the exporter should be done in this call.
	//
	// The deadline or cancellation of the passed context must be honored. An
	// appropriate error should be returned in these situations.
	//
	// After Shutdown is called, calls to Export, Shutdown, or ForceFlush
	// should perform no operation and return a nil error.
	//
	// Shutdown may be called concurrently with itself or with other methods.
	Shutdown(ctx context.Context) error

	// ForceFlush exports any log records held by the exporter that have not
	// yet been exported.
	//
	// The deadline or cancellation of the passed context must be honored. An
	// appropriate error should be returned in these situations.
	//
	// ForceFlush may be called concurrently with itself or with other methods.
	ForceFlush(ctx context.Context) error
}

var defaultNoopExporter = &noopExporter{}

type noopExporter struct{}

func (noopExporter) (context.Context, []Record) error { return nil }

func (noopExporter) (context.Context) error { return nil }

func (noopExporter) (context.Context) error { return nil }

// chunkExporter wraps an Exporter's Export method so it is called with
// appropriately sized export payloads. Any payload larger than a defined size
// is chunked into smaller payloads and exported sequentially.
type chunkExporter struct {
	// Exporter is the wrapped exporter that receives each chunk.
	Exporter

	// size is the maximum number of records passed to the wrapped Exporter
	// in a single Export call.
	size int
}

// newChunkExporter wraps exporter. Calls to the Export will have their records
// payload chunked so they do not exceed size. If size is less than or equal
// to 0, exporter is returned directly.
func newChunkExporter( Exporter,  int) Exporter {
	if  <= 0 {
		return 
	}
	return &chunkExporter{Exporter: , size: }
}

// Export exports records in chunks no larger than c.size.
func ( chunkExporter) ( context.Context,  []Record) error {
	 := len()
	for ,  := 0, min(.size, );  < ; ,  = +.size, min(+.size, ) {
		if  := .Exporter.Export(, [:]);  != nil {
			return 
		}
	}
	return nil
}

// timeoutExporter wraps an Exporter and ensures any call to Export will have a
// timeout for the context.
type timeoutExporter struct {
	// Exporter is the wrapped exporter that performs the actual export.
	Exporter

	// timeout is the maximum time an export is attempted.
	timeout time.Duration
}

// newTimeoutExporter wraps exporter with an Exporter that limits the context
// lifetime passed to Export to be timeout. If timeout is less than or equal to
// zero, exporter will be returned directly.
func newTimeoutExporter( Exporter,  time.Duration) Exporter {
	if  <= 0 {
		return 
	}
	return &timeoutExporter{Exporter: , timeout: }
}

// Export sets the timeout of ctx before calling the Exporter e wraps.
func ( *timeoutExporter) ( context.Context,  []Record) error {
	// This only used by the batch processor, and it takes processor timeout config.
	// Thus, the error message points to the processor. So users know they should adjust the processor timeout.
	,  := context.WithTimeoutCause(, .timeout, errors.New("processor export timeout"))
	defer ()
	return .Exporter.Export(, )
}

// exportSync exports all data from input using exporter in a spawned
// goroutine. The returned chan will be closed when the spawned goroutine
// completes.
func exportSync( <-chan exportData,  Exporter) ( chan struct{}) {
	 = make(chan struct{})
	go func() {
		defer close()
		for  := range  {
			.DoExport(.Export)
		}
	}()
	return 
}

// exportData is data related to an export.
type exportData struct {
	// ctx is the context the export function is called with.
	ctx     context.Context
	// records are the log records to export.
	records []Record

	// respCh is the channel any error returned from the export will be sent
	// on. If this is nil, and the export error is non-nil, the error will be
	// passed to the OTel error handler.
	respCh chan<- error
}

// DoExport calls exportFn with the data contained in e. The error response
// will be returned on e's respCh if not nil. The error will be handled by the
// default OTel error handle if it is not nil and respCh is nil or full.
func ( exportData) ( func(context.Context, []Record) error) {
	if len(.records) == 0 {
		.respond(nil)
		return
	}

	.respond((.ctx, .records))
}

func ( exportData) ( error) {
	select {
	case .respCh <- :
	default:
		// e.respCh is nil or busy, default to otel.Handler.
		if  != nil {
			otel.Handle()
		}
	}
}

// bufferExporter provides asynchronous and synchronous export functionality by
// buffering export requests.
type bufferExporter struct {
	// Exporter is the wrapped exporter that buffered requests are sent to.
	Exporter

	// input buffers pending export requests. It is closed by Shutdown.
	input   chan exportData
	// inputMu is held while sending on or closing input so that a send never
	// races with the close performed by Shutdown.
	inputMu sync.Mutex

	// done is the channel returned by exportSync; it is closed once the
	// exporting goroutine has completed.
	done    chan struct{}
	// stopped is set to true by the first call to Shutdown.
	stopped atomic.Bool
}

// newBufferExporter returns a new bufferExporter that wraps exporter. The
// returned bufferExporter will buffer at most size number of export requests.
// If size is less than 1, 1 will be used.
func newBufferExporter( Exporter,  int) *bufferExporter {
	if  < 1 {
		 = 1
	}
	 := make(chan exportData, )
	return &bufferExporter{
		Exporter: ,

		input: ,
		done:  exportSync(, ),
	}
}

func ( *bufferExporter) () bool {
	return len(.input) != cap(.input)
}

var errStopped = errors.New("exporter stopped")

func ( *bufferExporter) ( context.Context,  []Record,  chan<- error) error {
	 := exportData{, , }

	.inputMu.Lock()
	defer .inputMu.Unlock()

	// Check stopped before enqueueing now that e.inputMu is held. This
	// prevents sends on a closed chan when Shutdown is called concurrently.
	if .stopped.Load() {
		return errStopped
	}

	select {
	case .input <- :
	case <-.Done():
		return .Err()
	}
	return nil
}

// EnqueueExport enqueues an export of records in the context of ctx to be
// performed asynchronously. This will return true if the records are
// successfully enqueued (or the bufferExporter is shut down), false otherwise.
//
// The passed records are held after this call returns.
func ( *bufferExporter) ( []Record) bool {
	if len() == 0 {
		// Nothing to enqueue, do not waste input space.
		return true
	}

	 := exportData{ctx: context.Background(), records: }

	.inputMu.Lock()
	defer .inputMu.Unlock()

	// Check stopped before enqueueing now that e.inputMu is held. This
	// prevents sends on a closed chan when Shutdown is called concurrently.
	if .stopped.Load() {
		return true
	}

	select {
	case .input <- :
		return true
	default:
		return false
	}
}

// Export synchronously exports records in the context of ctx. This will not
// return until the export has been completed.
func ( *bufferExporter) ( context.Context,  []Record) error {
	if len() == 0 {
		return nil
	}

	 := make(chan error, 1)
	 := .enqueue(, , )
	if  != nil {
		if errors.Is(, errStopped) {
			return nil
		}
		return fmt.Errorf("%w: dropping %d records", , len())
	}

	select {
	case  := <-:
		return 
	case <-.Done():
		return .Err()
	}
}

// ForceFlush flushes buffered exports. Any existing exports that is buffered
// is flushed before this returns.
func ( *bufferExporter) ( context.Context) error {
	 := make(chan error, 1)
	 := .enqueue(, nil, )
	if  != nil {
		if errors.Is(, errStopped) {
			return nil
		}
		return 
	}

	select {
	case <-:
	case <-.Done():
		return .Err()
	}
	return .Exporter.ForceFlush()
}

// Shutdown shuts down e.
//
// Any buffered exports are flushed before this returns.
//
// All calls to EnqueueExport or Exporter will return nil without any export
// after this is called.
func ( *bufferExporter) ( context.Context) error {
	if .stopped.Swap(true) {
		return nil
	}
	.inputMu.Lock()
	defer .inputMu.Unlock()

	// No more sends will be made.
	close(.input)
	select {
	case <-.done:
	case <-.Done():
		return errors.Join(.Err(), .Exporter.Shutdown())
	}
	return .Exporter.Shutdown()
}