package arrowutils

import (
	
	
	
	
	

	
	
	

	
)

// MergeRecords merges the given records into a single record. The records
// must all have the same schema; the result is built with the schema of the
// first record. orderByCols lists the sorting columns that the input records
// and the resulting record are ordered by. The limit is checked before each
// row is appended, so at most limit rows are emitted. If limit is 0, no limit
// is applied.
// Note that the given records should already be ordered by the given columns.
// WARNING: Only ascending ordering is currently supported.
// NOTE(review): the heap comparison in this file consults each sorting
// column's Direction, so the warning above may be stale — confirm.
// NOTE(review): the first element is indexed unconditionally below, so an
// empty input slice appears to panic — confirm callers never pass one.
func (
	 memory.Allocator,
	 []arrow.Record,
	 []SortingColumn,
	 uint64,
) (arrow.Record, error) {
	// One cursor per input; each cursor's curIdx starts at its zero value (0).
	 := cursorHeap{
		cursors:     make([]cursor, len()),
		orderByCols: ,
	}
	for  := range .cursors {
		.cursors[].r = []
	}

	// The output schema is taken from element 0 of the input.
	 := [0].Schema()
	 := builder.NewRecordBuilder(, )
	defer .Release()

	// Zero means "no limit": substitute a bound large enough to act as one.
	if  == 0 {
		 = math.MaxInt64
	}
	// Counter checked in the loop condition below; presumably the number of
	// rows appended so far.
	 := uint64(0)

	heap.Init(&)
	for .Len() > 0 &&  <  {
		// Minimum cursor is always at index 0.
		 := .cursors[0].r
		 := .cursors[0].curIdx
		// Append one value per field, read from the minimum cursor's current
		// row. Any append error aborts the merge.
		for ,  := range .Fields() {
			if  := builder.AppendValue(, .Column(), );  != nil {
				return nil, 
			}
		}
		if int64(+1) >= .NumRows() {
			// Pop the cursor since it has no more data.
			_ = heap.Pop(&)
			++
			continue
		}
		// Advance the minimum cursor to its next row and restore heap order,
		// since its new row may no longer be the smallest.
		.cursors[0].curIdx++
		heap.Fix(&, 0)
		++
	}

	return .NewRecord(), nil
}

// cursor is a read position within a single record.
type cursor struct {
	r      arrow.Record // r is the record this cursor reads from.
	curIdx int          // curIdx is the index of the current row within r.
}

// cursorHeap holds the cursors being merged together with the sorting columns
// used to order them. In this file it is passed to heap.Init, heap.Fix and
// heap.Pop from container/heap.
type cursorHeap struct {
	// cursors are the heap elements; after heap.Init the minimum is at index 0.
	cursors     []cursor
	// orderByCols are the sorting columns, compared in slice order.
	orderByCols []SortingColumn
}

// Returns the number of cursors currently held.
// NOTE(review): the method name is missing from this view; the signature
// matches the Len method of heap.Interface — confirm.
func ( cursorHeap) () int {
	return len(.cursors)
}

// Reports whether the current row of one cursor orders before the current row
// of another, given the indexes of the two cursors. The sorting columns are
// examined in order; the first column that distinguishes the two rows decides
// the result. If no column distinguishes them, false is returned.
// NOTE(review): the method name is missing from this view; the signature
// matches the Less method of heap.Interface — confirm.
func ( cursorHeap) (,  int) bool {
	for  := range .orderByCols {
		 := .cursors[]
		 := .cursors[]
		 := .orderByCols[]
		 := .r.Column(.Index)
		 := .r.Column(.Index)
		// nullComparison is defined elsewhere; presumably its second result is
		// true when nulls alone decide the ordering and its first result is the
		// -1/1 comparison value — confirm. When it is true, NullsFirst picks
		// which comparison value counts as "less".
		if ,  := nullComparison(.IsNull(.curIdx), .IsNull(.curIdx));  {
			if .orderByCols[].NullsFirst {
				return  == -1
			}
			// This condition is always true when reached, so the null branch
			// always returns.
			if !.orderByCols[].NullsFirst {
				return  == 1
			}
		}

		 := .compare(, , )
		if  != 0 {
			// Use direction to reorder the comparison. Direction determines if the list
			// is in ascending or descending.
			//
			// For instance if comparison between i,j value is -1 and direction is -1
			// this will resolve to true hence the list will be in ascending order. Same
			// principle applies for descending.
			return  == .orderByCols[].Direction.comparison()
		}
		// Try comparing the next column
	}

	return false
}

// compare compares the values of one sorting column at the current rows of
// two cursors. It takes three int arguments: presumably the two cursor indexes
// followed by the index into orderByCols — confirm against the call site.
//
// Supported column types are Binary, String, Int64, Int32, Uint64, and
// Dictionary with a Binary dictionary. Binary and String use bytes.Compare and
// strings.Compare; the integer types return -1, 0 or 1. Any other column or
// dictionary type panics.
//
// The column type is determined from the first cursor only; the second
// cursor's column is converted with an unchecked type assertion, so a type
// mismatch between the two records panics. Null values are not checked here.
func ( cursorHeap) (, ,  int) int {
	 := .cursors[]
	 := .cursors[]
	 := .orderByCols[]
	switch arr1 := .r.Column(.Index).(type) {
	case *array.Binary:
		 := .r.Column(.Index).(*array.Binary)
		return bytes.Compare(.Value(.curIdx), .Value(.curIdx))
	case *array.String:
		 := .r.Column(.Index).(*array.String)
		return strings.Compare(.Value(.curIdx), .Value(.curIdx))
	case *array.Int64:
		 := .r.Column(.Index).(*array.Int64)
		 := .Value(.curIdx)
		 := .Value(.curIdx)
		if  ==  {
			return 0
		}
		if  <  {
			return -1
		}
		return 1
	case *array.Int32:
		 := .r.Column(.Index).(*array.Int32)
		 := .Value(.curIdx)
		 := .Value(.curIdx)
		if  ==  {
			return 0
		}
		if  <  {
			return -1
		}
		return 1
	case *array.Uint64:
		 := .r.Column(.Index).(*array.Uint64)
		 := .Value(.curIdx)
		 := .Value(.curIdx)
		if  ==  {
			return 0
		}
		if  <  {
			return -1
		}
		return 1
	case *array.Dictionary:
		// Dictionary columns are compared by their decoded values: each row's
		// value index is looked up in that column's own dictionary.
		switch dict := .Dictionary().(type) {
		case *array.Binary:
			 := .r.Column(.Index).(*array.Dictionary)
			 := .Dictionary().(*array.Binary)
			return bytes.Compare(.Value(.GetValueIndex(.curIdx)), .Value(.GetValueIndex(.curIdx)))
		default:
			panic(fmt.Sprintf("unsupported dictionary type for record merging %T", ))
		}
	default:
		panic(fmt.Sprintf("unsupported type for record merging %T", ))
	}
}

// Exchanges the cursors at the two given indexes. The receiver is a value,
// but the swap is visible to the caller because the cursors slice shares its
// backing array.
// NOTE(review): the method name is missing from this view; the signature
// matches the Swap method of heap.Interface — confirm.
func ( cursorHeap) (,  int) {
	.cursors[], .cursors[] = .cursors[], .cursors[]
}

// Always panics: the set of cursors is fixed when the heap is initialized, so
// nothing is ever expected to be pushed.
// NOTE(review): the method name is missing from this view; the signature
// matches the Push method of heap.Interface — confirm.
func ( cursorHeap) ( any) {
	panic(
		"number of cursors are known at Init time, none should ever be pushed",
	)
}

// Removes the last cursor from the slice and returns it. A pointer receiver
// is used because the cursors slice itself is shortened.
// NOTE(review): the method name is missing from this view; the signature
// matches the Pop method of heap.Interface, for which container/heap moves the
// minimum element to the end before calling it — confirm.
func ( *cursorHeap) () any {
	 := len(.cursors) - 1
	 := .cursors[]
	.cursors = .cursors[:]
	return 
}