package badger
import (
"bufio"
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
"google.golang.org/protobuf/proto"
"github.com/dgraph-io/badger/v4/pb"
"github.com/dgraph-io/badger/v4/y"
"github.com/dgraph-io/ristretto/v2/z"
)
// flushThreshold is the cumulative size, in bytes (100 << 20), that
// KVLoader.Set compares its running totalSize against; once the total reaches
// this value the buffered entries are sent before the next one is queued.
const flushThreshold = 100 << 20
// Backup creates a new stream over db, labels it "DB.Backup", restricts it to
// since via SinceTs, and delegates to Stream.Backup with the same writer and
// timestamp. The returned values are exactly those of Stream.Backup.
func (db *DB) Backup(w io.Writer, since uint64) (uint64, error) {
	s := db.NewStream()
	s.LogPrefix, s.SinceTs = "DB.Backup", since
	return s.Backup(w, since)
}
// Backup installs KeyToList and Send callbacks on stream, runs
// stream.Orchestrate with a background context, and writes every KVList that
// reaches Send to w through writeTo.
//
// It returns the largest Version observed among the KVs handed to Send, or
// 0 and the error if Orchestrate fails.
func (stream *Stream ) Backup (w io .Writer , since uint64 ) (uint64 , error ) {
// KeyToList collects the versions of key that the iterator yields, in
// iterator order, into a single KVList.
stream .KeyToList = func (key []byte , itr *Iterator ) (*pb .KVList , error ) {
list := &pb .KVList {}
a := itr .Alloc
for ; itr .Valid (); itr .Next () {
item := itr .Item ()
// The iterator has moved on to a different key: this key is complete.
if !bytes .Equal (item .Key (), key ) {
return list , nil
}
// A version older than since is treated as an error, not skipped.
if item .Version () < since {
return nil , fmt .Errorf ("Backup: Item Version: %d less than sinceTs: %d" ,
item .Version (), since )
}
var valCopy []byte
// The value is only fetched for live items; deleted or expired items
// are emitted with a nil Value.
if !item .IsDeletedOrExpired () {
err := item .Value (func (val []byte ) error {
valCopy = a .Copy (val )
return nil
})
if err != nil {
stream .db .opt .Errorf ("Key [%x, %d]. Error while fetching value [%v]\n" ,
item .Key (), item .Version (), err )
return nil , err
}
}
// Clear the bitTxn and bitFinTxn bits from the stored meta byte.
meta := item .meta &^ (bitTxn | bitFinTxn )
kv := y .NewKV (a )
*kv = pb .KV {
Key : a .Copy (item .Key ()),
Value : valCopy ,
UserMeta : a .Copy ([]byte {item .UserMeta ()}),
Version : item .Version (),
ExpiresAt : item .ExpiresAt (),
Meta : a .Copy ([]byte {meta }),
}
list .Kv = append (list .Kv , kv )
switch {
case item .DiscardEarlierVersions ():
// Append a delete marker (Meta = bitDelete) one version below the
// current item, then stop collecting versions of this key.
list .Kv = append (list .Kv , &pb .KV {
Key : item .KeyCopy (nil ),
Version : item .Version () - 1 ,
Meta : []byte {bitDelete },
})
return list , nil
case item .IsDeletedOrExpired ():
// Stop at the first deleted or expired version; it has already been
// appended above.
return list , nil
}
}
return list , nil
}
// maxVersion is updated by the Send callback and returned once Orchestrate
// completes.
var maxVersion uint64
stream .Send = func (buf *z .Buffer ) error {
list , err := BufferToKVList (buf )
if err != nil {
return err
}
// Filter in place, reusing list.Kv's backing array.
out := list .Kv [:0 ]
for _ , kv := range list .Kv {
// maxVersion is updated before the StreamDone check, so entries that
// are dropped below still contribute to it.
if maxVersion < kv .Version {
maxVersion = kv .Version
}
// Entries flagged StreamDone are not written to the backup.
if !kv .StreamDone {
out = append (out , kv )
}
}
list .Kv = out
return writeTo (list , w )
}
if err := stream .Orchestrate (context .Background ()); err != nil {
return 0 , err
}
return maxVersion , nil
}
// writeTo serializes list and writes it to w as one length-prefixed record:
// an 8-byte little-endian uint64 holding the payload length, followed by the
// protobuf-encoded payload. This is the framing DB.Load reads back.
//
// The message is marshalled before anything is written so that a marshal
// failure leaves w untouched instead of emitting a dangling length header,
// and so that the prefix is the exact number of bytes that follow (taken from
// the marshalled buffer rather than a separate proto.Size computation).
//
// It returns the first error from marshalling or from writing to w.
func writeTo(list *pb.KVList, w io.Writer) error {
	buf, err := proto.Marshal(list)
	if err != nil {
		return err
	}
	if err := binary.Write(w, binary.LittleEndian, uint64(len(buf))); err != nil {
		return err
	}
	_, err = w.Write(buf)
	return err
}
// KVLoader buffers entries built from pb.KV records and submits them to the
// DB in batches.
type KVLoader struct {
// db is the database the batches are written to.
db *DB
// throttle is acquired (Do) before each batch is submitted and released
// (Done) from the batch's completion callback.
throttle *y .Throttle
// entries holds the entries queued since the last send.
entries []*Entry
// entriesSize is the sum of the estimated sizes of the queued entries.
entriesSize int64
// totalSize is the sum of estimated size plus value length of the queued
// entries; Set compares it against flushThreshold.
totalSize int64
}
// NewKVLoader returns a KVLoader bound to db. Its throttle is created with
// maxPendingWrites, and its entry buffer starts empty with capacity
// db.opt.maxBatchCount.
func (db *DB) NewKVLoader(maxPendingWrites int) *KVLoader {
	ldr := new(KVLoader)
	ldr.db = db
	ldr.throttle = y.NewThrottle(maxPendingWrites)
	ldr.entries = make([]*Entry, 0, db.opt.maxBatchCount)
	return ldr
}
// Set converts kv into an Entry and queues it on the loader.
//
// Only the first byte of kv.UserMeta and kv.Meta is used; an empty slice
// yields a zero byte. Before queuing, the buffered entries are sent if adding
// this entry would reach the batch count or batch size limit, or if the
// running total size has reached flushThreshold. The error from that send, if
// any, is returned and the entry is not queued.
func (l *KVLoader) Set(kv *pb.KV) error {
	entry := &Entry{
		Key:       y.KeyWithTs(kv.Key, kv.Version),
		Value:     kv.Value,
		ExpiresAt: kv.ExpiresAt,
	}
	if len(kv.UserMeta) != 0 {
		entry.UserMeta = kv.UserMeta[0]
	}
	if len(kv.Meta) != 0 {
		entry.meta = kv.Meta[0]
	}
	size := entry.estimateSizeAndSetThreshold(l.db.valueThreshold())

	// Decide whether the pending batch has to go out before this entry joins it.
	countReached := int64(len(l.entries))+1 >= l.db.opt.maxBatchCount
	sizeReached := l.entriesSize+size >= l.db.opt.maxBatchSize
	totalReached := l.totalSize >= flushThreshold
	if countReached || sizeReached || totalReached {
		if err := l.send(); err != nil {
			return err
		}
	}

	l.entries = append(l.entries, entry)
	l.entriesSize += size
	l.totalSize += size + int64(len(entry.Value))
	return nil
}
// send submits the buffered entries as one asynchronous batch and resets the
// loader's buffer and size counters.
//
// A throttle slot is taken with Do before submitting; the callback passed to
// batchSetAsync releases it with Done, forwarding the write's error. If either
// Do or batchSetAsync returns an error, that error is returned and the buffer
// is left as it was.
func (l *KVLoader) send() error {
	err := l.throttle.Do()
	if err != nil {
		return err
	}

	release := func(writeErr error) {
		l.throttle.Done(writeErr)
	}
	if err = l.db.batchSetAsync(l.entries, release); err != nil {
		return err
	}

	// Start a fresh slice instead of truncating: the previous one was handed
	// to batchSetAsync.
	l.entries = make([]*Entry, 0, l.db.opt.maxBatchCount)
	l.entriesSize, l.totalSize = 0, 0
	return nil
}
// Finish sends any entries still buffered and then returns the result of the
// throttle's Finish. If the final send fails, its error is returned and the
// throttle's Finish is not called.
func (l *KVLoader) Finish() error {
	if len(l.entries) == 0 {
		return l.throttle.Finish()
	}
	if err := l.send(); err != nil {
		return err
	}
	return l.throttle.Finish()
}
// Load reads a backup stream from r and writes its key-value pairs into db
// through a KVLoader created with maxPendingWrites.
//
// The stream is a sequence of records, each an 8-byte little-endian uint64
// length followed by that many bytes of a protobuf-encoded pb.KVList (the
// framing produced by writeTo). Reading stops cleanly when EOF is hit exactly
// at a record boundary.
//
// While loading, db.orc.nextTxnTs is raised to one past the highest version
// seen, and after all writes finish db.orc.txnMark.Done is called with
// nextTxnTs-1.
//
// It returns an error if the stream is truncated or malformed, if a record's
// length prefix exceeds the maximum size of a serialized protobuf message, or
// if any write fails.
func (db *DB) Load(r io.Reader, maxPendingWrites int) error {
	// A serialized protobuf message is always smaller than 2 GiB, so a larger
	// prefix can only come from a corrupt or hostile stream. Rejecting it here
	// also keeps the int conversion below from going negative or truncating,
	// which previously led to a slice-bounds panic.
	const maxListSize = 1<<31 - 1

	br := bufio.NewReaderSize(r, 16<<10)
	unmarshalBuf := make([]byte, 1<<10)

	ldr := db.NewKVLoader(maxPendingWrites)
	for {
		var sz uint64
		err := binary.Read(br, binary.LittleEndian, &sz)
		if err == io.EOF {
			// EOF before any byte of the next length prefix: end of backup.
			break
		}
		if err != nil {
			return err
		}
		if sz > maxListSize {
			return fmt.Errorf("Load: KVList size %d exceeds limit %d", sz, maxListSize)
		}

		// Grow the scratch buffer only when the record does not fit.
		n := int(sz)
		if cap(unmarshalBuf) < n {
			unmarshalBuf = make([]byte, n)
		}
		data := unmarshalBuf[:n]
		if _, err = io.ReadFull(br, data); err != nil {
			return err
		}

		list := &pb.KVList{}
		if err := proto.Unmarshal(data, list); err != nil {
			return err
		}

		for _, kv := range list.Kv {
			if err := ldr.Set(kv); err != nil {
				return err
			}
			// Keep nextTxnTs strictly above every loaded version.
			if kv.Version >= db.orc.nextTxnTs {
				db.orc.nextTxnTs = kv.Version + 1
			}
		}
	}

	if err := ldr.Finish(); err != nil {
		return err
	}
	db.orc.txnMark.Done(db.orc.nextTxnTs - 1)
	return nil
}
// The pages are generated with Golds v0.8.4. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PR and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.