/*
 * SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc.
 * SPDX-License-Identifier: Apache-2.0
 */

package badger

import (
	
	
	
	
	
	

	

	
	
	
)

// flushThreshold determines when a buffer will be flushed. When performing a
// backup/restore, the entries will be batched up until the total size of batch
// is more than flushThreshold or entry size (without the value size) is more
// than the maxBatchSize.
const flushThreshold = 100 << 20

// Backup dumps a protobuf-encoded list of all entries in the database into the
// given writer, that are newer than or equal to the specified version. It
// returns a timestamp (version) indicating the version of last entry that is
// dumped, which after incrementing by 1 can be passed into later invocation to
// generate incremental backup of entries that have been added/modified since
// the last invocation of DB.Backup().
// DB.Backup is a wrapper function over Stream.Backup to generate full and
// incremental backups of the DB. For more control over how many goroutines are
// used to generate the backup, or if you wish to backup only a certain range
// of keys, use Stream.Backup directly.
func ( *DB) ( io.Writer,  uint64) (uint64, error) {
	 := .NewStream()
	.LogPrefix = "DB.Backup"
	.SinceTs = 
	return .Backup(, )
}

// Backup dumps a protobuf-encoded list of all entries in the database into the
// given writer, that are newer than or equal to the specified version. It returns a
// timestamp(version) indicating the version of last entry that was dumped, which
// after incrementing by 1 can be passed into a later invocation to generate an
// incremental dump of entries that have been added/modified since the last
// invocation of Stream.Backup().
//
// This can be used to backup the data in a database at a given point in time.
func ( *Stream) ( io.Writer,  uint64) (uint64, error) {
	.KeyToList = func( []byte,  *Iterator) (*pb.KVList, error) {
		 := &pb.KVList{}
		 := .Alloc
		for ; .Valid(); .Next() {
			 := .Item()
			if !bytes.Equal(.Key(), ) {
				return , nil
			}
			if .Version() <  {
				return nil, fmt.Errorf("Backup: Item Version: %d less than sinceTs: %d",
					.Version(), )
			}

			var  []byte
			if !.IsDeletedOrExpired() {
				// No need to copy value, if item is deleted or expired.
				 := .Value(func( []byte) error {
					 = .Copy()
					return nil
				})
				if  != nil {
					.db.opt.Errorf("Key [%x, %d]. Error while fetching value [%v]\n",
						.Key(), .Version(), )
					return nil, 
				}
			}

			// clear txn bits
			 := .meta &^ (bitTxn | bitFinTxn)
			 := y.NewKV()
			* = pb.KV{
				Key:       .Copy(.Key()),
				Value:     ,
				UserMeta:  .Copy([]byte{.UserMeta()}),
				Version:   .Version(),
				ExpiresAt: .ExpiresAt(),
				Meta:      .Copy([]byte{}),
			}
			.Kv = append(.Kv, )

			switch {
			case .DiscardEarlierVersions():
				// If we need to discard earlier versions of this item, add a delete
				// marker just below the current version.
				.Kv = append(.Kv, &pb.KV{
					Key:     .KeyCopy(nil),
					Version: .Version() - 1,
					Meta:    []byte{bitDelete},
				})
				return , nil

			case .IsDeletedOrExpired():
				return , nil
			}
		}
		return , nil
	}

	var  uint64
	.Send = func( *z.Buffer) error {
		,  := BufferToKVList()
		if  != nil {
			return 
		}
		 := .Kv[:0]
		for ,  := range .Kv {
			if  < .Version {
				 = .Version
			}
			if !.StreamDone {
				// Don't pick stream done changes.
				 = append(, )
			}
		}
		.Kv = 
		return writeTo(, )
	}

	if  := .Orchestrate(context.Background());  != nil {
		return 0, 
	}
	return , nil
}

func writeTo( *pb.KVList,  io.Writer) error {
	if  := binary.Write(, binary.LittleEndian, uint64(proto.Size()));  != nil {
		return 
	}
	,  := proto.Marshal()
	if  != nil {
		return 
	}
	_,  = .Write()
	return 
}

// KVLoader is used to write KVList objects in to badger. It can be used to restore a backup.
type KVLoader struct {
	db          *DB
	throttle    *y.Throttle
	entries     []*Entry
	entriesSize int64
	totalSize   int64
}

// NewKVLoader returns a new instance of KVLoader.
func ( *DB) ( int) *KVLoader {
	return &KVLoader{
		db:       ,
		throttle: y.NewThrottle(),
		entries:  make([]*Entry, 0, .opt.maxBatchCount),
	}
}

// Set writes the key-value pair to the database.
func ( *KVLoader) ( *pb.KV) error {
	var ,  byte
	if len(.UserMeta) > 0 {
		 = .UserMeta[0]
	}
	if len(.Meta) > 0 {
		 = .Meta[0]
	}
	 := &Entry{
		Key:       y.KeyWithTs(.Key, .Version),
		Value:     .Value,
		UserMeta:  ,
		ExpiresAt: .ExpiresAt,
		meta:      ,
	}
	 := .estimateSizeAndSetThreshold(.db.valueThreshold())
	// Flush entries if inserting the next entry would overflow the transactional limits.
	if int64(len(.entries))+1 >= .db.opt.maxBatchCount ||
		.entriesSize+ >= .db.opt.maxBatchSize ||
		.totalSize >= flushThreshold {
		if  := .send();  != nil {
			return 
		}
	}
	.entries = append(.entries, )
	.entriesSize += 
	.totalSize +=  + int64(len(.Value))
	return nil
}

func ( *KVLoader) () error {
	if  := .throttle.Do();  != nil {
		return 
	}
	if  := .db.batchSetAsync(.entries, func( error) {
		.throttle.Done()
	});  != nil {
		return 
	}

	.entries = make([]*Entry, 0, .db.opt.maxBatchCount)
	.entriesSize = 0
	.totalSize = 0
	return nil
}

// Finish is meant to be called after all the key-value pairs have been loaded.
func ( *KVLoader) () error {
	if len(.entries) > 0 {
		if  := .send();  != nil {
			return 
		}
	}
	return .throttle.Finish()
}

// Load reads a protobuf-encoded list of all entries from a reader and writes
// them to the database. This can be used to restore the database from a backup
// made by calling DB.Backup(). If more complex logic is needed to restore a badger
// backup, the KVLoader interface should be used instead.
//
// DB.Load() should be called on a database that is not running any other
// concurrent transactions while it is running.
func ( *DB) ( io.Reader,  int) error {
	 := bufio.NewReaderSize(, 16<<10)
	 := make([]byte, 1<<10)

	 := .NewKVLoader()
	for {
		var  uint64
		 := binary.Read(, binary.LittleEndian, &)
		if  == io.EOF {
			break
		} else if  != nil {
			return 
		}

		if cap() < int() {
			 = make([]byte, )
		}

		if _,  = io.ReadFull(, [:]);  != nil {
			return 
		}

		 := &pb.KVList{}
		if  := proto.Unmarshal([:], );  != nil {
			return 
		}

		for ,  := range .Kv {
			if  := .Set();  != nil {
				return 
			}

			// Update nextTxnTs, memtable stores this
			// timestamp in badger head when flushed.
			if .Version >= .orc.nextTxnTs {
				.orc.nextTxnTs = .Version + 1
			}
		}
	}

	if  := .Finish();  != nil {
		return 
	}
	.orc.txnMark.Done(.orc.nextTxnTs - 1)
	return nil
}