package connmgr
import (
"context"
"fmt"
"sort"
"sync"
"sync/atomic"
"time"
"github.com/benbjohnson/clock"
"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
logging "github.com/ipfs/go-log/v2"
ma "github.com/multiformats/go-multiaddr"
)
// log is the package-level logger, created under the name "connmgr".
var log = logging .Logger ("connmgr" )
// BasicConnMgr tracks open connections per peer and closes the least valuable
// ones when asked to trim. Peer state is sharded over a fixed set of
// segments, each with its own lock.
type BasicConnMgr struct {
// Embedded decayer, assigned in NewConnManager from NewDecayer.
*decayer
// clock is the time source, copied from cfg.clock.
clock clock .Clock
// cfg holds the watermarks, grace period and silence period.
cfg *config
// segments is the sharded per-peer tracking table.
segments segments
// plk guards protected.
plk sync .RWMutex
// protected maps a peer to the set of tags under which it is protected.
protected map [peer .ID ]map [string ]struct {}
// trimMutex is held by ForceTrim and doTrim for the duration of a trim.
trimMutex sync .Mutex
// connCount is the number of tracked connections; Connected increments it
// and Disconnected decrements it.
connCount atomic .Int32
// trimCount is accessed only through sync/atomic. ForceTrim and doTrim bump
// it after trimming; doTrim compares it before and after taking trimMutex
// to skip a trim when another one finished in the meantime.
trimCount uint64
// lastTrimMu guards lastTrim.
lastTrimMu sync .RWMutex
// lastTrim is the time recorded by the most recent ForceTrim or doTrim.
lastTrim time .Time
// refCount tracks the background goroutine; Close waits on it.
refCount sync .WaitGroup
// ctx is cancelled by cancel to stop the background loop.
ctx context .Context
cancel func ()
// unregisterMemoryWatcher is called by Close when non-nil.
// NOTE(review): nothing in this file assigns it — confirm where it is set.
unregisterMemoryWatcher func ()
}
// Compile-time checks that *BasicConnMgr implements connmgr.ConnManager and
// connmgr.Decayer.
var (
_ connmgr .ConnManager = (*BasicConnMgr )(nil )
_ connmgr .Decayer = (*BasicConnMgr )(nil )
)
// segment is one shard of the peer table. The embedded mutex is locked by
// callers around every access to peers.
type segment struct {
sync .Mutex
// peers maps a peer ID to its tracking record.
peers map [peer .ID ]*peerInfo
}
// segments is the fixed-size collection of shards that together hold every
// tracked peer.
type segments struct {
// bucketsMu is taken by SortByValueAndStreams while it acquires the locks of
// the two segments being compared.
bucketsMu sync .Mutex
// buckets holds one segment per possible value of a peer ID's last byte.
buckets [256 ]*segment
}
// get returns the segment responsible for peer p. Peers are sharded by the
// last byte of their ID.
//
// An empty ID has no last byte; it is mapped to bucket 0 instead of
// panicking with an index-out-of-range error, since p can come straight from
// callers of the public tagging API.
func (ss *segments) get(p peer.ID) *segment {
	if len(p) == 0 {
		return ss.buckets[0]
	}
	return ss.buckets[p[len(p)-1]]
}
// countPeers returns the number of peers tracked across all buckets. Each
// segment is locked only while its own map is measured, so the total is not
// an atomic snapshot of the whole table.
func (ss *segments) countPeers() (count int) {
	for i := range ss.buckets {
		bucket := ss.buckets[i]
		bucket.Lock()
		count += len(bucket.peers)
		bucket.Unlock()
	}
	return count
}
// tagInfoFor returns the tracking record for peer p, creating and storing one
// when none exists. A newly created record is marked temporary, uses now as
// its first-seen time, and starts with empty tag and connection maps.
//
// It performs no locking itself; TagPeer and UpsertTag call it with the
// segment lock held.
func (s *segment) tagInfoFor(p peer.ID, now time.Time) *peerInfo {
	if existing, found := s.peers[p]; found {
		return existing
	}

	fresh := &peerInfo{
		id:        p,
		firstSeen: now,
		temp:      true,
		tags:      make(map[string]int),
		decaying:  make(map[*decayingTag]*connmgr.DecayingValue),
		conns:     make(map[network.Conn]time.Time),
	}
	s.peers[p] = fresh
	return fresh
}
// NewConnManager creates a BasicConnMgr with the given low and high
// watermarks and starts its background trimming goroutine.
//
// Defaults (a one-minute grace period, a ten-second silence period and a real
// clock) are set first and may be overridden through opts. The first option
// that fails aborts construction and its error is returned. When no decayer
// configuration was supplied, the defaults of DecayerCfg are used.
//
// An error from NewDecayer is returned to the caller rather than discarded;
// in that case the manager's context is cancelled and no goroutine is
// started.
//
// Call Close on the returned manager to stop the background goroutine.
func NewConnManager(low, hi int, opts ...Option) (*BasicConnMgr, error) {
	cfg := &config{
		highWater:     hi,
		lowWater:      low,
		gracePeriod:   time.Minute,
		silencePeriod: 10 * time.Second,
		clock:         clock.New(),
	}
	for _, o := range opts {
		if err := o(cfg); err != nil {
			return nil, err
		}
	}
	if cfg.decayer == nil {
		cfg.decayer = (&DecayerCfg{}).WithDefaults()
	}

	cm := &BasicConnMgr{
		cfg:       cfg,
		clock:     cfg.clock,
		protected: make(map[peer.ID]map[string]struct{}, 16),
		segments:  segments{},
	}
	// Every bucket needs its own segment before any peer can be looked up.
	for i := range cm.segments.buckets {
		cm.segments.buckets[i] = &segment{
			peers: make(map[peer.ID]*peerInfo),
		}
	}

	cm.ctx, cm.cancel = context.WithCancel(context.Background())

	decay, err := NewDecayer(cfg.decayer, cm)
	if err != nil {
		// Release the context created above; nothing else has been started yet.
		cm.cancel()
		return nil, fmt.Errorf("creating decayer: %w", err)
	}
	cm.decayer = decay

	cm.refCount.Add(1)
	go cm.background()
	return cm, nil
}
// ForceTrim closes connections in response to memory pressure, aiming to
// bring the connection count down to the low watermark. Unlike the regular
// trim it uses getConnsToCloseEmergency to choose the connections.
//
// If the connection count is already below the low watermark it only logs a
// warning and returns. Otherwise it holds trimMutex for the duration of the
// trim, records the trim time in lastTrim and increments trimCount.
func (cm *BasicConnMgr) ForceTrim() {
	connCount := int(cm.connCount.Load())
	target := connCount - cm.cfg.lowWater
	if target < 0 {
		log.Warnw("Low on memory, but we only have a few connections", "num", connCount, "low watermark", cm.cfg.lowWater)
		return
	}
	log.Warnf("Low on memory. Closing %d connections.", target)

	cm.trimMutex.Lock()
	defer cm.trimMutex.Unlock()
	// Deferred calls run last-in-first-out, so the counter is incremented
	// before the mutex is released. A doTrim call that was blocked on
	// trimMutex therefore sees the new count and skips its redundant trim,
	// matching doTrim, which also increments while holding the mutex.
	defer atomic.AddUint64(&cm.trimCount, 1)

	for _, c := range cm.getConnsToCloseEmergency(target) {
		log.Infow("low on memory. closing conn", "peer", c.RemotePeer())
		c.CloseWithError(network.ConnGarbageCollected)
	}

	cm.lastTrimMu.Lock()
	cm.lastTrim = cm.clock.Now()
	cm.lastTrimMu.Unlock()
}
// Close shuts the manager down: it cancels the manager's context, calls the
// memory-watcher unregister hook when one is set, closes the decayer and
// waits for the background goroutine to exit.
//
// It returns the error from closing the decayer, if any. The wait happens
// regardless of that error so that a failed decayer shutdown does not leave
// the background goroutine unjoined.
func (cm *BasicConnMgr) Close() error {
	cm.cancel()
	if cm.unregisterMemoryWatcher != nil {
		cm.unregisterMemoryWatcher()
	}
	err := cm.decayer.Close()
	// The context is already cancelled, so the background loop exits at its
	// next select.
	cm.refCount.Wait()
	return err
}
// Protect marks peer id as protected under tag. A peer may be protected under
// several tags at once; protecting it again under the same tag has no further
// effect.
func (cm *BasicConnMgr) Protect(id peer.ID, tag string) {
	cm.plk.Lock()
	defer cm.plk.Unlock()

	if _, known := cm.protected[id]; !known {
		cm.protected[id] = make(map[string]struct{}, 2)
	}
	cm.protected[id][tag] = struct{}{}
}
// Unprotect removes tag from the protection tags of peer id. It reports
// whether the peer is still protected under some other tag afterwards. When
// the last tag is removed the peer's entry is dropped altogether.
func (cm *BasicConnMgr) Unprotect(id peer.ID, tag string) (protected bool) {
	cm.plk.Lock()
	defer cm.plk.Unlock()

	tagSet, tracked := cm.protected[id]
	if !tracked {
		return false
	}

	delete(tagSet, tag)
	if len(tagSet) > 0 {
		return true
	}
	delete(cm.protected, id)
	return false
}
// IsProtected reports whether peer id is protected. With an empty tag it
// reports whether the peer is protected under any tag; otherwise it reports
// whether the peer is protected under that specific tag.
//
// The lookup only reads the protection table, so it takes the read lock and
// does not block other readers.
func (cm *BasicConnMgr) IsProtected(id peer.ID, tag string) (protected bool) {
	cm.plk.RLock()
	defer cm.plk.RUnlock()

	tags, ok := cm.protected[id]
	if !ok {
		return false
	}
	if tag == "" {
		return true
	}
	_, protected = tags[tag]
	return protected
}
// CheckLimit returns an error when the configured high watermark is greater
// than the connection limit reported by systemLimit, and nil otherwise.
func (cm *BasicConnMgr) CheckLimit(systemLimit connmgr.GetConnLimiter) error {
	if cm.cfg.highWater <= systemLimit.GetConnLimit() {
		return nil
	}
	return fmt.Errorf(
		"conn manager high watermark limit: %d, exceeds the system connection limit of: %d",
		cm.cfg.highWater,
		systemLimit.GetConnLimit(),
	)
}
// peerInfo is the tracking record kept for one peer. It is accessed under the
// lock of the segment that owns it.
type peerInfo struct {
id peer .ID
// tags maps a tag name to the value assigned to it.
tags map [string ]int
// decaying holds the peer's decaying tags and their current values.
decaying map [*decayingTag ]*connmgr .DecayingValue
// value is kept in step with tags by TagPeer, UntagPeer and UpsertTag and
// is used as a sort key when choosing connections to close.
value int
// temp is true for a record created by tagging before any connection
// exists; Connected clears it.
temp bool
// conns maps each tracked connection to the time Connected recorded it.
conns map [network .Conn ]time .Time
// firstSeen is when the record was created, or when a temporary record got
// its first connection.
firstSeen time .Time
}
// peerInfos is a sortable list of peer records; see SortByValueAndStreams.
type peerInfos []*peerInfo
// SortByValueAndStreams sorts p in place so that the peers whose connections
// should be closed first come first. The comparison applies these rules in
// order:
//
//  1. temporary records sort before non-temporary ones;
//  2. lower value sorts before higher value;
//  3. if exactly one of the two peers has zero streams, it sorts first;
//  4. a peer with at least one inbound connection sorts before one with none;
//  5. otherwise by total stream count: more streams first when
//     sortByMoreStreams is true, fewer streams first when it is false.
//
// The segments argument supplies the locks that protect the records while
// they are compared.
func (p peerInfos ) SortByValueAndStreams (segments *segments , sortByMoreStreams bool ) {
sort .Slice (p , func (i , j int ) bool {
left , right := p [i ], p [j ]
// bucketsMu is held only while the two segment locks are acquired.
// NOTE(review): presumably this keeps concurrent sorts from taking the same
// pair of segment locks in opposite orders — confirm.
segments .bucketsMu .Lock ()
leftSegment := segments .get (left .id )
leftSegment .Lock ()
defer leftSegment .Unlock ()
rightSegment := segments .get (right .id )
// Both peers may live in the same segment; lock it only once.
if leftSegment != rightSegment {
rightSegment .Lock ()
defer rightSegment .Unlock ()
}
segments .bucketsMu .Unlock ()
// Temporary records come first.
if left .temp != right .temp {
return left .temp
}
// Then the lower-valued peer.
if left .value != right .value {
return left .value < right .value
}
// incomingAndStreams reports whether any connection in m is inbound and the
// total number of streams across all of them.
incomingAndStreams := func (m map [network .Conn ]time .Time ) (incoming bool , numStreams int ) {
for c := range m {
stat := c .Stat ()
if stat .Direction == network .DirInbound {
incoming = true
}
numStreams += stat .NumStreams
}
return
}
leftIncoming , leftStreams := incomingAndStreams (left .conns )
rightIncoming , rightStreams := incomingAndStreams (right .conns )
// A peer with no streams at all sorts before one that has streams.
if rightStreams != leftStreams && (leftStreams == 0 || rightStreams == 0 ) {
return leftStreams < rightStreams
}
// Peers with an inbound connection sort before peers with none.
if leftIncoming != rightIncoming {
return leftIncoming
}
// Final tie-break on stream count, in the direction the caller asked for.
if sortByMoreStreams {
return rightStreams < leftStreams
} else {
return leftStreams < rightStreams
}
})
}
// TrimOpenConns runs a trim through doTrim. The context argument is ignored.
func (cm *BasicConnMgr ) TrimOpenConns (_ context .Context ) {
cm .doTrim ()
}
// background is the periodic trimming loop started by NewConnManager. On
// every tick it trims if the connection count has reached the high
// watermark, and it exits once the manager's context is cancelled.
//
// The tick interval is the silence period, or half the grace period when the
// silence period is zero.
func (cm *BasicConnMgr) background() {
	defer cm.refCount.Done()

	period := cm.cfg.silencePeriod
	if period == 0 {
		period = cm.cfg.gracePeriod / 2
	}

	tick := cm.clock.Ticker(period)
	defer tick.Stop()

	for {
		select {
		case <-cm.ctx.Done():
			return
		case <-tick.C:
		}

		// Only trim once the high watermark has been reached.
		if cm.connCount.Load() >= int32(cm.cfg.highWater) {
			cm.trim()
		}
	}
}
// doTrim runs a trim unless another trim completed while this call was
// waiting for trimMutex. It detects that by reading trimCount before and
// after taking the mutex. After trimming it records the trim time and
// increments trimCount.
func (cm *BasicConnMgr) doTrim() {
	seen := atomic.LoadUint64(&cm.trimCount)

	cm.trimMutex.Lock()
	defer cm.trimMutex.Unlock()

	if atomic.LoadUint64(&cm.trimCount) != seen {
		// Someone else trimmed while we waited; nothing to do.
		return
	}

	cm.trim()

	cm.lastTrimMu.Lock()
	cm.lastTrim = cm.clock.Now()
	cm.lastTrimMu.Unlock()

	atomic.AddUint64(&cm.trimCount, 1)
}
// trim closes every connection chosen by getConnsToClose, passing
// network.ConnGarbageCollected as the close reason.
func (cm *BasicConnMgr) trim() {
	doomed := cm.getConnsToClose()
	for i := range doomed {
		conn := doomed[i]
		log.Debugw("closing conn", "peer", conn.RemotePeer())
		conn.CloseWithError(network.ConnGarbageCollected)
	}
}
// getConnsToCloseEmergency selects connections to close under memory
// pressure. target is the number of connections the caller wants closed.
//
// Unprotected peers are considered first, ordered by SortByValueAndStreams
// with peers carrying more streams preferred on ties. All connections of a
// selected peer are taken, so the result may exceed target. Protected peers
// are used only when the unprotected ones could not satisfy target. The
// grace period is not taken into account.
//
// Two defects of the earlier version are fixed here. The "found enough"
// check compared len(selected) with a target that had already been
// decremented, so it returned once half the requested connections were
// found. The fallback pass also re-added the unprotected peers that had just
// been selected, which duplicated connections in the result and miscounted
// the remaining target.
func (cm *BasicConnMgr) getConnsToCloseEmergency(target int) []network.Conn {
	candidates := make(peerInfos, 0, cm.segments.countPeers())

	// collect refills candidates with every tracked peer whose protection
	// status equals wantProtected.
	collect := func(wantProtected bool) {
		candidates = candidates[:0]
		cm.plk.RLock()
		defer cm.plk.RUnlock()
		for _, s := range cm.segments.buckets {
			s.Lock()
			for id, inf := range s.peers {
				if _, isProtected := cm.protected[id]; isProtected == wantProtected {
					candidates = append(candidates, inf)
				}
			}
			s.Unlock()
		}
	}

	// Slightly over-allocate: a peer may hold more than one connection.
	selected := make([]network.Conn, 0, target+10)

	// take appends the connections of the sorted candidates, peer by peer,
	// until target connections have been gathered.
	take := func() {
		candidates.SortByValueAndStreams(&cm.segments, true)
		for _, inf := range candidates {
			if target <= 0 {
				return
			}
			// Lock the owning segment against concurrent connect/disconnect.
			s := cm.segments.get(inf.id)
			s.Lock()
			for c := range inf.conns {
				selected = append(selected, c)
			}
			target -= len(inf.conns)
			s.Unlock()
		}
	}

	collect(false)
	take()
	if target <= 0 {
		// Unprotected peers alone were enough.
		return selected
	}

	// Every unprotected candidate has been taken and it still is not enough:
	// fall back to protected peers for the remainder.
	collect(true)
	take()
	return selected
}
// getConnsToClose selects the connections a regular trim should close in
// order to bring the count of eligible connections down to the low
// watermark. It returns nil when trimming is disabled (either watermark is
// zero), when the connection count is at or below the low watermark, or when
// fewer than lowWater connections are outside the grace period.
//
// Protected peers and peers first seen within the grace period are never
// candidates. As a side effect, temporary records without connections that
// are visited during selection are removed from the peer table.
func (cm *BasicConnMgr ) getConnsToClose () []network .Conn {
// A zero watermark disables trimming.
if cm .cfg .lowWater == 0 || cm .cfg .highWater == 0 {
return nil
}
if int (cm .connCount .Load ()) <= cm .cfg .lowWater {
log .Info ("open connection count below limit" )
return nil
}
candidates := make (peerInfos , 0 , cm .segments .countPeers ())
// ncandidates counts connections (not peers) belonging to the candidates.
var ncandidates int
// Peers first seen after this instant are still in their grace period.
gracePeriodStart := cm .clock .Now ().Add (-cm .cfg .gracePeriod )
cm .plk .RLock ()
for _ , s := range cm .segments .buckets {
s .Lock ()
for id , inf := range s .peers {
// Skip protected peers.
if _ , ok := cm .protected [id ]; ok {
continue
}
// Skip peers still in the grace period.
if inf .firstSeen .After (gracePeriodStart ) {
continue
}
candidates = append (candidates , inf )
ncandidates += len (inf .conns )
}
s .Unlock ()
}
cm .plk .RUnlock ()
// Too few connections are outside the grace period for a trim to be
// worthwhile; leave everything open.
if ncandidates < cm .cfg .lowWater {
log .Info ("open connection count above limit but too many are in the grace period" )
return nil
}
// Order candidates so the first entries are the ones to close first.
candidates .SortByValueAndStreams (&cm .segments , false )
target := ncandidates - cm .cfg .lowWater
// Slightly over-allocate: a peer may hold more than one connection.
selected := make ([]network .Conn , 0 , target +10 )
for _ , inf := range candidates {
if target <= 0 {
break
}
// Lock the owning segment against concurrent connect/disconnect events.
s := cm .segments .get (inf .id )
s .Lock ()
if len (inf .conns ) == 0 && inf .temp {
// A temporary record past its grace period that never got a connection:
// drop it from the table.
delete (s .peers , inf .id )
} else {
for c := range inf .conns {
selected = append (selected , c )
}
target -= len (inf .conns )
}
s .Unlock ()
}
return selected
}
// GetTagInfo returns a snapshot of the tracking data for peer p, or nil when
// the peer is not tracked. The returned maps are copies. Decaying tags are
// reported alongside regular tags by name, and a decaying tag overwrites a
// regular tag of the same name. Connections are keyed by the string form of
// their remote multiaddr.
func (cm *BasicConnMgr) GetTagInfo(p peer.ID) *connmgr.TagInfo {
	seg := cm.segments.get(p)
	seg.Lock()
	defer seg.Unlock()

	info, tracked := seg.peers[p]
	if !tracked {
		return nil
	}

	tagValues := make(map[string]int)
	for name, val := range info.tags {
		tagValues[name] = val
	}
	for dt, dv := range info.decaying {
		tagValues[dt.name] = dv.Value
	}

	connTimes := make(map[string]time.Time)
	for conn, since := range info.conns {
		connTimes[conn.RemoteMultiaddr().String()] = since
	}

	return &connmgr.TagInfo{
		FirstSeen: info.firstSeen,
		Value:     info.value,
		Tags:      tagValues,
		Conns:     connTimes,
	}
}
// TagPeer sets tag on peer p to val, replacing any previous value of that
// tag, and adjusts the peer's total value by the difference. A peer that is
// not yet tracked gets a temporary record.
func (cm *BasicConnMgr) TagPeer(p peer.ID, tag string, val int) {
	seg := cm.segments.get(p)
	seg.Lock()
	defer seg.Unlock()

	info := seg.tagInfoFor(p, cm.clock.Now())
	previous := info.tags[tag]
	info.tags[tag] = val
	info.value += val - previous
}
// UntagPeer removes tag from peer p and subtracts the tag's value from the
// peer's total. For an untracked peer it only logs a debug message.
func (cm *BasicConnMgr) UntagPeer(p peer.ID, tag string) {
	seg := cm.segments.get(p)
	seg.Lock()
	defer seg.Unlock()

	info, tracked := seg.peers[p]
	if !tracked {
		log.Debug("tried to remove tag from untracked peer: ", p, tag)
		return
	}

	// A missing tag reads as zero, so the total is left unchanged.
	contribution := info.tags[tag]
	delete(info.tags, tag)
	info.value -= contribution
}
// UpsertTag updates tag on peer p by passing the tag's current value (zero
// when unset) to upsert and storing the result. The peer's total value is
// adjusted by the difference. upsert runs while the peer's segment lock is
// held. A peer that is not yet tracked gets a temporary record.
func (cm *BasicConnMgr) UpsertTag(p peer.ID, tag string, upsert func(int) int) {
	seg := cm.segments.get(p)
	seg.Lock()
	defer seg.Unlock()

	info := seg.tagInfoFor(p, cm.clock.Now())
	before := info.tags[tag]
	after := upsert(before)
	info.tags[tag] = after
	info.value += after - before
}
// CMInfo is the snapshot of configuration and state returned by GetInfo.
type CMInfo struct {
// LowWater is the configured low watermark.
LowWater int
// HighWater is the configured high watermark.
HighWater int
// LastTrim is the time recorded by the most recent ForceTrim or doTrim.
LastTrim time .Time
// GracePeriod is the configured grace period.
GracePeriod time .Duration
// ConnCount is the number of tracked connections when the snapshot was taken.
ConnCount int
}
// GetInfo returns the manager's configured watermarks and grace period
// together with the last recorded trim time and the current connection
// count.
func (cm *BasicConnMgr) GetInfo() CMInfo {
	var info CMInfo

	cm.lastTrimMu.RLock()
	info.LastTrim = cm.lastTrim
	cm.lastTrimMu.RUnlock()

	info.HighWater = cm.cfg.highWater
	info.LowWater = cm.cfg.lowWater
	info.GracePeriod = cm.cfg.gracePeriod
	info.ConnCount = int(cm.connCount.Load())
	return info
}
// Notifee returns the manager viewed as a network.Notifiee. The result is the
// same underlying value converted to *cmNotifee, not a copy.
func (cm *BasicConnMgr ) Notifee () network .Notifiee {
return (*cmNotifee )(cm )
}
// cmNotifee is BasicConnMgr under a separate type name that carries the
// network notification methods (Connected, Disconnected, Listen, ListenClose).
type cmNotifee BasicConnMgr
// cm converts the notifee back to the *BasicConnMgr it was created from.
func (nn *cmNotifee ) cm () *BasicConnMgr {
return (*BasicConnMgr )(nn )
}
// Connected records a newly opened connection. A peer that is not yet
// tracked gets a fresh record. A temporary record (one created by tagging
// before any connection existed) becomes permanent and has its first-seen
// time reset to now. The connection is stored with the current time and the
// connection count is incremented.
//
// A notification for a connection that is already tracked is logged as an
// error and otherwise ignored.
//
// The remote peer ID is read from the connection once; the earlier version
// fetched it twice into two separate locals.
func (nn *cmNotifee) Connected(_ network.Network, c network.Conn) {
	cm := nn.cm()

	p := c.RemotePeer()
	s := cm.segments.get(p)
	s.Lock()
	defer s.Unlock()

	pinfo, ok := s.peers[p]
	if !ok {
		pinfo = &peerInfo{
			id:        p,
			firstSeen: cm.clock.Now(),
			tags:      make(map[string]int),
			decaying:  make(map[*decayingTag]*connmgr.DecayingValue),
			conns:     make(map[network.Conn]time.Time),
		}
		s.peers[p] = pinfo
	} else if pinfo.temp {
		// The record was created by early tagging; the grace period starts
		// with the first real connection.
		pinfo.temp = false
		pinfo.firstSeen = cm.clock.Now()
	}

	if _, ok = pinfo.conns[c]; ok {
		log.Error("received connected notification for conn we are already tracking: ", p)
		return
	}

	pinfo.conns[c] = cm.clock.Now()
	cm.connCount.Add(1)
}
// Disconnected forgets a closed connection. When it was the peer's last
// connection the peer's record is removed as well. The connection count is
// decremented.
//
// A notification for an untracked peer or an untracked connection is logged
// as an error and otherwise ignored.
func (nn *cmNotifee) Disconnected(_ network.Network, c network.Conn) {
	mgr := nn.cm()

	p := c.RemotePeer()
	seg := mgr.segments.get(p)
	seg.Lock()
	defer seg.Unlock()

	info, tracked := seg.peers[p]
	if !tracked {
		log.Error("received disconnected notification for peer we are not tracking: ", p)
		return
	}
	if _, known := info.conns[c]; !known {
		log.Error("received disconnected notification for conn we are not tracking: ", p)
		return
	}

	delete(info.conns, c)
	if len(info.conns) == 0 {
		delete(seg.peers, p)
	}
	mgr.connCount.Add(-1)
}
// Listen does nothing; it is a no-op notification handler.
func (nn *cmNotifee ) Listen (_ network .Network , _ ma .Multiaddr ) {}
// ListenClose does nothing; it is a no-op notification handler.
func (nn *cmNotifee ) ListenClose (_ network .Network , _ ma .Multiaddr ) {}
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PR and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.