package rpc

import (
	
	
	
	
	

	

	amhelp 
	am 
	
)

// MuxNewServerFn is a function to create a new RPC server for each incoming
// connection.
//
// num is the connection number taken from the Mux's connection counter (the
// counter value before it is incremented, so the first connection gets 0).
// conn is the freshly accepted connection. When an error is returned, the
// accept loop closes conn and does not start a server for it.
type MuxNewServerFn func(num int, conn net.Conn) (*Server, error)

// ssM is a package-level shorthand for states.MuxStates; it is used to
// reference the Mux state names (e.g. ssM.Start, ssM.Ready,
// ssM.ClientConnected).
var ssM = states.MuxStates

// Mux creates a new RPC server for each incoming connection. It listens on a
// single address, multiplexes the listener via cmux, and spins up one Server
// per accepted connection.
type Mux struct {
	*am.ExceptionHandler

	// Mach is the state machine of this Mux, created by the constructor.
	Mach *am.Machine
	// Source is the state source to expose via RPC. Required if NewServerFn
	// isn't provided.
	Source am.Api
	// NewServerFn creates a new instance of Server and is called for every new
	// connection.
	NewServerFn MuxNewServerFn
	// Typed arguments struct value
	Args any

	// Name is used as a prefix for the names of the default per-connection
	// servers (Name + "-" + connection number).
	Name string
	// Addr is the address to listen on when no Listener is provided. Once the
	// Mux has started, it is overwritten with the listener's actual address.
	Addr string
	// The listener used by this Mux, can be set manually before Start().
	Listener   net.Listener
	// LogEnabled gates the Mux's own log messages; the constructor sets it when
	// the EnvAmRpcLogMux environment variable is non-empty.
	LogEnabled bool
	// The last error returned by NewServerFn.
	NewServerErr error
	// Opts is a copy of the options passed to the constructor.
	Opts         MuxOpts

	// clients holds the tracked client connections.
	clients   []net.Conn
	// cmux is the connection multiplexer wrapping Listener.
	cmux      cmux.CMux
	// connCount is the counter used to number accepted connections.
	connCount atomic.Int64
}

// NewMux initializes a Mux instance to handle RPC server creation for incoming
// connections with the given parameters.
//
// newServerFn: when nil, [Mux.Source] needs to be set manually before calling
// [Mux.Start].
//
// opts: optional; nil is replaced with an empty MuxOpts.
//
// Returns an error when the underlying state machine can't be created.
func (
	 context.Context,  string,  MuxNewServerFn,  *MuxOpts,
) (*Mux, error) {
	// default the options, so they can be dereferenced below
	if  == nil {
		 = &MuxOpts{}
	}
	 := &Mux{
		Name:        ,
		LogEnabled:  os.Getenv(EnvAmRpcLogMux) != "",
		NewServerFn: ,
		Args:        .Args,
		Opts:        *,
	}

	// create the Mux state machine; its ID is prefixed with "rm-" and it's
	// tagged "rpc-mux"
	,  := am.NewCommon(, "rm-"+, states.MuxSchema, ssM.Names(),
		, .Parent, &am.Opts{Tags: []string{"rpc-mux"}})
	if  != nil {
		return nil, 
	}
	.SemLogger().SetArgsMapper(LogArgs)
	// NOTE(review): ssC is passed here while the rest of this file uses ssM for
	// Mux states - confirm this is intended.
	.SetGroups(states.MuxGroups, ssC)
	.Mach = 
	// optional env debug
	if os.Getenv(EnvAmRpcDbg) != "" {
		// the returned error is deliberately ignored (debugging is best-effort)
		_ = amhelp.MachDebugEnv()
	}

	return , nil
}

// ///// ///// /////

// ///// HANDLERS

// ///// ///// /////

// Exception handler of the Mux: currently it only delegates to the embedded
// am.ExceptionHandler. Restarting on listener errors is not implemented yet
// (see the TODO below).
func ( *Mux) ( *am.Event) {
	.ExceptionHandler.ExceptionState()
	// TODO restart depending on Start, err, and backoff
	// errors.Is(err, cmux.ErrListenerClosed)
	// errors.Is(err, cmux.ErrServerClosed)
}

// Guard handler: reports true only when the event's arguments parse into a
// non-nil value carrying a non-nil Err.
func ( *Mux) ( *am.Event) bool {
	 := ParseArgs(.Args)
	return  != nil && .Err != nil
}

// Stores the Err field of the event's parsed arguments in NewServerErr.
//
// NOTE(review): the parsed arguments aren't nil-checked here; presumably a
// preceding guard handler guarantees they are set - confirm.
func ( *Mux) ( *am.Event) {
	 := ParseArgs(.Args)
	.NewServerErr = .Err
}

// Guard handler: reports true when either NewServerFn or Source is set, i.e.
// when the Mux has a way to create a server for a new connection.
func ( *Mux) ( *am.Event) bool {
	return .NewServerFn != nil || .Source != nil
}

// Start handler: in a separate goroutine it creates a TCP listener (unless one
// was provided), wraps it in cmux, launches the accept loop, and serves
// connections. A listen or serve error is added to the machine and the Start
// state is removed.
//
// NOTE(review): Listener, cmux and Addr are written from the goroutine without
// synchronization, while other handlers read or clear Listener - confirm this
// can't race.
func ( *Mux) ( *am.Event) {
	// context bound to the Start state, plus the address and machine captured
	// before forking
	 := .Mach.NewStateCtx(ssM.Start)
	 := .Addr
	 := .Mach

	// TODO websocket srv, RpcMuxState

	// unblock
	go func() {
		if .Err() != nil {
			return // expired
		}

		// create a listener if not provided TODO websockets
		if .Listener == nil {
			// use Start as the context
			 := net.ListenConfig{}
			// only IPv4 TCP ("tcp4") is listened on
			,  := .Listen(, "tcp4", )
			if  != nil {
				// add err to mach
				AddErrNetwork(, , )
				// add outcome to mach
				.EvRemove1(, ssM.Start, nil)

				return
			}

			.Listener = 
		}

		// Create a new cmux instance.
		.cmux = cmux.New(.Listener)

		// update Addr from listener (support for external and :0)
		.Addr = .Listener.Addr().String()
		.log("mux started on %s", .Addr)

		// fork: match every connection and hand the resulting listener over to
		// the accept loop
		 := .cmux.Match(cmux.Any())
		go .accept(, )

		// TODO healthcheck loop

		// start cmux; presumably blocks until the multiplexer stops - see the
		// cmux documentation
		if  := .cmux.Serve();  != nil {
			.AddErr(, nil)
			.EvRemove1(, ssM.Start, nil)
		}
	}()
}

// Closes the current listener (if any) and clears the Listener field, so a new
// one gets created on the next start.
func ( *Mux) ( *am.Event) {
	if .Listener != nil {
		// the Close error is deliberately ignored, the listener is dropped anyway
		_ = .Listener.Close()
		.Listener = nil
	}
}

// Immediately requests the removal of the ClientConnected state, passing the
// current event as the source.
func ( *Mux) ( *am.Event) {
	.Mach.EvRemove1(, ssM.ClientConnected, nil)
}

// Guard handler: reports true only when no client connections are tracked.
func ( *Mux) ( *am.Event) bool {
	return len(.clients) == 0
}

// Placeholder handler: intentionally empty until the removal of stale clients
// is implemented.
func ( *Mux) ( *am.Event) {
	// TODO remove stale clients
}

// ///// ///// /////

// ///// METHODS

// ///// ///// /////

// accept is the connection loop of the Mux. It marks the Mux as Ready with the
// listener's address, then for every accepted connection it: adds the
// ClientConnected state, reserves a connection number, creates a Server
// (via NewServerFn, or NewServer exposing Source by default), injects the
// connection and starts the server. Each server is stopped and disposed by a
// dedicated goroutine once its client disconnects.
//
// NOTE(review): the loop has no exit path - an Accept error is reported and
// followed by `continue`, which presumably spins once the listener is closed.
//
// TODO RpcAcceptingState
func ( *Mux) ( *am.Event,  net.Listener) {
	 := .Mach
	// convert a panic in this goroutine into a machine error
	defer .PanicToErr(nil)

	// add Ready from a goroutine, so the loop below starts right away
	go .Mach.Add1(ssM.Ready, Pass(&A{
		Addr: .Addr().String(),
	}))

	for {
		// TODO handle ErrListenerClosed and ErrServerClosed
		,  := .Accept()
		if  != nil {

			.AddErr(, nil)
			continue
		}
		.Mach.Add1(ssM.ClientConnected, Pass(&A{
			Addr: .RemoteAddr().String(),
		}))

		// get a new conn number (CAS loop - keeps the value from before the
		// increment)
		var  int64 = -1
		for {
			 = .connCount.Load()
			if .connCount.CompareAndSwap(, +1) {
				break
			}
		}

		// new instance
		var  *Server
		if .NewServerFn == nil {
			// default server: named Name-<conn number>, exposes Source and is
			// parented to the Mux machine
			,  = NewServer(.Mach.Context(), ":0",
				.Name+"-"+strconv.Itoa(int()), .Source, &ServerOpts{
					Parent:   .Mach,
					Args:     .Args,
					ParseRpc: .Opts.ParseRpc,
				})
		} else {
			,  = .NewServerFn(int(), )
		}
		// TODO return this err to the RPC client
		if  != nil {
			// drop the connection, there's no server to handle it
			_ = .Close()
			.Log("failed to create a new server: %s", )
			continue
		}

		// inject net.Conn
		.Conn = 
		.Start()

		// TODO optimize: re-use old instances?
		// TODO handle with a state, not a goroutine
		go func() {
			// dispose on disconnect: wait for ClientConnected to become active
			// and then inactive; both waits are bound to a Start state context
			 := .Mach.NewStateCtx(ssM.Start)
			<-.Mach.When1(ssS.ClientConnected, )
			<-.Mach.WhenNot1(ssS.ClientConnected, )
			.Stop(, true)
		}()
	}
}

// Start requests the Start state on the Mux machine, passing the given event
// as the source, and returns the result of that mutation.
func ( *Mux) ( *am.Event) am.Result {
	return .Mach.EvAdd1(, ssM.Start, nil)
}

// Requests the removal of the Start state, passing the given event as the
// source. When the boolean flag is set, the Mux machine is also disposed.
// Returns the result of the removal.
func ( *Mux) ( *am.Event,  bool) am.Result {
	 := .Mach.EvRemove1(, ssM.Start, nil)
	if  {
		.Mach.Dispose()
	}

	return 
}

// log forwards a formatted message to the machine's logger, but only when
// LogEnabled is set; otherwise it's a no-op.
func ( *Mux) ( string,  ...any) {
	if !.LogEnabled {
		return
	}
	.Mach.Log(, ...)
}

// ///// ///// /////

// ///// MISC

// ///// ///// /////

// MuxOpts are the optional settings accepted by the Mux constructor. A copy is
// kept in [Mux.Opts].
type MuxOpts struct {
	// Parent is a parent state machine for a new Mux state machine. See
	// [am.Opts].
	Parent am.Api
	// Typed arguments struct value. Copied to [Mux.Args] and passed to the
	// default per-connection servers.
	Args any
	// optional RPC args parser, passed to the default per-connection servers
	ParseRpc func(args am.A) am.A

	// Listen on a WebSocket connection instead of TCP.
	// WebSocket bool
	// // HTTP URL without proto to tunnel the TCP listen over a WebSocket conn.
	// // See WsListenPath.
	// WebSocketTunnel string
}

// WEBSOCKET

// type wsHandlerMux struct {
// 	m     *Mux
// 	event *am.Event
// }
//
// // ServeHTTP continues [Server.RpcStartingState].
// func (h *wsHandlerMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// 	mach := h.m.Mach
//
// 	connWs, err := websocket.Accept(w, r, &websocket.AcceptOptions{
// 		// TODO security
// 		InsecureSkipVerify: true,
// 	})
// 	if err != nil {
// 		log.Printf("Upgrade error: %v", err)
// 		return
// 	}
// 	conn := websocket.NetConn(mach.Context(), connWs, websocket.MessageBinary)
//
// 	// TODO RpcAcceptingState
// 	// next and stay alive
// 	mach.EvAdd1(h.event, ssM.RpcAccepting, nil)
// 	<-mach.WhenNot1(ss.Start, nil)
// 	print()
// }