package connmgr

import (
	"context"
	"fmt"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	"github.com/benbjohnson/clock"
	"github.com/libp2p/go-libp2p/core/connmgr"
	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/core/peer"

	logging "github.com/ipfs/go-log/v2"
	ma "github.com/multiformats/go-multiaddr"
)

var log = logging.Logger("connmgr")

// BasicConnMgr is a ConnManager that trims connections whenever the count exceeds the
// high watermark. New connections are given a grace period before they're subject
// to trimming. Trims run automatically on demand, but only if more than the silence
// period (10 seconds by default) has elapsed since the previous trim. Trims can also
// be requested explicitly through the public interface of this struct (see TrimOpenConns).
//
// See configuration parameters in NewConnManager.
type BasicConnMgr struct {
	*decayer

	clock clock.Clock

	cfg      *config
	segments segments

	plk       sync.RWMutex
	protected map[peer.ID]map[string]struct{}

	// trimMutex ensures that only a single trim is in progress at a time.
	trimMutex sync.Mutex
	connCount atomic.Int32
	// to be accessed atomically. This is mimicking the implementation of a sync.Once.
	// Take care of correct alignment when modifying this struct.
	trimCount uint64

	lastTrimMu sync.RWMutex
	lastTrim   time.Time

	refCount                sync.WaitGroup
	ctx                     context.Context
	cancel                  func()
	unregisterMemoryWatcher func()
}

var (
	_ connmgr.ConnManager = (*BasicConnMgr)(nil)
	_ connmgr.Decayer     = (*BasicConnMgr)(nil)
)

type segment struct {
	sync.Mutex
	peers map[peer.ID]*peerInfo
}

type segments struct {
	// bucketsMu is used to prevent deadlocks when concurrent processes try to
	// grab multiple segment locks at once. If you need multiple segment locks
	// at once, you should grab this lock first. You may release this lock once
	// you have the segment locks.
	bucketsMu sync.Mutex
	buckets   [256]*segment
}

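// get returns the segment (bucket) responsible for the given peer, selected by the
// last byte of the peer ID.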
func (ss *segments) get(p peer.ID) *segment {
	return ss.buckets[p[len(p)-1]]
}

func (ss *segments) countPeers() (count int) {
	for _, seg := range ss.buckets {
		seg.Lock()
		count += len(seg.peers)
		seg.Unlock()
	}
	return count
}

func (s *segment) tagInfoFor(p peer.ID, now time.Time) *peerInfo {
	pi, ok := s.peers[p]
	if ok {
		return pi
	}
	// create a temporary peer to buffer early tags before the Connected notification arrives.
	pi = &peerInfo{
		id:        p,
		firstSeen: now, // this timestamp will be updated when the first Connected notification arrives.
		temp:      true,
		tags:      make(map[string]int),
		decaying:  make(map[*decayingTag]*connmgr.DecayingValue),
		conns:     make(map[network.Conn]time.Time),
	}
	s.peers[p] = pi
	return pi
}

// NewConnManager creates a new BasicConnMgr with the provided params:
// low and hi are watermarks governing the number of connections that will be maintained.
// When the peer count exceeds the high watermark, peers are pruned (and their
// connections terminated) until the low watermark is reached.
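//
// A typical construction looks like the following sketch (the watermark values are
// illustrative, and WithGracePeriod is assumed to be among this package's Options):
//
//	cmgr, err := NewConnManager(100, 400, WithGracePeriod(time.Minute))
//	if err != nil {
//		return err
//	}
//	defer cmgr.Close()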
func NewConnManager(low, hi int, opts ...Option) (*BasicConnMgr, error) {
	cfg := &config{
		highWater:     hi,
		lowWater:      low,
		gracePeriod:   time.Minute,
		silencePeriod: 10 * time.Second,
		clock:         clock.New(),
	}
	for _, o := range opts {
		if err := o(cfg); err != nil {
			return nil, err
		}
	}

	if cfg.decayer == nil {
		// Set the default decayer config.
		cfg.decayer = (&DecayerCfg{}).WithDefaults()
	}

	cm := &BasicConnMgr{
		cfg:       cfg,
		clock:     cfg.clock,
		protected: make(map[peer.ID]map[string]struct{}, 16),
		segments:  segments{},
	}

	for i := range cm.segments.buckets {
		cm.segments.buckets[i] = &segment{
			peers: make(map[peer.ID]*peerInfo),
		}
	}

	cm.ctx, cm.cancel = context.WithCancel(context.Background())

	decay, _ := NewDecayer(cfg.decayer, cm)
	cm.decayer = decay

	cm.refCount.Add(1)
	go cm.background()
	return cm, nil
}

// ForceTrim trims connections down to the low watermark, ignoring the silence period, grace
// period, and protected status. It prioritizes closing unprotected connections. If, after
// closing all unprotected connections, more than lowWater connections remain, it closes
// protected connections as well.
func (cm *BasicConnMgr) ForceTrim() {
	connCount := int(cm.connCount.Load())
	target := connCount - cm.cfg.lowWater
	if target < 0 {
		log.Warnw("Low on memory, but we only have a few connections", "num", connCount, "low watermark", cm.cfg.lowWater)
		return
	}
	log.Warnf("Low on memory. Closing %d connections.", target)

	cm.trimMutex.Lock()
	defer atomic.AddUint64(&cm.trimCount, 1)
	defer cm.trimMutex.Unlock()

	// Trim connections without paying attention to the silence period.
	for _, c := range cm.getConnsToCloseEmergency(target) {
		log.Infow("low on memory. closing conn", "peer", c.RemotePeer())
		c.CloseWithError(network.ConnGarbageCollected)
	}

	// finally, update the last trim time.
	cm.lastTrimMu.Lock()
	cm.lastTrim = cm.clock.Now()
	cm.lastTrimMu.Unlock()
}

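// Close shuts down the connection manager: it cancels the background trim loop,
// unregisters the memory watcher (if one was registered), closes the decayer, and
// waits for all background goroutines to finish.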
func (cm *BasicConnMgr) Close() error {
	cm.cancel()
	if cm.unregisterMemoryWatcher != nil {
		cm.unregisterMemoryWatcher()
	}
	if err := cm.decayer.Close(); err != nil {
		return err
	}
	cm.refCount.Wait()
	return nil
}

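// Protect marks the given peer as protected under the supplied tag. Protected peers
// are skipped when selecting connections to trim; ForceTrim may still close them as
// a last resort.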
func (cm *BasicConnMgr) Protect(id peer.ID, tag string) {
	cm.plk.Lock()
	defer cm.plk.Unlock()

	tags, ok := cm.protected[id]
	if !ok {
		tags = make(map[string]struct{}, 2)
		cm.protected[id] = tags
	}
	tags[tag] = struct{}{}
}

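// Unprotect removes the protection added under the given tag. It returns true if the
// peer remains protected under other tags, and false once no protection tags remain.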
func (cm *BasicConnMgr) Unprotect(id peer.ID, tag string) (protected bool) {
	cm.plk.Lock()
	defer cm.plk.Unlock()

	tags, ok := cm.protected[id]
	if !ok {
		return false
	}
	if delete(tags, tag); len(tags) == 0 {
		delete(cm.protected, id)
		return false
	}
	return true
}

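// IsProtected reports whether the peer is protected under the given tag. If the tag
// is empty, it reports whether the peer is protected under any tag.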
func (cm *BasicConnMgr) IsProtected(id peer.ID, tag string) (protected bool) {
	cm.plk.Lock()
	defer cm.plk.Unlock()

	tags, ok := cm.protected[id]
	if !ok {
		return false
	}

	if tag == "" {
		return true
	}

	_, protected = tags[tag]
	return protected
}

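// CheckLimit returns an error if the configured high watermark exceeds the connection
// limit reported by the given limiter.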
func (cm *BasicConnMgr) CheckLimit(limiter connmgr.GetConnLimiter) error {
	if cm.cfg.highWater > limiter.GetConnLimit() {
		return fmt.Errorf(
			"conn manager high watermark limit: %d, exceeds the system connection limit of: %d",
			cm.cfg.highWater,
			limiter.GetConnLimit(),
		)
	}
	return nil
}

// peerInfo stores metadata for a given peer.
type peerInfo struct {
	id       peer.ID
	tags     map[string]int                          // value for each tag
	decaying map[*decayingTag]*connmgr.DecayingValue // decaying tags

	value int  // cached sum of all tag values
	temp  bool // this is a temporary entry holding early tags, and awaiting connections

	conns map[network.Conn]time.Time // start time of each connection

	firstSeen time.Time // timestamp when we began tracking this peer.
}

type peerInfos []*peerInfo

// SortByValueAndStreams sorts peerInfos by their value and stream count. It
// will sort peers with no streams before those with streams (all else being
// equal). If `sortByMoreStreams` is true it will sort peers with more streams
// before those with fewer streams. This is useful to prioritize freeing memory.
func (p peerInfos) SortByValueAndStreams(segs *segments, sortByMoreStreams bool) {
	sort.Slice(p, func(i, j int) bool {
		left, right := p[i], p[j]

		// Grab this lock so that we can grab both segment locks below without deadlocking.
		segs.bucketsMu.Lock()

		// lock this to protect from concurrent modifications from connect/disconnect events
		leftSegment := segs.get(left.id)
		leftSegment.Lock()
		defer leftSegment.Unlock()

		rightSegment := segs.get(right.id)
		if leftSegment != rightSegment {
			// These two peers are not in the same segment, let's get the lock
			rightSegment.Lock()
			defer rightSegment.Unlock()
		}
		segs.bucketsMu.Unlock()

		// temporary peers are preferred for pruning.
		if left.temp != right.temp {
			return left.temp
		}
		// otherwise, compare by value.
		if left.value != right.value {
			return left.value < right.value
		}
		incomingAndStreams := func(m map[network.Conn]time.Time) (incoming bool, numStreams int) {
			for c := range m {
				stat := c.Stat()
				if stat.Direction == network.DirInbound {
					incoming = true
				}
				numStreams += stat.NumStreams
			}
			return
		}
		leftIncoming, leftStreams := incomingAndStreams(left.conns)
		rightIncoming, rightStreams := incomingAndStreams(right.conns)
		// prefer closing inactive connections (no streams open)
		if leftStreams != rightStreams && (leftStreams == 0 || rightStreams == 0) {
			return leftStreams < rightStreams
		}
		// incoming connections are preferred for pruning
		if leftIncoming != rightIncoming {
			return leftIncoming
		}

		if sortByMoreStreams {
			// prune connections with a higher number of streams first
			return rightStreams < leftStreams
		}
		return leftStreams < rightStreams
	})
}

// TrimOpenConns closes the connections of as many peers as needed to make the peer count
// equal the low watermark. Peers are sorted in ascending order based on their total value,
// pruning those peers with the lowest scores first, as long as they are not within their
// grace period.
//
// This function blocks until a trim is completed. If a trim is underway, a new
// one won't be started, and instead it'll wait until that one is completed before
// returning.
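//
// As an example, a caller holding cmgr (a *BasicConnMgr) can force a trim outside the
// periodic schedule with:
//
//	cmgr.TrimOpenConns(context.Background())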
func (cm *BasicConnMgr) TrimOpenConns(_ context.Context) {
	// TODO: error return value so we can cleanly signal we are aborting because:
	// (a) there's another trim in progress, or (b) the silence period is in effect.

	cm.doTrim()
}

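// background runs the periodic trim loop: it wakes up every silence period (or half
// the grace period when no silence period is configured) and runs a trim whenever the
// connection count has reached the high watermark.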
func (cm *BasicConnMgr) background() {
	defer cm.refCount.Done()

	interval := cm.cfg.gracePeriod / 2
	if cm.cfg.silencePeriod != 0 {
		interval = cm.cfg.silencePeriod
	}

	ticker := cm.clock.Ticker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if cm.connCount.Load() < int32(cm.cfg.highWater) {
				// Below high water, skip.
				continue
			}
		case <-cm.ctx.Done():
			return
		}
		cm.trim()
	}
}

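// doTrim runs a single trim. Concurrent callers that arrive while a trim is in
// progress share its result instead of starting another one, and the time of the
// last trim is recorded on completion.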
func (cm *BasicConnMgr) doTrim() {
	// This logic is mimicking the implementation of sync.Once in the standard library.
	count := atomic.LoadUint64(&cm.trimCount)
	cm.trimMutex.Lock()
	defer cm.trimMutex.Unlock()
	if count == atomic.LoadUint64(&cm.trimCount) {
		cm.trim()
		cm.lastTrimMu.Lock()
		cm.lastTrim = cm.clock.Now()
		cm.lastTrimMu.Unlock()
		atomic.AddUint64(&cm.trimCount, 1)
	}
}

// trim closes the connections selected by getConnsToClose.
func (cm *BasicConnMgr) trim() {
	// do the actual trim.
	for _, c := range cm.getConnsToClose() {
		log.Debugw("closing conn", "peer", c.RemotePeer())
		c.CloseWithError(network.ConnGarbageCollected)
	}
}

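// getConnsToCloseEmergency selects connections to close so that roughly target
// connections are shed. It prefers connections to unprotected peers, and only
// resorts to protected peers when the unprotected ones are not enough.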
func (cm *BasicConnMgr) getConnsToCloseEmergency(target int) []network.Conn {
	candidates := make(peerInfos, 0, cm.segments.countPeers())

	cm.plk.RLock()
	for _, s := range cm.segments.buckets {
		s.Lock()
		for id, inf := range s.peers {
			if _, ok := cm.protected[id]; ok {
				// skip over protected peer.
				continue
			}
			candidates = append(candidates, inf)
		}
		s.Unlock()
	}
	cm.plk.RUnlock()

	// Sort peers according to their value.
	candidates.SortByValueAndStreams(&cm.segments, true)

	conns := make([]network.Conn, 0, target+10)
	for _, inf := range candidates {
		if target <= 0 {
			break
		}
		s := cm.segments.get(inf.id)
		s.Lock()
		for c := range inf.conns {
			conns = append(conns, c)
		}
		target -= len(inf.conns)
		s.Unlock()
	}
	if len(conns) >= target {
		// We found enough connections that were not protected.
		return conns
	}

	// We didn't find enough unprotected connections.
	// We have no choice but to kill some protected connections.
	candidates = candidates[:0]
	cm.plk.RLock()
	for _, s := range cm.segments.buckets {
		s.Lock()
		for _, inf := range s.peers {
			candidates = append(candidates, inf)
		}
		s.Unlock()
	}
	cm.plk.RUnlock()

	candidates.SortByValueAndStreams(&cm.segments, true)
	for _, inf := range candidates {
		if target <= 0 {
			break
		}
		// lock this to protect from concurrent modifications from connect/disconnect events
		s := cm.segments.get(inf.id)
		s.Lock()
		for c := range inf.conns {
			conns = append(conns, c)
		}
		target -= len(inf.conns)
		s.Unlock()
	}
	return conns
}

// getConnsToClose runs the heuristics described in TrimOpenConns and returns the
// connections to close.
func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
	if cm.cfg.lowWater == 0 || cm.cfg.highWater == 0 {
		// disabled
		return nil
	}

	if int(cm.connCount.Load()) <= cm.cfg.lowWater {
		log.Info("open connection count below limit")
		return nil
	}

	candidates := make(peerInfos, 0, cm.segments.countPeers())
	var ncandidates int
	gracePeriodStart := cm.clock.Now().Add(-cm.cfg.gracePeriod)

	cm.plk.RLock()
	for _, s := range cm.segments.buckets {
		s.Lock()
		for id, inf := range s.peers {
			if _, ok := cm.protected[id]; ok {
				// skip over protected peer.
				continue
			}
			if inf.firstSeen.After(gracePeriodStart) {
				// skip peers in the grace period.
				continue
			}
			// note that we're appending a pointer here,
			// so inf.conns still refers to the live map for this peer
			candidates = append(candidates, inf)
			ncandidates += len(inf.conns)
		}
		s.Unlock()
	}
	cm.plk.RUnlock()

	if ncandidates < cm.cfg.lowWater {
		log.Info("open connection count above limit but too many are in the grace period")
		// We have too many connections but fewer than lowWater
		// connections out of the grace period.
		//
		// If we trimmed now, we'd kill potentially useful connections.
		return nil
	}

	// Sort peers according to their value.
	candidates.SortByValueAndStreams(&cm.segments, false)

	target := ncandidates - cm.cfg.lowWater

	// slightly overallocate because we may have more than one conn per peer
	selected := make([]network.Conn, 0, target+10)

	for _, inf := range candidates {
		if target <= 0 {
			break
		}

		// lock this to protect from concurrent modifications from connect/disconnect events
		s := cm.segments.get(inf.id)
		s.Lock()
		if len(inf.conns) == 0 && inf.temp {
			// handle temporary entries for early tags -- this entry has gone past the grace period
			// and still holds no connections, so prune it.
			delete(s.peers, inf.id)
		} else {
			for c := range inf.conns {
				selected = append(selected, c)
			}
			target -= len(inf.conns)
		}
		s.Unlock()
	}

	return selected
}

// GetTagInfo is called to fetch the tag information associated with a given
// peer; nil is returned if p refers to an unknown peer.
func (cm *BasicConnMgr) GetTagInfo(p peer.ID) *connmgr.TagInfo {
	s := cm.segments.get(p)
	s.Lock()
	defer s.Unlock()

	pi, ok := s.peers[p]
	if !ok {
		return nil
	}

	out := &connmgr.TagInfo{
		FirstSeen: pi.firstSeen,
		Value:     pi.value,
		Tags:      make(map[string]int),
		Conns:     make(map[string]time.Time),
	}

	for t, v := range pi.tags {
		out.Tags[t] = v
	}
	for t, v := range pi.decaying {
		out.Tags[t.name] = v.Value
	}
	for c, t := range pi.conns {
		out.Conns[c.RemoteMultiaddr().String()] = t
	}

	return out
}

// TagPeer is called to associate a string and integer with a given peer.
func (cm *BasicConnMgr) TagPeer(p peer.ID, tag string, val int) {
	s := cm.segments.get(p)
	s.Lock()
	defer s.Unlock()

	pi := s.tagInfoFor(p, cm.clock.Now())

	// Update the total value of the peer.
	pi.value += val - pi.tags[tag]
	pi.tags[tag] = val
}

// UntagPeer is called to disassociate a string and integer from a given peer.
func (cm *BasicConnMgr) UntagPeer(p peer.ID, tag string) {
	s := cm.segments.get(p)
	s.Lock()
	defer s.Unlock()

	pi, ok := s.peers[p]
	if !ok {
		log.Debug("tried to remove tag from untracked peer: ", p, tag)
		return
	}

	// Update the total value of the peer.
	pi.value -= pi.tags[tag]
	delete(pi.tags, tag)
}

// UpsertTag is called to insert/update a peer tag
func (cm *BasicConnMgr) UpsertTag(p peer.ID, tag string, upsert func(int) int) {
	s := cm.segments.get(p)
	s.Lock()
	defer s.Unlock()

	pi := s.tagInfoFor(p, cm.clock.Now())

	oldval := pi.tags[tag]
	newval := upsert(oldval)
	pi.value += newval - oldval
	pi.tags[tag] = newval
}

// CMInfo holds the configuration for BasicConnMgr, as well as status data.
type CMInfo struct {
	// The low watermark, as described in NewConnManager.
	LowWater int

	// The high watermark, as described in NewConnManager.
	HighWater int

	// The timestamp when the last trim was triggered.
	LastTrim time.Time

	// The configured grace period, as described in NewConnManager.
	GracePeriod time.Duration

	// The current connection count.
	ConnCount int
}

// GetInfo returns the configuration and status data for this connection manager.
func (cm *BasicConnMgr) GetInfo() CMInfo {
	cm.lastTrimMu.RLock()
	lastTrim := cm.lastTrim
	cm.lastTrimMu.RUnlock()

	return CMInfo{
		HighWater:   cm.cfg.highWater,
		LowWater:    cm.cfg.lowWater,
		LastTrim:    lastTrim,
		GracePeriod: cm.cfg.gracePeriod,
		ConnCount:   int(cm.connCount.Load()),
	}
}

// Notifee returns a sink through which Notifiers can inform the BasicConnMgr when
// events occur. Currently, the notifee only reacts upon connection events
// {Connected, Disconnected}.
func (cm *BasicConnMgr) Notifee() network.Notifiee {
	return (*cmNotifee)(cm)
}

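// cmNotifee is a distinct named type over BasicConnMgr, so that the network.Notifiee
// callbacks below don't appear on BasicConnMgr's own method set.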
type cmNotifee BasicConnMgr

func (nn *cmNotifee) cm() *BasicConnMgr {
	return (*BasicConnMgr)(nn)
}

// Connected is called by notifiers to inform that a new connection has been established.
// The notifee updates the BasicConnMgr to start tracking the connection. If the new connection
// count exceeds the high watermark, a trim may be triggered.
func (nn *cmNotifee) Connected(_ network.Network, c network.Conn) {
	cm := nn.cm()

	p := c.RemotePeer()
	s := cm.segments.get(p)
	s.Lock()
	defer s.Unlock()

	pinfo, ok := s.peers[p]
	if !ok {
		pinfo = &peerInfo{
			id:        p,
			firstSeen: cm.clock.Now(),
			tags:      make(map[string]int),
			decaying:  make(map[*decayingTag]*connmgr.DecayingValue),
			conns:     make(map[network.Conn]time.Time),
		}
		s.peers[p] = pinfo
	} else if pinfo.temp {
		// we had created a temporary entry for this peer to buffer early tags before the
		// Connected notification arrived: flip the temporary flag, and update the firstSeen
		// timestamp to the real one.
		pinfo.temp = false
		pinfo.firstSeen = cm.clock.Now()
	}

	if _, ok = pinfo.conns[c]; ok {
		log.Error("received connected notification for conn we are already tracking: ", p)
		return
	}

	pinfo.conns[c] = cm.clock.Now()
	cm.connCount.Add(1)
}

// Disconnected is called by notifiers to inform that an existing connection has been closed or terminated.
// The notifee updates the BasicConnMgr accordingly to stop tracking the connection, and performs housekeeping.
func (nn *cmNotifee) Disconnected(_ network.Network, c network.Conn) {
	cm := nn.cm()

	p := c.RemotePeer()
	s := cm.segments.get(p)
	s.Lock()
	defer s.Unlock()

	cinf, ok := s.peers[p]
	if !ok {
		log.Error("received disconnected notification for peer we are not tracking: ", p)
		return
	}

	if _, ok = cinf.conns[c]; !ok {
		log.Error("received disconnected notification for conn we are not tracking: ", p)
		return
	}

	delete(cinf.conns, c)
	if len(cinf.conns) == 0 {
		delete(s.peers, p)
	}
	cm.connCount.Add(-1)
}

// Listen is a no-op in this implementation.
func (nn *cmNotifee) Listen(_ network.Network, _ ma.Multiaddr) {}

// ListenClose is a no-op in this implementation.
func (nn *cmNotifee) ListenClose(_ network.Network, _ ma.Multiaddr) {}