package holepunch
import (
"context"
"errors"
"fmt"
"sync"
"time"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch/pb"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/libp2p/go-msgio/pbio"
ma "github.com/multiformats/go-multiaddr"
)
const defaultDirectDialTimeout = 10 * time .Second
const Protocol protocol .ID = "/libp2p/dcutr"
var log = logging .Logger ("p2p-holepunch" )
var StreamTimeout = 1 * time .Minute
const (
ServiceName = "libp2p.holepunch"
maxMsgSize = 4 * 1024
)
var ErrClosed = errors .New ("hole punching service closing" )
type Option func (*Service ) error
func DirectDialTimeout (timeout time .Duration ) Option {
return func (s *Service ) error {
s .directDialTimeout = timeout
return nil
}
}
type Service struct {
ctx context .Context
ctxCancel context .CancelFunc
host host .Host
ids identify .IDService
listenAddrs func () []ma .Multiaddr
directDialTimeout time .Duration
holePuncherMx sync .Mutex
holePuncher *holePuncher
hasPublicAddrsChan chan struct {}
tracer *tracer
filter AddrFilter
refCount sync .WaitGroup
legacyBehavior bool
}
func (s *Service ) SetLegacyBehavior (legacyBehavior bool ) {
s .legacyBehavior = legacyBehavior
}
func NewService (h host .Host , ids identify .IDService , listenAddrs func () []ma .Multiaddr , opts ...Option ) (*Service , error ) {
if ids == nil {
return nil , errors .New ("identify service can't be nil" )
}
ctx , cancel := context .WithCancel (context .Background ())
s := &Service {
ctx : ctx ,
ctxCancel : cancel ,
host : h ,
ids : ids ,
listenAddrs : listenAddrs ,
hasPublicAddrsChan : make (chan struct {}),
directDialTimeout : defaultDirectDialTimeout ,
legacyBehavior : true ,
}
for _ , opt := range opts {
if err := opt (s ); err != nil {
cancel ()
return nil , err
}
}
s .tracer .Start ()
s .refCount .Add (1 )
go s .waitForPublicAddr ()
return s , nil
}
func (s *Service ) waitForPublicAddr () {
defer s .refCount .Done ()
log .Debugw ("waiting until we have at least one public address" , "peer" , s .host .ID ())
duration := 250 * time .Millisecond
const maxDuration = 5 * time .Second
t := time .NewTimer (duration )
defer t .Stop ()
for {
if len (s .listenAddrs ()) > 0 {
log .Debugf ("Host %s now has a public address (%s). Starting holepunch protocol." , s .host .ID (), s .host .Addrs ())
s .host .SetStreamHandler (Protocol , s .handleNewStream )
break
}
select {
case <- s .ctx .Done ():
return
case <- t .C :
duration *= 2
if duration > maxDuration {
duration = maxDuration
}
t .Reset (duration )
}
}
s .holePuncherMx .Lock ()
if s .ctx .Err () != nil {
return
}
s .holePuncher = newHolePuncher (s .host , s .ids , s .listenAddrs , s .tracer , s .filter )
s .holePuncher .directDialTimeout = s .directDialTimeout
s .holePuncher .legacyBehavior = s .legacyBehavior
s .holePuncherMx .Unlock ()
close (s .hasPublicAddrsChan )
}
func (s *Service ) Close () error {
var err error
s .ctxCancel ()
s .holePuncherMx .Lock ()
if s .holePuncher != nil {
err = s .holePuncher .Close ()
}
s .holePuncherMx .Unlock ()
s .tracer .Close ()
s .host .RemoveStreamHandler (Protocol )
s .refCount .Wait ()
return err
}
func (s *Service ) incomingHolePunch (str network .Stream ) (rtt time .Duration , remoteAddrs []ma .Multiaddr , ownAddrs []ma .Multiaddr , err error ) {
if !isRelayAddress (str .Conn ().RemoteMultiaddr ()) {
return 0 , nil , nil , fmt .Errorf ("received hole punch stream: %s" , str .Conn ().RemoteMultiaddr ())
}
ownAddrs = s .listenAddrs ()
if s .filter != nil {
ownAddrs = s .filter .FilterLocal (str .Conn ().RemotePeer (), ownAddrs )
}
if len (ownAddrs ) == 0 {
return 0 , nil , nil , errors .New ("rejecting hole punch request, as we don't have any public addresses" )
}
if err := str .Scope ().ReserveMemory (maxMsgSize , network .ReservationPriorityAlways ); err != nil {
log .Debugf ("error reserving memory for stream: %s" , err )
return 0 , nil , nil , err
}
defer str .Scope ().ReleaseMemory (maxMsgSize )
wr := pbio .NewDelimitedWriter (str )
rd := pbio .NewDelimitedReader (str , maxMsgSize )
msg := new (pb .HolePunch )
str .SetDeadline (time .Now ().Add (StreamTimeout ))
if err := rd .ReadMsg (msg ); err != nil {
return 0 , nil , nil , fmt .Errorf ("failed to read message from initiator: %w" , err )
}
if t := msg .GetType (); t != pb .HolePunch_CONNECT {
return 0 , nil , nil , fmt .Errorf ("expected CONNECT message from initiator but got %d" , t )
}
obsDial := removeRelayAddrs (addrsFromBytes (msg .ObsAddrs ))
if s .filter != nil {
obsDial = s .filter .FilterRemote (str .Conn ().RemotePeer (), obsDial )
}
log .Debugw ("received hole punch request" , "peer" , str .Conn ().RemotePeer (), "addrs" , obsDial )
if len (obsDial ) == 0 {
return 0 , nil , nil , errors .New ("expected CONNECT message to contain at least one address" )
}
msg .Reset ()
msg .Type = pb .HolePunch_CONNECT .Enum ()
msg .ObsAddrs = addrsToBytes (ownAddrs )
tstart := time .Now ()
if err := wr .WriteMsg (msg ); err != nil {
return 0 , nil , nil , fmt .Errorf ("failed to write CONNECT message to initiator: %w" , err )
}
msg .Reset ()
if err := rd .ReadMsg (msg ); err != nil {
return 0 , nil , nil , fmt .Errorf ("failed to read message from initiator: %w" , err )
}
if t := msg .GetType (); t != pb .HolePunch_SYNC {
return 0 , nil , nil , fmt .Errorf ("expected SYNC message from initiator but got %d" , t )
}
return time .Since (tstart ), obsDial , ownAddrs , nil
}
func (s *Service ) handleNewStream (str network .Stream ) {
if str .Conn ().Stat ().Direction == network .DirInbound {
str .Reset ()
return
}
if err := str .Scope ().SetService (ServiceName ); err != nil {
log .Debugf ("error attaching stream to holepunch service: %s" , err )
str .Reset ()
return
}
rp := str .Conn ().RemotePeer ()
rtt , addrs , ownAddrs , err := s .incomingHolePunch (str )
if err != nil {
s .tracer .ProtocolError (rp , err )
log .Debugw ("error handling holepunching stream from" , "peer" , rp , "error" , err )
str .Reset ()
return
}
str .Close ()
pi := peer .AddrInfo {
ID : rp ,
Addrs : addrs ,
}
s .tracer .StartHolePunch (rp , addrs , rtt )
log .Debugw ("starting hole punch" , "peer" , rp )
start := time .Now ()
s .tracer .HolePunchAttempt (pi .ID )
ctx , cancel := context .WithTimeout (s .ctx , s .directDialTimeout )
isClient := false
if s .legacyBehavior {
isClient = true
}
err = holePunchConnect (ctx , s .host , pi , isClient )
cancel ()
dt := time .Since (start )
s .tracer .EndHolePunch (rp , dt , err )
s .tracer .HolePunchFinished ("receiver" , 1 , addrs , ownAddrs , getDirectConnection (s .host , rp ))
}
func (s *Service ) DirectConnect (p peer .ID ) error {
<-s .hasPublicAddrsChan
s .holePuncherMx .Lock ()
holePuncher := s .holePuncher
s .holePuncherMx .Unlock ()
return holePuncher .DirectConnect (p )
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .