package autonat
import (
"context"
"errors"
"math/rand"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/p2p/host/autonat/pb"
"github.com/libp2p/go-msgio/pbio"
ma "github.com/multiformats/go-multiaddr"
)
var streamTimeout = 60 * time .Second
const (
ServiceName = "libp2p.autonat"
maxMsgSize = 4096
)
type autoNATService struct {
instanceLock sync .Mutex
instance context .CancelFunc
backgroundRunning chan struct {}
config *config
mx sync .Mutex
reqs map [peer .ID ]int
globalReqs int
}
func newAutoNATService(c *config ) (*autoNATService , error ) {
if c .dialer == nil {
return nil , errors .New ("cannot create NAT service without a network" )
}
return &autoNATService {
config : c ,
reqs : make (map [peer .ID ]int ),
}, nil
}
func (as *autoNATService ) handleStream (s network .Stream ) {
if err := s .Scope ().SetService (ServiceName ); err != nil {
log .Debugf ("error attaching stream to autonat service: %s" , err )
s .Reset ()
return
}
if err := s .Scope ().ReserveMemory (maxMsgSize , network .ReservationPriorityAlways ); err != nil {
log .Debugf ("error reserving memory for autonat stream: %s" , err )
s .Reset ()
return
}
defer s .Scope ().ReleaseMemory (maxMsgSize )
s .SetDeadline (time .Now ().Add (streamTimeout ))
defer s .Close ()
pid := s .Conn ().RemotePeer ()
log .Debugf ("New stream from %s" , pid )
r := pbio .NewDelimitedReader (s , maxMsgSize )
w := pbio .NewDelimitedWriter (s )
var req pb .Message
var res pb .Message
err := r .ReadMsg (&req )
if err != nil {
log .Debugf ("Error reading message from %s: %s" , pid , err .Error())
s .Reset ()
return
}
t := req .GetType ()
if t != pb .Message_DIAL {
log .Debugf ("Unexpected message from %s: %s (%d)" , pid , t .String (), t )
s .Reset ()
return
}
dr := as .handleDial (pid , s .Conn ().RemoteMultiaddr (), req .GetDial ().GetPeer ())
res .Type = pb .Message_DIAL_RESPONSE .Enum ()
res .DialResponse = dr
err = w .WriteMsg (&res )
if err != nil {
log .Debugf ("Error writing response to %s: %s" , pid , err .Error())
s .Reset ()
return
}
if as .config .metricsTracer != nil {
as .config .metricsTracer .OutgoingDialResponse (res .GetDialResponse ().GetStatus ())
}
}
func (as *autoNATService ) handleDial (p peer .ID , obsaddr ma .Multiaddr , mpi *pb .Message_PeerInfo ) *pb .Message_DialResponse {
if mpi == nil {
return newDialResponseError (pb .Message_E_BAD_REQUEST , "missing peer info" )
}
mpid := mpi .GetId ()
if mpid != nil {
mp , err := peer .IDFromBytes (mpid )
if err != nil {
return newDialResponseError (pb .Message_E_BAD_REQUEST , "bad peer id" )
}
if mp != p {
return newDialResponseError (pb .Message_E_BAD_REQUEST , "peer id mismatch" )
}
}
addrs := make ([]ma .Multiaddr , 0 , as .config .maxPeerAddresses )
seen := make (map [string ]struct {})
if as .config .dialPolicy .skipDial (obsaddr ) {
if as .config .metricsTracer != nil {
as .config .metricsTracer .OutgoingDialRefused (dial_blocked )
}
return newDialResponseError (pb .Message_E_DIAL_REFUSED , "refusing to dial peer with blocked observed address" )
}
hostIP , _ := ma .SplitFirst (obsaddr )
switch hostIP .Protocol ().Code {
case ma .P_IP4 , ma .P_IP6 :
default :
return newDialResponseError (pb .Message_E_INTERNAL_ERROR , "expected an IP address" )
}
addrs = append (addrs , obsaddr )
seen [obsaddr .String ()] = struct {}{}
for _ , maddr := range mpi .GetAddrs () {
addr , err := ma .NewMultiaddrBytes (maddr )
if err != nil {
log .Debugf ("Error parsing multiaddr: %s" , err .Error())
continue
}
if ip , rest := ma .SplitFirst (addr ); !ip .Equal (hostIP ) {
switch ip .Protocol ().Code {
case ma .P_IP4 , ma .P_IP6 :
default :
continue
}
addr = hostIP .Multiaddr ()
if len (rest ) > 0 {
addr = addr .Encapsulate (rest )
}
}
if as .config .dialPolicy .skipDial (addr ) {
continue
}
str := addr .String ()
_ , ok := seen [str ]
if ok {
continue
}
addrs = append (addrs , addr )
seen [str ] = struct {}{}
if len (addrs ) >= as .config .maxPeerAddresses {
break
}
}
if len (addrs ) == 0 {
if as .config .metricsTracer != nil {
as .config .metricsTracer .OutgoingDialRefused (no_valid_address )
}
return newDialResponseError (pb .Message_E_DIAL_REFUSED , "no dialable addresses" )
}
return as .doDial (peer .AddrInfo {ID : p , Addrs : addrs })
}
func (as *autoNATService ) doDial (pi peer .AddrInfo ) *pb .Message_DialResponse {
as .mx .Lock ()
count := as .reqs [pi .ID ]
if count >= as .config .throttlePeerMax || (as .config .throttleGlobalMax > 0 &&
as .globalReqs >= as .config .throttleGlobalMax ) {
as .mx .Unlock ()
if as .config .metricsTracer != nil {
as .config .metricsTracer .OutgoingDialRefused (rate_limited )
}
return newDialResponseError (pb .Message_E_DIAL_REFUSED , "too many dials" )
}
as .reqs [pi .ID ] = count + 1
as .globalReqs ++
as .mx .Unlock ()
ctx , cancel := context .WithTimeout (context .Background (), as .config .dialTimeout )
defer cancel ()
as .config .dialer .Peerstore ().ClearAddrs (pi .ID )
as .config .dialer .Peerstore ().AddAddrs (pi .ID , pi .Addrs , peerstore .TempAddrTTL )
defer func () {
as .config .dialer .Peerstore ().ClearAddrs (pi .ID )
as .config .dialer .Peerstore ().RemovePeer (pi .ID )
}()
conn , err := as .config .dialer .DialPeer (ctx , pi .ID )
if err != nil {
log .Debugf ("error dialing %s: %s" , pi .ID , err .Error())
<-ctx .Done ()
return newDialResponseError (pb .Message_E_DIAL_ERROR , "dial failed" )
}
ra := conn .RemoteMultiaddr ()
as .config .dialer .ClosePeer (pi .ID )
return newDialResponseOK (ra )
}
func (as *autoNATService ) Enable () {
as .instanceLock .Lock ()
defer as .instanceLock .Unlock ()
if as .instance != nil {
return
}
ctx , cancel := context .WithCancel (context .Background ())
as .instance = cancel
as .backgroundRunning = make (chan struct {})
as .config .host .SetStreamHandler (AutoNATProto , as .handleStream )
go as .background (ctx )
}
func (as *autoNATService ) Disable () {
as .instanceLock .Lock ()
defer as .instanceLock .Unlock ()
if as .instance != nil {
as .config .host .RemoveStreamHandler (AutoNATProto )
as .instance ()
as .instance = nil
<-as .backgroundRunning
}
}
func (as *autoNATService ) Close () error {
as .Disable ()
return as .config .dialer .Close ()
}
func (as *autoNATService ) background (ctx context .Context ) {
defer close (as .backgroundRunning )
timer := time .NewTimer (as .config .throttleResetPeriod )
defer timer .Stop ()
for {
select {
case <- timer .C :
as .mx .Lock ()
as .reqs = make (map [peer .ID ]int )
as .globalReqs = 0
as .mx .Unlock ()
jitter := rand .Float32 () * float32 (as .config .throttleResetJitter )
timer .Reset (as .config .throttleResetPeriod + time .Duration (int64 (jitter )))
case <- ctx .Done ():
return
}
}
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .