// Copyright (c) HashiCorp, Inc
// SPDX-License-Identifier: MPL-2.0

package segment

import (
	
	
	
	

	
)

// Writer allows appending logs to a segment file as well as reading them back.
// Appends go through the single serial write path (the writer sub-struct);
// Close and GetLog are delegated to r.
type Writer struct {
	// commitIdx is updated after an append batch is fully persisted to disk to
	// allow readers to read the new value. Note that readers must not read
	// entries with an index larger than this even if they are already present
	// in offsets, as they are not yet committed to disk!
	//
	// It is read and written with 64-bit sync/atomic operations; being the first
	// field of the struct keeps it 64-bit aligned on 32-bit platforms.
	commitIdx uint64

	// offsets is the in-memory index: element i holds the file offset of the
	// entry frame for log index BaseIndex+i, so the first element corresponds to
	// the BaseIndex. It is accessed concurrently by readers and the single writer
	// without locks! This is race-free via the following invariants:
	//  - the slice stored here is never mutated in place, only replaced by a
	//    copy of the slice header, though the copy may still refer to the same
	//    backing array.
	//  - readers only ever read up to len(offsets) in the atomically accessed
	//    slice. Those elements of the backing array are immutable and will never
	//    be modified once they are accessible to readers.
	//  - readers and writers synchronize on atomic access to the slice.
	//  - the serial writer will only append to the end, which either mutates the
	//    shared backing array but at an index greater than the len any reader has
	//    seen, or allocates a new backing array and copies the old one into it,
	//    which also will never mutate the entries readers can already "see" via
	//    the old slice.
	offsets atomic.Value // []uint32

	// writer state is accessed only on the (serial) write path so doesn't need
	// synchronization.
	writer struct {
		// commitBuf stores the pending frames waiting to be flushed to the current
		// tail block.
		commitBuf []byte

		// crc is the rolling crc32 Castagnoli sum of all data written since the
		// last fsync. It is reset to zero after each commit frame is synced.
		crc uint32

		// writeOffset is the absolute file offset up to which we've written data to
		// the file. The contents of commitBuf will be written at this offset when
		// it commits or we reach the end of the block, whichever happens first.
		writeOffset uint32

		// indexStart is set when the tail is sealed indicating the file offset at
		// which the index array was written (the array itself, after the frame
		// header). Zero means the segment has not been sealed.
		indexStart uint64
	}

	// info is the segment metadata; BaseIndex, MinIndex and SizeLimit are read
	// from it by the methods in this file.
	info types.SegmentInfo
	// wf is the underlying file, used here via WriteAt, ReadAt and Sync.
	wf types.WritableFile
	// r is the reader that Close and GetLog delegate to.
	r types.SegmentReader
}

// createFile builds a Writer for a brand new segment. It opens a reader via
// openReader, wires the two together (the `.tail` assignment; presumably the
// reader's tail is pointed at the new Writer — confirm), and then calls
// initEmpty to prepare the writer state. Any error from openReader or
// initEmpty is returned with a nil Writer.
func createFile( types.SegmentInfo,  types.WritableFile) (*Writer, error) {
	,  := openReader(, )
	if  != nil {
		return nil, 
	}
	 := &Writer{
		info: ,
		wf:   ,
		r:    ,
	}
	.tail = 
	if  := .initEmpty();  != nil {
		return nil, 
	}
	return , nil
}

// recoverFile builds a Writer over an existing segment file. It mirrors
// createFile (open a reader via openReader, construct the Writer, assign
// `.tail`) but calls recoverTail instead of initEmpty so the writer state is
// rebuilt from what is already in the file. Any error from openReader or
// recoverTail is returned with a nil Writer.
func recoverFile( types.SegmentInfo,  types.WritableFile) (*Writer, error) {
	,  := openReader(, )
	if  != nil {
		return nil, 
	}
	 := &Writer{
		info: ,
		wf:   ,
		r:    ,
	}
	.tail = 

	if  := .recoverTail();  != nil {
		return nil, 
	}

	return , nil
}

// Resets the writer to the state of an empty segment: the write offset is set
// to zero, the file header is encoded into commitBuf (so it goes to disk with
// the first commit rather than immediately), the rolling crc is seeded with the
// checksum of that header, and an empty offsets index is published.
//
// Returns the error from writeFileHeader, if any.
//
// NOTE(review): the method name is missing from this listing; call sites in
// this file suggest it is initEmpty — confirm.
func ( *Writer) () error {
	// Write header into write buffer to be written out with the first commit.
	.writer.writeOffset = 0
	.ensureBufCap(fileHeaderLen)
	.writer.commitBuf = .writer.commitBuf[:fileHeaderLen]

	if  := writeFileHeader(.writer.commitBuf, .info);  != nil {
		return 
	}

	// The header is part of the first commit batch so it must be covered by the
	// batch CRC.
	.writer.crc = crc32.Checksum(.writer.commitBuf[:fileHeaderLen], castagnoliTable)

	// Initialize the index with an empty slice (capacity pre-sized for 32Ki
	// entries).
	 := make([]uint32, 0, 32*1024)
	.offsets.Store()
	return nil
}

// Rebuilds writer state from an existing segment file by scanning it with
// readThroughSegment. While scanning it records the offset of every entry
// frame, remembers the index start if an index frame is seen, and keeps
// details of the last two commit frames. It then decides where appending
// should resume:
//
//   - no commit frame found: the file is treated as empty (initEmpty).
//   - entry frames found after the last commit frame: they are uncommitted and
//     dropped from the index; the last commit is trusted without a CRC check.
//   - the last frame is a commit frame: the bytes covered by that commit are
//     re-read and checksummed. On a CRC match the state is kept; on a mismatch
//     the writer rewinds to the previous commit, or to an empty file if there
//     was none.
//
// In every path where at least one commit is kept the file header read from
// disk is validated against the expected info. On exit commitIdx is set from
// the number of entries in the published offsets.
//
// NOTE(review): the method name is missing from this listing; call sites in
// this file suggest it is recoverTail — confirm.
func ( *Writer) () error {
	// We need to track the last two commit frames
	type  struct {
		         frameHeader
		     int64
		   int64
		 int
	}
	var ,  *

	 := make([]uint32, 0, 32*1024)

	,  := readThroughSegment(.wf, func( types.SegmentInfo,  frameHeader,  int64) (bool, error) {
		switch .typ {
		case FrameEntry:
			// Record the frame offset
			 = append(, uint32())

		case FrameIndex:
			// So this segment was sealed! (or attempted) keep track of this
			// indexStart in case it turns out the Seal actually committed completely.
			// We store the start of the actual array not the frame header.
			.writer.indexStart = uint64() + frameHeaderLen

		case FrameCommit:
			// The payload is not the length field in this case!
			 = 
			 = &{
				:         ,
				:     ,
				:   0,            // First commit includes the file header
				: len(), // Track how many entries were found up to this commit point.
			}
			if  != nil {
				. = . + frameHeaderLen
			}
		}
		return true, nil
	})
	if  != nil {
		return 
	}

	if  == nil {
		// There were no commit frames found at all. This segment file is
		// effectively empty. Init it that way ready for appending. This overwrites
		// the file header so it doesn't matter if it was valid or not.
		return .initEmpty()
	}

	// Assume that the final commit is good for now and set the writer state
	.writer.writeOffset = uint32(. + frameHeaderLen)

	// Just store what we have for now to ensure the deferred function below
	// doesn't panic; we'll probably update this below.
	.offsets.Store()

	// Whichever path we take, fix up the commitIdx before we leave
	defer func() {
		 := .getOffsets()
		if len() > 0 {
			// Non atomic is OK because this file is not visible to any other threads
			// yet.
			.commitIdx = .info.BaseIndex + uint64(len()) - 1
		}
	}()

	if . < len() {
		// Some entries were found after the last commit. Those must be a partial
		// write that was uncommitted so can be ignored. But the fact they were
		// written at all means that the last commit frame must have been completed
		// and acknowledged so we don't need to verify anything. Just truncate the
		// extra entries from index and reset the write cursor to continue appending
		// after the last commit.
		 = [:.]
		.offsets.Store()

		// Since at least one commit was found, the header better be valid!
		return validateFileHeader(*, .info)
	}

	// Last frame was a commit frame! Let's check that all the data written in
	// that commit frame made it to disk.
	// Verify the length first
	 := . - .
	// We know bufLen can't be bigger than the whole segment file because none of
	// the values above were read from the data just from the offsets we moved
	// through.
	 := make([]byte, )

	if ,  := .wf.ReadAt(, .);  != nil {
		return fmt.Errorf("failed to read last committed batch for CRC validation: %w", )
	}

	 := crc32.Checksum(, castagnoliTable)
	if  == ..crc {
		// All is good. We already setup the state we need for writer other than
		// offsets.
		.offsets.Store()

		// Since at least one commit was found, the header better be valid!
		return validateFileHeader(*, .info)
	}

	// Last commit was incomplete: rewind back to the previous one or to the
	// start of the file.
	if  == nil {
		// Init will re-write the file header so it doesn't matter if it was corrupt
		// or not!
		return .initEmpty()
	}

	.writer.writeOffset = uint32(. + frameHeaderLen)
	 = [:.]
	.offsets.Store()

	// Since at least one commit was found, the header better be valid!
	return validateFileHeader(*, .info)
}

// Close implements io.Closer. It delegates to the Close method of the
// Writer's reader (r) and returns its error unchanged.
func ( *Writer) () error {
	return .r.Close()
}

// GetLog implements types.SegmentReader. It forwards both arguments to the
// GetLog method of the Writer's reader (r) and returns its error unchanged.
func ( *Writer) ( uint64,  *types.LogEntry) error {
	return .r.GetLog(, )
}

// Append adds one or more entries. It must not return until the entries are
// durably stored otherwise raft's guarantees will be compromised.
//
// An empty batch is a no-op. If the segment has already been sealed
// (indexStart is non-zero) types.ErrSealed is returned. Otherwise each entry is
// appended in order via appendEntry; if the data written so far plus the
// buffered data plus the size of an index frame would exceed info.SizeLimit,
// an index frame is appended too (sealing the segment); then a commit frame is
// appended via appendCommit. Only after that succeeds is commitIdx advanced to
// the Index of the last entry in the batch.
//
// The first error from appendEntry, appendIndex or appendCommit is returned
// as-is and commitIdx is left untouched.
func ( *Writer) ( []types.LogEntry) error {
	if len() < 1 {
		return nil
	}

	if .writer.indexStart > 0 {
		return types.ErrSealed
	}

	// Iterate entries and append each one
	for ,  := range  {
		if  := .appendEntry();  != nil {
			return 
		}
	}

	 := .getOffsets()
	// Work out if we need to seal before we commit and sync.
	if (.writer.writeOffset + uint32(len(.writer.commitBuf)+indexFrameSize(len()))) > .info.SizeLimit {
		// Seal the segment! We seal it by writing an index frame before we commit.
		if  := .appendIndex();  != nil {
			return 
		}
	}

	// Write the commit frame
	if  := .appendCommit();  != nil {
		return 
	}

	// Commit in-memory
	atomic.StoreUint64(&.commitIdx, [len()-1].Index)
	return nil
}

// Returns the currently published offsets slice by atomically loading it from
// the offsets atomic.Value. The type assertion panics if nothing has been
// stored yet, so a []uint32 must have been stored before the first call.
//
// NOTE(review): the method name is missing from this listing; call sites in
// this file suggest it is getOffsets — confirm.
func ( *Writer) () []uint32 {
	return .offsets.Load().([]uint32)
}

// OffsetForFrame implements tailWriter and allows readers to lookup entry
// frames in the tail's in-memory index.
//
// It returns types.ErrNotFound when the requested index is below
// info.BaseIndex, below info.MinIndex, or above LastIndex(). Otherwise it
// returns the element of the offsets slice at position (index - BaseIndex).
func ( *Writer) ( uint64) (uint32, error) {
	if  < .info.BaseIndex ||  < .info.MinIndex ||  > .LastIndex() {
		return 0, types.ErrNotFound
	}
	 := .getOffsets()
	 :=  - .info.BaseIndex
	// No bounds check on entryIndex since LastIndex must ensure it's in bounds.
	return [], nil
}

// Encodes a single log entry as a FrameEntry frame into commitBuf and
// publishes its file offset in the offsets index. Nothing is written to the
// file here; that happens when the batch is committed.
//
// The entry's Index must be exactly info.BaseIndex + len(offsets), i.e. the
// next index in sequence; otherwise an error is returned and nothing is
// buffered. Errors from appendFrame are returned unchanged.
//
// NOTE(review): the method name is missing from this listing; call sites in
// this file suggest it is appendEntry — confirm.
func ( *Writer) ( types.LogEntry) error {
	 := .getOffsets()

	// Check the invariant that this entry is the next one we expect otherwise our
	// index logic is incorrect and will result in panics on read.
	if .Index != .info.BaseIndex+uint64(len()) {
		return fmt.Errorf("non-monotonic append to segment with BaseIndex=%d. Entry index %d, expected %d",
			.info.BaseIndex, .Index, .info.BaseIndex+uint64(len()))
	}

	 := frameHeader{
		typ: FrameEntry,
		len: uint32(len(.Data)),
	}
	,  := .appendFrame(, .Data)
	if  != nil {
		return 
	}
	// Update the offsets index.
	//
	// Add the index entry. Note this is safe despite possibly mutating the same
	// backing array as the slice readers currently hold, because the new element
	// is beyond the length any current reader will access until we do the
	// atomic update below. Even if append re-allocates the backing array, it
	// will only read the existing elements from the old array to copy them into
	// the new one and we are not mutating those memory locations. Old readers
	// might still be looking at the old array through the previously published
	// slice but we never touch elements below its length.
	//
	// The stored value is the absolute file offset of the frame: the offset at
	// which commitBuf will be written plus the frame's position inside it.
	 = append(, .writer.writeOffset+uint32())

	// Now we can make it available to readers. Note that readers still
	// shouldn't read it until we actually commit to disk (and increment
	// commitIdx) but it's race free for them to now!
	.offsets.Store()
	return nil
}

// Terminates the current batch: it appends a FrameCommit frame whose header
// carries the rolling crc accumulated so far, then calls sync to push the
// buffered data to the file and fsync it, and finally resets the rolling crc
// to zero so the next batch starts a fresh checksum.
//
// Errors from appendFrame or sync are returned unchanged; in that case the crc
// is not reset.
//
// NOTE(review): the method name is missing from this listing; call sites in
// this file suggest it is appendCommit — confirm.
func ( *Writer) () error {
	 := frameHeader{
		typ: FrameCommit,
		crc: .writer.crc,
	}
	if ,  := .appendFrame(, nil);  != nil {
		return 
	}

	// Flush all writes to the file
	if  := .sync();  != nil {
		return 
	}

	// Finally, reset crc so that by the time we write the next trailer
	// we'll know where the append batch started.
	.writer.crc = 0
	return nil
}

// Makes sure commitBuf has capacity for its current contents plus the given
// number of extra bytes. If the capacity is already sufficient nothing
// happens. Otherwise a new buffer is allocated whose size starts at minBufSize
// and is doubled until it is large enough, the existing contents are copied
// over, and commitBuf is replaced by the new buffer resliced to the old
// length. The length of commitBuf is never changed by this call.
//
// NOTE(review): the method name is missing from this listing; call sites in
// this file suggest it is ensureBufCap — confirm.
func ( *Writer) ( int) {
	 := len(.writer.commitBuf) + 
	if cap(.writer.commitBuf) <  {
		 := minBufSize
		// Double buffer size until it's big enough to amortize cost
		for  <  {
			 =  * 2
		}
		 := make([]byte, )
		 := len(.writer.commitBuf)
		copy(, .writer.commitBuf)
		.writer.commitBuf = [:]
	}
}

// Seals the segment in memory by encoding an index frame, built from the
// currently published offsets, onto the end of commitBuf. The rolling crc is
// extended over the newly written bytes and indexStart is set to the absolute
// file offset of the index array itself (just past the index frame's header).
// Nothing reaches the file until the following commit.
//
// Returns the error from writeIndexFrame, if any; in that case commitBuf's
// length, the crc and indexStart are left unchanged.
//
// NOTE(review): the method name is missing from this listing; call sites in
// this file suggest it is appendIndex — confirm.
func ( *Writer) () error {
	// Append the index record before we commit (commit and flush happen later
	// generally)
	 := .getOffsets()
	 := indexFrameSize(len())
	.ensureBufCap()

	 := len(.writer.commitBuf)

	if  := writeIndexFrame(.writer.commitBuf[:+], );  != nil {
		return 
	}
	.writer.commitBuf = .writer.commitBuf[:+]

	// Update crc with those values
	.writer.crc = crc32.Update(.writer.crc, castagnoliTable, .writer.commitBuf[:+])

	// Record the file offset where the index starts (the actual index data so
	// after the frame header).
	.writer.indexStart = uint64(.writer.writeOffset) + uint64(+frameHeaderLen)
	return nil
}

// appendFrame appends the given frame (header plus optional payload) to
// commitBuf, growing the buffer first via ensureBufCap so that the encoded
// frame fits. The rolling crc is then extended over the bytes just written.
//
// It returns the position within commitBuf at which the frame starts (the
// buffer's length before the call). If writeFrame fails its error is returned
// with a zero offset, and commitBuf's length and the crc are left unchanged.
func ( *Writer) ( frameHeader,  []byte) (int, error) {
	// Encode frame header into current block buffer
	 := encodedFrameSize(len())
	.ensureBufCap()

	 := len(.writer.commitBuf)
	if  := writeFrame(.writer.commitBuf[:+], , );  != nil {
		return 0, 
	}
	// Update len of commitBuf since we resliced it for the write
	.writer.commitBuf = .writer.commitBuf[:+]

	// Update the CRC
	.writer.crc = crc32.Update(.writer.crc, castagnoliTable, .writer.commitBuf[:+])
	return , nil
}

// Writes the whole of commitBuf to the file at writeOffset using WriteAt. On
// success writeOffset is advanced by the number of buffered bytes and
// commitBuf is truncated to zero length (its capacity is kept for reuse). This
// does not fsync.
//
// An io.EOF from WriteAt is ignored when every byte was written; any other
// error is returned and the writer state is left unchanged.
//
// NOTE(review): the method name is missing from this listing; call sites in
// this file suggest it is flush — confirm.
func ( *Writer) () error {
	// Write to file
	,  := .wf.WriteAt(.writer.commitBuf, int64(.writer.writeOffset))
	if  == io.EOF &&  == len(.writer.commitBuf) {
		// Writer may return EOF even if it wrote all bytes if it wrote right up to
		// the end of the file. Ignore that case though.
		 = nil
	}
	if  != nil {
		return 
	}

	// Reset writer state ready for next writes
	.writer.writeOffset += uint32(len(.writer.commitBuf))
	.writer.commitBuf = .writer.commitBuf[:0]
	return nil
}

// Makes everything buffered so far durable: it flushes commitBuf to the file,
// calls Sync on the file, and then atomically publishes the new commitIdx. The
// published value is info.BaseIndex + len(offsets) - 1, or zero when the
// offsets index is empty.
//
// Errors from flush or the file's Sync are returned unchanged and commitIdx is
// not updated in that case.
//
// NOTE(review): the method name is missing from this listing; call sites in
// this file suggest it is sync — confirm.
func ( *Writer) () error {
	// Write out current buffer to file
	if  := .flush();  != nil {
		return 
	}

	// Sync file
	if  := .wf.Sync();  != nil {
		return 
	}

	// Update commitIdx atomically
	 := .getOffsets()
	 := uint64(0)
	if len() > 0 {
		// It is probably not possible for there to be no offsets here, but guard
		// against it just in case we ever flush the file with only meta data
		// written...
		 = uint64(.info.BaseIndex) + uint64(len()) - 1
	}
	atomic.StoreUint64(&.commitIdx, )
	return nil
}

// Sealed returns whether the segment is sealed or not. If it is, it returns
// true and the file offset that its index array starts at, to be saved in
// meta data. WAL will call this after every append so it should be relatively
// cheap in the common case. This design allows the final Append to write out
// the index or any additional data needed at seal time in the same fsync.
//
// A zero indexStart means "not sealed". The error result is always nil in this
// implementation.
func ( *Writer) () (bool, uint64, error) {
	if .writer.indexStart == 0 {
		return false, 0, nil
	}
	return true, .writer.indexStart, nil
}

// LastIndex returns the most recently persisted index in the log. It must
// respond without blocking on append since it's needed frequently by read
// paths that may call it concurrently, so it is a single atomic load of
// commitIdx. If the segment is empty LastIndex returns zero.
func ( *Writer) () uint64 {
	return atomic.LoadUint64(&.commitIdx)
}

// readThroughSegment scans a segment file frame by frame, invoking the given
// callback once per decodable frame header.
//
// It first reads and parses the file header at offset 0. A missing, short or
// corrupt header (io.EOF from the read, or types.ErrCorrupt from
// readFileHeader) is tolerated at this stage and a zero SegmentInfo is
// substituted if none was parsed; the caller decides later whether the header
// needed to be valid. Any other read or parse error is returned with a nil
// info.
//
// It then walks frames starting at fileHeaderLen. The walk stops, without
// error, when fewer than frameHeaderLen bytes remain, when a frame header
// cannot be decoded (treated as a torn write), when a FrameInvalid header is
// found (zeros at the end of the file), or when the callback returns false.
// The callback is passed three arguments per frame (presumably the parsed
// info, the frame header and the frame's file offset — the argument names are
// missing from this listing). A callback error or an unexpected read error
// stops the walk and is returned together with the info.
func readThroughSegment( types.ReadableFile,  func( types.SegmentInfo,  frameHeader,  int64) (bool, error)) (*types.SegmentInfo, error) {
	// First read the file header. Note we wrote it as part of the first commit so
	// it may be missing or partially written and that's OK as long as there are
	// no other later commit frames!
	var  [fileHeaderLen]byte
	,  := .ReadAt([:], 0)
	// EOF is ok - the file might be empty if we crashed before committing
	// anything and preallocation isn't supported.
	if  != io.EOF &&  != nil {
		return nil, 
	}

	,  := readFileHeader([:])
	if  == types.ErrCorrupt {
		// Header is malformed or missing, don't error yet though we'll detect it
		// later when we know if it's a problem or not.
		 = nil
	}
	if  != nil {
		return nil, 
	}
	// Even if the header wasn't detected as corrupt, it might still be corrupt
	// in a way that happens to parse, since we've not verified it against the
	// expected metadata yet. We'll wait to see if the header was part of the
	// last commit before we decide if we should validate it for corruption or
	// not though. For now just make sure it's not nil so we don't have to handle
	// nil checks everywhere.
	if  == nil {
		// Zero info will fail validation against the actual metadata if it was
		// corrupt when it shouldn't be later. Just prevents a nil panic.
		 = &types.SegmentInfo{}
	}

	// Read through file from after header until we hit zeros, EOF or corrupt
	// frames.
	 := int64(fileHeaderLen)
	var  [frameHeaderLen]byte

	for {
		,  := .ReadAt([:], )
		if  == io.EOF {
			if  < frameHeaderLen {
				return , nil
			}
			// This is OK! The last frame in file might be a commit frame so as long
			// as we have it all then we can ignore the EOF for this iteration.
			 = nil
		}
		if  != nil {
			return , fmt.Errorf("failed reading frame at offset=%d: %w", , )
		}
		,  := readFrameHeader([:frameHeaderLen])
		if  != nil {
			// This is not actually an error case. If we failed to decode it could be
			// because of a torn write (since we don't assume writes are atomic). We
			// assume that previously committed data is not silently corrupted by the
			// FS (see README for details). So this must be due to corruption that
			// happened due to non-atomic sector updates whilst committing the last
			// write batch.
			return , nil
		}
		if .typ == FrameInvalid {
			// This means we've hit zeros at the end of the file (or due to an
			// incomplete write, which we treat the same way).
			return , nil
		}

		// Call the callback
		,  := (*, , )
		if  != nil {
			return , 
		}
		if ! {
			return , nil
		}

		// Skip to next frame; the step is the encoded size of a frame whose
		// payload length is the header's len field.
		 += int64(encodedFrameSize(int(.len)))
	}
}