// Package telemetry provides telemetry exporters for asyncmachine: am-dbg, // Prometheus, and OpenTelemetry.
package telemetry import ( am ) // ///// ///// ///// // ///// AM-DBG // ///// ///// ///// const ( // DbgAddr is the default address of the am-dbg server. DbgAddr = "localhost:6831" // EnvAmDbgAddr is the address of a running am-dbg instance. // "1" expands to "localhost:6831" EnvAmDbgAddr = "AM_DBG_ADDR" ) // DbgMsg is the interface for the messages to be sent to the am-dbg server. type DbgMsg interface { // Clock returns the state's clock, using the passed index Clock(statesIndex am.S, state string) uint64 // Is returns true if the state is active, using the passed index Is(statesIndex am.S, states am.S) bool } // ///// STRUCT // DbgMsgStruct contains the state and relations data. type DbgMsgStruct struct { // TODO refac: DbgMsgSchema // TODO add schema ver // Machine ID ID string // state names defining the indexes for diffs StatesIndex am.S // all the states with relations // TODO refac: Schema States am.Schema // list of group names and state indexes Groups map[string][]int // order of groups GroupsOrder []string // parent machine ID Parent string // machine tags Tags []string // TODO include the current mach time // MTime am.Time } func ( *DbgMsgStruct) ( am.S, string) uint64 { return 0 } func ( *DbgMsgStruct) ( am.S, am.S) bool { return false } // ///// QUEUE // type DbgMsgQueue struct { // Mutation *am.Mutation // } // ///// TRANSITION // DbgMsgTx contains transition data. type DbgMsgTx struct { MachineID string // Transition ID // TODO refac: Id ID string // Clocks is represents the machine time [am.Time] from after the current // transition. // TODO refac to TimeAfter, re-gen all the assets Clocks am.Time // QueueTick is the current queue tick in the machine. // transition. QueueTick uint64 // TODO QueueDebug with all string entries for comparison // MutQueueToken is the token of a prepended mutation, can be scheduled or // executed, depending on IsQueued. MutQueueToken uint64 // MutQueueTick is the assigned queue tick when the tx will be executed. // Only for IsQueued. MutQueueTick uint64 // mutation type Type am.MutationType // called states // TODO remove. Deprecated use CalledStateNames(index) CalledStates []string // TODO rename to CalledStates, re-gen all assets CalledStatesIdxs []int // all the transition steps Steps []*am.Step // log entries created during the transition LogEntries []*am.LogEntry // log entries before the transition, which happened after the prev one PreLogEntries []*am.LogEntry // queue length at the start of the transition // TODO rename to QueueLen // TODO change to int32 Queue int // Time is human time. Don't send this over the wire. // TODO remove or skip in msgpack // TODO rename to HTime Time *time.Time // transition was triggered by an auto state IsAuto bool // result of the transition // TODO rename to IsAccepted Accepted bool // is this a check (Can*) tx or mutation? IsCheck bool // is this a queued mutation? IsQueued bool Args map[string]string QueueDump []string // TODO add Mutation.Source, only if semLogger.IsGraph() == true // Source *am.MutSource // TODO include missed mutations (?) because of the queue limit // since the previous tx // TODO add Transition.PipesAdded // TODO add Transition.PipesRemoved // TODO add Transition.CtxAdded // TODO add Transition.CtxRemoved // TODO add time taken via tracer } func ( *DbgMsgTx) ( am.S, string) uint64 { := slices.Index(, ) if len(.Clocks) <= { return 0 } return .Clocks[] } func ( *DbgMsgTx) ( am.S, am.S) bool { for , := range { := .Index(, ) // new schema issue if == -1 { return false } if len(.Clocks) <= || !am.IsActiveTick(.Clocks[]) { return false } } return true } func ( *DbgMsgTx) ( am.S, string) int { := slices.Index(, ) //nolint:typecheck return } func ( *DbgMsgTx) ( am.S) am.S { := am.S{} for , := range { := slices.Index(, ) if len(.Clocks) <= { continue } if am.IsActiveTick(.Clocks[]) { = append(, ) } } return } func ( *DbgMsgTx) ( am.S) am.S { // old compat if .CalledStates != nil { return .CalledStates } := make(am.S, len(.CalledStatesIdxs)) for , := range .CalledStatesIdxs { [] = [] } return } // TODO unify with Tx String func ( *DbgMsgTx) ( am.S) string { := "tx#" + .ID + "\n[" + .Type.String() + "] " + utils.J(.CalledStateNames()) + "\n" // TODO add source from mutation for , := range .Steps { += "- " + .StringFromIndex() + "\n" } return } // TODO unify with Mut String func ( *DbgMsgTx) ( am.S) string { := "+" if .Type == am.MutationRemove { = "-" } += utils.J(.CalledStateNames()) return } func ( *DbgMsgTx) ( am.S, string) bool { return .Is(, am.S{}) } // TODO Sum() and TimeSum(idxs []int) func ( *DbgMsgTx) () uint64 { := uint64(0) for , := range .Clocks { += } return } // ///// CLIENT type dbgClient struct { addr string rpc *rpc.Client } func newDbgClient( string) (*dbgClient, error) { // log.Printf("Connecting to %s", url) , := rpc.Dial("tcp4", ) if != nil { return nil, } return &dbgClient{addr: , rpc: }, nil } func ( *dbgClient) ( *DbgMsgTx) error { var string // DEBUG // fmt.Printf("sendMsgTx %v\n", msg.CalledStates) // TODO const name := .rpc.Call("RPCServer.DbgMsgTx", , &) if != nil { return } return nil } func ( *dbgClient) ( *DbgMsgStruct) error { if == nil { return nil } var string // TODO use Go() to not block // TODO const name := .rpc.Call("RPCServer.DbgMsgSchema", , &) if != nil { return } return nil } // ///// TRACER type DbgTracer struct { *am.NoOpTracer Addr string Mach am.Api outbox chan func() c *dbgClient errCount atomic.Int32 exited atomic.Bool mx sync.Mutex lastTx string // number of queued mutations since lastTx queued int lastMTime am.Time } var _ am.Tracer = &DbgTracer{} func ( am.Api, string) *DbgTracer { := &DbgTracer{ Addr: , Mach: , outbox: make(chan func(), 1000), } := .Mach.Ctx() // process the queue go func() { for { select { case := <-.outbox: () case <-.Done(): return } } }() return } func ( *DbgTracer) ( am.Api) context.Context { gob.Register(am.Relation(0)) var error .Mach = .lastMTime = .Time(nil) // add to the queue .outbox <- func() { if .IsDisposed() { return } .c, = newDbgClient(.Addr) if != nil && os.Getenv(am.EnvAmLog) != "" { log.Printf("%s: failed to connect to am-dbg: %s\n", .Id(), ) return } = sendMsgSchema(, .c) if != nil && os.Getenv(am.EnvAmLog) != "" { log.Println(, nil) return } } return nil } func ( *DbgTracer) ( am.Api, am.Schema) { .lastMTime = .Time(nil) // add to the queue .outbox <- func() { := sendMsgSchema(, .c) if != nil { = fmt.Errorf("failed to send new struct to am-dbg: %w", ) .AddErr(, nil) return } } } func ( *DbgTracer) ( *am.Transition) { := .Api if .errCount.Load() > 10 && !.exited.Load() { .exited.Store(true) if os.Getenv(am.EnvAmLog) != "" { log.Println(.Id() + ": too many errors - detaching dbg tracer") } go func() { := .DetachTracer() if != nil && os.Getenv(am.EnvAmLog) != "" { log.Printf(.Id()+": failed to detach dbg tracer: %s\n", ) } }() return } := .Mutation // skip check mutations when not logging them if .IsCheck && !.SemLogger().IsCan() { return } .InternalLogEntriesLock.Lock() defer .InternalLogEntriesLock.Unlock() .mx.Lock() defer .mx.Unlock() := &DbgMsgTx{ MachineID: .Id(), ID: .Id, Clocks: .TimeAfter, Accepted: .IsAccepted.Load(), IsCheck: .IsCheck, Type: .Type, CalledStatesIdxs: .Called, Steps: .Steps, // no locking necessary, as the tx is finalized (read-only) LogEntries: removeLogPrefix(, .LogEntries), PreLogEntries: removeLogPrefix(, .PreLogEntries), IsAuto: .Auto, Queue: int(.QueueLen), QueueTick: .QueueTick(), MutQueueToken: .QueueToken, // debug // QueueDump: mach.QueueDump(), } // collect args := .SemLogger() if .IsArgs() { .Args = .MapArgs(.ArgsMapper()) } .lastTx = .Id .lastMTime = .TimeAfter .queued = 0 // add to the queue .outbox <- func() { if .c == nil { if os.Getenv(am.EnvAmLog) != "" { log.Println(.Id() + ": no connection to am-dbg") } .errCount.Add(1) return } := .c.sendMsgTx() if != nil { if os.Getenv(am.EnvAmLog) != "" { log.Printf(.Id()+":failed to send a msg to am-dbg: %s", ) } .errCount.Add(1) } } } func ( *DbgTracer) ( string) { // TODO lock & dispose? // t.Mach = nil go func() { // TODO push out pending m.logEntries using add:Healthcheck (if set) time.Sleep(100 * time.Millisecond) if .c != nil && .c.rpc != nil { _ = .c.rpc.Close() } }() } func ( *DbgTracer) ( am.Api, *am.Mutation) { // skip check mutations when not logging them if .IsCheck && !.SemLogger().IsCan() { return } .mx.Lock() defer .mx.Unlock() := &DbgMsgTx{ MachineID: .Id(), ID: .lastTx + "-" + strconv.Itoa(.queued), Clocks: .lastMTime, Accepted: true, IsCheck: .IsCheck, Type: .Type, CalledStatesIdxs: .Called, IsAuto: .Auto, Queue: int(.QueueLen), IsQueued: true, QueueTick: .QueueTick(), MutQueueTick: .QueueTick, MutQueueToken: .QueueToken, // debug // QueueDump: mach.QueueDump(), } // collect args := .SemLogger() if .IsArgs() { .Args = .MapArgs(.ArgsMapper()) } if := am.ParseArgs(.Args).Err; != nil { if .Args == nil { .Args = make(map[string]string) } .Args["err"] = .Error() } .queued++ // add to the queue .outbox <- func() { if .c == nil { if os.Getenv(am.EnvAmLog) != "" { log.Println(.Id() + ": no connection to am-dbg") } .errCount.Add(1) return } := .c.sendMsgTx() if != nil { if os.Getenv(am.EnvAmLog) != "" { log.Printf(.Id()+":failed to send a msg to am-dbg: %s", ) } .errCount.Add(1) } } } // ///// FUNCS // TransitionsToDbg sends transitions to the am-dbg server. func ( am.Api, string) error { if == "" { = DbgAddr } // prevent double debugging := .Tracers() for , := range { if , := .(*DbgTracer); && .Addr == { return nil } } // add the tracer := NewDbgTracer(, ) := .BindTracer() if != nil { return } // call manually for existing machines .MachineInit() return nil } // sendMsgSchema sends the machine's states and relations func sendMsgSchema( am.Api, *dbgClient) error { , := .Groups() := &DbgMsgStruct{ ID: .Id(), StatesIndex: .StateNames(), States: .Schema(), Groups: , GroupsOrder: , Parent: .ParentId(), Tags: .Tags(), } // TODO retries := .sendMsgSchema() if != nil { return fmt.Errorf("failed to send a msg to am-dbg: %w", ) } return nil } func removeLogPrefix( am.Api, []*am.LogEntry) []*am.LogEntry { := slices.Clone() if !.SemLogger().IsId() { return } := 5 := 3 // "[] " := min(len(.Id())+, +) := make([]*am.LogEntry, len()) for , := range { if == nil || len(.Text) < { continue } [] = &am.LogEntry{ Level: .Level, Text: .Text[:], } } return }