/*
 * SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc.
 * SPDX-License-Identifier: Apache-2.0
 */

package y

import (
	
	
	

	
)

type uint64Heap []uint64

func ( uint64Heap) () int            { return len() }
func ( uint64Heap) (,  int) bool  { return [] < [] }
func ( uint64Heap) (,  int)       { [], [] = [], [] }
func ( *uint64Heap) ( interface{}) { * = append(*, .(uint64)) }
func ( *uint64Heap) () interface{} {
	 := *
	 := len()
	 := [-1]
	* = [0 : -1]
	return 
}

// mark contains one of more indices, along with a done boolean to indicate the
// status of the index: begin or done. It also contains waiters, who could be
// waiting for the watermark to reach >= a certain index.
type mark struct {
	// Either this is an (index, waiter) pair or (index, done) or (indices, done).
	index   uint64
	waiter  chan struct{}
	indices []uint64
	done    bool // Set to true if the index is done.
}

// WaterMark is used to keep track of the minimum un-finished index.  Typically, an index k becomes
// finished or "done" according to a WaterMark once Done(k) has been called
//  1. as many times as Begin(k) has, AND
//  2. a positive number of times.
//
// An index may also become "done" by calling SetDoneUntil at a time such that it is not
// inter-mingled with Begin/Done calls.
//
// Since doneUntil and lastIndex addresses are passed to sync/atomic packages, we ensure that they
// are 64-bit aligned by putting them at the beginning of the structure.
type WaterMark struct {
	doneUntil atomic.Uint64
	lastIndex atomic.Uint64
	Name      string
	markCh    chan mark
}

// Init initializes a WaterMark struct. MUST be called before using it.
func ( *WaterMark) ( *z.Closer) {
	.markCh = make(chan mark, 100)
	go .process()
}

// Begin sets the last index to the given value.
func ( *WaterMark) ( uint64) {
	.lastIndex.Store()
	.markCh <- mark{index: , done: false}
}

// BeginMany works like Begin but accepts multiple indices.
func ( *WaterMark) ( []uint64) {
	.lastIndex.Store([len()-1])
	.markCh <- mark{index: 0, indices: , done: false}
}

// Done sets a single index as done.
func ( *WaterMark) ( uint64) {
	.markCh <- mark{index: , done: true}
}

// DoneMany works like Done but accepts multiple indices.
func ( *WaterMark) ( []uint64) {
	.markCh <- mark{index: 0, indices: , done: true}
}

// DoneUntil returns the maximum index that has the property that all indices
// less than or equal to it are done.
func ( *WaterMark) () uint64 {
	return .doneUntil.Load()
}

// SetDoneUntil sets the maximum index that has the property that all indices
// less than or equal to it are done.
func ( *WaterMark) ( uint64) {
	.doneUntil.Store()
}

// LastIndex returns the last index for which Begin has been called.
func ( *WaterMark) () uint64 {
	return .lastIndex.Load()
}

// WaitForMark waits until the given index is marked as done.
func ( *WaterMark) ( context.Context,  uint64) error {
	if .DoneUntil() >=  {
		return nil
	}
	 := make(chan struct{})
	.markCh <- mark{index: , waiter: }

	select {
	case <-.Done():
		return .Err()
	case <-:
		return nil
	}
}

// process is used to process the Mark channel. This is not thread-safe,
// so only run one goroutine for process. One is sufficient, because
// all goroutine ops use purely memory and cpu.
// Each index has to emit at least one begin watermark in serial order otherwise waiters
// can get blocked idefinitely. Example: We had an watermark at 100 and a waiter at 101,
// if no watermark is emitted at index 101 then waiter would get stuck indefinitely as it
// can't decide whether the task at 101 has decided not to emit watermark or it didn't get
// scheduled yet.
func ( *WaterMark) ( *z.Closer) {
	defer .Done()

	var  uint64Heap
	// pending maps raft proposal index to the number of pending mutations for this proposal.
	 := make(map[uint64]int)
	 := make(map[uint64][]chan struct{})

	heap.Init(&)

	 := func( uint64,  bool) {
		// If not already done, then set. Otherwise, don't undo a done entry.
		,  := []
		if ! {
			heap.Push(&, )
		}

		 := 1
		if  {
			 = -1
		}
		[] =  + 

		// Update mark by going through all indices in order; and checking if they have
		// been done. Stop at the first index, which isn't done.
		 := .DoneUntil()
		if  >  {
			AssertTruef(false, "Name: %s doneUntil: %d. Index: %d", .Name, , )
		}

		 := 
		 := 0

		for len() > 0 {
			 := [0]
			if  := [];  > 0 {
				break // len(indices) will be > 0.
			}
			// Even if done is called multiple times causing it to become
			// negative, we should still pop the index.
			heap.Pop(&)
			delete(, )
			 = 
			++
		}

		if  !=  {
			AssertTrue(.doneUntil.CompareAndSwap(, ))
		}

		 := func( uint64,  []chan struct{}) {
			for ,  := range  {
				close()
			}
			delete(, ) // Release the memory back.
		}

		if - <= uint64(len()) {
			// Issue #908 showed that if doneUntil is close to 2^60, while until is zero, this loop
			// can hog up CPU just iterating over integers creating a busy-wait loop. So, only do
			// this path if until - doneUntil is less than the number of waiters.
			for  :=  + 1;  <= ; ++ {
				if ,  := [];  {
					(, )
				}
			}
		} else {
			for ,  := range  {
				if  <=  {
					(, )
				}
			}
		} // end of notifying waiters.
	}

	for {
		select {
		case <-.HasBeenClosed():
			return
		case  := <-.markCh:
			if .waiter != nil {
				 := .doneUntil.Load()
				if  >= .index {
					close(.waiter)
				} else {
					,  := [.index]
					if ! {
						[.index] = []chan struct{}{.waiter}
					} else {
						[.index] = append(, .waiter)
					}
				}
			} else {
				// it is possible that mark.index is zero. We need to handle that case as well.
				if .index > 0 || (.index == 0 && len(.indices) == 0) {
					(.index, .done)
				}
				for ,  := range .indices {
					(, .done)
				}
			}
		}
	}
}