package swarm
import (
"context"
"errors"
"fmt"
"net/netip"
"strconv"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/canonicallog"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/transport"
ma "github.com/multiformats/go-multiaddr"
mafmt "github.com/multiformats/go-multiaddr-fmt"
manet "github.com/multiformats/go-multiaddr/net"
)
const (
	// maximumResolvedAddresses is the output limit handed to chainResolvers
	// by resolveAddrs when expanding a peer's addresses.
	maximumResolvedAddresses = 100

	// maximumDNSADDRRecursion is the recursion limit passed to the multiaddr
	// resolver's ResolveDNSAddr call in resolveAddrs.
	maximumDNSADDRRecursion = 4
)
// Sentinel errors produced while dialing.
var (
	// ErrDialBackoff is returned by dialNextAddr when the address is
	// currently in backoff for the peer.
	ErrDialBackoff = errors.New("dial backoff")

	// ErrDialRefusedBlackHole is recorded for addresses rejected by the black
	// hole filter in filterKnownUndialables.
	ErrDialRefusedBlackHole = errors.New("dial refused because of black hole")

	// ErrDialToSelf is returned when the dial target is the local peer or one
	// of our own listen addresses.
	ErrDialToSelf = errors.New("dial to self attempted")

	// ErrNoTransport is returned when no transport can dial an address.
	ErrNoTransport = errors.New("no transport for protocol")

	// ErrAllDialsFailed is not used in this part of the file; presumably it is
	// reported when every dial attempt to a peer failed.
	ErrAllDialsFailed = errors.New("all dials failed")

	// ErrNoAddresses is returned by addrsForDial when the peerstore has no
	// addresses for the peer.
	ErrNoAddresses = errors.New("no addresses")

	// ErrNoGoodAddresses is returned by addrsForDial when addresses exist but
	// none survive filtering.
	ErrNoGoodAddresses = errors.New("no good addresses")

	// ErrGaterDisallowedConnection is reported when the connection gater
	// rejects a peer or an address.
	ErrGaterDisallowedConnection = errors.New("gater disallows connection to peer")
)

// errQUICDraft29 is the error type behind ErrQUICDraft29. It unwraps to
// ErrNoTransport, so errors.Is(err, ErrNoTransport) matches it.
type errQUICDraft29 struct{}

// ErrQUICDraft29 is reported instead of ErrNoTransport for addresses that
// match the removed QUIC draft-29 format.
var ErrQUICDraft29 = errQUICDraft29{}

// Error implements the error interface.
func (errQUICDraft29) Error() string {
	return "QUIC draft-29 has been removed, QUIC (RFC 9000) is accessible with /quic-v1"
}

// Unwrap exposes ErrNoTransport as the underlying cause.
func (errQUICDraft29) Unwrap() error {
	return ErrNoTransport
}
const (
	// DialAttempts is not referenced in this part of the file; presumably it
	// is the number of attempts made to dial a given peer.
	DialAttempts = 1

	// ConcurrentFdDials is not referenced in this part of the file; presumably
	// it bounds concurrent dials over file-descriptor-consuming transports.
	ConcurrentFdDials = 160
)

// DefaultPerPeerRateLimit is not referenced in this part of the file;
// presumably it bounds concurrent outbound dials per peer. It is a variable,
// so it can be changed at runtime.
var DefaultPerPeerRateLimit = 8
// DialBackoff tracks, per peer and per address, when dialing may be retried
// after a failure.
//
// Backoff, AddBackoff, Clear and cleanup take lock around every access to
// entries. init writes entries without taking the lock.
type DialBackoff struct {
	// entries maps a peer to its per-address backoff records; the inner key is
	// the address's binary form converted to a string.
	entries map[peer.ID]map[string]*backoffAddr
	// lock guards entries.
	lock sync.RWMutex
}
// backoffAddr is the backoff record kept for a single (peer, address) pair.
type backoffAddr struct {
	// tries counts how many times a backoff has been added for the address.
	tries int
	// until is the instant before which the address is in backoff.
	until time.Time
}
// init allocates the entries map if it is still nil and starts the background
// cleanup goroutine, which runs until ctx is done.
func (db *DialBackoff) init(ctx context.Context) {
	if db.entries == nil {
		db.entries = map[peer.ID]map[string]*backoffAddr{}
	}
	go db.background(ctx)
}
// background calls cleanup once every BackoffMax until ctx is cancelled.
// BackoffMax is read once, when the ticker is created.
func (db *DialBackoff) background(ctx context.Context) {
	tick := time.NewTicker(BackoffMax)
	defer tick.Stop()

	done := ctx.Done()
	for {
		select {
		case <-done:
			return
		case <-tick.C:
			db.cleanup()
		}
	}
}
// Backoff reports whether addr is currently in backoff for peer p, i.e. a
// record exists and its deadline has not yet passed.
func (db *DialBackoff) Backoff(p peer.ID, addr ma.Multiaddr) (backoff bool) {
	key := string(addr.Bytes())

	db.lock.RLock()
	defer db.lock.RUnlock()

	// Indexing a missing (nil) inner map is safe and yields "not found".
	record, known := db.entries[p][key]
	if !known {
		return false
	}
	return time.Now().Before(record.until)
}
// Parameters of the dial backoff schedule used by AddBackoff and cleanup:
// a repeated failure backs off for BackoffBase + BackoffCoef*tries², capped at
// BackoffMax.
var (
	// BackoffBase is the backoff applied on the first failure and the constant
	// term for later ones.
	BackoffBase = 5 * time.Second

	// BackoffCoef scales the squared try count.
	BackoffCoef = 1 * time.Second

	// BackoffMax caps a single backoff and is also the cleanup interval.
	BackoffMax = 5 * time.Minute
)
// AddBackoff records a failure for (p, addr) and extends its backoff.
//
// The first failure backs off for BackoffBase. Each later failure backs off
// for BackoffBase + BackoffCoef*tries², capped at BackoffMax, where tries is
// the count recorded before this call; the count is then incremented.
func (db *DialBackoff) AddBackoff(p peer.ID, addr ma.Multiaddr) {
	key := string(addr.Bytes())

	db.lock.Lock()
	defer db.lock.Unlock()

	perAddr, havePeer := db.entries[p]
	if !havePeer {
		perAddr = make(map[string]*backoffAddr, 1)
		db.entries[p] = perAddr
	}

	record, haveAddr := perAddr[key]
	if !haveAddr {
		perAddr[key] = &backoffAddr{
			tries: 1,
			until: time.Now().Add(BackoffBase),
		}
		return
	}

	wait := BackoffBase + BackoffCoef*time.Duration(record.tries*record.tries)
	if wait > BackoffMax {
		wait = BackoffMax
	}
	record.until = time.Now().Add(wait)
	record.tries++
}
// Clear drops every backoff record held for peer p.
func (db *DialBackoff) Clear(p peer.ID) {
	db.lock.Lock()
	delete(db.entries, p)
	db.lock.Unlock()
}
// cleanup removes peers whose backoff records are all stale.
//
// A record is still live while now is before its deadline plus one more
// backoff window (computed from its try count, capped at BackoffMax). A peer
// is kept if any of its records is live; otherwise its whole map is deleted.
func (db *DialBackoff) cleanup() {
	db.lock.Lock()
	defer db.lock.Unlock()

	now := time.Now()
	for id, perAddr := range db.entries {
		live := false
		for _, record := range perAddr {
			window := BackoffBase + BackoffCoef*time.Duration(record.tries*record.tries)
			if window > BackoffMax {
				window = BackoffMax
			}
			if expiry := record.until.Add(window); now.Before(expiry) {
				live = true
				break
			}
		}
		if live {
			continue
		}
		delete(db.entries, id)
	}
}
// DialPeer returns a connection to peer p, delegating to dialPeer.
//
// On failure it returns a literal nil network.Conn instead of forwarding the
// *Conn, so callers never see a non-nil interface wrapping a nil pointer.
func (s *Swarm) DialPeer(ctx context.Context, p peer.ID) (network.Conn, error) {
	conn, dialErr := s.dialPeer(ctx, p)
	if dialErr != nil {
		return nil, dialErr
	}
	return conn, nil
}
// dialPeer returns an existing acceptable connection to p, or dials a new one.
//
// It fails early when p is invalid, is the local peer, or is rejected by the
// connection gater. The dial is bounded by the timeout from
// network.GetDialPeerTimeout. On failure, an error from the (timeout) context
// takes precedence, then ErrSwarmClosed if the swarm's own context is done,
// then the dial error itself.
func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) {
	log.Debugw("dialing peer", "from", s.local, "to", p)
	if err := p.Validate(); err != nil {
		return nil, err
	}
	if p == s.local {
		return nil, ErrDialToSelf
	}

	// Reuse an existing connection when one is good enough.
	if existing := s.bestAcceptableConnToPeer(ctx, p); existing != nil {
		return existing, nil
	}

	if s.gater != nil && !s.gater.InterceptPeerDial(p) {
		log.Debugf("gater disallowed outbound connection to peer %s", p)
		return nil, &DialError{Peer: p, Cause: ErrGaterDisallowedConnection}
	}

	dialCtx, cancel := context.WithTimeout(ctx, network.GetDialPeerTimeout(ctx))
	defer cancel()

	conn, err := s.dsync.Dial(dialCtx, p)
	if err != nil {
		log.Debugf("network for %s finished dialing %s", s.local, p)
		if ctxErr := dialCtx.Err(); ctxErr != nil {
			return nil, ctxErr
		}
		if s.ctx.Err() != nil {
			return nil, ErrSwarmClosed
		}
		return nil, err
	}

	// Do not hand back a connection authenticated as a different peer.
	if conn.RemotePeer() != p {
		conn.Close()
		log.Errorw("Handshake failed to properly authenticate peer", "authenticated", conn.RemotePeer(), "expected", p)
		return nil, fmt.Errorf("unexpected peer")
	}
	return conn, nil
}
// dialWorkerLoop builds a dial worker for peer p that consumes requests from
// reqch and runs its loop until the loop returns. The final argument to
// newDialWorker is passed as nil; its meaning is defined where newDialWorker
// is declared.
func (s *Swarm) dialWorkerLoop(p peer.ID, reqch <-chan dialRequest) {
	newDialWorker(s, p, reqch, nil).loop()
}
// addrsForDial computes the addresses to dial for peer p.
//
// It resolves and de-duplicates the peerstore's addresses for p, then removes
// known-undialable ones. When the context requests a forced direct dial, proxy
// addresses are removed as well. The surviving addresses are written back to
// the peerstore with peerstore.TempAddrTTL.
//
// It returns ErrNoAddresses when the peerstore has nothing for p, and
// ErrNoGoodAddresses (together with the per-address errors) when nothing
// survives filtering.
func (s *Swarm) addrsForDial(ctx context.Context, p peer.ID) (goodAddrs []ma.Multiaddr, addrErrs []TransportError, err error) {
	known := s.peers.Addrs(p)
	if len(known) == 0 {
		return nil, nil, ErrNoAddresses
	}

	candidates := ma.Unique(s.resolveAddrs(ctx, peer.AddrInfo{ID: p, Addrs: known}))
	goodAddrs, addrErrs = s.filterKnownUndialables(p, candidates)

	forceDirect, _ := network.GetForceDirectDial(ctx)
	if forceDirect {
		goodAddrs = ma.FilterAddrs(goodAddrs, s.nonProxyAddr)
	}

	if len(goodAddrs) == 0 {
		return nil, addrErrs, ErrNoGoodAddresses
	}

	s.peers.AddAddrs(p, goodAddrs, peerstore.TempAddrTTL)
	return goodAddrs, addrErrs, nil
}
// startsWithDNSComponent reports whether the first component of m is /dns,
// /dns4 or /dns6. A nil multiaddr yields false.
func startsWithDNSComponent(m ma.Multiaddr) bool {
	if m == nil {
		return false
	}
	isDNS := false
	ma.ForEach(m, func(first ma.Component) bool {
		code := first.Protocol().Code
		isDNS = code == ma.P_DNS || code == ma.P_DNS4 || code == ma.P_DNS6
		// Returning false stops the walk after the first component.
		return false
	})
	return isDNS
}
// stripP2PComponent removes the last component from every address for which
// peer.IDFromP2PAddr yields a peer ID. The slice is modified in place and
// returned.
func stripP2PComponent(addrs []ma.Multiaddr) []ma.Multiaddr {
	for i := range addrs {
		id, _ := peer.IDFromP2PAddr(addrs[i])
		if id == "" {
			continue
		}
		addrs[i], _ = ma.SplitLast(addrs[i])
	}
	return addrs
}
// resolver is one stage of the address resolution chain run by chainResolvers.
type resolver struct {
	// canResolve reports whether this stage applies to the address; addresses
	// it rejects are passed through unchanged.
	canResolve func(ma.Multiaddr) bool

	// resolve expands maddr into zero or more addresses. outputLimit is the
	// number of output slots still available to the stage.
	resolve func(ctx context.Context, maddr ma.Multiaddr, outputLimit int) ([]ma.Multiaddr, error)
}
// resolveErr pairs an address with the error its resolution produced.
type resolveErr struct {
	// addr is the address that failed to resolve.
	addr ma.Multiaddr
	// err is the error returned by the resolver stage.
	err error
}
// chainResolvers runs addrs through each resolver in order, feeding the output
// of one stage into the next, and returns the final addresses together with
// every resolution error encountered.
//
// Within a stage, addresses the resolver cannot handle are passed through
// unchanged. The output limit is only checked before a resolve call: once the
// stage's output has reached outputLimit it is truncated to the limit and the
// remaining inputs of that stage are dropped. A failed resolution is recorded
// and the address is omitted from the output.
//
// Two buffers are swapped between stages, one of which is the caller's addrs
// slice, so its backing array may be overwritten.
func chainResolvers(ctx context.Context, addrs []ma.Multiaddr, outputLimit int, resolvers []resolver) ([]ma.Multiaddr, []resolveErr) {
	out := make([]ma.Multiaddr, 0, len(addrs))
	failures := []resolveErr{}

	for _, stage := range resolvers {
		for _, in := range addrs {
			if !stage.canResolve(in) {
				out = append(out, in)
				continue
			}
			if len(out) >= outputLimit {
				out = out[:outputLimit]
				break
			}
			expanded, err := stage.resolve(ctx, in, outputLimit-len(out))
			if err != nil {
				failures = append(failures, resolveErr{addr: in, err: err})
				continue
			}
			out = append(out, expanded...)
		}
		// This stage's output becomes the next stage's input; the old input
		// buffer is emptied and reused as the next output buffer.
		addrs, out = out, addrs[:0]
	}
	return addrs, failures
}
// resolveAddrs expands the addresses in pi into dialable addresses.
//
// The addresses go through four stages, in order: /dnsaddr expansion,
// transports that ask to skip resolution, transport-specific resolution, and
// DNS component resolution. The chain's output is capped at
// maximumResolvedAddresses. Resolution failures are logged and the failing
// address is dropped. Addresses a transport asked to skip are removed from the
// chain and appended, unresolved, to the end of the result. Finally the
// trailing /p2p component is stripped.
func (s *Swarm) resolveAddrs(ctx context.Context, pi peer.AddrInfo) []ma.Multiaddr {
	// Addresses pulled out of the chain by the skip stage.
	var untouched []ma.Multiaddr

	chain := []resolver{
		// Stage 1: /dnsaddr expansion.
		{
			canResolve: startsWithDNSADDR,
			resolve: func(ctx context.Context, maddr ma.Multiaddr, outputLimit int) ([]ma.Multiaddr, error) {
				return s.multiaddrResolver.ResolveDNSAddr(ctx, pi.ID, maddr, maximumDNSADDRRecursion, outputLimit)
			},
		},
		// Stage 2: let a transport take an address out of further resolution.
		{
			canResolve: func(addr ma.Multiaddr) bool {
				// A comma-ok assertion on a nil transport is simply false.
				_, ok := s.TransportForDialing(addr).(transport.SkipResolver)
				return ok
			},
			resolve: func(ctx context.Context, addr ma.Multiaddr, _ int) ([]ma.Multiaddr, error) {
				skipper, ok := s.TransportForDialing(addr).(transport.SkipResolver)
				if ok && skipper.SkipResolve(ctx, addr) {
					untouched = append(untouched, addr)
					return nil, nil
				}
				return []ma.Multiaddr{addr}, nil
			},
		},
		// Stage 3: transport-specific resolution, truncated to the limit.
		{
			canResolve: func(addr ma.Multiaddr) bool {
				_, ok := s.TransportForDialing(addr).(transport.Resolver)
				return ok
			},
			resolve: func(ctx context.Context, addr ma.Multiaddr, outputLimit int) ([]ma.Multiaddr, error) {
				tptResolver, ok := s.TransportForDialing(addr).(transport.Resolver)
				if !ok {
					return []ma.Multiaddr{addr}, nil
				}
				expanded, err := tptResolver.Resolve(ctx, addr)
				if err != nil {
					return nil, err
				}
				if len(expanded) > outputLimit {
					expanded = expanded[:outputLimit]
				}
				return expanded, nil
			},
		},
		// Stage 4: resolve a leading /dns, /dns4 or /dns6 component.
		{
			canResolve: startsWithDNSComponent,
			resolve:    s.multiaddrResolver.ResolveDNSComponent,
		},
	}

	resolved, failures := chainResolvers(ctx, pi.Addrs, maximumResolvedAddresses, chain)
	for _, f := range failures {
		log.Warnf("Failed to resolve addr %s: %v", f.addr, f.err)
	}

	resolved = append(resolved, untouched...)
	return stripP2PComponent(resolved)
}
// dialNextAddr queues a rate-limited dial of addr for peer p, reporting
// progress on resch. It returns ErrDialBackoff without dialing when the
// address is in backoff; the backoff check is skipped when the context
// requests a forced direct dial.
func (s *Swarm) dialNextAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr, resch chan transport.DialUpdate) error {
	forceDirect, _ := network.GetForceDirectDial(ctx)
	if !forceDirect && s.backf.Backoff(p, addr) {
		return ErrDialBackoff
	}

	s.limitedDial(ctx, p, addr, resch)
	return nil
}
// CanDial reports whether addr survives filterKnownUndialables for peer p.
func (s *Swarm) CanDial(p peer.ID, addr ma.Multiaddr) bool {
	survivors, _ := s.filterKnownUndialables(p, []ma.Multiaddr{addr})
	return len(survivors) != 0
}
// nonProxyAddr reports whether addr would be dialed by a transport that is not
// a proxy.
//
// It returns false when no transport can dial addr: such an address cannot be
// dialed directly, and calling Proxy on a nil transport interface would panic.
func (s *Swarm) nonProxyAddr(addr ma.Multiaddr) bool {
	t := s.TransportForDialing(addr)
	if t == nil {
		return false
	}
	return !t.Proxy()
}
// quicDraft29DialMatcher matches IP/UDP/QUIC addresses. filterKnownUndialables
// uses it to report ErrQUICDraft29 instead of ErrNoTransport for addresses
// that have no transport.
var quicDraft29DialMatcher = mafmt.And(
	mafmt.IP,
	mafmt.Base(ma.P_UDP),
	mafmt.Base(ma.P_QUIC),
)
// filterKnownUndialables removes from addrs the addresses known not to be
// worth dialing for peer p, and returns the survivors together with one
// TransportError per address rejected for a reportable reason.
//
// Filters are applied in this order:
//  1. addresses with no dialing transport (ErrNoTransport, or ErrQUICDraft29
//     when the address matches quicDraft29DialMatcher);
//  2. low-priority addresses, via filterLowPriorityAddresses (no error
//     recorded); this runs after step 1, so only dialable addresses can
//     shadow others;
//  3. black-holed addresses (ErrDialRefusedBlackHole);
//  4. unspecified IPs (no error recorded), our own listen addresses
//     (ErrDialToSelf), IPv6 link-local addresses (no error recorded), and
//     addresses rejected by the connection gater
//     (ErrGaterDisallowedConnection).
func (s *Swarm) filterKnownUndialables(p peer.ID, addrs []ma.Multiaddr) (goodAddrs []ma.Multiaddr, addrErrs []TransportError) {
	// The error is deliberately ignored: self-dial filtering is applied to
	// whatever listen addresses were returned.
	lisAddrs, _ := s.InterfaceListenAddresses()
	var ourAddrs []ma.Multiaddr
	for _, addr := range lisAddrs {
		// Only the first component is inspected (the callback stops the walk):
		// listen addresses are kept only when they start with /ip4 or /ip6.
		ma.ForEach(addr, func(c ma.Component) bool {
			if c.Protocol().Code == ma.P_IP4 || c.Protocol().Code == ma.P_IP6 {
				ourAddrs = append(ourAddrs, addr)
			}
			return false
		})
	}

	addrErrs = make([]TransportError, 0, len(addrs))

	// Drop addresses no transport can dial.
	addrs = ma.FilterAddrs(addrs, func(a ma.Multiaddr) bool {
		if s.TransportForDialing(a) == nil {
			e := ErrNoTransport
			if quicDraft29DialMatcher.Matches(a) {
				e = ErrQUICDraft29
			}
			addrErrs = append(addrErrs, TransportError{Address: a, Cause: e})
			return false
		}
		return true
	})

	// Drop low-priority addresses among those we can dial.
	addrs = filterLowPriorityAddresses(addrs)

	// Drop black-holed addresses.
	addrs, blackHoledAddrs := s.bhd.FilterAddrs(addrs)
	for _, a := range blackHoledAddrs {
		addrErrs = append(addrErrs, TransportError{Address: a, Cause: ErrDialRefusedBlackHole})
	}

	// The filter closures below append to addrErrs. Finish the call before
	// addrErrs is read for the return: the Go spec does not order a function
	// call relative to a plain variable read within a single statement.
	goodAddrs = ma.FilterAddrs(addrs,
		func(addr ma.Multiaddr) bool {
			return !manet.IsIPUnspecified(addr)
		},
		func(addr ma.Multiaddr) bool {
			if ma.Contains(ourAddrs, addr) {
				addrErrs = append(addrErrs, TransportError{Address: addr, Cause: ErrDialToSelf})
				return false
			}
			return true
		},
		func(addr ma.Multiaddr) bool { return !manet.IsIP6LinkLocal(addr) },
		func(addr ma.Multiaddr) bool {
			if s.gater != nil && !s.gater.InterceptAddrDial(p, addr) {
				addrErrs = append(addrErrs, TransportError{Address: addr, Cause: ErrGaterDisallowedConnection})
				return false
			}
			return true
		},
	)
	return goodAddrs, addrErrs
}
// limitedDial submits a dial job for (p, a) to the dial limiter; results are
// delivered on resp. The job uses s.dialTimeout, or s.dialTimeoutLocal when a
// is a private address and the local timeout is the shorter of the two.
func (s *Swarm) limitedDial(ctx context.Context, p peer.ID, a ma.Multiaddr, resp chan transport.DialUpdate) {
	job := &dialJob{
		addr:    a,
		peer:    p,
		resp:    resp,
		ctx:     ctx,
		timeout: s.dialTimeout,
	}
	if manet.IsPrivateAddr(a) && s.dialTimeoutLocal < s.dialTimeout {
		job.timeout = s.dialTimeoutLocal
	}
	s.limiter.AddDialJob(job)
}
// dialAddr dials peer p at addr using the transport registered for addr.
//
// It refuses to dial the local peer (ErrDialToSelf), returns the context's
// error if ctx is already done, and returns ErrNoTransport when no transport
// handles addr. Transports implementing transport.DialUpdater are dialed with
// DialWithUpdates so progress is sent on updCh; others use Dial. Every dial
// outcome is recorded with the black hole detector, and failures are reported
// to the metrics tracer when one is set. A connection whose remote peer is
// not p is closed and reported as a transport bug.
func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr, updCh chan<- transport.DialUpdate) (transport.CapableConn, error) {
	if p == s.local {
		return nil, ErrDialToSelf
	}
	if ctxErr := ctx.Err(); ctxErr != nil {
		log.Debugf("%s swarm not dialing. Context cancelled: %v. %s %s", s.local, ctxErr, p, addr)
		return nil, ctxErr
	}
	log.Debugf("%s swarm dialing %s %s", s.local, p, addr)

	tpt := s.TransportForDialing(addr)
	if tpt == nil {
		return nil, ErrNoTransport
	}

	began := time.Now()
	var (
		dialed  transport.CapableConn
		dialErr error
	)
	switch d := tpt.(type) {
	case transport.DialUpdater:
		dialed, dialErr = d.DialWithUpdates(ctx, addr, p, updCh)
	default:
		dialed, dialErr = tpt.Dial(ctx, addr, p)
	}

	// Feed the outcome to the black hole detector whether or not it succeeded.
	s.bhd.RecordResult(addr, dialErr == nil)
	if dialErr != nil {
		if s.metricsTracer != nil {
			s.metricsTracer.FailedDialing(addr, dialErr, context.Cause(ctx))
		}
		return nil, dialErr
	}

	canonicallog.LogPeerStatus(100, dialed.RemotePeer(), dialed.RemoteMultiaddr(), "connection_status", "established", "dir", "outbound")
	if s.metricsTracer != nil {
		tracked := wrapWithMetrics(dialed, s.metricsTracer, began, network.DirOutbound)
		tracked.completedHandshake()
		dialed = tracked
	}

	// Verify the transport really connected us to the peer we asked for.
	if dialed.RemotePeer() != p {
		dialed.Close()
		bug := fmt.Errorf("BUG in transport %T: tried to dial %s, dialed %s", tpt, p, dialed.RemotePeer())
		log.Error(bug)
		return nil, bug
	}
	return dialed, nil
}
// isFdConsumingAddr reports whether the part of addr preceding any
// /p2p-circuit component contains a /tcp or /unix component. When that
// leading part is nil the function returns true.
func isFdConsumingAddr(addr ma.Multiaddr) bool {
	beforeCircuit, _ := ma.SplitFunc(addr, func(c ma.Component) bool {
		return c.Protocol().Code == ma.P_CIRCUIT
	})
	if beforeCircuit == nil {
		return true
	}

	if _, err := beforeCircuit.ValueForProtocol(ma.P_TCP); err == nil {
		return true
	}
	_, err := beforeCircuit.ValueForProtocol(ma.P_UNIX)
	return err == nil
}
// isRelayAddr reports whether addr contains a /p2p-circuit component.
func isRelayAddr(addr ma.Multiaddr) bool {
	if _, err := addr.ValueForProtocol(ma.P_CIRCUIT); err != nil {
		return false
	}
	return true
}
// filterLowPriorityAddresses drops addresses that are shadowed by a
// higher-priority address on the same IP and port:
//   - /webtransport and /quic addresses are dropped when a /quic-v1 address
//     exists on the same UDP ip:port;
//   - /ws and /wss addresses are dropped when a plain TCP address exists on the
//     same TCP ip:port.
//
// Addresses whose ip:port cannot be extracted are kept. The input slice is
// compacted in place and the returned slice shares its backing array.
func filterLowPriorityAddresses(addrs []ma .Multiaddr ) []ma .Multiaddr {
// First pass: record the ip:port of every QUIC v1 and plain TCP address.
quicV1Addr := make (map [netip .AddrPort ]struct {})
tcpAddr := make (map [netip .AddrPort ]struct {})
for _ , a := range addrs {
switch {
// Empty case: webtransport addresses are not recorded. Listing it first
// keeps them out of the quic-v1 case below (presumably they also contain a
// /quic-v1 component; confirm against isProtocolAddr).
case isProtocolAddr (a , ma .P_WEBTRANSPORT ):
case isProtocolAddr (a , ma .P_QUIC_V1 ):
ap , err := addrPort (a , ma .P_UDP )
if err != nil {
continue
}
quicV1Addr [ap ] = struct {}{}
// Empty case: ws/wss addresses are not recorded, for the same reason with
// respect to the tcp case below.
case isProtocolAddr (a , ma .P_WS ) || isProtocolAddr (a , ma .P_WSS ):
case isProtocolAddr (a , ma .P_TCP ):
ap , err := addrPort (a , ma .P_TCP )
if err != nil {
continue
}
tcpAddr [ap ] = struct {}{}
}
}
// Second pass: compact addrs in place, skipping shadowed addresses.
i := 0
for _ , a := range addrs {
switch {
case isProtocolAddr (a , ma .P_WEBTRANSPORT ) || isProtocolAddr (a , ma .P_QUIC ):
ap , err := addrPort (a , ma .P_UDP )
if err != nil {
// break leaves the switch only, so the address is kept below.
break
}
if _ , ok := quicV1Addr [ap ]; ok {
// continue skips to the next address, dropping this one.
continue
}
case isProtocolAddr (a , ma .P_WS ) || isProtocolAddr (a , ma .P_WSS ):
ap , err := addrPort (a , ma .P_TCP )
if err != nil {
// break leaves the switch only, so the address is kept below.
break
}
if _ , ok := tcpAddr [ap ]; ok {
// continue skips to the next address, dropping this one.
continue
}
}
addrs [i ] = a
i ++
}
return addrs [:i ]
}
// addrPort extracts the IP address of a and the port stored under protocol
// code p (for example ma.P_TCP or ma.P_UDP) as a netip.AddrPort.
//
// It returns an error when a has no IP, has no value for protocol p, the port
// is not a decimal number in the range 0-65535, or the IP bytes cannot be
// converted to a netip.Addr.
func addrPort(a ma.Multiaddr, p int) (netip.AddrPort, error) {
	ip, err := manet.ToIP(a)
	if err != nil {
		return netip.AddrPort{}, err
	}
	portStr, err := a.ValueForProtocol(p)
	if err != nil {
		return netip.AddrPort{}, err
	}
	// Parse with a 16-bit bound so an out-of-range or negative value is
	// rejected instead of being silently truncated by a uint16 conversion.
	port, err := strconv.ParseUint(portStr, 10, 16)
	if err != nil {
		return netip.AddrPort{}, err
	}
	addr, ok := netip.AddrFromSlice(ip)
	if !ok {
		return netip.AddrPort{}, fmt.Errorf("failed to parse IP %s", ip)
	}
	return netip.AddrPortFrom(addr, uint16(port)), nil
}
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PRs and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.