/*
 * SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc.
 * SPDX-License-Identifier: Apache-2.0
 */

package table

import (
	
	
	
	

	
	
)

// blockIterator walks the entries of a single Block. Entry headers describe
// each key as an overlap with, and a difference from, a base key, so the
// iterator carries the state needed to rebuild the full key of each entry.
type blockIterator struct {
	// data holds the bytes the entries are sliced out of.
	data []byte
	// idx is the index of the current entry inside the block.
	idx int
	// err is the status recorded by the last positioning call (io.EOF when
	// the requested index is out of range).
	err error

	// baseKey is the key that entry headers express their overlap against,
	// key is the reconstructed key of the current entry, and val is the
	// current entry's value bytes.
	baseKey, key, val []byte

	// entryOffsets holds the start offset of every entry within data.
	entryOffsets []uint32
	// block is the block currently attached to the iterator.
	block *Block

	// tableID and blockID are included in the diagnostic message produced
	// when decoding an entry panics.
	tableID uint64
	blockID int

	// prevOverlap remembers how much of the base key the previously decoded
	// key shared. When consecutive keys have the same overlap, the base key
	// prefix does not need to be copied again.
	prevOverlap uint16
}

// setBlock (name inferred from the `.bi.setBlock(...)` call sites) attaches a
// new block to the iterator and clears all per-block state: the error, the
// entry index, the cached base key, the previous overlap and the key/value
// buffers (which are truncated to length zero, keeping their capacity).
//
// NOTE(review): the receiver and parameter identifiers are missing from this
// block, so which operand each assignment reads from cannot be confirmed
// here; presumably data and entryOffsets are taken from the new block.
func ( *blockIterator) ( *Block) {
	// Decrement the ref for the old block. If the old block was compressed, we
	// might be able to reuse it.
	.block.decrRef()

	.block = 
	.err = nil
	.idx = 0
	.baseKey = .baseKey[:0]
	.prevOverlap = 0
	.key = .key[:0]
	.val = .val[:0]
	// Drop the index from the block. We don't need it anymore.
	.data = .data[:.entriesIndexStart]
	.entryOffsets = .entryOffsets
}

// setIdx moves the iterator to the entry at index i and sets its key and
// value. An index outside [0, len(entryOffsets)) records io.EOF in err and
// leaves key and val untouched. Any panic raised while decoding the entry is
// re-raised with a diagnostic message describing the table, block, entry
// index, and offsets involved.
func ( *blockIterator) ( int) {
	.idx = 
	if  >= len(.entryOffsets) ||  < 0 {
		.err = io.EOF
		return
	}
	.err = nil
	 := int(.entryOffsets[])

	// Set base key.
	// This happens lazily: only when baseKey is empty is a header decoded
	// from the start of data and the base key sliced out right after it.
	if len(.baseKey) == 0 {
		var  header
		.Decode(.data)
		.baseKey = .data[headerSize : headerSize+.diff]
	}

	var  int
	// idx points to the last entry in the block.
	if .idx+1 == len(.entryOffsets) {
		 = len(.data)
	} else {
		// idx point to some entry other than the last one in the block.
		// EndOffset of the current entry is the start offset of the next entry.
		 = int(.entryOffsets[.idx+1])
	}
	// Turn any panic from the decoding below into a panic whose message
	// carries the context needed to diagnose it.
	defer func() {
		if  := recover();  != nil {
			var  bytes.Buffer
			fmt.Fprintf(&, "==== Recovered====\n")
			fmt.Fprintf(&, "Table ID: %d\nBlock ID: %d\nEntry Idx: %d\nData len: %d\n"+
				"StartOffset: %d\nEndOffset: %d\nEntryOffsets len: %d\nEntryOffsets: %v\n",
				.tableID, .blockID, .idx, len(.data), , ,
				len(.entryOffsets), .entryOffsets)
			panic(.String())
		}
	}()

	 := .data[:]
	var  header
	.Decode()
	// Header contains the length of key overlap and difference compared to the base key. If the key
	// before this one had the same or better key overlap, we can avoid copying that part into
	// itr.key. But, if the overlap was lesser, we could copy over just that portion.
	if .overlap > .prevOverlap {
		.key = append(.key[:.prevOverlap], .baseKey[.prevOverlap:.overlap]...)
	}
	.prevOverlap = .overlap
	// The entry is laid out as header, then the key difference, then the
	// value: the key keeps its first `overlap` bytes and gets the difference
	// appended; everything after the difference is the value.
	 := headerSize + .diff
	 := [headerSize:]
	.key = append(.key[:.overlap], ...)
	.val = [:]
}

// Valid (name inferred from the `.bi.Valid()` call sites) reports whether the
// iterator is usable: the operand checked against nil (presumably the
// receiver — TODO confirm) is non-nil and no error is recorded.
func ( *blockIterator) () bool {
	return  != nil && .err == nil
}

// Error (name inferred from the `.bi.Error()` call sites) returns the error
// recorded by the last positioning call, or nil if there is none.
func ( *blockIterator) () error {
	return .err
}

// Close (name inferred from the `.bi.Close()` call site) drops the iterator's
// reference on the block it is currently attached to.
func ( *blockIterator) () {
	.block.decrRef()
}

// Seek modes understood by the seek helpers in this file: origin starts the
// search from the beginning, current starts it from the iterator's present
// position.
var origin, current = 0, 1

// seek brings us to the first block element that is >= input key.
//
// The second argument selects where the binary search may start: origin
// searches the whole block, current ignores every entry before the
// iterator's present index. If no entry qualifies, sort.Search returns
// len(entryOffsets) and the final setIdx call records io.EOF.
func ( *blockIterator) ( []byte,  int) {
	.err = nil
	 := 0 // This tells from which index we should start binary search.

	switch  {
	case origin:
		// We don't need to do anything. startIndex is already at 0
	case current:
		 = .idx
	}

	 := sort.Search(len(.entryOffsets), func( int) bool {
		// If idx is less than start index then just return false.
		if  <  {
			return false
		}
		// Each probe repositions the iterator so that key reflects the
		// probed entry before it is compared.
		.setIdx()
		return y.CompareKeys(.key, ) >= 0
	})
	// Land on the search result; the probes above may have left the iterator
	// on a different entry.
	.setIdx()
}

// seekToFirst brings us to the first element (index 0). For a block with no
// entries, setIdx records io.EOF.
func ( *blockIterator) () {
	.setIdx(0)
}

// seekToLast brings us to the last element. For a block with no entries the
// index passed is -1, so setIdx records io.EOF.
func ( *blockIterator) () {
	.setIdx(len(.entryOffsets) - 1)
}

// next (name inferred from the `.bi.next()` call site) advances to the entry
// after the current one; stepping past the last entry records io.EOF.
func ( *blockIterator) () {
	.setIdx(.idx + 1)
}

// prev (name inferred from the `.bi.prev()` call site) steps back to the
// entry before the current one; stepping before the first entry records
// io.EOF.
func ( *blockIterator) () {
	.setIdx(.idx - 1)
}

// Iterator is an iterator for a Table. It walks the table block by block,
// delegating the traversal of entries within a block to a blockIterator.
type Iterator struct {
	// t is the table being iterated.
	t    *Table
	// bpos is the index of the block the iterator is positioned in.
	bpos int
	// bi iterates over the entries of the block at bpos.
	bi   blockIterator
	// err is the error from the last operation; the iterator is valid
	// exactly when err is nil.
	err  error

	// Internally, Iterator is bidirectional. However, we only expose the
	// unidirectional functionality for now.
	opt int // Valid options are REVERSED and NOCACHE.
}

// NewIterator returns a new iterator of the Table, configured with the given
// option flags (REVERSED and/or NOCACHE). It takes a reference on the table,
// which the iterator's Close releases again.
func ( *Table) ( int) *Iterator {
	.IncrRef() // Important.
	 := &Iterator{t: , opt: }
	return 
}

// Close closes the iterator (and it must be called). It closes the embedded
// block iterator and then drops the table reference, returning whatever
// error DecrRef reports.
func ( *Iterator) () error {
	.bi.Close()
	return .t.DecrRef()
}

// reset (name inferred from the `.reset()` call site) rewinds the block
// position to 0 and clears any recorded error. It does not touch the block
// iterator.
func ( *Iterator) () {
	.bpos = 0
	.err = nil
}

// Valid follows the y.Iterator interface. It reports whether the last
// operation left the iterator without an error (running off either end of
// the table is recorded as io.EOF).
func ( *Iterator) () bool {
	return .err == nil
}

// useCache (name inferred from the `.useCache()` call sites) reports whether
// the NOCACHE option bit is unset. Its result is passed as the second
// argument to Table.block.
func ( *Iterator) () bool {
	return .opt&NOCACHE == 0
}

// seekToFirst (name inferred from the `.seekToFirst()` call site) positions
// the iterator on the first entry of the first block. A table with no block
// offsets yields io.EOF; a failure to load the block is stored in err.
func ( *Iterator) () {
	 := .t.offsetsLength()
	if  == 0 {
		.err = io.EOF
		return
	}
	.bpos = 0
	,  := .t.block(.bpos, .useCache())
	if  != nil {
		.err = 
		return
	}
	// Record which table/block is being read for the block iterator's panic
	// diagnostics, then hand the block over.
	.bi.tableID = .t.id
	.bi.blockID = .bpos
	.bi.setBlock()
	.bi.seekToFirst()
	.err = .bi.Error()
}

// seekToLast (name inferred from the `.seekToLast()` call site) positions the
// iterator on the last entry of the last block. A table with no block
// offsets yields io.EOF; a failure to load the block is stored in err.
func ( *Iterator) () {
	 := .t.offsetsLength()
	if  == 0 {
		.err = io.EOF
		return
	}
	.bpos =  - 1
	,  := .t.block(.bpos, .useCache())
	if  != nil {
		.err = 
		return
	}
	// Record which table/block is being read for the block iterator's panic
	// diagnostics, then hand the block over.
	.bi.tableID = .t.id
	.bi.blockID = .bpos
	.bi.setBlock()
	.bi.seekToLast()
	.err = .bi.Error()
}

// seekHelper (name inferred from the `.seekHelper(...)` call sites) loads the
// block at the given index and seeks within it, from the start of the block,
// to the first entry whose key is >= the given key. err ends up holding the
// load error, io.EOF if the block has no such entry, or nil on success.
func ( *Iterator) ( int,  []byte) {
	.bpos = 
	,  := .t.block(, .useCache())
	if  != nil {
		.err = 
		return
	}
	.bi.tableID = .t.id
	.bi.blockID = .bpos
	.bi.setBlock()
	.bi.seek(, origin)
	.err = .bi.Error()
}

// seekFrom brings us to a key that is >= input key.
//
// The second argument is a seek mode: origin resets the iterator first,
// current leaves it as is. In both cases the block is then found by a binary
// search over all block offsets.
func ( *Iterator) ( []byte,  int) {
	.err = nil
	switch  {
	case origin:
		.reset()
	case current:
	}

	var  fb.BlockOffset
	// Find the first block whose KeyBytes() compares strictly greater than
	// the key.
	 := sort.Search(.t.offsetsLength(), func( int) bool {
		// Offsets should never return false since we're iterating within the OffsetsLength.
		y.AssertTrue(.t.offsets(&, ))
		return y.CompareKeys(.KeyBytes(), ) > 0
	})
	if  == 0 {
		// The smallest key in our table is already strictly > key. We can return that.
		// This is like a SeekToFirst.
		.seekHelper(0, )
		return
	}

	// block[idx].smallest is > key.
	// Since idx>0, we know block[idx-1].smallest is <= key.
	// There are two cases.
	// 1) Everything in block[idx-1] is strictly < key. In this case, we should go to the first
	//    element of block[idx].
	// 2) Some element in block[idx-1] is >= key. We should go to that element.
	.seekHelper(-1, )
	if .err == io.EOF {
		// Case 1. Need to visit block[idx].
		if  == .t.offsetsLength() {
			// If idx == itr.t.offsetsLength(), then input key is greater than ANY element of table.
			// There's nothing we can do. Valid() should return false as we seek to end of table.
			return
		}
		// Since block[idx].smallest is > key. This is essentially a block[idx].SeekToFirst.
		.seekHelper(, )
	}
	// Case 2: No need to do anything. We already did the seek in block[idx-1].
}

// seek will reset iterator and seek to >= key. It is seekFrom in origin mode.
func ( *Iterator) ( []byte) {
	.seekFrom(, origin)
}

// seekForPrev will reset iterator and seek to <= key. It first performs a
// forward seek to the first key >= the input; unless that key is byte-equal
// to the input, it steps back one entry.
func ( *Iterator) ( []byte) {
	// TODO: Optimize this. We shouldn't have to take a Prev step.
	.seekFrom(, origin)
	if !bytes.Equal(.Key(), ) {
		.prev()
	}
}

// next (name inferred from the `.next()` call site) advances the iterator by
// one entry, crossing into the following block when the current one is
// exhausted. Running past the last block records io.EOF.
func ( *Iterator) () {
	.err = nil

	if .bpos >= .t.offsetsLength() {
		.err = io.EOF
		return
	}

	// Empty block data means no block is loaded for bpos yet: load it and
	// start at its first entry.
	if len(.bi.data) == 0 {
		,  := .t.block(.bpos, .useCache())
		if  != nil {
			.err = 
			return
		}
		.bi.tableID = .t.id
		.bi.blockID = .bpos
		.bi.setBlock()
		.bi.seekToFirst()
		.err = .bi.Error()
		return
	}

	.bi.next()
	if !.bi.Valid() {
		// The current block is exhausted: move to the next block, mark it as
		// not loaded and retry. NOTE(review): the call below has lost its
		// name; presumably it is a recursive call to this method — confirm.
		.bpos++
		.bi.data = nil
		.()
		return
	}
}

// prev (name inferred from the `.prev()` call sites) moves the iterator back
// by one entry, crossing into the preceding block when the current one is
// exhausted. Running before the first block records io.EOF.
func ( *Iterator) () {
	.err = nil
	if .bpos < 0 {
		.err = io.EOF
		return
	}

	// Empty block data means no block is loaded for bpos yet: load it and
	// start at its last entry.
	if len(.bi.data) == 0 {
		,  := .t.block(.bpos, .useCache())
		if  != nil {
			.err = 
			return
		}
		.bi.tableID = .t.id
		.bi.blockID = .bpos
		.bi.setBlock()
		.bi.seekToLast()
		.err = .bi.Error()
		return
	}

	.bi.prev()
	if !.bi.Valid() {
		// The current block is exhausted: move to the previous block, mark it
		// as not loaded and retry. NOTE(review): the call below has lost its
		// name; presumably it is a recursive call to this method — confirm.
		.bpos--
		.bi.data = nil
		.()
		return
	}
}

// Key follows the y.Iterator interface.
// Returns the key with timestamp.
// The returned slice is the block iterator's own key buffer, not a copy, and
// that buffer is rewritten in place when the iterator moves.
func ( *Iterator) () []byte {
	return .bi.key
}

// Value follows the y.Iterator interface. It decodes the block iterator's
// current value bytes into the named result without copying them first; use
// ValueCopy when a copy of the bytes is needed.
func ( *Iterator) () ( y.ValueStruct) {
	.Decode(.bi.val)
	return
}

// ValueCopy copies the current value and returns it as decoded
// ValueStruct. The decode runs on the copy made by y.Copy rather than on the
// block iterator's own value bytes.
func ( *Iterator) () ( y.ValueStruct) {
	 := y.Copy(.bi.val)
	.Decode()
	return
}

// Next follows the y.Iterator interface. It moves one entry in the
// iterator's direction: forward normally, backward when the REVERSED option
// is set.
func ( *Iterator) () {
	if .opt&REVERSED == 0 {
		.next()
	} else {
		.prev()
	}
}

// Rewind follows the y.Iterator interface. It moves to the start of the
// iteration order: the first entry normally, the last entry when the
// REVERSED option is set.
func ( *Iterator) () {
	if .opt&REVERSED == 0 {
		.seekToFirst()
	} else {
		.seekToLast()
	}
}

// Seek follows the y.Iterator interface. It seeks to the first key >= the
// input normally, or to a key <= the input (via seekForPrev) when the
// REVERSED option is set.
func ( *Iterator) ( []byte) {
	if .opt&REVERSED == 0 {
		.seek()
	} else {
		.seekForPrev()
	}
}

// Iterator option bits, tested with a bitwise AND against an iterator's
// options. REVERSED makes Next/Rewind/Seek run backwards; NOCACHE makes the
// iterator pass false as the cache argument when loading blocks.
var REVERSED, NOCACHE int = 2, 4

// ConcatIterator concatenates the sequences defined by several iterators.  (It only works with
// TableIterators, probably just because it's faster to not be so generic.)
// Per-table iterators are created lazily, the first time a table becomes the
// active one.
type ConcatIterator struct {
	idx     int // Which iterator is active now.
	cur     *Iterator
	iters   []*Iterator // Corresponds to tables.
	tables  []*Table    // Disregarding reversed, this is in ascending order.
	options int         // Valid options are REVERSED and NOCACHE.
}

// NewConcatIterator creates a new concatenated iterator over the given
// tables with the given option flags. It takes one reference on every table
// up front (released by Close) and leaves every per-table iterator slot nil
// until that table is first visited.
func ( []*Table,  int) *ConcatIterator {
	 := make([]*Iterator, len())
	for  := 0;  < len(); ++ {
		// Increment the reference count. Since, we're not creating the iterator right now.
		// Here, We'll hold the reference of the tables, till the lifecycle of the iterator.
		[].IncrRef()

		// Save cycles by not initializing the iterators until needed.
		// iters[i] = tbls[i].NewIterator(reversed)
	}
	return &ConcatIterator{
		options: ,
		iters:   ,
		tables:  ,
		idx:     -1, // Not really necessary because s.it.Valid()=false, but good to have.
	}
}

// setIdx (name inferred from the `.setIdx(...)` call sites) makes the table
// at the given index the active one. An out-of-range index is still stored
// in idx but clears cur, which makes the iterator invalid. The per-table
// iterator is created on first use and reused afterwards.
func ( *ConcatIterator) ( int) {
	.idx = 
	if  < 0 ||  >= len(.iters) {
		.cur = nil
		return
	}
	if .iters[] == nil {
		.iters[] = .tables[].NewIterator(.options)
	}
	.cur = .iters[.idx]
}

// Rewind implements y.Interface. It activates the first table (or the last
// one when REVERSED is set) and rewinds that table's iterator. With no
// tables it does nothing.
func ( *ConcatIterator) () {
	if len(.iters) == 0 {
		return
	}
	if .options&REVERSED == 0 {
		.setIdx(0)
	} else {
		.setIdx(len(.iters) - 1)
	}
	.cur.Rewind()
}

// Valid implements y.Interface. It reports whether there is an active table
// iterator and that iterator is itself valid.
func ( *ConcatIterator) () bool {
	return .cur != nil && .cur.Valid()
}

// Key implements y.Interface. It returns the active table iterator's key and
// does not check cur for nil, so call it only while Valid reports true.
func ( *ConcatIterator) () []byte {
	return .cur.Key()
}

// Value implements y.Interface. It returns the active table iterator's value
// and does not check cur for nil, so call it only while Valid reports true.
func ( *ConcatIterator) () y.ValueStruct {
	return .cur.Value()
}

// Seek brings us to element >= key if reversed is false. Otherwise, <= key.
//
// The table is chosen by binary search: going forward, the first table whose
// Biggest() is >= key; reversed, the last table whose Smallest() is <= key.
// If no table qualifies the iterator is made invalid.
func ( *ConcatIterator) ( []byte) {
	var  int
	if .options&REVERSED == 0 {
		 = sort.Search(len(.tables), func( int) bool {
			return y.CompareKeys(.tables[].Biggest(), ) >= 0
		})
	} else {
		// Search from the back: probe i maps to table n-1-i, and the result
		// is mapped back to a table index (-1 when nothing matches).
		 := len(.tables)
		 =  - 1 - sort.Search(, func( int) bool {
			return y.CompareKeys(.tables[-1-].Smallest(), ) <= 0
		})
	}
	if  >= len(.tables) ||  < 0 {
		.setIdx(-1)
		return
	}
	// For reversed=false, we know s.tables[i-1].Biggest() < key. Thus, the
	// previous table cannot possibly contain key.
	.setIdx()
	.cur.Seek()
}

// Next advances our concat iterator. When the active table is exhausted it
// moves on to the following table (the preceding one when REVERSED is set),
// rewinds it, and keeps going until it finds a table with a valid entry or
// runs out of tables. It does not check cur for nil before advancing, so
// call it only while Valid reports true.
func ( *ConcatIterator) () {
	.cur.Next()
	if .cur.Valid() {
		// Nothing to do. Just stay with the current table.
		return
	}
	for { // In case there are empty tables.
		if .options&REVERSED == 0 {
			.setIdx(.idx + 1)
		} else {
			.setIdx(.idx - 1)
		}
		if .cur == nil {
			// End of list. Valid will become false.
			return
		}
		.cur.Rewind()
		if .cur.Valid() {
			break
		}
	}
}

// Close implements y.Interface. It first releases the table references taken
// at construction time, then closes every per-table iterator that was
// actually created. It stops at, and returns, the first error, so tables and
// iterators after the failing one are left unreleased. Iterator close errors
// are wrapped with "ConcatIterator".
func ( *ConcatIterator) () error {
	for ,  := range .tables {
		// DeReference the tables while closing the iterator.
		if  := .DecrRef();  != nil {
			return 
		}
	}
	for ,  := range .iters {
		// Slots of tables that were never visited are still nil.
		if  == nil {
			continue
		}
		if  := .Close();  != nil {
			return y.Wrap(, "ConcatIterator")
		}
	}
	return nil
}