package expr
import (
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"
"github.com/parquet-go/parquet-go"
"github.com/polarsignals/frostdb/pqarrow"
"github.com/polarsignals/frostdb/query/logicalplan"
)
// PreExprVisitorFunc adapts a plain function so it can be handed to
// logicalplan.Expr.Accept as a visitor. Only the PreVisit hook calls the
// wrapped function; Visit and PostVisit always report false.
type PreExprVisitorFunc func(expr logicalplan.Expr) bool

// PreVisit invokes the wrapped function with expr and returns its result
// unchanged.
func (f PreExprVisitorFunc) PreVisit(expr logicalplan.Expr) bool {
	result := f(expr)
	return result
}

// Visit does not call the wrapped function and always returns false.
func (f PreExprVisitorFunc) Visit(logicalplan.Expr) bool {
	return false
}

// PostVisit does not call the wrapped function and always returns false.
func (f PreExprVisitorFunc) PostVisit(logicalplan.Expr) bool {
	return false
}
// Particulate is the view of Parquet data that filters in this package
// evaluate against: a schema plus the column chunks that go with it.
//
// MaxAgg.Eval indexes ColumnChunks() by the position of a field in
// Schema().Fields(), so implementations are expected to return the chunks in
// schema field order.
type Particulate interface {
	// Schema returns the Parquet schema describing the data.
	Schema() *parquet.Schema
	// ColumnChunks returns the column chunks of the data.
	ColumnChunks() []parquet.ColumnChunk
}
// TrueNegativeFilter is implemented by every filter expression in this
// package (AlwaysTrueFilter, MaxAgg, AndExpr, OrExpr, ...).
//
// NOTE(review): judging by the name, a false result presumably means the
// particulate definitely contains no matching data, while true only means it
// may; confirm against the callers.
type TrueNegativeFilter interface {
	// Eval evaluates the filter against p. ignoreMissingCols is passed
	// through unchanged by the composite filters (AndExpr, OrExpr) to their
	// operands.
	Eval(p Particulate, ignoreMissingCols bool) (bool, error)
}
// AlwaysTrueFilter is a TrueNegativeFilter that accepts everything. It is
// what the expression builders in this file return for operators and
// aggregation functions they do not translate into a real filter.
type AlwaysTrueFilter struct{}

// Eval ignores both arguments and always returns (true, nil).
func (*AlwaysTrueFilter) Eval(Particulate, bool) (bool, error) {
	return true, nil
}
// binaryBooleanExpr translates a logical-plan binary expression into a
// TrueNegativeFilter.
//
//   - For the comparison operators (OpEq, OpNotEq, OpLt, OpLtEq, OpGt, OpGtEq)
//     it builds a BinaryScalarExpr from the first column found on the left
//     side and the first literal found on the right side. An error is returned
//     when the left side contains no column or when the literal cannot be
//     converted to a Parquet value. If the right side contains no literal, the
//     zero parquet.Value is used.
//   - For OpAnd and OpOr both operands are converted with BooleanExpr and
//     combined into an AndExpr or OrExpr respectively.
//   - Every other operator yields an AlwaysTrueFilter.
func binaryBooleanExpr(expr *logicalplan.BinaryExpr) (TrueNegativeFilter, error) {
	switch expr.Op {
	case logicalplan.OpEq,
		logicalplan.OpNotEq,
		logicalplan.OpLt,
		logicalplan.OpLtEq,
		logicalplan.OpGt,
		logicalplan.OpGtEq:
		// Walk the left operand until a column is found; returning false
		// from the callback is how the original code signals "stop here".
		var column *ColumnRef
		expr.Left.Accept(PreExprVisitorFunc(func(node logicalplan.Expr) bool {
			col, isColumn := node.(*logicalplan.Column)
			if !isColumn {
				return true
			}
			column = &ColumnRef{ColumnName: col.ColumnName}
			return false
		}))
		if column == nil {
			return nil, errors.New("left side of binary expression must be a column")
		}

		// Walk the right operand until a literal is found and convert it.
		var (
			literal    parquet.Value
			convertErr error
		)
		expr.Right.Accept(PreExprVisitorFunc(func(node logicalplan.Expr) bool {
			lit, isLiteral := node.(*logicalplan.LiteralExpr)
			if !isLiteral {
				return true
			}
			literal, convertErr = pqarrow.ArrowScalarToParquetValue(lit.Value)
			return false
		}))
		if convertErr != nil {
			return nil, convertErr
		}

		return &BinaryScalarExpr{Left: column, Op: expr.Op, Right: literal}, nil

	case logicalplan.OpAnd, logicalplan.OpOr:
		// Both logical connectives convert their operands the same way and
		// differ only in the node that wraps the results.
		lhs, err := BooleanExpr(expr.Left)
		if err != nil {
			return nil, err
		}
		rhs, err := BooleanExpr(expr.Right)
		if err != nil {
			return nil, err
		}
		if expr.Op == logicalplan.OpAnd {
			return &AndExpr{Left: lhs, Right: rhs}, nil
		}
		return &OrExpr{Left: lhs, Right: rhs}, nil
	}

	// Operators without a dedicated translation never exclude anything.
	return &AlwaysTrueFilter{}, nil
}
// aggregationExpr translates a logical-plan aggregation into a
// TrueNegativeFilter. Only AggFuncMax gets a dedicated filter (MaxAgg); every
// other aggregation function yields an AlwaysTrueFilter. The returned error
// is always nil.
func aggregationExpr(expr *logicalplan.AggregationFunction) (TrueNegativeFilter, error) {
	if expr.Func != logicalplan.AggFuncMax {
		return &AlwaysTrueFilter{}, nil
	}

	agg := &MaxAgg{}
	// Walk the aggregated expression until a (dynamic) column is found and
	// record its name on the filter. If none is found the name stays empty.
	expr.Expr.Accept(PreExprVisitorFunc(func(node logicalplan.Expr) bool {
		switch col := node.(type) {
		case *logicalplan.Column:
			agg.columnName = col.ColumnName
			return false
		case *logicalplan.DynamicColumn:
			agg.columnName = col.ColumnName
			agg.dynamic = true
			return false
		}
		return true
	}))
	return agg, nil
}
// MaxAgg is the filter built for a max() aggregation. It remembers the
// largest column-index maximum it has seen per column so far.
type MaxAgg struct {
	// maxMap maps a schema field name to an *atomic.Pointer[parquet.Value]
	// holding the largest value observed for that field.
	maxMap sync.Map
	// columnName is the name of the aggregated column. For dynamic columns it
	// is the prefix; concrete fields are matched as columnName + ".".
	columnName string
	// dynamic reports whether columnName refers to a dynamic column.
	dynamic bool
}
// Eval reports whether p may raise the maximum recorded so far for the
// aggregated column.
//
// It visits every schema field that matches the aggregated column (exact
// name match for regular columns, "<columnName>." prefix match for dynamic
// columns), reads the maximum from the field's column index and, if that
// maximum is greater than the one stored in maxMap (or none is stored yet),
// stores it and returns true. Chunks whose values are all null are skipped.
// The second parameter (ignoreMissingCols) is unused.
//
// An error is returned when a column index cannot be retrieved; the
// underlying error is wrapped so callers can inspect it with errors.Is/As.
func (a *MaxAgg) Eval(p Particulate, _ bool) (bool, error) {
	processFurther := false
	for i, f := range p.Schema().Fields() {
		if (a.dynamic && !strings.HasPrefix(f.Name(), a.columnName+".")) || (!a.dynamic && f.Name() != a.columnName) {
			continue
		}

		// Column chunks are looked up by the field's position in the schema.
		chunk := p.ColumnChunks()[i]
		index, err := chunk.ColumnIndex()
		if err != nil {
			return false, fmt.Errorf("error retrieving column index in MaxAgg.Eval: %w", err)
		}

		// A chunk containing only nulls has no maximum to contribute.
		if NullCount(index) == chunk.NumValues() {
			continue
		}

		columnPointer, _ := a.maxMap.LoadOrStore(f.Name(), &atomic.Pointer[parquet.Value]{})
		atomicMax := columnPointer.(*atomic.Pointer[parquet.Value])

		v := Max(index)
		// Compare-and-swap loop: retry until either v is installed as the new
		// maximum or the stored maximum is no longer smaller than v.
		for globalMax := atomicMax.Load(); globalMax == nil || compare(v, *globalMax) > 0; globalMax = atomicMax.Load() {
			if atomicMax.CompareAndSwap(globalMax, &v) {
				processFurther = true
				break
			}
		}

		// A regular column matches a single field name, so once it has
		// produced a new maximum there is nothing left to look at.
		if !a.dynamic && processFurther {
			break
		}
	}
	return processFurther, nil
}
// AndExpr is the logical conjunction of two filters.
type AndExpr struct {
	Left  TrueNegativeFilter
	Right TrueNegativeFilter
}

// Eval evaluates Left and then Right, short-circuiting on the first operand
// that fails or reports false. It returns true only when both operands
// report true; whenever an operand returns an error the result is
// (false, err).
func (a *AndExpr) Eval(p Particulate, ignoreMissingCols bool) (bool, error) {
	for _, operand := range [...]TrueNegativeFilter{a.Left, a.Right} {
		matched, err := operand.Eval(p, ignoreMissingCols)
		if err != nil {
			return false, err
		}
		if !matched {
			return false, nil
		}
	}
	return true, nil
}
// OrExpr is the logical disjunction of two filters.
type OrExpr struct {
	Left  TrueNegativeFilter
	Right TrueNegativeFilter
}

// Eval evaluates Left and then Right, short-circuiting on the first operand
// that fails or reports true. It returns false only when both operands
// report false; whenever an operand returns an error the result is
// (false, err).
func (a *OrExpr) Eval(p Particulate, ignoreMissingCols bool) (bool, error) {
	for _, operand := range [...]TrueNegativeFilter{a.Left, a.Right} {
		matched, err := operand.Eval(p, ignoreMissingCols)
		if err != nil {
			return false, err
		}
		if matched {
			return true, nil
		}
	}
	return false, nil
}
// BooleanExpr converts a logical-plan expression into a TrueNegativeFilter.
//
// A nil expression yields an AlwaysTrueFilter. Binary expressions and
// aggregation functions are delegated to binaryBooleanExpr and
// aggregationExpr. Any other expression type results in an error naming the
// unsupported type.
func BooleanExpr(expr logicalplan.Expr) (TrueNegativeFilter, error) {
	switch e := expr.(type) {
	case nil:
		// No expression at all: nothing can be filtered out.
		return &AlwaysTrueFilter{}, nil
	case *logicalplan.BinaryExpr:
		return binaryBooleanExpr(e)
	case *logicalplan.AggregationFunction:
		return aggregationExpr(e)
	default:
		return nil, fmt.Errorf("unsupported boolean expression %T", e)
	}
}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.