package logicalplan

import (
	
	
	

	
	
	
)

// PlanValidationError is the error representing a logical plan that is not valid.
type PlanValidationError struct {
	message  string
	plan     *LogicalPlan
	children []*ExprValidationError
	input    *PlanValidationError
}

// PlanValidationError.Error prints the error message in a human-readable format.
// implements the error interface.
func ( *PlanValidationError) () string {
	 := make([]string, 0)
	 = append(, .message)
	 = append(, "\n")
	 = append(, .plan.String())

	for ,  := range .children {
		 = append(, "\n  -> invalid expression: ")
		 = append(, .Error())
		 = append(, "\n")
	}

	if .input != nil {
		 = append(, "-> invalid input: ")
		 = append(, .input.())
	}

	return strings.Join(, "")
}

// ExprValidationError is the error for an invalid expression that was found during validation.
type ExprValidationError struct {
	message  string
	expr     Expr
	children []*ExprValidationError
}

// ExprValidationError.Error prints the error message in a human-readable format.
// implements the error interface.
func ( *ExprValidationError) () string {
	 := make([]string, 0)
	 = append(, .message)
	 = append(, ": ")
	 = append(, .expr.String())
	for ,  := range .children {
		 = append(, "\n     -> invalid sub-expression: ")
		 = append(, .())
	}

	return strings.Join(, "")
}

// Validate validates the logical plan.
func ( *LogicalPlan) error {
	 := ValidateSingleFieldSet()
	if  == nil {
		switch {
		case .SchemaScan != nil:
			 = ValidateSchemaScan()
		case .TableScan != nil:
			 = ValidateTableScan()
		case .Filter != nil:
			 = ValidateFilter()
		case .Distinct != nil:
			 = nil
		case .Projection != nil:
			 = nil
		case .Aggregation != nil:
			 = ValidateAggregation()
		}
	}

	// traverse backwards up the plan to validate all inputs
	 := ValidateInput()
	if  != nil {
		if  == nil {
			 = 
		} else {
			.input = 
		}
	}

	if  != nil {
		return 
	}
	return nil
}

// ValidateSingleFieldSet checks that only a single field is set on the plan.
func ( *LogicalPlan) *PlanValidationError {
	 := make([]int, 0)
	if .SchemaScan != nil {
		 = append(, 0)
	}
	if .TableScan != nil {
		 = append(, 1)
	}
	if .Filter != nil {
		 = append(, 2)
	}
	if .Distinct != nil {
		 = append(, 3)
	}
	if .Projection != nil {
		 = append(, 4)
	}
	if .Aggregation != nil {
		 = append(, 5)
	}
	if .Limit != nil {
		 = append(, 6)
	}
	if .Sample != nil {
		 = append(, 7)
	}

	if len() != 1 {
		 := make([]string, 0)
		 := []string{"SchemaScan", "TableScan", "Filter", "Distinct", "Projection", "Aggregation", "Limit", "Sample"}
		for ,  := range  {
			 = append(, [])
		}

		 := make([]string, 0)
		 = append(,
			fmt.Sprintf("invalid number of fields. expected: 1, found: %d (%s). plan must only have one of the following: ",
				len(),
				strings.Join(, ", "),
			),
		)
		 = append(, strings.Join(, ", "))

		return &PlanValidationError{
			plan:    ,
			message: strings.Join(, ""),
		}
	}
	return nil
}

func ( *LogicalPlan) *PlanValidationError {
	if .SchemaScan.TableProvider == nil {
		return &PlanValidationError{
			plan:    ,
			message: "table provider must not be nil",
		}
	}

	if .SchemaScan.TableName == "" {
		return &PlanValidationError{
			plan:    ,
			message: "table name must not be empty",
		}
	}

	,  := .SchemaScan.TableProvider.GetTable(.SchemaScan.TableName)
	if  != nil {
		return &PlanValidationError{
			plan:    ,
			message: fmt.Sprintf("failed to get table: %s", ),
		}
	}
	if  == nil {
		return &PlanValidationError{
			plan:    ,
			message: "table not found",
		}
	}

	 := .Schema()
	if  == nil {
		return &PlanValidationError{
			plan:    ,
			message: "table schema must not be nil",
		}
	}

	return nil
}

func ( *LogicalPlan) *PlanValidationError {
	if .TableScan.TableProvider == nil {
		return &PlanValidationError{
			plan:    ,
			message: "table provider must not be nil",
		}
	}

	if .TableScan.TableName == "" {
		return &PlanValidationError{
			plan:    ,
			message: "table name must not be empty",
		}
	}

	,  := .TableScan.TableProvider.GetTable(.TableScan.TableName)
	if  != nil {
		return &PlanValidationError{
			plan:    ,
			message: fmt.Sprintf("failed to get table: %s", ),
		}
	}
	if  == nil {
		return &PlanValidationError{
			plan:    ,
			message: "table not found",
		}
	}

	 := .Schema()
	if  == nil {
		return &PlanValidationError{
			plan:    ,
			message: "table schema must not be nil",
		}
	}

	return nil
}

// ValidateAggregation validates the logical plan's aggregation step.
func ( *LogicalPlan) *PlanValidationError {
	// check that the expression is not nil
	if len(.Aggregation.AggExprs) == 0 {
		return &PlanValidationError{
			plan:    ,
			message: "invalid aggregation: expression cannot be nil",
		}
	}

	// check that the expression is valid
	 := ValidateAggregationExpr()
	if  != nil {
		return &PlanValidationError{
			plan:     ,
			message:  "invalid aggregation",
			children: []*ExprValidationError{},
		}
	}

	return nil
}

type Named interface {
	Name() string
}

func ( *LogicalPlan) *ExprValidationError {
	for ,  := range .Aggregation.AggExprs {
		,  := .Expr.DataType(.Input)
		if  != nil {
			return &ExprValidationError{
				expr:    .Expr,
				message: fmt.Errorf("get type of expression to aggregate: %w", ).Error(),
			}
		}

		if  == nil {
			return &ExprValidationError{
				expr:    .Expr,
				message: "invalid aggregation: expression type cannot be determined",
			}
		}

		switch .Func {
		case AggFuncSum, AggFuncMin, AggFuncMax, AggFuncCount, AggFuncAvg, AggFuncUnique:
			switch  {
			case
				arrow.PrimitiveTypes.Int64,
				arrow.PrimitiveTypes.Uint64,
				arrow.PrimitiveTypes.Float64:
				// valid
			default:
				return &ExprValidationError{
					expr:    .Expr,
					message: fmt.Errorf("invalid aggregation: expression type %s is not supported", ).Error(),
				}
			}
		case AggFuncAnd:
			if  != arrow.FixedWidthTypes.Boolean {
				return &ExprValidationError{
					expr:    .Expr,
					message: fmt.Errorf("invalid aggregation: and aggregations can only aggregate bool type expressions, not %s", ).Error(),
				}
			}
		}
	}

	return nil
}

// ValidateInput validates that the current logical plans input is valid.
// It returns nil if the plan has no input.
func ( *LogicalPlan) *PlanValidationError {
	if .Input != nil {
		 := Validate(.Input)
		if  != nil {
			,  := .(*PlanValidationError)
			if ! {
				// if we are here it is a bug in the code
				panic(fmt.Sprintf("Unexpected error: %v expected a PlanValidationError", ))
			}
			return 
		}
	}
	return nil
}

// ValidateFilter validates the logical plan's filter step.
func ( *LogicalPlan) *PlanValidationError {
	if  := ValidateFilterExpr(, .Filter.Expr);  != nil {
		return &PlanValidationError{
			message:  "invalid filter",
			plan:     ,
			children: []*ExprValidationError{},
		}
	}
	return nil
}

// ValidateFilterExpr validates filter's expression.
func ( *LogicalPlan,  Expr) *ExprValidationError {
	switch expr := .(type) {
	case *BinaryExpr:
		 := ValidateFilterBinaryExpr(, )
		return 
	}

	return nil
}

// ValidateFilterBinaryExpr validates the filter's binary expression.
func ( *LogicalPlan,  *BinaryExpr) *ExprValidationError {
	if .Op == OpAnd || .Op == OpOr {
		return ValidateFilterAndBinaryExpr(, )
	}

	// try to find the column expression on the left side of the binary expression
	 := newTypeFinder((*Column)(nil))
	.Left.Accept(&)
	if .result == nil {
		return &ExprValidationError{
			message: "left side of binary expression must be a column",
			expr:    ,
		}
	}

	// try to find the column in the schema
	 := .result.(*Column)
	 := .InputSchema()
	if  != nil {
		,  := .ColumnByName(.ColumnName)
		if  {
			// try to find the literal on the other side of the expression
			 := newTypeFinder((*LiteralExpr)(nil))
			.Right.Accept(&)
			if .result != nil {
				// ensure that the column type is compatible with the literal being compared to it
				 := .StorageLayout.Type()
				 := .result.(*LiteralExpr)
				if  := ValidateComparingTypes(.LogicalType(), .Value);  != nil {
					.expr = 
					return 
				}
			}
		}
	}

	return nil
}

// ValidateComparingTypes validates if the types being compared by a binary expression are compatible.
func ( *format.LogicalType,  scalar.Scalar) *ExprValidationError {
	switch {
	// if the columns logical type is nil, it may be of type bool
	case  == nil:
		switch t := .(type) {
		case *scalar.Boolean:
			return nil
		default:
			return &ExprValidationError{
				// TODO: this is probably correct? We should probably rewrite the query to be comparing against a bool I think...
				message: fmt.Sprintf("incompatible types: nil logical type column cannot be compared with %v", ),
			}
		}

	// if the column is a string type, it shouldn't be compared to a number
	case .UTF8 != nil:
		switch .(type) {
		case *scalar.Float64:
			return &ExprValidationError{
				message: "incompatible types: string column cannot be compared with numeric literal",
			}
		case *scalar.Int64:
			return &ExprValidationError{
				message: "incompatible types: string column cannot be compared with numeric literal",
			}
		}
	// if the column is a numeric type, it shouldn't be compared to a string
	case .Integer != nil:
		switch .(type) {
		case *scalar.String:
			return &ExprValidationError{
				message: "incompatible types: numeric column cannot be compared with string literal",
			}
		}
	}
	return nil
}

// ValidateFilterAndBinaryExpr validates the filter's binary expression where Op = AND.
func ( *LogicalPlan,  *BinaryExpr) *ExprValidationError {
	 := ValidateFilterExpr(, .Left)
	 := ValidateFilterExpr(, .Right)

	if  != nil ||  != nil {
		 := make([]string, 0, 3)
		 = append(, "invalid children:")

		 := ExprValidationError{
			expr:     ,
			children: make([]*ExprValidationError, 0),
		}

		if  != nil {
			 := 
			 = append(, "left")
			.children = append(.children, )
		}

		if  != nil {
			 := 
			 = append(, "right")
			.children = append(.children, )
		}

		.message = strings.Join(, " ")
		return &
	}
	return nil
}

// NewTypeFinder returns an instance of the findExpressionForTypeVisitor for the
// passed type. It expects to receive a pointer to the  type it is will find.
func newTypeFinder( interface{}) findExpressionForTypeVisitor {
	return findExpressionForTypeVisitor{exprType: reflect.TypeOf()}
}

// findExpressionForTypeVisitor is an instance of Visitor that will try to find
// an expression of the given type while visiting the expressions.
type findExpressionForTypeVisitor struct {
	exprType reflect.Type
	// if an expression of the type is found, it will be set on this field after
	// visiting. Other-wise this field will be nil
	result Expr
}

func ( *findExpressionForTypeVisitor) ( Expr) bool {
	return true
}

func ( *findExpressionForTypeVisitor) ( Expr) bool {
	return true
}

func ( *findExpressionForTypeVisitor) ( Expr) bool {
	 := .exprType == reflect.TypeOf()
	if  {
		.result = 
	}
	return !
}