// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18

package kernels

import (
	
	
	

	
	
	
	
	
	
	
	
	
)

// Integer-to-integer cast kernel.
//
// NOTE(review): the function, parameter and local names are blank in this
// view; this is presumably the CastIntToInt kernel referenced by the integer
// kernel registrations — confirm.
//
// Unless the AllowIntOverflow option is set, intsCanFit is asked to validate
// the first input array against a type ID (presumably the output's) and any
// error it reports is returned before converting. The conversion itself is
// delegated to castNumberToNumberUnsafe. The options come from an unchecked
// type assertion to CastOptions, which panics if State holds another type.
func ( *exec.KernelCtx,  *exec.ExecSpan,  *exec.ExecResult) error {
	 := .State.(CastOptions)
	if !.AllowIntOverflow {
		// validate before converting so no out-of-range value is written
		if  := intsCanFit(&.Values[0].Array, .Type.ID());  != nil {
			return 
		}
	}
	castNumberToNumberUnsafe(&.Values[0].Array, )
	return nil
}

// Cast kernel that converts the first input array with
// castNumberToNumberUnsafe and performs no validation; it always returns nil.
//
// NOTE(review): names are blank in this view; judging by the body this is
// presumably the float-to-float kernel (CastFloatingToFloating) that the
// floating-point kernel registrations reference — confirm.
func ( *exec.KernelCtx,  *exec.ExecSpan,  *exec.ExecResult) error {
	castNumberToNumberUnsafe(&.Values[0].Array, )
	return nil
}

// Float-to-integer cast kernel.
//
// NOTE(review): names are blank in this view; presumably the
// CastFloatingToInteger kernel referenced by the integer kernel
// registrations — confirm.
//
// The conversion is performed first with castNumberToNumberUnsafe. Unless the
// AllowFloatTruncate option is set, the result of checkFloatToIntTrunc is
// then returned, so a truncation error is reported after the output has
// already been written. The options come from an unchecked type assertion to
// CastOptions.
func ( *exec.KernelCtx,  *exec.ExecSpan,  *exec.ExecResult) error {
	 := .State.(CastOptions)
	castNumberToNumberUnsafe(&.Values[0].Array, )
	if !.AllowFloatTruncate {
		return checkFloatToIntTrunc(&.Values[0].Array, )
	}
	return nil
}

// Integer-to-float cast kernel.
//
// NOTE(review): names are blank in this view; presumably the
// CastIntegerToFloating kernel referenced by the floating-point kernel
// registrations — confirm.
//
// Unless the AllowFloatTruncate option is set, checkIntToFloatTrunc validates
// the first input array against a type ID (presumably the output's) and its
// error is returned before any conversion. Otherwise, or on success, the
// values are converted with castNumberToNumberUnsafe.
func ( *exec.KernelCtx,  *exec.ExecSpan,  *exec.ExecResult) error {
	 := .State.(CastOptions)
	if !.AllowFloatTruncate {
		if  := checkIntToFloatTrunc(&.Values[0].Array, .Type.ID());  != nil {
			return 
		}
	}
	castNumberToNumberUnsafe(&.Values[0].Array, )
	return nil
}

// decimal is the method set that decimalToIntImpl requires from a decimal
// value, parameterized over decimal128.Num or decimal256.Num: two ordering
// comparisons (Less, GreaterEqual) and LowBits, which yields a uint64.
//
// NOTE(review): the type-parameter name and the method parameter lists are
// blank in this view; presumably the comparisons take a value of the same
// decimal type — confirm.
type decimal[ decimal128.Num | decimal256.Num] interface {
	Less() bool
	GreaterEqual() bool
	LowBits() uint64
}

// decimalToIntImpl converts a decimal value to an integer type by taking its
// LowBits, optionally bounds-checking it first.
//
// When the leading bool argument is false and the value compares Less or
// GreaterEqual against the supplied bounds, a debug message is logged, an
// arrow.ErrInvalid-wrapped error is stored through the *error argument and
// zero is returned. In every other case the LowBits of the value are
// converted to the integer type and returned without further checks.
//
// NOTE(review): parameter names are blank in this view. The upper-bound test
// uses GreaterEqual; if callers pass the largest representable integer as
// that bound (the MaxOf-derived values in the decimal-to-integer kernels
// suggest so), a value exactly equal to the maximum is rejected — confirm
// whether that is intended.
func decimalToIntImpl[ decimal128.Num | decimal256.Num,  arrow.IntType | arrow.UintType]( bool, ,  ,  decimal[],  *error)  {
	if ! && (.Less() || .GreaterEqual()) {
		debug.Log("integer value out of bounds from decimal")
		* = fmt.Errorf("%w: integer value out of bounds", arrow.ErrInvalid)
		return (0)
	}
	return (.LowBits())
}

// Decimal256-to-integer cast kernel, generic over the integer output type.
//
// NOTE(review): the function, type-parameter, parameter and local names are
// blank in this view; presumably the CastDecimal256ToInteger kernel
// referenced by the integer kernel registrations — confirm.
//
// The first input's type is asserted to be *arrow.Decimal256Type (unchecked,
// so any other type panics) and its Scale is read. Bounds for the overflow
// check are derived from MinOf/MaxOf of the integer type. One of three
// per-value executors is then built with ScalarUnaryNotNull:
//   - AllowDecimalTruncate with a negative value (presumably the scale):
//     IncreaseScaleBy with the negated value;
//   - AllowDecimalTruncate otherwise: ReduceScaleBy(..., true);
//   - no truncation allowed: Rescale(..., 0), whose error is wrapped in
//     arrow.ErrInvalid and reported with a zero result.
// Each executor finishes through decimalToIntImpl, passing AllowIntOverflow.
func [ arrow.IntType | arrow.UintType]( *exec.KernelCtx,  *exec.ExecSpan,  *exec.ExecResult) error {
	var (
		       = .State.(CastState)
		  = .Values[0].Type().(*arrow.Decimal256Type)
		    = .Scale
		         exec.ArrayKernelExec
		 = uint64(MinOf[]())
		  int64
		        = decimal256.FromU64(uint64(MaxOf[]()))
	)

	// a negative minimum means a signed output type; presumably the -1 fills
	// the upper words of the 256-bit minimum below (sign extension) — confirm
	if MinOf[]() < 0 {
		 = -1
	}
	 := decimal256.New(uint64(), uint64(), uint64(), )
	if .AllowDecimalTruncate {
		if  < 0 {
			 = ScalarUnaryNotNull(func( *exec.KernelCtx,  decimal256.Num,  *error)  {
				 := .IncreaseScaleBy(-)
				return decimalToIntImpl[decimal256.Num, ](.AllowIntOverflow, , , , )
			})
		} else {
			 = ScalarUnaryNotNull(func( *exec.KernelCtx,  decimal256.Num,  *error)  {
				 := .ReduceScaleBy(, true)
				return decimalToIntImpl[decimal256.Num, ](.AllowIntOverflow, , , , )
			})
		}
	} else {
		 = ScalarUnaryNotNull(func( *exec.KernelCtx,  decimal256.Num,  *error)  {
			,  := .Rescale(, 0)
			if  != nil {
				* = fmt.Errorf("%w: %s", arrow.ErrInvalid, )
				return (0)
			}
			return decimalToIntImpl[decimal256.Num, ](.AllowIntOverflow, , , , )
		})
	}

	return (, , )
}

// Decimal128-to-integer cast kernel, generic over the integer output type.
//
// NOTE(review): the function, type-parameter, parameter and local names are
// blank in this view; presumably the CastDecimal128ToInteger kernel
// referenced by the integer kernel registrations — confirm.
//
// The first input's type is asserted to be *arrow.Decimal128Type (unchecked,
// so any other type panics) and its Scale is read. Bounds for the overflow
// check are derived from MinOf/MaxOf of the integer type. One of three
// per-value executors is then built with ScalarUnaryNotNull:
//   - AllowDecimalTruncate with a negative value (presumably the scale):
//     IncreaseScaleBy with the negated value;
//   - AllowDecimalTruncate otherwise: ReduceScaleBy(..., true);
//   - no truncation allowed: Rescale(..., 0), whose error is wrapped in
//     arrow.ErrInvalid and reported with a zero result.
// Each executor finishes through decimalToIntImpl, passing AllowIntOverflow.
func [ arrow.IntType | arrow.UintType]( *exec.KernelCtx,  *exec.ExecSpan,  *exec.ExecResult) error {
	var (
		       = .State.(CastState)
		  = .Values[0].Type().(*arrow.Decimal128Type)
		    = .Scale
		         exec.ArrayKernelExec
		 = uint64(MinOf[]())
		  int64
		        = decimal128.FromU64(uint64(MaxOf[]()))
	)

	// a negative minimum means a signed output type; presumably the -1 becomes
	// the high word of the 128-bit minimum below (sign extension) — confirm
	if MinOf[]() < 0 {
		 = -1
	}
	 := decimal128.New(, )
	if .AllowDecimalTruncate {
		if  < 0 {
			 = ScalarUnaryNotNull(func( *exec.KernelCtx,  decimal128.Num,  *error)  {
				 := .IncreaseScaleBy(-)
				return decimalToIntImpl[decimal128.Num, ](.AllowIntOverflow, , , , )
			})
		} else {
			 = ScalarUnaryNotNull(func( *exec.KernelCtx,  decimal128.Num,  *error)  {
				 := .ReduceScaleBy(, true)
				return decimalToIntImpl[decimal128.Num, ](.AllowIntOverflow, , , , )
			})
		}
	} else {
		 = ScalarUnaryNotNull(func( *exec.KernelCtx,  decimal128.Num,  *error)  {
			,  := .Rescale(, 0)
			if  != nil {
				* = fmt.Errorf("%w: %s", arrow.ErrInvalid, )
				return (0)
			}
			return decimalToIntImpl[decimal128.Num, ](.AllowIntOverflow, , , , )
		})
	}

	return (, , )
}

// integerToDecimal128 builds a per-value executor that converts integers to
// decimal128.Num and rescales them from scale 0 to the requested scale.
//
// The arrow.Type argument selects the widening step: the unsigned IDs
// (UINT8..UINT64) go through decimal128.FromU64, every other ID through
// decimal128.FromI64. A Rescale error is stored through the executor's *error
// argument; the Rescale result is returned in either case.
//
// NOTE(review): parameter and local names are blank in this view; the int32
// argument is presumably the target scale — confirm.
func integerToDecimal128[ arrow.IntType | arrow.UintType]( arrow.Type,  int32) exec.ArrayKernelExec {
	var  func( ) decimal128.Num
	switch  {
	case arrow.UINT8, arrow.UINT16, arrow.UINT32, arrow.UINT64:
		 = func( ) decimal128.Num { return decimal128.FromU64(uint64()) }
	default:
		 = func( ) decimal128.Num { return decimal128.FromI64(int64()) }
	}
	return ScalarUnaryNotNull(func( *exec.KernelCtx,  ,  *error) decimal128.Num {
		,  := ().Rescale(0, )
		if  != nil {
			* = 
		}
		return 
	})
}

// integerToDecimal256 builds a per-value executor that converts integers to
// decimal256.Num and rescales them from scale 0 to the requested scale.
//
// The arrow.Type argument selects the widening step: the unsigned IDs
// (UINT8..UINT64) go through decimal256.FromU64, every other ID through
// decimal256.FromI64. A Rescale error is stored through the executor's *error
// argument; the Rescale result is returned in either case.
//
// NOTE(review): parameter and local names are blank in this view; the int32
// argument is presumably the target scale — confirm.
func integerToDecimal256[ arrow.IntType | arrow.UintType]( arrow.Type,  int32) exec.ArrayKernelExec {
	var  func( ) decimal256.Num
	switch  {
	case arrow.UINT8, arrow.UINT16, arrow.UINT32, arrow.UINT64:
		 = func( ) decimal256.Num { return decimal256.FromU64(uint64()) }
	default:
		 = func( ) decimal256.Num { return decimal256.FromI64(int64()) }
	}
	return ScalarUnaryNotNull(func( *exec.KernelCtx,  ,  *error) decimal256.Num {
		,  := ().Rescale(0, )
		if  != nil {
			* = 
		}
		return 
	})
}

// Integer-to-decimal cast kernel, generic over the decimal and integer types.
//
// NOTE(review): the function, type-parameter, parameter and local names are
// blank in this view; presumably the CastIntegerToDecimal kernel that
// getCastIntToDecimal instantiates — confirm.
//
// A type switch on a decimal type (presumably the output's) reads Precision
// and Scale and builds the executor with integerToDecimal128 or
// integerToDecimal256. The switch has no default case, so for any other type
// the two int32 values stay zero and the executor stays nil. Validation then
// happens before the final call:
//   - a negative value (presumably the scale) yields "scale must be
//     non-negative";
//   - MaxDecimalDigitsForInt of the input type ID, increased by another value
//     (presumably the scale), must not exceed the available precision.
// Both failures are reported as arrow.ErrInvalid-wrapped errors.
func [ decimal128.Num | decimal256.Num,  arrow.IntType | arrow.UintType]( *exec.KernelCtx,  *exec.ExecSpan,  *exec.ExecResult) error {
	var (
		,  int32
		         exec.ArrayKernelExec
	)
	switch dt := .Type.(type) {
	case *arrow.Decimal128Type:
		 = .Precision
		 = .Scale
		 = integerToDecimal128[](.Values[0].Array.Type.ID(), )
	case *arrow.Decimal256Type:
		 = .Precision
		 = .Scale
		 = integerToDecimal256[](.Values[0].Array.Type.ID(), )
	}

	if  < 0 {
		return fmt.Errorf("%w: scale must be non-negative", arrow.ErrInvalid)
	}

	,  := MaxDecimalDigitsForInt(.Values[0].Type().ID())
	if  != nil {
		return 
	}

	 += 
	if  <  {
		return fmt.Errorf("%w: precision is not great enough for result. It should be at least %d",
			arrow.ErrInvalid, )
	}

	return (, , )
}

// getCastIntToDecimal returns the CastIntegerToDecimal instantiation for the
// given integer type ID, with the decimal type supplied by the caller's type
// argument. All eight fixed-width integer IDs (UINT8..INT64) are covered.
// For any other ID the debug assertion fires and nil is returned.
//
// NOTE(review): the type-parameter and parameter names are blank in this
// view.
func getCastIntToDecimal[ decimal128.Num | decimal256.Num]( arrow.Type) exec.ArrayKernelExec {
	switch  {
	case arrow.UINT8:
		return CastIntegerToDecimal[, uint8]
	case arrow.INT8:
		return CastIntegerToDecimal[, int8]
	case arrow.UINT16:
		return CastIntegerToDecimal[, uint16]
	case arrow.INT16:
		return CastIntegerToDecimal[, int16]
	case arrow.UINT32:
		return CastIntegerToDecimal[, uint32]
	case arrow.INT32:
		return CastIntegerToDecimal[, int32]
	case arrow.UINT64:
		return CastIntegerToDecimal[, uint64]
	case arrow.INT64:
		return CastIntegerToDecimal[, int64]
	}
	debug.Assert(false, "invalid integer type")
	return nil
}

// unsafeUpscaleDecimal256Out builds a per-value executor producing
// decimal256.Num values whose scale has been increased with IncreaseScaleBy;
// the executor never sets its *error argument.
//
// When the arrow.Type argument is DECIMAL128 the executor reads
// decimal128.Num inputs and widens them with decimal256.FromDecimal128 first;
// for every other ID it reads decimal256.Num inputs.
//
// NOTE(review): parameter names are blank in this view; the int32 argument is
// presumably the number of digits to scale by — confirm.
func unsafeUpscaleDecimal256Out( arrow.Type,  int32) exec.ArrayKernelExec {
	if  == arrow.DECIMAL128 {
		return ScalarUnaryNotNull(func( *exec.KernelCtx,  decimal128.Num,  *error) decimal256.Num {
			return decimal256.FromDecimal128().IncreaseScaleBy()
		})
	}
	return ScalarUnaryNotNull(func( *exec.KernelCtx,  decimal256.Num,  *error) decimal256.Num {
		return .IncreaseScaleBy()
	})
}

// unsafeUpscaleDecimal128Out builds a per-value executor producing
// decimal128.Num values whose scale has been increased with IncreaseScaleBy;
// the executor never sets its *error argument.
//
// When the arrow.Type argument is DECIMAL128 the input is decimal128.Num.
// For every other ID the input is decimal256.Num: it is scaled, and the
// result is narrowed by building a decimal128.Num from elements 1 and 0 of
// its Array(); the remaining elements are dropped without any check.
//
// NOTE(review): parameter names are blank in this view; the int32 argument is
// presumably the number of digits to scale by, and Array() presumably orders
// words least-significant first — confirm.
func unsafeUpscaleDecimal128Out( arrow.Type,  int32) exec.ArrayKernelExec {
	if  == arrow.DECIMAL128 {
		return ScalarUnaryNotNull(func( *exec.KernelCtx,  decimal128.Num,  *error) decimal128.Num {
			return .IncreaseScaleBy()
		})
	}
	return ScalarUnaryNotNull(func( *exec.KernelCtx,  decimal256.Num,  *error) decimal128.Num {
		 := .IncreaseScaleBy().Array()
		return decimal128.New(int64([1]), [0])
	})
}

// unsafeDownscaleDecimal256Out builds a per-value executor producing
// decimal256.Num values whose scale has been reduced with
// ReduceScaleBy(..., false); the executor never sets its *error argument.
//
// When the arrow.Type argument is DECIMAL128 the executor reads
// decimal128.Num inputs and widens them with decimal256.FromDecimal128 first;
// for every other ID it reads decimal256.Num inputs.
//
// NOTE(review): parameter names are blank in this view; the int32 argument is
// presumably the number of digits to scale by — confirm.
func unsafeDownscaleDecimal256Out( arrow.Type,  int32) exec.ArrayKernelExec {
	if  == arrow.DECIMAL128 {
		return ScalarUnaryNotNull(func( *exec.KernelCtx,  decimal128.Num,  *error) decimal256.Num {
			return decimal256.FromDecimal128().ReduceScaleBy(, false)
		})
	}
	return ScalarUnaryNotNull(func( *exec.KernelCtx,  decimal256.Num,  *error) decimal256.Num {
		return .ReduceScaleBy(, false)
	})
}

// unsafeDownscaleDecimal128Out builds a per-value executor producing
// decimal128.Num values whose scale has been reduced with
// ReduceScaleBy(..., false); the executor never sets its *error argument.
//
// When the arrow.Type argument is DECIMAL128 the input is decimal128.Num.
// For every other ID the input is decimal256.Num: it is scaled, and the
// result is narrowed by building a decimal128.Num from elements 1 and 0 of
// its Array(); the remaining elements are dropped without any check.
//
// NOTE(review): parameter names are blank in this view; the int32 argument is
// presumably the number of digits to scale by, and Array() presumably orders
// words least-significant first — confirm.
func unsafeDownscaleDecimal128Out( arrow.Type,  int32) exec.ArrayKernelExec {
	if  == arrow.DECIMAL128 {
		return ScalarUnaryNotNull(func( *exec.KernelCtx,  decimal128.Num,  *error) decimal128.Num {
			return .ReduceScaleBy(, false)
		})
	}
	return ScalarUnaryNotNull(func( *exec.KernelCtx,  decimal256.Num,  *error) decimal128.Num {
		 := .ReduceScaleBy(, false).Array()
		return decimal128.New(int64([1]), [0])
	})
}

// safeRescaleDecimal256Out builds a per-value executor that rescales a
// decimal to decimal256.Num and validates the result.
//
// When the arrow.Type argument is DECIMAL128 the input is decimal128.Num and
// is widened with decimal256.FromDecimal128 first; for every other ID the
// input is decimal256.Num. For each value:
//   - a Rescale error is reported as an arrow.ErrInvalid-wrapped error and a
//     zero decimal256.Num is returned;
//   - a result for which FitsInPrecision is false is reported as
//     "decimal value does not fit in precision", also with a zero result;
//   - otherwise the rescaled value is returned.
//
// NOTE(review): parameter and local names are blank in this view; the three
// int32 arguments are presumably the input scale, output scale and output
// precision. In the Rescale error path the %s argument is a dereference
// (`*...`), unlike the decimal-to-integer kernels which pass the error value
// directly; if it dereferences the executor's *error out-parameter rather
// than the Rescale error, the message will not contain the rescale failure —
// confirm.
func safeRescaleDecimal256Out( arrow.Type, , ,  int32) exec.ArrayKernelExec {
	if  == arrow.DECIMAL128 {
		return ScalarUnaryNotNull(func( *exec.KernelCtx,  decimal128.Num,  *error) decimal256.Num {
			,  := decimal256.FromDecimal128().Rescale(, )
			if  != nil {
				* = fmt.Errorf("%w: %s", arrow.ErrInvalid, *)
				return decimal256.Num{}
			}

			if .FitsInPrecision() {
				return 
			}

			* = fmt.Errorf("%w: decimal value does not fit in precision", arrow.ErrInvalid)
			return decimal256.Num{}
		})
	}
	return ScalarUnaryNotNull(func( *exec.KernelCtx,  decimal256.Num,  *error) decimal256.Num {
		,  := .Rescale(, )
		if  != nil {
			* = fmt.Errorf("%w: %s", arrow.ErrInvalid, *)
			return decimal256.Num{}
		}

		if .FitsInPrecision() {
			return 
		}

		* = fmt.Errorf("%w: decimal value does not fit in precision", arrow.ErrInvalid)
		return decimal256.Num{}
	})
}

// safeRescaleDecimal128Out builds a per-value executor that rescales a
// decimal to decimal128.Num and validates the result.
//
// When the arrow.Type argument is DECIMAL128 the input is decimal128.Num;
// for every other ID the input is decimal256.Num. For each value:
//   - a Rescale error is reported as an arrow.ErrInvalid-wrapped error and a
//     zero decimal128.Num is returned;
//   - a result for which FitsInPrecision is false is reported as
//     "decimal value does not fit in precision", also with a zero result;
//   - otherwise the rescaled value is returned. On the decimal256 path it is
//     narrowed by building a decimal128.Num from elements 1 and 0 of its
//     Array(); this happens only after the precision check has passed.
//
// NOTE(review): parameter and local names are blank in this view; the three
// int32 arguments are presumably the input scale, output scale and output
// precision. In the Rescale error path the %s argument is a dereference
// (`*...`), unlike the decimal-to-integer kernels which pass the error value
// directly; if it dereferences the executor's *error out-parameter rather
// than the Rescale error, the message will not contain the rescale failure —
// confirm.
func safeRescaleDecimal128Out( arrow.Type, , ,  int32) exec.ArrayKernelExec {
	if  == arrow.DECIMAL128 {
		return ScalarUnaryNotNull(func( *exec.KernelCtx,  decimal128.Num,  *error) decimal128.Num {
			,  := .Rescale(, )
			if  != nil {
				* = fmt.Errorf("%w: %s", arrow.ErrInvalid, *)
				return decimal128.Num{}
			}

			if .FitsInPrecision() {
				return 
			}

			* = fmt.Errorf("%w: decimal value does not fit in precision", arrow.ErrInvalid)
			return decimal128.Num{}
		})
	}
	return ScalarUnaryNotNull(func( *exec.KernelCtx,  decimal256.Num,  *error) decimal128.Num {
		,  := .Rescale(, )
		if  != nil {
			* = fmt.Errorf("%w: %s", arrow.ErrInvalid, *)
			return decimal128.Num{}
		}

		if .FitsInPrecision() {
			 := .Array()
			return decimal128.New(int64([1]), [0])
		}

		* = fmt.Errorf("%w: decimal value does not fit in precision", arrow.ErrInvalid)
		return decimal128.Num{}
	})
}

// Decimal-to-decimal cast kernel covering every combination of 128- and
// 256-bit inputs and outputs.
//
// NOTE(review): the function, parameter and local names are blank in this
// view; presumably the CastDecimalToDecimal kernel referenced by the decimal
// kernel registrations — confirm.
//
// Two type switches read a Scale from one decimal type and a Scale plus
// Precision from another (presumably input and output respectively); neither
// switch has a default case, so the values stay zero for other types. The
// executor is then chosen as follows, with the 128- versus 256-bit variant
// picked by comparing a type ID against DECIMAL128:
//   - AllowDecimalTruncate and the first scale smaller than the second:
//     an unsafeUpscale* executor with the scale difference;
//   - AllowDecimalTruncate otherwise (including equal scales):
//     an unsafeDownscale* executor with the scale difference;
//   - truncation not allowed: a safeRescale* executor, which validates both
//     the rescale and the precision.
func ( *exec.KernelCtx,  *exec.ExecSpan,  *exec.ExecResult) error {
	var (
		              = .State.(CastState)
		            = .Values[0].Type()
		           = .Type
		,  int32
		      int32
	)

	switch dt := .(type) {
	case *arrow.Decimal128Type:
		 = .Scale
	case *arrow.Decimal256Type:
		 = .Scale
	}

	switch dt := .(type) {
	case *arrow.Decimal128Type:
		 = .Scale
		 = .Precision
	case *arrow.Decimal256Type:
		 = .Scale
		 = .Precision
	}

	if .AllowDecimalTruncate {
		if  <  {
			// unsafe upscale
			if .ID() == arrow.DECIMAL128 {
				 := unsafeUpscaleDecimal128Out(.ID(), -)
				return (, , )
			}
			 := unsafeUpscaleDecimal256Out(.ID(), -)
			return (, , )
		} else {
			// unsafe downscale
			if .ID() == arrow.DECIMAL128 {
				 := unsafeDownscaleDecimal128Out(.ID(), -)
				return (, , )
			}
			 := unsafeDownscaleDecimal256Out(.ID(), -)
			return (, , )
		}
	}

	// safe rescale
	if .ID() == arrow.DECIMAL128 {
		 := safeRescaleDecimal128Out(.ID(), , , )
		return (, , )
	}
	 := safeRescaleDecimal256Out(.ID(), , , )
	return (, , )
}

// float32-to-decimal cast kernel.
//
// NOTE(review): the function, parameter and local names are blank in this
// view; presumably the CastFloat32ToDecimal kernel referenced by the decimal
// kernel registrations — confirm.
//
// A type switch on a decimal type (presumably the output's) reads Precision
// and Scale and builds a per-value executor around decimal128.FromFloat32 or
// decimal256.FromFloat32. When the conversion fails, a zero decimal is
// returned; the failure is reported as an arrow.ErrInvalid-wrapped error only
// if a bool copied from AllowDecimalTruncate is false, so with truncation
// allowed unconvertible values silently become zero. The switch has no
// default case, so for other types the executor stays nil.
func ( *exec.KernelCtx,  *exec.ExecSpan,  *exec.ExecResult) error {
	var (
		,  int32
		  bool
		    exec.ArrayKernelExec
		        = .State.(CastState)
	)

	 = .AllowDecimalTruncate
	switch dt := .Type.(type) {
	case *arrow.Decimal128Type:
		,  = .Precision, .Scale
		 = ScalarUnaryNotNull(func( *exec.KernelCtx,  float32,  *error) decimal128.Num {
			,  := decimal128.FromFloat32(, , )
			if  == nil {
				return 
			}

			if ! {
				* = fmt.Errorf("%w: %s", arrow.ErrInvalid, )
			}
			return decimal128.Num{}
		})
	case *arrow.Decimal256Type:
		,  = .Precision, .Scale
		 = ScalarUnaryNotNull(func( *exec.KernelCtx,  float32,  *error) decimal256.Num {
			,  := decimal256.FromFloat32(, , )
			if  == nil {
				return 
			}

			if ! {
				* = fmt.Errorf("%w: %s", arrow.ErrInvalid, )
			}
			return decimal256.Num{}
		})
	}

	return (, , )
}

// float64-to-decimal cast kernel.
//
// NOTE(review): the function, parameter and local names are blank in this
// view; presumably the CastFloat64ToDecimal kernel referenced by the decimal
// kernel registrations — confirm.
//
// A type switch on a decimal type (presumably the output's) reads Precision
// and Scale and builds a per-value executor around decimal128.FromFloat64 or
// decimal256.FromFloat64. When the conversion fails, a zero decimal is
// returned; the failure is reported as an arrow.ErrInvalid-wrapped error only
// if a bool copied from AllowDecimalTruncate is false, so with truncation
// allowed unconvertible values silently become zero. The switch has no
// default case, so for other types the executor stays nil.
func ( *exec.KernelCtx,  *exec.ExecSpan,  *exec.ExecResult) error {
	var (
		,  int32
		  bool
		    exec.ArrayKernelExec
		        = .State.(CastState)
	)

	 = .AllowDecimalTruncate
	switch dt := .Type.(type) {
	case *arrow.Decimal128Type:
		,  = .Precision, .Scale
		 = ScalarUnaryNotNull(func( *exec.KernelCtx,  float64,  *error) decimal128.Num {
			,  := decimal128.FromFloat64(, , )
			if  == nil {
				return 
			}

			if ! {
				* = fmt.Errorf("%w: %s", arrow.ErrInvalid, )
			}
			return decimal128.Num{}
		})
	case *arrow.Decimal256Type:
		,  = .Precision, .Scale
		 = ScalarUnaryNotNull(func( *exec.KernelCtx,  float64,  *error) decimal256.Num {
			,  := decimal256.FromFloat64(, , )
			if  == nil {
				return 
			}

			if ! {
				* = fmt.Errorf("%w: %s", arrow.ErrInvalid, )
			}
			return decimal256.Num{}
		})
	}

	return (, , )
}

// Decimal-to-float16 cast kernel.
//
// NOTE(review): the function, parameter and local names are blank in this
// view; presumably the CastDecimalToFloat16 kernel referenced by the
// floating-point kernel registrations — confirm.
//
// A type switch on the first input array's type reads its Scale and builds a
// per-value executor that converts each decimal with ToFloat32 and then wraps
// the float32 with float16.New, i.e. the value passes through float32 on the
// way. The executor never sets its *error argument. The switch has no default
// case, so for non-decimal input types the executor stays nil.
func ( *exec.KernelCtx,  *exec.ExecSpan,  *exec.ExecResult) error {
	var (
		 exec.ArrayKernelExec
	)

	switch dt := .Values[0].Array.Type.(type) {
	case *arrow.Decimal128Type:
		 := .Scale
		 = ScalarUnaryNotNull(func( *exec.KernelCtx,  decimal128.Num,  *error) float16.Num {
			return float16.New(.ToFloat32())
		})
	case *arrow.Decimal256Type:
		 := .Scale
		 = ScalarUnaryNotNull(func( *exec.KernelCtx,  decimal256.Num,  *error) float16.Num {
			return float16.New(.ToFloat32())
		})
	}

	return (, , )
}

// Decimal-to-float cast kernel, generic over constraints.Float.
//
// NOTE(review): the function, type-parameter, parameter and local names are
// blank in this view; presumably the CastDecimalToFloating kernel that the
// floating-point kernel registrations instantiate for float32 and float64 —
// confirm.
//
// A type switch on the first input array's type reads its Scale and builds a
// per-value executor that converts each decimal with ToFloat64 and then
// converts that float64 to the output float type. The executor never sets its
// *error argument. The switch has no default case, so for non-decimal input
// types the executor stays nil.
func [ constraints.Float]( *exec.KernelCtx,  *exec.ExecSpan,  *exec.ExecResult) error {
	var (
		 exec.ArrayKernelExec
	)

	switch dt := .Values[0].Array.Type.(type) {
	case *arrow.Decimal128Type:
		 := .Scale
		 = ScalarUnaryNotNull(func( *exec.KernelCtx,  decimal128.Num,  *error)  {
			return (.ToFloat64())
		})
	case *arrow.Decimal256Type:
		 := .Scale
		 = ScalarUnaryNotNull(func( *exec.KernelCtx,  decimal256.Num,  *error)  {
			return (.ToFloat64())
		})
	}

	return (, , )
}

// boolToNum expands a bitmap into a slice of numeric values: each element is
// assigned either the converted constant 1 or the type's zero value depending
// on bitutil.BitIsSet for that index. It always returns nil.
//
// NOTE(review): the type-parameter, parameter and local names are blank in
// this view; presumably the loop ranges over the output slice and a set bit
// maps to 1 — confirm.
func boolToNum[ numeric]( *exec.KernelCtx,  []byte,  []) error {
	var (
		 
		  = (1)
	)

	for  := range  {
		if bitutil.BitIsSet(, ) {
			[] = 
		} else {
			[] = 
		}
	}
	return nil
}

// boolToFloat16 expands a bitmap into a slice of float16.Num values: each
// element is assigned either float16.New(1) or the zero float16.Num depending
// on bitutil.BitIsSet for that index. It always returns nil.
//
// NOTE(review): parameter and local names are blank in this view; presumably
// the loop ranges over the output slice and a set bit maps to 1 — confirm.
func boolToFloat16( *exec.KernelCtx,  []byte,  []float16.Num) error {
	var (
		 float16.Num
		  = float16.New(1)
	)

	for  := range  {
		if bitutil.BitIsSet(, ) {
			[] = 
		} else {
			[] = 
		}
	}
	return nil
}

// wasTrunc reports whether a float-to-integer conversion lost information,
// by converting an integer back to the floating-point type and comparing for
// inequality. The comparison is specialized per dynamic float type through a
// type switch (float16.Num goes through float32 and float16.New); any other
// dynamic type yields false.
//
// NOTE(review): the type-parameter and parameter names are blank in this
// view; presumably the integer is the cast output and the float the original
// input value — confirm.
func wasTrunc[ constraints.Float | float16.Num,  arrow.IntType | arrow.UintType]( ,  ) bool {
	switch v := any().(type) {
	case float16.Num:
		return float16.New(float32()) != 
	case float32:
		return float32() != 
	case float64:
		return float64() != 
	default:
		return false
	}
}

// wasTruncMaybeNull is the variant of the truncation test that is gated by an
// extra bool: it yields true only when that bool is true and the integer,
// converted back to the floating-point type, differs from the compared float.
// The comparison is specialized per dynamic float type through a type switch;
// any other dynamic type yields false.
//
// NOTE(review): the type-parameter and parameter names are blank in this
// view; presumably the bool is the element's validity bit, so null slots are
// never reported — confirm.
func wasTruncMaybeNull[ constraints.Float | float16.Num,  arrow.IntType | arrow.UintType]( ,  ,  bool) bool {
	switch v := any().(type) {
	case float16.Num:
		return  && (float16.New(float32()) != )
	case float32:
		return  && (float32() != )
	case float64:
		return  && (float64() != )
	default:
		return false
	}
}

// checkFloatTrunc scans two array spans and returns an arrow.ErrInvalid-
// wrapped "float value ... was truncated" error for the first element that
// the truncation test flags, or nil when none is flagged.
//
// The values of both spans are read from buffer index 1 via
// exec.GetSpanValues, and a bitmap taken from Buffers[0] is walked block by
// block with an OptionalBitBlockCounter. Each block is handled in two phases:
// first the per-element results are OR-ed together without branching on each
// element (wasTrunc when the whole block is set, wasTruncMaybeNull when only
// part of it is; a block with no bits set is skipped), and only if that
// aggregate is true is the block rescanned to find the offending element.
// After each block both value slices and two running offsets advance by the
// block length.
//
// NOTE(review): the type-parameter, parameter and local names are blank in
// this view; presumably the spans are the float input and integer output and
// the bitmap is the input's validity bitmap — confirm. The error message uses
// the %f verb, which also receives float16.Num values when that type is
// instantiated — confirm the formatting is as intended there.
func checkFloatTrunc[ constraints.Float | float16.Num,  arrow.IntType | arrow.UintType](,  *exec.ArraySpan) error {
	 := func( ) error {
		return fmt.Errorf("%w: float value %f was truncated converting to %s",
			arrow.ErrInvalid, , .Type)
	}

	 := exec.GetSpanValues[](, 1)
	 := exec.GetSpanValues[](, 1)

	 := .Buffers[0].Buf
	 := bitutils.NewOptionalBitBlockCounter(, .Offset, .Len)
	,  := int64(0), int64(0)
	for  < .Len {
		 := .NextBlock()
		 := false
		if .Popcnt == .Len {
			// fast path: branchless
			for  := 0;  < int(.Len); ++ {
				 =  || wasTrunc([], [])
			}
		} else if .Popcnt > 0 {
			// must only bounds check non-null
			for  := 0;  < int(.Len); ++ {
				 =  || wasTruncMaybeNull([], [], bitutil.BitIsSet(, int()+))
			}
		}
		// slow path: something in this block was flagged, locate it
		if  {
			if .Nulls > 0 {
				for  := 0;  < int(.Len); ++ {
					if wasTruncMaybeNull([], [], bitutil.BitIsSet(, int()+)) {
						return ([])
					}
				}
			} else {
				for  := 0;  < int(.Len); ++ {
					if wasTrunc([], []) {
						return ([])
					}
				}
			}
		}
		 = [.Len:]
		 = [.Len:]
		 += int64(.Len)
		 += int64(.Len)
	}
	return nil
}

// checkFloatToIntTruncImpl dispatches to the checkFloatTrunc instantiation
// matching an integer type ID read from one of the spans, keeping the
// floating-point type argument fixed. All eight fixed-width integer IDs are
// covered; for any other ID the debug assertion fires and nil is returned.
//
// NOTE(review): the type-parameter and parameter names are blank in this
// view; the switched-on type is presumably the output span's — confirm.
func checkFloatToIntTruncImpl[ constraints.Float | float16.Num](,  *exec.ArraySpan) error {
	switch .Type.ID() {
	case arrow.INT8:
		return checkFloatTrunc[, int8](, )
	case arrow.UINT8:
		return checkFloatTrunc[, uint8](, )
	case arrow.INT16:
		return checkFloatTrunc[, int16](, )
	case arrow.UINT16:
		return checkFloatTrunc[, uint16](, )
	case arrow.INT32:
		return checkFloatTrunc[, int32](, )
	case arrow.UINT32:
		return checkFloatTrunc[, uint32](, )
	case arrow.INT64:
		return checkFloatTrunc[, int64](, )
	case arrow.UINT64:
		return checkFloatTrunc[, uint64](, )
	}
	debug.Assert(false, "float to int truncation only for integer output")
	return nil
}

// checkFloatToIntTrunc dispatches to checkFloatToIntTruncImpl according to a
// floating-point type ID read from one of the spans: FLOAT16, FLOAT32 and
// FLOAT64 are handled. For any other ID the debug assertion fires and nil is
// returned.
//
// NOTE(review): parameter names are blank in this view; the switched-on type
// is presumably the input span's — confirm. The assertion message mentions
// only float32 and float64 although FLOAT16 is handled above.
func checkFloatToIntTrunc(,  *exec.ArraySpan) error {
	switch .Type.ID() {
	case arrow.FLOAT16:
		return checkFloatToIntTruncImpl[float16.Num](, )
	case arrow.FLOAT32:
		return checkFloatToIntTruncImpl[float32](, )
	case arrow.FLOAT64:
		return checkFloatToIntTruncImpl[float64](, )
	}
	debug.Assert(false, "float to int truncation only for float32 and float64")
	return nil
}

// checkIntToFloatTrunc decides, from the span's integer type and a target
// type ID, whether the values need a range check before being cast to
// floating point, and returns the result of intsInRange when they do.
//
//   - 8- and 16-bit integers: never checked.
//   - INT32 / UINT32: not checked for a FLOAT64 target; for any other target
//     the limit is 1<<24.
//   - INT64 / UINT64: limit 1<<24 for a FLOAT32 target, 1<<53 for any other
//     target.
// Signed types are checked against the symmetric interval [-limit, limit],
// unsigned ones against [0, limit]. For a non-integer span type the debug
// assertion fires and nil is returned.
//
// NOTE(review): parameter and constant names are blank in this view. No case
// is specific to a FLOAT16 target, which therefore takes the "any other
// target" limits listed above — confirm that is intended.
func checkIntToFloatTrunc( *exec.ArraySpan,  arrow.Type) error {
	switch .Type.ID() {
	case arrow.INT8, arrow.INT16, arrow.UINT8, arrow.UINT16:
		// small integers are all exactly representable as whole numbers
		return nil
	case arrow.INT32:
		if  == arrow.FLOAT64 {
			return nil
		}
		const  = int32(1 << 24)
		return intsInRange(, -, )
	case arrow.UINT32:
		if  == arrow.FLOAT64 {
			return nil
		}
		return intsInRange(, 0, uint32(1<<24))
	case arrow.INT64:
		if  == arrow.FLOAT32 {
			const  = int64(1 << 24)
			return intsInRange(, -, )
		}
		const  = int64(1 << 53)
		return intsInRange(, -, )
	case arrow.UINT64:
		if  == arrow.FLOAT32 {
			return intsInRange(, 0, uint64(1<<24))
		}
		return intsInRange(, 0, uint64(1<<53))
	}
	debug.Assert(false, "intToFloatTrunc should only be called with int input")
	return nil
}

// parseStringToNumberImpl wraps a string-parsing function into a per-value
// executor over binary input, built with ScalarUnaryNotNullBinaryArg. The
// second type parameter (int32 or int64) is forwarded to that helper.
//
// For each value the []byte is reinterpreted as a string through
// unsafe.Pointer, without copying, and handed to the parser. A parse error is
// stored through the executor's *error argument wrapped in arrow.ErrInvalid;
// the parser's value is returned in either case.
//
// NOTE(review): the type-parameter, parameter and local names are blank in
// this view; the int32/int64 parameter is presumably the offset width of the
// binary input — confirm.
func parseStringToNumberImpl[ arrow.IntType | arrow.UintType | arrow.FloatType,  int32 | int64]( func(string) (, error)) exec.ArrayKernelExec {
	return ScalarUnaryNotNullBinaryArg[, ](func( *exec.KernelCtx,  []byte,  *error)  {
		 := *(*string)(unsafe.Pointer(&))
		,  := ()
		if  != nil {
			* = fmt.Errorf("%w: %s", arrow.ErrInvalid, )
		}
		return 
	})
}

// getParseStringExec returns a string-parsing executor for the given numeric
// type ID, built on parseStringToNumberImpl with the caller's int32/int64
// type argument.
//
// Integer IDs use strconv.ParseInt / strconv.ParseUint with base 0 and the
// bit size of the target type, so the base is taken from the string's prefix
// and out-of-range input is reported by strconv. FLOAT32 and FLOAT64 use
// strconv.ParseFloat with the matching bit size. Narrower results are
// converted to the target type after parsing.
//
// Any other type ID panics with "invalid type for getParseStringExec"; as
// visible here that includes FLOAT16.
//
// NOTE(review): the type-parameter, parameter and local names are blank in
// this view.
func getParseStringExec[ int32 | int64]( arrow.Type) exec.ArrayKernelExec {
	switch  {
	case arrow.INT8:
		return parseStringToNumberImpl[int8, ](func( string) (int8, error) {
			,  := strconv.ParseInt(, 0, 8)
			return int8(), 
		})
	case arrow.UINT8:
		return parseStringToNumberImpl[uint8, ](func( string) (uint8, error) {
			,  := strconv.ParseUint(, 0, 8)
			return uint8(), 
		})
	case arrow.INT16:
		return parseStringToNumberImpl[int16, ](func( string) (int16, error) {
			,  := strconv.ParseInt(, 0, 16)
			return int16(), 
		})
	case arrow.UINT16:
		return parseStringToNumberImpl[uint16, ](func( string) (uint16, error) {
			,  := strconv.ParseUint(, 0, 16)
			return uint16(), 
		})
	case arrow.INT32:
		return parseStringToNumberImpl[int32, ](func( string) (int32, error) {
			,  := strconv.ParseInt(, 0, 32)
			return int32(), 
		})
	case arrow.UINT32:
		return parseStringToNumberImpl[uint32, ](func( string) (uint32, error) {
			,  := strconv.ParseUint(, 0, 32)
			return uint32(), 
		})
	case arrow.INT64:
		return parseStringToNumberImpl[int64, ](func( string) (int64, error) {
			return strconv.ParseInt(, 0, 64)
		})
	case arrow.UINT64:
		return parseStringToNumberImpl[uint64, ](func( string) (uint64, error) {
			return strconv.ParseUint(, 0, 64)
		})
	case arrow.FLOAT32:
		return parseStringToNumberImpl[float32, ](func( string) (float32, error) {
			,  := strconv.ParseFloat(, 32)
			return float32(), 
		})
	case arrow.FLOAT64:
		return parseStringToNumberImpl[float64, ](func( string) (float64, error) {
			return strconv.ParseFloat(, 64)
		})
	}
	panic("invalid type for getParseStringExec")
}

// addFloat16Casts appends the kernels shared by float16 casts to a kernel
// slice and returns the extended slice:
//   - the kernels from GetCommonCastKernels for a type ID;
//   - a Boolean-input kernel built from boolToFloat16;
//   - string-parsing kernels for Binary and String inputs (getParseStringExec
//     instantiated with int32) and for LargeBinary and LargeString inputs
//     (instantiated with int64).
//
// NOTE(review): parameter and local names are blank in this view; the
// arrow.DataType argument is presumably the float16 output type. As visible
// in this file getParseStringExec has no FLOAT16 case and panics for unlisted
// IDs, so if a float16 type ID is what gets passed to it here, building these
// kernels would panic — confirm against the full file.
func addFloat16Casts( arrow.DataType,  []exec.ScalarKernel) []exec.ScalarKernel {
	 = append(, GetCommonCastKernels(.ID(), exec.NewOutputType())...)

	 = append(, exec.NewScalarKernel(
		[]exec.InputType{exec.NewExactInput(arrow.FixedWidthTypes.Boolean)},
		exec.NewOutputType(), ScalarUnaryBoolArg(boolToFloat16), nil))

	for ,  := range []arrow.DataType{arrow.BinaryTypes.Binary, arrow.BinaryTypes.String} {
		 = append(, exec.NewScalarKernel(
			[]exec.InputType{exec.NewExactInput()}, exec.NewOutputType(),
			getParseStringExec[int32](.ID()), nil))
	}
	for ,  := range []arrow.DataType{arrow.BinaryTypes.LargeBinary, arrow.BinaryTypes.LargeString} {
		 = append(, exec.NewScalarKernel(
			[]exec.InputType{exec.NewExactInput()}, exec.NewOutputType(),
			getParseStringExec[int64](.ID()), nil))
	}
	return 
}

// addCommonNumberCasts appends the kernels shared by all numeric casts to a
// kernel slice and returns the extended slice:
//   - the kernels from GetCommonCastKernels for a type ID;
//   - a Boolean-input kernel built from the boolToNum instantiation for the
//     numeric type argument;
//   - string-parsing kernels for Binary and String inputs (getParseStringExec
//     instantiated with int32) and for LargeBinary and LargeString inputs
//     (instantiated with int64).
//
// NOTE(review): the type-parameter, parameter and local names are blank in
// this view; the arrow.DataType argument is presumably the numeric output
// type — confirm.
func addCommonNumberCasts[ numeric]( arrow.DataType,  []exec.ScalarKernel) []exec.ScalarKernel {
	 = append(, GetCommonCastKernels(.ID(), exec.NewOutputType())...)

	 = append(, exec.NewScalarKernel(
		[]exec.InputType{exec.NewExactInput(arrow.FixedWidthTypes.Boolean)},
		exec.NewOutputType(), ScalarUnaryBoolArg(boolToNum[]), nil))

	for ,  := range []arrow.DataType{arrow.BinaryTypes.Binary, arrow.BinaryTypes.String} {
		 = append(, exec.NewScalarKernel(
			[]exec.InputType{exec.NewExactInput()}, exec.NewOutputType(),
			getParseStringExec[int32](.ID()), nil))
	}
	for ,  := range []arrow.DataType{arrow.BinaryTypes.LargeBinary, arrow.BinaryTypes.LargeString} {
		 = append(, exec.NewScalarKernel(
			[]exec.InputType{exec.NewExactInput()}, exec.NewOutputType(),
			getParseStringExec[int64](.ID()), nil))
	}
	return 
}

// Builds the list of scalar kernels that cast to an integer type, generic
// over the integer type.
//
// NOTE(review): the function, type-parameter, parameter and local names are
// blank in this view and the function name is not referenced elsewhere in
// the visible file — restore and confirm. The arrow.DataType argument is
// presumably the integer output type.
//
// Kernels are added in this order: one CastIntToInt kernel per entry of
// intTypes, one CastFloatingToInteger kernel per entry of floatingTypes plus
// Float16, the common number casts from addCommonNumberCasts, and finally the
// DECIMAL128 and DECIMAL256 inputs (matched by type ID) using
// CastDecimal128ToInteger and CastDecimal256ToInteger.
func [ arrow.IntType | arrow.UintType]( arrow.DataType) []exec.ScalarKernel {
	 := make([]exec.ScalarKernel, 0)

	 := exec.NewOutputType()
	for ,  := range intTypes {
		 = append(, exec.NewScalarKernel(
			[]exec.InputType{exec.NewExactInput()}, ,
			CastIntToInt, nil))
	}

	for ,  := range append(floatingTypes, arrow.FixedWidthTypes.Float16) {
		 = append(, exec.NewScalarKernel(
			[]exec.InputType{exec.NewExactInput()}, ,
			CastFloatingToInteger, nil))
	}

	 = addCommonNumberCasts[](, )
	 = append(, exec.NewScalarKernel(
		[]exec.InputType{exec.NewIDInput(arrow.DECIMAL128)}, ,
		CastDecimal128ToInteger[], nil))
	 = append(, exec.NewScalarKernel(
		[]exec.InputType{exec.NewIDInput(arrow.DECIMAL256)}, ,
		CastDecimal256ToInteger[], nil))
	return 
}

// Builds the list of scalar kernels that cast to a floating-point type,
// generic over float32, float64 and float16.Num.
//
// NOTE(review): the function, type-parameter, parameter and local names are
// blank in this view and the function name is not referenced elsewhere in
// the visible file — restore and confirm. The arrow.DataType argument is
// presumably the floating-point output type.
//
// One CastIntegerToFloating kernel is added per entry of intTypes and one
// CastFloatingToFloating kernel per entry of floatingTypes plus Float16. The
// remaining kernels depend on the dynamic type of a zero value of the type
// argument:
//   - float16.Num: addFloat16Casts, then CastDecimalToFloat16 for DECIMAL128
//     and DECIMAL256 inputs;
//   - float32 / float64: addCommonNumberCasts, then the matching
//     CastDecimalToFloating instantiation for DECIMAL128 and DECIMAL256
//     inputs.
func [ constraints.Float | float16.Num]( arrow.DataType) []exec.ScalarKernel {
	 := make([]exec.ScalarKernel, 0)

	 := exec.NewOutputType()
	for ,  := range intTypes {
		 = append(, exec.NewScalarKernel(
			[]exec.InputType{exec.NewExactInput()}, ,
			CastIntegerToFloating, nil))
	}

	for ,  := range append(floatingTypes, arrow.FixedWidthTypes.Float16) {
		 = append(, exec.NewScalarKernel(
			[]exec.InputType{exec.NewExactInput()}, ,
			CastFloatingToFloating, nil))
	}

	// a zero value of the type argument, used only to switch on its type
	var  
	switch any().(type) {
	case float16.Num:
		 = addFloat16Casts(, )
		 = append(, exec.NewScalarKernel(
			[]exec.InputType{exec.NewIDInput(arrow.DECIMAL128)}, ,
			CastDecimalToFloat16, nil))
		 = append(, exec.NewScalarKernel(
			[]exec.InputType{exec.NewIDInput(arrow.DECIMAL256)}, ,
			CastDecimalToFloat16, nil))
	case float32:
		 = addCommonNumberCasts[float32](, )
		 = append(, exec.NewScalarKernel(
			[]exec.InputType{exec.NewIDInput(arrow.DECIMAL128)}, ,
			CastDecimalToFloating[float32], nil))
		 = append(, exec.NewScalarKernel(
			[]exec.InputType{exec.NewIDInput(arrow.DECIMAL256)}, ,
			CastDecimalToFloating[float32], nil))
	case float64:
		 = addCommonNumberCasts[float64](, )
		 = append(, exec.NewScalarKernel(
			[]exec.InputType{exec.NewIDInput(arrow.DECIMAL128)}, ,
			CastDecimalToFloating[float64], nil))
		 = append(, exec.NewScalarKernel(
			[]exec.InputType{exec.NewIDInput(arrow.DECIMAL256)}, ,
			CastDecimalToFloating[float64], nil))
	}

	return 
}

// resolveOutputFromOptions is an output-type resolver that ignores the input
// types and returns the ToType field of the CastState held in State, with a
// nil error. The type assertion is unchecked, so it panics if State holds any
// other type.
//
// NOTE(review): parameter names are blank in this view; State is presumably
// read from the *exec.KernelCtx argument — confirm.
func resolveOutputFromOptions( *exec.KernelCtx,  []arrow.DataType) (arrow.DataType, error) {
	return .State.(CastState).ToType, nil
}

// Builds the list of scalar kernels that cast to decimal128.
//
// NOTE(review): the function and local names are blank in this view and the
// function name is not referenced elsewhere in the visible file — restore and
// confirm.
//
// The output type is computed per call by resolveOutputFromOptions rather
// than fixed. Kernels are added in this order: the common cast kernels for
// DECIMAL128, CastFloat32ToDecimal and CastFloat64ToDecimal for the two float
// inputs, one getCastIntToDecimal[decimal128.Num] kernel per entry of
// intTypes, and CastDecimalToDecimal for DECIMAL128 and DECIMAL256 inputs
// (matched by type ID).
func () []exec.ScalarKernel {
	 := exec.NewComputedOutputType(resolveOutputFromOptions)

	 := make([]exec.ScalarKernel, 0)
	 = append(, GetCommonCastKernels(arrow.DECIMAL128, )...)

	// cast from floating point
	 = append(, exec.NewScalarKernel(
		[]exec.InputType{exec.NewExactInput(arrow.PrimitiveTypes.Float32)},
		, CastFloat32ToDecimal, nil))
	 = append(, exec.NewScalarKernel(
		[]exec.InputType{exec.NewExactInput(arrow.PrimitiveTypes.Float64)},
		, CastFloat64ToDecimal, nil))

	// cast from integer
	for ,  := range intTypes {
		 = append(, exec.NewScalarKernel(
			[]exec.InputType{exec.NewExactInput()}, ,
			getCastIntToDecimal[decimal128.Num](.ID()), nil))
	}

	 = append(, exec.NewScalarKernel(
		[]exec.InputType{exec.NewIDInput(arrow.DECIMAL128)}, ,
		CastDecimalToDecimal, nil))
	 = append(, exec.NewScalarKernel(
		[]exec.InputType{exec.NewIDInput(arrow.DECIMAL256)}, ,
		CastDecimalToDecimal, nil))
	return 
}

// Builds the list of scalar kernels that cast to decimal256.
//
// NOTE(review): the function and local names are blank in this view and the
// function name is not referenced elsewhere in the visible file — restore and
// confirm.
//
// The output type is computed per call by resolveOutputFromOptions rather
// than fixed. Kernels are added in this order: the common cast kernels for
// DECIMAL256, CastFloat32ToDecimal and CastFloat64ToDecimal for the two float
// inputs, one getCastIntToDecimal[decimal256.Num] kernel per entry of
// intTypes, and CastDecimalToDecimal for DECIMAL128 and DECIMAL256 inputs
// (matched by type ID).
func () []exec.ScalarKernel {
	 := exec.NewComputedOutputType(resolveOutputFromOptions)

	 := make([]exec.ScalarKernel, 0)
	 = append(, GetCommonCastKernels(arrow.DECIMAL256, )...)

	// cast from floating point
	 = append(, exec.NewScalarKernel(
		[]exec.InputType{exec.NewExactInput(arrow.PrimitiveTypes.Float32)},
		, CastFloat32ToDecimal, nil))
	 = append(, exec.NewScalarKernel(
		[]exec.InputType{exec.NewExactInput(arrow.PrimitiveTypes.Float64)},
		, CastFloat64ToDecimal, nil))

	// cast from integer
	for ,  := range intTypes {
		 = append(, exec.NewScalarKernel(
			[]exec.InputType{exec.NewExactInput()}, ,
			getCastIntToDecimal[decimal256.Num](.ID()), nil))
	}

	 = append(, exec.NewScalarKernel(
		[]exec.InputType{exec.NewIDInput(arrow.DECIMAL128)}, ,
		CastDecimalToDecimal, nil))
	 = append(, exec.NewScalarKernel(
		[]exec.InputType{exec.NewIDInput(arrow.DECIMAL256)}, ,
		CastDecimalToDecimal, nil))
	return 
}