/*
 * SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc.
 * SPDX-License-Identifier: Apache-2.0
 */

package table

import (
	
	
	
	
	
	
	

	fbs 
	
	

	
	
	
	
	
)

const (
	KB = 1024
	MB = KB * 1024

	// When a block is encrypted, it's length increases. We add 256 bytes of padding to
	// handle cases when block size increases. This is an approximate number.
	padding = 256
)

type header struct {
	overlap uint16 // Overlap with base key.
	diff    uint16 // Length of the diff.
}

const headerSize = uint16(unsafe.Sizeof(header{}))

// Encode encodes the header.
func ( header) () []byte {
	var  [4]byte
	*(*header)(unsafe.Pointer(&[0])) = 
	return [:]
}

// Decode decodes the header.
func ( *header) ( []byte) {
	// Copy over data from buf into h. Using *h=unsafe.pointer(...) leads to
	// pointer alignment issues. See https://github.com/dgraph-io/badger/issues/1096
	// and comment https://github.com/dgraph-io/badger/pull/1097#pullrequestreview-307361714
	copy(((*[headerSize]byte)(unsafe.Pointer())[:]), [:headerSize])
}

// bblock represents a block that is being compressed/encrypted in the background.
type bblock struct {
	data         []byte
	baseKey      []byte   // Base key for the current block.
	entryOffsets []uint32 // Offsets of entries present in current block.
	end          int      // Points to the end offset of the block.
}

// Builder is used in building a table.
type Builder struct {
	// Typically tens or hundreds of meg. This is for one single file.
	alloc            *z.Allocator
	curBlock         *bblock
	compressedSize   atomic.Uint32
	uncompressedSize atomic.Uint32

	lenOffsets    uint32
	keyHashes     []uint32 // Used for building the bloomfilter.
	opts          *Options
	maxVersion    uint64
	onDiskSize    uint32
	staleDataSize int

	// Used to concurrently compress/encrypt blocks.
	wg        sync.WaitGroup
	blockChan chan *bblock
	blockList []*bblock
}

func ( *Builder) ( int) []byte {
	 := .curBlock
	if len(.data[.end:]) <  {
		// We need to reallocate. 1GB is the max size that the allocator can allocate.
		// While reallocating, if doubling exceeds that limit, then put the upper bound on it.
		 := 2 * len(.data)
		if  > (1 << 30) {
			 = 1 << 30
		}
		if .end+ >  {
			 = .end + 
		}
		 := .alloc.Allocate()
		copy(, .data)
		.data = 
	}
	.end += 
	return .data[.end- : .end]
}

// append appends to curBlock.data
func ( *Builder) ( []byte) {
	 := .allocate(len())
	y.AssertTrue(len() == copy(, ))
}

const maxAllocatorInitialSz = 256 << 20

// NewTableBuilder makes a new TableBuilder.
func ( Options) *Builder {
	 := 2 * int(.TableSize)
	if  > maxAllocatorInitialSz {
		 = maxAllocatorInitialSz
	}
	 := &Builder{
		alloc: .AllocPool.Get(, "TableBuilder"),
		opts:  &,
	}
	.alloc.Tag = "Builder"
	.curBlock = &bblock{
		data: .alloc.Allocate(.BlockSize + padding),
	}
	.opts.tableCapacity = uint64(float64(.opts.TableSize) * 0.95)

	// If encryption or compression is not enabled, do not start compression/encryption goroutines
	// and write directly to the buffer.
	if .opts.Compression == options.None && .opts.DataKey == nil {
		return 
	}

	 := 2 * runtime.NumCPU()
	.blockChan = make(chan *bblock, *2)

	.wg.Add()
	for  := 0;  < ; ++ {
		go .handleBlock()
	}
	return 
}

func maxEncodedLen( options.CompressionType,  int) int {
	switch  {
	case options.Snappy:
		return s2.MaxEncodedLen()
	case options.ZSTD:
		return y.ZSTDCompressBound()
	}
	return 
}

func ( *Builder) () {
	defer .wg.Done()

	 := .opts.Compression != options.None
	for  := range .blockChan {
		// Extract the block.
		 := .data[:.end]
		// Compress the block.
		if  {
			,  := .compressData()
			y.Check()
			 = 
		}
		if .shouldEncrypt() {
			,  := .encrypt()
			y.Check(y.Wrapf(, "Error while encrypting block in table builder."))
			 = 
		}

		// BlockBuf should always less than or equal to allocated space. If the blockBuf is greater
		// than allocated space that means the data from this block cannot be stored in its
		// existing location.
		 := maxEncodedLen(.opts.Compression, (.end)) + padding + 1
		y.AssertTrue(len() <= )

		// blockBuf was allocated on allocator. So, we don't need to copy it over.
		.data = 
		.end = len()
		.compressedSize.Add(uint32(len()))
	}
}

// Close closes the TableBuilder.
func ( *Builder) () {
	.opts.AllocPool.Return(.alloc)
}

// Empty returns whether it's empty.
func ( *Builder) () bool { return len(.keyHashes) == 0 }

// keyDiff returns a suffix of newKey that is different from b.baseKey.
func ( *Builder) ( []byte) []byte {
	var  int
	for  = 0;  < len() &&  < len(.curBlock.baseKey); ++ {
		if [] != .curBlock.baseKey[] {
			break
		}
	}
	return [:]
}

func ( *Builder) ( []byte,  y.ValueStruct,  uint32) {
	.keyHashes = append(.keyHashes, y.Hash(y.ParseKey()))

	if  := y.ParseTs();  > .maxVersion {
		.maxVersion = 
	}

	// diffKey stores the difference of key with baseKey.
	var  []byte
	if len(.curBlock.baseKey) == 0 {
		// Make a copy. Builder should not keep references. Otherwise, caller has to be very careful
		// and will have to make copies of keys every time they add to builder, which is even worse.
		.curBlock.baseKey = append(.curBlock.baseKey[:0], ...)
		 = 
	} else {
		 = .keyDiff()
	}

	y.AssertTrue(len()-len() <= math.MaxUint16)
	y.AssertTrue(len() <= math.MaxUint16)

	 := header{
		overlap: uint16(len() - len()),
		diff:    uint16(len()),
	}

	// store current entry's offset
	.curBlock.entryOffsets = append(.curBlock.entryOffsets, uint32(.curBlock.end))

	// Layout: header, diffKey, value.
	.append(.Encode())
	.append()

	 := .allocate(int(.EncodedSize()))
	.Encode()

	// Add the vpLen to the onDisk size. We'll add the size of the block to
	// onDisk size in Finish() function.
	.onDiskSize += 
}

/*
Structure of Block.
+-------------------+---------------------+--------------------+--------------+------------------+
| Entry1            | Entry2              | Entry3             | Entry4       | Entry5           |
+-------------------+---------------------+--------------------+--------------+------------------+
| Entry6            | ...                 | ...                | ...          | EntryN           |
+-------------------+---------------------+--------------------+--------------+------------------+
| Block Meta(contains list of offsets used| Block Meta Size    | Block        | Checksum Size    |
| to perform binary search in the block)  | (4 Bytes)          | Checksum     | (4 Bytes)        |
+-----------------------------------------+--------------------+--------------+------------------+
*/
// In case the data is encrypted, the "IV" is added to the end of the block.
func ( *Builder) () {
	if len(.curBlock.entryOffsets) == 0 {
		return
	}
	// Append the entryOffsets and its length.
	.append(y.U32SliceToBytes(.curBlock.entryOffsets))
	.append(y.U32ToBytes(uint32(len(.curBlock.entryOffsets))))

	 := .calculateChecksum(.curBlock.data[:.curBlock.end])

	// Append the block checksum and its length.
	.append()
	.append(y.U32ToBytes(uint32(len())))

	.blockList = append(.blockList, .curBlock)
	.uncompressedSize.Add(uint32(.curBlock.end))

	// Add length of baseKey (rounded to next multiple of 4 because of alignment).
	// Add another 40 Bytes, these additional 40 bytes consists of
	// 12 bytes of metadata of flatbuffer
	// 8 bytes for Key in flat buffer
	// 8 bytes for offset
	// 8 bytes for the len
	// 4 bytes for the size of slice while SliceAllocate
	.lenOffsets += uint32(int(math.Ceil(float64(len(.curBlock.baseKey))/4))*4) + 40

	// If compression/encryption is enabled, we need to send the block to the blockChan.
	if .blockChan != nil {
		.blockChan <- .curBlock
	}
}

func ( *Builder) ( []byte,  y.ValueStruct) bool {
	// If there is no entry till now, we will return false.
	if len(.curBlock.entryOffsets) <= 0 {
		return false
	}

	// Integer overflow check for statements below.
	y.AssertTrue((uint32(len(.curBlock.entryOffsets))+1)*4+4+8+4 < math.MaxUint32)
	// We should include current entry also in size, that's why +1 to len(b.entryOffsets).
	 := uint32((len(.curBlock.entryOffsets)+1)*4 +
		4 + // size of list
		8 + // Sum64 in checksum proto
		4) // checksum length
	 := uint32(.curBlock.end) + uint32(6 /*header size for entry*/) +
		uint32(len()) + .EncodedSize() + 

	if .shouldEncrypt() {
		// IV is added at the end of the block, while encrypting.
		// So, size of IV is added to estimatedSize.
		 += aes.BlockSize
	}

	// Integer overflow check for table size.
	y.AssertTrue(uint64(.curBlock.end)+uint64() < math.MaxUint32)

	return  > uint32(.opts.BlockSize)
}

// AddStaleKey is same is Add function but it also increments the internal
// staleDataSize counter. This value will be used to prioritize this table for
// compaction.
func ( *Builder) ( []byte,  y.ValueStruct,  uint32) {
	// Rough estimate based on how much space it will occupy in the SST.
	.staleDataSize += len() + len(.Value) + 4 /* entry offset */ + 4 /* header size */
	.addInternal(, , , true)
}

// Add adds a key-value pair to the block.
func ( *Builder) ( []byte,  y.ValueStruct,  uint32) {
	.addInternal(, , , false)
}

func ( *Builder) ( []byte,  y.ValueStruct,  uint32,  bool) {
	if .shouldFinishBlock(, ) {
		if  {
			// This key will be added to tableIndex and it is stale.
			.staleDataSize += len() + 4 /* len */ + 4 /* offset */
		}
		.finishBlock()
		// Create a new block and start writing.
		.curBlock = &bblock{
			data: .alloc.Allocate(.opts.BlockSize + padding),
		}
	}
	.addHelper(, , )
}

// TODO: vvv this was the comment on ReachedCapacity.
// FinalSize returns the *rough* final size of the array, counting the header which is
// not yet written.
// TODO: Look into why there is a discrepancy. I suspect it is because of Write(empty, empty)
// at the end. The diff can vary.

// ReachedCapacity returns true if we... roughly (?) reached capacity?
func ( *Builder) () bool {
	// If encryption/compression is enabled then use the compressed size.
	 := .compressedSize.Load()
	if .opts.Compression == options.None && .opts.DataKey == nil {
		 = .uncompressedSize.Load()
	}
	 :=  + // actual length of current buffer
		uint32(len(.curBlock.entryOffsets)*4) + // all entry offsets size
		4 + // count of all entry offsets
		8 + // checksum bytes
		4 // checksum length

	 :=  +
		4 + // Index length
		.lenOffsets

	return uint64() > .opts.tableCapacity
}

// Finish finishes the table by appending the index.
/*
The table structure looks like
+---------+------------+-----------+---------------+
| Block 1 | Block 2    | Block 3   | Block 4       |
+---------+------------+-----------+---------------+
| Block 5 | Block 6    | Block ... | Block N       |
+---------+------------+-----------+---------------+
| Index   | Index Size | Checksum  | Checksum Size |
+---------+------------+-----------+---------------+
*/
// In case the data is encrypted, the "IV" is added to the end of the index.
func ( *Builder) () []byte {
	 := .Done()
	 := make([]byte, .Size)
	 := .Copy()
	y.AssertTrue( == len())
	return 
}

type buildData struct {
	blockList []*bblock
	index     []byte
	checksum  []byte
	Size      int
	alloc     *z.Allocator
}

func ( *buildData) ( []byte) int {
	var  int
	for ,  := range .blockList {
		 += copy([:], .data[:.end])
	}
	 += copy([:], .index)
	 += copy([:], y.U32ToBytes(uint32(len(.index))))

	 += copy([:], .checksum)
	 += copy([:], y.U32ToBytes(uint32(len(.checksum))))
	return 
}

func ( *Builder) () buildData {
	.finishBlock() // This will never start a new block.
	if .blockChan != nil {
		close(.blockChan)
	}
	// Wait for block handler to finish.
	.wg.Wait()

	if len(.blockList) == 0 {
		return buildData{}
	}
	 := buildData{
		blockList: .blockList,
		alloc:     .alloc,
	}

	var  y.Filter
	if .opts.BloomFalsePositive > 0 {
		 := y.BloomBitsPerKey(len(.keyHashes), .opts.BloomFalsePositive)
		 = y.NewFilter(.keyHashes, )
	}
	,  := .buildIndex()

	var  error
	if .shouldEncrypt() {
		,  = .encrypt()
		y.Check()
	}
	 := .calculateChecksum()

	.index = 
	.checksum = 
	.Size = int() + len() + len() + 4 + 4
	return 
}

func ( *Builder) ( []byte) []byte {
	// Build checksum for the index.
	 := pb.Checksum{
		// TODO: The checksum type should be configurable from the
		// options.
		// We chose to use CRC32 as the default option because
		// it performed better compared to xxHash64.
		// See the BenchmarkChecksum in table_test.go file
		// Size     =>   1024 B        2048 B
		// CRC32    => 63.7 ns/op     112 ns/op
		// xxHash64 => 87.5 ns/op     158 ns/op
		Sum:  y.CalculateChecksum(, pb.Checksum_CRC32C),
		Algo: pb.Checksum_CRC32C,
	}

	// Write checksum to the file.
	,  := proto.Marshal(&)
	y.Check()
	// Write checksum size.
	return 
}

// DataKey returns datakey of the builder.
func ( *Builder) () *pb.DataKey {
	return .opts.DataKey
}

func ( *Builder) () *Options {
	return .opts
}

// encrypt will encrypt the given data and appends IV to the end of the encrypted data.
// This should be only called only after checking shouldEncrypt method.
func ( *Builder) ( []byte) ([]byte, error) {
	,  := y.GenerateIV()
	if  != nil {
		return , y.Wrapf(, "Error while generating IV in Builder.encrypt")
	}
	 := len() + len()
	 := .alloc.Allocate()

	if  = y.XORBlock([:len()], , .DataKey().Data, );  != nil {
		return , y.Wrapf(, "Error while encrypting in Builder.encrypt")
	}

	y.AssertTrue(len() == copy([len():], ))
	return , nil
}

// shouldEncrypt tells us whether to encrypt the data or not.
// We encrypt only if the data key exist. Otherwise, not.
func ( *Builder) () bool {
	return .opts.DataKey != nil
}

// compressData compresses the given data.
func ( *Builder) ( []byte) ([]byte, error) {
	switch .opts.Compression {
	case options.None:
		return , nil
	case options.Snappy:
		 := s2.MaxEncodedLen(len())
		 := .alloc.Allocate()
		return s2.EncodeSnappy(, ), nil
	case options.ZSTD:
		 := y.ZSTDCompressBound(len())
		 := .alloc.Allocate()
		return y.ZSTDCompress(, , .opts.ZSTDCompressionLevel)
	}
	return nil, errors.New("Unsupported compression type")
}

func ( *Builder) ( []byte) ([]byte, uint32) {
	 := fbs.NewBuilder(3 << 20)

	,  := .writeBlockOffsets()
	// Write block offset vector the the idxBuilder.
	fb.TableIndexStartOffsetsVector(, len())

	// Write individual block offsets in reverse order to work around how Flatbuffers expects it.
	for  := len() - 1;  >= 0; -- {
		.PrependUOffsetT([])
	}
	 := .EndVector(len())

	var  fbs.UOffsetT
	// Write the bloom filter.
	if len() > 0 {
		 = .CreateByteVector()
	}
	.onDiskSize += 
	fb.TableIndexStart()
	fb.TableIndexAddOffsets(, )
	fb.TableIndexAddBloomFilter(, )
	fb.TableIndexAddMaxVersion(, .maxVersion)
	fb.TableIndexAddUncompressedSize(, .uncompressedSize.Load())
	fb.TableIndexAddKeyCount(, uint32(len(.keyHashes)))
	fb.TableIndexAddOnDiskSize(, .onDiskSize)
	fb.TableIndexAddStaleDataSize(, uint32(.staleDataSize))
	.Finish(fb.TableIndexEnd())

	 := .FinishedBytes()
	 := fb.GetRootAsTableIndex(, 0)
	// Mutate the ondisk size to include the size of the index as well.
	y.AssertTrue(.MutateOnDiskSize(.OnDiskSize() + uint32(len())))
	return , 
}

// writeBlockOffsets writes all the blockOffets in b.offsets and returns the
// offsets for the newly written items.
func ( *Builder) ( *fbs.Builder) ([]fbs.UOffsetT, uint32) {
	var  uint32
	var  []fbs.UOffsetT
	for ,  := range .blockList {
		 := .writeBlockOffset(, , )
		 = append(, )
		 += uint32(.end)
	}
	return , 
}

// writeBlockOffset writes the given key,offset,len triple to the indexBuilder.
// It returns the offset of the newly written blockoffset.
func ( *Builder) (
	 *fbs.Builder,  *bblock,  uint32) fbs.UOffsetT {
	// Write the key to the buffer.
	 := .CreateByteVector(.baseKey)

	// Build the blockOffset.
	fb.BlockOffsetStart()
	fb.BlockOffsetAddKey(, )
	fb.BlockOffsetAddOffset(, )
	fb.BlockOffsetAddLen(, uint32(.end))
	return fb.BlockOffsetEnd()
}