package expr

import (
	
	

	

	
)

// ColumnRef identifies a column by name.
type ColumnRef struct {
	// ColumnName is the name handed to findColumnIndex when the column is
	// looked up in a schema.
	ColumnName string
}

// NOTE(review): the receiver name, method name, parameter name and local
// variable names are absent from this block as it appears here, so it does
// not compile as written. Restore them before relying on the notes below.
// Presumably this is the Column method that the BinaryScalarExpr method in
// this file calls through its Left field -- confirm.
//
// This *ColumnRef method takes a Particulate and returns a
// parquet.ColumnChunk, a bool and an error. The bool is the result of a
// comparison against -1, the value findColumnIndex returns when no field
// matches. The error result is always nil.
func ( *ColumnRef) ( Particulate) (parquet.ColumnChunk, bool, error) {
	// Look the column up by ColumnName in the schema.
	 := findColumnIndex(.Schema(), .ColumnName)
	var  parquet.ColumnChunk
	// columnChunk can be nil if the column is not present in the row group.
	if  != -1 {
		 = .ColumnChunks()[]
	}

	return ,  != -1, nil
}

// findColumnIndex ranges over the Fields() of a *parquet.Schema and compares
// each field's Name() with the given string. It returns -1 when no field
// matches.
//
// NOTE(review): the parameter names and loop variables are missing from this
// block as it appears here. Presumably the value returned on a match is the
// index of the matching field within Fields() -- confirm once the
// identifiers are restored.
func findColumnIndex( *parquet.Schema,  string) int {
	for ,  := range .Fields() {
		if .Name() ==  {
			return 
		}
	}
	return -1
}

// BinaryScalarExpr pairs a column reference (Left) with a scalar
// parquet.Value (Right) and the logicalplan.Op that relates them. Right and
// Op are the arguments forwarded to BinaryScalarOperation.
type BinaryScalarExpr struct {
	Left  *ColumnRef
	Op    logicalplan.Op
	Right parquet.Value
}

// NOTE(review): the receiver name, method name, parameter names and local
// variable names are absent from this block as it appears here, so it does
// not compile as written. The notes below describe only what the remaining
// tokens show.
//
// This BinaryScalarExpr method takes a Particulate and a bool and returns
// (bool, error). It first calls Left.Column(), which yields three results
// (column chunk, bool, error per the signature visible in this file). A
// non-nil value in the nil check that follows returns false together with
// that value. Two negated-bool checks follow; presumably both test the
// "column exists" result, and the first also involves the bool parameter --
// confirm. When neither early path applies, the result of
// BinaryScalarOperation with Right and Op is returned.
func ( BinaryScalarExpr) ( Particulate,  bool) (bool, error) {
	, ,  := .Left.Column()
	if  != nil {
		return false, 
	}

	if ! &&  {
		return true, nil
	}

	// TODO: This needs a bunch of test cases to validate edge cases like
	// non-existent columns or null values. I'm pretty sure this is completely
	// wrong and needs per operation, per type specific behavior.
	if ! {
		// A null Right value: OpEq returns true and OpNotEq returns false.
		// Any other operator continues to the checks below.
		if .Right.IsNull() {
			switch .Op {
			case logicalplan.OpEq:
				return true, nil
			case logicalplan.OpNotEq:
				return false, nil
			}
		}

		// only handling string for now.
		// For byte-array kinds: OpEq against the empty string and OpNotEq
		// against a non-empty string both return true.
		if .Right.Kind() == parquet.ByteArray || .Right.Kind() == parquet.FixedLenByteArray {
			switch {
			case .Op == logicalplan.OpEq && .Right.String() == "":
				return true, nil
			case .Op == logicalplan.OpNotEq && .Right.String() != "":
				return true, nil
			}
		}
		// Every remaining combination in this branch returns false.
		return false, nil
	}

	return BinaryScalarOperation(, .Right, .Op)
}

// ErrUnsupportedBinaryOperation is a package-level error value with the
// message "unsupported binary operation". It is not referenced anywhere else
// in the visible portion of this file.
var ErrUnsupportedBinaryOperation = errors.New("unsupported binary operation")

// BinaryScalarOperation applies the given operator between the given column
// chunk and value. If BinaryScalarOperation returns true, it means that the
// operator may be satisfied by at least one value in the column chunk. If it
// returns false, it means that the operator will definitely not be satisfied
// by any value in the column chunk.
//
// The two early returns that follow a non-nil check (after ColumnIndex() and
// after the bloom filter Check()) return true together with the checked
// value, which is presumably the error from that call -- confirm. Operators
// without a dedicated branch reach the default case and return true, unless
// an earlier check has already returned.
//
// NOTE(review): the function name on the func line, the parameter names and
// all local variable names are absent from this block as it appears here, so
// it does not compile as written. Restore them before relying on the inline
// notes.
func ( parquet.ColumnChunk,  parquet.Value,  logicalplan.Op) (bool, error) {
	,  := .ColumnIndex()
	if  != nil {
		return true, 
	}
	// NullCount (called here) and Min/Max (called below) are helpers that
	// walk the pages of a parquet.ColumnIndex.
	 := NullCount()
	// A bool holding the result of an equality test against NumValues();
	// presumably "null count equals the number of values" -- confirm.
	 :=  == .NumValues()
	if  == logicalplan.OpEq {
		if .IsNull() {
			return  > 0, nil
		}
		if  {
			// Right value is not null and there are no non-null values, so
			// there is definitely not a match.
			return false, nil
		}

		 := .BloomFilter()
		if  == nil {
			// If there is no bloom filter then we cannot make a statement
			// about true negative, instead check the min max values of the
			// column chunk.
			return compare(, Max()) <= 0 && compare(, Min()) >= 0, nil
		}

		,  := .Check()
		if  != nil {
			return true, 
		}
		if ! {
			// Bloom filters may return false positives, but never return false
			// negatives, we know this column chunk does not contain the value.
			return false, nil
		}

		return true, nil
	}

	// If right is NULL automatically return that the column chunk needs further
	// processing. According to SQL semantics we might be able to elide column
	// chunks in these cases since NULL is not comparable to anything else, but
	// play it safe (delegate to execution engine) for now since we shouldn't
	// have many of these cases.
	if .IsNull() {
		return true, nil
	}

	if  == .NumValues() {
		// In this case min/max values are meaningless and not comparable to the
		// right expression, so we can automatically discard the column chunk.
		return false, nil
	}

	// Range operators are decided from the chunk-wide Min or Max only.
	switch  {
	case logicalplan.OpLtEq:
		 := Min()
		if .IsNull() {
			// If min is null, we don't know what the non-null min value is, so
			// we need to let the execution engine scan this column chunk
			// further.
			return true, nil
		}
		return compare(, ) <= 0, nil
	case logicalplan.OpLt:
		 := Min()
		if .IsNull() {
			// If min is null, we don't know what the non-null min value is, so
			// we need to let the execution engine scan this column chunk
			// further.
			return true, nil
		}
		return compare(, ) < 0, nil
	case logicalplan.OpGt:
		 := Max()
		if .IsNull() {
			// If max is null, we don't know what the non-null max value is, so
			// we need to let the execution engine scan this column chunk
			// further.
			return true, nil
		}
		return compare(, ) > 0, nil
	case logicalplan.OpGtEq:
		 := Max()
		if .IsNull() {
			// If max is null, we don't know what the non-null max value is, so
			// we need to let the execution engine scan this column chunk
			// further.
			return true, nil
		}
		return compare(, ) >= 0, nil
	default:
		// Operators without a dedicated branch cannot be ruled out here.
		return true, nil
	}
}

// Min returns the minimum value found in the column chunk across all pages.
//
// It starts from MinValue(0) and then visits the remaining pages up to
// NumPages(), using compare to decide when to replace the running value.
// MinValue(0) is read before the loop, so page 0 is accessed regardless of
// what NumPages() reports.
//
// NOTE(review): the function name on the func line, the parameter name and
// the local variable names are absent from this block as it appears here.
// Which of the two values the IsNull() check inspects cannot be determined
// from the remaining tokens -- confirm once the identifiers are restored.
func ( parquet.ColumnIndex) parquet.Value {
	 := .MinValue(0)
	for  := 1;  < .NumPages(); ++ {
		 := .MinValue()
		if .IsNull() {
			 = 
			continue
		}

		// compare returning 1 means its first argument is the greater one.
		if compare(, ) == 1 {
			 = 
		}
	}

	return 
}

// NOTE(review): the function name, the parameter name and the local variable
// names are absent from this block as it appears here. Presumably this is the
// NullCount helper called by BinaryScalarOperation -- confirm.
//
// The function takes a parquet.ColumnIndex and returns an int64 that starts
// at zero and is increased by the result of the index's NullCount call once
// per iteration, for NumPages() iterations.
func ( parquet.ColumnIndex) int64 {
	 := int64(0)
	for  := 0;  < .NumPages(); ++ {
		 += .NullCount()
	}
	return 
}

// Max returns the maximum value found in the column chunk across all pages.
//
// It starts from MaxValue(0) and then visits the remaining pages up to
// NumPages(), using compare to decide when to replace the running value.
// MaxValue(0) is read before the loop, so page 0 is accessed regardless of
// what NumPages() reports.
//
// NOTE(review): the function name on the func line, the parameter name and
// the local variable names are absent from this block as it appears here.
// Which of the two values the IsNull() check inspects cannot be determined
// from the remaining tokens -- confirm once the identifiers are restored.
func ( parquet.ColumnIndex) parquet.Value {
	 := .MaxValue(0)
	for  := 1;  < .NumPages(); ++ {
		 := .MaxValue()
		if .IsNull() {
			 = 
			continue
		}

		// compare returning -1 means its first argument is the smaller one.
		if compare(, ) == -1 {
			 = 
		}
	}

	return 
}

// compare compares two parquet values: 0 if they are equal, -1 if v1 < v2,
// 1 if v1 > v2.
//
// It switches on a value's Kind() and delegates to the Compare method of the
// matching parquet type. ByteArray and FixedLenByteArray both go through
// parquet.ByteArrayType. Any other kind panics with an
// "unsupported value comparison" message.
//
// NOTE(review): the parameter names are absent from this block as it appears
// here, so which argument's Kind() drives the switch (presumably the first)
// cannot be confirmed from the remaining tokens.
func compare(,  parquet.Value) int {
	switch .Kind() {
	case parquet.Int32:
		return parquet.Int32Type.Compare(, )
	case parquet.Int64:
		return parquet.Int64Type.Compare(, )
	case parquet.Float:
		return parquet.FloatType.Compare(, )
	case parquet.Double:
		return parquet.DoubleType.Compare(, )
	case parquet.ByteArray, parquet.FixedLenByteArray:
		return parquet.ByteArrayType.Compare(, )
	case parquet.Boolean:
		return parquet.BooleanType.Compare(, )
	default:
		panic(fmt.Sprintf("unsupported value comparison: %v", .Kind()))
	}
}