/*
 * SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc.
 * SPDX-License-Identifier: Apache-2.0
 */

package badger

import (
	
	
	
	
	
	
	
	
	
	
	
	
	
	

	
	

	
	
)

// maxVlogFileSize is the maximum size of a vlog file that can be created. Vlog offsets are
// stored as uint32, so the size is capped at math.MaxUint32.
var maxVlogFileSize uint32 = math.MaxUint32

// Bit flags kept in an entry's meta byte. bitDelete helps us distinguish between a key that has
// never been seen and a key that has been explicitly deleted.
const (
	bitDelete                 byte = 1 << 0 // Set if the key has been deleted.
	bitValuePointer           byte = 1 << 1 // Set if the value is NOT stored directly next to key.
	bitDiscardEarlierVersions byte = 1 << 2 // Set if earlier versions can be discarded.
	// Set if item shouldn't be discarded via compactions (used by merge operator)
	bitMergeEntry byte = 1 << 3
	// The MSB 2 bits are for transactions.
	bitTxn    byte = 1 << 6 // Set if the entry is part of a txn.
	bitFinTxn byte = 1 << 7 // Set if the entry is to indicate end of txn in value log.

	// mi is 1<<20 (1048576). It is not referenced in this file (hence the nolint directive).
	mi int64 = 1 << 20 //nolint:unused

	// Size of the vlog header in bytes: 8 bytes of keyID followed by 12 bytes of baseIV.
	// +----------------+------------------+
	// | keyID(8 bytes) |  baseIV(12 bytes)|
	// +----------------+------------------+
	vlogHeaderSize = 20
)

// errStop is a sentinel error. NOTE(review): it is not used in the code visible here; presumably
// iteration callbacks return it to stop early -- confirm against the iterate implementation.
var errStop = errors.New("Stop iteration")

// errTruncate is a sentinel error returned by the safeRead entry reader in this file when a
// record cannot be trusted: an oversized key length, a short read (io.EOF), or a checksum
// mismatch.
var errTruncate = errors.New("Do truncate")

// logEntry is a callback that receives one Entry together with its valuePointer and returns an
// error. The closures passed to iterate in this file have this shape.
type logEntry func(e Entry, vp valuePointer) error

// safeRead holds the state needed to read and checksum-validate entries from a log file.
type safeRead struct {
	k []byte // Scratch buffer sized for keys; grown by the entry reader when too small.
	v []byte // Scratch buffer sized for values; grown by the entry reader when too small.

	// recordOffset is copied into each decoded Entry's offset and is passed to decryptKV.
	recordOffset uint32
	// lf is the log file being read; it is consulted for encryption (encryptionEnabled, decryptKV).
	lf *logFile
}

// hashReader implements the io.Reader and io.ByteReader interfaces. It also keeps track of the
// number of bytes read. The hashReader writes to h (hash) what it reads from r.
type hashReader struct {
	r         io.Reader   // Underlying source of bytes.
	h         hash.Hash32 // Running 32-bit hash of everything read so far.
	bytesRead int         // Number of bytes read.
}

// newHashReader builds a hashReader from an io.Reader. It creates a CRC32 hash using
// y.CastagnoliCrcTable and returns a hashReader with its r and h fields populated; bytesRead
// starts at zero.
func newHashReader( io.Reader) *hashReader {
	 := crc32.New(y.CastagnoliCrcTable)
	return &hashReader{
		r: ,
		h: ,
	}
}

// Read reads from the underlying reader r into the supplied byte slice. On a read error it
// returns immediately, before bytesRead or the hash are updated. Otherwise it adds the number of
// bytes read to bytesRead and returns the result of writing the read bytes to the hash h.
func ( *hashReader) ( []byte) (int, error) {
	,  := .r.Read()
	if  != nil {
		return , 
	}
	.bytesRead += 
	return .h.Write([:])
}

// ReadByte reads one byte through Read (so it is hashed and counted like any other read) and
// returns it together with Read's error. A fresh one-byte slice is allocated on every call.
func ( *hashReader) () (byte, error) {
	 := make([]byte, 1)
	,  := .Read()
	return [0], 
}

// Sum32 returns the current 32-bit sum of the underlying hash h, i.e. the checksum of everything
// written to it so far.
func ( *hashReader) () uint32 {
	return .h.Sum32()
}

// Entry reads an entry from the provided reader. It also validates the checksum for every entry
// read. Returns error on failure.
//
// The sequence is: decode the header through a hashReader, read klen+vlen bytes of key/value
// data, decrypt them when the log file has encryption enabled, then read crc32.Size bytes of
// stored checksum and compare it with the hashReader's Sum32. errTruncate is returned when the
// key length exceeds 1<<16, when a read ends with io.EOF, or when the checksums differ.
func ( *safeRead) ( io.Reader) (*Entry, error) {
	 := newHashReader()
	var  header
	,  := .DecodeFrom()
	if  != nil {
		return nil, 
	}
	if .klen > uint32(1<<16) { // Key length must be below uint16.
		return nil, errTruncate
	}
	// Grow the k and v scratch buffers to twice the required length when their capacity is
	// too small.
	// NOTE(review): the key/value bytes below are read into a fresh allocation; confirm
	// whether these scratch buffers are still needed.
	 := int(.klen)
	if cap(.k) <  {
		.k = make([]byte, 2*)
	}
	 := int(.vlen)
	if cap(.v) <  {
		.v = make([]byte, 2*)
	}

	 := &Entry{}
	.offset = .recordOffset
	.hlen = 
	// Single buffer holding the key followed by the value.
	 := make([]byte, .klen+.vlen)
	if ,  := io.ReadFull(, [:]);  != nil {
		if  == io.EOF {
			 = errTruncate
		}
		return nil, 
	}
	if .lf.encryptionEnabled() {
		if ,  = .lf.decryptKV([:], .recordOffset);  != nil {
			return nil, 
		}
	}
	.Key = [:.klen]
	.Value = [.klen:]
	// Read the stored checksum and compare it against the running hash.
	var  [crc32.Size]byte
	if ,  := io.ReadFull(, [:]);  != nil {
		if  == io.EOF {
			 = errTruncate
		}
		return nil, 
	}
	 := y.BytesToU32([:])
	if  != .Sum32() {
		return nil, errTruncate
	}
	// Copy the remaining header fields onto the entry.
	.meta = .meta
	.UserMeta = .userMeta
	.ExpiresAt = .expiresAt
	return , nil
}

// Rewrites the still-live entries of the given log file back into the DB and then removes the
// file.
//
// Steps, as visible in the body:
//  1. Under filesLock (read), fail if the file's fid is already in filesToBeDeleted, and assert
//     that its fid is below maxFid.
//  2. Iterate over the file. For each entry, look the key up via db.get and skip it when
//     discardEntry says so, or when the LSM pointer refers to a newer file or a greater offset.
//     When fid and offset match, build a fresh Entry (with bitValuePointer, bitTxn and bitFinTxn
//     cleared) and add it to a batch, flushing through db.batchSet when the batch would reach
//     maxBatchCount or maxBatchSize.
//  3. Write what is left in chunks starting at 1024 entries, halving the chunk size whenever
//     batchSet returns ErrTxnTooBig, and returning ErrNoRewrite if the chunk size reaches zero.
//  4. Under filesLock (write), delete the file from filesMap and from disk when no iterators
//     are active; otherwise queue its fid in filesToBeDeleted.
func ( *valueLog) ( *logFile) error {
	.filesLock.RLock()
	for ,  := range .filesToBeDeleted {
		if  == .fid {
			.filesLock.RUnlock()
			return fmt.Errorf("value log file already marked for deletion fid: %d", )
		}
	}
	 := .maxFid
	y.AssertTruef(.fid < , "fid to move: %d. Current max fid: %d", .fid, )
	.filesLock.RUnlock()

	.opt.Infof("Rewriting fid: %d", .fid)
	 := make([]*Entry, 0, 1000)
	var  int64

	y.AssertTrue(.db != nil)
	var ,  int
	 := func( Entry) error {
		++
		if %100000 == 0 {
			.opt.Debugf("Processing entry %d", )
		}

		,  := .db.get(.Key)
		if  != nil {
			return 
		}
		if discardEntry(, , .db) {
			return nil
		}

		// Value is still present in value log.
		if len(.Value) == 0 {
			return fmt.Errorf("Empty value: %+v", )
		}
		var  valuePointer
		.Decode(.Value)

		// If the entry found from the LSM Tree points to a newer vlog file, don't do anything.
		if .Fid > .fid {
			return nil
		}
		// If the entry found from the LSM Tree points to an offset greater than the one
		// read from vlog, don't do anything.
		if .Offset > .offset {
			return nil
		}
		// If the entry read from LSM Tree and vlog file point to the same vlog file and offset,
		// insert them back into the DB.
		// NOTE: It might be possible that the entry read from the LSM Tree points to
		// an older vlog file. See the comments in the else part.
		if .Fid == .fid && .Offset == .offset {
			++
			// This new entry only contains the key, and a pointer to the value.
			 := new(Entry)
			// Remove only the bitValuePointer and transaction markers. We
			// should keep the other bits.
			.meta = .meta &^ (bitValuePointer | bitTxn | bitFinTxn)
			.UserMeta = .UserMeta
			.ExpiresAt = .ExpiresAt
			.Key = append([]byte{}, .Key...)
			.Value = append([]byte{}, .Value...)
			 := .estimateSizeAndSetThreshold(.db.valueThreshold())
			// Consider size of value as well while considering the total size
			// of the batch. There have been reports of high memory usage in
			// rewrite because we don't consider the value size. See #1292.
			 += int64(len(.Value))

			// Ensure length and size of wb is within transaction limits.
			if int64(len()+1) >= .opt.maxBatchCount ||
				+ >= .opt.maxBatchSize {
				if  := .db.batchSet();  != nil {
					return 
				}
				 = 0
				 = [:0]
			}
			 = append(, )
			 += 
		} else { //nolint:staticcheck
			// It might be possible that the entry read from LSM Tree points to
			// an older vlog file.  This can happen in the following situation.
			// Assume DB is opened with
			// numberOfVersionsToKeep=1
			//
			// Now, if we have ONLY one key in the system "FOO" which has been
			// updated 3 times and the same key has been garbage collected 3
			// times, we'll have 3 versions of the movekey
			// for the same key "FOO".
			//
			// NOTE: moveKeyi is the gc'ed version of the original key with version i
			// We're calling the gc'ed keys as moveKey to simplify the
			// explanation. We used to add move keys but we no longer do that.
			//
			// Assume we have 3 move keys in L0.
			// - moveKey1 (points to vlog file 10),
			// - moveKey2 (points to vlog file 14) and
			// - moveKey3 (points to vlog file 15).
			//
			// Also, assume there is another move key "moveKey1" (points to
			// vlog file 6) (this is also a move Key for key "FOO" ) on upper
			// levels (let's say 3). The move key "moveKey1" on level 0 was
			// inserted because vlog file 6 was GCed.
			//
			// Here's what the arrangement looks like
			// L0 => (moveKey1 => vlog10), (moveKey2 => vlog14), (moveKey3 => vlog15)
			// L1 => ....
			// L2 => ....
			// L3 => (moveKey1 => vlog6)
			//
			// When L0 compaction runs, it keeps only moveKey3 because the number of versions
			// to keep is set to 1. (we've dropped moveKey1's latest version)
			//
			// The new arrangement of keys is
			// L0 => ....
			// L1 => (moveKey3 => vlog15)
			// L2 => ....
			// L3 => (moveKey1 => vlog6)
			//
			// Now if we try to GC vlog file 10, the entry read from vlog file
			// will point to vlog10 but the entry read from LSM Tree will point
			// to vlog6. The move key read from LSM tree will point to vlog6
			// because we've asked for version 1 of the move key.
			//
			// This might seem like an issue but it's not really an issue
			// because the user has set the number of versions to keep to 1 and
			// the latest version of moveKey points to the correct vlog file
			// and offset. The stale move key on L3 will be eventually dropped
			// by compaction because there is a newer versions in the upper
			// levels.
		}
		return nil
	}

	,  := .iterate(.opt.ReadOnly, 0, func( Entry,  valuePointer) error {
		return ()
	})
	if  != nil {
		return 
	}

	// Flush whatever is left in the batch, starting with chunks of 1024 entries and halving
	// the chunk size each time the DB reports ErrTxnTooBig.
	 := 1024
	var  int
	for  := 0;  < len(); {
		++
		if  == 0 {
			.db.opt.Warningf("We shouldn't reach batch size of zero.")
			return ErrNoRewrite
		}
		 :=  + 
		if  > len() {
			 = len()
		}
		if  := .db.batchSet([:]);  != nil {
			if  == ErrTxnTooBig {
				// Decrease the batch size to half.
				 =  / 2
				continue
			}
			return 
		}
		 += 
	}
	.opt.Infof("Processed %d entries in %d loops", len(), )
	.opt.Infof("Total entries: %d. Moved: %d", , )
	.opt.Infof("Removing fid: %d", .fid)
	var  bool
	// Entries written to LSM. Remove the older file now.
	{
		.filesLock.Lock()
		// Just a sanity-check.
		if ,  := .filesMap[.fid]; ! {
			.filesLock.Unlock()
			return fmt.Errorf("Unable to find fid: %d", .fid)
		}
		// Delete right away only when no iterator is active; otherwise defer the deletion
		// by queueing the fid.
		if .iteratorCount() == 0 {
			delete(.filesMap, .fid)
			 = true
		} else {
			.filesToBeDeleted = append(.filesToBeDeleted, .fid)
		}
		.filesLock.Unlock()
	}

	if  {
		if  := .deleteLogFile();  != nil {
			return 
		}
	}
	return nil
}

// Registers one more active iterator by atomically adding 1 to numActiveIterators.
func ( *valueLog) () {
	.numActiveIterators.Add(1)
}

// Returns the current value of numActiveIterators (loaded atomically) as an int.
func ( *valueLog) () int {
	return int(.numActiveIterators.Load())
}

// Atomically subtracts 1 from numActiveIterators. When the result is not zero it returns nil
// without doing anything else. When it reaches zero, every fid queued in filesToBeDeleted is
// removed from filesMap under filesLock and the queue is reset; the collected log files are
// then deleted via deleteLogFile after the lock is released. The first deletion error is
// returned and stops the loop.
func ( *valueLog) () error {
	 := .numActiveIterators.Add(-1)
	if  != 0 {
		return nil
	}

	.filesLock.Lock()
	 := make([]*logFile, 0, len(.filesToBeDeleted))
	for ,  := range .filesToBeDeleted {
		// A fid missing from filesMap yields a nil *logFile here; deleteLogFile ignores nil.
		 = append(, .filesMap[])
		delete(.filesMap, )
	}
	.filesToBeDeleted = nil
	.filesLock.Unlock()

	for ,  := range  {
		if  := .deleteLogFile();  != nil {
			return 
		}
	}
	return nil
}

// Deletes the given log file. A nil log file is a no-op that returns nil. Otherwise the file's
// lock is held exclusively for the duration of the call, the file's fid is updated in
// discardStats with -1, and the result of the file's Delete method is returned.
func ( *valueLog) ( *logFile) error {
	if  == nil {
		return nil
	}
	.lock.Lock()
	defer .lock.Unlock()
	// Delete fid from discard stats as well.
	.discardStats.Update(.fid, -1)

	return .Delete()
}

// Deletes every value log file and creates a fresh one. It returns the number of files deleted.
//
// In InMemory mode it returns (0, nil) immediately. Otherwise, while holding filesLock, each
// file in filesMap is removed through deleteLogFile, filesMap is replaced with an empty map and
// maxFid is reset to 0. After that a new vlog file is created via createVlogFile. On error the
// count accumulated so far is returned alongside the error.
func ( *valueLog) () (int, error) {
	// If db is opened in InMemory mode, we don't need to do anything since there are no vlog files.
	if .db.opt.InMemory {
		return 0, nil
	}
	// We don't want to block dropAll on any pending transactions. So, don't worry about iterator
	// count.
	var  int
	 := func() error {
		.filesLock.Lock()
		defer .filesLock.Unlock()
		for ,  := range .filesMap {
			if  := .deleteLogFile();  != nil {
				return 
			}
			++
		}
		.filesMap = make(map[uint32]*logFile)
		.maxFid = 0
		return nil
	}
	if  := ();  != nil {
		return , 
	}

	.db.opt.Infof("Value logs deleted. Creating value log file: 1")
	if ,  := .createVlogFile();  != nil { // Called while writes are stopped.
		return , 
	}
	return , nil
}

// Returns the DB's current value threshold, loaded atomically from threshold.valueThreshold.
func ( *DB) () int64 {
	return .threshold.valueThreshold.Load()
}

// valueLog tracks the set of value log files of a DB, the current write position, and the
// state used for garbage collection.
type valueLog struct {
	dirPath string // Directory holding the .vlog files (set from Options.ValueDir).

	// guards our view of which files exist, which to be deleted, how many active iterators
	filesLock        sync.RWMutex
	filesMap         map[uint32]*logFile // Log files keyed by fid.
	maxFid           uint32              // Highest fid known; the file currently written to.
	filesToBeDeleted []uint32            // Fids whose deletion is deferred until no iterator is active.
	// A refcount of iterators -- when this hits zero, we can delete the filesToBeDeleted.
	numActiveIterators atomic.Int32

	db                *DB
	writableLogOffset atomic.Uint32 // read by read, written by write
	numEntriesWritten uint32        // Entries written to the current file; reset when a new file is created.
	opt               Options

	garbageCh    chan struct{} // Capacity-1 channel used to allow only one GC at a time.
	discardStats *discardStats
}

// vlogFilePath builds the path of a value log file from a directory string and a uint32 file
// id: the directory, the OS path separator, the id zero-padded to at least six digits, and the
// ".vlog" suffix.
func vlogFilePath( string,  uint32) string {
	return fmt.Sprintf("%s%s%06d.vlog", , string(os.PathSeparator), )
}

// Returns the path of the value log file with the given id inside this value log's dirPath, as
// built by vlogFilePath.
func ( *valueLog) ( uint32) string {
	return vlogFilePath(.dirPath, )
}

// Rebuilds filesMap from the contents of dirPath. Every directory entry whose name ends in
// ".vlog" has the part of its name before that suffix parsed as a base-10, 32-bit unsigned id.
// A logFile record (fid, path, registry) is stored in filesMap for each id, and maxFid is
// raised to the largest id seen. An error is returned when the directory cannot be read, when
// a name does not parse, or when the same id is found twice.
func ( *valueLog) () error {
	.filesMap = make(map[uint32]*logFile)

	,  := os.ReadDir(.dirPath)
	if  != nil {
		return errFile(, .dirPath, "Unable to open log dir.")
	}

	// Set of ids already seen, used to detect duplicates.
	 := make(map[uint64]struct{})
	for ,  := range  {
		if !strings.HasSuffix(.Name(), ".vlog") {
			continue
		}
		 := len(.Name())
		// Strip the 5-character ".vlog" suffix before parsing the id.
		,  := strconv.ParseUint(.Name()[:-5], 10, 32)
		if  != nil {
			return errFile(, .Name(), "Unable to parse log id.")
		}
		if ,  := [];  {
			return errFile(, .Name(), "Duplicate file found. Please delete one.")
		}
		[] = struct{}{}

		 := &logFile{
			fid:      uint32(),
			path:     .fpath(uint32()),
			registry: .db.registry,
		}
		.filesMap[uint32()] = 
		if .maxFid < uint32() {
			.maxFid = uint32()
		}
	}
	return nil
}

// Creates the next value log file, with fid maxFid+1, and makes it the current writable file.
//
// The file is opened with O_RDWR|O_CREATE|O_EXCL and a size of 2*ValueLogFileSize; a z.NewFile
// result from open is not treated as a failure. Under filesLock the new file is registered in
// filesMap, maxFid is advanced, writableLogOffset is set to vlogHeaderSize and
// numEntriesWritten is reset to zero.
func ( *valueLog) () (*logFile, error) {
	 := .maxFid + 1
	 := .fpath()
	 := &logFile{
		fid:      ,
		path:     ,
		registry: .db.registry,
		writeAt:  vlogHeaderSize,
		opt:      .opt,
	}
	 := .open(, os.O_RDWR|os.O_CREATE|os.O_EXCL, 2*.opt.ValueLogFileSize)
	if  != z.NewFile &&  != nil {
		return nil, 
	}

	.filesLock.Lock()
	.filesMap[] = 
	y.AssertTrue(.maxFid < )
	.maxFid = 
	// writableLogOffset is only written by the write func, but read by the Read func.
	// To avoid a race condition, all reads and updates to this variable must be
	// done via atomics.
	.writableLogOffset.Store(vlogHeaderSize)
	.numEntriesWritten = 0
	.filesLock.Unlock()

	return , nil
}

// errFile builds an error whose text has the form "<msg>. Path=<path>. Error=<err>" from an
// error and two strings. The underlying error is formatted with %v, so it is not wrapped and
// cannot be recovered with errors.Is / errors.As.
func errFile( error,  string,  string) error {
	return fmt.Errorf("%s. Path=%s. Error=%v", , , )
}

// init initializes the value log struct. This initialization needs to happen
// before compactions start.
//
// It copies the DB's options and stores the DB pointer. Outside InMemory mode it also sets
// dirPath from Options.ValueDir, creates the capacity-1 garbageCh, and initializes discardStats
// via InitDiscardStats; an error from InitDiscardStats is passed to y.Check.
func ( *valueLog) ( *DB) {
	.opt = .opt
	.db = 
	// We don't need to open any vlog files or collect stats for GC if DB is opened
	// in InMemory mode. InMemory mode doesn't create any files/directories on disk.
	if .opt.InMemory {
		return
	}
	.dirPath = .opt.ValueDir

	.garbageCh = make(chan struct{}, 1) // Only allow one GC at a time.
	,  := InitDiscardStats(.opt)
	y.Check()
	.discardStats = 
	// See TestPersistLFDiscardStats for purpose of statement below.
	.logToSyncChan(endVLogInitMsg)
}

// Opens the value log for the given DB.
//
// In InMemory mode nothing is done. Otherwise filesMap is populated from disk. When no files
// exist, a new one is created (unless ReadOnly, in which case nil is returned). Existing files
// are opened in fid order with O_RDWR; any file other than the maxFid file whose size equals
// vlogHeaderSize is deleted as empty. Unless ReadOnly, the maxFid file is then iterated from
// vlogHeaderSize, truncated to the offset returned by that iteration, and a brand-new file is
// created so that the old one is never written to again.
func ( *valueLog) ( *DB) error {
	// We don't need to open any vlog files or collect stats for GC if DB is opened
	// in InMemory mode. InMemory mode doesn't create any files/directories on disk.
	if .opt.InMemory {
		return nil
	}

	if  := .populateFilesMap();  != nil {
		return 
	}
	// If no files are found, then create a new file.
	if len(.filesMap) == 0 {
		if .opt.ReadOnly {
			return nil
		}
		,  := .createVlogFile()
		return y.Wrapf(, "Error while creating log file in valueLog.open")
	}
	 := .sortedFids()
	for ,  := range  {
		,  := .filesMap[]
		y.AssertTrue()

		// Just open in RDWR mode. This should not create a new log file.
		.opt = .opt
		if  := .open(.fpath(), os.O_RDWR,
			2*.opt.ValueLogFileSize);  != nil {
			return y.Wrapf(, "Open existing file: %q", .path)
		}
		// We shouldn't delete the maxFid file.
		if .size.Load() == vlogHeaderSize &&  != .maxFid {
			.opt.Infof("Deleting empty file: %s", .path)
			if  := .Delete();  != nil {
				return y.Wrapf(, "while trying to delete empty file: %s", .path)
			}
			delete(.filesMap, )
		}
	}

	if .opt.ReadOnly {
		return nil
	}
	// Now we can read the latest value log file, and see if it needs truncation. We could
	// technically do this over all the value log files, but that would mean slowing down the value
	// log open.
	,  := .filesMap[.maxFid]
	y.AssertTrue()
	// The callback ignores every entry; only the offset returned by iterate is used, as the
	// length to truncate the file to.
	,  := .iterate(.opt.ReadOnly, vlogHeaderSize,
		func( Entry,  valuePointer) error {
			return nil
		})
	if  != nil {
		return y.Wrapf(, "while iterating over: %s", .path)
	}
	if  := .Truncate(int64());  != nil {
		return y.Wrapf(, "while truncating last value log file: %s", .path)
	}

	// Don't write to the old log file. Always create a new one.
	if ,  := .createVlogFile();  != nil {
		return y.Wrapf(, "Error while creating log file in valueLog.open")
	}
	return nil
}

// Closes every log file in filesMap and then the discard stats.
//
// It is a no-op (returning nil) when the receiver is nil, when it has no DB, or when the DB is
// in InMemory mode. Each file's lock is taken and intentionally never released. For the maxFid
// file in a writable DB the current write offset is computed; every other file uses -1. All
// files are closed even if some fail; only the first error encountered is returned.
func ( *valueLog) () error {
	if  == nil || .db == nil || .db.opt.InMemory {
		return nil
	}

	.opt.Debugf("Stopping garbage collection of values.")
	var  error
	for ,  := range .filesMap {
		.lock.Lock() // We won’t release the lock.
		 := int64(-1)

		if !.opt.ReadOnly &&  == .maxFid {
			 = int64(.woffset())
		}
		// Keep going on error so the remaining files are still closed; remember the first one.
		if  := .Close();  != nil &&  == nil {
			 = 
		}
	}
	if .discardStats != nil {
		.db.captureDiscardStats()
		if  := .discardStats.Close(-1);  != nil &&  == nil {
			 = 
		}
	}
	return 
}

// sortedFids returns the file ids not pending deletion, sorted in ascending order. It reads
// filesMap and filesToBeDeleted without locking, so the caller must hold at least shared access
// to them.
func ( *valueLog) () []uint32 {
	// Set of fids queued for deletion, for constant-time exclusion below.
	 := make(map[uint32]struct{})
	for ,  := range .filesToBeDeleted {
		[] = struct{}{}
	}
	 := make([]uint32, 0, len(.filesMap))
	for  := range .filesMap {
		if ,  := []; ! {
			 = append(, )
		}
	}
	sort.Slice(, func(,  int) bool {
		return [] < []
	})
	return 
}

// request is a batch of entries together with the outputs produced for it and the
// synchronization state used to wait on and recycle it.
type request struct {
	// Input values
	Entries []*Entry
	// Output values and wait group stuff below
	Ptrs []valuePointer // One value pointer per entry, filled in by the value log write path.
	Wg   sync.WaitGroup
	Err  error
	ref  atomic.Int32 // Reference count; the request goes back to requestPool when it drops to zero.
}

// Clears the request for reuse: Entries and Ptrs are truncated to length zero (keeping their
// backing arrays), Wg is replaced with a fresh WaitGroup, Err is cleared and the reference
// count is set to zero.
func ( *request) () {
	.Entries = .Entries[:0]
	.Ptrs = .Ptrs[:0]
	.Wg = sync.WaitGroup{}
	.Err = nil
	.ref.Store(0)
}

// Atomically adds one to the request's reference count.
func ( *request) () {
	.ref.Add(1)
}

// Atomically subtracts one from the request's reference count. While the count stays above
// zero nothing else happens; once it is zero or below, Entries is set to nil and the request
// is put back into requestPool.
func ( *request) () {
	 := .ref.Add(-1)
	if  > 0 {
		return
	}
	.Entries = nil
	requestPool.Put()
}

// Blocks on the request's WaitGroup, then returns the request's Err. Err is copied before the
// reference is dropped because DecrRef may return the request to the pool.
func ( *request) () error {
	.Wg.Wait()
	 := .Err
	.DecrRef() // DecrRef after writing to DB.
	return 
}

// requests is a slice of request pointers with helpers that adjust the reference count of every
// element.
type requests []*request

// Calls DecrRef on every request in the slice.
func ( requests) () {
	for ,  := range  {
		.DecrRef()
	}
}

// Calls IncrRef on every request in the slice.
func ( requests) () {
	for ,  := range  {
		.IncrRef()
	}
}

// sync syncs the content of the latest (maxFid) value log file to disk. It returns nil without
// doing anything when SyncWrites or InMemory is set, or when no file is registered for maxFid
// yet.
//
// The file's read lock is acquired before filesLock is released, so the file stays locked
// between being looked up and being synced.
//
// NOTE(review): earlier documentation described a fid argument (including passing
// math.MaxUint32 to force a sync); the function as written takes no parameters and always
// targets maxFid. It also stated that the value log directory is synced on file rotation and
// so need not be synced here -- that is not visible in this file; confirm in logFile.open.
func ( *valueLog) () error {
	if .opt.SyncWrites || .opt.InMemory {
		return nil
	}

	.filesLock.RLock()
	 := .maxFid
	 := .filesMap[]
	// Sometimes it is possible that vlog.maxFid has been increased but file creation
	// with same id is still in progress and this function is called. In those cases
	// entry for the file might not be present in vlog.filesMap.
	if  == nil {
		.filesLock.RUnlock()
		return nil
	}
	.lock.RLock()
	.filesLock.RUnlock()

	 := .Sync()
	.lock.RUnlock()
	return 
}

// Returns the current writable log offset, loaded atomically from writableLogOffset.
func ( *valueLog) () uint32 {
	return .writableLogOffset.Load()
}

// validateWrites will check whether the given requests can fit into 4GB vlog file.
// NOTE: 4GB is the maximum size we can create for vlog because value pointer offset is of type
// uint32. If we create more than 4GB, it will overflow uint32. So, limiting the size to 4GB.
//
// It simulates the writes starting from the current write offset: each request's estimated size
// is added to a running offset, an error is returned if that offset would exceed
// maxVlogFileSize, and the running offset is reset to zero whenever it reaches
// Options.ValueLogFileSize (the point at which a new file would be created). Nothing is written.
func ( *valueLog) ( []*request) error {
	 := uint64(.woffset())
	for ,  := range  {
		// calculate size of the request.
		 := estimateRequestSize()
		 :=  + 
		if  > uint64(maxVlogFileSize) {
			return fmt.Errorf("Request size offset %d is bigger than maximum offset %d",
				, maxVlogFileSize)
		}

		if  >= uint64(.opt.ValueLogFileSize) {
			// We'll create a new vlog file if the estimated offset is greater or equal to
			// max vlog size. So, resetting the vlogOffset.
			 = 0
			continue
		}
		// Estimated vlog offset will become current vlog offset if the vlog is not rotated.
		 = 
	}
	return nil
}

// estimateRequestSize returns an upper-bound estimate of the bytes needed to write the given
// request: for every entry, maxHeaderSize plus the key length, the value length and crc32.Size.
func estimateRequestSize( *request) uint64 {
	 := uint64(0)
	for ,  := range .Entries {
		 += uint64(maxHeaderSize + len(.Key) + len(.Value) + crc32.Size)
	}
	return 
}

// write is thread-unsafe by design and should not be called concurrently.
//
// It appends the entries of the given requests to the current (maxFid) value log file and
// fills each request's Ptrs with one valuePointer per entry. In InMemory mode it does nothing.
// All requests are validated with validateWrites first so that a batch is not partially
// written. Entries for which skipVlogAndSetThreshold returns true get a zero valuePointer and
// are not written. After each request, the file is rotated (doneWriting + createVlogFile) when
// the write offset exceeds ValueLogFileSize or the entry count exceeds ValueLogMaxEntries.
// When SyncWrites is set, the file that was current at entry/rotation is synced on return; a
// sync failure is only logged.
func ( *valueLog) ( []*request) error {
	if .db.opt.InMemory {
		return nil
	}
	// Validate writes before writing to vlog. Because, we don't want to partially write and return
	// an error.
	if  := .validateWrites();  != nil {
		return y.Wrapf(, "while validating writes")
	}

	.filesLock.RLock()
	 := .maxFid
	 := .filesMap[]
	.filesLock.RUnlock()

	defer func() {
		if .opt.SyncWrites {
			if  := .Sync();  != nil {
				.opt.Errorf("Error while curlf sync: %v\n", )
			}
		}
	}()

	// Helper: copies a buffer's bytes into the file's mapped Data at the current write offset,
	// growing the file first when needed, and records the new end offset as the file size.
	 := func( *bytes.Buffer) error {
		if .Len() == 0 {
			return nil
		}

		 := uint32(.Len())
		 := .writableLogOffset.Add()
		// Increase the file size if we cannot accommodate this entry.
		// [Aman] Should this be >= or just >? Doesn't make sense to extend the file if it big enough already.
		if int() >= len(.Data) {
			if  := .Truncate(int64());  != nil {
				return 
			}
		}

		 := int( - )
		y.AssertTrue(copy(.Data[:], .Bytes()) == int())

		.size.Store()
		return nil
	}

	// Helper: rotates to a new vlog file once the size or entry-count limit is exceeded.
	 := func() error {
		if .woffset() > uint32(.opt.ValueLogFileSize) ||
			.numEntriesWritten > .opt.ValueLogMaxEntries {
			if  := .doneWriting(.woffset());  != nil {
				return 
			}

			,  := .createVlogFile()
			if  != nil {
				return 
			}
			 = 
		}
		return nil
	}

	 := new(bytes.Buffer)
	for  := range  {
		 := []
		.Ptrs = .Ptrs[:0]
		var ,  int
		// Value sizes of this request's entries, handed to the threshold updater below.
		 := make([]int64, 0, len(.Entries))
		for  := range .Entries {
			.Reset()

			 := .Entries[]
			 = append(, int64(len(.Value)))
			if .skipVlogAndSetThreshold(.db.valueThreshold()) {
				.Ptrs = append(.Ptrs, valuePointer{})
				continue
			}
			var  valuePointer

			.Fid = .fid
			.Offset = .woffset()

			// We should not store transaction marks in the vlog file because it will never have all
			// the entries in a transaction. If we store entries with transaction marks then value
			// GC will not be able to iterate on the entire vlog file.
			// But, we still want the entry to stay intact for the memTable WAL. So, store the meta
			// in a temporary variable and reassign it after writing to the value log.
			 := .meta
			.meta = .meta &^ (bitTxn | bitFinTxn)
			,  := .encodeEntry(, , .Offset) // Now encode the entry into buffer.
			if  != nil {
				return 
			}
			// Restore the meta.
			.meta = 

			.Len = uint32()
			.Ptrs = append(.Ptrs, )
			if  := ();  != nil {
				return 
			}
			++
			 += .Len()
			// No need to flush anything, we write to file directly via mmap.
		}
		y.NumWritesVlogAdd(.opt.MetricsEnabled, int64())
		y.NumBytesWrittenVlogAdd(.opt.MetricsEnabled, int64())

		.numEntriesWritten += uint32()
		.db.threshold.update()
		// We write to disk here so that all entries that are part of the same transaction are
		// written to the same vlog file.
		if  := ();  != nil {
			return 
		}
	}
	return ()
}

// Gets the logFile for the given value pointer and acquires an RLock() on it for the mmap. You
// must call RUnlock on the file (if non-nil).
//
// An error is returned, and no file lock is taken, when the pointer's Fid is not in filesMap,
// or when (outside ReadOnly mode) the pointer targets the maxFid file at an offset that is at
// or beyond the current write offset. filesLock is held in read mode for the whole call.
func ( *valueLog) ( valuePointer) (*logFile, error) {
	.filesLock.RLock()
	defer .filesLock.RUnlock()
	,  := .filesMap[.Fid]
	if ! {
		// log file has gone away, we can't do anything. Return.
		return nil, fmt.Errorf("file with ID: %d not found", .Fid)
	}

	// Check for valid offset if we are reading from writable log.
	 := .maxFid
	// In read-only mode we don't need to check for writable offset as we are not writing anything.
	// Moreover, this offset is not set in readonly mode.
	if !.opt.ReadOnly && .Fid ==  {
		 := .woffset()
		if .Offset >=  {
			return nil, fmt.Errorf(
				"Invalid value pointer offset: %d greater than current offset: %d",
				.Offset, )
		}
	}

	.lock.RLock()
	return , nil
}

// Read reads the value log at a given location.
// TODO: Make this read private.
//
// It returns the value bytes and a callback that releases the read lock on the log file; the
// caller is expected to invoke the callback when done with the returned slice. When
// Options.VerifyValueChecksum is set, the trailing crc32.Size bytes of the record are verified
// against a CRC32 (Castagnoli) of the preceding bytes, and y.ErrChecksumMismatch is returned
// (wrapped) on mismatch. The record is decrypted when the log file has encryption enabled.
func ( *valueLog) ( valuePointer,  *y.Slice) ([]byte, func(), error) {
	, ,  := .readValueBytes()
	// The log file is read-locked at this point. Build the unlock callback now and decide
	// below whether to run it immediately or hand it to the caller to run after use.
	 := .getUnlockCallback()
	if  != nil {
		return nil, , 
	}

	if .opt.VerifyValueChecksum {
		 := crc32.New(y.CastagnoliCrcTable)
		if ,  := .Write([:len()-crc32.Size]);  != nil {
			runCallback()
			return nil, nil, y.Wrapf(, "failed to write hash for vp %+v", )
		}
		// Fetch checksum from the end of the buffer.
		 := [len()-crc32.Size:]
		if .Sum32() != y.BytesToU32() {
			runCallback()
			return nil, nil, y.Wrapf(y.ErrChecksumMismatch, "value corrupted for vp: %+v", )
		}
	}
	var  header
	 := .Decode()
	 := [:]
	if .encryptionEnabled() {
		,  = .decryptKV(, .Offset)
		if  != nil {
			return nil, , 
		}
	}
	if uint32(len()) < .klen+.vlen {
		.db.opt.Errorf("Invalid read: vp: %+v", )
		// NOTE(review): unlike the checksum failures above, this path neither runs the unlock
		// callback nor returns it, so the file's read lock appears to stay held -- confirm.
		return nil, nil, fmt.Errorf("Invalid read: Len: %d read at:[%d:%d]",
			len(), .klen, .klen+.vlen)
	}
	return [.klen : .klen+.vlen], , nil
}

// getUnlockCallback returns the function that releases the read lock on the given log file
// (its lock.RUnlock method value). It returns nil when the log file is nil. It does not unlock
// anything itself.
func ( *valueLog) ( *logFile) func() {
	if  == nil {
		return nil
	}
	return .lock.RUnlock
}

// readValueBytes returns the vlog entry slice for the given value pointer together with the
// read-locked log file it came from. The caller is responsible for unlocking the log file.
// When the file cannot be obtained, both the slice and the file are nil. The vlog read-count
// and read-bytes metrics are updated after the read, whether or not the read itself failed.
func ( *valueLog) ( valuePointer) ([]byte, *logFile, error) {
	,  := .getFileRLocked()
	if  != nil {
		return nil, nil, 
	}

	,  := .read()
	y.NumReadsVlogAdd(.db.opt.MetricsEnabled, 1)
	y.NumBytesReadsVlogAdd(.db.opt.MetricsEnabled, int64(len()))
	return , , 
}

// Picks the value log file to garbage collect, or returns nil when there is no suitable one.
//
// The candidate is the file discardStats.MaxDiscard reports. Stats for files that are no
// longer in filesMap are cleared and the selection is retried. The candidate is returned only
// when its discard amount is at least the given ratio times its on-disk size (from Fd.Stat)
// and its fid is below maxFid; in every other case nil is returned. filesLock is held in read
// mode throughout.
func ( *valueLog) ( float64) *logFile {
	.filesLock.RLock()
	defer .filesLock.RUnlock()

:
	// Pick a candidate that contains the largest amount of discardable data
	,  := .discardStats.MaxDiscard()

	// MaxDiscard will return fid=0 if it doesn't have any discard data. The
	// vlog files start from 1.
	if  == 0 {
		.opt.Debugf("No file with discard stats")
		return nil
	}
	,  := .filesMap[]
	// This file was deleted but its discard stats increased because of compactions. The file
	// doesn't exist so we don't need to do anything. Skip it and retry.
	if ! {
		.discardStats.Update(, -1)
		goto 
	}
	// We have a valid file.
	,  := .Fd.Stat()
	if  != nil {
		.opt.Errorf("Unable to get stats for value log fid: %d err: %+v", , )
		return nil
	}
	if  :=  * float64(.Size()); float64() <  {
		.opt.Debugf("Discard: %d less than threshold: %.0f for file: %s",
			, , .Name())
		return nil
	}
	if  < .maxFid {
		.opt.Infof("Found value log max discard fid: %d discard: %d\n", , )
		,  := .filesMap[]
		y.AssertTrue()
		return 
	}

	// Don't randomly pick any value log file.
	return nil
}

// discardEntry reports whether an entry read from the value log can be dropped during a
// rewrite, judged against the y.ValueStruct found for its key. It returns true when the
// versions do not match, when the value is deleted or expired, when the value is stored inline
// (bitValuePointer not set), or when the value is only a transaction-finish marker (bitFinTxn
// set). The *DB parameter is not used by any statement in the body.
func discardEntry( Entry,  y.ValueStruct,  *DB) bool {
	if .Version != y.ParseTs(.Key) {
		// Version not found. Discard.
		return true
	}
	if isDeletedOrExpired(.Meta, .ExpiresAt) {
		return true
	}
	if (.Meta & bitValuePointer) == 0 {
		// Key also stores the value in LSM. Discard.
		return true
	}
	if (.Meta & bitFinTxn) > 0 {
		// Just a txn finish entry. Discard.
		return true
	}
	return false
}

// Runs garbage collection on a single log file: rewrites it via rewrite and, on success,
// updates the file's discard stats with -1. The work is wrapped in an OpenTelemetry span named
// "Badger.GC" that carries the file path as an attribute. The span is started from
// context.TODO(), so it is not linked to any caller context.
func ( *valueLog) ( *logFile) error {
	,  := otel.Tracer("").Start(context.TODO(), "Badger.GC")
	.SetAttributes(attribute.String("GC rewrite for", .path))
	defer .End()
	if  := .rewrite();  != nil {
		return 
	}
	// Remove the file from discardStats.
	.discardStats.Update(.fid, -1)
	return nil
}

// Waits for the given closer to be closed, then occupies garbageCh. Because garbageCh has
// capacity one, the send blocks until any GC in progress has released its slot, and afterwards
// no new GC can start. The closer's Done is called when this function returns.
func ( *valueLog) ( *z.Closer) {
	defer .Done()

	<-.HasBeenClosed() // Wait for lc to be closed.

	// Block any GC in progress to finish, and don't allow any more writes to runGC by filling up
	// the channel of size 1.
	.garbageCh <- struct{}{}
}

// Attempts one round of value log garbage collection using the given discard ratio.
//
// garbageCh acts as a non-blocking, capacity-one slot: if it cannot be taken immediately,
// ErrRejected is returned. Otherwise a file is chosen with pickLog; ErrNoRewrite is returned
// when none qualifies, else the result of doRunGC. The slot is released on return.
func ( *valueLog) ( float64) error {
	select {
	case .garbageCh <- struct{}{}:
		// Pick a log file for GC.
		defer func() {
			<-.garbageCh
		}()

		 := .pickLog()
		if  == nil {
			return ErrNoRewrite
		}
		return .doRunGC()
	default:
		return ErrRejected
	}
}

// Applies a map of per-fid discard amounts to discardStats, calling Update once per map entry.
// It does nothing in InMemory mode. Afterwards updateDiscardStatsMsg is sent through the DB's
// logToSyncChan for test coordination.
func ( *valueLog) ( map[uint32]int64) {
	if .opt.InMemory {
		return
	}
	for ,  := range  {
		.discardStats.Update(, )
	}
	// The following is to coordinate with some test cases where we want to
	// verify that at least one iteration of updateDiscardStats has been completed.
	.db.logToSyncChan(updateDiscardStatsMsg)
}

// vlogThreshold maintains a value-size threshold that is recomputed from a histogram of
// observed value sizes.
type vlogThreshold struct {
	logger         Logger        // Optional; checked for nil before logging threshold updates.
	percentile     float64       // Percentile of the histogram used as the new threshold.
	valueThreshold atomic.Int64  // Current threshold.
	valueCh        chan []int64  // Batches of observed value sizes (buffered, capacity 1000).
	clearCh        chan bool     // Signals that the histogram should be cleared (capacity 1).
	closer         *z.Closer     // Stops the listening loop.
	// Metrics contains a running log of statistics like amount of data stored etc.
	vlMetrics *z.HistogramData
}

// initVlogThreshold creates a vlogThreshold from the given options. The histogram bounds run
// from Options.ValueThreshold up to maxValueThreshold, using at most 1024 evenly spaced bounds
// whose first and last elements are pinned to those two limits. The initial threshold is
// Options.ValueThreshold. It asserts that the maximum bound is not below the minimum. No
// goroutine is started here.
func initVlogThreshold( *Options) *vlogThreshold {
	// Builds the slice of histogram bounds described above.
	 := func() []float64 {
		 := .maxValueThreshold
		 := float64(.ValueThreshold)
		y.AssertTruef( >= , "maximum threshold bound is less than the min threshold")
		 := math.Min(-+1, 1024.0)
		 := ( - ) / 
		 := make([]float64, int64())
		for  := range  {
			if  == 0 {
				[0] = 
				continue
			}
			if  == int(-1) {
				[] = 
				continue
			}
			[] = [-1] + 
		}
		return 
	}
	 := &vlogThreshold{
		logger:     .Logger,
		percentile: .VLogPercentile,
		valueCh:    make(chan []int64, 1000),
		clearCh:    make(chan bool, 1),
		closer:     z.NewCloser(1),
		vlMetrics:  z.NewHistogramData(()),
	}
	.valueThreshold.Store(.ValueThreshold)
	return 
}

// Resets the threshold to Options.ValueThreshold and signals on clearCh so that the histogram
// is cleared by the goroutine receiving from it. The send blocks if clearCh (capacity 1)
// already holds an unconsumed signal.
func ( *vlogThreshold) ( Options) {
	.valueThreshold.Store(.ValueThreshold)
	.clearCh <- true
}

// Sends a batch of observed value sizes on valueCh. The send blocks when the channel's buffer
// is full.
func ( *vlogThreshold) ( []int64) {
	.valueCh <- 
}

// Signals the closer and waits for the work tracked by it to finish (closer.SignalAndWait).
func ( *vlogThreshold) () {
	.closer.SignalAndWait()
}

// Runs the threshold update loop until the closer is closed, calling closer.Done on exit.
//
// Each batch received on valueCh is folded into the vlMetrics histogram, after which the
// configured percentile of the histogram is computed; if it differs from the stored threshold,
// the new value is logged (when a logger is set) and stored. A receive on clearCh clears the
// histogram.
func ( *vlogThreshold) () {
	defer .closer.Done()
	for {
		select {
		case <-.closer.HasBeenClosed():
			return
		case  := <-.valueCh:
			for ,  := range  {
				.vlMetrics.Update()
			}
			// Recompute the threshold as the Options.VlogPercentile percentile of the observed
			// value sizes, so that values with sizes within that percentile make it to the LSM
			// tree and the rest go to the value log file.
			 := int64(.vlMetrics.Percentile(.percentile))
			if .valueThreshold.Load() !=  {
				if .logger != nil {
					.logger.Infof("updating value of threshold to: %d", )
				}
				.valueThreshold.Store()
			}
		case <-.clearCh:
			.vlMetrics.Clear()
		}
	}
}