// Copyright (c) HashiCorp, Inc// SPDX-License-Identifier: MPL-2.0package segmentimport ()const ( segmentFileSuffix = ".wal" segmentFileNamePattern = "%020d-%016x" + segmentFileSuffix)// Filer implements the abstraction for managing a set of segment files in a// directory. It uses a VFS to abstract actual file system operations for easier// testing.typeFilerstruct { dir string vfs types.VFS}// NewFiler creates a Filer ready for use.func ( string, types.VFS) *Filer {return &Filer{dir: ,vfs: , }}// FileName returns the formatted file name expected for this segment.// SegmentFiler implementations could choose to ignore this but it's here tofunc ( types.SegmentInfo) string {returnfmt.Sprintf(segmentFileNamePattern, .BaseIndex, .ID)}// Create adds a new segment with the given info and returns a writer or an// error.func ( *Filer) ( types.SegmentInfo) (types.SegmentWriter, error) {if .BaseIndex == 0 {returnnil, fmt.Errorf("BaseIndex must be greater than zero") } := FileName() , := .vfs.Create(.dir, , uint64(.SizeLimit))if != nil {returnnil, }returncreateFile(, )}// RecoverTail is called on an unsealed segment when re-opening the WAL it will// attempt to recover from a possible crash. It will either return an error, or// return a valid segmentWriter that is ready for further appends. If the// expected tail segment doesn't exist it must return an error wrapping// os.ErrNotExist.func ( *Filer) ( types.SegmentInfo) (types.SegmentWriter, error) { := FileName() , := .vfs.OpenWriter(.dir, )if != nil {returnnil, }returnrecoverFile(, )}// Open an already sealed segment for reading. 
Open may validate the file's// header and return an error if it doesn't match the expected info.func ( *Filer) ( types.SegmentInfo) (types.SegmentReader, error) { := FileName() , := .vfs.OpenReader(.dir, )if != nil {returnnil, }// Validate header here since openReader is re-used by writer where it's valid // for the file header not to be committed yet after a crash so we can't check // it there.var [fileHeaderLen]byteif , := .ReadAt([:], 0); != nil {iferrors.Is(, io.EOF) {// Treat failure to read a header as corruption since a sealed file should // never not have a valid header. (I.e. even if crashes happen it should // be impossible to seal a segment with no header written so this // indicates that something truncated the file after the fact)returnnil, fmt.Errorf("%w: failed to read header: %s", types.ErrCorrupt, ) }returnnil, } , := readFileHeader([:])if != nil {returnnil, }if := validateFileHeader(*, ); != nil {returnnil, }returnopenReader(, )}// List returns the set of segment IDs currently stored. It's used by the WAL// on recovery to find any segment files that need to be deleted following a// unclean shutdown. The returned map is a map of ID -> BaseIndex. BaseIndex// is returned to allow subsequent Delete calls to be made.func ( *Filer) () (map[uint64]uint64, error) { , , := .listInternal()return , }func ( *Filer) () (map[uint64]uint64, []uint64, error) { , := .vfs.ListDir(.dir)if != nil {returnnil, nil, } := make(map[uint64]uint64) := make([]uint64, 0)for , := range {if !strings.HasSuffix(, segmentFileSuffix) {continue }// Parse BaseIndex and ID from the file namevar , uint64 , := fmt.Sscanf(, segmentFileNamePattern, &, &)if != nil {returnnil, nil, types.ErrCorrupt }if != 2 {// Misnamed segment files with the right suffix indicates a bug or // tampering, we can't be sure what's happened to the data.returnnil, nil, types.ErrCorrupt } [] = = append(, ) }return , , nil}// Delete removes the segment with given baseIndex and id if it exists. 
Note// that baseIndex is technically redundant since ID is unique on it's own. But// in practice we name files (or keys) with both so that they sort correctly.// This interface allows a simpler implementation where we can just delete// the file if it exists without having to scan the underlying storage for a.func ( *Filer) ( uint64, uint64) error { := fmt.Sprintf(segmentFileNamePattern, , )return .vfs.Delete(.dir, )}// DumpSegment attempts to read the segment file specified by the baseIndex and// ID. It's intended purpose is for debugging the contents of segment files and// unlike the SegmentFiler interface, it doesn't assume the caller has access to// the correct metadata. This allows dumping log segments in a WAL that is still// being written to by another process. Without metadata we don't know if the// file is sealed so always recover by reading through the whole file. If after// or before are non-zero, the specify a exclusive lower or upper bound on which// log entries should be emitted. No error checking is done on the read data. fn// is called for each entry passing the raft info read from the file header (so// that the caller knows which codec to use for example) the raft index of the// entry and the raw bytes of the entry itself. The callback must return true to// continue reading. The data slice is only valid for the lifetime of the call.func ( *Filer) ( uint64, uint64, , uint64, func( types.SegmentInfo, types.LogEntry) (bool, error)) error { := fmt.Sprintf(segmentFileNamePattern, , ) , := .vfs.OpenReader(.dir, )if != nil {return } := make([]byte, 64*1024) := typestruct {uint64int64uint32 }var [] _, = readThroughSegment(, func( types.SegmentInfo, frameHeader, int64) (bool, error) {if .typ == FrameCommit {// All the previous entries have been committed. Read them and send up to // caller.for , := range {// Check the header is reasonableif . 
> MaxEntrySize {returnfalse, fmt.Errorf("failed to read entry idx=%d, frame header length (%d) is too big: %w", ., ., ) }if . > uint32(len()) { = make([]byte, .) } , := .ReadAt([:.], .+frameHeaderLen)if != nil {returnfalse, }ifuint32() < . {returnfalse, io.ErrUnexpectedEOF } , := (, types.LogEntry{Index: ., Data: [:]})if ! || != nil {return , } }// Reset batch = [:0]returntrue, nil }if .typ != FrameEntry {returntrue, nil }if <= {// Not in the range we care about, skip reading the entry. ++returntrue, nil }if > 0 && >= {// We're donereturnfalse, nil } = append(, {, , .len}) ++returntrue, nil })return}// DumpLogs attempts to read all log entries from segment files in the directory// for debugging purposes. It does _not_ use the metadata and so may output log// entries that are uncommitted or already truncated as far as the writing// process is concerned. As such it should not be used for replication of data.// It is useful though to debug the contents of the log even while the writing// application is still running. After and before if non-zero specify exclusive// bounds on the logs that should be returned which may allow the implementation// to skip reading entire segment files that are not in the range.func ( *Filer) (, uint64, func( types.SegmentInfo, types.LogEntry) (bool, error)) error { , , := .listInternal()if != nil {return }for , := range { := [] := uint64(0)if +1 < len() {// This is not the last segment, peek at the base index of that one and // assume that this segment won't contain indexes that high. = [[+1]] }// See if this file contains any indexes in the rangeif > 0 && > 0 && >= {// This segment is all indexes before the lower bound we care aboutcontinue }if > 0 && <= {// This segment is all indexes higher than the upper bound. 
We've output // every log in the range at this point (barring edge cases where we race // with a truncation which leaves multiple generations of segment files on // disk which we are going to ignore for now).returnnil }// We probably care about at least some of the entries in this segment := .DumpSegment(, , , , )if != nil {return } }returnnil}
The pages are generated with Golds v0.8.2 (GOOS=linux GOARCH=amd64).
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.