// Package telemetry provides telemetry exporters: am-dbg, Prometheus, and
// OpenTelemetry.
package telemetry

// TODO OTEL support nesting states in groups via AM_OTEL_GROUPS=1

// NOTE(review): only the import aliases are visible in this copy of the file;
// the import paths, and the packages used without an alias below (context,
// regexp, strconv, sync, trace, attribute, ...), are not shown here — confirm
// against the original before building.
import (
	olog
	ologsdk
	sdktrace
	am
	ssam
)

const (
	// TODO config
	// maxHist is the max length of each machine's recent transitions history
	// (OtelMachineData.txHist); the oldest entries get dropped.
	maxHist = 300

	// export Otel traces for states and submachines, requires AM_SERVICE
	EnvOtelTrace = "AM_OTEL_TRACE"
	// create additional Otel traces for transitions
	EnvOtelTraceTxs = "AM_OTEL_TRACE_TXS"
	// include logged arguments as traces tags
	EnvOtelTraceArgs = "AM_OTEL_TRACE_ARGS"
	// skip traces for auto transitions
	EnvOtelTraceNoauto = "AM_OTEL_TRACE_NOAUTO"
	// comma-separated list of states to trace, each either "StateName" or
	// "machId:StateName"
	EnvOtelTraceAllowStates = "AM_OTEL_TRACE_ALLOW_STATES"
	// regexp of states to trace, matched against "machId:StateName"
	EnvOtelTraceAllowStatesRe = "AM_OTEL_TRACE_ALLOW_STATES_RE"
	// comma-separated list of states to skip, each either "StateName" or
	// "machId:StateName"; takes precedence over the allow list
	EnvOtelTraceSkipStates = "AM_OTEL_TRACE_SKIP_STATES"
	// regexp of states to skip, matched against "machId:StateName"; it's also
	// matched against machine IDs when new submachines get bound
	EnvOtelTraceSkipStatesRe = "AM_OTEL_TRACE_SKIP_STATES_RE"
	// name of the standard OTLP traces endpoint env var
	EnvOtelExporterOtlpTracesEndpoint = "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"
	// hex-encoded trace ID of a remote parent span to nest machine traces under
	EnvOtelTraceId = "AM_OTEL_TRACE_ID"
	// hex-encoded span ID of a remote parent span to nest machine traces under
	EnvOtelSpanId = "AM_OTEL_SPAN_ID"
)

// OtelMachineData holds the Otel tracing state of a single machine: its
// trace, group spans, currently open state and transition spans, and a short
// history of recent transitions.
type OtelMachineData struct {
	// ID is the machine ID.
	ID string
	// Index is a unique number for this machine within the Otel tracer.
	Index int
	// Ended is set once the machine has been disposed, after which no new
	// spans get created for it.
	Ended bool

	// mx is locked while transitions, submachines and disposal update the
	// spans of this machine.
	mx sync.Mutex
	// machTrace is the context of the machine's own span, used as the parent
	// of its group spans.
	machTrace context.Context
	// txTrace is the context of the currently open transition span, if any.
	txTrace context.Context
	// per-state group traces
	stateNames map[string]context.Context
	// per-state traces
	stateInstances map[string]context.Context
	// group trace for all state name groups
	stateGroup context.Context
	// group trace for all the transitions
	txGroup context.Context
	// history of recent transitions
	txHist []OtelTxHist
}

// OtelTxHist is an entry in a machine's history of recent transitions, used to
// link a transition to the transition which caused it.
type OtelTxHist struct {
	// Id is the transition ID.
	Id string
	// Ctx is the context of the transition's span.
	Ctx context.Context
}

// OtelMachTracerOpts configures OtelMachTracer.
type OtelMachTracerOpts struct {
	// if true, only state changes will be traced
	SkipTransitions bool
	// if true, Healthcheck and Heartbeat transitions will be traced too (by
	// default they are skipped)
	IncludeHealth bool
	// if true, transition traces won't include [am.Machine.GetLogArgs]
	SkipLogArgs bool
	// if true, auto transitions won't be traced
	SkipAuto bool
	// eg []string{"machId:StateName", "StateName2"}
	AllowStates []string
	// against "machId:StateName"
	AllowStatesRe *regexp.Regexp
	// SkipStates take precedence over [AllowStates]
	// eg []string{"machId:StateName", "StateName2"}
	SkipStates []string
	// against "machId:StateName"
	SkipStatesRe *regexp.Regexp
	// TODO skipping empty and canceled txs requires a custom Processor to
	//  discard an open span
	// SkipCanceled bool
	// SkipEmpty bool
	// TODO option to disable auto env-root-span

	// Logf overrides the tracer's debug logger. When nil, the machine's logger
	// is used for log levels of am.LogDecisions and above, otherwise logging
	// is disabled.
	Logf func(format string, args ...any)
}

// OtelMachTracer implements machine.Tracer for OpenTelemetry. Supports tracing
// of multiple state machines, resulting in a single trace. This tracer is
// automatically bound to new sub-machines.
type OtelMachTracer struct {
	*am.TracerNoOp
	// TODO rewrite with better locking

	// Tracer is the Otel tracer used to start all the spans.
	Tracer trace.Tracer
	// Machines maps machine IDs to their tracing data.
	Machines map[string]*OtelMachineData
	// MachinesMx guards Machines and MachinesOrder.
	MachinesMx sync.Mutex
	// MachinesOrder lists traced machine IDs in the order of registration.
	MachinesOrder []string
	// RootSpan is an optional parent for top-level machines, used when
	// there's no parent machine and no trace passed via env vars. It gets
	// ended right away by NewOtelMachTracer.
	RootSpan trace.Span
	// NextIndex is the index which will be assigned to the next machine.
	NextIndex int
	// TODO bind to env var
	Logf func(format string, args ...any)
	// map of a parent machine ID to its "submachines" group span, under which
	// the traces of its submachines get nested
	parentSpans map[string]trace.Span
	// child-parent map, used for parentSpans
	parents       map[string]string
	parentsMx     sync.Mutex
	parentSpansMx sync.Mutex
	opts          *OtelMachTracerOpts
	// ended is set once the whole tracer has been ended.
	ended bool
}

var _ am.Tracer = (*OtelMachTracer)(nil)

// NewOtelMachTracer creates a new machine tracer from an OpenTelemetry tracer.
// Requires OtelMachTracer.Dispose to be called at the end.
//
// Params are: the machine, an optional root span (ended right away, as only
// its context is needed for nesting), the Otel tracer, and optional options
// (defaults used when nil).
//
// NOTE(review): no Dispose method is visible in this file; the tracer-wide
// shutdown method logs "[otel] End" — confirm the name referenced above.
func (
	am.Api,
	trace.Span,
	trace.Tracer,
	*OtelMachTracerOpts,
) *OtelMachTracer {
	// if == nil { panic("nil tracer") }
	if == nil {
		= &OtelMachTracerOpts{}
	}
	:= &OtelMachTracer{
		Tracer:      ,
		Machines:    make(map[string]*OtelMachineData),
		RootSpan:    ,
		opts:        ,
		parentSpans: make(map[string]trace.Span),
		parents:     make(map[string]string),
	}
	if .RootSpan != nil {
		.RootSpan.End()
	}

	// logger: explicit option, then the machine's logger (if the env log
	// level is high enough), otherwise a no-op
	if .Logf != nil {
		.Logf = .Logf
	} else if am.EnvLogLevel("") >= am.LogDecisions {
		.Logf = .Log
	} else {
		.Logf = func( string, ...any) {}
	}

	return
}

// getMachineData returns the tracing data for a machine ID, creating it when
// missing. The bool param tells whether the caller already holds MachinesMx.
// After the tracer has ended, newly created data is returned but not
// registered.
func ( *OtelMachTracer) (
	string,
	bool,
) *OtelMachineData {
	if ! {
		.MachinesMx.Lock()
		defer .MachinesMx.Unlock()
	}

	if , := .Machines[]; {
		return
	}
	:= &OtelMachineData{
		stateInstances: make(map[string]context.Context),
		stateNames:     make(map[string]context.Context),
	}
	if !.ended {
		.Logf("[otel] getMachineData: creating for %s", )
		.Machines[] =
		.MachinesOrder = append(.MachinesOrder, )
	}

	return
}

// MachineInit creates the trace of a machine, and its "states" and (unless
// SkipTransitions) "transitions" group spans. Each machine gets a unique
// index, used as a prefix of its span names.
//
// The machine trace is nested under, in order of precedence: the
// "submachines" group span of its parent, a remote span passed via the
// AM_OTEL_TRACE_ID and AM_OTEL_SPAN_ID env vars, or RootSpan.
//
// NOTE(review): the returned value isn't visible in this copy of the file;
// presumably it's the context of the machine trace — confirm.
func ( *OtelMachTracer) ( am.Api) context.Context {
	:= .Id()
	.parentsMx.Lock()
	:= .NextIndex
	.NextIndex++
	:= "mach:" + strconv.Itoa() + ":" +
	.Log("[otel] bind traces")
	.Logf("[otel] MachineInit: trace %s", )

	// nest under parent
	// NOTE(review): parentSpans is read here with parentsMx held, but it's
	// written elsewhere under parentSpansMx — possible data race.
	:= .Context()
	, := .parents[]
	if {
		if , := .parentSpans[]; {
			= trace.ContextWithSpan(, )
		}
	} else if , , := OtelTraceIdFromEnv(); == nil {
		// continue a trace passed from another process via env vars
		:= trace.NewSpanContext(trace.SpanContextConfig{
			TraceID:    *,
			SpanID:     *,
			TraceFlags: trace.FlagsSampled,
			Remote:     true,
		})
		= trace.ContextWithRemoteSpanContext(.Context(), )
	} else if .RootSpan != nil {
		= trace.ContextWithSpan(, .RootSpan)
	} else {
		.Log("[otel] root span missing")
	}
	.parentsMx.Unlock()

	// create a machine trace
	, := .Tracer.Start(, , trace.WithAttributes(
		attribute.String("id", ),
	))

	// register the machine's data and store its trace
	:= .getMachineData(, false)
	.machTrace =
	.ID =
	.Index =
	// the span is ended right away, while its context (machTrace) keeps
	// being used as a parent of the group spans
	.End()

	// create a group span for states
	, := .Tracer.Start(, strconv.Itoa()+":states",
		trace.WithAttributes(attribute.String("mach_id", )))
	.stateGroup =
	// groups are only for nesting, so end it right away
	.End()

	if !.opts.SkipTransitions {
		// create a group span for transitions
		, := .Tracer.Start(, strconv.Itoa()+":transitions",
			trace.WithAttributes(attribute.String("mach_id", )))
		.txGroup =
		// groups are only for nesting, so end it right away
		.End()
	}

	return
}

// NewSubmachine links 2 machines with a parent-child relationship.
//
// Submachines tagged "rpc-*" or "relay" are skipped unless AM_RPC_DBG is set,
// and so are submachines whose ID matches SkipStatesRe. The parent's
// "submachines" group span is created on demand; MachineInit nests the
// submachine's trace under it.
func ( *OtelMachTracer) (, am.Api) {
	// TODO race on mt.parents
	if .ended {
		.Logf("[otel] NewSubmachine: tracer already ended, ignoring %s", .Id())
		return
	}

	// skip RPC machines
	:= os.Getenv("AM_RPC_DBG") != ""
	for , := range .Tags() {
		if (strings.HasPrefix(, "rpc-") || == "relay") && ! {
			return
		}
	}

	// check skipped states regexp
	if .opts.SkipStatesRe != nil && .opts.SkipStatesRe.MatchString(.Id()) {
		return
	}

	:= .BindTracer()
	if != nil {
		.Logf("[otel] NewSubmachine: err binding tracer for %s", .Id())
		return
	}

	// register the child-parent relation, once per submachine ID
	.parentsMx.Lock()
	, := .parents[.Id()]
	if {
		.parentsMx.Unlock()
		.Logf("Submachine already being traced (duplicate ID %s)", .Id())
		return
	}
	.parents[.Id()] = .Id()
	.parentsMx.Unlock()

	:= .getMachineData(.Id(), false)
	.mx.Lock()
	defer .mx.Unlock()
	if .Ended {
		.Logf("[otel] NewSubmachine: parent %s already ended", .Id())
		return
	}

	.parentSpansMx.Lock()
	if , := .parentSpans[.Id()]; ! {
		// create a group span for submachines
		, := .Tracer.Start(
			.machTrace, strconv.Itoa(.Index)+":submachines",
			trace.WithAttributes(attribute.String("mach_id", .Id())))
		.parentSpans[.Id()] =
		// groups are only for nesting, so end it right away
		.End()
	}
	.parentSpansMx.Unlock()
}

// Disposes the traces of the machine with the given ID, under MachinesMx.
func ( *OtelMachTracer) ( string) {
	.MachinesMx.Lock()
	defer .MachinesMx.Unlock()

	.doDispose()
}

// doDispose ends all the open spans of a machine (transition, state
// instances, state name groups, group spans and the machine trace), and
// removes the machine from the tracer. Expects MachinesMx to be held by the
// caller.
func ( *OtelMachTracer) ( string) {
	, := .Machines[]
	if ! {
		.Logf("[otel] MachineDispose: machine %s not found", )
		return
	}
	.Logf("[otel] MachineDispose: disposing %s", )

	.mx.Lock()
	defer .mx.Unlock()
	.parentSpansMx.Lock()
	defer .parentSpansMx.Unlock()

	delete(.parentSpans, )
	delete(.Machines, )
	.MachinesOrder = utils.SlicesWithout(.MachinesOrder, )
	.Ended = true

	// transitions
	if .txTrace != nil {
		trace.SpanFromContext(.txTrace).End()
	}

	// states
	for , := range .stateInstances {
		trace.SpanFromContext().End()
	}
	for , := range .stateNames {
		trace.SpanFromContext().End()
	}

	// groups
	trace.SpanFromContext(.stateGroup).End()
	trace.SpanFromContext(.txGroup).End()
	trace.SpanFromContext(.machTrace).End()
}

// TransitionInit starts a span for a transition under the machine's
// "transitions" group. Nothing is traced when transitions are skipped,
// for health transitions (unless IncludeHealth), or for auto transitions
// (with SkipAuto). The span is named after the mutation ("!"-prefixed when
// targeting Exception), gets linked to the span of the source transition
// (when still in the source machine's history), and is recorded in this
// machine's history.
func ( *OtelMachTracer) ( *am.Transition) {
	// TODO deadlock
	.MachinesMx.Lock()
	defer .MachinesMx.Unlock()
	if .ended {
		.Logf("[otel] TransitionInit: tracer already ended, ignoring %s", .Machine.Id())
		return
	}

	// skip health txs
	:= .CalledStates()
	:= slices.Contains(, am.StateHealthcheck) ||
		slices.Contains(, am.StateHeartbeat)
	if !.opts.IncludeHealth && {
		return
	}

	// MachinesMx is already held, hence true
	:= .getMachineData(.Machine.Id(), true)
	if .Ended {
		.Logf("[otel] TransitionInit: machine %s already ended", .Machine.Id())
		return
	}

	// if skipping transitions, only create machine data for the states trace
	if .opts.SkipTransitions {
		return
	}
	if .opts.SkipAuto && .IsAuto() {
		return
	}

	// source event
	:= .Mutation.Source
	var trace.Span
	if != nil && .TxId != "" && .MachId != "" {
		// TODO get span from that tx (GC via LRU) and link to it
		// NOTE(review): the source machine's txHist is read without its mx.
		if , := .Machines[.MachId]; {
			// TODO optimize with an index
			for , := range .txHist {
				if .Id == .TxId {
					= trace.SpanFromContext(.Ctx)
					break
				}
			}
		}
	}

	// label
	:= fmt.Sprintf("%d: %s", .Index,
		.Mutation.StringFromIndex(.Machine.StateNames()))
	:=
	// exception support
	var error
	if slices.Contains(.TargetStates(), am.StateException) {
		= "!" +
		= am.ParseArgs(.Args()).Err
	}

	// build a transition trace
	, := .Tracer.Start(.txGroup, , trace.WithAttributes(
		attribute.String("tx_id", .Id),
		attribute.Int64("time_before", int64(.TimeBefore.Sum(nil))),
		attribute.String("mutation", ),
	))

	// decorate Exception trace
	if != nil {
		.SetAttributes(
			attribute.String("error", .Error()),
		)
	}

	// trace logged args, if any and enabled
	:= .Machine.SemLogger().ArgsMapper()
	if !.opts.SkipLogArgs && != nil {
		for , := range (.Args()) {
			.SetAttributes(
				attribute.String("args."+, ),
			)
		}
	}

	// link to the source transition
	if != nil {
		.AddLink(trace.Link{
			SpanContext: .SpanContext(),
			Attributes:  nil,
		})
	}

	// expose
	.mx.Lock()
	defer .mx.Unlock()
	.txTrace =
	.txHist = append(.txHist, OtelTxHist{
		Id:  .Id,
		Ctx: ,
	})
	if len(.txHist) > maxHist {
		// TODO leak for canceled txs?
		.txHist = .txHist[1:]
	}
}

// Handles the state changes of an accepted transition:
//   - sets the outcome attributes on the transition's span,
//   - ends the spans of removed states,
//   - starts spans for added states, under per-state name groups (multi
//     states activated again only get an event on their existing span),
//   - links state spans with the transition's span.
//
// Health transitions are skipped unless IncludeHealth. Depending on the
// allow/skip lists, transitions whose changed states are all filtered out are
// skipped too.
func ( *OtelMachTracer) ( *am.Transition) {
	if .ended {
		.Logf("[otel] TransitionEnd: tracer already ended, ignoring %s", .Machine.Id())
		return
	}

	// handle state changes
	if !.IsAccepted.Load() {
		return
	}

	// skip health txs
	:= .Machine.Schema()
	:= .TargetStates()
	:= .CalledStates()
	:= slices.Contains(, am.StateHealthcheck) ||
		slices.Contains(, am.StateHeartbeat)
	if !.opts.IncludeHealth && {
		return
	}

	:= .getMachineData(.Machine.Id(), false)
	if .Ended {
		.Logf("[otel] TransitionEnd: machine %s already ended", .Machine.Id())
		return
	}
	.mx.Lock()
	defer .mx.Unlock()

	// parse states collected from resolving relations
	:= .Machine.StateNames()
	:= am.StatesDiff(, .StatesBefore())
	:= am.StatesDiff(.StatesBefore(), )

	// skip by default, but not multi states
	:= len() > 0 && len() > 0
	for , := range slices.Concat(, ) {
		if .allowedState(.MachApi.Id(), ) {
			= false
			break
		}
	}
	if {
		return
	}

	// support multi states: a tick jump bigger than 1 means the state got
	// activated again within this transition
	:= .ClockBefore()
	for , := range .ClockAfter() {
		if > 1+[] && !slices.Contains(, ) {
			= append(, )
		}
	}

	var (
		// handle transition
		trace.Span
		string
	)
	if !.opts.SkipTransitions && .txTrace != nil {
		if len() > 0 {
			+= "+" + utils.Jw(, " +")
		}
		if len() > 0 {
			+= " -" + utils.Jw(, " -")
		}
		= trace.SpanFromContext(.txTrace)
		.SetAttributes(
			attribute.String("states_diff", strings.Trim(, " ")),
			attribute.Int64("time_after", int64(.TimeAfter.Sum(nil))),
			attribute.Bool("accepted", .IsAccepted.Load()),
			attribute.Bool("auto", .IsAuto()),
			attribute.Int("steps_count", len(.Steps)),
		)

		// link to old states
		for , := range {
			if !.allowedState(.MachApi.Id(), ) {
				continue
			}
			if , := .stateInstances[]; {
				.AddLink(trace.Link{
					SpanContext: trace.SpanFromContext().SpanContext(),
					Attributes:  nil,
				})
			}
		}
	}

	// remove old states
	for , := range {
		if !.allowedState(.MachApi.Id(), ) {
			continue
		}
		if , := .stateInstances[]; {
			trace.SpanFromContext().End()
			delete(.stateInstances, )
		}
	}

	// add a new state trace with a group if needed
	for , := range {
		if .Ended {
			.Logf("[otel] TransitionEnd: machine %s already ended", .Machine.Id())
			break
		}
		if !.allowedState(.MachApi.Id(), ) {
			continue
		}

		// create a new state name group trace, but end it right away
		:= strconv.Itoa(.Index) + ":" +
		if [].Multi {
			+= " [M]"
		}

		// name group
		, := .stateNames[]
		if ! {
			, := .Tracer.Start(.stateGroup, )
			=
			.stateNames[] =
			.End()
		}

		// multi state - add as an event
		_, = .stateInstances[]
		var context.Context
		var trace.Span
		if {
			= trace.SpanFromContext(.stateInstances[])
			.AddEvent(.Mutation.StringFromIndex())
		} else {
			// initial state span
			, = .Tracer.Start(, , trace.WithAttributes(
				attribute.String("tx_id", .Id),
			))
			.stateInstances[] =
			.AddEvent(.Mutation.StringFromIndex())
		}

		// link with the transition's span
		if != nil {
			.AddLink(trace.Link{
				SpanContext: .SpanContext(),
				Attributes:  nil,
			})
		}
	}
}

// Ends the transition's span started in TransitionInit (unless transitions
// are skipped). Health transitions are ignored unless IncludeHealth.
//
// NOTE(review): the log messages below use the "TransitionEnd" prefix, same
// as the state-handling method above — confirm it matches this method.
func ( *OtelMachTracer) ( *am.Transition) {
	if .ended {
		.Logf("[otel] TransitionEnd: tracer already ended, ignoring %s", .Machine.Id())
		return
	}

	// skip health txs
	:= .CalledStates()
	:= slices.Contains(, am.StateHealthcheck) ||
		slices.Contains(, am.StateHeartbeat)
	if !.opts.IncludeHealth && {
		return
	}

	:= .getMachineData(.Machine.Id(), false)
	if .Ended {
		.Logf("[otel] TransitionEnd: machine %s already ended", .Machine.Id())
		return
	}
	.mx.Lock()
	defer .mx.Unlock()

	if !.opts.SkipTransitions && .txTrace != nil {
		trace.SpanFromContext(.txTrace).End()
		.txTrace = nil
	}
}

// Adds an event with an "emitter" attribute to the current transition's span
// (unless transitions are skipped). Presumably the event is named after the
// handler, with the emitter as the attribute — the string params aren't
// named in this copy of the file.
//
// NOTE(review): txTrace is read without locking the machine's mx.
func ( *OtelMachTracer) (
	*am.Transition,
	string,
	string,
) {
	if .ended {
		return
	}
	if .opts.SkipTransitions {
		return
	}
	:= .getMachineData(.Machine.Id(), false)
	if .Ended {
		return
	}

	// add an event to the tx trace
	trace.SpanFromContext(.txTrace).AddEvent(, trace.WithAttributes(
		attribute.String("emitter", ),
	))
}

// No-op.
func ( *OtelMachTracer) ( am.Api) {}

// allowedState reports if a state (2nd param) of a machine (1st param, its ID)
// should be traced. SkipStates and SkipStatesRe take precedence. Then, without
// an allow list, every state is allowed; otherwise the state has to match
// AllowStates or AllowStatesRe. List entries are either "stateName" or
// "machId:stateName", and regexps are matched against "machId:stateName".
func ( *OtelMachTracer) ( string, string) bool {
	// skip first, strings
	for , := range .opts.SkipStates {
		:= strings.Split(, ":")
		if len() == 2 && [0] == && [1] == {
			// "machId:stateName"
			return false
		} else if len() == 1 && [0] == {
			// "stateName"
			return false
		}
	}

	// regexp skip; a non-matching state still has to pass the allowlist below
	if .opts.SkipStatesRe != nil && .opts.SkipStatesRe.MatchString( + ":" + ) {
		return false
	}

	// allowlist?
	if len(.opts.AllowStates) == 0 && .opts.AllowStatesRe == nil {
		return true
	}

	// string allows
	for , := range .opts.AllowStates {
		:= strings.Split(, ":")
		if len() == 2 && [0] == && [1] == {
			// "machId:stateName"
			return true
		} else if len() == 1 && [0] == {
			// "stateName"
			return true
		}
	}

	// regexp allow
	if .opts.AllowStatesRe != nil {
		return .opts.AllowStatesRe.MatchString( + ":" + )
	}

	return false
}

// Ends the whole tracer: marks it as ended, so later calls become no-ops,
// and disposes all the traced machines in reverse order of registration.
func ( *OtelMachTracer) () {
	.MachinesMx.Lock()
	defer .MachinesMx.Unlock()
	.Logf("[otel] End")
	.ended = true

	// end traces in reverse order
	slices.Reverse(.MachinesOrder)
	// iterate over a copy, as doDispose replaces MachinesOrder on each call
	for , := range slices.Clone(.MachinesOrder) {
		.doDispose()
	}
}

// NewOtelLoggerProvider creates a new OpenTelemetry logger provider bound to
// the given exporter, via a batch processor.
func ( ologsdk.Exporter) *ologsdk.LoggerProvider {
	// TODO mem limiter?
	:= ologsdk.NewLoggerProvider(
		ologsdk.WithProcessor(ologsdk.NewBatchProcessor()),
	)

	return
}

// BindOtelLogger binds an OpenTelemetry logger to a machine.
//
// Each machine log entry is emitted as an Otel log record, with the machine
// ID ("asyncmachine.id") and, when not empty, the service name
// ("service.name") as attributes. Entries starting with "[error" get the
// Error severity; others get a severity mapped from their am.LogLevel.
func (
	am.Api,
	*ologsdk.LoggerProvider,
	string,
) {
	:= .Logger(.Id())
	.SemLogger().EnableId(false)

	:= func( am.LogLevel, string, ...any) {
		:= olog.Record{}
		.SetTimestamp(time.Now())

		if strings.HasPrefix(, "[error") {
			.SetSeverity(olog.SeverityError)
		} else {
			switch {
			case am.LogExternal:
				.SetSeverity(olog.SeverityInfo4)
				.SetSeverityText(am.LogExternal.String())
			case am.LogChanges:
				.SetSeverity(olog.SeverityInfo3)
				.SetSeverityText(am.LogChanges.String())
			case am.LogOps:
				.SetSeverity(olog.SeverityInfo1)
				.SetSeverityText(am.LogOps.String())
			case am.LogDecisions:
				.SetSeverity(olog.SeverityTrace4)
				.SetSeverityText(am.LogDecisions.String())
			case am.LogEverything:
				.SetSeverity(olog.SeverityTrace1)
				.SetSeverityText(am.LogEverything.String())
			default:
			}
		}

		.SetBody(olog.StringValue(fmt.Sprintf(, ...)))
		if != "" {
			.AddAttributes(
				olog.String("service.name", ),
			)
		}
		.AddAttributes(
			olog.String("asyncmachine.id", .Id()),
		)

		.Emit(.Context(), )
	}

	.SemLogger().SetLogger()
}

// OtelTraceIdFromEnv parses the hex-encoded trace and span IDs passed via the
// AM_OTEL_TRACE_ID and AM_OTEL_SPAN_ID env vars. Returns an error when either
// of them is missing or invalid.
func () (*trace.TraceID, *trace.SpanID, error) {
	, := trace.TraceIDFromHex(os.Getenv(EnvOtelTraceId))
	if != nil {
		return nil, nil,
	}
	, := trace.SpanIDFromHex(os.Getenv(EnvOtelSpanId))
	if != nil {
		return nil, nil,
	}

	return &, &, nil
}

// globals, shared by all the machines bound via MachBindOtelEnv
var (
	tracer   trace.Tracer
	provider *sdktrace.TracerProvider
)

// MachBindOtelEnv binds an OpenTelemetry tracer to [mach], based on
// environment variables:
//   - AM_SERVICE (required)
//   - AM_OTEL_TRACE (required)
//   - AM_OTEL_TRACE_TXS
//   - AM_OTEL_TRACE_ARGS
//   - AM_OTEL_TRACE_NOAUTO
//   - AM_OTEL_TRACE_ALLOW_STATES, AM_OTEL_TRACE_ALLOW_STATES_RE
//   - AM_OTEL_TRACE_SKIP_STATES, AM_OTEL_TRACE_SKIP_STATES_RE
//   - AM_OTEL_TRACE_ID, AM_OTEL_SPAN_ID
//   - OTEL_EXPORTER_OTLP_ENDPOINT
//   - OTEL_EXPORTER_OTLP_TRACES_ENDPOINT
//
// This tracer is inherited by submachines, and this function applies only to
// top-level machines. When AM_OTEL_TRACE_ID and AM_OTEL_SPAN_ID aren't set, a
// "root" span gets created and its IDs are exported into them. Traces are
// flushed when the machine gets disposed.
func ( am.Api) error {
	// TODO return string binding ID
	if .ParentId() != "" {
		return nil
	}
	:= os.Getenv(EnvService)
	if os.Getenv(EnvOtelTrace) == "" || == "" {
		return nil
	}

	// init the tracer and provider
	:= .Context()
	var error
	if tracer == nil || provider == nil {
		tracer, provider, = NewOtelProvider(, )
		if != nil {
			return
		}
	}

	var trace.Span
	// handle env-passed root span
	if , , := OtelTraceIdFromEnv(); != nil {
		_, = tracer.Start(.Context(), "root")
		:= .SpanContext().TraceID().String()
		:= .SpanContext().SpanID().String()
		// best-effort, the keys are valid constants
		_ = os.Setenv(EnvOtelTraceId, )
		_ = os.Setenv(EnvOtelSpanId, )
	}

	// dedicated machine tracer
	:= &OtelMachTracerOpts{
		SkipTransitions: os.Getenv(EnvOtelTraceTxs) == "",
		SkipLogArgs:     os.Getenv(EnvOtelTraceArgs) == "",
		SkipAuto:        os.Getenv(EnvOtelTraceNoauto) != "",
	}
	if := os.Getenv(EnvOtelTraceAllowStates); != "" {
		.AllowStates = strings.Split(, ",")
	}
	if := os.Getenv(EnvOtelTraceSkipStates); != "" {
		.SkipStates = strings.Split(, ",")
	}
	if := os.Getenv(EnvOtelTraceAllowStatesRe); != "" {
		if .AllowStatesRe, = regexp.Compile(); != nil {
			return
		}
	}
	if := os.Getenv(EnvOtelTraceSkipStatesRe); != "" {
		if .SkipStatesRe, = regexp.Compile(); != nil {
			return
		}
	}
	:= NewOtelMachTracer(, , tracer, )

	// flush and close
	var am.HandlerDispose = func( string, context.Context) {
		:= 100 * time.Millisecond
		.dispose()
		time.Sleep()
		// flush tracing
		= provider.ForceFlush()
		if != nil {
			log.Printf("Error flushing tracer: %v", )
		}
		time.Sleep()
		// // finish tracing
		// if err := p.Shutdown(ctx); err != nil {
		// 	log.Printf("Error shutting down tracer: %v", err)
		// }
	}

	// register the dispose handler via the Disposed states when the machine
	// has them, or via OnDispose otherwise
	:= ssam.DisposedStates.RegisterDisposal
	if .Has1() {
		.Add1(, am.A{
			ssam.DisposedArgHandler: ,
		})
	} else {
		.OnDispose()
	}

	// bind the Otel tracer
	= .BindTracer()
	if != nil {
		return
	}

	// run the root init manually
	.MachineInit()

	return nil
}