package badger
import (
"bytes"
"context"
"sort"
"sync"
"sync/atomic"
"time"
humanize "github.com/dustin/go-humanize"
"google.golang.org/protobuf/proto"
"github.com/dgraph-io/badger/v4/pb"
"github.com/dgraph-io/badger/v4/y"
"github.com/dgraph-io/ristretto/v2/z"
)
// batchSize (16 MiB) is the fill threshold for a producer's output buffer:
// produceKVs hands the buffer to kvChan once its unpadded length reaches this
// value, and allocates each new buffer with twice this capacity.
const batchSize = 16 << 20
// maxStreamSize (100 MiB) is the default value of Stream.MaxSize, installed by
// newStream. streamKVs stops merging further buffers into a batch once the
// batch has grown beyond that size.
var maxStreamSize = uint64 (100 << 20 )
// Stream iterates over the DB concurrently and hands the resulting key-values
// to a user-supplied Send callback in batches. The exported fields are the
// configuration knobs; the unexported ones are set up by the constructors and
// by Orchestrate.
type Stream struct {
// Prefix limits the stream: it is passed to db.Ranges and db.EstimateSize and
// is used as the iterator prefix in produceKVs.
Prefix []byte
// NumGo is the number of produceKVs goroutines started by Orchestrate. It is
// also passed to db.Ranges when the key space is split.
NumGo int
// LogPrefix is prepended to the progress log lines written by streamKVs.
LogPrefix string
// ChooseKey is optional. When set, a key is skipped if ChooseKey returns
// false for its item.
ChooseKey func (item *Item ) bool
// MaxSize bounds batch merging in streamKVs: once a batch is larger than
// MaxSize bytes, no more buffers are appended before it is sent.
MaxSize uint64
// KeyToList converts the versions found at key into a KVList. It receives a
// copy of the key and the iterator whose current item has that key.
// Orchestrate installs ToList when this is nil. An error is logged and the
// key skipped; a nil or empty list is skipped silently.
KeyToList func (key []byte , itr *Iterator ) (*pb .KVList , error )
// UseKeyToListWithThreadId makes produceKVs call KeyToListWithThreadId
// instead of KeyToList, and FinishThread after each key range.
UseKeyToListWithThreadId bool
// KeyToListWithThreadId is like KeyToList but additionally receives the
// threadId of the calling producer goroutine.
KeyToListWithThreadId func (key []byte , itr *Iterator , threadId int ) (*pb .KVList , error )
// FinishThread is called by produceKVs after a key range has been iterated
// (only when UseKeyToListWithThreadId is set). The KVs it returns are
// appended to the output; an error aborts that producer.
FinishThread func (threadId int ) (*pb .KVList , error )
// Send receives every non-empty batch from streamKVs. The buffer is released
// as soon as Send returns.
Send func (buf *z .Buffer ) error
// SinceTs is copied into the iterator options' SinceTs field.
// NOTE(review): presumably only versions newer than this are visited; confirm
// against the iterator implementation.
SinceTs uint64
// readTs, when non-zero, makes producers read through NewTransactionAt(readTs)
// instead of NewTransaction. It is set by NewStreamAt.
readTs uint64
// db is the database being streamed.
db *DB
// rangeCh carries key ranges from produceRanges to the produceKVs goroutines.
rangeCh chan keyRange
// kvChan carries filled buffers from produceKVs to streamKVs.
kvChan chan *z .Buffer
// nextStreamId is incremented once per iterated key range; the new value is
// stamped on every KV of that range as its StreamId.
nextStreamId atomic .Uint32
// doneMarkers, when true, makes produceKVs append a KV with StreamDone set
// at the end of every key range. See SendDoneMarkers.
doneMarkers bool
// scanned accumulates the iterators' scanned counters for progress logging.
scanned atomic .Uint64
// numProducers is the number of produceKVs goroutines currently running.
numProducers atomic .Int32
}
// SendDoneMarkers turns end-of-range markers on or off. When enabled, every
// key range streamed by produceKVs is terminated with a KV whose StreamDone
// flag is set.
func (st *Stream) SendDoneMarkers(done bool) {
	st.doneMarkers = done
}
// ToList is the default KeyToList implementation (Orchestrate installs it when
// none is configured). Starting at the iterator's current position it collects
// the versions of key into a KVList, copying key, value and user meta into the
// iterator's allocator.
//
// Collection stops, without consuming the offending item, at the first item
// that is deleted/expired or belongs to a different key. It also stops right
// after appending a version when the DB keeps only one version per key or the
// item is flagged to discard earlier versions. An error reading a value is
// returned with a nil list.
func (st *Stream) ToList(key []byte, itr *Iterator) (*pb.KVList, error) {
	alloc := itr.Alloc
	keyCopy := alloc.Copy(key)
	out := &pb.KVList{}

	for itr.Valid() {
		cur := itr.Item()
		if cur.IsDeletedOrExpired() || !bytes.Equal(key, cur.Key()) {
			break
		}

		entry := y.NewKV(alloc)
		entry.Key = keyCopy
		readErr := cur.Value(func(val []byte) error {
			entry.Value = alloc.Copy(val)
			return nil
		})
		if readErr != nil {
			return nil, readErr
		}
		entry.Version = cur.Version()
		entry.ExpiresAt = cur.ExpiresAt()
		entry.UserMeta = alloc.Copy([]byte{cur.UserMeta()})
		out.Kv = append(out.Kv, entry)

		// Older versions are not wanted in either of these cases.
		if st.db.opt.NumVersionsToKeep == 1 || cur.DiscardEarlierVersions() {
			break
		}
		itr.Next()
	}
	return out, nil
}
// produceRanges splits the key space under st.Prefix into ranges and feeds
// them, largest first, to rangeCh for the produceKVs goroutines. rangeCh is
// closed once every range has been handed over.
//
// If ctx is cancelled before that, the function returns immediately instead of
// blocking on a channel nobody reads any more. In that case rangeCh is
// deliberately left open: consumers also watch ctx and must exit with
// ctx.Err() rather than mistake a closed channel for normal completion.
func (st *Stream) produceRanges(ctx context.Context) {
	ranges := st.db.Ranges(st.Prefix, st.NumGo)
	y.AssertTrue(len(ranges) > 0)
	y.AssertTrue(ranges[0].left == nil)
	y.AssertTrue(ranges[len(ranges)-1].right == nil)
	st.db.opt.Infof("Number of ranges found: %d\n", len(ranges))

	// Sort in descending order of size so the biggest ranges are started first.
	sort.Slice(ranges, func(i, j int) bool {
		return ranges[i].size > ranges[j].size
	})
	for i, r := range ranges {
		select {
		case st.rangeCh <- *r:
		case <-ctx.Done():
			return
		}
		st.db.opt.Infof("Sent range %d for iteration: [%x, %x) of size: %s\n",
			i, r.left, r.right, humanize.IBytes(uint64(r.size)))
	}
	close(st.rangeCh)
}
// produceKVs is the body of one producer goroutine. It takes key ranges from
// rangeCh, iterates each of them in a read-only transaction (at readTs when
// that is set), converts keys to KV lists via KeyToList or
// KeyToListWithThreadId, serializes the KVs into a buffer and pushes the
// buffer to kvChan whenever it reaches batchSize and at the end of each range.
//
// It returns nil once rangeCh is closed, ctx.Err() when ctx is cancelled, or
// the error returned by FinishThread.
func (st *Stream) produceKVs(ctx context.Context, threadId int) error {
	st.numProducers.Add(1)
	defer st.numProducers.Add(-1)

	var txn *Txn
	if st.readTs > 0 {
		txn = st.db.NewTransactionAt(st.readTs, false)
	} else {
		txn = st.db.NewTransaction(false)
	}
	defer txn.Discard()

	// Ranges are iterated serially by this goroutine, so one output buffer is
	// shared across them.
	outList := z.NewBuffer(2*batchSize, "Stream.ProduceKVs")
	defer func() {
		// outList is reassigned after every send, so it must be evaluated when
		// the deferred function runs, not when it is registered.
		_ = outList.Release()
	}()

	iterate := func(kr keyRange) error {
		iterOpts := DefaultIteratorOptions
		iterOpts.AllVersions = true
		iterOpts.Prefix = st.Prefix
		iterOpts.PrefetchValues = true
		iterOpts.SinceTs = st.SinceTs
		itr := txn.NewIterator(iterOpts)
		itr.ThreadId = threadId
		defer itr.Close()

		itr.Alloc = z.NewAllocator(1<<20, "Stream.Iterate")
		defer itr.Alloc.Release()

		// Every KV produced from this range carries the same stream id.
		streamId := st.nextStreamId.Add(1)
		var scanned int

		// sendIt hands the current buffer to kvChan and starts a fresh one. On
		// cancellation the buffer stays in outList and is released by the
		// deferred cleanup above.
		sendIt := func() error {
			select {
			case st.kvChan <- outList:
				outList = z.NewBuffer(2*batchSize, "Stream.ProduceKVs")
				st.scanned.Add(uint64(itr.scanned - scanned))
				scanned = itr.scanned
			case <-ctx.Done():
				return ctx.Err()
			}
			return nil
		}

		// emit serializes every KV of list into outList, flushing the buffer
		// each time it reaches batchSize. A nil list is treated as empty.
		emit := func(list *pb.KVList) error {
			if list == nil {
				return nil
			}
			for _, kv := range list.Kv {
				kv.StreamId = streamId
				KVToBuffer(kv, outList)
				if outList.LenNoPadding() < batchSize {
					continue
				}
				if err := sendIt(); err != nil {
					return err
				}
			}
			return nil
		}

		var prevKey []byte
		for itr.Seek(kr.left); itr.Valid(); {
			item := itr.Item()
			// This is also what advances the iterator after a bare `continue`
			// below: the same key is seen again and skipped here.
			if bytes.Equal(item.Key(), prevKey) {
				itr.Next()
				continue
			}
			prevKey = append(prevKey[:0], item.Key()...)

			// Stop at the (exclusive) right boundary of the range.
			if len(kr.right) > 0 && bytes.Compare(item.Key(), kr.right) >= 0 {
				break
			}
			if st.ChooseKey != nil && !st.ChooseKey(item) {
				continue
			}

			// Memory handed out for the previous key is no longer needed.
			itr.Alloc.Reset()
			var list *pb.KVList
			var err error
			if st.UseKeyToListWithThreadId {
				list, err = st.KeyToListWithThreadId(item.KeyCopy(nil), itr, threadId)
			} else {
				list, err = st.KeyToList(item.KeyCopy(nil), itr)
			}
			if err != nil {
				st.db.opt.Warningf("While reading key: %x, got error: %v", item.Key(), err)
				continue
			}
			if list == nil || len(list.Kv) == 0 {
				continue
			}
			if err := emit(list); err != nil {
				return err
			}
		}

		// Give the per-thread callback a chance to flush whatever it buffered.
		// A missing callback or a nil result is tolerated instead of panicking.
		if st.UseKeyToListWithThreadId && st.FinishThread != nil {
			kvs, err := st.FinishThread(threadId)
			if err != nil {
				return err
			}
			if err := emit(kvs); err != nil {
				return err
			}
		}

		if st.doneMarkers {
			kv := &pb.KV{
				StreamId:   streamId,
				StreamDone: true,
			}
			KVToBuffer(kv, outList)
		}
		return sendIt()
	}

	for {
		select {
		case kr, ok := <-st.rangeCh:
			if !ok {
				// All ranges have been handed out.
				return nil
			}
			if err := iterate(kr); err != nil {
				return err
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
// streamKVs is the consumer side of the stream. It receives buffers from
// kvChan, merges whatever else is immediately available into the same batch
// (up to MaxSize) and passes the batch to st.Send. Every five seconds it logs
// scan and send progress.
//
// It returns nil once kvChan is closed and drained, ctx.Err() on cancellation,
// or the first error returned by st.Send.
func (st *Stream) streamKVs(ctx context.Context) error {
	diskBytes, rawBytes := st.db.EstimateSize(st.Prefix)
	// Pad the uncompressed estimate by 20% before reporting it.
	rawBytes = uint64(float64(rawBytes) * 1.2)
	st.db.opt.Infof("%s Streaming about %s of uncompressed data (%s on disk)\n",
		st.LogPrefix, humanize.IBytes(rawBytes), humanize.IBytes(diskBytes))

	var sentBytes uint64
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	started := time.Now()

	// flush passes buf to Send (unless it is empty) and always releases it.
	flush := func(buf *z.Buffer) error {
		defer func() { _ = buf.Release() }()
		n := uint64(buf.LenNoPadding())
		if n == 0 {
			return nil
		}
		sentBytes += n
		err := st.Send(buf)
		if err != nil {
			st.db.opt.Warningf("Error while sending: %v\n", err)
		}
		return err
	}

	// coalesce appends every buffer that is already waiting in kvChan to buf,
	// stopping once buf exceeds MaxSize, then flushes the result.
	coalesce := func(buf *z.Buffer) error {
		for uint64(buf.LenNoPadding()) <= st.MaxSize {
			var (
				next *z.Buffer
				open bool
			)
			select {
			case next, open = <-st.kvChan:
			default:
				// Nothing else is queued right now.
				return flush(buf)
			}
			if !open {
				break
			}
			y.AssertTrue(next != nil)
			y.Check2(buf.Write(next.Bytes()))
			y.Check(next.Release())
		}
		return flush(buf)
	}

	sendRate := y.NewRateMonitor(20)
	readRate := y.NewRateMonitor(20)
	report := func() {
		sendRate.Capture(sentBytes)
		read := st.scanned.Load()
		readRate.Capture(read)
		active := st.numProducers.Load()
		st.db.opt.Infof("%s [%s] Scan (%d): ~%s/%s at %s/sec. Sent: %s at %s/sec."+
			" jemalloc: %s\n",
			st.LogPrefix, y.FixedDuration(time.Since(started)), active,
			y.IBytesToString(read, 1), humanize.IBytes(rawBytes),
			humanize.IBytes(readRate.Rate()),
			y.IBytesToString(sentBytes, 1), humanize.IBytes(sendRate.Rate()),
			humanize.IBytes(uint64(z.NumAllocBytes())))
	}

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			report()
		case first, open := <-st.kvChan:
			if !open {
				// Producers are finished and everything has been consumed.
				st.db.opt.Infof("%s Sent data of size %s\n", st.LogPrefix, humanize.IBytes(sentBytes))
				return nil
			}
			y.AssertTrue(first != nil)
			if err := coalesce(first); err != nil {
				return err
			}
		}
	}
}
// Orchestrate runs the whole streaming pipeline and blocks until it is done:
// produceRanges feeds key ranges to NumGo produceKVs goroutines, whose buffers
// are consumed by a single streamKVs goroutine that calls st.Send. If
// KeyToList is nil, ToList is used.
//
// The first error reported by any producer or by streamKVs cancels the rest of
// the pipeline and is the error returned; errors that merely result from that
// cancellation never replace it. Orchestrate returns only after the producers
// and the sender have exited, so st.Send is never invoked after it returns.
// It returns nil when everything was streamed successfully.
func (st *Stream) Orchestrate(ctx context.Context) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	st.rangeCh = make(chan keyRange, 3)
	// A small capacity keeps the amount of buffered data bounded when Send is
	// slow.
	st.kvChan = make(chan *z.Buffer, 32)

	if st.KeyToList == nil {
		st.KeyToList = st.ToList
	}

	// fail records the root-cause error exactly once and then stops every
	// other goroutine. The error is stored before cancel is called, so a
	// follow-up context.Canceled from a sibling can never win.
	var (
		errOnce  sync.Once
		firstErr error
	)
	fail := func(err error) {
		errOnce.Do(func() { firstErr = err })
		cancel()
	}

	// Picks ranges and sends them to rangeCh.
	go st.produceRanges(ctx)

	var wg sync.WaitGroup
	for i := 0; i < st.NumGo; i++ {
		wg.Add(1)
		go func(threadId int) {
			defer wg.Done()
			// Takes ranges from rangeCh and pushes KV buffers to kvChan.
			if err := st.produceKVs(ctx, threadId); err != nil {
				fail(err)
			}
		}(i)
	}

	// Takes buffers from kvChan and hands them to Send.
	sendDone := make(chan struct{})
	go func() {
		defer close(sendDone)
		if err := st.streamKVs(ctx); err != nil {
			fail(err)
		}
	}()

	wg.Wait()        // No producer can write to kvChan after this point.
	close(st.kvChan) // Lets streamKVs finish once it has drained the channel.
	<-sendDone

	// If the sender stopped early, release the buffers it never picked up.
	for buf := range st.kvChan {
		_ = buf.Release()
	}
	return firstErr
}
// newStream builds a Stream bound to db with the default settings: NumGo is
// taken from the DB's NumGoroutines option, LogPrefix is "Badger.Stream" and
// MaxSize is maxStreamSize.
func (db *DB) newStream() *Stream {
	st := new(Stream)
	st.db = db
	st.NumGo = db.opt.NumGoroutines
	st.LogPrefix = "Badger.Stream"
	st.MaxSize = maxStreamSize
	return st
}
// NewStream returns a Stream with default settings for a DB that is not in
// managed-transactions mode. It panics when the DB was opened in managed mode.
func (db *DB) NewStream() *Stream {
	if !db.opt.managedTxns {
		return db.newStream()
	}
	panic("This API can not be called in managed mode.")
}
// NewStreamAt returns a Stream with default settings whose producers read at
// the given timestamp. It is only available in managed-transactions mode and
// panics otherwise.
func (db *DB) NewStreamAt(readTs uint64) *Stream {
	if db.opt.managedTxns {
		st := db.newStream()
		st.readTs = readTs
		return st
	}
	panic("This API can only be called in managed mode.")
}
// BufferToKVList decodes every slice stored in buf as a pb.KV and returns them
// as a KVList. If iteration or decoding fails, the error is returned together
// with the list of KVs decoded up to that point.
func BufferToKVList(buf *z.Buffer) (*pb.KVList, error) {
	out := new(pb.KVList)
	decodeOne := func(raw []byte) error {
		kv := &pb.KV{}
		err := proto.Unmarshal(raw, kv)
		if err == nil {
			out.Kv = append(out.Kv, kv)
		}
		return err
	}
	err := buf.SliceIterate(decodeOne)
	return out, err
}
// KVToBuffer serializes kv directly into buf: it allocates a slice of exactly
// proto.Size(kv) bytes inside the buffer and marshals into it. A marshalling
// failure trips an assertion.
func KVToBuffer(kv *pb.KV, buf *z.Buffer) {
	size := proto.Size(kv)
	dst := buf.SliceAllocate(size)
	var opts proto.MarshalOptions
	// Appending to the zero-length view writes into the space just reserved.
	_, err := opts.MarshalAppend(dst[:0], kv)
	y.AssertTrue(err == nil)
}
The pages are generated with Golds v0.8.4. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.