// Copyright (c) HashiCorp, Inc
// SPDX-License-Identifier: MPL-2.0

package segment

import (
	
	
	
	

	
)

const (
	// segmentFileSuffix is the extension shared by all segment files. Directory
	// entries without this suffix are skipped when listing segments.
	segmentFileSuffix      = ".wal"
	// segmentFileNamePattern is the format used both to build and to parse
	// segment file names: a 20-digit zero-padded decimal BaseIndex, a dash, a
	// 16-digit zero-padded hex ID, then the suffix.
	segmentFileNamePattern = "%020d-%016x" + segmentFileSuffix
)

// Filer implements the abstraction for managing a set of segment files in a
// single directory. It uses a VFS to abstract the actual file system
// operations, which makes it easier to test.
type Filer struct {
	// dir is the directory passed to every VFS call made by this Filer.
	dir string
	// vfs performs the underlying file operations (create, open, list, delete).
	vfs types.VFS
}

// NewFiler creates a Filer ready for use. It only populates the Filer's dir
// and vfs fields; no file system access happens here.
func ( string,  types.VFS) *Filer {
	return &Filer{
		dir: ,
		vfs: ,
	}
}

// FileName returns the formatted file name expected for this segment, built
// from the segment's BaseIndex and ID using segmentFileNamePattern.
// SegmentFiler implementations could choose to ignore this, but it is here so
// that the methods in this file that create and open segments all derive the
// name in the same way.
func ( types.SegmentInfo) string {
	return fmt.Sprintf(segmentFileNamePattern, .BaseIndex, .ID)
}

// Create adds a new segment with the given info and returns a writer or an
// error. A BaseIndex of zero is rejected before any file is created. Errors
// from the VFS are returned unchanged.
func ( *Filer) ( types.SegmentInfo) (types.SegmentWriter, error) {
	// Reject a zero BaseIndex up front, before touching the file system.
	if .BaseIndex == 0 {
		return nil, fmt.Errorf("BaseIndex must be greater than zero")
	}
	 := FileName()

	// SizeLimit is handed to the VFS as the third argument (presumably the size
	// to allocate for the new file — confirm against types.VFS).
	,  := .vfs.Create(.dir, , uint64(.SizeLimit))
	if  != nil {
		return nil, 
	}

	// createFile builds the writer (or error) that is returned to the caller.
	return createFile(, )
}

// RecoverTail is called on an unsealed segment when re-opening the WAL. It
// attempts to recover from a possible crash and will either return an error or
// a valid segment writer that is ready for further appends. If the expected
// tail segment doesn't exist it must return an error wrapping os.ErrNotExist.
func ( *Filer) ( types.SegmentInfo) (types.SegmentWriter, error) {
	 := FileName()

	// Open the existing file for writing. The VFS error is returned unchanged.
	// NOTE(review): the os.ErrNotExist contract above therefore depends on
	// OpenWriter wrapping that error — confirm against the VFS implementation.
	,  := .vfs.OpenWriter(.dir, )
	if  != nil {
		return nil, 
	}

	// recoverFile builds the writer (or error) that is returned to the caller.
	return recoverFile(, )
}

// Open opens an already sealed segment for reading. It reads the file header
// from the start of the file and validates it, returning an error if the
// header cannot be read, cannot be parsed, or fails validation. Hitting io.EOF
// while reading the header is reported as an error wrapping types.ErrCorrupt.
func ( *Filer) ( types.SegmentInfo) (types.SegmentReader, error) {
	 := FileName()

	,  := .vfs.OpenReader(.dir, )
	if  != nil {
		return nil, 
	}
	// NOTE(review): none of the error returns below close the reader opened
	// above — confirm whether it must be closed to avoid leaking a file handle.

	// Validate header here since openReader is re-used by writer where it's valid
	// for the file header not to be committed yet after a crash so we can't check
	// it there.
	var  [fileHeaderLen]byte

	// Read exactly fileHeaderLen bytes from offset 0.
	if ,  := .ReadAt([:], 0);  != nil {
		if errors.Is(, io.EOF) {
			// Treat failure to read a header as corruption since a sealed file must
			// always have a valid header. (I.e. even if crashes happen it should
			// be impossible to seal a segment with no header written, so this
			// indicates that something truncated the file after the fact.)
			return nil, fmt.Errorf("%w: failed to read header: %s", types.ErrCorrupt, )
		}
		// Any other read error is returned unchanged.
		return nil, 
	}

	// Parse the raw header bytes.
	,  := readFileHeader([:])
	if  != nil {
		return nil, 
	}

	// Check the parsed header; a validation error is returned unchanged.
	if  := validateFileHeader(*, );  != nil {
		return nil, 
	}

	return openReader(, )
}

// List returns the set of segment IDs currently stored. It's used by the WAL
// on recovery to find any segment files that need to be deleted following an
// unclean shutdown. The returned map is a map of ID -> BaseIndex. BaseIndex
// is returned to allow subsequent Delete calls to be made.
func ( *Filer) () (map[uint64]uint64, error) {
	// Delegate to listInternal and drop its slice result, which List's callers
	// do not receive.
	, ,  := .listInternal()
	return , 
}

// listInternal lists the directory and parses every entry that has the segment
// file suffix. It returns a map populated from the two numbers parsed out of
// each file name, plus a slice with one element appended per segment file in
// the order the VFS listed them. Any suffixed name that does not parse as
// exactly two values yields types.ErrCorrupt and no partial results.
//
// NOTE(review): the slice appears to hold segment IDs (it is used to index the
// map when dumping logs), and callers treat its order as ascending — confirm
// that ListDir returns names in sorted order.
func ( *Filer) () (map[uint64]uint64, []uint64, error) {
	,  := .vfs.ListDir(.dir)
	if  != nil {
		return nil, nil, 
	}

	 := make(map[uint64]uint64)
	 := make([]uint64, 0)
	for ,  := range  {
		// Ignore anything in the directory that is not a segment file.
		if !strings.HasSuffix(, segmentFileSuffix) {
			continue
		}
		// Parse BaseIndex and ID from the file name
		var ,  uint64
		,  := fmt.Sscanf(, segmentFileNamePattern, &, &)
		if  != nil {
			// The underlying scan error is discarded; callers only see ErrCorrupt.
			return nil, nil, types.ErrCorrupt
		}
		if  != 2 {
			// Misnamed segment files with the right suffix indicate a bug or
			// tampering; we can't be sure what's happened to the data.
			return nil, nil, types.ErrCorrupt
		}
		[] = 
		 = append(, )
	}

	return , , nil
}

// Delete removes the segment with given baseIndex and id if it exists. Note
// that baseIndex is technically redundant since ID is unique on its own. But
// in practice we name files (or keys) with both so that they sort correctly.
// This interface allows a simpler implementation where we can just delete
// the file if it exists without having to scan the underlying storage for a
// file with a matching ID. The result of the VFS delete is returned unchanged.
func ( *Filer) ( uint64,  uint64) error {
	 := fmt.Sprintf(segmentFileNamePattern, , )
	return .vfs.Delete(.dir, )
}

// DumpSegment attempts to read the segment file specified by the baseIndex and
// ID. Its intended purpose is for debugging the contents of segment files and,
// unlike the SegmentFiler interface, it doesn't assume the caller has access to
// the correct metadata. This allows dumping log segments in a WAL that is still
// being written to by another process. Without metadata we don't know if the
// file is sealed so always recover by reading through the whole file. If after
// or before are non-zero, they specify an exclusive lower or upper bound on
// which log entries should be emitted. No error checking is done on the read
// data. fn is called for each entry passing the raft info read from the file
// header (so that the caller knows which codec to use for example), the raft
// index of the entry and the raw bytes of the entry itself. The callback must
// return true to continue reading. The data slice is only valid for the
// lifetime of the call.
//
// Entry frames are only queued as they are encountered; they are read and
// passed to fn when a commit frame is seen. Entry frames that are not followed
// by a commit frame are therefore never passed to fn.
func ( *Filer) ( uint64,  uint64, ,  uint64,  func( types.SegmentInfo,  types.LogEntry) (bool, error)) error {
	 := fmt.Sprintf(segmentFileNamePattern, , )

	,  := .vfs.OpenReader(.dir, )
	if  != nil {
		return 
	}
	// NOTE(review): the reader opened above is never closed in this function —
	// confirm whether it must be closed to avoid leaking a file handle.

	// Scratch buffer for entry payloads. It starts at 64KiB and is replaced by a
	// larger one when an entry doesn't fit; it is reused between entries.
	 := make([]byte, 64*1024)
	 := 

	// Bookkeeping for an entry frame that has been seen but not yet emitted.
	type  struct {
		  uint64
		 int64
		    uint32
	}
	var  []

	_,  = readThroughSegment(, func( types.SegmentInfo,  frameHeader,  int64) (bool, error) {
		if .typ == FrameCommit {
			// All the previous entries have been committed. Read them and send up to
			// caller.
			for ,  := range  {
				// Check the header is reasonable
				if . > MaxEntrySize {
					return false, fmt.Errorf("failed to read entry idx=%d, frame header length (%d) is too big: %w",
						., ., )
				}

				// Grow the scratch buffer if this entry doesn't fit.
				if . > uint32(len()) {
					 = make([]byte, .)
				}

				// The payload starts frameHeaderLen bytes after the frame's offset.
				,  := .ReadAt([:.], .+frameHeaderLen)
				if  != nil {
					return false, 
				}
				// A short read without an error is still treated as a failure.
				if uint32() < . {
					return false, io.ErrUnexpectedEOF
				}

				// Hand the entry to the callback; stop if it asks to or if it fails.
				,  := (, types.LogEntry{Index: ., Data: [:]})
				if ! ||  != nil {
					return , 
				}
			}
			// Reset batch
			 = [:0]
			return true, nil
		}

		// Frames that are neither commit nor entry frames are skipped.
		if .typ != FrameEntry {
			return true, nil
		}

		if  <=  {
			// Not in the range we care about, skip reading the entry.
			++
			return true, nil
		}
		if  > 0 &&  >=  {
			// We're done
			return false, nil
		}

		// Queue the entry; it is only read and emitted once a commit frame is seen.
		 = append(, {, , .len})
		++
		return true, nil
	})

	return 
}

// DumpLogs attempts to read all log entries from segment files in the directory
// for debugging purposes. It does _not_ use the metadata and so may output log
// entries that are uncommitted or already truncated as far as the writing
// process is concerned. As such it should not be used for replication of data.
// It is useful though to debug the contents of the log even while the writing
// application is still running. After and before, if non-zero, specify
// exclusive bounds on the logs that should be returned, which may allow the
// implementation to skip reading entire segment files that are not in the
// range.
//
// Segments are visited in the order returned by listInternal and the first
// error from listing or from dumping a segment is returned immediately.
//
// NOTE(review): DumpSegment only returns an error, so this loop cannot tell
// whether the callback asked to stop; confirm whether a callback returning
// false is meant to prevent later segments from being read.
func ( *Filer) (,  uint64,  func( types.SegmentInfo,  types.LogEntry) (bool, error)) error {
	, ,  := .listInternal()
	if  != nil {
		return 
	}

	for ,  := range  {
		 := []
		// Zero means "unknown": there is no following segment to take a bound from.
		 := uint64(0)
		if +1 < len() {
			// This is not the last segment, peek at the base index of that one and
			// assume that this segment won't contain indexes that high.
			 = [[+1]]
		}
		// See if this file contains any indexes in the range
		if  > 0 &&  > 0 &&  >=  {
			// This segment is all indexes before the lower bound we care about
			continue
		}
		if  > 0 &&  <=  {
			// This segment is all indexes higher than the upper bound. We've output
			// every log in the range at this point (barring edge cases where we race
			// with a truncation which leaves multiple generations of segment files on
			// disk which we are going to ignore for now).
			return nil
		}

		// We probably care about at least some of the entries in this segment
		 := .DumpSegment(, , , , )
		if  != nil {
			return 
		}
	}

	return nil
}