package basichost
import (
"context"
"errors"
"fmt"
"math"
"slices"
"sync"
"sync/atomic"
"time"
"github.com/benbjohnson/clock"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/p2p/protocol/autonatv2"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
// autonatv2Client is the client the reachability tracker uses to request
// reachability checks for its addresses.
type autonatv2Client interface {
	// GetReachability submits reqs as one request and returns a single
	// Result, or an error if the check could not be completed.
	GetReachability(ctx context.Context, reqs []autonatv2.Request) (autonatv2.Result, error)
}
// Limits and defaults for the address reachability tracker.
const (
	// maxAddrsPerRequest caps the number of addresses placed in one probe.
	maxAddrsPerRequest = 10
	// maxTrackedAddrs caps the number of addresses tracked at once; any
	// addresses beyond this are dropped by updateTrackedAddrs.
	maxTrackedAddrs = 50
	// defaultMaxConcurrency is the default number of probe workers that a
	// refresh task runs concurrently.
	defaultMaxConcurrency = 5
	// newAddrsProbeDelay is the default delay between receiving an address
	// update and starting to probe.
	newAddrsProbeDelay = time.Second
)
// addrsReachabilityTracker keeps track of which of the host's addresses are
// reachable, unreachable, or of unknown reachability by probing them through
// an autonatv2Client from a background goroutine.
type addrsReachabilityTracker struct {
	// ctx is cancelled by Close; cancel is its cancel function.
	ctx    context.Context
	cancel context.CancelFunc
	// wg counts the background loop and every refresh task goroutine.
	wg sync.WaitGroup

	// client performs the reachability probes.
	client autonatv2Client
	// reachabilityUpdateCh gets a non-blocking signal whenever the set of
	// confirmed addresses changes.
	reachabilityUpdateCh chan struct{}
	// maxConcurrency is the number of worker goroutines per refresh task.
	maxConcurrency int
	// newAddrsProbeDelay is how long to wait after an address update before
	// the next probe is scheduled.
	newAddrsProbeDelay time.Duration
	// probeManager holds per-address probe state and hands out probes.
	probeManager *probeManager
	// newAddrs carries address updates from UpdateAddrs to the background loop.
	newAddrs chan []ma.Multiaddr
	// clock is the time source for tickers, timers and timestamps.
	clock clock.Clock
	// metricsTracker is optional; it is nil-checked before every use.
	metricsTracker MetricsTracker

	// mx guards the three address slices below.
	mx               sync.Mutex
	reachableAddrs   []ma.Multiaddr
	unreachableAddrs []ma.Multiaddr
	unknownAddrs     []ma.Multiaddr
}
// newAddrsReachabilityTracker builds a tracker that probes through client and
// signals changes on reachabilityUpdateCh. A nil cl is replaced with
// clock.New(). metricsTracker may be nil. The tracker does nothing until
// Start is called.
func newAddrsReachabilityTracker(client autonatv2Client, reachabilityUpdateCh chan struct{}, cl clock.Clock, metricsTracker MetricsTracker) *addrsReachabilityTracker {
	if cl == nil {
		cl = clock.New()
	}
	tracker := &addrsReachabilityTracker{
		client:               client,
		reachabilityUpdateCh: reachabilityUpdateCh,
		maxConcurrency:       defaultMaxConcurrency,
		newAddrsProbeDelay:   newAddrsProbeDelay,
		probeManager:         newProbeManager(cl.Now),
		// Buffer of one: a single pending update can be queued without blocking.
		newAddrs:       make(chan []ma.Multiaddr, 1),
		clock:          cl,
		metricsTracker: metricsTracker,
	}
	tracker.ctx, tracker.cancel = context.WithCancel(context.Background())
	return tracker
}
// UpdateAddrs hands a copy of addrs to the background loop. It blocks until
// the update is queued or the tracker's context is cancelled.
func (r *addrsReachabilityTracker) UpdateAddrs(addrs []ma.Multiaddr) {
	// Copy so later mutation by the caller cannot affect the tracker.
	snapshot := slices.Clone(addrs)
	select {
	case <-r.ctx.Done():
	case r.newAddrs <- snapshot:
	}
}
// ConfirmedAddrs returns copies of the most recently published reachable,
// unreachable and unknown address lists.
func (r *addrsReachabilityTracker) ConfirmedAddrs() (reachableAddrs, unreachableAddrs, unknownAddrs []ma.Multiaddr) {
	r.mx.Lock()
	defer r.mx.Unlock()

	reachableAddrs = slices.Clone(r.reachableAddrs)
	unreachableAddrs = slices.Clone(r.unreachableAddrs)
	unknownAddrs = slices.Clone(r.unknownAddrs)
	return reachableAddrs, unreachableAddrs, unknownAddrs
}
// Start launches the background loop in its own goroutine. It always returns
// nil.
func (r *addrsReachabilityTracker) Start() error {
	// Register with the wait group before spawning so Close can wait for it.
	r.wg.Add(1)
	go r.background()

	return nil
}
// Close cancels the tracker's context and waits for the background loop and
// any refresh task goroutines to exit. It always returns nil.
func (r *addrsReachabilityTracker) Close() error {
	r.cancel()
	r.wg.Wait()

	return nil
}
// Scheduling intervals used by the background loop.
const (
	// defaultReachabilityRefreshInterval is the period of the ticker that
	// triggers a regular reachability refresh.
	defaultReachabilityRefreshInterval = 5 * time.Minute
	// maxBackoffInterval is the upper bound returned by newBackoffInterval.
	maxBackoffInterval = 5 * time.Minute
	// backoffStartInterval is the first backoff returned by
	// newBackoffInterval when there is no positive current backoff.
	backoffStartInterval = 5 * time.Second
)
// background is the tracker's event loop. It runs until r.ctx is cancelled and
// reacts to four kinds of events: the periodic refresh ticker, the one-shot
// probe timer, completion of the running refresh task, and address updates
// from UpdateAddrs. After every event it republishes the confirmed address
// sets and notifies listeners if they changed.
func (r *addrsReachabilityTracker ) background () {
defer r .wg .Done ()
// probeTicker fires at a fixed interval to trigger regular refreshes.
probeTicker := r .clock .Ticker (defaultReachabilityRefreshInterval )
defer probeTicker .Stop ()
// probeTimer fires at a specific scheduled time. It is created with the
// largest possible duration and is only re-armed by the Reset call at the
// bottom of the loop.
probeTimer := r .clock .Timer (time .Duration (math .MaxInt64 ))
defer probeTimer .Stop ()
// nextProbeTime is the time the timer is scheduled for; the zero value means
// nothing is scheduled.
nextProbeTime := time .Time {}
// task is the refresh currently running; a nil BackoffCh means none is running.
var task reachabilityTask
var backoffInterval time .Duration
// curr* and prev* are buffers reused across iterations to detect changes.
var currReachable , currUnreachable , currUnknown , prevReachable , prevUnreachable , prevUnknown []ma .Multiaddr
for {
select {
case <- probeTicker .C :
// Periodic refresh: skipped while a task is running or a probe is
// already scheduled on the timer.
if task .BackoffCh == nil && nextProbeTime .IsZero () {
task = r .refreshReachability ()
}
case <- probeTimer .C :
// Scheduled probe: start a refresh unless one is already running, then
// clear the schedule.
if task .BackoffCh == nil {
task = r .refreshReachability ()
}
nextProbeTime = time .Time {}
case backoff := <- task .BackoffCh :
// The running task finished. When no task is running BackoffCh is nil and
// a receive from a nil channel never proceeds, so this case is inactive.
task = reachabilityTask {}
if backoff {
// The task asked for a backoff: grow the interval.
backoffInterval = newBackoffInterval (backoffInterval )
} else {
// A negative interval puts nextProbeTime in the past, so the timer Reset
// below receives a negative duration (presumably firing right away --
// confirm against the clock implementation). It also makes the next
// backoff restart from backoffStartInterval.
backoffInterval = -1 * time .Second
}
nextProbeTime = r .clock .Now ().Add (backoffInterval )
case addrs := <- r .newAddrs :
// New address set: cancel the running task, wait for it to finish and
// discard its backoff result.
if task .BackoffCh != nil {
task .Cancel ()
<-task .BackoffCh
task = reachabilityTask {}
}
r .updateTrackedAddrs (addrs )
// Schedule a probe no sooner than newAddrsProbeDelay from now; a later
// already-scheduled time (e.g. from a backoff) is kept.
newAddrsNextTime := r .clock .Now ().Add (r .newAddrsProbeDelay )
if nextProbeTime .Before (newAddrsNextTime ) {
nextProbeTime = newAddrsNextTime
}
case <- r .ctx .Done ():
// Shutdown: stop and wait for the running task, report closure to the
// metrics tracker if one is set, and exit.
if task .BackoffCh != nil {
task .Cancel ()
<-task .BackoffCh
task = reachabilityTask {}
}
if r .metricsTracker != nil {
r .metricsTracker .ReachabilityTrackerClosed ()
}
return
}
// Recompute the confirmed address sets into the reused buffers; this also
// stores them on the tracker for ConfirmedAddrs.
currReachable , currUnreachable , currUnknown = r .appendConfirmedAddrs (currReachable [:0 ], currUnreachable [:0 ], currUnknown [:0 ])
// Report to metrics and notify only when a set differs from the previous
// iteration (areAddrsDifferent is defined outside this view).
if areAddrsDifferent (prevReachable , currReachable ) || areAddrsDifferent (prevUnreachable , currUnreachable ) || areAddrsDifferent (prevUnknown , currUnknown ) {
if r .metricsTracker != nil {
r .metricsTracker .ConfirmedAddrsChanged (currReachable , currUnreachable , currUnknown )
}
r .notify ()
}
// Remember the current sets for the next comparison, reusing prev storage.
prevReachable = append (prevReachable [:0 ], currReachable ...)
prevUnreachable = append (prevUnreachable [:0 ], currUnreachable ...)
prevUnknown = append (prevUnknown [:0 ], currUnknown ...)
// Re-arm the timer whenever a probe is scheduled.
if !nextProbeTime .IsZero () {
probeTimer .Reset (nextProbeTime .Sub (r .clock .Now ()))
}
}
}
// newBackoffInterval returns the backoff that follows current: it starts at
// backoffStartInterval when current is not positive, otherwise doubles
// current, capped at maxBackoffInterval.
func newBackoffInterval(current time.Duration) time.Duration {
	if current <= 0 {
		return backoffStartInterval
	}
	if doubled := 2 * current; doubled <= maxBackoffInterval {
		return doubled
	}
	return maxBackoffInterval
}
// appendConfirmedAddrs appends the probe manager's current reachable,
// unreachable and unknown addresses to the given slices, stores a copy of
// each result on the tracker under r.mx, and returns the appended slices.
func (r *addrsReachabilityTracker) appendConfirmedAddrs(reachable, unreachable, unknown []ma.Multiaddr) (reachableAddrs, unreachableAddrs, unknownAddrs []ma.Multiaddr) {
	reachableAddrs, unreachableAddrs, unknownAddrs = r.probeManager.AppendConfirmedAddrs(reachable, unreachable, unknown)

	r.mx.Lock()
	defer r.mx.Unlock()
	// Overwrite in place, reusing the tracker's existing backing arrays.
	r.reachableAddrs = append(r.reachableAddrs[:0], reachableAddrs...)
	r.unreachableAddrs = append(r.unreachableAddrs[:0], unreachableAddrs...)
	r.unknownAddrs = append(r.unknownAddrs[:0], unknownAddrs...)
	return reachableAddrs, unreachableAddrs, unknownAddrs
}
// notify signals reachabilityUpdateCh without blocking.
func (r *addrsReachabilityTracker) notify() {
	select {
	case r.reachabilityUpdateCh <- struct{}{}:
	default:
		// The send cannot proceed right now; drop the signal instead of blocking.
	}
}
// updateTrackedAddrs replaces the probe manager's tracked addresses with the
// public addresses in addrs, keeping at most maxTrackedAddrs of them. addrs is
// filtered in place.
func (r *addrsReachabilityTracker) updateTrackedAddrs(addrs []ma.Multiaddr) {
	isNotPublic := func(a ma.Multiaddr) bool { return !manet.IsPublicAddr(a) }
	public := slices.DeleteFunc(addrs, isNotPublic)

	if excess := len(public) - maxTrackedAddrs; excess > 0 {
		log.Errorf("too many addresses (%d) for addrs reachability tracker; dropping %d", len(public), excess)
		public = public[:maxTrackedAddrs]
	}
	r.probeManager.UpdateAddrs(public)
}
// probe is the batch of requests sent in one GetReachability call. The probe
// manager keys its in-progress bookkeeping on the first request's address.
type probe = []autonatv2.Request

// probeTimeout bounds a single GetReachability call made by a probe worker.
const probeTimeout = 30 * time.Second

// reachabilityTask is a handle to a refresh started by refreshReachability.
// The zero value means no task is running.
type reachabilityTask struct {
	// Cancel cancels the task's context.
	Cancel context.CancelFunc
	// BackoffCh receives one value when the task finishes: true if the task
	// hit a persistent error and the caller should back off.
	BackoffCh chan bool
}
// refreshReachability starts a task that runs pending probes on
// r.maxConcurrency workers and returns a handle to it. If the probe manager
// has nothing to probe it returns the zero reachabilityTask and starts
// nothing.
//
// Each worker stops when there are no more probes, the task context is done,
// or the client returns a persistent error. Once all workers have stopped,
// the task sends on BackoffCh whether any worker saw a persistent error.
func (r *addrsReachabilityTracker) refreshReachability() reachabilityTask {
	if pending := r.probeManager.GetProbe(); len(pending) == 0 {
		return reachabilityTask{}
	}

	// Buffered so the task can report its result and exit even if nobody is
	// receiving yet.
	backoffCh := make(chan bool, 1)
	taskCtx, stopTask := context.WithTimeout(r.ctx, 5*time.Minute)
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		defer stopTask()

		// The error count is shared by all workers of this task.
		counting := &errCountingClient{autonatv2Client: r.client, MaxConsecutiveErrors: maxConsecutiveErrors}
		var (
			needBackoff atomic.Bool
			workers     sync.WaitGroup
		)
		runWorker := func() {
			defer workers.Done()
			for {
				if taskCtx.Err() != nil {
					return
				}
				reqs := r.probeManager.GetProbe()
				if len(reqs) == 0 {
					return
				}
				r.probeManager.MarkProbeInProgress(reqs)
				probeCtx, stopProbe := context.WithTimeout(taskCtx, probeTimeout)
				res, err := counting.GetReachability(probeCtx, reqs)
				stopProbe()
				r.probeManager.CompleteProbe(reqs, res, err)
				if isErrorPersistent(err) {
					needBackoff.Store(true)
					return
				}
			}
		}

		workers.Add(r.maxConcurrency)
		for range r.maxConcurrency {
			go runWorker()
		}
		workers.Wait()
		backoffCh <- needBackoff.Load()
	}()
	return reachabilityTask{Cancel: stopTask, BackoffCh: backoffCh}
}
// errTooManyConsecutiveFailures is wrapped around the client's error once the
// consecutive error count exceeds the configured maximum.
var errTooManyConsecutiveFailures = errors.New("too many consecutive failures")

// errCountingClient wraps an autonatv2Client and counts consecutive failed
// GetReachability calls.
type errCountingClient struct {
	autonatv2Client
	// MaxConsecutiveErrors is the number of consecutive errors tolerated
	// before errors are wrapped with errTooManyConsecutiveFailures.
	MaxConsecutiveErrors int

	// mx guards consecutiveErrors.
	mx                sync.Mutex
	consecutiveErrors int
}
// GetReachability forwards to the wrapped client and updates the consecutive
// error count. A nil error or a context.Canceled error resets the count. Any
// other error increments it, and once the count exceeds MaxConsecutiveErrors
// the returned error also wraps errTooManyConsecutiveFailures.
func (c *errCountingClient) GetReachability(ctx context.Context, reqs probe) (autonatv2.Result, error) {
	res, err := c.autonatv2Client.GetReachability(ctx, reqs)

	c.mx.Lock()
	defer c.mx.Unlock()

	// Success and cancellation both reset the streak.
	if err == nil || errors.Is(err, context.Canceled) {
		c.consecutiveErrors = 0
		return res, err
	}

	c.consecutiveErrors++
	if c.consecutiveErrors > c.MaxConsecutiveErrors {
		err = fmt.Errorf("%w:%w", errTooManyConsecutiveFailures, err)
	}
	if errors.Is(err, autonatv2.ErrPrivateAddrs) {
		log.Errorf("private IP addr in autonatv2 request: %s", err)
	}
	return res, err
}
// maxConsecutiveErrors is the consecutive error limit given to the
// errCountingClient created for each refresh task.
const maxConsecutiveErrors = 20

// isErrorPersistent reports whether err is one of the errors that make a
// probe worker stop and request a backoff: autonatv2.ErrPrivateAddrs,
// autonatv2.ErrNoPeers, or errTooManyConsecutiveFailures.
func isErrorPersistent(err error) bool {
	switch {
	case err == nil:
		return false
	case errors.Is(err, autonatv2.ErrPrivateAddrs),
		errors.Is(err, autonatv2.ErrNoPeers),
		errors.Is(err, errTooManyConsecutiveFailures):
		return true
	default:
		return false
	}
}
// Tuning parameters for per-address probing.
const (
	// recentProbeInterval is the window used to count recent dials of an
	// address, and the cool-down applied once an address has been refused
	// maxConsecutiveRefusals times in a row.
	recentProbeInterval = 10 * time.Minute
	// maxConsecutiveRefusals is the number of consecutive refusals after
	// which an address is not probed until recentProbeInterval has passed
	// since the last refusal.
	maxConsecutiveRefusals = 5
	// maxRecentDialsPerAddr is the number of dials within
	// recentProbeInterval after which an address is not probed further.
	maxRecentDialsPerAddr = 10
	// targetConfidence is the confidence the prober aims for, where
	// confidence is the absolute difference between an address's recorded
	// successes and failures.
	targetConfidence = 3
	// minConfidence is the margin between successes and failures needed to
	// report an address as public or private rather than unknown.
	minConfidence = 2
	// maxRecentDialsWindow is the number of most recent outcomes kept per
	// address.
	maxRecentDialsWindow = targetConfidence + 2
	// highConfidenceAddrProbeInterval is the age of the latest outcome after
	// which an address that already has enough confidence is probed again.
	highConfidenceAddrProbeInterval = 1 * time.Hour
	// maxProbeResultTTL is the age after which a recorded outcome is dropped.
	maxProbeResultTTL = maxRecentDialsWindow * highConfidenceAddrProbeInterval
)
// probeManager tracks the probe state of a set of addresses and decides which
// addresses should be probed next.
type probeManager struct {
	// now returns the current time.
	now func() time.Time

	// mx guards all fields below.
	mx sync.Mutex
	// inProgressProbes counts running probes, keyed by the bytes of each
	// probe's first address.
	inProgressProbes map[string]int
	// inProgressProbesTotal is the number of running probes overall.
	inProgressProbesTotal int
	// statuses holds per-address state keyed by the address bytes.
	statuses map[string]*addrStatus
	// addrs is the tracked address list, sorted by UpdateAddrs.
	addrs []ma.Multiaddr
}
// newProbeManager returns an empty probeManager that uses now as its time
// source.
func newProbeManager(now func() time.Time) *probeManager {
	m := new(probeManager)
	m.now = now
	m.inProgressProbes = make(map[string]int)
	m.statuses = make(map[string]*addrStatus)
	return m
}
// AppendConfirmedAddrs appends each tracked address to reachable, unreachable
// or unknown according to its current reachability and returns the extended
// slices. Outcomes older than maxProbeResultTTL are dropped from an address
// before its reachability is evaluated.
func (m *probeManager) AppendConfirmedAddrs(reachable, unreachable, unknown []ma.Multiaddr) (reachableAddrs, unreachableAddrs, unknownAddrs []ma.Multiaddr) {
	m.mx.Lock()
	defer m.mx.Unlock()

	for _, addr := range m.addrs {
		status := m.statuses[string(addr.Bytes())]
		cutoff := m.now().Add(-maxProbeResultTTL)
		status.RemoveBefore(cutoff)

		switch status.Reachability() {
		case network.ReachabilityUnknown:
			unknown = append(unknown, addr)
		case network.ReachabilityPrivate:
			unreachable = append(unreachable, addr)
		case network.ReachabilityPublic:
			reachable = append(reachable, addr)
		}
	}
	return reachable, unreachable, unknown
}
// UpdateAddrs replaces the tracked address set with addrs. State for
// addresses that were already tracked is kept, new addresses start with empty
// state, and state for addresses not in addrs is discarded. addrs is sorted
// in place and retained by the manager.
func (m *probeManager) UpdateAddrs(addrs []ma.Multiaddr) {
	m.mx.Lock()
	defer m.mx.Unlock()

	slices.SortFunc(addrs, func(a, b ma.Multiaddr) int { return a.Compare(b) })

	next := make(map[string]*addrStatus, len(addrs))
	for _, addr := range addrs {
		key := string(addr.Bytes())
		status, tracked := m.statuses[key]
		if !tracked {
			status = &addrStatus{Addr: addr}
		}
		next[key] = status
	}
	m.statuses = next
	m.addrs = addrs
}
// GetProbe returns the next probe to run, or nil if no address needs one.
//
// The first request is the primary address: the first tracked address that
// requires more probes than it currently has in progress. The remaining
// requests are the other tracked addresses that still require probing, taken
// in order starting after the primary and wrapping around, up to
// maxAddrsPerRequest entries in total. In-progress probes are not considered
// for these secondary addresses.
func (m *probeManager) GetProbe() probe {
	m.mx.Lock()
	defer m.mx.Unlock()

	now := m.now()
	total := len(m.addrs)
	for i, primary := range m.addrs {
		primaryKey := string(primary.Bytes())
		required := m.statuses[primaryKey].RequiredProbeCount(now)
		if m.inProgressProbes[primaryKey] >= required {
			continue
		}

		reqs := make(probe, 0, maxAddrsPerRequest)
		reqs = append(reqs, autonatv2.Request{Addr: primary, SendDialData: true})
		for offset := 1; offset < total; offset++ {
			other := m.addrs[(i+offset)%total]
			if m.statuses[string(other.Bytes())].RequiredProbeCount(now) == 0 {
				continue
			}
			reqs = append(reqs, autonatv2.Request{Addr: other, SendDialData: true})
			if len(reqs) >= maxAddrsPerRequest {
				break
			}
		}
		return reqs
	}
	return nil
}
// MarkProbeInProgress records that a probe for reqs has started. The probe is
// counted against its first address. An empty reqs is ignored.
func (m *probeManager) MarkProbeInProgress(reqs probe) {
	if len(reqs) == 0 {
		return
	}

	m.mx.Lock()
	defer m.mx.Unlock()

	primaryKey := string(reqs[0].Addr.Bytes())
	m.inProgressProbes[primaryKey]++
	m.inProgressProbesTotal++
}
// InProgressProbes returns the number of probes currently marked in progress.
func (m *probeManager) InProgressProbes() int {
	m.mx.Lock()
	total := m.inProgressProbesTotal
	m.mx.Unlock()
	return total
}
// CompleteProbe records the end of the probe for reqs and applies its result.
//
// The in-progress count for the primary (first) address is always
// decremented. If err is non-nil nothing else is recorded. If every address
// was refused, only the primary address gets a refusal. Otherwise the outcome
// is recorded for the dialed address, and the primary address gets a refusal
// if a different address was dialed. Addresses that are no longer tracked are
// skipped. An empty reqs is ignored.
func (m *probeManager) CompleteProbe(reqs probe, res autonatv2.Result, err error) {
	now := m.now()
	if len(reqs) == 0 {
		return
	}

	m.mx.Lock()
	defer m.mx.Unlock()

	primaryKey := string(reqs[0].Addr.Bytes())
	// Drop the map entry once no probes remain for the primary address.
	if remaining := m.inProgressProbes[primaryKey] - 1; remaining > 0 {
		m.inProgressProbes[primaryKey] = remaining
	} else {
		delete(m.inProgressProbes, primaryKey)
	}
	m.inProgressProbesTotal--

	if err != nil {
		return
	}

	refusePrimary := func() {
		if status, tracked := m.statuses[primaryKey]; tracked {
			status.AddRefusal(now)
		}
	}

	if res.AllAddrsRefused {
		refusePrimary()
		return
	}

	dialedKey := string(res.Addr.Bytes())
	if dialedKey != primaryKey {
		// A different address was dialed, so the primary counts as refused.
		refusePrimary()
	}
	if status, tracked := m.statuses[dialedKey]; tracked {
		status.AddOutcome(now, res.Reachability, maxRecentDialsWindow)
	}
}
// dialOutcome is the recorded result of one dial-back for an address.
type dialOutcome struct {
	// Success is true when the probe reported the address as public.
	Success bool
	// At is when the outcome was recorded.
	At time.Time
}

// addrStatus is the probe history kept for a single address.
type addrStatus struct {
	Addr ma.Multiaddr
	// lastRefusalTime and consecutiveRefusals describe the current streak of
	// refusals; both are reset when an outcome is recorded.
	lastRefusalTime     time.Time
	consecutiveRefusals int
	// dialTimes holds the times of recent dials, oldest first.
	dialTimes []time.Time
	// outcomes holds recent success/failure results, oldest first.
	outcomes []dialOutcome
}
// Reachability returns the address's reachability as derived from its
// recorded outcomes.
func (s *addrStatus) Reachability() network.Reachability {
	verdict, _, _ := s.reachabilityAndCounts()
	return verdict
}
// RequiredProbeCount returns how many probes the address needs at time now.
//
// It returns 0 while the address is cooling down after
// maxConsecutiveRefusals refusals, or when it has already been dialed
// maxRecentDialsPerAddr times within recentProbeInterval. Calling it after
// the cool-down has elapsed clears the refusal streak.
func (s *addrStatus) RequiredProbeCount(now time.Time) int {
	if s.consecutiveRefusals >= maxConsecutiveRefusals {
		sinceRefusal := now.Sub(s.lastRefusalTime)
		if sinceRefusal < recentProbeInterval {
			return 0
		}
		// Cool-down is over: forget the refusal streak.
		s.lastRefusalTime = time.Time{}
		s.consecutiveRefusals = 0
	}

	if s.recentDialCount(now) >= maxRecentDialsPerAddr {
		return 0
	}
	return s.requiredProbeCountForConfirmation(now)
}
// requiredProbeCountForConfirmation returns how many more probes are needed
// to reach targetConfidence, where confidence is the absolute difference
// between recorded successes and failures.
//
// Once the target is met it returns 1 if the latest outcome is older than
// highConfidenceAddrProbeInterval or disagrees with the current reachability
// verdict, and 0 otherwise.
func (s *addrStatus) requiredProbeCountForConfirmation(now time.Time) int {
	verdict, successes, failures := s.reachabilityAndCounts()

	confidence := successes - failures
	if confidence < 0 {
		confidence = failures - successes
	}
	if missing := targetConfidence - confidence; missing > 0 {
		return missing
	}

	// Target confidence reached; decide whether a refresh probe is due.
	if len(s.outcomes) == 0 {
		return 0
	}
	latest := s.outcomes[len(s.outcomes)-1]
	if now.Sub(latest.At) > highConfidenceAddrProbeInterval {
		return 1
	}

	// The latest outcome must agree with the verdict. An unknown verdict
	// never agrees, so it always asks for another probe.
	agrees := (verdict == network.ReachabilityPublic && latest.Success) ||
		(verdict == network.ReachabilityPrivate && !latest.Success)
	if agrees {
		return 0
	}
	return 1
}
// AddRefusal records that a probe for the address was refused at time now.
func (s *addrStatus) AddRefusal(now time.Time) {
	s.consecutiveRefusals++
	s.lastRefusalTime = now
}
// AddOutcome records a dial of the address at time at with result rch.
//
// It clears the refusal streak, records the dial time, drops dial times that
// are recentProbeInterval or more older than at, and drops outcomes older
// than maxProbeResultTTL. A public or private result is appended as a
// success or failure, after which only the newest windowSize outcomes are
// kept. Any other result is not stored as an outcome.
func (s *addrStatus) AddOutcome(at time.Time, rch network.Reachability, windowSize int) {
	s.lastRefusalTime = time.Time{}
	s.consecutiveRefusals = 0

	s.dialTimes = append(s.dialTimes, at)
	// Trim everything before the first dial that is still recent.
	firstRecent := slices.IndexFunc(s.dialTimes, func(t time.Time) bool {
		return at.Sub(t) < recentProbeInterval
	})
	if firstRecent >= 0 {
		s.dialTimes = slices.Delete(s.dialTimes, 0, firstRecent)
	}

	s.RemoveBefore(at.Add(-maxProbeResultTTL))

	if rch != network.ReachabilityPublic && rch != network.ReachabilityPrivate {
		return
	}
	s.outcomes = append(s.outcomes, dialOutcome{At: at, Success: rch == network.ReachabilityPublic})
	if excess := len(s.outcomes) - windowSize; excess > 0 {
		s.outcomes = slices.Delete(s.outcomes, 0, excess)
	}
}
// RemoveBefore drops the leading outcomes recorded strictly before t,
// stopping at the first outcome that is not before t.
func (s *addrStatus) RemoveBefore(t time.Time) {
	stale := 0
	for stale < len(s.outcomes) && s.outcomes[stale].At.Before(t) {
		stale++
	}
	s.outcomes = slices.Delete(s.outcomes, 0, stale)
}
// recentDialCount counts the dials made within recentProbeInterval of now.
// It walks dialTimes from newest to oldest and stops at the first dial
// outside that window.
func (s *addrStatus) recentDialCount(now time.Time) int {
	recent := 0
	for i := len(s.dialTimes) - 1; i >= 0; i-- {
		if now.Sub(s.dialTimes[i]) > recentProbeInterval {
			break
		}
		recent++
	}
	return recent
}
// reachabilityAndCounts tallies the recorded outcomes and derives the
// reachability verdict: public when successes lead failures by at least
// minConfidence, private when failures lead by at least minConfidence, and
// unknown otherwise. It also returns the success and failure counts.
func (s *addrStatus) reachabilityAndCounts() (rch network.Reachability, successes int, failures int) {
	for i := range s.outcomes {
		if s.outcomes[i].Success {
			successes++
		} else {
			failures++
		}
	}

	switch margin := successes - failures; {
	case margin >= minConfidence:
		rch = network.ReachabilityPublic
	case -margin >= minConfidence:
		rch = network.ReachabilityPrivate
	default:
		rch = network.ReachabilityUnknown
	}
	return rch, successes, failures
}
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PRs and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.