/*
 * SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc.
 * SPDX-License-Identifier: Apache-2.0
 */

package badger

import (
	
	
	
	
	
	
	
	
	
	
	
	
	
	

	humanize 

	
	
	
	
	
	
	
	
)

// Reserved internal keys. txnKey and bannedNsKey both start with the
// badgerPrefix bytes ("!badger!").
var (
	badgerPrefix = []byte("!badger!")       // Prefix for internal keys used by badger.
	txnKey       = []byte("!badger!txn")    // For indicating end of entries in txn.
	bannedNsKey  = []byte("!badger!banned") // For storing the banned namespaces.
)

// closers groups the z.Closer handles that Open hands to the background
// goroutines it starts. Each field is named after the goroutine it controls:
//   - updateSize:  the periodic size recalculation loop (updateSize).
//   - compactors:  the level controller's compaction workers (startCompact).
//   - memtable:    the memtable flusher (flushMemtable).
//   - writes:      the write loop (doWrites).
//   - valueGC:     the value log GC waiter (vlog.waitOnGC); only created when
//     the DB is not in-memory.
//   - pub:         the publisher's update listener (listenForUpdates).
//   - cacheHealth: the cache metrics monitor (monitorCache).
type closers struct {
	updateSize  *z.Closer
	compactors  *z.Closer
	memtable    *z.Closer
	writes      *z.Closer
	valueGC     *z.Closer
	pub         *z.Closer
	cacheHealth *z.Closer
}

// lockedKeys is a set of uint64 keys protected by an embedded sync.RWMutex.
// The map must be initialised before use (Open does so with make); writing to
// a nil map would panic.
type lockedKeys struct {
	sync.RWMutex
	keys map[uint64]struct{} // Set membership only; the value carries no data.
}

// Inserts the given uint64 into the key set while holding the write lock.
// Inserting a key that is already present is a no-op.
func ( *lockedKeys) ( uint64) {
	.Lock()
	defer .Unlock()
	.keys[] = struct{}{}
}

// Reports whether the given uint64 is present in the key set. Only the read
// lock is taken, so concurrent lookups do not block each other.
func ( *lockedKeys) ( uint64) bool {
	.RLock()
	defer .RUnlock()
	,  := .keys[]
	return 
}

// Returns a newly allocated slice containing every key currently in the set.
// The snapshot is taken under the read lock. The order follows Go map
// iteration and is therefore unspecified.
func ( *lockedKeys) () []uint64 {
	.RLock()
	defer .RUnlock()
	// Pre-size the slice so the appends below never reallocate.
	 := make([]uint64, 0, len(.keys))
	for  := range .keys {
		 = append(, )
	}
	return 
}

// DB provides the various functions required to interact with Badger.
// DB is thread-safe.
type DB struct {
	testOnlyDBExtensions

	lock sync.RWMutex // Guards list of in-memory tables, not individual reads and writes.

	dirLockGuard *directoryLockGuard
	// nil if Dir and ValueDir are the same
	valueDirGuard *directoryLockGuard

	// Handles used to stop the background goroutines started by Open.
	closers closers

	mt  *memTable   // Our latest (actively written) in-memory table
	imm []*memTable // Add here only AFTER pushing to flushChan.

	// Initialized via openMemTables.
	nextMemFid int

	opt       Options
	manifest  *manifestFile
	lc        *levelsController
	vlog      valueLog
	writeCh   chan *request
	flushChan chan *memTable // For flushing memtables.
	closeOnce sync.Once      // For closing DB only once.

	// blockWrites is set to 1 by close; sendToWriteCh rejects new writes with
	// ErrBlockedWrites while it is 1.
	blockWrites atomic.Int32
	// isClosed is set to 1 by close and is what IsClosed reports.
	isClosed atomic.Uint32

	orc              *oracle
	bannedNamespaces *lockedKeys
	threshold        *vlogThreshold

	pub      *publisher
	registry *KeyRegistry
	// blockCache and indexCache are only created by Open when the
	// corresponding cache size option is greater than zero; otherwise they
	// stay nil.
	blockCache *ristretto.Cache[[]byte, *table.Block]
	indexCache *ristretto.Cache[uint64, *fb.TableIndex]
	allocPool  *z.AllocatorPool
}

const (
	// kvWriteChCapacity is the buffer size of DB.writeCh. The write loop also
	// uses three times this value as the cap on how many requests it batches
	// before forcing a write.
	kvWriteChCapacity = 1000
)

// checkAndSetOptions validates the supplied options and fills in the derived
// fields maxBatchSize, maxBatchCount and maxValueThreshold.
//
// It returns an error when NumCompactors is 1, when InMemory is combined with
// Dir or ValueDir, when VLogPercentile is outside [0.0, 1.0], when
// ValueThreshold exceeds maxValueThreshold or the derived maxBatchSize, or
// when ValueLogFileSize is outside [1<<20, 2<<30) (ErrValueLogSize). In
// ReadOnly mode CompactL0OnClose is forced to false.
//
// Note that it panics, rather than returning an error, when compression or
// encryption is enabled while BlockCacheSize is zero.
func checkAndSetOptions( *Options) error {
	// It's okay to have zero compactors which will disable all compactions but
	// we cannot have just one compactor otherwise we will end up with all data
	// on level 2.
	if .NumCompactors == 1 {
		return errors.New("Cannot have 1 compactor. Need at least 2")
	}

	if .InMemory && (.Dir != "" || .ValueDir != "") {
		return errors.New("Cannot use badger in Disk-less mode with Dir or ValueDir set")
	}
	// A single batch may use at most 15% of the memtable size; the entry count
	// limit is that byte budget divided by the maximum skiplist node size.
	.maxBatchSize = (15 * .MemTableSize) / 100
	.maxBatchCount = .maxBatchSize / int64(skl.MaxNodeSize)

	// This is the maximum value vlogThreshold can have if dynamic thresholding is enabled.
	.maxValueThreshold = math.Min(maxValueThreshold, float64(.maxBatchSize))
	if .VLogPercentile < 0.0 || .VLogPercentile > 1.0 {
		return errors.New("vlogPercentile must be within range of 0.0-1.0")
	}

	// We are limiting opt.ValueThreshold to maxValueThreshold for now.
	if .ValueThreshold > maxValueThreshold {
		return fmt.Errorf("Invalid ValueThreshold, must be less or equal to %d",
			maxValueThreshold)
	}

	// If ValueThreshold is greater than opt.maxBatchSize, we won't be able to push any data using
	// the transaction APIs. Transaction batches entries into batches of size opt.maxBatchSize.
	// NOTE(review): the message below tells the user to increase
	// opt.BaseTableSize, but maxBatchSize above is derived from MemTableSize —
	// confirm which option the message should name.
	if .ValueThreshold > .maxBatchSize {
		return fmt.Errorf("Valuethreshold %d greater than max batch size of %d. Either "+
			"reduce opt.ValueThreshold or increase opt.BaseTableSize.",
			.ValueThreshold, .maxBatchSize)
	}
	// ValueLogFileSize should be strictly LESS than 2<<30 otherwise we will
	// overflow the uint32 when we mmap it in OpenMemtable.
	if !(.ValueLogFileSize < 2<<30 && .ValueLogFileSize >= 1<<20) {
		return ErrValueLogSize
	}

	if .ReadOnly {
		// Do not perform compaction in read only mode.
		.CompactL0OnClose = false
	}

	// Compression or encryption without a block cache is treated as a
	// programmer error and panics instead of returning an error.
	 := (.Compression != options.None) || (len(.EncryptionKey) > 0)
	if  && .BlockCacheSize == 0 {
		panic("BlockCacheSize should be set since compression/encryption are enabled")
	}
	return nil
}

// Open returns a new DB object.
//
// In order, it: validates the options via checkAndSetOptions; creates the
// directories and acquires directory locks (skipped in InMemory mode, and the
// locks are skipped when BypassLockGuard is set); opens or creates the
// manifest; builds the DB value; creates the block and index caches when
// their sizes are non-zero; opens the key registry and the memtables; creates
// the levels controller; starts compaction and memtable flushing unless the
// DB is read-only; opens the value log; loads the banned namespaces; and
// finally starts the write, value-GC (disk mode only) and publisher
// goroutines.
//
// Several deferred functions release the directory locks, close the manifest
// and stop already-started goroutines when their guarded variable is still
// non-nil at return. On the success path those variables are set to nil just
// before returning, which presumably disarms the deferred cleanups.
func ( Options) (*DB, error) {
	if  := checkAndSetOptions(&);  != nil {
		return nil, 
	}
	var ,  *directoryLockGuard

	// Create directories and acquire lock on it only if badger is not running in InMemory mode.
	// We don't have any directories/files in InMemory mode so we don't need to acquire
	// any locks on them.
	if !.InMemory {
		if  := createDirs();  != nil {
			return nil, 
		}
		var  error
		if !.BypassLockGuard {
			,  = acquireDirectoryLock(.Dir, lockFile, .ReadOnly)
			if  != nil {
				return nil, 
			}
			defer func() {
				if  != nil {
					_ = .release()
				}
			}()
			// Compare absolute paths so that a second lock is only taken when
			// ValueDir really is a different directory from Dir.
			,  := filepath.Abs(.Dir)
			if  != nil {
				return nil, 
			}
			,  := filepath.Abs(.ValueDir)
			if  != nil {
				return nil, 
			}
			if  !=  {
				,  = acquireDirectoryLock(.ValueDir, lockFile, .ReadOnly)
				if  != nil {
					return nil, 
				}
				defer func() {
					if  != nil {
						_ = .release()
					}
				}()
			}
		}
	}

	, ,  := openOrCreateManifestFile()
	if  != nil {
		return nil, 
	}
	defer func() {
		if  != nil {
			_ = .close()
		}
	}()

	 := &DB{
		imm:              make([]*memTable, 0, .NumMemtables),
		flushChan:        make(chan *memTable, .NumMemtables),
		writeCh:          make(chan *request, kvWriteChCapacity),
		opt:              ,
		manifest:         ,
		dirLockGuard:     ,
		valueDirGuard:    ,
		orc:              newOracle(),
		pub:              newPublisher(),
		allocPool:        z.NewAllocatorPool(8),
		bannedNamespaces: &lockedKeys{keys: make(map[uint64]struct{})},
		threshold:        initVlogThreshold(&),
	}

	.syncChan = .syncChan

	// Cleanup all the goroutines started by badger in case of an error.
	defer func() {
		if  != nil {
			.Errorf("Received err: %v. Cleaning up...", )
			.cleanup()
			 = nil
		}
	}()

	if .BlockCacheSize > 0 {
		 := .BlockCacheSize / int64(.BlockSize)
		if  == 0 {
			// Make the value of this variable at least one since the cache requires
			// the number of counters to be greater than zero.
			 = 1
		}

		 := ristretto.Config[[]byte, *table.Block]{
			NumCounters:  * 8,
			MaxCost:     .BlockCacheSize,
			BufferItems: 64,
			Metrics:     true,
			OnExit:      table.BlockEvictHandler,
		}
		.blockCache,  = ristretto.NewCache[[]byte, *table.Block](&)
		if  != nil {
			return nil, y.Wrap(, "failed to create data cache")
		}
	}

	if .IndexCacheSize > 0 {
		// Index size is around 5% of the table size.
		 := int64(float64(.MemTableSize) * 0.05)
		 := .IndexCacheSize / 
		if  == 0 {
			// Make the value of this variable at least one since the cache requires
			// the number of counters to be greater than zero.
			 = 1
		}

		 := ristretto.Config[uint64, *fb.TableIndex]{
			NumCounters:  * 8,
			MaxCost:     .IndexCacheSize,
			BufferItems: 64,
			Metrics:     true,
		}
		.indexCache,  = ristretto.NewCache(&)
		if  != nil {
			return nil, y.Wrap(, "failed to create bf cache")
		}
	}

	// The cache monitor is started unconditionally, even when neither cache
	// was created above.
	.closers.cacheHealth = z.NewCloser(1)
	go .monitorCache(.closers.cacheHealth)

	if .opt.InMemory {
		.opt.SyncWrites = false
		// If badger is running in memory mode, push everything into the LSM Tree.
		.opt.ValueThreshold = math.MaxInt32
	}
	 := KeyRegistryOptions{
		ReadOnly:                      .ReadOnly,
		Dir:                           .Dir,
		EncryptionKey:                 .EncryptionKey,
		EncryptionKeyRotationDuration: .EncryptionKeyRotationDuration,
		InMemory:                      .InMemory,
	}

	// NOTE(review): unlike the `return nil, ...` paths above, this and a few
	// later error paths return a non-nil-literal first value together with the
	// error — confirm callers never use that value when the error is set.
	if .registry,  = OpenKeyRegistry();  != nil {
		return , 
	}
	.calculateSize()
	.closers.updateSize = z.NewCloser(1)
	go .updateSize(.closers.updateSize)

	if  := .openMemTables(.opt);  != nil {
		return nil, y.Wrapf(, "while opening memtables")
	}

	// A mutable memtable only exists when the DB is writable.
	if !.opt.ReadOnly {
		if .mt,  = .newMemTable();  != nil {
			return nil, y.Wrapf(, "cannot create memtable")
		}
	}

	// newLevelsController potentially loads files in directory.
	if .lc,  = newLevelsController(, &);  != nil {
		return , 
	}

	// Initialize vlog struct.
	.vlog.init()

	if !.ReadOnly {
		.closers.compactors = z.NewCloser(1)
		.lc.startCompact(.closers.compactors)

		.closers.memtable = z.NewCloser(1)
		go func() {
			.flushMemtable(.closers.memtable) // Need levels controller to be up.
		}()
		// Flush them to disk asap.
		for ,  := range .imm {
			.flushChan <- 
		}
	}
	// We do increment nextTxnTs below. So, no need to do it here.
	.orc.nextTxnTs = .MaxVersion()
	.opt.Infof("Set nextTxnTs to %d", .orc.nextTxnTs)

	if  = .vlog.open();  != nil {
		return , y.Wrapf(, "During db.vlog.open")
	}

	// Let's advance nextTxnTs to one more than whatever we observed via
	// replaying the logs.
	.orc.txnMark.Done(.orc.nextTxnTs)
	// In normal mode, we must update readMark so older versions of keys can be removed during
	// compaction when run in offline mode via the flatten tool.
	.orc.readMark.Done(.orc.nextTxnTs)
	.orc.incrementNextTs()

	go .threshold.listenForValueThresholdUpdate()

	if  := .initBannedNamespaces();  != nil {
		return , fmt.Errorf("While setting banned keys: %w", )
	}

	.closers.writes = z.NewCloser(1)
	go .doWrites(.closers.writes)

	// Value log GC is only meaningful when there is a value log on disk.
	if !.opt.InMemory {
		.closers.valueGC = z.NewCloser(1)
		go .vlog.waitOnGC(.closers.valueGC)
	}

	.closers.pub = z.NewCloser(1)
	go .pub.listenForUpdates(.closers.pub)

	// Success: clear the variables the deferred cleanups inspect so that
	// nothing is released or closed on return.
	 = nil
	 = nil
	 = nil
	return , nil
}

// initBannedNamespaces retrieves the banned namespaces from the DB and updates in-memory structure.
//
// It is a no-op when opt.NamespaceOffset is negative. Otherwise it runs a
// read-only transaction that iterates, keys only, over every internal key
// with the bannedNsKey prefix, decodes the bytes following the prefix as a
// uint64 and adds it to bannedNamespaces. Any error from View is returned.
func ( *DB) () error {
	if .opt.NamespaceOffset < 0 {
		return nil
	}
	return .View(func( *Txn) error {
		 := DefaultIteratorOptions
		.Prefix = bannedNsKey
		// Only the keys are needed; the namespace is encoded in the key suffix.
		.PrefetchValues = false
		// bannedNsKey lives under the internal "!badger!" prefix, so internal
		// access has to be enabled for the iterator.
		.InternalAccess = true
		 := .NewIterator()
		defer .Close()
		for .Rewind(); .Valid(); .Next() {
			 := y.BytesToU64(.Item().Key()[len(bannedNsKey):])
			.bannedNamespaces.add()
		}
		return nil
	})
}

// MaxVersion returns the largest version found across the mutable memtable
// (skipped in read-only mode), the immutable memtables and every table
// reported by Tables(). It returns 0 when none of them holds a larger value.
//
// NOTE(review): the memtables are only read here, yet the exclusive lock is
// taken rather than the read lock — confirm whether that is intentional.
func ( *DB) () uint64 {
	var  uint64
	// Helper that keeps the running maximum.
	 := func( uint64) {
		if  >  {
			 = 
		}
	}
	.lock.Lock()
	// In read only mode, we do not create new mem table.
	if !.opt.ReadOnly {
		(.mt.maxVersion)
	}
	for ,  := range .imm {
		(.maxVersion)
	}
	.lock.Unlock()
	// The on-disk tables are inspected after the lock has been released.
	for ,  := range .Tables() {
		(.MaxVersion)
	}
	return 
}

// monitorCache wakes up once a minute and inspects the block cache and index
// cache metrics until the given closer is signalled. It calls Done on the
// closer when it returns.
//
// For each cache it warns when both the mean life expectancy is below 10
// seconds and the hit ratio is in (0, 0.4). Otherwise, when more than 1000
// life-expectancy samples exist, it logs the metrics at Info level on every
// fifth tick.
func ( *DB) ( *z.Closer) {
	defer .Done()
	 := 0
	 := func( string,  *ristretto.Metrics) {
		// If the mean life expectancy is less than 10 seconds, the cache
		// might be too small.
		 := .LifeExpectancySeconds()
		// No data to judge the cache by; skip this round.
		if  == nil {
			return
		}
		 := .Count > 0 && float64(.Sum)/float64(.Count) < 10
		 := .Ratio() > 0 && .Ratio() < 0.4
		if  &&  {
			.opt.Warningf("%s might be too small. Metrics: %s\n", , )
			.opt.Warningf("Cache life expectancy (in seconds): %+v\n", )

		} else if .Count > 1000 && %5 == 0 {
			.opt.Infof("%s metrics: %s\n", , )
		}
	}

	 := time.NewTicker(1 * time.Minute)
	defer .Stop()
	for {
		// Block until either shutdown is requested or the next tick arrives.
		select {
		case <-.HasBeenClosed():
			return
		case <-.C:
		}

		("Block cache", .BlockCacheMetrics())
		("Index cache", .IndexCacheMetrics())
		++
	}
}

// cleanup stops all the goroutines started by badger. This is used in open to
// cleanup goroutines in case of an error.
//
// Because Open may fail part-way through, the closers that may not exist yet
// are nil-checked before being signalled. The closers are only signalled, not
// waited on.
//
// NOTE(review): closers.cacheHealth is not signalled here although Open
// starts monitorCache before several steps that can fail — confirm this does
// not leak that goroutine.
// NOTE(review): blockCache and indexCache are closed without a nil check even
// though Open only creates them when their sizes are non-zero — this assumes
// Close is safe on a nil cache; TODO confirm.
func ( *DB) () {
	.stopMemoryFlush()
	.stopCompactions()

	.blockCache.Close()
	.indexCache.Close()
	if .closers.updateSize != nil {
		.closers.updateSize.Signal()
	}
	if .closers.valueGC != nil {
		.closers.valueGC.Signal()
	}
	if .closers.writes != nil {
		.closers.writes.Signal()
	}
	if .closers.pub != nil {
		.closers.pub.Signal()
	}

	.orc.Stop()

	// Do not use vlog.Close() here. vlog.Close truncates the files. We don't
	// want to truncate files unless the user has specified the truncate flag.
}

// BlockCacheMetrics returns the metrics for the underlying block cache.
// It returns nil when no block cache has been created.
func ( *DB) () *ristretto.Metrics {
	if .blockCache != nil {
		return .blockCache.Metrics
	}
	return nil
}

// IndexCacheMetrics returns the metrics for the underlying index cache.
// It returns nil when no index cache has been created.
func ( *DB) () *ristretto.Metrics {
	if .indexCache != nil {
		return .indexCache.Metrics
	}
	return nil
}

// Close closes a DB. It's crucial to call it to ensure all the pending updates make their way to
// disk. Calling DB.Close() multiple times would still only close the DB once.
//
// The actual shutdown runs inside closeOnce. Only the call that performs it
// receives the shutdown error; later calls return nil because the closure is
// not executed again.
func ( *DB) () error {
	var  error
	.closeOnce.Do(func() {
		 = .close()
	})
	return 
}

// IsClosed denotes if the badger DB is closed or not. A DB instance should not
// be used after closing it.
//
// It reads the isClosed flag atomically; the flag is set at the start of the
// shutdown sequence, so this can report true while shutdown is still running.
func ( *DB) () bool {
	return .isClosed.Load() == 1
}

// close performs the actual shutdown; Close invokes it through closeOnce.
//
// Order of operations: mark the DB as closed and block new writes; stop value
// GC (disk mode only); stop the write loop and close writeCh; stop the
// publisher and signal the cache monitor; hand the active memtable to the
// flusher (or drop it when empty); stop memtable flushing and compactions;
// optionally force an L0 compaction; close the value log and the levels
// controller; stop the remaining helpers and caches; and, in disk mode,
// release the directory locks, close the manifest and key registry and fsync
// both directories.
//
// The named error result accumulates failures from the later steps.
func ( *DB) () ( error) {
	defer .allocPool.Release()

	.opt.Debugf("Closing database")
	// NOTE(review): the field name suggests a millisecond count, but
	// time.Duration(x) interprets x as nanoseconds — confirm the unit.
	.opt.Infof("Lifetime L0 stalled for: %s\n", time.Duration(.lc.l0stallsMs.Load()))

	.blockWrites.Store(1)
	.isClosed.Store(1)

	if !.opt.InMemory {
		// Stop value GC first.
		.closers.valueGC.SignalAndWait()
	}

	// Stop writes next.
	.closers.writes.SignalAndWait()

	// Don't accept any more write.
	close(.writeCh)

	.closers.pub.SignalAndWait()
	// The cache monitor is only signalled; its exit is not awaited.
	.closers.cacheHealth.Signal()

	// Make sure that block writer is done pushing stuff into memtable!
	// Otherwise, you will have a race condition: we are trying to flush memtables
	// and remove them completely, while the block / memtable writer is still
	// trying to push stuff into the memtable. This will also resolve the value
	// offset problem: as we push into memtable, we update value offsets there.
	if .mt != nil {
		if .mt.sl.Empty() {
			// Remove the memtable if empty.
			.mt.DecrRef()
		} else {
			.opt.Debugf("Flushing memtable")
			// Retry every 10ms until the memtable fits into flushChan.
			for {
				 := func() bool {
					.lock.Lock()
					defer .lock.Unlock()
					y.AssertTrue(.mt != nil)
					select {
					case .flushChan <- .mt:
						.imm = append(.imm, .mt) // Flusher will attempt to remove this from s.imm.
						.mt = nil                    // Will segfault if we try writing!
						.opt.Debugf("pushed to flush chan\n")
						return true
					default:
						// If we fail to push, we need to unlock and wait for a short while.
						// The flushing operation needs to update s.imm. Otherwise, we have a
						// deadlock.
						// TODO: Think about how to do this more cleanly, maybe without any locks.
					}
					return false
				}()
				if  {
					break
				}
				time.Sleep(10 * time.Millisecond)
			}
		}
	}
	.stopMemoryFlush()
	.stopCompactions()

	// Force Compact L0
	// We don't need to care about cstatus since no parallel compaction is running.
	if .opt.CompactL0OnClose {
		 := .lc.doCompact(173, compactionPriority{level: 0, score: 1.73})
		switch  {
		case errFillTables:
			// This error only means that there might be enough tables to do a compaction. So, we
			// should not report it to the end user to avoid confusing them.
		case nil:
			.opt.Debugf("Force compaction on level 0 done")
		default:
			.opt.Warningf("While forcing compaction on level 0: %v", )
		}
	}

	// Now close the value log.
	if  := .vlog.Close();  != nil {
		 = y.Wrap(, "DB.Close")
	}

	.opt.Infof(.LevelsToString())
	// From here on each step uses the pattern `if x := step(); y == nil`,
	// which presumably records a step's error only while no earlier error has
	// been stored — verify against the unstripped source.
	if  := .lc.close();  == nil {
		 = y.Wrap(, "DB.Close")
	}
	.opt.Debugf("Waiting for closer")
	.closers.updateSize.SignalAndWait()
	.orc.Stop()
	.blockCache.Close()
	.indexCache.Close()

	.threshold.close()

	// Nothing on disk to release or sync in in-memory mode.
	if .opt.InMemory {
		return
	}

	if .dirLockGuard != nil {
		if  := .dirLockGuard.release();  == nil {
			 = y.Wrap(, "DB.Close")
		}
	}
	if .valueDirGuard != nil {
		if  := .valueDirGuard.release();  == nil {
			 = y.Wrap(, "DB.Close")
		}
	}
	if  := .manifest.close();  == nil {
		 = y.Wrap(, "DB.Close")
	}
	if  := .registry.Close();  == nil {
		 = y.Wrap(, "DB.Close")
	}

	// Fsync directories to ensure that lock file, and any other removed files whose directory
	// we haven't specifically fsynced, are guaranteed to have their directory entry removal
	// persisted to disk.
	if  := .syncDir(.opt.Dir);  == nil {
		 = y.Wrap(, "DB.Close")
	}
	if  := .syncDir(.opt.ValueDir);  == nil {
		 = y.Wrap(, "DB.Close")
	}

	return 
}

// VerifyChecksum verifies checksum for all tables on all levels.
// This method can be used to verify checksum, if opt.ChecksumVerificationMode is NoVerification.
// It delegates to the levels controller and returns its error unchanged.
func ( *DB) () error {
	return .lc.verifyChecksum()
}

const (
	// lockFile is the file name passed to acquireDirectoryLock for both Dir
	// and ValueDir when the DB is opened.
	lockFile = "LOCK"
)

// Sync syncs database content to disk. This function provides
// more control to user to sync data whenever required.
//
// It syncs the active memtable's WAL while holding the read lock, then syncs
// the value log, and returns both results combined via y.CombineErrors.
//
// NOTE(review): Open only creates the mutable memtable when the DB is not
// read-only, and close sets it to nil — confirm that SyncWAL is safe to call
// on a nil memtable or that Sync is never reached in those states.
func ( *DB) () error {
	/**
	Make an attempt to sync both the logs, the active memtable's WAL and the vLog (1847).
	Cases:
	- All_ok			:: If both the logs sync successfully.

	- Entry_Lost		:: If an entry with a value pointer was present in the active memtable's WAL,
						:: and the WAL was synced but there was an error in syncing the vLog.
						:: The entry will be considered lost and this case will need to be handled during recovery.

	- Entries_Lost		:: If there were errors in syncing both the logs, multiple entries would be lost.

	- Entries_Lost      :: If the active memtable's WAL is not synced but the vLog is synced, it will
						:: result in entries being lost because recovery of the active memtable is done from its WAL.
						:: Check `UpdateSkipList` in memtable.go.

	- Nothing_lost		:: If an entry with its value was present in the active memtable's WAL, and the WAL was synced,
						:: but there was an error in syncing the vLog.
						:: Nothing is lost for this very specific entry because the entry is completely present in the memtable's WAL.

	- Partially_lost    :: If entries were written partially in either of the logs,
						:: the logs will be truncated during recovery.
						:: As a result of truncation, some entries might be lost.
					    :: Assume that 4KB of data is to be synced and invoking `Sync` results only in syncing 3KB
	                    :: of data and then the machine shuts down or the disk failure happens,
						:: this will result in partial writes. [[This case needs verification]]
	*/
	.lock.RLock()
	 := .mt.SyncWAL()
	.lock.RUnlock()

	// The value log is synced even if the WAL sync above failed.
	 := .vlog.sync()
	return y.CombineErrors(, )
}

// getMemTables returns the current memtables and takes a reference on each of
// them. The second return value is a function that drops those references;
// the caller must invoke it when done with the tables.
//
// The returned slice is ordered newest first: the mutable memtable (absent in
// read-only mode) followed by the immutable memtables in reverse order of
// db.imm. The read lock is held only while the slice is built.
func ( *DB) () ([]*memTable, func()) {
	.lock.RLock()
	defer .lock.RUnlock()

	var  []*memTable

	// Mutable memtable does not exist in read-only mode.
	if !.opt.ReadOnly {
		// Get mutable memtable.
		 = append(, .mt)
		.mt.IncrRef()
	}

	// Get immutable memtables.
	// Walk imm from its last element to its first so newer tables come first.
	 := len(.imm) - 1
	for  := range .imm {
		 = append(, .imm[-])
		.imm[-].IncrRef()
	}
	return , func() {
		for ,  := range  {
			.DecrRef()
		}
	}
}

// get returns the value in memtable or disk for given key.
// Note that value will include meta byte.
//
// It returns ErrDBClosed when the DB has been closed. Otherwise the memtables
// are searched newest first; an entry whose version equals the timestamp
// parsed from the key is returned immediately. If no exact match is found,
// the highest-versioned memtable hit is passed on to the levels controller,
// whose result is returned.
//
// IMPORTANT: We should never write an entry with an older timestamp for the same key. We need to
// maintain this invariant to search for the latest value of a key, or else we need to search in all
// tables and find the max version among them.  To maintain this invariant, we also need to ensure
// that all versions of a key are always present in the same table from level 1, because compaction
// can push any table down.
//
// Update(23/09/2020) - We have dropped the move key implementation. Earlier we
// were inserting move keys to fix the invalid value pointers but we no longer
// do that. For every get("fooX") call where X is the version, we will search
// for "fooX" in all the levels of the LSM tree. This is expensive but it
// removes the overhead of handling move keys completely.
func ( *DB) ( []byte) (y.ValueStruct, error) {
	if .IsClosed() {
		return y.ValueStruct{}, ErrDBClosed
	}
	,  := .getMemTables() // Lock should be released.
	defer ()

	var  y.ValueStruct
	 := y.ParseTs()

	y.NumGetsAdd(.opt.MetricsEnabled, 1)
	for  := 0;  < len(); ++ {
		 := [].sl.Get()
		y.NumMemtableGetsAdd(.opt.MetricsEnabled, 1)
		// A zero Meta with a nil Value means this memtable has no entry.
		if .Meta == 0 && .Value == nil {
			continue
		}
		// Found the required version of the key, return immediately.
		if .Version ==  {
			y.NumGetsWithResultsAdd(.opt.MetricsEnabled, 1)
			return , nil
		}
		// Otherwise remember the highest version seen so far.
		if .Version < .Version {
			 = 
		}
	}
	return .lc.get(, , 0)
}

// requestPool recycles request values between writes. Objects obtained from
// it may carry state from a previous use; sendToWriteCh calls reset on them
// before filling them in.
var requestPool = sync.Pool{
	New: func() interface{} {
		return new(request)
	},
}

// writeToLSM inserts every entry of the request into the active memtable.
//
// Entries that skip the value log (per skipVlogAndSetThreshold) are stored
// with their value inline and with bitValuePointer cleared; all other entries
// are stored as an encoded value pointer with bitValuePointer set. When
// opt.SyncWrites is enabled, the memtable WAL is synced after all entries
// have been written.
//
// It returns an error when, outside in-memory mode, the number of pointers
// does not match the number of entries, when a memtable Put fails, or when
// the final WAL sync fails.
func ( *DB) ( *request) error {
	// We should check the length of b.Ptrs and b.Entries only when badger is not
	// running in InMemory mode. In InMemory mode, we don't write anything to the
	// value log and that's why the length of b.Ptrs will always be zero.
	if !.opt.InMemory && len(.Ptrs) != len(.Entries) {
		return fmt.Errorf("Ptrs and Entries don't match: %+v", )
	}

	for ,  := range .Entries {
		var  error
		if .skipVlogAndSetThreshold(.valueThreshold()) {
			// Will include deletion / tombstone case.
			 = .mt.Put(.Key,
				y.ValueStruct{
					Value: .Value,
					// Ensure value pointer flag is removed. Otherwise, the value will fail
					// to be retrieved during iterator prefetch. `bitValuePointer` is only
					// known to be set in write to LSM when the entry is loaded from a backup
					// with lower ValueThreshold and its value was stored in the value log.
					Meta:      .meta &^ bitValuePointer,
					UserMeta:  .UserMeta,
					ExpiresAt: .ExpiresAt,
				})
		} else {
			// Write pointer to Memtable.
			 = .mt.Put(.Key,
				y.ValueStruct{
					Value:     .Ptrs[].Encode(),
					Meta:      .meta | bitValuePointer,
					UserMeta:  .UserMeta,
					ExpiresAt: .ExpiresAt,
				})
		}
		if  != nil {
			return y.Wrapf(, "while writing to memTable")
		}
	}
	if .opt.SyncWrites {
		return .mt.SyncWAL()
	}
	return nil
}

// writeRequests is called serially by only one goroutine.
//
// It writes the batch to the value log first and then, request by request,
// into the memtable, waiting in 10ms steps while the memtable has no room.
// On success it forwards the requests to the publisher. Every request in the
// batch gets its Err set and its Wg released exactly once: with the failure
// if any step fails, or with nil once everything has been written. Note that
// a failure part-way through marks ALL requests in the batch as failed,
// including those already written to the memtable.
func ( *DB) ( []*request) error {
	if len() == 0 {
		return nil
	}

	// Completion helper: records the outcome on every request and unblocks
	// its waiter.
	 := func( error) {
		for ,  := range  {
			.Err = 
			.Wg.Done()
		}
	}
	.opt.Debugf("writeRequests called. Writing to value log")
	 := .vlog.write()
	if  != nil {
		()
		return 
	}

	.opt.Debugf("Writing to memtable")
	var  int
	for ,  := range  {
		if len(.Entries) == 0 {
			continue
		}
		 += len(.Entries)
		var  uint64
		var  error
		for  = .ensureRoomForWrite();  == errNoRoom;  = .ensureRoomForWrite() {
			++
			// Log only every 100th retry to avoid flooding the debug log.
			if %100 == 0 {
				.opt.Debugf("Making room for writes")
			}
			// We need to poll a bit because both hasRoomForWrite and the flusher need access to s.imm.
			// When flushChan is full and you are blocked there, and the flusher is trying to update s.imm,
			// you will get a deadlock.
			time.Sleep(10 * time.Millisecond)
		}
		if  != nil {
			()
			return y.Wrap(, "writeRequests")
		}
		if  := .writeToLSM();  != nil {
			()
			return y.Wrap(, "writeRequests")
		}
	}

	.opt.Debugf("Sending updates to subscribers")
	.pub.sendUpdates()

	(nil)
	.opt.Debugf("%d entries written", )
	return nil
}

// sendToWriteCh packages the entries into a pooled request and queues it on
// writeCh for the write loop. The returned request can be waited on for the
// outcome of the write.
//
// It returns ErrBlockedWrites while writes are blocked, and ErrTxnTooBig when
// the entry count reaches opt.maxBatchCount or the estimated size reaches
// opt.maxBatchSize. The send on writeCh blocks when the channel is full.
func ( *DB) ( []*Entry) (*request, error) {
	if .blockWrites.Load() == 1 {
		return nil, ErrBlockedWrites
	}
	var ,  int64
	for ,  := range  {
		 += .estimateSizeAndSetThreshold(.valueThreshold())
		++
	}
	// The bytes-written metric is updated before the size limits are checked,
	// so rejected batches are counted as well.
	y.NumBytesWrittenUserAdd(.opt.MetricsEnabled, )
	if  >= .opt.maxBatchCount ||  >= .opt.maxBatchSize {
		return nil, ErrTxnTooBig
	}

	// We can only service one request because we need each txn to be stored in a contiguous section.
	// Txns should not interleave among other txns or rewrites.
	 := requestPool.Get().(*request)
	.reset()
	.Entries = 
	.Wg.Add(1)
	.IncrRef()     // for db write
	.writeCh <-  // Handled in doWrites.
	y.NumPutsAdd(.opt.MetricsEnabled, int64(len()))

	return , nil
}

// doWrites is the write loop. It collects requests from writeCh into a batch
// and hands each batch to writeRequests on a separate goroutine. A
// one-element channel is used as a token so that at most one writeRequests
// call is in flight; while that call runs, further requests keep accumulating
// in the next batch. A batch is forced out once it holds 3*kvWriteChCapacity
// requests.
//
// When the closer is signalled, whatever is still buffered in writeCh is
// drained into the current batch, which is then written synchronously before
// the function returns. Done is called on the closer on exit.
func ( *DB) ( *z.Closer) {
	defer .Done()
	// Token channel: holding the single slot means a write is in progress.
	 := make(chan struct{}, 1)

	 := func( []*request) {
		if  := .writeRequests();  != nil {
			.opt.Errorf("writeRequests: %v", )
		}
		// Release the token so the next batch may be written.
		<-
	}

	// This variable tracks the number of pending writes.
	 := new(expvar.Int)
	y.PendingWritesSet(.opt.MetricsEnabled, .opt.Dir, )

	 := make([]*request, 0, 10)
	for {
		var  *request
		// Wait for the first request of a new batch, or for shutdown.
		select {
		case  = <-.writeCh:
		case <-.HasBeenClosed():
			goto 
		}

		for {
			 = append(, )
			.Set(int64(len()))

			// The batch is full: wait for the in-flight write to finish and
			// then flush.
			if len() >= 3*kvWriteChCapacity {
				 <- struct{}{} // blocking.
				goto 
			}

			select {
			// Either push to pending, or continue to pick from writeCh.
			case  = <-.writeCh:
			case  <- struct{}{}:
				goto 
			case <-.HasBeenClosed():
				goto 
			}
		}

	:
		// All the pending requests are drained.
		// Don't close the writeCh, because it is used in several places.
		for {
			select {
			case  = <-.writeCh:
				 = append(, )
			default:
				 <- struct{}{} // Push to pending before doing a write.
				()
				return
			}
		}

	:
		// Write the batch in the background and start collecting a new one.
		go ()
		 = make([]*request, 0, 10)
		.Set(0)
	}
}

// batchSet applies a list of badger.Entry. If a request level error occurs it
// will be returned.
//
// The call blocks until the queued request has been processed. An error from
// queuing the request (see sendToWriteCh) is returned immediately.
//
//	Check(kv.BatchSet(entries))
func ( *DB) ( []*Entry) error {
	,  := .sendToWriteCh()
	if  != nil {
		return 
	}

	return .Wait()
}

// batchSetAsync is the asynchronous version of batchSet. It accepts a callback
// function which is called when all the sets are complete. If a request level
// error occurs, it will be passed back via the callback.
//
// An error from queuing the request is returned directly, and in that case
// the callback is never invoked. Otherwise the callback runs on a separate
// goroutine once the request has completed.
//
//	err := kv.BatchSetAsync(entries, func(err error)) {
//	   Check(err)
//	}
func ( *DB) ( []*Entry,  func(error)) error {
	,  := .sendToWriteCh()
	if  != nil {
		return 
	}
	go func() {
		 := .Wait()
		// Write is complete. Let's call the callback function now.
		()
	}()
	return nil
}

// errNoRoom is returned by ensureRoomForWrite when the full memtable could
// not be handed to flushChan; writeRequests treats it as "retry shortly".
var errNoRoom = errors.New("No room for write")

// ensureRoomForWrite is always called serially.
//
// It returns nil when the active memtable is not full. When it is full, the
// memtable is offered to flushChan without blocking: on success it is
// appended to imm and replaced by a fresh memtable; if flushChan is full,
// errNoRoom is returned so the caller can release the lock and retry. An
// error from creating the new memtable is returned wrapped.
func ( *DB) () error {
	var  error
	.lock.Lock()
	defer .lock.Unlock()

	y.AssertTrue(.mt != nil) // A nil mt indicates that DB is being closed.
	if !.mt.isFull() {
		return nil
	}

	select {
	case .flushChan <- .mt:
		.opt.Debugf("Flushing memtable, mt.size=%d size of flushChan: %d\n",
			.mt.sl.MemSize(), len(.flushChan))
		// We manage to push this task. Let's modify imm.
		.imm = append(.imm, .mt)
		.mt,  = .newMemTable()
		if  != nil {
			return y.Wrapf(, "cannot create new mem table")
		}
		// New memtable is empty. We certainly have room.
		return nil
	default:
		// We need to do this to unlock and allow the flusher to modify imm.
		return errNoRoom
	}
}

// arenaSize returns the number of bytes computed as MemTableSize plus
// maxBatchSize plus maxBatchCount times skl.MaxNodeSize, i.e. the memtable
// size with headroom for one maximum-size batch and its skiplist nodes.
func arenaSize( Options) int64 {
	return .MemTableSize + .maxBatchSize + .maxBatchCount*int64(skl.MaxNodeSize)
}

// buildL0Table builds a new table from the memtable.
//
// It iterates over the given iterator from the start, skips every key that
// matches one of the supplied prefixes, and adds the remaining entries to a
// new table builder, which is returned. The iterator is closed before the
// function returns. The returned builder may be empty if every entry was
// skipped.
func buildL0Table( y.Iterator,  [][]byte,  table.Options) *table.Builder {
	defer .Close()

	 := table.NewTableBuilder()
	for .Rewind(); .Valid(); .Next() {
		if len() > 0 && hasAnyPrefixes(.Key(), ) {
			continue
		}
		 := .Value()
		// For entries stored as value pointers, decode the pointer so that its
		// Len can be passed to Add; otherwise the zero-value pointer is used.
		var  valuePointer
		if .Meta&bitValuePointer > 0 {
			.Decode(.Value)
		}
		.Add(.Key(), .Value(), .Len)
	}

	return 
}

// handleMemTableFlush must be run serially.
//
// It builds a table from the memtable's skiplist and registers it with the
// levels controller as a level 0 table. In in-memory mode the table is opened
// from the builder's finished data; otherwise it is created as a file in
// opt.Dir. It returns nil without creating a table when the builder is empty.
//
// NOTE(review): the [][]byte parameter does not appear to be forwarded —
// buildL0Table is called with a literal nil for its prefix list; confirm this
// is intended.
func ( *DB) ( *memTable,  [][]byte) error {
	 := buildTableOptions()
	 := .sl.NewUniIterator(false)
	 := buildL0Table(, nil, )
	defer .Close()

	// buildL0Table can return an empty builder if none of the items in the
	// skiplist are added to it. This can happen when drop prefix is set and
	// all the items are skipped.
	if .Empty() {
		.Finish()
		return nil
	}

	 := .lc.reserveFileID()
	var  *table.Table
	var  error
	if .opt.InMemory {
		 := .Finish()
		,  = table.OpenInMemoryTable(, , &)
	} else {
		,  = table.CreateTable(table.NewFilename(, .opt.Dir), )
	}
	if  != nil {
		return y.Wrap(, "error while creating table")
	}
	// We own a ref on tbl.
	 = .lc.addLevel0Table() // This will incrRef
	_ = .DecrRef()               // Releases our ref.
	return 
}

// flushMemtable consumes memtables from flushChan until the channel is
// closed; nil entries are skipped. If there are errors during handling the
// memtable flush, it logs them, sleeps for one second and retries the same
// memtable indefinitely. After a successful flush the memtable is removed
// from the head of imm and its reference is dropped. Done is called on the
// closer when the function returns.
func ( *DB) ( *z.Closer) {
	defer .Done()

	for  := range .flushChan {
		if  == nil {
			continue
		}

		for {
			if  := .handleMemTableFlush(, nil);  != nil {
				// Encountered error. Retry indefinitely.
				.opt.Errorf("error flushing memtable to disk: %v, retrying", )
				time.Sleep(time.Second)
				continue
			}

			// Update s.imm. Need a lock.
			.lock.Lock()
			// This is a single-threaded operation. mt corresponds to the head of
			// db.imm list. Once we flush it, we advance db.imm. The next mt
			// which would arrive here would match db.imm[0], because we acquire a
			// lock over DB when pushing to flushChan.
			// TODO: This logic is dirty AF. Any change and this could easily break.
			y.AssertTrue( == .imm[0])
			.imm = .imm[1:]
			.DecrRef() // Return memory.
			// unlock
			.lock.Unlock()
			break
		}
	}
}

// exists reports whether the given path exists, based on os.Stat.
//
// It returns (true, nil) when Stat succeeds and (false, nil) when Stat fails
// with a not-exist error. For any other Stat failure it returns true together
// with that error, so callers must check the error before trusting the
// boolean.
func exists( string) (bool, error) {
	,  := os.Stat()
	if  == nil {
		return true, nil
	}
	if os.IsNotExist() {
		return false, nil
	}
	return true, 
}

// This function does a filewalk, calculates the size of vlog and sst files and stores it in
// y.LSMSize and y.VlogSize.
//
// It is a no-op in in-memory mode. Files are classified purely by extension:
// ".sst" counts towards the LSM size and ".vlog" towards the value log size.
// When ValueDir differs from Dir, the value log size is taken from a second
// walk over ValueDir. Walk errors are only logged at debug level, and the
// sizes accumulated up to that point are still published.
func ( *DB) () {
	if .opt.InMemory {
		return
	}
	// Wraps a size in a fresh expvar.Int, the type the metric setters expect.
	 := func( int64) *expvar.Int {
		 := new(expvar.Int)
		.Add()
		return 
	}

	// Walks a directory and returns the summed .sst and .vlog file sizes.
	 := func( string) (int64, int64) {
		var ,  int64
		 := filepath.Walk(, func( string,  os.FileInfo,  error) error {
			if  != nil {
				return 
			}
			 := filepath.Ext()
			switch  {
			case ".sst":
				 += .Size()
			case ".vlog":
				 += .Size()
			}
			return nil
		})
		if  != nil {
			.opt.Debugf("Got error while calculating total size of directory: %s", )
		}
		return , 
	}

	,  := (.opt.Dir)
	y.LSMSizeSet(.opt.MetricsEnabled, .opt.Dir, ())
	// If valueDir is different from dir, we'd have to do another walk.
	if .opt.ValueDir != .opt.Dir {
		_,  = (.opt.ValueDir)
	}
	y.VlogSizeSet(.opt.MetricsEnabled, .opt.ValueDir, ())
}

// updateSize recomputes the on-disk size metrics once a minute by calling
// calculateSize, until the given closer is signalled. In in-memory mode it
// returns immediately. Done is called on the closer in every case.
func ( *DB) ( *z.Closer) {
	defer .Done()
	if .opt.InMemory {
		return
	}

	 := time.NewTicker(time.Minute)
	defer .Stop()

	for {
		select {
		case <-.C:
			.calculateSize()
		case <-.HasBeenClosed():
			return
		}
	}
}

// RunValueLogGC triggers a value log garbage collection.
//
// It picks value log files to perform GC based on statistics that are collected
// during compactions.  If no such statistics are available, then log files are
// picked in random order. The process stops as soon as the first log file is
// encountered which does not result in garbage collection.
//
// When a log file is picked, it is first sampled. If the sample shows that we
// can discard at least discardRatio space of that file, it would be rewritten.
//
// If a call to RunValueLogGC results in no rewrites, then an ErrNoRewrite is
// returned indicating that the call resulted in no file rewrites.
//
// We recommend setting discardRatio to 0.5, thus indicating that a file be
// rewritten if half the space can be discarded.  This results in a lifetime
// value log write amplification of 2 (1 from original write + 0.5 rewrite +
// 0.25 + 0.125 + ... = 2). Setting it to higher value would result in fewer
// space reclaims, while setting it to a lower value would result in more space
// reclaims at the cost of increased activity on the LSM tree. discardRatio
// must be in the range (0.0, 1.0), both endpoints excluded, otherwise an
// ErrInvalidRequest is returned.
//
// In in-memory mode, ErrGCInMemoryMode is returned before the ratio is
// checked.
//
// Only one GC is allowed at a time. If another value log GC is running, or DB
// has been closed, this would return an ErrRejected.
//
// Note: Every time GC is run, it would produce a spike of activity on the LSM
// tree.
func ( *DB) ( float64) error {
	if .opt.InMemory {
		return ErrGCInMemoryMode
	}
	if  >= 1.0 ||  <= 0.0 {
		return ErrInvalidRequest
	}

	// Pick a log file and run GC
	return .vlog.runGC()
}

// Size returns the size of lsm and value log files in bytes. It can be used to decide how often to
// call RunValueLogGC.
//
// The values are read from the metrics published by calculateSize, so they
// are only as fresh as the last recalculation. Both results are 0 when no LSM
// size metric is available for opt.Dir.
func ( *DB) () (,  int64) {
	if y.LSMSizeGet(.opt.MetricsEnabled, .opt.Dir) == nil {
		,  = 0, 0
		return
	}
	 = y.LSMSizeGet(.opt.MetricsEnabled, .opt.Dir).(*expvar.Int).Value()
	 = y.VlogSizeGet(.opt.MetricsEnabled, .opt.ValueDir).(*expvar.Int).Value()
	return
}

// Sequence represents a Badger sequence.
//
// Values are handed out from an in-memory range [next, leased). The upper
// bound of the current lease is persisted in the DB under key, and the lease
// is extended by bandwidth values whenever it runs out.
type Sequence struct {
	lock      sync.Mutex // Serializes Next and Release.
	db        *DB        // DB in which the lease is stored.
	key       []byte     // Key under which the lease is stored.
	next      uint64     // Next value to hand out.
	leased    uint64     // Exclusive upper bound of the current lease.
	bandwidth uint64     // Number of values reserved per lease renewal.
}

// Next would return the next integer in the sequence, updating the lease by running a transaction
// if needed.
//
// The lease is renewed when next has reached leased. If the renewal fails, 0
// and the error are returned and the sequence is not advanced.
func ( *Sequence) () (uint64, error) {
	.lock.Lock()
	defer .lock.Unlock()
	if .next >= .leased {
		if  := .updateLease();  != nil {
			return 0, 
		}
	}
	 := .next
	.next++
	return , nil
}

// Release the leased sequence to avoid wasted integers. This should be done right
// before closing the associated DB. However it is valid to use the sequence after
// it was released, causing a new lease with full bandwidth.
//
// Inside an update transaction the stored lease is read back; only when it
// still equals this sequence's leased value is it overwritten with next
// (encoded as 8 big-endian bytes). When the transaction succeeds, leased is
// set to next so that the following call to Next acquires a new lease. Any
// transaction error is returned and leaves the in-memory state unchanged.
func ( *Sequence) () error {
	.lock.Lock()
	defer .lock.Unlock()
	 := .db.Update(func( *Txn) error {
		,  := .Get(.key)
		if  != nil {
			return 
		}

		var  uint64
		if  := .Value(func( []byte) error {
			 = binary.BigEndian.Uint64()
			return nil
		});  != nil {
			return 
		}

		// Only hand the unused values back if the stored lease is still the
		// one this sequence holds.
		if  == .leased {
			var  [8]byte
			binary.BigEndian.PutUint64([:], .next)
			return .SetEntry(NewEntry(.key, [:]))
		}

		return nil
	})
	if  != nil {
		return 
	}
	.leased = .next
	return nil
}

// updateLease acquires a new lease inside a single update transaction: it loads the stored lease
// bound (starting from 0 when the key does not exist yet), makes it the next value to hand out,
// and persists next+bandwidth as the new bound. The in-memory leased bound is only updated after
// the new bound has been written to the transaction.
//
// It does not take the sequence lock itself; Next calls it while already holding the lock.
func ( *Sequence) () error {
	return .db.Update(func( *Txn) error {
		,  := .Get(.key)
		switch {
		case  == ErrKeyNotFound:
			// No lease has been stored under this key: the sequence starts at zero.
			.next = 0
		case  != nil:
			return 
		default:
			// Resume from the stored bound, decoded as a big-endian uint64.
			var  uint64
			if  := .Value(func( []byte) error {
				 = binary.BigEndian.Uint64()
				return nil
			});  != nil {
				return 
			}
			.next = 
		}

		// Persist the new bound (next + bandwidth) as 8 big-endian bytes.
		 := .next + .bandwidth
		var  [8]byte
		binary.BigEndian.PutUint64([:], )
		if  = .SetEntry(NewEntry(.key, [:]));  != nil {
			return 
		}
		.leased = 
		return nil
	})
}

// GetSequence would initiate a new sequence object, generating it from the stored lease, if
// available, in the database. Sequence can be used to get a list of monotonically increasing
// integers. Multiple sequences can be created by providing different keys. Bandwidth sets the
// size of the lease, determining how many Next() requests can be served from memory.
//
// ErrEmptyKey is returned for an empty key and ErrZeroBandwidth for a zero bandwidth. Otherwise
// a first lease is acquired via updateLease and its error, if any, is returned.
//
// GetSequence is not supported on ManagedDB. Calling this would result in a panic.
func ( *DB) ( []byte,  uint64) (*Sequence, error) {
	if .opt.managedTxns {
		panic("Cannot use GetSequence with managedDB=true.")
	}

	// Validate the arguments before touching the DB.
	switch {
	case len() == 0:
		return nil, ErrEmptyKey
	case  == 0:
		return nil, ErrZeroBandwidth
	}
	 := &Sequence{
		db:        ,
		key:       ,
		next:      0,
		leased:    0,
		bandwidth: ,
	}
	// Acquire the initial lease right away.
	// NOTE(review): the result of updateLease is returned together with the sequence value —
	// confirm whether callers may receive a non-nil sequence alongside a non-nil error.
	 := .updateLease()
	return , 
}

// Tables gets the TableInfo objects from the level controller.
//
// NOTE(review): this method takes no parameters; whether the returned TableInfo objects carry
// key counts is decided inside lc.getTableInfo — TODO confirm.
func ( *DB) () []TableInfo {
	return .lc.getTableInfo()
}

// Levels gets the LevelInfo objects from the level controller (lc.getLevelInfo).
func ( *DB) () []LevelInfo {
	return .lc.getLevelInfo()
}

// EstimateSize can be used to get rough estimate of data size for a given prefix.
//
// It sums OnDiskSize and UncompressedSize over the tables reported by Tables whose Left and
// Right keys both carry the prefix. Tables that only partially overlap the prefix are therefore
// not counted. The two totals are returned (presumably on-disk first, then uncompressed —
// TODO confirm).
func ( *DB) ( []byte) (uint64, uint64) {
	var ,  uint64
	 := .Tables()
	for ,  := range  {
		// Count a table only when its whole key range lies under the prefix.
		if bytes.HasPrefix(.Left, ) && bytes.HasPrefix(.Right, ) {
			 += uint64(.OnDiskSize)
			 += uint64(.UncompressedSize)
		}
	}
	return , 
}

// Ranges can be used to get rough key ranges to divide up iteration over the DB. The ranges here
// would consider the prefix, but would not necessarily start or end with the prefix. In fact, the
// first range would have nil as left key, and the last range would have nil as the right key.
//
// Split points are gathered in up to three stages (right keys of last-level tables, key splits
// from the levels controller, sampled memtable keys), sorted, turned into consecutive ranges,
// sized using the tables' UncompressedSize, and finally merged into larger bins.
func ( *DB) ( []byte,  int) []*keyRange {
	var  []string
	 := .Tables()

	// We just want table ranges here and not keys count.
	for ,  := range  {
		// We don't use ti.Left, because that has a tendency to store !badger keys. Skip over tables
		// at upper levels. Only choose tables from the last level.
		if .Level != .opt.MaxLevels-1 {
			continue
		}
		if bytes.HasPrefix(.Right, ) {
			 = append(, string(.Right))
		}
	}

	// If the number of splits is low, look at the offsets inside the
	// tables to generate more splits.
	if len() < 32 {
		// Derive a per-table split count from 32 divided by a length, clamping both the
		// divisor and the quotient to at least 1 so neither is zero.
		 := len()
		if  == 0 {
			 = 1
		}
		 := 32 / 
		if  == 0 {
			 = 1
		}
		 = .lc.keySplits(, )
	}

	// If the number of splits is still < 32, then look at the memtables.
	if len() < 32 {
		 := 10000
		// Helper that walks one memtable's skiplist and samples keys as split points. A nil
		// memtable is ignored.
		 := func( *memTable) {
			if  == nil {
				return
			}
			 := 0
			 := .sl.NewIterator()
			for .SeekToFirst(); .Valid(); .Next() {
				if % == 0 {
					// Add a split every maxPerSplit keys.
					if bytes.HasPrefix(.Key(), ) {
						 = append(, string(.Key()))
					}
				}
				 += 1
			}
			// The iterator's Close error is deliberately discarded.
			_ = .Close()
		}

		// The DB lock is taken exclusively here and, because of the defer, stays held until
		// this function returns.
		.lock.Lock()
		defer .lock.Unlock()
		// Visit the immutable memtables first, then the active one.
		var  []*memTable
		 = append(, .imm...)
		for ,  := range  {
			()
		}
		(.mt)
	}

	// We have our splits now. Let's convert them to ranges.
	sort.Strings()
	var  []*keyRange
	var  []byte
	for ,  := range  {
		// Each range ends at the current split and the following range starts there. Keys are
		// copied via y.SafeCopy so the ranges do not share the split strings' storage.
		 = append(, &keyRange{left: , right: y.SafeCopy(nil, []byte())})
		 = y.SafeCopy(nil, []byte())
	}
	// The final range is open-ended on the right.
	 = append(, &keyRange{left: })

	// Figure out the approximate table size this range has to deal with.
	for ,  := range  {
		 := keyRange{left: .Left, right: .Right}
		for ,  := range  {
			// Ranges with an empty left or right key (the open-ended first and last ones) are
			// skipped, so they accumulate no size.
			if len(.left) == 0 || len(.right) == 0 {
				continue
			}
			if .overlapsWith() {
				.size += int64(.UncompressedSize)
			}
		}
	}

	var  int64
	for ,  := range  {
		 += .size
	}
	// With no size information there is nothing to balance, so return without binning.
	if  == 0 {
		return 
	}
	// Figure out the average size, so we know how to bin the ranges together.
	// NOTE(review): the divisor looks like the int argument converted to int64; a zero value
	// would panic with an integer divide by zero — confirm callers always pass a positive count.
	 :=  / int64()

	// Greedily merge neighbouring ranges: start a bin at the current range and keep absorbing
	// the following ranges while the combined size does not exceed the average computed above.
	var  []*keyRange
	var  int
	for  < len() {
		 := []
		 := &keyRange{left: .left, size: .size, right: .right}
		++
		for ;  < len(); ++ {
			 := []
			if .size+.size >  {
				break
			}
			.right = .right
			.size += .size
		}
		 = append(, )
	}
	return 
}

// MaxBatchCount returns max possible entries in batch, as stored in opt.maxBatchCount.
func ( *DB) () int64 {
	return .opt.maxBatchCount
}

// MaxBatchSize returns max possible batch size, as stored in opt.maxBatchSize.
func ( *DB) () int64 {
	return .opt.maxBatchSize
}

// stopMemoryFlush stops memtable flushing: it closes flushChan and then signals the memtable
// closer and waits on it. It is a no-op when no memtable closer is set.
func ( *DB) () {
	// Stop memtable flushes.
	if .closers.memtable != nil {
		// Close the channel before SignalAndWait, which blocks until the closer is done.
		close(.flushChan)
		.closers.memtable.SignalAndWait()
	}
}

// stopCompactions signals the compactors closer and waits for it to finish. It is a no-op when
// no compactors closer is set.
func ( *DB) () {
	// Stop compactions.
	if .closers.compactors != nil {
		.closers.compactors.SignalAndWait()
	}
}

// startCompactions resumes compactions by installing a fresh compactors closer and handing it
// to lc.startCompact. Nothing happens when no compactors closer was set before, so compactions
// are only restarted on a DB that had them running.
func ( *DB) () {
	// Resume compactions.
	if .closers.compactors != nil {
		// The previous closer has already been signalled by stopCompactions; replace it.
		.closers.compactors = z.NewCloser(1)
		.lc.startCompact(.closers.compactors)
	}
}

// startMemoryFlush restarts memtable flushing: it allocates a new flushChan with capacity
// opt.NumMemtables, installs a fresh memtable closer and runs flushMemtable in a new goroutine.
// Nothing happens when no memtable closer was set before.
func ( *DB) () {
	// Start memory flusher.
	if .closers.memtable != nil {
		// A new channel is needed because stopMemoryFlush closed the previous one.
		.flushChan = make(chan *memTable, .opt.NumMemtables)
		.closers.memtable = z.NewCloser(1)
		go func() {
			.flushMemtable(.closers.memtable)
		}()
	}
}

// Flatten can be used to force compactions on the LSM tree so all the tables fall on the same
// level. This ensures that all the versions of keys are colocated and not split across multiple
// levels, which is necessary after a restore from backup. During Flatten, live compactions are
// stopped. Ideally, no writes are going on during Flatten. Otherwise, it would create competition
// between flattening the tree and new tables being created at level zero.
//
// Compactions are restarted when Flatten returns. An error is returned only when a compaction
// round had no successful compactor at all.
func ( *DB) ( int) error {

	.stopCompactions()
	defer .startCompactions()

	// Helper that runs one round of concurrent compactions for the given priority and reports
	// success if at least one of them succeeded.
	 := func( compactionPriority) error {
		.opt.Infof("Attempting to compact with %+v\n", )
		 := make(chan error, 1)
		// Launch the compactors; each one sends its doCompact result on the channel.
		for  := 0;  < ; ++ {
			go func() {
				 <- .lc.doCompact(175, )
			}()
		}
		// Collect one result per launched goroutine, counting successes and remembering an
		// error from the failures.
		var  int
		var  error
		for  := 0;  < ; ++ {
			 := <-
			if  != nil {
				 = 
				.opt.Warningf("While running doCompact with %+v. Error: %v\n", , )
			} else {
				++
			}
		}
		if  == 0 {
			return 
		}
		// We could do at least one successful compaction. So, we'll consider this a success.
		.opt.Infof("%d compactor(s) succeeded. One or more tables from level %d compacted.\n",
			, .level)
		return nil
	}

	// Helper that formats a byte count for the log lines below.
	 := func( int64) string {
		return humanize.IBytes(uint64())
	}

	 := .lc.levelTargets()
	for {
		.opt.Infof("\n")
		// Log every level's size and target, and collect the levels that currently hold data.
		var  []int
		for ,  := range .lc.levels {
			 := .getTotalSize()
			.opt.Infof("Level: %d. %8s Size. %8s Max.\n",
				, (.getTotalSize()), (.targetSz[]))
			if  > 0 {
				 = append(, )
			}
		}
		// At most one level holds data: finish unless the level controller still reports a
		// compaction priority with a score above 1.0, in which case run that one and re-check.
		if len() <= 1 {
			 := .lc.pickCompactLevels(nil)
			if len() == 0 || [0].score <= 1.0 {
				.opt.Infof("All tables consolidated into one level. Flattening done.\n")
				return nil
			}
			if  := ([0]);  != nil {
				return 
			}
			continue
		}
		// Create an artificial compaction priority, to ensure that we compact the level.
		 := compactionPriority{level: [0], score: 1.71}
		if  := ();  != nil {
			return 
		}
	}
}

// blockWrite stops the DB from accepting writes and waits for the writes closer to finish.
// It returns ErrBlockedWrites when writes are already blocked (the blockWrites flag could not be
// switched from 0 to 1).
func ( *DB) () error {
	// Stop accepting new writes.
	if !.blockWrites.CompareAndSwap(0, 1) {
		return ErrBlockedWrites
	}

	// Make all pending writes finish. The following will also close writeCh.
	.closers.writes.SignalAndWait()
	.opt.Infof("Writes flushed. Stopping compactions now...")
	return nil
}

// unblockWrite undoes blockWrite: it installs a fresh writes closer, starts doWrites in a new
// goroutine and only then clears the blockWrites flag.
func ( *DB) () {
	.closers.writes = z.NewCloser(1)
	go .doWrites(.closers.writes)

	// Resume writes.
	.blockWrites.Store(0)
}

// prepareToDrop blocks writes, writes out whatever requests are still queued on writeCh, and
// stops memtable flushing. On success it returns a function that resumes memtable flushing and
// then unblocks writes. If writes cannot be blocked, a no-op function and the error are returned.
//
// It panics when the DB was opened with opt.ReadOnly.
func ( *DB) () (func(), error) {
	if .opt.ReadOnly {
		panic("Attempting to drop data in read-only mode.")
	}
	// In order prepare for drop, we need to block the incoming writes and
	// write it to db. Then, flush all the pending memtable. So that, we
	// don't miss any entries.
	if  := .blockWrite();  != nil {
		return func() {}, 
	}
	 := make([]*request, 0, 10)
	for {
		select {
		// Collect every request that is still buffered on the write channel.
		case  := <-.writeCh:
			 = append(, )
		default:
			// The channel is drained. A failure of writeRequests is logged but does not
			// abort the preparation.
			if  := .writeRequests();  != nil {
				.opt.Errorf("writeRequests: %v", )
			}
			.stopMemoryFlush()
			return func() {
				.opt.Infof("Resuming writes")
				.startMemoryFlush()
				.unblockWrite()
			}, nil
		}
	}
}

// DropAll would drop all the data stored in Badger. It does this in the following way.
// - Stop accepting new writes.
// - Pause memtable flushes and compactions.
// - Pick all tables from all levels, create a changeset to delete all these
// tables and apply it to manifest.
// - Pick all log files from value log, and delete all of them. Restart value log files from zero.
// - Resume memtable flushes and compactions.
//
// NOTE: DropAll is resilient to concurrent writes, but not to reads. It is up to the user to not do
// any reads while DropAll is going on, otherwise they may result in panics. Ideally, both reads and
// writes are paused before running DropAll, and resumed after it is finished.
//
// The actual work is done by dropAll; the function it returns is invoked here whenever it is
// non-nil, regardless of whether dropAll failed, and dropAll's error is returned.
func ( *DB) () error {
	,  := .dropAll()
	if  != nil {
		()
	}
	return 
}

// dropAll does the work behind DropAll. It blocks writes via prepareToDrop, stops compactions,
// and then, holding the DB lock exclusively, replaces the memtables, drops the LSM tree and the
// value log, resets the next file ID to 1 and clears the caches and the vlog threshold.
//
// Alongside the error it returns a function for the caller to invoke. If prepareToDrop fails,
// the function returned by prepareToDrop is passed through together with its error.
func ( *DB) () (func(), error) {
	.opt.Infof("DropAll called. Blocking writes...")
	,  := .prepareToDrop()
	if  != nil {
		return , 
	}
	// prepareToDrop will stop all the incoming write and flushes any pending memtables.
	// Before we drop, we'll stop the compaction because anyways all the data are going to
	// be deleted.
	.stopCompactions()
	// Resume function: restarts compactions and then makes one further call (presumably the
	// resume function obtained from prepareToDrop — TODO confirm).
	 := func() {
		.startCompactions()
		()
	}
	// Block all foreign interactions with memory tables.
	.lock.Lock()
	defer .lock.Unlock()

	// Remove inmemory tables. Calling DecrRef for safety. Not sure if they're absolutely needed.
	.mt.DecrRef()
	for ,  := range .imm {
		.DecrRef()
	}
	// Truncate the immutable list in place, keeping its backing array.
	.imm = .imm[:0]
	.mt,  = .newMemTable() // Set it up for future writes.
	if  != nil {
		return , y.Wrapf(, "cannot open new memtable")
	}

	,  := .lc.dropTree()
	if  != nil {
		return , 
	}
	.opt.Infof("Deleted %d SSTables. Now deleting value logs...\n", )

	,  = .vlog.dropAll()
	if  != nil {
		return , 
	}
	// Table file IDs start over from 1 after everything has been deleted.
	.lc.nextFileID.Store(1)
	.opt.Infof("Deleted %d value log files. DropAll done.\n", )
	.blockCache.Clear()
	.indexCache.Clear()
	.threshold.Clear(.opt)
	return , nil
}

// DropPrefix would drop all the keys with the provided prefix. It does this in the following way:
//   - Stop accepting new writes.
//   - Stop memtable flushes before acquiring lock. Because we're acquiring lock here
//     and memtable flush stalls for lock, which leads to deadlock
//   - Flush out all memtables, skipping over keys with the given prefix, Kp.
//   - Write out the value log header to memtables when flushing, so we don't accidentally bring Kp
//     back after a restart.
//   - Stop compaction.
//   - Compact L0->L1, skipping over Kp.
//   - Compact rest of the levels, Li->Li, picking tables which have Kp.
//   - Resume memtable flushes, compactions and writes.
//
// Calling it with no prefixes is a no-op that returns nil. Prefixes for which
// filterPrefixesToDrop finds no data are skipped; if none remain, nothing is dropped.
func ( *DB) ( ...[]byte) error {
	if len() == 0 {
		return nil
	}
	.opt.Infof("DropPrefix called for %s", )
	,  := .prepareToDrop()
	if  != nil {
		return 
	}
	// Deferred call, run on every return path below (presumably the resume function returned
	// by prepareToDrop — TODO confirm).
	defer ()

	var  [][]byte
	if ,  = .filterPrefixesToDrop();  != nil {
		return 
	}
	// If there is no prefix for which the data already exist, do not do anything.
	if len() == 0 {
		.opt.Infof("No prefixes to drop")
		return nil
	}
	// Block all foreign interactions with memory tables.
	.lock.Lock()
	defer .lock.Unlock()

	// Treat the active memtable like an immutable one so that it is flushed in the same loop.
	.imm = append(.imm, .mt)
	for ,  := range .imm {
		// Empty memtables have nothing to flush; just release the reference.
		if .sl.Empty() {
			.DecrRef()
			continue
		}
		.opt.Debugf("Flushing memtable")
		if  := .handleMemTableFlush(, );  != nil {
			.opt.Errorf("While trying to flush memtable: %v", )
			return 
		}
		.DecrRef()
	}
	.stopCompactions()
	defer .startCompactions()
	// All memtables have been handled: reset the immutable list and install a new active
	// memtable.
	.imm = .imm[:0]
	.mt,  = .newMemTable()
	if  != nil {
		return y.Wrapf(, "cannot create new mem table")
	}

	// Drop prefixes from the levels.
	if  := .lc.dropPrefixes();  != nil {
		return 
	}
	.opt.Infof("DropPrefix done")
	return nil
}

// filterPrefixesToDrop returns the subset of the given prefixes for which at least one key
// exists. Each prefix is probed in its own read-only View transaction with a prefix iterator
// that does not prefetch values. If a View call fails, the prefixes collected so far are
// returned together with the error.
func ( *DB) ( [][]byte) ([][]byte, error) {
	var  [][]byte
	for ,  := range  {
		 := .View(func( *Txn) error {
			 := DefaultIteratorOptions
			.Prefix = 
			.PrefetchValues = false
			 := .NewIterator()
			defer .Close()
			// Position on the first key; one valid key under the prefix is enough to keep it.
			.Rewind()
			if .ValidForPrefix() {
				 = append(, )
			}
			return nil
		})
		if  != nil {
			return , 
		}
	}
	return , nil
}

// Checks if the key is banned. Returns the respective error if the key belongs to any of the banned
// namespaces. Else it returns nil.
//
// Nothing is considered banned when opt.NamespaceOffset is negative, or when the key is not
// longer than NamespaceOffset+8 bytes.
func ( *DB) ( []byte) error {
	// A negative offset disables the namespace check.
	if .opt.NamespaceOffset < 0 {
		return nil
	}
	// NOTE(review): a key of exactly NamespaceOffset+8 bytes is also let through by this `<=`
	// check — confirm that such keys can never carry a namespace.
	if len() <= .opt.NamespaceOffset+8 {
		return nil
	}
	// The namespace is decoded from the bytes starting at NamespaceOffset (presumably the first
	// 8 of them, via y.BytesToU64 — TODO confirm).
	if .bannedNamespaces.has(y.BytesToU64([.opt.NamespaceOffset:])) {
		return ErrBannedKey
	}
	return nil
}

// BanNamespace bans a namespace. Read/write to keys belonging to any of such namespace is denied.
//
// It returns ErrNamespaceMode when opt.NamespaceOffset is negative. The ban is first written to
// the DB through the write channel and, only after that write has completed without error, added
// to the in-memory set of banned namespaces.
func ( *DB) ( uint64) error {
	if .opt.NamespaceOffset < 0 {
		return ErrNamespaceMode
	}
	.opt.Infof("Banning namespace: %d", )
	// First set the banned namespaces in DB and then update the in-memory structure.
	// The entry key is bannedNsKey followed by the encoded namespace, with timestamp 1; the
	// value is nil.
	 := y.KeyWithTs(append(bannedNsKey, y.U64ToBytes()...), 1)
	 := []*Entry{{
		Key:   ,
		Value: nil,
	}}
	,  := .sendToWriteCh()
	if  != nil {
		return 
	}
	// Wait for the write request to finish before touching the in-memory set.
	if  := .Wait();  != nil {
		return 
	}
	.bannedNamespaces.add()
	return nil
}

// BannedNamespaces returns the list of prefixes banned for DB, as held in the in-memory
// bannedNamespaces set.
func ( *DB) () []uint64 {
	return .bannedNamespaces.all()
}

// KVList contains a list of key-value pairs. It is an alias for pb.KVList.
type KVList = pb.KVList

// Subscribe can be used to watch key changes for the given key prefixes and the ignore string.
// At least one prefix should be passed, or an error will be returned.
// You can use an empty prefix to monitor all changes to the DB.
// Ignore string is the byte ranges for which prefix matching will be ignored.
// For example: ignore = "2-3", and prefix = "abc" will match for keys "abxxc", "abdfc" etc.
// This function blocks until the given context is done or an error occurs.
// The given function will be called with a new KVList containing the modified keys and the
// corresponding values.
//
// ErrNilCallback is returned for a nil callback. When the context is done, the subscriber is
// deactivated, its channel drained, and the context's error returned. When the callback returns
// an error, the subscriber is removed in the same way and that error is returned.
func ( *DB) ( context.Context,  func( *KVList) error,  []pb.Match) error {
	if  == nil {
		return ErrNilCallback
	}

	 := z.NewCloser(1)
	,  := .pub.newSubscriber(, )
	if  != nil {
		return y.Wrapf(, "while creating a new subscriber")
	}
	// Helper that merges every update already waiting on the subscriber's channel into the
	// given batch without blocking, then invokes the callback once if the batch is non-empty.
	 := func( *pb.KVList) error {
		for {
			select {
			case  := <-.sendCh:
				.Kv = append(.Kv, .Kv...)
			default:
				if len(.GetKv()) > 0 {
					return ()
				}
				return nil
			}
		}
	}

	// Helper that discards whatever is still buffered on the subscriber's channel, stopping
	// when the channel is empty or closed.
	 := func() {
		for {
			select {
			case ,  := <-.sendCh:
				if ! {
					// Channel is closed.
					return
				}
			default:
				return
			}
		}
	}
	for {
		select {
		case <-.HasBeenClosed():
			// No need to delete here. Closer will be called only while
			// closing DB. Subscriber will be deleted by cleanSubscribers.
			// Deliver any pending updates to the callback before returning.
			 := (new(pb.KVList))
			.Done()
			return 
		case <-.Done():
			.Done()
			// Mark the subscriber inactive before draining its channel.
			.active.Store(0)
			()
			.pub.deleteSubscriber(.id)
			// Delete the subscriber to avoid further updates.
			return .Err()
		case  := <-.sendCh:
			// Batch this update together with anything else already queued.
			 := ()
			if  != nil {
				.Done()
				.active.Store(0)
				()
				// Delete the subscriber if there is an error by the callback.
				.pub.deleteSubscriber(.id)
				return 
			}
		}
	}
}

// Wrapper around the package-level syncDir for the given directory path. It does nothing and
// returns nil when the DB runs in in-memory mode.
func ( *DB) ( string) error {
	if .opt.InMemory {
		return nil
	}
	return syncDir()
}

// createDirs makes sure that both Dir and ValueDir from the options exist. A missing directory
// is created with permissions 0700 (including parents, via os.MkdirAll), unless the options
// request read-only mode, in which case an error is returned instead. Errors from the existence
// check and from directory creation are wrapped with the offending path.
func createDirs( Options) error {
	for ,  := range []string{.Dir, .ValueDir} {
		,  := exists()
		if  != nil {
			return y.Wrapf(, "Invalid Dir: %q", )
		}
		if ! {
			// A read-only open must never create directories.
			if .ReadOnly {
				return fmt.Errorf("Cannot find directory %q for read-only open", )
			}
			// Try to create the directory
			 = os.MkdirAll(, 0700)
			if  != nil {
				return y.Wrapf(, "Error Creating Dir: %q", )
			}
		}
	}
	return nil
}

// Stream the contents of this DB to a new DB with options outOptions that will be
// created in outDir.
//
// The output DB is opened with OpenManaged and closed again before returning. Data is read with
// a stream at version math.MaxUint64 and written through a stream writer on the output DB,
// which is flushed at the end. Each failure is wrapped with a message naming the failed step.
func ( *DB) ( Options) error {
	 := .Dir

	// Open output DB.
	,  := OpenManaged()
	if  != nil {
		return y.Wrapf(, "cannot open out DB at %s", )
	}
	// The error from closing the output DB is not checked.
	defer .Close()
	 := .NewStreamWriter()
	if  := .Prepare();  != nil {
		return y.Wrapf(, "cannot create stream writer in out DB at %s", )
	}

	// Stream contents of DB to the output DB.
	 := .NewStreamAt(math.MaxUint64)
	.LogPrefix = fmt.Sprintf("Streaming DB to new DB at %s", )

	// Every buffer produced by the stream is handed straight to the stream writer.
	.Send = func( *z.Buffer) error {
		return .Write()
	}
	if  := .Orchestrate(context.Background());  != nil {
		return y.Wrapf(, "cannot stream DB to out DB at %s", )
	}
	if  := .Flush();  != nil {
		return y.Wrapf(, "cannot flush writer")
	}
	return nil
}

// Opts returns a copy of the DB options. Options is returned by value, so assigning to fields
// of the result does not change the options held by the DB.
func ( *DB) () Options {
	return .opt
}

// CacheType selects which of the DB's caches an operation such as CacheMaxCost applies to.
type CacheType int

const (
	// BlockCache selects the block cache.
	BlockCache CacheType = iota
	// IndexCache selects the index cache.
	IndexCache
)

// CacheMaxCost updates the max cost of the given cache (either block or index cache).
// The call will have an effect only if the DB was created with the cache. Otherwise it is
// a no-op. If you pass a negative value, the function will return the current value
// without updating it.
//
// On a successful update the new max cost is returned. An unknown cache type yields an
// "invalid cache type" error.
func ( *DB) ( CacheType,  int64) (int64, error) {
	// Nil guard (presumably on the receiver — TODO confirm): report 0 without an error.
	if  == nil {
		return 0, nil
	}

	// A negative cost is a pure query: return the current max cost and leave it unchanged.
	if  < 0 {
		switch  {
		case BlockCache:
			return .blockCache.MaxCost(), nil
		case IndexCache:
			return .indexCache.MaxCost(), nil
		default:
			return 0, errors.New("invalid cache type")
		}
	}

	// Otherwise apply the new max cost to the selected cache.
	switch  {
	case BlockCache:
		.blockCache.UpdateMaxCost()
		return , nil
	case IndexCache:
		.indexCache.UpdateMaxCost()
		return , nil
	default:
		return 0, errors.New("invalid cache type")
	}
}

// Renders the LevelInfo entries returned by Levels as a multi-line, human-readable report.
// The text starts with a newline, has one line per level (level number, base-level marker,
// table count, size and target size, the two score values, stale data size and target file
// size) and ends with "Level Done\n".
func ( *DB) () string {
	 := .Levels()
	// Helper that formats a byte count with humanize.IBytes.
	 := func( int64) string {
		return humanize.IBytes(uint64())
	}
	// Helper that renders the base-level flag as "B", or a single space when it is false.
	 := func( bool) string {
		if  {
			return "B"
		}
		return " "
	}

	var  strings.Builder
	.WriteRune('\n')
	for ,  := range  {
		.WriteString(fmt.Sprintf(
			"Level %d [%s]: NumTables: %02d. Size: %s of %s. Score: %.2f->%.2f"+
				" StaleData: %s Target FileSize: %s\n",
			.Level, (.IsBaseLevel), .NumTables,
			(.Size), (.TargetSize), .Score, .Adjusted, (.StaleDatSize),
			(.TargetFileSize)))
	}
	.WriteString("Level Done\n")
	return .String()
}