package physicalplan

import (
	
	
	
	

	
	
	
	
	
	

	
)

// PredicateFilter is a plan node that pairs a boolean filter expression with
// the next node of the plan.
type PredicateFilter struct {
	// pool is the allocator passed to filter together with filterExpr.
	pool       memory.Allocator
	// tracer is stored by newFilter; in this file it is only referenced by
	// commented-out span code.
	tracer     trace.Tracer
	// filterExpr is the predicate handed to filter for each record.
	filterExpr BooleanExpression
	// next is the downstream plan node. It is not set by newFilter, so it is
	// nil until assigned.
	next       PhysicalPlan
}

// NOTE(review): the receiver, method and local identifiers are missing from
// this definition as it appears in the source, so it does not parse; restore
// them before building. The method is presumably named Draw, mirroring the
// next.Draw() call it delegates to — confirm against the PhysicalPlan
// interface.
//
// The method builds a Diagram node for this filter. It formats the text
// "PredicateFilter (<filter expression>)" from filterExpr.String() and asks
// next for its own diagram only when next is non-nil; presumably the text
// becomes Details and the downstream diagram becomes Child.
func ( *PredicateFilter) () *Diagram {
	var  *Diagram
	if .next != nil {
		 = .next.Draw()
	}
	 := fmt.Sprintf("PredicateFilter (%s)", .filterExpr.String())
	return &Diagram{Details: , Child: }
}

// Bitmap is an alias for roaring.Bitmap. It is the result type of
// BooleanExpression.Eval.
type Bitmap = roaring.Bitmap

// NOTE(review): the function name is missing from this definition as it
// appears in the source, so it does not parse; restore it before building.
//
// The function returns a fresh bitmap created with roaring.New.
func () *Bitmap {
	return roaring.New()
}

// BooleanExpression is a predicate that can be evaluated against an Arrow
// record.
type BooleanExpression interface {
	// Eval evaluates the expression against r and returns the resulting bitmap
	// or an error. filter converts the bitmap to a slice of indices and keeps
	// the record rows at those indices.
	Eval(r arrow.Record) (*Bitmap, error)
	// String returns a textual form of the expression; it is used when
	// composing AND/OR descriptions and the plan diagram text.
	String() string
}

// ErrUnsupportedBooleanExpression is returned, directly by booleanExpr or
// wrapped by binaryBooleanExpr, when a logical-plan expression cannot be
// converted into a BooleanExpression.
var ErrUnsupportedBooleanExpression = errors.New("unsupported boolean expression")

// ArrayReference is an empty struct that nothing in the visible part of this
// file uses.
// NOTE(review): binaryBooleanExpr builds ArrayRef values, which is a different
// type — confirm whether ArrayReference is still needed.
type ArrayReference struct{}

// PreExprVisitorFunc is a function type that binaryBooleanExpr converts
// closures to before handing them to logicalplan.Expr.Accept.
//
// NOTE(review): the receiver, method and parameter identifiers of the three
// methods below are missing in the source, so they do not parse. Presumably
// they are the methods required by the visitor interface that Accept expects —
// confirm their names there before restoring them.
type PreExprVisitorFunc func(expr logicalplan.Expr) bool

// This method returns the result of a call; presumably it invokes the wrapped
// function with the expression it is given — confirm once the identifiers are
// restored.
func ( PreExprVisitorFunc) ( logicalplan.Expr) bool {
	return ()
}

// This method ignores its argument and always returns false.
func ( PreExprVisitorFunc) ( logicalplan.Expr) bool {
	return false
}

// This method ignores its argument and always returns false.
func ( PreExprVisitorFunc) ( logicalplan.Expr) bool {
	return false
}

func binaryBooleanExpr( *logicalplan.BinaryExpr) (BooleanExpression, error) {
	switch .Op {
	case logicalplan.OpEq,
		logicalplan.OpNotEq,
		logicalplan.OpLt,
		logicalplan.OpLtEq,
		logicalplan.OpGt,
		logicalplan.OpGtEq,
		logicalplan.OpRegexMatch,
		logicalplan.OpRegexNotMatch,
		logicalplan.OpAdd,
		logicalplan.OpSub,
		logicalplan.OpMul,
		logicalplan.OpDiv,
		logicalplan.OpContains,
		logicalplan.OpNotContains:
		var  *ArrayRef
		.Left.Accept(PreExprVisitorFunc(func( logicalplan.Expr) bool {
			switch e := .(type) {
			case *logicalplan.Column:
				 = &ArrayRef{
					ColumnName: .ColumnName,
				}
				return false
			}
			return true
		}))
		if  == nil {
			return nil, errors.New("left side of binary expression must be a column")
		}

		var  scalar.Scalar
		.Right.Accept(PreExprVisitorFunc(func( logicalplan.Expr) bool {
			switch e := .(type) {
			case *logicalplan.LiteralExpr:
				 = .Value
				return false
			}
			return true
		}))

		switch .Op {
		case logicalplan.OpRegexMatch:
			,  := regexp.Compile(string(.(*scalar.String).Data()))
			if  != nil {
				return nil, 
			}
			return &RegExpFilter{
				left:  ,
				right: ,
			}, nil
		case logicalplan.OpRegexNotMatch:
			,  := regexp.Compile(string(.(*scalar.String).Data()))
			if  != nil {
				return nil, 
			}
			return &RegExpFilter{
				left:     ,
				right:    ,
				notMatch: true,
			}, nil
		}

		return &BinaryScalarExpr{
			Left:  ,
			Op:    .Op,
			Right: ,
		}, nil
	case logicalplan.OpAnd:
		,  := booleanExpr(.Left)
		if  != nil {
			return nil, fmt.Errorf("left bool expr: %w", )
		}

		,  := booleanExpr(.Right)
		if  != nil {
			return nil, fmt.Errorf("right bool expr: %w", )
		}

		return &AndExpr{
			Left:  ,
			Right: ,
		}, nil
	case logicalplan.OpOr:
		,  := booleanExpr(.Left)
		if  != nil {
			return nil, fmt.Errorf("left bool expr: %w", )
		}

		,  := booleanExpr(.Right)
		if  != nil {
			return nil, fmt.Errorf("right bool expr: %w", )
		}

		return &OrExpr{
			Left:  ,
			Right: ,
		}, nil
	default:
		return nil, fmt.Errorf("binary expr %s: %w", .Op.String(), ErrUnsupportedBooleanExpression)
	}
}

type AndExpr struct {
	Left  BooleanExpression
	Right BooleanExpression
}

func ( *AndExpr) ( arrow.Record) (*Bitmap, error) {
	,  := .Left.Eval()
	if  != nil {
		return nil, 
	}

	if .IsEmpty() {
		return , nil
	}

	,  := .Right.Eval()
	if  != nil {
		return nil, 
	}

	// This stores the result in place to avoid allocations.
	.And()
	return , nil
}

func ( *AndExpr) () string {
	return "(" + .Left.String() + " AND " + .Right.String() + ")"
}

type OrExpr struct {
	Left  BooleanExpression
	Right BooleanExpression
}

func ( *OrExpr) ( arrow.Record) (*Bitmap, error) {
	,  := .Left.Eval()
	if  != nil {
		return nil, 
	}

	,  := .Right.Eval()
	if  != nil {
		return nil, 
	}

	// This stores the result in place to avoid allocations.
	.Or()
	return , nil
}

func ( *OrExpr) () string {
	return "(" + .Left.String() + " OR " + .Right.String() + ")"
}

func booleanExpr( logicalplan.Expr) (BooleanExpression, error) {
	switch e := .(type) {
	case *logicalplan.BinaryExpr:
		return binaryBooleanExpr()
	default:
		return nil, ErrUnsupportedBooleanExpression
	}
}

// NOTE(review): the function name and every parameter and local identifier
// are missing from this definition as it appears in the source, so it does not
// parse; restore them before building.
//
// The function takes an allocator, a tracer and a logical-plan expression and
// returns a PredicateFilter. It calls booleanExpr, reports a failure wrapped
// as "create bool expr: ...", and otherwise returns the filter built by
// newFilter.
func ( memory.Allocator,  trace.Tracer,  logicalplan.Expr) (*PredicateFilter, error) {
	,  := booleanExpr()
	if  != nil {
		return nil, fmt.Errorf("create bool expr: %w", )
	}

	return newFilter(, , ), nil
}

func newFilter( memory.Allocator,  trace.Tracer,  BooleanExpression) *PredicateFilter {
	return &PredicateFilter{
		pool:       ,
		tracer:     ,
		filterExpr: ,
	}
}

// NOTE(review): the receiver, method and parameter identifiers are missing
// from this definition as it appears in the source, so it does not parse;
// restore them before building.
//
// The method assigns the filter's next field; presumably the assigned value
// is the PhysicalPlan argument — confirm once the identifiers are restored.
func ( *PredicateFilter) ( PhysicalPlan) {
	.next = 
}

// NOTE(review): the receiver and method identifiers are missing from this
// definition as it appears in the source, so it does not parse; restore them
// before building. The method is presumably named Close, mirroring the call
// it delegates to — confirm against the PhysicalPlan interface.
//
// The method forwards to next.Close().
// NOTE(review): unlike the diagram method, next is not checked for nil here —
// confirm that next is always set before this is called.
func ( *PredicateFilter) () {
	.next.Close()
}

// NOTE(review): the receiver, method, parameter and local identifiers are
// missing from this definition as it appears in the source, so it does not
// parse; restore them before building. The method is presumably named
// Callback, mirroring the call it delegates to — confirm against the
// PhysicalPlan interface.
//
// The method runs filter with the filter's pool and filterExpr. A filter error
// is returned as-is; when filter reports an empty result the method returns
// nil without calling next. Otherwise the record returned by filter has its
// Release deferred and next.Callback is called.
func ( *PredicateFilter) ( context.Context,  arrow.Record) error {
	// Generates high volume of spans. Comment out if needed during development.
	// ctx, span := f.tracer.Start(ctx, "PredicateFilter/Callback")
	// defer span.End()

	, ,  := filter(.pool, .filterExpr, )
	if  != nil {
		return 
	}
	if  {
		return nil
	}

	defer .Release()
	return .next.Callback(, )
}

// NOTE(review): the receiver, method and parameter identifiers are missing
// from this definition as it appears in the source, so it does not parse;
// restore them before building. The method is presumably named Finish,
// mirroring the call it delegates to — confirm against the PhysicalPlan
// interface.
//
// The method returns the result of next.Finish; presumably the context is
// passed through — confirm once the identifiers are restored.
func ( *PredicateFilter) ( context.Context) error {
	return .next.Finish()
}

func filter( memory.Allocator,  BooleanExpression,  arrow.Record) (arrow.Record, bool, error) {
	,  := .Eval()
	if  != nil {
		return nil, true, 
	}

	if .IsEmpty() {
		return nil, true, nil
	}

	 := .ToArray()
	 := buildIndexRanges()

	 := int64(0)
	 := make([]arrow.Record, len())
	defer func() {
		for ,  := range  {
			.Release()
		}
	}()
	for ,  := range  {
		[] = .NewSlice(int64(.Start), int64(.End))
		 += int64(.End - .Start)
	}

	 := make([]arrow.Array, 0, .NumCols())
	defer func() {
		for ,  := range  {
			.Release()
		}
	}()
	 := len()
	for  := range .Columns() {
		 := make([]arrow.Array, 0, )
		for ,  := range  {
			 = append(, .Column())
		}

		,  := array.Concatenate(, )
		if  != nil {
			return nil, true, 
		}

		 = append(, )
	}

	return array.NewRecord(.Schema(), , ), false, nil
}

// IndexRange is a half-open interval [Start, End) of indices.
type IndexRange struct {
	Start uint32
	End   uint32
}

// buildIndexRanges returns the set of contiguous index ranges covered by the
// given indices.
// ex: [1,2,7,8,9] would return [{Start:1, End:3},{Start:7,End:10}]
//
// A new range is started whenever an index is not exactly one past the previous
// one, so the input is expected to be sorted in ascending order. An empty input
// yields an empty (non-nil) result.
func buildIndexRanges(indices []uint32) []IndexRange {
	ranges := []IndexRange{}
	if len(indices) == 0 {
		return ranges
	}

	cur := IndexRange{
		Start: indices[0],
		End:   indices[0] + 1,
	}

	for _, idx := range indices[1:] {
		if idx == cur.End {
			// idx extends the current run by one.
			cur.End++
			continue
		}

		// Gap found: close the current run and open a new one at idx.
		ranges = append(ranges, cur)
		cur = IndexRange{
			Start: idx,
			End:   idx + 1,
		}
	}

	ranges = append(ranges, cur)
	return ranges
}