package physicalplan
import (
"context"
"errors"
"fmt"
"hash/maphash"
"strings"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/math"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/arrow/scalar"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"github.com/polarsignals/frostdb/dynparquet"
"github.com/polarsignals/frostdb/pqarrow/builder"
"github.com/polarsignals/frostdb/query/logicalplan"
)
// Aggregate builds the physical operator for a logical aggregation.
//
// Every logical aggregation expression becomes an Aggregation; it is marked
// dynamic when its expression tree contains a *logicalplan.DynamicColumn.
// When ordered is true an OrderedAggregate is returned, which supports
// exactly one aggregation; otherwise a HashAggregate is returned. final is
// forwarded to the chosen operator and seed is passed to the HashAggregate.
//
// An error is returned when ordered is true and the aggregation does not
// contain exactly one aggregation expression.
func Aggregate(
	pool memory.Allocator,
	tracer trace.Tracer,
	agg *logicalplan.Aggregation,
	final bool,
	ordered bool,
	seed maphash.Seed,
) (PhysicalPlan, error) {
	aggregations := make([]Aggregation, 0, len(agg.AggExprs))
	for _, expr := range agg.AggExprs {
		aggregation := Aggregation{}
		expr.Accept(PreExprVisitorFunc(func(e logicalplan.Expr) bool {
			if _, ok := e.(*logicalplan.DynamicColumn); ok {
				aggregation.dynamic = true
			}
			return true
		}))
		aggregation.resultName = expr.Name()
		aggregation.function = expr.Func
		aggregation.expr = expr.Expr
		aggregations = append(aggregations, aggregation)
	}

	if ordered {
		switch {
		case len(aggregations) == 0:
			// Without this check aggregations[0] below would panic.
			return nil, errors.New("OrderedAggregate requires exactly one aggregation, found none")
		case len(aggregations) > 1:
			return nil, fmt.Errorf(
				"OrderedAggregate does not support multiple aggregations, found %d", len(aggregations),
			)
		}
		return NewOrderedAggregate(
			pool,
			tracer,
			aggregations[0],
			agg.GroupExprs,
			final,
		), nil
	}

	return NewHashAggregate(
		pool,
		tracer,
		aggregations,
		agg.GroupExprs,
		seed,
		final,
	), nil
}
// chooseAggregationFunction returns the AggregationFunction implementing the
// logical aggregation aggFunc. The data type argument is ignored; each
// implementation inspects the input type itself. Unknown functions yield an
// error.
func chooseAggregationFunction(
	aggFunc logicalplan.AggFunc,
	_ arrow.DataType,
) (AggregationFunction, error) {
	var impl AggregationFunction
	switch aggFunc {
	case logicalplan.AggFuncSum:
		impl = &SumAggregation{}
	case logicalplan.AggFuncMin:
		impl = &MinAggregation{}
	case logicalplan.AggFuncMax:
		impl = &MaxAggregation{}
	case logicalplan.AggFuncCount:
		impl = &CountAggregation{}
	case logicalplan.AggFuncUnique:
		impl = &UniqueAggregation{}
	case logicalplan.AggFuncAnd:
		impl = &AndAggregation{}
	default:
		return nil, fmt.Errorf("unsupported aggregation function: %s", aggFunc.String())
	}
	return impl, nil
}
// Aggregation describes one aggregation evaluated by an aggregate operator:
// the input expression, the function applied to it and the name of the
// resulting column.
type Aggregation struct {
// expr selects the input column(s) of the aggregation.
expr logicalplan .Expr
// dynamic is set by Aggregate when expr contains a
// *logicalplan.DynamicColumn; HashAggregate.Callback expands such
// aggregations into concrete per-column aggregations.
dynamic bool
// resultName is the name of the output column.
resultName string
// function is the logical aggregation function to apply.
function logicalplan .AggFunc
// arrays holds one builder per group (HashAggregate indexes it with
// hashtuple.array); each accumulates that group's input values until
// finishAggregate reduces them.
arrays []builder .ColumnBuilder
}
// AggregationFunction reduces a batch of per-group input arrays into one
// output array; the implementations in this file are intended to emit one
// result element per input array.
type AggregationFunction interface {
Aggregate (pool memory .Allocator , arrs []arrow .Array ) (arrow .Array , error )
}
// HashAggregate is a physical operator that groups incoming records by the
// columns matched by groupByColumnMatchers, keyed by a combined hash of the
// group-by values, and emits the aggregated results in Finish.
type HashAggregate struct {
pool memory .Allocator
tracer trace .Tracer
// groupByColumnMatchers select which record columns are group-by columns.
groupByColumnMatchers []logicalplan .Expr
// hashSeed seeds the hash of each group-by column name.
hashSeed maphash .Seed
next PhysicalPlan
// finalStage selects final-stage behaviour, e.g. counts are summed
// (runAggregation) and hashed group-by columns are not re-emitted
// (finishAggregate).
finalStage bool
// groupByFields, groupByFieldHashes and groupByArrays are scratch slices
// preallocated for Callback. NOTE(review): Callback appends to local
// copies and never stores them back, so their length stays 0.
groupByFields []arrow .Field
groupByFieldHashes []hashCombiner
groupByArrays []arrow .Array
// hashToAggregate maps a group's combined hash to its location.
hashToAggregate map [uint64 ]hashtuple
// aggregates holds one or more partitions of groups; Callback starts a new
// partition when a group-by builder reports builder.ErrMaxSizeReached.
aggregates []*hashAggregate
}
// hashtuple locates a group: aggregate indexes HashAggregate.aggregates and
// array indexes the per-group builders inside that partition.
type hashtuple struct {
aggregate int
array int
}
// hashAggregate is one partition of groups and their pending aggregation
// inputs.
type hashAggregate struct {
// dynamicAggregations are templates over dynamic columns; Callback appends
// a concrete entry to aggregations for every matching column it sees.
dynamicAggregations []Aggregation
// dynamicAggregationsConverted records column names already converted.
dynamicAggregationsConverted map [string ]struct {}
aggregations []Aggregation
// concreteAggregations counts the non-dynamic aggregations (set from the
// static aggregations in NewHashAggregate); Callback's missing-field check
// reads it.
concreteAggregations int
// groupByCols holds one builder per group-by column name; colOrdering keeps
// the order in which those columns were first seen.
groupByCols map [string ]builder .ColumnBuilder
colOrdering []string
// rowCount is the number of groups in this partition.
rowCount int
}
// NewHashAggregate returns a HashAggregate computing aggregations grouped by
// the columns matched by groupByColumnMatchers. Aggregations flagged dynamic
// are kept as templates and expanded per concrete column while records are
// processed; the others are evaluated as given. seed hashes group-by column
// names and finalStage selects final-stage behaviour.
func NewHashAggregate(
	pool memory.Allocator,
	tracer trace.Tracer,
	aggregations []Aggregation,
	groupByColumnMatchers []logicalplan.Expr,
	seed maphash.Seed,
	finalStage bool,
) *HashAggregate {
	const scratchCapacity = 10

	dynamic := make([]Aggregation, 0, len(aggregations))
	static := make([]Aggregation, 0, len(aggregations))
	for _, candidate := range aggregations {
		if candidate.dynamic {
			dynamic = append(dynamic, candidate)
			continue
		}
		static = append(static, candidate)
	}

	initial := &hashAggregate{
		dynamicAggregations:          dynamic,
		dynamicAggregationsConverted: map[string]struct{}{},
		aggregations:                 static,
		concreteAggregations:         len(static),
		groupByCols:                  map[string]builder.ColumnBuilder{},
		colOrdering:                  []string{},
	}

	return &HashAggregate{
		pool:                  pool,
		tracer:                tracer,
		groupByColumnMatchers: groupByColumnMatchers,
		hashSeed:              seed,
		finalStage:            finalStage,
		groupByFields:         make([]arrow.Field, 0, scratchCapacity),
		groupByFieldHashes:    make([]hashCombiner, 0, scratchCapacity),
		groupByArrays:         make([]arrow.Array, 0, scratchCapacity),
		hashToAggregate:       make(map[uint64]hashtuple),
		aggregates:            []*hashAggregate{initial},
	}
}
// Close releases every builder held by the operator's partitions and then
// closes the next operator, if one has been set.
func (a *HashAggregate) Close() {
	for _, arr := range a.groupByArrays {
		arr.Release()
	}
	for _, aggregate := range a.aggregates {
		for _, aggregation := range aggregate.aggregations {
			for _, bldr := range aggregation.arrays {
				bldr.Release()
			}
		}
		for _, bldr := range aggregate.groupByCols {
			bldr.Release()
		}
	}
	// Draw already tolerates a missing next operator; do the same here
	// instead of panicking on a nil interface during cleanup.
	if a.next != nil {
		a.next.Close()
	}
}

// SetNext sets the operator that receives this operator's output.
func (a *HashAggregate) SetNext(next PhysicalPlan) {
	a.next = next
}
// Draw renders this operator and, below it, the next operator's diagram.
// The details line lists the result names of the first partition's
// aggregations followed by the group-by expressions.
func (a *HashAggregate) Draw() *Diagram {
	var child *Diagram
	if a.next != nil {
		child = a.next.Draw()
	}

	first := a.aggregates[0].aggregations
	aggNames := make([]string, len(first))
	for i := range first {
		aggNames[i] = first[i].resultName
	}

	groupNames := make([]string, 0, len(a.groupByColumnMatchers))
	for _, matcher := range a.groupByColumnMatchers {
		groupNames = append(groupNames, matcher.String())
	}

	return &Diagram{
		Details: fmt.Sprintf("HashAggregate (%s by %s)", strings.Join(aggNames, ","), strings.Join(groupNames, ",")),
		Child:   child,
	}
}
// hashCombine mixes rhs into lhs following the boost::hash_combine recipe:
// rhs plus the 0x9e3779b9 constant plus shifted copies of lhs, XORed into
// lhs. All arithmetic wraps modulo 2^64.
func hashCombine(lhs, rhs uint64) uint64 {
	const golden = 0x9e3779b9
	mixed := rhs + golden
	mixed += lhs << 6
	mixed += lhs >> 2
	return lhs ^ mixed
}

// hashCombiner folds a per-value hash into a precomputed left-hand hash.
type hashCombiner interface {
	hashCombine(rhs uint64) uint64
}

// uint64HashCombine is a hashCombiner whose left-hand side is a fixed value,
// such as the hash of a group-by column name.
type uint64HashCombine struct {
	value uint64
}

var _ hashCombiner = (*uint64HashCombine)(nil)

// hashCombine combines the stored value (as lhs) with rhs.
func (c *uint64HashCombine) hashCombine(rhs uint64) uint64 {
	return hashCombine(c.value, rhs)
}
// Callback consumes one input record and accumulates its rows into groups.
//
// For every field of r it:
//  1. records fields matched by groupByColumnMatchers as group-by columns,
//     together with a seeded hash of the field name;
//  2. expands the current partition's dynamic aggregations into concrete
//     aggregations for matching columns not converted before;
//  3. selects the column feeding each aggregation (in the final stage by
//     result name, or by column match for dynamic aggregations).
//
// It then computes a combined hash per row from the group-by values. A
// hashed column emitted by an earlier stage is reused when
// dynparquet.FindHashedColumn finds one. Each row's aggregation inputs are
// appended to the builders of that row's group, and the group is created on
// first sight. When a group-by builder reports builder.ErrMaxSizeReached,
// the group is created in a fresh partition appended to a.aggregates.
//
// An error is returned in three cases:
//   - r lacks the columns the aggregations need;
//   - a new group must be created while one of its aggregation inputs is
//     absent from r;
//   - a row's group has no builder for one of this record's inputs.
func (a *HashAggregate) Callback(_ context.Context, r arrow.Record) error {
	aggregate := a.aggregates[len(a.aggregates)-1]
	schema := r.Schema()
	fields := schema.Fields()

	// Per-call scratch slices reusing the preallocated capacity.
	groupByFields := a.groupByFields[:0]
	groupByFieldHashes := a.groupByFieldHashes[:0]
	groupByArrays := a.groupByArrays[:0]

	columnToAggregate := make([]arrow.Array, len(aggregate.aggregations))
	concreteAggregateFieldsFound := 0
	dynamicAggregateFieldsFound := 0
	for i := 0; i < schema.NumFields(); i++ {
		field := schema.Field(i)

		// A field is registered once for every matcher it satisfies.
		for _, matcher := range a.groupByColumnMatchers {
			if !matcher.MatchColumn(field.Name) {
				continue
			}
			groupByFields = append(groupByFields, field)
			groupByArrays = append(groupByArrays, r.Column(i))
			groupByFieldHashes = append(groupByFieldHashes,
				&uint64HashCombine{value: scalar.Hash(a.hashSeed, scalar.NewStringScalar(field.Name))},
			)
		}

		if _, converted := aggregate.dynamicAggregationsConverted[field.Name]; !converted {
			for _, dyn := range aggregate.dynamicAggregations {
				if !dyn.expr.MatchColumn(field.Name) {
					continue
				}
				resultName := field.Name
				if a.finalStage {
					resultName = resultNameWithConcreteColumn(dyn.function, field.Name)
				}
				concrete := Aggregation{
					expr:       logicalplan.Col(field.Name),
					dynamic:    true,
					resultName: resultName,
					function:   dyn.function,
				}
				// Groups that already exist in this partition get an empty
				// builder for the new aggregation so that arrays[tuple.array]
				// stays addressable for every group.
				for g := 0; g < aggregate.rowCount; g++ {
					concrete.arrays = append(concrete.arrays, builder.NewBuilder(a.pool, field.Type))
				}
				columnToAggregate = append(columnToAggregate, nil)
				aggregate.aggregations = append(aggregate.aggregations, concrete)
				aggregate.dynamicAggregationsConverted[field.Name] = struct{}{}
			}
		}

		for j, agg := range aggregate.aggregations {
			var matched bool
			if a.finalStage {
				matched = agg.resultName == field.Name || (agg.dynamic && agg.expr.MatchColumn(field.Name))
			} else {
				matched = agg.expr.MatchColumn(field.Name)
			}
			if !matched {
				continue
			}
			columnToAggregate[j] = r.Column(i)
			if agg.dynamic {
				dynamicAggregateFieldsFound++
			} else {
				concreteAggregateFieldsFound++
			}
		}
	}

	// Without dynamic aggregations at least one concrete input must be
	// present; with dynamic aggregations at least one dynamic input must be.
	noDynamic := len(aggregate.dynamicAggregations) == 0
	missingConcrete := concreteAggregateFieldsFound == 0 || aggregate.concreteAggregations == 0
	if (noDynamic && missingConcrete) || (!noDynamic && dynamicAggregateFieldsFound == 0) {
		exprs := make([]string, len(aggregate.aggregations))
		for i, agg := range aggregate.aggregations {
			exprs[i] = agg.expr.String()
		}
		if a.finalStage {
			return fmt.Errorf("aggregate field(s) not found %#v, final aggregations are not possible without it (%d concrete aggregation fields found; %d concrete aggregations)", exprs, concreteAggregateFieldsFound, aggregate.concreteAggregations)
		}
		return fmt.Errorf("aggregate field(s) not found %#v, aggregations are not possible without it (%d concrete aggregation fields found; %d concrete aggregations)", exprs, concreteAggregateFieldsFound, aggregate.concreteAggregations)
	}

	numRows := int(r.NumRows())
	colHashes := make([][]uint64, len(groupByArrays))
	for i, arr := range groupByArrays {
		if hashedCol := dynparquet.FindHashedColumn(groupByFields[i].Name, fields); hashedCol != -1 {
			// An earlier stage already emitted the hashes for this column.
			// NOTE(review): assumes the hashed column is *array.Int64.
			precomputed := r.Column(hashedCol).(*array.Int64).Int64Values()
			vals := make([]uint64, 0, numRows)
			for _, v := range precomputed {
				vals = append(vals, uint64(v))
			}
			colHashes[i] = vals
			continue
		}
		colHashes[i] = dynparquet.HashArray(arr)
	}

	for i := 0; i < numRows; i++ {
		hash := uint64(0)
		for j := range colHashes {
			// Zero value hashes do not contribute to the group hash.
			if colHashes[j][i] == 0 {
				continue
			}
			hash = hashCombine(hash, groupByFieldHashes[j].hashCombine(colHashes[j][i]))
		}

		tuple, ok := a.hashToAggregate[hash]
		if !ok {
			aggregate = a.aggregates[len(a.aggregates)-1]
			if err := a.appendGroupBuilders(aggregate, columnToAggregate); err != nil {
				return err
			}
			tuple = a.registerGroup(hash, aggregate)
			if err := a.updateGroupByCols(i, groupByArrays, groupByFields); err != nil {
				if !errors.Is(err, builder.ErrMaxSizeReached) {
					return err
				}
				// The group-by values do not fit into this partition: undo the
				// group here (releasing its unused builders) and recreate it
				// in a fresh partition with the same aggregation definitions.
				aggregate.rowCount--
				for j := range columnToAggregate {
					arrays := aggregate.aggregations[j].arrays
					last := len(arrays) - 1
					arrays[last].Release()
					aggregate.aggregations[j].arrays = arrays[:last]
				}
				aggregate = a.newPartitionLike(aggregate)
				a.aggregates = append(a.aggregates, aggregate)
				if err := a.appendGroupBuilders(aggregate, columnToAggregate); err != nil {
					return err
				}
				tuple = a.registerGroup(hash, aggregate)
				if err := a.updateGroupByCols(i, groupByArrays, groupByFields); err != nil {
					return err
				}
			}
		}

		part := a.aggregates[tuple.aggregate]
		for j, col := range columnToAggregate {
			if col == nil {
				continue
			}
			// Groups in older partitions may predate aggregations added later;
			// report that instead of indexing out of range.
			if j >= len(part.aggregations) || tuple.array >= len(part.aggregations[j].arrays) {
				return fmt.Errorf("no builder for aggregation %d of group %d in partition %d", j, tuple.array, tuple.aggregate)
			}
			if err := builder.AppendValue(part.aggregations[j].arrays[tuple.array], col, i); err != nil {
				return err
			}
		}
	}
	return nil
}

// appendGroupBuilders gives a newly created group one empty builder per
// aggregation of part, typed after the aggregation's input column. When an
// input column is absent from the record, the builder type is unknown. In
// that case it returns an error and leaves part untouched.
func (a *HashAggregate) appendGroupBuilders(part *hashAggregate, columns []arrow.Array) error {
	for j, col := range columns {
		if col == nil {
			return fmt.Errorf("aggregate input column for %s not found in record", part.aggregations[j].expr.String())
		}
	}
	for j, col := range columns {
		part.aggregations[j].arrays = append(part.aggregations[j].arrays, builder.NewBuilder(a.pool, col.DataType()))
	}
	return nil
}

// registerGroup records the group whose builders were just appended to part
// (the last partition) under hash, bumps part.rowCount and returns the
// group's location.
func (a *HashAggregate) registerGroup(hash uint64, part *hashAggregate) hashtuple {
	tuple := hashtuple{
		aggregate: len(a.aggregates) - 1,
		array:     len(part.aggregations[0].arrays) - 1,
	}
	a.hashToAggregate[hash] = tuple
	part.rowCount++
	return tuple
}

// newPartitionLike returns an empty partition carrying the same aggregation
// definitions as part: expressions, dynamic flags, concrete-aggregation
// count, dynamic templates and converted columns. It has no builders or
// groups, so later records are matched and validated as they were for part.
func (a *HashAggregate) newPartitionLike(part *hashAggregate) *hashAggregate {
	aggregations := make([]Aggregation, 0, len(part.aggregations))
	for _, agg := range part.aggregations {
		aggregations = append(aggregations, Aggregation{
			expr:       agg.expr,
			dynamic:    agg.dynamic,
			resultName: agg.resultName,
			function:   agg.function,
		})
	}
	converted := make(map[string]struct{}, len(part.dynamicAggregationsConverted))
	for name := range part.dynamicAggregationsConverted {
		converted[name] = struct{}{}
	}
	return &hashAggregate{
		dynamicAggregations:          part.dynamicAggregations,
		dynamicAggregationsConverted: converted,
		aggregations:                 aggregations,
		concreteAggregations:         part.concreteAggregations,
		groupByCols:                  map[string]builder.ColumnBuilder{},
		colOrdering:                  []string{},
	}
}
// updateGroupByCols appends the value at index row of every group-by array
// to the last partition's matching group-by builder. Builders are created
// for columns seen for the first time. A builder that lags behind is padded
// with nulls up to the number of groups that existed before the current one.
// If an append fails, the values already appended for earlier columns in
// this call are rolled back. The append error is returned, or the rollback
// error if a rollback itself fails.
func (a *HashAggregate) updateGroupByCols(row int, groupByArrays []arrow.Array, groupByFields []arrow.Field) error {
	part := a.aggregates[len(a.aggregates)-1]
	for idx := range groupByArrays {
		name := groupByFields[idx].Name
		col, exists := part.groupByCols[name]
		if !exists {
			col = builder.NewBuilder(a.pool, groupByFields[idx].Type)
			part.groupByCols[name] = col
			part.colOrdering = append(part.colOrdering, name)
		}
		for col.Len() < len(part.aggregations[0].arrays)-1 {
			col.AppendNull()
		}
		appendErr := builder.AppendValue(col, groupByArrays[idx], row)
		if appendErr == nil {
			continue
		}
		for prev := 0; prev < idx; prev++ {
			if err := builder.RollbackPrevious(part.groupByCols[groupByFields[prev].Name]); err != nil {
				return err
			}
		}
		return appendErr
	}
	return nil
}
// Finish emits one record per non-empty partition to the next operator,
// records the total number of groups on the trace span, and then finishes
// the next operator. The first error from emitting a partition is returned
// immediately.
func (a *HashAggregate) Finish(ctx context.Context) error {
	ctx, span := a.tracer.Start(ctx, "HashAggregate/Finish")
	span.SetAttributes(attribute.Bool("finalStage", a.finalStage))
	defer span.End()

	var groups int
	for idx := range a.aggregates {
		part := a.aggregates[idx]
		if err := a.finishAggregate(ctx, idx, part); err != nil {
			return err
		}
		groups += part.rowCount
	}
	span.SetAttributes(attribute.Int64("rows", int64(groups)))

	return a.next.Finish(ctx)
}
// finishAggregate builds the output record for partition aggIdx and passes
// it to the next operator. Partitions without groups are skipped.
//
// Columns are emitted in this order:
//  1. each group-by column in first-seen order, null-padded to the number of
//     groups;
//  2. in non-final stages, after each group-by column, an Int64 companion
//     column named dynparquet.HashedColumnName holding each group's
//     combined hash;
//  3. one column per aggregation.
//
// In the final stage, group-by columns that are themselves hashed columns
// are dropped.
func (a *HashAggregate) finishAggregate(ctx context.Context, aggIdx int, aggregate *hashAggregate) error {
	numRows := aggregate.rowCount
	if numRows == 0 {
		return nil
	}

	numCols := len(aggregate.groupByCols) + len(aggregate.aggregations)
	if !a.finalStage {
		// Every group-by column gets a companion hashed column.
		numCols += len(aggregate.groupByCols)
	}
	fields := make([]arrow.Field, 0, numCols)
	columns := make([]arrow.Array, 0, numCols)
	defer func() {
		for _, col := range columns {
			if col != nil {
				col.Release()
			}
		}
	}()

	// Every companion hashed column holds the same per-group hashes, so
	// collect them from hashToAggregate once rather than rescanning the
	// whole map for each group-by column.
	var groupHashes []int64
	if !a.finalStage && len(aggregate.colOrdering) > 0 {
		groupHashes = make([]int64, numRows)
		for hash, tuple := range a.hashToAggregate {
			if tuple.aggregate == aggIdx {
				groupHashes[tuple.array] = int64(hash)
			}
		}
	}

	for _, fieldName := range aggregate.colOrdering {
		if a.finalStage && dynparquet.IsHashedColumn(fieldName) {
			continue
		}
		groupByCol, ok := aggregate.groupByCols[fieldName]
		if !ok {
			return fmt.Errorf("unknown field name: %s", fieldName)
		}
		for groupByCol.Len() < numRows {
			groupByCol.AppendNull()
		}
		arr := groupByCol.NewArray()
		fields = append(fields, arrow.Field{Name: fieldName, Type: arr.DataType()})
		columns = append(columns, arr)

		if !a.finalStage {
			fields = append(fields, arrow.Field{Name: dynparquet.HashedColumnName(fieldName), Type: arrow.PrimitiveTypes.Int64})
			hashBldr := array.NewInt64Builder(a.pool)
			hashBldr.AppendValues(groupHashes, nil)
			columns = append(columns, hashBldr.NewArray())
			hashBldr.Release()
		}
	}

	for _, aggregation := range aggregate.aggregations {
		inputs := make([]arrow.Array, 0, len(aggregation.arrays))
		for _, bldr := range aggregation.arrays {
			inputs = append(inputs, bldr.NewArray())
		}
		result, err := runAggregation(a.finalStage, aggregation.function, a.pool, inputs)
		for _, in := range inputs {
			in.Release()
		}
		if err != nil {
			return fmt.Errorf("aggregate batched arrays: %w", err)
		}
		columns = append(columns, result)
		fields = append(fields, arrow.Field{
			Name: aggregation.resultName,
			Type: result.DataType(),
		})
	}

	record := array.NewRecord(
		arrow.NewSchema(fields, nil),
		columns,
		int64(numRows),
	)
	defer record.Release()

	return a.next.Callback(ctx, record)
}
// AndAggregation computes the logical AND of the non-null values of each
// input array.
type AndAggregation struct{}

// ErrUnsupportedAndType is returned when an and aggregation is applied to a
// non-boolean input.
var ErrUnsupportedAndType = errors.New("unsupported type for is and aggregation, expected bool")

// Aggregate ANDs each array in arrs into a single boolean via AndArrays. An
// empty arrs yields an empty boolean array. Non-boolean inputs produce an
// error wrapping ErrUnsupportedAndType.
func (a *AndAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) {
	if len(arrs) == 0 {
		// Release the builder like every other builder in this file.
		empty := array.NewBooleanBuilder(pool)
		defer empty.Release()
		return empty.NewArray(), nil
	}
	switch typ := arrs[0].DataType().ID(); typ {
	case arrow.BOOL:
		return AndArrays(pool, arrs), nil
	default:
		return nil, fmt.Errorf("and array of %s: %w", typ, ErrUnsupportedAndType)
	}
}
// AndArrays reduces each boolean array in arrs to one value: the logical AND
// of its non-null entries, which is true when every entry is null. An empty
// input array yields null. The result has exactly one element per input
// array.
func AndArrays(pool memory.Allocator, arrs []arrow.Array) arrow.Array {
	b := array.NewBooleanBuilder(pool)
	defer b.Release()
	for _, arr := range arrs {
		if arr.Len() == 0 {
			// Without this continue the empty array would also append `true`
			// below, producing two results for a single group.
			b.AppendNull()
			continue
		}
		bools := arr.(*array.Boolean)
		result := true
		for i := 0; i < bools.Len(); i++ {
			if bools.IsValid(i) && !bools.Value(i) {
				result = false
				break
			}
		}
		b.Append(result)
	}
	return b.NewArray()
}
// UniqueAggregation reduces each input array to its single distinct value.
// The result is null when the array is empty, contains a null, or holds more
// than one distinct value (see int64ArrayHasUniqueValue).
type UniqueAggregation struct{}

// ErrUnsupportedIsUniqueType is returned when a unique aggregation is applied
// to a non-int64 input.
var ErrUnsupportedIsUniqueType = errors.New("unsupported type for is unique aggregation, expected int64")

// Aggregate applies the unique reduction to every array in arrs. An empty
// arrs yields an empty Int64 array. Non-int64 inputs produce an error
// wrapping ErrUnsupportedIsUniqueType.
func (a *UniqueAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) {
	if len(arrs) == 0 {
		// Release the builder like every other builder in this file.
		empty := array.NewInt64Builder(pool)
		defer empty.Release()
		return empty.NewArray(), nil
	}
	switch typ := arrs[0].DataType().ID(); typ {
	case arrow.INT64:
		return uniqueInt64arrays(pool, arrs), nil
	default:
		return nil, fmt.Errorf("isUnique array of %s: %w", typ, ErrUnsupportedIsUniqueType)
	}
}
// uniqueInt64arrays emits, for each Int64 array in arrs, the value shared by
// all of its entries. It emits null when the array is empty, contains a
// null, or holds more than one distinct value.
func uniqueInt64arrays(pool memory.Allocator, arrs []arrow.Array) arrow.Array {
	out := array.NewInt64Builder(pool)
	defer out.Release()
	for i := range arrs {
		v, unique, nonEmpty := int64ArrayHasUniqueValue(arrs[i].(*array.Int64))
		if nonEmpty && unique {
			out.Append(v)
			continue
		}
		out.AppendNull()
	}
	return out.NewArray()
}
// int64ArrayHasUniqueValue reports whether every entry of arr is non-null and
// equal. It returns (value, isUnique, hasValues). hasValues is false only for
// an empty array, and value is meaningful only when isUnique is true.
func int64ArrayHasUniqueValue(arr *array.Int64) (int64, bool, bool) {
	n := arr.Len()
	if n == 0 {
		return 0, false, false
	}
	var first int64
	for i := 0; i < n; i++ {
		if !arr.IsValid(i) {
			return 0, false, true
		}
		v := arr.Value(i)
		if i == 0 {
			first = v
			continue
		}
		if v != first {
			return 0, false, true
		}
	}
	return first, true, true
}
// SumAggregation computes the sum of each input array.
type SumAggregation struct{}

// ErrUnsupportedSumType is returned when a sum aggregation is applied to an
// input that is neither int64 nor float64.
var ErrUnsupportedSumType = errors.New("unsupported type for sum aggregation, expected int64 or float64")

// Aggregate returns the sum of every array in arrs. An empty arrs yields an
// empty Int64 array. Inputs other than int64 or float64 produce an error
// wrapping ErrUnsupportedSumType.
func (a *SumAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) {
	if len(arrs) == 0 {
		// Release the builder like every other builder in this file.
		empty := array.NewInt64Builder(pool)
		defer empty.Release()
		return empty.NewArray(), nil
	}
	switch typ := arrs[0].DataType().ID(); typ {
	case arrow.INT64:
		return sumInt64arrays(pool, arrs), nil
	case arrow.FLOAT64:
		return sumFloat64arrays(pool, arrs), nil
	default:
		return nil, fmt.Errorf("sum array of %s: %w", typ, ErrUnsupportedSumType)
	}
}
// sumInt64arrays emits one sum per Int64 array in arrs, computed by
// sumInt64array.
func sumInt64arrays(pool memory.Allocator, arrs []arrow.Array) arrow.Array {
	totals := array.NewInt64Builder(pool)
	defer totals.Release()
	for idx := range arrs {
		totals.Append(sumInt64array(arrs[idx].(*array.Int64)))
	}
	return totals.NewArray()
}

// sumInt64array returns the sum of arr as computed by arrow's math.Int64.Sum.
func sumInt64array(arr *array.Int64) int64 {
	total := math.Int64.Sum(arr)
	return total
}

// sumFloat64arrays emits one sum per Float64 array in arrs, computed by
// sumFloat64array.
func sumFloat64arrays(pool memory.Allocator, arrs []arrow.Array) arrow.Array {
	totals := array.NewFloat64Builder(pool)
	defer totals.Release()
	for idx := range arrs {
		totals.Append(sumFloat64array(arrs[idx].(*array.Float64)))
	}
	return totals.NewArray()
}

// sumFloat64array returns the sum of arr as computed by arrow's
// math.Float64.Sum.
func sumFloat64array(arr *array.Float64) float64 {
	total := math.Float64.Sum(arr)
	return total
}
// ErrUnsupportedMinType is returned when a min aggregation is applied to an
// input that is neither int64 nor float64. (The message previously said
// "max", which was copied from ErrUnsupportedMaxType.)
var ErrUnsupportedMinType = errors.New("unsupported type for min aggregation, expected int64 or float64")
// MinAggregation computes the minimum of each input array.
type MinAggregation struct{}

// Aggregate returns the minimum of every array in arrs. An empty arrs yields
// an empty Int64 array. Inputs other than int64 or float64 produce an error
// wrapping ErrUnsupportedMinType.
func (a *MinAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) {
	if len(arrs) == 0 {
		// Release the builder like every other builder in this file.
		empty := array.NewInt64Builder(pool)
		defer empty.Release()
		return empty.NewArray(), nil
	}
	switch typ := arrs[0].DataType().ID(); typ {
	case arrow.INT64:
		return minInt64arrays(pool, arrs), nil
	case arrow.FLOAT64:
		return minFloat64arrays(pool, arrs), nil
	default:
		return nil, fmt.Errorf("min array of %s: %w", typ, ErrUnsupportedMinType)
	}
}
// minInt64arrays emits, for each Int64 array in arrs, its smallest non-null
// value. It emits null when the array is empty or entirely null.
func minInt64arrays(pool memory.Allocator, arrs []arrow.Array) arrow.Array {
	res := array.NewInt64Builder(pool)
	defer res.Release()
	for _, arr := range arrs {
		// NullN() == Len() covers both empty and all-null inputs; neither
		// has a minimum.
		if arr.NullN() == arr.Len() {
			res.AppendNull()
			continue
		}
		res.Append(minInt64array(arr.(*array.Int64)))
	}
	return res.NewArray()
}

// minInt64array returns the smallest non-null value of arr, or 0 when arr
// has no non-null values. Null slots are skipped because Arrow leaves their
// backing values unspecified.
func minInt64array(arr *array.Int64) int64 {
	vals := arr.Int64Values()
	var (
		minV  int64
		found bool
	)
	for i, v := range vals {
		if arr.IsNull(i) {
			continue
		}
		if !found || v < minV {
			minV, found = v, true
		}
	}
	return minV
}

// minFloat64arrays emits, for each Float64 array in arrs, its smallest
// non-null value. It emits null when the array is empty or entirely null.
func minFloat64arrays(pool memory.Allocator, arrs []arrow.Array) arrow.Array {
	res := array.NewFloat64Builder(pool)
	defer res.Release()
	for _, arr := range arrs {
		if arr.NullN() == arr.Len() {
			res.AppendNull()
			continue
		}
		res.Append(minFloat64array(arr.(*array.Float64)))
	}
	return res.NewArray()
}

// minFloat64array returns the smallest non-null value of arr, or 0 when arr
// has no non-null values. Null slots are skipped because Arrow leaves their
// backing values unspecified.
func minFloat64array(arr *array.Float64) float64 {
	vals := arr.Float64Values()
	var (
		minV  float64
		found bool
	)
	for i, v := range vals {
		if arr.IsNull(i) {
			continue
		}
		if !found || v < minV {
			minV, found = v, true
		}
	}
	return minV
}
// MaxAggregation computes the maximum of each input array.
type MaxAggregation struct{}

// ErrUnsupportedMaxType is returned when a max aggregation is applied to an
// input that is neither int64 nor float64.
var ErrUnsupportedMaxType = errors.New("unsupported type for max aggregation, expected int64 or float64")

// Aggregate returns the maximum of every array in arrs. An empty arrs yields
// an empty Int64 array. Inputs other than int64 or float64 produce an error
// wrapping ErrUnsupportedMaxType.
func (a *MaxAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) {
	if len(arrs) == 0 {
		// Release the builder like every other builder in this file.
		empty := array.NewInt64Builder(pool)
		defer empty.Release()
		return empty.NewArray(), nil
	}
	switch typ := arrs[0].DataType().ID(); typ {
	case arrow.INT64:
		return maxInt64arrays(pool, arrs), nil
	case arrow.FLOAT64:
		return maxFloat64arrays(pool, arrs), nil
	default:
		return nil, fmt.Errorf("max array of %s: %w", typ, ErrUnsupportedMaxType)
	}
}
// maxInt64arrays emits, for each Int64 array in arrs, its largest non-null
// value. It emits null when the array is empty or entirely null.
func maxInt64arrays(pool memory.Allocator, arrs []arrow.Array) arrow.Array {
	res := array.NewInt64Builder(pool)
	defer res.Release()
	for _, arr := range arrs {
		// NullN() == Len() covers both empty and all-null inputs; neither
		// has a maximum.
		if arr.NullN() == arr.Len() {
			res.AppendNull()
			continue
		}
		res.Append(maxInt64array(arr.(*array.Int64)))
	}
	return res.NewArray()
}

// maxInt64array returns the largest non-null value of arr, or 0 when arr has
// no non-null values. Null slots are skipped because Arrow leaves their
// backing values unspecified.
func maxInt64array(arr *array.Int64) int64 {
	vals := arr.Int64Values()
	var (
		maxV  int64
		found bool
	)
	for i, v := range vals {
		if arr.IsNull(i) {
			continue
		}
		if !found || v > maxV {
			maxV, found = v, true
		}
	}
	return maxV
}

// maxFloat64arrays emits, for each Float64 array in arrs, its largest
// non-null value. It emits null when the array is empty or entirely null.
func maxFloat64arrays(pool memory.Allocator, arrs []arrow.Array) arrow.Array {
	res := array.NewFloat64Builder(pool)
	defer res.Release()
	for _, arr := range arrs {
		if arr.NullN() == arr.Len() {
			res.AppendNull()
			continue
		}
		res.Append(maxFloat64array(arr.(*array.Float64)))
	}
	return res.NewArray()
}

// maxFloat64array returns the largest non-null value of arr, or 0 when arr
// has no non-null values. Null slots are skipped because Arrow leaves their
// backing values unspecified.
func maxFloat64array(arr *array.Float64) float64 {
	vals := arr.Float64Values()
	var (
		maxV  float64
		found bool
	)
	for i, v := range vals {
		if arr.IsNull(i) {
			continue
		}
		if !found || v > maxV {
			maxV, found = v, true
		}
	}
	return maxV
}
// CountAggregation counts the entries of each input array, nulls included.
type CountAggregation struct{}

// Aggregate returns the length of every array in arrs. An empty arrs yields
// an empty Int64 array. The builder is always released, including on the
// empty path that previously leaked it.
func (a *CountAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) {
	res := array.NewInt64Builder(pool)
	defer res.Release()
	for _, arr := range arrs {
		res.Append(int64(arr.Len()))
	}
	return res.NewArray(), nil
}
// runAggregation applies fn to every array in arrs, returning one result per
// array. In the final stage a count is computed by summing its inputs, which
// are presumably partial counts from an earlier stage (confirm with the
// planner). An empty arrs yields an empty Int64 array.
func runAggregation(finalStage bool, fn logicalplan.AggFunc, pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) {
	if len(arrs) == 0 {
		// Release the builder like every other builder in this file.
		empty := array.NewInt64Builder(pool)
		defer empty.Release()
		return empty.NewArray(), nil
	}
	aggFunc, err := chooseAggregationFunction(fn, arrs[0].DataType())
	if err != nil {
		return nil, err
	}
	if _, isCount := aggFunc.(*CountAggregation); isCount && finalStage {
		return (&SumAggregation{}).Aggregate(pool, arrs)
	}
	return aggFunc.Aggregate(pool, arrs)
}
// resultNameWithConcreteColumn returns the result name that the aggregation
// function would produce when applied to the concrete column col. Functions
// without a case here (for example unique and and) yield "".
func resultNameWithConcreteColumn(function logicalplan.AggFunc, col string) string {
	var named interface{ Name() string }
	switch function {
	case logicalplan.AggFuncSum:
		named = logicalplan.Sum(logicalplan.Col(col))
	case logicalplan.AggFuncMin:
		named = logicalplan.Min(logicalplan.Col(col))
	case logicalplan.AggFuncMax:
		named = logicalplan.Max(logicalplan.Col(col))
	case logicalplan.AggFuncCount:
		named = logicalplan.Count(logicalplan.Col(col))
	case logicalplan.AggFuncAvg:
		named = logicalplan.Avg(logicalplan.Col(col))
	default:
		return ""
	}
	return named.Name()
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .