package pubsub
import (
"errors"
"regexp"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/core/peer"
)
var ErrTooManySubscriptions = errors .New ("too many subscriptions" )
type SubscriptionFilter interface {
CanSubscribe (topic string ) bool
FilterIncomingSubscriptions (peer .ID , []*pb .RPC_SubOpts ) ([]*pb .RPC_SubOpts , error )
}
func WithSubscriptionFilter (subFilter SubscriptionFilter ) Option {
return func (ps *PubSub ) error {
ps .subFilter = subFilter
return nil
}
}
func NewAllowlistSubscriptionFilter (topics ...string ) SubscriptionFilter {
allow := make (map [string ]struct {})
for _ , topic := range topics {
allow [topic ] = struct {}{}
}
return &allowlistSubscriptionFilter {allow : allow }
}
type allowlistSubscriptionFilter struct {
allow map [string ]struct {}
}
var _ SubscriptionFilter = (*allowlistSubscriptionFilter )(nil )
func (f *allowlistSubscriptionFilter ) CanSubscribe (topic string ) bool {
_ , ok := f .allow [topic ]
return ok
}
func (f *allowlistSubscriptionFilter ) FilterIncomingSubscriptions (from peer .ID , subs []*pb .RPC_SubOpts ) ([]*pb .RPC_SubOpts , error ) {
return FilterSubscriptions (subs , f .CanSubscribe ), nil
}
func NewRegexpSubscriptionFilter (rx *regexp .Regexp ) SubscriptionFilter {
return &rxSubscriptionFilter {allow : rx }
}
type rxSubscriptionFilter struct {
allow *regexp .Regexp
}
var _ SubscriptionFilter = (*rxSubscriptionFilter )(nil )
func (f *rxSubscriptionFilter ) CanSubscribe (topic string ) bool {
return f .allow .MatchString (topic )
}
func (f *rxSubscriptionFilter ) FilterIncomingSubscriptions (from peer .ID , subs []*pb .RPC_SubOpts ) ([]*pb .RPC_SubOpts , error ) {
return FilterSubscriptions (subs , f .CanSubscribe ), nil
}
func FilterSubscriptions (subs []*pb .RPC_SubOpts , filter func (string ) bool ) []*pb .RPC_SubOpts {
accept := make (map [string ]*pb .RPC_SubOpts )
for _ , sub := range subs {
topic := sub .GetTopicid ()
if !filter (topic ) {
continue
}
otherSub , ok := accept [topic ]
if ok {
if sub .GetSubscribe () != otherSub .GetSubscribe () {
delete (accept , topic )
}
} else {
accept [topic ] = sub
}
}
if len (accept ) == 0 {
return nil
}
result := make ([]*pb .RPC_SubOpts , 0 , len (accept ))
for _ , sub := range accept {
result = append (result , sub )
}
return result
}
func WrapLimitSubscriptionFilter (filter SubscriptionFilter , limit int ) SubscriptionFilter {
return &limitSubscriptionFilter {filter : filter , limit : limit }
}
type limitSubscriptionFilter struct {
filter SubscriptionFilter
limit int
}
var _ SubscriptionFilter = (*limitSubscriptionFilter )(nil )
func (f *limitSubscriptionFilter ) CanSubscribe (topic string ) bool {
return f .filter .CanSubscribe (topic )
}
func (f *limitSubscriptionFilter ) FilterIncomingSubscriptions (from peer .ID , subs []*pb .RPC_SubOpts ) ([]*pb .RPC_SubOpts , error ) {
if len (subs ) > f .limit {
return nil , ErrTooManySubscriptions
}
return f .filter .FilterIncomingSubscriptions (from , subs )
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .