package observedaddrs

import (
	
	
	
	
	
	
	

	
	
	logging 
	basichost 
	

	ma 
	manet 
)

var log = logging.Logger("observedaddrs")

// ActivationThresh is the minimum number of observers required for an observed address
// to be considered valid. We may not advertise this address even if we have these many
// observations if better observed addresses are available.
var ActivationThresh = 4

var (
	// observedAddrManagerWorkerChannelSize defines how many addresses can be enqueued
	// for adding to an ObservedAddrManager.
	observedAddrManagerWorkerChannelSize = 16
	// natTypeChangeTickrInterval is the interval between two nat device change events.
	//
	// Computing the NAT type is expensive and the information in the event is not too
	// useful, so this interval is long.
	natTypeChangeTickrInterval = 1 * time.Minute
)

const maxExternalThinWaistAddrsPerLocalAddr = 3

// thinWaist is a struct that stores the address along with it's thin waist prefix and rest of the multiaddr
type thinWaist struct {
	Addr, TW, Rest ma.Multiaddr
}

var errTW = errors.New("not a thinwaist address")

func thinWaistForm( ma.Multiaddr) (thinWaist, error) {
	if len() < 2 {
		return thinWaist{}, errTW
	}
	if ,  := [0].Code(), [1].Code(); ( != ma.P_IP4 &&  != ma.P_IP6) || ( != ma.P_TCP &&  != ma.P_UDP) {
		return thinWaist{}, errTW
	}
	return thinWaist{Addr: , TW: [:2], Rest: [2:]}, nil
}

// getObserver returns the observer for the multiaddress
// For an IPv4 multiaddress the observer is the IP address
// For an IPv6 multiaddress the observer is the first /56 prefix of the IP address
func getObserver( ma.Multiaddr) (string, error) {
	,  := manet.ToIP()
	if  != nil {
		return "", 
	}
	if  := .To4();  != nil {
		return .String(), nil
	}
	// Count /56 prefix as a single observer.
	return .Mask(net.CIDRMask(56, 128)).String(), nil
}

// connMultiaddrs provides IsClosed along with network.ConnMultiaddrs. It is easier to mock this than network.Conn
type connMultiaddrs interface {
	network.ConnMultiaddrs
	IsClosed() bool
}

// observerSetCacheSize is the number of transport sharing the same thinwaist (tcp, ws, wss), (quic, webtransport, webrtc-direct)
// This is 3 in practice right now, but keep a buffer of few extra elements
const observerSetCacheSize = 10

// observerSet is the set of observers who have observed ThinWaistAddr
type observerSet struct {
	ObservedTWAddr ma.Multiaddr
	ObservedBy     map[string]int

	mu               sync.RWMutex            // protects following
	cachedMultiaddrs map[string]ma.Multiaddr // cache of localMultiaddr rest(addr - thinwaist) => output multiaddr
}

func ( *observerSet) ( ma.Multiaddr) ma.Multiaddr {
	if  == nil {
		return .ObservedTWAddr
	}
	 := string(.Bytes())
	.mu.RLock()
	,  := .cachedMultiaddrs[]
	.mu.RUnlock()
	if  {
		return 
	}

	.mu.Lock()
	defer .mu.Unlock()
	// Check if some other go routine added this while we were waiting
	,  = .cachedMultiaddrs[]
	if  {
		return 
	}
	if .cachedMultiaddrs == nil {
		.cachedMultiaddrs = make(map[string]ma.Multiaddr, observerSetCacheSize)
	}
	if len(.cachedMultiaddrs) == observerSetCacheSize {
		// remove one entry if we will go over the limit
		for  := range .cachedMultiaddrs {
			delete(.cachedMultiaddrs, )
			break
		}
	}
	.cachedMultiaddrs[] = ma.Join(.ObservedTWAddr, )
	return .cachedMultiaddrs[]
}

type observation struct {
	conn     connMultiaddrs
	observed ma.Multiaddr
}

// Manager maps connection's local multiaddrs to their externally observable multiaddress
type Manager struct {
	// Our listen addrs
	listenAddrs func() []ma.Multiaddr
	// worker channel for new observations
	wch chan observation
	// eventbus for identify observations
	eventbus event.Bus

	// for closing
	wg         sync.WaitGroup
	ctx        context.Context
	ctxCancel  context.CancelFunc
	stopNotify func()

	mu sync.RWMutex
	// local thin waist => external thin waist => observerSet
	externalAddrs map[string]map[string]*observerSet
	// connObservedTWAddrs maps the connection to the last observed thin waist multiaddr on that connection
	connObservedTWAddrs map[connMultiaddrs]ma.Multiaddr
}

var _ basichost.ObservedAddrsManager = (*Manager)(nil)

// NewManager returns a new manager using peerstore.OwnObservedAddressTTL as the TTL.
func ( event.Bus,  network.Network) (*Manager, error) {
	 := func() []ma.Multiaddr {
		 := .ListenAddresses()
		,  := .InterfaceListenAddresses()
		if  != nil {
			log.Warn("error getting interface listen addresses", "err", )
		}
		return append(, ...)
	}
	,  := newManagerWithListenAddrs(, )
	if  != nil {
		return nil, 
	}

	return , nil
}

// newManagerWithListenAddrs uses the listenAddrs directly to simplify creation in tests.
func newManagerWithListenAddrs( event.Bus,  func() []ma.Multiaddr) (*Manager, error) {
	 := &Manager{
		externalAddrs:       make(map[string]map[string]*observerSet),
		connObservedTWAddrs: make(map[connMultiaddrs]ma.Multiaddr),
		wch:                 make(chan observation, observedAddrManagerWorkerChannelSize),
		listenAddrs:         ,
		eventbus:            ,
		stopNotify:          func() {},
	}
	.ctx, .ctxCancel = context.WithCancel(context.Background())
	return , nil
}

// Start tracking addrs
func ( *Manager) ( network.Network) {
	 := &network.NotifyBundle{
		DisconnectedF: func( network.Network,  network.Conn) {
			.removeConn()
		},
	}

	,  := .eventbus.Subscribe(new(event.EvtPeerIdentificationCompleted), eventbus.Name("observed-addrs-manager"))
	if  != nil {
		log.Error("failed to start observed addrs manager: identify subscription failed.", "err", )
		return
	}
	,  := .eventbus.Emitter(new(event.EvtNATDeviceTypeChanged), eventbus.Stateful)
	if  != nil {
		log.Error("failed to start observed addrs manager: nat device type changed emitter error.", "err", )
		.Close()
		return
	}

	.Notify()
	.stopNotify = func() {
		.StopNotify()
	}

	.wg.Add(2)
	go .eventHandler(, )
	go .worker()
}

// AddrsFor return all activated observed addresses associated with the given
// (resolved) listen address.
func ( *Manager) ( ma.Multiaddr) ( []ma.Multiaddr) {
	if  == nil {
		return nil
	}
	.mu.RLock()
	defer .mu.RUnlock()
	,  := thinWaistForm()
	if  != nil {
		return nil
	}

	 := .getTopExternalAddrs(string(.TW.Bytes()), ActivationThresh)
	 := make([]ma.Multiaddr, 0, len())
	for ,  := range  {
		 = append(, .cacheMultiaddr(.Rest))
	}
	return 
}

// appendInferredAddrs infers the external address of addresses for the addresses
// that we are listening on using the thin waist mapping.
//
// e.g. If we have observations for a QUIC address on port 9000, and we are
// listening on the same interface and port 9000 for WebTransport, we can infer
// the external WebTransport address.
func ( *Manager) ( map[string][]*observerSet,  []ma.Multiaddr) []ma.Multiaddr {
	if  == nil {
		 = make(map[string][]*observerSet)
		for  := range .externalAddrs {
			[] = append([], .getTopExternalAddrs(, ActivationThresh)...)
		}
	}
	 := .listenAddrs()
	 := make(map[string]struct{})
	for ,  := range  {
		if ,  := [string(.Bytes())];  {
			// We've already added this
			continue
		}
		[string(.Bytes())] = struct{}{}
		,  := thinWaistForm()
		if  != nil {
			continue
		}
		for ,  := range [string(.TW.Bytes())] {
			 = append(, .cacheMultiaddr(.Rest))
		}
	}
	return 
}

// Addrs return all observed addresses with at least minObservers observers
// If minObservers <= 0, it will return all addresses with at least ActivationThresh observers.
func ( *Manager) ( int) []ma.Multiaddr {
	.mu.RLock()
	defer .mu.RUnlock()

	if  <= 0 {
		 = ActivationThresh
	}

	 := make(map[string][]*observerSet)
	for  := range .externalAddrs {
		[] = append([], .getTopExternalAddrs(, )...)
	}
	 := make([]ma.Multiaddr, 0, maxExternalThinWaistAddrsPerLocalAddr*5) // assume 5 transports
	 = .appendInferredAddrs(, )
	return 
}

func ( *Manager) ( string,  int) []*observerSet {
	 := make([]*observerSet, 0, len(.externalAddrs[]))
	for ,  := range .externalAddrs[] {
		if len(.ObservedBy) >=  {
			 = append(, )
		}
	}
	slices.SortFunc(, func(,  *observerSet) int {
		 := len(.ObservedBy) - len(.ObservedBy)
		if  != 0 {
			return 
		}
		// In case we have elements with equal counts,
		// keep the address list stable by using the lexicographically smaller address
		return .ObservedTWAddr.Compare(.ObservedTWAddr)
	})
	// TODO(sukunrt): Improve this logic. Return only if the addresses have a
	// threshold fraction of the maximum observations
	 := len()
	if  > maxExternalThinWaistAddrsPerLocalAddr {
		 = maxExternalThinWaistAddrsPerLocalAddr
	}
	return [:]
}

func ( *Manager) ( event.Subscription,  event.Emitter) {
	defer .wg.Done()
	 := time.NewTicker(natTypeChangeTickrInterval)
	defer .Stop()
	var ,  network.NATDeviceType
	for {
		select {
		case  := <-.Out():
			 := .(event.EvtPeerIdentificationCompleted)
			select {
			case .wch <- observation{
				conn:     .Conn,
				observed: .ObservedAddr,
			}:
			default:
				log.Debug("dropping address observation due to full buffer",
					"from", .Conn.RemoteMultiaddr(),
					"observed", .ObservedAddr,
				)
			}
		case <-.C:
			,  := .getNATType()
			if  !=  {
				.Emit(event.EvtNATDeviceTypeChanged{
					TransportProtocol: network.NATTransportUDP,
					NatDeviceType:     ,
				})
			}
			if  !=  {
				.Emit(event.EvtNATDeviceTypeChanged{
					TransportProtocol: network.NATTransportTCP,
					NatDeviceType:     ,
				})
			}
			,  = , 
		case <-.ctx.Done():
			return
		}
	}
}

func ( *Manager) () {
	defer .wg.Done()
	for {
		select {
		case  := <-.wch:
			.maybeRecordObservation(.conn, .observed)
		case <-.ctx.Done():
			return
		}
	}
}

func ( *Manager) ( connMultiaddrs,  ma.Multiaddr) ( bool,  thinWaist,  thinWaist) {
	if  == nil ||  == nil {
		return false, thinWaist{}, thinWaist{}
	}
	// Ignore observations from loopback nodes. We already know our loopback
	// addresses.
	if manet.IsIPLoopback() {
		return false, thinWaist{}, thinWaist{}
	}

	// Provided by NAT64 peers, these addresses are specific to the peer and not publicly routable
	if manet.IsNAT64IPv4ConvertedIPv6Addr() {
		return false, thinWaist{}, thinWaist{}
	}

	// Ignore p2p-circuit addresses. These are the observed address of the relay.
	// Not useful for us.
	if isRelayedAddress() {
		return false, thinWaist{}, thinWaist{}
	}

	,  := thinWaistForm(.LocalMultiaddr())
	if  != nil {
		log.Info("failed to get interface listen addrs", "err", )
		return false, thinWaist{}, thinWaist{}
	}

	 := .listenAddrs()
	for ,  := range  {
		,  := thinWaistForm()
		if  != nil {
			[] = nil
			continue
		}
		[] = .TW
	}

	if !ma.Contains(, .TW) {
		// not in our list
		return false, thinWaist{}, thinWaist{}
	}

	,  = thinWaistForm()
	if  != nil {
		return false, thinWaist{}, thinWaist{}
	}
	if !hasConsistentTransport(.TW, .TW) {
		log.Debug(
			"invalid observed address for local address",
			"observed", ,
			"local", .Addr,
		)
		return false, thinWaist{}, thinWaist{}
	}

	return true, , 
}

func ( *Manager) ( connMultiaddrs,  ma.Multiaddr) {
	, ,  := .shouldRecordObservation(, )
	if ! {
		return
	}
	log.Debug("added own observed listen addr", "conn", , "observed", )

	.mu.Lock()
	defer .mu.Unlock()
	.recordObservationUnlocked(, , )
}

func ( *Manager) ( connMultiaddrs, ,  thinWaist) {
	if .IsClosed() {
		// dont record if the connection is already closed. Any previous observations will be removed in
		// the disconnected callback
		return
	}
	 := string(.TW.Bytes())
	 := string(.TW.Bytes())
	,  := getObserver(.RemoteMultiaddr())
	if  != nil {
		return
	}

	,  := .connObservedTWAddrs[]
	if  {
		// we have received the same observation again, nothing to do
		if .Equal(.TW) {
			return
		}
		// if we have a previous entry remove it from externalAddrs
		.removeExternalAddrsUnlocked(, , string(.Bytes()))
	}
	.connObservedTWAddrs[] = .TW
	.addExternalAddrsUnlocked(.TW, , , )
}

func ( *Manager) (, ,  string) {
	,  := .externalAddrs[][]
	if ! {
		return
	}
	.ObservedBy[]--
	if .ObservedBy[] <= 0 {
		delete(.ObservedBy, )
	}
	if len(.ObservedBy) == 0 {
		delete(.externalAddrs[], )
	}
	if len(.externalAddrs[]) == 0 {
		delete(.externalAddrs, )
	}
}

func ( *Manager) ( ma.Multiaddr, , ,  string) {
	,  := .externalAddrs[][]
	if ! {
		 = &observerSet{
			ObservedTWAddr: ,
			ObservedBy:     make(map[string]int),
		}
		if ,  := .externalAddrs[]; ! {
			.externalAddrs[] = make(map[string]*observerSet)
		}
		.externalAddrs[][] = 
	}
	.ObservedBy[]++
}

func ( *Manager) ( connMultiaddrs) {
	if  == nil {
		return
	}
	.mu.Lock()
	defer .mu.Unlock()

	,  := .connObservedTWAddrs[]
	if ! {
		return
	}
	delete(.connObservedTWAddrs, )

	,  := thinWaistForm(.LocalMultiaddr())
	if  != nil {
		return
	}

	,  := getObserver(.RemoteMultiaddr())
	if  != nil {
		return
	}

	.removeExternalAddrsUnlocked(, string(.TW.Bytes()), string(.Bytes()))
}

func ( *Manager) () (,  network.NATDeviceType) {
	.mu.RLock()
	defer .mu.RUnlock()

	var ,  []int
	var ,  int
	for ,  := range .externalAddrs {
		 := false
		for ,  := range  {
			for ,  := range .ObservedTWAddr {
				if .Code() == ma.P_TCP {
					 = true
					break
				}
			}
		}
		for ,  := range  {
			if  {
				 = append(, len(.ObservedBy))
				 += len(.ObservedBy)
			} else {
				 = append(, len(.ObservedBy))
				 += len(.ObservedBy)
			}
		}
	}

	sort.Sort(sort.Reverse(sort.IntSlice()))
	sort.Sort(sort.Reverse(sort.IntSlice()))

	,  := 0, 0
	for  := 0;  < maxExternalThinWaistAddrsPerLocalAddr &&  < len(); ++ {
		 += []
	}
	for  := 0;  < maxExternalThinWaistAddrsPerLocalAddr &&  < len(); ++ {
		 += []
	}

	// If the top elements cover more than 1/2 of all the observations, there's a > 50% chance that
	// hole punching based on outputs of observed address manager will succeed
	//
	// The `3*maxExternalThinWaistAddrsPerLocalAddr` is a magic number, we just want sufficient
	// observations to decide about NAT Type
	if  >= 3*maxExternalThinWaistAddrsPerLocalAddr {
		if  >= /2 {
			 = network.NATDeviceTypeEndpointIndependent
		} else {
			 = network.NATDeviceTypeEndpointDependent
		}
	}
	if  >= 3*maxExternalThinWaistAddrsPerLocalAddr {
		if  >= /2 {
			 = network.NATDeviceTypeEndpointIndependent
		} else {
			 = network.NATDeviceTypeEndpointDependent
		}
	}
	return
}

func ( *Manager) () error {
	.stopNotify()
	.ctxCancel()
	.wg.Wait()
	return nil
}

// hasConsistentTransport returns true if the thin waist address `aTW` shares the same
// protocols with `bTW`
func hasConsistentTransport(,  ma.Multiaddr) bool {
	if len() != len() {
		return false
	}
	for ,  := range  {
		if [].Code() != .Code() {
			return false
		}
	}
	return true
}

func isRelayedAddress( ma.Multiaddr) bool {
	for ,  := range  {
		if .Code() == ma.P_CIRCUIT {
			return true
		}
	}
	return false
}