/*
 * SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc.
 * SPDX-License-Identifier: Apache-2.0
 */

package y

import (
	
	
	stderrors 
	
	
	
	
	
	
	
	
	
	

	
	
)

var (
	// ErrEOF indicates an end of file when trying to read from a memory mapped file
	// and encountering the end of slice.
	ErrEOF = stderrors.New("ErrEOF: End of file")

	// ErrCommitAfterFinish indicates that write batch commit was called after
	// finish
	ErrCommitAfterFinish = stderrors.New("Batch commit not permitted after finish")
)

type Flags int

const (
	// Sync indicates that O_DSYNC should be set on the underlying file,
	// ensuring that data writes do not return until the data is flushed
	// to disk.
	Sync Flags = 1 << iota
	// ReadOnly opens the underlying file on a read-only basis.
	ReadOnly
)

var (
	// This is O_DSYNC (datasync) on platforms that support it -- see file_unix.go
	datasyncFileFlag = 0x0

	// CastagnoliCrcTable is a CRC32 polynomial table
	CastagnoliCrcTable = crc32.MakeTable(crc32.Castagnoli)
)

// OpenExistingFile opens an existing file, errors if it doesn't exist.
func ( string,  Flags) (*os.File, error) {
	 := os.O_RDWR
	if &ReadOnly != 0 {
		 = os.O_RDONLY
	}

	if &Sync != 0 {
		 |= datasyncFileFlag
	}
	return os.OpenFile(, , 0)
}

// CreateSyncedFile creates a new file (using O_EXCL), errors if it already existed.
func ( string,  bool) (*os.File, error) {
	 := os.O_RDWR | os.O_CREATE | os.O_EXCL
	if  {
		 |= datasyncFileFlag
	}
	return os.OpenFile(, , 0600)
}

// OpenSyncedFile creates the file if one doesn't exist.
func ( string,  bool) (*os.File, error) {
	 := os.O_RDWR | os.O_CREATE
	if  {
		 |= datasyncFileFlag
	}
	return os.OpenFile(, , 0600)
}

// OpenTruncFile opens the file with O_RDWR | O_CREATE | O_TRUNC
func ( string,  bool) (*os.File, error) {
	 := os.O_RDWR | os.O_CREATE | os.O_TRUNC
	if  {
		 |= datasyncFileFlag
	}
	return os.OpenFile(, , 0600)
}

// SafeCopy does append(a[:0], src...).
func (,  []byte) []byte {
	 := append([:0], ...)
	if  == nil {
		return []byte{}
	}
	return 
}

// Copy copies a byte slice and returns the copied slice.
func ( []byte) []byte {
	 := make([]byte, len())
	copy(, )
	return 
}

// KeyWithTs generates a new key by appending ts to key.
func ( []byte,  uint64) []byte {
	 := make([]byte, len()+8)
	copy(, )
	binary.BigEndian.PutUint64([len():], math.MaxUint64-)
	return 
}

// ParseTs parses the timestamp from the key bytes.
func ( []byte) uint64 {
	if len() <= 8 {
		return 0
	}
	return math.MaxUint64 - binary.BigEndian.Uint64([len()-8:])
}

// CompareKeys checks the key without timestamp and checks the timestamp if keyNoTs
// is same.
// a<timestamp> would be sorted higher than aa<timestamp> if we use bytes.compare
// All keys should have timestamp.
func (,  []byte) int {
	if  := bytes.Compare([:len()-8], [:len()-8]);  != 0 {
		return 
	}
	return bytes.Compare([len()-8:], [len()-8:])
}

// ParseKey parses the actual key from the key bytes.
func ( []byte) []byte {
	if len() < 8 {
		return nil
	}

	return [:len()-8]
}

// SameKey checks for key equality ignoring the version timestamp suffix.
func (,  []byte) bool {
	if len() != len() {
		return false
	}
	return bytes.Equal(ParseKey(), ParseKey())
}

// Slice holds a reusable buf, will reallocate if you request a larger size than ever before.
// One problem is with n distinct sizes in random order it'll reallocate log(n) times.
type Slice struct {
	buf []byte
}

// Resize reuses the Slice's buffer (or makes a new one) and returns a slice in that buffer of
// length sz.
func ( *Slice) ( int) []byte {
	if cap(.buf) <  {
		.buf = make([]byte, )
	}
	return .buf[0:]
}

// FixedDuration returns a string representation of the given duration with the
// hours, minutes, and seconds.
func ( time.Duration) string {
	 := fmt.Sprintf("%02ds", int(.Seconds())%60)
	if  >= time.Minute {
		 = fmt.Sprintf("%02dm", int(.Minutes())%60) + 
	}
	if  >= time.Hour {
		 = fmt.Sprintf("%02dh", int(.Hours())) + 
	}
	return 
}

// Throttle allows a limited number of workers to run at a time. It also
// provides a mechanism to check for errors encountered by workers and wait for
// them to finish.
type Throttle struct {
	once      sync.Once
	wg        sync.WaitGroup
	ch        chan struct{}
	errCh     chan error
	finishErr error
}

// NewThrottle creates a new throttle with a max number of workers.
func ( int) *Throttle {
	return &Throttle{
		ch:    make(chan struct{}, ),
		errCh: make(chan error, ),
	}
}

// Do should be called by workers before they start working. It blocks if there
// are already maximum number of workers working. If it detects an error from
// previously Done workers, it would return it.
func ( *Throttle) () error {
	for {
		select {
		case .ch <- struct{}{}:
			.wg.Add(1)
			return nil
		case  := <-.errCh:
			if  != nil {
				return 
			}
		}
	}
}

// Done should be called by workers when they finish working. They can also
// pass the error status of work done.
func ( *Throttle) ( error) {
	if  != nil {
		.errCh <- 
	}
	select {
	case <-.ch:
	default:
		panic("Throttle Do Done mismatch")
	}
	.wg.Done()
}

// Finish waits until all workers have finished working. It would return any error passed by Done.
// If Finish is called multiple time, it will wait for workers to finish only once(first time).
// From next calls, it will return same error as found on first call.
func ( *Throttle) () error {
	.once.Do(func() {
		.wg.Wait()
		close(.ch)
		close(.errCh)
		for  := range .errCh {
			if  != nil {
				.finishErr = 
				return
			}
		}
	})

	return .finishErr
}

// U16ToBytes converts the given Uint16 to bytes
func ( uint16) []byte {
	var  [2]byte
	binary.BigEndian.PutUint16([:], )
	return [:]
}

// BytesToU16 converts the given byte slice to uint16
func ( []byte) uint16 {
	return binary.BigEndian.Uint16()
}

// U32ToBytes converts the given Uint32 to bytes
func ( uint32) []byte {
	var  [4]byte
	binary.BigEndian.PutUint32([:], )
	return [:]
}

// BytesToU32 converts the given byte slice to uint32
func ( []byte) uint32 {
	return binary.BigEndian.Uint32()
}

// U32SliceToBytes converts the given Uint32 slice to byte slice
func ( []uint32) []byte {
	if len() == 0 {
		return nil
	}
	var  []byte
	 := (*reflect.SliceHeader)(unsafe.Pointer(&))
	.Len = len() * 4
	.Cap = .Len
	.Data = uintptr(unsafe.Pointer(&[0]))
	return 
}

// BytesToU32Slice converts the given byte slice to uint32 slice
func ( []byte) []uint32 {
	if len() == 0 {
		return nil
	}
	var  []uint32
	 := (*reflect.SliceHeader)(unsafe.Pointer(&))
	.Len = len() / 4
	.Cap = .Len
	.Data = uintptr(unsafe.Pointer(&[0]))
	return 
}

// U64ToBytes converts the given Uint64 to bytes
func ( uint64) []byte {
	var  [8]byte
	binary.BigEndian.PutUint64([:], )
	return [:]
}

// BytesToU64 converts the given byte slice to uint64
func ( []byte) uint64 {
	return binary.BigEndian.Uint64()
}

// U64SliceToBytes converts the given Uint64 slice to byte slice
func ( []uint64) []byte {
	if len() == 0 {
		return nil
	}
	var  []byte
	 := (*reflect.SliceHeader)(unsafe.Pointer(&))
	.Len = len() * 8
	.Cap = .Len
	.Data = uintptr(unsafe.Pointer(&[0]))
	return 
}

// BytesToU64Slice converts the given byte slice to uint64 slice
func ( []byte) []uint64 {
	if len() == 0 {
		return nil
	}
	var  []uint64
	 := (*reflect.SliceHeader)(unsafe.Pointer(&))
	.Len = len() / 8
	.Cap = .Len
	.Data = uintptr(unsafe.Pointer(&[0]))
	return 
}

// page struct contains one underlying buffer.
type page struct {
	buf []byte
}

// PageBuffer consists of many pages. A page is a wrapper over []byte. PageBuffer can act as a
// replacement of bytes.Buffer. Instead of having single underlying buffer, it has multiple
// underlying buffers. Hence it avoids any copy during relocation(as happens in bytes.Buffer).
// PageBuffer allocates memory in pages. Once a page is full, it will allocate page with double the
// size of previous page. Its function are not thread safe.
type PageBuffer struct {
	pages []*page

	length       int // Length of PageBuffer.
	nextPageSize int // Size of next page to be allocated.
}

// NewPageBuffer returns a new PageBuffer with first page having size pageSize.
func ( int) *PageBuffer {
	 := &PageBuffer{}
	.pages = append(.pages, &page{buf: make([]byte, 0, )})
	.nextPageSize =  * 2
	return 
}

// Write writes data to PageBuffer b. It returns number of bytes written and any error encountered.
func ( *PageBuffer) ( []byte) (int, error) {
	 := len()
	for {
		 := .pages[len(.pages)-1] // Current page.

		 := copy(.buf[len(.buf):cap(.buf)], )
		.buf = .buf[:len(.buf)+]
		.length += 

		if len() ==  {
			break
		}
		 = [:]

		.pages = append(.pages, &page{buf: make([]byte, 0, .nextPageSize)})
		.nextPageSize *= 2
	}

	return , nil
}

// WriteByte writes data byte to PageBuffer and returns any encountered error.
func ( *PageBuffer) ( byte) error {
	,  := .Write([]byte{})
	return 
}

// Len returns length of PageBuffer.
func ( *PageBuffer) () int {
	return .length
}

// pageForOffset returns pageIdx and startIdx for the offset.
func ( *PageBuffer) ( int) (int, int) {
	AssertTrue( < .length)

	var , ,  int
	for  := 0;  < len(.pages); ++ {
		 := .pages[]

		if +len(.buf)-1 <  {
			 += len(.buf)
		} else {
			 = 
			 =  - 
			break
		}
	}

	return , 
}

// Truncate truncates PageBuffer to length n.
func ( *PageBuffer) ( int) {
	,  := .pageForOffset()
	// For simplicity of the code reject extra pages. These pages can be kept.
	.pages = .pages[:+1]
	 := .pages[len(.pages)-1]
	.buf = .buf[:]
	.length = 
}

// Bytes returns whole Buffer data as single []byte.
func ( *PageBuffer) () []byte {
	 := make([]byte, .length)
	 := 0
	for  := 0;  < len(.pages); ++ {
		 += copy([:], .pages[].buf)
	}

	return 
}

// WriteTo writes whole buffer to w. It returns number of bytes written and any error encountered.
func ( *PageBuffer) ( io.Writer) (int64, error) {
	 := int64(0)
	for  := 0;  < len(.pages); ++ {
		,  := .Write(.pages[].buf)
		 += int64()
		if  != nil {
			return , 
		}
	}

	return , nil
}

// NewReaderAt returns a reader which starts reading from offset in page buffer.
func ( *PageBuffer) ( int) *PageBufferReader {
	,  := .pageForOffset()

	return &PageBufferReader{
		buf:      ,
		pageIdx:  ,
		startIdx: ,
	}
}

// PageBufferReader is a reader for PageBuffer.
type PageBufferReader struct {
	buf      *PageBuffer // Underlying page buffer.
	pageIdx  int         // Idx of page from where it will start reading.
	startIdx int         // Idx inside page - buf.pages[pageIdx] from where it will start reading.
}

// Read reads upto len(p) bytes. It returns number of bytes read and any error encountered.
func ( *PageBufferReader) ( []byte) (int, error) {
	// Check if there is enough to Read.
	 := len(.buf.pages)

	 := 0
	for .pageIdx <  &&  < len() {
		 := .buf.pages[.pageIdx] // Current Page.
		 := len(.buf)        // Last Idx up to which we can read from this page.

		 := copy([:], .buf[.startIdx:])
		 += 
		.startIdx += 

		// Instead of len(cp.buf), we comparing with cap(cp.buf). This ensures that we move to next
		// page only when we have read all data. Reading from last page is an edge case. We don't
		// want to move to next page until last page is full to its capacity.
		if .startIdx >= cap(.buf) {
			// We should move to next page.
			.pageIdx++
			.startIdx = 0
			continue
		}

		// When last page in not full to its capacity and we have read all data up to its
		// length, just break out of the loop.
		if .pageIdx == -1 {
			break
		}
	}

	if  == 0 && len() > 0 {
		return , io.EOF
	}

	return , nil
}

const kvsz = int(unsafe.Sizeof(pb.KV{}))

func ( *z.Allocator) *pb.KV {
	if  == nil {
		return &pb.KV{}
	}
	 := .AllocateAligned(kvsz)
	return (*pb.KV)(unsafe.Pointer(&[0]))
}

// IBytesToString converts size in bytes to human readable format.
// The code is taken from humanize library and changed to provide
// value upto custom decimal precision.
// IBytesToString(12312412, 1) -> 11.7 MiB
func ( uint64,  int) string {
	 := []string{"B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB"}
	 := float64(1024)
	if  < 10 {
		return fmt.Sprintf("%d B", )
	}
	 := math.Floor(math.Log(float64()) / math.Log())
	 := [int()]
	 := float64() / math.Pow(, )
	 := "%." + strconv.Itoa() + "f %s"

	return fmt.Sprintf(, , )
}

type RateMonitor struct {
	start       time.Time
	lastSent    uint64
	lastCapture time.Time
	rates       []float64
	idx         int
}

func ( int) *RateMonitor {
	return &RateMonitor{
		start: time.Now(),
		rates: make([]float64, ),
	}
}

const minRate = 0.0001

// Capture captures the current number of sent bytes. This number should be monotonically
// increasing.
func ( *RateMonitor) ( uint64) {
	 :=  - .lastSent
	 := time.Since(.lastCapture)
	.lastCapture, .lastSent = time.Now(), 

	 := float64() / .Seconds()
	if  < minRate {
		 = minRate
	}
	.rates[.idx] = 
	.idx = (.idx + 1) % len(.rates)
}

// Rate returns the average rate of transmission smoothed out by the number of samples.
func ( *RateMonitor) () uint64 {
	var  float64
	var  float64
	for ,  := range .rates {
		if  < minRate {
			// Ignore this. We always set minRate, so this is a zero.
			// Typically at the start of the rate monitor, we'd have zeros.
			continue
		}
		 += 
		 += 1.0
	}
	if  < minRate {
		return 0
	}
	return uint64( / )
}