package backoff
import (
"context"
"fmt"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/discovery"
"github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr"
)
type BackoffDiscovery struct {
disc discovery .Discovery
stratFactory BackoffFactory
peerCache map [string ]*backoffCache
peerCacheMux sync .RWMutex
parallelBufSz int
returnedBufSz int
clock clock
}
type BackoffDiscoveryOption func (*BackoffDiscovery ) error
func NewBackoffDiscovery (disc discovery .Discovery , stratFactory BackoffFactory , opts ...BackoffDiscoveryOption ) (discovery .Discovery , error ) {
b := &BackoffDiscovery {
disc : disc ,
stratFactory : stratFactory ,
peerCache : make (map [string ]*backoffCache ),
parallelBufSz : 32 ,
returnedBufSz : 32 ,
clock : realClock {},
}
for _ , opt := range opts {
if err := opt (b ); err != nil {
return nil , err
}
}
return b , nil
}
func WithBackoffDiscoverySimultaneousQueryBufferSize (size int ) BackoffDiscoveryOption {
return func (b *BackoffDiscovery ) error {
if size < 0 {
return fmt .Errorf ("cannot set size to be smaller than 0" )
}
b .parallelBufSz = size
return nil
}
}
func WithBackoffDiscoveryReturnedChannelSize (size int ) BackoffDiscoveryOption {
return func (b *BackoffDiscovery ) error {
if size < 0 {
return fmt .Errorf ("cannot set size to be smaller than 0" )
}
b .returnedBufSz = size
return nil
}
}
type clock interface {
Now() time .Time
}
type realClock struct {}
func (c realClock ) Now () time .Time {
return time .Now ()
}
type backoffCache struct {
strat BackoffStrategy
mux sync .Mutex
nextDiscover time .Time
prevPeers map [peer .ID ]peer .AddrInfo
peers map [peer .ID ]peer .AddrInfo
sendingChs map [chan peer .AddrInfo ]int
ongoing bool
clock clock
}
func (d *BackoffDiscovery ) Advertise (ctx context .Context , ns string , opts ...discovery .Option ) (time .Duration , error ) {
return d .disc .Advertise (ctx , ns , opts ...)
}
func (d *BackoffDiscovery ) FindPeers (ctx context .Context , ns string , opts ...discovery .Option ) (<-chan peer .AddrInfo , error ) {
var options discovery .Options
err := options .Apply (opts ...)
if err != nil {
return nil , err
}
d .peerCacheMux .RLock ()
c , ok := d .peerCache [ns ]
d .peerCacheMux .RUnlock ()
if !ok {
pc := &backoffCache {
nextDiscover : time .Time {},
prevPeers : make (map [peer .ID ]peer .AddrInfo ),
peers : make (map [peer .ID ]peer .AddrInfo ),
sendingChs : make (map [chan peer .AddrInfo ]int ),
strat : d .stratFactory (),
clock : d .clock ,
}
d .peerCacheMux .Lock ()
c , ok = d .peerCache [ns ]
if !ok {
d .peerCache [ns ] = pc
c = pc
}
d .peerCacheMux .Unlock ()
}
c .mux .Lock ()
defer c .mux .Unlock ()
timeExpired := d .clock .Now ().After (c .nextDiscover )
if !(timeExpired || c .ongoing ) {
chLen := options .Limit
if chLen == 0 {
chLen = len (c .prevPeers )
} else if chLen > len (c .prevPeers ) {
chLen = len (c .prevPeers )
}
pch := make (chan peer .AddrInfo , chLen )
for _ , ai := range c .prevPeers {
select {
case pch <- ai :
default :
}
}
close (pch )
return pch , nil
}
if !c .ongoing {
pch , err := d .disc .FindPeers (ctx , ns , opts ...)
if err != nil {
return nil , err
}
c .ongoing = true
go findPeerDispatcher (ctx , c , pch )
}
evtCh := make (chan peer .AddrInfo , d .parallelBufSz )
pch := make (chan peer .AddrInfo , d .returnedBufSz )
rcvPeers := make ([]peer .AddrInfo , 0 , 32 )
for _ , ai := range c .peers {
rcvPeers = append (rcvPeers , ai )
}
c .sendingChs [evtCh ] = options .Limit
go findPeerReceiver (ctx , pch , evtCh , rcvPeers )
return pch , nil
}
func findPeerDispatcher(ctx context .Context , c *backoffCache , pch <-chan peer .AddrInfo ) {
defer func () {
c .mux .Lock ()
if checkUpdates (c .prevPeers , c .peers ) {
c .strat .Reset ()
c .prevPeers = c .peers
}
c .nextDiscover = c .clock .Now ().Add (c .strat .Delay ())
c .ongoing = false
c .peers = make (map [peer .ID ]peer .AddrInfo )
for ch := range c .sendingChs {
close (ch )
}
c .sendingChs = make (map [chan peer .AddrInfo ]int )
c .mux .Unlock ()
}()
for {
select {
case ai , ok := <- pch :
if !ok {
return
}
c .mux .Lock ()
var sendAi peer .AddrInfo
if prevAi , ok := c .peers [ai .ID ]; ok {
if combinedAi := mergeAddrInfos (prevAi , ai ); combinedAi != nil {
sendAi = *combinedAi
} else {
c .mux .Unlock ()
continue
}
} else {
sendAi = ai
}
c .peers [ai .ID ] = sendAi
for ch , rem := range c .sendingChs {
if rem > 0 {
ch <- sendAi
c .sendingChs [ch ] = rem - 1
}
}
c .mux .Unlock ()
case <- ctx .Done ():
return
}
}
}
func findPeerReceiver(ctx context .Context , pch , evtCh chan peer .AddrInfo , rcvPeers []peer .AddrInfo ) {
defer close (pch )
for {
select {
case ai , ok := <- evtCh :
if ok {
rcvPeers = append (rcvPeers , ai )
sentAll := true
sendPeers :
for i , p := range rcvPeers {
select {
case pch <- p :
default :
rcvPeers = rcvPeers [i :]
sentAll = false
break sendPeers
}
}
if sentAll {
rcvPeers = []peer .AddrInfo {}
}
} else {
for _ , p := range rcvPeers {
select {
case pch <- p :
case <- ctx .Done ():
return
}
}
return
}
case <- ctx .Done ():
return
}
}
}
func mergeAddrInfos(prevAi , newAi peer .AddrInfo ) *peer .AddrInfo {
seen := make (map [string ]struct {}, len (prevAi .Addrs ))
combinedAddrs := make ([]ma .Multiaddr , 0 , len (prevAi .Addrs ))
addAddrs := func (addrs []ma .Multiaddr ) {
for _ , addr := range addrs {
if _ , ok := seen [addr .String ()]; ok {
continue
}
seen [addr .String ()] = struct {}{}
combinedAddrs = append (combinedAddrs , addr )
}
}
addAddrs (prevAi .Addrs )
addAddrs (newAi .Addrs )
if len (combinedAddrs ) > len (prevAi .Addrs ) {
combinedAi := &peer .AddrInfo {ID : prevAi .ID , Addrs : combinedAddrs }
return combinedAi
}
return nil
}
func checkUpdates(orig , update map [peer .ID ]peer .AddrInfo ) bool {
if len (orig ) != len (update ) {
return true
}
for p , ai := range update {
if prevAi , ok := orig [p ]; ok {
if combinedAi := mergeAddrInfos (prevAi , ai ); combinedAi != nil {
return true
}
} else {
return true
}
}
return false
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .