package node
import (
"context"
"errors"
"fmt"
"os"
"slices"
"time"
amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
am "github.com/pancsta/asyncmachine-go/pkg/machine"
"github.com/pancsta/asyncmachine-go/pkg/node/states"
arpc "github.com/pancsta/asyncmachine-go/pkg/rpc"
ssrpc "github.com/pancsta/asyncmachine-go/pkg/rpc/states"
ampipe "github.com/pancsta/asyncmachine-go/pkg/states/pipes"
)
// ssC is a package-local shorthand for states.ClientStates. The handlers in
// this file use it to reference the client's state names.
var ssC = states .ClientStates
// Client is a node client. It owns a state machine (Mach) and up to two RPC
// clients: one to a supervisor (RpcSuper) and one to a worker (RpcWorker).
// Instances are created with NewClient.
type Client struct {
// ExceptionHandler is embedded from the machine package.
// NOTE(review): presumably supplies the default Exception state handler.
*am .ExceptionHandler
// Mach is the client's own state machine, created in NewClient.
Mach *am .Machine
// Name is built in NewClient as "<workerKind>-<clientId>-<HHMMSS>".
Name string
// SuperAddr is not read or written anywhere in this file.
// NOTE(review): confirm whether it is still used elsewhere.
SuperAddr string
// LogEnabled gates the log helper. NewClient sets it to true when the
// environment variable named by EnvAmNodeLogClient is non-empty.
LogEnabled bool
// LeaveSuper is not read or written anywhere in this file.
// NOTE(review): confirm whether it is still used elsewhere.
LeaveSuper bool
// ConnTimeout bounds the waits for the supervisor and worker connections.
// NewClient sets 5s, multiplied by 10 when amhelp.IsDebug() reports true.
ConnTimeout time .Duration
// RpcSuper is the RPC client to the supervisor, created in StartState.
RpcSuper *arpc .Client
// RpcWorker is the RPC client to the worker, created in ServerPayloadState.
RpcWorker *arpc .Client
// nodeList keeps the node addresses passed to the Start state.
nodeList []string
// schemaClient is the schema used to build Mach.
schemaClient am .Schema
// schemaWorker is the schema passed to the worker RPC client.
schemaWorker am .Schema
}
// Compile-time check that *Client satisfies ssrpc.ConsumerHandlers.
var _ ssrpc .ConsumerHandlers = &Client {}
// NewClient creates a node client together with its state machine.
//
// Parameters:
//   - ctx: context passed to the machine constructor.
//   - clientId: required, non-empty; becomes part of the client's name.
//   - workerKind: required, non-empty; becomes part of the client's name.
//   - workerSchema: required; has to implement states.WorkerStates.
//   - opts: optional (nil is allowed). Missing ClientSchema / ClientStates
//     default to states.ClientSchema / states.ClientStates.Names(). The
//     caller's struct is never modified.
//
// The client is named "<workerKind>-<clientId>-<HHMMSS>" and its machine
// "nc-<name>". ConnTimeout defaults to 5s (10x when amhelp.IsDebug()).
//
// An error is returned when a required parameter is missing, when either
// schema check fails, or when the machine cannot be created. The machine is
// disposed if validation fails after it has been created.
func NewClient(ctx context.Context, clientId string, workerKind string,
	workerSchema am.Schema, opts *ClientOpts,
) (*Client, error) {
	// Work on a private copy, so defaults never leak into the caller's opts.
	var o ClientOpts
	if opts != nil {
		o = *opts
	}
	if o.ClientSchema == nil {
		o.ClientSchema = states.ClientSchema
	}
	if o.ClientStates == nil {
		o.ClientStates = states.ClientStates.Names()
	}

	// required params
	if clientId == "" {
		return nil, errors.New("client: clientId required")
	}
	if workerSchema == nil {
		return nil, errors.New("client: workerSchema required")
	}
	if workerKind == "" {
		return nil, errors.New("client: workerKind required")
	}

	err := amhelp.SchemaImplements(workerSchema, states.WorkerStates.Names())
	if err != nil {
		return nil, fmt.Errorf(
			"worker has to implement am/node/states/WorkerStates: %w", err,
		)
	}

	// the HHMMSS suffix distinguishes clients created at different times
	name := fmt.Sprintf("%s-%s-%s", workerKind, clientId,
		time.Now().Format("150405"))

	c := &Client{
		Name:         name,
		ConnTimeout:  5 * time.Second,
		LogEnabled:   os.Getenv(EnvAmNodeLogClient) != "",
		schemaClient: o.ClientSchema,
		schemaWorker: workerSchema,
	}
	if amhelp.IsDebug() {
		// leave more time for connections while debugging
		c.ConnTimeout = 10 * c.ConnTimeout
	}

	mach, err := am.NewCommon(ctx, "nc-"+name, c.schemaClient,
		o.ClientStates, c, o.Parent, &am.Opts{
			Tags: slices.Concat([]string{"node-client"}, o.Tags),
		})
	if err != nil {
		return nil, err
	}
	mach.SemLogger().SetArgsMapper(amhelp.LogArgsMapper)
	c.Mach = mach
	// The result is deliberately ignored, as before.
	// NOTE(review): presumably debugging setup is optional - confirm.
	_ = amhelp.MachDebugEnv(mach)

	err = amhelp.Implements(mach.StateNames(), ssC.Names())
	if err != nil {
		// don't leak the machine which was already created
		mach.Dispose()

		return nil, fmt.Errorf(
			"client has to implement am/node/states/ClientStates: %w", err,
		)
	}

	return c, nil
}
// Blank reference to the Start state name; the value is discarded.
var _ = ssC.Start

// StartEnter reports whether the Start state may be entered: the event has
// to carry args which parse as A and contain at least one entry in NodesList.
func (c *Client) StartEnter(e *am.Event) bool {
	args := am.ParseArgs[A](e.Args)
	if args == nil {
		return false
	}

	return len(args.NodesList) != 0
}
// StartState creates the supervisor RPC client, pipes its connection, error
// and ready states into the client's machine, and then tries the nodes from
// the event's NodesList one by one in a goroutine.
//
// For each node the goroutine waits up to ConnTimeout for either SuperReady
// on the client's machine, or for the RPC client to leave its Start state:
//   - RPC client no longer in Start: it is stopped, Exception is removed and
//     the next node is tried.
//   - the wait fails while the RPC client is still in Start: the error is
//     reported via AddErrRpc and the goroutine ends.
//   - otherwise the supervisor is connected and the goroutine ends.
//
// The goroutine exits as soon as the Start state context expires.
func (c *Client) StartState(e *am.Event) {
	ctx := c.Mach.NewStateCtx(ssC.Start)
	args := am.ParseArgs[A](e.Args)
	// StartEnter guarantees at least one node
	addr := args.NodesList[0]
	c.nodeList = args.NodesList

	var err error
	c.RpcSuper, err = arpc.NewClient(ctx, addr, GetSuperClientId(c.Name),
		states.SupervisorSchema, &arpc.ClientOpts{
			Parent:   c.Mach,
			Consumer: c.Mach,
		})
	if err != nil {
		err := fmt.Errorf("failed to connect to the Supervisor: %w", err)
		AddErrRpc(e, c.Mach, err, nil)
		return
	}

	// mirror the RPC client's states in the client's machine
	_, err1 := ampipe.BindConnected(c.RpcSuper.Mach, c.Mach,
		ssC.SuperDisconnected,
		ssC.SuperConnecting, ssC.SuperConnected, ssC.SuperDisconnecting)
	_, err2 := ampipe.BindErr(c.RpcSuper.Mach, c.Mach, ssC.ErrSupervisor)
	_, err3 := ampipe.BindReady(c.RpcSuper.Mach, c.Mach, ssC.SuperReady, "")
	err = errors.Join(err1, err2, err3)
	if err != nil {
		c.Mach.AddErr(err, nil)
		return
	}

	go func() {
		for i, nodeAddr := range args.NodesList {
			if ctx.Err() != nil {
				return
			}

			c.Mach.Log("trying node %d: %s", i, nodeAddr)
			// clear errors left by the previous attempt
			c.Mach.Remove1(am.StateException, nil)
			c.RpcSuper.Addr = nodeAddr
			c.RpcSuper.ConnRetries = 3
			c.RpcSuper.Start(e)

			err := amhelp.WaitForAny(
				ctx, c.ConnTimeout,
				c.Mach.When1(ssC.SuperReady, nil),
				c.RpcSuper.Mach.WhenNot1(ssrpc.ClientStates.Start, nil),
			)
			if ctx.Err() != nil {
				return
			}

			// the RPC client gave up on this node, try the next one
			if c.RpcSuper.Mach.Not1(ssrpc.ClientStates.Start) {
				c.RpcSuper.Stop(ctx, e, false)
				c.Mach.Remove1(ssC.Exception, nil)
				continue
			}

			if err != nil {
				err := errors.Join(err, c.RpcSuper.Mach.Err())
				AddErrRpc(e, c.Mach, err, nil)
				return
			}

			// Connected. Stop here, otherwise the loop would overwrite Addr
			// and re-start the already connected RPC client with the
			// remaining nodes.
			return
		}

		// every node has been tried without success
		if ctx.Err() == nil {
			c.Mach.Log("no supervisor available in %d node(s)",
				len(args.NodesList))
		}
	}()
}
// Blank reference to the Start state name; the value is discarded.
var _ = ssC.Start

// StartEnd stops the supervisor and the worker RPC clients, skipping those
// which were never created. Both are stopped with context.TODO() and `true`
// as the last argument.
// NOTE(review): presumably `true` requests disposal - confirm against
// arpc.Client.Stop.
func (c *Client) StartEnd(e *am.Event) {
	if super := c.RpcSuper; super != nil {
		super.Stop(context.TODO(), e, true)
	}

	if worker := c.RpcWorker; worker != nil {
		worker.Stop(context.TODO(), e, true)
	}
}
// Blank reference to the WorkerRequested state name; the value is discarded.
var _ = ssC.WorkerRequested

// WorkerRequestedEnter reports whether WorkerRequested may be entered, which
// is the case only while the supervisor's network machine has
// WorkersAvailable active. It requires RpcSuper to be set.
func (c *Client) WorkerRequestedEnter(e *am.Event) bool {
	superMach := c.RpcSuper.NetMach
	available := superMach.Is1(ssS.WorkersAvailable)

	return available
}
// WorkerRequestedState adds ProvideWorker to the supervisor's network
// machine, passing the IDs of the supervisor RPC machine and of the worker
// RPC client derived from the client's name.
func (c *Client) WorkerRequestedState(e *am.Event) {
	superMach := c.RpcSuper.NetMach
	req := &ARpc{
		SuperRpcId:  c.RpcSuper.Mach.Id(),
		WorkerRpcId: arpc.GetClientId(GetWorkerClientId(c.Name)),
	}

	superMach.Add1(ssS.ProvideWorker, Pass(req))
}
// Blank reference to the ServerPayload state name; the value is discarded.
var _ = ssC.ServerPayload

// ServerPayloadEnter reports whether ServerPayload may be entered: the event
// args have to parse as arpc.AServerPayload, with a non-empty Name and a
// non-nil Payload.
func (c *Client) ServerPayloadEnter(e *am.Event) bool {
	args := am.ParseArgs[arpc.AServerPayload](e.Args)

	switch {
	case args == nil:
		return false
	case args.Name == "":
		return false
	case args.Payload == nil:
		return false
	}

	return true
}
// ServerPayloadState handles a payload delivered by an RPC server. Only
// payloads named ssS.ProvideWorker are acted upon, and only while
// WorkerRequested is active. The payload's Data is then expected to be a
// non-empty string with the worker's address, to which a worker RPC client
// is connected in a goroutine.
//
// ServerPayloadEnter has already checked that the args parse, and that Name
// and Payload are set.
func (c *Client ) ServerPayloadState (e *am .Event ) {
// the state removes itself right away
c .Mach .Remove1 (ssC .ServerPayload , nil )
args := am .ParseArgs [arpc .AServerPayload ](e .Args )
c .log ("worker %s delivered: %s" , args .Payload .Source , args .Name )
// ignore payloads other than the worker offer
if args .Name != ssS .ProvideWorker {
return
}
if c .Mach .Not1 (ssC .WorkerRequested ) {
c .log ("worker not requested" )
return
}
// Two state contexts: ctx is tied to WorkerRequested and bounds the final
// wait, ctxStart is tied to Start and is handed to the worker RPC client.
// NOTE(review): presumably each context expires once its state is removed.
ctx := c .Mach .NewStateCtx (ssC .WorkerRequested )
ctxStart := c .Mach .NewStateCtx (ssC .Start )
addr , ok := args .Payload .Data .(string )
if !ok || addr == "" {
err := errors .New ("invalid worker address" )
c .Mach .AddErrState (ssC .ErrSupervisor , err , nil )
return
}
c .log ("connecting to worker: %s" , addr )
// connect outside of the handler
go func () {
var err error
c .RpcWorker , err = arpc .NewClient (ctxStart , addr , GetWorkerClientId (c .Name ),
c .schemaWorker , &arpc .ClientOpts {
Parent : c .Mach ,
Consumer : c .Mach ,
})
if err != nil {
err := fmt .Errorf ("failed to connect to the NetworkMachine: %w" , err )
AddErrRpc (e , c .Mach , err , nil )
return
}
c .RpcWorker .HelloDelay = 100 * time .Millisecond
// mirror the worker RPC client's states in the client's machine
_ , err1 := ampipe .BindConnected (c .RpcWorker .Mach , c .Mach ,
ssC .WorkerDisconnected ,
ssC .WorkerConnecting , ssC .WorkerConnected , ssC .WorkerDisconnecting )
_ , err2 := ampipe .BindErr (c .RpcWorker .Mach , c .Mach , ssC .ErrWorker )
_ , err3 := ampipe .BindReady (c .RpcWorker .Mach , c .Mach , ssC .WorkerReady , "" )
err = errors .Join (err1 , err2 , err3 )
if err != nil {
c .Mach .AddErr (err , nil )
return
}
c .RpcWorker .Start (e )
// wait for WorkerReady or an error of the worker RPC machine, for at most
// ConnTimeout
err = amhelp .WaitForAny (
ctx , c .ConnTimeout ,
c .Mach .When1 (ssC .WorkerReady , nil ),
c .RpcWorker .Mach .WhenErr (ctxStart ),
)
// NOTE(review): WorkerRequested is removed only when the wait itself fails;
// the earlier error returns in this goroutine leave it active - confirm
// this is intended.
if err != nil {
c .Mach .Remove1 (ssC .WorkerRequested , nil )
return
}
}()
}
// Start requests the Start state on the client's machine, passing nodesList
// as the NodesList argument.
func (c *Client) Start(nodesList []string) {
	args := &A{NodesList: nodesList}

	c.Mach.Add1(ssC.Start, Pass(args))
}
// Stop stops the supervisor and the worker RPC clients (when present), each
// with ctx, a nil event and `false` as the last argument, and then removes
// the Start state from the client's machine.
func (c *Client) Stop(ctx context.Context) {
	if super := c.RpcSuper; super != nil {
		super.Stop(ctx, nil, false)
	}

	if worker := c.RpcWorker; worker != nil {
		worker.Stop(ctx, nil, false)
	}

	c.Mach.Remove1(ssC.Start, nil)
}
// ReqWorker requests a worker and blocks until it's connected. It first adds
// WorkerRequested via an amhelp request bound to ctx, and then waits for
// WorkerReady for at most ConnTimeout. The first error from either step is
// returned, nil on success.
func (c *Client) ReqWorker(ctx context.Context) error {
	req := amhelp.NewReqAdd1(c.Mach, ssC.WorkerRequested, nil)
	if _, err := req.Run(ctx); err != nil {
		return err
	}

	whenReady := c.Mach.When1(ssC.WorkerReady, nil)
	if err := amhelp.WaitForAll(ctx, c.ConnTimeout, whenReady); err != nil {
		return err
	}

	workerId := c.RpcWorker.NetMach.Id()
	c.log("worker connected: %s", workerId)

	return nil
}
// Dispose stops the client using ctx, drops the references to both RPC
// clients, and disposes the client's machine.
func (c *Client) Dispose(ctx context.Context) {
	c.Stop(ctx)

	// release the RPC clients
	c.RpcSuper, c.RpcWorker = nil, nil

	c.Mach.Dispose()
}
// log forwards msg and args to the machine's logger, but only when
// LogEnabled is set. Otherwise it does nothing.
func (c *Client) log(msg string, args ...any) {
	if c.LogEnabled {
		c.Mach.Log(msg, args...)
	}
}
// ClientOpts are the optional settings accepted by NewClient.
type ClientOpts struct {
// Parent is passed to the machine constructor as the parent machine.
Parent am .Api
// Tags are appended to the machine's tags, after "node-client".
Tags []string
// ClientSchema is the schema of the client's machine. NewClient falls back
// to states.ClientSchema when nil.
ClientSchema am .Schema
// ClientStates are the state names of the client's machine. NewClient falls
// back to states.ClientStates.Names() when nil.
ClientStates am .S
}
// GetWorkerClientId returns the ID of the worker RPC client for a node
// client called name, which is name prefixed with "nc-worker-".
func GetWorkerClientId(name string) string {
	const prefix = "nc-worker-"

	return prefix + name
}
// GetSuperClientId returns the ID of the supervisor RPC client for a node
// client called name, which is name prefixed with "nc-super-".
func GetSuperClientId(name string) string {
	const prefix = "nc-super-"

	return prefix + name
}
The pages are generated with Golds v0.8.4. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.