package node

import (
	"bufio"
	"context"
	"errors"
	"fmt"
	"maps"
	"net"
	"os"
	"os/exec"
	"slices"
	"strconv"
	"time"

	"golang.org/x/sync/errgroup"

	"github.com/pancsta/asyncmachine-go/internal/utils"
	amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
	am "github.com/pancsta/asyncmachine-go/pkg/machine"
	"github.com/pancsta/asyncmachine-go/pkg/node/states"
	"github.com/pancsta/asyncmachine-go/pkg/rpc"
	ssrpc "github.com/pancsta/asyncmachine-go/pkg/rpc/states"
	ampipe "github.com/pancsta/asyncmachine-go/pkg/states/pipes"
)

var (
	ssS = states.SupervisorStates
	ssB = states.BootstrapStates
)

type Supervisor struct {
	*am.ExceptionHandler
	Mach *am.Machine

	// WorkerKind is the kind of worker this supervisor is managing.
	WorkerKind string
	// WorkerBin is the path and args to the worker binary.
	WorkerBin []string
	// Name is the name of the supervisor.
	Name       string
	LogEnabled bool

	// worker pool

	// Max is the maximum number of workers. Default is 10.
	Max int
	// Min is the minimum number of workers. Default is 2.
	Min int
	// Warm is the number of warm (ready) workers. Default is 5.
	Warm int
	// MaxClientWorkers is the maximum number of workers per 1 client. Defaults to
	// Max.
	MaxClientWorkers int
	// WorkerErrTtl is the time to keep worker errors in memory. Default is 10m.
	WorkerErrTtl time.Duration
	// WorkerErrRecent is the time to consider recent errors. Default is 1m.
	WorkerErrRecent time.Duration
	// WorkerErrKill is the number of errors to kill a worker. Default is 3.
	WorkerErrKill int

	// network
	// TODO group under Supervisor.Times struct

	// ConnTimeout is the time to wait for an outbound connection to be
	// established. Default is 5s.
	ConnTimeout time.Duration
	// DeliveryTimeout is a timeout for RPC delivery.
	DeliveryTimeout time.Duration
	// OpTimeout is the default timeout for operations (eg getters).
	OpTimeout time.Duration
	// PoolPause is the time to wait between normalizing the pool. Default is 5s.
	PoolPause time.Duration
	// HealthcheckPause is the time between trying to get a Healthcheck response
	// from a worker.
	HealthcheckPause time.Duration
	// Heartbeat is the frequency of the Heartbeat state, which normalizes the
	// pool and checks workers. Default 1m.
	Heartbeat time.Duration
	// WorkerCheckInterval defines how often to poll a worker's state. Default 1s.
	WorkerCheckInterval time.Duration

	// PublicAddr is the address for the public RPC server to listen on. The
	// effective address is at [PublicMux.Addr].
	PublicAddr string
	// PublicMux is the public listener to create RPC servers for each client.
	PublicMux *rpc.Mux
	// PublicRpc are the public RPC servers of connected clients, indexed by
	// remote addresses.
	PublicRpcs map[string]*rpc.Server

	// LocalAddr is the address for the local RPC server to listen on. The
	// effective address is at [LocalRpc.Addr].
	LocalAddr string
	// LocalRpc is the local RPC server, used by other supervisors to connect.
	// TODO rpc/mux
	LocalRpc *rpc.Server

	// TODO healthcheck endpoint
	// HttpAddr string

	// workers is a map of local RPC addresses to workerInfo data.
	workers map[string]*workerInfo

	// schemaWorker is the state schema required from each worker.
	schemaWorker am.Schema

	// test hooks for in-memory workers

	TestFork func(string) error
	TestKill func(string) error

	normalizeStart time.Time

	// self removing multi handlers

	WorkerReadyState       am.HandlerFinal
	WorkerGoneState        am.HandlerFinal
	KillWorkerState        am.HandlerFinal
	ClientSendPayloadState am.HandlerFinal
	SuperSendPayloadState  am.HandlerFinal
	HealthcheckState       am.HandlerFinal
}
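
// A minimal tuning sketch (illustrative values only; "sup" comes from
// [NewSupervisor] and fields keep their defaults unless overridden):
//
//	sup.Max = 20                     // hard cap on forked workers
//	sup.Warm = 8                     // keep 8 ready-but-idle workers
//	sup.Heartbeat = 30 * time.Second // normalize the pool twice a minute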

// NewSupervisor initializes and returns a new Supervisor instance with
// specified context, worker attributes, and options.
//
// workerBin: path to run the worker binary file (for [exec.CommandContext]).
func NewSupervisor(
	ctx context.Context, workerKind string, workerBin []string,
	workerSchema am.Schema, opts *SupervisorOpts,
) (*Supervisor, error) {
	// validate
	if len(workerBin) == 0 || workerBin[0] == "" {
		return nil, errors.New("super: workerBin required")
	}
	if workerSchema == nil {
		return nil, errors.New("super: workerSchema required")
	}
	if opts == nil {
		opts = &SupervisorOpts{}
	}

	err := amhelp.SchemaImplements(workerSchema, states.WorkerStates.Names())
	if err != nil {
		err := fmt.Errorf(
			"worker has to implement am/node/states/WorkerStates: %w", err)
		return nil, err
	}

	host := utils.Hostname()
	if len(host) > 15 {
		host = host[:15]
	}
	name := fmt.Sprintf("%s-%s-%s-%d", workerKind, host,
		time.Now().Format("150405"), opts.InstanceNum)

	s := &Supervisor{
		WorkerKind: workerKind,
		WorkerBin:  workerBin,
		Name:       name,
		LogEnabled: os.Getenv(EnvAmNodeLogSupervisor) != "",

		// defaults

		Max:                 10,
		Min:                 2,
		Warm:                5,
		MaxClientWorkers:    10,
		ConnTimeout:         5 * time.Second,
		DeliveryTimeout:     5 * time.Second,
		OpTimeout:           time.Second,
		Heartbeat:           1 * time.Minute,
		WorkerCheckInterval: 1 * time.Second,
		PoolPause:           5 * time.Second,
		HealthcheckPause:    500 * time.Millisecond,
		WorkerErrTtl:        10 * time.Minute,
		WorkerErrRecent:     1 * time.Minute,
		WorkerErrKill:       3,

		// rpc

		PublicRpcs: make(map[string]*rpc.Server),

		// internals

		schemaWorker: workerSchema,
		workers:      map[string]*workerInfo{},
	}

	if amhelp.IsDebug() {
		// increase only the timeouts used by [context.WithTimeout] or [time.Sleep]
		// directly
		s.DeliveryTimeout = 10 * s.DeliveryTimeout
		s.OpTimeout = 10 * s.OpTimeout
	}

	mach, err := am.NewCommon(ctx, "ns-"+s.Name, states.SupervisorSchema,
		ssS.Names(), s, opts.Parent, &am.Opts{Tags: []string{
			"node-supervisor", "kind:" + workerKind,
			"instance:" + strconv.Itoa(opts.InstanceNum),
			"host:" + utils.Hostname(),
		}})
	if err != nil {
		return nil, err
	}

	mach.SemLogger().SetArgsMapper(LogArgs)
	s.Mach = mach
	_ = amhelp.MachDebugEnv(mach)
	mach.AddBreakpoint(am.S{ssS.ErrWorker}, nil, false)

	// self removing multi handlers
	s.WorkerReadyState = amhelp.RemoveMulti(mach, ssS.WorkerReady)
	s.WorkerGoneState = amhelp.RemoveMulti(mach, ssS.WorkerGone)
	s.KillWorkerState = amhelp.RemoveMulti(mach, ssS.KillWorker)
	s.ClientSendPayloadState = amhelp.RemoveMulti(mach, ssS.ClientSendPayload)
	s.SuperSendPayloadState = amhelp.RemoveMulti(mach, ssS.SuperSendPayload)
	s.HealthcheckState = amhelp.RemoveMulti(mach, ssS.Healthcheck)

	// check base states
	err = amhelp.Implements(mach.StateNames(), ssS.Names())
	if err != nil {
		err := fmt.Errorf(
			"supervisor has to implement am/node/states/SupervisorStates: %w", err)
		return nil, err
	}

	return s, nil
}
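
// Example usage (a hedged sketch; the worker binary, schema, and address are
// placeholders):
//
//	sup, err := NewSupervisor(ctx, "ping", []string{"./ping-worker"},
//		myWorkerSchema, nil)
//	if err != nil {
//		panic(err)
//	}
//	sup.Start("localhost:8700")
//	defer sup.Stop()
//	<-sup.Mach.When1(ssS.PoolReady, nil)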

// ///// ///// /////

// ///// HANDLERS

// ///// ///// /////

// ErrWorkerState handles errors reported by workers and kills repeat
// offenders.
func (s *Supervisor) ErrWorkerState(e *am.Event) {
	// remove err as handled, if the last err
	if !s.Mach.WillBe1(ssS.Exception) {
		s.Mach.Remove(am.S{ssS.ErrWorker, ssS.Exception}, nil)
	}
	err := am.ParseArgs(e.Args).Err
	args := ParseArgs(e.Args)
	info := s.workers[args.LocalAddr]

	// possibly kill the worker
	if !errors.Is(err, ErrWorkerKill) && info != nil {
		errTtl := info.errs.Add(utils.RandId(0), err, 0)
		errRecent := info.errsRecent.Add(utils.RandId(0), err, 0)
		if err := errors.Join(errTtl, errRecent); err != nil {
			s.Mach.Log("failed to add error to worker %s: %v", args.LocalAddr, err)
		}

		// kill if too many errs
		if info.errs.ItemCount() > s.WorkerErrKill {
			s.Mach.Add1(ssS.KillingWorker, Pass(&A{
				LocalAddr: args.LocalAddr,
			}))
		}
	}

	// dispose bootstrap
	if args.Bootstrap != nil {
		args.Bootstrap.Dispose()
	}

	// re-check the pool status
	s.Mach.Remove1(ssS.PoolReady, nil)
}

// TODO ErrPool

func (s *Supervisor) StartEnter(e *am.Event) bool {
	args := ParseArgs(e.Args)
	return args != nil && args.PublicAddr != "" && args.LocalAddr != ""
}

func (s *Supervisor) ClientConnectedState(e *am.Event) {
	s.Mach.Remove1(ssS.ClientConnected, nil)
}

func (s *Supervisor) ClientDisconnectedEnter(e *am.Event) bool {
	args := rpc.ParseArgs(e.Args)
	return args != nil && args.Addr != ""
}

func (s *Supervisor) ClientDisconnectedState(e *am.Event) {
	s.Mach.Remove1(ssS.ClientDisconnected, nil)

	addr := rpc.ParseArgs(e.Args).Addr
	server, ok := s.PublicRpcs[addr]
	if !ok {
		s.log("client %s disconnected, but not found", addr)
		return
	}
	server.Stop(true)
	delete(s.PublicRpcs, addr)
}

func (s *Supervisor) StartState(e *am.Event) {
	var err error
	ctx := s.Mach.NewStateCtx(ssS.Start)
	args := ParseArgs(e.Args)
	s.LocalAddr = args.LocalAddr
	s.PublicAddr = args.PublicAddr

	// public rpc (muxed)
	s.PublicMux, err = rpc.NewMux(ctx, "ns-pub-"+s.Name, s.newClientConn,
		&rpc.MuxOpts{Parent: s.Mach})
	if err != nil {
		_ = AddErrRpc(s.Mach, err, nil)
		return
	}
	s.PublicMux.Addr = s.PublicAddr

	// local rpc TODO mux
	opts := &rpc.ServerOpts{
		Parent:       s.Mach,
		PayloadState: ssS.SuperSendPayload,
	}
	s.LocalRpc, err = rpc.NewServer(ctx, s.LocalAddr, "ns-loc-"+s.Name, s.Mach,
		opts)
	if err != nil {
		_ = AddErrRpc(s.Mach, err, nil)
		return
	}
	s.LocalRpc.DeliveryTimeout = s.DeliveryTimeout
	err = rpc.BindServerMulti(s.LocalRpc.Mach, s.Mach, ssS.LocalRpcReady,
		ssS.SuperConnected, ssS.SuperDisconnected)
	if err != nil {
		_ = AddErrRpc(s.Mach, err, nil)
		return
	}

	// start
	s.PublicMux.Start()
	s.LocalRpc.Start()

	// unblock
	go func() {
		// wait for the RPC servers to become ready
		err := amhelp.WaitForAll(ctx, s.ConnTimeout,
			s.PublicMux.Mach.When1(ssrpc.MuxStates.Ready, nil),
			s.LocalRpc.Mach.When1(ssrpc.ServerStates.RpcReady, nil))
		if ctx.Err() != nil {
			return // expired
		}
		if err != nil {
			err := errors.Join(err, s.PublicMux.Mach.Err(), s.LocalRpc.Mach.Err())
			_ = AddErrRpc(s.Mach, err, nil)
			return
		}

		// wait for the pool
		select {
		case <-ctx.Done():
			return // expired
		case <-s.Mach.When1(ssS.PoolReady, nil):
		}

		// start Heartbeat
		t := time.NewTicker(s.Heartbeat)
		defer t.Stop()

		for {
			select {
			case <-ctx.Done():
				return // expired
			case <-t.C:
				s.Mach.Add1(ssS.Heartbeat, nil)
			}
		}
	}()
}

func (s *Supervisor) StartEnd(e *am.Event) {
	// TODO stop all rpc servers
	if s.PublicMux != nil {
		s.PublicMux.Stop(true)
	}
	if s.LocalRpc != nil {
		s.LocalRpc.Stop(true)
	}
}

func (s *Supervisor) ForkWorkerEnter(e *am.Event) bool {
	return len(s.workers) < s.Max
}

func (s *Supervisor) ForkWorkerState(e *am.Event) {
	s.Mach.Remove1(ssS.ForkWorker, nil)
	ctx := s.Mach.NewStateCtx(ssS.Start)

	// init bootstrap machine
	boot, err := newBootstrap(ctx, s)
	if err != nil {
		_ = AddErrWorker(nil, s.Mach, err, nil)
		return
	}
	args := &A{Bootstrap: boot}

	// start connection-bootstrap machine
	res := boot.Mach.Add1(ssB.Start, nil)
	if res != am.Executed || boot.Mach.IsErr() {
		_ = AddErrWorker(e, s.Mach, ErrWorkerConn, Pass(args))
		return
	}

	// unblock
	go func() {
		// wait for bootstrap RPC to become ready
		err := amhelp.WaitForAll(ctx, s.ConnTimeout,
			boot.server.Mach.When1(ssrpc.ServerStates.RpcReady, nil))
		if ctx.Err() != nil {
			return // expired
		}
		if err != nil {
			_ = AddErrWorker(e, s.Mach, err, Pass(args))
			return
		}

		// next
		s.Mach.Add1(ssS.ForkingWorker, Pass(args))
	}()
}

func (s *Supervisor) ForkingWorkerEnter(e *am.Event) bool {
	args := ParseArgs(e.Args)
	return args != nil && args.Bootstrap != nil && args.Bootstrap.Addr() != "" &&
		len(s.workers) < s.Max
}

func (s *Supervisor) ForkingWorkerState(e *am.Event) {
	s.Mach.Remove1(ssS.ForkingWorker, nil)
	ctx := s.Mach.NewStateCtx(ssS.Start)
	args := ParseArgs(e.Args)
	boot := args.Bootstrap
	bootAddr := boot.Addr()
	argsOut := &A{Bootstrap: boot}

	// test forking, if provided
	if s.TestFork != nil {
		// unblock
		go func() {
			if ctx.Err() != nil {
				return // expired
			}
			if err := s.TestFork(bootAddr); err != nil {
				_ = AddErrWorker(e, s.Mach, err, Pass(argsOut))
				return
			}
			// fake entry
			s.Mach.Add1(ssS.SetWorker, Pass(&A{
				WorkerAddr: bootAddr,
				WorkerInfo: newWorkerInfo(s, nil),
			}))
		}()

		// tests end here
		return
	}

	// prep for forking
	var cmdArgs []string
	if len(s.WorkerBin) > 1 {
		cmdArgs = s.WorkerBin[1:]
	}
	// TODO custom param name -a
	// TODO store PIDs of workers, clean up old PIDs
	cmdArgs = slices.Concat(cmdArgs, []string{"-a", bootAddr})
	s.log("forking worker %s %s", s.WorkerBin[0], cmdArgs)
	cmd := exec.CommandContext(ctx, s.WorkerBin[0], cmdArgs...)
	cmd.Env = os.Environ()

	// read errors
	stderr, err := cmd.StderrPipe()
	if err != nil {
		_ = AddErrWorker(e, s.Mach, err, Pass(argsOut))
		return
	}
	outReader := bufio.NewScanner(stderr)

	// fork the worker
	err = cmd.Start()
	if err != nil {
		_ = AddErrWorker(e, s.Mach, err, Pass(argsOut))
		return
	}

	// register under the bootstrap addr ([exec.Cmd.Process] is set after Start)
	s.Mach.Add1(ssS.SetWorker, Pass(&A{
		WorkerAddr: bootAddr,
		WorkerInfo: newWorkerInfo(s, cmd.Process),
	}))

	// monitor the fork
	go func() {
		var errOut string
		for outReader.Scan() {
			errOut += outReader.Text() + "\n"
		}

		// skip ctx expire, [cmd] already inherited it
		err := cmd.Wait()
		if err != nil {
			if errOut != "" {
				s.log("fork error: %s", errOut)
			}
			_ = AddErrWorker(e, s.Mach, err, Pass(argsOut))

			return
		}
	}()
}

func (s *Supervisor) WorkerConnectedEnter(e *am.Event) bool {
	args := ParseArgs(e.Args)
	return args != nil && args.LocalAddr != ""
}

func (s *Supervisor) WorkerConnectedState(e *am.Event) {
	s.Mach.Remove1(ssS.WorkerConnected, nil)

	ctx := s.Mach.NewStateCtx(ssS.Start)
	args := ParseArgs(e.Args)
	// copy args
	argsOut := *args

	// unblock
	go func() {
		// bootstraps dispose themselves, which closes chans
		if ctx.Err() != nil {
			return // expired
		}

		// rpc to the worker
		addr := args.LocalAddr
		_, port, err := net.SplitHostPort(addr)
		if err != nil {
			_ = AddErrWorker(e, s.Mach, err, Pass(&argsOut))
			return
		}
		client, err := rpc.NewClient(ctx, addr, s.Name+"-"+port,
			s.schemaWorker, &rpc.ClientOpts{Parent: s.Mach})
		if err != nil {
			_ = AddErrWorker(e, s.Mach, err, Pass(&argsOut))
			return
		}

		// wait for client ready
		client.Start()
		err = amhelp.WaitForErrAny(ctx, s.ConnTimeout, client.Mach,
			client.Mach.When1(ssC.Ready, ctx))
		if ctx.Err() != nil {
			return // expired
		}
		if err != nil {
			_ = AddErrWorker(e, s.Mach, client.Mach.Err(), Pass(&argsOut))
			return
		}

		// next
		argsOut.WorkerRpc = client
		s.Mach.Add1(ssS.WorkerForked, Pass(&argsOut))
	}()
}

func (s *Supervisor) WorkerForkedEnter(e *am.Event) bool {
	args := ParseArgs(e.Args)
	return args != nil && args.LocalAddr != "" && args.WorkerRpc != nil
}

func (s *Supervisor) WorkerForkedState(e *am.Event) {
	s.Mach.Remove1(ssS.WorkerForked, nil)

	args := ParseArgs(e.Args)
	localAddr := args.LocalAddr
	bootAddr := args.BootAddr
	workerRpc := args.WorkerRpc
	argsOut := &A{
		LocalAddr: localAddr,
		Id:        s.Mach.Id(),
	}

	// switch addresses (boot -> local) and update the worker map
	info, ok := s.workers[bootAddr]
	if !ok {
		_ = AddErrWorker(e, s.Mach, ErrWorkerMissing, Pass(argsOut))
		return
	}
	delete(s.workers, bootAddr)

	// update worker info
	info.rpc = workerRpc
	info.w = workerRpc.NetMach
	info.publicAddr = args.PublicAddr
	info.localAddr = localAddr
	s.workers[localAddr] = info

	// custom pipe worker states
	err := errors.Join(
		ampipe.BindReady(workerRpc.Mach, s.Mach, ssS.WorkerReady, ""),
		workerRpc.Mach.BindHandlers(&struct {
			ExceptionState am.HandlerFinal
			ReadyEnd       am.HandlerFinal
		}{
			ExceptionState: func(e *am.Event) {
				_ = AddErrWorker(e, s.Mach, workerRpc.Mach.Err(), Pass(argsOut))
			},
			ReadyEnd: func(e *am.Event) {
				// TODO why this kills the workers? which Ready ends?
				s.Mach.EvAdd1(e, ssS.KillWorker, Pass(argsOut))
			},
		}),
	)
	if err != nil {
		_ = AddErrWorker(e, s.Mach, err, Pass(argsOut))
		return
	}

	// ping and re-check the pool status
	workerRpc.NetMach.Add1(ssW.Healthcheck, nil)
	s.Mach.Add1(ssS.PoolReady, nil)
}

func (s *Supervisor) KillingWorkerEnter(e *am.Event) bool {
	args := ParseArgs(e.Args)
	return args != nil && args.LocalAddr != ""
}

func (s *Supervisor) KillingWorkerState(e *am.Event) {
	s.Mach.Remove1(ssS.KillingWorker, nil)

	localAddr := ParseArgs(e.Args).LocalAddr
	args := &A{LocalAddr: localAddr}

	// fake kill in tests
	if s.TestKill != nil {
		if err := s.TestKill(localAddr); err != nil {
			s.Mach.AddErr(err, Pass(args))
			return
		}

		return
	}

	info, ok := s.workers[localAddr]
	if !ok {
		_ = AddErrWorker(e, s.Mach, ErrWorkerMissing, Pass(args))
		return
	}
	err := info.proc.Kill()
	if err != nil {
		_ = AddErrWorker(e, s.Mach, err, Pass(args))
		return
	}

	// TODO confirm port disconnect

	s.Mach.Add1(ssS.WorkerKilled, Pass(args))
}

func (s *Supervisor) WorkerKilledEnter(e *am.Event) bool {
	args := ParseArgs(e.Args)
	return args != nil && args.LocalAddr != ""
}

func (s *Supervisor) WorkerKilledState(e *am.Event) {
	s.Mach.Remove1(ssS.WorkerKilled, nil)

	args := ParseArgs(e.Args)
	delete(s.workers, args.LocalAddr)

	// re-check the pool status
	s.Mach.Remove1(ssS.PoolReady, nil)
}

func (s *Supervisor) PoolReadyEnter(e *am.Event) bool {
	// TODO timeouts in tests
	return len(s.readyWorkers()) >= s.min()
}

func (s *Supervisor) PoolReadyExit(e *am.Event) bool {
	return len(s.readyWorkers()) < s.min()
}

func (s *Supervisor) HeartbeatState(e *am.Event) {
	// TODO detect stuck NormalizingPool
	// TODO time limit heartbeat
	// TODO test
	ctx := s.Mach.NewStateCtx(ssS.Heartbeat)

	// clear gone workers TODO check if binding is enough
	// for _, info := range s.StartedWorkers() {
	// 	w := info.rpc.NetworkMachine
	//
	// 	// was Ready, but not anymore
	// 	if w.Not1(ssW.Ready) && w.Tick(ssW.Ready) > 2 {
	// 		s.Mach.Add(ssS.KillingWorker, Pass(&A{
	// 			LocalAddr: info.localAddr,
	// 		}))
	// 	}
	// }

	// get ready workers, or abort
	ready := s.readyWorkers()
	if ready == nil {
		s.Mach.Remove1(ssS.Heartbeat, nil)
		return
	}

	// unblock
	go func() {
		defer s.Mach.Remove1(ssS.Heartbeat, nil)

		// parallel group
		grp, gctx := errgroup.WithContext(ctx)
		for _, info := range ready {
			grp.Go(func() error {
				// 3 tries per worker
				ok := false
				tick := info.w.Tick(ssW.Healthcheck)
				for i := 0; i < 3; i++ {

					// blocking RPC call
					info.w.Add1(ssW.Healthcheck, nil)
					if gctx.Err() != nil {
						return gctx.Err() // expired
					}
					if tick < info.w.Tick(ssW.Healthcheck) {
						ok = true
						break
					}
					_ = amhelp.Wait(gctx, s.HealthcheckPause)
				}

				if !ok {
					return AddErrWorker(e, s.Mach, ErrWorkerHealth, Pass(&A{
						LocalAddr: info.localAddr,
						Id:        info.w.Id(),
					}))
				}

				return nil
			})
		}

		// dont wait here...
		go grp.Wait() //nolint:errcheck
		// ...wait with a timeout instead (the group ctx closes once Wait
		// returns, or on the first error)
		err := amhelp.WaitForAll(ctx, s.ConnTimeout, gctx.Done())
		if err != nil {
			_ = AddErrPool(s.Mach, fmt.Errorf("%w: %w", ErrHeartbeat, err), nil)
			return
		}

		// update states PoolReady (negotiation lets the right one in)
		s.Mach.Add1(ssS.PoolReady, nil)
		s.Mach.Remove1(ssS.PoolReady, nil)

		// update WorkersAvailable
		idle, err := s.Workers(ctx, StateIdle)
		if ctx.Err() != nil {
			return // expired
		}
		if err != nil {
			_ = AddErrWorker(e, s.Mach, err, nil)
			return
		}
		if len(idle) > 0 {
			s.Mach.Add1(ssS.WorkersAvailable, nil)
		} else {
			s.Mach.Remove1(ssS.WorkersAvailable, nil)
		}
	}()
}

func (s *Supervisor) NormalizingPoolState(e *am.Event) {
	ctx := s.Mach.NewStateCtx(ssS.NormalizingPool)
	s.normalizeStart = time.Now()

	// unblock
	go func() {
		if ctx.Err() != nil {
			return // expired
		}

		defer s.Mach.Add1(ssS.PoolNormalized, nil)
		ok := false

		// 5 rounds
		for i := 0; i < 5; i++ {

			// list workers
			all, err := s.Workers(ctx, "")
			if ctx.Err() != nil {
				return // expired
			}
			if err != nil {
				continue
			}
			// fork missing ones (include warm workers)
			for j := len(all); j < s.min()+s.Warm && j < s.Max; j++ {
				s.Mach.Add1(ssS.ForkWorker, nil)
			}

			// wait and keep checking
			check := func() bool {
				if ctx.Err() != nil {
					return false // expired
				}

				ready, err := s.Workers(ctx, StateReady)
				if err != nil || ctx.Err() != nil {
					return false // expired or errored
				}
				if len(ready) >= s.min() {
					s.Mach.Add1(ssS.PoolReady, nil)
					return false
				}

				// continue
				return true
			}
			// blocking call
			_ = amhelp.Interval(ctx, s.ConnTimeout, s.WorkerCheckInterval, check)
			ok = s.Mach.Is1(ssS.PoolReady)
			if ok {
				break
			}

			// not ok, wait a bit
			if !amhelp.Wait(ctx, s.PoolPause) {
				return // expired
			}

			ok = s.Mach.Is1(ssS.PoolReady)
			if ok {
				break
			}

			s.log("failed to normalize pool, round %d", i)
		}

		if !ok {
			_ = AddErrPoolStr(s.Mach, "failed to normalize pool", nil)
		}

		s.normalizeStart = time.Time{}
	}()
}

func (s *Supervisor) ProvideWorkerEnter(e *am.Event) bool {
	args := ParseArgs(e.Args)
	return args != nil && args.WorkerRpcId != "" && args.SuperRpcId != ""
}

func (s *Supervisor) ProvideWorkerState(e *am.Event) {
	s.Mach.Remove1(ssS.ProvideWorker, nil)
	args := ParseArgs(e.Args)
	ctx := s.Mach.NewStateCtx(ssS.Start)
	idle := s.idleWorkers()

	// unblock
	go func() {
		if ctx.Err() != nil {
			return // expired
		}

		// find an idle worker
		for _, info := range idle {

			// confirm with the worker
			res := amhelp.Add1Sync(ctx, info.rpc.NetMach, ssW.ServeClient,
				PassRpc(&A{Id: args.WorkerRpcId}))
			if ctx.Err() != nil {
				return // expired
			}
			if res != am.Executed {
				s.log("worker %s rejected %s", info.rpc.NetMach.Id(),
					args.WorkerRpcId)
				continue
			}

			// send the addr to the client via RPC SendPayload
			s.Mach.Add1(ssS.ClientSendPayload, rpc.PassRpc(&rpc.A{
				Name: "worker_addr",
				Payload: &rpc.MsgSrvPayload{
					Name:        ssS.ProvideWorker,
					Source:      s.Mach.Id(),
					Destination: args.SuperRpcId,
					Data:        info.publicAddr,
				},
			}))

			s.log("worker %s provided to %s", info.rpc.NetMach.Id(),
				args.SuperRpcId)
			break
		}
	}()
}

func (s *Supervisor) ListWorkersEnter(e *am.Event) bool {
	ch := ParseArgs(e.Args).WorkersCh
	// require a buffered channel
	return ch != nil && cap(ch) > 0
}

func (s *Supervisor) ListWorkersState(e *am.Event) {
	s.Mach.Remove1(ssS.ListWorkers, nil)
	args := ParseArgs(e.Args)

	switch args.WorkerState {
	case StateReady:
		args.WorkersCh <- s.readyWorkers()
	case StateIniting:
		args.WorkersCh <- s.initingWorkers()
	case StateBusy:
		args.WorkersCh <- s.busyWorkers()
	case StateIdle:
		args.WorkersCh <- s.idleWorkers()
	default:
		args.WorkersCh <- slices.Collect(maps.Values(s.workers))
	}
}

func (s *Supervisor) SetWorkerEnter(e *am.Event) bool {
	return ParseArgs(e.Args).WorkerAddr != ""
}

func (s *Supervisor) SetWorkerState(e *am.Event) {
	s.Mach.Remove1(ssS.SetWorker, nil)
	args := ParseArgs(e.Args)
	addr := args.WorkerAddr

	if args.WorkerInfo != nil {
		s.workers[addr] = args.WorkerInfo
	} else {
		delete(s.workers, addr)
	}
}

// ///// ///// /////

// ///// METHODS

// ///// ///// /////

// Start activates the supervisor, listening publicly on publicAddr, with the
// local RPC server on an ephemeral port.
func (s *Supervisor) Start(publicAddr string) {
	s.Mach.Add1(ssS.Start, Pass(&A{
		LocalAddr:  "localhost:0",
		PublicAddr: publicAddr,
	}))
}

// Stop deactivates and disposes the supervisor.
func (s *Supervisor) Stop() {
	s.Mach.Remove1(ssS.Start, nil)
	s.Mach.Dispose()
}

// SetPool sets the pool parameters with defaults.
func (s *Supervisor) SetPool(min, max, warm, maxClient int) {
	if max < min {
		max = min
	}
	s.Min = min
	s.Max = max
	s.Warm = warm
	if maxClient == 0 {
		maxClient = s.Max
	}
	s.MaxClientWorkers = maxClient

	s.CheckPool()
}
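
// Example (illustrative values; maxClient == 0 falls back to Max):
//
//	sup.SetPool(2, 10, 5, 0) // min=2, max=10, warm=5, per-client=10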

// CheckPool tries to set pool as ready and normalizes it, if not.
func (s *Supervisor) CheckPool() bool {
	s.Mach.Add1(ssS.NormalizingPool, nil)
	s.Mach.Add1(ssS.PoolReady, nil)

	return s.Mach.Is1(ssS.PoolReady)
}
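
// Example (a sketch; PoolReady may activate later, once NormalizingPool
// finishes in the background):
//
//	if !sup.CheckPool() {
//		<-sup.Mach.When1(ssS.PoolReady, nil)
//	}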

// Workers returns a list of workers in a desired state. If ctx expires, it
// will return nil, nil.
func (s *Supervisor) Workers(
	ctx context.Context, state WorkerState,
) ([]*workerInfo, error) {
	ch := make(chan []*workerInfo, 1)
	res := s.Mach.Add1(ssS.ListWorkers, Pass(&A{
		WorkersCh:   ch,
		WorkerState: state,
	}))
	if res == am.Canceled {
		return nil, fmt.Errorf("listing workers: %w", am.ErrCanceled)
	}

	select {
	case <-ctx.Done():
		return nil, nil // expired
	case <-time.After(s.OpTimeout):
		return nil, fmt.Errorf("listing workers: %w", am.ErrTimeout)
	case workers := <-ch:
		return workers, nil
	}
}
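
// Example (a hedged sketch; StateIdle is defined in this package):
//
//	idle, err := sup.Workers(ctx, StateIdle)
//	if err != nil {
//		return err
//	}
//	fmt.Printf("%d idle workers\n", len(idle))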

// Dispose disposes the supervisor's machine without a graceful stop.
func (s *Supervisor) Dispose() {
	s.Mach.Dispose()
}

// initingWorkers returns workers being currently initialized. Only call in
// handlers or use the [ListWorkers] state.
func (s *Supervisor) initingWorkers() []*workerInfo {
	var ret []*workerInfo
	for _, info := range s.workers {
		if info.rpc == nil {
			ret = append(ret, info)
		}
	}

	return ret
}

// rpcWorkers returns workers with an RPC connection. Only call in handlers or
// use the [ListWorkers] state.
func (s *Supervisor) rpcWorkers() []*workerInfo {
	var ret []*workerInfo
	for _, info := range s.workers {
		if info.rpc != nil && info.rpc.NetMach != nil {
			ret = append(ret, info)
		}
	}

	return ret
}

// idleWorkers returns Idle workers. Only call in handlers or use the
// [ListWorkers] state.
func (s *Supervisor) idleWorkers() []*workerInfo {
	var ret []*workerInfo
	for _, info := range s.rpcWorkers() {
		w := info.rpc.NetMach
		if !info.hasErrs() && w.Is1(ssW.Idle) {
			ret = append(ret, info)
		}
	}

	return ret
}

// busyWorkers returns Busy workers. Only call in handlers or use the
// [ListWorkers] state.
func (s *Supervisor) busyWorkers() []*workerInfo {
	var ret []*workerInfo
	for _, info := range s.rpcWorkers() {
		w := info.rpc.NetMach
		if !info.hasErrs() && info.rpc != nil &&
			w.Any1(sgW.WorkStatus...) && !w.Is1(ssW.Idle) {
			ret = append(ret, info)
		}
	}

	return ret
}

// readyWorkers returns Ready workers. Only call in handlers or use the
// [ListWorkers] state.
func (s *Supervisor) readyWorkers() []*workerInfo {
	var ret []*workerInfo
	for _, info := range s.rpcWorkers() {
		w := info.rpc.NetMach
		if !info.hasErrs() && info.rpc != nil && w.Is1(ssW.Ready) {
			ret = append(ret, info)
		}
	}

	return ret
}

func (s *Supervisor) log(msg string, args ...any) {
	if !s.LogEnabled {
		return
	}
	s.Mach.Log(msg, args...)
}

// min returns the effective minimum pool size, capped at Max.
func (s *Supervisor) min() int {
	if s.Min > s.Max {
		return s.Max
	}
	return s.Min
}

// newClientConn creates a new RPC server for a client.
// TODO keep one forked and bind immediately
func (s *Supervisor) newClientConn(
	num int, conn net.Conn,
) (*rpc.Server, error) {
	s.log("new client connection %d", num)
	ctx := s.Mach.NewStateCtx(ssS.Start)
	name := fmt.Sprintf("ns-pub-%d-%s", num, s.Name)

	opts := &rpc.ServerOpts{
		Parent:       s.PublicMux.Mach,
		PayloadState: ssS.ClientSendPayload,
	}
	server, err := rpc.NewServer(ctx, s.PublicAddr, name, s.Mach, opts)
	if err != nil {
		return nil, err
	}

	// set up
	server.DeliveryTimeout = s.DeliveryTimeout
	err = rpc.BindServerMulti(server.Mach, s.Mach, ssS.PublicRpcReady,
		ssS.ClientConnected, ssS.ClientDisconnected)
	if err != nil {
		return nil, err
	}

	// store
	ok := s.Mach.Eval("newClientConn", func() {
		// TODO check ctx
		s.PublicRpcs[conn.RemoteAddr().String()] = server
	}, ctx)
	if !ok {
		return nil, am.ErrHandlerTimeout
	}

	s.log("new client connection %d ready", num)
	return server, nil
}