package dbg
import (
"context"
"encoding/gob"
"fmt"
"net/rpc"
"os"
"slices"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/coder/websocket"
"github.com/pancsta/asyncmachine-go/internal/utils"
"github.com/pancsta/asyncmachine-go/pkg/machine"
)
// Default addresses and environment variable names for the am-dbg client.
const (
// DbgAddr is the address TransitionsToDbg falls back to when it is given an
// empty address.
DbgAddr = "localhost:6831"
// DbgAddrWeb is a second default address.
// NOTE(review): presumably the default for the web / WebSocket endpoint; it
// is not referenced in this file - confirm against callers.
DbgAddrWeb = "localhost:6832"
// EnvAmDbgAddr is the name of an environment variable.
// NOTE(review): presumably holds the debugger address; it is not read in
// this file - confirm against callers.
EnvAmDbgAddr = "AM_DBG_ADDR"
)
// DbgMsg is the common interface of messages sent to the debugger. Both
// methods take the states index (the ordered list of state names) because
// messages refer to states by their position in that list.
type DbgMsg interface {
// Clock returns the clock value of the given state, or 0 when unavailable.
Clock (statesIndex machine .S , state string ) uint64
// Is reports whether all the given states are active in this message.
Is (statesIndex machine .S , states machine .S ) bool
}
// DbgMsgStruct describes a machine's schema. It is built by sendMsgSchema
// from a machine.Api and sent via the "RPCServer.DbgMsgSchema" RPC call.
type DbgMsgStruct struct {
// ID is the machine ID (mach.Id()).
ID string
// StatesIndex is the ordered list of state names (mach.StateNames()).
StatesIndex machine .S
// States is the machine's schema (mach.Schema()).
States machine .Schema
// Groups is the first value returned by mach.Groups().
// NOTE(review): presumably group name -> state indexes - confirm.
Groups map [string ][]int
// GroupsOrder is the second value returned by mach.Groups().
GroupsOrder []string
// Parent is the parent machine ID (mach.ParentId()).
Parent string
// Tags are the machine's tags (mach.Tags()).
Tags []string
}
// Clock implements DbgMsg. A schema message has no clock values, so the
// result is 0 for every state.
func (d *DbgMsgStruct) Clock(_ machine.S, _ string) uint64 {
	var none uint64
	return none
}
// Is implements DbgMsg. A schema message has no clock values, so no state is
// ever reported as active.
func (d *DbgMsgStruct) Is(_ machine.S, _ machine.S) bool {
	const active = false
	return active
}
// DbgMsgTx describes a single transition, or a queued mutation, of a machine.
// It is sent via the "RPCServer.DbgMsgTx" RPC call.
type DbgMsgTx struct {
// MachineID is the ID of the machine the message comes from.
MachineID string
// ID is the transition ID. For queued mutations the tracer uses the ID of
// the last transition suffixed with "-N".
ID string
// Clocks holds one clock value per state, positioned like the states index.
Clocks machine .Time
// QueueTick is the machine's queue tick when the message was created.
QueueTick uint64
// MutQueueToken is the mutation's queue token.
MutQueueToken uint64
// MutQueueTick is the mutation's queue tick (set for queued mutations only).
MutQueueTick uint64
// Type is the mutation type.
Type machine .MutationType
// CalledStates lists called states by name. When non-nil it takes
// precedence over CalledStatesIdxs (see CalledStateNames).
CalledStates []string
// CalledStatesIdxs lists called states as positions in the states index.
CalledStatesIdxs []int
// Steps are the transition's steps.
Steps []*machine .Step
// LogEntries are the transition's log entries (machine ID prefix removed
// by the tracer when ID logging is on).
LogEntries []*machine .LogEntry
// PreLogEntries are the transition's pre-log entries, processed the same
// way as LogEntries.
PreLogEntries []*machine .LogEntry
// Queue is the queue length reported by the transition or mutation.
Queue int
// Time is the timestamp of the message. The tracer in this file does not
// set it. NOTE(review): presumably filled in by the receiver - confirm.
Time *time .Time
// IsAuto mirrors the mutation's IsAuto flag.
IsAuto bool
// Accepted tells if the transition was accepted; always true for queued
// mutations.
Accepted bool
// IsCheck mirrors the mutation's IsCheck flag.
IsCheck bool
// IsQueued marks messages created when a mutation was queued (as opposed
// to a finished transition).
IsQueued bool
// Args are the mutation's arguments mapped to strings, present only when
// the semantic logger has args logging enabled (or an "err" arg is found).
Args map [string ]string
// QueueDump is not populated in this file.
// NOTE(review): presumably a textual dump of the queue - confirm.
QueueDump []string
}
// Clock returns the clock value of state, looked up by its position in
// statesIndex. It returns 0 when the state is not part of statesIndex or when
// Clocks is too short to contain it.
func (m *DbgMsgTx) Clock(statesIndex machine.S, state string) uint64 {
	idx := slices.Index(statesIndex, state)
	// slices.Index returns -1 for an unknown state; indexing Clocks with it
	// would panic, so treat it like any other out-of-range position.
	if idx < 0 || idx >= len(m.Clocks) {
		return 0
	}
	return m.Clocks[idx]
}
// Is reports whether every state in states is known to statesIndex, has a
// clock value in Clocks, and that value satisfies machine.IsActiveTick. An
// empty states list yields true.
func (m *DbgMsgTx) Is(statesIndex machine.S, states machine.S) bool {
	for _, name := range states {
		pos := m.Index(statesIndex, name)
		switch {
		case pos == -1:
			// unknown state
			return false
		case pos >= len(m.Clocks):
			// no clock recorded for this position
			return false
		case !machine.IsActiveTick(m.Clocks[pos]):
			return false
		}
	}
	return true
}
// Index returns the position of state within statesIndex, or -1 when it is
// not present.
func (m *DbgMsgTx) Index(statesIndex machine.S, state string) int {
	return slices.Index(statesIndex, state)
}
// ActiveStates returns the names from statesIndex whose clock value in Clocks
// satisfies machine.IsActiveTick, in index order. States without a
// corresponding clock value are skipped. The result is never nil.
func (m *DbgMsgTx) ActiveStates(statesIndex machine.S) machine.S {
	ret := machine.S{}
	// The loop position already is the state's index, so there is no need to
	// search for it again (which made the loop quadratic). Positions beyond
	// len(Clocks) have no clock and can never be active.
	n := min(len(statesIndex), len(m.Clocks))
	for idx, state := range statesIndex[:n] {
		if machine.IsActiveTick(m.Clocks[idx]) {
			ret = append(ret, state)
		}
	}
	return ret
}
// CalledStateNames returns the names of the called states. CalledStates is
// returned as-is when it is non-nil; otherwise every entry of
// CalledStatesIdxs is resolved through statesIndex.
func (m *DbgMsgTx) CalledStateNames(statesIndex machine.S) machine.S {
	if names := m.CalledStates; names != nil {
		return names
	}

	names := make(machine.S, 0, len(m.CalledStatesIdxs))
	for _, pos := range m.CalledStatesIdxs {
		names = append(names, statesIndex[pos])
	}
	return names
}
// TxString renders the message as multi-line text: a "mach://" address line,
// the UTC timestamp in RFC3339Nano, a blank line, the mutation type with the
// called state names, and one "- " line per step.
//
// When Time is nil the timestamp line is left empty instead of panicking.
func (m *DbgMsgTx) TxString(statesIndex machine.S) string {
	info := "mach://" + m.MachineID + "/" + m.ID + "\n"
	// Time is a pointer and is not set by the tracer in this file, so it may
	// legitimately be nil here.
	if m.Time != nil {
		info += m.Time.UTC().Format(time.RFC3339Nano)
	}
	info += "\n\n"

	tx := "[" + m.Type.String() + "] " +
		utils.J(m.CalledStateNames(statesIndex)) + "\n"

	steps := ""
	for _, step := range m.Steps {
		steps += "- " + step.StringFromIndex(statesIndex) + "\n"
	}

	return info + tx + steps
}
// MutString returns a short form of the mutation: "-" for
// machine.MutationRemove and "+" for every other type, followed by the called
// state names formatted with utils.J.
func (m *DbgMsgTx) MutString(statesIndex machine.S) string {
	sign := "+"
	switch m.Type {
	case machine.MutationRemove:
		sign = "-"
	}
	return sign + utils.J(m.CalledStateNames(statesIndex))
}
// Is1 is the single-state variant of Is.
func (m *DbgMsgTx) Is1(statesIndex machine.S, state string) bool {
	single := machine.S{state}
	return m.Is(statesIndex, single)
}
// TimeSum adds up all the clock values in Clocks.
func (m *DbgMsgTx) TimeSum() uint64 {
	var total uint64
	for i := range m.Clocks {
		total += m.Clocks[i]
	}
	return total
}
// client is an RPC connection to a debugger server.
type client struct {
// addr is the address the client was created with.
addr string
// rpc is the underlying net/rpc client, over TCP or a WebSocket connection.
rpc *rpc .Client
}
// newDbgClient connects to the debugger at addr and wraps the connection in
// an RPC client. With ws set, it dials "ws://<addr>/dbg.ws" using ctx and
// runs RPC over the binary WebSocket stream; otherwise it dials plain TCP
// ("tcp4"). A dial error is returned with a nil client.
func newDbgClient(ctx context.Context, addr string, ws bool) (*client, error) {
	if !ws {
		tcpCli, err := rpc.Dial("tcp4", addr)
		if err != nil {
			return nil, err
		}
		return &client{addr: addr, rpc: tcpCli}, nil
	}

	url := "ws://" + addr + "/dbg.ws"
	sock, _, err := websocket.Dial(ctx, url, &websocket.DialOptions{})
	if err != nil {
		return nil, fmt.Errorf("websocket.Dial failed %w", err)
	}
	// expose the websocket as a net.Conn so net/rpc can use it
	conn := websocket.NetConn(ctx, sock, websocket.MessageBinary)

	return &client{addr: addr, rpc: rpc.NewClient(conn)}, nil
}
// sendMsgTx delivers msg through the "RPCServer.DbgMsgTx" RPC method and
// returns the call's error, if any. The reply is discarded.
func (c *client) sendMsgTx(msg *DbgMsgTx) error {
	reply := ""
	return c.rpc.Call("RPCServer.DbgMsgTx", msg, &reply)
}
// sendMsgSchema delivers msg through the "RPCServer.DbgMsgSchema" RPC method
// and returns the call's error, if any. It is a no-op on a nil client.
func (c *client) sendMsgSchema(msg *DbgMsgStruct) error {
	if c == nil {
		return nil
	}

	reply := ""
	return c.rpc.Call("RPCServer.DbgMsgSchema", msg, &reply)
}
// Tracer is a machine.Tracer which forwards transitions and queued mutations
// to a debugger server. Messages are not sent inline; they are queued as
// closures on outbox and executed by a goroutine started in NewTracer.
type Tracer struct {
// TracerNoOp supplies the tracer methods not overridden here.
*machine .TracerNoOp
// Mach is the traced machine.
Mach machine .Api
// Addr is the debugger address to connect to.
Addr string
// Ws selects a WebSocket connection instead of plain TCP.
Ws bool
// outbox is the queue of pending send operations.
outbox chan func ()
// c is the debugger connection, set by the closure queued in MachineInit;
// nil until connected or when connecting failed.
c *client
// errCount counts failed sends; above 10, TransitionEnd detaches the tracer.
errCount atomic .Int32
// exited is set once detaching has been initiated.
exited atomic .Bool
// mx is held by MachineInit, TransitionEnd and MutationQueued while they
// touch the fields below.
mx sync .Mutex
// lastTx is the ID of the last finished transition.
lastTx string
// queued counts mutations queued since the last finished transition.
queued int
// lastMTime is the last known machine time, used as Clocks for queued
// mutations.
lastMTime machine .Time
}
// compile-time check that Tracer implements machine.Tracer
var _ machine .Tracer = &Tracer {}
// NewTracer creates a Tracer for mach which will report to addr, and starts
// the goroutine that executes queued send operations one by one. The
// goroutine stops when the machine's context is done.
func NewTracer(mach machine.Api, addr string) *Tracer {
	tracer := &Tracer{
		Mach:   mach,
		Addr:   addr,
		outbox: make(chan func(), 1000),
	}

	ctx := tracer.Mach.Context()
	worker := func() {
		for {
			select {
			case <-ctx.Done():
				return
			case job := <-tracer.outbox:
				job()
			}
		}
	}
	go worker()

	return tracer
}
// MachineInit binds the tracer to mach, remembers its current time, and
// queues an operation which connects to the debugger at t.Addr and sends the
// machine's schema. It always returns a nil context.
//
// Connecting happens asynchronously on the outbox goroutine; failures are
// reported on stderr when the machine.EnvAmLog env var is set, and are
// otherwise only visible as a nil connection to later sends.
func (t *Tracer) MachineInit(mach machine.Api) context.Context {
	t.mx.Lock()
	defer t.mx.Unlock()

	// Relation values travel inside the schema, so gob has to know the type.
	gob.Register(machine.Relation(0))
	t.Mach = mach
	t.lastMTime = mach.Time(nil)

	t.outbox <- func() {
		if mach.IsDisposed() {
			return
		}
		verbose := os.Getenv(machine.EnvAmLog) != ""

		c, err := newDbgClient(mach.Context(), t.Addr, t.Ws)
		t.c = c
		if err != nil {
			// Without a connection there is nothing to send the schema to,
			// regardless of whether logging is enabled.
			if verbose {
				fmt.Fprintf(os.Stderr, "failed to connect to am-dbg: %v\n", err)
			}
			return
		}

		if err := sendMsgSchema(mach, t.c); err != nil && verbose {
			fmt.Fprintln(os.Stderr, err)
		}
	}

	return nil
}
// SchemaChange refreshes the remembered machine time and queues a re-send of
// the machine's schema. A failed send is reported to the machine via AddErr.
func (t *Tracer) SchemaChange(mach machine.Api, _ machine.Schema) {
	t.lastMTime = mach.Time(nil)

	resend := func() {
		if err := sendMsgSchema(mach, t.c); err != nil {
			mach.AddErr(
				fmt.Errorf("failed to send new struct to am-dbg: %w", err), nil)
		}
	}
	t.outbox <- resend
}
// TransitionEnd builds a DbgMsgTx from the finished transition and queues it
// for sending to the debugger.
//
// After more than 10 failed sends the tracer detaches itself from the machine
// (once) and drops the current transition. Check-mutations are skipped unless
// the semantic logger has IsCan enabled. Diagnostics go to stderr, and only
// when the machine.EnvAmLog env var is set.
func (t *Tracer) TransitionEnd(tx *machine.Transition) {
	mach := tx.MachApi

	// CompareAndSwap makes sure only one caller initiates the detach, even
	// when transitions end concurrently.
	if t.errCount.Load() > 10 && t.exited.CompareAndSwap(false, true) {
		if os.Getenv(machine.EnvAmLog) != "" {
			fmt.Fprintln(os.Stderr, "too many am-dbg errors, detaching the tracer")
		}
		// detach asynchronously, as this method is called by the machine
		go func() {
			err := mach.DetachTracer(t)
			if err != nil && os.Getenv(machine.EnvAmLog) != "" {
				fmt.Fprintf(os.Stderr, "failed to detach am-dbg tracer: %v\n", err)
			}
		}()
		return
	}

	mut := tx.Mutation
	if mut.IsCheck && !mach.SemLogger().IsCan() {
		return
	}

	tx.InternalLogEntriesLock.Lock()
	defer tx.InternalLogEntriesLock.Unlock()
	t.mx.Lock()
	defer t.mx.Unlock()

	msg := &DbgMsgTx{
		MachineID:        mach.Id(),
		ID:               tx.Id,
		Clocks:           tx.TimeAfter,
		Accepted:         tx.IsAccepted.Load(),
		IsCheck:          mut.IsCheck,
		Type:             mut.Type,
		CalledStatesIdxs: mut.Called,
		Steps:            tx.Steps,
		LogEntries:       removeLogPrefix(mach, tx.LogEntries),
		PreLogEntries:    removeLogPrefix(mach, tx.PreLogEntries),
		IsAuto:           mut.IsAuto,
		Queue:            int(tx.QueueLen),
		QueueTick:        mach.QueueTick(),
		MutQueueToken:    mut.QueueToken,
	}
	semlog := mach.SemLogger()
	if semlog.IsArgs() {
		msg.Args = mut.MapArgs(semlog.ArgsMapper())
	}

	// bookkeeping used to label mutations queued after this transition
	t.lastTx = tx.Id
	t.lastMTime = tx.TimeAfter
	t.queued = 0

	t.outbox <- func() {
		if t.c == nil {
			if os.Getenv(machine.EnvAmLog) != "" {
				fmt.Fprintln(os.Stderr, "no connection to am-dbg")
			}
			t.errCount.Add(1)
			return
		}
		if err := t.c.sendMsgTx(msg); err != nil {
			if os.Getenv(machine.EnvAmLog) != "" {
				fmt.Fprintf(os.Stderr, "failed to send a msg to am-dbg: %v\n", err)
			}
			t.errCount.Add(1)
		}
	}
}
// MachineDispose closes the RPC connection, if there is one, in the
// background after a 100ms delay. The close error is ignored.
func (t *Tracer) MachineDispose(id string) {
	closeLater := func() {
		time.Sleep(100 * time.Millisecond)
		if t.c == nil || t.c.rpc == nil {
			return
		}
		_ = t.c.rpc.Close()
	}
	go closeLater()
}
// MutationQueued builds a DbgMsgTx for a mutation which has just been queued
// and queues it for sending to the debugger. The message is marked IsQueued,
// reuses the clocks of the last known machine time, and gets an ID made of
// the last transition's ID and a running counter.
//
// Check-mutations are skipped unless the semantic logger has IsCan enabled.
// Send failures are counted and reported on stderr when the
// machine.EnvAmLog env var is set.
func (t *Tracer) MutationQueued(mach machine.Api, mut *machine.Mutation) {
	if mut.IsCheck && !mach.SemLogger().IsCan() {
		return
	}

	t.mx.Lock()
	defer t.mx.Unlock()

	msg := &DbgMsgTx{
		MachineID:        mach.Id(),
		ID:               t.lastTx + "-" + strconv.Itoa(t.queued),
		Clocks:           t.lastMTime,
		Accepted:         true,
		IsCheck:          mut.IsCheck,
		Type:             mut.Type,
		CalledStatesIdxs: mut.Called,
		IsAuto:           mut.IsAuto,
		Queue:            int(mut.QueueLen),
		IsQueued:         true,
		QueueTick:        mach.QueueTick(),
		MutQueueTick:     mut.QueueTick,
		MutQueueToken:    mut.QueueToken,
	}
	semlog := mach.SemLogger()
	if semlog.IsArgs() {
		msg.Args = mut.MapArgs(semlog.ArgsMapper())
	}
	// always expose an error argument, even when args logging is off
	if err := machine.ParseArgs(mut.Args).Err; err != nil {
		if msg.Args == nil {
			msg.Args = make(map[string]string)
		}
		msg.Args["err"] = err.Error()
	}
	t.queued++

	t.outbox <- func() {
		if t.c == nil {
			if os.Getenv(machine.EnvAmLog) != "" {
				fmt.Fprintln(os.Stderr, "no connection to am-dbg")
			}
			t.errCount.Add(1)
			return
		}
		if err := t.c.sendMsgTx(msg); err != nil {
			if os.Getenv(machine.EnvAmLog) != "" {
				fmt.Fprintf(os.Stderr, "failed to send a msg to am-dbg: %v\n", err)
			}
			t.errCount.Add(1)
		}
	}
}
// Opts are optional settings for TransitionsToDbg.
type Opts struct {
// WebSocket makes the tracer connect over WebSocket instead of plain TCP.
WebSocket bool
}
// TransitionsToDbg attaches a debugger Tracer to mach, reporting to addr
// (DbgAddr when empty). It is a no-op when a Tracer for the same address is
// already bound. Only the first element of opts is used; a nil element is
// treated like no options.
//
// It returns the error of mach.BindTracer, if any.
func TransitionsToDbg(mach machine.Api, addr string, opts ...*Opts) error {
	if addr == "" {
		addr = DbgAddr
	}

	// avoid binding a second tracer for the same address
	for _, bound := range mach.Tracers() {
		if t, ok := bound.(*Tracer); ok && t.Addr == addr {
			return nil
		}
	}

	tracer := NewTracer(mach, addr)
	// Configure the tracer before it is bound, so it is never modified once
	// the machine can call into it. Guard against a nil *Opts.
	if len(opts) > 0 && opts[0] != nil {
		tracer.Ws = opts[0].WebSocket
	}

	if err := mach.BindTracer(tracer); err != nil {
		return err
	}
	// connect and send the schema
	tracer.MachineInit(mach)

	return nil
}
// sendMsgSchema collects the schema description of mach (ID, state names,
// schema, groups, parent and tags) into a DbgMsgStruct and sends it using
// client. A send error is wrapped and returned.
func sendMsgSchema(mach machine.Api, client *client) error {
	groups, order := mach.Groups()

	if err := client.sendMsgSchema(&DbgMsgStruct{
		ID:          mach.Id(),
		StatesIndex: mach.StateNames(),
		States:      mach.Schema(),
		Groups:      groups,
		GroupsOrder: order,
		Parent:      mach.ParentId(),
		Tags:        mach.Tags(),
	}); err != nil {
		return fmt.Errorf("failed to send a msg to am-dbg: %w", err)
	}

	return nil
}
// removeLogPrefix returns a copy of entries with the machine ID prefix cut
// from the start of every entry's text. The input slice and its entries are
// never modified.
//
// When the semantic logger does not log IDs, the copy is returned unchanged.
// Entries which are nil, or too short to contain the prefix, are kept as they
// are; they used to be replaced by nil, which dropped the log line and left
// nil elements in a slice that is later gob-encoded.
func removeLogPrefix(
	mach machine.Api, entries []*machine.LogEntry,
) []*machine.LogEntry {
	if !mach.SemLogger().IsId() {
		return slices.Clone(entries)
	}

	const (
		// the prefix contains at most this many characters of the ID
		maxIdLen = 5
		// NOTE(review): presumably the decoration around the ID (e.g.
		// brackets and a space) - confirm against the logger's format
		addChars = 3
	)
	prefixLen := min(len(mach.Id())+addChars, maxIdLen+addChars)

	ret := make([]*machine.LogEntry, len(entries))
	for i, le := range entries {
		if le == nil || len(le.Text) < prefixLen {
			// nothing to strip, keep the original entry
			ret[i] = le
			continue
		}
		ret[i] = &machine.LogEntry{
			Level: le.Level,
			Text:  le.Text[prefixLen:],
		}
	}

	return ret
}
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PRs and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.