// Package repl provides a REPL and CLI functionality for aRPC connections.
package repl import ( amhelp am ssrpc ssam ) type Repl struct { // TODO JSON output *am.ExceptionHandler Mach *am.Machine // TODO keep NAMED addresses eg CLI0, filename1, filename2, CLI1 // and manipulate when watching for changes, keeping the prev ones Addrs []string Cmd *cobra.Command C *console.Console // am-dbg address for new clients DbgAddr string // TODO avoid empty entries rpcClients []*rpc.Client lastMsg string selectedMach string } func ( context.Context, string) (*Repl, error) { := &Repl{} // REPL machine , := am.NewCommon(, , states.ReplSchema, ss.Names(), , nil, &am.Opts{ DontLogId: true, HandlerTimeout: 1 * time.Second, Id: "r-" + , Tags: []string{"arpc-repl"}, }) if != nil { return nil, } .SetGroups(states.ReplGroups, ss) // add Disposed handlers := ssam.DisposedHandlers{} = .BindHandlers(&) if != nil { return nil, } .Mach = .SemLogger().SetArgsMapper(LogArgs) return , nil } // ///// ///// ///// // ///// HANDLERS // ///// ///// ///// func ( *Repl) ( *am.Event) bool { return len(.Addrs) > 0 } func ( *Repl) ( *am.Event) { // init clients TODO avoid empty entries .rpcClients = make([]*rpc.Client, len(.Addrs)) for , := range .Addrs { // create , := .newRpcClient(, strconv.Itoa()) if != nil { .Mach.EvAddErr(, , nil) continue } // save .rpcClients[] = } .Mach.Add1(ss.Connecting, nil) } func ( *Repl) ( *am.Event) bool { return len(.rpcClients) > 0 } func ( *Repl) ( *am.Event) { // reconn existing clients for , := range .rpcClients { if == nil { continue } if .Mach.Not1(ss.Start) { .Start() } else { .Mach.Add1(ssrpc.ClientStates.Connecting, nil) } } } func ( *Repl) ( *am.Event) bool { for , := range .rpcClients { if == nil { continue } if .Mach.Is1(ssrpc.ClientStates.Connecting) { return false } } return true } func ( *Repl) ( *am.Event) bool { return .Transition().Mutation.Source != nil } func ( *Repl) ( *am.Event) { := .Transition().Mutation .Mach.Log("Connected to %s", .Source.MachId) .Mach.Add1(ss.Connected, nil) .Mach.Add1(ss.ConnectedFully, nil) } func ( *Repl) ( *am.Event) bool { return .Transition().Mutation.Source != nil } func ( *Repl) ( *am.Event) { := .Transition().Mutation .Mach.Log("Disconnected from %s", .Source.MachId) .Mach.Remove1(ss.Connected, nil) .Mach.Remove1(ss.ConnectedFully, nil) } func ( *Repl) ( *am.Event) { := .Mach.NewStateCtx(ss.ReplMode) // console & shell fmt.Println("Welcome to aRPC! Tab to start, help, or Ctrl+D to exit.") , := historyFromFile(historyPath) if != nil { .Mach.Log("failed to open history file %s", historyPath) } .injectCompletions() .C = console.New("arpc") .C.NewlineAfter = false .C.NewlineBefore = false := .C.Shell() _ = .Config.Set("completion-ignore-case", true) _ = .Config.Set("show-all-if-ambiguous", true) _ = .Config.Set("show-all-if-unmodified", true) // TODO bind ctrl+x to copy .Keymap.Register(map[string]func(){ "copy": func() { println("COPY CMD") }, }) = .Config.Bind("emacs", inputrc.Unescape(`\C-x`), "copy", false) if != nil { // TODO debug println("ERROR: ", ) } // TODO bind chained complete (always open) // tab // sh.Config.Bind("menu-select", // inputrc.Unescape(`\C-i`), "accept-and-menu-complete", false) // menu := .C.ActiveMenu() .AddHistorySource("local history", ) .SetCommands(func() *cobra.Command { return .Cmd }) .AddInterrupt(io.EOF, .exitCtrlD) setupPrompt() // fork and start go func() { := .C.StartContext() if != nil { .Mach.AddErr(, nil) } }() } func ( *Repl) ( *am.Event) bool { := ParseArgs(.Args) // confirm theres a Ready worker var *rpc.NetworkMachine for , := range .rpcClients { if == nil { continue } if .Mach.Not1(ssrpc.ClientStates.Ready) { continue } if .NetMach.RemoteId() == .MachId || .MachId == "." { = .NetMach break } } if == nil { return false } if nil != amhelp.Implements(.StateNames(), .States) { return false } // TODO confirm args integrity return true } func ( *Repl) ( *am.Event) { := ParseArgs(.Args) // confirm theres a Ready worker var *rpc.NetworkMachine for , := range .rpcClients { if == nil { continue } if .Mach.Not1(ssrpc.ClientStates.Ready) { continue } if .NetMach.RemoteId() == .MachId || .MachId == "." { = .NetMach break } } // pass cli args := am.A{} for , := range .MutArgs[0] { := .MutArgs[1][] [] = } := .Add(.States, ) .Print(.String()) } func ( *Repl) ( *am.Event) bool { := ParseArgs(.Args) // confirm theres a Ready worker var *rpc.NetworkMachine for , := range .rpcClients { if == nil { continue } if .Mach.Not1(ssrpc.ClientStates.Ready) { continue } if .NetMach.RemoteId() == .MachId || .MachId == "." { = .NetMach break } } if == nil { return false } if nil != amhelp.Implements(.StateNames(), .States) { return false } // TODO confirm args integrity return true } func ( *Repl) ( *am.Event) { := ParseArgs(.Args) // confirm theres a Ready worker var *rpc.NetworkMachine for , := range .rpcClients { if == nil { continue } if .Mach.Not1(ssrpc.ClientStates.Ready) { continue } if .NetMach.RemoteId() == .MachId || .MachId == "." { = .NetMach break } } // pass cli args := am.A{} for , := range .MutArgs[0] { := .MutArgs[1][] [] = } := .Remove(.States, ) .Print(.String()) } func ( *Repl) ( *am.Event) bool { := ParseArgs(.Args) if len(.MachIds) == 0 { return false } // find at least one good match // TODO --strict var bool for , := range .rpcClients { if slices.Contains(.MachIds, .NetMach.RemoteId()) && amhelp.Implements(.NetMach.StateNames(), .States) == nil { = true break } } if ! { return false } // TODO confirm args integrity return true } func ( *Repl) ( *am.Event) { := ParseArgs(.Args) // pass cli args := am.A{} for , := range .MutArgs[0] { := .MutArgs[1][] [] = } , , := 0, 0, 0 for , := range .rpcClients { if == nil { continue } if !slices.Contains(.MachIds, .NetMach.RemoteId()) || amhelp.Implements(.NetMach.StateNames(), .States) != nil { continue } // count results switch .NetMach.Add(.States, ) { case am.Executed: ++ case am.Canceled: ++ default: ++ } } .Print(` Executed: %d Queued: %d Canceled: %d`, , , ) } func ( *Repl) ( *am.Event) bool { := ParseArgs(.Args) if len(.MachIds) == 0 { return false } // find at least one good match // TODO --strict var bool for , := range .rpcClients { if == nil { continue } if slices.Contains(.MachIds, .NetMach.RemoteId()) && amhelp.Implements(.NetMach.StateNames(), .States) == nil { = true break } } if ! { return false } // TODO confirm args integrity return true } func ( *Repl) ( *am.Event) { := ParseArgs(.Args) // pass cli args := am.A{} for , := range .MutArgs[0] { := .MutArgs[1][] [] = } , , := 0, 0, 0 for , := range .rpcClients { if == nil { continue } if !slices.Contains(.MachIds, .NetMach.RemoteId()) || amhelp.Implements(.NetMach.StateNames(), .States) != nil { continue } // count results switch .NetMach.Add(.States, ) { case am.Executed: ++ case am.Canceled: ++ default: ++ } } .Print(` Executed: %d Queued: %d Canceled: %d`, , , ) } func ( *Repl) ( *am.Event) bool { := ParseArgs(.Args) return != nil && .RpcCh != nil && // check buffered channel cap(.RpcCh) > 0 } func ( *Repl) ( *am.Event) { // TODO maybe merge with pkg/pubsub.Topic#ListMachineStates // TODO extract to pkg/helpers.MachGroup .Mach.Remove1(ss.ListMachines, nil) := ParseArgs(.Args) := .ListFilters if == nil { = &ListFilters{} } := .RpcCh := make([]*rpc.Client, 0) for , := range .rpcClients { if == nil { continue } // start time // TODO optimize if .StartIdx > 0 && < .StartIdx { continue } // limit the number of results if .Limit > 0 && len() >= .Limit { break } // Schema-less (ATM) // TODO list NoSchema as disconnected, but marked if .Mach.Tick(ssrpc.ClientStates.Ready) == 0 { if .NoSchema { = append(, ) } continue } // conn status if .SkipDisconn && .Mach.Not1(ssrpc.ClientStates.Ready) { continue } := .NetMach := .RemoteId() // ID // exact if .IdExact != "" && != .IdExact { continue } // regexp if .IdRegexp != nil && !.IdRegexp.MatchString() { continue } // substring if .IdSubstr != "" && !strings.Contains(, .IdSubstr) { continue } // prefix match if .IdPrefix != "" && !strings.HasPrefix(, .IdPrefix) { continue } // suffix match if .IdSuffix != "" && !strings.HasSuffix(, .IdSuffix) { continue } // parent ID if .Parent != "" && .ParentId() != .Parent { continue } // mtime // min if .MtimeMin > 0 && .MtimeMin > .Time(nil).Sum(nil) { continue } // max if .MtimeMax > 0 && .MtimeMax < .Time(nil).Sum(nil) { continue } // states := .StateNames() // check if inactive states match if len(.StatesActive) > 0 { // missing states if amhelp.Implements(, .StatesActive) != nil { continue } if !.Is(.StatesActive) { continue } } // check if inactive states match if len(.StatesInactive) > 0 { // missing states if amhelp.Implements(, .StatesInactive) != nil { continue } if !.Not(.StatesInactive) { continue } } = append(, ) } <- } func ( *Repl) ( *am.Event) bool { // enter only if all ready := 0 for , := range .rpcClients { if == nil { continue } if .Mach.Is1(ssrpc.ClientStates.Ready) { ++ } } return > 0 && == len(.rpcClients) } func ( *Repl) ( *am.Event) bool { // exit if going to Disconnecting / Disconnected // TODO use TimeIndex := .Transition().TargetStates() := .Mutation() := .Machine().Index1(ss.Start) if slices.Contains(, ss.Disconnected) || slices.Contains(, ss.Disconnecting) || // dont block a restart (.Type == am.MutationRemove && .IsCalled()) { return true } // exit only if all ready := 0 for , := range .rpcClients { if == nil { continue } if .Mach.Is1(ssrpc.ClientStates.Ready) { ++ } } return == 0 && == len(.rpcClients) } func ( *Repl) ( *am.Event) { := .Mach.NewStateCtx(ss.Disconnecting) go func() { if .Err() != nil { return // expired } // TODO parallel for , := range .rpcClients { if == nil { continue } .Stop(, false) } .Mach.Add1(ss.Disconnected, nil) }() } func ( *Repl) ( *am.Event) { for , := range .rpcClients { if == nil { continue } .Stop(context.Background(), false) } } func ( *Repl) ( *am.Event) { .Mach.Remove1(ss.Connecting, nil) .Mach.Add1(ss.ConnectedFully, nil) } func ( *Repl) ( *am.Event) bool { // exit if going to Disconnecting / Disconnected := .Transition().TargetStates() if slices.Contains(, ss.Disconnected) || slices.Contains(, ss.Disconnecting) { return true } // exit only if none connected := 0 for , := range .rpcClients { if == nil { continue } if .Mach.Is1(ssrpc.ClientStates.Ready) { ++ } } return == 0 } func ( *Repl) ( *am.Event) bool { return len(ParseArgs(.Args).Addrs) > 0 } // TODO avoid a full restart func ( *Repl) ( *am.Event) { := .Mach // use the new addr list and dispose .Addrs = ParseArgs(.Args).Addrs := .WhenTicks(ss.Disconnected, 1, nil) .Add1(ss.Disconnecting, nil) // then restart go func() { // TODO not safe <- .Remove(S{ss.Start, ss.Disconnected}, nil) .Add(S{ss.Start, ss.Connecting}, nil) }() } func ( *Repl) ( *am.Event) { := .Mutation() if .Source == nil { .Mach.Log("unknown RPC client") return } := .Source.MachId .Mach.Log("RPC client %s disconnected", ) } // ///// ///// ///// // ///// METHODS // ///// ///// ///// func ( *Repl) ( string, ...any) { = utils.Sp(, ...) := strings.Split(, "\n") for , := range { fmt.Print("\u001B[90m=>\u001B[0m " + + "\n") } } func ( *Repl) ( string, ...any) { = utils.Sp(, ...) := strings.Split(, "\n") for , := range { fmt.Print("\u001B[31merr>\u001B[0m " + + "\n") } } // PrintMsg prints a message about the prompt. func ( *Repl) ( string, ...any) { = fmt.Sprintf(, ...) if .lastMsg == { return } .lastMsg = _, _ = .C.TransientPrintf("%s", ) } func ( *Repl) ( *ListFilters) ([]*rpc.Client, error) { := make(chan []*rpc.Client, 1) := .Mach.Add1(ss.ListMachines, Pass(&A{ RpcCh: , ListFilters: , })) if == am.Canceled { return nil, fmt.Errorf("list unavailable: %w", am.ErrCanceled) } return <-, nil } // NetMach returns an RPC worker with a given ID, or nil. func ( *Repl) ( string) *rpc.NetworkMachine { // first connected TODO document if == "." { for , := range .rpcClients { if == nil { continue } if .Mach.Is1(ssrpc.ClientStates.Ready) { return .NetMach } } return nil } , := .ListMachines(&ListFilters{IdExact: }) if len() == 0 { .PrintErr("mach ID unknown") return nil } return [0].NetMach } // NetMachArgs returns a list of registered typed args for a given machine. func ( *Repl) ( string) []string { // first connected TODO document if == "." { for , := range .rpcClients { if == nil { continue } if .Mach.Is1(ssrpc.ClientStates.Ready) { return .Args() } } return nil } , := .ListMachines(&ListFilters{IdExact: }) if len() == 0 { .PrintErr("mach ID unknown") return nil } return [0].Args() } func ( *Repl) (, string) (*rpc.Client, error) { := .Mach.NewStateCtx(ss.Start) // empty schema RPC client (`rc-WDHASH-0` for 1st client) := strings.Replace(.Mach.Id(), "repl-", "", 1) + "-" + , := rpc.NewClient(, , , am.Schema{}, &rpc.ClientOpts{Parent: .Mach}) if != nil { return nil, } // telemetry if .DbgAddr != "" { _ = amhelp.MachDebug(.Mach, .DbgAddr, .Mach.SemLogger().Level(), false, amhelp.SemConfigEnv(true)) .LogEnabled = true } // bind pipes = pipes.BindReady(.Mach, .Mach, ss.RpcConn, ss.RpcDisconn) if != nil { return nil, } = pipes.BindErr(.Mach, .Mach, ss.ErrNetwork) if != nil { return nil, } return , nil } func ( *Repl) ( *console.Console) { .Mach.Add1(ss.Disposing, nil) // TODO fix console ctx not being honored os.Exit(0) // reader := bufio.NewReader(os.Stdin) // // fmt.Print("Confirm exit (Y/y): ") // // text, _ := reader.ReadString('\n') // answer := strings.TrimSpace(text) // // if (answer == "Y") || (answer == "y") { // r.Mach.Add(ss.Disposing, nil) // } } // ///// ///// ///// // ///// COMPLETION // ///// ///// ///// // injectCompletions switches completion types. func ( *Repl) () { for , := range .Cmd.Commands() { switch .Name() { // TODO enum, not strings case "add", "remove", "when", "when-not": // MACH STATES .ValidArgsFunction = .newCompletionFunc(.completeMachStates) case "inspect", "mach", "time": // MACH .ValidArgsFunction = .newCompletionFunc(.completeMach) case "when-time": // MACH and state flag .ValidArgsFunction = .newCompletionFunc(.completeMach) := .newCompletionFunc(.completeStates) _ = .RegisterFlagCompletionFunc("state", ) case "group-add", "group-remove": // MACH .ValidArgsFunction = .newCompletionFunc(.completeAllStatesFlags) := .newCompletionFunc(.completeAllStates) // TODO return err _ = .RegisterFlagCompletionFunc("active", ) _ = .RegisterFlagCompletionFunc("inactive", ) case "list": // flags only := .newCompletionFunc(.completeAllStates) // TODO return err _ = .RegisterFlagCompletionFunc("active", ) _ = .RegisterFlagCompletionFunc("inactive", ) .ValidArgsFunction = .newCompletionFunc(.completeFlags) } } } func ( *Repl) ( completionFunc) completionFunc { return func( *cobra.Command, []string, string, ) ([]string, cobra.ShellCompDirective) { if .Mach.Not1(ss.Connected) { .PrintMsg("not connected") return []string{}, cobra.ShellCompDirectiveNoFileComp } return (, , ) } } func ( *Repl) ( *cobra.Command, []string, string, ) ([]string, cobra.ShellCompDirective) { if len() == 0 { return []string{}, cobra.ShellCompDirectiveNoFileComp } // states var *rpc.NetworkMachine for , := range .rpcClients { if == nil { continue } // dotmach if [0] == "." { = .NetMach break } if .NetMach.RemoteId() == [0] { = .NetMach break } } if == nil { return []string{}, cobra.ShellCompDirectiveNoFileComp } := .StateNames() // flags completion when mach and states are passed if len() == 2 { = append(, listCmdFlags()...) } return completionsNarrowDown(, ) } func ( *Repl) ( *cobra.Command, []string, string, ) ([]string, cobra.ShellCompDirective) { := S{} for , := range .rpcClients { if == nil { continue } = append(, .NetMach.StateNames()...) } // flags completion when mach and states are passed if len() == 1 { = append(, listCmdFlags()...) } = utils.SlicesUniq() return completionsNarrowDown(, ) } func ( *Repl) ( *cobra.Command, []string, string, ) ([]string, cobra.ShellCompDirective) { := S{} for , := range .rpcClients { if == nil { continue } = append(, .NetMach.StateNames()...) } = utils.SlicesUniq() return completionsNarrowDown(, ) } // completeMachStates returns a list of completions for positional arguments // and flags for MACH STATE commands. func ( *Repl) ( *cobra.Command, []string, string, ) ([]string, cobra.ShellCompDirective) { var []string := len() switch { case 0: // mach , := .ListMachines(nil) = make([]string, len()) for , := range { [] = .NetMach.RemoteId() } // dotmach is the first connected machine = append(, ".") default: // states var *rpc.NetworkMachine for , := range .rpcClients { if == nil { continue } // dotmach if [0] == "." { = .NetMach break } if .NetMach.RemoteId() == [0] { = .NetMach break } } if == nil { return []string{}, cobra.ShellCompDirectiveNoFileComp } = .StateNames() .selectedMach = .RemoteId() // filter available states if .Name() == "add" { = slices.DeleteFunc(, func( string) bool { return .Is1() && !amhelp.IsMulti(, ) }) } else if .Name() == "remove" { = slices.DeleteFunc(, func( string) bool { return .Not1() }) } // flags completion when mach and states are passed if == 2 { = append(, listCmdFlags()...) } } // either --arg or --var, but only when a state name present if == 1 { = slices.DeleteFunc(, func( string) bool { return == "--val" || == "--arg" }) // states present } else if > 1 { , , := .C.Shell().Line().TokenizeSpace(0) // TODO fix missing --arg --val if !slices.Contains(, "--arg") { = append(, "--arg") } if !slices.Contains(, "--val") { = append(, "--val") } // TODO complete --val with example tags := 0 := 0 := false := []string{} for , := range { = strings.TrimSpace() if == "--arg" { ++ } else if == "--val" { ++ } if strings.HasPrefix(, "--") { = true } else { = append(, ) } } // rm already present = slices.DeleteFunc(, func( string) bool { return slices.Contains(, ) }) // --arg first, then --val if != { = slices.DeleteFunc(, func( string) bool { return == "--arg" }) } else { = slices.DeleteFunc(, func( string) bool { return == "--val" }) } // TODO remove duped arg names // args only after args if { = slices.DeleteFunc(, func( string) bool { return !strings.HasPrefix(, "--") }) } } return completionsNarrowDown(, ) } // completeMach returns a list of completion for a positional arguments // and flags for MACH commands. func ( *Repl) ( *cobra.Command, []string, string, ) ([]string, cobra.ShellCompDirective) { var []string if len() == 0 { = make([]string, len(.rpcClients)) for , := range .rpcClients { if == nil { continue } [] = .NetMach.RemoteId() } // . is the first connected machine = append(, ".") } // flags completion when mach and states are passed if len() == 1 { = append(, listCmdFlags()...) } return completionsNarrowDown(, ) } func ( *Repl) ( *cobra.Command, []string, string, ) ([]string, cobra.ShellCompDirective) { return listCmdFlags(), cobra.ShellCompDirectiveNoFileComp }