package pubsub

import (
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

	pb "github.com/libp2p/go-libp2p-pubsub/pb"
	"github.com/libp2p/go-libp2p-pubsub/timecache"

	"github.com/libp2p/go-libp2p/core/crypto"
	"github.com/libp2p/go-libp2p/core/discovery"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/core/protocol"

	logging "github.com/ipfs/go-log/v2"
)

// DefaultMaxMessageSize is 1 MiB.
const DefaultMaxMessageSize = 1 << 20

var (
	// TimeCacheDuration specifies how long a message ID will be remembered as seen.
	// Use WithSeenMessagesTTL to configure this per pubsub instance, instead of overriding the global default.
	TimeCacheDuration = 120 * time.Second

	// TimeCacheStrategy specifies which type of lookup/cleanup strategy is used by the seen messages cache.
	// Use WithSeenMessagesStrategy to configure this per pubsub instance, instead of overriding the global default.
	TimeCacheStrategy = timecache.Strategy_FirstSeen

	// ErrSubscriptionCancelled may be returned when a subscription Next() is called after the
	// subscription has been cancelled.
	ErrSubscriptionCancelled = errors.New("subscription cancelled")
)

var log = logging.Logger("pubsub")

type ProtocolMatchFn = func(protocol.ID) func(protocol.ID) bool

// PubSub is the implementation of the pubsub system.
type PubSub struct {
	// atomic counter for seqnos
	// NOTE: Must be declared at the top of the struct as we perform atomic
	// operations on this field.
	//
	// See: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
	counter uint64

	host host.Host

	rt PubSubRouter

	val *validation

	disc *discover

	tracer *pubsubTracer

	peerFilter PeerFilter

	// maxMessageSize is the maximum message size; it applies globally to all
	// topics.
	maxMessageSize int

	// size of the outbound message channel that we maintain for each peer
	peerOutboundQueueSize int

	// incoming messages from other peers
	incoming chan *RPC

	// addSub is a control channel for us to add and remove subscriptions
	addSub chan *addSubReq

	// addRelay is a control channel for us to add and remove relays
	addRelay chan *addRelayReq

	// rmRelay is a relay cancellation channel
	rmRelay chan string

	// get list of topics we are subscribed to
	getTopics chan *topicReq

	// get chan of peers we are connected to
	getPeers chan *listPeerReq

	// send subscription here to cancel it
	cancelCh chan *Subscription

	// addSub is a channel for us to add a topic
	addTopic chan *addTopicReq

	// removeTopic is a topic cancellation channel
	rmTopic chan *rmTopicReq

	// a notification channel for new peer connections accumulated
	newPeers       chan struct{}
	newPeersPrioLk sync.RWMutex
	newPeersMx     sync.Mutex
	newPeersPend   map[peer.ID]struct{}

	// a notification channel for new outgoing peer streams
	newPeerStream chan network.Stream

	// a notification channel for errors opening new peer streams
	newPeerError chan peer.ID

	// a notification channel for when our peers die
	peerDead       chan struct{}
	peerDeadPrioLk sync.RWMutex
	peerDeadMx     sync.Mutex
	peerDeadPend   map[peer.ID]struct{}
	// backoff for retrying new connections to dead peers
	deadPeerBackoff *backoff

	// The set of topics we are subscribed to
	mySubs map[string]map[*Subscription]struct{}

	// The set of topics we are relaying for
	myRelays map[string]int

	// The set of topics we are interested in
	myTopics map[string]*Topic

	// topics tracks which topics each of our peers are subscribed to
	topics map[string]map[peer.ID]struct{}

	// sendMsg handles messages that have been validated
	sendMsg chan *Message

	// addVal handles validator registration requests
	addVal chan *addValReq

	// rmVal handles validator unregistration requests
	rmVal chan *rmValReq

	// eval thunk in event loop
	eval chan func()

	// peer blacklist
	blacklist     Blacklist
	blacklistPeer chan peer.ID

	peers map[peer.ID]*rpcQueue

	inboundStreamsMx sync.Mutex
	inboundStreams   map[peer.ID]network.Stream

	seenMessages    timecache.TimeCache
	seenMsgTTL      time.Duration
	seenMsgStrategy timecache.Strategy

	// generator used to compute the ID for a message
	idGen *msgIDGenerator

	// key for signing messages; nil when signing is disabled
	signKey crypto.PrivKey
	// source ID for signed messages; corresponds to signKey, empty when signing is disabled.
	// If empty, the author and seq-nr are completely omitted from the messages.
	signID peer.ID
	// strict mode rejects all unsigned messages prior to validation
	signPolicy MessageSignaturePolicy

	// filter for tracking subscriptions in topics of interest; if nil, then we track all subscriptions
	subFilter SubscriptionFilter

	// protoMatchFunc is a matching function for protocol selection.
	protoMatchFunc ProtocolMatchFn

	ctx context.Context

	// appSpecificRpcInspector is an auxiliary hook that the application may set to inspect incoming
	// RPCs before they are processed. The inspector is invoked on an accepted RPC right before it is handled.
	// The inspector returns an error indicating whether the RPC should be processed:
	// if the error is nil, the RPC is processed as usual; if it is non-nil, the RPC is dropped.
	appSpecificRpcInspector func(peer.ID, *RPC) error
}

// PubSubRouter is the message router component of PubSub.
type PubSubRouter interface {
	// Protocols returns the list of protocols supported by the router.
	Protocols() []protocol.ID
	// Attach is invoked by the PubSub constructor to attach the router to a
	// freshly initialized PubSub instance.
	Attach(*PubSub)
	// AddPeer notifies the router that a new peer has been connected.
	AddPeer(peer.ID, protocol.ID)
	// RemovePeer notifies the router that a peer has been disconnected.
	RemovePeer(peer.ID)
	// EnoughPeers returns whether the router needs more peers before it's ready to publish new records.
	// Suggested (if greater than 0) is a suggested number of peers that the router should need.
	EnoughPeers(topic string, suggested int) bool
	// AcceptFrom is invoked on any RPC envelope before pushing it to the validation pipeline
	// or processing control information.
	// Allows routers with internal scoring to vet peers before committing any processing resources
	// to the message and implement an effective graylist and react to validation queue overload.
	AcceptFrom(peer.ID) AcceptStatus
	// PreValidation is invoked on the messages in the RPC envelope right before pushing them to
	// the validation pipeline.
	PreValidation([]*Message)
	// HandleRPC is invoked to process control messages in the RPC envelope.
	// It is invoked after subscriptions and payload messages have been processed.
	HandleRPC(*RPC)
	// Publish is invoked to forward a new message that has been validated.
	Publish(*Message)
	// Join notifies the router that we want to receive and forward messages in a topic.
	// It is invoked after the subscription announcement.
	Join(topic string)
	// Leave notifies the router that we are no longer interested in a topic.
	// It is invoked after the unsubscription announcement.
	Leave(topic string)
}

type AcceptStatus int

const (
	// AcceptNone signals to drop the incoming RPC
	AcceptNone AcceptStatus = iota
	// AcceptControl signals to accept the incoming RPC only for control message processing by
	// the router. Included payload messages will _not_ be pushed to the validation queue.
	AcceptControl
	// AcceptAll signals to accept the incoming RPC for full processing
	AcceptAll
)

type Message struct {
	*pb.Message
	ID            string
	ReceivedFrom  peer.ID
	ValidatorData interface{}
	Local         bool
}

func (m *Message) GetFrom() peer.ID {
	return peer.ID(m.Message.GetFrom())
}

type RPC struct {
	pb.RPC

	// unexported on purpose, not sending this over the wire
	from peer.ID
}

type Option func(*PubSub) error

// NewPubSub returns a new PubSub management object.
func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option) (*PubSub, error) {
	ps := &PubSub{
		host:                  h,
		ctx:                   ctx,
		rt:                    rt,
		val:                   newValidation(),
		peerFilter:            DefaultPeerFilter,
		disc:                  &discover{},
		maxMessageSize:        DefaultMaxMessageSize,
		peerOutboundQueueSize: 32,
		signID:                h.ID(),
		signKey:               nil,
		signPolicy:            StrictSign,
		incoming:              make(chan *RPC, 32),
		newPeers:              make(chan struct{}, 1),
		newPeersPend:          make(map[peer.ID]struct{}),
		newPeerStream:         make(chan network.Stream),
		newPeerError:          make(chan peer.ID),
		peerDead:              make(chan struct{}, 1),
		peerDeadPend:          make(map[peer.ID]struct{}),
		deadPeerBackoff:       newBackoff(ctx, 1000, BackoffCleanupInterval, MaxBackoffAttempts),
		cancelCh:              make(chan *Subscription),
		getPeers:              make(chan *listPeerReq),
		addSub:                make(chan *addSubReq),
		addRelay:              make(chan *addRelayReq),
		rmRelay:               make(chan string),
		addTopic:              make(chan *addTopicReq),
		rmTopic:               make(chan *rmTopicReq),
		getTopics:             make(chan *topicReq),
		sendMsg:               make(chan *Message, 32),
		addVal:                make(chan *addValReq),
		rmVal:                 make(chan *rmValReq),
		eval:                  make(chan func()),
		myTopics:              make(map[string]*Topic),
		mySubs:                make(map[string]map[*Subscription]struct{}),
		myRelays:              make(map[string]int),
		topics:                make(map[string]map[peer.ID]struct{}),
		peers:                 make(map[peer.ID]*rpcQueue),
		inboundStreams:        make(map[peer.ID]network.Stream),
		blacklist:             NewMapBlacklist(),
		blacklistPeer:         make(chan peer.ID),
		seenMsgTTL:            TimeCacheDuration,
		seenMsgStrategy:       TimeCacheStrategy,
		idGen:                 newMsgIdGenerator(),
		counter:               uint64(time.Now().UnixNano()),
	}

	for _, opt := range opts {
		err := opt(ps)
		if err != nil {
			return nil, err
		}
	}

	if ps.signPolicy.mustSign() {
		if ps.signID == "" {
			return nil, fmt.Errorf("strict signature usage enabled but message author was disabled")
		}
		ps.signKey = ps.host.Peerstore().PrivKey(ps.signID)
		if ps.signKey == nil {
			return nil, fmt.Errorf("can't sign for peer %s: no private key", ps.signID)
		}
	}

	ps.seenMessages = timecache.NewTimeCacheWithStrategy(ps.seenMsgStrategy, ps.seenMsgTTL)

	if err := ps.disc.Start(ps); err != nil {
		return nil, err
	}

	rt.Attach(ps)

	for _, id := range rt.Protocols() {
		if ps.protoMatchFunc != nil {
			h.SetStreamHandlerMatch(id, ps.protoMatchFunc(id), ps.handleNewStream)
		} else {
			h.SetStreamHandler(id, ps.handleNewStream)
		}
	}
	go ps.watchForNewPeers(ctx)

	ps.val.Start(ps)

	go ps.processLoop(ctx)

	return ps, nil
}

// MsgIdFunction returns a unique ID for the passed Message, and PubSub can be customized to use any
// implementation of this function by configuring it with the Option from WithMessageIdFn.
type MsgIdFunction func(pmsg *pb.Message) string

// WithMessageIdFn is an option to customize the way a message ID is computed for a pubsub message.
// The default ID function is DefaultMsgIdFn (concatenate source and seq nr.),
// but it can be customized to e.g. the hash of the message.
func WithMessageIdFn(fn MsgIdFunction) Option {
	return func(p *PubSub) error {
		p.idGen.Default = fn
		return nil
	}
}
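
// The following is an illustrative sketch (not part of the API): a
// content-addressed message ID function supplied via WithMessageIdFn. It
// assumes the caller imports crypto/sha256 and encoding/hex; the names
// hashMsgID, ctx and h are hypothetical.
//
//	hashMsgID := func(pmsg *pb.Message) string {
//		hash := sha256.Sum256(pmsg.Data)
//		return hex.EncodeToString(hash[:])
//	}
//	ps, err := pubsub.NewGossipSub(ctx, h, pubsub.WithMessageIdFn(hashMsgID))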

// PeerFilter is used to filter pubsub peers. It should return true for peers that are accepted for
// a given topic. PubSub can be customized to use any implementation of this function by configuring
// it with the Option from WithPeerFilter.
type PeerFilter func(pid peer.ID, topic string) bool

// WithPeerFilter is an option to set a filter for pubsub peers.
// The default peer filter is DefaultPeerFilter (which always returns true), but it can be customized
// to any custom implementation.
func WithPeerFilter(filter PeerFilter) Option {
	return func(p *PubSub) error {
		p.peerFilter = filter
		return nil
	}
}

// WithPeerOutboundQueueSize is an option to set the buffer size for outbound messages to a peer.
// We start dropping messages to a peer if the outbound queue is full.
func WithPeerOutboundQueueSize(size int) Option {
	return func(p *PubSub) error {
		if size <= 0 {
			return errors.New("outbound queue size must always be positive")
		}
		p.peerOutboundQueueSize = size
		return nil
	}
}

// WithMessageSignaturePolicy sets the mode of operation for producing and verifying message signatures.
func WithMessageSignaturePolicy(policy MessageSignaturePolicy) Option {
	return func(p *PubSub) error {
		p.signPolicy = policy
		return nil
	}
}

// WithMessageSigning enables or disables message signing (enabled by default).
// Deprecated: signature verification without message signing,
// or message signing without verification, are not recommended.
func WithMessageSigning(enabled bool) Option {
	return func(p *PubSub) error {
		if enabled {
			p.signPolicy |= msgSigning
		} else {
			p.signPolicy &^= msgSigning
		}
		return nil
	}
}

// WithMessageAuthor sets the author for outbound messages to the given peer ID
// (defaults to the host's ID). If message signing is enabled, the private key
// must be available in the host's peerstore.
func WithMessageAuthor(author peer.ID) Option {
	return func(p *PubSub) error {
		a := author
		if a == "" {
			a = p.host.ID()
		}
		p.signID = a
		return nil
	}
}

// WithNoAuthor omits the author and seq-number data of messages, and disables the use of signatures.
// Not recommended to use with the default message ID function, see WithMessageIdFn.
func WithNoAuthor() Option {
	return func(p *PubSub) error {
		p.signID = ""
		p.signPolicy &^= msgSigning
		return nil
	}
}

// WithStrictSignatureVerification is an option to enable or disable strict message signing.
// When enabled (which is the default), unsigned messages will be discarded.
// Deprecated: signature verification without message signing,
// or message signing without verification, are not recommended.
func WithStrictSignatureVerification(required bool) Option {
	return func(p *PubSub) error {
		if required {
			p.signPolicy |= msgVerification
		} else {
			p.signPolicy &^= msgVerification
		}
		return nil
	}
}

// WithBlacklist provides an implementation of the blacklist; the default is a
// MapBlacklist
func WithBlacklist(b Blacklist) Option {
	return func(p *PubSub) error {
		p.blacklist = b
		return nil
	}
}

// WithDiscovery provides a discovery mechanism used to bootstrap and provide peers into PubSub
func WithDiscovery(d discovery.Discovery, opts ...DiscoverOpt) Option {
	return func(p *PubSub) error {
		discoverOpts := defaultDiscoverOptions()
		for _, opt := range opts {
			err := opt(discoverOpts)
			if err != nil {
				return err
			}
		}

		p.disc.discovery = &pubSubDiscovery{Discovery: d, opts: discoverOpts.opts}
		p.disc.options = discoverOpts
		return nil
	}
}

// WithEventTracer provides a tracer for the pubsub system
func WithEventTracer(tracer EventTracer) Option {
	return func(p *PubSub) error {
		if p.tracer != nil {
			p.tracer.tracer = tracer
		} else {
			p.tracer = &pubsubTracer{tracer: tracer, pid: p.host.ID(), idGen: p.idGen}
		}
		return nil
	}
}

// WithRawTracer adds a raw tracer to the pubsub system.
// Multiple tracers can be added using multiple invocations of the option.
func WithRawTracer(tracer RawTracer) Option {
	return func(p *PubSub) error {
		if p.tracer != nil {
			p.tracer.raw = append(p.tracer.raw, tracer)
		} else {
			p.tracer = &pubsubTracer{raw: []RawTracer{tracer}, pid: p.host.ID(), idGen: p.idGen}
		}
		return nil
	}
}

// WithMaxMessageSize sets the global maximum message size for pubsub wire
// messages. The default value is 1MiB (DefaultMaxMessageSize).
//
// Observe the following warnings when setting this option.
//
// WARNING #1: Make sure to change the default protocol prefixes for floodsub
// (FloodSubID) and gossipsub (GossipSubID). This avoids accidentally joining
// the public default network, which uses the default max message size, and
// therefore will cause messages to be dropped.
//
// WARNING #2: Reducing the default max message limit is fine, if you are
// certain that your application messages will not exceed the new limit.
// However, be wary of increasing the limit, as pubsub networks are naturally
// write-amplifying, i.e. for every message we receive, we send D copies of the
// message to our peers. If those messages are large, the bandwidth requirements
// will grow linearly. Note that propagation is sent on the uplink, which
// traditionally is more constrained than the downlink. Instead, consider
// out-of-band retrieval for large messages, by sending a CID (Content-ID) or
// another type of locator, such that messages can be fetched on-demand, rather
// than being pushed proactively. Under this design, you'd use the pubsub layer
// as a signalling system, rather than a data delivery system.
func WithMaxMessageSize(maxMessageSize int) Option {
	return func(ps *PubSub) error {
		ps.maxMessageSize = maxMessageSize
		return nil
	}
}
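
// Illustrative sketch (hypothetical values): raising the limit for a private
// network, subject to the warnings above. It assumes the default protocol IDs
// have been changed so the node does not join the public default network.
//
//	ps, err := pubsub.NewGossipSub(ctx, h,
//		pubsub.WithMaxMessageSize(4<<20), // 4 MiB
//	)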

// WithProtocolMatchFn sets a custom matching function for protocol selection to
// be used by the protocol handler on the Host's Mux. Should be combined with
// WithGossipSubProtocols feature function for checking if certain protocol features
// are supported
func WithProtocolMatchFn(m ProtocolMatchFn) Option {
	return func(ps *PubSub) error {
		ps.protoMatchFunc = m
		return nil
	}
}

// WithSeenMessagesTTL configures when a previously seen message ID can be forgotten about
func WithSeenMessagesTTL(ttl time.Duration) Option {
	return func(ps *PubSub) error {
		ps.seenMsgTTL = ttl
		return nil
	}
}

// WithSeenMessagesStrategy configures which type of lookup/cleanup strategy is used by the seen messages cache
func WithSeenMessagesStrategy(strategy timecache.Strategy) Option {
	return func(ps *PubSub) error {
		ps.seenMsgStrategy = strategy
		return nil
	}
}

// WithAppSpecificRpcInspector sets a hook that inspects incoming RPCs before they are
// processed. The inspector is invoked on an accepted RPC just before it is handled.
// If the inspector returns a nil error, the RPC is handled; otherwise, it is dropped.
func WithAppSpecificRpcInspector(inspector func(peer.ID, *RPC) error) Option {
	return func(ps *PubSub) error {
		ps.appSpecificRpcInspector = inspector
		return nil
	}
}
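
// Illustrative sketch (hypothetical inspector): reject RPCs that carry more
// than an application-defined number of payload messages. Any non-nil error
// returned by the inspector causes the RPC to be dropped before processing.
//
//	inspector := func(from peer.ID, rpc *pubsub.RPC) error {
//		if len(rpc.GetPublish()) > 64 {
//			return fmt.Errorf("too many messages in RPC from %s", from)
//		}
//		return nil
//	}
//	ps, err := pubsub.NewGossipSub(ctx, h, pubsub.WithAppSpecificRpcInspector(inspector))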

// processLoop handles all inputs arriving on the channels
func (p *PubSub) processLoop(ctx context.Context) {
	defer func() {
		// Clean up go routines.
		for _, queue := range p.peers {
			queue.Close()
		}
		p.peers = nil
		p.topics = nil
		p.seenMessages.Done()
	}()

	for {
		select {
		case <-p.newPeers:
			p.handlePendingPeers()

		case s := <-p.newPeerStream:
			pid := s.Conn().RemotePeer()

			q, ok := p.peers[pid]
			if !ok {
				log.Warn("new stream for unknown peer: ", pid)
				s.Reset()
				continue
			}

			if p.blacklist.Contains(pid) {
				log.Warn("closing stream for blacklisted peer: ", pid)
				q.Close()
				delete(p.peers, pid)
				s.Reset()
				continue
			}

			p.rt.AddPeer(pid, s.Protocol())

		case pid := <-p.newPeerError:
			delete(p.peers, pid)

		case <-p.peerDead:
			p.handleDeadPeers()

		case treq := <-p.getTopics:
			var out []string
			for t := range p.mySubs {
				out = append(out, t)
			}
			treq.resp <- out
		case topic := <-p.addTopic:
			p.handleAddTopic(topic)
		case topic := <-p.rmTopic:
			p.handleRemoveTopic(topic)
		case sub := <-p.cancelCh:
			p.handleRemoveSubscription(sub)
		case sub := <-p.addSub:
			p.handleAddSubscription(sub)
		case relay := <-p.addRelay:
			p.handleAddRelay(relay)
		case topic := <-p.rmRelay:
			p.handleRemoveRelay(topic)
		case preq := <-p.getPeers:
			tmap, ok := p.topics[preq.topic]
			if preq.topic != "" && !ok {
				preq.resp <- nil
				continue
			}
			var peers []peer.ID
			for pid := range p.peers {
				if preq.topic != "" {
					_, ok := tmap[pid]
					if !ok {
						continue
					}
				}
				peers = append(peers, pid)
			}
			preq.resp <- peers
		case rpc := <-p.incoming:
			p.handleIncomingRPC(rpc)

		case msg := <-p.sendMsg:
			p.publishMessage(msg)

		case req := <-p.addVal:
			p.val.AddValidator(req)

		case req := <-p.rmVal:
			p.val.RemoveValidator(req)

		case thunk := <-p.eval:
			thunk()

		case pid := <-p.blacklistPeer:
			log.Infof("Blacklisting peer %s", pid)
			p.blacklist.Add(pid)

			q, ok := p.peers[pid]
			if ok {
				q.Close()
				delete(p.peers, pid)
				for t, tmap := range p.topics {
					if _, ok := tmap[pid]; ok {
						delete(tmap, pid)
						p.notifyLeave(t, pid)
					}
				}
				p.rt.RemovePeer(pid)
			}

		case <-ctx.Done():
			log.Info("pubsub processloop shutting down")
			return
		}
	}
}

func (p *PubSub) handlePendingPeers() {
	p.newPeersPrioLk.Lock()

	if len(p.newPeersPend) == 0 {
		p.newPeersPrioLk.Unlock()
		return
	}

	newPeers := p.newPeersPend
	p.newPeersPend = make(map[peer.ID]struct{})
	p.newPeersPrioLk.Unlock()

	for pid := range newPeers {
		// Make sure we have a non-limited connection. We do this late because we may have
		// disconnected in the meantime.
		if p.host.Network().Connectedness(pid) != network.Connected {
			continue
		}

		if _, ok := p.peers[pid]; ok {
			log.Debug("already have connection to peer: ", pid)
			continue
		}

		if p.blacklist.Contains(pid) {
			log.Warn("ignoring connection from blacklisted peer: ", pid)
			continue
		}

		messages := newRpcQueue(p.peerOutboundQueueSize)
		messages.Push(p.getHelloPacket(), true)
		go p.handleNewPeer(p.ctx, pid, messages)
		p.peers[pid] = messages
	}
}

func (p *PubSub) handleDeadPeers() {
	p.peerDeadPrioLk.Lock()

	if len(p.peerDeadPend) == 0 {
		p.peerDeadPrioLk.Unlock()
		return
	}

	deadPeers := p.peerDeadPend
	p.peerDeadPend = make(map[peer.ID]struct{})
	p.peerDeadPrioLk.Unlock()

	for pid := range deadPeers {
		q, ok := p.peers[pid]
		if !ok {
			continue
		}

		q.Close()
		delete(p.peers, pid)

		for t, tmap := range p.topics {
			if _, ok := tmap[pid]; ok {
				delete(tmap, pid)
				p.notifyLeave(t, pid)
			}
		}

		p.rt.RemovePeer(pid)

		if p.host.Network().Connectedness(pid) == network.Connected {
			backoffDelay, err := p.deadPeerBackoff.updateAndGet(pid)
			if err != nil {
				log.Debug(err)
				continue
			}

			// still connected, must be a duplicate connection being closed.
			// we respawn the writer as we need to ensure there is a stream active
			log.Debugf("peer declared dead but still connected; respawning writer: %s", pid)
			messages := newRpcQueue(p.peerOutboundQueueSize)
			messages.Push(p.getHelloPacket(), true)
			p.peers[pid] = messages
			go p.handleNewPeerWithBackoff(p.ctx, pid, backoffDelay, messages)
		}
	}
}

// handleAddTopic adds a tracker for a particular topic.
// Only called from processLoop.
func (p *PubSub) handleAddTopic(req *addTopicReq) {
	topic := req.topic
	topicID := topic.topic

	t, ok := p.myTopics[topicID]
	if ok {
		req.resp <- t
		return
	}

	p.myTopics[topicID] = topic
	req.resp <- topic
}

// handleRemoveTopic removes Topic tracker from bookkeeping.
// Only called from processLoop.
func (p *PubSub) handleRemoveTopic(req *rmTopicReq) {
	topic := p.myTopics[req.topic.topic]

	if topic == nil {
		req.resp <- nil
		return
	}

	if len(topic.evtHandlers) == 0 &&
		len(p.mySubs[req.topic.topic]) == 0 &&
		p.myRelays[req.topic.topic] == 0 {
		delete(p.myTopics, topic.topic)
		req.resp <- nil
		return
	}

	req.resp <- fmt.Errorf("cannot close topic: outstanding event handlers or subscriptions")
}

// handleRemoveSubscription removes Subscription sub from bookkeeping.
// If this was the last subscription and no more relays exist for a given topic,
// it will also announce that this node is not subscribing to this topic anymore.
// Only called from processLoop.
func (p *PubSub) handleRemoveSubscription(sub *Subscription) {
	subs := p.mySubs[sub.topic]

	if subs == nil {
		return
	}

	sub.err = ErrSubscriptionCancelled
	sub.close()
	delete(subs, sub)

	if len(subs) == 0 {
		delete(p.mySubs, sub.topic)

		// stop announcing only if there are no more subs and relays
		if p.myRelays[sub.topic] == 0 {
			p.disc.StopAdvertise(sub.topic)
			p.announce(sub.topic, false)
			p.rt.Leave(sub.topic)
		}
	}
}

// handleAddSubscription adds a Subscription for a particular topic. If it is
// the first subscription and no relays exist so far for the topic, it will
// announce that this node subscribes to the topic.
// Only called from processLoop.
func (p *PubSub) handleAddSubscription(req *addSubReq) {
	sub := req.sub
	subs := p.mySubs[sub.topic]

	// announce we want this topic if neither subs nor relays exist so far
	if len(subs) == 0 && p.myRelays[sub.topic] == 0 {
		p.disc.Advertise(sub.topic)
		p.announce(sub.topic, true)
		p.rt.Join(sub.topic)
	}

	// make new if not there
	if subs == nil {
		p.mySubs[sub.topic] = make(map[*Subscription]struct{})
	}

	sub.cancelCh = p.cancelCh

	p.mySubs[sub.topic][sub] = struct{}{}

	req.resp <- sub
}

// handleAddRelay adds a relay for a particular topic. If it is
// the first relay and no subscriptions exist so far for the topic, it will
// announce that this node relays for the topic.
// Only called from processLoop.
func (p *PubSub) handleAddRelay(req *addRelayReq) {
	topic := req.topic

	p.myRelays[topic]++

	// announce we want this topic if neither relays nor subs exist so far
	if p.myRelays[topic] == 1 && len(p.mySubs[topic]) == 0 {
		p.disc.Advertise(topic)
		p.announce(topic, true)
		p.rt.Join(topic)
	}

	// flag used to prevent calling the cancel function multiple times
	isCancelled := false

	relayCancelFunc := func() {
		if isCancelled {
			return
		}

		select {
		case p.rmRelay <- topic:
			isCancelled = true
		case <-p.ctx.Done():
		}
	}

	req.resp <- relayCancelFunc
}

// handleRemoveRelay removes one relay reference from bookkeeping.
// If this was the last relay reference and no more subscriptions exist
// for a given topic, it will also announce that this node is not relaying
// for this topic anymore.
// Only called from processLoop.
func (p *PubSub) handleRemoveRelay(topic string) {
	if p.myRelays[topic] == 0 {
		return
	}

	p.myRelays[topic]--

	if p.myRelays[topic] == 0 {
		delete(p.myRelays, topic)

		// stop announcing only if there are no more relays and subs
		if len(p.mySubs[topic]) == 0 {
			p.disc.StopAdvertise(topic)
			p.announce(topic, false)
			p.rt.Leave(topic)
		}
	}
}

// announce announces whether or not this node is interested in a given topic
// Only called from processLoop.
func (p *PubSub) announce(topic string, sub bool) {
	subopt := &pb.RPC_SubOpts{
		Topicid:   &topic,
		Subscribe: &sub,
	}

	out := rpcWithSubs(subopt)
	for pid, queue := range p.peers {
		err := queue.Push(out, false)
		if err != nil {
			log.Infof("Can't send announce message to peer %s: queue full; scheduling retry", pid)
			p.tracer.DropRPC(out, pid)
			go p.announceRetry(pid, topic, sub)
			continue
		}
		p.tracer.SendRPC(out, pid)
	}
}

func (p *PubSub) announceRetry(pid peer.ID, topic string, sub bool) {
	time.Sleep(time.Duration(1+rand.Intn(1000)) * time.Millisecond)

	retry := func() {
		_, okSubs := p.mySubs[topic]
		_, okRelays := p.myRelays[topic]

		ok := okSubs || okRelays

		if (ok && sub) || (!ok && !sub) {
			p.doAnnounceRetry(pid, topic, sub)
		}
	}

	select {
	case p.eval <- retry:
	case <-p.ctx.Done():
	}
}

func (p *PubSub) doAnnounceRetry(pid peer.ID, topic string, sub bool) {
	queue, ok := p.peers[pid]
	if !ok {
		return
	}

	subopt := &pb.RPC_SubOpts{
		Topicid:   &topic,
		Subscribe: &sub,
	}

	out := rpcWithSubs(subopt)
	err := queue.Push(out, false)
	if err != nil {
		log.Infof("Can't send announce message to peer %s: queue full; scheduling retry", pid)
		p.tracer.DropRPC(out, pid)
		go p.announceRetry(pid, topic, sub)
		return
	}
	p.tracer.SendRPC(out, pid)
}

// notifySubs sends a given message to all corresponding subscribers.
// Only called from processLoop.
func (p *PubSub) notifySubs(msg *Message) {
	topic := msg.GetTopic()
	subs := p.mySubs[topic]
	for f := range subs {
		select {
		case f.ch <- msg:
		default:
			p.tracer.UndeliverableMessage(msg)
			log.Infof("Can't deliver message to subscription for topic %s; subscriber too slow", topic)
		}
	}
}

// seenMessage returns whether we already saw this message before
func (p *PubSub) seenMessage(id string) bool {
	return p.seenMessages.Has(id)
}

// markSeen marks a message as seen such that seenMessage returns `true' for the given id
// returns true if the message was freshly marked
func (p *PubSub) markSeen(id string) bool {
	return p.seenMessages.Add(id)
}

// subscribedToMessage returns whether we are subscribed to one of the topics
// of a given message
func (p *PubSub) subscribedToMsg(msg *pb.Message) bool {
	if len(p.mySubs) == 0 {
		return false
	}

	topic := msg.GetTopic()
	_, ok := p.mySubs[topic]

	return ok
}

// canRelayMsg returns whether we are able to relay for one of the topics
// of a given message
func (p *PubSub) canRelayMsg(msg *pb.Message) bool {
	if len(p.myRelays) == 0 {
		return false
	}

	topic := msg.GetTopic()
	relays := p.myRelays[topic]

	return relays > 0
}

func (p *PubSub) notifyLeave(topic string, pid peer.ID) {
	if t, ok := p.myTopics[topic]; ok {
		t.sendNotification(PeerEvent{PeerLeave, pid})
	}
}

func (p *PubSub) handleIncomingRPC(rpc *RPC) {
	// pass the rpc through app specific validation (if any available).
	if p.appSpecificRpcInspector != nil {
		// check if the RPC is allowed by the external inspector
		if err := p.appSpecificRpcInspector(rpc.from, rpc); err != nil {
			log.Debugf("application-specific inspection failed, rejecting incoming rpc: %s", err)
			return // reject the RPC
		}
	}

	p.tracer.RecvRPC(rpc)

	subs := rpc.GetSubscriptions()
	if len(subs) != 0 && p.subFilter != nil {
		var err error
		subs, err = p.subFilter.FilterIncomingSubscriptions(rpc.from, subs)
		if err != nil {
			log.Debugf("subscription filter error: %s; ignoring RPC", err)
			return
		}
	}

	for _, subopt := range subs {
		t := subopt.GetTopicid()

		if subopt.GetSubscribe() {
			tmap, ok := p.topics[t]
			if !ok {
				tmap = make(map[peer.ID]struct{})
				p.topics[t] = tmap
			}

			if _, ok = tmap[rpc.from]; !ok {
				tmap[rpc.from] = struct{}{}
				if topic, ok := p.myTopics[t]; ok {
					topic.sendNotification(PeerEvent{PeerJoin, rpc.from})
				}
			}
		} else {
			tmap, ok := p.topics[t]
			if !ok {
				continue
			}

			if _, ok := tmap[rpc.from]; ok {
				delete(tmap, rpc.from)
				p.notifyLeave(t, rpc.from)
			}
		}
	}

	// ask the router to vet the peer before committing any processing resources
	switch p.rt.AcceptFrom(rpc.from) {
	case AcceptNone:
		log.Debugf("received RPC from router graylisted peer %s; dropping RPC", rpc.from)
		return

	case AcceptControl:
		if len(rpc.GetPublish()) > 0 {
			log.Debugf("peer %s was throttled by router; ignoring %d payload messages", rpc.from, len(rpc.GetPublish()))
		}
		p.tracer.ThrottlePeer(rpc.from)

	case AcceptAll:
		var toPush []*Message
		for _, pmsg := range rpc.GetPublish() {
			if !(p.subscribedToMsg(pmsg) || p.canRelayMsg(pmsg)) {
				log.Debug("received message in topic we didn't subscribe to; ignoring message")
				continue
			}

			msg := &Message{pmsg, "", rpc.from, nil, false}
			if p.shouldPush(msg) {
				toPush = append(toPush, msg)
			}
		}
		p.rt.PreValidation(toPush)
		for _, msg := range toPush {
			p.pushMsg(msg)
		}
	}

	p.rt.HandleRPC(rpc)
}

// DefaultMsgIdFn returns a unique ID of the passed Message
func DefaultMsgIdFn(pmsg *pb.Message) string {
	return string(pmsg.GetFrom()) + string(pmsg.GetSeqno())
}

// DefaultPeerFilter accepts all peers on all topics
func DefaultPeerFilter(pid peer.ID, topic string) bool {
	return true
}

// shouldPush filters a message before validating and pushing it
// It returns true if the message can be further validated and pushed
func (p *PubSub) shouldPush(msg *Message) bool {
	src := msg.ReceivedFrom
	// reject messages from blacklisted peers
	if p.blacklist.Contains(src) {
		log.Debugf("dropping message from blacklisted peer %s", src)
		p.tracer.RejectMessage(msg, RejectBlacklstedPeer)
		return false
	}

	// even if they are forwarded by good peers
	if p.blacklist.Contains(msg.GetFrom()) {
		log.Debugf("dropping message from blacklisted source %s", src)
		p.tracer.RejectMessage(msg, RejectBlacklistedSource)
		return false
	}

	err := p.checkSigningPolicy(msg)
	if err != nil {
		log.Debugf("dropping message from %s: %s", src, err)
		return false
	}

	// reject messages claiming to be from ourselves but not locally published
	self := p.host.ID()
	if peer.ID(msg.GetFrom()) == self && src != self {
		log.Debugf("dropping message claiming to be from self but forwarded from %s", src)
		p.tracer.RejectMessage(msg, RejectSelfOrigin)
		return false
	}

	// have we already seen and validated this message?
	id := p.idGen.ID(msg)
	if p.seenMessage(id) {
		p.tracer.DuplicateMessage(msg)
		return false
	}

	return true
}

// pushMsg pushes a message performing validation as necessary
func (p *PubSub) pushMsg(msg *Message) {
	src := msg.ReceivedFrom
	id := p.idGen.ID(msg)

	if !p.val.Push(src, msg) {
		return
	}

	if p.markSeen(id) {
		p.publishMessage(msg)
	}
}

func (p *PubSub) checkSigningPolicy(msg *Message) error {
	// reject unsigned messages when strict before we even process the id
	if p.signPolicy.mustVerify() {
		if p.signPolicy.mustSign() {
			if msg.Signature == nil {
				p.tracer.RejectMessage(msg, RejectMissingSignature)
				return ValidationError{Reason: RejectMissingSignature}
			}
			// Actual signature verification happens in the validation pipeline,
			// after checking if the message was already seen or not,
			// to avoid unnecessary signature verification processing-cost.
		} else {
			if msg.Signature != nil {
				p.tracer.RejectMessage(msg, RejectUnexpectedSignature)
				return ValidationError{Reason: RejectUnexpectedSignature}
			}
			// If we are expecting signed messages, and not authoring messages,
			// then do not accept seq numbers, from data, or key data.
			// The default msgID function still relies on Seqno and From,
			// but is not used if we are not authoring messages ourselves.
			if p.signID == "" {
				if msg.Seqno != nil || msg.From != nil || msg.Key != nil {
					p.tracer.RejectMessage(msg, RejectUnexpectedAuthInfo)
					return ValidationError{Reason: RejectUnexpectedAuthInfo}
				}
			}
		}
	}

	return nil
}

func (p *PubSub) publishMessage(msg *Message) {
	p.tracer.DeliverMessage(msg)
	p.notifySubs(msg)
	if !msg.Local {
		p.rt.Publish(msg)
	}
}

type addTopicReq struct {
	topic *Topic
	resp  chan *Topic
}

type rmTopicReq struct {
	topic *Topic
	resp  chan error
}

type TopicOptions struct{}

type TopicOpt func(t *Topic) error

// WithTopicMessageIdFn sets custom MsgIdFunction for a Topic, enabling topics to have own msg id generation rules.
func WithTopicMessageIdFn(msgId MsgIdFunction) TopicOpt {
	return func(t *Topic) error {
		t.p.idGen.Set(t.topic, msgId)
		return nil
	}
}

// Join joins the topic and returns a Topic handle. Only one Topic handle should exist per topic, and Join will error if
// the Topic handle already exists.
func (p *PubSub) Join(topic string, opts ...TopicOpt) (*Topic, error) {
	t, ok, err := p.tryJoin(topic, opts...)
	if err != nil {
		return nil, err
	}

	if !ok {
		return nil, fmt.Errorf("topic already exists")
	}

	return t, nil
}
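
// Illustrative sketch of external usage, assuming a constructed *PubSub named
// ps and a context ctx: join a topic, subscribe, publish, and read a message.
//
//	topic, err := ps.Join("my-topic")
//	if err != nil {
//		return err
//	}
//	sub, err := topic.Subscribe()
//	if err != nil {
//		return err
//	}
//	_ = topic.Publish(ctx, []byte("hello"))
//	msg, err := sub.Next(ctx)
//	if err == nil {
//		fmt.Println(string(msg.Data))
//	}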

// tryJoin is an internal function that tries to join a topic
// Returns the topic if it can be created or found
// Returns true if the topic was newly created, false otherwise
// Can be removed once pubsub.Publish() and pubsub.Subscribe() are removed
func (p *PubSub) tryJoin(topic string, opts ...TopicOpt) (*Topic, bool, error) {
	if p.subFilter != nil && !p.subFilter.CanSubscribe(topic) {
		return nil, false, fmt.Errorf("topic is not allowed by the subscription filter")
	}

	t := &Topic{
		p:           p,
		topic:       topic,
		evtHandlers: make(map[*TopicEventHandler]struct{}),
	}

	for _, opt := range opts {
		err := opt(t)
		if err != nil {
			return nil, false, err
		}
	}

	resp := make(chan *Topic, 1)
	select {
	case t.p.addTopic <- &addTopicReq{
		topic: t,
		resp:  resp,
	}:
	case <-t.p.ctx.Done():
		return nil, false, t.p.ctx.Err()
	}
	returnedTopic := <-resp

	if returnedTopic != t {
		return returnedTopic, false, nil
	}

	return t, true, nil
}

type addSubReq struct {
	sub  *Subscription
	resp chan *Subscription
}

type SubOpt func(sub *Subscription) error

// Subscribe returns a new Subscription for the given topic.
// Note that subscription is not an instantaneous operation. It may take some time
// before the subscription is processed by the pubsub main loop and propagated to our peers.
//
// Deprecated: use pubsub.Join() and topic.Subscribe() instead
func (p *PubSub) Subscribe(topic string, opts ...SubOpt) (*Subscription, error) {
	// ignore whether the topic was newly created or not, since either way we have a valid topic to work with
	t, _, err := p.tryJoin(topic)
	if err != nil {
		return nil, err
	}

	return t.Subscribe(opts...)
}

// WithBufferSize is a Subscribe option to customize the size of the subscribe output buffer.
// The default length is 32 but it can be configured to avoid dropping messages if the consumer is not reading fast
// enough.
func WithBufferSize(size int) SubOpt {
	return func(sub *Subscription) error {
		sub.ch = make(chan *Message, size)
		return nil
	}
}

type topicReq struct {
	resp chan []string
}

// GetTopics returns the topics this node is subscribed to.
func (p *PubSub) GetTopics() []string {
	out := make(chan []string, 1)
	select {
	case p.getTopics <- &topicReq{resp: out}:
	case <-p.ctx.Done():
		return nil
	}
	return <-out
}

// Publish publishes data to the given topic.
//
// Deprecated: use pubsub.Join() and topic.Publish() instead
func (p *PubSub) Publish(topic string, data []byte, opts ...PubOpt) error {
	// ignore whether the topic was newly created or not, since either way we have a valid topic to work with
	t, _, err := p.tryJoin(topic)
	if err != nil {
		return err
	}

	return t.Publish(context.TODO(), data, opts...)
}

func (p *PubSub) nextSeqno() []byte {
	seqno := make([]byte, 8)
	counter := atomic.AddUint64(&p.counter, 1)
	binary.BigEndian.PutUint64(seqno, counter)
	return seqno
}

type listPeerReq struct {
	resp  chan []peer.ID
	topic string
}

// ListPeers returns a list of peers we are connected to in the given topic.
func (p *PubSub) ListPeers(topic string) []peer.ID {
	out := make(chan []peer.ID)
	select {
	case p.getPeers <- &listPeerReq{
		resp:  out,
		topic: topic,
	}:
	case <-p.ctx.Done():
		return nil
	}
	return <-out
}

// BlacklistPeer blacklists a peer; all messages from this peer will be unconditionally dropped.
func (p *PubSub) BlacklistPeer(pid peer.ID) {
	select {
	case p.blacklistPeer <- pid:
	case <-p.ctx.Done():
	}
}

// RegisterTopicValidator registers a validator for topic.
// By default validators are asynchronous, which means they will run in a separate goroutine.
// The number of active goroutines is controlled by global and per topic validator
// throttles; if it exceeds the throttle threshold, messages will be dropped.
func (p *PubSub) RegisterTopicValidator(topic string, val interface{}, opts ...ValidatorOpt) error {
	addVal := &addValReq{
		topic:    topic,
		validate: val,
		resp:     make(chan error, 1),
	}

	for _, opt := range opts {
		err := opt(addVal)
		if err != nil {
			return err
		}
	}

	select {
	case p.addVal <- addVal:
	case <-p.ctx.Done():
		return p.ctx.Err()
	}
	return <-addVal.resp
}
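
// Illustrative sketch (hypothetical validator): register a validator that only
// accepts non-empty messages on a topic. The validate argument may be a
// Validator (returning bool) or a ValidatorEx (returning a ValidationResult).
//
//	err := ps.RegisterTopicValidator("my-topic",
//		func(ctx context.Context, from peer.ID, msg *pubsub.Message) bool {
//			return len(msg.Data) > 0
//		},
//	)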

// UnregisterTopicValidator removes a validator from a topic.
// Returns an error if there was no validator registered with the topic.
func (p *PubSub) UnregisterTopicValidator(topic string) error {
	rmVal := &rmValReq{
		topic: topic,
		resp:  make(chan error, 1),
	}

	select {
	case p.rmVal <- rmVal:
	case <-p.ctx.Done():
		return p.ctx.Err()
	}
	return <-rmVal.resp
}

type RelayCancelFunc func()

type addRelayReq struct {
	topic string
	resp  chan RelayCancelFunc
}