package pubsub
Import Path
github.com/libp2p/go-libp2p-pubsub (on go.dev)
Dependency Relation
imports 40 packages, and imported by one package
Involved Source Files
backoff.go
blacklist.go
comm.go
discovery.go
The pubsub package provides facilities for the Publish/Subscribe pattern of message
propagation, also known as overlay multicast.
The implementation provides topic-based pubsub, with pluggable routing algorithms.
The main interface to the library is the PubSub object.
You can construct this object with the following constructors:
- NewFloodSub creates an instance that uses the floodsub routing algorithm.
- NewGossipSub creates an instance that uses the gossipsub routing algorithm.
- NewRandomSub creates an instance that uses the randomsub routing algorithm.
In addition, there is a generic constructor that creates a pubsub instance with
a custom PubSubRouter interface. This procedure is currently reserved for internal
use within the package.
Once you have constructed a PubSub instance, you need to establish some connections
to your peers; the implementation relies on ambient peer discovery, leaving bootstrap
and active peer discovery up to the client.
To publish a message to some topic, use Publish; you don't need to be subscribed
to the topic in order to publish.
To subscribe to a topic, use Subscribe; this will give you a subscription interface
from which new messages can be pumped.
floodsub.go
gossip_tracer.go
gossipsub.go
gossipsub_feat.go
mcache.go
midgen.go
peer_gater.go
peer_notify.go
pubsub.go
randomsub.go
rpc_queue.go
score.go
score_params.go
sign.go
subscription.go
subscription_filter.go
tag_tracer.go
topic.go
trace.go
tracer.go
validation.go
validation_builtin.go
Package-Level Type Names (total 58)
func (*FloodSubRouter).AcceptFrom(peer.ID) AcceptStatus
func (*GossipSubRouter).AcceptFrom(p peer.ID) AcceptStatus
func PubSubRouter.AcceptFrom(peer.ID) AcceptStatus
func (*RandomSubRouter).AcceptFrom(peer.ID) AcceptStatus
const AcceptAll
const AcceptControl
const AcceptNone
BackoffConnectorFactory creates a BackoffConnector that is attached to a given host
func WithDiscoverConnector(connFactory BackoffConnectorFactory) DiscoverOpt
BasicSeqnoValidator is a basic validator, usable as a default validator, that ignores replayed
messages outside the seen cache window. The validator uses the message seqno as a peer-specific
nonce to decide whether the message should be propagated, comparing to the maximal nonce store
in the peer metadata store. This is useful to ensure that there can be no infinitely propagating
messages in the network regardless of the seen cache span and network diameter.
It requires that pubsub is instantiated with a strict message signing policy and that seqnos
are not disabled, ie it doesn't support anonymous mode.
Warning: See https://github.com/libp2p/rust-libp2p/issues/3453
TL;DR: rust is currently violating the spec by issuing a random seqno, which creates an
interoperability hazard. We expect this issue to be addressed in the not so distant future,
but keep this in mind if you are in a mixed environment with (older) rust nodes.
Blacklist is an interface for peer blacklisting.
( Blacklist) Add(peer.ID) bool
( Blacklist) Contains(peer.ID) bool
MapBlacklist
*TimeCachedBlacklist
func NewMapBlacklist() Blacklist
func NewTimeCachedBlacklist(expiry time.Duration) (Blacklist, error)
func WithBlacklist(b Blacklist) Option
type CacheEntry (struct)
func WithDiscoverConnector(connFactory BackoffConnectorFactory) DiscoverOpt
func WithDiscoveryOpts(opts ...discovery.Option) DiscoverOpt
func WithDiscovery(d discovery.Discovery, opts ...DiscoverOpt) Option
EventTracer is a generic event tracer interface.
This is a high level tracing interface which delivers tracing events, as defined by the protobuf
schema in pb/trace.proto.
( EventTracer) Trace(evt *pb.TraceEvent)
*JSONTracer
*PBTracer
*RemoteTracer
func WithEventTracer(tracer EventTracer) Option
type ExtendedPeerScoreInspectFn = (func)
(*FloodSubRouter) AcceptFrom(peer.ID) AcceptStatus
(*FloodSubRouter) AddPeer(p peer.ID, proto protocol.ID)
(*FloodSubRouter) Attach(p *PubSub)
(*FloodSubRouter) EnoughPeers(topic string, suggested int) bool
(*FloodSubRouter) HandleRPC(rpc *RPC)
(*FloodSubRouter) Join(topic string)
(*FloodSubRouter) Leave(topic string)
(*FloodSubRouter) PreValidation([]*Message)
(*FloodSubRouter) Protocols() []protocol.ID
(*FloodSubRouter) Publish(msg *Message)
(*FloodSubRouter) RemovePeer(p peer.ID)
*FloodSubRouter : PubSubRouter
GossipSubFeature is a feature discriminant enum
func GossipSubDefaultFeatures(feat GossipSubFeature, proto protocol.ID) bool
GossipSubFeatureTest is a feature test function; it takes a feature and a protocol ID and
should return true if the feature is supported by the protocol
GossipSubParams defines all the gossipsub specific parameters.
ConnectionTimeout controls the timeout for connection attempts.
Connectors controls the number of active connection attempts for peers obtained through PX.
D sets the optimal degree for a GossipSub topic mesh. For example, if D == 6,
each peer will want to have about six peers in their mesh for each topic they're subscribed to.
D should be set somewhere between Dlo and Dhi.
Dhi sets the upper bound on the number of peers we keep in a GossipSub topic mesh.
If we have more than Dhi peers, we will select some to prune from the mesh at the next heartbeat.
DirectConnectInitialDelay is the initial delay before opening connections to direct peers
DirectConnectTicks is the number of heartbeat ticks for attempting to reconnect direct peers
that are not currently connected.
Dlazy affects how many peers we will emit gossip to at each heartbeat.
We will send gossip to at least Dlazy peers outside our mesh. The actual
number may be more, depending on GossipFactor and how many peers we're
connected to.
Dlo sets the lower bound on the number of peers we keep in a GossipSub topic mesh.
If we have fewer than Dlo peers, we will attempt to graft some more into the mesh at
the next heartbeat.
Dout sets the quota for the number of outbound connections to maintain in a topic mesh.
When the mesh is pruned due to over subscription, we make sure that we have outbound connections
to at least Dout of the survivor peers. This prevents sybil attackers from overwhelming
our mesh with incoming connections.
Dout must be set below Dlo, and must not exceed D / 2.
Dscore affects how peers are selected when pruning a mesh due to over subscription.
At least Dscore of the retained peers will be high-scoring, while the remainder are
chosen randomly.
FanoutTTL controls how long we keep track of the fanout state. If it's been
FanoutTTL since we've published to a topic that we're not subscribed to,
we'll delete the fanout map for that topic.
GossipFactor affects how many peers we will emit gossip to at each heartbeat.
We will send gossip to GossipFactor * (total number of non-mesh peers), or
Dlazy, whichever is greater.
GossipRetransmission controls how many times we will allow a peer to request
the same message id through IWANT gossip before we start ignoring them. This is designed
to prevent peers from spamming us with requests and wasting our resources.
If a GRAFT comes before GraftFloodThreshold has elapsed since the last PRUNE,
then there is an extra score penalty applied to the peer through P7.
HeartbeatInitialDelay is the short delay before the heartbeat timer begins
after the router is initialized.
HeartbeatInterval controls the time between heartbeats.
HistoryGossip controls how many cached message ids we will advertise in
IHAVE gossip messages. When asked for our seen message IDs, we will return
only those from the most recent HistoryGossip heartbeats. The slack between
HistoryGossip and HistoryLength allows us to avoid advertising messages
that will be expired by the time they're requested.
HistoryGossip must be less than or equal to HistoryLength to
avoid a runtime panic.
HistoryLength controls the size of the message cache used for gossip.
The message cache will remember messages for HistoryLength heartbeats.
IDONTWANT is cleared when it's older than the TTL.
IDONTWANT is only sent for messages larger than the threshold. This should be greater than
D_high * the size of the message id. Otherwise, the attacker can do the amplication attack by sending
small messages while the receiver replies back with larger IDONTWANT messages.
Time to wait for a message requested through IWANT following an IHAVE advertisement.
If the message is not received within this window, a broken promise is declared and
the router may apply bahavioural penalties.
MaxIDontWantLength is the maximum number of messages to include in an IDONTWANT message. Also controls
the maximum number of IDONTWANT ids we will accept to protect against IDONTWANT floods. This value
should be adjusted if your system anticipates a larger amount than specified per heartbeat.
MaxIDontWantMessages is the maximum number of IDONTWANT messages to accept from a peer within a heartbeat.
MaxIHaveLength is the maximum number of messages to include in an IHAVE message.
Also controls the maximum number of IHAVE ids we will accept and request with IWANT from a
peer within a heartbeat, to protect from IHAVE floods. You should adjust this value from the
default if your system is pushing more than 5000 messages in HistoryGossip heartbeats;
with the defaults this is 1666 messages/s.
MaxIHaveMessages is the maximum number of IHAVE messages to accept from a peer within a heartbeat.
MaxPendingConnections sets the maximum number of pending connections for peers attempted through px.
OpportunisticGraftPeers is the number of peers to opportunistically graft.
OpportunisticGraftTicks is the number of heartbeat ticks for attempting to improve the mesh
with opportunistic grafting. Every OpportunisticGraftTicks we will attempt to select some
high-scoring mesh peers to replace lower-scoring ones, if the median score of our mesh peers falls
below a threshold (see https://godoc.org/github.com/libp2p/go-libp2p-pubsub#PeerScoreThresholds).
PruneBackoff controls the backoff time for pruned peers. This is how long
a peer must wait before attempting to graft into our mesh again after being pruned.
When pruning a peer, we send them our value of PruneBackoff so they know
the minimum time to wait. Peers running older versions may not send a backoff time,
so if we receive a prune message without one, we will wait at least PruneBackoff
before attempting to re-graft.
PrunePeers controls the number of peers to include in prune Peer eXchange.
When we prune a peer that's eligible for PX (has a good score, etc), we will try to
send them signed peer records for up to PrunePeers other peers that we
know of.
SlowHeartbeatWarning is the duration threshold for heartbeat processing before emitting
a warning; this would be indicative of an overloaded peer.
UnsubscribeBackoff controls the backoff time to use when unsuscribing
from a topic. A peer should not resubscribe to this topic before this
duration.
func DefaultGossipSubParams() GossipSubParams
func WithGossipSubParams(cfg GossipSubParams) Option
GossipSubRouter is a router that implements the gossipsub protocol.
For each topic we have joined, we maintain an overlay through which
messages flow; this is the mesh map.
For each topic we publish to without joining, we maintain a list of peers
to use for injecting our messages in the overlay with stable routes; this
is the fanout map. Fanout peer lists are expired if we don't publish any
messages to their topic for GossipSubFanoutTTL.
(*GossipSubRouter) AcceptFrom(p peer.ID) AcceptStatus
(*GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID)
(*GossipSubRouter) Attach(p *PubSub)
(*GossipSubRouter) EnoughPeers(topic string, suggested int) bool
(*GossipSubRouter) HandleRPC(rpc *RPC)
(*GossipSubRouter) Join(topic string)
(*GossipSubRouter) Leave(topic string)
PreValidation sends the IDONTWANT control messages to all the mesh
peers. They need to be sent right before the validation because they
should be seen by the peers as soon as possible.
(*GossipSubRouter) Protocols() []protocol.ID
(*GossipSubRouter) Publish(msg *Message)
(*GossipSubRouter) RemovePeer(p peer.ID)
SendControl dispatches the given set of control messages to the given peer.
The control messages are sent as a single RPC, with the given (optional) messages.
Args:
p: the peer to send the control messages to.
ctl: the control messages to send.
msgs: the messages to send in the same RPC (optional).
The control messages are piggybacked on the messages.
Returns:
nothing.
WithDefaultTagTracer returns the tag tracer of the GossipSubRouter as a PubSub option.
This is useful for cases where the GossipSubRouter is instantiated externally, and is
injected into the GossipSub constructor as a dependency. This allows the tag tracer to be
also injected into the GossipSub constructor as a PubSub option dependency.
*GossipSubRouter : PubSubRouter
func DefaultGossipSubRouter(h host.Host) *GossipSubRouter
JSONTracer is a tracer that writes events to a file, encoded in ndjson.
(*JSONTracer) Close()
(*JSONTracer) Trace(evt *pb.TraceEvent)
*JSONTracer : EventTracer
func NewJSONTracer(file string) (*JSONTracer, error)
func OpenJSONTracer(file string, flags int, perm os.FileMode) (*JSONTracer, error)
MapBlacklist is a blacklist implementation using a perfect map
( MapBlacklist) Add(p peer.ID) bool
( MapBlacklist) Contains(p peer.ID) bool
MapBlacklist : Blacklist
ID string
Local bool
Message *pb.Message
Message.Data []byte
Message.From []byte
Message.Key []byte
Message.Seqno []byte
Message.Signature []byte
Message.Topic *string
Message.XXX_NoUnkeyedLiteral struct{}
Message.XXX_sizecache int32
Message.XXX_unrecognized []byte
ReceivedFrom peer.ID
ValidatorData interface{}
( Message) Descriptor() ([]byte, []int)
( Message) GetData() []byte
(*Message) GetFrom() peer.ID
( Message) GetKey() []byte
( Message) GetSeqno() []byte
( Message) GetSignature() []byte
( Message) GetTopic() string
( Message) Marshal() (dAtA []byte, err error)
( Message) MarshalTo(dAtA []byte) (int, error)
( Message) MarshalToSizedBuffer(dAtA []byte) (int, error)
( Message) ProtoMessage()
( Message) Reset()
( Message) Size() (n int)
( Message) String() string
( Message) Unmarshal(dAtA []byte) error
( Message) XXX_DiscardUnknown()
( Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
( Message) XXX_Merge(src proto.Message)
( Message) XXX_Size() int
( Message) XXX_Unmarshal(b []byte) error
Message : github.com/gogo/protobuf/proto.Marshaler
Message : github.com/gogo/protobuf/proto.Message
Message : github.com/gogo/protobuf/proto.Sizer
Message : github.com/gogo/protobuf/proto.Unmarshaler
Message : github.com/golang/protobuf/proto.Marshaler
Message : github.com/golang/protobuf/proto.Unmarshaler
Message : github.com/pion/rtcp.PacketStatusChunk
Message : expvar.Var
Message : fmt.Stringer
Message : google.golang.org/protobuf/runtime/protoiface.MessageV1
func (*MessageCache).Get(mid string) (*Message, bool)
func (*MessageCache).GetForPeer(mid string, p peer.ID) (*Message, int, bool)
func (*Subscription).Next(ctx context.Context) (*Message, error)
func (*FloodSubRouter).PreValidation([]*Message)
func (*FloodSubRouter).Publish(msg *Message)
func (*GossipSubRouter).PreValidation(msgs []*Message)
func (*GossipSubRouter).Publish(msg *Message)
func (*MessageCache).Put(msg *Message)
func PubSubRouter.PreValidation([]*Message)
func PubSubRouter.Publish(*Message)
func (*RandomSubRouter).PreValidation([]*Message)
func (*RandomSubRouter).Publish(msg *Message)
func RawTracer.DeliverMessage(msg *Message)
func RawTracer.DuplicateMessage(msg *Message)
func RawTracer.RejectMessage(msg *Message, reason string)
func RawTracer.UndeliverableMessage(msg *Message)
func RawTracer.ValidateMessage(msg *Message)
(*MessageCache) Get(mid string) (*Message, bool)
(*MessageCache) GetForPeer(mid string, p peer.ID) (*Message, int, bool)
(*MessageCache) GetGossipIDs(topic string) []string
(*MessageCache) Put(msg *Message)
(*MessageCache) SetMsgIdFn(msgID func(*Message) string)
(*MessageCache) Shift()
func NewMessageCache(gossip, history int) *MessageCache
MessageSignaturePolicy describes if signatures are produced, expected, and/or verified.
func WithMessageSignaturePolicy(policy MessageSignaturePolicy) Option
const LaxSign
const StrictNoSign
const StrictSign
MsgIdFunction returns a unique ID for the passed Message, and PubSub can be customized to use any
implementation of this function by configuring it with the Option from WithMessageIdFn.
func WithMessageIdFn(fn MsgIdFunction) Option
func WithTopicMessageIdFn(msgId MsgIdFunction) TopicOpt
func WithAppSpecificRpcInspector(inspector func(peer.ID, *RPC) error) Option
func WithBlacklist(b Blacklist) Option
func WithDefaultValidator(val interface{}, opts ...ValidatorOpt) Option
func WithDirectConnectTicks(t uint64) Option
func WithDirectPeers(pis []peer.AddrInfo) Option
func WithDiscovery(d discovery.Discovery, opts ...DiscoverOpt) Option
func WithEventTracer(tracer EventTracer) Option
func WithFloodPublish(floodPublish bool) Option
func WithGossipSubParams(cfg GossipSubParams) Option
func WithGossipSubProtocols(protos []protocol.ID, feature GossipSubFeatureTest) Option
func WithMaxMessageSize(maxMessageSize int) Option
func WithMessageAuthor(author peer.ID) Option
func WithMessageIdFn(fn MsgIdFunction) Option
func WithMessageSignaturePolicy(policy MessageSignaturePolicy) Option
func WithMessageSigning(enabled bool) Option
func WithNoAuthor() Option
func WithPeerExchange(doPX bool) Option
func WithPeerFilter(filter PeerFilter) Option
func WithPeerGater(params *PeerGaterParams) Option
func WithPeerOutboundQueueSize(size int) Option
func WithPeerScore(params *PeerScoreParams, thresholds *PeerScoreThresholds) Option
func WithPeerScoreInspect(inspect interface{}, period time.Duration) Option
func WithProtocolMatchFn(m ProtocolMatchFn) Option
func WithRawTracer(tracer RawTracer) Option
func WithSeenMessagesStrategy(strategy timecache.Strategy) Option
func WithSeenMessagesTTL(ttl time.Duration) Option
func WithStrictSignatureVerification(required bool) Option
func WithSubscriptionFilter(subFilter SubscriptionFilter) Option
func WithValidateQueueSize(n int) Option
func WithValidateThrottle(n int) Option
func WithValidateWorkers(n int) Option
func (*GossipSubRouter).WithDefaultTagTracer() Option
func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error)
func NewFloodsubWithProtocols(ctx context.Context, h host.Host, ps []protocol.ID, opts ...Option) (*PubSub, error)
func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error)
func NewGossipSubWithRouter(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option) (*PubSub, error)
func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option) (*PubSub, error)
func NewRandomSub(ctx context.Context, h host.Host, size int, opts ...Option) (*PubSub, error)
PBTracer is a tracer that writes events to a file, as delimited protobufs.
(*PBTracer) Close()
(*PBTracer) Trace(evt *pb.TraceEvent)
*PBTracer : EventTracer
func NewPBTracer(file string) (*PBTracer, error)
func OpenPBTracer(file string, flags int, perm os.FileMode) (*PBTracer, error)
Peer peer.ID
Type EventType
func (*TopicEventHandler).NextPeerEvent(ctx context.Context) (PeerEvent, error)
PeerFilter is used to filter pubsub peers. It should return true for peers that are accepted for
a given topic. PubSub can be customized to use any implementation of this function by configuring
it with the Option from WithPeerFilter.
func WithPeerFilter(filter PeerFilter) Option
PeerGaterParams groups together parameters that control the operation of the peer gater
decay interval
counter zeroing threshold
weight of duplicate message deliveries
(linear) decay parameter for gater counters
// global counter decay
weight of ignored messages
quiet interval before turning off the gater; if there are no validation throttle events
for this interval, the gater turns off
weight of rejected messages
how long to retain stats
// per IP counter decay
when the ratio of throttled/validated messages exceeds this threshold, the gater turns on
priority topic delivery weights
WithTopicDeliveryWeights is a fluid setter for the priority topic delivery weights
func DefaultPeerGaterParams() *PeerGaterParams
func NewPeerGaterParams(threshold, globalDecay, sourceDecay float64) *PeerGaterParams
func (*PeerGaterParams).WithTopicDeliveryWeights(w map[string]float64) *PeerGaterParams
func WithPeerGater(params *PeerGaterParams) Option
PeerMetadataStore is an interface for storing and retrieving per peer metadata
Get retrieves the metadata associated with a peer;
It should return nil if there is no metadata associated with the peer and not an error.
Put sets the metadata associated with a peer.
func NewBasicSeqnoValidator(meta PeerMetadataStore) ValidatorEx
type PeerScoreInspectFn = (func)
P5: Application-specific peer scoring
AppSpecificWeight float64
P7: behavioural pattern penalties.
This parameter has an associated counter which tracks misbehaviour as detected by the
router. The router currently applies penalties for the following behaviors:
- attempting to re-graft before the prune backoff time has elapsed.
- not following up in IWANT requests for messages advertised with IHAVE.
The value of the parameter is the square of the counter over the threshold, which decays with
BehaviourPenaltyDecay.
The weight of the parameter MUST be negative (or zero to disable).
P7: behavioural pattern penalties.
This parameter has an associated counter which tracks misbehaviour as detected by the
router. The router currently applies penalties for the following behaviors:
- attempting to re-graft before the prune backoff time has elapsed.
- not following up in IWANT requests for messages advertised with IHAVE.
The value of the parameter is the square of the counter over the threshold, which decays with
BehaviourPenaltyDecay.
The weight of the parameter MUST be negative (or zero to disable).
P7: behavioural pattern penalties.
This parameter has an associated counter which tracks misbehaviour as detected by the
router. The router currently applies penalties for the following behaviors:
- attempting to re-graft before the prune backoff time has elapsed.
- not following up in IWANT requests for messages advertised with IHAVE.
The value of the parameter is the square of the counter over the threshold, which decays with
BehaviourPenaltyDecay.
The weight of the parameter MUST be negative (or zero to disable).
the decay interval for parameter counters.
counter value below which it is considered 0.
IPColocationFactorThreshold int
P6: IP-colocation factor.
The parameter has an associated counter which counts the number of peers with the same IP.
If the number of peers in the same IP exceeds IPColocationFactorThreshold, then the value
is the square of the difference, ie (PeersInSameIP - IPColocationThreshold)^2.
If the number of peers in the same IP is less than the threshold, then the value is 0.
The weight of the parameter MUST be negative, unless you want to disable for testing.
Note: In order to simulate many IPs in a managable manner when testing, you can set the weight to 0
thus disabling the IP colocation penalty.
IPColocationFactorWhitelist []*net.IPNet
time to remember counters for a disconnected peer.
time to remember a message delivery for. Default to global TimeCacheDuration if 0.
whether it is allowed to just set some params and not all of them.
Aggregate topic score cap; this limits the total contribution of topics towards a positive
score. It must be positive (or 0 for no cap).
Score parameters per topic.
func WithPeerScore(params *PeerScoreParams, thresholds *PeerScoreThresholds) Option
AppSpecificScore float64
BehaviourPenalty float64
IPColocationFactor float64
Score float64
Topics map[string]*TopicScoreSnapshot
AcceptPXThreshold is the score threshold below which PX will be ignored; this should be positive
and limited to scores attainable by bootstrappers and other trusted nodes.
GossipThreshold is the score threshold below which gossip propagation is suppressed;
should be negative.
GraylistThreshold is the score threshold below which message processing is suppressed altogether,
implementing an effective gray list according to peer score; should be negative and <= PublishThreshold.
OpportunisticGraftThreshold is the median mesh score threshold before triggering opportunistic
grafting; this should have a small positive value.
PublishThreshold is the score threshold below which we shouldn't publish when using flood
publishing (also applies to fanout and floodsub peers); should be negative and <= GossipThreshold.
whether it is allowed to just set some params and not all of them.
func WithPeerScore(params *PeerScoreParams, thresholds *PeerScoreThresholds) Option
type ProtocolMatchFn = (func)
ProvideKey is a function that provides a private key and its associated peer ID when publishing a new message
type PublishOptions (struct)
func WithLocalPublication(local bool) PubOpt
func WithReadiness(ready RouterReady) PubOpt
func WithSecretKeyAndPeerId(key crypto.PrivKey, pid peer.ID) PubOpt
func (*PubSub).Publish(topic string, data []byte, opts ...PubOpt) error
func (*Topic).Publish(ctx context.Context, data []byte, opts ...PubOpt) error
PubSub is the implementation of the pubsub system.
BlacklistPeer blacklists a peer; all messages from this peer will be unconditionally dropped.
GetTopics returns the topics this node is subscribed to.
Join joins the topic and returns a Topic handle. Only one Topic handle should exist per topic, and Join will error if
the Topic handle already exists.
ListPeers returns a list of peers we are connected to in the given topic.
Publish publishes data to the given topic.
Deprecated: use pubsub.Join() and topic.Publish() instead
RegisterTopicValidator registers a validator for topic.
By default validators are asynchronous, which means they will run in a separate goroutine.
The number of active goroutines is controlled by global and per topic validator
throttles; if it exceeds the throttle threshold, messages will be dropped.
Subscribe returns a new Subscription for the given topic.
Note that subscription is not an instantaneous operation. It may take some time
before the subscription is processed by the pubsub main loop and propagated to our peers.
Deprecated: use pubsub.Join() and topic.Subscribe() instead
UnregisterTopicValidator removes a validator from a topic.
Returns an error if there was no validator registered with the topic.
func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error)
func NewFloodsubWithProtocols(ctx context.Context, h host.Host, ps []protocol.ID, opts ...Option) (*PubSub, error)
func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error)
func NewGossipSubWithRouter(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option) (*PubSub, error)
func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option) (*PubSub, error)
func NewRandomSub(ctx context.Context, h host.Host, size int, opts ...Option) (*PubSub, error)
func (*FloodSubRouter).Attach(p *PubSub)
func (*GossipSubRouter).Attach(p *PubSub)
func PubSubRouter.Attach(*PubSub)
func (*RandomSubRouter).Attach(p *PubSub)
PubSubRouter is the message router component of PubSub.
AcceptFrom is invoked on any RPC envelope before pushing it to the validation pipeline
or processing control information.
Allows routers with internal scoring to vet peers before committing any processing resources
to the message and implement an effective graylist and react to validation queue overload.
AddPeer notifies the router that a new peer has been connected.
Attach is invoked by the PubSub constructor to attach the router to a
freshly initialized PubSub instance.
EnoughPeers returns whether the router needs more peers before it's ready to publish new records.
Suggested (if greater than 0) is a suggested number of peers that the router should need.
HandleRPC is invoked to process control messages in the RPC envelope.
It is invoked after subscriptions and payload messages have been processed.
Join notifies the router that we want to receive and forward messages in a topic.
It is invoked after the subscription announcement.
Leave notifies the router that we are no longer interested in a topic.
It is invoked after the unsubscription announcement.
PreValidation is invoked on messages in the RPC envelope right before pushing it to
the validation pipeline
Protocols returns the list of protocols supported by the router.
Publish is invoked to forward a new message that has been validated.
RemovePeer notifies the router that a peer has been disconnected.
*FloodSubRouter
*GossipSubRouter
*RandomSubRouter
func NewGossipSubWithRouter(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option) (*PubSub, error)
func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option) (*PubSub, error)
RandomSubRouter is a router that implements a random propagation strategy.
For each message, it selects the square root of the network size peers, with a min of RandomSubD,
and forwards the message to them.
(*RandomSubRouter) AcceptFrom(peer.ID) AcceptStatus
(*RandomSubRouter) AddPeer(p peer.ID, proto protocol.ID)
(*RandomSubRouter) Attach(p *PubSub)
(*RandomSubRouter) EnoughPeers(topic string, suggested int) bool
(*RandomSubRouter) HandleRPC(rpc *RPC)
(*RandomSubRouter) Join(topic string)
(*RandomSubRouter) Leave(topic string)
(*RandomSubRouter) PreValidation([]*Message)
(*RandomSubRouter) Protocols() []protocol.ID
(*RandomSubRouter) Publish(msg *Message)
(*RandomSubRouter) RemovePeer(p peer.ID)
*RandomSubRouter : PubSubRouter
RawTracer is a low level tracing interface that allows an application to trace the internal
operation of the pubsub subsystem.
Note that the tracers are invoked synchronously, which means that application tracers must
take care to not block or modify arguments.
Warning: this interface is not fixed, we may be adding new methods as necessitated by the system
in the future.
AddPeer is invoked when a new peer is added.
DeliverMessage is invoked when a message is delivered
DropRPC is invoked when an outbound RPC is dropped, typically because of a queue full.
DuplicateMessage is invoked when a duplicate message is dropped.
Graft is invoked when a new peer is grafted on the mesh (gossipsub)
Join is invoked when a new topic is joined
Leave is invoked when a topic is abandoned
Prune is invoked when a peer is pruned from the message (gossipsub)
RecvRPC is invoked when an incoming RPC is received.
RejectMessage is invoked when a message is Rejected or Ignored.
The reason argument can be one of the named strings Reject*.
RemovePeer is invoked when a peer is removed.
SendRPC is invoked when a RPC is sent.
ThrottlePeer is invoked when a peer is throttled by the peer gater.
UndeliverableMessage is invoked when the consumer of Subscribe is not reading messages fast enough and
the pressure release mechanism trigger, dropping messages.
ValidateMessage is invoked when a message first enters the validation pipeline.
func WithRawTracer(tracer RawTracer) Option
RemoteTracer is a tracer that sends trace events to a remote peer
(*RemoteTracer) Close()
(*RemoteTracer) Trace(evt *pb.TraceEvent)
*RemoteTracer : EventTracer
func NewRemoteTracer(ctx context.Context, host host.Host, pi peer.AddrInfo) (*RemoteTracer, error)
RouterReady is a function that decides if a router is ready to publish
func MinTopicSize(size int) RouterReady
func WithReadiness(ready RouterReady) PubOpt
RPC pb.RPC
RPC.Control *pubsub_pb.ControlMessage
RPC.Publish []*pubsub_pb.Message
RPC.Subscriptions []*pubsub_pb.RPC_SubOpts
RPC.XXX_NoUnkeyedLiteral struct{}
RPC.XXX_sizecache int32
RPC.XXX_unrecognized []byte
(*RPC) Descriptor() ([]byte, []int)
(*RPC) GetControl() *pubsub_pb.ControlMessage
(*RPC) GetPublish() []*pubsub_pb.Message
(*RPC) GetSubscriptions() []*pubsub_pb.RPC_SubOpts
(*RPC) Marshal() (dAtA []byte, err error)
(*RPC) MarshalTo(dAtA []byte) (int, error)
(*RPC) MarshalToSizedBuffer(dAtA []byte) (int, error)
(*RPC) ProtoMessage()
(*RPC) Reset()
(*RPC) Size() (n int)
(*RPC) String() string
(*RPC) Unmarshal(dAtA []byte) error
(*RPC) XXX_DiscardUnknown()
(*RPC) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
(*RPC) XXX_Merge(src proto.Message)
(*RPC) XXX_Size() int
(*RPC) XXX_Unmarshal(b []byte) error
*RPC : github.com/gogo/protobuf/proto.Marshaler
*RPC : github.com/gogo/protobuf/proto.Message
*RPC : github.com/gogo/protobuf/proto.Sizer
*RPC : github.com/gogo/protobuf/proto.Unmarshaler
*RPC : github.com/golang/protobuf/proto.Marshaler
*RPC : github.com/golang/protobuf/proto.Unmarshaler
*RPC : github.com/pion/rtcp.PacketStatusChunk
*RPC : expvar.Var
*RPC : fmt.Stringer
*RPC : google.golang.org/protobuf/runtime/protoiface.MessageV1
func (*FloodSubRouter).HandleRPC(rpc *RPC)
func (*GossipSubRouter).HandleRPC(rpc *RPC)
func PubSubRouter.HandleRPC(*RPC)
func (*RandomSubRouter).HandleRPC(rpc *RPC)
func RawTracer.DropRPC(rpc *RPC, p peer.ID)
func RawTracer.RecvRPC(rpc *RPC)
func RawTracer.SendRPC(rpc *RPC, p peer.ID)
func WithBufferSize(size int) SubOpt
func (*PubSub).Subscribe(topic string, opts ...SubOpt) (*Subscription, error)
func (*Topic).Subscribe(opts ...SubOpt) (*Subscription, error)
Subscription handles the details of a particular Topic subscription.
There may be many subscriptions for a given Topic.
Cancel closes the subscription. If this is the last active subscription then pubsub will send an unsubscribe
announcement to the network.
Next returns the next message in our subscription
Topic returns the topic string associated with the Subscription
func (*PubSub).Subscribe(topic string, opts ...SubOpt) (*Subscription, error)
func (*Topic).Subscribe(opts ...SubOpt) (*Subscription, error)
SubscriptionFilter is a function that tells us whether we are interested in allowing and tracking
subscriptions for a given topic.
The filter is consulted whenever a subscription notification is received by another peer; if the
filter returns false, then the notification is ignored.
The filter is also consulted when joining topics; if the filter returns false, then the Join
operation will result in an error.
CanSubscribe returns true if the topic is of interest and we can subscribe to it
FilterIncomingSubscriptions is invoked for all RPCs containing subscription notifications.
It should filter only the subscriptions of interest and my return an error if (for instance)
there are too many subscriptions.
func NewAllowlistSubscriptionFilter(topics ...string) SubscriptionFilter
func NewRegexpSubscriptionFilter(rx *regexp.Regexp) SubscriptionFilter
func WrapLimitSubscriptionFilter(filter SubscriptionFilter, limit int) SubscriptionFilter
func WithSubscriptionFilter(subFilter SubscriptionFilter) Option
func WrapLimitSubscriptionFilter(filter SubscriptionFilter, limit int) SubscriptionFilter
TimeCachedBlacklist is a blacklist implementation using a time cache
Add returns a bool saying whether Add of peer was successful
(*TimeCachedBlacklist) Contains(p peer.ID) bool
*TimeCachedBlacklist : Blacklist
Topic is the handle for a pubsub topic
Close closes down the topic. Will return an error unless there are no active event handlers or subscriptions.
Does not error if the topic is already closed.
EventHandler creates a handle for topic specific events
Multiple event handlers may be created and will operate independently of each other
ListPeers returns a list of peers we are connected to in the given topic.
Publish publishes data to topic.
Relay enables message relaying for the topic and returns a reference
cancel function. Subsequent calls increase the reference counter.
To completely disable the relay, all references must be cancelled.
SetScoreParams sets the topic score parameters if the pubsub router supports peer
scoring
String returns the topic associated with t
Subscribe returns a new Subscription for the topic.
Note that subscription is not an instantaneous operation. It may take some time
before the subscription is processed by the pubsub main loop and propagated to our peers.
*Topic : github.com/prometheus/common/expfmt.Closer
*Topic : expvar.Var
*Topic : fmt.Stringer
*Topic : io.Closer
func (*PubSub).Join(topic string, opts ...TopicOpt) (*Topic, error)
TopicEventHandler is used to manage topic specific events. No Subscription is required to receive events.
Cancel closes the topic event handler
NextPeerEvent returns the next event regarding subscribed peers
Guarantees: Peer Join and Peer Leave events for a given peer will fire in order.
Unless a peer both Joins and Leaves before NextPeerEvent emits either event
all events will eventually be received from NextPeerEvent.
func (*Topic).EventHandler(opts ...TopicEventHandlerOpt) (*TopicEventHandler, error)
func (*Topic).EventHandler(opts ...TopicEventHandlerOpt) (*TopicEventHandler, error)
func WithTopicMessageIdFn(msgId MsgIdFunction) TopicOpt
func (*PubSub).Join(topic string, opts ...TopicOpt) (*Topic, error)
type TopicOptions (struct)
FirstMessageDeliveriesCap float64
P2: first message deliveries
This is the number of message deliveries in the topic.
The value of the parameter is a counter, decaying with FirstMessageDeliveriesDecay, and capped
by FirstMessageDeliveriesCap.
The weight of the parameter MUST be positive (or zero to disable).
P2: first message deliveries
This is the number of message deliveries in the topic.
The value of the parameter is a counter, decaying with FirstMessageDeliveriesDecay, and capped
by FirstMessageDeliveriesCap.
The weight of the parameter MUST be positive (or zero to disable).
P4: invalid messages
This is the number of invalid messages in the topic.
The value of the parameter is the square of the counter, decaying with
InvalidMessageDeliveriesDecay.
The weight of the parameter MUST be negative (or zero to disable).
P4: invalid messages
This is the number of invalid messages in the topic.
The value of the parameter is the square of the counter, decaying with
InvalidMessageDeliveriesDecay.
The weight of the parameter MUST be negative (or zero to disable).
P3b: sticky mesh propagation failures
This is a sticky penalty that applies when a peer gets pruned from the mesh with an active
mesh message delivery penalty.
The weight of the parameter MUST be negative (or zero to disable)
P3b: sticky mesh propagation failures
This is a sticky penalty that applies when a peer gets pruned from the mesh with an active
mesh message delivery penalty.
The weight of the parameter MUST be negative (or zero to disable)
MeshMessageDeliveriesActivation time.Duration
MeshMessageDeliveriesCap float64
P3: mesh message deliveries
This is the number of message deliveries in the mesh, within the MeshMessageDeliveriesWindow of
message validation; deliveries during validation also count and are retroactively applied
when validation succeeds.
This window accounts for the minimum time before a hostile mesh peer trying to game the score
could replay back a valid message we just sent them.
It effectively tracks first and near-first deliveries, i.e., a message seen from a mesh peer
before we have forwarded it to them.
The parameter has an associated counter, decaying with MeshMessageDeliveriesDecay.
If the counter exceeds the threshold, its value is 0.
If the counter is below the MeshMessageDeliveriesThreshold, the value is the square of
the deficit, ie (MessageDeliveriesThreshold - counter)^2
The penalty is only activated after MeshMessageDeliveriesActivation time in the mesh.
The weight of the parameter MUST be negative (or zero to disable).
MeshMessageDeliveriesThreshold float64
P3: mesh message deliveries
This is the number of message deliveries in the mesh, within the MeshMessageDeliveriesWindow of
message validation; deliveries during validation also count and are retroactively applied
when validation succeeds.
This window accounts for the minimum time before a hostile mesh peer trying to game the score
could replay back a valid message we just sent them.
It effectively tracks first and near-first deliveries, i.e., a message seen from a mesh peer
before we have forwarded it to them.
The parameter has an associated counter, decaying with MeshMessageDeliveriesDecay.
If the counter exceeds the threshold, its value is 0.
If the counter is below the MeshMessageDeliveriesThreshold, the value is the square of
the deficit, ie (MessageDeliveriesThreshold - counter)^2
The penalty is only activated after MeshMessageDeliveriesActivation time in the mesh.
The weight of the parameter MUST be negative (or zero to disable).
MeshMessageDeliveriesWindow time.Duration
whether it is allowed to just set some params and not all of them.
TimeInMeshCap float64
TimeInMeshQuantum time.Duration
P1: time in the mesh
This is the time the peer has been grafted in the mesh.
The value of the parameter is the time/TimeInMeshQuantum, capped by TimeInMeshCap.
The weight of the parameter MUST be positive (or zero to disable).
The weight of the topic.
func (*Topic).SetScoreParams(p *TopicScoreParams) error
FirstMessageDeliveries float64
InvalidMessageDeliveries float64
MeshMessageDeliveries float64
TimeInMesh time.Duration
ValidationError is an error that may be signalled from message publication when the message
fails validation
Reason string
( ValidationError) Error() string
ValidationError : error
ValidationResult represents the decision of an extended validator
const ValidationAccept
const ValidationIgnore
const ValidationReject
Validator is a function that validates a message with a binary decision: accept or reject.
ValidatorEx is an extended validation function that validates a message with an enumerated decision
func NewBasicSeqnoValidator(meta PeerMetadataStore) ValidatorEx
ValidatorOpt is an option for RegisterTopicValidator.
func WithValidatorConcurrency(n int) ValidatorOpt
func WithValidatorInline(inline bool) ValidatorOpt
func WithValidatorTimeout(timeout time.Duration) ValidatorOpt
func WithDefaultValidator(val interface{}, opts ...ValidatorOpt) Option
func (*PubSub).RegisterTopicValidator(topic string, val interface{}, opts ...ValidatorOpt) error
Package-Level Functions (total 70)
DefaultGossipSubParams returns the default gossip sub parameters
as a config.
DefaultGossipSubRouter returns a new GossipSubRouter with default parameters.
DefaultMsgIdFn returns a unique ID of the passed Message
DefaultPeerFilter accepts all peers on all topics
DefaultPeerGaterParams creates a new PeerGaterParams struct using default values
FilterSubscriptions filters (and deduplicates) a list of subscriptions.
filter should return true if a topic is of interest.
GossipSubDefaultFeatures is the feature test function for the default gossipsub protocols
MinTopicSize returns a function that checks if a router is ready for publishing based on the topic size.
The router ultimately decides the whether it is ready or not, the given size is just a suggestion. Note
that the topic size does not include the router in the count.
NewAllowlistSubscriptionFilter creates a subscription filter that only allows explicitly
specified topics for local subscriptions and incoming peer subscriptions.
NewBasicSeqnoValidator constructs a BasicSeqnoValidator using the givven PeerMetadataStore.
NewFloodSub returns a new PubSub object using the FloodSubRouter.
NewFloodsubWithProtocols returns a new floodsub-enabled PubSub objecting using the protocols specified in ps.
NewGossipSub returns a new PubSub object using the default GossipSubRouter as the router.
NewGossipSubWithRouter returns a new PubSub object using the given router.
NewJsonTracer creates a new JSONTracer writing traces to file.
NewMapBlacklist creates a new MapBlacklist
NewMessageCache creates a sliding window cache that remembers messages for as
long as `history` slots.
When queried for messages to advertise, the cache only returns messages in
the last `gossip` slots.
The `gossip` parameter must be smaller or equal to `history`, or this
function will panic.
The slack between `gossip` and `history` accounts for the reaction time
between when a message is advertised via IHAVE gossip, and the peer pulls it
via an IWANT command.
func NewPBTracer(file string) (*PBTracer, error)
NewPeerGaterParams creates a new PeerGaterParams struct, using the specified threshold and decay
parameters and default values for all other parameters.
NewPubSub returns a new PubSub management object.
NewRandomSub returns a new PubSub object using RandomSubRouter as the router.
NewRegexpSubscriptionFilter creates a subscription filter that only allows topics that
match a regular expression for local subscriptions and incoming peer subscriptions.
Warning: the user should take care to match start/end of string in the supplied regular
expression, otherwise the filter might match unwanted topics unexpectedly.
NewRemoteTracer constructs a RemoteTracer, tracing to the peer identified by pi
NewTimeCachedBlacklist creates a new TimeCachedBlacklist with the given expiry duration
OpenJSONTracer creates a new JSONTracer, with explicit control of OpenFile flags and permissions.
OpenPBTracer creates a new PBTracer, with explicit control of OpenFile flags and permissions.
ScoreParameterDecay computes the decay factor for a parameter, assuming the DecayInterval is 1s
and that the value decays to zero if it drops below 0.01
ScoreParameterDecayWithBase computes the decay factor for a parameter using base as the DecayInterval
WithAppSpecificRpcInspector sets a hook that inspect incomings RPCs prior to
processing them. The inspector is invoked on an accepted RPC just before it
is handled. If inspector's error is nil, the RPC is handled. Otherwise, it
is dropped.
WithBlacklist provides an implementation of the blacklist; the default is a
MapBlacklist
WithBufferSize is a Subscribe option to customize the size of the subscribe output buffer.
The default length is 32 but it can be configured to avoid dropping messages if the consumer is not reading fast
enough.
/ Options
WithDefaultValidator adds a validator that applies to all topics by default; it can be used
more than once and add multiple validators. Having a defult validator does not inhibit registering
a per topic validator.
WithDirectConnectTicks is a gossipsub router option that sets the number of
heartbeat ticks between attempting to reconnect direct peers that are not
currently connected. A "tick" is based on the heartbeat interval, which is
1s by default. The default value for direct connect ticks is 300.
WithDirectPeers is a gossipsub router option that specifies peers with direct
peering agreements. These peers are connected outside of the mesh, with all (valid)
message unconditionally forwarded to them. The router will maintain open connections
to these peers. Note that the peering agreement should be reciprocal with direct peers
symmetrically configured at both ends.
WithDiscoverConnector adds a custom connector that deals with how the discovery subsystem connects to peers
WithDiscovery provides a discovery mechanism used to bootstrap and provide peers into PubSub
WithDiscoveryOpts passes libp2p Discovery options into the PubSub discovery subsystem
WithEventTracer provides a tracer for the pubsub system
WithFloodPublish is a gossipsub router option that enables flood publishing.
When this is enabled, published messages are forwarded to all peers with score >=
to publishThreshold
WithGossipSubParams is a gossip sub router option that allows a custom
config to be set when instantiating the gossipsub router.
WithGossipSubProtocols is a gossipsub router option that configures a custom protocol list
and feature test function
WithLocalPublication returns a publishing option to notify in-process subscribers only.
It prevents message publication to mesh peers.
Useful in edge cases where the msg needs to be only delivered to the in-process subscribers,
e.g. not to spam the network with outdated msgs.
Should not be used specifically for in-process pubsubing.
WithMaxMessageSize sets the global maximum message size for pubsub wire
messages. The default value is 1MiB (DefaultMaxMessageSize).
Observe the following warnings when setting this option.
WARNING #1: Make sure to change the default protocol prefixes for floodsub
(FloodSubID) and gossipsub (GossipSubID). This avoids accidentally joining
the public default network, which uses the default max message size, and
therefore will cause messages to be dropped.
WARNING #2: Reducing the default max message limit is fine, if you are
certain that your application messages will not exceed the new limit.
However, be wary of increasing the limit, as pubsub networks are naturally
write-amplifying, i.e. for every message we receive, we send D copies of the
message to our peers. If those messages are large, the bandwidth requirements
will grow linearly. Note that propagation is sent on the uplink, which
traditionally is more constrained than the downlink. Instead, consider
out-of-band retrieval for large messages, by sending a CID (Content-ID) or
another type of locator, such that messages can be fetched on-demand, rather
than being pushed proactively. Under this design, you'd use the pubsub layer
as a signalling system, rather than a data delivery system.
WithMessageAuthor sets the author for outbound messages to the given peer ID
(defaults to the host's ID). If message signing is enabled, the private key
must be available in the host's peerstore.
WithMessageIdFn is an option to customize the way a message ID is computed for a pubsub message.
The default ID function is DefaultMsgIdFn (concatenate source and seq nr.),
but it can be customized to e.g. the hash of the message.
WithMessageSignaturePolicy sets the mode of operation for producing and verifying message signatures.
WithMessageSigning enables or disables message signing (enabled by default).
Deprecated: signature verification without message signing,
or message signing without verification, are not recommended.
WithNoAuthor omits the author and seq-number data of messages, and disables the use of signatures.
Not recommended to use with the default message ID function, see WithMessageIdFn.
WithPeerExchange is a gossipsub router option that enables Peer eXchange on PRUNE.
This should generally be enabled in bootstrappers and well connected/trusted nodes
used for bootstrapping.
WithPeerFilter is an option to set a filter for pubsub peers.
The default peer filter is DefaultPeerFilter (which always returns true), but it can be customized
to any custom implementation.
WithPeerGater is a gossipsub router option that enables reactive validation queue
management.
The Gater is activated if the ratio of throttled/validated messages exceeds the specified
threshold.
Once active, the Gater probabilistically throttles peers _before_ they enter the validation
queue, performing Random Early Drop.
The throttle decision is randomized, with the probability of allowing messages to enter the
validation queue controlled by the statistical observations of the performance of all peers
in the IP address of the gated peer.
The Gater deactivates if there is no validation throttlinc occurring for the specified quiet
interval.
WithPeerOutboundQueueSize is an option to set the buffer size for outbound messages to a peer
We start dropping messages to a peer if the outbound queue if full
WithPeerScore is a gossipsub router option that enables peer scoring.
WithPeerScoreInspect is a gossipsub router option that enables peer score debugging.
When this option is enabled, the supplied function will be invoked periodically to allow
the application to inspect or dump the scores for connected peers.
The supplied function can have one of two signatures:
- PeerScoreInspectFn, which takes a map of peer IDs to score.
- ExtendedPeerScoreInspectFn, which takes a map of peer IDs to
PeerScoreSnapshots and allows inspection of individual score
components for debugging peer scoring.
This option must be passed _after_ the WithPeerScore option.
WithProtocolMatchFn sets a custom matching function for protocol selection to
be used by the protocol handler on the Host's Mux. Should be combined with
WithGossipSubProtocols feature function for checking if certain protocol features
are supported
WithRawTracer adds a raw tracer to the pubsub system.
Multiple tracers can be added using multiple invocations of the option.
WithReadiness returns a publishing option for only publishing when the router is ready.
This option is not useful unless PubSub is also using WithDiscovery
WithSecretKeyAndPeerId returns a publishing option for providing a custom private key and its corresponding peer ID
This option is useful when we want to send messages from "virtual", never-connectable peers in the network
WithSeenMessagesStrategy configures which type of lookup/cleanup strategy is used by the seen messages cache
WithSeenMessagesTTL configures when a previously seen message ID can be forgotten about
WithStrictSignatureVerification is an option to enable or disable strict message signing.
When enabled (which is the default), unsigned messages will be discarded.
Deprecated: signature verification without message signing,
or message signing without verification, are not recommended.
WithSubscriptionFilter is a pubsub option that specifies a filter for subscriptions
in topics of interest.
WithTopicMessageIdFn sets custom MsgIdFunction for a Topic, enabling topics to have own msg id generation rules.
WithValidateQueueSize sets the buffer of validate queue. Defaults to 32.
When queue is full, validation is throttled and new messages are dropped.
WithValidateThrottle sets the upper bound on the number of active validation
goroutines across all topics. The default is 8192.
WithValidateWorkers sets the number of synchronous validation worker goroutines.
Defaults to NumCPU.
The synchronous validation workers perform signature validation, apply inline
user validators, and schedule asynchronous user validators.
You can adjust this parameter to devote less cpu time to synchronous validation.
WithValidatorConcurrency is an option that sets the topic validator throttle.
This controls the number of active validation goroutines for the topic; the default is 1024.
WithValidatorInline is an option that sets the validation disposition to synchronous:
it will be executed inline in validation front-end, without spawning a new goroutine.
This is suitable for simple or cpu-bound validators that do not block.
WithValidatorTimeout is an option that sets a timeout for an (asynchronous) topic validator.
By default there is no timeout in asynchronous validators.
WrapLimitSubscriptionFilter wraps a subscription filter with a hard limit in the number of
subscriptions allowed in an RPC message.
Package-Level Variables (total 60)
DiscoveryPollInitialDelay is how long the discovery system waits after it first starts before polling
DiscoveryPollInterval is approximately how long the discovery system waits in between checks for whether the
more peers are needed for any topic
ErrEmptyPeerID is returned if an empty peer ID was provided
ErrNilSignKey is returned if a nil private key was provided
var ErrQueueClosed error var ErrQueueFull error
ErrSubscriptionCancelled may be returned when a subscription Next() is called after the
subscription has been cancelled.
ErrTooManySubscriptions may be returned by a SubscriptionFilter to signal that there are too many
subscriptions to process.
ErrTopicClosed is returned if a Topic is utilized after it has been closed
Defines the default gossipsub parameters.
Defines the default gossipsub parameters.
GossipSubConnTagBumpMessageDelivery is the amount to add to the connection manager
tag that tracks message deliveries. Each time a peer is the first to deliver a
message within a topic, we "bump" a tag by this amount, up to a maximum
of GossipSubConnTagMessageDeliveryCap.
Note that the delivery tags decay over time, decreasing by GossipSubConnTagDecayAmount
at every GossipSubConnTagDecayInterval.
GossipSubConnTagDecayAmount is subtracted from decaying tag values at each decay interval.
GossipSubConnTagDecayInterval is the decay interval for decaying connection manager tags.
GossipSubConnTagMessageDeliveryCap is the maximum value for the connection manager tags that
track message deliveries.
Defines the default gossipsub parameters.
GossipSubDefaultProtocols is the default gossipsub router protocol list
Defines the default gossipsub parameters.
Defines the default gossipsub parameters.
Defines the default gossipsub parameters.
Defines the default gossipsub parameters.
Defines the default gossipsub parameters.
Defines the default gossipsub parameters.
Defines the default gossipsub parameters.
Defines the default gossipsub parameters.
Defines the default gossipsub parameters.
Defines the default gossipsub parameters.
Defines the default gossipsub parameters.
Defines the default gossipsub parameters.
Defines the default gossipsub parameters.
Defines the default gossipsub parameters.
Defines the default gossipsub parameters.
Defines the default gossipsub parameters.
Defines the default gossipsub parameters.
Defines the default gossipsub parameters.
Defines the default gossipsub parameters.
Defines the default gossipsub parameters.
Defines the default gossipsub parameters.
Defines the default gossipsub parameters.
Defines the default gossipsub parameters.
Defines the default gossipsub parameters.
Defines the default gossipsub parameters.
Defines the default gossipsub parameters.
Defines the default gossipsub parameters.
Defines the default gossipsub parameters.
var MinTraceBatchSize int var RandomSubD int
TimeCacheDuration specifies how long a message ID will be remembered as seen.
Use WithSeenMessagesTTL to configure this per pubsub instance, instead of overriding the global default.
TimeCacheStrategy specifies which type of lookup/cleanup strategy is used by the seen messages cache.
Use WithSeenMessagesStrategy to configure this per pubsub instance, instead of overriding the global default.
var TraceBufferSize int // 64K ought to be enough for everyone; famous last words.
Package-Level Constants (total 44)
AcceptAll signals to accept the incoming RPC for full processing
AcceptControl signals to accept the incoming RPC only for control message processing by
the router. Included payload messages will _not_ be pushed to the validation queue.
AcceptNone signals to drop the incoming RPC
const BackoffCleanupInterval time.Duration = 60000000000 const BackoffMultiplier = 2 const DefaultDecayInterval time.Duration = 1000000000 const DefaultDecayToZero = 0.01
DefaultMaximumMessageSize is 1mb.
const FloodSubID protocol.ID = "/floodsub/1.0.0" const FloodSubTopicSearchSize = 5
Protocol supports IDONTWANT -- gossipsub-v1.2 compatible
Protocol supports basic GossipSub Mesh -- gossipsub-v1.0 compatible
Protocol supports Peer eXchange on prune -- gossipsub-v1.1 compatible
GossipSubID_v10 is the protocol ID for version 1.0.0 of the GossipSub protocol.
It is advertised along with GossipSubID_v11 and GossipSubID_v12 for backwards compatibility.
GossipSubID_v11 is the protocol ID for version 1.1.0 of the GossipSub protocol.
It is advertised along with GossipSubID_v12 for backwards compatibility.
See the spec for details about how v1.1.0 compares to v1.0.0:
https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md
GossipSubID_v12 is the protocol ID for version 1.2.0 of the GossipSub protocol.
See the spec for details about how v1.2.0 compares to v1.1.0:
https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.2.md
LaxNoSign does not produce signatures and validates incoming signatures iff one is present
Deprecated: it is recommend to either strictly enable, or strictly disable, signatures.
LaxSign produces signatures and validates incoming signatures iff one is present
Deprecated: it is recommend to either strictly enable, or strictly disable, signatures.
const MaxBackoffAttempts = 4 const MaxBackoffDelay time.Duration = 10000000000 const MaxBackoffJitterCoff = 100 const MinBackoffDelay time.Duration = 100000000 const RandomSubID protocol.ID = "/randomsub/1.0.0"
rejection reasons
rejection reasons
rejection reasons
rejection reasons
rejection reasons
rejection reasons
rejection reasons
rejection reasons
rejection reasons
rejection reasons
rejection reasons
const RemoteTracerProtoID protocol.ID = "/libp2p/pubsub/tracer/1.0.0" const SignPrefix = "libp2p-pubsub:"
StrictNoSign does not produce signatures and drops and penalises incoming messages that carry one
StrictSign produces signatures and expects and verifies incoming signatures
const TimeToLive time.Duration = 600000000000
ValidationAccept is a validation decision that indicates a valid message that should be accepted and
delivered to the application and forwarded to the network.
ValidationIgnore is a validation decision that indicates a message that should be ignored: it will
be neither delivered to the application nor forwarded to the network. However, in contrast to
ValidationReject, the peer that forwarded the message must not be penalized by peer scoring routers.
ValidationReject is a validation decision that indicates an invalid message that should not be
delivered to the application or forwarded to the application. Furthermore the peer that forwarded
the message should be penalized by peer scoring routers.
![]() |
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64) Golds is a Go 101 project developed by Tapir Liu. PR and bug reports are welcome and can be submitted to the issue list. Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds. |