package logicalplan

var hashedMatch = "hashed"

type Optimizer interface {
	Optimize(plan *LogicalPlan) *LogicalPlan
}

func DefaultOptimizers() []Optimizer {
	return []Optimizer{
		&PhysicalProjectionPushDown{
			defaultProjections: []Expr{
				Not(DynCol(hashedMatch)),
			},
		},
		&FilterPushDown{},
		&DistinctPushDown{},
		&AggFuncPushDown{},
	}
}
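
// applyOptimizers is a minimal sketch of how the Optimizer interface above is
// intended to be used: each optimizer rewrites the plan in place and hands it
// to the next one. The helper name and its placement here are illustrative
// assumptions rather than part of the original API; the query engine may wire
// the optimizers up differently.
func applyOptimizers(plan *LogicalPlan, optimizers []Optimizer) *LogicalPlan {
	for _, o := range optimizers {
		plan = o.Optimize(plan)
	}
	return plan
}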

// PhysicalProjectionPushDown finds the first projecting logical plan node and
// collects all columns it needs, concatenated with all other columns used by
// the layers between it and the scan, for example a filter layer. Because the
// tree has the scan layer as the innermost layer, the logic works by resetting
// the collected list every time a projecting layer is found.
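//
// As a rough illustration (plan shapes shown informally, not as real builder
// calls): for a plan of the form Projection(name) -> Filter(value > 0) ->
// TableScan, the walk resets the collected columns to {name} at the
// projection, appends {value} at the filter, and the scan ends up with a
// physical projection containing the name and value columns.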
type PhysicalProjectionPushDown struct {
	defaultProjections []Expr
}

func (p *PhysicalProjectionPushDown) Optimize(plan *LogicalPlan) *LogicalPlan {
	p.optimize(plan, nil)
	return plan
}

func (p *PhysicalProjectionPushDown) optimize(plan *LogicalPlan, columnsUsedExprs []Expr) {
	switch {
	case plan.SchemaScan != nil:
		plan.SchemaScan.PhysicalProjection = append(p.defaultProjections, columnsUsedExprs...)
	case plan.TableScan != nil:
		plan.TableScan.PhysicalProjection = append(p.defaultProjections, columnsUsedExprs...)
	case plan.Filter != nil:
		p.defaultProjections = []Expr{}
		columnsUsedExprs = append(columnsUsedExprs, plan.Filter.Expr.ColumnsUsedExprs()...)
	case plan.Distinct != nil:
		// distinct is projecting so we need to reset
		columnsUsedExprs = []Expr{}
		for _, expr := range plan.Distinct.Exprs {
			columnsUsedExprs = append(columnsUsedExprs, expr.ColumnsUsedExprs()...)
		}
	case plan.Projection != nil:
		// projections are projecting so we need to reset
		columnsUsedExprs = []Expr{}
		for _, expr := range plan.Projection.Exprs {
			columnsUsedExprs = append(columnsUsedExprs, expr.ColumnsUsedExprs()...)
		}
	case plan.Aggregation != nil:
		// aggregations are projecting so we need to reset
		columnsUsedExprs = []Expr{}
		for _, expr := range plan.Aggregation.GroupExprs {
			columnsUsedExprs = append(columnsUsedExprs, expr.ColumnsUsedExprs()...)
		}
		for _, expr := range plan.Aggregation.AggExprs {
			columnsUsedExprs = append(columnsUsedExprs, expr.ColumnsUsedExprs()...)
		}
		p.defaultProjections = []Expr{}
		columnsUsedExprs = append(columnsUsedExprs, DynCol(hashedMatch))
	}

	if plan.Input != nil {
		p.optimize(plan.Input, columnsUsedExprs)
	}
}

// FilterPushDown optimizer tries to push the filters of a query down to the
// actual physical table scan. This allows the table provider to make smarter
// decisions about which pieces of data to load in the first place or which
// are definitely not useful to the query at all. It does not guarantee that
// all data will be filtered accordingly; it is just a mechanism to read less
// data from disk. It modifies the plan in place.
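//
// For example, when several Filter layers sit between the root of the plan
// and the scan, their expressions are collected on the way down and the
// scan's Filter is set to the conjunction of all of them (via the and
// helper), so the table provider sees a single combined predicate.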
type FilterPushDown struct{}

func (f *FilterPushDown) Optimize(plan *LogicalPlan) *LogicalPlan {
	f.optimize(plan, nil)
	return plan
}

func (f *FilterPushDown) optimize(plan *LogicalPlan, exprs []Expr) {
	switch {
	case plan.SchemaScan != nil:
		if len(exprs) > 0 {
			plan.SchemaScan.Filter = and(exprs)
		}
	case plan.TableScan != nil:
		if len(exprs) > 0 {
			plan.TableScan.Filter = and(exprs)
		}
	case plan.Filter != nil:
		exprs = append(exprs, plan.Filter.Expr)
	}

	if plan.Input != nil {
		f.optimize(plan.Input, exprs)
	}
}

// DistinctPushDown optimizer tries to push down the distinct operator to
// the table provider. There are certain cases of distinct queries where the
// storage engine can make smarter decisions than just returning all the data,
// such as dictionary-encoded columns that are not filtered: it can return
// only the dictionary, avoiding unnecessary decoding and deduplication in
// downstream distinct operators. It modifies the plan in place.
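//
// Concretely, the distinct expressions are only handed to the scan's Distinct
// field when every layer between the Distinct node and the scan is either a
// Projection with exactly the same expressions or nothing at all; any other
// operator (e.g. a Filter) resets the collected columns and disables the
// push down.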
type DistinctPushDown struct{}

func (d *DistinctPushDown) Optimize(plan *LogicalPlan) *LogicalPlan {
	d.optimize(plan, nil)
	return plan
}

// exprsEqual reports whether two expression slices are element-wise equal.
func exprsEqual(a, b []Expr) bool {
	if len(a) != len(b) {
		return false
	}

	for i, expr := range a {
		if !expr.Equal(b[i]) {
			return false
		}
	}

	return true
}

func (d *DistinctPushDown) optimize(plan *LogicalPlan, distinctColumns []Expr) {
	switch {
	case plan.TableScan != nil:
		if len(distinctColumns) > 0 {
			plan.TableScan.Distinct = distinctColumns
		}
	case plan.Distinct != nil:
		distinctColumns = append(distinctColumns, plan.Distinct.Exprs...)
	case plan.Projection != nil:
		if !exprsEqual(distinctColumns, plan.Projection.Exprs) {
			// the push down is only valid if the distinct columns are
			// identical to the projection columns, so we need to reset
			// them in this case.
			distinctColumns = []Expr{}
		}
	default:
		// reset distinct columns
		distinctColumns = []Expr{}
	}

	if plan.Input != nil {
		d.optimize(plan.Input, distinctColumns)
	}
}

// AggFuncPushDown optimizer tries to push down an aggregation function operator
// to the table provider. This can be done in the case of some aggregation
// functions on global aggregations (i.e. no group by) without filters.
// The storage engine can make smarter decisions than just returning all the
// data, such as in the case of max functions, memoizing the max value seen
// so far and only scanning row groups that contain a value greater than the
// memoized value. It modifies the plan in place.
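//
// In this implementation the push down only triggers for a single aggregation
// function with no group-by sitting directly above the table scan (any other
// operator in between resets the pushed expression); the aggregation
// expression is handed to the scan via its Filter field.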
type AggFuncPushDown struct{}

func (a *AggFuncPushDown) Optimize(plan *LogicalPlan) *LogicalPlan {
	a.optimize(plan, nil)
	return plan
}

func (a *AggFuncPushDown) optimize(plan *LogicalPlan, filterExpr Expr) {
	switch {
	case plan.TableScan != nil:
		if filterExpr != nil {
			plan.TableScan.Filter = filterExpr
		}
	case plan.Aggregation != nil:
		if len(plan.Aggregation.GroupExprs) == 0 && len(plan.Aggregation.AggExprs) == 1 {
			// TODO(asubiotto): Should we make this less specific?
			filterExpr = plan.Aggregation.AggExprs[0]
		}
	default:
		// If we find anything other than a table scan after a global
		// aggregation, bail out by setting the filterExpr to nil.
		filterExpr = nil
	}

	if plan.Input != nil {
		a.optimize(plan.Input, filterExpr)
	}
}