package node
import (
"context"
"fmt"
"os"
"time"
"github.com/patrickmn/go-cache"
"github.com/pancsta/asyncmachine-go/internal/utils"
amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
am "github.com/pancsta/asyncmachine-go/pkg/machine"
"github.com/pancsta/asyncmachine-go/pkg/node/states"
"github.com/pancsta/asyncmachine-go/pkg/rpc"
ampipe "github.com/pancsta/asyncmachine-go/pkg/states/pipes"
)
// SupervisorOpts groups optional settings for a supervisor.
// NOTE(review): the consumer of this struct is not visible in this part of
// the file; the field notes below are inferred from names and types only.
type SupervisorOpts struct {
// InstanceNum is an integer instance number (exact meaning TODO confirm).
InstanceNum int
// Parent is an optional parent state machine (presumably of the supervisor's
// machine; TODO confirm).
Parent am .Api
// Tags is a list of string tags (presumably applied to the supervisor's
// machine; TODO confirm).
Tags []string
}
// bootstrap is the handler struct of a short-lived "bootstrap" state machine
// created by newBootstrap. While its Start state is active it runs an RPC
// server and forwards a received worker address to the supervisor's machine.
type bootstrap struct {
// ExceptionHandler is embedded from the machine package (left nil by
// newBootstrap).
*am .ExceptionHandler
// Mach is the bootstrap's own state machine, set by newBootstrap.
Mach *am .Machine
// Super is the supervisor which created this bootstrap.
Super *Supervisor
// Name is the supervisor's name followed by a random ID from utils.RandId(6).
Name string
// LogEnabled gates the log method; true when the EnvAmNodeLogSupervisor
// environment variable is non-empty.
LogEnabled bool
// server is the RPC server created in StartState; nil before that.
server *rpc .Server
}
// newBootstrap builds a bootstrap for the given supervisor together with its
// state machine ("nb-" + Name, using states.BootstrapSchema, with super.Mach
// passed as the parent). The bootstrap itself is registered as the machine's
// handler struct.
//
// It returns the error from am.NewCommon unchanged when the machine cannot be
// created.
func newBootstrap(ctx context.Context, super *Supervisor) (*bootstrap, error) {
	boot := &bootstrap{Super: super}
	boot.Name = super.Name + utils.RandId(6)
	// logging is opt-in via the environment
	boot.LogEnabled = os.Getenv(EnvAmNodeLogSupervisor) != ""

	opts := &am.Opts{Tags: []string{"node-bootstrap"}}
	mach, err := am.NewCommon(ctx, "nb-"+boot.Name, states.BootstrapSchema,
		ssB.Names(), boot, super.Mach, opts)
	if err != nil {
		return nil, err
	}

	boot.Mach = mach
	amhelp.MachDebugEnv(mach)

	return boot, nil
}
// StartState is the handler for the Start state. It creates and starts an RPC
// server on "localhost:0" which exposes the bootstrap machine, binds the
// server machine's errors to ErrNetwork on the bootstrap machine, and then
// spawns a watchdog goroutine for the worker connection timeout.
//
// Failures creating the server are reported as ErrNetwork; a failure binding
// the error pipe is reported via AddErr. In both cases the handler returns
// without starting the server.
func (b *bootstrap) StartState(e *am.Event) {
	// context bound to the Start state
	ctx := b.Mach.NewStateCtx(ssB.Start)

	// init the RPC server
	var err error
	b.server, err = rpc.NewServer(ctx, "localhost:0", "nb-"+b.Name, b.Mach,
		&rpc.ServerOpts{Parent: b.Mach})
	if err != nil {
		b.Mach.AddErrState(ssB.ErrNetwork, err, nil)
		return
	}
	amhelp.MachDebugEnv(b.server.Mach)

	// forward server errors to the bootstrap machine
	err = ampipe.BindErr(b.server.Mach, b.Mach, ssB.ErrNetwork)
	if err != nil {
		b.Mach.AddErr(err, nil)
		return
	}

	b.server.Start()

	// connection timeout watchdog
	go func() {
		amhelp.Wait(ctx, b.Super.ConnTimeout)

		// NOTE(review): assumes amhelp.Wait also returns early once ctx is
		// done, and that ctx is cancelled when Start is de-activated. In that
		// case the bootstrap was stopped on purpose (worker connected, or
		// Dispose was called) and no timeout took place, so nothing should be
		// reported. Without this check the goroutine could race with the
		// machine's disposal in StartEnd and raise a spurious timeout error.
		if ctx.Err() != nil {
			return
		}

		if !b.Mach.IsDisposed() && b.Mach.Not1(ssB.WorkerAddr) {
			err := fmt.Errorf("worker bootstrap: %w", am.ErrTimeout)
			b.Super.Mach.AddErrState(ssS.ErrWorker, err, Pass(&A{
				Bootstrap: b,
				Id:        b.Mach.Id(),
			}))
		}
	}()
}
// StartEnd is the handler for leaving the Start state. It stops the RPC
// server, if one was created, and then disposes the bootstrap machine.
func (b *bootstrap) StartEnd(e *am.Event) {
	if srv := b.server; srv != nil {
		srv.Stop(true)
	}

	b.Mach.Dispose()
}
// WorkerAddrEnter is the enter handler for the WorkerAddr state. It reports
// whether the event carries parsed arguments with a non-empty LocalAddr,
// PublicAddr and Id (presumably a false result rejects the transition).
func (b *bootstrap) WorkerAddrEnter(e *am.Event) bool {
	args := ParseArgs(e.Args)
	if args == nil {
		return false
	}
	if args.LocalAddr == "" || args.PublicAddr == "" || args.Id == "" {
		return false
	}

	return true
}
// WorkerAddrState is the handler for the WorkerAddr state. It passes a copy
// of the received arguments, extended with this bootstrap's own address, to
// the supervisor machine as WorkerConnected, and removes the Start state one
// second later from a separate goroutine.
func (b *bootstrap) WorkerAddrState(e *am.Event) {
	in := ParseArgs(e.Args)
	b.log("worker addr %s: %s / %s", in.Id, in.PublicAddr, in.LocalAddr)

	// work on a copy, so the event's args stay untouched
	out := *in
	out.BootAddr = b.Addr()
	b.Super.Mach.Add1(ssS.WorkerConnected, Pass(&out))

	// delayed self-shutdown
	go func() {
		time.Sleep(time.Second)
		b.Mach.Remove1(ssB.Start, nil)
	}()
}
// Dispose requests shutdown of the bootstrap by removing the Start state from
// its machine (StartEnd then stops the server and disposes the machine).
func (b *bootstrap) Dispose() {
	if b.LogEnabled {
		b.Mach.Log("disposing bootstrap")
	}

	b.Mach.Remove1(ssB.Start, nil)
}
// Addr returns the address of the bootstrap's RPC server, or an empty string
// when the server has not been created yet.
func (b *bootstrap) Addr() string {
	if srv := b.server; srv != nil {
		return srv.Addr
	}

	return ""
}
// log forwards a formatted message to the bootstrap machine's logger, but
// only when LogEnabled is set.
func (b *bootstrap) log(msg string, args ...any) {
	if b.LogEnabled {
		b.Mach.Log(msg, args...)
	}
}
// workerInfo is the supervisor's bookkeeping record for a single worker.
// Only proc and the two error caches are set by newWorkerInfo; the remaining
// fields are filled in elsewhere.
type workerInfo struct {
// proc is the OS process passed to newWorkerInfo (presumably the worker's
// own process; TODO confirm).
proc *os .Process
// rpc is an RPC client (presumably connected to the worker; TODO confirm).
rpc *rpc .Client
// w is an RPC worker handle (exact role TODO confirm).
w *rpc .Worker
// publicAddr and localAddr are the worker's addresses (presumably those
// reported during bootstrap; TODO confirm).
publicAddr string
localAddr string
// errs is a cache created with the supervisor's WorkerErrTtl duration.
errs *cache .Cache
// errsRecent is a cache created with the supervisor's WorkerErrRecent
// duration; hasErrs reports whether it holds any items.
errsRecent *cache .Cache
}
// newWorkerInfo creates a workerInfo for the given process, with two fresh
// error caches configured from the supervisor's WorkerErrTtl and
// WorkerErrRecent durations. Both caches are created with a one-minute second
// argument.
func newWorkerInfo(s *Supervisor, proc *os.Process) *workerInfo {
	info := &workerInfo{}
	info.errs = cache.New(s.WorkerErrTtl, time.Minute)
	info.errsRecent = cache.New(s.WorkerErrRecent, time.Minute)
	info.proc = proc

	return info
}
// hasErrs reports whether the errsRecent cache currently holds any items.
func (w *workerInfo) hasErrs() bool {
	recent := w.errsRecent.ItemCount()

	return recent > 0
}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.