package node
import (
"context"
"errors"
"fmt"
"os"
"slices"
"time"
amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
am "github.com/pancsta/asyncmachine-go/pkg/machine"
"github.com/pancsta/asyncmachine-go/pkg/node/states"
"github.com/pancsta/asyncmachine-go/pkg/rpc"
ssrpc "github.com/pancsta/asyncmachine-go/pkg/rpc/states"
ampipe "github.com/pancsta/asyncmachine-go/pkg/states/pipes"
)
var ssC = states .ClientStates
type Client struct {
*am .ExceptionHandler
Mach *am .Machine
Name string
SuperAddr string
LogEnabled bool
LeaveSuper bool
ConnTimeout time .Duration
RpcSuper *rpc .Client
RpcWorker *rpc .Client
nodeList []string
schemaClient am .Schema
schemaWorker am .Schema
}
var _ ssrpc .ConsumerHandlers = &Client {}
func NewClient (ctx context .Context , clientId string , workerKind string ,
workerSchema am .Schema , opts *ClientOpts ,
) (*Client , error ) {
if opts == nil {
opts = &ClientOpts {}
}
if opts .ClientSchema == nil {
opts .ClientSchema = states .ClientSchema
}
if opts .ClientStates == nil {
opts .ClientStates = states .ClientStates .Names ()
}
if clientId == "" {
return nil , errors .New ("client: clientId required" )
}
if workerSchema == nil {
return nil , errors .New ("client: workerSchema required" )
}
if workerKind == "" {
return nil , errors .New ("client: workerKind required" )
}
err := amhelp .SchemaImplements (workerSchema , states .WorkerStates .Names ())
if err != nil {
err := fmt .Errorf (
"worker has to implement am/node/states/WorkerStates: %w" , err )
return nil , err
}
name := fmt .Sprintf ("%s-%s-%s" , workerKind , clientId ,
time .Now ().Format ("150405" ))
c := &Client {
Name : name ,
ConnTimeout : 5 * time .Second ,
LogEnabled : os .Getenv (EnvAmNodeLogClient ) != "" ,
schemaClient : opts .ClientSchema ,
schemaWorker : workerSchema ,
}
if amhelp .IsDebug () {
c .ConnTimeout = 10 * c .ConnTimeout
}
mach , err := am .NewCommon (ctx , "nc-" +name , c .schemaClient ,
opts .ClientStates , c , opts .Parent , &am .Opts {
Tags : slices .Concat ([]string {"node-client" }, opts .Tags ),
})
if err != nil {
return nil , err
}
mach .SemLogger ().SetArgsMapper (LogArgs )
c .Mach = mach
_ = amhelp .MachDebugEnv (mach )
err = amhelp .Implements (mach .StateNames (), ssC .Names ())
if err != nil {
err := fmt .Errorf (
"client has to implement am/node/states/ClientStates: %w" , err )
return nil , err
}
return c , nil
}
func (c *Client ) StartEnter (e *am .Event ) bool {
a := ParseArgs (e .Args )
return a != nil && len (a .NodesList ) > 0
}
func (c *Client ) StartState (e *am .Event ) {
var err error
ctx := c .Mach .NewStateCtx (ssC .Start )
args := ParseArgs (e .Args )
addr := args .NodesList [0 ]
c .nodeList = args .NodesList
c .RpcSuper , err = rpc .NewClient (ctx , addr , GetSuperClientId (c .Name ),
states .SupervisorSchema , &rpc .ClientOpts {
Parent : c .Mach ,
Consumer : c .Mach ,
})
if err != nil {
err := fmt .Errorf ("failed to connect to the Supervisor: %w" , err )
_ = AddErrRpc (c .Mach , err , nil )
return
}
err = errors .Join (
ampipe .BindConnected (c .RpcSuper .Mach , c .Mach , ssC .SuperDisconnected ,
ssC .SuperConnecting , ssC .SuperConnected , ssC .SuperDisconnecting ),
ampipe .BindErr (c .RpcSuper .Mach , c .Mach , ssC .ErrSupervisor ),
ampipe .BindReady (c .RpcSuper .Mach , c .Mach , ssC .SuperReady , "" ),
)
if err != nil {
c .Mach .AddErr (err , nil )
return
}
go func () {
for i , addr := range args .NodesList {
if ctx .Err () != nil {
return
}
c .Mach .Log ("trying node %d: %s" , i , addr )
c .Mach .Remove1 (am .StateException , nil )
c .RpcSuper .Addr = addr
c .RpcSuper .ConnRetries = 3
c .RpcSuper .Start ()
err := amhelp .WaitForAny (ctx , c .ConnTimeout ,
c .Mach .When1 (ssC .SuperReady , nil ),
c .RpcSuper .Mach .WhenNot1 (ssrpc .ClientStates .Start , nil ),
)
if ctx .Err () != nil {
return
}
if c .RpcSuper .Mach .Not1 (ssrpc .ClientStates .Start ) {
c .RpcSuper .Stop (ctx , false )
c .Mach .Remove1 (ssC .Exception , nil )
continue
}
if err != nil {
err := errors .Join (err , c .RpcSuper .Mach .Err ())
_ = AddErrRpc (c .Mach , err , nil )
return
}
}
}()
}
func (c *Client ) StartEnd (e *am .Event ) {
if c .RpcSuper != nil {
c .RpcSuper .Stop (context .TODO (), true )
}
if c .RpcWorker != nil {
c .RpcWorker .Stop (context .TODO (), true )
}
}
func (c *Client ) WorkerRequestedEnter (e *am .Event ) bool {
return c .RpcSuper .NetMach .Is1 (ssS .WorkersAvailable )
}
func (c *Client ) WorkerRequestedState (e *am .Event ) {
c .RpcSuper .NetMach .Add1 (ssS .ProvideWorker , PassRpc (&A {
SuperRpcId : c .RpcSuper .Mach .Id (),
WorkerRpcId : rpc .GetClientId (GetWorkerClientId (c .Name )),
}))
}
func (c *Client ) WorkerPayloadEnter (e *am .Event ) bool {
a := rpc .ParseArgs (e .Args )
return a != nil && a .Name != "" && a .Payload != nil
}
func (c *Client ) WorkerPayloadState (e *am .Event ) {
c .Mach .Remove1 (ssC .WorkerPayload , nil )
args := rpc .ParseArgs (e .Args )
c .log ("worker %s delivered: %s" , args .Payload .Source , args .Name )
if args .Name != ssS .ProvideWorker {
return
}
if c .Mach .Not1 (ssC .WorkerRequested ) {
c .log ("worker not requested" )
return
}
ctx := c .Mach .NewStateCtx (ssC .WorkerRequested )
ctxStart := c .Mach .NewStateCtx (ssC .Start )
addr , ok := args .Payload .Data .(string )
if !ok || addr == "" {
err := errors .New ("invalid worker address" )
c .Mach .AddErrState (ssC .ErrSupervisor , err , nil )
return
}
c .log ("connecting to worker: %s" , addr )
go func () {
var err error
c .RpcWorker , err = rpc .NewClient (ctxStart , addr , GetWorkerClientId (c .Name ),
c .schemaWorker , &rpc .ClientOpts {
Parent : c .Mach ,
Consumer : c .Mach ,
})
if err != nil {
err := fmt .Errorf ("failed to connect to the NetworkMachine: %w" , err )
_ = AddErrRpc (c .Mach , err , nil )
return
}
c .RpcWorker .HelloDelay = 100 * time .Millisecond
err = errors .Join (
ampipe .BindConnected (c .RpcWorker .Mach , c .Mach , ssC .WorkerDisconnected ,
ssC .WorkerConnecting , ssC .WorkerConnected , ssC .WorkerDisconnecting ),
ampipe .BindErr (c .RpcWorker .Mach , c .Mach , ssC .ErrWorker ),
ampipe .BindReady (c .RpcWorker .Mach , c .Mach , ssC .WorkerReady , "" ),
)
if err != nil {
c .Mach .AddErr (err , nil )
return
}
c .RpcWorker .Start ()
err = amhelp .WaitForAny (ctx , c .ConnTimeout ,
c .Mach .When1 (ssC .WorkerReady , nil ),
c .RpcWorker .Mach .WhenErr (ctxStart ),
)
if err != nil {
c .Mach .Remove1 (ssC .WorkerRequested , nil )
return
}
}()
}
func (c *Client ) Start (nodesList []string ) {
c .Mach .Add1 (ssC .Start , Pass (&A {
NodesList : nodesList ,
}))
}
func (c *Client ) Stop (ctx context .Context ) {
if c .RpcSuper != nil {
c .RpcSuper .Stop (ctx , false )
}
if c .RpcWorker != nil {
c .RpcWorker .Stop (ctx , false )
}
c .Mach .Remove1 (ssC .Start , nil )
}
func (c *Client ) ReqWorker (ctx context .Context ) error {
_ , err := amhelp .NewReqAdd1 (c .Mach , ssC .WorkerRequested , nil ).Run (ctx )
if err != nil {
return err
}
err = amhelp .WaitForAll (ctx , c .ConnTimeout ,
c .Mach .When1 (ssC .WorkerReady , nil ))
if err != nil {
return err
}
c .log ("worker connected: %s" , c .RpcWorker .NetMach .Id ())
return nil
}
func (c *Client ) Dispose (ctx context .Context ) {
c .Stop (ctx )
c .RpcSuper = nil
c .RpcWorker = nil
c .Mach .Dispose ()
}
func (c *Client ) log (msg string , args ...any ) {
if !c .LogEnabled {
return
}
c .Mach .Log (msg , args ...)
}
type ClientOpts struct {
Parent am .Api
Tags []string
ClientSchema am .Schema
ClientStates am .S
}
func GetWorkerClientId (name string ) string {
return "nc-worker-" + name
}
func GetSuperClientId (name string ) string {
return "nc-super-" + name
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .