package frostdb

import (
	"context"
	"sync/atomic"
)

// TxNode is a single element of the TxPool linked list.
//
// A node is marked as logically deleted by swapping its next pointer to nil;
// the pointer that was there before is preserved in original so that readers
// can keep traversing past a deleted node.
type TxNode struct {
	next     *atomic.Pointer[TxNode]
	original *atomic.Pointer[TxNode]
	tx       uint64
}

// TxPool is a sorted, lock-free linked list of completed transactions, bounded
// by a head and a tail sentinel node (both carry tx == 0).
type TxPool struct {
	head   *atomic.Pointer[TxNode]
	tail   *atomic.Pointer[TxNode]
	cancel context.CancelFunc
	drain  chan interface{}
}

// NewTxPool returns a new TxPool and starts the pool cleaner routine.
// The transaction pool is used to keep track of completed transactions. It does
// this by inserting completed transactions into an ordered linked list.
//
// Ex:
// insert: 12
// [9]->[10]->[13] => [9]->[10]->[12]->[13]
//
// Inserting a new node triggers the pool cleaner routine to run. The pool
// cleaner's job is to increment a high-watermark counter when it encounters
// contiguous transactions in the list, and then remove those elements in the
// pool.
//
// Ex:
// watermark: 7 insert: 8
// [9]->[10]->[13] => [8]->[9]->[10]->[13] (cleaner notified)
//
// [8]->[9]->[10]->[13]
//
//	^ watermark++; delete 8
//
// [9]->[10]->[13]
//
//	^ watermark++; delete 9
//
// [10]->[13]
//
//	^ watermark++; delete 10
//
// [13]
// watermark: 10
//
// TxPool is a sorted lockless linked-list described in
// https://timharris.uk/papers/2001-disc.pdf
func NewTxPool(watermark *atomic.Uint64) *TxPool {
	tail := &TxNode{
		next:     &atomic.Pointer[TxNode]{},
		original: &atomic.Pointer[TxNode]{},
	}
	head := &TxNode{
		next:     &atomic.Pointer[TxNode]{},
		original: &atomic.Pointer[TxNode]{},
	}
	txpool := &TxPool{
		head:  &atomic.Pointer[TxNode]{},
		tail:  &atomic.Pointer[TxNode]{},
		drain: make(chan interface{}, 1),
	}

	// [head] -> [tail]
	head.next.Store(tail)
	txpool.head.Store(head)
	txpool.tail.Store(tail)

	ctx, cancel := context.WithCancel(context.Background())
	txpool.cancel = cancel
	go txpool.cleaner(ctx, watermark)
	return txpool
}

// Insert performs an insertion sort of the given tx.
func (l *TxPool) Insert(tx uint64) {
	n := &TxNode{
		tx:       tx,
		next:     &atomic.Pointer[TxNode]{},
		original: &atomic.Pointer[TxNode]{},
	}

	// tryInsert makes one pass over the list. It reports false whenever the
	// list changed underneath it (or it unlinked a deleted node), in which
	// case the traversal has to restart from the head.
	tryInsert := func() bool {
		prev := l.head.Load()
		for node := l.head.Load().next.Load(); node != nil; node = getUnmarked(node) {
			if node.tx == 0 { // end of list
				return l.insert(n, prev, node)
			}
			// remove deleted nodes encountered
			if next := isMarked(node); next != nil {
				prev.next.CompareAndSwap(node, next)
				return false
			}
			if node.tx > tx {
				return l.insert(n, prev, node)
			}
			prev = node
		}
		return false
	}

	for !tryInsert() {
		// Satisfy linter with statement.
		continue
	}
}

// insert will insert the node after previous and before next.
// It reports whether the compare-and-swap on prev succeeded; on success the
// cleaner is notified without blocking.
func (l *TxPool) insert(node, prev, next *TxNode) bool {
	node.next.Store(next)
	success := prev.next.CompareAndSwap(next, node)
	if success {
		select {
		case l.drain <- struct{}{}: // notify the cleaner
		default:
		}
	}
	return success
}

// notifyWatermark notifies the TxPool that the watermark has been updated. This
// triggers a sweep of the pool.
func (l *TxPool) notifyWatermark() {
	select {
	case l.drain <- struct{}{}:
	default:
	}
}

// Iterate calls iterate with the tx of every node that is not marked as
// deleted, in list order, and stops early as soon as iterate returns false.
func (l *TxPool) Iterate(iterate func(tx uint64) bool) {
	for node := l.head.Load().next.Load(); node.tx != 0; node = getUnmarked(node) {
		if isMarked(node) == nil && !iterate(node.tx) {
			return
		}
	}
}

// delete iterates over the list and deletes until the delete function returns false.
func (l *TxPool) delete(deleteFunc func(tx uint64) bool) {
	for node := l.head.Load().next.Load(); node.tx != 0; node = getUnmarked(node) {
		if !deleteFunc(node.tx) {
			return
		}
		for next := node.next.Load(); next != nil; next = node.next.Load() {
			// only attempt to mark nodes as deleted that haven't already been marked
			if node.next.CompareAndSwap(next, getMarked(node)) {
				node.original.Store(next) // NOTE: deletes are not concurrent; so we don't need to CAS the original pointer
				break
			}
		}
	}
}

// isMarked returns the next node if and only if the node is marked for deletion.
func isMarked(node *TxNode) *TxNode {
	next := node.next.Load()
	if next != nil {
		return nil
	}

	// this node has been marked for deletion, get the original next pointer
	next = node.original.Load()
	for next == nil {
		// the deleter stores original right after nil-ing next; spin until it lands
		next = node.original.Load()
	}
	return next
}

// getMarked returns the value stored in a node's next pointer to mark the
// node as deleted.
func getMarked(node *TxNode) *TxNode {
	// using nil as the marker
	return nil
}

// getUnmarked returns the successor of node regardless of whether node has
// been marked for deletion.
func getUnmarked(node *TxNode) *TxNode {
	next := node.next.Load()
	if next != nil {
		return next
	}

	// get the original pointer
	next = node.original.Load()
	for next == nil {
		next = node.original.Load()
	}
	return next
}

// cleaner sweeps the pool periodically, and bubbles up the given watermark.
// this function does not return.
func (l *TxPool) cleaner(ctx context.Context, watermark *atomic.Uint64) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-l.drain:
			l.delete(func(tx uint64) bool {
				mark := watermark.Load()
				switch {
				case mark+1 == tx:
					watermark.Store(mark + 1)
					return true // return true to indicate that this node should be removed from the tx list.
				case mark >= tx:
					return true
				default:
					return false
				}
			})
		}
	}
}

// Stop stops the TxPool's cleaner goroutine.
func (l *TxPool) Stop() {
	l.cancel()
}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.