package identify
import (
"context"
"fmt"
"net"
"slices"
"sort"
"sync"
"github.com/libp2p/go-libp2p/core/network"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
var ActivationThresh = 4
var observedAddrManagerWorkerChannelSize = 16
const maxExternalThinWaistAddrsPerLocalAddr = 3
type thinWaist struct {
Addr, TW, Rest ma .Multiaddr
}
type thinWaistWithCount struct {
thinWaist
Count int
}
func thinWaistForm(a ma .Multiaddr ) (thinWaist , error ) {
i := 0
tw , rest := ma .SplitFunc (a , func (c ma .Component ) bool {
if i > 1 {
return true
}
switch i {
case 0 :
if c .Protocol ().Code == ma .P_IP4 || c .Protocol ().Code == ma .P_IP6 {
i ++
return false
}
return true
case 1 :
if c .Protocol ().Code == ma .P_TCP || c .Protocol ().Code == ma .P_UDP {
i ++
return false
}
return true
}
return false
})
if i <= 1 {
return thinWaist {}, fmt .Errorf ("not a thinwaist address: %s" , a )
}
return thinWaist {Addr : a , TW : tw , Rest : rest }, nil
}
func getObserver(a ma .Multiaddr ) (string , error ) {
ip , err := manet .ToIP (a )
if err != nil {
return "" , err
}
if ip4 := ip .To4 (); ip4 != nil {
return ip4 .String (), nil
}
return ip .Mask (net .CIDRMask (56 , 128 )).String (), nil
}
type connMultiaddrs interface {
network .ConnMultiaddrs
IsClosed() bool
}
const observerSetCacheSize = 5
type observerSet struct {
ObservedTWAddr ma .Multiaddr
ObservedBy map [string ]int
mu sync .RWMutex
cachedMultiaddrs map [string ]ma .Multiaddr
}
func (s *observerSet ) cacheMultiaddr (addr ma .Multiaddr ) ma .Multiaddr {
if addr == nil {
return s .ObservedTWAddr
}
addrStr := string (addr .Bytes ())
s .mu .RLock ()
res , ok := s .cachedMultiaddrs [addrStr ]
s .mu .RUnlock ()
if ok {
return res
}
s .mu .Lock ()
defer s .mu .Unlock ()
res , ok = s .cachedMultiaddrs [addrStr ]
if ok {
return res
}
if s .cachedMultiaddrs == nil {
s .cachedMultiaddrs = make (map [string ]ma .Multiaddr , observerSetCacheSize )
}
if len (s .cachedMultiaddrs ) == observerSetCacheSize {
for k := range s .cachedMultiaddrs {
delete (s .cachedMultiaddrs , k )
break
}
}
s .cachedMultiaddrs [addrStr ] = ma .Join (s .ObservedTWAddr , addr )
return s .cachedMultiaddrs [addrStr ]
}
type observation struct {
conn connMultiaddrs
observed ma .Multiaddr
}
type ObservedAddrManager struct {
listenAddrs func () []ma .Multiaddr
interfaceListenAddrs func () ([]ma .Multiaddr , error )
hostAddrs func () []ma .Multiaddr
normalize func (ma .Multiaddr ) ma .Multiaddr
wch chan observation
addrRecordedNotif chan struct {}
wg sync .WaitGroup
ctx context .Context
ctxCancel context .CancelFunc
mu sync .RWMutex
externalAddrs map [string ]map [string ]*observerSet
connObservedTWAddrs map [connMultiaddrs ]ma .Multiaddr
localAddrs map [string ]*thinWaistWithCount
}
func NewObservedAddrManager (listenAddrs , hostAddrs func () []ma .Multiaddr ,
interfaceListenAddrs func () ([]ma .Multiaddr , error ), normalize func (ma .Multiaddr ) ma .Multiaddr ) (*ObservedAddrManager , error ) {
if normalize == nil {
normalize = func (addr ma .Multiaddr ) ma .Multiaddr { return addr }
}
o := &ObservedAddrManager {
externalAddrs : make (map [string ]map [string ]*observerSet ),
connObservedTWAddrs : make (map [connMultiaddrs ]ma .Multiaddr ),
localAddrs : make (map [string ]*thinWaistWithCount ),
wch : make (chan observation , observedAddrManagerWorkerChannelSize ),
addrRecordedNotif : make (chan struct {}, 1 ),
listenAddrs : listenAddrs ,
interfaceListenAddrs : interfaceListenAddrs ,
hostAddrs : hostAddrs ,
normalize : normalize ,
}
o .ctx , o .ctxCancel = context .WithCancel (context .Background ())
o .wg .Add (1 )
go o .worker ()
return o , nil
}
func (o *ObservedAddrManager ) AddrsFor (addr ma .Multiaddr ) (addrs []ma .Multiaddr ) {
if addr == nil {
return nil
}
o .mu .RLock ()
defer o .mu .RUnlock ()
tw , err := thinWaistForm (o .normalize (addr ))
if err != nil {
return nil
}
observerSets := o .getTopExternalAddrs (string (tw .TW .Bytes ()))
res := make ([]ma .Multiaddr , 0 , len (observerSets ))
for _ , s := range observerSets {
res = append (res , s .cacheMultiaddr (tw .Rest ))
}
return res
}
func (o *ObservedAddrManager ) appendInferredAddrs (twToObserverSets map [string ][]*observerSet , addrs []ma .Multiaddr ) []ma .Multiaddr {
if twToObserverSets == nil {
twToObserverSets = make (map [string ][]*observerSet )
for localTWStr := range o .externalAddrs {
twToObserverSets [localTWStr ] = append (twToObserverSets [localTWStr ], o .getTopExternalAddrs (localTWStr )...)
}
}
lAddrs , err := o .interfaceListenAddrs ()
if err != nil {
log .Warnw ("failed to get interface resolved listen addrs. Using just the listen addrs" , "error" , err )
lAddrs = nil
}
lAddrs = append (lAddrs , o .listenAddrs ()...)
seenTWs := make (map [string ]struct {})
for _ , a := range lAddrs {
if _ , ok := o .localAddrs [string (a .Bytes ())]; ok {
continue
}
if _ , ok := seenTWs [string (a .Bytes ())]; ok {
continue
}
seenTWs [string (a .Bytes ())] = struct {}{}
a = o .normalize (a )
t , err := thinWaistForm (a )
if err != nil {
continue
}
for _ , s := range twToObserverSets [string (t .TW .Bytes ())] {
addrs = append (addrs , s .cacheMultiaddr (t .Rest ))
}
}
return addrs
}
func (o *ObservedAddrManager ) Addrs () []ma .Multiaddr {
o .mu .RLock ()
defer o .mu .RUnlock ()
m := make (map [string ][]*observerSet )
for localTWStr := range o .externalAddrs {
m [localTWStr ] = append (m [localTWStr ], o .getTopExternalAddrs (localTWStr )...)
}
addrs := make ([]ma .Multiaddr , 0 , maxExternalThinWaistAddrsPerLocalAddr *5 )
for _ , t := range o .localAddrs {
for _ , s := range m [string (t .TW .Bytes ())] {
addrs = append (addrs , s .cacheMultiaddr (t .Rest ))
}
}
addrs = o .appendInferredAddrs (m , addrs )
return addrs
}
func (o *ObservedAddrManager ) getTopExternalAddrs (localTWStr string ) []*observerSet {
observerSets := make ([]*observerSet , 0 , len (o .externalAddrs [localTWStr ]))
for _ , v := range o .externalAddrs [localTWStr ] {
if len (v .ObservedBy ) >= ActivationThresh {
observerSets = append (observerSets , v )
}
}
slices .SortFunc (observerSets , func (a , b *observerSet ) int {
diff := len (b .ObservedBy ) - len (a .ObservedBy )
if diff != 0 {
return diff
}
as := a .ObservedTWAddr .String ()
bs := b .ObservedTWAddr .String ()
if as < bs {
return -1
} else if as > bs {
return 1
} else {
return 0
}
})
n := len (observerSets )
if n > maxExternalThinWaistAddrsPerLocalAddr {
n = maxExternalThinWaistAddrsPerLocalAddr
}
return observerSets [:n ]
}
func (o *ObservedAddrManager ) Record (conn connMultiaddrs , observed ma .Multiaddr ) {
select {
case o .wch <- observation {
conn : conn ,
observed : observed ,
}:
default :
log .Debugw ("dropping address observation due to full buffer" ,
"from" , conn .RemoteMultiaddr (),
"observed" , observed ,
)
}
}
func (o *ObservedAddrManager ) worker () {
defer o .wg .Done ()
for {
select {
case obs := <- o .wch :
o .maybeRecordObservation (obs .conn , obs .observed )
case <- o .ctx .Done ():
return
}
}
}
func isRelayedAddress(a ma .Multiaddr ) bool {
_ , err := a .ValueForProtocol (ma .P_CIRCUIT )
return err == nil
}
func (o *ObservedAddrManager ) shouldRecordObservation (conn connMultiaddrs , observed ma .Multiaddr ) (shouldRecord bool , localTW thinWaist , observedTW thinWaist ) {
if conn == nil || observed == nil {
return false , thinWaist {}, thinWaist {}
}
if manet .IsIPLoopback (observed ) {
return false , thinWaist {}, thinWaist {}
}
if manet .IsNAT64IPv4ConvertedIPv6Addr (observed ) {
return false , thinWaist {}, thinWaist {}
}
if isRelayedAddress (observed ) {
return false , thinWaist {}, thinWaist {}
}
ifaceaddrs , err := o .interfaceListenAddrs ()
if err != nil {
log .Infof ("failed to get interface listen addrs" , err )
return false , thinWaist {}, thinWaist {}
}
for i , a := range ifaceaddrs {
ifaceaddrs [i ] = o .normalize (a )
}
local := o .normalize (conn .LocalMultiaddr ())
listenAddrs := o .listenAddrs ()
for i , a := range listenAddrs {
listenAddrs [i ] = o .normalize (a )
}
if !ma .Contains (ifaceaddrs , local ) && !ma .Contains (listenAddrs , local ) {
return false , thinWaist {}, thinWaist {}
}
localTW , err = thinWaistForm (local )
if err != nil {
return false , thinWaist {}, thinWaist {}
}
observedTW , err = thinWaistForm (o .normalize (observed ))
if err != nil {
return false , thinWaist {}, thinWaist {}
}
hostAddrs := o .hostAddrs ()
for i , a := range hostAddrs {
hostAddrs [i ] = o .normalize (a )
}
if !HasConsistentTransport (observed , hostAddrs ) &&
!HasConsistentTransport (observed , listenAddrs ) {
log .Debugw (
"observed multiaddr doesn't match the transports of any announced addresses" ,
"from" , conn .RemoteMultiaddr (),
"observed" , observed ,
)
return false , thinWaist {}, thinWaist {}
}
return true , localTW , observedTW
}
func (o *ObservedAddrManager ) maybeRecordObservation (conn connMultiaddrs , observed ma .Multiaddr ) {
shouldRecord , localTW , observedTW := o .shouldRecordObservation (conn , observed )
if !shouldRecord {
return
}
log .Debugw ("added own observed listen addr" , "conn" , conn , "observed" , observed )
o .mu .Lock ()
defer o .mu .Unlock ()
o .recordObservationUnlocked (conn , localTW , observedTW )
select {
case o .addrRecordedNotif <- struct {}{}:
default :
}
}
func (o *ObservedAddrManager ) recordObservationUnlocked (conn connMultiaddrs , localTW , observedTW thinWaist ) {
if conn .IsClosed () {
return
}
localTWStr := string (localTW .TW .Bytes ())
observedTWStr := string (observedTW .TW .Bytes ())
observer , err := getObserver (conn .RemoteMultiaddr ())
if err != nil {
return
}
prevObservedTWAddr , ok := o .connObservedTWAddrs [conn ]
if !ok {
t , ok := o .localAddrs [string (localTW .Addr .Bytes ())]
if !ok {
t = &thinWaistWithCount {
thinWaist : localTW ,
}
o .localAddrs [string (localTW .Addr .Bytes ())] = t
}
t .Count ++
} else {
if prevObservedTWAddr .Equal (observedTW .TW ) {
return
}
o .removeExternalAddrsUnlocked (observer , localTWStr , string (prevObservedTWAddr .Bytes ()))
}
o .connObservedTWAddrs [conn ] = observedTW .TW
o .addExternalAddrsUnlocked (observedTW .TW , observer , localTWStr , observedTWStr )
}
func (o *ObservedAddrManager ) removeExternalAddrsUnlocked (observer , localTWStr , observedTWStr string ) {
s , ok := o .externalAddrs [localTWStr ][observedTWStr ]
if !ok {
return
}
s .ObservedBy [observer ]--
if s .ObservedBy [observer ] <= 0 {
delete (s .ObservedBy , observer )
}
if len (s .ObservedBy ) == 0 {
delete (o .externalAddrs [localTWStr ], observedTWStr )
}
if len (o .externalAddrs [localTWStr ]) == 0 {
delete (o .externalAddrs , localTWStr )
}
}
func (o *ObservedAddrManager ) addExternalAddrsUnlocked (observedTWAddr ma .Multiaddr , observer , localTWStr , observedTWStr string ) {
s , ok := o .externalAddrs [localTWStr ][observedTWStr ]
if !ok {
s = &observerSet {
ObservedTWAddr : observedTWAddr ,
ObservedBy : make (map [string ]int ),
}
if _ , ok := o .externalAddrs [localTWStr ]; !ok {
o .externalAddrs [localTWStr ] = make (map [string ]*observerSet )
}
o .externalAddrs [localTWStr ][observedTWStr ] = s
}
s .ObservedBy [observer ]++
}
func (o *ObservedAddrManager ) removeConn (conn connMultiaddrs ) {
if conn == nil {
return
}
o .mu .Lock ()
defer o .mu .Unlock ()
observedTWAddr , ok := o .connObservedTWAddrs [conn ]
if !ok {
return
}
delete (o .connObservedTWAddrs , conn )
localTW , err := thinWaistForm (o .normalize (conn .LocalMultiaddr ()))
if err != nil {
return
}
t , ok := o .localAddrs [string (localTW .Addr .Bytes ())]
if !ok {
return
}
t .Count --
if t .Count <= 0 {
delete (o .localAddrs , string (localTW .Addr .Bytes ()))
}
observer , err := getObserver (conn .RemoteMultiaddr ())
if err != nil {
return
}
o .removeExternalAddrsUnlocked (observer , string (localTW .TW .Bytes ()), string (observedTWAddr .Bytes ()))
select {
case o .addrRecordedNotif <- struct {}{}:
default :
}
}
func (o *ObservedAddrManager ) getNATType () (tcpNATType , udpNATType network .NATDeviceType ) {
o .mu .RLock ()
defer o .mu .RUnlock ()
var tcpCounts , udpCounts []int
var tcpTotal , udpTotal int
for _ , m := range o .externalAddrs {
isTCP := false
for _ , v := range m {
if _ , err := v .ObservedTWAddr .ValueForProtocol (ma .P_TCP ); err == nil {
isTCP = true
}
break
}
for _ , v := range m {
if isTCP {
tcpCounts = append (tcpCounts , len (v .ObservedBy ))
tcpTotal += len (v .ObservedBy )
} else {
udpCounts = append (udpCounts , len (v .ObservedBy ))
udpTotal += len (v .ObservedBy )
}
}
}
sort .Sort (sort .Reverse (sort .IntSlice (tcpCounts )))
sort .Sort (sort .Reverse (sort .IntSlice (udpCounts )))
tcpTopCounts , udpTopCounts := 0 , 0
for i := 0 ; i < maxExternalThinWaistAddrsPerLocalAddr && i < len (tcpCounts ); i ++ {
tcpTopCounts += tcpCounts [i ]
}
for i := 0 ; i < maxExternalThinWaistAddrsPerLocalAddr && i < len (udpCounts ); i ++ {
udpTopCounts += udpCounts [i ]
}
if tcpTotal >= 3 *maxExternalThinWaistAddrsPerLocalAddr {
if tcpTopCounts >= tcpTotal /2 {
tcpNATType = network .NATDeviceTypeCone
} else {
tcpNATType = network .NATDeviceTypeSymmetric
}
}
if udpTotal >= 3 *maxExternalThinWaistAddrsPerLocalAddr {
if udpTopCounts >= udpTotal /2 {
udpNATType = network .NATDeviceTypeCone
} else {
udpNATType = network .NATDeviceTypeSymmetric
}
}
return
}
func (o *ObservedAddrManager ) Close () error {
o .ctxCancel ()
o .wg .Wait ()
return nil
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .