package pubsub

import (
	
	
	
	
	
	

	
	
	
	

	manet 
)

var (
	DefaultPeerGaterRetainStats     = 6 * time.Hour
	DefaultPeerGaterQuiet           = time.Minute
	DefaultPeerGaterDuplicateWeight = 0.125
	DefaultPeerGaterIgnoreWeight    = 1.0
	DefaultPeerGaterRejectWeight    = 16.0
	DefaultPeerGaterThreshold       = 0.33
	DefaultPeerGaterGlobalDecay     = ScoreParameterDecay(2 * time.Minute)
	DefaultPeerGaterSourceDecay     = ScoreParameterDecay(time.Hour)
)

// PeerGaterParams groups together parameters that control the operation of the peer gater
type PeerGaterParams struct {
	// when the ratio of throttled/validated messages exceeds this threshold, the gater turns on
	Threshold float64
	// (linear) decay parameter for gater counters
	GlobalDecay float64 // global counter decay
	SourceDecay float64 // per IP counter decay
	// decay interval
	DecayInterval time.Duration
	// counter zeroing threshold
	DecayToZero float64
	// how long to retain stats
	RetainStats time.Duration
	// quiet interval before turning off the gater; if there are no validation throttle events
	// for this interval, the gater turns off
	Quiet time.Duration
	// weight of duplicate message deliveries
	DuplicateWeight float64
	// weight of ignored messages
	IgnoreWeight float64
	// weight of rejected messages
	RejectWeight float64

	// priority topic delivery weights
	TopicDeliveryWeights map[string]float64
}

func ( *PeerGaterParams) () error {
	if .Threshold <= 0 {
		return fmt.Errorf("invalid Threshold; must be > 0")
	}
	if .GlobalDecay <= 0 || .GlobalDecay >= 1 {
		return fmt.Errorf("invalid GlobalDecay; must be between 0 and 1")
	}
	if .SourceDecay <= 0 || .SourceDecay >= 1 {
		return fmt.Errorf("invalid SourceDecay; must be between 0 and 1")
	}
	if .DecayInterval < time.Second {
		return fmt.Errorf("invalid DecayInterval; must be at least 1s")
	}
	if .DecayToZero <= 0 || .DecayToZero >= 1 {
		return fmt.Errorf("invalid DecayToZero; must be between 0 and 1")
	}
	// no need to check stats retention; a value of 0 means we don't retain stats
	if .Quiet < time.Second {
		return fmt.Errorf("invalud Quiet interval; must be at least 1s")
	}
	if .DuplicateWeight <= 0 {
		return fmt.Errorf("invalid DuplicateWeight; must be > 0")
	}
	if .IgnoreWeight < 1 {
		return fmt.Errorf("invalid IgnoreWeight; must be >= 1")
	}
	if .RejectWeight < 1 {
		return fmt.Errorf("invalud RejectWeight; must be >= 1")
	}

	return nil
}

// WithTopicDeliveryWeights is a fluid setter for the priority topic delivery weights
func ( *PeerGaterParams) ( map[string]float64) *PeerGaterParams {
	.TopicDeliveryWeights = 
	return 
}

// NewPeerGaterParams creates a new PeerGaterParams struct, using the specified threshold and decay
// parameters and default values for all other parameters.
func (, ,  float64) *PeerGaterParams {
	return &PeerGaterParams{
		Threshold:       ,
		GlobalDecay:     ,
		SourceDecay:     ,
		DecayToZero:     DefaultDecayToZero,
		DecayInterval:   DefaultDecayInterval,
		RetainStats:     DefaultPeerGaterRetainStats,
		Quiet:           DefaultPeerGaterQuiet,
		DuplicateWeight: DefaultPeerGaterDuplicateWeight,
		IgnoreWeight:    DefaultPeerGaterIgnoreWeight,
		RejectWeight:    DefaultPeerGaterRejectWeight,
	}
}

// DefaultPeerGaterParams creates a new PeerGaterParams struct using default values
func () *PeerGaterParams {
	return NewPeerGaterParams(DefaultPeerGaterThreshold, DefaultPeerGaterGlobalDecay, DefaultPeerGaterSourceDecay)
}

// the gater object.
type peerGater struct {
	sync.Mutex

	host host.Host

	// gater parameters
	params *PeerGaterParams

	// counters
	validate, throttle float64

	// time of last validation throttle
	lastThrottle time.Time

	// stats per peer.ID -- multiple peer IDs may share the same stats object if they are
	// colocated in the same IP
	peerStats map[peer.ID]*peerGaterStats
	// stats per IP
	ipStats map[string]*peerGaterStats

	// for unit tests
	getIP func(peer.ID) string
}

type peerGaterStats struct {
	// number of connected peer IDs mapped to this stat object
	connected int
	// stats expiration time -- only valid if connected = 0
	expire time.Time

	// counters
	deliver, duplicate, ignore, reject float64
}

// WithPeerGater is a gossipsub router option that enables reactive validation queue
// management.
// The Gater is activated if the ratio of throttled/validated messages exceeds the specified
// threshold.
// Once active, the Gater probabilistically throttles peers _before_ they enter the validation
// queue, performing Random Early Drop.
// The throttle decision is randomized, with the probability of allowing messages to enter the
// validation queue controlled by the statistical observations of the performance of all peers
// in the IP address of the gated peer.
// The Gater deactivates if there is no validation throttlinc occurring for the specified quiet
// interval.
func ( *PeerGaterParams) Option {
	return func( *PubSub) error {
		,  := .rt.(*GossipSubRouter)
		if ! {
			return fmt.Errorf("pubsub router is not gossipsub")
		}

		 := .validate()
		if  != nil {
			return 
		}

		.gate = newPeerGater(.ctx, .host, )

		// hook the tracer
		if .tracer != nil {
			.tracer.raw = append(.tracer.raw, .gate)
		} else {
			.tracer = &pubsubTracer{
				raw:   []RawTracer{.gate},
				pid:   .host.ID(),
				idGen: .idGen,
			}
		}

		return nil
	}
}

func newPeerGater( context.Context,  host.Host,  *PeerGaterParams) *peerGater {
	 := &peerGater{
		params:    ,
		peerStats: make(map[peer.ID]*peerGaterStats),
		ipStats:   make(map[string]*peerGaterStats),
		host:      ,
	}
	go .background()
	return 
}

func ( *peerGater) ( context.Context) {
	 := time.NewTicker(.params.DecayInterval)

	defer .Stop()

	for {
		select {
		case <-.C:
			.decayStats()
		case <-.Done():
			return
		}
	}
}

func ( *peerGater) () {
	.Lock()
	defer .Unlock()

	.validate *= .params.GlobalDecay
	if .validate < .params.DecayToZero {
		.validate = 0
	}

	.throttle *= .params.GlobalDecay
	if .throttle < .params.DecayToZero {
		.throttle = 0
	}

	 := time.Now()
	for ,  := range .ipStats {
		if .connected > 0 {
			.deliver *= .params.SourceDecay
			if .deliver < .params.DecayToZero {
				.deliver = 0
			}

			.duplicate *= .params.SourceDecay
			if .duplicate < .params.DecayToZero {
				.duplicate = 0
			}

			.ignore *= .params.SourceDecay
			if .ignore < .params.DecayToZero {
				.ignore = 0
			}

			.reject *= .params.SourceDecay
			if .reject < .params.DecayToZero {
				.reject = 0
			}
		} else if .expire.Before() {
			delete(.ipStats, )
		}
	}
}

func ( *peerGater) ( peer.ID) *peerGaterStats {
	,  := .peerStats[]
	if ! {
		 = .getIPStats()
		.peerStats[] = 
	}
	return 
}

func ( *peerGater) ( peer.ID) *peerGaterStats {
	 := .getPeerIP()
	,  := .ipStats[]
	if ! {
		 = &peerGaterStats{}
		.ipStats[] = 
	}
	return 
}

func ( *peerGater) ( peer.ID) string {
	if .getIP != nil {
		return .getIP()
	}

	 := func( network.Conn) string {
		 := .RemoteMultiaddr()
		,  := manet.ToIP()
		if  != nil {
			log.Warnf("error determining IP for remote peer in %s: %s", , )
			return "<unknown>"
		}
		return .String()
	}

	 := .host.Network().ConnsToPeer()
	switch len() {
	case 0:
		return "<unknown>"
	case 1:
		return ([0])
	default:
		// we have multiple connections -- order by number of streams and use the one with the
		// most streams; it's a nightmare to track multiple IPs per peer, so pick the best one.
		 := make(map[string]int)
		for ,  := range  {
			if .Stat().Limited {
				// ignore transient
				continue
			}
			[.ID()] = len(.GetStreams())
		}
		sort.Slice(, func(,  int) bool {
			return [[].ID()] > [[].ID()]
		})
		return ([0])
	}
}

// router interface
func ( *peerGater) ( peer.ID) AcceptStatus {
	if  == nil {
		return AcceptAll
	}

	.Lock()
	defer .Unlock()

	// check the quiet period; if the validation queue has not throttled for more than the Quiet
	// interval, we turn off the circuit breaker and accept.
	if time.Since(.lastThrottle) > .params.Quiet {
		return AcceptAll
	}

	// no throttle events -- or they have decayed; accept.
	if .throttle == 0 {
		return AcceptAll
	}

	// check the throttle/validate ration; if it is below threshold we accept.
	if .validate != 0 && .throttle/.validate < .params.Threshold {
		return AcceptAll
	}

	 := .getPeerStats()

	// compute the goodput of the peer; the denominator is the weighted mix of message counters
	 := .deliver + .params.DuplicateWeight*.duplicate + .params.IgnoreWeight*.ignore + .params.RejectWeight*.reject
	if  == 0 {
		return AcceptAll
	}

	// we make a randomized decision based on the goodput of the peer.
	// the probabiity is biased by adding 1 to the delivery counter so that we don't unconditionally
	// throttle in the first negative event; it also ensures that a peer always has a chance of being
	// accepted; this is not a sinkhole/blacklist.
	 := (1 + .deliver) / (1 + )
	if rand.Float64() <  {
		return AcceptAll
	}

	log.Debugf("throttling peer %s with threshold %f", , )
	return AcceptControl
}

// -- RawTracer interface methods
var _ RawTracer = (*peerGater)(nil)

// tracer interface
func ( *peerGater) ( peer.ID,  protocol.ID) {
	.Lock()
	defer .Unlock()

	 := .getPeerStats()
	.connected++
}

func ( *peerGater) ( peer.ID) {
	.Lock()
	defer .Unlock()

	 := .getPeerStats()
	.connected--
	.expire = time.Now().Add(.params.RetainStats)

	delete(.peerStats, )
}

func ( *peerGater) ( string)             {}
func ( *peerGater) ( string)            {}
func ( *peerGater) ( peer.ID,  string) {}
func ( *peerGater) ( peer.ID,  string) {}

func ( *peerGater) ( *Message) {
	.Lock()
	defer .Unlock()

	.validate++
}

func ( *peerGater) ( *Message) {
	.Lock()
	defer .Unlock()

	 := .getPeerStats(.ReceivedFrom)

	 := .GetTopic()
	 := .params.TopicDeliveryWeights[]

	if  == 0 {
		 = 1
	}

	.deliver += 
}

func ( *peerGater) ( *Message,  string) {
	.Lock()
	defer .Unlock()

	switch  {
	case RejectValidationQueueFull:
		fallthrough
	case RejectValidationThrottled:
		.lastThrottle = time.Now()
		.throttle++

	case RejectValidationIgnored:
		 := .getPeerStats(.ReceivedFrom)
		.ignore++

	default:
		 := .getPeerStats(.ReceivedFrom)
		.reject++
	}
}

func ( *peerGater) ( *Message) {
	.Lock()
	defer .Unlock()

	 := .getPeerStats(.ReceivedFrom)
	.duplicate++
}

func ( *peerGater) ( peer.ID) {}

func ( *peerGater) ( *RPC) {}

func ( *peerGater) ( *RPC,  peer.ID) {}

func ( *peerGater) ( *RPC,  peer.ID) {}

func ( *peerGater) ( *Message) {}