package pubsub

import (
	"context"
	"errors"
	"fmt"
	"maps"
	"math/rand"
	"os"
	"slices"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/libp2p/go-libp2p"
	ps "github.com/libp2p/go-libp2p-pubsub"
	"github.com/libp2p/go-libp2p/core/crypto"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/p2p/net/connmgr"
	quicTransport "github.com/libp2p/go-libp2p/p2p/transport/quic"
	ma "github.com/multiformats/go-multiaddr"
	"github.com/vmihailenco/msgpack/v5"
	"golang.org/x/sync/errgroup"

	// NOTE: the asyncmachine-go subpackage paths below are assumed from usage.
	"github.com/pancsta/asyncmachine-go/internal/utils"
	amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
	am "github.com/pancsta/asyncmachine-go/pkg/machine"
	"github.com/pancsta/asyncmachine-go/pkg/pubsub/states"
	udsTransport "github.com/pancsta/asyncmachine-go/pkg/pubsub/transport/uds"
	"github.com/pancsta/asyncmachine-go/pkg/rpc"
	ssam "github.com/pancsta/asyncmachine-go/pkg/states"
)

// Topic is a single-topic PubSub based on libp2p-pubsub with manual discovery.
// Each peer can expose many state machines; it broadcasts their clock changes
// on the channel, and introduces them when joining, as well as via private
// messages to newly joined peers.
// TODO optimizations:
//   - avoid txs which will get cancelled (CPU)
//   - rate limit the TOTAL amount of msgs in the network (CPU, network)
//   - hook into libp2p errors, eg queue full, and act accordingly
type Topic struct {
	*am.ExceptionHandler

	// T represents the PubSub topic used for communication.
	T *ps.Topic
	// Mach is a state machine for this PubSub.
	Mach        *am.Machine
	MachMetrics atomic.Pointer[am.Machine]
	HostToPeer  map[string]string
	// ListenAddrs contains the list of multiaddresses that this topic listens on.
	// Leave it empty to allocate one automatically.
	ListenAddrs []ma.Multiaddr
	// Addrs is a list of addresses for this peer.
	Addrs atomic.Pointer[[]ma.Multiaddr]
	// Name indicates the name of the channel or topic instance,
	// typically associated with a given process.
	Name string
	// ConnAddrs contains a list of addresses for initial
	// connections to discovery nodes in the network.
	ConnAddrs []ma.Multiaddr
	// List of exposed state machines, index => mach_id, indexes are used on the
	// channel. Indexes should NOT change, as they are used for addressing. `nil`
	// value means empty.
	ExposedMachs []*am.Machine
	// Debounce for clock broadcasts (separate for each exposed state machine).
	Debounce   time.Time
	LogEnabled bool
	// HeartbeatFreq broadcasts changed clocks of exposed machines.
	HeartbeatFreq time.Duration
	// Maximum number of msgs sent to the network per time window. Does not
	// include MsgInfo, which is debounced.
	MaxMsgsPerWin int
	// DebugWorkerTelemetry exposes local workers to am-dbg
	DebugWorkerTelemetry bool
	ConnectionsToReady   int
	// All amounts and delays are multiplied by this factor.
	Multiplayer        int
	SendInfoDebounceMs int
	// Max allowed queue length for sending MsgInfo to newly joined peers, as
	// well as for received msgs.
	MaxQueueLen uint16
	// Number of gossips to send in SendGossipsState
	GossipAmount int

	// Use this hardcoded schema instead of exchanging real ones. Limits all the
	// workers to this one.
	// TODO catalog of named schemas, exchange unknown ones
	TestSchema am.Schema
	TestStates am.S

	// THIS PEER

	// host represents the local libp2p host used for handling
	// peer-to-peer connections.
	host host.Host
	// ps is the underlying Gossipsub instance, used for managing
	// PubSub operations like subscriptions and publishing.
	ps      *ps.PubSub
	sub     *ps.Subscription
	handler *ps.TopicEventHandler

	// OTHER PEERS

	// workers is a list of machines from other peers, indexed by peer ID and
	// [Info.Index1]. Use [states.TopicStatesDef.ListMachines] to list
	// them.
	workers map[string]map[int]*rpc.Worker
	info    map[string]map[int]*Info
	pool    *errgroup.Group
	// gossips about local workers heard from other peers
	missingUpdates PeerGossips
	// tracers attached to ExposedMachs.
	tracers []*Tracer
	// clock updates which arrived before MsgInfo TODO GC
	pendingMachUpdates map[string]map[int]am.Time
	// TODO verify number of exposed workers
	missingPeers  map[string]struct{}
	lastReqHello  time.Time
	lastReqUpdate time.Time
	// last MsgInfo request seen per peer
	reqInfo map[string]time.Time
	// last MsgUpdates request seen per peer
	reqUpdate map[string]time.Time
	// TODO proper retry
	retried bool
	// current messages time window (10s).
	msgsWin int
	// msgs sent in the current messages time window
	msgsWinCount int
	// use Unix Domain Sockets as a localhost-only transport
	transportUds bool
}

// NewTopic creates a single-topic PubSub for the passed exposed machines.
// NOTE: the constructor, parameter, and local variable names below are
// assumed (reconstructed from usage).
func NewTopic(
	ctx context.Context, name string, exposedMachs []*am.Machine,
	opts *TopicOpts,
) (*Topic, error) {
	if opts == nil {
		opts = &TopicOpts{}
	}

	t := &Topic{
		Multiplayer:          5,
		MaxQueueLen:          10,
		GossipAmount:         5,
		Name:                 name,
		ExposedMachs:         exposedMachs,
		LogEnabled:           os.Getenv(EnvAmPubsubLog) != "",
		DebugWorkerTelemetry: os.Getenv(EnvAmPubsubDbg) != "",
		HeartbeatFreq:        time.Second,
		MaxMsgsPerWin:        10,
		ConnectionsToReady:   5,
		SendInfoDebounceMs:   500,

		tracers: make([]*Tracer, len(exposedMachs)),
		workers: make(map[string]map[int]*rpc.Worker),
		// TODO config
		pool:               amhelp.Pool(10),
		info:               make(map[string]map[int]*Info),
		pendingMachUpdates: make(map[string]map[int]am.Time),
		missingPeers:       make(map[string]struct{}),
		missingUpdates:     make(PeerGossips),
		reqInfo:            make(map[string]time.Time),
		reqUpdate:          make(map[string]time.Time),
	}

	// attach tracers
	for i, mach := range exposedMachs {
		tracer := &Tracer{}
		err := mach.BindTracer(tracer)
		if err != nil {
			return nil, err
		}
		t.tracers[i] = tracer
	}

	if name == "" {
		name = utils.RandId(2)
	}

	// machine ID suffix assumed to be a random ID
	mach, err := am.NewCommon(ctx, "ps-"+name+"-"+utils.RandId(4),
		states.TopicSchema, ss.Names(), t, opts.Parent, &am.Opts{
			Tags: []string{"pubsub:" + name},

			// TODO DEBUG
			// HandlerTimeout:  time.Minute,
			// HandlerDeadline: 10 * time.Minute,
			// QueueLimit:     10,
		})
	if err != nil {
		return nil, err
	}

	mach.SemLogger().SetArgsMapper(LogArgs)
	mach.SetGroups(states.TopicGroups, states.TopicStates)
	t.Mach = mach

	return t, nil
}

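// A minimal usage sketch, assuming the NewTopic constructor and the Start/Join
// helpers defined in this file:
//
//	topic, err := NewTopic(ctx, "chat", []*am.Machine{myMach}, nil)
//	if err != nil {
//		panic(err)
//	}
//	topic.Start()
//	<-topic.Mach.When1(ss.Connected, ctx)
//	topic.Join()
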
// ///// ///// /////

// ///// HANDLERS

// ///// ///// /////

// ExceptionEnter ignores am.ErrEvalTimeout errors.
func (t *Topic) ExceptionEnter(e *am.Event) bool {
	err := am.ParseArgs(e.Args).Err

	// ignore ErrEvalTimeout
	return !errors.Is(err, am.ErrEvalTimeout)
}

// ExceptionState retries JoiningState handler timeouts (once).
func (t *Topic) ExceptionState(e *am.Event) {
	// super
	t.ExceptionHandler.ExceptionState(e)
	args := am.ParseArgs(e.Args)
	err := args.Err
	target := args.TargetStates

	// retry JoiningState timeouts (once)
	if errors.Is(err, am.ErrHandlerTimeout) &&
		slices.Contains(target, ss.ErrJoining) && !t.retried {

		t.retried = true
		t.Mach.EvRemove1(e, ss.Exception, nil)
	}
}

// TODO exit
func ( *Topic) ( *am.Event) bool {
	return .ConnCount() >= .ConnectionsToReady
}

func ( *Topic) ( *am.Event) bool {
	return len(.ListenAddrs) > 0
}

// StartState (handler name assumed, local variable names assumed) creates the
// libp2p host and the gossipsub instance, stores the listen addresses, and
// runs the Heartbeat ticker.
func (t *Topic) StartState(e *am.Event) {
	ctx := t.Mach.NewStateCtx(ss.Start)

	go func() {
		if ctx.Err() != nil {
			return // expired
		}
		t.Mach.PanicToErrState(ss.ErrJoining, nil)

		// new libp2p host
		var secOpt libp2p.Option
		transport := libp2p.Transport(quicTransport.NewTransport)
		if t.transportUds {
			transport = libp2p.Transport(udsTransport.NewUDSTransport)
			secOpt = libp2p.NoSecurity
		} else {
			// TODO cache
			priv, _, err := crypto.GenerateKeyPair(crypto.Ed25519, 0)
			if err != nil {
				_ = AddErrJoining(e, t.Mach, err, nil)
				return
			}
			secOpt = libp2p.Identity(priv)
		}

		h, err := libp2p.New(transport,
			// libp2p.Transport(tcpTransport.NewTCPTransport),
			// libp2p.Transport(webtransport.New),
			// libp2p.Transport(webrtc.New),
			libp2p.ListenAddrs(t.ListenAddrs...),
			// TODO debug
			// libp2p.ResourceManager(&network.NullResourceManager{}),
			secOpt,
		)

		if ctx.Err() != nil {
			return // expired
		}
		if err != nil {
			_ = AddErrJoining(e, t.Mach, err, nil)
			return
		}
		t.host = h

		// TODO late tags, init earlier and append tags
		id := h.ID().String()
		tags := t.Mach.Tags()
		tags = append(tags, "peer:"+id)
		t.Mach.SetTags(tags)
		amhelp.MachDebugEnv(t.Mach)

		// resources := relayv2.DefaultResources()
		// resources.MaxReservations = 256
		// _, err = relayv2.New(t.host, relayv2.WithResources(resources))
		// if err != nil {
		// 	_ = AddErrJoining(e, t.Mach, err, nil)
		// 	return
		// }

		// new gossipsub
		gs, err := ps.NewGossipSub(ctx, h)
		if err != nil {
			_ = AddErrJoining(e, t.Mach, err, nil)
			return
		}
		t.ps = gs

		// setup mDNS discovery to find local peers
		// TODO doesn't trigger any discoveries
		// s := mdns.NewMdnsService(t.host, "am-"+t.Name,
		//   &discoveryNotifee{h: t.host})
		// err = s.Start()
		// if err != nil {
		// 	_ = AddErrJoining(e, t.Mach, err, nil)
		// 	return
		// }

		// address
		addrs, err := t.GetPeerAddrs()
		if err != nil {
			_ = AddErrJoining(e, t.Mach, err, nil)
			return
		}
		t.Addrs.Store(&addrs)
		t.Mach.Log("Listening on %s", addrs)
		t.Mach.EvAdd1(e, ss.Connecting, nil)

		// mark as completed
		t.Mach.EvAdd1(e, ss.Started, Pass(&A{
			PeerId: id,
			Peer:   t.peerName(id),
		}))

		// start Heartbeat
		if t.HeartbeatFreq == 0 {
			return
		}
		tick := time.NewTicker(t.HeartbeatFreq * time.Duration(t.Multiplayer))
		defer tick.Stop()
		for {
			select {
			case <-ctx.Done():
				return // expired
			case <-tick.C:
				t.Mach.EvRemove1(e, ss.Heartbeat, nil)
				t.Mach.EvAdd1(e, ss.Heartbeat, nil)
			}
		}
	}()
}

// StartEnd (handler name assumed) closes and disposes the libp2p host.
func (t *Topic) StartEnd(e *am.Event) {
	if t.host != nil {
		t.host.Close()
	}
	t.host = nil
	t.ps = nil
}

func ( *Topic) ( *am.Event) bool {
	return len(.ConnAddrs) > 0
}

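// Handler for the Connecting state: dials the discovery addresses from
// ConnAddrs concurrently (limit of 10) and requires at least one established
// connection before adding Connected.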
func ( *Topic) ( *am.Event) {
	 := .Mach.NewStateCtx(ss.Connecting)

	// TODO
	// t.host.Peerstore().AddAddrs()
	// t.ConnAddrs[0].Equal()

	go func() {
		if .Err() != nil {
			return // expired
		}

		// errgroup with conc limit
		 := errgroup.Group{}
		// TODO config
		.SetLimit(10)

		// try all the addrs
		for ,  := range .ConnAddrs {
			.Go(func() error {
				// stop if state ctx expired
				if .Err() != nil {
					return .Err()
				}

				// extract infos and connect
				,  := peer.AddrInfosFromP2pAddrs()
				if  != nil {
					 := fmt.Errorf("%w: %s", , )
					_ = ssam.AddErrConnecting(, .Mach, , nil)
					// dont stop, no err
					return nil
				}
				for ,  := range  {
					.log("Trying %s", )
					 := .host.Connect(, )
					if  == nil {
						return nil // connected
					}
				}

				// dont stop, no err
				return nil
			})
		}

		// block
		_ = .Wait()
		if .Err() != nil {
			return // expired
		}

		// check if successful
		if .ConnCount() <= 0 {
			 := errors.New("failed to establish any connections")
			_ = ssam.AddErrConnecting(, .Mach, , nil)
			return
		}

		// next
		.Mach.EvAdd1(, ss.Connected, nil)
	}()
}

// func (t *Topic) DisconnectingStart(e *am.Event) {
// }

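// Handler for the Joining state: joins the gossipsub topic, creates the
// message subscription, and adds Joined.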
func ( *Topic) ( *am.Event) {
	 := .Mach.NewStateCtx(ss.Joining)

	go func() {
		if .Err() != nil {
			return // expired
		}

		// join topic
		,  := .ps.Join(.Name)
		if .Err() != nil {
			return // expired
		}
		if  != nil {
			_ = AddErrJoining(, .Mach, , nil)
			return
		}
		.T = 

		// msg subscription
		// TODO config
		,  := .T.Subscribe(ps.WithBufferSize(1024))
		if .Err() != nil {
			return // expired
		}
		if  != nil {
			_ = AddErrJoining(, .Mach, , nil)
			return
		}
		.sub = 

		// next
		.Mach.EvAdd1(, ss.Joined, nil)
	}()
}

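// Handler for the ProcessMsgs state: decodes each batched pubsub message with
// msgpack and routes it to the matching Msg* state (MsgInfo, MsgBye,
// MsgUpdates, MsgGossip, MsgReqInfo, MsgReqUpdates); undecodable payloads are
// counted via MsgReceived.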
func ( *Topic) ( *am.Event) {
	 := .Mach
	for ,  := range ParseArgs(.Args).Msgs {
		if  == nil {
			continue
		}
		 := .GetFrom().String()

		var  Msg
		// TODO custom err state
		if  := msgpack.Unmarshal(.Data, &);  != nil {
			// generic msg
			.Add1(ss.MsgReceived, Pass(&A{
				Msgs:   []*ps.Message{},
				Length: 1,
			}))
		} else {
			switch .Type {

			case MsgTypeInfo:
				var  MsgInfo
				if  := msgpack.Unmarshal(.Data, &);  == nil {
					// handle schemas
					.Add1(ss.MsgInfo, Pass(&A{
						MsgInfo: &,
						PeerId:  ,
						Peer:    .peerName(),
					}))
					// handle gossips TODO handle in MsgInfo
					amhelp.AskAdd1(, ss.MissPeersByGossip, Pass(&A{
						PeersGossip: .PeerGossips,
						PeerId:      ,
						Peer:        .peerName(),
					}))
					amhelp.AskAdd1(, ss.MissUpdatesByGossip, Pass(&A{
						PeersGossip: .PeerGossips,
						PeerId:      ,
					}))
				} else {
					.EvAddErr(, , nil)
				}

			case MsgTypeBye:
				var  MsgBye
				if  := msgpack.Unmarshal(.Data, &);  == nil {
					.Add1(ss.MsgBye, Pass(&A{
						MsgBye: &,
						PeerId: ,
					}))
				}

			case MsgTypeUpdates:
				var  MsgUpdates
				if  := msgpack.Unmarshal(.Data, &);  == nil {
					.Add1(ss.MsgUpdates, Pass(&A{
						MsgUpdates: &,
						PeerId:     ,
					}))
				} else {
					.EvAddErr(, , nil)
				}

			case MsgTypeGossip:
				var  MsgGossip
				if  := msgpack.Unmarshal(.Data, &);  == nil {
					// TODO handle both in MissPeersState
					amhelp.AskAdd1(, ss.MissPeersByGossip, Pass(&A{
						PeersGossip: .PeerGossips,
						PeerId:      ,
					}))
					amhelp.AskAdd1(, ss.MissUpdatesByGossip, Pass(&A{
						PeersGossip: .PeerGossips,
						PeerId:      ,
					}))
				} else {
					.EvAddErr(, , nil)
				}

			// requests

			case MsgTypeReqInfo:
				var  MsgReqInfo
				if  := msgpack.Unmarshal(.Data, &);  == nil {
					amhelp.AskAdd1(, ss.MsgReqInfo, Pass(&A{
						MsgReqInfo: &,
						PeerId:     ,
						Peer:       .peerName(),
						HTime:      time.Now(),
					}))
				} else {
					.EvAddErr(, , nil)
				}

			case MsgTypeReqUpdates:
				var  MsgReqUpdates
				if  := msgpack.Unmarshal(.Data, &);  == nil {
					amhelp.AskAdd1(, ss.MsgReqUpdates, Pass(&A{
						MsgReqUpdates: &,
						PeerId:        ,
						Peer:          .peerName(),
					}))
				} else {
					.EvAddErr(, , nil)
				}

			default:
				// Log an error for unsupported message types
				 := fmt.Errorf("unsupported msg type: %s", .Type)
				_ = AddErrListening(nil, , , nil)
			}
		}
	}
}

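// Handler for the Joined state: starts the receive loop (batching messages
// into ProcessMsgs), a periodic flush, the topic peer-event loop (PeerJoined,
// PeerLeft), and schedules the initial SendInfo after a random delay.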
func ( *Topic) ( *am.Event) {
	 := .Mach
	 := .NewStateCtx(ss.Joined)
	// TODO SemLogger().JoinTopic(t.Name)
	 := .host.ID().String()

	.retried = false

	if !.IsValid() {
		return
	}

	// TODO push out also on heartbeat (requires sub.Next == chan)
	// TODO config
	 := 50
	 := make([]*ps.Message, )
	 := sync.Mutex{}
	 := 0

	// receive msgs TODO extract
	go func() {
		for {
			,  := .sub.Next()
			if .Err() != nil {
				return // expired
			}
			if  != nil {
				_ = AddErrListening(, , , nil)
				continue
			}
			// no self msgs
			 := .GetFrom().String()
			if  ==  {
				continue
			}

			// drop msgs above threshold
			// TODO config
			if .QueueLen() > .MaxQueueLen {
				continue
			}

			// stash for now
			.Lock()
			if  <  {
				[] = 
				++
				.Unlock()
				continue
			}

			// flush
			.Add1(ss.ProcessMsgs, Pass(&A{
				Msgs:   ,
				Length: ,
			}))
			 = make([]*ps.Message, )
			 = 0
			.Unlock()
		}
	}()

	// periodic flush TODO flush on heartbeat
	go func() {
		if .Err() != nil {
			return // expired
		}

		 := time.NewTicker(1 * time.Second)
		defer .Stop()
		for {
			select {
			case <-.Done():
				// exit
			case <-.C:
				// flush
				.Lock()
				if  > 0 {
					.Add1(ss.ProcessMsgs, Pass(&A{
						Msgs:   ,
						Length: ,
					}))
					 = make([]*ps.Message, )
					 = 0
				}
				.Unlock()
			}
		}
	}()

	// topic events TODO extract
	go func() {
		if .Err() != nil {
			return // expired
		}

		// peer subscription
		,  := .T.EventHandler()
		if  != nil {
			_ = AddErrJoining(nil, , , nil)
			return
		}
		.handler = 

		for {
			,  := .handler.NextPeerEvent()
			if .Err() != nil {
				return // expired
			}
			if  != nil {
				_ = AddErrListening(nil, , , nil)
				continue
			}

			 := .Peer.String()
			switch .Type {
			case ps.PeerJoin:
				.Add1(ss.PeerJoined, Pass(&A{
					PeerId: ,
					Peer:   .peerName(),
				}))
			case ps.PeerLeave:
				.Add1(ss.PeerLeft, Pass(&A{
					PeerId: ,
					Peer:   .peerName(),
				}))
			}
		}
	}()

	if len(.ExposedMachs) > 0 {
		go func() {
			// TODO config HelloDelay
			 := 1
			 := 30
			 :=  + rand.Intn(-)
			if !amhelp.Wait(, time.Duration(*100*int(time.Millisecond))) {
				return // expired
			}

			// send Hello
			.Add1(ss.SendInfo, Pass(&A{
				PeerIds: []string{.host.ID().String()},
			}))
		}()
	}
}

func ( *Topic) ( *am.Event) {
	// TODO SemLogger().LeaveTopic(t.Name)
	if .handler != nil {
		.handler.Cancel()
	}
}

func ( *Topic) ( *am.Event) bool {
	return ParseArgs(.Args).PeerId != ""
}

// EVENTS

// func (t *Topic) PeerLeftState(e *am.Event) {
// 	// TODO direct all owned local workers to MachLeft
// }

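// Negotiation handler for MsgInfo: accepts only info that mentions a missing
// peer or a not-yet-known machine.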
func ( *Topic) ( *am.Event) bool {
	 := ParseArgs(.Args)
	if  == nil || .MsgInfo == nil || .PeerId == "" {
		return false
	}

	for ,  := range .MsgInfo.PeerInfo {
		// missing always pass
		if ,  := .missingPeers[];  {
			return true
		}
		if ,  := .workers[]; ! {
			return true
		}

		// check if there's a new one
		for  := range  {
			if ,  := .workers[][]; ! {
				.log("Missing mach %d for peer %s", , )
				return true
			}
		}
	}

	return false
}

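// Handler for the MsgInfo state: creates rpc.Worker clones for newly announced
// remote machines, applies any pending clock updates, and stores the Info for
// re-broadcast.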
func ( *Topic) ( *am.Event) {
	 := .Mach.NewStateCtx(ss.Start)
	 := ParseArgs(.Args)
	 := 0
	 := .host.ID().String()

	for ,  := range .MsgInfo.PeerInfo {
		// skip self info
		if  ==  {
			continue
		}

		// init peer
		if ,  := .workers[]; ! {
			.workers[] = make(map[int]*rpc.Worker)
			.info[] = make(map[int]*Info)
		}

		// remove from missing
		if ,  := .missingPeers[];  {
			delete(.missingPeers, )
			.Mach.EvRemove1(, ss.MissPeersByGossip, nil)
		}

		.metric("Info", )
		.metric("Peer", "")
		// create local workers
		for ,  := range  {
			// check already exists
			if ,  := .workers[][];  {
				// update to the newest clock
				if .MTime.Sum(nil) > .Time(nil).Sum(nil) {
					.InternalUpdateClock(.MTime, 0, true)
				}
				// TODO check schema?
				continue
			}

			// compose a unique ID
			 := []string{"pubsub-worker", "src-id:" + .Id}
			 := "ps-" + .Id + "-" + utils.RandId(4)
			// mach schema
			 := .Schema
			if .TestSchema != nil {
				 = .TestSchema
			}
			 := .States
			if .TestStates != nil {
				 = .TestStates
			}
			,  := rpc.NewWorker(, , nil, , , .Mach, )
			if  != nil {
				// TODO custom error?
				.Mach.EvAddErr(, , nil)
				continue
			}
			if .DebugWorkerTelemetry {
				// TODO DEBUG
				if .peerName() == "P5" {
					amhelp.MachDebugEnv()
				}
			}

			// check for ahead-of-time MsgUpdates
			// add ctx to go-s
			if ,  := .pendingMachUpdates[][];  {
				.InternalUpdateClock(, 0, true)
				// GC
				delete(.pendingMachUpdates[], )
				if len(.pendingMachUpdates[]) == 0 {
					delete(.pendingMachUpdates, )
				}
				.Mach.EvRemove1(, ss.MissPeersByUpdates, nil)

			} else {
				.InternalUpdateClock(.MTime, 0, true)
			}

			.workers[][] = 
			// save the info for re-broadcast
			.info[][] = 
			++
		}
	}

	// confirm we know the sender
	 := .PeerId
	if ,  := .workers[]; ! {
		.missingPeers[] = struct{}{}
		.metric("Gossip", )
	}

	if  > 0 {
		.log("Added %d new workers (total %d)", , .workersCount())
		.log("Known peers == %d (missing == %d)",
			len(.workers), len(.missingPeers))

		for  := range .missingPeers {
			 := .peerName()
			if  == "" {
				 = 
			}
			.log("Missing example: %s", )
			break
		}
	}
}

func ( *Topic) ( *am.Event) bool {
	 := ParseArgs(.Args)
	if  == nil || .PeersGossip == nil {
		return false
	}

	// process peer gossips
	 := .host.ID().String()
	for ,  := range .PeersGossip {
		// skip self
		if  ==  {
			continue
		}
		// already known
		if ,  := .missingPeers[];  {
			continue
		}

		// pass if unknown or the amount of exposed machs differs
		if ,  := .workers[]; ! || len() != len(.workers[]) {
			return true
		}
	}

	return false
}

func ( *Topic) ( *am.Event) {
	 := ParseArgs(.Args)

	// process peer gossips
	 := .host.ID().String()
	for  := range .PeersGossip {
		// skip self
		if  ==  {
			continue
		}
		// already known
		if ,  := .missingPeers[];  {
			continue
		}

		// TODO support `count` from gossip
		// if p, ok := t.workers[peerId]; !ok || len(p) != count {
		if ,  := .workers[]; ! {
			.missingPeers[] = struct{}{}
			 := .peerName()
			if  == "" {
				 = 
			}
			.log("New missing %s", )
			.log("Known: %d; Missing: %d",
				len(.workers), len(.missingPeers))
		}
	}
}

func ( *Topic) ( *am.Event) bool {
	return len(.missingPeers) == 0
}

func ( *Topic) ( *am.Event) bool {
	 := ParseArgs(.Args)
	if  == nil || .PeersGossip == nil {
		return false
	}

	// process peer gossips
	 := .host.ID().String()
	for ,  := range .PeersGossip {
		// skip self
		if  ==  {
			continue
		}
		// skip missing
		if ,  := .workers[]; ! {
			continue
		}

		for ,  := range  {
			,  := .workers[][]
			if ! {
				continue
			}

			// accept when received time is higher
			if  > .Time(nil).Sum(nil) {
				return true
			}
		}
	}

	return false
}

func ( *Topic) ( *am.Event) {
	 := ParseArgs(.Args)

	// process peer gossips
	 := .host.ID().String()
	for ,  := range .PeersGossip {
		// skip self
		if  ==  {
			continue
		}
		// skip missing
		if ,  := .workers[]; ! {
			continue
		}
		// init
		if ,  := .missingUpdates[]; ! {
			.missingUpdates[] = make(Gossips)
		}

		for ,  := range  {
			,  := .workers[][]
			if ! {
				.missingUpdates[][] = 
				.metric("Gossip", )
				continue
			}

			// accept when received time is higher
			if  > .Time(nil).Sum(nil) {
				.missingUpdates[][] = 
				.metric("Gossip", )
			}
		}

		// prevent empty entries
		if len(.missingUpdates[]) == 0 {
			delete(.missingUpdates, )
		}
	}
}

func ( *Topic) ( *am.Event) bool {
	return len(.missingUpdates) == 0
}

func ( *Topic) ( *am.Event) bool {
	return ParseArgs(.Args).PeerId != ""
}

func ( *Topic) ( *am.Event) {
	.Mach.EvRemove1(, ss.PeerJoined, nil)
	 := ParseArgs(.Args)
	 := .PeerId

	// mark as missing
	// TODO add MissPeerByJoin ?
	.missingPeers[] = struct{}{}
	// say hello, but drop msgs above threshold
	// TODO config
	if .Mach.QueueLen() > .MaxQueueLen {
		return
	}
	.Mach.EvAdd1(, ss.SendInfo, Pass(&A{
		PeerIds: []string{.host.ID().String()},
	}))
}

func ( *Topic) ( *am.Event) bool {
	return ParseArgs(.Args).MsgBye != nil
}

// MSGS

// func (t *Topic) MsgByeState(e *am.Event) {
// 	// TODO
// }

func ( *Topic) ( *am.Event) bool {
	return ParseArgs(.Args).Msgs != nil
}

func ( *Topic) ( *am.Event) bool {
	// sec / multiplayer time window
	 := time.Now().Second() / .Multiplayer
	if .msgsWin !=  {
		.msgsWin = 
		.msgsWinCount = 0
	}

	.msgsWinCount++
	if .msgsWinCount > .MaxMsgsPerWin {
		.log("Too many messages in last time window, dropping..")
		return false
	}

	 := ParseArgs(.Args)
	return  != nil && len(.Msg) > 0
}

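// Handler for the SendMsg state: tracks when request-type messages were last
// sent and publishes the already-encoded message on the topic via the worker
// pool.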
func ( *Topic) ( *am.Event) {
	 := .Mach.NewStateCtx(ss.Start)
	 := ParseArgs(.Args)
	 := .Msg

	switch .MsgType {
	case string(MsgTypeReqInfo):
		.lastReqHello = time.Now()
	case string(MsgTypeReqUpdates):
		.lastReqUpdate = time.Now()
	}

	// unblock
	if !.IsValid() {
		return
	}
	.pool.Go(func() error {
		if .Err() != nil {
			return nil // expired
		}
		// TODO concurrent write to a map?!
		//  ihave, ok := gs.gossip[p]
		 := .T.Publish(, )
		if .Err() != nil {
			return nil // expired
		}
		.Mach.EvAddErr(, , nil)

		return nil
	})
}

func ( *Topic) ( *am.Event) bool {
	return len(ParseArgs(.Args).PeerIds) > 0
}

func ( *Topic) ( *am.Event) {
	 := .Mach.NewStateCtx(ss.SendInfo)
	 := ParseArgs(.Args)

	// random delay
	 := .SendInfoDebounceMs * .Multiplayer
	 :=  * 2
	 := time.Duration(rand.Intn()) * time.Millisecond

	// unblock
	if !.IsValid() {
		.Mach.EvRemove1(, ss.SendInfo, nil)
		return
	}
	go func() {
		defer .Mach.EvRemove1(, ss.SendInfo, nil)
		if !amhelp.Wait(, ) {
			return // expired
		}
		.Mach.EvAdd1(, ss.DoSendInfo, Pass(&A{
			PeerIds: .PeerIds,
		}))
	}()
}

func ( *Topic) ( *am.Event) bool {
	 := ParseArgs(.Args)
	return  != nil && .MsgUpdates != nil && .PeerId != ""
}

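// Handler for the MsgUpdates state: applies remote clock updates to known
// workers and defers updates from peers that are still unknown.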
func ( *Topic) ( *am.Event) {
	 := ParseArgs(.Args)
	 := .host.ID().String()

	for ,  := range .MsgUpdates.PeerClocks {
		// skip self info
		if  ==  {
			continue
		}

		// delay if peer unknown
		,  := .workers[]
		if ! {
			.Mach.EvAdd1(, ss.MissPeersByUpdates, Pass(&A{
				MachClocks: ,
				PeerId:     ,
				Peer:       .peerName(),
			}))

			// process on ss.MsgInfo
			continue
		}

		 := .missingUpdates[] //lint:ignore gosimple
		.metric("Update", )

		// for all updated machines
		for ,  := range  {

			,  := []
			if ! {
				// TODO missing schema per peer
				.log("worker %s:%d not found, delaying..", , )
				continue
			}

			// skip old updates
			if .Sum(nil) > .Time(nil).Sum(nil) {
				.InternalUpdateClock(, 0, true)
			}

			// clean up
			if  == nil {
				continue
			}

			// apply a newer update
			if ,  := [];  &&
				 <= .Sum(nil) {

				delete(, )
				if len() == 0 {
					delete(.missingUpdates, )
				}
			}
		}
	}

	// confirm we know the sender
	 := .PeerId
	if ,  := .workers[]; ! {
		.missingPeers[] = struct{}{}
	}
}

func ( *Topic) ( *am.Event) bool {
	 := ParseArgs(.Args)
	if  == nil || .MsgReqInfo == nil || .PeerId == "" {
		return false
	}

	// check if we know any of the requested peers
	 := len(.workers)
	 := .host.ID().String()
	for ,  := range .MsgReqInfo.PeerIds {
		if ,  := .workers[];  ||  ==  {
			// only 1 known peer answers (randomly)
			// TODO increase the probability with repeated requests
			return  == 0 || rand.Intn() != 0
		}
	}

	return false
}

func ( *Topic) ( *am.Event) {
	 := ParseArgs(.Args)

	// collect peers we know (incl this peer)
	 := .host.ID().String()
	 := []string{}
	for ,  := range .MsgReqInfo.PeerIds {
		// TODO skip info for peers we just sent out / received
		if ,  := .workers[];  ||  ==  {
			 = append(, )
		}

		// memorize requested peer IDs as recently requested (de-dup)
		.reqInfo[] = time.Now()
	}

	// confirm we know the sender
	 := .PeerId
	if ,  := .workers[]; ! {
		.missingPeers[] = struct{}{}
	}

	.Mach.Add1(ss.SendInfo, Pass(&A{
		PeerIds: ,
	}))
}

func ( *Topic) ( *am.Event) bool {
	 := ParseArgs(.Args)
	if  == nil || .MsgReqInfo == nil || .PeerId == "" {
		return false
	}

	// check if we know any of the requested peers
	 := .host.ID().String()
	for ,  := range .MsgReqInfo.PeerIds {
		if ,  := .workers[];  ||  ==  {
			// TODO increase the probability with repeated requests
			// return rand.Intn(t.Multiplayer) != 0
			return rand.Intn(len(.workers)) != 0
		}
	}

	return false
}

// TODO this is never fired
func ( *Topic) ( *am.Event) {
	// TODO per-mach index requests (partial)
	// t.Mach.EvRemove1(e, ss.MsgReqInfo, nil)
	 := ParseArgs(.Args)
	 := .Mach.NewStateCtx(ss.Start)

	// collect updates from peers we know
	 := .host.ID().String()
	 := &MsgUpdates{
		Msg:        Msg{MsgTypeUpdates},
		PeerClocks: make(map[string]MachClocks),
	}

	for ,  := range .MsgReqUpdates.PeerIds {
		// this peer
		if  ==  {
			.PeerClocks[] = make(MachClocks)
			for ,  := range .ExposedMachs {
				.PeerClocks[][] = .Time(nil)
			}
		}

		// memorize requested peer IDs as recently requested (de-dup)
		.reqUpdate[] = time.Now()

		// remote peer
		if ,  := .workers[];  {
			.PeerClocks[] = make(MachClocks)
			for ,  := range  {
				.PeerClocks[][] = .Time(nil)
			}
		}
	}

	// confirm we know the sender
	 := .PeerId
	if ,  := .workers[]; ! {
		.missingPeers[] = struct{}{}
	}

	// send updates
	if len(.PeerClocks) <= 0 {
		return
	}

	// unblock
	if !.IsValid() {
		return
	}
	.pool.Go(func() error {
		if .Err() != nil {
			return nil // expired
		}

		,  := msgpack.Marshal()
		if  != nil {
			.Mach.EvAddErr(, , nil)
			return nil
		}
		.Mach.EvAdd1(, ss.SendMsg, Pass(&A{
			Msg:     ,
			MsgType: string(MsgTypeUpdates),
		}))

		return nil
	})
}

func ( *Topic) ( *am.Event) bool {
	 := ParseArgs(.Args)
	return  != nil && .MachClocks != nil && .PeerId != ""
}

func ( *Topic) ( *am.Event) {
	// t.Mach.EvRemove1(e, ss.MissPeersByUpdates, nil)
	 := ParseArgs(.Args)
	 := .MachClocks
	 := .PeerId

	,  := .pendingMachUpdates[]
	if ! {
		.pendingMachUpdates[] = 
		.metric("Gossip", )

		// process on ss.MsgInfo
		return
	}

	// some already delayed, merge (keep higher clocks)
	for ,  := range  {
		if ,  := .pendingMachUpdates[][];  {
			// merge
			if .Sum(nil) > .Sum(nil) {
				.pendingMachUpdates[][] = 
				.metric("Gossip", )
			}
		} else {
			// add
			.pendingMachUpdates[][] = 
			.metric("Gossip", )
		}
	}
}

func ( *Topic) ( *am.Event) bool {
	return len(.pendingMachUpdates) == 0
}

func ( *Topic) ( *am.Event) bool {
	 := ParseArgs(.Args)
	return  != nil && .WorkersCh != nil &&
		// check buffered channel
		cap(.WorkersCh) > 0
}

// ACTIONS

func ( *Topic) ( *am.Event) {
	.Mach.EvRemove1(, ss.ListMachines, nil)

	 := ParseArgs(.Args)
	 := .ListFilters
	if  == nil {
		 = &ListFilters{}
	}
	 := .WorkersCh
	 := make([]*rpc.Worker, 0)

	// TODO use amhelp.Group
	for ,  := range .workers {
		for ,  := range  {

			// ID
			if .IdExact != "" && .Id() != .IdExact {
				continue
			}
			// ID regexp
			if .IdRegexp != nil && !.IdRegexp.MatchString(.Id()) {
				continue
			}
			// ID substring
			if .IdPartial != "" &&
				!strings.Contains(.Id(), .IdPartial) {

				continue
			}

			// peer ID
			if .PeerId != "" &&  != .PeerId {
				continue
			}

			// parent ID
			if .Parent != "" && .ParentId() != .Parent {
				continue
			}

			// TODO filers.DepthLevel
			// TODO filers.ChildrenMax
			// TODO filers.ChildrenMin

			 = append(, )
		}
	}

	// TODO don't send on a closed channel
	.log("listing %d workers", len())
	 <- 
}

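// Handler for the Heartbeat state: triggers SendGossips, ReqMissingUpdates,
// and ReqMissingPeers, then collects dirty clocks of exposed machines into
// SendUpdates.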
func ( *Topic) ( *am.Event) {
	 := .Mach
	.EvRemove1(, ss.Heartbeat, nil)

	// make sure these are in sync
	.EvRemove1(, ss.MissPeersByGossip, nil)
	.EvRemove1(, ss.MissPeersByUpdates, nil)

	// delegate work
	 := am.S{ss.SendGossips, ss.ReqMissingUpdates, ss.ReqMissingPeers}
	for ,  := range  {
		if .Not1() && !.WillBe1() {
			.EvAdd1(, , nil)
		}
	}

	// send updates, if any
	if .Is1(ss.SendUpdates) || .WillBe1(ss.SendUpdates) {
		return
	}

	// collect updates
	 := make(MachClocks)
	for ,  := range .ExposedMachs {
		if !.tracers[].dirty.Load() {
			continue
		}
		 := .Time(nil)
		.tracers[].dirty.Store(false)
		[] = 
	}

	.EvAdd1(, ss.SendUpdates, Pass(&A{
		MachClocks: ,
	}))
}

func ( *Topic) ( *am.Event) bool {
	 := ParseArgs(.Args)
	return len(.MachClocks) > 0
}

func ( *Topic) ( *am.Event) {
	defer .Mach.EvRemove1(, ss.SendUpdates, nil)
	 := ParseArgs(.Args).MachClocks

	// send updates
	 := .host.ID().String()
	 := &MsgUpdates{
		Msg:        Msg{MsgTypeUpdates},
		PeerClocks: make(map[string]MachClocks),
	}
	.PeerClocks[] = 

	// make sure it's still on
	if !.IsValid() {
		return
	}

	,  := msgpack.Marshal()
	if  != nil {
		.Mach.EvAddErr(, , nil)
		return
	}
	.Mach.EvAdd1(, ss.SendMsg, Pass(&A{
		Msg:     ,
		MsgType: string(MsgTypeUpdates),
	}))
}

func ( *Topic) ( *am.Event) bool {
	if len(.workers) == 0 {
		return false
	}
	// randomize gossip per 1 Heartbeat
	// TODO config
	// if rand.Intn(t.Multiplayer) != 0 {
	if rand.Intn(len(.workers)) != 0 {
		return false
	}

	return true
}

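// Handler for the SendGossips state: samples random known peers and broadcasts
// their summed clocks as a MsgGossip.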
func ( *Topic) ( *am.Event) {
	 := .Mach.NewStateCtx(ss.SendGossips)

	 := slices.Collect(maps.Keys(.workers))
	 := PeerGossips{}
	for range .GossipAmount {
		 := [rand.Intn(len())]
		// skip empty peers
		if len(.workers[]) == 0 {
			continue
		}
		 := map[int]uint64{}
		for ,  := range .workers[] {
			[] = .Time(nil).Sum(nil)
		}
		[] = 
	}

	// unblock
	if !.IsValid() {
		.Mach.EvRemove1(, ss.SendGossips, nil)
		return
	}
	go func() {
		defer .Mach.EvRemove1(, ss.SendGossips, nil)
		if .Err() != nil {
			return // expired
		}

		 := &MsgGossip{
			Msg:         Msg{MsgTypeGossip},
			PeerGossips: ,
		}
		,  := msgpack.Marshal()
		if  != nil {
			.Mach.EvAddErr(, , nil)
			return
		}
		.Mach.EvAdd1(, ss.SendMsg, Pass(&A{
			Msg:     ,
			MsgType: string(MsgTypeGossip),
		}))
	}()
}

// how often to look for missing peers
// TODO config
var reqMissPeersFreq = time.Second * 5

func ( *Topic) ( *am.Event) bool {
	// nothing is missing
	if len(.missingPeers) == 0 && len(.pendingMachUpdates) == 0 {
		return false
	}

	// too early
	if time.Since(.lastReqHello) <= reqMissPeersFreq {
		return false
	}

	return true
}

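// Handler for the ReqMissingPeers state: requests MsgInfo for a random sample
// of missing peers, rate-limited per peer via reqInfo.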
func ( *Topic) ( *am.Event) {
	 := .Mach
	 := .NewStateCtx(ss.ReqMissingPeers)

	// TODO config
	 := 5
	 := 15
	// how often to request MsgHello per 1 peer
	 := time.Second * 5 * time.Duration(.Multiplayer)

	// req missing peers
	// TODO fair request dist per peer IDs
	 := []string{}
	 := slices.Concat(
		slices.Collect(maps.Keys(.missingPeers)),
		slices.Collect(maps.Keys(.pendingMachUpdates)))
	for  := 0;  <  && len() < ; ++ {
		// random
		 := [rand.Intn(len())]
		// dup, skip
		if slices.Contains(, ) {
			continue
		}
		// too early, skip
		if ,  := .reqInfo[];  && time.Since() <  {
			continue
		}
		 = append(, )
		.reqInfo[] = time.Now()
	}

	if len() <= 0 {
		.EvRemove1(, ss.ReqMissingPeers, nil)
		return
	}

	.log("missing peers: %d", len())

	// unblock
	if !.IsValid() {
		.EvRemove1(, ss.ReqMissingPeers, nil)
		return
	}
	.pool.Go(func() error {
		defer .EvRemove1(, ss.ReqMissingPeers, nil)
		if .Err() != nil {
			return nil // expired
		}

		// encode and send
		 := &MsgReqInfo{
			Msg:     Msg{MsgTypeReqInfo},
			PeerIds: ,
		}
		,  := msgpack.Marshal()
		if  != nil {
			.EvAddErr(, , nil)
			return nil
		}
		.EvAdd1(, ss.SendMsg, Pass(&A{
			Msg:     ,
			MsgType: string(.Type),
			PeerId:  [0],
			Peer:    .peerName([0]),
		}))

		return nil
	})
}

// how often to look for missing updates
// TODO config
var reqMissUpdatesFreq = time.Second * 5

func ( *Topic) ( *am.Event) bool {
	// nothing is missing
	if len(.missingUpdates) == 0 {
		return false
	}
	// too early
	if time.Since(.lastReqUpdate) <= reqMissUpdatesFreq {
		return false
	}

	return true
}

func ( *Topic) ( *am.Event) {
	 := .Mach.NewStateCtx(ss.ReqMissingUpdates)

	// TODO config
	 := 5
	 := 15
	// how often to request MsgUpdates per 1 peer
	 := time.Second * time.Duration(5*.Multiplayer)

	// req missing peers
	// TODO fair request dist per peer IDs
	 := []string{}
	 := slices.Collect(maps.Keys(.missingUpdates))
	for  := 0;  <  && len() < ; ++ {
		// random
		 := [rand.Intn(len())]
		// dup, skip
		if slices.Contains(, ) {
			continue
		}
		// too early, skip
		if ,  := .reqUpdate[];  && time.Since() <  {
			continue
		}
		 = append(, )
		.reqUpdate[] = time.Now()
	}

	if len() <= 0 {
		.Mach.EvRemove1(, ss.ReqMissingUpdates, nil)
		return
	}

	// unblock
	if !.IsValid() {
		.Mach.EvRemove1(, ss.ReqMissingUpdates, nil)
		return
	}
	.pool.Go(func() error {
		defer .Mach.EvRemove1(, ss.ReqMissingUpdates, nil)
		if .Err() != nil {
			return nil // expired
		}

		 := &MsgReqUpdates{
			Msg:     Msg{MsgTypeReqUpdates},
			PeerIds: ,
		}
		,  := msgpack.Marshal()
		if  != nil {
			.Mach.EvAddErr(, , nil)
			return nil
		}
		.Mach.EvAdd1(, ss.SendMsg, Pass(&A{
			Msg: ,
			// debug
			MsgType: string(.Type),
			PeerId:  [0],
			Peer:    .peerName([0]),
		}))

		return nil
	})
}

func ( *Topic) ( *am.Event) bool {
	return len(ParseArgs(.Args).PeerIds) > 0
}

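// Handler for the DoSendInfo state: broadcasts a MsgInfo with the schemas,
// clocks, and tags of the exposed machines (or forwarded Info for other
// requested peers), plus random gossips.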
func ( *Topic) ( *am.Event) {
	.Mach.EvRemove1(, ss.DoSendInfo, nil)
	 := ParseArgs(.Args)
	 := .PeerIds

	// collect gossips and info
	 := PeerGossips{}
	 := .host.ID().String()
	 := slices.Clone(.ExposedMachs)

	// gossip about 5 random peers
	for range min(.GossipAmount, len(.workers)) {
		 := slices.Collect(maps.Keys(.workers))
		 := [rand.Intn(len(.workers))]
		// skip empty peers
		if len(.workers[]) == 0 {
			continue
		}
		 := map[int]uint64{}
		for ,  := range .workers[] {
			[] = .Time(nil).Sum(nil)
		}
		[] = 
	}

	if !.IsValid() {
		return
	}

	// try again if didnt go through
	// if !ok {
	// 	t.Mach.Remove1(ss.SendHello, nil)
	// 	t.Mach.Add1(ss.SendHello, nil)
	// 	// TODO detect too many retires (compare PeerJoined ticks)
	// 	return
	// }

	// list all requested machs
	 := &MsgInfo{
		Msg:         Msg{MsgTypeInfo},
		PeerInfo:    PeerInfo{},
		PeerGossips: ,
	}
	for ,  := range  {
		 := MachInfo{}

		// say hello
		if  ==  {
			for ,  := range  {

				// mach schema
				var  am.Schema
				if .TestSchema == nil {
					 = .Schema()
				}
				var  am.S
				if .TestStates == nil {
					 = .StateNames()
				}

				// info msg
				[] = &Info{
					Id:     .Id(),
					Schema: ,
					States: ,
					MTime:  .Time(nil),
					Tags:   .Tags(),
					Parent: .ParentId(),
				}
			}
		} else {

			// fwd what we know
			if ,  := .info[]; ! {
				continue
			}
			// TODO send only requested ones
			 = .info[]
		}

		.PeerInfo[] = 
	}

	// send
	,  := msgpack.Marshal()
	if  != nil {
		.Mach.EvAddErr(, , nil)
		return
	}
	.Mach.EvAdd1(, ss.SendMsg, Pass(&A{
		Msg:     ,
		MsgType: string(MsgTypeInfo),
	}))
}

// ///// ///// /////

// ///// METHODS

// ///// ///// /////

// Start (method name assumed) activates the topic's state machine.
func (t *Topic) Start() am.Result {
	return t.Mach.Add1(ss.Start, nil)
}

// ConnCount returns the number of active libp2p connections.
func (t *Topic) ConnCount() int {
	return t.host.ConnManager().(*connmgr.BasicConnMgr).GetInfo().ConnCount
}

// Join joins the PubSub topic.
func (t *Topic) Join() am.Result {
	return t.Mach.Add1(ss.Joining, nil)
}

// StartJoin (method name assumed) starts the topic, waits for a connection,
// and then joins the topic.
func (t *Topic) StartJoin(ctx context.Context) am.Result {
	if t.Mach.Add1(ss.Start, nil) == am.Canceled {
		return am.Canceled
	}
	err := amhelp.WaitForAll(ctx, time.Second, t.Mach.When1(ss.Connected, ctx))
	if ctx.Err() != nil {
		return am.Canceled
	}
	if err != nil {
		return am.Canceled
	}

	return t.Join()
}

// Dispose (method name assumed) starts disposing the topic.
func (t *Topic) Dispose() am.Result {
	return t.Mach.Add1(ss.Disposing, nil)
}

// GetPeerAddrs returns the full multiaddresses (incl. the peer ID) this host
// listens on.
func (t *Topic) GetPeerAddrs() ([]ma.Multiaddr, error) {
	if t.Mach.Not1(ss.Start) {
		return nil, fmt.Errorf("%w: %s", am.ErrStateInactive, ss.Start)
	}
	h := t.host

	// Get the peer ID
	peerId := h.ID()

	// Get all listen addresses
	listenAddrs := h.Addrs()
	if len(listenAddrs) == 0 {
		return nil, errors.New("no listen addresses available")
	}

	// Create a slice to hold the encapsulated multiaddresses
	addrs := make([]ma.Multiaddr, len(listenAddrs))
	for i, addr := range listenAddrs {
		// Create a new multiaddress by combining the network address and peer ID
		addrs[i] = addr.Encapsulate(ma.StringCast("/p2p/" + peerId.String()))
	}

	return addrs, nil
}

// workersCount returns the total number of known remote workers.
func (t *Topic) workersCount() int {
	count := 0
	for _, machs := range t.workers {
		count += len(machs)
	}
	return count
}

// log logs a message via the state machine, if logging is enabled.
func (t *Topic) log(msg string, args ...any) {
	if !t.LogEnabled {
		return
	}
	t.Mach.Log(msg, args...)
}

// peerName translates a libp2p host ID into a friendly peer name, if known.
func (t *Topic) peerName(hostId string) string {
	if name, ok := t.HostToPeer[hostId]; ok {
		return name
	}

	return ""
}

// metric bumps a per-peer metric state on the metrics machine, if present.
func (t *Topic) metric(name, hostId string) bool {
	metrics := t.MachMetrics.Load()
	if metrics == nil {
		return false
	}

	state := name + hostId
	if peer := t.peerName(hostId); peer != "" {
		state = peer + name
	}
	if !metrics.Has1(state) {
		return false
	}

	metrics.Add1(state, nil)

	return true
}

// syncMetrics (method name assumed) mirrors per-peer counters onto the metrics
// machine, if present.
func (t *Topic) syncMetrics() {
	metrics := t.MachMetrics.Load()
	if metrics == nil {
		return
	}

	add := func(name string) {
		if !metrics.Has1(name) {
			return
		}

		metrics.Add1(name, nil)
	}

	t.Mach.Eval("syncMetrics", func() {
		state := ""

		for hostId := range t.missingUpdates {
			state = "Gossip" + hostId
			if peer, ok := t.HostToPeer[hostId]; ok {
				state = peer + "Gossip"
			}
			add(state)
		}

		for hostId := range t.pendingMachUpdates {
			state = "Gossip" + hostId
			if peer, ok := t.HostToPeer[hostId]; ok {
				state = peer + "Gossip"
			}
			add(state)
		}

		for hostId := range t.missingPeers {
			state = "Gossip" + hostId
			if peer, ok := t.HostToPeer[hostId]; ok {
				state = peer + "Gossip"
			}
			add(state)
		}

		for hostId := range t.info {
			state = "Info" + hostId
			if peer, ok := t.HostToPeer[hostId]; ok {
				state = peer + "Info"
			}
			add(state)
			add("Peer")
		}
	}, nil)
}