package telemetry
import (
"context"
"fmt"
"log"
"os"
"slices"
"strconv"
"strings"
"sync"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
olog "go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/propagation"
ologsdk "go.opentelemetry.io/otel/sdk/log"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.25.0"
"go.opentelemetry.io/otel/trace"
"github.com/pancsta/asyncmachine-go/internal/utils"
am "github.com/pancsta/asyncmachine-go/pkg/machine"
ssam "github.com/pancsta/asyncmachine-go/pkg/states"
)
// maxHist caps the number of entries kept in OtelMachineData.txHist; once the
// history grows past it, TransitionInit drops the oldest entry.
const maxHist = 300
// OtelMachineData is the per-machine tracing state kept by OtelMachTracer.
// Spans are stored wrapped in contexts and recovered with
// trace.SpanFromContext.
type OtelMachineData struct {
	// ID is the machine's ID, assigned in MachineInit.
	ID string
	// Index is the sequential number assigned in MachineInit; it prefixes the
	// names of this machine's spans.
	Index int
	// Ended is set to true when the machine gets disposed.
	Ended bool
	// mx is held while spans and the transition history are being updated.
	mx sync.Mutex
	// machTrace carries the machine's top-level span.
	machTrace context.Context
	// txTrace carries the span of the transition in progress; TransitionEnd
	// resets it to nil after ending that span.
	txTrace context.Context
	// stateNames maps a state name to the context of its grouping span.
	stateNames map[string]context.Context
	// stateInstances maps a state name to the context of its currently open
	// instance span.
	stateInstances map[string]context.Context
	// stateGroup carries the "<index>:states" grouping span.
	stateGroup context.Context
	// txGroup carries the "<index>:transitions" grouping span; it stays unset
	// when transitions are skipped.
	txGroup context.Context
	// txHist lists the most recent transition contexts (up to maxHist), used
	// to link a transition to the one which caused it.
	txHist []OtelTxHist
}
// OtelTxHist is a single entry of a machine's transition history.
type OtelTxHist struct {
	// Id is the transition's ID.
	Id string
	// Ctx is the context carrying the transition's span.
	Ctx context.Context
}
// OtelMachTracerOpts configures OtelMachTracer.
type OtelMachTracerOpts struct {
	// SkipTransitions disables transition spans (and handler events); state
	// spans are still produced.
	SkipTransitions bool
	// IncludeHealth enables tracing of transitions which call the
	// "Healthcheck" or "Heartbeat" states; these are ignored by default.
	IncludeHealth bool
	// SkipLogArgs disables copying mapped transition args into span
	// attributes.
	SkipLogArgs bool
	// SkipAuto disables transition spans for auto transitions.
	SkipAuto bool
	// AllowStates is not read by the code visible in this file.
	// NOTE(review): presumably an allow-list of traced states - confirm.
	AllowStates am.S
	// SkipStates is not read by the code visible in this file.
	// NOTE(review): presumably a deny-list of traced states - confirm.
	SkipStates am.S
	// Logf, when set, receives the tracer's own log messages.
	Logf func(format string, args ...any)
}
// OtelMachTracer is an am.Tracer which exports machines, their states and
// transitions as OpenTelemetry spans.
type OtelMachTracer struct {
	*am.TracerNoOp
	// Tracer is the OpenTelemetry tracer used to start every span.
	Tracer trace.Tracer
	// Machines holds the tracing data of each machine, keyed by machine ID.
	// End sets it to nil.
	Machines map[string]*OtelMachineData
	// MachinesMx is held while Machines and MachinesOrder are accessed.
	MachinesMx sync.Mutex
	// MachinesOrder lists machine IDs in the order their data was created;
	// End disposes them in reverse.
	MachinesOrder []string
	// RootSpan is the parent span for machines without a registered parent.
	RootSpan trace.Span
	// Logf receives the tracer's own log messages; never nil after
	// NewOtelMachTracer.
	Logf func(format string, args ...any)
	// parentSpans maps a parent machine ID to its "submachines" group span.
	parentSpans map[string]trace.Span
	// parents maps a submachine ID to the ID of its parent machine.
	parents map[string]string
	// parentsMx is held while parents is accessed; MachineInit also holds it
	// while advancing NextIndex.
	parentsMx sync.Mutex
	// parentSpansMx is held while parentSpans is modified.
	parentSpansMx sync.Mutex
	// opts holds the options passed to NewOtelMachTracer; never nil.
	opts *OtelMachTracerOpts
	// ended is set by End; callbacks check it to ignore late events.
	ended bool
	// NextIndex is the index which the next MachineInit call will assign.
	NextIndex int
}
// Compile-time check that *OtelMachTracer satisfies am.Tracer.
var _ am.Tracer = (*OtelMachTracer)(nil)
// NewOtelMachTracer builds a tracer which nests all machine spans under
// rootSpan and starts them with otelTracer.
//
// It panics when otelTracer is nil. A nil opts means default options. The
// tracer's logger is picked in this order: opts.Logf, then rootMach.Log when
// the env log level is at least am.LogDecisions, otherwise a no-op.
func NewOtelMachTracer(
	rootMach am.Api, rootSpan trace.Span, otelTracer trace.Tracer,
	opts *OtelMachTracerOpts,
) *OtelMachTracer {
	if otelTracer == nil {
		panic("nil tracer")
	}
	if opts == nil {
		opts = &OtelMachTracerOpts{}
	}

	mt := new(OtelMachTracer)
	mt.Tracer = otelTracer
	mt.Machines = make(map[string]*OtelMachineData)
	mt.RootSpan = rootSpan
	mt.opts = opts
	mt.parentSpans = make(map[string]trace.Span)
	mt.parents = make(map[string]string)

	// The root span is ended right away; it keeps serving as a parent for
	// machine spans.
	mt.RootSpan.End()

	switch {
	case opts.Logf != nil:
		mt.Logf = opts.Logf
	case am.EnvLogLevel("") >= am.LogDecisions:
		mt.Logf = rootMach.Log
	default:
		mt.Logf = func(format string, args ...any) {}
	}

	return mt
}
// getMachineData returns the tracing data of the machine with the given ID,
// creating it when missing. Pass locked=true when the caller already holds
// mt.MachinesMx; otherwise the mutex is taken for the duration of the call.
//
// Once the tracer has ended, missing data is still returned (with initialized
// maps), but it is not registered in mt.Machines nor mt.MachinesOrder.
func (mt *OtelMachTracer) getMachineData(
	id string, locked bool,
) *OtelMachineData {
	if !locked {
		mt.MachinesMx.Lock()
		defer mt.MachinesMx.Unlock()
	}

	existing, found := mt.Machines[id]
	if found {
		return existing
	}

	fresh := &OtelMachineData{
		stateNames:     make(map[string]context.Context),
		stateInstances: make(map[string]context.Context),
	}
	if mt.ended {
		return fresh
	}

	mt.Logf("[otel] getMachineData: creating for %s", id)
	mt.Machines[id] = fresh
	mt.MachinesOrder = append(mt.MachinesOrder, id)

	return fresh
}
// MachineInit starts tracing of mach: it assigns the next machine index,
// creates the machine span plus the "states" group span (and the
// "transitions" group span, unless transitions are skipped), and returns the
// context carrying the machine span.
//
// The machine span is parented to the "submachines" span of the registered
// parent machine, or to the root span when no parent is registered. When a
// parent is registered but has no group span yet, the machine's own context
// is used as-is. All spans are ended right after creation and keep serving as
// parents.
func (mt *OtelMachTracer) MachineInit(mach am.Api) context.Context {
	id := mach.Id()

	mt.parentsMx.Lock()
	index := mt.NextIndex
	mt.NextIndex++
	name := "mach:" + strconv.Itoa(index) + ":" + id
	mach.Log("[bind] otel traces")
	mt.Logf("[otel] MachineInit: trace %s", id)
	pid, hasParent := mt.parents[id]
	mt.parentsMx.Unlock()

	ctx := mach.Ctx()
	if hasParent {
		// parentSpans is modified under parentSpansMx (NewSubmachine,
		// doDispose), so it has to be read under the same mutex. The lock is
		// taken only after parentsMx has been released, so the two mutexes are
		// never nested.
		mt.parentSpansMx.Lock()
		parentSpan, ok := mt.parentSpans[pid]
		mt.parentSpansMx.Unlock()
		if ok {
			ctx = trace.ContextWithSpan(ctx, parentSpan)
		}
	} else {
		ctx = trace.ContextWithSpan(ctx, mt.RootSpan)
	}

	// machine span
	machCtx, machSpan := mt.Tracer.Start(ctx, name, trace.WithAttributes(
		attribute.String("id", id),
	))
	data := mt.getMachineData(id, false)
	data.machTrace = machCtx
	data.ID = id
	data.Index = index
	machSpan.End()

	// group span for states
	stateGroupCtx, stateGroupSpan := mt.Tracer.Start(machCtx,
		strconv.Itoa(index)+":states",
		trace.WithAttributes(attribute.String("mach_id", id)))
	data.stateGroup = stateGroupCtx
	stateGroupSpan.End()

	// group span for transitions
	if !mt.opts.SkipTransitions {
		txGroupCtx, txGroupSpan := mt.Tracer.Start(machCtx,
			strconv.Itoa(index)+":transitions",
			trace.WithAttributes(attribute.String("mach_id", id)))
		data.txGroup = txGroupCtx
		txGroupSpan.End()
	}

	return machCtx
}
// NewSubmachine binds this tracer to mach, registers parent as its parent
// machine and makes sure the parent has a "submachines" group span.
//
// The call is ignored when the tracer has ended, and for machines carrying a
// tag prefixed with "rpc-" unless the AM_RPC_DBG env var is set. A machine ID
// which already has a registered parent is reported as a duplicate and left
// untouched.
func (mt *OtelMachTracer) NewSubmachine(parent, mach am.Api) {
	if mt.ended {
		mt.Logf("[otel] NewSubmachine: tracer already ended, ignoring %s",
			mach.Id())
		return
	}

	// skip RPC machines, unless debugging RPC
	dbgRpc := os.Getenv("AM_RPC_DBG") != ""
	for _, tag := range mach.Tags() {
		if strings.HasPrefix(tag, "rpc-") && !dbgRpc {
			return
		}
	}

	if err := mach.BindTracer(mt); err != nil {
		// The format needs verbs for both the machine ID and the error,
		// otherwise the ID is rendered as %!(EXTRA ...) and the cause is lost.
		mt.Logf("[otel] NewSubmachine: err binding tracer to %s: %v",
			mach.Id(), err)
		return
	}

	// register the parent, once per machine ID
	mt.parentsMx.Lock()
	_, ok := mt.parents[mach.Id()]
	if ok {
		mt.parentsMx.Unlock()
		mt.Logf("Submachine already being traced (duplicate ID %s)", mach.Id())
		return
	}
	mt.parents[mach.Id()] = parent.Id()
	mt.parentsMx.Unlock()

	data := mt.getMachineData(parent.Id(), false)
	data.mx.Lock()
	defer data.mx.Unlock()
	if data.Ended {
		mt.Logf("[otel] NewSubmachine: parent %s already ended", parent.Id())
		return
	}

	// create the parent's group span for submachines, if missing
	mt.parentSpansMx.Lock()
	if _, ok := mt.parentSpans[parent.Id()]; !ok {
		_, submachGroupSpan := mt.Tracer.Start(
			data.machTrace, strconv.Itoa(data.Index)+":submachines",
			trace.WithAttributes(attribute.String("mach_id", parent.Id())))
		mt.parentSpans[parent.Id()] = submachGroupSpan
		submachGroupSpan.End()
	}
	mt.parentSpansMx.Unlock()
}
// MachineDispose ends all spans of the machine with the given ID and forgets
// its tracing data. It holds mt.MachinesMx for the whole call.
func (mt *OtelMachTracer) MachineDispose(id string) {
	machines := &mt.MachinesMx
	machines.Lock()
	defer machines.Unlock()

	mt.doDispose(id)
}
// doDispose unregisters the machine with the given ID, marks its data as
// ended and ends every span it still references: the open transition, state
// instances, state names, both groups and finally the machine span itself.
// Unknown IDs are only logged. The caller has to hold mt.MachinesMx.
func (mt *OtelMachTracer) doDispose(id string) {
	data, found := mt.Machines[id]
	if !found {
		mt.Logf("[otel] MachineDispose: machine %s not found", id)
		return
	}
	mt.Logf("[otel] MachineDispose: disposing %s", id)

	data.mx.Lock()
	defer data.mx.Unlock()
	mt.parentSpansMx.Lock()
	defer mt.parentSpansMx.Unlock()

	// unregister
	delete(mt.parentSpans, id)
	delete(mt.Machines, id)
	mt.MachinesOrder = utils.SlicesWithout(mt.MachinesOrder, id)
	data.Ended = true

	// end spans, innermost first
	endSpan := func(ctx context.Context) {
		trace.SpanFromContext(ctx).End()
	}
	if data.txTrace != nil {
		endSpan(data.txTrace)
	}
	for _, instCtx := range data.stateInstances {
		endSpan(instCtx)
	}
	for _, nameCtx := range data.stateNames {
		endSpan(nameCtx)
	}
	endSpan(data.stateGroup)
	endSpan(data.txGroup)
	endSpan(data.machTrace)
}
// TransitionInit opens a span for the starting transition under the machine's
// "transitions" group and remembers it as the transition in progress.
//
// Nothing is traced when the tracer or the machine has ended, for health
// transitions (unless IncludeHealth), when SkipTransitions is set, or for auto
// transitions with SkipAuto. The span is linked to the span of the source
// transition when it is still present in the source machine's history.
// Transitions targeting am.StateException get a "!" name prefix and an
// "error" attribute when the args carry an error.
func (mt *OtelMachTracer) TransitionInit(tx *am.Transition) {
	mt.MachinesMx.Lock()
	defer mt.MachinesMx.Unlock()

	if mt.ended {
		mt.Logf("[otel] TransitionInit: tracer already ended, ignoring %s",
			tx.Machine.Id())
		return
	}

	// health transitions are opt-in
	called := tx.CalledStates()
	healthTx := slices.Contains(called, "Healthcheck") ||
		slices.Contains(called, "Heartbeat")
	if healthTx && !mt.opts.IncludeHealth {
		return
	}

	machID := tx.Machine.Id()
	data := mt.getMachineData(machID, true)
	switch {
	case data.Ended:
		mt.Logf("[otel] TransitionInit: machine %s already ended", machID)
		return
	case mt.opts.SkipTransitions:
		return
	case mt.opts.SkipAuto && tx.IsAuto():
		return
	}

	// find the span of the transition which caused this one
	var srcSpan trace.Span
	if src := tx.Mutation.Source; src != nil && src.TxId != "" &&
		src.MachId != "" {

		if srcData, ok := mt.Machines[src.MachId]; ok {
			idx := slices.IndexFunc(srcData.txHist, func(h OtelTxHist) bool {
				return h.Id == src.TxId
			})
			if idx >= 0 {
				srcSpan = trace.SpanFromContext(srcData.txHist[idx].Ctx)
			}
		}
	}

	// span name, with exceptions marked
	mutLabel := fmt.Sprintf("%d: %s", data.Index,
		tx.Mutation.StringFromIndex(tx.Machine.StateNames()))
	spanName := mutLabel
	var argErr error
	if slices.Contains(tx.TargetStates(), am.StateException) {
		spanName = "!" + spanName
		argErr = am.ParseArgs(tx.Args()).Err
	}

	startAttrs := trace.WithAttributes(
		attribute.String("tx_id", tx.Id),
		attribute.Int64("time_before", int64(tx.TimeBefore.Sum(nil))),
		attribute.String("mutation", mutLabel),
	)
	txCtx, txSpan := mt.Tracer.Start(data.txGroup, spanName, startAttrs)
	if argErr != nil {
		txSpan.SetAttributes(attribute.String("error", argErr.Error()))
	}

	// decorate with mapped args
	mapArgs := tx.Machine.SemLogger().ArgsMapper()
	if mapArgs != nil && !mt.opts.SkipLogArgs {
		for param, val := range mapArgs(tx.Args()) {
			txSpan.SetAttributes(attribute.String("args."+param, val))
		}
	}

	if srcSpan != nil {
		txSpan.AddLink(trace.Link{SpanContext: srcSpan.SpanContext()})
	}

	// remember the span, keeping the history bounded
	data.mx.Lock()
	defer data.mx.Unlock()
	data.txTrace = txCtx
	data.txHist = append(data.txHist, OtelTxHist{Id: tx.Id, Ctx: txCtx})
	if len(data.txHist) > maxHist {
		data.txHist = data.txHist[1:]
	}
}
// TransitionEnd finalizes tracing of a finished transition: it completes the
// transition span (when one is open) and, for accepted transitions, ends the
// instance spans of removed states and opens (or annotates) instance spans of
// added states.
//
// A state also counts as added when its clock advanced by more than one tick.
// Health transitions are ignored unless IncludeHealth is set.
func (mt *OtelMachTracer) TransitionEnd(tx *am.Transition) {
	if mt.ended {
		mt.Logf("[otel] TransitionEnd: tracer already ended, ignoring %s",
			tx.Machine.Id())
		return
	}

	target := tx.TargetStates()
	called := tx.CalledStates()
	healthTx := slices.Contains(called, "Healthcheck") ||
		slices.Contains(called, "Heartbeat")
	if healthTx && !mt.opts.IncludeHealth {
		return
	}

	data := mt.getMachineData(tx.Machine.Id(), false)
	if data.Ended {
		mt.Logf("[otel] TransitionEnd: machine %s already ended", tx.Machine.Id())
		return
	}
	data.mx.Lock()
	defer data.mx.Unlock()

	added := am.StatesDiff(target, tx.StatesBefore())
	removed := am.StatesDiff(tx.StatesBefore(), target)

	// states whose clock jumped by more than 1 count as added, too
	clockBefore := tx.ClockBefore()
	for state, tick := range tx.ClockAfter() {
		if tick <= 1+clockBefore[state] || slices.Contains(added, state) {
			continue
		}
		added = append(added, state)
	}

	// complete the transition span
	var txSpan trace.Span
	if data.txTrace != nil && !mt.opts.SkipTransitions {
		var diff string
		if len(added) > 0 {
			diff += "+" + utils.Jw(added, " +")
		}
		if len(removed) > 0 {
			diff += " -" + utils.Jw(removed, " -")
		}

		txSpan = trace.SpanFromContext(data.txTrace)
		txSpan.SetAttributes(
			attribute.String("states_diff", strings.Trim(diff, " ")),
			attribute.Int64("time_after", int64(tx.TimeAfter.Sum(nil))),
			attribute.Bool("accepted", tx.IsAccepted.Load()),
			attribute.Bool("auto", tx.IsAuto()),
			attribute.Int("steps_count", len(tx.Steps)),
		)

		// link instances of the removed states
		for _, state := range removed {
			instCtx, open := data.stateInstances[state]
			if !open {
				continue
			}
			txSpan.AddLink(trace.Link{
				SpanContext: trace.SpanFromContext(instCtx).SpanContext(),
			})
		}

		// end only on return, as links to added states get attached below
		defer txSpan.End()
		data.txTrace = nil
	}

	// rejected transitions don't change states
	if !tx.IsAccepted.Load() {
		return
	}

	// close instances of removed states
	for _, state := range removed {
		instCtx, open := data.stateInstances[state]
		if !open {
			continue
		}
		trace.SpanFromContext(instCtx).End()
		delete(data.stateInstances, state)
	}

	// open (or annotate) instances of added states
	for _, state := range added {
		if data.Ended {
			mt.Logf("[otel] TransitionEnd: machine %s already ended", tx.Machine.Id())
			break
		}

		// grouping span per state name, created on first use
		nameCtx, known := data.stateNames[state]
		if !known {
			var nameSpan trace.Span
			nameCtx, nameSpan = mt.Tracer.Start(data.stateGroup,
				strconv.Itoa(data.Index)+":"+state)
			data.stateNames[state] = nameCtx
			nameSpan.End()
		}

		// reuse the open instance or start a new one
		var instSpan trace.Span
		if instCtx, open := data.stateInstances[state]; open {
			instSpan = trace.SpanFromContext(instCtx)
		} else {
			instCtx, instSpan = mt.Tracer.Start(nameCtx,
				strconv.Itoa(data.Index)+":"+state, trace.WithAttributes(
					attribute.String("tx_id", tx.Id),
				))
			data.stateInstances[state] = instCtx
		}
		instSpan.AddEvent(tx.Mutation.String())

		if txSpan != nil {
			txSpan.AddLink(trace.Link{SpanContext: instSpan.SpanContext()})
		}
	}
}
// HandlerEnd records a finished handler as an event (named after the handler,
// with an "emitter" attribute) on the span of the transition in progress. It
// does nothing once the tracer or the machine has ended, or when transitions
// are skipped.
func (mt *OtelMachTracer) HandlerEnd(
	tx *am.Transition, emitter string, handler string,
) {
	if mt.ended || mt.opts.SkipTransitions {
		return
	}

	data := mt.getMachineData(tx.Machine.Id(), false)
	if data.Ended {
		return
	}

	txSpan := trace.SpanFromContext(data.txTrace)
	emitterAttr := trace.WithAttributes(attribute.String("emitter", emitter))
	txSpan.AddEvent(handler, emitterAttr)
}
// End shuts the tracer down: it marks it as ended, disposes every traced
// machine in reverse order of creation, ends the root span and drops the
// machines map. It holds mt.MachinesMx for the whole call.
func (mt *OtelMachTracer) End() {
	mt.MachinesMx.Lock()
	defer mt.MachinesMx.Unlock()

	mt.Logf("[otel] End")
	mt.ended = true

	// doDispose reassigns mt.MachinesOrder on every call, so iterate over a
	// private reversed snapshot. This keeps the loop correct regardless of
	// whether the removal helper copies the slice or shifts it in place.
	order := slices.Clone(mt.MachinesOrder)
	slices.Reverse(order)
	for _, id := range order {
		mt.doDispose(id)
	}

	mt.RootSpan.End()
	mt.Machines = nil
}
// QueueEnd does nothing; this tracer does not react to this callback.
func (mt *OtelMachTracer) QueueEnd(mach am.Api) {}
// NewOtelLoggerProvider returns a logger provider which sends records to
// exporter through a batch processor.
func NewOtelLoggerProvider(exporter ologsdk.Exporter) *ologsdk.LoggerProvider {
	batcher := ologsdk.NewBatchProcessor(exporter)
	return ologsdk.NewLoggerProvider(ologsdk.WithProcessor(batcher))
}
// BindOtelLogger replaces the logger of mach with one which emits every
// message as an OpenTelemetry log record through provider.
//
// Each record carries the formatted message as its body, the
// "asyncmachine.id" attribute and, when service is not empty, the
// "service.name" attribute. Messages prefixed with "[error" are emitted with
// the error severity; all others get a severity and severity text derived
// from the machine's log level.
func BindOtelLogger(
	mach am.Api, provider *ologsdk.LoggerProvider, service string,
) {
	l := provider.Logger(mach.Id())
	// NOTE(review): presumably turns off the ID prefix in messages, as the ID
	// is sent as an attribute - confirm.
	mach.SemLogger().EnableId(false)

	amlog := func(level am.LogLevel, msg string, args ...any) {
		r := olog.Record{}
		r.SetTimestamp(time.Now())

		if strings.HasPrefix(msg, "[error") {
			r.SetSeverity(olog.SeverityError)
		} else {
			// The severity text always comes from the matched level itself,
			// so no case can report the name of a different level.
			switch level {
			case am.LogExternal:
				r.SetSeverity(olog.SeverityInfo4)
				r.SetSeverityText(level.String())
			case am.LogChanges:
				r.SetSeverity(olog.SeverityInfo3)
				r.SetSeverityText(level.String())
			case am.LogOps:
				r.SetSeverity(olog.SeverityInfo1)
				r.SetSeverityText(level.String())
			case am.LogDecisions:
				r.SetSeverity(olog.SeverityTrace4)
				r.SetSeverityText(level.String())
			case am.LogEverything:
				r.SetSeverity(olog.SeverityTrace1)
				r.SetSeverityText(level.String())
			default:
			}
		}

		r.SetBody(olog.StringValue(fmt.Sprintf(msg, args...)))
		if service != "" {
			r.AddAttributes(
				olog.String("service.name", service),
			)
		}
		r.AddAttributes(
			olog.String("asyncmachine.id", mach.Id()),
		)
		l.Emit(mach.Ctx(), r)
	}

	mach.SemLogger().SetLogger(amlog)
}
// MachBindOtelEnv sets up OpenTelemetry tracing for mach based on env
// variables. It is a no-op (returning nil) for machines which have a parent,
// and when either EnvOtelTrace or EnvService is empty.
//
// Transition spans are skipped unless EnvOtelTraceTxs is set, args are logged
// only when EnvOtelTraceArgs is set, and auto transitions are skipped when
// EnvOtelTraceNoauto is set. When the machine has the RegisterDisposal state,
// a disposal handler is registered which ends the tracer, then flushes and
// shuts down the provider.
//
// It returns an error when the provider cannot be created or the tracer
// cannot be bound.
func MachBindOtelEnv(mach am.Api) error {
	// only root machines get their own provider
	if mach.ParentId() != "" {
		return nil
	}
	service := os.Getenv(EnvService)
	if os.Getenv(EnvOtelTrace) == "" || service == "" {
		return nil
	}

	ctx := mach.Ctx()
	t, p, err := NewOtelProvider(service, ctx)
	if err != nil {
		return err
	}

	_, rootSpan := t.Start(mach.Ctx(), "root")
	mt := NewOtelMachTracer(mach, rootSpan, t, &OtelMachTracerOpts{
		SkipTransitions: os.Getenv(EnvOtelTraceTxs) == "",
		SkipLogArgs:     os.Getenv(EnvOtelTraceArgs) == "",
		SkipAuto:        os.Getenv(EnvOtelTraceNoauto) != "",
	})

	var dispose am.HandlerDispose = func(id string, _ context.Context) {
		// pauses between the steps of the shutdown sequence
		tracerCooldown := 100 * time.Millisecond

		mt.End()
		time.Sleep(tracerCooldown)
		// Errors are scoped to the handler: it runs long after this function
		// has returned and must not write to the enclosing err.
		if err := p.ForceFlush(ctx); err != nil {
			log.Printf("Error flushing tracer: %v", err)
		}
		time.Sleep(tracerCooldown)
		if err := p.Shutdown(ctx); err != nil {
			log.Printf("Error shutting down tracer: %v", err)
		}
	}

	// register the disposal handler, when supported by the machine
	register := ssam.DisposedStates.RegisterDisposal
	if mach.Has1(register) {
		mach.Add1(register, am.A{
			ssam.DisposedArgHandler: dispose,
		})
	}

	if err := mach.BindTracer(mt); err != nil {
		return err
	}
	mt.MachineInit(mach)

	return nil
}
// NewOtelProvider creates an OTLP/gRPC trace exporter (insecure transport)
// and a batching tracer provider reporting source as the service name. The
// provider is installed as the global one, and a tracer named after source is
// returned along with it.
//
// It returns an error when the exporter cannot be created.
//
// NOTE(review): the global propagator is set twice - the composite
// (TraceContext + Baggage) set at the start is replaced with TraceContext
// alone on success, and stays in place only on the error path. Confirm which
// one is intended.
func NewOtelProvider(
	source string, ctx context.Context,
) (trace.Tracer, *sdktrace.TracerProvider, error) {
	composite := propagation.NewCompositeTextMapPropagator(
		propagation.TraceContext{},
		propagation.Baggage{},
	)
	otel.SetTextMapPropagator(composite)

	client := otlptracegrpc.NewClient(otlptracegrpc.WithInsecure())
	exporter, err := otlptrace.New(ctx, client)
	if err != nil {
		return nil, nil, err
	}

	// small, frequent batches
	batcher := sdktrace.WithBatcher(exporter,
		sdktrace.WithMaxExportBatchSize(50),
		sdktrace.WithBatchTimeout(100*time.Millisecond),
	)
	res := resource.NewWithAttributes(
		semconv.SchemaURL,
		semconv.ServiceNameKey.String(source),
	)
	tp := sdktrace.NewTracerProvider(batcher, sdktrace.WithResource(res))

	otel.SetTracerProvider(tp)
	otel.SetTextMapPropagator(propagation.TraceContext{})

	return otel.Tracer(source), tp, nil
}
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PRs and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.