package pubsub

import (
	

	
	
	
	
)

func ( *PubSub) ( context.Context) {
	// We don't bother subscribing to "connectivity" events because we always run identify after
	// every new connection.
	,  := .host.EventBus().Subscribe([]interface{}{
		&event.EvtPeerIdentificationCompleted{},
		&event.EvtPeerProtocolsUpdated{},
	})
	if  != nil {
		log.Errorf("failed to subscribe to peer identification events: %v", )
		return
	}
	defer .Close()

	.newPeersPrioLk.RLock()
	.newPeersMx.Lock()
	for ,  := range .host.Network().Peers() {
		if .host.Network().Connectedness() != network.Connected {
			continue
		}
		.newPeersPend[] = struct{}{}
	}
	.newPeersMx.Unlock()
	.newPeersPrioLk.RUnlock()

	select {
	case .newPeers <- struct{}{}:
	default:
	}

	var  func(protocol.ID) bool
	if .protoMatchFunc != nil {
		var  []func(protocol.ID) bool
		for ,  := range .rt.Protocols() {

			 = append(, .protoMatchFunc())
		}
		 = func( protocol.ID) bool {
			for ,  := range  {
				if ()() {
					return true
				}
			}
			return false
		}
	} else {
		 := make(map[protocol.ID]struct{})
		for ,  := range .rt.Protocols() {
			[] = struct{}{}
		}
		 = func( protocol.ID) bool {
			,  := []
			return 
		}
	}

	for .Err() == nil {
		var  any
		select {
		case <-.Done():
			return
		case  = <-.Out():
		}

		var  []protocol.ID
		var  peer.ID
		switch ev := .(type) {
		case event.EvtPeerIdentificationCompleted:
			 = .Peer
			 = .Protocols
		case event.EvtPeerProtocolsUpdated:
			 = .Peer
			 = .Added
		default:
			continue
		}

		// We don't bother checking connectivity (connected and non-"limited") here because
		// we'll check when actually handling the new peer.

		for ,  := range  {
			if () {
				.notifyNewPeer()
				break
			}
		}
	}

}

func ( *PubSub) ( peer.ID) {
	.newPeersPrioLk.RLock()
	.newPeersMx.Lock()
	.newPeersPend[] = struct{}{}
	.newPeersMx.Unlock()
	.newPeersPrioLk.RUnlock()

	select {
	case .newPeers <- struct{}{}:
	default:
	}
}