package arrowutils

import (
	
	
	
	
	
	
	
	
	

	
	
	
	
	

	
)

// Direction is the order in which a sorting column is arranged.
type Direction uint

// Supported sort directions.
const (
	// Ascending is the zero value of Direction.
	Ascending = Direction(iota)
	// Descending is the reverse of Ascending.
	Descending
)

// comparison translates the direction into the integer that a row comparison
// result is matched against while sorting: -1 for Ascending and 1 for
// Descending. It panics for any other Direction value.
func ( Direction) () int {
	switch  {
	case Ascending:
		return -1
	case Descending:
		return 1
	default:
		// NOTE(review): the message mentions -1 and 1, which are the values
		// returned above, while the switch is on Ascending/Descending —
		// confirm the wording is what callers should see.
		panic("unexpected direction value " + strconv.Itoa(int()) + " only -1 and 1 are allowed")
	}
}

// SortingColumn describes a sorting column on an arrow.Record.
type SortingColumn struct {
	// Index is the position of the column in the record; it is used to look
	// up both the column data and the schema field.
	Index      int
	// Direction selects ascending or descending order for this column.
	Direction  Direction
	// NullsFirst controls whether null rows are ordered before non-null rows
	// on this column.
	NullsFirst bool
}

// SortRecord sorts the given arrow.Record by columns. It returns an
// *array.Int32 of indices to the sorted rows of the record.
//
// Comparison is made sequentially by each column. When rows are equal in the
// first column we compare the rows on the second column, and so on, until rows
// that are not equal are found.
//
// An error is returned when no sorting column is given, or when
// newMultiColSorter fails.
func ( arrow.Record,  []SortingColumn) (*array.Int32, error) {
	if len() == 0 {
		return nil, errors.New("pqarrow/arrowutils: at least one column is needed for sorting")
	}
	,  := newMultiColSorter(, )
	if  != nil {
		return nil, 
	}
	// Release is deferred so it runs after the result array has been built
	// from the indices below.
	defer .Release()
	sort.Sort()
	return .indices.NewArray().(*array.Int32), nil
}

// Take returns a new record that contains only the rows selected by indices,
// an array of row indexes.
//
// When no column of the record is a dictionary, run-end-encoded, list or
// struct column, the whole record is handed to compute.Take with bounds
// checking enabled. Otherwise each column is taken on its own through the
// Take*Column helpers, scheduled on an errgroup.Group, and the results are
// assembled into a record with array.NewRecord.
//
// Use compute.WithAllocator to pass a custom memory.Allocator.
func ( context.Context,  arrow.Record,  *array.Int32) (arrow.Record, error) {
	// compute.Take doesn't support dictionaries or lists. The check below also
	// covers run-end-encoded and struct columns; compute.Take is used on the
	// record only when it has none of these columns.
	var  bool
	for  := 0;  < int(.NumCols()); ++ {
		if .Column().DataType().ID() == arrow.DICTIONARY ||
			.Column().DataType().ID() == arrow.RUN_END_ENCODED ||
			.Column().DataType().ID() == arrow.LIST ||
			.Column().DataType().ID() == arrow.STRUCT {
			 = true
			break
		}
	}
	if ! {
		,  := compute.Take(
			,
			compute.TakeOptions{BoundsCheck: true},
			compute.NewDatumWithoutOwning(),
			compute.NewDatumWithoutOwning(),
		)
		if  != nil {
			return nil, 
		}
		return .(*compute.RecordDatum).Value, nil
	}
	// NOTE(review): a record with zero columns never sets the flag above, so
	// it appears to return through the compute.Take branch already and this
	// guard looks unreachable — confirm.
	if .NumCols() == 0 {
		return , nil
	}

	 := make([]arrow.Array, .NumCols())
	// Release every non-nil array in the ranged slice on return, on both the
	// error and the success path.
	defer func() {
		for ,  := range  {
			if  != nil {
				.Release()
			}
		}
	}()
	var  errgroup.Group
	for  := 0;  < int(.NumCols()); ++ {
		 := 
		 := .Column()
		// Dispatch on the concrete array type; everything that is not a
		// dictionary, run-end-encoded, list or struct array goes through
		// TakeColumn.
		switch arr := .Column().(type) {
		case *array.Dictionary:
			.Go(func() error { return TakeDictColumn(, , , , ) })
		case *array.RunEndEncoded:
			.Go(func() error { return TakeRunEndEncodedColumn(, , , , ) })
		case *array.List:
			.Go(func() error { return TakeListColumn(, , , , ) })
		case *array.Struct:
			.Go(func() error { return TakeStructColumn(, , , , ) })
		default:
			.Go(func() error { return TakeColumn(, , , , ) })
		}
	}
	if  := .Wait();  != nil {
		return nil, 
	}

	// We checked for at least one column earlier in the function, so reading
	// element 0 is safe. All columns must have the length of the first one.
	 := [0].Len()
	for ,  := range  {
		if .Len() !=  {
			return nil, fmt.Errorf(
				"pqarrow/arrowutils: expected same length %d for all columns got %d for %s", , .Len(), .DataType().Name(),
			)
		}
	}
	return array.NewRecord(.Schema(), , int64(.Len())), nil
}

// TakeColumn takes rows from a single arrow.Array with compute.TakeArray and
// stores the resulting array in a slot of the []arrow.Array argument. The
// error from compute.TakeArray is returned unchanged, in which case nothing is
// stored.
func ( context.Context,  arrow.Array,  int,  []arrow.Array,  *array.Int32) error {
	,  := compute.TakeArray(, , )
	if  != nil {
		return 
	}
	[] = 
	return nil
}

// TakeDictColumn takes rows from a dictionary-encoded column and stores the
// rebuilt dictionary array in a slot of the []arrow.Array argument.
//
// Only dictionaries whose values are *array.String, *array.Binary or
// *array.FixedSizeBinary are handled. In both branches the result is produced
// by a dictionary builder seeded with an existing dictionary through
// array.NewDictionaryBuilderWithDict, and rows are appended through that
// builder's index builder.
//
// NOTE(review): for any other dictionary value type the function returns nil
// without a visible write to the output slice — confirm this is intended.
func ( context.Context,  *array.Dictionary,  int,  []arrow.Array,  *array.Int32) error {
	switch .Dictionary().(type) {
	case *array.String, *array.Binary:
		 := array.NewDictionaryBuilderWithDict(
			compute.GetAllocator(), .DataType().(*arrow.DictionaryType), .Dictionary(),
		).(*array.BinaryDictionaryBuilder)
		defer .Release()

		.Reserve(.Len())
		 := .IndexBuilder()
		// For every requested index: a null row becomes a null entry,
		// otherwise the value index of that row is appended.
		for ,  := range .Int32Values() {
			if .IsNull(int()) {
				.AppendNull()
				continue
			}
			.Append(.GetValueIndex(int()))
		}

		[] = .NewArray()
		return nil
	case *array.FixedSizeBinary:
		// Same procedure as above, with the builder asserted to the
		// fixed-size-binary dictionary builder type.
		 := array.NewDictionaryBuilderWithDict(
			compute.GetAllocator(), .DataType().(*arrow.DictionaryType), .Dictionary(),
		).(*array.FixedSizeBinaryDictionaryBuilder)
		defer .Release()

		.Reserve(.Len())
		 := .IndexBuilder()
		for ,  := range .Int32Values() {
			if .IsNull(int()) {
				.AppendNull()
				continue
			}
			// TODO: Improve this by not copying actual values.
			.Append(.GetValueIndex(int()))
		}

		[] = .NewArray()
		return nil
	}

	// Unhandled dictionary value type: nothing is produced (see the note in
	// the function comment).
	return nil
}

// TakeRunEndEncodedColumn takes rows from a run-end-encoded column and stores
// a newly built run-end-encoded array in a slot of the []arrow.Array argument.
//
// It proceeds in three steps: expand the column into one Int32 dictionary
// index per row, take the requested rows from that Int32 array with
// TakeColumn, then re-encode the selected values with a RunEndEncodedBuilder.
//
// The values of the run-end-encoded array are asserted to be an
// *array.Dictionary, and a dictionary is asserted to be an *array.String;
// both assertions are unchecked and panic for other types.
func ( context.Context,  *array.RunEndEncoded,  int,  []arrow.Array,  *array.Int32) error {
	 := array.NewInt32Builder(compute.GetAllocator())
	defer .Release()

	 := .Values().(*array.Dictionary)
	// Step 1: resolve each position through GetPhysicalIndex and record the
	// value index found there (or null).
	for  := 0;  < .Len(); ++ {
		if .IsNull(.GetPhysicalIndex()) {
			.AppendNull()
		} else {
			.Append(int32(.GetValueIndex(.GetPhysicalIndex())))
		}
	}
	 := .NewInt32Array()
	defer .Release()

	// Step 2: take from the expanded Int32 array; the result lands in slot 0
	// of a one-element scratch slice.
	 := make([]arrow.Array, 1)
	if  := TakeColumn(, , 0, , );  != nil {
		return 
	}
	 := [0].(*array.Int32)
	defer .Release()

	// Step 3: rebuild a run-end-encoded array using the run-end and value
	// data types obtained from RunEndsArr() and Values().
	 := array.NewRunEndEncodedBuilder(
		compute.GetAllocator(), .RunEndsArr().DataType(), .Values().DataType(),
	)
	defer .Release()
	.Reserve(.Len())

	 := .Dictionary().(*array.String)
	for  := 0;  < .Len(); ++ {
		if .IsNull() {
			.AppendNull()
			continue
		}
		// Look up the string for this entry and append it by value.
		 := .Value()
		 := .Value(int())
		if  := .AppendValueFromString();  != nil {
			return 
		}
	}

	[] = .NewRunEndEncodedArray()
	return nil
}

// TakeListColumn takes rows from a list column and stores the resulting list
// array in a slot of the []arrow.Array argument.
//
// A list builder is created for the column's data type, and the concrete type
// of its value builder selects the strategy:
//
//   - *array.BinaryDictionaryBuilder: the list values are asserted to be an
//     *array.Dictionary; String or Binary dictionary values are inserted into
//     the value builder and each selected list is rebuilt entry by entry.
//   - *array.StructBuilder: the list values are asserted to be an
//     *array.Struct; the selected child rows are taken with TakeStructColumn
//     and a new list array is assembled from new offsets and the taken child.
//
// Any other value builder type yields an error.
func ( context.Context,  *array.List,  int,  []arrow.Array,  *array.Int32) error {
	 := compute.GetAllocator()
	 := array.NewBuilder(, .DataType()).(*array.ListBuilder)

	switch valueBuilder := .ValueBuilder().(type) {
	case *array.BinaryDictionaryBuilder:
		defer .Release()

		 := .ListValues().(*array.Dictionary)
		// Insert the existing dictionary values into the value builder.
		// Dictionary value types other than String and Binary insert nothing.
		switch dictV := .Dictionary().(type) {
		case *array.String:
			if  := .InsertStringDictValues();  != nil {
				return 
			}
		case *array.Binary:
			if  := .InsertDictValues();  != nil {
				return 
			}
		}
		 := .IndexBuilder()

		.Reserve(.Len())
		for ,  := range .Int32Values() {
			if .IsNull(int()) {
				.AppendNull()
				continue
			}

			// Start a new (valid) list entry, then append the value index of
			// every child position in the offset range of this row.
			.Append(true)
			,  := .ValueOffsets(int())
			for  := ;  < ; ++ {
				.Append(.GetValueIndex(int()))
			}
			// Resize is necessary here for the correct offsets to be appended to
			// the list builder. Otherwise, length will remain at 0.
			.Resize(.Len())
		}

		[] = .NewArray()
		return nil
	case *array.StructBuilder:
		defer .Release()

		 := .ListValues().(*array.Struct)

		// expand the indices from the list to each row in the struct.
		 := array.NewInt32Builder()
		.Reserve(.Len())
		defer .Release()

		for ,  := range .Int32Values() {
			,  := .ValueOffsets(int())
			for  := ;  < ; ++ {
				.Append(int32())
			}
		}
		 := .NewInt32Array()
		defer .Release()

		// Take the expanded child rows; TakeStructColumn is called with
		// position 0, and element 0 is read back further below.
		 := []arrow.Array{}
		 := TakeStructColumn(, , 0, , )
		if  != nil {
			return 
		}
		defer func() {
			for ,  := range  {
				.Release()
			}
		}()

		// Build the offsets of the new list: start at 0 and add the length of
		// every selected, non-null list entry.
		 := array.NewInt32Builder()
		defer .Release()

		.Append(0)
		 := int32(0)
		for ,  := range .Int32Values() {
			if .IsNull(int()) {
				.AppendNull()
				continue
			}

			,  := .ValueOffsets(int())
			// calculate the length of the current list element and add it to the offsets
			 += int32( - )
			.Append()
		}
		 := .NewInt32Array()
		defer .Release()

		// Assemble the list data by hand: a nil first buffer, buffer 1 of an
		// array's data as second buffer (presumably the offsets built above —
		// confirm), and one child array data.
		 := array.NewData(
			arrow.ListOf(.DataType()),
			.Len(),
			[]*memory.Buffer{nil, .Data().Buffers()[1]},
			[]arrow.ArrayData{[0].Data()},
			.NullN(),
			0,
		)
		defer .Release()
		[] = array.NewListData()

		return nil
	default:
		// NOTE(review): Release calls are only visible inside the two handled
		// cases; this branch returns without one — confirm whether the list
		// builder created above needs releasing here.
		return fmt.Errorf("unexpected value builder type %T for list column", .ValueBuilder())
	}
}

// TakeStructColumn takes rows from a struct column and stores the resulting
// struct array in a slot of the []arrow.Array argument.
//
// A struct without fields is retained and stored as-is. Otherwise every field
// is taken on its own (run-end-encoded, dictionary and list fields through
// their dedicated helpers, everything else through TakeColumn) and the taken
// fields are combined with array.NewStructArray under the original field
// names. The first error from a helper or from array.NewStructArray is
// returned.
func ( context.Context,  *array.Struct,  int,  []arrow.Array,  *array.Int32) error {
	 := .Data().DataType().(*arrow.StructType)

	// Immediately, return this struct if it has no fields/columns
	if .NumField() == 0 {
		// If the original record is released and this is released once more,
		// as usually done, we want to retain it once more.
		.Retain()
		[] = 
		return nil
	}

	 := make([]arrow.Array, .NumField())
	 := make([]string, .NumField())
	// Release every non-nil array in the ranged slice on return, on both the
	// error and the success path.
	defer func() {
		for ,  := range  {
			if  != nil {
				.Release()
			}
		}
	}()

	for  := 0;  < .NumField(); ++ {
		[] = .Field().Name

		// Dispatch on the concrete type of the field array. There is no case
		// for *array.Struct, so nested struct fields take the default branch.
		switch f := .Field().(type) {
		case *array.RunEndEncoded:
			if  := TakeRunEndEncodedColumn(, , , , );  != nil {
				return 
			}
		case *array.Dictionary:
			if  := TakeDictColumn(, , , , );  != nil {
				return 
			}
		case *array.List:
			if  := TakeListColumn(, , , , );  != nil {
				return 
			}
		default:
			 := TakeColumn(, , , , )
			if  != nil {
				return 
			}
		}
	}

	,  := array.NewStructArray(, )
	if  != nil {
		return 
	}

	[] = 
	return nil
}

// multiColSorter implements sort.Interface over the rows of a record. Sorting
// permutes the row indexes held in indices rather than the record data.
// Instances are obtained from and returned to multiColSorterPool.
type multiColSorter struct {
	// indices holds the row indexes that are swapped while sorting.
	indices     *builder.OptInt32Builder
	// comparisons holds one comparator per sorting column.
	comparisons []comparator
	// directions holds the result of Direction.comparison() per sorting column.
	directions  []int
	// nullsFirst holds SortingColumn.NullsFirst per sorting column.
	nullsFirst  []bool
}

// newMultiColSorter takes a multiColSorter from multiColSorterPool and
// prepares it for the given record and sorting columns.
//
// Records with at most one row need no comparators: the sorter is returned
// right away, holding the single index 0 when there is exactly one row.
// Otherwise the sorter is sized with Reserve, the direction and nulls-first
// setting of every sorting column are recorded, and a comparator is created
// for each sorting column based on the concrete array type.
//
// An error is returned for column types (or dictionary value types) that have
// no case in the switches below.
func newMultiColSorter(
	 arrow.Record,
	 []SortingColumn,
) (*multiColSorter, error) {
	 := multiColSorterPool.Get().(*multiColSorter)
	if .NumRows() <= 1 {
		if .NumRows() == 1 {
			.indices.Append(0)
		}
		return , nil
	}
	.Reserve(int(.NumRows()), len())
	for  := range  {
		.directions[] = [].Direction.comparison()
		.nullsFirst[] = [].NullsFirst
	}
	for ,  := range  {
		switch e := .Column(.Index).(type) {
		case *array.Int16:
			.comparisons[] = newOrderedSorter[int16](, cmp.Compare)
		case *array.Int32:
			.comparisons[] = newOrderedSorter[int32](, cmp.Compare)
		case *array.Int64:
			.comparisons[] = newOrderedSorter[int64](, cmp.Compare)
		case *array.Uint16:
			.comparisons[] = newOrderedSorter[uint16](, cmp.Compare)
		case *array.Uint32:
			.comparisons[] = newOrderedSorter[uint32](, cmp.Compare)
		case *array.Uint64:
			.comparisons[] = newOrderedSorter[uint64](, cmp.Compare)
		case *array.Float64:
			.comparisons[] = newOrderedSorter[float64](, cmp.Compare)
		case *array.String:
			.comparisons[] = newOrderedSorter[string](, cmp.Compare)
		case *array.Binary:
			.comparisons[] = newOrderedSorter[[]byte](, bytes.Compare)
		case *array.Timestamp:
			.comparisons[] = newOrderedSorter[arrow.Timestamp](, cmp.Compare)
		case *array.Dictionary:
			// Dictionary columns are compared by their decoded values; the
			// wrapper types below resolve a row to its dictionary value.
			switch elem := .Dictionary().(type) {
			case *array.String:
				.comparisons[] = newOrderedSorter[string](
					&stringDictionary{
						dict: ,
						elem: ,
					},
					cmp.Compare,
				)
			case *array.Binary:
				.comparisons[] = newOrderedSorter[[]byte](
					&binaryDictionary{
						dict: ,
						elem: ,
					},
					bytes.Compare,
				)
			case *array.FixedSizeBinary:
				.comparisons[] = newOrderedSorter[[]byte](
					&fixedSizeBinaryDictionary{
						dict: ,
						elem: ,
					},
					bytes.Compare,
				)
			default:
				// Release before bailing out (presumably handing the sorter
				// back to the pool — confirm).
				.Release()
				return nil, fmt.Errorf("unsupported dictionary column type for sorting %T for column %s", , .Schema().Field(.Index).Name)
			}
		default:
			// Release before bailing out (presumably handing the sorter back
			// to the pool — confirm).
			.Release()
			return nil, fmt.Errorf("unsupported column type for sorting %T for column %s", , .Schema().Field(.Index).Name)
		}
	}
	return , nil
}

// Reserve sizes the sorter for a sort. newMultiColSorter calls it with the
// number of rows and the number of sorting columns.
//
// It reserves space in the index builder and fills it in a loop (presumably
// with the identity permutation, each position holding its own index —
// confirm), then grows the comparisons, directions and nullsFirst slices with
// slices.Grow and re-slices them to the requested length.
func ( *multiColSorter) (,  int) {
	.indices.Reserve()
	for  := 0;  < ; ++ {
		.indices.Set(, int32())
	}
	.comparisons = slices.Grow(.comparisons, )[:]
	.directions = slices.Grow(.directions, )[:]
	.nullsFirst = slices.Grow(.nullsFirst, )[:]
}

// Reset clears the sorter for reuse: it calls Reserve(0) on the index builder
// and truncates the per-column slices to length zero. The slices are
// re-sliced rather than reallocated, so their backing arrays are kept.
func ( *multiColSorter) () {
	.indices.Reserve(0)
	.comparisons = .comparisons[:0]
	.directions = .directions[:0]
	.nullsFirst = .nullsFirst[:0]
}

// Release resets the sorter and puts a value back into multiColSorterPool
// (presumably the sorter itself — confirm). The sorter must not be used after
// this call.
func ( *multiColSorter) () {
	.Reset()
	multiColSorterPool.Put()
}

// multiColSorterPool recycles multiColSorter values. A new sorter starts with
// an Int32 index builder and no per-column state.
var multiColSorterPool = &sync.Pool{
	New: func() any {
		return &multiColSorter{
			indices: builder.NewOptInt32Builder(arrow.PrimitiveTypes.Int32),
		}
	},
}

// Compile-time check that *multiColSorter satisfies sort.Interface.
var _ sort.Interface = (*multiColSorter)(nil)

// Len returns the number of row indexes held by the sorter.
func ( *multiColSorter) () int { return .indices.Len() }

// Less reports whether one position must sort before another. The positions
// are resolved to row indexes through the index builder, and the rows are
// compared column by column in the order of the comparisons slice. The first
// column on which the rows differ decides the outcome. Rows that are equal on
// every column are not less than each other.
func ( *multiColSorter) (,  int) bool {
	for  := range .comparisons {
		 := .compare(, int(.indices.Value()), int(.indices.Value()))
		if  != 0 {
			// Use direction to reorder the comparison. Direction determines if the list
			// is in ascending or descending.
			//
			// For instance if comparison between i,j value is -1 and direction is -1
			// this will resolve to true hence the list will be in ascending order. Same
			// principle applies for descending.
			return  == .directions[]
		}
		// Try comparing the next column
	}
	return false
}

// compare compares two rows using one entry of the comparisons slice and
// returns a negative, zero or positive style result.
//
// Nulls are handled before the values are compared. Two nulls are equal (0).
// When exactly one of the rows is null, the sign is chosen from the column's
// direction and nullsFirst setting. Less treats a result equal to the
// direction as "sorts before", so the signs below are picked such that the
// null row sorts before the other row when nullsFirst is set, and after it
// otherwise, for either direction. Non-null rows are delegated to the
// comparator's Compare.
func ( *multiColSorter) (, ,  int) int {
	 := .comparisons[]
	// First null check (presumably the first of the two rows — confirm).
	if .IsNull() {
		if .IsNull() {
			return 0
		}
		if .directions[] == 1 {
			if .nullsFirst[] {
				return 1
			}
			return -1
		}
		if .nullsFirst[] {
			return -1
		}
		return 1
	}
	// Remaining null check (presumably the other row — confirm); the signs
	// mirror the branch above.
	if .IsNull() {
		if .directions[] == 1 {
			if .nullsFirst[] {
				return -1
			}
			return 1
		}
		if .nullsFirst[] {
			return 1
		}
		return -1
	}
	return .Compare(, )
}

// Swap exchanges two entries of the index builder; the record data itself is
// not touched.
func ( *multiColSorter) (,  int) {
	.indices.Swap(, )
}

// comparator compares rows of a single column by row index.
type comparator interface {
	// Compare compares the values at rows i and j and returns an integer
	// result. multiColSorter only calls it after checking that neither row
	// is null.
	Compare(i, j int) int
	// IsNull reports whether the value at the given row is null.
	IsNull(int) bool
}

// orderedArray is the minimal view of a column that orderedSorter needs:
// access to the value at a row and a null check for a row. It is satisfied by
// the arrow array types and dictionary wrappers passed to newOrderedSorter.
type orderedArray[ any] interface {
	// Value returns the value stored at the given row.
	Value(int) 
	// IsNull reports whether the given row is null.
	IsNull(int) bool
}

// orderedSorter is a comparator backed by an orderedArray and a comparison
// function for the array's value type.
type orderedSorter[ any] struct {
	// array provides the values and null information.
	array   orderedArray[]
	// compare compares two values and returns an integer result, in the
	// style of cmp.Compare and bytes.Compare.
	compare func(, ) int
}

// newOrderedSorter returns an orderedSorter over the given array that uses the
// given comparison function.
func newOrderedSorter[ any]( orderedArray[],  func(, ) int) *orderedSorter[] {
	return &orderedSorter[]{
		array:   ,
		compare: ,
	}
}

// IsNull reports whether the given row of the underlying array is null.
func ( *orderedSorter[]) ( int) bool {
	return .array.IsNull()
}

// Compare applies the comparison function to the values at two rows of the
// underlying array. It does not check for nulls itself.
func ( *orderedSorter[]) (,  int) int {
	return .compare(.array.Value(), .array.Value())
}

// stringDictionary exposes a dictionary-encoded string column through the
// Value/IsNull pair that newOrderedSorter[string] expects.
type stringDictionary struct {
	// dict is the dictionary array; it provides nulls and value indexes.
	dict *array.Dictionary
	// elem holds the string values that the value indexes point into.
	elem *array.String
}

// IsNull reports whether the given row of the dictionary array is null.
func ( *stringDictionary) ( int) bool {
	return .dict.IsNull()
}

// Value returns the string for a row by resolving the row's value index in
// the dictionary array and reading that entry from elem.
func ( *stringDictionary) ( int) string {
	return .elem.Value(.dict.GetValueIndex())
}

// binaryDictionary exposes a dictionary-encoded binary column through the
// Value/IsNull pair that newOrderedSorter[[]byte] expects.
type binaryDictionary struct {
	// dict is the dictionary array; it provides nulls and value indexes.
	dict *array.Dictionary
	// elem holds the binary values that the value indexes point into.
	elem *array.Binary
}

// IsNull reports whether the given row of the dictionary array is null.
func ( *binaryDictionary) ( int) bool {
	return .dict.IsNull()
}

// Value returns the bytes for a row by resolving the row's value index in the
// dictionary array and reading that entry from elem.
func ( *binaryDictionary) ( int) []byte {
	return .elem.Value(.dict.GetValueIndex())
}

// fixedSizeBinaryDictionary exposes a dictionary-encoded fixed-size binary
// column through the Value/IsNull pair that newOrderedSorter[[]byte] expects.
type fixedSizeBinaryDictionary struct {
	// dict is the dictionary array; it provides nulls and value indexes.
	dict *array.Dictionary
	// elem holds the fixed-size binary values that the value indexes point into.
	elem *array.FixedSizeBinary
}

// IsNull reports whether the given row of the dictionary array is null.
func ( *fixedSizeBinaryDictionary) ( int) bool {
	return .dict.IsNull()
}

// Value returns the bytes for a row by resolving the row's value index in the
// dictionary array and reading that entry from elem.
func ( *fixedSizeBinaryDictionary) ( int) []byte {
	return .elem.Value(.dict.GetValueIndex())
}