package badger
import (
"encoding/hex"
"fmt"
"sync"
"github.com/dustin/go-humanize"
"google.golang.org/protobuf/proto"
"github.com/dgraph-io/badger/v4/pb"
"github.com/dgraph-io/badger/v4/table"
"github.com/dgraph-io/badger/v4/y"
"github.com/dgraph-io/ristretto/v2/z"
)
// StreamWriter writes sorted key-value streams straight into the LSM tree:
// values go to the value log via vlog.write and keys are packed into tables
// that are added directly to a level, bypassing the normal write path.
// Create one with DB.NewStreamWriter, call Prepare or PrepareIncremental,
// feed batches to Write, and finish with Flush (or abort with Cancel).
type StreamWriter struct {
// writeLock serializes Prepare*, Flush, Cancel and the shared-state parts of Write.
writeLock sync .Mutex
// db is the database being written into.
db *DB
// done is installed by Prepare/PrepareIncremental and invoked by Flush/Cancel;
// it is wrapped in a sync.Once so it runs at most once.
done func ()
// throttle gates the background table builds started by sortedWriter.send.
throttle *y .Throttle
// maxVersion is the highest kv.Version seen by Write; in non-managed mode
// Flush seeds the new oracle with it.
maxVersion uint64
// writers maps a stream ID to its sortedWriter; a nil value marks a stream
// that has already been closed.
writers map [uint32 ]*sortedWriter
// prevLevel is used by newWriter: new sortedWriters add their tables at
// level index prevLevel-1.
prevLevel int
}
// NewStreamWriter returns a StreamWriter bound to db with no per-stream
// writers yet. Its throttle is created with a limit of 16 and gates the
// background table builds launched by sortedWriter.send. Prepare or
// PrepareIncremental installs the done callback that Flush and Cancel run.
func (db *DB) NewStreamWriter() *StreamWriter {
	sw := &StreamWriter{db: db}
	sw.throttle = y.NewThrottle(16)
	sw.writers = make(map[uint32]*sortedWriter)
	return sw
}
// Prepare readies the DB for a fresh stream write by calling db.dropAll.
// The callback returned by dropAll becomes sw.done, wrapped in a sync.Once so
// that it runs at most once whether Flush or Cancel triggers it. The callback
// is installed even when dropAll returns an error, so Cancel can still run it.
func (sw *StreamWriter) Prepare() error {
	sw.writeLock.Lock()
	defer sw.writeLock.Unlock()

	cleanup, dropErr := sw.db.dropAll()
	var runOnce sync.Once
	sw.done = func() {
		runOnce.Do(cleanup)
	}
	return dropErr
}
// PrepareIncremental readies the DB for a stream write on top of existing data.
//
// It calls db.prepareToDrop and then stops compactions. The done callback,
// which runs at most once, restarts compactions and then runs the callback
// returned by prepareToDrop. If prepareToDrop itself fails, done only runs
// that callback and the error is returned.
//
// It returns an error if any memtable still holds data. Otherwise it stores
// the lowest-numbered level that has tables in sw.prevLevel, and new
// sortedWriters target the level index just before it. If that level is
// level 0, the tree is flattened with Flatten(3) and prevLevel becomes the
// last level index. An empty DB leaves prevLevel untouched.
func (sw *StreamWriter) PrepareIncremental() error {
	sw.writeLock.Lock()
	defer sw.writeLock.Unlock()

	var once sync.Once

	release, err := sw.db.prepareToDrop()
	if err != nil {
		sw.done = func() { once.Do(release) }
		return err
	}

	sw.db.stopCompactions()
	finish := func() {
		sw.db.startCompactions()
		release()
	}
	sw.done = func() { once.Do(finish) }

	memTables, decr := sw.db.getMemTables()
	defer decr()
	for _, mt := range memTables {
		if !mt.sl.Empty() {
			return fmt.Errorf("Unable to do incremental writes because MemTable has data")
		}
	}

	found := false
	for _, lvl := range sw.db.Levels() {
		if lvl.NumTables == 0 {
			continue
		}
		sw.prevLevel = lvl.Level
		found = true
		break
	}
	// Empty DB, or the topmost populated level is below level 0: nothing more to do.
	if !found || sw.prevLevel != 0 {
		return nil
	}

	// Level 0 already has tables: flatten, then target the last level index.
	if err := sw.db.Flatten(3); err != nil {
		return fmt.Errorf("error during flatten in StreamWriter: %w", err)
	}
	sw.prevLevel = len(sw.db.Levels()) - 1
	return nil
}
// Write decodes a batch of marshalled pb.KV records from buf and streams them
// into the per-stream sorted writers.
//
// Records are grouped by StreamId into one request per stream. All requests
// are first passed to the value log (vlog.write); presumably that fills in
// req.Ptrs, which sortedWriter.handleRequests reads for values that are not
// stored inline. Each request is then sent to its stream's sortedWriter,
// which is created on first use.
//
// A record with StreamDone set closes its stream after the whole batch has
// been dispatched. The stream's writer is drained, its last table is handed
// off for building, and the stream is marked closed (nil entry in
// sw.writers). Closing an unknown or already-closed stream only logs a
// warning.
//
// Write panics if a record targets a stream that was already closed, either
// earlier in the same batch or in a previous Write.
func (sw *StreamWriter) Write(buf *z.Buffer) error {
	if buf.LenNoPadding() == 0 {
		return nil
	}

	// Streams whose StreamDone marker appears in this batch.
	closedStreams := make(map[uint32]struct{})
	// One request per stream, accumulating that stream's entries in order.
	streamReqs := make(map[uint32]*request)

	err := buf.SliceIterate(func(s []byte) error {
		var kv pb.KV
		if err := proto.Unmarshal(s, &kv); err != nil {
			return err
		}
		if kv.StreamDone {
			closedStreams[kv.StreamId] = struct{}{}
			return nil
		}
		// Sanity check: a stream must not receive data after its done marker.
		if _, ok := closedStreams[kv.StreamId]; ok {
			panic(fmt.Sprintf("write performed on closed stream: %d", kv.StreamId))
		}

		sw.writeLock.Lock()
		if sw.maxVersion < kv.Version {
			sw.maxVersion = kv.Version
		}
		if sw.prevLevel == 0 {
			// No level chosen yet (e.g. Prepare, or an empty DB). newWriter
			// uses prevLevel-1, so this makes writers target the last level.
			sw.prevLevel = len(sw.db.lc.levels)
		}
		sw.writeLock.Unlock()

		var meta, userMeta byte
		if len(kv.Meta) > 0 {
			meta = kv.Meta[0]
		}
		if len(kv.UserMeta) > 0 {
			userMeta = kv.UserMeta[0]
		}
		e := &Entry{
			Key:       y.KeyWithTs(kv.Key, kv.Version),
			Value:     y.Copy(kv.Value), // buf's memory is not retained past this call
			UserMeta:  userMeta,
			ExpiresAt: kv.ExpiresAt,
			meta:      meta,
		}

		req := streamReqs[kv.StreamId]
		if req == nil {
			req = &request{}
			streamReqs[kv.StreamId] = req
		}
		req.Entries = append(req.Entries, e)
		return nil
	})
	if err != nil {
		return err
	}

	all := make([]*request, 0, len(streamReqs))
	for _, req := range streamReqs {
		all = append(all, req)
	}

	sw.writeLock.Lock()
	defer sw.writeLock.Unlock()

	if err := sw.db.vlog.write(all); err != nil {
		return err
	}

	for streamID, req := range streamReqs {
		writer, ok := sw.writers[streamID]
		if !ok {
			var err error
			writer, err = sw.newWriter(streamID)
			if err != nil {
				return y.Wrapf(err, "failed to create writer with ID %d", streamID)
			}
			sw.writers[streamID] = writer
		}
		if writer == nil {
			// A nil entry means the stream was closed by an earlier Write.
			panic(fmt.Sprintf("write performed on closed stream: %d", streamID))
		}
		writer.reqCh <- req
	}

	for streamID := range closedStreams {
		writer, ok := sw.writers[streamID]
		if !ok {
			sw.db.opt.Warningf("Trying to close stream: %d, but no sorted "+
				"writer found for it", streamID)
			continue
		}
		if writer == nil {
			// Already closed by an earlier Write. Without this check the
			// nil writer would be dereferenced below.
			sw.db.opt.Warningf("Trying to close stream: %d, but it is already closed", streamID)
			continue
		}
		writer.closer.SignalAndWait()
		if err := writer.Done(); err != nil {
			return err
		}
		sw.writers[streamID] = nil
	}
	return nil
}
// Flush finalizes the stream write. It performs these steps in order:
//
//   - stops and drains every open sortedWriter and hands off its last table;
//   - in non-managed mode, replaces the oracle with one seeded from the
//     highest version written, never lower than the old oracle's read
//     timestamp;
//   - calls throttle.Finish and returns its error;
//   - sorts each level's tables;
//   - syncs the value and data directories;
//   - validates the level structure.
//
// The done callback from Prepare/PrepareIncremental, if any, runs when Flush
// returns, whether it succeeded or not.
func (sw *StreamWriter) Flush() error {
	sw.writeLock.Lock()
	defer sw.writeLock.Unlock()
	defer func() {
		// done is only set by Prepare/PrepareIncremental. Guard it like
		// Cancel does so Flush without a prepare step cannot invoke a nil func.
		if sw.done != nil {
			sw.done()
		}
	}()

	for _, writer := range sw.writers {
		if writer != nil {
			writer.closer.SignalAndWait()
		}
	}
	for _, writer := range sw.writers {
		if writer == nil {
			continue
		}
		if err := writer.Done(); err != nil {
			return err
		}
	}

	if !sw.db.opt.managedTxns {
		if sw.db.orc != nil {
			sw.db.orc.Stop()
			// Never let the new oracle start below what the old one handed out.
			// readTs must stay inside the nil check; the old code dereferenced
			// a nil oracle here.
			if curMax := sw.db.orc.readTs(); curMax >= sw.maxVersion {
				sw.maxVersion = curMax
			}
		}

		sw.db.orc = newOracle(sw.db.opt)
		sw.db.orc.nextTxnTs = sw.maxVersion
		sw.db.orc.txnMark.Done(sw.maxVersion)
		sw.db.orc.readMark.Done(sw.maxVersion)
		sw.db.orc.incrementNextTs()
	}

	if err := sw.throttle.Finish(); err != nil {
		return err
	}

	// createTable runs in concurrent goroutines, so tables may have been
	// added to a level in any order.
	for _, l := range sw.db.lc.levels {
		l.sortTables()
	}

	if sw.db.opt.ValueDir != sw.db.opt.Dir {
		if err := sw.db.syncDir(sw.db.opt.ValueDir); err != nil {
			return err
		}
	}
	if err := sw.db.syncDir(sw.db.opt.Dir); err != nil {
		return err
	}
	return sw.db.lc.validate()
}
// Cancel aborts the stream write. It signals every open sortedWriter to stop
// and only then waits for each one, so they wind down in parallel. It then
// calls throttle.Finish, logging (not returning) any error. Finally it runs
// the done callback if Prepare/PrepareIncremental installed one.
func (sw *StreamWriter) Cancel() {
	sw.writeLock.Lock()
	defer sw.writeLock.Unlock()

	open := make([]*sortedWriter, 0, len(sw.writers))
	for _, w := range sw.writers {
		if w != nil {
			open = append(open, w)
		}
	}
	for _, w := range open {
		w.closer.Signal()
	}
	for _, w := range open {
		w.closer.Wait()
	}

	if err := sw.throttle.Finish(); err != nil {
		sw.db.opt.Errorf("error in throttle.Finish: %+v", err)
	}
	if done := sw.done; done != nil {
		done()
	}
}
// sortedWriter turns the requests of a single stream into tables. Entries
// arrive on reqCh, are added to builder in key order by handleRequests, and
// full builders are handed to background goroutines (see send) that create
// the tables at level index `level`.
type sortedWriter struct {
// db is the database the tables are written into.
db *DB
// throttle is shared with the owning StreamWriter; send calls Do before
// starting a table build and the build goroutine reports via Done.
throttle *y .Throttle
// opts are the table options used for every new builder.
opts table .Options
// builder is the table currently being filled; nil after Done or send(true).
builder *table .Builder
// lastKey is a copy of the last key passed to Add, used for the sort-order
// check and the SameKey boundary check.
lastKey []byte
// level is the level index that created tables are added to.
level int
// streamID identifies the stream this writer serves (used in log output).
streamID uint32
// reqCh carries requests from StreamWriter.Write; handleRequests closes it
// on shutdown.
reqCh chan *request
// closer signals handleRequests to stop; Done is called when it exits.
closer *z .Closer
}
// newWriter creates the sortedWriter for streamID and starts its
// handleRequests goroutine.
//
// The base table size from buildTableOptions is multiplied by
// TableSizeMultiplier once for each level from 2 to MaxLevels-1. Tables are
// added at level index sw.prevLevel-1. The returned error is always nil.
func (sw *StreamWriter) newWriter(streamID uint32) (*sortedWriter, error) {
	opts := buildTableOptions(sw.db)
	mult := uint64(sw.db.opt.TableSizeMultiplier)
	for lvl := 2; lvl < sw.db.opt.MaxLevels; lvl++ {
		opts.TableSize *= mult
	}

	w := &sortedWriter{
		db:       sw.db,
		throttle: sw.throttle,
		opts:     opts,
		builder:  table.NewTableBuilder(opts),
		level:    sw.prevLevel - 1,
		streamID: streamID,
		reqCh:    make(chan *request, 3),
		closer:   z.NewCloser(1),
	}
	go w.handleRequests()
	return w, nil
}
// handleRequests is the writer's background loop. For each request received
// on reqCh it converts every entry into a y.ValueStruct and passes it to Add.
//
// Entries for which skipVlogAndSetThreshold returns true keep their value
// inline. For the others, the encoded value pointer from req.Ptrs is stored
// and bitValuePointer is set in the meta byte.
//
// Once the closer is signalled, reqCh is closed and any requests still
// buffered in it are processed before the goroutine exits. An error from Add
// panics, since there is no caller to return it to.
func (w *sortedWriter) handleRequests() {
	defer w.closer.Done()

	addAll := func(req *request) {
		for idx, entry := range req.Entries {
			inline := entry.skipVlogAndSetThreshold(w.db.valueThreshold())
			vs := y.ValueStruct{
				UserMeta:  entry.UserMeta,
				ExpiresAt: entry.ExpiresAt,
			}
			if inline {
				vs.Value = entry.Value
				vs.Meta = entry.meta
			} else {
				ptr := req.Ptrs[idx]
				vs.Value = ptr.Encode()
				vs.Meta = entry.meta | bitValuePointer
			}
			if err := w.Add(entry.Key, vs); err != nil {
				panic(err)
			}
		}
	}

	for {
		select {
		case req := <-w.reqCh:
			addAll(req)
		case <-w.closer.HasBeenClosed():
			// StreamWriter sends on reqCh only while holding writeLock. Every
			// path that signals this closer keeps holding that lock until the
			// goroutine exits, so closing reqCh here cannot race with a send.
			close(w.reqCh)
			for req := range w.reqCh {
				addAll(req)
			}
			return
		}
	}
}
// Add appends key and vs to the current table builder.
//
// Keys must be strictly increasing according to y.CompareKeys. Otherwise an
// error containing a hex dump of both keys is returned.
//
// If the builder has reached capacity and key is not y.SameKey as the
// previous key, the current table is handed off via send first. Presumably
// this keeps versions of one user key in the same table. For value-pointer
// entries, the decoded pointer's length is passed to the builder.
func (w *sortedWriter) Add(key []byte, vs y.ValueStruct) error {
	if len(w.lastKey) > 0 {
		if y.CompareKeys(key, w.lastKey) <= 0 {
			return fmt.Errorf("keys not in sorted order (last key: %s, key: %s)",
				hex.Dump(w.lastKey), hex.Dump(key))
		}
	}

	if !y.SameKey(key, w.lastKey) && w.builder.ReachedCapacity() {
		if err := w.send(false); err != nil {
			return err
		}
	}
	w.lastKey = y.SafeCopy(w.lastKey, key)

	var ptr valuePointer
	if vs.Meta&bitValuePointer != 0 {
		ptr.Decode(vs.Value)
	}
	w.builder.Add(key, vs, ptr.Len)
	return nil
}
// send hands the current builder to a background goroutine. That goroutine
// builds the table with createTable and reports the result to the throttle.
//
// throttle.Do is called first; if it fails, its error is returned and nothing
// is launched. With done set, the writer keeps no builder afterwards.
// Otherwise a fresh builder with the same options takes its place.
func (w *sortedWriter) send(done bool) error {
	if err := w.throttle.Do(); err != nil {
		return err
	}

	pending := w.builder
	go func() {
		w.throttle.Done(w.createTable(pending))
	}()

	var next *table.Builder
	if !done {
		next = table.NewTableBuilder(w.opts)
	}
	w.builder = next
	return nil
}
// Done finishes the writer's last table. A non-empty builder is handed to
// send(true) to be built in the background. An empty one is closed and
// dropped immediately.
func (w *sortedWriter) Done() error {
	if !w.builder.Empty() {
		return w.send(true)
	}
	w.builder.Close()
	w.builder = nil
	return nil
}
// createTable turns builder into a table, records it in the manifest at
// w.level, and adds it to that level. With opt.InMemory the table is opened
// from the finished bytes; otherwise a new file is created in opt.Dir.
//
// The builder is always closed, and an empty builder produces no table. If
// the manifest update fails, the reference obtained when the table was
// created is released, so the table is not leaked.
func (w *sortedWriter) createTable(builder *table.Builder) error {
	defer builder.Close()
	if builder.Empty() {
		builder.Finish()
		return nil
	}

	fileID := w.db.lc.reserveFileID()
	var (
		tbl *table.Table
		err error
	)
	if w.db.opt.InMemory {
		data := builder.Finish()
		tbl, err = table.OpenInMemoryTable(data, fileID, builder.Opts())
	} else {
		fname := table.NewFilename(fileID, w.db.opt.Dir)
		tbl, err = table.CreateTable(fname, builder)
	}
	if err != nil {
		return err
	}

	lc := w.db.lc
	lhandler := lc.levels[w.level]
	change := &pb.ManifestChange{
		Id:          tbl.ID(),
		KeyId:       tbl.KeyID(),
		Op:          pb.ManifestChange_CREATE,
		Level:       uint32(lhandler.level),
		Compression: uint32(tbl.CompressionType()),
	}
	if err := w.db.manifest.addChanges([]*pb.ManifestChange{change}, w.db.opt); err != nil {
		// The table never reached a level, so drop the creation reference the
		// success path releases below. Otherwise the table leaks.
		if derr := tbl.DecrRef(); derr != nil {
			w.db.opt.Warningf("failed to release table %d after manifest error: %v", fileID, derr)
		}
		return err
	}

	lhandler.addTable(tbl)
	// addTable holds its own reference; drop the one from creation. The error
	// is ignored because the level still references the table.
	_ = tbl.DecrRef()
	w.db.opt.Infof("Table created: %d at level: %d for stream: %d. Size: %s\n",
		fileID, lhandler.level, w.streamID, humanize.IBytes(uint64(tbl.Size())))
	return nil
}
These pages were generated with Golds v0.8.4 (GOOS=linux GOARCH=amd64).
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable via the QR code on the left) to get the latest news about Golds.