package pubsub
import (
"context"
"fmt"
"net"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
manet "github.com/multiformats/go-multiaddr/net"
)
// peerStats holds the scoring state tracked for a single peer.
type peerStats struct {
// connected is set by AddPeer and cleared by RemovePeer when the score is retained.
connected bool
// expire is when a disconnected peer's retained stats are dropped by refreshScores.
expire time .Time
// topics maps a topic name to the per-topic counters for this peer.
topics map [string ]*topicStats
// ips is the list of IP strings last computed for this peer by getIPs.
ips []string
// ipWhitelist caches, per IP string, whether the IP matched the colocation whitelist;
// it is created lazily by ipColocationFactor.
ipWhitelist map [string ]bool
// behaviourPenalty is increased by AddPenalty and decayed by refreshScores.
behaviourPenalty float64
}
// topicStats holds the per-topic scoring counters for a single peer.
type topicStats struct {
// inMesh is set by Graft and cleared by Prune and RemovePeer.
inMesh bool
// graftTime is the time of the most recent Graft.
graftTime time .Time
// meshTime is recomputed by refreshScores as the time elapsed since graftTime.
meshTime time .Duration
// firstMessageDeliveries counts first deliveries; it is capped and decayed.
firstMessageDeliveries float64
// meshMessageDeliveries counts deliveries made while in the mesh; it is capped and decayed.
meshMessageDeliveries float64
// meshMessageDeliveriesActive is set by refreshScores once meshTime exceeds the
// topic's MeshMessageDeliveriesActivation, and reset by Graft.
meshMessageDeliveriesActive bool
// meshFailurePenalty accumulates the squared delivery deficit on Prune/RemovePeer.
meshFailurePenalty float64
// invalidMessageDeliveries counts invalid messages attributed to the peer; it is decayed.
invalidMessageDeliveries float64
}
// peerScore tracks per-peer statistics and computes peer scores from them.
// The embedded mutex is taken by the exported/tracer methods before they touch
// the maps below.
type peerScore struct {
sync .Mutex
// params holds the scoring parameters, including the per-topic parameters.
params *PeerScoreParams
// peerStats maps each tracked peer to its statistics.
peerStats map [peer .ID ]*peerStats
// peerIPs maps an IP string to the set of peers currently associated with it.
peerIPs map [string ]map [peer .ID ]struct {}
// deliveries records recently seen messages and their delivery status.
deliveries *messageDeliveries
// idGen computes message IDs; Start replaces it with the router's generator.
idGen *msgIDGenerator
// host is set by Start; getIPs treats a nil host as "no IPs known".
host host .Host
// inspect, inspectEx and inspectPeriod are configured by WithPeerScoreInspect.
inspect PeerScoreInspectFn
inspectEx ExtendedPeerScoreInspectFn
inspectPeriod time .Duration
}
// Compile-time check that *peerScore implements RawTracer.
var _ RawTracer = (*peerScore )(nil )
// messageDeliveries stores delivery records keyed by message ID, together with
// a singly linked list of entries in creation order that gc uses for expiry.
type messageDeliveries struct {
// seenMsgTTL is how long after creation a record becomes eligible for gc.
seenMsgTTL time .Duration
// records maps a message ID to its delivery record.
records map [string ]*deliveryRecord
// head is the oldest entry in the expiry list; tail is the newest.
head *deliveryEntry
tail *deliveryEntry
}
// deliveryRecord is the tracked state of a single message.
type deliveryRecord struct {
// status is one of the delivery* constants; the zero value is deliveryUnknown.
status int
// firstSeen is the time the record was created by getRecord.
firstSeen time .Time
// validated is set by DeliverMessage when the message is marked valid.
validated time .Time
// peers is the set of peers recorded by DuplicateMessage; it is set to nil
// once the message is rejected, ignored or throttled.
peers map [peer .ID ]struct {}
}
// deliveryEntry is a node of the expiry list kept by messageDeliveries.
type deliveryEntry struct {
// id is the message ID whose record this entry expires.
id string
// expire is the creation time plus seenMsgTTL.
expire time .Time
// next is the entry created after this one, or nil for the tail.
next *deliveryEntry
}
// Delivery status values stored in deliveryRecord.status.
const (
// deliveryUnknown is the zero value: no Deliver/Reject trace has been recorded yet.
deliveryUnknown = iota
// deliveryValid is set by DeliverMessage.
deliveryValid
// deliveryInvalid is set by RejectMessage for rejections that penalize peers.
deliveryInvalid
// deliveryIgnored is set by RejectMessage for RejectValidationIgnored.
deliveryIgnored
// deliveryThrottled is set by RejectMessage for RejectValidationThrottled.
deliveryThrottled
)
// Inspector callback types accepted by WithPeerScoreInspect.
type (
// PeerScoreInspectFn receives a map from peer to its current score.
PeerScoreInspectFn = func (map [peer .ID ]float64 )
// ExtendedPeerScoreInspectFn receives a map from peer to a detailed score snapshot.
ExtendedPeerScoreInspectFn = func (map [peer .ID ]*PeerScoreSnapshot )
)
// PeerScoreSnapshot is the per-peer value handed to an ExtendedPeerScoreInspectFn.
type PeerScoreSnapshot struct {
// Score is the peer's overall score at snapshot time.
Score float64
// Topics holds per-topic counters; it is nil when the peer has no topic stats.
Topics map [string ]*TopicScoreSnapshot
// AppSpecificScore is the unweighted result of the AppSpecificScore callback.
AppSpecificScore float64
// IPColocationFactor is the unweighted IP colocation factor.
IPColocationFactor float64
// BehaviourPenalty is the peer's current behaviour penalty counter.
BehaviourPenalty float64
}
// TopicScoreSnapshot is the per-topic part of a PeerScoreSnapshot.
type TopicScoreSnapshot struct {
// TimeInMesh is the peer's mesh time; it is left zero when the peer is not in the mesh.
TimeInMesh time .Duration
// FirstMessageDeliveries is the current first-delivery counter.
FirstMessageDeliveries float64
// MeshMessageDeliveries is the current mesh-delivery counter.
MeshMessageDeliveries float64
// InvalidMessageDeliveries is the current invalid-delivery counter.
InvalidMessageDeliveries float64
}
// WithPeerScoreInspect returns an Option that installs a peer score inspector
// on a gossipsub router with scoring enabled.
//
// inspect must be a PeerScoreInspectFn or an ExtendedPeerScoreInspectFn; it is
// invoked periodically, every period, with a snapshot of the current scores.
//
// The option fails if the router is not gossipsub, if scoring is not enabled,
// if an inspector is already installed, if inspect has an unsupported type, or
// if period is not positive.
func WithPeerScoreInspect(inspect interface{}, period time.Duration) Option {
	return func(ps *PubSub) error {
		gs, ok := ps.rt.(*GossipSubRouter)
		if !ok {
			return fmt.Errorf("pubsub router is not gossipsub")
		}

		if gs.score == nil {
			return fmt.Errorf("peer scoring is not enabled")
		}

		if gs.score.inspect != nil || gs.score.inspectEx != nil {
			return fmt.Errorf("duplicate peer score inspector")
		}

		// The background loop feeds this period to time.NewTicker, which
		// panics on non-positive durations; reject it here instead.
		if period <= 0 {
			return fmt.Errorf("peer score inspect period must be positive, got %s", period)
		}

		switch i := inspect.(type) {
		case PeerScoreInspectFn:
			gs.score.inspect = i
		case ExtendedPeerScoreInspectFn:
			gs.score.inspectEx = i
		default:
			// %T reports the offending type; %v on a func value only prints an address.
			return fmt.Errorf("unknown peer score inspector type: %T", inspect)
		}

		gs.score.inspectPeriod = period

		return nil
	}
}
// newPeerScore builds a peerScore for the given parameters with empty peer,
// IP and delivery tables. When params.SeenMsgTTL is zero the delivery records
// use TimeCacheDuration as their time-to-live.
func newPeerScore(params *PeerScoreParams) *peerScore {
	ttl := params.SeenMsgTTL
	if ttl == 0 {
		ttl = TimeCacheDuration
	}

	deliveries := &messageDeliveries{
		seenMsgTTL: ttl,
		records:    make(map[string]*deliveryRecord),
	}

	return &peerScore{
		params:     params,
		peerStats:  make(map[peer.ID]*peerStats),
		peerIPs:    make(map[string]map[peer.ID]struct{}),
		deliveries: deliveries,
		idGen:      newMsgIdGenerator(),
	}
}
// SetTopicScoreParams installs p as the scoring parameters for topic.
//
// If the topic already had parameters and either delivery cap was lowered,
// every peer's counters for that topic are clamped to the new caps.
// It always returns nil.
func (ps *peerScore) SetTopicScoreParams(topic string, p *TopicScoreParams) error {
	ps.Lock()
	defer ps.Unlock()

	prev, hadParams := ps.params.Topics[topic]
	ps.params.Topics[topic] = p
	if !hadParams {
		return nil
	}

	// Counters only need re-clamping when a cap shrank.
	capLowered := p.FirstMessageDeliveriesCap < prev.FirstMessageDeliveriesCap ||
		p.MeshMessageDeliveriesCap < prev.MeshMessageDeliveriesCap
	if !capLowered {
		return nil
	}

	for _, peerStat := range ps.peerStats {
		ts, found := peerStat.topics[topic]
		if !found {
			continue
		}

		if ts.firstMessageDeliveries > p.FirstMessageDeliveriesCap {
			ts.firstMessageDeliveries = p.FirstMessageDeliveriesCap
		}
		if ts.meshMessageDeliveries > p.MeshMessageDeliveriesCap {
			ts.meshMessageDeliveries = p.MeshMessageDeliveriesCap
		}
	}

	return nil
}
// Start adopts the router's message ID generator and host, then launches the
// background maintenance loop bound to the router's context.
// It is a no-op on a nil receiver.
func (ps *peerScore) Start(gs *GossipSubRouter) {
	if ps == nil {
		return
	}

	owner := gs.p
	ps.idGen = owner.idGen
	ps.host = owner.host

	go ps.background(owner.ctx)
}
// Score returns the current score of peer p, taking the lock for the duration
// of the computation. A nil receiver scores every peer as 0.
func (ps *peerScore) Score(p peer.ID) float64 {
	if ps == nil {
		return 0
	}

	ps.Lock()
	result := ps.score(p)
	ps.Unlock()

	return result
}
// score computes the score of peer p from its tracked statistics; peers with
// no statistics score 0. It does not lock, so callers take the lock first.
//
// The result is the (optionally capped) sum of weighted per-topic scores, plus
// the weighted application-specific score, the weighted IP colocation factor,
// and the weighted square of the behaviour penalty in excess of its threshold.
func (ps *peerScore) score(p peer.ID) float64 {
	pstats, tracked := ps.peerStats[p]
	if !tracked {
		return 0
	}

	var total float64

	for topic, ts := range pstats.topics {
		tp, scored := ps.params.Topics[topic]
		if !scored {
			// Topics without parameters do not contribute.
			continue
		}

		var sub float64

		// Time in mesh, measured in whole quanta and capped.
		if ts.inMesh {
			quanta := float64(ts.meshTime / tp.TimeInMeshQuantum)
			if quanta > tp.TimeInMeshCap {
				quanta = tp.TimeInMeshCap
			}
			sub += quanta * tp.TimeInMeshWeight
		}

		// First message deliveries.
		firstDeliveries := ts.firstMessageDeliveries
		sub += firstDeliveries * tp.FirstMessageDeliveriesWeight

		// Mesh delivery deficit, squared; only once the counter is active.
		if ts.meshMessageDeliveriesActive && ts.meshMessageDeliveries < tp.MeshMessageDeliveriesThreshold {
			deficit := tp.MeshMessageDeliveriesThreshold - ts.meshMessageDeliveries
			deficitSq := deficit * deficit
			sub += deficitSq * tp.MeshMessageDeliveriesWeight
		}

		// Accumulated mesh failure penalty.
		failures := ts.meshFailurePenalty
		sub += failures * tp.MeshFailurePenaltyWeight

		// Invalid deliveries, squared.
		invalidSq := ts.invalidMessageDeliveries * ts.invalidMessageDeliveries
		sub += invalidSq * tp.InvalidMessageDeliveriesWeight

		total += sub * tp.TopicWeight
	}

	// A non-positive cap means the topic contribution is unbounded.
	if limit := ps.params.TopicScoreCap; limit > 0 && total > limit {
		total = limit
	}

	appScore := ps.params.AppSpecificScore(p)
	total += appScore * ps.params.AppSpecificWeight

	colocation := ps.ipColocationFactor(p)
	total += colocation * ps.params.IPColocationFactorWeight

	if pstats.behaviourPenalty > ps.params.BehaviourPenaltyThreshold {
		over := pstats.behaviourPenalty - ps.params.BehaviourPenaltyThreshold
		overSq := over * over
		total += overSq * ps.params.BehaviourPenaltyWeight
	}

	return total
}
// ipColocationFactor returns the unweighted IP colocation factor for peer p.
//
// For each of the peer's IPs that is not whitelisted, the number of peers
// sharing that IP in excess of IPColocationFactorThreshold is squared and
// added to the result. Whitelist lookups are cached per peer in ipWhitelist.
// Peers with no statistics yield 0.
func (ps *peerScore) ipColocationFactor(p peer.ID) float64 {
	pstats, tracked := ps.peerStats[p]
	if !tracked {
		return 0
	}

	var factor float64

	for _, ip := range pstats.ips {
		if len(ps.params.IPColocationFactorWhitelist) > 0 {
			if pstats.ipWhitelist == nil {
				pstats.ipWhitelist = make(map[string]bool)
			}

			exempt, cached := pstats.ipWhitelist[ip]
			if !cached {
				// First time this IP is seen: check it against every
				// whitelisted network and remember the answer.
				parsed := net.ParseIP(ip)
				for _, ipNet := range ps.params.IPColocationFactorWhitelist {
					if ipNet.Contains(parsed) {
						exempt = true
						break
					}
				}
				pstats.ipWhitelist[ip] = exempt
			}

			if exempt {
				continue
			}
		}

		sharing := len(ps.peerIPs[ip])
		if sharing > ps.params.IPColocationFactorThreshold {
			surplus := float64(sharing - ps.params.IPColocationFactorThreshold)
			factor += surplus * surplus
		}
	}

	return factor
}
// AddPenalty adds count to the behaviour penalty counter of peer p.
// It does nothing on a nil receiver or for a peer that is not tracked.
func (ps *peerScore) AddPenalty(p peer.ID, count int) {
	if ps == nil {
		return
	}

	ps.Lock()
	defer ps.Unlock()

	if pstats, tracked := ps.peerStats[p]; tracked {
		pstats.behaviourPenalty += float64(count)
	}
}
// background runs the periodic maintenance loop until ctx is done: score
// decay every DecayInterval, IP refresh and delivery-record collection every
// minute, and, when an inspector is configured, score inspection every
// inspectPeriod plus one final inspection on exit.
func (ps *peerScore) background(ctx context.Context) {
	decayTicker := time.NewTicker(ps.params.DecayInterval)
	defer decayTicker.Stop()

	ipTicker := time.NewTicker(time.Minute)
	defer ipTicker.Stop()

	gcTicker := time.NewTicker(time.Minute)
	defer gcTicker.Stop()

	// A nil channel never becomes ready, so the inspect case below stays
	// dormant unless an inspector is installed.
	var inspectTick <-chan time.Time
	if ps.inspect != nil || ps.inspectEx != nil {
		inspectTicker := time.NewTicker(ps.inspectPeriod)
		defer inspectTicker.Stop()
		// Deferred last, so it runs first on exit: one final sample.
		defer ps.inspectScores()
		inspectTick = inspectTicker.C
	}

	for {
		select {
		case <-ctx.Done():
			return

		case <-decayTicker.C:
			ps.refreshScores()

		case <-ipTicker.C:
			ps.refreshIPs()

		case <-gcTicker.C:
			ps.gcDeliveryRecords()

		case <-inspectTick:
			ps.inspectScores()
		}
	}
}
// inspectScores feeds the current scores to whichever inspectors are
// configured: the simple one first, then the extended one.
func (ps *peerScore) inspectScores() {
	hasSimple := ps.inspect != nil
	if hasSimple {
		ps.inspectScoresSimple()
	}

	hasExtended := ps.inspectEx != nil
	if hasExtended {
		ps.inspectScoresExtended()
	}
}
// inspectScoresSimple snapshots every tracked peer's score under the lock and
// hands the snapshot to the simple inspector.
func (ps *peerScore) inspectScoresSimple() {
	ps.Lock()
	snapshot := make(map[peer.ID]float64, len(ps.peerStats))
	for id := range ps.peerStats {
		snapshot[id] = ps.score(id)
	}
	ps.Unlock()

	// The callback runs in its own goroutine, after the lock is released,
	// so it cannot stall the scoring loop.
	go ps.inspect(snapshot)
}
// inspectScoresExtended builds a detailed PeerScoreSnapshot for every tracked
// peer under the lock and hands the result to the extended inspector.
func (ps *peerScore) inspectScoresExtended() {
	ps.Lock()
	snapshot := make(map[peer.ID]*PeerScoreSnapshot, len(ps.peerStats))
	for id, pstats := range ps.peerStats {
		entry := &PeerScoreSnapshot{Score: ps.score(id)}

		if n := len(pstats.topics); n > 0 {
			entry.Topics = make(map[string]*TopicScoreSnapshot, n)
			for topic, ts := range pstats.topics {
				// Mesh time is only reported while the peer is in the mesh.
				var inMeshFor time.Duration
				if ts.inMesh {
					inMeshFor = ts.meshTime
				}
				entry.Topics[topic] = &TopicScoreSnapshot{
					TimeInMesh:               inMeshFor,
					FirstMessageDeliveries:   ts.firstMessageDeliveries,
					MeshMessageDeliveries:    ts.meshMessageDeliveries,
					InvalidMessageDeliveries: ts.invalidMessageDeliveries,
				}
			}
		}

		entry.AppSpecificScore = ps.params.AppSpecificScore(id)
		entry.IPColocationFactor = ps.ipColocationFactor(id)
		entry.BehaviourPenalty = pstats.behaviourPenalty

		snapshot[id] = entry
	}
	ps.Unlock()

	// The callback runs in its own goroutine, after the lock is released.
	go ps.inspectEx(snapshot)
}
// refreshScores performs one decay pass over all tracked peers.
//
// Disconnected peers are dropped once their retention deadline has passed and
// are otherwise left untouched. For connected peers every counter of every
// scored topic is multiplied by its decay factor and zeroed when it falls
// below DecayToZero; mesh time is refreshed, the mesh delivery counter is
// activated once mesh time exceeds the activation window, and the behaviour
// penalty is decayed the same way.
func (ps *peerScore) refreshScores() {
	ps.Lock()
	defer ps.Unlock()

	now := time.Now()
	floor := ps.params.DecayToZero

	// decay scales *v by factor and snaps it to zero below the floor.
	decay := func(v *float64, factor float64) {
		*v *= factor
		if *v < floor {
			*v = 0
		}
	}

	for id, pstats := range ps.peerStats {
		if !pstats.connected {
			// Retained stats of a disconnected peer do not decay; they
			// are simply discarded after the deadline.
			if now.After(pstats.expire) {
				ps.removeIPs(id, pstats.ips)
				delete(ps.peerStats, id)
			}
			continue
		}

		for topic, ts := range pstats.topics {
			tp, scored := ps.params.Topics[topic]
			if !scored {
				continue
			}

			decay(&ts.firstMessageDeliveries, tp.FirstMessageDeliveriesDecay)
			decay(&ts.meshMessageDeliveries, tp.MeshMessageDeliveriesDecay)
			decay(&ts.meshFailurePenalty, tp.MeshFailurePenaltyDecay)
			decay(&ts.invalidMessageDeliveries, tp.InvalidMessageDeliveriesDecay)

			if !ts.inMesh {
				continue
			}
			ts.meshTime = now.Sub(ts.graftTime)
			if ts.meshTime > tp.MeshMessageDeliveriesActivation {
				ts.meshMessageDeliveriesActive = true
			}
		}

		decay(&pstats.behaviourPenalty, ps.params.BehaviourPenaltyDecay)
	}
}
// refreshIPs recomputes the IP list of every connected peer and updates the
// IP-to-peers index to match. Disconnected peers are skipped.
func (ps *peerScore) refreshIPs() {
	ps.Lock()
	defer ps.Unlock()

	for id, pstats := range ps.peerStats {
		if !pstats.connected {
			continue
		}

		current := ps.getIPs(id)
		ps.setIPs(id, current, pstats.ips)
		pstats.ips = current
	}
}
// gcDeliveryRecords drops expired delivery records while holding the lock.
func (ps *peerScore) gcDeliveryRecords() {
	ps.Lock()
	ps.deliveries.gc()
	ps.Unlock()
}
// AddPeer starts (or resumes) tracking peer p: it creates statistics for the
// peer if none are retained, marks it connected, and refreshes its IPs in the
// IP-to-peers index. The protocol argument is not used.
func (ps *peerScore) AddPeer(p peer.ID, proto protocol.ID) {
	ps.Lock()
	defer ps.Unlock()

	pstats, tracked := ps.peerStats[p]
	if !tracked {
		pstats = &peerStats{topics: make(map[string]*topicStats)}
		ps.peerStats[p] = pstats
	}
	pstats.connected = true

	current := ps.getIPs(p)
	ps.setIPs(p, current, pstats.ips)
	pstats.ips = current
}
// RemovePeer handles the disconnection of peer p.
//
// A peer whose score is positive is forgotten immediately. Otherwise its
// statistics are retained for RetainScore: first-delivery counters are reset,
// the mesh failure penalty is applied to every topic where the peer was in
// the mesh with an active, below-threshold delivery counter, and the peer is
// marked as out of every mesh and disconnected.
func (ps *peerScore) RemovePeer(p peer.ID) {
	ps.Lock()
	defer ps.Unlock()

	pstats, tracked := ps.peerStats[p]
	if !tracked {
		return
	}

	// Only non-positive scores are retained.
	if ps.score(p) > 0 {
		ps.removeIPs(p, pstats.ips)
		delete(ps.peerStats, p)
		return
	}

	for topic, ts := range pstats.topics {
		ts.firstMessageDeliveries = 0

		threshold := ps.params.Topics[topic].MeshMessageDeliveriesThreshold
		underDelivered := ts.inMesh && ts.meshMessageDeliveriesActive && ts.meshMessageDeliveries < threshold
		if underDelivered {
			deficit := threshold - ts.meshMessageDeliveries
			ts.meshFailurePenalty += deficit * deficit
		}

		ts.inMesh = false
	}

	pstats.connected = false
	pstats.expire = time.Now().Add(ps.params.RetainScore)
}
// Join is a no-op: joining a topic does not affect scoring state.
// NOTE(review): presumably present to satisfy RawTracer — confirm against the interface.
func (ps *peerScore ) Join (topic string ) {}
// Leave is a no-op: leaving a topic does not affect scoring state.
func (ps *peerScore ) Leave (topic string ) {}
// Graft records that peer p joined the mesh for topic: the peer is marked
// in-mesh, the graft time is set to now, and mesh time and the mesh delivery
// activation flag are reset. Untracked peers and unscored topics are ignored.
func (ps *peerScore) Graft(p peer.ID, topic string) {
	ps.Lock()
	defer ps.Unlock()

	pstats, tracked := ps.peerStats[p]
	if !tracked {
		return
	}

	ts, scored := pstats.getTopicStats(topic, ps.params)
	if !scored {
		return
	}

	ts.graftTime = time.Now()
	ts.inMesh = true
	ts.meshTime = 0
	ts.meshMessageDeliveriesActive = false
}
// Prune records that peer p left the mesh for topic. If the mesh delivery
// counter is active and below the topic threshold, the squared deficit is
// added to the mesh failure penalty before the peer is marked out of the mesh.
// Untracked peers and unscored topics are ignored.
func (ps *peerScore) Prune(p peer.ID, topic string) {
	ps.Lock()
	defer ps.Unlock()

	pstats, tracked := ps.peerStats[p]
	if !tracked {
		return
	}

	ts, scored := pstats.getTopicStats(topic, ps.params)
	if !scored {
		return
	}

	threshold := ps.params.Topics[topic].MeshMessageDeliveriesThreshold
	underDelivered := ts.meshMessageDeliveriesActive && ts.meshMessageDeliveries < threshold
	if underDelivered {
		deficit := threshold - ts.meshMessageDeliveries
		ts.meshFailurePenalty += deficit * deficit
	}

	ts.inMesh = false
}
// ValidateMessage makes sure a delivery record exists for msg, creating one
// (and thereby stamping its first-seen time) if necessary.
func (ps *peerScore) ValidateMessage(msg *Message) {
	ps.Lock()
	defer ps.Unlock()

	// Called only for its side effect of creating the record.
	id := ps.idGen.ID(msg)
	ps.deliveries.getRecord(id)
}
// DeliverMessage records that msg passed validation.
//
// The sender is credited with a first delivery. If the record's status is
// still unknown it becomes valid, the validation time is stamped, and every
// other peer already recorded as having forwarded the message is credited
// with a duplicate delivery. A record with any other status is only logged.
func (ps *peerScore) DeliverMessage(msg *Message) {
	ps.Lock()
	defer ps.Unlock()

	sender := msg.ReceivedFrom
	ps.markFirstMessageDelivery(sender, msg)

	rec := ps.deliveries.getRecord(ps.idGen.ID(msg))

	// Defensive: this should be the first trace for the message.
	if rec.status != deliveryUnknown {
		log.Debugf("unexpected delivery trace: message from %s was first seen %s ago and has delivery status %d", msg.ReceivedFrom, time.Since(rec.firstSeen), rec.status)
		return
	}

	rec.status = deliveryValid
	rec.validated = time.Now()

	for other := range rec.peers {
		// The sender was already credited above; skip it so it is not
		// counted twice.
		if other == msg.ReceivedFrom {
			continue
		}
		// A zero validation time skips the delivery window check.
		ps.markDuplicateMessageDelivery(other, msg, time.Time{})
	}
}
// RejectMessage records that msg was rejected for the given reason.
//
//   - Signature/auth/self-origin rejections penalize the sender without
//     touching the delivery record.
//   - Blacklist and validation-queue-full rejections are ignored entirely.
//   - Throttled and ignored validations mark the record accordingly and drop
//     its peer set without penalizing anyone.
//   - Any other reason marks the record invalid and penalizes the sender and
//     every peer recorded as having forwarded the message.
//
// A record whose status is no longer unknown is only logged.
func (ps *peerScore) RejectMessage(msg *Message, reason string) {
	ps.Lock()
	defer ps.Unlock()

	switch reason {
	case RejectMissingSignature,
		RejectInvalidSignature,
		RejectUnexpectedSignature,
		RejectUnexpectedAuthInfo,
		RejectSelfOrigin:
		// Not tracked as a delivery, but the sender is penalized.
		ps.markInvalidMessageDelivery(msg.ReceivedFrom, msg)
		return

	case RejectBlacklstedPeer,
		RejectBlacklistedSource,
		RejectValidationQueueFull:
		// Nothing to record for these.
		return
	}

	rec := ps.deliveries.getRecord(ps.idGen.ID(msg))

	// Defensive: this should be the first trace for the message.
	if rec.status != deliveryUnknown {
		log.Debugf("unexpected rejection trace: message from %s was first seen %s ago and has delivery status %d", msg.ReceivedFrom, time.Since(rec.firstSeen), rec.status)
		return
	}

	switch reason {
	case RejectValidationThrottled:
		rec.status = deliveryThrottled

	case RejectValidationIgnored:
		rec.status = deliveryIgnored

	default:
		rec.status = deliveryInvalid
		ps.markInvalidMessageDelivery(msg.ReceivedFrom, msg)
		for forwarder := range rec.peers {
			ps.markInvalidMessageDelivery(forwarder, msg)
		}
	}

	// The peer set is no longer needed once the outcome is known.
	rec.peers = nil
}
// DuplicateMessage records that msg was received again from msg.ReceivedFrom.
//
// A peer already recorded for this message is ignored. Otherwise the action
// depends on the record's status: unknown records the peer; valid records the
// peer and credits a duplicate delivery against the validation time; invalid
// penalizes the peer; throttled and ignored do nothing.
func (ps *peerScore) DuplicateMessage(msg *Message) {
	ps.Lock()
	defer ps.Unlock()

	rec := ps.deliveries.getRecord(ps.idGen.ID(msg))
	sender := msg.ReceivedFrom

	if _, alreadySeen := rec.peers[sender]; alreadySeen {
		return
	}

	switch rec.status {
	case deliveryUnknown:
		// Still validating: remember the peer until the outcome is known.
		rec.peers[sender] = struct{}{}

	case deliveryValid:
		rec.peers[sender] = struct{}{}
		ps.markDuplicateMessageDelivery(sender, msg, rec.validated)

	case deliveryInvalid:
		ps.markInvalidMessageDelivery(sender, msg)

	case deliveryThrottled, deliveryIgnored:
		// No credit and no penalty.
	}
}
// The following tracer hooks are intentionally empty: these events do not
// affect scoring state.
// NOTE(review): presumably present to satisfy RawTracer — confirm against the interface.

// ThrottlePeer is a no-op.
func (ps *peerScore ) ThrottlePeer (p peer .ID ) {}
// RecvRPC is a no-op.
func (ps *peerScore ) RecvRPC (rpc *RPC ) {}
// SendRPC is a no-op.
func (ps *peerScore ) SendRPC (rpc *RPC , p peer .ID ) {}
// DropRPC is a no-op.
func (ps *peerScore ) DropRPC (rpc *RPC , p peer .ID ) {}
// UndeliverableMessage is a no-op.
func (ps *peerScore ) UndeliverableMessage (msg *Message ) {}
// getRecord returns the delivery record for message id, creating it if it
// does not exist. A new record starts with an empty peer set and the current
// time as firstSeen, and is appended to the expiry list with a deadline of
// now plus seenMsgTTL.
func (d *messageDeliveries) getRecord(id string) *deliveryRecord {
	if existing, found := d.records[id]; found {
		return existing
	}

	now := time.Now()

	rec := &deliveryRecord{
		peers:     make(map[peer.ID]struct{}),
		firstSeen: now,
	}
	d.records[id] = rec

	entry := &deliveryEntry{id: id, expire: now.Add(d.seenMsgTTL)}
	if d.tail == nil {
		// Empty list: the new entry is also the head.
		d.head = entry
	} else {
		d.tail.next = entry
	}
	d.tail = entry

	return rec
}
// gc removes every record whose expiry deadline has passed, walking the
// expiry list from the oldest entry and stopping at the first live one.
func (d *messageDeliveries) gc() {
	if d.head == nil {
		return
	}

	now := time.Now()

	cursor := d.head
	for cursor != nil && now.After(cursor.expire) {
		delete(d.records, cursor.id)
		cursor = cursor.next
	}
	d.head = cursor

	// Everything expired: the list is empty, so clear the tail as well.
	if cursor == nil {
		d.tail = nil
	}
}
// getTopicStats returns the peer's statistics for topic, creating an empty
// entry on first use. The boolean is false (and the result nil) when no
// statistics exist yet and the topic has no entry in params.Topics.
func (pstats *peerStats) getTopicStats(topic string, params *PeerScoreParams) (*topicStats, bool) {
	if existing, found := pstats.topics[topic]; found {
		return existing, true
	}

	// Only topics with scoring parameters get statistics.
	if _, scored := params.Topics[topic]; !scored {
		return nil, false
	}

	fresh := &topicStats{}
	pstats.topics[topic] = fresh
	return fresh, true
}
// markInvalidMessageDelivery increments peer p's invalid delivery counter for
// the topic of msg. Untracked peers and unscored topics are ignored.
func (ps *peerScore) markInvalidMessageDelivery(p peer.ID, msg *Message) {
	pstats, tracked := ps.peerStats[p]
	if !tracked {
		return
	}

	ts, scored := pstats.getTopicStats(msg.GetTopic(), ps.params)
	if !scored {
		return
	}

	ts.invalidMessageDeliveries++
}
// markFirstMessageDelivery credits peer p with a first delivery on the topic
// of msg, clamped to FirstMessageDeliveriesCap. If the peer is in the mesh
// for that topic, its mesh delivery counter is incremented as well, clamped
// to MeshMessageDeliveriesCap. Untracked peers and unscored topics are ignored.
func (ps *peerScore) markFirstMessageDelivery(p peer.ID, msg *Message) {
	pstats, tracked := ps.peerStats[p]
	if !tracked {
		return
	}

	topic := msg.GetTopic()
	ts, scored := pstats.getTopicStats(topic, ps.params)
	if !scored {
		return
	}

	tparams := ps.params.Topics[topic]

	ts.firstMessageDeliveries++
	if limit := tparams.FirstMessageDeliveriesCap; ts.firstMessageDeliveries > limit {
		ts.firstMessageDeliveries = limit
	}

	if !ts.inMesh {
		return
	}

	ts.meshMessageDeliveries++
	if limit := tparams.MeshMessageDeliveriesCap; ts.meshMessageDeliveries > limit {
		ts.meshMessageDeliveries = limit
	}
}
// markDuplicateMessageDelivery credits peer p with a mesh delivery on the
// topic of msg, clamped to MeshMessageDeliveriesCap. Credit is only given to
// peers in the mesh, and — when validated is non-zero — only if no more than
// MeshMessageDeliveriesWindow has elapsed since validation. A zero validated
// time skips the window check. Untracked peers and unscored topics are ignored.
func (ps *peerScore) markDuplicateMessageDelivery(p peer.ID, msg *Message, validated time.Time) {
	pstats, tracked := ps.peerStats[p]
	if !tracked {
		return
	}

	topic := msg.GetTopic()
	ts, scored := pstats.getTopicStats(topic, ps.params)
	if !scored || !ts.inMesh {
		return
	}

	tparams := ps.params.Topics[topic]

	outsideWindow := !validated.IsZero() && time.Since(validated) > tparams.MeshMessageDeliveriesWindow
	if outsideWindow {
		return
	}

	ts.meshMessageDeliveries++
	if limit := tparams.MeshMessageDeliveriesCap; ts.meshMessageDeliveries > limit {
		ts.meshMessageDeliveries = limit
	}
}
// getIPs returns the IP strings of peer p's current connections.
//
// Connections flagged as limited, addresses that do not convert to an IP, and
// loopback addresses are skipped. An IPv4 address contributes its string
// form; any other address contributes both its string form and the string
// form of its /64 prefix. It returns nil when no host is set.
func (ps *peerScore) getIPs(p peer.ID) []string {
	if ps.host == nil {
		return nil
	}

	ips := make([]string, 0, 1)
	for _, conn := range ps.host.Network().ConnsToPeer(p) {
		if conn.Stat().Limited {
			continue
		}

		ip, err := manet.ToIP(conn.RemoteMultiaddr())
		if err != nil || ip.IsLoopback() {
			continue
		}

		ips = append(ips, ip.String())
		if len(ip.To4()) != 4 {
			// Not IPv4: also track the enclosing /64 subnet.
			subnet := ip.Mask(net.CIDRMask(64, 128)).String()
			ips = append(ips, subnet)
		}
	}

	return ips
}
// setIPs updates the IP-to-peers index for peer p: the peer is added under
// every IP in newips that is not in oldips, and removed from every IP in
// oldips that is not in newips. An IP left with no peers is deleted from the
// index.
func (ps *peerScore) setIPs(p peer.ID, newips, oldips []string) {
	// contains reports whether ip occurs in list.
	contains := func(list []string, ip string) bool {
		for _, candidate := range list {
			if candidate == ip {
				return true
			}
		}
		return false
	}

	// Register the peer under IPs it did not have before.
	for _, ip := range newips {
		if contains(oldips, ip) {
			continue
		}

		sharers, present := ps.peerIPs[ip]
		if !present {
			sharers = make(map[peer.ID]struct{})
			ps.peerIPs[ip] = sharers
		}
		sharers[p] = struct{}{}
	}

	// Unregister the peer from IPs it no longer has.
	for _, ip := range oldips {
		if contains(newips, ip) {
			continue
		}

		sharers, present := ps.peerIPs[ip]
		if !present {
			continue
		}

		delete(sharers, p)
		if len(sharers) == 0 {
			delete(ps.peerIPs, ip)
		}
	}
}
// removeIPs removes peer p from the IP-to-peers index under each of the given
// IPs, deleting any IP that is left with no peers.
func (ps *peerScore) removeIPs(p peer.ID, ips []string) {
	for _, ip := range ips {
		sharers, present := ps.peerIPs[ip]
		if !present {
			continue
		}

		delete(sharers, p)
		if len(sharers) == 0 {
			delete(ps.peerIPs, ip)
		}
	}
}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.