package rpc

// TODO call ClientBye on Disposing

import (
	
	
	
	
	
	
	
	
	

	

	
	amhelp 
	am 
	
	ampipe 
)

// Short aliases for the state sets referenced throughout this file: ssS is
// used with the server's own state machine, ssW with the network source.
var (
	ssS = states.ServerStates
	ssW = states.NetSourceStates
)

// Server is an RPC server that can be bound to a worker machine and provide
// remote access to its states and methods.
type Server struct {
	*ExceptionHandler
	// Mach is the server's own state machine (built on the server schema).
	Mach *am.Machine

	// Source is a state Source, either a local or remote RPC worker.
	Source am.Api
	// Addr is the address of the server on the network.
	Addr string
	// DeliveryTimeout is a timeout for SendPayload to the client.
	DeliveryTimeout time.Duration
	// Listener can be set manually before starting the server.
	Listener atomic.Pointer[net.Listener]
	// Conn can be set manually before starting the server.
	Conn net.Conn
	// NoNewListener will prevent the server from creating a new listener if
	// one is not provided or has been closed. Useful for cmux.
	NoNewListener bool
	// LogEnabled gates all output of the server's internal log helper.
	LogEnabled    bool
	// CallCount is incremented for every update notification pushed to the
	// client.
	CallCount     uint64
	// Typed arguments struct value with defaults
	Args any
	// Typed arguments prefix in a resulting [am.A] map.
	ArgsPrefix string

	// sync settings

	// PushInterval is the interval for clock updates, effectively throttling
	// the number of updates sent to the client within the interval window.
	// 0 means pushes are disabled. Setting to a very small value will make
	// pushes instant.
	PushInterval atomic.Pointer[time.Duration]
	// syncMutations will push all clock changes for each mutation, enabling
	// client-side mutation filtering.
	syncMutations     bool
	// syncAllowedStates, syncSkippedStates, and syncShallowClocks are copied
	// from the client's hello message; syncSchema is enabled when the client
	// requests schema syncing.
	syncAllowedStates am.S
	syncSkippedStates am.S
	syncSchema        bool
	syncShallowClocks bool

	// security

	// AllowId will limit clients to a specific ID, if set.
	AllowId string

	// internal

	// rpcServer is the internal rpc2 server, re-created on each RPC start.
	rpcServer *rpc2.Server
	// rpcClient is the internal rpc2 client.
	rpcClient atomic.Pointer[rpc2.Client]
	// tracer is bound to the source and collects the data used for updates.
	tracer    *sourceTracer
	// TODO bind to a machine ticker (if handlers bound)
	ticker *time.Ticker

	// lock data collection (RemoteHello and tracer data)
	lockCollection sync.Mutex
	// lock exporting updates (pushClient and responses to mutations)
	lockExport sync.Mutex
	// server is currently responding to a client, and pushing should be skipped
	// respInProgress atomic.Bool

	// ID of the currently connected client.
	clientId         atomic.Pointer[string]
	// deliveryHandlers are detached from Source when the machine is disposed.
	deliveryHandlers any
	// lastPush is the time of the most recent push (or hello response).
	lastPush         time.Time
	// lastPushData is the tracer data the last push was calculated against.
	lastPushData     *tracerData
}

// interfaces
//
// Compile-time assertions that *Server satisfies both method sets.
var (
	_ serverRpcMethods    = &Server{}
	_ clientServerMethods = &Server{}
)

// NewServer creates a new RPC server, bound to a worker machine.
// The source machine has to implement [states.NetSourceStatesDef] interface.
//
// It takes a context, two strings, the source ([am.Api]) and optional
// [ServerOpts] (nil is replaced with an empty value). One of the strings
// defaults to "rpc" when empty. The source has to have its states verified,
// and when it has handlers bound it must contain all the states of
// states.NetSourceStates, otherwise an error wrapping [am.ErrSchema] is
// returned. The server's own machine is created with an "rs-" prefixed ID and
// the "rpc-server" tag.
//
// NOTE(review): identifiers are missing in this view of the file; the
// parameters are presumably (ctx, addr, name, source, opts) - confirm.
func (
	 context.Context,  string,  string,  am.Api,
	 *ServerOpts,
) (*Server, error) {
	if  == "" {
		 = "rpc"
	}
	if  == nil {
		 = &ServerOpts{}
	}

	// check the source
	if !.StatesVerified() {
		return nil, fmt.Errorf(
			"net source states not verified, call VerifyStates()")
	}
	 := .HasHandlers()
	if  && !.Has(ssW.Names()) {
		// error only when some handlers bound, skip deterministic machines
		 := fmt.Errorf(
			"%w: NetSourceMach with handlers has to implement "+
				"pkg/rpc/states/NetSourceStatesDef",
			am.ErrSchema)

		return nil, 
	}

	// TODO validate state names

	 := &Server{
		ExceptionHandler: &ExceptionHandler{},
		Addr:             ,
		DeliveryTimeout:  5 * time.Second,
		LogEnabled:       os.Getenv(EnvAmRpcLogServer) != "",
		Source:           ,
		Args:             .Args,
		ArgsPrefix:       .ArgsPrefix,

		lastPushData: &tracerData{},
	}
	// default push interval
	 := 250 * time.Millisecond
	.PushInterval.Store(&)

	// state machine
	,  := am.NewCommon(, "rs-"+, states.ServerSchema, ssS.Names(),
		, .Parent, &am.Opts{Tags: []string{"rpc-server"}})
	if  != nil {
		return nil, 
	}
	.SemLogger().SetArgsMapper(LogArgs)
	.SetGroups(states.ServerGroups, ssS)
	// release the listener, the rpc2 server, and the source bindings on dispose
	.OnDispose(func( string,  context.Context) {
		if  := .Listener.Load();  != nil {
			_ = (*).Close()
			.Listener.Store(nil)
		}
		.rpcServer = nil
		_ = .Source.DetachTracer(.tracer)
		_ = .Source.DetachHandlers(.deliveryHandlers)
	})
	.Mach = 
	// optional env debug
	if os.Getenv(EnvAmRpcDbg) != "" {
		_ = amhelp.MachDebugEnv()
	}

	// bind to source via Tracer API (inactive until activated)
	.tracer = &sourceTracer{
		s: ,
		dataLatest: &tracerData{
			// queue ticks start at 1
			queueTick: 1,
		},
	}
	// queue ticks start at 1
	// NOTE(review): repeats the value already set in the literal above.
	.tracer.dataLatest.queueTick = 1
	if  = .BindTracer(.tracer);  != nil {
		return nil, 
	}
	// NOTE(review): a tracer detach is also registered in the dispose callback
	// above - confirm both are needed.
	.OnDispose(func( string,  context.Context) {
		_ = .DetachTracer(.tracer)
	})

	// handle payload
	if  {

		// payload state
		 := ssW.SendPayload
		if .PayloadState != "" {
			 = .PayloadState
		}

		// payload handlers
		var  any
		if  == ssW.SendPayload {
			// default handlers
			 = &SendPayloadHandlers{
				SendPayloadState: getSendPayloadState(, ssW.SendPayload),
			}
		} else {
			// dynamic handlers TODO use ampipe.Bind
			 = createSendPayloadHandlers(, )
		}
		 = .BindHandlers()
		if  != nil {
			return nil, 
		}
		.OnDispose(func( string,  context.Context) {
			_ = .DetachHandlers()
		})
	}

	return , nil
}

// ///// ///// /////

// ///// HANDLERS

// ///// ///// /////

// State handler which disposes the server's machine when the event's parsed
// args have Dispose set (Stop passes that flag when removing Start).
// NOTE(review): the handler name is missing in this view; presumably the end
// handler of Start - confirm.
func ( *Server) ( *am.Event) {
	if ParseArgs(.Args).Dispose {
		.Mach.Dispose()
	}
}

// Negotiation handler (returns false to reject). It rejects when there is no
// listener while NoNewListener is set, or when Addr is empty.
// NOTE(review): the handler name is missing in this view; presumably the
// enter handler of RpcStarting - confirm.
func ( *Server) ( *am.Event) bool {
	// cannot create a listener and none was provided
	if .Listener.Load() == nil && .NoNewListener {
		return false
	}
	if .Addr == "" {
		return false
	}

	return true
}

// State handler which starts the RPC layer. It creates state contexts for
// RpcStarting and Start, (re)binds the rpc2 handlers, and then, in a
// goroutine, resolves Addr from Conn or the existing Listener, or opens a new
// "tcp4" listener on Addr. A listen error is added to the machine and
// RpcStarting is removed. On success a second goroutine adds RpcReady and
// blocks serving the connection or accepting on the listener; once serving
// ends it closes the listener and, if Start is still active, swaps RpcReady
// for RpcStarting to restart.
// NOTE(review): the handler name is missing in this view; presumably the
// state handler of RpcStarting - confirm.
func ( *Server) ( *am.Event) {
	 := .Mach.NewStateCtx(ssS.RpcStarting)
	 := .Mach.NewStateCtx(ssS.Start)
	.log("Starting RPC on %s", .Addr)
	.bindRpcHandlers()
	// keep a local reference, the field is reset on dispose
	 := .rpcServer

	// unblock
	go func() {
		// has to be ctxStart, not ctxRpcStarting TODO why?
		if .Err() != nil {
			return // expired
		}

		if .Conn != nil {
			.Addr = .Conn.LocalAddr().String()
		} else if  := .Listener.Load();  != nil {
			// update Addr from listener (support for external and :0)
			.Addr = (*).Addr().String()
		} else {
			// create a listener if not provided
			// use Start as the context
			 := net.ListenConfig{}
			,  := .Listen(, "tcp4", .Addr)
			if  != nil {
				// add err to mach
				AddErrNetwork(, .Mach, )
				// add outcome to mach
				.Mach.Remove1(ssS.RpcStarting, nil)

				return
			}

			.Listener.Store(&)
			// update Addr from listener (support for external and :0)
			.Addr = .Addr().String()
		}

		.log("RPC started on %s", .Addr)

		// fork to accept
		go func() {
			if .Err() != nil {
				return // expired
			}
			.Mach.EvAdd1(, ssS.RpcReady, Pass(&A{Addr: .Addr}))

			// accept (block)
			 := .Listener.Load()
			if .Conn != nil {
				.ServeConn(.Conn)
			} else {
				.Accept(*)
			}
			if .Err() != nil {
				return // expired
			}

			// clean up
			if  != nil {
				(*).Close()
				.Listener.Store(nil)
			}
			if .Err() != nil {
				return // expired
			}

			// restart on failed listener
			if .Mach.Is1(ssS.Start) {
				.Mach.EvRemove1(, ssS.RpcReady, nil)
				.Mach.EvAdd1(, ssS.RpcStarting, nil)
			}
		}()

		// bind to client events
		// mirror rpc2 connection events into the ClientConnected state
		.OnDisconnect(func( *rpc2.Client) {
			.Mach.EvRemove1(, ssS.ClientConnected, Pass(&A{Client: }))
		})
		.OnConnect(func( *rpc2.Client) {
			.Mach.EvAdd1(, ssS.ClientConnected, Pass(&A{Client: }))
		})
	}()
}

// Negotiation handler which accepts the transition only while RpcStarting is
// active.
// NOTE(review): the handler name is missing in this view; presumably the
// enter handler of RpcReady - confirm.
func ( *Server) ( *am.Event) bool {
	// only from RpcStarting
	return .Mach.Is1(ssS.RpcStarting)
}

// RpcReadyState starts a ticker to compensate for clock push debounces.
// Nothing is started when PushInterval is 0. Otherwise a goroutine calls
// pushClient on every tick until the RpcReady state context expires.
func ( *Server) ( *am.Event) {
	// no ticker for instant clocks
	if *.PushInterval.Load() == 0 {
		return
	}

	 := .Mach.NewStateCtx(ssS.RpcReady)
	// the ticker is created once, using the interval read at that moment
	if .ticker == nil {
		.ticker = time.NewTicker(*.PushInterval.Load())
	}

	// avoid dispose
	 := .ticker

	go func() {
		if .Err() != nil {
			return // expired
		}

		// push clock updates, debounced by getLatestUpdate
		for {
			select {
			case <-.Done():
				// NOTE(review): the ticker is stopped but the field is not
				// reset here, so the nil check above would reuse a stopped
				// ticker on the next activation unless it is reset
				// elsewhere - confirm.
				.ticker.Stop()
				return

			case <-.C:
				.pushClient()
			}
		}
	}()
}

// TODO send Bye to the client, so it can disconnect gracefully

// State handler which closes the current rpc2 client, if one is stored. The
// stored pointer itself is left in place.
// NOTE(review): the handler name is missing in this view; presumably an end
// handler of a connection-related state - confirm.
func ( *Server) ( *am.Event) {
	if  := .rpcClient.Load();  != nil {
		_ = .Close()
		// s.rpcClient = nil
	}
}

// ///// ///// /////

// ///// METHODS

// ///// ///// /////

// Start starts the server, optionally creating a Listener (if Addr provided).
// Results in either RpcReady or Exception. It works by adding the Start state
// to the server's machine, and returns the result of that mutation.
func ( *Server) () am.Result {
	return .Mach.Add1(ssS.Start, nil)
}

// Stop stops the server, and optionally disposes resources. It removes the
// Start state, passing the dispose flag in the mutation args, and returns the
// result of that mutation. Returns [am.Canceled] when the machine is not set.
func ( *Server) ( bool) am.Result {
	if .Mach == nil {
		return am.Canceled
	}
	if  {
		.log("disposing")
	}
	// TODO use Disposing
	 := .Mach.Remove1(ssS.Start, Pass(&A{
		Dispose: ,
	}))

	return 
}

// SendPayload sends a payload to the client. It's usually called by a handler
// for SendPayload. [event] is optional.
//
// Returns [ErrNoConn] unless both ClientConnected and HandshakeDone are
// active, and an error wrapping [ErrDestination] when the payload names a
// destination other than the connected client's ID. The payload gets a fresh
// random Token and, when an event is passed, its Source and SourceTx are
// taken from the event. The call itself is bound to the passed context.
func ( *Server) (
	 context.Context,  *am.Event,  *MsgSrvPayload,
) error {
	// TODO add SendPayloadAsync calling RemoteSendingPayload first
	// TODO bind to an async state

	if .Mach.Not1(ssS.ClientConnected) || .Mach.Not1(ssS.HandshakeDone) {
		return ErrNoConn
	}

	// check destination
	 := .ClientId()
	if .Destination != "" &&  != .Destination {
		return fmt.Errorf("%w: %s != %s", ErrDestination, .Destination, )
	}

	// panics below are handed over to the machine
	defer .Mach.PanicToErr(nil)

	.Token = utils.RandId(0)
	if  != nil {
		.Source = .MachineId
		.SourceTx = .TransitionId
	}
	.log("sending payload %s from %s to %s", .Name, .Source,
		.Destination)

	// TODO failsafe
	return .rpcClient.Load().CallWithContext(,
		ClientSendPayload.Value, , &MsgEmpty{})
}

// Returns the ID of the currently connected client, or an empty string when
// no client ID is stored.
func ( *Server) () string {
	 := .clientId.Load()
	if  == nil {
		return ""
	}

	return *
}

// GetKind returns a kind of RPC component (server / client). For a Server it
// is always KindServer.
func ( *Server) () Kind {
	return KindServer
}

// Logs a formatted message via the server's machine. Does nothing unless
// LogEnabled is set.
func ( *Server) ( string,  ...any) {
	if !.LogEnabled {
		return
	}
	.Mach.Log(, ...)
}

// Creates a fresh rpc2 server and registers all the Remote* methods under
// their Server* method names. The previous rpc2 server instance (if any) is
// replaced.
func ( *Server) () {
	// new RPC instance, release prev resources
	.rpcServer = rpc2.NewServer()

	.rpcServer.Handle(ServerHello.Value, .RemoteHello)
	.rpcServer.Handle(ServerHandshake.Value, .RemoteHandshake)
	.rpcServer.Handle(ServerAdd.Value, .RemoteAdd)
	.rpcServer.Handle(ServerAddNS.Value, .RemoteAddNS)
	.rpcServer.Handle(ServerRemove.Value, .RemoteRemove)
	.rpcServer.Handle(ServerSet.Value, .RemoteSet)
	.rpcServer.Handle(ServerSync.Value, .RemoteSync)
	.rpcServer.Handle(ServerArgs.Value, .RemoteArgs)
	.rpcServer.Handle(ServerBye.Value, .RemoteBye)

	// TODO RemoteWhenArgs

	// s.rpcServer.Handle("RemoteLog", s.RemoteLog)
	// s.rpcServer.Handle("RemoteWhenArgs", s.RemoteWhenArgs)
}

// pushClient pushes an update to the client. It can be either a single or a
// multi update. It can also be throttled and happen later.
//
// The push is skipped when: there is no client, pushes are disabled
// (PushInterval is 0), the handshake is not done, another export holds
// lockExport, the tracer has no data, nothing changed since the last push, or
// the last push happened less than PushInterval ago. A failed push removes
// ClientConnected and adds the error to the machine.
func ( *Server) () {
	 := .rpcClient.Load()
	if  == nil {
		return
	}

	// push disabled or not ready
	if *.PushInterval.Load() == 0 || .Mach.Not1(ssS.HandshakeDone) {
		return
	}

	// skip if currently exporting
	if !.lockExport.TryLock() {
		.log("skip parallel export")
		return
	}
	defer .lockExport.Unlock()

	// collect
	 := .tracer.DataLatest()
	if  == nil {
		return
	}

	// no change
	if .lastPushData.mTrackedTimeSum == .mTrackedTimeSum &&
		.lastPushData.queueTick == .queueTick {

		// s.log("skip no diff")
		return
	}

	// too often
	if time.Since(.lastPush) < *.PushInterval.Load() {
		// s.log("update too soon")
		return
	}

	// sync
	.log("pushClient:try t%d", .mTrackedTimeSum)
	var  error
	if .syncMutations {
		 = .pushUpdateMutations(.tracer.DataQueue())
	} else {
		 = .pushUpdateLatest()
	}
	if  != nil {
		.Mach.Remove1(ssS.ClientConnected, nil)
		AddErr(nil, .Mach, "pushClient", )

		return
	}
	.log("pushClient:ok t%d", .mTrackedTimeSum)

	// memorize only after a successful push
	.storeLastPush()
}

// Called by pushClient. Sends the queued mutations to the client as a single
// notification (no response expected), with diffs calculated against the last
// pushed data. Returns nil without sending when there is no client or there
// are no mutations to push.
func ( *Server) ( []tracerMutation) error {
	 := .rpcClient.Load()
	if  == nil {
		return nil
	}
	defer .Mach.PanicToErr(nil)

	// calculate diffs
	 := calcUpdateMutations(.syncSchema, , .lastPushData)

	// nothing to push
	if len(.MutationType) == 0 {
		return nil
	}

	// notify without a response
	.CallCount++

	// TODO failsafe retry (stateful)
	return .Notify(ClientUpdateMutations.Value, )
}

// Called by pushClient. Sends a single update to the client as a notification
// (no response expected), calculated from the passed tracer data against the
// last pushed data. Returns nil without sending when there is no client or
// the update has no indexes.
func ( *Server) ( *tracerData) error {
	 := .rpcClient.Load()
	if  == nil {
		return nil
	}
	defer .Mach.PanicToErr(nil)

	// calculate diff
	 := calcUpdate(.syncSchema, , .lastPushData, .syncShallowClocks)

	// nothing to push
	if len(.Indexes) == 0 {
		return nil
	}

	// notify without a response
	.CallCount++
	// fmt.Printf("[S] update %v\n", update)
	// fmt.Printf("[S] time %v\n", data.mTime)

	// TODO failsafe retry (stateful)
	return .Notify(ClientUpdate.Value, )
}

// newMsgMutation creates a new response to a mutation call from the client.
// Requires [s.lockExport]
//
// The response carries the mutation result and either the queued mutations
// (when syncMutations is on) or a single update, both calculated against the
// last pushed data. The last-push marker is updated as a side effect.
func ( *Server) (
	 am.Result,  *tracerData,
) *MsgSrvMutation {
	 := MsgSrvMutation{Result: }
	// calculate diff
	if .syncMutations {
		.Mutations = calcUpdateMutations(.syncSchema, .tracer.DataQueue(),
			.lastPushData)
	} else {
		.Update = calcUpdate(.syncSchema, , .lastPushData,
			.syncShallowClocks)
	}

	// fmt.Printf("[S] QueueTick: %d - %d\n", data.queueTick, s.lastPushQTick)
	// fmt.Printf("[S] MachTick: %d - %d\n", data.machTick, s.lastPushMachTick)

	.storeLastPush()

	return &
}

// Memorizes the passed tracer data as the last pushed data, and sets the last
// push time to now.
func ( *Server) ( *tracerData) {
	.lastPush = time.Now()
	.lastPushData = 
}

// ///// ///// /////

// ///// REMOTE METHODS

// ///// ///// /////
// TODO add local errs

// RPC method handling the client's hello message (first step of the
// handshake). Requires the Start state and a non-empty client ID, and
// enforces AllowId when set. It stores the client's sync settings, exports
// the source, activates the tracer for the tracked states, and replies with
// the serialized source (plus the schema, when schema sync is requested and
// the client's schema hash is empty or differs). Finally it memorizes the
// exported clock as the last push, stores the client ID, and adds the
// Handshaking state.
// NOTE(review): the method name is missing in this view; the log prefix
// suggests RemoteHello.
func ( *Server) (
	 *rpc2.Client,  *MsgCliHello,  *MsgSrvHello,
) error {
	// validate
	if .Mach.Not1(ssS.Start) {
		return am.ErrCanceled
	}
	if  == nil || .Id == "" {
		.Mach.Remove1(ssS.Handshaking, nil)
		AddErrRpcStr(nil, .Mach, "handshake failed: ID missing")

		return ErrInvalidParams
	}

	// locks
	.lockCollection.Lock()
	defer .lockCollection.Unlock()

	// check access TODO test
	if .AllowId != "" && .Id != .AllowId {
		.Mach.Remove1(ssS.Handshaking, nil)

		return fmt.Errorf("%w: %s != %s", ErrNoAccess, .Id, .AllowId)
	}

	// set up sync
	.syncAllowedStates = .AllowedStates
	.syncSkippedStates = .SkippedStates
	.syncShallowClocks = .ShallowClocks
	.syncMutations = .SyncMutations

	// prep the mach msg
	, ,  := .Source.Export()
	.tracer.calcTrackedStates(.StateNames)
	.tracer.active = true
	// state count and tracked time sum are taken before any filtering below
	 := len(.StateNames)
	 := .Time.Filter(.tracer.trackedStateIdxs).Sum(nil)

	// client-bound indexes when no schema synced
	if !.SyncSchema {
		.StateNames = am.StatesShared(.StateNames,
			.tracer.trackedStates)
		.Time = .Time.Filter(.tracer.trackedStateIdxs)

		// zero non-tracked for consistent checksums
	} else {
		for  := range .StateNames {
			if slices.Contains(.tracer.trackedStateIdxs, ) {
				continue
			}
			.Time[] = 0
		}
	}
	* = MsgSrvHello{
		Serialized:  ,
		StatesCount: uint32(),
	}

	// return the schema if requested
	if .SyncSchema {
		.syncSchema = true
		if .SchemaHash == "" ||
			.SchemaHash != amhelp.SchemaHash() {

			.Schema = 
		}
	}

	// memorize
	.lastPushData.mTime = .Time
	.lastPushData.queueTick = .QueueTick
	.lastPushData.mTrackedTimeSum = 
	.lastPush = time.Now()
	.clientId.Store(&.Id)

	.log("RemoteHello: t%v q%d", , .QueueTick)
	.Mach.Add1(ssS.Handshaking, nil)

	// TODO timeout for RemoteHandshake via push loop

	return nil
}

// RPC method completing the handshake. Requires the Start state and a stored
// client ID (otherwise [ErrNoConn]). It stores the calling rpc2 client as the
// current one and adds HandshakeDone with the client ID in the args.
// NOTE(review): the method name is missing in this view; the log prefix
// suggests RemoteHandshake.
func ( *Server) (
	 *rpc2.Client,  *MsgEmpty,  *MsgEmpty,
) error {
	if .Mach.Not1(ssS.Start) {
		return am.ErrCanceled
	}
	 := .clientId.Load()
	if  == nil {
		return ErrNoConn
	}
	// source clock and queue tick are read for logging only
	 := .Source.Time(nil).Sum(nil)
	 := .Source.QueueTick()
	.log("RemoteHandshake: t%v q%d", , )

	// accept the client
	.rpcClient.Store()
	.Mach.Add1(ssS.HandshakeDone, Pass(&A{
		Id: *,
	}))

	return nil
}

// RPC method which adds states to the source on behalf of the client, and
// replies with the mutation result plus a clock update. Requires the Start
// state and non-nil state indexes ([ErrInvalidParams] otherwise). Holds
// lockExport for the whole call. When the request carries an event, the
// mutation is executed via EvAdd with that event.
// NOTE(review): the method name is missing in this view; presumably
// RemoteAdd - confirm.
func ( *Server) (
	 *rpc2.Client,  *MsgCliMutation,  *MsgSrvMutation,
) error {
	if .Mach.Not1(ssS.Start) {
		return am.ErrCanceled
	}
	.lockExport.Lock()
	defer .lockExport.Unlock()

	// validate
	if .States == nil {
		return ErrInvalidParams
	}

	// typed args
	// with a prefix and a typed args struct set, the args map is re-built
	// from the typed struct and nested under the prefix
	 := .Args
	if .ArgsPrefix != "" && .Args != nil {
		 = am.A{
			.ArgsPrefix: amhelp.ArgsFromMap(.Args, .Args),
		}
	}

	// execute
	var  am.Result
	if .Event != nil {
		 = .Source.EvAdd(.Event, amhelp.IndexesToStates(
			.Source.StateNames(), .States), )
	} else {
		// TODO eval
		 = .Source.Add(amhelp.IndexesToStates(.Source.StateNames(),
			.States), )
	}

	// return
	 := .tracer.DataLatest()
	* = *.newMsgMutation(, )

	return nil
}

// RPC method which adds states to the source on behalf of the client without
// reporting back: the mutation result is discarded and the reply is empty.
// Requires the Start state and non-nil state indexes ([ErrInvalidParams]
// otherwise). Holds lockExport for the whole call.
// NOTE(review): the method name is missing in this view; presumably
// RemoteAddNS - confirm.
func ( *Server) (
	 *rpc2.Client,  *MsgCliMutation,  *MsgEmpty,
) error {
	if .Mach.Not1(ssS.Start) {
		return am.ErrCanceled
	}
	.lockExport.Lock()
	defer .lockExport.Unlock()

	// validate
	if .States == nil {
		return ErrInvalidParams
	}

	// typed args
	// with a prefix and a typed args struct set, the args map is re-built
	// from the typed struct and nested under the prefix
	 := .Args
	if .ArgsPrefix != "" && .Args != nil {
		 = am.A{
			.ArgsPrefix: amhelp.ArgsFromMap(.Args, .Args),
		}
	}

	// execute TODO event trace
	_ = .Source.Add(amhelp.IndexesToStates(.Source.StateNames(), .States),
		)

	return nil
}

// RPC method which removes states from the source on behalf of the client,
// and replies with the mutation result plus a clock update. Requires the
// Start state and non-nil state indexes ([ErrInvalidParams] otherwise). Holds
// lockExport for the whole call.
// NOTE(review): the method name is missing in this view; presumably
// RemoteRemove - confirm.
func ( *Server) (
	 *rpc2.Client,  *MsgCliMutation,  *MsgSrvMutation,
) error {
	if .Mach.Not1(ssS.Start) {
		return am.ErrCanceled
	}
	.lockExport.Lock()
	defer .lockExport.Unlock()

	// validate
	if .States == nil {
		return ErrInvalidParams
	}

	// typed args
	// with a prefix and a typed args struct set, the args map is re-built
	// from the typed struct and nested under the prefix
	 := .Args
	if .ArgsPrefix != "" && .Args != nil {
		 = am.A{
			.ArgsPrefix: amhelp.ArgsFromMap(.Args, .Args),
		}
	}

	// execute TODO event trace
	 := .Source.Remove(amhelp.IndexesToStates(.Source.StateNames(),
		.States), )

	// return
	 := .tracer.DataLatest()
	* = *.newMsgMutation(, )

	return nil
}

// RPC method which sets the source's states on behalf of the client, and
// replies with the mutation result plus a clock update. Requires the Start
// state and non-nil state indexes ([ErrInvalidParams] otherwise). Holds
// lockExport for the whole call.
// NOTE(review): the method name is missing in this view; presumably
// RemoteSet - confirm.
func ( *Server) (
	 *rpc2.Client,  *MsgCliMutation,  *MsgSrvMutation,
) error {
	if .Mach.Not1(ssS.Start) {
		return am.ErrCanceled
	}
	.lockExport.Lock()
	defer .lockExport.Unlock()

	// validate
	if .States == nil {
		return ErrInvalidParams
	}

	// typed args
	// with a prefix and a typed args struct set, the args map is re-built
	// from the typed struct and nested under the prefix
	 := .Args
	if .ArgsPrefix != "" && .Args != nil {
		 = am.A{
			.ArgsPrefix: amhelp.ArgsFromMap(.Args, .Args),
		}
	}

	// execute TODO event trace
	 := .Source.Set(amhelp.IndexesToStates(.Source.StateNames(),
		.States), )

	// return
	 := .tracer.DataLatest()
	* = *.newMsgMutation(, )

	return nil
}

// RPC method which replies with the source's current time and queue tick.
// Requires the Start state. Adds the MetricSync state on every call.
// NOTE(review): the method name is missing in this view; the log prefix
// suggests RemoteSync.
func ( *Server) (
	 *rpc2.Client,  *MsgEmpty,  *MsgSrvSync,
) error {
	if .Mach.Not1(ssS.Start) {
		return am.ErrCanceled
	}
	.Mach.Add1(ssS.MetricSync, nil)

	* = MsgSrvSync{
		Time:      .Source.Time(nil),
		QueueTick: .Source.QueueTick(),
	}
	.log("RemoteSync: [%v]", .Time)

	return nil
}

// RPC method which replies with the fields of the server's typed args struct,
// when one is set (otherwise the reply is left untouched). Requires the Start
// state. Adds the MetricSync state on every call. An error from reading the
// struct fields is returned to the caller.
// NOTE(review): the method name is missing in this view; presumably
// RemoteArgs - confirm.
func ( *Server) (
	 *rpc2.Client,  *MsgEmpty,  *MsgSrvArgs,
) error {
	if .Mach.Not1(ssS.Start) {
		return am.ErrCanceled
	}
	.Mach.Add1(ssS.MetricSync, nil)

	// args TODO cache
	if .Args != nil {
		,  := utils.StructFields(.Args)
		if  != nil {
			return 
		}
		(*).Args = 
	}

	return nil
}

// RemoteBye means the client says goodbye and will disconnect shortly.
//
// Requires the Start state. Removes ClientConnected right away, then in a
// goroutine tries to close the rpc2 client (giving up waiting after 100ms),
// sleeps another 100ms, and removes HandshakeDone. The stored client and
// client ID are forgotten before returning.
func ( *Server) (
	 *rpc2.Client,  *MsgEmpty,  *MsgEmpty,
) error {
	if .Mach.Not1(ssS.Start) {
		return am.ErrCanceled
	}
	.log("RemoteBye")

	.Mach.Remove1(ssS.ClientConnected, Pass(&A{
		Addr: .Addr,
	}))
	go func() {
		select {
		case <-time.After(100 * time.Millisecond):
			.log("rpc.Close timeout")
		// NOTE(review): rpcClient is cleared below, right after this
		// goroutine is spawned, so the Load() here may already observe nil
		// and skip Close - confirm this is intended.
		case <-amhelp.ExecAndClose(func() {
			if  := .rpcClient.Load();  != nil {
				_ = .Close()
			}
		}):
			.log("rpc.Close")
		}

		time.Sleep(100 * time.Millisecond)
		.Mach.Remove1(ssS.HandshakeDone, nil)
	}()

	// forget client
	.rpcClient.Store(nil)
	.clientId.Store(nil)

	return nil
}

// ///// ///// /////

// ///// BINDINGS

// ///// ///// /////

// BindServer binds RpcReady and ClientConnected with Add/Remove, to custom
// states. Both target state names are required; an error is returned when
// either is empty. The pipes are bound as handlers on one of the two passed
// machines, and the error of that binding is returned.
// NOTE(review): parameter and field names are missing in this view;
// presumably the handlers are bound on the source (server) machine and
// mutate the target machine - confirm.
func (,  *am.Machine, ,  string) error {
	if  == "" ||  == "" {
		return fmt.Errorf("rpcReady and clientConn must be set")
	}

	// anonymous handlers struct: an Add and a Remove pipe per piped state
	 := &struct {
		 am.HandlerFinal
		   am.HandlerFinal

		 am.HandlerFinal
		   am.HandlerFinal
	}{
		: ampipe.Add(, , ssS.RpcReady, ),
		:   ampipe.Remove(, , ssS.RpcReady, ),

		: ampipe.Add(, , ssS.ClientConnected,
			),
		: ampipe.Remove(, , ssS.ClientConnected,
			),
	}

	return .BindHandlers()
}

// BindServerMulti binds RpcReady, ClientConnected, and ClientDisconnected.
// RpcReady is Add/Remove, the other two are Add-only to passed multi states.
// All three target state names are required; an error is returned when any is
// empty.
// NOTE(review): both client pipes below are built from ssS.ClientConnected
// via ampipe.Add; the struct field names (missing in this view) presumably
// select the enter vs. the exit handler of that state - confirm.
func (
	,  *am.Machine, , ,  string,
) error {
	if  == "" ||  == "" ||  == "" {
		return fmt.Errorf("rpcReady, clientConn, and clientDisconn must be set")
	}

	 := &struct {
		 am.HandlerFinal
		   am.HandlerFinal

		 am.HandlerFinal
		   am.HandlerFinal
	}{
		: ampipe.Add(, , ssS.RpcReady, ),
		:   ampipe.Remove(, , ssS.RpcReady, ),

		: ampipe.Add(, ,
			ssS.ClientConnected, ),
		: ampipe.Add(, ,
			ssS.ClientConnected, ),
	}

	return .BindHandlers()
}

// BindServerRpcReady binds RpcReady using Add to a custom multi state. Unlike
// the other Bind* helpers, the target state name is not validated here.
func (,  *am.Machine,  string) error {
	 := &struct {
		 am.HandlerFinal
	}{
		: ampipe.Add(, , ssS.RpcReady, ),
	}

	return .BindHandlers()
}

// ///// ///// /////

// ///// MISC

// ///// ///// /////

// ServerOpts are optional settings for a new Server.
type ServerOpts struct {
	// PayloadState is a state for the server to listen on, to deliver payloads
	// to the client. The client activates this state to request a payload from
	// the worker. Default: states.NetSourceStates.SendPayload.
	PayloadState string
	// Parent is a parent state machine for a new Server state machine. See
	// [am.Opts].
	Parent am.Api
	// Typed arguments struct pointer
	Args       any
	// ArgsPrefix is the key under which typed arguments are nested in the
	// resulting [am.A] map. Typed arguments are used only when both Args and
	// ArgsPrefix are set.
	ArgsPrefix string
}

// SendPayloadHandlers is the default handlers struct bound to the source for
// payload delivery, used when the payload state is the default SendPayload.
type SendPayloadHandlers struct {
	// SendPayloadState delivers the payload to the RPC client.
	SendPayloadState am.HandlerFinal
}

// getSendPayloadState returns a handler (usually SendPayloadState) that will
// deliver a payload to the RPC client. The resulting function can be bound in
// anon handlers.
//
// The handler first removes its own state from the event's machine. Invalid
// args (missing payload or name) are reported by adding ErrSendPayload to the
// event's machine. Delivery happens in a goroutine, limited by the server's
// DeliveryTimeout and the server's Start state context; a failed delivery is
// reported the same way.
func getSendPayloadState( *Server,  string) am.HandlerFinal {
	return func( *am.Event) {
		// self-remove
		.Machine().EvRemove1(, , nil)

		 := .Mach.NewStateCtx(ssS.Start)
		 := ParseArgs(.Args)
		 := &A{Name: .Name}

		// side-effect error handling
		if .Payload == nil || .Name == "" {
			 := fmt.Errorf("invalid payload args [name, payload]")
			.Machine().EvAddErrState(, ssW.ErrSendPayload, , Pass())

			return
		}

		// unblock and forward to the client
		go func() {
			// timeout context
			,  := context.WithTimeout(, .DeliveryTimeout)
			defer ()

			 := .SendPayload(, , .Payload)
			if  != nil {
				.Machine().EvAddErrState(, ssW.ErrSendPayload, , Pass())
			}
		}()
	}
}

// createSendPayloadHandlers creates SendPayload handlers for a custom (dynamic)
// state name. Useful when binding >1 RPC server into the same state source.
//
// The result is a pointer to a struct type built at runtime, with a single
// field named after the state plus [am.SuffixState], holding the delivery
// handler.
func createSendPayloadHandlers( *Server,  string) any {
	// TODO migrate to ampipe.Bind
	 := getSendPayloadState(, )

	// define a struct with the handler
	 := reflect.StructOf([]reflect.StructField{
		{
			Name:  + am.SuffixState,
			Type: reflect.TypeOf(),
		},
	})

	// new instance and set handler
	 := reflect.New().Elem()
	.Field(0).Set(reflect.ValueOf())
	// return a pointer to the new instance
	 := .Addr().Interface()

	return 
}

// calcUpdate calculates a new update based on previously pushed data.
//
// The last bool selects between a shallow and a deep update for the indexes
// and ticks. Queue and machine ticks are sent as differences between the two
// tracer data values.
// NOTE(review): the differences are narrowed to uint16 and uint8, so larger
// gaps would wrap - confirm the expected range.
func calcUpdate(
	 bool, ,  *tracerData,  bool,
) *MsgSrvUpdate {
	var  []uint16
	var  []uint32
	if  {
		,  = genShallowUpdate(, , )
	} else {
		,  = genDeepUpdate(, , )
	}

	return &MsgSrvUpdate{
		QueueTick: uint16(.queueTick - .queueTick),
		MachTick:  uint8(.machTick - .machTick),
		Indexes:   ,
		Ticks:     ,
		Checksum:  .checksum,
	}
}

// calcUpdateMutations calculates a new mutation update based on the queue and
// previously pushed data.
//
// For each queued mutation it appends the mutation type, the called state
// indexes (converted to uint16), and a deep update. Each update is calculated
// against the data of the previous mutation in the queue, starting from the
// passed tracer data.
func calcUpdateMutations(
	 bool,  []tracerMutation,  *tracerData,
) *MsgSrvUpdateMuts {
	 := &MsgSrvUpdateMuts{}
	for  := range  {
		 := &[]
		.MutationType = append(.MutationType, .mutType)
		 := make([]uint16, len(.calledIdxs))
		for  := range .calledIdxs {
			[] = uint16(.calledIdxs[])
		}
		.CalledStates = append(.CalledStates, )
		.Updates = append(.Updates, *calcUpdate(, &.data, ,
			false))

		// the next mutation is diffed against this one
		 = &.data
	}

	return 
}

// genDeepUpdate returns two parallel slices, state indexes and tick values,
// for the tracked states. With no previous clock, or an index beyond it (first
// call or new schema), every non-zero tick is sent as an absolute value.
// Otherwise only changed ticks are sent, as a difference between the two
// clocks.
func genDeepUpdate(
	 bool, ,  *tracerData,
) ( []uint16,  []uint32) {
	 = make([]uint16, 0, len(.tracked))
	 = make([]uint32, 0, len(.tracked))

	for  := range .tracked {
		 := .trackedIdxs[]
		 := .mTime
		 := .mTime

		// client has shorter state names
		// (so a different index is sent when the schema is not synced)
		 := uint16()
		if ! {
			 = uint16()
		}

		// first call or new schema
		if  == nil ||  >= len() {
			if [] == 0 {
				continue
			}
			 = append(, )
			 = append(, uint32([]))

			// regular update
		} else if [] != [] {
			 = append(, )
			 = append(, uint32([]-[]))
		}
	}

	return , 
}

// genShallowUpdate returns two parallel slices, state indexes and tick values,
// for the tracked states, reduced to activity changes. With no previous
// clock, or an index beyond it (first call or new schema), every non-zero
// tick is sent as 1 when active and 0 otherwise. Otherwise a state is sent,
// with a tick value of 1, only when the parity of its tick changed.
func genShallowUpdate(
	 bool, ,  *tracerData,
) ( []uint16,  []uint32) {
	 = make([]uint16, 0, len(.tracked))
	 = make([]uint32, 0, len(.tracked))

	for  := range .tracked {
		 := .trackedIdxs[]
		 := .mTime
		 := .mTime

		// client has shorter state names
		// (so a different index is sent when the schema is not synced)
		 := uint16()
		if ! {
			 = uint16()
		}

		// first call or new schema
		if  == nil || int() >= len() {
			if [] == 0 {
				continue
			}
			 = append(, )
			 := 0
			if am.IsActiveTick([]) {
				 = 1
			}
			 = append(, uint32())

			// regular update
		} else if []%2 != []%2 {
			 = append(, )
			 = append(, uint32(1))
		}
	}

	return , 
}