package badger
import (
"bytes"
"context"
"encoding/hex"
"errors"
"fmt"
"math"
"math/rand"
"os"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"github.com/dgraph-io/badger/v4/pb"
"github.com/dgraph-io/badger/v4/table"
"github.com/dgraph-io/badger/v4/y"
"github.com/dgraph-io/ristretto/v2/z"
)
// levelsController owns the per-level table handlers of the LSM tree together
// with the bookkeeping shared by all compactions.
type levelsController struct {
// nextFileID is the next table file ID to hand out; at startup it is set to
// one past the highest table ID found in the manifest.
nextFileID atomic .Uint64
// l0stallsMs accumulates the time writers spent stalled in addLevel0Table
// waiting for L0 to drain.
l0stallsMs atomic .Int64
// levels holds one handler per level; levels[i] was created for level i.
levels []*levelHandler
kv *DB
// cstatus tracks the tables and key ranges currently being compacted.
cstatus compactStatus
}
// revertToManifest reconciles the table files present on disk (idMap) with
// the MANIFEST. It returns an error if the MANIFEST references a table whose
// file is missing, and deletes every table file on disk that the MANIFEST
// does not reference.
func revertToManifest(kv *DB, mf *Manifest, idMap map[uint64]struct{}) error {
	// Every table recorded in the manifest must exist on disk.
	for id := range mf.Tables {
		if _, present := idMap[id]; !present {
			return fmt.Errorf("file does not exist for table %d", id)
		}
	}

	// Remove stray table files the manifest never recorded.
	for id := range idMap {
		if _, referenced := mf.Tables[id]; referenced {
			continue
		}
		kv.opt.Debugf("Table file %d not referenced in MANIFEST\n", id)
		if err := os.Remove(table.NewFilename(id, kv.opt.Dir)); err != nil {
			return y.Wrapf(err, "While removing table %d", id)
		}
	}
	return nil
}
// newLevelsController builds the levelsController for db.
//
// It first creates a levelHandler and a compaction-status slot for each of the
// MaxLevels levels. Unless the DB is in-memory, it then:
//   - reconciles the directory with mf (revertToManifest),
//   - opens every table listed in mf, at most 3 at a time,
//   - places each table on the level recorded in the manifest,
//   - validates the levels and syncs the directory.
//
// A table whose open fails with an error starting with "CHECKSUM_MISMATCH:"
// is logged and skipped. Any other failure closes every table opened so far
// (after all in-flight openers have finished) and is returned.
func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) {
	y.AssertTrue(db.opt.NumLevelZeroTablesStall > db.opt.NumLevelZeroTables)
	s := &levelsController{
		kv:     db,
		levels: make([]*levelHandler, db.opt.MaxLevels),
	}
	s.cstatus.tables = make(map[uint64]struct{})
	s.cstatus.levels = make([]*levelCompactStatus, db.opt.MaxLevels)
	for i := 0; i < db.opt.MaxLevels; i++ {
		s.levels[i] = newLevelHandler(db, i)
		s.cstatus.levels[i] = new(levelCompactStatus)
	}
	// In-memory DBs have no table files to load.
	if db.opt.InMemory {
		return s, nil
	}
	// Fail on manifest tables missing from disk; delete files the manifest
	// does not reference.
	if err := revertToManifest(db, mf, getIDMap(db.opt.Dir)); err != nil {
		return nil, err
	}

	// mu guards tables, which the opener goroutines append to concurrently.
	var mu sync.Mutex
	tables := make([][]*table.Table, db.opt.MaxLevels)
	var maxFileID uint64
	// At most 3 tables are opened concurrently.
	throttle := y.NewThrottle(3)

	start := time.Now()
	var numOpened atomic.Int32
	tick := time.NewTicker(3 * time.Second)
	defer tick.Stop()

	for fileID, tf := range mf.Tables {
		fname := table.NewFilename(fileID, db.opt.Dir)
		// Report progress at most once per tick, without blocking the loop.
		select {
		case <-tick.C:
			db.opt.Infof("%d tables out of %d opened in %s\n", numOpened.Load(),
				len(mf.Tables), time.Since(start).Round(time.Millisecond))
		default:
		}
		if err := throttle.Do(); err != nil {
			// Openers already started may still be appending to tables.
			// Wait for them before closing, so closeAllTables neither races
			// with those appends nor misses the tables they finish opening.
			_ = throttle.Finish()
			closeAllTables(tables)
			return nil, err
		}
		if fileID > maxFileID {
			maxFileID = fileID
		}
		go func(fname string, tf TableManifest) {
			// rerr is read by the deferred Done, so assignments below are reported.
			var rerr error
			defer func() {
				throttle.Done(rerr)
				numOpened.Add(1)
			}()
			dk, err := db.registry.DataKey(tf.KeyID)
			if err != nil {
				rerr = y.Wrapf(err, "Error while reading datakey")
				return
			}
			// Open the table with the compression and data key it was written with.
			topt := buildTableOptions(db)
			topt.Compression = tf.Compression
			topt.DataKey = dk

			mfile, err := z.OpenMmapFile(fname, db.opt.getFileFlags(), 0)
			if err != nil {
				rerr = y.Wrapf(err, "Opening file: %q", fname)
				return
			}
			t, err := table.OpenTable(mfile, topt)
			if err != nil {
				if strings.HasPrefix(err.Error(), "CHECKSUM_MISMATCH:") {
					// Corrupt table: log it and continue without it (rerr stays nil).
					db.opt.Errorf("%s", err.Error())
					db.opt.Errorf("Ignoring table %s", mfile.Fd.Name())
				} else {
					rerr = y.Wrapf(err, "Opening table: %q", fname)
				}
				return
			}

			mu.Lock()
			tables[tf.Level] = append(tables[tf.Level], t)
			mu.Unlock()
		}(fname, tf)
	}
	if err := throttle.Finish(); err != nil {
		closeAllTables(tables)
		return nil, err
	}
	db.opt.Infof("All %d tables opened in %s\n", numOpened.Load(),
		time.Since(start).Round(time.Millisecond))
	s.nextFileID.Store(maxFileID + 1)
	for i, tbls := range tables {
		s.levels[i].initTables(tbls)
	}

	if err := s.validate(); err != nil {
		_ = s.cleanupLevels()
		return nil, y.Wrap(err, "Level validation")
	}

	// Persist directory changes, e.g. files removed by revertToManifest.
	if err := syncDir(db.opt.Dir); err != nil {
		_ = s.close()
		return nil, err
	}
	return s, nil
}
// closeAllTables calls Close(-1) on every table in every per-level slice,
// ignoring any close errors. It is used to release tables opened during a
// failed startup.
func closeAllTables(tables [][]*table.Table) {
	for i := range tables {
		for j := range tables[i] {
			_ = tables[i][j].Close(-1)
		}
	}
}
// cleanupLevels closes every level handler. It keeps going after a failure
// and returns the first error encountered, or nil.
func (s *levelsController) cleanupLevels() error {
	var first error
	for i := range s.levels {
		err := s.levels[i].close()
		if err == nil || first != nil {
			continue
		}
		first = err
	}
	return first
}
// dropTree removes every table from every level and returns how many tables
// were removed.
//
// A delete change for each table not flagged IsInmemory is written to the
// MANIFEST first. Only after that succeeds are the level handlers emptied and
// each table's reference released with DecrRef.
func (s *levelsController ) dropTree () (int , error ) {
var all []*table .Table
for _ , l := range s .levels {
l .RLock ()
all = append (all , l .tables ...)
l .RUnlock ()
}
if len (all ) == 0 {
return 0 , nil
}
changes := []*pb .ManifestChange {}
for _ , table := range all {
// Tables flagged IsInmemory get no delete entry.
if !table .IsInmemory {
changes = append (changes , newDeleteChange (table .ID ()))
}
}
changeSet := pb .ManifestChangeSet {Changes : changes }
if err := s .kv .manifest .addChanges (changeSet .Changes , s .kv .opt ); err != nil {
return 0 , err
}
// The MANIFEST no longer references these tables; empty the levels.
// NOTE(review): totalStaleSize is not reset here, only totalSize.
for _ , l := range s .levels {
l .Lock ()
l .totalSize = 0
l .tables = l .tables [:0 ]
l .Unlock ()
}
for _ , table := range all {
if err := table .DecrRef (); err != nil {
return 0 , err
}
}
return len (all ), nil
}
// dropPrefixes rewrites the tables that may contain keys starting with any of
// prefixes, discarding those keys. Levels are processed from the last level up
// to L0.
//
// For L0, a regular L0 compaction is forced with dropPrefixes set. An error
// from that compaction is only logged, and nil is returned.
//
// For every other level, each run of consecutive tables that contain one of
// the prefixes is compacted in place (thisLevel == nextLevel). The first such
// failure is returned.
func (s *levelsController ) dropPrefixes (prefixes [][]byte ) error {
opt := s .kv .opt
for i := len (s .levels ) - 1 ; i >= 0 ; i -- {
l := s .levels [i ]
l .RLock ()
if l .level == 0 {
size := len (l .tables )
l .RUnlock ()
if size > 0 {
// A score above 1.0 requests the compaction. adjusted is left at 0,
// which fillTablesL0ToLbase accepts. The 1.74 / 174 pairing presumably
// makes these compactions easy to spot in logs.
cp := compactionPriority {
level : 0 ,
score : 1.74 ,
dropPrefixes : prefixes ,
}
if err := s .doCompact (174 , cp ); err != nil {
opt .Warningf ("While compacting level 0: %v" , err )
return nil
}
}
continue
}
// Group consecutive matching tables so each group's bottom tables are
// contiguous within the level.
var tableGroups [][]*table .Table
var tableGroup []*table .Table
finishGroup := func () {
if len (tableGroup ) > 0 {
tableGroups = append (tableGroups , tableGroup )
tableGroup = nil
}
}
for _ , table := range l .tables {
if containsAnyPrefixes (table , prefixes ) {
tableGroup = append (tableGroup , table )
} else {
finishGroup ()
}
}
finishGroup ()
l .RUnlock ()
if len (tableGroups ) == 0 {
continue
}
opt .Infof ("Dropping prefix at level %d (%d tableGroups)" , l .level , len (tableGroups ))
for _ , operation := range tableGroups {
// Compact the group into its own level; there are no top tables.
cd := compactDef {
thisLevel : l ,
nextLevel : l ,
top : nil ,
bot : operation ,
dropPrefixes : prefixes ,
t : s .levelTargets (),
}
_ , span := otel .Tracer ("" ).Start (context .TODO (), "Badger.Compaction" )
span .SetAttributes (attribute .Int ("Compaction level" , l .level ))
span .SetAttributes (attribute .String ("Drop Prefixes" , fmt .Sprintf ("%v" , prefixes )))
// Output tables are sized as if this level were the base level.
cd .t .baseLevel = l .level
if err := s .runCompactDef (-1 , l .level , cd ); err != nil {
opt .Warningf ("While running compact def: %+v. Error: %v" , cd , err )
span .End ()
return err
}
span .SetAttributes (
attribute .Int ("Top tables count" , len (cd .top )),
attribute .Int ("Bottom tables count" , len (cd .bot )))
span .End ()
}
}
return nil
}
// startCompact launches NumCompactors compaction workers with ids
// 0..NumCompactors-1. Each worker calls lc.Done when it exits.
func (s *levelsController) startCompact(lc *z.Closer) {
	workers := s.kv.opt.NumCompactors
	// Only workers-1 extra running slots are registered. NOTE(review):
	// presumably lc was created with one running slot already; confirm at the
	// call site.
	lc.AddRunning(workers - 1)
	for id := 0; id < workers; id++ {
		go s.runCompactor(id, lc)
	}
}
// targets holds the per-level sizing decisions computed by levelTargets.
type targets struct {
// baseLevel is the level that L0 compacts into.
baseLevel int
// targetSz[i] is the target total size of level i. Index 0 is never set.
targetSz []int64
// fileSz[i] is the target size of table files written to level i.
fileSz []int64
}
// levelTargets computes three things: the target size of each level, the
// target table file size of each level, and the base level that L0 compacts
// into.
//
// Targets are derived from the current size of the last level. Walking up from
// the last level, the size is integer-divided by LevelSizeMultiplier once per
// level, and each level's target is that value raised to BaseLevelSize if it
// is smaller. The deepest level whose target is <= BaseLevelSize becomes the
// initial base level.
//
// File sizes use that initial base level:
//   - L0 uses MemTableSize;
//   - levels 1..base use BaseTableSize;
//   - each deeper level multiplies the previous size by TableSizeMultiplier.
//
// Finally the base level is moved down past empty levels (never onto the last
// level). It moves one more level when it is still empty and the level below
// is under its target.
//
// NOTE(review): if even level 1's target exceeds BaseLevelSize, baseLevel can
// stay 0, and fillTablesL0ToLbase panics on a zero base level.
func (s *levelsController ) levelTargets () targets {
adjust := func (sz int64 ) int64 {
if sz < s .kv .opt .BaseLevelSize {
return s .kv .opt .BaseLevelSize
}
return sz
}
t := targets {
targetSz : make ([]int64 , len (s .levels )),
fileSz : make ([]int64 , len (s .levels )),
}
dbSize := s .lastLevel ().getTotalSize ()
for i := len (s .levels ) - 1 ; i > 0 ; i -- {
ltarget := adjust (dbSize )
t .targetSz [i ] = ltarget
if t .baseLevel == 0 && ltarget <= s .kv .opt .BaseLevelSize {
t .baseLevel = i
}
dbSize /= int64 (s .kv .opt .LevelSizeMultiplier )
}
tsz := s .kv .opt .BaseTableSize
for i := 0 ; i < len (s .levels ); i ++ {
if i == 0 {
// L0 files match the memtable size.
t .fileSz [i ] = s .kv .opt .MemTableSize
} else if i <= t .baseLevel {
t .fileSz [i ] = tsz
} else {
tsz *= int64 (s .kv .opt .TableSizeMultiplier )
t .fileSz [i ] = tsz
}
}
// Move the base level down to the deepest empty level before any data.
for i := t .baseLevel + 1 ; i < len (s .levels )-1 ; i ++ {
if s .levels [i ].getTotalSize () > 0 {
break
}
t .baseLevel = i
}
// If the base level is empty and the level below it is under target, use
// that level instead.
b := t .baseLevel
lvl := s .levels
if b < len (lvl )-1 && lvl [b ].getTotalSize () == 0 && lvl [b +1 ].getTotalSize () < t .targetSz [b +1 ] {
t .baseLevel ++
}
return t
}
// runCompactor is the loop of compaction worker id; it runs until lc is closed
// and calls lc.Done on exit.
//
// After a random delay of up to 1s it ticks every 50ms. On each tick it runs
// at most one compaction chosen by pickCompactLevels:
//   - Worker 0 moves L0 (when it is a candidate) to the front, and may compact
//     L0 regardless of its adjusted score.
//   - All other candidates need an adjusted score >= 1.0.
//   - When LmaxCompaction is enabled, worker 2 instead runs an Lmax -> Lmax
//     compaction every 200 ticks.
func (s *levelsController ) runCompactor (id int , lc *z .Closer ) {
defer lc .Done ()
randomDelay := time .NewTimer (time .Duration (rand .Int31n (1000 )) * time .Millisecond )
select {
case <- randomDelay .C :
case <- lc .HasBeenClosed ():
randomDelay .Stop ()
return
}
// moveL0toFront returns prios with the L0 entry moved to index 0, keeping the
// others in order. A new slice is built only if L0 is present but not first.
moveL0toFront := func (prios []compactionPriority ) []compactionPriority {
idx := -1
for i , p := range prios {
if p .level == 0 {
idx = i
break
}
}
if idx > 0 {
out := append ([]compactionPriority {}, prios [idx ])
out = append (out , prios [:idx ]...)
out = append (out , prios [idx +1 :]...)
return out
}
return prios
}
// run attempts one compaction and reports success. errFillTables (nothing
// could be picked) is expected and therefore not logged.
run := func (p compactionPriority ) bool {
err := s .doCompact (id , p )
switch err {
case nil :
return true
case errFillTables :
default :
s .kv .opt .Warningf ("While running doCompact: %v\n" , err )
}
return false
}
// priosBuffer keeps the previous call's slice so pickCompactLevels can reuse it.
var priosBuffer []compactionPriority
// runOnce tries candidates in priority order and stops at the first success
// or at the first candidate that is not eligible.
runOnce := func () bool {
prios := s .pickCompactLevels (priosBuffer )
defer func () {
priosBuffer = prios
}()
if id == 0 {
prios = moveL0toFront (prios )
}
for _ , p := range prios {
if id == 0 && p .level == 0 {
// Worker 0 may compact L0 irrespective of its adjusted score.
} else if p .adjusted < 1.0 {
break
}
if run (p ) {
return true
}
}
return false
}
// tryLmaxToLmaxCompaction compacts the last level into itself.
tryLmaxToLmaxCompaction := func () {
p := compactionPriority {
level : s .lastLevel ().level ,
t : s .levelTargets (),
}
run (p )
}
count := 0
ticker := time .NewTicker (50 * time .Millisecond )
defer ticker .Stop ()
for {
select {
case <- ticker .C :
count ++
// 200 ticks of 50ms is roughly 10 seconds.
if s .kv .opt .LmaxCompaction && id == 2 && count >= 200 {
tryLmaxToLmaxCompaction ()
count = 0
} else {
runOnce ()
}
case <- lc .HasBeenClosed ():
return
}
}
}
// compactionPriority describes a candidate compaction of a single level.
type compactionPriority struct {
level int
// score is the raw pressure on the level. For L0 it is the table count over
// NumLevelZeroTables; for other levels it is the size over the target size.
score float64
// adjusted starts equal to score and may be rescaled by pickCompactLevels
// using the score of the level below.
adjusted float64
// dropPrefixes, when non-empty, asks the compaction to discard keys with
// these prefixes.
dropPrefixes [][]byte
// t are the level targets in effect when this priority was built.
t targets
}
// lastLevel returns the handler of the deepest (highest-numbered) level.
func (s *levelsController) lastLevel() *levelHandler {
	n := len(s.levels)
	return s.levels[n-1]
}
// pickCompactLevels scores every level and returns the levels worth
// compacting, sorted by decreasing adjusted score.
//
// Scoring:
//   - L0's score is its table count divided by NumLevelZeroTables.
//   - Every other level's score is its size, minus data already being
//     compacted away (cstatus.delSize), divided by its target size.
//
// Only levels with a raw score >= 1.0 are returned, and never the last level.
// priosBuffer is reused as backing storage when its capacity suffices, so the
// result may alias it.
func (s *levelsController ) pickCompactLevels (priosBuffer []compactionPriority ) (prios []compactionPriority ) {
t := s .levelTargets ()
addPriority := func (level int , score float64 ) {
pri := compactionPriority {
level : level ,
score : score ,
adjusted : score ,
t : t ,
}
prios = append (prios , pri )
}
if cap (priosBuffer ) < len (s .levels ) {
priosBuffer = make ([]compactionPriority , 0 , len (s .levels ))
}
prios = priosBuffer [:0 ]
addPriority (0 , float64 (s .levels [0 ].numTables ())/float64 (s .kv .opt .NumLevelZeroTables ))
for i := 1 ; i < len (s .levels ); i ++ {
delSize := s .cstatus .delSize (i )
l := s .levels [i ]
sz := l .getTotalSize () - delSize
addPriority (i , float64 (sz )/float64 (t .targetSz [i ]))
}
y .AssertTrue (len (prios ) == len (s .levels ))
// Walk L0 and then baseLevel..last. When the previous level's adjusted score
// is >= 1, divide it by the current level's adjusted score, or by minScore if
// the current level's raw score is below minScore. An overflowing level below
// therefore lowers the priority of the level above, while an underfull one
// raises it.
var prevLevel int
for level := t .baseLevel ; level < len (s .levels ); level ++ {
if prios [prevLevel ].adjusted >= 1 {
const minScore = 0.01
if prios [level ].score >= minScore {
prios [prevLevel ].adjusted /= prios [level ].adjusted
} else {
prios [prevLevel ].adjusted /= minScore
}
}
prevLevel = level
}
// Filter in place: keep raw score >= 1.0 and drop the last level.
out := prios [:0 ]
for _ , p := range prios [:len (prios )-1 ] {
if p .score >= 1.0 {
out = append (out , p )
}
}
prios = out
sort .Slice (prios , func (i , j int ) bool {
return prios [i ].adjusted > prios [j ].adjusted
})
return prios
}
// checkOverlap reports whether the combined key range of tables overlaps any
// table in level lev or in any deeper level. Each level is read-locked only
// while it is inspected.
func (s *levelsController) checkOverlap(tables []*table.Table, lev int) bool {
	kr := getKeyRange(tables...)
	from := lev
	if from < 0 {
		from = 0
	}
	for i := from; i < len(s.levels); i++ {
		lh := s.levels[i]
		lh.RLock()
		lo, hi := lh.overlappingTables(levelHandlerRLocked{}, kr)
		lh.RUnlock()
		if hi > lo {
			return true
		}
	}
	return false
}
// subcompact merges the entries of it that fall inside kr into new tables
// destined for cd.nextLevel, and sends each finished table on res.
//
// Pruning applies only to versions at or below the oracle's discard
// timestamp, and never to merge-operator entries (bitMergeEntry):
//   - keys starting with any of cd.dropPrefixes are dropped;
//   - once a version is deleted/expired, carries bitDiscardEarlierVersions, or
//     is the NumVersionsToKeep-th such version of its key, all older versions
//     of that key are skipped;
//   - a deleted/expired version is itself kept (as a stale key) only when the
//     input tables overlap levels below cd.nextLevel; otherwise it is dropped.
//
// Each output table is built in its own goroutine registered with
// inflightBuilders. A failure to create the table is passed to
// inflightBuilders.Done. Discard statistics for dropped value-log entries are
// handed to the value log before returning.
func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef,
	inflightBuilders *y.Throttle, res chan<- *table.Table) {
	// Whether the input tables overlap any level below nextLevel. If so,
	// deletion markers must be kept so they still hide older versions there.
	hasOverlap := s.checkOverlap(cd.allTables(), cd.nextLevel.level+1)

	// Only versions at or below discardTs are candidates for pruning.
	discardTs := s.kv.orc.discardAtOrBelow()

	// Bytes of value-log data dropped, keyed by value-log file id.
	discardStats := make(map[uint32]int64)
	updateStats := func(vs y.ValueStruct) {
		// No value-log stats are kept when running in-memory.
		if s.kv.opt.InMemory {
			return
		}
		if vs.Meta&bitValuePointer > 0 {
			var vp valuePointer
			vp.Decode(vs.Value)
			discardStats[vp.Fid] += int64(vp.Len)
		}
	}

	// exceedsAllowedOverlap reports whether kr overlaps 10 or more tables of
	// the level after nextLevel. Tables are cut early when this happens, so a
	// later compaction of them does not pull in too many tables.
	exceedsAllowedOverlap := func(kr keyRange) bool {
		n2n := cd.nextLevel.level + 1
		if n2n <= 1 || n2n >= len(s.levels) {
			return false
		}
		n2nl := s.levels[n2n]
		n2nl.RLock()
		defer n2nl.RUnlock()

		l, r := n2nl.overlappingTables(levelHandlerRLocked{}, kr)
		return r-l >= 10
	}

	var (
		lastKey, skipKey       []byte
		numBuilds, numVersions int
		// firstKeyHasDiscardSet records whether the newest version of the
		// current key had bitDiscardEarlierVersions set.
		firstKeyHasDiscardSet bool
	)

	// addKeys drains it into builder. It stops when it reaches the split's
	// right bound, when the builder is full, or when the table's range
	// overlaps too much with the level after next. It only stops on a new
	// key, so all versions of a key land in the same table.
	addKeys := func(builder *table.Builder) {
		timeStart := time.Now()
		var numKeys, numSkips uint64
		var rangeCheck int
		var tableKr keyRange
		for ; it.Valid(); it.Next() {
			// Drop keys under any of the requested prefixes.
			if len(cd.dropPrefixes) > 0 && hasAnyPrefixes(it.Key(), cd.dropPrefixes) {
				numSkips++
				updateStats(it.Value())
				continue
			}

			// Skip older versions of a key already chosen for pruning.
			if len(skipKey) > 0 {
				if y.SameKey(it.Key(), skipKey) {
					numSkips++
					updateStats(it.Value())
					continue
				}
				skipKey = skipKey[:0]
			}

			if !y.SameKey(it.Key(), lastKey) {
				firstKeyHasDiscardSet = false
				if len(kr.right) > 0 && y.CompareKeys(it.Key(), kr.right) >= 0 {
					break
				}
				if builder.ReachedCapacity() {
					break
				}
				lastKey = y.SafeCopy(lastKey, it.Key())
				numVersions = 0
				firstKeyHasDiscardSet = it.Value().Meta&bitDiscardEarlierVersions > 0

				if len(tableKr.left) == 0 {
					tableKr.left = y.SafeCopy(tableKr.left, it.Key())
				}
				tableKr.right = lastKey

				// The overlap check takes a level lock; only do it every 5000 keys.
				rangeCheck++
				if rangeCheck%5000 == 0 {
					if exceedsAllowedOverlap(tableKr) {
						break
					}
				}
			}

			vs := it.Value()
			version := y.ParseTs(it.Key())
			isExpired := isDeletedOrExpired(vs.Meta, vs.ExpiresAt)

			// Merge-operator entries are never pruned here.
			if version <= discardTs && vs.Meta&bitMergeEntry == 0 {
				numVersions++
				lastValidVersion := vs.Meta&bitDiscardEarlierVersions > 0 ||
					numVersions == s.kv.opt.NumVersionsToKeep

				if isExpired || lastValidVersion {
					// Every older version of this key will be skipped.
					skipKey = y.SafeCopy(skipKey, it.Key())

					switch {
					case !isExpired && lastValidVersion:
						// Keep this live version.
					case hasOverlap:
						// Keep the deletion marker: lower levels may hold older versions.
					default:
						numSkips++
						updateStats(vs)
						continue
					}
				}
			}
			numKeys++
			var vp valuePointer
			if vs.Meta&bitValuePointer > 0 {
				vp.Decode(vs.Value)
			}
			switch {
			case firstKeyHasDiscardSet:
				builder.AddStaleKey(it.Key(), vs, vp.Len)
			case isExpired:
				builder.AddStaleKey(it.Key(), vs, vp.Len)
			default:
				builder.Add(it.Key(), vs, vp.Len)
			}
		}
		s.kv.opt.Debugf("[%d] LOG Compact. Added %d keys. Skipped %d keys. Iteration took: %v",
			cd.compactorId, numKeys, numSkips, time.Since(timeStart).Round(time.Millisecond))
	}

	if len(kr.left) > 0 {
		it.Seek(kr.left)
	} else {
		it.Rewind()
	}
	for it.Valid() {
		if len(kr.right) > 0 && y.CompareKeys(it.Key(), kr.right) >= 0 {
			break
		}

		bopts := buildTableOptions(s.kv)
		// Size output tables to the destination level's target file size.
		bopts.TableSize = uint64(cd.t.fileSz[cd.nextLevel.level])
		builder := table.NewTableBuilder(bopts)

		addKeys(builder)

		// Every key may have been pruned; release the builder and continue.
		if builder.Empty() {
			builder.Finish()
			builder.Close()
			continue
		}
		numBuilds++
		if err := inflightBuilders.Do(); err != nil {
			// NOTE(review): Do presumably hands back an error reported by
			// another builder; confirm it still surfaces from the caller's Finish.
			break
		}
		go func(builder *table.Builder, fileID uint64) {
			var err error
			// Done must see the final value of err, so call it from a closure.
			// `defer inflightBuilders.Done(err)` would evaluate err (still nil)
			// immediately and silently drop table build failures.
			defer func() { inflightBuilders.Done(err) }()
			defer builder.Close()

			var tbl *table.Table
			if s.kv.opt.InMemory {
				tbl, err = table.OpenInMemoryTable(builder.Finish(), fileID, &bopts)
			} else {
				fname := table.NewFilename(fileID, s.kv.opt.Dir)
				tbl, err = table.CreateTable(fname, builder)
			}
			if err != nil {
				return
			}
			res <- tbl
		}(builder, s.reserveFileID())
	}
	s.kv.vlog.updateDiscardStats(discardStats)
	s.kv.opt.Debugf("Discard stats: %v", discardStats)
}
// compactBuildTables performs the merge for cd. It returns the resulting
// tables sorted by their biggest key, plus a function that releases this
// function's reference on each of them (via decrRefs).
//
// One subcompaction goroutine is started per entry in cd.splits; each merges
// the top tables with the bottom tables for its key range. A bottom table is
// left out of the merge entirely when both its smallest and biggest keys start
// with one of cd.dropPrefixes.
//
// On any error, including a failure to sync the directory, the new tables are
// released and the wrapped error is returned.
func (s *levelsController ) compactBuildTables (
lev int , cd compactDef ) ([]*table .Table , func () error , error ) {
topTables := cd .top
botTables := cd .bot
numTables := int64 (len (topTables ) + len (botTables ))
y .NumCompactionTablesAdd (s .kv .opt .MetricsEnabled , numTables )
defer y .NumCompactionTablesAdd (s .kv .opt .MetricsEnabled , -numTables )
// keepTable is false for tables whose whole key range lies under a dropped prefix.
keepTable := func (t *table .Table ) bool {
for _ , prefix := range cd .dropPrefixes {
if bytes .HasPrefix (t .Smallest (), prefix ) &&
bytes .HasPrefix (t .Biggest (), prefix ) {
return false
}
}
return true
}
var valid []*table .Table
for _ , table := range botTables {
if keepTable (table ) {
valid = append (valid , table )
}
}
// newIterator builds a fresh set of iterators for one subcompaction.
newIterator := func () []y .Iterator {
var iters []y .Iterator
switch {
case lev == 0 :
// All L0 top tables, added in reverse slice order (presumably newest
// first, so newer data wins in the merge).
iters = appendIteratorsReversed (iters , topTables , table .NOCACHE )
case len (topTables ) > 0 :
y .AssertTrue (len (topTables ) == 1 )
iters = []y .Iterator {topTables [0 ].NewIterator (table .NOCACHE )}
}
// Bottom tables are concatenated, which assumes they do not overlap.
return append (iters , table .NewConcatIterator (valid , table .NOCACHE ))
}
res := make (chan *table .Table , 3 )
// One slot per subcompaction, plus up to 8 concurrent table builders.
inflightBuilders := y .NewThrottle (8 + len (cd .splits ))
for _ , kr := range cd .splits {
if err := inflightBuilders .Do (); err != nil {
// NOTE(review): returning here leaves already-started subcompactions
// running, with nobody reading from res.
s .kv .opt .Errorf ("cannot start subcompaction: %+v" , err )
return nil , nil , err
}
go func (kr keyRange ) {
defer inflightBuilders .Done (nil )
it := table .NewMergeIterator (newIterator (), false )
defer it .Close ()
s .subcompact (it , kr , cd , inflightBuilders , res )
}(kr )
}
// Collect finished tables until res is closed.
var newTables []*table .Table
var wg sync .WaitGroup
wg .Add (1 )
go func () {
defer wg .Done ()
for t := range res {
newTables = append (newTables , t )
}
}()
// Wait for all subcompactions and builders, then for the collector.
err := inflightBuilders .Finish ()
close (res )
wg .Wait ()
if err == nil {
// Make the new files' directory entries durable.
err = s .kv .syncDir (s .kv .opt .Dir )
}
if err != nil {
// Release the new tables; presumably ours are the only references.
_ = decrRefs (newTables )
return nil , nil , y .Wrapf (err , "while running compactions for: %+v" , cd )
}
sort .Slice (newTables , func (i , j int ) bool {
return y .CompareKeys (newTables [i ].Biggest (), newTables [j ].Biggest ()) < 0
})
return newTables , func () error { return decrRefs (newTables ) }, nil
}
// buildChangeSet builds the MANIFEST changes for a finished compaction:
//   - a create entry at cd.nextLevel for every new table;
//   - a delete entry for every consumed bottom table;
//   - a delete entry for every consumed top table not flagged IsInmemory.
func buildChangeSet(cd *compactDef, newTables []*table.Table) pb.ManifestChangeSet {
	changes := make([]*pb.ManifestChange, 0, len(newTables)+len(cd.top)+len(cd.bot))
	for _, created := range newTables {
		changes = append(changes, newCreateChange(
			created.ID(), cd.nextLevel.level, created.KeyID(), created.CompressionType()))
	}
	for _, consumed := range cd.top {
		if consumed.IsInmemory {
			continue
		}
		changes = append(changes, newDeleteChange(consumed.ID()))
	}
	for _, consumed := range cd.bot {
		changes = append(changes, newDeleteChange(consumed.ID()))
	}
	return pb.ManifestChangeSet{Changes: changes}
}
// hasAnyPrefixes reports whether s begins with at least one of the prefixes in
// listOfPrefixes. An empty prefix matches everything; an empty list matches
// nothing.
func hasAnyPrefixes(s []byte, listOfPrefixes [][]byte) bool {
	for i := range listOfPrefixes {
		if bytes.HasPrefix(s, listOfPrefixes[i]) {
			return true
		}
	}
	return false
}
// containsPrefix reports whether table holds a key starting with prefix.
//
// The table's smallest and biggest keys are checked first. When prefix lies
// strictly between them, the table is searched with an iterator. The seek key
// uses timestamp math.MaxUint64 (presumably the smallest encoded version) so
// that no key with this prefix is skipped.
func containsPrefix(table *table.Table, prefix []byte) bool {
	lo, hi := table.Smallest(), table.Biggest()
	if bytes.HasPrefix(lo, prefix) || bytes.HasPrefix(hi, prefix) {
		return true
	}
	// Outside the (lo, hi) interval no key can carry the prefix.
	if bytes.Compare(prefix, lo) <= 0 || bytes.Compare(prefix, hi) >= 0 {
		return false
	}
	iter := table.NewIterator(0)
	defer iter.Close()
	iter.Seek(y.KeyWithTs(prefix, math.MaxUint64))
	return bytes.HasPrefix(iter.Key(), prefix)
}
// containsAnyPrefixes reports whether containsPrefix holds for table and at
// least one entry of listOfPrefixes, stopping at the first match.
func containsAnyPrefixes(table *table.Table, listOfPrefixes [][]byte) bool {
	for i := range listOfPrefixes {
		if containsPrefix(table, listOfPrefixes[i]) {
			return true
		}
	}
	return false
}
// compactDef describes one compaction: the source and destination levels,
// the tables taken from each, and their key ranges.
type compactDef struct {
// compactorId is the id of the worker running the compaction (used in logs).
compactorId int
// t are the level targets used to size the output tables.
t targets
// p is the priority that triggered this compaction.
p compactionPriority
thisLevel *levelHandler
// nextLevel receives the output. It equals thisLevel for Lmax -> Lmax and
// prefix-drop compactions, and is level 0 for L0 -> L0.
nextLevel *levelHandler
// top are the input tables taken from thisLevel.
top []*table .Table
// bot are the input tables taken from nextLevel.
bot []*table .Table
// thisRange and nextRange are the key ranges covered by top and bot.
thisRange keyRange
nextRange keyRange
// splits are the key ranges processed by parallel subcompactions.
splits []keyRange
// thisSize is the size of the chosen top table (set by fillTables and
// fillMaxLevelTables).
thisSize int64
// dropPrefixes lists key prefixes to discard during the compaction.
dropPrefixes [][]byte
}
// addSplits partitions the compaction's key range into cd.splits so that
// subcompactions can run in parallel.
//
// A boundary is placed after every width-th bottom table, where
// width = ceil(len(bot)/5) with a minimum of 3. This limits the number of
// splits. The last split is open-ended (empty right bound). With no bottom
// tables, cd.splits stays empty.
func (s *levelsController ) addSplits (cd *compactDef ) {
cd .splits = cd .splits [:0 ]
width := int (math .Ceil (float64 (len (cd .bot )) / 5.0 ))
if width < 3 {
width = 3
}
// skr starts as the union of the top and bottom key ranges.
skr := cd .thisRange
skr .extend (cd .nextRange )
// addRange closes the current split at right; the next split starts there.
addRange := func (right []byte ) {
skr .right = y .Copy (right )
cd .splits = append (cd .splits , skr )
skr .left = skr .right
}
for i , t := range cd .bot {
if i == len (cd .bot )-1 {
addRange ([]byte {})
return
}
if i %width == width -1 {
// The boundary uses timestamp 0, presumably so that every version of
// this table's biggest key sorts before it and stays in the same
// split. Confirm against y.KeyWithTs ordering.
right := y .KeyWithTs (y .ParseKey (t .Biggest ()), 0 )
addRange (right )
}
}
}
// lockLevels read-locks the two levels involved in the compaction.
//
// When thisLevel and nextLevel are the same handler, the lock is taken only
// once. doCompact does this for an Lmax -> Lmax compaction by setting
// nextLevel = thisLevel. sync.RWMutex forbids recursive read locking: a writer
// blocked in Lock between two RLock calls on the same mutex would deadlock
// this goroutine.
func (cd *compactDef) lockLevels() {
	cd.thisLevel.RLock()
	if cd.nextLevel != cd.thisLevel {
		cd.nextLevel.RLock()
	}
}

// unlockLevels releases the read locks taken by lockLevels, in reverse order.
func (cd *compactDef) unlockLevels() {
	if cd.nextLevel != cd.thisLevel {
		cd.nextLevel.RUnlock()
	}
	cd.thisLevel.RUnlock()
}
// allTables returns a newly allocated slice containing cd.top followed by
// cd.bot.
func (cd *compactDef) allTables() []*table.Table {
	out := make([]*table.Table, len(cd.top)+len(cd.bot))
	n := copy(out, cd.top)
	copy(out[n:], cd.bot)
	return out
}
// fillTablesL0ToL0 tries to set up an L0 -> L0 compaction that merges small L0
// tables. Only compactor 0 does this.
//
// It picks L0 tables that are smaller than twice the L0 target file size, at
// least 10 seconds old, and not already being compacted. At least 4 such
// tables are required. On success it records the tables and an infinite key
// range for L0 in cstatus and returns true.
func (s *levelsController ) fillTablesL0ToL0 (cd *compactDef ) bool {
if cd .compactorId != 0 {
return false
}
cd .nextLevel = s .levels [0 ]
cd .nextRange = keyRange {}
cd .bot = nil
y .AssertTrue (cd .thisLevel .level == 0 )
y .AssertTrue (cd .nextLevel .level == 0 )
// thisLevel and nextLevel are the same handler here, so its read lock is
// taken once, directly: a recursive RLock can deadlock when a writer is
// waiting on the same RWMutex.
s .levels [0 ].RLock ()
defer s .levels [0 ].RUnlock ()
// cstatus is locked directly because its tables and ranges are updated below.
s .cstatus .Lock ()
defer s .cstatus .Unlock ()
top := cd .thisLevel .tables
var out []*table .Table
now := time .Now ()
for _ , t := range top {
// Skip tables that are already large.
if t .Size () >= 2 *cd .t .fileSz [0 ] {
continue
}
// Skip tables created less than 10 seconds ago.
if now .Sub (t .CreatedAt ) < 10 *time .Second {
continue
}
if _ , beingCompacted := s .cstatus .tables [t .ID ()]; beingCompacted {
continue
}
out = append (out , t )
}
// Too few tables to be worth merging.
if len (out ) < 4 {
return false
}
cd .thisRange = infRange
cd .top = out
// Claim the whole L0 key range so that range-checked compactions of L0
// (presumably L0 -> Lbase) stay out while this one runs.
thisLevel := s .cstatus .levels [cd .thisLevel .level ]
thisLevel .ranges = append (thisLevel .ranges , infRange )
for _ , t := range out {
s .cstatus .tables [t .ID ()] = struct {}{}
}
// Raise L0's target file size so the merged output is not split (presumably
// yielding a single table). NOTE(review): cd.t.fileSz shares its backing
// array with the targets this compactDef was built from, so the write is
// visible through them too.
cd .t .fileSz [0 ] = math .MaxUint32
return true
}
// fillTablesL0ToLbase tries to set up an L0 -> base-level compaction. It
// returns whether the compaction was registered in cstatus, and panics if
// cd.nextLevel is level 0.
//
// Table selection:
//   - With dropPrefixes set, every L0 table is taken.
//   - Otherwise it takes the leading run of L0 tables (in slice order) whose
//     key ranges overlap the range accumulated so far.
//
// The bottom tables are the nextLevel tables overlapping the chosen range.
func (s *levelsController ) fillTablesL0ToLbase (cd *compactDef ) bool {
if cd .nextLevel .level == 0 {
panic ("Base level can't be zero." )
}
// An adjusted score of exactly 0 means the caller never computed one (for
// example, dropPrefixes builds its priority by hand). Only scores in (0, 1)
// are rejected.
if cd .p .adjusted > 0.0 && cd .p .adjusted < 1.0 {
return false
}
cd .lockLevels ()
defer cd .unlockLevels ()
top := cd .thisLevel .tables
if len (top ) == 0 {
return false
}
var out []*table .Table
if len (cd .dropPrefixes ) > 0 {
out = top
} else {
var kr keyRange
// NOTE(review): kr starts empty. This relies on overlapsWith treating an
// empty range as overlapping, so the first table is always taken.
for _ , t := range top {
dkr := getKeyRange (t )
if kr .overlapsWith (dkr ) {
out = append (out , t )
kr .extend (dkr )
} else {
break
}
}
}
cd .thisRange = getKeyRange (out ...)
cd .top = out
left , right := cd .nextLevel .overlappingTables (levelHandlerRLocked {}, cd .thisRange )
cd .bot = make ([]*table .Table , right -left )
copy (cd .bot , cd .nextLevel .tables [left :right ])
if len (cd .bot ) == 0 {
cd .nextRange = cd .thisRange
} else {
cd .nextRange = getKeyRange (cd .bot ...)
}
return s .cstatus .compareAndAdd (thisAndNextLevelRLocked {}, *cd )
}
// fillTablesL0 picks tables for an L0 compaction. It prefers L0 -> Lbase and
// falls back to L0 -> L0 only when no Lbase compaction could be set up.
func (s *levelsController) fillTablesL0(cd *compactDef) bool {
	return s.fillTablesL0ToLbase(cd) || s.fillTablesL0ToL0(cd)
}
// sortByStaleDataSize sorts tables in place by decreasing StaleDataSize. It
// does nothing for an empty slice or when cd has no next level.
func (s *levelsController) sortByStaleDataSize(tables []*table.Table, cd *compactDef) {
	if len(tables) > 0 && cd.nextLevel != nil {
		sort.Slice(tables, func(a, b int) bool {
			return tables[a].StaleDataSize() > tables[b].StaleDataSize()
		})
	}
}
// sortByHeuristic sorts tables in place by increasing MaxVersion, so tables
// holding only older data come first. It does nothing for an empty slice or
// when cd has no next level.
func (s *levelsController) sortByHeuristic(tables []*table.Table, cd *compactDef) {
	if len(tables) > 0 && cd.nextLevel != nil {
		sort.Slice(tables, func(a, b int) bool {
			return tables[a].MaxVersion() < tables[b].MaxVersion()
		})
	}
}
// fillMaxLevelTables sets up an Lmax -> Lmax compaction. It is called from
// fillTables with the level locks held; tables is a copy of the last level's
// tables, assumed sorted by smallest key.
//
// Candidates are tried in decreasing stale-data order. A candidate qualifies
// when all of these hold:
//   - its MaxVersion is <= the discard timestamp;
//   - it is at least one hour old;
//   - it has at least 10 MiB of stale data;
//   - its range is not already being compacted.
//
// If the candidate is smaller than the level's target file size, the tables
// following it in key order are added to bot until the target would be
// reached.
func (s *levelsController ) fillMaxLevelTables (tables []*table .Table , cd *compactDef ) bool {
sortedTables := make ([]*table .Table , len (tables ))
copy (sortedTables , tables )
s .sortByStaleDataSize (sortedTables , cd )
// Nothing stale anywhere: no point rewriting the last level.
if len (sortedTables ) > 0 && sortedTables [0 ].StaleDataSize () == 0 {
return false
}
cd .bot = []*table .Table {}
// collectBotTables appends the tables following t (in key order) to cd.bot
// while the running total size stays below needSz.
collectBotTables := func (t *table .Table , needSz int64 ) {
totalSize := t .Size ()
j := sort .Search (len (tables ), func (i int ) bool {
return y .CompareKeys (tables [i ].Smallest (), t .Smallest ()) >= 0
})
y .AssertTrue (tables [j ].ID () == t .ID ())
j ++
for j < len (tables ) {
newT := tables [j ]
totalSize += newT .Size ()
if totalSize >= needSz {
break
}
cd .bot = append (cd .bot , newT )
cd .nextRange .extend (getKeyRange (newT ))
j ++
}
}
now := time .Now ()
for _ , t := range sortedTables {
// Versions above the discard timestamp cannot be cleaned yet.
if t .MaxVersion () > s .kv .orc .discardAtOrBelow () {
continue
}
if now .Sub (t .CreatedAt ) < time .Hour {
continue
}
// Less than 10 MiB of stale data is not worth a rewrite.
if t .StaleDataSize () < 10 <<20 {
continue
}
cd .thisSize = t .Size ()
cd .thisRange = getKeyRange (t )
// The next range starts equal to this range, since both are the same level.
cd .nextRange = cd .thisRange
if s .cstatus .overlapsWith (cd .thisLevel .level , cd .thisRange ) {
continue
}
cd .top = []*table .Table {t }
needFileSz := cd .t .fileSz [cd .thisLevel .level ]
// Big enough on its own; register it after the loop.
if t .Size () >= needFileSz {
break
}
collectBotTables (t , needFileSz )
if !s .cstatus .compareAndAdd (thisAndNextLevelRLocked {}, *cd ) {
cd .bot = cd .bot [:0 ]
cd .nextRange = keyRange {}
continue
}
return true
}
// NOTE(review): cd.top also stays set after a failed compareAndAdd on the
// last candidate. In that case the call below retries it with an empty bot.
if len (cd .top ) == 0 {
return false
}
return s .cstatus .compareAndAdd (thisAndNextLevelRLocked {}, *cd )
}
// fillTables picks a compaction from cd.thisLevel (level >= 1) into
// cd.nextLevel while holding both levels' read locks, and returns whether one
// was registered in cstatus.
//
// The last level delegates to fillMaxLevelTables. For other levels, tables
// are tried in increasing MaxVersion order. The first one is chosen if its
// range is not being compacted, the range of its overlapping nextLevel tables
// is not being compacted either, and compareAndAdd accepts it.
func (s *levelsController ) fillTables (cd *compactDef ) bool {
cd .lockLevels ()
defer cd .unlockLevels ()
// Work on a copy so sorting does not reorder the level's own slice.
tables := make ([]*table .Table , len (cd .thisLevel .tables ))
copy (tables , cd .thisLevel .tables )
if len (tables ) == 0 {
return false
}
if cd .thisLevel .isLastLevel () {
return s .fillMaxLevelTables (tables , cd )
}
s .sortByHeuristic (tables , cd )
for _ , t := range tables {
cd .thisSize = t .Size ()
cd .thisRange = getKeyRange (t )
if s .cstatus .overlapsWith (cd .thisLevel .level , cd .thisRange ) {
continue
}
cd .top = []*table .Table {t }
left , right := cd .nextLevel .overlappingTables (levelHandlerRLocked {}, cd .thisRange )
cd .bot = make ([]*table .Table , right -left )
copy (cd .bot , cd .nextLevel .tables [left :right ])
// No overlap below: the next range is simply this table's range.
if len (cd .bot ) == 0 {
cd .bot = []*table .Table {}
cd .nextRange = cd .thisRange
if !s .cstatus .compareAndAdd (thisAndNextLevelRLocked {}, *cd ) {
continue
}
return true
}
cd .nextRange = getKeyRange (cd .bot ...)
if s .cstatus .overlapsWith (cd .nextLevel .level , cd .nextRange ) {
continue
}
if !s .cstatus .compareAndAdd (thisAndNextLevelRLocked {}, *cd ) {
continue
}
return true
}
return false
}
// runCompactDef executes the compaction described by cd and returns an error
// if the targets in cd.t are unset. id is used only for logging; l is the
// source level passed to compactBuildTables.
//
// Steps:
//  1. Build the new tables.
//  2. Record creates and deletes in the MANIFEST.
//  3. Replace cd.bot with the new tables in nextLevel.
//  4. Delete cd.top from thisLevel.
func (s *levelsController ) runCompactDef (id , l int , cd compactDef ) (err error ) {
if len (cd .t .fileSz ) == 0 {
return errors .New ("Filesizes cannot be zero. Targets are not set" )
}
timeStart := time .Now ()
thisLevel := cd .thisLevel
nextLevel := cd .nextLevel
y .AssertTrue (len (cd .splits ) == 0 )
// Same-level compactions (L0->L0, Lmax->Lmax, prefix drops) are not split.
if thisLevel .level == nextLevel .level {
} else {
s .addSplits (&cd )
}
// Fall back to a single unbounded split.
if len (cd .splits ) == 0 {
cd .splits = append (cd .splits , keyRange {})
}
newTables , decr , err := s .compactBuildTables (l , cd )
if err != nil {
return err
}
// Release compactBuildTables' references on exit. A decr error is returned
// only when nothing else failed.
defer func () {
if decErr := decr (); err == nil {
err = decErr
}
}()
// The MANIFEST is updated before the level handlers.
changeSet := buildChangeSet (&cd , newTables )
if err := s .kv .manifest .addChanges (changeSet .Changes , s .kv .opt ); err != nil {
return err
}
getSizes := func (tables []*table .Table ) int64 {
size := int64 (0 )
for _ , i := range tables {
size += i .Size ()
}
return size
}
// Sizes are computed only when metrics are on; otherwise the log reports 0 deleted bytes.
sizeNewTables := int64 (0 )
sizeOldTables := int64 (0 )
if s .kv .opt .MetricsEnabled {
sizeNewTables = getSizes (newTables )
sizeOldTables = getSizes (cd .bot ) + getSizes (cd .top )
y .NumBytesCompactionWrittenAdd (s .kv .opt .MetricsEnabled , nextLevel .strLevel , sizeNewTables )
}
if err := nextLevel .replaceTables (cd .bot , newTables ); err != nil {
return err
}
if err := thisLevel .deleteTables (cd .top ); err != nil {
return err
}
from := append (tablesToString (cd .top ), tablesToString (cd .bot )...)
to := tablesToString (newTables )
// Log slow compactions. NOTE(review): inside this branch dur > time.Second
// is always true, so " [E]" is always printed.
if dur := time .Since (timeStart ); dur > 2 *time .Second {
var expensive string
if dur > time .Second {
expensive = " [E]"
}
s .kv .opt .Infof ("[%d]%s LOG Compact %d->%d (%d, %d -> %d tables with %d splits)." +
" [%s] -> [%s], took %v\n, deleted %d bytes" ,
id , expensive , thisLevel .level , nextLevel .level , len (cd .top ), len (cd .bot ),
len (newTables ), len (cd .splits ), strings .Join (from , " " ), strings .Join (to , " " ),
dur .Round (time .Millisecond ), sizeOldTables -sizeNewTables )
}
// Dump the key ranges when a non-L0 compaction produced unusually many tables.
if cd .thisLevel .level != 0 && len (newTables ) > 2 *s .kv .opt .LevelSizeMultiplier {
s .kv .opt .Infof ("This Range (numTables: %d)\nLeft:\n%s\nRight:\n%s\n" ,
len (cd .top ), hex .Dump (cd .thisRange .left ), hex .Dump (cd .thisRange .right ))
s .kv .opt .Infof ("Next Range (numTables: %d)\nLeft:\n%s\nRight:\n%s\n" ,
len (cd .bot ), hex .Dump (cd .nextRange .left ), hex .Dump (cd .nextRange .right ))
}
return nil
}
// tablesToString renders each table ID zero-padded to five digits, followed
// by a final "." entry. The result is never empty.
func tablesToString(tables []*table.Table) []string {
	out := make([]string, 0, len(tables)+1)
	for i := range tables {
		out = append(out, fmt.Sprintf("%05d", tables[i].ID()))
	}
	return append(out, ".")
}
// errFillTables is returned by doCompact when no tables could be picked for
// the requested compaction. runCompactor treats it as a quiet no-op.
var errFillTables = errors .New ("Unable to fill tables" )
// doCompact runs one compaction of level p.level as worker id.
//
// It returns errFillTables when no tables could be picked. Otherwise it runs
// the compaction, and the registration in cstatus is removed when doCompact
// returns.
func (s *levelsController ) doCompact (id int , p compactionPriority ) error {
l := p .level
y .AssertTrue (l < s .kv .opt .MaxLevels )
// Hand-built priorities (e.g. from dropPrefixes) carry no targets; compute them.
if p .t .baseLevel == 0 {
p .t = s .levelTargets ()
}
_ , span := otel .Tracer ("" ).Start (context .TODO (), "Badger.Compaction" )
defer span .End ()
cd := compactDef {
compactorId : id ,
p : p ,
t : p .t ,
thisLevel : s .levels [l ],
dropPrefixes : p .dropPrefixes ,
}
if l == 0 {
// L0 compacts into the base level (or into itself as a fallback).
cd .nextLevel = s .levels [p .t .baseLevel ]
if !s .fillTablesL0 (&cd ) {
return errFillTables
}
} else {
// The last level compacts into itself; others into the level below.
cd .nextLevel = cd .thisLevel
if !cd .thisLevel .isLastLevel () {
cd .nextLevel = s .levels [l +1 ]
}
if !s .fillTables (&cd ) {
return errFillTables
}
}
defer s .cstatus .delete (cd )
span .SetAttributes (attribute .String ("Compaction" , fmt .Sprintf ("%+v" , cd )))
if err := s .runCompactDef (id , l , cd ); err != nil {
s .kv .opt .Warningf ("[Compactor: %d] LOG Compact FAILED with error: %+v: %+v" , id , err , cd )
return err
}
span .SetAttributes (
attribute .Int ("Top tables count" , len (cd .top )),
attribute .Int ("Bottom tables count" , len (cd .bot )))
s .kv .opt .Debugf ("[Compactor: %d] Compaction for level: %d DONE" , id , cd .thisLevel .level )
return nil
}
// addLevel0Table installs t at level 0.
//
// For tables not flagged IsInmemory, a create entry is written to the
// MANIFEST before the table is added to the level. While L0 refuses the
// table, the call blocks: it polls every 10ms until L0 has fewer than
// NumLevelZeroTablesStall tables, then retries. The time spent stalled is
// added to l0stallsMs, in milliseconds.
func (s *levelsController) addLevel0Table(t *table.Table) error {
	if !t.IsInmemory {
		err := s.kv.manifest.addChanges([]*pb.ManifestChange{
			newCreateChange(t.ID(), 0, t.KeyID(), t.CompressionType()),
		}, s.kv.opt)
		if err != nil {
			return err
		}
	}

	for !s.levels[0].tryAddLevel0Table(t) {
		stallStart := time.Now()
		for s.levels[0].numTables() >= s.kv.opt.NumLevelZeroTablesStall {
			time.Sleep(10 * time.Millisecond)
		}
		dur := time.Since(stallStart)
		if dur > time.Second {
			s.kv.opt.Infof("L0 was stalled for %s\n", dur.Round(time.Millisecond))
		}
		// l0stallsMs counts milliseconds. A time.Duration is in nanoseconds,
		// so convert explicitly rather than casting with int64(dur).
		s.l0stallsMs.Add(dur.Round(time.Millisecond).Milliseconds())
	}
	return nil
}
// close closes every level handler and wraps any failure with
// "levelsController.Close".
func (s *levelsController) close() error {
	return y.Wrap(s.cleanupLevels(), "levelsController.Close")
}
// get looks key up level by level, skipping levels below startLevel.
//
// It returns the first value whose Version equals the version encoded in key.
// Otherwise it returns the highest-version value seen, starting from the
// caller-supplied maxVs. ErrDBClosed is returned if the DB is closed, and a
// level lookup error is returned wrapped.
func (s *levelsController ) get (key []byte , maxVs y .ValueStruct , startLevel int ) (
y .ValueStruct , error ) {
if s .kv .IsClosed () {
return y .ValueStruct {}, ErrDBClosed
}
version := y .ParseTs (key )
// Levels are scanned in increasing order. NOTE(review): changing this order
// (or parallelizing) may interact badly with concurrent compactions that
// move tables downward.
for _ , h := range s .levels {
if h .level < startLevel {
continue
}
vs , err := h .get (key )
if err != nil {
return y .ValueStruct {}, y .Wrapf (err , "get key: %q" , key )
}
// An empty result means the key is not in this level.
if vs .Value == nil && vs .Meta == 0 {
continue
}
y .NumBytesReadsLSMAdd (s .kv .opt .MetricsEnabled , int64 (len (vs .Value )))
if vs .Version == version {
return vs , nil
}
if maxVs .Version < vs .Version {
maxVs = vs
}
}
if len (maxVs .Value ) > 0 {
y .NumGetsWithResultsAdd (s .kv .opt .MetricsEnabled , 1 )
}
return maxVs , nil
}
// appendIteratorsReversed appends to out one iterator per table in th
// (created with opt), walking th from last to first, and returns the extended
// slice.
func appendIteratorsReversed(out []y.Iterator, th []*table.Table, opt int) []y.Iterator {
	for i := range th {
		out = append(out, th[len(th)-1-i].NewIterator(opt))
	}
	return out
}
// appendIterators appends the iterators of every level, in level-slice order,
// to iters and returns the extended slice.
func (s *levelsController) appendIterators(
	iters []y.Iterator, opt *IteratorOptions) []y.Iterator {
	for i := range s.levels {
		iters = s.levels[i].appendIterators(iters, opt)
	}
	return iters
}
// TableInfo describes one table as reported by getTableInfo.
type TableInfo struct {
ID uint64
// Level is the level holding the table.
Level int
// Left and Right are the table's smallest and biggest keys.
Left []byte
Right []byte
KeyCount uint32
OnDiskSize uint32
StaleDataSize uint32
UncompressedSize uint32
MaxVersion uint64
// IndexSz and BloomFilterSize are the sizes of the table's index and bloom filter.
IndexSz int
BloomFilterSize int
}
// getTableInfo returns metadata for every table in every level, sorted by
// level and then by table ID. Each level is read-locked while its tables are
// inspected.
func (s *levelsController) getTableInfo() (result []TableInfo) {
	for _, lh := range s.levels {
		lh.RLock()
		for _, tbl := range lh.tables {
			result = append(result, TableInfo{
				ID:               tbl.ID(),
				Level:            lh.level,
				Left:             tbl.Smallest(),
				Right:            tbl.Biggest(),
				KeyCount:         tbl.KeyCount(),
				OnDiskSize:       tbl.OnDiskSize(),
				StaleDataSize:    tbl.StaleDataSize(),
				IndexSz:          tbl.IndexSize(),
				BloomFilterSize:  tbl.BloomFilterSize(),
				UncompressedSize: tbl.UncompressedSize(),
				MaxVersion:       tbl.MaxVersion(),
			})
		}
		lh.RUnlock()
	}
	sort.Slice(result, func(a, b int) bool {
		if result[a].Level == result[b].Level {
			return result[a].ID < result[b].ID
		}
		return result[a].Level < result[b].Level
	})
	return result
}
// LevelInfo summarizes one level as reported by getLevelInfo.
type LevelInfo struct {
Level int
NumTables int
// Size is the level's totalSize.
Size int64
// TargetSize and TargetFileSize come from levelTargets.
TargetSize int64
TargetFileSize int64
// IsBaseLevel reports whether this is the level L0 currently compacts into.
IsBaseLevel bool
// Score and Adjusted come from pickCompactLevels; they stay zero for levels
// it did not return.
Score float64
Adjusted float64
// StaleDatSize is the level's totalStaleSize.
StaleDatSize int64
}
// getLevelInfo reports, for every level:
//   - its size, table count and stale size (read under the level's lock);
//   - its targets from levelTargets;
//   - its compaction scores from pickCompactLevels, which stay zero for
//     levels that are not candidates.
func (s *levelsController) getLevelInfo() []LevelInfo {
	tg := s.levelTargets()
	prios := s.pickCompactLevels(nil)
	infos := make([]LevelInfo, len(s.levels))
	for i, lh := range s.levels {
		info := &infos[i]
		lh.RLock()
		info.Level = i
		info.Size = lh.totalSize
		info.NumTables = len(lh.tables)
		info.StaleDatSize = lh.totalStaleSize
		lh.RUnlock()
		info.TargetSize = tg.targetSz[i]
		info.TargetFileSize = tg.fileSz[i]
		info.IsBaseLevel = i == tg.baseLevel
	}
	for _, p := range prios {
		infos[p.level].Score = p.score
		infos[p.level].Adjusted = p.adjusted
	}
	return infos
}
// verifyChecksum verifies the checksum of every table, level by level, and
// returns the first verification error.
//
// Each level's tables are snapshotted under the level's read lock and pinned
// with IncrRef, so they can be verified without holding the lock. Every pinned
// table is released again. This includes the tables not yet verified when a
// checksum failure ends the scan early, so no references are leaked.
// Failures to release a reference are logged, not returned.
func (s *levelsController) verifyChecksum() error {
	release := func(t *table.Table) {
		if err := t.DecrRef(); err != nil {
			s.kv.opt.Errorf("unable to decrease reference of table: %s while "+
				"verifying checksum with error: %s", t.Filename(), err)
		}
	}

	var tables []*table.Table
	for _, l := range s.levels {
		l.RLock()
		tables = tables[:0]
		for _, t := range l.tables {
			tables = append(tables, t)
			t.IncrRef()
		}
		l.RUnlock()

		for i, t := range tables {
			errChkVerify := t.VerifyChecksum()
			release(t)
			if errChkVerify != nil {
				// Drop the references taken on the tables we will not verify.
				for _, rest := range tables[i+1:] {
					release(rest)
				}
				return errChkVerify
			}
		}
	}
	return nil
}
// keySplits collects the split keys reported by every table's KeySplits
// (called with numPerTable and prefix) across all levels. It returns them
// sorted, as a non-nil slice even when there are none.
func (s *levelsController) keySplits(numPerTable int, prefix []byte) []string {
	all := []string{}
	for _, lh := range s.levels {
		lh.RLock()
		for _, tbl := range lh.tables {
			all = append(all, tbl.KeySplits(numPerTable, prefix)...)
		}
		lh.RUnlock()
	}
	sort.Strings(all)
	return all
}
These pages were generated with Golds v0.8.4 (GOOS=linux GOARCH=amd64).
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the QR code on the left) to get the latest news about Golds.