package rpc

// TODO call ClientBye on Disposing

import (
	
	
	
	
	
	
	
	

	

	
	amhelp 
	am 
	
	ampipe 
)

var (
	ssS = states.ServerStates
	ssW = states.WorkerStates
)

// Server is an RPC server that can be bound to a worker machine and provide
// remote access to its states and methods.
type Server struct {
	*ExceptionHandler
	Mach *am.Machine

	// Source is a state Source, either a local or remote RPC worker.
	Source am.Api
	// Addr is the address of the server on the network.
	Addr string
	// DeliveryTimeout is a timeout for SendPayload to the client.
	DeliveryTimeout time.Duration
	// PushInterval is the interval for clock updates, effectively throttling
	// the number of updates sent to the client within the interval window.
	// 0 means pushes are disabled. Setting to a very small value will make
	// pushes instant.
	PushInterval time.Duration
	// PushAllTicks will push all ticks to the client, enabling client-side
	// final handlers. TODO more info, implement via a queue
	PushAllTicks bool
	// Listener can be set manually before starting the server.
	Listener atomic.Pointer[net.Listener]
	// Conn can be set manually before starting the server.
	Conn net.Conn
	// NoNewListener will prevent the server from creating a new listener if
	// one is not provided, or has been closed. Useful for cmux.
	NoNewListener bool
	LogEnabled    bool
	CallCount     uint64

	// AllowId will limit clients to a specific ID, if set.
	AllowId string

	rpcServer *rpc2.Server
	// rpcClient is the internal rpc2 client.
	rpcClient atomic.Pointer[rpc2.Client]
	clockMx   sync.Mutex
	ticker    *time.Ticker
	// mutMx is a lock preventing mutation methods from racing each other.
	mutMx         sync.Mutex
	skipClockPush atomic.Bool
	tracer        *WorkerTracer
	// ID of the currently connected client.
	clientId         atomic.Pointer[string]
	deliveryHandlers any

	// lastClockHTime is the last (human) time a clock update was sent to the
	// client.
	lastClockHTime time.Time
	lastClock      am.Time
	lastClockSum   atomic.Uint64
	lastClockMsg   *ClockMsg
	lastQueueTick  uint64
}

// interfaces
var (
	_ serverRpcMethods    = &Server{}
	_ clientServerMethods = &Server{}
)

// NewServer creates a new RPC server, bound to a worker machine.
// The source machine has to implement am/rpc/states/WorkerStatesDef interface.
func (
	 context.Context,  string,  string,  am.Api,
	 *ServerOpts,
) (*Server, error) {
	if  == "" {
		 = "rpc"
	}
	if  == nil {
		 = &ServerOpts{}
	}

	// check the worker
	if !.StatesVerified() {
		return nil, fmt.Errorf("worker states not verified, call VerifyStates()")
	}
	 := .HasHandlers()
	if  && !.Has(ssW.Names()) {
		// error only when some handlers bound, skip deterministic machines
		 := fmt.Errorf(
			"%w: RPC worker with handlers has to implement "+
				"pkg/rpc/states/WorkerStatesDef",
			am.ErrSchema)

		return nil, 
	}

	 := &Server{
		ExceptionHandler: &ExceptionHandler{},
		Addr:             ,
		PushInterval:     250 * time.Millisecond,
		DeliveryTimeout:  5 * time.Second,
		LogEnabled:       os.Getenv(EnvAmRpcLogServer) != "",
		Source:           ,

		// queue ticks start at 1
		lastQueueTick: 1,
	}
	var  uint64
	.lastClockSum.Store()

	// state machine
	,  := am.NewCommon(, "rs-"+, states.ServerSchema, ssS.Names(),
		, .Parent, &am.Opts{Tags: []string{"rpc-server"}})
	if  != nil {
		return nil, 
	}
	.SemLogger().SetArgsMapper(LogArgs)
	.OnDispose(func( string,  context.Context) {
		if  := .Listener.Load();  != nil {
			_ = (*).Close()
			.Listener.Store(nil)
		}
		.rpcServer = nil
		_ = .Source.DetachTracer(.tracer)
		_ = .Source.DetachHandlers(.deliveryHandlers)
	})
	.Mach = 
	// optional env debug
	if os.Getenv(EnvAmRpcDbg) != "" {
		amhelp.MachDebugEnv()
	}

	// bind to worker via Tracer API
	.tracer = &WorkerTracer{s: }
	_ = .BindTracer(.tracer)

	// handle payload
	if  {

		// payload state
		 := ssW.SendPayload
		if .PayloadState != "" {
			 = .PayloadState
		}

		// payload handlers
		var  any
		if  == ssW.SendPayload {
			// default handlers
			 = &SendPayloadHandlers{
				SendPayloadState: getSendPayloadState(, ssW.SendPayload),
			}
		} else {
			// dynamic handlers
			 = createSendPayloadHandlers(, )
		}
		 = .BindHandlers()
		if  != nil {
			return nil, 
		}
		.OnDispose(func( string,  context.Context) {
			_ = .DetachHandlers()
		})
	}

	return , nil
}

// ///// ///// /////

// ///// HANDLERS

// ///// ///// /////

func ( *Server) ( *am.Event) {
	if ParseArgs(.Args).Dispose {
		.Mach.Dispose()
	}
}

func ( *Server) ( *am.Event) bool {
	if .Listener.Load() == nil && .NoNewListener {
		return false
	}
	if .Addr == "" {
		return false
	}

	return true
}

func ( *Server) ( *am.Event) {
	 := .Mach.NewStateCtx(ssS.RpcStarting)
	 := .Mach.NewStateCtx(ssS.Start)
	.log("Starting RPC on %s", .Addr)
	.bindRpcHandlers()
	 := .rpcServer

	// unblock
	go func() {
		// has to be ctxStart, not ctxRpcStarting TODO why?
		if .Err() != nil {
			return // expired
		}

		if .Conn != nil {
			.Addr = .Conn.LocalAddr().String()
		} else if  := .Listener.Load();  != nil {
			// update Addr from listener (support for external and :0)
			.Addr = (*).Addr().String()
		} else {
			// create a listener if not provided
			// use Start as the context
			 := net.ListenConfig{}
			,  := .Listen(, "tcp4", .Addr)
			if  != nil {
				// add err to mach
				AddErrNetwork(, .Mach, )
				// add outcome to mach
				.Mach.Remove1(ssS.RpcStarting, nil)

				return
			}

			.Listener.Store(&)
			// update Addr from listener (support for external and :0)
			.Addr = .Addr().String()
		}

		.log("RPC started on %s", .Addr)

		// fork to accept
		go func() {
			if .Err() != nil {
				return // expired
			}
			.Mach.EvAdd1(, ssS.RpcReady, Pass(&A{Addr: .Addr}))

			// accept (block)
			 := .Listener.Load()
			if .Conn != nil {
				.ServeConn(.Conn)
			} else {
				.Accept(*)
			}
			if .Err() != nil {
				return // expired
			}

			// clean up
			if  != nil {
				(*).Close()
				.Listener.Store(nil)
			}
			if .Err() != nil {
				return // expired
			}

			// restart on failed listener
			if .Mach.Is1(ssS.Start) {
				.Mach.EvRemove1(, ssS.RpcReady, nil)
				.Mach.EvAdd1(, ssS.RpcStarting, nil)
			}
		}()

		// bind to client events
		.OnDisconnect(func( *rpc2.Client) {
			.Mach.EvRemove1(, ssS.ClientConnected, Pass(&A{Client: }))
		})
		.OnConnect(func( *rpc2.Client) {
			.Mach.EvAdd1(, ssS.ClientConnected, Pass(&A{Client: }))
		})
	}()
}

func ( *Server) ( *am.Event) bool {
	// only from RpcStarting
	return .Mach.Is1(ssS.RpcStarting)
}

// RpcReadyState starts a ticker to compensate for clock push debounces.
func ( *Server) ( *am.Event) {
	// no ticker for instant clocks
	if .PushInterval == 0 {
		return
	}

	 := .Mach.NewStateCtx(ssS.RpcReady)
	if .ticker == nil {
		.ticker = time.NewTicker(.PushInterval)
	}

	// avoid dispose
	 := .ticker

	go func() {
		if .Err() != nil {
			return // expired
		}

		// push clock updates, debounced by genClockUpdate
		for {
			select {
			case <-.Done():
				.ticker.Stop()
				return

			case <-.C:
				.pushClockUpdate(false)
			}
		}
	}()
}

// TODO tell the client Bye to gracefully disconn

func ( *Server) ( *am.Event) {
	if  := .rpcClient.Load();  != nil {
		_ = .Close()
		// s.rpcClient = nil
	}
}

// ///// ///// /////

// ///// METHODS

// ///// ///// /////

// Start starts the server, optionally creating a Listener (if Addr provided).
// Results in either RpcReady or Exception.
func ( *Server) () am.Result {
	return .Mach.Add1(ssS.Start, nil)
}

// Stop stops the server, and optionally disposes resources.
func ( *Server) ( bool) am.Result {
	if .Mach == nil {
		return am.Canceled
	}
	if  {
		.log("disposing")
	}
	// TODO use Disposing
	 := .Mach.Remove1(ssS.Start, Pass(&A{
		Dispose: ,
	}))

	return 
}

// SendPayload sends a payload to the client. It's usually called by a handler
// for SendPayload.
func ( *Server) (
	 context.Context,  *am.Event,  *ArgsPayload,
) error {
	// TODO add SendPayloadAsync calling RemoteSendingPayload first
	// TODO bind to an async state

	if .Mach.Not1(ssS.ClientConnected) || .Mach.Not1(ssS.HandshakeDone) {
		return ErrNoConn
	}

	// check destination
	 := .ClientId()
	if .Destination != "" &&  != .Destination {
		return fmt.Errorf("%w: %s != %s", ErrDestination, .Destination, )
	}

	defer .Mach.PanicToErr(nil)

	.Token = utils.RandId(0)
	if  != nil {
		.Source = .MachineId
		.SourceTx = .TransitionId
	}
	.log("sending payload %s from %s to %s", .Name, .Source,
		.Destination)

	// TODO failsafe
	return .rpcClient.Load().CallWithContext(,
		ClientSendPayload.Value, , &Empty{})
}

func ( *Server) () string {
	 := .clientId.Load()
	if  == nil {
		return ""
	}

	return *
}

// GetKind returns a kind of RPC component (server / client).
func ( *Server) () Kind {
	return KindServer
}

func ( *Server) ( string,  ...any) {
	if !.LogEnabled {
		return
	}
	.Mach.Log(, ...)
}

func ( *Server) () {
	// new RPC instance, release prev resources
	.rpcServer = rpc2.NewServer()

	.rpcServer.Handle(ServerHello.Value, .RemoteHello)
	.rpcServer.Handle(ServerHandshake.Value, .RemoteHandshake)
	.rpcServer.Handle(ServerAdd.Value, .RemoteAdd)
	.rpcServer.Handle(ServerAddNS.Value, .RemoteAddNS)
	.rpcServer.Handle(ServerRemove.Value, .RemoteRemove)
	.rpcServer.Handle(ServerSet.Value, .RemoteSet)
	.rpcServer.Handle(ServerSync.Value, .RemoteSync)
	.rpcServer.Handle(ServerBye.Value, .RemoteBye)

	// TODO RemoteLog, RemoteWhenArgs, RemoteGetMany

	// s.rpcServer.Handle("RemoteLog", s.RemoteLog)
	// s.rpcServer.Handle("RemoteWhenArgs", s.RemoteWhenArgs)
}

func ( *Server) ( bool) {
	 := .rpcClient.Load()
	if  == nil {
		return
	}
	if .skipClockPush.Load() && ! {
		// TODO log lvl 2
		// s.log("force-skip clock push")
		return
	}

	if .Mach.Not1(ssS.ClientConnected) ||
		.Mach.Not1(ssS.HandshakeDone) {
		// TODO log lvl 2
		// s.log("skip clock push")
		return
	}

	// disabled
	if .PushInterval == 0 && ! {
		return
	}

	// push all ticks
	// TODO PushAllTicks
	// if s.PushAllTicks {
	// }

	// push the latest clock only
	 := .genClockUpdate(false)
	// debounce
	if  == nil {
		return
	}

	// notify without a response
	defer .Mach.PanicToErr(nil)
	.log("pushClockUpdate %d", .lastClockSum.Load())
	.CallCount++

	// TODO failsafe retry
	 := .Notify(ClientSetClock.Value, )
	if  != nil {
		.Mach.Remove1(ssS.ClientConnected, nil)
		AddErr(nil, .Mach, "pushClockUpdate", )
	}
}

func ( *Server) ( bool) *ClockMsg {
	.clockMx.Lock()
	defer .clockMx.Unlock()

	// exit if too often
	if ! && (time.Since(.lastClockHTime) < .PushInterval) {
		// s.log("genClockUpdate: too soon")
		return nil
	}
	 := time.Now()
	 := .Source.QueueTick()
	 := .Source.Time(nil)

	// exit if no change since the last sync
	var  uint64
	for ,  := range  {
		 += 
	}
	// update on a state change and queue tick change
	if  == .lastClockSum.Load() &&  == .lastQueueTick {
		// flooooood
		// s.log("genClockUpdate: no change t%d q%d", tSum, qTick)
		return nil
	}

	// proceed - valid clock update
	.lastClockMsg = NewClockMsg(, .lastClock, , .lastQueueTick, )
	.lastClock = 
	.lastClockHTime = 
	.lastQueueTick = 
	.lastClockSum.Store()
	.log("genClockUpdate: t%d q%d ch%d (%s)", , ,
		.lastClockMsg.Checksum, .Source.ActiveStates(nil))

	return .lastClockMsg
}

// ///// ///// /////

// ///// REMOTE METHODS

// ///// ///// /////

func ( *Server) (
	 *rpc2.Client,  *ArgsHello,  *RespHandshake,
) error {
	if .Mach.Not1(ssS.Start) {
		return am.ErrCanceled
	}
	.clockMx.Lock()
	defer .clockMx.Unlock()
	// TODO pass ID and key here
	// TODO check if client here is the same as RespHandshakeAck

	 := .Source.Export()
	* = RespHandshake{
		Serialized: ,
	}

	// return the schema if requested
	if .ReqSchema {
		// TODO get via Api.Export to avoid races
		 := .Source.Schema()
		.Schema = 
	}

	 := .Time.Sum(nil)
	.log("RemoteHello: t%v q%d", , .QueueTick)
	.Mach.Add1(ssS.Handshaking, nil)
	.lastClock = .Time
	.lastQueueTick = .QueueTick
	.lastClockSum.Store()
	.lastClockHTime = time.Now()

	// TODO timeout for RemoteHandshake

	return nil
}

func ( *Server) (
	 *rpc2.Client,  *string,  *Empty,
) error {
	if .Mach.Not1(ssS.Start) {
		return am.ErrCanceled
	}
	// TODO pass ID and key here
	if  == nil || * == "" {
		.Mach.Remove1(ssS.Handshaking, nil)
		AddErrRpcStr(nil, .Mach, "handshake failed: ID missing")

		return ErrInvalidParams
	}

	// check access TODO test
	if .AllowId != "" && * != .AllowId {
		.Mach.Remove1(ssS.Handshaking, nil)

		return fmt.Errorf("%w: %s != %s", ErrNoAccess, *, .AllowId)
	}

	 := .Source.Time(nil).Sum(nil)
	 := .Source.QueueTick()
	.log("RemoteHandshake: t%v q%d", , )

	// accept the client
	.rpcClient.Store()
	.clientId.Store()
	.Mach.Add1(ssS.HandshakeDone, Pass(&A{Id: *}))

	// state changed during the handshake, push manually
	if .lastClockSum.Load() !=  || .lastQueueTick !=  &&
		.PushInterval == 0 {
		.pushClockUpdate(true)
	}

	return nil
}

func ( *Server) (
	 *rpc2.Client,  *ArgsMut,  *RespResult,
) error {
	if .Mach.Not1(ssS.Start) {
		return am.ErrCanceled
	}
	.mutMx.Lock()
	defer .mutMx.Unlock()

	// validate
	if .States == nil {
		return ErrInvalidParams
	}

	// execute
	var  am.Result
	.skipClockPush.Store(true)
	if .Event != nil {
		 = .Source.EvAdd(.Event, amhelp.IndexesToStates(
			.Source.StateNames(), .States), .Args)
	} else {
		// TODO eval
		 = .Source.Add(amhelp.IndexesToStates(.Source.StateNames(),
			.States), .Args)
	}

	// return
	* = RespResult{
		Result: ,
		Clock:  .genClockUpdate(true),
	}
	.skipClockPush.Store(false)

	return nil
}

func ( *Server) (
	 *rpc2.Client,  *ArgsMut,  *Empty,
) error {
	if .Mach.Not1(ssS.Start) {
		return am.ErrCanceled
	}
	.mutMx.Lock()
	defer .mutMx.Unlock()

	// validate
	if .States == nil {
		return ErrInvalidParams
	}

	// execute
	.skipClockPush.Store(true)
	_ = .Source.Add(amhelp.IndexesToStates(.Source.StateNames(), .States),
		.Args)
	.skipClockPush.Store(false)

	return nil
}

func ( *Server) (
	 *rpc2.Client,  *ArgsMut,  *RespResult,
) error {
	if .Mach.Not1(ssS.Start) {
		return am.ErrCanceled
	}
	.mutMx.Lock()
	defer .mutMx.Unlock()

	// validate
	if .States == nil {
		return ErrInvalidParams
	}

	// execute
	.skipClockPush.Store(true)
	 := .Source.Remove(amhelp.IndexesToStates(.Source.StateNames(),
		.States), .Args)
	.skipClockPush.Store(false)

	// return
	* = RespResult{
		Result: ,
		Clock:  .genClockUpdate(true),
	}
	return nil
}

func ( *Server) (
	 *rpc2.Client,  *ArgsMut,  *RespResult,
) error {
	if .Mach.Not1(ssS.Start) {
		return am.ErrCanceled
	}
	.mutMx.Lock()
	defer .mutMx.Unlock()

	// validate
	if .States == nil {
		return ErrInvalidParams
	}

	// execute
	.skipClockPush.Store(true)
	 := .Source.Set(amhelp.IndexesToStates(.Source.StateNames(),
		.States), .Args)
	.skipClockPush.Store(false)

	// return
	* = RespResult{
		Result: ,
		Clock:  .genClockUpdate(true),
	}
	return nil
}

func ( *Server) (
	 *rpc2.Client,  uint64,  *RespSync,
) error {
	if .Mach.Not1(ssS.Start) {
		return am.ErrCanceled
	}
	.log("RemoteSync")

	if .Source.Time(nil).Sum(nil) >  {
		* = RespSync{
			Time:      .Source.Time(nil),
			QueueTick: .Source.QueueTick(),
		}
	} else {
		* = RespSync{}
	}

	.log("RemoteSync: %v", .Time)

	return nil
}

// RemoteBye means the client says goodbye and will disconnect shortly.
func ( *Server) (
	 *rpc2.Client,  *Empty,  *Empty,
) error {
	if .Mach.Not1(ssS.Start) {
		return am.ErrCanceled
	}
	.log("RemoteBye")

	.Mach.Remove1(ssS.ClientConnected, Pass(&A{
		Addr: .Addr,
	}))
	go func() {
		select {
		case <-time.After(100 * time.Millisecond):
			.log("rpc.Close timeout")
		case <-amhelp.ExecAndClose(func() {
			if  := .rpcClient.Load();  != nil {
				_ = .Close()
			}
		}):
			.log("rpc.Close")
		}

		time.Sleep(100 * time.Millisecond)
		.Mach.Remove1(ssS.HandshakeDone, nil)
	}()

	// forget client
	.rpcClient.Store(nil)
	.clientId.Store(nil)

	return nil
}

func ( *Server) (
	 *rpc2.Client,  bool,  *Empty,
) error {
	if .Mach.Not1(ssS.Start) {
		return am.ErrCanceled
	}
	.log("RemoteSetPushAllTicks")

	.PushAllTicks = 

	return nil
}

// ///// ///// /////

// ///// BINDINGS

// ///// ///// /////

// BindServer binds RpcReady and ClientConnected with Add/Remove, to custom
// states.
func (,  *am.Machine, ,  string) error {
	if  == "" ||  == "" {
		return fmt.Errorf("rpcReady and clientConn must be set")
	}

	 := &struct {
		 am.HandlerFinal
		   am.HandlerFinal

		 am.HandlerFinal
		   am.HandlerFinal
	}{
		: ampipe.Add(, , ssS.RpcReady, ),
		:   ampipe.Remove(, , ssS.RpcReady, ),

		: ampipe.Add(, , ssS.ClientConnected,
			),
		: ampipe.Remove(, , ssS.ClientConnected,
			),
	}

	return .BindHandlers()
}

// BindServerMulti binds RpcReady, ClientConnected, and ClientDisconnected.
// RpcReady is Add/Remove, other two are Add-only to passed multi states.
func (
	,  *am.Machine, , ,  string,
) error {
	if  == "" ||  == "" ||  == "" {
		return fmt.Errorf("rpcReady, clientConn, and clientDisconn must be set")
	}

	 := &struct {
		 am.HandlerFinal
		   am.HandlerFinal

		 am.HandlerFinal
		   am.HandlerFinal
	}{
		: ampipe.Add(, , ssS.RpcReady, ),
		:   ampipe.Remove(, , ssS.RpcReady, ),

		: ampipe.Add(, ,
			ssS.ClientConnected, ),
		: ampipe.Add(, ,
			ssS.ClientConnected, ),
	}

	return .BindHandlers()
}

// BindServerRpcReady bind RpcReady using Add to a custom multi state.
func (,  *am.Machine,  string) error {
	 := &struct {
		 am.HandlerFinal
	}{
		: ampipe.Add(, , ssS.RpcReady, ),
	}

	return .BindHandlers()
}

// ///// ///// /////

// ///// MISC

// ///// ///// /////

type ServerOpts struct {
	// PayloadState is a state for the server to listen on, to deliver payloads
	// to the client. The client activates this state to request a payload from
	// the worker. Default: am/rpc/states/WorkerStates.SendPayload.
	PayloadState string
	// Parent is a parent state machine for a new Server state machine. See
	// [am.Opts].
	Parent am.Api
}

type SendPayloadHandlers struct {
	SendPayloadState am.HandlerFinal
}

// getSendPayloadState returns a handler (usually SendPayloadState), that will
// deliver a payload to the RPC client. The resulting function can be bound in
// anon handlers.
func getSendPayloadState( *Server,  string) am.HandlerFinal {
	return func( *am.Event) {
		// self-remove
		.Machine().EvRemove1(, , nil)

		 := .Mach.NewStateCtx(ssS.Start)
		 := ParseArgs(.Args)
		 := &A{Name: .Name}

		// side-effect error handling
		if .Payload == nil || .Name == "" {
			 := fmt.Errorf("invalid payload args [name, payload]")
			.Machine().EvAddErrState(, ssW.ErrSendPayload, , Pass())

			return
		}

		// unblock and forward to the client
		go func() {
			// timeout context
			,  := context.WithTimeout(, .DeliveryTimeout)
			defer ()

			 := .SendPayload(, , .Payload)
			if  != nil {
				.Machine().EvAddErrState(, ssW.ErrSendPayload, , Pass())
			}
		}()
	}
}

// createSendPayloadHandlers creates SendPayload handlers for a custom (dynamic)
// state name. Useful when binding >1 RPC server into the same state source.
func createSendPayloadHandlers( *Server,  string) any {
	 := getSendPayloadState(, )

	// define a struct with the handler
	 := reflect.StructOf([]reflect.StructField{
		{
			Name:  + "State",
			Type: reflect.TypeOf(),
		},
	})

	// new instance and set handler
	 := reflect.New().Elem()
	.Field(0).Set(reflect.ValueOf())
	 := .Addr().Interface()

	return 
}