package pubsub
import (
"context"
"errors"
"fmt"
"sync"
"time"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
)
// Sentinel errors returned by Topic methods in this file.
var (
	// ErrTopicClosed is returned by Topic operations once the topic has been
	// closed.
	ErrTopicClosed = errors.New("this Topic is closed, try opening a new one")

	// ErrNilSignKey is returned by Publish when a custom key provider yields
	// a nil private key.
	ErrNilSignKey = errors.New("nil sign key")

	// ErrEmptyPeerID is returned by Publish when a custom key provider yields
	// an empty peer ID.
	ErrEmptyPeerID = errors.New("empty peer ID")
)
// Topic is the handle for a single pubsub topic. Its methods subscribe to,
// publish on and relay the topic, and register peer event handlers for it.
type Topic struct {
// p is the PubSub instance that owns this topic; every request is sent
// through its channels.
p *PubSub
// topic is the topic name, as returned by String.
topic string
// evtHandlerMux guards evtHandlers.
evtHandlerMux sync .RWMutex
// evtHandlers is the set of registered handlers that sendNotification
// fans peer events out to.
evtHandlers map [*TopicEventHandler ]struct {}
// mux guards closed. Close and SetScoreParams take the write lock; the
// other methods take the read lock.
mux sync .RWMutex
// closed is set by a successful Close; afterwards most methods return
// ErrTopicClosed.
closed bool
}
// String returns the name of the topic, which lets a Topic be used wherever
// a fmt.Stringer is expected.
func (t *Topic) String() string {
	return t.topic
}
// SetScoreParams validates p and installs it as the score parameters for
// this topic.
//
// It returns a wrapped validation error if p is invalid, ErrTopicClosed if
// the topic has been closed, an error if the router is not a GossipSubRouter
// or has no peer scoring configured, the result of the scorer's
// SetTopicScoreParams otherwise, or the PubSub context error if the context
// is done before the update could be submitted.
func (t *Topic) SetScoreParams(p *TopicScoreParams) error {
	if err := p.validate(); err != nil {
		return fmt.Errorf("invalid topic score parameters: %w", err)
	}

	t.mux.Lock()
	defer t.mux.Unlock()

	if t.closed {
		return ErrTopicClosed
	}

	// Buffered with capacity 1 so the closure can always deliver its result
	// without blocking.
	outcome := make(chan error, 1)
	apply := func() {
		router, isGossipSub := t.p.rt.(*GossipSubRouter)
		switch {
		case !isGossipSub:
			outcome <- fmt.Errorf("pubsub router is not gossipsub")
		case router.score == nil:
			outcome <- fmt.Errorf("peer scoring is not enabled in router")
		default:
			outcome <- router.score.SetTopicScoreParams(t.topic, p)
		}
	}

	// Hand the update to the PubSub eval channel and wait for its result,
	// unless the PubSub context ends first.
	select {
	case t.p.eval <- apply:
		return <-outcome
	case <-t.p.ctx.Done():
		return t.p.ctx.Err()
	}
}
// EventHandler creates a TopicEventHandler for this topic, applies opts to
// it and registers it so that it receives subsequent peer events.
//
// Every peer currently listed for the topic is recorded in the new handler's
// event log as a PeerJoin. It returns ErrTopicClosed if the topic is closed,
// the first error returned by an option, or the PubSub context error if the
// context is done before the registration could be submitted.
func (t *Topic) EventHandler(opts ...TopicEventHandlerOpt) (*TopicEventHandler, error) {
	t.mux.RLock()
	defer t.mux.RUnlock()
	if t.closed {
		return nil, ErrTopicClosed
	}

	handler := &TopicEventHandler{
		topic:    t,
		evtLog:   make(map[peer.ID]EventType),
		evtLogCh: make(chan struct{}, 1),
	}
	for _, apply := range opts {
		if err := apply(handler); err != nil {
			return nil, err
		}
	}

	registered := make(chan struct{}, 1)
	register := func() {
		// Seed the log with the peers already known for this topic.
		for pid := range t.p.topics[t.topic] {
			handler.evtLog[pid] = PeerJoin
		}

		t.evtHandlerMux.Lock()
		t.evtHandlers[handler] = struct{}{}
		t.evtHandlerMux.Unlock()

		registered <- struct{}{}
	}

	select {
	case t.p.eval <- register:
	case <-t.p.ctx.Done():
		return nil, t.p.ctx.Err()
	}

	// Wait until the registration closure has actually run.
	<-registered
	return handler, nil
}
// sendNotification forwards evt to every event handler currently registered
// on the topic, holding the handler set's read lock for the duration.
func (t *Topic) sendNotification(evt PeerEvent) {
	t.evtHandlerMux.RLock()
	defer t.evtHandlerMux.RUnlock()

	for handler := range t.evtHandlers {
		handler.sendNotification(evt)
	}
}
// Subscribe creates a Subscription for this topic, applies opts to it and
// submits it to the PubSub instance.
//
// If no option sets the subscription's message channel, a channel with a
// buffer of 32 messages is used. It returns ErrTopicClosed if the topic is
// closed, the first error returned by an option, or the PubSub context error
// if the context is done before the request could be submitted.
func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error) {
	t.mux.RLock()
	defer t.mux.RUnlock()
	if t.closed {
		return nil, ErrTopicClosed
	}

	sub := &Subscription{topic: t.topic, ctx: t.p.ctx}
	for _, apply := range opts {
		if err := apply(sub); err != nil {
			return nil, err
		}
	}

	// Fall back to the default buffer size when no option supplied a channel.
	if sub.ch == nil {
		sub.ch = make(chan *Message, 32)
	}

	resp := make(chan *Subscription, 1)

	t.p.disc.Discover(sub.topic)

	req := &addSubReq{sub: sub, resp: resp}
	select {
	case t.p.addSub <- req:
	case <-t.p.ctx.Done():
		return nil, t.p.ctx.Err()
	}

	return <-resp, nil
}
// Relay submits a relay request for this topic to the PubSub instance and
// returns the RelayCancelFunc delivered in response.
//
// It returns ErrTopicClosed if the topic is closed, or the PubSub context
// error if the context is done before the request could be submitted.
func (t *Topic) Relay() (RelayCancelFunc, error) {
	t.mux.RLock()
	defer t.mux.RUnlock()
	if t.closed {
		return nil, ErrTopicClosed
	}

	resp := make(chan RelayCancelFunc, 1)

	t.p.disc.Discover(t.topic)

	req := &addRelayReq{topic: t.topic, resp: resp}
	select {
	case t.p.addRelay <- req:
	case <-t.p.ctx.Done():
		return nil, t.p.ctx.Err()
	}

	return <-resp, nil
}
// RouterReady is a predicate over a router and a topic name. Publish uses it
// (when set through WithReadiness) to decide whether the router is ready for
// the message to be published; the error result is ignored by Publish's own
// polling loop.
type RouterReady func (rt PubSubRouter , topic string ) (bool , error )
// ProvideKey returns the private key and peer ID that Publish should use as
// the signer and author of a message instead of the PubSub defaults.
type ProvideKey func () (crypto .PrivKey , peer .ID )
// PublishOptions collects the per-call settings of Topic.Publish. It is
// populated by PubOpt functions.
type PublishOptions struct {
// ready, when non-nil, makes Publish wait until the router is ready.
ready RouterReady
// customKey, when non-nil and local is false, overrides the signing key
// and peer ID.
customKey ProvideKey
// local is forwarded as the last field of the Message that Publish hands
// to the validator.
// NOTE(review): presumably marks the message as local-only (not forwarded
// to peers) — confirm against the Message type.
local bool
}
// PubOpt is a functional option that configures a PublishOptions value. A
// non-nil error aborts the Publish call that applied it.
type PubOpt func (pub *PublishOptions ) error
// Publish builds a message carrying data for this topic, optionally signs
// it, optionally waits for the router to become ready, and hands it to the
// PubSub validator via PushLocal.
//
// ctx bounds only the readiness wait. opts are applied in order to a fresh
// PublishOptions; the first option error is returned unchanged.
//
// It returns ErrTopicClosed if the topic is closed, ErrNilSignKey or
// ErrEmptyPeerID if a custom key provider yields an unusable identity, the
// signing error if signing fails, a context error if ctx or the PubSub
// context ends during the readiness wait, and otherwise the result of
// PushLocal.
func (t *Topic ) Publish (ctx context .Context , data []byte , opts ...PubOpt ) error {
t .mux .RLock ()
defer t .mux .RUnlock ()
if t .closed {
return ErrTopicClosed
}
// Start from the PubSub-wide signing identity; a custom key may override
// it below.
pid := t .p .signID
key := t .p .signKey
pub := &PublishOptions {}
for _ , opt := range opts {
err := opt (pub )
if err != nil {
return err
}
}
// The custom identity is only applied when the publication is not local.
if pub .customKey != nil && !pub .local {
key , pid = pub .customKey ()
if key == nil {
return ErrNilSignKey
}
if len (pid ) == 0 {
return ErrEmptyPeerID
}
}
m := &pb .Message {
Data : data ,
Topic : &t .topic ,
From : nil ,
Seqno : nil ,
}
// From and Seqno are only filled in when there is an author ID; they must
// be set before signing so that the signature is computed over them.
if pid != "" {
m .From = []byte (pid )
m .Seqno = t .p .nextSeqno ()
}
if key != nil {
m .From = []byte (pid )
err := signMessage (pid , key , m )
if err != nil {
return err
}
}
if pub .ready != nil {
if t .p .disc .discovery != nil {
// A discovery service is configured: delegate the readiness wait to it.
// Its result is not inspected here.
t .p .disc .Bootstrap (ctx , t .topic , pub .ready )
} else {
// No discovery service: poll the readiness predicate through the eval
// channel, once immediately and then on every ticker tick (200ms).
var ticker *time .Ticker
readyLoop :
for {
// Buffered so the closure can deliver its answer without blocking.
res := make (chan bool , 1 )
select {
case t .p .eval <- func () {
// The error returned by the predicate is deliberately discarded;
// only the boolean decides readiness.
done , _ := pub .ready (t .p .rt , t .topic )
res <- done
}:
if <-res {
break readyLoop
}
case <- t .p .ctx .Done ():
return t .p .ctx .Err ()
case <- ctx .Done ():
return ctx .Err ()
}
// The ticker is created lazily, so no ticker is allocated when the
// router is ready on the first check. The nil guard ensures the
// deferred Stop is registered only once; it runs when Publish returns.
if ticker == nil {
ticker = time .NewTicker (200 * time .Millisecond )
defer ticker .Stop ()
}
select {
case <- ticker .C :
case <- ctx .Done ():
return fmt .Errorf ("router is not ready: %w" , ctx .Err ())
}
}
}
}
// Hand the message to the validator, tagged with the local host ID and the
// local flag from the options.
return t .p .val .PushLocal (&Message {m , "" , t .p .host .ID (), nil , pub .local })
}
// WithReadiness returns a publishing option that makes Publish wait until
// ready reports that the router is ready before the message is handed on.
func WithReadiness(ready RouterReady) PubOpt {
	setReady := func(pub *PublishOptions) error {
		pub.ready = ready
		return nil
	}
	return setReady
}
// WithLocalPublication returns a publishing option that sets the local flag
// of the PublishOptions to the given value. While the flag is true, Publish
// does not apply a custom key supplied through WithSecretKeyAndPeerId.
func WithLocalPublication(local bool) PubOpt {
	setLocal := func(pub *PublishOptions) error {
		pub.local = local
		return nil
	}
	return setLocal
}
// WithSecretKeyAndPeerId returns a publishing option that makes Publish use
// key and pid as the signing key and author ID instead of the PubSub
// defaults. Publish rejects a nil key with ErrNilSignKey and an empty pid
// with ErrEmptyPeerID.
func WithSecretKeyAndPeerId(key crypto.PrivKey, pid peer.ID) PubOpt {
	provide := func() (crypto.PrivKey, peer.ID) {
		return key, pid
	}
	return func(pub *PublishOptions) error {
		pub.customKey = provide
		return nil
	}
}
// Close asks the PubSub instance to remove this topic and, if that succeeds,
// marks the topic as closed.
//
// Closing an already closed topic is a no-op that returns nil. If the removal
// request is answered with an error, that error is returned and the topic
// stays open. If the PubSub context is done before the request could be
// submitted, the context error is returned.
func (t *Topic) Close() error {
	t.mux.Lock()
	defer t.mux.Unlock()
	if t.closed {
		return nil
	}

	req := &rmTopicReq{t, make(chan error, 1)}
	select {
	case t.p.rmTopic <- req:
	case <-t.p.ctx.Done():
		return t.p.ctx.Err()
	}

	if err := <-req.resp; err != nil {
		return err
	}
	t.closed = true
	return nil
}
// ListPeers returns the peers the PubSub instance reports for this topic.
// For a closed topic it returns an empty, non-nil slice.
func (t *Topic) ListPeers() []peer.ID {
	t.mux.RLock()
	defer t.mux.RUnlock()

	if !t.closed {
		return t.p.ListPeers(t.topic)
	}
	return []peer.ID{}
}
// EventType is the kind of a PeerEvent.
type EventType int

// The event kinds delivered through a TopicEventHandler.
const (
	// PeerJoin (value 0) is the event recorded for a peer that is part of the
	// topic; EventHandler also records it for every peer already listed for
	// the topic when a handler is registered.
	PeerJoin EventType = iota

	// PeerLeave (value 1) is the counterpart of PeerJoin.
	// NOTE(review): presumably emitted when a peer leaves the topic; the
	// emitting code is outside this file — confirm.
	PeerLeave
)
// TopicEventHandler delivers peer events for one Topic. Events are buffered
// in a per-peer log and retrieved one at a time with NextPeerEvent.
type TopicEventHandler struct {
// topic is the Topic this handler is registered on; Cancel removes the
// handler from that topic's handler set.
topic *Topic
// err is set by Cancel. It is not read anywhere in this file.
err error
// evtLogMx guards evtLog.
evtLogMx sync .Mutex
// evtLog holds at most one pending event type per peer. An incoming event
// of a different type for the same peer removes the pending entry instead
// of queueing a second one.
evtLog map [peer .ID ]EventType
// evtLogCh is a wake-up signal for NextPeerEvent; EventHandler creates it
// with capacity 1 and all sends to it are non-blocking.
evtLogCh chan struct {}
}
// TopicEventHandlerOpt is a functional option applied to a TopicEventHandler
// by Topic.EventHandler before the handler is registered. A non-nil error
// aborts the creation of the handler.
type TopicEventHandlerOpt func (t *TopicEventHandler ) error
// PeerEvent describes a change concerning one peer, as returned by
// TopicEventHandler.NextPeerEvent.
type PeerEvent struct {
// Type is the kind of event (PeerJoin or PeerLeave).
Type EventType
// Peer is the ID of the peer the event is about.
Peer peer .ID
}
// Cancel records a cancellation error on the handler and removes the handler
// from its topic's set of registered event handlers, so that it no longer
// receives notifications.
func (t *TopicEventHandler) Cancel() {
	owner := t.topic
	t.err = fmt.Errorf("topic event handler cancelled by calling handler.Cancel()")

	owner.evtHandlerMux.Lock()
	delete(owner.evtHandlers, t)
	owner.evtHandlerMux.Unlock()
}
// sendNotification records evt in the handler's event log while holding the
// log mutex.
func (t *TopicEventHandler) sendNotification(evt PeerEvent) {
	t.evtLogMx.Lock()
	// addToEventLog expects the log mutex to be held by the caller.
	t.addToEventLog(evt)
	t.evtLogMx.Unlock()
}
// addToEventLog merges evt into the pending event log.
//
// If the peer has no pending event, evt is stored and a non-blocking wake-up
// signal is sent on evtLogCh. If the peer already has a pending event of a
// different type, the two cancel out and the pending entry is removed. A
// repeated event of the same type changes nothing. The function does not
// lock evtLogMx itself; sendNotification locks it around the call.
func (t *TopicEventHandler) addToEventLog(evt PeerEvent) {
	pending, seen := t.evtLog[evt.Peer]
	if seen {
		if pending != evt.Type {
			delete(t.evtLog, evt.Peer)
		}
		return
	}

	t.evtLog[evt.Peer] = evt.Type

	// Signal a waiting NextPeerEvent without blocking if a signal is already
	// queued.
	select {
	case t.evtLogCh <- struct{}{}:
	default:
	}
}
// pullFromEventLog removes one arbitrary entry from the pending event log
// and returns it as a PeerEvent. The boolean result is false, and the event
// is the zero value, when the log is empty. The function does not lock
// evtLogMx itself; NextPeerEvent locks it around the call.
func (t *TopicEventHandler) pullFromEventLog() (PeerEvent, bool) {
	// Map iteration order is unspecified, so whichever entry comes first is
	// the one that is returned.
	for pid, kind := range t.evtLog {
		delete(t.evtLog, pid)
		return PeerEvent{Peer: pid, Type: kind}, true
	}
	return PeerEvent{}, false
}
// NextPeerEvent returns the next pending peer event, blocking until one is
// available or ctx is done. When ctx ends first, it returns the zero
// PeerEvent together with ctx.Err().
func (t *TopicEventHandler) NextPeerEvent(ctx context.Context) (PeerEvent, error) {
	for {
		t.evtLogMx.Lock()
		evt, found := t.pullFromEventLog()
		if found && len(t.evtLog) > 0 {
			// More events remain: re-arm the wake-up signal (without
			// blocking) so that another waiter is not left sleeping.
			select {
			case t.evtLogCh <- struct{}{}:
			default:
			}
		}
		t.evtLogMx.Unlock()

		if found {
			return evt, nil
		}

		// Nothing pending: wait for a wake-up signal, then re-check the log.
		select {
		case <-t.evtLogCh:
		case <-ctx.Done():
			return PeerEvent{}, ctx.Err()
		}
	}
}
// The pages are generated with Golds v0.8.2 (GOOS=linux GOARCH=amd64).
// Golds is a Go 101 project developed by Tapir Liu.
// PRs and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.