package telemetry
import (
"context"
"fmt"
"log"
"os"
"regexp"
"slices"
"strconv"
"strings"
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
olog "go.opentelemetry.io/otel/log"
ologsdk "go.opentelemetry.io/otel/sdk/log"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"github.com/pancsta/asyncmachine-go/internal/utils"
am "github.com/pancsta/asyncmachine-go/pkg/machine"
ssam "github.com/pancsta/asyncmachine-go/pkg/states"
)
// Limits and environment variable names used by the OpenTelemetry bindings.
const (
// maxHist caps the number of entries kept in OtelMachineData.txHist;
// TransitionInit drops the oldest entry once the cap is exceeded.
maxHist = 300
// EnvOtelTrace must be non-empty (together with the service env var) for
// MachBindOtelEnv to bind a tracer.
EnvOtelTrace = "AM_OTEL_TRACE"
// EnvOtelTraceTxs enables transition spans when non-empty
// (MachBindOtelEnv sets SkipTransitions when it is empty).
EnvOtelTraceTxs = "AM_OTEL_TRACE_TXS"
// EnvOtelTraceArgs enables argument attributes when non-empty
// (MachBindOtelEnv sets SkipLogArgs when it is empty).
EnvOtelTraceArgs = "AM_OTEL_TRACE_ARGS"
// EnvOtelTraceNoauto sets SkipAuto when non-empty.
EnvOtelTraceNoauto = "AM_OTEL_TRACE_NOAUTO"
// EnvOtelTraceAllowStates is a comma-separated list copied into
// OtelMachTracerOpts.AllowStates.
EnvOtelTraceAllowStates = "AM_OTEL_TRACE_ALLOW_STATES"
// EnvOtelTraceAllowStatesRe is a regular expression compiled into
// OtelMachTracerOpts.AllowStatesRe.
EnvOtelTraceAllowStatesRe = "AM_OTEL_TRACE_ALLOW_STATES_RE"
// EnvOtelTraceSkipStates is a comma-separated list copied into
// OtelMachTracerOpts.SkipStates.
EnvOtelTraceSkipStates = "AM_OTEL_TRACE_SKIP_STATES"
// EnvOtelTraceSkipStatesRe is a regular expression compiled into
// OtelMachTracerOpts.SkipStatesRe.
EnvOtelTraceSkipStatesRe = "AM_OTEL_TRACE_SKIP_STATES_RE"
// EnvOtelExporterOtlpTracesEndpoint is not read in this part of the file;
// presumably it is consumed by the OTLP exporter setup - TODO confirm.
EnvOtelExporterOtlpTracesEndpoint = "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"
// EnvOtelTraceId holds a hex trace ID. It is parsed by OtelTraceIdFromEnv
// and written by MachBindOtelEnv when it starts a new root span.
EnvOtelTraceId = "AM_OTEL_TRACE_ID"
// EnvOtelSpanId holds a hex span ID. It is parsed by OtelTraceIdFromEnv
// and written by MachBindOtelEnv when it starts a new root span.
EnvOtelSpanId = "AM_OTEL_SPAN_ID"
)
// OtelMachineData is the per-machine tracing state kept by OtelMachTracer.
type OtelMachineData struct {
// ID is the machine ID, set by MachineInit.
ID string
// Index is the sequence number assigned by MachineInit (taken from
// OtelMachTracer.NextIndex) and used as a prefix in span names.
Index int
// Ended is set by doDispose once the machine's spans have been ended.
Ended bool
// mx is held by the transition callbacks, NewSubmachine and doDispose while
// they touch the fields below.
mx sync .Mutex
// machTrace is the context returned when starting the machine span.
machTrace context .Context
// txTrace is the context of the latest transition span; TransitionEnd resets
// it to nil after ending that span.
txTrace context .Context
// stateNames maps a state name to the context of its per-name grouping span.
stateNames map [string ]context .Context
// stateInstances maps a state name to the context of its currently open
// instance span; entries are deleted when the state is removed.
stateInstances map [string ]context .Context
// stateGroup is the context of the "<index>:states" span.
stateGroup context .Context
// txGroup is the context of the "<index>:transitions" span; it is only set
// when transitions are traced.
txGroup context .Context
// txHist holds the most recent transition contexts (at most maxHist), used
// to link a transition to the one that caused it.
txHist []OtelTxHist
}
// OtelTxHist is a single entry of the transition history: a transition ID
// together with the context carrying that transition's span.
type OtelTxHist struct {
// Id is the transition ID (tx.Id).
Id string
// Ctx is the context returned when the transition span was started.
Ctx context .Context
}
// OtelMachTracerOpts configures an OtelMachTracer.
type OtelMachTracerOpts struct {
// SkipTransitions disables transition spans (and handler events).
SkipTransitions bool
// IncludeHealth enables tracing of transitions which call the Healthcheck
// or Heartbeat states; they are ignored by default.
IncludeHealth bool
// SkipLogArgs disables the "args.*" attributes on transition spans.
SkipLogArgs bool
// SkipAuto disables transition spans for auto transitions.
SkipAuto bool
// AllowStates lists states to trace, either "State" or "machID:State".
AllowStates []string
// AllowStatesRe is matched against "machID:State" to allow a state.
AllowStatesRe *regexp .Regexp
// SkipStates lists states to ignore, either "State" or "machID:State".
SkipStates []string
// SkipStatesRe is matched against "machID:State" to ignore a state.
// NewSubmachine also matches it against a bare machine ID.
SkipStatesRe *regexp .Regexp
// Logf is an optional logger for the tracer's own diagnostics.
Logf func (format string , args ...any )
}
// OtelMachTracer is an am.Tracer which mirrors machines, their states and
// (optionally) their transitions as OpenTelemetry spans.
type OtelMachTracer struct {
*am .TracerNoOp
// Tracer is the OpenTelemetry tracer used to start all spans.
Tracer trace .Tracer
// Machines holds per-machine tracing data, keyed by machine ID.
Machines map [string ]*OtelMachineData
// MachinesMx guards Machines and MachinesOrder.
MachinesMx sync .Mutex
// MachinesOrder lists machine IDs in the order their data was created.
MachinesOrder []string
// RootSpan is the optional parent span for machines without a traced parent.
RootSpan trace .Span
// NextIndex is the index given to the next initialized machine.
NextIndex int
// Logf receives the tracer's own diagnostics; it is never nil after
// NewOtelMachTracer.
Logf func (format string , args ...any )
// parentSpans maps a parent machine ID to its "<index>:submachines" span.
parentSpans map [string ]trace .Span
// parents maps a submachine ID to its parent machine ID.
parents map [string ]string
// parentsMx guards parents (MachineInit also holds it while bumping
// NextIndex).
parentsMx sync .Mutex
// parentSpansMx guards parentSpans.
parentSpansMx sync .Mutex
opts *OtelMachTracerOpts
// ended is set by dispose; callbacks become no-ops afterwards.
ended bool
}
// Compile-time check that *OtelMachTracer satisfies am.Tracer.
var _ am .Tracer = (*OtelMachTracer )(nil )
// NewOtelMachTracer builds a tracer which reports to otelTracer.
//
// rootSpan is optional; when given it is ended right away and later used as
// the parent of machines without a traced parent. opts may be nil. Diagnostics
// go to opts.Logf when set, otherwise to rootMach.Log when the env log level
// is at least am.LogDecisions, otherwise they are discarded.
//
// Panics when otelTracer is nil.
func NewOtelMachTracer(
	rootMach am.Api, rootSpan trace.Span, otelTracer trace.Tracer,
	opts *OtelMachTracerOpts,
) *OtelMachTracer {
	if otelTracer == nil {
		panic("nil tracer")
	}
	if opts == nil {
		opts = &OtelMachTracerOpts{}
	}

	// The root span only acts as a parent reference, so it is closed up front.
	if rootSpan != nil {
		rootSpan.End()
	}

	// Pick the diagnostics sink.
	var logf func(format string, args ...any)
	switch {
	case opts.Logf != nil:
		logf = opts.Logf
	case am.EnvLogLevel("") >= am.LogDecisions:
		logf = rootMach.Log
	default:
		logf = func(format string, args ...any) {}
	}

	return &OtelMachTracer{
		Tracer:      otelTracer,
		Machines:    make(map[string]*OtelMachineData),
		RootSpan:    rootSpan,
		Logf:        logf,
		opts:        opts,
		parentSpans: make(map[string]trace.Span),
		parents:     make(map[string]string),
	}
}
// getMachineData returns the tracing data for machine id, creating it when
// missing. Pass locked=true when the caller already holds mt.MachinesMx.
//
// Once the tracer has ended, newly created data is returned but not
// registered.
func (mt *OtelMachTracer) getMachineData(
	id string, locked bool,
) *OtelMachineData {
	if !locked {
		mt.MachinesMx.Lock()
		defer mt.MachinesMx.Unlock()
	}

	existing, found := mt.Machines[id]
	if found {
		return existing
	}

	fresh := &OtelMachineData{
		stateInstances: make(map[string]context.Context),
		stateNames:     make(map[string]context.Context),
	}
	if mt.ended {
		return fresh
	}

	mt.Logf("[otel] getMachineData: creating for %s", id)
	mt.Machines[id] = fresh
	mt.MachinesOrder = append(mt.MachinesOrder, id)

	return fresh
}
// MachineInit starts the span tree for mach and returns the context carrying
// the machine span.
//
// The parent of the machine span is chosen in this order:
//  1. the ":submachines" span of the parent registered by NewSubmachine,
//  2. a remote span context built from the trace/span ID env vars,
//  3. mt.RootSpan,
//  4. none (a message is logged on the machine).
//
// Besides the machine span, a "<index>:states" group span and (unless
// transitions are skipped) a "<index>:transitions" group span are created.
// All spans are ended immediately; their contexts are kept to parent later
// spans.
func (mt *OtelMachTracer) MachineInit(mach am.Api) context.Context {
	id := mach.Id()

	mt.parentsMx.Lock()
	index := mt.NextIndex
	mt.NextIndex++
	name := "mach:" + strconv.Itoa(index) + ":" + id
	mach.Log("[otel] bind traces")
	mt.Logf("[otel] MachineInit: trace %s", id)

	ctx := mach.Context()
	pid, ok := mt.parents[id]
	if ok {
		// parentSpans is written under parentSpansMx (NewSubmachine, doDispose),
		// so it has to be read under the same lock to avoid a concurrent map
		// read/write.
		mt.parentSpansMx.Lock()
		parentSpan, hasSpan := mt.parentSpans[pid]
		mt.parentSpansMx.Unlock()
		if hasSpan {
			ctx = trace.ContextWithSpan(ctx, parentSpan)
		}
	} else if t, s, err := OtelTraceIdFromEnv(); err == nil {
		spanCtx := trace.NewSpanContext(trace.SpanContextConfig{
			TraceID:    *t,
			SpanID:     *s,
			TraceFlags: trace.FlagsSampled,
			Remote:     true,
		})
		ctx = trace.ContextWithRemoteSpanContext(mach.Context(), spanCtx)
	} else if mt.RootSpan != nil {
		ctx = trace.ContextWithSpan(ctx, mt.RootSpan)
	} else {
		mach.Log("[otel] root span missing")
	}
	mt.parentsMx.Unlock()

	// machine span
	machCtx, machSpan := mt.Tracer.Start(ctx, name, trace.WithAttributes(
		attribute.String("id", id),
	))
	data := mt.getMachineData(id, false)
	data.machTrace = machCtx
	data.ID = id
	data.Index = index
	machSpan.End()

	// group span for states
	stateGroupCtx, stateGroupSpan := mt.Tracer.Start(machCtx,
		strconv.Itoa(index)+":states",
		trace.WithAttributes(attribute.String("mach_id", id)))
	data.stateGroup = stateGroupCtx
	stateGroupSpan.End()

	// group span for transitions
	if !mt.opts.SkipTransitions {
		txGroupCtx, txGroupSpan := mt.Tracer.Start(machCtx,
			strconv.Itoa(index)+":transitions",
			trace.WithAttributes(attribute.String("mach_id", id)))
		data.txGroup = txGroupCtx
		txGroupSpan.End()
	}

	return machCtx
}
// NewSubmachine binds this tracer to mach, records parent as its parent, and
// makes sure the parent has a "<index>:submachines" group span.
//
// The submachine is ignored when the tracer has ended, when it carries an
// "rpc-*" or "relay" tag (unless AM_RPC_DBG is set), when its ID matches
// SkipStatesRe, when binding fails, or when its ID is already registered.
func (mt *OtelMachTracer) NewSubmachine(parent, mach am.Api) {
	if mt.ended {
		mt.Logf("[otel] NewSubmachine: tracer already ended, ignoring %s",
			mach.Id())
		return
	}

	// skip RPC machines, unless debugging RPC
	dbgRpc := os.Getenv("AM_RPC_DBG") != ""
	for _, tag := range mach.Tags() {
		if (strings.HasPrefix(tag, "rpc-") || tag == "relay") && !dbgRpc {
			return
		}
	}
	if mt.opts.SkipStatesRe != nil &&
		mt.opts.SkipStatesRe.MatchString(mach.Id()) {
		return
	}

	if err := mach.BindTracer(mt); err != nil {
		// The original call had no verb for the ID and dropped the error.
		mt.Logf("[otel] NewSubmachine: err binding tracer to %s: %v",
			mach.Id(), err)
		return
	}

	// register the parent relation, rejecting duplicates
	mt.parentsMx.Lock()
	_, ok := mt.parents[mach.Id()]
	if ok {
		mt.parentsMx.Unlock()
		mt.Logf("Submachine already being traced (duplicate ID %s)", mach.Id())
		return
	}
	mt.parents[mach.Id()] = parent.Id()
	mt.parentsMx.Unlock()

	data := mt.getMachineData(parent.Id(), false)
	data.mx.Lock()
	defer data.mx.Unlock()
	if data.Ended {
		mt.Logf("[otel] NewSubmachine: parent %s already ended", parent.Id())
		return
	}

	// lazily create the parent's group span for submachines
	mt.parentSpansMx.Lock()
	if _, ok := mt.parentSpans[parent.Id()]; !ok {
		_, submachGroupSpan := mt.Tracer.Start(
			data.machTrace, strconv.Itoa(data.Index)+":submachines",
			trace.WithAttributes(attribute.String("mach_id", parent.Id())))
		mt.parentSpans[parent.Id()] = submachGroupSpan
		submachGroupSpan.End()
	}
	mt.parentSpansMx.Unlock()
}
// MachineDispose ends all spans of machine id and forgets its tracing data.
// It is the locking wrapper around doDispose.
func (mt *OtelMachTracer) MachineDispose(id string) {
	mx := &mt.MachinesMx
	mx.Lock()
	defer mx.Unlock()

	mt.doDispose(id)
}
// doDispose removes machine id from the tracer and ends every span it still
// references. The caller must hold mt.MachinesMx.
func (mt *OtelMachTracer) doDispose(id string) {
	data, known := mt.Machines[id]
	if !known {
		mt.Logf("[otel] MachineDispose: machine %s not found", id)
		return
	}
	mt.Logf("[otel] MachineDispose: disposing %s", id)

	data.mx.Lock()
	defer data.mx.Unlock()
	mt.parentSpansMx.Lock()
	defer mt.parentSpansMx.Unlock()

	// unregister
	delete(mt.parentSpans, id)
	delete(mt.Machines, id)
	mt.MachinesOrder = utils.SlicesWithout(mt.MachinesOrder, id)
	data.Ended = true

	endSpan := func(ctx context.Context) {
		trace.SpanFromContext(ctx).End()
	}

	// innermost spans first: transition, state instances, state names, groups,
	// and the machine span last
	if data.txTrace != nil {
		endSpan(data.txTrace)
	}
	for _, instCtx := range data.stateInstances {
		endSpan(instCtx)
	}
	for _, nameCtx := range data.stateNames {
		endSpan(nameCtx)
	}
	endSpan(data.stateGroup)
	endSpan(data.txGroup)
	endSpan(data.machTrace)
}
// TransitionInit starts a span for tx under the machine's transitions group
// and remembers it as the machine's current transition.
//
// Nothing is traced when the tracer or machine has ended, for health
// transitions (unless IncludeHealth), when transitions are skipped, or for
// auto transitions when SkipAuto is set. If the mutation names a source
// transition still present in that machine's history, the new span links to
// it. Transitions targeting the Exception state get a "!" name prefix and an
// "error" attribute.
func (mt *OtelMachTracer) TransitionInit(tx *am.Transition) {
	mt.MachinesMx.Lock()
	defer mt.MachinesMx.Unlock()

	if mt.ended {
		mt.Logf("[otel] TransitionInit: tracer already ended, ignoring %s",
			tx.Machine.Id())
		return
	}

	// health-related transitions are opt-in
	called := tx.CalledStates()
	health := slices.Contains(called, am.StateHealthcheck) ||
		slices.Contains(called, am.StateHeartbeat)
	if health && !mt.opts.IncludeHealth {
		return
	}

	machID := tx.Machine.Id()
	data := mt.getMachineData(machID, true)
	if data.Ended {
		mt.Logf("[otel] TransitionInit: machine %s already ended", machID)
		return
	}
	if mt.opts.SkipTransitions || (mt.opts.SkipAuto && tx.IsAuto()) {
		return
	}

	// look up the span of the transition which caused this one
	var srcSpan trace.Span
	if src := tx.Mutation.Source; src != nil && src.TxId != "" &&
		src.MachId != "" {

		if srcData, ok := mt.Machines[src.MachId]; ok {
			pos := slices.IndexFunc(srcData.txHist, func(h OtelTxHist) bool {
				return h.Id == src.TxId
			})
			if pos >= 0 {
				srcSpan = trace.SpanFromContext(srcData.txHist[pos].Ctx)
			}
		}
	}

	// span name, marked with "!" for exceptions
	mutLabel := fmt.Sprintf("%d: %s", data.Index,
		tx.Mutation.StringFromIndex(tx.Machine.StateNames()))
	spanName := mutLabel
	var excErr error
	if slices.Contains(tx.TargetStates(), am.StateException) {
		spanName = "!" + spanName
		excErr = am.ParseArgs(tx.Args()).Err
	}

	txCtx, txSpan := mt.Tracer.Start(data.txGroup, spanName,
		trace.WithAttributes(
			attribute.String("tx_id", tx.Id),
			attribute.Int64("time_before", int64(tx.TimeBefore.Sum(nil))),
			attribute.String("mutation", mutLabel),
		))
	if excErr != nil {
		txSpan.SetAttributes(attribute.String("error", excErr.Error()))
	}

	// mapped arguments
	mapper := tx.Machine.SemLogger().ArgsMapper()
	if mapper != nil && !mt.opts.SkipLogArgs {
		for key, value := range mapper(tx.Args()) {
			txSpan.SetAttributes(attribute.String("args."+key, value))
		}
	}

	if srcSpan != nil {
		txSpan.AddLink(trace.Link{
			SpanContext: srcSpan.SpanContext(),
			Attributes:  nil,
		})
	}

	// store as current and in the bounded history
	data.mx.Lock()
	defer data.mx.Unlock()
	data.txTrace = txCtx
	data.txHist = append(data.txHist, OtelTxHist{Id: tx.Id, Ctx: txCtx})
	if len(data.txHist) > maxHist {
		data.txHist = data.txHist[1:]
	}
}
// TransitionFinals updates state spans after an accepted transition.
//
// Spans of removed states are ended; added states get an instance span (or an
// event on the already open one) under a per-name group span. When
// transitions are traced and a transition span is open, it receives the
// result attributes and links to the affected state spans.
//
// Nothing happens when the tracer or machine has ended, when tx was not
// accepted, or for health transitions (unless IncludeHealth).
func (mt *OtelMachTracer) TransitionFinals(tx *am.Transition) {
	if mt.ended {
		mt.Logf("[otel] TransitionFinals: tracer already ended, ignoring %s",
			tx.Machine.Id())
		return
	}
	if !tx.IsAccepted.Load() {
		return
	}

	schema := tx.Machine.Schema()
	target := tx.TargetStates()
	called := tx.CalledStates()
	isHealth := slices.Contains(called, am.StateHealthcheck) ||
		slices.Contains(called, am.StateHeartbeat)
	if !mt.opts.IncludeHealth && isHealth {
		return
	}

	data := mt.getMachineData(tx.Machine.Id(), false)
	if data.Ended {
		mt.Logf("[otel] TransitionFinals: machine %s already ended",
			tx.Machine.Id())
		return
	}
	data.mx.Lock()
	defer data.mx.Unlock()

	index := tx.Machine.StateNames()
	statesAdded := am.StatesDiff(target, tx.StatesBefore())
	statesRemoved := am.StatesDiff(tx.StatesBefore(), target)

	// Bail out only when states were both added and removed and none of them
	// passes the allow/skip filters.
	skip := len(statesAdded) > 0 && len(statesRemoved) > 0
	for _, state := range slices.Concat(statesAdded, statesRemoved) {
		if mt.allowedState(tx.MachApi.Id(), state) {
			skip = false
			break
		}
	}
	if skip {
		return
	}

	// A state whose clock advanced by more than one tick is also treated as
	// added (presumably a re-activated multi state - TODO confirm).
	before := tx.ClockBefore()
	for name, tick := range tx.ClockAfter() {
		if tick > 1+before[name] && !slices.Contains(statesAdded, name) {
			statesAdded = append(statesAdded, name)
		}
	}

	var (
		txSpan     trace.Span
		statesDiff string
	)

	// annotate the transition span
	if !mt.opts.SkipTransitions && data.txTrace != nil {
		if len(statesAdded) > 0 {
			statesDiff += "+" + utils.Jw(statesAdded, " +")
		}
		if len(statesRemoved) > 0 {
			statesDiff += " -" + utils.Jw(statesRemoved, " -")
		}

		txSpan = trace.SpanFromContext(data.txTrace)
		txSpan.SetAttributes(
			attribute.String("states_diff", strings.Trim(statesDiff, " ")),
			attribute.Int64("time_after", int64(tx.TimeAfter.Sum(nil))),
			attribute.Bool("accepted", tx.IsAccepted.Load()),
			attribute.Bool("auto", tx.IsAuto()),
			attribute.Int("steps_count", len(tx.Steps)),
		)

		// link removed states before their spans get ended below
		for _, state := range statesRemoved {
			if !mt.allowedState(tx.MachApi.Id(), state) {
				continue
			}
			if ctx, ok := data.stateInstances[state]; ok {
				txSpan.AddLink(trace.Link{
					SpanContext: trace.SpanFromContext(ctx).SpanContext(),
					Attributes:  nil,
				})
			}
		}
	}

	// end spans of removed states
	for _, state := range statesRemoved {
		if !mt.allowedState(tx.MachApi.Id(), state) {
			continue
		}
		if ctx, ok := data.stateInstances[state]; ok {
			trace.SpanFromContext(ctx).End()
			delete(data.stateInstances, state)
		}
	}

	// start (or annotate) spans of added states
	for _, state := range statesAdded {
		if data.Ended {
			mt.Logf("[otel] TransitionFinals: machine %s already ended",
				tx.Machine.Id())
			break
		}
		if !mt.allowedState(tx.MachApi.Id(), state) {
			continue
		}

		labelState := strconv.Itoa(data.Index) + ":" + state
		if schema[state].Multi {
			labelState += " [M]"
		}

		// per-name group span, created on first use
		nameCtx, ok := data.stateNames[state]
		if !ok {
			ctx, span := mt.Tracer.Start(data.stateGroup, labelState)
			nameCtx = ctx
			data.stateNames[state] = nameCtx
			span.End()
		}

		// instance span: reuse the open one or start a new one
		instanceCtx, active := data.stateInstances[state]
		var instanceSpan trace.Span
		if active {
			instanceSpan = trace.SpanFromContext(instanceCtx)
		} else {
			instanceCtx, instanceSpan = mt.Tracer.Start(nameCtx, labelState,
				trace.WithAttributes(
					attribute.String("tx_id", tx.Id),
				))
			data.stateInstances[state] = instanceCtx
		}
		instanceSpan.AddEvent(tx.Mutation.StringFromIndex(index))

		if txSpan != nil {
			txSpan.AddLink(trace.Link{
				SpanContext: instanceSpan.SpanContext(),
				Attributes:  nil,
			})
		}
	}
}
// TransitionEnd ends the machine's current transition span, if any, and
// clears it.
//
// Nothing happens when the tracer or machine has ended, for health
// transitions (unless IncludeHealth), or when transitions are skipped.
func (mt *OtelMachTracer) TransitionEnd(tx *am.Transition) {
	if mt.ended {
		mt.Logf("[otel] TransitionEnd: tracer already ended, ignoring %s",
			tx.Machine.Id())
		return
	}

	called := tx.CalledStates()
	health := slices.Contains(called, am.StateHealthcheck) ||
		slices.Contains(called, am.StateHeartbeat)
	if health && !mt.opts.IncludeHealth {
		return
	}

	data := mt.getMachineData(tx.Machine.Id(), false)
	if data.Ended {
		mt.Logf("[otel] TransitionEnd: machine %s already ended", tx.Machine.Id())
		return
	}

	data.mx.Lock()
	defer data.mx.Unlock()

	if mt.opts.SkipTransitions || data.txTrace == nil {
		return
	}
	trace.SpanFromContext(data.txTrace).End()
	data.txTrace = nil
}
// HandlerEnd records a finished handler as an event (named after the handler,
// with an "emitter" attribute) on the machine's current transition span.
//
// Nothing happens when the tracer or machine has ended, or when transitions
// are skipped.
func (mt *OtelMachTracer) HandlerEnd(
	tx *am.Transition, emitter string, handler string,
) {
	if mt.ended {
		return
	}
	if mt.opts.SkipTransitions {
		return
	}

	data := mt.getMachineData(tx.Machine.Id(), false)

	// txTrace and Ended are written under data.mx by the transition callbacks
	// and doDispose, so read them under the same lock.
	data.mx.Lock()
	defer data.mx.Unlock()
	if data.Ended {
		return
	}

	trace.SpanFromContext(data.txTrace).AddEvent(handler, trace.WithAttributes(
		attribute.String("emitter", emitter),
	))
}
// QueueEnd does nothing; the tracer does not record queue events.
func (mt *OtelMachTracer) QueueEnd(mach am.Api) {
	// Intentionally empty.
}
// allowedState reports whether state of machine machID should be traced.
//
// Skip rules are checked first: a state listed in SkipStates or matching
// SkipStatesRe is rejected. With no allow rules configured, every remaining
// state is allowed; otherwise the state has to be listed in AllowStates or
// match AllowStatesRe. Regular expressions are matched against
// "machID:state".
//
// A configured SkipStatesRe used to return right away, which made the allow
// rules unreachable; now it only rejects on a match.
func (mt *OtelMachTracer) allowedState(machID string, state string) bool {
	opts := mt.opts
	key := machID + ":" + state

	// skip rules
	if otelStateListed(opts.SkipStates, machID, state) {
		return false
	}
	if opts.SkipStatesRe != nil && opts.SkipStatesRe.MatchString(key) {
		return false
	}

	// no allow rules means everything is allowed
	if len(opts.AllowStates) == 0 && opts.AllowStatesRe == nil {
		return true
	}

	// allow rules
	if otelStateListed(opts.AllowStates, machID, state) {
		return true
	}

	return opts.AllowStatesRe != nil && opts.AllowStatesRe.MatchString(key)
}

// otelStateListed reports whether list contains state, either as a bare
// "state" entry or as a "machID:state" entry. Entries with more than one
// colon never match.
func otelStateListed(list []string, machID, state string) bool {
	for _, entry := range list {
		head, tail, qualified := strings.Cut(entry, ":")
		switch {
		case !qualified:
			if head == state {
				return true
			}
		case !strings.Contains(tail, ":"):
			if head == machID && tail == state {
				return true
			}
		}
	}

	return false
}
// dispose marks the tracer as ended and disposes all known machines, most
// recently registered first.
func (mt *OtelMachTracer) dispose() {
	mt.MachinesMx.Lock()
	defer mt.MachinesMx.Unlock()

	mt.Logf("[otel] End")
	mt.ended = true

	slices.Reverse(mt.MachinesOrder)
	// Walk the slice as it is now; doDispose reassigns mt.MachinesOrder.
	order := mt.MachinesOrder
	for i := 0; i < len(order); i++ {
		mt.doDispose(order[i])
	}
}
// NewOtelLoggerProvider returns a logger provider which sends records to
// exporter through a batch processor.
func NewOtelLoggerProvider(exporter ologsdk.Exporter) *ologsdk.LoggerProvider {
	batcher := ologsdk.NewBatchProcessor(exporter)

	return ologsdk.NewLoggerProvider(ologsdk.WithProcessor(batcher))
}
// BindOtelLogger replaces the logger of mach with one which emits
// OpenTelemetry log records through provider.
//
// Each record carries the formatted message as its body, an
// "asyncmachine.id" attribute, and a "service.name" attribute when service is
// non-empty. Messages starting with "[error" get error severity; all others
// get a severity and severity text derived from the machine log level.
// The call also disables IDs via mach.SemLogger().EnableId(false).
func BindOtelLogger(
	mach am.Api, provider *ologsdk.LoggerProvider, service string,
) {
	l := provider.Logger(mach.Id())
	mach.SemLogger().EnableId(false)

	amlog := func(level am.LogLevel, msg string, args ...any) {
		r := olog.Record{}
		r.SetTimestamp(time.Now())

		if strings.HasPrefix(msg, "[error") {
			r.SetSeverity(olog.SeverityError)
		} else {
			switch level {
			case am.LogExternal:
				r.SetSeverity(olog.SeverityInfo4)
				// was am.LogChanges (copy-paste), which mislabelled external logs
				r.SetSeverityText(am.LogExternal.String())
			case am.LogChanges:
				r.SetSeverity(olog.SeverityInfo3)
				r.SetSeverityText(am.LogChanges.String())
			case am.LogOps:
				r.SetSeverity(olog.SeverityInfo1)
				r.SetSeverityText(am.LogOps.String())
			case am.LogDecisions:
				r.SetSeverity(olog.SeverityTrace4)
				r.SetSeverityText(am.LogDecisions.String())
			case am.LogEverything:
				r.SetSeverity(olog.SeverityTrace1)
				r.SetSeverityText(am.LogEverything.String())
			default:
				// other levels carry no severity
			}
		}

		r.SetBody(olog.StringValue(fmt.Sprintf(msg, args...)))
		if service != "" {
			r.AddAttributes(
				olog.String("service.name", service),
			)
		}
		r.AddAttributes(
			olog.String("asyncmachine.id", mach.Id()),
		)

		l.Emit(mach.Context(), r)
	}

	mach.SemLogger().SetLogger(amlog)
}
// OtelTraceIdFromEnv parses the hex trace ID and span ID stored in the
// EnvOtelTraceId and EnvOtelSpanId environment variables. It returns the
// first parsing error, with both IDs nil.
func OtelTraceIdFromEnv() (*trace.TraceID, *trace.SpanID, error) {
	rawTrace := os.Getenv(EnvOtelTraceId)
	traceID, err := trace.TraceIDFromHex(rawTrace)
	if err != nil {
		return nil, nil, err
	}

	rawSpan := os.Getenv(EnvOtelSpanId)
	spanID, err := trace.SpanIDFromHex(rawSpan)
	if err != nil {
		return nil, nil, err
	}

	return &traceID, &spanID, nil
}
// Package-level OpenTelemetry tracer and provider. MachBindOtelEnv creates
// them with NewOtelProvider on first use and reuses them for later machines.
// NOTE(review): no lock guards these in the visible code.
var (
tracer trace .Tracer
provider *sdktrace .TracerProvider
)
// MachBindOtelEnv binds an OtelMachTracer to mach, configured from
// environment variables.
//
// It does nothing (and returns nil) for machines with a parent, and when
// either EnvOtelTrace or the service env var is empty. The shared tracer and
// provider are created on first use. When no valid trace/span ID pair is
// present in the environment, a new "root" span is started and its IDs are
// exported via EnvOtelTraceId and EnvOtelSpanId. A disposal handler ends the
// tracer and flushes the provider.
//
// Returns an error when the provider cannot be created, when an allow/skip
// regexp does not compile, or when binding the tracer fails.
func MachBindOtelEnv(mach am.Api) error {
	// only root machines get bound; submachines come via NewSubmachine
	if mach.ParentId() != "" {
		return nil
	}
	service := os.Getenv(EnvService)
	if os.Getenv(EnvOtelTrace) == "" || service == "" {
		return nil
	}
	ctx := mach.Context()

	// shared provider, created once
	if tracer == nil || provider == nil {
		var err error
		tracer, provider, err = NewOtelProvider(service, ctx)
		if err != nil {
			return err
		}
	}

	// start a root span unless one was passed via env
	var rootSpan trace.Span
	if _, _, err := OtelTraceIdFromEnv(); err != nil {
		_, rootSpan = tracer.Start(mach.Context(), "root")
		traceId := rootSpan.SpanContext().TraceID().String()
		spanId := rootSpan.SpanContext().SpanID().String()
		// Setenv only fails for malformed keys/values; both are constants and
		// hex strings here, so the errors are deliberately ignored.
		_ = os.Setenv(EnvOtelTraceId, traceId)
		_ = os.Setenv(EnvOtelSpanId, spanId)
	}

	// options from env
	opts := &OtelMachTracerOpts{
		SkipTransitions: os.Getenv(EnvOtelTraceTxs) == "",
		SkipLogArgs:     os.Getenv(EnvOtelTraceArgs) == "",
		SkipAuto:        os.Getenv(EnvOtelTraceNoauto) != "",
	}
	if list := os.Getenv(EnvOtelTraceAllowStates); list != "" {
		opts.AllowStates = strings.Split(list, ",")
	}
	if list := os.Getenv(EnvOtelTraceSkipStates); list != "" {
		opts.SkipStates = strings.Split(list, ",")
	}
	if expr := os.Getenv(EnvOtelTraceAllowStatesRe); expr != "" {
		re, err := regexp.Compile(expr)
		if err != nil {
			return fmt.Errorf("compiling %s: %w", EnvOtelTraceAllowStatesRe, err)
		}
		opts.AllowStatesRe = re
	}
	if expr := os.Getenv(EnvOtelTraceSkipStatesRe); expr != "" {
		re, err := regexp.Compile(expr)
		if err != nil {
			return fmt.Errorf("compiling %s: %w", EnvOtelTraceSkipStatesRe, err)
		}
		opts.SkipStatesRe = re
	}

	mt := NewOtelMachTracer(mach, rootSpan, tracer, opts)

	// end the tracer and flush on disposal, with short pauses around the flush
	var dispose am.HandlerDispose = func(id string, ctx context.Context) {
		tracerCooldown := 100 * time.Millisecond
		mt.dispose()
		time.Sleep(tracerCooldown)
		// the error stays local to the handler
		if err := provider.ForceFlush(ctx); err != nil {
			log.Printf("Error flushing tracer: %v", err)
		}
		time.Sleep(tracerCooldown)
	}

	// prefer the RegisterDisposal state when the machine implements it
	register := ssam.DisposedStates.RegisterDisposal
	if mach.Has1(register) {
		mach.Add1(register, am.A{
			ssam.DisposedArgHandler: dispose,
		})
	} else {
		mach.OnDispose(dispose)
	}

	if err := mach.BindTracer(mt); err != nil {
		return err
	}
	mt.MachineInit(mach)

	return nil
}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.