package swarm
import (
"context"
"errors"
"fmt"
"io"
"strings"
"sync"
"sync/atomic"
"time"
"slices"
"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/transport"
logging "github.com/ipfs/go-log/v2"
ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
)
// Default timeouts used by the swarm.
const (
// defaultDialTimeout is the value NewSwarm assigns to Swarm.dialTimeout unless
// a WithDialTimeout option overrides it.
defaultDialTimeout = 15 * time .Second
// defaultDialTimeoutLocal is the value NewSwarm assigns to
// Swarm.dialTimeoutLocal unless a WithDialTimeoutLocal option overrides it.
// NOTE(review): presumably applies to dials of local-network addresses — confirm.
defaultDialTimeoutLocal = 5 * time .Second
// defaultNewStreamTimeout is not referenced in the visible part of this file.
// NOTE(review): presumably the default deadline for opening a stream — confirm.
defaultNewStreamTimeout = 15 * time .Second
)
// log is the package-level logger, registered under the name "swarm2".
var log = logging .Logger ("swarm2" )
// ErrSwarmClosed is returned when an operation is attempted on a swarm that has
// already been closed (for example, addConn after the connection map was
// cleared by close).
var ErrSwarmClosed = errors .New ("swarm closed" )
// ErrAddrFiltered is the sentinel error for a filtered address. It is not
// returned by any code visible in this part of the file.
var ErrAddrFiltered = errors .New ("address filtered" )
// ErrDialTimeout is the sentinel error for a dial that timed out. It is not
// returned by any code visible in this part of the file.
var ErrDialTimeout = errors .New ("dial timed out" )
// Option is a functional option applied to a Swarm by NewSwarm. Returning a
// non-nil error aborts construction.
type Option func (*Swarm ) error
// WithConnectionGater installs the connection gater that the swarm consults
// (via InterceptUpgraded) before registering an upgraded connection.
func WithConnectionGater(gater connmgr.ConnectionGater) Option {
	return func(sw *Swarm) error {
		sw.gater = gater
		return nil
	}
}
// WithMultiaddrResolver replaces the swarm's multiaddr DNS resolver with the
// supplied one.
func WithMultiaddrResolver(resolver network.MultiaddrDNSResolver) Option {
	return func(sw *Swarm) error {
		sw.multiaddrResolver = resolver
		return nil
	}
}
// WithMetrics stores the given metrics reporter on the swarm (field bwc).
func WithMetrics(reporter metrics.Reporter) Option {
	return func(sw *Swarm) error {
		sw.bwc = reporter
		return nil
	}
}
// WithMetricsTracer sets the MetricsTracer used by the swarm. NewSwarm also
// hands the tracer to the black hole detector it builds.
func WithMetricsTracer(t MetricsTracer) Option {
	return func(sw *Swarm) error {
		sw.metricsTracer = t
		return nil
	}
}
// WithDialTimeout overrides the swarm's dial timeout (default
// defaultDialTimeout).
func WithDialTimeout(t time.Duration) Option {
	return func(sw *Swarm) error {
		sw.dialTimeout = t
		return nil
	}
}
// WithDialTimeoutLocal overrides the swarm's local dial timeout (default
// defaultDialTimeoutLocal).
func WithDialTimeoutLocal(t time.Duration) Option {
	return func(sw *Swarm) error {
		sw.dialTimeoutLocal = t
		return nil
	}
}
// WithResourceManager sets the resource manager used by the swarm. When none
// is configured, NewSwarm falls back to network.NullResourceManager.
func WithResourceManager(m network.ResourceManager) Option {
	return func(sw *Swarm) error {
		sw.rcmgr = m
		return nil
	}
}
// WithDialRanker replaces the swarm's dial ranker. The returned option fails
// if d is nil.
func WithDialRanker(d network.DialRanker) Option {
	return func(sw *Swarm) error {
		if d != nil {
			sw.dialRanker = d
			return nil
		}
		return errors.New("swarm: dial ranker cannot be nil")
	}
}
// WithUDPBlackHoleSuccessCounter replaces the success counter that NewSwarm
// passes to the black hole detector for UDP.
func WithUDPBlackHoleSuccessCounter(f *BlackHoleSuccessCounter) Option {
	return func(sw *Swarm) error {
		sw.udpBHF = f
		return nil
	}
}
// WithIPv6BlackHoleSuccessCounter replaces the success counter that NewSwarm
// passes to the black hole detector for IPv6.
func WithIPv6BlackHoleSuccessCounter(f *BlackHoleSuccessCounter) Option {
	return func(sw *Swarm) error {
		sw.ipv6BHF = f
		return nil
	}
}
// WithReadOnlyBlackHoleDetector marks the swarm's black hole detector as
// read-only; NewSwarm copies the flag into blackHoleDetector.readOnly.
func WithReadOnlyBlackHoleDetector() Option {
	return func(sw *Swarm) error {
		sw.readOnlyBHD = true
		return nil
	}
}
// Swarm tracks the connections, listeners, transports and notifiees of the
// local peer. It is asserted below to implement network.Network and
// transport.TransportNetwork.
type Swarm struct {
// nextConnID is incremented by addConn to assign each Conn its id.
nextConnID atomic .Uint64
// nextStreamID is not used in the visible part of this file.
// NOTE(review): presumably the stream counterpart of nextConnID — confirm.
nextStreamID atomic .Uint64
// refs is waited on by close. close adds one ref per listener it shuts down
// and addConn adds two refs per registered connection.
refs sync .WaitGroup
// emitter is the event-bus emitter for event.EvtPeerConnectednessChanged
// created in NewSwarm; it is closed by close.
emitter event .Emitter
// rcmgr is the resource manager; NewSwarm defaults it to
// network.NullResourceManager.
rcmgr network .ResourceManager
// local is the ID of the local peer.
local peer .ID
// peers is the peerstore; addConn records remote public keys in it.
peers peerstore .Peerstore
// dialTimeout and dialTimeoutLocal default to defaultDialTimeout and
// defaultDialTimeoutLocal and can be overridden through options.
dialTimeout time .Duration
dialTimeoutLocal time .Duration
// conns holds the connections per remote peer. The map is set to nil by
// close, which addConn uses to detect a closed swarm.
conns struct {
sync .RWMutex
m map [peer .ID ][]*Conn
}
// listeners holds the set of active listeners; the map is set to nil by close.
// NOTE(review): ifaceListenAddres and cacheEOL look like a cache of interface
// listen addresses with an expiry time — they are not used in this view.
listeners struct {
sync .RWMutex
ifaceListenAddres []ma .Multiaddr
cacheEOL time .Time
m map [transport .Listener ]struct {}
}
// notifs is the set of notifiees registered through Notify / StopNotify.
notifs struct {
sync .RWMutex
m map [network .Notifiee ]struct {}
}
// directConnNotifs holds, per peer, the channels of goroutines blocked in
// waitForDirectConn. addConn closes and removes them when a non-limited
// connection to that peer is added.
directConnNotifs struct {
sync .Mutex
m map [peer .ID ][]chan struct {}
}
// transports maps an integer key to a transport; the same transport may
// appear under several keys (close de-duplicates before closing).
// NOTE(review): the key is presumably a multiaddr protocol code — confirm.
transports struct {
sync .RWMutex
m map [int ]transport .Transport
}
// multiaddrResolver resolves DNS multiaddrs; defaults to a ResolverFromMaDNS
// wrapping madns.DefaultResolver.
multiaddrResolver network .MultiaddrDNSResolver
// streamh holds the current stream handler; nil until SetStreamHandler is called.
streamh atomic .Pointer [network .StreamHandler ]
// dsync is built by NewSwarm from newDialSync(s.dialWorkerLoop).
dsync *dialSync
// backf is the dial backoff state; addConn clears the entry of a newly
// connected peer.
backf DialBackoff
// limiter is built by NewSwarm from newDialLimiter(s.dialAddr).
limiter *dialLimiter
// gater, when non-nil, is consulted by addConn via InterceptUpgraded.
gater connmgr .ConnectionGater
// closeOnce guarantees close runs at most once.
closeOnce sync .Once
// ctx is cancelled by close; Done exposes its Done channel.
ctx context .Context
ctxCancel context .CancelFunc
// bwc is the metrics reporter set through WithMetrics.
bwc metrics .Reporter
// metricsTracer is set through WithMetricsTracer and passed to the black hole
// detector.
metricsTracer MetricsTracer
// dialRanker defaults to DefaultDialRanker.
dialRanker network .DialRanker
// connectednessEventEmitter is told about new connections by addConn and is
// closed by close.
connectednessEventEmitter *connectednessEventEmitter
// udpBHF and ipv6BHF are the success counters handed to the black hole
// detector; both default to N=100, MinSuccesses=5.
udpBHF *BlackHoleSuccessCounter
ipv6BHF *BlackHoleSuccessCounter
// bhd is the black hole detector assembled at the end of NewSwarm.
bhd *blackHoleDetector
// readOnlyBHD is copied into blackHoleDetector.readOnly.
readOnlyBHD bool
}
// NewSwarm constructs a Swarm for the given local peer.
//
// It obtains an emitter for event.EvtPeerConnectednessChanged from eventBus,
// fills in the defaults (dial timeouts, DNS resolver, dial ranker, black hole
// success counters), applies opts in order, and finally builds the pieces that
// depend on the configured values (resource manager fallback, dial sync, dial
// limiter, backoff, black hole detector).
//
// An error is returned if the emitter cannot be created or if any option
// fails. When an option fails, everything acquired so far is released so that
// a failed construction leaks neither the context nor the emitters.
func NewSwarm(local peer.ID, peers peerstore.Peerstore, eventBus event.Bus, opts ...Option) (*Swarm, error) {
	emitter, err := eventBus.Emitter(new(event.EvtPeerConnectednessChanged))
	if err != nil {
		return nil, err
	}
	ctx, cancel := context.WithCancel(context.Background())
	s := &Swarm{
		local:             local,
		peers:             peers,
		emitter:           emitter,
		ctx:               ctx,
		ctxCancel:         cancel,
		dialTimeout:       defaultDialTimeout,
		dialTimeoutLocal:  defaultDialTimeoutLocal,
		multiaddrResolver: ResolverFromMaDNS{madns.DefaultResolver},
		dialRanker:        DefaultDialRanker,
		udpBHF:            &BlackHoleSuccessCounter{N: 100, MinSuccesses: 5, Name: "UDP"},
		ipv6BHF:           &BlackHoleSuccessCounter{N: 100, MinSuccesses: 5, Name: "IPv6"},
	}

	s.conns.m = make(map[peer.ID][]*Conn)
	s.listeners.m = make(map[transport.Listener]struct{})
	s.transports.m = make(map[int]transport.Transport)
	s.notifs.m = make(map[network.Notifiee]struct{})
	s.directConnNotifs.m = make(map[peer.ID][]chan struct{})
	s.connectednessEventEmitter = newConnectednessEventEmitter(s.Connectedness, emitter)

	for _, opt := range opts {
		if err := opt(s); err != nil {
			// Undo what was acquired above, mirroring the teardown order used
			// by close: cancel the context, then close both emitters.
			cancel()
			s.connectednessEventEmitter.Close()
			emitter.Close()
			return nil, err
		}
	}
	if s.rcmgr == nil {
		s.rcmgr = &network.NullResourceManager{}
	}

	// These depend on option-configurable state, so build them last.
	s.dsync = newDialSync(s.dialWorkerLoop)
	s.limiter = newDialLimiter(s.dialAddr)
	s.backf.init(s.ctx)

	s.bhd = &blackHoleDetector{
		udp:      s.udpBHF,
		ipv6:     s.ipv6BHF,
		mt:       s.metricsTracer,
		readOnly: s.readOnlyBHD,
	}
	return s, nil
}
// Close shuts the swarm down. The teardown runs at most once; further calls
// are no-ops. It always returns nil.
func (s *Swarm) Close() error {
	s.closeOnce.Do(func() { s.close() })
	return nil
}
// Done returns the Done channel of the swarm's context, which close cancels.
func (s *Swarm) Done() <-chan struct{} {
	done := s.ctx.Done()
	return done
}
// close performs the actual teardown behind Close (which guards it with a
// sync.Once).
//
// Order of operations:
//  1. cancel the swarm context;
//  2. detach the listener and connection maps (leaving nil in their place);
//  3. close all listeners and connections concurrently and wait on s.refs;
//  4. close the connectedness emitter and the event emitter;
//  5. close every distinct transport that implements io.Closer and wait.
func (s *Swarm) close() {
	s.ctxCancel()

	// Take ownership of the maps and leave nil behind. addConn checks
	// s.conns.m for nil to reject connections once the swarm is closed.
	s.listeners.Lock()
	listeners := s.listeners.m
	s.listeners.m = nil
	s.listeners.Unlock()

	s.conns.Lock()
	conns := s.conns.m
	s.conns.m = nil
	s.conns.Unlock()

	// Close everything in parallel.
	s.refs.Add(len(listeners))
	for l := range listeners {
		go func(l transport.Listener) {
			defer s.refs.Done()
			// errors.Is also recognises a wrapped ErrListenerClosed, so an
			// already-closed listener is never reported as a failure.
			if err := l.Close(); err != nil && !errors.Is(err, transport.ErrListenerClosed) {
				log.Errorf("error when shutting down listener: %s", err)
			}
		}(l)
	}

	for _, cs := range conns {
		for _, c := range cs {
			go func(c *Conn) {
				if err := c.Close(); err != nil {
					log.Errorf("error when shutting down connection: %s", err)
				}
			}(c)
		}
	}

	// Wait for the listener goroutines above and for the refs addConn took
	// for each connection (those are released outside this function).
	s.refs.Wait()
	s.connectednessEventEmitter.Close()
	s.emitter.Close()

	// Transports are closed last, after all listeners and connections.
	s.transports.Lock()
	transports := s.transports.m
	s.transports.m = nil
	s.transports.Unlock()

	// The same transport may be registered under several keys; close it once.
	transportsToClose := make(map[transport.Transport]struct{}, len(transports))
	for _, t := range transports {
		transportsToClose[t] = struct{}{}
	}

	var wg sync.WaitGroup
	for t := range transportsToClose {
		closer, ok := t.(io.Closer)
		if !ok {
			continue
		}
		wg.Add(1)
		go func(c io.Closer) {
			defer wg.Done()
			// Use the goroutine's own parameter rather than the loop-scoped
			// variable it was copied from.
			if err := c.Close(); err != nil {
				log.Errorf("error when closing down transport %T: %s", c, err)
			}
		}(closer)
	}
	wg.Wait()
}
// addConn wraps an upgraded transport connection in a *Conn, registers it in
// the swarm's connection table and fires the Connected notifications.
//
// It returns ErrGaterDisallowedConnection if the configured gater rejects the
// connection (the transport connection is closed with network.ConnGated), and
// ErrSwarmClosed if the swarm has already been closed (the transport
// connection is closed). On success the returned Conn has been started.
func (s *Swarm ) addConn (tc transport .CapableConn , dir network .Direction ) (*Conn , error ) {
var (
p = tc .RemotePeer ()
addr = tc .RemoteMultiaddr ()
)
// Start from the transport's own stats when it exposes them, then stamp the
// direction and the open time.
var stat network .ConnStats
if cs , ok := tc .(network .ConnStat ); ok {
stat = cs .Stat ()
}
stat .Direction = dir
stat .Opened = time .Now ()
isLimited := stat .Limited
c := &Conn {
conn : tc ,
swarm : s ,
stat : stat ,
id : s .nextConnID .Add (1 ),
}
// Give the gater a chance to reject the upgraded connection before it is
// registered anywhere.
if s .gater != nil {
if allow , _ := s .gater .InterceptUpgraded (c ); !allow {
err := tc .CloseWithError (network .ConnGated )
if err != nil {
log .Warnf ("failed to close connection with peer %s and addr %s; err: %s" , p , addr , err )
}
return nil , ErrGaterDisallowedConnection
}
}
// Remember the remote public key, if the transport provides one.
if pk := tc .RemotePublicKey (); pk != nil {
s .peers .AddPubKey (p , pk )
}
// We are connected now, so drop any dial backoff recorded for this peer.
s .backf .Clear (p )
s .conns .Lock ()
// close() sets the map to nil; a nil map means the swarm is shut down.
if s .conns .m == nil {
s .conns .Unlock ()
tc .Close ()
return nil , ErrSwarmClosed
}
c .streams .m = make (map [*Stream ]struct {})
s .conns .m [p ] = append (s .conns .m [p ], c )
// Two refs are taken per connection; they are released outside this function.
s .refs .Add (2 )
// The notification lock is taken before the conns lock is released and is
// held until the Connected notifications below have been delivered.
c .notifyLk .Lock ()
s .conns .Unlock ()
s .connectednessEventEmitter .AddConn (p )
if !isLimited {
// Wake every goroutine blocked in waitForDirectConn for this peer by closing
// its channel, then drop the list. This lock is taken only after conns has
// been unlocked; waitForDirectConn takes directConnNotifs first and the conns
// read lock second.
s .directConnNotifs .Lock ()
for _ , ch := range s .directConnNotifs .m [p ] {
close (ch )
}
delete (s .directConnNotifs .m , p )
s .directConnNotifs .Unlock ()
}
s .notifyAll (func (f network .Notifiee ) {
f .Connected (s , c )
})
c .notifyLk .Unlock ()
c .start ()
return c , nil
}
// Peerstore returns the peerstore the swarm was constructed with.
func (s *Swarm) Peerstore() peerstore.Peerstore {
	return s.peers
}
// SetStreamHandler atomically installs handler as the swarm's stream handler.
func (s *Swarm) SetStreamHandler(handler network.StreamHandler) {
	h := &handler
	s.streamh.Store(h)
}
// StreamHandler returns the currently installed stream handler, or nil if
// SetStreamHandler has never been called.
func (s *Swarm) StreamHandler() network.StreamHandler {
	if h := s.streamh.Load(); h != nil {
		return *h
	}
	return nil
}
// NewStream opens a stream to peer p.
//
// Each round of the loop:
//  1. picks the best existing connection, or dials the peer when there is none
//     (unless the context forbids dialing, in which case network.ErrNoConn is
//     returned; at most DialAttempts dials are made);
//  2. if that connection is limited and the context does not allow limited
//     connections, waits for a direct connection;
//  3. tries to open the stream; if that fails because the connection turned
//     out to be closed, starts over, otherwise returns the error.
func (s *Swarm) NewStream(ctx context.Context, p peer.ID) (network.Stream, error) {
	log.Debugf("[%s] opening stream to peer [%s]", s.local, p)

	dials := 0
	for {
		c := s.bestConnToPeer(p)
		if c == nil {
			if nodial, _ := network.GetNoDial(ctx); nodial {
				return nil, network.ErrNoConn
			}
			dials++
			if dials > DialAttempts {
				return nil, errors.New("max dial attempts exceeded")
			}
			var dialErr error
			c, dialErr = s.dialPeer(ctx, p)
			if dialErr != nil {
				return nil, dialErr
			}
		}

		if limitedOK, _ := network.GetAllowLimitedConn(ctx); !limitedOK && c.Stat().Limited {
			var waitErr error
			c, waitErr = s.waitForDirectConn(ctx, p)
			if waitErr != nil {
				log.Debugf("failed to get direct connection to a limited peer %s: %s", p, waitErr)
				return nil, waitErr
			}
		}

		str, err := c.NewStream(ctx)
		if err == nil {
			return str, nil
		}
		if !c.conn.IsClosed() {
			return nil, err
		}
		// The connection was already closed underneath us; pick another one.
	}
}
// waitForDirectConn blocks until a non-limited connection to p is available,
// the context is done, or the dial-peer timeout taken from the context elapses.
//
// It returns network.ErrNoConn when there is no connection to p at all,
// network.ErrLimitedConn when it was woken up but the best connection is still
// limited, and ctx.Err() on timeout or cancellation.
func (s *Swarm ) waitForDirectConn (ctx context .Context , p peer .ID ) (*Conn , error ) {
// Hold directConnNotifs while inspecting the current best connection so that a
// direct connection added by addConn in the meantime cannot be missed:
// addConn takes the same lock before closing the waiters' channels.
s .directConnNotifs .Lock ()
c := s .bestConnToPeer (p )
if c == nil {
s .directConnNotifs .Unlock ()
return nil , network .ErrNoConn
} else if !c .Stat ().Limited {
s .directConnNotifs .Unlock ()
return c , nil
}
// Only a limited connection exists: register a channel that addConn closes
// when a non-limited connection to p is added.
ch := make (chan struct {})
s .directConnNotifs .m [p ] = append (s .directConnNotifs .m [p ], ch )
s .directConnNotifs .Unlock ()
// Bound the wait by the dial-peer timeout carried in the context.
ctx , cancel := context .WithTimeout (ctx , network .GetDialPeerTimeout (ctx ))
defer cancel ()
select {
case <- ctx .Done ():
// Gave up waiting: remove our channel from the waiter list, and drop the
// map entry entirely if we were the last waiter.
s .directConnNotifs .Lock ()
defer s .directConnNotifs .Unlock ()
s .directConnNotifs .m [p ] = slices .DeleteFunc (
s .directConnNotifs .m [p ],
func (c chan struct {}) bool { return c == ch },
)
if len (s .directConnNotifs .m [p ]) == 0 {
delete (s .directConnNotifs .m , p )
}
return nil , ctx .Err ()
case <- ch :
// addConn already deleted the map entry for p, so there is nothing to clean
// up here. Re-check the best connection, which may have changed since the
// notification fired.
c := s .bestConnToPeer (p )
if c == nil {
return nil , network .ErrNoConn
}
if c .Stat ().Limited {
return nil , network .ErrLimitedConn
}
return c , nil
}
}
// ConnsToPeer returns a snapshot of all connections the swarm currently holds
// to peer p. The result is a fresh, non-nil slice (empty if there are none).
func (s *Swarm) ConnsToPeer(p peer.ID) []network.Conn {
	s.conns.RLock()
	defer s.conns.RUnlock()

	peerConns := s.conns.m[p]
	out := make([]network.Conn, 0, len(peerConns))
	for _, c := range peerConns {
		out = append(out, c)
	}
	return out
}
// isBetterConn reports whether connection a should be preferred over b.
//
// Preference order: a non-limited connection beats a limited one; then a
// direct connection beats a proxied one; then the connection with more open
// streams wins. On a complete tie a is preferred.
func isBetterConn(a, b *Conn) bool {
	if aLim, bLim := a.Stat().Limited, b.Stat().Limited; aLim != bLim {
		return !aLim
	}

	if aDir, bDir := isDirectConn(a), isDirectConn(b); aDir != bDir {
		return aDir
	}

	// streamCount reads the number of open streams under the stream lock.
	streamCount := func(c *Conn) int {
		c.streams.Lock()
		n := len(c.streams.m)
		c.streams.Unlock()
		return n
	}
	aStreams := streamCount(a)
	bStreams := streamCount(b)
	if aStreams == bStreams {
		return true
	}
	return aStreams > bStreams
}
// bestConnToPeer returns the preferred open connection to p according to
// isBetterConn, or nil if there is no open connection. Because isBetterConn
// favours its first argument on ties, the later connection in the list wins
// among otherwise equal candidates.
func (s *Swarm) bestConnToPeer(p peer.ID) *Conn {
	s.conns.RLock()
	defer s.conns.RUnlock()

	var chosen *Conn
	for _, candidate := range s.conns.m[p] {
		// Closed connections are skipped.
		if !candidate.conn.IsClosed() && (chosen == nil || isBetterConn(candidate, chosen)) {
			chosen = candidate
		}
	}
	return chosen
}
// bestAcceptableConnToPeer returns the best connection to p, or nil when the
// context forces a direct dial and the best connection is not direct (or does
// not exist).
func (s *Swarm) bestAcceptableConnToPeer(ctx context.Context, p peer.ID) *Conn {
	best := s.bestConnToPeer(p)
	if forceDirect, _ := network.GetForceDirectDial(ctx); forceDirect && !isDirectConn(best) {
		return nil
	}
	return best
}
// isDirectConn reports whether c is a non-nil connection whose transport is
// not a proxy.
func isDirectConn(c *Conn) bool {
	if c == nil {
		return false
	}
	return !c.conn.Transport().Proxy()
}
// Connectedness returns the connectedness state for peer p, computed under the
// connection table's read lock.
func (s *Swarm) Connectedness(p peer.ID) network.Connectedness {
	s.conns.RLock()
	state := s.connectednessUnlocked(p)
	s.conns.RUnlock()
	return state
}
// connectednessUnlocked computes the connectedness state for p without taking
// any lock; the caller must hold s.conns.
//
// It returns network.Connected as soon as an open non-limited connection is
// found, network.Limited if only open limited connections exist, and
// network.NotConnected otherwise.
func (s *Swarm) connectednessUnlocked(p peer.ID) network.Connectedness {
	sawLimited := false
	for _, c := range s.conns.m[p] {
		if c.IsClosed() {
			continue
		}
		if !c.Stat().Limited {
			return network.Connected
		}
		sawLimited = true
	}
	if !sawLimited {
		return network.NotConnected
	}
	return network.Limited
}
// Conns returns a snapshot of every connection the swarm holds, across all
// peers.
func (s *Swarm) Conns() []network.Conn {
	s.conns.RLock()
	defer s.conns.RUnlock()

	all := make([]network.Conn, 0, len(s.conns.m))
	for _, peerConns := range s.conns.m {
		for i := range peerConns {
			all = append(all, peerConns[i])
		}
	}
	return all
}
// ClosePeer closes every connection to peer p.
//
// With no connections it returns nil; with a single connection it returns
// that connection's Close error directly. With several connections they are
// closed concurrently and any failures are combined into one error.
func (s *Swarm) ClosePeer(p peer.ID) error {
	conns := s.ConnsToPeer(p)
	if len(conns) == 0 {
		return nil
	}
	if len(conns) == 1 {
		return conns[0].Close()
	}

	results := make(chan error)
	for _, c := range conns {
		go func(c network.Conn) {
			results <- c.Close()
		}(c)
	}

	// Collect exactly one result per connection.
	var failures []string
	for range conns {
		if err := <-results; err != nil {
			failures = append(failures, err.Error())
		}
	}
	if len(failures) == 0 {
		return nil
	}
	return fmt.Errorf("when disconnecting from peer %s: %s", p, strings.Join(failures, ", "))
}
// Peers returns the IDs of all peers that have an entry in the connection
// table, in unspecified (map iteration) order.
func (s *Swarm) Peers() []peer.ID {
	s.conns.RLock()
	defer s.conns.RUnlock()

	ids := make([]peer.ID, 0, len(s.conns.m))
	for id := range s.conns.m {
		ids = append(ids, id)
	}
	return ids
}
// LocalPeer returns the ID of the peer this swarm belongs to.
func (s *Swarm) LocalPeer() peer.ID {
	return s.local
}
// Backoff returns a pointer to the swarm's dial backoff state.
func (s *Swarm) Backoff() *DialBackoff {
	backoff := &s.backf
	return backoff
}
// notifyAll invokes notify synchronously for every registered notifiee while
// holding the notifiee set's read lock.
func (s *Swarm) notifyAll(notify func(network.Notifiee)) {
	s.notifs.RLock()
	for notifiee := range s.notifs.m {
		notify(notifiee)
	}
	s.notifs.RUnlock()
}
// Notify registers f to receive the swarm's notifications. Registering the
// same notifiee again has no further effect.
func (s *Swarm) Notify(f network.Notifiee) {
	s.notifs.Lock()
	s.notifs.m[f] = struct{}{}
	s.notifs.Unlock()
}
// StopNotify unregisters f; it is a no-op if f was not registered.
func (s *Swarm) StopNotify(f network.Notifiee) {
	s.notifs.Lock()
	delete(s.notifs.m, f)
	s.notifs.Unlock()
}
// removeConn removes c from the connection list of its remote peer and drops
// the peer's map entry once no connections remain. The relative order of the
// remaining connections is preserved.
func (s *Swarm) removeConn(c *Conn) {
	p := c.RemotePeer()

	s.conns.Lock()
	peerConns := s.conns.m[p]
	if idx := slices.Index(peerConns, c); idx >= 0 {
		// Shift the tail left by one and clear the vacated last slot so the
		// backing array no longer references the removed connection.
		last := len(peerConns) - 1
		copy(peerConns[idx:], peerConns[idx+1:])
		peerConns[last] = nil
		s.conns.m[p] = peerConns[:last]
	}
	if len(s.conns.m[p]) == 0 {
		delete(s.conns.m, p)
	}
	s.conns.Unlock()
}
// String returns a short description of the swarm that includes the local
// peer ID.
func (s *Swarm) String() string {
	local := s.LocalPeer()
	return fmt.Sprintf("<Swarm %s>", local)
}
// ResourceManager returns the resource manager in use by the swarm.
func (s *Swarm) ResourceManager() network.ResourceManager {
	return s.rcmgr
}
// Compile-time checks that *Swarm implements the interfaces it is used as.
var (
_ network .Network = (*Swarm )(nil )
_ transport .TransportNetwork = (*Swarm )(nil )
)
// connWithMetrics wraps a transport.CapableConn and reports its lifecycle
// (opened, handshake completed, closed) to a MetricsTracer.
type connWithMetrics struct {
transport .CapableConn
// opened is the reference time from which handshake and connection
// durations are measured.
opened time .Time
// dir is the connection direction reported to the tracer.
dir network .Direction
metricsTracer MetricsTracer
// once ensures the close is performed and reported a single time, whether
// through Close or CloseWithError.
once sync .Once
// closeErr caches the error of that single close for later calls.
closeErr error
}
// wrapWithMetrics wraps capableConn in a connWithMetrics and immediately
// reports the opened connection to metricsTracer.
func wrapWithMetrics(capableConn transport.CapableConn, metricsTracer MetricsTracer, opened time.Time, dir network.Direction) *connWithMetrics {
	wrapped := &connWithMetrics{
		CapableConn:   capableConn,
		opened:        opened,
		dir:           dir,
		metricsTracer: metricsTracer,
	}
	wrapped.metricsTracer.OpenedConnection(
		wrapped.dir,
		capableConn.RemotePublicKey(),
		capableConn.ConnState(),
		capableConn.LocalMultiaddr(),
	)
	return wrapped
}
// completedHandshake reports to the tracer how long the handshake took,
// measured from the connection's opened time.
func (c *connWithMetrics) completedHandshake() {
	elapsed := time.Since(c.opened)
	c.metricsTracer.CompletedHandshake(elapsed, c.ConnState(), c.LocalMultiaddr())
}
// Close reports the closed connection to the tracer and closes the wrapped
// connection. Only the first call to Close or CloseWithError does any work;
// later calls return the error recorded by that first close.
func (c *connWithMetrics) Close() error {
	c.once.Do(func() {
		lifetime := time.Since(c.opened)
		c.metricsTracer.ClosedConnection(c.dir, lifetime, c.ConnState(), c.LocalMultiaddr())
		c.closeErr = c.CapableConn.Close()
	})
	return c.closeErr
}
// CloseWithError reports the closed connection to the tracer and closes the
// wrapped connection with errCode. Only the first call to Close or
// CloseWithError does any work; later calls return the error recorded by that
// first close.
func (c *connWithMetrics) CloseWithError(errCode network.ConnErrorCode) error {
	c.once.Do(func() {
		lifetime := time.Since(c.opened)
		c.metricsTracer.ClosedConnection(c.dir, lifetime, c.ConnState(), c.LocalMultiaddr())
		c.closeErr = c.CapableConn.CloseWithError(errCode)
	})
	return c.closeErr
}
// Stat returns the wrapped connection's stats when it implements
// network.ConnStat, and zero-valued stats otherwise.
func (c *connWithMetrics) Stat() network.ConnStats {
	statter, ok := c.CapableConn.(network.ConnStat)
	if !ok {
		return network.ConnStats{}
	}
	return statter.Stat()
}
// Compile-time check that *connWithMetrics implements network.ConnStat.
var _ network .ConnStat = &connWithMetrics {}
// ResolverFromMaDNS adapts a *madns.Resolver to the
// network.MultiaddrDNSResolver interface. NewSwarm uses it, wrapping
// madns.DefaultResolver, as the default resolver.
type ResolverFromMaDNS struct {
*madns .Resolver
}
// Compile-time check that ResolverFromMaDNS implements
// network.MultiaddrDNSResolver.
var _ network .MultiaddrDNSResolver = ResolverFromMaDNS {}
// startsWithDNSADDR reports whether the first component of m is a /dnsaddr
// component. A nil multiaddr yields false.
func startsWithDNSADDR(m ma.Multiaddr) bool {
	if m == nil {
		return false
	}

	firstIsDNSADDR := false
	// Only the first component matters: the callback returns false right away
	// so the iteration stops after one component.
	ma.ForEach(m, func(c ma.Component) bool {
		firstIsDNSADDR = c.Protocol().Code == ma.P_DNSADDR
		return false
	})
	return firstIsDNSADDR
}
// ResolveDNSAddr resolves maddr, following nested /dnsaddr results
// recursively.
//
// Parameters:
//   - expectedPeerID: when non-empty, resolved addresses that carry a
//     different peer ID (or an unparsable one) are dropped; addresses without
//     a peer ID are kept.
//   - recursionLimit: maximum depth of nested resolution; when it is <= 0 the
//     address is returned unresolved.
//   - outputLimit: maximum number of addresses returned; when it is <= 0 the
//     result is nil.
//
// An error is returned only if the top-level resolution fails. Failures while
// resolving nested /dnsaddr entries are logged and the entry is skipped.
func (r ResolverFromMaDNS) ResolveDNSAddr(ctx context.Context, expectedPeerID peer.ID, maddr ma.Multiaddr, recursionLimit int, outputLimit int) ([]ma.Multiaddr, error) {
	if outputLimit <= 0 {
		return nil, nil
	}
	if recursionLimit <= 0 {
		return []ma.Multiaddr{maddr}, nil
	}

	var resolved, toResolve []ma.Multiaddr
	addrs, err := r.Resolve(ctx, maddr)
	if err != nil {
		return nil, err
	}
	if len(addrs) > outputLimit {
		addrs = addrs[:outputLimit]
	}

	// Split the results into final addresses and ones needing another round.
	for _, addr := range addrs {
		if startsWithDNSADDR(addr) {
			toResolve = append(toResolve, addr)
		} else {
			resolved = append(resolved, addr)
		}
	}

	for i, addr := range toResolve {
		// Budget for this nested resolution: the overall limit, minus what is
		// already resolved, minus one slot reserved for each /dnsaddr entry
		// still pending, plus one because the current entry is among them.
		nextOutputLimit := outputLimit - len(resolved) - (len(toResolve) - i) + 1
		resolvedAddrs, err := r.ResolveDNSAddr(ctx, expectedPeerID, addr, recursionLimit-1, nextOutputLimit)
		if err != nil {
			// Best effort: drop this entry and continue with the others.
			log.Warnf("failed to resolve dnsaddr %v: %s", addr, err)
			continue
		}
		resolved = append(resolved, resolvedAddrs...)
	}

	if len(resolved) > outputLimit {
		resolved = resolved[:outputLimit]
	}

	if expectedPeerID != "" {
		mismatchesExpectedPeer := func(a ma.Multiaddr) bool {
			id, err := peer.IDFromP2PAddr(a)
			switch {
			case errors.Is(err, peer.ErrInvalidAddr):
				// Treated as "no peer ID in the address": keep it.
				return false
			case err != nil:
				// Any other parse failure: drop the address.
				return true
			}
			return id != expectedPeerID
		}
		resolved = slices.DeleteFunc(resolved, mismatchesExpectedPeer)
	}

	return resolved, nil
}
// ResolveDNSComponent resolves maddr once (no recursion) and returns at most
// outputLimit addresses.
//
// A non-positive outputLimit yields a nil result without performing any
// resolution, matching ResolveDNSAddr; previously a negative limit caused a
// slice-bounds panic. Resolution errors are returned unchanged.
func (r ResolverFromMaDNS) ResolveDNSComponent(ctx context.Context, maddr ma.Multiaddr, outputLimit int) ([]ma.Multiaddr, error) {
	if outputLimit <= 0 {
		return nil, nil
	}
	addrs, err := r.Resolve(ctx, maddr)
	if err != nil {
		return nil, err
	}
	if len(addrs) > outputLimit {
		addrs = addrs[:outputLimit]
	}
	return addrs, nil
}
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PR and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.