package autonatv2
import (
"context"
"errors"
"fmt"
"iter"
"math/rand/v2"
"slices"
"sync"
"time"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
const (
ServiceName = "libp2p.autonatv2"
DialBackProtocol = "/libp2p/autonat/2/dial-back"
DialProtocol = "/libp2p/autonat/2/dial-request"
maxMsgSize = 8192
streamTimeout = 15 * time .Second
dialBackStreamTimeout = 5 * time .Second
dialBackDialTimeout = 10 * time .Second
dialBackMaxMsgSize = 1024
minHandshakeSizeBytes = 30_000
maxHandshakeSizeBytes = 100_000
maxPeerAddresses = 50
defaultThrottlePeerDuration = 2 * time .Minute
)
var (
ErrNoPeers = errors .New ("no peers for autonat v2" )
ErrPrivateAddrs = errors .New ("private addresses cannot be verified with autonatv2" )
log = logging .Logger ("autonatv2" )
)
type Request struct {
Addr ma .Multiaddr
SendDialData bool
}
type Result struct {
Addr ma .Multiaddr
Idx int
Reachability network .Reachability
AllAddrsRefused bool
}
type AutoNAT struct {
host host .Host
ctx context .Context
cancel context .CancelFunc
wg sync .WaitGroup
srv *server
cli *client
mx sync .Mutex
peers *peersMap
throttlePeer map [peer .ID ]time .Time
throttlePeerDuration time .Duration
allowPrivateAddrs bool
}
func New (dialerHost host .Host , opts ...AutoNATOption ) (*AutoNAT , error ) {
s := defaultSettings ()
for _ , o := range opts {
if err := o (s ); err != nil {
return nil , fmt .Errorf ("failed to apply option: %w" , err )
}
}
ctx , cancel := context .WithCancel (context .Background ())
an := &AutoNAT {
ctx : ctx ,
cancel : cancel ,
srv : newServer (dialerHost , s ),
cli : newClient (s ),
allowPrivateAddrs : s .allowPrivateAddrs ,
peers : newPeersMap (),
throttlePeer : make (map [peer .ID ]time .Time ),
throttlePeerDuration : s .throttlePeerDuration ,
}
return an , nil
}
func (an *AutoNAT ) background (sub event .Subscription ) {
ticker := time .NewTicker (10 * time .Minute )
for {
select {
case <- an .ctx .Done ():
sub .Close ()
an .wg .Done ()
return
case e := <- sub .Out ():
switch evt := e .(type ) {
case event .EvtPeerProtocolsUpdated :
an .updatePeer (evt .Peer )
case event .EvtPeerConnectednessChanged :
an .updatePeer (evt .Peer )
case event .EvtPeerIdentificationCompleted :
an .updatePeer (evt .Peer )
default :
log .Errorf ("unexpected event: %T" , e )
}
case <- ticker .C :
now := time .Now ()
an .mx .Lock ()
for p , t := range an .throttlePeer {
if t .Before (now ) {
delete (an .throttlePeer , p )
}
}
an .mx .Unlock ()
}
}
}
func (an *AutoNAT ) Start (h host .Host ) error {
an .host = h
sub , err := an .host .EventBus ().Subscribe ([]interface {}{
new (event .EvtPeerProtocolsUpdated ),
new (event .EvtPeerConnectednessChanged ),
new (event .EvtPeerIdentificationCompleted ),
})
if err != nil {
return fmt .Errorf ("event subscription failed: %w" , err )
}
an .cli .Start (h )
an .srv .Start (h )
an .wg .Add (1 )
go an .background (sub )
return nil
}
func (an *AutoNAT ) Close () {
an .cancel ()
an .wg .Wait ()
an .srv .Close ()
an .cli .Close ()
an .peers = nil
}
func (an *AutoNAT ) GetReachability (ctx context .Context , reqs []Request ) (Result , error ) {
var filteredReqs []Request
if !an .allowPrivateAddrs {
filteredReqs = make ([]Request , 0 , len (reqs ))
for _ , r := range reqs {
if manet .IsPublicAddr (r .Addr ) {
filteredReqs = append (filteredReqs , r )
} else {
log .Errorf ("private address in reachability check: %s" , r .Addr )
}
}
if len (filteredReqs ) == 0 {
return Result {}, ErrPrivateAddrs
}
} else {
filteredReqs = reqs
}
an .mx .Lock ()
now := time .Now ()
var p peer .ID
for pr := range an .peers .Shuffled () {
if t := an .throttlePeer [pr ]; t .After (now ) {
continue
}
p = pr
an .throttlePeer [p ] = time .Now ().Add (an .throttlePeerDuration )
break
}
an .mx .Unlock ()
if p == "" {
return Result {}, ErrNoPeers
}
res , err := an .cli .GetReachability (ctx , p , filteredReqs )
if err != nil {
log .Debugf ("reachability check with %s failed, err: %s" , p , err )
return res , fmt .Errorf ("reachability check with %s failed: %w" , p , err )
}
for i , r := range reqs {
if r .Addr .Equal (res .Addr ) {
res .Idx = i
break
}
}
log .Debugf ("reachability check with %s successful" , p )
return res , nil
}
func (an *AutoNAT ) updatePeer (p peer .ID ) {
an .mx .Lock ()
defer an .mx .Unlock ()
protos , err := an .host .Peerstore ().SupportsProtocols (p , DialProtocol )
connectedness := an .host .Network ().Connectedness (p )
if err == nil && connectedness == network .Connected && slices .Contains (protos , DialProtocol ) {
an .peers .Put (p )
} else {
an .peers .Delete (p )
}
}
type peersMap struct {
peerIdx map [peer .ID ]int
peers []peer .ID
}
func newPeersMap() *peersMap {
return &peersMap {
peerIdx : make (map [peer .ID ]int ),
peers : make ([]peer .ID , 0 ),
}
}
func (p *peersMap ) Shuffled () iter .Seq [peer .ID ] {
n := len (p .peers )
start := 0
if n > 0 {
start = rand .IntN (n )
}
return func (yield func (peer .ID ) bool ) {
for i := range n {
if !yield (p .peers [(i +start )%n ]) {
return
}
}
}
}
func (p *peersMap ) Put (id peer .ID ) {
if _ , ok := p .peerIdx [id ]; ok {
return
}
p .peers = append (p .peers , id )
p .peerIdx [id ] = len (p .peers ) - 1
}
func (p *peersMap ) Delete (id peer .ID ) {
idx , ok := p .peerIdx [id ]
if !ok {
return
}
n := len (p .peers )
lastPeer := p .peers [n -1 ]
p .peers [idx ] = lastPeer
p .peerIdx [lastPeer ] = idx
p .peers [n -1 ] = ""
p .peers = p .peers [:n -1 ]
delete (p .peerIdx , id )
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .