/*
 * SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc.
 * SPDX-License-Identifier: Apache-2.0
 */

package badger

import (
	
	
	
	
	
	

	humanize 
	

	
	
	
)

// batchSize is the fill level, in bytes, at which a producer hands its output buffer over
// to kvChan. Producer buffers are created with twice this value.
const batchSize = 16 * (1 << 20) // 16 MiB

// maxStreamSize is the value a newly created Stream gets for MaxSize: the largest batch, in
// bytes, that the framework will deliberately assemble before calling Send. The limit is
// soft, because a single list that is already over it cannot be split any further and has
// to be sent as is. Its purpose is to keep batches below sizes that cause trouble when
// sending (e.g. running into the max size gRPC limit).
var maxStreamSize uint64 = 100 * (1 << 20) // 100 MiB

// Stream provides a framework to concurrently iterate over a snapshot of Badger, pick up
// key-values, batch them up and call Send. Stream iterates concurrently over many smaller key
// ranges, so it does NOT send keys in lexicographically sorted order. To get keys in sorted
// order, use Iterator.
type Stream struct {
	// Prefix restricts iteration to keys that start with it. If nil (the default), Stream
	// iterates over the entire DB.
	Prefix []byte

	// NumGo is the number of goroutines used for iterating over key ranges. A Stream built by
	// newStream starts with the DB's NumGoroutines option.
	NumGo int

	// Badger produces log entries via Infof to indicate the progress of Stream. LogPrefix can
	// be used to help differentiate them from other activities. Default is "Badger.Stream".
	LogPrefix string

	// ChooseKey is invoked each time a new key is encountered. Note that this is not called
	// on every version of the value, only the first encountered version (i.e. the highest version
	// of the value a key has). ChooseKey can be left nil to select all keys.
	//
	// Note: Calls to ChooseKey are concurrent.
	ChooseKey func(item *Item) bool

	// MaxSize is the maximum allowed size of a stream batch. This is a soft limit,
	// as a single list that is still over the limit has to be sent as is since it
	// cannot be split further. This limit prevents the framework from creating batches
	// so big that sending them causes issues (e.g. running into the max size gRPC limit).
	// If it needs changing, set it before the Stream starts running; it is not safe to
	// modify concurrently.
	MaxSize uint64

	// KeyToList, similar to ChooseKey, is only invoked on the highest version of the value. It
	// is up to the caller to iterate over the versions and generate zero, one or more KVs. It
	// is expected that the user would advance the iterator to go through the versions of the
	// values. However, the user MUST immediately return from this function on the first encounter
	// with a mismatching key. See example usage in ToList function. Can be left nil to use ToList
	// function by default.
	//
	// KeyToList has access to z.Allocator accessible via stream.Allocator(itr.ThreadId). This
	// allocator can be used to allocate KVs, to decrease the memory pressure on Go GC. Stream
	// framework takes care of releasing those resources after calling Send. AllocRef does
	// NOT need to be set in the returned KVList, as Stream framework would ignore that field,
	// instead using the allocator assigned to that thread id.
	//
	// Note: Calls to KeyToList are concurrent.
	KeyToList func(key []byte, itr *Iterator) (*pb.KVList, error)
	// UseKeyToListWithThreadId selects KeyToListWithThreadId instead of KeyToList. The extra
	// threadId argument exposes the parallelism of the stream: calls that share a threadId
	// run serially, so per-thread state needs no extra synchronisation, whereas plain
	// KeyToList calls are concurrent and the callback must handle that itself.
	//
	// When this flag is set, FinishThread is also called after a key range has been
	// iterated, and any KVs it returns are added to the output. Neither callback has a
	// default, so both must be non-nil when the flag is set.
	UseKeyToListWithThreadId bool
	KeyToListWithThreadId    func(key []byte, itr *Iterator, threadId int) (*pb.KVList, error)
	FinishThread             func(threadId int) (*pb.KVList, error)

	// This is the method where Stream sends the final output. All calls to Send are done by a
	// single goroutine, i.e. logic within Send method can expect single threaded execution.
	Send func(buf *z.Buffer) error

	// Read data above SinceTs. All keys with version <= SinceTs will be ignored.
	SinceTs      uint64
	readTs       uint64
	db           *DB
	rangeCh      chan keyRange
	kvChan       chan *z.Buffer
	nextStreamId atomic.Uint32
	doneMarkers  bool
	scanned      atomic.Uint64 // used to estimate the ETA for data scan.
	numProducers atomic.Int32
}

// SendDoneMarkers controls whether done markers are sent on the stream. When enabled, a KV
// with StreamDone set is written to the output after a key range has been iterated. False by
// default.
func ( *Stream) ( bool) {
	.doneMarkers = 
}

// ToList is the default implementation of KeyToList. Starting from the iterator's current
// position (it never seeks), it collects the versions of the given key into a KVList, copying
// key, value and user meta through an allocator.
//
// Collection stops at the first version that is deleted or expired, at the first item whose
// key differs, after one version when the DB keeps a single version, or after a version that
// discards earlier ones. The only error returned is one raised while reading an item's
// value; in that case the list is nil.
func ( *Stream) ( []byte,  *Iterator) (*pb.KVList, error) {
	// Copy the key once; every KV collected below shares this copy.
	 := .Alloc
	 := .Copy()

	 := &pb.KVList{}
	for ; .Valid(); .Next() {
		 := .Item()
		// A deleted or expired version ends the scan; it is not skipped over.
		if .IsDeletedOrExpired() {
			break
		}
		if !bytes.Equal(, .Key()) {
			// Break out on the first encounter with another key.
			break
		}

		 := y.NewKV()
		.Key = 

		// The value slice is only handed to the callback, so copy it before keeping it.
		if  := .Value(func( []byte) error {
			.Value = .Copy()
			return nil

		});  != nil {
			return nil, 
		}
		.Version = .Version()
		.ExpiresAt = .ExpiresAt()
		.UserMeta = .Copy([]byte{.UserMeta()})

		.Kv = append(.Kv, )
		// With a single version kept per key there is nothing more to collect.
		if .db.opt.NumVersionsToKeep == 1 {
			break
		}

		// This version asks for earlier versions to be discarded, so stop here.
		if .DiscardEarlierVersions() {
			break
		}
	}
	return , nil
}

// produceRanges asks the DB for key ranges covering Prefix (passing NumGo along), logs them,
// and sends them into rangeCh in descending order of size. It closes rangeCh once every
// range has been sent.
//
// A keyRange is [left, right): it includes left and excludes right. The left and right byte
// slices must be owned by the keyRange struct. The first range is asserted to have a nil
// left bound and the last a nil right bound.
//
// NOTE(review): the send on rangeCh below is a plain send with no cancellation branch. If
// every consumer has already returned while ranges remain, this goroutine looks like it
// would block forever — confirm, and consider selecting on the context as well.
func ( *Stream) ( context.Context) {
	 := .db.Ranges(.Prefix, .NumGo)
	y.AssertTrue(len() > 0)
	y.AssertTrue([0].left == nil)
	y.AssertTrue([len()-1].right == nil)
	.db.opt.Infof("Number of ranges found: %d\n", len())

	// Sort in descending order of size.
	sort.Slice(, func(,  int) bool {
		return [].size > [].size
	})
	for ,  := range  {
		.rangeCh <- *
		.db.opt.Infof("Sent range %d for iteration: [%x, %x) of size: %s\n",
			, .left, .right, humanize.IBytes(uint64(.size)))
	}
	close(.rangeCh)
}

// produceKVs picks up ranges from rangeCh, generates KV lists and sends them to kvChan.
//
// The call counts itself in numProducers while it runs. All reads go through one transaction,
// opened at readTs when that is non-zero and as a regular transaction otherwise, and
// discarded on return. Ranges are handled one at a time until rangeCh is closed (nil is
// returned) or the context is done (the context's error is returned). An error from handing
// a buffer to kvChan or from FinishThread also ends the call.
func ( *Stream) ( context.Context,  int) error {
	.numProducers.Add(1)
	defer .numProducers.Add(-1)

	var  *Txn
	if .readTs > 0 {
		 = .db.NewTransactionAt(.readTs, false)
	} else {
		 = .db.NewTransaction(false)
	}
	defer .Discard()

	// produceKVs is running iterate serially. So, we can define the outList here.
	 := z.NewBuffer(2*batchSize, "Stream.ProduceKVs")
	defer func() {
		// The outList variable changes. So, we need to evaluate the variable in the defer. DO NOT
		// call `defer outList.Release()`.
		_ = .Release()
	}()

	// iterate walks a single key range and writes the KVs it produces into the output
	// buffer, handing the buffer to kvChan whenever it reaches batchSize.
	 := func( keyRange) error {
		 := DefaultIteratorOptions
		.AllVersions = true
		.Prefix = .Prefix
		.PrefetchValues = true
		.SinceTs = .SinceTs
		 := .NewIterator()
		.ThreadId = 
		defer .Close()

		// Per-range allocator; it is reset before each key is converted and released when
		// the range is finished.
		.Alloc = z.NewAllocator(1<<20, "Stream.Iterate")
		defer .Alloc.Release()

		// This unique stream id is used to identify all the keys from this iteration.
		 := .nextStreamId.Add(1)
		var  int

		// Hands the current output buffer to kvChan and starts a fresh one, or gives up
		// with the context's error if the context is done first. After a successful
		// hand-off, the stream-wide scanned counter is advanced by the amount scanned
		// since the previous hand-off.
		 := func() error {
			select {
			case .kvChan <- :
				 = z.NewBuffer(2*batchSize, "Stream.ProduceKVs")
				.scanned.Add(uint64(.scanned - ))
				 = .scanned
			case <-.Done():
				return .Err()
			}
			return nil
		}

		// The loop has no post statement. Advancing happens through the previous-key check
		// at the top, which is also what moves past a key after any `continue` below.
		var  []byte
		for .Seek(.left); .Valid(); {
			// it.Valid would only return true for keys with the provided Prefix in iterOpts.
			 := .Item()
			if bytes.Equal(.Key(), ) {
				.Next()
				continue
			}
			 = append([:0], .Key()...)

			// Check if we reached the end of the key range.
			if len(.right) > 0 && bytes.Compare(.Key(), .right) >= 0 {
				break
			}

			// Check if we should pick this key.
			if .ChooseKey != nil && !.ChooseKey() {
				continue
			}

			// Now convert to key value.
			.Alloc.Reset()
			var  *pb.KVList
			var  error
			if .UseKeyToListWithThreadId {
				,  = .KeyToListWithThreadId(.KeyCopy(nil), , )
			} else {
				,  = .KeyToList(.KeyCopy(nil), )
			}
			// A conversion error is logged and the key is skipped; it does not stop the
			// stream.
			if  != nil {
				.db.opt.Warningf("While reading key: %x, got error: %v", .Key(), )
				continue
			}
			if  == nil || len(.Kv) == 0 {
				continue
			}
			for ,  := range .Kv {
				.StreamId = 
				KVToBuffer(, )
				// Keep filling the buffer until it reaches batchSize.
				if .LenNoPadding() < batchSize {
					continue
				}
				if  := ();  != nil {
					return 
				}
			}
		}

		// Give the per-thread callback a chance to emit trailing KVs for this range.
		// NOTE(review): unlike the KeyToList result above, the returned list is not checked
		// for nil before its Kv field is read, so a (nil, nil) return from FinishThread
		// looks like it would panic here — confirm.
		if .UseKeyToListWithThreadId {
			if ,  := .FinishThread();  != nil {
				return 
			} else {
				for ,  := range .Kv {
					.StreamId = 
					KVToBuffer(, )
					if .LenNoPadding() < batchSize {
						continue
					}
					if  := ();  != nil {
						return 
					}
				}
			}
		}
		// Mark the stream as done.
		if .doneMarkers {
			 := &pb.KV{
				StreamId:   ,
				StreamDone: true,
			}
			KVToBuffer(, )
		}
		// Always hand off what is left for this range, even if the buffer is empty.
		return ()
	}

	for {
		select {
		case ,  := <-.rangeCh:
			if ! {
				// Done with the keys.
				return nil
			}
			if  := ();  != nil {
				return 
			}
		case <-.Done():
			return .Err()
		}
	}
}

// streamKVs receives buffers from kvChan, merges those that are immediately available into
// one batch (stopping once the batch is larger than MaxSize) and passes each batch to Send.
// Every five seconds it logs progress. It returns nil once kvChan is closed, the context's
// error if the context is done first, or the first error returned by Send.
func ( *Stream) ( context.Context) error {
	,  := .db.EstimateSize(.Prefix)
	// Manish has seen uncompressed size to be in 20% error margin.
	 = uint64(float64() * 1.2)
	.db.opt.Infof("%s Streaming about %s of uncompressed data (%s on disk)\n",
		.LogPrefix, humanize.IBytes(), humanize.IBytes())

	 := 5 * time.Second
	var  uint64
	 := time.NewTicker()
	defer .Stop()
	 := time.Now()

	// Passes one batch to Send and adds its size to the running total. The buffer is
	// released when the closure returns, whether or not Send succeeded. Empty batches are
	// dropped without calling Send.
	 := func( *z.Buffer) error {
		defer func() { _ = .Release() }()
		 := uint64(.LenNoPadding())
		if  == 0 {
			return nil
		}
		 += 
		// st.db.opt.Infof("%s Sending batch of size: %s.\n", st.LogPrefix, humanize.IBytes(sz))
		if  := .Send();  != nil {
			.db.opt.Warningf("Error while sending: %v\n", )
			return 
		}
		return nil
	}

	// slurp appends every buffer that can be received from kvChan without blocking to the
	// given batch, releasing each appended buffer, and then sends the batch.
	 := func( *z.Buffer) error {
	:
		for {
			// Send the batch immediately if it already exceeds the maximum allowed size.
			// If the size of the batch exceeds MaxSize, break from the loop to
			// avoid creating a batch that is so big that certain limits are reached.
			if uint64(.LenNoPadding()) > .MaxSize {
				break 
			}
			select {
			case ,  := <-.kvChan:
				if ! {
					break 
				}
				y.AssertTrue( != nil)
				y.Check2(.Write(.Bytes()))
				y.Check(.Release())

			default:
				// Nothing is queued right now; send what we have instead of waiting.
				break 
			}
		}
		return ()
	} // end of slurp.

	// Two rate monitors feed the progress log below, which reports both a scan rate and a
	// send rate.
	 := y.NewRateMonitor(20)
	 := y.NewRateMonitor(20)
:
	for {
		var  *z.Buffer
		select {
		case <-.Done():
			return .Err()

		case <-.C:
			// Instead of calculating speed over the entire lifetime, we average the speed over
			// ticker duration.
			.Capture()
			 := .scanned.Load()
			.Capture()
			 := .numProducers.Load()

			.db.opt.Infof("%s [%s] Scan (%d): ~%s/%s at %s/sec. Sent: %s at %s/sec."+
				" jemalloc: %s\n",
				.LogPrefix, y.FixedDuration(time.Since()), ,
				y.IBytesToString(, 1), humanize.IBytes(),
				humanize.IBytes(.Rate()),
				y.IBytesToString(, 1), humanize.IBytes(.Rate()),
				humanize.IBytes(uint64(z.NumAllocBytes())))

		case ,  := <-.kvChan:
			if ! {
				// kvChan is closed: all producers are done.
				break 
			}
			y.AssertTrue( != nil)
			 = 

			// Use this buffer as the start of a batch and slurp more keys into it.
			if  := ();  != nil {
				return 
			}
		}
	}

	.db.opt.Infof("%s Sent data of size %s\n", .LogPrefix, humanize.IBytes())
	return nil
}

// Orchestrate runs Stream. It starts one goroutine that produces key ranges, NumGo goroutines
// that iterate over those ranges and batch up KVs, and a single goroutine that picks up the
// batches, merges them further and passes them to Send. Progress is logged through Infof
// using LogPrefix. All calls to Send are serial.
//
// If a producer fails, Orchestrate returns one of the producer errors; otherwise it returns
// the result of the sending goroutine. A failure in the sending goroutine cancels the shared
// context so that the producers stop. Orchestrate can be called multiple times, but in serial
// order.
//
// KeyToList defaults to ToList when nil. NOTE(review): nothing here defaults or validates
// Send, KeyToListWithThreadId or FinishThread, so they presumably must be set by the caller
// when they are going to be used — confirm.
func ( *Stream) ( context.Context) error {
	,  := context.WithCancel()
	defer ()
	.rangeCh = make(chan keyRange, 3) // Contains keys for posting lists.

	// kvChan should only have a small capacity to ensure that we don't buffer up too much data if
	// sending is slow. Page size is set to 4MB, which is used to lazily cap the size of each
	// KVList. To get 128MB buffer, we can set the channel size to 32.
	// NOTE(review): producers in this file hand buffers over at batchSize (16 MB), so 32
	// queued buffers may hold closer to 512 MB than the 128 MB stated above — confirm.
	.kvChan = make(chan *z.Buffer, 32)

	if .KeyToList == nil {
		.KeyToList = .ToList
	}

	// Picks up ranges from Badger, and sends them to rangeCh.
	go .produceRanges()

	 := make(chan error, .NumGo) // Stores errors returned by produceKVs.
	var  sync.WaitGroup
	for  := 0;  < .NumGo; ++ {
		.Add(1)

		go func( int) {
			defer .Done()
			// Picks up ranges from rangeCh, generates KV lists, and sends them to kvChan.
			if  := .produceKVs(, );  != nil {
				// Non-blocking send: an error is dropped rather than stalling the goroutine
				// if the channel is full.
				select {
				case  <- :
				default:
				}
			}
		}()
	}

	// Pick up key-values from kvChan and send to stream.
	 := make(chan error, 1)
	go func() {
		// Picks up KV lists from kvChan, and passes them to Send.
		 := .streamKVs()
		if  != nil {
			() // Stop all the go routines.
		}
		 <- 
	}()
	.Wait()        // Wait for produceKVs to be over.
	close(.kvChan) // Now we can close kvChan.
	defer func() {
		// If due to some error, we have buffers left in kvChan, we should release them.
		for  := range .kvChan {
			_ = .Release()
		}
	}()

	// A producer error takes precedence over the result of the sending goroutine.
	select {
	case  := <-: // Check error from produceKVs.
		return 
	default:
	}

	// Wait for key streaming to be over.
	 := <-
	return 
}

// newStream returns a Stream bound to this DB with the package defaults applied: NumGo taken
// from the DB's NumGoroutines option, LogPrefix "Badger.Stream" and MaxSize maxStreamSize.
// It does not check whether the DB is in managed mode; the exported constructors do that.
func ( *DB) () *Stream {
	return &Stream{
		db:        ,
		NumGo:     .opt.NumGoroutines,
		LogPrefix: "Badger.Stream",
		MaxSize:   maxStreamSize,
	}
}

// NewStream creates a new Stream with the default settings applied by newStream. It panics
// if the DB was opened in managed mode; use NewStreamAt there instead.
func ( *DB) () *Stream {
	if .opt.managedTxns {
		panic("This API can not be called in managed mode.")
	}
	return .newStream()
}

// NewStreamAt creates a new Stream at a particular timestamp, which is stored as the
// stream's readTs. Should only be used with managed DB; it panics otherwise.
func ( *DB) ( uint64) *Stream {
	if !.opt.managedTxns {
		panic("This API can only be called in managed mode.")
	}
	 := .newStream()
	.readTs = 
	return 
}

// Decodes a buffer of serialized KVs back into a KVList: every slice stored in the buffer is
// unmarshalled as a pb.KV and appended to the list, in iteration order. Iteration stops at
// the first slice that fails to unmarshal.
//
// The returned list is never nil. On error it holds the KVs decoded before the failure, so
// callers should check the error before using it.
//
// NOTE(review): the function name is not visible in this copy of the file; it is presumably
// the counterpart of KVToBuffer — confirm.
func ( *z.Buffer) (*pb.KVList, error) {
	var  pb.KVList
	 := .SliceIterate(func( []byte) error {
		 := new(pb.KV)
		if  := proto.Unmarshal(, );  != nil {
			return 
		}
		.Kv = append(.Kv, )
		return nil
	})
	return &, 
}

// KVToBuffer serializes a KV into the given buffer. It reserves a slice of proto.Size bytes
// in the buffer, truncates it to zero length and lets MarshalAppend fill it. A marshalling
// failure is treated as a programming error and trips an assertion instead of being returned.
func ( *pb.KV,  *z.Buffer) {
	// Presumably the reserved capacity matches the encoded size, so the append writes in
	// place without reallocating — confirm.
	 := .SliceAllocate(proto.Size())[:0]
	,  := proto.MarshalOptions{}.MarshalAppend(, )
	y.AssertTrue( == nil)
}