package pubsub

import (
	
	
	

	
	
	
)

// Tunables for the decaying connection manager tags that track message deliveries.
var (
	// GossipSubConnTagDecayInterval is how often the decaying connection manager
	// tags are decayed.
	GossipSubConnTagDecayInterval = 10 * time.Minute

	// GossipSubConnTagDecayAmount is how much a decaying tag loses at every
	// GossipSubConnTagDecayInterval.
	GossipSubConnTagDecayAmount = 1

	// GossipSubConnTagBumpMessageDelivery is the increment applied to the
	// connection manager tag that tracks message deliveries. Whenever a peer is
	// the first to deliver a message within a topic, its tag is "bumped" by this
	// amount, never exceeding GossipSubConnTagMessageDeliveryCap.
	// Delivery tags shrink again over time: GossipSubConnTagDecayAmount is
	// subtracted every GossipSubConnTagDecayInterval.
	GossipSubConnTagBumpMessageDelivery = 1

	// GossipSubConnTagMessageDeliveryCap is the upper bound for the connection
	// manager tags that track message deliveries.
	GossipSubConnTagMessageDeliveryCap = 15
)

// tagTracer is an internal tracer that applies connection manager tags to peer
// connections based on their behavior.
//
// We tag a peer's connections for the following reasons:
//   - Directly connected peers are protected under the tag "pubsub:<direct>".
//   - Mesh peers are protected under a per-topic tag of the form "pubsub:<topic>".
//     If a peer is in multiple topic meshes, they'll be protected once for each.
//   - For each message that we receive, we bump a delivery tag for the peer that delivered the
//     message first, and for the peers that delivered it while it was still being validated.
//     The delivery tags have a maximum value, GossipSubConnTagMessageDeliveryCap, and they decay at
//     a rate of GossipSubConnTagDecayAmount / GossipSubConnTagDecayInterval.
//
// The embedded RWMutex is held around accesses to the decaying and nearFirst maps.
type tagTracer struct {
	sync.RWMutex

	// cmgr is the connection manager on which peers are protected and unprotected.
	cmgr     connmgr.ConnManager
	// idGen computes the message IDs used as keys of nearFirst.
	idGen    *msgIDGenerator
	// decayer registers the decaying delivery tags; when it is nil no delivery
	// tags are registered.
	decayer  connmgr.Decayer
	// decaying holds the registered decaying delivery tags (presumably keyed by
	// topic name — confirm against the registration code).
	decaying map[string]connmgr.DecayingTag
	// direct is the set of direct peers; it is not set by the constructor and is
	// assigned later from a GossipSubRouter.
	direct   map[peer.ID]struct{}

	// a map of message ids to the set of peers who delivered the message after the first delivery,
	// but before the message was finished validating
	nearFirst map[string]map[peer.ID]struct{}
}

// newTagTracer builds a tagTracer for the given connection manager.
//
// It queries connmgr.SupportsDecay; when decaying tags are not supported a
// debug message is logged and delivery tags will not be applied. The decaying
// and nearFirst maps start out empty and idGen starts as a fresh generator from
// newMsgIdGenerator. The direct set is left unset here.
func newTagTracer( connmgr.ConnManager) *tagTracer {
	,  := connmgr.SupportsDecay()
	if ! {
		log.Debugf("connection manager does not support decaying tags, delivery tags will not be applied")
	}
	return &tagTracer{
		cmgr:      ,
		idGen:     newMsgIdGenerator(),
		decayer:   ,
		decaying:  make(map[string]connmgr.DecayingTag),
		nearFirst: make(map[string]map[peer.ID]struct{}),
	}
}

// Wires the tracer to a GossipSubRouter: the tracer adopts the message ID
// generator reachable through the router's p field and the router's direct
// peer set (the map itself is shared, not copied).
//
// The call is a no-op when the nil-checked value is nil.
// NOTE(review): the checked identifier is not visible in this view; presumably
// it is the receiver, making the method safe to call on a nil tracer — confirm.
func ( *tagTracer) ( *GossipSubRouter) {
	if  == nil {
		return
	}

	.idGen = .p.idGen
	.direct = .direct
}

// Protects a peer in the connection manager under the tag "pubsub:<direct>"
// when the peer is a member of the direct set. Does nothing if the direct set
// has not been assigned yet (nil) or the peer is not in it.
func ( *tagTracer) ( peer.ID) {
	if .direct == nil {
		return
	}

	// tag peer if it is a direct peer
	,  := .direct[]
	if  {
		.cmgr.Protect(, "pubsub:<direct>")
	}
}

// Protects a peer in the connection manager under the per-topic tag produced
// by topicTag, so that the protection is scoped to that topic.
func ( *tagTracer) ( peer.ID,  string) {
	 := topicTag()
	.cmgr.Protect(, )
}

// Removes a peer's protection for the per-topic tag produced by topicTag.
// Protections held under other tags are not touched by this call.
func ( *tagTracer) ( peer.ID,  string) {
	 := topicTag()
	.cmgr.Unprotect(, )
}

// topicTag returns the connection manager tag used for a topic: the prefix
// "pubsub:" followed by the formatted argument.
func topicTag( string) string {
	return fmt.Sprintf("pubsub:%s", )
}

// Registers a decaying delivery tag named "pubsub-deliveries:<arg>" with the
// decayer and records it in the decaying map. Does nothing when the tracer has
// no decayer.
//
// The tag is registered with GossipSubConnTagDecayInterval as its interval,
// connmgr.DecayFixed(GossipSubConnTagDecayAmount) as its decay function and
// connmgr.BumpSumBounded(0, GossipSubConnTagMessageDeliveryCap) as its bump
// function. If registration fails, a warning is logged and nothing is stored.
// The write lock is held for the registration and the map update.
func ( *tagTracer) ( string) {
	if .decayer == nil {
		return
	}

	 := fmt.Sprintf("pubsub-deliveries:%s", )
	.Lock()
	defer .Unlock()
	,  := .decayer.RegisterDecayingTag(
		,
		GossipSubConnTagDecayInterval,
		connmgr.DecayFixed(GossipSubConnTagDecayAmount),
		connmgr.BumpSumBounded(0, GossipSubConnTagMessageDeliveryCap))

	if  != nil {
		log.Warnf("unable to create decaying delivery tag: %s", )
		return
	}
	.decaying[] = 
}

// Closes and forgets a registered decaying delivery tag. Returns silently when
// no entry exists in the decaying map. A failure to close the tag is only
// logged as a warning; the map entry is deleted either way. The write lock is
// held for the whole operation.
func ( *tagTracer) ( string) {
	.Lock()
	defer .Unlock()
	,  := .decaying[]
	if ! {
		return
	}
	 := .Close()
	if  != nil {
		log.Warnf("error closing decaying connmgr tag: %s", )
	}
	delete(.decaying, )
}

// Bumps a decaying delivery tag by GossipSubConnTagBumpMessageDelivery on
// behalf of a peer, holding the read lock while the tag is looked up and
// bumped.
//
// Returns an error when no decaying tag is registered for the topic, otherwise
// whatever the tag's Bump call returns.
func ( *tagTracer) ( peer.ID,  string) error {
	.RLock()
	defer .RUnlock()

	,  := .decaying[]
	if ! {
		return fmt.Errorf("no decaying tag registered for topic %s", )
	}
	return .Bump(, GossipSubConnTagBumpMessageDelivery)
}

// Bumps the delivery tag for the topic obtained via GetTopic, crediting the
// given peer. A failure from bumpDeliveryTag is logged as a warning and is not
// propagated to the caller.
func ( *tagTracer) ( peer.ID,  *Message) {
	 := .GetTopic()
	 := .bumpDeliveryTag(, )
	if  != nil {
		log.Warnf("error bumping delivery tag: %s", )
	}
}

// nearFirstPeers returns the peers who delivered the message while it was still validating
//
// The lookup is keyed by the ID produced by idGen. It returns nil when nothing
// is tracked under that ID; otherwise it returns a freshly allocated slice
// holding every peer in the tracked set, in map iteration order (unspecified).
// The tracer's lock is held for the duration of the call.
func ( *tagTracer) ( *Message) []peer.ID {
	.Lock()
	defer .Unlock()
	,  := .nearFirst[.idGen.ID()]
	if ! {
		return nil
	}
	 := make([]peer.ID, 0, len())
	for  := range  {
		 = append(, )
	}
	return 
}

// -- RawTracer interface methods

// Compile-time check that *tagTracer satisfies the RawTracer interface.
var _ RawTracer = (*tagTracer)(nil)

// RawTracer hook receiving a peer ID and a protocol ID: it delegates to
// tagPeerIfDirect and does nothing else.
func ( *tagTracer) ( peer.ID,  protocol.ID) {
	.tagPeerIfDirect()
}

// RawTracer hook receiving a string: it delegates to addDeliveryTag so that a
// decaying delivery tag gets registered.
func ( *tagTracer) ( string) {
	.addDeliveryTag()
}

// RawTracer hook for a message: it bumps the delivery tag for the peer in the
// message's ReceivedFrom field and for every peer returned by nearFirstPeers,
// then drops the near-first tracking entry for the message ID.
//
// The tracer's lock is only taken for the final delete; nearFirstPeers and the
// bump calls acquire the lock themselves.
func ( *tagTracer) ( *Message) {
	 := .nearFirstPeers()

	.bumpTagsForMessage(.ReceivedFrom, )
	for ,  := range  {
		.bumpTagsForMessage(, )
	}

	// delete the delivery state for this message
	.Lock()
	delete(.nearFirst, .idGen.ID())
	.Unlock()
}

// RawTracer hook receiving a string: it delegates to removeDeliveryTag so that
// the matching decaying delivery tag is closed and forgotten.
func ( *tagTracer) ( string) {
	.removeDeliveryTag()
}

// RawTracer hook receiving a peer ID and a string: it delegates to tagMeshPeer.
func ( *tagTracer) ( peer.ID,  string) {
	.tagMeshPeer(, )
}

// RawTracer hook receiving a peer ID and a string: it delegates to
// untagMeshPeer.
func ( *tagTracer) ( peer.ID,  string) {
	.untagMeshPeer(, )
}

// RawTracer hook for a message: under the tracer's lock, it creates an empty
// near-first peer set for the message ID unless one already exists, in which
// case the existing set is left as is.
func ( *tagTracer) ( *Message) {
	.Lock()
	defer .Unlock()

	// create map to start tracking the peers who deliver while we're validating
	 := .idGen.ID()
	if ,  := .nearFirst[];  {
		return
	}
	.nearFirst[] = make(map[peer.ID]struct{})
}

// RawTracer hook for a message: under the tracer's lock, it adds the peer in
// the message's ReceivedFrom field to the near-first set for the message ID.
// Nothing is recorded when no set is being tracked for that ID.
func ( *tagTracer) ( *Message) {
	.Lock()
	defer .Unlock()

	 := .idGen.ID()
	,  := .nearFirst[]
	if ! {
		return
	}
	[.ReceivedFrom] = struct{}{}
}

// RawTracer hook for a message and a rejection reason: under the tracer's lock,
// it deletes the near-first tracking entry for the message ID when the reason
// is RejectValidationThrottled, RejectValidationIgnored or
// RejectValidationFailed. Any other reason leaves the tracking state untouched.
func ( *tagTracer) ( *Message,  string) {
	.Lock()
	defer .Unlock()

	// We want to delete the near-first delivery tracking for messages that have passed through
	// the validation pipeline. Other rejection reasons (missing signature, etc) skip the validation
	// queue, so we don't want to remove the state in case the message is still validating.
	switch  {
	case RejectValidationThrottled:
		fallthrough
	case RejectValidationIgnored:
		fallthrough
	case RejectValidationFailed:
		delete(.nearFirst, .idGen.ID())
	}
}

// The remaining methods have empty bodies: the tag tracer takes no action for
// these events.
// NOTE(review): the method names are not visible in this view; presumably these
// are the rest of the RawTracer hooks — confirm against the interface.
func ( *tagTracer) (peer.ID)                {}
func ( *tagTracer) ( peer.ID)            {}
func ( *tagTracer) ( *RPC)                  {}
func ( *tagTracer) ( *RPC,  peer.ID)       {}
func ( *tagTracer) ( *RPC,  peer.ID)       {}
func ( *tagTracer) ( *Message) {}