package identify

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"net/netip"
	"slices"
	"sync"
	"time"

	"github.com/libp2p/go-libp2p/core/crypto"
	"github.com/libp2p/go-libp2p/core/event"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/core/peerstore"
	"github.com/libp2p/go-libp2p/core/protocol"
	"github.com/libp2p/go-libp2p/core/record"
	"github.com/libp2p/go-libp2p/p2p/host/eventbus"
	useragent "github.com/libp2p/go-libp2p/p2p/protocol/identify/internal/user-agent"
	"github.com/libp2p/go-libp2p/p2p/protocol/identify/pb"
	"github.com/libp2p/go-libp2p/x/rate"

	logging "github.com/ipfs/go-log/v2"
	"github.com/libp2p/go-msgio/pbio"
	ma "github.com/multiformats/go-multiaddr"
	manet "github.com/multiformats/go-multiaddr/net"
	msmux "github.com/multiformats/go-multistream"
	"google.golang.org/protobuf/proto"
)

var log = logging.Logger("net/identify")

const (
	// ID is the protocol.ID of version 1.0.0 of the identify service.
	ID = "/ipfs/id/1.0.0"
	// IDPush is the protocol.ID of the Identify push protocol.
	// It sends full identify messages containing the current state of the peer.
	IDPush = "/ipfs/id/push/1.0.0"
	// DefaultTimeout for all id interactions, incoming / outgoing, id / id-push.
	DefaultTimeout = 5 * time.Second
	// ServiceName is the default identify service name.
	ServiceName = "libp2p.identify"

	legacyIDSize          = 2 * 1024
	signedIDSize          = 8 * 1024
	maxOwnIdentifyMsgSize = 4 * 1024 // smaller than what we accept. This is 4k to be compatible with rust-libp2p
	maxMessages           = 10
	maxPushConcurrency    = 32
	// number of addresses to keep for peers we have disconnected from, for peerstore.RecentlyConnectedAddrTTL time
	// This number can be small as we already filter peer addresses based on whether the peer is connected to us over
	// localhost, private IP or public IP address
	recentlyConnectedPeerMaxAddrs = 20
	connectedPeerMaxAddrs         = 500
)

var (
	defaultNetworkPrefixRateLimits = []rate.PrefixLimit{
		{Prefix: netip.MustParsePrefix("127.0.0.0/8"), Limit: rate.Limit{}}, // inf
		{Prefix: netip.MustParsePrefix("::1/128"), Limit: rate.Limit{}},     // inf
	}
	defaultGlobalRateLimit      = rate.Limit{RPS: 2000, Burst: 3000}
	defaultIPv4SubnetRateLimits = []rate.SubnetLimit{
		{PrefixLength: 24, Limit: rate.Limit{RPS: 0.2, Burst: 10}}, // 1 every 5 seconds
	}
	defaultIPv6SubnetRateLimits = []rate.SubnetLimit{
		{PrefixLength: 56, Limit: rate.Limit{RPS: 0.2, Burst: 10}}, // 1 every 5 seconds
		{PrefixLength: 48, Limit: rate.Limit{RPS: 0.5, Burst: 20}}, // 1 every 2 seconds
	}
)

type identifySnapshot struct {
	seq       uint64
	protocols []protocol.ID
	addrs     []ma.Multiaddr
	record    *record.Envelope
}

// Equal reports whether two snapshots are identical.
// It does NOT compare the sequence number.
func (s identifySnapshot) Equal(other *identifySnapshot) bool {
	hasRecord := s.record != nil
	otherHasRecord := other.record != nil
	if hasRecord != otherHasRecord {
		return false
	}
	if hasRecord && !s.record.Equal(other.record) {
		return false
	}
	if !slices.Equal(s.protocols, other.protocols) {
		return false
	}
	if len(s.addrs) != len(other.addrs) {
		return false
	}
	for i, a := range s.addrs {
		if !a.Equal(other.addrs[i]) {
			return false
		}
	}
	return true
}

type IDService interface {
	// IdentifyConn synchronously triggers an identify request on the connection and
	// waits for it to complete. If the connection is being identified by another
	// caller, this call will wait. If the connection has already been identified,
	// it will return immediately.
	IdentifyConn(network.Conn)
	// IdentifyWait triggers an identify (if the connection has not already been
	// identified) and returns a channel that is closed when the identify protocol
	// completes.
	IdentifyWait(network.Conn) <-chan struct{}
	// OwnObservedAddrs returns the addresses peers have reported we've dialed from
	OwnObservedAddrs() []ma.Multiaddr
	// ObservedAddrsFor returns the addresses peers have reported we've dialed from,
	// for a specific local address.
	ObservedAddrsFor(local ma.Multiaddr) []ma.Multiaddr
	Start()
	io.Closer
}
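
// A minimal usage sketch (assuming an existing host.Host h and an established
// network.Conn conn; in practice the libp2p host constructs and starts this
// service itself):
//
//	ids, err := NewIDService(h)
//	if err != nil {
//		// handle the error
//	}
//	ids.Start()
//	defer ids.Close()
//	<-ids.IdentifyWait(conn) // closed once conn has been identified (or identify failed)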

type identifyPushSupport uint8

const (
	identifyPushSupportUnknown identifyPushSupport = iota
	identifyPushSupported
	identifyPushUnsupported
)

type entry struct {
	// The IdentifyWaitChan is created when IdentifyWait is called for the first time.
	// IdentifyWait closes this channel when the Identify request completes, or when it fails.
	IdentifyWaitChan chan struct{}

	// PushSupport saves our knowledge about the peer's support of the Identify Push protocol.
	// Before the identify request returns, we don't know yet if the peer supports Identify Push.
	PushSupport identifyPushSupport
	// Sequence is the sequence number of the last snapshot we sent to this peer.
	Sequence uint64
}

// idService is a structure that implements ProtocolIdentify.
// It is a trivial service that gives the other peer some
// useful information about the local peer. A sort of hello.
//
// The idService sends:
//   - Our libp2p Protocol Version
//   - Our libp2p Agent Version
//   - Our public Listen Addresses
type idService struct {
	Host            host.Host
	UserAgent       string
	ProtocolVersion string

	metricsTracer MetricsTracer

	setupCompleted chan struct{} // is closed when Start has finished setting up
	ctx            context.Context
	ctxCancel      context.CancelFunc
	// track resources that need to be shut down before we shut down
	refCount sync.WaitGroup

	disableSignedPeerRecord bool
	timeout                 time.Duration

	connsMu sync.RWMutex
	// The conns map contains all connections we're currently handling.
	// Connections are inserted as soon as they're available in the swarm
	// Connections are removed from the map when the connection disconnects.
	conns map[network.Conn]entry

	addrMu sync.Mutex

	// our own observed addresses.
	observedAddrMgr            *ObservedAddrManager
	disableObservedAddrManager bool

	emitters struct {
		evtPeerProtocolsUpdated        event.Emitter
		evtPeerIdentificationCompleted event.Emitter
		evtPeerIdentificationFailed    event.Emitter
	}

	currentSnapshot struct {
		sync.Mutex
		snapshot identifySnapshot
	}

	natEmitter *natEmitter

	rateLimiter *rate.Limiter
}

type normalizer interface {
	NormalizeMultiaddr(ma.Multiaddr) ma.Multiaddr
}

// NewIDService constructs a new *idService and activates it by
// attaching its stream handler to the given host.Host.
func NewIDService(h host.Host, opts ...Option) (*idService, error) {
	cfg := config{
		timeout: DefaultTimeout,
	}
	for _, opt := range opts {
		opt(&cfg)
	}

	userAgent := useragent.DefaultUserAgent()
	if cfg.userAgent != "" {
		userAgent = cfg.userAgent
	}

	ctx, cancel := context.WithCancel(context.Background())
	s := &idService{
		Host:                    h,
		UserAgent:               userAgent,
		ProtocolVersion:         cfg.protocolVersion,
		ctx:                     ctx,
		ctxCancel:               cancel,
		conns:                   make(map[network.Conn]entry),
		disableSignedPeerRecord: cfg.disableSignedPeerRecord,
		setupCompleted:          make(chan struct{}),
		metricsTracer:           cfg.metricsTracer,
		timeout:                 cfg.timeout,
		rateLimiter: &rate.Limiter{
			GlobalLimit:         defaultGlobalRateLimit,
			NetworkPrefixLimits: defaultNetworkPrefixRateLimits,
			SubnetRateLimiter: rate.SubnetLimiter{
				IPv4SubnetLimits: defaultIPv4SubnetRateLimits,
				IPv6SubnetLimits: defaultIPv6SubnetRateLimits,
				GracePeriod:      1 * time.Minute,
			},
		},
	}

	var normalize func(ma.Multiaddr) ma.Multiaddr
	if hn, ok := h.(normalizer); ok {
		normalize = hn.NormalizeMultiaddr
	}

	var err error
	if cfg.disableObservedAddrManager {
		s.disableObservedAddrManager = true
	} else {
		observedAddrs, err := NewObservedAddrManager(h.Network().ListenAddresses,
			h.Addrs, h.Network().InterfaceListenAddresses, normalize)
		if err != nil {
			return nil, fmt.Errorf("failed to create observed address manager: %s", err)
		}
		natEmitter, err := newNATEmitter(h, observedAddrs, time.Minute)
		if err != nil {
			return nil, fmt.Errorf("failed to create nat emitter: %s", err)
		}
		s.natEmitter = natEmitter
		s.observedAddrMgr = observedAddrs
	}

	s.emitters.evtPeerProtocolsUpdated, err = h.EventBus().Emitter(&event.EvtPeerProtocolsUpdated{})
	if err != nil {
		log.Warnf("identify service not emitting peer protocol updates; err: %s", err)
	}
	s.emitters.evtPeerIdentificationCompleted, err = h.EventBus().Emitter(&event.EvtPeerIdentificationCompleted{})
	if err != nil {
		log.Warnf("identify service not emitting identification completed events; err: %s", err)
	}
	s.emitters.evtPeerIdentificationFailed, err = h.EventBus().Emitter(&event.EvtPeerIdentificationFailed{})
	if err != nil {
		log.Warnf("identify service not emitting identification failed events; err: %s", err)
	}
	return s, nil
}

func (ids *idService) Start() {
	ids.Host.Network().Notify((*netNotifiee)(ids))
	ids.Host.SetStreamHandler(ID, ids.handleIdentifyRequest)
	ids.Host.SetStreamHandler(IDPush, ids.rateLimiter.Limit(ids.handlePush))
	ids.updateSnapshot()
	close(ids.setupCompleted)

	ids.refCount.Add(1)
	go ids.loop(ids.ctx)
}

func (ids *idService) loop(ctx context.Context) {
	defer ids.refCount.Done()

	sub, err := ids.Host.EventBus().Subscribe(
		[]any{&event.EvtLocalProtocolsUpdated{}, &event.EvtLocalAddressesUpdated{}},
		eventbus.BufSize(256),
		eventbus.Name("identify (loop)"),
	)
	if err != nil {
		log.Errorf("failed to subscribe to events on the bus, err=%s", err)
		return
	}
	defer sub.Close()

	// Send pushes from a separate goroutine.
	// That way, we can end up with
	// * this goroutine busy looping over all peers in sendPushes
	// * another push being queued in the triggerPush channel
	triggerPush := make(chan struct{}, 1)
	ids.refCount.Add(1)
	go func() {
		defer ids.refCount.Done()

		for {
			select {
			case <-ctx.Done():
				return
			case <-triggerPush:
				ids.sendPushes(ctx)
			}
		}
	}()

	for {
		select {
		case e, ok := <-sub.Out():
			if !ok {
				return
			}
			if updated := ids.updateSnapshot(); !updated {
				continue
			}
			if ids.metricsTracer != nil {
				ids.metricsTracer.TriggeredPushes(e)
			}
			select {
			case triggerPush <- struct{}{}:
			default: // we already have one more push queued, no need to queue another one
			}
		case <-ctx.Done():
			return
		}
	}
}

func (ids *idService) sendPushes(ctx context.Context) {
	ids.connsMu.RLock()
	conns := make([]network.Conn, 0, len(ids.conns))
	for c, e := range ids.conns {
		// Push even if we don't know if push is supported.
		// This will only be the case while the Identify request is in flight.
		if e.PushSupport == identifyPushSupported || e.PushSupport == identifyPushSupportUnknown {
			conns = append(conns, c)
		}
	}
	ids.connsMu.RUnlock()

	sem := make(chan struct{}, maxPushConcurrency)
	var wg sync.WaitGroup
	for _, c := range conns {
		// check if the connection is still alive
		ids.connsMu.RLock()
		e, ok := ids.conns[c]
		ids.connsMu.RUnlock()
		if !ok {
			continue
		}
		// check if we already sent the current snapshot to this peer
		ids.currentSnapshot.Lock()
		snapshot := ids.currentSnapshot.snapshot
		ids.currentSnapshot.Unlock()
		if e.Sequence >= snapshot.seq {
			log.Debugw("already sent this snapshot to peer", "peer", c.RemotePeer(), "seq", snapshot.seq)
			continue
		}
		// we haven't, send it now
		sem <- struct{}{}
		wg.Add(1)
		go func(c network.Conn) {
			defer wg.Done()
			defer func() { <-sem }()
			ctx, cancel := context.WithTimeout(ctx, ids.timeout)
			defer cancel()

			str, err := newStreamAndNegotiate(ctx, c, IDPush, ids.timeout)
			if err != nil { // connection might have been closed recently
				return
			}
			// TODO: find out if the peer supports push if we didn't have any information about push support
			if err := ids.sendIdentifyResp(str, true); err != nil {
				log.Debugw("failed to send identify push", "peer", c.RemotePeer(), "error", err)
				return
			}
		}(c)
	}
	wg.Wait()
}

// Close shuts down the idService.
func (ids *idService) Close() error {
	ids.ctxCancel()
	if !ids.disableObservedAddrManager {
		ids.observedAddrMgr.Close()
		ids.natEmitter.Close()
	}
	ids.refCount.Wait()
	return nil
}

func (ids *idService) OwnObservedAddrs() []ma.Multiaddr {
	if ids.disableObservedAddrManager {
		return nil
	}
	return ids.observedAddrMgr.Addrs()
}

func (ids *idService) ObservedAddrsFor(local ma.Multiaddr) []ma.Multiaddr {
	if ids.disableObservedAddrManager {
		return nil
	}
	return ids.observedAddrMgr.AddrsFor(local)
}

// IdentifyConn runs the Identify protocol on a connection.
// It returns when we've received the peer's Identify message (or the request fails).
// If successful, the peer store will contain the peer's addresses and supported protocols.
func (ids *idService) IdentifyConn(c network.Conn) {
	<-ids.IdentifyWait(c)
}

// IdentifyWait runs the Identify protocol on a connection.
// It doesn't block and returns a channel that is closed when we receive
// the peer's Identify message (or the request fails).
// If successful, the peer store will contain the peer's addresses and supported protocols.
func (ids *idService) IdentifyWait(c network.Conn) <-chan struct{} {
	ids.connsMu.Lock()
	defer ids.connsMu.Unlock()

	e, found := ids.conns[c]
	if !found {
		// No entry found. We may have gotten an out of order notification. Check if we should have this conn (because we're still connected).
		// We hold the ids.connsMu lock, so this is safe: a disconnect event will be processed later if we are connected.
		if c.IsClosed() {
			log.Debugw("connection not found in identify service", "peer", c.RemotePeer())
			ch := make(chan struct{})
			close(ch)
			return ch
		} else {
			ids.addConnWithLock(c)
		}
	}

	if e.IdentifyWaitChan != nil {
		return e.IdentifyWaitChan
	}
	// First call to IdentifyWait for this connection. Create the channel.
	e.IdentifyWaitChan = make(chan struct{})
	ids.conns[c] = e

	// Spawn an identify. The connection may actually be closed
	// already, but that doesn't really matter. We'll fail to open a
	// stream then forget the connection.
	go func() {
		defer close(e.IdentifyWaitChan)
		if err := ids.identifyConn(c); err != nil {
			log.Warnf("failed to identify %s: %s", c.RemotePeer(), err)
			ids.emitters.evtPeerIdentificationFailed.Emit(event.EvtPeerIdentificationFailed{Peer: c.RemotePeer(), Reason: err})
			return
		}
	}()

	return e.IdentifyWaitChan
}

// newStreamAndNegotiate opens a new stream on the given connection and negotiates the given protocol.
func newStreamAndNegotiate(ctx context.Context, c network.Conn, protoID protocol.ID, timeout time.Duration) (network.Stream, error) {
	s, err := c.NewStream(network.WithAllowLimitedConn(ctx, "identify"))
	if err != nil {
		log.Debugw("error opening identify stream", "peer", c.RemotePeer(), "error", err)
		return nil, fmt.Errorf("failed to open new stream: %w", err)
	}

	// Ignore the error. Consistent with our previous behavior. (See https://github.com/libp2p/go-libp2p/issues/3109)
	_ = s.SetDeadline(time.Now().Add(timeout))

	if err := s.SetProtocol(protoID); err != nil {
		log.Warnf("error setting identify protocol for stream: %s", err)
		_ = s.Reset()
		return nil, fmt.Errorf("failed to set protocol: %w", err)
	}

	// ok give the response to our handler.
	if err := msmux.SelectProtoOrFail(protoID, s); err != nil {
		log.Infow("failed to negotiate identify protocol with peer", "peer", c.RemotePeer(), "error", err)
		_ = s.Reset()
		return nil, fmt.Errorf("multistream mux select protocol failed: %w", err)
	}
	return s, nil
}

func (ids *idService) identifyConn(c network.Conn) error {
	ctx, cancel := context.WithTimeout(context.Background(), ids.timeout)
	defer cancel()
	s, err := newStreamAndNegotiate(network.WithAllowLimitedConn(ctx, "identify"), c, ID, ids.timeout)
	if err != nil {
		log.Debugw("error opening identify stream", "peer", c.RemotePeer(), "error", err)
		return err
	}

	return ids.handleIdentifyResponse(s, false)
}

// handlePush handles incoming identify push streams
func (ids *idService) handlePush(s network.Stream) {
	s.SetDeadline(time.Now().Add(ids.timeout))
	if err := ids.handleIdentifyResponse(s, true); err != nil {
		log.Debugf("failed to handle identify push: %s", err)
	}
}

func (ids *idService) handleIdentifyRequest(s network.Stream) {
	_ = ids.sendIdentifyResp(s, false)
}

func (ids *idService) sendIdentifyResp(s network.Stream, isPush bool) error {
	if err := s.Scope().SetService(ServiceName); err != nil {
		s.Reset()
		return fmt.Errorf("failed to attach stream to identify service: %w", err)
	}
	defer s.Close()

	ids.currentSnapshot.Lock()
	snapshot := ids.currentSnapshot.snapshot
	ids.currentSnapshot.Unlock()

	log.Debugw("sending snapshot", "seq", snapshot.seq, "protocols", snapshot.protocols, "addrs", snapshot.addrs)

	mes := ids.createBaseIdentifyResponse(s.Conn(), &snapshot)
	mes.SignedPeerRecord = ids.getSignedRecord(&snapshot)

	log.Debugf("%s sending message to %s %s", ID, s.Conn().RemotePeer(), s.Conn().RemoteMultiaddr())
	if err := ids.writeChunkedIdentifyMsg(s, mes); err != nil {
		return err
	}

	if ids.metricsTracer != nil {
		ids.metricsTracer.IdentifySent(isPush, len(mes.Protocols), len(mes.ListenAddrs))
	}

	ids.connsMu.Lock()
	defer ids.connsMu.Unlock()
	e, ok := ids.conns[s.Conn()]
	// The connection might already have been closed.
	// We *should* receive the Connected notification from the swarm before we're able to accept the peer's
	// Identify stream, but if that for some reason doesn't work, we also wouldn't have a map entry here.
	// The only consequence would be that we send a spurious Push to that peer later.
	if !ok {
		return nil
	}
	e.Sequence = snapshot.seq
	ids.conns[s.Conn()] = e
	return nil
}

func (ids *idService) handleIdentifyResponse(s network.Stream, isPush bool) error {
	if err := s.Scope().SetService(ServiceName); err != nil {
		log.Warnf("error attaching stream to identify service: %s", err)
		s.Reset()
		return err
	}

	if err := s.Scope().ReserveMemory(signedIDSize, network.ReservationPriorityAlways); err != nil {
		log.Warnf("error reserving memory for identify stream: %s", err)
		s.Reset()
		return err
	}
	defer s.Scope().ReleaseMemory(signedIDSize)

	c := s.Conn()

	r := pbio.NewDelimitedReader(s, signedIDSize)
	mes := &pb.Identify{}

	if err := readAllIDMessages(r, mes); err != nil {
		log.Warn("error reading identify message: ", err)
		s.Reset()
		return err
	}

	defer s.Close()

	log.Debugf("%s received message from %s %s", s.Protocol(), c.RemotePeer(), c.RemoteMultiaddr())

	ids.consumeMessage(mes, c, isPush)

	if ids.metricsTracer != nil {
		ids.metricsTracer.IdentifyReceived(isPush, len(mes.Protocols), len(mes.ListenAddrs))
	}

	ids.connsMu.Lock()
	defer ids.connsMu.Unlock()
	e, ok := ids.conns[c]
	if !ok { // might already have disconnected
		return nil
	}
	sup, err := ids.Host.Peerstore().SupportsProtocols(c.RemotePeer(), IDPush)
	if supportsIdentifyPush := err == nil && len(sup) > 0; supportsIdentifyPush {
		e.PushSupport = identifyPushSupported
	} else {
		e.PushSupport = identifyPushUnsupported
	}

	if ids.metricsTracer != nil {
		ids.metricsTracer.ConnPushSupport(e.PushSupport)
	}

	ids.conns[c] = e
	return nil
}

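// readAllIDMessages reads up to maxMessages length-delimited protobuf messages
// from r and merges them into finalMsg. A large identify message may arrive
// split into multiple chunks (see writeChunkedIdentifyMsg for the sending side).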
func readAllIDMessages(r pbio.Reader, finalMsg proto.Message) error {
	mes := &pb.Identify{}
	for i := 0; i < maxMessages; i++ {
		switch err := r.ReadMsg(mes); err {
		case io.EOF:
			return nil
		case nil:
			proto.Merge(finalMsg, mes)
		default:
			return err
		}
	}

	return fmt.Errorf("too many parts")
}

func (ids *idService) updateSnapshot() (updated bool) {
	protos := ids.Host.Mux().Protocols()
	slices.Sort(protos)

	addrs := ids.Host.Addrs()
	slices.SortFunc(addrs, func(a, b ma.Multiaddr) int { return bytes.Compare(a.Bytes(), b.Bytes()) })

	usedSpace := len(ids.ProtocolVersion) + len(ids.UserAgent)
	for i := 0; i < len(protos); i++ {
		usedSpace += len(protos[i])
	}
	addrs = trimHostAddrList(addrs, maxOwnIdentifyMsgSize-usedSpace-256) // 256 bytes of buffer

	snapshot := identifySnapshot{
		addrs:     addrs,
		protocols: protos,
	}

	if !ids.disableSignedPeerRecord {
		if cab, ok := peerstore.GetCertifiedAddrBook(ids.Host.Peerstore()); ok {
			snapshot.record = cab.GetPeerRecord(ids.Host.ID())
		}
	}

	ids.currentSnapshot.Lock()
	defer ids.currentSnapshot.Unlock()

	if ids.currentSnapshot.snapshot.Equal(&snapshot) {
		return false
	}

	snapshot.seq = ids.currentSnapshot.snapshot.seq + 1
	ids.currentSnapshot.snapshot = snapshot

	log.Debugw("updating snapshot", "seq", snapshot.seq, "addrs", snapshot.addrs)
	return true
}

func (ids *idService) writeChunkedIdentifyMsg(s network.Stream, mes *pb.Identify) error {
	writer := pbio.NewDelimitedWriter(s)

	if mes.SignedPeerRecord == nil || proto.Size(mes) <= legacyIDSize {
		return writer.WriteMsg(mes)
	}

	sr := mes.SignedPeerRecord
	mes.SignedPeerRecord = nil
	if err := writer.WriteMsg(mes); err != nil {
		return err
	}
	// then write just the signed record
	return writer.WriteMsg(&pb.Identify{SignedPeerRecord: sr})
}

func (ids *idService) createBaseIdentifyResponse(conn network.Conn, snapshot *identifySnapshot) *pb.Identify {
	mes := &pb.Identify{}

	remoteAddr := conn.RemoteMultiaddr()
	localAddr := conn.LocalMultiaddr()

	// set protocols this node is currently handling
	mes.Protocols = protocol.ConvertToStrings(snapshot.protocols)

	// observed address so other side is informed of their
	// "public" address, at least in relation to us.
	mes.ObservedAddr = remoteAddr.Bytes()

	// populate unsigned addresses.
	// peers that do not yet support signed addresses will need this.
	// Note: LocalMultiaddr is sometimes 0.0.0.0
	viaLoopback := manet.IsIPLoopback(localAddr) || manet.IsIPLoopback(remoteAddr)
	mes.ListenAddrs = make([][]byte, 0, len(snapshot.addrs))
	for _, addr := range snapshot.addrs {
		if !viaLoopback && manet.IsIPLoopback(addr) {
			continue
		}
		mes.ListenAddrs = append(mes.ListenAddrs, addr.Bytes())
	}
	// set our public key
	ownKey := ids.Host.Peerstore().PubKey(ids.Host.ID())

	// check if we even have a public key.
	if ownKey == nil {
		// public key is nil. We are either using insecure transport or something erratic happened.
		// check if we're even operating in "secure mode"
		if ids.Host.Peerstore().PrivKey(ids.Host.ID()) != nil {
			// private key is present. But NO public key. Something bad happened.
			log.Errorf("did not have own public key in Peerstore")
		}
		// if neither key is present it is safe to assume that we are using an insecure transport.
	} else {
		// public key is present. Safe to proceed.
		if kb, err := crypto.MarshalPublicKey(ownKey); err != nil {
			log.Errorf("failed to convert key to bytes")
		} else {
			mes.PublicKey = kb
		}
	}

	// set protocol versions
	mes.ProtocolVersion = &ids.ProtocolVersion
	mes.AgentVersion = &ids.UserAgent

	return mes
}

func (ids *idService) getSignedRecord(snapshot *identifySnapshot) []byte {
	if ids.disableSignedPeerRecord || snapshot.record == nil {
		return nil
	}

	recBytes, err := snapshot.record.Marshal()
	if err != nil {
		log.Errorw("failed to marshal signed record", "err", err)
		return nil
	}

	return recBytes
}

// diff takes two slices of protocol IDs (a and b) and computes which elements were added and removed in b relative to a.
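//
// For example (with placeholder protocol IDs a, b, c):
// diff([a b], [b c]) yields added=[c], removed=[a].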
func diff(a, b []protocol.ID) (added, removed []protocol.ID) {
	// This is O(n^2), but it's fine because the slices are small.
	for _, x := range b {
		var found bool
		for _, y := range a {
			if x == y {
				found = true
				break
			}
		}
		if !found {
			added = append(added, x)
		}
	}
	for _, x := range a {
		var found bool
		for _, y := range b {
			if x == y {
				found = true
				break
			}
		}
		if !found {
			removed = append(removed, x)
		}
	}
	return
}

func (ids *idService) consumeMessage(mes *pb.Identify, c network.Conn, isPush bool) {
	p := c.RemotePeer()

	supported, _ := ids.Host.Peerstore().GetProtocols(p)
	mesProtocols := protocol.ConvertFromStrings(mes.Protocols)
	added, removed := diff(supported, mesProtocols)
	ids.Host.Peerstore().SetProtocols(p, mesProtocols...)
	if isPush {
		ids.emitters.evtPeerProtocolsUpdated.Emit(event.EvtPeerProtocolsUpdated{
			Peer:    p,
			Added:   added,
			Removed: removed,
		})
	}

	obsAddr, err := ma.NewMultiaddrBytes(mes.GetObservedAddr())
	if err != nil {
		log.Debugf("error parsing received observed addr for %s: %s", c, err)
		obsAddr = nil
	}

	if obsAddr != nil && !ids.disableObservedAddrManager {
		// TODO refactor this to use the emitted events instead of having this func call explicitly.
		ids.observedAddrMgr.Record(c, obsAddr)
	}

	// mes.ListenAddrs
	laddrs := mes.GetListenAddrs()
	lmaddrs := make([]ma.Multiaddr, 0, len(laddrs))
	for _, addr := range laddrs {
		maddr, err := ma.NewMultiaddrBytes(addr)
		if err != nil {
			log.Debugf("%s failed to parse multiaddr from %s %s", ID,
				p, c.RemoteMultiaddr())
			continue
		}
		lmaddrs = append(lmaddrs, maddr)
	}

	// NOTE: Do not add `c.RemoteMultiaddr()` to the peerstore if the remote
	// peer doesn't tell us to do so. Otherwise, we'll advertise it.
	//
	// This can cause an "addr-splosion" issue where the network will slowly
	// gossip and collect observed but unadvertised addresses. Given a NAT
	// that picks random source ports, this can cause DHT nodes to collect
	// many undialable addresses for other peers.

	// add certified addresses for the peer, if they sent us a signed peer record
	// otherwise use the unsigned addresses.
	signedPeerRecord, err := signedPeerRecordFromMessage(mes)
	if err != nil {
		log.Debugf("error getting peer record from Identify message: %v", err)
	}

	// Extend the TTLs on the known (probably) good addresses.
	// Taking the lock ensures that we don't concurrently process a disconnect.
	ids.addrMu.Lock()
	ttl := peerstore.RecentlyConnectedAddrTTL
	switch ids.Host.Network().Connectedness(p) {
	case network.Limited, network.Connected:
		ttl = peerstore.ConnectedAddrTTL
	}

	// Downgrade connected and recently connected addrs to a temporary TTL.
	for _, ttl := range []time.Duration{
		peerstore.RecentlyConnectedAddrTTL,
		peerstore.ConnectedAddrTTL,
	} {
		ids.Host.Peerstore().UpdateAddrs(p, ttl, peerstore.TempAddrTTL)
	}

	var addrs []ma.Multiaddr
	if signedPeerRecord != nil {
		signedAddrs, err := ids.consumeSignedPeerRecord(c.RemotePeer(), signedPeerRecord)
		if err != nil {
			log.Debugf("failed to consume signed peer record: %s", err)
			signedPeerRecord = nil
		} else {
			addrs = signedAddrs
		}
	} else {
		addrs = lmaddrs
	}
	addrs = filterAddrs(addrs, c.RemoteMultiaddr())
	if len(addrs) > connectedPeerMaxAddrs {
		addrs = addrs[:connectedPeerMaxAddrs]
	}

	ids.Host.Peerstore().AddAddrs(p, addrs, ttl)

	// Finally, expire all temporary addrs.
	ids.Host.Peerstore().UpdateAddrs(p, peerstore.TempAddrTTL, 0)
	ids.addrMu.Unlock()

	log.Debugf("%s received listen addrs for %s: %s", c.LocalPeer(), c.RemotePeer(), lmaddrs)

	// get protocol versions
	pv := mes.GetProtocolVersion()
	av := mes.GetAgentVersion()

	ids.Host.Peerstore().Put(p, "ProtocolVersion", pv)
	ids.Host.Peerstore().Put(p, "AgentVersion", av)

	// get the key from the other side. we may not have it (no-auth transport)
	ids.consumeReceivedPubKey(c, mes.PublicKey)

	ids.emitters.evtPeerIdentificationCompleted.Emit(event.EvtPeerIdentificationCompleted{
		Peer:             c.RemotePeer(),
		Conn:             c,
		ListenAddrs:      lmaddrs,
		Protocols:        mesProtocols,
		SignedPeerRecord: signedPeerRecord,
		ObservedAddr:     obsAddr,
		ProtocolVersion:  pv,
		AgentVersion:     av,
	})
}

func (ids *idService) consumeSignedPeerRecord(p peer.ID, signedPeerRecord *record.Envelope) ([]ma.Multiaddr, error) {
	if signedPeerRecord.PublicKey == nil {
		return nil, errors.New("missing pubkey")
	}
	id, err := peer.IDFromPublicKey(signedPeerRecord.PublicKey)
	if err != nil {
		return nil, fmt.Errorf("failed to derive peer ID: %s", err)
	}
	if id != p {
		return nil, fmt.Errorf("received signed peer record envelope for unexpected peer ID. expected %s, got %s", p, id)
	}
	r, err := signedPeerRecord.Record()
	if err != nil {
		return nil, fmt.Errorf("failed to obtain record: %w", err)
	}
	rec, ok := r.(*peer.PeerRecord)
	if !ok {
		return nil, errors.New("not a peer record")
	}
	if rec.PeerID != p {
		return nil, fmt.Errorf("received signed peer record for unexpected peer ID. expected %s, got %s", p, rec.PeerID)
	}
	// Don't put the signed peer record into the peer store.
	// They're not used anywhere.
	// All we care about are the addresses.
	return rec.Addrs, nil
}

func (ids *idService) consumeReceivedPubKey(c network.Conn, kb []byte) {
	lp := c.LocalPeer()
	rp := c.RemotePeer()

	if kb == nil {
		log.Debugf("%s did not receive public key for remote peer: %s", lp, rp)
		return
	}

	newKey, err := crypto.UnmarshalPublicKey(kb)
	if err != nil {
		log.Warnf("%s cannot unmarshal key from remote peer: %s, %s", lp, rp, err)
		return
	}

	// verify key matches peer.ID
	np, err := peer.IDFromPublicKey(newKey)
	if err != nil {
		log.Debugf("%s cannot get peer.ID from key of remote peer: %s, %s", lp, rp, err)
		return
	}

	if np != rp {
		// if the newKey's peer.ID does not match known peer.ID...

		if rp == "" && np != "" {
			// if the remote peer ID is empty, then use the new, sent key.
			err := ids.Host.Peerstore().AddPubKey(rp, newKey)
			if err != nil {
				log.Debugf("%s could not add key for %s to peerstore: %s", lp, rp, err)
			}

		} else {
			// we have a known peer.ID and it does not match the sent key... error.
			log.Errorf("%s received key for remote peer %s mismatch: %s", lp, rp, np)
		}
		return
	}

	currKey := ids.Host.Peerstore().PubKey(rp)
	if currKey == nil {
		// no key? no auth transport. set this one.
		err := ids.Host.Peerstore().AddPubKey(rp, newKey)
		if err != nil {
			log.Debugf("%s could not add key for %s to peerstore: %s", lp, rp, err)
		}
		return
	}

	// ok, we have a local key, we should verify they match.
	if currKey.Equals(newKey) {
		return // ok great. we're done.
	}

	// weird, got a different key... but the different key MATCHES the peer.ID.
	// this is odd. let's log an error and investigate. this should basically never happen
	// and it means we have something funky going on and possibly a bug.
	log.Errorf("%s identify got a different key for: %s", lp, rp)

	// okay... does ours NOT match the remote peer.ID?
	cp, err := peer.IDFromPublicKey(currKey)
	if err != nil {
		log.Errorf("%s cannot get peer.ID from local key of remote peer: %s, %s", lp, rp, err)
		return
	}
	if cp != rp {
		log.Errorf("%s local key for remote peer %s yields different peer.ID: %s", lp, rp, cp)
		return
	}

	// okay... curr key DOES NOT match new key. both match peer.ID. wat?
	log.Errorf("%s local key and received key for %s do not match, but match peer.ID", lp, rp)
}

// HasConsistentTransport returns true if the address 'a' shares a
// protocol set with any address in the green set. This is used
// to check if a given address might be one of the addresses a peer is
// listening on.
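//
// For example (hypothetical addresses): /ip4/1.2.3.4/tcp/1 shares a protocol
// set with /ip4/5.6.7.8/tcp/2, but not with /ip4/5.6.7.8/udp/2/quic-v1.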
func HasConsistentTransport(a ma.Multiaddr, green []ma.Multiaddr) bool {
	protosMatch := func(a, b []ma.Protocol) bool {
		if len(a) != len(b) {
			return false
		}

		for i, p := range a {
			if b[i].Code != p.Code {
				return false
			}
		}
		return true
	}

	protos := a.Protocols()

	for _, ga := range green {
		if protosMatch(protos, ga.Protocols()) {
			return true
		}
	}

	return false
}

// addConnWithLock assumes the caller holds the connsMu lock.
func (ids *idService) addConnWithLock(c network.Conn) {
	_, found := ids.conns[c]
	if !found {
		<-ids.setupCompleted
		ids.conns[c] = entry{}
	}
}

func signedPeerRecordFromMessage(msg *pb.Identify) (*record.Envelope, error) {
	if len(msg.SignedPeerRecord) == 0 {
		return nil, nil
	}
	env, _, err := record.ConsumeEnvelope(msg.SignedPeerRecord, peer.PeerRecordEnvelopeDomain)
	return env, err
}

// netNotifiee defines methods to be used with the swarm
type netNotifiee idService

func (nn *netNotifiee) IDService() *idService {
	return (*idService)(nn)
}

func (nn *netNotifiee) Connected(_ network.Network, c network.Conn) {
	ids := nn.IDService()

	ids.connsMu.Lock()
	ids.addConnWithLock(c)
	ids.connsMu.Unlock()

	nn.IDService().IdentifyWait(c)
}

func (nn *netNotifiee) Disconnected(_ network.Network, c network.Conn) {
	ids := nn.IDService()

	// Stop tracking the connection.
	ids.connsMu.Lock()
	delete(ids.conns, c)
	ids.connsMu.Unlock()

	if !ids.disableObservedAddrManager {
		ids.observedAddrMgr.removeConn(c)
	}

	// Last disconnect.
	// Undo the setting of addresses to peerstore.ConnectedAddrTTL we did.
	ids.addrMu.Lock()
	defer ids.addrMu.Unlock()

	// This check MUST happen after acquiring the Lock as identify on a different connection
	// might be trying to add addresses.
	switch ids.Host.Network().Connectedness(c.RemotePeer()) {
	case network.Connected, network.Limited:
		return
	}
	// peerstore returns the elements in a random order as it uses a map to store the addresses
	addrs := ids.Host.Peerstore().Addrs(c.RemotePeer())
	n := len(addrs)
	if n > recentlyConnectedPeerMaxAddrs {
		// We want to always save the address we are connected to
		for i, addr := range addrs {
			if addr.Equal(c.RemoteMultiaddr()) {
				addrs[i], addrs[0] = addrs[0], addrs[i]
			}
		}
		n = recentlyConnectedPeerMaxAddrs
	}
	ids.Host.Peerstore().UpdateAddrs(c.RemotePeer(), peerstore.ConnectedAddrTTL, peerstore.TempAddrTTL)
	ids.Host.Peerstore().AddAddrs(c.RemotePeer(), addrs[:n], peerstore.RecentlyConnectedAddrTTL)
	ids.Host.Peerstore().UpdateAddrs(c.RemotePeer(), peerstore.TempAddrTTL, 0)
}

func (nn *netNotifiee) Listen(_ network.Network, _ ma.Multiaddr)      {}
func (nn *netNotifiee) ListenClose(_ network.Network, _ ma.Multiaddr) {}

// filterAddrs filters the address slice based on the remote multiaddr:
//   - if it's a localhost address, no filtering is applied
//   - if it's a private network address, all localhost addresses are filtered out
//   - if it's a public address, all non-public addresses are filtered out
//   - if none of the above, (e.g. discard prefix), no filtering is applied.
//     We can't do anything meaningful here so we do nothing.
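//
// For example, if the remote peer dialed us from a public IP address, only the
// public addresses in addrs are kept.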
func filterAddrs(addrs []ma.Multiaddr, remote ma.Multiaddr) []ma.Multiaddr {
	switch {
	case manet.IsIPLoopback(remote):
		return addrs
	case manet.IsPrivateAddr(remote):
		return ma.FilterAddrs(addrs, func(a ma.Multiaddr) bool { return !manet.IsIPLoopback(a) })
	case manet.IsPublicAddr(remote):
		return ma.FilterAddrs(addrs, manet.IsPublicAddr)
	default:
		return addrs
	}
}

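// trimHostAddrList returns a prefix of addrs whose total encoded size fits in
// maxSize bytes, preferring public addresses and higher-priority transports
// (QUIC v1 over TCP over WSS over WebTransport over WebRTC Direct).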
func trimHostAddrList(addrs []ma.Multiaddr, maxSize int) []ma.Multiaddr {
	totalSize := 0
	for _, a := range addrs {
		totalSize += len(a.Bytes())
	}
	if totalSize <= maxSize {
		return addrs
	}

	score := func(addr ma.Multiaddr) int {
		var res int
		if manet.IsPublicAddr(addr) {
			res |= 1 << 12
		} else if !manet.IsIPLoopback(addr) {
			res |= 1 << 11
		}
		var protocolWeight int
		ma.ForEach(addr, func(c ma.Component) bool {
			switch c.Protocol().Code {
			case ma.P_QUIC_V1:
				protocolWeight = 5
			case ma.P_TCP:
				protocolWeight = 4
			case ma.P_WSS:
				protocolWeight = 3
			case ma.P_WEBTRANSPORT:
				protocolWeight = 2
			case ma.P_WEBRTC_DIRECT:
				protocolWeight = 1
			case ma.P_P2P:
				return false
			}
			return true
		})
		res |= 1 << protocolWeight
		return res
	}

	slices.SortStableFunc(addrs, func(a, b ma.Multiaddr) int {
		return score(b) - score(a) // b-a for reverse order
	})
	totalSize = 0
	for i, a := range addrs {
		totalSize += len(a.Bytes())
		if totalSize > maxSize {
			addrs = addrs[:i]
			break
		}
	}
	return addrs
}