package node
import (
"context"
"errors"
"fmt"
"os"
"time"
amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
am "github.com/pancsta/asyncmachine-go/pkg/machine"
"github.com/pancsta/asyncmachine-go/pkg/node/states"
"github.com/pancsta/asyncmachine-go/pkg/rpc"
ssrpc "github.com/pancsta/asyncmachine-go/pkg/rpc/states"
ampipe "github.com/pancsta/asyncmachine-go/pkg/states/pipes"
)
// ssC is a package-local shorthand for states.ClientStates; the handlers in
// this file use it to reference the client's state names (ssC.Start,
// ssC.SuperReady, ssC.WorkerRequested, ...).
var ssC = states .ClientStates
// Client is a node client: it owns a state machine (Mach) and up to two RPC
// clients, one to a Supervisor (SuperRpc) and one to a Worker (WorkerRpc).
// Its methods named <State>Enter / <State>State / <State>End are registered as
// handlers of Mach by NewClient.
type Client struct {
// Embedded handler set from the am package.
// NOTE(review): presumably supplies the default Exception handlers - confirm.
*am .ExceptionHandler
// Mach is the client's state machine, created in NewClient.
Mach *am .Machine
// Name is built in NewClient as "<workerKind>-<id>-<HHMMSS>".
Name string
// SuperAddr is not assigned or read anywhere in the visible code.
// NOTE(review): presumably the address of the connected Supervisor - confirm.
SuperAddr string
// LogEnabled gates the private log helper; NewClient initialises it from the
// EnvAmNodeLogClient environment variable (any non-empty value enables it).
LogEnabled bool
// LeaveSuper is not assigned or read anywhere in the visible code.
LeaveSuper bool
// ConnTimeout bounds the waits for SuperReady and WorkerReady. NewClient sets
// it to 5s, multiplied by 10 when amhelp.IsDebug() reports true.
ConnTimeout time .Duration
// SuperRpc is the RPC client to the Supervisor, created by StartState.
SuperRpc *rpc .Client
// WorkerRpc is the RPC client to the Worker, created by WorkerPayloadState.
WorkerRpc *rpc .Client
// nodeList keeps the node addresses passed to the Start state.
nodeList []string
// stateDeps holds the schemas and state names supplied to NewClient.
stateDeps *ClientStateDeps
}
// Compile-time assertion that *Client satisfies ssrpc.ConsumerHandlers.
var _ ssrpc .ConsumerHandlers = &Client {}
// NewClient creates a node client together with its state machine.
//
// id and workerKind are combined with the current wall-clock time (HHMMSS)
// into the client's Name ("<workerKind>-<id>-<150405>"); the machine itself is
// named "nc-<Name>" and tagged "node-client". stateDeps carries the schemas
// and state names of both the client machine and the worker machine the
// client will later connect to. opts may be nil.
//
// Logging via the client's log helper is enabled when the EnvAmNodeLogClient
// environment variable is non-empty. ConnTimeout defaults to 5s and is
// multiplied by 10 when amhelp.IsDebug() reports true.
//
// An error is returned when a required argument is missing, when the worker
// state names do not implement states.WorkerStates, when the machine cannot
// be created, or when the created machine does not implement
// states.ClientStates. In the last case the freshly created machine is
// disposed before returning, so it is not leaked.
func NewClient(ctx context.Context, id string, workerKind string,
	stateDeps *ClientStateDeps, opts *ClientOpts,
) (*Client, error) {
	// validate the arguments; every message names the value actually missing
	switch {
	case id == "":
		return nil, errors.New("client: id required")
	case stateDeps == nil:
		return nil, errors.New("client: stateDeps required")
	case stateDeps.WorkerSStruct == nil:
		return nil, errors.New("client: stateDeps.WorkerSStruct required")
	case stateDeps.WorkerSNames == nil:
		return nil, errors.New("client: stateDeps.WorkerSNames required")
	case stateDeps.ClientSStruct == nil:
		return nil, errors.New("client: stateDeps.ClientSStruct required")
	case stateDeps.ClientSNames == nil:
		return nil, errors.New("client: stateDeps.ClientSNames required")
	case workerKind == "":
		return nil, errors.New("client: workerKind required")
	}
	if opts == nil {
		opts = &ClientOpts{}
	}

	// the worker's states have to be a superset of the node's WorkerStates
	err := amhelp.Implements(stateDeps.WorkerSNames, states.WorkerStates.Names())
	if err != nil {
		return nil, fmt.Errorf(
			"worker has to implement am/node/states/WorkerStates: %w", err)
	}

	name := fmt.Sprintf("%s-%s-%s", workerKind, id,
		time.Now().Format("150405"))

	c := &Client{
		Name:        name,
		ConnTimeout: 5 * time.Second,
		LogEnabled:  os.Getenv(EnvAmNodeLogClient) != "",
		stateDeps:   stateDeps,
	}
	if amhelp.IsDebug() {
		c.ConnTimeout = 10 * c.ConnTimeout
	}

	// the client itself is passed as the handlers struct of the machine
	mach, err := am.NewCommon(ctx, "nc-"+name, stateDeps.ClientSStruct,
		stateDeps.ClientSNames, c, opts.Parent, &am.Opts{
			Tags: []string{"node-client"},
		})
	if err != nil {
		return nil, err
	}
	mach.SemLogger().SetArgsMapper(LogArgs)
	c.Mach = mach
	amhelp.MachDebugEnv(mach)

	// the client's states have to be a superset of the node's ClientStates
	err = amhelp.Implements(mach.StateNames(), ssC.Names())
	if err != nil {
		// the machine is already running; release it instead of leaking it
		mach.Dispose()
		return nil, fmt.Errorf(
			"client has to implement am/node/states/ClientStates: %w", err)
	}

	return c, nil
}
// StartEnter is the Enter handler of the Start state. It reports true only
// when the event args can be parsed and carry at least one node address;
// StartState relies on this when it reads NodesList[0].
func (c *Client) StartEnter(e *am.Event) bool {
	args := ParseArgs(e.Args)
	if args == nil {
		return false
	}

	return len(args.NodesList) != 0
}
// StartState is the handler of the Start state. It creates the RPC client to
// the Supervisor, pipes that client's connection, error and ready states into
// this machine's Super* states, and then - in a goroutine bound to the Start
// state's context - tries the passed node addresses one by one until one of
// them becomes SuperReady.
//
// A node whose RPC client drops out of its Start state is skipped and the next
// address is tried. A timeout while the RPC client is still started is
// reported via AddErrRpc and ends the attempts. Once a node is connected the
// goroutine returns, leaving SuperRpc.Addr pointing at that node.
func (c *Client) StartState(e *am.Event) {
	var err error
	ctx := c.Mach.NewStateCtx(ssC.Start)
	args := ParseArgs(e.Args)
	// StartEnter guarantees at least one address
	addr := args.NodesList[0]
	c.nodeList = args.NodesList

	// init the supervisor RPC client
	c.SuperRpc, err = rpc.NewClient(ctx, addr, GetSuperClientId(c.Name),
		states.SupervisorSchema, ssS.Names(), &rpc.ClientOpts{
			Parent:   c.Mach,
			Consumer: c.Mach,
		})
	if err != nil {
		err := fmt.Errorf("failed to connect to the Supervisor: %w", err)
		_ = AddErrRpc(c.Mach, err, nil)
		return
	}

	// bind the RPC client's states to this machine's Super* states
	err = errors.Join(
		ampipe.BindConnected(c.SuperRpc.Mach, c.Mach, ssC.SuperDisconnected,
			ssC.SuperConnecting, ssC.SuperConnected, ssC.SuperDisconnecting),
		ampipe.BindErr(c.SuperRpc.Mach, c.Mach, ssC.ErrSupervisor),
		ampipe.BindReady(c.SuperRpc.Mach, c.Mach, ssC.SuperReady, ""),
	)
	if err != nil {
		c.Mach.AddErr(err, nil)
		return
	}

	go func() {
		for i, addr := range args.NodesList {
			if ctx.Err() != nil {
				return // Start is no longer active
			}

			c.Mach.Log("trying node %d: %s", i, addr)
			// clear errors left over from the previous attempt
			c.Mach.Remove1(am.StateException, nil)
			c.SuperRpc.Addr = addr
			c.SuperRpc.ConnRetries = 3
			c.SuperRpc.Start()

			// wait until ready, or until the RPC client gives up
			err := amhelp.WaitForAny(ctx, c.ConnTimeout,
				c.Mach.When1(ssC.SuperReady, nil),
				c.SuperRpc.Mach.WhenNot1(ssrpc.ClientStates.Start, nil),
			)
			if ctx.Err() != nil {
				return // Start is no longer active
			}

			// the RPC client stopped on its own - try the next node
			if c.SuperRpc.Mach.Not1(ssrpc.ClientStates.Start) {
				c.SuperRpc.Stop(ctx, false)
				c.Mach.Remove1(ssC.Exception, nil)
				continue
			}

			// timeout with the RPC client still started
			if err != nil {
				err := errors.Join(err, c.SuperRpc.Mach.Err())
				_ = AddErrRpc(c.Mach, err, nil)
				return
			}

			// connected and ready: stop iterating, otherwise the remaining
			// addresses would overwrite SuperRpc.Addr of the live connection
			return
		}
	}()
}
// StartEnd is the End handler of the Start state. It stops whichever of the
// Supervisor and Worker RPC clients exist, passing true as the second
// argument of Stop (the public Stop method of Client passes false).
func (c *Client) StartEnd(e *am.Event) {
	if super := c.SuperRpc; super != nil {
		super.Stop(context.TODO(), true)
	}

	if worker := c.WorkerRpc; worker != nil {
		worker.Stop(context.TODO(), true)
	}
}
// WorkerRequestedEnter is the Enter handler of the WorkerRequested state. It
// reports true only while the Supervisor's machine, as seen through the RPC
// client, has WorkersAvailable active.
func (c *Client) WorkerRequestedEnter(e *am.Event) bool {
	remote := c.SuperRpc.Worker

	return remote.Is1(ssS.WorkersAvailable)
}
// WorkerRequestedState is the handler of the WorkerRequested state. It adds
// ProvideWorker on the Supervisor's machine over RPC, passing the IDs of this
// client's Supervisor RPC machine and of its (future) Worker RPC client.
func (c *Client) WorkerRequestedState(e *am.Event) {
	req := &A{
		SuperRpcId:  c.SuperRpc.Mach.Id(),
		WorkerRpcId: rpc.GetClientId(GetWorkerClientId(c.Name)),
	}

	c.SuperRpc.Worker.Add1(ssS.ProvideWorker, PassRpc(req))
}
// WorkerPayloadEnter is the Enter handler of the WorkerPayload state. It
// reports true only when the RPC args parse, name the payload and actually
// carry one; WorkerPayloadState dereferences all three.
func (c *Client) WorkerPayloadEnter(e *am.Event) bool {
	args := rpc.ParseArgs(e.Args)

	switch {
	case args == nil:
		return false
	case args.Name == "":
		return false
	default:
		return args.Payload != nil
	}
}
// WorkerPayloadState is the handler of the WorkerPayload state. It consumes a
// payload delivered over RPC and, when the payload is the reply to
// ProvideWorker and a worker is still requested, connects to the worker
// address carried as the payload's string data.
//
// The connection is made in a goroutine: the Worker RPC client is created
// under the Start state's context, its connection, error and ready states are
// piped into this machine's Worker* states, and the goroutine then waits
// (under the WorkerRequested state's context, up to ConnTimeout) for either
// WorkerReady or an error on the RPC machine. If that wait itself fails,
// WorkerRequested is removed.
func (c *Client) WorkerPayloadState(e *am.Event) {
	// WorkerPayload is removed before anything else is done
	c.Mach.Remove1(ssC.WorkerPayload, nil)

	payload := rpc.ParseArgs(e.Args)
	c.log("worker %s delivered: %s", payload.Payload.Source, payload.Name)

	// only the reply to ProvideWorker is handled here
	if payload.Name != ssS.ProvideWorker {
		return
	}
	if c.Mach.Not1(ssC.WorkerRequested) {
		c.log("worker not requested")
		return
	}

	reqCtx := c.Mach.NewStateCtx(ssC.WorkerRequested)
	startCtx := c.Mach.NewStateCtx(ssC.Start)

	// the payload data has to be a non-empty address string
	workerAddr, isString := payload.Payload.Data.(string)
	if !isString || workerAddr == "" {
		invalid := errors.New("invalid worker address")
		c.Mach.AddErrState(ssC.ErrSupervisor, invalid, nil)
		return
	}
	c.log("connecting to worker: %s", workerAddr)

	go func() {
		var err error

		// init the worker RPC client
		c.WorkerRpc, err = rpc.NewClient(startCtx, workerAddr,
			GetWorkerClientId(c.Name), c.stateDeps.WorkerSStruct,
			c.stateDeps.WorkerSNames, &rpc.ClientOpts{
				Parent:   c.Mach,
				Consumer: c.Mach,
			})
		if err != nil {
			wrapped := fmt.Errorf("failed to connect to the Worker: %w", err)
			_ = AddErrRpc(c.Mach, wrapped, nil)
			return
		}
		c.WorkerRpc.HelloDelay = 100 * time.Millisecond

		// bind the RPC client's states to this machine's Worker* states
		err = errors.Join(
			ampipe.BindConnected(c.WorkerRpc.Mach, c.Mach,
				ssC.WorkerDisconnected, ssC.WorkerConnecting,
				ssC.WorkerConnected, ssC.WorkerDisconnecting),
			ampipe.BindErr(c.WorkerRpc.Mach, c.Mach, ssC.ErrWorker),
			ampipe.BindReady(c.WorkerRpc.Mach, c.Mach, ssC.WorkerReady, ""),
		)
		if err != nil {
			c.Mach.AddErr(err, nil)
			return
		}

		c.WorkerRpc.Start()

		// wait for the worker to be ready, or for the RPC client to err
		err = amhelp.WaitForAny(reqCtx, c.ConnTimeout,
			c.Mach.When1(ssC.WorkerReady, nil),
			c.WorkerRpc.Mach.WhenErr(startCtx),
		)
		if err != nil {
			c.Mach.Remove1(ssC.WorkerRequested, nil)
		}
	}()
}
// Start adds the Start state to the client's machine, passing nodesList as
// the NodesList argument. StartEnter only accepts a non-empty list.
func (c *Client) Start(nodesList []string) {
	args := &A{NodesList: nodesList}

	c.Mach.Add1(ssC.Start, Pass(args))
}
// Stop stops the Supervisor and Worker RPC clients, if they exist, passing ctx
// and false to their Stop methods, and then removes the Start state from the
// client's machine.
func (c *Client) Stop(ctx context.Context) {
	stopRpc := func(r *rpc.Client) {
		if r != nil {
			r.Stop(ctx, false)
		}
	}

	stopRpc(c.SuperRpc)
	stopRpc(c.WorkerRpc)

	c.Mach.Remove1(ssC.Start, nil)
}
// ReqWorker requests a worker and blocks until it is connected. It runs an
// add-request for the WorkerRequested state and then waits, up to ConnTimeout,
// for WorkerReady. The first error from either step is returned; nil means the
// worker is ready.
func (c *Client) ReqWorker(ctx context.Context) error {
	req := amhelp.NewReqAdd1(c.Mach, ssC.WorkerRequested, nil)
	if _, err := req.Run(ctx); err != nil {
		return err
	}

	ready := c.Mach.When1(ssC.WorkerReady, nil)
	if err := amhelp.WaitForAll(ctx, c.ConnTimeout, ready); err != nil {
		return err
	}

	c.log("worker connected: %s", c.WorkerRpc.Worker.Id())

	return nil
}
// Dispose stops the client (see Stop), drops the references to both RPC
// clients and disposes the client's machine.
func (c *Client) Dispose(ctx context.Context) {
	c.Stop(ctx)

	c.SuperRpc, c.WorkerRpc = nil, nil

	c.Mach.Dispose()
}
// log forwards msg and args to the machine's logger, but only when LogEnabled
// is set; otherwise it does nothing.
func (c *Client) log(msg string, args ...any) {
	if c.LogEnabled {
		c.Mach.Log(msg, args...)
	}
}
// ClientStateDeps bundles the schemas and state names NewClient needs. All
// four fields are required; NewClient rejects a nil value for any of them.
type ClientStateDeps struct {
// ClientSStruct is the schema NewClient passes to am.NewCommon for the
// client's own machine.
ClientSStruct am .Schema
// ClientSNames are the state names passed to am.NewCommon together with
// ClientSStruct.
ClientSNames am .S
// WorkerSStruct is the schema passed to rpc.NewClient when connecting to a
// worker.
WorkerSStruct am .Schema
// WorkerSNames are the worker's state names; NewClient checks that they
// implement states.WorkerStates, and they are passed to rpc.NewClient when
// connecting to a worker.
WorkerSNames am .S
}
// ClientOpts are the optional settings of NewClient; a nil *ClientOpts is
// treated like an empty one.
type ClientOpts struct {
// Parent is passed to am.NewCommon when the client's machine is created.
Parent am .Api
// NOTE(review): Tags is not read anywhere in the visible code - NewClient
// always creates the machine with the single fixed tag "node-client".
Tags []string
}
// GetWorkerClientId returns the ID used for a client's Worker RPC client:
// the passed name prefixed with "nc-worker-".
func GetWorkerClientId(name string) string {
	const prefix = "nc-worker-"

	return prefix + name
}
// GetSuperClientId returns the ID used for a client's Supervisor RPC client:
// the passed name prefixed with "nc-super-".
func GetSuperClientId(name string) string {
	const prefix = "nc-super-"

	return prefix + name
}
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PRs and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news about Golds.