package logicalplan

import (
	
	
	

	
	

	
	
)

// LogicalPlan is a logical representation of a query. Each LogicalPlan is a
// sub-tree of the query. It is built recursively.
//
// A node links to at most one child plan through Input and identifies its own
// operator through exactly one of the operator pointers below.
type LogicalPlan struct {
	// Input is the child plan this node reads from. It may be nil; the
	// methods in this file check for nil before descending into it.
	Input *LogicalPlan

	// Each LogicalPlan struct must only have one of the following.
	SchemaScan  *SchemaScan
	TableScan   *TableScan
	Filter      *Filter
	Distinct    *Distinct
	Projection  *Projection
	Aggregation *Aggregation
	Limit       *Limit
	Sample      *Sample
}

// Callback is a function that is called throughout a chain of operators
// modifying the underlying data. It receives a context and an arrow.Record
// and reports failure through its error result.
type Callback func(ctx context.Context, r arrow.Record) error

// IterOptions are a set of options for the TableReader Iterators. Values are
// populated by applying Option functions to an IterOptions.
type IterOptions struct {
	// PhysicalProjection holds expressions for the physical projection
	// (presumably the columns to physically read — compare
	// TableScan.PhysicalProjection).
	PhysicalProjection []Expr
	// Projection holds the expressions to be projected.
	Projection []Expr
	// Filter is a single filter expression.
	Filter Expr
	// DistinctColumns holds the expressions that are to be distinct.
	DistinctColumns []Expr
	// ReadMode indicates the mode to use when reading.
	ReadMode ReadMode
}

// Option is a function that configures an IterOptions through the pointer it
// is given. TableReader iterators accept a variadic list of Options.
type Option func(opts *IterOptions)

// Builds an Option from a ReadMode. The returned closure takes a *IterOptions
// and assigns to a ReadMode field, so it presumably stores the given mode on
// the options, replacing any earlier value.
//
// NOTE(review): the function name, the parameter names and both operands of
// the assignment are missing from this copy of the file, so the block does
// not parse as written — TODO restore the identifiers.
func ( ReadMode) Option {
	return func( *IterOptions) {
		.ReadMode = 
	}
}

// Builds an Option from a variadic list of Expr. The returned closure appends
// to a PhysicalProjection slice and assigns the result back, so it presumably
// adds the given expressions to IterOptions.PhysicalProjection; applying it
// more than once accumulates rather than replaces.
//
// NOTE(review): the function name, the parameter names and the operands in
// the body are missing from this copy of the file, so the block does not
// parse as written — TODO restore the identifiers.
func ( ...Expr) Option {
	return func( *IterOptions) {
		.PhysicalProjection = append(.PhysicalProjection, ...)
	}
}

// Builds an Option from a variadic list of Expr. The returned closure appends
// to a Projection slice and assigns the result back, so it presumably adds
// the given expressions to IterOptions.Projection; applying it more than once
// accumulates rather than replaces.
//
// NOTE(review): the function name, the parameter names and the operands in
// the body are missing from this copy of the file, so the block does not
// parse as written — TODO restore the identifiers.
func ( ...Expr) Option {
	return func( *IterOptions) {
		.Projection = append(.Projection, ...)
	}
}

// Builds an Option from a single Expr. The returned closure takes a
// *IterOptions and assigns to a Filter field, so it presumably stores the
// given expression as IterOptions.Filter, replacing any earlier value.
//
// NOTE(review): the function name, the parameter names and both operands of
// the assignment are missing from this copy of the file, so the block does
// not parse as written — TODO restore the identifiers.
func ( Expr) Option {
	return func( *IterOptions) {
		.Filter = 
	}
}

// Builds an Option from a variadic list of Expr. The returned closure appends
// to a DistinctColumns slice and assigns the result back, so it presumably
// adds the given expressions to IterOptions.DistinctColumns; applying it more
// than once accumulates rather than replaces.
//
// NOTE(review): the function name, the parameter names and the operands in
// the body are missing from this copy of the file, so the block does not
// parse as written — TODO restore the identifiers.
func ( ...Expr) Option {
	return func( *IterOptions) {
		.DistinctColumns = append(.DistinctColumns, ...)
	}
}

// Returns the textual form of the plan by calling the unexported string
// helper with the argument 0 (presumably the starting indentation depth).
//
// NOTE(review): the method name and receiver name are missing from this copy
// of the file (presumably this is the fmt.Stringer method), so the block does
// not parse as written — TODO restore the identifiers.
func ( *LogicalPlan) () string {
	return .string(0)
}

// Renders this plan node followed by its Input chain, one node per line. The
// int parameter presumably is the nesting depth used for indentation; the
// only visible caller passes 0.
//
// NOTE(review): the method name, receiver, parameter and local variable names
// are missing from this copy of the file, so the block does not parse as
// written — TODO restore the identifiers. A call site in this file refers to
// the method as string.
func ( *LogicalPlan) ( int) string {
	 := ""
	// Take the label from the first operator pointer that is set.
	switch {
	case .SchemaScan != nil:
		 = .SchemaScan.String()
	case .TableScan != nil:
		 = .TableScan.String()
	case .Filter != nil:
		 = .Filter.String()
	case .Projection != nil:
		 = .Projection.String()
	case .Aggregation != nil:
		 = .Aggregation.String()
	case .Distinct != nil:
		 = .Distinct.String()
	default:
		// NOTE(review): Limit and Sample have no case above even though both
		// types define a string method in this file, so nodes holding only
		// those operators are labelled as unknown here.
		 = "Unknown LogicalPlan"
	}

	// Prefix the label with a two-space unit repeated per the (missing) count
	// argument.
	 = strings.Repeat("  ", ) + 
	if .Input != nil {
		// The child's rendering goes on its own line, produced by a call on
		// Input whose argument is incremented by one.
		 += "\n" + .Input.(+1)
	}
	return 
}

// Resolves the arrow.DataType of an expression in the context of this plan
// node, dispatching on whichever operator pointer is set. Errors from the
// lookups assigned to a local are wrapped with %w and a message naming the
// operator; the lookups that are returned directly are passed through
// unwrapped.
//
// NOTE(review): the method name, receiver, parameter and local variable names
// are missing from this copy of the file, so the block does not parse as
// written — TODO restore the identifiers. There is no case for Limit, so a
// node holding only a Limit falls through to the default error.
func ( *LogicalPlan) ( Expr) (arrow.DataType, error) {
	switch {
	// Scans resolve the type themselves via their DataTypeForExpr method.
	case .SchemaScan != nil:
		,  := .SchemaScan.DataTypeForExpr()
		if  != nil {
			return nil, fmt.Errorf("data type for expr %v within SchemaScan: %w", , )
		}

		return , nil
	case .TableScan != nil:
		,  := .TableScan.DataTypeForExpr()
		if  != nil {
			return nil, fmt.Errorf("data type for expr %v within TableScan: %w", , )
		}

		return , nil
	// A filter defers to a method on Input (name missing here; presumably
	// this same method, applied recursively).
	case .Filter != nil:
		,  := .Input.()
		if  != nil {
			return nil, fmt.Errorf("data type for expr %v within Filter: %w", , )
		}

		return , nil
	case .Projection != nil:
		// Look for a projection expression whose Name() equals the name being
		// compared against; on a match, return a DataType lookup against
		// Input directly (the error is not wrapped on this path).
		for ,  := range .Projection.Exprs {
			if .Name() == .Name() {
				return .DataType(.Input)
			}
		}

		// No name matched: resolve via DataType against Input.
		,  := .DataType(.Input)
		if  != nil {
			return nil, fmt.Errorf("data type for expr %v within Projection: %w", , )
		}

		return , nil
	case .Aggregation != nil:
		// Aggregation functions get special handling: a count always yields
		// Int64; any other function takes the type of its inner Expr.
		if ,  := .(*AggregationFunction);  {
			if .Func == AggFuncCount {
				return arrow.PrimitiveTypes.Int64, nil
			}

			return .Expr.DataType(.Input)
		}

		// Anything that is not an *AggregationFunction resolves via DataType
		// against Input.
		,  := .DataType(.Input)
		if  != nil {
			return nil, fmt.Errorf("data type for expr %v within Aggregation: %w", , )
		}

		return , nil
	// Distinct and Sample both resolve via DataType against Input.
	case .Distinct != nil:
		,  := .DataType(.Input)
		if  != nil {
			return nil, fmt.Errorf("data type for expr %v within Distinct: %w", , )
		}

		return , nil
	case .Sample != nil:
		,  := .DataType(.Input)
		if  != nil {
			return nil, fmt.Errorf("data type for expr %v within Sample: %w", , )
		}

		return , nil
	default:
		return nil, fmt.Errorf("unknown logical plan")
	}
}

// TableReader returns the table reader. A TableScan is checked first, then a
// SchemaScan; in either case the reader is obtained from the scan's
// TableProvider using the scan's TableName. If neither is set, the lookup
// continues through Input, and an error is returned when there is no Input
// left to consult.
//
// NOTE(review): the method name and receiver name are missing from this copy
// of the file, so the block does not parse as written — TODO restore the
// identifiers.
func ( *LogicalPlan) () (TableReader, error) {
	if .TableScan != nil {
		return .TableScan.TableProvider.GetTable(.TableScan.TableName)
	}
	if .SchemaScan != nil {
		return .SchemaScan.TableProvider.GetTable(.SchemaScan.TableName)
	}
	if .Input != nil {
		// Method name missing here; presumably a recursive call to this
		// same method on the child plan.
		return .Input.()
	}
	return nil, fmt.Errorf("no table reader provided")
}

// InputSchema returns the schema that the query will execute against. It is
// taken from the result of the TableReader lookup; nil is returned when that
// lookup fails.
//
// NOTE(review): the lookup error is discarded, so callers cannot tell why the
// schema is nil. The method name, receiver and local variable names are
// missing from this copy of the file, so the block does not parse as written
// — TODO restore the identifiers.
func ( *LogicalPlan) () *dynparquet.Schema {
	,  := .TableReader()
	if  != nil {
		return nil
	}

	return .Schema()
}

// PlanVisitor is implemented by types that walk a LogicalPlan. Both methods
// receive a plan node and return a bool; in the walk implemented in this
// file, a false result stops the traversal.
type PlanVisitor interface {
	// PreVisit is called for a node before its Input is descended into.
	PreVisit(plan *LogicalPlan) bool
	// PostVisit is called for a node after its Input has been walked.
	PostVisit(plan *LogicalPlan) bool
}

// Walks the plan with a PlanVisitor: PreVisit first, then the Input (when
// non-nil), then PostVisit. A false result from PreVisit or from the walk of
// Input returns false immediately without calling PostVisit; otherwise the
// result of PostVisit is returned.
//
// NOTE(review): the method name, receiver, parameter and local variable names
// and the call arguments are missing from this copy of the file, so the block
// does not parse as written — TODO restore the identifiers.
func ( *LogicalPlan) ( PlanVisitor) bool {
	 := .PreVisit()
	if ! {
		return false
	}

	if .Input != nil {
		// Method name missing here; presumably a recursive call to this
		// same method on the child plan.
		 = .Input.()
		if ! {
			return false
		}
	}

	return .PostVisit()
}

// TableReader is the interface through which a logical plan accesses a table.
// It is what TableProvider.GetTable returns.
type TableReader interface {
	// View calls into fn with a context and a uint64 tx (presumably a
	// transaction or snapshot identifier — confirm against implementations).
	View(ctx context.Context, fn func(ctx context.Context, tx uint64) error) error
	// Iterator takes a tx, an allocator, a list of Callbacks and Options
	// that configure an IterOptions.
	Iterator(
		ctx context.Context,
		tx uint64,
		pool memory.Allocator,
		callbacks []Callback,
		options ...Option,
	) error
	// SchemaIterator has the same signature as Iterator (presumably it
	// iterates over schema information rather than data — confirm against
	// implementations).
	SchemaIterator(
		ctx context.Context,
		tx uint64,
		pool memory.Allocator,
		callbacks []Callback,
		options ...Option,
	) error
	// Schema returns the table's schema. Callers in this file treat a nil
	// result as "no schema".
	Schema() *dynparquet.Schema
}
// TableProvider resolves table names to TableReaders.
type TableProvider interface {
	// GetTable returns the TableReader for the named table, or an error.
	GetTable(name string) (TableReader, error)
}

// TableScan is the plan operator that reads a table. The table is identified
// by TableName and resolved through TableProvider.
type TableScan struct {
	// TableProvider is used to look up the table named by TableName.
	TableProvider TableProvider
	// TableName is the name passed to TableProvider.GetTable.
	TableName string

	// PhysicalProjection describes the columns that are to be physically read
	// by the table scan. This is an Expr so it can be either a column or
	// dynamic column.
	PhysicalProjection []Expr

	// Filter is the predicate that is to be applied by the table scan to rule
	// out any blocks of data to be scanned at all.
	Filter Expr

	// Distinct describes the columns that are to be distinct.
	Distinct []Expr

	// Projection is the list of columns that are to be projected.
	Projection []Expr

	// ReadMode indicates the mode to use when reading.
	ReadMode ReadMode
}

// Resolves the arrow.DataType of an expression against the scanned table's
// schema. The table is fetched from TableProvider by TableName, its schema is
// read, and the lookup is delegated to DataTypeForExprWithSchema. An error is
// returned when the table cannot be fetched, when it has no schema, or when
// the delegated lookup fails.
//
// NOTE(review): the method name, receiver, parameter and local variable names
// are missing from this copy of the file, so the block does not parse as
// written — TODO restore the identifiers.
func ( *TableScan) ( Expr) (arrow.DataType, error) {
	,  := .TableProvider.GetTable(.TableName)
	if  != nil {
		return nil, fmt.Errorf("get table %q: %w", .TableName, )
	}

	// Schema() may return nil; report that explicitly instead of passing a
	// nil schema on.
	 := .Schema()
	if  == nil {
		return nil, fmt.Errorf("table %q has no schema", .TableName)
	}

	,  := DataTypeForExprWithSchema(, )
	if  != nil {
		return nil, fmt.Errorf("type for expr %q in table %q: %w", , .TableName, )
	}

	return , nil
}

// Resolves the arrow.DataType of an expression using a dynparquet schema.
// Only *Column and *DynamicColumn expressions are handled; any other
// expression type yields an "unhandled expr type" error. The type is derived
// from the found column's StorageLayout via convert.ParquetNodeToType.
//
// NOTE(review): the function name, parameter names and local variable names
// are missing from this copy of the file, so the block does not parse as
// written — TODO restore the identifiers. A call site in this file refers to
// the function as DataTypeForExprWithSchema.
func ( Expr,  *dynparquet.Schema) (arrow.DataType, error) {
	switch expr := .(type) {
	case *Column:
		// First try to match the column name against a dynamic column
		// definition.
		,  := .FindDynamicColumnForConcreteColumn(.ColumnName)
		if  {
			,  := convert.ParquetNodeToType(.StorageLayout)
			if  != nil {
				return nil, fmt.Errorf("convert parquet node to type: %w", )
			}

			return , nil
		}

		// Otherwise fall back to a plain column lookup by name.
		,  = .FindColumn(.ColumnName)
		if  {
			,  := convert.ParquetNodeToType(.StorageLayout)
			if  != nil {
				return nil, fmt.Errorf("convert parquet node to type: %w", )
			}

			return , nil
		}

		return nil, fmt.Errorf("column %q not found", .ColumnName)
	case *DynamicColumn:
		// Dynamic column expressions are looked up by their name only.
		,  := .FindDynamicColumn(.ColumnName)
		if  {
			,  := convert.ParquetNodeToType(.StorageLayout)
			if  != nil {
				return nil, fmt.Errorf("convert parquet node to type: %w", )
			}

			return , nil
		}

		return nil, fmt.Errorf("dynamic column %q not found", .ColumnName)
	default:
		return nil, fmt.Errorf("unhandled expr type %T", )
	}
}

// Returns a one-line description of the scan listing its table name,
// Projection, Filter and Distinct. PhysicalProjection and ReadMode are not
// part of the output.
//
// NOTE(review): the method name and receiver name are missing from this copy
// of the file, so the block does not parse as written — TODO restore the
// identifiers.
func ( *TableScan) () string {
	return "TableScan" +
		" Table: " + .TableName +
		" Projection: " + fmt.Sprint(.Projection) +
		" Filter: " + fmt.Sprint(.Filter) +
		" Distinct: " + fmt.Sprint(.Distinct)
}

// ReadMode indicates the mode to use when reading, i.e. which storage a read
// draws from.
type ReadMode int

// The values are assigned with iota, so ReadModeDefault is 0 and therefore
// the zero value of ReadMode.
const (
	// ReadModeDefault is the default read mode. Reads from in-memory and object
	// storage.
	ReadModeDefault ReadMode = iota
	// ReadModeInMemoryOnly reads from in-memory storage only.
	ReadModeInMemoryOnly
	// ReadModeDataSourcesOnly reads from data sources only.
	ReadModeDataSourcesOnly
)

// SchemaScan is a scan plan operator with the same fields as TableScan. Its
// type resolution in this file only knows a column called "name"
// (presumably the scan yields schema/column names rather than table data —
// confirm against the physical plan).
type SchemaScan struct {
	// TableProvider is used to look up the table named by TableName.
	TableProvider TableProvider
	// TableName is the name passed to TableProvider.GetTable.
	TableName string

	// PhysicalProjection describes the columns that are to be physically read
	// by the table scan.
	PhysicalProjection []Expr

	// Filter is the predicate that is to be applied by the table scan to rule
	// out any blocks of data to be scanned at all.
	Filter Expr

	// Distinct describes the columns that are to be distinct.
	Distinct []Expr

	// Projection is the list of columns that are to be projected.
	Projection []Expr

	// ReadMode indicates the mode to use when reading.
	ReadMode ReadMode
}

// Returns the constant label "SchemaScan"; none of the scan's fields are
// included in the output.
//
// NOTE(review): the method name and receiver name are missing from this copy
// of the file, so the block does not parse as written — TODO restore the
// identifiers.
func ( *SchemaScan) () string {
	return "SchemaScan"
}

// Resolves the arrow.DataType of an expression for a schema scan. Only a
// *Column whose ColumnName is "name" is recognised, and it is typed as an
// Arrow string; any other column name or expression type yields an error.
//
// NOTE(review): the method name, receiver and parameter names are missing
// from this copy of the file, so the block does not parse as written — TODO
// restore the identifiers.
func ( *SchemaScan) ( Expr) (arrow.DataType, error) {
	switch expr := .(type) {
	case *Column:
		if .ColumnName == "name" {
			return arrow.BinaryTypes.String, nil
		}

		return nil, fmt.Errorf("unknown column %s", .ColumnName)
	default:
		return nil, fmt.Errorf("unhandled expr %T", )
	}
}

// Filter is the plan operator that carries a single filter expression.
type Filter struct {
	// Expr is the filter expression.
	Expr Expr
}

// Returns the label "Filter" followed by the default formatting of Expr.
//
// NOTE(review): the method name and receiver name are missing from this copy
// of the file, so the block does not parse as written — TODO restore the
// identifiers.
func ( *Filter) () string {
	return "Filter" + " Expr: " + fmt.Sprint(.Expr)
}

// Distinct is the plan operator that carries a list of expressions
// (presumably the expressions whose values are to be made distinct — compare
// TableScan.Distinct).
type Distinct struct {
	// Exprs is the operator's list of expressions.
	Exprs []Expr
}

// Returns the constant label "Distinct"; Exprs is not included in the output.
//
// NOTE(review): the method name and receiver name are missing from this copy
// of the file, so the block does not parse as written — TODO restore the
// identifiers.
func ( *Distinct) () string {
	return "Distinct"
}

// Projection is the plan operator that carries the list of projected
// expressions. Type resolution in this file matches against these
// expressions by Name().
type Projection struct {
	// Exprs is the list of projected expressions.
	Exprs []Expr
}

// Returns the label "Projection" followed by the default formatting of Exprs
// in parentheses.
//
// NOTE(review): the method name and receiver name are missing from this copy
// of the file, so the block does not parse as written — TODO restore the
// identifiers.
func ( *Projection) () string {
	return "Projection (" + fmt.Sprint(.Exprs) + ")"
}

// Aggregation is the plan operator that carries aggregation functions and
// grouping expressions.
type Aggregation struct {
	// AggExprs is the list of aggregation functions.
	AggExprs []*AggregationFunction
	// GroupExprs is the list of grouping expressions.
	GroupExprs []Expr
}

// Returns the label "Aggregation" followed by the default formatting of
// AggExprs and GroupExprs.
//
// NOTE(review): the method name and receiver name are missing from this copy
// of the file, so the block does not parse as written — TODO restore the
// identifiers.
func ( *Aggregation) () string {
	return "Aggregation " + fmt.Sprint(.AggExprs) + " Group: " + fmt.Sprint(.GroupExprs)
}

// Limit is the plan operator that carries a single expression (presumably
// the row limit — confirm against the plan builder).
type Limit struct {
	// Expr is the operator's expression.
	Expr Expr
}

// Returns the label "Limit" followed by the default formatting of Expr.
//
// NOTE(review): the method name and receiver name are missing from this copy
// of the file, so the block does not parse as written — TODO restore the
// identifiers.
func ( *Limit) () string {
	return "Limit" + " Expr: " + fmt.Sprint(.Expr)
}

// Sample is the plan operator that carries two expressions, Expr and Limit
// (presumably the sample size and an upper bound — confirm against the plan
// builder).
type Sample struct {
	// Expr is the operator's main expression.
	Expr Expr
	// Limit is a second expression associated with the sample.
	Limit Expr
}

// Returns the label "Sample" followed by the default formatting of Expr; the
// Limit field is not included in the output.
//
// NOTE(review): the method name and receiver name are missing from this copy
// of the file, so the block does not parse as written — TODO restore the
// identifiers.
func ( *Sample) () string {
	return "Sample" + " Expr: " + fmt.Sprint(.Expr)
}