/*
 * SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc.
 * SPDX-License-Identifier: Apache-2.0
 */

package badger

import (
	stderrors 
	
	

	
	
)

// MergeOperator represents a Badger merge operator.
//
// A MergeOperator is bound to a single key of a single DB. Its fields are:
//   - the embedded sync.RWMutex, which is write-locked while a compaction
//     runs and read-locked while Get computes the merged value;
//   - f, the merge function applied to the stored versions of the key;
//   - db, the database the operator reads from and writes to;
//   - key, the user key whose versions are merged;
//   - closer, created with z.NewCloser(1), which the background goroutine
//     marks Done on exit and which Stop signals and waits on.
type MergeOperator struct {
	sync.RWMutex
	f      MergeFunc
	db     *DB
	key    []byte
	closer *z.Closer
}

// MergeFunc accepts two byte slices, one representing an existing value, and
// another representing a new value that needs to be "merged" into it. MergeFunc
// contains the logic to perform the merge and returns the updated value.
// Typical merge functions perform operations such as integer addition or list
// appends.
//
// Note that the ordering of the operands is maintained, so a MergeFunc does
// not have to be commutative.
type MergeFunc func(existingVal, newVal []byte) []byte

// GetMergeOperator creates a new MergeOperator for a given key and returns a
// pointer to it. It also fires off a goroutine that performs a compaction using
// the merge function that runs periodically, as specified by dur.
//
// The returned operator owns a closer created with z.NewCloser(1). The
// background goroutine only exits once that closer has been signalled, which
// is what Stop does; until then it keeps running on every tick.
func ( *DB) ( []byte,
	 MergeFunc,  time.Duration) *MergeOperator {
	// NOTE(review): identifiers (receiver, function and parameter names,
	// locals) appear to have been stripped from this copy of the file; the
	// comments describe the visible structure only.
	 := &MergeOperator{
		f:      ,
		db:     ,
		key:    ,
		closer: z.NewCloser(1),
	}

	// Start the periodic compaction loop; it is stopped through the closer.
	go .runCompactions()
	return 
}

// errNoMerge is an internal sentinel returned by the merge iteration when
// exactly one live version of the key was found, i.e. there is nothing to
// merge. It is compared by identity only: the compaction path treats it as
// "nothing to do" and Get translates it into a nil error.
var errNoMerge = stderrors.New("No need for merge")

// iterateAndMerge (name inferred from its call sites) walks every stored
// version of the operator's key and folds them into a single value using the
// merge function f.
//
// It returns the merged value, the version taken from the first item visited,
// and an error:
//   - ErrKeyNotFound if no live version was counted;
//   - errNoMerge if exactly one live version was counted (the value is still
//     returned);
//   - any error produced while reading an item's value (value nil, version 0);
//   - nil if two or more versions were merged.
//
// NOTE(review): identifiers appear to have been stripped from this copy of the
// file; the comments describe the visible structure only.
func ( *MergeOperator) () ( []byte,  uint64,  error) {
	// NewTransaction(false): presumably a read-only transaction — confirm.
	// It is always discarded on return.
	 := .db.NewTransaction(false)
	defer .Discard()
	// Iterate over all versions of the single key, not just the latest one.
	 := DefaultIteratorOptions
	.AllVersions = true
	 := .NewKeyIterator(.key, )
	defer .Close()

	// Counts the live versions consumed so far.
	var  int
	for .Rewind(); .Valid(); .Next() {
		 := .Item()
		// A deleted or expired version ends the chain of mergeable entries.
		if .IsDeletedOrExpired() {
			break
		}
		++
		if  == 1 {
			// This should be the newVal, considering this is the latest version.
			,  = .ValueCopy()
			if  != nil {
				return nil, 0, 
			}
			// The version of the first item is what gets reported to the caller.
			 = .Version()
		} else {
			if  := .Value(func( []byte) error {
				// The merge should always be on the newVal considering it has the merge result of
				// the latest version. The value read should be the oldVal.
				 = .f(, )
				return nil
			});  != nil {
				return nil, 0, 
			}
		}
		// An item flagged to discard earlier versions is the last one that
		// contributes; anything older must not be merged in.
		if .DiscardEarlierVersions() {
			break
		}
	}
	if  == 0 {
		return nil, , ErrKeyNotFound
	} else if  == 1 {
		return , , errNoMerge
	}
	return , , nil
}

// compact (name inferred from its call site in the background loop) merges all
// versions of the key and writes the result back as a single entry.
//
// It holds the operator's write lock for its whole duration. If the key is
// missing (ErrKeyNotFound) or has only one version (errNoMerge) there is
// nothing to do and nil is returned. Any other merge error is returned as is.
// Otherwise the merged value is submitted through batchSetAsync; a failure
// reported to the callback is only logged, while the error returned by
// batchSetAsync itself is returned to the caller.
//
// NOTE(review): identifiers appear to have been stripped from this copy of the
// file; the comments describe the visible structure only.
func ( *MergeOperator) () error {
	.Lock()
	defer .Unlock()
	, ,  := .iterateAndMerge()
	if  == ErrKeyNotFound ||  == errNoMerge {
		return nil
	} else if  != nil {
		return 
	}
	// The entry is keyed with a timestamp via y.KeyWithTs and carries
	// bitDiscardEarlierVersions in its meta.
	 := []*Entry{
		{
			Key:   y.KeyWithTs(.key, ),
			Value: ,
			meta:  bitDiscardEarlierVersions,
		},
	}
	// Write value back to the DB. It is important that we do not set the bitMergeEntry bit
	// here. When compaction happens, all the older merged entries will be removed.
	return .db.batchSetAsync(, func( error) {
		if  != nil {
			.db.opt.Errorf("failed to insert the result of merge compaction: %s", )
		}
	})
}

// runCompactions (name inferred from the goroutine started by
// GetMergeOperator) is the background loop of a MergeOperator. It runs a
// compaction on every tick of a ticker created from the given duration. When
// the closer reports that it has been closed, it runs one final compaction,
// stops the ticker and returns. Compaction errors are logged, never returned.
//
// The closer is marked Done when the function returns, which is what
// unblocks Stop.
//
// NOTE(review): identifiers appear to have been stripped from this copy of the
// file; the comments describe the visible structure only.
func ( *MergeOperator) ( time.Duration) {
	 := time.NewTicker()
	defer .closer.Done()
	// Set once the close signal has been observed.
	var  bool
	for {
		select {
		case <-.closer.HasBeenClosed():
			 = true
		case <-.C: // wait for tick
		}
		// Runs on every tick and once more after the close signal, so values
		// added before Stop are still merged.
		if  := .compact();  != nil {
			.db.opt.Errorf("failure while running merge operation: %s", )
		}
		if  {
			.Stop()
			break
		}
	}
}

// Add records a value in Badger which will eventually be merged by a background
// routine into the values that were recorded by previous invocations to Add().
//
// The value is written inside a db.Update call as a new entry for the
// operator's key, built with NewEntry and flagged via withMergeBit. The error
// returned by Update is returned unchanged.
func ( *MergeOperator) ( []byte) error {
	return .db.Update(func( *Txn) error {
		return .SetEntry(NewEntry(.key, ).withMergeBit())
	})
}

// Get returns the latest value for the merge operator, which is derived by
// applying the merge function to all the values added so far.
//
// If Add has not been called even once, Get will return ErrKeyNotFound.
//
// Get holds the operator's read lock while it computes the value, so it does
// not run concurrently with a compaction, which takes the write lock. When
// only a single version exists the internal errNoMerge sentinel is translated
// into a nil error and that single value is returned.
func ( *MergeOperator) () ([]byte, error) {
	.RLock()
	defer .RUnlock()
	var  []byte
	// NOTE(review): the *Txn handed in by View does not appear to be used by
	// the callback; the merge helper opens its own transaction — confirm
	// whether the View wrapper is needed.
	 := .db.View(func( *Txn) ( error) {
		, _,  = .iterateAndMerge()
		return 
	})
	// A single version is not an error for callers; hide the sentinel.
	if  == errNoMerge {
		return , nil
	}
	return , 
}

// Stop waits for any pending merge to complete and then stops the background
// goroutine.
//
// It does so by calling SignalAndWait on the operator's closer: the background
// loop observes the signal, performs one last compaction, and marks the closer
// Done, at which point Stop returns.
func ( *MergeOperator) () {
	.closer.SignalAndWait()
}