package node
import (
"context"
"errors"
"fmt"
"time"
"github.com/pancsta/asyncmachine-go/internal/utils"
amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
am "github.com/pancsta/asyncmachine-go/pkg/machine"
"github.com/pancsta/asyncmachine-go/pkg/node/states"
"github.com/pancsta/asyncmachine-go/pkg/rpc"
ssrpc "github.com/pancsta/asyncmachine-go/pkg/rpc/states"
ampipe "github.com/pancsta/asyncmachine-go/pkg/states/pipes"
)
var (
ssW = states .WorkerStates
sgW = states .WorkerGroups
)
type Worker struct {
*am .ExceptionHandler
Mach *am .Machine
Name string
Kind string
AcceptClient string
ConnTimeout time .Duration
DeliveryTimeout time .Duration
BootAddr string
BootRpc *rpc .Client
LocalAddr string
LocalRpc *rpc .Server
PublicAddr string
PublicRpc *rpc .Server
}
func NewWorker (ctx context .Context , kind string , workerStruct am .Schema ,
stateNames am .S , opts *WorkerOpts ,
) (*Worker , error ) {
if kind == "" {
return nil , errors .New ("worker: kind required" )
}
if stateNames == nil {
return nil , errors .New ("worker: stateNames required" )
}
if workerStruct == nil {
return nil , errors .New ("worker: workerStruct required" )
}
if opts == nil {
opts = &WorkerOpts {}
}
name := fmt .Sprintf ("%s-%s-%s" , kind , utils .Hostname (), utils .RandId (6 ))
w := &Worker {
ConnTimeout : 5 * time .Second ,
DeliveryTimeout : 5 * time .Second ,
Name : name ,
Kind : kind ,
}
if amhelp .IsDebug () {
w .DeliveryTimeout = 10 * w .DeliveryTimeout
}
mach , err := am .NewCommon (ctx , "nw-" +w .Name , workerStruct , stateNames , w ,
opts .Parent , &am .Opts {Tags : []string {"node-worker" }})
if err != nil {
return nil , err
}
mach .SemLogger ().SetArgsMapper (LogArgs )
w .Mach = mach
amhelp .MachDebugEnv (mach )
err = amhelp .Implements (mach .StateNames (), ssW .Names ())
if err != nil {
err := fmt .Errorf (
"client has to implement am/node/states/WorkerStates: %w" , err )
return nil , err
}
return w , nil
}
func (w *Worker ) ErrNetworkState (e *am .Event ) {
}
func (w *Worker ) StartEnter (e *am .Event ) bool {
a := ParseArgs (e .Args )
return a .LocalAddr != ""
}
func (w *Worker ) StartState (e *am .Event ) {
var err error
ctx := w .Mach .NewStateCtx (ssW .Start )
args := ParseArgs (e .Args )
w .BootAddr = args .LocalAddr
opts := &rpc .ServerOpts {
PayloadState : ssW .SuperSendPayload ,
Parent : w .Mach ,
}
w .LocalRpc , err = rpc .NewServer (ctx , "localhost:0" , "nw-loc-" +w .Name , w .Mach ,
opts )
if err != nil {
_ = AddErrRpc (w .Mach , err , nil )
return
}
w .LocalRpc .DeliveryTimeout = w .DeliveryTimeout
err = errors .Join (
rpc .BindServer (w .LocalRpc .Mach , w .Mach , ssW .LocalRpcReady ,
ssW .SuperConnected ),
ampipe .BindErr (w .LocalRpc .Mach , w .Mach , ssW .ErrSupervisor ))
if err != nil {
w .Mach .AddErr (err , nil )
return
}
opts = &rpc .ServerOpts {
PayloadState : ssW .ClientSendPayload ,
Parent : w .Mach ,
}
w .PublicRpc , err = rpc .NewServer (ctx , "0.0.0.0:0" , "nw-pub-" +w .Name ,
w .Mach , opts )
if err != nil {
_ = AddErrRpc (w .Mach , err , nil )
return
}
w .PublicRpc .DeliveryTimeout = w .DeliveryTimeout
err = errors .Join (
rpc .BindServer (w .PublicRpc .Mach , w .Mach , ssW .PublicRpcReady ,
ssW .ClientConnected ),
ampipe .BindErr (w .PublicRpc .Mach , w .Mach , ssW .ErrClient ))
if err != nil {
w .Mach .AddErr (err , nil )
return
}
if w .LocalRpc .Start () != am .Executed {
_ = AddErrRpc (w .Mach , nil , nil )
return
}
if w .PublicRpc .Start () != am .Executed {
_ = AddErrRpc (w .Mach , nil , nil )
return
}
}
func (w *Worker ) StartEnd (e *am .Event ) {
args := ParseArgs (e .Args )
if w .PublicRpc != nil {
w .PublicRpc .Stop (true )
}
if w .LocalRpc != nil {
w .LocalRpc .Stop (true )
}
if w .BootRpc != nil {
go w .BootRpc .Stop (context .TODO (), true )
}
if args .Dispose {
w .Mach .Dispose ()
}
}
func (w *Worker ) LocalRpcReadyState (e *am .Event ) {
w .LocalAddr = w .LocalRpc .Addr
}
func (w *Worker ) PublicRpcReadyState (e *am .Event ) {
w .PublicAddr = w .PublicRpc .Addr
}
func (w *Worker ) RpcReadyState (e *am .Event ) {
var err error
ctx := w .Mach .NewStateCtx (ssW .RpcReady )
w .Mach .EvAdd1 (e , ssW .Ready , nil )
opts := &rpc .ClientOpts {Parent : w .Mach }
w .BootRpc , err = rpc .NewClient (ctx , w .BootAddr , "nw-" +w .Name ,
states .BootstrapSchema , states .BootstrapStates .Names (), opts )
if err != nil {
_ = AddErrRpc (w .Mach , err , nil )
return
}
err = ampipe .BindErr (w .BootRpc .Mach , w .Mach , "" )
if err != nil {
_ = AddErrRpc (w .Mach , err , nil )
return
}
w .BootRpc .Start ()
go func () {
err := amhelp .WaitForAll (ctx , w .ConnTimeout ,
w .BootRpc .Mach .When1 (ssrpc .ClientStates .Ready , nil ))
if ctx .Err () != nil {
return
}
if err != nil {
_ = AddErrRpc (w .Mach , err , nil )
return
}
w .BootRpc .Worker .EvAdd1 (e , ssB .WorkerAddr , PassRpc (&A {
LocalAddr : w .LocalAddr ,
PublicAddr : w .PublicAddr ,
Id : w .Mach .Id (),
}))
w .Mach .Log ("Passed the local port to the bootstrap machine" )
go w .BootRpc .Stop (w .Mach .Ctx (), true )
}()
}
func (w *Worker ) HealthcheckState (e *am .Event ) {
w .Mach .Remove1 (ssW .Healthcheck , nil )
}
func (w *Worker ) ServeClientEnter (e *am .Event ) bool {
a := ParseArgs (e .Args )
return a != nil && a .Id != ""
}
func (w *Worker ) ServeClientState (e *am .Event ) {
w .Mach .Remove1 (ssW .ServeClient , nil )
args := ParseArgs (e .Args )
w .AcceptClient = args .Id
w .PublicRpc .AllowId = w .AcceptClient
}
func (w *Worker ) SendPayloadEnter (e *am .Event ) bool {
return false
}
func (w *Worker ) Start (bootAddr string ) am .Result {
return w .Mach .Add1 (ssW .Start , Pass (&A {LocalAddr : bootAddr }))
}
func (w *Worker ) Stop (dispose bool ) {
w .Mach .Remove1 (ssW .Start , Pass (&A {Dispose : dispose }))
}
type WorkerOpts struct {
Parent am .Api
Tags []string
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .