package rpc
import (
"context"
"errors"
"fmt"
"maps"
"reflect"
"regexp"
"slices"
"strconv"
"strings"
"sync"
"sync/atomic"
"github.com/pancsta/asyncmachine-go/internal/utils"
amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
am "github.com/pancsta/asyncmachine-go/pkg/machine"
"github.com/pancsta/asyncmachine-go/pkg/rpc/states"
)
// ClockUpdateFunc is the signature of a callback applying a clock update. Its
// parameters mirror NetMachInternal.UpdateClock: the new machine time, the
// queue tick and the machine tick.
type ClockUpdateFunc func(now am.Time, qTick uint64, machTick uint32)
// NetMachConn is the transport a NetworkMachine uses to reach the remote side.
// NetworkMachine treats a false result from either method as a canceled
// mutation.
type NetMachConn interface {
	// Call sends method with args and passes resp to the transport; callers in
	// this file read the result from resp afterwards.
	Call(ctx context.Context, method ServerMethod, args any, resp any) bool
	// Notify sends method with args without a response value.
	Notify(ctx context.Context, method ServerMethod, args any) bool
}
// NetMachInternal exposes selected internals of a NetworkMachine (clock
// updates and the clock mutex) to the code that owns the connection.
type NetMachInternal struct {
	// nm is the wrapped machine.
	nm *NetworkMachine
}
// UpdateClock forwards a clock update to the wrapped machine.
//
// The underlying updateClock releases the clock mutex without acquiring it,
// so Lock must be called before UpdateClock.
func (i *NetMachInternal) UpdateClock(
	now am.Time, qTick uint64, machTick uint32,
) {
	mach := i.nm
	mach.updateClock(now, qTick, machTick)
}
// Lock acquires the wrapped machine's clock mutex for writing.
func (i *NetMachInternal) Lock() {
	mach := i.nm
	mach.clockMx.Lock()
}
// Unlock releases the wrapped machine's clock mutex.
func (i *NetMachInternal) Unlock() {
	mach := i.nm
	mach.clockMx.Unlock()
}
// NetworkMachine is a local stand-in for a state machine living on the other
// end of a NetMachConn. Mutations are sent over the connection, while reads
// are answered from a local copy of the clock maintained by updateClock.
type NetworkMachine struct {
	// LogStackTrace makes AddErrState / EvAddErrState log a stack trace.
	LogStackTrace bool

	// conn carries mutations to the remote side; nil cancels every mutation.
	conn     NetMachConn
	remoteId string
	subs     *am.Subscriptions
	id       string
	ctx      context.Context
	disposed atomic.Bool
	// err holds the last error passed to AddErrState / EvAddErrState.
	err    atomic.Pointer[error]
	schema am.Schema

	// clockMx guards machTime, machClock, queueTick and machTick.
	clockMx sync.RWMutex
	// schemaMx guards stateNames.
	schemaMx  sync.RWMutex
	machTime  am.Time
	machClock am.Clock
	queueTick uint64

	stateNames am.S
	// activeStates is replaced wholesale on every clock update.
	activeStates    atomic.Pointer[am.S]
	activeStatesDbg am.S

	indexStateCtx am.IndexStateCtx
	indexWhen     am.IndexWhen
	indexWhenTime am.IndexWhenTime
	// whenDisposed is closed by Dispose.
	whenDisposed chan struct{}

	// tracersMx guards tracers in BindTracer, DetachTracer and updateClock.
	tracers   []am.Tracer
	tracersMx sync.RWMutex
	// handlersMx guards handlers (BindHandlers, getHandlers).
	handlers   []*handler
	handlersMx sync.Mutex

	parentId string
	tags     []string

	logLevel atomic.Pointer[am.LogLevel]
	logger   atomic.Pointer[am.LoggerFn]
	// logEntriesLock guards appends to logEntries in log.
	logEntriesLock sync.Mutex
	logEntries     []*am.LogEntry
	// logId toggles the "[id] " prefix on log messages.
	logId     atomic.Bool
	semLogger *semLogger

	machTick uint32
	// t is the transition currently being processed by updateClock, if any.
	t               atomic.Pointer[am.Transition]
	disposeHandlers []am.HandlerDispose
	filterMutations bool
}
var (
	// ssNS is a short alias for the network-source state names.
	ssNS = states.NetSourceStates

	// Compile-time check that *NetworkMachine implements am.Api.
	_ am.Api = (*NetworkMachine)(nil)
)
// NewNetworkMachine builds a NetworkMachine bound to conn, together with the
// NetMachInternal handle used to push clock updates into it.
//
// It returns an error when ctx or parent is nil, or when a non-nil schema has
// a different length than stateNames. A nil tags slice is replaced with
// default tags. The new machine is disposed when parent is disposed.
//
// NOTE(review): ctx is only nil-checked; the machine's context is taken from
// parent.Ctx() — confirm this is intended.
func NewNetworkMachine(
	ctx context.Context, id string, conn NetMachConn, schema am.Schema,
	stateNames am.S, parent *am.Machine, tags []string, filterMutations bool,
) (*NetworkMachine, *NetMachInternal, error) {
	// validate, in the same order as the reported errors
	switch {
	case ctx == nil:
		return nil, nil, errors.New("ctx cannot be nil")
	case schema != nil && len(schema) != len(stateNames):
		return nil, nil, errors.New(
			"schema and stateNames must have the same length")
	case parent == nil:
		return nil, nil, errors.New("parent cannot be nil")
	}
	if tags == nil {
		tags = []string{"rpc-worker", "src-id:"}
	}

	// every known state starts with a zero tick
	clock := am.Clock{}
	for _, name := range stateNames {
		clock[name] = 0
	}

	mach := &NetworkMachine{
		LogStackTrace:   true,
		conn:            conn,
		id:              id,
		ctx:             parent.Ctx(),
		schema:          schema,
		stateNames:      stateNames,
		indexWhen:       am.IndexWhen{},
		indexStateCtx:   am.IndexStateCtx{},
		indexWhenTime:   am.IndexWhenTime{},
		whenDisposed:    make(chan struct{}),
		machTime:        make(am.Time, len(stateNames)),
		machClock:       clock,
		queueTick:       1,
		parentId:        parent.Id(),
		tags:            tags,
		filterMutations: filterMutations,
	}
	mach.logId.Store(true)
	mach.subs = am.NewSubscriptionManager(mach, mach.machClock,
		mach.is, mach.not, mach.log)
	mach.semLogger = &semLogger{mach: mach}

	// logging is off and no state is active until told otherwise
	level := am.LogNothing
	mach.logLevel.Store(&level)
	mach.activeStates.Store(&am.S{})

	// tie the lifetime to the parent machine
	parent.OnDispose(func(_ string, _ context.Context) {
		mach.Dispose()
	})

	return mach, &NetMachInternal{mach}, nil
}
// Add asks the remote machine to add states and returns its result.
//
// It returns am.Canceled when there is no connection, when the mutation is
// filtered out locally, or when the call fails. Unknown states panic via
// MustParseStates.
func (m *NetworkMachine) Add(states am.S, args am.A) am.Result {
	if m.conn == nil {
		return am.Canceled
	}
	m.MustParseStates(states)
	if m.filterMutations && !m.mutAccepted(am.MutationAdd, states) {
		return am.Canceled
	}

	// states travel as indexes into the state names list
	req := &MsgCliMutation{
		States: amhelp.StatesToIndexes(m.StateNames(), states),
		Args:   args,
	}
	reply := &MsgSrvMutation{}
	if ok := m.conn.Call(m.Ctx(), ServerAdd, req, reply); !ok {
		return am.Canceled
	}

	return reply.Result
}
// Add1 is Add for a single state. The missing-connection case is handled by
// Add, which returns am.Canceled before doing anything else.
func (m *NetworkMachine) Add1(state string, args am.A) am.Result {
	single := am.S{state}
	return m.Add(single, args)
}
// AddNS asks the remote machine to add states using a notification, so no
// server result is read back.
//
// It returns am.Executed once the notification was sent, and am.Canceled when
// there is no connection, the mutation is filtered out locally, or sending
// fails. Unknown states panic via MustParseStates.
func (m *NetworkMachine) AddNS(states am.S, args am.A) am.Result {
	if m.conn == nil {
		return am.Canceled
	}
	m.MustParseStates(states)
	if m.filterMutations && !m.mutAccepted(am.MutationAdd, states) {
		return am.Canceled
	}

	req := &MsgCliMutation{
		States: amhelp.StatesToIndexes(m.StateNames(), states),
		Args:   args,
	}
	if sent := m.conn.Notify(m.Ctx(), ServerAddNS, req); sent {
		return am.Executed
	}

	return am.Canceled
}
// Add1NS is AddNS for a single state. The missing-connection case is handled
// by AddNS, which returns am.Canceled before doing anything else.
func (m *NetworkMachine) Add1NS(state string, args am.A) am.Result {
	single := am.S{state}
	return m.AddNS(single, args)
}
// Remove asks the remote machine to remove states and returns its result.
//
// It returns am.Canceled when there is no connection, when the mutation is
// filtered out locally, or when the call fails. Unknown states panic via
// MustParseStates.
func (m *NetworkMachine) Remove(states am.S, args am.A) am.Result {
	if m.conn == nil {
		return am.Canceled
	}
	m.MustParseStates(states)
	if m.filterMutations && !m.mutAccepted(am.MutationRemove, states) {
		return am.Canceled
	}

	req := &MsgCliMutation{
		States: amhelp.StatesToIndexes(m.StateNames(), states),
		Args:   args,
	}
	reply := &MsgSrvMutation{}
	if ok := m.conn.Call(m.Ctx(), ServerRemove, req, reply); !ok {
		return am.Canceled
	}

	return reply.Result
}
// Remove1 is Remove for a single state. The missing-connection case is
// handled by Remove, which returns am.Canceled before doing anything else.
func (m *NetworkMachine) Remove1(state string, args am.A) am.Result {
	single := am.S{state}
	return m.Remove(single, args)
}
// Set asks the remote machine to set states and returns its result.
//
// It returns am.Canceled when there is no connection, when the mutation is
// filtered out locally, or when the call fails. Unknown states panic via
// MustParseStates.
func (m *NetworkMachine) Set(states am.S, args am.A) am.Result {
	if m.conn == nil {
		return am.Canceled
	}
	m.MustParseStates(states)
	if m.filterMutations && !m.mutAccepted(am.MutationSet, states) {
		return am.Canceled
	}

	req := &MsgCliMutation{
		States: amhelp.StatesToIndexes(m.StateNames(), states),
		Args:   args,
	}
	reply := &MsgSrvMutation{}
	if ok := m.conn.Call(m.Ctx(), ServerSet, req, reply); !ok {
		return am.Canceled
	}

	return reply.Result
}
// AddErr reports err through the generic am.StateException state. The
// missing-connection case is handled by AddErrState.
func (m *NetworkMachine) AddErr(err error, args am.A) am.Result {
	const errState = am.StateException
	return m.AddErrState(errState, err, args)
}
// AddErrState records err as the machine's last error and asks the remote
// machine to add state together with am.StateException. When the machine
// defines the ErrOnClient state, that state is added as well.
//
// It returns am.Canceled when there is no connection or the machine has been
// disposed; otherwise it returns the result of Add.
func (m *NetworkMachine) AddErrState(
	state string, err error, args am.A,
) am.Result {
	if m.conn == nil || m.disposed.Load() {
		return am.Canceled
	}

	m.err.Store(&err)
	if m.LogStackTrace {
		trace := utils.CaptureStackTrace()
		// Pass the values as arguments. Pre-formatting the text and handing it
		// over as the format string would re-interpret any '%' found in the
		// error or the trace.
		m.log(am.LogChanges, "ERROR: %s\nTrace:\n%s", err, trace)
	}

	// the error itself travels in the typed arguments
	argsT := &am.AT{Err: err}
	errStates := am.S{state, am.StateException}
	if m.Has1(ssNS.ErrOnClient) {
		errStates = append(errStates, ssNS.ErrOnClient)
	}

	return m.Add(errStates, am.PassMerge(args, argsT))
}
// EvAdd is like Add, but also sends the exported source event with the
// request. Unlike Add, it applies no local mutation filtering.
//
// It returns am.Canceled when there is no connection or the call fails.
// Unknown states panic via MustParseStates.
func (m *NetworkMachine) EvAdd(
	event *am.Event, states am.S, args am.A,
) am.Result {
	if m.conn == nil {
		return am.Canceled
	}
	m.MustParseStates(states)

	req := &MsgCliMutation{
		States: amhelp.StatesToIndexes(m.StateNames(), states),
		Args:   args,
		Event:  event.Export(),
	}
	reply := &MsgSrvMutation{}
	if ok := m.conn.Call(m.Ctx(), ServerAdd, req, reply); !ok {
		return am.Canceled
	}

	return reply.Result
}
// EvAdd1 is EvAdd for a single state. The missing-connection case is handled
// by EvAdd, which returns am.Canceled before doing anything else.
func (m *NetworkMachine) EvAdd1(
	event *am.Event, state string, args am.A,
) am.Result {
	single := am.S{state}
	return m.EvAdd(event, single, args)
}
// EvRemove1 is EvRemove for a single state. The missing-connection case is
// handled by EvRemove, which returns am.Canceled before doing anything else.
func (m *NetworkMachine) EvRemove1(
	event *am.Event, state string, args am.A,
) am.Result {
	single := am.S{state}
	return m.EvRemove(event, single, args)
}
// EvRemove is like Remove, but also sends the exported source event with the
// request. Unlike Remove, it applies no local mutation filtering.
//
// It returns am.Canceled when there is no connection or the call fails.
// Unknown states panic via MustParseStates.
func (m *NetworkMachine) EvRemove(
	event *am.Event, states am.S, args am.A,
) am.Result {
	if m.conn == nil {
		return am.Canceled
	}
	m.MustParseStates(states)

	req := &MsgCliMutation{
		States: amhelp.StatesToIndexes(m.StateNames(), states),
		Args:   args,
		Event:  event.Export(),
	}
	reply := &MsgSrvMutation{}
	if ok := m.conn.Call(m.Ctx(), ServerRemove, req, reply); !ok {
		return am.Canceled
	}

	return reply.Result
}
// EvAddErr reports err through the generic am.StateException state, passing
// the source event along. The missing-connection case is handled by
// EvAddErrState.
func (m *NetworkMachine) EvAddErr(
	event *am.Event, err error, args am.A,
) am.Result {
	const errState = am.StateException
	return m.EvAddErrState(event, errState, err, args)
}
// EvAddErrState records err as the machine's last error and asks the remote
// machine to add state together with am.StateException, passing the source
// event along. When the machine defines the ErrOnClient state, that state is
// added as well.
//
// It returns am.Canceled when there is no connection or the machine has been
// disposed (matching AddErrState); otherwise it returns the result of EvAdd.
func (m *NetworkMachine) EvAddErrState(
	event *am.Event, state string, err error, args am.A,
) am.Result {
	if m.conn == nil || m.disposed.Load() {
		return am.Canceled
	}

	m.err.Store(&err)
	if m.LogStackTrace {
		trace := utils.CaptureStackTrace()
		// Pass the values as arguments. Pre-formatting the text and handing it
		// over as the format string would re-interpret any '%' found in the
		// error or the trace.
		m.log(am.LogChanges, "ERROR: %s\nTrace:\n%s", err, trace)
	}

	// the error itself travels in the typed arguments
	argsT := &am.AT{Err: err}
	errStates := am.S{state, am.StateException}
	if m.Has1(ssNS.ErrOnClient) {
		errStates = append(errStates, ssNS.ErrOnClient)
	}

	return m.EvAdd(event, errStates, am.PassMerge(args, argsT))
}
// Toggle removes states when all of them are currently active and adds them
// otherwise. It returns am.Canceled when there is no connection.
func (m *NetworkMachine) Toggle(states am.S, args am.A) am.Result {
	if m.conn == nil {
		return am.Canceled
	}
	if !m.Is(states) {
		return m.Add(states, args)
	}

	return m.Remove(states, args)
}
// Toggle1 removes state when it is currently active and adds it otherwise.
// It returns am.Canceled when there is no connection.
func (m *NetworkMachine) Toggle1(state string, args am.A) am.Result {
	if m.conn == nil {
		return am.Canceled
	}
	if !m.Is1(state) {
		return m.Add1(state, args)
	}

	return m.Remove1(state, args)
}
// EvToggle is Toggle with a source event: it removes states when all of them
// are active and adds them otherwise. It returns am.Canceled when there is no
// connection.
func (m *NetworkMachine) EvToggle(
	e *am.Event, states am.S, args am.A,
) am.Result {
	if m.conn == nil {
		return am.Canceled
	}
	if !m.Is(states) {
		return m.EvAdd(e, states, args)
	}

	return m.EvRemove(e, states, args)
}
// EvToggle1 is Toggle1 with a source event: it removes state when it is
// active and adds it otherwise. It returns am.Canceled when there is no
// connection.
func (m *NetworkMachine) EvToggle1(
	e *am.Event, state string, args am.A,
) am.Result {
	if m.conn == nil {
		return am.Canceled
	}
	if !m.Is1(state) {
		return m.EvAdd1(e, state, args)
	}

	return m.EvRemove1(e, state, args)
}
// Is reports whether all the given states are currently active, according to
// the local copy of the active states.
func (m *NetworkMachine) Is(states am.S) bool {
	allActive := m.is(states)
	return allActive
}
// Is1 reports whether the single given state is currently active.
func (m *NetworkMachine) Is1(state string) bool {
	single := am.S{state}
	return m.Is(single)
}
// is reports whether every one of states is in the current active set.
// Unknown states panic via MustParseStates.
func (m *NetworkMachine) is(states am.S) bool {
	current := m.ActiveStates(nil)
	wanted := m.MustParseStates(states)
	for i := range wanted {
		if !slices.Contains(current, wanted[i]) {
			return false
		}
	}

	return true
}
// IsErr reports whether the am.StateException state is currently active.
func (m *NetworkMachine) IsErr() bool {
	const errState = am.StateException
	return m.Is1(errState)
}
// Not reports whether none of the given states is currently active. The check
// runs under the clock read lock.
func (m *NetworkMachine) Not(states am.S) bool {
	m.clockMx.RLock()
	defer m.clockMx.RUnlock()

	noneActive := m.not(states)
	return noneActive
}
// not is the lock-free body of Not. Unknown states panic via
// MustParseStates.
func (m *NetworkMachine) not(states am.S) bool {
	wanted := m.MustParseStates(states)
	current := m.ActiveStates(nil)
	return utils.SlicesNone(wanted, current)
}
// Not1 reports whether the single given state is currently inactive.
func (m *NetworkMachine) Not1(state string) bool {
	single := am.S{state}
	return m.Not(single)
}
// Any reports whether at least one of the given groups is fully active (see
// Is). Groups are checked in order and the check stops at the first match.
func (m *NetworkMachine) Any(states ...am.S) bool {
	return slices.ContainsFunc(states, m.Is)
}
// Any1 reports whether at least one of the given states is active (see Is1).
// States are checked in order and the check stops at the first match.
func (m *NetworkMachine) Any1(states ...string) bool {
	return slices.ContainsFunc(states, m.Is1)
}
// Has reports whether all the given states are defined for this machine,
// i.e. present in its state names.
func (m *NetworkMachine) Has(states am.S) bool {
	known := m.StateNames()
	return utils.SlicesEvery(known, states)
}
// Has1 reports whether the single given state is defined for this machine.
func (m *NetworkMachine) Has1(state string) bool {
	single := am.S{state}
	return m.Has(single)
}
// IsClock reports whether the machine time of every state in clock equals
// the tick given for it.
func (m *NetworkMachine) IsClock(clock am.Clock) bool {
	m.clockMx.RLock()
	defer m.clockMx.RUnlock()

	for name, want := range clock {
		pos := m.Index1(name)
		if got := m.machTime[pos]; got != want {
			return false
		}
	}

	return true
}
// WasClock reports whether the machine time of every state in clock has
// reached (is at least) the tick given for it.
func (m *NetworkMachine) WasClock(clock am.Clock) bool {
	m.clockMx.RLock()
	defer m.clockMx.RUnlock()

	for name, want := range clock {
		pos := m.Index1(name)
		if got := m.machTime[pos]; got < want {
			return false
		}
	}

	return true
}
// IsTime reports whether the machine time of states equals t, comparing
// t[i] against the tick of states[i]. A nil states means all state names.
// Unknown states panic via MustParseStates.
func (m *NetworkMachine) IsTime(t am.Time, states am.S) bool {
	m.MustParseStates(states)
	m.clockMx.RLock()
	defer m.clockMx.RUnlock()

	names := m.StateNames()
	if states == nil {
		states = names
	}
	for i := range t {
		pos := slices.Index(names, states[i])
		if m.machTime[pos] != t[i] {
			return false
		}
	}

	return true
}
// WasTime reports whether the machine time of states has reached t, i.e. the
// tick of states[i] is at least t[i]. A nil states means all state names.
// Unknown states panic via MustParseStates.
func (m *NetworkMachine) WasTime(t am.Time, states am.S) bool {
	m.MustParseStates(states)
	m.clockMx.RLock()
	defer m.clockMx.RUnlock()

	names := m.StateNames()
	if states == nil {
		states = names
	}
	for i := range t {
		pos := slices.Index(names, states[i])
		if m.machTime[pos] < t[i] {
			return false
		}
	}

	return true
}
// Switch returns the first active state found while scanning the groups in
// order, or an empty string when none of the listed states is active.
func (m *NetworkMachine) Switch(groups ...am.S) string {
	current := m.ActiveStates(nil)
	for _, group := range groups {
		for i := range group {
			if name := group[i]; slices.Contains(current, name) {
				return name
			}
		}
	}

	return ""
}
// WhenErr returns the When channel for the am.StateException state.
func (m *NetworkMachine) WhenErr(disposeCtx context.Context) <-chan struct{} {
	errStates := am.S{am.StateException}
	return m.When(errStates, disposeCtx)
}
// When registers a subscription for states with the subscription manager and
// returns its channel. On a disposed machine it returns an already-closed
// channel. Unknown states panic via MustParseStates.
func (m *NetworkMachine) When(
	states am.S, ctx context.Context,
) <-chan struct{} {
	if m.disposed.Load() {
		return newClosedChan()
	}

	m.clockMx.Lock()
	defer m.clockMx.Unlock()

	parsed := m.MustParseStates(states)
	return m.subs.When(parsed, ctx)
}
// When1 is When for a single state.
func (m *NetworkMachine) When1(
	state string, ctx context.Context,
) <-chan struct{} {
	single := am.S{state}
	return m.When(single, ctx)
}
// WhenNot registers a "not active" subscription for states with the
// subscription manager and returns its channel. On a disposed machine it
// returns an already-closed channel. Unknown states panic via
// MustParseStates.
func (m *NetworkMachine) WhenNot(
	states am.S, ctx context.Context,
) <-chan struct{} {
	if m.disposed.Load() {
		closed := make(chan struct{})
		close(closed)
		return closed
	}

	m.clockMx.Lock()
	defer m.clockMx.Unlock()

	parsed := m.MustParseStates(states)
	return m.subs.WhenNot(parsed, ctx)
}
// WhenNot1 is WhenNot for a single state.
func (m *NetworkMachine) WhenNot1(
	state string, ctx context.Context,
) <-chan struct{} {
	single := am.S{state}
	return m.WhenNot(single, ctx)
}
// WhenTime registers a time subscription for states and times (matched by
// position) and returns its channel.
//
// It returns an already-closed channel on a disposed machine, and also when
// the two lists differ in length, in which case the mismatch is additionally
// reported through AddErr.
func (m *NetworkMachine) WhenTime(
	states am.S, times am.Time, ctx context.Context,
) <-chan struct{} {
	if m.disposed.Load() {
		return newClosedChan()
	}

	if len(times) != len(states) {
		mismatch := fmt.Errorf(
			"whenTime: states and times must have the same length (%s)",
			utils.J(states))
		m.AddErr(mismatch, nil)

		return newClosedChan()
	}

	m.clockMx.Lock()
	defer m.clockMx.Unlock()

	return m.subs.WhenTime(states, times, ctx)
}
// WhenTime1 is WhenTime for a single state and tick value.
func (m *NetworkMachine) WhenTime1(
	state string, ticks uint64, ctx context.Context,
) <-chan struct{} {
	single, target := am.S{state}, am.Time{ticks}
	return m.WhenTime(single, target, ctx)
}
// WhenTicks is WhenTime for a single state, with the target expressed
// relative to the state's current tick: current tick + ticks.
func (m *NetworkMachine) WhenTicks(
	state string, ticks int, ctx context.Context,
) <-chan struct{} {
	target := uint64(ticks) + m.Tick(state)
	return m.WhenTime(am.S{state}, am.Time{target}, ctx)
}
// WhenQuery registers clockCheck as a query subscription with the
// subscription manager, under the clock lock, and returns its channel.
func (m *NetworkMachine) WhenQuery(
	clockCheck func(clock am.Clock) bool, ctx context.Context,
) <-chan struct{} {
	m.clockMx.Lock()
	defer m.clockMx.Unlock()

	ch := m.subs.WhenQuery(clockCheck, ctx)
	return ch
}
// WhenQueue returns a channel for the given queue tick. When the local queue
// tick has already reached it, the channel is returned closed; otherwise the
// subscription is registered with the subscription manager.
func (m *NetworkMachine) WhenQueue(tick am.Result) <-chan struct{} {
	m.clockMx.Lock()
	defer m.clockMx.Unlock()

	if target := uint64(tick); target <= m.queueTick {
		return newClosedChan()
	}

	return m.subs.WhenQueue(tick)
}
// WhenArgs ignores its parameters and always returns an already-closed
// channel.
func (m *NetworkMachine) WhenArgs(
	state string, args am.A, ctx context.Context,
) <-chan struct{} {
	closed := newClosedChan()
	return closed
}
// Err returns the last error stored by AddErrState / EvAddErrState, or nil
// when none has been stored.
func (m *NetworkMachine) Err() error {
	if stored := m.err.Load(); stored != nil {
		return *stored
	}

	return nil
}
// StateNames returns a copy of the machine's state names, so callers may
// modify the result freely.
//
// Only a read lock is taken: the list is merely copied, and an exclusive lock
// here blocks forever when the caller already holds schemaMx for reading.
func (m *NetworkMachine) StateNames() am.S {
	m.schemaMx.RLock()
	defer m.schemaMx.RUnlock()

	return slices.Clone(m.stateNames)
}
// StateNamesMatch returns the state names matched by re, in state-names
// order. The result is empty, but never nil, when nothing matches.
func (m *NetworkMachine) StateNamesMatch(re *regexp.Regexp) am.S {
	matched := am.S{}
	names := m.StateNames()
	for i := range names {
		if !re.MatchString(names[i]) {
			continue
		}
		matched = append(matched, names[i])
	}

	return matched
}
// ActiveStates returns the currently active states.
//
// With a nil states it returns a copy of the whole active set. Otherwise it
// returns only those of the given states which are active, in the order they
// were passed.
func (m *NetworkMachine) ActiveStates(states am.S) am.S {
	active := *m.activeStates.Load()
	if states == nil {
		return slices.Clone(active)
	}

	ret := make(am.S, 0, len(states))
	// Filter the requested states. Ranging over the active set and testing
	// membership in that same set would ignore the filter altogether.
	for _, state := range states {
		if slices.Contains(active, state) {
			ret = append(ret, state)
		}
	}

	return ret
}
// Tick returns the current clock tick of state, read under the clock read
// lock.
func (m *NetworkMachine) Tick(state string) uint64 {
	m.clockMx.RLock()
	defer m.clockMx.RUnlock()

	current := m.tick(state)
	return current
}
// tick is the lock-free body of Tick: a plain lookup in the machine clock,
// which yields zero for a state missing from the map.
func (m *NetworkMachine) tick(state string) uint64 {
	current := m.machClock[state]
	return current
}
// Clock returns a new map with the current ticks of states. A nil states
// means all state names.
func (m *NetworkMachine) Clock(states am.S) am.Clock {
	m.clockMx.RLock()
	defer m.clockMx.RUnlock()

	names := m.StateNames()
	if states == nil {
		states = names
	}

	snapshot := am.Clock{}
	for i := range states {
		name := states[i]
		snapshot[name] = m.machClock[name]
	}

	return snapshot
}
// Time returns the machine time of states, read under the clock read lock.
// A nil states means all state names.
func (m *NetworkMachine) Time(states am.S) am.Time {
	m.clockMx.RLock()
	defer m.clockMx.RUnlock()

	now := m.time(states)
	return now
}
// time is the lock-free body of Time. It collects the ticks of states in the
// order given; a nil states means all state names.
func (m *NetworkMachine) time(states am.S) am.Time {
	names := m.StateNames()
	if states == nil {
		states = names
	}

	ticks := am.Time{}
	for i := range states {
		pos := slices.Index(names, states[i])
		ticks = append(ticks, m.machTime[pos])
	}

	return ticks
}
// NewStateCtx returns a context bound to state.
//
// An existing binding for the state is reused. Otherwise a new cancelable
// context derived from the machine's context is created, carrying an
// am.CtxValue with the machine ID, the state and its current tick. When the
// state is not active, that context is returned already canceled and is not
// stored.
func (m *NetworkMachine) NewStateCtx(state string) context.Context {
	m.clockMx.Lock()
	defer m.clockMx.Unlock()

	if existing, found := m.indexStateCtx[state]; found {
		return existing.Ctx
	}

	meta := am.CtxValue{
		Id:    m.id,
		State: state,
		Tick:  m.machClock[state],
	}
	parent := context.WithValue(m.Ctx(), am.CtxKey, meta)
	stateCtx, cancel := context.WithCancel(parent)

	// inactive state: hand out an expired context
	if active := m.is(am.S{state}); !active {
		cancel()
		return stateCtx
	}

	m.indexStateCtx[state] = &am.CtxBinding{
		Ctx:    stateCtx,
		Cancel: cancel,
	}
	m.log(am.LogOps, "[ctx:new] %s", state)

	return stateCtx
}
// Log writes a formatted message at the am.LogExternal level.
func (m *NetworkMachine) Log(msg string, args ...any) {
	const level = am.LogExternal
	m.log(level, msg, args...)
}
// SemLogger returns the machine's semantic logger.
func (m *NetworkMachine) SemLogger() am.SemLogger {
	sl := m.semLogger
	return sl
}
// StatesVerified always reports true for a NetworkMachine.
func (m *NetworkMachine) StatesVerified() bool {
	const verified = true
	return verified
}
// Ctx returns the machine's context (set from the parent machine's context
// in NewNetworkMachine).
func (m *NetworkMachine) Ctx() context.Context {
	ctx := m.ctx
	return ctx
}
// Id returns the ID of this (local) machine.
func (m *NetworkMachine) Id() string {
	id := m.id
	return id
}
// RemoteId returns the stored remote ID; presumably the ID of the machine on
// the other end of the connection.
func (m *NetworkMachine) RemoteId() string {
	id := m.remoteId
	return id
}
// ParentId returns the ID of the parent machine passed to NewNetworkMachine.
func (m *NetworkMachine) ParentId() string {
	id := m.parentId
	return id
}
// Tags returns the machine's tags. The slice is returned as-is, not copied.
func (m *NetworkMachine) Tags() []string {
	tags := m.tags
	return tags
}
// String renders the active states with their ticks as "(Name:tick ...)",
// in state-names order.
func (m *NetworkMachine) String() string {
	m.clockMx.RLock()
	defer m.clockMx.RUnlock()

	names := m.StateNames()
	current := m.ActiveStates(nil)

	var out strings.Builder
	out.WriteString("(")
	for _, name := range names {
		if !slices.Contains(current, name) {
			continue
		}
		// separate entries; the first one follows the opening paren directly
		if out.Len() > 1 {
			out.WriteString(" ")
		}
		pos := slices.Index(names, name)
		out.WriteString(fmt.Sprintf("%s:%d", name, m.machTime[pos]))
	}
	out.WriteString(")")

	return out.String()
}
// StringAll renders all states with their ticks as "(active...) [inactive...]",
// each group in state-names order.
func (m *NetworkMachine) StringAll() string {
	m.clockMx.RLock()
	defer m.clockMx.RUnlock()

	names := m.StateNames()
	current := m.ActiveStates(nil)

	var on, off strings.Builder
	on.WriteString("(")
	off.WriteString("[")
	for _, name := range names {
		pos := slices.Index(names, name)

		// pick the group this state belongs to
		group := &off
		if slices.Contains(current, name) {
			group = &on
		}
		// separate entries; the first one follows the opening bracket directly
		if group.Len() > 1 {
			group.WriteString(" ")
		}
		group.WriteString(fmt.Sprintf("%s:%d", name, m.machTime[pos]))
	}

	return on.String() + ") " + off.String() + "]"
}
// Inspect returns a multi-line, human-readable description of states: the
// active flag (1 or 0), the name, the tick and every non-empty schema
// property (Auto, Multi, Add, Require, Remove, After). A nil states means
// all state names.
func (m *NetworkMachine) Inspect(states am.S) string {
	m.clockMx.RLock()
	defer m.clockMx.RUnlock()

	names := m.StateNames()
	if states == nil {
		states = names
	}
	current := m.ActiveStates(nil)

	var out strings.Builder
	for _, name := range states {
		def := m.schema[name]

		flag := "0"
		if slices.Contains(current, name) {
			flag = "1"
		}
		pos := slices.Index(names, name)
		out.WriteString(fmt.Sprintf("%s %s\n"+
			"    |Tick     %d\n", flag, name, m.machTime[pos]))

		// list only the properties which are set
		if def.Auto {
			out.WriteString("    |Auto     true\n")
		}
		if def.Multi {
			out.WriteString("    |Multi    true\n")
		}
		if def.Add != nil {
			out.WriteString("    |Add      " + utils.J(def.Add) + "\n")
		}
		if def.Require != nil {
			out.WriteString("    |Require  " + utils.J(def.Require) + "\n")
		}
		if def.Remove != nil {
			out.WriteString("    |Remove   " + utils.J(def.Remove) + "\n")
		}
		if def.After != nil {
			out.WriteString("    |After    " + utils.J(def.After) + "\n")
		}
	}

	return out.String()
}
// log emits a formatted message when level is within the semantic logger's
// level.
//
// With the ID prefix enabled, the message is prefixed with "[id] ", using at
// most the first 5 bytes of the machine ID. The message goes to the
// configured logger, or to stdout when there is none, and is also buffered
// in logEntries.
func (m *NetworkMachine) log(level am.LogLevel, msg string, args ...any) {
	if level > m.semLogger.Level() {
		return
	}

	if m.logId.Load() {
		short := m.id
		if len(short) > 5 {
			short = short[:5]
		}
		msg = "[" + short + "] " + msg
	}

	text := fmt.Sprintf(msg, args...)
	if sink := m.semLogger.Logger(); sink != nil {
		sink(level, msg, args...)
	} else {
		fmt.Println(text)
	}

	entry := &am.LogEntry{
		Level: level,
		Text:  text,
	}
	m.logEntriesLock.Lock()
	m.logEntries = append(m.logEntries, entry)
	m.logEntriesLock.Unlock()
}
// MustParseStates returns states with duplicates removed. It panics when any
// of them is missing from the machine's state names.
func (m *NetworkMachine) MustParseStates(states am.S) am.S {
	m.schemaMx.Lock()
	defer m.schemaMx.Unlock()

	for i := range states {
		if slices.Contains(m.stateNames, states[i]) {
			continue
		}
		panic(fmt.Sprintf("state %s is not defined for %s (via %s)", states[i],
			m.remoteId, m.id))
	}

	return utils.SlicesUniq(states)
}
// Index1 returns the position of state within the state names, or -1 when
// the state is not defined.
func (m *NetworkMachine) Index1(state string) int {
	names := m.StateNames()
	return slices.Index(names, state)
}
// Index maps every one of states to its position within the state names
// (see Index1), keeping the input order.
func (m *NetworkMachine) Index(states am.S) []int {
	positions := make([]int, 0, len(states))
	for _, name := range states {
		positions = append(positions, m.Index1(name))
	}

	return positions
}
// Dispose marks the machine as disposed, notifies all bound tracers, runs the
// dispose handlers registered with OnDispose and finally closes the
// WhenDisposed channel. Only the first call has any effect.
func (m *NetworkMachine) Dispose() {
	if !m.disposed.CompareAndSwap(false, true) {
		return
	}

	// Snapshot both lists under the locks that actually guard them: tracers
	// are written under tracersMx, dispose handlers under handlersMx (see
	// OnDispose). The callbacks then run without any lock held, so they are
	// free to call back into the machine.
	m.tracersMx.RLock()
	tracers := slices.Clone(m.tracers)
	m.tracersMx.RUnlock()

	m.handlersMx.Lock()
	onDispose := slices.Clone(m.disposeHandlers)
	m.handlersMx.Unlock()

	for _, t := range tracers {
		t.MachineDispose(m.id)
	}
	for _, fn := range onDispose {
		fn(m.id, m.ctx)
	}

	utils.CloseSafe(m.whenDisposed)
}
// IsDisposed reports whether Dispose has been called.
func (m *NetworkMachine) IsDisposed() bool {
	gone := m.disposed.Load()
	return gone
}
// WhenDisposed returns the channel which Dispose closes.
func (m *NetworkMachine) WhenDisposed() <-chan struct{} {
	done := m.whenDisposed
	return done
}
// Export returns a serialized snapshot of the machine (ID, time, state names,
// machine tick and queue tick) together with a clone of its schema. The
// returned error is always nil.
func (m *NetworkMachine) Export() (*am.Serialized, am.Schema, error) {
	m.clockMx.RLock()
	defer m.clockMx.RUnlock()

	// time() and StateNames() acquire schemaMx on their own, so they have to
	// run before this function takes that lock. Calling them while holding the
	// read lock blocks forever on their exclusive Lock.
	now := m.time(nil)
	names := m.StateNames()

	m.schemaMx.RLock()
	schema := am.CloneSchema(m.schema)
	m.schemaMx.RUnlock()

	m.log(am.LogChanges, "[import] exported at %d ticks", now)

	return &am.Serialized{
		ID:          m.id,
		Time:        now,
		StateNames:  names,
		MachineTick: m.machTick,
		QueueTick:   m.queueTick,
	}, schema, nil
}
// Schema returns the machine's schema. The map is returned as-is, neither
// copied nor locked.
func (m *NetworkMachine) Schema() am.Schema {
	schema := m.schema
	return schema
}
// BindHandlers registers handlers, which must be a pointer to a struct, to be
// called on state changes (see processHandlers). Any other kind of value is
// rejected with an error.
func (m *NetworkMachine) BindHandlers(handlers any) error {
	v := reflect.ValueOf(handlers)
	if v.Kind() != reflect.Ptr || v.Elem().Kind() != reflect.Struct {
		// name this method, not BindTracer, in the message
		return errors.New("BindHandlers expects a pointer to a struct")
	}
	// empty for unnamed (anonymous) struct types
	name := reflect.TypeOf(handlers).Elem().Name()

	m.handlersMx.Lock()
	defer m.handlersMx.Unlock()

	h := newHandler(handlers, name, &v)
	m.handlers = append(m.handlers, h)
	if name != "" {
		m.log(am.LogOps, "[handlers] bind %s", name)
	} else {
		// unnamed handlers are logged by their position
		m.log(am.LogOps, "[handlers] bind %d", len(m.handlers))
	}

	return nil
}
// HasHandlers reports whether any handlers are currently bound. The list is
// read under handlersMx, like every other access to it.
func (m *NetworkMachine) HasHandlers() bool {
	m.handlersMx.Lock()
	defer m.handlersMx.Unlock()

	return len(m.handlers) > 0
}
// DetachHandlers removes handlers previously registered with BindHandlers.
// It returns an error when the given value is not bound.
func (m *NetworkMachine) DetachHandlers(handlers any) error {
	// guard the list with the same lock BindHandlers and getHandlers use
	m.handlersMx.Lock()
	defer m.handlersMx.Unlock()

	old := m.handlers
	for _, h := range old {
		if h.h == handlers {
			m.handlers = utils.SlicesWithout(old, h)
			return nil
		}
	}

	return errors.New("handlers not bound")
}
// BindTracer appends tracer, which must be a pointer to a struct, to the
// machine's tracers. Any other kind of value is rejected with an error.
func (m *NetworkMachine) BindTracer(tracer am.Tracer) error {
	m.tracersMx.Lock()
	defer m.tracersMx.Unlock()

	val := reflect.ValueOf(tracer)
	isStructPtr := val.Kind() == reflect.Ptr &&
		val.Elem().Kind() == reflect.Struct
	if !isStructPtr {
		return errors.New("BindTracer expects a pointer to a struct")
	}

	typeName := reflect.TypeOf(tracer).Elem().Name()
	m.tracers = append(m.tracers, tracer)
	m.log(am.LogOps, "[tracers] bind %s", typeName)

	return nil
}
// DetachTracer removes tracer from the machine's tracers. It returns an error
// when tracer is not a pointer to a struct, or when it is not bound.
func (m *NetworkMachine) DetachTracer(tracer am.Tracer) error {
	m.tracersMx.Lock()
	defer m.tracersMx.Unlock()

	val := reflect.ValueOf(tracer)
	isStructPtr := val.Kind() == reflect.Ptr &&
		val.Elem().Kind() == reflect.Struct
	if !isStructPtr {
		return errors.New("DetachTracer expects a pointer to a struct")
	}

	typeName := reflect.TypeOf(tracer).Elem().Name()
	for pos := range m.tracers {
		if m.tracers[pos] != tracer {
			continue
		}
		m.tracers = slices.Delete(m.tracers, pos, pos+1)
		m.log(am.LogOps, "[tracers] detach %s", typeName)

		return nil
	}

	return errors.New("tracer not bound")
}
// Tracers returns a copy of the bound tracers.
//
// NOTE(review): the copy is made under clockMx, while BindTracer and
// DetachTracer modify the list under tracersMx — confirm which lock is meant
// to guard it.
func (m *NetworkMachine) Tracers() []am.Tracer {
	m.clockMx.Lock()
	defer m.clockMx.Unlock()

	bound := slices.Clone(m.tracers)
	return bound
}
// updateClock applies a clock update received for the remote machine: it
// derives the new active states from now, publishes a synthetic, already
// completed transition to the tracers, stores the new time and ticks, runs
// the bound handlers and finally resolves the subscriptions.
//
// now holds one tick per state, in state-names order. qTick and machTick are
// the new queue and machine ticks.
//
// Locking: this function releases clockMx without acquiring it, so the caller
// must hold it (see NetMachInternal.Lock). tracersMx is held for the whole
// call.
func (m *NetworkMachine) updateClock(
	now am.Time, qTick uint64, machTick uint32,
) {
	m.tracersMx.Lock()
	defer m.tracersMx.Unlock()

	// snapshot the state before the update
	timeBefore := m.machTime
	clockBefore := maps.Clone(m.machClock)
	activeBefore := m.ActiveStates(nil)

	// derive the new active states from the ticks
	activeNow := am.S{}
	index := m.StateNames()
	for i, state := range index {
		if am.IsActiveTick(now[i]) {
			activeNow = append(activeNow, state)
		}
	}
	// required states count as active too
	activeNow = m.activateRequired(activeNow)
	activated := am.StatesDiff(activeNow, activeBefore)
	deactivated := am.StatesDiff(activeBefore, activeNow)

	// Hand the buffered log entries over to the transition. log() appends to
	// this buffer under logEntriesLock, so take the same lock for the swap.
	m.logEntriesLock.Lock()
	entries := m.logEntries
	m.logEntries = nil
	m.logEntriesLock.Unlock()

	// synthetic transition, reported as a completed "set" of the active states
	tx := &am.Transition{
		MachApi:    m,
		Id:         utils.RandId(8),
		TimeBefore: timeBefore,
		TimeAfter:  now,
		Mutation: &am.Mutation{
			Type:   am.MutationSet,
			Called: m.Index(activeNow),
			Args:   nil,
			IsAuto: false,
		},
		LogEntries:    entries,
		TargetIndexes: m.Index(activeNow),
	}
	tx.IsCompleted.Store(true)
	tx.IsSettled.Store(true)
	tx.IsAccepted.Store(true)
	m.t.Store(tx)

	for _, t := range m.tracers {
		t.TransitionInit(tx)
	}
	for _, t := range m.tracers {
		t.TransitionStart(tx)
	}

	// store the new time and mirror it into the per-state clock
	m.machTime = now
	for idx, tick := range m.machTime {
		m.machClock[index[idx]] = tick
	}
	m.machTick = machTick

	// a queue tick going backwards flushes the queue subscriptions
	if m.queueTick > qTick {
		// both ticks are integers, so both use %d
		m.log(am.LogOps, "[queueTick] flushing (%d to %d)", m.queueTick, qTick)
		m.queueFlush()
	}
	m.queueTick = qTick

	m.activeStates.Store(&activeNow)
	m.activeStatesDbg = activeNow
	m.processHandlers(activated, deactivated)

	// releases the lock taken by the caller; processSubscriptions below
	// re-acquires it for reading
	m.clockMx.Unlock()

	for _, t := range m.tracers {
		t.TransitionEnd(tx)
	}
	m.processSubscriptions(activated, deactivated, clockBefore)
	m.t.Store(nil)
}
// activateRequired extends active with every state reachable through the
// schema's Require relation, following it transitively, and returns the
// de-duplicated result. Without a schema, active is returned unchanged.
func (m *NetworkMachine) activateRequired(active am.S) am.S {
	if m.schema == nil {
		return active
	}

	m.schemaMx.RLock()
	defer m.schemaMx.RUnlock()

	expanded := slices.Clone(active)
	seen := make(map[string]bool)

	// depth-first walk; seen guards against cycles and repeated work
	var walk func(name string)
	walk = func(name string) {
		if seen[name] {
			return
		}
		seen[name] = true
		for _, required := range m.schema[name].Require {
			expanded = append(expanded, required)
			walk(required)
		}
	}
	for i := range active {
		walk(active[i])
	}

	return utils.SlicesUniq(expanded)
}
// getHandlers returns a copy of the bound handlers. Pass locked=true when
// handlersMx is already held by the caller; otherwise it is taken here.
func (m *NetworkMachine) getHandlers(locked bool) []*handler {
	if locked {
		return slices.Clone(m.handlers)
	}

	m.handlersMx.Lock()
	defer m.handlersMx.Unlock()

	return slices.Clone(m.handlers)
}
// processHandlers calls, on every bound handler, the state handler of each
// activated state, the end handler of each deactivated state and finally the
// am.StateAny state handler. It does nothing when no state changed.
func (m *NetworkMachine) processHandlers(activated, deactivated am.S) {
	if len(activated) == 0 && len(deactivated) == 0 {
		return
	}

	for pos, bound := range m.getHandlers(false) {
		if bound == nil {
			continue
		}
		for _, name := range activated {
			m.handle(bound, pos, name, am.SuffixState)
		}
		for _, name := range deactivated {
			m.handle(bound, pos, name, am.SuffixEnd)
		}
		// catch-all handler, called once per update
		m.handle(bound, pos, am.StateAny, am.SuffixState)
	}
}
// handle calls the method (or func-typed field) named state+suffix on the
// handler h, if there is one, passing it a fresh event. i is the handler's
// position, used in log messages and tracer callbacks.
//
// Lookups are cached per handler, both for found and for missing methods.
// The handler's mutex is held for the whole call, including the handler
// method itself.
func (m *NetworkMachine) handle(h *handler, i int, state, suffix string) {
	h.mx.Lock()
	// deferred, so the lock is released on every return path and also when
	// the handler method panics
	defer h.mx.Unlock()

	methodName := state + suffix
	e := am.NewEvent(nil, m)
	e.Name = methodName
	e.MachineId = m.remoteId
	handlerName := strconv.Itoa(i) + ":" + h.name

	if m.semLogger.Level() >= am.LogEverything {
		// fixed-width, space-free emitter name
		emitterId := utils.TruncateStr(handlerName, 15)
		emitterId = utils.PadString(strings.ReplaceAll(
			emitterId, " ", "_"), 15, "_")
		m.log(am.LogEverything, "[handle:%-15s] %s", emitterId, methodName)
	}

	// known to be missing
	if _, ok := h.missingCache[methodName]; ok {
		return
	}

	method, ok := h.methodCache[methodName]
	if !ok {
		method = h.methods.MethodByName(methodName)

		// fall back to a struct field with that name
		if !method.IsValid() {
			method = h.methods.Elem().FieldByName(methodName)
		}
		if !method.IsValid() {
			h.missingCache[methodName] = struct{}{}
			return
		}
		h.methodCache[methodName] = method
	}

	m.log(am.LogOps, "[handler:%d] %s", i, methodName)
	tx := m.t.Load()
	for _, t := range m.tracers {
		t.HandlerStart(tx, handlerName, methodName)
	}

	// the handler's return values are not used
	_ = method.Call([]reflect.Value{reflect.ValueOf(e)})

	for _, t := range m.tracers {
		t.HandlerEnd(tx, handlerName, methodName)
	}
}
// processSubscriptions asks the subscription manager which state contexts to
// cancel and which subscription channels to close after a clock update. The
// results are collected under the clock read lock and acted upon after it is
// released.
func (m *NetworkMachine) processSubscriptions(
	activated, deactivated am.S, clockBefore am.Clock,
) {
	m.clockMx.RLock()
	cancels := m.subs.ProcessStateCtx(deactivated)
	fromWhen := m.subs.ProcessWhen(activated, deactivated)
	fromTime := m.subs.ProcessWhenTime(clockBefore)
	fromQueue := m.subs.ProcessWhenQueue(m.queueTick)
	fromQuery := m.subs.ProcessWhenQuery()
	m.clockMx.RUnlock()

	for i := range cancels {
		cancels[i]()
	}
	for _, ch := range slices.Concat(fromWhen, fromTime, fromQueue, fromQuery) {
		close(ch)
	}
}
// AddBreakpoint1 is a no-op for a NetworkMachine; all parameters are
// ignored.
func (m *NetworkMachine) AddBreakpoint1(
	added string, removed string, strict bool,
) {
	// intentionally empty
}
// AddBreakpoint is a no-op for a NetworkMachine; all parameters are ignored.
func (m *NetworkMachine) AddBreakpoint(
	added am.S, removed am.S, strict bool,
) {
	// intentionally empty
}
// Groups always returns nil for both results on a NetworkMachine.
func (m *NetworkMachine) Groups() (map[string][]int, []string) {
	var (
		groups map[string][]int
		order  []string
	)
	return groups, order
}
// CanAdd always returns am.Executed; no check is performed and the
// parameters are ignored.
func (m *NetworkMachine) CanAdd(states am.S, args am.A) am.Result {
	const verdict = am.Executed
	return verdict
}
// CanAdd1 always returns am.Executed; no check is performed and the
// parameters are ignored.
func (m *NetworkMachine) CanAdd1(state string, args am.A) am.Result {
	const verdict = am.Executed
	return verdict
}
// CanRemove always returns am.Executed; no check is performed and the
// parameters are ignored.
func (m *NetworkMachine) CanRemove(states am.S, args am.A) am.Result {
	const verdict = am.Executed
	return verdict
}
// CanRemove1 always returns am.Executed; no check is performed and the
// parameters are ignored.
func (m *NetworkMachine) CanRemove1(state string, args am.A) am.Result {
	const verdict = am.Executed
	return verdict
}
// Transition returns the transition currently being processed by
// updateClock, or nil when there is none.
func (m *NetworkMachine) Transition() *am.Transition {
	current := m.t.Load()
	return current
}
// QueueLen always returns 0 for a NetworkMachine.
func (m *NetworkMachine) QueueLen() uint16 {
	const length uint16 = 0
	return length
}
// queueFlush forwards a queue flush to the subscription manager.
func (m *NetworkMachine) queueFlush() {
	subs := m.subs
	subs.QueueFlush()
}
// QueueTick returns the last queue tick stored by updateClock, read under
// the clock read lock.
func (m *NetworkMachine) QueueTick() uint64 {
	m.clockMx.RLock()
	tick := m.queueTick
	m.clockMx.RUnlock()

	return tick
}
// MachineTick returns the last machine tick stored by updateClock, read
// under the clock read lock.
func (m *NetworkMachine) MachineTick() uint32 {
	m.clockMx.RLock()
	tick := m.machTick
	m.clockMx.RUnlock()

	return tick
}
// ParseStates returns the states which are defined in the schema, with
// duplicates removed and the input order preserved. Unknown states are
// dropped silently (compare MustParseStates, which panics).
//
// NOTE(review): membership is checked against the schema, which
// NewNetworkMachine allows to be nil; in that case nothing is returned.
// Confirm whether stateNames should be consulted instead.
func (m *NetworkMachine) ParseStates(states am.S) am.S {
	m.schemaMx.RLock()
	defer m.schemaMx.RUnlock()

	seen := make(map[string]struct{}, len(states))
	ret := make(am.S, 0, len(states))
	for _, name := range states {
		// skip unknown states
		if _, ok := m.schema[name]; !ok {
			continue
		}
		// skip duplicates
		if _, dup := seen[name]; dup {
			continue
		}
		seen[name] = struct{}{}
		// Append in input order: collecting the map's keys would return the
		// states in a random order.
		ret = append(ret, name)
	}

	return ret
}
// OnDispose registers fn to be called by Dispose. The list of dispose
// handlers is appended to under handlersMx.
func (m *NetworkMachine) OnDispose(fn am.HandlerDispose) {
	m.handlersMx.Lock()
	m.disposeHandlers = append(m.disposeHandlers, fn)
	m.handlersMx.Unlock()
}
// mutAccepted decides whether a mutation may be sent to the remote machine
// when local filtering is enabled. It currently accepts every mutation, with
// or without a schema; both parameters are unused.
func (m *NetworkMachine) mutAccepted(
	mutType am.MutationType, states am.S,
) bool {
	// Without a schema there is nothing to filter against, and with one no
	// filtering is implemented, so the answer is the same either way.
	return true
}
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PR and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.