// Copyright The OpenTelemetry Authors// SPDX-License-Identifier: Apache-2.0package otlptracegrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"import (coltracepbtracepb)type client struct { endpoint string dialOpts []grpc.DialOption metadata metadata.MD exportTimeout time.Duration requestFunc retry.RequestFunc// stopCtx is used as a parent context for all exports. Therefore, when it // is canceled with the stopFunc all exports are canceled. stopCtx context.Context// stopFunc cancels stopCtx, stopping any active exports. stopFunc context.CancelFunc// ourConn keeps track of where conn was created: true if created here on // Start, or false if passed with an option. This is important on Shutdown // as the conn should only be closed if created here on start. Otherwise, // it is up to the processes that passed the conn to close it. ourConn bool conn *grpc.ClientConn tscMu sync.RWMutex tsc coltracepb.TraceServiceClient}// Compile time check *client implements otlptrace.Client.var _ otlptrace.Client = (*client)(nil)// NewClient creates a new gRPC trace client.func ( ...Option) otlptrace.Client {returnnewClient(...)}func newClient( ...Option) *client { := otlpconfig.NewGRPCConfig(asGRPCOptions()...) , := context.WithCancel(context.Background()) := &client{endpoint: .Traces.Endpoint,exportTimeout: .Traces.Timeout,requestFunc: .RetryConfig.RequestFunc(retryable),dialOpts: .DialOptions,stopCtx: ,stopFunc: ,conn: .GRPCConn, }iflen(.Traces.Headers) > 0 { .metadata = metadata.New(.Traces.Headers) }return}// Start establishes a gRPC connection to the collector.func ( *client) (context.Context) error {if .conn == nil {// If the caller did not provide a ClientConn when the client was // created, create one using the configuration they did provide. , := grpc.NewClient(.endpoint, .dialOpts...)if != nil {return }// Keep track that we own the lifecycle of this conn and need to close // it on Shutdown. 
.ourConn = true .conn = }// The otlptrace.Client interface states this method is called just once, // so no need to check if already started. .tscMu.Lock() .tsc = coltracepb.NewTraceServiceClient(.conn) .tscMu.Unlock()returnnil}var errAlreadyStopped = errors.New("the client is already stopped")// Stop shuts down the client.//// Any active connections to a remote endpoint are closed if they were created// by the client. Any gRPC connection passed during creation using// WithGRPCConn will not be closed. It is the caller's responsibility to// handle cleanup of that resource.//// This method synchronizes with the UploadTraces method of the client. It// will wait for any active calls to that method to complete unimpeded, or it// will cancel any active calls if ctx expires. If ctx expires, the context// error will be forwarded as the returned error. All client held resources// will still be released in this situation.//// If the client has already stopped, an error will be returned describing// this.func ( *client) ( context.Context) error {// Make sure to return context error if the context is done when calling this method. := .Err()// Acquire the c.tscMu lock within the ctx lifetime. := make(chanstruct{})gofunc() { .tscMu.Lock()close() }()select {case<-.Done():// The Stop timeout is reached. Kill any remaining exports to force // the clear of the lock and save the timeout error to return and // signal the shutdown timed out before cleanly stopping. .stopFunc() = .Err()// To ensure the client is not left in a dirty state c.tsc needs to be // set to nil. To avoid the race condition when doing this, ensure // that all the exports are killed (initiated by c.stopFunc). <-case<-: }// Hold the tscMu lock for the rest of the function to ensure no new // exports are started.defer .tscMu.Unlock()// The otlptrace.Client interface states this method is called only // once, but there is no guarantee it is called after Start. 
Ensure the // client is started before doing anything and let the called know if they // made a mistake.if .tsc == nil {returnerrAlreadyStopped }// Clear c.tsc to signal the client is stopped. .tsc = nilif .ourConn { := .conn.Close()// A context timeout error takes precedence over this error.if == nil && != nil { = } }return}var errShutdown = errors.New("the client is shutdown")// UploadTraces sends a batch of spans.//// Retryable errors from the server will be handled according to any// RetryConfig the client was created with.func ( *client) ( context.Context, []*tracepb.ResourceSpans) error {// Hold a read lock to ensure a shut down initiated after this starts does // not abandon the export. This read lock acquire has less priority than a // write lock acquire (i.e. Stop), meaning if the client is shutting down // this will come after the shut down. .tscMu.RLock()defer .tscMu.RUnlock()if .tsc == nil {returnerrShutdown } , := .exportContext()defer ()return .requestFunc(, func( context.Context) error { , := .tsc.Export(, &coltracepb.ExportTraceServiceRequest{ResourceSpans: , })if != nil && .PartialSuccess != nil { := .PartialSuccess.GetErrorMessage() := .PartialSuccess.GetRejectedSpans()if != 0 || != "" { := internal.TracePartialSuccessError(, )otel.Handle() } }// nil is converted to OK.ifstatus.Code() == codes.OK {// Success.returnnil }return })}// exportContext returns a copy of parent with an appropriate deadline and// cancellation function.//// It is the callers responsibility to cancel the returned context once its// use is complete, via the parent or directly with the returned CancelFunc, to// ensure all resources are correctly released.func ( *client) ( context.Context) (context.Context, context.CancelFunc) {var (context.Contextcontext.CancelFunc )if .exportTimeout > 0 { , = context.WithTimeoutCause(, .exportTimeout, errors.New("exporter export timeout")) } else { , = context.WithCancel() }if .metadata.Len() > 0 { := .metadataif , := 
metadata.FromOutgoingContext(); { = metadata.Join(, ) } = metadata.NewOutgoingContext(, ) }// Unify the client stopCtx with the parent.gofunc() {select {case<-.Done():case<-.stopCtx.Done():// Cancel the export as the shutdown has timed out. () } }()return , }// retryable returns if err identifies a request that can be retried and a// duration to wait for if an explicit throttle time is included in err.func retryable( error) (bool, time.Duration) { := status.Convert()returnretryableGRPCStatus()}func retryableGRPCStatus( *status.Status) (bool, time.Duration) {switch .Code() {casecodes.Canceled,codes.DeadlineExceeded,codes.Aborted,codes.OutOfRange,codes.Unavailable,codes.DataLoss:// Additionally handle RetryInfo. , := throttleDelay()returntrue, casecodes.ResourceExhausted:// Retry only if the server signals that the recovery from resource exhaustion is possible.returnthrottleDelay() }// Not a retry-able error.returnfalse, 0}// throttleDelay returns of the status is RetryInfo// and the its duration to wait for if an explicit throttle time.func throttleDelay( *status.Status) (bool, time.Duration) {for , := range .Details() {if , := .(*errdetails.RetryInfo); {returntrue, .RetryDelay.AsDuration() } }returnfalse, 0}// MarshalLog is the marshaling function used by the logging system to represent this Client.func ( *client) () any {returnstruct {stringstring }{ : "otlptracegrpc", : .endpoint, }}
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PR and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.