package badger
import (
stderrors "errors"
"sync"
"time"
"github.com/dgraph-io/badger/v4/y"
"github.com/dgraph-io/ristretto/v2/z"
)
// MergeOperator accumulates values written to a single key and folds them
// together with a user-supplied MergeFunc. One operator manages one key.
type MergeOperator struct {
// The embedded lock serializes compaction against reads: compact takes the
// write lock, Get takes the read lock. Add does not take it.
sync .RWMutex
// f is the merge function applied when folding versions together.
f MergeFunc
// db is the database the operator reads from and writes to.
db *DB
// key is the key whose versions are merged.
key []byte
// closer coordinates shutdown of the runCompactions goroutine (see Stop).
closer *z .Closer
}
// MergeFunc combines two values into one and returns the result.
//
// iterateAndMerge invokes it as f(oldVal, newVal): existingVal receives the
// value read from the version currently being visited, and newVal receives
// the result accumulated so far, which starts as a copy of the first version
// the iterator yields (treated there as the latest). The operand order is
// therefore significant for non-commutative merges.
type MergeFunc func (existingVal, newVal []byte ) []byte
// GetMergeOperator builds a MergeOperator bound to key on this DB and returns
// a pointer to it.
//
// f is the function used to fold versions of the key together, and dur is the
// interval handed to the background goroutine started here, which runs
// compactions until Stop is called. The key slice is stored as given, not
// copied.
func (db *DB) GetMergeOperator(key []byte,
	f MergeFunc, dur time.Duration) *MergeOperator {
	mergeOp := new(MergeOperator)
	mergeOp.f = f
	mergeOp.db = db
	mergeOp.key = key
	// Initial count of 1 pairs with the single Done call made by
	// runCompactions when it exits.
	mergeOp.closer = z.NewCloser(1)

	go mergeOp.runCompactions(dur)
	return mergeOp
}
// errNoMerge is returned by iterateAndMerge when exactly one version of the
// key was visited, so there was nothing to merge. It is internal: compact
// treats it as "nothing to do" and Get converts it to a nil error.
var errNoMerge = stderrors .New ("No need for merge" )
// iterateAndMerge walks the stored versions of op.key in a fresh read-only
// transaction and folds them into a single value with op.f.
//
// The first item the iterator yields is copied and used as the starting
// value, and its version is reported as latest. Every further item is merged
// in by calling op.f(itemValue, accumulated). The walk stops at the first
// deleted or expired item (which is not counted) or right after an item that
// has DiscardEarlierVersions set (which is counted).
//
// Returns:
//   - (nil, 0, err) if reading a value fails;
//   - (nil, 0, ErrKeyNotFound) if no version was visited;
//   - (value, latest, errNoMerge) if exactly one version was visited;
//   - (merged, latest, nil) otherwise.
func (op *MergeOperator) iterateAndMerge() (newVal []byte, latest uint64, err error) {
	txn := op.db.NewTransaction(false)
	defer txn.Discard()

	iterOpts := DefaultIteratorOptions
	iterOpts.AllVersions = true
	it := txn.NewKeyIterator(op.key, iterOpts)
	defer it.Close()

	// mergeInto folds one older value into the running result.
	mergeInto := func(oldVal []byte) error {
		newVal = op.f(oldVal, newVal)
		return nil
	}

	seen := 0
	for it.Rewind(); it.Valid(); it.Next() {
		item := it.Item()
		if item.IsDeletedOrExpired() {
			break
		}

		seen++
		if seen == 1 {
			// The first visited version seeds the result and defines latest.
			if newVal, err = item.ValueCopy(newVal); err != nil {
				return nil, 0, err
			}
			latest = item.Version()
		} else if mergeErr := item.Value(mergeInto); mergeErr != nil {
			return nil, 0, mergeErr
		}

		// Nothing beyond this item should be taken into account.
		if item.DiscardEarlierVersions() {
			break
		}
	}

	switch seen {
	case 0:
		return nil, latest, ErrKeyNotFound
	case 1:
		return newVal, latest, errNoMerge
	default:
		return newVal, latest, nil
	}
}
// compact merges all visited versions of op.key and writes the result back as
// a single entry. It holds the operator's write lock for the whole call.
//
// A missing key (ErrKeyNotFound) or a key with only one version (errNoMerge)
// is not an error: there is nothing to compact and nil is returned. Any other
// error from iterateAndMerge is returned unchanged.
//
// The merged value is written at the same version as the latest visited item
// and carries only bitDiscardEarlierVersions in its meta. A failure reported
// to the write callback is logged rather than returned.
func (op *MergeOperator) compact() error {
	op.Lock()
	defer op.Unlock()

	merged, ts, err := op.iterateAndMerge()
	switch {
	case err == ErrKeyNotFound, err == errNoMerge:
		return nil
	case err != nil:
		return err
	}

	entry := &Entry{
		Key:   y.KeyWithTs(op.key, ts),
		Value: merged,
		meta:  bitDiscardEarlierVersions,
	}
	onWritten := func(err error) {
		if err != nil {
			op.db.opt.Errorf("failed to insert the result of merge compaction: %s", err)
		}
	}
	return op.db.batchSetAsync([]*Entry{entry}, onWritten)
}
// runCompactions is the background loop started by GetMergeOperator. It runs
// compact once per tick of a ticker with period dur, and once more after the
// closer has been signalled, then stops the ticker and exits. Compaction
// errors are logged and do not end the loop. closer.Done is called on return.
func (op *MergeOperator) runCompactions(dur time.Duration) {
	ticker := time.NewTicker(dur)
	defer op.closer.Done()

	for closing := false; !closing; {
		select {
		case <-op.closer.HasBeenClosed():
			// Still fall through to one last compaction before leaving.
			closing = true
		case <-ticker.C:
		}

		if err := op.compact(); err != nil {
			op.db.opt.Errorf("failure while running merge operation: %s", err)
		}
	}
	ticker.Stop()
}
// Add records val as a new entry for the operator's key, marked with the
// merge bit, inside a single update transaction. It returns whatever error
// that transaction reports. The operator's lock is not taken here.
func (op *MergeOperator) Add(val []byte) error {
	setMergeEntry := func(txn *Txn) error {
		entry := NewEntry(op.key, val).withMergeBit()
		return txn.SetEntry(entry)
	}
	return op.db.Update(setMergeEntry)
}
// Get returns the current merged value for the operator's key, computed on
// the fly by iterateAndMerge while holding the operator's read lock.
//
// When the key has a single version (errNoMerge) that value is returned with
// a nil error. Any other error, ErrKeyNotFound included, is returned as
// reported by View alongside whatever value iterateAndMerge produced.
func (op *MergeOperator) Get() ([]byte, error) {
	op.RLock()
	defer op.RUnlock()

	var merged []byte
	// The transaction supplied by View is not used; iterateAndMerge opens
	// its own read-only transaction.
	viewErr := op.db.View(func(_ *Txn) error {
		var err error
		merged, _, err = op.iterateAndMerge()
		return err
	})
	if viewErr != errNoMerge {
		return merged, viewErr
	}
	return merged, nil
}
// Stop shuts down the background compaction goroutine by signalling the
// operator's closer. runCompactions reacts to that signal by running one
// final compaction and then calling Done.
// NOTE(review): presumably SignalAndWait blocks until that Done call —
// confirm against the z.Closer documentation.
func (op *MergeOperator ) Stop () {
op .closer .SignalAndWait ()
}
// The pages are generated with Golds v0.8.4. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PR and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.