package physicalplan

import (
	"context"
	"fmt"
	"hash/maphash"
	"runtime"

	"github.com/apache/arrow/go/v14/arrow"
	"github.com/apache/arrow/go/v14/arrow/memory"
	"github.com/apache/arrow/go/v14/arrow/scalar"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
	"golang.org/x/sync/errgroup"

	"github.com/polarsignals/frostdb/dynparquet"
	"github.com/polarsignals/frostdb/query/logicalplan"
	"github.com/polarsignals/frostdb/recovery"
)

// TODO: Make this smarter.
var concurrencyHardcoded = runtime.GOMAXPROCS(0)

// PhysicalPlan is a single operator in a physical query plan. Operators are
// chained together with SetNext; records are pushed through the chain via
// Callback, and Finish signals that no more records will arrive.
type PhysicalPlan interface {
	Callback(ctx context.Context, r arrow.Record) error
	Finish(ctx context.Context) error
	SetNext(next PhysicalPlan)
	Draw() *Diagram
	Close()
}
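
// Records flow through a plan by being pushed from operator to operator. A
// minimal sketch of driving two already-constructed operators, filter and
// output (hypothetical values; error handling elided):
//
//	filter.SetNext(output)           // filter pushes its results to output.
//	_ = filter.Callback(ctx, record) // Push one record through the chain.
//	_ = filter.Finish(ctx)           // Flush buffered results; no more input.
//	filter.Close()                   // Release retained memory.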

// ScanPhysicalPlan is the source stage of a physical plan: executing it reads
// data and pushes records into the operators built on top of it.
type ScanPhysicalPlan interface {
	Execute(ctx context.Context, pool memory.Allocator) error
	Draw() *Diagram
}

// PrePlanVisitorFunc adapts a function to a visitor that is invoked before a
// node's children are visited (pre-order).
type PrePlanVisitorFunc func(plan *logicalplan.LogicalPlan) bool

func (f PrePlanVisitorFunc) PreVisit(plan *logicalplan.LogicalPlan) bool {
	return f(plan)
}

func (f PrePlanVisitorFunc) PostVisit(_ *logicalplan.LogicalPlan) bool {
	return false
}

// PostPlanVisitorFunc adapts a function to a visitor that is invoked after a
// node's children are visited (post-order).
type PostPlanVisitorFunc func(plan *logicalplan.LogicalPlan) bool

func (f PostPlanVisitorFunc) PreVisit(_ *logicalplan.LogicalPlan) bool {
	return true
}

func (f PostPlanVisitorFunc) PostVisit(plan *logicalplan.LogicalPlan) bool {
	return f(plan)
}
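
// A PostPlanVisitorFunc is typically passed to a logical plan's Accept method
// to walk the plan bottom-up, as Build does below. A sketch that counts plan
// nodes (illustrative only; assumes a *logicalplan.LogicalPlan value named
// plan):
//
//	nodes := 0
//	plan.Accept(PostPlanVisitorFunc(func(plan *logicalplan.LogicalPlan) bool {
//		nodes++
//		return true // Returning false stops the walk early.
//	}))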

// OutputPlan is the root of a physical plan. It executes the plan's scan
// stage and forwards every resulting record to the configured callback.
type OutputPlan struct {
	callback func(ctx context.Context, r arrow.Record) error
	scan     ScanPhysicalPlan
}

func (e *OutputPlan) Draw() *Diagram {
	// Doesn't change anything anymore as it's the root of the plan.
	return &Diagram{}
}

func (e *OutputPlan) String() string {
	return e.scan.Draw().String()
}

func (e *OutputPlan) Callback(ctx context.Context, r arrow.Record) error {
	return e.callback(ctx, r)
}

func (e *OutputPlan) SetNextCallback(callback func(ctx context.Context, r arrow.Record) error) {
	e.callback = callback
}

func (e *OutputPlan) Finish(ctx context.Context) error {
	return nil
}

func (e *OutputPlan) Close() {}

func (e *OutputPlan) SetNext(next PhysicalPlan) {
	// OutputPlan should be the last step.
	// If this gets called we're doing something wrong.
	panic("bug in builder! output plan should not have a next plan!")
}

func (e *OutputPlan) Execute(ctx context.Context, pool memory.Allocator, callback func(ctx context.Context, r arrow.Record) error) error {
	e.callback = callback
	return e.scan.Execute(ctx, pool)
}

// TableScan reads a table's data and pushes the resulting records into its
// child plans, one concurrent stream per plan.
type TableScan struct {
	tracer  trace.Tracer
	options *logicalplan.TableScan
	plans   []PhysicalPlan
}

func (s *TableScan) Draw() *Diagram {
	details := "TableScan"
	var child *Diagram
	if n := len(s.plans); n > 0 {
		child = s.plans[0].Draw()
		if n > 1 {
			details += " [concurrent]"
		}
	}
	return &Diagram{Details: details, Child: child}
}

func (s *TableScan) Execute(ctx context.Context, pool memory.Allocator) error {
	ctx, span := s.tracer.Start(ctx, "TableScan/Execute")
	defer span.End()

	table, err := s.options.TableProvider.GetTable(s.options.TableName)
	if table == nil || err != nil {
		return fmt.Errorf("table not found: %w", err)
	}

	callbacks := make([]logicalplan.Callback, 0, len(s.plans))
	for _, plan := range s.plans {
		callbacks = append(callbacks, plan.Callback)
	}
	defer func() { // Close all plans to ensure memory cleanup.
		for _, plan := range s.plans {
			plan.Close()
		}
	}()

	options := []logicalplan.Option{
		logicalplan.WithPhysicalProjection(s.options.PhysicalProjection...),
		logicalplan.WithProjection(s.options.Projection...),
		logicalplan.WithFilter(s.options.Filter),
		logicalplan.WithDistinctColumns(s.options.Distinct...),
		logicalplan.WithReadMode(s.options.ReadMode),
	}

	errg, ctx := errgroup.WithContext(ctx)
	errg.Go(recovery.Do(func() error {
		return table.View(ctx, func(ctx context.Context, tx uint64) error {
			return table.Iterator(
				ctx,
				tx,
				pool,
				callbacks,
				options...,
			)
		})
	}))
	if err := errg.Wait(); err != nil {
		return err
	}

	// All records have been pushed; run a second phase that lets each plan
	// flush any buffered results downstream.
	errg, _ = errgroup.WithContext(ctx)
	for _, plan := range s.plans {
		plan := plan // Capture loop variable for the goroutine below.
		errg.Go(recovery.Do(func() error {
			return plan.Finish(ctx)
		}))
	}

	return errg.Wait()
}

// SchemaScan iterates over the schemas of a table's data rather than the data
// itself and pushes the results into its child plans.
type SchemaScan struct {
	tracer  trace.Tracer
	options *logicalplan.SchemaScan
	plans   []PhysicalPlan
}

func (s *SchemaScan) Draw() *Diagram {
	details := "SchemaScan"
	var child *Diagram
	if n := len(s.plans); n > 0 {
		child = s.plans[0].Draw()
		if n > 1 {
			details += " [concurrent]"
		}
	}
	return &Diagram{Details: details, Child: child}
}

func (s *SchemaScan) Execute(ctx context.Context, pool memory.Allocator) error {
	table, err := s.options.TableProvider.GetTable(s.options.TableName)
	if table == nil || err != nil {
		return fmt.Errorf("table not found: %w", err)
	}

	callbacks := make([]logicalplan.Callback, 0, len(s.plans))
	for _, plan := range s.plans {
		callbacks = append(callbacks, plan.Callback)
	}

	options := []logicalplan.Option{
		logicalplan.WithPhysicalProjection(s.options.PhysicalProjection...),
		logicalplan.WithProjection(s.options.Projection...),
		logicalplan.WithFilter(s.options.Filter),
		logicalplan.WithDistinctColumns(s.options.Distinct...),
		logicalplan.WithReadMode(s.options.ReadMode),
	}

	errg, ctx := errgroup.WithContext(ctx)
	errg.Go(recovery.Do(func() error {
		return table.View(ctx, func(ctx context.Context, tx uint64) error {
			return table.SchemaIterator(
				ctx,
				tx,
				pool,
				callbacks,
				options...,
			)
		})
	}))
	if err := errg.Wait(); err != nil {
		return err
	}

	errg, _ = errgroup.WithContext(ctx)
	for _, plan := range s.plans {
		plan := plan // Capture loop variable for the goroutine below.
		errg.Go(recovery.Do(func() error {
			return plan.Finish(ctx)
		}))
	}

	return errg.Wait()
}
}

// noopOperator is a passthrough operator that forwards records to the next
// operator unchanged. Build uses it as a placeholder for scan outputs before
// the downstream operators are known.
type noopOperator struct {
	next PhysicalPlan
}

func (p *noopOperator) Close() {
	p.next.Close()
}

func (p *noopOperator) Callback(ctx context.Context, r arrow.Record) error {
	return p.next.Callback(ctx, r)
}

func (p *noopOperator) Finish(ctx context.Context) error {
	return p.next.Finish(ctx)
}

func (p *noopOperator) SetNext(next PhysicalPlan) {
	p.next = next
}

func (p *noopOperator) Draw() *Diagram {
	if p.next == nil {
		return nil
	}
	return p.next.Draw()
}

type execOptions struct {
	orderedAggregations bool
	overrideInput       []PhysicalPlan
	readMode            logicalplan.ReadMode
}

type Option func(o *execOptions)

// WithReadMode sets the read mode to use for scans.
func WithReadMode(mode logicalplan.ReadMode) Option {
	return func(o *execOptions) {
		o.readMode = mode
	}
}

// WithOrderedAggregations enables planning ordered aggregations when the
// input ordering allows it.
func WithOrderedAggregations() Option {
	return func(o *execOptions) {
		o.orderedAggregations = true
	}
}

// WithOverrideInput can be used to provide an input stage on top of which the
// Build function can build the physical plan.
func WithOverrideInput(input []PhysicalPlan) Option {
	return func(o *execOptions) {
		o.overrideInput = input
	}
}
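
// For example, a caller that already has operators producing records can ask
// Build to construct only the stages above them (a sketch; inputs and
// logicalPlan are placeholders):
//
//	outputPlan, err := Build(ctx, pool, tracer, schema, logicalPlan,
//		WithOverrideInput(inputs),
//	)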

// Build walks the logical plan bottom-up and assembles the physical operator
// pipeline it describes, fanning out to concurrencyHardcoded concurrent
// streams at the scan and synchronizing them again where necessary.
func Build(
	ctx context.Context,
	pool memory.Allocator,
	tracer trace.Tracer,
	schema *dynparquet.Schema,
	plan *logicalplan.LogicalPlan,
	options ...Option,
) (*OutputPlan, error) {
	_, span := tracer.Start(ctx, "PhysicalPlan/Build")
	defer span.End()

	execOpts := execOptions{}
	for _, option := range options {
		option(&execOpts)
	}
	prev := execOpts.overrideInput

	outputPlan := &OutputPlan{}
	info := &planOrderingInfo{
		state: planOrderingInfoStateInit,
	}
	if schema != nil {
		// TODO(asubiotto): There are cases in which the schema can be nil.
		// Eradicate these.
		info.sortingCols = schema.ColumnDefinitionsForSortingColumns()
	}

	var err error
	plan.Accept(PostPlanVisitorFunc(func(plan *logicalplan.LogicalPlan) bool {
		info.newNode()
		switch {
		case plan.SchemaScan != nil:
			// Create noop operators since we don't know what to push the scan
			// results to. In a following node visit, these noops will have
			// SetNext called on them and push to the correct operator.
			noops := make([]PhysicalPlan, concurrencyHardcoded)
			for i := range noops {
				noops[i] = &noopOperator{}
			}
			plan.SchemaScan.ReadMode = execOpts.readMode
			outputPlan.scan = &SchemaScan{
				tracer:  tracer,
				options: plan.SchemaScan,
				plans:   noops,
			}
			prev = append(prev[:0], noops...)
		case plan.TableScan != nil:
			// Create noop operators since we don't know what to push the scan
			// results to. In a following node visit, these noops will have
			// SetNext called on them and push to the correct operator.
			noops := make([]PhysicalPlan, concurrencyHardcoded)
			for i := range noops {
				noops[i] = &noopOperator{}
			}
			plan.TableScan.ReadMode = execOpts.readMode
			outputPlan.scan = &TableScan{
				tracer:  tracer,
				options: plan.TableScan,
				plans:   noops,
			}
			prev = append(prev[:0], noops...)
			info.nodeMaintainsOrdering()
		case plan.Projection != nil:
			// Don't build the projection if it's a wildcard; the projection
			// pushdown optimization will handle it.
			for _, expr := range plan.Projection.Exprs {
				if expr.Name() == "all" {
					return true
				}
			}
			// For each previous physical plan create one Projection.
			for i := range prev {
				projection, projErr := Project(pool, tracer, plan.Projection.Exprs)
				if projErr != nil {
					err = projErr
					return false
				}
				prev[i].SetNext(projection)
				prev[i] = projection
			}
		case plan.Distinct != nil:
			var sync *Synchronizer
			if len(prev) > 1 {
				// These distinct operators need to be synchronized.
				sync = Synchronize(len(prev))
			}
			for i := 0; i < len(prev); i++ {
				distinct := Distinct(pool, tracer, plan.Distinct.Exprs)
				prev[i].SetNext(distinct)
				prev[i] = distinct
				if sync != nil {
					distinct.SetNext(sync)
				}
			}
			if sync != nil {
				// Plan a distinct operator to run a distinct on all the
				// synchronized distincts.
				distinct := Distinct(pool, tracer, plan.Distinct.Exprs)
				sync.SetNext(distinct)
				prev = prev[0:1]
				prev[0] = distinct
			}
		case plan.Limit != nil:
			var sync *Synchronizer
			if len(prev) > 1 {
				// These limit operators need to be synchronized.
				sync = Synchronize(len(prev))
			}
			for i := 0; i < len(prev); i++ {
				limit, limitErr := Limit(pool, tracer, plan.Limit.Expr)
				if limitErr != nil {
					err = limitErr
					return false
				}
				prev[i].SetNext(limit)
				prev[i] = limit
				if sync != nil {
					limit.SetNext(sync)
				}
			}
			if sync != nil {
				// Plan a limit operator to run a limit on all the
				// synchronized limits.
				limit, limitErr := Limit(pool, tracer, plan.Limit.Expr)
				if limitErr != nil {
					err = limitErr
					return false
				}
				sync.SetNext(limit)
				prev = prev[0:1]
				prev[0] = limit
			}
		case plan.Filter != nil:
			// Create a filter for each previous plan. Can be multiple filters
			// or just a single filter depending on the previous concurrency.
			for i := range prev {
				filter, filterErr := Filter(pool, tracer, plan.Filter.Expr)
				if filterErr != nil {
					err = filterErr
					return false
				}
				prev[i].SetNext(filter)
				prev[i] = filter
			}
			info.applyFilter(plan.Filter.Expr)
			info.nodeMaintainsOrdering()
		case plan.Aggregation != nil:
			orderedAggregate, orderedErr := shouldPlanOrderedAggregate(execOpts, info, plan.Aggregation)
			if orderedErr != nil {
				// TODO(asubiotto): Log the error.
				orderedAggregate = false
			}
			var sync PhysicalPlan
			if len(prev) > 1 {
				// These aggregate operators need to be synchronized.
				if orderedAggregate && len(plan.Aggregation.GroupExprs) > 0 {
					sync = NewOrderedSynchronizer(pool, len(prev), plan.Aggregation.GroupExprs)
				} else {
					sync = Synchronize(len(prev))
				}
			}
			seed := maphash.MakeSeed()
			for i := 0; i < len(prev); i++ {
				agg, aggErr := Aggregate(pool, tracer, plan.Aggregation, sync == nil, orderedAggregate, seed)
				if aggErr != nil {
					err = aggErr
					return false
				}
				prev[i].SetNext(agg)
				prev[i] = agg
				if sync != nil {
					agg.SetNext(sync)
				}
			}
			if sync != nil {
				// Plan an aggregate operator to run a final aggregation on
				// all the synchronized partial aggregations.
				agg, aggErr := Aggregate(pool, tracer, plan.Aggregation, true, orderedAggregate, seed)
				if aggErr != nil {
					err = aggErr
					return false
				}
				sync.SetNext(agg)
				prev = prev[0:1]
				prev[0] = agg
			}
			if orderedAggregate {
				info.nodeMaintainsOrdering()
			}
		case plan.Sample != nil:
			size := plan.Sample.Expr.(*logicalplan.LiteralExpr).Value.(*scalar.Int64).Value
			limit := plan.Sample.Limit.(*logicalplan.LiteralExpr).Value.(*scalar.Int64).Value
			// Spread the sample size across the concurrent plans, handing the
			// remainder out one by one to the first plans.
			sizePerPlan := size / int64(len(prev))
			limitPerPlan := limit / int64(len(prev))
			remainder := size % int64(len(prev))
			for i := range prev {
				extra := int64(0)
				if i < int(remainder) {
					extra = 1
				}
				sampler := NewReservoirSampler(sizePerPlan+extra, limitPerPlan, pool)
				prev[i].SetNext(sampler)
				prev[i] = sampler
			}
		default:
			panic("Unsupported plan")
		}
		return err == nil
	}))
	if err != nil {
		return nil, err
	}

	if execOpts.overrideInput == nil {
		span.SetAttributes(attribute.String("plan", outputPlan.scan.Draw().String()))
	}

	// Synchronize the last stage if necessary.
	var sync *Synchronizer
	if len(prev) > 1 {
		sync = Synchronize(len(prev))
		for i := range prev {
			prev[i].SetNext(sync)
		}
		sync.SetNext(outputPlan)
	} else {
		prev[0].SetNext(outputPlan)
	}

	return outputPlan, nil
}
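
// For illustration, a typical end-to-end use of Build and the resulting
// OutputPlan (a sketch; logicalPlan and the callback body are placeholders):
//
//	outputPlan, err := Build(ctx, memory.NewGoAllocator(), tracer, schema, logicalPlan)
//	if err != nil {
//		return err
//	}
//	err = outputPlan.Execute(ctx, memory.NewGoAllocator(), func(ctx context.Context, r arrow.Record) error {
//		// Consume each result record as the plan pushes it out.
//		return nil
//	})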

// shouldPlanOrderedAggregate returns whether the aggregation can be planned
// as an ordered aggregation, i.e. whether the group columns form a prefix of
// the ordering the input is known to maintain.
func shouldPlanOrderedAggregate(
	execOpts execOptions, info *planOrderingInfo, agg *logicalplan.Aggregation,
) (bool, error) {
	if !execOpts.orderedAggregations {
		// Ordered aggregations disabled.
		return false, nil
	}
	if len(agg.AggExprs) > 1 {
		// More than one aggregation is not yet supported.
		return false, nil
	}
	if !info.orderingMaintained() {
		return false, nil
	}
	ordering := info.getNonCoveringOrdering()
	for _, groupExpr := range agg.GroupExprs {
		columnsUsed := groupExpr.ColumnsUsedExprs()
		if len(columnsUsed) > 1 {
			return false, fmt.Errorf("expected only one group column but found %v", columnsUsed)
		}
		if len(ordering) == 0 {
			// No more ordered columns left to match this group column.
			return false, nil
		}
		orderCol := ordering[0]
		ordering = ordering[1:]
		name := orderCol.Name
		if orderCol.Dynamic {
			// TODO(asubiotto): Appending a "." is necessary for the MatchColumn
			// call below to work for dynamic columns. Not sure if there's a
			// better way to do this.
			name += "."
		}
		if !columnsUsed[0].MatchColumn(name) {
			return false, nil
		}
	}
	return true, nil
}

// Diagram is a textual representation of a physical plan, used for debugging
// and for annotating tracing spans.
type Diagram struct {
	Details string
	Child   *Diagram
}

func (d *Diagram) String() string {
	if d.Child == nil {
		return d.Details
	}
	child := d.Child.String()
	if child == "" {
		return d.Details
	}
	return d.Details + " - " + child
}
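
// For example, a chain of diagrams renders as a single " - "-separated line:
//
//	d := &Diagram{Details: "TableScan", Child: &Diagram{Details: "Filter", Child: &Diagram{Details: "Synchronizer"}}}
//	fmt.Println(d.String()) // Prints: TableScan - Filter - Synchronizer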