package expr
import (
"errors"
"fmt"
"github.com/parquet-go/parquet-go"
"github.com/polarsignals/frostdb/query/logicalplan"
)
// ColumnRef identifies a column by name so that its column chunk can be
// looked up in a Particulate.
type ColumnRef struct {
// ColumnName is matched against the names of the schema's fields when the
// column is resolved.
ColumnName string
}
// Column resolves the referenced column inside p.
//
// It returns the column chunk located at the position of the schema field
// whose name equals c.ColumnName, together with true. When no field has that
// name it returns a nil chunk and false. The returned error is always nil.
func (c *ColumnRef) Column(p Particulate) (parquet.ColumnChunk, bool, error) {
	idx := findColumnIndex(p.Schema(), c.ColumnName)
	if idx == -1 {
		// The column is not part of this particulate's schema.
		return nil, false, nil
	}
	return p.ColumnChunks()[idx], true, nil
}
// findColumnIndex returns the position of the first field of s whose name is
// exactly columnName, or -1 when no field matches.
func findColumnIndex(s *parquet.Schema, columnName string) int {
	fields := s.Fields()
	for pos := 0; pos < len(fields); pos++ {
		if fields[pos].Name() != columnName {
			continue
		}
		return pos
	}
	return -1
}
// BinaryScalarExpr is a binary expression that relates a column (Left) to a
// constant scalar value (Right) through the operator Op.
type BinaryScalarExpr struct {
// Left is the column operand; Eval dereferences it unconditionally.
Left *ColumnRef
// Op is the operator applied between the column and the scalar.
Op logicalplan .Op
// Right is the scalar operand; it may be the null value.
Right parquet .Value
}
// Eval reports whether p may contain values satisfying the expression.
//
// When the left column exists in p the decision is delegated to
// BinaryScalarOperation. When it does not exist:
//   - with ignoreMissingCol set, the result is true;
//   - a null right-hand side yields true for OpEq and false for OpNotEq;
//   - a byte-array right-hand side yields true for OpEq against the empty
//     string and for OpNotEq against a non-empty string;
//   - everything else yields false.
//
// An error from resolving the column is returned with a false result.
func (e BinaryScalarExpr) Eval(p Particulate, ignoreMissingCol bool) (bool, error) {
	chunk, found, err := e.Left.Column(p)
	switch {
	case err != nil:
		return false, err
	case found:
		return BinaryScalarOperation(chunk, e.Right, e.Op)
	case ignoreMissingCol:
		return true, nil
	}

	// From here on the column is missing and must be reasoned about as such.
	isEq := e.Op == logicalplan.OpEq
	isNotEq := e.Op == logicalplan.OpNotEq

	if e.Right.IsNull() {
		if isEq {
			return true, nil
		}
		if isNotEq {
			return false, nil
		}
	}

	switch e.Right.Kind() {
	case parquet.ByteArray, parquet.FixedLenByteArray:
		// A missing column is treated like the empty string for byte values.
		if (isEq && e.Right.String() == "") || (isNotEq && e.Right.String() != "") {
			return true, nil
		}
	}

	return false, nil
}
// ErrUnsupportedBinaryOperation is a sentinel error describing a binary
// operation that is not supported.
// NOTE(review): nothing in the visible part of this file returns it;
// presumably it is used by callers elsewhere — confirm before removing.
var ErrUnsupportedBinaryOperation = errors .New ("unsupported binary operation" )
// BinaryScalarOperation reports whether the column chunk left may contain a
// value v for which "v <operator> right" holds. It is a pruning predicate:
// false means the chunk can be skipped, true means it has to be scanned.
//
// The decision is taken from the chunk's column index (null counts and
// per-page min/max bounds) and, for equality, from its bloom filter when one
// is present:
//
//   - OpEq with a null right matches when the chunk holds at least one null.
//   - OpEq with a non-null right never matches a chunk made only of nulls;
//     otherwise the bloom filter decides, or the [min, max] range when the
//     chunk has no bloom filter.
//   - Any other operator with a null right is answered with true.
//   - OpLt/OpLtEq compare the chunk minimum with right, OpGt/OpGtEq compare
//     the chunk maximum with right.
//   - Unknown operators are answered with true.
//
// A null min or max bound is treated as "unknown" and never used to prune.
//
// If the column index cannot be loaded or the bloom filter lookup fails, the
// error is returned together with true.
func BinaryScalarOperation(left parquet.ColumnChunk, right parquet.Value, operator logicalplan.Op) (bool, error) {
	leftColumnIndex, err := left.ColumnIndex()
	if err != nil {
		return true, err
	}

	numNulls := NullCount(leftColumnIndex)
	fullOfNulls := numNulls == left.NumValues()

	if operator == logicalplan.OpEq {
		if right.IsNull() {
			return numNulls > 0, nil
		}
		if fullOfNulls {
			return false, nil
		}

		bloomFilter := left.BloomFilter()
		if bloomFilter == nil {
			// Without a bloom filter fall back to a range check. Each bound
			// is only used when it is known: comparing against a null bound
			// would order the null as if it were a real value and could
			// wrongly exclude the chunk.
			if maxValue := Max(leftColumnIndex); !maxValue.IsNull() && compare(right, maxValue) > 0 {
				return false, nil
			}
			if minValue := Min(leftColumnIndex); !minValue.IsNull() && compare(right, minValue) < 0 {
				return false, nil
			}
			return true, nil
		}

		ok, err := bloomFilter.Check(right)
		if err != nil {
			return true, err
		}
		return ok, nil
	}

	if right.IsNull() {
		return true, nil
	}
	if fullOfNulls {
		// No non-null value exists that could satisfy a range comparison.
		return false, nil
	}

	switch operator {
	case logicalplan.OpLtEq:
		minValue := Min(leftColumnIndex)
		if minValue.IsNull() {
			return true, nil
		}
		return compare(minValue, right) <= 0, nil
	case logicalplan.OpLt:
		minValue := Min(leftColumnIndex)
		if minValue.IsNull() {
			return true, nil
		}
		return compare(minValue, right) < 0, nil
	case logicalplan.OpGt:
		maxValue := Max(leftColumnIndex)
		if maxValue.IsNull() {
			return true, nil
		}
		return compare(maxValue, right) > 0, nil
	case logicalplan.OpGtEq:
		maxValue := Max(leftColumnIndex)
		if maxValue.IsNull() {
			return true, nil
		}
		return compare(maxValue, right) >= 0, nil
	default:
		// Operators that cannot be answered from statistics never prune.
		return true, nil
	}
}
// Min returns the smallest per-page minimum recorded in columnIndex.
//
// Pages whose minimum is null are skipped (parquet-go appears to report a
// null bound for pages that hold only nulls — see its ColumnIndex docs), so
// that a null never takes part in an ordering comparison and never replaces
// a real minimum found on another page. The result is the null value when
// the index has no pages or no page carries a non-null minimum.
func Min(columnIndex parquet.ColumnIndex) parquet.Value {
	// The zero parquet.Value is the null value; it marks "nothing found yet".
	var minV parquet.Value
	for i, numPages := 0, columnIndex.NumPages(); i < numPages; i++ {
		v := columnIndex.MinValue(i)
		if v.IsNull() {
			continue
		}
		if minV.IsNull() || compare(minV, v) > 0 {
			minV = v
		}
	}
	return minV
}
// NullCount adds up the null counts that columnIndex reports for each of its
// pages and returns the total.
func NullCount(columnIndex parquet.ColumnIndex) int64 {
	var total int64
	for page, numPages := 0, columnIndex.NumPages(); page < numPages; page++ {
		total += columnIndex.NullCount(page)
	}
	return total
}
// Max returns the largest per-page maximum recorded in columnIndex.
//
// Pages whose maximum is null are skipped (parquet-go appears to report a
// null bound for pages that hold only nulls — see its ColumnIndex docs), so
// that a null never takes part in an ordering comparison and never replaces
// a real maximum found on another page. The result is the null value when
// the index has no pages or no page carries a non-null maximum.
func Max(columnIndex parquet.ColumnIndex) parquet.Value {
	// The zero parquet.Value is the null value; it marks "nothing found yet".
	var maxValue parquet.Value
	for i, numPages := 0, columnIndex.NumPages(); i < numPages; i++ {
		v := columnIndex.MaxValue(i)
		if v.IsNull() {
			continue
		}
		if maxValue.IsNull() || compare(maxValue, v) < 0 {
			maxValue = v
		}
	}
	return maxValue
}
// compare orders v1 relative to v2 using the parquet type that corresponds
// to the kind of v1 and returns that type's Compare result. Both byte-array
// kinds are compared with the byte-array type.
//
// It panics when the kind of v1 is not one of Boolean, Int32, Int64, Float,
// Double, ByteArray or FixedLenByteArray.
func compare(v1, v2 parquet.Value) int {
	var typ parquet.Type

	switch kind := v1.Kind(); kind {
	case parquet.Boolean:
		typ = parquet.BooleanType
	case parquet.Int32:
		typ = parquet.Int32Type
	case parquet.Int64:
		typ = parquet.Int64Type
	case parquet.Float:
		typ = parquet.FloatType
	case parquet.Double:
		typ = parquet.DoubleType
	case parquet.ByteArray:
		typ = parquet.ByteArrayType
	case parquet.FixedLenByteArray:
		typ = parquet.ByteArrayType
	default:
		panic(fmt.Sprintf("unsupported value comparison: %v", kind))
	}

	return typ.Compare(v1, v2)
}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.