package telemetry
import (
"context"
"encoding/gob"
"fmt"
"log"
"net/rpc"
"os"
"slices"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/pancsta/asyncmachine-go/internal/utils"
am "github.com/pancsta/asyncmachine-go/pkg/machine"
)
// Connection defaults for the am-dbg debugger.
const (
// DbgAddr is the address TransitionsToDbg falls back to when it is called
// with an empty addr.
DbgAddr = "localhost:6831"
// EnvAmDbgAddr is the name of an environment variable. It is not read
// anywhere in this file.
// NOTE(review): presumably it carries the am-dbg address - confirm.
EnvAmDbgAddr = "AM_DBG_ADDR"
)
// DbgMsg is the read interface shared by debugger messages. DbgMsgStruct and
// DbgMsgTx both implement it.
type DbgMsg interface {
// Clock returns the clock value of state; statesIndex lists the state
// names used to resolve the state's position.
Clock (statesIndex am .S , state string ) uint64
// Is reports whether all of the given states are active; statesIndex
// lists the state names used to resolve their positions.
Is (statesIndex am .S , states am .S ) bool
}
// DbgMsgStruct is the schema message sent to am-dbg. The package-level
// sendMsgSchema helper fills every field from a machine.
type DbgMsgStruct struct {
// ID is the machine's ID (mach.Id()).
ID string
// StatesIndex lists the machine's state names (mach.StateNames()).
StatesIndex am .S
// States is the machine's schema (mach.Schema()).
States am .Schema
// Groups is the first value returned by mach.Groups().
Groups map [string ][]int
// GroupsOrder is the second value returned by mach.Groups().
GroupsOrder []string
// Parent is the ID of the parent machine (mach.ParentId()).
Parent string
// Tags are the machine's tags (mach.Tags()).
Tags []string
}
// Clock implements DbgMsg. A schema message carries no clock values, so the
// result is always 0 and both arguments are ignored.
func (d *DbgMsgStruct ) Clock (_ am .S , _ string ) uint64 {
return 0
}
// Is implements DbgMsg. A schema message carries no clock values, so the
// result is always false and both arguments are ignored.
func (d *DbgMsgStruct ) Is (_ am .S , _ am .S ) bool {
return false
}
// DbgMsgTx is the message sent to am-dbg for a finished transition
// (DbgTracer.TransitionEnd) or for a queued mutation
// (DbgTracer.MutationQueued).
type DbgMsgTx struct {
// MachineID is the ID of the machine the message comes from.
MachineID string
// ID is the transition ID. For queued mutations it is the last transition
// ID followed by "-" and a counter.
ID string
// Clocks holds one clock value per state, positioned like the states
// index passed to the methods of this type.
Clocks am .Time
// QueueTick is the machine's queue tick when the message was built.
QueueTick uint64
// MutQueueToken is the queue token of the mutation.
MutQueueToken uint64
// MutQueueTick is the queue tick of the mutation; in this file only
// MutationQueued sets it.
MutQueueTick uint64
// Type is the mutation type.
Type am .MutationType
// CalledStates optionally lists the called states by name. It is not set
// in this file; when non-nil, CalledStateNames returns it as is.
CalledStates []string
// CalledStatesIdxs lists the called states as positions in the states
// index.
CalledStatesIdxs []int
// Steps are the transition's steps (set by TransitionEnd only).
Steps []*am .Step
// LogEntries are the transition's log entries, passed through
// removeLogPrefix.
LogEntries []*am .LogEntry
// PreLogEntries are the transition's pre-log entries, passed through
// removeLogPrefix.
PreLogEntries []*am .LogEntry
// Queue is the queue length reported by the transition or the mutation.
Queue int
// Time is not set anywhere in this file.
Time *time .Time
// IsAuto mirrors the mutation's Auto flag.
IsAuto bool
// Accepted tells whether the transition was accepted; always true for
// queued mutations.
Accepted bool
// IsCheck mirrors the mutation's IsCheck flag.
IsCheck bool
// IsQueued is true for messages produced by MutationQueued.
IsQueued bool
// Args holds the mapped mutation arguments; filled only when the semantic
// logger has IsArgs enabled, or to carry an "err" entry.
Args map [string ]string
// QueueDump is not set anywhere in this file.
QueueDump []string
}
// Clock returns the clock value of state. The state's position in m.Clocks is
// looked up in statesIndex.
//
// It returns 0 when state is not listed in statesIndex or when m.Clocks is too
// short to contain that position.
func (m *DbgMsgTx) Clock(statesIndex am.S, state string) uint64 {
	idx := slices.Index(statesIndex, state)
	// slices.Index yields -1 for an unknown state; indexing m.Clocks with it
	// would panic, so reject it together with out-of-range positions.
	if idx < 0 || idx >= len(m.Clocks) {
		return 0
	}

	return m.Clocks[idx]
}
// Is reports whether every state in states is active according to m.Clocks.
// A state which is missing from statesIndex, or whose position lies beyond
// m.Clocks, makes the result false. An empty states list yields true.
func (m *DbgMsgTx) Is(statesIndex am.S, states am.S) bool {
	for i := range states {
		pos := m.Index(statesIndex, states[i])

		switch {
		case pos == -1:
			// unknown state
			return false
		case pos >= len(m.Clocks):
			// no clock recorded for this position
			return false
		case !am.IsActiveTick(m.Clocks[pos]):
			return false
		}
	}

	return true
}
// Index returns the position of state within statesIndex, or -1 when the
// state is not listed there.
func (m *DbgMsgTx) Index(statesIndex am.S, state string) int {
	return slices.Index(statesIndex, state)
}
// ActiveStates returns the names from statesIndex whose clock in m.Clocks is
// an active tick, in statesIndex order. States positioned beyond the end of
// m.Clocks are ignored. The result is never nil.
func (m *DbgMsgTx) ActiveStates(statesIndex am.S) am.S {
	ret := am.S{}

	// The loop index already is the state's position, so there is no need to
	// search statesIndex again for every element.
	for idx, state := range statesIndex {
		if idx >= len(m.Clocks) {
			// all the remaining positions are out of range as well
			break
		}
		if am.IsActiveTick(m.Clocks[idx]) {
			ret = append(ret, state)
		}
	}

	return ret
}
// CalledStateNames returns the names of the called states. When
// m.CalledStates is set it is returned directly; otherwise every entry of
// m.CalledStatesIdxs is resolved to a name through statesIndex.
func (m *DbgMsgTx) CalledStateNames(statesIndex am.S) am.S {
	if m.CalledStates != nil {
		return m.CalledStates
	}

	names := make(am.S, 0, len(m.CalledStatesIdxs))
	for _, pos := range m.CalledStatesIdxs {
		names = append(names, statesIndex[pos])
	}

	return names
}
// TxString renders the transition as multi-line text: a "tx#<ID>" line, a
// line with the mutation type and the called states, then one "- " line per
// step.
func (m *DbgMsgTx) TxString(statesIndex am.S) string {
	kind := m.Type.String()
	called := utils.J(m.CalledStateNames(statesIndex))

	out := "tx#" + m.ID + "\n[" + kind + "] " + called + "\n"
	for i := range m.Steps {
		out += "- " + m.Steps[i].StringFromIndex(statesIndex) + "\n"
	}

	return out
}
// MutString renders the mutation in a short form: "-" for a remove mutation
// or "+" for any other type, followed by the called states.
func (m *DbgMsgTx) MutString(statesIndex am.S) string {
	var sign string
	switch m.Type {
	case am.MutationRemove:
		sign = "-"
	default:
		sign = "+"
	}

	return sign + utils.J(m.CalledStateNames(statesIndex))
}
// Is1 is the single-state variant of Is: it reports whether state is active.
func (m *DbgMsgTx) Is1(statesIndex am.S, state string) bool {
	single := am.S{state}

	return m.Is(statesIndex, single)
}
// TimeSum adds up all the clock values in m.Clocks.
func (m *DbgMsgTx) TimeSum() uint64 {
	var total uint64
	for i := range m.Clocks {
		total += m.Clocks[i]
	}

	return total
}
// dbgClient is an RPC connection to an am-dbg server.
type dbgClient struct {
// addr is the address the client has dialed.
addr string
// rpc is the underlying net/rpc client.
rpc *rpc .Client
}
// newDbgClient dials addr over "tcp4" and wraps the RPC connection in a
// dbgClient. On a dial failure it returns a nil client and the error.
func newDbgClient(addr string) (*dbgClient, error) {
	conn, err := rpc.Dial("tcp4", addr)
	if err != nil {
		return nil, err
	}

	c := new(dbgClient)
	c.addr = addr
	c.rpc = conn

	return c, nil
}
// sendMsgTx delivers msg through the "RPCServer.DbgMsgTx" RPC method and
// returns the call's error, if any. The reply is discarded. The receiver must
// not be nil.
func (c *dbgClient) sendMsgTx(msg *DbgMsgTx) error {
	reply := new(string)

	return c.rpc.Call("RPCServer.DbgMsgTx", msg, reply)
}
// sendMsgSchema delivers msg through the "RPCServer.DbgMsgSchema" RPC method
// and returns the call's error, if any. The reply is discarded. Calling it on
// a nil client is a no-op which returns nil.
func (c *dbgClient) sendMsgSchema(msg *DbgMsgStruct) error {
	if c == nil {
		return nil
	}

	reply := new(string)

	return c.rpc.Call("RPCServer.DbgMsgSchema", msg, reply)
}
// DbgTracer is a machine tracer which forwards schema changes, transitions
// and queued mutations to an am-dbg server over RPC. Hooks it does not define
// come from the embedded am.NoOpTracer.
type DbgTracer struct {
*am .NoOpTracer
// Addr is the am-dbg address dialed by MachineInit.
Addr string
// Mach is the traced machine.
Mach am .Api
// outbox queues functions which are run one by one by the goroutine
// started in NewDbgTracer.
outbox chan func ()
// c is the RPC client; nil until MachineInit manages to connect.
c *dbgClient
// errCount counts failed sends; TransitionEnd detaches the tracer once it
// exceeds 10.
errCount atomic .Int32
// exited marks that the detach has already been requested.
exited atomic .Bool
// mx is held by TransitionEnd and MutationQueued while they use lastTx,
// queued and lastMTime.
mx sync .Mutex
// lastTx is the ID of the most recent transition.
lastTx string
// queued counts the mutations queued since the most recent transition.
queued int
// lastMTime is the most recent machine time known to the tracer.
lastMTime am .Time
}
// Compile-time check that DbgTracer implements am.Tracer.
var _ am .Tracer = &DbgTracer {}
// NewDbgTracer creates a tracer for mach which will report to the am-dbg
// server at addr. It also starts a goroutine which runs the functions from
// the outbox (buffered for 1000 entries) one at a time, until the machine's
// context is done.
func NewDbgTracer(mach am.Api, addr string) *DbgTracer {
	t := &DbgTracer{
		Addr:   addr,
		Mach:   mach,
		outbox: make(chan func(), 1000),
	}
	ctx := t.Mach.Ctx()

	// worker drains the outbox sequentially
	worker := func() {
		for {
			select {
			case job := <-t.outbox:
				job()
			case <-ctx.Done():
				return
			}
		}
	}
	go worker()

	return t
}
// MachineInit binds the tracer to mach, remembers the machine's current time
// and queues a job which connects to am-dbg at t.Addr and sends the machine's
// schema. The job is skipped when the machine is already disposed.
//
// Connection and send failures are logged only when the am.EnvAmLog
// environment variable is set. The returned context is always nil.
func (t *DbgTracer) MachineInit(mach am.Api) context.Context {
	gob.Register(am.Relation(0))
	t.Mach = mach

	// lastMTime is guarded by mx in TransitionEnd and MutationQueued, so it is
	// guarded here as well. The time is read before taking the lock.
	mtime := mach.Time(nil)
	t.mx.Lock()
	t.lastMTime = mtime
	t.mx.Unlock()

	t.outbox <- func() {
		if mach.IsDisposed() {
			return
		}

		// err is local to the job, which runs on the outbox goroutine
		var err error
		t.c, err = newDbgClient(t.Addr)
		if err != nil {
			if os.Getenv(am.EnvAmLog) != "" {
				log.Printf("%s: failed to connect to am-dbg: %s\n", mach.Id(), err)
			}
			// without a connection there is nothing to send, whether or not
			// logging is enabled
			return
		}

		err = sendMsgSchema(mach, t.c)
		if err != nil && os.Getenv(am.EnvAmLog) != "" {
			log.Println(err)
		}
	}

	return nil
}
// SchemaChange remembers the machine's current time and queues a job which
// sends the machine's current schema to am-dbg. A send failure is reported to
// the machine through mach.AddErr.
func (t *DbgTracer) SchemaChange(mach am.Api, _ am.Schema) {
	// lastMTime is guarded by mx in TransitionEnd and MutationQueued, so it is
	// guarded here as well. The time is read before taking the lock.
	mtime := mach.Time(nil)
	t.mx.Lock()
	t.lastMTime = mtime
	t.mx.Unlock()

	t.outbox <- func() {
		err := sendMsgSchema(mach, t.c)
		if err != nil {
			err = fmt.Errorf("failed to send new struct to am-dbg: %w", err)
			mach.AddErr(err, nil)
		}
	}
}
// TransitionEnd converts the finished transition tx into a DbgMsgTx and
// queues it for delivery to am-dbg.
//
// Once more than 10 send errors have accumulated, the tracer asks the machine
// to detach it (once, from a separate goroutine) and skips the current
// transition. Check mutations are skipped unless the semantic logger has IsCan
// enabled. Failures are logged only when the am.EnvAmLog environment variable
// is set.
func (t *DbgTracer) TransitionEnd(tx *am.Transition) {
	mach := tx.Api

	// CompareAndSwap guarantees that only one caller requests the detach.
	if t.errCount.Load() > 10 && t.exited.CompareAndSwap(false, true) {
		if os.Getenv(am.EnvAmLog) != "" {
			log.Println(mach.Id() + ": too many errors - detaching dbg tracer")
		}

		go func() {
			err := mach.DetachTracer(t)
			if err != nil && os.Getenv(am.EnvAmLog) != "" {
				// the ID is passed as an argument, so a "%" inside of it
				// cannot be interpreted as a formatting verb
				log.Printf("%s: failed to detach dbg tracer: %s\n", mach.Id(), err)
			}
		}()

		return
	}

	mut := tx.Mutation
	if mut.IsCheck && !mach.SemLogger().IsCan() {
		return
	}

	tx.InternalLogEntriesLock.Lock()
	defer tx.InternalLogEntriesLock.Unlock()
	t.mx.Lock()
	defer t.mx.Unlock()

	msg := &DbgMsgTx{
		MachineID:        mach.Id(),
		ID:               tx.Id,
		Clocks:           tx.TimeAfter,
		Accepted:         tx.IsAccepted.Load(),
		IsCheck:          mut.IsCheck,
		Type:             mut.Type,
		CalledStatesIdxs: mut.Called,
		Steps:            tx.Steps,
		LogEntries:       removeLogPrefix(mach, tx.LogEntries),
		PreLogEntries:    removeLogPrefix(mach, tx.PreLogEntries),
		IsAuto:           mut.Auto,
		Queue:            int(tx.QueueLen),
		QueueTick:        mach.QueueTick(),
		MutQueueToken:    mut.QueueToken,
	}

	semlog := mach.SemLogger()
	if semlog.IsArgs() {
		msg.Args = mut.MapArgs(semlog.ArgsMapper())
	}

	// IDs of mutations queued from now on derive from this transition
	t.lastTx = tx.Id
	t.lastMTime = tx.TimeAfter
	t.queued = 0

	t.outbox <- func() {
		if t.c == nil {
			if os.Getenv(am.EnvAmLog) != "" {
				log.Println(mach.Id() + ": no connection to am-dbg")
			}
			t.errCount.Add(1)

			return
		}

		err := t.c.sendMsgTx(msg)
		if err != nil {
			if os.Getenv(am.EnvAmLog) != "" {
				log.Printf("%s:failed to send a msg to am-dbg: %s", mach.Id(), err)
			}
			t.errCount.Add(1)
		}
	}
}
// MachineDispose closes the RPC connection, if there is one, from a separate
// goroutine after a 100ms delay. The id argument is unused and the error from
// Close is ignored.
// NOTE(review): the delay presumably lets pending messages go out - confirm.
func (t *DbgTracer) MachineDispose(id string) {
	closeLater := func() {
		time.Sleep(100 * time.Millisecond)

		if t.c == nil || t.c.rpc == nil {
			return
		}
		_ = t.c.rpc.Close()
	}

	go closeLater()
}
// MutationQueued converts the queued mutation mut into a DbgMsgTx and queues
// it for delivery to am-dbg.
//
// The message ID is the ID of the last transition followed by "-" and the
// number of mutations queued since that transition; the clocks are the last
// machine time known to the tracer. Check mutations are skipped unless the
// semantic logger has IsCan enabled. Failures are logged only when the
// am.EnvAmLog environment variable is set.
func (t *DbgTracer) MutationQueued(mach am.Api, mut *am.Mutation) {
	if mut.IsCheck && !mach.SemLogger().IsCan() {
		return
	}

	t.mx.Lock()
	defer t.mx.Unlock()

	msg := &DbgMsgTx{
		MachineID:        mach.Id(),
		ID:               t.lastTx + "-" + strconv.Itoa(t.queued),
		Clocks:           t.lastMTime,
		Accepted:         true,
		IsCheck:          mut.IsCheck,
		Type:             mut.Type,
		CalledStatesIdxs: mut.Called,
		IsAuto:           mut.Auto,
		Queue:            int(mut.QueueLen),
		IsQueued:         true,
		QueueTick:        mach.QueueTick(),
		MutQueueTick:     mut.QueueTick,
		MutQueueToken:    mut.QueueToken,
	}

	semlog := mach.SemLogger()
	if semlog.IsArgs() {
		msg.Args = mut.MapArgs(semlog.ArgsMapper())
	}

	// an error passed in the args is always included, even when the args
	// logging is disabled
	if err := am.ParseArgs(mut.Args).Err; err != nil {
		if msg.Args == nil {
			msg.Args = make(map[string]string)
		}
		msg.Args["err"] = err.Error()
	}

	t.queued++

	t.outbox <- func() {
		if t.c == nil {
			if os.Getenv(am.EnvAmLog) != "" {
				log.Println(mach.Id() + ": no connection to am-dbg")
			}
			t.errCount.Add(1)

			return
		}

		err := t.c.sendMsgTx(msg)
		if err != nil {
			if os.Getenv(am.EnvAmLog) != "" {
				// the ID is passed as an argument, so a "%" inside of it
				// cannot be interpreted as a formatting verb
				log.Printf("%s:failed to send a msg to am-dbg: %s", mach.Id(), err)
			}
			t.errCount.Add(1)
		}
	}
}
// TransitionsToDbg attaches a DbgTracer reporting to addr to mach and
// initializes it. An empty addr means DbgAddr. When the machine already has a
// DbgTracer for the same address, nothing is done. The only error returned is
// the one from mach.BindTracer.
func TransitionsToDbg(mach am.Api, addr string) error {
	if addr == "" {
		addr = DbgAddr
	}

	// avoid binding a second tracer for the same address
	for _, bound := range mach.Tracers() {
		dbg, isDbg := bound.(*DbgTracer)
		if isDbg && dbg.Addr == addr {
			return nil
		}
	}

	tracer := NewDbgTracer(mach, addr)
	if err := mach.BindTracer(tracer); err != nil {
		return err
	}
	tracer.MachineInit(mach)

	return nil
}
// sendMsgSchema builds a DbgMsgStruct describing mach (ID, state names,
// schema, groups, parent ID and tags) and sends it through client. A send
// error is returned wrapped.
func sendMsgSchema(mach am.Api, client *dbgClient) error {
	groups, order := mach.Groups()

	sendErr := client.sendMsgSchema(&DbgMsgStruct{
		ID:          mach.Id(),
		StatesIndex: mach.StateNames(),
		States:      mach.Schema(),
		Groups:      groups,
		GroupsOrder: order,
		Parent:      mach.ParentId(),
		Tags:        mach.Tags(),
	})
	if sendErr == nil {
		return nil
	}

	return fmt.Errorf("failed to send a msg to am-dbg: %w", sendErr)
}
// removeLogPrefix returns a copy of entries with the leading machine-ID prefix
// cut from each entry's text. The input slice and its entries are never
// modified.
//
// When the semantic logger has IsId disabled, a shallow clone of entries is
// returned. Otherwise every non-nil entry is copied (Level and Text) with the
// first prefixLen bytes of its text removed. Texts shorter than the prefix are
// kept whole and nil entries are dropped, so the result never contains nil
// pointers, which gob is not able to encode.
func removeLogPrefix(mach am.Api, entries []*am.LogEntry) []*am.LogEntry {
	if !mach.SemLogger().IsId() {
		return slices.Clone(entries)
	}

	// The prefix is the machine ID, counted up to maxIdLen bytes, plus
	// addChars extra bytes.
	// NOTE(review): addChars presumably covers the decoration around the ID
	// - confirm against the logger's format.
	maxIdLen := 5
	addChars := 3
	prefixLen := min(len(mach.Id())+addChars, maxIdLen+addChars)

	ret := make([]*am.LogEntry, 0, len(entries))
	for _, le := range entries {
		if le == nil {
			continue
		}

		text := le.Text
		if len(text) >= prefixLen {
			text = text[prefixLen:]
		}
		ret = append(ret, &am.LogEntry{
			Level: le.Level,
			Text:  text,
		})
	}

	return ret
}
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PR and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.