// Package helpers provides experimental and unstable helpers.
package helpers import ( am ) func [ *any]( context.Context, chan ) (, error) { select { case <-.Done(): return nil, .Err() case := <-: return , nil } } // NestedState forwards the mutation to one of the composed submachines. Parent // state should be a Multi state and only called directly (not from a relation). // TODO test case, solve locking by passing the event to the submachine func ( *am.Event, string, func( string) *am.Machine, ) (am.Result, <-chan struct{}, error) { // validate if .Mutation().Type != am.MutationAdd { return am.Canceled, nil, fmt.Errorf( "unsupported nested mutation %s", .Mutation().Type) } // extract ID from params , := .Args[].(string) if ! || == "" { return am.Canceled, nil, am.ErrInvalidArgs } // init vars := () if == nil { return am.Canceled, nil, fmt.Errorf("submachine %s not found", ) } := .Name[0 : len(.Name)-5] // ignore active non-multi nested states := .Tick() := .Schema()[].Multi if am.IsActiveTick() && ! { return am.Canceled, nil, fmt.Errorf("nested state %s is active", ) } // fwd the state := .Add1(, .Args) // handle queuing with a timeout if am.IsQueued() && { // wait for the state to be activated again := .WhenTime(am.S{}, am.Time{ + 2}, nil) return , , nil } else if am.IsQueued() { := .When1(, nil) return , , nil } return , nil, nil } // ErrFromCtxs returns the first non-nil error from a list of contexts. func ( ...context.Context) error { for , := range { if .Err() != nil { return .Err() } } return nil } // TimeMatrix returns a matrix of state clocks for the given machines. func ( []*am.Machine) ([]am.Time, error) { if len() == 0 { return nil, errors.New("no machines provided") } := make([]am.Time, len()) := len([0].Schema()) for , := range { if len(.Schema()) != { return nil, errors.New("machines have different state lengths") } [] = .Time(nil) } return , nil } func ( *am.Machine) ([][]int, error) { := .StateNames() := 2 := 2 := make([][]int, +len()+) for , := range { [+] = make([]int, len()) // -1 shiftRows if < { [] = make([]int, len()) for := range [] { [][] = -1 } } for , := range { , := .Resolver().RelationsBetween(, ) if != nil { return nil, } for , := range { [+][] += int() } } } // -1 shiftRows := len() + for := ; < +; ++ { [] = make([]int, len()) for := range [] { [][] = -1 } } return , nil } func (, *am.Transition, am.S) ([][]int, error) { := 2 := 2 := make([][]int, +len()+) // row 0: currently set [0] = make([]int, len()) for := range { [0][] = 0 if am.IsActiveTick(.TimeBefore[]) { [0][] = 1 } } // row 1: called states [1] = make([]int, len()) for , := range { := 0 if slices.Contains(.CalledStates(), ) { = 1 } [1][] = } // steps for , := range { [+] = make([]int, len()) for , := range { := 0 for , := range .Steps { // TODO style just the cells if .FromState == && ((.ToState == "" && == ) || .ToState == ) { += int(.Type) } [+][] = } } } // ticks := len() + [] = make([]int, len()) for := range { var uint64 if != nil { = .TimeAfter[] } := .TimeAfter[] := - // add row [][] = int() } // active after = len() + + 1 [] = make([]int, len()) for := range { [][] = 0 if am.IsActiveTick(.TimeAfter[]) { [][] = 1 } } return , nil } type FanFn func(num int, state, stateDone string) type FanHandlers struct { Concurrency int AnyState am.HandlerFinal GroupTasks am.S GroupDone am.S } // FanOutIn creates [total] number of state pairs of "Name1" and "Name1Done", as // well as init and merge states ("Name", "NameDone"). [name] is treated as a // namespace and can't have other states within. Retry can be achieved by adding // the init state repetively. FanOutIn can be chained, but it should be called // before any mutations or telemetry (as it changes the state struct). The // returned handlers struct can be used to adjust the concurrency level. func ( *am.Machine, string, , int, FanFn, ) (any, error) { // TODO version with worker as states, instead of tasks as states // main state tries to push a list of tasks through the "worker states", // which results in a Done state. := .Schema() := "Done" if !.Has(am.S{, + }) { return nil, fmt.Errorf("%w: %s", am.ErrStateMissing, ) } := [] // create task states := make(am.S, ) := make(am.S, ) for := range { := strconv.Itoa() [] = + [] = + + } for := range { := strconv.Itoa() // task state [+] = // task completed state, removes task state [++] = am.State{Remove: am.S{ + }} } // inject into a fan-in state := am.StateAdd([+], am.State{ Require: , Remove: am.S{}, }) .Auto = true [+] = := slices.Concat(.StateNames(), , ) := .SetSchema(, ) if != nil { return nil, } // handlers := FanHandlers{ Concurrency: , GroupTasks: , GroupDone: , } // simulate fanout states and the init state // Task(e) // Task1(e) // Task2(e) .AnyState = func( *am.Event) { // get tx info := .Transition().CalledStates() if len() < 1 { return } for , := range { // fan-out state handler, eg Task1 // TODO extract to IsFanOutHandler(called am.S, false) bool if strings.HasPrefix(, ) && len() > len() && !strings.HasSuffix(, ) { , := strings.CutPrefix(, ) , := strconv.Atoi() if != nil { // TODO panic() } // call the function and mark as done (, , +) } // fan-out done handler, eg Task1Done // TODO extract to IsFanOutHandler(called am.S, true) bool if strings.HasPrefix(, ) && len() > len() && strings.HasSuffix(, ) { // call the init state .Add1(, nil) } // fan-out init handler, eg Task if == { // gather remaining ones var am.S for , := range { // check if done or in progress if .Any1(+, ) { continue } = append(, ) } // start N threads := len(.ActiveStates()) // TODO check against running via mach.CountActive for := ; < .Concurrency && len() > 0; ++ { // add a random one := rand.Intn(len()) .Add1([], nil) = slices.Delete(, , +1) } } } } = .BindHandlers(&) if != nil { return nil, } return , nil }