package pubsub

import (
	
	
	
	
	

	pubsub 
	

	amhelp 
	am 
	
	
)

// Names of the environment variables recognized by pubsub.
const (
	// EnvAmPubsubLog is the environment variable which enables machine logging
	// for pubsub.
	EnvAmPubsubLog = "AM_PUBSUB_LOG"
	// EnvAmPubsubDbg is the environment variable which exposes PubSub workers
	// over dbg telemetry.
	EnvAmPubsubDbg = "AM_PUBSUB_DBG"
)

// ss is a package-local shorthand for states.TopicStates, used to reference
// states by name (e.g. ss.ErrJoining).
var ss = states.TopicStates

// ///// ///// /////

// ///// MESSAGES

// ///// ///// /////
// TODO keep with states

// PARTIALS

// MsgType names the kind of a pubsub message. It is the type of [Msg.Type].
type MsgType string

// Known message kinds. Each constant is a MsgType holding the literal on its
// right-hand side.
const (
	MsgTypeUpdates    = MsgType("updates")
	MsgTypeBye        = MsgType("bye")
	MsgTypeInfo       = MsgType("info")
	MsgTypeReqInfo    = MsgType("reqinfo")
	MsgTypeReqUpdates = MsgType("requpdates")
	MsgTypeGossip     = MsgType("gossip")
)

// Msg is the common part embedded by the message wrappers of this file
// (MsgInfo, MsgUpdates, MsgGossip, MsgReqUpdates, MsgReqInfo, MsgBye). Type
// tells which kind of message it is.
type Msg struct {
	Type MsgType
}

// Gossips maps an int key to a uint64 value.
// NOTE(review): presumably machine index => machine time sum, going by the
// PeerGossips description - confirm.
type Gossips map[int]uint64

// PeerGossips is peer ID => Gossips of that peer (machine time sums).
// TODO check local time sum
type PeerGossips map[string]Gossips

// Info is sent when a peer exposes state machines in the topic. Judging by
// the fields, each value describes one state machine: its ID, schema, state
// names, machine time, tags and parent.
// NOTE(review): Parent is presumably the ID of the parent machine - confirm.
type Info struct {
	Id     string
	Schema am.Schema
	States am.S
	MTime  am.Time
	Tags   []string
	Parent string
}

// MachInfo is machine index => Info of that machine.
type MachInfo map[int]*Info

// PeerInfo is peer ID => MachInfo of that peer.
type PeerInfo map[string]MachInfo

// MachClocks are machine clocks indexed by a local peer index.
type MachClocks map[int]am.Time

// PeerClocks groups MachClocks under a string key.
// NOTE(review): the key is presumably a peer ID, as in PeerInfo - confirm.
type PeerClocks map[string]MachClocks

// WRAPPERS

// MsgByeMach is sent when a peer un-exposes state machines in the topic. It
// is the value type of [MsgBye.Machs].
// NOTE(review): PeerBye presumably signals that the whole peer is leaving,
// not only this machine - confirm.
type MsgByeMach struct {
	PeerBye bool
	Id      string
	MTime   am.Time
}

// MsgInfo carries the Info of state machines, grouped by peer, together with
// gossips.
type MsgInfo struct {
	Msg
	// peer ID => mach idx => Info (which includes the schema)
	PeerInfo PeerInfo
	// Number of workers from each peer
	// NOTE(review): the PeerGossips type is documented as time sums per peer,
	// not a count - confirm which one is meant.
	PeerGossips PeerGossips
}

// MsgUpdates carries machine clocks, grouped in PeerClocks.
type MsgUpdates struct {
	Msg
	PeerClocks PeerClocks
	// TODO removed machs
}

// MsgGossip carries gossips only, grouped by peer.
type MsgGossip struct {
	Msg
	// Number of workers from each peer
	// NOTE(review): the PeerGossips type is documented as time sums per peer,
	// not a count - confirm which one is meant.
	PeerGossips PeerGossips
}

// MsgReqUpdates carries a list of peer IDs.
// NOTE(review): presumably a request for MsgUpdates about the listed peers -
// confirm.
type MsgReqUpdates struct {
	Msg
	// TODO list specific mach indexes
	PeerIds []string
}

// MsgReqInfo carries a list of peer IDs.
// NOTE(review): presumably a request for MsgInfo about the listed peers -
// confirm.
type MsgReqInfo struct {
	Msg
	PeerIds []string
}

// MsgBye groups MsgByeMach entries under an int key.
// NOTE(review): the key is presumably the machine index, as in MachInfo -
// confirm.
// TODO implement
type MsgBye struct {
	Msg
	Machs map[int]MsgByeMach
}

// ///// ///// /////

// ///// TRACER

// ///// ///// /////

// Tracer embeds [am.TracerNoOp] and keeps a dirty flag, which gets raised at
// the end of a transition.
type Tracer struct {
	*am.TracerNoOp
	// This machine's clock has been updated and needs to be synced.
	dirty atomic.Bool
}

// TransitionEnd marks the tracer as dirty when a transition ends. It only
// raises the flag, nothing is sent from here.
func ( *Tracer) ( *am.Transition) {
	.dirty.Store(true)
}

// TopicOpts are options for a new Topic.
type TopicOpts struct {
	// Parent is a parent state machine for a new Topic state machine. See
	// [am.Opts].
	Parent am.Api
}

// ///// ///// /////

// ///// ARGS

// ///// ///// /////

// APrefix is the key under which A is stored in [am.A] (see ParseArgs and
// Pass).
const APrefix = "am_pubsub"

// A is the typed arguments struct of this package, stored in [am.A] under
// APrefix (see ParseArgs and Pass). It's a typesafe alternative to [am.A].
// Several fields carry a `log` tag.
// NOTE(review): presumably only the tagged fields are emitted by LogArgs -
// confirm against amhelp.ArgsToLogMap.
type A struct {
	// MsgInfo happens when a peer introduces its exposed state machines. The
	// remaining Msg* fields hold the other message kinds.
	MsgInfo       *MsgInfo
	MsgUpdates    *MsgUpdates
	MsgBye        *MsgBye
	MsgReqInfo    *MsgReqInfo
	MsgReqUpdates *MsgReqUpdates
	MsgGossip     *MsgGossip
	// Msgs is a list of PubSub messages.
	Msgs []*pubsub.Message
	// Length is a general length used for logging.
	Length int `log:"length"`
	// Msg is a raw msg.
	Msg []byte
	// Peer and PeerId are both peer IDs, PeerIds is a list of them.
	// NOTE(review): only Peer has a log tag - confirm the split is intended.
	Peer    string `log:"peer"`
	PeerId  string
	PeerIds []string
	// MachId is a state machine ID.
	MachId string `log:"mach_id"`
	// MTime is machine time.
	MTime am.Time `log:"mtime"`
	// HTime is human time.
	HTime time.Time
	// Addrs is a list of addresses.
	Addrs []multiaddr.Multiaddr `log:"addrs"`
	// NetMachs is a return channel for a list of [rpc.NetworkMachine]. It has to
	// be buffered or the mutation will fail.
	NetMachs    chan<- []*rpc.NetworkMachine
	ListFilters *ListFilters
	MachClocks  MachClocks
	MsgType     string `log:"msg_type"`
	PeersGossip PeerGossips
}

// ListFilters is a set of filtering criteria, passed via [A.ListFilters].
// NOTE(review): presumably used to narrow down the list of machines returned
// through [A.NetMachs]; how the fields combine is decided by the consumer,
// which is not in this file - confirm.
// TODO merge with REPL, add tag-based queries (to find workers)
type ListFilters struct {
	IdExact   string
	IdPartial string
	IdRegexp  *regexp.Regexp
	Parent    string
	PeerId    string
	// Level to traverse towards the tree root.
	DepthLevel  int
	ChildrenMax int
	ChildrenMin int
}

// ParseArgs extracts A from [am.Event.Args][APrefix]. The value stored under
// APrefix is type-asserted to *A; when that doesn't yield a non-nil pointer, a
// new empty A is returned instead, so the result is never nil.
func ( am.A) *A {
	if ,  := [APrefix].(*A);  != nil {
		return 
	}
	return &A{}
}

// Pass prepares [am.A] from A to pass to further mutations. The returned map
// has a single entry, keyed by APrefix, which is the key ParseArgs reads.
func ( *A) am.A {
	return am.A{APrefix: }
}

// LogArgs is an args logger for A and pubsub.A. It parses the args with
// ParseArgs and converts the result with [amhelp.ArgsToLogMap].
func ( am.A) map[string]string {
	 := ParseArgs()
	// defensive only: ParseArgs falls back to an empty A instead of nil
	if  == nil {
		return nil
	}

	return amhelp.ArgsToLogMap(, 0)
}

// ///// ///// /////

// ///// ERRORS

// ///// ///// /////

// sentinel errors

// ErrJoining and ErrListening are wrapped (via %w) into the errors built by
// AddErrJoining and AddErrListening, so these can be matched with errors.Is.
var (
	ErrJoining   = errors.New("error joining")
	ErrListening = errors.New("error listening")
)

// error mutations

// AddErrJoining wraps an error in the ErrJoining sentinel and adds to a
// machine. The wrapped error is handed to EvAddErrState together with the
// ss.ErrJoining state, and is also returned.
func (
	 *am.Event,  *am.Machine,  error,  am.A,
) error {
	// both operands are wrapped with %w (multiple %w verbs need Go 1.20+)
	 = fmt.Errorf("%w: %w", ErrJoining, )
	.EvAddErrState(, ss.ErrJoining, , )

	return 
}

// AddErrListening wraps an error in the ErrListening sentinel and adds to a
// machine. The wrapped error is handed to EvAddErrState together with the
// ss.ErrListening state, and is also returned.
func (
	 *am.Event,  *am.Machine,  error,  am.A,
) error {
	// both operands are wrapped with %w (multiple %w verbs need Go 1.20+)
	 = fmt.Errorf("%w: %w", ErrListening, )
	.EvAddErrState(, ss.ErrListening, , )

	return 
}

// ///// ///// /////

// ///// DISCOVERY

// ///// ///// /////

// discoveryNotifee gets notified when we find a new peer via mDNS discovery
// type discoveryNotifee struct {
// 	h host.Host
// }
//
// // TODO enable
// func (n *discoveryNotifee) HandlePeerFound(pi peer.AddrInfo) {
// 	fmt.Printf("discovered new peer %s", pi.ID.MutString())
// 	err := n.h.Connect(context.Background(), pi)
// 	if err != nil {
// 		fmt.Printf("error connecting to peer %s: %s", pi.ID.MutString(), err)
// 	}
// }