// Package machine is a nondeterministic, multi-state, clock-based, relational,
// optionally-accepting, and non-blocking state machine.
package machine // import "github.com/pancsta/asyncmachine-go/pkg/machine"

import (
	"context"
	"errors"
	"fmt"
	"maps"
	"math"
	"os"
	"reflect"
	"regexp"
	"runtime"
	"slices"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"
)

var _ Api = &Machine{}

// Machine provides a state machine API with mutations, state schema, handlers,
// subscriptions, tracers, and helper methods. It also holds a queue of
// mutations to execute.
type Machine struct {
	// QueueLimit is the maximum number of mutations that can be queued.
	// Default: 1000.
	QueueLimit uint16
	// HandlerTimeout defines the time for a handler to execute before it
	// causes StateException. Default: 1s. See also [Opts.HandlerTimeout].
	// Using HandlerTimeout can cause race conditions, unless paired with
	// [Event.IsValid].
	HandlerTimeout time.Duration
	// HandlerDeadline is a grace period after a handler timeout, before the
	// machine moves on.
	HandlerDeadline time.Duration
	// LastHandlerDeadline stores when the last HandlerDeadline was hit.
	LastHandlerDeadline atomic.Pointer[time.Time]
	// HandlerBackoff is the time after a [HandlerDeadline], during which the
	// machine will return [Canceled] to any mutation.
	HandlerBackoff time.Duration
	// EvalTimeout is the time the machine will try to execute an eval func.
	// Like any other handler, an eval func also has [HandlerTimeout].
	// Default: 1s.
	EvalTimeout time.Duration
	// LogStackTrace: if true, the machine will log stack traces of errors.
	// Default: true. Requires an ExceptionHandler binding and
	// [Machine.PanicToException] set.
	LogStackTrace bool
	// PanicToException: if true, the machine will catch panics and trigger
	// the [StateException] state. Default: true. Can be disabled via
	// AM_DEBUG=1.
	PanicToException bool
	// DisposeTimeout specifies the duration to wait for the queue to drain
	// during disposal. Default: 1s.
	DisposeTimeout time.Duration

	// subs is the subscription manager.
	subs        *Subscriptions
	panicCaught atomic.Bool
	// logId: if true, logs will start with the machine's id (5 chars).
	// Default: true.
	logId atomic.Bool
	// statesVerified assures the state names have been ordered using
	// [Machine.VerifyStates].
	statesVerified atomic.Bool
	// id is the unique ID of this machine. Default: random.
	id string
	// ctx is the context of the machine.
	ctx context.Context
	// parentId is the id of the parent machine (if any).
	parentId string
	// disposing means disposal is in progress, which disables schema
	// auto-disposal.
	disposing atomic.Bool
	// disposed tells if the machine has been disposed and is no-op.
	disposed atomic.Bool
	// queueRunning indicates the queue is currently being executed.
	queueRunning atomic.Bool
	// tags are short strings describing the machine.
	tags atomic.Pointer[[]string]
	// tracers are optional tracers for telemetry integrations.
	tracers   []Tracer
	tracersMx sync.RWMutex
	// err is the last error that occurred.
	err atomic.Pointer[error]
	// t is the currently executing transition (if any).
	t atomic.Pointer[Transition]
	// schema is a map of state names to state definitions.
	// TODO atomic?
	schema      Schema
	groups      map[string][]int
	groupsOrder []string
	schemaMx    sync.RWMutex
	// activeStates is a list of currently active states.
	activeStates   S
	activeStatesMx sync.RWMutex
	// queue of mutations to be executed.
	queue []*Mutation
	// queueTick is the number of times the queue has processed an appended
	// mutation. Starts from [1], for easy comparison with [Result].
	queueTick   uint64
	machineTick uint32
	// queueTicksPending is the part of the queue with queue ticks assigned.
	queueTicksPending uint64
	queueToken        atomic.Uint64
	queueMx           sync.RWMutex
	queueProcessing   atomic.Bool
	// queueLen is the total length of the queue, both appended (with queue
	// ticks) and prepended mutations, incl. the ticking ones.
	// TODO should be uint16
	queueLen atomic.Uint32
	// resolver is the relation resolver, used to produce the target states
	// of a transition. Default: *DefaultRelationsResolver.
	resolver RelationsResolver
	// stateNames is a list of all the registered state names.
	stateNames       S
	stateNamesExport S

	loopLock   sync.Mutex
	handlers   []*handler
	handlersMx sync.RWMutex
	clock      Clock
	cancel     context.CancelFunc
	logLevel   atomic.Pointer[LogLevel]
	logger     atomic.Pointer[LoggerFn]
	semLogger  SemLogger

	handlerStart chan *handlerCall
	handlerEnd   chan bool
	handlerPanic chan recoveryData
	handlerTimer *time.Timer

	logEntriesLock sync.Mutex
	logEntries     []*LogEntry
	logArgs        atomic.Pointer[LogArgsMapperFn]
	currentHandler atomic.Value

	disposeHandlers []HandlerDispose
	timeLast        atomic.Pointer[Time]
	// whenDisposed is a channel closing when the machine finishes disposal.
	// Read-only.
	// TODO replace with Ctx.Done() ?
	whenDisposed       chan struct{}
	handlerLoopRunning atomic.Bool
	handlerLoopVer     atomic.Int32
	detectEval         bool
	// unlockDisposed means that disposal is in progress and holding the
	// queueMx.
	unlockDisposed atomic.Bool
	// breakpoints are a list of breakpoints for debugging. [][added, removed]
	breakpointsMx sync.Mutex
	breakpoints   []*breakpoint
	onError       atomic.Pointer[HandlerError]
	onChange      atomic.Pointer[HandlerChange]
}

// NewCommon creates a new Machine instance with all the common options set.
func NewCommon(
	ctx context.Context, id string, schema Schema, names S,
	handlers any, parent Api, opts *Opts,
) (*Machine, error) {
	machOpts := &Opts{Id: id}
	if opts != nil {
		machOpts = opts
		machOpts.Id = id
	}

	if os.Getenv(EnvAmDebug) != "" {
		machOpts = OptsWithDebug(machOpts)
	} else if os.Getenv(EnvAmTestRunner) != "" {
		machOpts.HandlerTimeout = 1 * time.Second
	}

	if parent != nil {
		machOpts.Parent = parent
	}
	if machOpts.LogArgs == nil {
		machOpts.LogArgs = NewArgsMapper(LogArgs, 0)
	}

	mach := New(ctx, schema, machOpts)
	err := mach.VerifyStates(names)
	if err != nil {
		return nil, err
	}

	if handlers != nil {
		err = mach.BindHandlers(handlers)
		if err != nil {
			return nil, err
		}
	}

	return mach, nil
}
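// exampleNewCommon is an added usage sketch (not part of the original file):
// a typical NewCommon call with a hypothetical two-state schema, ordered
// state names (incl. the built-in StateException), and no handlers, parent,
// or opts.
func exampleNewCommon(ctx context.Context) (*Machine, error) {
	schema := Schema{
		"Foo": {},
		"Bar": {Require: S{"Foo"}},
	}
	names := S{"Foo", "Bar", StateException}

	return NewCommon(ctx, "example-common", schema, names, nil, nil, nil)
}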
// New creates a new Machine instance, bound to the context and modified with
// optional Opts.
func New(ctx context.Context, schema Schema, opts *Opts) *Machine {
	// parse relations
	parsedSchema, err := ParseSchema(schema)

	m := &Machine{
		HandlerTimeout:   100 * time.Millisecond,
		HandlerDeadline:  10 * time.Second,
		HandlerBackoff:   3 * time.Second,
		EvalTimeout:      time.Second,
		LogStackTrace:    true,
		PanicToException: true,
		QueueLimit:       1000,
		DisposeTimeout:   time.Second,

		id:           randId(),
		schema:       parsedSchema,
		clock:        Clock{},
		handlers:     []*handler{},
		handlerStart: make(chan *handlerCall),
		handlerEnd:   make(chan bool),
		handlerPanic: make(chan recoveryData),
		handlerTimer: time.NewTimer(24 * time.Hour),
		whenDisposed: make(chan struct{}),
		// queue ticks start from 1 to align with the [Result] enum
		queueTick: 1,
	}
	m.subs = NewSubscriptionManager(m, m.clock, m.is, m.not, m.log)
	m.semLogger = &semLogger{mach: m}
	m.logId.Store(true)
	m.timeLast.Store(&Time{})
	lvl := LogNothing
	m.logLevel.Store(&lvl)

	// parse opts
	// TODO extract
	opts = cloneOptions(opts)
	var parent Api
	if opts != nil {
		if opts.Id != "" {
			m.id = opts.Id
		}
		if opts.HandlerTimeout != 0 {
			m.HandlerTimeout = opts.HandlerTimeout
		}
		if opts.HandlerDeadline != 0 {
			m.HandlerDeadline = opts.HandlerDeadline
		}
		if opts.HandlerBackoff != 0 {
			m.HandlerBackoff = opts.HandlerBackoff
		}
		if opts.DontPanicToException {
			m.PanicToException = false
		}
		if opts.DontLogStackTrace {
			m.LogStackTrace = false
		}
		if opts.DontLogId {
			m.logId.Store(false)
		}
		if opts.Resolver != nil {
			m.resolver = opts.Resolver
		}
		if opts.LogLevel != LogNothing {
			m.semLogger.SetLevel(opts.LogLevel)
		}
		if opts.Tracers != nil {
			m.tracers = opts.Tracers
		}
		if opts.LogArgs != nil {
			m.logArgs.Store(&opts.LogArgs)
		}
		if opts.QueueLimit != 0 {
			m.QueueLimit = opts.QueueLimit
		}
		if len(opts.Tags) > 0 {
			tags := slicesUniq(opts.Tags)
			m.tags.Store(&tags)
		}
		m.detectEval = opts.DetectEval
		parent = opts.Parent
		m.parentId = opts.ParentId
	}
	if os.Getenv(EnvAmDetectEval) != "" {
		m.detectEval = true
	}

	// default resolver
	if m.resolver == nil {
		m.resolver = &DefaultRelationsResolver{
			Machine: m,
		}
	}

	// define the exception state (if missing)
	if _, ok := m.schema[StateException]; !ok {
		m.schema[StateException] = State{
			Multi: true,
		}
	}

	// infer and sort states for defaults
	for name := range m.schema {
		m.stateNames = append(m.stateNames, name)
		slices.Sort(m.stateNames)
		m.clock[name] = 0
	}

	// notify the resolver
	m.resolver.NewSchema(m.schema, m.stateNames)

	// init context (support nil for examples)
	if ctx == nil {
		ctx = context.TODO()
	}
	m.ctx, m.cancel = context.WithCancel(ctx)

	if parent != nil {
		m.parentId = parent.Id()
		pTracers := parent.Tracers()

		// inform the tracers about this being a submachine
		for _, t := range pTracers {
			t.NewSubmachine(parent, m)
		}
	}

	// tracers
	for i := range m.tracers {
		ctxTr := m.tracers[i].MachineInit(m)
		// TODO check that ctxTr is a child of ctx
		if ctxTr != nil {
			m.ctx = ctxTr
		}
	}

	if err != nil {
		m.AddErr(err, nil)
	}

	return m
}

// OnDispose adds a function to be called after the machine gets disposed.
// These functions will run synchronously just before the WhenDisposed()
// channel gets closed. Considering it's a low-level feature, it's advised to
// handle disposal via dedicated states.
func (m *Machine) OnDispose(fn HandlerDispose) {
	m.handlersMx.Lock()
	defer m.handlersMx.Unlock()

	m.disposeHandlers = append(m.disposeHandlers, fn)
}

// Dispose disposes the machine and all its emitters. You can wait for the
// completion of the disposal with `<-mach.WhenDisposed()`.
func (m *Machine) Dispose() {
	// doDispose in a goroutine to avoid a deadlock when called from within a
	// handler
	// TODO var

	if m.Has1(StateStart) {
		m.Remove1(StateStart, nil)
	}

	go func() {
		if m.disposing.Load() {
			m.log(LogDecisions, "[Dispose] already disposed")
			return
		}
		m.queueProcessing.Store(false)
		m.unlockDisposed.Store(true)
		m.doDispose(false)
	}()
}
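// exampleDispose is an added usage sketch (not part of the original file):
// Dispose is async, so pair it with WhenDisposed to block until disposal
// completes.
func exampleDispose(mach *Machine) {
	mach.Dispose()
	<-mach.WhenDisposed()
}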
// IsDisposed returns true if the machine has been disposed.
func (m *Machine) IsDisposed() bool {
	return m.disposed.Load()
}

// DisposeForce disposes the machine and all its emitters, without waiting
// for the queue to drain. Will cause panics.
func (m *Machine) DisposeForce() {
	m.doDispose(true)
}

func (m *Machine) doDispose(force bool) {
	if m.disposed.Load() {
		// already disposed
		return
	}
	if !m.disposing.CompareAndSwap(false, true) {
		// already disposing
		return
	}
	m.cancel()

	if !force {
		whenDrained := m.WhenQueueEnds()
		select {
		case <-time.After(m.DisposeTimeout):
			m.log(LogDecisions,
				"[doDispose] timeout waiting for queue to drain")
		case <-whenDrained:
		}
	}

	if !m.disposed.CompareAndSwap(false, true) {
		// already disposed
		return
	}
	// TODO needed?
	// time.Sleep(100 * time.Millisecond)

	m.tracersMx.RLock()
	for i := range m.tracers {
		m.tracers[i].MachineDispose(m.Id())
	}
	m.tracersMx.RUnlock()

	// skip the locks when forcing
	if !force {
		m.activeStatesMx.Lock()
		defer m.activeStatesMx.Unlock()
		m.subs.Mx.Lock()
		defer m.subs.Mx.Unlock()
		m.tracersMx.Lock()
		defer m.tracersMx.Unlock()
		m.handlersMx.Lock()
		defer m.handlersMx.Unlock()
		m.queueMx.Lock()
		defer m.queueMx.Unlock()
	}

	m.log(LogEverything, "[end] doDispose")
	if m.Err() == nil && m.ctx.Err() != nil {
		err := m.ctx.Err()
		m.err.Store(&err)
	}
	m.handlers = nil

	// dispose the event loop
	go func() {
		time.Sleep(100 * time.Millisecond)
		m.loopLock.Lock()
		defer m.loopLock.Unlock()

		closeSafe(m.handlerEnd)
		closeSafe(m.handlerPanic)
		closeSafe(m.handlerStart)
	}()

	// dispose chans
	m.subs.dispose()
	for _, mut := range m.queue {
		if !mut.IsCheck {
			continue
		}
		if done, ok := mut.Args[argCheckDone].(*CheckDone); ok {
			closeSafe(done.Ch)
		}
	}

	if m.handlerTimer != nil {
		m.handlerTimer.Stop()
		// m.handlerTimer = nil
	}

	// release the queue lock
	if m.unlockDisposed.Load() {
		m.unlockDisposed.Store(false)
		m.queueProcessing.Store(false)
	}

	// TODO close all the CheckDone chans in the queue

	// run the dispose handlers
	// TODO timeouts?
	for _, fn := range m.disposeHandlers {
		fn(m.id, m.ctx)
	}
	// TODO disposeHandlers refs to other machines
	// m.disposeHandlers = nil

	// the end
	closeSafe(m.whenDisposed)
}

func (m *Machine) getHandlers(locked bool) []*handler {
	if !locked {
		m.handlersMx.Lock()
		defer m.handlersMx.Unlock()
	}

	return slices.Clone(m.handlers)
}

func (m *Machine) setHandlers(locked bool, handlers []*handler) {
	if !locked {
		m.handlersMx.Lock()
		defer m.handlersMx.Unlock()
	}

	m.handlers = handlers
}

// WhenErr returns a channel that will be closed when the machine is in the
// StateException state.
//
// ctx: optional context that will close the channel early.
func (m *Machine) WhenErr(ctx context.Context) <-chan struct{} {
	return m.When([]string{StateException}, ctx)
}

// When returns a channel that will be closed when all the passed states
// become active or the machine gets disposed.
//
// ctx: optional context that will close the channel early.
func (m *Machine) When(states S, ctx context.Context) <-chan struct{} {
	if m.disposed.Load() {
		return newClosedChan()
	}

	// locks
	m.activeStatesMx.Lock()
	defer m.activeStatesMx.Unlock()

	return m.subs.When(m.mustParseStates(states), ctx)
}

// When1 is an alias to When() for a single state.
// See When.
func (m *Machine) When1(state string, ctx context.Context) <-chan struct{} {
	return m.When(S{state}, ctx)
}

// WhenNot returns a channel that will be closed when all the passed states
// become inactive or the machine gets disposed.
//
// ctx: optional context that will close the channel early.
func (m *Machine) WhenNot(states S, ctx context.Context) <-chan struct{} {
	if m.disposed.Load() {
		ch := make(chan struct{})
		close(ch)

		return ch
	}

	// locks
	m.activeStatesMx.Lock()
	defer m.activeStatesMx.Unlock()

	return m.subs.WhenNot(m.mustParseStates(states), ctx)
}
// WhenNot1 is an alias to WhenNot() for a single state.
// See WhenNot.
func (m *Machine) WhenNot1(state string, ctx context.Context) <-chan struct{} {
	return m.WhenNot(S{state}, ctx)
}

// WhenTime returns a channel that will be closed when all the passed states
// have passed the specified time. The time is a logical clock of the state.
// Machine time can be sourced from [Machine.Time](), or [Machine.Clock]().
//
// ctx: optional context that will close the channel early.
func (m *Machine) WhenTime(
	states S, times Time, ctx context.Context,
) <-chan struct{} {
	if m.disposed.Load() {
		return newClosedChan()
	}

	// close early on invalid input
	if len(states) != len(times) {
		err := fmt.Errorf(
			"whenTime: states and times must have the same length (%s)",
			j(states))
		m.AddErr(err, nil)

		return newClosedChan()
	}

	// locks
	m.activeStatesMx.Lock()
	defer m.activeStatesMx.Unlock()

	return m.subs.WhenTime(states, times, ctx)
}

// WhenTime1 waits till the ticks of a single state equal the given value (or
// more).
//
// ctx: optional context that will close the channel early.
func (m *Machine) WhenTime1(
	state string, tick uint64, ctx context.Context,
) <-chan struct{} {
	return m.WhenTime(S{state}, Time{tick}, ctx)
}

// WhenTicks waits N ticks of a single state (relative to now). Uses WhenTime
// underneath.
//
// ctx: optional context that will close the channel early.
func (m *Machine) WhenTicks(
	state string, ticks int, ctx context.Context,
) <-chan struct{} {
	return m.WhenTime(S{state}, Time{uint64(ticks) + m.Tick(state)}, ctx)
}

// WhenQuery returns a channel that will be closed when the passed [clockCheck]
// function returns true. [clockCheck] should be a pure, non-blocking
// function.
//
// ctx: optional context that will close the channel early.
func (m *Machine) WhenQuery(
	clockCheck func(clock Clock) bool, ctx context.Context,
) <-chan struct{} {
	// TODO test case
	// TODO add to Api
	if m.disposed.Load() {
		return newClosedChan()
	}

	// locks
	m.activeStatesMx.Lock()
	defer m.activeStatesMx.Unlock()

	return m.subs.WhenQuery(clockCheck, ctx)
}

// WhenQueueEnds closes every time the queue ends.
func (m *Machine) WhenQueueEnds() <-chan struct{} {
	// finish early
	if m.disposed.Load() || !m.queueRunning.Load() {
		return newClosedChan()
	}

	// locks
	m.queueMx.Lock()
	defer m.queueMx.Unlock()

	return m.subs.WhenQueueEnds()
}

// WhenQueue waits until the passed queueTick gets processed.
// TODO example
func (m *Machine) WhenQueue(qTick Result) <-chan struct{} {
	if m.disposed.Load() {
		return newClosedChan()
	}

	// locks
	m.queueMx.Lock()
	defer m.queueMx.Unlock()

	// finish early
	if m.queueTick >= uint64(qTick) {
		return newClosedChan()
	}

	return m.subs.WhenQueue(qTick)
}

// WhenArgs returns a channel that will be closed when the passed state
// becomes active with all the passed args. Args are compared using the
// native '=='. It's meant to be used with async Multi states, to filter out
// a specific call.
//
// ctx: optional context that will close the channel when the handler loop
// ends.
func (m *Machine) WhenArgs(
	state string, args A, ctx context.Context,
) <-chan struct{} {
	if m.disposed.Load() {
		return newClosedChan()
	}

	// locks
	m.activeStatesMx.Lock()
	defer m.activeStatesMx.Unlock()

	states := m.mustParseStates(S{state})

	return m.subs.WhenArgs(states[0], args, ctx)
}
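// exampleWhen is an added usage sketch (not part of the original file): the
// When* family returns channels that close on a matching clock change. State
// names are hypothetical.
func exampleWhen(mach *Machine, ctx context.Context) {
	// closes once both Foo and Bar are active at the same time
	<-mach.When(S{"Foo", "Bar"}, ctx)

	// closes once Foo ticks 2 more times from now (eg a re-activation)
	<-mach.WhenTicks("Foo", 2, ctx)
}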
// WhenDisposed returns a channel that will be closed when the machine is
// disposed. Requires bound handlers. Use Machine.disposed in case no
// handlers have been bound.
func (m *Machine) WhenDisposed() <-chan struct{} {
	return m.whenDisposed
}

// TODO implement +rpc worker
// func (m *Machine) SetArgsComp(comp func(args A, match A) bool) {
//	 return false
// }

// debug
// func (m *Machine) QueueDump() []string {
//	 m.queueLock.Lock()
//	 defer m.queueLock.Unlock()
//	 ret := make([]string, 0)
//
//	 index := m.StateNames()
//	 for _, mut := range m.queue {
//		 if mut.Type == mutationEval {
//			 continue
//		 }
//
//		 ret = append(ret, mut.StringFromIndex(index))
//	 }
//
//	 return ret
// }

// QueueTick is the number of times the queue has processed a mutation.
// Starts from [1], for easy comparison with [Result].
func (m *Machine) QueueTick() uint64 {
	m.queueMx.Lock()
	defer m.queueMx.Unlock()

	return m.queueTick
}

// MachineTick is the number of times the machine has been started. Each
// start means an empty queue. Only set by [Machine.Import].
func (m *Machine) MachineTick() uint32 {
	m.schemaMx.RLock()
	defer m.schemaMx.RUnlock()

	return m.machineTick
}

// Time returns the machine's time, a list of ticks per state. The returned
// value includes the specified states, or all the states if nil.
func (m *Machine) Time(states S) Time {
	if m.disposed.Load() {
		return nil
	}

	m.activeStatesMx.RLock()
	defer m.activeStatesMx.RUnlock()

	return m.time(states)
}

func (m *Machine) time(states S) Time {
	if m.disposed.Load() {
		return nil
	}

	m.schemaMx.RLock()
	defer m.schemaMx.RUnlock()

	if states == nil {
		states = m.stateNames
	}

	ret := make(Time, len(states))
	for i, state := range states {
		ret[i] = m.clock[state]
	}

	return ret
}

// PrependMut prepends the mutation to the front of the queue. This is a
// special-cases-only method and should be used with caution, as it can
// create an infinite loop. It's useful for postponing mutations inside a
// negotiation handler. The returned Result can't be waited on, as prepended
// mutations don't create a queue tick.
func (m *Machine) PrependMut(mut *Mutation) Result {
	if m.disposing.Load() {
		return Canceled
	}

	isEval := mut.Type == mutationEval
	states := m.mustParseStates(IndexToStates(m.StateNames(), mut.Called))
	m.log(LogOps, "[prepend:%s] %s%s", mut.Type, j(states),
		mut.LogArgs(m.SemLogger().ArgsMapper()))

	m.queueMx.Lock()
	if !isEval {
		mut.cacheCalled.Store(&states)
	}
	m.queue = append([]*Mutation{mut}, m.queue...)
	lq := len(m.queue)
	m.queueLen.Store(uint32(lq))
	if !isEval {
		mut.QueueLen = int32(lq)
		mut.QueueToken = m.queueToken.Add(1)
		mut.QueueTickNow = m.queueTick
	}
	m.queueMx.Unlock()

	// tracers
	if !isEval {
		m.tracersMx.RLock()
		for i := 0; !m.disposing.Load() && i < len(m.tracers); i++ {
			m.tracers[i].MutationQueued(m, mut)
		}
		m.tracersMx.RUnlock()
	}

	res := m.processQueue()

	return res
}

// Add activates a list of states in the machine, returning the result of the
// transition (Executed, Queued, Canceled).
// Like every mutation method, it will resolve relations and trigger handlers.
func (m *Machine) Add(states S, args A) Result {
	if m.disposing.Load() || m.Backoff() {
		return Canceled
	}

	// let Exception in even with a full queue, but only once
	if uint16(m.queueLen.Load()) >= m.QueueLimit {
		if !slices.Contains(states, StateException) || m.IsErr() {
			return Canceled
		}
	}

	qTick := m.queueMutation(MutationAdd, states, args, nil)
	if qTick == uint64(Executed) {
		return Executed
	}
	m.breakpoint(states, nil)

	res := m.processQueue()
	if res == Queued {
		return Result(qTick)
	}

	return res
}

// Add1 is a shorthand method to add a single state with the passed args.
// See Add().
func (m *Machine) Add1(state string, args A) Result {
	return m.Add(S{state}, args)
}
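// exampleAdd is an added usage sketch (not part of the original file): how to
// interpret the Result of a mutation. The state name and args are
// hypothetical.
func exampleAdd(mach *Machine) {
	res := mach.Add1("Foo", A{"id": 123})
	switch {
	case res == Canceled:
		// a negotiation handler or relations rejected the mutation
	case res == Executed:
		// relations resolved and handlers accepted the mutation
	default:
		// queued - res carries the queue tick, awaitable via WhenQueue
		<-mach.WhenQueue(res)
	}
}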
// Toggle deactivates a list of states in case all are active, or activates
// them otherwise. Returns the result of the transition (Executed, Queued,
// Canceled).
func (m *Machine) Toggle(states S, args A) Result {
	if m.disposing.Load() {
		return Canceled
	}

	if m.Is(states) {
		return m.Remove(states, args)
	}

	return m.Add(states, args)
}

// Toggle1 activates or deactivates a single state, depending on its current
// state. Returns the result of the transition (Executed, Queued, Canceled).
func (m *Machine) Toggle1(state string, args A) Result {
	if m.disposing.Load() {
		return Canceled
	}

	if m.Is1(state) {
		return m.Remove1(state, args)
	}

	return m.Add1(state, args)
}

// EvToggle is a traced version of [Machine.Toggle].
func (m *Machine) EvToggle(e *Event, states S, args A) Result {
	if m.disposing.Load() {
		return Canceled
	}

	if m.Is(states) {
		return m.EvRemove(e, states, args)
	}

	return m.EvAdd(e, states, args)
}

// EvToggle1 is a traced version of [Machine.Toggle1].
func (m *Machine) EvToggle1(e *Event, state string, args A) Result {
	if m.disposing.Load() {
		return Canceled
	}

	if m.Is1(state) {
		return m.EvRemove1(e, state, args)
	}

	return m.EvAdd1(e, state, args)
}

// AddErr is a dedicated method to add the StateException state with the
// passed error and optional arguments.
// Like every mutation method, it will resolve relations and trigger handlers.
// AddErr produces a stack trace of the error, if LogStackTrace is enabled.
func (m *Machine) AddErr(err error, args A) Result {
	return m.AddErrState(StateException, err, args)
}

// AddErrState adds a dedicated error state, along with the built-in
// StateException state. Like every mutation method, it will resolve
// relations and trigger handlers. AddErrState produces a stack trace of the
// error, if LogStackTrace is enabled.
func (m *Machine) AddErrState(state string, err error, args A) Result {
	if m.disposing.Load() || m.Backoff() || err == nil {
		return Canceled
	}
	// TODO test Err()
	m.err.Store(&err)

	var trace string
	if m.LogStackTrace {
		trace = captureStackTrace()
	}

	// build args
	argsT := &AT{
		Err:      err,
		ErrTrace: trace,
	}

	// OnError handler
	if fn := m.onError.Load(); fn != nil {
		(*fn)(err, args)
	}

	// TODO prepend to the queue? what effects / benefits

	return m.Add(S{state, StateException}, PassMerge(args, argsT))
}

// CanAdd checks if [states] can be added and returns Executed or
// [AT.CheckDone] if a dry-run mutation passes. Useful for reducing failed
// negotiations.
func (m *Machine) CanAdd(states S, args A) Result {
	if m.disposing.Load() || m.Backoff() {
		return Canceled
	}

	return m.PrependMut(&Mutation{
		Type:    MutationAdd,
		Called:  m.Index(states),
		Args:    args,
		IsCheck: true,
	})
}

// CanAdd1 is [Machine.CanAdd] for a single state.
func (m *Machine) CanAdd1(state string, args A) Result {
	return m.CanAdd(S{state}, args)
}

// CanRemove checks if [states] can be removed and returns Executed or
// [AT.CheckDone] if a dry-run mutation passes. Useful for reducing failed
// negotiations.
func (m *Machine) CanRemove(states S, args A) Result {
	if m.disposing.Load() || m.Backoff() {
		return Canceled
	}

	return m.PrependMut(&Mutation{
		Type:    MutationRemove,
		Called:  m.Index(states),
		Args:    args,
		IsCheck: true,
	})
}

// CanRemove1 is [Machine.CanRemove] for a single state.
func (m *Machine) CanRemove1(state string, args A) Result {
	return m.CanRemove(S{state}, args)
}

// PanicToErr will catch a panic and add the StateException state. Needs to
// be called in a defer statement, just like a recover() call.
func (m *Machine) PanicToErr(args A) {
	if !m.PanicToException || m.disposing.Load() {
		return
	}
	r := recover()
	if r == nil {
		return
	}

	if err, ok := r.(error); ok {
		m.AddErr(err, args)
	} else {
		m.AddErr(fmt.Errorf("%v", r), args)
	}
}
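// exampleGoroutine is an added usage sketch (not part of the original file):
// PanicToErr must be deferred, like recover(), and converts panics into
// StateException.
func exampleGoroutine(mach *Machine) {
	go func() {
		defer mach.PanicToErr(nil)
		// ... code which may panic ...
	}()
}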
// PanicToErrState will catch a panic and add the StateException state, along
// with the passed state. Needs to be called in a defer statement, just like
// a recover() call.
func (m *Machine) PanicToErrState(state string, args A) {
	if !m.PanicToException || m.disposing.Load() {
		return
	}
	r := recover()
	if r == nil {
		return
	}

	if err, ok := r.(error); ok {
		m.AddErrState(state, err, args)
	} else {
		m.AddErrState(state, fmt.Errorf("%v", r), args)
	}
}

// IsErr checks if the machine has the StateException state currently active.
func (m *Machine) IsErr() bool {
	return m.Is(S{StateException})
}

// Err returns the last error.
func (m *Machine) Err() error {
	err := m.err.Load()
	if err == nil {
		return nil
	}

	return *err
}

// Remove deactivates a list of states in the machine, returning the result
// of the transition (Executed, Queued, Canceled).
// Like every mutation method, it will resolve relations and trigger handlers.
func (m *Machine) Remove(states S, args A) Result {
	if m.disposing.Load() || m.Backoff() {
		return Canceled
	}

	// let Exception in even with a full queue, but only once
	if uint16(m.queueLen.Load()) >= m.QueueLimit {
		if !slices.Contains(states, StateException) || !m.IsErr() {
			return Canceled
		}
	}

	// return early if none of the states is active
	m.queueMx.RLock()
	lenQueue := len(m.queue)

	// try ignoring this mutation, if none of the states is currently active
	var statesAny []S
	for _, name := range states {
		statesAny = append(statesAny, S{name})
	}
	if lenQueue == 0 && m.Transition() != nil && !m.Any(statesAny...) {
		m.queueMx.RUnlock()

		return Executed
	}
	m.queueMx.RUnlock()

	qTick := m.queueMutation(MutationRemove, states, args, nil)
	if qTick == uint64(Executed) {
		return Executed
	}
	m.breakpoint(nil, states)

	res := m.processQueue()
	if res == Queued {
		return Result(qTick + 1)
	}

	return res
}

// Remove1 is [Machine.Remove] for a single state.
func (m *Machine) Remove1(state string, args A) Result {
	return m.Remove(S{state}, args)
}

// Set activates only the passed list of states, deactivating the rest.
// Returns the result of the transition (Executed, Canceled, Queued).
// Like every mutation method, it will resolve relations and trigger handlers.
func (m *Machine) Set(states S, args A) Result {
	if m.disposing.Load() || uint16(m.queueLen.Load()) >= m.QueueLimit {
		return Canceled
	}

	qTick := m.queueMutation(MutationSet, states, args, nil)
	if qTick == uint64(Executed) {
		return Executed
	}
	// TODO breakpoints from states-stateNames() as added, and the rest as
	//  removed

	res := m.processQueue()
	if res == Queued {
		return Result(qTick + 1)
	}

	return res
}

// TODO Set1Cond(name, args, cond bool)

// Id returns the machine's id.
func (m *Machine) Id() string {
	return m.id
}

// ParentId returns the ID of the parent machine (if any).
func (m *Machine) ParentId() string {
	return m.parentId
}

// Tags returns machine's tags, a list of unstructured strings without spaces.
func (m *Machine) Tags() []string {
	tags := m.tags.Load()
	if tags != nil {
		return slices.Clone(*tags)
	}

	return nil
}

// SetTags updates the machine's tags with the provided slice of strings.
func (m *Machine) SetTags(tags []string) {
	m.tags.Store(&tags)
}

// Is checks if all the passed states are currently active.
//
//	machine.StringAll() // ()[Foo:0 Bar:0 Baz:0]
//	machine.Add(S{"Foo"})
//	machine.Is(S{"Foo"}) // true
//	machine.Is(S{"Foo", "Bar"}) // false
func (m *Machine) Is(states S) bool {
	if m.disposing.Load() {
		return false
	}

	m.activeStatesMx.RLock()
	defer m.activeStatesMx.RUnlock()

	return m.is(states)
}

// Is1 is [Machine.Is] for a single state.
func (m *Machine) Is1(state string) bool {
	return m.Is(S{state})
}

// is is an unsafe version of Is(), make sure to acquire activeStatesMx.
func (m *Machine) is(states S) bool {
	if m.disposing.Load() {
		return false
	}
	active := m.activeStates

	// TODO optimize, dont copy, use map index
	for _, state := range states {
		if !slices.Contains(m.stateNames, state) {
			m.log(LogDecisions, "[is] state %s not found", state)

			return false
		}
		if !slices.Contains(active, state) {
			return false
		}
	}

	return true
}
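// exampleSet is an added usage sketch (not part of the original file):
// unlike Add, Set deactivates everything outside the passed list. State
// names are hypothetical.
func exampleSet(mach *Machine) {
	mach.Add(S{"Foo", "Bar"}, nil)
	// deactivates Foo, keeps Bar (plus any states implied by relations)
	mach.Set(S{"Bar"}, nil)
}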
// Not checks if **none** of the passed states are currently active.
//
//	machine.StringAll()
//	// -> ()[A:0 B:0 C:0 D:0]
//	machine.Add(S{"A", "B"})
//
//	// not(A) and not(C)
//	machine.Not(S{"A", "C"})
//	// -> false
//
//	// not(C) and not(D)
//	machine.Not(S{"C", "D"})
//	// -> true
func (m *Machine) Not(states S) bool {
	if m.disposing.Load() {
		return false
	}

	m.activeStatesMx.RLock()
	defer m.activeStatesMx.RUnlock()

	return m.not(states)
}

func (m *Machine) not(states S) bool {
	return slicesNone(m.mustParseStates(states), m.activeStates)
}

// Not1 is [Machine.Not] for a single state.
func (m *Machine) Not1(state string) bool {
	return m.Not(S{state})
}

// Any is a group call to Is, returns true if any of the params return true
// from Is.
//
//	machine.StringAll() // ()[Foo:0 Bar:0 Baz:0]
//	machine.Add(S{"Foo"})
//	// is(Foo, Bar) or is(Bar)
//	machine.Any(S{"Foo", "Bar"}, S{"Bar"}) // false
//	// is(Foo) or is(Bar)
//	machine.Any(S{"Foo"}, S{"Bar"}) // true
func (m *Machine) Any(states ...S) bool {
	for _, s := range states {
		if m.Is(s) {
			return true
		}
	}

	return false
}

// Any1 is a group call to [Machine.Is1], returns true if any of the params
// return true from [Machine.Is1].
func (m *Machine) Any1(states ...string) bool {
	for _, s := range states {
		if m.Is1(s) {
			return true
		}
	}

	return false
}

// queueMutation queues a mutation to be executed. Returns the assigned queue
// tick, or uint64(Executed) when the mutation was skipped as a no-op.
func (m *Machine) queueMutation(
	mutType MutationType, states S, args A, e *Event,
) uint64 {
	parsed := m.mustParseStates(states)
	multi := false
	for _, state := range parsed {
		if m.schema[state].Multi {
			multi = true
			break
		}
	}

	// TODO check queuelen > max(int16)

	// Detect duplicates and avoid queueing them, but not for multi states,
	// nor any args.
	if !multi && len(args) == 0 &&
		m.detectQueueDuplicates(mutType, parsed, false) {

		m.log(LogOps, "[queue:skipped] Duplicate detected for [%s] %s",
			mutType, j(parsed))

		return uint64(Executed)
	}

	// args should always be initialized
	if args == nil {
		args = A{}
	}

	// prep the mutation
	var source *MutSource
	if e != nil {
		tx := m.Transition()
		source = &MutSource{
			MachId: e.MachineId,
			TxId:   e.TransitionId,
		}
		if tx != nil {
			source.MachTime = tx.TimeBefore.Sum(nil)
		}
	}
	mut := &Mutation{
		Type:   mutType,
		Called: m.Index(parsed),
		Args:   args,
		Source: source,
	}
	mut.cacheCalled.Store(&parsed)

	// work the queue and persist in the mutation
	m.queueMx.Lock()
	m.queue = append(m.queue, mut)
	lq := len(m.queue)
	m.queueLen.Store(uint32(lq))
	m.queueTicksPending += 1
	mut.QueueLen = int32(lq)
	mut.QueueTick = m.queueTicksPending + m.queueTick
	mut.QueueTickNow = m.queueTick
	m.queueMx.Unlock()

	// tracers
	m.log(LogOps, "[queue:%s] %s%s", mutType, j(parsed),
		mut.LogArgs(m.SemLogger().ArgsMapper()))
	m.tracersMx.RLock()
	for i := 0; !m.disposing.Load() && i < len(m.tracers); i++ {
		m.tracers[i].MutationQueued(m, mut)
	}
	m.tracersMx.RUnlock()

	// breakpoints
	if mut.Type == MutationAdd {
		m.breakpoint(parsed, nil)
	} else if mutType == MutationRemove {
		m.breakpoint(nil, parsed)
	}

	return mut.QueueTick
}
// Eval executes a function on the machine's queue, allowing to avoid using
// locks for non-handler code. Blocking code should NOT be scheduled here.
// Eval cannot be called within a handler's critical zone, as both use the
// same serial queue and would deadlock. Eval has a timeout of
// [Machine.EvalTimeout] and will return false in case it happens. Evals do
// not trigger consensus, thus are much faster than state mutations.
//
// ctx: nil context defaults to the machine's context.
//
// Note: usage of Eval is discouraged. But if you have to, use AM_DETECT_EVAL
// in tests for deadlock detection. Most usages of eval can be replaced with
// atomics or returning from mutations via channels.
func (m *Machine) Eval(source string, fn func(), ctx context.Context) bool {
	if m.disposing.Load() {
		return false
	}
	if source == "" {
		panic("error: source of eval is required")
	}

	// check every method of every handler against the stack trace
	if m.detectEval {
		trace := captureStackTrace()
		for i := 0; !m.disposing.Load() && i < len(m.handlers); i++ {
			h := m.handlers[i]
			for _, method := range h.methodNames {
				// skip " in goroutine N" entries
				match := fmt.Sprintf(".(*%s).%s(", h.name, method)
				for _, line := range strings.Split(trace, "\n") {
					if !strings.Contains(line, match) {
						continue
					}
					msg := fmt.Sprintf(
						"error: Eval() called directly in handler %s.%s",
						h.name, method)
					panic(msg)
				}
			}
		}
	}
	m.log(LogOps, "[eval] %s", source)

	// wrap the func with a done channel
	done := make(chan struct{})
	canceled := atomic.Bool{}
	wrap := func() {
		// TODO optimize: close earlier when [canceled]
		defer close(done)
		if canceled.Load() {
			return
		}
		fn()
	}

	if ctx == nil {
		ctx = context.Background()
	}

	// prepend to the queue, but ignore the result
	mut := &Mutation{
		Type:       mutationEval,
		eval:       wrap,
		evalSource: source,
		ctx:        ctx,
	}
	// TODO handle Canceled?
	_ = m.PrependMut(mut)

	// wait with a timeout
	select {
	case <-time.After(m.EvalTimeout):
		canceled.Store(true)
		m.log(LogOps, "[eval:timeout] %s", source)
		m.AddErr(fmt.Errorf("%w: eval:%s", ErrEvalTimeout, source), nil)

		return false
	case <-m.ctx.Done():
		canceled.Store(true)

		return false
	case <-ctx.Done():
		canceled.Store(true)
		m.log(LogDecisions, "[eval:ctxdone] %s", source)

		return false
	case <-done:
	}
	m.log(LogEverything, "[eval:end] %s", source)

	return true
}

// NewStateCtx returns a new sub-context, bound to the current clock's tick of
// the passed state.
//
// Context cancels when the state has been deactivated, or right away,
// if it isn't currently active.
//
// State contexts are used to check state expirations and should be checked
// often inside goroutines.
func (m *Machine) NewStateCtx(state string) context.Context {
	if m.disposing.Load() {
		return context.TODO()
	}
	// TODO handle cancelation while parsing the queue
	m.mustParseStates(S{state})

	m.activeStatesMx.Lock()
	defer m.activeStatesMx.Unlock()

	return m.subs.NewStateCtx(state)
}

// MustBindHandlers is a panicking version of BindHandlers, useful in tests.
func (m *Machine) MustBindHandlers(handlers any) {
	if err := m.BindHandlers(handlers); err != nil {
		panic(err)
	}
}

// BindHandlers binds a struct of handler methods to machine's states, based
// on the naming convention, eg `FooState(e *Event)`. Negotiation handlers
// can optionally return bool.
func (m *Machine) BindHandlers(handlers any) error {
	if m.disposing.Load() {
		return nil
	}
	first := false
	if !m.handlerLoopRunning.Load() {
		first = true
		m.handlerLoopRunning.Store(true)

		// start the handler loop
		go m.handlerLoop()
	}

	v := reflect.ValueOf(handlers)
	if v.Kind() != reflect.Ptr || v.Elem().Kind() != reflect.Struct {
		return errors.New("BindHandlers expects a pointer to a struct")
	}

	// extract the name
	name := reflect.TypeOf(handlers).Elem().Name()
	if name == "" {
		name = "anon"
		if os.Getenv(EnvAmDebug) != "" {
			buf := make([]byte, 4024)
			n := runtime.Stack(buf, false)
			trace := string(buf[:n])
			lines := strings.Split(trace, "\n")
			name = lines[len(lines)-2]
			name = strings.TrimLeft(emitterNameRe.FindString(name), "/")
		}
	}

	// detect methods
	var methodNames []string
	if m.detectEval || os.Getenv(EnvAmDebug) != "" {
		var err error
		methodNames, err = ListHandlers(handlers, m.stateNames)
		if err != nil {
			return fmt.Errorf("listing handlers: %w", err)
		}
	}

	h := m.newHandler(handlers, name, &v, methodNames)
	old := m.getHandlers(false)
	m.setHandlers(false, append(old, h))

	if name != "" {
		m.log(LogOps, "[handlers] bind %s", name)
	} else {
		// index for anon handlers
		m.log(LogOps, "[handlers] bind %d", len(old))
	}
	// TODO sem logger for handlers

	// if already in Exception when the 1st handler group is bound, re-add
	// the err
	if first && m.IsErr() {
		m.AddErr(m.Err(), nil)
	}

	return nil
}
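// exampleHandlers is an added binding sketch (not part of the original
// file): a handler struct following the naming convention from BindHandlers.
// The state name Foo is hypothetical.
type exampleHandlers struct{}

// FooEnter is a negotiation handler - returning false cancels the transition.
func (h *exampleHandlers) FooEnter(e *Event) bool {
	_, ok := e.Args["id"]
	return ok
}

// FooState is a final handler, called once Foo becomes active.
func (h *exampleHandlers) FooState(e *Event) {
	// start work bound to the state's lifetime, eg via NewStateCtx("Foo")
}

func exampleBind(mach *Machine) error {
	return mach.BindHandlers(&exampleHandlers{})
}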
// DetachHandlers detaches previously bound machine handlers.
func (m *Machine) DetachHandlers(handlers any) error {
	if m.disposing.Load() {
		return nil
	}

	m.handlersMx.Lock()
	defer m.handlersMx.Unlock()

	old := m.getHandlers(true)
	var match *handler
	var idx int
	for i, h := range old {
		if h.h == handlers {
			match = h
			idx = i
			break
		}
	}
	if match == nil {
		return errors.New("handlers not bound")
	}

	m.setHandlers(true, slices.Delete(old, idx, idx+1))
	match.dispose()
	m.log(LogOps, "[handlers] detach %s", match.name)

	return nil
}

// HasHandlers returns true if this machine has bound handlers, and thus an
// allocated goroutine. It also makes it nondeterministic.
func (m *Machine) HasHandlers() bool {
	m.handlersMx.RLock()
	defer m.handlersMx.RUnlock()

	// TODO keep a cache flag?
	return len(m.handlers) > 0
}

// newHandler creates a new handler for Machine.
// Each handler should be consumed by one receiver only to guarantee the
// delivery of all events.
func (m *Machine) newHandler(
	handlers any, name string, methods *reflect.Value, methodNames []string,
) *handler {
	if m.disposing.Load() {
		return &handler{}
	}

	h := &handler{
		name:         name,
		h:            handlers,
		methods:      methods,
		methodNames:  methodNames,
		methodCache:  make(map[string]reflect.Value),
		missingCache: make(map[string]struct{}),
	}

	return h
}

// recoverToErr recovers to the StateException state by catching panics.
func (m *Machine) recoverToErr(h *handler, r recoveryData) {
	if m.disposing.Load() {
		return
	}

	m.panicCaught.Store(true)
	m.currentHandler.Store("")
	tx := m.t.Load()
	index := m.StateNames()
	idxException := m.Index1(StateException)

	// dont double handle an exception (no nesting)
	mut := tx.Mutation
	if mut.IsCalled(idxException) {
		return
	}

	m.log(LogOps, "[recover] handling panic...")
	err := fmt.Errorf("%s", r.err)
	m.err.Store(&err)

	// final phase, trouble...
	if tx.latestHandlerIsFinal {
		m.recoverFinalPhase()
	}

	m.log(LogOps, "[cancel] (%s) by recover", j(tx.TargetStates()))
	// negotiation phase - canceling is enough
	tx.IsAccepted.Store(false)
	tx.IsCompleted.Store(true)

	// prepend add:Exception to the beginning of the queue
	mutEx := &Mutation{
		Type:   MutationAdd,
		Called: m.Index(S{StateException}),
		Args: Pass(&AT{
			Err:      err,
			ErrTrace: r.stack,
			Panic: &ExceptionArgsPanic{
				CalledStates: IndexToStates(index, mut.Called),
				StatesBefore: tx.StatesBefore(),
				Transition:   tx,
			},
		}),
	}
	// prepend the exception to the queue
	m.PrependMut(mutEx)

	// restart the handler loop
	go m.handlerLoop()
}

func (m *Machine) recoverFinalPhase() {
	tx := m.t.Load()

	// try to fix active states
	finals := slices.Concat(tx.Exits, tx.Enters)
	m.activeStatesMx.RLock()
	activeStates := m.activeStates
	m.activeStatesMx.RUnlock()
	found := false

	// walk over enter/exits and remove states after the last step,
	// as their final handlers haven't been executed
	for _, state := range finals {
		if tx.latestHandlerToState == state {
			found = true
		}
		if !found {
			continue
		}

		if tx.latestHandlerIsEnter {
			activeStates = slicesWithout(activeStates, state)
		} else {
			activeStates = append(activeStates, state)
		}
	}
	m.log(LogOps, "[recover] partial final states as (%s)", j(activeStates))

	m.activeStatesMx.Lock()
	defer m.activeStatesMx.Unlock()
	m.setActiveStates(tx.CalledStates(), activeStates, tx.IsAuto())
}

// mustParseStates parses the states and returns them as a list.
// Panics when a state is not defined.
func (m *Machine) mustParseStates(states S) S {
	// TODO replace with Has() & ParseStates()
	if m.disposing.Load() {
		return nil
	}

	// check if all states are defined in m.Struct
	seen := make(map[string]struct{})
	dup := false
	for i := range states {
		if _, ok := m.schema[states[i]]; !ok {
			panic(fmt.Errorf(
				"%w: %s not defined in schema for %s", ErrStateMissing,
				states[i], m.id))
		}
		if _, ok := seen[states[i]]; !ok {
			seen[states[i]] = struct{}{}
		} else {
			// mark as duplicated
			dup = true
		}
	}

	if dup {
		return slicesUniq(states)
	}

	return states
}
// ParseStates parses a list of states, removing unknown ones and duplicates.
// Use [Machine.Has] and [Machine.Has1] to check if a state is defined for
// the machine.
func (m *Machine) ParseStates(states S) S {
	if m.disposing.Load() {
		return nil
	}

	// check if all states are defined in the schema
	seen := make(map[string]struct{})
	dup := false
	for i := range states {
		if _, ok := m.schema[states[i]]; !ok {
			continue
		}
		if _, ok := seen[states[i]]; !ok {
			seen[states[i]] = struct{}{}
		} else {
			// mark as duplicated
			dup = true
		}
	}
	if dup {
		return slicesUniq(states)
	}

	return slices.Collect(maps.Keys(seen))
}

// VerifyStates verifies an array of state names and returns an error in case
// at least one isn't defined. It also retains the order and uses it for
// StateNames. Verification can be checked via Machine.StatesVerified.
func (m *Machine) VerifyStates(states S) error {
	if m.disposing.Load() {
		return nil
	}

	m.schemaMx.RLock()
	defer m.schemaMx.RUnlock()

	return m.verifyStates(states)
}

func (m *Machine) verifyStates(states S) error {
	var errs []error
	var checked []string
	for _, state := range states {
		if _, ok := m.schema[state]; !ok {
			err := fmt.Errorf("state %s not defined in schema for %s",
				state, m.id)
			errs = append(errs, err)
			continue
		}
		checked = append(checked, state)
	}

	if len(errs) > 1 {
		return errors.Join(errs...)
	} else if len(errs) == 1 {
		return errs[0]
	}

	if len(m.stateNames) > len(checked) {
		missing := StatesDiff(m.stateNames, checked)
		return fmt.Errorf(
			"error: trying to verify less states than registered: %s",
			j(missing))
	}

	// memorize the state names order
	m.stateNames = slicesUniq(states)
	m.stateNamesExport = nil
	m.statesVerified.Store(true)

	// tracers
	m.tracersMx.RLock()
	for i := 0; !m.disposing.Load() && i < len(m.tracers); i++ {
		m.tracers[i].VerifyStates(m)
	}
	m.tracersMx.RUnlock()

	return nil
}

// StatesVerified returns true if the state names have been ordered
// using VerifyStates.
func (m *Machine) StatesVerified() bool {
	return m.statesVerified.Load()
}

// setActiveStates sets the new active states, incrementing the counters and
// returning the previously active states.
func (m *Machine) setActiveStates(
	calledStates S, targetStates S, isAuto bool,
) S {
	if m.disposing.Load() {
		// no-op
		return S{}
	}

	previous := m.activeStates
	newStates := StatesDiff(targetStates, m.activeStates)
	removedStates := StatesDiff(m.activeStates, targetStates)
	noChangeStates := StatesDiff(targetStates, newStates)
	m.activeStates = slices.Clone(targetStates)

	// Tick all new states by +1 and already active and called multi states
	// by +2
	for _, state := range targetStates {
		data := m.schema[state]
		if !slices.Contains(previous, state) {
			// tick by +1
			// TODO wrap on overflow
			m.clock[state]++
		} else if slices.Contains(calledStates, state) && data.Multi {
			// tick by +2 to indicate a new instance
			// TODO wrap on overflow
			m.clock[state] += 2
			// treat prev active multi states as new states, for logging
			newStates = append(newStates, state)
		}
	}

	// tick deactivated states by +1
	for _, state := range removedStates {
		m.clock[state]++
	}

	// construct a logging msg
	if m.SemLogger().Level() >= LogExternal {
		logMsg := ""
		if len(newStates) > 0 {
			logMsg += " +" + strings.Join(newStates, " +")
		}
		if len(removedStates) > 0 {
			logMsg += " -" + strings.Join(removedStates, " -")
		}
		if len(noChangeStates) > 0 && m.semLogger.Level() > LogDecisions {
			logMsg += " " + j(noChangeStates)
		}

		if len(logMsg) > 0 {
			label := "state"
			if isAuto {
				label = "auto_" + label
			}
			argsStr := m.t.Load().Mutation.LogArgs(
				m.SemLogger().ArgsMapper())
			m.log(LogChanges, "["+label+"]"+logMsg+argsStr)
		}
	}

	return previous
}

func (m *Machine) AddBreakpoint1(added string, removed string, strict bool) {
	if added != "" {
		m.AddBreakpoint(S{added}, nil, strict)
	} else if removed != "" {
		m.AddBreakpoint(nil, S{removed}, strict)
	} else {
		m.log(LogOps, "[breakpoint] invalid")
	}
}
// AddBreakpoint adds a breakpoint for an outcome of a mutation (added and
// removed states), checked by mutation equality. Once such a mutation
// happens, a log message will be printed out. We can set an IDE's breakpoint
// on this line and see the mutation's sync stack trace. If
// [Machine.LogStackTrace] is set, the stack trace will be printed out as
// well. Many breakpoints can be added, but none removed.
//
// Breakpoints are useful to find the caller of a mutation, but don't work
// with [Machine.Set].
//
// strict: skips already active / inactive states (strict instead of diff
// equality).
func (m *Machine) AddBreakpoint(added S, removed S, strict bool) {
	// TODO strict: dont breakpoint added states when already active
	m.breakpointsMx.Lock()
	defer m.breakpointsMx.Unlock()

	m.breakpoints = append(m.breakpoints, &breakpoint{
		Added:   added,
		Removed: removed,
		Strict:  strict,
	})
}

func (m *Machine) breakpoint(added S, removed S) {
	m.breakpointsMx.Lock()
	defer m.breakpointsMx.Unlock()

	match := false
	for _, bp := range m.breakpoints {
		// check if the breakpoint matches
		if len(added) > 0 && !slices.Equal(bp.Added, added) {
			continue
		}
		if len(removed) > 0 && !slices.Equal(bp.Removed, removed) {
			continue
		}

		// strict skips already active / inactive
		if bp.Strict {
			if len(bp.Added) > 0 && m.Is(bp.Added) {
				continue
			}
			if len(bp.Removed) > 0 && m.Not(bp.Removed) {
				continue
			}
		}

		match = true
	}
	if !match {
		return
	}

	// ----- ----- -----
	// SET THE IDE BREAKPOINT BELOW
	// ----- ----- -----

	if m.LogStackTrace {
		m.log(LogChanges, "[breakpoint] Machine.breakpoint\n%s",
			captureStackTrace())
	} else {
		m.log(LogChanges, "[breakpoint] Machine.breakpoint")
	}
}
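// exampleBreakpoint is an added usage sketch (not part of the original
// file): log (and optionally stack-trace) any mutation that adds the
// hypothetical Foo state, so an IDE breakpoint inside Machine.breakpoint can
// catch the caller.
func exampleBreakpoint(mach *Machine) {
	mach.AddBreakpoint(S{"Foo"}, nil, false)
}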
// processQueue processes the queue of mutations. It's the main loop of the
// machine.
func (m *Machine) processQueue() Result {
	// empty queue
	if m.queueLen.Load() == 0 || m.disposing.Load() {
		return Canceled
	}

	// try to acquire the lock TODO safer locking for handler deadlines?
	if !m.queueProcessing.CompareAndSwap(false, true) {
		m.queueMx.Lock()
		defer m.queueMx.Unlock()

		label := "items"
		if len(m.queue) == 1 {
			label = "item"
		}
		m.log(LogOps, "[postpone] queue running (%d %s)", len(m.queue), label)

		return Queued
	}

	var ret []Result

	// execute the queue
	m.queueRunning.Store(false)
	for m.queueLen.Load() > 0 {
		m.queueRunning.Store(true)
		if m.disposing.Load() {
			return Canceled
		}

		// shift the queue
		m.queueMx.Lock()
		lq := len(m.queue)
		if lq < 1 {
			m.Log("ERROR: missing queue item")
			return Canceled
		}
		mut := m.queue[0]
		m.queue = m.queue[1:]
		m.queueLen.Store(uint32(lq - 1))
		// queue ticks
		if mut.QueueTick > 0 {
			m.queueTicksPending -= 1
			m.queueTick += 1
		}
		m.queueMx.Unlock()

		// support for context cancelation
		if mut.ctx != nil && mut.ctx.Err() != nil {
			ret = append(ret, Executed)
			continue
		}

		// special case for Eval mutations
		if mut.Type == mutationEval {
			m.Log("eval: " + mut.evalSource)
			mut.eval()
			continue
		}

		// TODO race in relations.go:79; m.queueProcessing failing?
		tx := newTransition(m, mut)

		// execute the transition and set active states
		m.schemaMx.RLock()
		ret = append(ret, tx.emitEvents())
		m.timeLast.Store(&tx.TimeAfter)
		m.schemaMx.RUnlock()

		// TODO assert QueueTick same as mut queue tick?

		// parse wait chans
		if tx.Mutation.IsCheck {
			// TODO test case
			if done, ok := mut.Args[argCheckDone].(*CheckDone); ok {
				done.Canceled = !tx.IsAccepted.Load()
				closeSafe(done.Ch)
			}
		} else if tx.IsAccepted.Load() && !tx.Mutation.IsCheck {
			// TODO optimize process only when ticks change (incl queue tick)
			m.processSubscriptions(tx)
		}
		tx.CleanCache()
	}

	// release the locks
	m.t.Store(nil)
	m.queueProcessing.Store(false)
	m.queueRunning.Store(false)

	// tracers
	m.tracersMx.RLock()
	for i := 0; !m.disposing.Load() && i < len(m.tracers); i++ {
		m.tracers[i].QueueEnd(m)
	}
	m.tracersMx.RUnlock()

	// subscriptions
	// TODO deadlock with doDispose
	m.queueMx.Lock()
	for _, ch := range m.subs.ProcessWhenQueueEnds() {
		closeSafe(ch)
	}
	m.queueMx.Unlock()

	if len(ret) == 0 {
		return Canceled
	}

	return ret[0]
}
func (m *Machine) processSubscriptions(tx *Transition) {
	// lock
	m.activeStatesMx.RLock()

	// collect
	cancels := m.subs.ProcessStateCtx(tx.cacheDeactivated)
	chans := slices.Concat(
		m.subs.ProcessWhen(tx.cacheActivated, tx.cacheDeactivated),
		m.subs.ProcessWhenTime(tx.ClockBefore()),
		m.subs.ProcessWhenQueue(m.queueTick),
		m.subs.ProcessWhenQuery(),
	)

	// unlock
	m.activeStatesMx.RUnlock()

	// close outside the critical zone
	for _, cancel := range cancels {
		cancel()
	}
	for _, ch := range chans {
		closeSafe(ch)
	}
}

// TODO implement +rpc worker
// func (m *Subscriptions) SetArgsComp(comp func(args A, match A) bool) {
//	 return false
// }

// Ctx returns the machine's root context.
func (m *Machine) Ctx() context.Context {
	return m.ctx
}

// Log logs an [extern] message unless LogNothing is set.
// Optionally redirects to a custom logger from SemLogger().SetLogger.
func (m *Machine) Log(msg string, args ...any) {
	if m.disposing.Load() {
		return
	}
	prefix := "[extern"

	// single lines only
	msg = strings.ReplaceAll(msg, "\n", " ")

	m.log(LogExternal, prefix+"] "+msg, args...)
}

// log logs a message if the log level is high enough.
// Optionally redirects to a custom logger from SemLogger().SetLogger.
func (m *Machine) log(level LogLevel, msg string, args ...any) {
	if level > m.semLogger.Level() || m.disposing.Load() {
		return
	}

	prefix := ""
	if m.logId.Load() {
		id := m.id
		if len(id) > 5 {
			id = id[:5]
		}
		prefix = "[" + id + "] "
		msg = prefix + msg
	}

	out := fmt.Sprintf(msg, args...)
	logger := m.semLogger.Logger()
	if logger != nil {
		logger(level, msg, args...)
	} else {
		fmt.Println(out)
	}

	// dont modify completed transitions
	tx := m.Transition()
	if tx != nil && !tx.IsCompleted.Load() {
		// append the log msg to the current transition
		tx.InternalLogEntriesLock.Lock()
		defer tx.InternalLogEntriesLock.Unlock()
		tx.LogEntries = append(tx.LogEntries, &LogEntry{level, out})
	} else {
		// append the log msg to the machine and collect at the end of the
		// next transition
		m.logEntriesLock.Lock()
		defer m.logEntriesLock.Unlock()

		// prevent dups (except piping)
		if len(m.logEntries) > 0 &&
			m.logEntries[len(m.logEntries)-1].Text == out &&
			!strings.HasPrefix(out, prefix+"[pipe-") {
			return
		}
		m.logEntries = append(m.logEntries, &LogEntry{
			Level: level,
			Text:  out,
		})
	}
}

// SemLogger returns the semantic logger of the machine.
func (m *Machine) SemLogger() SemLogger {
	return m.semLogger
}

// handle triggers methods on handler structs.
// locked: transition lock currently held
func (m *Machine) handle(
	name string, args A, isEnter, isFinal, isSelf bool,
) (Result, bool) {
	if m.disposing.Load() {
		return Canceled, false
	}
	tx := m.t.Load()
	e := &Event{
		Name:         name,
		machine:      m,
		machApi:      m,
		Args:         args,
		TransitionId: tx.Id,
		MachineId:    m.Id(),
	}
	targetStates := tx.TargetStates()
	tx.latestHandlerIsEnter = isEnter
	tx.latestHandlerIsFinal = isFinal

	// always init args
	if e.Args == nil {
		e.Args = A{}
	}

	// call the handlers
	res, handlerCalled := m.processHandlers(e)

	if m.panicCaught.Load() {
		res = Canceled
		m.panicCaught.Store(false)
	}

	// negotiation support
	if !isFinal && res == Canceled {
		if m.semLogger.Level() >= LogOps {
			var self string
			if isSelf {
				self = ":self"
			}
			m.log(LogOps, "[cancel%s] (%s) by %s", self, j(targetStates),
				name)
		}

		return Canceled, handlerCalled
	}

	return res, handlerCalled
}
func (m *Machine) processHandlers(e *Event) (Result, bool) {
	if m.disposing.Load() {
		return Canceled, false
	}

	handlerCalled := false
	handlers := m.getHandlers(false)
	for i := 0; !m.disposing.Load() && i < len(handlers); i++ {
		h := handlers[i]
		if h == nil {
			continue
		}
		h.mx.Lock()

		name := e.Name
		// TODO descriptive name
		handlerName := strconv.Itoa(i) + ":" + h.name
		if m.semLogger.Level() >= LogEverything {
			emitterId := truncateStr(handlerName, 15)
			emitterId = padString(
				strings.ReplaceAll(emitterId, " ", "_"), 15, "_")
			m.log(LogEverything, "[handle:%-15s] %s", emitterId, name)
		}

		// cache
		if _, ok := h.missingCache[name]; ok {
			h.mx.Unlock()
			continue
		}
		method, ok := h.methodCache[name]
		if !ok {
			method = h.methods.MethodByName(name)
			// support field handlers
			if !method.IsValid() {
				method = h.methods.Elem().FieldByName(name)
			}
			if !method.IsValid() {
				h.missingCache[name] = struct{}{}
				h.mx.Unlock()
				continue
			}
			h.methodCache[name] = method
		}
		h.mx.Unlock()

		// call the handler
		m.log(LogOps, "[handler:%d] %s", i, name)
		m.currentHandler.Store(name)
		var timeout bool
		var result bool
		handlerCalled = true

		// tracers
		m.tracersMx.RLock()
		tx := m.t.Load()
		for ti := range m.tracers {
			m.tracers[ti].HandlerStart(tx, handlerName, name)
		}
		m.tracersMx.RUnlock()
		call := &handlerCall{
			fn:      method,
			name:    name,
			event:   e,
			timeout: false,
		}

		select {
		case <-m.ctx.Done():
			break
		case m.handlerStart <- call:
		}

		// reuse the timer each time
		m.handlerTimer.Reset(m.HandlerTimeout)

		// wait on the result / timeout / context
		select {

		case <-m.ctx.Done():

		case <-m.handlerTimer.C:
			// timeout, fork a new handler loop
			m.log(LogOps, "[cancel] (%s) by timeout", j(tx.TargetStates()))
			m.log(LogDecisions, "[handler:timeout]: %s from %s", name,
				h.name)
			timeout = true

			// wait for the handler to exit within HandlerDeadline
			select {
			case <-m.handlerEnd:
				// accepted timeout (good)
				m.log(LogEverything, "[handler:ack-timeout] %s from %s",
					e.Name, h.name)
				// TODO optimize re-use a timer like the handler timeout

			case <-time.After(m.HandlerDeadline):
				m.log(LogEverything, "[handler:deadline] %s from %s",
					e.Name, h.name)
				// deadlined timeout (bad)

				// fork a new handler loop
				go m.handlerLoop()

				// clear the queue
				m.queueMx.Lock()
				m.queue = nil
				m.queueLen.Store(0)
				m.queueTicksPending = 0
				m.queueMx.Unlock()
				// TODO dispose all argCheckDone chans

				// enqueue the relevant err
				err := fmt.Errorf("%w: %s from %s", ErrHandlerTimeout,
					name, h.name)
				m.EvAddErr(e, err, Pass(&AT{
					TargetStates: tx.TargetStates(),
					CalledStates: tx.CalledStates(),
					TimeBefore:   tx.TimeBefore,
					TimeAfter:    tx.TimeAfter,
					Event:        e.Export(),
				}))

				// activate Backoff for further mutations
				now := time.Now()
				m.LastHandlerDeadline.Store(&now)
			}

		case r := <-m.handlerPanic:
			// recover partial state
			// TODO pass tx info via &AT{}
			m.recoverToErr(h, r)

		case result = <-m.handlerEnd:
			m.log(LogEverything, "[handler:end] %s from %s", e.Name, h.name)
			// ok
		}

		m.handlerTimer.Stop()
		m.currentHandler.Store("")

		// tracers
		m.tracersMx.RLock()
		for ti := range m.tracers {
			m.tracers[ti].HandlerEnd(tx, handlerName, name)
		}
		m.tracersMx.RUnlock()

		// handle negotiation
		switch {
		case timeout:
			return Canceled, handlerCalled
		case strings.HasSuffix(e.Name, SuffixState):
		case strings.HasSuffix(e.Name, SuffixEnd):
			// returns from State and End handlers are ignored
		default:
			if !result {
				return Canceled, handlerCalled
			}
		}
	}

	// state args matchers
	m.activeStatesMx.RLock()
	defer m.activeStatesMx.RUnlock()
	for _, ch := range m.subs.ProcessWhenArgs(e) {
		close(ch)
	}

	return Executed, handlerCalled
}

func (m *Machine) handlerLoop() {
	ver := m.handlerLoopVer.Add(1)

	catch := func() {
		r := recover()
		if r == nil {
			return
		}
		if !m.disposing.Load() {
			m.handlerPanic <- recoveryData{
				err:   r,
				stack: captureStackTrace(),
			}
		}
	}
	// catch panics and fwd
	if m.PanicToException {
		defer catch()
	}

	// wait for a handler call or context
	for {
		select {

		case <-m.ctx.Done():
			m.handlerLoopDone()

			return

		case call, ok := <-m.handlerStart:
			if !ok {
				return
			}
			result := true

			// handler signature: FooState(e *am.Event)
			// TODO optimize https://github.com/golang/go/issues/7818
			if call.event.IsValid() {
				callRet := call.fn.Call(
					[]reflect.Value{reflect.ValueOf(call.event)})
				if len(callRet) > 0 {
					result = callRet[0].Interface().(bool)
				}
			} else {
				m.log(LogDecisions, "[handler:invalid] %s", call.name)
				result = false
			}

			// exit, a new clone is running
			verNow := m.handlerLoopVer.Load()
			if ver != verNow {
				m.AddErr(fmt.Errorf(
					"deadlined handler finished, theoretical leak: %s",
					call.name), nil)

				return
			}

			m.loopLock.Lock()
			// pass the result back to processHandlers
			select {
			case <-m.ctx.Done():
				m.handlerLoopDone()
				m.loopLock.Unlock()

				return
			case m.handlerEnd <- result:
				m.loopLock.Unlock()
			}
		}
	}
}

func (m *Machine) handlerLoopDone() {
	// doDispose with context
	ctxVal, _ := m.ctx.Value(CtxKey).(CtxValue)
	m.log(LogOps, "[doDispose] ctx handlerLoopDone %v", ctxVal)
	m.Dispose()
}
// detectQueueDuplicates checks for duplicated mutations
// 1. Check if a mutation is scheduled (without params)
// 2. Check if a counter mutation isn't scheduled later (any params)
func (m *Machine) detectQueueDuplicates(
	mutType MutationType, states S, isCheck bool,
) bool {
	// TODO test qTick and counterMutFound
	if m.disposing.Load() {
		return false
	}

	// check if this mutation is already scheduled
	found, _, qTick := m.IsQueued(mutType, states, true, true, 0, isCheck,
		PositionAny)
	if !found {
		return false
	}

	var counterMutType MutationType
	switch mutType {
	case MutationAdd:
		counterMutType = MutationRemove
	case MutationRemove:
		counterMutType = MutationAdd
	case MutationSet:
		fallthrough
	default:
		// avoid duplicating `set` only if at the end of the queue
		return qTick > 0 && len(m.queue)-1 > 0
	}

	// Check if a counter-mutation is scheduled and broaden the match
	// - with or without params
	// - state sets same or bigger than `states`
	counterMutFound, _, _ := m.IsQueued(counterMutType, states, false, false,
		qTick+1, isCheck, PositionAny)

	return !counterMutFound
}

// Transition returns the current transition, if any.
func (m *Machine) Transition() *Transition {
	return m.t.Load()
}

// Clock returns the current machine's clock, a state-keyed map of ticks. If
// states are passed, only the ticks of the passed states are returned.
func (m *Machine) Clock(states S) Clock {
	if m.disposing.Load() {
		return Clock{}
	}

	m.activeStatesMx.RLock()
	defer m.activeStatesMx.RUnlock()

	if states == nil {
		states = m.stateNames
	}

	ret := make(Clock)
	for _, state := range states {
		ret[state] = m.clock[state]
	}

	return ret
}

// Tick returns the current tick for a given state.
func (m *Machine) Tick(state string) uint64 {
	if m.disposing.Load() {
		return 0
	}

	m.activeStatesMx.RLock()
	defer m.activeStatesMx.RUnlock()

	return m.clock[state]
}

// IsQueued checks if a particular mutation has been queued. Returns
// (true, index, queueTick) when found, or (false, 0, 0) when not.
//
// mutType: add, remove, set
//
// states: list of states used in the mutation
//
// withoutArgsOnly: matches only mutations without the arguments object
//
// statesStrictEqual: states of the mutation have to be exactly like `states`
// and not a superset.
//
// minQueueTick: minimal queue tick assigned to the matched mutation
//
// isCheck: the mutation has to be a [Mutation.IsCheck]
//
// position: position in the queue
func (m *Machine) IsQueued(
	mutType MutationType, states S, withoutArgsOnly bool,
	statesStrictEqual bool, minQueueTick uint64, isCheck bool,
	position Position,
) (bool, uint16, uint64) {
	// TODO combine params into a struct `QueueQuery`
	// TODO return the found mutation, not the mutable index
	// TODO test case
	if m.disposing.Load() {
		return false, 0, 0
	}

	m.queueMx.RLock()
	defer m.queueMx.RUnlock()

	// position TODO test case
	queue := m.queue
	switch position {
	case PositionLast:
		last := math.Max(0, float64(len(queue)-1))
		queue = queue[int(last):]
	case PositionFirst:
		queue = queue[0:1]
	}

	for i, mut := range queue {
		// start index via qticks #326
		if minQueueTick > 0 && mut.QueueTick < minQueueTick {
			continue
		}

		if mut.IsCheck == isCheck &&
			mut.Type == mutType &&
			((withoutArgsOnly && len(mut.Args) == 0) || !withoutArgsOnly) &&
			// target states have to be at least as long as the checked ones
			// or exactly the same in case of a strict_equal
			((statesStrictEqual && len(mut.Called) == len(states)) ||
				(!statesStrictEqual && len(mut.Called) >= len(states))) &&
			// and all the checked ones have to be included in the target ones
			slicesEvery(mut.Called, m.Index(states)) {

			// return the queueTick
			return true, uint16(i), mut.QueueTick
		}
	}

	return false, 0, 0
}

// IsQueuedAbove checks if a particular mutation has been queued at least
// [threshold] times. This method allows for rate-limiting of mutations for
// specific states and a threshold.
func (m *Machine) IsQueuedAbove(
	threshold int, mutType MutationType, states S, withoutArgsOnly bool,
	statesStrictEqual bool, minQueueTick uint64,
) bool {
	if m.disposing.Load() {
		return false
	}
	// TODO test

	m.queueMx.RLock()
	defer m.queueMx.RUnlock()

	count := 0
	for _, mut := range m.queue {
		// start index via qticks #326
		if minQueueTick > 0 && mut.QueueTick < minQueueTick {
			continue
		}

		// no rate limiting for checks
		if mut.IsCheck == false &&
			mut.Type == mutType &&
			((withoutArgsOnly && len(mut.Args) == 0) || !withoutArgsOnly) &&
			// target states have to be at least as long as the checked ones
			// or exactly the same in case of a strict_equal
			((statesStrictEqual && len(mut.Called) == len(states)) ||
				(!statesStrictEqual && len(mut.Called) >= len(states))) &&
			// and all the checked ones have to be included in the target ones
			slicesEvery(mut.Called, m.Index(states)) {

			count++
			if count >= threshold {
				return true
			}
		}
	}

	return false
}

// QueueLen returns the current length of the mutation queue.
func (m *Machine) QueueLen() uint16 {
	return uint16(m.queueLen.Load())
}
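// exampleIsQueued is an added usage sketch (not part of the original file):
// querying the queue for a pending add of the hypothetical Foo state,
// regardless of args, supersets allowed, at any position.
func exampleIsQueued(mach *Machine) bool {
	found, _, _ := mach.IsQueued(MutationAdd, S{"Foo"}, false, false, 0,
		false, PositionAny)

	return found
}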
// WillBe returns true if the passed states are scheduled to be activated.
// Does not cover implied states, only called ones.
// See [Machine.IsQueued] to perform more detailed queries.
//
// position: optional position assertion
func (m *Machine) WillBe(states S, position ...Position) bool {
	// TODO test
	if len(position) == 0 {
		position = []Position{PositionAny}
	}
	found, idx, _ := m.IsQueued(MutationAdd, states, false, false, 0, false,
		position[0])

	switch {
	case !found:
		return false
	case idx == 0 && position[0] == PositionFirst:
		return true
	case idx == uint16(m.queueLen.Load())-1 && position[0] == PositionLast:
		return true
	default:
		return true
	}
}

// WillBe1 returns true if the passed state is scheduled to be activated.
// See IsQueued to perform more detailed queries.
func (m *Machine) WillBe1(state string, position ...Position) bool {
	// TODO test
	return m.WillBe(S{state}, position...)
}

// WillBeRemoved returns true if the passed states are scheduled to be
// deactivated. Does not cover implied states, only called ones. See
// [Machine.IsQueued] to perform more detailed queries.
//
// position: optional position assertion
func (m *Machine) WillBeRemoved(states S, position ...Position) bool {
	// TODO test
	if len(position) == 0 {
		position = []Position{PositionAny}
	}
	found, idx, _ := m.IsQueued(MutationRemove, states, false, false, 0,
		false, position[0])

	switch {
	case !found:
		return false
	case idx == 0 && position[0] == PositionFirst:
		return true
	case idx == uint16(m.queueLen.Load())-1 && position[0] == PositionLast:
		return true
	default:
		return true
	}
}

// WillBeRemoved1 returns true if the passed state is scheduled to be
// deactivated. See IsQueued to perform more detailed queries.
func (m *Machine) WillBeRemoved1(state string, position ...Position) bool {
	// TODO test
	return m.WillBeRemoved(S{state}, position...)
}

// Has returns true if the passed states are registered in the machine.
// Useful for checking if a machine implements a specific state set.
func (m *Machine) Has(states S) bool {
	if m.disposing.Load() {
		return false
	}

	return slicesEvery(m.stateNames, states)
}

// Has1 is a shorthand for Has. It returns true if the passed state is
// registered in the machine.
func (m *Machine) Has1(state string) bool {
	return m.Has(S{state})
}

// IsClock checks the machine's clock against the passed clock snapshot.
// Returns false if at least one of the snapshot states has ticked since, and
// true when none have changed.
func (m *Machine) IsClock(clock Clock) bool {
	if m.disposing.Load() {
		return false
	}

	m.activeStatesMx.RLock()
	defer m.activeStatesMx.RUnlock()

	for state, tick := range clock {
		if m.clock[state] != tick {
			return false
		}
	}

	return true
}

// WasClock checks if the passed clock has happened (or is happening right
// now). Returns false if at least one state is too early.
func (m *Machine) WasClock(clock Clock) bool {
	if m.disposing.Load() {
		return false
	}

	m.activeStatesMx.RLock()
	defer m.activeStatesMx.RUnlock()

	for state, tick := range clock {
		if m.clock[state] < tick {
			return false
		}
	}

	return true
}

// IsTime checks the machine's time against the passed time (a list of
// ticks). Returns false if at least one of the checked states has ticked
// since, and true when none have changed. The states param is optional and
// can be used to check only a subset of states.
func (m *Machine) IsTime(t Time, states S) bool {
	if m.disposing.Load() {
		return false
	}

	m.activeStatesMx.RLock()
	defer m.activeStatesMx.RUnlock()

	if states == nil {
		states = m.stateNames
	}

	for i, tick := range t {
		if m.clock[states[i]] != tick {
			return false
		}
	}

	return true
}

// WasTime checks if the passed time has happened (or is happening right
// now). Returns false if at least one state is too early. The states param
// is optional and can be used to check only a subset of states.
func (m *Machine) WasTime(t Time, states S) bool {
	if m.disposing.Load() {
		return false
	}

	m.activeStatesMx.RLock()
	defer m.activeStatesMx.RUnlock()

	if states == nil {
		states = m.stateNames
	}

	for i, tick := range t {
		if m.clock[states[i]] < tick {
			return false
		}
	}

	return true
}

// String returns a one-line representation of the currently active states,
// with their clock values. Inactive states are omitted.
// Eg: (Foo:1 Bar:3)
func (m *Machine) String() string {
	if m.disposing.Load() {
		return ""
	}

	m.activeStatesMx.RLock()
	defer m.activeStatesMx.RUnlock()

	ret := "("
	for _, state := range m.stateNames {
		if !slices.Contains(m.activeStates, state) {
			continue
		}

		if ret != "(" {
			ret += " "
		}
		ret += fmt.Sprintf("%s:%d", state, m.clock[state])
	}

	return ret + ")"
}

// StringAll returns a one-line representation of all the states, with their
// clock values. Inactive states are in square brackets.
//
//	(Foo:1 Bar:3) [Baz:2]
func (m *Machine) StringAll() string {
	if m.disposing.Load() {
		return ""
	}

	m.activeStatesMx.RLock()
	defer m.activeStatesMx.RUnlock()

	ret := "("
	retIn := "["
	for _, state := range m.stateNames {
		if slices.Contains(m.activeStates, state) {
			if ret != "(" {
				ret += " "
			}
			ret += fmt.Sprintf("%s:%d", state, m.clock[state])

			continue
		}

		if retIn != "[" {
			retIn += " "
		}
		retIn += fmt.Sprintf("%s:%d", state, m.clock[state])
	}

	return ret + ") " + retIn + "]"
}

// Inspect returns a multi-line string representation of the machine (states,
// relations, clocks).
// states: param for ordered or partial results.
func (m *Machine) Inspect(states S) string {
	if m.disposing.Load() {
		return ""
	}

	m.activeStatesMx.RLock()
	defer m.activeStatesMx.RUnlock()

	if states == nil {
		states = m.stateNames
	}

	ret := ""
	for _, name := range states {
		state := m.schema[name]
		active := "0"
		if slices.Contains(m.activeStates, name) {
			active = "1"
		}

		ret += fmt.Sprintf("%s %s\n"+
			" |Tick %d\n", name, active, m.clock[name])
		if state.Auto {
			ret += " |Auto true\n"
		}
		if state.Multi {
			ret += " |Multi true\n"
		}
		if state.Add != nil {
			ret += " |Add " + j(state.Add) + "\n"
		}
		if state.Require != nil {
			ret += " |Require " + j(state.Require) + "\n"
		}
		if state.Remove != nil {
			ret += " |Remove " + j(state.Remove) + "\n"
		}
		if state.After != nil {
			ret += " |After " + j(state.After) + "\n"
		}
	}

	return ret
}
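// exampleClock is an added usage sketch (not part of the original file):
// snapshot-and-compare with IsClock, which returns true only while none of
// the snapshot states have ticked. The state name is hypothetical.
func exampleClock(mach *Machine) {
	before := mach.Clock(S{"Foo"})
	mach.Add1("Foo", nil)
	if !mach.IsClock(before) {
		// Foo has ticked since the snapshot
	}
}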
// Switch returns the first state from the passed groups that is currently
// active, making it handy for switch statements. Useful for state groups.
//
//	switch mach.Switch(ss.GroupPlaying) {
//	case "Playing":
//	case "Paused":
//	case "Stopped":
//	}
func (m *Machine) Switch(groups ...S) string {
	activeStates := m.ActiveStates(nil)

	for _, states := range groups {
		for _, state := range states {
			if slices.Contains(activeStates, state) {
				return state
			}
		}
	}

	return ""
}

// ActiveStates returns a copy of the currently active states when states is
// nil, optionally limiting the results to a subset of states.
func (m *Machine) ActiveStates(states S) S {
	if m.disposing.Load() {
		return S{}
	}
	m.activeStatesMx.RLock()
	defer m.activeStatesMx.RUnlock()

	if states == nil {
		return slices.Clone(m.activeStates)
	}

	ret := make(S, 0, len(states))
	for _, state := range states {
		if slices.Contains(m.activeStates, state) {
			ret = append(ret, state)
		}
	}

	return ret
}

// StateNames returns a SHARED copy of all the state names.
func (m *Machine) StateNames() S {
	m.schemaMx.RLock()
	defer m.schemaMx.RUnlock()

	if m.stateNamesExport == nil {
		m.stateNamesExport = slices.Clone(m.stateNames)
	}

	return m.stateNamesExport
}

// StateNamesMatch returns the state names matching the passed regular
// expression.
func (m *Machine) StateNamesMatch(re *regexp.Regexp) S {
	m.schemaMx.RLock()
	defer m.schemaMx.RUnlock()

	ret := S{}
	for _, state := range m.stateNames {
		if re.MatchString(state) {
			ret = append(ret, state)
		}
	}

	return ret
}

// Queue returns a copy of the mutation queue.
func (m *Machine) Queue() []*Mutation {
	if m.disposing.Load() {
		return nil
	}
	m.queueMx.RLock()
	defer m.queueMx.RUnlock()

	return slices.Clone(m.queue)
}

// Schema returns a copy of machine's schema.
func (m *Machine) Schema() Schema {
	m.schemaMx.RLock()
	defer m.schemaMx.RUnlock()

	return CloneSchema(m.schema)
}

// SchemaVer returns the current version of the schema, which equals the
// number of states.
func (m *Machine) SchemaVer() int {
	return len(m.StateNames())
}

// SetSchema sets the machine's schema. It will automatically call
// VerifyStates with the names param and handle EventSchemaChange if
// successful. The new schema has to be longer than the previous one (no
// relations-only changes). The length of the schema is also the version of
// the schema.
func (m *Machine) SetSchema(newSchema Schema, names S) error {
	if m.Transition() != nil {
		return fmt.Errorf("%w: cannot set schema during a transition",
			ErrSchema)
	}

	// locks
	m.schemaMx.Lock()
	m.queueMx.RLock()
	defer m.queueMx.RUnlock()

	// validate
	if len(newSchema) <= len(m.schema) {
		m.schemaMx.Unlock()
		return fmt.Errorf("%w: new schema too short (> %d states)",
			ErrSchema, len(m.schema))
	}
	if len(newSchema) != len(names) {
		m.schemaMx.Unlock()
		return fmt.Errorf("%w: new schema has to be the same length as"+
			" state names", ErrSchema)
	}

	// replace and unlock
	old := m.schema
	parsed, err := ParseSchema(newSchema)
	if err != nil {
		m.schemaMx.Unlock()
		return err
	}
	m.schema = parsed
	if err = m.verifyStates(names); err != nil {
		m.schemaMx.Unlock()
		return err
	}
	// TODO is this safe?
	m.subs.SetClock(m.Clock(nil))
	m.schemaMx.Unlock()

	// notify the resolver and tracers
	m.resolver.NewSchema(m.schema, m.stateNames)
	m.tracersMx.RLock()
	for i := 0; !m.disposing.Load() && i < len(m.tracers); i++ {
		m.tracers[i].SchemaChange(m, old)
	}
	m.tracersMx.RUnlock()

	return nil
}

// EvAdd is like Add, but accepts the source event as the 1st param, which
// results in traceable transitions.
func (m *Machine) EvAdd(event *Event, states S, args A) Result {
	if m.disposing.Load() || m.Backoff() ||
		uint16(m.queueLen.Load()) >= m.QueueLimit {
		return Canceled
	}

	tick := m.queueMutation(MutationAdd, states, args, event)
	if tick == uint64(Executed) {
		return Executed
	}
	m.breakpoint(states, nil)

	res := m.processQueue()
	if res == Queued {
		return Result(tick + 1)
	}

	return res
}

// EvAdd1 is like Add1, but accepts the source event as the 1st param, which
// results in traceable transitions.
func (m *Machine) EvAdd1(event *Event, state string, args A) Result {
	return m.EvAdd(event, S{state}, args)
}
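// Usage sketch (h.Mach and the state names are assumptions for this
// example): inside a handler, passing the triggering event to EvAdd1 keeps
// the resulting transition traceable back to its source.
//
//	func (h *Handlers) FooState(e *Event) {
//		// "Bar" is activated as a consequence of "Foo"
//		h.Mach.EvAdd1(e, "Bar", nil)
//	}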
// EvRemove is like Remove, but accepts the source event as the 1st param,
// which results in traceable transitions.
func (m *Machine) EvRemove(event *Event, states S, args A) Result {
	if m.disposing.Load() || m.Backoff() ||
		uint16(m.queueLen.Load()) >= m.QueueLimit {
		return Canceled
	}

	// return early if none of the states is active
	m.queueMx.RLock()
	lenQueue := len(m.queue)

	// try ignoring this mutation, if none of the states is currently active
	var statesAny []S
	for _, state := range states {
		statesAny = append(statesAny, S{state})
	}
	if lenQueue == 0 && m.Transition() != nil && !m.Any(statesAny...) {
		m.queueMx.RUnlock()
		return Executed
	}
	m.queueMx.RUnlock()

	tick := m.queueMutation(MutationRemove, states, args, event)
	if tick == uint64(Executed) {
		return Executed
	}
	m.breakpoint(nil, states)

	res := m.processQueue()
	if res == Queued {
		return Result(tick + 1)
	}

	return res
}

// EvRemove1 is like Remove1, but accepts the source event as the 1st param,
// which results in traceable transitions.
func (m *Machine) EvRemove1(event *Event, state string, args A) Result {
	return m.EvRemove(event, S{state}, args)
}

// EvAddErr is like AddErr, but accepts the source event as the 1st param,
// which results in traceable transitions.
func (m *Machine) EvAddErr(event *Event, err error, args A) Result {
	return m.EvAddErrState(event, StateException, err, args)
}

// EvAddErrState is like AddErrState, but accepts the source event as the 1st
// param, which results in traceable transitions.
func (m *Machine) EvAddErrState(
	event *Event, state string, err error, args A,
) Result {
	if m.disposing.Load() || m.Backoff() ||
		uint16(m.queueLen.Load()) >= m.QueueLimit || err == nil {
		return Canceled
	}
	// TODO test Err()
	m.err.Store(&err)

	var trace string
	if m.LogStackTrace {
		trace = captureStackTrace()
	}

	// error handler
	onError := m.onError.Load()
	if onError != nil {
		(*onError)(m, err)
	}

	// build args
	// TODO read [event] and fill out relevant fields
	argsErr := &AT{
		Err:      err,
		ErrTrace: trace,
	}

	// TODO prepend to the queue? test
	return m.EvAdd(event, S{state, StateException}, PassMerge(args, argsErr))
}

// Export exports the machine state as Serialized: ID, machine time, and
// state names.
func (m *Machine) Export() (*Serialized, Schema, error) {
	m.activeStatesMx.RLock()
	defer m.activeStatesMx.RUnlock()
	m.queueMx.RLock()
	defer m.queueMx.RUnlock()
	m.schemaMx.RLock()
	defer m.schemaMx.RUnlock()

	if !m.statesVerified.Load() {
		return nil, nil, fmt.Errorf("%w: call VerifyStates first", ErrSchema)
	}

	mTime := m.time(nil)
	m.log(LogOps, "[import] exported at %d ticks", mTime.Sum(nil))

	return &Serialized{
		ID:          m.id,
		Time:        mTime,
		MachineTick: m.machineTick,
		StateNames:  m.stateNames,
		// export only
		QueueTick: m.queueTick,
	}, CloneSchema(m.schema), nil
}

// Import imports the machine state from Serialized. It's not safe to import
// into a machine which has already produced transitions and/or has telemetry
// connected (use [Machine.SetSchema] instead).
func (m *Machine) Import(data *Serialized) error {
	m.activeStatesMx.RLock()
	defer m.activeStatesMx.RUnlock()
	m.queueMx.RLock()
	defer m.queueMx.RUnlock()
	m.schemaMx.Lock()
	defer m.schemaMx.Unlock()

	// verify
	if m.id != data.ID {
		return fmt.Errorf("import ID mismatch")
	}
	if len(m.schema) != len(data.StateNames) {
		return fmt.Errorf("%w: importing diff state len", ErrSchema)
	}

	// restore active states and clocks
	var sum uint64
	m.activeStates = nil
	for i, tick := range data.Time {
		state := data.StateNames[i]
		sum += tick

		if !slices.Contains(m.stateNames, state) {
			return fmt.Errorf("%w: %s", ErrStateUnknown, state)
		}
		if IsActiveTick(tick) {
			m.activeStates = append(m.activeStates, state)
		}
		m.clock[state] = tick
	}

	// restore ID and state names
	m.stateNames = data.StateNames
	m.stateNamesExport = nil
	m.statesVerified.Store(true)
	m.machineTick = data.MachineTick + 1

	// trigger MachineRestored, if defined
	if m.Has1(StateMachineRestored) {
		m.Add1(StateMachineRestored, nil)
	}
	m.log(LogChanges, "[import] imported %d times, now at %d ticks",
		m.machineTick, sum)

	return nil
}
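// Usage sketch (assumes the source and target machines share the same ID and
// schema, as required by Import): a serialize / restore round trip.
//
//	data, schema, err := mach.Export()
//	if err != nil {
//		return err
//	}
//	_ = schema // persist alongside data, eg for schema upgrades
//	if err := mach2.Import(data); err != nil {
//		return err
//	}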
// Index1 returns the index of a state in the machine's StateNames() list, or
// -1 when not found or machine has been disposed.
func (m *Machine) Index1(state string) int {
	if m.disposing.Load() {
		return -1
	}

	return slices.Index(m.StateNames(), state)
}

// Index returns a list of state indexes in the machine's StateNames() list,
// with -1s for missing ones.
func (m *Machine) Index(states S) []int {
	if m.disposing.Load() {
		return []int{}
	}
	names := m.StateNames()

	return StatesToIndex(names, states)
}

// Resolver returns the relation resolver, used to produce target states of
// transitions.
func (m *Machine) Resolver() RelationsResolver {
	return m.resolver
}

// BindTracer binds a Tracer to the machine. Tracers can cause StateException
// in submachines, before any handlers are bound. Use the Err() getter to
// examine such errors.
func (m *Machine) BindTracer(tracer Tracer) error {
	m.tracersMx.Lock()
	defer m.tracersMx.Unlock()

	v := reflect.ValueOf(tracer)
	if v.Kind() != reflect.Ptr || v.Elem().Kind() != reflect.Struct {
		return errors.New("BindTracer expects a pointer to a struct")
	}
	name := reflect.TypeOf(tracer).Elem().Name()

	m.tracers = append(m.tracers, tracer)
	m.log(LogOps, "[tracers] bind %s", name)

	return nil
}

// DetachTracer tries to remove a tracer from the machine. Returns an error in
// case the tracer wasn't bound.
func (m *Machine) DetachTracer(tracer Tracer) error {
	if m.disposing.Load() {
		return nil
	}
	m.tracersMx.Lock()
	defer m.tracersMx.Unlock()

	v := reflect.ValueOf(tracer)
	if v.Kind() != reflect.Ptr || v.Elem().Kind() != reflect.Struct {
		return errors.New("DetachTracer expects a pointer to a struct")
	}
	name := reflect.TypeOf(tracer).Elem().Name()

	for i, t := range m.tracers {
		if t == tracer {
			// TODO check
			m.tracers = slices.Delete(m.tracers, i, i+1)
			m.log(LogOps, "[tracers] detach %s", name)

			return nil
		}
	}

	return errors.New("tracer not bound")
}

// Tracers returns a copy of the currently attached tracers.
func (m *Machine) Tracers() []Tracer {
	m.tracersMx.Lock()
	defer m.tracersMx.Unlock()

	return slices.Clone(m.tracers)
}

// OnError is the most basic error handler, useful for machines without any
// handlers.
func (m *Machine) OnError(fn HandlerError) {
	m.onError.Store(&fn)
}

// Backoff is true in case the machine had a recent HandlerDeadline. During a
// backoff, all mutations will be [Canceled].
func (m *Machine) Backoff() bool {
	last := m.LastHandlerDeadline.Load()

	return last != nil && time.Since(*last) < m.HandlerBackoff
}

// OnChange is the most basic state-change handler, useful for machines
// without any handlers.
func (m *Machine) OnChange(fn HandlerChange) {
	m.onChange.Store(&fn)
}

// SetGroups organizes the schema into a tree using schema-v2 structs.
func (m *Machine) SetGroups(groups any, states States) {
	// TODO rename to SchemaOrganize(optGroups, optStates, ...)
	// TODO call VerifyStates from optStates.Names()
	m.schemaMx.Lock()
	defer m.schemaMx.Unlock()

	parsed := map[string][]int{}
	order := []string{}
	names := m.stateNames

	// add all the groups
	if groups != nil {
		// TODO recursive for inherited groups
		v := reflect.ValueOf(groups)
		t := v.Type()
		for i := 0; i < t.NumField(); i++ {
			field := t.Field(i)
			kind := field.Type.Kind()
			if kind != reflect.Slice {
				continue
			}

			name := field.Name
			val := v.Field(i).Interface()
			if group, ok := val.(S); ok {
				parsed[name] = StatesToIndex(names, group)
				order = append(order, name)
			}
		}
	}

	// add all the schemas (nested)
	// state schema structure
	if states != nil {
		nested, nestedOrder := states.StateGroups()
		for _, name := range nestedOrder {
			parsed[name] = nested[name]
			order = append(order, name)
		}
	}

	m.groups = parsed
	m.groupsOrder = order
}

// SetGroupsString is like SetGroups, but works with the schema-v1 format.
func (m *Machine) SetGroupsString(groups map[string]S, order []string) {
	// TODO rename to SchemaOrganizeSimple(optGroups, optStates, ...)
	m.schemaMx.Lock()
	defer m.schemaMx.Unlock()

	parsed := map[string][]int{}
	for name, group := range groups {
		parsed[name] = StatesToIndex(m.stateNames, group)
	}

	m.groups = parsed
	m.groupsOrder = order
}
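// Usage sketch (group and state names are illustrative): defining v1-style
// groups and reading them back as state indexes via the getter below.
//
//	mach.SetGroupsString(map[string]S{
//		"GroupPlayback": {"Playing", "Paused", "Stopped"},
//	}, []string{"GroupPlayback"})
//	groups, order := mach.Groups()
//	_, _ = groups, order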
// Groups returns the machine's state groups as state indexes, along with
// their order. See [Machine.SetGroups].
func (m *Machine) Groups() (map[string][]int, []string) {
	// TODO rename to SchemaTree(optGroups, optStates, ...)
	// return a CLEAR tree (groups, inherited) per each mach
	m.schemaMx.RLock()
	defer m.schemaMx.RUnlock()

	return m.groups, m.groupsOrder
}
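// Usage sketch (StepTracer is a hypothetical type, assumed here to embed the
// package's no-op Tracer implementation so only the needed hooks are
// overridden): binding and detaching a custom tracer, as accepted by
// BindTracer above.
//
//	type StepTracer struct{ NoOpTracer }
//
//	tracer := &StepTracer{}
//	if err := mach.BindTracer(tracer); err != nil {
//		return err // not a pointer to a struct
//	}
//	defer mach.DetachTracer(tracer)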