package badger
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"google.golang.org/protobuf/proto"
"github.com/dgraph-io/badger/v4/pb"
"github.com/dgraph-io/badger/v4/y"
"github.com/dgraph-io/ristretto/v2/z"
)
type WriteBatch struct {
sync .Mutex
txn *Txn
db *DB
throttle *y .Throttle
err atomic .Value
isManaged bool
commitTs uint64
finished bool
}
func (db *DB ) NewWriteBatch () *WriteBatch {
if db .opt .managedTxns {
panic ("cannot use NewWriteBatch in managed mode. Use NewWriteBatchAt instead" )
}
return db .newWriteBatch (false )
}
func (db *DB ) newWriteBatch (isManaged bool ) *WriteBatch {
return &WriteBatch {
db : db ,
isManaged : isManaged ,
txn : db .newTransaction (true , isManaged ),
throttle : y .NewThrottle (16 ),
}
}
func (wb *WriteBatch ) SetMaxPendingTxns (max int ) {
wb .throttle = y .NewThrottle (max )
}
func (wb *WriteBatch ) Cancel () {
wb .Lock ()
defer wb .Unlock ()
wb .finished = true
if err := wb .throttle .Finish (); err != nil {
wb .db .opt .Errorf ("WatchBatch.Cancel error while finishing: %v" , err )
}
wb .txn .Discard ()
}
func (wb *WriteBatch ) callback (err error ) {
defer wb .throttle .Done (err )
if err == nil {
return
}
if err := wb .Error (); err != nil {
return
}
wb .err .Store (err )
}
func (wb *WriteBatch ) writeKV (kv *pb .KV ) error {
e := Entry {Key : kv .Key , Value : kv .Value }
if len (kv .UserMeta ) > 0 {
e .UserMeta = kv .UserMeta [0 ]
}
y .AssertTrue (kv .Version != 0 )
e .version = kv .Version
return wb .handleEntry (&e )
}
func (wb *WriteBatch ) Write (buf *z .Buffer ) error {
wb .Lock ()
defer wb .Unlock ()
err := buf .SliceIterate (func (s []byte ) error {
kv := &pb .KV {}
if err := proto .Unmarshal (s , kv ); err != nil {
return err
}
return wb .writeKV (kv )
})
return err
}
func (wb *WriteBatch ) WriteList (kvList *pb .KVList ) error {
wb .Lock ()
defer wb .Unlock ()
for _ , kv := range kvList .Kv {
if err := wb .writeKV (kv ); err != nil {
return err
}
}
return nil
}
func (wb *WriteBatch ) SetEntryAt (e *Entry , ts uint64 ) error {
if !wb .db .opt .managedTxns {
return errors .New ("SetEntryAt can only be used in managed mode. Use SetEntry instead" )
}
e .version = ts
return wb .SetEntry (e )
}
func (wb *WriteBatch ) handleEntry (e *Entry ) error {
if err := wb .txn .SetEntry (e ); err != ErrTxnTooBig {
return err
}
if cerr := wb .commit (); cerr != nil {
return cerr
}
if err := wb .txn .SetEntry (e ); err != nil {
wb .err .Store (err )
return err
}
return nil
}
func (wb *WriteBatch ) SetEntry (e *Entry ) error {
wb .Lock ()
defer wb .Unlock ()
return wb .handleEntry (e )
}
func (wb *WriteBatch ) Set (k , v []byte ) error {
e := &Entry {Key : k , Value : v }
return wb .SetEntry (e )
}
func (wb *WriteBatch ) DeleteAt (k []byte , ts uint64 ) error {
e := Entry {Key : k , meta : bitDelete , version : ts }
return wb .SetEntry (&e )
}
func (wb *WriteBatch ) Delete (k []byte ) error {
wb .Lock ()
defer wb .Unlock ()
if err := wb .txn .Delete (k ); err != ErrTxnTooBig {
return err
}
if err := wb .commit (); err != nil {
return err
}
if err := wb .txn .Delete (k ); err != nil {
wb .err .Store (err )
return err
}
return nil
}
func (wb *WriteBatch ) commit () error {
if err := wb .Error (); err != nil {
return err
}
if wb .finished {
return y .ErrCommitAfterFinish
}
if err := wb .throttle .Do (); err != nil {
wb .err .Store (err )
return err
}
wb .txn .CommitWith (wb .callback )
wb .txn = wb .db .newTransaction (true , wb .isManaged )
wb .txn .commitTs = wb .commitTs
return wb .Error ()
}
func (wb *WriteBatch ) Flush () error {
wb .Lock ()
err := wb .commit ()
if err != nil {
wb .Unlock ()
return err
}
wb .finished = true
wb .txn .Discard ()
wb .Unlock ()
if err := wb .throttle .Finish (); err != nil {
if wb .Error () != nil {
return fmt .Errorf ("wb.err: %w err: %w" , wb .Error (), err )
}
return err
}
return wb .Error ()
}
func (wb *WriteBatch ) Error () error {
err , _ := wb .err .Load ().(error )
return err
}
The pages are generated with Golds v0.8.4 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .