package machine
import (
"context"
"errors"
"fmt"
"maps"
"math"
"os"
"reflect"
"regexp"
"runtime"
"slices"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
)
var _ Api = &Machine {}
type Machine struct {
QueueLimit uint16
HandlerTimeout time .Duration
HandlerDeadline time .Duration
LastHandlerDeadline atomic .Pointer [time .Time ]
HandlerBackoff time .Duration
EvalTimeout time .Duration
LogStackTrace bool
PanicToException bool
DisposeTimeout time .Duration
subs *Subscriptions
panicCaught atomic .Bool
logId atomic .Bool
statesVerified atomic .Bool
id string
ctx context .Context
parentId string
disposing atomic .Bool
disposed atomic .Bool
queueRunning atomic .Bool
tags atomic .Pointer [[]string ]
tracers []Tracer
tracersMx sync .RWMutex
err atomic .Pointer [error ]
t atomic .Pointer [Transition ]
schema Schema
groups map [string ][]int
groupsOrder []string
schemaMx sync .RWMutex
activeStates S
activeStatesMx sync .RWMutex
queue []*Mutation
queueTick uint64
machineTick uint32
queueTicksPending uint64
queueToken atomic .Uint64
queueMx sync .RWMutex
queueProcessing atomic .Bool
queueLen atomic .Uint32
resolver RelationsResolver
stateNames S
stateNamesExport S
loopLock sync .Mutex
handlers []*handler
handlersMx sync .RWMutex
clock Clock
cancel context .CancelFunc
logLevel atomic .Pointer [LogLevel ]
logger atomic .Pointer [LoggerFn ]
semLogger SemLogger
handlerStart chan *handlerCall
handlerEnd chan bool
handlerPanic chan recoveryData
handlerTimer *time .Timer
logEntriesLock sync .Mutex
logEntries []*LogEntry
logArgs atomic .Pointer [LogArgsMapperFn ]
currentHandler atomic .Value
disposeHandlers []HandlerDispose
timeLast atomic .Pointer [Time ]
whenDisposed chan struct {}
handlerLoopRunning atomic .Bool
handlerLoopVer atomic .Int32
detectEval bool
unlockDisposed atomic .Bool
breakpointsMx sync .Mutex
breakpoints []*breakpoint
onError atomic .Pointer [HandlerError ]
onChange atomic .Pointer [HandlerChange ]
}
func NewCommon (
ctx context .Context , id string , stateSchema Schema , stateNames S ,
handlers any , parent Api , opts *Opts ,
) (*Machine , error ) {
machOpts := &Opts {Id : id }
if opts != nil {
machOpts = opts
machOpts .Id = id
}
if os .Getenv (EnvAmDebug ) != "" {
machOpts = OptsWithDebug (machOpts )
} else if os .Getenv (EnvAmTestRunner ) != "" {
machOpts .HandlerTimeout = 1 * time .Second
}
if parent != nil {
machOpts .Parent = parent
}
if machOpts .LogArgs == nil {
machOpts .LogArgs = NewArgsMapper (LogArgs , 0 )
}
mach := New (ctx , stateSchema , machOpts )
err := mach .VerifyStates (stateNames )
if err != nil {
return nil , err
}
if handlers != nil {
err = mach .BindHandlers (handlers )
if err != nil {
return nil , err
}
}
return mach , nil
}
func New (ctx context .Context , schema Schema , opts *Opts ) *Machine {
parsedStates , err := ParseSchema (schema )
m := &Machine {
HandlerTimeout : 100 * time .Millisecond ,
HandlerDeadline : 10 * time .Second ,
HandlerBackoff : 3 * time .Second ,
EvalTimeout : time .Second ,
LogStackTrace : true ,
PanicToException : true ,
QueueLimit : 1000 ,
DisposeTimeout : time .Second ,
id : randId (),
schema : parsedStates ,
clock : Clock {},
handlers : []*handler {},
handlerStart : make (chan *handlerCall ),
handlerEnd : make (chan bool ),
handlerPanic : make (chan recoveryData ),
handlerTimer : time .NewTimer (24 * time .Hour ),
whenDisposed : make (chan struct {}),
queueTick : 1 ,
}
m .subs = NewSubscriptionManager (m , m .clock , m .is , m .not , m .log )
m .semLogger = &semLogger {mach : m }
m .logId .Store (true )
m .timeLast .Store (&Time {})
lvl := LogNothing
m .logLevel .Store (&lvl )
opts = cloneOptions (opts )
var parent Api
if opts != nil {
if opts .Id != "" {
m .id = opts .Id
}
if opts .HandlerTimeout != 0 {
m .HandlerTimeout = opts .HandlerTimeout
}
if opts .HandlerDeadline != 0 {
m .HandlerDeadline = opts .HandlerDeadline
}
if opts .HandlerBackoff != 0 {
m .HandlerBackoff = opts .HandlerBackoff
}
if opts .DontPanicToException {
m .PanicToException = false
}
if opts .DontLogStackTrace {
m .LogStackTrace = false
}
if opts .DontLogId {
m .logId .Store (false )
}
if opts .Resolver != nil {
m .resolver = opts .Resolver
}
if opts .LogLevel != LogNothing {
m .semLogger .SetLevel (opts .LogLevel )
}
if opts .Tracers != nil {
m .tracers = opts .Tracers
}
if opts .LogArgs != nil {
m .logArgs .Store (&opts .LogArgs )
}
if opts .QueueLimit != 0 {
m .QueueLimit = opts .QueueLimit
}
if len (opts .Tags ) > 0 {
tags := slicesUniq (opts .Tags )
m .tags .Store (&tags )
}
m .detectEval = opts .DetectEval
parent = opts .Parent
m .parentId = opts .ParentId
}
if os .Getenv (EnvAmDetectEval ) != "" {
m .detectEval = true
}
if m .resolver == nil {
m .resolver = &DefaultRelationsResolver {
Machine : m ,
}
}
if _ , ok := m .schema [StateException ]; !ok {
m .schema [StateException ] = State {
Multi : true ,
}
}
for name := range m .schema {
m .stateNames = append (m .stateNames , name )
slices .Sort (m .stateNames )
m .clock [name ] = 0
}
m .resolver .NewSchema (m .schema , m .stateNames )
if ctx == nil {
ctx = context .TODO ()
}
m .ctx , m .cancel = context .WithCancel (ctx )
if parent != nil {
m .parentId = parent .Id ()
pTracers := parent .Tracers ()
for _ , t := range pTracers {
t .NewSubmachine (parent , m )
}
}
for i := range m .tracers {
ctxTr := m .tracers [i ].MachineInit (m )
if ctxTr != nil {
m .ctx = ctxTr
}
}
if err != nil {
m .AddErr (err , nil )
}
return m
}
func (m *Machine ) OnDispose (fn HandlerDispose ) {
m .handlersMx .Lock ()
defer m .handlersMx .Unlock ()
m .disposeHandlers = append (m .disposeHandlers , fn )
}
func (m *Machine ) Dispose () {
if m .Has1 (StateStart ) {
m .Remove1 (StateStart , nil )
}
go func () {
if m .disposing .Load () {
m .log (LogDecisions , "[Dispose] already disposed" )
return
}
m .queueProcessing .Store (false )
m .unlockDisposed .Store (true )
m .doDispose (false )
}()
}
func (m *Machine ) IsDisposed () bool {
return m .disposed .Load ()
}
func (m *Machine ) DisposeForce () {
m .doDispose (true )
}
func (m *Machine ) doDispose (force bool ) {
if m .disposed .Load () {
return
}
if !m .disposing .CompareAndSwap (false , true ) {
return
}
m .cancel ()
if !force {
whenIdle := m .WhenQueueEnds ()
select {
case <- time .After (m .DisposeTimeout ):
m .log (LogDecisions , "[doDispose] timeout waiting for queue to drain" )
case <- whenIdle :
}
}
if !m .disposed .CompareAndSwap (false , true ) {
return
}
m .tracersMx .RLock ()
for i := range m .tracers {
m .tracers [i ].MachineDispose (m .Id ())
}
m .tracersMx .RUnlock ()
if !force {
m .activeStatesMx .Lock ()
defer m .activeStatesMx .Unlock ()
m .subs .Mx .Lock ()
defer m .subs .Mx .Unlock ()
m .tracersMx .Lock ()
defer m .tracersMx .Unlock ()
m .handlersMx .Lock ()
defer m .handlersMx .Unlock ()
m .queueMx .Lock ()
defer m .queueMx .Unlock ()
}
m .log (LogEverything , "[end] doDispose" )
if m .Err () == nil && m .ctx .Err () != nil {
err := m .ctx .Err ()
m .err .Store (&err )
}
m .handlers = nil
go func () {
time .Sleep (100 * time .Millisecond )
m .loopLock .Lock ()
defer m .loopLock .Unlock ()
closeSafe (m .handlerEnd )
closeSafe (m .handlerPanic )
closeSafe (m .handlerStart )
}()
m .subs .dispose ()
for _ , mut := range m .queue {
if !mut .IsCheck {
continue
}
if done , ok := mut .Args [argCheckDone ].(*CheckDone ); ok {
closeSafe (done .Ch )
}
}
if m .handlerTimer != nil {
m .handlerTimer .Stop ()
}
if m .unlockDisposed .Load () {
m .unlockDisposed .Store (false )
m .queueProcessing .Store (false )
}
for _ , fn := range m .disposeHandlers {
fn (m .id , m .ctx )
}
closeSafe (m .whenDisposed )
}
func (m *Machine ) getHandlers (locked bool ) []*handler {
if !locked {
m .handlersMx .Lock ()
defer m .handlersMx .Unlock ()
}
return slices .Clone (m .handlers )
}
func (m *Machine ) setHandlers (locked bool , handlers []*handler ) {
if !locked {
m .handlersMx .Lock ()
defer m .handlersMx .Unlock ()
}
m .handlers = handlers
}
func (m *Machine ) WhenErr (disposeCtx context .Context ) <-chan struct {} {
return m .When ([]string {StateException }, disposeCtx )
}
func (m *Machine ) When (states S , ctx context .Context ) <-chan struct {} {
if m .disposed .Load () {
return newClosedChan ()
}
m .activeStatesMx .Lock ()
defer m .activeStatesMx .Unlock ()
return m .subs .When (m .mustParseStates (states ), ctx )
}
func (m *Machine ) When1 (state string , ctx context .Context ) <-chan struct {} {
return m .When (S {state }, ctx )
}
func (m *Machine ) WhenNot (states S , ctx context .Context ) <-chan struct {} {
if m .disposed .Load () {
ch := make (chan struct {})
close (ch )
return ch
}
m .activeStatesMx .Lock ()
defer m .activeStatesMx .Unlock ()
return m .subs .WhenNot (m .mustParseStates (states ), ctx )
}
func (m *Machine ) WhenNot1 (state string , ctx context .Context ) <-chan struct {} {
return m .WhenNot (S {state }, ctx )
}
func (m *Machine ) WhenTime (
states S , times Time , ctx context .Context ,
) <-chan struct {} {
if m .disposed .Load () {
return newClosedChan ()
}
if len (states ) != len (times ) {
err := fmt .Errorf (
"whenTime: states and times must have the same length (%s)" , j (states ))
m .AddErr (err , nil )
return newClosedChan ()
}
m .activeStatesMx .Lock ()
defer m .activeStatesMx .Unlock ()
return m .subs .WhenTime (states , times , ctx )
}
func (m *Machine ) WhenTime1 (
state string , ticks uint64 , ctx context .Context ,
) <-chan struct {} {
return m .WhenTime (S {state }, Time {ticks }, ctx )
}
func (m *Machine ) WhenTicks (
state string , ticks int , ctx context .Context ,
) <-chan struct {} {
return m .WhenTime (S {state }, Time {uint64 (ticks ) + m .Tick (state )}, ctx )
}
func (m *Machine ) WhenQuery (
clockCheck func (clock Clock ) bool , ctx context .Context ,
) <-chan struct {} {
if m .disposed .Load () {
return newClosedChan ()
}
m .activeStatesMx .Lock ()
defer m .activeStatesMx .Unlock ()
return m .subs .WhenQuery (clockCheck , ctx )
}
func (m *Machine ) WhenQueueEnds () <-chan struct {} {
if m .disposed .Load () || !m .queueRunning .Load () {
return newClosedChan ()
}
m .queueMx .Lock ()
defer m .queueMx .Unlock ()
return m .subs .WhenQueueEnds ()
}
func (m *Machine ) WhenQueue (tick Result ) <-chan struct {} {
if m .disposed .Load () {
return newClosedChan ()
}
m .queueMx .Lock ()
defer m .queueMx .Unlock ()
if m .queueTick >= uint64 (tick ) {
return newClosedChan ()
}
return m .subs .WhenQueue (tick )
}
func (m *Machine ) WhenArgs (
state string , args A , ctx context .Context ,
) <-chan struct {} {
if m .disposed .Load () {
return newClosedChan ()
}
m .activeStatesMx .Lock ()
defer m .activeStatesMx .Unlock ()
states := m .mustParseStates (S {state })
return m .subs .WhenArgs (states [0 ], args , ctx )
}
func (m *Machine ) WhenDisposed () <-chan struct {} {
return m .whenDisposed
}
func (m *Machine ) QueueTick () uint64 {
m .queueMx .Lock ()
defer m .queueMx .Unlock ()
return m .queueTick
}
func (m *Machine ) MachineTick () uint32 {
m .schemaMx .RLock ()
defer m .schemaMx .RUnlock ()
return m .machineTick
}
func (m *Machine ) Time (states S ) Time {
if m .disposed .Load () {
return nil
}
m .activeStatesMx .RLock ()
defer m .activeStatesMx .RUnlock ()
return m .time (states )
}
func (m *Machine ) time (states S ) Time {
if m .disposed .Load () {
return nil
}
m .schemaMx .RLock ()
defer m .schemaMx .RUnlock ()
if states == nil {
states = m .stateNames
}
ret := make (Time , len (states ))
for i , s := range states {
ret [i ] = m .clock [s ]
}
return ret
}
func (m *Machine ) PrependMut (mut *Mutation ) Result {
if m .disposing .Load () {
return Canceled
}
isEval := mut .Type == mutationEval
statesParsed := m .mustParseStates (IndexToStates (m .StateNames (), mut .Called ))
m .log (LogOps , "[prepend:%s] %s%s" , mut .Type , j (statesParsed ),
mut .LogArgs (m .SemLogger ().ArgsMapper ()))
m .queueMx .Lock ()
if !isEval {
mut .cacheCalled .Store (&statesParsed )
}
if m .id == "ns-TestManyStates" {
print ()
}
m .queue = append ([]*Mutation {mut }, m .queue ...)
lenQ := len (m .queue )
m .queueLen .Store (uint32 (lenQ ))
if !isEval {
mut .QueueLen = int32 (lenQ )
mut .QueueToken = m .queueToken .Add (1 )
mut .QueueTickNow = m .queueTick
}
m .queueMx .Unlock ()
if !isEval {
m .tracersMx .RLock ()
for i := 0 ; !m .disposing .Load () && i < len (m .tracers ); i ++ {
m .tracers [i ].MutationQueued (m , mut )
}
m .tracersMx .RUnlock ()
}
res := m .processQueue ()
return res
}
func (m *Machine ) Add (states S , args A ) Result {
if m .disposing .Load () || m .Backoff () {
return Canceled
}
if m .id == "ns-TestManyStates" {
print ()
}
if uint16 (m .queueLen .Load ()) >= m .QueueLimit {
if !slices .Contains (states , StateException ) || m .IsErr () {
return Canceled
}
}
queueTick := m .queueMutation (MutationAdd , states , args , nil )
if queueTick == uint64 (Executed ) {
return Executed
}
m .breakpoint (states , nil )
res := m .processQueue ()
if res == Queued {
return Result (queueTick )
}
return res
}
func (m *Machine ) Add1 (state string , args A ) Result {
return m .Add (S {state }, args )
}
func (m *Machine ) Toggle (states S , args A ) Result {
if m .disposing .Load () {
return Canceled
}
if m .Is (states ) {
return m .Remove (states , args )
}
return m .Add (states , args )
}
func (m *Machine ) Toggle1 (state string , args A ) Result {
if m .disposing .Load () {
return Canceled
}
if m .Is1 (state ) {
return m .Remove1 (state , args )
}
return m .Add1 (state , args )
}
func (m *Machine ) EvToggle (e *Event , states S , args A ) Result {
if m .disposing .Load () {
return Canceled
}
if m .Is (states ) {
return m .EvRemove (e , states , args )
}
return m .EvAdd (e , states , args )
}
func (m *Machine ) EvToggle1 (e *Event , state string , args A ) Result {
if m .disposing .Load () {
return Canceled
}
if m .disposing .Load () {
return Canceled
}
if m .Is1 (state ) {
return m .EvRemove1 (e , state , args )
}
return m .EvAdd1 (e , state , args )
}
func (m *Machine ) AddErr (err error , args A ) Result {
return m .AddErrState (StateException , err , args )
}
func (m *Machine ) AddErrState (state string , err error , args A ) Result {
if m .disposing .Load () || m .Backoff () || err == nil {
return Canceled
}
m .err .Store (&err )
var trace string
if m .LogStackTrace {
trace = captureStackTrace ()
}
argsT := &AT {
Err : err ,
ErrTrace : trace ,
}
if onErr := m .onError .Load (); onErr != nil {
(*onErr )(m , err )
}
return m .Add (S {state , StateException }, PassMerge (args , argsT ))
}
func (m *Machine ) CanAdd (states S , args A ) Result {
if m .disposing .Load () || m .Backoff () {
return Canceled
}
return m .PrependMut (&Mutation {
Type : MutationAdd ,
Called : m .Index (states ),
Args : args ,
IsCheck : true ,
})
}
func (m *Machine ) CanAdd1 (state string , args A ) Result {
return m .CanAdd (S {state }, args )
}
func (m *Machine ) CanRemove (states S , args A ) Result {
if m .disposing .Load () || m .Backoff () {
return Canceled
}
return m .PrependMut (&Mutation {
Type : MutationRemove ,
Called : m .Index (states ),
Args : args ,
IsCheck : true ,
})
}
func (m *Machine ) CanRemove1 (state string , args A ) Result {
return m .CanRemove (S {state }, nil )
}
func (m *Machine ) PanicToErr (args A ) {
if !m .PanicToException || m .disposing .Load () {
return
}
r := recover ()
if r == nil {
return
}
if err , ok := r .(error ); ok {
m .AddErr (err , args )
} else {
m .AddErr (fmt .Errorf ("%v" , err ), args )
}
}
func (m *Machine ) PanicToErrState (state string , args A ) {
if !m .PanicToException || m .disposing .Load () {
return
}
r := recover ()
if r == nil {
return
}
if err , ok := r .(error ); ok {
m .AddErrState (state , err , args )
} else {
m .AddErrState (state , fmt .Errorf ("%v" , err ), args )
}
}
func (m *Machine ) IsErr () bool {
return m .Is (S {StateException })
}
func (m *Machine ) Err () error {
err := m .err .Load ()
if err == nil {
return nil
}
return *err
}
func (m *Machine ) Remove (states S , args A ) Result {
if m .disposing .Load () || m .Backoff () {
return Canceled
}
if uint16 (m .queueLen .Load ()) >= m .QueueLimit {
if !slices .Contains (states , StateException ) || !m .IsErr () {
return Canceled
}
}
m .queueMx .RLock ()
lenQueue := len (m .queue )
var statesAny []S
for _ , name := range states {
statesAny = append (statesAny , S {name })
}
if lenQueue == 0 && m .Transition () != nil && !m .Any (statesAny ...) {
m .queueMx .RUnlock ()
return Executed
}
m .queueMx .RUnlock ()
queueTick := m .queueMutation (MutationRemove , states , args , nil )
if queueTick == uint64 (Executed ) {
return Executed
}
m .breakpoint (nil , states )
res := m .processQueue ()
if res == Queued {
return Result (queueTick + 1 )
}
return res
}
func (m *Machine ) Remove1 (state string , args A ) Result {
return m .Remove (S {state }, args )
}
func (m *Machine ) Set (states S , args A ) Result {
if m .disposing .Load () || uint16 (m .queueLen .Load ()) >= m .QueueLimit {
return Canceled
}
queueTick := m .queueMutation (MutationSet , states , args , nil )
if queueTick == uint64 (Executed ) {
return Executed
}
res := m .processQueue ()
if res == Queued {
return Result (queueTick + 1 )
}
return res
}
func (m *Machine ) Id () string {
return m .id
}
func (m *Machine ) ParentId () string {
return m .parentId
}
func (m *Machine ) Tags () []string {
tags := m .tags .Load ()
if tags != nil {
return slices .Clone (*tags )
}
return nil
}
func (m *Machine ) SetTags (tags []string ) {
m .tags .Store (&tags )
}
func (m *Machine ) Is (states S ) bool {
if m .disposing .Load () {
return false
}
m .activeStatesMx .RLock ()
defer m .activeStatesMx .RUnlock ()
return m .is (states )
}
func (m *Machine ) Is1 (state string ) bool {
return m .Is (S {state })
}
func (m *Machine ) is (states S ) bool {
if m .disposing .Load () {
return false
}
activeStates := m .activeStates
for _ , s := range states {
if !slices .Contains (m .stateNames , s ) {
m .log (LogDecisions , "[is] state %s not found" , s )
return false
}
if !slices .Contains (activeStates , s ) {
return false
}
}
return true
}
func (m *Machine ) Not (states S ) bool {
if m .disposing .Load () {
return false
}
m .activeStatesMx .RLock ()
defer m .activeStatesMx .RUnlock ()
return m .not (states )
}
func (m *Machine ) not (states S ) bool {
return slicesNone (m .mustParseStates (states ), m .activeStates )
}
func (m *Machine ) Not1 (state string ) bool {
return m .Not (S {state })
}
func (m *Machine ) Any (states ...S ) bool {
for _ , s := range states {
if m .Is (s ) {
return true
}
}
return false
}
func (m *Machine ) Any1 (states ...string ) bool {
for _ , s := range states {
if m .Is1 (s ) {
return true
}
}
return false
}
func (m *Machine ) queueMutation (
mutType MutationType , states S , args A , event *Event ,
) uint64 {
statesParsed := m .mustParseStates (states )
multi := false
for _ , state := range statesParsed {
if m .schema [state ].Multi {
multi = true
break
}
}
if !multi && len (args ) == 0 &&
m .detectQueueDuplicates (mutType , statesParsed , false ) {
m .log (LogOps , "[queue:skipped] Duplicate detected for [%s] %s" ,
mutType , j (statesParsed ))
return uint64 (Executed )
}
if args == nil {
args = A {}
}
var source *MutSource
if event != nil {
tx := event .Transition ()
source = &MutSource {
MachId : event .MachineId ,
TxId : event .TransitionId ,
}
if tx != nil {
source .MachTime = tx .TimeBefore .Sum (nil )
}
}
mut := &Mutation {
Type : mutType ,
Called : m .Index (statesParsed ),
Args : args ,
Source : source ,
}
mut .cacheCalled .Store (&statesParsed )
m .queueMx .Lock ()
if m .id == "ns-TestManyStates" {
print ()
}
m .queue = append (m .queue , mut )
lenQ := len (m .queue )
m .queueLen .Store (uint32 (lenQ ))
m .queueTicksPending += 1
mut .QueueLen = int32 (lenQ )
mut .QueueTick = m .queueTicksPending + m .queueTick
mut .QueueTickNow = m .queueTick
m .queueMx .Unlock ()
m .log (LogOps , "[queue:%s] %s%s" , mutType , j (statesParsed ),
mut .LogArgs (m .SemLogger ().ArgsMapper ()))
m .tracersMx .RLock ()
for i := 0 ; !m .disposing .Load () && i < len (m .tracers ); i ++ {
m .tracers [i ].MutationQueued (m , mut )
}
m .tracersMx .RUnlock ()
if mut .Type == MutationAdd {
m .breakpoint (statesParsed , nil )
} else if mutType == MutationRemove {
m .breakpoint (nil , statesParsed )
}
return mut .QueueTick
}
func (m *Machine ) Eval (source string , fn func (), ctx context .Context ) bool {
if m .disposing .Load () {
return false
}
if source == "" {
panic ("error: source of eval is required" )
}
if m .detectEval {
trace := captureStackTrace ()
for i := 0 ; !m .disposing .Load () && i < len (m .handlers ); i ++ {
handler := m .handlers [i ]
for _ , method := range handler .methodNames {
match := fmt .Sprintf (".(*%s).%s(" , handler .name , method )
for _ , line := range strings .Split (trace , "\n" ) {
if !strings .Contains (line , match ) {
continue
}
msg := fmt .Sprintf ("error: Eval() called directly in handler %s.%s" ,
handler .name , method )
panic (msg )
}
}
}
}
m .log (LogOps , "[eval] %s" , source )
done := make (chan struct {})
canceled := atomic .Bool {}
wrap := func () {
defer close (done )
if canceled .Load () {
return
}
fn ()
}
if ctx == nil {
ctx = context .Background ()
}
mut := &Mutation {
Type : mutationEval ,
eval : wrap ,
evalSource : source ,
ctx : ctx ,
}
_ = m .PrependMut (mut )
select {
case <- time .After (m .EvalTimeout ):
canceled .Store (true )
m .log (LogOps , "[eval:timeout] %s" , source )
m .AddErr (fmt .Errorf ("%w: eval:%s" , ErrEvalTimeout , source ), nil )
return false
case <- m .ctx .Done ():
canceled .Store (true )
return false
case <- ctx .Done ():
canceled .Store (true )
m .log (LogDecisions , "[eval:ctxdone] %s" , source )
return false
case <- done :
}
m .log (LogEverything , "[eval:end] %s" , source )
return true
}
func (m *Machine ) NewStateCtx (state string ) context .Context {
if m .disposing .Load () {
return context .TODO ()
}
m .mustParseStates (S {state })
m .activeStatesMx .Lock ()
defer m .activeStatesMx .Unlock ()
return m .subs .NewStateCtx (state )
}
func (m *Machine ) MustBindHandlers (handlers any ) {
if err := m .BindHandlers (handlers ); err != nil {
panic (err )
}
}
func (m *Machine ) BindHandlers (handlers any ) error {
if m .disposing .Load () {
return nil
}
first := false
if !m .handlerLoopRunning .Load () {
first = true
m .handlerLoopRunning .Store (true )
go m .handlerLoop ()
}
v := reflect .ValueOf (handlers )
if v .Kind () != reflect .Ptr || v .Elem ().Kind () != reflect .Struct {
return errors .New ("BindHandlers expects a pointer to a struct" )
}
name := reflect .TypeOf (handlers ).Elem ().Name ()
if name == "" {
name = "anon"
if os .Getenv (EnvAmDebug ) != "" {
buf := make ([]byte , 4024 )
n := runtime .Stack (buf , false )
stack := string (buf [:n ])
lines := strings .Split (stack , "\n" )
name = lines [len (lines )-2 ]
name = strings .TrimLeft (emitterNameRe .FindString (name ), "/" )
}
}
var methodNames []string
if m .detectEval || os .Getenv (EnvAmDebug ) != "" {
var err error
methodNames , err = ListHandlers (handlers , m .stateNames )
if err != nil {
return fmt .Errorf ("listing handlers: %w" , err )
}
}
h := m .newHandler (handlers , name , &v , methodNames )
old := m .getHandlers (false )
m .setHandlers (false , append (old , h ))
if name != "" {
m .log (LogOps , "[handlers] bind %s" , name )
} else {
m .log (LogOps , "[handlers] bind %d" , len (old ))
}
if first && m .IsErr () {
m .AddErr (m .Err (), nil )
}
return nil
}
func (m *Machine ) DetachHandlers (handlers any ) error {
if m .disposing .Load () {
return nil
}
m .handlersMx .Lock ()
defer m .handlersMx .Unlock ()
old := m .getHandlers (true )
var match *handler
var matchIndex int
for i , h := range old {
if h .h == handlers {
match = h
matchIndex = i
break
}
}
if match == nil {
return errors .New ("handlers not bound" )
}
m .setHandlers (true , slices .Delete (old , matchIndex , matchIndex +1 ))
match .dispose ()
m .log (LogOps , "[handlers] detach %s" , match .name )
return nil
}
func (m *Machine ) HasHandlers () bool {
m .handlersMx .RLock ()
defer m .handlersMx .RUnlock ()
return len (m .handlers ) > 0
}
func (m *Machine ) newHandler (
handlers any , name string , methods *reflect .Value , methodNames []string ,
) *handler {
if m .disposing .Load () {
return &handler {}
}
e := &handler {
name : name ,
h : handlers ,
methods : methods ,
methodNames : methodNames ,
methodCache : make (map [string ]reflect .Value ),
missingCache : make (map [string ]struct {}),
}
return e
}
func (m *Machine ) recoverToErr (handler *handler , r recoveryData ) {
if m .disposing .Load () {
return
}
m .panicCaught .Store (true )
m .currentHandler .Store ("" )
t := m .t .Load ()
index := m .StateNames ()
iException := m .Index1 (StateException )
mut := t .Mutation
if mut .IsCalled (iException ) {
return
}
m .log (LogOps , "[recover] handling panic..." )
err := fmt .Errorf ("%s" , r .err )
m .err .Store (&err )
if t .latestHandlerIsFinal {
m .recoverFinalPhase ()
}
m .log (LogOps , "[cancel] (%s) by recover" , j (t .TargetStates ()))
t .IsAccepted .Store (false )
t .IsCompleted .Store (true )
errMut := &Mutation {
Type : MutationAdd ,
Called : m .Index (S {StateException }),
Args : Pass (&AT {
Err : err ,
ErrTrace : r .stack ,
Panic : &ExceptionArgsPanic {
CalledStates : IndexToStates (index , mut .Called ),
StatesBefore : t .StatesBefore (),
Transition : t ,
},
}),
}
m .PrependMut (errMut )
go m .handlerLoop ()
}
func (m *Machine ) recoverFinalPhase () {
t := m .t .Load ()
finals := slices .Concat (t .Exits , t .Enters )
m .activeStatesMx .RLock ()
activeStates := m .activeStates
m .activeStatesMx .RUnlock ()
found := false
for _ , s := range finals {
if t .latestHandlerToState == s {
found = true
}
if !found {
continue
}
if t .latestHandlerIsEnter {
activeStates = slicesWithout (activeStates , s )
} else {
activeStates = append (activeStates , s )
}
}
m .log (LogOps , "[recover] partial final states as (%s)" ,
j (activeStates ))
m .activeStatesMx .Lock ()
defer m .activeStatesMx .Unlock ()
m .setActiveStates (t .CalledStates (), activeStates , t .IsAuto ())
}
func (m *Machine ) mustParseStates (states S ) S {
if m .disposing .Load () {
return nil
}
seen := make (map [string ]struct {})
dups := false
for i := range states {
if _ , ok := m .schema [states [i ]]; !ok {
panic (fmt .Errorf (
"%w: %s not defined in schema for %s" , ErrStateMissing ,
states [i ], m .id ))
}
if _ , ok := seen [states [i ]]; !ok {
seen [states [i ]] = struct {}{}
} else {
dups = true
}
}
if dups {
return slicesUniq (states )
}
return states
}
func (m *Machine ) ParseStates (states S ) S {
if m .disposing .Load () {
return nil
}
seen := make (map [string ]struct {})
dups := false
for i := range states {
if _ , ok := m .schema [states [i ]]; !ok {
continue
}
if _ , ok := seen [states [i ]]; !ok {
seen [states [i ]] = struct {}{}
} else {
dups = true
}
}
if dups {
return slicesUniq (states )
}
return slices .Collect (maps .Keys (seen ))
}
func (m *Machine ) VerifyStates (states S ) error {
if m .disposing .Load () {
return nil
}
m .schemaMx .RLock ()
defer m .schemaMx .RUnlock ()
return m .verifyStates (states )
}
func (m *Machine ) verifyStates (states S ) error {
var errs []error
var checked []string
for _ , s := range states {
if _ , ok := m .schema [s ]; !ok {
err := fmt .Errorf ("state %s not defined in schema for %s" , s , m .id )
errs = append (errs , err )
continue
}
checked = append (checked , s )
}
if len (errs ) > 1 {
return errors .Join (errs ...)
} else if len (errs ) == 1 {
return errs [0 ]
}
if len (m .stateNames ) > len (states ) {
missing := StatesDiff (m .stateNames , checked )
return fmt .Errorf (
"error: trying to verify less states than registered: %s" , j (missing ))
}
m .stateNames = slicesUniq (states )
m .stateNamesExport = nil
m .statesVerified .Store (true )
m .tracersMx .RLock ()
for i := 0 ; !m .disposing .Load () && i < len (m .tracers ); i ++ {
m .tracers [i ].VerifyStates (m )
}
m .tracersMx .RUnlock ()
return nil
}
func (m *Machine ) StatesVerified () bool {
return m .statesVerified .Load ()
}
func (m *Machine ) setActiveStates (
calledStates S , targetStates S , isAuto bool ,
) S {
if m .disposing .Load () {
return S {}
}
previous := m .activeStates
newStates := StatesDiff (targetStates , m .activeStates )
removedStates := StatesDiff (m .activeStates , targetStates )
noChangeStates := StatesDiff (targetStates , newStates )
m .activeStates = slices .Clone (targetStates )
for _ , state := range targetStates {
data := m .schema [state ]
if !slices .Contains (previous , state ) {
m .clock [state ]++
} else if slices .Contains (calledStates , state ) && data .Multi {
m .clock [state ] += 2
newStates = append (newStates , state )
}
}
for _ , state := range removedStates {
m .clock [state ]++
}
if m .SemLogger ().Level () >= LogExternal {
logMsg := ""
if len (newStates ) > 0 {
logMsg += " +" + strings .Join (newStates , " +" )
}
if len (removedStates ) > 0 {
logMsg += " -" + strings .Join (removedStates , " -" )
}
if len (noChangeStates ) > 0 && m .semLogger .Level () > LogDecisions {
logMsg += " " + j (noChangeStates )
}
if len (logMsg ) > 0 {
label := "state"
if isAuto {
label = "auto_"
}
args := m .t .Load ().Mutation .LogArgs (m .SemLogger ().ArgsMapper ())
m .log (LogChanges , "[" +label +"]" +logMsg +args )
}
}
return previous
}
func (m *Machine ) AddBreakpoint1 (added string , removed string , strict bool ) {
if added != "" {
m .AddBreakpoint (S {added }, nil , strict )
} else if removed != "" {
m .AddBreakpoint (nil , S {removed }, strict )
} else {
m .log (LogOps , "[breakpoint] invalid" )
}
}
func (m *Machine ) AddBreakpoint (added S , removed S , strict bool ) {
m .breakpointsMx .Lock ()
defer m .breakpointsMx .Unlock ()
m .breakpoints = append (m .breakpoints , &breakpoint {
Added : added ,
Removed : removed ,
Strict : strict ,
})
}
func (m *Machine ) breakpoint (added S , removed S ) {
m .breakpointsMx .Lock ()
defer m .breakpointsMx .Unlock ()
found := false
for _ , bp := range m .breakpoints {
if len (added ) > 0 && !slices .Equal (bp .Added , added ) {
continue
}
if len (removed ) > 0 && !slices .Equal (bp .Removed , removed ) {
continue
}
if bp .Strict {
if len (bp .Added ) > 0 && m .Is (bp .Added ) {
continue
}
if len (bp .Removed ) > 0 && m .Not (bp .Removed ) {
continue
}
}
found = true
}
if !found {
return
}
if m .LogStackTrace {
m .log (LogChanges , "[breakpoint] Machine.breakpoint\n%s" ,
captureStackTrace ())
} else {
m .log (LogChanges , "[breakpoint] Machine.breakpoint" )
}
}
func (m *Machine ) processQueue () Result {
if m .queueLen .Load () == 0 || m .disposing .Load () {
return Canceled
}
if !m .queueProcessing .CompareAndSwap (false , true ) {
m .queueMx .Lock ()
defer m .queueMx .Unlock ()
label := "items"
if len (m .queue ) == 1 {
label = "item"
}
m .log (LogOps , "[postpone] queue running (%d %s)" , len (m .queue ), label )
return Queued
}
var ret []Result
m .queueRunning .Store (false )
for m .queueLen .Load () > 0 {
m .queueRunning .Store (true )
if m .disposing .Load () {
return Canceled
}
m .queueMx .Lock ()
lenQ := len (m .queue )
if lenQ < 1 {
m .Log ("ERROR: missing queue item" )
return Canceled
}
mut := m .queue [0 ]
m .queue = m .queue [1 :]
m .queueLen .Store (uint32 (lenQ - 1 ))
if mut .QueueTick > 0 {
m .queueTicksPending -= 1
m .queueTick += 1
}
m .queueMx .Unlock ()
if mut .ctx != nil && mut .ctx .Err () != nil {
ret = append (ret , Executed )
continue
}
if mut .Type == mutationEval {
m .Log ("eval: " + mut .evalSource )
mut .eval ()
continue
}
t := newTransition (m , mut )
m .schemaMx .RLock ()
ret = append (ret , t .emitEvents ())
m .timeLast .Store (&t .TimeAfter )
m .schemaMx .RUnlock ()
if t .Mutation .IsCheck {
if done , ok := mut .Args [argCheckDone ].(*CheckDone ); ok {
done .Canceled = t .IsAccepted .Load ()
closeSafe (done .Ch )
}
} else if t .IsAccepted .Load () && !t .Mutation .IsCheck {
m .processSubscriptions (t )
}
t .CleanCache ()
}
m .t .Store (nil )
m .queueProcessing .Store (false )
m .queueRunning .Store (false )
m .tracersMx .RLock ()
for i := 0 ; !m .disposing .Load () && i < len (m .tracers ); i ++ {
m .tracers [i ].QueueEnd (m )
}
m .tracersMx .RUnlock ()
m .queueMx .Lock ()
for _ , ch := range m .subs .ProcessWhenQueueEnds () {
closeSafe (ch )
}
m .queueMx .Unlock ()
if len (ret ) == 0 {
return Canceled
}
return ret [0 ]
}
func (m *Machine ) processSubscriptions (t *Transition ) {
m .activeStatesMx .RLock ()
toCancel := m .subs .ProcessStateCtx (t .cacheDeactivated )
toClose := slices .Concat (
m .subs .ProcessWhen (t .cacheActivated , t .cacheDeactivated ),
m .subs .ProcessWhenTime (t .ClockBefore ()),
m .subs .ProcessWhenQueue (m .queueTick ),
m .subs .ProcessWhenQuery (),
)
m .activeStatesMx .RUnlock ()
for _ , cancel := range toCancel {
cancel ()
}
for _ , ch := range toClose {
closeSafe (ch )
}
}
func (m *Machine ) Ctx () context .Context {
return m .ctx
}
func (m *Machine ) Log (msg string , args ...any ) {
if m .disposing .Load () {
return
}
prefix := "[exter"
msg = strings .ReplaceAll (msg , "\n" , " " )
m .log (LogExternal , prefix +"] " +msg , args ...)
}
func (m *Machine ) log (level LogLevel , msg string , args ...any ) {
if level > m .semLogger .Level () || m .disposing .Load () {
return
}
prefix := ""
if m .logId .Load () {
id := m .id
if len (id ) > 5 {
id = id [:5 ]
}
prefix = "[" + id + "] "
msg = prefix + msg
}
out := fmt .Sprintf (msg , args ...)
logger := m .semLogger .Logger ()
if logger != nil {
logger (level , msg , args ...)
} else {
fmt .Println (out )
}
t := m .Transition ()
if t != nil && !t .IsCompleted .Load () {
t .InternalLogEntriesLock .Lock ()
defer t .InternalLogEntriesLock .Unlock ()
t .LogEntries = append (t .LogEntries , &LogEntry {level , out })
} else {
m .logEntriesLock .Lock ()
defer m .logEntriesLock .Unlock ()
if len (m .logEntries ) > 0 && m .logEntries [len (m .logEntries )-1 ].Text == out &&
!strings .HasPrefix (out , prefix +"[pipe-" ) {
return
}
m .logEntries = append (m .logEntries , &LogEntry {
Level : level ,
Text : out ,
})
}
}
func (m *Machine ) SemLogger () SemLogger {
return m .semLogger
}
func (m *Machine ) handle (
name string , args A , isFinal , isEnter , isSelf bool ,
) (Result , bool ) {
if m .disposing .Load () {
return Canceled , false
}
t := m .t .Load ()
e := &Event {
Name : name ,
machine : m ,
machApi : m ,
Args : args ,
TransitionId : t .Id ,
MachineId : m .Id (),
}
targetStates := t .TargetStates ()
t .latestHandlerIsEnter = isEnter
t .latestHandlerIsFinal = isFinal
if e .Args == nil {
e .Args = A {}
}
res , handlerCalled := m .processHandlers (e )
if m .panicCaught .Load () {
res = Canceled
m .panicCaught .Store (false )
}
if !isFinal && res == Canceled {
if m .semLogger .Level () >= LogOps {
var self string
if isSelf {
self = ":self"
}
m .log (LogOps , "[cancel%s] (%s) by %s" , self ,
j (targetStates ), name )
}
return Canceled , handlerCalled
}
return res , handlerCalled
}
func (m *Machine ) processHandlers (e *Event ) (Result , bool ) {
if m .disposing .Load () {
return Canceled , false
}
handlerCalled := false
handlers := m .getHandlers (false )
for i := 0 ; !m .disposing .Load () && i < len (handlers ); i ++ {
h := handlers [i ]
if h == nil {
continue
}
h .mx .Lock ()
methodName := e .Name
handlerName := strconv .Itoa (i ) + ":" + h .name
if m .semLogger .Level () >= LogEverything {
emitterID := truncateStr (handlerName , 15 )
emitterID = padString (strings .ReplaceAll (emitterID , " " , "_" ), 15 , "_" )
m .log (LogEverything , "[handle:%-15s] %s" , emitterID , methodName )
}
_ , ok := h .missingCache [methodName ]
if ok {
h .mx .Unlock ()
continue
}
method , ok := h .methodCache [methodName ]
if !ok {
method = h .methods .MethodByName (methodName )
if !method .IsValid () {
method = h .methods .Elem ().FieldByName (methodName )
}
if !method .IsValid () {
h .missingCache [methodName ] = struct {}{}
h .mx .Unlock ()
continue
}
h .methodCache [methodName ] = method
}
h .mx .Unlock ()
m .log (LogOps , "[handler:%d] %s" , i , methodName )
m .currentHandler .Store (methodName )
var ret bool
var timeout bool
handlerCalled = true
m .tracersMx .RLock ()
tx := m .t .Load ()
for i := range m .tracers {
m .tracers [i ].HandlerStart (tx , handlerName , methodName )
}
m .tracersMx .RUnlock ()
handlerCall := &handlerCall {
fn : method ,
name : methodName ,
event : e ,
timeout : false ,
}
select {
case <- m .ctx .Done ():
break
case m .handlerStart <- handlerCall :
}
m .handlerTimer .Reset (m .HandlerTimeout )
select {
case <- m .ctx .Done ():
case <- m .handlerTimer .C :
m .log (LogOps , "[cancel] (%s) by timeout" , j (tx .TargetStates ()))
m .log (LogDecisions , "[handler:timeout]: %s from %s" , methodName , h .name )
timeout = true
select {
case <- m .handlerEnd :
m .log (LogEverything , "[handler:ack-timeout] %s from %s" , e .Name , h .name )
case <- time .After (m .HandlerDeadline ):
m .log (LogEverything , "[handler:deadline] %s from %s" , e .Name , h .name )
go m .handlerLoop ()
m .queueMx .Lock ()
m .queue = nil
m .queueLen .Store (0 )
m .queueTicksPending = 0
m .queueMx .Unlock ()
err := fmt .Errorf ("%w: %s from %s" , ErrHandlerTimeout , methodName ,
h .name )
m .EvAddErr (e , err , Pass (&AT {
TargetStates : tx .TargetStates (),
CalledStates : tx .CalledStates (),
TimeBefore : tx .TimeBefore ,
TimeAfter : tx .TimeAfter ,
Event : e .Export (),
}))
now := time .Now ()
m .LastHandlerDeadline .Store (&now )
}
case r := <- m .handlerPanic :
m .recoverToErr (h , r )
case ret = <- m .handlerEnd :
m .log (LogEverything , "[handler:end] %s from %s" , e .Name , h .name )
}
m .handlerTimer .Stop ()
m .currentHandler .Store ("" )
m .tracersMx .RLock ()
for i := range m .tracers {
m .tracers [i ].HandlerEnd (tx , handlerName , methodName )
}
m .tracersMx .RUnlock ()
switch {
case timeout :
return Canceled , handlerCalled
case strings .HasSuffix (e .Name , SuffixState ):
case strings .HasSuffix (e .Name , SuffixEnd ):
default :
if !ret {
return Canceled , handlerCalled
}
}
}
m .activeStatesMx .RLock ()
defer m .activeStatesMx .RUnlock ()
for _ , ch := range m .subs .ProcessWhenArgs (e ) {
close (ch )
}
return Executed , handlerCalled
}
func (m *Machine ) handlerLoop () {
ver := m .handlerLoopVer .Add (1 )
catch := func () {
err := recover ()
if err == nil {
return
}
if !m .disposing .Load () {
m .handlerPanic <- recoveryData {
err : err ,
stack : captureStackTrace (),
}
}
}
if m .PanicToException {
defer catch ()
}
for {
select {
case <- m .ctx .Done ():
m .handlerLoopDone ()
return
case call , ok := <- m .handlerStart :
if !ok {
return
}
ret := true
if call .event .IsValid () {
callRet := call .fn .Call ([]reflect .Value {reflect .ValueOf (call .event )})
if len (callRet ) > 0 {
ret = callRet [0 ].Interface ().(bool )
}
} else {
m .log (LogDecisions , "[handler:invalid] %s" , call .name )
ret = false
}
currVer := m .handlerLoopVer .Load ()
if currVer != ver {
m .AddErr (fmt .Errorf (
"deadlined handler finished, theoretical leak: %s" , call .name ), nil )
return
}
m .loopLock .Lock ()
select {
case <- m .ctx .Done ():
m .handlerLoopDone ()
m .loopLock .Unlock ()
return
case m .handlerEnd <- ret :
m .loopLock .Unlock ()
}
}
}
}
func (m *Machine ) handlerLoopDone () {
v , _ := m .ctx .Value (CtxKey ).(CtxValue )
m .log (LogOps , "[doDispose] ctx handlerLoopDone %v" , v )
m .Dispose ()
}
func (m *Machine ) detectQueueDuplicates (mutationType MutationType ,
states S , isCheck bool ,
) bool {
if m .disposing .Load () {
return false
}
found , idx , qTick := m .IsQueued (mutationType , states , true , true , 0 , isCheck ,
PositionAny )
if !found {
return false
}
var counterMutType MutationType
switch mutationType {
case MutationAdd :
counterMutType = MutationRemove
case MutationRemove :
counterMutType = MutationAdd
case MutationSet :
fallthrough
default :
return idx > 0 && len (m .queue )-1 > 0
}
counterMutFound , _ , _ := m .IsQueued (counterMutType , states ,
false , false , qTick +1 , isCheck , PositionAny )
return !counterMutFound
}
func (m *Machine ) Transition () *Transition {
return m .t .Load ()
}
func (m *Machine ) Clock (states S ) Clock {
if m .disposing .Load () {
return Clock {}
}
m .activeStatesMx .RLock ()
defer m .activeStatesMx .RUnlock ()
if states == nil {
states = m .stateNames
}
ret := make (Clock )
for _ , state := range states {
ret [state ] = m .clock [state ]
}
return ret
}
func (m *Machine ) Tick (state string ) uint64 {
if m .disposing .Load () {
return 0
}
m .activeStatesMx .RLock ()
defer m .activeStatesMx .RUnlock ()
return m .clock [state ]
}
func (m *Machine ) IsQueued (mutType MutationType , states S ,
withoutArgsOnly bool , statesStrictEqual bool , minQueueTick uint64 ,
isCheck bool , position Position ,
) (found bool , idx uint16 , qTick uint64 ) {
if m .disposing .Load () {
return false , 0 , 0
}
m .queueMx .RLock ()
defer m .queueMx .RUnlock ()
iter := m .queue
switch position {
case PositionLast :
idx := math .Max (0 , float64 (len (iter )-1 ))
iter = iter [int (idx ):]
case PositionFirst :
iter = iter [0 :1 ]
}
for i , mut := range iter {
if minQueueTick > 0 && mut .QueueTick < minQueueTick {
continue
}
if mut .IsCheck == isCheck &&
mut .Type == mutType &&
((withoutArgsOnly && len (mut .Args ) == 0 ) || !withoutArgsOnly ) &&
((statesStrictEqual &&
len (mut .Called ) == len (states )) ||
(!statesStrictEqual &&
len (mut .Called ) >= len (states ))) &&
slicesEvery (mut .Called , m .Index (states )) {
return true , uint16 (i ), mut .QueueTick
}
}
return false , 0 , 0
}
func (m *Machine ) IsQueuedAbove (threshold int , mutType MutationType ,
states S , withoutArgsOnly bool , statesStrictEqual bool , minQueueTick uint64 ,
) bool {
if m .disposing .Load () {
return false
}
m .queueMx .RLock ()
defer m .queueMx .RUnlock ()
c := 0
for _ , mut := range m .queue {
if minQueueTick > 0 && mut .QueueTick < minQueueTick {
continue
}
if mut .IsCheck == false &&
mut .Type == mutType &&
((withoutArgsOnly && len (mut .Args ) == 0 ) || !withoutArgsOnly ) &&
((statesStrictEqual &&
len (mut .Called ) == len (states )) ||
(!statesStrictEqual &&
len (mut .Called ) >= len (states ))) &&
slicesEvery (mut .Called , m .Index (states )) {
c ++
if c >= threshold {
return true
}
}
}
return false
}
func (m *Machine ) QueueLen () uint16 {
return uint16 (m .queueLen .Load ())
}
func (m *Machine ) WillBe (states S , position ...Position ) bool {
if len (position ) == 0 {
position = []Position {PositionAny }
}
found , idx , _ := m .IsQueued (MutationAdd , states , false , false , 0 , false ,
position [0 ])
switch {
case !found :
return false
case idx == 0 && position [0 ] == PositionFirst :
return true
case idx == uint16 (m .queueLen .Load ())-1 && position [0 ] == PositionLast :
return true
default :
return true
}
}
func (m *Machine ) WillBe1 (state string , position ...Position ) bool {
return m .WillBe (S {state }, position ...)
}
func (m *Machine ) WillBeRemoved (states S , position ...Position ) bool {
if len (position ) == 0 {
position = []Position {PositionAny }
}
found , idx , _ := m .IsQueued (MutationRemove , states , false , false , 0 , false ,
position [0 ])
switch {
case !found :
return false
case idx == 0 && position [0 ] == PositionFirst :
return true
case idx == uint16 (m .queueLen .Load ())-1 && position [0 ] == PositionLast :
return true
default :
return true
}
}
func (m *Machine ) WillBeRemoved1 (state string , position ...Position ) bool {
return m .WillBeRemoved (S {state }, position ...)
}
func (m *Machine ) Has (states S ) bool {
if m .disposing .Load () {
return false
}
return slicesEvery (m .stateNames , states )
}
func (m *Machine ) Has1 (state string ) bool {
return m .Has (S {state })
}
func (m *Machine ) IsClock (clock Clock ) bool {
if m .disposing .Load () {
return false
}
m .activeStatesMx .RLock ()
defer m .activeStatesMx .RUnlock ()
for state , tick := range clock {
if m .clock [state ] != tick {
return false
}
}
return true
}
func (m *Machine ) WasClock (clock Clock ) bool {
if m .disposing .Load () {
return false
}
m .activeStatesMx .RLock ()
defer m .activeStatesMx .RUnlock ()
for state , tick := range clock {
if m .clock [state ] < tick {
return false
}
}
return true
}
func (m *Machine ) IsTime (t Time , states S ) bool {
if m .disposing .Load () {
return false
}
m .activeStatesMx .RLock ()
defer m .activeStatesMx .RUnlock ()
if states == nil {
states = m .stateNames
}
for i , tick := range t {
if m .clock [states [i ]] != tick {
return false
}
}
return true
}
func (m *Machine ) WasTime (t Time , states S ) bool {
if m .disposing .Load () {
return false
}
m .activeStatesMx .RLock ()
defer m .activeStatesMx .RUnlock ()
if states == nil {
states = m .stateNames
}
for i , tick := range t {
if m .clock [states [i ]] < tick {
return false
}
}
return true
}
func (m *Machine ) String () string {
if m .disposing .Load () {
return ""
}
m .activeStatesMx .RLock ()
defer m .activeStatesMx .RUnlock ()
ret := "("
for _ , state := range m .stateNames {
if !slices .Contains (m .activeStates , state ) {
continue
}
if ret != "(" {
ret += " "
}
ret += fmt .Sprintf ("%s:%d" , state , m .clock [state ])
}
return ret + ")"
}
func (m *Machine ) StringAll () string {
if m .disposing .Load () {
return ""
}
m .activeStatesMx .RLock ()
defer m .activeStatesMx .RUnlock ()
ret := "("
ret2 := "["
for _ , state := range m .stateNames {
if slices .Contains (m .activeStates , state ) {
if ret != "(" {
ret += " "
}
ret += fmt .Sprintf ("%s:%d" , state , m .clock [state ])
continue
}
if ret2 != "[" {
ret2 += " "
}
ret2 += fmt .Sprintf ("%s:%d" , state , m .clock [state ])
}
return ret + ") " + ret2 + "]"
}
func (m *Machine ) Inspect (states S ) string {
if m .disposing .Load () {
return ""
}
m .activeStatesMx .RLock ()
defer m .activeStatesMx .RUnlock ()
if states == nil {
states = m .stateNames
}
ret := ""
for _ , name := range states {
state := m .schema [name ]
active := "0"
if slices .Contains (m .activeStates , name ) {
active = "1"
}
ret += fmt .Sprintf ("%s %s\n" +
" |Tick %d\n" , active , name , m .clock [name ])
if state .Auto {
ret += " |Auto true\n"
}
if state .Multi {
ret += " |Multi true\n"
}
if state .Add != nil {
ret += " |Add " + j (state .Add ) + "\n"
}
if state .Require != nil {
ret += " |Require " + j (state .Require ) + "\n"
}
if state .Remove != nil {
ret += " |Remove " + j (state .Remove ) + "\n"
}
if state .After != nil {
ret += " |After " + j (state .After ) + "\n"
}
}
return ret
}
func (m *Machine ) Switch (groups ...S ) string {
activeStates := m .ActiveStates (nil )
for _ , states := range groups {
for _ , state := range states {
if slices .Contains (activeStates , state ) {
return state
}
}
}
return ""
}
func (m *Machine ) ActiveStates (states S ) S {
if m .disposing .Load () {
return S {}
}
m .activeStatesMx .RLock ()
defer m .activeStatesMx .RUnlock ()
if states == nil {
return slices .Clone (m .activeStates )
}
ret := make (S , 0 , len (states ))
for _ , state := range states {
if slices .Contains (m .activeStates , state ) {
ret = append (ret , state )
}
}
return ret
}
func (m *Machine ) StateNames () S {
m .schemaMx .RLock ()
defer m .schemaMx .RUnlock ()
if m .stateNamesExport == nil {
m .stateNamesExport = slices .Clone (m .stateNames )
}
return m .stateNamesExport
}
func (m *Machine ) StateNamesMatch (re *regexp .Regexp ) S {
m .schemaMx .RLock ()
defer m .schemaMx .RUnlock ()
ret := S {}
for _ , name := range m .stateNames {
if re .MatchString (name ) {
ret = append (ret , name )
}
}
return ret
}
func (m *Machine ) Queue () []*Mutation {
if m .disposing .Load () {
return nil
}
m .queueMx .RLock ()
defer m .queueMx .RUnlock ()
return slices .Clone (m .queue )
}
func (m *Machine ) Schema () Schema {
m .schemaMx .RLock ()
defer m .schemaMx .RUnlock ()
return CloneSchema (m .schema )
}
func (m *Machine ) SchemaVer () int {
return len (m .StateNames ())
}
func (m *Machine ) SetSchema (newSchema Schema , names S ) error {
if m .Transition () != nil {
return fmt .Errorf ("%w: cannot set schema during a transition" , ErrSchema )
}
m .schemaMx .Lock ()
m .queueMx .RLock ()
defer m .queueMx .RUnlock ()
if len (newSchema ) <= len (m .schema ) {
m .schemaMx .Unlock ()
return fmt .Errorf ("%w: new schema too short (> %d states)" ,
ErrSchema , len (m .schema ))
}
if len (newSchema ) != len (names ) {
m .schemaMx .Unlock ()
return fmt .Errorf ("%w: new schema has to be the same length as" +
" state names" , ErrSchema )
}
old := m .schema
parsed , err := ParseSchema (newSchema )
if err != nil {
m .schemaMx .Unlock ()
return err
}
m .schema = parsed
if err = m .verifyStates (names ); err != nil {
m .schemaMx .Unlock ()
return err
}
m .subs .SetClock (m .Clock (nil ))
m .schemaMx .Unlock ()
m .resolver .NewSchema (m .schema , m .stateNames )
m .tracersMx .RLock ()
for i := 0 ; !m .disposing .Load () && i < len (m .tracers ); i ++ {
m .tracers [i ].SchemaChange (m , old )
}
m .tracersMx .RUnlock ()
return nil
}
func (m *Machine ) EvAdd (event *Event , states S , args A ) Result {
if m .disposing .Load () || m .Backoff () ||
uint16 (m .queueLen .Load ()) >= m .QueueLimit {
return Canceled
}
queueTick := m .queueMutation (MutationAdd , states , args , event )
if queueTick == uint64 (Executed ) {
return Executed
}
m .breakpoint (states , nil )
res := m .processQueue ()
if res == Queued {
return Result (queueTick + 1 )
}
return res
}
func (m *Machine ) EvAdd1 (event *Event , state string , args A ) Result {
return m .EvAdd (event , S {state }, args )
}
func (m *Machine ) EvRemove (event *Event , states S , args A ) Result {
if m .disposing .Load () || m .Backoff () ||
uint16 (m .queueLen .Load ()) >= m .QueueLimit {
return Canceled
}
m .queueMx .RLock ()
lenQueue := len (m .queue )
var statesAny []S
for _ , name := range states {
statesAny = append (statesAny , S {name })
}
if lenQueue == 0 && m .Transition () != nil && !m .Any (statesAny ...) {
m .queueMx .RUnlock ()
return Executed
}
m .queueMx .RUnlock ()
queueTick := m .queueMutation (MutationRemove , states , args , event )
if queueTick == uint64 (Executed ) {
return Executed
}
m .breakpoint (nil , states )
res := m .processQueue ()
if res == Queued {
return Result (queueTick + 1 )
}
return res
}
func (m *Machine ) EvRemove1 (event *Event , state string , args A ) Result {
return m .EvRemove (event , S {state }, args )
}
func (m *Machine ) EvAddErr (event *Event , err error , args A ) Result {
return m .EvAddErrState (event , StateException , err , args )
}
func (m *Machine ) EvAddErrState (
event *Event , state string , err error , args A ,
) Result {
if m .disposing .Load () || m .Backoff () ||
uint16 (m .queueLen .Load ()) >= m .QueueLimit || err == nil {
return Canceled
}
m .err .Store (&err )
var trace string
if m .LogStackTrace {
trace = captureStackTrace ()
}
onErr := m .onError .Load ()
if onErr != nil {
(*onErr )(m , err )
}
argsT := &AT {
Err : err ,
ErrTrace : trace ,
}
return m .EvAdd (event , S {state , StateException }, PassMerge (args , argsT ))
}
func (m *Machine ) Export () (*Serialized , Schema , error ) {
m .activeStatesMx .RLock ()
defer m .activeStatesMx .RUnlock ()
m .queueMx .RLock ()
defer m .queueMx .RUnlock ()
m .schemaMx .RLock ()
defer m .schemaMx .RUnlock ()
if !m .statesVerified .Load () {
return nil , nil , fmt .Errorf ("%w: call VerifyStates first" , ErrSchema )
}
t := m .time (nil )
m .log (LogOps , "[import] exported at %d ticks" , t .Sum (nil ))
return &Serialized {
ID : m .id ,
Time : t ,
MachineTick : m .machineTick ,
StateNames : m .stateNames ,
QueueTick : m .queueTick ,
}, CloneSchema (m .schema ), nil
}
func (m *Machine ) Import (data *Serialized ) error {
m .activeStatesMx .RLock ()
defer m .activeStatesMx .RUnlock ()
m .queueMx .RLock ()
defer m .queueMx .RUnlock ()
m .schemaMx .Lock ()
defer m .schemaMx .Unlock ()
if m .id != data .ID {
return fmt .Errorf ("import ID mismatch" )
}
if len (m .schema ) != len (data .StateNames ) {
return fmt .Errorf ("%w: importing diff state len" , ErrSchema )
}
var sum uint64
m .activeStates = nil
for idx , v := range data .Time {
state := data .StateNames [idx ]
sum += v
if !slices .Contains (m .stateNames , state ) {
return fmt .Errorf ("%w: %s" , ErrStateUnknown , state )
}
if IsActiveTick (v ) {
m .activeStates = append (m .activeStates , state )
}
m .clock [state ] = v
}
m .stateNames = data .StateNames
m .stateNamesExport = nil
m .statesVerified .Store (true )
m .machineTick = data .MachineTick + 1
if m .Has1 (StateMachineRestored ) {
m .Add1 (StateMachineRestored , nil )
}
m .log (LogChanges , "[import] imported %d times, now at %d ticks" ,
m .machineTick , sum )
return nil
}
func (m *Machine ) Index1 (state string ) int {
if m .disposing .Load () {
return -1
}
return slices .Index (m .StateNames (), state )
}
func (m *Machine ) Index (states S ) []int {
if m .disposing .Load () {
return []int {}
}
index := m .StateNames ()
return StatesToIndex (index , states )
}
func (m *Machine ) Resolver () RelationsResolver {
return m .resolver
}
func (m *Machine ) BindTracer (tracer Tracer ) error {
m .tracersMx .Lock ()
defer m .tracersMx .Unlock ()
v := reflect .ValueOf (tracer )
if v .Kind () != reflect .Ptr || v .Elem ().Kind () != reflect .Struct {
return errors .New ("BindTracer expects a pointer to a struct" )
}
name := reflect .TypeOf (tracer ).Elem ().Name ()
m .tracers = append (m .tracers , tracer )
m .log (LogOps , "[tracers] bind %s" , name )
return nil
}
func (m *Machine ) DetachTracer (tracer Tracer ) error {
if m .disposing .Load () {
return nil
}
m .tracersMx .Lock ()
defer m .tracersMx .Unlock ()
v := reflect .ValueOf (tracer )
if v .Kind () != reflect .Ptr || v .Elem ().Kind () != reflect .Struct {
return errors .New ("DetachTracer expects a pointer to a struct" )
}
name := reflect .TypeOf (tracer ).Elem ().Name ()
for i , t := range m .tracers {
if t == tracer {
m .tracers = slices .Delete (m .tracers , i , i +1 )
m .log (LogOps , "[tracers] detach %s" , name )
return nil
}
}
return errors .New ("tracer not bound" )
}
func (m *Machine ) Tracers () []Tracer {
m .tracersMx .Lock ()
defer m .tracersMx .Unlock ()
return slices .Clone (m .tracers )
}
func (m *Machine ) OnError (fn HandlerError ) {
m .onError .Store (&fn )
}
func (m *Machine ) Backoff () bool {
last := m .LastHandlerDeadline .Load ()
return last != nil && time .Since (*last ) < m .HandlerBackoff
}
func (m *Machine ) OnChange (fn HandlerChange ) {
m .onChange .Store (&fn )
}
func (m *Machine ) SetGroups (groups any , optStates States ) {
m .schemaMx .Lock ()
defer m .schemaMx .Unlock ()
list := map [string ][]int {}
order := []string {}
index := m .stateNames
if groups != nil {
val := reflect .ValueOf (groups )
typ := val .Type ()
for i := 0 ; i < val .NumField (); i ++ {
field := typ .Field (i )
kind := field .Type .Kind ()
if kind != reflect .Slice {
continue
}
name := field .Name
value := val .Field (i ).Interface ()
if states , ok := value .(S ); ok {
list [name ] = StatesToIndex (index , states )
order = append (order , name )
}
}
}
if optStates != nil {
groups , order2 := optStates .StateGroups ()
for _ , name := range order2 {
list [name ] = groups [name ]
order = append (order , name )
}
}
m .groups = list
m .groupsOrder = order
}
func (m *Machine ) SetGroupsString (groups map [string ]S , order []string ) {
m .schemaMx .Lock ()
defer m .schemaMx .Unlock ()
list := map [string ][]int {}
for name , states := range groups {
list [name ] = StatesToIndex (m .stateNames , states )
}
m .groups = list
m .groupsOrder = order
}
func (m *Machine ) Groups () (map [string ][]int , []string ) {
m .schemaMx .RLock ()
defer m .schemaMx .RUnlock ()
return m .groups , m .groupsOrder
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .