/*
 * SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc.
 * SPDX-License-Identifier: Apache-2.0
 */

package table

import (
	
	
	
	
	
	
	
	
	
	
	
	
	
	

	
	
	

	
	
	
	
	
	
)

// fileSuffix is the extension that table file names must carry; it is checked when a
// file id is parsed from a name and appended when a name is built from an id.
const fileSuffix = ".sst"
// intSize is the size in bytes of an int on the current platform. It is used when
// estimating the in-memory size of a Block.
const intSize = int(unsafe.Sizeof(int(0)))

// Options contains configurable options for Table/Builder.
type Options struct {
	// Options for Opening/Building Table.

	// Open tables in read only mode.
	ReadOnly       bool
	// MetricsEnabled is forwarded to the y metrics helpers (for example the bloom
	// filter counters updated by DoesNotHave).
	MetricsEnabled bool

	// Maximum size of the table.
	TableSize     uint64
	tableCapacity uint64 // 0.9x TableSize.

	// ChkMode is the checksum verification mode for Table.
	ChkMode options.ChecksumVerificationMode

	// Options for Table builder.

	// BloomFalsePositive is the false positive probability of bloom filter.
	BloomFalsePositive float64

	// BlockSize is the size of each block inside SSTable in bytes.
	// It must be non-zero when Compression is not options.None (checked in OpenTable).
	BlockSize int

	// DataKey is the key used to decrypt the encrypted text.
	// A nil DataKey means the table is treated as plain text.
	DataKey *pb.DataKey

	// Compression indicates the compression algorithm used for block compression.
	Compression options.CompressionType

	// Block cache is used to cache decompressed and decrypted blocks.
	BlockCache *ristretto.Cache[[]byte, *Block]
	// IndexCache caches table indexes keyed by the table id. It must be set when
	// DataKey is set, because the decrypted index is only kept in this cache.
	IndexCache *ristretto.Cache[uint64, *fb.TableIndex]

	// AllocPool is not referenced in this file.
	// NOTE(review): presumably consumed by the table builder — confirm.
	AllocPool *z.AllocatorPool

	// ZSTDCompressionLevel is the ZSTD compression level used for compressing blocks.
	ZSTDCompressionLevel int
}

// TableInterface is useful for testing.
// It lists the subset of *Table's methods that a test double has to provide.
type TableInterface interface {
	// Smallest returns the smallest key in the table.
	Smallest() []byte
	// Biggest returns the biggest key in the table.
	Biggest() []byte
	// DoesNotHave reports whether the table definitely does not contain the key hash.
	DoesNotHave(hash uint32) bool
	// MaxVersion returns the maximum version across all keys in the table.
	MaxVersion() uint64
}

// Table represents a loaded table file with the info we have about it.
// It embeds the memory mapping that backs the table data; for in-memory tables the
// mapping has a nil Fd and Data points at the caller-supplied bytes.
type Table struct {
	sync.Mutex
	*z.MmapFile

	tableSize int // Initialized in OpenTable, using fd.Stat().

	_index *fb.TableIndex // Nil if encryption is enabled. Use fetchIndex to access.
	_cheap *cheapIndex    // Scalar index values copied once in initIndex; see cheapIndex.
	ref    atomic.Int32 // For file garbage collection

	// The following are initialized once and const.
	smallest, biggest []byte // Smallest and largest keys (with timestamps).
	id                uint64 // file id, part of filename

	// Checksum is not assigned anywhere in this file.
	// NOTE(review): presumably populated elsewhere or unused — confirm.
	Checksum       []byte
	CreatedAt      time.Time // Set from the file's modification time in OpenTable.
	indexStart     int       // Offset of the index within the table data; set in initIndex.
	indexLen       int       // Length of the index in bytes, read from the footer in initIndex.
	hasBloomFilter bool      // True when the index carries a non-empty bloom filter.

	IsInmemory bool // Set to true if the table is on level 0 and opened in memory.
	opt        *Options
}

// cheapIndex holds scalar values copied out of the table index when the index is
// first read, so that simple accessors can answer without going through fetchIndex
// (which, for encrypted tables, may have to decrypt the index again).
type cheapIndex struct {
	MaxVersion        uint64 // Maximum version across all keys in the table.
	KeyCount          uint32 // Total number of keys in the table.
	UncompressedSize  uint32 // Size of the uncompressed data stored in the file.
	OnDiskSize        uint32 // Total size of key-values, as recorded in the index.
	BloomFilterLength int    // Length of the bloom filter in bytes.
	OffsetsLength     int    // Number of block offsets, i.e. number of blocks.
}

// cheapIndex returns the cached scalar index values stored in the _cheap field.
// NOTE(review): _cheap is only assigned while the index is initialized; callers
// presumably never reach this before that has happened — confirm.
func ( *Table) () *cheapIndex {
	return ._cheap
}
// offsetsLength returns the number of block offsets recorded for this table, taken
// from the cached cheapIndex.
func ( *Table) () int { return .cheapIndex().OffsetsLength }

// MaxVersion returns the maximum version across all keys stored in this table.
// The value is served from the cached cheapIndex.
func ( *Table) () uint64 { return .cheapIndex().MaxVersion }

// BloomFilterSize returns the size of the bloom filter in bytes stored in memory.
// The value is served from the cached cheapIndex.
func ( *Table) () int { return .cheapIndex().BloomFilterLength }

// UncompressedSize is the size of the uncompressed data stored in this file.
// The value is served from the cached cheapIndex.
func ( *Table) () uint32 { return .cheapIndex().UncompressedSize }

// KeyCount is the total number of keys in this table.
// The value is served from the cached cheapIndex.
func ( *Table) () uint32 { return .cheapIndex().KeyCount }

// OnDiskSize returns the total size of key-values stored in this table (including the
// disk space occupied on the value log).
// The value is served from the cached cheapIndex.
func ( *Table) () uint32 { return .cheapIndex().OnDiskSize }

// CompressionType returns the compression algorithm used for block compression.
// It reports the value configured in the table's Options; it is not read from the file.
func ( *Table) () options.CompressionType {
	return .opt.Compression
}

// IncrRef increments the refcount (having to do with whether the file should be deleted).
// The increment is an atomic add on the table's ref counter.
func ( *Table) () {
	.ref.Add(1)
}

// DecrRef decrements the refcount and possibly deletes the table.
// When the count reaches zero, every block of this table is removed from the block
// cache and the underlying file is deleted; an error from the deletion is returned.
// In all other cases it returns nil.
func ( *Table) () error {
	 := .ref.Add(-1)
	if  == 0 {
		// We can safely delete this file, because for all the current files, we always have
		// at least one reference pointing to them.

		// Delete all blocks from the cache.
		// NOTE(review): BlockCache is used here without the nil check that the block
		// loading path performs; presumably the cache's Del tolerates a nil receiver —
		// confirm.
		for  := 0;  < .offsetsLength(); ++ {
			.opt.BlockCache.Del(.blockCacheKey())
		}
		if  := .Delete();  != nil {
			return 
		}
	}
	return nil
}

// BlockEvictHandler is used to reuse the byte slice stored in the block on cache eviction.
// It releases one reference on the given block; the block's buffer is freed once the
// last reference is gone (see decrRef).
func ( *Block) {
	.decrRef()
}

// Block is one decoded block of a table: the (decrypted, decompressed) entry data
// plus the metadata parsed from the block trailer. Blocks are reference counted;
// use incrRef/decrRef to manage their lifetime.
type Block struct {
	offset            int      // offset of the block within the table data.
	data              []byte   // entry data followed by the entry offsets and their count.
	checksum          []byte   // serialized checksum read from the block trailer.
	entriesIndexStart int      // start index of entryOffsets list
	entryOffsets      []uint32 // used to binary search an entry in the block.
	chkLen            int      // checksum length.
	freeMe            bool     // used to determine if the block's data should be freed on release.
	ref               atomic.Int32
}

// NumBlocks counts live blocks: it is incremented when a block is constructed while
// loading from a table and decremented when a block's reference count drops to zero.
var NumBlocks atomic.Int32

// incrRef increments the ref of a block and returns a bool indicating if the
// increment was successful. A true value indicates that the block can be used.
// The increment is done with a compare-and-swap loop so that a block whose count
// has already reached zero is never revived.
func ( *Block) () bool {
	for {
		// We can't blindly add 1 to ref. We need to check whether it has
		// reached zero first, because if it did, then we should absolutely not
		// use this block.
		 := .ref.Load()
		// The ref would not be equal to 0 unless the existing
		// block got evicted before this line. If the ref is zero, it means that
		// the block has already been released and cannot be used
		// anymore. The ref of a new block is 1 so the following condition will
		// be true only if the block got released before we could increment its
		// ref.
		if  == 0 {
			return false
		}
		// Increment the ref only if it is not zero and has not changed between
		// the time we read it and we're updating it. On a lost race the loop
		// re-reads the counter and tries again.
		if .ref.CompareAndSwap(, +1) {
			return true
		}
	}
}
// decrRef releases one reference on the block. It is a no-op on a nil block. When
// the count reaches zero the data buffer is freed (only if freeMe is set) and
// NumBlocks is decremented. It asserts that the count never goes negative.
func ( *Block) () {
	if  == nil {
		return
	}

	// Free the []byte only if the block owns it (freeMe). When a block is
	// decrypted or decompressed a new []byte is allocated for the result and
	// must be released here.
	// In case of an uncompressed block, the []byte is a reference to the
	// table.mmap []byte slice. Any attempt to write data to the mmap []byte
	// will lead to SEGFAULT.
	if .ref.Add(-1) == 0 {
		if .freeMe {
			z.Free(.data)
		}
		NumBlocks.Add(-1)
		// blockPool.Put(&b.data)
	}
	y.AssertTrue(.ref.Load() >= 0)
}
// size returns an estimate of the memory used by the block in bytes: three ints for
// the scalar fields plus the capacities of data, checksum and entryOffsets (four
// bytes per offset). It is used as the cost when inserting into the block cache.
func ( *Block) () int64 {
	return int64(3*intSize /* Size of the offset, entriesIndexStart and chkLen */ +
		cap(.data) + cap(.checksum) + cap(.entryOffsets)*4)
}

// verifyCheckSum unmarshals the block's stored checksum and verifies the block data
// against it. It returns an error if the checksum cannot be unmarshalled or if the
// verification fails.
func ( *Block) () error {
	 := &pb.Checksum{}
	if  := proto.Unmarshal(.checksum, );  != nil {
		return y.Wrapf(, "unable to unmarshal checksum for block")
	}
	return y.VerifyChecksum(.data, )
}

// Creates a new table file from a finished builder and opens it as a Table.
// The file is created exclusively (O_CREATE|O_RDWR|O_EXCL), so an existing file is
// reported as an error. The builder output is copied into the mapping, synced with
// msync, and the file is then opened through OpenTable with the builder's options.
// NOTE(review): if the msync call fails, or if the "file already exists" branch is
// taken, the mapping opened above is not closed here — confirm whether that leaks.
func ( string,  *Builder) (*Table, error) {
	 := .Done()
	,  := z.OpenMmapFile(, os.O_CREATE|os.O_RDWR|os.O_EXCL, .Size)
	if  == z.NewFile {
		// Expected.
	} else if  != nil {
		return nil, y.Wrapf(, "while creating table: %s", )
	} else {
		return nil, fmt.Errorf("file already exists: %s", )
	}

	// The number of bytes copied must match the size of the mapping.
	 := .Copy(.Data)
	y.AssertTrue( == len(.Data))
	if  := z.Msync(.Data);  != nil {
		return nil, y.Wrapf(, "while calling msync on %s", )
	}
	return OpenTable(, *.opts)
}

// OpenTable assumes file has only one table and opens it. Takes ownership of fd upon function
// entry. Returns a table with one reference count on it (decrementing which may delete the file!
// -- consider t.Close() instead). The fd has to be writeable because we call Truncate on it before
// deleting. Checksum for all blocks of table is verified based on value of chkMode.
//
// It returns an error when BlockSize is zero for a compressed table, when the file
// cannot be stat'ed, when the file name does not parse as a table id, when the index
// cannot be initialized, or when checksum verification fails.
func ( *z.MmapFile,  Options) (*Table, error) {
	// BlockSize is used to compute the approximate size of the decompressed
	// block. It should not be zero if the table is compressed.
	if .BlockSize == 0 && .Compression != options.None {
		return nil, errors.New("Block size cannot be zero")
	}
	,  := .Fd.Stat()
	if  != nil {
		.Close(-1)
		return nil, y.Wrap(, "")
	}

	 := .Name()
	,  := ParseFileID()
	if ! {
		.Close(-1)
		return nil, fmt.Errorf("Invalid filename: %s", )
	}
	 := &Table{
		MmapFile:   ,
		id:         ,
		opt:        &,
		IsInmemory: false,
		tableSize:  int(.Size()),
		CreatedAt:  .ModTime(),
	}
	// Caller is given one reference.
	.ref.Store(1)

	// NOTE(review): unlike the other failure paths in this function, this one (and
	// the BlockSize check at the top) returns without closing the mapping — confirm
	// whether the file should be closed here too.
	if  := .initBiggestAndSmallest();  != nil {
		return nil, y.Wrapf(, "failed to initialize table")
	}

	if .ChkMode == options.OnTableRead || .ChkMode == options.OnTableAndBlockRead {
		if  := .VerifyChecksum();  != nil {
			.Close(-1)
			return nil, y.Wrapf(, "failed to verify checksum")
		}
	}

	return , nil
}

// OpenInMemoryTable is similar to OpenTable but it opens a new table from the provided data.
// OpenInMemoryTable is used for L0 tables.
// The data slice is used directly as the table's backing buffer (no copy is made) and
// the table has no file descriptor. The returned table carries one reference.
func ( []byte,  uint64,  *Options) (*Table, error) {
	 := &z.MmapFile{
		Data: ,
		Fd:   nil,
	}
	 := &Table{
		MmapFile:   ,
		opt:        ,
		tableSize:  len(),
		IsInmemory: true,
		id:         , // It is important that each table gets a unique ID.
	}
	// Caller is given one reference.
	.ref.Store(1)

	if  := .initBiggestAndSmallest();  != nil {
		return nil, 
	}
	return , nil
}

// initBiggestAndSmallest reads the table index and records the smallest and biggest
// keys of the table. The smallest key is the key of the first block offset returned
// by initIndex; the biggest key is found by rewinding a reversed, non-caching
// iterator. If anything panics along the way, the deferred handler re-panics with a
// diagnostic dump of the table footer attached.
func ( *Table) () error {
	// This defer will help gathering debugging info in case initIndex crashes.
	defer func() {
		if  := recover();  != nil {
			// Use defer for printing info because there may be an intermediate panic.
			var  bytes.Buffer
			defer func() {
				panic(fmt.Sprintf("%s\n== Recovered ==\n", .String()))
			}()

			// Get the count of null bytes at the end of file. This is to make sure if there was an
			// issue with mmap sync or file copy.
			 := 0
			for  := len(.Data) - 1;  >= 0; -- {
				if .Data[] != 0 {
					break
				}
				++
			}

			fmt.Fprintf(&, "\n== Recovering from initIndex crash ==\n")
			fmt.Fprintf(&, "File Info: [ID: %d, Size: %d, Zeros: %d]\n",
				.id, .tableSize, )

			fmt.Fprintf(&, "isEnrypted: %v ", .shouldDecrypt())

			// Walk the footer backwards from the end of the table, mirroring the
			// layout parsed by initIndex. These reads use readNoFail and may panic
			// again, which is why the dump is emitted from the nested defer above.
			 := .tableSize

			// Read checksum size.
			 -= 4
			 := .readNoFail(, 4)
			 := int(y.BytesToU32())
			fmt.Fprintf(&, "checksumLen: %d ", )

			// Read checksum. The unmarshal error is deliberately ignored: this is
			// best-effort diagnostics.
			 := &pb.Checksum{}
			 -= 
			 = .readNoFail(, )
			_ = proto.Unmarshal(, )
			fmt.Fprintf(&, "checksum: %+v ", )

			// Read index size from the footer.
			 -= 4
			 = .readNoFail(, 4)
			 := int(y.BytesToU32())
			fmt.Fprintf(&, "indexLen: %d ", )

			// Read index.
			 -= .indexLen
			.indexStart = 
			 := .readNoFail(, .indexLen)
			fmt.Fprintf(&, "index: %v ", )
		}
	}()

	var  error
	var  *fb.BlockOffset
	if ,  = .initIndex();  != nil {
		return y.Wrapf(, "failed to read index.")
	}

	// Copy the key so that it does not alias the table's backing buffer.
	.smallest = y.Copy(.KeyBytes())

	// A reversed iterator positioned by Rewind sits on the last (biggest) key.
	 := .NewIterator(REVERSED | NOCACHE)
	defer .Close()
	.Rewind()
	if !.Valid() {
		return y.Wrapf(.err, "failed to initialize biggest for table %s", .Filename())
	}
	.biggest = y.Copy(.Key())
	return nil
}

// read returns a slice of the table's backing data selected by the two int
// arguments, delegating to the embedded mmap file's Bytes method.
// NOTE(review): the arguments are presumably (offset, size), matching how callers
// pass a position followed by a length — confirm.
func ( *Table) (,  int) ([]byte, error) {
	return .Bytes(, )
}

// readNoFail is like read but does not return an error: any read error is passed to
// y.Check.
// NOTE(review): y.Check presumably aborts (panics/fatals) on a non-nil error —
// confirm against the y package.
func ( *Table) (,  int) []byte {
	,  := .read(, )
	y.Check()
	return 
}

// initIndex reads the index and populates the necessary table fields and returns
// the first block offset.
//
// The footer is parsed backwards from the end of the table: a 4-byte checksum
// length, the checksum itself, a 4-byte index length, and then the index. The index
// bytes are verified against the checksum before being decoded. It sets indexLen,
// indexStart, _cheap and hasBloomFilter, and sets _index only for unencrypted tables.
func ( *Table) () (*fb.BlockOffset, error) {
	 := .tableSize

	// Read checksum len from the last 4 bytes.
	 -= 4
	 := .readNoFail(, 4)
	 := int(y.BytesToU32())
	// NOTE(review): the value comes from a uint32, so on platforms with a 64-bit int
	// this condition cannot be true; it presumably only guards 32-bit builds — confirm.
	if  < 0 {
		return nil, errors.New("checksum length less than zero. Data corrupted")
	}

	// Read checksum.
	 := &pb.Checksum{}
	 -= 
	 = .readNoFail(, )
	if  := proto.Unmarshal(, );  != nil {
		return nil, 
	}

	// Read index size from the footer.
	 -= 4
	 = .readNoFail(, 4)
	.indexLen = int(y.BytesToU32())

	// Read index.
	 -= .indexLen
	.indexStart = 
	 := .readNoFail(, .indexLen)

	if  := y.VerifyChecksum(, );  != nil {
		return nil, y.Wrapf(, "failed to verify checksum for table: %s", .Filename())
	}

	,  := .readTableIndex()
	if  != nil {
		return nil, 
	}
	if !.shouldDecrypt() {
		// If there's no encryption, this points to the mmap'ed buffer.
		._index = 
	}
	// Cache the scalar index values so that accessors do not need the full index.
	._cheap = &cheapIndex{
		MaxVersion:        .MaxVersion(),
		KeyCount:          .KeyCount(),
		UncompressedSize:  .UncompressedSize(),
		OnDiskSize:        .OnDiskSize(),
		OffsetsLength:     .OffsetsLength(),
		BloomFilterLength: .BloomFilterLength(),
	}

	.hasBloomFilter = len(.BloomFilterBytes()) > 0

	// The index is asserted to contain at least one block offset.
	var  fb.BlockOffset
	y.AssertTrue(.Offsets(&, 0))
	return &, nil
}

// KeySplits splits the table into at least n ranges based on the block offsets.
// It samples block offsets at a fixed stride and returns, as strings, the keys of the
// sampled offsets that start with the given prefix. It returns nil when the int
// argument is zero. The stride is an integer division of two of the locals, clamped
// to a minimum of 1.
// NOTE(review): presumably the stride is (number of offsets / n) — confirm.
func ( *Table) ( int,  []byte) []string {
	if  == 0 {
		return nil
	}

	 := .offsetsLength()
	 :=  / 
	if  == 0 {
		 = 1
	}

	var  fb.BlockOffset
	var  []string
	for  := 0;  < ;  +=  {
		// NOTE(review): if this compares the loop index against the loop bound, the
		// loop condition already rules it out and the branch is dead — confirm.
		if  >=  {
			 =  - 1
		}
		y.AssertTrue(.offsets(&, ))
		if bytes.HasPrefix(.KeyBytes(), ) {
			 = append(, string(.KeyBytes()))
		}
	}
	return 
}

// fetchIndex returns the table index. For unencrypted tables this is the index kept
// in _index. For encrypted tables the index is looked up in the index cache and, on a
// miss, read (and decrypted) again and stored in the cache with the index length as
// its cost. It panics if the table is encrypted and no index cache is configured;
// a failure to re-read the index is passed to y.Check.
func ( *Table) () *fb.TableIndex {
	if !.shouldDecrypt() {
		return ._index
	}

	if .opt.IndexCache == nil {
		panic("Index Cache must be set for encrypted workloads")
	}
	if ,  := .opt.IndexCache.Get(.indexKey());  &&  != nil {
		return 
	}

	,  := .readTableIndex()
	y.Check()
	.opt.IndexCache.Set(.indexKey(), , int64(.indexLen))
	return 
}

// offsets fills the given BlockOffset with the entry at the given position of the
// table index and returns the result of the index's Offsets call.
// NOTE(review): the bool presumably reports whether the position was in range —
// confirm against the generated fb code.
func ( *Table) ( *fb.BlockOffset,  int) bool {
	return .fetchIndex().Offsets(, )
}

// block function returns a new block. Each block holds a ref and the byte
// slice stored in the block will be freed when the ref becomes zero. The
// caller should release the block by calling block.decrRef() on it.
//
// The int argument is the block index; it must be non-negative (asserted) and less
// than the number of block offsets, otherwise an error is returned. The bool
// argument controls whether a freshly loaded block is inserted into the block cache.
// On a cache hit the cached block is returned with an extra reference. On a miss the
// block is read from the table data, decrypted and decompressed as configured, its
// trailer (entry offsets, checksum, checksum length) is parsed, and the checksum is
// verified when ChkMode asks for verification on block read.
func ( *Table) ( int,  bool) (*Block, error) {
	y.AssertTruef( >= 0, "idx=%d", )
	if  >= .offsetsLength() {
		return nil, errors.New("block out of index")
	}
	if .opt.BlockCache != nil {
		 := .blockCacheKey()
		,  := .opt.BlockCache.Get()
		if  &&  != nil {
			// Use the block only if the increment was successful. The block
			// could get evicted from the cache between the Get() call and the
			// incrRef() call.
			if .incrRef() {
				return , nil
			}
		}
	}

	var  fb.BlockOffset
	y.AssertTrue(.offsets(&, ))
	 := &Block{offset: int(.Offset())}
	.ref.Store(1)
	// The deferred decrRef drops the initial reference. On the success path an extra
	// reference is taken before returning, so the caller still ends up holding one.
	defer .decrRef() // Deal with any errors, where blk would not be returned.
	NumBlocks.Add(1)

	// NOTE(review): the error messages below call .Fd.Name(); tables opened from
	// memory are constructed with a nil Fd, so building the message would presumably
	// dereference nil for such tables — confirm.
	var  error
	if .data,  = .read(.offset, int(.Len()));  != nil {
		return nil, y.Wrapf(,
			"failed to read from file: %s at offset: %d, len: %d",
			.Fd.Name(), .offset, .Len())
	}

	if .shouldDecrypt() {
		// Decrypt the block if it is encrypted.
		if .data,  = .decrypt(.data, true);  != nil {
			return nil, 
		}
		// blk.data is allocated via Calloc. So, do free.
		.freeMe = true
	}

	if  = .decompress();  != nil {
		return nil, y.Wrapf(,
			"failed to decode compressed data in file: %s at offset: %d, len: %d",
			.Fd.Name(), .offset, .Len())
	}

	// Read meta data related to block.
	// NOTE(review): there is no check that the data holds at least 4 bytes before
	// slicing, so a truncated block would panic here rather than return an error —
	// confirm whether that is acceptable.
	 := len(.data) - 4 // First read checksum length.
	.chkLen = int(y.BytesToU32(.data[ : +4]))

	// Checksum length greater than block size could happen if the table was compressed and
	// it was opened with an incorrect compression algorithm (or the data was corrupted).
	if .chkLen > len(.data) {
		return nil, errors.New("invalid checksum length. Either the data is " +
			"corrupted or the table options are incorrectly set")
	}

	// Read checksum and store it
	 -= .chkLen
	.checksum = .data[ : +.chkLen]
	// Move back and read numEntries in the block.
	 -= 4
	 := int(y.BytesToU32(.data[ : +4]))
	 :=  - ( * 4)
	 :=  + *4

	.entryOffsets = y.BytesToU32Slice(.data[:])

	.entriesIndexStart = 

	// Drop checksum and checksum length.
	// The checksum is calculated for actual data + entry index + index length
	.data = .data[:+4]

	// Verify checksum if checksum verification mode is OnBlockRead or OnTableAndBlockRead.
	if .opt.ChkMode == options.OnBlockRead || .opt.ChkMode == options.OnTableAndBlockRead {
		if  = .verifyCheckSum();  != nil {
			return nil, 
		}
	}

	// Take the reference that is handed to the caller; the deferred decrRef above
	// releases the initial one.
	.incrRef()
	if  && .opt.BlockCache != nil {
		 := .blockCacheKey()
		// incrRef should never return false here because we're calling it on a
		// new block with ref=1.
		y.AssertTrue(.incrRef())

		// Decrement the block ref if we could not insert it in the cache.
		if !.opt.BlockCache.Set(, , .size()) {
			.decrRef()
		}
		// We have added an OnReject func in our cache, which gets called in case the block is not
		// admitted to the cache. So, every block would be accounted for.
	}
	return , nil
}

// blockCacheKey is used to store blocks in the block cache.
// The key is 8 bytes: the table id followed by the block index, each encoded as a
// big-endian uint32. Both values are asserted to be below math.MaxUint32.
// NOTE(review): the index is converted to uint32 before it is compared, so the second
// assertion checks the truncated value — confirm that is intended.
func ( *Table) ( int) []byte {
	y.AssertTrue(.id < math.MaxUint32)
	y.AssertTrue(uint32() < math.MaxUint32)

	 := make([]byte, 8)
	// Assume t.ID does not overflow uint32.
	binary.BigEndian.PutUint32([:4], uint32(.ID()))
	binary.BigEndian.PutUint32([4:], uint32())
	return 
}

// indexKey returns the cache key for block offsets. blockOffsets
// are stored in the index cache. The key is simply the table id.
func ( *Table) () uint64 {
	return .id
}

// IndexSize is the size of table index in bytes, as read from the table footer.
func ( *Table) () int {
	return .indexLen
}

// Size is its file size in bytes. For tables opened from memory this is the length
// of the supplied data.
func ( *Table) () int64 { return int64(.tableSize) }

// StaleDataSize is the amount of stale data (that can be dropped by a compaction) in this SST.
// Unlike the cheapIndex-backed accessors, this goes through fetchIndex.
func ( *Table) () uint32 { return .fetchIndex().StaleDataSize() }

// Smallest is its smallest key, or nil if there are none.
// The returned slice is the table's own copy; it is not copied again here.
func ( *Table) () []byte { return .smallest }

// Biggest is its biggest key, or nil if there are none.
// The returned slice is the table's own copy; it is not copied again here.
func ( *Table) () []byte { return .biggest }

// Filename returns the name of the table's underlying file, as reported by its Fd.
// NOTE(review): tables opened from memory are constructed with a nil Fd; calling this
// on such a table presumably panics — confirm.
func ( *Table) () string { return .Fd.Name() }

// ID is the table's ID number (used to make the file name).
func ( *Table) () uint64 { return .id }

// DoesNotHave returns true if and only if the table does not have the key hash.
// It does a bloom filter lookup. When the table has no bloom filter it returns false,
// i.e. the key may be present. Each lookup is counted under "DoesNotHave_ALL" and each
// definite miss under "DoesNotHave_HIT".
func ( *Table) ( uint32) bool {
	if !.hasBloomFilter {
		return false
	}

	y.NumLSMBloomHitsAdd(.opt.MetricsEnabled, "DoesNotHave_ALL", 1)
	 := .fetchIndex()
	 := .BloomFilterBytes()
	 := y.Filter().MayContain()
	if ! {
		y.NumLSMBloomHitsAdd(.opt.MetricsEnabled, "DoesNotHave_HIT", 1)
	}
	return !
}

// readTableIndex reads the table index from the sst and returns it as an
// fb.TableIndex rooted at offset 0 of the index bytes. For encrypted tables the index
// bytes are decrypted into a new buffer first; a decryption failure is returned as an
// error. Reading the raw bytes uses readNoFail.
func ( *Table) () (*fb.TableIndex, error) {
	 := .readNoFail(.indexStart, .indexLen)
	var  error
	// Decrypt the table index if it is encrypted.
	if .shouldDecrypt() {
		if ,  = .decrypt(, false);  != nil {
			return nil, y.Wrapf(,
				"Error while decrypting table index for the table %d in readTableIndex", .id)
		}
	}
	return fb.GetRootAsTableIndex(, 0), nil
}

// VerifyChecksum verifies checksum for all blocks of table. This function is called by
// OpenTable() function. This function is also called inside levelsController.VerifyChecksum().
// It loads every block (with caching enabled) and returns the first error encountered,
// wrapped with the table file name and block number.
func ( *Table) () error {
	 := .fetchIndex()
	for  := 0;  < .OffsetsLength(); ++ {
		,  := .block(, true)
		// NOTE(review): every error return of the block loader visible in this file
		// returns a nil block, so reading .offset while building this message would
		// presumably dereference nil — confirm and fix.
		if  != nil {
			return y.Wrapf(, "checksum validation failed for table: %s, block: %d, offset:%d",
				.Filename(), , .offset)
		}
		// We should not call incrRef here, because the block already has one ref when created.
		// NOTE(review): this defer sits inside the loop, so the references are only
		// released when the function returns and all blocks of the table stay
		// referenced until then — confirm whether releasing per iteration is wanted.
		defer .decrRef()
		// OnBlockRead or OnTableAndBlockRead, we don't need to call verify checksum
		// on block, verification would be done while reading block itself.
		if !(.opt.ChkMode == options.OnBlockRead || .opt.ChkMode == options.OnTableAndBlockRead) {
			if  = .verifyCheckSum();  != nil {
				return y.Wrapf(,
					"checksum validation failed for table: %s, block: %d, offset:%d",
					.Filename(), , .offset)
			}
		}
	}
	return nil
}

// shouldDecrypt tells whether to decrypt or not. We decrypt only if the datakey exists
// for the table, i.e. when the table options carry a non-nil DataKey.
func ( *Table) () bool {
	return .opt.DataKey != nil
}

// KeyID returns the id of the data key configured for this table, or 0 when the
// table has no data key (plain text).
func ( *Table) () uint64 {
	if .opt.DataKey != nil {
		return .opt.DataKey.KeyId
	}
	// By default it's 0, if it is plain text.
	return 0
}

// decrypt decrypts the given data. It should be called only after checking shouldDecrypt.
// The input is laid out as ciphertext followed by an IV of aes.BlockSize bytes. The
// plaintext is written to a new buffer: one obtained from z.Calloc when the bool
// argument is true (the caller must then release it with z.Free), or a regular
// Go-allocated slice otherwise.
// NOTE(review): the input is sliced without a length check, so data shorter than
// aes.BlockSize would panic rather than return an error — confirm callers guarantee
// the length.
// NOTE(review): when the XOR step fails, a buffer obtained from z.Calloc is not
// freed before returning — confirm whether that leaks.
func ( *Table) ( []byte,  bool) ([]byte, error) {
	// Last BlockSize bytes of the data is the IV.
	 := [len()-aes.BlockSize:]
	// Rest all bytes are data.
	 = [:len()-aes.BlockSize]

	var  []byte
	if  {
		 = z.Calloc(len(), "Table.Decrypt")
	} else {
		 = make([]byte, len())
	}
	if  := y.XORBlock(, , .opt.DataKey.Data, );  != nil {
		return nil, y.Wrapf(, "while decrypt")
	}
	return , nil
}

// ParseFileID reads the file id out of a filename.
// Only the base name is considered. It returns (0, false) when the name does not end
// in fileSuffix or when the part before the suffix is not a decimal integer.
// NOTE(review): a name such as "-1.sst" parses successfully with Atoi and then trips
// the assertion below instead of returning false — confirm whether such names can
// reach this function.
func ( string) (uint64, bool) {
	 = filepath.Base()
	if !strings.HasSuffix(, fileSuffix) {
		return 0, false
	}
	//	suffix := name[len(fileSuffix):]
	 = strings.TrimSuffix(, fileSuffix)
	,  := strconv.Atoi()
	if  != nil {
		return 0, false
	}
	y.AssertTrue( >= 0)
	return uint64(), true
}

// IDToFilename does the inverse of ParseFileID: it formats the id as a decimal
// number zero-padded to at least six digits and appends fileSuffix.
func ( uint64) string {
	return fmt.Sprintf("%06d", ) + fileSuffix
}

// NewFilename should be named TableFilepath -- it combines the dir with the ID to make a table
// filepath. The file name part is produced by IDToFilename and joined to the
// directory with filepath.Join.
func ( uint64,  string) string {
	return filepath.Join(, IDToFilename())
}

// decompress decompresses the data stored in a block, replacing the block's data
// with the decompressed bytes according to the table's Compression option. With
// options.None the block is left untouched. For Snappy and ZSTD a destination buffer
// is obtained from z.Calloc; on a decode error that buffer is freed and a wrapped
// error is returned. An unknown compression type yields an error.
// After a successful decode the function settles buffer ownership: a pre-existing
// owned buffer (freeMe set on entry) is released, and freeMe ends up true only when
// the block's data is backed by the buffer allocated here.
func ( *Table) ( *Block) error {
	var  []byte
	var  error

	// Point to the original b.data
	 := .data

	switch .opt.Compression {
	case options.None:
		// Nothing to be done here.
		return nil
	case options.Snappy:
		// Size the destination from the encoded header when it can be decoded.
		if ,  := snappy.DecodedLen(.data);  == nil {
			 = z.Calloc(, "Table.Decompress")
		} else {
			 = z.Calloc(len(.data)*4, "Table.Decompress") // Take a guess.
		}
		.data,  = snappy.Decode(, .data)
		if  != nil {
			z.Free()
			return y.Wrap(, "failed to decompress")
		}
	case options.ZSTD:
		// Default to 1.2x the configured block size.
		 := int(float64(.opt.BlockSize) * 1.2)
		// Get frame content size from header. It is only trusted when present and
		// below twice the configured block size.
		var  zstd.Header
		if  := .Decode(.data);  == nil && .HasFCS && .FrameContentSize < uint64(.opt.BlockSize*2) {
			 = int(.FrameContentSize)
		}
		 = z.Calloc(, "Table.Decompress")
		.data,  = y.ZSTDDecompress(, .data)
		if  != nil {
			z.Free()
			return y.Wrap(, "failed to decompress")
		}
	default:
		return errors.New("Unsupported compression type")
	}

	// NOTE(review): presumably this frees the pre-decompression buffer saved at the
	// top of the function, which the block owned when freeMe was set — confirm.
	if .freeMe {
		z.Free()
		.freeMe = false
	}

	// Compare the first-element addresses of a local buffer and the block's data.
	// NOTE(review): presumably the decoder may return a slice that is not backed by
	// the buffer allocated above, in which case that buffer is released here and the
	// block does not own its data; otherwise the block takes ownership — confirm.
	if len(.data) > 0 && len() > 0 && &[0] != &.data[0] {
		z.Free()
	} else {
		.freeMe = true
	}
	return nil
}