package badger
import (
"bytes"
"context"
"encoding/binary"
"errors"
"expvar"
"fmt"
"math"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
humanize "github.com/dustin/go-humanize"
"github.com/dgraph-io/badger/v4/fb"
"github.com/dgraph-io/badger/v4/options"
"github.com/dgraph-io/badger/v4/pb"
"github.com/dgraph-io/badger/v4/skl"
"github.com/dgraph-io/badger/v4/table"
"github.com/dgraph-io/badger/v4/y"
"github.com/dgraph-io/ristretto/v2"
"github.com/dgraph-io/ristretto/v2/z"
)
// Reserved internal keys. Each one starts with the "!badger!" prefix that is
// also stored on its own in badgerPrefix.
var (
	// badgerPrefix is the common prefix shared by the internal keys below.
	badgerPrefix = []byte("!badger!")
	// txnKey is used elsewhere in the package (not visible here); presumably
	// it marks transaction boundaries — confirm.
	txnKey = []byte("!badger!txn")
	// bannedNsKey prefixes the persisted records of banned namespaces. They
	// are written by BanNamespace and loaded by initBannedNamespaces.
	bannedNsKey = []byte("!badger!banned")
)
// closers holds one z.Closer per background goroutine started by Open, so
// each goroutine can be signalled and awaited independently. A nil closer
// means the corresponding goroutine was never started.
type closers struct {
	updateSize  *z.Closer // DB.updateSize: periodic on-disk size metrics
	compactors  *z.Closer // compaction goroutines via lc.startCompact; not started in ReadOnly mode
	memtable    *z.Closer // DB.flushMemtable; not started in ReadOnly mode
	writes      *z.Closer // DB.doWrites
	valueGC     *z.Closer // vlog.waitOnGC; not started in InMemory mode
	pub         *z.Closer // pub.listenForUpdates
	cacheHealth *z.Closer // DB.monitorCache
}
// lockedKeys is a set of uint64 keys that is safe for concurrent use.
// Mutations take the write lock and lookups share the read lock.
type lockedKeys struct {
	sync.RWMutex
	keys map[uint64]struct{}
}

// add inserts key into the set. Adding a key that is already present is a
// no-op.
func (lk *lockedKeys) add(key uint64) {
	lk.Lock()
	defer lk.Unlock()
	var present struct{}
	lk.keys[key] = present
}

// has reports whether key is in the set.
func (lk *lockedKeys) has(key uint64) (found bool) {
	lk.RLock()
	defer lk.RUnlock()
	_, found = lk.keys[key]
	return found
}

// all returns a snapshot of the keys in the set, in no particular order.
// The result is a fresh, non-nil slice even when the set is empty.
func (lk *lockedKeys) all() []uint64 {
	lk.RLock()
	defer lk.RUnlock()
	snapshot := make([]uint64, len(lk.keys))
	idx := 0
	for k := range lk.keys {
		snapshot[idx] = k
		idx++
	}
	return snapshot
}
// DB is a handle to a Badger key-value store. Create one with Open and
// release it with Close.
type DB struct {
	// Embedded test hooks defined outside this view. Open copies
	// opt.syncChan into db.syncChan through this embedding.
	testOnlyDBExtensions
	// lock guards mt and imm.
	lock sync.RWMutex
	// Directory locks acquired by Open. They are nil in InMemory mode or with
	// BypassLockGuard, and close releases them.
	dirLockGuard  *directoryLockGuard
	valueDirGuard *directoryLockGuard
	// closers stop the background goroutines started by Open.
	closers closers
	// mt is the mutable memtable that receives writes. Open does not create
	// it in ReadOnly mode, and close sets it to nil after handing it to the
	// flusher.
	mt *memTable
	// imm holds immutable memtables awaiting flush, oldest first.
	imm        []*memTable
	nextMemFid int
	opt        Options
	manifest   *manifestFile
	lc         *levelsController
	vlog       valueLog
	// writeCh carries requests from sendToWriteCh to doWrites.
	writeCh chan *request
	// flushChan carries full memtables to flushMemtable.
	flushChan chan *memTable
	// closeOnce makes Close run close at most once.
	closeOnce sync.Once
	// blockWrites is 1 while new writes are rejected with ErrBlockedWrites.
	blockWrites atomic.Int32
	// isClosed is 1 once close has started.
	isClosed atomic.Uint32
	orc      *oracle
	// bannedNamespaces is the in-memory set consulted by isBanned.
	bannedNamespaces *lockedKeys
	threshold        *vlogThreshold
	// pub delivers written requests to subscribers (see writeRequests).
	pub      *publisher
	registry *KeyRegistry
	// blockCache and indexCache stay nil unless their configured sizes are
	// positive.
	blockCache *ristretto.Cache[[]byte, *table.Block]
	indexCache *ristretto.Cache[uint64, *fb.TableIndex]
	allocPool  *z.AllocatorPool
}
const (
	// kvWriteChCapacity is the buffer size of DB.writeCh. doWrites also forces
	// a batch out once it has accumulated 3*kvWriteChCapacity requests.
	kvWriteChCapacity = 1000
)
// checkAndSetOptions validates opt and fills in its derived, unexported
// batch limits (maxBatchSize, maxBatchCount, maxValueThreshold). It returns
// an error for the first invalid setting it encounters and forces
// CompactL0OnClose off in read-only mode.
//
// It panics when compression or encryption is enabled without a block cache.
func checkAndSetOptions(opt *Options) error {
	// Zero compactors is accepted; exactly one is rejected.
	if opt.NumCompactors == 1 {
		return errors.New("Cannot have 1 compactor. Need at least 2")
	}
	hasDirs := opt.Dir != "" || opt.ValueDir != ""
	if opt.InMemory && hasDirs {
		return errors.New("Cannot use badger in Disk-less mode with Dir or ValueDir set")
	}

	// A single write batch may use at most 15% of a memtable.
	batchSize := (15 * opt.MemTableSize) / 100
	opt.maxBatchSize = batchSize
	opt.maxBatchCount = batchSize / int64(skl.MaxNodeSize)
	opt.maxValueThreshold = math.Min(maxValueThreshold, float64(batchSize))

	switch {
	case opt.VLogPercentile < 0.0 || opt.VLogPercentile > 1.0:
		return errors.New("vlogPercentile must be within range of 0.0-1.0")
	case opt.ValueThreshold > maxValueThreshold:
		return fmt.Errorf("Invalid ValueThreshold, must be less or equal to %d",
			maxValueThreshold)
	case opt.ValueThreshold > opt.maxBatchSize:
		return fmt.Errorf("Valuethreshold %d greater than max batch size of %d. Either "+
			"reduce opt.ValueThreshold or increase opt.BaseTableSize.",
			opt.ValueThreshold, opt.maxBatchSize)
	case opt.ValueLogFileSize >= 2<<30 || opt.ValueLogFileSize < 1<<20:
		// The value log file size must lie in [1 MiB, 2 GiB).
		return ErrValueLogSize
	}

	if opt.ReadOnly {
		opt.CompactL0OnClose = false
	}

	compressed := opt.Compression != options.None
	encrypted := len(opt.EncryptionKey) > 0
	if (compressed || encrypted) && opt.BlockCacheSize == 0 {
		panic("BlockCacheSize should be set since compression/encryption are enabled")
	}
	return nil
}
// Open opens, creating if needed, a Badger database configured by opt.
//
// It first validates and completes opt via checkAndSetOptions. Then it:
//   - takes the directory lock(s), unless InMemory or BypassLockGuard is set;
//   - opens the manifest;
//   - creates the block and index caches when their sizes are positive;
//   - opens the key registry, memtables, levels and value log;
//   - starts the background goroutines: cache monitor, size updater,
//     compactors and memtable flusher (skipped when ReadOnly), writer,
//     value-log GC (skipped when InMemory), and publisher.
//
// NOTE(review): the cleanup defer below only sees the function-scoped err.
// Failures returned through a shadowed `err :=` (openMemTables,
// initBannedNamespaces) skip db.cleanup(). Because the results are unnamed,
// `db = nil` inside that defer does not change what the caller receives, and
// several error paths return a non-nil db.
func Open(opt Options) (*DB, error) {
	if err := checkAndSetOptions(&opt); err != nil {
		return nil, err
	}
	var dirLockGuard, valueDirLockGuard *directoryLockGuard
	// Directory locks are only taken for on-disk databases.
	if !opt.InMemory {
		if err := createDirs(opt); err != nil {
			return nil, err
		}
		var err error
		if !opt.BypassLockGuard {
			dirLockGuard, err = acquireDirectoryLock(opt.Dir, lockFile, opt.ReadOnly)
			if err != nil {
				return nil, err
			}
			// Release the lock on every early return. On success the local is
			// set to nil at the end of Open, which makes this a no-op.
			defer func() {
				if dirLockGuard != nil {
					_ = dirLockGuard.release()
				}
			}()
			absDir, err := filepath.Abs(opt.Dir)
			if err != nil {
				return nil, err
			}
			absValueDir, err := filepath.Abs(opt.ValueDir)
			if err != nil {
				return nil, err
			}
			// A distinct value directory needs its own lock.
			if absValueDir != absDir {
				valueDirLockGuard, err = acquireDirectoryLock(opt.ValueDir, lockFile, opt.ReadOnly)
				if err != nil {
					return nil, err
				}
				defer func() {
					if valueDirLockGuard != nil {
						_ = valueDirLockGuard.release()
					}
				}()
			}
		}
	}
	manifestFile, manifest, err := openOrCreateManifestFile(opt)
	if err != nil {
		return nil, err
	}
	// Close the manifest on early return. It is cleared on success, like the
	// locks above.
	defer func() {
		if manifestFile != nil {
			_ = manifestFile.close()
		}
	}()
	db := &DB{
		imm:              make([]*memTable, 0, opt.NumMemtables),
		flushChan:        make(chan *memTable, opt.NumMemtables),
		writeCh:          make(chan *request, kvWriteChCapacity),
		opt:              opt,
		manifest:         manifestFile,
		dirLockGuard:     dirLockGuard,
		valueDirGuard:    valueDirLockGuard,
		orc:              newOracle(opt),
		pub:              newPublisher(),
		allocPool:        z.NewAllocatorPool(8),
		bannedNamespaces: &lockedKeys{keys: make(map[uint64]struct{})},
		threshold:        initVlogThreshold(&opt),
	}
	db.syncChan = opt.syncChan
	// Stop the goroutines started so far if Open fails. This only observes
	// the function-scoped err declared above (see the NOTE on Open).
	defer func() {
		if err != nil {
			opt.Errorf("Received err: %v. Cleaning up...", err)
			db.cleanup()
			db = nil
		}
	}()
	if opt.BlockCacheSize > 0 {
		numInCache := opt.BlockCacheSize / int64(opt.BlockSize)
		// Keep NumCounters positive even for very small caches.
		if numInCache == 0 {
			numInCache = 1
		}
		config := ristretto.Config[[]byte, *table.Block]{
			NumCounters: numInCache * 8,
			MaxCost:     opt.BlockCacheSize,
			BufferItems: 64,
			Metrics:     true,
			OnExit:      table.BlockEvictHandler,
		}
		db.blockCache, err = ristretto.NewCache[[]byte, *table.Block](&config)
		if err != nil {
			return nil, y.Wrap(err, "failed to create data cache")
		}
	}
	if opt.IndexCacheSize > 0 {
		// Each table index is estimated at 5% of MemTableSize.
		// NOTE(review): indexSz is 0 when MemTableSize < 20, and the division
		// below then panics. Consider clamping indexSz to at least 1.
		indexSz := int64(float64(opt.MemTableSize) * 0.05)
		numInCache := opt.IndexCacheSize / indexSz
		if numInCache == 0 {
			numInCache = 1
		}
		config := ristretto.Config[uint64, *fb.TableIndex]{
			NumCounters: numInCache * 8,
			MaxCost:     opt.IndexCacheSize,
			BufferItems: 64,
			Metrics:     true,
		}
		db.indexCache, err = ristretto.NewCache(&config)
		if err != nil {
			return nil, y.Wrap(err, "failed to create bf cache")
		}
	}
	db.closers.cacheHealth = z.NewCloser(1)
	go db.monitorCache(db.closers.cacheHealth)
	if db.opt.InMemory {
		db.opt.SyncWrites = false
		// Raise the threshold so that no value qualifies for the value log
		// (presumably this keeps every value inline in the LSM tree).
		db.opt.ValueThreshold = math.MaxInt32
	}
	krOpt := KeyRegistryOptions{
		ReadOnly:                      opt.ReadOnly,
		Dir:                           opt.Dir,
		EncryptionKey:                 opt.EncryptionKey,
		EncryptionKeyRotationDuration: opt.EncryptionKeyRotationDuration,
		InMemory:                      opt.InMemory,
	}
	if db.registry, err = OpenKeyRegistry(krOpt); err != nil {
		return db, err
	}
	db.calculateSize()
	db.closers.updateSize = z.NewCloser(1)
	go db.updateSize(db.closers.updateSize)
	if err := db.openMemTables(db.opt); err != nil {
		return nil, y.Wrapf(err, "while opening memtables")
	}
	// Read-only databases never get a mutable memtable here.
	if !db.opt.ReadOnly {
		if db.mt, err = db.newMemTable(); err != nil {
			return nil, y.Wrapf(err, "cannot create memtable")
		}
	}
	if db.lc, err = newLevelsController(db, &manifest); err != nil {
		return db, err
	}
	db.vlog.init(db)
	if !opt.ReadOnly {
		db.closers.compactors = z.NewCloser(1)
		db.lc.startCompact(db.closers.compactors)
		db.closers.memtable = z.NewCloser(1)
		// The flusher adds tables through db.lc, so it starts only once the
		// levels controller exists.
		go func() {
			db.flushMemtable(db.closers.memtable)
		}()
		// Queue the immutable memtables already present (presumably loaded by
		// openMemTables) for flushing.
		for _, mt := range db.imm {
			db.flushChan <- mt
		}
	}
	// Resume transaction timestamps from the highest version already stored.
	db.orc.nextTxnTs = db.MaxVersion()
	db.opt.Infof("Set nextTxnTs to %d", db.orc.nextTxnTs)
	if err = db.vlog.open(db); err != nil {
		return db, y.Wrapf(err, "During db.vlog.open")
	}
	db.orc.txnMark.Done(db.orc.nextTxnTs)
	db.orc.readMark.Done(db.orc.nextTxnTs)
	db.orc.incrementNextTs()
	go db.threshold.listenForValueThresholdUpdate()
	if err := db.initBannedNamespaces(); err != nil {
		return db, fmt.Errorf("While setting banned keys: %w", err)
	}
	db.closers.writes = z.NewCloser(1)
	go db.doWrites(db.closers.writes)
	// Value-log GC is not run in InMemory mode (RunValueLogGC rejects it too).
	if !db.opt.InMemory {
		db.closers.valueGC = z.NewCloser(1)
		go db.vlog.waitOnGC(db.closers.valueGC)
	}
	db.closers.pub = z.NewCloser(1)
	go db.pub.listenForUpdates(db.closers.pub)
	// db now owns the locks and the manifest, so disarm the deferred releases.
	valueDirLockGuard = nil
	dirLockGuard = nil
	manifestFile = nil
	return db, nil
}
// initBannedNamespaces loads every persisted banned namespace (keys under
// bannedNsKey) into db.bannedNamespaces. It does nothing when namespace mode
// is disabled (NamespaceOffset < 0).
func (db *DB) initBannedNamespaces() error {
	if db.opt.NamespaceOffset < 0 {
		return nil
	}
	loadBanned := func(txn *Txn) error {
		opts := DefaultIteratorOptions
		opts.Prefix = bannedNsKey
		opts.PrefetchValues = false
		opts.InternalAccess = true
		it := txn.NewIterator(opts)
		defer it.Close()

		prefixLen := len(bannedNsKey)
		for it.Rewind(); it.Valid(); it.Next() {
			nsBytes := it.Item().Key()[prefixLen:]
			db.bannedNamespaces.add(y.BytesToU64(nsBytes))
		}
		return nil
	}
	return db.View(loadBanned)
}
// MaxVersion returns the highest version recorded in any memtable (the mutable
// one is skipped in read-only mode) or in any SSTable.
func (db *DB) MaxVersion() uint64 {
	var highest uint64
	observe := func(v uint64) {
		if v > highest {
			highest = v
		}
	}

	db.lock.Lock()
	if !db.opt.ReadOnly {
		observe(db.mt.maxVersion)
	}
	for i := range db.imm {
		observe(db.imm[i].maxVersion)
	}
	db.lock.Unlock()

	tables := db.Tables()
	for i := range tables {
		observe(tables[i].MaxVersion)
	}
	return highest
}
// monitorCache inspects the block and index cache metrics once a minute until
// c is closed.
//
// For each cache it:
//   - warns when the mean life expectancy is below 10 seconds and the hit
//     ratio is in (0, 0.4);
//   - otherwise logs the metrics on every fifth tick, starting with the
//     first, if more than 1000 life-expectancy samples were recorded.
//
// Caches with no life-expectancy data are skipped.
func (db *DB) monitorCache(c *z.Closer) {
	defer c.Done()

	tick := 0
	report := func(name string, m *ristretto.Metrics) {
		le := m.LifeExpectancySeconds()
		if le == nil {
			return
		}
		shortLived := le.Count > 0 && float64(le.Sum)/float64(le.Count) < 10
		poorHits := m.Ratio() > 0 && m.Ratio() < 0.4
		switch {
		case shortLived && poorHits:
			db.opt.Warningf("%s might be too small. Metrics: %s\n", name, m)
			db.opt.Warningf("Cache life expectancy (in seconds): %+v\n", le)
		case le.Count > 1000 && tick%5 == 0:
			db.opt.Infof("%s metrics: %s\n", name, m)
		}
	}

	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()
	for {
		select {
		case <-c.HasBeenClosed():
			return
		case <-ticker.C:
			report("Block cache", db.BlockCacheMetrics())
			report("Index cache", db.IndexCacheMetrics())
			tick++
		}
	}
}
// cleanup stops the background work that Open may have started before
// failing. Any closer can still be nil at that point, so each one is
// signalled only if set.
//
// It stops the memtable flusher and compactions (waiting for them), closes
// both caches, signals the remaining background goroutines, and stops the
// oracle.
func (db *DB) cleanup() {
	db.stopMemoryFlush()
	db.stopCompactions()

	db.blockCache.Close()
	db.indexCache.Close()

	for _, c := range []*z.Closer{
		db.closers.updateSize,
		db.closers.valueGC,
		db.closers.writes,
		db.closers.pub,
		// Open starts monitorCache before most of its failure points. Without
		// this signal that goroutine would outlive a failed Open.
		db.closers.cacheHealth,
	} {
		if c != nil {
			c.Signal()
		}
	}

	db.orc.Stop()
}
// BlockCacheMetrics returns the block cache's metrics, or nil when no block
// cache was created (BlockCacheSize not positive).
func (db *DB) BlockCacheMetrics() *ristretto.Metrics {
	if db.blockCache == nil {
		return nil
	}
	return db.blockCache.Metrics
}
// IndexCacheMetrics returns the index cache's metrics, or nil when no index
// cache was created (IndexCacheSize not positive).
func (db *DB) IndexCacheMetrics() *ristretto.Metrics {
	if db.indexCache == nil {
		return nil
	}
	return db.indexCache.Metrics
}
// Close shuts the database down. Only the first call does the work and
// returns its error; every later call returns nil.
func (db *DB) Close() error {
	var closeErr error
	db.closeOnce.Do(func() { closeErr = db.close() })
	return closeErr
}
// IsClosed reports whether shutdown of the database has begun.
func (db *DB) IsClosed() bool {
	closed := db.isClosed.Load()
	return closed == 1
}
// close performs the shutdown behind Close. In order, it:
//   - rejects new writes;
//   - stops value-log GC, the writer and the publisher, and signals the
//     cache monitor;
//   - hands a non-empty active memtable to the flusher;
//   - stops flushing and compactions;
//   - optionally compacts L0 (CompactL0OnClose);
//   - closes the value log, levels, caches and threshold tracker;
//   - for on-disk databases, releases the directory locks, closes the
//     manifest and key registry, and fsyncs the directories.
//
// Error handling: a value-log error is always recorded. Each later step
// records its error only while err is still nil, so the earliest such
// failure wins (assuming y.Wrap(nil, ...) returns nil — TODO confirm).
func (db *DB) close() (err error) {
	defer db.allocPool.Release()
	db.opt.Debugf("Closing database")
	db.opt.Infof("Lifetime L0 stalled for: %s\n", time.Duration(db.lc.l0stallsMs.Load()))
	db.blockWrites.Store(1)
	db.isClosed.Store(1)
	// Value-log GC is only started for on-disk databases.
	if !db.opt.InMemory {
		db.closers.valueGC.SignalAndWait()
	}
	// doWrites drains writeCh before it exits, so no queued request is left
	// behind when the channel is closed below.
	db.closers.writes.SignalAndWait()
	close(db.writeCh)
	db.closers.pub.SignalAndWait()
	db.closers.cacheHealth.Signal()
	if db.mt != nil {
		if db.mt.sl.Empty() {
			// Nothing to persist, so only drop the reference.
			db.mt.DecrRef()
		} else {
			db.opt.Debugf("Flushing memtable")
			// Retry until flushChan has room. db.lock is released between
			// attempts because the flusher needs it to advance db.imm.
			for {
				pushedMemTable := func() bool {
					db.lock.Lock()
					defer db.lock.Unlock()
					y.AssertTrue(db.mt != nil)
					select {
					case db.flushChan <- db.mt:
						db.imm = append(db.imm, db.mt)
						// Any write after this point would dereference nil.
						db.mt = nil
						db.opt.Debugf("pushed to flush chan\n")
						return true
					default:
					}
					return false
				}()
				if pushedMemTable {
					break
				}
				time.Sleep(10 * time.Millisecond)
			}
		}
	}
	db.stopMemoryFlush()
	db.stopCompactions()
	if db.opt.CompactL0OnClose {
		// This err shadows the named result, so compaction failures are only
		// logged, never returned.
		err := db.lc.doCompact(173, compactionPriority{level: 0, score: 1.73})
		switch err {
		case errFillTables:
			// Deliberately not reported.
		case nil:
			db.opt.Debugf("Force compaction on level 0 done")
		default:
			db.opt.Warningf("While forcing compaction on level 0: %v", err)
		}
	}
	if vlogErr := db.vlog.Close(); vlogErr != nil {
		err = y.Wrap(vlogErr, "DB.Close")
	}
	// NOTE(review): LevelsToString() is passed as the format string. A "%s"
	// verb would be safer (go vet printf check).
	db.opt.Infof(db.LevelsToString())
	if lcErr := db.lc.close(); err == nil {
		err = y.Wrap(lcErr, "DB.Close")
	}
	db.opt.Debugf("Waiting for closer")
	db.closers.updateSize.SignalAndWait()
	db.orc.Stop()
	db.blockCache.Close()
	db.indexCache.Close()
	db.threshold.close()
	// InMemory databases hold no locks, manifest file or directories.
	if db.opt.InMemory {
		return
	}
	if db.dirLockGuard != nil {
		if guardErr := db.dirLockGuard.release(); err == nil {
			err = y.Wrap(guardErr, "DB.Close")
		}
	}
	if db.valueDirGuard != nil {
		if guardErr := db.valueDirGuard.release(); err == nil {
			err = y.Wrap(guardErr, "DB.Close")
		}
	}
	if manifestErr := db.manifest.close(); err == nil {
		err = y.Wrap(manifestErr, "DB.Close")
	}
	if registryErr := db.registry.Close(); err == nil {
		err = y.Wrap(registryErr, "DB.Close")
	}
	// Fsync both directories so that directory-entry changes (such as the
	// lock release) are persisted.
	if syncErr := db.syncDir(db.opt.Dir); err == nil {
		err = y.Wrap(syncErr, "DB.Close")
	}
	if syncErr := db.syncDir(db.opt.ValueDir); err == nil {
		err = y.Wrap(syncErr, "DB.Close")
	}
	return err
}
// VerifyChecksum asks the levels controller to verify the checksums of the
// tables it manages and returns the result.
func (db *DB) VerifyChecksum() error {
	err := db.lc.verifyChecksum()
	return err
}
const (
	// lockFile is the file name that Open passes to acquireDirectoryLock for
	// Dir, and for ValueDir when it resolves to a different directory.
	lockFile = "LOCK"
)
// Sync syncs the active memtable's write-ahead log (under the read lock) and
// then the value log, returning both errors combined.
func (db *DB) Sync() error {
	var walErr error
	func() {
		db.lock.RLock()
		defer db.lock.RUnlock()
		walErr = db.mt.SyncWAL()
	}()
	return y.CombineErrors(walErr, db.vlog.sync())
}
// getMemTables returns the memtables to consult for a read, newest first. The
// mutable memtable comes first (except in read-only mode), followed by db.imm
// in reverse order. Each returned table has its reference count incremented,
// and the returned func releases those references.
func (db *DB) getMemTables() ([]*memTable, func()) {
	db.lock.RLock()
	defer db.lock.RUnlock()

	var tables []*memTable
	if !db.opt.ReadOnly {
		tables = append(tables, db.mt)
		db.mt.IncrRef()
	}
	for i := len(db.imm) - 1; i >= 0; i-- {
		mt := db.imm[i]
		tables = append(tables, mt)
		mt.IncrRef()
	}

	release := func() {
		for _, mt := range tables {
			mt.DecrRef()
		}
	}
	return tables, release
}
// get looks up key, which carries a version timestamp, in the memtables
// (newest first) and then in the LSM levels.
//
// A memtable entry whose version equals the requested one is returned
// immediately. Otherwise the highest-versioned memtable hit is passed to the
// levels controller. It returns ErrDBClosed once the DB is closing.
func (db *DB) get(key []byte) (y.ValueStruct, error) {
	if db.IsClosed() {
		return y.ValueStruct{}, ErrDBClosed
	}
	tables, release := db.getMemTables()
	defer release()

	wantVersion := y.ParseTs(key)
	var best y.ValueStruct
	y.NumGetsAdd(db.opt.MetricsEnabled, 1)
	for _, mt := range tables {
		vs := mt.sl.Get(key)
		y.NumMemtableGetsAdd(db.opt.MetricsEnabled, 1)
		// A zero Meta with a nil Value is treated as "not in this memtable".
		if vs.Meta == 0 && vs.Value == nil {
			continue
		}
		if vs.Version == wantVersion {
			y.NumGetsWithResultsAdd(db.opt.MetricsEnabled, 1)
			return vs, nil
		}
		if vs.Version > best.Version {
			best = vs
		}
	}
	return db.lc.get(key, best, 0)
}
// requestPool recycles *request values between writes. sendToWriteCh takes
// them from here and resets them before use.
var requestPool = sync.Pool{
	New: func() any {
		return &request{}
	},
}
// writeToLSM inserts every entry of b into the active memtable.
//
// Entries whose value is kept inline are stored with the value-pointer bit
// cleared. All others store the encoded value-log pointer from b.Ptrs with
// the bit set. When SyncWrites is enabled, the memtable WAL is synced
// afterwards.
func (db *DB) writeToLSM(b *request) error {
	// Outside InMemory mode every entry must have a matching value pointer.
	// InMemory mode skips the check (presumably Ptrs stays empty there).
	if !db.opt.InMemory && len(b.Ptrs) != len(b.Entries) {
		return fmt.Errorf("Ptrs and Entries don't match: %+v", b)
	}

	for i, e := range b.Entries {
		inline := e.skipVlogAndSetThreshold(db.valueThreshold())
		vs := y.ValueStruct{
			UserMeta:  e.UserMeta,
			ExpiresAt: e.ExpiresAt,
		}
		if inline {
			vs.Value = e.Value
			vs.Meta = e.meta &^ bitValuePointer
		} else {
			vs.Value = b.Ptrs[i].Encode()
			vs.Meta = e.meta | bitValuePointer
		}
		if err := db.mt.Put(e.Key, vs); err != nil {
			return y.Wrapf(err, "while writing to memTable")
		}
	}

	if !db.opt.SyncWrites {
		return nil
	}
	return db.mt.SyncWAL()
}
// writeRequests persists a batch of requests. It writes them to the value
// log, then inserts them into the memtable one request at a time (polling
// until there is room), and finally publishes them to subscribers.
//
// Whatever the outcome, every request gets its Err set and its WaitGroup
// released exactly once. An empty batch is a no-op.
func (db *DB) writeRequests(reqs []*request) error {
	if len(reqs) == 0 {
		return nil
	}
	finish := func(err error) {
		for _, r := range reqs {
			r.Err = err
			r.Wg.Done()
		}
	}

	db.opt.Debugf("writeRequests called. Writing to value log")
	if err := db.vlog.write(reqs); err != nil {
		finish(err)
		return err
	}

	db.opt.Debugf("Writing to memtable")
	written := 0
	for _, b := range reqs {
		if len(b.Entries) == 0 {
			continue
		}
		written += len(b.Entries)

		// Poll instead of blocking. ensureRoomForWrite and the flusher both
		// need db.lock, and the flusher must be able to advance db.imm.
		err := db.ensureRoomForWrite()
		for attempt := uint64(1); err == errNoRoom; attempt++ {
			if attempt%100 == 0 {
				db.opt.Debugf("Making room for writes")
			}
			time.Sleep(10 * time.Millisecond)
			err = db.ensureRoomForWrite()
		}
		if err != nil {
			finish(err)
			return y.Wrap(err, "writeRequests")
		}
		if err := db.writeToLSM(b); err != nil {
			finish(err)
			return y.Wrap(err, "writeRequests")
		}
	}

	db.opt.Debugf("Sending updates to subscribers")
	db.pub.sendUpdates(reqs)
	finish(nil)
	db.opt.Debugf("%d entries written", written)
	return nil
}
// sendToWriteCh checks the size of a batch and enqueues it on db.writeCh as a
// pooled request for doWrites.
//
// It returns ErrBlockedWrites while writes are blocked, and ErrTxnTooBig when
// the batch reaches maxBatchCount entries or maxBatchSize estimated bytes.
// On success the caller can Wait on the returned request.
func (db *DB) sendToWriteCh(entries []*Entry) (*request, error) {
	if db.blockWrites.Load() == 1 {
		return nil, ErrBlockedWrites
	}

	count := int64(len(entries))
	var size int64
	for _, e := range entries {
		size += e.estimateSizeAndSetThreshold(db.valueThreshold())
	}
	y.NumBytesWrittenUserAdd(db.opt.MetricsEnabled, size)
	if count >= db.opt.maxBatchCount || size >= db.opt.maxBatchSize {
		return nil, ErrTxnTooBig
	}

	req := requestPool.Get().(*request)
	req.reset()
	req.Entries = entries
	req.Wg.Add(1)
	// Take a reference before handing req over to the write goroutine.
	req.IncrRef()
	db.writeCh <- req
	y.NumPutsAdd(db.opt.MetricsEnabled, count)
	return req, nil
}
// doWrites is the consumer of db.writeCh. It batches incoming requests and
// hands each batch to writeRequests on a separate goroutine.
//
// pendingCh has capacity 1, so at most one batch write is in flight while new
// requests keep accumulating. Once a batch reaches 3*kvWriteChCapacity
// requests, doWrites blocks until the in-flight write finishes and then
// dispatches it. When lc is closed, requests still queued on writeCh are
// drained and written synchronously before returning.
func (db *DB) doWrites(lc *z.Closer) {
	defer lc.Done()
	pendingCh := make(chan struct{}, 1)
	// Writes one batch, then frees the single in-flight slot.
	writeRequests := func(reqs []*request) {
		if err := db.writeRequests(reqs); err != nil {
			db.opt.Errorf("writeRequests: %v", err)
		}
		<-pendingCh
	}
	// reqLen publishes how many requests are batched but not yet dispatched.
	reqLen := new(expvar.Int)
	y.PendingWritesSet(db.opt.MetricsEnabled, db.opt.Dir, reqLen)
	reqs := make([]*request, 0, 10)
	for {
		var r *request
		select {
		case r = <-db.writeCh:
		case <-lc.HasBeenClosed():
			goto closedCase
		}
		for {
			reqs = append(reqs, r)
			reqLen.Set(int64(len(reqs)))
			// Batch is large enough: wait for the in-flight write, then dispatch.
			if len(reqs) >= 3*kvWriteChCapacity {
				pendingCh <- struct{}{}
				goto writeCase
			}
			// Keep batching, unless the in-flight slot is free.
			select {
			case r = <-db.writeCh:
			case pendingCh <- struct{}{}:
				goto writeCase
			case <-lc.HasBeenClosed():
				goto closedCase
			}
		}
	closedCase:
		// Drain what is left, then write it synchronously and exit.
		for {
			select {
			case r = <-db.writeCh:
				reqs = append(reqs, r)
			default:
				pendingCh <- struct{}{}
				writeRequests(reqs)
				return
			}
		}
	writeCase:
		go writeRequests(reqs)
		reqs = make([]*request, 0, 10)
		reqLen.Set(0)
	}
}
// batchSet enqueues entries as a single request and blocks until the write
// completes, returning its error.
func (db *DB) batchSet(entries []*Entry) error {
	req, err := db.sendToWriteCh(entries)
	if err == nil {
		err = req.Wait()
	}
	return err
}
// batchSetAsync enqueues entries as a single request and returns without
// waiting. When the write completes, f is called on a new goroutine with the
// write's error. If the request cannot be enqueued, that error is returned
// and f is never called.
func (db *DB) batchSetAsync(entries []*Entry, f func(error)) error {
	req, err := db.sendToWriteCh(entries)
	if err != nil {
		return err
	}
	go func(r *request) {
		f(r.Wait())
	}(req)
	return nil
}
// errNoRoom is returned by ensureRoomForWrite when the active memtable is
// full and flushChan cannot accept it yet. writeRequests sleeps and retries
// on this error.
var errNoRoom = errors.New("No room for write")
// ensureRoomForWrite makes sure the active memtable can take more writes.
//
// A full memtable is handed to the flusher through flushChan, moved to
// db.imm, and replaced by a fresh one. If flushChan is full, it returns
// errNoRoom immediately so the caller can retry after db.lock is released.
func (db *DB) ensureRoomForWrite() error {
	db.lock.Lock()
	defer db.lock.Unlock()

	// close sets db.mt to nil after handing it to the flusher.
	y.AssertTrue(db.mt != nil)
	if !db.mt.isFull() {
		return nil
	}

	select {
	default:
		return errNoRoom
	case db.flushChan <- db.mt:
	}

	db.opt.Debugf("Flushing memtable, mt.size=%d size of flushChan: %d\n",
		db.mt.sl.MemSize(), len(db.flushChan))
	db.imm = append(db.imm, db.mt)
	var err error
	if db.mt, err = db.newMemTable(); err != nil {
		return y.Wrapf(err, "cannot create new mem table")
	}
	return nil
}
// arenaSize returns MemTableSize plus room for one maximum-size batch, plus
// skl.MaxNodeSize of overhead for each entry such a batch may hold.
// Presumably this sizes a memtable's skiplist arena — confirm at call sites.
func arenaSize(opt Options) int64 {
	nodeOverhead := opt.maxBatchCount * int64(skl.MaxNodeSize)
	return opt.MemTableSize + opt.maxBatchSize + nodeOverhead
}
// buildL0Table feeds every entry of iter into a new table builder configured
// by bopts, skipping keys that start with any of dropPrefixes.
//
// For values stored as value-log pointers, the pointer's Len is passed to
// the builder; otherwise 0 is passed. iter is closed before returning. The
// caller owns the returned builder.
func buildL0Table(iter y.Iterator, dropPrefixes [][]byte, bopts table.Options) *table.Builder {
	defer iter.Close()

	builder := table.NewTableBuilder(bopts)
	filtering := len(dropPrefixes) > 0
	for iter.Rewind(); iter.Valid(); iter.Next() {
		key := iter.Key()
		if filtering && hasAnyPrefixes(key, dropPrefixes) {
			continue
		}
		val := iter.Value()
		var ptr valuePointer
		if val.Meta&bitValuePointer > 0 {
			ptr.Decode(val.Value)
		}
		builder.Add(key, val, ptr.Len)
	}
	return builder
}
// handleMemTableFlush writes the contents of mt to a new level-0 table,
// omitting every key that starts with one of dropPrefixes (nil keeps all
// keys). If no key remains, no table is created.
//
// In InMemory mode the table is kept in memory; otherwise it is written as a
// new file under opt.Dir. Callers in this file run it either from the single
// flusher goroutine or under db.lock.
func (db *DB) handleMemTableFlush(mt *memTable, dropPrefixes [][]byte) error {
	bopts := buildTableOptions(db)
	itr := mt.sl.NewUniIterator(false)
	// Forward dropPrefixes so DropPrefix does not write the keys it is about
	// to delete into a fresh L0 table.
	builder := buildL0Table(itr, dropPrefixes, bopts)
	defer builder.Close()

	// Every key may have been filtered out by dropPrefixes.
	if builder.Empty() {
		builder.Finish()
		return nil
	}

	fileID := db.lc.reserveFileID()
	var (
		tbl *table.Table
		err error
	)
	if db.opt.InMemory {
		tbl, err = table.OpenInMemoryTable(builder.Finish(), fileID, &bopts)
	} else {
		tbl, err = table.CreateTable(table.NewFilename(fileID, db.opt.Dir), builder)
	}
	if err != nil {
		return y.Wrap(err, "error while creating table")
	}

	err = db.lc.addLevel0Table(tbl)
	// Drop the reference obtained when opening/creating the table (presumably
	// the levels controller holds its own on success — confirm).
	_ = tbl.DecrRef()
	return err
}
// flushMemtable writes each memtable received on db.flushChan to level 0
// until the channel is closed. Nil entries are ignored.
//
// A failed flush is logged and retried every second until it succeeds. The
// memtable is then removed from the head of db.imm and its reference is
// released. lc.Done is called on exit.
func (db *DB) flushMemtable(lc *z.Closer) {
	defer lc.Done()

	for mt := range db.flushChan {
		if mt == nil {
			continue
		}
		for {
			err := db.handleMemTableFlush(mt, nil)
			if err == nil {
				break
			}
			db.opt.Errorf("error flushing memtable to disk: %v, retrying", err)
			time.Sleep(time.Second)
		}

		db.lock.Lock()
		// Memtables are sent on flushChan and appended to db.imm under
		// db.lock, so the one just flushed must be the oldest entry.
		y.AssertTrue(mt == db.imm[0])
		db.imm = db.imm[1:]
		mt.DecrRef()
		db.lock.Unlock()
	}
}
// exists reports whether path exists.
//
// A "does not exist" error is reported as (false, nil). Any other stat error
// is returned together with true.
func exists(path string) (bool, error) {
	_, err := os.Stat(path)
	switch {
	case err == nil:
		return true, nil
	case os.IsNotExist(err):
		return false, nil
	default:
		return true, err
	}
}
// calculateSize walks opt.Dir, and opt.ValueDir when it differs, and
// publishes two size metrics:
//   - the total size of ".sst" files as the LSM size;
//   - the total size of ".vlog" files as the value-log size.
//
// It does nothing in InMemory mode. Walk errors are only logged at debug
// level, and the partial totals are still published.
func (db *DB) calculateSize() {
	if db.opt.InMemory {
		return
	}

	walkSizes := func(dir string) (sst, vlog int64) {
		walkErr := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			switch filepath.Ext(path) {
			case ".sst":
				sst += info.Size()
			case ".vlog":
				vlog += info.Size()
			}
			return nil
		})
		if walkErr != nil {
			db.opt.Debugf("Got error while calculating total size of directory: %s", dir)
		}
		return sst, vlog
	}
	asMetric := func(v int64) *expvar.Int {
		m := new(expvar.Int)
		m.Add(v)
		return m
	}

	lsmBytes, vlogBytes := walkSizes(db.opt.Dir)
	y.LSMSizeSet(db.opt.MetricsEnabled, db.opt.Dir, asMetric(lsmBytes))
	// A separate value directory needs its own walk.
	if db.opt.ValueDir != db.opt.Dir {
		_, vlogBytes = walkSizes(db.opt.ValueDir)
	}
	y.VlogSizeSet(db.opt.MetricsEnabled, db.opt.ValueDir, asMetric(vlogBytes))
}
// updateSize recomputes the size metrics once a minute until lc is closed.
// In InMemory mode it returns immediately.
func (db *DB) updateSize(lc *z.Closer) {
	defer lc.Done()
	if db.opt.InMemory {
		return
	}

	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()
	for {
		select {
		case <-lc.HasBeenClosed():
			return
		case <-ticker.C:
			db.calculateSize()
		}
	}
}
// RunValueLogGC runs value-log garbage collection with the given
// discardRatio, which must lie strictly between 0 and 1.
//
// It returns ErrGCInMemoryMode for InMemory databases and ErrInvalidRequest
// for an out-of-range ratio. Otherwise it returns the result of vlog.runGC.
func (db *DB) RunValueLogGC(discardRatio float64) error {
	switch {
	case db.opt.InMemory:
		return ErrGCInMemoryMode
	case discardRatio <= 0.0 || discardRatio >= 1.0:
		return ErrInvalidRequest
	}
	return db.vlog.runGC(discardRatio)
}
// Size returns the most recently published sizes, in bytes, of the LSM tree
// (.sst files) and the value log (.vlog files), as computed by
// calculateSize. Both are 0 when no LSM size metric is available.
func (db *DB) Size() (lsm, vlog int64) {
	lsmMetric := y.LSMSizeGet(db.opt.MetricsEnabled, db.opt.Dir)
	if lsmMetric == nil {
		return 0, 0
	}
	vlogMetric := y.VlogSizeGet(db.opt.MetricsEnabled, db.opt.ValueDir)
	return lsmMetric.(*expvar.Int).Value(), vlogMetric.(*expvar.Int).Value()
}
// Sequence hands out increasing uint64 values for a key. Values are leased
// from the database bandwidth at a time, so only an exhausted lease causes a
// database write. Obtain one with DB.GetSequence.
type Sequence struct {
	// lock serializes Next and Release.
	lock sync.Mutex
	db   *DB
	// key is the database key holding the persisted lease high-water mark.
	key []byte
	// next is the value the following call to Next will return.
	next uint64
	// leased is the exclusive upper bound of the current lease.
	leased uint64
	// bandwidth is how many values each lease reserves.
	bandwidth uint64
}
// Next returns the next value of the sequence. When the current lease is
// exhausted it first leases a new range via updateLease, and returns that
// function's error if it fails.
func (seq *Sequence) Next() (uint64, error) {
	seq.lock.Lock()
	defer seq.lock.Unlock()

	exhausted := seq.next >= seq.leased
	if exhausted {
		if err := seq.updateLease(); err != nil {
			return 0, err
		}
	}
	current := seq.next
	seq.next = current + 1
	return current, nil
}
// Release gives back the unused part of the current lease. If the stored
// lease still equals the one this Sequence took, it is rewritten to
// seq.next. On success seq.leased is set to seq.next, so the next call to
// Next leases a fresh range.
func (seq *Sequence) Release() error {
	seq.lock.Lock()
	defer seq.lock.Unlock()

	giveBack := func(txn *Txn) error {
		item, err := txn.Get(seq.key)
		if err != nil {
			return err
		}
		var stored uint64
		readErr := item.Value(func(v []byte) error {
			stored = binary.BigEndian.Uint64(v)
			return nil
		})
		if readErr != nil {
			return readErr
		}
		// The stored lease is no longer ours; leave it untouched.
		if stored != seq.leased {
			return nil
		}
		var buf [8]byte
		binary.BigEndian.PutUint64(buf[:], seq.next)
		return txn.SetEntry(NewEntry(seq.key, buf[:]))
	}

	if err := seq.db.Update(giveBack); err != nil {
		return err
	}
	seq.leased = seq.next
	return nil
}
// updateLease reads the persisted high-water mark for seq.key (0 if the key is
// absent) into seq.next. It then persists a new lease ending bandwidth values
// past it and records that bound in seq.leased. seq.leased is updated inside
// the transaction callback, after SetEntry succeeds.
func (seq *Sequence) updateLease() error {
	return seq.db.Update(func(txn *Txn) error {
		item, getErr := txn.Get(seq.key)
		switch getErr {
		case nil:
			var stored uint64
			if err := item.Value(func(v []byte) error {
				stored = binary.BigEndian.Uint64(v)
				return nil
			}); err != nil {
				return err
			}
			seq.next = stored
		case ErrKeyNotFound:
			seq.next = 0
		default:
			return getErr
		}

		newLease := seq.next + seq.bandwidth
		var buf [8]byte
		binary.BigEndian.PutUint64(buf[:], newLease)
		if err := txn.SetEntry(NewEntry(seq.key, buf[:])); err != nil {
			return err
		}
		seq.leased = newLease
		return nil
	})
}
// GetSequence returns a Sequence for key that leases bandwidth values at a
// time and performs the first lease immediately.
//
// key must be non-empty (ErrEmptyKey) and bandwidth non-zero
// (ErrZeroBandwidth). If the initial lease fails, the Sequence is still
// returned together with the error. It panics when the DB uses managed
// transactions.
func (db *DB) GetSequence(key []byte, bandwidth uint64) (*Sequence, error) {
	if db.opt.managedTxns {
		panic("Cannot use GetSequence with managedDB=true.")
	}
	if len(key) == 0 {
		return nil, ErrEmptyKey
	}
	if bandwidth == 0 {
		return nil, ErrZeroBandwidth
	}
	seq := &Sequence{db: db, key: key, bandwidth: bandwidth}
	leaseErr := seq.updateLease()
	return seq, leaseErr
}
// Tables returns a TableInfo for each SSTable currently tracked by the
// levels controller.
func (db *DB) Tables() []TableInfo {
	infos := db.lc.getTableInfo()
	return infos
}
// Levels returns a LevelInfo for each level of the LSM tree, as reported by
// the levels controller.
func (db *DB) Levels() []LevelInfo {
	infos := db.lc.getLevelInfo()
	return infos
}
// EstimateSize returns the summed on-disk and uncompressed sizes of the
// tables whose smallest and largest keys both start with prefix. Tables that
// only partly fall under prefix are not counted.
func (db *DB) EstimateSize(prefix []byte) (uint64, uint64) {
	var onDisk, uncompressed uint64
	for _, ti := range db.Tables() {
		underPrefix := bytes.HasPrefix(ti.Left, prefix) && bytes.HasPrefix(ti.Right, prefix)
		if !underPrefix {
			continue
		}
		onDisk += uint64(ti.OnDiskSize)
		uncompressed += uint64(ti.UncompressedSize)
	}
	return onDisk, uncompressed
}
// Ranges splits the key space under prefix into roughly numRanges key ranges
// of similar estimated size. It works in these steps:
//  1. Candidate split keys are the right keys (with prefix) of tables in the
//     last level.
//  2. If there are fewer than 32, they are replaced by db.lc.keySplits, asked
//     for 32/len(tables) (at least 1) splits per table.
//  3. If there are still fewer than 32, every 10000th memtable key that has
//     the prefix is added. db.lock is held exclusively while doing this.
//  4. Sorted splits become consecutive ranges. The first range has an empty
//     left bound and the last an empty right bound.
//  5. Each bounded range is sized by the UncompressedSize of the tables that
//     overlap it. Open-ended ranges get no size.
//  6. If the total size is 0, the unmerged ranges are returned. Otherwise
//     adjacent ranges are merged greedily while their combined size stays
//     within total/numRanges.
//
// NOTE(review): numRanges <= 0 is not validated. 0 panics with a division by
// zero once total > 0, and a negative value yields a negative average.
func (db *DB) Ranges(prefix []byte, numRanges int) []*keyRange {
	var splits []string
	tables := db.Tables()
	for _, ti := range tables {
		if ti.Level != db.opt.MaxLevels-1 {
			continue
		}
		// Right keys are used as split points; Left keys are not considered.
		if bytes.HasPrefix(ti.Right, prefix) {
			splits = append(splits, string(ti.Right))
		}
	}
	if len(splits) < 32 {
		numTables := len(tables)
		if numTables == 0 {
			numTables = 1
		}
		numPerTable := 32 / numTables
		if numPerTable == 0 {
			numPerTable = 1
		}
		splits = db.lc.keySplits(numPerTable, prefix)
	}
	if len(splits) < 32 {
		maxPerSplit := 10000
		// Samples every maxPerSplit-th key of mt, keeping the sample only if
		// it has the prefix.
		mtSplits := func(mt *memTable) {
			if mt == nil {
				return
			}
			count := 0
			iter := mt.sl.NewIterator()
			for iter.SeekToFirst(); iter.Valid(); iter.Next() {
				if count%maxPerSplit == 0 {
					if bytes.HasPrefix(iter.Key(), prefix) {
						splits = append(splits, string(iter.Key()))
					}
				}
				count += 1
			}
			_ = iter.Close()
		}
		db.lock.Lock()
		defer db.lock.Unlock()
		var memTables []*memTable
		memTables = append(memTables, db.imm...)
		for _, mt := range memTables {
			mtSplits(mt)
		}
		mtSplits(db.mt)
	}
	sort.Strings(splits)
	var ranges []*keyRange
	var start []byte
	for _, key := range splits {
		ranges = append(ranges, &keyRange{left: start, right: y.SafeCopy(nil, []byte(key))})
		start = y.SafeCopy(nil, []byte(key))
	}
	ranges = append(ranges, &keyRange{left: start})
	// Estimate each bounded range's size from overlapping tables.
	for _, t := range tables {
		tr := keyRange{left: t.Left, right: t.Right}
		for _, r := range ranges {
			if len(r.left) == 0 || len(r.right) == 0 {
				continue
			}
			if r.overlapsWith(tr) {
				r.size += int64(t.UncompressedSize)
			}
		}
	}
	var total int64
	for _, r := range ranges {
		total += r.size
	}
	if total == 0 {
		return ranges
	}
	avg := total / int64(numRanges)
	// Greedily merge adjacent ranges up to avg bytes each.
	var out []*keyRange
	var i int
	for i < len(ranges) {
		r := ranges[i]
		cur := &keyRange{left: r.left, size: r.size, right: r.right}
		i++
		for ; i < len(ranges); i++ {
			next := ranges[i]
			if cur.size+next.size > avg {
				break
			}
			cur.right = next.right
			cur.size += next.size
		}
		out = append(out, cur)
	}
	return out
}
// MaxBatchCount returns the entry count at which a single write batch is
// rejected with ErrTxnTooBig.
func (db *DB) MaxBatchCount() int64 {
	limit := db.opt.maxBatchCount
	return limit
}
// MaxBatchSize returns the estimated byte size at which a single write batch
// is rejected with ErrTxnTooBig.
func (db *DB) MaxBatchSize() int64 {
	limit := db.opt.maxBatchSize
	return limit
}
// stopMemoryFlush closes flushChan and waits for the flusher goroutine to
// exit. It does nothing if the flusher was never started (e.g. read-only
// mode).
func (db *DB) stopMemoryFlush() {
	if db.closers.memtable == nil {
		return
	}
	close(db.flushChan)
	db.closers.memtable.SignalAndWait()
}
// stopCompactions signals the compaction goroutines and waits for them to
// finish. It does nothing if compactions were never started.
func (db *DB) stopCompactions() {
	if c := db.closers.compactors; c != nil {
		c.SignalAndWait()
	}
}
// startCompactions restarts compactions with a fresh closer. It only acts if
// compactions were started before, i.e. the compactors closer is non-nil.
func (db *DB) startCompactions() {
	if db.closers.compactors == nil {
		return
	}
	closer := z.NewCloser(1)
	db.closers.compactors = closer
	db.lc.startCompact(closer)
}
// startMemoryFlush recreates flushChan and restarts the flusher goroutine
// with a fresh closer. It only acts if the flusher was started before, i.e.
// the memtable closer is non-nil.
func (db *DB) startMemoryFlush() {
	if db.closers.memtable == nil {
		return
	}
	db.flushChan = make(chan *memTable, db.opt.NumMemtables)
	closer := z.NewCloser(1)
	db.closers.memtable = closer
	go db.flushMemtable(closer)
}
// Flatten compacts the LSM tree until all data lives in a single level.
// Each round runs workers concurrent compactions. Background compactions are
// stopped for the duration and restarted when Flatten returns.
//
// A non-positive workers value is treated as 1. With zero workers no
// compaction would ever run, yet each round would report success, so the
// loop would never terminate.
func (db *DB) Flatten(workers int) error {
	if workers < 1 {
		workers = 1
	}
	db.stopCompactions()
	defer db.startCompactions()

	// compactAway runs workers concurrent compactions for cp. It succeeds if
	// at least one of them does; otherwise it returns the last error seen.
	compactAway := func(cp compactionPriority) error {
		db.opt.Infof("Attempting to compact with %+v\n", cp)
		errCh := make(chan error, 1)
		for i := 0; i < workers; i++ {
			go func() {
				errCh <- db.lc.doCompact(175, cp)
			}()
		}
		var success int
		var rerr error
		for i := 0; i < workers; i++ {
			err := <-errCh
			if err != nil {
				rerr = err
				db.opt.Warningf("While running doCompact with %+v. Error: %v\n", cp, err)
			} else {
				success++
			}
		}
		if success == 0 {
			return rerr
		}
		db.opt.Infof("%d compactor(s) succeeded. One or more tables from level %d compacted.\n",
			success, cp.level)
		return nil
	}

	hbytes := func(sz int64) string {
		return humanize.IBytes(uint64(sz))
	}

	t := db.lc.levelTargets()
	for {
		db.opt.Infof("\n")
		var levels []int
		for i, l := range db.lc.levels {
			sz := l.getTotalSize()
			db.opt.Infof("Level: %d. %8s Size. %8s Max.\n",
				i, hbytes(sz), hbytes(t.targetSz[i]))
			if sz > 0 {
				levels = append(levels, i)
			}
		}
		// At most one level holds data: finish, unless the level picker
		// still asks for a compaction.
		if len(levels) <= 1 {
			prios := db.lc.pickCompactLevels(nil)
			if len(prios) == 0 || prios[0].score <= 1.0 {
				db.opt.Infof("All tables consolidated into one level. Flattening done.\n")
				return nil
			}
			if err := compactAway(prios[0]); err != nil {
				return err
			}
			continue
		}
		// Force a compaction of the lowest-numbered non-empty level using a
		// synthetic priority above 1.
		cp := compactionPriority{level: levels[0], score: 1.71}
		if err := compactAway(cp); err != nil {
			return err
		}
	}
}
// blockWrite starts rejecting new writes, then stops the doWrites goroutine
// and waits for it. doWrites drains and writes whatever is still queued
// before exiting. It returns ErrBlockedWrites if writes were already blocked.
func (db *DB) blockWrite() error {
	if swapped := db.blockWrites.CompareAndSwap(0, 1); !swapped {
		return ErrBlockedWrites
	}
	db.closers.writes.SignalAndWait()
	db.opt.Infof("Writes flushed. Stopping compactions now...")
	return nil
}
// unblockWrite restarts the doWrites goroutine with a fresh closer and then
// accepts writes again.
func (db *DB) unblockWrite() {
	writes := z.NewCloser(1)
	db.closers.writes = writes
	go db.doWrites(writes)
	db.blockWrites.Store(0)
}
// prepareToDrop quiesces the write path before a destructive operation. It
// blocks new writes, writes out any requests still queued on writeCh (errors
// are only logged), and stops the memtable flusher.
//
// The returned func undoes this by restarting the flusher and re-enabling
// writes. If writes are already blocked, a no-op func is returned along with
// the error. It panics in read-only mode.
func (db *DB) prepareToDrop() (func(), error) {
	if db.opt.ReadOnly {
		panic("Attempting to drop data in read-only mode.")
	}
	if err := db.blockWrite(); err != nil {
		return func() {}, err
	}

	pending := make([]*request, 0, 10)
drain:
	for {
		select {
		case r := <-db.writeCh:
			pending = append(pending, r)
		default:
			break drain
		}
	}
	if err := db.writeRequests(pending); err != nil {
		db.opt.Errorf("writeRequests: %v", err)
	}
	db.stopMemoryFlush()

	resume := func() {
		db.opt.Infof("Resuming writes")
		db.startMemoryFlush()
		db.unblockWrite()
	}
	return resume, nil
}
// DropAll deletes all data in the database (see dropAll). Whatever resume
// func dropAll returns is invoked before returning, whether or not the drop
// succeeded.
func (db *DB) DropAll() error {
	resume, err := db.dropAll()
	if resume == nil {
		return err
	}
	resume()
	return err
}
// dropAll implements DropAll. It quiesces writes (prepareToDrop) and stops
// compactions. Then, with db.lock held, it:
//   - releases every memtable and installs a fresh one;
//   - deletes all SSTables and value-log files;
//   - resets the next table file ID to 1;
//   - clears both caches and the value-threshold state.
//
// It returns a resume func that restarts compactions and writes; DropAll
// calls it regardless of the error. If prepareToDrop fails, its func is
// passed through unchanged.
func (db *DB) dropAll() (func(), error) {
	db.opt.Infof("DropAll called. Blocking writes...")
	f, err := db.prepareToDrop()
	if err != nil {
		return f, err
	}
	// All data is about to be deleted, so there is no point compacting it.
	db.stopCompactions()
	resume := func() {
		db.startCompactions()
		f()
	}
	db.lock.Lock()
	defer db.lock.Unlock()
	db.mt.DecrRef()
	for _, mt := range db.imm {
		mt.DecrRef()
	}
	db.imm = db.imm[:0]
	// Install a fresh memtable for future writes.
	db.mt, err = db.newMemTable()
	if err != nil {
		return resume, y.Wrapf(err, "cannot open new memtable")
	}
	num, err := db.lc.dropTree()
	if err != nil {
		return resume, err
	}
	db.opt.Infof("Deleted %d SSTables. Now deleting value logs...\n", num)
	num, err = db.vlog.dropAll()
	if err != nil {
		return resume, err
	}
	db.lc.nextFileID.Store(1)
	db.opt.Infof("Deleted %d value log files. DropAll done.\n", num)
	db.blockCache.Clear()
	db.indexCache.Clear()
	db.threshold.Clear(db.opt)
	return resume, nil
}
// DropPrefix deletes every key that starts with any of prefixes. Prefixes
// with no visible keys are skipped; if none remain, no data is touched.
// Writes are quiesced for the duration (prepareToDrop) and resumed on return.
//
// With db.lock held, it:
//   - appends the active memtable to db.imm;
//   - flushes each non-empty memtable to level 0 via handleMemTableFlush,
//     passing the filtered prefixes;
//   - pauses compactions and installs a fresh memtable;
//   - calls db.lc.dropPrefixes to drop the prefixes from the levels.
//
// NOTE(review): if a flush fails, db.imm still lists memtables that were
// already DecrRef'd, and db.mt is both active and in db.imm. Confirm that
// callers treat this error as fatal.
func (db *DB) DropPrefix(prefixes ...[]byte) error {
	if len(prefixes) == 0 {
		return nil
	}
	db.opt.Infof("DropPrefix called for %s", prefixes)
	f, err := db.prepareToDrop()
	if err != nil {
		return err
	}
	defer f()
	var filtered [][]byte
	if filtered, err = db.filterPrefixesToDrop(prefixes); err != nil {
		return err
	}
	if len(filtered) == 0 {
		db.opt.Infof("No prefixes to drop")
		return nil
	}
	db.lock.Lock()
	defer db.lock.Unlock()
	// Treat the active memtable like an immutable one for this flush pass.
	db.imm = append(db.imm, db.mt)
	for _, memtable := range db.imm {
		if memtable.sl.Empty() {
			memtable.DecrRef()
			continue
		}
		db.opt.Debugf("Flushing memtable")
		if err := db.handleMemTableFlush(memtable, filtered); err != nil {
			db.opt.Errorf("While trying to flush memtable: %v", err)
			return err
		}
		memtable.DecrRef()
	}
	db.stopCompactions()
	defer db.startCompactions()
	db.imm = db.imm[:0]
	db.mt, err = db.newMemTable()
	if err != nil {
		return y.Wrapf(err, "cannot create new mem table")
	}
	if err := db.lc.dropPrefixes(filtered); err != nil {
		return err
	}
	db.opt.Infof("DropPrefix done")
	return nil
}
// filterPrefixesToDrop returns, in their original order, the prefixes for
// which a db.View transaction can see at least one key. Each prefix is probed
// with a key-only iterator (PrefetchValues disabled) restricted to that
// prefix. If a View call fails, the prefixes collected so far are returned
// together with the error.
func (db *DB) filterPrefixesToDrop(prefixes [][]byte) ([][]byte, error) {
	var withData [][]byte
	for _, p := range prefixes {
		probe := func(txn *Txn) error {
			opts := DefaultIteratorOptions
			opts.PrefetchValues = false
			opts.Prefix = p

			it := txn.NewIterator(opts)
			defer it.Close()

			it.Rewind()
			if it.ValidForPrefix(p) {
				withData = append(withData, p)
			}
			return nil
		}
		if err := db.View(probe); err != nil {
			return withData, err
		}
	}
	return withData, nil
}
// isBanned reports ErrBannedKey when key belongs to a banned namespace, and
// nil otherwise.
//
// Nothing is checked when namespace mode is off (opt.NamespaceOffset < 0) or
// when key is no longer than NamespaceOffset+8 bytes. Otherwise the namespace
// is the uint64 decoded by y.BytesToU64 from key[NamespaceOffset:].
func (db *DB) isBanned(key []byte) error {
	off := db.opt.NamespaceOffset
	if off < 0 || len(key) <= off+8 {
		return nil
	}
	if ns := y.BytesToU64(key[off:]); db.bannedNamespaces.has(ns) {
		return ErrBannedKey
	}
	return nil
}
// BanNamespace bans the namespace ns. Once it returns nil, isBanned reports
// ErrBannedKey for keys longer than opt.NamespaceOffset+8 bytes whose 8 bytes
// at opt.NamespaceOffset decode to ns.
//
// The ban is persisted first: an entry whose key is bannedNsKey followed by
// y.U64ToBytes(ns) (at timestamp 1), with a nil value, is sent through the
// write channel and waited on. Only after that write succeeds is ns added to
// the in-memory banned set, so a failed write leaves in-memory state untouched.
//
// It returns ErrNamespaceMode when namespace mode is off
// (opt.NamespaceOffset < 0), or any error from sending or awaiting the write.
func (db *DB) BanNamespace(ns uint64) error {
	if db.opt.NamespaceOffset < 0 {
		return ErrNamespaceMode
	}
	db.opt.Infof("Banning namespace: %d", ns)

	// Build the marker key in a buffer owned by this call. Appending straight
	// onto the package-level bannedNsKey would write into its backing array
	// whenever that slice has spare capacity, so concurrent callers could
	// overwrite each other's key bytes.
	nsBytes := y.U64ToBytes(ns)
	userKey := make([]byte, 0, len(bannedNsKey)+len(nsBytes))
	userKey = append(userKey, bannedNsKey...)
	userKey = append(userKey, nsBytes...)

	entries := []*Entry{{
		Key:   y.KeyWithTs(userKey, 1),
		Value: nil,
	}}
	req, err := db.sendToWriteCh(entries)
	if err != nil {
		return err
	}
	if err := req.Wait(); err != nil {
		return err
	}
	db.bannedNamespaces.add(ns)
	return nil
}
// BannedNamespaces returns a snapshot of the namespaces held in the DB's
// in-memory banned set, which BanNamespace adds to. The order of the
// returned IDs is unspecified.
func (db *DB) BannedNamespaces() []uint64 {
	snapshot := db.bannedNamespaces.all()
	return snapshot
}
// KVList is an alias of pb.KVList, the batch type handed to Subscribe
// callbacks.
type KVList = pb .KVList
// Subscribe registers a subscriber with db.pub for the given matches and
// feeds the updates it receives to cb. It blocks until one of:
//   - ctx is done: returns ctx.Err();
//   - cb returns an error: returns that error;
//   - the subscriber's closer is signalled: pending updates are passed to cb
//     one last time and cb's result (or nil) is returned.
//
// Updates already buffered on the subscriber's channel are merged into a
// single *KVList before each cb call, so one call may contain several
// published batches. cb is never called with an empty list, and it runs
// synchronously on the calling goroutine.
//
// It returns ErrNilCallback if cb is nil, or a wrapped error if the
// subscriber cannot be created.
func (db *DB ) Subscribe (ctx context .Context , cb func (kv *KVList ) error , matches []pb .Match ) error {
if cb == nil {
return ErrNilCallback
}
c := z .NewCloser (1 )
s , err := db .pub .newSubscriber (c , matches )
if err != nil {
return y .Wrapf (err , "while creating a new subscriber" )
}
// slurp appends every update currently buffered on s.sendCh to batch
// (without blocking) and then invokes cb once if batch is non-empty.
slurp := func (batch *pb .KVList ) error {
for {
select {
case kvs := <- s .sendCh :
batch .Kv = append (batch .Kv , kvs .Kv ...)
default :
if len (batch .GetKv ()) > 0 {
return cb (batch )
}
return nil
}
}
}
// drain discards buffered updates until s.sendCh is empty or closed.
drain := func () {
for {
select {
case _ , ok := <- s .sendCh :
if !ok {
return
}
default :
return
}
}
}
for {
select {
case <- c .HasBeenClosed ():
// Deliver whatever is still pending, then acknowledge the closer.
// NOTE(review): the subscriber is not deleted here; presumably the
// publisher removes it itself when the closer is signalled (e.g. on DB
// close). Confirm.
err := slurp (new (pb .KVList ))
c .Done ()
return err
case <- ctx .Done ():
// Mark the subscriber inactive, discard what is buffered, and unregister.
c .Done ()
s .active .Store (0 )
drain ()
db .pub .deleteSubscriber (s .id )
return ctx .Err ()
// NOTE(review): if s.sendCh were ever closed while this loop runs, batch
// would be nil and slurp would dereference it; confirm the channel is
// only closed after the subscriber is inactive.
case batch := <- s .sendCh :
err := slurp (batch )
if err != nil {
// The callback failed: same cleanup as cancellation.
c .Done ()
s .active .Store (0 )
drain ()
db .pub .deleteSubscriber (s .id )
return err
}
}
}
}
// syncDir delegates to the package-level syncDir helper for dir, except for
// in-memory databases, where it does nothing and returns nil.
func (db *DB) syncDir(dir string) error {
	if !db.opt.InMemory {
		return syncDir(dir)
	}
	return nil
}
// createDirs ensures opt.Dir and opt.ValueDir exist, creating any missing
// directory (and its parents) with permission bits 0700. In read-only mode a
// missing directory is reported as an error instead of being created. Errors
// from the existence check or from MkdirAll are wrapped with the offending
// path.
func createDirs(opt Options) error {
	for _, dir := range []string{opt.Dir, opt.ValueDir} {
		found, err := exists(dir)
		switch {
		case err != nil:
			return y.Wrapf(err, "Invalid Dir: %q", dir)
		case found:
			continue
		case opt.ReadOnly:
			return fmt.Errorf("Cannot find directory %q for read-only open", dir)
		}
		if mkErr := os.MkdirAll(dir, 0700); mkErr != nil {
			return y.Wrapf(mkErr, "Error Creating Dir: %q", dir)
		}
	}
	return nil
}
// StreamDB copies the contents of db into a new database opened with
// OpenManaged(outOptions) at outOptions.Dir.
//
// The source is read through a stream at timestamp math.MaxUint64 and written
// via a StreamWriter on the output DB, which is flushed at the end. The
// output DB is always closed before returning. If closing fails and no
// earlier error occurred, the close error is returned, so the caller never
// treats a failed close as a successful copy.
func (db *DB) StreamDB(outOptions Options) (err error) {
	outDir := outOptions.Dir

	outDB, err := OpenManaged(outOptions)
	if err != nil {
		return y.Wrapf(err, "cannot open out DB at %s", outDir)
	}
	// Close's error used to be discarded by a bare defer. Report it, but never
	// let it mask an earlier, more specific failure.
	defer func() {
		if cerr := outDB.Close(); cerr != nil && err == nil {
			err = y.Wrapf(cerr, "cannot close out DB at %s", outDir)
		}
	}()

	writer := outDB.NewStreamWriter()
	if err := writer.Prepare(); err != nil {
		return y.Wrapf(err, "cannot create stream writer in out DB at %s", outDir)
	}

	stream := db.NewStreamAt(math.MaxUint64)
	stream.LogPrefix = fmt.Sprintf("Streaming DB to new DB at %s", outDir)
	stream.Send = func(buf *z.Buffer) error {
		return writer.Write(buf)
	}
	if err := stream.Orchestrate(context.Background()); err != nil {
		return y.Wrapf(err, "cannot stream DB to out DB at %s", outDir)
	}
	if err := writer.Flush(); err != nil {
		return y.Wrapf(err, "cannot flush writer")
	}
	return nil
}
// Opts returns a shallow copy of the Options this DB was configured with.
// Changing the returned value does not affect the DB.
func (db *DB) Opts() Options {
	snapshot := db.opt
	return snapshot
}
// CacheType selects which of the DB's caches CacheMaxCost operates on.
type CacheType int

const (
	// BlockCache selects the cache of table blocks.
	BlockCache CacheType = 0
	// IndexCache selects the cache of table indices.
	IndexCache CacheType = 1
)
// CacheMaxCost reads or sets the maximum cost of one of the DB's caches.
//
// A negative maxCost leaves the selected cache unchanged and returns its
// current MaxCost. Any other value is applied with UpdateMaxCost and returned
// as is. A nil *DB yields (0, nil), and an unknown CacheType yields an error.
func (db *DB) CacheMaxCost(cache CacheType, maxCost int64) (int64, error) {
	if db == nil {
		return 0, nil
	}
	query := maxCost < 0
	switch cache {
	case BlockCache:
		if query {
			return db.blockCache.MaxCost(), nil
		}
		db.blockCache.UpdateMaxCost(maxCost)
	case IndexCache:
		if query {
			return db.indexCache.MaxCost(), nil
		}
		db.indexCache.UpdateMaxCost(maxCost)
	default:
		return 0, errors.New("invalid cache type")
	}
	return maxCost, nil
}
// LevelsToString renders the per-level summary from db.Levels() as a
// multi-line string: a leading newline, one line per level, and a final
// "Level Done" line. Sizes are formatted with humanize.IBytes, and the base
// level is marked with "B" (other levels with a blank).
func (db *DB) LevelsToString() string {
	ibytes := func(n int64) string { return humanize.IBytes(uint64(n)) }
	marker := func(isBase bool) string {
		if isBase {
			return "B"
		}
		return " "
	}

	var sb strings.Builder
	sb.WriteByte('\n')
	for _, lvl := range db.Levels() {
		fmt.Fprintf(&sb,
			"Level %d [%s]: NumTables: %02d. Size: %s of %s. Score: %.2f->%.2f"+
				" StaleData: %s Target FileSize: %s\n",
			lvl.Level, marker(lvl.IsBaseLevel), lvl.NumTables,
			ibytes(lvl.Size), ibytes(lvl.TargetSize), lvl.Score, lvl.Adjusted,
			ibytes(lvl.StaleDatSize), ibytes(lvl.TargetFileSize))
	}
	sb.WriteString("Level Done\n")
	return sb.String()
}
The pages are generated with Golds v0.8.4. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.