package physicalplan

import (
	
	
	

	
	
	
	
	

	
)

// columnProjection is the contract shared by the projection types that
// projectionFromExpr returns: a column name, a string form, and a Project step
// that turns an input record into output fields and arrays.
type columnProjection interface {
	// Name returns the name of the column that this projection will return.
	Name() string
	// String returns the string representation as it may have been used to create the projection.
	String() string
	// Projects one or more columns. Each element in the field list corresponds to an element in the list of arrays.
	Project(mem memory.Allocator, ar arrow.Record) ([]arrow.Field, []arrow.Array, error)
}

// aliasProjection wraps another columnProjection (p) and carries the alias
// expression it was built from together with the alias name.
type aliasProjection struct {
	p    columnProjection
	expr *logicalplan.AliasExpr
	name string
}

// Returns the alias held in the name field.
// NOTE(review): identifiers are blank in this listing; presumably this is the
// Name method of columnProjection — confirm.
func ( aliasProjection) () string {
	return .name
}

// Returns the string form of the alias expression (expr.String()).
// NOTE(review): presumably the String method of columnProjection — confirm.
func ( aliasProjection) () string {
	return .expr.String()
}

// Runs the wrapped projection p and relabels its output with the alias.
//
// An error from the wrapped Project call is returned unchanged. An error is
// also returned when the length check below fails. On success exactly one
// field is returned: its Name is the alias (name field) and its Type, Nullable
// and Metadata are copied from element 0 of an indexed slice.
// NOTE(review): identifiers are blank in this listing; presumably this is the
// Project method, element 0 is the first projected field, and the second
// return value is the wrapped projection's array slice — confirm.
func ( aliasProjection) ( memory.Allocator,  arrow.Record) ([]arrow.Field, []arrow.Array, error) {
	, ,  := .p.Project(, )
	if  != nil {
		return nil, nil, 
	}

	// Guard the [0] accesses below; per the message, fields and arrays must be
	// non-empty and of equal length.
	if len() == 0 || len() != len() {
		return nil, nil, fmt.Errorf("invalid projection for alias: fields and arrays must be non-empty and of equal length")
	}

	return []arrow.Field{{
		Name:     .name,
		Type:     [0].Type,
		Nullable: [0].Nullable,
		Metadata: [0].Metadata,
	}}, , nil
}

// binaryExprProjection holds a binary expression together with the
// projections for its left and right operands.
type binaryExprProjection struct {
	expr *logicalplan.BinaryExpr

	left  columnProjection
	right columnProjection
}

// Returns expr.String().
// NOTE(review): identifiers are blank in this listing; this and the method
// below have identical bodies and are presumably Name and String of
// columnProjection — confirm.
func ( binaryExprProjection) () string {
	return .expr.String()
}

// Returns expr.String(), same as the method above.
func ( binaryExprProjection) () string {
	return .expr.String()
}

// Evaluates an arithmetic binary expression over two projected operands.
//
// The left and right projections are run in that order; a failure on either
// side is wrapped with a message naming the side. After each successful
// projection a deferred loop calls Release on every element of a slice
// (presumably that side's arrays — identifiers are blank in this listing).
// Both sides must yield exactly one field and one array, otherwise an error is
// returned. The result is a single field named expr.Name() whose Type,
// Nullable and Metadata come from element 0 of an indexed slice, plus one
// array produced by the helper matching the operand type (Int64, Int32,
// Uint64, Float64) and operator (OpAdd, OpSub, OpMul, OpDiv). Any other
// operator or array type yields an "unsupported" error.
//
// NOTE(review): each helper call uses a single-value type assertion such as
// .(*array.Int64), which panics if the asserted value has a different concrete
// type. Presumably that value is the right-hand array, so mixed operand types
// would panic rather than return an error — confirm.
func ( binaryExprProjection) ( memory.Allocator,  arrow.Record) ([]arrow.Field, []arrow.Array, error) {
	, ,  := .left.Project(, )
	if  != nil {
		return nil, nil, fmt.Errorf("project left side of binary expression: %w", )
	}
	defer func() {
		for ,  := range  {
			.Release()
		}
	}()

	, ,  := .right.Project(, )
	if  != nil {
		return nil, nil, fmt.Errorf("project right side of binary expression: %w", )
	}
	defer func() {
		for ,  := range  {
			.Release()
		}
	}()

	// Arithmetic is only defined here for one column on each side.
	if len() != 1 || len() != 1 {
		return nil, nil, fmt.Errorf("binary expression projection expected one field and one array for each side, got %d fields and %d arrays on left", len(), len())
	}

	if len() != 1 || len() != 1 {
		return nil, nil, fmt.Errorf("binary expression projection expected one field and one array for each side, got %d fields and %d arrays on right", len(), len())
	}

	 := [0]
	 := [0]

	 := []arrow.Field{{
		Name:     .expr.Name(),
		Type:     [0].Type,
		Nullable: [0].Nullable,
		Metadata: [0].Metadata,
	}}
	// Dispatch on the concrete array type, then on the operator.
	switch leftArray := .(type) {
	case *array.Int64:
		switch .expr.Op {
		case logicalplan.OpAdd:
			return , []arrow.Array{AddInt64s(, , .(*array.Int64))}, nil
		case logicalplan.OpSub:
			return , []arrow.Array{SubInt64s(, , .(*array.Int64))}, nil
		case logicalplan.OpMul:
			return , []arrow.Array{MulInt64s(, , .(*array.Int64))}, nil
		case logicalplan.OpDiv:
			return , []arrow.Array{DivInt64s(, , .(*array.Int64))}, nil
		default:
			return nil, nil, fmt.Errorf("unsupported binary expression: %s", .expr.String())
		}
	case *array.Int32:
		switch .expr.Op {
		case logicalplan.OpAdd:
			return , []arrow.Array{AddInt32s(, , .(*array.Int32))}, nil
		case logicalplan.OpSub:
			return , []arrow.Array{SubInt32s(, , .(*array.Int32))}, nil
		case logicalplan.OpMul:
			return , []arrow.Array{MulInt32s(, , .(*array.Int32))}, nil
		case logicalplan.OpDiv:
			return , []arrow.Array{DivInt32s(, , .(*array.Int32))}, nil
		default:
			return nil, nil, fmt.Errorf("unsupported binary expression: %s", .expr.String())
		}
	case *array.Uint64:
		switch .expr.Op {
		case logicalplan.OpAdd:
			return , []arrow.Array{AddUint64s(, , .(*array.Uint64))}, nil
		case logicalplan.OpSub:
			return , []arrow.Array{SubUint64s(, , .(*array.Uint64))}, nil
		case logicalplan.OpMul:
			return , []arrow.Array{MulUint64s(, , .(*array.Uint64))}, nil
		case logicalplan.OpDiv:
			return , []arrow.Array{DivUint64s(, , .(*array.Uint64))}, nil
		default:
			return nil, nil, fmt.Errorf("unsupported binary expression: %s", .expr.String())
		}
	case *array.Float64:
		switch .expr.Op {
		case logicalplan.OpAdd:
			return , []arrow.Array{AddFloat64s(, , .(*array.Float64))}, nil
		case logicalplan.OpSub:
			return , []arrow.Array{SubFloat64s(, , .(*array.Float64))}, nil
		case logicalplan.OpMul:
			return , []arrow.Array{MulFloat64s(, , .(*array.Float64))}, nil
		case logicalplan.OpDiv:
			return , []arrow.Array{DivFloat64s(, , .(*array.Float64))}, nil
		default:
			return nil, nil, fmt.Errorf("unsupported binary expression: %s", .expr.String())
		}
	default:
		return nil, nil, fmt.Errorf("unsupported type: %T", )
	}
}

// Element-wise Int64 addition: for every index below a Len() bound, appends
// the sum of two Value() reads to an Int64 builder, then returns the built
// array. The builder is released on return via defer.
// NOTE(review): identifiers are blank in this listing; presumably this is the
// AddInt64s helper used by the Int64 arithmetic case and the two reads are the
// left and right parameters — confirm. No IsNull check is visible, so null
// slots appear to be summed like ordinary values.
func ( memory.Allocator, ,  *array.Int64) *array.Int64 {
	 := array.NewInt64Builder()
	defer .Release()

	.Resize(.Len())

	for  := 0;  < .Len(); ++ {
		.Append(.Value() + .Value())
	}

	return .NewInt64Array()
}

// Element-wise Int64 subtraction: for every index below a Len() bound, appends
// the difference of two Value() reads to an Int64 builder, then returns the
// built array. The builder is released on return via defer.
// NOTE(review): identifiers are blank in this listing; presumably this is the
// SubInt64s helper computing left minus right — confirm. No IsNull check is
// visible, so null slots appear to be treated like ordinary values.
func ( memory.Allocator, ,  *array.Int64) *array.Int64 {
	 := array.NewInt64Builder()
	defer .Release()

	.Resize(.Len())

	for  := 0;  < .Len(); ++ {
		.Append(.Value() - .Value())
	}

	return .NewInt64Array()
}

// Element-wise Int64 multiplication: for every index below a Len() bound,
// appends the product of two Value() reads to an Int64 builder, then returns
// the built array. The builder is released on return via defer.
// NOTE(review): identifiers are blank in this listing; presumably this is the
// MulInt64s helper — confirm. No IsNull check is visible, so null slots appear
// to be multiplied like ordinary values.
func ( memory.Allocator, ,  *array.Int64) *array.Int64 {
	 := array.NewInt64Builder()
	defer .Release()

	.Resize(.Len())

	for  := 0;  < .Len(); ++ {
		.Append(.Value() * .Value())
	}

	return .NewInt64Array()
}

// Element-wise Int64 division with a zero guard: for every index below a
// Len() bound, a null is appended when the checked Value() read equals 0,
// otherwise a Value() read divided by the local captured at the top of the
// loop body is appended. The builder is released on return via defer.
// NOTE(review): identifiers are blank in this listing; presumably this is the
// DivInt64s helper, with the captured local and the zero check both reading
// the right-hand (divisor) array — confirm. No IsNull check on the inputs is
// visible.
func ( memory.Allocator, ,  *array.Int64) *array.Int64 {
	 := array.NewInt64Builder()
	defer .Release()

	.Resize(.Len())

	for  := 0;  < .Len(); ++ {
		 := .Value()
		// A zero divisor yields a null instead of a division panic.
		if .Value() == 0 {
			.AppendNull()
		} else {
			.Append(.Value() / )
		}
	}

	return .NewInt64Array()
}

// Element-wise Float64 addition: for every index below a Len() bound, appends
// the sum of two Value() reads to a Float64 builder, then returns the built
// array. The builder is released on return via defer.
// NOTE(review): identifiers are blank in this listing; presumably this is the
// AddFloat64s helper — confirm. No IsNull check is visible, so null slots
// appear to be summed like ordinary values.
func ( memory.Allocator, ,  *array.Float64) *array.Float64 {
	 := array.NewFloat64Builder()
	defer .Release()

	.Resize(.Len())

	for  := 0;  < .Len(); ++ {
		.Append(.Value() + .Value())
	}

	return .NewFloat64Array()
}

// Element-wise Float64 subtraction: for every index below a Len() bound,
// appends the difference of two Value() reads to a Float64 builder, then
// returns the built array. The builder is released on return via defer.
// NOTE(review): identifiers are blank in this listing; presumably this is the
// SubFloat64s helper computing left minus right — confirm. No IsNull check is
// visible.
func ( memory.Allocator, ,  *array.Float64) *array.Float64 {
	 := array.NewFloat64Builder()
	defer .Release()

	.Resize(.Len())

	for  := 0;  < .Len(); ++ {
		.Append(.Value() - .Value())
	}

	return .NewFloat64Array()
}

// Element-wise Float64 multiplication: for every index below a Len() bound,
// appends the product of two Value() reads to a Float64 builder, then returns
// the built array. The builder is released on return via defer.
// NOTE(review): identifiers are blank in this listing; presumably this is the
// MulFloat64s helper — confirm. No IsNull check is visible.
func ( memory.Allocator, ,  *array.Float64) *array.Float64 {
	 := array.NewFloat64Builder()
	defer .Release()

	.Resize(.Len())

	for  := 0;  < .Len(); ++ {
		.Append(.Value() * .Value())
	}

	return .NewFloat64Array()
}

// Element-wise Float64 division with a zero guard: for every index below a
// Len() bound, a null is appended when the checked Value() read equals 0,
// otherwise a Value() read divided by the local captured at the top of the
// loop body is appended. The builder is released on return via defer.
// NOTE(review): identifiers are blank in this listing; presumably this is the
// DivFloat64s helper, with the captured local and the zero check both reading
// the right-hand (divisor) array — confirm. No IsNull check on the inputs is
// visible.
func ( memory.Allocator, ,  *array.Float64) *array.Float64 {
	 := array.NewFloat64Builder()
	defer .Release()

	.Resize(.Len())

	for  := 0;  < .Len(); ++ {
		 := .Value()
		// A zero divisor yields a null rather than an Inf/NaN result.
		if .Value() == 0 {
			.AppendNull()
		} else {
			.Append(.Value() / )
		}
	}

	return .NewFloat64Array()
}

// Element-wise Int32 addition: for every index below a Len() bound, appends
// the sum of two Value() reads to an Int32 builder, then returns the built
// array. The builder is released on return via defer.
// NOTE(review): identifiers are blank in this listing; presumably this is the
// AddInt32s helper — confirm. No IsNull check is visible, so null slots appear
// to be summed like ordinary values.
func ( memory.Allocator, ,  *array.Int32) *array.Int32 {
	 := array.NewInt32Builder()
	defer .Release()

	.Resize(.Len())

	for  := 0;  < .Len(); ++ {
		.Append(.Value() + .Value())
	}

	return .NewInt32Array()
}

// Element-wise Int32 subtraction: for every index below a Len() bound, appends
// the difference of two Value() reads to an Int32 builder, then returns the
// built array. The builder is released on return via defer.
// NOTE(review): identifiers are blank in this listing; presumably this is the
// SubInt32s helper computing left minus right — confirm. No IsNull check is
// visible.
func ( memory.Allocator, ,  *array.Int32) *array.Int32 {
	 := array.NewInt32Builder()
	defer .Release()

	.Resize(.Len())

	for  := 0;  < .Len(); ++ {
		.Append(.Value() - .Value())
	}

	return .NewInt32Array()
}

// Element-wise Int32 multiplication: for every index below a Len() bound,
// appends the product of two Value() reads to an Int32 builder, then returns
// the built array. The builder is released on return via defer.
// NOTE(review): identifiers are blank in this listing; presumably this is the
// MulInt32s helper — confirm. No IsNull check is visible.
func ( memory.Allocator, ,  *array.Int32) *array.Int32 {
	 := array.NewInt32Builder()
	defer .Release()

	.Resize(.Len())

	for  := 0;  < .Len(); ++ {
		.Append(.Value() * .Value())
	}

	return .NewInt32Array()
}

// Element-wise Int32 division with a zero guard: for every index below a
// Len() bound, a null is appended when the checked Value() read equals 0,
// otherwise a Value() read divided by the local captured at the top of the
// loop body is appended. The builder is released on return via defer.
// NOTE(review): identifiers are blank in this listing; presumably this is the
// DivInt32s helper, with the captured local and the zero check both reading
// the right-hand (divisor) array — confirm. No IsNull check on the inputs is
// visible.
func ( memory.Allocator, ,  *array.Int32) *array.Int32 {
	 := array.NewInt32Builder()
	defer .Release()

	.Resize(.Len())

	for  := 0;  < .Len(); ++ {
		 := .Value()
		// A zero divisor yields a null instead of a division panic.
		if .Value() == 0 {
			.AppendNull()
		} else {
			.Append(.Value() / )
		}
	}

	return .NewInt32Array()
}

// Element-wise Uint64 addition: for every index below a Len() bound, appends
// the sum of two Value() reads to a Uint64 builder, then returns the built
// array. The builder is released on return via defer.
// NOTE(review): identifiers are blank in this listing; presumably this is the
// AddUint64s helper — confirm. No IsNull check is visible, so null slots
// appear to be summed like ordinary values.
func ( memory.Allocator, ,  *array.Uint64) *array.Uint64 {
	 := array.NewUint64Builder()
	defer .Release()

	.Resize(.Len())

	for  := 0;  < .Len(); ++ {
		.Append(.Value() + .Value())
	}

	return .NewUint64Array()
}

// Element-wise Uint64 subtraction: for every index below a Len() bound,
// appends the difference of two Value() reads to a Uint64 builder, then
// returns the built array. The builder is released on return via defer.
// Unsigned subtraction in Go wraps around when the subtrahend is larger.
// NOTE(review): identifiers are blank in this listing; presumably this is the
// SubUint64s helper computing left minus right — confirm. No IsNull check is
// visible.
func ( memory.Allocator, ,  *array.Uint64) *array.Uint64 {
	 := array.NewUint64Builder()
	defer .Release()

	.Resize(.Len())

	for  := 0;  < .Len(); ++ {
		.Append(.Value() - .Value())
	}

	return .NewUint64Array()
}

// Element-wise Uint64 multiplication: for every index below a Len() bound,
// appends the product of two Value() reads to a Uint64 builder, then returns
// the built array. The builder is released on return via defer.
// NOTE(review): identifiers are blank in this listing; presumably this is the
// MulUint64s helper — confirm. No IsNull check is visible.
func ( memory.Allocator, ,  *array.Uint64) *array.Uint64 {
	 := array.NewUint64Builder()
	defer .Release()

	.Resize(.Len())

	for  := 0;  < .Len(); ++ {
		.Append(.Value() * .Value())
	}

	return .NewUint64Array()
}

// Element-wise Uint64 division with a zero guard: for every index below a
// Len() bound, a null is appended when the checked Value() read equals 0,
// otherwise a Value() read divided by the local captured at the top of the
// loop body is appended. The builder is released on return via defer.
// NOTE(review): identifiers are blank in this listing; presumably this is the
// DivUint64s helper, with the captured local and the zero check both reading
// the right-hand (divisor) array — confirm. No IsNull check on the inputs is
// visible.
func ( memory.Allocator, ,  *array.Uint64) *array.Uint64 {
	 := array.NewUint64Builder()
	defer .Release()

	.Resize(.Len())

	for  := 0;  < .Len(); ++ {
		 := .Value()
		// A zero divisor yields a null instead of a division panic.
		if .Value() == 0 {
			.AppendNull()
		} else {
			.Append(.Value() / )
		}
	}

	return .NewUint64Array()
}

// boolExprProjection projects the result of a BooleanExpression as a column.
type boolExprProjection struct {
	boolExpr BooleanExpression
}

// Returns boolExpr.String().
// NOTE(review): identifiers are blank in this listing; this and the method
// below have identical bodies and are presumably Name and String of
// columnProjection — confirm.
func ( boolExprProjection) () string {
	return .boolExpr.String()
}

// Returns boolExpr.String(), same as the method above.
func ( boolExprProjection) () string {
	return .boolExpr.String()
}

// Produces a boolean column for the expression, reusing pre-computed results
// from the input record where they exist.
//
// If the record schema has a field named boolExpr.String(), the column at the
// first matching field index is inspected:
//   - NullN() == 0: the column is retained and returned early under a Boolean
//     field named after the expression.
//   - 0 < NullN() < Len(): the column is remembered so that its non-null
//     entries can override the evaluated result further down.
//
// Otherwise (or after remembering a partial column) the expression is
// evaluated with boolExpr.Eval; its error is returned unchanged. A []bool of
// NumRows() entries is filled with true at every index yielded by ToArray(),
// then overlaid with non-null values of the remembered column, and finally
// appended to a BooleanBuilder whose array is returned.
//
// NOTE(review): identifiers are blank in this listing. The early-return path
// shows an empty array literal; presumably it holds the retained column —
// confirm. The overlay uses a single-value .(*array.Boolean) assertion, which
// panics if the remembered column is not a boolean array.
func ( boolExprProjection) ( memory.Allocator,  arrow.Record) ([]arrow.Field, []arrow.Array, error) {
	var  arrow.Array
	if .Schema().HasField(.boolExpr.String()) {
		 := .Column(.Schema().FieldIndices(.boolExpr.String())[0])
		if .NullN() == 0 {
			// This means we have fully pre-computed the result of the
			// expression in the table scan already.
			.Retain()
			return []arrow.Field{
				{
					Name: .boolExpr.String(),
					Type: &arrow.BooleanType{},
				},
			}, []arrow.Array{}, nil
		} else if .NullN() < .Len() {
			// If we got here, expression results were partially computed at
			// the table scan layer. We fall back to evaluating the expression
			// and overwriting with any results found in this array (i.e.
			// non-null entries). Note that if zero results were pre-computed
			// (arr.NullN == arr.Len()), we'll just fully fall back to
			// expression evaluation.
			 = 
		}
	}

	,  := .boolExpr.Eval()
	if  != nil {
		return nil, nil, 
	}

	 := make([]bool, .NumRows())
	 := array.NewBooleanBuilder()
	defer .Release()

	// We can do this because we know the values in the array are between 0 and
	// NumRows()-1
	for ,  := range .ToArray() {
		[int()] = true
	}

	if  != nil {
		 := .(*array.Boolean)
		for  := 0;  < .Len(); ++ {
			if !.IsNull() {
				// Non-null result, this is the precomputed expression result.
				[] = .Value()
			}
		}
	}

	// A nil validity slice is passed as the second argument to AppendValues.
	.AppendValues(, nil)

	return []arrow.Field{
		{
			Name: .boolExpr.String(),
			Type: &arrow.BooleanType{},
		},
	}, []arrow.Array{.NewArray()}, nil
}

// plainProjection projects a single column identified by a logical-plan Column.
type plainProjection struct {
	expr *logicalplan.Column
}

// Returns expr.ColumnName.
// NOTE(review): identifiers are blank in this listing; this and the method
// below have identical bodies and are presumably Name and String of
// columnProjection — confirm.
func ( plainProjection) () string {
	return .expr.ColumnName
}

// Returns expr.ColumnName, same as the method above.
func ( plainProjection) () string {
	return .expr.ColumnName
}

// Returns the first column of the record whose field name satisfies
// expr.MatchColumn. The matching column is retained before being returned, so
// the caller owns one reference to it. When no field matches, all three return
// values are nil (no error).
// NOTE(review): identifiers are blank in this listing; the returned field
// literal appears empty here and presumably holds the matched schema field —
// confirm.
func ( plainProjection) ( memory.Allocator,  arrow.Record) ([]arrow.Field, []arrow.Array, error) {
	for  := 0;  < .Schema().NumFields(); ++ {
		 := .Schema().Field()
		if .expr.MatchColumn(.Name) {
			.Column().Retain() // Retain the column since we're keeping it.
			return []arrow.Field{}, []arrow.Array{.Column()}, nil
		}
	}

	// No match: callers see nil slices and a nil error.
	return nil, nil, nil
}

// convertProjection wraps another columnProjection (p) and converts its output
// to the type named by the ConvertExpr.
type convertProjection struct {
	p    columnProjection
	expr *logicalplan.ConvertExpr
}

// Returns expr.Name().
// NOTE(review): identifiers are blank in this listing; this and the method
// below have identical bodies and are presumably Name and String of
// columnProjection — confirm.
func ( convertProjection) () string {
	return .expr.Name()
}

// Returns expr.Name(), same as the method above.
func ( convertProjection) () string {
	return .expr.Name()
}

// Converts one array to the target type expr.Type and returns the new array.
//
// Only *array.Int64 input with a Float64 target is handled (via
// convertInt64ToFloat64); every other combination returns an "unsupported
// conversion" error. A deferred Release runs on every path, success or error.
// NOTE(review): identifiers are blank in this listing; presumably this is the
// convert method called from the projection method, and the deferred Release
// drops the input array's reference — confirm.
func ( convertProjection) ( memory.Allocator,  arrow.Array) (arrow.Array, error) {
	defer .Release()

	switch c := .(type) {
	case *array.Int64:
		switch .expr.Type {
		case arrow.PrimitiveTypes.Float64:
			return convertInt64ToFloat64(, ), nil
		default:
			return nil, fmt.Errorf("unsupported conversion from %s to %s", .DataType(), .expr.Type)
		}
	default:
		return nil, fmt.Errorf("unsupported conversion from %s to %s", .DataType(), .expr.Type)
	}
}

// convertInt64ToFloat64 builds a Float64 array by appending float64(Value())
// for every index below a Len() bound. The builder is released on return via
// defer.
// NOTE(review): identifiers are blank in this listing; presumably the values
// and length come from the *array.Int64 parameter — confirm. No IsNull check
// is visible, so null slots appear to be converted like ordinary values.
func convertInt64ToFloat64( memory.Allocator,  *array.Int64) *array.Float64 {
	 := array.NewFloat64Builder()
	defer .Release()

	.Resize(.Len())

	for  := 0;  < .Len(); ++ {
		.Append(float64(.Value()))
	}

	return .NewFloat64Array()
}

// Runs the wrapped projection p and converts element 0 of an indexed slice
// with the convert method.
//
// Errors from the wrapped Project call and from convert are returned
// unchanged. An error is also returned when the length check below fails. On
// success one field is returned, named expr.Name() with Type expr.Type and
// Nullable/Metadata copied from element 0 of an indexed slice.
// NOTE(review): identifiers are blank in this listing; presumably this is the
// Project method, element 0 is the first projected array/field, and the
// returned array literal (shown empty here) holds the converted array —
// confirm. Only element 0 is passed to convert; any further arrays from the
// wrapped projection are not visibly released here.
func ( convertProjection) ( memory.Allocator,  arrow.Record) ([]arrow.Field, []arrow.Array, error) {
	, ,  := .p.Project(, )
	if  != nil {
		return nil, nil, 
	}

	// Guard the [0] accesses below.
	if len() == 0 || len() != len() {
		return nil, nil, fmt.Errorf("invalid projection for convert: fields and arrays must be non-empty and of equal length")
	}

	,  := .convert(, [0])
	if  != nil {
		return nil, nil, 
	}

	return []arrow.Field{{
		Name:     .expr.Name(),
		Type:     .expr.Type,
		Nullable: [0].Nullable,
		Metadata: [0].Metadata,
	}}, []arrow.Array{}, nil
}

// isNullProjection wraps another columnProjection (p) and reports, per row,
// whether its value is null.
type isNullProjection struct {
	expr *logicalplan.IsNullExpr
	p    columnProjection
}

// Returns expr.String().
// NOTE(review): identifiers are blank in this listing; this and the method
// below have identical bodies and are presumably Name and String of
// columnProjection — confirm.
func ( isNullProjection) () string {
	return .expr.String()
}

// Returns expr.String(), same as the method above.
func ( isNullProjection) () string {
	return .expr.String()
}

// Runs the wrapped projection p and builds a boolean array holding IsNull()
// for every index of element 0 of an indexed slice.
//
// An error from the wrapped Project call is returned unchanged. A deferred
// loop calls Release on every element of a slice once this method returns.
// The projection must yield exactly one field/array pair, otherwise an error
// is returned. The result field is named expr.String(), has the Boolean type
// and copies Nullable/Metadata from element 0 of an indexed slice.
// NOTE(review): identifiers are blank in this listing; presumably this is the
// Project method, the released slice is the wrapped projection's arrays and
// element 0 is its only array/field — confirm.
func ( isNullProjection) ( memory.Allocator,  arrow.Record) ([]arrow.Field, []arrow.Array, error) {
	, ,  := .p.Project(, )
	if  != nil {
		return nil, nil, 
	}
	defer func() {
		for ,  := range  {
			.Release()
		}
	}()

	if len() != 1 || len() != len() {
		return nil, nil, fmt.Errorf("invalid projection for isNull: expected 1 field and array, got %d fields %d arrays", len(), len())
	}

	 := array.NewBooleanBuilder()
	defer .Release()

	.Resize([0].Len())

	for  := 0;  < [0].Len(); ++ {
		.Append([0].IsNull())
	}

	return []arrow.Field{{
		Name:     .expr.String(),
		Type:     arrow.FixedWidthTypes.Boolean,
		Nullable: [0].Nullable,
		Metadata: [0].Metadata,
	}}, []arrow.Array{.NewBooleanArray()}, nil
}

// ifExprProjection holds an if-expression and the projections for its
// condition, then-branch and else-branch.
type ifExprProjection struct {
	expr *logicalplan.IfExpr

	cond columnProjection
	then columnProjection
	els  columnProjection
}

// Returns expr.Name().
// NOTE(review): identifiers are blank in this listing; this and the method
// below have identical bodies and are presumably Name and String of
// columnProjection — confirm.
func ( ifExprProjection) () string {
	return .expr.Name()
}

// Returns expr.Name(), same as the method above.
func ( ifExprProjection) () string {
	return .expr.Name()
}

// Evaluates an if-expression row by row.
//
// The then, else and cond projections are run in that order; an error from
// any of them is returned unchanged. After each successful projection a
// deferred loop calls Release on every element of a slice. Each of three
// checked slices must have exactly one element. One array must be an
// *array.Boolean, three Len() comparisons must agree, and two DataType()
// values must be equal; each failed check returns its own error. Only a type
// switch match on *array.Int64 is supported, delegated to
// conditionalAddInt64; other types return an "unsupported" error. The result
// field is named expr.Name(), typed after the result array's DataType(), with
// Nullable/Metadata copied from element 0 of an indexed slice.
//
// NOTE(review): identifiers are blank in this listing; presumably this is the
// Project method, the released slices are the three projections' arrays, the
// boolean array is the condition, and the compared types are the then/else
// arrays — confirm. The first check's message says "non-empty and of equal
// length" while the code requires a length of exactly 1. The returned array
// literal is shown empty and presumably holds the result array.
func ( ifExprProjection) ( memory.Allocator,  arrow.Record) ([]arrow.Field, []arrow.Array, error) {
	, ,  := .then.Project(, )
	if  != nil {
		return nil, nil, 
	}
	defer func() {
		for ,  := range  {
			.Release()
		}
	}()

	, ,  := .els.Project(, )
	if  != nil {
		return nil, nil, 
	}
	defer func() {
		for ,  := range  {
			.Release()
		}
	}()

	, ,  := .cond.Project(, )
	if  != nil {
		return nil, nil, 
	}
	defer func() {
		for ,  := range  {
			.Release()
		}
	}()

	// Exactly one array is required from each projection.
	if len() != 1 || len() != 1 || len() != 1 {
		return nil, nil, fmt.Errorf("invalid projection for if: all columns must be non-empty and of equal length")
	}

	 := [0]
	 := [0]
	 := [0]

	// Two-value assertion: a non-boolean condition is an error, not a panic.
	,  := .(*array.Boolean)
	if ! {
		return nil, nil, fmt.Errorf("invalid projection for if: condition column must be of type boolean")
	}

	if .Len() != .Len() || .Len() != .Len() {
		return nil, nil, fmt.Errorf("invalid projection for if: condition, then and else columns must be of equal length")
	}

	if !arrow.TypeEqual([0].DataType(), [0].DataType()) {
		return nil, nil, fmt.Errorf("invalid projection for if: then and else columns must be of the same type")
	}

	var  arrow.Array
	switch [0].(type) {
	case *array.Int64:
		 = conditionalAddInt64(, , .(*array.Int64), .(*array.Int64))
	default:
		return nil, nil, fmt.Errorf("unsupported if expression type: %s %T", [0].DataType(), [0])
	}

	return []arrow.Field{{
		Name:     .expr.Name(),
		Type:     .DataType(),
		Nullable: [0].Nullable,
		Metadata: [0].Metadata,
	}}, []arrow.Array{}, nil
}

// conditionalAddInt64 builds an Int64 array by choosing, for every index below
// a Len() bound, between two Value() reads: one when an IsValid() check and a
// boolean Value() are both true, the other otherwise. The builder is released
// on return via defer.
//
// Despite the name, no addition appears in the body; it only selects.
// NOTE(review): identifiers are blank in this listing; presumably the
// IsValid()/Value() test reads the *array.Boolean parameter and the two
// branches read the first and second *array.Int64 parameters respectively
// (then/else) — confirm. A null condition falls through to the else branch,
// and no IsNull check on the Int64 inputs is visible.
func conditionalAddInt64( memory.Allocator,  *array.Boolean, ,  *array.Int64) *array.Int64 {
	 := array.NewInt64Builder()
	defer .Release()

	.Resize(.Len())

	for  := 0;  < .Len(); ++ {
		if .IsValid() && .Value() {
			.Append(.Value())
		} else {
			.Append(.Value())
		}
	}

	return .NewInt64Array()
}

// literalProjection projects a constant scalar value as a column.
type literalProjection struct {
	value scalar.Scalar
}

// Returns value.String().
// NOTE(review): identifiers are blank in this listing; this and the method
// below have identical bodies and are presumably Name and String of
// columnProjection — confirm.
func ( literalProjection) () string {
	return .value.String()
}

// Returns value.String(), same as the method above.
func ( literalProjection) () string {
	return .value.String()
}

// Builds an array from the scalar value with scalar.MakeArrayFromScalar, sized
// by int(NumRows()), and returns it under a field named value.String() with
// the scalar's DataType(). A failure from MakeArrayFromScalar is wrapped and
// returned.
// NOTE(review): identifiers are blank in this listing; presumably this is the
// Project method and NumRows() is read from the input record — confirm.
func ( literalProjection) ( memory.Allocator,  arrow.Record) ([]arrow.Field, []arrow.Array, error) {
	,  := scalar.MakeArrayFromScalar(.value, int(.NumRows()), )
	if  != nil {
		return nil, nil, fmt.Errorf("make array from literal value: %w", )
	}

	return []arrow.Field{{
			Name: .value.String(),
			Type: .value.DataType(),
		}}, []arrow.Array{
			,
		}, nil
}

// dynamicProjection projects every column matched by a DynamicColumn
// expression.
type dynamicProjection struct {
	expr *logicalplan.DynamicColumn
}

// Returns expr.ColumnName.
// NOTE(review): identifiers are blank in this listing; this and the method
// below have identical bodies and are presumably Name and String of
// columnProjection — confirm.
func ( dynamicProjection) () string {
	return .expr.ColumnName
}

// Returns expr.ColumnName, same as the method above.
func ( dynamicProjection) () string {
	return .expr.ColumnName
}

// Collects every column of the record whose field name satisfies
// expr.MatchColumn. Each matching column is appended to the result and
// retained, so the caller owns one reference per returned array. With no
// matches the returned slices are empty but non-nil, and the error is always
// nil.
// NOTE(review): identifiers are blank in this listing; presumably this is the
// Project method and the two appends target the field and array result slices
// respectively — confirm.
func ( dynamicProjection) ( memory.Allocator,  arrow.Record) ([]arrow.Field, []arrow.Array, error) {
	 := []arrow.Field{}
	 := []arrow.Array{}
	for  := 0;  < .Schema().NumFields(); ++ {
		 := .Schema().Field()
		if .expr.MatchColumn(.Name) {
			 = append(, )
			 = append(, .Column())
			.Column().Retain() // Retain the column since we're keeping it.
		}
	}

	return , , nil
}

// projectionFromExpr maps a logical-plan expression to the columnProjection
// that evaluates it.
//
// Mapping by concrete expression type:
//   - AllExpr             -> allProjection
//   - Column              -> plainProjection
//   - ConvertExpr         -> convertProjection around the projection of Expr
//   - AggregationFunction -> plainProjection on a column named after Name()
//   - DynamicColumn       -> dynamicProjection
//   - LiteralExpr         -> literalProjection of Value
//   - AliasExpr           -> aliasProjection around the projection of Expr
//   - BinaryExpr          -> boolExprProjection for comparison, regex and
//     logical operators; binaryExprProjection for OpAdd/OpSub/OpMul/OpDiv;
//     an error for any other operator
//   - IfExpr              -> ifExprProjection from Cond, Then and Else
//   - IsNullExpr          -> isNullProjection around the projection of Expr
//
// Any other type yields an "unsupported expression type" error. Errors from
// building sub-projections are wrapped with a message naming the failing part.
//
// NOTE(review): identifiers are blank in this listing; the nameless calls on
// sub-expressions such as (.Expr) are presumably recursive calls to
// projectionFromExpr — confirm. The AliasExpr case reuses the
// "projection to convert" message from the ConvertExpr case, which looks
// copy-pasted.
func projectionFromExpr( logicalplan.Expr) (columnProjection, error) {
	switch e := .(type) {
	case *logicalplan.AllExpr:
		return allProjection{}, nil
	case *logicalplan.Column:
		return plainProjection{
			expr: ,
		}, nil
	case *logicalplan.ConvertExpr:
		,  := (.Expr)
		if  != nil {
			return nil, fmt.Errorf("projection to convert: %w", )
		}

		return convertProjection{
			p:    ,
			expr: ,
		}, nil
	case *logicalplan.AggregationFunction:
		// Aggregations are projected as a plain column named after Name().
		return plainProjection{
			expr: logicalplan.Col(.Name()),
		}, nil
	case *logicalplan.DynamicColumn:
		return dynamicProjection{
			expr: ,
		}, nil
	case *logicalplan.LiteralExpr:
		return literalProjection{
			value: .Value,
		}, nil
	case *logicalplan.AliasExpr:
		,  := (.Expr)
		if  != nil {
			return nil, fmt.Errorf("projection to convert: %w", )
		}

		return aliasProjection{
			p:    ,
			expr: ,
			name: .Alias,
		}, nil
	case *logicalplan.BinaryExpr:
		switch .Op {
		// Operators that produce a boolean column.
		case logicalplan.OpEq, logicalplan.OpNotEq, logicalplan.OpGt, logicalplan.OpGtEq, logicalplan.OpLt, logicalplan.OpLtEq, logicalplan.OpRegexMatch, logicalplan.OpRegexNotMatch, logicalplan.OpAnd, logicalplan.OpOr:
			,  := binaryBooleanExpr()
			if  != nil {
				return nil, fmt.Errorf("boolean projection from expr: %w", )
			}
			return boolExprProjection{
				boolExpr: ,
			}, nil
		// Arithmetic operators: both operands get their own projection.
		case logicalplan.OpAdd, logicalplan.OpSub, logicalplan.OpMul, logicalplan.OpDiv:
			,  := (.Left)
			if  != nil {
				return nil, fmt.Errorf("left projection for arithmetic projection: %w", )
			}

			,  := (.Right)
			if  != nil {
				return nil, fmt.Errorf("right projection for arithmetic projection: %w", )
			}

			return binaryExprProjection{
				expr: ,

				left:  ,
				right: ,
			}, nil
		default:
			return nil, fmt.Errorf("unknown binary expression: %s", .String())
		}
	case *logicalplan.IfExpr:
		,  := (.Cond)
		if  != nil {
			return nil, fmt.Errorf("condition projection for if projection: %w", )
		}

		,  := (.Then)
		if  != nil {
			return nil, fmt.Errorf("then projection for if projection: %w", )
		}

		,  := (.Else)
		if  != nil {
			return nil, fmt.Errorf("else projection for if projection: %w", )
		}

		return ifExprProjection{
			expr: ,
			cond: ,
			then: ,
			els:  ,
		}, nil
	case *logicalplan.IsNullExpr:
		,  := (.Expr)
		if  != nil {
			return nil, fmt.Errorf("projection for is null projection: %w", )
		}

		return isNullProjection{
			expr: ,
			p:    ,
		}, nil
	default:
		return nil, fmt.Errorf("unsupported expression type for projection: %T", )
	}
}

// Projection is a plan step that applies a list of column projections to each
// incoming record. It holds the allocator handed to those projections, a
// tracer, the projections themselves, and the next PhysicalPlan in the chain.
type Projection struct {
	pool   memory.Allocator
	tracer trace.Tracer

	colProjections []columnProjection

	next PhysicalPlan
}

// Builds a *Projection from an allocator, a tracer and a list of logical-plan
// expressions. Each expression is turned into a columnProjection with
// projectionFromExpr, in order; the first failure aborts construction and is
// returned unchanged. The next field is left unset.
// NOTE(review): the function identifier is blank in this listing; presumably
// this is the exported constructor for Projection — confirm.
func ( memory.Allocator,  trace.Tracer,  []logicalplan.Expr) (*Projection, error) {
	 := &Projection{
		pool:           ,
		tracer:         ,
		colProjections: make([]columnProjection, 0, len()),
	}

	for ,  := range  {
		,  := projectionFromExpr()
		if  != nil {
			return nil, 
		}
		.colProjections = append(.colProjections, )
	}

	return , nil
}

// Forwards Close to the next plan step.
// NOTE(review): the method identifier is blank in this listing; presumably
// this is Close. No nil check on next is visible here — confirm next is always
// set before this is called.
func ( *Projection) () {
	.next.Close()
}

// Projects the incoming record and hands the result to next.Callback.
//
// An error from the Project call is returned unchanged. The value produced by
// Project is released via defer after next.Callback returns, so the next step
// must retain it if it keeps it.
// NOTE(review): the method identifier is blank in this listing; presumably
// this is Callback, and the deferred Release targets the projected record —
// confirm.
func ( *Projection) ( context.Context,  arrow.Record) error {
	,  := .Project(, )
	if  != nil {
		return 
	}
	defer .Release()

	return .next.Callback(, )
}

// Applies every column projection to the input record and assembles the
// results into a new record.
//
// Each columnProjection is called with the pool allocator; its fields and
// arrays are appended to the result in order. A projection whose checked
// result is nil is skipped. The row count is the Len() of element 0 of a
// slice when that slice is non-empty, otherwise 0. After array.NewRecord
// builds the record, every element of a slice is released.
//
// NOTE(review): identifiers are blank in this listing; presumably this is the
// Project method, the nil check is on the returned arrays, and the released
// slice is the collected arrays (NewRecord keeping its own references) —
// confirm. When a projection fails part-way, the arrays collected from earlier
// projections are not visibly released before returning the error.
func ( *Projection) ( context.Context,  arrow.Record) (arrow.Record, error) {
	// Generates high volume of spans. Comment out if needed during development.
	// ctx, span := p.tracer.Start(ctx, "Projection/Callback")
	// defer span.End()

	 := make([]arrow.Field, 0, len(.colProjections))
	 := make([]arrow.Array, 0, len(.colProjections))

	for ,  := range .colProjections {
		, ,  := .Project(.pool, )
		if  != nil {
			return nil, 
		}
		if  == nil {
			continue
		}

		 = append(, ...)
		 = append(, ...)
	}

	 := int64(0)
	if len() > 0 {
		 = int64([0].Len())
	}

	 := array.NewRecord(
		arrow.NewSchema(, nil),
		,
		,
	)

	for ,  := range  {
		.Release()
	}

	return , nil
}

// Forwards Finish to the next plan step and returns its error.
// NOTE(review): the method identifier is blank in this listing; presumably
// this is Finish — confirm.
func ( *Projection) ( context.Context) error {
	return .next.Finish()
}

// Stores a PhysicalPlan in the next field.
// NOTE(review): the method identifier is blank in this listing; presumably
// this is the setter for the next plan step and the stored value is the
// PhysicalPlan parameter — confirm.
func ( *Projection) ( PhysicalPlan) {
	.next = 
}

// Returns a Diagram node for this step. Details is
// "Projection (<comma-separated String() of each column projection>)" and
// Child is a *Diagram that is only assigned from next.Draw() when next is
// non-nil.
// NOTE(review): the method identifier is blank in this listing; presumably
// this is Draw — confirm.
func ( *Projection) () *Diagram {
	var  *Diagram
	if .next != nil {
		 = .next.Draw()
	}

	var  []string
	for ,  := range .colProjections {
		 = append(, .String())
	}
	 := fmt.Sprintf("Projection (%s)", strings.Join(, ", "))
	return &Diagram{Details: , Child: }
}

// allProjection passes every column of the input record through unchanged.
type allProjection struct{}

// Returns the fixed string "all".
// NOTE(review): identifiers are blank in this listing; presumably this is the
// Name method of columnProjection — confirm.
func ( allProjection) () string { return "all" }

// Returns the fixed string "*".
// NOTE(review): presumably the String method of columnProjection — confirm.
func ( allProjection) () string { return "*" }

// Returns Schema().Fields() and Columns() as-is, with a nil error.
// NOTE(review): presumably this is the Project method reading the input
// record. No Retain call is visible here, unlike the plain and dynamic
// projections in this file, while Projection's record-building step releases
// the arrays it collects — confirm the reference counts balance.
func ( allProjection) ( memory.Allocator,  arrow.Record) ([]arrow.Field, []arrow.Array, error) {
	return .Schema().Fields(), .Columns(), nil
}