/*
 * SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc.
 * SPDX-License-Identifier: Apache-2.0
 */

package badger

import (
	
	
	
	

	

	
	
	
)

// WriteBatch holds the necessary info to perform batched writes.
//
// Writes are accumulated in the current transaction (txn). When that
// transaction reports ErrTxnTooBig it is committed with a callback and
// replaced by a fresh one, so callers never wait on individual commits.
type WriteBatch struct {
	// Embedded mutex serialising access to txn and finished; the exported
	// write methods lock it, the internal helpers expect it to be held.
	sync.Mutex
	// txn is the transaction currently receiving writes. It is swapped for a
	// new transaction every time commit runs.
	txn      *Txn
	// db is the database this batch writes to; new transactions are created
	// from it and its options are consulted for managed mode and logging.
	db       *DB
	// throttle bounds the number of commits that are in flight at once
	// (16 by default, see SetMaxPendingTxns).
	throttle *y.Throttle
	// err holds the error recorded by a commit callback or by a failed retry.
	// It is read through Error().
	// NOTE(review): sync/atomic.Value panics when Store is called with values
	// of different concrete types; errors stored here originate from several
	// call sites — confirm they always share one concrete type.
	err      atomic.Value

	// isManaged is forwarded to newTransaction whenever a transaction is
	// created for this batch.
	isManaged bool
	// commitTs is copied onto every replacement transaction created by commit.
	// It is not assigned anywhere in this section; presumably it is set by the
	// managed-mode constructor (NewWriteBatchAt) — verify.
	commitTs  uint64
	// finished is set by Cancel and Flush; commit refuses to run once it is
	// true.
	finished  bool
}

// NewWriteBatch creates a new WriteBatch. This provides a way to conveniently do a lot of writes,
// batching them up as tightly as possible in a single transaction and using callbacks to avoid
// waiting for them to commit, thus achieving good performance. This API hides away the logic of
// creating and committing transactions. Due to the nature of SSI guarantees provided by Badger,
// blind writes can never encounter transaction conflicts (ErrConflict).
//
// NewWriteBatch panics if the DB was opened in managed mode; NewWriteBatchAt
// must be used in that case.
func ( *DB) () *WriteBatch {
	if .opt.managedTxns {
		panic("cannot use NewWriteBatch in managed mode. Use NewWriteBatchAt instead")
	}
	// Non-managed batch: isManaged is false.
	return .newWriteBatch(false)
}

// newWriteBatch builds a WriteBatch bound to this DB. The boolean argument
// is stored as isManaged and is also forwarded to newTransaction. The batch
// starts with a fresh transaction and a throttle allowing 16 pending commits.
func ( *DB) ( bool) *WriteBatch {
	return &WriteBatch{
		db:        ,
		isManaged: ,
		// First argument is true — presumably this requests a read-write
		// (update) transaction; confirm against newTransaction.
		txn:       .newTransaction(true, ),
		// Default limit on pending transactions; see SetMaxPendingTxns.
		throttle:  y.NewThrottle(16),
	}
}

// SetMaxPendingTxns sets a limit on maximum number of pending transactions while writing batches.
// This function should be called before using WriteBatch. Default value of MaxPendingTxns is
// 16 to minimise memory usage.
//
// The existing throttle is replaced by a new one rather than resized, and the
// batch mutex is not taken here. Commit callbacks read the throttle field when
// they run, which is why this must happen before any write is issued.
func ( *WriteBatch) ( int) {
	.throttle = y.NewThrottle()
}

// Cancel function must be called if there's a chance that Flush might not get
// called. If neither Flush nor Cancel is called, the transaction oracle would
// never get a chance to clear out the row commit timestamp map, thus causing an
// unbounded memory consumption. Typically, you can call Cancel as a defer
// statement right after NewWriteBatch is called.
//
// Note that any committed writes would still go through despite calling Cancel.
func ( *WriteBatch) () {
	.Lock()
	defer .Unlock()
	// Mark the batch as finished so that any later commit is rejected.
	.finished = true
	// Finish the throttle while still holding the lock. The commit callback
	// does not take the batch lock, so pending callbacks can still complete.
	// An error here is only logged; Cancel has no return value.
	if  := .throttle.Finish();  != nil {
		// NOTE(review): the message says "WatchBatch"; this looks like a typo
		// for "WriteBatch". It is a runtime string, so it is left untouched here.
		.db.opt.Errorf("WatchBatch.Cancel error while finishing: %v", )
	}
	// Drop the writes buffered in the current, uncommitted transaction.
	.txn.Discard()
}

// callback is handed to Txn.CommitWith by commit and receives the result of
// one commit. It always releases one throttle slot. A non-nil error is
// recorded only if no error has been recorded before, so the stored error is
// the earliest one observed.
func ( *WriteBatch) ( error) {
	// sync.WaitGroup is thread-safe, so it doesn't need to be run inside wb.Lock.
	defer .throttle.Done()
	if  == nil {
		return
	}
	// Keep the error that is already stored; do not overwrite it.
	if  := .Error();  != nil {
		return
	}
	.err.Store()
}

// writeKV converts a protobuf KV into an Entry and adds it to the batch via
// handleEntry. Key, Value and Version are copied over; of UserMeta only the
// first byte is used, and only when the slice is non-empty. A zero Version
// trips y.AssertTrue.
//
// It does not take the batch lock itself: handleEntry requires the lock to be
// held, and the callers in this file acquire it before calling.
func ( *WriteBatch) ( *pb.KV) error {
	 := Entry{Key: .Key, Value: .Value}
	if len(.UserMeta) > 0 {
		.UserMeta = .UserMeta[0]
	}
	// Every KV written through this path must carry an explicit version.
	y.AssertTrue(.Version != 0)
	.version = .Version
	return .handleEntry(&)
}

// This method writes every record contained in the given z.Buffer. Each slice
// yielded by SliceIterate is unmarshalled as a pb.KV and passed to writeKV.
// The batch lock is held for the whole iteration. The error returned by
// SliceIterate is passed back unchanged; entries written before a failure are
// not rolled back here.
func ( *WriteBatch) ( *z.Buffer) error {
	.Lock()
	defer .Unlock()

	 := .SliceIterate(func( []byte) error {
		// A fresh pb.KV is allocated for every slice.
		 := &pb.KV{}
		if  := proto.Unmarshal(, );  != nil {
			return 
		}
		return .writeKV()
	})
	return 
}

// This method writes every KV in the given pb.KVList, in order, through
// writeKV while holding the batch lock. It stops at, and returns, the first
// error; entries written before the failure are not rolled back here.
func ( *WriteBatch) ( *pb.KVList) error {
	.Lock()
	defer .Unlock()
	for ,  := range .Kv {
		if  := .writeKV();  != nil {
			return 
		}
	}
	return nil
}

// SetEntryAt is the equivalent of Txn.SetEntry but it also allows setting version for the entry.
// SetEntryAt can be used only in managed mode.
//
// It returns an error, without writing anything, when the DB is not in managed
// mode. Otherwise the supplied version is stored on the caller's Entry (the
// Entry is modified in place) and the write is delegated to SetEntry.
func ( *WriteBatch) ( *Entry,  uint64) error {
	if !.db.opt.managedTxns {
		return errors.New("SetEntryAt can only be used in managed mode. Use SetEntry instead")
	}
	.version = 
	return .SetEntry()
}

// handleEntry adds the entry to the current transaction, committing and
// retrying once on a fresh transaction if the current one is full.
// Should be called with lock acquired.
func ( *WriteBatch) ( *Entry) error {
	// Fast path: anything other than ErrTxnTooBig (including nil) is the
	// final result.
	if  := .txn.SetEntry();  != ErrTxnTooBig {
		return 
	}
	// Txn has reached its zenith. Commit now.
	if  := .commit();  != nil {
		return 
	}
	// This time the error must not be ErrTxnTooBig, otherwise, we make the
	// error permanent.
	if  := .txn.SetEntry();  != nil {
		// Any failure on the retry is stored so that later commits refuse
		// to run.
		.err.Store()
		return 
	}
	return nil
}

// SetEntry is the equivalent of Txn.SetEntry. It takes the batch lock and
// hands the entry to handleEntry, which commits the current transaction and
// retries when it is full.
func ( *WriteBatch) ( *Entry) error {
	.Lock()
	defer .Unlock()
	return .handleEntry()
}

// Set is equivalent of Txn.Set(). It wraps the two byte slices in an Entry
// as Key and Value and writes it through SetEntry.
func ( *WriteBatch) (,  []byte) error {
	 := &Entry{Key: , Value: }
	return .SetEntry()
}

// DeleteAt is equivalent of Txn.Delete but accepts a delete timestamp.
//
// The deletion is expressed as an Entry carrying the bitDelete meta flag and
// the given version, written through SetEntry.
// NOTE(review): unlike SetEntryAt, there is no managed-mode check here —
// confirm whether calling this in non-managed mode is intended to be allowed.
func ( *WriteBatch) ( []byte,  uint64) error {
	 := Entry{Key: , meta: bitDelete, version: }
	return .SetEntry(&)
}

// Delete is equivalent of Txn.Delete. It holds the batch lock and follows the
// same commit-and-retry pattern as handleEntry: if the current transaction is
// full it is committed and the delete is retried once on the new transaction.
func ( *WriteBatch) ( []byte) error {
	.Lock()
	defer .Unlock()

	// Anything other than ErrTxnTooBig (including nil) is the final result.
	if  := .txn.Delete();  != ErrTxnTooBig {
		return 
	}
	// The transaction is full: commit it, which also installs a fresh one.
	if  := .commit();  != nil {
		return 
	}
	// A failure on the retry is stored so that later commits refuse to run.
	if  := .txn.Delete();  != nil {
		.err.Store()
		return 
	}
	return nil
}

// commit hands the current transaction off for committing and replaces it
// with a new, empty one. It returns early, without committing, when an error
// has already been recorded or when the batch is finished.
// Caller to commit must hold a write lock.
func ( *WriteBatch) () error {
	// No commits are run once an error has been detected.
	if  := .Error();  != nil {
		return 
	}
	if .finished {
		return y.ErrCommitAfterFinish
	}
	// Reserve a throttle slot; the matching Done is issued by callback.
	// Presumably this blocks while the maximum number of commits is pending —
	// verify against y.Throttle.
	if  := .throttle.Do();  != nil {
		.err.Store()
		return 
	}
	// The commit result is delivered to callback rather than awaited here.
	.txn.CommitWith(.callback)
	// Start a new transaction for subsequent writes, carrying over the
	// batch's managed flag and commit timestamp.
	.txn = .db.newTransaction(true, .isManaged)
	.txn.commitTs = .commitTs
	// Report any error recorded so far (for example by an earlier callback).
	return .Error()
}

// Flush must be called at the end to ensure that any pending writes get committed to Badger. Flush
// returns any error stored by WriteBatch.
//
// The final commit is issued under the batch lock. The lock is released
// before waiting on the throttle for outstanding commits to finish. If the
// wait itself fails and an error was also recorded by the batch, both are
// wrapped into the returned error.
func ( *WriteBatch) () error {
	.Lock()
	 := .commit()
	if  != nil {
		.Unlock()
		return 
	}
	.finished = true
	// commit installed a fresh transaction; it will never be used, so
	// discard it.
	.txn.Discard()
	.Unlock()

	// Wait for the throttle to finish outside the lock.
	if  := .throttle.Finish();  != nil {
		if .Error() != nil {
			return fmt.Errorf("wb.err: %w err: %w", .Error(), )
		}
		return 
	}

	return .Error()
}

// Error returns any errors encountered so far. No commits would be run once an error is detected.
// It returns nil when no error has been stored yet.
func ( *WriteBatch) () error {
	// If the interface conversion fails, the err will be nil.
	// (Load yields nil before the first Store, so the assertion fails and the
	// zero error value is returned.)
	,  := .err.Load().(error)
	return 
}