/*
 * SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc.
 * SPDX-License-Identifier: Apache-2.0
 */

package badger

import (
	
	
	
	
	
	
	

	
	
	
)

type prefetchStatus uint8

const (
	prefetched prefetchStatus = iota + 1
)

// Item is returned during iteration. Both the Key() and Value() output is only valid until
// iterator.Next() is called.
type Item struct {
	key       []byte
	vptr      []byte
	val       []byte
	version   uint64
	expiresAt uint64

	slice *y.Slice // Used only during prefetching.
	next  *Item
	txn   *Txn

	err      error
	wg       sync.WaitGroup
	status   prefetchStatus
	meta     byte // We need to store meta to know about bitValuePointer.
	userMeta byte
}

// String returns a string representation of Item
func ( *Item) () string {
	return fmt.Sprintf("key=%q, version=%d, meta=%x", .Key(), .Version(), .meta)
}

// Key returns the key.
//
// Key is only valid as long as item is valid, or transaction is valid.  If you need to use it
// outside its validity, please use KeyCopy.
func ( *Item) () []byte {
	return .key
}

// KeyCopy returns a copy of the key of the item, writing it to dst slice.
// If nil is passed, or capacity of dst isn't sufficient, a new slice would be allocated and
// returned.
func ( *Item) ( []byte) []byte {
	return y.SafeCopy(, .key)
}

// Version returns the commit timestamp of the item.
func ( *Item) () uint64 {
	return .version
}

// Value retrieves the value of the item from the value log.
//
// This method must be called within a transaction. Calling it outside a
// transaction is considered undefined behavior. If an iterator is being used,
// then Item.Value() is defined in the current iteration only, because items are
// reused.
//
// If you need to use a value outside a transaction, please use Item.ValueCopy
// instead, or copy it yourself. Value might change once discard or commit is called.
// Use ValueCopy if you want to do a Set after Get.
func ( *Item) ( func( []byte) error) error {
	.wg.Wait()
	if .status == prefetched {
		if .err == nil &&  != nil {
			if  := (.val);  != nil {
				return 
			}
		}
		return .err
	}
	, ,  := .yieldItemValue()
	defer runCallback()
	if  != nil {
		return 
	}
	if  != nil {
		return ()
	}
	return nil
}

// ValueCopy returns a copy of the value of the item from the value log, writing it to dst slice.
// If nil is passed, or capacity of dst isn't sufficient, a new slice would be allocated and
// returned. Tip: It might make sense to reuse the returned slice as dst argument for the next call.
//
// This function is useful in long running iterate/update transactions to avoid a write deadlock.
// See Github issue: https://github.com/dgraph-io/badger/issues/315
func ( *Item) ( []byte) ([]byte, error) {
	.wg.Wait()
	if .status == prefetched {
		return y.SafeCopy(, .val), .err
	}
	, ,  := .yieldItemValue()
	defer runCallback()
	return y.SafeCopy(, ), 
}

func ( *Item) () bool {
	if .meta == 0 && .vptr == nil {
		// key not found
		return false
	}
	return true
}

// IsDeletedOrExpired returns true if item contains deleted or expired value.
func ( *Item) () bool {
	return isDeletedOrExpired(.meta, .expiresAt)
}

// DiscardEarlierVersions returns whether the item was created with the
// option to discard earlier versions of a key when multiple are available.
func ( *Item) () bool {
	return .meta&bitDiscardEarlierVersions > 0
}

func ( *Item) () ([]byte, func(), error) {
	 := .Key() // No need to copy.
	if !.hasValue() {
		return nil, nil, nil
	}

	if .slice == nil {
		.slice = new(y.Slice)
	}

	if (.meta & bitValuePointer) == 0 {
		 := .slice.Resize(len(.vptr))
		copy(, .vptr)
		return , nil, nil
	}

	var  valuePointer
	.Decode(.vptr)
	 := .txn.db
	, ,  := .vlog.Read(, .slice)
	if  != nil {
		.opt.Errorf("Unable to read: Key: %v, Version : %v, meta: %v, userMeta: %v"+
			" Error: %v", , .version, .meta, .userMeta, )
		var  *Txn
		if .opt.managedTxns {
			 = .NewTransactionAt(math.MaxUint64, false)
		} else {
			 = .NewTransaction(false)
		}
		defer .Discard()

		 := DefaultIteratorOptions
		.AllVersions = true
		.InternalAccess = true
		.PrefetchValues = false

		 := .NewKeyIterator(.Key(), )
		defer .Close()
		for .Rewind(); .Valid(); .Next() {
			 := .Item()
			var  valuePointer
			if .meta&bitValuePointer > 0 {
				.Decode(.vptr)
			}
			.opt.Errorf("Key: %v, Version : %v, meta: %v, userMeta: %v valuePointer: %+v",
				.Key(), .version, .meta, .userMeta, )
		}
	}
	// Don't return error if we cannot read the value. Just log the error.
	return , , nil
}

func runCallback( func()) {
	if  != nil {
		()
	}
}

func ( *Item) () {
	, ,  := .yieldItemValue()
	defer runCallback()

	.err = 
	.status = prefetched
	if  == nil {
		return
	}
	 := .slice.Resize(len())
	copy(, )
	.val = 
}

// EstimatedSize returns the approximate size of the key-value pair.
//
// This can be called while iterating through a store to quickly estimate the
// size of a range of key-value pairs (without fetching the corresponding
// values).
func ( *Item) () int64 {
	if !.hasValue() {
		return 0
	}
	if (.meta & bitValuePointer) == 0 {
		return int64(len(.key) + len(.vptr))
	}
	var  valuePointer
	.Decode(.vptr)
	return int64(.Len) // includes key length.
}

// KeySize returns the size of the key.
// Exact size of the key is key + 8 bytes of timestamp
func ( *Item) () int64 {
	return int64(len(.key))
}

// ValueSize returns the approximate size of the value.
//
// This can be called to quickly estimate the size of a value without fetching
// it.
func ( *Item) () int64 {
	if !.hasValue() {
		return 0
	}
	if (.meta & bitValuePointer) == 0 {
		return int64(len(.vptr))
	}
	var  valuePointer
	.Decode(.vptr)

	 := int64(len(.key) + 8) // 8 bytes for timestamp.
	// 6 bytes are for the approximate length of the header. Since header is encoded in varint, we
	// cannot find the exact length of header without fetching it.
	return int64(.Len) -  - 6 - crc32.Size
}

// UserMeta returns the userMeta set by the user. Typically, this byte, optionally set by the user
// is used to interpret the value.
func ( *Item) () byte {
	return .userMeta
}

// ExpiresAt returns a Unix time value indicating when the item will be
// considered expired. 0 indicates that the item will never expire.
func ( *Item) () uint64 {
	return .expiresAt
}

// TODO: Switch this to use linked list container in Go.
type list struct {
	head *Item
	tail *Item
}

func ( *list) ( *Item) {
	.next = nil
	if .tail == nil {
		.head = 
		.tail = 
		return
	}
	.tail.next = 
	.tail = 
}

func ( *list) () *Item {
	if .head == nil {
		return nil
	}
	 := .head
	if .head == .tail {
		.tail = nil
		.head = nil
	} else {
		.head = .next
	}
	.next = nil
	return 
}

// IteratorOptions is used to set options when iterating over Badger key-value
// stores.
//
// This package provides DefaultIteratorOptions which contains options that
// should work for most applications. Consider using that as a starting point
// before customizing it for your own needs.
type IteratorOptions struct {
	// PrefetchSize is the number of KV pairs to prefetch while iterating.
	// Valid only if PrefetchValues is true.
	PrefetchSize int
	// PrefetchValues Indicates whether we should prefetch values during
	// iteration and store them.
	PrefetchValues bool
	Reverse        bool // Direction of iteration. False is forward, true is backward.
	AllVersions    bool // Fetch all valid versions of the same key.
	InternalAccess bool // Used to allow internal access to badger keys.

	// The following option is used to narrow down the SSTables that iterator
	// picks up. If Prefix is specified, only tables which could have this
	// prefix are picked based on their range of keys.
	prefixIsKey bool   // If set, use the prefix for bloom filter lookup.
	Prefix      []byte // Only iterate over this given prefix.
	SinceTs     uint64 // Only read data that has version > SinceTs.
}

func ( *IteratorOptions) ( []byte) int {
	// We should compare key without timestamp. For example key - a[TS] might be > "aa" prefix.
	 = y.ParseKey()
	if len() > len(.Prefix) {
		 = [:len(.Prefix)]
	}
	return bytes.Compare(, .Prefix)
}

func ( *IteratorOptions) ( table.TableInterface) bool {
	// Ignore this table if its max version is less than the sinceTs.
	if .MaxVersion() < .SinceTs {
		return false
	}
	if len(.Prefix) == 0 {
		return true
	}
	if .compareToPrefix(.Smallest()) > 0 {
		return false
	}
	if .compareToPrefix(.Biggest()) < 0 {
		return false
	}
	// Bloom filter lookup would only work if opt.Prefix does NOT have the read
	// timestamp as part of the key.
	if .prefixIsKey && .DoesNotHave(y.Hash(.Prefix)) {
		return false
	}
	return true
}

// pickTables picks the necessary table for the iterator. This function also assumes
// that the tables are sorted in the right order.
func ( *IteratorOptions) ( []*table.Table) []*table.Table {
	 := func( []*table.Table) []*table.Table {
		if .SinceTs > 0 {
			 := [:0]
			for ,  := range  {
				if .MaxVersion() < .SinceTs {
					continue
				}
				 = append(, )
			}
			 = 
		}
		return 
	}

	if len(.Prefix) == 0 {
		 := make([]*table.Table, len())
		copy(, )
		return ()
	}
	 := sort.Search(len(), func( int) bool {
		// table.Biggest >= opt.prefix
		// if opt.Prefix < table.Biggest, then surely it is not in any of the preceding tables.
		return .compareToPrefix([].Biggest()) >= 0
	})
	if  == len() {
		// Not found.
		return []*table.Table{}
	}

	 := [:]
	if !.prefixIsKey {
		 := sort.Search(len(), func( int) bool {
			return .compareToPrefix([].Smallest()) > 0
		})
		 := make([]*table.Table, len([:]))
		copy(, [:])
		return ()
	}

	// opt.prefixIsKey == true. This code is optimizing for opt.prefixIsKey part.
	var  []*table.Table
	 := y.Hash(.Prefix)
	for ,  := range  {
		// When we encounter the first table whose smallest key is higher than opt.Prefix, we can
		// stop. This is an IMPORTANT optimization, just considering how often we call
		// NewKeyIterator.
		if .compareToPrefix(.Smallest()) > 0 {
			// if table.Smallest > opt.Prefix, then this and all tables after this can be ignored.
			break
		}
		// opt.Prefix is actually the key. So, we can run bloom filter checks
		// as well.
		if .DoesNotHave() {
			continue
		}
		 = append(, )
	}
	return ()
}

// DefaultIteratorOptions contains default options when iterating over Badger key-value stores.
var DefaultIteratorOptions = IteratorOptions{
	PrefetchValues: true,
	PrefetchSize:   100,
	Reverse:        false,
	AllVersions:    false,
}

// Iterator helps iterating over the KV pairs in a lexicographically sorted order.
type Iterator struct {
	iitr   y.Iterator
	txn    *Txn
	readTs uint64

	opt   IteratorOptions
	item  *Item
	data  list
	waste list

	lastKey []byte // Used to skip over multiple versions of the same key.

	closed  bool
	scanned int // Used to estimate the size of data scanned by iterator.

	// ThreadId is an optional value that can be set to identify which goroutine created
	// the iterator. It can be used, for example, to uniquely identify each of the
	// iterators created by the stream interface
	ThreadId int

	Alloc *z.Allocator
}

// NewIterator returns a new iterator. Depending upon the options, either only keys, or both
// key-value pairs would be fetched. The keys are returned in lexicographically sorted order.
// Using prefetch is recommended if you're doing a long running iteration, for performance.
//
// Multiple Iterators:
// For a read-only txn, multiple iterators can be running simultaneously. However, for a read-write
// txn, iterators have the nuance of being a snapshot of the writes for the transaction at the time
// iterator was created. If writes are performed after an iterator is created, then that iterator
// will not be able to see those writes. Only writes performed before an iterator was created can be
// viewed.
func ( *Txn) ( IteratorOptions) *Iterator {
	if .discarded {
		panic(ErrDiscardedTxn)
	}
	if .db.IsClosed() {
		panic(ErrDBClosed)
	}

	y.NumIteratorsCreatedAdd(.db.opt.MetricsEnabled, 1)

	// Keep track of the number of active iterators.
	.numIterators.Add(1)

	// TODO: If Prefix is set, only pick those memtables which have keys with the prefix.
	,  := .db.getMemTables()
	defer ()
	.db.vlog.incrIteratorCount()
	var  []y.Iterator
	if  := .newPendingWritesIterator(.Reverse);  != nil {
		 = append(, )
	}
	for  := 0;  < len(); ++ {
		 = append(, [].sl.NewUniIterator(.Reverse))
	}
	 = .db.lc.appendIterators(, &) // This will increment references.
	 := &Iterator{
		txn:    ,
		iitr:   table.NewMergeIterator(, .Reverse),
		opt:    ,
		readTs: .readTs,
	}
	return 
}

// NewKeyIterator is just like NewIterator, but allows the user to iterate over all versions of a
// single key. Internally, it sets the Prefix option in provided opt, and uses that prefix to
// additionally run bloom filter lookups before picking tables from the LSM tree.
func ( *Txn) ( []byte,  IteratorOptions) *Iterator {
	if len(.Prefix) > 0 {
		panic("opt.Prefix should be nil for NewKeyIterator.")
	}
	.Prefix =  // This key must be without the timestamp.
	.prefixIsKey = true
	.AllVersions = true
	return .NewIterator()
}

func ( *Iterator) () *Item {
	 := .waste.pop()
	if  == nil {
		 = &Item{slice: new(y.Slice), txn: .txn}
	}
	return 
}

// Item returns pointer to the current key-value pair.
// This item is only valid until it.Next() gets called.
func ( *Iterator) () *Item {
	 := .txn
	.addReadKey(.item.Key())
	return .item
}

// Valid returns false when iteration is done.
func ( *Iterator) () bool {
	if .item == nil {
		return false
	}
	if .opt.prefixIsKey {
		return bytes.Equal(.item.key, .opt.Prefix)
	}
	return bytes.HasPrefix(.item.key, .opt.Prefix)
}

// ValidForPrefix returns false when iteration is done
// or when the current key is not prefixed by the specified prefix.
func ( *Iterator) ( []byte) bool {
	return .Valid() && bytes.HasPrefix(.item.key, )
}

// Close would close the iterator. It is important to call this when you're done with iteration.
func ( *Iterator) () {
	if .closed {
		return
	}
	.closed = true
	if .iitr == nil {
		.txn.numIterators.Add(-1)
		return
	}

	.iitr.Close()
	// It is important to wait for the fill goroutines to finish. Otherwise, we might leave zombie
	// goroutines behind, which are waiting to acquire file read locks after DB has been closed.
	 := func( list) {
		 := .pop()
		for  != nil {
			.wg.Wait()
			 = .pop()
		}
	}
	(.waste)
	(.data)

	// TODO: We could handle this error.
	_ = .txn.db.vlog.decrIteratorCount()
	.txn.numIterators.Add(-1)
}

// Next would advance the iterator by one. Always check it.Valid() after a Next()
// to ensure you have access to a valid it.Item().
func ( *Iterator) () {
	if .iitr == nil {
		return
	}
	// Reuse current item
	.item.wg.Wait() // Just cleaner to wait before pushing to avoid doing ref counting.
	.scanned += len(.item.key) + len(.item.val) + len(.item.vptr) + 2
	.waste.push(.item)

	// Set next item to current
	.item = .data.pop()
	for .iitr.Valid() && hasPrefix() {
		if .parseItem() {
			// parseItem calls one extra next.
			// This is used to deal with the complexity of reverse iteration.
			break
		}
	}
}

func isDeletedOrExpired( byte,  uint64) bool {
	if &bitDelete > 0 {
		return true
	}
	if  == 0 {
		return false
	}
	return  <= uint64(time.Now().Unix())
}

// parseItem is a complex function because it needs to handle both forward and reverse iteration
// implementation. We store keys such that their versions are sorted in descending order. This makes
// forward iteration efficient, but revese iteration complicated. This tradeoff is better because
// forward iteration is more common than reverse. It returns true, if either the iterator is invalid
// or it has pushed an item into it.data list, else it returns false.
//
// This function advances the iterator.
func ( *Iterator) () bool {
	 := .iitr
	 := .Key()

	 := func( *Item) {
		if .item == nil {
			.item = 
		} else {
			.data.push()
		}
	}

	 := bytes.HasPrefix(, badgerPrefix)
	// Skip badger keys.
	if !.opt.InternalAccess &&  {
		.Next()
		return false
	}

	// Skip any versions which are beyond the readTs.
	 := y.ParseTs()
	// Ignore everything that is above the readTs and below or at the sinceTs.
	if  > .readTs || (.opt.SinceTs > 0 &&  <= .opt.SinceTs) {
		.Next()
		return false
	}

	// Skip banned keys only if it does not have badger internal prefix.
	if ! && .txn.db.isBanned() != nil {
		.Next()
		return false
	}

	if .opt.AllVersions {
		// Return deleted or expired values also, otherwise user can't figure out
		// whether the key was deleted.
		 := .newItem()
		.fill()
		()
		.Next()
		return true
	}

	// If iterating in forward direction, then just checking the last key against current key would
	// be sufficient.
	if !.opt.Reverse {
		if y.SameKey(.lastKey, ) {
			.Next()
			return false
		}
		// Only track in forward direction.
		// We should update lastKey as soon as we find a different key in our snapshot.
		// Consider keys: a 5, b 7 (del), b 5. When iterating, lastKey = a.
		// Then we see b 7, which is deleted. If we don't store lastKey = b, we'll then return b 5,
		// which is wrong. Therefore, update lastKey here.
		.lastKey = y.SafeCopy(.lastKey, .Key())
	}

:
	// If deleted, advance and return.
	 := .Value()
	if isDeletedOrExpired(.Meta, .ExpiresAt) {
		.Next()
		return false
	}

	 := .newItem()
	.fill()
	// fill item based on current cursor position. All Next calls have returned, so reaching here
	// means no Next was called.

	.Next()                           // Advance but no fill item yet.
	if !.opt.Reverse || !.Valid() { // Forward direction, or invalid.
		()
		return true
	}

	// Reverse direction.
	 := y.ParseTs(.Key())
	 := y.ParseKey(.Key())
	if  <= .readTs && bytes.Equal(, .key) {
		// This is a valid potential candidate.
		goto 
	}
	// Ignore the next candidate. Return the current one.
	()
	return true
}

func ( *Iterator) ( *Item) {
	 := .iitr.Value()
	.meta = .Meta
	.userMeta = .UserMeta
	.expiresAt = .ExpiresAt

	.version = y.ParseTs(.iitr.Key())
	.key = y.SafeCopy(.key, y.ParseKey(.iitr.Key()))

	.vptr = y.SafeCopy(.vptr, .Value)
	.val = nil
	if .opt.PrefetchValues {
		.wg.Add(1)
		go func() {
			// FIXME we are not handling errors here.
			.prefetchValue()
			.wg.Done()
		}()
	}
}

func hasPrefix( *Iterator) bool {
	// We shouldn't check prefix in case the iterator is going in reverse. Since in reverse we expect
	// people to append items to the end of prefix.
	if !.opt.Reverse && len(.opt.Prefix) > 0 {
		return bytes.HasPrefix(y.ParseKey(.iitr.Key()), .opt.Prefix)
	}
	return true
}

func ( *Iterator) () {
	 := 2
	if .opt.PrefetchValues && .opt.PrefetchSize > 1 {
		 = .opt.PrefetchSize
	}

	 := .iitr
	var  int
	.item = nil
	for .Valid() && hasPrefix() {
		if !.parseItem() {
			continue
		}
		++
		if  ==  {
			break
		}
	}
}

// Seek would seek to the provided key if present. If absent, it would seek to the next
// smallest key greater than the provided key if iterating in the forward direction.
// Behavior would be reversed if iterating backwards.
func ( *Iterator) ( []byte) {
	if .iitr == nil {
		return
	}
	if len() > 0 {
		.txn.addReadKey()
	}
	for  := .data.pop();  != nil;  = .data.pop() {
		.wg.Wait()
		.waste.push()
	}

	.lastKey = .lastKey[:0]
	if len() == 0 {
		 = .opt.Prefix
	}
	if len() == 0 {
		.iitr.Rewind()
		.prefetch()
		return
	}

	if !.opt.Reverse {
		 = y.KeyWithTs(, .txn.readTs)
	} else {
		 = y.KeyWithTs(, 0)
	}
	.iitr.Seek()
	.prefetch()
}

// Rewind would rewind the iterator cursor all the way to zero-th position, which would be the
// smallest key if iterating forward, and largest if iterating backward. It does not keep track of
// whether the cursor started with a Seek().
func ( *Iterator) () {
	.Seek(nil)
}