package expr

import (
	
	
	
	
	

	

	
	
)

type PreExprVisitorFunc func(expr logicalplan.Expr) bool

func ( PreExprVisitorFunc) ( logicalplan.Expr) bool {
	return ()
}

func ( PreExprVisitorFunc) ( logicalplan.Expr) bool {
	return false
}

func ( PreExprVisitorFunc) ( logicalplan.Expr) bool {
	return false
}

// Particulate is an abstraction of something that can be filtered.
// A parquet.RowGroup is a particulate that is able to be filtered, and wrapping a parquet.File with
// ParquetFileParticulate allows a file to be filtered.
type Particulate interface {
	Schema() *parquet.Schema
	ColumnChunks() []parquet.ColumnChunk
}

type TrueNegativeFilter interface {
	// Eval should be safe to call concurrently.
	Eval(p Particulate, ignoreMissingCols bool) (bool, error)
}

type AlwaysTrueFilter struct{}

func ( *AlwaysTrueFilter) ( Particulate,  bool) (bool, error) {
	return true, nil
}

func binaryBooleanExpr( *logicalplan.BinaryExpr) (TrueNegativeFilter, error) {
	switch .Op {
	case logicalplan.OpNotEq:
		fallthrough
	case logicalplan.OpLt:
		fallthrough
	case logicalplan.OpLtEq:
		fallthrough
	case logicalplan.OpGt:
		fallthrough
	case logicalplan.OpGtEq:
		fallthrough
	case logicalplan.OpEq: // , logicalplan.OpNotEq, logicalplan.OpLt, logicalplan.OpLtEq, logicalplan.OpGt, logicalplan.OpGtEq, logicalplan.OpRegexMatch, logicalplan.RegexNotMatch:
		var  *ColumnRef
		.Left.Accept(PreExprVisitorFunc(func( logicalplan.Expr) bool {
			switch e := .(type) {
			case *logicalplan.Column:
				 = &ColumnRef{
					ColumnName: .ColumnName,
				}
				return false
			}
			return true
		}))
		if  == nil {
			return nil, errors.New("left side of binary expression must be a column")
		}

		var (
			 parquet.Value
			        error
		)
		.Right.Accept(PreExprVisitorFunc(func( logicalplan.Expr) bool {
			switch e := .(type) {
			case *logicalplan.LiteralExpr:
				,  = pqarrow.ArrowScalarToParquetValue(.Value)
				return false
			}
			return true
		}))

		if  != nil {
			return nil, 
		}

		return &BinaryScalarExpr{
			Left:  ,
			Op:    .Op,
			Right: ,
		}, nil
	case logicalplan.OpAnd:
		,  := BooleanExpr(.Left)
		if  != nil {
			return nil, 
		}

		,  := BooleanExpr(.Right)
		if  != nil {
			return nil, 
		}

		return &AndExpr{
			Left:  ,
			Right: ,
		}, nil
	case logicalplan.OpOr:
		,  := BooleanExpr(.Left)
		if  != nil {
			return nil, 
		}

		,  := BooleanExpr(.Right)
		if  != nil {
			return nil, 
		}

		return &OrExpr{
			Left:  ,
			Right: ,
		}, nil
	default:
		return &AlwaysTrueFilter{}, nil
	}
}

func aggregationExpr( *logicalplan.AggregationFunction) (TrueNegativeFilter, error) {
	switch .Func {
	case logicalplan.AggFuncMax:
		 := &MaxAgg{}
		.Expr.Accept(PreExprVisitorFunc(func( logicalplan.Expr) bool {
			switch e := .(type) {
			case *logicalplan.Column:
				.columnName = .ColumnName
			case *logicalplan.DynamicColumn:
				.columnName = .ColumnName
				.dynamic = true
			default:
				return true
			}
			return false
		}))
		return , nil
	default:
		return &AlwaysTrueFilter{}, nil
	}
}

type MaxAgg struct {
	maxMap sync.Map

	// It would be nicer to use a dynamic-aware ColumnRef here, but that would
	// introduce allocations (slices for indexes and concrete names), so for
	// performance reasons we execute the column lookup manually.
	columnName string
	dynamic    bool
}

func ( *MaxAgg) ( Particulate,  bool) (bool, error) {
	 := false
	for ,  := range .Schema().Fields() {
		if (.dynamic && !strings.HasPrefix(.Name(), .columnName+".")) || (!.dynamic && .Name() != .columnName) {
			continue
		}

		 := .ColumnChunks()[]
		,  := .ColumnIndex()
		if  != nil {
			return false, fmt.Errorf("error retrieving column index in MaxAgg.Eval")
		}
		if NullCount() == .NumValues() {
			// This page is full of nulls. Nothing to do since we can't trust
			// min/max index values. This chunk should not be processed since
			// it can't contribute to min/max unless another column can.
			continue
		}

		,  := .maxMap.LoadOrStore(.Name(), &atomic.Pointer[parquet.Value]{})
		 := .(*atomic.Pointer[parquet.Value])

		 := Max()
		for  := .Load();  == nil || compare(, *) > 0;  = .Load() {
			if .CompareAndSwap(, &) {
				// At least one column exceeded the current max so this chunk
				// satisfies the filter. Note that we do not break out of
				// scanning the rest of the columns since we do want to memoize
				// the max for other columns as well.
				 = true
				break
			}
		}
		if !.dynamic &&  {
			// No need to look at the remaining columns if we're only looking
			// for a single concrete column.
			break
		}
	}
	return , nil
}

type AndExpr struct {
	Left  TrueNegativeFilter
	Right TrueNegativeFilter
}

func ( *AndExpr) ( Particulate,  bool) (bool, error) {
	,  := .Left.Eval(, )
	if  != nil {
		return false, 
	}
	if ! {
		return false, nil
	}

	,  := .Right.Eval(, )
	if  != nil {
		return false, 
	}
	return , nil
}

type OrExpr struct {
	Left  TrueNegativeFilter
	Right TrueNegativeFilter
}

func ( *OrExpr) ( Particulate,  bool) (bool, error) {
	,  := .Left.Eval(, )
	if  != nil {
		return false, 
	}
	if  {
		return true, nil
	}

	,  := .Right.Eval(, )
	if  != nil {
		return false, 
	}

	return , nil
}

func ( logicalplan.Expr) (TrueNegativeFilter, error) {
	if  == nil {
		return &AlwaysTrueFilter{}, nil
	}

	switch e := .(type) {
	case *logicalplan.BinaryExpr:
		return binaryBooleanExpr()
	case *logicalplan.AggregationFunction:
		// NOTE: Aggregations are optimized in the case of no grouping columns
		// or other filters.
		return aggregationExpr()
	default:
		return nil, fmt.Errorf("unsupported boolean expression %T", )
	}
}