package y
import (
"container/heap"
"context"
"sync/atomic"
"github.com/dgraph-io/ristretto/v2/z"
)
// uint64Heap is a slice of uint64 values that satisfies heap.Interface,
// ordered so that the smallest value sits at the root.
type uint64Heap []uint64

// Len reports how many values the heap currently holds.
func (u uint64Heap) Len() int {
	return len(u)
}

// Less reports whether the value at position i orders before the one at j.
func (u uint64Heap) Less(i, j int) bool {
	return u[j] > u[i]
}

// Swap exchanges the values stored at positions i and j.
func (u uint64Heap) Swap(i, j int) {
	vi, vj := u[i], u[j]
	u[i], u[j] = vj, vi
}

// Push appends x to the underlying slice. x must be a uint64; any other
// dynamic type makes the type assertion panic.
func (u *uint64Heap) Push(x any) {
	v := x.(uint64)
	*u = append(*u, v)
}

// Pop removes the final element of the underlying slice and returns it.
func (u *uint64Heap) Pop() any {
	h := *u
	last := len(h) - 1
	tail := h[last]
	*u = h[:last]
	return tail
}
// mark is the message delivered over WaterMark.markCh to the processing
// goroutine. A mark whose waiter is non-nil is a request to be notified about
// index; any other mark reports that index and/or indices have begun
// (done == false) or finished (done == true).
type mark struct {
// index is the single index this mark refers to. Batch marks built by
// BeginMany/DoneMany leave it at 0.
index uint64
// waiter, when non-nil, is closed by the processing goroutine once
// doneUntil has reached index.
waiter chan struct {}
// indices carries the batch of indices for BeginMany/DoneMany marks.
indices []uint64
// done is true for marks sent by Done/DoneMany and false for marks sent by
// Begin/BeginMany.
done bool
}
// WaterMark keeps track of indices that have been begun and finished. Marks are
// sent over markCh to a single processing goroutine (started by Init), which
// advances doneUntil past every tracked index whose begin marks have all been
// matched by done marks, as long as no smaller tracked index is still pending.
type WaterMark struct {
// doneUntil is the current watermark. It is advanced by the processing
// goroutine, read through DoneUntil and overwritten by SetDoneUntil.
doneUntil atomic .Uint64
// lastIndex holds the index most recently passed to Begin, or the final
// element of the slice most recently passed to BeginMany.
lastIndex atomic .Uint64
// Name identifies this watermark in the assertion message emitted by the
// processing goroutine.
Name string
// markCh carries marks to the processing goroutine; it is created by Init.
markCh chan mark
}
// Init creates the buffered mark channel (capacity 100) and starts the
// goroutine that consumes it. The goroutine runs w.process with the given
// closer. Init must run before any method that sends on the channel.
func (w *WaterMark) Init(closer *z.Closer) {
	ch := make(chan mark, 100)
	w.markCh = ch

	go w.process(closer)
}
// Begin records index as the last index seen and tells the processing
// goroutine that work on index has started. The send blocks while the mark
// channel is full.
func (w *WaterMark) Begin(index uint64) {
	w.lastIndex.Store(index)

	begun := mark{index: index} // done is left at its zero value, false
	w.markCh <- begun
}
// BeginMany records the final element of indices as the last index seen and
// tells the processing goroutine that work on every index in the batch has
// started. The send blocks while the mark channel is full.
//
// An empty batch is a no-op. Without this guard, reading the final element
// would panic; and a mark carrying index 0 and no indices is interpreted by the
// processing goroutine as a mark for index 0, which is not what an empty batch
// means.
func (w *WaterMark) BeginMany(indices []uint64) {
	if len(indices) == 0 {
		return
	}
	w.lastIndex.Store(indices[len(indices)-1])
	w.markCh <- mark{index: 0, indices: indices, done: false}
}
// Done tells the processing goroutine that one unit of work on index has
// finished. The send blocks while the mark channel is full.
func (w *WaterMark) Done(index uint64) {
	finished := mark{done: true, index: index}
	w.markCh <- finished
}
// DoneMany tells the processing goroutine that one unit of work on every index
// in the batch has finished. The send blocks while the mark channel is full.
//
// An empty batch is a no-op. The processing goroutine interprets a mark with
// index 0 and no indices as a mark for index 0, so forwarding an empty batch
// would be recorded as Done(0): it would skew the pending count for index 0
// and trip the processing goroutine's assertion once doneUntil is above 0.
func (w *WaterMark) DoneMany(indices []uint64) {
	if len(indices) == 0 {
		return
	}
	w.markCh <- mark{index: 0, indices: indices, done: true}
}
// DoneUntil atomically reads and returns the current watermark value.
func (w *WaterMark) DoneUntil() uint64 {
	current := w.doneUntil.Load()
	return current
}
// SetDoneUntil atomically overwrites the watermark with val, bypassing the
// processing goroutine.
func (w *WaterMark) SetDoneUntil(val uint64) {
	w.doneUntil.Store(val)
}
// LastIndex atomically reads and returns the index most recently stored by
// Begin or BeginMany.
func (w *WaterMark) LastIndex() uint64 {
	latest := w.lastIndex.Load()
	return latest
}
// WaitForMark blocks until the watermark has reached index or ctx is done.
//
// It returns nil immediately when DoneUntil() is already >= index. Otherwise
// it registers a waiter channel with the processing goroutine and waits for
// that channel to be closed. It returns ctx.Err() if ctx is cancelled or times
// out first, whether that happens while enqueueing the request or while
// waiting for the notification.
func (w *WaterMark) WaitForMark(ctx context.Context, index uint64) error {
	if w.DoneUntil() >= index {
		return nil
	}
	waitCh := make(chan struct{})

	// Enqueueing can block when the mark channel's buffer is full (for example
	// if the processing goroutine has already exited), so cancellation has to
	// be honoured here as well as during the wait below.
	select {
	case w.markCh <- mark{index: index, waiter: waitCh}:
	case <-ctx.Done():
		return ctx.Err()
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-waitCh:
		return nil
	}
}
// process is the body of the goroutine started by Init. It receives marks from
// w.markCh until closer reports that it has been closed, and it is the only
// place that touches the indices heap, the pending counts and the waiters map.
// For begin/done marks it updates the pending counts and advances w.doneUntil;
// for wait requests it either closes the waiter right away or parks it until
// the watermark reaches the requested index.
func (w *WaterMark ) process (closer *z .Closer ) {
// closer.Done is called when this goroutine returns.
defer closer .Done ()
// indices is a min-heap of every index that currently has an entry in pending.
var indices uint64Heap
// pending maps an index to (begin marks seen - done marks seen) for it.
pending := make (map [uint64 ]int )
// waiters maps an index to the channels to close once doneUntil reaches it.
waiters := make (map [uint64 ][]chan struct {})
heap .Init (&indices )
// processOne applies a single begin (done == false) or done (done == true)
// mark for index, then advances the watermark and wakes waiters as far as
// the updated counts allow.
processOne := func (index uint64 , done bool ) {
prev , present := pending [index ]
if !present {
// First mark seen for this index: start tracking it in the heap.
heap .Push (&indices , index )
}
delta := 1
if done {
delta = -1
}
pending [index ] = prev + delta
doneUntil := w .DoneUntil ()
// A mark for an index below the current watermark is reported through
// AssertTruef with a false condition (presumably fatal — confirm against
// the helper's definition, which is outside this file's visible portion).
if doneUntil > index {
AssertTruef (false , "Name: %s doneUntil: %d. Index: %d" , w .Name , doneUntil , index )
}
until := doneUntil
// NOTE: loops is incremented below but never read.
loops := 0
// Pop indices in ascending order for as long as the smallest one has no
// outstanding begin marks; until ends up as the largest index popped.
for len (indices ) > 0 {
min := indices [0 ]
// This inner "done" is the pending count for min and shadows the
// boolean parameter of the same name.
if done := pending [min ]; done > 0 {
break
}
heap .Pop (&indices )
delete (pending , min )
until = min
loops ++
}
if until != doneUntil {
// The swap only succeeds if doneUntil still holds the value loaded above;
// a concurrent SetDoneUntil in between would make it fail the assertion.
AssertTrue (w .doneUntil .CompareAndSwap (doneUntil , until ))
}
// notifyAndRemove closes every channel waiting on idx and drops the entry.
notifyAndRemove := func (idx uint64 , toNotify []chan struct {}) {
for _ , ch := range toNotify {
close (ch )
}
delete (waiters , idx )
}
// Choose the cheaper scan: walk the index range (doneUntil, until] when it
// is no longer than the number of waiter keys, otherwise walk the map.
if until -doneUntil <= uint64 (len (waiters )) {
for idx := doneUntil + 1 ; idx <= until ; idx ++ {
if toNotify , ok := waiters [idx ]; ok {
notifyAndRemove (idx , toNotify )
}
}
} else {
for idx , toNotify := range waiters {
if idx <= until {
notifyAndRemove (idx , toNotify )
}
}
}
}
for {
select {
case <- closer .HasBeenClosed ():
return
case mark := <- w .markCh :
if mark .waiter != nil {
// Wait request: answer immediately if the watermark is already there,
// otherwise remember the channel under its index.
doneUntil := w .doneUntil .Load ()
if doneUntil >= mark .index {
close (mark .waiter )
} else {
ws , ok := waiters [mark .index ]
if !ok {
waiters [mark .index ] = []chan struct {}{mark .waiter }
} else {
waiters [mark .index ] = append (ws , mark .waiter )
}
}
} else {
// Begin/done mark. The single index is processed unless this is a batch
// mark (index 0 with a non-empty indices slice); a mark with index 0 and
// no indices is processed as a mark for index 0.
if mark .index > 0 || (mark .index == 0 && len (mark .indices ) == 0 ) {
processOne (mark .index , mark .done )
}
for _ , index := range mark .indices {
processOne (index , mark .done )
}
}
}
}
}
// The pages are generated with Golds v0.8.4. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PR and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.