// Package telemetry provides telemetry exporters: am-dbg, Prometheus, and
// OpenTelemetry.
package telemetry // TODO OTEL support groups and allow lists. When groups are on, they replace // the flat state list. Transitions are filtered to a group / allowlist. // - AM_OTEL_GROUPS=1 // - AM_OTEL_GROUPS_ALLOWLIST=1 // - AM_OTEL_STATES_ALLOWLIST=1 import ( olog ologsdk sdktrace semconv am ssam ) // TODO config const maxHist = 300 type OtelMachineData struct { ID string // Index is a unique number for this machine withing the Otel tracer. Index int Ended bool mx sync.Mutex machTrace context.Context txTrace context.Context // per-state group traces stateNames map[string]context.Context // per-state traces stateInstances map[string]context.Context // group trace for all state name groups stateGroup context.Context // group trace for all the transitions txGroup context.Context // history of recent transitions txHist []OtelTxHist } type OtelTxHist struct { Id string Ctx context.Context } type OtelMachTracerOpts struct { // if true, only state changes will be traced SkipTransitions bool // if true, only Healthcheck and Heartbeat will be skipped IncludeHealth bool // if true, transition traces won't include [am.Machine.GetLogArgs] SkipLogArgs bool // if true, auto transitions won't be traced SkipAuto bool // TODO AllowStates am.S // TODO SkipStates am.S // TODO skipping empty and canceled txs requires a custom Processor to // discard an open span // SkipCanceled bool // SkipEmpty bool Logf func(format string, args ...any) } // OtelMachTracer implements machine.Tracer for OpenTelemetry. Supports tracing // of multiple state machines, resulting in a single trace. This tracer is // automatically bound to new sub-machines. 
type OtelMachTracer struct { *am.TracerNoOp // TODO rewrite with better locking Tracer trace.Tracer Machines map[string]*OtelMachineData MachinesMx sync.Mutex MachinesOrder []string RootSpan trace.Span // TODO bind to env var Logf func(format string, args ...any) // map of parent Span for each submachine parentSpans map[string]trace.Span // child-parent map, used for parentSpans parents map[string]string parentsMx sync.Mutex parentSpansMx sync.Mutex opts *OtelMachTracerOpts ended bool NextIndex int } var _ am.Tracer = (*OtelMachTracer)(nil) // NewOtelMachTracer creates a new machine tracer from an OpenTelemetry tracer. // Requires OtelMachTracer.Dispose to be called at the end. func ( am.Api, trace.Span, trace.Tracer, *OtelMachTracerOpts, ) *OtelMachTracer { if == nil { panic("nil tracer") } if == nil { = &OtelMachTracerOpts{} } := &OtelMachTracer{ Tracer: , Machines: make(map[string]*OtelMachineData), RootSpan: , opts: , parentSpans: make(map[string]trace.Span), parents: make(map[string]string), } .RootSpan.End() if .Logf != nil { .Logf = .Logf } else if am.EnvLogLevel("") >= am.LogDecisions { .Logf = .Log } else { .Logf = func( string, ...any) {} } return } func ( *OtelMachTracer) ( string, bool, ) *OtelMachineData { if ! 
{ .MachinesMx.Lock() defer .MachinesMx.Unlock() } if , := .Machines[]; { return } := &OtelMachineData{ stateInstances: make(map[string]context.Context), stateNames: make(map[string]context.Context), } if !.ended { .Logf("[otel] getMachineData: creating for %s", ) .Machines[] = .MachinesOrder = append(.MachinesOrder, ) } return } func ( *OtelMachTracer) ( am.Api) context.Context { := .Id() .parentsMx.Lock() := .NextIndex .NextIndex++ := "mach:" + strconv.Itoa() + ":" + .Log("[bind] otel traces") .Logf("[otel] MachineInit: trace %s", ) // nest under parent := .Context() , := .parents[] if { if , := .parentSpans[]; { = trace.ContextWithSpan(, ) } } else { = trace.ContextWithSpan(, .RootSpan) } .parentsMx.Unlock() // create a machine trace , := .Tracer.Start(, , trace.WithAttributes( attribute.String("id", ), )) // create a machine trace := .getMachineData(, false) .machTrace = .ID = .Index = .End() // create a group span for states , := .Tracer.Start(, strconv.Itoa()+":states", trace.WithAttributes(attribute.String("mach_id", ))) .stateGroup = // groups are only for nesting, so end it right away .End() if !.opts.SkipTransitions { // create a group span for transitions , := .Tracer.Start(, strconv.Itoa()+":transitions", trace.WithAttributes(attribute.String("mach_id", ))) .txGroup = // groups are only for nesting, so end it right away .End() } return } // NewSubmachine links 2 machines with a parent-child relationship. func ( *OtelMachTracer) (, am.Api) { // TODO race on mt.parents if .ended { .Logf("[otel] NewSubmachine: tracer already ended, ignoring %s", .Id()) return } // skip RPC machines := os.Getenv("AM_RPC_DBG") != "" for , := range .Tags() { if strings.HasPrefix(, "rpc-") && ! 
{ return } } := .BindTracer() if != nil { .Logf("[otel] NewSubmachine: err binding tracer", .Id()) return } .parentsMx.Lock() , := .parents[.Id()] if { .parentsMx.Unlock() .Logf("Submachine already being traced (duplicate ID %s)", .Id()) return } .parents[.Id()] = .Id() .parentsMx.Unlock() := .getMachineData(.Id(), false) .mx.Lock() defer .mx.Unlock() if .Ended { .Logf("[otel] NewSubmachine: parent %s already ended", .Id()) return } .parentSpansMx.Lock() if , := .parentSpans[.Id()]; ! { // create a group span for submachines , := .Tracer.Start( .machTrace, strconv.Itoa(.Index)+":submachines", trace.WithAttributes(attribute.String("mach_id", .Id()))) .parentSpans[.Id()] = // groups are only for nesting, so end it right away .End() } .parentSpansMx.Unlock() } func ( *OtelMachTracer) ( string) { .MachinesMx.Lock() defer .MachinesMx.Unlock() .doDispose() } func ( *OtelMachTracer) ( string) { , := .Machines[] if ! { .Logf("[otel] MachineDispose: machine %s not found", ) return } .Logf("[otel] MachineDispose: disposing %s", ) .mx.Lock() defer .mx.Unlock() .parentSpansMx.Lock() defer .parentSpansMx.Unlock() delete(.parentSpans, ) delete(.Machines, ) .MachinesOrder = utils.SlicesWithout(.MachinesOrder, ) .Ended = true // transitions if .txTrace != nil { trace.SpanFromContext(.txTrace).End() } // states for , := range .stateInstances { trace.SpanFromContext().End() } for , := range .stateNames { trace.SpanFromContext().End() } // groups trace.SpanFromContext(.stateGroup).End() trace.SpanFromContext(.txGroup).End() trace.SpanFromContext(.machTrace).End() } func ( *OtelMachTracer) ( *am.Transition) { // TODO deadlock .MachinesMx.Lock() defer .MachinesMx.Unlock() if .ended { .Logf("[otel] TransitionInit: tracer already ended, ignoring %s", .Machine.Id()) return } // skip health txs := .CalledStates() := slices.Contains(, "Healthcheck") || slices.Contains(, "Heartbeat") if !.opts.IncludeHealth && { return } := .getMachineData(.Machine.Id(), true) if .Ended { .Logf("[otel] 
TransitionInit: machine %s already ended", .Machine.Id()) return } // if skipping transitions, only create machine data for the states trace if .opts.SkipTransitions { return } if .opts.SkipAuto && .IsAuto() { return } // source event := .Mutation.Source var trace.Span if != nil && .TxId != "" && .MachId != "" { // TODO get span from that tx (GC via LRU) and link to it if , := .Machines[.MachId]; { // TODO optimize with an index for , := range .txHist { if .Id == .TxId { = trace.SpanFromContext(.Ctx) break } } } } // label := fmt.Sprintf("%d: %s", .Index, .Mutation.StringFromIndex(.Machine.StateNames())) := // exception support var error if slices.Contains(.TargetStates(), am.StateException) { = "!" + = am.ParseArgs(.Args()).Err } // build a regular trace , := .Tracer.Start(.txGroup, , trace.WithAttributes( attribute.String("tx_id", .Id), attribute.Int64("time_before", int64(.TimeBefore.Sum(nil))), attribute.String("mutation", ), )) // decorate Exception trace if != nil { .SetAttributes( attribute.String("error", .Error()), ) } // trace logged args, if any and enabled := .Machine.SemLogger().ArgsMapper() if !.opts.SkipLogArgs && != nil { for , := range (.Args()) { .SetAttributes( attribute.String("args."+, ), ) } } if != nil { .AddLink(trace.Link{ SpanContext: .SpanContext(), Attributes: nil, }) } // expose .mx.Lock() defer .mx.Unlock() .txTrace = .txHist = append(.txHist, OtelTxHist{ Id: .Id, Ctx: , }) if len(.txHist) > maxHist { .txHist = .txHist[1:] } } func ( *OtelMachTracer) ( *am.Transition) { if .ended { .Logf("[otel] TransitionEnd: tracer already ended, ignoring %s", .Machine.Id()) return } // skip health txs := .TargetStates() := .CalledStates() := slices.Contains(, "Healthcheck") || slices.Contains(, "Heartbeat") if !.opts.IncludeHealth && { return } := .getMachineData(.Machine.Id(), false) if .Ended { .Logf("[otel] TransitionEnd: machine %s already ended", .Machine.Id()) return } .mx.Lock() defer .mx.Unlock() // parse states collected from resolving 
relations := am.StatesDiff(, .StatesBefore()) := am.StatesDiff(.StatesBefore(), ) // support multi states := .ClockBefore() for , := range .ClockAfter() { if > 1+[] && !slices.Contains(, ) { = append(, ) } } var ( // handle transition trace.Span string ) if !.opts.SkipTransitions && .txTrace != nil { if len() > 0 { += "+" + utils.Jw(, " +") } if len() > 0 { += " -" + utils.Jw(, " -") } = trace.SpanFromContext(.txTrace) .SetAttributes( attribute.String("states_diff", strings.Trim(, " ")), attribute.Int64("time_after", int64(.TimeAfter.Sum(nil))), attribute.Bool("accepted", .IsAccepted.Load()), attribute.Bool("auto", .IsAuto()), attribute.Int("steps_count", len(.Steps)), ) // link to old states for , := range { if , := .stateInstances[]; { .AddLink(trace.Link{ SpanContext: trace.SpanFromContext().SpanContext(), Attributes: nil, }) } } defer .End() .txTrace = nil } // handle state changes if !.IsAccepted.Load() { return } // remove old states for , := range { if , := .stateInstances[]; { trace.SpanFromContext().End() delete(.stateInstances, ) } } // add a new state trace with a group if needed for , := range { if .Ended { .Logf("[otel] TransitionEnd: machine %s already ended", .Machine.Id()) break } // name group , := .stateNames[] if ! 
{ // create a new state name group trace, but end it right away , := .Tracer.Start(.stateGroup, strconv.Itoa(.Index)+":"+) = .stateNames[] = .End() } // multi state - add as an event _, = .stateInstances[] var context.Context var trace.Span if { = trace.SpanFromContext(.stateInstances[]) .AddEvent(.Mutation.String()) } else { , = .Tracer.Start(, strconv.Itoa(.Index)+":"+, trace.WithAttributes( attribute.String("tx_id", .Id), )) .stateInstances[] = .AddEvent(.Mutation.String()) } // link with the source tx if != nil { .AddLink(trace.Link{ SpanContext: .SpanContext(), Attributes: nil, }) } } } func ( *OtelMachTracer) ( *am.Transition, string, string, ) { if .ended { return } if .opts.SkipTransitions { return } := .getMachineData(.Machine.Id(), false) if .Ended { return } // add an event to the tx trace trace.SpanFromContext(.txTrace).AddEvent(, trace.WithAttributes( attribute.String("emitter", ), )) } func ( *OtelMachTracer) () { .MachinesMx.Lock() defer .MachinesMx.Unlock() .Logf("[otel] End") .ended = true // end traces in reverse order slices.Reverse(.MachinesOrder) for , := range .MachinesOrder { .doDispose() } // TODO remove? .RootSpan.End() .Machines = nil } func ( *OtelMachTracer) ( am.Api) {} // NewOtelLoggerProvider creates a new OpenTelemetry logger provider bound to // the given exporter. func ( ologsdk.Exporter) *ologsdk.LoggerProvider { // TODO mem limiter? := ologsdk.NewLoggerProvider( ologsdk.WithProcessor(ologsdk.NewBatchProcessor()), ) return } // BindOtelLogger binds an OpenTelemetry logger to a machine. 
func ( am.Api, *ologsdk.LoggerProvider, string, ) { := .Logger(.Id()) .SemLogger().EnableId(false) := func( am.LogLevel, string, ...any) { := olog.Record{} .SetTimestamp(time.Now()) if strings.HasPrefix(, "[error") { .SetSeverity(olog.SeverityError) } else { switch { case am.LogExternal: .SetSeverity(olog.SeverityInfo4) .SetSeverityText(am.LogChanges.String()) case am.LogChanges: .SetSeverity(olog.SeverityInfo3) .SetSeverityText(am.LogChanges.String()) case am.LogOps: .SetSeverity(olog.SeverityInfo1) .SetSeverityText(am.LogOps.String()) case am.LogDecisions: .SetSeverity(olog.SeverityTrace4) .SetSeverityText(am.LogDecisions.String()) case am.LogEverything: .SetSeverity(olog.SeverityTrace1) .SetSeverityText(am.LogEverything.String()) default: } } .SetBody(olog.StringValue(fmt.Sprintf(, ...))) if != "" { .AddAttributes( olog.String("service.name", ), ) } .AddAttributes( olog.String("asyncmachine.id", .Id()), ) .Emit(.Context(), ) } .SemLogger().SetLogger() } // MachBindOtelEnv bind an OpenTelemetry tracer to [mach], based on environment // variables: // - AM_SERVICE (required) // - AM_OTEL_TRACE (required) // - AM_OTEL_TRACE_TXS // - OTEL_EXPORTER_OTLP_ENDPOINT // - OTEL_EXPORTER_OTLP_TRACES_ENDPOINT // // This tracer is inherited by submachines, and this function applies only to // top-level machines. 
func ( am.Api) error { if .ParentId() != "" { return nil } := os.Getenv(EnvService) if os.Getenv(EnvOtelTrace) == "" || == "" { return nil } // init the tracer and provider := .Context() , , := NewOtelProvider(, ) if != nil { return } , := .Start(.Context(), "root") // dedicated machine tracer := NewOtelMachTracer(, , , &OtelMachTracerOpts{ SkipTransitions: os.Getenv(EnvOtelTraceTxs) == "", SkipLogArgs: os.Getenv(EnvOtelTraceArgs) == "", SkipAuto: os.Getenv(EnvOtelTraceNoauto) != "", }) // flush and close var am.HandlerDispose = func( string, context.Context) { := 100 * time.Millisecond .End() time.Sleep() // flush tracing = .ForceFlush() if != nil { log.Printf("Error flushing tracer: %v", ) } time.Sleep() // finish tracing if := .Shutdown(); != nil { log.Printf("Error shutting down tracer: %v", ) } } // dispose somehow TODO use amhelp.OnDispose := ssam.DisposedStates.RegisterDisposal if .Has1() { .Add1(, am.A{ ssam.DisposedArgHandler: , }) } // TODO RegisterDispose (cast) // else { // func() { // <-mach.WhenDisposed() // dispose(mach.Id(), nil) // }() // } // bind the Otel tracer = .BindTracer() if != nil { return } // run the root init manually .MachineInit() return nil } func ( string, context.Context, ) (trace.Tracer, *sdktrace.TracerProvider, error) { otel.SetTextMapPropagator( propagation.NewCompositeTextMapPropagator( propagation.TraceContext{}, propagation.Baggage{}, )) , := otlptrace.New(, otlptracegrpc.NewClient( otlptracegrpc.WithInsecure(), ), ) if != nil { return nil, nil, } := := sdktrace.NewTracerProvider( sdktrace.WithBatcher(, sdktrace.WithMaxExportBatchSize(50), sdktrace.WithBatchTimeout(100*time.Millisecond), ), sdktrace.WithResource(resource.NewWithAttributes( semconv.SchemaURL, semconv.ServiceNameKey.String(), )), ) otel.SetTracerProvider() otel.SetTextMapPropagator(propagation.TraceContext{}) // Create a named tracer with package path as its name. return otel.Tracer(), , nil }