package observ
import (
"context"
"errors"
"fmt"
"sync"
"time"
"google.golang.org/grpc/codes"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/x"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/metric"
semconv "go.opentelemetry.io/otel/semconv/v1.40.0"
"go.opentelemetry.io/otel/semconv/v1.40.0/otelconv"
)
const (
ScopeName = "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/observ"
SchemaURL = semconv .SchemaURL
Version = internal .Version
)
var (
measureAttrsPool = &sync .Pool {
New : func () any {
const n = 1 +
1 +
1 +
1 +
1 +
1
s := make ([]attribute .KeyValue , 0 , n )
return &s
},
}
addOptPool = &sync .Pool {
New : func () any {
const n = 1
o := make ([]metric .AddOption , 0 , n )
return &o
},
}
recordOptPool = &sync .Pool {
New : func () any {
const n = 1
o := make ([]metric .RecordOption , 0 , n )
return &o
},
}
)
func get[T any ](p *sync .Pool ) *[]T { return p .Get ().(*[]T ) }
func put[T any ](p *sync .Pool , s *[]T ) {
*s = (*s )[:0 ]
p .Put (s )
}
func ComponentName (id int64 ) string {
t := semconv .OTelComponentTypeOtlpGRPCSpanExporter .Value .AsString ()
return fmt .Sprintf ("%s/%d" , t , id )
}
type Instrumentation struct {
inflightSpans metric .Int64UpDownCounter
exportedSpans metric .Int64Counter
opDuration metric .Float64Histogram
attrs []attribute .KeyValue
addOpt metric .AddOption
recOpt metric .RecordOption
}
func NewInstrumentation (id int64 , target string ) (*Instrumentation , error ) {
if !x .Observability .Enabled () {
return nil , nil
}
attrs := BaseAttrs (id , target )
i := &Instrumentation {
attrs : attrs ,
addOpt : metric .WithAttributeSet (attribute .NewSet (attrs ...)),
recOpt : metric .WithAttributeSet (attribute .NewSet (append (
[]attribute .KeyValue {
semconv .RPCResponseStatusCode (codes .OK .String ()),
},
attrs ...,
)...)),
}
mp := otel .GetMeterProvider ()
m := mp .Meter (
ScopeName ,
metric .WithInstrumentationVersion (Version ),
metric .WithSchemaURL (SchemaURL ),
)
var err error
inflightSpans , e := otelconv .NewSDKExporterSpanInflight (m )
if e != nil {
e = fmt .Errorf ("failed to create span inflight metric: %w" , e )
err = errors .Join (err , e )
}
i .inflightSpans = inflightSpans .Inst ()
exportedSpans , e := otelconv .NewSDKExporterSpanExported (m )
if e != nil {
e = fmt .Errorf ("failed to create span exported metric: %w" , e )
err = errors .Join (err , e )
}
i .exportedSpans = exportedSpans .Inst ()
opDuration , e := otelconv .NewSDKExporterOperationDuration (m )
if e != nil {
e = fmt .Errorf ("failed to create operation duration metric: %w" , e )
err = errors .Join (err , e )
}
i .opDuration = opDuration .Inst ()
return i , err
}
func BaseAttrs (id int64 , target string ) []attribute .KeyValue {
host , port , err := ParseCanonicalTarget (target )
if err != nil || (host == "" && port < 0 ) {
if err != nil {
global .Debug ("failed to parse target" , "target" , target , "error" , err )
}
return []attribute .KeyValue {
semconv .OTelComponentName (ComponentName (id )),
semconv .OTelComponentTypeOtlpGRPCSpanExporter ,
}
}
if port < 0 {
return []attribute .KeyValue {
semconv .OTelComponentName (ComponentName (id )),
semconv .OTelComponentTypeOtlpGRPCSpanExporter ,
semconv .ServerAddress (host ),
}
}
if host == "" {
return []attribute .KeyValue {
semconv .OTelComponentName (ComponentName (id )),
semconv .OTelComponentTypeOtlpGRPCSpanExporter ,
semconv .ServerPort (port ),
}
}
return []attribute .KeyValue {
semconv .OTelComponentName (ComponentName (id )),
semconv .OTelComponentTypeOtlpGRPCSpanExporter ,
semconv .ServerAddress (host ),
semconv .ServerPort (port ),
}
}
func (i *Instrumentation ) ExportSpans (ctx context .Context , nSpans int ) ExportOp {
start := time .Now ()
if i .inflightSpans .Enabled (ctx ) {
addOpt := get [metric .AddOption ](addOptPool )
defer put (addOptPool , addOpt )
*addOpt = append (*addOpt , i .addOpt )
i .inflightSpans .Add (ctx , int64 (nSpans ), *addOpt ...)
}
return ExportOp {
ctx : ctx ,
start : start ,
nSpans : int64 (nSpans ),
inst : i ,
}
}
type ExportOp struct {
ctx context .Context
start time .Time
nSpans int64
inst *Instrumentation
}
func (e ExportOp ) End (err error , code codes .Code ) {
addOpt := get [metric .AddOption ](addOptPool )
defer put (addOptPool , addOpt )
*addOpt = append (*addOpt , e .inst .addOpt )
if e .inst .inflightSpans .Enabled (e .ctx ) {
e .inst .inflightSpans .Add (e .ctx , -e .nSpans , *addOpt ...)
}
success := successful (e .nSpans , err )
if e .inst .exportedSpans .Enabled (e .ctx ) {
e .inst .exportedSpans .Add (e .ctx , success , *addOpt ...)
}
if err != nil && e .inst .exportedSpans .Enabled (e .ctx ) {
attrs := get [attribute .KeyValue ](measureAttrsPool )
defer put (measureAttrsPool , attrs )
*attrs = append (*attrs , e .inst .attrs ...)
*attrs = append (*attrs , semconv .ErrorType (err ))
o := metric .WithAttributeSet (attribute .NewSet (*attrs ...))
*addOpt = append ((*addOpt )[:0 ], o )
e .inst .exportedSpans .Add (e .ctx , e .nSpans -success , *addOpt ...)
}
if e .inst .opDuration .Enabled (e .ctx ) {
recOpt := get [metric .RecordOption ](recordOptPool )
defer put (recordOptPool , recOpt )
*recOpt = append (*recOpt , e .inst .recordOption (err , code ))
d := time .Since (e .start ).Seconds ()
e .inst .opDuration .Record (e .ctx , d , *recOpt ...)
}
}
func (i *Instrumentation ) recordOption (err error , code codes .Code ) metric .RecordOption {
if err == nil && code == codes .OK {
return i .recOpt
}
attrs := get [attribute .KeyValue ](measureAttrsPool )
defer put (measureAttrsPool , attrs )
*attrs = append (*attrs , i .attrs ...)
*attrs = append (*attrs , semconv .RPCResponseStatusCode (code .String ()))
if err != nil {
*attrs = append (*attrs , semconv .ErrorType (err ))
}
return metric .WithAttributeSet (attribute .NewSet (*attrs ...))
}
func successful(n int64 , err error ) int64 {
if err == nil {
return n
}
return n - rejected (n , err )
}
var errPartialPool = &sync .Pool {
New : func () any { return new (internal .PartialSuccess ) },
}
func rejected(n int64 , err error ) int64 {
ps := errPartialPool .Get ().(*internal .PartialSuccess )
defer errPartialPool .Put (ps )
if errors .As (err , ps ) {
return min (max (ps .RejectedItems , 0 ), n )
}
return n
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .