package physicalplan

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"unsafe"

	// NOTE(review): the third-party import paths were stripped from this file.
	// The code below references the packages arrow, array, compute and scalar
	// (Apache Arrow for Go) and logicalplan (project-local); restore their
	// import paths using the versions pinned in this module's go.mod.
)

// ArrayRef refers to a single column of an Arrow record by name.
type ArrayRef struct {
	// ColumnName is the field name looked up in the record's schema.
	ColumnName string
}

func ( *ArrayRef) ( arrow.Record) (arrow.Array, bool, error) {
	 := .Schema().FieldIndices(.ColumnName)
	if len() != 1 {
		return nil, false, nil
	}

	return .Column([0]), true, nil
}

func ( *ArrayRef) () string {
	return .ColumnName
}

// BinaryScalarExpr is a binary expression whose left operand is a column
// reference and whose right operand is a scalar value.
type BinaryScalarExpr struct {
	// Left is the column the expression reads from the record.
	Left  *ArrayRef
	// Op is the operator applied between the column values and Right.
	Op    logicalplan.Op
	// Right is the scalar the column values are compared against.
	Right scalar.Scalar
}

func ( BinaryScalarExpr) ( arrow.Record) (*Bitmap, error) {
	, ,  := .Left.ArrowArray()
	if  != nil {
		return nil, 
	}

	if ! {
		 := NewBitmap()
		switch .Op {
		case logicalplan.OpEq:
			if .Right.IsValid() { // missing column; looking for == non-nil
				switch t := .Right.(type) {
				case *scalar.Binary:
					if .String() != "" { // treat empty string equivalent to nil
						return , nil
					}
				case *scalar.String:
					if .String() != "" { // treat empty string equivalent to nil
						return , nil
					}
				}
			}
		case logicalplan.OpNotEq: // missing column; looking for != nil
			if !.Right.IsValid() {
				return , nil
			}
		case logicalplan.OpLt, logicalplan.OpLtEq, logicalplan.OpGt, logicalplan.OpGtEq:
			return , nil
		}

		.AddRange(0, uint64(.NumRows()))
		return , nil
	}

	return BinaryScalarOperation(, .Right, .Op)
}

func ( BinaryScalarExpr) () string {
	return .Left.String() + " " + .Op.String() + " " + .Right.String()
}

// ErrUnsupportedBinaryOperation is returned when the underlying Arrow compute
// call reports that the requested operation is not implemented.
var ErrUnsupportedBinaryOperation = errors.New("unsupported binary operation")

func ( arrow.Array,  scalar.Scalar,  logicalplan.Op) (*Bitmap, error) {
	switch  {
	case logicalplan.OpContains, logicalplan.OpNotContains:
		switch arr := .(type) {
		case *array.Binary, *array.String:
			return ArrayScalarContains(, ,  == logicalplan.OpNotContains)
		case *array.Dictionary:
			return DictionaryArrayScalarContains(, ,  == logicalplan.OpNotContains)
		default:
			panic("unsupported array type " + fmt.Sprintf("%T", ))
		}
	}

	// TODO: Figure out dictionary arrays and lists with compute next
	 := .DataType()
	switch arr := .(type) {
	case *array.Dictionary:
		switch  {
		case logicalplan.OpEq:
			return DictionaryArrayScalarEqual(, )
		case logicalplan.OpNotEq:
			return DictionaryArrayScalarNotEqual(, )
		default:
			return nil, fmt.Errorf("unsupported operator: %v", )
		}
	}

	switch .(type) {
	case *arrow.ListType:
		panic("TODO: list comparisons unimplemented")
	}

	return ArrayScalarCompute(.ArrowString(), , )
}

func ( string,  arrow.Array,  scalar.Scalar) (*Bitmap, error) {
	 := compute.NewDatum()
	defer .Release()
	 := compute.NewDatum()
	defer .Release()
	,  := compute.CallFunction(context.TODO(), , nil, , )
	if  != nil {
		if errors.Unwrap().Error() == "not implemented" {
			return nil, ErrUnsupportedBinaryOperation
		}
		return nil, fmt.Errorf("error calling equal function: %w", )
	}
	defer .Release()
	,  := .(*compute.ArrayDatum)
	if ! {
		return nil, fmt.Errorf("expected *compute.ArrayDatum, got %T", )
	}
	,  := .MakeArray().(*array.Boolean)
	if ! {
		return nil, fmt.Errorf("expected *array.Boolean, got %T", .MakeArray())
	}
	defer .Release()

	 := NewBitmap()
	for  := 0;  < .Len(); ++ {
		if .IsNull() {
			continue
		}
		if .Value() {
			.AddInt()
		}
	}
	return , nil
}

func ( *array.Dictionary,  scalar.Scalar) (*Bitmap, error) {
	 := NewBitmap()
	var  []byte
	switch r := .(type) {
	case *scalar.Binary:
		 = .Data()
	case *scalar.String:
		 = .Data()
	}

	// This is a special case for where the left side should not equal NULL
	if  == scalar.ScalarNull {
		for  := 0;  < .Len(); ++ {
			if !.IsNull() {
				.Add(uint32())
			}
		}
		return , nil
	}

	for  := 0;  < .Len(); ++ {
		if .IsNull() {
			continue
		}

		switch dict := .Dictionary().(type) {
		case *array.Binary:
			if !bytes.Equal(.Value(.GetValueIndex()), ) {
				.Add(uint32())
			}
		case *array.String:
			if .Value(.GetValueIndex()) != string() {
				.Add(uint32())
			}
		}
	}

	return , nil
}

func ( *array.Dictionary,  scalar.Scalar) (*Bitmap, error) {
	 := NewBitmap()
	var  []byte
	switch r := .(type) {
	case *scalar.Binary:
		 = .Data()
	case *scalar.String:
		 = .Data()
	}

	// This is a special case for where the left side should equal NULL
	if  == scalar.ScalarNull {
		for  := 0;  < .Len(); ++ {
			if .IsNull() {
				.Add(uint32())
			}
		}
		return , nil
	}

	for  := 0;  < .Len(); ++ {
		if .IsNull() {
			continue
		}

		switch dict := .Dictionary().(type) {
		case *array.Binary:
			if bytes.Equal(.Value(.GetValueIndex()), ) {
				.Add(uint32())
			}
		case *array.String:
			if .Value(.GetValueIndex()) == string() {
				.Add(uint32())
			}
		}
	}

	return , nil
}

func ( arrow.Array,  scalar.Scalar,  bool) (*Bitmap, error) {
	var  []byte
	switch s := .(type) {
	case *scalar.Binary:
		 = .Data()
	case *scalar.String:
		 = .Data()
	}

	 := NewBitmap()
	switch a := .(type) {
	case *array.Binary:
		for  := 0;  < .Len(); ++ {
			if .IsNull() {
				continue
			}
			 := bytes.Contains(.Value(), )
			if  && ! || ! &&  {
				.Add(uint32())
			}
		}
		return , nil
	case *array.String:
		for  := 0;  < .Len(); ++ {
			if .IsNull() {
				continue
			}
			 := bytes.Contains(unsafeStringToBytes(.Value()), )
			if  && ! || ! &&  {
				.Add(uint32())
			}
		}
		return , nil
	}
	return nil, fmt.Errorf("contains not implemented for %T", )
}

func ( *array.Dictionary,  scalar.Scalar,  bool) (*Bitmap, error) {
	 := NewBitmap()
	var  []byte
	switch r := .(type) {
	case *scalar.Binary:
		 = .Data()
	case *scalar.String:
		 = .Data()
	}

	// This is a special case for where the left side should not equal NULL
	if  == scalar.ScalarNull {
		for  := 0;  < .Len(); ++ {
			if !.IsNull() {
				.Add(uint32())
			}
		}
		return , nil
	}

	for  := 0;  < .Len(); ++ {
		if .IsNull() {
			continue
		}

		switch dict := .Dictionary().(type) {
		case *array.Binary:
			 := bytes.Contains(.Value(.GetValueIndex()), )
			if  && ! || ! &&  {
				.Add(uint32())
			}
		case *array.String:
			 := bytes.Contains(unsafeStringToBytes(.Value(.GetValueIndex())), )
			if  && ! || ! &&  {
				.Add(uint32())
			}
		}
	}

	return , nil
}

// unsafeStringToBytes returns a byte slice that views the bytes of s without
// copying them. The slice has length len(s) and shares memory with s, so it
// must be treated as read-only: strings are immutable and writing through the
// slice is undefined behaviour.
func unsafeStringToBytes(s string) []byte {
	return unsafe.Slice(unsafe.StringData(s), len(s))
}