// Copyright (c) HashiCorp, Inc
// SPDX-License-Identifier: MPL-2.0

package wal

import (
	
	
	
	
	
	
	

	
	
	

	
)

// Sentinel errors returned by the WAL. All but ErrOutOfRange are the very same
// values as the corresponding errors in the types package, so comparing
// against either name works.
var (
	ErrNotFound = types.ErrNotFound
	ErrCorrupt  = types.ErrCorrupt
	ErrSealed   = types.ErrSealed
	// ErrClosed is returned by operations attempted after Close.
	ErrClosed = types.ErrClosed
	// ErrOutOfRange is wrapped by TruncateBack when asked to truncate to an
	// index before the first entry in the log.
	ErrOutOfRange = errors.New("index out of range")
)

// DefaultSegmentSize is the default size limit of a segment file in bytes
// (64 MiB). NOTE(review): presumably applied by applyDefaultsAndValidate when
// no explicit size is configured — confirm.
var DefaultSegmentSize = 64 << 20

// LogStore describes durable storage for an ordered sequence of indexed log
// entries, supporting appends at the back and truncation at either end.
type LogStore interface {
	// FirstIndex reports the index of the oldest stored entry, or 0 when no
	// entries are stored.
	FirstIndex() (uint64, error)
	// LastIndex reports the index of the newest stored entry, or 0 when no
	// entries are stored.
	LastIndex() (uint64, error)
	// GetLog fills log with the entry stored at index.
	GetLog(index uint64, log *types.LogEntry) error

	// StoreLogs appends a batch of entries.
	StoreLogs(logs []types.LogEntry) error
	// TruncateFront discards every entry before index, so the entry at index
	// becomes the first one in the log.
	TruncateFront(index uint64) error
	// TruncateBack discards every entry after index, so the entry at index
	// becomes the last one in the log.
	TruncateBack(index uint64) error

	// Close releases the resources held by the store.
	io.Closer
}

// WAL is a write-ahead log suitable for github.com/hashicorp/raft.
//
// Instances are created by Open. A WAL contains a sync.Mutex and an
// atomic.Value, so it must not be copied after first use.
type WAL struct {
	closed uint32 // set by Close, always accessed atomically; keep it first in the struct for alignment.

	// dir is the directory holding the WAL. sf creates, opens, recovers, lists
	// and deletes the segment files; metaDB persists the segment metadata.
	dir    string
	sf     types.SegmentFiler
	metaDB types.MetaStore

	// metrics is updated by reads, appends, truncations and rotations.
	metrics *Metrics

	// logger receives errors that cannot be returned to a caller (failed
	// background rotations, segment close/delete failures). segmentSize is
	// copied into each new segment's SizeLimit.
	logger      log.Logger
	segmentSize int

	// s is the current state of the WAL files. It is an immutable snapshot that
	// can be accessed without a lock when reading. We only support a single
	// writer, so every method that mutates the WAL state or appends to the tail
	// of the log must hold writeMu until all of its changes are complete.
	s atomic.Value // *state

	// writeMu must be held when modifying s or while appending to the tail.
	// Although we take care never to let readers block the writer, we still only
	// allow a single writer to update the meta state at a time. The mutex must
	// be held from before s is loaded until all modifications to s or appends to
	// the tail are complete.
	writeMu sync.Mutex

	// These chans hand off segment rotation to a background goroutine so that
	// StoreLogs can return and let the caller get on with other work while we
	// mess with files. The next call to StoreLogs must wait until the
	// background work is done, though, since the current segment is sealed.
	//
	// At the end of StoreLogs, if the segment was sealed, we (still holding
	// writeMu) make awaitRotate so it's non-nil, then send the indexStart on
	// triggerRotate, which is 1-buffered. We then drop the lock and return to the
	// caller. The rotation goroutine reads from triggerRotate in a loop, takes
	// the write lock, performs the rotation, sets awaitRotate to nil, releases
	// the lock and finally closes the old awaitRotate channel. The next
	// StoreLogs call takes the lock and checks awaitRotate. If it is nil there is
	// no rotation going on, so StoreLogs can proceed. If it is non-nil, it
	// releases the lock, waits for the channel to be closed, and then
	// re-acquires the lock before continuing.
	triggerRotate chan uint64
	awaitRotate   chan struct{}
}

// walOpt is a functional option. Open applies each one to the new WAL before
// filling in defaults and validating the configuration.
type walOpt func(w *WAL)

// Open attempts to open the WAL stored in dir. If there are no existing WAL
// files a new WAL will be initialized there. The dir must already exist and be
// readable and writable to the current process. If existing files are found,
// recovery is attempted. If recovery is not possible an error is returned,
// otherwise the returned *WAL is in a state ready for use.
func ( string,  ...walOpt) (*WAL, error) {
	 := &WAL{
		dir:           ,
		triggerRotate: make(chan uint64, 1),
	}
	// Apply options
	for ,  := range  {
		()
	}
	if  := .applyDefaultsAndValidate();  != nil {
		return nil, 
	}

	// Load or create metaDB
	,  := .metaDB.Load(.dir)
	if  != nil {
		return nil, 
	}

	 := state{
		segments:      &immutable.SortedMap[uint64, segmentState]{},
		nextSegmentID: .NextSegmentID,
	}

	// Get the set of all persisted segments so we can prune it down to just the
	// unused ones as we go.
	,  := .sf.List()
	if  != nil {
		return nil, 
	}

	// Build the state
	 := false
	for ,  := range .Segments {
		// We want to keep this segment since it's still in the metaDB list!
		delete(, .ID)

		if .SealTime.IsZero() {
			// This is an unsealed segment. It _must_ be the last one. Safety check!
			if  < len(.Segments)-1 {
				return nil, fmt.Errorf("unsealed segment is not at tail")
			}

			// Try to recover this segment
			,  := .sf.RecoverTail()
			if errors.Is(, os.ErrNotExist) {
				// Handle no file specially. This can happen if we crashed right after
				// persisting the metadata but before we managed to persist the new
				// file. In fact it could happen if the whole machine looses power any
				// time before the fsync of the parent dir since the FS could loose the
				// dir entry for the new file until that point. We do ensure we pass
				// that point before we return from Append for the first time in that
				// new file so that's safe, but we have to handle recovering from that
				// case here.
				,  = .sf.Create()
			}
			if  != nil {
				return nil, 
			}
			// Set the tail and "reader" for this segment
			 := segmentState{
				SegmentInfo: ,
				r:           ,
			}
			.tail = 
			.segments = .segments.Set(.BaseIndex, )
			 = true

			// We're done with this loop, break here to avoid nesting all the rest of
			// the logic!
			break
		}

		// This is a sealed segment

		// Open segment reader
		,  := .sf.Open()
		if  != nil {
			return nil, 
		}

		// Store the open reader to get logs from
		 := segmentState{
			SegmentInfo: ,
			r:           ,
		}
		.segments = .segments.Set(.BaseIndex, )
	}

	if ! {
		// There was no unsealed segment at the end. This can only really happen
		// when the log is empty with zero segments (either on creation or after a
		// truncation that removed all segments) since we otherwise never allow the
		// state to have a sealed tail segment. But this logic works regardless!

		// Create a new segment. We use baseIndex of 1 even though the first append
		// might be much higher - we'll allow that since we know we have no records
		// yet and so lastIndex will also be 0.
		 := .newSegment(.nextSegmentID, 1)
		.nextSegmentID++
		 := segmentState{
			SegmentInfo: ,
		}
		.segments = .segments.Set(.BaseIndex, )

		// Persist the new meta to "commit" it even before we create the file so we
		// don't attempt to recreate files with duplicate IDs on a later failure.
		if  := .metaDB.CommitState(.Persistent());  != nil {
			return nil, 
		}

		// Create the new segment file
		,  := .sf.Create()
		if  != nil {
			return nil, 
		}
		.tail = 
		// Update the segment in memory so we have a reader for the new segment. We
		// don't need to commit again as this isn't changing the persisted metadata
		// about the segment.
		.r = 
		.segments = .segments.Set(.BaseIndex, )
	}

	// Store the in-memory state (it was already persisted if we modified it
	// above) there are no readers yet since we are constructing a new WAL so we
	// don't need to jump through the mutateState hoops yet!
	.s.Store(&)

	// Delete any unused segment files left over after a crash.
	.deleteSegments()

	// Start the rotation routine
	go .runRotate()

	return , nil
}

// stateTxn is the body of a state transaction, run by mutateStateLocked while
// writeMu is held.
//
// next is a shallow copy of the current state that the body may modify as it
// likes. When the body returns a nil err:
//   - next is committed to the metaDB;
//   - postCommit, if non-nil, then runs. It may modify next further (through a
//     closure); those changes become visible in memory but are not persisted
//     by this transaction. This provides the ordering needed when metadata
//     must be durably committed before a new file is created and opened, while
//     both changes still land in memory as one transaction. If postCommit
//     fails, the in-memory state is not replaced and its error is returned to
//     the caller (the metaDB commit has already happened at that point);
//   - next atomically replaces the current state, and finalizer (if non-nil)
//     is attached to the replaced state before the write lock is released. It
//     is called exactly once, when every reader of the old state has released
//     it.
type stateTxn func(next *state) (finalizer func(), postCommit func() error, err error)

func ( *WAL) () *state {
	return .s.Load().(*state)
}

// mutateState executes a stateTxn. writeLock MUST be held while calling this.
func ( *WAL) ( stateTxn) error {
	 := .loadState()
	.acquire()
	defer .release()

	 := .clone()
	, ,  := (&)
	if  != nil {
		return 
	}

	// Commit updates to meta
	if  := .metaDB.CommitState(.Persistent());  != nil {
		return 
	}

	if  != nil {
		if  := ();  != nil {
			return 
		}
	}

	.s.Store(&)
	.finalizer.Store()
	return nil
}

// acquireState should be used by all readers to fetch the current state. The
// returned release func must be called when no further accesses to state or the
// data within it will be performed to free old files that may have been
// truncated concurrently.
func ( *WAL) () (*state, func()) {
	 := .loadState()
	return , .acquire()
}

// newSegment creates a types.SegmentInfo with the passed ID and baseIndex, filling in
// the segment parameters based on the current WAL configuration.
func ( *WAL) (,  uint64) types.SegmentInfo {
	return types.SegmentInfo{
		ID:        ,
		BaseIndex: ,
		MinIndex:  ,
		SizeLimit: uint32(.segmentSize),

		CreateTime: time.Now(),
	}
}

// FirstIndex returns the first index written. 0 for no entries.
func ( *WAL) () (uint64, error) {
	if  := .checkClosed();  != nil {
		return 0, 
	}
	,  := .acquireState()
	defer ()
	return .firstIndex(), nil
}

// LastIndex returns the last index written. 0 for no entries.
func ( *WAL) () (uint64, error) {
	if  := .checkClosed();  != nil {
		return 0, 
	}
	,  := .acquireState()
	defer ()
	return .lastIndex(), nil
}

// GetLog gets a log entry at a given index.
func ( *WAL) ( uint64,  *types.LogEntry) error {
	if  := .checkClosed();  != nil {
		return 
	}
	,  := .acquireState()
	defer ()
	.metrics.EntriesRead.Inc()

	if  := .getLog(, );  != nil {
		return 
	}
	.Index = 
	.metrics.EntryBytesRead.Add(float64(len(.Data)))
	return nil
}

// StoreLogs stores multiple log entries.
func ( *WAL) ( []types.LogEntry) error {
	if  := .checkClosed();  != nil {
		return 
	}
	if len() < 1 {
		return nil
	}

	.writeMu.Lock()
	defer .writeMu.Unlock()

	 := .awaitRotate
	if  != nil {
		// We managed to race for writeMu with the background rotate operation which
		// needs to complete first. Wait for it to complete.
		.writeMu.Unlock()
		<-
		.writeMu.Lock()
	}

	,  := .acquireState()
	defer ()

	// Verify monotonicity since we assume it
	 := .lastIndex()

	// Special case, if the log is currently empty and this is the first append,
	// we allow any starting index. We've already created an empty tail segment
	// though and probably started at index 1. Rather than break the invariant
	// that BaseIndex is the same as the first index in the segment (which causes
	// lots of extra complexity lower down) we simply accept the additional cost
	// in this rare case of removing the current tail and re-creating it with the
	// correct BaseIndex for the first log we are about to append. In practice,
	// this only happens on startup of a new server, or after a user snapshot
	// restore which are both rare enough events that the cost is not significant
	// since the cost of creating other state or restoring snapshots is larger
	// anyway. We could theoretically defer creating at all until we know for sure
	// but that is more complex internally since then everything has to handle the
	// uninitialized case where the is no tail yet with special cases.
	 := .getTailInfo()
	// Note we check index != ti.BaseIndex rather than index != 1 so that this
	// works even if we choose to initialize first segments to a BaseIndex other
	// than 1. For example it might be marginally more performant to choose to
	// initialize to the old MaxIndex + 1 after a truncate since that is what our
	// raft library will use after a restore currently so will avoid this case on
	// the next append, while still being generally safe.
	if  == 0 && [0].Index != .BaseIndex {
		if  := .resetEmptyFirstSegmentBaseIndex([0].Index);  != nil {
			return 
		}

		// Re-read state now we just changed it.
		,  := .acquireState()
		defer ()

		// Overwrite the state we read before so the code below uses the new state
		 = 
	}

	// Encode logs
	 := uint64(0)
	for ,  := range  {
		if  > 0 && .Index != (+1) {
			return fmt.Errorf("non-monotonic log entries: tried to append index %d after %d", .Index, )
		}
		 = .Index
		 += uint64(len([].Data))
	}
	if  := .tail.Append();  != nil {
		return 
	}
	.metrics.Appends.Inc()
	.metrics.EntriesWritten.Add(float64(len()))
	.metrics.BytesWritten.Add(float64())

	// Check if we need to roll logs
	, ,  := .tail.Sealed()
	if  != nil {
		return 
	}
	if  {
		// Async rotation to allow caller to do more work while we mess with files.
		.triggerRotateLocked()
	}
	return nil
}

func ( *WAL) ( uint64) error {
	 := func() error {
		if  := .checkClosed();  != nil {
			return 
		}
		.writeMu.Lock()
		defer .writeMu.Unlock()

		,  := .acquireState()
		defer ()

		if  < .firstIndex() {
			// no-op.
			return nil
		}
		// Note that lastIndex is not checked here to allow for a WAL "reset".
		// e.g. if the last index is currently 5, and a TruncateFront(10) call
		// comes in, this is a valid truncation, resulting in an empty WAL with
		// the new firstIndex and lastIndex being 0. On the next call to
		// StoreLogs, the firstIndex will be set to the index of the first log
		// (special case with empty WAL).

		return .truncateHeadLocked()
	}()
	.metrics.Truncations.WithLabelValues("front", fmt.Sprintf("%t",  == nil))
	return 
}

func ( *WAL) ( uint64) error {
	 := func() error {
		if  := .checkClosed();  != nil {
			return 
		}
		.writeMu.Lock()
		defer .writeMu.Unlock()

		,  := .acquireState()
		defer ()

		,  := .firstIndex(), .lastIndex()
		if  >  {
			// no-op.
			return nil
		}
		if  <  {
			return fmt.Errorf("truncate back err %w: first=%d, last=%d, index=%d", ErrOutOfRange, , , )
		}

		return .truncateTailLocked()
	}()
	.metrics.Truncations.WithLabelValues("back", fmt.Sprintf("%t",  == nil))
	return 
}

func ( *WAL) ( uint64) {
	if atomic.LoadUint32(&.closed) == 1 {
		return
	}
	.awaitRotate = make(chan struct{})
	.triggerRotate <- 
}

func ( *WAL) () {
	for {
		 := <-.triggerRotate

		.writeMu.Lock()

		// Either triggerRotate was closed by Close, or Close raced with a real
		// trigger, either way shut down without changing anything else. In the
		// second case the segment file is sealed but meta data isn't updated yet
		// but we have to handle that case during recovery anyway so it's simpler
		// not to try and complete the rotation here on an already-closed WAL.
		 := atomic.LoadUint32(&.closed)
		if  == 1 {
			.writeMu.Unlock()
			return
		}

		 := .rotateSegmentLocked()
		if  != nil {
			// The only possible errors indicate bugs and could probably validly be
			// panics, but be conservative and just attempt to log them instead!
			level.Error(.logger).Log("msg", "rotate error", "err", )
		}
		 := .awaitRotate
		.awaitRotate = nil
		.writeMu.Unlock()
		// Now we are done, close the channel to unblock the waiting writer if there
		// is one
		close()
	}
}

func ( *WAL) ( uint64) error {
	 := func( *state) (func(), func() error, error) {
		// Mark current tail as sealed in segments
		 := .getTailInfo()
		if  == nil {
			// Can't happen
			return nil, nil, fmt.Errorf("no tail found during rotate")
		}

		// Note that tail is a copy since it's a value type. Even though this is a
		// pointer here it's pointing to a copy on the heap that was made in
		// getTailInfo above, so we can mutate it safely and update the immutable
		// state with our version.
		.SealTime = time.Now()
		.MaxIndex = .tail.LastIndex()
		.IndexStart = 
		.metrics.LastSegmentAgeSeconds.Set(.SealTime.Sub(.CreateTime).Seconds())

		// Update the old tail with the seal time etc.
		.segments = .segments.Set(.BaseIndex, *)

		,  := .createNextSegment()
		return nil, , 
	}
	.metrics.SegmentRotations.Inc()
	return .mutateStateLocked()
}

// createNextSegment is passes a mutable copy of the new state ready to have a
// new segment appended. newState must be a copy, taken under write lock which
// is still held by the caller and its segments map must contain all non-tail
// segments that should be in the log, all must be sealed at this point. The new
// segment's baseIndex will be the current last-segment's MaxIndex + 1 (or 1 if
// no current tail segment). The func returned is to be executed post
// transaction commit to create the actual segment file.
func ( *WAL) ( *state) (func() error, error) {
	// Find existing sealed tail
	 := .getTailInfo()

	// If there is no tail, next baseIndex is 1 (or the requested next base index)
	 := uint64(1)
	if  != nil {
		 = .MaxIndex + 1
	} else if .nextBaseIndex > 0 {
		 = .nextBaseIndex
	}

	// Create a new segment
	 := .newSegment(.nextSegmentID, )
	.nextSegmentID++
	 := segmentState{
		SegmentInfo: ,
	}
	.segments = .segments.Set(.BaseIndex, )

	// We're ready to commit now! Return a postCommit that will actually create
	// the segment file once meta is persisted. We don't do it in parallel because
	// we don't want to persist a file with an ID before that ID is durably stored
	// in case the metaDB write doesn't happen.
	 := func() error {
		// Now create the new segment for writing.
		,  := .sf.Create()
		if  != nil {
			return 
		}
		.tail = 

		// Also cache the reader/log getter which is also the writer. We don't bother
		// reopening read only since we assume we have exclusive access anyway and
		// only use this read-only interface once the segment is sealed.
		.r = .tail

		// We need to re-insert it since newTail is a copy not a reference
		.segments = .segments.Set(.BaseIndex, )
		return nil
	}
	return , nil
}

// resetEmptyFirstSegmentBaseIndex is used to change the baseIndex of the tail
// segment file if its empty. This is needed when the first log written has a
// different index to the base index that was assumed when the tail was created
// (e.g. on startup). It will return an error if the log is not currently empty.
func ( *WAL) ( uint64) error {
	 := stateTxn(func( *state) (func(), func() error, error) {
		if .lastIndex() > 0 {
			return nil, nil, fmt.Errorf("can't reset BaseIndex on segment, log is not empty")
		}

		 := func() {}

		 := .getTailInfo()
		if  != nil {
			// There is an existing tail. Check if it needs to be replaced
			if .BaseIndex ==  {
				// It's fine as it is, no-op
				return nil, nil, nil
			}
			// It needs to be removed
			.segments = .segments.Delete(.BaseIndex)
			.tail = nil
			 = func() {
				.closeSegments([]io.Closer{.r})
				.deleteSegments(map[uint64]uint64{.ID: .BaseIndex})
			}
		}

		// Ensure the newly created tail has the right base index
		.nextBaseIndex = 

		// Create the new segment
		,  := .createNextSegment()
		if  != nil {
			return nil, nil, 
		}

		return , , nil
	})

	return .mutateStateLocked()
}

func ( *WAL) ( uint64) error {
	 := stateTxn(func( *state) (func(), func() error, error) {
		 := .lastIndex()

		// Iterate the segments to find any that are entirely deleted.
		 := make(map[uint64]uint64)
		 := make([]io.Closer, 0, 1)
		 := .segments.Iterator()
		var  *segmentState
		 := uint64(0)
		for !.Done() {
			, ,  := .Next()

			 := .MaxIndex
			// If the segment is the tail (unsealed) or a sealed segment that contains
			// this new min then we've found the new head.
			if .SealTime.IsZero() {
				 = .lastIndex()
				// This is the tail, check if it actually has any content to keep
				if  >=  {
					 = &
					break
				}
			} else if .MaxIndex >=  {
				 = &
				break
			}

			[.ID] = .BaseIndex
			 = append(, .r)
			.segments = .segments.Delete(.BaseIndex)
			 += ( - .MinIndex + 1) // +1 because MaxIndex is inclusive
		}

		// There may not be any segments (left) but if there are, update the new
		// head's MinIndex.
		var  func() error
		if  != nil {
			// new
			 += ( - .MinIndex)
			.MinIndex = 
			.segments = .segments.Set(.BaseIndex, *)
		} else {
			// If there is no head any more, then there is no tail either! We should
			// create a new blank one ready for use when we next append like we do
			// during initialization. As an optimization, we create it with a
			// BaseIndex of the old MaxIndex + 1 since this is what our Raft library
			// uses as the next log index after a restore so this avoids recreating
			// the files a second time on the next append.
			.nextBaseIndex =  + 1
			,  := .createNextSegment()
			if  != nil {
				return nil, nil, 
			}
			 = 
		}
		.metrics.EntriesTruncated.WithLabelValues("front").Add(float64())

		// Return a finalizer that will be called when all readers are done with the
		// segments in the current state to close and delete old segments.
		 := func() {
			.closeSegments()
			.deleteSegments()
		}
		return , , nil
	})

	return .mutateStateLocked()
}

func ( *WAL) ( uint64) error {
	 := stateTxn(func( *state) (func(), func() error, error) {
		// Reverse iterate the segments to find any that are entirely deleted.
		 := make(map[uint64]uint64)
		 := make([]io.Closer, 0, 1)
		 := .segments.Iterator()
		.Last()

		 := uint64(0)
		for !.Done() {
			, ,  := .Prev()

			if .BaseIndex <=  {
				// We're done
				break
			}

			 := .MaxIndex
			if .SealTime.IsZero() {
				 = .lastIndex()
			}

			[.ID] = .BaseIndex
			 = append(, .r)
			.segments = .segments.Delete(.BaseIndex)
			 += ( - .MinIndex + 1) // +1 becuase MaxIndex is inclusive
		}

		 := .getTailInfo()
		if  != nil {
			 := .MaxIndex

			// Check that the tail is sealed (it won't be if we didn't need to remove
			// the actual partial tail above).
			if .SealTime.IsZero() {
				.SealTime = time.Now()
				 = .lastIndex()
			}
			// Update the MaxIndex

			 += ( - )
			.MaxIndex = 

			// And update the tail in the new state
			.segments = .segments.Set(.BaseIndex, *)
		}

		// Create the new tail segment
		,  := .createNextSegment()
		if  != nil {
			return nil, nil, 
		}
		.metrics.EntriesTruncated.WithLabelValues("back").Add(float64())

		// Return a finalizer that will be called when all readers are done with the
		// segments in the current state to close and delete old segments.
		 := func() {
			.closeSegments()
			.deleteSegments()
		}
		return , , nil
	})

	return .mutateStateLocked()
}

func ( *WAL) ( map[uint64]uint64) {
	for ,  := range  {
		if  := .sf.Delete(, );  != nil {
			// This is not fatal. We can continue just old files might need manual
			// cleanup somehow.
			level.Error(.logger).Log("msg", "failed to delete old segment", "baseIndex", , "id", , "err", )
		}
	}
}

func ( *WAL) ( []io.Closer) {
	for ,  := range  {
		if  != nil {
			if  := .Close();  != nil {
				// Shouldn't happen!
				level.Error(.logger).Log("msg", "error closing old segment file", "err", )
			}
		}
	}
}

func ( *WAL) () error {
	 := atomic.LoadUint32(&.closed)
	if  != 0 {
		return ErrClosed
	}
	return nil
}

// Close closes all open files related to the WAL. The WAL is in an invalid
// state and should not be used again after this is called. It is safe (though a
// no-op) to call it multiple times and concurrent reads and writes will either
// complete safely or get ErrClosed returned depending on sequencing. Generally
// reads and writes should be stopped before calling this to avoid propagating
// errors to users during shutdown but it's safe from a data-race perspective.
func ( *WAL) () error {
	if  := atomic.SwapUint32(&.closed, 1);  != 0 {
		// Only close once
		return nil
	}

	// Wait for writes
	.writeMu.Lock()
	defer .writeMu.Unlock()

	// It doesn't matter if there is a rotation scheduled because runRotate will
	// exist when it sees we are closed anyway.
	.awaitRotate = nil
	// Awake and terminate the runRotate
	close(.triggerRotate)

	// Replace state with nil state
	 := .loadState()
	.acquire()
	defer .release()

	.s.Store(&state{})

	// Old state might be still in use by readers, attach closers to all open
	// segment files.
	 := make([]io.Closer, 0, .segments.Len())
	 := .segments.Iterator()
	for !.Done() {
		, ,  := .Next()
		if .r != nil {
			 = append(, .r)
		}
	}
	// Store finalizer to run once all readers are done. There can't be an
	// existing finalizer since this was the active state read under a write
	// lock and finalizers are only set on states that have been replaced under
	// that same lock.
	.finalizer.Store(func() {
		.closeSegments()
	})

	return .metaDB.Close()
}