// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18

package kernels

import (
	"fmt"
	"math"

	// NOTE: the module major version in these import paths is an
	// assumption; adjust it to the arrow-go module line actually in use.
	"github.com/apache/arrow/go/v14/arrow"
	"github.com/apache/arrow/go/v14/arrow/array"
	"github.com/apache/arrow/go/v14/arrow/bitutil"
	"github.com/apache/arrow/go/v14/arrow/compute/exec"
	"github.com/apache/arrow/go/v14/arrow/internal/debug"
	"github.com/apache/arrow/go/v14/arrow/memory"
	"github.com/apache/arrow/go/v14/internal/bitutils"
)

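// NullSelectionBehavior controls how null slots in the filter are
// handled: DropNulls treats a null filter slot as false and drops the
// corresponding value, while EmitNulls emits a null in the output for
// each null filter slot.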
type NullSelectionBehavior int8

const (
	DropNulls NullSelectionBehavior = iota
	EmitNulls
)

type FilterOptions struct {
	NullSelection NullSelectionBehavior `compute:"null_selection_behavior"`
}

func (FilterOptions) TypeName() string { return "FilterOptions" }

type FilterState = FilterOptions

type TakeOptions struct {
	BoundsCheck bool
}

func (TakeOptions) TypeName() string { return "TakeOptions" }

type TakeState = TakeOptions

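// getFilterOutputSize computes how many slots the filtered output will
// contain: with EmitNulls every true-or-null filter slot produces an
// output slot, while with DropNulls only true-and-valid slots do.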
func getFilterOutputSize(filter *exec.ArraySpan, nullSelection NullSelectionBehavior) (outputSize int64) {
	if filter.MayHaveNulls() {
		counter := bitutils.NewBinaryBitBlockCounter(filter.Buffers[1].Buf,
			filter.Buffers[0].Buf, filter.Offset, filter.Offset, filter.Len)

		position := int64(0)
		if nullSelection == EmitNulls {
			for position < filter.Len {
				block := counter.NextOrNotWord()
				outputSize += int64(block.Popcnt)
				position += int64(block.Len)
			}
		} else {
			for position < filter.Len {
				block := counter.NextAndWord()
				outputSize += int64(block.Popcnt)
				position += int64(block.Len)
			}
		}
		return
	}

	// filter has no nulls, so we can just use CountSetBits
	return int64(bitutil.CountSetBits(filter.Buffers[1].Buf, int(filter.Offset), int(filter.Len)))
}

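// preallocateData allocates the output buffers for a fixed-width result:
// optionally a validity bitmap, plus either a bitmap (bitWidth == 1) or
// a data buffer of length*(bitWidth/8) bytes.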
func preallocateData(ctx *exec.KernelCtx, length int64, bitWidth int, allocateValidity bool, out *exec.ExecResult) {
	out.Len = length
	if allocateValidity {
		out.Buffers[0].WrapBuffer(ctx.AllocateBitmap(length))
	}
	if bitWidth == 1 {
		out.Buffers[1].WrapBuffer(ctx.AllocateBitmap(length))
	} else {
		out.Buffers[1].WrapBuffer(ctx.Allocate(int(length) * (bitWidth / 8)))
	}
}

type builder[T any] interface {
	array.Builder
	Append(T)
	UnsafeAppend(T)
	UnsafeAppendBoolToBitmap(bool)
}

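// getTakeIndices converts a boolean filter array into an array of take
// indices of integer type T, so that Take(values, indices) is
// equivalent to Filter(values, filter). A sketch of the mapping for the
// EmitNulls case:
//
//	filter:  [true, null, false, true]
//	indices: [0, null, 3]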
func getTakeIndices[T arrow.IntType | arrow.UintType](mem memory.Allocator, filter *exec.ArraySpan, nullSelection NullSelectionBehavior) arrow.ArrayData {
	var (
		filterData     = filter.Buffers[1].Buf
		filterHasNulls = filter.MayHaveNulls()
		filterIsValid  = filter.Buffers[0].Buf
		dt             = arrow.GetDataType[T]()
	)

	if filterHasNulls && nullSelection == EmitNulls {
		// Most complex case: the filter may have nulls and we don't drop them.
		// The logic is ternary:
		// - filter is null: emit null
		// - filter is valid and true: emit index
		// - filter is valid and false: don't emit anything

		bldr := array.NewBuilder(mem, dt).(builder[T])
		defer bldr.Release()

		// position relative to start of filter
		var position T
		// current position taking the filter offset into account
		positionWithOffset := filter.Offset

		// to count blocks where filterData[i] || !filterIsValid[i]
		filterCounter := bitutils.NewBinaryBitBlockCounter(filterData, filterIsValid, filter.Offset, filter.Offset, filter.Len)
		isValidCounter := bitutils.NewBitBlockCounter(filterIsValid, filter.Offset, filter.Len)
		for int64(position) < filter.Len {
			// true OR NOT valid
			selectedOrNullBlock := filterCounter.NextOrNotWord()
			if selectedOrNullBlock.NoneSet() {
				position += T(selectedOrNullBlock.Len)
				positionWithOffset += int64(selectedOrNullBlock.Len)
				continue
			}
			bldr.Reserve(int(selectedOrNullBlock.Popcnt))

			// if the values are all valid and the selectedOrNullBlock
			// is full, then we can infer that all the values are true
			// and skip the bit checking
			isValidBlock := isValidCounter.NextWord()
			if selectedOrNullBlock.AllSet() && isValidBlock.AllSet() {
				// all the values are selected and non-null
				for i := 0; i < int(selectedOrNullBlock.Len); i++ {
					bldr.UnsafeAppend(position)
					position++
				}
				positionWithOffset += int64(selectedOrNullBlock.Len)
			} else {
				// some of the values are false or null
				for i := 0; i < int(selectedOrNullBlock.Len); i++ {
					if bitutil.BitIsSet(filterIsValid, int(positionWithOffset)) {
						if bitutil.BitIsSet(filterData, int(positionWithOffset)) {
							bldr.UnsafeAppend(position)
						}
					} else {
						// null slot, append null
						bldr.UnsafeAppendBoolToBitmap(false)
					}
					position++
					positionWithOffset++
				}
			}
		}

		arr := bldr.NewArray()
		defer arr.Release()
		arr.Data().Retain()
		return arr.Data()
	}

	bldr := newBufferBuilder[T](mem)
	if filterHasNulls {
		// the filter may have nulls, so we scan the validity bitmap
		// and the filter data bitmap together
		debug.Assert(nullSelection == DropNulls, "incorrect nullselect logic")

		// position relative to start of the filter
		var position T
		// current position taking the filter offset into account
		positionWithOffset := filter.Offset

		filterCounter := bitutils.NewBinaryBitBlockCounter(filterData, filterIsValid, filter.Offset, filter.Offset, filter.Len)
		for int64(position) < filter.Len {
			andBlock := filterCounter.NextAndWord()
			bldr.reserve(int(andBlock.Popcnt))
			if andBlock.AllSet() {
				// all the values are selected and non-null
				for i := 0; i < int(andBlock.Len); i++ {
					bldr.unsafeAppend(position)
					position++
				}
				positionWithOffset += int64(andBlock.Len)
			} else if !andBlock.NoneSet() {
				// some values are false or null
				for i := 0; i < int(andBlock.Len); i++ {
					if bitutil.BitIsSet(filterIsValid, int(positionWithOffset)) && bitutil.BitIsSet(filterData, int(positionWithOffset)) {
						bldr.unsafeAppend(position)
					}
					position++
					positionWithOffset++
				}
			} else {
				position += T(andBlock.Len)
				positionWithOffset += int64(andBlock.Len)
			}
		}
	} else {
		// filter has no nulls, so we only need to look for true values
		bitutils.VisitSetBitRuns(filterData, filter.Offset, filter.Len,
			func(pos, length int64) error {
				// append consecutive run of indices
				bldr.reserve(int(length))
				for i := int64(0); i < length; i++ {
					bldr.unsafeAppend(T(pos + i))
				}
				return nil
			})
	}

	length := bldr.len()
	outBuf := bldr.finish()
	defer outBuf.Release()
	return array.NewData(dt, length, []*memory.Buffer{nil, outBuf}, nil, 0, 0)
}

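// GetTakeIndices generates an array of take indices from a boolean
// filter, using an index width wide enough to address the filter
// length: uint16 for lengths below MaxUint16, uint32 below MaxUint32.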
func GetTakeIndices(mem memory.Allocator, filter *exec.ArraySpan, nullSelection NullSelectionBehavior) (arrow.ArrayData, error) {
	debug.Assert(filter.Type.ID() == arrow.BOOL, "filter should be a boolean array")
	if filter.Len < math.MaxUint16 {
		return getTakeIndices[uint16](mem, filter, nullSelection), nil
	} else if filter.Len < math.MaxUint32 {
		return getTakeIndices[uint32](mem, filter, nullSelection), nil
	}
	return nil, fmt.Errorf("%w: filter length exceeds UINT32_MAX, consider a different strategy for selecting elements",
		arrow.ErrNotImplemented)
}

type writeFiltered interface {
	OutPos() int
	WriteValue(int64)
	WriteValueSegment(int64, int64)
	WriteNull()
}

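// dropNullCounter scans the filter in blocks of slots that are true AND
// valid; when the filter has no validity bitmap it degenerates to a
// plain bit block counter over the filter data.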
type dropNullCounter struct {
	dataCounter         bitutils.BitBlockCounter
	dataValidityCounter bitutils.BinaryBitBlockCounter
	hasValidity         bool
}

func newDropNullCounter(validity, data []byte, offset, length int64) *dropNullCounter {
	return &dropNullCounter{
		dataCounter:         *bitutils.NewBitBlockCounter(data, offset, length),
		dataValidityCounter: *bitutils.NewBinaryBitBlockCounter(data, validity, offset, offset, length),
		hasValidity:         len(validity) > 0,
	}
}

func (d *dropNullCounter) NextBlock() bitutils.BitBlockCount {
	if d.hasValidity {
		// filter is true AND not null
		return d.dataValidityCounter.NextAndWord()
	}
	return d.dataCounter.NextWord()
}

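// primitiveFilterImpl writes the filtered values through the provided
// writeFiltered implementation, scanning the filter and the values
// validity bitmap in 64-bit blocks so the common all-set/none-set cases
// stay on fast batch paths.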
func primitiveFilterImpl(wr writeFiltered, values, filter *exec.ArraySpan, nullSelection NullSelectionBehavior, out *exec.ExecResult) {
	var (
		valuesIsValid = values.Buffers[0].Buf
		outIsValid    = out.Buffers[0].Buf
		filterData    = filter.Buffers[1].Buf
		filterIsValid = filter.Buffers[0].Buf
	)

	if values.Nulls == 0 && filter.Nulls == 0 {
		// fast filter path when values and filters have no nulls
		bitutils.VisitSetBitRuns(filterData, filter.Offset, filter.Len,
			func(pos, length int64) error {
				wr.WriteValueSegment(pos, length)
				return nil
			})
		return
	}

	var (
		dropNullCtr        = newDropNullCounter(filterIsValid, filterData, filter.Offset, filter.Len)
		filterValidCounter = bitutils.NewOptionalBitBlockCounter(filterIsValid, filter.Offset, filter.Len)
		dataCounter        = bitutils.NewOptionalBitBlockCounter(valuesIsValid, values.Offset, values.Len)
		appendNotNull      = func(i int64) {
			bitutil.SetBit(outIsValid, int(out.Offset)+wr.OutPos())
			wr.WriteValue(i)
		}
		appendMaybeNull = func(i int64) {
			bitutil.SetBitTo(outIsValid, int(out.Offset)+wr.OutPos(),
				bitutil.BitIsSet(valuesIsValid, int(values.Offset+i)))
			wr.WriteValue(i)
		}
		inPos int64
	)

	for inPos < values.Len {
		filterBlock := dropNullCtr.NextBlock()
		filterValidBlock := filterValidCounter.NextWord()
		dataBlock := dataCounter.NextWord()

		switch {
		case filterBlock.AllSet() && dataBlock.AllSet():
			// faster path: all values in block are included and not null
			bitutil.SetBitsTo(outIsValid, out.Offset+int64(wr.OutPos()), int64(filterBlock.Len), true)
			wr.WriteValueSegment(inPos, int64(filterBlock.Len))
			inPos += int64(filterBlock.Len)
		case filterBlock.AllSet():
			// faster: all values are selected, but some are null
			// batch copy bits from values validity bitmap to output validity bitmap
			bitutil.CopyBitmap(valuesIsValid, int(values.Offset+inPos), int(filterBlock.Len),
				outIsValid, int(out.Offset)+wr.OutPos())
			wr.WriteValueSegment(inPos, int64(filterBlock.Len))
			inPos += int64(filterBlock.Len)
		case filterBlock.NoneSet() && nullSelection == DropNulls:
			// for this exceedingly common case in low-selectivity filters
			// we can skip further analysis of the data and move onto the next block
			inPos += int64(filterBlock.Len)
		default:
			// some filter values are false or null
			if dataBlock.AllSet() {
				// no values are null
				if filterValidBlock.AllSet() {
					// filter is non-null but some values are false
					for i := 0; i < int(filterBlock.Len); i++ {
						if bitutil.BitIsSet(filterData, int(filter.Offset+inPos)) {
							appendNotNull(inPos)
						}
						inPos++
					}
				} else if nullSelection == DropNulls {
					// if any values are selected, they are not null
					for i := 0; i < int(filterBlock.Len); i++ {
						if bitutil.BitIsSet(filterIsValid, int(filter.Offset+inPos)) &&
							bitutil.BitIsSet(filterData, int(filter.Offset+inPos)) {
							appendNotNull(inPos)
						}
						inPos++
					}
				} else { // nullSelection == EmitNulls
					// data values in this block are not null
					for i := 0; i < int(filterBlock.Len); i++ {
						isValid := bitutil.BitIsSet(filterIsValid, int(filter.Offset+inPos))
						if isValid && bitutil.BitIsSet(filterData, int(filter.Offset+inPos)) {
							// filter slot is non-null and set
							appendNotNull(inPos)
						} else if !isValid {
							// filter slot is null, so we have a null in the output
							bitutil.ClearBit(outIsValid, int(out.Offset)+wr.OutPos())
							wr.WriteNull()
						}
						inPos++
					}
				}
			} else { // !dataBlock.AllSet()
				// some values are null
				if filterValidBlock.AllSet() {
					// filter is non-null but some values are false
					for i := 0; i < int(filterBlock.Len); i++ {
						if bitutil.BitIsSet(filterData, int(filter.Offset+inPos)) {
							appendMaybeNull(inPos)
						}
						inPos++
					}
				} else if nullSelection == DropNulls {
					// if any values are selected they are not null
					for i := 0; i < int(filterBlock.Len); i++ {
						if bitutil.BitIsSet(filterIsValid, int(filter.Offset+inPos)) && bitutil.BitIsSet(filterData, int(filter.Offset+inPos)) {
							appendMaybeNull(inPos)
						}
						inPos++
					}
				} else { // nullSelection == EmitNulls
					// some data values in this block may be null
					for i := 0; i < int(filterBlock.Len); i++ {
						isValid := bitutil.BitIsSet(filterIsValid, int(filter.Offset+inPos))
						if isValid && bitutil.BitIsSet(filterData, int(filter.Offset+inPos)) {
							// filter slot is non-null and set
							appendMaybeNull(inPos)
						} else if !isValid {
							// filter slot is null, so we have a null in the output
							bitutil.ClearBit(outIsValid, int(out.Offset)+wr.OutPos())
							wr.WriteNull()
						}
						inPos++
					}
				}
			}
		}
	}
}

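// filterWriter implements writeFiltered for fixed-width values of type
// T; boolFilterWriter below is the equivalent for bit-packed booleans.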
type filterWriter[T arrow.UintType] struct {
	outPosition  int
	outOffset    int
	valuesOffset int
	valuesData   []T
	outData      []T
}

func (f *filterWriter[T]) OutPos() int { return f.outPosition }

func (f *filterWriter[T]) WriteValue(inPos int64) {
	f.outData[f.outPosition] = f.valuesData[inPos]
	f.outPosition++
}

func (f *filterWriter[T]) WriteValueSegment(inStart, length int64) {
	copy(f.outData[f.outPosition:], f.valuesData[inStart:inStart+length])
	f.outPosition += int(length)
}

func (f *filterWriter[T]) WriteNull() {
	var zero T
	f.outData[f.outPosition] = zero
	f.outPosition++
}

type boolFilterWriter struct {
	outPosition  int
	outOffset    int
	valuesOffset int
	valuesData   []byte
	outData      []byte
}

func (b *boolFilterWriter) OutPos() int { return b.outPosition }

func (b *boolFilterWriter) WriteValue(inPos int64) {
	bitutil.SetBitTo(b.outData, b.outOffset+b.outPosition,
		bitutil.BitIsSet(b.valuesData, b.valuesOffset+int(inPos)))
	// advance the output position so subsequent writes don't overwrite this bit
	b.outPosition++
}

func (b *boolFilterWriter) WriteValueSegment(inStart, length int64) {
	bitutil.CopyBitmap(b.valuesData, b.valuesOffset+int(inStart), int(length),
		b.outData, b.outOffset+b.outPosition)
	b.outPosition += int(length)
}

func (b *boolFilterWriter) WriteNull() {
	bitutil.ClearBit(b.outData, b.outOffset+b.outPosition)
	b.outPosition++
}

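// PrimitiveFilter is the ArrayKernelExec for filtering fixed-width
// primitive arrays: batch.Values[0] holds the values and
// batch.Values[1] holds the boolean filter.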
func PrimitiveFilter(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
	var (
		values        = &batch.Values[0].Array
		filter        = &batch.Values[1].Array
		nullSelection = ctx.State.(FilterState).NullSelection
	)

	values.UpdateNullCount()
	filter.UpdateNullCount()

	outputLength := getFilterOutputSize(filter, nullSelection)

	// the output precomputed null count is unknown except in the narrow
	// condition that all the values are non-null and the filter will not
	// cause any new nulls to be created
	if values.Nulls == 0 && (nullSelection == DropNulls || filter.Nulls == 0) {
		out.Nulls = 0
	} else {
		out.Nulls = array.UnknownNullCount
	}

	// when neither the values nor filter is known to have any nulls,
	// we will elect the optimized ExecNonNull path where there is no
	// need to populate a validity bitmap.
	allocateValidity := values.Nulls != 0 || filter.Nulls != 0
	bitWidth := values.Type.(arrow.FixedWidthDataType).BitWidth()
	preallocateData(ctx, outputLength, bitWidth, allocateValidity, out)

	var wr writeFiltered
	switch bitWidth {
	case 1:
		wr = &boolFilterWriter{
			outOffset:    int(out.Offset),
			valuesOffset: int(values.Offset),
			outData:      out.Buffers[1].Buf,
			valuesData:   values.Buffers[1].Buf,
		}
	case 8:
		wr = &filterWriter[uint8]{
			outOffset:    int(out.Offset),
			valuesOffset: int(values.Offset),
			outData:      exec.GetSpanValues[uint8](out, 1),
			valuesData:   exec.GetSpanValues[uint8](values, 1),
		}
	case 16:
		wr = &filterWriter[uint16]{
			outOffset:    int(out.Offset),
			valuesOffset: int(values.Offset),
			outData:      exec.GetSpanValues[uint16](out, 1),
			valuesData:   exec.GetSpanValues[uint16](values, 1),
		}
	case 32:
		wr = &filterWriter[uint32]{
			outOffset:    int(out.Offset),
			valuesOffset: int(values.Offset),
			outData:      exec.GetSpanValues[uint32](out, 1),
			valuesData:   exec.GetSpanValues[uint32](values, 1),
		}
	case 64:
		wr = &filterWriter[uint64]{
			outOffset:    int(out.Offset),
			valuesOffset: int(values.Offset),
			outData:      exec.GetSpanValues[uint64](out, 1),
			valuesData:   exec.GetSpanValues[uint64](values, 1),
		}
	default:
		return fmt.Errorf("%w: invalid values bit width", arrow.ErrType)
	}

	primitiveFilterImpl(wr, values, filter, nullSelection, out)
	return nil
}

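// primitiveGetter abstracts random access to the values (contiguous or
// chunked) so the take implementations below can be shared between the
// single-array and chunked-array paths.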
type primitiveGetter[T arrow.IntType | bool] interface {
	IsValid(int64) bool
	GetValue(int64) T
	NullCount() int64
	Len() int64
}

type boolGetter struct {
	inner  *exec.ArraySpan
	values []byte
}

func (b *boolGetter) IsValid(i int64) bool {
	return bitutil.BitIsSet(b.inner.Buffers[0].Buf, int(b.inner.Offset+i))
}

func (b *boolGetter) GetValue(i int64) bool {
	return bitutil.BitIsSet(b.values, int(b.inner.Offset+i))
}

func (b *boolGetter) NullCount() int64 { return b.inner.Nulls }
func (b *boolGetter) Len() int64       { return b.inner.Len }

type primitiveGetterImpl[T arrow.IntType] struct {
	inner  *exec.ArraySpan
	values []T
}

func (p *primitiveGetterImpl[T]) IsValid(i int64) bool {
	return bitutil.BitIsSet(p.inner.Buffers[0].Buf, int(p.inner.Offset+i))
}
func (p *primitiveGetterImpl[T]) GetValue(i int64) T { return p.values[i] }
func (p *primitiveGetterImpl[T]) NullCount() int64   { return p.inner.Nulls }
func (p *primitiveGetterImpl[T]) Len() int64         { return p.inner.Len }

type chunkedBoolGetter struct {
	inner         *arrow.Chunked
	resolver      *exec.ChunkResolver
	nulls         int64
	len           int64
	chunkLengths  []int64
	valuesData    [][]byte
	valuesIsValid [][]byte
	valuesOffset  []int64
}

func newChunkedBoolGetter(values *arrow.Chunked) *chunkedBoolGetter {
	nchunks := len(values.Chunks())
	chunkLengths := make([]int64, nchunks)
	valuesData := make([][]byte, nchunks)
	valuesIsValid := make([][]byte, nchunks)
	valuesOffset := make([]int64, nchunks)

	for i, c := range values.Chunks() {
		chunkLengths[i] = int64(c.Len())
		valuesOffset[i] = int64(c.Data().Offset())
		valuesIsValid[i] = c.NullBitmapBytes()
		valuesData[i] = c.Data().Buffers()[1].Bytes()
	}

	return &chunkedBoolGetter{
		inner:         values,
		resolver:      exec.NewChunkResolver(values.Chunks()),
		nulls:         int64(values.NullN()),
		len:           int64(values.Len()),
		chunkLengths:  chunkLengths,
		valuesData:    valuesData,
		valuesIsValid: valuesIsValid,
		valuesOffset:  valuesOffset,
	}
}

func (c *chunkedBoolGetter) IsValid(i int64) bool {
	chunk, idx := c.resolver.Resolve(i)
	bitmap := c.valuesIsValid[chunk]
	if bitmap == nil {
		return true
	}
	return bitutil.BitIsSet(bitmap, int(c.valuesOffset[chunk]+idx))
}

func (c *chunkedBoolGetter) GetValue(i int64) bool {
	chunk, idx := c.resolver.Resolve(i)
	return bitutil.BitIsSet(c.valuesData[chunk], int(c.valuesOffset[chunk]+idx))
}

func (c *chunkedBoolGetter) NullCount() int64 { return c.nulls }
func (c *chunkedBoolGetter) Len() int64       { return c.len }

type chunkedPrimitiveGetter[T arrow.IntType] struct {
	inner         *arrow.Chunked
	resolver      *exec.ChunkResolver
	nulls         int64
	len           int64
	chunkLengths  []int64
	valuesData    [][]T
	valuesIsValid [][]byte
	valuesOffset  []int64
}

func newChunkedPrimitiveGetter[T arrow.IntType](values *arrow.Chunked) *chunkedPrimitiveGetter[T] {
	nchunks := len(values.Chunks())
	chunkLengths := make([]int64, nchunks)
	valuesData := make([][]T, nchunks)
	valuesIsValid := make([][]byte, nchunks)
	valuesOffset := make([]int64, nchunks)

	for i, c := range values.Chunks() {
		chunkLengths[i] = int64(c.Len())
		valuesOffset[i] = int64(c.Data().Offset())
		valuesIsValid[i] = c.NullBitmapBytes()
		valuesData[i] = arrow.GetValues[T](c.Data(), 1)
	}

	return &chunkedPrimitiveGetter[T]{
		inner:         values,
		resolver:      exec.NewChunkResolver(values.Chunks()),
		nulls:         int64(values.NullN()),
		len:           int64(values.Len()),
		chunkLengths:  chunkLengths,
		valuesData:    valuesData,
		valuesIsValid: valuesIsValid,
		valuesOffset:  valuesOffset,
	}
}

func (c *chunkedPrimitiveGetter[T]) IsValid(i int64) bool {
	chunk, idx := c.resolver.Resolve(i)
	bitmap := c.valuesIsValid[chunk]
	if bitmap == nil {
		return true
	}
	return bitutil.BitIsSet(bitmap, int(c.valuesOffset[chunk]+idx))
}

func (c *chunkedPrimitiveGetter[T]) GetValue(i int64) T {
	chunk, idx := c.resolver.Resolve(i)
	return c.valuesData[chunk][idx]
}

func (c *chunkedPrimitiveGetter[T]) NullCount() int64 { return c.nulls }
func (c *chunkedPrimitiveGetter[T]) Len() int64       { return c.len }

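// primitiveTakeImpl gathers values[indices[i]] into the output, scanning
// the indices validity bitmap in blocks so that the all-valid cases
// avoid per-element null checks. IdxT is the physical index type and
// ValT the physical value type.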
func primitiveTakeImpl[IdxT arrow.UintType, ValT arrow.IntType](values primitiveGetter[ValT], indices *exec.ArraySpan, out *exec.ExecResult) {
	var (
		indicesData    = exec.GetSpanValues[IdxT](indices, 1)
		indicesIsValid = indices.Buffers[0].Buf
		indicesOffset  = indices.Offset

		outData    = exec.GetSpanValues[ValT](out, 1)
		outIsValid = out.Buffers[0].Buf
		outOffset  = out.Offset
	)

	pos, validCount := int64(0), int64(0)
	if values.NullCount() == 0 && indices.Nulls == 0 {
		// values and indices are both never null
		// this means we didn't allocate the validity bitmap
		// and can simplify everything
		for i, idx := range indicesData {
			outData[i] = values.GetValue(int64(idx))
		}
		out.Nulls = 0
		return
	}

	bitCounter := bitutils.NewOptionalBitBlockCounter(indicesIsValid, indicesOffset, indices.Len)
	for pos < indices.Len {
		block := bitCounter.NextBlock()
		if values.NullCount() == 0 {
			// values are never null, so things are easier
			validCount += int64(block.Popcnt)
			if block.AllSet() {
				// fastest path: neither values nor index nulls
				bitutil.SetBitsTo(outIsValid, outOffset+pos, int64(block.Len), true)
				for i := 0; i < int(block.Len); i++ {
					outData[pos] = values.GetValue(int64(indicesData[pos]))
					pos++
				}
			} else if block.Popcnt > 0 {
				// slow path: some indices but not all are null
				for i := 0; i < int(block.Len); i++ {
					if bitutil.BitIsSet(indicesIsValid, int(indicesOffset+pos)) {
						// index is not null
						bitutil.SetBit(outIsValid, int(outOffset+pos))
						outData[pos] = values.GetValue(int64(indicesData[pos]))
					}
					pos++
				}
			} else {
				pos += int64(block.Len)
			}
		} else {
			// values have nulls, so we must do random access into the values bitmap
			if block.AllSet() {
				// faster path: indices are not null but values may be
				for i := 0; i < int(block.Len); i++ {
					if values.IsValid(int64(indicesData[pos])) {
						// value is not null
						outData[pos] = values.GetValue(int64(indicesData[pos]))
						bitutil.SetBit(outIsValid, int(outOffset+pos))
						validCount++
					}
					pos++
				}
			} else if block.Popcnt > 0 {
				// slow path: some but not all indices are null. since we
				// are doing random access in general we have to check the
				// value nullness one by one
				for i := 0; i < int(block.Len); i++ {
					if bitutil.BitIsSet(indicesIsValid, int(indicesOffset+pos)) &&
						values.IsValid(int64(indicesData[pos])) {
						// index is not null && value is not null
						outData[pos] = values.GetValue(int64(indicesData[pos]))
						bitutil.SetBit(outIsValid, int(outOffset+pos))
						validCount++
					}
					pos++
				}
			} else {
				pos += int64(block.Len)
			}
		}
	}

	out.Nulls = out.Len - validCount
}

func booleanTakeImpl[IdxT arrow.UintType](values primitiveGetter[bool], indices *exec.ArraySpan, out *exec.ExecResult) {
	var (
		indicesData    = exec.GetSpanValues[IdxT](indices, 1)
		indicesIsValid = indices.Buffers[0].Buf
		indicesOffset  = indices.Offset

		outData    = out.Buffers[1].Buf
		outIsValid = out.Buffers[0].Buf
		outOffset  = out.Offset
	)

	placeDataBit := func(loc int64, index IdxT) {
		bitutil.SetBitTo(outData, int(outOffset+loc), values.GetValue(int64(index)))
	}

	pos, validCount := int64(0), int64(0)
	if values.NullCount() == 0 && indices.Nulls == 0 {
		// values and indices are both never null
		// this means we didn't allocate the validity bitmap
		// and can simplify everything
		for i, idx := range indicesData {
			placeDataBit(int64(i), idx)
		}
		out.Nulls = 0
		return
	}

	bitCounter := bitutils.NewOptionalBitBlockCounter(indicesIsValid, indicesOffset, indices.Len)
	for pos < indices.Len {
		block := bitCounter.NextBlock()
		if values.NullCount() == 0 {
			// values are never null so things are easier
			validCount += int64(block.Popcnt)
			if block.AllSet() {
				// fastest path: neither values nor index nulls
				bitutil.SetBitsTo(outIsValid, outOffset+pos, int64(block.Len), true)
				for i := 0; i < int(block.Len); i++ {
					placeDataBit(pos, indicesData[pos])
					pos++
				}
			} else if block.Popcnt > 0 {
				// slow path: some but not all indices are null
				for i := 0; i < int(block.Len); i++ {
					if bitutil.BitIsSet(indicesIsValid, int(indicesOffset+pos)) {
						// index is not null
						bitutil.SetBit(outIsValid, int(outOffset+pos))
						placeDataBit(pos, indicesData[pos])
					}
					pos++
				}
			} else {
				pos += int64(block.Len)
			}
		} else {
			// values have nulls so we must do random access into the values bitmap
			if block.AllSet() {
				// faster path: indices are not null but values may be
				for i := 0; i < int(block.Len); i++ {
					if values.IsValid(int64(indicesData[pos])) {
						// value is not null
						bitutil.SetBit(outIsValid, int(outOffset+pos))
						placeDataBit(pos, indicesData[pos])
						validCount++
					}
					pos++
				}
			} else if block.Popcnt > 0 {
				// slow path: some but not all indices are null.
				// we have to check the values one by one
				for i := 0; i < int(block.Len); i++ {
					if bitutil.BitIsSet(indicesIsValid, int(indicesOffset+pos)) &&
						values.IsValid(int64(indicesData[pos])) {
						placeDataBit(pos, indicesData[pos])
						bitutil.SetBit(outIsValid, int(outOffset+pos))
						validCount++
					}
					pos++
				}
			} else {
				pos += int64(block.Len)
			}
		}
	}
	out.Nulls = out.Len - validCount
}

func booleanTakeDispatchChunked(values, indices *arrow.Chunked, out []*exec.ExecResult) error {
	getter := newChunkedBoolGetter(values)
	var takeImpl func(primitiveGetter[bool], *exec.ArraySpan, *exec.ExecResult)

	switch indices.DataType().(arrow.FixedWidthDataType).Bytes() {
	case 1:
		takeImpl = booleanTakeImpl[uint8]
	case 2:
		takeImpl = booleanTakeImpl[uint16]
	case 4:
		takeImpl = booleanTakeImpl[uint32]
	case 8:
		takeImpl = booleanTakeImpl[uint64]
	default:
		return fmt.Errorf("%w: invalid indices byte width", arrow.ErrIndex)
	}

	var span exec.ArraySpan
	for i, chunk := range indices.Chunks() {
		span.SetMembers(chunk.Data())
		takeImpl(getter, &span, out[i])
	}
	return nil
}

func booleanTakeDispatch(values, indices *exec.ArraySpan, out *exec.ExecResult) error {
	getter := &boolGetter{inner: values, values: values.Buffers[1].Buf}

	switch indices.Type.(arrow.FixedWidthDataType).Bytes() {
	case 1:
		booleanTakeImpl[uint8](getter, indices, out)
	case 2:
		booleanTakeImpl[uint16](getter, indices, out)
	case 4:
		booleanTakeImpl[uint32](getter, indices, out)
	case 8:
		booleanTakeImpl[uint64](getter, indices, out)
	default:
		return fmt.Errorf("%w: invalid indices byte width", arrow.ErrIndex)
	}
	return nil
}

func takeIdxChunkedDispatch[T arrow.IntType](values, indices *arrow.Chunked, out []*exec.ExecResult) error {
	getter := newChunkedPrimitiveGetter[T](values)
	var takeImpl func(primitiveGetter[T], *exec.ArraySpan, *exec.ExecResult)

	switch indices.DataType().(arrow.FixedWidthDataType).Bytes() {
	case 1:
		takeImpl = primitiveTakeImpl[uint8, T]
	case 2:
		takeImpl = primitiveTakeImpl[uint16, T]
	case 4:
		takeImpl = primitiveTakeImpl[uint32, T]
	case 8:
		takeImpl = primitiveTakeImpl[uint64, T]
	default:
		return fmt.Errorf("%w: invalid byte width for indices", arrow.ErrIndex)
	}

	var span exec.ArraySpan
	for i, chunk := range indices.Chunks() {
		span.SetMembers(chunk.Data())
		takeImpl(getter, &span, out[i])
	}
	return nil
}

func takeIdxDispatch[T arrow.IntType](values, indices *exec.ArraySpan, out *exec.ExecResult) error {
	getter := &primitiveGetterImpl[T]{inner: values, values: exec.GetSpanValues[T](values, 1)}

	switch indices.Type.(arrow.FixedWidthDataType).Bytes() {
	case 1:
		primitiveTakeImpl[uint8, T](getter, indices, out)
	case 2:
		primitiveTakeImpl[uint16, T](getter, indices, out)
	case 4:
		primitiveTakeImpl[uint32, T](getter, indices, out)
	case 8:
		primitiveTakeImpl[uint64, T](getter, indices, out)
	default:
		return fmt.Errorf("%w: invalid indices byte width", arrow.ErrIndex)
	}
	return nil
}

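// PrimitiveTake is the ArrayKernelExec for taking from fixed-width
// primitive arrays: batch.Values[0] holds the values and
// batch.Values[1] the indices to gather.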
func PrimitiveTake(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
	var (
		values  = &batch.Values[0].Array
		indices = &batch.Values[1].Array
	)

	if ctx.State.(TakeState).BoundsCheck {
		if err := checkIndexBounds(indices, uint64(values.Len)); err != nil {
			return err
		}
	}

	bitWidth := values.Type.(arrow.FixedWidthDataType).BitWidth()
	allocateValidity := values.Nulls != 0 || indices.Nulls != 0
	preallocateData(ctx, indices.Len, bitWidth, allocateValidity, out)

	switch bitWidth {
	case 1:
		return booleanTakeDispatch(values, indices, out)
	case 8:
		return takeIdxDispatch[int8](values, indices, out)
	case 16:
		return takeIdxDispatch[int16](values, indices, out)
	case 32:
		return takeIdxDispatch[int32](values, indices, out)
	case 64:
		return takeIdxDispatch[int64](values, indices, out)
	default:
		return fmt.Errorf("%w: invalid values bit width for take", arrow.ErrInvalid)
	}
}

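// ChunkedPrimitiveTake is the ChunkedExec counterpart of PrimitiveTake:
// it bounds-checks the indices, preallocates one output per indices
// chunk, and dispatches on the value bit width.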
func ChunkedPrimitiveTake(ctx *exec.KernelCtx, batch []*arrow.Chunked, out *exec.ExecResult) ([]*exec.ExecResult, error) {
	var (
		values  = batch[0]
		indices = batch[1]
	)

	if ctx.State.(TakeState).BoundsCheck {
		if err := checkIndexBoundsChunked(indices, uint64(values.Len())); err != nil {
			return nil, err
		}
	}

	bitWidth := values.DataType().(arrow.FixedWidthDataType).BitWidth()
	allocateValidity := values.NullN() != 0 || indices.NullN() != 0
	results := make([]*exec.ExecResult, len(indices.Chunks()))
	for i, chunk := range indices.Chunks() {
		results[i] = &exec.ExecResult{Type: out.Type}
		preallocateData(ctx, int64(chunk.Len()), bitWidth, allocateValidity, results[i])
	}

	switch bitWidth {
	case 1:
		return results, booleanTakeDispatchChunked(values, indices, results)
	case 8:
		return results, takeIdxChunkedDispatch[int8](values, indices, results)
	case 16:
		return results, takeIdxChunkedDispatch[int16](values, indices, results)
	case 32:
		return results, takeIdxChunkedDispatch[int32](values, indices, results)
	case 64:
		return results, takeIdxChunkedDispatch[int64](values, indices, results)
	default:
		return nil, fmt.Errorf("%w: invalid values bit width for take", arrow.ErrInvalid)
	}
}

func NullTake(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
	if ctx.State.(TakeState).BoundsCheck {
		if err := checkIndexBounds(&batch.Values[1].Array, uint64(batch.Values[0].Array.Len)); err != nil {
			return err
		}
	}

	// batch.Len doesn't take into account the length of the take indices
	out.Len = batch.Values[1].Array.Len
	out.Type = arrow.Null
	return nil
}

func NullFilter(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
	outputLength := getFilterOutputSize(&batch.Values[1].Array, ctx.State.(FilterState).NullSelection)
	out.Len = outputLength
	out.Type = arrow.Null
	return nil
}

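// filterExec is the selectionOutputFn for filters: it walks the filter
// in blocks, builds the output validity bitmap, and calls visitValid
// for each selected input index and visitNull for each emitted null.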
func filterExec(ctx *exec.KernelCtx, outputLength int64, values, filter *exec.ArraySpan, out *exec.ExecResult, visitValid func(idx int64) error, visitNull func() error) error {
	var (
		nullSelection = ctx.State.(FilterState).NullSelection
		filterData    = filter.Buffers[1].Buf
		filterIsValid = filter.Buffers[0].Buf
		filterOffset  = filter.Offset

		// we use 3 block counters for fast scanning
		//
		// values valid counter: for values null/not-null
		// filter valid counter: for filter null/not-null
		// filter counter: for filter true/false
		valuesIsValid      = bitutil.OptionalBitIndexer{Bitmap: values.Buffers[0].Buf, Offset: int(values.Offset)}
		valuesValidCounter = bitutils.NewOptionalBitBlockCounter(values.Buffers[0].Buf, values.Offset, values.Len)
		filterValidCounter = bitutils.NewOptionalBitBlockCounter(filterIsValid, filterOffset, filter.Len)
		filterCounter      = bitutils.NewBitBlockCounter(filterData, filterOffset, filter.Len)
		inPos              int64

		validityBldr = validityBuilder{mem: exec.GetAllocator(ctx.Ctx)}
	)

	validityBldr.Reserve(outputLength)

	appendNotNull := func(idx int64) error {
		validityBldr.UnsafeAppend(true)
		return visitValid(idx)
	}

	appendNull := func() error {
		validityBldr.UnsafeAppend(false)
		return visitNull()
	}

	appendMaybeNull := func(idx int64) error {
		if valuesIsValid.GetBit(int(idx)) {
			return appendNotNull(idx)
		}
		return appendNull()
	}

	for inPos < values.Len {
		filterValidBlock := filterValidCounter.NextWord()
		valuesValidBlock := valuesValidCounter.NextWord()
		filterBlock := filterCounter.NextWord()

		switch {
		case filterBlock.NoneSet() && nullSelection == DropNulls:
			// for this exceedingly common case in low-selectivity filters
			// we can skip further analysis of the data and move onto the next block
			inPos += int64(filterBlock.Len)
		case filterValidBlock.AllSet():
			// simpler path: no filter values are null
			if filterBlock.AllSet() {
				// fastest path, filter values are all true and not null
				if valuesValidBlock.AllSet() {
					// values aren't null either
					validityBldr.UnsafeAppendN(int64(filterBlock.Len), true)
					for i := 0; i < int(filterBlock.Len); i++ {
						if err := visitValid(inPos); err != nil {
							return err
						}
						inPos++
					}
				} else {
					// some values are null in this block
					for i := 0; i < int(filterBlock.Len); i++ {
						if err := appendMaybeNull(inPos); err != nil {
							return err
						}
						inPos++
					}
				}
			} else { // !filterBlock.AllSet()
				// some filter values are false, but all not null
				if valuesValidBlock.AllSet() {
					// all the values are not-null, so we can skip null checking for them
					for i := 0; i < int(filterBlock.Len); i++ {
						if bitutil.BitIsSet(filterData, int(filterOffset+inPos)) {
							if err := appendNotNull(inPos); err != nil {
								return err
							}
						}
						inPos++
					}
				} else {
					// some of the values in the block are null
					// gotta check each one :(
					for i := 0; i < int(filterBlock.Len); i++ {
						if bitutil.BitIsSet(filterData, int(filterOffset+inPos)) {
							if err := appendMaybeNull(inPos); err != nil {
								return err
							}
						}
						inPos++
					}
				}
			}
		default:
			// !filterValidBlock.AllSet()
			// some filter values are null, so we have to handle drop
			// versus emit null
			if nullSelection == DropNulls {
				// filter null values are treated as false
				for i := 0; i < int(filterBlock.Len); i++ {
					if bitutil.BitIsSet(filterIsValid, int(filterOffset+inPos)) &&
						bitutil.BitIsSet(filterData, int(filterOffset+inPos)) {
						if err := appendMaybeNull(inPos); err != nil {
							return err
						}
					}
					inPos++
				}
			} else {
				// filter null values are appended to output as null
				// whether the value in the corresponding slot is valid
				// or not
				var err error
				for i := 0; i < int(filterBlock.Len); i++ {
					isValid := bitutil.BitIsSet(filterIsValid, int(filterOffset+inPos))
					if isValid && bitutil.BitIsSet(filterData, int(filterOffset+inPos)) {
						err = appendMaybeNull(inPos)
					} else if !isValid {
						// emit null case
						err = appendNull()
					}
					if err != nil {
						return err
					}
					inPos++
				}
			}
		}
	}

	out.Len = int64(validityBldr.bitLength)
	out.Nulls = int64(validityBldr.falseCount)
	out.Buffers[0].WrapBuffer(validityBldr.Finish())
	return nil
}

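// binaryFilterNonNull is the fast path for filtering variable-size
// binary arrays when neither the values nor the filter contain nulls:
// set-bit runs of the filter translate directly into bulk copies of the
// value data.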
func binaryFilterNonNull[OffsetT int32 | int64](ctx *exec.KernelCtx, values, filter *exec.ArraySpan, outputLength int64, nullSelection NullSelectionBehavior, out *exec.ExecResult) error {
	var (
		offsetBuilder = newBufferBuilder[OffsetT](exec.GetAllocator(ctx.Ctx))
		dataBuilder   = newBufferBuilder[uint8](exec.GetAllocator(ctx.Ctx))
		rawOffsets    = exec.GetSpanOffsets[OffsetT](values, 1)
		rawData       = values.Buffers[2].Buf
	)

	offsetBuilder.reserve(int(outputLength) + 1)
	// get a rough estimate and pre-size the data builder
	if values.Len > 0 {
		meanValueLength := float64(rawOffsets[values.Len]-rawOffsets[0]) / float64(values.Len)
		dataBuilder.reserve(int(meanValueLength * float64(outputLength)))
	}

	spaceAvailable := dataBuilder.cap()
	var offset OffsetT
	filterData := filter.Buffers[1].Buf

	err := bitutils.VisitSetBitRuns(filterData, filter.Offset, filter.Len,
		func(pos, length int64) error {
			start, end := rawOffsets[pos], rawOffsets[pos+length]
			// bulk-append raw data
			numBytes := end - start
			if numBytes > OffsetT(spaceAvailable) {
				dataBuilder.reserve(int(numBytes))
				spaceAvailable = dataBuilder.cap() - dataBuilder.len()
			}
			dataBuilder.unsafeAppendSlice(rawData[start:end])
			spaceAvailable -= int(numBytes)
			prevOffset := start
			for i := int64(0); i < length; i++ {
				offsetBuilder.unsafeAppend(offset)
				offset += rawOffsets[pos+i+1] - prevOffset
				prevOffset = rawOffsets[pos+i+1]
			}
			return nil
		})

	if err != nil {
		return err
	}

	offsetBuilder.unsafeAppend(offset)
	out.Len = outputLength
	out.Buffers[1].WrapBuffer(offsetBuilder.finish())
	out.Buffers[2].WrapBuffer(dataBuilder.finish())
	return nil
}

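// binaryFilterImpl filters variable-size binary arrays when nulls may
// be present in the values and/or the filter, maintaining the output
// validity bitmap alongside the offsets and data buffers.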
func binaryFilterImpl[OffsetT int32 | int64](ctx *exec.KernelCtx, values, filter *exec.ArraySpan, outputLength int64, nullSelection NullSelectionBehavior, out *exec.ExecResult) error {
	var (
		filterData    = filter.Buffers[1].Buf
		filterIsValid = filter.Buffers[0].Buf
		filterOffset  = filter.Offset

		valuesIsValid = values.Buffers[0].Buf
		valuesOffset  = values.Offset
		// output bitmap should already be zeroed out so we just
		// have to set valid bits to true
		outIsValid = out.Buffers[0].Buf

		rawOffsets    = exec.GetSpanOffsets[OffsetT](values, 1)
		rawData       = values.Buffers[2].Buf
		offsetBuilder = newBufferBuilder[OffsetT](exec.GetAllocator(ctx.Ctx))
		dataBuilder   = newBufferBuilder[uint8](exec.GetAllocator(ctx.Ctx))
	)

	offsetBuilder.reserve(int(outputLength) + 1)
	if values.Len > 0 {
		meanValueLength := float64(rawOffsets[values.Len]-rawOffsets[0]) / float64(values.Len)
		dataBuilder.reserve(int(meanValueLength * float64(outputLength)))
	}

	spaceAvailable := dataBuilder.cap()
	var offset OffsetT

	// we use 3 block counters for fast scanning of the filter
	//
	// * valuesValidCounter: for values null/not-null
	// * filterValidCounter: for filter null/not-null
	// * filterCounter: for filter true/false
	valuesValidCounter := bitutils.NewOptionalBitBlockCounter(valuesIsValid, values.Offset, values.Len)
	filterValidCounter := bitutils.NewOptionalBitBlockCounter(filterIsValid, filterOffset, filter.Len)
	filterCounter := bitutils.NewBitBlockCounter(filterData, filterOffset, filter.Len)

	inPos, outPos := int64(0), int64(0)

	appendSlice := func(data []byte) {
		if len(data) > spaceAvailable {
			dataBuilder.reserve(len(data))
			spaceAvailable = dataBuilder.cap() - dataBuilder.len()
		}
		dataBuilder.unsafeAppendSlice(data)
		spaceAvailable -= len(data)
	}

	appendSingle := func() {
		slice := rawData[rawOffsets[inPos]:rawOffsets[inPos+1]]
		appendSlice(slice)
		offset += OffsetT(len(slice))
	}

	for inPos < values.Len {
		filterValidBlock, valuesValidBlock := filterValidCounter.NextWord(), valuesValidCounter.NextWord()
		filterBlock := filterCounter.NextWord()
		switch {
		case filterBlock.NoneSet() && nullSelection == DropNulls:
			// for this exceedingly common case in low-selectivity filters
			// we can skip further analysis of the data and move on to the
			// next block
			inPos += int64(filterBlock.Len)
		case filterValidBlock.AllSet():
			// simpler path: no filter values are null
			if filterBlock.AllSet() {
				// fastest path: filter values are all true and not null
				if valuesValidBlock.AllSet() {
					// the values aren't null either
					bitutil.SetBitsTo(outIsValid, outPos, int64(filterBlock.Len), true)

					// bulk-append raw data
					start, end := rawOffsets[inPos], rawOffsets[inPos+int64(filterBlock.Len)]
					appendSlice(rawData[start:end])
					// append offsets
					for i := 0; i < int(filterBlock.Len); i, inPos = i+1, inPos+1 {
						offsetBuilder.unsafeAppend(offset)
						offset += rawOffsets[inPos+1] - rawOffsets[inPos]
					}
					outPos += int64(filterBlock.Len)
				} else {
					// some of the values in this block are null
					for i := 0; i < int(filterBlock.Len); i, inPos, outPos = i+1, inPos+1, outPos+1 {
						offsetBuilder.unsafeAppend(offset)
						if bitutil.BitIsSet(valuesIsValid, int(valuesOffset+inPos)) {
							bitutil.SetBit(outIsValid, int(outPos))
							appendSingle()
						}
					}
				}
				continue
			}
			// !filterBlock.AllSet()
			// some of the filter values are false, but all not null
			if valuesValidBlock.AllSet() {
				// all the values are non-null, so we can skip null checking
				for i := 0; i < int(filterBlock.Len); i, inPos = i+1, inPos+1 {
					if bitutil.BitIsSet(filterData, int(filterOffset+inPos)) {
						offsetBuilder.unsafeAppend(offset)
						bitutil.SetBit(outIsValid, int(outPos))
						outPos++
						appendSingle()
					}
				}
			} else {
				// some of the values in the block are null, so we have to check
				for i := 0; i < int(filterBlock.Len); i, inPos = i+1, inPos+1 {
					if bitutil.BitIsSet(filterData, int(filterOffset+inPos)) {
						offsetBuilder.unsafeAppend(offset)
						if bitutil.BitIsSet(valuesIsValid, int(valuesOffset+inPos)) {
							bitutil.SetBit(outIsValid, int(outPos))
							appendSingle()
						}
						outPos++
					}
				}
			}
		default:
			// !filterValidBlock.AllSet()
			// some of the filter values are null, so we have to handle
			// the DROP vs EMIT_NULL null selection behavior
			if nullSelection == DropNulls {
				// filter null values are treated as false
				if valuesValidBlock.AllSet() {
					for i := 0; i < int(filterBlock.Len); i, inPos = i+1, inPos+1 {
						if bitutil.BitIsSet(filterIsValid, int(filterOffset+inPos)) &&
							bitutil.BitIsSet(filterData, int(filterOffset+inPos)) {
							offsetBuilder.unsafeAppend(offset)
							bitutil.SetBit(outIsValid, int(outPos))
							outPos++
							appendSingle()
						}
					}
				} else {
					for i := 0; i < int(filterBlock.Len); i, inPos = i+1, inPos+1 {
						if bitutil.BitIsSet(filterIsValid, int(filterOffset+inPos)) &&
							bitutil.BitIsSet(filterData, int(filterOffset+inPos)) {
							offsetBuilder.unsafeAppend(offset)
							if bitutil.BitIsSet(valuesIsValid, int(valuesOffset+inPos)) {
								bitutil.SetBit(outIsValid, int(outPos))
								appendSingle()
							}
							outPos++
						}
					}
				}
			} else {
				// filter null values are emitted as null in the output
				for i := 0; i < int(filterBlock.Len); i, inPos = i+1, inPos+1 {
					isValid := bitutil.BitIsSet(filterIsValid, int(filterOffset+inPos))
					if isValid && bitutil.BitIsSet(filterData, int(filterOffset+inPos)) {
						// filter slot is non-null and set
						offsetBuilder.unsafeAppend(offset)
						if bitutil.BitIsSet(valuesIsValid, int(valuesOffset+inPos)) {
							bitutil.SetBit(outIsValid, int(outPos))
							appendSingle()
						}
						outPos++
					} else if !isValid {
						// filter slot is null, so we have a null in the output
						offsetBuilder.unsafeAppend(offset)
						outPos++
					}
				}
			}
		}
	}

	offsetBuilder.unsafeAppend(offset)
	out.Len = outputLength
	out.Buffers[1].WrapBuffer(offsetBuilder.finish())
	out.Buffers[2].WrapBuffer(dataBuilder.finish())
	return nil
}

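// takeExecImpl drives a take operation through the visitValid/visitNull
// callbacks while building the output validity bitmap; IdxT is the
// physical type of the take indices.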
func takeExecImpl[IdxT arrow.UintType](ctx *exec.KernelCtx, outputLength int64, values, indices *exec.ArraySpan, out *exec.ExecResult, visitValid func(int64) error, visitNull func() error) error {
	var (
		validityBldr    = validityBuilder{mem: exec.GetAllocator(ctx.Ctx)}
		indicesData     = exec.GetSpanValues[IdxT](indices, 1)
		isValid         = indices.Buffers[0].Buf
		valuesHaveNulls = values.MayHaveNulls()

		indexBitIndexer = bitutil.OptionalBitIndexer{Bitmap: isValid, Offset: int(indices.Offset)}
		valueBitIndexer = bitutil.OptionalBitIndexer{Bitmap: values.Buffers[0].Buf, Offset: int(values.Offset)}
		bitCounter      = bitutils.NewOptionalBitBlockCounter(isValid, indices.Offset, indices.Len)
		pos             int64
	)

	validityBldr.Reserve(outputLength)
	for pos < indices.Len {
		block := bitCounter.NextBlock()
		indicesHaveNulls := block.Popcnt < block.Len
		if !indicesHaveNulls && !valuesHaveNulls {
			// fastest path, neither indices nor values have nulls
			validityBldr.UnsafeAppendN(int64(block.Len), true)
			for i := 0; i < int(block.Len); i++ {
				if err := visitValid(int64(indicesData[pos])); err != nil {
					return err
				}
				pos++
			}
		} else if block.Popcnt > 0 {
			// since we have to branch on whether indices are null or not,
			// we combine the "non-null indices block but some values null"
			// and "some null indices block but values non-null" into single loop
			for i := 0; i < int(block.Len); i++ {
				if (!indicesHaveNulls || indexBitIndexer.GetBit(int(pos))) && valueBitIndexer.GetBit(int(indicesData[pos])) {
					validityBldr.UnsafeAppend(true)
					if err := visitValid(int64(indicesData[pos])); err != nil {
						return err
					}
				} else {
					validityBldr.UnsafeAppend(false)
					if err := visitNull(); err != nil {
						return err
					}
				}
				pos++
			}
		} else {
			// the whole block is null
			validityBldr.UnsafeAppendN(int64(block.Len), false)
			for i := 0; i < int(block.Len); i++ {
				if err := visitNull(); err != nil {
					return err
				}
			}
			pos += int64(block.Len)
		}
	}

	out.Len = int64(validityBldr.bitLength)
	out.Nulls = int64(validityBldr.falseCount)
	out.Buffers[0].WrapBuffer(validityBldr.Finish())
	return nil
}

func takeExec(ctx *exec.KernelCtx, outputLength int64, values, indices *exec.ArraySpan, out *exec.ExecResult, visitValid func(int64) error, visitNull func() error) error {
	indexWidth := indices.Type.(arrow.FixedWidthDataType).Bytes()

	switch indexWidth {
	case 1:
		return takeExecImpl[uint8](ctx, outputLength, values, indices, out, visitValid, visitNull)
	case 2:
		return takeExecImpl[uint16](ctx, outputLength, values, indices, out, visitValid, visitNull)
	case 4:
		return takeExecImpl[uint32](ctx, outputLength, values, indices, out, visitValid, visitNull)
	case 8:
		return takeExecImpl[uint64](ctx, outputLength, values, indices, out, visitValid, visitNull)
	default:
		return fmt.Errorf("%w: invalid index width", arrow.ErrInvalid)
	}
}

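// selectionOutputFn writes the output of a selection (filter or take)
// through per-index callbacks; selectionImplFn adapts a specific output
// layout (variable binary, fixed-size binary, list, ...) to a
// selectionOutputFn.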
type selectionOutputFn func(*exec.KernelCtx, int64, *exec.ArraySpan, *exec.ArraySpan, *exec.ExecResult, func(int64) error, func() error) error
type selectionImplFn func(*exec.KernelCtx, *exec.ExecSpan, int64, *exec.ExecResult, selectionOutputFn) error

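// FilterExec and TakeExec wrap a selectionImplFn into an
// ArrayKernelExec, pairing it with filterExec or takeExec respectively
// as the output-driving function.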
func FilterExec(impl selectionImplFn) exec.ArrayKernelExec {
	return func(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
		var (
			filter       = &batch.Values[1].Array
			outputLength = getFilterOutputSize(filter, ctx.State.(FilterState).NullSelection)
		)
		return impl(ctx, batch, outputLength, out, filterExec)
	}
}

func TakeExec(impl selectionImplFn) exec.ArrayKernelExec {
	return func(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
		if ctx.State.(TakeState).BoundsCheck {
			if err := checkIndexBounds(&batch.Values[1].Array, uint64(batch.Values[0].Array.Len)); err != nil {
				return err
			}
		}

		return impl(ctx, batch, batch.Values[1].Array.Len, out, takeExec)
	}
}

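// VarBinaryImpl implements selection for variable-size binary layouts
// ([validity, offsets, data] buffers), copying each selected value's
// bytes and appending one offset per output slot.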
func VarBinaryImpl[OffsetT int32 | int64](ctx *exec.KernelCtx, batch *exec.ExecSpan, outputLength int64, out *exec.ExecResult, fn selectionOutputFn) error {
	var (
		values        = &batch.Values[0].Array
		selection     = &batch.Values[1].Array
		rawOffsets    = exec.GetSpanOffsets[OffsetT](values, 1)
		rawData       = values.Buffers[2].Buf
		offsetBuilder = newBufferBuilder[OffsetT](exec.GetAllocator(ctx.Ctx))
		dataBuilder   = newBufferBuilder[uint8](exec.GetAllocator(ctx.Ctx))
	)

	// presize the data builder with a rough estimate of the required data size
	if values.Len > 0 {
		dataLength := rawOffsets[values.Len] - rawOffsets[0]
		meanValueLength := float64(dataLength) / float64(values.Len)
		dataBuilder.reserve(int(meanValueLength))
	}

	offsetBuilder.reserve(int(outputLength) + 1)
	spaceAvailable := dataBuilder.cap()
	var offset OffsetT
	err := fn(ctx, outputLength, values, selection, out,
		func(idx int64) error {
			offsetBuilder.unsafeAppend(offset)
			valOffset := rawOffsets[idx]
			valSize := rawOffsets[idx+1] - valOffset

			if valSize == 0 {
				return nil
			}
			offset += valSize
			if valSize > OffsetT(spaceAvailable) {
				dataBuilder.reserve(int(valSize))
				spaceAvailable = dataBuilder.cap() - dataBuilder.len()
			}
			dataBuilder.unsafeAppendSlice(rawData[valOffset : valOffset+valSize])
			spaceAvailable -= int(valSize)
			return nil
		}, func() error {
			offsetBuilder.unsafeAppend(offset)
			return nil
		})

	if err != nil {
		return err
	}

	offsetBuilder.unsafeAppend(offset)
	out.Buffers[1].WrapBuffer(offsetBuilder.finish())
	out.Buffers[2].WrapBuffer(dataBuilder.finish())
	return nil
}

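// FSBImpl implements selection for fixed-size binary layouts (including
// decimal128/decimal256): each selected value is a fixed-width byte
// copy into the preallocated output buffer.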
func FSBImpl(ctx *exec.KernelCtx, batch *exec.ExecSpan, outputLength int64, out *exec.ExecResult, fn selectionOutputFn) error {
	var (
		values    = &batch.Values[0].Array
		selection = &batch.Values[1].Array
		valueSize = int64(values.Type.(arrow.FixedWidthDataType).Bytes())
		valueData = values.Buffers[1].Buf[values.Offset*valueSize:]
	)

	out.Buffers[1].WrapBuffer(ctx.Allocate(int(valueSize * outputLength)))
	outData := out.Buffers[1].Buf

	err := fn(ctx, outputLength, values, selection, out,
		func(idx int64) error {
			start := valueSize * idx
			copy(outData, valueData[start:start+valueSize])
			outData = outData[valueSize:]
			return nil
		},
		func() error {
			outData = outData[valueSize:]
			return nil
		})

	if err != nil {
		out.Buffers[1].Buf = nil
		out.Buffers[1].Owner.Release()
		out.Buffers[1].Owner = nil
		return err
	}

	return nil
}

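// ListImpl implements selection for list layouts: it builds the output
// offsets directly and materializes a child index array, which the
// caller then uses to take from the child data.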
func ListImpl[OffsetT int32 | int64](ctx *exec.KernelCtx, batch *exec.ExecSpan, outputLength int64, out *exec.ExecResult, fn selectionOutputFn) error {
	var (
		values    = &batch.Values[0].Array
		selection = &batch.Values[1].Array

		offsets         = exec.GetSpanOffsets[OffsetT](values, 1)
		mem             = exec.GetAllocator(ctx.Ctx)
		offsetBuilder   = newBufferBuilder[OffsetT](mem)
		childIdxBuilder = newBufferBuilder[OffsetT](mem)
	)

	if values.Len > 0 {
		dataLength := offsets[values.Len] - offsets[0]
		meanListLength := float64(dataLength) / float64(values.Len)
		childIdxBuilder.reserve(int(meanListLength))
	}

	offsetBuilder.reserve(int(outputLength) + 1)
	var offset OffsetT
	err := fn(ctx, outputLength, values, selection, out,
		func(idx int64) error {
			offsetBuilder.unsafeAppend(offset)
			valueOffset := offsets[idx]
			valueLength := offsets[idx+1] - valueOffset
			offset += valueLength
			childIdxBuilder.reserve(int(valueLength))
			for j := valueOffset; j < valueOffset+valueLength; j++ {
				childIdxBuilder.unsafeAppend(j)
			}
			return nil
		}, func() error {
			offsetBuilder.unsafeAppend(offset)
			return nil
		})

	if err != nil {
		return err
	}

	offsetBuilder.unsafeAppend(offset)
	out.Buffers[1].WrapBuffer(offsetBuilder.finish())

	out.Children = make([]exec.ArraySpan, 1)
	out.Children[0].Type = arrow.GetDataType[OffsetT]()
	out.Children[0].Len = int64(childIdxBuilder.len())
	out.Children[0].Buffers[1].WrapBuffer(childIdxBuilder.finish())

	return nil
}

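// FSLImpl implements selection for fixed-size list layouts: for every
// selected slot it emits listSize child indices (nulls for null slots),
// which the caller uses to take from the child data.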
func FSLImpl(ctx *exec.KernelCtx, batch *exec.ExecSpan, outputLength int64, out *exec.ExecResult, fn selectionOutputFn) error {
	var (
		values    = &batch.Values[0].Array
		selection = &batch.Values[1].Array

		listSize   = values.Type.(*arrow.FixedSizeListType).Len()
		baseOffset = values.Offset

		childIdxBldr = array.NewInt64Builder(exec.GetAllocator(ctx.Ctx))
	)
	defer childIdxBldr.Release()

	// we need to take listSize elements even for null elements of indices
	childIdxBldr.Reserve(int(outputLength) * int(listSize))
	err := fn(ctx, outputLength, values, selection, out,
		func(idx int64) error {
			offset := (baseOffset + idx) * int64(listSize)
			for j := offset; j < (offset + int64(listSize)); j++ {
				childIdxBldr.UnsafeAppend(j)
			}
			return nil
		}, func() error {
			for j := int32(0); j < listSize; j++ {
				childIdxBldr.AppendNull()
			}
			return nil
		})

	if err != nil {
		return err
	}

	childIndices := childIdxBldr.NewArray()
	defer childIndices.Release()
	out.Children = make([]exec.ArraySpan, 1)
	out.Children[0].TakeOwnership(childIndices.Data())
	return nil
}

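// DenseUnionImpl implements selection for dense union arrays: it emits
// new child IDs and value offsets, and accumulates per-child take
// indices so each child can be gathered separately.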
func DenseUnionImpl(ctx *exec.KernelCtx, batch *exec.ExecSpan, outputLength int64, out *exec.ExecResult, fn selectionOutputFn) error {
	var (
		values    = &batch.Values[0].Array
		selection = &batch.Values[1].Array

		mem                  = exec.GetAllocator(ctx.Ctx)
		valueOffsetsBldr     = newBufferBuilder[int32](mem)
		childIDsBldr         = newBufferBuilder[int8](mem)
		typeCodes            = values.Type.(arrow.UnionType).TypeCodes()
		childIndicesBuilders = make([]*array.Int32Builder, len(typeCodes))
	)

	for i := range childIndicesBuilders {
		childIndicesBuilders[i] = array.NewInt32Builder(mem)
	}

	childIDsBldr.reserve(int(outputLength))
	valueOffsetsBldr.reserve(int(outputLength))

	typedValues := values.MakeArray().(*array.DenseUnion)
	defer typedValues.Release()

	err := fn(ctx, outputLength, values, selection, out,
		func(idx int64) error {
			childID := typedValues.ChildID(int(idx))
			childIDsBldr.unsafeAppend(typeCodes[childID])
			valueOffset := typedValues.ValueOffset(int(idx))
			valueOffsetsBldr.unsafeAppend(int32(childIndicesBuilders[childID].Len()))
			childIndicesBuilders[childID].Append(valueOffset)
			return nil
		}, func() error {
			childID := 0
			childIDsBldr.unsafeAppend(typeCodes[childID])
			valueOffsetsBldr.unsafeAppend(int32(childIndicesBuilders[childID].Len()))
			childIndicesBuilders[childID].AppendNull()
			return nil
		})
	if err != nil {
		return err
	}

	out.Type = typedValues.DataType()
	out.Buffers[1].WrapBuffer(childIDsBldr.finish())
	out.Buffers[2].WrapBuffer(valueOffsetsBldr.finish())

	out.Children = make([]exec.ArraySpan, len(typeCodes))
	for i, bldr := range childIndicesBuilders {
		childIndices := bldr.NewArray()
		out.Children[i].TakeOwnership(childIndices.Data())
		childIndices.Release()
		bldr.Release()
	}
	return nil
}

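// FilterBinary is the ArrayKernelExec for filtering binary-like and
// large-binary-like arrays, dispatching to the non-null fast path when
// neither input has nulls.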
func FilterBinary(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
	var (
		nullSelection = ctx.State.(FilterState).NullSelection
		values        = &batch.Values[0].Array
		filter        = &batch.Values[1].Array
		outputLength  = getFilterOutputSize(filter, nullSelection)
	)

	// the output precomputed null count is unknown except in the
	// narrow condition that all the values are non-null and the filter
	// will not cause any new nulls to be created
	if values.Nulls == 0 && (nullSelection == DropNulls || filter.Nulls == 0) {
		out.Nulls = 0
	} else {
		out.Nulls = array.UnknownNullCount
	}

	typeID := values.Type.ID()
	if values.Nulls == 0 && filter.Nulls == 0 {
		// faster no-nulls case
		switch {
		case arrow.IsBinaryLike(typeID):
			return binaryFilterNonNull[int32](ctx, values, filter, outputLength, nullSelection, out)
		case arrow.IsLargeBinaryLike(typeID):
			return binaryFilterNonNull[int64](ctx, values, filter, outputLength, nullSelection, out)
		default:
			return fmt.Errorf("%w: invalid type for binary filter", arrow.ErrInvalid)
		}
	}

	// output may have nulls
	out.Buffers[0].WrapBuffer(ctx.AllocateBitmap(outputLength))
	switch {
	case arrow.IsBinaryLike(typeID):
		return binaryFilterImpl[int32](ctx, values, filter, outputLength, nullSelection, out)
	case arrow.IsLargeBinaryLike(typeID):
		return binaryFilterImpl[int64](ctx, values, filter, outputLength, nullSelection, out)
	}

	return fmt.Errorf("%w: invalid type for binary filter", arrow.ErrInvalid)
}

func visitNoop() error         { return nil }
func visitIdxNoop(int64) error { return nil }

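// StructImpl implements selection for struct layouts: only the
// top-level validity bitmap needs to be produced here, so the visitors
// are no-ops.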
func StructImpl(ctx *exec.KernelCtx, batch *exec.ExecSpan, outputLength int64, out *exec.ExecResult, fn selectionOutputFn) error {
	var (
		values    = &batch.Values[0].Array
		selection = &batch.Values[1].Array
	)

	// nothing we need to do other than generate the validity bitmap
	return fn(ctx, outputLength, values, selection, out, visitIdxNoop, visitNoop)
}

type SelectionKernelData struct {
	In      exec.InputType
	Exec    exec.ArrayKernelExec
	Chunked exec.ChunkedExec
}

func IsPrimitive(dt arrow.DataType) bool {
	return arrow.IsPrimitive(dt.ID())
}

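// GetVectorSelectionKernels returns the kernel descriptors for the
// filter and take functions; the caller registers these (together with
// the nested-type kernels built from FilterExec/TakeExec and the *Impl
// functions above) with the compute function registry. A sketch of the
// intended use, assuming a registration loop on the caller's side:
//
//	filters, takes := GetVectorSelectionKernels()
//	for _, kd := range filters {
//		// add a kernel with input kd.In and exec kd.Exec to "filter"
//	}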
func GetVectorSelectionKernels() (filterKernels, takeKernels []SelectionKernelData) {
	filterKernels = []SelectionKernelData{
		{In: exec.NewMatchedInput(exec.Primitive()), Exec: PrimitiveFilter},
		{In: exec.NewExactInput(arrow.Null), Exec: NullFilter},
		{In: exec.NewIDInput(arrow.DECIMAL128), Exec: FilterExec(FSBImpl)},
		{In: exec.NewIDInput(arrow.DECIMAL256), Exec: FilterExec(FSBImpl)},
		{In: exec.NewIDInput(arrow.FIXED_SIZE_BINARY), Exec: FilterExec(FSBImpl)},
		{In: exec.NewMatchedInput(exec.BinaryLike()), Exec: FilterBinary},
		{In: exec.NewMatchedInput(exec.LargeBinaryLike()), Exec: FilterBinary},
	}

	takeKernels = []SelectionKernelData{
		{In: exec.NewExactInput(arrow.Null), Exec: NullTake},
		{In: exec.NewMatchedInput(exec.Primitive()), Exec: PrimitiveTake, Chunked: ChunkedPrimitiveTake},
		{In: exec.NewIDInput(arrow.DECIMAL128), Exec: TakeExec(FSBImpl)},
		{In: exec.NewIDInput(arrow.DECIMAL256), Exec: TakeExec(FSBImpl)},
		{In: exec.NewIDInput(arrow.FIXED_SIZE_BINARY), Exec: TakeExec(FSBImpl)},
		{In: exec.NewMatchedInput(exec.BinaryLike()), Exec: TakeExec(VarBinaryImpl[int32])},
		{In: exec.NewMatchedInput(exec.LargeBinaryLike()), Exec: TakeExec(VarBinaryImpl[int64])},
	}
	return
}