// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18

package exec

import (
	
	
	
	
	

	
	
	
	
)

// hashSeed is the package-wide maphash seed. The hashing methods in this
// file seed their maphash.Hash with it and also pass it to arrow.HashType.
// It is produced by maphash.MakeSeed when the package is initialized, so
// the resulting hash values are only comparable within a single process.
var hashSeed = maphash.MakeSeed()

// ctxAllocKey is the unexported key type under which an allocator is
// stored in, and looked up from, a context.Context.
type ctxAllocKey struct{}

// WithAllocator returns a new context with the provided allocator
// embedded into the context.
//
// The allocator is attached with context.WithValue under the unexported
// ctxAllocKey{} key, which is the same key GetAllocator reads. The
// context passed in is not modified; a derived context is returned.
func ( context.Context,  memory.Allocator) context.Context {
	return context.WithValue(, ctxAllocKey{}, )
}

// GetAllocator retrieves the allocator from the context, or returns
// memory.DefaultAllocator if there was no allocator in the provided
// context.
//
// The fallback also applies when the value stored under ctxAllocKey{}
// does not implement memory.Allocator, because the lookup is a checked
// (comma-ok) type assertion.
func ( context.Context) memory.Allocator {
	// A missing key makes Value return nil, which fails the assertion
	// instead of panicking.
	,  := .Value(ctxAllocKey{}).(memory.Allocator)
	if ! {
		return memory.DefaultAllocator
	}
	return 
}

// Kernel defines the minimum interface required for the basic execution
// kernel. It will grow as the implementation requires.
type Kernel interface {
	// GetInitFn returns the kernel's KernelInitFn, the function used to
	// build the kernel's state from a KernelCtx and KernelInitArgs.
	GetInitFn() KernelInitFn
	// GetSig returns a pointer to the kernel's KernelSignature.
	GetSig() *KernelSignature
}

// NonAggKernel extends the base Kernel interface with what the executor
// needs from non-aggregate kernels, that is, Scalar and Vector kernels.
type NonAggKernel interface {
	Kernel

	// Exec runs the kernel against the given span, writing into result,
	// and returns any execution error.
	Exec(ctx *KernelCtx, span *ExecSpan, result *ExecResult) error
	// GetNullHandling returns the kernel's NullHandling preference.
	GetNullHandling() NullHandling
	// GetMemAlloc returns the kernel's MemAlloc preference.
	GetMemAlloc() MemAlloc
	// CanFillSlices reports the kernel's "can write into slices" setting.
	CanFillSlices() bool
	// Cleanup runs the kernel's cleanup logic, if it has any, and returns
	// the resulting error.
	Cleanup() error
}

// KernelCtx is a small struct holding the context for a kernel execution
// consisting of a pointer to the kernel, initialized state (if needed)
// and the context for this execution.
type KernelCtx struct {
	// Ctx is the context for this execution. The buffer-allocating
	// methods on KernelCtx pass it to GetAllocator to pick the allocator.
	Ctx    context.Context
	// Kernel is the kernel being executed.
	Kernel Kernel
	// State is the kernel's initialized state, if it needs any.
	State  KernelState
}

// Allocate returns a new resizable memory.Buffer that has been resized to
// the requested size (the int argument, in bytes). The buffer is created
// with the allocator that GetAllocator finds in the Ctx field, so it falls
// back to memory.DefaultAllocator when the context carries none.
//
// The method name is taken from the call made by the bitmap-allocating
// method that follows.
//
// NOTE(review): presumably the caller owns the returned buffer and must
// release it — confirm against memory.Buffer's contract.
func ( *KernelCtx) ( int) *memory.Buffer {
	 := memory.NewResizableBuffer(GetAllocator(.Ctx))
	.Resize()
	return 
}

// This method returns a buffer large enough to hold the given number of
// bits (the int64 argument). The bit count is converted to a byte count
// with bitutil.BytesForBits and the buffer is then obtained from Allocate.
//
// The byte count is narrowed from int64 to int before the Allocate call,
// which would truncate on platforms where int is 32 bits wide.
func ( *KernelCtx) ( int64) *memory.Buffer {
	 := bitutil.BytesForBits()
	return .Allocate(int())
}

// TypeMatcher defines an interface for matching Input or Output types
// for execution kernels. There are multiple implementations of this
// interface provided by this package.
type TypeMatcher interface {
	// String (from fmt.Stringer) returns a readable description of the
	// matcher; InputType uses it as the string form of a matcher-based
	// input.
	fmt.Stringer
	// Matches reports whether typ is accepted by this matcher.
	Matches(typ arrow.DataType) bool
	// Equals reports whether other is an equivalent matcher.
	Equals(other TypeMatcher) bool
}

// sameTypeIDMatcher is a TypeMatcher that accepts a data type purely on
// the basis of its arrow.Type ID.
type sameTypeIDMatcher struct {
	// accepted is the arrow.Type ID a data type must report to match.
	accepted arrow.Type
}

// Matches implements TypeMatcher: it reports whether the ID of the given
// data type equals the accepted ID.
func ( sameTypeIDMatcher) ( arrow.DataType) bool { return .accepted == .ID() }
// Equals implements TypeMatcher: it reports whether the other matcher is
// a sameTypeIDMatcher that accepts the same ID.
func ( sameTypeIDMatcher) ( TypeMatcher) bool {
	// Fast path. The receiver is a value, so this comparison can only
	// succeed when the interface holds a sameTypeIDMatcher value (not a
	// pointer) with the same accepted ID.
	if  ==  {
		return true
	}

	// SameTypeID hands out *sameTypeIDMatcher, so the pointer type is the
	// dynamic type checked for here.
	,  := .(*sameTypeIDMatcher)
	if ! {
		return false
	}

	return .accepted == .accepted
}

// String implements fmt.Stringer, returning "Type::" followed by the
// string form of the accepted ID.
func ( sameTypeIDMatcher) () string {
	return "Type::" + .accepted.String()
}

// SameTypeID returns a type matcher which will match
// any DataType that uses the same arrow.Type ID as the one
// passed in here.
//
// The returned matcher is a *sameTypeIDMatcher.
//
// NOTE(review): as the composite literal appears here it carries no field
// value, which would leave accepted at its zero value; presumably the
// arrow.Type argument is meant to populate it — confirm.
func ( arrow.Type) TypeMatcher { return &sameTypeIDMatcher{} }

// timeUnitMatcher is a TypeMatcher that accepts a data type only when
// both its arrow.Type ID and its time unit match.
type timeUnitMatcher struct {
	// id is the required arrow.Type ID.
	id   arrow.Type
	// unit is the required time unit.
	unit arrow.TimeUnit
}

// Matches implements TypeMatcher: it reports whether the given data type
// has the required ID and the required time unit.
func ( timeUnitMatcher) ( arrow.DataType) bool {
	if .ID() != .id {
		return false
	}
	// Unchecked assertion: a type that reports the required ID but does
	// not implement arrow.TemporalWithUnit would panic here.
	return .unit == .(arrow.TemporalWithUnit).TimeUnit()
}

// String implements fmt.Stringer, returning the lower-cased name of the
// type ID followed by the unit in parentheses.
func ( timeUnitMatcher) () string {
	return strings.ToLower(.id.String()) + "(" + .unit.String() + ")"
}

// Equals implements TypeMatcher: it reports whether the other matcher is
// a *timeUnitMatcher with the same ID and unit.
//
// Unlike the two methods above it is declared on the pointer receiver, so
// only *timeUnitMatcher has the full TypeMatcher method set.
func ( *timeUnitMatcher) ( TypeMatcher) bool {
	// Pointer identity fast path.
	if  ==  {
		return true
	}

	,  := .(*timeUnitMatcher)
	if ! {
		return false
	}
	return .id == .id && .unit == .unit
}

// TimestampTypeUnit returns a TypeMatcher that will match only
// a Timestamp datatype with the specified TimeUnit.
//
// The result is a *timeUnitMatcher whose id is arrow.TIMESTAMP.
func ( arrow.TimeUnit) TypeMatcher {
	return &timeUnitMatcher{arrow.TIMESTAMP, }
}

// Time32TypeUnit returns a TypeMatcher that will match only
// a Time32 datatype with the specified TimeUnit.
//
// The result is a *timeUnitMatcher whose id is arrow.TIME32.
func ( arrow.TimeUnit) TypeMatcher {
	return &timeUnitMatcher{arrow.TIME32, }
}

// Time64TypeUnit returns a TypeMatcher that will match only
// a Time64 datatype with the specified TimeUnit.
//
// The result is a *timeUnitMatcher whose id is arrow.TIME64.
func ( arrow.TimeUnit) TypeMatcher {
	return &timeUnitMatcher{arrow.TIME64, }
}

// DurationTypeUnit returns a TypeMatcher that will match only
// a Duration datatype with the specified TimeUnit.
//
// The result is a *timeUnitMatcher whose id is arrow.DURATION.
func ( arrow.TimeUnit) TypeMatcher {
	return &timeUnitMatcher{arrow.DURATION, }
}

// integerMatcher is a stateless TypeMatcher that accepts any data type
// whose ID satisfies arrow.IsInteger.
type integerMatcher struct{}

// String returns the fixed description "integer".
func (integerMatcher) () string                  { return "integer" }
// Matches reports whether arrow.IsInteger holds for the data type's ID.
func (integerMatcher) ( arrow.DataType) bool { return arrow.IsInteger(.ID()) }
// Equals reports whether the other matcher is an integerMatcher value.
// The matcher has no fields, so the dynamic type alone decides equality.
func (integerMatcher) ( TypeMatcher) bool {
	,  := .(integerMatcher)
	return 
}

// binaryLikeMatcher is a stateless TypeMatcher that accepts any data type
// whose ID satisfies arrow.IsBinaryLike.
type binaryLikeMatcher struct{}

// String returns the fixed description "binary-like".
func (binaryLikeMatcher) () string                  { return "binary-like" }
// Matches reports whether arrow.IsBinaryLike holds for the data type's ID.
func (binaryLikeMatcher) ( arrow.DataType) bool { return arrow.IsBinaryLike(.ID()) }
// Equals reports whether the other matcher is a binaryLikeMatcher value.
// The matcher has no fields, so the dynamic type alone decides equality.
func (binaryLikeMatcher) ( TypeMatcher) bool {
	,  := .(binaryLikeMatcher)
	return 
}

// largeBinaryLikeMatcher is a stateless TypeMatcher that accepts any data
// type whose ID satisfies arrow.IsLargeBinaryLike.
type largeBinaryLikeMatcher struct{}

// String returns the fixed description "large-binary-like".
func (largeBinaryLikeMatcher) () string { return "large-binary-like" }
// Matches reports whether arrow.IsLargeBinaryLike holds for the data
// type's ID.
func (largeBinaryLikeMatcher) ( arrow.DataType) bool {
	return arrow.IsLargeBinaryLike(.ID())
}
// Equals reports whether the other matcher is a largeBinaryLikeMatcher
// value. The matcher has no fields, so the dynamic type alone decides
// equality.
func (largeBinaryLikeMatcher) ( TypeMatcher) bool {
	,  := .(largeBinaryLikeMatcher)
	return 
}

// fsbLikeMatcher is a stateless TypeMatcher that accepts any data type
// whose ID satisfies arrow.IsFixedSizeBinary.
type fsbLikeMatcher struct{}

// String returns the fixed description "fixed-size-binary-like".
func (fsbLikeMatcher) () string                  { return "fixed-size-binary-like" }
// Matches reports whether arrow.IsFixedSizeBinary holds for the data
// type's ID.
func (fsbLikeMatcher) ( arrow.DataType) bool { return arrow.IsFixedSizeBinary(.ID()) }
// Equals reports whether the other matcher is an fsbLikeMatcher value.
// The matcher has no fields, so the dynamic type alone decides equality.
func (fsbLikeMatcher) ( TypeMatcher) bool {
	,  := .(fsbLikeMatcher)
	return 
}

// Integer returns a TypeMatcher which will match any integral type like
// int8 or uint16. The decision is made by arrow.IsInteger on the type ID.
func () TypeMatcher { return integerMatcher{} }

// BinaryLike returns a TypeMatcher that will match Binary or String.
// The decision is made by arrow.IsBinaryLike on the type ID.
func () TypeMatcher { return binaryLikeMatcher{} }

// LargeBinaryLike returns a TypeMatcher which will match LargeBinary or
// LargeString. The decision is made by arrow.IsLargeBinaryLike on the
// type ID.
func () TypeMatcher { return largeBinaryLikeMatcher{} }

// FixedSizeBinaryLike returns a TypeMatcher that will match FixedSizeBinary
// or Decimal128/256. The decision is made by arrow.IsFixedSizeBinary on
// the type ID.
func () TypeMatcher { return fsbLikeMatcher{} }

// primitiveMatcher is a stateless TypeMatcher that accepts any data type
// whose ID satisfies arrow.IsPrimitive.
type primitiveMatcher struct{}

// String returns the fixed description "primitive".
func (primitiveMatcher) () string                  { return "primitive" }
// Matches reports whether arrow.IsPrimitive holds for the data type's ID.
func (primitiveMatcher) ( arrow.DataType) bool { return arrow.IsPrimitive(.ID()) }
// Equals reports whether the other matcher is a primitiveMatcher value.
// The matcher has no fields, so the dynamic type alone decides equality.
func (primitiveMatcher) ( TypeMatcher) bool {
	,  := .(primitiveMatcher)
	return 
}

// Primitive returns a TypeMatcher that will match any type that
// arrow.IsPrimitive returns true for. The returned matcher is a
// primitiveMatcher value and carries no state.
func () TypeMatcher { return primitiveMatcher{} }

// reeMatcher is a TypeMatcher for run-end-encoded types. It delegates to
// two nested matchers, one for the run-ends type and one for the encoded
// (values) type.
type reeMatcher struct {
	// runEndsMatcher is applied to the type returned by RunEnds().
	runEndsMatcher TypeMatcher
	// encodedMatcher is applied to the type returned by Encoded().
	encodedMatcher TypeMatcher
}

// Matches implements TypeMatcher: the data type must have the ID
// arrow.RUN_END_ENCODED and both nested matchers must accept their
// respective component types.
func ( reeMatcher) ( arrow.DataType) bool {
	if .ID() != arrow.RUN_END_ENCODED {
		return false
	}

	// Unchecked assertion; it relies on the ID check above implying the
	// concrete type *arrow.RunEndEncodedType.
	 := .(*arrow.RunEndEncodedType)
	return .runEndsMatcher.Matches(.RunEnds()) && .encodedMatcher.Matches(.Encoded())
}

// Equals implements TypeMatcher: the other matcher must be a reeMatcher
// value whose nested matchers are pairwise equal according to their own
// Equals methods.
func ( reeMatcher) ( TypeMatcher) bool {
	,  := .(reeMatcher)
	if ! {
		return false
	}
	return .runEndsMatcher.Equals(.runEndsMatcher) && .encodedMatcher.Equals(.encodedMatcher)
}

// String implements fmt.Stringer, producing
// "run_end_encoded(run_ends=<runEndsMatcher>, values=<encodedMatcher>)".
func ( reeMatcher) () string {
	return "run_end_encoded(run_ends=" + .runEndsMatcher.String() + ", values=" + .encodedMatcher.String() + ")"
}

// RunEndEncoded returns a matcher which matches a RunEndEncoded type
// whose run-ends type and encoded (values) type are each accepted by the
// corresponding matcher passed in.
//
// The result is a reeMatcher value.
//
// NOTE(review): both parameters have type TypeMatcher; presumably the
// first is the run-ends matcher and the second the encoded matcher —
// confirm, since the argument-to-field mapping is not visible here.
func (,  TypeMatcher) TypeMatcher {
	return reeMatcher{
		runEndsMatcher: ,
		encodedMatcher: }
}

// InputKind is an enum representing the type of Input matching
// that will be done. Either accepting any type, an exact specific type
// or using a TypeMatcher.
type InputKind int8

const (
	// InputAny accepts every data type. It is the zero value.
	InputAny InputKind = iota
	// InputExact accepts only types equal (per arrow.TypeEqual) to the
	// InputType's Type field.
	InputExact
	// InputUseMatcher defers the decision to the InputType's Matcher.
	InputUseMatcher
)

// InputType is used for type checking arguments passed to a kernel
// and stored within a KernelSignature. The type-checking rule can
// be supplied either with an exact DataType instance or a custom
// TypeMatcher.
type InputType struct {
	// Kind selects which of the fields below is consulted.
	Kind    InputKind
	// Type is the data type compared against when Kind is InputExact.
	Type    arrow.DataType
	// Matcher is the TypeMatcher consulted when Kind is InputUseMatcher.
	Matcher TypeMatcher
}

// The three constructors below build an InputType for each non-"any" way
// of matching.

// This constructor takes an arrow.DataType and returns an InputType of
// kind InputExact for it.
func ( arrow.DataType) InputType { return InputType{Kind: InputExact, Type: } }
// NewMatchedInput takes a TypeMatcher and returns an InputType of kind
// InputUseMatcher that defers to it. (Name taken from the call in the
// constructor below.)
func ( TypeMatcher) InputType {
	return InputType{Kind: InputUseMatcher, Matcher: }
}
// This constructor takes an arrow.Type ID and returns a matcher-based
// InputType built from SameTypeID, i.e. one that matches on type ID only.
func ( arrow.Type) InputType { return NewMatchedInput(SameTypeID()) }

// MatchID returns the arrow.Type ID this InputType matches on: the ID of
// Type when Kind is InputExact, or the accepted ID when Kind is
// InputUseMatcher and Matcher is a *sameTypeIDMatcher. (Name taken from
// the assertion message below.)
//
// In every other case (InputAny, or a matcher of another type) it calls
// debug.Assert(false, ...) and then returns -1.
func ( InputType) () arrow.Type {
	switch .Kind {
	case InputExact:
		return .Type.ID()
	case InputUseMatcher:
		if ,  := .Matcher.(*sameTypeIDMatcher);  {
			return .accepted
		}
	}
	// Reached only when no ID could be determined above.
	debug.Assert(false, "MatchID called on non-id matching InputType")
	return -1
}

// String returns a readable description of the input rule: "any" for
// InputAny, the matcher's String() for InputUseMatcher, and the data
// type's String() for InputExact. An unrecognised Kind yields "".
func ( InputType) () string {
	switch .Kind {
	case InputAny:
		return "any"
	case InputUseMatcher:
		return .Matcher.String()
	case InputExact:
		return .Type.String()
	}
	return ""
}

// Equals reports whether two InputTypes describe the same matching rule.
// Identical pointers are equal; otherwise the kinds must agree and then
// InputAny is always equal, InputExact compares the types with
// arrow.TypeEqual, and InputUseMatcher defers to the matcher's Equals.
// An unrecognised Kind compares as not equal.
func ( *InputType) ( *InputType) bool {
	// Pointer identity fast path.
	if  ==  {
		return true
	}

	if .Kind != .Kind {
		return false
	}

	switch .Kind {
	case InputAny:
		return true
	case InputExact:
		return arrow.TypeEqual(.Type, .Type)
	case InputUseMatcher:
		return .Matcher.Equals(.Matcher)
	default:
		return false
	}
}

// Hash returns a hash value for the InputType. It starts from the Sum64
// of an empty maphash.Hash seeded with hashSeed and combines in the Kind
// using HashCombine. For InputExact the hash of Type (arrow.HashType with
// the same seed) is combined in as well.
//
// For the other kinds only Kind contributes, so for example every
// InputUseMatcher input hashes to the same value regardless of Matcher.
func ( InputType) () uint64 {
	var  maphash.Hash

	.SetSeed(hashSeed)
	 := HashCombine(.Sum64(), uint64(.Kind))
	switch .Kind {
	case InputExact:
		 = HashCombine(, arrow.HashType(hashSeed, .Type))
	}
	return 
}

// Matches reports whether the given data type satisfies this InputType:
// InputExact compares with arrow.TypeEqual, InputUseMatcher defers to the
// Matcher, and InputAny accepts everything.
//
// An unrecognised Kind calls debug.Assert(false, ...) and then returns
// true.
func ( InputType) ( arrow.DataType) bool {
	switch .Kind {
	case InputExact:
		return arrow.TypeEqual(.Type, )
	case InputUseMatcher:
		return .Matcher.Matches()
	case InputAny:
		return true
	default:
		debug.Assert(false, "invalid InputKind")
		return true
	}
}

// ResolveKind defines the way that a particular OutputType resolves
// its type. Either it has a fixed type to resolve to or it contains
// a Resolver which will compute the resolved type based on
// the input types.
type ResolveKind int8

const (
	// ResolveFixed means the OutputType's Type field is the result.
	// It is the zero value.
	ResolveFixed ResolveKind = iota
	// ResolveComputed means the OutputType's Resolver is called to
	// produce the result.
	ResolveComputed
)

// TypeResolver is the function signature used to compute an output type:
// given a KernelCtx and the list of input types, it returns the resolved
// type or an error.
type TypeResolver = func(ctx *KernelCtx, inputTypes []arrow.DataType) (arrow.DataType, error)

// OutputType describes how a kernel's output data type is determined:
// either a fixed data type or a TypeResolver that computes it.
type OutputType struct {
	// Kind selects which of the fields below is used.
	Kind     ResolveKind
	// Type is the output type when Kind is ResolveFixed.
	Type     arrow.DataType
	// Resolver computes the output type when Kind is not ResolveFixed.
	Resolver TypeResolver
}

// This constructor takes an arrow.DataType and returns an OutputType of
// kind ResolveFixed that always resolves to it.
func ( arrow.DataType) OutputType {
	return OutputType{Kind: ResolveFixed, Type: }
}

// This constructor takes a TypeResolver and returns an OutputType of kind
// ResolveComputed whose type is produced by calling that resolver.
func ( TypeResolver) OutputType {
	return OutputType{Kind: ResolveComputed, Resolver: }
}

// String returns the fixed type's String() when Kind is ResolveFixed and
// the literal "computed" otherwise.
func ( OutputType) () string {
	if .Kind == ResolveFixed {
		return .Type.String()
	}
	return "computed"
}

// This method resolves the output data type for the given KernelCtx and
// input types. A ResolveFixed OutputType returns its Type with a nil
// error; any other Kind forwards both arguments to Resolver and returns
// its result unchanged.
//
// Resolver is called without a nil check, so a non-fixed OutputType with
// a nil Resolver would panic here.
func ( OutputType) ( *KernelCtx,  []arrow.DataType) (arrow.DataType, error) {
	switch .Kind {
	case ResolveFixed:
		return .Type, nil
	}

	return .Resolver(, )
}

// NullHandling is an enum representing how a particular Kernel
// wants the executor to handle nulls.
type NullHandling int8

const (
	// Compute the output validity bitmap by intersecting the validity
	// bitmaps of the arguments using bitwise-and operations. This means
	// that values in the output are valid/non-null only if the corresponding
	// values in all input arguments were valid/non-null. Kernels generally
	// do not have to touch the bitmap afterwards, but a kernel's exec function
	// is permitted to alter the bitmap after the null intersection is computed
	// if necessary.
	//
	// This is the zero value, and the default used by
	// NewScalarKernelWithSig.
	NullIntersection NullHandling = iota
	// Kernel expects a pre-allocated buffer to write the result bitmap
	// into.
	NullComputedPrealloc
	// Kernel will allocate and set the validity bitmap of the output.
	// This is the default used by NewVectorKernelWithSig.
	NullComputedNoPrealloc
	// Kernel output is never null and a validity bitmap doesn't need to
	// be allocated.
	NullNoOutput
)

// MemAlloc is the preference for preallocating memory of fixed-width
// type outputs during kernel execution.
type MemAlloc int8

const (
	// For data types that support pre-allocation (fixed-width), the
	// kernel expects to be provided a pre-allocated buffer to write into.
	// Non-fixed-width types must always allocate their own buffers.
	// The allocation is made for the same length as the execution batch,
	// so vector kernels yielding differently sized outputs should not
	// use this.
	//
	// It is valid for the data to not be preallocated while the validity
	// bitmap is (or is computed using intersection).
	//
	// For variable-size output types like Binary or String, or for nested
	// types, this option has no effect.
	//
	// This is the zero value, and the default used by
	// NewScalarKernelWithSig.
	MemPrealloc MemAlloc = iota
	// The kernel is responsible for allocating its own data buffer
	// for fixed-width output types. This is the default used by
	// NewVectorKernelWithSig.
	MemNoPrealloc
)

// KernelState is the opaque, kernel-specific state value returned by a
// KernelInitFn and carried in KernelCtx.State. It is declared as any so
// that each kernel can use its own state type.
type KernelState any

// KernelInitArgs are the arguments required to initialize a Kernel's
// state using the input types and any options.
type KernelInitArgs struct {
	// Kernel is the kernel whose state is being initialized.
	Kernel Kernel
	// Inputs are the data types of the inputs.
	Inputs []arrow.DataType
	// Options are opaque and specific to the Kernel being initialized,
	// may be nil if the kernel doesn't require options.
	Options any
}

// KernelInitFn is the signature of a kernel's state initializer: it is
// handed a KernelCtx together with the initialization arguments and
// returns either the initialized state or an error.
type KernelInitFn = func(ctx *KernelCtx, args KernelInitArgs) (KernelState, error)

// KernelSignature holds the input and output types for a kernel.
//
// Variable argument functions with a minimum of N arguments should pass
// up to N input types to be used to validate for invocation. The first
// N-1 types will be matched against the first N-1 arguments and the last
// type will be matched against the remaining arguments.
type KernelSignature struct {
	// InputTypes are the matching rules for the arguments, in order.
	InputTypes []InputType
	// OutType describes how the output type is determined.
	OutType    OutputType
	// IsVarArgs marks the signature as variadic; see the type comment
	// for how InputTypes is then interpreted.
	IsVarArgs  bool

	// store the hashcode after it is computed so we don't
	// need to recompute it. Zero is treated as "not computed yet".
	hashCode uint64
}

// This method renders the signature as text. A non-variadic signature
// reads "(in1, in2) -> out"; a variadic one reads
// "varargs[in1, in2*] -> out". Each input is rendered with
// InputType.String and the output with OutputType.String.
func ( KernelSignature) () string {
	var  strings.Builder
	if .IsVarArgs {
		.WriteString("varargs[")
	} else {
		.WriteByte('(')
	}

	for ,  := range .InputTypes {
		// Separator goes before every element except the first.
		if  != 0 {
			.WriteString(", ")
		}
		.WriteString(.String())
	}
	if .IsVarArgs {
		.WriteString("*]")
	} else {
		.WriteByte(')')
	}

	.WriteString(" -> ")
	.WriteString(.OutType.String())
	return .String()
}

// This method reports whether two signatures accept the same inputs:
// IsVarArgs must agree and the InputTypes slices must have the same
// length with pairwise-equal elements (per InputType.Equals).
//
// OutType is not part of the comparison.
func ( KernelSignature) ( KernelSignature) bool {
	if .IsVarArgs != .IsVarArgs {
		return false
	}

	return slices.EqualFunc(.InputTypes, .InputTypes, func(,  InputType) bool {
		return .Equals(&)
	})
}

// This method returns a hash of the signature's input types and caches it
// in the hashCode field. The hash starts from the Sum64 of an empty
// maphash.Hash seeded with hashSeed and folds in each InputType's Hash
// with HashCombine, in order. OutType and IsVarArgs do not contribute.
//
// Because zero doubles as the "not yet computed" marker, a signature
// whose hash is genuinely zero is recomputed on every call.
//
// NOTE(review): the cache is written without any synchronization visible
// here — confirm whether signatures are hashed concurrently.
func ( *KernelSignature) () uint64 {
	if .hashCode != 0 {
		return .hashCode
	}

	var  maphash.Hash
	.SetSeed(hashSeed)
	 := .Sum64()
	for ,  := range .InputTypes {
		 = HashCombine(, .Hash())
	}
	.hashCode = 
	return 
}

// This method reports whether the given argument types satisfy the
// signature.
//
// Non-variadic: the number of arguments must equal len(InputTypes) and
// each argument must match the InputType at the same position.
//
// Variadic: at least len(InputTypes)-1 arguments are required. Argument i
// is checked against InputTypes[i], except that every argument at or past
// the last InputType is checked against that last InputType.
//
// NOTE(review): assuming Min returns the smaller of its arguments, a
// variadic signature with an empty InputTypes and at least one argument
// would index InputTypes[-1] and panic — confirm such signatures are
// never constructed.
func ( KernelSignature) ( []arrow.DataType) bool {
	switch .IsVarArgs {
	case true:
		// check that it has enough to match at least the non-vararg types
		if len() < (len(.InputTypes) - 1) {
			return false
		}

		for ,  := range  {
			// Clamp the index so trailing arguments reuse the last
			// InputType.
			if !.InputTypes[Min(, len(.InputTypes)-1)].Matches() {
				return false
			}
		}
	case false:
		if len() != len(.InputTypes) {
			return false
		}
		for ,  := range  {
			if !.InputTypes[].Matches() {
				return false
			}
		}
	}
	return true
}

// ArrayKernelExec is an alias for the signature of a kernel's execution
// function.
//
// The same signature serves stateless and stateful kernels. A kernel that
// depends on execution state reads it from the KernelCtx, which also
// carries the context.Context. That context can be used for
// shortcircuiting by checking context.Done / context.Err, which lets a
// kernel control how it handles timeouts or cancellation of computation.
type ArrayKernelExec = func(ctx *KernelCtx, span *ExecSpan, result *ExecResult) error

// kernel holds the members shared by the concrete kernel types; it is
// embedded in ScalarKernel and VectorKernel and supplies their Kernel
// interface methods.
type kernel struct {
	// Init is the state initializer returned by GetInitFn.
	Init           KernelInitFn
	// Signature is the signature returned by GetSig.
	Signature      *KernelSignature
	// Data is kernel-specific data; ScalarKernel passes it to its
	// CleanupFn.
	Data           KernelState
	// Parallelizable is set to true by the kernel constructors in this
	// file. NOTE(review): how the executor uses it is not visible here.
	Parallelizable bool
}

// GetInitFn implements Kernel by returning the Init field.
func ( kernel) () KernelInitFn  { return .Init }
// GetSig implements Kernel by returning the Signature field.
func ( kernel) () *KernelSignature { return .Signature }

// A ScalarKernel is the kernel implementation for a Scalar Function.
// In addition to the members found in the base Kernel, it contains
// the null handling and memory pre-allocation preferences.
type ScalarKernel struct {
	kernel

	// ExecFn is the execution function invoked by Exec.
	ExecFn             ArrayKernelExec
	// CanWriteIntoSlices is the value reported by CanFillSlices.
	CanWriteIntoSlices bool
	// NullHandling is the value reported by GetNullHandling.
	NullHandling       NullHandling
	// MemAlloc is the value reported by GetMemAlloc.
	MemAlloc           MemAlloc
	// CleanupFn is optional; when non-nil, Cleanup calls it with the
	// kernel's Data.
	CleanupFn          func(KernelState) error
}

// NewScalarKernel constructs a new kernel for scalar execution, constructing
// a KernelSignature with the provided input types and output type, and using
// the passed in execution implementation and initialization function.
//
// The signature it builds leaves IsVarArgs false. All other defaults come
// from NewScalarKernelWithSig, to which this function delegates.
func ( []InputType,  OutputType,  ArrayKernelExec,  KernelInitFn) ScalarKernel {
	return NewScalarKernelWithSig(&KernelSignature{
		InputTypes: ,
		OutType:    ,
	}, , )
}

// NewScalarKernelWithSig is a convenience when you already have a signature
// to use for constructing a kernel. It's equivalent to passing the components
// of the signature (input and output types) to NewScalarKernel.
//
// The returned kernel has these defaults: Parallelizable and
// CanWriteIntoSlices are true, NullHandling is NullIntersection and
// MemAlloc is MemPrealloc. CleanupFn and Data are left unset.
func ( *KernelSignature,  ArrayKernelExec,  KernelInitFn) ScalarKernel {
	return ScalarKernel{
		kernel:             kernel{Signature: , Init: , Parallelizable: true},
		ExecFn:             ,
		CanWriteIntoSlices: true,
		NullHandling:       NullIntersection,
		MemAlloc:           MemPrealloc,
	}
}

// Cleanup implements NonAggKernel: it calls CleanupFn with the kernel's
// Data and returns its error, or returns nil when no CleanupFn is set.
func ( *ScalarKernel) () error {
	if .CleanupFn != nil {
		return .CleanupFn(.Data)
	}
	return nil
}

// Exec implements NonAggKernel by forwarding all three arguments to
// ExecFn and returning its error. ExecFn is not checked for nil.
func ( *ScalarKernel) ( *KernelCtx,  *ExecSpan,  *ExecResult) error {
	return .ExecFn(, , )
}

// GetNullHandling implements NonAggKernel by returning the NullHandling
// field.
func ( ScalarKernel) () NullHandling { return .NullHandling }
// GetMemAlloc implements NonAggKernel by returning the MemAlloc field.
func ( ScalarKernel) () MemAlloc         { return .MemAlloc }
// CanFillSlices implements NonAggKernel by returning the
// CanWriteIntoSlices field.
func ( ScalarKernel) () bool           { return .CanWriteIntoSlices }

// ChunkedExec is the optional function signature a stateful vector kernel
// can provide in order to execute against ChunkedArray input.
type ChunkedExec func(ctx *KernelCtx, chunked []*arrow.Chunked, result *ExecResult) ([]*ExecResult, error)

// FinalizeFunc is the signature of an optional finalizer, used for any
// postprocessing that the data may need before it is returned.
type FinalizeFunc func(ctx *KernelCtx, spans []*ArraySpan) ([]*ArraySpan, error)

// VectorKernel is a structure for implementations of vector functions.
// It can optionally contain a finalizer function, the null handling
// and memory pre-allocation preferences (different defaults from
// scalar kernels when using NewVectorKernel), and other execution related
// options.
type VectorKernel struct {
	kernel

	// ExecFn is the execution function invoked by Exec.
	ExecFn              ArrayKernelExec
	// ExecChunked is the optional chunked-input execution function.
	ExecChunked         ChunkedExec
	// Finalize is the optional finalizer.
	Finalize            FinalizeFunc
	// NullHandling is the value reported by GetNullHandling.
	NullHandling        NullHandling
	// MemAlloc is the value reported by GetMemAlloc.
	MemAlloc            MemAlloc
	// CanWriteIntoSlices is the value reported by CanFillSlices.
	CanWriteIntoSlices  bool
	// CanExecuteChunkWise and OutputChunked are both set to true by
	// NewVectorKernelWithSig. NOTE(review): how the executor interprets
	// them is not visible in this file.
	CanExecuteChunkWise bool
	OutputChunked       bool
}

// NewVectorKernel constructs a new kernel for execution of vector functions,
// which take into account more than just the individual scalar values
// of its input. Output of a vector kernel may be a different length
// than its inputs.
//
// It builds a KernelSignature from the input types and output type
// (leaving IsVarArgs false) and delegates to NewVectorKernelWithSig, which
// supplies the remaining defaults.
func ( []InputType,  OutputType,  ArrayKernelExec,  KernelInitFn) VectorKernel {
	return NewVectorKernelWithSig(&KernelSignature{
		InputTypes: , OutType: }, , )
}

// NewVectorKernelWithSig is a convenience function for creating a kernel
// when you already have a signature constructed.
//
// The returned kernel has these defaults: Parallelizable,
// CanWriteIntoSlices, CanExecuteChunkWise and OutputChunked are true,
// NullHandling is NullComputedNoPrealloc and MemAlloc is MemNoPrealloc.
// ExecChunked and Finalize are left nil.
func ( *KernelSignature,  ArrayKernelExec,  KernelInitFn) VectorKernel {
	return VectorKernel{
		kernel:              kernel{Signature: , Init: , Parallelizable: true},
		ExecFn:              ,
		CanWriteIntoSlices:  true,
		CanExecuteChunkWise: true,
		OutputChunked:       true,
		NullHandling:        NullComputedNoPrealloc,
		MemAlloc:            MemNoPrealloc,
	}
}

// Exec implements NonAggKernel by forwarding all three arguments to
// ExecFn and returning its error. ExecFn is not checked for nil.
func ( *VectorKernel) ( *KernelCtx,  *ExecSpan,  *ExecResult) error {
	return .ExecFn(, , )
}

// GetNullHandling implements NonAggKernel by returning the NullHandling
// field.
func ( VectorKernel) () NullHandling { return .NullHandling }
// GetMemAlloc implements NonAggKernel by returning the MemAlloc field.
func ( VectorKernel) () MemAlloc         { return .MemAlloc }
// CanFillSlices implements NonAggKernel by returning the
// CanWriteIntoSlices field.
func ( VectorKernel) () bool           { return .CanWriteIntoSlices }
// Cleanup implements NonAggKernel. Vector kernels have no cleanup hook,
// so it always returns nil.
func ( VectorKernel) () error                { return nil }