// Package rpc is a transparent RPC for state machines.
package rpc import ( amhelp am ) func init() { gob.Register(&ARpc{}) gob.Register(am.Relation(0)) } const ( // EnvAmRpcLogServer enables machine logging for RPC server. EnvAmRpcLogServer = "AM_RPC_LOG_SERVER" // EnvAmRpcLogClient enables machine logging for RPC client. EnvAmRpcLogClient = "AM_RPC_LOG_CLIENT" // EnvAmRpcLogMux enables machine logging for RPC multiplexers. EnvAmRpcLogMux = "AM_RPC_LOG_MUX" // EnvAmRpcDbg enables env-based debugging for RPC components. EnvAmRpcDbg = "AM_RPC_DBG" // EnvAmReplAddr is a REPL address to listen on. "1" expands to 127.0.0.1:0. EnvAmReplAddr = "AM_REPL_ADDR" // EnvAmReplDir is a dir path to save the address file as // $AM_REPL_DIR/mach-id.addr. Optional. EnvAmReplDir = "AM_REPL_DIR" ) var ss = states.SharedStates // ///// ///// ///// // ///// TYPES // ///// ///// ///// // RPC methods type ( ServerMethod enum.Member[string] ClientMethod enum.Member[string] ) var ( // methods define on the server ServerAdd = ServerMethod{"Add"} ServerAddNS = ServerMethod{"AddNS"} ServerRemove = ServerMethod{"Remove"} ServerSet = ServerMethod{"Set"} ServerHello = ServerMethod{"Hello"} ServerHandshake = ServerMethod{"Handshake"} ServerLog = ServerMethod{"Log"} ServerSync = ServerMethod{"Sync"} ServerBye = ServerMethod{"Close"} ServerMethods = enum.New(ServerAdd, ServerAddNS, ServerRemove, ServerSet, ServerHello, ServerHandshake, ServerLog, ServerSync, ServerBye) // methods define on the client ClientSetClock = ClientMethod{"ClientSetClock"} ClientPushAllTicks = ClientMethod{"ClientPushAllTicks"} ClientSendPayload = ClientMethod{"ClientSendPayload"} ClientBye = ClientMethod{"ClientBye"} ClientSchemaChange = ClientMethod{"SchemaChange"} ClientMethods = enum.New(ClientSetClock, ClientPushAllTicks, ClientSendPayload, ClientBye, ClientSchemaChange) ) type ArgsHello struct { ReqSchema bool } // ArgsMut is args for mutation methods. 
type ArgsMut struct {
	// States lists the states targeted by the mutation, as ints (presumably
	// state indexes of the shared schema - confirm against the caller).
	States []int
	// Args are the mutation arguments.
	Args am.A
	// Event is an optional source event attached to the mutation.
	// NOTE(review): nil-ability is assumed from the pointer type - confirm.
	Event *am.Event
}

// ArgsGet carries a single Name. Presumably the argument of a getter-style
// RPC method answered with [RespGet] - confirm against the caller.
type ArgsGet struct {
	Name string
}

// ArgsLog is a log message with its format arguments.
type ArgsLog struct {
	Msg  string
	Args []any
}

// ArgsPayload describes a payload passed between RPC peers.
type ArgsPayload struct {
	// Name is the name of the payload.
	Name string
	// Source is the machine ID that sent the payload.
	Source string
	// SourceTx is the ID of the source transition.
	SourceTx string
	// Destination is an optional machine ID that is supposed to receive the
	// payload. Useful when using rpc.Mux.
	Destination string
	// Data is the payload data. The consumer has to know the type.
	Data any
	// Token is a unique random ID for the payload. Autofilled by the server.
	Token string
}

// RespHandshake carries a schema together with a serialized machine.
type RespHandshake struct {
	Schema     am.Schema
	Serialized *am.Serialized
}

// RespResult is a response to mutation methods: the mutation result plus a
// clock update.
type RespResult struct {
	Clock  *ClockMsg
	Result am.Result
}

// RespSync carries a full machine time and a queue tick.
type RespSync struct {
	Time      am.Time
	QueueTick uint64
}

// RespGet wraps a single untyped value.
type RespGet struct {
	Value any
}

// Empty is a placeholder for RPC methods without a meaningful response.
type Empty struct{}

// ClockMsg is an incremental (diff-based) clock update.
type ClockMsg struct {
	// Updates contains incremental diffs as [stateIdx, diff] pairs.
	Updates [][2]int
	// QueueTick is an incremental diff for the queue tick.
	QueueTick int
	// Checksum is the last digit of (TimeSum + QueueTick).
	// NOTE(review): the Checksum helper in this file computes
	// uint8(sum) % 10, i.e. it truncates the sum to 8 bits before taking the
	// last digit - confirm which of the two is intended.
	Checksum uint8
	// DBGSum uint64
	// DBGLastSum uint64
	// DBGQTick uint64
	// DBGLastQTick uint64
}

// PushAllTicks TODO implement
// PushAllTicks should list all the mutations, with clocks and queue ticks,
// which will then be processed as a queue. The client reconstructs the tx on
// its side. TODO mind partially accepted auto states (fake called states).
type PushAllTicks struct {
	MutationType []am.MutationType
	CalledStates [][]int
	ClockMsg     []*ClockMsg
}

// clientServerMethods is a shared interface for RPC client/server.
type clientServerMethods interface {
	// GetKind returns the Kind of the implementing RPC component.
	GetKind() Kind
}

// Kind names the side of an RPC connection.
type Kind string

const (
	// KindClient marks an RPC client.
	KindClient Kind = "client"
	// KindServer marks an RPC server.
	KindServer Kind = "server"
)

// ///// ///// /////

// ///// ARGS

// ///// ///// /////

// APrefix is the [am.A] key under which the typed arguments of this package
// (A or ARpc) are stored.
const APrefix = "am_rpc"

// A represents typed arguments of the RPC package. It's a typesafe alternative
// to [am.A].
type A struct { Id string `log:"id"` Name string `log:"name"` MachTime am.Time QueueTick uint64 Payload *ArgsPayload Addr string `log:"addr"` Err error Method string `log:"addr"` StartedAt time.Time Dispose bool // non-rpc fields Client *rpc2.Client } // ARpc is a subset of A, that can be passed over RPC. type ARpc struct { Id string `log:"id"` Name string `log:"name"` MachTime am.Time Payload *ArgsPayload Addr string `log:"addr"` Err error Method string `log:"addr"` StartedAt time.Time Dispose bool } // ParseArgs extracts A from [am.Event.Args][APrefix]. func ( am.A) *A { if , := [APrefix].(*ARpc); != nil { return amhelp.ArgsToArgs(, &A{}) } if , := [APrefix].(*A); != nil { return } return &A{} } // Pass prepares [am.A] from A to pass to further mutations. func ( *A) am.A { return am.A{APrefix: } } // PassRpc prepares [am.A] from A to pass over RPC. func ( *A) am.A { return am.A{APrefix: amhelp.ArgsToArgs(, &ARpc{})} } // LogArgs is an args logger for A. func ( am.A) map[string]string { := ParseArgs() if == nil { return nil } return amhelp.ArgsToLogMap(, 0) } // // DEBUG for perf testing TODO tag // type ClockMsg am.Time // ///// ///// ///// // ///// RPC APIS // ///// ///// ///// // serverRpcMethods is the main RPC server's exposed methods. type serverRpcMethods interface { // rpc RemoteHello(client *rpc2.Client, args *ArgsHello, resp *RespHandshake) error // mutations RemoteAdd(client *rpc2.Client, args *ArgsMut, resp *RespResult) error RemoteRemove(client *rpc2.Client, args *ArgsMut, resp *RespResult) error RemoteSet(client *rpc2.Client, args *ArgsMut, reply *RespResult) error } // clientRpcMethods is the RPC server exposed by the RPC client for bi-di comm. 
type clientRpcMethods interface { RemoteSetClock(worker *rpc2.Client, args *ClockMsg, resp *Empty) error RemoteSendingPayload( worker *rpc2.Client, file *ArgsPayload, resp *Empty, ) error RemoteSendPayload(worker *rpc2.Client, file *ArgsPayload, resp *Empty) error } // ///// ///// ///// // ///// ERRORS // ///// ///// ///// // sentinel errors var ( // ErrClient group ErrInvalidParams = errors.New("invalid params") ErrInvalidResp = errors.New("invalid response") ErrRpc = errors.New("rpc") ErrNoAccess = errors.New("no access") ErrNoConn = errors.New("not connected") ErrDestination = errors.New("wrong destination") // ErrNetwork group ErrNetwork = errors.New("network error") ErrNetworkTimeout = errors.New("network timeout") // TODO ErrDelivery ) // wrapping error setters func ( *am.Event, *am.Machine, string) { := fmt.Errorf("%w: %s", ErrRpc, ) .EvAddErrState(, ss.ErrRpc, , nil) } func ( *am.Event, *am.Machine, error) { = fmt.Errorf("%w: %w", ErrInvalidParams, ) .AddErrState(ss.ErrRpc, , nil) } func ( *am.Event, *am.Machine, error) { = fmt.Errorf("%w: %w", ErrInvalidResp, ) .AddErrState(ss.ErrRpc, , nil) } func ( *am.Event, *am.Machine, error) { .AddErrState(ss.ErrNetwork, , nil) } func ( *am.Event, *am.Machine, error) { = fmt.Errorf("%w: %w", ErrNoConn, ) .AddErrState(ss.ErrNetwork, , nil) } // AddErr detects sentinels from error msgs and calls the proper error setter. 
// TODO also return error for compat func ( *am.Event, *am.Machine, string, error) { if != "" { = fmt.Errorf("%w: %s", , ) } if strings.HasPrefix(.Error(), "gob: ") { AddErrResp(, , ) } else if strings.Contains(.Error(), "rpc2: can't find method") { AddErrRpcStr(, , .Error()) } else if strings.Contains(.Error(), "connection is shut down") || strings.Contains(.Error(), "unexpected EOF") { // TODO bind to sentinels io.ErrUnexpectedEOF, rpc2.ErrShutdown .AddErrState(ss.ErrRpc, , nil) } else if strings.Contains(.Error(), "timeout") { AddErrNetwork(, , errors.Join(, ErrNetworkTimeout)) } else if , := .(*net.OpError); { AddErrNetwork(, , ) } else { .AddErr(, nil) } } // ExceptionHandler is a shared exception handler for RPC server and // client. type ExceptionHandler struct { *am.ExceptionHandler } func ( *ExceptionHandler) ( *am.Event) bool { := ParseArgs(.Args) := .Machine() := .Has(am.S{ssC.Disconnecting, ssC.Disconnected}) if errors.Is(.Err, ErrNetwork) && && .Any1(ssC.Disconnecting, ssC.Disconnected) { // skip network errors on client disconnect .Machine().Log("ignoring ErrNetwork on Disconnecting/Disconnected") return false } return true } // ///// ///// ///// // ///// LOGGER // ///// ///// ///// type semLogger struct { mach *Worker steps atomic.Bool graph atomic.Bool } // implement [SemLogger] var _ am.SemLogger = &semLogger{} func ( *semLogger) ( am.LogArgsMapperFn) { // TODO } func ( *semLogger) () am.LogArgsMapperFn { // TODO return nil } func ( *semLogger) ( bool) { // TODO } func ( *semLogger) () bool { return false } func ( *semLogger) ( am.LoggerFn) { if == nil { .mach.logger.Store(nil) return } .mach.logger.Store(&) } func ( *semLogger) () am.LoggerFn { if := .mach.logger.Load(); != nil { return * } return nil } func ( *semLogger) ( am.LogLevel) { .mach.logLevel.Store(&) } func ( *semLogger) () am.LogLevel { return *.mach.logLevel.Load() } func ( *semLogger) ( am.LogLevel) { var am.LoggerFn = func( am.LogLevel, string, ...any) { // no-op } 
.mach.logger.Store(&) .mach.logLevel.Store(&) } func ( *semLogger) ( func( string, ...any), am.LogLevel, ) { var am.LoggerFn = func( am.LogLevel, string, ...any) { (, ...) } .mach.logger.Store(&) .mach.logLevel.Store(&) } func ( *semLogger) ( bool, , string) { := "remove" if { = "add" } .mach.log(am.LogOps, "[pipe-out:%s] %s to %s", , , ) } func ( *semLogger) ( bool, , string) { := "remove" if { = "add" } .mach.log(am.LogOps, "[pipe-in:%s] %s from %s", , , ) } func ( *semLogger) ( string) { .mach.log(am.LogOps, "[pipe:gc] %s", ) } func ( *semLogger) () bool { return .steps.Load() } func ( *semLogger) ( bool) { .steps.Store() } func ( *semLogger) () bool { return .graph.Load() } func ( *semLogger) ( bool) { .graph.Store() } // TODO more data types func ( *semLogger) ( bool) { // TODO } func ( *semLogger) () bool { return true } func ( *semLogger) ( bool) { // TODO } func ( *semLogger) () bool { return true } func ( *semLogger) ( bool) { // TODO params for synthetic log } func ( *semLogger) () bool { return true } func ( *semLogger) ( bool) { // TODO } func ( *semLogger) () bool { return true } func ( *semLogger) ( bool) { // TODO } func ( *semLogger) () bool { return true } // ///// ///// ///// // ///// REMOTE HANDLERS // ///// ///// ///// type remoteHandler struct { h any funcNames []string funcCache map[string]reflect.Value missingCache map[string]struct{} } func newRemoteHandler( any, []string, ) *remoteHandler { return &remoteHandler{ h: , funcNames: , funcCache: make(map[string]reflect.Value), missingCache: make(map[string]struct{}), } } // ///// ///// ///// // ///// TRACERS // ///// ///// ///// // WorkerTracer is a tracer for local worker machines (event source). 
type WorkerTracer struct { *am.NoOpTracer s *Server } func ( *WorkerTracer) ( *am.Transition) { // TODO channel and value in atomic, skip dups (smaller tick values) go func() { .s.mutMx.Lock() defer .s.mutMx.Unlock() .s.pushClockUpdate(false) }() } func ( *WorkerTracer) ( am.Api, am.Schema) { go func() { .s.mutMx.Lock() defer .s.mutMx.Unlock() if := .s.rpcClient.Load(); != nil { := &RespHandshake{ Schema: .Schema(), Serialized: .Export(), } := .CallWithContext(.Ctx(), ClientSchemaChange.Value, , &Empty{}) .AddErr(, nil) } }() } // TODO implement as an optimization // func (t *WorkerTracer) QueueEnd(_ *am.Transition) { // t.s.pushClockUpdate() // } // ///// ///// ///// // ///// MISC // ///// ///// ///// // MachReplEnv sets up a machine for a REPL connection in case AM_REPL_ADDR env // var is set. See MachRepl. func ( am.Api) <-chan error { := os.Getenv(EnvAmReplAddr) := os.Getenv(EnvAmReplDir) := make(chan error) switch { case "": return case "1": // expand 1 to default = "" } MachRepl(, , , nil, nil) return } // MachRepl sets up a machine for a REPL connection, which allows for // mutations, like any other RPC connection. See [/tools/cmd/arpc] for usage. // This function is considered a debugging helper and can panic. // // addr: address to listen on, default to 127.0.0.1:0 // addrDir: optional dir path to save the address file as addrDir/mach-id.addr. 
// addrCh: optional channel to send the address to, once ready // errCh: optional channel to send err to, once ready func ( am.Api, , string, chan<- string, chan<- error, ) { if amhelp.IsTestRunner() { return } if == "" { = "127.0.0.1:0" } if .HasHandlers() && !.Has(ssW.Names()) { := fmt.Errorf( "%w: REPL source has to implement pkg/rpc/states/WorkerStatesDef", am.ErrSchema) // panic only early panic() } , := NewMux(.Ctx(), "repl-"+.Id(), nil, &MuxOpts{ Parent: , }) // panic only early if != nil { panic() } .Addr = .Source = .Start() if == nil && == "" { if != nil { close() } return } go func() { // dispose ret channels defer func() { if != nil { close() } if != nil { close() } }() // prep the dir := false if != "" { if , := os.Stat(); os.IsNotExist() { := os.MkdirAll(, 0o755) if == nil { = true } else if != nil { <- } } else { = true } } // wait for an addr <-.Mach.When1(ssM.Ready, nil) if != nil { <- .Addr } // save to dir if && != "" { = os.WriteFile( filepath.Join(, .Id()+".addr"), []byte(.Addr), 0o644, ) if != nil { <- } } }() } // // DEBUG for perf testing // func NewClockMsg(before, after am.Time) ClockMsg { // return ClockMsg(after) // } // // // DEBUG for perf testing // func ClockFromMsg(before am.Time, msg ClockMsg) am.Time { // return am.Time(msg) // } func ( uint64, uint64) uint8 { return uint8(+) % 10 } // NewClockMsg create a new diff update based on the last and current machine // time, and last and current queue tick. 
func ( uint64, , am.Time, , uint64, ) *ClockMsg { := &ClockMsg{ QueueTick: int() - int(), Checksum: Checksum(, ), // debug // DBGSum: tSum, // DBGQTick: qAfter, // DBGLastSum: tBefore.Sum(), // DBGLastQTick: qBefore, } for := range { // first call or new schema if == nil || >= len() { // TODO test this path, eg schema update .Updates = append(.Updates, [2]int{, int([])}) // regular update } else if [] != [] { .Updates = append(.Updates, [2]int{, int([] - [])}) } } return } func ( am.Time, uint64, *ClockMsg, ) (am.Time, uint64) { // calculate := slices.Clone() := len() for , := range .Updates { := [0] := [1] if >= { // TODO continue } [] += uint64() } := + uint64(.QueueTick) return , } func ( net.Listener, string, chan<- int64, <-chan struct{}, ) { defer .Close() // fmt.Println("Listening on " + listenOn) // callFailsafe the destination , := net.Dial("tcp4", ) if != nil { fmt.Println("Error connecting to destination:", .Error()) return } defer .Close() // wait for the connection , := .Accept() if != nil { fmt.Println("Error accepting connection:", .Error()) return } defer .Close() // forward data bidirectionally := sync.WaitGroup{} .Add(2) := atomic.Int64{} go func() { , := io.Copy(, ) .Add() .Done() }() go func() { , := io.Copy(, ) .Add() .Done() }() // wait for the test and forwarding to finish <- // fmt.Printf("Closing counter...\n") _ = .Close() _ = .Close() _ = .Close() .Wait() := .Load() // fmt.Printf("Forwarded %d bytes\n", c) <- } func newClosedChan() chan struct{} { := make(chan struct{}) close() return }