package node
import (
"bufio"
"context"
"errors"
"fmt"
"maps"
"net"
"os"
"os/exec"
"slices"
"strconv"
"time"
"golang.org/x/sync/errgroup"
"github.com/pancsta/asyncmachine-go/internal/utils"
amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
am "github.com/pancsta/asyncmachine-go/pkg/machine"
"github.com/pancsta/asyncmachine-go/pkg/node/states"
"github.com/pancsta/asyncmachine-go/pkg/rpc"
ssrpc "github.com/pancsta/asyncmachine-go/pkg/rpc/states"
ampipe "github.com/pancsta/asyncmachine-go/pkg/states/pipes"
)
// Shorthand aliases for state-name sets declared in the node/states package.
var (
// ssS aliases states.SupervisorStates, the state names of the supervisor
// machine used by the handlers in this file.
ssS = states .SupervisorStates
// ssB aliases states.BootstrapStates, the state names of the per-worker
// bootstrap machine.
ssB = states .BootstrapStates
)
// Supervisor manages a pool of worker processes of a single kind. It forks
// workers from WorkerBin, tracks them in an internal registry, exposes a
// public (muxed) and a local RPC endpoint, and drives everything through the
// state machine in Mach. Create instances with NewSupervisor.
type Supervisor struct {
// Embedded exception handler from the machine package.
*am .ExceptionHandler
// Mach is the supervisor's state machine, created by NewSupervisor.
Mach *am .Machine
// WorkerKind is the kind label of the workers; it is part of Name and of
// the machine's "kind:" tag.
WorkerKind string
// WorkerBin is the worker command line: element 0 is the executable, the
// remaining elements are arguments passed before the bootstrap address.
WorkerBin []string
// Name is generated by NewSupervisor from the worker kind, the (truncated)
// hostname, the current time and the instance number.
Name string
// LogEnabled gates the supervisor's own log output; NewSupervisor enables
// it when the EnvAmNodeLogSupervisor environment variable is non-empty.
LogEnabled bool
// Max is the upper bound on registered workers (default 10).
Max int
// Min is the number of ready workers required for PoolReady (default 2).
// It is capped at Max when evaluated.
Min int
// Warm is the number of extra workers forked on top of Min while
// normalizing the pool (default 5).
Warm int
// MaxClientWorkers defaults to 10; SetPool falls back to Max when given 0.
// NOTE(review): not read anywhere in this file - confirm where it is
// enforced.
MaxClientWorkers int
// WorkerErrTtl defaults to 10 minutes.
// NOTE(review): presumably the lifetime of a worker's recorded errors -
// not read in this file, confirm against newWorkerInfo.
WorkerErrTtl time .Duration
// WorkerErrRecent defaults to 1 minute.
// NOTE(review): presumably the window for "recent" worker errors - not
// read in this file, confirm against newWorkerInfo.
WorkerErrRecent time .Duration
// WorkerErrKill is the number of recorded errors above which a worker is
// scheduled for killing (default 3).
WorkerErrKill int
// ConnTimeout bounds waits for RPC endpoints and workers to become ready
// (default 5s).
ConnTimeout time .Duration
// DeliveryTimeout is copied to the RPC servers created by the supervisor
// (default 5s, multiplied by 10 in debug mode).
DeliveryTimeout time .Duration
// OpTimeout bounds short operations such as listing workers (default 1s,
// multiplied by 10 in debug mode).
OpTimeout time .Duration
// PoolPause is the pause between pool normalization rounds (default 5s).
PoolPause time .Duration
// HealthcheckPause is the pause between healthcheck attempts for a single
// worker (default 500ms).
HealthcheckPause time .Duration
// Heartbeat is the interval at which the Heartbeat state is requested
// (default 1 minute).
Heartbeat time .Duration
// WorkerCheckInterval is the polling interval used while waiting for
// workers to become ready during normalization (default 1s).
WorkerCheckInterval time .Duration
// PublicAddr is the address of the public RPC mux, set when Start is
// activated.
PublicAddr string
// PublicMux is the public RPC multiplexer accepting client connections.
PublicMux *rpc .Mux
// PublicRpcs holds one RPC server per connected client, keyed by the
// client's remote address.
PublicRpcs map [string ]*rpc .Server
// LocalAddr is the address of the local RPC server, set when Start is
// activated.
LocalAddr string
// LocalRpc is the local RPC server.
LocalRpc *rpc .Server
// workers is the worker registry. An entry is keyed by the bootstrap
// address while the worker is being forked and re-keyed to the worker's
// local address once its RPC client is connected.
workers map [string ]*workerInfo
// schemaWorker is the state schema used for RPC clients of workers.
schemaWorker am .Schema
// TestFork, when set, replaces process forking; it receives the bootstrap
// address.
TestFork func (string ) error
// TestKill, when set, replaces process killing; it receives the worker's
// local address.
TestKill func (string ) error
// normalizeStart is set when pool normalization starts and zeroed when it
// completes.
normalizeStart time .Time
// The handlers below are assigned amhelp.RemoveMulti(...) for their
// respective states in NewSupervisor.
WorkerReadyState am .HandlerFinal
WorkerGoneState am .HandlerFinal
KillWorkerState am .HandlerFinal
ClientSendPayloadState am .HandlerFinal
SuperSendPayloadState am .HandlerFinal
HealthcheckState am .HandlerFinal
}
// NewSupervisor creates a supervisor for workers of the given kind.
//
// workerBin is the worker command line (executable followed by arguments) and
// must have a non-empty first element. workerSchema is the workers' state
// schema and has to implement states.WorkerStates. opts may be nil.
//
// The returned supervisor has default pool sizes and timeouts set and its
// state machine created, but is not started; call Start to do that.
func NewSupervisor(
	ctx context.Context, workerKind string, workerBin []string,
	workerSchema am.Schema, opts *SupervisorOpts,
) (*Supervisor, error) {
	// validate params
	if len(workerBin) == 0 || workerBin[0] == "" {
		return nil, errors.New("super: workerBin required")
	}
	if workerSchema == nil {
		return nil, errors.New("super: workerSchema required")
	}
	if opts == nil {
		opts = &SupervisorOpts{}
	}

	err := amhelp.SchemaImplements(workerSchema, states.WorkerStates.Names())
	if err != nil {
		return nil, fmt.Errorf(
			"worker has to implement am/node/states/WorkerStates: %w", err)
	}

	// the name embeds a hostname shortened to 15 bytes
	hostname := utils.Hostname()
	if len(hostname) > 15 {
		hostname = hostname[:15]
	}
	name := fmt.Sprintf("%s-%s-%s-%d", workerKind, hostname,
		time.Now().Format("150405"), opts.InstanceNum)

	s := &Supervisor{
		WorkerKind: workerKind,
		WorkerBin:  workerBin,
		Name:       name,
		LogEnabled: os.Getenv(EnvAmNodeLogSupervisor) != "",

		// defaults
		Max:                 10,
		Min:                 2,
		Warm:                5,
		MaxClientWorkers:    10,
		ConnTimeout:         5 * time.Second,
		DeliveryTimeout:     5 * time.Second,
		OpTimeout:           time.Second,
		Heartbeat:           1 * time.Minute,
		WorkerCheckInterval: 1 * time.Second,
		PoolPause:           5 * time.Second,
		HealthcheckPause:    500 * time.Millisecond,
		WorkerErrTtl:        10 * time.Minute,
		WorkerErrRecent:     1 * time.Minute,
		WorkerErrKill:       3,

		// rpc
		PublicRpcs: make(map[string]*rpc.Server),

		// internals
		schemaWorker: workerSchema,
		workers:      map[string]*workerInfo{},
	}

	// relax timeouts while debugging
	if amhelp.IsDebug() {
		s.DeliveryTimeout = 10 * s.DeliveryTimeout
		s.OpTimeout = 10 * s.OpTimeout
	}

	mach, err := am.NewCommon(ctx, "ns-"+s.Name, states.SupervisorSchema,
		ssS.Names(), s, opts.Parent, &am.Opts{Tags: []string{
			"node-supervisor", "kind:" + workerKind,
			"instance:" + strconv.Itoa(opts.InstanceNum),
			"host:" + utils.Hostname(),
		}})
	if err != nil {
		return nil, err
	}

	mach.SemLogger().SetArgsMapper(LogArgs)
	s.Mach = mach
	// debugging setup is best-effort, the result is intentionally ignored
	_ = amhelp.MachDebugEnv(mach)
	mach.AddBreakpoint(am.S{ssS.ErrWorker}, nil, false)

	// handlers provided by amhelp.RemoveMulti for multi states
	s.WorkerReadyState = amhelp.RemoveMulti(mach, ssS.WorkerReady)
	s.WorkerGoneState = amhelp.RemoveMulti(mach, ssS.WorkerGone)
	s.KillWorkerState = amhelp.RemoveMulti(mach, ssS.KillWorker)
	s.ClientSendPayloadState = amhelp.RemoveMulti(mach, ssS.ClientSendPayload)
	s.SuperSendPayloadState = amhelp.RemoveMulti(mach, ssS.SuperSendPayload)
	s.HealthcheckState = amhelp.RemoveMulti(mach, ssS.Healthcheck)

	// sanity check of the supervisor's own machine
	err = amhelp.Implements(mach.StateNames(), ssS.Names())
	if err != nil {
		// do not leak the machine when construction fails
		mach.Dispose()
		return nil, fmt.Errorf(
			"supervisor has to implement am/node/states/SupervisorStates: %w", err)
	}

	return s, nil
}
// ErrWorkerState handles a worker error. Unless an Exception is about to be
// activated, it clears ErrWorker and Exception. Errors other than
// ErrWorkerKill are recorded on the matching worker (looked up by the local
// address from the event args), and the worker is scheduled for killing once
// its error count exceeds WorkerErrKill. A bootstrap machine passed in the
// args is disposed, and PoolReady is requested to be removed.
func (s *Supervisor) ErrWorkerState(e *am.Event) {
	mach := s.Mach
	if !mach.WillBe1(ssS.Exception) {
		mach.Remove(am.S{ssS.ErrWorker, ssS.Exception}, nil)
	}

	workerErr := am.ParseArgs(e.Args).Err
	a := ParseArgs(e.Args)
	info := s.workers[a.LocalAddr]

	if info != nil && !errors.Is(workerErr, ErrWorkerKill) {
		// keep the error in both histories
		addErr := errors.Join(
			info.errs.Add(utils.RandId(0), workerErr, 0),
			info.errsRecent.Add(utils.RandId(0), workerErr, 0),
		)
		if addErr != nil {
			mach.Log("failed to add error to worker %s: %v", a.LocalAddr, addErr)
		}

		// too many errors, get rid of the worker
		if info.errs.ItemCount() > s.WorkerErrKill {
			mach.Add1(ssS.KillingWorker, Pass(&A{LocalAddr: a.LocalAddr}))
		}
	}

	if boot := a.Bootstrap; boot != nil {
		boot.Dispose()
	}

	mach.Remove1(ssS.PoolReady, nil)
}
// StartEnter accepts the Start state only when both a public and a local
// address were passed in the event args.
func (s *Supervisor) StartEnter(e *am.Event) bool {
	args := ParseArgs(e.Args)
	if args == nil {
		return false
	}

	return args.PublicAddr != "" && args.LocalAddr != ""
}
// ClientConnectedState immediately requests removal of ClientConnected, so
// the state can be activated again for the next client.
func (s *Supervisor) ClientConnectedState(e *am.Event) {
	mach := s.Mach
	mach.Remove1(ssS.ClientConnected, nil)
}
// ClientDisconnectedEnter accepts ClientDisconnected only when the RPC args
// carry the address of the client.
func (s *Supervisor) ClientDisconnectedEnter(e *am.Event) bool {
	args := rpc.ParseArgs(e.Args)
	if args == nil {
		return false
	}

	return args.Addr != ""
}
// ClientDisconnectedState stops and forgets the public RPC server dedicated
// to the disconnected client (identified by the address in the RPC args).
// The state removes itself right away. An unknown client is only logged.
func (s *Supervisor) ClientDisconnectedState(e *am.Event) {
	s.Mach.Remove1(ssS.ClientDisconnected, nil)

	addr := rpc.ParseArgs(e.Args).Addr
	srv, ok := s.PublicRpcs[addr]
	if !ok {
		// nothing to stop; calling Stop on the missing (nil) server would
		// dereference a nil pointer
		s.log("client %s disconnected, but not found", addr)
		return
	}

	srv.Stop(true)
	delete(s.PublicRpcs, addr)
}
// StartState starts the supervisor's networking: the public RPC mux on the
// public address and the local RPC server on the local address (both taken
// from the event args). A background goroutine waits for both endpoints to
// become ready, then for PoolReady, and finally requests the Heartbeat state
// on every tick of the Heartbeat interval until Start is deactivated.
// Failures are reported via AddErrRpc.
func (s *Supervisor) StartState(e *am.Event) {
	var err error
	ctx := s.Mach.NewStateCtx(ssS.Start)
	args := ParseArgs(e.Args)
	s.LocalAddr = args.LocalAddr
	s.PublicAddr = args.PublicAddr

	// public rpc (muxed)
	s.PublicMux, err = rpc.NewMux(ctx, "ns-pub-"+s.Name, s.newClientConn,
		&rpc.MuxOpts{Parent: s.Mach})
	if err != nil {
		_ = AddErrRpc(s.Mach, err, nil)
		return
	}
	// only touch the mux once it is known to be valid
	s.PublicMux.Addr = s.PublicAddr

	// local rpc
	opts := &rpc.ServerOpts{
		Parent:       s.Mach,
		PayloadState: ssS.SuperSendPayload,
	}
	s.LocalRpc, err = rpc.NewServer(ctx, s.LocalAddr, "ns-loc-"+s.Name, s.Mach,
		opts)
	if err != nil {
		_ = AddErrRpc(s.Mach, err, nil)
		return
	}
	s.LocalRpc.DeliveryTimeout = s.DeliveryTimeout
	err = rpc.BindServerMulti(s.LocalRpc.Mach, s.Mach, ssS.LocalRpcReady,
		ssS.SuperConnected, ssS.SuperDisconnected)
	if err != nil {
		_ = AddErrRpc(s.Mach, err, nil)
		return
	}

	// start
	s.PublicMux.Start()
	s.LocalRpc.Start()

	// unblock
	go func() {
		// wait for both endpoints
		err := amhelp.WaitForAll(ctx, s.ConnTimeout,
			s.PublicMux.Mach.When1(ssrpc.MuxStates.Ready, nil),
			s.LocalRpc.Mach.When1(ssrpc.ServerStates.RpcReady, nil))
		if ctx.Err() != nil {
			return // expired
		}
		if err != nil {
			err := errors.Join(err, s.PublicMux.Mach.Err(), s.LocalRpc.Mach.Err())
			_ = AddErrRpc(s.Mach, err, nil)
			return
		}

		// wait for the pool
		select {
		case <-ctx.Done():
			return // expired
		case <-s.Mach.When1(ssS.PoolReady, nil):
		}

		// heartbeat loop
		t := time.NewTicker(s.Heartbeat)
		defer t.Stop()
		for {
			select {
			case <-ctx.Done():
				return // expired
			case <-t.C:
				s.Mach.Add1(ssS.Heartbeat, nil)
			}
		}
	}()
}
// StartEnd stops the public mux and the local RPC server, if they were
// created.
func (s *Supervisor) StartEnd(e *am.Event) {
	if mux := s.PublicMux; mux != nil {
		mux.Stop(true)
	}
	if srv := s.LocalRpc; srv != nil {
		srv.Stop(true)
	}
}
// ForkWorkerEnter rejects ForkWorker once the registry holds Max workers.
func (s *Supervisor) ForkWorkerEnter(e *am.Event) bool {
	registered := len(s.workers)
	return registered < s.Max
}
// ForkWorkerState begins forking a worker: it creates and starts a bootstrap
// machine for it, waits in the background (up to ConnTimeout) for the
// bootstrap's RPC server to become ready, and then requests ForkingWorker
// with the bootstrap passed in the args. The state removes itself right
// away; failures are reported via AddErrWorker.
func (s *Supervisor) ForkWorkerState(e *am.Event) {
	s.Mach.Remove1(ssS.ForkWorker, nil)
	ctx := s.Mach.NewStateCtx(ssS.Start)

	// init the bootstrap machine
	boot, err := newBootstrap(ctx, s)
	if err != nil {
		_ = AddErrWorker(nil, s.Mach, err, nil)
		return
	}
	bootArgs := &A{Bootstrap: boot}

	started := boot.Mach.Add1(ssB.Start, nil) == am.Executed
	if !started || boot.Mach.IsErr() {
		_ = AddErrWorker(e, s.Mach, ErrWorkerConn, Pass(bootArgs))
		return
	}

	// unblock
	go func() {
		rpcReady := boot.server.Mach.When1(ssrpc.ServerStates.RpcReady, nil)
		waitErr := amhelp.WaitForAll(ctx, s.ConnTimeout, rpcReady)

		switch {
		case ctx.Err() != nil:
			// expired
			return
		case waitErr != nil:
			_ = AddErrWorker(e, s.Mach, waitErr, Pass(bootArgs))
			return
		}

		// next
		s.Mach.Add1(ssS.ForkingWorker, Pass(bootArgs))
	}()
}
// ForkingWorkerEnter requires a bootstrap machine with a known address in the
// args, and room in the registry for one more worker.
func (s *Supervisor) ForkingWorkerEnter(e *am.Event) bool {
	boot := ParseArgs(e.Args).Bootstrap
	if boot == nil || boot.Addr() == "" {
		return false
	}

	return len(s.workers) < s.Max
}
// ForkingWorkerState starts the worker process and registers it under the
// bootstrap address. The command is WorkerBin with "-a <bootstrap address>"
// appended, it inherits the supervisor's environment, and it is bound to the
// Start state's context. When TestFork is set, it is called instead of
// forking a process. The state removes itself right away; failures are
// reported via AddErrWorker with the bootstrap in the args.
func (s *Supervisor) ForkingWorkerState(e *am.Event) {
	s.Mach.Remove1(ssS.ForkingWorker, nil)
	ctx := s.Mach.NewStateCtx(ssS.Start)
	args := ParseArgs(e.Args)
	boot := args.Bootstrap
	bootAddr := boot.Addr()
	argsOut := &A{Bootstrap: boot}

	// test fork, if provided
	if s.TestFork != nil {
		// unblock
		go func() {
			if ctx.Err() != nil {
				return // expired
			}
			if err := s.TestFork(bootAddr); err != nil {
				_ = AddErrWorker(e, s.Mach, err, Pass(argsOut))
				return
			}
			// there is no process to track in test mode
			s.Mach.Add1(ssS.SetWorker, Pass(&A{
				WorkerAddr: bootAddr,
				WorkerInfo: newWorkerInfo(s, nil),
			}))
		}()

		return
	}

	// prepare the command
	var cmdArgs []string
	if len(s.WorkerBin) > 1 {
		cmdArgs = s.WorkerBin[1:]
	}
	cmdArgs = slices.Concat(cmdArgs, []string{"-a", bootAddr})
	s.log("forking worker %s %s", s.WorkerBin[0], cmdArgs)
	cmd := exec.CommandContext(ctx, s.WorkerBin[0], cmdArgs...)
	cmd.Env = os.Environ()

	// capture stderr for error reporting
	stderr, err := cmd.StderrPipe()
	if err != nil {
		_ = AddErrWorker(e, s.Mach, err, Pass(argsOut))
		return
	}
	scanner := bufio.NewScanner(stderr)

	// fork the worker
	err = cmd.Start()
	if err != nil {
		_ = AddErrWorker(e, s.Mach, err, Pass(argsOut))
		return
	}

	// Register the worker only after a successful start: cmd.Process is nil
	// until Start returns, and a worker which failed to start should not
	// occupy a slot in the registry.
	s.Mach.Add1(ssS.SetWorker, Pass(&A{
		WorkerAddr: bootAddr,
		WorkerInfo: newWorkerInfo(s, cmd.Process),
	}))

	// monitor the fork
	go func() {
		// drain stderr before Wait, which closes the pipe
		var out string
		for scanner.Scan() {
			out += scanner.Text() + "\n"
		}
		if scanErr := scanner.Err(); scanErr != nil {
			s.log("reading worker stderr: %v", scanErr)
		}

		err := cmd.Wait()
		if err != nil {
			if out != "" {
				s.log("fork error: %s", out)
			}
			_ = AddErrWorker(e, s.Mach, err, Pass(argsOut))
			return
		}
	}()
}
// WorkerConnectedEnter requires the worker's local address in the args.
func (s *Supervisor) WorkerConnectedEnter(e *am.Event) bool {
	args := ParseArgs(e.Args)
	if args == nil {
		return false
	}

	return args.LocalAddr != ""
}
// WorkerConnectedState connects back to a worker which reported its local
// address. In the background it creates an RPC client named after the
// supervisor and the worker's port, waits up to ConnTimeout for the client to
// become ready, and then requests WorkerForked with the client added to a
// copy of the incoming args. The state removes itself right away; failures
// are reported via AddErrWorker.
func (s *Supervisor) WorkerConnectedState(e *am.Event) {
	s.Mach.Remove1(ssS.WorkerConnected, nil)
	ctx := s.Mach.NewStateCtx(ssS.Start)
	args := ParseArgs(e.Args)
	// copy the args, as they are amended and passed on
	argsOut := *args

	// unblock
	go func() {
		if ctx.Err() != nil {
			return // expired
		}

		// bind to the worker's local address
		workerAddr := args.LocalAddr
		_, port, err := net.SplitHostPort(workerAddr)
		if err != nil {
			_ = AddErrWorker(e, s.Mach, err, Pass(&argsOut))
			return
		}
		wrpc, err := rpc.NewClient(ctx, workerAddr, s.Name+"-"+port,
			s.schemaWorker, &rpc.ClientOpts{Parent: s.Mach})
		if err != nil {
			_ = AddErrWorker(e, s.Mach, err, Pass(&argsOut))
			return
		}

		// wait for the client to be ready
		wrpc.Start()
		err = amhelp.WaitForErrAny(ctx, s.ConnTimeout, wrpc.Mach,
			wrpc.Mach.When1(ssC.Ready, ctx))
		if ctx.Err() != nil {
			return // expired
		}
		if err != nil {
			// The client's machine may hold no error at all (e.g. on a
			// timeout), so always include the wait error itself.
			err = errors.Join(err, wrpc.Mach.Err())
			_ = AddErrWorker(e, s.Mach, err, Pass(&argsOut))
			return
		}

		// next
		argsOut.WorkerRpc = wrpc
		s.Mach.Add1(ssS.WorkerForked, Pass(&argsOut))
	}()
}
// WorkerForkedEnter requires the worker's local address and a connected RPC
// client in the args.
func (s *Supervisor) WorkerForkedEnter(e *am.Event) bool {
	args := ParseArgs(e.Args)
	if args == nil || args.LocalAddr == "" {
		return false
	}

	return args.WorkerRpc != nil
}
// WorkerForkedState finalizes a forked worker. The registry entry created
// under the bootstrap address is re-keyed to the worker's local address and
// filled with the RPC client and addresses. The client's machine is then
// bound to the supervisor: readiness is piped into WorkerReady, exceptions
// are forwarded via AddErrWorker, and the end of Ready requests KillWorker.
// Finally a healthcheck is requested on the worker and PoolReady on the
// supervisor. The state removes itself right away.
func (s *Supervisor) WorkerForkedState(e *am.Event) {
	s.Mach.Remove1(ssS.WorkerForked, nil)

	a := ParseArgs(e.Args)
	client := a.WorkerRpc
	localAddr, bootAddr := a.LocalAddr, a.BootAddr
	out := &A{
		LocalAddr: localAddr,
		Id:        client.Mach.Id(),
	}

	info, found := s.workers[bootAddr]
	if !found {
		_ = AddErrWorker(e, s.Mach, ErrWorkerMissing, Pass(out))
		return
	}

	// switch the registry key from the boot address to the local one
	delete(s.workers, bootAddr)
	info.rpc = client
	info.w = client.NetMach
	info.publicAddr = a.PublicAddr
	info.localAddr = localAddr
	s.workers[localAddr] = info

	// bind the client's machine to the supervisor
	readyErr := ampipe.BindReady(client.Mach, s.Mach, ssS.WorkerReady, "")
	handlersErr := client.Mach.BindHandlers(&struct {
		ExceptionState am.HandlerFinal
		ReadyEnd       am.HandlerFinal
	}{
		ExceptionState: func(ev *am.Event) {
			_ = AddErrWorker(ev, s.Mach, client.Mach.Err(), Pass(out))
		},
		ReadyEnd: func(ev *am.Event) {
			s.Mach.EvAdd1(ev, ssS.KillWorker, Pass(out))
		},
	})
	if err := errors.Join(readyErr, handlersErr); err != nil {
		_ = AddErrWorker(e, s.Mach, err, Pass(out))
		return
	}

	// ping the worker and re-check the pool
	client.NetMach.Add1(ssW.Healthcheck, nil)
	s.Mach.Add1(ssS.PoolReady, nil)
}
// KillingWorkerEnter requires the worker's local address in the args.
func (s *Supervisor) KillingWorkerEnter(e *am.Event) bool {
	args := ParseArgs(e.Args)
	if args == nil {
		return false
	}

	return args.LocalAddr != ""
}
// KillingWorkerState kills the process of the worker identified by the local
// address in the args and then requests WorkerKilled. When TestKill is set,
// it is called instead and nothing else happens besides reporting its error.
// The state removes itself right away.
func (s *Supervisor) KillingWorkerState(e *am.Event) {
	s.Mach.Remove1(ssS.KillingWorker, nil)
	localAddr := ParseArgs(e.Args).LocalAddr
	out := &A{LocalAddr: localAddr}

	// test kill, if provided
	if s.TestKill != nil {
		if err := s.TestKill(localAddr); err != nil {
			s.Mach.AddErr(err, Pass(out))
		}
		return
	}

	info, found := s.workers[localAddr]
	if !found {
		_ = AddErrWorker(e, s.Mach, ErrWorkerMissing, Pass(out))
		return
	}

	if err := info.proc.Kill(); err != nil {
		_ = AddErrWorker(e, s.Mach, err, Pass(out))
		return
	}

	s.Mach.Add1(ssS.WorkerKilled, Pass(out))
}
// WorkerKilledEnter requires the worker's local address in the args.
func (s *Supervisor) WorkerKilledEnter(e *am.Event) bool {
	args := ParseArgs(e.Args)
	if args == nil {
		return false
	}

	return args.LocalAddr != ""
}
// WorkerKilledState drops the killed worker from the registry and requests
// removal of PoolReady. The state removes itself right away.
func (s *Supervisor) WorkerKilledState(e *am.Event) {
	s.Mach.Remove1(ssS.WorkerKilled, nil)

	localAddr := ParseArgs(e.Args).LocalAddr
	delete(s.workers, localAddr)

	s.Mach.Remove1(ssS.PoolReady, nil)
}
// PoolReadyEnter accepts PoolReady only when at least min() workers are
// ready.
func (s *Supervisor) PoolReadyEnter(e *am.Event) bool {
	ready := s.readyWorkers()
	return len(ready) >= s.min()
}
// PoolReadyExit lets PoolReady be removed only when fewer than min() workers
// are ready.
func (s *Supervisor) PoolReadyExit(e *am.Event) bool {
	ready := s.readyWorkers()
	return len(ready) < s.min()
}
// HeartbeatState health-checks all ready workers in parallel and then
// refreshes the PoolReady and WorkersAvailable states. The Heartbeat state is
// removed when the check finishes (or right away when no worker is ready).
func (s *Supervisor ) HeartbeatState (e *am .Event ) {
ctx := s .Mach .NewStateCtx (ssS .Heartbeat )
ready := s .readyWorkers ()
// readyWorkers returns a nil slice when no worker qualifies
if ready == nil {
s .Mach .Remove1 (ssS .Heartbeat , nil )
return
}
// unblock
go func () {
defer s .Mach .Remove1 (ssS .Heartbeat , nil )
// one healthcheck goroutine per ready worker
eg , parCtx := errgroup .WithContext (ctx )
for _ , info := range ready {
eg .Go (func () error {
ok := false
// remember the Healthcheck tick to detect a change later
tick := info .w .Tick (ssW .Healthcheck )
// up to 3 attempts, pausing HealthcheckPause between them
for i := 0 ; i < 3 ; i ++ {
info .w .Add1 (ssW .Healthcheck , nil )
if ctx .Err () != nil {
return ctx .Err ()
}
// a higher tick means the worker reacted
// NOTE(review): presumably Tick is the state's clock value - confirm
if tick < info .w .Tick (ssW .Healthcheck ) {
ok = true
break
}
_ = amhelp .Wait (ctx , s .HealthcheckPause )
}
// report an unresponsive worker, which also fails the group
if !ok {
return AddErrWorker (e , s .Mach , ErrWorkerHealth , Pass (&A {
LocalAddr : info .localAddr ,
Id : info .w .Id (),
}))
}
return nil
})
}
// parCtx is done once the group finishes or any member fails; wait for it
// with a timeout instead of blocking on eg.Wait directly
// NOTE(review): the error returned by eg.Wait is discarded here
go eg .Wait ()
err := amhelp .WaitForAll (ctx , s .ConnTimeout , parCtx .Done ())
if err != nil {
_ = AddErrPool (s .Mach , fmt .Errorf ("%w: %w" , ErrHeartbeat , err ), nil )
return
}
// request both adding and removing PoolReady; PoolReadyEnter and
// PoolReadyExit compare the number of ready workers against min()
// NOTE(review): presumably only the matching request is accepted - confirm
s .Mach .Add1 (ssS .PoolReady , nil )
s .Mach .Remove1 (ssS .PoolReady , nil )
// update WorkersAvailable according to the number of idle workers
idle , err := s .Workers (ctx , StateIdle )
if ctx .Err () != nil {
return
}
if err != nil {
_ = AddErrWorker (e , s .Mach , err , nil )
return
}
if len (idle ) > 0 {
s .Mach .Add1 (ssS .WorkersAvailable , nil )
} else {
s .Mach .Remove1 (ssS .WorkersAvailable , nil )
}
}()
}
// NormalizingPoolState brings the pool up to size in the background. It runs
// up to 5 rounds; each round requests ForkWorker for every worker missing to
// reach min()+Warm (capped at Max), polls every WorkerCheckInterval (for up
// to ConnTimeout) until min() workers are ready and then requests PoolReady,
// pauses for PoolPause, and stops when PoolReady is active. PoolNormalized is
// requested when the goroutine exits; a pool error is reported when no round
// succeeded.
func (s *Supervisor) NormalizingPoolState(e *am.Event) {
	ctx := s.Mach.NewStateCtx(ssS.NormalizingPool)
	s.normalizeStart = time.Now()

	// unblock
	go func() {
		if ctx.Err() != nil {
			return // expired
		}
		defer s.Mach.Add1(ssS.PoolNormalized, nil)

		ready := false
		// 5 rounds
		for i := 0; i < 5; i++ {
			// list all registered workers
			existing, err := s.Workers(ctx, "")
			if ctx.Err() != nil {
				return // expired
			}
			if err != nil {
				// listing failed, try again in the next round
				continue
			}

			// fork the missing workers
			for ii := len(existing); ii < s.min()+s.Warm && ii < s.Max; ii++ {
				s.Mach.Add1(ssS.ForkWorker, nil)
			}

			// poll until enough workers are ready; returning false stops the
			// interval
			check := func() bool {
				if ctx.Err() != nil {
					return false // expired
				}
				readyList, _ := s.Workers(ctx, StateReady)
				if ctx.Err() != nil {
					return false // expired
				}
				if len(readyList) >= s.min() {
					s.Mach.Add1(ssS.PoolReady, nil)
					return false
				}

				return true
			}
			_ = amhelp.Interval(ctx, s.ConnTimeout, s.WorkerCheckInterval, check)

			// NOTE: an early `if ready { break }` used to sit here, but it
			// could never fire (the closure above shadowed `ready`), so every
			// round always pauses before PoolReady is inspected.
			if !amhelp.Wait(ctx, s.PoolPause) {
				return // expired
			}
			ready = s.Mach.Is1(ssS.PoolReady)
			if ready {
				break
			}

			s.log("failed to normalize pool, round %d", i)
		}

		if !ready {
			_ = AddErrPoolStr(s.Mach, "failed to normalize pool", nil)
		}
		s.normalizeStart = time.Time{}
	}()
}
// ProvideWorkerEnter requires both the worker RPC ID and the supervisor RPC
// ID in the args.
func (s *Supervisor) ProvideWorkerEnter(e *am.Event) bool {
	args := ParseArgs(e.Args)
	if args.WorkerRpcId == "" {
		return false
	}

	return args.SuperRpcId != ""
}
// ProvideWorkerState assigns an idle worker to a client. In the background it
// asks idle workers, one by one, to activate ServeClient for the client's
// worker RPC ID; the public address of the first worker which accepts is sent
// to the requesting client as a "worker_addr" payload. The state removes
// itself right away.
func (s *Supervisor) ProvideWorkerState(e *am.Event) {
	s.Mach.Remove1(ssS.ProvideWorker, nil)
	args := ParseArgs(e.Args)
	ctx := s.Mach.NewStateCtx(ssS.Start)
	candidates := s.idleWorkers()

	// unblock
	go func() {
		if ctx.Err() != nil {
			return // expired
		}

		for _, info := range candidates {
			// offer the client to the worker
			res := amhelp.Add1Sync(ctx, info.rpc.NetMach, ssW.ServeClient,
				PassRpc(&A{Id: args.WorkerRpcId}))
			if ctx.Err() != nil {
				return // expired
			}

			if res == am.Executed {
				// accepted, deliver the worker's address to the client
				s.Mach.Add1(ssS.ClientSendPayload, rpc.PassRpc(&rpc.A{
					Name: "worker_addr",
					Payload: &rpc.MsgSrvPayload{
						Name:        ssS.ProvideWorker,
						Source:      s.Mach.Id(),
						Destination: args.SuperRpcId,
						Data:        info.publicAddr,
					},
				}))
				s.log("worker %s provided to %s", info.rpc.NetMach.Id(), args.SuperRpcId)

				// a single worker is enough
				return
			}

			// rejected, try the next one
			s.log("worker %s rejected %s", info.rpc.NetMach.Id(), args.WorkerRpcId)
		}
	}()
}
// ListWorkersEnter requires a buffered result channel in the args.
func (s *Supervisor) ListWorkersEnter(e *am.Event) bool {
	resultCh := ParseArgs(e.Args).WorkersCh
	if resultCh == nil {
		return false
	}

	return cap(resultCh) > 0
}
// ListWorkersState sends the list of workers in the requested state to the
// channel from the args. Any state other than ready, initing, busy or idle
// yields all registered workers. The state removes itself right away.
func (s *Supervisor) ListWorkersState(e *am.Event) {
	s.Mach.Remove1(ssS.ListWorkers, nil)
	args := ParseArgs(e.Args)

	var list []*workerInfo
	switch args.WorkerState {
	case StateReady:
		list = s.readyWorkers()
	case StateIniting:
		list = s.initingWorkers()
	case StateBusy:
		list = s.busyWorkers()
	case StateIdle:
		list = s.idleWorkers()
	default:
		list = slices.Collect(maps.Values(s.workers))
	}

	args.WorkersCh <- list
}
// SetWorkerEnter requires the worker's address in the args.
func (s *Supervisor) SetWorkerEnter(e *am.Event) bool {
	addr := ParseArgs(e.Args).WorkerAddr
	return addr != ""
}
// SetWorkerState updates the worker registry: it stores the passed worker
// info under the passed address, or deletes the entry when no info is given.
// The state removes itself right away.
func (s *Supervisor) SetWorkerState(e *am.Event) {
	s.Mach.Remove1(ssS.SetWorker, nil)
	args := ParseArgs(e.Args)
	key := args.WorkerAddr

	info := args.WorkerInfo
	if info == nil {
		delete(s.workers, key)
		return
	}

	s.workers[key] = info
}
// Start requests the Start state with the given public address; the local
// RPC address is always "localhost:0".
func (s *Supervisor) Start(publicAddr string) {
	startArgs := &A{
		LocalAddr:  "localhost:0",
		PublicAddr: publicAddr,
	}
	s.Mach.Add1(ssS.Start, Pass(startArgs))
}
// Stop requests removal of the Start state and then disposes the
// supervisor's machine.
func (s *Supervisor) Stop() {
	mach := s.Mach
	mach.Remove1(ssS.Start, nil)
	mach.Dispose()
}
// SetPool sets the pool sizes and triggers a pool check. min is lowered to
// max when it exceeds it, and a maxPerClient of 0 means max.
func (s *Supervisor) SetPool(min, max, warm, maxPerClient int) {
	if min > max {
		min = max
	}
	if maxPerClient == 0 {
		maxPerClient = max
	}

	s.Min, s.Max, s.Warm = min, max, warm
	s.MaxClientWorkers = maxPerClient

	s.CheckPool()
}
// CheckPool requests pool normalization and the PoolReady state, and reports
// whether PoolReady is active afterwards.
func (s *Supervisor) CheckPool() bool {
	mach := s.Mach
	mach.Add1(ssS.NormalizingPool, nil)
	mach.Add1(ssS.PoolReady, nil)

	return mach.Is1(ssS.PoolReady)
}
// Workers lists workers in the given state by requesting the ListWorkers
// state and waiting for its answer. It returns an error wrapping
// am.ErrCanceled when the request is canceled, an error wrapping
// am.ErrTimeout when no answer arrives within OpTimeout, and (nil, nil) when
// ctx is done first.
func (s *Supervisor) Workers(
	ctx context.Context, state WorkerState,
) ([]*workerInfo, error) {
	// buffered, so the handler never blocks on sending
	resultCh := make(chan []*workerInfo, 1)
	listArgs := Pass(&A{
		WorkersCh:   resultCh,
		WorkerState: state,
	})

	if s.Mach.Add1(ssS.ListWorkers, listArgs) == am.Canceled {
		return nil, fmt.Errorf("listing workers: %w", am.ErrCanceled)
	}

	select {
	case list := <-resultCh:
		return list, nil
	case <-time.After(s.OpTimeout):
		return nil, fmt.Errorf("listing workers: %w", am.ErrTimeout)
	case <-ctx.Done():
		return nil, nil
	}
}
// Dispose disposes the supervisor's state machine.
func (s *Supervisor) Dispose() {
	mach := s.Mach
	mach.Dispose()
}
// initingWorkers returns registered workers without an RPC client yet. The
// result is nil when there are none.
func (s *Supervisor) initingWorkers() []*workerInfo {
	var initing []*workerInfo
	for _, worker := range s.workers {
		if worker.rpc != nil {
			continue
		}
		initing = append(initing, worker)
	}

	return initing
}
// rpcWorkers returns registered workers which have an RPC client with a
// network machine. The result is nil when there are none.
func (s *Supervisor) rpcWorkers() []*workerInfo {
	var connected []*workerInfo
	for _, worker := range s.workers {
		if worker.rpc == nil || worker.rpc.NetMach == nil {
			continue
		}
		connected = append(connected, worker)
	}

	return connected
}
// idleWorkers returns RPC-connected workers without errors which are in the
// Idle state. The result is nil when there are none.
func (s *Supervisor) idleWorkers() []*workerInfo {
	var idle []*workerInfo
	for _, worker := range s.rpcWorkers() {
		netMach := worker.rpc.NetMach
		if worker.hasErrs() {
			continue
		}
		if netMach.Is1(ssW.Idle) {
			idle = append(idle, worker)
		}
	}

	return idle
}
// busyWorkers returns RPC-connected workers without errors which are in any
// of the work-status states, but not in Idle. The result is nil when there
// are none.
func (s *Supervisor) busyWorkers() []*workerInfo {
	var busy []*workerInfo
	for _, worker := range s.rpcWorkers() {
		// rpcWorkers only yields workers with a non-nil rpc client
		netMach := worker.rpc.NetMach
		if worker.hasErrs() {
			continue
		}
		if !netMach.Any1(sgW.WorkStatus...) {
			continue
		}
		if netMach.Is1(ssW.Idle) {
			continue
		}
		busy = append(busy, worker)
	}

	return busy
}
// readyWorkers returns RPC-connected workers without errors which are in the
// Ready state. The result is nil when there are none.
func (s *Supervisor) readyWorkers() []*workerInfo {
	var ready []*workerInfo
	for _, worker := range s.rpcWorkers() {
		// rpcWorkers only yields workers with a non-nil rpc client
		netMach := worker.rpc.NetMach
		if worker.hasErrs() {
			continue
		}
		if netMach.Is1(ssW.Ready) {
			ready = append(ready, worker)
		}
	}

	return ready
}
// log writes a formatted message to the machine's log, but only when
// LogEnabled is set.
func (s *Supervisor) log(msg string, args ...any) {
	if s.LogEnabled {
		s.Mach.Log(msg, args...)
	}
}
// min returns the effective minimum pool size, which is Min capped at Max.
func (s *Supervisor) min() int {
	effective := s.Min
	if effective > s.Max {
		effective = s.Max
	}

	return effective
}
// newClientConn creates a dedicated public RPC server for a new client
// connection. It is passed to rpc.NewMux in StartState. The server is bound
// to the supervisor's PublicRpcReady, ClientConnected and ClientDisconnected
// states and stored in PublicRpcs under the connection's remote address. It
// returns am.ErrHandlerTimeout when the registration could not be evaluated
// on the machine.
func (s *Supervisor) newClientConn(
	num int, conn net.Conn,
) (*rpc.Server, error) {
	s.log("new client connection %d", num)
	ctx := s.Mach.NewStateCtx(ssS.Start)

	// init the server
	srvName := fmt.Sprintf("ns-pub-%d-%s", num, s.Name)
	srv, err := rpc.NewServer(ctx, s.PublicAddr, srvName, s.Mach,
		&rpc.ServerOpts{
			Parent:       s.PublicMux.Mach,
			PayloadState: ssS.ClientSendPayload,
		})
	if err != nil {
		return nil, err
	}
	srv.DeliveryTimeout = s.DeliveryTimeout

	// bind to the supervisor's states
	if err := rpc.BindServerMulti(srv.Mach, s.Mach, ssS.PublicRpcReady,
		ssS.ClientConnected, ssS.ClientDisconnected); err != nil {
		return nil, err
	}

	// register the server via Eval on the supervisor's machine
	registered := s.Mach.Eval("newClientConn", func() {
		s.PublicRpcs[conn.RemoteAddr().String()] = srv
	}, ctx)
	if !registered {
		return nil, am.ErrHandlerTimeout
	}

	s.log("new client connection %d ready", num)

	return srv, nil
}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.