/*
 * SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc.
 * SPDX-License-Identifier: Apache-2.0
 */

/*
Adapted from RocksDB inline skiplist.

Key differences:
- No optimization for sequential inserts (no "prev").
- No custom comparator.
- Support overwrites. This requires care when we see the same key when inserting.
  For RocksDB or LevelDB, overwrites are implemented as a newer sequence number in the key, so
	there is no need for values. We don't intend to support versioning. In-place updates of values
	would be more efficient.
- We discard all non-concurrent code.
- We do not support Splices. This simplifies the code a lot.
- No AllocateNode or other pointer arithmetic.
- We combine the findLessThan, findGreaterOrEqual, etc into one function.
*/

package skl

import (
	"math"
	"sync/atomic"
	"unsafe"

	// NOTE(review): the module paths for y and z were reconstructed from the
	// package qualifiers used in this file; confirm them against go.mod.
	"github.com/dgraph-io/badger/v4/y"
	"github.com/dgraph-io/ristretto/v2/z"
)

const (
	// maxHeight is the size of every node's tower array and the upper bound
	// on the height returned by randomHeight.
	maxHeight      = 20
	// heightIncrease is the threshold randomHeight compares z.FastRand()
	// against: a tower keeps growing while the random value is <= this.
	// NOTE(review): assuming FastRand is uniform over uint32, this gives
	// roughly a 1/3 chance of adding each extra level — confirm.
	heightIncrease = math.MaxUint32 / 3
)

// MaxNodeSize is the memory footprint of a node of maximum height.
const MaxNodeSize = int(unsafe.Sizeof(node{}))

// node is a single skiplist entry. It stores arena offsets for its key and
// value rather than byte slices, plus a tower of per-level links (arena
// offsets of the next node at each level).
type node struct {
	// Multiple parts of the value are encoded as a single uint64 so that it
	// can be atomically loaded and stored:
	//   value offset: uint32 (bits 0-31)
	//   value size  : uint32 (bits 32-63)
	value atomic.Uint64

	// A byte slice is 24 bytes. We are trying to save space here.
	keyOffset uint32 // Immutable. No need to lock to access key.
	keySize   uint16 // Immutable. No need to lock to access key.

	// Height of the tower.
	height uint16

	// Most nodes do not need to use the full height of the tower, since the
	// probability of each successive level decreases exponentially. Because
	// these elements are never accessed, they do not need to be allocated.
	// Therefore, when a node is allocated in the arena, its memory footprint
	// is deliberately truncated to not include unneeded tower elements.
	//
	// All accesses to elements should use CAS operations, with no need to lock.
	tower [maxHeight]atomic.Uint32
}

// Skiplist is a reference-counted skiplist whose nodes, keys and values live
// in an Arena.
type Skiplist struct {
	// Current height. 1 <= height <= maxHeight. Updated via CAS.
	height  atomic.Int32
	// head is the sentinel node, created with a nil key and maxHeight levels.
	head    *node
	// ref is the reference count; it starts at 1 and is adjusted by
	// IncrRef/DecrRef.
	ref     atomic.Int32
	// arena backs all nodes, keys and values; set to nil once the list is closed.
	arena   *Arena
	// OnClose, if non-nil, is invoked by DecrRef when the refcount drops to
	// zero (or below), before the arena and head are released.
	OnClose func()
}

// IncrRef increases the refcount
func ( *Skiplist) () {
	.ref.Add(1)
}

// DecrRef decrements the refcount, deallocating the Skiplist when done using it
func ( *Skiplist) () {
	 := .ref.Add(-1)
	if  > 0 {
		return
	}
	if .OnClose != nil {
		.OnClose()
	}

	// Indicate we are closed. Good for testing.  Also, lets GC reclaim memory. Race condition
	// here would suggest we are accessing skiplist when we are supposed to have no reference!
	.arena = nil
	// Since the head references the arena's buf, as long as the head is kept around
	// GC can't release the buf.
	.head = nil
}

func newNode( *Arena,  []byte,  y.ValueStruct,  int) *node {
	// The base level is already allocated in the node struct.
	 := .putNode()
	 := .getNode()
	.keyOffset = .putKey()
	.keySize = uint16(len())
	.height = uint16()
	.value.Store(encodeValue(.putVal(), .EncodedSize()))
	return 
}

// encodeValue packs a value's arena offset and size into one uint64 so the
// pair can be loaded and stored atomically: the offset occupies bits 0-31 and
// the size occupies bits 32-63.
func encodeValue(valOffset uint32, valSize uint32) uint64 {
	return uint64(valSize)<<32 | uint64(valOffset)
}

// decodeValue is the inverse of encodeValue: it splits the packed uint64 into
// the value's arena offset (low 32 bits) and its size (high 32 bits).
func decodeValue(value uint64) (valOffset uint32, valSize uint32) {
	valOffset = uint32(value)
	valSize = uint32(value >> 32)
	return valOffset, valSize
}

// NewSkiplist makes a new empty skiplist, with a given arena size
func ( int64) *Skiplist {
	 := newArena()
	 := newNode(, nil, y.ValueStruct{}, maxHeight)
	 := &Skiplist{head: , arena: }
	.height.Store(1)
	.ref.Store(1)
	return 
}

func ( *node) () (uint32, uint32) {
	 := .value.Load()
	return decodeValue()
}

func ( *node) ( *Arena) []byte {
	return .getKey(.keyOffset, .keySize)
}

func ( *node) ( *Arena,  y.ValueStruct) {
	 := .putVal()
	 := encodeValue(, .EncodedSize())
	.value.Store()
}

func ( *node) ( int) uint32 {
	return .tower[].Load()
}

func ( *node) ( int, ,  uint32) bool {
	return .tower[].CompareAndSwap(, )
}

// Returns true if key is strictly > n.key.
// If n is nil, this is an "end" marker and we return false.
//func (s *Skiplist) keyIsAfterNode(key []byte, n *node) bool {
//	y.AssertTrue(n != s.head)
//	return n != nil && y.CompareKeys(key, n.key) > 0
//}

func ( *Skiplist) () int {
	 := 1
	for  < maxHeight && z.FastRand() <= heightIncrease {
		++
	}
	return 
}

func ( *Skiplist) ( *node,  int) *node {
	return .arena.getNode(.getNextOffset())
}

// findNear finds the node near to key.
// If less=true, it finds rightmost node such that node.key < key (if allowEqual=false) or
// node.key <= key (if allowEqual=true).
// If less=false, it finds leftmost node such that node.key > key (if allowEqual=false) or
// node.key >= key (if allowEqual=true).
// Returns the node found. The bool returned is true if the node has key equal to given key.
func ( *Skiplist) ( []byte,  bool,  bool) (*node, bool) {
	 := .head
	 := int(.getHeight() - 1)
	for {
		// Assume x.key < key.
		 := .getNext(, )
		if  == nil {
			// x.key < key < END OF LIST
			if  > 0 {
				// Can descend further to iterate closer to the end.
				--
				continue
			}
			// Level=0. Cannot descend further. Let's return something that makes sense.
			if ! {
				return nil, false
			}
			// Try to return x. Make sure it is not a head node.
			if  == .head {
				return nil, false
			}
			return , false
		}

		 := .key(.arena)
		 := y.CompareKeys(, )
		if  > 0 {
			// x.key < next.key < key. We can continue to move right.
			 = 
			continue
		}
		if  == 0 {
			// x.key < key == next.key.
			if  {
				return , true
			}
			if ! {
				// We want >, so go to base level to grab the next bigger note.
				return .getNext(, 0), false
			}
			// We want <. If not base level, we should go closer in the next level.
			if  > 0 {
				--
				continue
			}
			// On base level. Return x.
			if  == .head {
				return nil, false
			}
			return , false
		}
		// cmp < 0. In other words, x.key < key < next.
		if  > 0 {
			--
			continue
		}
		// At base level. Need to return something.
		if ! {
			return , false
		}
		// Try to return x. Make sure it is not a head node.
		if  == .head {
			return nil, false
		}
		return , false
	}
}

// findSpliceForLevel returns (outBefore, outAfter) with outBefore.key <= key <= outAfter.key.
// The input "before" tells us where to start looking.
// If we found a node with the same key, then we return outBefore = outAfter.
// Otherwise, outBefore.key < key < outAfter.key.
func ( *Skiplist) ( []byte,  *node,  int) (*node, *node) {
	for {
		// Assume before.key < key.
		 := .getNext(, )
		if  == nil {
			return , 
		}
		 := .key(.arena)
		 := y.CompareKeys(, )
		if  == 0 {
			// Equality case.
			return , 
		}
		if  < 0 {
			// before.key < key < next.key. We are done for this level.
			return , 
		}
		 =  // Keep moving right on this level.
	}
}

func ( *Skiplist) () int32 {
	return .height.Load()
}

// Put inserts the key-value pair.
func ( *Skiplist) ( []byte,  y.ValueStruct) {
	// Since we allow overwrite, we may not need to create a new node. We might not even need to
	// increase the height. Let's defer these actions.

	 := .getHeight()
	var  [maxHeight + 1]*node
	var  [maxHeight + 1]*node
	[] = .head
	[] = nil
	for  := int() - 1;  >= 0; -- {
		// Use higher level to speed up for current level.
		[], [] = .findSpliceForLevel(, [+1], )
		if [] == [] {
			[].setValue(.arena, )
			return
		}
	}

	// We do need to create a new node.
	 := .randomHeight()
	 := newNode(.arena, , , )

	// Try to increase s.height via CAS.
	 = .getHeight()
	for  > int() {
		if .height.CompareAndSwap(, int32()) {
			// Successfully increased skiplist.height.
			break
		}
		 = .getHeight()
	}

	// We always insert from the base level and up. After you add a node in base level, we cannot
	// create a node in the level above because it would have discovered the node in the base level.
	for  := 0;  < ; ++ {
		for {
			if [] == nil {
				y.AssertTrue( > 1) // This cannot happen in base level.
				// We haven't computed prev, next for this level because height exceeds old listHeight.
				// For these levels, we expect the lists to be sparse, so we can just search from head.
				[], [] = .findSpliceForLevel(, .head, )
				// Someone adds the exact same key before we are able to do so. This can only happen on
				// the base level. But we know we are not on the base level.
				y.AssertTrue([] != [])
			}
			 := .arena.getNodeOffset([])
			.tower[].Store()
			if [].casNextOffset(, , .arena.getNodeOffset()) {
				// Managed to insert x between prev[i] and next[i]. Go to the next level.
				break
			}
			// CAS failed. We need to recompute prev and next.
			// It is unlikely to be helpful to try to use a different level as we redo the search,
			// because it is unlikely that lots of nodes are inserted between prev[i] and next[i].
			[], [] = .findSpliceForLevel(, [], )
			if [] == [] {
				y.AssertTruef( == 0, "Equality can happen only on base level: %d", )
				[].setValue(.arena, )
				return
			}
		}
	}
}

// Empty returns if the Skiplist is empty.
func ( *Skiplist) () bool {
	return .findLast() == nil
}

// findLast returns the last element. If head (empty list), we return nil. All the find functions
// will NEVER return the head nodes.
func ( *Skiplist) () *node {
	 := .head
	 := int(.getHeight()) - 1
	for {
		 := .getNext(, )
		if  != nil {
			 = 
			continue
		}
		if  == 0 {
			if  == .head {
				return nil
			}
			return 
		}
		--
	}
}

// Get gets the value associated with the key. It returns a valid value if it finds equal or earlier
// version of the same key.
func ( *Skiplist) ( []byte) y.ValueStruct {
	,  := .findNear(, false, true) // findGreaterOrEqual.
	if  == nil {
		return y.ValueStruct{}
	}

	 := .arena.getKey(.keyOffset, .keySize)
	if !y.SameKey(, ) {
		return y.ValueStruct{}
	}

	,  := .getValueOffset()
	 := .arena.getVal(, )
	.Version = y.ParseTs()
	return 
}

// NewIterator returns a skiplist iterator.  You have to Close() the iterator.
func ( *Skiplist) () *Iterator {
	.IncrRef()
	return &Iterator{list: }
}

// MemSize returns the size of the Skiplist in terms of how much memory is used within its internal
// arena.
func ( *Skiplist) () int64 { return .arena.size() }

// Iterator is an iterator over skiplist object. For new objects, you just
// need to initialize Iterator.list.
type Iterator struct {
	// list is the skiplist being iterated.
	list *Skiplist
	// n is the current position; nil means the iterator is not valid.
	n    *node
}

// Close frees the resources held by the iterator
func ( *Iterator) () error {
	.list.DecrRef()
	return nil
}

// Valid returns true iff the iterator is positioned at a valid node.
func ( *Iterator) () bool { return .n != nil }

// Key returns the key at the current position.
func ( *Iterator) () []byte {
	return .list.arena.getKey(.n.keyOffset, .n.keySize)
}

// Value returns value.
func ( *Iterator) () y.ValueStruct {
	,  := .n.getValueOffset()
	return .list.arena.getVal(, )
}

// ValueUint64 returns the uint64 value of the current node.
func ( *Iterator) () uint64 {
	return .n.value.Load()
}

// Next advances to the next position.
func ( *Iterator) () {
	y.AssertTrue(.Valid())
	.n = .list.getNext(.n, 0)
}

// Prev advances to the previous position.
func ( *Iterator) () {
	y.AssertTrue(.Valid())
	.n, _ = .list.findNear(.Key(), true, false) // find <. No equality allowed.
}

// Seek advances to the first entry with a key >= target.
func ( *Iterator) ( []byte) {
	.n, _ = .list.findNear(, false, true) // find >=.
}

// SeekForPrev finds an entry with key <= target.
func ( *Iterator) ( []byte) {
	.n, _ = .list.findNear(, true, true) // find <=.
}

// SeekToFirst seeks position at the first entry in list.
// Final state of iterator is Valid() iff list is not empty.
func ( *Iterator) () {
	.n = .list.getNext(.list.head, 0)
}

// SeekToLast seeks position at the last entry in list.
// Final state of iterator is Valid() iff list is not empty.
func ( *Iterator) () {
	.n = .list.findLast()
}

// UniIterator is a unidirectional memtable iterator. It is a thin wrapper around
// Iterator. We like to keep Iterator as before, because it is more powerful and
// we might support bidirectional iterators in the future.
type UniIterator struct {
	// iter is the wrapped bidirectional iterator.
	iter     *Iterator
	// reversed selects the direction: when true, Next/Rewind/Seek map to
	// Prev/SeekToLast/SeekForPrev on iter.
	reversed bool
}

// NewUniIterator returns a UniIterator.
func ( *Skiplist) ( bool) *UniIterator {
	return &UniIterator{
		iter:     .NewIterator(),
		reversed: ,
	}
}

// Next implements y.Interface
func ( *UniIterator) () {
	if !.reversed {
		.iter.Next()
	} else {
		.iter.Prev()
	}
}

// Rewind implements y.Interface
func ( *UniIterator) () {
	if !.reversed {
		.iter.SeekToFirst()
	} else {
		.iter.SeekToLast()
	}
}

// Seek implements y.Interface
func ( *UniIterator) ( []byte) {
	if !.reversed {
		.iter.Seek()
	} else {
		.iter.SeekForPrev()
	}
}

// Key implements y.Interface
func ( *UniIterator) () []byte { return .iter.Key() }

// Value implements y.Interface
func ( *UniIterator) () y.ValueStruct { return .iter.Value() }

// Valid implements y.Interface
func ( *UniIterator) () bool { return .iter.Valid() }

// Close implements y.Interface (and frees up the iter's resources)
func ( *UniIterator) () error { return .iter.Close() }