/*
 * SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc.
 * SPDX-License-Identifier: Apache-2.0
 */

package badger

import (
	
	
	

	
	
)

type levelHandler struct {
	// Guards tables, totalSize.
	sync.RWMutex

	// For level >= 1, tables are sorted by key ranges, which do not overlap.
	// For level 0, tables are sorted by time.
	// For level 0, newest table are at the back. Compact the oldest one first, which is at the front.
	tables         []*table.Table
	totalSize      int64
	totalStaleSize int64

	// The following are initialized once and const.
	level    int
	strLevel string
	db       *DB
}

func ( *levelHandler) () bool {
	return .level == .db.opt.MaxLevels-1
}

func ( *levelHandler) () int64 {
	.RLock()
	defer .RUnlock()
	return .totalStaleSize
}

func ( *levelHandler) () int64 {
	.RLock()
	defer .RUnlock()
	return .totalSize
}

// initTables replaces s.tables with given tables. This is done during loading.
func ( *levelHandler) ( []*table.Table) {
	.Lock()
	defer .Unlock()

	.tables = 
	.totalSize = 0
	.totalStaleSize = 0
	for ,  := range  {
		.addSize()
	}

	if .level == 0 {
		// Key range will overlap. Just sort by fileID in ascending order
		// because newer tables are at the end of level 0.
		sort.Slice(.tables, func(,  int) bool {
			return .tables[].ID() < .tables[].ID()
		})
	} else {
		// Sort tables by keys.
		sort.Slice(.tables, func(,  int) bool {
			return y.CompareKeys(.tables[].Smallest(), .tables[].Smallest()) < 0
		})
	}
}

// deleteTables remove tables idx0, ..., idx1-1.
func ( *levelHandler) ( []*table.Table) error {
	.Lock() // s.Unlock() below

	 := make(map[uint64]struct{})
	for ,  := range  {
		[.ID()] = struct{}{}
	}

	// Make a copy as iterators might be keeping a slice of tables.
	var  []*table.Table
	for ,  := range .tables {
		,  := [.ID()]
		if ! {
			 = append(, )
			continue
		}
		.subtractSize()
	}
	.tables = 

	.Unlock() // Unlock s _before_ we DecrRef our tables, which can be slow.

	return decrRefs()
}

// replaceTables will replace tables[left:right] with newTables. Note this EXCLUDES tables[right].
// You must call decr() to delete the old tables _after_ writing the update to the manifest.
func ( *levelHandler) (,  []*table.Table) error {
	// Need to re-search the range of tables in this level to be replaced as other goroutines might
	// be changing it as well.  (They can't touch our tables, but if they add/remove other tables,
	// the indices get shifted around.)
	.Lock() // We s.Unlock() below.

	 := make(map[uint64]struct{})
	for ,  := range  {
		[.ID()] = struct{}{}
	}
	var  []*table.Table
	for ,  := range .tables {
		,  := [.ID()]
		if ! {
			 = append(, )
			continue
		}
		.subtractSize()
	}

	// Increase totalSize first.
	for ,  := range  {
		.addSize()
		.IncrRef()
		 = append(, )
	}

	// Assign tables.
	.tables = 
	sort.Slice(.tables, func(,  int) bool {
		return y.CompareKeys(.tables[].Smallest(), .tables[].Smallest()) < 0
	})
	.Unlock() // s.Unlock before we DecrRef tables -- that can be slow.
	return decrRefs()
}

// addTable adds toAdd table to levelHandler. Normally when we add tables to levelHandler, we sort
// tables based on table.Smallest. This is required for correctness of the system. But in case of
// stream writer this can be avoided. We can just add tables to levelHandler's table list
// and after all addTable calls, we can sort table list(check sortTable method).
// NOTE: levelHandler.sortTables() should be called after call addTable calls are done.
func ( *levelHandler) ( *table.Table) {
	.Lock()
	defer .Unlock()

	.addSize() // Increase totalSize first.
	.IncrRef()
	.tables = append(.tables, )
}

// sortTables sorts tables of levelHandler based on table.Smallest.
// Normally it should be called after all addTable calls.
func ( *levelHandler) () {
	.Lock()
	defer .Unlock()

	sort.Slice(.tables, func(,  int) bool {
		return y.CompareKeys(.tables[].Smallest(), .tables[].Smallest()) < 0
	})
}

func decrRefs( []*table.Table) error {
	for ,  := range  {
		if  := .DecrRef();  != nil {
			return 
		}
	}
	return nil
}

func newLevelHandler( *DB,  int) *levelHandler {
	return &levelHandler{
		level:    ,
		strLevel: fmt.Sprintf("l%d", ),
		db:       ,
	}
}

// tryAddLevel0Table returns true if ok and no stalling.
func ( *levelHandler) ( *table.Table) bool {
	y.AssertTrue(.level == 0)
	// Need lock as we may be deleting the first table during a level 0 compaction.
	.Lock()
	defer .Unlock()
	// Stall (by returning false) if we are above the specified stall setting for L0.
	if len(.tables) >= .db.opt.NumLevelZeroTablesStall {
		return false
	}

	.tables = append(.tables, )
	.IncrRef()
	.addSize()

	return true
}

// This should be called while holding the lock on the level.
func ( *levelHandler) ( *table.Table) {
	.totalSize += .Size()
	.totalStaleSize += int64(.StaleDataSize())
}

// This should be called while holding the lock on the level.
func ( *levelHandler) ( *table.Table) {
	.totalSize -= .Size()
	.totalStaleSize -= int64(.StaleDataSize())
}
func ( *levelHandler) () int {
	.RLock()
	defer .RUnlock()
	return len(.tables)
}

func ( *levelHandler) () error {
	.RLock()
	defer .RUnlock()
	var  error
	for ,  := range .tables {
		if  := .Close(-1);  != nil &&  == nil {
			 = 
		}
	}
	return y.Wrap(, "levelHandler.close")
}

// getTableForKey acquires a read-lock to access s.tables. It returns a list of tableHandlers.
func ( *levelHandler) ( []byte) ([]*table.Table, func() error) {
	.RLock()
	defer .RUnlock()

	if .level == 0 {
		// For level 0, we need to check every table. Remember to make a copy as s.tables may change
		// once we exit this function, and we don't want to lock s.tables while seeking in tables.
		// CAUTION: Reverse the tables.
		 := make([]*table.Table, 0, len(.tables))
		for  := len(.tables) - 1;  >= 0; -- {
			 = append(, .tables[])
			.tables[].IncrRef()
		}
		return , func() error {
			for ,  := range  {
				if  := .DecrRef();  != nil {
					return 
				}
			}
			return nil
		}
	}
	// For level >= 1, we can do a binary search as key range does not overlap.
	 := sort.Search(len(.tables), func( int) bool {
		return y.CompareKeys(.tables[].Biggest(), ) >= 0
	})
	if  >= len(.tables) {
		// Given key is strictly > than every element we have.
		return nil, func() error { return nil }
	}
	 := .tables[]
	.IncrRef()
	return []*table.Table{}, .DecrRef
}

// get returns value for a given key or the key after that. If not found, return nil.
func ( *levelHandler) ( []byte) (y.ValueStruct, error) {
	,  := .getTableForKey()
	 := y.ParseKey()

	 := y.Hash()
	var  y.ValueStruct
	for ,  := range  {
		if .DoesNotHave() {
			y.NumLSMBloomHitsAdd(.db.opt.MetricsEnabled, .strLevel, 1)
			continue
		}

		 := .NewIterator(0)
		defer .Close()

		y.NumLSMGetsAdd(.db.opt.MetricsEnabled, .strLevel, 1)
		.Seek()
		if !.Valid() {
			continue
		}
		if y.SameKey(, .Key()) {
			if  := y.ParseTs(.Key()); .Version <  {
				 = .ValueCopy()
				.Version = 
			}
		}
	}
	return , ()
}

// appendIterators appends iterators to an array of iterators, for merging.
// Note: This obtains references for the table handlers. Remember to close these iterators.
func ( *levelHandler) ( []y.Iterator,  *IteratorOptions) []y.Iterator {
	.RLock()
	defer .RUnlock()

	var  int
	if .Reverse {
		 = table.REVERSED
	}
	if .level == 0 {
		// Remember to add in reverse order!
		// The newer table at the end of s.tables should be added first as it takes precedence.
		// Level 0 tables are not in key sorted order, so we need to consider them one by one.
		var  []*table.Table
		for ,  := range .tables {
			if .pickTable() {
				 = append(, )
			}
		}
		return appendIteratorsReversed(, , )
	}

	 := .pickTables(.tables)
	if len() == 0 {
		return 
	}
	return append(, table.NewConcatIterator(, ))
}

type levelHandlerRLocked struct{}

// overlappingTables returns the tables that intersect with key range. Returns a half-interval.
// This function should already have acquired a read lock, and this is so important the caller must
// pass an empty parameter declaring such.
func ( *levelHandler) ( levelHandlerRLocked,  keyRange) (int, int) {
	if len(.left) == 0 || len(.right) == 0 {
		return 0, 0
	}
	 := sort.Search(len(.tables), func( int) bool {
		return y.CompareKeys(.left, .tables[].Biggest()) <= 0
	})
	 := sort.Search(len(.tables), func( int) bool {
		return y.CompareKeys(.right, .tables[].Smallest()) < 0
	})
	return , 
}