/*
 * SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc.
 * SPDX-License-Identifier: Apache-2.0
 */

package badger

import (
	
	
	

	
	

	
	
	
	
)

// StreamWriter is used to write data coming from multiple streams. The streams must not have any
// overlapping key ranges. Within each stream, the keys must be sorted. Badger Stream framework is
// capable of generating such an output. So, this StreamWriter can be used at the other end to build
// BadgerDB at a much faster pace by writing SSTables (and value logs) directly to LSM tree levels
// without causing any compactions at all. This is way faster than using batched writer or using
// transactions, but only applicable in situations where the keys are pre-sorted and the DB is being
// bootstrapped. Existing data would get deleted when using this writer. So, this is only useful
// when restoring from backup or replicating DB across servers.
//
// StreamWriter should not be called on in-use DB instances. It is designed only to bootstrap new
// DBs.
//
// The expected call sequence is NewStreamWriter, then Prepare (or PrepareIncremental), any number
// of Write calls, and finally Flush. Cancel may be used to release the writer on an early return.
type StreamWriter struct {
	writeLock  sync.Mutex               // guards maxVersion, prevLevel and writers
	db         *DB                      // database the tables are written into
	done       func()                   // set by Prepare/PrepareIncremental; run by Flush and Cancel
	throttle   *y.Throttle              // shared with every sortedWriter; drained by Flush/Cancel
	maxVersion uint64                   // highest KV version seen by Write
	writers    map[uint32]*sortedWriter // keyed by stream ID; a nil value marks a closed stream
	prevLevel  int                      // newWriter targets level prevLevel - 1
}

// NewStreamWriter creates a StreamWriter. Right after creating StreamWriter, Prepare must be
// called. The memory usage of a StreamWriter is directly proportional to the number of streams
// possible. So, efforts must be made to keep the number of streams low. Stream framework would
// typically use 16 goroutines and hence create 16 streams.
//
// The returned writer starts with an empty writers map and a throttle created with
// y.NewThrottle(16). The done callback is left nil until Prepare or PrepareIncremental sets it.
func ( *DB) () *StreamWriter {
	return &StreamWriter{
		db: ,
		// throttle shouldn't make much difference. Memory consumption is based on the number of
		// concurrent streams being processed.
		throttle: y.NewThrottle(16),
		writers:  make(map[uint32]*sortedWriter),
	}
}

// Prepare should be called before writing any entry to StreamWriter. It deletes all data present in
// existing DB, stops compactions and any writes being done by other means. Be very careful when
// calling Prepare, because it could result in permanent data loss. Not calling Prepare would result
// in a corrupt Badger instance. Use PrepareIncremental to do incremental stream write.
//
// Prepare holds writeLock for its whole duration. It returns the error reported by db.dropAll;
// the done callback is installed even when that error is non-nil.
func ( *StreamWriter) () error {
	.writeLock.Lock()
	defer .writeLock.Unlock()

	// dropAll yields two values: one is wrapped into the done callback below, the other is
	// returned to the caller (presumably a resume function and an error — confirm in dropAll).
	,  := .db.dropAll()
	// Ensure that done() is never called more than once.
	var  sync.Once
	.done = func() { .Do() }
	return 
}

// PrepareIncremental should be called before writing any entry to StreamWriter incrementally.
// In incremental stream write, the tables are written at one level above the current base level.
//
// It holds writeLock throughout, and returns an error when db.prepareToDrop fails, when any
// memtable still holds data, or when db.Flatten fails. Whenever it returns, the done callback
// has been installed, so Flush or Cancel can undo what was prepared so far.
func ( *StreamWriter) () error {
	.writeLock.Lock()
	defer .writeLock.Unlock()

	// Ensure that done() is never called more than once.
	var  sync.Once

	// prepareToDrop will stop all the incoming writes and process any pending flush tasks.
	// Before we start writing, we'll stop the compactions because no one else should be writing to
	// the same level as the stream writer is writing to.
	,  := .db.prepareToDrop()
	if  != nil {
		// Compactions have not been stopped yet, so done only needs to wrap the callback
		// handed back by prepareToDrop.
		.done = func() { .Do() }
		return 
	}
	.db.stopCompactions()
	// From here on, done must restart compactions before running the prepareToDrop callback.
	 := func() {
		.db.startCompactions()
		()
	}
	.done = func() { .Do() }

	// Incremental writes are refused while any memtable is non-empty. The deferred call runs
	// the second value returned by getMemTables (presumably a reference release — confirm).
	,  := .db.getMemTables()
	defer ()
	for ,  := range  {
		if !.sl.Empty() {
			return fmt.Errorf("Unable to do incremental writes because MemTable has data")
		}
	}

	// Record the first level reported by db.Levels() that already holds tables.
	 := true
	for ,  := range .db.Levels() {
		if .NumTables > 0 {
			.prevLevel = .Level
			 = false
			break
		}
	}
	if  {
		// If DB is empty, we should allow doing incremental stream write.
		return nil
	}
	if .prevLevel == 0 {
		// It seems that data is present in all levels from Lmax to L0. If we call flatten
		// on the tree, all the data will go to Lmax. All the levels above will be empty
		// after flatten call. Now, we should be able to use incremental stream writer again.
		if  := .db.Flatten(3);  != nil {
			return fmt.Errorf("error during flatten in StreamWriter: %w", )
		}
		// After flattening, treat the last level as the occupied one so that new writers
		// target the level just above it.
		.prevLevel = len(.db.Levels()) - 1
	}
	return nil
}

// Write writes KVList to DB. Each KV within the list contains the stream id which StreamWriter
// would use to demux the writes. Write is thread safe and can be called concurrently by multiple
// goroutines.
//
// The buffer is decoded as a sequence of pb.KV messages. Entries are grouped into one request per
// stream ID, all requests are written to the value log, and each request is then handed to the
// sortedWriter of its stream (created on first use). Streams flagged with StreamDone are closed
// at the end of the call.
//
// An empty buffer is a no-op. Write returns the first error from unmarshalling, the value log
// write, writer creation, or closing a stream. It panics when a KV arrives for a stream that has
// already been marked as done.
func ( *StreamWriter) ( *z.Buffer) error {
	if .LenNoPadding() == 0 {
		return nil
	}

	// closedStreams keeps track of all streams which are going to be marked as done. We are
	// keeping track of all streams so that we can close them at the end, after inserting all
	// the valid kvs.
	 := make(map[uint32]struct{})
	 := make(map[uint32]*request)

	 := .SliceIterate(func( []byte) error {
		var  pb.KV
		if  := proto.Unmarshal(, &);  != nil {
			return 
		}
		if .StreamDone {
			[.StreamId] = struct{}{}
			return nil
		}

		// Panic if some kv comes after stream has been marked as closed.
		if ,  := [.StreamId];  {
			panic(fmt.Sprintf("write performed on closed stream: %d", .StreamId))
		}

		// The lock is taken per KV only around the shared counters; it is released again
		// before the entry is built.
		.writeLock.Lock()
		if .maxVersion < .Version {
			.maxVersion = .Version
		}
		if .prevLevel == 0 {
			// If prevLevel is 0, that means that we have not written anything yet.
			// So, we can write to the maxLevel. newWriter writes to prevLevel - 1,
			// so we can set prevLevel to len(levels).
			.prevLevel = len(.db.lc.levels)
		}
		.writeLock.Unlock()

		// Only the first byte of Meta and UserMeta is carried over into the entry.
		var ,  byte
		if len(.Meta) > 0 {
			 = .Meta[0]
		}
		if len(.UserMeta) > 0 {
			 = .UserMeta[0]
		}
		 := &Entry{
			Key:       y.KeyWithTs(.Key, .Version),
			Value:     y.Copy(.Value),
			UserMeta:  ,
			ExpiresAt: .ExpiresAt,
			meta:      ,
		}
		// If the value can be collocated with the key in LSM tree, we can skip
		// writing the value to value log.
		// NOTE(review): nothing in this function acts on the statement above; the entry is
		// simply appended to the request for its stream.
		 := [.StreamId]
		if  == nil {
			 = &request{}
			[.StreamId] = 
		}
		.Entries = append(.Entries, )
		return nil
	})
	if  != nil {
		return 
	}

	// Flatten the per-stream map into a slice for the value log write.
	 := make([]*request, 0, len())
	for ,  := range  {
		 = append(, )
	}

	// Everything below touches the writers map, so the lock is held until return.
	.writeLock.Lock()
	defer .writeLock.Unlock()

	// We are writing all requests to vlog even if some request belongs to already closed stream.
	// It is safe to do because we are panicking while writing to sorted writer, which will be nil
	// for closed stream. At restart, stream writer will drop all the data in Prepare function.
	if  := .db.vlog.write();  != nil {
		return 
	}

	for ,  := range  {
		,  := .writers[]
		if ! {
			var  error
			,  = .newWriter()
			if  != nil {
				return y.Wrapf(, "failed to create writer with ID %d", )
			}
			.writers[] = 
		}

		// A nil entry in the map means the stream was closed by an earlier Write call.
		if  == nil {
			panic(fmt.Sprintf("write performed on closed stream: %d", ))
		}

		// This send blocks while the writer's request channel is full.
		.reqCh <- 
	}

	// Now we can close any streams if required. We will make writer for
	// the closed streams as nil.
	for  := range  {
		,  := .writers[]
		if ! {
			.db.opt.Warningf("Trying to close stream: %d, but no sorted "+
				"writer found for it", )
			continue
		}

		// NOTE(review): a stream closed by an earlier call has a nil entry that still passes
		// the lookup above; a second StreamDone for it would dereference nil here — confirm.
		.closer.SignalAndWait()
		if  := .Done();  != nil {
			return 
		}

		.writers[] = nil
	}
	return nil
}

// Flush is called once we are done writing all the entries. It syncs DB directories. It also
// updates Oracle with maxVersion found in all entries (if DB is not managed).
//
// Flush holds writeLock throughout and always runs the done callback on return, including on
// error paths. It returns the first error from a writer's Done, from throttle.Finish, from
// syncing the directories, or from the final levels validation.
//
// NOTE(review): done is invoked unconditionally, so calling Flush before Prepare or
// PrepareIncremental (while done is still nil) would panic; Cancel guards against this case.
func ( *StreamWriter) () error {
	.writeLock.Lock()
	defer .writeLock.Unlock()

	defer .done()

	// Stop every open writer's goroutine first, then finish its last table.
	for ,  := range .writers {
		if  != nil {
			.closer.SignalAndWait()
		}
	}

	for ,  := range .writers {
		if  == nil {
			continue
		}
		if  := .Done();  != nil {
			return 
		}
	}

	if !.db.opt.managedTxns {
		if .db.orc != nil {
			.db.orc.Stop()
		}

		// NOTE(review): orc is nil-checked above but dereferenced unconditionally here.
		// Never let maxVersion fall below the oracle's current read timestamp.
		if  := .db.orc.readTs();  >= .maxVersion {
			.maxVersion = 
		}

		// Replace the oracle with a fresh one positioned at maxVersion.
		.db.orc = newOracle(.db.opt)
		.db.orc.nextTxnTs = .maxVersion
		.db.orc.txnMark.Done(.maxVersion)
		.db.orc.readMark.Done(.maxVersion)
		.db.orc.incrementNextTs()
	}

	// Wait for all files to be written.
	if  := .throttle.Finish();  != nil {
		return 
	}

	// Sort tables at the end.
	for ,  := range .db.lc.levels {
		.sortTables()
	}

	// Now sync the directories, so all the files are registered.
	if .db.opt.ValueDir != .db.opt.Dir {
		if  := .db.syncDir(.db.opt.ValueDir);  != nil {
			return 
		}
	}
	if  := .db.syncDir(.db.opt.Dir);  != nil {
		return 
	}
	return .db.lc.validate()
}

// Cancel signals all goroutines to exit. Calling defer sw.Cancel() immediately after creating a new StreamWriter
// ensures that writes are unblocked even upon early return. Note that dropAll() is not called here, so any
// partially written data will not be erased until a new StreamWriter is initialized.
//
// Cancel holds writeLock throughout. Unlike Flush, it does not call Done on the writers; an error
// from throttle.Finish is only logged.
func ( *StreamWriter) () {
	.writeLock.Lock()
	defer .writeLock.Unlock()

	// Signal every open writer before waiting on any of them, so that they all shut down
	// concurrently.
	for ,  := range .writers {
		if  != nil {
			.closer.Signal()
		}
	}
	for ,  := range .writers {
		if  != nil {
			.closer.Wait()
		}
	}

	if  := .throttle.Finish();  != nil {
		.db.opt.Errorf("error in throttle.Finish: %+v", )
	}

	// Handle Cancel() being called before Prepare().
	if .done != nil {
		.done()
	}
}

// sortedWriter turns the entries of a single stream into tables at one fixed level. Requests
// arrive on reqCh and are consumed by the handleRequests goroutine that newWriter starts. Add
// rejects keys that are not strictly greater than the previous key.
type sortedWriter struct {
	db       *DB           // database that receives the tables
	throttle *y.Throttle   // the StreamWriter's throttle, acquired by send
	opts     table.Options // options used for every table builder

	builder  *table.Builder // table being filled; replaced or set to nil by send
	lastKey  []byte         // copy of the last key given to Add, for the order check
	level    int            // level whose handler the finished tables are added to
	streamID uint32         // stream ID passed to newWriter; reported in the log line
	reqCh    chan *request  // fed by StreamWriter.Write, drained by handleRequests
	// Have separate closer for each writer, as it can be closed at any time.
	closer *z.Closer
}

// newWriter creates the sortedWriter for the given stream ID and starts its handleRequests
// goroutine. The writer targets level prevLevel - 1 and shares the StreamWriter's throttle.
// Write calls it while holding writeLock. The error result is always nil in this implementation.
func ( *StreamWriter) ( uint32) (*sortedWriter, error) {
	 := buildTableOptions(.db)
	// Scale the table size by TableSizeMultiplier once per level from 2 up to MaxLevels - 1.
	for  := 2;  < .db.opt.MaxLevels; ++ {
		.TableSize *= uint64(.db.opt.TableSizeMultiplier)
	}
	 := &sortedWriter{
		db:       .db,
		opts:     ,
		streamID: ,
		throttle: .throttle,
		builder:  table.NewTableBuilder(),
		reqCh:    make(chan *request, 3),
		closer:   z.NewCloser(1),
		level:    .prevLevel - 1, // Write at the level just above the one we were writing to.
	}

	go .handleRequests()
	return , nil
}

// handleRequests is the goroutine body started by newWriter. It receives requests from reqCh and
// feeds each entry to Add until the closer is signalled. It then closes reqCh, processes whatever
// is still buffered, and returns. It panics if Add reports an error.
func ( *sortedWriter) () {
	defer .closer.Done()

	 := func( *request) {
		for ,  := range .Entries {
			// If badger is running in InMemory mode, len(req.Ptrs) == 0.
			var  y.ValueStruct
			if .skipVlogAndSetThreshold(.db.valueThreshold()) {
				// The value is stored inline in the table.
				 = y.ValueStruct{
					Value:     .Value,
					Meta:      .meta,
					UserMeta:  .UserMeta,
					ExpiresAt: .ExpiresAt,
				}
			} else {
				// Store the encoded value pointer instead and flag it with bitValuePointer.
				 := .Ptrs[]
				 = y.ValueStruct{
					Value:     .Encode(),
					Meta:      .meta | bitValuePointer,
					UserMeta:  .UserMeta,
					ExpiresAt: .ExpiresAt,
				}
			}
			if  := .Add(.Key, );  != nil {
				panic()
			}
		}
	}

	for {
		select {
		case  := <-.reqCh:
			()
		case <-.closer.HasBeenClosed():
			// NOTE(review): the channel is closed here on the receiving side; a send on
			// reqCh after this point would panic. The callers in this file signal the
			// closer while holding writeLock, which Write also holds when sending.
			close(.reqCh)
			for  := range .reqCh {
				()
			}
			return
		}
	}
}

// Add adds key and vs to sortedWriter.
//
// It returns an error if the key is not strictly greater than the previously added key, or if
// handing a full builder over to send fails. A full builder is only handed over when the new key
// differs from the last one under y.SameKey.
func ( *sortedWriter) ( []byte,  y.ValueStruct) error {
	if len(.lastKey) > 0 && y.CompareKeys(, .lastKey) <= 0 {
		return fmt.Errorf("keys not in sorted order (last key: %s, key: %s)",
			hex.Dump(.lastKey), hex.Dump())
	}

	 := y.SameKey(, .lastKey)

	// Same keys should go into the same SSTable.
	if ! && .builder.ReachedCapacity() {
		if  := .send(false);  != nil {
			return 
		}
	}

	.lastKey = y.SafeCopy(.lastKey, )
	// Decode the value pointer, if the value is one, so that its Len can be passed to the
	// builder; otherwise the zero value's Len is used.
	var  valuePointer
	if .Meta&bitValuePointer > 0 {
		.Decode(.Value)
	}

	.builder.Add(, , .Len)
	return nil
}

// send hands the current builder to a new goroutine that runs createTable on it. It first calls
// throttle.Do, and returns that call's error without starting the goroutine if it fails. When the
// boolean argument is true the writer is finished and builder is set to nil; otherwise a fresh
// builder is allocated from opts.
func ( *sortedWriter) ( bool) error {
	if  := .throttle.Do();  != nil {
		return 
	}
	// The builder is passed as an argument, so the goroutine keeps working on the old builder
	// after the field is reassigned below. throttle.Done is called once createTable returns.
	go func( *table.Builder) {
		 := .createTable()
		.throttle.Done()
	}(.builder)
	// If done is true, this indicates we can close the writer.
	// No need to allocate underlying TableBuilder now.
	if  {
		.builder = nil
		return nil
	}

	.builder = table.NewTableBuilder(.opts)
	return nil
}

// Done is called once we are done writing all keys and valueStructs
// to sortedWriter. It completes writing current SST to disk.
//
// An empty builder is closed and dropped without creating a table. Otherwise the builder is
// handed to send(true), so the table is created in a separate goroutine and the error returned
// here is only the one reported by send.
func ( *sortedWriter) () error {
	if .builder.Empty() {
		.builder.Close()
		// Assign builder as nil, so that underlying memory can be garbage collected.
		.builder = nil
		return nil
	}

	return .send(true)
}

// createTable turns the given builder into a table, records it in the manifest, and adds it to
// the handler of the writer's level. The builder is always closed on return. An empty builder
// produces no table. The table is opened in memory when db.opt.InMemory is set, and created as a
// file under db.opt.Dir otherwise.
//
// It returns the error from opening or creating the table, or from the manifest update.
func ( *sortedWriter) ( *table.Builder) error {
	defer .Close()
	if .Empty() {
		.Finish()
		return nil
	}

	 := .db.lc.reserveFileID()
	var  *table.Table
	if .db.opt.InMemory {
		 := .Finish()
		var  error
		if ,  = table.OpenInMemoryTable(, , .Opts());  != nil {
			return 
		}
	} else {
		var  error
		 := table.NewFilename(, .db.opt.Dir)
		if ,  = table.CreateTable(, );  != nil {
			return 
		}
	}
	 := .db.lc

	 := .levels[.level]
	// Now that table can be opened successfully, let's add this to the MANIFEST.
	 := &pb.ManifestChange{
		Id:          .ID(),
		KeyId:       .KeyID(),
		Op:          pb.ManifestChange_CREATE,
		Level:       uint32(.level),
		Compression: uint32(.CompressionType()),
	}
	if  := .db.manifest.addChanges([]*pb.ManifestChange{}, .db.opt);  != nil {
		return 
	}

	// We are not calling lhandler.replaceTables() here, as it sorts tables on every addition.
	// We can sort all tables only once during Flush() call.
	.addTable()

	// Release the ref held by OpenTable.
	_ = .DecrRef()
	.db.opt.Infof("Table created: %d at level: %d for stream: %d. Size: %s\n",
		, .level, .streamID, humanize.IBytes(uint64(.Size())))
	return nil
}