/*
 * SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc.
 * SPDX-License-Identifier: Apache-2.0
 */

package badger

import (
	"bufio"
	"bytes"
	"crypto/aes"
	cryptorand "crypto/rand"
	"encoding/binary"
	"fmt"
	"hash/crc32"
	"io"
	"os"
	"path/filepath"
	"sort"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"

	"github.com/dgraph-io/badger/v4/pb"
	"github.com/dgraph-io/badger/v4/skl"
	"github.com/dgraph-io/badger/v4/y"
	"github.com/dgraph-io/ristretto/v2/z"
)

// memTable pairs an in-memory skiplist with a write-ahead log (WAL). Every
// write is appended to the WAL and inserted into the skiplist, so after a
// crash the WAL can be replayed to rebuild the skiplist as it was before the
// crash. In in-memory mode no WAL is kept and wal is nil.
type memTable struct {
	// TODO: Give skiplist z.Calloc'd []byte.
	sl *skl.Skiplist

	// wal is the backing log file; nil in in-memory mode.
	wal *logFile

	// maxVersion is the largest key timestamp inserted so far.
	maxVersion uint64

	// opt is the copy of the DB options this memtable was created with.
	opt Options

	// buf is scratch space reused when encoding WAL entries.
	buf *bytes.Buffer
}

func ( *DB) ( Options) error {
	// We don't need to open any tables in in-memory mode.
	if .opt.InMemory {
		return nil
	}
	,  := os.ReadDir(.opt.Dir)
	if  != nil {
		return errFile(, .opt.Dir, "Unable to open mem dir.")
	}

	var  []int
	for ,  := range  {
		if !strings.HasSuffix(.Name(), memFileExt) {
			continue
		}
		 := len(.Name())
		,  := strconv.ParseInt(.Name()[:-len(memFileExt)], 10, 64)
		if  != nil {
			return errFile(, .Name(), "Unable to parse log id.")
		}
		 = append(, int())
	}

	// Sort in ascending order.
	sort.Slice(, func(,  int) bool {
		return [] < []
	})
	for ,  := range  {
		 := os.O_RDWR
		if .opt.ReadOnly {
			 = os.O_RDONLY
		}
		,  := .openMemTable(, )
		if  != nil {
			return y.Wrapf(, "while opening fid: %d", )
		}
		// If this memtable is empty we don't need to add it. This is a
		// memtable that was completely truncated.
		if .sl.Empty() {
			.DecrRef()
			continue
		}
		// These should no longer be written to. So, make them part of the imm.
		.imm = append(.imm, )
	}
	if len() != 0 {
		.nextMemFid = [len()-1]
	}
	.nextMemFid++
	return nil
}

// memFileExt is the filename suffix of memtable WAL files. mtFilePath appends
// it to the zero-padded file id, and openMemTables uses it to recognise and
// parse existing WAL files.
const (
	memFileExt string = ".mem"
)

func ( *DB) (,  int) (*memTable, error) {
	 := .mtFilePath()
	 := skl.NewSkiplist(arenaSize(.opt))
	 := &memTable{
		sl:  ,
		opt: .opt,
		buf: &bytes.Buffer{},
	}
	// We don't need to create the wal for the skiplist in in-memory mode so return the mt.
	if .opt.InMemory {
		return , z.NewFile
	}

	.wal = &logFile{
		fid:      uint32(),
		path:     ,
		registry: .registry,
		writeAt:  vlogHeaderSize,
		opt:      .opt,
	}
	 := .wal.open(, , 2*.opt.MemTableSize)
	if  != z.NewFile &&  != nil {
		return nil, y.Wrapf(, "While opening memtable: %s", )
	}

	// Have a callback set to delete WAL when skiplist reference count goes down to zero. That is,
	// when it gets flushed to L0.
	.OnClose = func() {
		if  := .wal.Delete();  != nil {
			.opt.Errorf("while deleting file: %s, err: %v", , )
		}
	}

	if  == z.NewFile {
		return , 
	}
	 := .UpdateSkipList()
	return , y.Wrapf(, "while updating skiplist")
}

func ( *DB) () (*memTable, error) {
	,  := .openMemTable(.nextMemFid, os.O_CREATE|os.O_RDWR)
	if  == z.NewFile {
		.nextMemFid++
		return , nil
	}

	if  != nil {
		.opt.Errorf("Got error: %v for id: %d\n", , .nextMemFid)
		return nil, y.Wrapf(, "newMemTable")
	}
	return nil, fmt.Errorf("File %s already exists", .wal.Fd.Name())
}

func ( *DB) ( int) string {
	return filepath.Join(.opt.Dir, fmt.Sprintf("%05d%s", , memFileExt))
}

func ( *memTable) () error {
	return .wal.Sync()
}

func ( *memTable) () bool {
	if .sl.MemSize() >= .opt.MemTableSize {
		return true
	}
	if .opt.InMemory {
		// InMemory mode doesn't have any WAL.
		return false
	}
	return int64(.wal.writeAt) >= .opt.MemTableSize
}

func ( *memTable) ( []byte,  y.ValueStruct) error {
	 := &Entry{
		Key:       ,
		Value:     .Value,
		UserMeta:  .UserMeta,
		meta:      .Meta,
		ExpiresAt: .ExpiresAt,
	}

	// wal is nil only when badger in running in in-memory mode and we don't need the wal.
	if .wal != nil {
		// If WAL exceeds opt.ValueLogFileSize, we'll force flush the memTable. See logic in
		// ensureRoomForWrite.
		if  := .wal.writeEntry(.buf, , .opt);  != nil {
			return y.Wrapf(, "cannot write entry to WAL file")
		}
	}
	// We insert the finish marker in the WAL but not in the memtable.
	if .meta&bitFinTxn > 0 {
		return nil
	}

	// Write to skiplist and update maxVersion encountered.
	.sl.Put(, )
	if  := y.ParseTs(.Key);  > .maxVersion {
		.maxVersion = 
	}
	y.NumBytesWrittenToL0Add(.opt.MetricsEnabled, .estimateSizeAndSetThreshold(.opt.ValueThreshold))
	return nil
}

func ( *memTable) () error {
	if .wal == nil || .sl == nil {
		return nil
	}
	,  := .wal.iterate(true, 0, .replayFunction(.opt))
	if  != nil {
		return y.Wrapf(, "while iterating wal: %s", .wal.Fd.Name())
	}
	if  < .wal.size.Load() && .opt.ReadOnly {
		return y.Wrapf(ErrTruncateNeeded, "end offset: %d < size: %d", , .wal.size.Load())
	}
	return .wal.Truncate(int64())
}

// IncrRef increases the refcount
func ( *memTable) () {
	.sl.IncrRef()
}

// DecrRef decrements the refcount, deallocating the Skiplist when done using it
func ( *memTable) () {
	.sl.DecrRef()
}

func ( *memTable) ( Options) func(Entry, valuePointer) error {
	 := true
	return func( Entry,  valuePointer) error { // Function for replaying.
		if  {
			.Debugf("First key=%q\n", .Key)
		}
		 = false
		if  := y.ParseTs(.Key);  > .maxVersion {
			.maxVersion = 
		}
		 := y.ValueStruct{
			Value:     .Value,
			Meta:      .meta,
			UserMeta:  .UserMeta,
			ExpiresAt: .ExpiresAt,
		}
		// This is already encoded correctly. Value would be either a vptr, or a full value
		// depending upon how big the original value was. Skiplist makes a copy of the key and
		// value.
		.sl.Put(.Key, )
		return nil
	}
}

// logFile is a memory-mapped log file. The embedded *z.MmapFile provides
// Data, Fd, Sync, Delete and the mapping itself. The first vlogHeaderSize
// bytes hold the data-key id and the base IV used for encryption (see
// bootstrap). In this file it backs the memtable WAL.
type logFile struct {
	*z.MmapFile

	// path is the file's location on disk.
	path string

	// lock guards the fd's value, the file's existence and the file's memory
	// map. Hold it shared while reading/writing the file or memory map; hold
	// it exclusively to open/close the descriptor, unmap or remove the file.
	lock sync.RWMutex

	// fid is the numeric id of this file.
	fid uint32

	// size is the file size tracked by this process; read bounds-checks against it.
	size atomic.Uint32

	// dataKey is the file's encryption key; nil means encryption is disabled.
	dataKey *pb.DataKey

	// baseIV is the 12-byte IV prefix that generateIV extends with an offset.
	baseIV []byte

	// registry resolves data-key ids to data keys.
	registry *KeyRegistry

	// writeAt is the offset at which the next entry is written.
	writeAt uint32

	// opt holds the DB options the file was opened with.
	opt Options
}

func ( *logFile) ( int64) error {
	if ,  := .Fd.Stat();  != nil {
		return fmt.Errorf("while file.stat on file: %s, error: %v\n", .Fd.Name(), )
	} else if .Size() ==  {
		return nil
	}
	y.AssertTrue(!.opt.ReadOnly)
	.size.Store(uint32())
	return .MmapFile.Truncate()
}

// encodeEntry will encode entry to the buf
// layout of entry
// +--------+-----+-------+-------+
// | header | key | value | crc32 |
// +--------+-----+-------+-------+
func ( *logFile) ( *bytes.Buffer,  *Entry,  uint32) (int, error) {
	 := header{
		klen:      uint32(len(.Key)),
		vlen:      uint32(len(.Value)),
		expiresAt: .ExpiresAt,
		meta:      .meta,
		userMeta:  .UserMeta,
	}

	 := crc32.New(y.CastagnoliCrcTable)
	 := io.MultiWriter(, )

	// encode header.
	var  [maxHeaderSize]byte
	 := .Encode([:])
	y.Check2(.Write([:]))
	// we'll encrypt only key and value.
	if .encryptionEnabled() {
		// TODO: no need to allocate the bytes. we can calculate the encrypted buf one by one
		// since we're using ctr mode of AES encryption. Ordering won't changed. Need some
		// refactoring in XORBlock which will work like stream cipher.
		 := make([]byte, 0, len(.Key)+len(.Value))
		 = append(, .Key...)
		 = append(, .Value...)
		if  := y.XORBlockStream(
			, , .dataKey.Data, .generateIV());  != nil {
			return 0, y.Wrapf(, "Error while encoding entry for vlog.")
		}
	} else {
		// Encryption is disabled so writing directly to the buffer.
		y.Check2(.Write(.Key))
		y.Check2(.Write(.Value))
	}
	// write crc32 hash.
	var  [crc32.Size]byte
	binary.BigEndian.PutUint32([:], .Sum32())
	y.Check2(.Write([:]))
	// return encoded length.
	return len([:]) + len(.Key) + len(.Value) + len(), nil
}

func ( *logFile) ( *bytes.Buffer,  *Entry,  Options) error {
	.Reset()
	,  := .encodeEntry(, , .writeAt)
	if  != nil {
		return 
	}
	y.AssertTrue( == copy(.Data[.writeAt:], .Bytes()))
	.writeAt += uint32()

	.zeroNextEntry()
	return nil
}

func ( *logFile) ( []byte,  uint32) (*Entry, error) {
	var  header
	 := .Decode()
	 := [:]
	if .encryptionEnabled() {
		var  error
		// No need to worry about mmap. because, XORBlock allocates a byte array to do the
		// xor. So, the given slice is not being mutated.
		if ,  = .decryptKV(, );  != nil {
			return nil, 
		}
	}
	 := &Entry{
		meta:      .meta,
		UserMeta:  .userMeta,
		ExpiresAt: .expiresAt,
		offset:    ,
		Key:       [:.klen],
		Value:     [.klen : .klen+.vlen],
	}
	return , nil
}

func ( *logFile) ( []byte,  uint32) ([]byte, error) {
	return y.XORBlockAllocate(, .dataKey.Data, .generateIV())
}

// KeyID returns datakey's ID.
func ( *logFile) () uint64 {
	if .dataKey == nil {
		// If there is no datakey, then we'll return 0. Which means no encryption.
		return 0
	}
	return .dataKey.KeyId
}

func ( *logFile) () bool {
	return .dataKey != nil
}

// Acquire lock on mmap/file if you are calling this
func ( *logFile) ( valuePointer) ( []byte,  error) {
	 := .Offset
	// Do not convert size to uint32, because the lf.Data can be of size
	// 4GB, which overflows the uint32 during conversion to make the size 0,
	// causing the read to fail with ErrEOF. See issue #585.
	 := int64(len(.Data))
	 := .Len
	 := .size.Load()
	if int64() >=  || int64(+) >  ||
		// Ensure that the read is within the file's actual size. It might be possible that
		// the offset+valsz length is beyond the file's actual size. This could happen when
		// dropAll and iterations are running simultaneously.
		int64(+) > int64() {
		 = y.ErrEOF
	} else {
		 = .Data[ : +]
	}
	return , 
}

// generateIV will generate IV by appending given offset with the base IV.
func ( *logFile) ( uint32) []byte {
	 := make([]byte, aes.BlockSize)
	// baseIV is of 12 bytes.
	y.AssertTrue(12 == copy([:12], .baseIV))
	// remaining 4 bytes is obtained from offset.
	binary.BigEndian.PutUint32([12:], )
	return 
}

func ( *logFile) ( uint32) error {
	if .opt.SyncWrites {
		if  := .Sync();  != nil {
			return y.Wrapf(, "Unable to sync value log: %q", .path)
		}
	}

	// Before we were acquiring a lock here on lf.lock, because we were invalidating the file
	// descriptor due to reopening it as read-only. Now, we don't invalidate the fd, but unmap it,
	// truncate it and remap it. That creates a window where we have segfaults because the mmap is
	// no longer valid, while someone might be reading it. Therefore, we need a lock here again.
	.lock.Lock()
	defer .lock.Unlock()

	if  := .Truncate(int64());  != nil {
		return y.Wrapf(, "Unable to truncate file: %q", .path)
	}

	// Previously we used to close the file after it was written and reopen it in read-only mode.
	// We no longer open files in read-only mode. We keep all vlog files open in read-write mode.
	return nil
}

// iterate iterates over log file. It doesn't not allocate new memory for every kv pair.
// Therefore, the kv pair is only valid for the duration of fn call.
func ( *logFile) ( bool,  uint32,  logEntry) (uint32, error) {
	if  == 0 {
		// If offset is set to zero, let's advance past the encryption key header.
		 = vlogHeaderSize
	}

	// For now, read directly from file, because it allows
	 := bufio.NewReader(.NewReader(int()))
	 := &safeRead{
		k:            make([]byte, 10),
		v:            make([]byte, 10),
		recordOffset: ,
		lf:           ,
	}

	var  uint64
	var  uint32 = 

	var  []*Entry
	var  []valuePointer

:
	for {
		,  := .Entry()
		switch {
		// We have not reached the end of the file but the entry we read is
		// zero. This happens because we have truncated the file and
		// zero'ed it out.
		case  == io.EOF:
			break 
		case  == io.ErrUnexpectedEOF ||  == errTruncate:
			break 
		case  != nil:
			return 0, 
		case  == nil:
			continue
		case .isZero():
			break 
		}

		var  valuePointer
		.Len = uint32(.hlen + len(.Key) + len(.Value) + crc32.Size)
		.recordOffset += .Len

		.Offset = .offset
		.Fid = .fid

		switch {
		case .meta&bitTxn > 0:
			 := y.ParseTs(.Key)
			if  == 0 {
				 = 
			}
			if  !=  {
				break 
			}
			 = append(, )
			 = append(, )

		case .meta&bitFinTxn > 0:
			,  := strconv.ParseUint(string(.Value), 10, 64)
			if  != nil ||  !=  {
				break 
			}
			// Got the end of txn. Now we can store them.
			 = 0
			 = .recordOffset

			for ,  := range  {
				 := []
				if  := (*, );  != nil {
					if  == errStop {
						break
					}
					return 0, errFile(, .path, "Iteration function")
				}
			}
			 = [:0]
			 = [:0]

		default:
			if  != 0 {
				// This is most likely an entry which was moved as part of GC.
				// We shouldn't get this entry in the middle of a transaction.
				break 
			}
			 = .recordOffset

			if  := (*, );  != nil {
				if  == errStop {
					break
				}
				return 0, errFile(, .path, "Iteration function")
			}
		}
	}
	return , nil
}

// Zero out the next entry to deal with any crashes.
func ( *logFile) () {
	z.ZeroOut(.Data, int(.writeAt), int(.writeAt+maxHeaderSize))
}

func ( *logFile) ( string,  int,  int64) error {
	,  := z.OpenMmapFile(, , int())
	.MmapFile = 

	if  == z.NewFile {
		if  := .bootstrap();  != nil {
			os.Remove()
			return 
		}
		.size.Store(vlogHeaderSize)

	} else if  != nil {
		return y.Wrapf(, "while opening file: %s", )
	}
	.size.Store(uint32(len(.Data)))

	if .size.Load() < vlogHeaderSize {
		// Every vlog file should have at least vlogHeaderSize. If it is less than vlogHeaderSize
		// then it must have been corrupted. But no need to handle here. log replayer will truncate
		// and bootstrap the logfile. So ignoring here.
		return nil
	}

	// Copy over the encryption registry data.
	 := make([]byte, vlogHeaderSize)

	y.AssertTruef(vlogHeaderSize == copy(, .Data),
		"Unable to copy from %s, size %d", , .size.Load())
	 := binary.BigEndian.Uint64([:8])
	// retrieve datakey.
	if ,  := .registry.DataKey();  != nil {
		return y.Wrapf(, "While opening vlog file %d", .fid)
	} else {
		.dataKey = 
	}
	.baseIV = [8:]
	y.AssertTrue(len(.baseIV) == 12)

	// Preserved ferr so we can return if this was a new file.
	return 
}

// bootstrap will initialize the log file with key id and baseIV.
// The below figure shows the layout of log file.
// +----------------+------------------+------------------+
// | keyID(8 bytes) |  baseIV(12 bytes)|	 entry...     |
// +----------------+------------------+------------------+
func ( *logFile) () error {
	var  error

	// generate data key for the log file.
	var  *pb.DataKey
	if ,  = .registry.LatestDataKey();  != nil {
		return y.Wrapf(, "Error while retrieving datakey in logFile.bootstarp")
	}
	.dataKey = 

	// We'll always preserve vlogHeaderSize for key id and baseIV.
	 := make([]byte, vlogHeaderSize)

	// write key id to the buf.
	// key id will be zero if the logfile is in plain text.
	binary.BigEndian.PutUint64([:8], .keyID())
	// generate base IV. It'll be used with offset of the vptr to encrypt the entry.
	if ,  := cryptorand.Read([8:]);  != nil {
		return y.Wrapf(, "Error while creating base IV, while creating logfile")
	}

	// Initialize base IV.
	.baseIV = [8:]
	y.AssertTrue(len(.baseIV) == 12)

	// Copy over to the logFile.
	y.AssertTrue(vlogHeaderSize == copy(.Data[0:], ))

	// Zero out the next entry.
	.zeroNextEntry()
	return nil
}