Source File
state.go
Belonging Package
github.com/polarsignals/wal
// Copyright (c) HashiCorp, Inc// SPDX-License-Identifier: MPL-2.0package walimport ()// state is an immutable snapshot of the state of the log. Modifications must be// made by copying and modifying the copy. This is easy enough because segments// is an immutable map so changing and re-assigning to the clone won't impact// the original map, and tail is just a pointer that can be mutated in the// shallow clone. Note that methods called on the tail segmentWriter may mutate// its state so must only be called while holding the WAL's writeLock.type state struct {// refCount tracks readers that are reading segments based on this metadata.// It is accessed atomically and must be 64 bit aligned (i.e. leave it at the// start of the struct).refCount int32// finalizer is set at most once while WAL is holding the write lock in order// to provide a func that must be called when all current readers are done// with this state. It's used for deferring closing and deleting old segments// until we can be sure no reads are still in progress on them.finalizer atomic.Value // func()nextSegmentID uint64// nextBaseIndex is used to signal which baseIndex to use next if there are no// segments or current tail.nextBaseIndex uint64segments *immutable.SortedMap[uint64, segmentState]tail types.SegmentWriter}type segmentState struct {types.SegmentInfo// r is the SegmentReader for our in-memory state.r types.SegmentReader}// Commit converts the in-memory state into a PersistentState.func ( *state) () types.PersistentState {:= make([]types.SegmentInfo, 0, .segments.Len()):= .segments.Iterator()for !.Done() {, , := .Next()= append(, .SegmentInfo)}return types.PersistentState{NextSegmentID: .nextSegmentID,Segments: ,}}func ( *state) ( uint64, *types.LogEntry) error {// Check the tail writer firstif .tail != nil {:= .tail.GetLog(, )if != nil && != ErrNotFound {// Return actual errors since they might mask the fact that index really// is in the tail but failed to read for some other reason.return}if 
== nil {// No error means we found it and just need to decode.return nil}// Not in the tail segment, fall back to searching previous segments.}, := .findSegmentReader()if != nil {return}return .GetLog(, )}// findSegmentReader searches the segment tree for the segment that contains the// log at index idx. It may return the tail segment which may not in fact// contain idx if idx is larger than the last written index. Typically this is// called after already checking with the tail writer whether the log is in// there which means the caller can be sure it's not going to return the tail// segment.func ( *state) ( uint64) (types.SegmentReader, error) {if .segments.Len() == 0 {return nil, ErrNotFound}// Search for a segment with baseIndex.:= .segments.Iterator()// The baseIndex we want is the first one lower or equal to idx. Seek gets us// to the first result equal or greater so we are either at it (if equal) or// on the one _after_ the one we need. We step back since that's most likely.Seek()// The first call to Next/Prev actually returns the node the iterator is// currently on (which is probably the one after the one we want) but in some// edge cases we might actually want this one. Rather than reversing back and// coming forward again, just check both this and the one before it., , := .Prev()if && .BaseIndex > {_, , = .Prev()}// We either have the right segment or it doesn't exist.if && .MinIndex <= && (.MaxIndex == 0 || .MaxIndex >= ) {return .r, nil}return nil, ErrNotFound}func ( *state) () *segmentState {:= .segments.Iterator().Last(), , := .Next()if ! {return nil}return &}func ( *state) ( []types.LogEntry) error {return .tail.Append()}func ( *state) () uint64 {:= .segments.Iterator(), , := .Next()if ! {return 0}if .SealTime.IsZero() {// First segment is unsealed so is also the tail. 
Check it actually has at// least one log in otherwise it doesn't matter what the BaseIndex/MinIndex// are.if .tail.LastIndex() == 0 {// No logs in the WALreturn 0}// At least one log exists, return the MinIndex}return .MinIndex}func ( *state) () uint64 {:= .tail.LastIndex()if > 0 {return}// Current tail is empty. Check there are previous sealed segments.:= .segments.Iterator().Last(), , := .Prev()if ! {// No tail! shouldn't be possible but means no logs yetreturn 0}// Go back to the segment before the tail_, _, = .Prev()if ! {// No previous segment so the whole log is emptyreturn 0}// There was a previous segment so its MaxIndex will be one less than the// tail's BaseIndex.:= .getTailInfo()if == nil || .BaseIndex == 0 {return 0}return .BaseIndex - 1}func ( *state) () func() {atomic.AddInt32(&.refCount, 1)return .release}func ( *state) () {// decrement on release:= atomic.AddInt32(&.refCount, -1)if == 0 {// Cleanup state associated with this version now all refs have gone. Since// there are no more refs and we should not set a finalizer until this state// is no longer the active state, we can be sure this will happen only once.// Even still let's swap the fn to ensure we only call finalizer once ever!// We can't swap actual nil as it's not the same type as func() so do a// dance with a nilFn below.var func():= .finalizer.Swap()if , := .(func()); && != nil {()}}}// clone returns a new state which is a shallow copy of just the immutable parts// of s. This is safer than a simple assignment copy because that "reads" the// atomically modified state non-atomically. We never want to copy the refCount// or finalizer anyway.func ( *state) () state {return state{nextSegmentID: .nextSegmentID,segments: .segments,tail: .tail,}}
![]() |
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64) Golds is a Go 101 project developed by Tapir Liu. PR and bug reports are welcome and can be submitted to the issue list. Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds. |