// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18

package kernels

import (
	"fmt"
	"unsafe"

	// NOTE: module major version (v12) is assumed here; match the surrounding module.
	"github.com/apache/arrow/go/v12/arrow"
	"github.com/apache/arrow/go/v12/arrow/bitutil"
	"github.com/apache/arrow/go/v12/arrow/compute/exec"
	"github.com/apache/arrow/go/v12/arrow/internal/debug"
	"github.com/apache/arrow/go/v12/arrow/memory"
	"github.com/apache/arrow/go/v12/arrow/scalar"
	"github.com/apache/arrow/go/v12/internal/bitutils"
	"golang.org/x/exp/constraints"
)

// ScalarUnary returns a kernel for performing a unary operation on
// FixedWidth types which is implemented using the passed in function
// which will receive a slice containing the raw input data along with
// a slice to populate for the output data.
//
// Note that bool is not included in arrow.FixedWidthType since it is
// represented as a bitmap, not as a slice of bool.
func ScalarUnary[OutT, Arg0T arrow.FixedWidthType](op func(*exec.KernelCtx, []Arg0T, []OutT) error) exec.ArrayKernelExec {
	return func(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
		arg0 := batch.Values[0].Array
		arg0Data := exec.GetSpanValues[Arg0T](&arg0, 1)
		outData := exec.GetSpanValues[OutT](out, 1)
		return op(ctx, arg0Data, outData)
	}
}
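
// A minimal sketch (illustrative only, not part of the package API): building
// a negation kernel for int64 inputs with ScalarUnary. The helper handles
// unpacking the spans; the closure only sees raw slices.
func exampleNegateInt64Kernel() exec.ArrayKernelExec {
	return ScalarUnary[int64, int64](func(_ *exec.KernelCtx, in, out []int64) error {
		for i, v := range in {
			out[i] = -v // the output slice is pre-allocated to the batch length
		}
		return nil
	})
}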

// ScalarUnaryNotNull is for generating a kernel to operate only on the
// non-null values in the input array. The zero value of the output type
// is used for any null input values.
func ScalarUnaryNotNull[OutT, Arg0T arrow.FixedWidthType](op func(*exec.KernelCtx, Arg0T, *error) OutT) exec.ArrayKernelExec {
	return func(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
		var (
			arg0     = &batch.Values[0].Array
			arg0Data = exec.GetSpanValues[Arg0T](arg0, 1)
			outPos   = 0
			def      OutT
			outData  = exec.GetSpanValues[OutT](out, 1)
			validity = arg0.Buffers[0].Buf
			err      error
		)

		bitutils.VisitBitBlocks(validity, arg0.Offset, arg0.Len,
			func(pos int64) {
				outData[outPos] = op(ctx, arg0Data[pos], &err)
				outPos++
			}, func() {
				outData[outPos] = def
				outPos++
			})
		return err
	}
}
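
// A minimal sketch (illustrative only): a ScalarUnaryNotNull-based kernel that
// inverts each non-null float64 and reports division by zero through the error
// pointer. Null slots receive the zero value of the output type.
func exampleReciprocalKernel() exec.ArrayKernelExec {
	return ScalarUnaryNotNull[float64, float64](func(_ *exec.KernelCtx, v float64, err *error) float64 {
		if v == 0 {
			*err = fmt.Errorf("%w: division by zero", arrow.ErrInvalid)
			return 0
		}
		return 1 / v
	})
}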

// ScalarUnaryBoolOutput is like ScalarUnary only it is for cases of boolean
// output. The function should take in a slice of the input type and a slice
// of bytes to fill with the output boolean bitmap.
func ScalarUnaryBoolOutput[Arg0T arrow.FixedWidthType](op func(*exec.KernelCtx, []Arg0T, []byte) error) exec.ArrayKernelExec {
	return func(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
		arg0 := batch.Values[0].Array
		arg0Data := exec.GetSpanValues[Arg0T](&arg0, 1)
		return op(ctx, arg0Data, out.Buffers[1].Buf)
	}
}

// ScalarUnaryNotNullBinaryArgBoolOut creates a unary kernel that accepts
// a binary type input (Binary [offset int32], String [offset int32],
// LargeBinary [offset int64], LargeString [offset int64]) and returns
// a boolean output which is never null.
//
// It implements the handling to iterate the offsets and values calling
// the provided function on each byte slice. The provided default value
// will be used as the output for elements of the input that are null.
func ScalarUnaryNotNullBinaryArgBoolOut[OffsetT int32 | int64](defVal bool, op func(*exec.KernelCtx, []byte, *error) bool) exec.ArrayKernelExec {
	return func(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
		var (
			arg0        = batch.Values[0].Array
			outData     = out.Buffers[1].Buf
			outPos      = 0
			arg0Offsets = exec.GetSpanOffsets[OffsetT](&arg0, 1)
			arg0Data    = arg0.Buffers[2].Buf
			validity    = arg0.Buffers[0].Buf
			err         error
		)

		bitutils.VisitBitBlocks(validity, arg0.Offset, arg0.Len,
			func(pos int64) {
				v := arg0Data[arg0Offsets[pos]:arg0Offsets[pos+1]]
				bitutil.SetBitTo(outData, int(out.Offset)+outPos, op(ctx, v, &err))
				outPos++
			}, func() {
				bitutil.SetBitTo(outData, int(out.Offset)+outPos, defVal)
				outPos++
			})
		return err
	}
}

// ScalarUnaryNotNullBinaryArg creates a unary kernel that accepts
// a binary type input (Binary [offset int32], String [offset int32],
// LargeBinary [offset int64], LargeString [offset int64]) and returns
// a FixedWidthType output which is never null.
//
// It implements the handling to iterate the offsets and values calling
// the provided function on each byte slice. The zero value of OutT
// will be used as the output for elements of the input that are null.
func ScalarUnaryNotNullBinaryArg[OutT arrow.FixedWidthType, OffsetT int32 | int64](op func(*exec.KernelCtx, []byte, *error) OutT) exec.ArrayKernelExec {
	return func(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
		var (
			arg0        = &batch.Values[0].Array
			outData     = exec.GetSpanValues[OutT](out, 1)
			outPos      = 0
			arg0Offsets = exec.GetSpanOffsets[OffsetT](arg0, 1)
			def         OutT
			arg0Data    = arg0.Buffers[2].Buf
			validity    = arg0.Buffers[0].Buf
			err         error
		)

		bitutils.VisitBitBlocks(validity, arg0.Offset, arg0.Len,
			func(pos int64) {
				v := arg0Data[arg0Offsets[pos]:arg0Offsets[pos+1]]
				outData[outPos] = op(ctx, v, &err)
				outPos++
			}, func() {
				outData[outPos] = def
				outPos++
			})
		return err
	}
}
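
// A minimal sketch (illustrative only): computing the byte length of each
// non-null element of a String/Binary array (int32 offsets) with
// ScalarUnaryNotNullBinaryArg. Null slots produce 0.
func exampleValueLengthKernel() exec.ArrayKernelExec {
	return ScalarUnaryNotNullBinaryArg[int64, int32](func(_ *exec.KernelCtx, v []byte, _ *error) int64 {
		return int64(len(v))
	})
}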

// ScalarUnaryBoolArg is like ScalarUnary except it specifically expects a
// function that takes a byte slice since boolean arrays are represented
// as a bitmap.
func ScalarUnaryBoolArg[OutT arrow.FixedWidthType](op func(*exec.KernelCtx, []byte, []OutT) error) exec.ArrayKernelExec {
	return func(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
		outData := exec.GetSpanValues[OutT](out, 1)
		return op(ctx, batch.Values[0].Array.Buffers[1].Buf, outData)
	}
}

// UnboxScalar reinterprets the data buffer of a primitive scalar as a
// value of the given fixed-width type.
func UnboxScalar[T arrow.FixedWidthType](val scalar.PrimitiveScalar) T {
	return *(*T)(unsafe.Pointer(&val.Data()[0]))
}

// UnboxBinaryScalar returns the byte data of a binary scalar, or nil
// if the scalar is null.
func UnboxBinaryScalar(val scalar.BinaryScalar) []byte {
	if !val.IsValid() {
		return nil
	}
	return val.Data()
}

type arrArrFn[OutT, Arg0T, Arg1T arrow.FixedWidthType] func(*exec.KernelCtx, []Arg0T, []Arg1T, []OutT) error
type arrScalarFn[OutT, Arg0T, Arg1T arrow.FixedWidthType] func(*exec.KernelCtx, []Arg0T, Arg1T, []OutT) error
type scalarArrFn[OutT, Arg0T, Arg1T arrow.FixedWidthType] func(*exec.KernelCtx, Arg0T, []Arg1T, []OutT) error

type binaryOps[OutT, Arg0T, Arg1T arrow.FixedWidthType] struct {
	arrArr    arrArrFn[OutT, Arg0T, Arg1T]
	arrScalar arrScalarFn[OutT, Arg0T, Arg1T]
	scalarArr scalarArrFn[OutT, Arg0T, Arg1T]
}

type binaryBoolOps struct {
	arrArr    func(ctx *exec.KernelCtx, lhs, rhs, out bitutil.Bitmap) error
	arrScalar func(ctx *exec.KernelCtx, lhs bitutil.Bitmap, rhs bool, out bitutil.Bitmap) error
	scalarArr func(ctx *exec.KernelCtx, lhs bool, rhs, out bitutil.Bitmap) error
}

// ScalarBinary returns a kernel that dispatches on the shapes of its two
// inputs (array/array, array/scalar, scalar/array) and invokes the
// corresponding function from the provided binaryOps.
func ScalarBinary[OutT, Arg0T, Arg1T arrow.FixedWidthType](ops binaryOps[OutT, Arg0T, Arg1T]) exec.ArrayKernelExec {
	arrayArray := func(ctx *exec.KernelCtx, left, right *exec.ArraySpan, out *exec.ExecResult) error {
		var (
			leftData  = exec.GetSpanValues[Arg0T](left, 1)
			rightData = exec.GetSpanValues[Arg1T](right, 1)
			outData   = exec.GetSpanValues[OutT](out, 1)
		)
		return ops.arrArr(ctx, leftData, rightData, outData)
	}

	arrayScalar := func(ctx *exec.KernelCtx, left *exec.ArraySpan, right scalar.Scalar, out *exec.ExecResult) error {
		var (
			leftData  = exec.GetSpanValues[Arg0T](left, 1)
			rightData = UnboxScalar[Arg1T](right.(scalar.PrimitiveScalar))
			outData   = exec.GetSpanValues[OutT](out, 1)
		)
		return ops.arrScalar(ctx, leftData, rightData, outData)
	}

	scalarArray := func(ctx *exec.KernelCtx, left scalar.Scalar, right *exec.ArraySpan, out *exec.ExecResult) error {
		var (
			leftData  = UnboxScalar[Arg0T](left.(scalar.PrimitiveScalar))
			rightData = exec.GetSpanValues[Arg1T](right, 1)
			outData   = exec.GetSpanValues[OutT](out, 1)
		)
		return ops.scalarArr(ctx, leftData, rightData, outData)
	}

	return func(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
		if batch.Values[0].IsArray() {
			if batch.Values[1].IsArray() {
				return arrayArray(ctx, &batch.Values[0].Array, &batch.Values[1].Array, out)
			}
			return arrayScalar(ctx, &batch.Values[0].Array, batch.Values[1].Scalar, out)
		}

		if batch.Values[1].IsArray() {
			return scalarArray(ctx, batch.Values[0].Scalar, &batch.Values[1].Array, out)
		}

		debug.Assert(false, "should be unreachable")
		return fmt.Errorf("%w: scalar binary with two scalars?", arrow.ErrInvalid)
	}
}
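
// A minimal sketch (illustrative only): wiring an int64 addition kernel
// through ScalarBinary by supplying all three shape-specific callbacks.
func exampleAddInt64Kernel() exec.ArrayKernelExec {
	return ScalarBinary(binaryOps[int64, int64, int64]{
		arrArr: func(_ *exec.KernelCtx, left, right, out []int64) error {
			for i := range out {
				out[i] = left[i] + right[i]
			}
			return nil
		},
		arrScalar: func(_ *exec.KernelCtx, left []int64, right int64, out []int64) error {
			for i := range out {
				out[i] = left[i] + right
			}
			return nil
		},
		scalarArr: func(_ *exec.KernelCtx, left int64, right, out []int64) error {
			for i := range out {
				out[i] = left + right[i]
			}
			return nil
		},
	})
}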

// ScalarBinaryBools returns a kernel for binary boolean operations,
// dispatching on the shapes of its two inputs and passing bitmaps (or a
// scalar bool) to the corresponding function in ops.
func ScalarBinaryBools(ops *binaryBoolOps) exec.ArrayKernelExec {
	arrayArray := func(ctx *exec.KernelCtx, left, right *exec.ArraySpan, out *exec.ExecResult) error {
		var (
			leftBm  = bitutil.Bitmap{Data: left.Buffers[1].Buf, Offset: left.Offset, Len: left.Len}
			rightBm = bitutil.Bitmap{Data: right.Buffers[1].Buf, Offset: right.Offset, Len: right.Len}
			outBm   = bitutil.Bitmap{Data: out.Buffers[1].Buf, Offset: out.Offset, Len: out.Len}
		)

		return ops.arrArr(ctx, leftBm, rightBm, outBm)
	}

	arrayScalar := func(ctx *exec.KernelCtx, left *exec.ArraySpan, right scalar.Scalar, out *exec.ExecResult) error {
		var (
			leftBm   = bitutil.Bitmap{Data: left.Buffers[1].Buf, Offset: left.Offset, Len: left.Len}
			rightVal = right.(*scalar.Boolean).Value
			outBm    = bitutil.Bitmap{Data: out.Buffers[1].Buf, Offset: out.Offset, Len: out.Len}
		)
		return ops.arrScalar(ctx, leftBm, rightVal, outBm)
	}

	scalarArray := func(ctx *exec.KernelCtx, left scalar.Scalar, right *exec.ArraySpan, out *exec.ExecResult) error {
		var (
			leftVal = left.(*scalar.Boolean).Value
			rightBm = bitutil.Bitmap{Data: right.Buffers[1].Buf, Offset: right.Offset, Len: right.Len}
			outBm   = bitutil.Bitmap{Data: out.Buffers[1].Buf, Offset: out.Offset, Len: out.Len}
		)
		return ops.scalarArr(ctx, leftVal, rightBm, outBm)
	}

	return func(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
		if batch.Values[0].IsArray() {
			if batch.Values[1].IsArray() {
				return arrayArray(ctx, &batch.Values[0].Array, &batch.Values[1].Array, out)
			}
			return arrayScalar(ctx, &batch.Values[0].Array, batch.Values[1].Scalar, out)
		}

		if batch.Values[1].IsArray() {
			return scalarArray(ctx, batch.Values[0].Scalar, &batch.Values[1].Array, out)
		}

		debug.Assert(false, "should be unreachable")
		return fmt.Errorf("%w: scalar binary with two scalars?", arrow.ErrInvalid)
	}
}

// ScalarBinaryNotNull returns a kernel that calls op only where both inputs
// are valid, writing the zero value of OutT for any null slot.
func ScalarBinaryNotNull[OutT, Arg0T, Arg1T arrow.FixedWidthType](op func(*exec.KernelCtx, Arg0T, Arg1T, *error) OutT) exec.ArrayKernelExec {
	arrayArray := func(ctx *exec.KernelCtx, left, right *exec.ArraySpan, out *exec.ExecResult) (err error) {
		// fast path if one side is entirely null
		if left.UpdateNullCount() == left.Len || right.UpdateNullCount() == right.Len {
			return nil
		}

		var (
			leftData  = exec.GetSpanValues[Arg0T](left, 1)
			rightData = exec.GetSpanValues[Arg1T](right, 1)
			outData   = exec.GetSpanValues[OutT](out, 1)
			outPos    int64
			def       OutT
		)
		bitutils.VisitTwoBitBlocks(left.Buffers[0].Buf, right.Buffers[0].Buf, left.Offset, right.Offset, left.Len,
			func(pos int64) {
				outData[outPos] = op(ctx, leftData[pos], rightData[pos], &err)
				outPos++
			}, func() {
				outData[outPos] = def
				outPos++
			})
		return
	}

	arrayScalar := func(ctx *exec.KernelCtx, left *exec.ArraySpan, right scalar.Scalar, out *exec.ExecResult) (err error) {
		// fast path if one side is entirely null
		if left.UpdateNullCount() == left.Len || !right.IsValid() {
			return nil
		}

		var (
			leftData = exec.GetSpanValues[Arg0T](left, 1)
			outData  = exec.GetSpanValues[OutT](out, 1)
			outPos   int64
			def      OutT
		)

		rightVal := UnboxScalar[Arg1T](right.(scalar.PrimitiveScalar))
		bitutils.VisitBitBlocks(left.Buffers[0].Buf, left.Offset, left.Len,
			func(pos int64) {
				outData[outPos] = op(ctx, leftData[pos], rightVal, &err)
				outPos++
			}, func() {
				outData[outPos] = def
				outPos++
			})
		return
	}

	scalarArray := func(ctx *exec.KernelCtx, left scalar.Scalar, right *exec.ArraySpan, out *exec.ExecResult) (err error) {
		// fast path if one side is entirely null
		if right.UpdateNullCount() == right.Len || !left.IsValid() {
			return nil
		}

		var (
			rightData = exec.GetSpanValues[Arg1T](right, 1)
			outData   = exec.GetSpanValues[OutT](out, 1)
			outPos    int64
			def       OutT
		)

		leftVal := UnboxScalar[Arg0T](left.(scalar.PrimitiveScalar))
		bitutils.VisitBitBlocks(right.Buffers[0].Buf, right.Offset, right.Len,
			func(pos int64) {
				outData[outPos] = op(ctx, leftVal, rightData[pos], &err)
				outPos++
			}, func() {
				outData[outPos] = def
				outPos++
			})
		return
	}

	return func(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
		if batch.Values[0].IsArray() {
			if batch.Values[1].IsArray() {
				return arrayArray(ctx, &batch.Values[0].Array, &batch.Values[1].Array, out)
			}
			return arrayScalar(ctx, &batch.Values[0].Array, batch.Values[1].Scalar, out)
		}

		if batch.Values[1].IsArray() {
			return scalarArray(ctx, batch.Values[0].Scalar, &batch.Values[1].Array, out)
		}

		debug.Assert(false, "should be unreachable")
		return fmt.Errorf("%w: scalar binary with two scalars?", arrow.ErrInvalid)
	}
}

type binaryBinOp[T arrow.FixedWidthType | bool] func(ctx *exec.KernelCtx, arg0, arg1 []byte) T

// ScalarBinaryBinaryArgsBoolOut returns a kernel for binary-typed inputs that
// produces a boolean output bitmap. itrFn constructs an iterator over the byte
// slices of an ArraySpan so the same kernel works for Binary/String as well as
// LargeBinary/LargeString inputs.
func ScalarBinaryBinaryArgsBoolOut(itrFn func(*exec.ArraySpan) exec.ArrayIter[[]byte], op binaryBinOp[bool]) exec.ArrayKernelExec {
	arrayArray := func(ctx *exec.KernelCtx, left, right *exec.ArraySpan, out *exec.ExecResult) error {
		var (
			leftItr  = itrFn(left)
			rightItr = itrFn(right)
		)

		bitutils.GenerateBitsUnrolled(out.Buffers[1].Buf, out.Offset, out.Len, func() bool {
			return op(ctx, leftItr.Next(), rightItr.Next())
		})
		return nil
	}

	arrayScalar := func(ctx *exec.KernelCtx, left *exec.ArraySpan, right scalar.Scalar, out *exec.ExecResult) error {
		var (
			leftItr  = itrFn(left)
			rightVal = UnboxBinaryScalar(right.(scalar.BinaryScalar))
		)

		bitutils.GenerateBitsUnrolled(out.Buffers[1].Buf, out.Offset, out.Len, func() bool {
			return op(ctx, leftItr.Next(), rightVal)
		})
		return nil
	}

	scalarArray := func(ctx *exec.KernelCtx, left scalar.Scalar, right *exec.ArraySpan, out *exec.ExecResult) error {
		var (
			rightItr = itrFn(right)
			leftVal  = UnboxBinaryScalar(left.(scalar.BinaryScalar))
		)

		bitutils.GenerateBitsUnrolled(out.Buffers[1].Buf, out.Offset, out.Len, func() bool {
			return op(ctx, leftVal, rightItr.Next())
		})
		return nil
	}

	return func(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
		if batch.Values[0].IsArray() {
			if batch.Values[1].IsArray() {
				return arrayArray(ctx, &batch.Values[0].Array, &batch.Values[1].Array, out)
			}
			return arrayScalar(ctx, &batch.Values[0].Array, batch.Values[1].Scalar, out)
		}

		if batch.Values[1].IsArray() {
			return scalarArray(ctx, batch.Values[0].Scalar, &batch.Values[1].Array, out)
		}

		debug.Assert(false, "should be unreachable")
		return fmt.Errorf("%w: scalar binary with two scalars?", arrow.ErrInvalid)
	}
}

// SizeOf determines the size in number of bytes for an integer
// based on the generic value in a way that the compiler should
// be able to easily evaluate and create as a constant.
func SizeOf[T constraints.Integer]() uint {
	x := uint16(1 << 8)
	y := uint32(2 << 16)
	z := uint64(4 << 32)
	return 1 + uint(T(x))>>8 + uint(T(y))>>16 + uint(T(z))>>32
}

// MinOf returns the minimum value for a given type since there is not
// currently a generic way to do this with Go generics yet.
func MinOf[T constraints.Integer]() T {
	if ones := ^T(0); ones < 0 {
		return ones << (8*SizeOf[T]() - 1)
	}
	return 0
}

// MaxOf determines the max value for a given type since there is not
// currently a generic way to do this for Go generics yet as all of the
// math.Max/Min values are constants.
func MaxOf[T constraints.Integer]() T {
	ones := ^T(0)
	if ones < 0 {
		return ones ^ (ones << (8*SizeOf[T]() - 1))
	}
	return ones
}
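
// For illustration, the helpers above evaluate to the expected constants,
// e.g. SizeOf[int32]() == 4, MinOf[int16]() == -32768, MaxOf[uint8]() == 255.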

func getSafeMinSameSign[InT, TargetT constraints.Integer]() InT {
	if SizeOf[InT]() > SizeOf[TargetT]() {
		return InT(MinOf[TargetT]())
	}
	return MinOf[InT]()
}

func getSafeMaxSameSign[InT, TargetT constraints.Integer]() InT {
	if SizeOf[InT]() > SizeOf[TargetT]() {
		return InT(MaxOf[TargetT]())
	}
	return MaxOf[InT]()
}

func getSafeMaxSignedUnsigned[InT constraints.Signed, TargetT constraints.Unsigned]() InT {
	if SizeOf[InT]() <= SizeOf[TargetT]() {
		return MaxOf[InT]()
	}
	return InT(MaxOf[TargetT]())
}

func getSafeMaxUnsignedSigned[InT constraints.Unsigned, TargetT constraints.Signed]() InT {
	if SizeOf[InT]() < SizeOf[TargetT]() {
		return MaxOf[InT]()
	}
	return InT(MaxOf[TargetT]())
}

func getSafeMinMaxSigned[T constraints.Signed](target arrow.Type) (min, max T) {
	switch target {
	case arrow.UINT8:
		min, max = 0, getSafeMaxSignedUnsigned[T, uint8]()
	case arrow.UINT16:
		min, max = 0, getSafeMaxSignedUnsigned[T, uint16]()
	case arrow.UINT32:
		min, max = 0, getSafeMaxSignedUnsigned[T, uint32]()
	case arrow.UINT64:
		min, max = 0, getSafeMaxSignedUnsigned[T, uint64]()
	case arrow.INT8:
		min = getSafeMinSameSign[T, int8]()
		max = getSafeMaxSameSign[T, int8]()
	case arrow.INT16:
		min = getSafeMinSameSign[T, int16]()
		max = getSafeMaxSameSign[T, int16]()
	case arrow.INT32:
		min = getSafeMinSameSign[T, int32]()
		max = getSafeMaxSameSign[T, int32]()
	case arrow.INT64:
		min = getSafeMinSameSign[T, int64]()
		max = getSafeMaxSameSign[T, int64]()
	}
	return
}

func getSafeMinMaxUnsigned[T constraints.Unsigned](target arrow.Type) (min, max T) {
	min = 0
	switch target {
	case arrow.UINT8:
		max = getSafeMaxSameSign[T, uint8]()
	case arrow.UINT16:
		max = getSafeMaxSameSign[T, uint16]()
	case arrow.UINT32:
		max = getSafeMaxSameSign[T, uint32]()
	case arrow.UINT64:
		max = getSafeMaxSameSign[T, uint64]()
	case arrow.INT8:
		max = getSafeMaxUnsignedSigned[T, int8]()
	case arrow.INT16:
		max = getSafeMaxUnsignedSigned[T, int16]()
	case arrow.INT32:
		max = getSafeMaxUnsignedSigned[T, int32]()
	case arrow.INT64:
		max = getSafeMaxUnsignedSigned[T, int64]()
	}
	return
}

func intsCanFit(data *exec.ArraySpan, target arrow.Type) error {
	if !arrow.IsInteger(target) {
		return fmt.Errorf("%w: target type is not an integer type %s", arrow.ErrInvalid, target)
	}

	switch data.Type.ID() {
	case arrow.INT8:
		min, max := getSafeMinMaxSigned[int8](target)
		return intsInRange(data, min, max)
	case arrow.UINT8:
		min, max := getSafeMinMaxUnsigned[uint8](target)
		return intsInRange(data, min, max)
	case arrow.INT16:
		min, max := getSafeMinMaxSigned[int16](target)
		return intsInRange(data, min, max)
	case arrow.UINT16:
		min, max := getSafeMinMaxUnsigned[uint16](target)
		return intsInRange(data, min, max)
	case arrow.INT32:
		min, max := getSafeMinMaxSigned[int32](target)
		return intsInRange(data, min, max)
	case arrow.UINT32:
		min, max := getSafeMinMaxUnsigned[uint32](target)
		return intsInRange(data, min, max)
	case arrow.INT64:
		min, max := getSafeMinMaxSigned[int64](target)
		return intsInRange(data, min, max)
	case arrow.UINT64:
		min, max := getSafeMinMaxUnsigned[uint64](target)
		return intsInRange(data, min, max)
	default:
		return fmt.Errorf("%w: invalid type for int bounds checking", arrow.ErrInvalid)
	}
}

func intsInRange[T arrow.IntType | arrow.UintType](data *exec.ArraySpan, lowerBound, upperBound T) error {
	if MinOf[T]() >= lowerBound && MaxOf[T]() <= upperBound {
		return nil
	}

	isOutOfBounds := func(val T) bool {
		return val < lowerBound || val > upperBound
	}
	isOutOfBoundsMaybeNull := func(val T, isValid bool) bool {
		return isValid && (val < lowerBound || val > upperBound)
	}
	getError := func(val T) error {
		return fmt.Errorf("%w: integer value %d not in range: %d to %d",
			arrow.ErrInvalid, val, lowerBound, upperBound)
	}

	values := exec.GetSpanValues[T](data, 1)
	bitmap := data.Buffers[0].Buf

	bitCounter := bitutils.NewOptionalBitBlockCounter(bitmap, data.Offset, data.Len)
	pos, offsetPos := 0, data.Offset
	for pos < int(data.Len) {
		block := bitCounter.NextBlock()
		outOfBounds := false

		if block.Popcnt == block.Len {
			// fast path: branchless
			i := 0
			for chunk := 0; chunk < int(block.Len)/8; chunk++ {
				for j := 0; j < 8; j++ {
					outOfBounds = outOfBounds || isOutOfBounds(values[i])
					i++
				}
			}
			for ; i < int(block.Len); i++ {
				outOfBounds = outOfBounds || isOutOfBounds(values[i])
			}
		} else if block.Popcnt > 0 {
			// values may be null, only bounds check non-null vals
			i := 0
			for chunk := 0; chunk < int(block.Len)/8; chunk++ {
				for j := 0; j < 8; j++ {
					outOfBounds = outOfBounds || isOutOfBoundsMaybeNull(
						values[i], bitutil.BitIsSet(bitmap, int(offsetPos)+i))
					i++
				}
			}
			for ; i < int(block.Len); i++ {
				outOfBounds = outOfBounds || isOutOfBoundsMaybeNull(
					values[i], bitutil.BitIsSet(bitmap, int(offsetPos)+i))
			}
		}
		if outOfBounds {
			if block.Nulls > 0 {
				for i := 0; i < int(block.Len); i++ {
					if isOutOfBoundsMaybeNull(values[i], bitutil.BitIsSet(bitmap, int(offsetPos)+i)) {
						return getError(values[i])
					}
				}
			} else {
				for i := 0; i < int(block.Len); i++ {
					if isOutOfBounds(values[i]) {
						return getError(values[i])
					}
				}
			}
		}

		values = values[block.Len:]
		pos += int(block.Len)
		offsetPos += int64(block.Len)
	}
	return nil
}

type numeric interface {
	arrow.IntType | arrow.UintType | constraints.Float
}

func memCpySpan[T numeric](in, out *exec.ArraySpan) {
	inData := exec.GetSpanValues[T](in, 1)
	outData := exec.GetSpanValues[T](out, 1)
	copy(outData, inData)
}

func castNumberMemCpy(in, out *exec.ArraySpan) {
	switch in.Type.ID() {
	case arrow.INT8:
		memCpySpan[int8](in, out)
	case arrow.UINT8:
		memCpySpan[uint8](in, out)
	case arrow.INT16:
		memCpySpan[int16](in, out)
	case arrow.UINT16:
		memCpySpan[uint16](in, out)
	case arrow.INT32:
		memCpySpan[int32](in, out)
	case arrow.UINT32:
		memCpySpan[uint32](in, out)
	case arrow.INT64:
		memCpySpan[int64](in, out)
	case arrow.UINT64:
		memCpySpan[uint64](in, out)
	case arrow.FLOAT32:
		memCpySpan[float32](in, out)
	case arrow.FLOAT64:
		memCpySpan[float64](in, out)
	}
}

func castNumberToNumberUnsafe(in, out *exec.ArraySpan) {
	if in.Type.ID() == out.Type.ID() {
		castNumberMemCpy(in, out)
		return
	}

	inOffset := in.Type.(arrow.FixedWidthDataType).Bytes() * int(in.Offset)
	outOffset := out.Type.(arrow.FixedWidthDataType).Bytes() * int(out.Offset)
	if in.Type.ID() == arrow.FLOAT16 || out.Type.ID() == arrow.FLOAT16 {
		castNumericGo(in.Type.ID(), out.Type.ID(), in.Buffers[1].Buf[inOffset:], out.Buffers[1].Buf[outOffset:], int(in.Len))
	} else {
		castNumericUnsafe(in.Type.ID(), out.Type.ID(), in.Buffers[1].Buf[inOffset:], out.Buffers[1].Buf[outOffset:], int(in.Len))
	}
}

// maxDecimalDigitsForInt returns the maximum number of decimal digits
// needed to represent a value of the given integer type.
func maxDecimalDigitsForInt(id arrow.Type) (int32, error) {
	switch id {
	case arrow.INT8, arrow.UINT8:
		return 3, nil
	case arrow.INT16, arrow.UINT16:
		return 5, nil
	case arrow.INT32, arrow.UINT32:
		return 10, nil
	case arrow.INT64:
		return 19, nil
	case arrow.UINT64:
		return 20, nil
	}
	return -1, fmt.Errorf("%w: not an integer type: %s", arrow.ErrInvalid, id)
}

// ResolveOutputFromOptions returns the target type stored in the cast
// options carried in the kernel state.
func ResolveOutputFromOptions(ctx *exec.KernelCtx, _ []arrow.DataType) (arrow.DataType, error) {
	castOpts := ctx.State.(CastState)
	return castOpts.ToType, nil
}

var OutputTargetType = exec.NewComputedOutputType(ResolveOutputFromOptions)

var OutputFirstType = exec.NewComputedOutputType(func(_ *exec.KernelCtx, args []arrow.DataType) (arrow.DataType, error) {
	return args[0], nil
})

var OutputLastType = exec.NewComputedOutputType(func(_ *exec.KernelCtx, args []arrow.DataType) (arrow.DataType, error) {
	return args[len(args)-1], nil
})

func resolveDecimalBinaryOpOutput(args []arrow.DataType, resolver func(p1, s1, p2, s2 int32) (prec, scale int32)) (arrow.DataType, error) {
	left, right := args[0].(arrow.DecimalType), args[1].(arrow.DecimalType)
	debug.Assert(left.ID() == right.ID(), "decimal binary ops should have casted to the same type")

	prec, scale := resolver(left.GetPrecision(), left.GetScale(),
		right.GetPrecision(), right.GetScale())

	return arrow.NewDecimalType(left.ID(), prec, scale)
}

func resolveDecimalAddOrSubtractType(_ *exec.KernelCtx, args []arrow.DataType) (arrow.DataType, error) {
	return resolveDecimalBinaryOpOutput(args,
		func(p1, s1, p2, s2 int32) (prec int32, scale int32) {
			debug.Assert(s1 == s2, "decimal operations should use the same scale")
			scale = s1
			prec = exec.Max(p1-s1, p2-s2) + scale + 1
			return
		})
}
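
// Worked example (illustrative only): adding decimal(5, 2) and decimal(7, 2)
// keeps scale 2 and needs precision max(5-2, 7-2) + 2 + 1 = 8, so the result
// type is decimal(8, 2).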

func resolveDecimalMultiplyOutput(_ *exec.KernelCtx, args []arrow.DataType) (arrow.DataType, error) {
	return resolveDecimalBinaryOpOutput(args,
		func(p1, s1, p2, s2 int32) (prec int32, scale int32) {
			scale = s1 + s2
			prec = p1 + p2 + 1
			return
		})
}

func resolveDecimalDivideOutput(_ *exec.KernelCtx, args []arrow.DataType) (arrow.DataType, error) {
	return resolveDecimalBinaryOpOutput(args,
		func(p1, s1, p2, s2 int32) (prec int32, scale int32) {
			debug.Assert(s1 >= s2, "when dividing decimal values numerator scale should be greater/equal to denom scale")
			scale = s1 - s2
			prec = p1
			return
		})
}

func resolveTemporalOutput(_ *exec.KernelCtx, args []arrow.DataType) (arrow.DataType, error) {
	debug.Assert(args[0].ID() == args[1].ID(), "should only be used on the same types")
	left, right := args[0].(*arrow.TimestampType), args[1].(*arrow.TimestampType)
	debug.Assert(left.Unit == right.Unit, "should match units")

	if (left.TimeZone == "" || right.TimeZone == "") && (left.TimeZone != right.TimeZone) {
		return nil, fmt.Errorf("%w: subtraction of zoned and non-zoned times is ambiguous (%s, %s)",
			arrow.ErrInvalid, left.TimeZone, right.TimeZone)
	}

	return &arrow.DurationType{Unit: left.Unit}, nil
}

var OutputResolveTemporal = exec.NewComputedOutputType(resolveTemporalOutput)

type validityBuilder struct {
	mem    memory.Allocator
	buffer *memory.Buffer

	data       []byte
	bitLength  int
	falseCount int
}

func (v *validityBuilder) Resize(n int64) {
	if v.buffer == nil {
		v.buffer = memory.NewResizableBuffer(v.mem)
	}

	v.buffer.ResizeNoShrink(int(bitutil.BytesForBits(n)))
	v.data = v.buffer.Bytes()
}

func (v *validityBuilder) Reserve(n int64) {
	if v.buffer == nil {
		v.buffer = memory.NewResizableBuffer(v.mem)
	}

	v.buffer.Reserve(v.buffer.Cap() + int(bitutil.BytesForBits(n)))
	v.data = v.buffer.Buf()
}

func (v *validityBuilder) UnsafeAppend(val bool) {
	bitutil.SetBitTo(v.data, v.bitLength, val)
	if !val {
		v.falseCount++
	}
	v.bitLength++
}

func (v *validityBuilder) UnsafeAppendN(n int64, val bool) {
	bitutil.SetBitsTo(v.data, int64(v.bitLength), n, val)
	if !val {
		v.falseCount += int(n)
	}
	v.bitLength += int(n)
}

func (v *validityBuilder) Append(val bool) {
	v.Reserve(1)
	v.UnsafeAppend(val)
}

func (v *validityBuilder) AppendN(n int64, val bool) {
	v.Reserve(n)
	v.UnsafeAppendN(n, val)
}

func (v *validityBuilder) Finish() (buf *memory.Buffer) {
	if v.bitLength > 0 {
		v.buffer.Resize(int(bitutil.BytesForBits(int64(v.bitLength))))
	}

	v.bitLength, v.falseCount = 0, 0
	buf = v.buffer
	v.buffer = nil
	return
}
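
// A minimal sketch (illustrative only) of how the builder above is used:
// append three valid bits and two nulls, then take ownership of the buffer.
func exampleValidityBitmap(mem memory.Allocator) *memory.Buffer {
	bldr := validityBuilder{mem: mem}
	bldr.AppendN(3, true)
	bldr.AppendN(2, false)
	return bldr.Finish() // bitmap holding the 5 bits 1 1 1 0 0
}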

type execBufBuilder struct {
	mem    memory.Allocator
	buffer *memory.Buffer
	data   []byte
	sz     int
}

func (b *execBufBuilder) reserve(n int) {
	if b.buffer == nil {
		b.buffer = memory.NewResizableBuffer(b.mem)
	}

	newCap := b.sz + n
	if newCap <= cap(b.data) {
		return
	}
	b.buffer.ResizeNoShrink(newCap)
	b.data = b.buffer.Buf()
}

func (b *execBufBuilder) unsafeAppend(data []byte) {
	copy(b.data[b.sz:], data)
	b.sz += len(data)
}

func (b *execBufBuilder) finish() (buf *memory.Buffer) {
	if b.buffer == nil {
		buf = memory.NewBufferBytes(nil)
		return
	}
	b.buffer.Resize(b.sz)
	buf = b.buffer
	b.buffer, b.sz = nil, 0
	return
}

type bufferBuilder[ arrow.FixedWidthType] struct {
	execBufBuilder
	zero 
}

func newBufferBuilder[ arrow.FixedWidthType]( memory.Allocator) *bufferBuilder[] {
	return &bufferBuilder[]{
		execBufBuilder: execBufBuilder{
			mem: ,
		},
	}
}

func (b *bufferBuilder[T]) reserve(n int) {
	b.execBufBuilder.reserve(n * int(unsafe.Sizeof(b.zero)))
}

func (b *bufferBuilder[T]) unsafeAppend(v T) {
	b.execBufBuilder.unsafeAppend(arrow.GetBytes([]T{v}))
}

func (b *bufferBuilder[T]) unsafeAppendSlice(values []T) {
	b.execBufBuilder.unsafeAppend(arrow.GetBytes(values))
}

func (b *bufferBuilder[T]) len() int { return b.sz / int(unsafe.Sizeof(b.zero)) }

func (b *bufferBuilder[T]) cap() int {
	return cap(b.data) / int(unsafe.Sizeof(b.zero))
}
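
// A minimal sketch (illustrative only): accumulating int32 values with the
// typed builder above and extracting the resulting buffer.
func exampleInt32Buffer(mem memory.Allocator) *memory.Buffer {
	bldr := newBufferBuilder[int32](mem)
	bldr.reserve(3)
	bldr.unsafeAppendSlice([]int32{1, 2, 3})
	return bldr.finish() // 12 bytes: the raw representation of 1, 2, 3
}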

func checkIndexBoundsImpl[T arrow.IntType | arrow.UintType](values *exec.ArraySpan, upperLimit uint64) error {
	// for unsigned integers, if the upper limit exceeds the maximum value of
	// the index type, then there's no need to bounds check
	isSigned := !arrow.IsUnsignedInteger(values.Type.ID())
	if !isSigned && upperLimit > uint64(MaxOf[T]()) {
		return nil
	}

	data := exec.GetSpanValues[T](values, 1)
	bitmap := values.Buffers[0].Buf
	isOutOfBounds := func(val T) bool {
		return ((isSigned && val < 0) || val >= 0 && uint64(val) >= upperLimit)
	}
	return bitutils.VisitSetBitRuns(bitmap, values.Offset, values.Len,
		func(pos, length int64) error {
			outOfBounds := false
			for i := int64(0); i < length; i++ {
				outOfBounds = outOfBounds || isOutOfBounds(data[pos+i])
			}
			if outOfBounds {
				for i := int64(0); i < length; i++ {
					if isOutOfBounds(data[pos+i]) {
						return fmt.Errorf("%w: %d out of bounds",
							arrow.ErrIndex, data[pos+i])
					}
				}
			}
			return nil
		})
}

func checkIndexBounds(values *exec.ArraySpan, upperLimit uint64) error {
	switch values.Type.ID() {
	case arrow.INT8:
		return checkIndexBoundsImpl[int8](values, upperLimit)
	case arrow.UINT8:
		return checkIndexBoundsImpl[uint8](values, upperLimit)
	case arrow.INT16:
		return checkIndexBoundsImpl[int16](values, upperLimit)
	case arrow.UINT16:
		return checkIndexBoundsImpl[uint16](values, upperLimit)
	case arrow.INT32:
		return checkIndexBoundsImpl[int32](values, upperLimit)
	case arrow.UINT32:
		return checkIndexBoundsImpl[uint32](values, upperLimit)
	case arrow.INT64:
		return checkIndexBoundsImpl[int64](values, upperLimit)
	case arrow.UINT64:
		return checkIndexBoundsImpl[uint64](values, upperLimit)
	default:
		return fmt.Errorf("%w: invalid index type for bounds checking", arrow.ErrInvalid)
	}
}

func checkIndexBoundsChunked(values *arrow.Chunked, upperLimit uint64) error {
	var span exec.ArraySpan
	for _, c := range values.Chunks() {
		span.SetMembers(c.Data())
		if err := checkIndexBounds(&span, upperLimit); err != nil {
			return err
		}
	}
	return nil
}

// packBits packs 32 values (expected to be 0 or 1, one per uint32) into the
// first 4 bytes of out, least-significant bit first.
func packBits(bitmap [32]uint32, out []byte) {
	const batchSize = 32
	for i := 0; i < batchSize; i += 8 {
		out[0] = byte(bitmap[i] | bitmap[i+1]<<1 | bitmap[i+2]<<2 | bitmap[i+3]<<3 |
			bitmap[i+4]<<4 | bitmap[i+5]<<5 | bitmap[i+6]<<6 | bitmap[i+7]<<7)
		out = out[1:]
	}
}