// Copyright (c) HashiCorp, Inc// SPDX-License-Identifier: MPL-2.0package segmentimport ()// Writer allows appending logs to a segment file as well as reading them back.typeWriterstruct {// commitIdx is updated after an append batch is fully persisted to disk to // allow readers to read the new value. Note that readers must not read values // larger than this even if they are available in tailIndex as they are not // yet committed to disk! commitIdx uint64// offsets is the index offset. The first element corresponds to the // BaseIndex. It is accessed concurrently by readers and the single writer // without locks! This is race-free via the following invariants: // - the slice here is never mutated only copied though it may still refer to // the same backing array. // - readers only ever read up to len(offsets) in the atomically accessed // slice. Those elements of the backing array are immutable and will never // be modified once they are accessible to readers. // - readers and writers synchronize on atomic access to the slice // - serial writer will only append to the end which either mutates the // shared backing array but at an index greater than the len any reader has // seen, or a new backing array is allocated and the old one copied into it // which also will never mutate the entries readers can already "see" via // the old slice. offsets atomic.Value// []uint32// writer state is accessed only on the (serial) write path so doesn't need // synchronization. writer struct {// commitBuf stores the pending frames waiting to be flushed to the current // tail block. commitBuf []byte// crc is the rolling crc32 Castagnoli sum of all data written since the // last fsync. crc uint32// writeOffset is the absolute file offset up to which we've written data to // the file. The contents of commitBuf will be written at this offset when // it commits or we reach the end of the block, whichever happens first. 
writeOffset uint32// indexStart is set when the tail is sealed indicating the file offset at // which the index array was written. indexStart uint64 } info types.SegmentInfo wf types.WritableFile r types.SegmentReader}func createFile( types.SegmentInfo, types.WritableFile) (*Writer, error) { , := openReader(, )if != nil {returnnil, } := &Writer{info: ,wf: ,r: , } .tail = if := .initEmpty(); != nil {returnnil, }return , nil}func recoverFile( types.SegmentInfo, types.WritableFile) (*Writer, error) { , := openReader(, )if != nil {returnnil, } := &Writer{info: ,wf: ,r: , } .tail = if := .recoverTail(); != nil {returnnil, }return , nil}func ( *Writer) () error {// Write header into write buffer to be written out with the first commit. .writer.writeOffset = 0 .ensureBufCap(fileHeaderLen) .writer.commitBuf = .writer.commitBuf[:fileHeaderLen]if := writeFileHeader(.writer.commitBuf, .info); != nil {return } .writer.crc = crc32.Checksum(.writer.commitBuf[:fileHeaderLen], castagnoliTable)// Initialize the index := make([]uint32, 0, 32*1024) .offsets.Store()returnnil}func ( *Writer) () error {// We need to track the last two commit framestypestruct {frameHeaderint64int64int }var , * := make([]uint32, 0, 32*1024) , := readThroughSegment(.wf, func( types.SegmentInfo, frameHeader, int64) (bool, error) {switch .typ {caseFrameEntry:// Record the frame offset = append(, uint32())caseFrameIndex:// So this segment was sealed! (or attempted) keep track of this // indexStart in case it turns out the Seal actually committed completely. // We store the start of the actual array not the frame header. .writer.indexStart = uint64() + frameHeaderLencaseFrameCommit:// The payload is not the length field in this case! = = &{ : , : , : 0, // First commit includes the file header : len(), // Track how many entries were found up to this commit point. }if != nil { . = . + frameHeaderLen } }returntrue, nil })if != nil {return }if == nil {// There were no commit frames found at all. 
This segment file is // effectively empty. Init it that way ready for appending. This overwrites // the file header so it doesn't matter if it was valid or not.return .initEmpty() }// Assume that the final commit is good for now and set the writer state .writer.writeOffset = uint32(. + frameHeaderLen)// Just store what we have for now to ensure the defer doesn't panic we'll // probably update this below. .offsets.Store()// Whichever path we take, fix up the commitIdx before we leavedeferfunc() { := .getOffsets()iflen() > 0 {// Non atomic is OK because this file is not visible to any other threads // yet. .commitIdx = .info.BaseIndex + uint64(len()) - 1 } }()if . < len() {// Some entries were found after the last commit. Those must be a partial // write that was uncommitted so can be ignored. But the fact they were // written at all means that the last commit frame must have been completed // and acknowledged so we don't need to verify anything. Just truncate the // extra entries from index and reset the write cursor to continue appending // after the last commit. = [:.] .offsets.Store()// Since at least one commit was found, the header better be valid!returnvalidateFileHeader(*, .info) }// Last frame was a commit frame! Let's check that all the data written in // that commit frame made it to disk. // Verify the length first := . - .// We know bufLen can't be bigger than the whole segment file because none of // the values above were read from the data just from the offsets we moved // through. := make([]byte, )if , := .wf.ReadAt(, .); != nil {returnfmt.Errorf("failed to read last committed batch for CRC validation: %w", ) } := crc32.Checksum(, castagnoliTable)if == ..crc {// All is good. We already setup the state we need for writer other than // offsets. 
.offsets.Store()// Since at least one commit was found, the header better be valid!returnvalidateFileHeader(*, .info) }// Last commit was incomplete rewind back to the previous one or start of fileif == nil {// Init wil re-write the file header so it doesn't matter if it was corrupt // or not!return .initEmpty() } .writer.writeOffset = uint32(. + frameHeaderLen) = [:.] .offsets.Store()// Since at least one commit was found, the header better be valid!returnvalidateFileHeader(*, .info)}// Close implements io.Closerfunc ( *Writer) () error {return .r.Close()}// GetLog implements types.SegmentReaderfunc ( *Writer) ( uint64, *types.LogEntry) error {return .r.GetLog(, )}// Append adds one or more entries. It must not return until the entries are// durably stored otherwise raft's guarantees will be compromised.func ( *Writer) ( []types.LogEntry) error {iflen() < 1 {returnnil }if .writer.indexStart > 0 {returntypes.ErrSealed }// Iterate entries and append each onefor , := range {if := .appendEntry(); != nil {return } } := .getOffsets()// Work out if we need to seal before we commit and sync.if (.writer.writeOffset + uint32(len(.writer.commitBuf)+indexFrameSize(len()))) > .info.SizeLimit {// Seal the segment! 
We seal it by writing an index frame before we commit.if := .appendIndex(); != nil {return } }// Write the commit frameif := .appendCommit(); != nil {return }// Commit in-memoryatomic.StoreUint64(&.commitIdx, [len()-1].Index)returnnil}func ( *Writer) () []uint32 {return .offsets.Load().([]uint32)}// OffsetForFrame implements tailWriter and allows readers to lookup entry// frames in the tail's in-memory index.func ( *Writer) ( uint64) (uint32, error) {if < .info.BaseIndex || < .info.MinIndex || > .LastIndex() {return0, types.ErrNotFound } := .getOffsets() := - .info.BaseIndex// No bounds check on entryIndex since LastIndex must ensure it's in bounds.return [], nil}func ( *Writer) ( types.LogEntry) error { := .getOffsets()// Check the invariant that this entry is the next one we expect otherwise our // index logic is incorrect and will result in panics on read.if .Index != .info.BaseIndex+uint64(len()) {returnfmt.Errorf("non-monotonic append to segment with BaseIndex=%d. Entry index %d, expected %d", .info.BaseIndex, .Index, .info.BaseIndex+uint64(len())) } := frameHeader{typ: FrameEntry,len: uint32(len(.Data)), } , := .appendFrame(, .Data)if != nil {return }// Update the offsets index// Add the index entry. Note this is safe despite mutating the same backing // array as tail because it's beyond the limit current readers will access // until we do the atomic update below. Even if append re-allocates the // backing array, it will only read the indexes smaller than numEntries from // the old array to copy them into the new one and we are not mutating the // same memory locations. Old readers might still be looking at the old // array (lower than numEntries) through the current tail.offsets slice but // we are not touching that at least below numEntries. = append(, .writer.writeOffset+uint32())// Now we can make it available to readers. 
Note that readers still // shouldn't read it until we actually commit to disk (and increment // commitIdx) but it's race free for them to now! .offsets.Store()returnnil}func ( *Writer) () error { := frameHeader{typ: FrameCommit,crc: .writer.crc, }if , := .appendFrame(, nil); != nil {return }// Flush all writes to the fileif := .sync(); != nil {return }// Finally, reset crc so that by the time we write the next trailer // we'll know where the append batch started. .writer.crc = 0returnnil}func ( *Writer) ( int) { := len(.writer.commitBuf) + ifcap(.writer.commitBuf) < { := minBufSize// Double buffer size until it's big enough to amortize costfor < { = * 2 } := make([]byte, ) := len(.writer.commitBuf)copy(, .writer.commitBuf) .writer.commitBuf = [:] }}func ( *Writer) () error {// Append the index record before we commit (commit and flush happen later // generally) := .getOffsets() := indexFrameSize(len()) .ensureBufCap() := len(.writer.commitBuf)if := writeIndexFrame(.writer.commitBuf[:+], ); != nil {return } .writer.commitBuf = .writer.commitBuf[:+]// Update crc with those values .writer.crc = crc32.Update(.writer.crc, castagnoliTable, .writer.commitBuf[:+])// Record the file offset where the index starts (the actual index data so // after the frame header). .writer.indexStart = uint64(.writer.writeOffset) + uint64(+frameHeaderLen)returnnil}// appendFrame appends the given frame to the current block. 
The frame must fit// already otherwise an error will be returned.func ( *Writer) ( frameHeader, []byte) (int, error) {// Encode frame header into current block buffer := encodedFrameSize(len()) .ensureBufCap() := len(.writer.commitBuf)if := writeFrame(.writer.commitBuf[:+], , ); != nil {return0, }// Update len of commitBuf since we resliced it for the write .writer.commitBuf = .writer.commitBuf[:+]// Update the CRC .writer.crc = crc32.Update(.writer.crc, castagnoliTable, .writer.commitBuf[:+])return , nil}func ( *Writer) () error {// Write to file , := .wf.WriteAt(.writer.commitBuf, int64(.writer.writeOffset))if == io.EOF && == len(.writer.commitBuf) {// Writer may return EOF even if it wrote all bytes if it wrote right up to // the end of the file. Ignore that case though. = nil }if != nil {return }// Reset writer state ready for next writes .writer.writeOffset += uint32(len(.writer.commitBuf)) .writer.commitBuf = .writer.commitBuf[:0]returnnil}func ( *Writer) () error {// Write out current buffer to fileif := .flush(); != nil {return }// Sync fileif := .wf.Sync(); != nil {return }// Update commitIdx atomically := .getOffsets() := uint64(0)iflen() > 0 {// Probably not possible for the to be less, but just in case we ever flush // the file with only meta data written... = uint64(.info.BaseIndex) + uint64(len()) - 1 }atomic.StoreUint64(&.commitIdx, )returnnil}// Sealed returns whether the segment is sealed or not. If it is it returns// true and the file offset that it's index array starts at to be saved in// meta data. WAL will call this after every append so it should be relatively// cheap in the common case. This design allows the final Append to write out// the index or any additional data needed at seal time in the same fsync.func ( *Writer) () (bool, uint64, error) {if .writer.indexStart == 0 {returnfalse, 0, nil }returntrue, .writer.indexStart, nil}// LastIndex returns the most recently persisted index in the log. 
It must// respond without blocking on append since it's needed frequently by read// paths that may call it concurrently. Typically this will be loaded from an// atomic int. If the segment is empty lastIndex should return zero.func ( *Writer) () uint64 {returnatomic.LoadUint64(&.commitIdx)}func readThroughSegment( types.ReadableFile, func( types.SegmentInfo, frameHeader, int64) (bool, error)) (*types.SegmentInfo, error) {// First read the file header. Note we wrote it as part of the first commit so // it may be missing or partial written and that's OK as long as there are no // other later commit frames!var [fileHeaderLen]byte , := .ReadAt([:], 0)// EOF is ok - the file might be empty if we crashed before committing // anything and preallocation isn't supported.if != io.EOF && != nil {returnnil, } , := readFileHeader([:])if == types.ErrCorrupt {// Header is malformed or missing, don't error yet though we'll detect it // later when we know if it's a problem or not. = nil }if != nil {returnnil, }// If header wasn't detected as corrupt, it might still be just in a way // that's valid since we've not verified it against the expected metadata yet. // We'll wait to see if the header was part of the last commit before decide // if we should validate it for corruption or not though. For now just make // sure it's not nil so we don't have to handle nil checks everywhere.if == nil {// Zero info will fail validation against the actual metadata if it was // corrupt when it shouldn't be later. Just prevents a nil panic. = &types.SegmentInfo{} }// Read through file from after header until we hit zeros, EOF or corrupt // frames. := int64(fileHeaderLen)var [frameHeaderLen]bytefor { , := .ReadAt([:], )if == io.EOF {if < frameHeaderLen {return , nil }// This is OK! The last frame in file might be a commit frame so as long // as we have it all then we can ignore the EOF for this iteration. 
= nil }if != nil {return , fmt.Errorf("failed reading frame at offset=%d: %w", , ) } , := readFrameHeader([:frameHeaderLen])if != nil {// This is not actually an error case. If we failed to decode it could be // because of a torn write (since we don't assume writes are atomic). We // assume that previously committed data is not silently corrupted by the // FS (see README for details). So this must be due to corruption that // happened due to non-atomic sector updates whilst committing the last // write batch.return , nil }if .typ == FrameInvalid {// This means we've hit zeros at the end of the file (or due to an // incomplete write, which we treat the same way).return , nil }// Call the callback , := (*, , )if != nil {return , }if ! {return , nil }// Skip to next frame += int64(encodedFrameSize(int(.len))) }}
The pages are generated with Golds v0.8.2 (GOOS=linux GOARCH=amd64).
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.