package pubsub
import (
"context"
"encoding/binary"
"errors"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p-pubsub/timecache"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/discovery"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
logging "github.com/ipfs/go-log/v2"
)
const DefaultMaxMessageSize = 1 << 20
var (
TimeCacheDuration = 120 * time .Second
TimeCacheStrategy = timecache .Strategy_FirstSeen
ErrSubscriptionCancelled = errors .New ("subscription cancelled" )
)
var log = logging .Logger ("pubsub" )
type ProtocolMatchFn = func (protocol .ID ) func (protocol .ID ) bool
type PubSub struct {
counter uint64
host host .Host
rt PubSubRouter
val *validation
disc *discover
tracer *pubsubTracer
peerFilter PeerFilter
maxMessageSize int
peerOutboundQueueSize int
incoming chan *RPC
addSub chan *addSubReq
addRelay chan *addRelayReq
rmRelay chan string
getTopics chan *topicReq
getPeers chan *listPeerReq
cancelCh chan *Subscription
addTopic chan *addTopicReq
rmTopic chan *rmTopicReq
newPeers chan struct {}
newPeersPrioLk sync .RWMutex
newPeersMx sync .Mutex
newPeersPend map [peer .ID ]struct {}
newPeerStream chan network .Stream
newPeerError chan peer .ID
peerDead chan struct {}
peerDeadPrioLk sync .RWMutex
peerDeadMx sync .Mutex
peerDeadPend map [peer .ID ]struct {}
deadPeerBackoff *backoff
mySubs map [string ]map [*Subscription ]struct {}
myRelays map [string ]int
myTopics map [string ]*Topic
topics map [string ]map [peer .ID ]struct {}
sendMsg chan *Message
addVal chan *addValReq
rmVal chan *rmValReq
eval chan func ()
blacklist Blacklist
blacklistPeer chan peer .ID
peers map [peer .ID ]*rpcQueue
inboundStreamsMx sync .Mutex
inboundStreams map [peer .ID ]network .Stream
seenMessages timecache .TimeCache
seenMsgTTL time .Duration
seenMsgStrategy timecache .Strategy
idGen *msgIDGenerator
signKey crypto .PrivKey
signID peer .ID
signPolicy MessageSignaturePolicy
subFilter SubscriptionFilter
protoMatchFunc ProtocolMatchFn
ctx context .Context
appSpecificRpcInspector func (peer .ID , *RPC ) error
}
type PubSubRouter interface {
Protocols () []protocol .ID
Attach (*PubSub )
AddPeer (peer .ID , protocol .ID )
RemovePeer (peer .ID )
EnoughPeers (topic string , suggested int ) bool
AcceptFrom (peer .ID ) AcceptStatus
PreValidation ([]*Message )
HandleRPC (*RPC )
Publish (*Message )
Join (topic string )
Leave (topic string )
}
type AcceptStatus int
const (
AcceptNone AcceptStatus = iota
AcceptControl
AcceptAll
)
type Message struct {
*pb .Message
ID string
ReceivedFrom peer .ID
ValidatorData interface {}
Local bool
}
func (m *Message ) GetFrom () peer .ID {
return peer .ID (m .Message .GetFrom ())
}
type RPC struct {
pb .RPC
from peer .ID
}
type Option func (*PubSub ) error
func NewPubSub (ctx context .Context , h host .Host , rt PubSubRouter , opts ...Option ) (*PubSub , error ) {
ps := &PubSub {
host : h ,
ctx : ctx ,
rt : rt ,
val : newValidation (),
peerFilter : DefaultPeerFilter ,
disc : &discover {},
maxMessageSize : DefaultMaxMessageSize ,
peerOutboundQueueSize : 32 ,
signID : h .ID (),
signKey : nil ,
signPolicy : StrictSign ,
incoming : make (chan *RPC , 32 ),
newPeers : make (chan struct {}, 1 ),
newPeersPend : make (map [peer .ID ]struct {}),
newPeerStream : make (chan network .Stream ),
newPeerError : make (chan peer .ID ),
peerDead : make (chan struct {}, 1 ),
peerDeadPend : make (map [peer .ID ]struct {}),
deadPeerBackoff : newBackoff (ctx , 1000 , BackoffCleanupInterval , MaxBackoffAttempts ),
cancelCh : make (chan *Subscription ),
getPeers : make (chan *listPeerReq ),
addSub : make (chan *addSubReq ),
addRelay : make (chan *addRelayReq ),
rmRelay : make (chan string ),
addTopic : make (chan *addTopicReq ),
rmTopic : make (chan *rmTopicReq ),
getTopics : make (chan *topicReq ),
sendMsg : make (chan *Message , 32 ),
addVal : make (chan *addValReq ),
rmVal : make (chan *rmValReq ),
eval : make (chan func ()),
myTopics : make (map [string ]*Topic ),
mySubs : make (map [string ]map [*Subscription ]struct {}),
myRelays : make (map [string ]int ),
topics : make (map [string ]map [peer .ID ]struct {}),
peers : make (map [peer .ID ]*rpcQueue ),
inboundStreams : make (map [peer .ID ]network .Stream ),
blacklist : NewMapBlacklist (),
blacklistPeer : make (chan peer .ID ),
seenMsgTTL : TimeCacheDuration ,
seenMsgStrategy : TimeCacheStrategy ,
idGen : newMsgIdGenerator (),
counter : uint64 (time .Now ().UnixNano ()),
}
for _ , opt := range opts {
err := opt (ps )
if err != nil {
return nil , err
}
}
if ps .signPolicy .mustSign () {
if ps .signID == "" {
return nil , fmt .Errorf ("strict signature usage enabled but message author was disabled" )
}
ps .signKey = ps .host .Peerstore ().PrivKey (ps .signID )
if ps .signKey == nil {
return nil , fmt .Errorf ("can't sign for peer %s: no private key" , ps .signID )
}
}
ps .seenMessages = timecache .NewTimeCacheWithStrategy (ps .seenMsgStrategy , ps .seenMsgTTL )
if err := ps .disc .Start (ps ); err != nil {
return nil , err
}
rt .Attach (ps )
for _ , id := range rt .Protocols () {
if ps .protoMatchFunc != nil {
h .SetStreamHandlerMatch (id , ps .protoMatchFunc (id ), ps .handleNewStream )
} else {
h .SetStreamHandler (id , ps .handleNewStream )
}
}
go ps .watchForNewPeers (ctx )
ps .val .Start (ps )
go ps .processLoop (ctx )
return ps , nil
}
type MsgIdFunction func (pmsg *pb .Message ) string
func WithMessageIdFn (fn MsgIdFunction ) Option {
return func (p *PubSub ) error {
p .idGen .Default = fn
return nil
}
}
type PeerFilter func (pid peer .ID , topic string ) bool
func WithPeerFilter (filter PeerFilter ) Option {
return func (p *PubSub ) error {
p .peerFilter = filter
return nil
}
}
func WithPeerOutboundQueueSize (size int ) Option {
return func (p *PubSub ) error {
if size <= 0 {
return errors .New ("outbound queue size must always be positive" )
}
p .peerOutboundQueueSize = size
return nil
}
}
func WithMessageSignaturePolicy (policy MessageSignaturePolicy ) Option {
return func (p *PubSub ) error {
p .signPolicy = policy
return nil
}
}
func WithMessageSigning (enabled bool ) Option {
return func (p *PubSub ) error {
if enabled {
p .signPolicy |= msgSigning
} else {
p .signPolicy &^= msgSigning
}
return nil
}
}
func WithMessageAuthor (author peer .ID ) Option {
return func (p *PubSub ) error {
author := author
if author == "" {
author = p .host .ID ()
}
p .signID = author
return nil
}
}
func WithNoAuthor () Option {
return func (p *PubSub ) error {
p .signID = ""
p .signPolicy &^= msgSigning
return nil
}
}
func WithStrictSignatureVerification (required bool ) Option {
return func (p *PubSub ) error {
if required {
p .signPolicy |= msgVerification
} else {
p .signPolicy &^= msgVerification
}
return nil
}
}
func WithBlacklist (b Blacklist ) Option {
return func (p *PubSub ) error {
p .blacklist = b
return nil
}
}
func WithDiscovery (d discovery .Discovery , opts ...DiscoverOpt ) Option {
return func (p *PubSub ) error {
discoverOpts := defaultDiscoverOptions ()
for _ , opt := range opts {
err := opt (discoverOpts )
if err != nil {
return err
}
}
p .disc .discovery = &pubSubDiscovery {Discovery : d , opts : discoverOpts .opts }
p .disc .options = discoverOpts
return nil
}
}
func WithEventTracer (tracer EventTracer ) Option {
return func (p *PubSub ) error {
if p .tracer != nil {
p .tracer .tracer = tracer
} else {
p .tracer = &pubsubTracer {tracer : tracer , pid : p .host .ID (), idGen : p .idGen }
}
return nil
}
}
func WithRawTracer (tracer RawTracer ) Option {
return func (p *PubSub ) error {
if p .tracer != nil {
p .tracer .raw = append (p .tracer .raw , tracer )
} else {
p .tracer = &pubsubTracer {raw : []RawTracer {tracer }, pid : p .host .ID (), idGen : p .idGen }
}
return nil
}
}
func WithMaxMessageSize (maxMessageSize int ) Option {
return func (ps *PubSub ) error {
ps .maxMessageSize = maxMessageSize
return nil
}
}
func WithProtocolMatchFn (m ProtocolMatchFn ) Option {
return func (ps *PubSub ) error {
ps .protoMatchFunc = m
return nil
}
}
func WithSeenMessagesTTL (ttl time .Duration ) Option {
return func (ps *PubSub ) error {
ps .seenMsgTTL = ttl
return nil
}
}
func WithSeenMessagesStrategy (strategy timecache .Strategy ) Option {
return func (ps *PubSub ) error {
ps .seenMsgStrategy = strategy
return nil
}
}
func WithAppSpecificRpcInspector (inspector func (peer .ID , *RPC ) error ) Option {
return func (ps *PubSub ) error {
ps .appSpecificRpcInspector = inspector
return nil
}
}
func (p *PubSub ) processLoop (ctx context .Context ) {
defer func () {
for _ , queue := range p .peers {
queue .Close ()
}
p .peers = nil
p .topics = nil
p .seenMessages .Done ()
}()
for {
select {
case <- p .newPeers :
p .handlePendingPeers ()
case s := <- p .newPeerStream :
pid := s .Conn ().RemotePeer ()
q , ok := p .peers [pid ]
if !ok {
log .Warn ("new stream for unknown peer: " , pid )
s .Reset ()
continue
}
if p .blacklist .Contains (pid ) {
log .Warn ("closing stream for blacklisted peer: " , pid )
q .Close ()
delete (p .peers , pid )
s .Reset ()
continue
}
p .rt .AddPeer (pid , s .Protocol ())
case pid := <- p .newPeerError :
delete (p .peers , pid )
case <- p .peerDead :
p .handleDeadPeers ()
case treq := <- p .getTopics :
var out []string
for t := range p .mySubs {
out = append (out , t )
}
treq .resp <- out
case topic := <- p .addTopic :
p .handleAddTopic (topic )
case topic := <- p .rmTopic :
p .handleRemoveTopic (topic )
case sub := <- p .cancelCh :
p .handleRemoveSubscription (sub )
case sub := <- p .addSub :
p .handleAddSubscription (sub )
case relay := <- p .addRelay :
p .handleAddRelay (relay )
case topic := <- p .rmRelay :
p .handleRemoveRelay (topic )
case preq := <- p .getPeers :
tmap , ok := p .topics [preq .topic ]
if preq .topic != "" && !ok {
preq .resp <- nil
continue
}
var peers []peer .ID
for p := range p .peers {
if preq .topic != "" {
_ , ok := tmap [p ]
if !ok {
continue
}
}
peers = append (peers , p )
}
preq .resp <- peers
case rpc := <- p .incoming :
p .handleIncomingRPC (rpc )
case msg := <- p .sendMsg :
p .publishMessage (msg )
case req := <- p .addVal :
p .val .AddValidator (req )
case req := <- p .rmVal :
p .val .RemoveValidator (req )
case thunk := <- p .eval :
thunk ()
case pid := <- p .blacklistPeer :
log .Infof ("Blacklisting peer %s" , pid )
p .blacklist .Add (pid )
q , ok := p .peers [pid ]
if ok {
q .Close ()
delete (p .peers , pid )
for t , tmap := range p .topics {
if _ , ok := tmap [pid ]; ok {
delete (tmap , pid )
p .notifyLeave (t , pid )
}
}
p .rt .RemovePeer (pid )
}
case <- ctx .Done ():
log .Info ("pubsub processloop shutting down" )
return
}
}
}
func (p *PubSub ) handlePendingPeers () {
p .newPeersPrioLk .Lock ()
if len (p .newPeersPend ) == 0 {
p .newPeersPrioLk .Unlock ()
return
}
newPeers := p .newPeersPend
p .newPeersPend = make (map [peer .ID ]struct {})
p .newPeersPrioLk .Unlock ()
for pid := range newPeers {
if p .host .Network ().Connectedness (pid ) != network .Connected {
continue
}
if _ , ok := p .peers [pid ]; ok {
log .Debug ("already have connection to peer: " , pid )
continue
}
if p .blacklist .Contains (pid ) {
log .Warn ("ignoring connection from blacklisted peer: " , pid )
continue
}
rpcQueue := newRpcQueue (p .peerOutboundQueueSize )
rpcQueue .Push (p .getHelloPacket (), true )
go p .handleNewPeer (p .ctx , pid , rpcQueue )
p .peers [pid ] = rpcQueue
}
}
func (p *PubSub ) handleDeadPeers () {
p .peerDeadPrioLk .Lock ()
if len (p .peerDeadPend ) == 0 {
p .peerDeadPrioLk .Unlock ()
return
}
deadPeers := p .peerDeadPend
p .peerDeadPend = make (map [peer .ID ]struct {})
p .peerDeadPrioLk .Unlock ()
for pid := range deadPeers {
q , ok := p .peers [pid ]
if !ok {
continue
}
q .Close ()
delete (p .peers , pid )
for t , tmap := range p .topics {
if _ , ok := tmap [pid ]; ok {
delete (tmap , pid )
p .notifyLeave (t , pid )
}
}
p .rt .RemovePeer (pid )
if p .host .Network ().Connectedness (pid ) == network .Connected {
backoffDelay , err := p .deadPeerBackoff .updateAndGet (pid )
if err != nil {
log .Debug (err )
continue
}
log .Debugf ("peer declared dead but still connected; respawning writer: %s" , pid )
rpcQueue := newRpcQueue (p .peerOutboundQueueSize )
rpcQueue .Push (p .getHelloPacket (), true )
p .peers [pid ] = rpcQueue
go p .handleNewPeerWithBackoff (p .ctx , pid , backoffDelay , rpcQueue )
}
}
}
func (p *PubSub ) handleAddTopic (req *addTopicReq ) {
topic := req .topic
topicID := topic .topic
t , ok := p .myTopics [topicID ]
if ok {
req .resp <- t
return
}
p .myTopics [topicID ] = topic
req .resp <- topic
}
func (p *PubSub ) handleRemoveTopic (req *rmTopicReq ) {
topic := p .myTopics [req .topic .topic ]
if topic == nil {
req .resp <- nil
return
}
if len (topic .evtHandlers ) == 0 &&
len (p .mySubs [req .topic .topic ]) == 0 &&
p .myRelays [req .topic .topic ] == 0 {
delete (p .myTopics , topic .topic )
req .resp <- nil
return
}
req .resp <- fmt .Errorf ("cannot close topic: outstanding event handlers or subscriptions" )
}
func (p *PubSub ) handleRemoveSubscription (sub *Subscription ) {
subs := p .mySubs [sub .topic ]
if subs == nil {
return
}
sub .err = ErrSubscriptionCancelled
sub .close ()
delete (subs , sub )
if len (subs ) == 0 {
delete (p .mySubs , sub .topic )
if p .myRelays [sub .topic ] == 0 {
p .disc .StopAdvertise (sub .topic )
p .announce (sub .topic , false )
p .rt .Leave (sub .topic )
}
}
}
func (p *PubSub ) handleAddSubscription (req *addSubReq ) {
sub := req .sub
subs := p .mySubs [sub .topic ]
if len (subs ) == 0 && p .myRelays [sub .topic ] == 0 {
p .disc .Advertise (sub .topic )
p .announce (sub .topic , true )
p .rt .Join (sub .topic )
}
if subs == nil {
p .mySubs [sub .topic ] = make (map [*Subscription ]struct {})
}
sub .cancelCh = p .cancelCh
p .mySubs [sub .topic ][sub ] = struct {}{}
req .resp <- sub
}
func (p *PubSub ) handleAddRelay (req *addRelayReq ) {
topic := req .topic
p .myRelays [topic ]++
if p .myRelays [topic ] == 1 && len (p .mySubs [topic ]) == 0 {
p .disc .Advertise (topic )
p .announce (topic , true )
p .rt .Join (topic )
}
isCancelled := false
relayCancelFunc := func () {
if isCancelled {
return
}
select {
case p .rmRelay <- topic :
isCancelled = true
case <- p .ctx .Done ():
}
}
req .resp <- relayCancelFunc
}
func (p *PubSub ) handleRemoveRelay (topic string ) {
if p .myRelays [topic ] == 0 {
return
}
p .myRelays [topic ]--
if p .myRelays [topic ] == 0 {
delete (p .myRelays , topic )
if len (p .mySubs [topic ]) == 0 {
p .disc .StopAdvertise (topic )
p .announce (topic , false )
p .rt .Leave (topic )
}
}
}
func (p *PubSub ) announce (topic string , sub bool ) {
subopt := &pb .RPC_SubOpts {
Topicid : &topic ,
Subscribe : &sub ,
}
out := rpcWithSubs (subopt )
for pid , peer := range p .peers {
err := peer .Push (out , false )
if err != nil {
log .Infof ("Can't send announce message to peer %s: queue full; scheduling retry" , pid )
p .tracer .DropRPC (out , pid )
go p .announceRetry (pid , topic , sub )
continue
}
p .tracer .SendRPC (out , pid )
}
}
func (p *PubSub ) announceRetry (pid peer .ID , topic string , sub bool ) {
time .Sleep (time .Duration (1 +rand .Intn (1000 )) * time .Millisecond )
retry := func () {
_ , okSubs := p .mySubs [topic ]
_ , okRelays := p .myRelays [topic ]
ok := okSubs || okRelays
if (ok && sub ) || (!ok && !sub ) {
p .doAnnounceRetry (pid , topic , sub )
}
}
select {
case p .eval <- retry :
case <- p .ctx .Done ():
}
}
func (p *PubSub ) doAnnounceRetry (pid peer .ID , topic string , sub bool ) {
peer , ok := p .peers [pid ]
if !ok {
return
}
subopt := &pb .RPC_SubOpts {
Topicid : &topic ,
Subscribe : &sub ,
}
out := rpcWithSubs (subopt )
err := peer .Push (out , false )
if err != nil {
log .Infof ("Can't send announce message to peer %s: queue full; scheduling retry" , pid )
p .tracer .DropRPC (out , pid )
go p .announceRetry (pid , topic , sub )
return
}
p .tracer .SendRPC (out , pid )
}
func (p *PubSub ) notifySubs (msg *Message ) {
topic := msg .GetTopic ()
subs := p .mySubs [topic ]
for f := range subs {
select {
case f .ch <- msg :
default :
p .tracer .UndeliverableMessage (msg )
log .Infof ("Can't deliver message to subscription for topic %s; subscriber too slow" , topic )
}
}
}
func (p *PubSub ) seenMessage (id string ) bool {
return p .seenMessages .Has (id )
}
func (p *PubSub ) markSeen (id string ) bool {
return p .seenMessages .Add (id )
}
func (p *PubSub ) subscribedToMsg (msg *pb .Message ) bool {
if len (p .mySubs ) == 0 {
return false
}
topic := msg .GetTopic ()
_ , ok := p .mySubs [topic ]
return ok
}
func (p *PubSub ) canRelayMsg (msg *pb .Message ) bool {
if len (p .myRelays ) == 0 {
return false
}
topic := msg .GetTopic ()
relays := p .myRelays [topic ]
return relays > 0
}
func (p *PubSub ) notifyLeave (topic string , pid peer .ID ) {
if t , ok := p .myTopics [topic ]; ok {
t .sendNotification (PeerEvent {PeerLeave , pid })
}
}
func (p *PubSub ) handleIncomingRPC (rpc *RPC ) {
if p .appSpecificRpcInspector != nil {
if err := p .appSpecificRpcInspector (rpc .from , rpc ); err != nil {
log .Debugf ("application-specific inspection failed, rejecting incoming rpc: %s" , err )
return
}
}
p .tracer .RecvRPC (rpc )
subs := rpc .GetSubscriptions ()
if len (subs ) != 0 && p .subFilter != nil {
var err error
subs , err = p .subFilter .FilterIncomingSubscriptions (rpc .from , subs )
if err != nil {
log .Debugf ("subscription filter error: %s; ignoring RPC" , err )
return
}
}
for _ , subopt := range subs {
t := subopt .GetTopicid ()
if subopt .GetSubscribe () {
tmap , ok := p .topics [t ]
if !ok {
tmap = make (map [peer .ID ]struct {})
p .topics [t ] = tmap
}
if _, ok = tmap [rpc .from ]; !ok {
tmap [rpc .from ] = struct {}{}
if topic , ok := p .myTopics [t ]; ok {
peer := rpc .from
topic .sendNotification (PeerEvent {PeerJoin , peer })
}
}
} else {
tmap , ok := p .topics [t ]
if !ok {
continue
}
if _ , ok := tmap [rpc .from ]; ok {
delete (tmap , rpc .from )
p .notifyLeave (t , rpc .from )
}
}
}
switch p .rt .AcceptFrom (rpc .from ) {
case AcceptNone :
log .Debugf ("received RPC from router graylisted peer %s; dropping RPC" , rpc .from )
return
case AcceptControl :
if len (rpc .GetPublish ()) > 0 {
log .Debugf ("peer %s was throttled by router; ignoring %d payload messages" , rpc .from , len (rpc .GetPublish ()))
}
p .tracer .ThrottlePeer (rpc .from )
case AcceptAll :
var toPush []*Message
for _ , pmsg := range rpc .GetPublish () {
if !(p .subscribedToMsg (pmsg ) || p .canRelayMsg (pmsg )) {
log .Debug ("received message in topic we didn't subscribe to; ignoring message" )
continue
}
msg := &Message {pmsg , "" , rpc .from , nil , false }
if p .shouldPush (msg ) {
toPush = append (toPush , msg )
}
}
p .rt .PreValidation (toPush )
for _ , msg := range toPush {
p .pushMsg (msg )
}
}
p .rt .HandleRPC (rpc )
}
func DefaultMsgIdFn (pmsg *pb .Message ) string {
return string (pmsg .GetFrom ()) + string (pmsg .GetSeqno ())
}
func DefaultPeerFilter (pid peer .ID , topic string ) bool {
return true
}
func (p *PubSub ) shouldPush (msg *Message ) bool {
src := msg .ReceivedFrom
if p .blacklist .Contains (src ) {
log .Debugf ("dropping message from blacklisted peer %s" , src )
p .tracer .RejectMessage (msg , RejectBlacklstedPeer )
return false
}
if p .blacklist .Contains (msg .GetFrom ()) {
log .Debugf ("dropping message from blacklisted source %s" , src )
p .tracer .RejectMessage (msg , RejectBlacklistedSource )
return false
}
err := p .checkSigningPolicy (msg )
if err != nil {
log .Debugf ("dropping message from %s: %s" , src , err )
return false
}
self := p .host .ID ()
if peer .ID (msg .GetFrom ()) == self && src != self {
log .Debugf ("dropping message claiming to be from self but forwarded from %s" , src )
p .tracer .RejectMessage (msg , RejectSelfOrigin )
return false
}
id := p .idGen .ID (msg )
if p .seenMessage (id ) {
p .tracer .DuplicateMessage (msg )
return false
}
return true
}
func (p *PubSub ) pushMsg (msg *Message ) {
src := msg .ReceivedFrom
id := p .idGen .ID (msg )
if !p .val .Push (src , msg ) {
return
}
if p .markSeen (id ) {
p .publishMessage (msg )
}
}
func (p *PubSub ) checkSigningPolicy (msg *Message ) error {
if p .signPolicy .mustVerify () {
if p .signPolicy .mustSign () {
if msg .Signature == nil {
p .tracer .RejectMessage (msg , RejectMissingSignature )
return ValidationError {Reason : RejectMissingSignature }
}
} else {
if msg .Signature != nil {
p .tracer .RejectMessage (msg , RejectUnexpectedSignature )
return ValidationError {Reason : RejectUnexpectedSignature }
}
if p .signID == "" {
if msg .Seqno != nil || msg .From != nil || msg .Key != nil {
p .tracer .RejectMessage (msg , RejectUnexpectedAuthInfo )
return ValidationError {Reason : RejectUnexpectedAuthInfo }
}
}
}
}
return nil
}
func (p *PubSub ) publishMessage (msg *Message ) {
p .tracer .DeliverMessage (msg )
p .notifySubs (msg )
if !msg .Local {
p .rt .Publish (msg )
}
}
type addTopicReq struct {
topic *Topic
resp chan *Topic
}
type rmTopicReq struct {
topic *Topic
resp chan error
}
type TopicOptions struct {}
type TopicOpt func (t *Topic ) error
func WithTopicMessageIdFn (msgId MsgIdFunction ) TopicOpt {
return func (t *Topic ) error {
t .p .idGen .Set (t .topic , msgId )
return nil
}
}
func (p *PubSub ) Join (topic string , opts ...TopicOpt ) (*Topic , error ) {
t , ok , err := p .tryJoin (topic , opts ...)
if err != nil {
return nil , err
}
if !ok {
return nil , fmt .Errorf ("topic already exists" )
}
return t , nil
}
func (p *PubSub ) tryJoin (topic string , opts ...TopicOpt ) (*Topic , bool , error ) {
if p .subFilter != nil && !p .subFilter .CanSubscribe (topic ) {
return nil , false , fmt .Errorf ("topic is not allowed by the subscription filter" )
}
t := &Topic {
p : p ,
topic : topic ,
evtHandlers : make (map [*TopicEventHandler ]struct {}),
}
for _ , opt := range opts {
err := opt (t )
if err != nil {
return nil , false , err
}
}
resp := make (chan *Topic , 1 )
select {
case t .p .addTopic <- &addTopicReq {
topic : t ,
resp : resp ,
}:
case <- t .p .ctx .Done ():
return nil , false , t .p .ctx .Err ()
}
returnedTopic := <-resp
if returnedTopic != t {
return returnedTopic , false , nil
}
return t , true , nil
}
type addSubReq struct {
sub *Subscription
resp chan *Subscription
}
type SubOpt func (sub *Subscription ) error
func (p *PubSub ) Subscribe (topic string , opts ...SubOpt ) (*Subscription , error ) {
topicHandle , _ , err := p .tryJoin (topic )
if err != nil {
return nil , err
}
return topicHandle .Subscribe (opts ...)
}
func WithBufferSize (size int ) SubOpt {
return func (sub *Subscription ) error {
sub .ch = make (chan *Message , size )
return nil
}
}
type topicReq struct {
resp chan []string
}
func (p *PubSub ) GetTopics () []string {
out := make (chan []string , 1 )
select {
case p .getTopics <- &topicReq {resp : out }:
case <- p .ctx .Done ():
return nil
}
return <-out
}
func (p *PubSub ) Publish (topic string , data []byte , opts ...PubOpt ) error {
t , _ , err := p .tryJoin (topic )
if err != nil {
return err
}
return t .Publish (context .TODO (), data , opts ...)
}
func (p *PubSub ) nextSeqno () []byte {
seqno := make ([]byte , 8 )
counter := atomic .AddUint64 (&p .counter , 1 )
binary .BigEndian .PutUint64 (seqno , counter )
return seqno
}
type listPeerReq struct {
resp chan []peer .ID
topic string
}
func (p *PubSub ) ListPeers (topic string ) []peer .ID {
out := make (chan []peer .ID )
select {
case p .getPeers <- &listPeerReq {
resp : out ,
topic : topic ,
}:
case <- p .ctx .Done ():
return nil
}
return <-out
}
func (p *PubSub ) BlacklistPeer (pid peer .ID ) {
select {
case p .blacklistPeer <- pid :
case <- p .ctx .Done ():
}
}
func (p *PubSub ) RegisterTopicValidator (topic string , val interface {}, opts ...ValidatorOpt ) error {
addVal := &addValReq {
topic : topic ,
validate : val ,
resp : make (chan error , 1 ),
}
for _ , opt := range opts {
err := opt (addVal )
if err != nil {
return err
}
}
select {
case p .addVal <- addVal :
case <- p .ctx .Done ():
return p .ctx .Err ()
}
return <-addVal .resp
}
func (p *PubSub ) UnregisterTopicValidator (topic string ) error {
rmVal := &rmValReq {
topic : topic ,
resp : make (chan error , 1 ),
}
select {
case p .rmVal <- rmVal :
case <- p .ctx .Done ():
return p .ctx .Err ()
}
return <-rmVal .resp
}
type RelayCancelFunc func ()
type addRelayReq struct {
topic string
resp chan RelayCancelFunc
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .