package machine
import (
"context"
"errors"
"fmt"
"maps"
"math"
"os"
"reflect"
"regexp"
"runtime"
"slices"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
)
var _ Api = &Machine {}
type Machine struct {
QueueLimit uint16
HandlerTimeout time .Duration
HandlerDeadline time .Duration
LastHandlerDeadline atomic .Pointer [time .Time ]
HandlerBackoff time .Duration
EvalTimeout time .Duration
LogStackTrace bool
PanicToException bool
DisposeTimeout time .Duration
subs *Subscriptions
panicCaught atomic .Bool
logId atomic .Bool
statesVerified atomic .Bool
id string
ctx context .Context
ctxParent context .Context
parentId string
disposing atomic .Bool
disposed atomic .Bool
queueRunning atomic .Bool
tags atomic .Pointer [[]string ]
tracers []Tracer
tracersMx sync .RWMutex
err atomic .Pointer [error ]
t atomic .Pointer [Transition ]
tDbg *Transition
schema Schema
groups map [string ][]int
groupsOrder []string
schemaMx sync .RWMutex
activeStates S
activeStatesMx sync .RWMutex
queue []*Mutation
queueTick uint64
machineTick uint32
queueTicksPending uint64
queueToken atomic .Uint64
queueMx sync .RWMutex
queueProcessing atomic .Bool
queueLen atomic .Uint32
resolver RelationsResolver
stateNames S
stateNamesExport S
loopLock sync .Mutex
handlers []*handler
handlersMx sync .RWMutex
clock Clock
cancel context .CancelFunc
logLevel atomic .Pointer [LogLevel ]
logger atomic .Pointer [LoggerFn ]
semLogger SemLogger
handlerStart chan *handlerCall
handlerEnd chan bool
handlerPanic chan recoveryData
handlerTimer *time .Timer
logEntriesLock sync .Mutex
logEntries []*LogEntry
logArgs atomic .Pointer [LogArgsMapperFn ]
currentHandler atomic .Value
disposeHandlers []HandlerDispose
timeLast atomic .Pointer [Time ]
whenDisposed chan struct {}
handlerLoopRunning atomic .Bool
handlerLoopVer atomic .Int32
detectEval bool
unlockDisposed atomic .Bool
breakpointsMx sync .Mutex
breakpoints []*breakpoint
onError atomic .Pointer [HandlerError ]
onChange atomic .Pointer [HandlerChange ]
}
func NewCommon (
ctx context .Context , id string , stateSchema Schema , stateNames S ,
handlers any , parent Api , opts *Opts ,
) (*Machine , error ) {
machOpts := &Opts {Id : id }
if opts != nil {
machOpts = opts
machOpts .Id = id
}
if os .Getenv (EnvAmDebug ) != "" {
machOpts = OptsWithDebug (machOpts )
} else if os .Getenv (EnvAmTestRunner ) != "" {
machOpts .HandlerTimeout = 1 * time .Second
}
if parent != nil {
machOpts .Parent = parent
}
if machOpts .LogArgs == nil {
machOpts .LogArgs = NewLogArgsMapper (0 , LogArgs )
}
mach := New (ctx , stateSchema , machOpts )
err := mach .VerifyStates (stateNames )
if err != nil {
return nil , err
}
if handlers != nil {
err = mach .BindHandlers (handlers )
if err != nil {
return nil , err
}
}
return mach , nil
}
func New (ctx context .Context , schema Schema , opts *Opts ) *Machine {
parsedStates , err := ParseSchema (schema )
m := &Machine {
HandlerTimeout : 100 * time .Millisecond ,
HandlerDeadline : 10 * time .Second ,
HandlerBackoff : 3 * time .Second ,
EvalTimeout : time .Second ,
LogStackTrace : true ,
PanicToException : true ,
QueueLimit : 1000 ,
DisposeTimeout : time .Second ,
id : randId (),
schema : parsedStates ,
clock : Clock {},
handlers : []*handler {},
handlerStart : make (chan *handlerCall ),
handlerEnd : make (chan bool ),
handlerPanic : make (chan recoveryData ),
handlerTimer : time .NewTimer (24 * time .Hour ),
whenDisposed : make (chan struct {}),
queueTick : 1 ,
}
m .subs = NewSubscriptionManager (m , m .clock , m .is , m .not , m .log )
m .semLogger = &semLogger {mach : m }
m .logId .Store (true )
m .timeLast .Store (&Time {})
lvl := LogNothing
m .logLevel .Store (&lvl )
opts = cloneOptions (opts )
var parent Api
if opts != nil {
if opts .Id != "" {
m .id = opts .Id
}
if opts .HandlerTimeout != 0 {
m .HandlerTimeout = opts .HandlerTimeout
}
if opts .HandlerDeadline != 0 {
m .HandlerDeadline = opts .HandlerDeadline
}
if opts .HandlerBackoff != 0 {
m .HandlerBackoff = opts .HandlerBackoff
}
if opts .DontPanicToException {
m .PanicToException = false
}
if opts .DontLogStackTrace {
m .LogStackTrace = false
}
if opts .DontLogId {
m .logId .Store (false )
}
if opts .Resolver != nil {
m .resolver = opts .Resolver
}
if opts .LogLevel != LogNothing {
m .semLogger .SetLevel (opts .LogLevel )
}
if opts .Tracers != nil {
m .tracers = opts .Tracers
}
if opts .LogArgs != nil {
m .logArgs .Store (&opts .LogArgs )
}
if opts .QueueLimit != 0 {
m .QueueLimit = opts .QueueLimit
}
if len (opts .Tags ) > 0 {
tags := slicesUniq (opts .Tags )
m .tags .Store (&tags )
}
m .detectEval = opts .DetectEval
parent = opts .Parent
m .parentId = opts .ParentId
}
if os .Getenv (EnvAmDetectEval ) != "" {
m .detectEval = true
}
if m .resolver == nil {
m .resolver = &DefaultRelationsResolver {
Machine : m ,
}
}
if _ , ok := m .schema [StateException ]; !ok {
m .schema [StateException ] = State {
Multi : true ,
}
}
for name := range m .schema {
m .stateNames = append (m .stateNames , name )
slices .Sort (m .stateNames )
m .clock [name ] = 0
}
m .resolver .NewSchema (m .schema , m .stateNames )
if ctx == nil {
ctx = context .TODO ()
}
m .ctxParent = ctx
m .ctx , m .cancel = context .WithCancel (m .ctxParent )
if parent != nil {
m .parentId = parent .Id ()
pTracers := parent .Tracers ()
for _ , t := range pTracers {
t .NewSubmachine (parent , m )
}
}
for i := range m .tracers {
ctxTr := m .tracers [i ].MachineInit (m )
if ctxTr != nil {
m .ctx = ctxTr
}
}
if err != nil {
m .AddErr (err , nil )
}
return m
}
func (m *Machine ) OnDispose (fn HandlerDispose ) {
m .handlersMx .Lock ()
defer m .handlersMx .Unlock ()
m .disposeHandlers = append (m .disposeHandlers , fn )
}
func (m *Machine ) Dispose () {
if m .Has1 (StateStart ) {
m .Remove1 (StateStart , nil )
time .Sleep (100 * time .Millisecond )
}
go func () {
if m .disposing .Load () {
m .log (LogDecisions , "[Dispose] already disposed" )
return
}
m .queueProcessing .Store (false )
m .unlockDisposed .Store (true )
m .doDispose (false )
}()
}
func (m *Machine ) IsDisposed () bool {
return m .disposed .Load ()
}
func (m *Machine ) DisposeForce () {
m .doDispose (true )
}
func (m *Machine ) doDispose (force bool ) {
if m .disposed .Load () {
return
}
if !m .disposing .CompareAndSwap (false , true ) {
return
}
if !force {
whenIdle := m .WhenQueueEnds ()
select {
case <- time .After (m .DisposeTimeout ):
m .log (LogDecisions , "[doDispose] timeout waiting for queue to drain" )
case <- whenIdle :
}
}
if !m .disposed .CompareAndSwap (false , true ) {
return
}
m .tracersMx .RLock ()
for i := range m .tracers {
m .tracers [i ].MachineDispose (m .Id ())
}
m .tracersMx .RUnlock ()
if !force {
m .activeStatesMx .Lock ()
defer m .activeStatesMx .Unlock ()
m .subs .Mx .Lock ()
defer m .subs .Mx .Unlock ()
m .tracersMx .Lock ()
defer m .tracersMx .Unlock ()
m .handlersMx .Lock ()
defer m .handlersMx .Unlock ()
m .queueMx .Lock ()
defer m .queueMx .Unlock ()
}
m .log (LogEverything , "[end] doDispose" )
if m .Err () == nil && m .ctx .Err () != nil {
err := m .ctx .Err ()
m .err .Store (&err )
}
m .handlers = nil
go func () {
time .Sleep (100 * time .Millisecond )
m .loopLock .Lock ()
defer m .loopLock .Unlock ()
closeSafe (m .handlerEnd )
closeSafe (m .handlerPanic )
closeSafe (m .handlerStart )
}()
m .subs .dispose ()
for _ , mut := range m .queue {
if !mut .IsCheck {
continue
}
if done , ok := mut .Args [argCheckDone ].(*CheckDone ); ok {
closeSafe (done .Ch )
}
}
time .Sleep (100 * time .Millisecond )
if m .handlerTimer != nil {
m .handlerTimer .Stop ()
}
if m .unlockDisposed .Load () {
m .unlockDisposed .Store (false )
m .queueProcessing .Store (false )
}
for _ , fn := range m .disposeHandlers {
fn (m .id , m .ctx )
}
m .cancel ()
closeSafe (m .whenDisposed )
}
func (m *Machine ) getHandlers (locked bool ) []*handler {
if !locked {
m .handlersMx .Lock ()
defer m .handlersMx .Unlock ()
}
return slices .Clone (m .handlers )
}
func (m *Machine ) setHandlers (locked bool , handlers []*handler ) {
if !locked {
m .handlersMx .Lock ()
defer m .handlersMx .Unlock ()
}
m .handlers = handlers
}
func (m *Machine ) WhenErr (disposeCtx context .Context ) <-chan struct {} {
return m .When ([]string {StateException }, disposeCtx )
}
func (m *Machine ) When (states S , ctx context .Context ) <-chan struct {} {
if m .disposed .Load () {
return newClosedChan ()
}
m .activeStatesMx .Lock ()
defer m .activeStatesMx .Unlock ()
return m .subs .When (m .mustParseStates (states ), ctx )
}
func (m *Machine ) When1 (state string , ctx context .Context ) <-chan struct {} {
return m .When (S {state }, ctx )
}
func (m *Machine ) WhenNot (states S , ctx context .Context ) <-chan struct {} {
if m .disposed .Load () {
ch := make (chan struct {})
close (ch )
return ch
}
m .activeStatesMx .Lock ()
defer m .activeStatesMx .Unlock ()
return m .subs .WhenNot (m .mustParseStates (states ), ctx )
}
func (m *Machine ) WhenNot1 (state string , ctx context .Context ) <-chan struct {} {
return m .WhenNot (S {state }, ctx )
}
func (m *Machine ) WhenTime (
states S , times Time , ctx context .Context ,
) <-chan struct {} {
if m .disposed .Load () {
return newClosedChan ()
}
if len (states ) != len (times ) {
err := fmt .Errorf (
"whenTime: states and times must have the same length (%s)" , j (states ))
m .AddErr (err , nil )
return newClosedChan ()
}
m .activeStatesMx .Lock ()
defer m .activeStatesMx .Unlock ()
return m .subs .WhenTime (states , times , ctx )
}
func (m *Machine ) WhenTime1 (
state string , ticks uint64 , ctx context .Context ,
) <-chan struct {} {
return m .WhenTime (S {state }, Time {ticks }, ctx )
}
func (m *Machine ) WhenTicks (
state string , ticks int , ctx context .Context ,
) <-chan struct {} {
return m .WhenTime (S {state }, Time {uint64 (ticks ) + m .Tick (state )}, ctx )
}
func (m *Machine ) WhenQuery (
clockCheck func (clock Clock ) bool , ctx context .Context ,
) <-chan struct {} {
if m .disposed .Load () {
return newClosedChan ()
}
m .activeStatesMx .Lock ()
defer m .activeStatesMx .Unlock ()
return m .subs .WhenQuery (clockCheck , ctx )
}
func (m *Machine ) WhenQueueEnds () <-chan struct {} {
if m .disposed .Load () || !m .queueRunning .Load () {
return newClosedChan ()
}
m .queueMx .Lock ()
defer m .queueMx .Unlock ()
return m .subs .WhenQueueEnds ()
}
func (m *Machine ) WhenQueue (tick Result ) <-chan struct {} {
if m .disposed .Load () {
return newClosedChan ()
}
m .queueMx .Lock ()
defer m .queueMx .Unlock ()
if m .queueTick >= uint64 (tick ) {
return newClosedChan ()
}
return m .subs .WhenQueue (tick )
}
func (m *Machine ) WhenArgs (
state string , args A , ctx context .Context ,
) <-chan struct {} {
if m .disposed .Load () {
return newClosedChan ()
}
m .activeStatesMx .Lock ()
defer m .activeStatesMx .Unlock ()
states := m .mustParseStates (S {state })
return m .subs .WhenArgs (states [0 ], args , ctx )
}
func (m *Machine ) WhenDisposed () <-chan struct {} {
return m .whenDisposed
}
func (m *Machine ) QueueTick () uint64 {
m .queueMx .Lock ()
defer m .queueMx .Unlock ()
return m .queueTick
}
func (m *Machine ) MachineTick () uint32 {
m .schemaMx .RLock ()
defer m .schemaMx .RUnlock ()
return m .machineTick
}
func (m *Machine ) Time (states S ) Time {
if m .disposed .Load () {
return nil
}
m .activeStatesMx .RLock ()
defer m .activeStatesMx .RUnlock ()
return m .time (states )
}
func (m *Machine ) time (states S ) Time {
if m .disposed .Load () {
return nil
}
m .schemaMx .RLock ()
defer m .schemaMx .RUnlock ()
if states == nil {
states = m .stateNames
}
ret := make (Time , len (states ))
for i , s := range states {
ret [i ] = m .clock [s ]
}
return ret
}
func (m *Machine ) PrependMut (mut *Mutation ) Result {
if m .disposing .Load () {
return Canceled
}
isEval := mut .Type == mutationEval
statesParsed := m .mustParseStates (IndexToStates (m .StateNames (), mut .Called ))
m .log (LogOps , "[prepend:%s] %s%s" , mut .Type , j (statesParsed ),
mut .LogArgs (m .SemLogger ().ArgsMapper ()))
m .queueMx .Lock ()
if !isEval {
mut .cacheCalled .Store (&statesParsed )
}
if m .id == "ns-TestManyStates" {
print ()
}
m .queue = append ([]*Mutation {mut }, m .queue ...)
lenQ := len (m .queue )
m .queueLen .Store (uint32 (lenQ ))
if !isEval {
mut .QueueLen = int32 (lenQ )
mut .QueueToken = m .queueToken .Add (1 )
mut .QueueTickNow = m .queueTick
}
m .queueMx .Unlock ()
if !isEval {
m .tracersMx .RLock ()
for i := 0 ; !m .disposing .Load () && i < len (m .tracers ); i ++ {
m .tracers [i ].MutationQueued (m , mut )
}
m .tracersMx .RUnlock ()
}
res := m .processQueue ()
return res
}
func (m *Machine ) Add (states S , args A ) Result {
if m .disposing .Load () || m .Backoff () {
return Canceled
}
if uint16 (m .queueLen .Load ()) >= m .QueueLimit {
if !slices .Contains (states , StateException ) || m .IsErr () {
return Canceled
}
}
queueTick := m .queueMutation (MutationAdd , states , args , nil )
if queueTick == uint64 (Executed ) {
return Executed
}
m .breakpoint (states , nil )
res := m .processQueue ()
if res == Queued {
return Result (queueTick )
}
return res
}
func (m *Machine ) Add1 (state string , args A ) Result {
return m .Add (S {state }, args )
}
func (m *Machine ) Toggle (states S , args A ) Result {
if m .disposing .Load () {
return Canceled
}
if m .Is (states ) {
return m .Remove (states , args )
}
return m .Add (states , args )
}
func (m *Machine ) Toggle1 (state string , args A ) Result {
if m .disposing .Load () {
return Canceled
}
if m .Is1 (state ) {
return m .Remove1 (state , args )
}
return m .Add1 (state , args )
}
func (m *Machine ) EvToggle (e *Event , states S , args A ) Result {
if m .disposing .Load () {
return Canceled
}
if m .Is (states ) {
return m .EvRemove (e , states , args )
}
return m .EvAdd (e , states , args )
}
func (m *Machine ) EvToggle1 (e *Event , state string , args A ) Result {
if m .disposing .Load () {
return Canceled
}
if m .disposing .Load () {
return Canceled
}
if m .Is1 (state ) {
return m .EvRemove1 (e , state , args )
}
return m .EvAdd1 (e , state , args )
}
func (m *Machine ) AddErr (err error , args A ) Result {
return m .AddErrState (StateException , err , args )
}
func (m *Machine ) AddErrState (state string , err error , args A ) Result {
if m .disposing .Load () || m .Backoff () || err == nil {
return Canceled
}
m .err .Store (&err )
var trace string
if m .LogStackTrace {
trace = captureStackTrace ()
}
argsT := &AT {
Err : err ,
ErrTrace : trace ,
}
if onErr := m .onError .Load (); onErr != nil {
(*onErr )(m , err )
}
return m .Add (S {state , StateException }, PassMerge (args , argsT ))
}
func (m *Machine ) CanAdd (states S , args A ) Result {
if m .disposing .Load () || m .Backoff () {
return Canceled
}
return m .PrependMut (&Mutation {
Type : MutationAdd ,
Called : m .Index (states ),
Args : args ,
IsCheck : true ,
})
}
func (m *Machine ) CanAdd1 (state string , args A ) Result {
return m .CanAdd (S {state }, args )
}
func (m *Machine ) CanRemove (states S , args A ) Result {
if m .disposing .Load () || m .Backoff () {
return Canceled
}
return m .PrependMut (&Mutation {
Type : MutationRemove ,
Called : m .Index (states ),
Args : args ,
IsCheck : true ,
})
}
func (m *Machine ) CanRemove1 (state string , args A ) Result {
return m .CanRemove (S {state }, nil )
}
func (m *Machine ) PanicToErr (args A ) {
if !m .PanicToException || m .disposing .Load () {
return
}
r := recover ()
if r == nil {
return
}
if err , ok := r .(error ); ok {
m .AddErr (err , args )
} else {
m .AddErr (fmt .Errorf ("%v" , err ), args )
}
}
func (m *Machine ) PanicToErrState (state string , args A ) {
if !m .PanicToException || m .disposing .Load () {
return
}
r := recover ()
if r == nil {
return
}
if err , ok := r .(error ); ok {
m .AddErrState (state , err , args )
} else {
m .AddErrState (state , fmt .Errorf ("%v" , err ), args )
}
}
func (m *Machine ) IsErr () bool {
return m .Is (S {StateException })
}
func (m *Machine ) Err () error {
err := m .err .Load ()
if err == nil {
return nil
}
return *err
}
func (m *Machine ) Remove (states S , args A ) Result {
if m .disposing .Load () || m .Backoff () {
return Canceled
}
if uint16 (m .queueLen .Load ()) >= m .QueueLimit {
if !slices .Contains (states , StateException ) || !m .IsErr () {
return Canceled
}
}
m .queueMx .RLock ()
lenQueue := len (m .queue )
var statesAny []S
for _ , name := range states {
statesAny = append (statesAny , S {name })
}
if lenQueue == 0 && m .Transition () != nil && !m .Any (statesAny ...) {
m .queueMx .RUnlock ()
return Executed
}
m .queueMx .RUnlock ()
queueTick := m .queueMutation (MutationRemove , states , args , nil )
if queueTick == uint64 (Executed ) {
return Executed
}
m .breakpoint (nil , states )
res := m .processQueue ()
if res == Queued {
return Result (queueTick + 1 )
}
return res
}
func (m *Machine ) Remove1 (state string , args A ) Result {
return m .Remove (S {state }, args )
}
func (m *Machine ) Set (states S , args A ) Result {
if m .disposing .Load () || uint16 (m .queueLen .Load ()) >= m .QueueLimit {
return Canceled
}
queueTick := m .queueMutation (MutationSet , states , args , nil )
if queueTick == uint64 (Executed ) {
return Executed
}
res := m .processQueue ()
if res == Queued {
return Result (queueTick + 1 )
}
return res
}
func (m *Machine ) Id () string {
return m .id
}
func (m *Machine ) ParentId () string {
return m .parentId
}
func (m *Machine ) Tags () []string {
tags := m .tags .Load ()
if tags != nil {
return slices .Clone (*tags )
}
return nil
}
func (m *Machine ) SetTags (tags []string ) {
m .tags .Store (&tags )
}
func (m *Machine ) Is (states S ) bool {
if m .disposing .Load () {
return false
}
m .activeStatesMx .RLock ()
defer m .activeStatesMx .RUnlock ()
return m .is (states )
}
func (m *Machine ) Is1 (state string ) bool {
return m .Is (S {state })
}
func (m *Machine ) is (states S ) bool {
if m .disposing .Load () {
return false
}
activeStates := m .activeStates
for _ , s := range states {
if !slices .Contains (m .stateNames , s ) {
m .log (LogDecisions , "[is] state %s not found" , s )
return false
}
if !slices .Contains (activeStates , s ) {
return false
}
}
return true
}
func (m *Machine ) Not (states S ) bool {
if m .disposing .Load () {
return false
}
m .activeStatesMx .RLock ()
defer m .activeStatesMx .RUnlock ()
return m .not (states )
}
func (m *Machine ) not (states S ) bool {
return slicesNone (m .mustParseStates (states ), m .activeStates )
}
func (m *Machine ) Not1 (state string ) bool {
return m .Not (S {state })
}
func (m *Machine ) Any (states ...S ) bool {
for _ , s := range states {
if m .Is (s ) {
return true
}
}
return false
}
func (m *Machine ) Any1 (states ...string ) bool {
for _ , s := range states {
if m .Is1 (s ) {
return true
}
}
return false
}
func (m *Machine ) queueMutation (
mutType MutationType , states S , args A , event *Event ,
) uint64 {
statesParsed := m .mustParseStates (states )
multi := false
for _ , state := range statesParsed {
if m .schemaSafe ()[state ].Multi {
multi = true
break
}
}
if !multi && len (args ) == 0 &&
m .detectQueueDuplicates (mutType , statesParsed , false ) {
m .log (LogOps , "[queue:skipped] Duplicate detected for [%s] %s" ,
mutType , j (statesParsed ))
return uint64 (Executed )
}
if args == nil {
args = A {}
}
var source *MutSource
if event != nil {
tx := event .Transition ()
source = &MutSource {
MachId : event .MachineId ,
TxId : event .TransitionId ,
}
if tx != nil {
source .MachTime = tx .TimeBefore .Sum (nil )
}
}
mut := &Mutation {
Type : mutType ,
Called : m .Index (statesParsed ),
Args : args ,
Source : source ,
}
mut .cacheCalled .Store (&statesParsed )
m .queueMx .Lock ()
if m .id == "ns-TestManyStates" {
print ()
}
m .queue = append (m .queue , mut )
lenQ := len (m .queue )
m .queueLen .Store (uint32 (lenQ ))
m .queueTicksPending += 1
mut .QueueLen = int32 (lenQ )
mut .QueueTick = m .queueTicksPending + m .queueTick
mut .QueueTickNow = m .queueTick
m .queueMx .Unlock ()
m .log (LogOps , "[queue:%s] %s%s" , mutType , j (statesParsed ),
mut .LogArgs (m .SemLogger ().ArgsMapper ()))
m .tracersMx .RLock ()
for i := 0 ; !m .disposing .Load () && i < len (m .tracers ); i ++ {
m .tracers [i ].MutationQueued (m , mut )
}
m .tracersMx .RUnlock ()
if mut .Type == MutationAdd {
m .breakpoint (statesParsed , nil )
} else if mutType == MutationRemove {
m .breakpoint (nil , statesParsed )
}
return mut .QueueTick
}
func (m *Machine ) Eval (source string , fn func (), ctx context .Context ) bool {
if m .disposing .Load () {
return false
}
if source == "" {
panic ("error: source of eval is required" )
}
if m .detectEval {
trace := captureStackTrace ()
for i := 0 ; !m .disposing .Load () && i < len (m .handlers ); i ++ {
handler := m .handlers [i ]
for _ , method := range handler .methodNames {
match := fmt .Sprintf (".(*%s).%s(" , handler .name , method )
for _ , line := range strings .Split (trace , "\n" ) {
if !strings .Contains (line , match ) {
continue
}
msg := fmt .Sprintf ("error: Eval() called directly in handler %s.%s" ,
handler .name , method )
panic (msg )
}
}
}
}
m .log (LogOps , "[eval] %s" , source )
done := make (chan struct {})
canceled := atomic .Bool {}
wrap := func () {
defer close (done )
if canceled .Load () {
return
}
fn ()
}
if ctx == nil {
ctx = context .Background ()
}
mut := &Mutation {
Type : mutationEval ,
eval : wrap ,
evalSource : source ,
ctx : ctx ,
}
_ = m .PrependMut (mut )
select {
case <- time .After (m .EvalTimeout ):
canceled .Store (true )
m .log (LogOps , "[eval:timeout] %s" , source )
m .AddErr (fmt .Errorf ("%w: eval:%s" , ErrEvalTimeout , source ), nil )
return false
case <- m .ctx .Done ():
canceled .Store (true )
return false
case <- ctx .Done ():
canceled .Store (true )
m .log (LogDecisions , "[eval:ctxdone] %s" , source )
return false
case <- done :
}
m .log (LogEverything , "[eval:end] %s" , source )
return true
}
func (m *Machine ) NewStateCtx (state string ) context .Context {
if m .disposing .Load () {
return context .TODO ()
}
m .mustParseStates (S {state })
m .activeStatesMx .Lock ()
defer m .activeStatesMx .Unlock ()
return m .subs .NewStateCtx (state )
}
func (m *Machine ) MustBindHandlers (handlers any ) {
if err := m .BindHandlers (handlers ); err != nil {
panic (err )
}
}
func (m *Machine ) BindHandlers (handlers any ) error {
if m .disposing .Load () {
return nil
}
first := false
if !m .handlerLoopRunning .Load () {
first = true
m .handlerLoopRunning .Store (true )
go m .handlerLoop ()
}
v := reflect .ValueOf (handlers )
if v .Kind () != reflect .Ptr || v .Elem ().Kind () != reflect .Struct {
return errors .New ("BindHandlers expects a pointer to a struct" )
}
name := reflect .TypeOf (handlers ).Elem ().Name ()
if name == "" {
name = "anon"
if os .Getenv (EnvAmDebug ) != "" {
buf := make ([]byte , 4024 )
n := runtime .Stack (buf , false )
stack := string (buf [:n ])
lines := strings .Split (stack , "\n" )
name = lines [len (lines )-2 ]
name = strings .TrimLeft (emitterNameRe .FindString (name ), "/" )
}
}
var methodNames []string
if m .detectEval || os .Getenv (EnvAmDebug ) != "" {
var err error
methodNames , err = ListHandlers (handlers , m .stateNames )
if err != nil {
return fmt .Errorf ("listing handlers: %w" , err )
}
}
h := m .newHandler (handlers , name , &v , methodNames )
old := m .getHandlers (false )
m .setHandlers (false , append (old , h ))
if name != "" {
m .log (LogOps , "[handlers] bind %s" , name )
} else {
m .log (LogOps , "[handlers] bind %d" , len (old ))
}
if first && m .IsErr () {
m .AddErr (m .Err (), nil )
}
return nil
}
func (m *Machine ) DetachHandlers (handlers any ) error {
if m .disposing .Load () {
return nil
}
m .handlersMx .Lock ()
defer m .handlersMx .Unlock ()
old := m .getHandlers (true )
var match *handler
var matchIndex int
for i , h := range old {
if h .h == handlers {
match = h
matchIndex = i
break
}
}
if match == nil {
return errors .New ("handlers not bound" )
}
m .setHandlers (true , slices .Delete (old , matchIndex , matchIndex +1 ))
match .dispose ()
m .log (LogOps , "[handlers] detach %s" , match .name )
return nil
}
func (m *Machine ) HasHandlers () bool {
m .handlersMx .RLock ()
defer m .handlersMx .RUnlock ()
return len (m .handlers ) > 0
}
func (m *Machine ) newHandler (
handlers any , name string , methods *reflect .Value , methodNames []string ,
) *handler {
if m .disposing .Load () {
return &handler {}
}
e := &handler {
name : name ,
h : handlers ,
methods : methods ,
methodNames : methodNames ,
methodCache : make (map [string ]reflect .Value ),
missingCache : make (map [string ]struct {}),
}
return e
}
func (m *Machine ) recoverToErr (handler *handler , r recoveryData ) {
if m .disposing .Load () {
return
}
m .panicCaught .Store (true )
m .currentHandler .Store ("" )
t := m .t .Load ()
index := m .StateNames ()
iException := m .Index1 (StateException )
mut := t .Mutation
if mut .IsCalled (iException ) {
return
}
m .log (LogOps , "[recover] handling panic..." )
err := fmt .Errorf ("%s" , r .err )
m .err .Store (&err )
if t .latestHandlerIsFinal {
m .recoverFinalPhase ()
}
m .log (LogOps , "[cancel] (%s) by recover" , j (t .TargetStates ()))
t .IsAccepted .Store (false )
t .IsCompleted .Store (true )
errMut := &Mutation {
Type : MutationAdd ,
Called : m .Index (S {StateException }),
Args : Pass (&AT {
Err : err ,
ErrTrace : r .stack ,
Panic : &ExceptionArgsPanic {
CalledStates : IndexToStates (index , mut .Called ),
StatesBefore : t .StatesBefore (),
Transition : t ,
},
}),
}
m .PrependMut (errMut )
go m .handlerLoop ()
}
func (m *Machine ) recoverFinalPhase () {
t := m .t .Load ()
finals := slices .Concat (t .Exits , t .Enters )
m .activeStatesMx .RLock ()
activeStates := m .activeStates
m .activeStatesMx .RUnlock ()
found := false
for _ , s := range finals {
if t .latestHandlerToState == s {
found = true
}
if !found {
continue
}
if t .latestHandlerIsEnter {
activeStates = slicesWithout (activeStates , s )
} else {
activeStates = append (activeStates , s )
}
}
m .log (LogOps , "[recover] partial final states as (%s)" ,
j (activeStates ))
m .activeStatesMx .Lock ()
defer m .activeStatesMx .Unlock ()
m .setActiveStates (t .CalledStates (), activeStates , t .IsAuto ())
}
func (m *Machine ) mustParseStates (states S ) S {
if m .disposing .Load () {
return nil
}
seen := make (map [string ]struct {})
dups := false
for i := range states {
if _ , ok := m .schemaSafe ()[states [i ]]; !ok {
panic (fmt .Errorf (
"%w: %s not defined in schema for %s" , ErrStateMissing ,
states [i ], m .id ))
}
if _ , ok := seen [states [i ]]; !ok {
seen [states [i ]] = struct {}{}
} else {
dups = true
}
}
if dups {
return slicesUniq (states )
}
return states
}
func (m *Machine ) ParseStates (states S ) S {
if m .disposing .Load () {
return nil
}
seen := make (map [string ]struct {})
dups := false
for i := range states {
if _ , ok := m .schemaSafe ()[states [i ]]; !ok {
continue
}
if _ , ok := seen [states [i ]]; !ok {
seen [states [i ]] = struct {}{}
} else {
dups = true
}
}
if dups {
return slicesUniq (states )
}
return slices .Collect (maps .Keys (seen ))
}
func (m *Machine ) VerifyStates (states S ) error {
if m .disposing .Load () {
return nil
}
m .schemaMx .RLock ()
defer m .schemaMx .RUnlock ()
return m .verifyStates (states )
}
func (m *Machine ) verifyStates (states S ) error {
var errs []error
var checked []string
for _ , s := range states {
if _ , ok := m .schema [s ]; !ok {
err := fmt .Errorf ("state %s not defined in schema for %s" , s , m .id )
errs = append (errs , err )
continue
}
checked = append (checked , s )
}
if len (errs ) > 1 {
return errors .Join (errs ...)
} else if len (errs ) == 1 {
return errs [0 ]
}
if len (m .stateNames ) > len (states ) {
missing := StatesDiff (m .stateNames , checked )
return fmt .Errorf (
"error: trying to verify less states than registered for %s; " +
"missing: %s" , m .id , j (missing ))
}
m .stateNames = slicesUniq (states )
m .stateNamesExport = nil
m .statesVerified .Store (true )
m .tracersMx .RLock ()
for i := 0 ; !m .disposing .Load () && i < len (m .tracers ); i ++ {
m .tracers [i ].VerifyStates (m )
}
m .tracersMx .RUnlock ()
return nil
}
func (m *Machine ) StatesVerified () bool {
return m .statesVerified .Load ()
}
func (m *Machine ) setActiveStates (
calledStates S , targetStates S , isAuto bool ,
) S {
if m .disposing .Load () {
return S {}
}
previous := m .activeStates
newStates := StatesDiff (targetStates , m .activeStates )
removedStates := StatesDiff (m .activeStates , targetStates )
noChangeStates := StatesDiff (targetStates , newStates )
m .activeStates = slices .Clone (targetStates )
for _ , state := range targetStates {
data := m .schemaSafe ()[state ]
if !slices .Contains (previous , state ) {
m .clock [state ]++
} else if slices .Contains (calledStates , state ) && data .Multi {
m .clock [state ] += 2
newStates = append (newStates , state )
}
}
for _ , state := range removedStates {
m .clock [state ]++
}
if m .SemLogger ().Level () >= LogExternal {
logMsg := ""
if len (newStates ) > 0 {
logMsg += " +" + strings .Join (newStates , " +" )
}
if len (removedStates ) > 0 {
logMsg += " -" + strings .Join (removedStates , " -" )
}
if len (noChangeStates ) > 0 && m .semLogger .Level () > LogDecisions {
logMsg += " " + j (noChangeStates )
}
if len (logMsg ) > 0 {
label := "state"
if isAuto {
label = "auto_"
}
args := m .t .Load ().Mutation .LogArgs (m .SemLogger ().ArgsMapper ())
m .log (LogChanges , "[" +label +"]" +logMsg +args )
}
}
return previous
}
func (m *Machine ) AddBreakpoint1 (added string , removed string , strict bool ) {
if added != "" {
m .AddBreakpoint (S {added }, nil , strict )
} else if removed != "" {
m .AddBreakpoint (nil , S {removed }, strict )
} else {
m .log (LogOps , "[breakpoint] invalid" )
}
}
func (m *Machine ) AddBreakpoint (added S , removed S , strict bool ) {
m .breakpointsMx .Lock ()
defer m .breakpointsMx .Unlock ()
m .breakpoints = append (m .breakpoints , &breakpoint {
Added : added ,
Removed : removed ,
Strict : strict ,
})
}
func (m *Machine ) breakpoint (added S , removed S ) {
m .breakpointsMx .Lock ()
defer m .breakpointsMx .Unlock ()
found := false
for _ , bp := range m .breakpoints {
if len (added ) > 0 && !slices .Equal (bp .Added , added ) {
continue
}
if len (removed ) > 0 && !slices .Equal (bp .Removed , removed ) {
continue
}
if bp .Strict {
if len (bp .Added ) > 0 && m .Is (bp .Added ) {
continue
}
if len (bp .Removed ) > 0 && m .Not (bp .Removed ) {
continue
}
}
found = true
}
if !found {
return
}
if m .LogStackTrace {
m .log (LogChanges , "[breakpoint] Machine.breakpoint\n%s" ,
captureStackTrace ())
} else {
m .log (LogChanges , "[breakpoint] Machine.breakpoint" )
}
}
func (m *Machine ) processQueue () Result {
if m .queueLen .Load () == 0 || m .disposing .Load () {
return Canceled
}
if !m .queueProcessing .CompareAndSwap (false , true ) {
m .queueMx .Lock ()
defer m .queueMx .Unlock ()
label := "items"
if len (m .queue ) == 1 {
label = "item"
}
m .log (LogOps , "[postpone] queue running (%d %s)" , len (m .queue ), label )
return Queued
}
var ret []Result
m .queueRunning .Store (false )
for m .queueLen .Load () > 0 {
m .queueRunning .Store (true )
if m .disposing .Load () {
return Canceled
}
m .queueMx .Lock ()
lenQ := len (m .queue )
if lenQ < 1 {
m .Log ("ERROR: missing queue item" )
return Canceled
}
mut := m .queue [0 ]
m .queue = m .queue [1 :]
m .queueLen .Store (uint32 (lenQ - 1 ))
if mut .QueueTick > 0 {
m .queueTicksPending -= 1
m .queueTick += 1
}
m .queueMx .Unlock ()
if mut .ctx != nil && mut .ctx .Err () != nil {
ret = append (ret , Executed )
continue
}
if mut .Type == mutationEval {
m .Log ("eval: " + mut .evalSource )
mut .eval ()
continue
}
t := newTransition (m , mut )
m .schemaMx .RLock ()
ret = append (ret , t .emitEvents ())
m .timeLast .Store (&t .TimeAfter )
m .schemaMx .RUnlock ()
if t .Mutation .IsCheck {
if done , ok := mut .Args [argCheckDone ].(*CheckDone ); ok {
done .Canceled = t .IsAccepted .Load ()
closeSafe (done .Ch )
}
} else if t .IsAccepted .Load () && !t .Mutation .IsCheck {
m .processSubscriptions (t )
}
t .CleanCache ()
}
m .t .Store (nil )
m .queueProcessing .Store (false )
m .queueRunning .Store (false )
m .tracersMx .RLock ()
for i := 0 ; !m .disposing .Load () && i < len (m .tracers ); i ++ {
m .tracers [i ].QueueEnd (m )
}
m .tracersMx .RUnlock ()
m .queueMx .Lock ()
for _ , ch := range m .subs .ProcessWhenQueueEnds () {
closeSafe (ch )
}
m .queueMx .Unlock ()
if len (ret ) == 0 {
return Canceled
}
return ret [0 ]
}
func (m *Machine ) processSubscriptions (t *Transition ) {
m .activeStatesMx .RLock ()
toCancel := m .subs .ProcessStateCtx (t .cacheDeactivated )
toClose := slices .Concat (
m .subs .ProcessWhen (t .cacheActivated , t .cacheDeactivated ),
m .subs .ProcessWhenTime (t .ClockBefore ()),
m .subs .ProcessWhenQueue (m .queueTick ),
m .subs .ProcessWhenQuery (),
)
m .activeStatesMx .RUnlock ()
for _ , cancel := range toCancel {
cancel ()
}
for _ , ch := range toClose {
closeSafe (ch )
}
}
func (m *Machine ) Context () context .Context {
return m .ctxParent
}
func (m *Machine ) Log (msg string , args ...any ) {
if m .disposing .Load () {
return
}
prefix := "[exter"
msg = strings .ReplaceAll (msg , "\n" , " " )
m .log (LogExternal , prefix +"] " +msg , args ...)
}
func (m *Machine ) log (level LogLevel , msg string , args ...any ) {
if level > m .semLogger .Level () || m .disposing .Load () {
return
}
prefix := ""
if m .logId .Load () {
id := m .id
if len (id ) > 5 {
id = id [:5 ]
}
prefix = "[" + id + "] "
msg = prefix + msg
}
out := fmt .Sprintf (msg , args ...)
logger := m .semLogger .Logger ()
if logger != nil {
logger (level , msg , args ...)
} else {
fmt .Println (out )
}
t := m .Transition ()
if t != nil && !t .IsCompleted .Load () {
t .InternalLogEntriesLock .Lock ()
defer t .InternalLogEntriesLock .Unlock ()
t .LogEntries = append (t .LogEntries , &LogEntry {level , out })
} else {
m .logEntriesLock .Lock ()
defer m .logEntriesLock .Unlock ()
if len (m .logEntries ) > 0 && m .logEntries [len (m .logEntries )-1 ].Text == out &&
!strings .HasPrefix (out , prefix +"[pipe-" ) {
return
}
m .logEntries = append (m .logEntries , &LogEntry {
Level : level ,
Text : out ,
})
}
}
func (m *Machine ) SemLogger () SemLogger {
return m .semLogger
}
func (m *Machine ) handle (
name string , args A , isFinal , isEnter , isSelf bool ,
) (Result , bool ) {
if m .disposing .Load () {
return Canceled , false
}
t := m .t .Load ()
e := &Event {
Name : name ,
machine : m ,
machApi : m ,
Args : args ,
TransitionId : t .Id ,
MachineId : m .Id (),
}
targetStates := t .TargetStates ()
t .latestHandlerIsEnter = isEnter
t .latestHandlerIsFinal = isFinal
if e .Args == nil {
e .Args = A {}
}
res , handlerCalled := m .processHandlers (e )
if m .panicCaught .Load () {
res = Canceled
m .panicCaught .Store (false )
}
if !isFinal && res == Canceled {
if m .semLogger .Level () >= LogOps {
var self string
if isSelf {
self = ":self"
}
m .log (LogOps , "[cancel%s] (%s) by %s" , self ,
j (targetStates ), name )
}
return Canceled , handlerCalled
}
return res , handlerCalled
}
func (m *Machine ) processHandlers (e *Event ) (Result , bool ) {
if m .disposing .Load () {
return Canceled , false
}
handlerCalled := false
handlers := m .getHandlers (false )
for i := 0 ; !m .disposing .Load () && i < len (handlers ); i ++ {
h := handlers [i ]
if h == nil {
continue
}
h .mx .Lock ()
methodName := e .Name
handlerName := strconv .Itoa (i ) + ":" + h .name
if m .semLogger .Level () >= LogEverything {
emitterID := truncateStr (handlerName , 15 )
emitterID = padString (strings .ReplaceAll (emitterID , " " , "_" ), 15 , "_" )
m .log (LogEverything , "[handle:%-15s] %s" , emitterID , methodName )
}
_ , ok := h .missingCache [methodName ]
if ok {
h .mx .Unlock ()
continue
}
method , ok := h .methodCache [methodName ]
if !ok {
method = h .methods .MethodByName (methodName )
if !method .IsValid () {
method = h .methods .Elem ().FieldByName (methodName )
}
if !method .IsValid () {
h .missingCache [methodName ] = struct {}{}
h .mx .Unlock ()
continue
}
h .methodCache [methodName ] = method
}
h .mx .Unlock ()
m .log (LogOps , "[handler:%d] %s" , i , methodName )
m .currentHandler .Store (methodName )
var ret bool
var timeout bool
handlerCalled = true
m .tracersMx .RLock ()
tx := m .t .Load ()
for i := range m .tracers {
m .tracers [i ].HandlerStart (tx , handlerName , methodName )
}
m .tracersMx .RUnlock ()
handlerCall := &handlerCall {
fn : method ,
name : methodName ,
event : e ,
timeout : false ,
}
select {
case <- m .ctx .Done ():
break
case m .handlerStart <- handlerCall :
}
m .handlerTimer .Reset (m .HandlerTimeout )
select {
case <- m .ctx .Done ():
case <- m .handlerTimer .C :
m .log (LogOps , "[cancel] (%s) by timeout" , j (tx .TargetStates ()))
m .log (LogDecisions , "[handler:timeout]: %s from %s" , methodName , h .name )
timeout = true
select {
case <- m .handlerEnd :
m .log (LogEverything , "[handler:ack-timeout] %s from %s" , e .Name , h .name )
case <- time .After (m .HandlerDeadline ):
m .log (LogEverything , "[handler:deadline] %s from %s" , e .Name , h .name )
go m .handlerLoop ()
m .queueMx .Lock ()
m .queue = nil
m .queueLen .Store (0 )
m .queueTicksPending = 0
m .queueMx .Unlock ()
err := fmt .Errorf ("%w: %s from %s" , ErrHandlerTimeout , methodName ,
h .name )
m .EvAddErr (e , err , Pass (&AT {
TargetStates : tx .TargetStates (),
CalledStates : tx .CalledStates (),
TimeBefore : tx .TimeBefore ,
TimeAfter : tx .TimeAfter ,
Event : e .Export (),
}))
now := time .Now ()
m .LastHandlerDeadline .Store (&now )
}
case r := <- m .handlerPanic :
m .recoverToErr (h , r )
case ret = <- m .handlerEnd :
m .log (LogEverything , "[handler:end] %s from %s" , e .Name , h .name )
}
m .handlerTimer .Stop ()
m .currentHandler .Store ("" )
m .tracersMx .RLock ()
for i := range m .tracers {
m .tracers [i ].HandlerEnd (tx , handlerName , methodName )
}
m .tracersMx .RUnlock ()
switch {
case timeout :
return Canceled , handlerCalled
case strings .HasSuffix (e .Name , SuffixState ):
case strings .HasSuffix (e .Name , SuffixEnd ):
default :
if !ret {
return Canceled , handlerCalled
}
}
}
m .activeStatesMx .RLock ()
defer m .activeStatesMx .RUnlock ()
for _ , ch := range m .subs .ProcessWhenArgs (e ) {
close (ch )
}
return Executed , handlerCalled
}
func (m *Machine ) handlerLoop () {
ver := m .handlerLoopVer .Add (1 )
catch := func () {
err := recover ()
if err == nil {
return
}
if !m .disposing .Load () {
m .handlerPanic <- recoveryData {
err : err ,
stack : captureStackTrace (),
}
}
}
if m .PanicToException {
defer catch ()
}
handleCall := func (call *handlerCall ) bool {
ret := true
if call .event .IsValid () {
callRet := call .fn .Call ([]reflect .Value {reflect .ValueOf (call .event )})
if len (callRet ) > 0 {
ret = callRet [0 ].Interface ().(bool )
}
} else {
m .log (LogDecisions , "[handler:invalid] %s" , call .name )
ret = false
}
currVer := m .handlerLoopVer .Load ()
if currVer != ver {
m .AddErr (fmt .Errorf (
"deadlined handler finished, theoretical leak: %s" , call .name ), nil )
return false
}
m .loopLock .Lock ()
select {
case <- m .ctx .Done ():
m .handlerLoopDone ()
m .loopLock .Unlock ()
return false
case m .handlerEnd <- ret :
m .loopLock .Unlock ()
}
return true
}
grace :
for {
select {
case <- m .ctxParent .Done ():
if m .Has1 (StateDisposing ) {
m .Add1 (StateDisposing , nil )
} else {
m .Dispose ()
}
break grace
case call , ok := <- m .handlerStart :
if !ok || !handleCall (call ) {
return
}
}
}
ctx , cancel := context .WithTimeout (context .Background (), 3 *time .Second )
defer cancel ()
for {
select {
case <- ctx .Done ():
m .handlerLoopDone ()
return
case <- m .ctx .Done ():
m .handlerLoopDone ()
return
case call , ok := <- m .handlerStart :
if !ok || !handleCall (call ) {
return
}
}
}
}
func (m *Machine ) handlerLoopDone () {
v , _ := m .ctx .Value (CtxKey ).(CtxValue )
m .log (LogOps , "[doDispose] ctx handlerLoopDone %v" , v )
m .Dispose ()
}
func (m *Machine ) detectQueueDuplicates (mutationType MutationType ,
states S , isCheck bool ,
) bool {
if m .disposing .Load () {
return false
}
found , idx , qTick := m .IsQueued (mutationType , states , true , true , 0 , isCheck ,
PositionAny )
if !found {
return false
}
var counterMutType MutationType
switch mutationType {
case MutationAdd :
counterMutType = MutationRemove
case MutationRemove :
counterMutType = MutationAdd
case MutationSet :
fallthrough
default :
return idx > 0 && len (m .queue )-1 > 0
}
counterMutFound , _ , _ := m .IsQueued (counterMutType , states ,
false , false , qTick +1 , isCheck , PositionAny )
return !counterMutFound
}
func (m *Machine ) Transition () *Transition {
return m .t .Load ()
}
func (m *Machine ) Clock (states S ) Clock {
if m .disposing .Load () {
return Clock {}
}
m .activeStatesMx .RLock ()
defer m .activeStatesMx .RUnlock ()
if states == nil {
states = m .stateNames
}
ret := make (Clock )
for _ , state := range states {
ret [state ] = m .clock [state ]
}
return ret
}
func (m *Machine ) Tick (state string ) uint64 {
if m .disposing .Load () {
return 0
}
m .activeStatesMx .RLock ()
defer m .activeStatesMx .RUnlock ()
return m .clock [state ]
}
func (m *Machine ) IsQueued (mutType MutationType , states S ,
withoutArgsOnly bool , statesStrictEqual bool , minQueueTick uint64 ,
isCheck bool , position Position ,
) (found bool , idx uint16 , qTick uint64 ) {
if m .disposing .Load () {
return false , 0 , 0
}
m .queueMx .RLock ()
defer m .queueMx .RUnlock ()
iter := m .queue
switch position {
case PositionLast :
idx := math .Max (0 , float64 (len (iter )-1 ))
iter = iter [int (idx ):]
case PositionFirst :
iter = iter [0 :1 ]
}
for i , mut := range iter {
if minQueueTick > 0 && mut .QueueTick < minQueueTick {
continue
}
if mut .IsCheck == isCheck &&
mut .Type == mutType &&
((withoutArgsOnly && len (mut .Args ) == 0 ) || !withoutArgsOnly ) &&
((statesStrictEqual &&
len (mut .Called ) == len (states )) ||
(!statesStrictEqual &&
len (mut .Called ) >= len (states ))) &&
slicesEvery (mut .Called , m .Index (states )) {
return true , uint16 (i ), mut .QueueTick
}
}
return false , 0 , 0
}
func (m *Machine ) IsQueuedAbove (threshold int , mutType MutationType ,
states S , withoutArgsOnly bool , statesStrictEqual bool , minQueueTick uint64 ,
) bool {
if m .disposing .Load () {
return false
}
m .queueMx .RLock ()
defer m .queueMx .RUnlock ()
c := 0
for _ , mut := range m .queue {
if minQueueTick > 0 && mut .QueueTick < minQueueTick {
continue
}
if mut .IsCheck == false &&
mut .Type == mutType &&
((withoutArgsOnly && len (mut .Args ) == 0 ) || !withoutArgsOnly ) &&
((statesStrictEqual &&
len (mut .Called ) == len (states )) ||
(!statesStrictEqual &&
len (mut .Called ) >= len (states ))) &&
slicesEvery (mut .Called , m .Index (states )) {
c ++
if c >= threshold {
return true
}
}
}
return false
}
func (m *Machine ) QueueLen () uint16 {
return uint16 (m .queueLen .Load ())
}
func (m *Machine ) WillBe (states S , position ...Position ) bool {
if len (position ) == 0 {
position = []Position {PositionAny }
}
found , idx , _ := m .IsQueued (MutationAdd , states , false , false , 0 , false ,
position [0 ])
switch {
case !found :
return false
case idx == 0 && position [0 ] == PositionFirst :
return true
case idx == uint16 (m .queueLen .Load ())-1 && position [0 ] == PositionLast :
return true
default :
return true
}
}
func (m *Machine ) WillBe1 (state string , position ...Position ) bool {
return m .WillBe (S {state }, position ...)
}
func (m *Machine ) WillBeRemoved (states S , position ...Position ) bool {
if len (position ) == 0 {
position = []Position {PositionAny }
}
found , idx , _ := m .IsQueued (MutationRemove , states , false , false , 0 , false ,
position [0 ])
switch {
case !found :
return false
case idx == 0 && position [0 ] == PositionFirst :
return true
case idx == uint16 (m .queueLen .Load ())-1 && position [0 ] == PositionLast :
return true
default :
return true
}
}
func (m *Machine ) WillBeRemoved1 (state string , position ...Position ) bool {
return m .WillBeRemoved (S {state }, position ...)
}
func (m *Machine ) Has (states S ) bool {
if m .disposing .Load () {
return false
}
return slicesEvery (m .stateNames , states )
}
func (m *Machine ) Has1 (state string ) bool {
return m .Has (S {state })
}
func (m *Machine ) IsClock (clock Clock ) bool {
if m .disposing .Load () {
return false
}
m .activeStatesMx .RLock ()
defer m .activeStatesMx .RUnlock ()
for state , tick := range clock {
if m .clock [state ] != tick {
return false
}
}
return true
}
func (m *Machine ) WasClock (clock Clock ) bool {
if m .disposing .Load () {
return false
}
m .activeStatesMx .RLock ()
defer m .activeStatesMx .RUnlock ()
for state , tick := range clock {
if m .clock [state ] < tick {
return false
}
}
return true
}
func (m *Machine ) IsTime (t Time , states S ) bool {
if m .disposing .Load () {
return false
}
m .activeStatesMx .RLock ()
defer m .activeStatesMx .RUnlock ()
if states == nil {
states = m .stateNames
}
for i , tick := range t {
if m .clock [states [i ]] != tick {
return false
}
}
return true
}
func (m *Machine ) WasTime (t Time , states S ) bool {
if m .disposing .Load () {
return false
}
m .activeStatesMx .RLock ()
defer m .activeStatesMx .RUnlock ()
if states == nil {
states = m .stateNames
}
for i , tick := range t {
if m .clock [states [i ]] < tick {
return false
}
}
return true
}
func (m *Machine ) String () string {
if m .disposing .Load () {
return ""
}
m .activeStatesMx .RLock ()
defer m .activeStatesMx .RUnlock ()
ret := "("
for _ , state := range m .stateNames {
if !slices .Contains (m .activeStates , state ) {
continue
}
if ret != "(" {
ret += " "
}
ret += fmt .Sprintf ("%s:%d" , state , m .clock [state ])
}
return ret + ")"
}
func (m *Machine ) StringAll () string {
if m .disposing .Load () {
return ""
}
m .activeStatesMx .RLock ()
defer m .activeStatesMx .RUnlock ()
ret := "("
ret2 := "["
for _ , state := range m .stateNames {
if slices .Contains (m .activeStates , state ) {
if ret != "(" {
ret += " "
}
ret += fmt .Sprintf ("%s:%d" , state , m .clock [state ])
continue
}
if ret2 != "[" {
ret2 += " "
}
ret2 += fmt .Sprintf ("%s:%d" , state , m .clock [state ])
}
return ret + ") " + ret2 + "]"
}
func (m *Machine ) Inspect (states S ) string {
if m .disposing .Load () {
return ""
}
m .activeStatesMx .RLock ()
defer m .activeStatesMx .RUnlock ()
if states == nil {
states = m .stateNames
}
ret := ""
for _ , name := range states {
state := m .schemaSafe ()[name ]
active := "0"
if slices .Contains (m .activeStates , name ) {
active = "1"
}
ret += fmt .Sprintf ("%s %s\n" +
" |Tick %d\n" , active , name , m .clock [name ])
if state .Auto {
ret += " |Auto true\n"
}
if state .Multi {
ret += " |Multi true\n"
}
if state .Add != nil {
ret += " |Add " + j (state .Add ) + "\n"
}
if state .Require != nil {
ret += " |Require " + j (state .Require ) + "\n"
}
if state .Remove != nil {
ret += " |Remove " + j (state .Remove ) + "\n"
}
if state .After != nil {
ret += " |After " + j (state .After ) + "\n"
}
}
return ret
}
func (m *Machine ) Switch (groups ...S ) string {
activeStates := m .ActiveStates (nil )
for _ , states := range groups {
for _ , state := range states {
if slices .Contains (activeStates , state ) {
return state
}
}
}
return ""
}
func (m *Machine ) ActiveStates (states S ) S {
if m .disposing .Load () {
return S {}
}
m .activeStatesMx .RLock ()
defer m .activeStatesMx .RUnlock ()
if states == nil {
return slices .Clone (m .activeStates )
}
ret := make (S , 0 , len (states ))
for _ , state := range states {
if slices .Contains (m .activeStates , state ) {
ret = append (ret , state )
}
}
return ret
}
func (m *Machine ) StateNames () S {
m .schemaMx .RLock ()
defer m .schemaMx .RUnlock ()
if m .stateNamesExport == nil {
m .stateNamesExport = slices .Clone (m .stateNames )
}
return m .stateNamesExport
}
func (m *Machine ) StateNamesMatch (re *regexp .Regexp ) S {
m .schemaMx .RLock ()
defer m .schemaMx .RUnlock ()
ret := S {}
for _ , name := range m .stateNames {
if re .MatchString (name ) {
ret = append (ret , name )
}
}
return ret
}
func (m *Machine ) Queue () []*Mutation {
if m .disposing .Load () {
return nil
}
m .queueMx .RLock ()
defer m .queueMx .RUnlock ()
return slices .Clone (m .queue )
}
func (m *Machine ) Schema () Schema {
m .schemaMx .RLock ()
defer m .schemaMx .RUnlock ()
return SchemaClone (m .schema )
}
func (m *Machine ) schemaSafe () Schema {
m .schemaMx .RLock ()
defer m .schemaMx .RUnlock ()
return m .schema
}
func (m *Machine ) SchemaVer () int {
return len (m .StateNames ())
}
func (m *Machine ) SetSchema (newSchema Schema , names S ) error {
if m .Transition () != nil {
return fmt .Errorf ("%w: cannot set schema during a transition" , ErrSchema )
}
m .schemaMx .Lock ()
m .queueMx .RLock ()
defer m .queueMx .RUnlock ()
if len (newSchema ) <= len (m .schema ) {
m .schemaMx .Unlock ()
return fmt .Errorf ("%w: new schema too short (> %d states)" ,
ErrSchema , len (m .schema ))
}
if len (newSchema ) != len (names ) {
m .schemaMx .Unlock ()
return fmt .Errorf ("%w: new schema has to be the same length as" +
" state names" , ErrSchema )
}
old := m .schema
parsed , err := ParseSchema (newSchema )
if err != nil {
m .schemaMx .Unlock ()
return err
}
m .schema = parsed
if err = m .verifyStates (names ); err != nil {
m .schemaMx .Unlock ()
return err
}
m .subs .SetClock (m .Clock (nil ))
m .schemaMx .Unlock ()
m .resolver .NewSchema (m .schema , m .stateNames )
m .tracersMx .RLock ()
for i := 0 ; !m .disposing .Load () && i < len (m .tracers ); i ++ {
m .tracers [i ].SchemaChange (m , old )
}
m .tracersMx .RUnlock ()
return nil
}
func (m *Machine ) EvAdd (event *Event , states S , args A ) Result {
if m .disposing .Load () || m .Backoff () ||
uint16 (m .queueLen .Load ()) >= m .QueueLimit {
return Canceled
}
queueTick := m .queueMutation (MutationAdd , states , args , event )
if queueTick == uint64 (Executed ) {
return Executed
}
m .breakpoint (states , nil )
res := m .processQueue ()
if res == Queued {
return Result (queueTick + 1 )
}
return res
}
func (m *Machine ) EvAdd1 (event *Event , state string , args A ) Result {
return m .EvAdd (event , S {state }, args )
}
func (m *Machine ) EvRemove (event *Event , states S , args A ) Result {
if m .disposing .Load () || m .Backoff () ||
uint16 (m .queueLen .Load ()) >= m .QueueLimit {
return Canceled
}
m .queueMx .RLock ()
lenQueue := len (m .queue )
var statesAny []S
for _ , name := range states {
statesAny = append (statesAny , S {name })
}
if lenQueue == 0 && m .Transition () != nil && !m .Any (statesAny ...) {
m .queueMx .RUnlock ()
return Executed
}
m .queueMx .RUnlock ()
queueTick := m .queueMutation (MutationRemove , states , args , event )
if queueTick == uint64 (Executed ) {
return Executed
}
m .breakpoint (nil , states )
res := m .processQueue ()
if res == Queued {
return Result (queueTick + 1 )
}
return res
}
func (m *Machine ) EvRemove1 (event *Event , state string , args A ) Result {
return m .EvRemove (event , S {state }, args )
}
func (m *Machine ) EvAddErr (event *Event , err error , args A ) Result {
return m .EvAddErrState (event , StateException , err , args )
}
func (m *Machine ) EvAddErrState (
event *Event , state string , err error , args A ,
) Result {
if m .disposing .Load () || m .Backoff () ||
uint16 (m .queueLen .Load ()) >= m .QueueLimit || err == nil {
return Canceled
}
m .err .Store (&err )
var trace string
if m .LogStackTrace {
trace = captureStackTrace ()
}
onErr := m .onError .Load ()
if onErr != nil {
(*onErr )(m , err )
}
argsT := &AT {
Err : err ,
ErrTrace : trace ,
}
return m .EvAdd (event , S {state , StateException }, PassMerge (args , argsT ))
}
func (m *Machine ) Export () (*Serialized , Schema , error ) {
m .activeStatesMx .RLock ()
defer m .activeStatesMx .RUnlock ()
m .queueMx .RLock ()
defer m .queueMx .RUnlock ()
m .schemaMx .RLock ()
defer m .schemaMx .RUnlock ()
if !m .statesVerified .Load () {
return nil , nil , fmt .Errorf ("%w: call VerifyStates first" , ErrSchema )
}
t := m .time (nil )
m .log (LogOps , "[import] exported at %d ticks" , t .Sum (nil ))
return &Serialized {
ID : m .id ,
Time : t ,
MachineTick : m .machineTick ,
StateNames : m .stateNames ,
QueueTick : m .queueTick ,
}, SchemaClone (m .schema ), nil
}
func (m *Machine ) Import (data *Serialized ) error {
m .activeStatesMx .RLock ()
defer m .activeStatesMx .RUnlock ()
m .queueMx .RLock ()
defer m .queueMx .RUnlock ()
m .schemaMx .Lock ()
defer m .schemaMx .Unlock ()
if m .id != data .ID {
return fmt .Errorf ("import ID mismatch" )
}
if len (m .schema ) != len (data .StateNames ) {
return fmt .Errorf ("%w: importing diff state len" , ErrSchema )
}
var sum uint64
m .activeStates = nil
for idx , v := range data .Time {
state := data .StateNames [idx ]
sum += v
if !slices .Contains (m .stateNames , state ) {
return fmt .Errorf ("%w: %s" , ErrStateUnknown , state )
}
if IsActiveTick (v ) {
m .activeStates = append (m .activeStates , state )
}
m .clock [state ] = v
}
m .stateNames = data .StateNames
m .stateNamesExport = nil
m .statesVerified .Store (true )
m .machineTick = data .MachineTick + 1
if m .Has1 (StateMachineRestored ) {
m .Add1 (StateMachineRestored , nil )
}
m .log (LogChanges , "[import] imported %d times, now at %d ticks" ,
m .machineTick , sum )
return nil
}
func (m *Machine ) Index1 (state string ) int {
if m .disposing .Load () {
return -1
}
return slices .Index (m .StateNames (), state )
}
func (m *Machine ) Index (states S ) []int {
if m .disposing .Load () {
return []int {}
}
index := m .StateNames ()
return StatesToIndex (index , states )
}
func (m *Machine ) Resolver () RelationsResolver {
return m .resolver
}
func (m *Machine ) BindTracer (tracer Tracer ) error {
if m .disposing .Load () {
return nil
}
m .tracersMx .Lock ()
defer m .tracersMx .Unlock ()
v := reflect .ValueOf (tracer )
if v .Kind () != reflect .Ptr || v .Elem ().Kind () != reflect .Struct {
return errors .New ("BindTracer expects a pointer to a struct" )
}
name := reflect .TypeOf (tracer ).Elem ().Name ()
m .tracers = append (m .tracers , tracer )
m .log (LogOps , "[tracers] bind %s" , name )
return nil
}
func (m *Machine ) DetachTracer (tracer Tracer ) error {
if m .disposing .Load () {
return nil
}
m .tracersMx .Lock ()
defer m .tracersMx .Unlock ()
v := reflect .ValueOf (tracer )
if v .Kind () != reflect .Ptr || v .Elem ().Kind () != reflect .Struct {
return errors .New ("DetachTracer expects a pointer to a struct" )
}
name := reflect .TypeOf (tracer ).Elem ().Name ()
for i , t := range m .tracers {
if t == tracer {
m .tracers = slices .Delete (m .tracers , i , i +1 )
m .log (LogOps , "[tracers] detach %s" , name )
return nil
}
}
return errors .New ("tracer not bound" )
}
func (m *Machine ) Tracers () []Tracer {
m .tracersMx .Lock ()
defer m .tracersMx .Unlock ()
return slices .Clone (m .tracers )
}
func (m *Machine ) OnError (fn HandlerError ) {
m .onError .Store (&fn )
}
func (m *Machine ) Backoff () bool {
last := m .LastHandlerDeadline .Load ()
return last != nil && time .Since (*last ) < m .HandlerBackoff
}
func (m *Machine ) OnChange (fn HandlerChange ) {
m .onChange .Store (&fn )
}
func (m *Machine ) SetGroups (groups any , optStates States ) {
m .schemaMx .Lock ()
defer m .schemaMx .Unlock ()
list := map [string ][]int {}
order := []string {}
index := m .stateNames
if groups != nil {
val := reflect .ValueOf (groups )
typ := val .Type ()
for i := 0 ; i < val .NumField (); i ++ {
field := typ .Field (i )
kind := field .Type .Kind ()
if kind != reflect .Slice {
continue
}
name := field .Name
value := val .Field (i ).Interface ()
if states , ok := value .(S ); ok {
list [name ] = StatesToIndex (index , states )
order = append (order , name )
}
}
}
if optStates != nil {
groups , order2 := optStates .StateGroups ()
for _ , name := range order2 {
list [name ] = groups [name ]
order = append (order , name )
}
}
m .groups = list
m .groupsOrder = order
}
func (m *Machine ) SetGroupsString (groups map [string ]S , order []string ) {
m .schemaMx .Lock ()
defer m .schemaMx .Unlock ()
list := map [string ][]int {}
for name , states := range groups {
list [name ] = StatesToIndex (m .stateNames , states )
}
m .groups = list
m .groupsOrder = order
}
func (m *Machine ) Groups () (map [string ][]int , []string ) {
m .schemaMx .RLock ()
defer m .schemaMx .RUnlock ()
return m .groups , m .groupsOrder
}
func (m *Machine ) Fork (ctx context .Context , e *Event , fn func ()) {
if !e .IsValid () {
return
}
m .Go (ctx , fn )
}
func (m *Machine ) Go (ctx context .Context , fn func ()) {
go func () {
if ctx .Err () != nil {
return
}
fn ()
}()
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .