// Copyright (c) HashiCorp, Inc
// SPDX-License-Identifier: MPL-2.0

package wal

import (
	"errors"
	"fmt"
	"io"
	"os"
	"sync"
	"sync/atomic"
	"time"

	"github.com/benbjohnson/immutable"
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	// NOTE: the import block was empty in the extracted listing; the third-party
	// paths above are inferred from usage. The module's own "types" package
	// (providing LogEntry, SegmentInfo, SegmentFiler, MetaStore and the shared
	// error values) must also be imported here; its full path depends on the
	// repository layout and is not reproduced.
)

var (
	ErrNotFound   = types.ErrNotFound
	ErrCorrupt    = types.ErrCorrupt
	ErrSealed     = types.ErrSealed
	ErrClosed     = types.ErrClosed
	ErrOutOfRange = errors.New("index out of range")

	DefaultSegmentSize = 64 * 1024 * 1024
)

// LogStore is used to provide an interface for storing
// and retrieving logs in a durable fashion.
type LogStore interface {
	io.Closer

	// FirstIndex returns the first index written. 0 for no entries.
	FirstIndex() (uint64, error)

	// LastIndex returns the last index written. 0 for no entries.
	LastIndex() (uint64, error)

	// GetLog gets a log entry at a given index.
	GetLog(index uint64, log *types.LogEntry) error

	// StoreLogs stores multiple log entries.
	StoreLogs(logs []types.LogEntry) error

	// TruncateBack truncates the back of the log by removing all entries that
	// are after the provided `index`. In other words the entry at `index`
	// becomes the last entry in the log.
	TruncateBack(index uint64) error

	// TruncateFront truncates the front of the log by removing all entries that
	// are before the provided `index`. In other words the entry at `index`
	// becomes the first entry in the log.
	TruncateFront(index uint64) error
}

// WAL is a write-ahead log suitable for github.com/hashicorp/raft.
type WAL struct {
	closed uint32 // atomically accessed; kept first in struct for alignment.

	dir         string
	sf          types.SegmentFiler
	metaDB      types.MetaStore
	metrics     *Metrics
	logger      log.Logger
	segmentSize int

	// s is the current state of the WAL files. It is an immutable snapshot that
	// can be accessed without a lock when reading. We only support a single
	// writer so all methods that mutate either the WAL state or append to the
	// tail of the log must hold the writeMu until they complete all changes.
	s atomic.Value // *state

	// writeMu must be held when modifying s or while appending to the tail.
	// Although we take care never to let readers block the writer, we still only
	// allow a single writer to be updating the meta state at once. The mutex must
	// be held before s is loaded until all modifications to s or appends to the
	// tail are complete.
	writeMu sync.Mutex

	// These chans are used to hand off serial execution for segment rotation to a
	// background goroutine so that StoreLogs can return and allow the caller to
	// get on with other work while we mess with files. The next call to StoreLogs
	// needs to wait until the background work is done though since the current
	// log is sealed.
	//
	// At the end of StoreLogs, if the segment was sealed, still holding writeMu
	// we make awaitRotate so it's non-nil, then send the indexStart on
	// triggerRotate which is 1-buffered. We then drop the lock and return to the
	// caller. The rotation goroutine reads from triggerRotate in a loop, takes
	// the write lock, performs rotation and then closes awaitRotate and sets it
	// to nil before releasing the lock. The next StoreLogs call takes the lock
	// and checks awaitRotate. If it is nil there is no rotation going on so
	// StoreLogs can proceed. If it is non-nil, it releases the lock and then
	// waits on the close before acquiring the lock and continuing.
	triggerRotate chan uint64
	awaitRotate   chan struct{}
}

type walOpt func(*WAL)

// Open attempts to open the WAL stored in dir. If there are no existing WAL
// files a new WAL will be initialized there. The dir must already exist and be
// readable and writable to the current process. If existing files are found,
// recovery is attempted.
// If recovery is not possible an error is returned, otherwise the returned
// *WAL is in a state ready for use.
func Open(dir string, opts ...walOpt) (*WAL, error) {
	w := &WAL{
		dir:           dir,
		triggerRotate: make(chan uint64, 1),
	}

	// Apply options
	for _, opt := range opts {
		opt(w)
	}
	if err := w.applyDefaultsAndValidate(); err != nil {
		return nil, err
	}

	// Load or create metaDB
	persisted, err := w.metaDB.Load(w.dir)
	if err != nil {
		return nil, err
	}

	newState := state{
		segments:      &immutable.SortedMap[uint64, segmentState]{},
		nextSegmentID: persisted.NextSegmentID,
	}

	// Get the set of all persisted segments so we can prune it down to just the
	// unused ones as we go.
	toDelete, err := w.sf.List()
	if err != nil {
		return nil, err
	}

	// Build the state
	recoveredTail := false
	for i, si := range persisted.Segments {
		// We want to keep this segment since it's still in the metaDB list!
		delete(toDelete, si.ID)

		if si.SealTime.IsZero() {
			// This is an unsealed segment. It _must_ be the last one. Safety check!
			if i < len(persisted.Segments)-1 {
				return nil, fmt.Errorf("unsealed segment is not at tail")
			}

			// Try to recover this segment
			sw, err := w.sf.RecoverTail(si)
			if errors.Is(err, os.ErrNotExist) {
				// Handle no file specially. This can happen if we crashed right after
				// persisting the metadata but before we managed to persist the new
				// file. In fact it could happen if the whole machine loses power any
				// time before the fsync of the parent dir since the FS could lose the
				// dir entry for the new file until that point. We do ensure we pass
				// that point before we return from Append for the first time in that
				// new file so that's safe, but we have to handle recovering from that
				// case here.
				sw, err = w.sf.Create(si)
			}
			if err != nil {
				return nil, err
			}
			// Set the tail and "reader" for this segment
			ss := segmentState{
				SegmentInfo: si,
				r:           sw,
			}
			newState.tail = sw
			newState.segments = newState.segments.Set(si.BaseIndex, ss)
			recoveredTail = true

			// We're done with this loop, break here to avoid nesting all the rest of
			// the logic!
			break
		}

		// This is a sealed segment

		// Open segment reader
		sr, err := w.sf.Open(si)
		if err != nil {
			return nil, err
		}

		// Store the open reader to get logs from
		ss := segmentState{
			SegmentInfo: si,
			r:           sr,
		}
		newState.segments = newState.segments.Set(si.BaseIndex, ss)
	}

	if !recoveredTail {
		// There was no unsealed segment at the end. This can only really happen
		// when the log is empty with zero segments (either on creation or after a
		// truncation that removed all segments) since we otherwise never allow the
		// state to have a sealed tail segment. But this logic works regardless!

		// Create a new segment. We use baseIndex of 1 even though the first append
		// might be much higher - we'll allow that since we know we have no records
		// yet and so lastIndex will also be 0.
		si := w.newSegment(newState.nextSegmentID, 1)
		newState.nextSegmentID++
		ss := segmentState{
			SegmentInfo: si,
		}
		newState.segments = newState.segments.Set(si.BaseIndex, ss)

		// Persist the new meta to "commit" it even before we create the file so we
		// don't attempt to recreate files with duplicate IDs on a later failure.
		if err := w.metaDB.CommitState(newState.Persistent()); err != nil {
			return nil, err
		}

		// Create the new segment file
		sw, err := w.sf.Create(si)
		if err != nil {
			return nil, err
		}
		newState.tail = sw
		// Update the segment in memory so we have a reader for the new segment. We
		// don't need to commit again as this isn't changing the persisted metadata
		// about the segment.
		ss.r = sw
		newState.segments = newState.segments.Set(si.BaseIndex, ss)
	}

	// Store the in-memory state (it was already persisted if we modified it
	// above). There are no readers yet since we are constructing a new WAL so we
	// don't need to jump through the mutateState hoops yet!
	w.s.Store(&newState)

	// Delete any unused segment files left over after a crash.
	w.deleteSegments(toDelete)

	// Start the rotation routine
	go w.runRotate()

	return w, nil
}

// stateTxn represents a transaction body that mutates the state under the
// writeLock.
// s is already a shallow copy of the current state that may be
// mutated as needed. If a nil error is returned, s will be atomically set as
// the new state. If a non-nil finalizer func is returned it will be atomically
// attached to the old state after it's been replaced but before the write lock
// is released. The finalizer will be called exactly once when all current
// readers have released the old state. If the transaction func returns a
// non-nil postCommit it is executed after the new state has been committed to
// metaDB. It may mutate the state further (captured by closure) before it is
// atomically committed in memory but the update won't be persisted to disk in
// this transaction. This is used where we need sequencing between committing
// meta and creating and opening a new file. Both need to happen in memory in
// one transaction but the disk commit isn't at the end! If postCommit returns
// an error, the state is not updated in memory and the error is returned to
// the mutate caller.
type stateTxn func(s *state) (finalizer func(), postCommit func() error, err error)

func (w *WAL) loadState() *state {
	return w.s.Load().(*state)
}

// mutateStateLocked executes a stateTxn. writeMu MUST be held while calling this.
func (w *WAL) mutateStateLocked(tx stateTxn) error {
	s := w.loadState()
	s.acquire()
	defer s.release()

	newS := s.clone()
	fn, postCommit, err := tx(&newS)
	if err != nil {
		return err
	}

	// Commit updates to meta
	if err := w.metaDB.CommitState(newS.Persistent()); err != nil {
		return err
	}

	if postCommit != nil {
		if err := postCommit(); err != nil {
			return err
		}
	}

	w.s.Store(&newS)
	s.finalizer.Store(fn)
	return nil
}

// acquireState should be used by all readers to fetch the current state. The
// returned release func must be called when no further accesses to state or
// the data within it will be performed to free old files that may have been
// truncated concurrently.
func (w *WAL) acquireState() (*state, func()) {
	s := w.loadState()
	return s, s.acquire()
}

// newSegment creates a types.SegmentInfo with the passed ID and baseIndex,
// filling in the segment parameters based on the current WAL configuration.
func (w *WAL) newSegment(ID, baseIndex uint64) types.SegmentInfo {
	return types.SegmentInfo{
		ID:        ID,
		BaseIndex: baseIndex,
		MinIndex:  baseIndex,
		SizeLimit: uint32(w.segmentSize),

		CreateTime: time.Now(),
	}
}

// FirstIndex returns the first index written. 0 for no entries.
func (w *WAL) FirstIndex() (uint64, error) {
	if err := w.checkClosed(); err != nil {
		return 0, err
	}
	s, release := w.acquireState()
	defer release()
	return s.firstIndex(), nil
}

// LastIndex returns the last index written. 0 for no entries.
func (w *WAL) LastIndex() (uint64, error) {
	if err := w.checkClosed(); err != nil {
		return 0, err
	}
	s, release := w.acquireState()
	defer release()
	return s.lastIndex(), nil
}

// GetLog gets a log entry at a given index.
func (w *WAL) GetLog(index uint64, log *types.LogEntry) error {
	if err := w.checkClosed(); err != nil {
		return err
	}
	s, release := w.acquireState()
	defer release()
	w.metrics.EntriesRead.Inc()

	if err := s.getLog(index, log); err != nil {
		return err
	}
	log.Index = index
	w.metrics.EntryBytesRead.Add(float64(len(log.Data)))
	return nil
}

// StoreLogs stores multiple log entries.
func (w *WAL) StoreLogs(logs []types.LogEntry) error {
	if err := w.checkClosed(); err != nil {
		return err
	}
	if len(logs) < 1 {
		return nil
	}

	w.writeMu.Lock()
	defer w.writeMu.Unlock()

	awaitCh := w.awaitRotate
	if awaitCh != nil {
		// We managed to race for writeMu with the background rotate operation
		// which needs to complete first. Wait for it to complete.
		w.writeMu.Unlock()
		<-awaitCh
		w.writeMu.Lock()
	}

	s, release := w.acquireState()
	defer release()

	// Verify monotonicity since we assume it
	lastIdx := s.lastIndex()

	// Special case, if the log is currently empty and this is the first append,
	// we allow any starting index. We've already created an empty tail segment
	// though and probably started at index 1.
	// Rather than break the invariant
	// that BaseIndex is the same as the first index in the segment (which causes
	// lots of extra complexity lower down) we simply accept the additional cost
	// in this rare case of removing the current tail and re-creating it with the
	// correct BaseIndex for the first log we are about to append. In practice,
	// this only happens on startup of a new server, or after a user snapshot
	// restore, which are both rare enough events that the cost is not significant
	// since the cost of creating other state or restoring snapshots is larger
	// anyway. We could theoretically defer creating at all until we know for sure
	// but that is more complex internally since then everything has to handle the
	// uninitialized case where there is no tail yet with special cases.
	ti := s.getTailInfo()
	// Note we check index != ti.BaseIndex rather than index != 1 so that this
	// works even if we choose to initialize first segments to a BaseIndex other
	// than 1. For example it might be marginally more performant to choose to
	// initialize to the old MaxIndex + 1 after a truncate since that is what our
	// raft library will use after a restore currently so will avoid this case on
	// the next append, while still being generally safe.
	if lastIdx == 0 && logs[0].Index != ti.BaseIndex {
		if err := w.resetEmptyFirstSegmentBaseIndex(logs[0].Index); err != nil {
			return err
		}

		// Re-read state now we just changed it.
		s2, release2 := w.acquireState()
		defer release2()

		// Overwrite the state we read before so the code below uses the new state
		s = s2
	}

	// Encode logs
	nBytes := uint64(0)
	for i, l := range logs {
		if lastIdx > 0 && l.Index != (lastIdx+1) {
			return fmt.Errorf("non-monotonic log entries: tried to append index %d after %d", l.Index, lastIdx)
		}
		lastIdx = l.Index
		nBytes += uint64(len(logs[i].Data))
	}
	if err := s.tail.Append(logs); err != nil {
		return err
	}
	w.metrics.Appends.Inc()
	w.metrics.EntriesWritten.Add(float64(len(logs)))
	w.metrics.BytesWritten.Add(float64(nBytes))

	// Check if we need to roll logs
	sealed, indexStart, err := s.tail.Sealed()
	if err != nil {
		return err
	}
	if sealed {
		// Async rotation to allow caller to do more work while we mess with files.
		w.triggerRotateLocked(indexStart)
	}
	return nil
}

func (w *WAL) TruncateFront(index uint64) error {
	err := func() error {
		if err := w.checkClosed(); err != nil {
			return err
		}

		w.writeMu.Lock()
		defer w.writeMu.Unlock()

		s, release := w.acquireState()
		defer release()

		if index < s.firstIndex() {
			// no-op.
			return nil
		}

		// Note that lastIndex is not checked here to allow for a WAL "reset".
		// e.g. if the last index is currently 5, and a TruncateFront(10) call
		// comes in, this is a valid truncation, resulting in an empty WAL with
		// the new firstIndex and lastIndex being 0. On the next call to
		// StoreLogs, the firstIndex will be set to the index of the first log
		// (special case with empty WAL).

		return w.truncateHeadLocked(index)
	}()

	w.metrics.Truncations.WithLabelValues("front", fmt.Sprintf("%t", err == nil))
	return err
}

func (w *WAL) TruncateBack(index uint64) error {
	err := func() error {
		if err := w.checkClosed(); err != nil {
			return err
		}

		w.writeMu.Lock()
		defer w.writeMu.Unlock()

		s, release := w.acquireState()
		defer release()

		first, last := s.firstIndex(), s.lastIndex()
		if index > last {
			// no-op.
			return nil
		}
		if index < first {
			return fmt.Errorf("truncate back err %w: first=%d, last=%d, index=%d",
				ErrOutOfRange, first, last, index,
			)
		}

		return w.truncateTailLocked(index)
	}()

	w.metrics.Truncations.WithLabelValues("back", fmt.Sprintf("%t", err == nil))
	return err
}

func (w *WAL) triggerRotateLocked(indexStart uint64) {
	if atomic.LoadUint32(&w.closed) == 1 {
		return
	}
	w.awaitRotate = make(chan struct{})
	w.triggerRotate <- indexStart
}

func (w *WAL) runRotate() {
	for {
		indexStart := <-w.triggerRotate

		w.writeMu.Lock()

		// Either triggerRotate was closed by Close, or Close raced with a real
		// trigger; either way shut down without changing anything else.
		// In the second case the segment file is sealed but the metadata isn't
		// updated yet, but we have to handle that case during recovery anyway so
		// it's simpler not to try and complete the rotation here on an
		// already-closed WAL.
		closed := atomic.LoadUint32(&w.closed)
		if closed == 1 {
			w.writeMu.Unlock()
			return
		}

		err := w.rotateSegmentLocked(indexStart)
		if err != nil {
			// The only possible errors indicate bugs and could probably validly be
			// panics, but be conservative and just attempt to log them instead!
			level.Error(w.logger).Log("msg", "rotate error", "err", err)
		}
		done := w.awaitRotate
		w.awaitRotate = nil
		w.writeMu.Unlock()
		// Now we are done, close the channel to unblock the waiting writer if
		// there is one
		close(done)
	}
}

func (w *WAL) rotateSegmentLocked(indexStart uint64) error {
	txn := func(newState *state) (func(), func() error, error) {
		// Mark current tail as sealed in segments
		tail := newState.getTailInfo()
		if tail == nil {
			// Can't happen
			return nil, nil, fmt.Errorf("no tail found during rotate")
		}

		// Note that tail is a copy since it's a value type. Even though this is a
		// pointer here it's pointing to a copy on the heap that was made in
		// getTailInfo above, so we can mutate it safely and update the immutable
		// state with our version.
		tail.SealTime = time.Now()
		tail.MaxIndex = newState.tail.LastIndex()
		tail.IndexStart = indexStart
		w.metrics.LastSegmentAgeSeconds.Set(tail.SealTime.Sub(tail.CreateTime).Seconds())

		// Update the old tail with the seal time etc.
		newState.segments = newState.segments.Set(tail.BaseIndex, *tail)

		post, err := w.createNextSegment(newState)
		return nil, post, err
	}
	w.metrics.SegmentRotations.Inc()
	return w.mutateStateLocked(txn)
}

// createNextSegment is passed a mutable copy of the new state ready to have a
// new segment appended. newState must be a copy, taken under the write lock
// which is still held by the caller, and its segments map must contain all
// non-tail segments that should be in the log, all of which must be sealed at
// this point. The new segment's baseIndex will be the current last-segment's
// MaxIndex + 1 (or 1 if there is no current tail segment). The func returned is
// to be executed post transaction commit to create the actual segment file.
func (w *WAL) createNextSegment(newState *state) (func() error, error) {
	// Find existing sealed tail
	tail := newState.getTailInfo()

	// If there is no tail, next baseIndex is 1 (or the requested next base index)
	nextBaseIndex := uint64(1)
	if tail != nil {
		nextBaseIndex = tail.MaxIndex + 1
	} else if newState.nextBaseIndex > 0 {
		nextBaseIndex = newState.nextBaseIndex
	}

	// Create a new segment
	newTail := w.newSegment(newState.nextSegmentID, nextBaseIndex)
	newState.nextSegmentID++
	ss := segmentState{
		SegmentInfo: newTail,
	}
	newState.segments = newState.segments.Set(newTail.BaseIndex, ss)

	// We're ready to commit now! Return a postCommit that will actually create
	// the segment file once meta is persisted. We don't do it in parallel because
	// we don't want to persist a file with an ID before that ID is durably stored
	// in case the metaDB write doesn't happen.
	post := func() error {
		// Now create the new segment for writing.
		sw, err := w.sf.Create(newTail)
		if err != nil {
			return err
		}
		newState.tail = sw

		// Also cache the reader/log getter which is also the writer. We don't
		// bother reopening read only since we assume we have exclusive access
		// anyway and only use this read-only interface once the segment is sealed.
		ss.r = newState.tail

		// We need to re-insert it since newTail is a copy not a reference
		newState.segments = newState.segments.Set(newTail.BaseIndex, ss)
		return nil
	}
	return post, nil
}

// resetEmptyFirstSegmentBaseIndex is used to change the baseIndex of the tail
// segment file if it's empty. This is needed when the first log written has a
// different index from the base index that was assumed when the tail was
// created (e.g. on startup).
// It will return an error if the log is not currently empty.
func (w *WAL) resetEmptyFirstSegmentBaseIndex(newBaseIndex uint64) error {
	txn := stateTxn(func(newState *state) (func(), func() error, error) {
		if newState.lastIndex() > 0 {
			return nil, nil, fmt.Errorf("can't reset BaseIndex on segment, log is not empty")
		}
		fin := func() {}

		tailSeg := newState.getTailInfo()
		if tailSeg != nil {
			// There is an existing tail. Check if it needs to be replaced
			if tailSeg.BaseIndex == newBaseIndex {
				// It's fine as it is, no-op
				return nil, nil, nil
			}
			// It needs to be removed
			newState.segments = newState.segments.Delete(tailSeg.BaseIndex)
			newState.tail = nil
			fin = func() {
				w.closeSegments([]io.Closer{tailSeg.r})
				w.deleteSegments(map[uint64]uint64{tailSeg.ID: tailSeg.BaseIndex})
			}
		}

		// Ensure the newly created tail has the right base index
		newState.nextBaseIndex = newBaseIndex

		// Create the new segment
		post, err := w.createNextSegment(newState)
		if err != nil {
			return nil, nil, err
		}

		return fin, post, nil
	})

	return w.mutateStateLocked(txn)
}

func (w *WAL) truncateHeadLocked(newMin uint64) error {
	txn := stateTxn(func(newState *state) (func(), func() error, error) {
		oldLastIndex := newState.lastIndex()

		// Iterate the segments to find any that are entirely deleted.
		toDelete := make(map[uint64]uint64)
		toClose := make([]io.Closer, 0, 1)
		it := newState.segments.Iterator()
		var head *segmentState
		nTruncated := uint64(0)
		for !it.Done() {
			_, seg, _ := it.Next()

			maxIdx := seg.MaxIndex
			// If the segment is the tail (unsealed) or a sealed segment that
			// contains this new min then we've found the new head.
			if seg.SealTime.IsZero() {
				maxIdx = newState.lastIndex()
				// This is the tail, check if it actually has any content to keep
				if maxIdx >= newMin {
					head = &seg
					break
				}
			} else if seg.MaxIndex >= newMin {
				head = &seg
				break
			}

			toDelete[seg.ID] = seg.BaseIndex
			toClose = append(toClose, seg.r)
			newState.segments = newState.segments.Delete(seg.BaseIndex)
			nTruncated += (maxIdx - seg.MinIndex + 1) // +1 because MaxIndex is inclusive
		}

		// There may not be any segments (left) but if there are, update the new
		// head's MinIndex.
		var postCommit func() error
		if head != nil {
			nTruncated += (newMin - head.MinIndex)
			head.MinIndex = newMin
			newState.segments = newState.segments.Set(head.BaseIndex, *head)
		} else {
			// If there is no head any more, then there is no tail either! We should
			// create a new blank one ready for use when we next append like we do
			// during initialization. As an optimization, we create it with a
			// BaseIndex of the old MaxIndex + 1 since this is what our Raft library
			// uses as the next log index after a restore so this avoids recreating
			// the files a second time on the next append.
			newState.nextBaseIndex = oldLastIndex + 1

			pc, err := w.createNextSegment(newState)
			if err != nil {
				return nil, nil, err
			}
			postCommit = pc
		}
		w.metrics.EntriesTruncated.WithLabelValues("front").Add(float64(nTruncated))

		// Return a finalizer that will be called when all readers are done with
		// the segments in the current state to close and delete old segments.
		fin := func() {
			w.closeSegments(toClose)
			w.deleteSegments(toDelete)
		}
		return fin, postCommit, nil
	})

	return w.mutateStateLocked(txn)
}

func (w *WAL) truncateTailLocked(newMax uint64) error {
	txn := stateTxn(func(newState *state) (func(), func() error, error) {
		// Reverse iterate the segments to find any that are entirely deleted.
		toDelete := make(map[uint64]uint64)
		toClose := make([]io.Closer, 0, 1)

		it := newState.segments.Iterator()
		it.Last()

		nTruncated := uint64(0)
		for !it.Done() {
			_, seg, _ := it.Prev()

			if seg.BaseIndex <= newMax {
				// We're done
				break
			}

			maxIdx := seg.MaxIndex
			if seg.SealTime.IsZero() {
				maxIdx = newState.lastIndex()
			}

			toDelete[seg.ID] = seg.BaseIndex
			toClose = append(toClose, seg.r)
			newState.segments = newState.segments.Delete(seg.BaseIndex)
			nTruncated += (maxIdx - seg.MinIndex + 1) // +1 because MaxIndex is inclusive
		}

		tail := newState.getTailInfo()
		if tail != nil {
			maxIdx := tail.MaxIndex

			// Check that the tail is sealed (it won't be if we didn't need to remove
			// the actual partial tail above).
			if tail.SealTime.IsZero() {
				tail.SealTime = time.Now()
				maxIdx = newState.lastIndex()
			}
			// Update the MaxIndex
			nTruncated += (maxIdx - newMax)
			tail.MaxIndex = newMax

			// And update the tail in the new state
			newState.segments = newState.segments.Set(tail.BaseIndex, *tail)
		}

		// Create the new tail segment
		pc, err := w.createNextSegment(newState)
		if err != nil {
			return nil, nil, err
		}
		w.metrics.EntriesTruncated.WithLabelValues("back").Add(float64(nTruncated))

		// Return a finalizer that will be called when all readers are done with
		// the segments in the current state to close and delete old segments.
		fin := func() {
			w.closeSegments(toClose)
			w.deleteSegments(toDelete)
		}
		return fin, pc, nil
	})

	return w.mutateStateLocked(txn)
}

func (w *WAL) deleteSegments(toDelete map[uint64]uint64) {
	for ID, baseIndex := range toDelete {
		if err := w.sf.Delete(baseIndex, ID); err != nil {
			// This is not fatal. We can continue, just old files might need manual
			// cleanup somehow.
			level.Error(w.logger).Log("msg", "failed to delete old segment", "baseIndex", baseIndex, "id", ID, "err", err)
		}
	}
}

func (w *WAL) closeSegments(toClose []io.Closer) {
	for _, c := range toClose {
		if c != nil {
			if err := c.Close(); err != nil {
				// Shouldn't happen!
				level.Error(w.logger).Log("msg", "error closing old segment file", "err", err)
			}
		}
	}
}

func (w *WAL) checkClosed() error {
	closed := atomic.LoadUint32(&w.closed)
	if closed != 0 {
		return ErrClosed
	}
	return nil
}

// Close closes all open files related to the WAL. The WAL is in an invalid
// state and should not be used again after this is called. It is safe (though a
// no-op) to call it multiple times and concurrent reads and writes will either
// complete safely or get ErrClosed returned depending on sequencing. Generally
// reads and writes should be stopped before calling this to avoid propagating
// errors to users during shutdown but it's safe from a data-race perspective.
func (w *WAL) Close() error {
	if old := atomic.SwapUint32(&w.closed, 1); old != 0 {
		// Only close once
		return nil
	}

	// Wait for writes
	w.writeMu.Lock()
	defer w.writeMu.Unlock()

	// It doesn't matter if there is a rotation scheduled because runRotate will
	// exit when it sees we are closed anyway.
	w.awaitRotate = nil
	// Awake and terminate the runRotate
	close(w.triggerRotate)

	// Replace state with nil state
	s := w.loadState()
	s.acquire()
	defer s.release()

	w.s.Store(&state{})

	// Old state might still be in use by readers, attach closers to all open
	// segment files.
	toClose := make([]io.Closer, 0, s.segments.Len())
	it := s.segments.Iterator()
	for !it.Done() {
		_, seg, _ := it.Next()
		if seg.r != nil {
			toClose = append(toClose, seg.r)
		}
	}
	// Store finalizer to run once all readers are done. There can't be an
	// existing finalizer since this was the active state read under a write
	// lock and finalizers are only set on states that have been replaced under
	// that same lock.
	s.finalizer.Store(func() {
		w.closeSegments(toClose)
	})

	return w.metaDB.Close()
}
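// ---------------------------------------------------------------------------
// Illustrative usage (not part of the original file): a minimal sketch of how
// a caller might exercise the WAL API defined above, assuming the defaults
// applied by applyDefaultsAndValidate are sufficient so that no custom walOpt
// values are needed. The directory path and entry payloads are hypothetical.
//
//	func example() error {
//		w, err := Open("/var/lib/myapp/wal")
//		if err != nil {
//			return err
//		}
//		defer w.Close()
//
//		// Append two entries; indexes must be contiguous and monotonic, but the
//		// very first index written to an empty WAL may start anywhere.
//		logs := []types.LogEntry{
//			{Index: 1, Data: []byte("first")},
//			{Index: 2, Data: []byte("second")},
//		}
//		if err := w.StoreLogs(logs); err != nil {
//			return err
//		}
//
//		// Read one entry back by index.
//		var entry types.LogEntry
//		if err := w.GetLog(2, &entry); err != nil {
//			return err
//		}
//
//		// Drop everything before index 2; index 2 becomes the first entry.
//		return w.TruncateFront(2)
//	}
// ---------------------------------------------------------------------------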