/*
 * SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc.
 * SPDX-License-Identifier: Apache-2.0
 */

package badger

import (
	
	
	
	
	
	
	
	
	
	

	
	
)

// oracle holds the timestamp bookkeeping shared by the transactions of a DB:
// the next timestamp to hand out, the watermarks tracking in-flight reads and
// commits, and the recently committed transactions used for conflict checks.
type oracle struct {
	isManaged       bool // Does not change value, so no locking required.
	detectConflicts bool // Determines if the txns should be checked for conflicts.

	sync.Mutex // For nextTxnTs and commits.
	// writeChLock ensures that transactions go to the write channel in the
	// same order as their commit timestamps.
	writeChLock sync.Mutex
	nextTxnTs   uint64

	// Used to block NewTransaction, so all previous commits are visible to a new read.
	txnMark *y.WaterMark

	// Either of these is used to determine which versions can be permanently
	// discarded during compaction.
	discardTs uint64       // Used by ManagedDB.
	readMark  *y.WaterMark // Used by DB.

	// committedTxns contains all committed writes (contains fingerprints
	// of keys written and their latest commit counter).
	committedTxns []committedTxn
	// lastCleanupTs is the cut-off used by the most recent pruning of
	// committedTxns; commit timestamps are asserted to never fall below it.
	lastCleanupTs uint64

	// closer is used to stop watermarks.
	closer *z.Closer
}

// committedTxn records one committed transaction for conflict detection: its
// commit timestamp and the fingerprints of the keys it wrote.
type committedTxn struct {
	ts uint64
	// conflictKeys keeps track of the entries written at timestamp ts.
	conflictKeys map[uint64]struct{}
}

// newOracle builds an oracle from the given options. It copies the
// managed-transaction and conflict-detection flags, creates the read and txn
// watermarks, and initializes both watermarks with a shared closer created
// via z.NewCloser(2) (presumably one slot per watermark — confirm).
func newOracle( Options) *oracle {
	 := &oracle{
		isManaged:       .managedTxns,
		detectConflicts: .DetectConflicts,
		// We're not initializing nextTxnTs and readOnlyTs. It would be done after replay in Open.
		//
		// WaterMarks must be 64-bit aligned for atomic package, hence we must use pointers here.
		// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG.
		readMark: &y.WaterMark{Name: "badger.PendingReads"},
		txnMark:  &y.WaterMark{Name: "badger.TxnTimestamp"},
		closer:   z.NewCloser(2),
	}
	.readMark.Init(.closer)
	.txnMark.Init(.closer)
	return 
}

// Shuts the oracle down by calling SignalAndWait on its closer, which (per
// the closer field's comment) is what stops the watermarks.
func ( *oracle) () {
	.closer.SignalAndWait()
}

// Returns the timestamp a new (non-managed) transaction should read at, which
// is nextTxnTs - 1. The timestamp is picked and registered with readMark under
// the oracle lock; the call then blocks on txnMark until commits up to that
// timestamp are done. Panics when the oracle is in managed mode.
func ( *oracle) () uint64 {
	if .isManaged {
		panic("ReadTs should not be retrieved for managed DB")
	}

	var  uint64
	.Lock()
	 = .nextTxnTs - 1
	.readMark.Begin()
	.Unlock()

	// Wait for all txns which have no conflicts, have been assigned a commit
	// timestamp and are going through the write to value log and LSM tree
	// process. Not waiting here could mean that some txns which have been
	// committed would not be read.
	// Any error from WaitForMark is handed to y.Check.
	y.Check(.txnMark.WaitForMark(context.Background(), ))
	return 
}

// Returns the current value of nextTxnTs, read under the oracle lock.
func ( *oracle) () uint64 {
	.Lock()
	defer .Unlock()
	return .nextTxnTs
}

// Advances nextTxnTs by one while holding the oracle lock.
func ( *oracle) () {
	.Lock()
	defer .Unlock()
	.nextTxnTs++
}

// Any deleted or invalid versions at or below ts would be discarded during
// compaction to reclaim disk space in LSM tree and thence value log.
//
// The new discard timestamp is stored under the oracle lock, and the
// committed-transaction list is pruned in the same critical section.
func ( *oracle) ( uint64) {
	.Lock()
	defer .Unlock()
	.discardTs = 
	.cleanupCommittedTransactions()
}

// Returns the timestamp at or below which versions may be discarded. In
// managed mode this is discardTs (read under the oracle lock); otherwise it is
// the point up to which readMark reports all reads as done.
func ( *oracle) () uint64 {
	if .isManaged {
		.Lock()
		defer .Unlock()
		return .discardTs
	}
	return .readMark.DoneUntil()
}

// hasConflict must be called while having a lock.
//
// It reports whether any key fingerprint recorded in the transaction's reads
// appears in the conflictKeys of a transaction that committed after the
// reader's readTs. A transaction with no recorded reads never conflicts.
func ( *oracle) ( *Txn) bool {
	if len(.reads) == 0 {
		return false
	}
	for ,  := range .committedTxns {
		// If the committedTxn.ts is less than txn.readTs that implies that the
		// committedTxn finished before the current transaction started.
		// We don't need to check for conflict in that case.
		// This change assumes linearizability. Lack of linearizability could
		// cause the read ts of a new txn to be lower than the commit ts of
		// a txn before it (@mrjn).
		if .ts <= .readTs {
			continue
		}

		// A single overlapping fingerprint is enough to declare a conflict.
		for ,  := range .reads {
			if ,  := .conflictKeys[];  {
				return true
			}
		}
	}

	return false
}

// Assigns a commit timestamp to the given transaction while holding the
// oracle lock. The second result is true when the transaction conflicts with
// an already committed one; the timestamp is then 0 and nothing else happens.
//
// In non-managed mode the transaction's read is marked done, stale committed
// transactions are pruned, and the timestamp is taken from nextTxnTs (which is
// then incremented) and registered with txnMark. In managed mode the
// transaction's own commitTs is used instead.
func ( *oracle) ( *Txn) (uint64, bool) {
	.Lock()
	defer .Unlock()

	if .hasConflict() {
		return 0, true
	}

	var  uint64
	if !.isManaged {
		.doneRead()
		.cleanupCommittedTransactions()

		// This is the general case, when user doesn't specify the read and commit ts.
		 = .nextTxnTs
		.nextTxnTs++
		.txnMark.Begin()

	} else {
		// If commitTs is set, use it instead.
		 = .commitTs
	}

	// The commit timestamp must not fall below the last cleanup cut-off.
	y.AssertTrue( >= .lastCleanupTs)

	if .detectConflicts {
		// We should ensure that txns are not added to o.committedTxns slice when
		// conflict detection is disabled otherwise this slice would keep growing.
		.committedTxns = append(.committedTxns, committedTxn{
			ts:           ,
			conflictKeys: .conflictKeys,
		})
	}

	return , false
}

// Marks the transaction's read timestamp as done on readMark. The doneRead
// flag on the transaction ensures this happens at most once per transaction.
func ( *oracle) ( *Txn) {
	if !.doneRead {
		.doneRead = true
		.readMark.Done(.readTs)
	}
}

// Drops entries from committedTxns whose commit timestamp is at or below the
// current discard point, since (per the skip rule in hasConflict) only commits
// newer than a reader's readTs are ever checked. It is a no-op when conflict
// detection is disabled or the discard point has not moved since the last
// cleanup.
func ( *oracle) () { // Must be called under o.Lock
	if !.detectConflicts {
		// When detectConflicts is set to false, we do not store any
		// committedTxns and so there's nothing to clean up.
		return
	}
	// Same logic as discardAtOrBelow but unlocked
	var  uint64
	if .isManaged {
		 = .discardTs
	} else {
		 = .readMark.DoneUntil()
	}

	y.AssertTrue( >= .lastCleanupTs)

	// do not run clean up if the maxReadTs (read timestamp of the
	// oldest transaction that is still in flight) has not increased
	if  == .lastCleanupTs {
		return
	}
	.lastCleanupTs = 

	// Filter in place: the result reuses committedTxns' backing array.
	 := .committedTxns[:0]
	for ,  := range .committedTxns {
		if .ts <=  {
			continue
		}
		 = append(, )
	}
	.committedTxns = 
}

// Marks the given commit timestamp as done on txnMark. In managed mode the
// oracle does not track commits on txnMark, so this is a no-op.
func ( *oracle) ( uint64) {
	if .isManaged {
		// No need to update anything.
		return
	}
	.txnMark.Done()
}

// Txn represents a Badger transaction.
type Txn struct {
	readTs   uint64 // timestamp the transaction reads at.
	commitTs uint64 // commit timestamp used in managed mode.
	size     int64  // running size estimate of pending writes; checked against maxBatchSize.
	count    int64  // running number of pending entries; checked against maxBatchCount.
	db       *DB

	reads []uint64 // contains fingerprints of keys read.
	// contains fingerprints of keys written. This is used for conflict detection.
	conflictKeys map[uint64]struct{}
	readsLock    sync.Mutex // guards the reads slice. See addReadKey.

	pendingWrites   map[string]*Entry // cache stores any writes done by txn.
	duplicateWrites []*Entry          // Used in managed mode to store duplicate entries.

	numIterators atomic.Int32 // checked on Discard; discarding with a value above zero panics.
	discarded    bool         // set once the transaction has been discarded.
	doneRead     bool         // set once readTs has been marked done on the oracle's readMark.
	update       bool         // update is used to conditionally keep track of reads.
}

// pendingWritesIterator walks over a sorted snapshot of a transaction's
// pending writes, exposing each entry as if it were versioned at readTs.
type pendingWritesIterator struct {
	entries  []*Entry // entries sorted by key (descending when reversed).
	nextIdx  int      // index of the current entry; the iterator is valid while nextIdx < len(entries).
	readTs   uint64   // timestamp attached to returned keys and values.
	reversed bool     // true when entries are sorted in descending key order.
}

// Advances the iterator by one entry.
func ( *pendingWritesIterator) () {
	.nextIdx++
}

// Resets the iterator to the first entry.
func ( *pendingWritesIterator) () {
	.nextIdx = 0
}

// Positions the iterator, via binary search over the sorted entries, at the
// first entry whose key compares >= the target (or <= the target when the
// iterator is reversed). If no entry qualifies, nextIdx ends up at
// len(entries) and the iterator becomes invalid.
func ( *pendingWritesIterator) ( []byte) {
	// The incoming key is first passed through y.ParseKey (presumably to strip
	// a timestamp suffix so it is comparable with Entry.Key — confirm).
	 = y.ParseKey()
	.nextIdx = sort.Search(len(.entries), func( int) bool {
		 := bytes.Compare(.entries[].Key, )
		if !.reversed {
			return  >= 0
		}
		return  <= 0
	})
}

// Returns the current entry's key combined with the iterator's readTs via
// y.KeyWithTs. Asserts that the iterator is valid.
func ( *pendingWritesIterator) () []byte {
	y.AssertTrue(.Valid())
	 := .entries[.nextIdx]
	return y.KeyWithTs(.Key, .readTs)
}

// Returns the current entry as a y.ValueStruct, copying its value, meta, user
// meta and expiry and using the iterator's readTs as the version. Asserts that
// the iterator is valid.
func ( *pendingWritesIterator) () y.ValueStruct {
	y.AssertTrue(.Valid())
	 := .entries[.nextIdx]
	return y.ValueStruct{
		Value:     .Value,
		Meta:      .meta,
		UserMeta:  .UserMeta,
		ExpiresAt: .ExpiresAt,
		Version:   .readTs,
	}
}

// Reports whether the iterator currently points at an entry.
func ( *pendingWritesIterator) () bool {
	return .nextIdx < len(.entries)
}

// Always returns nil; the iterator holds nothing that needs releasing here.
func ( *pendingWritesIterator) () error {
	return nil
}

// Builds an iterator over the transaction's pending writes, sorted by key in
// ascending order, or descending order when the boolean argument is true.
// Returns nil for read-only transactions and when there are no pending writes.
func ( *Txn) ( bool) *pendingWritesIterator {
	if !.update || len(.pendingWrites) == 0 {
		return nil
	}
	// Snapshot the map values into a slice so they can be sorted.
	 := make([]*Entry, 0, len(.pendingWrites))
	for ,  := range .pendingWrites {
		 = append(, )
	}
	// Number of pending writes per transaction shouldn't be too big in general.
	sort.Slice(, func(,  int) bool {
		 := bytes.Compare([].Key, [].Key)
		if ! {
			return  < 0
		}
		return  > 0
	})
	return &pendingWritesIterator{
		readTs:   .readTs,
		entries:  ,
		reversed: ,
	}
}

// Accounts for one more entry in the transaction's running count and size.
// Returns ErrTxnTooBig, leaving the totals untouched, when the new count or
// size would reach the DB's maxBatchCount or maxBatchSize; otherwise the new
// totals are stored.
func ( *Txn) ( *Entry) error {
	 := .count + 1
	// Extra bytes for the version in key.
	 := .size + .estimateSizeAndSetThreshold(.db.valueThreshold()) + 10
	if  >= .db.opt.maxBatchCount ||  >= .db.opt.maxBatchSize {
		return ErrTxnTooBig
	}
	.count, .size = , 
	return nil
}

// exceedsSize builds the error reported when a key or value is larger than an
// allowed limit.
//
// prefix names what was too large (e.g. "Key" or "Value"), limit is the
// maximum permitted size, and b is the offending payload. The message contains
// the payload's length, the limit, and a hex dump of at most the first 1 KiB
// of b.
func exceedsSize(prefix string, limit int64, b []byte) error {
	// Cap the dump at 1 KiB, but never slice past len(b): an unconditional
	// b[:1<<10] panics when cap(b) < 1024 and leaks bytes beyond len(b) when
	// only the length is short.
	const maxDump = 1 << 10
	dump := b
	if len(dump) > maxDump {
		dump = dump[:maxDump]
	}
	return fmt.Errorf("%s with size %d exceeded %d limit. %s:\n%s",
		prefix, len(b), limit, prefix, hex.Dump(dump))
}

// Validates an entry and records it as a pending write of the transaction.
//
// Errors returned: ErrReadOnlyTxn for a read-only transaction,
// ErrDiscardedTxn for a discarded one, ErrEmptyKey for an empty key,
// ErrInvalidKey for a key starting with badgerPrefix, a size error when the
// key exceeds 65000 bytes or the value exceeds the applicable limit, the
// error from the DB's banned-key check, and ErrTxnTooBig (via checkSize) when
// the transaction would grow past its batch limits.
func ( *Txn) ( *Entry) error {
	const  = 65000

	switch {
	case !.update:
		return ErrReadOnlyTxn
	case .discarded:
		return ErrDiscardedTxn
	case len(.Key) == 0:
		return ErrEmptyKey
	case bytes.HasPrefix(.Key, badgerPrefix):
		return ErrInvalidKey
	case len(.Key) > :
		// Key length can't be more than uint16, as determined by table::header.  To
		// keep things safe and allow badger move prefix and a timestamp suffix, let's
		// cut it down to 65000, instead of using 65536.
		return exceedsSize("Key", , .Key)
	case int64(len(.Value)) > .db.opt.ValueLogFileSize:
		return exceedsSize("Value", .db.opt.ValueLogFileSize, .Value)
	case .db.opt.InMemory && int64(len(.Value)) > .db.valueThreshold():
		// In-memory mode additionally caps values at the value threshold.
		return exceedsSize("Value", .db.valueThreshold(), .Value)
	}

	if  := .db.isBanned(.Key);  != nil {
		return 
	}

	if  := .checkSize();  != nil {
		return 
	}

	// The txn.conflictKeys is used for conflict detection. If conflict detection
	// is disabled, we don't need to store key hashes in this map.
	if .db.opt.DetectConflicts {
		 := z.MemHash(.Key) // Avoid dealing with byte arrays.
		.conflictKeys[] = struct{}{}
	}
	// If a duplicate entry was inserted in managed mode, move it to the duplicate writes slice.
	// Add the entry to duplicateWrites only if both the entries have different versions. For
	// same versions, we will overwrite the existing entry.
	if ,  := .pendingWrites[string(.Key)];  && .version != .version {
		.duplicateWrites = append(.duplicateWrites, )
	}
	.pendingWrites[string(.Key)] = 
	return nil
}

// Set adds a key-value pair to the database.
// It will return ErrReadOnlyTxn if update flag was set to false when creating the transaction.
//
// The current transaction keeps a reference to the key and val byte slice
// arguments. Users must not modify key and val until the end of the transaction.
//
// Set is shorthand for SetEntry with an entry built by NewEntry.
func ( *Txn) (,  []byte) error {
	return .SetEntry(NewEntry(, ))
}

// SetEntry takes an Entry struct and adds the key-value pair in the struct,
// along with other metadata to the database.
//
// The current transaction keeps a reference to the entry passed in argument.
// Users must not modify the entry until the end of the transaction.
//
// All validation is delegated to modify, whose error is returned unchanged.
func ( *Txn) ( *Entry) error {
	return .modify()
}

// Delete deletes a key.
//
// This is done by adding a delete marker for the key at commit timestamp.  Any
// reads happening before this timestamp would be unaffected. Any reads after
// this commit would see the deletion.
//
// The current transaction keeps a reference to the key byte slice argument.
// Users must not modify the key until the end of the transaction.
func ( *Txn) ( []byte) error {
	// The delete marker is an entry carrying only the key and the bitDelete
	// meta flag; it goes through the same validation as any other write.
	 := &Entry{
		Key:  ,
		meta: bitDelete,
	}
	return .modify()
}

// Get looks for key and returns corresponding Item.
// If key is not found, ErrKeyNotFound is returned.
//
// ErrEmptyKey is returned for an empty key, ErrDiscardedTxn when the
// transaction has been discarded, and the banned-key check's error when the
// key is banned. In an update transaction a pending write for the key is
// served directly; otherwise the key is recorded as read and looked up in the
// DB at the transaction's read timestamp.
func ( *Txn) ( []byte) ( *Item,  error) {
	if len() == 0 {
		return nil, ErrEmptyKey
	} else if .discarded {
		return nil, ErrDiscardedTxn
	}

	if  := .db.isBanned();  != nil {
		return nil, 
	}

	 = new(Item)
	if .update {
		if ,  := .pendingWrites[string()];  && bytes.Equal(, .Key) {
			// A pending delete or an already expired pending write hides the key.
			if isDeletedOrExpired(.meta, .ExpiresAt) {
				return nil, ErrKeyNotFound
			}
			// Fulfill from cache.
			.meta = .meta
			.val = .Value
			.userMeta = .UserMeta
			.key = 
			.status = prefetched
			.version = .readTs
			.expiresAt = .ExpiresAt
			// We probably don't need to set db on item here.
			return , nil
		}
		// Only track reads if this is update txn. No need to track read if txn serviced it
		// internally.
		.addReadKey()
	}

	 := y.KeyWithTs(, .readTs)
	,  := .db.get()
	if  != nil {
		return nil, y.Wrapf(, "DB::Get key: %q", )
	}
	// A result with no value and zero meta is treated as "not found".
	if .Value == nil && .Meta == 0 {
		return nil, ErrKeyNotFound
	}
	if isDeletedOrExpired(.Meta, .ExpiresAt) {
		return nil, ErrKeyNotFound
	}

	.key = 
	.version = .Version
	.meta = .Meta
	.userMeta = .UserMeta
	// The returned value bytes are copied into the item's vptr via y.SafeCopy.
	.vptr = y.SafeCopy(.vptr, .Value)
	.txn = 
	.expiresAt = .ExpiresAt
	return , nil
}

// Records the fingerprint (z.MemHash) of a key read by an update transaction
// so that it can be checked for conflicts at commit time. Does nothing for
// read-only transactions.
func ( *Txn) ( []byte) {
	if .update {
		 := z.MemHash()

		// Because of the possibility of multiple iterators it is now possible
		// for multiple threads within a read-write transaction to read keys at
		// the same time. The reads slice is not currently thread-safe and
		// needs to be locked whenever we mark a key as read.
		.readsLock.Lock()
		.reads = append(.reads, )
		.readsLock.Unlock()
	}
}

// Discard discards a created transaction. This method is very important and must be called. Commit
// method calls this internally, however, calling this multiple times doesn't cause any issues. So,
// this can safely be called via a defer right when transaction is created.
//
// NOTE: If any operations are run on a discarded transaction, ErrDiscardedTxn is returned.
//
// Discard panics if the transaction still has open iterators.
func ( *Txn) () {
	if .discarded { // Avoid a re-run.
		return
	}
	if .numIterators.Load() > 0 {
		panic("Unclosed iterator at time of Txn.Discard.")
	}
	.discarded = true
	// In non-managed mode the oracle tracks reads, so release this txn's read mark.
	if !.db.orc.isManaged {
		.db.orc.doneRead()
	}
}

// Obtains a commit timestamp, stamps and collects all pending entries, and
// hands them to the DB's write channel.
//
// On success it returns a function that waits for the write to complete,
// marks the commit timestamp done on the oracle, and returns the write's
// error. It returns ErrConflict when the oracle detects a conflict, or the
// error from sendToWriteCh (after marking the commit done) when the entries
// cannot be queued.
func ( *Txn) () (func() error, error) {
	 := .db.orc
	// Ensure that the order in which we get the commit timestamp is the same as
	// the order in which we push these updates to the write channel. So, we
	// acquire a writeChLock before getting a commit timestamp, and only release
	// it after pushing the entries to it.
	.writeChLock.Lock()
	defer .writeChLock.Unlock()

	,  := .newCommitTs()
	if  {
		return nil, ErrConflict
	}

	// First pass: entries without an explicit version get the commit
	// timestamp. Any entry that already carries a version clears the flag, so
	// no transaction markers are added later.
	 := true
	 := func( *Entry) {
		if .version == 0 {
			.version = 
		} else {
			 = false
		}
	}
	for ,  := range .pendingWrites {
		()
	}
	// The duplicateWrites slice will be non-empty only if there are duplicate
	// entries with different versions.
	for ,  := range .duplicateWrites {
		()
	}

	// Capacity leaves room for the optional end-of-transaction entry.
	 := make([]*Entry, 0, len(.pendingWrites)+len(.duplicateWrites)+1)

	 := func( *Entry) {
		// Suffix the keys with commit ts, so the key versions are sorted in
		// descending order of commit timestamp.
		.Key = y.KeyWithTs(.Key, .version)
		// Add bitTxn only if these entries are part of a transaction. We
		// support SetEntryAt(..) in managed mode which means a single
		// transaction can have entries with different timestamps. If entries
		// in a single transaction have different timestamps, we don't add the
		// transaction markers.
		if  {
			.meta |= bitTxn
		}
		 = append(, )
	}

	// The following debug information is what led to determining the cause of
	// bank txn violation bug, and it took a whole bunch of effort to narrow it
	// down to here. So, keep this around for at least a couple of months.
	// var b strings.Builder
	// fmt.Fprintf(&b, "Read: %d. Commit: %d. reads: %v. writes: %v. Keys: ",
	// 	txn.readTs, commitTs, txn.reads, txn.conflictKeys)
	for ,  := range .pendingWrites {
		()
	}
	for ,  := range .duplicateWrites {
		()
	}

	if  {
		// CommitTs should not be zero if we're inserting transaction markers.
		y.AssertTrue( != 0)
		// Final marker entry: txnKey at the commit timestamp, flagged
		// bitFinTxn, with the timestamp in decimal as its value.
		 := &Entry{
			Key:   y.KeyWithTs(txnKey, ),
			Value: []byte(strconv.FormatUint(, 10)),
			meta:  bitFinTxn,
		}
		 = append(, )
	}

	,  := .db.sendToWriteCh()
	if  != nil {
		// The write was never queued, so release the commit timestamp right away.
		.doneCommit()
		return nil, 
	}
	 := func() error {
		 := .Wait()
		// Wait before marking commitTs as done.
		// We can't defer doneCommit above, because it is being called from a
		// callback here.
		.doneCommit()
		return 
	}
	return , nil
}

// Validates that the transaction can be committed. It fails when the
// transaction has already been discarded, or when, in managed mode, none of
// the pending writes carries an explicit version and no commit timestamp has
// been set.
func ( *Txn) () error {
	if .discarded {
		return errors.New("Trying to commit a discarded txn")
	}
	// The flag stays true only if every pending write has version 0.
	// NOTE(review): only pendingWrites is inspected here, not duplicateWrites.
	 := true
	for ,  := range .pendingWrites {
		if .version != 0 {
			 = false
		}
	}

	// If keepTogether is True, it implies transaction markers will be added.
	// In that case, commitTs should never be zero. This might happen if
	// someone uses txn.Commit instead of txn.CommitAt in managed mode.  This
	// should happen only in managed mode. In normal mode, keepTogether will
	// always be true.
	if  && .db.opt.managedTxns && .commitTs == 0 {
		return errors.New("CommitTs cannot be zero. Please use commitAt instead")
	}
	return nil
}

// Commit commits the transaction, following these steps:
//
// 1. If there are no writes, return immediately.
//
// 2. Check if read rows were updated since txn started. If so, return ErrConflict.
//
// 3. If no conflict, generate a commit timestamp and update written rows' commit ts.
//
// 4. Batch up all writes, write them to value log and LSM tree.
//
// 5. If callback is provided, Badger will return immediately after checking
// for conflicts. Writes to the database will happen in the background.  If
// there is a conflict, an error will be returned and the callback will not
// run. If there are no conflicts, the callback will be called in the
// background upon successful completion of writes or any error during write.
//
// If error is nil, the transaction is successfully committed. In case of a non-nil error, the LSM
// tree won't be updated, so there's no need for any rollback.
//
// Commit itself takes no callback and blocks until the write completes. Step
// 5 appears to describe the callback-taking sibling, whose own doc comment
// reads "CommitWith acts like Commit, but takes a callback".
func ( *Txn) () error {
	// txn.conflictKeys can be zero if conflict detection is turned off. So we
	// should check txn.pendingWrites.
	if len(.pendingWrites) == 0 {
		// Discard the transaction so that the read is marked done.
		.Discard()
		return nil
	}
	// Precheck before discarding txn.
	// A precheck failure returns without discarding the transaction.
	if  := .commitPrecheck();  != nil {
		return 
	}
	defer .Discard()

	,  := .commitAndSend()
	if  != nil {
		return 
	}
	// If batchSet failed, LSM would not have been updated. So, no need to rollback anything.

	// TODO: What if some of the txns successfully make it to value log, but others fail.
	// Nothing gets updated to LSM, until a restart happens.
	return ()
}

// txnCb bundles what runTxnCallback needs to finish a transaction
// asynchronously and report the outcome to the user.
type txnCb struct {
	commit func() error // optional; run to obtain the final commit error.
	user   func(error)  // required; receives the outcome.
	err    error        // if non-nil, reported to user instead of running commit.
}

// runTxnCallback delivers a transaction outcome to the user callback. A
// pre-existing error is reported as is; otherwise the commit function, when
// present, is run and its result is reported; with neither, nil is reported.
// It panics when the txnCb itself or its user callback is nil.
func runTxnCallback( *txnCb) {
	switch {
	case  == nil:
		panic("txn callback is nil")
	case .user == nil:
		panic("Must have caught a nil callback for txn.CommitWith")
	case .err != nil:
		.user(.err)
	case .commit != nil:
		 := .commit()
		.user()
	default:
		.user(nil)
	}
}

// CommitWith acts like Commit, but takes a callback, which gets run via a
// goroutine to avoid blocking this function. The callback is guaranteed to run,
// so it is safe to increment sync.WaitGroup before calling CommitWith, and
// decrementing it in the callback; to block until all callbacks are run.
//
// CommitWith panics if the callback is nil.
func ( *Txn) ( func(error)) {
	if  == nil {
		panic("Nil callback provided to CommitWith")
	}

	if len(.pendingWrites) == 0 {
		// Do not run these callbacks from here, because the CommitWith and the
		// callback might be acquiring the same locks. Instead run the callback
		// from another goroutine.
		go runTxnCallback(&txnCb{user: , err: nil})
		// Discard the transaction so that the read is marked done.
		.Discard()
		return
	}

	// Precheck before discarding txn.
	// NOTE(review): unlike every other path, a precheck failure invokes the
	// callback synchronously on the caller's goroutine and leaves the
	// transaction undiscarded.
	if  := .commitPrecheck();  != nil {
		()
		return
	}

	defer .Discard()

	,  := .commitAndSend()
	if  != nil {
		go runTxnCallback(&txnCb{user: , err: })
		return
	}

	// The write is queued; wait for it and report the result in the background.
	go runTxnCallback(&txnCb{user: , commit: })
}

// ReadTs returns the read timestamp of the transaction.
// The value is returned from the readTs field without any locking.
func ( *Txn) () uint64 {
	return .readTs
}

// NewTransaction creates a new transaction. Badger supports concurrent execution of transactions,
// providing serializable snapshot isolation, avoiding write skews. Badger achieves this by tracking
// the keys read and at Commit time, ensuring that these read keys weren't concurrently modified by
// another transaction.
//
// For read-only transactions, set update to false. In this mode, we don't track the rows read for
// any changes. Thus, any long running iterations done in this mode wouldn't pay this overhead.
//
// Running transactions concurrently is OK. However, a transaction itself isn't thread safe, and
// should only be run serially. It doesn't matter if a transaction is created by one goroutine and
// passed down to other, as long as the Txn APIs are called serially.
//
// When you create a new transaction, it is absolutely essential to call
// Discard(). This should be done irrespective of what the update param is set
// to. Commit API internally runs Discard, but running it twice wouldn't cause
// any issues.
//
//	txn := db.NewTransaction(false)
//	defer txn.Discard()
//	// Call various APIs.
func ( *DB) ( bool) *Txn {
	// The second argument is false here, so the new transaction takes its read
	// timestamp from the oracle.
	return .newTransaction(, false)
}

// Creates a transaction. The first flag requests an update (read-write)
// transaction and is forced to false when the DB is opened read-only. When the
// second flag is false, the read timestamp is obtained from the oracle;
// otherwise readTs is left at zero for the caller to set.
func ( *DB) (,  bool) *Txn {
	if .opt.ReadOnly &&  {
		// DB is read-only, force read-only transaction.
		 = false
	}

	 := &Txn{
		update: ,
		db:     ,
		count:  1,                       // One extra entry for BitFin.
		size:   int64(len(txnKey) + 10), // Some buffer for the extra entry.
	}
	// Write-tracking state is only allocated for update transactions.
	if  {
		if .opt.DetectConflicts {
			.conflictKeys = make(map[uint64]struct{})
		}
		.pendingWrites = make(map[string]*Entry)
	}
	if ! {
		.readTs = .orc.readTs()
	}
	return 
}

// View executes a function creating and managing a read-only transaction for the user. Error
// returned by the function is relayed by the View method.
// If View is used with managed transactions, it would assume a read timestamp of MaxUint64.
//
// View returns ErrDBClosed if the DB is already closed. The transaction is
// always discarded before View returns.
func ( *DB) ( func( *Txn) error) error {
	if .IsClosed() {
		return ErrDBClosed
	}
	var  *Txn
	if .opt.managedTxns {
		 = .NewTransactionAt(math.MaxUint64, false)
	} else {
		 = .NewTransaction(false)
	}
	defer .Discard()

	return ()
}

// Update executes a function, creating and managing a read-write transaction
// for the user. Error returned by the function is relayed by the Update method.
// Update cannot be used with managed transactions.
//
// Update returns ErrDBClosed if the DB is already closed and panics when the
// DB uses managed transactions. If the function returns an error, the
// transaction is discarded without committing; otherwise the result of Commit
// is returned.
func ( *DB) ( func( *Txn) error) error {
	if .IsClosed() {
		return ErrDBClosed
	}
	if .opt.managedTxns {
		panic("Update can only be used with managedDB=false.")
	}
	 := .NewTransaction(true)
	defer .Discard()

	if  := ();  != nil {
		return 
	}

	return .Commit()
}