package pubsub
import (
"context"
"math"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
)
// RandomSubID is the protocol ID advertised for the random-sub router. It is
// the first entry returned by (*RandomSubRouter).Protocols.
const RandomSubID = protocol.ID("/randomsub/1.0.0")
// RandomSubD is the peer-count threshold used by the random-sub router.
// EnoughPeers uses it as the default for the suggested peer count and as the
// number of random-sub peers that is sufficient on its own; Publish uses it as
// the lower bound on how many random-sub peers a message is forwarded to when
// more than this many are available.
var RandomSubD = 6
// NewRandomSub builds a RandomSubRouter and hands it to NewPubSub together
// with ctx, h and opts, returning whatever NewPubSub returns.
//
// size is stored on the router; Publish takes the ceiling of its square root
// when deciding how many random-sub peers to forward a message to.
func NewRandomSub(ctx context.Context, h host.Host, size int, opts ...Option) (*PubSub, error) {
	router := new(RandomSubRouter)
	router.size = size
	router.peers = make(map[peer.ID]protocol.ID)

	return NewPubSub(ctx, h, router, opts...)
}
// RandomSubRouter is the router created by NewRandomSub. Its Publish method
// forwards a message to every FloodSubID peer subscribed to the topic and to
// a shuffled subset of the remaining subscribed peers.
type RandomSubRouter struct {
// p is the PubSub instance this router was attached to (set by Attach).
p *PubSub
// peers records the protocol each known peer was added with (see AddPeer).
peers map [peer .ID ]protocol .ID
// size is the value passed to NewRandomSub; Publish uses ceil(sqrt(size))
// as a candidate fan-out.
size int
// tracer is copied from the attached PubSub in Attach and is notified of
// peer, topic and RPC events.
tracer *pubsubTracer
}
// Protocols returns the protocol IDs this router handles: RandomSubID
// followed by FloodSubID.
func (rs *RandomSubRouter) Protocols() []protocol.ID {
	supported := []protocol.ID{
		RandomSubID,
		FloodSubID,
	}
	return supported
}
// Attach binds the router to p: it remembers the PubSub instance and adopts
// that instance's tracer for subsequent event reporting.
func (rs *RandomSubRouter) Attach(p *PubSub) {
	// Keep the PubSub first, then its tracer, mirroring the order in which
	// the fields are later used.
	rs.p = p
	rs.tracer = p.tracer
}
// AddPeer reports the new peer to the tracer and then records the protocol
// it was added with, overwriting any previous entry for p.
func (rs *RandomSubRouter) AddPeer(p peer.ID, proto protocol.ID) {
	rs.tracer.AddPeer(p, proto)

	rs.peers[p] = proto
}
// RemovePeer reports the departure of p to the tracer and then forgets the
// protocol recorded for it.
func (rs *RandomSubRouter) RemovePeer(p peer.ID) {
	rs.tracer.RemovePeer(p)

	delete(rs.peers, p)
}
// EnoughPeers reports whether the router has enough peers in topic.
//
// It returns false when the attached PubSub has no entry for topic.
// Otherwise it counts the topic's peers that were added with FloodSubID or
// RandomSubID (peers with any other protocol are ignored) and returns true
// when either
//   - the combined count reaches suggested (RandomSubD is used when
//     suggested is 0), or
//   - the RandomSubID count alone reaches RandomSubD.
func (rs *RandomSubRouter) EnoughPeers(topic string, suggested int) bool {
	subscribers, known := rs.p.topics[topic]
	if !known {
		return false
	}

	var floodCount, randomCount int
	for pid := range subscribers {
		if proto := rs.peers[pid]; proto == FloodSubID {
			floodCount++
		} else if proto == RandomSubID {
			randomCount++
		}
	}

	want := suggested
	if want == 0 {
		want = RandomSubD
	}

	return floodCount+randomCount >= want || randomCount >= RandomSubD
}
// AcceptFrom returns AcceptAll for every peer; the peer argument is not
// inspected.
func (rs *RandomSubRouter) AcceptFrom(_ peer.ID) AcceptStatus {
	return AcceptAll
}
// PreValidation is a no-op for this router; the messages are not inspected.
func (rs *RandomSubRouter) PreValidation(_ []*Message) {
	// Intentionally empty.
}
// HandleRPC is a no-op for this router; the RPC is not inspected.
func (rs *RandomSubRouter) HandleRPC(rpc *RPC) {
	// Intentionally empty.
}
// Publish forwards msg to peers subscribed to its topic.
//
// Nothing is sent when the attached PubSub has no entry for the topic. The
// peer the message was received from and the peer named in its From field are
// never selected. Among the remaining subscribers:
//   - every peer added with FloodSubID receives the message;
//   - all other peers are candidates. If there are at most RandomSubD of
//     them, all receive the message; otherwise the candidates are shuffled
//     and the first max(RandomSubD, ceil(sqrt(size))) of them (capped at the
//     number of candidates) receive it.
//
// Each send is pushed onto the peer's outbound queue; a failed push is logged
// and traced as a dropped RPC, a successful one is traced as a sent RPC.
func (rs *RandomSubRouter) Publish(msg *Message) {
	sender := msg.ReceivedFrom
	origin := peer.ID(msg.GetFrom())

	subscribers, found := rs.p.topics[msg.GetTopic()]
	if !found {
		return
	}

	recipients := make(map[peer.ID]struct{})
	candidates := make(map[peer.ID]struct{})

	// Split subscribers into unconditional recipients (floodsub peers) and
	// candidates for random selection (everyone else).
	for pid := range subscribers {
		switch {
		case pid == sender, pid == origin:
			// Never echo the message back to where it came from.
		case rs.peers[pid] == FloodSubID:
			recipients[pid] = struct{}{}
		default:
			candidates[pid] = struct{}{}
		}
	}

	if len(candidates) <= RandomSubD {
		// Few enough candidates: send to all of them.
		for pid := range candidates {
			recipients[pid] = struct{}{}
		}
	} else {
		// Fan-out is the larger of RandomSubD and ceil(sqrt(size)), but never
		// more than the number of candidates.
		fanout := int(math.Ceil(math.Sqrt(float64(rs.size))))
		if fanout < RandomSubD {
			fanout = RandomSubD
		}
		if fanout > len(candidates) {
			fanout = len(candidates)
		}

		shuffled := peerMapToList(candidates)
		shufflePeers(shuffled)
		for _, pid := range shuffled[:fanout] {
			recipients[pid] = struct{}{}
		}
	}

	out := rpcWithMessages(msg.Message)
	for pid := range recipients {
		queue, connected := rs.p.peers[pid]
		if !connected {
			continue
		}

		if err := queue.Push(out, false); err != nil {
			log.Infof("dropping message to peer %s: queue full", pid)
			rs.tracer.DropRPC(out, pid)
			continue
		}
		rs.tracer.SendRPC(out, pid)
	}
}
// Join records on the tracer that topic was joined. The router keeps no
// per-topic state of its own here.
func (rs *RandomSubRouter) Join(topic string) {
	tracer := rs.tracer
	tracer.Join(topic)
}
// Leave records on the tracer that topic was left. The router keeps no
// per-topic state of its own here.
//
// This previously called rs.tracer.Join, which reported a leave as a join in
// the trace; it now reports the leave event.
func (rs *RandomSubRouter) Leave(topic string) {
	rs.tracer.Leave(topic)
}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.