package node
import (
"bufio"
"context"
"errors"
"fmt"
"maps"
"net"
"os"
"os/exec"
"slices"
"strconv"
"time"
"golang.org/x/sync/errgroup"
"github.com/pancsta/asyncmachine-go/internal/utils"
amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
am "github.com/pancsta/asyncmachine-go/pkg/machine"
"github.com/pancsta/asyncmachine-go/pkg/node/states"
arpc "github.com/pancsta/asyncmachine-go/pkg/rpc"
ssrpc "github.com/pancsta/asyncmachine-go/pkg/rpc/states"
ampipe "github.com/pancsta/asyncmachine-go/pkg/states/pipes"
)
// Package-level shorthands for the state definitions used throughout this
// file.
var (
// ssS aliases states.SupervisorStates (states of the supervisor machine).
ssS = states .SupervisorStates
// ssB aliases states.BootstrapStates (states of the bootstrap machine).
ssB = states .BootstrapStates
)
// Supervisor manages a pool of worker processes: it forks them, connects to
// them over RPC, health-checks them, and hands them out to clients. Its
// behavior is driven by the state machine in Mach, with the methods named
// <State>Enter / <State>State / <State>End acting as handlers.
type Supervisor struct {
*am .ExceptionHandler
// Mach is the supervisor's state machine, created in NewSupervisor.
Mach *am .Machine
// WorkerKind is used in the supervisor name and in the "kind:" machine tag.
WorkerKind string
// WorkerBin is the worker command: element 0 is the binary, the rest are
// arguments passed before the appended "-a <bootstrap addr>".
WorkerBin []string
// Name is the generated supervisor name (kind, hostname, time, instance).
Name string
// LogEnabled gates the log method; set from EnvAmNodeLogSupervisor.
LogEnabled bool
// Max is the upper limit of tracked workers; forking is refused at Max.
// Default 10.
Max int
// Min is the number of ready workers required for PoolReady (capped by Max,
// see min). Default 2.
Min int
// Warm is the number of extra workers forked on top of Min while
// normalizing the pool. Default 5.
Warm int
// MaxClientWorkers defaults to 10; SetPool falls back to Max when passed 0.
// NOTE(review): not read in this file - presumably a per-client limit.
MaxClientWorkers int
// WorkerErrTtl defaults to 10 minutes.
// NOTE(review): not read in this file - presumably the TTL of worker errors.
WorkerErrTtl time .Duration
// WorkerErrRecent defaults to 1 minute.
// NOTE(review): not read in this file - presumably the "recent" error window.
WorkerErrRecent time .Duration
// WorkerErrKill is the error count above which a worker gets killed.
// Default 3.
WorkerErrKill int
// ConnTimeout bounds waits for RPC servers / clients to become ready, and
// for a heartbeat or a pool check round. Default 5s.
ConnTimeout time .Duration
// DeliveryTimeout is copied to the RPC servers' DeliveryTimeout. Default 5s,
// multiplied by 10 in debug mode.
DeliveryTimeout time .Duration
// OpTimeout bounds the Workers listing. Default 1s, multiplied by 10 in
// debug mode.
OpTimeout time .Duration
// PoolPause is the pause between pool normalization rounds. Default 5s.
PoolPause time .Duration
// HealthcheckPause is the pause between healthcheck attempts of a single
// worker. Default 500ms.
HealthcheckPause time .Duration
// Heartbeat is the interval of the Heartbeat state ticker. Default 1m.
Heartbeat time .Duration
// WorkerCheckInterval is the polling interval for ready workers while
// normalizing the pool. Default 1s.
WorkerCheckInterval time .Duration
// PublicAddr is the address of the public RPC mux, set in StartState.
PublicAddr string
// PublicMux is the public RPC multiplexer, created in StartState.
PublicMux *arpc .Mux
// PublicRpcs holds per-client RPC servers, keyed by the remote address.
PublicRpcs map [string ]*arpc .Server
// LocalAddr is the address of the local RPC server, set in StartState.
LocalAddr string
// LocalRpc is the local RPC server, created in StartState.
LocalRpc *arpc .Server
// workers tracks workers, keyed by the bootstrap address first, and by the
// worker's local address once forked (see WorkerForkedState).
workers map [string ]*workerInfo
// schemaWorker is the worker schema passed to worker RPC clients.
schemaWorker am .Schema
// TestFork, when set, replaces starting a real worker process.
TestFork func (string ) error
// TestKill, when set, replaces killing a real worker process.
TestKill func (string ) error
// normalizeStart is set when NormalizingPool starts and zeroed when the
// normalization goroutine completes.
normalizeStart time .Time
// The handlers below are assigned from amhelp.RemoveMulti in NewSupervisor,
// one per listed state.
WorkerReadyState am .HandlerFinal
WorkerGoneState am .HandlerFinal
KillWorkerState am .HandlerFinal
HealthcheckState am .HandlerFinal
}
// NewSupervisor builds a Supervisor for workers of the given kind, started
// from workerBin (binary followed by optional arguments) and described by
// workerSchema.
//
// It returns an error when workerBin is empty, workerSchema is nil, the worker
// schema does not implement states.WorkerStates, the state machine cannot be
// created, or the created machine does not implement the supervisor states.
// A nil opts is treated as an empty SupervisorOpts.
func NewSupervisor(
	ctx context.Context, workerKind string, workerBin []string,
	workerSchema am.Schema, opts *SupervisorOpts,
) (*Supervisor, error) {
	// validate the required params
	switch {
	case len(workerBin) == 0 || workerBin[0] == "":
		return nil, errors.New("super: workerBin required")
	case workerSchema == nil:
		return nil, errors.New("super: workerSchema required")
	}
	if opts == nil {
		opts = &SupervisorOpts{}
	}

	implErr := amhelp.SchemaImplements(workerSchema, states.WorkerStates.Names())
	if implErr != nil {
		return nil, fmt.Errorf(
			"worker has to implement am/node/states/WorkerStates: %w", implErr,
		)
	}

	// the name includes a hostname shortened to 15 chars
	host := utils.Hostname()
	if len(host) > 15 {
		host = host[:15]
	}
	stamp := time.Now().Format("150405")
	name := fmt.Sprintf("%s-%s-%s-%d", workerKind, host, stamp,
		opts.InstanceNum)

	// defaults
	s := &Supervisor{
		WorkerKind:          workerKind,
		WorkerBin:           workerBin,
		Name:                name,
		LogEnabled:          os.Getenv(EnvAmNodeLogSupervisor) != "",
		Max:                 10,
		Min:                 2,
		Warm:                5,
		MaxClientWorkers:    10,
		ConnTimeout:         5 * time.Second,
		DeliveryTimeout:     5 * time.Second,
		OpTimeout:           time.Second,
		Heartbeat:           1 * time.Minute,
		WorkerCheckInterval: 1 * time.Second,
		PoolPause:           5 * time.Second,
		HealthcheckPause:    500 * time.Millisecond,
		WorkerErrTtl:        10 * time.Minute,
		WorkerErrRecent:     1 * time.Minute,
		WorkerErrKill:       3,
		PublicRpcs:          make(map[string]*arpc.Server),
		schemaWorker:        workerSchema,
		workers:             map[string]*workerInfo{},
	}

	// longer timeouts while debugging
	if amhelp.IsDebug() {
		s.DeliveryTimeout *= 10
		s.OpTimeout *= 10
	}

	tags := []string{
		"node-supervisor", "kind:" + workerKind,
		"instance:" + strconv.Itoa(opts.InstanceNum),
		"host:" + utils.Hostname(),
	}
	mach, err := am.NewCommon(ctx, "ns-"+s.Name, states.SupervisorSchema,
		ssS.Names(), s, opts.Parent, &am.Opts{Tags: tags})
	if err != nil {
		return nil, err
	}
	mach.SemLogger().SetArgsMapper(amhelp.LogArgsMapper)
	s.Mach = mach
	_ = amhelp.MachDebugEnv(mach)
	mach.AddBreakpoint(am.S{ssS.ErrWorker}, nil, false)

	// handlers for the multi states
	s.WorkerReadyState = amhelp.RemoveMulti(mach, ssS.WorkerReady)
	s.WorkerGoneState = amhelp.RemoveMulti(mach, ssS.WorkerGone)
	s.KillWorkerState = amhelp.RemoveMulti(mach, ssS.KillWorker)
	s.HealthcheckState = amhelp.RemoveMulti(mach, ssS.Healthcheck)

	if err := amhelp.Implements(mach.StateNames(), ssS.Names()); err != nil {
		return nil, fmt.Errorf(
			"client has to implement am/node/states/SupervisorStates: %w", err,
		)
	}

	return s, nil
}
var _ = ssS.ErrWorker

// ErrWorkerState handles a worker error: it records the error on the matching
// worker (unless the error is ErrWorkerKill or the worker is unknown),
// requests killing the worker once its error count exceeds WorkerErrKill,
// disposes a passed bootstrap, and finally requests removal of PoolReady.
func (s *Supervisor) ErrWorkerState(e *am.Event) {
	if !s.Mach.WillBe1(ssS.Exception) {
		s.Mach.Remove(am.S{ssS.ErrWorker, ssS.Exception}, nil)
	}

	workerErr := am.ParseArgs[am.AException](e.Args).Err
	args := am.ParseArgs[A](e.Args)
	info := s.workers[args.LocalAddr]

	if !errors.Is(workerErr, ErrWorkerKill) && info != nil {
		// remember the error in both error stores
		errAll := info.errs.Add(utils.RandId(0), workerErr, 0)
		errRecent := info.errsRecent.Add(utils.RandId(0), workerErr, 0)
		if joined := errors.Join(errAll, errRecent); joined != nil {
			s.Mach.Log("failed to add error to worker %s: %v", args.LocalAddr,
				joined)
		}

		// too many errors, kill the worker
		if info.errs.ItemCount() > s.WorkerErrKill {
			s.Mach.Add1(ssS.KillingWorker, Pass(&A{
				LocalAddr: args.LocalAddr,
			}))
		}
	}

	if boot := args.Bootstrap; boot != nil {
		boot.Dispose()
	}

	s.Mach.Remove1(ssS.PoolReady, nil)
}
var _ = ssS.ClientConnected

// ClientConnectedState immediately requests removal of the ClientConnected
// state.
func (s *Supervisor) ClientConnectedState(e *am.Event) {
	s.Mach.Remove1(ssS.ClientConnected, nil)
}
var _ = ssS.ClientDisconnected

// ClientDisconnectedEnter accepts the transition only when the RPC args carry
// a non-empty client address.
func (s *Supervisor) ClientDisconnectedEnter(e *am.Event) bool {
	args := am.ParseArgs[arpc.A](e.Args)
	if args == nil {
		return false
	}

	return args.Addr != ""
}
// ClientDisconnectedState stops and forgets the public RPC server which was
// serving the disconnected client (looked up by the client's address). An
// unknown address is only logged.
func (s *Supervisor) ClientDisconnectedState(e *am.Event) {
	s.Mach.Remove1(ssS.ClientDisconnected, nil)

	addr := am.ParseArgs[arpc.A](e.Args).Addr
	srv, ok := s.PublicRpcs[addr]
	if !ok {
		s.log("client %s disconnected, but not found", addr)
		// nothing to stop, and srv is nil here
		return
	}

	srv.Stop(e, true)
	delete(s.PublicRpcs, addr)
}
var _ = ssS.Start

// StartEnter accepts the transition only when both the public and the local
// addresses have been passed.
func (s *Supervisor) StartEnter(e *am.Event) bool {
	args := am.ParseArgs[A](e.Args)
	if args == nil {
		return false
	}

	return args.PublicAddr != "" && args.LocalAddr != ""
}
// StartState creates and starts the public RPC mux and the local RPC server,
// then (in a goroutine bound to the Start state context) waits for both to
// become ready, waits for PoolReady, and keeps adding the Heartbeat state on
// every Heartbeat interval. Failures are reported via AddErrRpc.
func (s *Supervisor) StartState(e *am.Event) {
	var err error
	ctx := s.Mach.NewStateCtx(ssS.Start)
	args := am.ParseArgs[A](e.Args)
	s.LocalAddr = args.LocalAddr
	s.PublicAddr = args.PublicAddr

	// public mux, creating a server per client connection
	muxOpts := &arpc.MuxOpts{
		Parent:      s.Mach,
		NewServerFn: s.newClientConn,
	}
	s.PublicMux, err = arpc.NewMux(ctx, s.PublicAddr, "ns-pub-"+s.Name, nil,
		muxOpts)
	if err != nil {
		AddErrRpc(e, s.Mach, err, nil)
		return
	}

	// local server
	srvOpts := &arpc.ServerOpts{
		Parent: s.Mach,
	}
	s.LocalRpc, err = arpc.NewServer(ctx, s.LocalAddr, "ns-loc-"+s.Name, s.Mach,
		srvOpts)
	if err != nil {
		AddErrRpc(e, s.Mach, err, nil)
		return
	}
	s.LocalRpc.DeliveryTimeout = s.DeliveryTimeout
	_, err = arpc.BindServerMulti(s.LocalRpc.Mach, s.Mach, ssS.LocalRpcReady,
		ssS.SuperConnected, ssS.SuperDisconnected)
	if err != nil {
		AddErrRpc(e, s.Mach, err, nil)
		return
	}

	s.PublicMux.Start(e)
	s.LocalRpc.Start(e)

	go func() {
		// wait for both RPC endpoints
		waitErr := amhelp.WaitForAll(ctx, s.ConnTimeout,
			s.PublicMux.Mach.When1(ssrpc.MuxStates.Ready, nil),
			s.LocalRpc.Mach.When1(ssrpc.ServerStates.RpcReady, nil))
		if ctx.Err() != nil {
			return
		}
		if waitErr != nil {
			joined := errors.Join(waitErr, s.PublicMux.Mach.Err(),
				s.LocalRpc.Mach.Err())
			AddErrRpc(e, s.Mach, joined, nil)
			return
		}

		// wait for the pool
		select {
		case <-ctx.Done():
			return
		case <-s.Mach.When1(ssS.PoolReady, nil):
		}

		// heartbeat loop
		ticker := time.NewTicker(s.Heartbeat)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				s.Mach.Add1(ssS.Heartbeat, nil)
			}
		}
	}()
}
// StartEnd stops the public mux and the local RPC server, when they exist.
func (s *Supervisor) StartEnd(e *am.Event) {
	if mux := s.PublicMux; mux != nil {
		mux.Stop(e, true)
	}
	if srv := s.LocalRpc; srv != nil {
		srv.Stop(e, true)
	}
}
var _ = ssS.ForkWorker

// ForkWorkerEnter rejects forking once the number of tracked workers reaches
// Max.
func (s *Supervisor) ForkWorkerEnter(e *am.Event) bool {
	tracked := len(s.workers)

	return tracked < s.Max
}
// ForkWorkerState creates and starts a bootstrap machine for a new worker,
// waits (in a goroutine) for the bootstrap RPC server to become ready, and
// then adds ForkingWorker with the bootstrap passed in args. Failures are
// reported via AddErrWorker.
func (s *Supervisor) ForkWorkerState(e *am.Event) {
	s.Mach.Remove1(ssS.ForkWorker, nil)
	ctx := s.Mach.NewStateCtx(ssS.Start)

	boot, err := newBootstrap(ctx, s)
	if err != nil {
		// pass the source event, like every other error report in this file
		AddErrWorker(e, s.Mach, err, nil)
		return
	}
	argsOut := &A{Bootstrap: boot}

	res := boot.Mach.Add1(ssB.Start, nil)
	if res != am.Executed || boot.Mach.IsErr() {
		AddErrWorker(e, s.Mach, ErrWorkerConn, Pass(argsOut))
		return
	}

	go func() {
		// wait for the bootstrap RPC server
		err := amhelp.WaitForAll(ctx, s.ConnTimeout,
			boot.server.Mach.When1(ssrpc.ServerStates.RpcReady, nil))
		if ctx.Err() != nil {
			return
		}
		if err != nil {
			AddErrWorker(e, s.Mach, err, Pass(argsOut))
			return
		}

		// next
		s.Mach.EvAdd1(e, ssS.ForkingWorker, Pass(argsOut))
	}()
}
var _ = ssS.ForkingWorker

// ForkingWorkerEnter accepts the transition only when a bootstrap with a
// non-empty address has been passed and the pool is still below Max. Missing
// args reject the transition (same guard as the other Enter handlers).
func (s *Supervisor) ForkingWorkerEnter(e *am.Event) bool {
	a := am.ParseArgs[A](e.Args)

	return a != nil && a.Bootstrap != nil && a.Bootstrap.Addr() != "" &&
		len(s.workers) < s.Max
}
// ForkingWorkerState starts a worker process for the passed bootstrap and
// registers it (via SetWorker) under the bootstrap address.
//
// When TestFork is set, it is called instead of starting a process and the
// worker is registered without a process. Otherwise WorkerBin is executed
// with "-a <bootstrap addr>" appended to its arguments; stderr is collected
// and logged if the process exits with an error. Failures are reported via
// AddErrWorker.
func (s *Supervisor) ForkingWorkerState(e *am.Event) {
	s.Mach.Remove1(ssS.ForkingWorker, nil)
	ctx := s.Mach.NewStateCtx(ssS.Start)
	args := am.ParseArgs[A](e.Args)
	boot := args.Bootstrap
	bootAddr := boot.Addr()
	argsOut := &A{Bootstrap: boot}

	// test fork, if provided
	if s.TestFork != nil {
		go func() {
			if ctx.Err() != nil {
				return
			}
			if err := s.TestFork(bootAddr); err != nil {
				AddErrWorker(e, s.Mach, err, Pass(argsOut))
				return
			}
			s.Mach.Add1(ssS.SetWorker, Pass(&A{
				WorkerAddr: bootAddr,
				WorkerInfo: newWorkerInfo(s, nil),
			}))
		}()

		return
	}

	// prepare the command
	var cmdArgs []string
	if len(s.WorkerBin) > 1 {
		cmdArgs = s.WorkerBin[1:]
	}
	cmdArgs = slices.Concat(cmdArgs, []string{"-a", bootAddr})
	s.log("forking worker %s %s", s.WorkerBin[0], cmdArgs)
	cmd := exec.CommandContext(ctx, s.WorkerBin[0], cmdArgs...)
	cmd.Env = os.Environ()

	// read errors
	stderr, err := cmd.StderrPipe()
	if err != nil {
		AddErrWorker(e, s.Mach, err, Pass(argsOut))
		return
	}
	scanner := bufio.NewScanner(stderr)

	// fork the worker
	err = cmd.Start()
	if err != nil {
		AddErrWorker(e, s.Mach, err, Pass(argsOut))
		return
	}

	// register the worker only after a successful start: cmd.Process is nil
	// before Start, so registering earlier would store a nil process
	s.Mach.Add1(ssS.SetWorker, Pass(&A{
		WorkerAddr: bootAddr,
		WorkerInfo: newWorkerInfo(s, cmd.Process),
	}))

	// monitor the fork
	go func() {
		var out string
		for scanner.Scan() {
			out += scanner.Text() + "\n"
		}

		// stderr has been drained, collect the exit status
		err := cmd.Wait()
		if err != nil {
			if out != "" {
				s.log("fork error: %s", out)
			}
			AddErrWorker(e, s.Mach, err, Pass(argsOut))
			return
		}
	}()
}
var _ = ssS.WorkerConnected

// WorkerConnectedEnter accepts the transition only when the worker's local
// address has been passed.
func (s *Supervisor) WorkerConnectedEnter(e *am.Event) bool {
	args := am.ParseArgs[A](e.Args)
	if args == nil {
		return false
	}

	return args.LocalAddr != ""
}
// WorkerConnectedState connects back to a worker: in a goroutine it creates
// an RPC client for the worker's local address, starts it, waits up to
// ConnTimeout for the client to become ready, and then adds WorkerForked with
// the client passed in args. Failures are reported via AddErrWorker.
func (s *Supervisor) WorkerConnectedState(e *am.Event) {
	s.Mach.Remove1(ssS.WorkerConnected, nil)
	ctx := s.Mach.NewStateCtx(ssS.Start)
	args := am.ParseArgs[A](e.Args)
	// copy the args, as they get amended with the RPC client
	argsOut := *args

	go func() {
		if ctx.Err() != nil {
			return
		}

		// the port becomes a part of the client's name
		workerAddr := args.LocalAddr
		_, port, err := net.SplitHostPort(workerAddr)
		if err != nil {
			AddErrWorker(e, s.Mach, err, Pass(&argsOut))
			return
		}

		wrpc, err := arpc.NewClient(ctx, workerAddr, s.Name+"-"+port,
			s.schemaWorker, &arpc.ClientOpts{Parent: s.Mach})
		if err != nil {
			AddErrWorker(e, s.Mach, err, Pass(&argsOut))
			return
		}
		wrpc.Start(e)

		err = amhelp.WaitForErrAny(ctx, s.ConnTimeout, wrpc.Mach,
			wrpc.Mach.When1(ssC.Ready, ctx))
		if ctx.Err() != nil {
			return
		}
		if err != nil {
			// keep the wait error (e.g. a timeout), as the client machine may
			// have no error of its own
			AddErrWorker(e, s.Mach, errors.Join(err, wrpc.Mach.Err()),
				Pass(&argsOut))
			return
		}

		// next
		argsOut.WorkerRpc = wrpc
		s.Mach.Add1(ssS.WorkerForked, Pass(&argsOut))
	}()
}
var _ = ssS.WorkerForked

// WorkerForkedEnter accepts the transition only when both the worker's local
// address and its RPC client have been passed.
func (s *Supervisor) WorkerForkedEnter(e *am.Event) bool {
	args := am.ParseArgs[A](e.Args)
	if args == nil {
		return false
	}

	return args.LocalAddr != "" && args.WorkerRpc != nil
}
// WorkerForkedState finalizes a forked worker: the worker info registered
// under the bootstrap address is re-keyed to the worker's local address and
// filled with the RPC client, the client's readiness is piped into
// WorkerReady, and its exceptions / loss of readiness are forwarded to the
// supervisor. Finally a healthcheck is requested and PoolReady re-checked.
func (s *Supervisor) WorkerForkedState(e *am.Event) {
	s.Mach.Remove1(ssS.WorkerForked, nil)

	args := am.ParseArgs[A](e.Args)
	localAddr, bootAddr := args.LocalAddr, args.BootAddr
	client := args.WorkerRpc
	argsOut := &A{
		LocalAddr: localAddr,
		Id:        client.Mach.Id(),
	}

	info, found := s.workers[bootAddr]
	if !found {
		AddErrWorker(e, s.Mach, ErrWorkerMissing, Pass(argsOut))
		return
	}

	// switch the key from the bootstrap addr to the local addr
	delete(s.workers, bootAddr)
	info.rpc = client
	info.w = client.NetMach
	info.publicAddr = args.PublicAddr
	info.localAddr = localAddr
	s.workers[localAddr] = info

	// bind to the worker's RPC client
	_, errPipe := ampipe.BindReady(client.Mach, s.Mach, ssS.WorkerReady, "")
	handlers := &struct {
		ExceptionState am.HandlerFinal
		ReadyEnd       am.HandlerFinal
	}{
		ExceptionState: func(ev *am.Event) {
			AddErrWorker(ev, s.Mach, client.Mach.Err(), Pass(argsOut))
		},
		ReadyEnd: func(ev *am.Event) {
			s.Mach.EvAdd1(ev, ssS.KillWorker, Pass(argsOut))
		},
	}
	_, errBind := client.Mach.HandlersBind(handlers)
	if err := errors.Join(errPipe, errBind); err != nil {
		AddErrWorker(e, s.Mach, err, Pass(argsOut))
		return
	}

	client.NetMach.Add1(ssW.Healthcheck, nil)
	s.Mach.Add1(ssS.PoolReady, nil)
}
var _ = ssS.KillingWorker

// KillingWorkerEnter accepts the transition only when the worker's local
// address has been passed.
func (s *Supervisor) KillingWorkerEnter(e *am.Event) bool {
	args := am.ParseArgs[A](e.Args)
	if args == nil {
		return false
	}

	return args.LocalAddr != ""
}
// KillingWorkerState kills the process of the worker with the passed local
// address and adds WorkerKilled on success. When TestKill is set, it is called
// instead and nothing else happens (its error is added to the machine).
func (s *Supervisor) KillingWorkerState(e *am.Event) {
	s.Mach.Remove1(ssS.KillingWorker, nil)
	addr := am.ParseArgs[A](e.Args).LocalAddr
	argsOut := &A{LocalAddr: addr}

	// test kill, if provided
	if s.TestKill != nil {
		if err := s.TestKill(addr); err != nil {
			s.Mach.AddErr(err, Pass(argsOut))
		}

		return
	}

	info, known := s.workers[addr]
	if !known {
		AddErrWorker(e, s.Mach, ErrWorkerMissing, Pass(argsOut))
		return
	}

	if err := info.proc.Kill(); err != nil {
		AddErrWorker(e, s.Mach, err, Pass(argsOut))
		return
	}

	s.Mach.Add1(ssS.WorkerKilled, Pass(argsOut))
}
var _ = ssS.WorkerKilled

// WorkerKilledEnter accepts the transition only when the worker's local
// address has been passed.
func (s *Supervisor) WorkerKilledEnter(e *am.Event) bool {
	args := am.ParseArgs[A](e.Args)
	if args == nil {
		return false
	}

	return args.LocalAddr != ""
}
// WorkerKilledState forgets the killed worker and requests removal of
// PoolReady.
func (s *Supervisor) WorkerKilledState(e *am.Event) {
	s.Mach.Remove1(ssS.WorkerKilled, nil)

	addr := am.ParseArgs[A](e.Args).LocalAddr
	delete(s.workers, addr)

	s.Mach.Remove1(ssS.PoolReady, nil)
}
var _ = ssS.PoolReady

// PoolReadyEnter allows PoolReady only when the number of ready workers
// reaches the effective minimum.
func (s *Supervisor) PoolReadyEnter(e *am.Event) bool {
	ready := len(s.readyWorkers())

	return ready >= s.min()
}
// PoolReadyExit allows leaving PoolReady only when the number of ready
// workers is below the effective minimum.
func (s *Supervisor) PoolReadyExit(e *am.Event) bool {
	ready := len(s.readyWorkers())

	return ready < s.min()
}
var _ = ssS.Heartbeat

// HeartbeatState health-checks all the ready workers in parallel. Each worker
// gets up to 3 Healthcheck requests, HealthcheckPause apart, and passes once
// the tick of its Healthcheck state increases. Afterwards PoolReady is
// re-checked and WorkersAvailable is set according to the idle workers. The
// Heartbeat state is removed when done.
func (s *Supervisor) HeartbeatState(e *am.Event) {
	ctx := s.Mach.NewStateCtx(ssS.Heartbeat)

	pool := s.readyWorkers()
	if pool == nil {
		s.Mach.Remove1(ssS.Heartbeat, nil)
		return
	}

	// probe performs the healthcheck of a single worker
	probe := func(info *workerInfo) error {
		before := info.w.Tick(ssW.Healthcheck)
		for attempt := 0; attempt < 3; attempt++ {
			info.w.Add1(ssW.Healthcheck, nil)
			if err := ctx.Err(); err != nil {
				return err
			}
			if before < info.w.Tick(ssW.Healthcheck) {
				return nil
			}
			_ = amhelp.Wait(ctx, s.HealthcheckPause)
		}

		// no tick change after all the attempts
		AddErrWorker(e, s.Mach, ErrWorkerHealth, Pass(&A{
			LocalAddr: info.localAddr,
			Id:        info.w.Id(),
		}))

		return ErrWorkerHealth
	}

	go func() {
		defer s.Mach.Remove1(ssS.Heartbeat, nil)

		eg, parCtx := errgroup.WithContext(ctx)
		for _, info := range pool {
			eg.Go(func() error {
				return probe(info)
			})
		}

		// the group's context ends when the group finishes (or fails)
		go eg.Wait()
		waitErr := amhelp.WaitForAll(ctx, s.ConnTimeout, parCtx.Done())
		if waitErr != nil {
			AddErrPool(e, s.Mach, fmt.Errorf("%w: %w", ErrHeartbeat, waitErr), nil)
			return
		}

		// re-check the pool in both directions
		s.Mach.Add1(ssS.PoolReady, nil)
		s.Mach.Remove1(ssS.PoolReady, nil)

		idle, err := s.Workers(ctx, StateIdle)
		if ctx.Err() != nil {
			return
		}
		if err != nil {
			AddErrWorker(e, s.Mach, err, nil)
			return
		}

		if len(idle) == 0 {
			s.Mach.Remove1(ssS.WorkersAvailable, nil)
		} else {
			s.Mach.Add1(ssS.WorkersAvailable, nil)
		}
	}()
}
var _ = ssS.NormalizingPool

// NormalizingPoolState brings the pool up to the required size. In up to 5
// rounds it requests forking of the missing workers (the effective minimum
// plus Warm, capped by Max), polls for enough ready workers every
// WorkerCheckInterval for up to ConnTimeout, and pauses for PoolPause before
// the next round. PoolNormalized is added when the goroutine ends, and a pool
// error is reported when the pool isn't ready after all the rounds.
func (s *Supervisor) NormalizingPoolState(e *am.Event) {
	ctx := s.Mach.NewStateCtx(ssS.NormalizingPool)
	s.normalizeStart = time.Now()

	go func() {
		if ctx.Err() != nil {
			return
		}
		defer s.Mach.Add1(ssS.PoolNormalized, nil)

		ready := false
		for i := 0; i < 5; i++ {
			// include the workers in all the states
			existing, err := s.Workers(ctx, "")
			if ctx.Err() != nil {
				return
			}
			if err != nil {
				continue
			}

			// fork the missing ones
			for ii := len(existing); ii < s.min()+s.Warm && ii < s.Max; ii++ {
				s.Mach.Add1(ssS.ForkWorker, nil)
			}

			// keep checking, returning false stops the interval
			check := func() bool {
				if ctx.Err() != nil {
					return false
				}
				readyWorkers, _ := s.Workers(ctx, StateReady)
				if ctx.Err() != nil {
					return false
				}
				if len(readyWorkers) >= s.min() {
					s.Mach.Add1(ssS.PoolReady, nil)
					return false
				}

				return true
			}
			_ = amhelp.Interval(ctx, s.ConnTimeout, s.WorkerCheckInterval, check)

			// refresh the flag before testing it, so a successful round ends
			// right away instead of waiting for the whole PoolPause
			ready = s.Mach.Is1(ssS.PoolReady)
			if ready {
				break
			}

			// not ready, wait a bit and check again
			if !amhelp.Wait(ctx, s.PoolPause) {
				return
			}
			ready = s.Mach.Is1(ssS.PoolReady)
			if ready {
				break
			}
			s.log("failed to normalize pool, round %d", i)
		}

		if !ready {
			AddErrPoolStr(e, s.Mach, "failed to normalize pool", nil)
		}
		s.normalizeStart = time.Time{}
	}()
}
var _ = ssS.ProvideWorker

// ProvideWorkerEnter accepts the transition only when both the worker RPC ID
// and the supervisor RPC ID have been passed. Missing args reject the
// transition (same guard as the other Enter handlers).
func (s *Supervisor) ProvideWorkerEnter(e *am.Event) bool {
	a := am.ParseArgs[A](e.Args)

	return a != nil && a.WorkerRpcId != "" && a.SuperRpcId != ""
}
// ProvideWorkerState offers the requesting client (identified by WorkerRpcId)
// to the idle workers, one by one, by adding ServeClient on each worker. The
// search stops at the first worker which accepts; rejections are logged and
// the next idle worker is tried.
func (s *Supervisor) ProvideWorkerState(e *am.Event) {
	s.Mach.Remove1(ssS.ProvideWorker, nil)
	args := am.ParseArgs[A](e.Args)
	ctx := s.Mach.NewStateCtx(ssS.Start)
	// snapshot the idle workers within the handler
	idle := s.idleWorkers()

	go func() {
		if ctx.Err() != nil {
			return
		}

		for _, info := range idle {
			ok := amhelp.Add1Sync(ctx, info.rpc.NetMach, ssW.ServeClient,
				Pass(&ARpc{
					Id: args.WorkerRpcId,
				}))
			if ctx.Err() != nil {
				return
			}

			// rejected, try the next one
			if !ok {
				s.log("worker %s rejected %s", info.rpc.NetMach.Id(), args.WorkerRpcId)
				continue
			}

			// accepted, done
			return
		}
	}()
}
var _ = ssS.ListWorkers

// ListWorkersEnter accepts the transition only when a buffered results
// channel has been passed, so the handler's send does not need a receiver to
// be waiting. Missing args reject the transition (same guard as the other
// Enter handlers).
func (s *Supervisor) ListWorkersEnter(e *am.Event) bool {
	a := am.ParseArgs[A](e.Args)
	if a == nil {
		return false
	}

	return a.WorkersCh != nil && cap(a.WorkersCh) > 0
}
// ListWorkersState sends the list of workers in the requested state to the
// passed channel. Any state other than ready, initing, busy, and idle yields
// all the tracked workers.
func (s *Supervisor) ListWorkersState(e *am.Event) {
	s.Mach.Remove1(ssS.ListWorkers, nil)
	args := am.ParseArgs[A](e.Args)

	var list []*workerInfo
	switch args.WorkerState {
	case StateReady:
		list = s.readyWorkers()
	case StateIniting:
		list = s.initingWorkers()
	case StateBusy:
		list = s.busyWorkers()
	case StateIdle:
		list = s.idleWorkers()
	default:
		list = slices.Collect(maps.Values(s.workers))
	}

	args.WorkersCh <- list
}
var _ = ssS.SetWorker

// SetWorkerEnter accepts the transition only when the worker address has been
// passed. Missing args reject the transition (same guard as the other Enter
// handlers).
func (s *Supervisor) SetWorkerEnter(e *am.Event) bool {
	a := am.ParseArgs[A](e.Args)

	return a != nil && a.WorkerAddr != ""
}
// SetWorkerState stores the passed worker info under the passed address, or
// removes the entry when no worker info has been passed.
func (s *Supervisor) SetWorkerState(e *am.Event) {
	s.Mach.Remove1(ssS.SetWorker, nil)
	args := am.ParseArgs[A](e.Args)
	key := args.WorkerAddr

	if args.WorkerInfo == nil {
		delete(s.workers, key)
		return
	}

	s.workers[key] = args.WorkerInfo
}
// Start adds the Start state, passing publicAddr as the public address and
// "localhost:0" as the local address.
func (s *Supervisor) Start(publicAddr string) {
	args := &A{
		LocalAddr:  "localhost:0",
		PublicAddr: publicAddr,
	}
	s.Mach.Add1(ssS.Start, Pass(args))
}
// Stop removes the Start state and disposes the supervisor's machine.
func (s *Supervisor) Stop() {
	mach := s.Mach
	mach.Remove1(ssS.Start, nil)
	mach.Dispose()
}
// SetPool sets the pool limits and triggers a pool check. A min greater than
// max is lowered to max, and a zero maxPerClient falls back to max.
func (s *Supervisor) SetPool(min, max, warm, maxPerClient int) {
	if min > max {
		min = max
	}
	s.Min, s.Max, s.Warm = min, max, warm

	if maxPerClient == 0 {
		maxPerClient = s.Max
	}
	s.MaxClientWorkers = maxPerClient

	s.CheckPool()
}
// CheckPool requests pool normalization, tries to add PoolReady, and reports
// whether PoolReady is active afterwards.
func (s *Supervisor) CheckPool() bool {
	for _, state := range []string{ssS.NormalizingPool, ssS.PoolReady} {
		s.Mach.Add1(state, nil)
	}

	return s.Mach.Is1(ssS.PoolReady)
}
// Workers returns the workers in the given state by adding ListWorkers and
// awaiting the result on a buffered channel.
//
// It returns an error wrapping am.ErrCanceled when the mutation gets canceled,
// and one wrapping am.ErrTimeout when no result arrives within OpTimeout. When
// ctx ends first, both results are nil - callers should check ctx.Err().
func (s *Supervisor) Workers(
	ctx context.Context, state WorkerState,
) ([]*workerInfo, error) {
	resultCh := make(chan []*workerInfo, 1)
	args := &A{
		WorkersCh:   resultCh,
		WorkerState: state,
	}
	if s.Mach.Add1(ssS.ListWorkers, Pass(args)) == am.Canceled {
		return nil, fmt.Errorf("listing workers: %w", am.ErrCanceled)
	}

	select {
	case list := <-resultCh:
		return list, nil
	case <-time.After(s.OpTimeout):
		return nil, fmt.Errorf("listing workers: %w", am.ErrTimeout)
	case <-ctx.Done():
		return nil, nil
	}
}
// Dispose disposes the supervisor's state machine.
func (s *Supervisor) Dispose() {
	mach := s.Mach
	mach.Dispose()
}
// initingWorkers returns the tracked workers without an RPC client yet. The
// result is nil when there are none.
func (s *Supervisor) initingWorkers() []*workerInfo {
	var initing []*workerInfo
	for _, w := range s.workers {
		if w.rpc != nil {
			continue
		}
		initing = append(initing, w)
	}

	return initing
}
// rpcWorkers returns the tracked workers which have an RPC client with a
// network machine. The result is nil when there are none.
func (s *Supervisor) rpcWorkers() []*workerInfo {
	var connected []*workerInfo
	for _, w := range s.workers {
		if w.rpc == nil || w.rpc.NetMach == nil {
			continue
		}
		connected = append(connected, w)
	}

	return connected
}
// idleWorkers returns the RPC-connected workers without errors which are in
// the Idle state. The result is nil when there are none.
func (s *Supervisor) idleWorkers() []*workerInfo {
	var idle []*workerInfo
	for _, w := range s.rpcWorkers() {
		mach := w.rpc.NetMach
		if w.hasErrs() || !mach.Is1(ssW.Idle) {
			continue
		}
		idle = append(idle, w)
	}

	return idle
}
// busyWorkers returns the RPC-connected workers without errors which have any
// of the work status states active, but not Idle. The result is nil when
// there are none.
func (s *Supervisor) busyWorkers() []*workerInfo {
	var busy []*workerInfo
	for _, w := range s.rpcWorkers() {
		mach := w.rpc.NetMach
		// rpcWorkers guarantees a non-nil RPC client
		if w.hasErrs() {
			continue
		}
		if !mach.Any1(sgW.WorkStatus...) || mach.Is1(ssW.Idle) {
			continue
		}
		busy = append(busy, w)
	}

	return busy
}
// readyWorkers returns the RPC-connected workers without errors which are in
// the Ready state. The result is nil when there are none.
func (s *Supervisor) readyWorkers() []*workerInfo {
	var ready []*workerInfo
	for _, w := range s.rpcWorkers() {
		mach := w.rpc.NetMach
		// rpcWorkers guarantees a non-nil RPC client
		if w.hasErrs() || !mach.Is1(ssW.Ready) {
			continue
		}
		ready = append(ready, w)
	}

	return ready
}
// log writes to the machine's log, but only when LogEnabled is set.
func (s *Supervisor) log(msg string, args ...any) {
	if s.LogEnabled {
		s.Mach.Log(msg, args...)
	}
}
// min returns the effective minimum pool size, which is Min capped by Max.
func (s *Supervisor) min() int {
	if s.Min <= s.Max {
		return s.Min
	}

	return s.Max
}
// newClientConn creates an RPC server for a new client connection of the
// public mux (it is passed to the mux as NewServerFn). The server is bound to
// the supervisor's PublicRpcReady, ClientConnected, and ClientDisconnected
// states, and stored in PublicRpcs under the connection's remote address.
// Returns am.ErrHandlerTimeout when storing the server fails.
func (s *Supervisor) newClientConn(
	mux *arpc.Mux, id string, conn net.Conn,
) (*arpc.Server, error) {
	s.log("new client connection %s", id)
	ctx := s.Mach.NewStateCtx(ssS.Start)

	srvName := fmt.Sprintf("ns-pub-%s-%s", id, s.Name)
	srv, err := arpc.NewServer(ctx, s.PublicAddr, srvName, s.Mach,
		&arpc.ServerOpts{Parent: mux.Mach})
	if err != nil {
		return nil, err
	}
	srv.DeliveryTimeout = s.DeliveryTimeout

	if _, err := arpc.BindServerMulti(srv.Mach, s.Mach, ssS.PublicRpcReady,
		ssS.ClientConnected, ssS.ClientDisconnected); err != nil {
		return nil, err
	}

	// store the server via the machine's Eval
	stored := s.Mach.Eval("newClientConn", func() {
		s.PublicRpcs[conn.RemoteAddr().String()] = srv
	}, ctx)
	if !stored {
		return nil, am.ErrHandlerTimeout
	}

	s.log("new client connection %s ready", id)

	return srv, nil
}
The pages are generated with Golds v0.8.4. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.