package arrowutils

import (
	
	
	
	

	
	
)

// GetGroupsAndOrderedSetRanges returns a min-heap of group ranges and ordered
// set ranges of the given arrow arrays in that order. For the given input with
// a single array:
// a a c d a b c
// This function will return [2, 3, 4, 5, 6] for the group ranges and [4] for
// the ordered set ranges. A group is a collection of values that are equal and
// an ordered set is a collection of groups that are in increasing order.
// The ranges are determined by iterating over the arrays and comparing the
// current group value for each column. The firstGroup to compare against must
// be provided (it can be initialized to the values at index 0 of each array).
// The last group found is returned.
//
// An error is returned when the two inputs do not have the same length (one
// group value is expected per array). Array types other than Binary, String,
// Int64, Boolean, VirtualNullArray and Dictionary (with Binary or String
// values) cause a panic rather than an error.
func (
	 []any,  []arrow.Array,
) (*Int64Heap, *Int64Heap, []any, error) {
	// Both inputs must describe the same number of columns.
	if len() != len() {
		return nil,
			nil,
			nil,
			fmt.Errorf(
				"columns mismatch (%d != %d) when getting group ranges",
				len(),
				len(),
			)
	}

	// Safe copy the group in order to not overwrite the input slice values.
	 := make([]any, len())
	for ,  := range  {
		switch concreteV := .(type) {
		case []byte:
			// Byte slices are cloned; every other value is stored as-is.
			[] = append([]byte(nil), ...)
		default:
			[] = 
		}
	}

	// groupRanges keeps track of the bounds of the group by columns.
	 := &Int64Heap{}
	heap.Init()
	// setRanges keeps track of the bounds of ordered sets. i.e. in the
	// following slice, (a, a, b, c) is an ordered set of three groups. The
	// second ordered set is (a, e): [a, a, b, c, a, e]
	 := &Int64Heap{}
	heap.Init()

	// handleCmpResult is a closure that encapsulates the handling of the result
	// of comparing a current grouping column with a value in a group array.
	// A result of -1 or 1 starts a new group; a result of 1 (the current group
	// compares greater than the new value) additionally starts a new ordered
	// set.
	// NOTE(review): this closure always returns nil, so the error checks at its
	// call sites below can never trigger as written.
	 := func(,  int,  arrow.Array,  int) error {
		switch  {
		case -1, 1:
			// New group, append range index.
			heap.Push(, int64())
			if  == 1 {
				// New ordered set encountered.
				heap.Push(, int64())
			}

			// And update the current group.
			 := .GetOneForMarshal()
			switch concreteV := .(type) {
			case []byte:
				// Safe copy, otherwise the value might get overwritten.
				[] = append([]byte(nil), ...)
			default:
				[] = 
			}
		case 0:
			// Equal to group, do nothing.
		}
		return nil
	}
	// Walk every array and compare each row against the current group value
	// of its column. In each typed case below the null comparison is
	// consulted first and the typed comparison is only used when neither side
	// is null.
	// NOTE(review): group values are type-asserted without the comma-ok form,
	// so a non-nil group value whose dynamic type does not match the array
	// type will panic.
	for ,  := range  {
		switch t := .(type) {
		case *array.Binary:
			for  := 0;  < .Len(); ++ {
				var  []byte
				if [] != nil {
					 = [].([]byte)
				}
				 := .IsNull()
				,  := nullComparison( == nil, )
				if ! {
					 = bytes.Compare(, .Value())
				}
				if  := (, , , );  != nil {
					return nil, nil, nil, 
				}
			}
		case *array.String:
			for  := 0;  < .Len(); ++ {
				// A nil pointer stands for a null group value.
				var  *string
				if [] != nil {
					 := [].(string)
					 = &
				}
				 := .IsNull()
				,  := nullComparison( == nil, )
				if ! {
					 = strings.Compare(*, .Value())
				}
				if  := (, , , );  != nil {
					return nil, nil, nil, 
				}
			}
		case *array.Int64:
			for  := 0;  < .Len(); ++ {
				// A nil pointer stands for a null group value.
				var  *int64
				if [] != nil {
					 := [].(int64)
					 = &
				}
				 := .IsNull()
				,  := nullComparison( == nil, )
				if ! {
					 = compareInt64(*, .Value())
				}
				if  := (, , , );  != nil {
					return nil, nil, nil, 
				}
			}
		case *array.Boolean:
			for  := 0;  < .Len(); ++ {
				// A nil pointer stands for a null group value.
				var  *bool
				if [] != nil {
					 := [].(bool)
					 = &
				}
				 := .IsNull()
				,  := nullComparison( == nil, )
				if ! {
					 = compareBools(*, .Value())
				}
				if  := (, , , );  != nil {
					return nil, nil, nil, 
				}
			}
		case VirtualNullArray:
			for  := 0;  < .Len(); ++ {
				// Every value is treated as null here (the right-hand side is
				// hard-coded to true). nullComparison reports true whenever
				// either side is null, so the error branch below is purely
				// defensive.
				,  := nullComparison([] == nil, true)
				if ! {
					return nil, nil, nil, fmt.Errorf(
						"null comparison should always be valid but group was: %v", [],
					)
				}
				if  := (, , , );  != nil {
					return nil, nil, nil, 
				}
			}
		case *array.Dictionary:
			// Dictionary arrays are compared on their decoded values, which
			// are looked up in the dictionary through GetValueIndex.
			switch dict := .Dictionary().(type) {
			case *array.Binary:
				for  := 0;  < .Len(); ++ {
					var  []byte
					if [] != nil {
						 = [].([]byte)
					}
					 := .IsNull()
					,  := nullComparison( == nil, )
					if ! {
						 = bytes.Compare(, .Value(.GetValueIndex()))
					}
					if  := (, , , );  != nil {
						return nil, nil, nil, 
					}
				}

			case *array.String:
				for  := 0;  < .Len(); ++ {
					var  *string
					if [] != nil {
						 := [].(string)
						 = &
					}
					 := .IsNull()
					,  := nullComparison( == nil, )
					if ! {
						 = strings.Compare(*,
							.Value(.GetValueIndex()),
						)
					}
					if  := (, , , );  != nil {
						return nil, nil, nil, 
					}
				}

			default:
				panic(fmt.Sprintf("unsupported dictionary type: %T", ))
			}
		default:
			panic(fmt.Sprintf("unsupported type: %T", ))
		}
	}
	// Hand back both heaps together with the last group observed.
	return , , , nil
}

// nullComparison encapsulates null comparison. leftNull is whether the current
// group value is null and the second argument is whether the value it is being
// compared against is null.
// Note that this function observes default SQL semantics as well as our own,
// i.e. nulls sort first.
// The comparison integer is returned, as well as whether either value was null.
// If the returned boolean is false, the comparison should be disregarded.
func nullComparison(,  bool) (int, bool) {
	if ! && ! {
		// Both are not null, this implies that the null comparison should be
		// disregarded.
		return 0, false
	}

	// At least one side is null from here on.
	// NOTE(review): presumably the left side is tested first, giving -1 when
	// only the left is null and 0 when both are null - confirm.
	if  {
		if ! {
			return -1, true
		}
		return 0, true
	}
	// Only the other side is null.
	return 1, true
}

// compareInt64 is a three-way comparison of two int64 values. It returns -1
// when the less-than test holds, 1 when the greater-than test holds and 0
// otherwise (the values are equal).
// NOTE(review): presumably the first argument is the left-hand operand of both
// tests, matching bytes.Compare/strings.Compare - confirm.
func compareInt64(,  int64) int {
	if  <  {
		return -1
	}
	if  >  {
		return 1
	}
	return 0
}

// compareBools is a three-way comparison of two booleans. Equal values yield
// 0; otherwise the result is -1 or 1 depending on which argument is false.
// NOTE(review): presumably false sorts before true, i.e. -1 is returned when
// the first argument is false - confirm.
func compareBools(,  bool) int {
	if  ==  {
		return 0
	}

	// The values differ, so exactly one of them is false.
	if ! {
		return -1
	}
	return 1
}

// Int64Heap is a slice of int64 values used together with the container/heap
// package. Its ordering method compares with <, which makes it a min-heap.
type Int64Heap []int64

// Returns a length as an int.
// NOTE(review): presumably this is the Len method required by heap.Interface,
// reporting the number of elements in the heap - confirm.
func ( Int64Heap) () int {
	return len()
}

// Reports whether one indexed element is strictly smaller than another.
// NOTE(review): presumably this is the Less method required by
// heap.Interface; the < ordering yields a min-heap - confirm.
func ( Int64Heap) (,  int) bool {
	return [] < []
}

// Exchanges two indexed elements in place.
// NOTE(review): presumably this is the Swap method required by
// heap.Interface - confirm.
func ( Int64Heap) (,  int) {
	[], [] = [], []
}

// Appends a value to the end of the underlying slice. The argument must hold
// an int64; any other dynamic type makes the type assertion panic.
// NOTE(review): presumably this is the Push method required by
// heap.Interface and is meant to be invoked through heap.Push, which restores
// heap order afterwards - confirm.
func ( *Int64Heap) ( any) {
	* = append(*, .(int64))
}

// Removes the last element of the underlying slice and returns it. Indexing
// at length minus one panics when the slice is empty.
// NOTE(review): presumably this is the Pop method required by heap.Interface;
// heap.Pop moves the minimum to the end before calling it - confirm.
func ( *Int64Heap) () any {
	 := *
	 := len()
	 := [-1]
	// Shrink the slice by one, dropping the element just read.
	* = [0 : -1]
	return 
}

// PopNextNotEqual returns the next least element not equal to compare.
// Elements are popped until one differs from compare, so any elements equal
// to compare that are encountered first are removed from the heap as well.
// The boolean result is false (with a zero value) when the heap runs out
// before such an element is found.
func ( *Int64Heap) ( int64) (int64, bool) {
	for .Len() > 0 {
		 := heap.Pop().(int64)
		if  !=  {
			return , true
		}
	}
	return 0, false
}

// Unwrap unwraps the heap into the provided scratch space. The result is a
// slice that will have distinct ints in order. This helps with reiterating over
// the same heap.
// The scratch slice is truncated to length zero and then appended to. Every
// element is popped in the process, so the heap is empty once Unwrap returns.
func ( *Int64Heap) ( []int64) []int64 {
	 = [:0]
	if .Len() == 0 {
		return 
	}
	// Seed the output with the root of the heap (index 0). The first pop
	// returns that same value and is skipped by the inequality check below.
	 := (*)[0]
	 = append(, )
	for .Len() > 0 {
		// Only record a popped value when it differs from the previous one.
		if  := heap.Pop().(int64);  !=  {
			 = append(, )
			 = 
		}
	}
	return 
}