package parquet

import (
	"container/heap"
	"errors"
	"fmt"
	"io"
	"sync"
)

// MergeRowGroups constructs a row group which is a merged view of rowGroups. If
// rowGroups are sorted and the passed options include sorting, the merged row
// group will also be sorted.
//
// The function validates the input to ensure that the merge operation is
// possible, ensuring that the schemas match or can be converted to an
// optionally configured target schema passed as argument in the option list.
//
// The sorting columns of each row group are also consulted to determine whether
// the output can be represented. If sorting columns are configured on the merge
// they must be a prefix of sorting columns of all row groups being merged.
func ( []RowGroup,  ...RowGroupOption) (RowGroup, error) {
	,  := NewRowGroupConfig(...)
	if  != nil {
		return nil, 
	}

	 := .Schema
	if len() == 0 {
		return newEmptyRowGroup(), nil
	}
	if  == nil {
		 = [0].Schema()

		for ,  := range [1:] {
			if !nodesAreEqual(, .Schema()) {
				return nil, ErrRowGroupSchemaMismatch
			}
		}
	}

	 := make([]RowGroup, len())
	copy(, )

	for ,  := range  {
		if  := .Schema(); !nodesAreEqual(, ) {
			,  := Convert(, )
			if  != nil {
				return nil, fmt.Errorf("cannot merge row groups: %w", )
			}
			[] = ConvertRowGroup(, )
		}
	}

	 := &mergedRowGroup{sorting: .Sorting.SortingColumns}
	.init(, )

	if len(.sorting) == 0 {
		// When the row group has no ordering, use a simpler version of the
		// merger which simply concatenates rows from each of the row groups.
		// This is preferable because it makes the output deterministic, the
		// heap merge may otherwise reorder rows across groups.
		return &.multiRowGroup, nil
	}

	for ,  := range .rowGroups {
		if !sortingColumnsHavePrefix(.SortingColumns(), .sorting) {
			return nil, ErrRowGroupSortingColumnsMismatch
		}
	}

	.compare = compareRowsFuncOf(, .sorting)
	return , nil
}

type mergedRowGroup struct {
	multiRowGroup
	sorting []SortingColumn
	compare func(Row, Row) int
}

func ( *mergedRowGroup) () []SortingColumn {
	return .sorting
}

func ( *mergedRowGroup) () Rows {
	// The row group needs to respect a sorting order; the merged row reader
	// uses a heap to merge rows from the row groups.
	 := make([]Rows, len(.rowGroups))
	for  := range  {
		[] = .rowGroups[].Rows()
	}
	return &mergedRowGroupRows{
		merge: mergedRowReader{
			compare: .compare,
			readers: makeBufferedRowReaders(len(), func( int) RowReader { return [] }),
		},
		rows:   ,
		schema: .schema,
	}
}

type mergedRowGroupRows struct {
	merge     mergedRowReader
	rowIndex  int64
	seekToRow int64
	rows      []Rows
	schema    *Schema
}

func ( *mergedRowGroupRows) ( RowWriter) ( int64,  error) {
	 := newMergeBuffer()
	.setup(.rows, .merge.compare)
	,  = .WriteRowsTo()
	.rowIndex += int64()
	.release()
	return
}

func ( *mergedRowGroupRows) ( []Row) (int, error) {
	,  := .merge.ReadRows()
	.rowIndex += int64()
	return , 
}

func ( *mergedRowGroupRows) () ( error) {
	.merge.close()
	.rowIndex = 0
	.seekToRow = 0

	for ,  := range .rows {
		if  := .Close();  != nil {
			 = 
		}
	}

	return 
}

func ( *mergedRowGroupRows) ( []Row) (int, error) {
	for .rowIndex < .seekToRow {
		 := int(.seekToRow - .rowIndex)
		if  > len() {
			 = len()
		}
		,  := .readInternal([:])
		if  != nil {
			return 0, 
		}
	}

	return .readInternal()
}

func ( *mergedRowGroupRows) ( int64) error {
	if  >= .rowIndex {
		.seekToRow = 
		return nil
	}
	return fmt.Errorf("SeekToRow: merged row reader cannot seek backward from row %d to %d", .rowIndex, )
}

func ( *mergedRowGroupRows) () *Schema {
	return .schema
}

// MergeRowReader constructs a RowReader which creates an ordered sequence of
// all the readers using the given compare function as the ordering predicate.
func ( []RowReader,  func(Row, Row) int) RowReader {
	return &mergedRowReader{
		compare: ,
		readers: makeBufferedRowReaders(len(), func( int) RowReader { return [] }),
	}
}

func makeBufferedRowReaders( int,  func(int) RowReader) []*bufferedRowReader {
	 := make([]bufferedRowReader, )
	 := make([]*bufferedRowReader, )

	for  := range  {
		[].rows = ()
		[] = &[]
	}

	return 
}

type mergedRowReader struct {
	compare     func(Row, Row) int
	readers     []*bufferedRowReader
	initialized bool
}

func ( *mergedRowReader) () error {
	for ,  := range .readers {
		switch  := .read();  {
		case nil:
		case io.EOF:
			.readers[] = nil
		default:
			.readers = nil
			return 
		}
	}

	 := 0
	for ,  := range .readers {
		if  != nil {
			.readers[] = 
			++
		}
	}

	 := .readers[:]
	for  := range  {
		[] = nil
	}

	.readers = .readers[:]
	heap.Init()
	return nil
}

func ( *mergedRowReader) () {
	for ,  := range .readers {
		.close()
	}
	.readers = nil
}

func ( *mergedRowReader) ( []Row) ( int,  error) {
	if !.initialized {
		.initialized = true

		if  := .initialize();  != nil {
			return 0, 
		}
	}

	for  < len() && len(.readers) != 0 {
		 := .readers[0]
		if .end == .off { // This readers buffer has been exhausted, repopulate it.
			if  := .read();  != nil {
				if  == io.EOF {
					heap.Pop()
					continue
				}
				return , 
			} else {
				heap.Fix(, 0)
				continue
			}
		}

		[] = append([][:0], .head()...)
		++

		if  := .next();  != nil {
			if  != io.EOF {
				return , 
			}
			return , nil
		} else {
			heap.Fix(, 0)
		}
	}

	if len(.readers) == 0 {
		 = io.EOF
	}

	return , 
}

func ( *mergedRowReader) (,  int) bool {
	return .compare(.readers[].head(), .readers[].head()) < 0
}

func ( *mergedRowReader) () int {
	return len(.readers)
}

func ( *mergedRowReader) (,  int) {
	.readers[], .readers[] = .readers[], .readers[]
}

func ( *mergedRowReader) ( interface{}) {
	panic("NOT IMPLEMENTED")
}

func ( *mergedRowReader) () interface{} {
	 := len(.readers) - 1
	 := .readers[]
	.readers = .readers[:]
	return 
}

type bufferedRowReader struct {
	rows RowReader
	off  int32
	end  int32
	buf  [10]Row
}

func ( *bufferedRowReader) () Row {
	return .buf[.off]
}

func ( *bufferedRowReader) () error {
	if .off++; .off == .end {
		.off = 0
		.end = 0
		// We need to read more rows, however it is unsafe to do so here because we haven't
		// returned the current rows to the caller yet which may cause buffer corruption.
		return io.EOF
	}
	return nil
}

func ( *bufferedRowReader) () error {
	if .rows == nil {
		return io.EOF
	}
	,  := .rows.ReadRows(.buf[.end:])
	if  != nil &&  == 0 {
		return 
	}
	.end += int32()
	return nil
}

func ( *bufferedRowReader) () {
	.rows = nil
	.off = 0
	.end = 0
}

type mergeBuffer struct {
	compare func(Row, Row) int
	rows    []Rows
	buffer  [][]Row
	head    []int
	len     int
	copy    [mergeBufferSize]Row
}

const mergeBufferSize = 1 << 10

func newMergeBuffer() *mergeBuffer {
	return mergeBufferPool.Get().(*mergeBuffer)
}

var mergeBufferPool = &sync.Pool{
	New: func() any {
		return new(mergeBuffer)
	},
}

func ( *mergeBuffer) ( []Rows,  func(Row, Row) int) {
	.compare = 
	.rows = append(.rows, ...)
	 := len()
	if len(.buffer) <  {
		 :=  - len(.buffer)
		 := make([][]Row, )
		for  := range  {
			[] = make([]Row, 0, mergeBufferSize)
		}
		.buffer = append(.buffer, ...)
		.head = append(.head, make([]int, )...)
	}
	.len = 
}

func ( *mergeBuffer) () {
	for  := range .rows {
		.buffer[] = .buffer[][:0]
		.head[] = 0
	}
	.rows = .rows[:0]
	.compare = nil
	for  := range .copy {
		.copy[] = nil
	}
	.len = 0
}

func ( *mergeBuffer) () {
	.reset()
	mergeBufferPool.Put()
}

func ( *mergeBuffer) () error {
	.len = len(.rows)
	for  := range .rows {
		if .head[] < len(.buffer[]) {
			// There is still rows data in m.buffer[i]. Skip filling the row buffer until
			// all rows have been read.
			continue
		}
		.head[] = 0
		.buffer[] = .buffer[][:mergeBufferSize]
		,  := .rows[].ReadRows(.buffer[])
		if  != nil {
			if !errors.Is(, io.EOF) {
				return 
			}
		}
		.buffer[] = .buffer[][:]
	}
	heap.Init()
	return nil
}

func ( *mergeBuffer) (,  int) bool {
	 := .buffer[]
	if len() == 0 {
		return false
	}
	 := .buffer[]
	if len() == 0 {
		return true
	}
	return .compare([.head[]], [.head[]]) == -1
}

func ( *mergeBuffer) () interface{} {
	.len--
	// We don't use the popped value.
	return nil
}

func ( *mergeBuffer) () int {
	return .len
}

func ( *mergeBuffer) (,  int) {
	.buffer[], .buffer[] = .buffer[], .buffer[]
	.head[], .head[] = .head[], .head[]
}

func ( *mergeBuffer) ( interface{}) {
	panic("NOT IMPLEMENTED")
}

func ( *mergeBuffer) ( RowWriter) ( int64,  error) {
	 = .fill()
	if  != nil {
		return 0, 
	}
	var  int
	for .left() {
		 := .read()
		if  == 0 {
			break
		}
		,  = .WriteRows(.copy[:])
		if  != nil {
			return
		}
		 += int64()
		 = .fill()
		if  != nil {
			return
		}
	}
	return
}

func ( *mergeBuffer) () bool {
	for  := 0;  < .len; ++ {
		if .head[] < len(.buffer[]) {
			return true
		}
	}
	return false
}

func ( *mergeBuffer) () ( int64) {
	for  < int64(len(.copy)) && .Len() != 0 {
		 := .buffer[:.len][0]
		if len() == 0 {
			heap.Pop()
			continue
		}
		.copy[] = append(.copy[][:0], [.head[0]]...)
		.head[0]++
		++
		if .head[0] < len() {
			// There is still rows in this row group. Adjust  the heap
			heap.Fix(, 0)
		} else {
			heap.Pop()
		}
	}
	return
}

var (
	_ RowReaderWithSchema = (*mergedRowGroupRows)(nil)
	_ RowWriterTo         = (*mergedRowGroupRows)(nil)
	_ heap.Interface      = (*mergeBuffer)(nil)
	_ RowWriterTo         = (*mergeBuffer)(nil)
)