// Package node provides distributed worker pools with supervisors.
package node

// NOTE(review): the import paths below were reconstructed from the aliases
// and package names used in this file - confirm against go.mod.
import (
	"encoding/gob"
	"errors"
	"fmt"

	amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
	am "github.com/pancsta/asyncmachine-go/pkg/machine"
	"github.com/pancsta/asyncmachine-go/pkg/rpc"
)

// init registers ARpc with gob, so it can be encoded when passed as an
// interface value.
func init() {
	gob.Register(&ARpc{})
}

const (
	// EnvAmNodeLogSupervisor enables extra logging for node supervisor.
	EnvAmNodeLogSupervisor = "AM_NODE_LOG_SUPERVISOR"
	// EnvAmNodeLogClient enables extra logging for node client.
	EnvAmNodeLogClient = "AM_NODE_LOG_CLIENT"
)

// WorkerState is the state of a worker.
type WorkerState string

// Known worker states.
const (
	StateIniting WorkerState = "initing"
	StateRpc     WorkerState = "rpc"
	StateIdle    WorkerState = "idle"
	StateBusy    WorkerState = "busy"
	StateReady   WorkerState = "ready"
)

// ///// ///// /////

// ///// ERRORS

// ///// ///// /////

// sentinel errors

var (
	ErrWorker        = errors.New("worker error")
	ErrWorkerMissing = errors.New("worker missing")
	ErrWorkerHealth  = errors.New("worker failed healthcheck")
	ErrWorkerConn    = errors.New("error starting connection")
	ErrWorkerKill    = errors.New("error killing worker")
	ErrPool          = errors.New("pool error")
	ErrHeartbeat     = errors.New("heartbeat failed")
	ErrRpc           = errors.New("rpc error")
)

// error mutations

// AddErrWorker wraps an error in the ErrWorker sentinel and adds to a machine.
// The wrapped error is returned, so callers can match it with [errors.Is].
func AddErrWorker(
	event *am.Event, mach *am.Machine, err error, args am.A,
) error {
	err = fmt.Errorf("%w: %w", ErrWorker, err)
	mach.EvAddErrState(event, ssS.ErrWorker, err, args)

	return err
}

// AddErrWorkerStr wraps a msg in the ErrWorker sentinel and adds to a machine.
// The wrapped error is returned.
// TODO add event param
func AddErrWorkerStr(mach *am.Machine, msg string, args am.A) error {
	err := fmt.Errorf("%w: %s", ErrWorker, msg)
	mach.AddErrState(ssS.ErrWorker, err, args)

	return err
}

// AddErrPool wraps an error in the ErrPool sentinel and adds to a machine.
// The wrapped error is returned.
// TODO add event param
func AddErrPool(mach *am.Machine, err error, args am.A) error {
	wrapped := fmt.Errorf("%w: %w", ErrPool, err)
	mach.AddErrState(ssS.ErrPool, wrapped, args)

	return wrapped
}

// AddErrPoolStr wraps a msg in the ErrPool sentinel and adds to a machine.
// The wrapped error is returned.
// TODO add event param
func AddErrPoolStr(mach *am.Machine, msg string, args am.A) error {
	err := fmt.Errorf("%w: %s", ErrPool, msg)
	mach.AddErrState(ssS.ErrPool, err, args)

	return err
}

// AddErrRpc wraps an error in the ErrRpc sentinel and adds to a machine.
// The wrapped error is returned.
// TODO add event param
func AddErrRpc(mach *am.Machine, err error, args am.A) error {
	wrapped := fmt.Errorf("%w: %w", ErrRpc, err)
	// NOTE(review): the error is wrapped with ErrRpc, but the state being set
	// is ErrNetwork - confirm this mapping is intended.
	mach.AddErrState(ssS.ErrNetwork, wrapped, args)

	return wrapped
}

// ///// ///// /////

// ///// ARGS

// ///// ///// /////

// APrefix is the key under which A (or ARpc) is stored in [am.A].
const APrefix = "am_node"

// A is a struct for node arguments. It's a typesafe alternative to [am.A].
type A struct {
	// Id is a machine ID.
	Id string `log:"id"`
	// PublicAddr is the public address of a Supervisor or WorkerRpc.
	PublicAddr string `log:"public_addr"`
	// LocalAddr is the local address of a Supervisor or WorkerRpc.
	LocalAddr string `log:"local_addr"`
	// BootAddr is the local address of the Bootstrap machine.
	BootAddr string `log:"boot_addr"`
	// NodesList is a list of available nodes (supervisors' public RPC addresses).
	NodesList []string
	// WorkerRpcId is a machine ID of the worker RPC client.
	WorkerRpcId string `log:"worker_rpc_id"`
	// SuperRpcId is a machine ID of the super RPC client.
	SuperRpcId string `log:"super_rpc_id"`

	// non-rpc fields

	// WorkerRpc is the RPC client connected to a WorkerRpc.
	WorkerRpc *rpc.Client
	// Bootstrap is the RPC machine used to connect WorkerRpc to the Supervisor.
	Bootstrap *bootstrap
	// Dispose the worker.
	Dispose bool
	// WorkerAddr is an index for WorkerInfo.
	WorkerAddr string
	// WorkerInfo describes a worker.
	WorkerInfo *workerInfo
	// WorkersCh returns a list of workers. This channel has to be buffered.
	WorkersCh chan<- []*workerInfo
	// WorkerState is a requested state of workers, eg for listings.
	WorkerState WorkerState
}

// ARpc is a subset of A, that can be passed over RPC.
type ARpc struct {
	// Id is a machine ID.
	Id string `log:"id"`
	// PublicAddr is the public address of a Supervisor or Worker.
	PublicAddr string `log:"public_addr"`
	// LocalAddr is the local address of a Supervisor, Worker, or [bootstrap].
	LocalAddr string `log:"local_addr"`
	// BootAddr is the local address of the Bootstrap machine.
	BootAddr string `log:"boot_addr"`
	// NodesList is a list of available nodes (supervisors' public RPC addresses).
	NodesList []string
	// WorkerRpcId is a machine ID of the worker RPC client.
	WorkerRpcId string `log:"worker_rpc_id"`
	// SuperRpcId is a machine ID of the super RPC client.
	SuperRpcId string `log:"super_rpc_id"`
}

// ParseArgs extracts A from [am.Event.Args][APrefix].
//
// An *ARpc or ARpc value is converted into a fresh *A, while a non-nil *A is
// returned as-is. Any other value (including a missing key or a nil pointer)
// results in an empty, non-nil *A.
func ParseArgs(args am.A) *A {
	switch v := args[APrefix].(type) {
	case *ARpc:
		// guard against a typed nil, which cannot be converted
		if v != nil {
			return amhelp.ArgsToArgs(v, &A{})
		}
	case ARpc:
		return amhelp.ArgsToArgs(&v, &A{})
	case *A:
		if v != nil {
			return v
		}
	}

	return &A{}
}

// Pass prepares [am.A] from A to pass to further mutations.
func Pass(args *A) am.A {
	return am.A{APrefix: args}
}

// PassRpc prepares [am.A] from A to pass over RPC. The result holds an *ARpc
// produced from args via [amhelp.ArgsToArgs].
func PassRpc(args *A) am.A {
	return am.A{APrefix: amhelp.ArgsToArgs(args, &ARpc{})}
}

// LogArgs is an args logger for A and [rpc.A]. It merges the log maps of both
// structs, or returns nil when neither could be parsed.
func LogArgs(args am.A) map[string]string {
	rpcArgs := rpc.ParseArgs(args)
	nodeArgs := ParseArgs(args)
	if rpcArgs == nil && nodeArgs == nil {
		return nil
	}

	return am.AMerge(
		amhelp.ArgsToLogMap(rpcArgs, 0),
		amhelp.ArgsToLogMap(nodeArgs, 0),
	)
}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.