// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18

package compute

import (
	"context"
	"errors"
	"fmt"
	"math"
	"runtime"
	"sync"

	// NOTE: the arrow module import paths below are reconstructed; the major
	// version (v17) is an assumption and should match the surrounding module.
	"github.com/apache/arrow/go/v17/arrow"
	"github.com/apache/arrow/go/v17/arrow/array"
	"github.com/apache/arrow/go/v17/arrow/bitutil"
	"github.com/apache/arrow/go/v17/arrow/compute/exec"
	"github.com/apache/arrow/go/v17/arrow/internal"
	"github.com/apache/arrow/go/v17/arrow/internal/debug"
	"github.com/apache/arrow/go/v17/arrow/memory"
	"github.com/apache/arrow/go/v17/arrow/scalar"
)

// ExecCtx holds simple contextual information for execution
// such as the default ChunkSize for batch iteration, whether or not
// to ensure contiguous preallocations for kernels that want preallocation,
// and a reference to the desired function registry to use.
//
// An ExecCtx should be placed into a context.Context by using
// SetExecCtx and GetExecCtx to pass it along for execution.
type ExecCtx struct {
	// ChunkSize is the size used when iterating batches for execution.
	// ChunkSize elements will be operated on at a time unless an argument
	// is a chunked array with a chunk that is smaller.
	ChunkSize int64
	// PreallocContiguous determines whether preallocating memory for
	// execution of compute attempts to preallocate a full contiguous
	// buffer for all of the chunks beforehand.
	PreallocContiguous bool
	// Registry allows specifying the Function Registry to utilize
	// when searching for kernel implementations.
	Registry FunctionRegistry
	// ExecChannelSize is the size of the channel used for passing
	// exec results to the WrapResults function.
	ExecChannelSize int
	// NumParallel determines the number of parallel goroutines
	// allowed for parallel executions.
	NumParallel int
}

type ctxExecKey struct{}

const DefaultMaxChunkSize = math.MaxInt64

var (
	// global default ExecCtx object, initialized with the
	// default max chunk size, contiguous preallocations, and
	// the default function registry.
	defaultExecCtx ExecCtx

	// WithAllocator returns a new context with the provided allocator
	// embedded into the context.
	WithAllocator = exec.WithAllocator
	// GetAllocator retrieves the allocator from the context, or returns
	// memory.DefaultAllocator if there was no allocator in the provided
	// context.
	GetAllocator = exec.GetAllocator
)

// DefaultExecCtx returns the default exec context which will be used
// if there is no ExecCtx set into the context for execution.
//
// This can be called to get a copy of the default values which can
// then be modified to set into a context.
//
// The default exec context uses the following values:
//   - ChunkSize = DefaultMaxChunkSize (MaxInt64)
//   - PreallocContiguous = true
//   - Registry = GetFunctionRegistry()
//   - ExecChannelSize = 10
//   - NumParallel = runtime.NumCPU()
func DefaultExecCtx() ExecCtx { return defaultExecCtx }

func init() {
	defaultExecCtx.ChunkSize = DefaultMaxChunkSize
	defaultExecCtx.PreallocContiguous = true
	defaultExecCtx.Registry = GetFunctionRegistry()
	defaultExecCtx.ExecChannelSize = 10
	// default level of parallelism
	// set to 1 to disable parallelization
	defaultExecCtx.NumParallel = runtime.NumCPU()
}

// SetExecCtx returns a new child context containing the passed in ExecCtx
func SetExecCtx(ctx context.Context, e ExecCtx) context.Context {
	return context.WithValue(ctx, ctxExecKey{}, e)
}

// GetExecCtx returns an embedded ExecCtx from the provided context.
// If it does not contain an ExecCtx, then the default one is returned.
func GetExecCtx(ctx context.Context) ExecCtx {
	e, ok := ctx.Value(ctxExecKey{}).(ExecCtx)
	if ok {
		return e
	}
	return defaultExecCtx
}
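
// The following is an illustrative sketch (not part of the original file) of
// how an ExecCtx is typically threaded through a context before invoking a
// compute function. CallFunction is assumed to be this package's top-level
// entry point; the specific option values are arbitrary examples.
//
//	ectx := compute.DefaultExecCtx()
//	ectx.ChunkSize = 64 * 1024 // operate on 64Ki rows at a time
//	ectx.NumParallel = 1       // disable parallel execution
//	ctx := compute.SetExecCtx(context.Background(), ectx)
//	result, err := compute.CallFunction(ctx, "add", nil, lhs, rhs)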

// ExecBatch is a unit of work for kernel execution. It contains a collection
// of Array and Scalar values.
//
// ExecBatch is semantically similar to a RecordBatch but for a SQL-style
// execution context. It represents a collection of records, but constant
// "columns" are represented by Scalar values rather than having to be
// converted into arrays with repeated values.
type ExecBatch struct {
	Values []Datum
	// Guarantee is a predicate Expression guaranteed to evaluate to true for
	// all rows in this batch.
	// Guarantee Expression
	// Len is the semantic length of this ExecBatch. When the values are
	// all scalars, the length should be set to 1 for non-aggregate kernels.
	// Otherwise the length is taken from the array values. Aggregate kernels
	// can have an ExecBatch formed by projecting just the partition columns
	// from a batch, in which case it would have scalar rows with length > 1.
	//
	// If the array values are of length 0, then the length is 0 regardless of
	// whether any values are Scalar.
	Len int64
}

func (e ExecBatch) NumValues() int { return len(e.Values) }

// simple struct for defining how to preallocate a particular buffer.
type bufferPrealloc struct {
	bitWidth int
	addLen   int
}

func allocateDataBuffer(ctx *exec.KernelCtx, length, bitWidth int) *memory.Buffer {
	switch bitWidth {
	case 1:
		return ctx.AllocateBitmap(int64(length))
	default:
		bufsiz := int(bitutil.BytesForBits(int64(length * bitWidth)))
		return ctx.Allocate(bufsiz)
	}
}

func addComputeDataPrealloc(dt arrow.DataType, widths []bufferPrealloc) []bufferPrealloc {
	if typ, ok := dt.(arrow.FixedWidthDataType); ok {
		return append(widths, bufferPrealloc{bitWidth: typ.BitWidth()})
	}

	switch dt.ID() {
	case arrow.BINARY, arrow.STRING, arrow.LIST, arrow.MAP:
		return append(widths, bufferPrealloc{bitWidth: 32, addLen: 1})
	case arrow.LARGE_BINARY, arrow.LARGE_STRING, arrow.LARGE_LIST:
		return append(widths, bufferPrealloc{bitWidth: 64, addLen: 1})
	case arrow.STRING_VIEW, arrow.BINARY_VIEW:
		return append(widths, bufferPrealloc{bitWidth: arrow.ViewHeaderSizeBytes * 8})
	}
	return widths
}
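
// For illustration (derived directly from the cases above): a fixed-width
// INT64 output adds a single 64-bit data-buffer prealloc, while a STRING
// output adds a 32-bit offsets prealloc with one extra slot (length+1 offsets):
//
//	addComputeDataPrealloc(arrow.PrimitiveTypes.Int64, nil) // [{bitWidth: 64, addLen: 0}]
//	addComputeDataPrealloc(arrow.BinaryTypes.String, nil)   // [{bitWidth: 32, addLen: 1}]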

// enum to define a generalized assumption of the nulls in the inputs
type nullGeneralization int8

const (
	nullGenPerhapsNull nullGeneralization = iota
	nullGenAllValid
	nullGenAllNull
)

func getNullGen(val *exec.ExecValue) nullGeneralization {
	dtID := val.Type().ID()
	switch {
	case dtID == arrow.NULL:
		return nullGenAllNull
	case !internal.DefaultHasValidityBitmap(dtID):
		return nullGenAllValid
	case val.IsScalar():
		if val.Scalar.IsValid() {
			return nullGenAllValid
		}
		return nullGenAllNull
	default:
		arr := val.Array
		// do not count the nulls if they haven't been counted already
		if arr.Nulls == 0 || arr.Buffers[0].Buf == nil {
			return nullGenAllValid
		}

		if arr.Nulls == arr.Len {
			return nullGenAllNull
		}
	}
	return nullGenPerhapsNull
}

func getNullGenDatum(datum Datum) nullGeneralization {
	var val exec.ExecValue
	switch datum.Kind() {
	case KindArray:
		val.Array.SetMembers(datum.(*ArrayDatum).Value)
	case KindScalar:
		val.Scalar = datum.(*ScalarDatum).Value
	case KindChunked:
		return nullGenPerhapsNull
	default:
		debug.Assert(false, "should be array, scalar, or chunked!")
		return nullGenPerhapsNull
	}
	return getNullGen(&val)
}

// populate the validity bitmaps with the intersection of the nullity
// of the arguments. If a preallocated bitmap is not provided, then one
// will be allocated if needed (in some cases a bitmap can be zero-copied
// from the arguments). If any Scalar value is null, then the entire
// validity bitmap will be set to null.
func propagateNulls(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ArraySpan) (err error) {
	if out.Type.ID() == arrow.NULL {
		// null output type is a no-op (rare but it happens)
		return
	}

	// this function is ONLY able to write into output with a non-zero offset
	// when the bitmap is preallocated.
	if out.Offset != 0 && out.Buffers[0].Buf == nil {
		return fmt.Errorf("%w: can only propagate nulls into pre-allocated memory when output offset is non-zero", arrow.ErrInvalid)
	}

	var (
		arrsWithNulls = make([]*exec.ArraySpan, 0, len(batch.Values))
		isAllNull     bool
		prealloc      = out.Buffers[0].Buf != nil
	)

	for i := range batch.Values {
		v := &batch.Values[i]
		nullGen := getNullGen(v)
		if nullGen == nullGenAllNull {
			isAllNull = true
		}
		if nullGen != nullGenAllValid && v.IsArray() {
			arrsWithNulls = append(arrsWithNulls, &v.Array)
		}
	}

	outBitmap := out.Buffers[0].Buf
	if isAllNull {
		// an all-null value gives us a short circuit opportunity:
		// the output should all be null
		out.Nulls = out.Len
		if prealloc {
			bitutil.SetBitsTo(outBitmap, out.Offset, out.Len, false)
			return
		}

		// walk all the values with nulls instead of breaking on the first
		// in case we find a bitmap that can be reused in the non-preallocated case
		for _, arr := range arrsWithNulls {
			if arr.Nulls == arr.Len && arr.Buffers[0].Owner != nil {
				buf := arr.GetBuffer(0)
				buf.Retain()
				out.Buffers[0].Buf = buf.Bytes()
				out.Buffers[0].Owner = buf
				return
			}
		}

		buf := ctx.AllocateBitmap(int64(out.Len))
		out.Buffers[0].Owner = buf
		out.Buffers[0].Buf = buf.Bytes()
		out.Buffers[0].SelfAlloc = true
		bitutil.SetBitsTo(out.Buffers[0].Buf, out.Offset, out.Len, false)
		return
	}

	out.Nulls = array.UnknownNullCount
	switch len(arrsWithNulls) {
	case 0:
		out.Nulls = 0
		if prealloc {
			bitutil.SetBitsTo(outBitmap, out.Offset, out.Len, true)
		}
	case 1:
		arr := arrsWithNulls[0]
		out.Nulls = arr.Nulls
		if prealloc {
			bitutil.CopyBitmap(arr.Buffers[0].Buf, int(arr.Offset), int(arr.Len), outBitmap, int(out.Offset))
			return
		}

		switch {
		case arr.Offset == 0:
			out.Buffers[0] = arr.Buffers[0]
			out.Buffers[0].Owner.Retain()
		case arr.Offset%8 == 0:
			buf := memory.SliceBuffer(arr.GetBuffer(0), int(arr.Offset)/8, int(bitutil.BytesForBits(arr.Len)))
			out.Buffers[0].Buf = buf.Bytes()
			out.Buffers[0].Owner = buf
		default:
			buf := ctx.AllocateBitmap(int64(out.Len))
			out.Buffers[0].Owner = buf
			out.Buffers[0].Buf = buf.Bytes()
			out.Buffers[0].SelfAlloc = true
			bitutil.CopyBitmap(arr.Buffers[0].Buf, int(arr.Offset), int(arr.Len), out.Buffers[0].Buf, 0)
		}
		return

	default:
		if !prealloc {
			buf := ctx.AllocateBitmap(int64(out.Len))
			out.Buffers[0].Owner = buf
			out.Buffers[0].Buf = buf.Bytes()
			out.Buffers[0].SelfAlloc = true
			outBitmap = out.Buffers[0].Buf
		}

		acc := func(left, right *exec.ArraySpan) {
			debug.Assert(left.Buffers[0].Buf != nil, "invalid intersection for null propagation")
			debug.Assert(right.Buffers[0].Buf != nil, "invalid intersection for null propagation")
			bitutil.BitmapAnd(left.Buffers[0].Buf, right.Buffers[0].Buf, left.Offset, right.Offset, outBitmap, out.Offset, out.Len)
		}

		acc(arrsWithNulls[0], arrsWithNulls[1])
		for _, arr := range arrsWithNulls[2:] {
			acc(out, arr)
		}
	}
	return
}

func inferBatchLength(values []Datum) (length int64, allSame bool) {
	length, allSame = -1, true
	areAllScalar := true
	for _, arg := range values {
		switch arg := arg.(type) {
		case *ArrayDatum:
			argLength := arg.Len()
			if length < 0 {
				length = argLength
			} else {
				if length != argLength {
					allSame = false
					return
				}
			}
			areAllScalar = false
		case *ChunkedDatum:
			argLength := arg.Len()
			if length < 0 {
				length = argLength
			} else {
				if length != argLength {
					allSame = false
					return
				}
			}
			areAllScalar = false
		}
	}

	if areAllScalar && len(values) > 0 {
		length = 1
	} else if length < 0 {
		length = 0
	}
	allSame = true
	return
}
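
// Illustrative behavior (follows directly from the logic above; the argument
// names are hypothetical Datum values of the stated kinds and lengths):
//
//	inferBatchLength([]Datum{scalarA, scalarB})       // -> (1, true)   all scalars
//	inferBatchLength([]Datum{scalarA, arrayLen10})    // -> (10, true)  length taken from the array
//	inferBatchLength([]Datum{arrayLen10, arrayLen12}) // -> (10, false) mismatched array lengths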

// KernelExecutor is the interface for all executors to initialize and
// call kernel execution functions on batches.
type KernelExecutor interface {
	// Init must be called *after* the kernel's init method and any
	// KernelState must be set into the KernelCtx *before* calling
	// this Init method. This is to facilitate the case where
	// Init may be expensive and does not need to be called
	// again for each execution of the kernel. For example,
	// the same lookup table can be re-used for all scanned batches
	// in a dataset filter.
	Init(*exec.KernelCtx, exec.KernelInitArgs) error
	// Execute the kernel for the provided batch and pass the resulting
	// Datum values to the provided channel.
	Execute(context.Context, *ExecBatch, chan<- Datum) error
	// WrapResults exists for the case where an executor wants to post-process
	// the batches of result Datums, such as creating a ChunkedArray from
	// multiple output batches. Results from individual batch executions
	// should be read from the out channel, and WrapResults should return
	// the final Datum result.
	WrapResults(ctx context.Context, out <-chan Datum, chunkedArgs bool) Datum
	// CheckResultType checks the actual result type against the resolved
	// output type. If the types don't match, an error is returned.
	CheckResultType(out Datum) error
	// Clear resets the state in the executor so that it can be reused.
	Clear()
}
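
// Illustrative sketch (an assumption about how a caller drives this interface,
// mirroring the package's internal execution flow; the kernel, argument and
// batch setup is elided and those names are hypothetical):
//
//	executor := scalarExecPool.Get().(*scalarExecutor)
//	defer func() {
//		executor.Clear()
//		scalarExecPool.Put(executor)
//	}()
//
//	if err := executor.Init(&kctx, exec.KernelInitArgs{Kernel: kn, Inputs: inTypes, Options: opts}); err != nil {
//		return nil, err
//	}
//
//	ch := make(chan Datum, ectx.ExecChannelSize)
//	go func() {
//		defer close(ch)
//		if err := executor.Execute(ctx, &batch, ch); err != nil {
//			// surface the error to the caller (elided)
//		}
//	}()
//
//	result := executor.WrapResults(ctx, ch, hasChunkedArgs)
//	if err := executor.CheckResultType(result); err != nil {
//		return nil, err
//	}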

// the base implementation for executing non-aggregate kernels.
type nonAggExecImpl struct {
	ctx              *exec.KernelCtx
	ectx             ExecCtx
	kernel           exec.NonAggKernel
	outType          arrow.DataType
	numOutBuf        int
	dataPrealloc     []bufferPrealloc
	preallocValidity bool
}

func (e *nonAggExecImpl) Clear() {
	e.ctx, e.kernel, e.outType = nil, nil, nil
	if e.dataPrealloc != nil {
		e.dataPrealloc = e.dataPrealloc[:0]
	}
}

func (e *nonAggExecImpl) Init(ctx *exec.KernelCtx, args exec.KernelInitArgs) (err error) {
	e.ctx, e.kernel = ctx, args.Kernel.(exec.NonAggKernel)
	e.outType, err = e.kernel.GetSig().OutType.Resolve(ctx, args.Inputs)
	e.ectx = GetExecCtx(ctx.Ctx)
	return
}

func (e *nonAggExecImpl) prepareOutput(length int) *exec.ExecResult {
	var nullCount = array.UnknownNullCount

	if e.kernel.GetNullHandling() == exec.NullNoOutput {
		nullCount = 0
	}

	output := &exec.ArraySpan{
		Type:  e.outType,
		Len:   int64(length),
		Nulls: int64(nullCount),
	}

	if e.preallocValidity {
		buf := e.ctx.AllocateBitmap(int64(length))
		output.Buffers[0].Owner = buf
		output.Buffers[0].Buf = buf.Bytes()
		output.Buffers[0].SelfAlloc = true
	}

	for i, pre := range e.dataPrealloc {
		if pre.bitWidth >= 0 {
			buf := allocateDataBuffer(e.ctx, length+pre.addLen, pre.bitWidth)
			output.Buffers[i+1].Owner = buf
			output.Buffers[i+1].Buf = buf.Bytes()
			output.Buffers[i+1].SelfAlloc = true
		}
	}

	return output
}

func (e *nonAggExecImpl) CheckResultType(out Datum) error {
	typ := out.(ArrayLikeDatum).Type()
	if typ != nil && !arrow.TypeEqual(e.outType, typ) {
		return fmt.Errorf("%w: kernel type result mismatch: declared as %s, actual is %s",
			arrow.ErrType, e.outType, typ)
	}
	return nil
}

type spanIterator func() (exec.ExecSpan, int64, bool)

// NewScalarExecutor returns a KernelExecutor for executing scalar kernels.
func NewScalarExecutor() KernelExecutor { return &scalarExecutor{} }

type scalarExecutor struct {
	nonAggExecImpl

	elideValidityBitmap bool
	preallocAllBufs     bool
	preallocContiguous  bool
	allScalars          bool
	iter                spanIterator
	iterLen             int64
}

func (s *scalarExecutor) Execute(ctx context.Context, batch *ExecBatch, data chan<- Datum) (err error) {
	s.allScalars, s.iter, err = iterateExecSpans(batch, s.ectx.ChunkSize, true)
	if err != nil {
		return
	}

	s.iterLen = batch.Len

	if batch.Len == 0 {
		result := array.MakeArrayOfNull(exec.GetAllocator(s.ctx.Ctx), s.outType, 0)
		defer result.Release()
		out := &exec.ArraySpan{}
		out.SetMembers(result.Data())
		return s.emitResult(out, data)
	}

	if err = s.setupPrealloc(batch.Len, batch.Values); err != nil {
		return
	}

	return s.executeSpans(data)
}

func (s *scalarExecutor) WrapResults(ctx context.Context, out <-chan Datum, hasChunked bool) Datum {
	var (
		output Datum
		acc    []arrow.Array
	)

	toChunked := func() {
		acc = output.(ArrayLikeDatum).Chunks()
		output.Release()
		output = nil
	}

	// get first output
	select {
	case <-ctx.Done():
		return nil
	case output = <-out:
		// if the inputs contained at least one chunked array
		// then we want to return chunked output
		if hasChunked {
			toChunked()
		}
	}

	for {
		select {
		case <-ctx.Done():
			// context is done, either cancelled or a timeout.
			// either way, we end early and return what we've got so far.
			return output
		case o, ok := <-out:
			if !ok { // channel closed, wrap it up
				if output != nil {
					return output
				}

				for _, c := range acc {
					defer c.Release()
				}

				chkd := arrow.NewChunked(s.outType, acc)
				defer chkd.Release()
				return NewDatum(chkd)
			}

			// if we get multiple batches of output, then we need
			// to return it as a chunked array.
			if acc == nil {
				toChunked()
			}

			defer o.Release()
			if o.Len() == 0 { // skip any empty batches
				continue
			}

			acc = append(acc, o.(*ArrayDatum).MakeArray())
		}
	}
}

func (s *scalarExecutor) executeSpans(data chan<- Datum) (err error) {
	defer func() {
		err = errors.Join(err, s.kernel.Cleanup())
	}()

	var (
		input  exec.ExecSpan
		output exec.ExecResult
		next   bool
	)

	if s.preallocContiguous {
		// make one big output alloc
		prealloc := s.prepareOutput(int(s.iterLen))

		prealloc.Offset = 0
		var resultOffset int64
		var nextOffset int64
		for err == nil {
			if input, nextOffset, next = s.iter(); !next {
				break
			}
			prealloc.SetSlice(resultOffset, input.Len)
			err = s.executeSingleSpan(&input, prealloc)
			resultOffset = nextOffset
		}
		if err != nil {
			prealloc.Release()
			return
		}

		if prealloc.Offset != 0 {
			prealloc.SetSlice(0, s.iterLen)
		}

		return s.emitResult(prealloc, data)
	}

	// fully preallocating, but not contiguously
	// we (maybe) preallocate only for the output of processing
	// the current chunk
	for err == nil {
		if input, _, next = s.iter(); !next {
			break
		}

		output = *s.prepareOutput(int(input.Len))
		if err = s.executeSingleSpan(&input, &output); err != nil {
			output.Release()
			return
		}
		err = s.emitResult(&output, data)
	}

	return
}

func (s *scalarExecutor) executeSingleSpan(input *exec.ExecSpan, out *exec.ExecResult) error {
	switch {
	case out.Type.ID() == arrow.NULL:
		out.Nulls = out.Len
	case s.kernel.GetNullHandling() == exec.NullIntersection:
		if !s.elideValidityBitmap {
			propagateNulls(s.ctx, input, out)
		}
	case s.kernel.GetNullHandling() == exec.NullNoOutput:
		out.Nulls = 0
	}
	return s.kernel.Exec(s.ctx, input, out)
}

func (s *scalarExecutor) setupPrealloc(totalLen int64, args []Datum) error {
	s.numOutBuf = len(s.outType.Layout().Buffers)
	outTypeID := s.outType.ID()
	// default to no validity pre-allocation for the following cases:
	// - Output Array is NullArray
	// - kernel.NullHandling is ComputeNoPrealloc or OutputNotNull
	s.preallocValidity = false

	if outTypeID != arrow.NULL {
		switch s.kernel.GetNullHandling() {
		case exec.NullComputedPrealloc:
			s.preallocValidity = true
		case exec.NullIntersection:
			s.elideValidityBitmap = true
			for _, a := range args {
				argAllValid := getNullGenDatum(a) == nullGenAllValid
				s.elideValidityBitmap = s.elideValidityBitmap && argAllValid
			}
			s.preallocValidity = !s.elideValidityBitmap
		case exec.NullNoOutput:
			s.elideValidityBitmap = true
		}
	}

	if s.kernel.GetMemAlloc() == exec.MemPrealloc {
		s.dataPrealloc = addComputeDataPrealloc(s.outType, s.dataPrealloc)
	}

	// validity bitmap either preallocated or elided, and all data buffers allocated
	// this is basically only true for primitive types that are not dict-encoded
	s.preallocAllBufs =
		((s.preallocValidity || s.elideValidityBitmap) && len(s.dataPrealloc) == (s.numOutBuf-1) &&
			!arrow.IsNested(outTypeID) && outTypeID != arrow.DICTIONARY)

	// contiguous prealloc only possible on non-nested types if all
	// buffers are preallocated. otherwise we have to go chunk by chunk
	//
	// some kernels are also unable to write into sliced outputs, so
	// we respect the kernel's attributes
	s.preallocContiguous =
		(s.ectx.PreallocContiguous && s.kernel.CanFillSlices() &&
			s.preallocAllBufs)

	return nil
}

func (s *scalarExecutor) emitResult(resultData *exec.ArraySpan, data chan<- Datum) error {
	var output Datum
	if len(resultData.Buffers[0].Buf) != 0 {
		resultData.UpdateNullCount()
	}
	if s.allScalars {
		// we boxed scalar inputs as ArraySpan so now we have to unbox the output
		arr := resultData.MakeArray()
		defer arr.Release()
		sc, err := scalar.GetScalar(arr, 0)
		if err != nil {
			return err
		}
		if r, ok := sc.(scalar.Releasable); ok {
			defer r.Release()
		}
		output = NewDatum(sc)
	} else {
		d := resultData.MakeData()
		defer d.Release()
		output = NewDatum(d)
	}
	data <- output
	return nil
}

func checkAllIsValue(vals []Datum) error {
	for _, v := range vals {
		if !DatumIsValue(v) {
			return fmt.Errorf("%w: tried executing function with non-value type: %s",
				arrow.ErrInvalid, v)
		}
	}
	return nil
}

func checkIfAllScalar(batch *ExecBatch) bool {
	for _, v := range batch.Values {
		if v.Kind() != KindScalar {
			return false
		}
	}
	return batch.NumValues() > 0
}

// iterateExecSpans sets up and returns a function which can iterate a batch
// according to the chunk sizes. If the inputs contain chunked arrays, then
// we will find the min(chunk sizes, maxChunkSize) to ensure we return
// contiguous spans to execute on.
//
// the iteration function returns the next span to execute on, the current
// position in the full batch, and a boolean indicating whether or not
// a span was actually returned (there is data to process).
func iterateExecSpans(batch *ExecBatch, maxChunkSize int64, promoteIfAllScalar bool) (haveAllScalars bool, itr spanIterator, err error) {
	if batch.NumValues() > 0 {
		inferred, allArgsSame := inferBatchLength(batch.Values)
		if inferred != batch.Len {
			return false, nil, fmt.Errorf("%w: value lengths differed from execbatch length", arrow.ErrInvalid)
		}
		if !allArgsSame {
			return false, nil, fmt.Errorf("%w: array args must all be the same length", arrow.ErrInvalid)
		}
	}

	var (
		args           = batch.Values
		haveChunked    bool
		chunkIdxes     = make([]int, len(args))
		valuePositions = make([]int64, len(args))
		valueOffsets   = make([]int64, len(args))
		pos, length    int64 = 0, batch.Len
	)
	haveAllScalars = checkIfAllScalar(batch)
	maxChunkSize = exec.Min(length, maxChunkSize)

	span := exec.ExecSpan{Values: make([]exec.ExecValue, len(args)), Len: 0}
	for i, a := range args {
		switch arg := a.(type) {
		case *ScalarDatum:
			span.Values[i].Scalar = arg.Value
		case *ArrayDatum:
			span.Values[i].Array.SetMembers(arg.Value)
			valueOffsets[i] = int64(arg.Value.Offset())
		case *ChunkedDatum:
			// populate from first chunk
			carr := arg.Value
			if len(carr.Chunks()) > 0 {
				arr := carr.Chunk(0).Data()
				span.Values[i].Array.SetMembers(arr)
				valueOffsets[i] = int64(arr.Offset())
			} else {
				// fill as zero len
				exec.FillZeroLength(carr.DataType(), &span.Values[i].Array)
			}
			haveChunked = true
		}
	}

	if haveAllScalars && promoteIfAllScalar {
		exec.PromoteExecSpanScalars(span)
	}

	nextChunkSpan := func(iterSz int64, span exec.ExecSpan) int64 {
		for i := 0; i < len(args) && iterSz > 0; i++ {
			// if the argument is not chunked, it's either a scalar or an array
			// in which case it doesn't influence the size of the span
			chunkedArg, ok := args[i].(*ChunkedDatum)
			if !ok {
				continue
			}

			arg := chunkedArg.Value
			if len(arg.Chunks()) == 0 {
				iterSz = 0
				continue
			}

			var curChunk arrow.Array
			for {
				curChunk = arg.Chunk(chunkIdxes[i])
				if valuePositions[i] == int64(curChunk.Len()) {
					// chunk is zero-length, or was exhausted in the previous
					// iteration, move to next chunk
					chunkIdxes[i]++
					curChunk = arg.Chunk(chunkIdxes[i])
					span.Values[i].Array.SetMembers(curChunk.Data())
					valuePositions[i] = 0
					valueOffsets[i] = int64(curChunk.Data().Offset())
					continue
				}
				break
			}
			iterSz = exec.Min(int64(curChunk.Len())-valuePositions[i], iterSz)
		}
		return iterSz
	}

	return haveAllScalars, func() (exec.ExecSpan, int64, bool) {
		if pos == length {
			return exec.ExecSpan{}, pos, false
		}

		iterationSize := exec.Min(length-pos, maxChunkSize)
		if haveChunked {
			iterationSize = nextChunkSpan(iterationSize, span)
		}

		span.Len = iterationSize
		for i, a := range args {
			if a.Kind() != KindScalar {
				span.Values[i].Array.SetSlice(valuePositions[i]+valueOffsets[i], iterationSize)
				valuePositions[i] += iterationSize
			}
		}

		pos += iterationSize
		debug.Assert(pos <= length, "bad state for iteration exec span")
		return span, pos, true
	}, nil
}
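
// Illustrative consumption of the returned spanIterator (this mirrors how
// scalarExecutor.executeSpans uses it above; the surrounding names are
// hypothetical):
//
//	allScalars, next, err := iterateExecSpans(batch, ectx.ChunkSize, true)
//	if err != nil {
//		return err
//	}
//	for {
//		span, pos, ok := next()
//		if !ok {
//			break // no data left to process
//		}
//		// execute the kernel over span (span.Len rows); pos is the position
//		// within the full batch after this span.
//	}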

var (
	// have a pool of scalar executors to avoid excessive object creation
	scalarExecPool = sync.Pool{
		New: func() any { return &scalarExecutor{} },
	}
	vectorExecPool = sync.Pool{
		New: func() any { return &vectorExecutor{} },
	}
)

func checkCanExecuteChunked(k *exec.VectorKernel) error {
	if k.ExecChunked == nil {
		return fmt.Errorf("%w: vector kernel cannot execute chunkwise and no chunked exec function defined", arrow.ErrInvalid)
	}

	if k.NullHandling == exec.NullIntersection {
		return fmt.Errorf("%w: null pre-propagation is unsupported for chunkedarray execution in vector kernels", arrow.ErrInvalid)
	}
	return nil
}

type vectorExecutor struct {
	nonAggExecImpl

	iter    spanIterator
	results []*exec.ArraySpan
	iterLen int64

	allScalars bool
}

func (v *vectorExecutor) Execute(ctx context.Context, batch *ExecBatch, data chan<- Datum) (err error) {
	final := v.kernel.(*exec.VectorKernel).Finalize
	if final != nil {
		if v.results == nil {
			v.results = make([]*exec.ArraySpan, 0, 1)
		} else {
			v.results = v.results[:0]
		}
	}
	// some vector kernels have a separate code path for handling chunked
	// arrays (VectorKernel.ExecChunked) so we check for any chunked
	// arrays. If we have any and an ExecChunked function is defined,
	// then we call that.
	hasChunked := haveChunkedArray(batch.Values)
	v.numOutBuf = len(v.outType.Layout().Buffers)
	v.preallocValidity = v.kernel.GetNullHandling() != exec.NullComputedNoPrealloc &&
		v.kernel.GetNullHandling() != exec.NullNoOutput
	if v.kernel.GetMemAlloc() == exec.MemPrealloc {
		v.dataPrealloc = addComputeDataPrealloc(v.outType, v.dataPrealloc)
	}

	if v.kernel.(*exec.VectorKernel).CanExecuteChunkWise {
		v.allScalars, v.iter, err = iterateExecSpans(batch, v.ectx.ChunkSize, true)
		v.iterLen = batch.Len

		var (
			input exec.ExecSpan
			next  bool
		)
		if v.iterLen == 0 {
			input.Values = make([]exec.ExecValue, batch.NumValues())
			for i, val := range batch.Values {
				exec.FillZeroLength(val.(ArrayLikeDatum).Type(), &input.Values[i].Array)
			}
			err = v.exec(&input, data)
		}
		for err == nil {
			if input, _, next = v.iter(); !next {
				break
			}
			err = v.exec(&input, data)
		}
		if err != nil {
			return
		}
	} else {
		// kernel cannot execute chunkwise. if we have any chunked arrays,
		// then ExecChunked must be defined or we raise an error
		if hasChunked {
			if err = v.execChunked(batch, data); err != nil {
				return
			}
		} else {
			// no chunked arrays. we pack the args into an ExecSpan
			// and call the regular exec code path
			span := ExecSpanFromBatch(batch)
			if checkIfAllScalar(batch) {
				exec.PromoteExecSpanScalars(*span)
			}
			if err = v.exec(span, data); err != nil {
				return
			}
		}
	}

	if final != nil {
		// intermediate results require post-processing after execution is
		// completed (possibly involving some accumulated state)
		output, err := final(v.ctx, v.results)
		if err != nil {
			return err
		}

		for _, r := range output {
			d := r.MakeData()
			defer d.Release()
			data <- NewDatum(d)
		}
	}

	return nil
}

func (v *vectorExecutor) WrapResults(ctx context.Context, out <-chan Datum, hasChunked bool) Datum {
	// if the kernel doesn't output chunked, just grab the one output and return it
	if !v.kernel.(*exec.VectorKernel).OutputChunked {
		var output Datum
		select {
		case <-ctx.Done():
			return nil
		case output = <-out:
		}

		// we got an output datum, but let's wait for the channel to
		// close so we don't have any race conditions
		select {
		case <-ctx.Done():
			output.Release()
			return nil
		case <-out:
			return output
		}
	}

	// if execution yielded multiple chunks then the result is a chunked array
	var (
		output Datum
		acc    []arrow.Array
	)

	toChunked := func() {
		chunks := output.(ArrayLikeDatum).Chunks()
		acc = make([]arrow.Array, 0, len(chunks))
		for _, c := range chunks {
			if c.Len() > 0 {
				acc = append(acc, c)
			}
		}
		if output.Kind() != KindChunked {
			output.Release()
		}
		output = nil
	}

	// get first output
	select {
	case <-ctx.Done():
		return nil
	case output = <-out:
		if output == nil || ctx.Err() != nil {
			return nil
		}

		// if the inputs contained at least one chunked array
		// then we want to return chunked output
		if hasChunked {
			toChunked()
		}
	}

	for {
		select {
		case <-ctx.Done():
			// context is done, either cancelled or a timeout.
			// either way, we end early and return what we've got so far.
			return output
		case o, ok := <-out:
			if !ok { // channel closed, wrap it up
				if output != nil {
					return output
				}

				for _, c := range acc {
					defer c.Release()
				}

				chkd := arrow.NewChunked(v.outType, acc)
				defer chkd.Release()
				return NewDatum(chkd)
			}

			// if we get multiple batches of output, then we need
			// to return it as a chunked array.
			if acc == nil {
				toChunked()
			}

			defer o.Release()
			if o.Len() == 0 { // skip any empty batches
				continue
			}

			acc = append(acc, o.(*ArrayDatum).MakeArray())
		}
	}
}

func (v *vectorExecutor) exec(span *exec.ExecSpan, data chan<- Datum) (err error) {
	out := v.prepareOutput(int(span.Len))
	if v.kernel.GetNullHandling() == exec.NullIntersection {
		if err = propagateNulls(v.ctx, span, out); err != nil {
			return
		}
	}
	if err = v.kernel.Exec(v.ctx, span, out); err != nil {
		return
	}
	return v.emitResult(out, data)
}

func (v *vectorExecutor) emitResult(result *exec.ArraySpan, data chan<- Datum) (err error) {
	if v.kernel.(*exec.VectorKernel).Finalize == nil {
		d := result.MakeData()
		defer d.Release()
		data <- NewDatum(d)
	} else {
		v.results = append(v.results, result)
	}
	return nil
}

func (v *vectorExecutor) execChunked(batch *ExecBatch, out chan<- Datum) error {
	if err := checkCanExecuteChunked(v.kernel.(*exec.VectorKernel)); err != nil {
		return err
	}

	output := v.prepareOutput(int(batch.Len))
	input := make([]*arrow.Chunked, len(batch.Values))
	for i, arg := range batch.Values {
		switch val := arg.(type) {
		case *ArrayDatum:
			chks := val.Chunks()
			input[i] = arrow.NewChunked(val.Type(), chks)
			chks[0].Release()
			defer input[i].Release()
		case *ChunkedDatum:
			input[i] = val.Value
		default:
			return fmt.Errorf("%w: handling with exec chunked", arrow.ErrNotImplemented)
		}
	}
	result, err := v.kernel.(*exec.VectorKernel).ExecChunked(v.ctx, input, output)
	if err != nil {
		return err
	}

	if len(result) == 0 {
		empty := output.MakeArray()
		defer empty.Release()
		out <- &ChunkedDatum{Value: arrow.NewChunked(output.Type, []arrow.Array{})}
		return nil
	}

	for _, r := range result {
		if err := v.emitResult(r, out); err != nil {
			return err
		}
	}
	return nil
}