package pubsub

import (
	"context"
	"crypto/sha256"
	"fmt"
	"io"
	"math/rand"
	"sort"
	"time"

	pb "github.com/libp2p/go-libp2p-pubsub/pb"

	"github.com/libp2p/go-libp2p/core/event"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/core/peerstore"
	"github.com/libp2p/go-libp2p/core/protocol"
	"github.com/libp2p/go-libp2p/core/record"
	"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"

	"go.uber.org/zap/zapcore"
)

const (
	// GossipSubID_v10 is the protocol ID for version 1.0.0 of the GossipSub protocol.
	// It is advertised along with GossipSubID_v11 and GossipSubID_v12 for backwards compatibility.
	GossipSubID_v10 = protocol.ID("/meshsub/1.0.0")

	// GossipSubID_v11 is the protocol ID for version 1.1.0 of the GossipSub protocol.
	// It is advertised along with GossipSubID_v12 for backwards compatibility.
	// See the spec for details about how v1.1.0 compares to v1.0.0:
	// https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md
	GossipSubID_v11 = protocol.ID("/meshsub/1.1.0")

	// GossipSubID_v12 is the protocol ID for version 1.2.0 of the GossipSub protocol.
	// See the spec for details about how v1.2.0 compares to v1.1.0:
	// https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.2.md
	GossipSubID_v12 = protocol.ID("/meshsub/1.2.0")
)

// Defines the default gossipsub parameters.
var (
	GossipSubD                                = 6
	GossipSubDlo                              = 5
	GossipSubDhi                              = 12
	GossipSubDscore                           = 4
	GossipSubDout                             = 2
	GossipSubHistoryLength                    = 5
	GossipSubHistoryGossip                    = 3
	GossipSubDlazy                            = 6
	GossipSubGossipFactor                     = 0.25
	GossipSubGossipRetransmission             = 3
	GossipSubHeartbeatInitialDelay            = 100 * time.Millisecond
	GossipSubHeartbeatInterval                = 1 * time.Second
	GossipSubFanoutTTL                        = 60 * time.Second
	GossipSubPrunePeers                       = 16
	GossipSubPruneBackoff                     = time.Minute
	GossipSubUnsubscribeBackoff               = 10 * time.Second
	GossipSubConnectors                       = 8
	GossipSubMaxPendingConnections            = 128
	GossipSubConnectionTimeout                = 30 * time.Second
	GossipSubDirectConnectTicks        uint64 = 300
	GossipSubDirectConnectInitialDelay        = time.Second
	GossipSubOpportunisticGraftTicks   uint64 = 60
	GossipSubOpportunisticGraftPeers          = 2
	GossipSubGraftFloodThreshold              = 10 * time.Second
	GossipSubMaxIHaveLength                   = 5000
	GossipSubMaxIHaveMessages                 = 10
	GossipSubMaxIDontWantLength               = 10
	GossipSubMaxIDontWantMessages             = 1000
	GossipSubIWantFollowupTime                = 3 * time.Second
	GossipSubIDontWantMessageThreshold        = 1024 // 1KB
	GossipSubIDontWantMessageTTL              = 3    // 3 heartbeats
)

type checksum struct {
	payload [32]byte
	length  uint8
}

// GossipSubParams defines all the gossipsub specific parameters.
type GossipSubParams struct {
	// overlay parameters.

	// D sets the optimal degree for a GossipSub topic mesh. For example, if D == 6,
	// each peer will want to have about six peers in their mesh for each topic they're subscribed to.
	// D should be set somewhere between Dlo and Dhi.
	D int

	// Dlo sets the lower bound on the number of peers we keep in a GossipSub topic mesh.
	// If we have fewer than Dlo peers, we will attempt to graft some more into the mesh at
	// the next heartbeat.
	Dlo int

	// Dhi sets the upper bound on the number of peers we keep in a GossipSub topic mesh.
	// If we have more than Dhi peers, we will select some to prune from the mesh at the next heartbeat.
	Dhi int

	// Dscore affects how peers are selected when pruning a mesh due to over subscription.
	// At least Dscore of the retained peers will be high-scoring, while the remainder are
	// chosen randomly.
	Dscore int

	// Dout sets the quota for the number of outbound connections to maintain in a topic mesh.
	// When the mesh is pruned due to over subscription, we make sure that we have outbound connections
	// to at least Dout of the survivor peers. This prevents sybil attackers from overwhelming
	// our mesh with incoming connections.
	//
	// Dout must be set below Dlo, and must not exceed D / 2.
	Dout int

	// gossip parameters

	// HistoryLength controls the size of the message cache used for gossip.
	// The message cache will remember messages for HistoryLength heartbeats.
	HistoryLength int

	// HistoryGossip controls how many cached message ids we will advertise in
	// IHAVE gossip messages. When asked for our seen message IDs, we will return
	// only those from the most recent HistoryGossip heartbeats. The slack between
	// HistoryGossip and HistoryLength allows us to avoid advertising messages
	// that will be expired by the time they're requested.
	//
	// HistoryGossip must be less than or equal to HistoryLength to
	// avoid a runtime panic.
	HistoryGossip int

	// Dlazy affects how many peers we will emit gossip to at each heartbeat.
	// We will send gossip to at least Dlazy peers outside our mesh. The actual
	// number may be more, depending on GossipFactor and how many peers we're
	// connected to.
	Dlazy int

	// GossipFactor affects how many peers we will emit gossip to at each heartbeat.
	// We will send gossip to GossipFactor * (total number of non-mesh peers), or
	// Dlazy, whichever is greater.
	GossipFactor float64

	// GossipRetransmission controls how many times we will allow a peer to request
	// the same message id through IWANT gossip before we start ignoring them. This is designed
	// to prevent peers from spamming us with requests and wasting our resources.
	GossipRetransmission int

	// heartbeat interval

	// HeartbeatInitialDelay is the short delay before the heartbeat timer begins
	// after the router is initialized.
	HeartbeatInitialDelay time.Duration

	// HeartbeatInterval controls the time between heartbeats.
	HeartbeatInterval time.Duration

	// SlowHeartbeatWarning is the duration threshold for heartbeat processing before emitting
	// a warning; this would be indicative of an overloaded peer.
	SlowHeartbeatWarning float64

	// FanoutTTL controls how long we keep track of the fanout state. If it's been
	// FanoutTTL since we've published to a topic that we're not subscribed to,
	// we'll delete the fanout map for that topic.
	FanoutTTL time.Duration

	// PrunePeers controls the number of peers to include in prune Peer eXchange.
	// When we prune a peer that's eligible for PX (has a good score, etc), we will try to
	// send them signed peer records for up to PrunePeers other peers that we
	// know of.
	PrunePeers int

	// PruneBackoff controls the backoff time for pruned peers. This is how long
	// a peer must wait before attempting to graft into our mesh again after being pruned.
	// When pruning a peer, we send them our value of PruneBackoff so they know
	// the minimum time to wait. Peers running older versions may not send a backoff time,
	// so if we receive a prune message without one, we will wait at least PruneBackoff
	// before attempting to re-graft.
	PruneBackoff time.Duration

	// UnsubscribeBackoff controls the backoff time to use when unsubscribing
	// from a topic. A peer should not resubscribe to this topic before this
	// duration.
	UnsubscribeBackoff time.Duration

	// Connectors controls the number of active connection attempts for peers obtained through PX.
	Connectors int

	// MaxPendingConnections sets the maximum number of pending connections for peers attempted through px.
	MaxPendingConnections int

	// ConnectionTimeout controls the timeout for connection attempts.
	ConnectionTimeout time.Duration

	// DirectConnectTicks is the number of heartbeat ticks for attempting to reconnect direct peers
	// that are not currently connected.
	DirectConnectTicks uint64

	// DirectConnectInitialDelay is the initial delay before opening connections to direct peers
	DirectConnectInitialDelay time.Duration

	// OpportunisticGraftTicks is the number of heartbeat ticks for attempting to improve the mesh
	// with opportunistic grafting. Every OpportunisticGraftTicks we will attempt to select some
	// high-scoring mesh peers to replace lower-scoring ones, if the median score of our mesh peers falls
	// below a threshold (see https://godoc.org/github.com/libp2p/go-libp2p-pubsub#PeerScoreThresholds).
	OpportunisticGraftTicks uint64

	// OpportunisticGraftPeers is the number of peers to opportunistically graft.
	OpportunisticGraftPeers int

	// If a GRAFT comes before GraftFloodThreshold has elapsed since the last PRUNE,
	// then there is an extra score penalty applied to the peer through P7.
	GraftFloodThreshold time.Duration

	// MaxIHaveLength is the maximum number of messages to include in an IHAVE message.
	// Also controls the maximum number of IHAVE ids we will accept and request with IWANT from a
	// peer within a heartbeat, to protect from IHAVE floods. You should adjust this value from the
	// default if your system is pushing more than 5000 messages in HistoryGossip heartbeats;
	// with the defaults this is 1666 messages/s.
	MaxIHaveLength int

	// MaxIHaveMessages is the maximum number of IHAVE messages to accept from a peer within a heartbeat.
	MaxIHaveMessages int

	// MaxIDontWantLength is the maximum number of messages to include in an IDONTWANT message. Also controls
	// the maximum number of IDONTWANT ids we will accept to protect against IDONTWANT floods. This value
	// should be adjusted if your system anticipates a larger amount than specified per heartbeat.
	MaxIDontWantLength int
	// MaxIDontWantMessages is the maximum number of IDONTWANT messages to accept from a peer within a heartbeat.
	MaxIDontWantMessages int

	// Time to wait for a message requested through IWANT following an IHAVE advertisement.
	// If the message is not received within this window, a broken promise is declared and
	// the router may apply behavioural penalties.
	IWantFollowupTime time.Duration

	// IDONTWANT is only sent for messages larger than this threshold. The threshold should be greater
	// than D_high * the size of a message id. Otherwise, an attacker can mount an amplification attack
	// by sending small messages while the receiver replies with larger IDONTWANT messages.
	IDontWantMessageThreshold int

	// IDONTWANT is cleared when it's older than the TTL.
	IDontWantMessageTTL int
}

// NewGossipSub returns a new PubSub object using the default GossipSubRouter as the router.
func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) {
	rt := DefaultGossipSubRouter(h)
	opts = append(opts, WithRawTracer(rt.tagTracer))
	return NewGossipSubWithRouter(ctx, h, rt, opts...)
}
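
// The following sketch is illustrative only and not part of the library API.
// It shows the typical call sequence for standing up a GossipSub instance and
// reading a single message from a topic: the host h is assumed to be
// constructed by the caller (e.g. with libp2p.New), and the topic name is a
// placeholder.
func newGossipSubExample(ctx context.Context, h host.Host) (*Message, error) {
	ps, err := NewGossipSub(ctx, h)
	if err != nil {
		return nil, err
	}
	topic, err := ps.Join("example-topic")
	if err != nil {
		return nil, err
	}
	sub, err := topic.Subscribe()
	if err != nil {
		return nil, err
	}
	// Next blocks until a message arrives on the topic or ctx is cancelled.
	return sub.Next(ctx)
}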

// NewGossipSubWithRouter returns a new PubSub object using the given router.
func NewGossipSubWithRouter(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option) (*PubSub, error) {
	return NewPubSub(ctx, h, rt, opts...)
}

// DefaultGossipSubRouter returns a new GossipSubRouter with default parameters.
func DefaultGossipSubRouter(h host.Host) *GossipSubRouter {
	params := DefaultGossipSubParams()
	return &GossipSubRouter{
		peers:        make(map[peer.ID]protocol.ID),
		mesh:         make(map[string]map[peer.ID]struct{}),
		fanout:       make(map[string]map[peer.ID]struct{}),
		lastpub:      make(map[string]int64),
		gossip:       make(map[peer.ID][]*pb.ControlIHave),
		control:      make(map[peer.ID]*pb.ControlMessage),
		backoff:      make(map[string]map[peer.ID]time.Time),
		peerhave:     make(map[peer.ID]int),
		peerdontwant: make(map[peer.ID]int),
		unwanted:     make(map[peer.ID]map[checksum]int),
		iasked:       make(map[peer.ID]int),
		outbound:     make(map[peer.ID]bool),
		connect:      make(chan connectInfo, params.MaxPendingConnections),
		cab:          pstoremem.NewAddrBook(),
		mcache:       NewMessageCache(params.HistoryGossip, params.HistoryLength),
		protos:       GossipSubDefaultProtocols,
		feature:      GossipSubDefaultFeatures,
		tagTracer:    newTagTracer(h.ConnManager()),
		params:       params,
	}
}

// DefaultGossipSubParams returns the default gossip sub parameters
// as a config.
func DefaultGossipSubParams() GossipSubParams {
	return GossipSubParams{
		D:                         GossipSubD,
		Dlo:                       GossipSubDlo,
		Dhi:                       GossipSubDhi,
		Dscore:                    GossipSubDscore,
		Dout:                      GossipSubDout,
		HistoryLength:             GossipSubHistoryLength,
		HistoryGossip:             GossipSubHistoryGossip,
		Dlazy:                     GossipSubDlazy,
		GossipFactor:              GossipSubGossipFactor,
		GossipRetransmission:      GossipSubGossipRetransmission,
		HeartbeatInitialDelay:     GossipSubHeartbeatInitialDelay,
		HeartbeatInterval:         GossipSubHeartbeatInterval,
		FanoutTTL:                 GossipSubFanoutTTL,
		PrunePeers:                GossipSubPrunePeers,
		PruneBackoff:              GossipSubPruneBackoff,
		UnsubscribeBackoff:        GossipSubUnsubscribeBackoff,
		Connectors:                GossipSubConnectors,
		MaxPendingConnections:     GossipSubMaxPendingConnections,
		ConnectionTimeout:         GossipSubConnectionTimeout,
		DirectConnectTicks:        GossipSubDirectConnectTicks,
		DirectConnectInitialDelay: GossipSubDirectConnectInitialDelay,
		OpportunisticGraftTicks:   GossipSubOpportunisticGraftTicks,
		OpportunisticGraftPeers:   GossipSubOpportunisticGraftPeers,
		GraftFloodThreshold:       GossipSubGraftFloodThreshold,
		MaxIHaveLength:            GossipSubMaxIHaveLength,
		MaxIHaveMessages:          GossipSubMaxIHaveMessages,
		MaxIDontWantLength:        GossipSubMaxIDontWantLength,
		MaxIDontWantMessages:      GossipSubMaxIDontWantMessages,
		IWantFollowupTime:         GossipSubIWantFollowupTime,
		IDontWantMessageThreshold: GossipSubIDontWantMessageThreshold,
		IDontWantMessageTTL:       GossipSubIDontWantMessageTTL,
		SlowHeartbeatWarning:      0.1,
	}
}

// WithPeerScore is a gossipsub router option that enables peer scoring.
func WithPeerScore(params *PeerScoreParams, thresholds *PeerScoreThresholds) Option {
	return func(ps *PubSub) error {
		gs, ok := ps.rt.(*GossipSubRouter)
		if !ok {
			return fmt.Errorf("pubsub router is not gossipsub")
		}

		// sanity check: validate the score parameters
		err := params.validate()
		if err != nil {
			return err
		}

		// sanity check: validate the threshold values
		err = thresholds.validate()
		if err != nil {
			return err
		}

		gs.score = newPeerScore(params)
		gs.gossipThreshold = thresholds.GossipThreshold
		gs.publishThreshold = thresholds.PublishThreshold
		gs.graylistThreshold = thresholds.GraylistThreshold
		gs.acceptPXThreshold = thresholds.AcceptPXThreshold
		gs.opportunisticGraftThreshold = thresholds.OpportunisticGraftThreshold

		gs.gossipTracer = newGossipTracer()

		// hook the tracer
		if ps.tracer != nil {
			ps.tracer.raw = append(ps.tracer.raw, gs.score, gs.gossipTracer)
		} else {
			ps.tracer = &pubsubTracer{
				raw:   []RawTracer{gs.score, gs.gossipTracer},
				pid:   ps.host.ID(),
				idGen: ps.idGen,
			}
		}

		return nil
	}
}
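
// The sketch below is illustrative only and not part of the library API. It
// shows how WithPeerScore is wired into NewGossipSub; the threshold values are
// arbitrary placeholders, and the *PeerScoreParams is assumed to be filled in
// by the caller for their topics (only the threshold field names used above
// are taken from this file).
func withPeerScoreExample(ctx context.Context, h host.Host, params *PeerScoreParams) (*PubSub, error) {
	thresholds := &PeerScoreThresholds{
		GossipThreshold:             -100,  // below this, gossip is neither emitted nor accepted
		PublishThreshold:            -500,  // below this, we don't publish to the peer
		GraylistThreshold:           -1000, // below this, RPCs are silently ignored
		AcceptPXThreshold:           10,    // PX is only accepted from peers scoring at least this
		OpportunisticGraftThreshold: 5,     // median mesh score below this triggers opportunistic grafting
	}
	return NewGossipSub(ctx, h, WithPeerScore(params, thresholds))
}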

// WithFloodPublish is a gossipsub router option that enables flood publishing.
// When this is enabled, published messages are forwarded to all peers whose
// score is greater than or equal to publishThreshold.
func WithFloodPublish(floodPublish bool) Option {
	return func(ps *PubSub) error {
		gs, ok := ps.rt.(*GossipSubRouter)
		if !ok {
			return fmt.Errorf("pubsub router is not gossipsub")
		}

		gs.floodPublish = floodPublish

		return nil
	}
}

// WithPeerExchange is a gossipsub router option that enables Peer eXchange on PRUNE.
// This should generally be enabled in bootstrappers and well connected/trusted nodes
// used for bootstrapping.
func WithPeerExchange(doPX bool) Option {
	return func(ps *PubSub) error {
		gs, ok := ps.rt.(*GossipSubRouter)
		if !ok {
			return fmt.Errorf("pubsub router is not gossipsub")
		}

		gs.doPX = doPX

		return nil
	}
}

// WithDirectPeers is a gossipsub router option that specifies peers with direct
// peering agreements. These peers are connected outside of the mesh, with all (valid)
// messages unconditionally forwarded to them. The router will maintain open connections
// to these peers. Note that the peering agreement should be reciprocal with direct peers
// symmetrically configured at both ends.
func WithDirectPeers(pis []peer.AddrInfo) Option {
	return func(ps *PubSub) error {
		gs, ok := ps.rt.(*GossipSubRouter)
		if !ok {
			return fmt.Errorf("pubsub router is not gossipsub")
		}

		direct := make(map[peer.ID]struct{})
		for _, pi := range pis {
			direct[pi.ID] = struct{}{}
			ps.host.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.PermanentAddrTTL)
		}

		gs.direct = direct

		if gs.tagTracer != nil {
			gs.tagTracer.direct = direct
		}

		return nil
	}
}
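
// Illustrative sketch, not part of the library API: configuring a node that has
// a reciprocal peering agreement with a set of partner nodes. The peer.AddrInfo
// values are assumed to come from the node's own configuration; the partner
// nodes must list this node in their own WithDirectPeers option.
func withDirectPeersExample(ctx context.Context, h host.Host, partners []peer.AddrInfo) (*PubSub, error) {
	return NewGossipSub(ctx, h, WithDirectPeers(partners))
}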

// WithDirectConnectTicks is a gossipsub router option that sets the number of
// heartbeat ticks between attempting to reconnect direct peers that are not
// currently connected. A "tick" is based on the heartbeat interval, which is
// 1s by default. The default value for direct connect ticks is 300.
func WithDirectConnectTicks(t uint64) Option {
	return func(ps *PubSub) error {
		gs, ok := ps.rt.(*GossipSubRouter)
		if !ok {
			return fmt.Errorf("pubsub router is not gossipsub")
		}
		gs.params.DirectConnectTicks = t
		return nil
	}
}

// WithGossipSubParams is a gossip sub router option that allows a custom
// config to be set when instantiating the gossipsub router.
func WithGossipSubParams(cfg GossipSubParams) Option {
	return func(ps *PubSub) error {
		gs, ok := ps.rt.(*GossipSubRouter)
		if !ok {
			return fmt.Errorf("pubsub router is not gossipsub")
		}
		// Overwrite current config and associated variables in the router.
		gs.params = cfg
		gs.connect = make(chan connectInfo, cfg.MaxPendingConnections)
		gs.mcache = NewMessageCache(cfg.HistoryGossip, cfg.HistoryLength)

		return nil
	}
}
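
// Illustrative sketch, not part of the library API: tuning the mesh degree by
// starting from the defaults and overriding selected fields before passing the
// config through WithGossipSubParams. The concrete values are placeholders;
// they must respect the documented constraints (Dlo <= D <= Dhi, Dout < Dlo,
// Dout <= D/2, HistoryGossip <= HistoryLength).
func withTunedParamsExample(ctx context.Context, h host.Host) (*PubSub, error) {
	params := DefaultGossipSubParams()
	params.D = 8
	params.Dlo = 6
	params.Dhi = 12
	params.Dout = 3 // stays below Dlo and does not exceed D/2
	return NewGossipSub(ctx, h, WithGossipSubParams(params))
}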

// GossipSubRouter is a router that implements the gossipsub protocol.
// For each topic we have joined, we maintain an overlay through which
// messages flow; this is the mesh map.
// For each topic we publish to without joining, we maintain a list of peers
// to use for injecting our messages in the overlay with stable routes; this
// is the fanout map. Fanout peer lists are expired if we don't publish any
// messages to their topic for GossipSubFanoutTTL.
type GossipSubRouter struct {
	p            *PubSub
	peers        map[peer.ID]protocol.ID          // peer protocols
	direct       map[peer.ID]struct{}             // direct peers
	mesh         map[string]map[peer.ID]struct{}  // topic meshes
	fanout       map[string]map[peer.ID]struct{}  // topic fanout
	lastpub      map[string]int64                 // last publish time for fanout topics
	gossip       map[peer.ID][]*pb.ControlIHave   // pending gossip
	control      map[peer.ID]*pb.ControlMessage   // pending control messages
	peerhave     map[peer.ID]int                  // number of IHAVEs received from peer in the last heartbeat
	peerdontwant map[peer.ID]int                  // number of IDONTWANTs received from peer in the last heartbeat
	unwanted     map[peer.ID]map[checksum]int     // TTL of the message ids peers don't want
	iasked       map[peer.ID]int                  // number of messages we have asked from peer in the last heartbeat
	outbound     map[peer.ID]bool                 // connection direction cache, marks peers with outbound connections
	backoff      map[string]map[peer.ID]time.Time // prune backoff
	connect      chan connectInfo                 // px connection requests
	cab          peerstore.AddrBook

	protos  []protocol.ID
	feature GossipSubFeatureTest

	mcache       *MessageCache
	tracer       *pubsubTracer
	score        *peerScore
	gossipTracer *gossipTracer
	tagTracer    *tagTracer
	gate         *peerGater

	// config for gossipsub parameters
	params GossipSubParams

	// whether PX is enabled; this should be enabled in bootstrappers and other well connected/trusted
	// nodes.
	doPX bool

	// threshold for accepting PX from a peer; this should be positive and limited to scores
	// attainable by bootstrappers and trusted nodes
	acceptPXThreshold float64

	// threshold for peer score to emit/accept gossip
	// If the peer score is below this threshold, we won't emit or accept gossip from the peer.
	// When there is no score, this value is 0.
	gossipThreshold float64

	// flood publish score threshold; we only publish to peers with score >= to the threshold
	// when using flood publishing or the peer is a fanout or floodsub peer.
	publishThreshold float64

	// threshold for peer score before we graylist the peer and silently ignore its RPCs
	graylistThreshold float64

	// threshold for median peer score before triggering opportunistic grafting
	opportunisticGraftThreshold float64

	// whether to use flood publishing
	floodPublish bool

	// number of heartbeats since the beginning of time; this allows us to amortize some resource
	// clean up -- eg backoff clean up.
	heartbeatTicks uint64
}

type connectInfo struct {
	p   peer.ID
	spr *record.Envelope
}

func (gs *GossipSubRouter) Protocols() []protocol.ID {
	return gs.protos
}

func (gs *GossipSubRouter) Attach(p *PubSub) {
	gs.p = p
	gs.tracer = p.tracer

	// start the scoring
	gs.score.Start(gs)

	// and the gossip tracing
	gs.gossipTracer.Start(gs)

	// and the tracer for connmgr tags
	gs.tagTracer.Start(gs)

	// start using the same msg ID function as PubSub for caching messages.
	gs.mcache.SetMsgIdFn(p.idGen.ID)

	// start the heartbeat
	go gs.heartbeatTimer()

	// start the PX connectors
	for i := 0; i < gs.params.Connectors; i++ {
		go gs.connector()
	}

	// Manage our address book from events emitted by libp2p
	go gs.manageAddrBook()

	// connect to direct peers
	if len(gs.direct) > 0 {
		go func() {
			if gs.params.DirectConnectInitialDelay > 0 {
				time.Sleep(gs.params.DirectConnectInitialDelay)
			}
			for p := range gs.direct {
				gs.connect <- connectInfo{p: p}
			}
		}()
	}
}

func ( *GossipSubRouter) () {
	,  := .p.host.EventBus().Subscribe([]interface{}{
		&event.EvtPeerIdentificationCompleted{},
		&event.EvtPeerConnectednessChanged{},
	})
	if  != nil {
		log.Errorf("failed to subscribe to peer identification events: %v", )
		return
	}
	defer .Close()

	for {
		select {
		case <-.p.ctx.Done():
			,  := .cab.(io.Closer)
			if  {
				 := .Close()
				if  != nil {
					log.Warnf("failed to close addr book: %v", )
				}
			}
			return
		case  := <-.Out():
			switch ev := .(type) {
			case event.EvtPeerIdentificationCompleted:
				if .SignedPeerRecord != nil {
					,  := peerstore.GetCertifiedAddrBook(.cab)
					if  {
						 := peerstore.RecentlyConnectedAddrTTL
						if .p.host.Network().Connectedness(.Peer) == network.Connected {
							 = peerstore.ConnectedAddrTTL
						}
						,  := .ConsumePeerRecord(.SignedPeerRecord, )
						if  != nil {
							log.Warnf("failed to consume signed peer record: %v", )
						}
					}
				}
			case event.EvtPeerConnectednessChanged:
				if .Connectedness != network.Connected {
					.cab.UpdateAddrs(.Peer, peerstore.ConnectedAddrTTL, peerstore.RecentlyConnectedAddrTTL)
				}
			}
		}
	}
}

func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID) {
	log.Debugf("PEERUP: Add new peer %s using %s", p, proto)
	gs.tracer.AddPeer(p, proto)
	gs.peers[p] = proto

	// track the connection direction
	outbound := false
	conns := gs.p.host.Network().ConnsToPeer(p)
loop:
	for _, c := range conns {
		stat := c.Stat()

		if stat.Limited {
			continue
		}

		if stat.Direction == network.DirOutbound {
			// only count the connection if it has a pubsub stream
			for _, s := range c.GetStreams() {
				if s.Protocol() == proto {
					outbound = true
					break loop
				}
			}
		}
	}
	gs.outbound[p] = outbound
}

func (gs *GossipSubRouter) RemovePeer(p peer.ID) {
	log.Debugf("PEERDOWN: Remove disconnected peer %s", p)
	gs.tracer.RemovePeer(p)
	delete(gs.peers, p)
	for _, peers := range gs.mesh {
		delete(peers, p)
	}
	for _, peers := range gs.fanout {
		delete(peers, p)
	}
	delete(gs.gossip, p)
	delete(gs.control, p)
	delete(gs.outbound, p)
}

func (gs *GossipSubRouter) EnoughPeers(topic string, suggested int) bool {
	// check all peers in the topic
	tmap, ok := gs.p.topics[topic]
	if !ok {
		return false
	}

	fsPeers, gsPeers := 0, 0
	// floodsub peers
	for p := range tmap {
		if !gs.feature(GossipSubFeatureMesh, gs.peers[p]) {
			fsPeers++
		}
	}

	// gossipsub peers
	gsPeers = len(gs.mesh[topic])

	if suggested == 0 {
		suggested = gs.params.Dlo
	}

	if fsPeers+gsPeers >= suggested || gsPeers >= gs.params.Dhi {
		return true
	}

	return false
}

func (gs *GossipSubRouter) AcceptFrom(p peer.ID) AcceptStatus {
	_, direct := gs.direct[p]
	if direct {
		return AcceptAll
	}

	if gs.score.Score(p) < gs.graylistThreshold {
		return AcceptNone
	}

	return gs.gate.AcceptFrom(p)
}

// PreValidation sends the IDONTWANT control messages to all the mesh
// peers. They need to be sent right before the validation because they
// should be seen by the peers as soon as possible.
func (gs *GossipSubRouter) PreValidation(msgs []*Message) {
	tmids := make(map[string][]string)
	for _, msg := range msgs {
		if len(msg.GetData()) < gs.params.IDontWantMessageThreshold {
			continue
		}
		topic := msg.GetTopic()
		tmids[topic] = append(tmids[topic], gs.p.idGen.ID(msg))
	}
	for topic, mids := range tmids {
		if len(mids) == 0 {
			continue
		}
		// shuffle the messages got from the RPC envelope
		shuffleStrings(mids)
		// send IDONTWANT to all the mesh peers
		for p := range gs.mesh[topic] {
			// send to only peers that support IDONTWANT
			if gs.feature(GossipSubFeatureIdontwant, gs.peers[p]) {
				idontwant := []*pb.ControlIDontWant{{MessageIDs: mids}}
				out := rpcWithControl(nil, nil, nil, nil, nil, idontwant)
				gs.sendRPC(p, out, true)
			}
		}
	}
}

func (gs *GossipSubRouter) HandleRPC(rpc *RPC) {
	ctl := rpc.GetControl()
	if ctl == nil {
		return
	}

	iwant := gs.handleIHave(rpc.from, ctl)
	ihave := gs.handleIWant(rpc.from, ctl)
	prune := gs.handleGraft(rpc.from, ctl)
	gs.handlePrune(rpc.from, ctl)
	gs.handleIDontWant(rpc.from, ctl)

	if len(iwant) == 0 && len(ihave) == 0 && len(prune) == 0 {
		return
	}

	out := rpcWithControl(ihave, nil, iwant, nil, prune, nil)
	gs.sendRPC(rpc.from, out, false)
}

func ( *GossipSubRouter) ( peer.ID,  *pb.ControlMessage) []*pb.ControlIWant {
	// we ignore IHAVE gossip from any peer whose score is below the gossip threshold
	 := .score.Score()
	if  < .gossipThreshold {
		log.Debugf("IHAVE: ignoring peer %s with score below threshold [score = %f]", , )
		return nil
	}

	// IHAVE flood protection
	.peerhave[]++
	if .peerhave[] > .params.MaxIHaveMessages {
		log.Debugf("IHAVE: peer %s has advertised too many times (%d) within this heartbeat interval; ignoring", , .peerhave[])
		return nil
	}
	if .iasked[] >= .params.MaxIHaveLength {
		log.Debugf("IHAVE: peer %s has already advertised too many messages (%d); ignoring", , .iasked[])
		return nil
	}

	 := make(map[string]struct{})
	for ,  := range .GetIhave() {
		 := .GetTopicID()
		,  := .mesh[]
		if ! {
			continue
		}

		if !.p.peerFilter(, ) {
			continue
		}

	:
		for ,  := range .GetMessageIDs() {
			// prevent remote peer from sending too many msg_ids on a single IHAVE message
			if  >= .params.MaxIHaveLength {
				log.Debugf("IHAVE: peer %s has sent IHAVE on topic %s with too many messages (%d); ignoring remaining msgs", , , len(.MessageIDs))
				break 
			}

			if .p.seenMessage() {
				continue
			}
			[] = struct{}{}
		}
	}

	if len() == 0 {
		return nil
	}

	 := len()
	if +.iasked[] > .params.MaxIHaveLength {
		 = .params.MaxIHaveLength - .iasked[]
	}

	log.Debugf("IHAVE: Asking for %d out of %d messages from %s", , len(), )

	 := make([]string, 0, len())
	for  := range  {
		 = append(, )
	}

	// ask in random order
	shuffleStrings()

	// truncate to the messages we are actually asking for and update the iasked counter
	 = [:]
	.iasked[] += 

	.gossipTracer.AddPromise(, )

	return []*pb.ControlIWant{{MessageIDs: }}
}

func (gs *GossipSubRouter) handleIWant(p peer.ID, ctl *pb.ControlMessage) []*pb.Message {
	// we don't respond to IWANT requests from any peer whose score is below the gossip threshold
	score := gs.score.Score(p)
	if score < gs.gossipThreshold {
		log.Debugf("IWANT: ignoring peer %s with score below threshold [score = %f]", p, score)
		return nil
	}

	ihave := make(map[string]*pb.Message)
	for _, iwant := range ctl.GetIwant() {
		for _, mid := range iwant.GetMessageIDs() {
			// Check if that peer has sent IDONTWANT before, if so don't send them the message
			if _, ok := gs.unwanted[p][computeChecksum(mid)]; ok {
				continue
			}

			msg, count, ok := gs.mcache.GetForPeer(mid, p)
			if !ok {
				continue
			}

			if !gs.p.peerFilter(p, msg.GetTopic()) {
				continue
			}

			if count > gs.params.GossipRetransmission {
				log.Debugf("IWANT: Peer %s has asked for message %s too many times; ignoring request", p, mid)
				continue
			}

			ihave[mid] = msg.Message
		}
	}

	if len(ihave) == 0 {
		return nil
	}

	log.Debugf("IWANT: Sending %d messages to %s", len(ihave), p)

	msgs := make([]*pb.Message, 0, len(ihave))
	for _, msg := range ihave {
		msgs = append(msgs, msg)
	}

	return msgs
}

func ( *GossipSubRouter) ( peer.ID,  *pb.ControlMessage) []*pb.ControlPrune {
	var  []string

	 := .doPX
	 := .score.Score()
	 := time.Now()

	for ,  := range .GetGraft() {
		 := .GetTopicID()

		if !.p.peerFilter(, ) {
			continue
		}

		,  := .mesh[]
		if ! {
			// don't do PX when there is an unknown topic to avoid leaking our peers
			 = false
			// spam hardening: ignore GRAFTs for unknown topics
			continue
		}

		// check if it is already in the mesh; if so do nothing (we might have concurrent grafting)
		,  := []
		if  {
			continue
		}

		// we don't GRAFT to/from direct peers; complain loudly if this happens
		,  := .direct[]
		if  {
			log.Warnf("GRAFT: ignoring request from direct peer %s", )
			// this is possibly a bug from non-reciprocal configuration; send a PRUNE
			 = append(, )
			// but don't PX
			 = false
			continue
		}

		// make sure we are not backing off that peer
		,  := .backoff[][]
		if  && .Before() {
			log.Debugf("GRAFT: ignoring backed off peer %s", )
			// add behavioural penalty
			.score.AddPenalty(, 1)
			// no PX
			 = false
			// check the flood cutoff -- is the GRAFT coming too fast?
			 := .Add(.params.GraftFloodThreshold - .params.PruneBackoff)
			if .Before() {
				// extra penalty
				.score.AddPenalty(, 1)
			}
			// refresh the backoff
			.addBackoff(, , false)
			 = append(, )
			continue
		}

		// check the score
		if  < 0 {
			// we don't GRAFT peers with negative score
			log.Debugf("GRAFT: ignoring peer %s with negative score [score = %f, topic = %s]", , , )
			// we do send them PRUNE however, because it's a matter of protocol correctness
			 = append(, )
			// but we won't PX to them
			 = false
			// add/refresh backoff so that we don't reGRAFT too early even if the score decays back up
			.addBackoff(, , false)
			continue
		}

		// check the number of mesh peers; if it is at (or over) Dhi, we only accept grafts
		// from peers with outbound connections; this is a defensive check to restrict potential
		// mesh takeover attacks combined with love bombing
		if len() >= .params.Dhi && !.outbound[] {
			 = append(, )
			.addBackoff(, , false)
			continue
		}

		log.Debugf("GRAFT: add mesh link from %s in %s", , )
		.tracer.Graft(, )
		[] = struct{}{}
	}

	if len() == 0 {
		return nil
	}

	 := make([]*pb.ControlPrune, 0, len())
	for ,  := range  {
		 = append(, .makePrune(, , , false))
	}

	return 
}

func ( *GossipSubRouter) ( peer.ID,  *pb.ControlMessage) {
	 := .score.Score()

	for ,  := range .GetPrune() {
		 := .GetTopicID()
		,  := .mesh[]
		if ! {
			continue
		}

		log.Debugf("PRUNE: Remove mesh link to %s in %s", , )
		.tracer.Prune(, )
		delete(, )
		// is there a backoff specified by the peer? if so obey it.
		 := .GetBackoff()
		if  > 0 {
			.doAddBackoff(, , time.Duration()*time.Second)
		} else {
			.addBackoff(, , false)
		}

		 := .GetPeers()
		if len() > 0 {
			// we ignore PX from peers with insufficient score
			if  < .acceptPXThreshold {
				log.Debugf("PRUNE: ignoring PX from peer %s with insufficient score [score = %f, topic = %s]", , , )
				continue
			}

			.pxConnect()
		}
	}
}

func (gs *GossipSubRouter) handleIDontWant(p peer.ID, ctl *pb.ControlMessage) {
	if gs.unwanted[p] == nil {
		gs.unwanted[p] = make(map[checksum]int)
	}

	// IDONTWANT flood protection
	if gs.peerdontwant[p] >= gs.params.MaxIDontWantMessages {
		log.Debugf("IDONTWANT: peer %s has advertised too many times (%d) within this heartbeat interval; ignoring", p, gs.peerdontwant[p])
		return
	}
	gs.peerdontwant[p]++

	totalUnwantedIds := 0
	// Remember all the unwanted message ids
mainLoop:
	for _, idontwant := range ctl.GetIdontwant() {
		for _, mid := range idontwant.GetMessageIDs() {
			// IDONTWANT flood protection
			if totalUnwantedIds >= gs.params.MaxIDontWantLength {
				log.Debugf("IDONTWANT: peer %s has advertised too many ids (%d) within this message; ignoring", p, totalUnwantedIds)
				break mainLoop
			}

			totalUnwantedIds++
			gs.unwanted[p][computeChecksum(mid)] = gs.params.IDontWantMessageTTL
		}
	}
}

func (gs *GossipSubRouter) addBackoff(p peer.ID, topic string, isUnsubscribe bool) {
	backoff := gs.params.PruneBackoff
	if isUnsubscribe {
		backoff = gs.params.UnsubscribeBackoff
	}
	gs.doAddBackoff(p, topic, backoff)
}

func (gs *GossipSubRouter) doAddBackoff(p peer.ID, topic string, interval time.Duration) {
	backoff, ok := gs.backoff[topic]
	if !ok {
		backoff = make(map[peer.ID]time.Time)
		gs.backoff[topic] = backoff
	}
	expire := time.Now().Add(interval)
	if backoff[p].Before(expire) {
		backoff[p] = expire
	}
}

func ( *GossipSubRouter) ( []*pb.PeerInfo) {
	if len() > .params.PrunePeers {
		shufflePeerInfo()
		 = [:.params.PrunePeers]
	}

	 := make([]connectInfo, 0, len())

	for ,  := range  {
		 := peer.ID(.PeerID)

		,  := .peers[]
		if  {
			continue
		}

		var  *record.Envelope
		if .SignedPeerRecord != nil {
			// the peer sent us a signed record; ensure that it is valid
			, ,  := record.ConsumeEnvelope(.SignedPeerRecord, peer.PeerRecordEnvelopeDomain)
			if  != nil {
				log.Warnf("error unmarshalling peer record obtained through px: %s", )
				continue
			}
			,  := .(*peer.PeerRecord)
			if ! {
				log.Warnf("bogus peer record obtained through px: envelope payload is not PeerRecord")
				continue
			}
			if .PeerID !=  {
				log.Warnf("bogus peer record obtained through px: peer ID %s doesn't match expected peer %s", .PeerID, )
				continue
			}
			 = 
		}

		 = append(, connectInfo{, })
	}

	if len() == 0 {
		return
	}

	for ,  := range  {
		select {
		case .connect <- :
		default:
			log.Debugf("ignoring peer connection attempt; too many pending connections")
		}
	}
}

func ( *GossipSubRouter) () {
	for {
		select {
		case  := <-.connect:
			if .p.host.Network().Connectedness(.p) == network.Connected {
				continue
			}

			log.Debugf("connecting to %s", .p)
			,  := peerstore.GetCertifiedAddrBook(.cab)
			if  && .spr != nil {
				,  := .ConsumePeerRecord(.spr, peerstore.TempAddrTTL)
				if  != nil {
					log.Debugf("error processing peer record: %s", )
				}
			}

			,  := context.WithTimeout(.p.ctx, .params.ConnectionTimeout)
			 := .p.host.Connect(, peer.AddrInfo{ID: .p, Addrs: .cab.Addrs(.p)})
			()
			if  != nil {
				log.Debugf("error connecting to %s: %s", .p, )
			}

		case <-.p.ctx.Done():
			return
		}
	}
}

func ( *GossipSubRouter) ( *Message) {
	.mcache.Put()

	 := .ReceivedFrom
	 := .GetTopic()

	 := make(map[peer.ID]struct{})

	// any peers in the topic?
	,  := .p.topics[]
	if ! {
		return
	}

	if .floodPublish &&  == .p.host.ID() {
		for  := range  {
			,  := .direct[]
			if  || .score.Score() >= .publishThreshold {
				[] = struct{}{}
			}
		}
	} else {
		// direct peers
		for  := range .direct {
			,  := []
			if  {
				[] = struct{}{}
			}
		}

		// floodsub peers
		for  := range  {
			if !.feature(GossipSubFeatureMesh, .peers[]) && .score.Score() >= .publishThreshold {
				[] = struct{}{}
			}
		}

		// gossipsub peers
		,  := .mesh[]
		if ! {
			// we are not in the mesh for topic, use fanout peers
			,  = .fanout[]
			if ! || len() == 0 {
				// we don't have any, pick some with score above the publish threshold
				 := .getPeers(, .params.D, func( peer.ID) bool {
					,  := .direct[]
					return ! && .score.Score() >= .publishThreshold
				})

				if len() > 0 {
					 = peerListToMap()
					.fanout[] = 
				}
			}
			.lastpub[] = time.Now().UnixNano()
		}

		for  := range  {
			 := .p.idGen.ID()
			// Check if it has already received an IDONTWANT for the message.
			// If so, don't send it to the peer
			if ,  := .unwanted[][computeChecksum()];  {
				continue
			}
			[] = struct{}{}
		}
	}

	 := rpcWithMessages(.Message)
	for  := range  {
		if  ==  ||  == peer.ID(.GetFrom()) {
			continue
		}

		.sendRPC(, , false)
	}
}

func ( *GossipSubRouter) ( string) {
	,  := .mesh[]
	if  {
		return
	}

	log.Debugf("JOIN %s", )
	.tracer.Join()

	,  = .fanout[]
	if  {
		 := .backoff[]
		// these peers have a score above the publish threshold, which may be negative
		// so drop the ones with a negative score
		for  := range  {
			,  := []
			if .score.Score() < 0 ||  {
				delete(, )
			}
		}

		if len() < .params.D {
			// we need more peers; eager, as this would get fixed in the next heartbeat
			 := .getPeers(, .params.D-len(), func( peer.ID) bool {
				// filter our current peers, direct peers, peers we are backing off, and
				// peers with negative scores
				,  := []
				,  := .direct[]
				,  := []
				return ! && ! && ! && .score.Score() >= 0
			})
			for ,  := range  {
				[] = struct{}{}
			}
		}
		.mesh[] = 
		delete(.fanout, )
		delete(.lastpub, )
	} else {
		 := .backoff[]
		 := .getPeers(, .params.D, func( peer.ID) bool {
			// filter direct peers, peers we are backing off and peers with negative score
			,  := .direct[]
			,  := []
			return ! && ! && .score.Score() >= 0
		})
		 = peerListToMap()
		.mesh[] = 
	}

	for  := range  {
		log.Debugf("JOIN: Add mesh link to %s in %s", , )
		.tracer.Graft(, )
		.sendGraft(, )
	}
}

func (gs *GossipSubRouter) Leave(topic string) {
	gmap, ok := gs.mesh[topic]
	if !ok {
		return
	}

	log.Debugf("LEAVE %s", topic)
	gs.tracer.Leave(topic)

	delete(gs.mesh, topic)

	for p := range gmap {
		log.Debugf("LEAVE: Remove mesh link to %s in %s", p, topic)
		gs.tracer.Prune(p, topic)
		gs.sendPrune(p, topic, true)
		// Add a backoff to this peer to prevent us from eagerly
		// re-grafting this peer into our mesh if we rejoin this
		// topic before the backoff period ends.
		gs.addBackoff(p, topic, true)
	}
}

func (gs *GossipSubRouter) sendGraft(p peer.ID, topic string) {
	graft := []*pb.ControlGraft{{TopicID: &topic}}
	out := rpcWithControl(nil, nil, nil, graft, nil, nil)
	gs.sendRPC(p, out, false)
}

func (gs *GossipSubRouter) sendPrune(p peer.ID, topic string, isUnsubscribe bool) {
	prune := []*pb.ControlPrune{gs.makePrune(p, topic, gs.doPX, isUnsubscribe)}
	out := rpcWithControl(nil, nil, nil, nil, prune, nil)
	gs.sendRPC(p, out, false)
}

func ( *GossipSubRouter) ( peer.ID,  *RPC,  bool) {
	// do we own the RPC?
	 := false

	// piggyback control message retries
	,  := .control[]
	if  {
		 = copyRPC()
		 = true
		.piggybackControl(, , )
		delete(.control, )
	}

	// piggyback gossip
	,  := .gossip[]
	if  {
		if ! {
			 = copyRPC()
			 = true
		}
		.piggybackGossip(, , )
		delete(.gossip, )
	}

	,  := .p.peers[]
	if ! {
		return
	}

	// If we're below the max message size, go ahead and send
	if .Size() < .p.maxMessageSize {
		.doSendRPC(, , , )
		return
	}

	// Potentially split the RPC into multiple RPCs that are below the max message size
	 := appendOrMergeRPC(nil, .p.maxMessageSize, *)
	for ,  := range  {
		if .Size() > .p.maxMessageSize {
			// This should only happen if a single message/control is above the maxMessageSize.
			.doDropRPC(, , fmt.Sprintf("Dropping oversized RPC. Size: %d, limit: %d. (Over by %d bytes)", .Size(), .p.maxMessageSize, .Size()-.p.maxMessageSize))
			continue
		}
		.doSendRPC(, , , )
	}
}

func (gs *GossipSubRouter) doDropRPC(rpc *RPC, p peer.ID, reason string) {
	if log.Level() <= zapcore.DebugLevel {
		log.Debugf("dropping message to peer %s: %s", p, reason)
	}
	gs.tracer.DropRPC(rpc, p)
	// push control messages that need to be retried
	ctl := rpc.GetControl()
	if ctl != nil {
		gs.pushControl(p, ctl)
	}
}

func (gs *GossipSubRouter) doSendRPC(rpc *RPC, p peer.ID, q *rpcQueue, urgent bool) {
	var err error
	if urgent {
		err = q.UrgentPush(rpc, false)
	} else {
		err = q.Push(rpc, false)
	}
	if err != nil {
		gs.doDropRPC(rpc, p, "queue full")
		return
	}
	gs.tracer.SendRPC(rpc, p)
}

// appendOrMergeRPC appends the given RPCs to the slice, merging them if possible.
// If any elem is too large to fit in a single RPC, it will be split into multiple RPCs.
// If an RPC is too large and can't be split further (e.g. Message data is
// bigger than the RPC limit), then it will be returned as an oversized RPC.
// The caller should filter out oversized RPCs.
func appendOrMergeRPC( []*RPC,  int,  ...RPC) []*RPC {
	if len() == 0 {
		return 
	}

	if len() == 0 && len() == 1 && [0].Size() <  {
		// Fast path: no merging needed and only one element
		return append(, &[0])
	}

	 := 
	if len() == 0 {
		 = append(, &RPC{RPC: pb.RPC{}})
		[0].from = [0].from
	}

	for ,  := range  {
		 := [len()-1]

		// Merge/Append publish messages
		// TODO: Never merge messages. The current behavior is the same as the
		// old behavior. In the future let's not merge messages, since merging
		// may increase message latency.
		for ,  := range .GetPublish() {
			if .Publish = append(.Publish, ); .Size() >  {
				.Publish = .Publish[:len(.Publish)-1]
				 = &RPC{RPC: pb.RPC{}, from: .from}
				.Publish = append(.Publish, )
				 = append(, )
			}
		}

		// Merge/Append Subscriptions
		for ,  := range .GetSubscriptions() {
			if .Subscriptions = append(.Subscriptions, ); .Size() >  {
				.Subscriptions = .Subscriptions[:len(.Subscriptions)-1]
				 = &RPC{RPC: pb.RPC{}, from: .from}
				.Subscriptions = append(.Subscriptions, )
				 = append(, )
			}
		}

		// Merge/Append Control messages
		if  := .GetControl();  != nil {
			if .Control == nil {
				.Control = &pb.ControlMessage{}
				if .Size() >  {
					.Control = nil
					 = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: .from}
					 = append(, )
				}
			}

			for ,  := range .GetGraft() {
				if .Control.Graft = append(.Control.Graft, ); .Size() >  {
					.Control.Graft = .Control.Graft[:len(.Control.Graft)-1]
					 = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: .from}
					.Control.Graft = append(.Control.Graft, )
					 = append(, )
				}
			}

			for ,  := range .GetPrune() {
				if .Control.Prune = append(.Control.Prune, ); .Size() >  {
					.Control.Prune = .Control.Prune[:len(.Control.Prune)-1]
					 = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: .from}
					.Control.Prune = append(.Control.Prune, )
					 = append(, )
				}
			}

			for ,  := range .GetIwant() {
				if len(.Control.Iwant) == 0 {
					// Initialize with a single IWANT.
					// For IWANTs we don't need more than a single one,
					// since there are no topic IDs here.
					 := &pb.ControlIWant{}
					if .Control.Iwant = append(.Control.Iwant, ); .Size() >  {
						.Control.Iwant = .Control.Iwant[:len(.Control.Iwant)-1]
						 = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
							Iwant: []*pb.ControlIWant{},
						}}, from: .from}
						 = append(, )
					}
				}
				for ,  := range .GetMessageIDs() {
					if .Control.Iwant[0].MessageIDs = append(.Control.Iwant[0].MessageIDs, ); .Size() >  {
						.Control.Iwant[0].MessageIDs = .Control.Iwant[0].MessageIDs[:len(.Control.Iwant[0].MessageIDs)-1]
						 = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
							Iwant: []*pb.ControlIWant{{MessageIDs: []string{}}},
						}}, from: .from}
						 = append(, )
					}
				}
			}

			for ,  := range .GetIhave() {
				if len(.Control.Ihave) == 0 ||
					.Control.Ihave[len(.Control.Ihave)-1].TopicID != .TopicID {
					// Start a new IHAVE if we are referencing a new topic ID
					 := &pb.ControlIHave{TopicID: .TopicID}
					if .Control.Ihave = append(.Control.Ihave, ); .Size() >  {
						.Control.Ihave = .Control.Ihave[:len(.Control.Ihave)-1]
						 = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
							Ihave: []*pb.ControlIHave{},
						}}, from: .from}
						 = append(, )
					}
				}
				for ,  := range .GetMessageIDs() {
					 := .Control.Ihave[len(.Control.Ihave)-1]
					if .MessageIDs = append(.MessageIDs, ); .Size() >  {
						.MessageIDs = .MessageIDs[:len(.MessageIDs)-1]
						 = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
							Ihave: []*pb.ControlIHave{{TopicID: .TopicID, MessageIDs: []string{}}},
						}}, from: .from}
						 = append(, )
					}
				}
			}
		}
	}

	return 
}

func (gs *GossipSubRouter) heartbeatTimer() {
	time.Sleep(gs.params.HeartbeatInitialDelay)
	select {
	case gs.p.eval <- gs.heartbeat:
	case <-gs.p.ctx.Done():
		return
	}

	ticker := time.NewTicker(gs.params.HeartbeatInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			select {
			case gs.p.eval <- gs.heartbeat:
			case <-gs.p.ctx.Done():
				return
			}
		case <-gs.p.ctx.Done():
			return
		}
	}
}

func ( *GossipSubRouter) () {
	 := time.Now()
	defer func() {
		if .params.SlowHeartbeatWarning > 0 {
			 := time.Duration(.params.SlowHeartbeatWarning * float64(.params.HeartbeatInterval))
			if  := time.Since();  >  {
				log.Warnw("slow heartbeat", "took", )
			}
		}
	}()

	.heartbeatTicks++

	 := make(map[peer.ID][]string)
	 := make(map[peer.ID][]string)
	 := make(map[peer.ID]bool)

	// clean up expired backoffs
	.clearBackoff()

	// clean up iasked counters
	.clearIHaveCounters()

	// clean up IDONTWANT counters
	.clearIDontWantCounters()

	// apply IWANT request penalties
	.applyIwantPenalties()

	// ensure direct peers are connected
	.directConnect()

	// cache scores throughout the heartbeat
	 := make(map[peer.ID]float64)
	 := func( peer.ID) float64 {
		,  := []
		if ! {
			 = .score.Score()
			[] = 
		}
		return 
	}

	// maintain the mesh for topics we have joined
	for ,  := range .mesh {
		 := func( peer.ID) {
			.tracer.Prune(, )
			delete(, )
			.addBackoff(, , false)
			 := []
			[] = append(, )
		}

		 := func( peer.ID) {
			log.Debugf("HEARTBEAT: Add mesh link to %s in %s", , )
			.tracer.Graft(, )
			[] = struct{}{}
			 := []
			[] = append(, )
		}

		// drop all peers with negative score, without PX
		for  := range  {
			if () < 0 {
				log.Debugf("HEARTBEAT: Prune peer %s with negative score [score = %f, topic = %s]", , (), )
				()
				[] = true
			}
		}

		// do we have enough peers?
		if  := len();  < .params.Dlo {
			 := .backoff[]
			 := .params.D - 
			 := .getPeers(, , func( peer.ID) bool {
				// filter our current and direct peers, peers we are backing off, and peers with negative score
				,  := []
				,  := []
				,  := .direct[]
				return ! && ! && ! && () >= 0
			})

			for ,  := range  {
				()
			}
		}

		// do we have too many peers?
		if len() >= .params.Dhi {
			 := peerMapToList()

			// sort by score (but shuffle first for the case we don't use the score)
			shufflePeers()
			sort.Slice(, func(,  int) bool {
				return ([]) > ([])
			})

			// We keep the first D_score peers by score and the remaining up to D randomly
			// under the constraint that we keep D_out peers in the mesh (if we have that many)
			shufflePeers([.params.Dscore:])

			// count the outbound peers we are keeping
			 := 0
			for ,  := range [:.params.D] {
				if .outbound[] {
					++
				}
			}

			// if it's less than D_out, bubble up some outbound peers from the random selection
			if  < .params.Dout {
				 := func( int) {
					// rotate the plst to the right and put the ith peer in the front
					 := []
					for  := ;  > 0; -- {
						[] = [-1]
					}
					[0] = 
				}

				// first bubble up all outbound peers already in the selection to the front
				if  > 0 {
					 := 
					for  := 1;  < .params.D &&  > 0; ++ {
						 := []
						if .outbound[] {
							()
							--
						}
					}
				}

				// now bubble up enough outbound peers outside the selection to the front
				 := .params.Dout - 
				for  := .params.D;  < len() &&  > 0; ++ {
					 := []
					if .outbound[] {
						()
						--
					}
				}
			}

			// prune the excess peers
			for ,  := range [.params.D:] {
				log.Debugf("HEARTBEAT: Remove mesh link to %s in %s", , )
				()
			}
		}

		// do we have enough outbound peers?
		if len() >= .params.Dlo {
			// count the outbound peers we have
			 := 0
			for  := range  {
				if .outbound[] {
					++
				}
			}

			// if it's less than D_out, select some peers with outbound connections and graft them
			if  < .params.Dout {
				 := .params.Dout - 
				 := .backoff[]
				 := .getPeers(, , func( peer.ID) bool {
					// filter our current and direct peers, peers we are backing off, and peers with negative score
					,  := []
					,  := []
					,  := .direct[]
					return ! && ! && ! && .outbound[] && () >= 0
				})

				for ,  := range  {
					()
				}
			}
		}

		// should we try to improve the mesh with opportunistic grafting?
		if .heartbeatTicks%.params.OpportunisticGraftTicks == 0 && len() > 1 {
			// Opportunistic grafting works as follows: we check the median score of peers in the
			// mesh; if this score is below the opportunisticGraftThreshold, we select a few peers at
			// random with score over the median.
			// The intention is to (slowly) improve an underperforming mesh by introducing good
			// scoring peers that may have been gossiping at us. This allows us to get out of sticky
			// situations where we are stuck with poor peers and also recover from churn of good peers.

			// now compute the median peer score in the mesh
			 := peerMapToList()
			sort.Slice(, func(,  int) bool {
				return ([]) < ([])
			})
			 := len() / 2
			 := [[]]

			// if the median score is below the threshold, select a better peer (if any) and GRAFT
			if  < .opportunisticGraftThreshold {
				 := .backoff[]
				 = .getPeers(, .params.OpportunisticGraftPeers, func( peer.ID) bool {
					,  := []
					,  := []
					,  := .direct[]
					return ! && ! && ! && () > 
				})

				for ,  := range  {
					log.Debugf("HEARTBEAT: Opportunistically graft peer %s on topic %s", , )
					()
				}
			}
		}

		// The 2nd arg is the set of mesh peers excluded from gossip. We already push
		// messages to them, so it's redundant to gossip IHAVEs.
		.emitGossip(, )
	}

	// expire fanout for topics we haven't published to in a while
	 := time.Now().UnixNano()
	for ,  := range .lastpub {
		if +int64(.params.FanoutTTL) <  {
			delete(.fanout, )
			delete(.lastpub, )
		}
	}

	// maintain our fanout for topics we are publishing but we have not joined
	for ,  := range .fanout {
		// check whether our peers are still in the topic and have a score above the publish threshold
		for  := range  {
			,  := .p.topics[][]
			if ! || () < .publishThreshold {
				delete(, )
			}
		}

		// do we need more peers?
		if len() < .params.D {
			 := .params.D - len()
			 := .getPeers(, , func( peer.ID) bool {
				// filter our current and direct peers and peers with score above the publish threshold
				,  := []
				,  := .direct[]
				return ! && ! && () >= .publishThreshold
			})

			for ,  := range  {
				[] = struct{}{}
			}
		}

		// The 2nd arg is the set of fanout peers excluded from gossip. We already push
		// messages to them, so it's redundant to gossip IHAVEs.
		.emitGossip(, )
	}

	// send coalesced GRAFT/PRUNE messages (will piggyback gossip)
	.sendGraftPrune(, , )

	// flush all pending gossip that wasn't piggybacked above
	.flush()

	// advance the message history window
	.mcache.Shift()
}

func (gs *GossipSubRouter) clearIHaveCounters() {
	if len(gs.peerhave) > 0 {
		// throw away the old map and make a new one
		gs.peerhave = make(map[peer.ID]int)
	}

	if len(gs.iasked) > 0 {
		// throw away the old map and make a new one
		gs.iasked = make(map[peer.ID]int)
	}
}

func (gs *GossipSubRouter) clearIDontWantCounters() {
	if len(gs.peerdontwant) > 0 {
		// throw away the old map and make a new one
		gs.peerdontwant = make(map[peer.ID]int)
	}

	// decrement the TTL of all the IDONTWANTs and delete an entry from the cache when it reaches zero
	for _, mids := range gs.unwanted {
		for mid := range mids {
			mids[mid]--
			if mids[mid] == 0 {
				delete(mids, mid)
			}
		}
	}
}

func (gs *GossipSubRouter) applyIwantPenalties() {
	for p, count := range gs.gossipTracer.GetBrokenPromises() {
		log.Infof("peer %s didn't follow up in %d IWANT requests; adding penalty", p, count)
		gs.score.AddPenalty(p, count)
	}
}

func (gs *GossipSubRouter) clearBackoff() {
	// we only clear once every 15 ticks to avoid iterating over the map(s) too much
	if gs.heartbeatTicks%15 != 0 {
		return
	}

	now := time.Now()
	for topic, backoff := range gs.backoff {
		for p, expire := range backoff {
			// add some slack time to the expiration
			// https://github.com/libp2p/specs/pull/289
			if expire.Add(2 * GossipSubHeartbeatInterval).Before(now) {
				delete(backoff, p)
			}
		}
		if len(backoff) == 0 {
			delete(gs.backoff, topic)
		}
	}
}

func (gs *GossipSubRouter) directConnect() {
	// we only do this every few ticks to allow pending connections to complete and account
	// for restarts/downtime
	if gs.heartbeatTicks%gs.params.DirectConnectTicks != 0 {
		return
	}

	var toconnect []peer.ID
	for p := range gs.direct {
		_, connected := gs.peers[p]
		if !connected {
			toconnect = append(toconnect, p)
		}
	}

	if len(toconnect) > 0 {
		go func() {
			for _, p := range toconnect {
				gs.connect <- connectInfo{p: p}
			}
		}()
	}
}

func ( *GossipSubRouter) (,  map[peer.ID][]string,  map[peer.ID]bool) {
	for ,  := range  {
		 := make([]*pb.ControlGraft, 0, len())
		for ,  := range  {
			// copy topic string here since
			// the reference to the string
			// topic here changes with every
			// iteration of the slice.
			 := 
			 = append(, &pb.ControlGraft{TopicID: &})
		}

		var  []*pb.ControlPrune
		,  := []
		if  {
			delete(, )
			 = make([]*pb.ControlPrune, 0, len())
			for ,  := range  {
				 = append(, .makePrune(, , .doPX && ![], false))
			}
		}

		 := rpcWithControl(nil, nil, nil, , , nil)
		.sendRPC(, , false)
	}

	for ,  := range  {
		 := make([]*pb.ControlPrune, 0, len())
		for ,  := range  {
			 = append(, .makePrune(, , .doPX && ![], false))
		}

		 := rpcWithControl(nil, nil, nil, nil, , nil)
		.sendRPC(, , false)
	}
}

// emitGossip emits IHAVE gossip advertising items in the message cache window
// of this topic.
func ( *GossipSubRouter) ( string,  map[peer.ID]struct{}) {
	 := .mcache.GetGossipIDs()
	if len() == 0 {
		return
	}

	// shuffle to emit in random order
	shuffleStrings()

	// if we are emitting more than GossipSubMaxIHaveLength mids, truncate the list
	if len() > .params.MaxIHaveLength {
		// we do the truncation (with shuffling) per peer below
		log.Debugf("too many messages for gossip; will truncate IHAVE list (%d messages)", len())
	}

	// Send gossip to GossipFactor peers above threshold, with a minimum of D_lazy.
	// First we collect the peers above gossipThreshold that are not in the exclude set
	// and then randomly select from that set.
	// We also exclude direct peers, as there is no reason to emit gossip to them.
	 := make([]peer.ID, 0, len(.p.topics[]))
	for  := range .p.topics[] {
		,  := []
		,  := .direct[]
		if ! && ! && .feature(GossipSubFeatureMesh, .peers[]) && .score.Score() >= .gossipThreshold {
			 = append(, )
		}
	}

	 := .params.Dlazy
	 := int(.params.GossipFactor * float64(len()))
	if  >  {
		 = 
	}

	if  > len() {
		 = len()
	} else {
		shufflePeers()
	}
	 = [:]

	// Emit the IHAVE gossip to the selected peers.
	for ,  := range  {
		 := 
		if len() > .params.MaxIHaveLength {
			// we do this per peer so that we emit a different set for each peer.
			// we have enough redundancy in the system that this will significantly increase the message
			// coverage when we do truncate.
			 = make([]string, .params.MaxIHaveLength)
			shuffleStrings()
			copy(, )
		}
		.enqueueGossip(, &pb.ControlIHave{TopicID: &, MessageIDs: })
	}
}

func (gs *GossipSubRouter) flush() {
	// send gossip first, which will also piggyback pending control
	for p, ihave := range gs.gossip {
		delete(gs.gossip, p)
		out := rpcWithControl(nil, ihave, nil, nil, nil, nil)
		gs.sendRPC(p, out, false)
	}

	// send the remaining control messages that weren't merged with gossip
	for p, ctl := range gs.control {
		delete(gs.control, p)
		out := rpcWithControl(nil, nil, nil, ctl.Graft, ctl.Prune, nil)
		gs.sendRPC(p, out, false)
	}
}

func (gs *GossipSubRouter) enqueueGossip(p peer.ID, ihave *pb.ControlIHave) {
	gossip := gs.gossip[p]
	gossip = append(gossip, ihave)
	gs.gossip[p] = gossip
}

func (gs *GossipSubRouter) piggybackGossip(p peer.ID, out *RPC, ihave []*pb.ControlIHave) {
	ctl := out.GetControl()
	if ctl == nil {
		ctl = &pb.ControlMessage{}
		out.Control = ctl
	}

	ctl.Ihave = ihave
}

func (gs *GossipSubRouter) pushControl(p peer.ID, ctl *pb.ControlMessage) {
	// remove IHAVE/IWANT/IDONTWANT from the control message, gossip is not retried
	ctl.Ihave = nil
	ctl.Iwant = nil
	ctl.Idontwant = nil
	if ctl.Graft != nil || ctl.Prune != nil {
		gs.control[p] = ctl
	}
}

func ( *GossipSubRouter) ( peer.ID,  *RPC,  *pb.ControlMessage) {
	// check control message for staleness first
	var  []*pb.ControlGraft
	var  []*pb.ControlPrune

	for ,  := range .GetGraft() {
		 := .GetTopicID()
		,  := .mesh[]
		if ! {
			continue
		}
		_,  = []
		if  {
			 = append(, )
		}
	}

	for ,  := range .GetPrune() {
		 := .GetTopicID()
		,  := .mesh[]
		if ! {
			 = append(, )
			continue
		}
		_,  = []
		if ! {
			 = append(, )
		}
	}

	if len() == 0 && len() == 0 {
		return
	}

	 := .Control
	if  == nil {
		 = &pb.ControlMessage{}
		.Control = 
	}

	if len() > 0 {
		.Graft = append(.Graft, ...)
	}
	if len() > 0 {
		.Prune = append(.Prune, ...)
	}
}

func ( *GossipSubRouter) ( peer.ID,  string,  bool,  bool) *pb.ControlPrune {
	if !.feature(GossipSubFeaturePX, .peers[]) {
		// GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway
		return &pb.ControlPrune{TopicID: &}
	}

	 := uint64(.params.PruneBackoff / time.Second)
	if  {
		 = uint64(.params.UnsubscribeBackoff / time.Second)
	}

	var  []*pb.PeerInfo
	if  {
		// select peers for Peer eXchange
		 := .getPeers(, .params.PrunePeers, func( peer.ID) bool {
			return  !=  && .score.Score() >= 0
		})

		,  := peerstore.GetCertifiedAddrBook(.cab)
		 = make([]*pb.PeerInfo, 0, len())
		for ,  := range  {
			// see if we have a signed peer record to send back; if we don't, just send
			// the peer ID and let the pruned peer find them in the DHT -- we can't trust
			// unsigned address records through px anyway.
			var  []byte
			if  {
				 := .GetPeerRecord()
				var  error
				if  != nil {
					,  = .Marshal()
					if  != nil {
						log.Warnf("error marshaling signed peer record for %s: %s", , )
					}
				}
			}
			 = append(, &pb.PeerInfo{PeerID: []byte(), SignedPeerRecord: })
		}
	}

	return &pb.ControlPrune{TopicID: &, Peers: , Backoff: &}
}

func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID) bool) []peer.ID {
	tmap, ok := gs.p.topics[topic]
	if !ok {
		return nil
	}

	peers := make([]peer.ID, 0, len(tmap))
	for p := range tmap {
		if gs.feature(GossipSubFeatureMesh, gs.peers[p]) && filter(p) && gs.p.peerFilter(p, topic) {
			peers = append(peers, p)
		}
	}

	shufflePeers(peers)

	if count > 0 && len(peers) > count {
		peers = peers[:count]
	}

	return peers
}

// WithDefaultTagTracer returns the tag tracer of the GossipSubRouter as a PubSub option.
// This is useful for cases where the GossipSubRouter is instantiated externally, and is
// injected into the GossipSub constructor as a dependency. This allows the tag tracer to be
// also injected into the GossipSub constructor as a PubSub option dependency.
func (gs *GossipSubRouter) WithDefaultTagTracer() Option {
	return WithRawTracer(gs.tagTracer)
}

// SendControl dispatches the given set of control messages to the given peer.
// The control messages are sent as a single RPC, with the given (optional) messages.
// Args:
//
//	p: the peer to send the control messages to.
//	ctl: the control messages to send.
//	msgs: the messages to send in the same RPC (optional).
//	The control messages are piggybacked on the messages.
//
// Returns:
//
//	nothing.
func (gs *GossipSubRouter) SendControl(p peer.ID, ctl *pb.ControlMessage, msgs ...*pb.Message) {
	out := rpcWithControl(msgs, ctl.Ihave, ctl.Iwant, ctl.Graft, ctl.Prune, ctl.Idontwant)
	gs.sendRPC(p, out, false)
}

func peerListToMap(peers []peer.ID) map[peer.ID]struct{} {
	pmap := make(map[peer.ID]struct{})
	for _, p := range peers {
		pmap[p] = struct{}{}
	}
	return pmap
}

func peerMapToList(peers map[peer.ID]struct{}) []peer.ID {
	plst := make([]peer.ID, 0, len(peers))
	for p := range peers {
		plst = append(plst, p)
	}
	return plst
}

func shufflePeers(peers []peer.ID) {
	for i := range peers {
		j := rand.Intn(i + 1)
		peers[i], peers[j] = peers[j], peers[i]
	}
}

func shufflePeerInfo(peers []*pb.PeerInfo) {
	for i := range peers {
		j := rand.Intn(i + 1)
		peers[i], peers[j] = peers[j], peers[i]
	}
}

func shuffleStrings(lst []string) {
	for i := range lst {
		j := rand.Intn(i + 1)
		lst[i], lst[j] = lst[j], lst[i]
	}
}

func computeChecksum(mid string) checksum {
	var cs checksum
	if len(mid) > 32 || len(mid) == 0 {
		cs.payload = sha256.Sum256([]byte(mid))
	} else {
		cs.length = uint8(copy(cs.payload[:], mid))
	}
	return cs
}
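
// Illustrative sketch, not part of the library API: computeChecksum keeps
// message ids of 1-32 bytes inline in the fixed-size payload (length records
// how many bytes are valid) and falls back to a SHA-256 digest, with length
// left at 0, for empty or longer ids. This keeps the keys of the unwanted map
// at a fixed size regardless of how message ids are generated.
func checksumExample() (inline, hashed checksum) {
	inline = computeChecksum("short-msg-id")                               // 12 bytes, stored verbatim
	hashed = computeChecksum("0123456789abcdef0123456789abcdef-longer-id") // >32 bytes, hashed
	return inline, hashed
}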