/*
 * SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc.
 * SPDX-License-Identifier: Apache-2.0
 */

package badger

import (
	
	
	
	
	
	
	
	
	
	
	
	
	

	
	

	
	
	
	
)

// levelsController owns the per-level handlers of the LSM tree and the state shared by the
// compaction workers.
//
//   - nextFileID is seeded with (largest table file ID in the manifest + 1) when the controller is
//     built from disk, and is consumed when new table files are reserved during compaction.
//   - l0stallsMs is not touched in this part of the file.
//     NOTE(review): presumably accumulated L0 write-stall time in milliseconds -- confirm.
//   - levels has one handler per configured level (opt.MaxLevels); kv is the owning DB.
//   - cstatus records the tables and key ranges that are currently being compacted.
type levelsController struct {
	nextFileID atomic.Uint64
	l0stallsMs atomic.Int64

	// The following are initialized once and const.
	levels []*levelHandler
	kv     *DB

	cstatus compactStatus
}

// revertToManifest checks that all necessary table files exist and removes all table files not
// referenced by the manifest. idMap is a set of table file id's that were read from the directory
// listing.
func revertToManifest( *DB,  *Manifest,  map[uint64]struct{}) error {
	// 1. Check all files in manifest exist.
	for  := range .Tables {
		if ,  := []; ! {
			return fmt.Errorf("file does not exist for table %d", )
		}
	}

	// 2. Delete files that shouldn't exist.
	for  := range  {
		if ,  := .Tables[]; ! {
			.opt.Debugf("Table file %d not referenced in MANIFEST\n", )
			 := table.NewFilename(, .opt.Dir)
			if  := os.Remove();  != nil {
				return y.Wrapf(, "While removing table %d", )
			}
		}
	}

	return nil
}

func newLevelsController( *DB,  *Manifest) (*levelsController, error) {
	y.AssertTrue(.opt.NumLevelZeroTablesStall > .opt.NumLevelZeroTables)
	 := &levelsController{
		kv:     ,
		levels: make([]*levelHandler, .opt.MaxLevels),
	}
	.cstatus.tables = make(map[uint64]struct{})
	.cstatus.levels = make([]*levelCompactStatus, .opt.MaxLevels)

	for  := 0;  < .opt.MaxLevels; ++ {
		.levels[] = newLevelHandler(, )
		.cstatus.levels[] = new(levelCompactStatus)
	}

	if .opt.InMemory {
		return , nil
	}
	// Compare manifest against directory, check for existent/non-existent files, and remove.
	if  := revertToManifest(, , getIDMap(.opt.Dir));  != nil {
		return nil, 
	}

	var  sync.Mutex
	 := make([][]*table.Table, .opt.MaxLevels)
	var  uint64

	// We found that using 3 goroutines allows disk throughput to be utilized to its max.
	// Disk utilization is the main thing we should focus on, while trying to read the data. That's
	// the one factor that remains constant between HDD and SSD.
	 := y.NewThrottle(3)

	 := time.Now()
	var  atomic.Int32
	 := time.NewTicker(3 * time.Second)
	defer .Stop()

	for ,  := range .Tables {
		 := table.NewFilename(, .opt.Dir)
		select {
		case <-.C:
			.opt.Infof("%d tables out of %d opened in %s\n", .Load(),
				len(.Tables), time.Since().Round(time.Millisecond))
		default:
		}
		if  := .Do();  != nil {
			closeAllTables()
			return nil, 
		}
		if  >  {
			 = 
		}
		go func( string,  TableManifest) {
			var  error
			defer func() {
				.Done()
				.Add(1)
			}()
			,  := .registry.DataKey(.KeyID)
			if  != nil {
				 = y.Wrapf(, "Error while reading datakey")
				return
			}
			 := buildTableOptions()
			// Explicitly set Compression and DataKey based on how the table was generated.
			.Compression = .Compression
			.DataKey = 

			,  := z.OpenMmapFile(, .opt.getFileFlags(), 0)
			if  != nil {
				 = y.Wrapf(, "Opening file: %q", )
				return
			}
			,  := table.OpenTable(, )
			if  != nil {
				if strings.HasPrefix(.Error(), "CHECKSUM_MISMATCH:") {
					.opt.Errorf(.Error())
					.opt.Errorf("Ignoring table %s", .Fd.Name())
					// Do not set rerr. We will continue without this table.
				} else {
					 = y.Wrapf(, "Opening table: %q", )
				}
				return
			}

			.Lock()
			[.Level] = append([.Level], )
			.Unlock()
		}(, )
	}
	if  := .Finish();  != nil {
		closeAllTables()
		return nil, 
	}
	.opt.Infof("All %d tables opened in %s\n", .Load(),
		time.Since().Round(time.Millisecond))
	.nextFileID.Store( + 1)
	for ,  := range  {
		.levels[].initTables()
	}

	// Make sure key ranges do not overlap etc.
	if  := .validate();  != nil {
		_ = .cleanupLevels()
		return nil, y.Wrap(, "Level validation")
	}

	// Sync directory (because we have at least removed some files, or previously created the
	// manifest file).
	if  := syncDir(.opt.Dir);  != nil {
		_ = .close()
		return nil, 
	}

	return , nil
}

// Closes the tables, for cleanup in newLevelsController.  (We Close() instead of using DecrRef()
// because that would delete the underlying files.)  We ignore errors, which is OK because tables
// are read-only.
func closeAllTables( [][]*table.Table) {
	for ,  := range  {
		for ,  := range  {
			_ = .Close(-1)
		}
	}
}

func ( *levelsController) () error {
	var  error
	for ,  := range .levels {
		if  := .close();  != nil &&  == nil {
			 = 
		}
	}
	return 
}

// dropTree picks all tables from all levels, creates a manifest changeset,
// applies it, and then decrements the refs of these tables, which would result
// in their deletion.
func ( *levelsController) () (int, error) {
	// First pick all tables, so we can create a manifest changelog.
	var  []*table.Table
	for ,  := range .levels {
		.RLock()
		 = append(, .tables...)
		.RUnlock()
	}
	if len() == 0 {
		return 0, nil
	}

	// Generate the manifest changes.
	 := []*pb.ManifestChange{}
	for ,  := range  {
		// Add a delete change only if the table is not in memory.
		if !.IsInmemory {
			 = append(, newDeleteChange(.ID()))
		}
	}
	 := pb.ManifestChangeSet{Changes: }
	if  := .kv.manifest.addChanges(.Changes, .kv.opt);  != nil {
		return 0, 
	}

	// Now that manifest has been successfully written, we can delete the tables.
	for ,  := range .levels {
		.Lock()
		.totalSize = 0
		.tables = .tables[:0]
		.Unlock()
	}
	for ,  := range  {
		if  := .DecrRef();  != nil {
			return 0, 
		}
	}
	return len(), nil
}

// dropPrefix runs a L0->L1 compaction, and then runs same level compaction on the rest of the
// levels. For L0->L1 compaction, it runs compactions normally, but skips over
// all the keys with the provided prefix.
// For Li->Li compactions, it picks up the tables which would have the prefix. The
// tables who only have keys with this prefix are quickly dropped. The ones which have other keys
// are run through MergeIterator and compacted to create new tables. All the mechanisms of
// compactions apply, i.e. level sizes and MANIFEST are updated as in the normal flow.
func ( *levelsController) ( [][]byte) error {
	 := .kv.opt
	// Iterate levels in the reverse order because if we were to iterate from
	// lower level (say level 0) to a higher level (say level 3) we could have
	// a state in which level 0 is compacted and an older version of a key exists in lower level.
	// At this point, if someone creates an iterator, they would see an old
	// value for a key from lower levels. Iterating in reverse order ensures we
	// drop the oldest data first so that lookups never return stale data.
	for  := len(.levels) - 1;  >= 0; -- {
		 := .levels[]

		.RLock()
		if .level == 0 {
			 := len(.tables)
			.RUnlock()

			if  > 0 {
				 := compactionPriority{
					level: 0,
					score: 1.74,
					// A unique number greater than 1.0 does two things. Helps identify this
					// function in logs, and forces a compaction.
					dropPrefixes: ,
				}
				if  := .doCompact(174, );  != nil {
					.Warningf("While compacting level 0: %v", )
					return nil
				}
			}
			continue
		}

		// Build a list of compaction tableGroups affecting all the prefixes we
		// need to drop. We need to build tableGroups that satisfy the invariant that
		// bottom tables are consecutive.
		// tableGroup contains groups of consecutive tables.
		var  [][]*table.Table
		var  []*table.Table

		 := func() {
			if len() > 0 {
				 = append(, )
				 = nil
			}
		}

		for ,  := range .tables {
			if containsAnyPrefixes(, ) {
				 = append(, )
			} else {
				()
			}
		}
		()

		.RUnlock()

		if len() == 0 {
			continue
		}
		.Infof("Dropping prefix at level %d (%d tableGroups)", .level, len())
		for ,  := range  {
			 := compactDef{
				thisLevel:    ,
				nextLevel:    ,
				top:          nil,
				bot:          ,
				dropPrefixes: ,
				t:            .levelTargets(),
			}
			,  := otel.Tracer("").Start(context.TODO(), "Badger.Compaction")
			.SetAttributes(attribute.Int("Compaction level", .level))
			.SetAttributes(attribute.String("Drop Prefixes", fmt.Sprintf("%v", )))
			.t.baseLevel = .level
			if  := .runCompactDef(-1, .level, );  != nil {
				.Warningf("While running compact def: %+v. Error: %v", , )
				.End()
				return 
			}
			.SetAttributes(
				attribute.Int("Top tables count", len(.top)),
				attribute.Int("Bottom tables count", len(.bot)))
			.End()
		}

	}
	return nil
}

func ( *levelsController) ( *z.Closer) {
	 := .kv.opt.NumCompactors
	.AddRunning( - 1)
	for  := 0;  < ; ++ {
		go .runCompactor(, )
	}
}

// targets is the per-level sizing plan computed by levelTargets.
//
//   - baseLevel is the level that L0 compactions are directed to (Lbase).
//   - targetSz[i] is the target total size of level i (index 0 is left unset).
//   - fileSz[i] is the target size of a single table file at level i.
type targets struct {
	baseLevel int
	targetSz  []int64
	fileSz    []int64
}

// levelTargets calculates the targets for levels in the LSM tree. The idea comes from Dynamic Level
// Sizes ( https://rocksdb.org/blog/2015/07/23/dynamic-level.html ) in RocksDB. The sizes of levels
// are calculated based on the size of the lowest level, typically L6. So, if L6 size is 1GB, then
// L5 target size is 100MB, L4 target size is 10MB and so on.
//
// L0 files don't automatically go to L1. Instead, they get compacted to Lbase, where Lbase is
// chosen based on the first level which is non-empty from top (check L1 through L6). For an empty
// DB, that would be L6.  So, L0 compactions go to L6, then L5, L4 and so on.
//
// Lbase is advanced to the upper levels when its target size exceeds BaseLevelSize. For
// example, when L6 reaches 1.1GB, then L4 target sizes becomes 11MB, thus exceeding the
// BaseLevelSize of 10MB. L3 would then become the new Lbase, with a target size of 1MB <
// BaseLevelSize.
func ( *levelsController) () targets {
	 := func( int64) int64 {
		if  < .kv.opt.BaseLevelSize {
			return .kv.opt.BaseLevelSize
		}
		return 
	}

	 := targets{
		targetSz: make([]int64, len(.levels)),
		fileSz:   make([]int64, len(.levels)),
	}
	// DB size is the size of the last level.
	 := .lastLevel().getTotalSize()
	for  := len(.levels) - 1;  > 0; -- {
		 := ()
		.targetSz[] = 
		if .baseLevel == 0 &&  <= .kv.opt.BaseLevelSize {
			.baseLevel = 
		}
		 /= int64(.kv.opt.LevelSizeMultiplier)
	}

	 := .kv.opt.BaseTableSize
	for  := 0;  < len(.levels); ++ {
		if  == 0 {
			// Use MemTableSize for Level 0. Because at Level 0, we stop compactions based on the
			// number of tables, not the size of the level. So, having a 1:1 size ratio between
			// memtable size and the size of L0 files is better than churning out 32 files per
			// memtable (assuming 64MB MemTableSize and 2MB BaseTableSize).
			.fileSz[] = .kv.opt.MemTableSize
		} else if  <= .baseLevel {
			.fileSz[] = 
		} else {
			 *= int64(.kv.opt.TableSizeMultiplier)
			.fileSz[] = 
		}
	}

	// Bring the base level down to the last empty level.
	for  := .baseLevel + 1;  < len(.levels)-1; ++ {
		if .levels[].getTotalSize() > 0 {
			break
		}
		.baseLevel = 
	}

	// If the base level is empty and the next level size is less than the
	// target size, pick the next level as the base level.
	 := .baseLevel
	 := .levels
	if  < len()-1 && [].getTotalSize() == 0 && [+1].getTotalSize() < .targetSz[+1] {
		.baseLevel++
	}
	return 
}

func ( *levelsController) ( int,  *z.Closer) {
	defer .Done()

	 := time.NewTimer(time.Duration(rand.Int31n(1000)) * time.Millisecond)
	select {
	case <-.C:
	case <-.HasBeenClosed():
		.Stop()
		return
	}

	 := func( []compactionPriority) []compactionPriority {
		 := -1
		for ,  := range  {
			if .level == 0 {
				 = 
				break
			}
		}
		// If idx == -1, we didn't find L0.
		// If idx == 0, then we don't need to do anything. L0 is already at the front.
		if  > 0 {
			 := append([]compactionPriority{}, [])
			 = append(, [:]...)
			 = append(, [+1:]...)
			return 
		}
		return 
	}

	 := func( compactionPriority) bool {
		 := .doCompact(, )
		switch  {
		case nil:
			return true
		case errFillTables:
			// pass
		default:
			.kv.opt.Warningf("While running doCompact: %v\n", )
		}
		return false
	}

	var  []compactionPriority
	 := func() bool {
		 := .pickCompactLevels()
		defer func() {
			 = 
		}()
		if  == 0 {
			// Worker ID zero prefers to compact L0 always.
			 = ()
		}
		for ,  := range  {
			if  == 0 && .level == 0 {
				// Allow worker zero to run level 0, irrespective of its adjusted score.
			} else if .adjusted < 1.0 {
				break
			}
			if () {
				return true
			}
		}

		return false
	}

	 := func() {
		 := compactionPriority{
			level: .lastLevel().level,
			t:     .levelTargets(),
		}
		()

	}
	 := 0
	 := time.NewTicker(50 * time.Millisecond)
	defer .Stop()
	for {
		select {
		// Can add a done channel or other stuff.
		case <-.C:
			++
			// Each ticker is 50ms so 50*200=10seconds.
			if .kv.opt.LmaxCompaction &&  == 2 &&  >= 200 {
				()
				 = 0
			} else {
				()
			}
		case <-.HasBeenClosed():
			return
		}
	}
}

// compactionPriority describes one candidate level for compaction.
//
//   - level is the level to compact from.
//   - score is the raw score of the level: table count over NumLevelZeroTables for L0, and
//     size (minus data already being compacted) over target size for the other levels.
//   - adjusted starts out equal to score and is then scaled by the score of the level below;
//     candidates are ordered by it.
//   - dropPrefixes, when set, lists key prefixes whose keys must be dropped by the compaction.
//   - t is the level sizing plan the priority was computed against.
type compactionPriority struct {
	level        int
	score        float64
	adjusted     float64
	dropPrefixes [][]byte
	t            targets
}

func ( *levelsController) () *levelHandler {
	return .levels[len(.levels)-1]
}

// pickCompactLevel determines which level to compact.
// Based on: https://github.com/facebook/rocksdb/wiki/Leveled-Compaction
// It tries to reuse priosBuffer to reduce memory allocation,
// passing nil is acceptable, then new memory will be allocated.
func ( *levelsController) ( []compactionPriority) ( []compactionPriority) {
	 := .levelTargets()
	 := func( int,  float64) {
		 := compactionPriority{
			level:    ,
			score:    ,
			adjusted: ,
			t:        ,
		}
		 = append(, )
	}

	// Grow buffer to fit all levels.
	if cap() < len(.levels) {
		 = make([]compactionPriority, 0, len(.levels))
	}
	 = [:0]

	// Add L0 priority based on the number of tables.
	(0, float64(.levels[0].numTables())/float64(.kv.opt.NumLevelZeroTables))

	// All other levels use size to calculate priority.
	for  := 1;  < len(.levels); ++ {
		// Don't consider those tables that are already being compacted right now.
		 := .cstatus.delSize()

		 := .levels[]
		 := .getTotalSize() - 
		(, float64()/float64(.targetSz[]))
	}
	y.AssertTrue(len() == len(.levels))

	// The following code is borrowed from PebbleDB and results in healthier LSM tree structure.
	// If Li-1 has score > 1.0, then we'll divide Li-1 score by Li. If Li score is >= 1.0, then Li-1
	// score is reduced, which means we'll prioritize the compaction of lower levels (L5, L4 and so
	// on) over the higher levels (L0, L1 and so on). On the other hand, if Li score is < 1.0, then
	// we'll increase the priority of Li-1.
	// Overall what this means is, if the bottom level is already overflowing, then de-prioritize
	// compaction of the above level. If the bottom level is not full, then increase the priority of
	// above level.
	var  int
	for  := .baseLevel;  < len(.levels); ++ {
		if [].adjusted >= 1 {
			// Avoid absurdly large scores by placing a floor on the score that we'll
			// adjust a level by. The value of 0.01 was chosen somewhat arbitrarily
			const  = 0.01
			if [].score >=  {
				[].adjusted /= [].adjusted
			} else {
				[].adjusted /= 
			}
		}
		 = 
	}

	// Pick all the levels whose original score is >= 1.0, irrespective of their adjusted score.
	// We'll still sort them by their adjusted score below. Having both these scores allows us to
	// make better decisions about compacting L0. If we see a score >= 1.0, we can do L0->L0
	// compactions. If the adjusted score >= 1.0, then we can do L0->Lbase compactions.
	 := [:0]
	for ,  := range [:len()-1] {
		if .score >= 1.0 {
			 = append(, )
		}
	}
	 = 

	// Sort by the adjusted score.
	sort.Slice(, func(,  int) bool {
		return [].adjusted > [].adjusted
	})
	return 
}

// checkOverlap checks if the given tables overlap with any level from the given "lev" onwards.
func ( *levelsController) ( []*table.Table,  int) bool {
	 := getKeyRange(...)
	for ,  := range .levels {
		if  <  { // Skip upper levels.
			continue
		}
		.RLock()
		,  := .overlappingTables(levelHandlerRLocked{}, )
		.RUnlock()
		if - > 0 {
			return true
		}
	}
	return false
}

// subcompact runs a single sub-compaction, iterating over the specified key-range only.
//
// We use splits to do a single compaction concurrently. If we have >= 3 tables
// involved in the bottom level during compaction, we choose key ranges to
// split the main compaction up into sub-compactions. Each sub-compaction runs
// concurrently, only iterating over the provided key range, generating tables.
// This speeds up the compaction significantly.
func ( *levelsController) ( y.Iterator,  keyRange,  compactDef,
	 *y.Throttle,  chan<- *table.Table) {

	// Check overlap of the top level with the levels which are not being
	// compacted in this compaction.
	 := .checkOverlap(.allTables(), .nextLevel.level+1)

	// Pick a discard ts, so we can discard versions below this ts. We should
	// never discard any versions starting from above this timestamp, because
	// that would affect the snapshot view guarantee provided by transactions.
	 := .kv.orc.discardAtOrBelow()

	// Try to collect stats so that we can inform value log about GC. That would help us find which
	// value log file should be GCed.
	 := make(map[uint32]int64)
	 := func( y.ValueStruct) {
		// We don't need to store/update discard stats when badger is running in Disk-less mode.
		if .kv.opt.InMemory {
			return
		}
		if .Meta&bitValuePointer > 0 {
			var  valuePointer
			.Decode(.Value)
			[.Fid] += int64(.Len)
		}
	}

	// exceedsAllowedOverlap returns true if the given key range would overlap with more than 10
	// tables from level below nextLevel (nextLevel+1). This helps avoid generating tables at Li
	// with huge overlaps with Li+1.
	 := func( keyRange) bool {
		 := .nextLevel.level + 1
		if  <= 1 ||  >= len(.levels) {
			return false
		}
		 := .levels[]
		.RLock()
		defer .RUnlock()

		,  := .overlappingTables(levelHandlerRLocked{}, )
		return - >= 10
	}

	var (
		,        []byte
		,  int
		// Denotes if the first key is a series of duplicate keys had
		// "DiscardEarlierVersions" set
		 bool
	)

	 := func( *table.Builder) {
		 := time.Now()
		var ,  uint64
		var  int
		var  keyRange
		for ; .Valid(); .Next() {
			// See if we need to skip the prefix.
			if len(.dropPrefixes) > 0 && hasAnyPrefixes(.Key(), .dropPrefixes) {
				++
				(.Value())
				continue
			}

			// See if we need to skip this key.
			if len() > 0 {
				if y.SameKey(.Key(), ) {
					++
					(.Value())
					continue
				} else {
					 = [:0]
				}
			}

			if !y.SameKey(.Key(), ) {
				 = false
				if len(.right) > 0 && y.CompareKeys(.Key(), .right) >= 0 {
					break
				}
				if .ReachedCapacity() {
					// Only break if we are on a different key, and have reached capacity. We want
					// to ensure that all versions of the key are stored in the same sstable, and
					// not divided across multiple tables at the same level.
					break
				}
				 = y.SafeCopy(, .Key())
				 = 0
				 = .Value().Meta&bitDiscardEarlierVersions > 0

				if len(.left) == 0 {
					.left = y.SafeCopy(.left, .Key())
				}
				.right = 

				++
				if %5000 == 0 {
					// This table's range exceeds the allowed range overlap with the level after
					// next. So, we stop writing to this table. If we don't do this, then we end up
					// doing very expensive compactions involving too many tables. To amortize the
					// cost of this check, we do it only every N keys.
					if () {
						// s.kv.opt.Debugf("L%d -> L%d Breaking due to exceedsAllowedOverlap with
						// kr: %s\n", cd.thisLevel.level, cd.nextLevel.level, tableKr)
						break
					}
				}
			}

			 := .Value()
			 := y.ParseTs(.Key())

			 := isDeletedOrExpired(.Meta, .ExpiresAt)

			// Do not discard entries inserted by merge operator. These entries will be
			// discarded once they're merged
			if  <=  && .Meta&bitMergeEntry == 0 {
				// Keep track of the number of versions encountered for this key. Only consider the
				// versions which are below the minReadTs, otherwise, we might end up discarding the
				// only valid version for a running transaction.
				++
				// Keep the current version and discard all the next versions if
				// - The `discardEarlierVersions` bit is set OR
				// - We've already processed `NumVersionsToKeep` number of versions
				// (including the current item being processed)
				 := .Meta&bitDiscardEarlierVersions > 0 ||
					 == .kv.opt.NumVersionsToKeep

				if  ||  {
					// If this version of the key is deleted or expired, skip all the rest of the
					// versions. Ensure that we're only removing versions below readTs.
					 = y.SafeCopy(, .Key())

					switch {
					// Add the key to the table only if it has not expired.
					// We don't want to add the deleted/expired keys.
					case ! && :
						// Add this key. We have set skipKey, so the following key versions
						// would be skipped.
					case :
						// If this key range has overlap with lower levels, then keep the deletion
						// marker with the latest version, discarding the rest. We have set skipKey,
						// so the following key versions would be skipped.
					default:
						// If no overlap, we can skip all the versions, by continuing here.
						++
						()
						continue // Skip adding this key.
					}
				}
			}
			++
			var  valuePointer
			if .Meta&bitValuePointer > 0 {
				.Decode(.Value)
			}
			switch {
			case :
				// This key is same as the last key which had "DiscardEarlierVersions" set. The
				// the next compactions will drop this key if its ts >
				// discardTs (of the next compaction).
				.AddStaleKey(.Key(), , .Len)
			case :
				// If the key is expired, the next compaction will drop it if
				// its ts > discardTs (of the next compaction).
				.AddStaleKey(.Key(), , .Len)
			default:
				.Add(.Key(), , .Len)
			}
		}
		.kv.opt.Debugf("[%d] LOG Compact. Added %d keys. Skipped %d keys. Iteration took: %v",
			.compactorId, , , time.Since().Round(time.Millisecond))
	} // End of function: addKeys

	if len(.left) > 0 {
		.Seek(.left)
	} else {
		.Rewind()
	}
	for .Valid() {
		if len(.right) > 0 && y.CompareKeys(.Key(), .right) >= 0 {
			break
		}

		 := buildTableOptions(.kv)
		// Set TableSize to the target file size for that level.
		.TableSize = uint64(.t.fileSz[.nextLevel.level])
		 := table.NewTableBuilder()

		// This would do the iteration and add keys to builder.
		()

		// It was true that it.Valid() at least once in the loop above, which means we
		// called Add() at least once, and builder is not Empty().
		if .Empty() {
			// Cleanup builder resources:
			.Finish()
			.Close()
			continue
		}
		++
		if  := .Do();  != nil {
			// Can't return from here, until I decrRef all the tables that I built so far.
			break
		}
		go func( *table.Builder,  uint64) {
			var  error
			defer .Done()
			defer .Close()

			var  *table.Table
			if .kv.opt.InMemory {
				,  = table.OpenInMemoryTable(.Finish(), , &)
			} else {
				 := table.NewFilename(, .kv.opt.Dir)
				,  = table.CreateTable(, )
			}

			// If we couldn't build the table, return fast.
			if  != nil {
				return
			}
			 <- 
		}(, .reserveFileID())
	}
	.kv.vlog.updateDiscardStats()
	.kv.opt.Debugf("Discard stats: %v", )
}

// compactBuildTables merges topTables and botTables to form a list of new tables.
func ( *levelsController) (
	 int,  compactDef) ([]*table.Table, func() error, error) {

	 := .top
	 := .bot

	 := int64(len() + len())
	y.NumCompactionTablesAdd(.kv.opt.MetricsEnabled, )
	defer y.NumCompactionTablesAdd(.kv.opt.MetricsEnabled, -)

	 := func( *table.Table) bool {
		for ,  := range .dropPrefixes {
			if bytes.HasPrefix(.Smallest(), ) &&
				bytes.HasPrefix(.Biggest(), ) {
				// All the keys in this table have the dropPrefix. So, this
				// table does not need to be in the iterator and can be
				// dropped immediately.
				return false
			}
		}
		return true
	}
	var  []*table.Table
	for ,  := range  {
		if () {
			 = append(, )
		}
	}

	 := func() []y.Iterator {
		// Create iterators across all the tables involved first.
		var  []y.Iterator
		switch {
		case  == 0:
			 = appendIteratorsReversed(, , table.NOCACHE)
		case len() > 0:
			y.AssertTrue(len() == 1)
			 = []y.Iterator{[0].NewIterator(table.NOCACHE)}
		}
		// Next level has level>=1 and we can use ConcatIterator as key ranges do not overlap.
		return append(, table.NewConcatIterator(, table.NOCACHE))
	}

	 := make(chan *table.Table, 3)
	 := y.NewThrottle(8 + len(.splits))
	for ,  := range .splits {
		// Initiate Do here so we can register the goroutines for buildTables too.
		if  := .Do();  != nil {
			.kv.opt.Errorf("cannot start subcompaction: %+v", )
			return nil, nil, 
		}
		go func( keyRange) {
			defer .Done(nil)
			 := table.NewMergeIterator((), false)
			defer .Close()
			.subcompact(, , , , )
		}()
	}

	var  []*table.Table
	var  sync.WaitGroup
	.Add(1)
	go func() {
		defer .Done()
		for  := range  {
			 = append(, )
		}
	}()

	// Wait for all table builders to finish and also for newTables accumulator to finish.
	 := .Finish()
	close()
	.Wait() // Wait for all tables to be picked up.

	if  == nil {
		// Ensure created files' directory entries are visible.  We don't mind the extra latency
		// from not doing this ASAP after all file creation has finished because this is a
		// background operation.
		 = .kv.syncDir(.kv.opt.Dir)
	}

	if  != nil {
		// An error happened.  Delete all the newly created table files (by calling DecrRef
		// -- we're the only holders of a ref).
		_ = decrRefs()
		return nil, nil, y.Wrapf(, "while running compactions for: %+v", )
	}

	sort.Slice(, func(,  int) bool {
		return y.CompareKeys([].Biggest(), [].Biggest()) < 0
	})
	return , func() error { return decrRefs() }, nil
}

func buildChangeSet( *compactDef,  []*table.Table) pb.ManifestChangeSet {
	 := []*pb.ManifestChange{}
	for ,  := range  {
		 = append(,
			newCreateChange(.ID(), .nextLevel.level, .KeyID(), .CompressionType()))
	}
	for ,  := range .top {
		// Add a delete change only if the table is not in memory.
		if !.IsInmemory {
			 = append(, newDeleteChange(.ID()))
		}
	}
	for ,  := range .bot {
		 = append(, newDeleteChange(.ID()))
	}
	return pb.ManifestChangeSet{Changes: }
}

// hasAnyPrefixes reports whether s starts with at least one of listOfPrefixes. It returns false
// when listOfPrefixes is empty.
func hasAnyPrefixes(s []byte, listOfPrefixes [][]byte) bool {
	for _, prefix := range listOfPrefixes {
		if bytes.HasPrefix(s, prefix) {
			return true
		}
	}

	return false
}

func containsPrefix( *table.Table,  []byte) bool {
	 := .Smallest()
	 := .Biggest()
	if bytes.HasPrefix(, ) {
		return true
	}
	if bytes.HasPrefix(, ) {
		return true
	}
	 := func() bool {
		 := .NewIterator(0)
		defer .Close()
		// In table iterator's Seek, we assume that key has version in last 8 bytes. We set
		// version=0 (ts=math.MaxUint64), so that we don't skip the key prefixed with prefix.
		.Seek(y.KeyWithTs(, math.MaxUint64))
		return bytes.HasPrefix(.Key(), )
	}

	if bytes.Compare(, ) > 0 &&
		bytes.Compare(, ) < 0 {
		// There may be a case when table contains [0x0000,...., 0xffff]. If we are searching for
		// k=0x0011, we should not directly infer that k is present. It may not be present.
		return ()
	}

	return false
}

func containsAnyPrefixes( *table.Table,  [][]byte) bool {
	for ,  := range  {
		if containsPrefix(, ) {
			return true
		}
	}

	return false
}

// compactDef describes a single compaction.
//
//   - compactorId is the id of the worker running the compaction; L0->L0 compactions are
//     restricted to worker 0.
//   - t is the level sizing plan and p the priority that triggered the compaction.
//   - thisLevel and nextLevel are the source and destination levels (the same level for
//     L0->L0 and for prefix-drop compactions).
//   - top holds the input tables from thisLevel and bot those from nextLevel.
//   - thisRange and nextRange are the key ranges covered by top and bot respectively.
//   - splits are the key ranges used to run sub-compactions in parallel.
//   - thisSize is not used in this part of the file.
//     NOTE(review): presumably the total size of the top tables -- confirm.
//   - dropPrefixes lists key prefixes whose keys are dropped during the compaction.
type compactDef struct {
	compactorId int
	t           targets
	p           compactionPriority
	thisLevel   *levelHandler
	nextLevel   *levelHandler

	top []*table.Table
	bot []*table.Table

	thisRange keyRange
	nextRange keyRange
	splits    []keyRange

	thisSize int64

	dropPrefixes [][]byte
}

// addSplits can allow us to run multiple sub-compactions in parallel across the split key ranges.
func ( *levelsController) ( *compactDef) {
	.splits = .splits[:0]

	// Let's say we have 10 tables in cd.bot and min width = 3. Then, we'll pick
	// 0, 1, 2 (pick), 3, 4, 5 (pick), 6, 7, 8 (pick), 9 (pick, because last table).
	// This gives us 4 picks for 10 tables.
	// In an edge case, 142 tables in bottom led to 48 splits. That's too many splits, because it
	// then uses up a lot of memory for table builder.
	// We should keep it so we have at max 5 splits.
	 := int(math.Ceil(float64(len(.bot)) / 5.0))
	if  < 3 {
		 = 3
	}
	 := .thisRange
	.extend(.nextRange)

	 := func( []byte) {
		.right = y.Copy()
		.splits = append(.splits, )

		.left = .right
	}

	for ,  := range .bot {
		// last entry in bottom table.
		if  == len(.bot)-1 {
			([]byte{})
			return
		}
		if % == -1 {
			// Right is assigned ts=0. The encoding ts bytes take MaxUint64-ts,
			// so, those with smaller TS will be considered larger for the same key.
			// Consider the following.
			// Top table is [A1...C3(deleted)]
			// bot table is [B1....C2]
			// It will generate a split [A1 ... C0], including any records of Key C.
			 := y.KeyWithTs(y.ParseKey(.Biggest()), 0)
			()
		}
	}
}

func ( *compactDef) () {
	.thisLevel.RLock()
	.nextLevel.RLock()
}

func ( *compactDef) () {
	.nextLevel.RUnlock()
	.thisLevel.RUnlock()
}

func ( *compactDef) () []*table.Table {
	 := make([]*table.Table, 0, len(.top)+len(.bot))
	 = append(, .top...)
	 = append(, .bot...)
	return 
}

func ( *levelsController) ( *compactDef) bool {
	if .compactorId != 0 {
		// Only compactor zero can work on this.
		return false
	}

	.nextLevel = .levels[0]
	.nextRange = keyRange{}
	.bot = nil

	// Because this level and next level are both level 0, we should NOT acquire
	// the read lock twice, because it can result in a deadlock. So, we don't
	// call compactDef.lockLevels, instead locking the level only once and
	// directly here.
	//
	// As per godocs on RWMutex:
	// If a goroutine holds a RWMutex for reading and another goroutine might
	// call Lock, no goroutine should expect to be able to acquire a read lock
	// until the initial read lock is released. In particular, this prohibits
	// recursive read locking. This is to ensure that the lock eventually
	// becomes available; a blocked Lock call excludes new readers from
	// acquiring the lock.
	y.AssertTrue(.thisLevel.level == 0)
	y.AssertTrue(.nextLevel.level == 0)
	.levels[0].RLock()
	defer .levels[0].RUnlock()

	.cstatus.Lock()
	defer .cstatus.Unlock()

	 := .thisLevel.tables
	var  []*table.Table
	 := time.Now()
	for ,  := range  {
		if .Size() >= 2*.t.fileSz[0] {
			// This file is already big, don't include it.
			continue
		}
		if .Sub(.CreatedAt) < 10*time.Second {
			// Just created it 10s ago. Don't pick for compaction.
			continue
		}
		if ,  := .cstatus.tables[.ID()];  {
			continue
		}
		 = append(, )
	}

	if len() < 4 {
		// If we don't have enough tables to merge in L0, don't do it.
		return false
	}
	.thisRange = infRange
	.top = 

	// Avoid any other L0 -> Lbase from happening, while this is going on.
	 := .cstatus.levels[.thisLevel.level]
	.ranges = append(.ranges, infRange)
	for ,  := range  {
		.cstatus.tables[.ID()] = struct{}{}
	}

	// For L0->L0 compaction, we set the target file size to max, so the output is always one file.
	// This significantly decreases the L0 table stalls and improves the performance.
	.t.fileSz[0] = math.MaxUint32
	return true
}

func ( *levelsController) ( *compactDef) bool {
	if .nextLevel.level == 0 {
		panic("Base level can't be zero.")
	}
	// We keep cd.p.adjusted > 0.0 here to allow functions in db.go to artificially trigger
	// L0->Lbase compactions. Those functions wouldn't be setting the adjusted score.
	if .p.adjusted > 0.0 && .p.adjusted < 1.0 {
		// Do not compact to Lbase if adjusted score is less than 1.0.
		return false
	}
	.lockLevels()
	defer .unlockLevels()

	 := .thisLevel.tables
	if len() == 0 {
		return false
	}

	var  []*table.Table
	if len(.dropPrefixes) > 0 {
		// Use all tables if drop prefix is set. We don't want to compact only a
		// sub-range. We want to compact all the tables.
		 = 

	} else {
		var  keyRange
		// cd.top[0] is the oldest file. So we start from the oldest file first.
		for ,  := range  {
			 := getKeyRange()
			if .overlapsWith() {
				 = append(, )
				.extend()
			} else {
				break
			}
		}
	}
	.thisRange = getKeyRange(...)
	.top = 

	,  := .nextLevel.overlappingTables(levelHandlerRLocked{}, .thisRange)
	.bot = make([]*table.Table, -)
	copy(.bot, .nextLevel.tables[:])

	if len(.bot) == 0 {
		.nextRange = .thisRange
	} else {
		.nextRange = getKeyRange(.bot...)
	}
	return .cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *)
}

// fillTablesL0 would try to fill tables from L0 to be compacted with Lbase. If
// it cannot do that, it would try to compact tables from L0 -> L0.
//
// Say L0 has 10 tables.
// fillTablesL0ToLbase picks up 5 tables to compact from L0 -> L5.
// Next call to fillTablesL0 would run L0ToLbase again, which fails this time.
// So, instead, we run fillTablesL0ToL0, which picks up rest of the 5 tables to
// be compacted within L0. Additionally, it would set the compaction range in
// cstatus to inf, so no other L0 -> Lbase compactions can happen.
// Thus, L0 -> L0 must finish for the next L0 -> Lbase to begin.
//
// It returns true if either of the two attempts reports success.
func ( *levelsController) ( *compactDef) bool {
	// Prefer L0 -> Lbase; fall back to L0 -> L0 only when that attempt fails.
	if  := .fillTablesL0ToLbase();  {
		return true
	}
	return .fillTablesL0ToL0()
}

// sortByStaleDataSize sorts tables in place based on the amount of stale data
// they have, so that tables with the most stale data come first.
// This is useful in removing tombstones.
// It is a no-op when the slice is empty or the compactDef has no nextLevel.
func ( *levelsController) ( []*table.Table,  *compactDef) {
	if len() == 0 || .nextLevel == nil {
		return
	}

	// sort.Slice is not stable, so tables with equal stale size may end up in any order.
	sort.Slice(, func(,  int) bool {
		return [].StaleDataSize() > [].StaleDataSize()
	})
}

// sortByHeuristic sorts tables in place in increasing order of MaxVersion, so we
// compact older tables first.
// It is a no-op when the slice is empty or the compactDef has no nextLevel.
func ( *levelsController) ( []*table.Table,  *compactDef) {
	if len() == 0 || .nextLevel == nil {
		return
	}

	// Sort tables by max version. This is what RocksDB does.
	// sort.Slice is not stable, so tables with equal MaxVersion may end up in any order.
	sort.Slice(, func(,  int) bool {
		return [].MaxVersion() < [].MaxVersion()
	})
}

// fillMaxLevelTables picks tables for a compaction that stays within the last level
// (maxLevel -> maxLevel), looking at tables in decreasing order of stale data size.
// This function should be called with lock on levels.
//
// A table is skipped when its MaxVersion is above the oracle's discardAtOrBelow value, when
// it was created less than an hour ago, when it has less than 10<<20 bytes of stale data,
// or when its key range is already being compacted. For the first table that passes, cd.top
// is set to that table; if the table is smaller than the level's target file size, following
// tables are collected into cd.bot until the target size would be reached.
//
// It returns true if the chosen compaction was accepted by cstatus.compareAndAdd, and false
// if there is no stale data, no table qualifies, or the compaction was rejected.
func ( *levelsController) ( []*table.Table,  *compactDef) bool {
	// Sort a copy so the caller's slice keeps its original order; the closure below still
	// needs a key-ordered slice for its binary search.
	 := make([]*table.Table, len())
	copy(, )
	.sortByStaleDataSize(, )

	// After sorting, index 0 holds the largest stale size, so zero there means zero everywhere.
	if len() > 0 && [0].StaleDataSize() == 0 {
		// This is a maxLevel to maxLevel compaction and we don't have any stale data.
		return false
	}
	.bot = []*table.Table{}
	// This closure collects the tables that follow the given table into cd.bot, stopping
	// before the running total of sizes would reach the given size.
	 := func( *table.Table,  int64) {
		 := .Size()

		// Locate the given table by its smallest key; the assertion below checks that the
		// table found really is the one we were given.
		 := sort.Search(len(), func( int) bool {
			return y.CompareKeys([].Smallest(), .Smallest()) >= 0
		})
		y.AssertTrue([].ID() == .ID())
		++
		// Collect tables until we reach the required size.
		for  < len() {
			 := []
			 += .Size()

			// The table that would push the total to the limit is itself not included.
			if  >=  {
				break
			}
			.bot = append(.bot, )
			.nextRange.extend(getKeyRange())
			++
		}
	}
	 := time.Now()
	for ,  := range  {
		// If the maxVersion is above the discardTs, we won't clean anything in
		// the compaction. So skip this table.
		if .MaxVersion() > .kv.orc.discardAtOrBelow() {
			continue
		}
		if .Sub(.CreatedAt) < time.Hour {
			// Created less than an hour ago. Don't pick for compaction.
			continue
		}
		// If the stale data size is less than 10 MB, it might not be worth
		// rewriting the table. Skip it.
		if .StaleDataSize() < 10<<20 {
			continue
		}

		.thisSize = .Size()
		.thisRange = getKeyRange()
		// Set the next range as the same as the current range. If we don't do
		// this, we won't be able to run more than one max level compactions.
		.nextRange = .thisRange
		// If we're already compacting this range, don't do anything.
		if .cstatus.overlapsWith(.thisLevel.level, .thisRange) {
			continue
		}

		// Found a valid table!
		.top = []*table.Table{}

		 := .t.fileSz[.thisLevel.level]
		// The table size is what we want so no need to collect more tables.
		// Breaking out leaves registration to the compareAndAdd call after the loop.
		if .Size() >=  {
			break
		}
		// TableSize is less than what we want. Collect more tables for compaction.
		// If the level has multiple small tables, we collect all of them
		// together to form a bigger table.
		(, )
		if !.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *) {
			// Rejected: undo what was collected for this candidate and try the next table.
			.bot = .bot[:0]
			.nextRange = keyRange{}
			continue
		}
		return true
	}
	// An empty cd.top here means no table passed the checks above.
	if len(.top) == 0 {
		return false
	}

	return .cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *)
}

// fillTables picks the tables for a compaction out of cd.thisLevel and fills cd.top, cd.bot,
// cd.thisRange and cd.nextRange accordingly. The levels are locked for the duration of the
// call. For the last level the choice is delegated to fillMaxLevelTables; otherwise tables
// are tried in the order produced by sortByHeuristic and the first one that is not already
// being compacted and is accepted by cstatus.compareAndAdd is used.
// It returns false if the level has no tables or no candidate could be registered.
func ( *levelsController) ( *compactDef) bool {
	.lockLevels()
	defer .unlockLevels()

	// Work on a copy so sorting does not reorder the level's own table slice.
	 := make([]*table.Table, len(.thisLevel.tables))
	copy(, .thisLevel.tables)
	if len() == 0 {
		return false
	}
	// We're doing a maxLevel to maxLevel compaction. Pick tables based on the stale data size.
	if .thisLevel.isLastLevel() {
		return .fillMaxLevelTables(, )
	}
	// We pick tables, so we compact older tables first. This is similar to
	// kOldestLargestSeqFirst in RocksDB.
	.sortByHeuristic(, )

	for ,  := range  {
		.thisSize = .Size()
		.thisRange = getKeyRange()
		// If we're already compacting this range, don't do anything.
		if .cstatus.overlapsWith(.thisLevel.level, .thisRange) {
			continue
		}
		.top = []*table.Table{}
		// The two values returned by overlappingTables are used as slice bounds into
		// nextLevel.tables; the overlapping tables are copied into cd.bot.
		,  := .nextLevel.overlappingTables(levelHandlerRLocked{}, .thisRange)

		.bot = make([]*table.Table, -)
		copy(.bot, .nextLevel.tables[:])

		if len(.bot) == 0 {
			// Nothing overlaps in the next level: the next range is simply this range.
			.bot = []*table.Table{}
			.nextRange = .thisRange
			if !.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *) {
				continue
			}
			return true
		}
		.nextRange = getKeyRange(.bot...)

		// Skip this candidate if the overlapping part of the next level is already busy.
		if .cstatus.overlapsWith(.nextLevel.level, .nextRange) {
			continue
		}
		if !.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *) {
			continue
		}
		return true
	}
	return false
}

// runCompactDef executes an already-filled compaction definition. It builds the new tables,
// records the change set in the manifest, swaps the new tables into the next level in place
// of cd.bot, and removes cd.top from this level.
//
// The two int parameters are only used for logging here (they are printed as "[%d]" and as
// the source level context); the compactDef is taken by value.
// It returns an error if the file-size targets are unset, if building the tables fails, if
// the manifest update fails, or if replacing/deleting tables fails. An error from the cleanup
// function returned by compactBuildTables is reported only when no other error occurred.
func ( *levelsController) (,  int,  compactDef) ( error) {
	if len(.t.fileSz) == 0 {
		return errors.New("Filesizes cannot be zero. Targets are not set")
	}
	 := time.Now()

	 := .thisLevel
	 := .nextLevel

	y.AssertTrue(len(.splits) == 0)
	if .level == .level {
		// don't do anything for L0 -> L0 and Lmax -> Lmax.
	} else {
		.addSplits(&)
	}
	// Guarantee at least one (zero-value) split so there is always a range to work on.
	if len(.splits) == 0 {
		.splits = append(.splits, keyRange{})
	}

	// Table should never be moved directly between levels,
	// always be rewritten to allow discarding invalid versions.

	, ,  := .compactBuildTables(, )
	if  != nil {
		return 
	}
	defer func() {
		// Only assign the cleanup error if no error has been recorded so far, so that an
		// earlier failure is never overwritten.
		if  := ();  == nil {
			 = 
		}
	}()
	 := buildChangeSet(&, )

	// We write to the manifest _before_ we delete files (and after we created files)
	if  := .kv.manifest.addChanges(.Changes, .kv.opt);  != nil {
		return 
	}

	// Helper that sums Size() over a slice of tables.
	 := func( []*table.Table) int64 {
		 := int64(0)
		for ,  := range  {
			 += .Size()
		}
		return 
	}

	// NOTE(review): both totals stay 0 unless metrics are enabled, so the byte delta logged
	// below is presumably always 0 when MetricsEnabled is false — confirm this is intended.
	 := int64(0)
	 := int64(0)
	if .kv.opt.MetricsEnabled {
		 = ()
		 = (.bot) + (.top)
		y.NumBytesCompactionWrittenAdd(.kv.opt.MetricsEnabled, .strLevel, )
	}

	// See comment earlier in this function about the ordering of these ops, and the order in which
	// we access levels when reading.
	if  := .replaceTables(.bot, );  != nil {
		return 
	}
	if  := .deleteTables(.top);  != nil {
		return 
	}

	// Note: For level 0, while doCompact is running, it is possible that new tables are added.
	// However, the tables are added only to the end, so it is ok to just delete the first table.

	 := append(tablesToString(.top), tablesToString(.bot)...)
	 := tablesToString()
	// Only compactions that took longer than two seconds are logged.
	if  := time.Since();  > 2*time.Second {
		var  string
		// NOTE(review): if this compares the same duration as the enclosing check, it is
		// always true here (anything above 2s is above 1s), so " [E]" is always set — confirm.
		if  > time.Second {
			 = " [E]"
		}
		.kv.opt.Infof("[%d]%s LOG Compact %d->%d (%d, %d -> %d tables with %d splits)."+
			" [%s] -> [%s], took %v\n, deleted %d bytes",
			, , .level, .level, len(.top), len(.bot),
			len(), len(.splits), strings.Join(, " "), strings.Join(, " "),
			.Round(time.Millisecond), -)
	}

	// Dump both key ranges when a non-L0 compaction involved an unusually large number of
	// tables, measured against twice the LevelSizeMultiplier option.
	if .thisLevel.level != 0 && len() > 2*.kv.opt.LevelSizeMultiplier {
		.kv.opt.Infof("This Range (numTables: %d)\nLeft:\n%s\nRight:\n%s\n",
			len(.top), hex.Dump(.thisRange.left), hex.Dump(.thisRange.right))
		.kv.opt.Infof("Next Range (numTables: %d)\nLeft:\n%s\nRight:\n%s\n",
			len(.bot), hex.Dump(.nextRange.left), hex.Dump(.nextRange.right))
	}
	return nil
}

// tablesToString renders the ID of every given table as a zero-padded, five-digit decimal
// string and appends a final "." element. The result therefore always has at least one
// element, even for an empty input. It is used to build compaction log lines.
func tablesToString( []*table.Table) []string {
	var  []string
	for ,  := range  {
		 = append(, fmt.Sprintf("%05d", .ID()))
	}
	// Trailing "." marks the end of the list when several lists are joined together.
	 = append(, ".")
	return 
}

// errFillTables is returned by doCompact when neither fillTablesL0 nor fillTables could
// select tables for the requested compaction.
var errFillTables = errors.New("Unable to fill tables")

// doCompact picks some table on level l and compacts it away to the next level.
//
// The int parameter identifies the compactor and is used for logging; the compactionPriority
// supplies the level, the targets and the drop prefixes. If the priority carries no base
// level (baseLevel == 0), the targets are recomputed via levelTargets. For level 0 the next
// level is the base level; for other levels it is the level below, or the same level when
// compacting the last level.
//
// It returns errFillTables if no tables could be picked, or the error from runCompactDef.
// The compaction is removed from cstatus when the function returns after tables were picked.
func ( *levelsController) ( int,  compactionPriority) error {
	 := .level
	y.AssertTrue( < .kv.opt.MaxLevels) // Sanity check.
	if .t.baseLevel == 0 {
		.t = .levelTargets()
	}

	// Trace the whole compaction as a single span.
	,  := otel.Tracer("").Start(context.TODO(), "Badger.Compaction")
	defer .End()

	 := compactDef{
		compactorId:  ,
		p:            ,
		t:            .t,
		thisLevel:    .levels[],
		dropPrefixes: .dropPrefixes,
	}

	// While picking tables to be compacted, both levels' tables are expected to
	// remain unchanged.
	if  == 0 {
		.nextLevel = .levels[.t.baseLevel]
		if !.fillTablesL0(&) {
			return errFillTables
		}
	} else {
		.nextLevel = .thisLevel
		// We're not compacting the last level so pick the next level.
		if !.thisLevel.isLastLevel() {
			.nextLevel = .levels[+1]
		}
		if !.fillTables(&) {
			return errFillTables
		}
	}
	// Registered only after the fill succeeded, so the early returns above skip it.
	defer .cstatus.delete() // Remove the ranges from compaction status.

	.SetAttributes(attribute.String("Compaction", fmt.Sprintf("%+v", )))
	if  := .runCompactDef(, , );  != nil {
		// This compaction couldn't be done successfully.
		.kv.opt.Warningf("[Compactor: %d] LOG Compact FAILED with error: %+v: %+v", , , )
		return 
	}

	.SetAttributes(
		attribute.Int("Top tables count", len(.top)),
		attribute.Int("Bottom tables count", len(.bot)))

	.kv.opt.Debugf("[Compactor: %d] Compaction for level: %d DONE", , .thisLevel.level)
	return nil
}

// This method adds the given table to level 0. Unless the table is in-memory only, a create
// change for it is first written to the manifest; a manifest error is returned unchanged.
// It then retries tryAddLevel0Table until it succeeds, sleeping in 10ms steps while level 0
// holds at least NumLevelZeroTablesStall tables, so the call can block for as long as level 0
// stays full. The time spent waiting is added to the l0stallsMs counter.
func ( *levelsController) ( *table.Table) error {
	// Add table to manifest file only if it is not opened in memory. We don't want to add a table
	// to the manifest file if it exists only in memory.
	if !.IsInmemory {
		// We update the manifest _before_ the table becomes part of a levelHandler, because at that
		// point it could get used in some compaction.  This ensures the manifest file gets updated in
		// the proper order. (That means this update happens before that of some compaction which
		// deletes the table.)
		 := .kv.manifest.addChanges([]*pb.ManifestChange{
			newCreateChange(.ID(), 0, .KeyID(), .CompressionType()),
		}, .kv.opt)
		if  != nil {
			return 
		}
	}

	for !.levels[0].tryAddLevel0Table() {
		// Before we unstall, we need to make sure that level 0 is healthy.
		 := time.Now()
		// Poll until level 0 has dropped below the stall threshold.
		for .levels[0].numTables() >= .kv.opt.NumLevelZeroTablesStall {
			time.Sleep(10 * time.Millisecond)
		}
		 := time.Since()
		if  > time.Second {
			.kv.opt.Infof("L0 was stalled for %s\n", .Round(time.Millisecond))
		}
		// NOTE(review): converting a time.Duration to int64 yields nanoseconds, even after
		// Round(time.Millisecond). If l0stallsMs is meant to hold milliseconds, as its name
		// suggests, this likely wants Milliseconds() instead — confirm.
		.l0stallsMs.Add(int64(.Round(time.Millisecond)))
	}

	return nil
}

// This method shuts the levels controller down by calling cleanupLevels and returns its
// result wrapped with the "levelsController.Close" context.
func ( *levelsController) () error {
	 := .cleanupLevels()
	return y.Wrap(, "levelsController.Close")
}

// get searches for a given key in all the levels of the LSM tree. It returns
// key version <= the expected version (version in key). If not found,
// it returns an empty y.ValueStruct.
//
// The y.ValueStruct parameter is the best candidate known so far; it is returned as-is if
// no level yields a newer version. The int parameter is the first level to search: levels
// with a smaller number are skipped. ErrDBClosed is returned if the DB is already closed,
// and an error from a level lookup is returned wrapped with the key.
func ( *levelsController) ( []byte,  y.ValueStruct,  int) (
	y.ValueStruct, error) {
	if .kv.IsClosed() {
		return y.ValueStruct{}, ErrDBClosed
	}
	// It's important that we iterate the levels from 0 on upward. The reason is, if we iterated
	// in opposite order, or in parallel (naively calling all the h.RLock() in some order) we could
	// read level L's tables post-compaction and level L+1's tables pre-compaction. (If we do
	// parallelize this, we will need to call the h.RLock() function by increasing order of level
	// number.)
	 := y.ParseTs()
	for ,  := range .levels {
		// Ignore all levels below startLevel. This is useful for GC when L0 is kept in memory.
		if .level <  {
			continue
		}
		,  := .get() // Calls h.RLock() and h.RUnlock().
		if  != nil {
			return y.ValueStruct{}, y.Wrapf(, "get key: %q", )
		}
		// A struct with neither value nor meta means this level has nothing for the key.
		if .Value == nil && .Meta == 0 {
			continue
		}
		y.NumBytesReadsLSMAdd(.kv.opt.MetricsEnabled, int64(len(.Value)))
		// An exact version match cannot be improved upon, so stop searching.
		if .Version ==  {
			return , nil
		}
		// Otherwise remember the result with the highest version seen so far.
		if .Version < .Version {
			 = 
		}
	}
	if len(.Value) > 0 {
		y.NumGetsWithResultsAdd(.kv.opt.MetricsEnabled, 1)
	}
	return , nil
}

// appendIteratorsReversed appends one iterator per table to the given iterator slice, walking
// the tables from last to first, and returns the extended slice. The int parameter is passed
// through to each table's NewIterator call.
// Note: every iterator holds a reference on its table, so the iterators must be closed.
func appendIteratorsReversed( []y.Iterator,  []*table.Table,  int) []y.Iterator {
	for  := len() - 1;  >= 0; -- {
		// This will increment the reference of the table handler.
		 = append(, [].NewIterator())
	}
	return 
}

// appendIterators appends iterators to an array of iterators, for merging.
// Each level contributes its iterators in turn, in increasing level order, and the extended
// slice is returned.
// Note: This obtains references for the table handlers. Remember to close these iterators.
func ( *levelsController) (
	 []y.Iterator,  *IteratorOptions) []y.Iterator {
	// Just like with get, it's important we iterate the levels from 0 on upward, to avoid missing
	// data when there's a compaction.
	for ,  := range .levels {
		 = .appendIterators(, )
	}
	return 
}

// TableInfo is a snapshot of the metadata of a single table in the LSM tree.
type TableInfo struct {
	// ID is the identifier of the table.
	ID uint64
	// Level is the level of the LSM tree that holds the table.
	Level int
	// Left and Right are the smallest and the biggest key of the table.
	Left, Right []byte
	// KeyCount is the number of keys in the table.
	KeyCount uint32
	// OnDiskSize, StaleDataSize and UncompressedSize are the sizes reported by the table.
	OnDiskSize, StaleDataSize, UncompressedSize uint32
	// MaxVersion is the maximum version reported by the table.
	MaxVersion uint64
	// IndexSz and BloomFilterSize are the sizes of the table's index and bloom filter.
	IndexSz, BloomFilterSize int
}

// This method returns a TableInfo entry for every table on every level. Each level is
// read under its read lock. The result is sorted by level first and by table ID within a
// level. When there are no tables at all, the named result is returned as a nil slice.
func ( *levelsController) () ( []TableInfo) {
	for ,  := range .levels {
		.RLock()
		for ,  := range .tables {
			 := TableInfo{
				ID:               .ID(),
				Level:            .level,
				Left:             .Smallest(),
				Right:            .Biggest(),
				KeyCount:         .KeyCount(),
				OnDiskSize:       .OnDiskSize(),
				StaleDataSize:    .StaleDataSize(),
				IndexSz:          .IndexSize(),
				BloomFilterSize:  .BloomFilterSize(),
				UncompressedSize: .UncompressedSize(),
				MaxVersion:       .MaxVersion(),
			}
			 = append(, )
		}
		.RUnlock()
	}
	// Order by level, then by table ID, so the output is deterministic.
	sort.Slice(, func(,  int) bool {
		if [].Level != [].Level {
			return [].Level < [].Level
		}
		return [].ID < [].ID
	})
	return
}

// LevelInfo is a snapshot of the state of a single level of the LSM tree together with
// its compaction targets and scores.
type LevelInfo struct {
	// Level is the level number and NumTables the number of tables on that level.
	Level, NumTables int
	// Size is the total size of the level, TargetSize the size the level is aimed at and
	// TargetFileSize the size aimed at for a single file on the level.
	Size, TargetSize, TargetFileSize int64
	// IsBaseLevel reports whether this level is the current base level.
	IsBaseLevel bool
	// Score and Adjusted are the compaction score and the adjusted compaction score.
	Score, Adjusted float64
	// StaleDatSize is the total size of stale data on the level.
	StaleDatSize int64
}

// This method returns one LevelInfo per level, indexed by level number. The table count and
// the size fields are read under each level's read lock. The targets come from levelTargets
// and the scores from pickCompactLevels; a level for which pickCompactLevels returns no
// priority keeps a zero Score and Adjusted value.
func ( *levelsController) () []LevelInfo {
	 := .levelTargets()
	 := .pickCompactLevels(nil)
	 := make([]LevelInfo, len(.levels))
	for ,  := range .levels {
		.RLock()
		[].Level = 
		[].Size = .totalSize
		[].NumTables = len(.tables)
		[].StaleDatSize = .totalStaleSize

		.RUnlock()

		// The targets were computed above and do not need the level lock.
		[].TargetSize = .targetSz[]
		[].TargetFileSize = .fileSz[]
		[].IsBaseLevel = .baseLevel == 
	}
	for ,  := range  {
		[.level].Score = .score
		[.level].Adjusted = .adjusted
	}
	return 
}

// verifyChecksum verifies checksum for all tables on all levels.
// Levels are processed one at a time. It returns the first checksum error encountered and
// nil if every table verifies. A failure to release a table reference is only logged.
func ( *levelsController) () error {
	var  []*table.Table
	for ,  := range .levels {
		// Snapshot the level's tables under the read lock and take a reference on each, so
		// that the verification itself can run without holding the lock.
		.RLock()
		 = [:0]
		for ,  := range .tables {
			 = append(, )
			.IncrRef()
		}
		.RUnlock()

		for ,  := range  {
			 := .VerifyChecksum()
			// The reference is dropped before the checksum result is examined.
			// NOTE(review): on an early return the references taken on the remaining tables
			// of this level are not released here — confirm whether that is intended.
			if  := .DecrRef();  != nil {
				.kv.opt.Errorf("unable to decrease reference of table: %s while "+
					"verifying checksum with error: %s", .Filename(), )
			}

			if  != nil {
				return 
			}
		}
	}

	return nil
}

// Returns the sorted list of splits for all the levels and tables based
// on the block offsets.
// The int and []byte parameters are passed through to every table's KeySplits call. Each
// level is read under its read lock, and the combined result is sorted with sort.Strings.
// The result is never nil: an empty, non-nil slice is returned when there are no splits.
func ( *levelsController) ( int,  []byte) []string {
	 := make([]string, 0)
	for ,  := range .levels {
		.RLock()
		for ,  := range .tables {
			 := .KeySplits(, )
			 = append(, ...)
		}
		.RUnlock()
	}
	sort.Strings()
	return 
}