package physicalplan

import (
	"context"
	"errors"
	"fmt"
	"hash/maphash"
	"strings"

	"github.com/apache/arrow/go/v14/arrow"
	"github.com/apache/arrow/go/v14/arrow/array"
	"github.com/apache/arrow/go/v14/arrow/math"
	"github.com/apache/arrow/go/v14/arrow/memory"
	"github.com/apache/arrow/go/v14/arrow/scalar"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"

	"github.com/polarsignals/frostdb/dynparquet"
	"github.com/polarsignals/frostdb/pqarrow/builder"
	"github.com/polarsignals/frostdb/query/logicalplan"
)

func Aggregate(
	pool memory.Allocator,
	tracer trace.Tracer,
	agg *logicalplan.Aggregation,
	final bool,
	ordered bool,
	seed maphash.Seed,
) (PhysicalPlan, error) {
	aggregations := make([]Aggregation, 0, len(agg.AggExprs))

	// TODO(brancz): This is not correct; it doesn't correctly handle
	// aggregations of previously projected columns like `sum(value + timestamp)`.
	// Need to understand why we need to handle dynamic columns here
	// differently and not just use the aggregation function's expression.
	for _, expr := range agg.AggExprs {
		aggregation := Aggregation{}
		expr.Accept(PreExprVisitorFunc(func(e logicalplan.Expr) bool {
			if _, ok := e.(*logicalplan.DynamicColumn); ok {
				aggregation.dynamic = true
			}

			return true
		}))

		aggregation.resultName = expr.Name()
		aggregation.function = expr.Func
		aggregation.expr = expr.Expr

		aggregations = append(aggregations, aggregation)
	}

	if ordered {
		if len(aggregations) > 1 {
			return nil, fmt.Errorf(
				"OrderedAggregate does not support multiple aggregations, found %d", len(aggregations),
			)
		}
		return NewOrderedAggregate(
			pool,
			tracer,
			// TODO(asubiotto): Multiple aggregation functions are not yet
			// supported. The planning code should already have planned a hash
			// aggregation in this case.
			aggregations[0],
			agg.GroupExprs,
			final,
		), nil
	}
	return NewHashAggregate(
		pool,
		tracer,
		aggregations,
		agg.GroupExprs,
		seed,
		final,
	), nil
}

func chooseAggregationFunction(
	aggFunc logicalplan.AggFunc,
	dataType arrow.DataType,
) (AggregationFunction, error) {
	switch aggFunc {
	case logicalplan.AggFuncSum:
		return &SumAggregation{}, nil
	case logicalplan.AggFuncMin:
		return &MinAggregation{}, nil
	case logicalplan.AggFuncMax:
		return &MaxAggregation{}, nil
	case logicalplan.AggFuncCount:
		return &CountAggregation{}, nil
	case logicalplan.AggFuncUnique:
		return &UniqueAggregation{}, nil
	case logicalplan.AggFuncAnd:
		return &AndAggregation{}, nil
	default:
		return nil, fmt.Errorf("unsupported aggregation function: %s", .String())
	}
}

// Aggregation groups together the lower-level primitives needed to aggregate
// a column with its aggregation function.
type Aggregation struct {
	expr       logicalplan.Expr
	dynamic    bool // dynamic indicates that this aggregation is performed against a dynamic column.
	resultName string
	function   logicalplan.AggFunc
	arrays     []builder.ColumnBuilder // TODO: These can actually live outside this struct and be shared. Only at the very end will they be read by each column and then aggregated separately.
}

type AggregationFunction interface {
	Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error)
}
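
// The contract of AggregationFunction is that arrs holds one array per group
// and the result holds exactly one value per group. As a minimal illustrative
// sketch (hypothetical; not part of the planner), an aggregation returning
// the first value of each int64 group could look like this:
type firstInt64Aggregation struct{}

func (f *firstInt64Aggregation) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) {
	b := array.NewInt64Builder(pool)
	defer b.Release()
	for _, arr := range arrs {
		ints, ok := arr.(*array.Int64)
		if !ok {
			return nil, fmt.Errorf("first array of %s: expected int64", arr.DataType().ID())
		}
		if ints.Len() == 0 || !ints.IsValid(0) {
			b.AppendNull() // empty or null-leading groups yield a null result
			continue
		}
		b.Append(ints.Value(0))
	}
	return b.NewArray(), nil
}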

type HashAggregate struct {
	pool                  memory.Allocator
	tracer                trace.Tracer
	groupByColumnMatchers []logicalplan.Expr
	hashSeed              maphash.Seed
	next                  PhysicalPlan
	// finalStage indicates whether this is the last aggregation stage, or
	// whether another aggregation will follow after synchronizing (see the
	// sketch following this struct).
	finalStage bool

	// Buffers that are reused across callback calls.
	groupByFields      []arrow.Field
	groupByFieldHashes []hashCombiner
	groupByArrays      []arrow.Array
	hashToAggregate    map[uint64]hashtuple

	// aggregates are the collection of all the hash aggregates for this hash aggregation. This is useful when a single hash aggregate cannot fit
	// into a single record and needs to be split into multiple records.
	aggregates []*hashAggregate
}
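
// twoStageAggregationSketch is a minimal sketch (a hypothetical helper; the
// real wiring is done by the physical planner) of how finalStage is intended
// to be used: a non-final HashAggregate aggregates the raw data and emits
// partial results together with the hashes of its group-by columns, and a
// final HashAggregate combines those partials after synchronization.
func twoStageAggregationSketch(
	pool memory.Allocator,
	tracer trace.Tracer,
	aggregations []Aggregation,
	groupExprs []logicalplan.Expr,
	seed maphash.Seed,
	sink PhysicalPlan,
) PhysicalPlan {
	// Final stage: matches the pre-aggregated columns by their result names
	// and re-aggregates them (counts are summed; see runAggregation).
	final := NewHashAggregate(pool, tracer, aggregations, groupExprs, seed, true)
	final.SetNext(sink)

	// Partial stage: aggregates the raw data and forwards partial results to
	// the final stage.
	partial := NewHashAggregate(pool, tracer, aggregations, groupExprs, seed, false)
	partial.SetNext(final)
	return partial
}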

type hashtuple struct {
	aggregate int // aggregate is the index into the aggregates slice
	array     int // array is the index into each aggregation's arrays slice
}

// hashAggregate represents a single hash aggregation.
type hashAggregate struct {
	dynamicAggregations []Aggregation
	// dynamicAggregationsConverted tracks the fields that match
	// dynamicAggregations and have been converted to aggregations on a
	// concrete column.
	dynamicAggregationsConverted map[string]struct{}
	aggregations                 []Aggregation
	// concreteAggregations memoizes the number of concrete aggregations at
	// initialization. This number needs to be recorded because dynamic
	// aggregations are converted to concrete aggregations at runtime.
	concreteAggregations int
	groupByCols          map[string]builder.ColumnBuilder

	colOrdering []string
	rowCount    int
}

func NewHashAggregate(
	pool memory.Allocator,
	tracer trace.Tracer,
	aggregations []Aggregation,
	groupByColumnMatchers []logicalplan.Expr,
	seed maphash.Seed,
	finalStage bool,
) *HashAggregate {
	dynamicAggregations := []Aggregation{}
	concreteAggregations := []Aggregation{}
	for _, aggregation := range aggregations {
		if aggregation.dynamic {
			dynamicAggregations = append(dynamicAggregations, aggregation)
		} else {
			concreteAggregations = append(concreteAggregations, aggregation)
		}
	}

	return &HashAggregate{
		pool:   pool,
		tracer: tracer,
		// TODO: Matchers can be optimized to be something like a radix tree or
		// just a fast-lookup data structure for exact matches or prefix matches.
		groupByColumnMatchers: groupByColumnMatchers,
		hashSeed:              seed,
		finalStage:            finalStage,

		groupByFields:      make([]arrow.Field, 0, 10),
		groupByFieldHashes: make([]hashCombiner, 0, 10),
		groupByArrays:      make([]arrow.Array, 0, 10),
		hashToAggregate:    map[uint64]hashtuple{},
		aggregates: []*hashAggregate{ // Initialize a single hash aggregate; we expect this slice to only ever grow during very large aggregations.
			{
				dynamicAggregations:          dynamicAggregations,
				dynamicAggregationsConverted: make(map[string]struct{}),
				aggregations:                 concreteAggregations,
				concreteAggregations:         len(concreteAggregations),
				groupByCols:                  map[string]builder.ColumnBuilder{},
				colOrdering:                  []string{},
			},
		},
	}
}

func (a *HashAggregate) Close() {
	for _, arr := range a.groupByArrays {
		arr.Release()
	}
	for _, aggregate := range a.aggregates {
		for _, aggregation := range aggregate.aggregations {
			for _, arr := range aggregation.arrays {
				arr.Release()
			}
		}
		for _, col := range aggregate.groupByCols {
			col.Release()
		}
	}
	a.next.Close()
}

func (a *HashAggregate) SetNext(next PhysicalPlan) {
	a.next = next
}

func (a *HashAggregate) Draw() *Diagram {
	var child *Diagram
	if a.next != nil {
		child = a.next.Draw()
	}

	names := make([]string, 0, len(a.aggregates[0].aggregations))
	for _, aggregation := range a.aggregates[0].aggregations {
		names = append(names, aggregation.resultName)
	}

	var groupings []string
	for _, grouping := range a.groupByColumnMatchers {
		groupings = append(groupings, grouping.String())
	}
	details := fmt.Sprintf("HashAggregate (%s by %s)", strings.Join(names, ","), strings.Join(groupings, ","))
	return &Diagram{Details: details, Child: child}
}

// hashCombine is a Go translation of boost's hash_combine function. See
// https://stackoverflow.com/questions/35985960/c-why-is-boosthash-combine-the-best-way-to-combine-hash-values
// for why these constants are good choices.
func hashCombine(lhs, rhs uint64) uint64 {
	return lhs ^ (rhs + 0x9e3779b9 + (lhs << 6) + (lhs >> 2))
}
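
// combineRowHashesSketch illustrates (as a sketch; Callback below does this
// inline) how a row's group key is derived: each group-by column contributes
// the hash of its column name combined with the hash of the row's value, and
// the per-column results are folded together with hashCombine again. Callback
// additionally skips columns whose value hash is zero.
func combineRowHashesSketch(columnNameHashes, valueHashes []uint64) uint64 {
	rowHash := uint64(0)
	for i := range columnNameHashes {
		rowHash = hashCombine(rowHash, hashCombine(columnNameHashes[i], valueHashes[i]))
	}
	return rowHash
}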

// hashCombiner combines a given hash with another hash that is passed.
type hashCombiner interface {
	hashCombine(rhs uint64) uint64
}

// uint64HashCombine combines a pre-defined uint64 hash with a given uint64 hash.
type uint64HashCombine struct {
	value uint64
}

func (u *uint64HashCombine) hashCombine(rhs uint64) uint64 {
	return hashCombine(u.value, rhs)
}

func (a *HashAggregate) Callback(ctx context.Context, r arrow.Record) error {
	// Generates a high volume of spans; uncomment only when needed during
	// development.
	// ctx, span := a.tracer.Start(ctx, "HashAggregate/Callback")
	// defer span.End()

	// aggregate is the current (most recently created) aggregation.
	aggregate := a.aggregates[len(a.aggregates)-1]

	fields := r.Schema().Fields() // NOTE: call Fields() once to avoid creating a copy on each access.
	groupByFields := a.groupByFields
	groupByFieldHashes := a.groupByFieldHashes
	groupByArrays := a.groupByArrays

	defer func() {
		// Hand the (possibly grown) buffers back to the struct, truncated,
		// so that they are actually reused across Callback calls.
		a.groupByFields = groupByFields[:0]
		a.groupByFieldHashes = groupByFieldHashes[:0]
		a.groupByArrays = groupByArrays[:0]
	}()

	columnToAggregate := make([]arrow.Array, len(aggregate.aggregations))
	concreteAggregateFieldsFound := 0
	dynamicAggregateFieldsFound := 0

	for i, field := range fields {
		for _, matcher := range a.groupByColumnMatchers {
			if matcher.MatchColumn(field.Name) {
				groupByFields = append(groupByFields, field)
				groupByArrays = append(groupByArrays, r.Column(i))

				// Combine with the hash of the column name so that equal
				// values in different columns produce different group hashes.
				// In the final stage the per-row hashes computed by previous
				// stages already exist and are combined like any other hash.
				groupByFieldHashes = append(groupByFieldHashes,
					&uint64HashCombine{value: scalar.Hash(a.hashSeed, scalar.NewStringScalar(field.Name))},
				)
			}
		}

		if _, converted := aggregate.dynamicAggregationsConverted[field.Name]; !converted {
			for _, aggregation := range aggregate.dynamicAggregations {
				if a.finalStage {
					if aggregation.expr.MatchColumn(field.Name) {
						// Expand aggregate.aggregations with a final concrete column aggregation.
						columnToAggregate = append(columnToAggregate, nil)
						aggregate.aggregations = append(aggregate.aggregations, Aggregation{
							expr:       logicalplan.Col(field.Name),
							dynamic:    true,
							resultName: resultNameWithConcreteColumn(aggregation.function, field.Name),
							function:   aggregation.function,
						})
						aggregate.dynamicAggregationsConverted[field.Name] = struct{}{}
					}
				} else {
					// If we're aggregating the raw data we need to find the columns by their actual names for now.
					if aggregation.expr.MatchColumn(field.Name) {
						// Expand aggregate.aggregations with a concrete column aggregation.
						columnToAggregate = append(columnToAggregate, nil)
						aggregate.aggregations = append(aggregate.aggregations, Aggregation{
							expr:       logicalplan.Col(field.Name),
							dynamic:    true,
							resultName: field.Name, // Don't rename the column yet; that happens in the final stage. Dynamic aggregations can't match against the pre-computed name.
							function:   aggregation.function,
						})
						aggregate.dynamicAggregationsConverted[field.Name] = struct{}{}
					}
				}
			}
		}

		for j, aggregation := range aggregate.aggregations {
			// If we're aggregating at the final stage we have previously
			// renamed the pre-aggregated columns to their result names.
			if a.finalStage {
				if aggregation.resultName == field.Name || (aggregation.dynamic && aggregation.expr.MatchColumn(field.Name)) {
					columnToAggregate[j] = r.Column(i)
					if aggregation.dynamic {
						dynamicAggregateFieldsFound++
					} else {
						concreteAggregateFieldsFound++
					}
				}
			} else {
				// If we're aggregating the raw data we need to find the columns by their actual names for now.
				if aggregation.expr.MatchColumn(field.Name) {
					columnToAggregate[j] = r.Column(i)
					if aggregation.dynamic {
						dynamicAggregateFieldsFound++
					} else {
						concreteAggregateFieldsFound++
					}
				}
			}
		}
	}

	// It's OK for the same aggregation to be found multiple times; optimizers
	// should remove the duplicates, but for correctness in the case where they
	// don't we need to handle it. This is why the number of concrete aggregate
	// fields found may legitimately differ from the number of concrete
	// aggregations.
	if ((concreteAggregateFieldsFound == 0 || aggregate.concreteAggregations == 0) && len(aggregate.dynamicAggregations) == 0) ||
		(len(aggregate.dynamicAggregations) > 0 && dynamicAggregateFieldsFound == 0) {
		// To perform an aggregation ALL concrete columns must have been
		// matched, or at least one dynamic column when performing dynamic
		// aggregations.
		fieldsFound := make([]string, len(aggregate.aggregations))
		for j, aggregation := range aggregate.aggregations {
			fieldsFound[j] = aggregation.expr.String()
		}

		if a.finalStage {
			return fmt.Errorf("aggregate field(s) not found %#v, final aggregations are not possible without them (%d concrete aggregation fields found; %d concrete aggregations)", fieldsFound, concreteAggregateFieldsFound, aggregate.concreteAggregations)
		}
		return fmt.Errorf("aggregate field(s) not found %#v, aggregations are not possible without them (%d concrete aggregation fields found; %d concrete aggregations)", fieldsFound, concreteAggregateFieldsFound, aggregate.concreteAggregations)
	}

	numRows := int(r.NumRows())

	colHashes := make([][]uint64, len(groupByArrays))
	for i, arr := range groupByArrays {
		hashedCol := dynparquet.FindHashedColumn(groupByFields[i].Name, fields)
		if hashedCol != -1 {
			hashes := make([]uint64, 0, numRows)
			for _, v := range r.Column(hashedCol).(*array.Int64).Int64Values() {
				hashes = append(hashes, uint64(v))
			}
			colHashes[i] = hashes
		} else {
			colHashes[i] = dynparquet.HashArray(arr)
		}
	}

	for i := 0; i < numRows; i++ {
		hash := uint64(0)
		for j := range colHashes {
			if colHashes[j][i] == 0 {
				continue
			}

			hash = hashCombine(
				hash,
				groupByFieldHashes[j].hashCombine(colHashes[j][i]),
			)
		}

		tuple, ok := a.hashToAggregate[hash]
		if !ok {
			aggregate = a.aggregates[len(a.aggregates)-1]
			for j, col := range columnToAggregate {
				b := builder.NewBuilder(a.pool, col.DataType())
				aggregate.aggregations[j].arrays = append(aggregate.aggregations[j].arrays, b)
			}
			tuple = hashtuple{
				aggregate: len(a.aggregates) - 1, // always add new groups to the current aggregate
				array:     len(aggregate.aggregations[0].arrays) - 1,
			}
			a.hashToAggregate[hash] = tuple
			aggregate.rowCount++

			// Insert the new row into the group-by columns; the new aggregate
			// arrays to append to were created above.
			if err := a.updateGroupByCols(i, groupByArrays, groupByFields); err != nil {
				if !errors.Is(err, builder.ErrMaxSizeReached) {
					return err
				}

				// Max size reached: roll back the creation of this group and
				// start a new hash aggregate.
				aggregate.rowCount--
				for j := range columnToAggregate {
					n := len(aggregate.aggregations[j].arrays)
					aggregate.aggregations[j].arrays = aggregate.aggregations[j].arrays[:n-1]
				}

				// Create a new aggregation.
				aggregations := make([]Aggregation, 0, len(a.aggregates[0].aggregations))
				for _, aggregation := range a.aggregates[0].aggregations {
					aggregations = append(aggregations, Aggregation{
						expr:       aggregation.expr,
						resultName: aggregation.resultName,
						function:   aggregation.function,
					})
				}
				a.aggregates = append(a.aggregates, &hashAggregate{
					aggregations: aggregations,
					groupByCols:  map[string]builder.ColumnBuilder{},
					colOrdering:  []string{},
				})

				aggregate = a.aggregates[len(a.aggregates)-1]
				for j, col := range columnToAggregate {
					b := builder.NewBuilder(a.pool, col.DataType())
					aggregate.aggregations[j].arrays = append(aggregate.aggregations[j].arrays, b)
				}
				tuple = hashtuple{
					aggregate: len(a.aggregates) - 1, // always add new groups to the current aggregate
					array:     len(aggregate.aggregations[0].arrays) - 1,
				}
				a.hashToAggregate[hash] = tuple
				aggregate.rowCount++

				if err := a.updateGroupByCols(i, groupByArrays, groupByFields); err != nil {
					return err
				}
			}
		}

		for j, col := range columnToAggregate {
			if col == nil {
				// This is a dynamic aggregation that had no match.
				continue
			}
			if a.aggregates[tuple.aggregate].aggregations[j].arrays == nil {
				// This can happen with dynamic column aggregations without
				// groupings. The group exists, but the array to append to does
				// not.
				b := builder.NewBuilder(a.pool, col.DataType())
				aggregate.aggregations[j].arrays = append(aggregate.aggregations[j].arrays, b)
			}
			if err := builder.AppendValue(a.aggregates[tuple.aggregate].aggregations[j].arrays[tuple.array], col, i); err != nil {
				return err
			}
		}
	}

	return nil
}

func (a *HashAggregate) updateGroupByCols(row int, groupByArrays []arrow.Array, groupByFields []arrow.Field) error {
	// aggregate is the current (most recently created) aggregation.
	aggregate := a.aggregates[len(a.aggregates)-1]

	for i, arr := range groupByArrays {
		fieldName := groupByFields[i].Name

		groupByCol, found := aggregate.groupByCols[fieldName]
		if !found {
			groupByCol = builder.NewBuilder(a.pool, groupByFields[i].Type)
			aggregate.groupByCols[fieldName] = groupByCol
			aggregate.colOrdering = append(aggregate.colOrdering, fieldName)
		}

		// We already appended to the arrays to aggregate, so we have
		// to account for that. We only want to back-fill null values
		// up until the index that we are about to insert into.
		for groupByCol.Len() < len(aggregate.aggregations[0].arrays)-1 {
			groupByCol.AppendNull()
		}

		if err := builder.AppendValue(groupByCol, arr, row); err != nil {
			// Roll back the values appended to the preceding group-by columns.
			for j := 0; j < i; j++ {
				if rollbackErr := builder.RollbackPrevious(aggregate.groupByCols[groupByFields[j].Name]); rollbackErr != nil {
					return rollbackErr
				}
			}

			return err
		}
	}
	return nil
}

func (a *HashAggregate) Finish(ctx context.Context) error {
	ctx, span := a.tracer.Start(ctx, "HashAggregate/Finish")
	span.SetAttributes(attribute.Bool("finalStage", a.finalStage))
	defer span.End()

	totalRows := 0
	for i, aggregate := range a.aggregates {
		if err := a.finishAggregate(ctx, i, aggregate); err != nil {
			return err
		}
		totalRows += aggregate.rowCount
	}
	span.SetAttributes(attribute.Int64("rows", int64(totalRows)))
	return a.next.Finish(ctx)
}

func (a *HashAggregate) finishAggregate(ctx context.Context, aggIdx int, aggregate *hashAggregate) error {
	numCols := len(aggregate.groupByCols) + len(aggregate.aggregations)
	numRows := aggregate.rowCount

	if numRows == 0 { // skip empty aggregates
		return nil
	}

	groupByFields := make([]arrow.Field, 0, numCols)
	arrays := make([]arrow.Array, 0, numCols)
	defer func() {
		for _, arr := range arrays {
			if arr != nil {
				arr.Release()
			}
		}
	}()
	for _, fieldName := range aggregate.colOrdering {
		if a.finalStage && dynparquet.IsHashedColumn(fieldName) {
			continue
		}
		groupByCol, ok := aggregate.groupByCols[fieldName]
		if !ok {
			return fmt.Errorf("unknown field name: %s", fieldName)
		}
		for groupByCol.Len() < numRows {
			// It's possible that columns that are grouped by haven't occurred
			// in all aggregated rows, which causes them to not be of equal
			// size as the total number of rows, so we need to backfill. This
			// happens, for example, when different row groups of the table
			// have different sets of dynamic columns.
			groupByCol.AppendNull()
		}
		arr := groupByCol.NewArray()
		groupByFields = append(groupByFields, arrow.Field{Name: fieldName, Type: arr.DataType()})
		arrays = append(arrays, arr)
		// Pass the hashes of the group-by columns on to the next aggregation stage.
		if !a.finalStage {
			groupByFields = append(groupByFields, arrow.Field{Name: dynparquet.HashedColumnName(fieldName), Type: arrow.PrimitiveTypes.Int64})
			func() {
				hashBuilder := array.NewInt64Builder(a.pool)
				defer hashBuilder.Release()
				hashes := make([]int64, arr.Len())
				for hash, tuple := range a.hashToAggregate {
					if tuple.aggregate == aggIdx { // only append hashes belonging to the current aggregate
						hashes[tuple.array] = int64(hash)
					}
				}
				hashBuilder.AppendValues(hashes, nil)
				arrays = append(arrays, hashBuilder.NewArray())
			}()
		}
	}

	// From here on the field list also carries the aggregation result columns
	// appended below, so rename it for clarity.
	fields := groupByFields

	for _, aggregation := range aggregate.aggregations {
		arrs := make([]arrow.Array, 0, numRows)
		for _, arrBuilder := range aggregation.arrays {
			arrs = append(arrs, arrBuilder.NewArray())
		}

		aggregated, err := runAggregation(a.finalStage, aggregation.function, a.pool, arrs)
		for _, arr := range arrs {
			arr.Release()
		}
		if err != nil {
			return fmt.Errorf("aggregate batched arrays: %w", err)
		}
		arrays = append(arrays, aggregated)

		fields = append(fields, arrow.Field{
			Name: aggregation.resultName,
			Type: aggregated.DataType(),
		})
	}

	r := array.NewRecord(
		arrow.NewSchema(fields, nil),
		arrays,
		int64(numRows),
	)
	defer r.Release()
	err := a.next.Callback(ctx, r)
	if err != nil {
		return err
	}

	return nil
}

type AndAggregation struct{}

var ErrUnsupportedAndType = errors.New("unsupported type for and aggregation, expected bool")

func (a *AndAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) {
	if len(arrs) == 0 {
		return array.NewBooleanBuilder(pool).NewArray(), nil
	}

	typ := arrs[0].DataType().ID()
	switch typ {
	case arrow.BOOL:
		return AndArrays(pool, arrs), nil
	default:
		return nil, fmt.Errorf("and array of %s: %w", typ, ErrUnsupportedAndType)
	}
}

func AndArrays(pool memory.Allocator, arrs []arrow.Array) arrow.Array {
	b := array.NewBooleanBuilder(pool)
	defer b.Release()

	for _, arr := range arrs {
		if arr.Len() == 0 {
			// An empty group has no values to AND together; append a null and
			// move on so the group still gets exactly one result entry.
			b.AppendNull()
			continue
		}

		boolArr := arr.(*array.Boolean)

		val := true
		for i := 0; i < boolArr.Len(); i++ {
			if boolArr.IsValid(i) {
				val = val && boolArr.Value(i)
			}
		}

		b.Append(val)
	}

	return b.NewArray()
}

type UniqueAggregation struct{}

var ErrUnsupportedIsUniqueType = errors.New("unsupported type for unique aggregation, expected int64")

func (a *UniqueAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) {
	if len(arrs) == 0 {
		return array.NewInt64Builder(pool).NewArray(), nil
	}

	typ := arrs[0].DataType().ID()
	switch typ {
	case arrow.INT64:
		return uniqueInt64arrays(pool, arrs), nil
	default:
		return nil, fmt.Errorf("unique array of %s: %w", typ, ErrUnsupportedIsUniqueType)
	}
}

func uniqueInt64arrays(pool memory.Allocator, arrs []arrow.Array) arrow.Array {
	b := array.NewInt64Builder(pool)
	defer b.Release()

	for _, arr := range arrs {
		val, unique, nonEmpty := int64ArrayHasUniqueValue(arr.(*array.Int64))
		if !unique || !nonEmpty {
			b.AppendNull()
		} else {
			b.Append(val)
		}
	}

	return b.NewArray()
}

// int64ArrayHasUniqueValue reports whether arr contains exactly one distinct
// non-null value. It returns that value, whether it is unique, and whether
// the array was non-empty.
func int64ArrayHasUniqueValue(arr *array.Int64) (int64, bool, bool) {
	if arr.Len() == 0 {
		return 0, false, false
	}

	if !arr.IsValid(0) {
		return 0, false, true
	}

	val := arr.Value(0)
	for i := 1; i < arr.Len(); i++ {
		if !arr.IsValid(i) {
			return 0, false, true
		}
		if val != arr.Value(i) {
			return 0, false, true
		}
	}

	return val, true, true
}

type SumAggregation struct{}

var ErrUnsupportedSumType = errors.New("unsupported type for sum aggregation, expected int64 or float64")

func (a *SumAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) {
	if len(arrs) == 0 {
		return array.NewInt64Builder(pool).NewArray(), nil
	}

	typ := arrs[0].DataType().ID()
	switch typ {
	case arrow.INT64:
		return sumInt64arrays(pool, arrs), nil
	case arrow.FLOAT64:
		return sumFloat64arrays(pool, arrs), nil
	default:
		return nil, fmt.Errorf("sum array of %s: %w", typ, ErrUnsupportedSumType)
	}
}

func sumInt64arrays(pool memory.Allocator, arrs []arrow.Array) arrow.Array {
	b := array.NewInt64Builder(pool)
	defer b.Release()
	for _, arr := range arrs {
		b.Append(sumInt64array(arr.(*array.Int64)))
	}

	return b.NewArray()
}

func sumInt64array(arr *array.Int64) int64 {
	return math.Int64.Sum(arr)
}

func sumFloat64arrays(pool memory.Allocator, arrs []arrow.Array) arrow.Array {
	b := array.NewFloat64Builder(pool)
	defer b.Release()
	for _, arr := range arrs {
		b.Append(sumFloat64array(arr.(*array.Float64)))
	}

	return b.NewArray()
}

func sumFloat64array(arr *array.Float64) float64 {
	return math.Float64.Sum(arr)
}

var ErrUnsupportedMinType = errors.New("unsupported type for min aggregation, expected int64 or float64")

type MinAggregation struct{}

func (a *MinAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) {
	if len(arrs) == 0 {
		return array.NewInt64Builder(pool).NewArray(), nil
	}

	typ := arrs[0].DataType().ID()
	switch typ {
	case arrow.INT64:
		return minInt64arrays(pool, arrs), nil
	case arrow.FLOAT64:
		return minFloat64arrays(pool, arrs), nil
	default:
		return nil, fmt.Errorf("min array of %s: %w", typ, ErrUnsupportedMinType)
	}
}

func minInt64arrays(pool memory.Allocator, arrs []arrow.Array) arrow.Array {
	b := array.NewInt64Builder(pool)
	defer b.Release()
	for _, arr := range arrs {
		if arr.Len() == 0 {
			b.AppendNull()
			continue
		}
		b.Append(minInt64array(arr.(*array.Int64)))
	}

	return b.NewArray()
}

// minInt64array finds the minimum value in arr. Note that we considered using
// generics for this function, but the runtime doubled in comparison with
// processing a slice of a concrete type.
func minInt64array(arr *array.Int64) int64 {
	// Note that the zero-length check must be performed before calling this
	// function.
	vals := arr.Int64Values()
	min := vals[0]
	for _, v := range vals {
		if v < min {
			min = v
		}
	}
	return min
}

func minFloat64arrays(pool memory.Allocator, arrs []arrow.Array) arrow.Array {
	b := array.NewFloat64Builder(pool)
	defer b.Release()
	for _, arr := range arrs {
		if arr.Len() == 0 {
			b.AppendNull()
			continue
		}
		b.Append(minFloat64array(arr.(*array.Float64)))
	}

	return b.NewArray()
}

// Same as minInt64array but for Float64.
func minFloat64array(arr *array.Float64) float64 {
	// Note that the zero-length check must be performed before calling this
	// function.
	vals := arr.Float64Values()
	min := vals[0]
	for _, v := range vals {
		if v < min {
			min = v
		}
	}
	return min
}

type MaxAggregation struct{}

var ErrUnsupportedMaxType = errors.New("unsupported type for max aggregation, expected int64 or float64")

func (a *MaxAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) {
	if len(arrs) == 0 {
		return array.NewInt64Builder(pool).NewArray(), nil
	}

	typ := arrs[0].DataType().ID()
	switch typ {
	case arrow.INT64:
		return maxInt64arrays(pool, arrs), nil
	case arrow.FLOAT64:
		return maxFloat64arrays(pool, arrs), nil
	default:
		return nil, fmt.Errorf("max array of %s: %w", typ, ErrUnsupportedMaxType)
	}
}

func maxInt64arrays(pool memory.Allocator, arrs []arrow.Array) arrow.Array {
	b := array.NewInt64Builder(pool)
	defer b.Release()
	for _, arr := range arrs {
		if arr.Len() == 0 {
			b.AppendNull()
			continue
		}
		b.Append(maxInt64array(arr.(*array.Int64)))
	}

	return b.NewArray()
}

// maxInt64array finds the maximum value in arr. Note that we considered using
// generics for this function, but the runtime doubled in comparison with
// processing a slice of a concrete type.
func maxInt64array(arr *array.Int64) int64 {
	// Note that the zero-length check must be performed before calling this
	// function.
	vals := arr.Int64Values()
	max := vals[0]
	for _, v := range vals {
		if v > max {
			max = v
		}
	}
	return max
}

func maxFloat64arrays(pool memory.Allocator, arrs []arrow.Array) arrow.Array {
	b := array.NewFloat64Builder(pool)
	defer b.Release()
	for _, arr := range arrs {
		if arr.Len() == 0 {
			b.AppendNull()
			continue
		}
		b.Append(maxFloat64array(arr.(*array.Float64)))
	}

	return b.NewArray()
}

// Same as maxInt64array but for Float64.
func maxFloat64array(arr *array.Float64) float64 {
	// Note that the zero-length check must be performed before calling this
	// function.
	vals := arr.Float64Values()
	max := vals[0]
	for _, v := range vals {
		if v > max {
			max = v
		}
	}
	return max
}

type CountAggregation struct{}

func (a *CountAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) {
	if len(arrs) == 0 {
		return array.NewInt64Builder(pool).NewArray(), nil
	}

	b := array.NewInt64Builder(pool)
	defer b.Release()
	for _, arr := range arrs {
		b.Append(int64(arr.Len()))
	}
	return b.NewArray(), nil
}

// runAggregation is a helper that runs the given aggregation function over
// the given set of arrays. It is aware of the final stage and chooses the
// aggregation function appropriately.
func runAggregation(finalStage bool, fn logicalplan.AggFunc, pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) {
	if len(arrs) == 0 {
		return array.NewInt64Builder(pool).NewArray(), nil
	}

	aggregationFunction, err := chooseAggregationFunction(fn, arrs[0].DataType())
	if err != nil {
		return nil, err
	}

	if _, ok := aggregationFunction.(*CountAggregation); ok && finalStage {
		// The final stage of aggregation needs to sum up all the counts of the
		// previous stages, instead of counting the previous counts.
		return (&SumAggregation{}).Aggregate(pool, arrs)
	}
	return aggregationFunction.Aggregate(pool, arrs)
}
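
// countFinalStageExample is a sketch (hypothetical, for illustration only) of
// the count-to-sum switch above: a group whose partial counts from earlier
// stages are 5 and 3 must aggregate to 8 in the final stage; naively
// re-counting the two partial values would yield 2 instead.
func countFinalStageExample(pool memory.Allocator) (arrow.Array, error) {
	b := array.NewInt64Builder(pool)
	defer b.Release()
	b.AppendValues([]int64{5, 3}, nil) // partial counts for a single group
	partial := b.NewArray()
	defer partial.Release()

	// finalStage=true swaps CountAggregation for SumAggregation, yielding [8].
	return runAggregation(true, logicalplan.AggFuncCount, pool, []arrow.Array{partial})
}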

func resultNameWithConcreteColumn(function logicalplan.AggFunc, col string) string {
	switch function {
	case logicalplan.AggFuncSum:
		return logicalplan.Sum(logicalplan.Col(col)).Name()
	case logicalplan.AggFuncMin:
		return logicalplan.Min(logicalplan.Col(col)).Name()
	case logicalplan.AggFuncMax:
		return logicalplan.Max(logicalplan.Col(col)).Name()
	case logicalplan.AggFuncCount:
		return logicalplan.Count(logicalplan.Col(col)).Name()
	case logicalplan.AggFuncAvg:
		return logicalplan.Avg(logicalplan.Col(col)).Name()
	default:
		return ""
	}
}
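
// resultNameSketch is a hypothetical helper (illustration only, assuming the
// logicalplan package renders aggregation names as "<func>(<column>)") showing
// how a dynamic aggregation converted to the concrete column
// "labels.instance" would be named in the final stage, e.g.
// "sum(labels.instance)".
func resultNameSketch() string {
	return resultNameWithConcreteColumn(logicalplan.AggFuncSum, "labels.instance")
}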