// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18

package compute

import (
	
	
	
	
	
	
	
	

	
	
	
	
	
	
	
	
)

// hashSeed is the single maphash seed shared by every Hash method in this
// file, so hashes produced within one process are comparable with each other.
var hashSeed = maphash.MakeSeed()

// Expression is an interface for mapping one datum to another. An expression
// is one of:
//
//	A literal Datum
//	A reference to a single (potentially nested) field of an input Datum
//	A call to a compute function, with arguments specified by other Expressions
//
// Deprecated: use substrait-go expressions instead.
type Expression interface {
	fmt.Stringer
	// IsBound returns true if this expression has been bound to a particular
	// Datum and/or Schema.
	IsBound() bool
	// IsScalarExpr returns true if this expression is composed only of scalar
	// literals, field references and calls to scalar functions.
	IsScalarExpr() bool
	// IsNullLiteral returns true if this expression is a literal and entirely
	// null.
	IsNullLiteral() bool
	// IsSatisfiable returns true if this expression could evaluate to true
	IsSatisfiable() bool
	// FieldRef returns a pointer to the underlying field reference, or nil if
	// this expression is not a field reference.
	FieldRef() *FieldRef
	// Type returns the datatype this expression will evaluate to.
	Type() arrow.DataType

	// Hash returns a hash value for this expression.
	Hash() uint64
	// Equals reports whether the given expression is equal to this one.
	Equals(Expression) bool

	// Release releases the underlying bound C++ memory that is allocated when
	// a Bind is performed. Any bound expression should get released to ensure
	// no memory leaks.
	Release()
}

func printDatum( Datum) string {
	switch datum := .(type) {
	case *ScalarDatum:
		if !.Value.IsValid() {
			return "null"
		}

		switch .Type().ID() {
		case arrow.STRING, arrow.LARGE_STRING:
			return strconv.Quote(.Value.(scalar.BinaryScalar).String())
		case arrow.BINARY, arrow.FIXED_SIZE_BINARY, arrow.LARGE_BINARY:
			return `"` + strings.ToUpper(hex.EncodeToString(.Value.(scalar.BinaryScalar).Data())) + `"`
		}

		return .Value.String()
	default:
		return .String()
	}
}

// Literal is an expression denoting a literal Datum which could be any value
// as a scalar, an array, or so on.
//
// Deprecated: use substrait-go expressions Literal instead.
type Literal struct {
	// Literal is the wrapped Datum this expression evaluates to.
	Literal Datum
}

func (Literal) () *FieldRef     { return nil }
func ( *Literal) () string       { return printDatum(.Literal) }
func ( *Literal) () arrow.DataType { return .Literal.(ArrayLikeDatum).Type() }
func ( *Literal) () bool        { return .Type() != nil }
func ( *Literal) () bool   { return .Literal.Kind() == KindScalar }

func ( *Literal) ( Expression) bool {
	if ,  := .(*Literal);  {
		return .Literal.Equals(.Literal)
	}
	return false
}

func ( *Literal) () bool {
	if ,  := .Literal.(ArrayLikeDatum);  {
		return .NullN() == .Len()
	}
	return true
}

func ( *Literal) () bool {
	if .IsNullLiteral() {
		return false
	}

	if ,  := .Literal.(*ScalarDatum);  && .Type().ID() == arrow.BOOL {
		return .Value.(*scalar.Boolean).Value
	}

	return true
}

func ( *Literal) () uint64 {
	if .IsScalarExpr() {
		return scalar.Hash(hashSeed, .Literal.(*ScalarDatum).Value)
	}
	return 0
}

func ( *Literal) () {
	.Literal.Release()
}

// Parameter represents a field reference and needs to be bound in order to determine
// its type and shape.
//
// Deprecated: use substrait-go field references instead.
type Parameter struct {
	// ref is the field reference this parameter stands for.
	ref *FieldRef

	// post bind props: dt is the resolved data type and stays nil while the
	// parameter is unbound (IsBound tests Type() != nil).
	dt    arrow.DataType
	index int
}

func (Parameter) () bool     { return false }
func ( *Parameter) () arrow.DataType { return .dt }
func ( *Parameter) () bool        { return .Type() != nil }
func ( *Parameter) () bool   { return .ref != nil }
func ( *Parameter) () bool  { return .Type() == nil || .Type().ID() != arrow.NULL }
func ( *Parameter) () *FieldRef  { return .ref }
func ( *Parameter) () uint64         { return .ref.Hash(hashSeed) }

func ( *Parameter) () string {
	switch {
	case .ref.IsName():
		return .ref.Name()
	case .ref.IsFieldPath():
		return .ref.FieldPath().String()
	default:
		return .ref.String()
	}
}

func ( *Parameter) ( Expression) bool {
	if ,  := .(*Parameter);  {
		return .ref.Equals(*.ref)
	}

	return false
}

func ( *Parameter) () {}

// comparisonType encodes a comparison as a set of bit flags: EQ, LT and GT
// are the primitive bits and the compound comparisons are unions of them.
type comparisonType int8

const (
	compNA comparisonType = 0
	compEQ comparisonType = 1
	compLT comparisonType = 2
	compGT comparisonType = 4
	compNE comparisonType = compLT | compGT
	compLE comparisonType = compLT | compEQ
	compGE comparisonType = compGT | compEQ
)

// name returns the function-name spelling of the comparison (the same strings
// used as keys of compmap), or "na" for any value that is not a known
// comparison.
//
//lint:ignore U1000 ignore that this is unused for now
func (c comparisonType) name() string {
	switch c {
	case compEQ:
		return "equal"
	case compLT:
		return "less"
	case compGT:
		return "greater"
	case compNE:
		return "not_equal"
	case compLE:
		return "less_equal"
	case compGE:
		return "greater_equal"
	}
	return "na"
}

func ( comparisonType) () string {
	switch  {
	case compEQ:
		return "=="
	case compLT:
		return "<"
	case compGT:
		return ">"
	case compNE:
		return "!="
	case compLE:
		return "<="
	case compGE:
		return ">="
	}
	debug.Assert(false, "invalid getop")
	return ""
}

// compmap maps the name of each comparison function to its comparisonType.
// Call.String consults it to decide whether a call is printed as an infix
// comparison.
var compmap = map[string]comparisonType{
	"equal":         compEQ,
	"less":          compLT,
	"greater":       compGT,
	"not_equal":     compNE,
	"less_equal":    compLE,
	"greater_equal": compGE,
}

func optionsToString( FunctionOptions) string {
	if ,  := .(fmt.Stringer);  {
		return .String()
	}

	var  strings.Builder
	 := reflect.Indirect(reflect.ValueOf())
	.WriteByte('{')
	for  := 0;  < .Type().NumField(); ++ {
		 := .Type().Field()
		 := .Tag.Get("compute")
		if  == "-" {
			continue
		}

		 := .Field()
		fmt.Fprintf(&, "%s=%v, ", , .Interface())
	}
	 := .String()
	return [:len()-2] + "}"
}

// Call is a function call with specific arguments which are themselves other
// expressions. A call can also have options that are specific to the function
// in question. It must be bound to determine the shape and type.
//
// Deprecated: use substrait-go expression functions instead.
type Call struct {
	// funcName is the name of the compute function being called.
	funcName string
	// args are the argument expressions, in call order.
	args []Expression
	// dt is the result type; nil while the call is unbound.
	dt arrow.DataType
	// options holds the function-specific options, or nil when there are none.
	options FunctionOptions

	// cachedHash memoizes Hash; zero means "not computed yet".
	cachedHash uint64
}

func ( *Call) () bool  { return false }
func ( *Call) () *FieldRef  { return nil }
func ( *Call) () arrow.DataType { return .dt }
func ( *Call) () bool  { return .Type() == nil || .Type().ID() != arrow.NULL }

func ( *Call) () string {
	 := func( string) string {
		return "(" + .args[0].String() + " " +  + " " + .args[1].String() + ")"
	}

	if ,  := compmap[.funcName];  {
		return (.getOp())
	}

	const  = "_kleene"
	if strings.HasSuffix(.funcName, ) {
		return (strings.TrimSuffix(.funcName, ))
	}

	if .funcName == "make_struct" && .options != nil {
		 := .options.(*MakeStructOptions)
		 := "{"
		for ,  := range .args {
			 += .FieldNames[] + "=" + .String() + ", "
		}
		return [:len()-2] + "}"
	}

	var  strings.Builder
	.WriteString(.funcName + "(")
	for ,  := range .args {
		.WriteString(.String() + ", ")
	}

	if .options != nil {
		.WriteString(optionsToString(.options))
		.WriteString("  ")
	}

	 := .String()
	return [:len()-2] + ")"
}

func ( *Call) () uint64 {
	if .cachedHash != 0 {
		return .cachedHash
	}

	var  maphash.Hash
	.SetSeed(hashSeed)

	.WriteString(.funcName)
	.cachedHash = .Sum64()
	for ,  := range .args {
		.cachedHash = exec.HashCombine(.cachedHash, .Hash())
	}
	return .cachedHash
}

func ( *Call) () bool {
	for ,  := range .args {
		if !.IsScalarExpr() {
			return false
		}
	}

	return false
	// return isFuncScalar(c.funcName)
}

func ( *Call) () bool {
	return .Type() != nil
}

func ( *Call) ( Expression) bool {
	,  := .(*Call)
	if ! {
		return false
	}

	if .funcName != .funcName || len(.args) != len(.args) {
		return false
	}

	for  := range .args {
		if !.args[].Equals(.args[]) {
			return false
		}
	}

	if ,  := .options.(FunctionOptionsEqual);  {
		return .Equals(.options)
	}
	return reflect.DeepEqual(.options, .options)
}

func ( *Call) () {
	for ,  := range .args {
		.Release()
	}
	if ,  := .options.(releasable);  {
		.Release()
	}
}

// FunctionOptions can be any type which has a TypeName function. The fields
// of the type will be used (via reflection) to determine the information to
// propagate when serializing to pass to the C++ for execution.
type FunctionOptions interface {
	// TypeName returns the name identifying the options type; it is the key
	// under which the type is registered in funcOptionsMap.
	TypeName() string
}

// FunctionOptionsEqual is implemented by options that provide their own
// equality check. Call.Equals uses it in preference to reflect.DeepEqual.
type FunctionOptionsEqual interface {
	Equals(FunctionOptions) bool
}

// FunctionOptionsCloneable is implemented by options that can produce a copy
// of themselves.
// NOTE(review): no caller is visible in this file — confirm whether Clone is
// expected to be a deep copy.
type FunctionOptionsCloneable interface {
	Clone() FunctionOptions
}

type MakeStructOptions struct {
	FieldNames       []string          `compute:"field_names"`
	FieldNullability []bool            `compute:"field_nullability"`
	FieldMetadata    []*arrow.Metadata `compute:"field_metadata"`
}

func (MakeStructOptions) () string { return "MakeStructOptions" }

// NullOptions are the options for null-checking functions.
type NullOptions struct {
	// NanIsNull requests that NaN values be treated as null.
	NanIsNull bool `compute:"nan_is_null"`
}

// TypeName returns "NullOptions".
func (NullOptions) TypeName() string { return "NullOptions" }

type StrptimeOptions struct {
	Format string         `compute:"format"`
	Unit   arrow.TimeUnit `compute:"unit"`
}

func (StrptimeOptions) () string { return "StrptimeOptions" }

// NullSelectionBehavior is an alias of kernels.NullSelectionBehavior,
// re-exported so callers do not need to import the kernels package.
type NullSelectionBehavior = kernels.NullSelectionBehavior

// Re-exports of the kernels package's null-selection constants.
const (
	SelectionEmitNulls = kernels.EmitNulls
	SelectionDropNulls = kernels.DropNulls
)

// ArithmeticOptions are the options for arithmetic functions.
type ArithmeticOptions struct {
	// NoCheckOverflow is carried under the "check_overflow" tag.
	// NOTE(review): the field name and tag have opposite polarity — confirm
	// which one the consumer of the tag expects.
	NoCheckOverflow bool `compute:"check_overflow"`
}

// TypeName returns "ArithmeticOptions".
func (ArithmeticOptions) TypeName() string { return "ArithmeticOptions" }

// Aliases of the option types defined in the kernels package, re-exported so
// callers do not need to import kernels directly.
type (
	CastOptions   = kernels.CastOptions
	FilterOptions = kernels.FilterOptions
	TakeOptions   = kernels.TakeOptions
)

func () *FilterOptions { return &FilterOptions{} }

func () *TakeOptions { return &TakeOptions{BoundsCheck: true} }

func ( bool) *CastOptions {
	if  {
		return &CastOptions{}
	}
	return &CastOptions{
		AllowIntOverflow:     true,
		AllowTimeTruncate:    true,
		AllowTimeOverflow:    true,
		AllowDecimalTruncate: true,
		AllowFloatTruncate:   true,
		AllowInvalidUtf8:     true,
	}
}

func ( arrow.DataType) *CastOptions {
	return NewCastOptions(, false)
}

func ( arrow.DataType) *CastOptions {
	return NewCastOptions(, true)
}

func ( arrow.DataType,  bool) *CastOptions {
	 := DefaultCastOptions()
	if  != nil {
		.ToType = 
	} else {
		.ToType = arrow.Null
	}
	return 
}

func ( Expression,  arrow.DataType) Expression {
	 := &CastOptions{}
	if  == nil {
		.ToType = arrow.Null
	} else {
		.ToType = 
	}

	return NewCall("cast", []Expression{}, )
}

// Deprecated: Use SetOptions instead
type SetLookupOptions struct {
	ValueSet  Datum `compute:"value_set"`
	SkipNulls bool  `compute:"skip_nulls"`
}

func (SetLookupOptions) () string { return "SetLookupOptions" }

func ( *SetLookupOptions) () { .ValueSet.Release() }

func ( *SetLookupOptions) ( FunctionOptions) bool {
	,  := .(*SetLookupOptions)
	if ! {
		return false
	}

	return .SkipNulls == .SkipNulls && .ValueSet.Equals(.ValueSet)
}

func ( *SetLookupOptions) ( *scalar.Struct) error {
	if ,  := .Field("skip_nulls");  == nil {
		.SkipNulls = .(*scalar.Boolean).Value
	}

	,  := .Field("value_set")
	if  != nil {
		return 
	}

	if ,  := .(scalar.ListScalar);  {
		.ValueSet = NewDatum(.GetList())
		return nil
	}

	return errors.New("set lookup options valueset should be a list")
}

var (
	// funcOptionsMap maps an options TypeName to its reflect.Type; it is
	// filled by init from funcOptsTypes and consulted when deserializing
	// call options.
	funcOptionsMap map[string]reflect.Type
	// funcOptsTypes lists one zero value of every options type that can be
	// looked up by name.
	funcOptsTypes  = []FunctionOptions{
		SetLookupOptions{}, ArithmeticOptions{}, CastOptions{},
		FilterOptions{}, NullOptions{}, StrptimeOptions{}, MakeStructOptions{},
	}
)

func init() {
	funcOptionsMap = make(map[string]reflect.Type)
	for ,  := range funcOptsTypes {
		funcOptionsMap[.TypeName()] = reflect.TypeOf()
	}
}

// NewLiteral constructs a new literal expression from any value. It is passed
// to NewDatum which will construct the appropriate Datum and/or scalar
// value for the type provided.
func ( interface{}) Expression {
	return &Literal{Literal: NewDatum()}
}

func ( arrow.DataType) Expression {
	return &Literal{Literal: NewDatum(scalar.MakeNullScalar())}
}

// NewRef constructs a parameter expression which refers to a specific field
func ( FieldRef) Expression {
	return &Parameter{ref: &, index: -1}
}

// NewFieldRef is shorthand for NewRef(FieldRefName(field))
func ( string) Expression {
	return NewRef(FieldRefName())
}

// NewCall constructs an expression that represents a specific function call with
// the given arguments and options.
func ( string,  []Expression,  FunctionOptions) Expression {
	return &Call{funcName: , args: , options: }
}

// Project is shorthand for `make_struct` to produce a record batch output
// from a group of expressions.
func ( []Expression,  []string) Expression {
	 := make([]bool, len())
	for  := range  {
		[] = true
	}
	 := make([]*arrow.Metadata, len())
	return NewCall("make_struct", ,
		&MakeStructOptions{FieldNames: , FieldNullability: , FieldMetadata: })
}

// Equal is a convenience function for the equal function
func (,  Expression) Expression {
	return NewCall("equal", []Expression{, }, nil)
}

// NotEqual creates a call to not_equal
func (,  Expression) Expression {
	return NewCall("not_equal", []Expression{, }, nil)
}

// Less is shorthand for NewCall("less",....)
func (,  Expression) Expression {
	return NewCall("less", []Expression{, }, nil)
}

// LessEqual is shorthand for NewCall("less_equal",....)
func (,  Expression) Expression {
	return NewCall("less_equal", []Expression{, }, nil)
}

// Greater is shorthand for NewCall("greater",....)
func (,  Expression) Expression {
	return NewCall("greater", []Expression{, }, nil)
}

// GreaterEqual is shorthand for NewCall("greater_equal",....)
func (,  Expression) Expression {
	return NewCall("greater_equal", []Expression{, }, nil)
}

// IsNull creates an expression that returns true if the passed in expression is
// null. Optionally treating NaN as null if desired.
func ( Expression,  bool) Expression {
	return NewCall("less", []Expression{}, &NullOptions{})
}

// IsValid is the inverse of IsNull
func ( Expression) Expression {
	return NewCall("is_valid", []Expression{}, nil)
}

type binop func(lhs, rhs Expression) Expression

func foldLeft( binop,  ...Expression) Expression {
	switch len() {
	case 0:
		return nil
	case 1:
		return [0]
	}

	 := [0]
	for ,  := range [1:] {
		 = (, )
	}
	return 
}

func and(,  Expression) Expression {
	return NewCall("and_kleene", []Expression{, }, nil)
}

// And constructs a tree of calls to and_kleene for boolean And logic taking
// an arbitrary number of values.
func (,  Expression,  ...Expression) Expression {
	 := foldLeft(and, append([]Expression{, }, ...)...)
	if  != nil {
		return 
	}
	return NewLiteral(true)
}

func or(,  Expression) Expression {
	return NewCall("or_kleene", []Expression{, }, nil)
}

// Or constructs a tree of calls to or_kleene for boolean Or logic taking
// an arbitrary number of values.
func (,  Expression,  ...Expression) Expression {
	 := foldLeft(or, append([]Expression{, }, ...)...)
	if  != nil {
		return 
	}
	return NewLiteral(false)
}

// Not creates a call to "invert" for the value specified.
func ( Expression) Expression {
	return NewCall("invert", []Expression{}, nil)
}

func ( FunctionOptions,  memory.Allocator) (*memory.Buffer, error) {
	,  := scalar.ToScalar(, )
	if  != nil {
		return nil, 
	}
	if ,  := .(releasable);  {
		defer .Release()
	}

	,  := scalar.MakeArrayFromScalar(, 1, )
	if  != nil {
		return nil, 
	}
	defer .Release()

	 := array.NewRecord(arrow.NewSchema([]arrow.Field{{Type: .DataType(), Nullable: true}}, nil), []arrow.Array{}, 1)
	defer .Release()

	 := &bufferWriteSeeker{mem: }
	,  := ipc.NewFileWriter(, ipc.WithSchema(.Schema()), ipc.WithAllocator())
	if  != nil {
		return nil, 
	}

	.Write()
	.Close()
	return .buf, nil
}

// SerializeExpr serializes expressions by converting them to Metadata and
// storing this in the schema of a Record. Embedded arrays and scalars are
// stored in its columns. Finally the record is written as an IPC file
func ( Expression,  memory.Allocator) (*memory.Buffer, error) {
	var (
		      []arrow.Array
		   []string
		 []string
		     func(Expression) error
	)

	 := func( scalar.Scalar) (string, error) {
		 := len()
		,  := scalar.MakeArrayFromScalar(, 1, )
		if  != nil {
			return "", 
		}
		 = append(, )
		return strconv.Itoa(), nil
	}

	 = func( Expression) error {
		switch e := .(type) {
		case *Literal:
			if !.IsScalarExpr() {
				return errors.New("not implemented: serialization of non-scalar literals")
			}
			 = append(, "literal")
			,  := (.Literal.(*ScalarDatum).Value)
			if  != nil {
				return 
			}
			 = append(, )
		case *Parameter:
			if .ref.Name() == "" {
				return errors.New("not implemented: serialization of non-name field_ref")
			}

			 = append(, "field_ref")
			 = append(, .ref.Name())
		case *Call:
			 = append(, "call")
			 = append(, .funcName)

			for ,  := range .args {
				()
			}

			if .options != nil {
				,  := scalar.ToScalar(.options, )
				if  != nil {
					return 
				}
				 = append(, "options")
				,  := ()
				if  != nil {
					return 
				}
				 = append(, )

				for ,  := range .(*scalar.Struct).Value {
					switch s := .(type) {
					case releasable:
						defer .Release()
					}
				}
			}

			 = append(, "end")
			 = append(, .funcName)
		}
		return nil
	}

	if  := ();  != nil {
		return nil, 
	}

	 := make([]arrow.Field, len())
	for ,  := range  {
		[].Type = .DataType()
		defer .Release()
	}

	 := arrow.NewMetadata(, )
	 := array.NewRecord(arrow.NewSchema(, &), , 1)
	defer .Release()

	 := &bufferWriteSeeker{mem: }
	,  := ipc.NewFileWriter(, ipc.WithSchema(.Schema()), ipc.WithAllocator())
	if  != nil {
		return nil, 
	}

	.Write()
	.Close()
	return .buf, nil
}

func ( memory.Allocator,  *memory.Buffer) (Expression, error) {
	,  := ipc.NewFileReader(bytes.NewReader(.Bytes()), ipc.WithAllocator())
	if  != nil {
		return nil, 
	}
	defer .Close()

	,  := .Read()
	if  != nil {
		return nil, 
	}

	if !.Schema().HasMetadata() {
		return nil, errors.New("serialized Expression's batch repr had no metadata")
	}

	if .NumRows() != 1 {
		return nil, fmt.Errorf("serialized Expression's batch repr was not a single row - had %d", .NumRows())
	}

	var (
		   func() (Expression, error)
		    = 0
		 = .Schema().Metadata()
	)

	 := func( string) (scalar.Scalar, error) {
		,  := strconv.ParseInt(, 10, 32)
		if  != nil {
			return nil, 
		}
		if  >= .NumCols() {
			return nil, errors.New("column index out of bounds")
		}
		return scalar.GetScalar(.Column(int()), 0)
	}

	 = func() (Expression, error) {
		if  >= .Len() {
			return nil, errors.New("unterminated serialized Expression")
		}

		,  := .Keys()[], .Values()[]
		++

		switch  {
		case "literal":
			,  := ()
			if  != nil {
				return nil, 
			}
			if ,  := .(releasable);  {
				defer .Release()
			}
			return NewLiteral(), 
		case "field_ref":
			return NewFieldRef(), nil
		case "call":
			 := make([]Expression, 0)
			for .Keys()[] != "end" {
				if .Keys()[] == "options" {
					,  := (.Values()[])
					if  != nil {
						return nil, 
					}
					if ,  := .(releasable);  {
						defer .Release()
					}
					var  FunctionOptions
					if  != nil {
						,  := .(*scalar.Struct).Field("_type_name")
						if  != nil {
							return nil, 
						}
						if .DataType().ID() != arrow.BINARY {
							return nil, errors.New("options scalar typename must be binary")
						}

						 := reflect.New(funcOptionsMap[string(.(*scalar.Binary).Data())]).Interface()
						if  := scalar.FromScalar(.(*scalar.Struct), );  != nil {
							return nil, 
						}
						 = .(FunctionOptions)
					}
					 += 2
					return NewCall(, , ), nil
				}

				,  := ()
				if  != nil {
					return nil, 
				}
				 = append(, )
			}
			++
			return NewCall(, , nil), nil
		default:
			return nil, fmt.Errorf("unrecognized serialized Expression key %s", )
		}
	}

	return ()
}