package pubsub
import (
"context"
"crypto/sha256"
"fmt"
"io"
"math/rand"
"sort"
"time"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/record"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
"go.uber.org/zap/zapcore"
)
const (
GossipSubID_v10 = protocol .ID ("/meshsub/1.0.0" )
GossipSubID_v11 = protocol .ID ("/meshsub/1.1.0" )
GossipSubID_v12 = protocol .ID ("/meshsub/1.2.0" )
)
var (
GossipSubD = 6
GossipSubDlo = 5
GossipSubDhi = 12
GossipSubDscore = 4
GossipSubDout = 2
GossipSubHistoryLength = 5
GossipSubHistoryGossip = 3
GossipSubDlazy = 6
GossipSubGossipFactor = 0.25
GossipSubGossipRetransmission = 3
GossipSubHeartbeatInitialDelay = 100 * time .Millisecond
GossipSubHeartbeatInterval = 1 * time .Second
GossipSubFanoutTTL = 60 * time .Second
GossipSubPrunePeers = 16
GossipSubPruneBackoff = time .Minute
GossipSubUnsubscribeBackoff = 10 * time .Second
GossipSubConnectors = 8
GossipSubMaxPendingConnections = 128
GossipSubConnectionTimeout = 30 * time .Second
GossipSubDirectConnectTicks uint64 = 300
GossipSubDirectConnectInitialDelay = time .Second
GossipSubOpportunisticGraftTicks uint64 = 60
GossipSubOpportunisticGraftPeers = 2
GossipSubGraftFloodThreshold = 10 * time .Second
GossipSubMaxIHaveLength = 5000
GossipSubMaxIHaveMessages = 10
GossipSubMaxIDontWantLength = 10
GossipSubMaxIDontWantMessages = 1000
GossipSubIWantFollowupTime = 3 * time .Second
GossipSubIDontWantMessageThreshold = 1024
GossipSubIDontWantMessageTTL = 3
)
type checksum struct {
payload [32 ]byte
length uint8
}
type GossipSubParams struct {
D int
Dlo int
Dhi int
Dscore int
Dout int
HistoryLength int
HistoryGossip int
Dlazy int
GossipFactor float64
GossipRetransmission int
HeartbeatInitialDelay time .Duration
HeartbeatInterval time .Duration
SlowHeartbeatWarning float64
FanoutTTL time .Duration
PrunePeers int
PruneBackoff time .Duration
UnsubscribeBackoff time .Duration
Connectors int
MaxPendingConnections int
ConnectionTimeout time .Duration
DirectConnectTicks uint64
DirectConnectInitialDelay time .Duration
OpportunisticGraftTicks uint64
OpportunisticGraftPeers int
GraftFloodThreshold time .Duration
MaxIHaveLength int
MaxIHaveMessages int
MaxIDontWantLength int
MaxIDontWantMessages int
IWantFollowupTime time .Duration
IDontWantMessageThreshold int
IDontWantMessageTTL int
}
func NewGossipSub (ctx context .Context , h host .Host , opts ...Option ) (*PubSub , error ) {
rt := DefaultGossipSubRouter (h )
opts = append (opts , WithRawTracer (rt .tagTracer ))
return NewGossipSubWithRouter (ctx , h , rt , opts ...)
}
func NewGossipSubWithRouter (ctx context .Context , h host .Host , rt PubSubRouter , opts ...Option ) (*PubSub , error ) {
return NewPubSub (ctx , h , rt , opts ...)
}
func DefaultGossipSubRouter (h host .Host ) *GossipSubRouter {
params := DefaultGossipSubParams ()
return &GossipSubRouter {
peers : make (map [peer .ID ]protocol .ID ),
mesh : make (map [string ]map [peer .ID ]struct {}),
fanout : make (map [string ]map [peer .ID ]struct {}),
lastpub : make (map [string ]int64 ),
gossip : make (map [peer .ID ][]*pb .ControlIHave ),
control : make (map [peer .ID ]*pb .ControlMessage ),
backoff : make (map [string ]map [peer .ID ]time .Time ),
peerhave : make (map [peer .ID ]int ),
peerdontwant : make (map [peer .ID ]int ),
unwanted : make (map [peer .ID ]map [checksum ]int ),
iasked : make (map [peer .ID ]int ),
outbound : make (map [peer .ID ]bool ),
connect : make (chan connectInfo , params .MaxPendingConnections ),
cab : pstoremem .NewAddrBook (),
mcache : NewMessageCache (params .HistoryGossip , params .HistoryLength ),
protos : GossipSubDefaultProtocols ,
feature : GossipSubDefaultFeatures ,
tagTracer : newTagTracer (h .ConnManager ()),
params : params ,
}
}
func DefaultGossipSubParams () GossipSubParams {
return GossipSubParams {
D : GossipSubD ,
Dlo : GossipSubDlo ,
Dhi : GossipSubDhi ,
Dscore : GossipSubDscore ,
Dout : GossipSubDout ,
HistoryLength : GossipSubHistoryLength ,
HistoryGossip : GossipSubHistoryGossip ,
Dlazy : GossipSubDlazy ,
GossipFactor : GossipSubGossipFactor ,
GossipRetransmission : GossipSubGossipRetransmission ,
HeartbeatInitialDelay : GossipSubHeartbeatInitialDelay ,
HeartbeatInterval : GossipSubHeartbeatInterval ,
FanoutTTL : GossipSubFanoutTTL ,
PrunePeers : GossipSubPrunePeers ,
PruneBackoff : GossipSubPruneBackoff ,
UnsubscribeBackoff : GossipSubUnsubscribeBackoff ,
Connectors : GossipSubConnectors ,
MaxPendingConnections : GossipSubMaxPendingConnections ,
ConnectionTimeout : GossipSubConnectionTimeout ,
DirectConnectTicks : GossipSubDirectConnectTicks ,
DirectConnectInitialDelay : GossipSubDirectConnectInitialDelay ,
OpportunisticGraftTicks : GossipSubOpportunisticGraftTicks ,
OpportunisticGraftPeers : GossipSubOpportunisticGraftPeers ,
GraftFloodThreshold : GossipSubGraftFloodThreshold ,
MaxIHaveLength : GossipSubMaxIHaveLength ,
MaxIHaveMessages : GossipSubMaxIHaveMessages ,
MaxIDontWantLength : GossipSubMaxIDontWantLength ,
MaxIDontWantMessages : GossipSubMaxIDontWantMessages ,
IWantFollowupTime : GossipSubIWantFollowupTime ,
IDontWantMessageThreshold : GossipSubIDontWantMessageThreshold ,
IDontWantMessageTTL : GossipSubIDontWantMessageTTL ,
SlowHeartbeatWarning : 0.1 ,
}
}
func WithPeerScore (params *PeerScoreParams , thresholds *PeerScoreThresholds ) Option {
return func (ps *PubSub ) error {
gs , ok := ps .rt .(*GossipSubRouter )
if !ok {
return fmt .Errorf ("pubsub router is not gossipsub" )
}
err := params .validate ()
if err != nil {
return err
}
err = thresholds .validate ()
if err != nil {
return err
}
gs .score = newPeerScore (params )
gs .gossipThreshold = thresholds .GossipThreshold
gs .publishThreshold = thresholds .PublishThreshold
gs .graylistThreshold = thresholds .GraylistThreshold
gs .acceptPXThreshold = thresholds .AcceptPXThreshold
gs .opportunisticGraftThreshold = thresholds .OpportunisticGraftThreshold
gs .gossipTracer = newGossipTracer ()
if ps .tracer != nil {
ps .tracer .raw = append (ps .tracer .raw , gs .score , gs .gossipTracer )
} else {
ps .tracer = &pubsubTracer {
raw : []RawTracer {gs .score , gs .gossipTracer },
pid : ps .host .ID (),
idGen : ps .idGen ,
}
}
return nil
}
}
func WithFloodPublish (floodPublish bool ) Option {
return func (ps *PubSub ) error {
gs , ok := ps .rt .(*GossipSubRouter )
if !ok {
return fmt .Errorf ("pubsub router is not gossipsub" )
}
gs .floodPublish = floodPublish
return nil
}
}
func WithPeerExchange (doPX bool ) Option {
return func (ps *PubSub ) error {
gs , ok := ps .rt .(*GossipSubRouter )
if !ok {
return fmt .Errorf ("pubsub router is not gossipsub" )
}
gs .doPX = doPX
return nil
}
}
func WithDirectPeers (pis []peer .AddrInfo ) Option {
return func (ps *PubSub ) error {
gs , ok := ps .rt .(*GossipSubRouter )
if !ok {
return fmt .Errorf ("pubsub router is not gossipsub" )
}
direct := make (map [peer .ID ]struct {})
for _ , pi := range pis {
direct [pi .ID ] = struct {}{}
ps .host .Peerstore ().AddAddrs (pi .ID , pi .Addrs , peerstore .PermanentAddrTTL )
}
gs .direct = direct
if gs .tagTracer != nil {
gs .tagTracer .direct = direct
}
return nil
}
}
func WithDirectConnectTicks (t uint64 ) Option {
return func (ps *PubSub ) error {
gs , ok := ps .rt .(*GossipSubRouter )
if !ok {
return fmt .Errorf ("pubsub router is not gossipsub" )
}
gs .params .DirectConnectTicks = t
return nil
}
}
func WithGossipSubParams (cfg GossipSubParams ) Option {
return func (ps *PubSub ) error {
gs , ok := ps .rt .(*GossipSubRouter )
if !ok {
return fmt .Errorf ("pubsub router is not gossipsub" )
}
gs .params = cfg
gs .connect = make (chan connectInfo , cfg .MaxPendingConnections )
gs .mcache = NewMessageCache (cfg .HistoryGossip , cfg .HistoryLength )
return nil
}
}
type GossipSubRouter struct {
p *PubSub
peers map [peer .ID ]protocol .ID
direct map [peer .ID ]struct {}
mesh map [string ]map [peer .ID ]struct {}
fanout map [string ]map [peer .ID ]struct {}
lastpub map [string ]int64
gossip map [peer .ID ][]*pb .ControlIHave
control map [peer .ID ]*pb .ControlMessage
peerhave map [peer .ID ]int
peerdontwant map [peer .ID ]int
unwanted map [peer .ID ]map [checksum ]int
iasked map [peer .ID ]int
outbound map [peer .ID ]bool
backoff map [string ]map [peer .ID ]time .Time
connect chan connectInfo
cab peerstore .AddrBook
protos []protocol .ID
feature GossipSubFeatureTest
mcache *MessageCache
tracer *pubsubTracer
score *peerScore
gossipTracer *gossipTracer
tagTracer *tagTracer
gate *peerGater
params GossipSubParams
doPX bool
acceptPXThreshold float64
gossipThreshold float64
publishThreshold float64
graylistThreshold float64
opportunisticGraftThreshold float64
floodPublish bool
heartbeatTicks uint64
}
type connectInfo struct {
p peer .ID
spr *record .Envelope
}
func (gs *GossipSubRouter ) Protocols () []protocol .ID {
return gs .protos
}
func (gs *GossipSubRouter ) Attach (p *PubSub ) {
gs .p = p
gs .tracer = p .tracer
gs .score .Start (gs )
gs .gossipTracer .Start (gs )
gs .tagTracer .Start (gs )
gs .mcache .SetMsgIdFn (p .idGen .ID )
go gs .heartbeatTimer ()
for i := 0 ; i < gs .params .Connectors ; i ++ {
go gs .connector ()
}
go gs .manageAddrBook ()
if len (gs .direct ) > 0 {
go func () {
if gs .params .DirectConnectInitialDelay > 0 {
time .Sleep (gs .params .DirectConnectInitialDelay )
}
for p := range gs .direct {
gs .connect <- connectInfo {p : p }
}
}()
}
}
func (gs *GossipSubRouter ) manageAddrBook () {
sub , err := gs .p .host .EventBus ().Subscribe ([]interface {}{
&event .EvtPeerIdentificationCompleted {},
&event .EvtPeerConnectednessChanged {},
})
if err != nil {
log .Errorf ("failed to subscribe to peer identification events: %v" , err )
return
}
defer sub .Close ()
for {
select {
case <- gs .p .ctx .Done ():
cabCloser , ok := gs .cab .(io .Closer )
if ok {
errClose := cabCloser .Close ()
if errClose != nil {
log .Warnf ("failed to close addr book: %v" , errClose )
}
}
return
case ev := <- sub .Out ():
switch ev := ev .(type ) {
case event .EvtPeerIdentificationCompleted :
if ev .SignedPeerRecord != nil {
cab , ok := peerstore .GetCertifiedAddrBook (gs .cab )
if ok {
ttl := peerstore .RecentlyConnectedAddrTTL
if gs .p .host .Network ().Connectedness (ev .Peer ) == network .Connected {
ttl = peerstore .ConnectedAddrTTL
}
_ , err := cab .ConsumePeerRecord (ev .SignedPeerRecord , ttl )
if err != nil {
log .Warnf ("failed to consume signed peer record: %v" , err )
}
}
}
case event .EvtPeerConnectednessChanged :
if ev .Connectedness != network .Connected {
gs .cab .UpdateAddrs (ev .Peer , peerstore .ConnectedAddrTTL , peerstore .RecentlyConnectedAddrTTL )
}
}
}
}
}
func (gs *GossipSubRouter ) AddPeer (p peer .ID , proto protocol .ID ) {
log .Debugf ("PEERUP: Add new peer %s using %s" , p , proto )
gs .tracer .AddPeer (p , proto )
gs .peers [p ] = proto
outbound := false
conns := gs .p .host .Network ().ConnsToPeer (p )
loop :
for _ , c := range conns {
stat := c .Stat ()
if stat .Limited {
continue
}
if stat .Direction == network .DirOutbound {
for _ , s := range c .GetStreams () {
if s .Protocol () == proto {
outbound = true
break loop
}
}
}
}
gs .outbound [p ] = outbound
}
func (gs *GossipSubRouter ) RemovePeer (p peer .ID ) {
log .Debugf ("PEERDOWN: Remove disconnected peer %s" , p )
gs .tracer .RemovePeer (p )
delete (gs .peers , p )
for _ , peers := range gs .mesh {
delete (peers , p )
}
for _ , peers := range gs .fanout {
delete (peers , p )
}
delete (gs .gossip , p )
delete (gs .control , p )
delete (gs .outbound , p )
}
func (gs *GossipSubRouter ) EnoughPeers (topic string , suggested int ) bool {
tmap , ok := gs .p .topics [topic ]
if !ok {
return false
}
fsPeers , gsPeers := 0 , 0
for p := range tmap {
if !gs .feature (GossipSubFeatureMesh , gs .peers [p ]) {
fsPeers ++
}
}
gsPeers = len (gs .mesh [topic ])
if suggested == 0 {
suggested = gs .params .Dlo
}
if fsPeers +gsPeers >= suggested || gsPeers >= gs .params .Dhi {
return true
}
return false
}
func (gs *GossipSubRouter ) AcceptFrom (p peer .ID ) AcceptStatus {
_ , direct := gs .direct [p ]
if direct {
return AcceptAll
}
if gs .score .Score (p ) < gs .graylistThreshold {
return AcceptNone
}
return gs .gate .AcceptFrom (p )
}
func (gs *GossipSubRouter ) PreValidation (msgs []*Message ) {
tmids := make (map [string ][]string )
for _ , msg := range msgs {
if len (msg .GetData ()) < gs .params .IDontWantMessageThreshold {
continue
}
topic := msg .GetTopic ()
tmids [topic ] = append (tmids [topic ], gs .p .idGen .ID (msg ))
}
for topic , mids := range tmids {
if len (mids ) == 0 {
continue
}
shuffleStrings (mids )
for p := range gs .mesh [topic ] {
if gs .feature (GossipSubFeatureIdontwant , gs .peers [p ]) {
idontwant := []*pb .ControlIDontWant {{MessageIDs : mids }}
out := rpcWithControl (nil , nil , nil , nil , nil , idontwant )
gs .sendRPC (p , out , true )
}
}
}
}
func (gs *GossipSubRouter ) HandleRPC (rpc *RPC ) {
ctl := rpc .GetControl ()
if ctl == nil {
return
}
iwant := gs .handleIHave (rpc .from , ctl )
ihave := gs .handleIWant (rpc .from , ctl )
prune := gs .handleGraft (rpc .from , ctl )
gs .handlePrune (rpc .from , ctl )
gs .handleIDontWant (rpc .from , ctl )
if len (iwant ) == 0 && len (ihave ) == 0 && len (prune ) == 0 {
return
}
out := rpcWithControl (ihave , nil , iwant , nil , prune , nil )
gs .sendRPC (rpc .from , out , false )
}
func (gs *GossipSubRouter ) handleIHave (p peer .ID , ctl *pb .ControlMessage ) []*pb .ControlIWant {
score := gs .score .Score (p )
if score < gs .gossipThreshold {
log .Debugf ("IHAVE: ignoring peer %s with score below threshold [score = %f]" , p , score )
return nil
}
gs .peerhave [p ]++
if gs .peerhave [p ] > gs .params .MaxIHaveMessages {
log .Debugf ("IHAVE: peer %s has advertised too many times (%d) within this heartbeat interval; ignoring" , p , gs .peerhave [p ])
return nil
}
if gs .iasked [p ] >= gs .params .MaxIHaveLength {
log .Debugf ("IHAVE: peer %s has already advertised too many messages (%d); ignoring" , p , gs .iasked [p ])
return nil
}
iwant := make (map [string ]struct {})
for _ , ihave := range ctl .GetIhave () {
topic := ihave .GetTopicID ()
_ , ok := gs .mesh [topic ]
if !ok {
continue
}
if !gs .p .peerFilter (p , topic ) {
continue
}
checkIwantMsgsLoop :
for msgIdx , mid := range ihave .GetMessageIDs () {
if msgIdx >= gs .params .MaxIHaveLength {
log .Debugf ("IHAVE: peer %s has sent IHAVE on topic %s with too many messages (%d); ignoring remaining msgs" , p , topic , len (ihave .MessageIDs ))
break checkIwantMsgsLoop
}
if gs .p .seenMessage (mid ) {
continue
}
iwant [mid ] = struct {}{}
}
}
if len (iwant ) == 0 {
return nil
}
iask := len (iwant )
if iask +gs .iasked [p ] > gs .params .MaxIHaveLength {
iask = gs .params .MaxIHaveLength - gs .iasked [p ]
}
log .Debugf ("IHAVE: Asking for %d out of %d messages from %s" , iask , len (iwant ), p )
iwantlst := make ([]string , 0 , len (iwant ))
for mid := range iwant {
iwantlst = append (iwantlst , mid )
}
shuffleStrings (iwantlst )
iwantlst = iwantlst [:iask ]
gs .iasked [p ] += iask
gs .gossipTracer .AddPromise (p , iwantlst )
return []*pb .ControlIWant {{MessageIDs : iwantlst }}
}
func (gs *GossipSubRouter ) handleIWant (p peer .ID , ctl *pb .ControlMessage ) []*pb .Message {
score := gs .score .Score (p )
if score < gs .gossipThreshold {
log .Debugf ("IWANT: ignoring peer %s with score below threshold [score = %f]" , p , score )
return nil
}
ihave := make (map [string ]*pb .Message )
for _ , iwant := range ctl .GetIwant () {
for _ , mid := range iwant .GetMessageIDs () {
if _ , ok := gs .unwanted [p ][computeChecksum (mid )]; ok {
continue
}
msg , count , ok := gs .mcache .GetForPeer (mid , p )
if !ok {
continue
}
if !gs .p .peerFilter (p , msg .GetTopic ()) {
continue
}
if count > gs .params .GossipRetransmission {
log .Debugf ("IWANT: Peer %s has asked for message %s too many times; ignoring request" , p , mid )
continue
}
ihave [mid ] = msg .Message
}
}
if len (ihave ) == 0 {
return nil
}
log .Debugf ("IWANT: Sending %d messages to %s" , len (ihave ), p )
msgs := make ([]*pb .Message , 0 , len (ihave ))
for _ , msg := range ihave {
msgs = append (msgs , msg )
}
return msgs
}
func (gs *GossipSubRouter ) handleGraft (p peer .ID , ctl *pb .ControlMessage ) []*pb .ControlPrune {
var prune []string
doPX := gs .doPX
score := gs .score .Score (p )
now := time .Now ()
for _ , graft := range ctl .GetGraft () {
topic := graft .GetTopicID ()
if !gs .p .peerFilter (p , topic ) {
continue
}
peers , ok := gs .mesh [topic ]
if !ok {
doPX = false
continue
}
_ , inMesh := peers [p ]
if inMesh {
continue
}
_ , direct := gs .direct [p ]
if direct {
log .Warnf ("GRAFT: ignoring request from direct peer %s" , p )
prune = append (prune , topic )
doPX = false
continue
}
expire , backoff := gs .backoff [topic ][p ]
if backoff && now .Before (expire ) {
log .Debugf ("GRAFT: ignoring backed off peer %s" , p )
gs .score .AddPenalty (p , 1 )
doPX = false
floodCutoff := expire .Add (gs .params .GraftFloodThreshold - gs .params .PruneBackoff )
if now .Before (floodCutoff ) {
gs .score .AddPenalty (p , 1 )
}
gs .addBackoff (p , topic , false )
prune = append (prune , topic )
continue
}
if score < 0 {
log .Debugf ("GRAFT: ignoring peer %s with negative score [score = %f, topic = %s]" , p , score , topic )
prune = append (prune , topic )
doPX = false
gs .addBackoff (p , topic , false )
continue
}
if len (peers ) >= gs .params .Dhi && !gs .outbound [p ] {
prune = append (prune , topic )
gs .addBackoff (p , topic , false )
continue
}
log .Debugf ("GRAFT: add mesh link from %s in %s" , p , topic )
gs .tracer .Graft (p , topic )
peers [p ] = struct {}{}
}
if len (prune ) == 0 {
return nil
}
cprune := make ([]*pb .ControlPrune , 0 , len (prune ))
for _ , topic := range prune {
cprune = append (cprune , gs .makePrune (p , topic , doPX , false ))
}
return cprune
}
func (gs *GossipSubRouter ) handlePrune (p peer .ID , ctl *pb .ControlMessage ) {
score := gs .score .Score (p )
for _ , prune := range ctl .GetPrune () {
topic := prune .GetTopicID ()
peers , ok := gs .mesh [topic ]
if !ok {
continue
}
log .Debugf ("PRUNE: Remove mesh link to %s in %s" , p , topic )
gs .tracer .Prune (p , topic )
delete (peers , p )
backoff := prune .GetBackoff ()
if backoff > 0 {
gs .doAddBackoff (p , topic , time .Duration (backoff )*time .Second )
} else {
gs .addBackoff (p , topic , false )
}
px := prune .GetPeers ()
if len (px ) > 0 {
if score < gs .acceptPXThreshold {
log .Debugf ("PRUNE: ignoring PX from peer %s with insufficient score [score = %f, topic = %s]" , p , score , topic )
continue
}
gs .pxConnect (px )
}
}
}
func (gs *GossipSubRouter ) handleIDontWant (p peer .ID , ctl *pb .ControlMessage ) {
if gs .unwanted [p ] == nil {
gs .unwanted [p ] = make (map [checksum ]int )
}
if gs .peerdontwant [p ] >= gs .params .MaxIDontWantMessages {
log .Debugf ("IDONWANT: peer %s has advertised too many times (%d) within this heartbeat interval; ignoring" , p , gs .peerdontwant [p ])
return
}
gs .peerdontwant [p ]++
totalUnwantedIds := 0
mainIDWLoop :
for _ , idontwant := range ctl .GetIdontwant () {
for _ , mid := range idontwant .GetMessageIDs () {
if totalUnwantedIds >= gs .params .MaxIDontWantLength {
log .Debugf ("IDONWANT: peer %s has advertised too many ids (%d) within this message; ignoring" , p , totalUnwantedIds )
break mainIDWLoop
}
totalUnwantedIds ++
gs .unwanted [p ][computeChecksum (mid )] = gs .params .IDontWantMessageTTL
}
}
}
func (gs *GossipSubRouter ) addBackoff (p peer .ID , topic string , isUnsubscribe bool ) {
backoff := gs .params .PruneBackoff
if isUnsubscribe {
backoff = gs .params .UnsubscribeBackoff
}
gs .doAddBackoff (p , topic , backoff )
}
func (gs *GossipSubRouter ) doAddBackoff (p peer .ID , topic string , interval time .Duration ) {
backoff , ok := gs .backoff [topic ]
if !ok {
backoff = make (map [peer .ID ]time .Time )
gs .backoff [topic ] = backoff
}
expire := time .Now ().Add (interval )
if backoff [p ].Before (expire ) {
backoff [p ] = expire
}
}
func (gs *GossipSubRouter ) pxConnect (peers []*pb .PeerInfo ) {
if len (peers ) > gs .params .PrunePeers {
shufflePeerInfo (peers )
peers = peers [:gs .params .PrunePeers ]
}
toconnect := make ([]connectInfo , 0 , len (peers ))
for _ , pi := range peers {
p := peer .ID (pi .PeerID )
_ , connected := gs .peers [p ]
if connected {
continue
}
var spr *record .Envelope
if pi .SignedPeerRecord != nil {
envelope , r , err := record .ConsumeEnvelope (pi .SignedPeerRecord , peer .PeerRecordEnvelopeDomain )
if err != nil {
log .Warnf ("error unmarshalling peer record obtained through px: %s" , err )
continue
}
rec , ok := r .(*peer .PeerRecord )
if !ok {
log .Warnf ("bogus peer record obtained through px: envelope payload is not PeerRecord" )
continue
}
if rec .PeerID != p {
log .Warnf ("bogus peer record obtained through px: peer ID %s doesn't match expected peer %s" , rec .PeerID , p )
continue
}
spr = envelope
}
toconnect = append (toconnect , connectInfo {p , spr })
}
if len (toconnect ) == 0 {
return
}
for _ , ci := range toconnect {
select {
case gs .connect <- ci :
default :
log .Debugf ("ignoring peer connection attempt; too many pending connections" )
}
}
}
func (gs *GossipSubRouter ) connector () {
for {
select {
case ci := <- gs .connect :
if gs .p .host .Network ().Connectedness (ci .p ) == network .Connected {
continue
}
log .Debugf ("connecting to %s" , ci .p )
cab , ok := peerstore .GetCertifiedAddrBook (gs .cab )
if ok && ci .spr != nil {
_ , err := cab .ConsumePeerRecord (ci .spr , peerstore .TempAddrTTL )
if err != nil {
log .Debugf ("error processing peer record: %s" , err )
}
}
ctx , cancel := context .WithTimeout (gs .p .ctx , gs .params .ConnectionTimeout )
err := gs .p .host .Connect (ctx , peer .AddrInfo {ID : ci .p , Addrs : gs .cab .Addrs (ci .p )})
cancel ()
if err != nil {
log .Debugf ("error connecting to %s: %s" , ci .p , err )
}
case <- gs .p .ctx .Done ():
return
}
}
}
func (gs *GossipSubRouter ) Publish (msg *Message ) {
gs .mcache .Put (msg )
from := msg .ReceivedFrom
topic := msg .GetTopic ()
tosend := make (map [peer .ID ]struct {})
tmap , ok := gs .p .topics [topic ]
if !ok {
return
}
if gs .floodPublish && from == gs .p .host .ID () {
for p := range tmap {
_ , direct := gs .direct [p ]
if direct || gs .score .Score (p ) >= gs .publishThreshold {
tosend [p ] = struct {}{}
}
}
} else {
for p := range gs .direct {
_ , inTopic := tmap [p ]
if inTopic {
tosend [p ] = struct {}{}
}
}
for p := range tmap {
if !gs .feature (GossipSubFeatureMesh , gs .peers [p ]) && gs .score .Score (p ) >= gs .publishThreshold {
tosend [p ] = struct {}{}
}
}
gmap , ok := gs .mesh [topic ]
if !ok {
gmap , ok = gs .fanout [topic ]
if !ok || len (gmap ) == 0 {
peers := gs .getPeers (topic , gs .params .D , func (p peer .ID ) bool {
_ , direct := gs .direct [p ]
return !direct && gs .score .Score (p ) >= gs .publishThreshold
})
if len (peers ) > 0 {
gmap = peerListToMap (peers )
gs .fanout [topic ] = gmap
}
}
gs .lastpub [topic ] = time .Now ().UnixNano ()
}
for p := range gmap {
mid := gs .p .idGen .ID (msg )
if _ , ok := gs .unwanted [p ][computeChecksum (mid )]; ok {
continue
}
tosend [p ] = struct {}{}
}
}
out := rpcWithMessages (msg .Message )
for pid := range tosend {
if pid == from || pid == peer .ID (msg .GetFrom ()) {
continue
}
gs .sendRPC (pid , out , false )
}
}
func (gs *GossipSubRouter ) Join (topic string ) {
gmap , ok := gs .mesh [topic ]
if ok {
return
}
log .Debugf ("JOIN %s" , topic )
gs .tracer .Join (topic )
gmap , ok = gs .fanout [topic ]
if ok {
backoff := gs .backoff [topic ]
for p := range gmap {
_ , doBackOff := backoff [p ]
if gs .score .Score (p ) < 0 || doBackOff {
delete (gmap , p )
}
}
if len (gmap ) < gs .params .D {
more := gs .getPeers (topic , gs .params .D -len (gmap ), func (p peer .ID ) bool {
_ , inMesh := gmap [p ]
_ , direct := gs .direct [p ]
_ , doBackOff := backoff [p ]
return !inMesh && !direct && !doBackOff && gs .score .Score (p ) >= 0
})
for _ , p := range more {
gmap [p ] = struct {}{}
}
}
gs .mesh [topic ] = gmap
delete (gs .fanout , topic )
delete (gs .lastpub , topic )
} else {
backoff := gs .backoff [topic ]
peers := gs .getPeers (topic , gs .params .D , func (p peer .ID ) bool {
_ , direct := gs .direct [p ]
_ , doBackOff := backoff [p ]
return !direct && !doBackOff && gs .score .Score (p ) >= 0
})
gmap = peerListToMap (peers )
gs .mesh [topic ] = gmap
}
for p := range gmap {
log .Debugf ("JOIN: Add mesh link to %s in %s" , p , topic )
gs .tracer .Graft (p , topic )
gs .sendGraft (p , topic )
}
}
func (gs *GossipSubRouter ) Leave (topic string ) {
gmap , ok := gs .mesh [topic ]
if !ok {
return
}
log .Debugf ("LEAVE %s" , topic )
gs .tracer .Leave (topic )
delete (gs .mesh , topic )
for p := range gmap {
log .Debugf ("LEAVE: Remove mesh link to %s in %s" , p , topic )
gs .tracer .Prune (p , topic )
gs .sendPrune (p , topic , true )
gs .addBackoff (p , topic , true )
}
}
func (gs *GossipSubRouter ) sendGraft (p peer .ID , topic string ) {
graft := []*pb .ControlGraft {{TopicID : &topic }}
out := rpcWithControl (nil , nil , nil , graft , nil , nil )
gs .sendRPC (p , out , false )
}
func (gs *GossipSubRouter ) sendPrune (p peer .ID , topic string , isUnsubscribe bool ) {
prune := []*pb .ControlPrune {gs .makePrune (p , topic , gs .doPX , isUnsubscribe )}
out := rpcWithControl (nil , nil , nil , nil , prune , nil )
gs .sendRPC (p , out , false )
}
func (gs *GossipSubRouter ) sendRPC (p peer .ID , out *RPC , urgent bool ) {
own := false
ctl , ok := gs .control [p ]
if ok {
out = copyRPC (out )
own = true
gs .piggybackControl (p , out , ctl )
delete (gs .control , p )
}
ihave , ok := gs .gossip [p ]
if ok {
if !own {
out = copyRPC (out )
own = true
}
gs .piggybackGossip (p , out , ihave )
delete (gs .gossip , p )
}
q , ok := gs .p .peers [p ]
if !ok {
return
}
if out .Size () < gs .p .maxMessageSize {
gs .doSendRPC (out , p , q , urgent )
return
}
outRPCs := appendOrMergeRPC (nil , gs .p .maxMessageSize , *out )
for _ , rpc := range outRPCs {
if rpc .Size () > gs .p .maxMessageSize {
gs .doDropRPC (out , p , fmt .Sprintf ("Dropping oversized RPC. Size: %d, limit: %d. (Over by %d bytes)" , rpc .Size (), gs .p .maxMessageSize , rpc .Size ()-gs .p .maxMessageSize ))
continue
}
gs .doSendRPC (rpc , p , q , urgent )
}
}
func (gs *GossipSubRouter ) doDropRPC (rpc *RPC , p peer .ID , reason string ) {
if log .Level () <= zapcore .DebugLevel {
log .Debugf ("dropping message to peer %s: %s" , p , reason )
}
gs .tracer .DropRPC (rpc , p )
ctl := rpc .GetControl ()
if ctl != nil {
gs .pushControl (p , ctl )
}
}
func (gs *GossipSubRouter ) doSendRPC (rpc *RPC , p peer .ID , q *rpcQueue , urgent bool ) {
var err error
if urgent {
err = q .UrgentPush (rpc , false )
} else {
err = q .Push (rpc , false )
}
if err != nil {
gs .doDropRPC (rpc , p , "queue full" )
return
}
gs .tracer .SendRPC (rpc , p )
}
func appendOrMergeRPC(slice []*RPC , limit int , elems ...RPC ) []*RPC {
if len (elems ) == 0 {
return slice
}
if len (slice ) == 0 && len (elems ) == 1 && elems [0 ].Size () < limit {
return append (slice , &elems [0 ])
}
out := slice
if len (out ) == 0 {
out = append (out , &RPC {RPC : pb .RPC {}})
out [0 ].from = elems [0 ].from
}
for _ , elem := range elems {
lastRPC := out [len (out )-1 ]
for _ , msg := range elem .GetPublish () {
if lastRPC .Publish = append (lastRPC .Publish , msg ); lastRPC .Size () > limit {
lastRPC .Publish = lastRPC .Publish [:len (lastRPC .Publish )-1 ]
lastRPC = &RPC {RPC : pb .RPC {}, from : elem .from }
lastRPC .Publish = append (lastRPC .Publish , msg )
out = append (out , lastRPC )
}
}
for _ , sub := range elem .GetSubscriptions () {
if lastRPC .Subscriptions = append (lastRPC .Subscriptions , sub ); lastRPC .Size () > limit {
lastRPC .Subscriptions = lastRPC .Subscriptions [:len (lastRPC .Subscriptions )-1 ]
lastRPC = &RPC {RPC : pb .RPC {}, from : elem .from }
lastRPC .Subscriptions = append (lastRPC .Subscriptions , sub )
out = append (out , lastRPC )
}
}
if ctl := elem .GetControl (); ctl != nil {
if lastRPC .Control == nil {
lastRPC .Control = &pb .ControlMessage {}
if lastRPC .Size () > limit {
lastRPC .Control = nil
lastRPC = &RPC {RPC : pb .RPC {Control : &pb .ControlMessage {}}, from : elem .from }
out = append (out , lastRPC )
}
}
for _ , graft := range ctl .GetGraft () {
if lastRPC .Control .Graft = append (lastRPC .Control .Graft , graft ); lastRPC .Size () > limit {
lastRPC .Control .Graft = lastRPC .Control .Graft [:len (lastRPC .Control .Graft )-1 ]
lastRPC = &RPC {RPC : pb .RPC {Control : &pb .ControlMessage {}}, from : elem .from }
lastRPC .Control .Graft = append (lastRPC .Control .Graft , graft )
out = append (out , lastRPC )
}
}
for _ , prune := range ctl .GetPrune () {
if lastRPC .Control .Prune = append (lastRPC .Control .Prune , prune ); lastRPC .Size () > limit {
lastRPC .Control .Prune = lastRPC .Control .Prune [:len (lastRPC .Control .Prune )-1 ]
lastRPC = &RPC {RPC : pb .RPC {Control : &pb .ControlMessage {}}, from : elem .from }
lastRPC .Control .Prune = append (lastRPC .Control .Prune , prune )
out = append (out , lastRPC )
}
}
for _ , iwant := range ctl .GetIwant () {
if len (lastRPC .Control .Iwant ) == 0 {
newIWant := &pb .ControlIWant {}
if lastRPC .Control .Iwant = append (lastRPC .Control .Iwant , newIWant ); lastRPC .Size () > limit {
lastRPC .Control .Iwant = lastRPC .Control .Iwant [:len (lastRPC .Control .Iwant )-1 ]
lastRPC = &RPC {RPC : pb .RPC {Control : &pb .ControlMessage {
Iwant : []*pb .ControlIWant {newIWant },
}}, from : elem .from }
out = append (out , lastRPC )
}
}
for _ , msgID := range iwant .GetMessageIDs () {
if lastRPC .Control .Iwant [0 ].MessageIDs = append (lastRPC .Control .Iwant [0 ].MessageIDs , msgID ); lastRPC .Size () > limit {
lastRPC .Control .Iwant [0 ].MessageIDs = lastRPC .Control .Iwant [0 ].MessageIDs [:len (lastRPC .Control .Iwant [0 ].MessageIDs )-1 ]
lastRPC = &RPC {RPC : pb .RPC {Control : &pb .ControlMessage {
Iwant : []*pb .ControlIWant {{MessageIDs : []string {msgID }}},
}}, from : elem .from }
out = append (out , lastRPC )
}
}
}
for _ , ihave := range ctl .GetIhave () {
if len (lastRPC .Control .Ihave ) == 0 ||
lastRPC .Control .Ihave [len (lastRPC .Control .Ihave )-1 ].TopicID != ihave .TopicID {
newIhave := &pb .ControlIHave {TopicID : ihave .TopicID }
if lastRPC .Control .Ihave = append (lastRPC .Control .Ihave , newIhave ); lastRPC .Size () > limit {
lastRPC .Control .Ihave = lastRPC .Control .Ihave [:len (lastRPC .Control .Ihave )-1 ]
lastRPC = &RPC {RPC : pb .RPC {Control : &pb .ControlMessage {
Ihave : []*pb .ControlIHave {newIhave },
}}, from : elem .from }
out = append (out , lastRPC )
}
}
for _ , msgID := range ihave .GetMessageIDs () {
lastIHave := lastRPC .Control .Ihave [len (lastRPC .Control .Ihave )-1 ]
if lastIHave .MessageIDs = append (lastIHave .MessageIDs , msgID ); lastRPC .Size () > limit {
lastIHave .MessageIDs = lastIHave .MessageIDs [:len (lastIHave .MessageIDs )-1 ]
lastRPC = &RPC {RPC : pb .RPC {Control : &pb .ControlMessage {
Ihave : []*pb .ControlIHave {{TopicID : ihave .TopicID , MessageIDs : []string {msgID }}},
}}, from : elem .from }
out = append (out , lastRPC )
}
}
}
}
}
return out
}
func (gs *GossipSubRouter ) heartbeatTimer () {
time .Sleep (gs .params .HeartbeatInitialDelay )
select {
case gs .p .eval <- gs .heartbeat :
case <- gs .p .ctx .Done ():
return
}
ticker := time .NewTicker (gs .params .HeartbeatInterval )
defer ticker .Stop ()
for {
select {
case <- ticker .C :
select {
case gs .p .eval <- gs .heartbeat :
case <- gs .p .ctx .Done ():
return
}
case <- gs .p .ctx .Done ():
return
}
}
}
func (gs *GossipSubRouter ) heartbeat () {
start := time .Now ()
defer func () {
if gs .params .SlowHeartbeatWarning > 0 {
slowWarning := time .Duration (gs .params .SlowHeartbeatWarning * float64 (gs .params .HeartbeatInterval ))
if dt := time .Since (start ); dt > slowWarning {
log .Warnw ("slow heartbeat" , "took" , dt )
}
}
}()
gs .heartbeatTicks ++
tograft := make (map [peer .ID ][]string )
toprune := make (map [peer .ID ][]string )
noPX := make (map [peer .ID ]bool )
gs .clearBackoff ()
gs .clearIHaveCounters ()
gs .clearIDontWantCounters ()
gs .applyIwantPenalties ()
gs .directConnect ()
scores := make (map [peer .ID ]float64 )
score := func (p peer .ID ) float64 {
s , ok := scores [p ]
if !ok {
s = gs .score .Score (p )
scores [p ] = s
}
return s
}
for topic , peers := range gs .mesh {
prunePeer := func (p peer .ID ) {
gs .tracer .Prune (p , topic )
delete (peers , p )
gs .addBackoff (p , topic , false )
topics := toprune [p ]
toprune [p ] = append (topics , topic )
}
graftPeer := func (p peer .ID ) {
log .Debugf ("HEARTBEAT: Add mesh link to %s in %s" , p , topic )
gs .tracer .Graft (p , topic )
peers [p ] = struct {}{}
topics := tograft [p ]
tograft [p ] = append (topics , topic )
}
for p := range peers {
if score (p ) < 0 {
log .Debugf ("HEARTBEAT: Prune peer %s with negative score [score = %f, topic = %s]" , p , score (p ), topic )
prunePeer (p )
noPX [p ] = true
}
}
if l := len (peers ); l < gs .params .Dlo {
backoff := gs .backoff [topic ]
ineed := gs .params .D - l
plst := gs .getPeers (topic , ineed , func (p peer .ID ) bool {
_ , inMesh := peers [p ]
_ , doBackoff := backoff [p ]
_ , direct := gs .direct [p ]
return !inMesh && !doBackoff && !direct && score (p ) >= 0
})
for _ , p := range plst {
graftPeer (p )
}
}
if len (peers ) >= gs .params .Dhi {
plst := peerMapToList (peers )
shufflePeers (plst )
sort .Slice (plst , func (i , j int ) bool {
return score (plst [i ]) > score (plst [j ])
})
shufflePeers (plst [gs .params .Dscore :])
outbound := 0
for _ , p := range plst [:gs .params .D ] {
if gs .outbound [p ] {
outbound ++
}
}
if outbound < gs .params .Dout {
rotate := func (i int ) {
p := plst [i ]
for j := i ; j > 0 ; j -- {
plst [j ] = plst [j -1 ]
}
plst [0 ] = p
}
if outbound > 0 {
ihave := outbound
for i := 1 ; i < gs .params .D && ihave > 0 ; i ++ {
p := plst [i ]
if gs .outbound [p ] {
rotate (i )
ihave --
}
}
}
ineed := gs .params .Dout - outbound
for i := gs .params .D ; i < len (plst ) && ineed > 0 ; i ++ {
p := plst [i ]
if gs .outbound [p ] {
rotate (i )
ineed --
}
}
}
for _ , p := range plst [gs .params .D :] {
log .Debugf ("HEARTBEAT: Remove mesh link to %s in %s" , p , topic )
prunePeer (p )
}
}
if len (peers ) >= gs .params .Dlo {
outbound := 0
for p := range peers {
if gs .outbound [p ] {
outbound ++
}
}
if outbound < gs .params .Dout {
ineed := gs .params .Dout - outbound
backoff := gs .backoff [topic ]
plst := gs .getPeers (topic , ineed , func (p peer .ID ) bool {
_ , inMesh := peers [p ]
_ , doBackoff := backoff [p ]
_ , direct := gs .direct [p ]
return !inMesh && !doBackoff && !direct && gs .outbound [p ] && score (p ) >= 0
})
for _ , p := range plst {
graftPeer (p )
}
}
}
if gs .heartbeatTicks %gs .params .OpportunisticGraftTicks == 0 && len (peers ) > 1 {
plst := peerMapToList (peers )
sort .Slice (plst , func (i , j int ) bool {
return score (plst [i ]) < score (plst [j ])
})
medianIndex := len (peers ) / 2
medianScore := scores [plst [medianIndex ]]
if medianScore < gs .opportunisticGraftThreshold {
backoff := gs .backoff [topic ]
plst = gs .getPeers (topic , gs .params .OpportunisticGraftPeers , func (p peer .ID ) bool {
_ , inMesh := peers [p ]
_ , doBackoff := backoff [p ]
_ , direct := gs .direct [p ]
return !inMesh && !doBackoff && !direct && score (p ) > medianScore
})
for _ , p := range plst {
log .Debugf ("HEARTBEAT: Opportunistically graft peer %s on topic %s" , p , topic )
graftPeer (p )
}
}
}
gs .emitGossip (topic , peers )
}
now := time .Now ().UnixNano ()
for topic , lastpub := range gs .lastpub {
if lastpub +int64 (gs .params .FanoutTTL ) < now {
delete (gs .fanout , topic )
delete (gs .lastpub , topic )
}
}
for topic , peers := range gs .fanout {
for p := range peers {
_ , ok := gs .p .topics [topic ][p ]
if !ok || score (p ) < gs .publishThreshold {
delete (peers , p )
}
}
if len (peers ) < gs .params .D {
ineed := gs .params .D - len (peers )
plst := gs .getPeers (topic , ineed , func (p peer .ID ) bool {
_ , inFanout := peers [p ]
_ , direct := gs .direct [p ]
return !inFanout && !direct && score (p ) >= gs .publishThreshold
})
for _ , p := range plst {
peers [p ] = struct {}{}
}
}
gs .emitGossip (topic , peers )
}
gs .sendGraftPrune (tograft , toprune , noPX )
gs .flush ()
gs .mcache .Shift ()
}
func (gs *GossipSubRouter ) clearIHaveCounters () {
if len (gs .peerhave ) > 0 {
gs .peerhave = make (map [peer .ID ]int )
}
if len (gs .iasked ) > 0 {
gs .iasked = make (map [peer .ID ]int )
}
}
func (gs *GossipSubRouter ) clearIDontWantCounters () {
if len (gs .peerdontwant ) > 0 {
gs .peerdontwant = make (map [peer .ID ]int )
}
for _ , mids := range gs .unwanted {
for mid := range mids {
mids [mid ]--
if mids [mid ] == 0 {
delete (mids , mid )
}
}
}
}
func (gs *GossipSubRouter ) applyIwantPenalties () {
for p , count := range gs .gossipTracer .GetBrokenPromises () {
log .Infof ("peer %s didn't follow up in %d IWANT requests; adding penalty" , p , count )
gs .score .AddPenalty (p , count )
}
}
func (gs *GossipSubRouter ) clearBackoff () {
if gs .heartbeatTicks %15 != 0 {
return
}
now := time .Now ()
for topic , backoff := range gs .backoff {
for p , expire := range backoff {
if expire .Add (2 * GossipSubHeartbeatInterval ).Before (now ) {
delete (backoff , p )
}
}
if len (backoff ) == 0 {
delete (gs .backoff , topic )
}
}
}
func (gs *GossipSubRouter ) directConnect () {
if gs .heartbeatTicks %gs .params .DirectConnectTicks != 0 {
return
}
var toconnect []peer .ID
for p := range gs .direct {
_ , connected := gs .peers [p ]
if !connected {
toconnect = append (toconnect , p )
}
}
if len (toconnect ) > 0 {
go func () {
for _ , p := range toconnect {
gs .connect <- connectInfo {p : p }
}
}()
}
}
func (gs *GossipSubRouter ) sendGraftPrune (tograft , toprune map [peer .ID ][]string , noPX map [peer .ID ]bool ) {
for p , topics := range tograft {
graft := make ([]*pb .ControlGraft , 0 , len (topics ))
for _ , topic := range topics {
copiedID := topic
graft = append (graft , &pb .ControlGraft {TopicID : &copiedID })
}
var prune []*pb .ControlPrune
pruning , ok := toprune [p ]
if ok {
delete (toprune , p )
prune = make ([]*pb .ControlPrune , 0 , len (pruning ))
for _ , topic := range pruning {
prune = append (prune , gs .makePrune (p , topic , gs .doPX && !noPX [p ], false ))
}
}
out := rpcWithControl (nil , nil , nil , graft , prune , nil )
gs .sendRPC (p , out , false )
}
for p , topics := range toprune {
prune := make ([]*pb .ControlPrune , 0 , len (topics ))
for _ , topic := range topics {
prune = append (prune , gs .makePrune (p , topic , gs .doPX && !noPX [p ], false ))
}
out := rpcWithControl (nil , nil , nil , nil , prune , nil )
gs .sendRPC (p , out , false )
}
}
func (gs *GossipSubRouter ) emitGossip (topic string , exclude map [peer .ID ]struct {}) {
mids := gs .mcache .GetGossipIDs (topic )
if len (mids ) == 0 {
return
}
shuffleStrings (mids )
if len (mids ) > gs .params .MaxIHaveLength {
log .Debugf ("too many messages for gossip; will truncate IHAVE list (%d messages)" , len (mids ))
}
peers := make ([]peer .ID , 0 , len (gs .p .topics [topic ]))
for p := range gs .p .topics [topic ] {
_ , inExclude := exclude [p ]
_ , direct := gs .direct [p ]
if !inExclude && !direct && gs .feature (GossipSubFeatureMesh , gs .peers [p ]) && gs .score .Score (p ) >= gs .gossipThreshold {
peers = append (peers , p )
}
}
target := gs .params .Dlazy
factor := int (gs .params .GossipFactor * float64 (len (peers )))
if factor > target {
target = factor
}
if target > len (peers ) {
target = len (peers )
} else {
shufflePeers (peers )
}
peers = peers [:target ]
for _ , p := range peers {
peerMids := mids
if len (mids ) > gs .params .MaxIHaveLength {
peerMids = make ([]string , gs .params .MaxIHaveLength )
shuffleStrings (mids )
copy (peerMids , mids )
}
gs .enqueueGossip (p , &pb .ControlIHave {TopicID : &topic , MessageIDs : peerMids })
}
}
func (gs *GossipSubRouter ) flush () {
for p , ihave := range gs .gossip {
delete (gs .gossip , p )
out := rpcWithControl (nil , ihave , nil , nil , nil , nil )
gs .sendRPC (p , out , false )
}
for p , ctl := range gs .control {
delete (gs .control , p )
out := rpcWithControl (nil , nil , nil , ctl .Graft , ctl .Prune , nil )
gs .sendRPC (p , out , false )
}
}
func (gs *GossipSubRouter ) enqueueGossip (p peer .ID , ihave *pb .ControlIHave ) {
gossip := gs .gossip [p ]
gossip = append (gossip , ihave )
gs .gossip [p ] = gossip
}
func (gs *GossipSubRouter ) piggybackGossip (p peer .ID , out *RPC , ihave []*pb .ControlIHave ) {
ctl := out .GetControl ()
if ctl == nil {
ctl = &pb .ControlMessage {}
out .Control = ctl
}
ctl .Ihave = ihave
}
func (gs *GossipSubRouter ) pushControl (p peer .ID , ctl *pb .ControlMessage ) {
ctl .Ihave = nil
ctl .Iwant = nil
ctl .Idontwant = nil
if ctl .Graft != nil || ctl .Prune != nil {
gs .control [p ] = ctl
}
}
func (gs *GossipSubRouter ) piggybackControl (p peer .ID , out *RPC , ctl *pb .ControlMessage ) {
var tograft []*pb .ControlGraft
var toprune []*pb .ControlPrune
for _ , graft := range ctl .GetGraft () {
topic := graft .GetTopicID ()
peers , ok := gs .mesh [topic ]
if !ok {
continue
}
_, ok = peers [p ]
if ok {
tograft = append (tograft , graft )
}
}
for _ , prune := range ctl .GetPrune () {
topic := prune .GetTopicID ()
peers , ok := gs .mesh [topic ]
if !ok {
toprune = append (toprune , prune )
continue
}
_, ok = peers [p ]
if !ok {
toprune = append (toprune , prune )
}
}
if len (tograft ) == 0 && len (toprune ) == 0 {
return
}
xctl := out .Control
if xctl == nil {
xctl = &pb .ControlMessage {}
out .Control = xctl
}
if len (tograft ) > 0 {
xctl .Graft = append (xctl .Graft , tograft ...)
}
if len (toprune ) > 0 {
xctl .Prune = append (xctl .Prune , toprune ...)
}
}
func (gs *GossipSubRouter ) makePrune (p peer .ID , topic string , doPX bool , isUnsubscribe bool ) *pb .ControlPrune {
if !gs .feature (GossipSubFeaturePX , gs .peers [p ]) {
return &pb .ControlPrune {TopicID : &topic }
}
backoff := uint64 (gs .params .PruneBackoff / time .Second )
if isUnsubscribe {
backoff = uint64 (gs .params .UnsubscribeBackoff / time .Second )
}
var px []*pb .PeerInfo
if doPX {
peers := gs .getPeers (topic , gs .params .PrunePeers , func (xp peer .ID ) bool {
return p != xp && gs .score .Score (xp ) >= 0
})
cab , ok := peerstore .GetCertifiedAddrBook (gs .cab )
px = make ([]*pb .PeerInfo , 0 , len (peers ))
for _ , p := range peers {
var recordBytes []byte
if ok {
spr := cab .GetPeerRecord (p )
var err error
if spr != nil {
recordBytes , err = spr .Marshal ()
if err != nil {
log .Warnf ("error marshaling signed peer record for %s: %s" , p , err )
}
}
}
px = append (px , &pb .PeerInfo {PeerID : []byte (p ), SignedPeerRecord : recordBytes })
}
}
return &pb .ControlPrune {TopicID : &topic , Peers : px , Backoff : &backoff }
}
func (gs *GossipSubRouter ) getPeers (topic string , count int , filter func (peer .ID ) bool ) []peer .ID {
tmap , ok := gs .p .topics [topic ]
if !ok {
return nil
}
peers := make ([]peer .ID , 0 , len (tmap ))
for p := range tmap {
if gs .feature (GossipSubFeatureMesh , gs .peers [p ]) && filter (p ) && gs .p .peerFilter (p , topic ) {
peers = append (peers , p )
}
}
shufflePeers (peers )
if count > 0 && len (peers ) > count {
peers = peers [:count ]
}
return peers
}
func (gs *GossipSubRouter ) WithDefaultTagTracer () Option {
return WithRawTracer (gs .tagTracer )
}
func (gs *GossipSubRouter ) SendControl (p peer .ID , ctl *pb .ControlMessage , msgs ...*pb .Message ) {
out := rpcWithControl (msgs , ctl .Ihave , ctl .Iwant , ctl .Graft , ctl .Prune , ctl .Idontwant )
gs .sendRPC (p , out , false )
}
func peerListToMap(peers []peer .ID ) map [peer .ID ]struct {} {
pmap := make (map [peer .ID ]struct {})
for _ , p := range peers {
pmap [p ] = struct {}{}
}
return pmap
}
func peerMapToList(peers map [peer .ID ]struct {}) []peer .ID {
plst := make ([]peer .ID , 0 , len (peers ))
for p := range peers {
plst = append (plst , p )
}
return plst
}
func shufflePeers(peers []peer .ID ) {
for i := range peers {
j := rand .Intn (i + 1 )
peers [i ], peers [j ] = peers [j ], peers [i ]
}
}
func shufflePeerInfo(peers []*pb .PeerInfo ) {
for i := range peers {
j := rand .Intn (i + 1 )
peers [i ], peers [j ] = peers [j ], peers [i ]
}
}
func shuffleStrings(lst []string ) {
for i := range lst {
j := rand .Intn (i + 1 )
lst [i ], lst [j ] = lst [j ], lst [i ]
}
}
func computeChecksum(mid string ) checksum {
var cs checksum
if len (mid ) > 32 || len (mid ) == 0 {
cs .payload = sha256 .Sum256 ([]byte (mid ))
} else {
cs .length = uint8 (copy (cs .payload [:], mid ))
}
return cs
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .