// Package helpers is a set of useful functions when working with async state // machines.
package helpers import ( am ssam ampipe amtele ) const ( // EnvAmLogPrint prints machine log to stdout. EnvAmLogPrint = "AM_LOG_PRINT" // EnvAmHealthcheck enables a healthcheck ticker for every debugged machine. EnvAmHealthcheck = "AM_HEALTHCHECK" // EnvAmTestRunner indicates the main test tunner, disables any telemetry. EnvAmTestRunner = "AM_TEST_RUNNER" // EnvAmLogFull enables all the features of [am.SemLogger]. EnvAmLogFull = "AM_LOG_FULL" // EnvAmLogSteps logs transition steps. // See [am.SemLogger.EnableSteps]. // "1" | "" (default) EnvAmLogSteps = "AM_LOG_STEPS" // EnvAmLogGraph logs the graph structure (mutation traces, pipes, RPC, etc). // See [am.SemLogger.EnableGraph]. // "1" | "" (default) EnvAmLogGraph = "AM_LOG_GRAPH" // EnvAmLogChecks logs Can methods. See [am.SemLogger.EnableCan]. // "1" | "" (default) EnvAmLogChecks = "AM_LOG_CHECKS" // EnvAmLogQueued logs queued mutations. See [am.SemLogger.EnableQueued]. // "1" | "" (default) EnvAmLogQueued = "AM_LOG_QUEUED" // EnvAmLogArgs logs mutation args. See [am.SemLogger.EnableArgs]. // "1" | "" (default) EnvAmLogArgs = "AM_LOG_ARGS" // EnvAmLogWhen logs When methods. See [am.SemLogger.EnableWhen]. // "1" | "" (default) EnvAmLogWhen = "AM_LOG_WHEN" // EnvAmLogStateCtx logs state ctxs. See [am.SemLogger.EnableStateCtx]. // "1" | "" (default) EnvAmLogStateCtx = "AM_LOG_STATE_CTX" // EnvAmLogFile enables file logging (using machine ID as the name). // "1" | "" (default) EnvAmLogFile = "AM_LOG_FILE" healthcheckInterval = 30 * time.Second ) var ErrTestAutoDisable = errors.New("feature disabled when AM_TEST_RUNNER") type ( S = am.S A = am.A Schema = am.Schema ) // Add1Block activates a state and waits until it becomes active. If it's a // multi-state, it also waits for it to deactivate. Returns early if a // non-multi state is already active. Useful to avoid the queue but can't // handle a rejected negotiation. // Deprecated: use Add1Sync instead. func ( context.Context, am.Api, string, am.A, ) am.Result { // TODO remove once Add1Sync ready // support for multi-states if IsMulti(, ) { := .WhenTicks(, 2, ) := .Add1(, ) <- return } if .Is1() { return am.Executed } , := context.WithCancel() defer () := .WhenTicks(, 1, ) := .Add1(, ) if == am.Canceled { // dispose "when" ch early () return } // wait select { case <-: return am.Executed case <-.Done(): return am.Canceled } } // Add1Sync activates a state and waits until it becomes activate, or canceled. // Add1Sync is a newer version of Add1Block that supports queued rejections, // but at the moment it is not compatible with RPC. This method checks // expiration ctx and returns as [am.Canceled]. func ( context.Context, am.Api, string, am.A, ) am.Result { := .Add1(, ) switch { case am.Executed: return case am.Canceled: return default: // wait TODO select ctx.Done select { case <-.Done(): return am.Canceled case <-.WhenQueue(): if .Is1() { return am.Executed } return am.Canceled } } } // Add1Async adds a state from an async op and waits for another one // from the op to become active. Theoretically, it should work with any state // pair, including Multi states (assuming they remove themselves). Not // compatible with queued negotiation at the moment. func ( context.Context, am.Api, string, string, am.A, ) am.Result { := 1 // wait 2 ticks for multi states if IsMulti(, ) { = 2 } , := context.WithCancel() defer () := .WhenTicks(, , ) := .Add1(, ) if == am.Canceled { // dispose "when" ch early () return } // wait select { case <-: return am.Executed case <-.Done(): return am.Canceled } } // TODO AddSync // TODO EvAdd1Async // TODO EvAdd1Sync // TODO EvAddSync // Remove? // IsMulti returns true if a state is a multi state. func ( am.Api, string) bool { // TODO safeguard return .Schema()[].Multi } // StatesToIndexes converts a list of state names to a list of state indexes, // for the given machine. It returns -1 for unknown states. func ( am.S, am.S) []int { := make([]int, len()) for , := range { [] = slices.Index(, ) } return } // IndexesToStates converts a list of state indexes to a list of state names, // for a given machine. func ( am.S, []int) am.S { := make(am.S, len()) for , := range { if == -1 || >= len() { [] = "unknown" + strconv.Itoa() continue } [] = [] } return } // MachDebug exports transition telemetry to an am-dbg instance listening at // [amDbgAddr]. func ( am.Api, string, am.LogLevel, bool, *am.SemConfig, ) error { // no debug for CI if IsTestRunner() { return nil } := .SemLogger() if { .SetLevel() } else { .SetEmpty() } if == "" { return nil } // semantic logging if .Steps { .EnableSteps(true) } if .Graph { .EnableGraph(true) } if .Can { .EnableCan(true) } if .Queued { .EnableQueued(true) } if .StateCtx { .EnableStateCtx(true) } if .Can { .EnableCan(true) } if .When { .EnableWhen(true) } if .Args { .EnableArgs(true) } // tracer for telemetry := amtele.TransitionsToDbg(, ) if != nil { return } if os.Getenv(EnvAmHealthcheck) != "" { Healthcheck() } return nil } // SemConfigEnv returns a SemConfigEnv based on env vars, or the [forceFull] // flag. func ( bool) *am.SemConfig { // TODO rename to SemConfigEnv // full override if os.Getenv(EnvAmLogFull) != "" || { return &am.SemConfig{ Steps: true, Graph: true, Can: true, Queued: true, StateCtx: true, When: true, Args: true, } } // selective logging return &am.SemConfig{ Steps: os.Getenv(EnvAmLogSteps) != "", Graph: os.Getenv(EnvAmLogGraph) != "", Can: os.Getenv(EnvAmLogChecks) != "", Queued: os.Getenv(EnvAmLogQueued) != "", StateCtx: os.Getenv(EnvAmLogStateCtx) != "", When: os.Getenv(EnvAmLogWhen) != "", Args: os.Getenv(EnvAmLogArgs) != "", } } // MachDebugEnv sets up a machine for debugging, based on env vars only: // AM_DBG_ADDR, AM_LOG, AM_LOG_*, and AM_DEBUG. This function should be called // right after the machine is created (to catch all the log entries). func ( am.Api) error { := os.Getenv(amtele.EnvAmDbgAddr) := am.EnvLogLevel("") := os.Getenv(EnvAmLogPrint) != "" // expand the default addr if == "1" { = amtele.DbgAddr } return MachDebug(, , , , SemConfigEnv(false)) } // Healthcheck adds a state to a machine every 5 seconds, until the context is // done. This makes sure all the logs are pushed to the telemetry server. // TODO use machine scheduler when ready func ( am.Api) { if !.Has1("Healthcheck") { return } go func() { := time.NewTicker(healthcheckInterval) for { select { case <-.C: .Add1(ssam.BasicStates.Healthcheck, nil) case <-.Ctx().Done(): .Stop() } } }() } // TODO StableWhen(dur, states, ctx) - like When, but makes sure the state is // stable for the duration. // NewReqAdd creates a new failsafe request to add states to a machine. See // See MutRequest for more info and NewMutRequest for the defaults. func ( am.Api, am.S, am.A) *MutRequest { return NewMutRequest(, am.MutationAdd, , ) } // NewReqAdd1 creates a new failsafe request to add a single state to a machine. // See MutRequest for more info and NewMutRequest for the defaults. func ( am.Api, string, am.A) *MutRequest { return NewReqAdd(, am.S{}, ) } // NewReqRemove creates a new failsafe request to remove states from a machine. // See MutRequest for more info and NewMutRequest for the defaults. func ( am.Api, am.S, am.A) *MutRequest { return NewMutRequest(, am.MutationRemove, , ) } // NewReqRemove1 creates a new failsafe request to remove a single state from a // machine. See MutRequest for more info and NewMutRequest for the defaults. func ( am.Api, string, am.A) *MutRequest { return NewReqRemove(, am.S{}, ) } // MutRequest is a failsafe request for a machine mutation. It supports retries, // backoff, max duration, delay, and timeout policies. It will try to mutate // the machine until the context is done, or the max duration is reached. Queued // mutations are considered supported a success. type MutRequest struct { Mach am.Api MutType am.MutationType States am.S Args am.A // PolicyRetries is the max number of retries. PolicyRetries int // PolicyDelay is the delay before the first retry, then doubles. PolicyDelay time.Duration // PolicyBackoff is the max time to wait between retries. PolicyBackoff time.Duration // PolicyMaxDuration is the max time to wait for the mutation to be accepted. PolicyMaxDuration time.Duration } // NewMutRequest creates a new MutRequest with defaults - 10 retries, 100ms // delay, 5s backoff, and 5s max duration. func ( am.Api, am.MutationType, am.S, am.A, ) *MutRequest { // TODO increase duration for AM_DEBUG return &MutRequest{ Mach: , MutType: , States: , Args: , // defaults PolicyRetries: 10, PolicyDelay: 100 * time.Millisecond, PolicyBackoff: 5 * time.Second, PolicyMaxDuration: 5 * time.Second, } } func ( *MutRequest) ( am.Api, am.MutationType, am.S, am.A, ) *MutRequest { return &MutRequest{ Mach: , MutType: , States: , Args: , PolicyRetries: .PolicyRetries, PolicyBackoff: .PolicyBackoff, PolicyMaxDuration: .PolicyMaxDuration, PolicyDelay: .PolicyDelay, } } func ( *MutRequest) ( int) *MutRequest { .PolicyRetries = return } func ( *MutRequest) ( time.Duration) *MutRequest { .PolicyBackoff = return } func ( *MutRequest) ( time.Duration) *MutRequest { .PolicyMaxDuration = return } func ( *MutRequest) ( time.Duration) *MutRequest { .PolicyDelay = return } func ( *MutRequest) ( context.Context) (am.Result, error) { // policies := retrypolicy.Builder[am.Result](). WithMaxDuration(.PolicyMaxDuration). WithMaxRetries(.PolicyRetries) if .PolicyBackoff != 0 { = .WithBackoff(.PolicyDelay, .PolicyBackoff) } else { = .WithDelay(.PolicyDelay) } , := failsafe.NewExecutor[am.Result](.Build()).WithContext(). Get(.get) return , } func ( *MutRequest) () (am.Result, error) { := .Mach.Add(.States, .Args) return , ResultToErr() } // Wait waits for a duration, or until the context is done. Returns true if the // duration has passed, or false if ctx is done. func ( context.Context, time.Duration) bool { := time.After() select { case <-.Done(): return false case <-: return true } } // Interval runs a function at a given interval, for a given duration, or until // the context is done. Returns nil if the duration has passed, or err is ctx is // done. The function should return false to stop the interval. func ( context.Context, time.Duration, time.Duration, func() bool, ) error { := time.Now().Add() := time.NewTicker() for { select { case <-.Done(): .Stop() return .Err() case <-.C: if time.Now().After() { .Stop() return nil } if !() { .Stop() return nil } } } } // WaitForAll waits for a list of channels to close, or until the context is // done, or until the timeout is reached. Returns nil if all channels are // closed, or ErrTimeout, or ctx.Err(). // // It's advised to check the state ctx after this call, as it usually means // expiration and not a timeout. func ( context.Context, time.Duration, ...<-chan struct{}, ) error { // TODO test // TODO support mach disposal via am.ErrDisposed // exit early if len() == 0 { return nil } if .Err() != nil { return .Err() } // timeout if IsDebug() { = 100 * } := time.After() // wait on all chans for , := range { select { case <-.Done(): // TODO check and log state ctx name return .Err() case <-: return am.ErrTimeout case <-: // pass } } return nil } // WaitForErrAll is like WaitForAll, but also waits on WhenErr of a passed // machine. For state machines with error handling (like retry) it's recommended // to measure machine time of [am.StateException] instead. func ( context.Context, time.Duration, am.Api, ...<-chan struct{}, ) error { // TODO test // exit early if len() == 0 { return nil } if .Err() != nil { return .Err() } // timeout if IsDebug() { = 100 * } := time.After() := .WhenErr() // wait on all chans for , := range { select { case <-.Done(): // TODO check and log state ctx name return .Err() case <-: return fmt.Errorf("%s: %w", am.StateException, .Err()) case <-: return am.ErrTimeout case <-: // pass } } return nil } // WaitForAny waits for any of the channels to close, or until the context is // done, or until the timeout is reached. Returns nil if any channel is // closed, or ErrTimeout, or ctx.Err(). // // It's advised to check the state ctx after this call, as it usually means // expiration and not a timeout. // // This function uses reflection to wait for multiple channels at once. func ( context.Context, time.Duration, ...<-chan struct{}, ) error { // TODO test // TODO reflection-less selectes for 1/2/3 chans // exit early if .Err() != nil { return .Err() } if IsDebug() { = 100 * } := time.After() // create select cases := make([]reflect.SelectCase, 2+len()) [0] = reflect.SelectCase{ Dir: reflect.SelectRecv, Chan: reflect.ValueOf(.Done()), } [1] = reflect.SelectCase{ Dir: reflect.SelectRecv, Chan: reflect.ValueOf(), } for , := range { [+2] = reflect.SelectCase{ Dir: reflect.SelectRecv, Chan: reflect.ValueOf(), } } // wait , , := reflect.Select() switch { case 0: // TODO check and log state ctx name return .Err() case 1: return am.ErrTimeout default: return nil } } // WaitForErrAny is like WaitForAny, but also waits on WhenErr of a passed // machine. For state machines with error handling (like retry) it's recommended // to measure machine time of [am.StateException] instead. func ( context.Context, time.Duration, *am.Machine, ...<-chan struct{}, ) error { // TODO test // TODO reflection-less selectes for 1/2/3 chans // exit early if .Err() != nil { return .Err() } if IsDebug() { = 100 * } := time.After() // create select cases := 3 := make([]reflect.SelectCase, +len()) [0] = reflect.SelectCase{ Dir: reflect.SelectRecv, Chan: reflect.ValueOf(.Done()), } [1] = reflect.SelectCase{ Dir: reflect.SelectRecv, Chan: reflect.ValueOf(), } [2] = reflect.SelectCase{ Dir: reflect.SelectRecv, Chan: reflect.ValueOf(), } for , := range { [+] = reflect.SelectCase{ Dir: reflect.SelectRecv, Chan: reflect.ValueOf(), } } // wait , , := reflect.Select() switch { case 0: // TODO check and log state ctx name (if any) return .Err() case 1: return am.ErrTimeout case 2: return .Err() default: return nil } } // Activations return the number of state activations from the number of ticks // passed. func ( uint64) int { return int(( + 1) / 2) } // ExecAndClose closes the chan when the function ends. func ( func()) <-chan struct{} { := make(chan struct{}) go func() { () close() }() return } // EnableDebugging sets env vars for debugging tested machines with am-dbg on // port 6831. func ( bool) { if { _ = os.Setenv(am.EnvAmDebug, "2") } else { _ = os.Setenv(am.EnvAmDebug, "1") } _ = os.Setenv(amtele.EnvAmDbgAddr, "1") _ = os.Setenv(EnvAmLogFull, "1") SetEnvLogLevel(am.LogOps) // _ = os.Setenv(EnvAmHealthcheck, "1") } // SetEnvLogLevel sets AM_LOG env var to the passed log level. It will affect // all future state machines using MachDebugEnv. func ( am.LogLevel) { _ = os.Setenv(am.EnvAmLog, strconv.Itoa(int())) } // Implements checks is statesChecked implement statesNeeded. It's an equivalent // of Machine.Has(), but for slices of state names, and with better error msgs. func (, am.S) error { for , := range { if !slices.Contains(, ) { return errors.New("missing state: " + ) } } return nil } // ArgsToLogMap converts an [A] (arguments) struct to a map of strings using // `log` tags as keys, and their cased string values. func ( interface{}, int) map[string]string { if == 0 { = max(4, am.LogArgsMaxLen) } := false := make(map[string]string) := reflect.ValueOf().Elem() if !.IsValid() { return } := reflect.TypeOf().Elem() for := 0; < .NumField(); ++ { := .Field() := .Field().Tag.Get("log") if == "" { continue } // check if the field is of a known type switch v := .Interface().(type) { // strings case string: if == "" { continue } [] = case []string: // combine []string into a single comma-separated string if len() == 0 { continue } = true := "" := 0 for , := range { if reflect.ValueOf().IsNil() { continue } if != "" { += ", " } += `"` + utils.TruncateStr(, /2) + `"` if >= /2 { += fmt.Sprintf(" ... (%d more)", len()-) break } ++ } if == "" { continue } [] = case bool: // skip default if ! { continue } [] = fmt.Sprintf("%v", ) case []bool: // combine []bool into a single comma-separated string if len() == 0 { continue } [] = fmt.Sprintf("%v", ) // TODO fix highlighting issues in am-dbg [] = strings.Trim([], "[]") case int: // skip default if == 0 { continue } [] = fmt.Sprintf("%d", ) case []int: // combine []int into a single comma-separated string if len() == 0 { continue } [] = fmt.Sprintf("%d", ) // TODO fix highlighting issues in am-dbg [] = strings.Trim([], "[]") // duration case time.Duration: if .Seconds() == 0 { continue } [] = .String() // MutString() method case fmt.Stringer: if reflect.ValueOf().IsNil() { continue } := .String() if == "" { continue } [] = // skip unknown types, besides []fmt.Stringer default: if .Kind() != reflect.Slice { continue } := .Len() = true := "" := 0 for := 0; < ; ++ { := .Index().Interface() , := .(fmt.Stringer) if && .String() != "" { if != "" { += ", " } += `"` + utils.TruncateStr(.String(), /2) + `"` if >= /2 { += fmt.Sprintf(" ... (%d more)", -) break } } ++ } if == "" { continue } [] = } [] = strings.ReplaceAll([], "\n", " ") if ! && len([]) > { [] = utils.TruncateStr([], ) } } return } // ArgsToArgs converts a typed arguments struct into an overlapping typed // arguments struct. Useful for removing fields which can't be passed over RPC, // and back. Both params should be pointers to a struct and share at least one // field. func [ any]( interface{}, ) { // TODO test := reflect.ValueOf().Elem() := reflect.ValueOf().Elem() for := 0; < .NumField(); ++ { := .Field() := .FieldByName(.Type().Field().Name) if .IsValid() && .CanSet() { .Set() } } return } // ArgsFromMap takes an arguments map [am.A] and copies the fields into a typed // argument struct value. Useful for typed args via RPC. func [ any]( am.A, ) { // TODO support nesting: de-ref any non-nil pointers and re-ref := reflect.ValueOf() for , := range { := .FieldByName() if .IsValid() && .CanSet() { := reflect.ValueOf() if .Type().AssignableTo(.Type()) { .Set() } else if .Type().ConvertibleTo(.Type()) { .Set(.Convert(.Type())) } // TODO else, for struct{} and *struct{}, try to de-serialize JSON } } return } // IsDebug returns true if the process is in a "simple debug mode" via AM_DEBUG. func () bool { return os.Getenv(am.EnvAmDebug) != "" && !IsTestRunner() } // IsTelemetry returns true if the process is in telemetry debug mode. func () bool { return os.Getenv(amtele.EnvAmDbgAddr) != "" && !IsTestRunner() } func () bool { return os.Getenv(EnvAmTestRunner) != "" } // GroupWhen1 will create wait channels for the same state in a group of // machines, or return a [am.ErrStateMissing]. func ( []am.Api, string, context.Context, ) ([]<-chan struct{}, error) { // validate states for , := range { if !.Has1() { return nil, fmt.Errorf( "%w: %s in machine %s", am.ErrStateMissing, , .Id()) } } // create chans var []<-chan struct{} for , := range { = append(, .When1(, )) } return , nil } // TODO func WhenAny1(mach am.Api, states am.S, ctx context.Context) // []<-chan struct{} // RemoveMulti creates a final handler which removes a multi state from a // machine. Useful to avoid FooState-Remove1-Foo repetition. func ( am.Api, string) am.HandlerFinal { return func( *am.Event) { .Remove1(, nil) } } // GetTransitionStates will extract added, removed, and touched states from // transition's clock values and steps. Requires a state names index. // Collecting touched states requires transition steps. func ( *am.Transition, am.S, ) ( am.S, am.S, am.S) { := .TimeBefore := .TimeAfter := func( am.Time, int) bool { return != nil && am.IsActiveTick(.Tick()) } for , := range { if (, ) && !(, ) { = append(, ) } else if !(, ) && (, ) { = append(, ) } else if != nil && .Tick() != .Tick() { // treat multi states as added = append(, ) } } // touched = am.S{} for , := range .Steps { if := .GetFromState(); != "" { = append(, ) } if := .GetToState(); != "" { = append(, ) } } return , , } // TODO batch and merge with am-dbg // func Batch(input <-chan any, state string, arg string, window time.Duration, // maxElement int) { // // } func ( am.Result) error { switch { case am.Canceled: return am.ErrCanceled // case am.Queued: // return am.ErrQueued default: return nil } } type MachGroup []am.Api func ( *MachGroup) ( string) bool { if == nil { return false } for , := range * { if .Not1() { return false } } return true } func ( int) *errgroup.Group { := &errgroup.Group{} .SetLimit() return } // ///// ///// ///// // ///// CONDITION // ///// ///// ///// // Cond is a set of state conditions, which when all met make the condition // true. type Cond struct { // TODO IsMatch, AnyMatch, ... for regexps // Only if all these states are active. Is S // TODO implement // Only if any of these groups of states are active. Any []S // Only if any of these states is active. Any1 S // Only if none of these states are active. Not S // Only if the clock is equal or higher then. Clock am.Clock // TODO time queries // Query string // AnyQuery string // NotQuery string } func ( Cond) () string { return fmt.Sprintf("is: %s, any: %s, not: %s, clock: %v", .Is, .Any1, .Not, .Clock) } // Check compares the specified conditions against the passed machine. When mach // is nil, Check returns false. func ( Cond) ( am.Api) bool { if == nil { return false } if !.Is(.Is) { return false } if .Any1(.Not...) { return false } if len(.Any1) > 0 && !.Any1(.Any1...) { return false } if !.WasClock(.Clock) { return false } return true } // IsEmpty returns false if no condition is defined. func ( Cond) () bool { return .Is == nil && .Any1 == nil && .Not == nil && .Clock == nil } // ///// ///// ///// // ///// STATE LOOP // ///// ///// ///// // TODO thread safety via atomics type StateLoop struct { ResetInterval time.Duration Threshold int loopState string ctxStates am.S mach am.Api ended bool check func() bool lastSTime uint64 lastHTime time.Time // mach time of [ctxStates] when started startSTime uint64 // Start Human Time startHTime time.Time } func ( *StateLoop) () string { := "ok" if .ended { = "ended" } return fmt.Sprintf("StateLoop: %s for %s/%s", , .mach.Id(), .loopState) } // Break breaks the loop. func ( *StateLoop) () { .ended = true .mach.Log(.String()) } // Sum returns a sum of state time from all context states. func ( *StateLoop) () uint64 { return .mach.Time(.ctxStates).Sum(nil) } // Ok returns true if the loop should continue. func ( *StateLoop) ( context.Context) bool { if .ended { return false } else if != nil && .Err() != nil { := fmt.Errorf("loop: arg ctx expired for %s/%s", .mach.Id(), .loopState) .mach.AddErr(, nil) .ended = true return false } else if .mach.Not1(.loopState) { := fmt.Errorf("loop: state ctx expired for %s/%s", .mach.Id(), .loopState) .mach.AddErr(, nil) .ended = true return false } // stop on a function check if .check != nil && !.check() { .ended = true return false } // reset counters on a new interval window := .mach.Time(.ctxStates).Sum(nil) if time.Since(.lastHTime) > .ResetInterval { .lastHTime = time.Now() .lastSTime = return true // check the current interval window } else if int() > .Threshold { := fmt.Errorf("loop: threshold exceeded for %s/%s", .mach.Id(), .loopState) .mach.AddErr(, nil) .ended = true return false } .lastSTime = return true } // Ended returns the ended flag, but does not any context. Useful for // negotiation handles which don't have state context yet. func ( *StateLoop) () bool { return .ended } // NewStateLoop helper creates a state loop guard bound to a specific state // (eg Heartbeat), preventing infinite loops. It monitors context, off states, // ticks of related "context states", and an optional check function. We can // adjust Threshold and ResetInterval. // Not thread safe ATM. func ( *am.Machine, string, func() bool, ) *StateLoop { := .Schema() if !.Has1() { return &StateLoop{ended: true} } // collect related states := S{} // collect dependencies of the loopState = append(, [].Require...) // collect states adding the loop state TODO auto states from the tx? := .Resolver() , := .InboundRelationsOf() for , := range { , := .RelationsBetween(, ) if len() > 0 { = append(, ) } } := &StateLoop{ ResetInterval: time.Second, Threshold: 500, loopState: , mach: , ctxStates: , startHTime: time.Now(), startSTime: .Time().Sum(nil), check: , } .Log(.String()) return } // ///// ///// ///// // ///// LOGGING // ///// ///// ///// // SlogToMachLogOpts complements SlogToMachLog. var SlogToMachLogOpts = &slog.HandlerOptions{ ReplaceAttr: func( []string, slog.Attr) slog.Attr { // omit these if .Key == slog.TimeKey || .Key == slog.LevelKey { return slog.Attr{} } return }, } // SlogToMachLog allows to use the machine logger as a slog sink. // // a.loggerMach = slog.New(slog.NewTextHandler( // amhelp.SlogToMachLog{Mach: mach}, amhelp.SlogToMachLogOpts)) type SlogToMachLog struct { Mach am.Api } func ( SlogToMachLog) ( []byte) ( int, error) { , := strings.CutPrefix(string(), "msg=") .Mach.Log() return len(), nil } // ///// ///// ///// // ///// STATE UTILS // ///// ///// ///// // TagValue returns the value part from a text tag "key:value". For tag without // value, it returns the tag name. func ( []string, string) string { for , := range { // no value if == { return } // check value := + ":" if !strings.HasPrefix(, ) { continue } , := strings.CutPrefix(, ) return } return "" } // TagValueInt is like TagValue, but returns a formatted int. func ( []string, string) int { := TagValue(, ) if == "" { return 0 } , := strconv.Atoi() return } // PrefixStates will prefix all state names with [prefix]. removeDups will skip // overlaps eg "FooFooName" will be "Foo". func ( am.Schema, string, bool, , S, ) am.Schema { // TODO rename to SchemaPrefix = am.CloneSchema() for , := range { if len() > 0 && !slices.Contains(, ) { continue } else if len() > 0 && slices.Contains(, ) { continue } for , := range .After { := if ! || !strings.HasPrefix(, ) { = + } .After[] = } for , := range .Add { := if ! || !strings.HasPrefix(, ) { = + } .Add[] = } for , := range .Remove { := if ! || !strings.HasPrefix(, ) { = + } .Remove[] = } for , := range .Require { := if ! || !strings.HasPrefix(, ) { = + } .Require[] = } := if ! || !strings.HasPrefix(, ) { = + } // replace delete(, ) [] = } return } // CountRelations will count all referenced states in all relations of the // given state. func ( *am.State) int { return len(.Remove) + len(.Add) + len(.Require) + len(.After) } // NewMirror creates a submachine which mirrors the given source machine. If // [flat] is true, only mutations changing the state will be propagated, along // with the currently active states. // // At this point, the handlers' struct needs to be defined manually with fields // of type `am.HandlerFinal`. // // [id] is optional. func ( string, bool, *am.Machine, any, am.S, ) (*am.Machine, error) { // TODO create handlers // TODO dont create a new machine, add to an existing one := reflect.ValueOf() if .Kind() != reflect.Ptr || .Elem().Kind() != reflect.Struct { return nil, errors.New("BindHandlers expects a pointer to a struct") } := .Elem() // detect methods var []string , := am.ListHandlers(, ) if != nil { return nil, fmt.Errorf("listing handlers: %w", ) } // TODO support am.Api if == "" { = "mirror-" + .Id() } := .Schema() := am.S{am.StateException} := am.Schema{} for , := range { [] = am.State{ Multi: [].Multi, } = append(, ) } := am.New(.Ctx(), , &am.Opts{ Id: , Parent: , }) // set up pipes TODO loop over handlers for , := range { var string var bool := .FieldByName() // check handler method if strings.HasSuffix(, am.SuffixState) { = [:len()-len(am.SuffixState)] = true } else if strings.HasSuffix(, am.SuffixEnd) { = [:len()-len(am.SuffixEnd)] } else { return nil, fmt.Errorf("unsupported handler %s for %s", , ) } // pipe var am.HandlerFinal if { // sync active for flats if .Is1() { .Add1(, nil) } if { = ampipe.AddFlat(, , , "") } else { = ampipe.RemoveFlat(, , , "") } } else { if { = ampipe.Add(, , , "") } else { = ampipe.Remove(, , , "") } } .Set(reflect.ValueOf()) } // bind pipe handlers if := .BindHandlers(); != nil { return nil, } return , nil } // CopySchema copies states from the source to target schema, from the passed // list of states. Returns a list of copied states, and an error. CopySchema // verifies states. func ( am.Schema, *am.Machine, am.S) error { if len() == 0 { return nil } := .Schema() for , := range { if , := []; ! { return fmt.Errorf("%w: state %s in source schema", am.ErrStateMissing, ) } [] = [] } := utils.SlicesUniq(slices.Concat(.StateNames(), )) return .SetSchema(, ) } // SchemaHash computes an MD5 hash of the passed schema. The order of states // is not important. func ( am.Schema) string { := "" if == nil { return "" } := slices.Collect(maps.Keys()) sort.Strings() for , := range { += + ":" // properties if [].Auto { += "a," } if [].Multi { += "m," } // relations := slices.Clone([].After) sort.Strings() for , := range { += + "," } += ";" := slices.Clone([].Remove) sort.Strings() for , := range { += + "," } += ";" := slices.Clone([].Add) sort.Strings() for , := range { += + "," } += ";" := slices.Clone([].Require) sort.Strings() for , := range { += + "," } += ";" } return Hash(, 6) } // Hash is a general hashing function. TODO move to pkg/machine func ( string, int) string { := md5.New() .Write([]byte()) if == 0 { = 6 } := hex.EncodeToString(.Sum(nil)) // short hash return [:] } // TODO MachHash(...) // EvalGetter is a syntax sugar for creating getters via Eval functions. Like // any eval, it can end with ErrEvalTimeout. Getting values via channels passed // to mutations is recommended and allows for a custom timeout. func [ any]( context.Context, string, int, *am.Machine, func() (, error), ) (, error) { var var error := func() { , = () } // try at least once for range min(, 1) { if !.Eval("EvGe/"+, , ) { = fmt.Errorf("%w: EvGe/%s", am.ErrEvalTimeout, ) } else { break } } return , } // TODO ChanGetter // ///// ///// ///// // ///// CAN // ///// ///// ///// // CantAdd will confirm that the mutation is impossible. Blocks. func ( am.Api, am.S, am.A) bool { := &am.CheckDone{ Ch: make(chan struct{}), } .CanAdd(, am.PassMerge(, &am.AT{ CheckDone: , })) <-.Ch return !.Canceled } // CantAdd1 is a single-state version of [CantAdd]. func ( am.Api, string, am.A) bool { return .CanAdd(am.S{}, ) == am.Canceled } // CantRemove will confirm that the mutation is impossible. Blocks. func ( am.Api, am.S, am.A) bool { := &am.CheckDone{ Ch: make(chan struct{}), } .CanRemove(, am.PassMerge(, &am.AT{ CheckDone: , })) <-.Ch return .Canceled } // CantRemove1 is a single-state version of [CantRemove]. func ( am.Api, string, am.A) bool { return .CanRemove(am.S{}, ) == am.Canceled } // AskAdd will first check if a mutation isn't impossible and only then try // to mutate the state machine. Causes the negotiation phase to execute twice. // AskAdd BLOCKS. Useful to avoid canceled transitions. // // See also [am.Machine.CanAdd] and [CantAdd]. func ( am.Api, am.S, am.A) am.Result { return AskEvAdd(nil, , , ) } // AskEvAdd is a traced version of [AskAdd]. func ( *am.Event, am.Api, am.S, am.A) am.Result { // only if not impossible if !CantAdd(, , ) { return .EvAdd(, , ) } return am.Canceled } // AskAdd1 is a single-state version of [AskAdd]. func ( am.Api, string, am.A) am.Result { return AskAdd(, S{}, ) } // AskEvAdd1 is a traced version of [AskAdd] for a single state. func ( *am.Event, am.Api, string, am.A) am.Result { return AskEvAdd(, , S{}, ) } // AskRemove will first check if a mutation isn't impossible and only then try // to mutate the state machine. Causes the negotiation phase to execute twice. // AskRemove BLOCKS. Useful to avoid canceled transitions. // // See also [am.Machine.CanRemove] and [CantRemove]. func ( am.Api, am.S, am.A) am.Result { return AskEvRemove(nil, , , ) } // AskEvRemove is a traced version of [AskRemove]. func ( *am.Event, am.Api, am.S, am.A) am.Result { // only if not impossible if !CantRemove(, , ) { return .EvRemove(, , ) } return am.Canceled } // AskRemove1 is a single-state version of [AskRemove]. func ( am.Api, string, am.A) am.Result { return AskRemove(, S{}, ) } // AskEvRemove1 is a traced version of [AskRemove] for a single state. func ( *am.Event, am.Api, string, am.A) am.Result { return AskEvRemove(, , S{}, ) } // DisposeBind registers a disposal handler, using either the dedicated state // set or the machine directly. func ( am.Api, am.HandlerDispose) { // via DisposedStates := ssam.DisposedStates.RegisterDisposal if .Has1() { .Add1(, am.A{ ssam.DisposedArgHandler: , }) return } // directly .OnDispose() } // Dispose triggers a machine disposal, using either the dedicated state set // or the machine directly. func ( am.Api) { // via DisposedStates := ssam.DisposedStates.Disposing if .Has1() { .Add1(, nil) return } // directly .Dispose() } // SchemaStates returns state names from a schema struct in a random order. func ( am.Schema) am.S { return slices.Collect(maps.Keys()) } // SchemaImplements checks if a given schema implements a certain set of states. func ( am.Schema, am.S) error { return Implements(SchemaStates(), ) }