package badger
import (
"sync"
"sync/atomic"
"github.com/dgraph-io/badger/v4/pb"
"github.com/dgraph-io/badger/v4/trie"
"github.com/dgraph-io/badger/v4/y"
"github.com/dgraph-io/ristretto/v2/z"
)
// subscriber is one registered consumer of key updates, as tracked by the
// publisher.
type subscriber struct {
// id is the key under which this subscriber is stored in
// publisher.subscribers and the id registered with the indexer.
id uint64
// matches are the patterns added to the indexer on behalf of this
// subscriber; they are removed from the indexer again when the subscriber
// is deleted.
matches []pb .Match
// sendCh is the channel on which publishUpdates delivers batched KV lists.
sendCh chan *pb .KVList
// subCloser is the closer handed to newSubscriber; cleanSubscribers calls
// SignalAndWait on it.
subCloser *z .Closer
// active is set to 1 on creation; publishUpdates only sends to this
// subscriber while it reads 1.
// NOTE(review): whoever resets it lives outside this file section — confirm.
active *atomic .Uint64
}
// publisher fans incoming write requests out to the subscribers whose
// matches cover the written keys.
type publisher struct {
// The embedded mutex is taken by every method that touches subscribers,
// nextID or indexer.
sync .Mutex
// pubCh carries request batches from sendUpdates to listenForUpdates.
pubCh chan requests
// subscribers holds the registered subscribers keyed by their id.
subscribers map [uint64 ]subscriber
// nextID is the id that will be handed to the next new subscriber.
nextID uint64
// indexer is queried with an entry key to obtain the ids of the
// subscribers interested in that key.
indexer *trie .Trie
}
// newPublisher returns a publisher with no subscribers, an empty match
// indexer and a request channel buffered to 1000 batches. Subscriber ids
// start at zero.
func newPublisher() *publisher {
	p := new(publisher) // nextID starts at its zero value.
	p.pubCh = make(chan requests, 1000)
	p.subscribers = make(map[uint64]subscriber)
	p.indexer = trie.NewTrie()
	return p
}
// listenForUpdates is the publisher's receive loop. It blocks until either a
// request batch arrives on pubCh or c reports that it has been closed.
//
// When a batch arrives, everything else that is immediately available on
// pubCh is appended to it and the combined batch is handed to publishUpdates
// in one call. When c is closed the loop exits; on the way out all
// subscribers are cleaned up and then c.Done is called.
func (p *publisher) listenForUpdates(c *z.Closer) {
	defer func() {
		p.cleanSubscribers()
		c.Done()
	}()

	for {
		var batch requests
		select {
		case <-c.HasBeenClosed():
			return
		case batch = <-p.pubCh:
		}

		// Greedily pull whatever is already queued so subscribers get one
		// larger update instead of many small ones.
	drain:
		for {
			select {
			case more := <-p.pubCh:
				batch = append(batch, more...)
			default:
				break drain
			}
		}
		p.publishUpdates(batch)
	}
}
// publishUpdates delivers the entries in reqs to every subscriber whose
// matches cover the entry key.
//
// For each entry the indexer is asked which subscriber ids are interested.
// Entries nobody wants are skipped; the others are copied into a pb.KV and
// appended to a per-subscriber list. Once all requests are processed, each
// list is sent on the sendCh of its subscriber, provided the subscriber's
// active flag reads 1.
//
// The publisher lock is held for the whole call, including the channel
// sends. On return the lock is released and then reqs.DecrRef is called.
func (p *publisher) publishUpdates(reqs requests) {
	p.Lock()
	defer func() {
		p.Unlock()
		reqs.DecrRef()
	}()

	pending := make(map[uint64]*pb.KVList)
	for _, req := range reqs {
		for _, entry := range req.Entries {
			subIDs := p.indexer.Get(entry.Key)
			if len(subIDs) == 0 {
				continue
			}

			// Copy key and value so the KV handed to subscribers does not
			// alias the entry's buffers. One KV is shared by all interested
			// subscribers.
			keyCopy := y.SafeCopy(nil, entry.Key)
			update := &pb.KV{
				Key:       y.ParseKey(keyCopy),
				Value:     y.SafeCopy(nil, entry.Value),
				Meta:      []byte{entry.UserMeta},
				ExpiresAt: entry.ExpiresAt,
				Version:   y.ParseTs(keyCopy),
			}

			for subID := range subIDs {
				list, seen := pending[subID]
				if !seen {
					list = &pb.KVList{}
					pending[subID] = list
				}
				list.Kv = append(list.Kv, update)
			}
		}
	}

	for subID, list := range pending {
		sub := p.subscribers[subID]
		if sub.active.Load() != 1 {
			continue
		}
		sub.sendCh <- list
	}
}
// newSubscriber registers a subscriber for the given matches and returns it.
//
// The subscriber gets the next free id, a sendCh buffered to 1000 lists, the
// supplied closer c and an active flag initialised to 1. Every match is added
// to the indexer under the subscriber's id.
//
// If adding a match fails, the matches that were already added are removed
// from the indexer again, the subscriber is NOT stored in p.subscribers, and
// the zero subscriber is returned together with the error. Without that
// rollback a half-registered subscriber would stay reachable: publishUpdates
// would keep sending to a sendCh nobody reads (blocking while holding the
// lock once the buffer fills) and cleanSubscribers would call SignalAndWait
// on its closer.
//
// The id is consumed even when registration fails, so ids are never reused.
func (p *publisher) newSubscriber(c *z.Closer, matches []pb.Match) (subscriber, error) {
	p.Lock()
	defer p.Unlock()

	id := p.nextID
	p.nextID++

	s := subscriber{
		id:        id,
		matches:   matches,
		sendCh:    make(chan *pb.KVList, 1000),
		subCloser: c,
		active:    new(atomic.Uint64),
	}
	s.active.Store(1)

	for i, m := range matches {
		if err := p.indexer.AddMatch(m, id); err != nil {
			// Undo the matches indexed so far. The delete error is discarded
			// on purpose: the AddMatch failure is the one worth reporting.
			for _, added := range matches[:i] {
				_ = p.indexer.DeleteMatch(added, id)
			}
			return subscriber{}, err
		}
	}

	// Publish the subscriber only once the indexer is fully populated.
	p.subscribers[id] = s
	return s, nil
}
// cleanSubscribers tears down every registered subscriber while holding the
// publisher lock. For each one it removes the subscriber's matches from the
// indexer (errors from the indexer are discarded), drops the subscriber from
// the map, and then calls SignalAndWait on the subscriber's closer.
func (p *publisher) cleanSubscribers() {
	p.Lock()
	defer p.Unlock()

	for subID, sub := range p.subscribers {
		for i := range sub.matches {
			_ = p.indexer.DeleteMatch(sub.matches[i], subID)
		}
		// Deleting the current key while ranging over a map is permitted.
		delete(p.subscribers, subID)
		sub.subCloser.SignalAndWait()
	}
}
// deleteSubscriber unregisters the subscriber with the given id: its matches
// are removed from the indexer (indexer errors are discarded) and its entry
// is dropped from the subscriber map. An unknown id is a no-op.
func (p *publisher) deleteSubscriber(id uint64) {
	p.Lock()
	defer p.Unlock()

	sub, found := p.subscribers[id]
	if !found {
		return
	}
	for i := range sub.matches {
		_ = p.indexer.DeleteMatch(sub.matches[i], id)
	}
	delete(p.subscribers, id)
}
// sendUpdates queues reqs for publication. If no subscriber is currently
// registered the call does nothing. Otherwise a reference is taken on reqs
// via IncrRef and the batch is sent on pubCh; the send blocks while the
// channel buffer is full.
func (p *publisher) sendUpdates(reqs requests) {
	if p.noOfSubscribers() == 0 {
		return
	}
	reqs.IncrRef()
	p.pubCh <- reqs
}
// noOfSubscribers reports how many subscribers are currently registered.
// The count is read while holding the publisher lock.
func (p *publisher) noOfSubscribers() int {
	p.Lock()
	count := len(p.subscribers)
	p.Unlock()
	return count
}
// The pages are generated with Golds v0.8.4. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PR and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.