package kernels
import (
"fmt"
"unsafe"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/bitutil"
"github.com/apache/arrow-go/v18/arrow/compute/exec"
"github.com/apache/arrow-go/v18/arrow/internal/debug"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/arrow/scalar"
"github.com/apache/arrow-go/v18/internal/bitutils"
"golang.org/x/exp/constraints"
)
// ScalarUnary wraps a bulk unary operation as an exec.ArrayKernelExec.
//
// The returned kernel views buffer 1 of the first input array as []Arg0T and
// buffer 1 of the output as []OutT, then passes both slices to op and returns
// op's error. Buffer 0 (the validity bitmap elsewhere in this file) is not
// consulted here.
func ScalarUnary[OutT, Arg0T arrow.FixedWidthType](op func(*exec.KernelCtx, []Arg0T, []OutT) error) exec.ArrayKernelExec {
	return func(ctx *exec.KernelCtx, in *exec.ExecSpan, out *exec.ExecResult) error {
		src := in.Values[0].Array
		srcVals := exec.GetSpanValues[Arg0T](&src, 1)
		return op(ctx, srcVals, exec.GetSpanValues[OutT](out, 1))
	}
}
// ScalarUnaryNotNull wraps an element-wise unary operation as an
// exec.ArrayKernelExec.
//
// The input is walked with bitutils.VisitBitBlocks over buffer 0 of the first
// argument. The callback that receives a position stores op's result for that
// position; the callback without arguments stores the zero value of OutT
// (presumably these fire for valid and null slots respectively — confirm
// against VisitBitBlocks). Output slots are filled sequentially from index 0.
// op reports failures through the *error it is handed; that error is returned
// once the walk finishes.
func ScalarUnaryNotNull[OutT, Arg0T arrow.FixedWidthType](op func(*exec.KernelCtx, Arg0T, *error) OutT) exec.ArrayKernelExec {
	return func(ctx *exec.KernelCtx, in *exec.ExecSpan, out *exec.ExecResult) error {
		src := &in.Values[0].Array
		srcVals := exec.GetSpanValues[Arg0T](src, 1)
		dst := exec.GetSpanValues[OutT](out, 1)

		var (
			err  error
			zero OutT
			next int
		)
		// store writes v to the next free output slot.
		store := func(v OutT) {
			dst[next] = v
			next++
		}

		bitutils.VisitBitBlocks(src.Buffers[0].Buf, src.Offset, src.Len,
			func(pos int64) { store(op(ctx, srcVals[pos], &err)) },
			func() { store(zero) })
		return err
	}
}
// ScalarUnaryBoolOutput wraps a bulk unary operation whose output is written
// as raw bytes.
//
// The returned kernel views buffer 1 of the first input array as []Arg0T and
// passes it to op together with the untyped bytes of the output's buffer 1.
// The output offset is not applied here; op receives the buffer from its
// first byte.
func ScalarUnaryBoolOutput[Arg0T arrow.FixedWidthType](op func(*exec.KernelCtx, []Arg0T, []byte) error) exec.ArrayKernelExec {
	return func(ctx *exec.KernelCtx, in *exec.ExecSpan, out *exec.ExecResult) error {
		src := in.Values[0].Array
		srcVals := exec.GetSpanValues[Arg0T](&src, 1)
		dstBytes := out.Buffers[1].Buf
		return op(ctx, srcVals, dstBytes)
	}
}
// ScalarUnaryNotNullBinaryArgBoolOut wraps an element-wise operation that
// maps a byte slice to a bool, writing the results as bits.
//
// Each element is the slice of buffer 2 delimited by consecutive offsets read
// from buffer 1. The walk uses bitutils.VisitBitBlocks over buffer 0: the
// callback receiving a position writes op's result, the callback without
// arguments writes defVal (presumably for null slots — confirm against
// VisitBitBlocks). Bits are written sequentially into the output's buffer 1
// starting at out.Offset. The error op stores through its *error argument is
// returned after the walk.
func ScalarUnaryNotNullBinaryArgBoolOut[OffsetT int32 | int64](defVal bool, op func(*exec.KernelCtx, []byte, *error) bool) exec.ArrayKernelExec {
	return func(ctx *exec.KernelCtx, in *exec.ExecSpan, out *exec.ExecResult) error {
		src := in.Values[0].Array
		var (
			dstBits = out.Buffers[1].Buf
			offsets = exec.GetSpanOffsets[OffsetT](&src, 1)
			raw     = src.Buffers[2].Buf
			err     error
			written int
		)
		// putBit writes b at the next output bit position.
		putBit := func(b bool) {
			bitutil.SetBitTo(dstBits, int(out.Offset)+written, b)
			written++
		}

		bitutils.VisitBitBlocks(src.Buffers[0].Buf, src.Offset, src.Len,
			func(pos int64) {
				elem := raw[offsets[pos]:offsets[pos+1]]
				putBit(op(ctx, elem, &err))
			},
			func() { putBit(defVal) })
		return err
	}
}
// ScalarUnaryNotNullBinaryArg wraps an element-wise operation that maps a
// byte slice to a fixed-width value.
//
// Each element is the slice of buffer 2 delimited by consecutive offsets read
// from buffer 1. The walk uses bitutils.VisitBitBlocks over buffer 0: the
// callback receiving a position stores op's result, the callback without
// arguments stores the zero value of OutT (presumably for null slots —
// confirm against VisitBitBlocks). Output slots are filled sequentially from
// index 0, and the error op stores through its *error argument is returned.
func ScalarUnaryNotNullBinaryArg[OutT arrow.FixedWidthType, OffsetT int32 | int64](op func(*exec.KernelCtx, []byte, *error) OutT) exec.ArrayKernelExec {
	return func(ctx *exec.KernelCtx, in *exec.ExecSpan, out *exec.ExecResult) error {
		src := &in.Values[0].Array
		var (
			dst     = exec.GetSpanValues[OutT](out, 1)
			offsets = exec.GetSpanOffsets[OffsetT](src, 1)
			raw     = src.Buffers[2].Buf
			zero    OutT
			err     error
			next    int
		)
		// store writes v to the next free output slot.
		store := func(v OutT) {
			dst[next] = v
			next++
		}

		bitutils.VisitBitBlocks(src.Buffers[0].Buf, src.Offset, src.Len,
			func(pos int64) {
				elem := raw[offsets[pos]:offsets[pos+1]]
				store(op(ctx, elem, &err))
			},
			func() { store(zero) })
		return err
	}
}
// ScalarUnaryBoolArg wraps a bulk unary operation whose input is passed as
// raw bytes.
//
// The returned kernel hands op the untyped bytes of the first input's
// buffer 1 together with the output's buffer 1 viewed as []OutT. The input
// offset is not applied here; op receives the buffer from its first byte.
func ScalarUnaryBoolArg[OutT arrow.FixedWidthType](op func(*exec.KernelCtx, []byte, []OutT) error) exec.ArrayKernelExec {
	return func(ctx *exec.KernelCtx, input *exec.ExecSpan, out *exec.ExecResult) error {
		dst := exec.GetSpanValues[OutT](out, 1)
		srcBytes := input.Values[0].Array.Buffers[1].Buf
		return op(ctx, srcBytes, dst)
	}
}
// UnboxScalar reinterprets the leading bytes of val.Data() as a value of
// type T and returns it.
//
// No validity or length check is made: val.Data() must contain at least one
// byte (indexing element 0 panics otherwise), and the caller is responsible
// for it holding a full T.
func UnboxScalar[T arrow.FixedWidthType](val scalar.PrimitiveScalar) T {
	raw := val.Data()
	return *(*T)(unsafe.Pointer(&raw[0]))
}
// UnboxBinaryScalar returns the bytes held by val, or nil when val reports
// itself as not valid.
func UnboxBinaryScalar(val scalar.BinaryScalar) []byte {
	if val.IsValid() {
		return val.Data()
	}
	return nil
}
// arrArrFn is a binary operation taking a slice for each argument and a
// slice to write results into.
type arrArrFn[OutT , Arg0T , Arg1T arrow .FixedWidthType ] func (*exec .KernelCtx , []Arg0T , []Arg1T , []OutT ) error
// arrScalarFn is a binary operation taking a slice for the left argument, a
// single value for the right argument, and a slice to write results into.
type arrScalarFn[OutT , Arg0T , Arg1T arrow .FixedWidthType ] func (*exec .KernelCtx , []Arg0T , Arg1T , []OutT ) error
// scalarArrFn is a binary operation taking a single value for the left
// argument, a slice for the right argument, and a slice to write results into.
type scalarArrFn[OutT , Arg0T , Arg1T arrow .FixedWidthType ] func (*exec .KernelCtx , Arg0T , []Arg1T , []OutT ) error
// binaryOps bundles the three shape-specific implementations of one binary
// operation. ScalarBinary selects among them depending on which of the two
// arguments are arrays.
type binaryOps[OutT , Arg0T , Arg1T arrow .FixedWidthType ] struct {
// arrArr handles array (left) with array (right).
arrArr arrArrFn [OutT , Arg0T , Arg1T ]
// arrScalar handles array (left) with scalar (right).
arrScalar arrScalarFn [OutT , Arg0T , Arg1T ]
// scalarArr handles scalar (left) with array (right).
scalarArr scalarArrFn [OutT , Arg0T , Arg1T ]
}
// binaryBoolOps bundles the three shape-specific implementations of one
// binary operation over bitmaps. ScalarBinaryBools selects among them
// depending on which of the two arguments are arrays.
type binaryBoolOps struct {
// arrArr handles bitmap (left) with bitmap (right).
arrArr func (ctx *exec .KernelCtx , lhs, rhs, out bitutil .Bitmap ) error
// arrScalar handles bitmap (left) with a single bool (right).
arrScalar func (ctx *exec .KernelCtx , lhs bitutil .Bitmap , rhs bool , out bitutil .Bitmap ) error
// scalarArr handles a single bool (left) with bitmap (right).
scalarArr func (ctx *exec .KernelCtx , lhs bool , rhs, out bitutil .Bitmap ) error
}
// ScalarBinary wraps a set of bulk binary operations as an
// exec.ArrayKernelExec.
//
// The returned kernel inspects the two batch values and dispatches to
// ops.arrArr, ops.arrScalar or ops.scalarArr. Array arguments and the output
// are passed as typed views of buffer 1; scalar arguments are unboxed with
// UnboxScalar, which requires them to be scalar.PrimitiveScalar (the type
// assertion panics otherwise). Two scalar arguments are not supported and
// yield an arrow.ErrInvalid error.
func ScalarBinary[OutT, Arg0T, Arg1T arrow.FixedWidthType](ops binaryOps[OutT, Arg0T, Arg1T]) exec.ArrayKernelExec {
	return func(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
		lhs, rhs := &batch.Values[0], &batch.Values[1]
		switch {
		case lhs.IsArray() && rhs.IsArray():
			left := exec.GetSpanValues[Arg0T](&lhs.Array, 1)
			right := exec.GetSpanValues[Arg1T](&rhs.Array, 1)
			return ops.arrArr(ctx, left, right, exec.GetSpanValues[OutT](out, 1))
		case lhs.IsArray():
			left := exec.GetSpanValues[Arg0T](&lhs.Array, 1)
			right := UnboxScalar[Arg1T](rhs.Scalar.(scalar.PrimitiveScalar))
			return ops.arrScalar(ctx, left, right, exec.GetSpanValues[OutT](out, 1))
		case rhs.IsArray():
			left := UnboxScalar[Arg0T](lhs.Scalar.(scalar.PrimitiveScalar))
			right := exec.GetSpanValues[Arg1T](&rhs.Array, 1)
			return ops.scalarArr(ctx, left, right, exec.GetSpanValues[OutT](out, 1))
		}

		debug.Assert(false, "should be unreachable")
		return fmt.Errorf("%w: scalar binary with two scalars?", arrow.ErrInvalid)
	}
}
// ScalarBinaryBools wraps a set of bitmap-based binary operations as an
// exec.ArrayKernelExec.
//
// Array arguments and the output are presented to ops as bitutil.Bitmap
// values built from buffer 1 together with the span's Offset and Len. Scalar
// arguments must be *scalar.Boolean (the type assertion panics otherwise) and
// are passed by their Value field. Two scalar arguments are not supported and
// yield an arrow.ErrInvalid error.
func ScalarBinaryBools(ops *binaryBoolOps) exec.ArrayKernelExec {
	// asBitmap views the data buffer of a span as a bitmap.
	asBitmap := func(span *exec.ArraySpan) bitutil.Bitmap {
		return bitutil.Bitmap{Data: span.Buffers[1].Buf, Offset: span.Offset, Len: span.Len}
	}

	return func(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
		lhs, rhs := &batch.Values[0], &batch.Values[1]
		switch {
		case lhs.IsArray() && rhs.IsArray():
			left, right := asBitmap(&lhs.Array), asBitmap(&rhs.Array)
			return ops.arrArr(ctx, left, right, asBitmap(out))
		case lhs.IsArray():
			left := asBitmap(&lhs.Array)
			right := rhs.Scalar.(*scalar.Boolean).Value
			return ops.arrScalar(ctx, left, right, asBitmap(out))
		case rhs.IsArray():
			left := lhs.Scalar.(*scalar.Boolean).Value
			right := asBitmap(&rhs.Array)
			return ops.scalarArr(ctx, left, right, asBitmap(out))
		}

		debug.Assert(false, "should be unreachable")
		return fmt.Errorf("%w: scalar binary with two scalars?", arrow.ErrInvalid)
	}
}
// ScalarBinaryNotNull wraps an element-wise binary operation as an
// exec.ArrayKernelExec.
//
// The returned kernel dispatches on which of the two batch values are arrays.
// In every shape it first takes a fast path: if an array argument's null
// count equals its length, or a scalar argument is not valid, it returns nil
// without touching the output values. Otherwise the validity bitmap(s) are
// walked; the callback receiving a position stores op's result and the
// callback without arguments stores the zero value of OutT (presumably for
// null slots — confirm against the bitutils visitors). Output slots are
// filled sequentially from index 0.
//
// Scalar arguments must be scalar.PrimitiveScalar (the type assertion panics
// otherwise). op reports failures through the *error it is handed; that error
// is returned after the walk. Two scalar arguments are not supported and
// yield an arrow.ErrInvalid error.
func ScalarBinaryNotNull[OutT, Arg0T, Arg1T arrow.FixedWidthType](op func(*exec.KernelCtx, Arg0T, Arg1T, *error) OutT) exec.ArrayKernelExec {
	arrayArray := func(ctx *exec.KernelCtx, arg0, arg1 *exec.ArraySpan, out *exec.ExecResult) error {
		// Fast path: one side is entirely null, so there is nothing to compute.
		if arg0.UpdateNullCount() == arg0.Len || arg1.UpdateNullCount() == arg1.Len {
			return nil
		}

		var (
			a0      = exec.GetSpanValues[Arg0T](arg0, 1)
			a1      = exec.GetSpanValues[Arg1T](arg1, 1)
			outData = exec.GetSpanValues[OutT](out, 1)
			outPos  int64
			def     OutT
			err     error
		)
		bitutils.VisitTwoBitBlocks(arg0.Buffers[0].Buf, arg1.Buffers[0].Buf, arg0.Offset, arg1.Offset, out.Len,
			func(pos int64) {
				outData[outPos] = op(ctx, a0[pos], a1[pos], &err)
				outPos++
			}, func() {
				outData[outPos] = def
				outPos++
			})
		return err
	}

	arrayScalar := func(ctx *exec.KernelCtx, arg0 *exec.ArraySpan, arg1 scalar.Scalar, out *exec.ExecResult) error {
		// Fast path: the array is entirely null or the scalar is null. This is
		// the only validity check needed for arg1 before unboxing it.
		if arg0.UpdateNullCount() == arg0.Len || !arg1.IsValid() {
			return nil
		}

		var (
			a0      = exec.GetSpanValues[Arg0T](arg0, 1)
			outData = exec.GetSpanValues[OutT](out, 1)
			a1      = UnboxScalar[Arg1T](arg1.(scalar.PrimitiveScalar))
			outPos  int64
			def     OutT
			err     error
		)
		bitutils.VisitBitBlocks(arg0.Buffers[0].Buf, arg0.Offset, arg0.Len,
			func(pos int64) {
				outData[outPos] = op(ctx, a0[pos], a1, &err)
				outPos++
			}, func() {
				outData[outPos] = def
				outPos++
			})
		return err
	}

	scalarArray := func(ctx *exec.KernelCtx, arg0 scalar.Scalar, arg1 *exec.ArraySpan, out *exec.ExecResult) error {
		// Fast path: the array is entirely null or the scalar is null. This is
		// the only validity check needed for arg0 before unboxing it.
		if arg1.UpdateNullCount() == arg1.Len || !arg0.IsValid() {
			return nil
		}

		var (
			a1      = exec.GetSpanValues[Arg1T](arg1, 1)
			outData = exec.GetSpanValues[OutT](out, 1)
			a0      = UnboxScalar[Arg0T](arg0.(scalar.PrimitiveScalar))
			outPos  int64
			def     OutT
			err     error
		)
		bitutils.VisitBitBlocks(arg1.Buffers[0].Buf, arg1.Offset, arg1.Len,
			func(pos int64) {
				outData[outPos] = op(ctx, a0, a1[pos], &err)
				outPos++
			}, func() {
				outData[outPos] = def
				outPos++
			})
		return err
	}

	return func(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
		if batch.Values[0].IsArray() {
			if batch.Values[1].IsArray() {
				return arrayArray(ctx, &batch.Values[0].Array, &batch.Values[1].Array, out)
			}
			return arrayScalar(ctx, &batch.Values[0].Array, batch.Values[1].Scalar, out)
		}

		if batch.Values[1].IsArray() {
			return scalarArray(ctx, batch.Values[0].Scalar, &batch.Values[1].Array, out)
		}

		debug.Assert(false, "should be unreachable")
		return fmt.Errorf("%w: scalar binary with two scalars?", arrow.ErrInvalid)
	}
}
// binaryBinOp is an element-wise binary operation over two byte slices that
// produces a single value of type T.
type binaryBinOp[T arrow .FixedWidthType | bool ] func (ctx *exec .KernelCtx , arg0, arg1 []byte ) T
// ScalarBinaryBinaryArgsBoolOut wraps an element-wise operation over two
// byte-slice arguments producing bools as an exec.ArrayKernelExec.
//
// Array arguments are traversed with the iterator returned by itrFn; scalar
// arguments must be scalar.BinaryScalar (the type assertion panics otherwise)
// and are unboxed once with UnboxBinaryScalar. Results are written into the
// output's buffer 1 through bitutils.GenerateBitsUnrolled, using out.Offset
// and out.Len. op has no error channel, so the array/scalar shapes always
// return nil; two scalar arguments yield an arrow.ErrInvalid error.
func ScalarBinaryBinaryArgsBoolOut(itrFn func(*exec.ArraySpan) exec.ArrayIter[[]byte], op binaryBinOp[bool]) exec.ArrayKernelExec {
	// fill writes out.Len bits produced by next into the output bitmap.
	fill := func(out *exec.ExecResult, next func() bool) error {
		bitutils.GenerateBitsUnrolled(out.Buffers[1].Buf, out.Offset, out.Len, next)
		return nil
	}

	return func(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
		lhs, rhs := &batch.Values[0], &batch.Values[1]
		switch {
		case lhs.IsArray() && rhs.IsArray():
			leftIt := itrFn(&lhs.Array)
			rightIt := itrFn(&rhs.Array)
			return fill(out, func() bool {
				return op(ctx, leftIt.Next(), rightIt.Next())
			})
		case lhs.IsArray():
			leftIt := itrFn(&lhs.Array)
			right := UnboxBinaryScalar(rhs.Scalar.(scalar.BinaryScalar))
			return fill(out, func() bool {
				return op(ctx, leftIt.Next(), right)
			})
		case rhs.IsArray():
			rightIt := itrFn(&rhs.Array)
			left := UnboxBinaryScalar(lhs.Scalar.(scalar.BinaryScalar))
			return fill(out, func() bool {
				return op(ctx, left, rightIt.Next())
			})
		}

		debug.Assert(false, "should be unreachable")
		return fmt.Errorf("%w: scalar binary with two scalars?", arrow.ErrInvalid)
	}
}
// SizeOf returns the size in bytes of the integer type T.
//
// The size is taken from unsafe.Sizeof on a zero value of T. An earlier
// formulation derived it from shifted constants converted through uint, which
// under-reports 64-bit types on platforms where uint is 32 bits wide; this
// form is independent of the platform's uint width.
func SizeOf[T constraints.Integer]() uint {
	var zero T
	return uint(unsafe.Sizeof(zero))
}
// MinOf returns the smallest value representable by the integer type T:
// zero for unsigned types, and the value with only the sign bit set for
// signed types.
func MinOf[T constraints.Integer]() T {
	allOnes := ^T(0)
	if allOnes >= 0 {
		// ^0 is non-negative only for unsigned types.
		return 0
	}
	// Shifting -1 left by (bit width - 1) leaves just the sign bit.
	return allOnes << (8*SizeOf[T]() - 1)
}
// MaxOf returns the largest value representable by the integer type T:
// all bits set for unsigned types, and all bits except the sign bit for
// signed types.
func MaxOf[T constraints.Integer]() T {
	allOnes := ^T(0)
	if allOnes >= 0 {
		// Unsigned: every bit set is already the maximum.
		return allOnes
	}
	// Signed: clear the sign bit from the all-ones pattern.
	signBit := allOnes << (8*SizeOf[T]() - 1)
	return allOnes ^ signBit
}
// getSafeMinSameSign returns, expressed as an I, the lower bound to check
// when converting from I to O: the minimum of O when I is strictly wider
// than O, otherwise the minimum of I. The name indicates it is meant for I
// and O of the same signedness.
func getSafeMinSameSign[I, O constraints.Integer]() I {
	if SizeOf[I]() <= SizeOf[O]() {
		return MinOf[I]()
	}
	return I(MinOf[O]())
}
// getSafeMaxSameSign returns, expressed as an I, the upper bound to check
// when converting from I to O: the maximum of O when I is strictly wider
// than O, otherwise the maximum of I. The name indicates it is meant for I
// and O of the same signedness.
func getSafeMaxSameSign[I, O constraints.Integer]() I {
	if SizeOf[I]() <= SizeOf[O]() {
		return MaxOf[I]()
	}
	return I(MaxOf[O]())
}
// getSafeMaxSignedUnsigned returns, expressed as the signed type I, the
// upper bound to check when converting from I to the unsigned type O: the
// maximum of O when I is strictly wider than O, otherwise the maximum of I.
func getSafeMaxSignedUnsigned[I constraints.Signed, O constraints.Unsigned]() I {
	if SizeOf[I]() > SizeOf[O]() {
		return I(MaxOf[O]())
	}
	return MaxOf[I]()
}
// getSafeMaxUnsignedSigned returns, expressed as the unsigned type I, the
// upper bound to check when converting from I to the signed type O: the
// maximum of O when I is at least as wide as O, otherwise the maximum of I.
func getSafeMaxUnsignedSigned[I constraints.Unsigned, O constraints.Signed]() I {
	if SizeOf[I]() >= SizeOf[O]() {
		return I(MaxOf[O]())
	}
	return MaxOf[I]()
}
// getSafeMinMaxSigned returns the inclusive bounds, expressed in the signed
// source type T, that are checked when converting to the integer type
// identified by target. For unsigned targets the lower bound is zero. For any
// target not listed here both bounds are zero.
func getSafeMinMaxSigned[T constraints.Signed](target arrow.Type) (min, max T) {
	switch target {
	case arrow.INT8:
		return getSafeMinSameSign[T, int8](), getSafeMaxSameSign[T, int8]()
	case arrow.INT16:
		return getSafeMinSameSign[T, int16](), getSafeMaxSameSign[T, int16]()
	case arrow.INT32:
		return getSafeMinSameSign[T, int32](), getSafeMaxSameSign[T, int32]()
	case arrow.INT64:
		return getSafeMinSameSign[T, int64](), getSafeMaxSameSign[T, int64]()
	case arrow.UINT8:
		return 0, getSafeMaxSignedUnsigned[T, uint8]()
	case arrow.UINT16:
		return 0, getSafeMaxSignedUnsigned[T, uint16]()
	case arrow.UINT32:
		return 0, getSafeMaxSignedUnsigned[T, uint32]()
	case arrow.UINT64:
		return 0, getSafeMaxSignedUnsigned[T, uint64]()
	}
	return 0, 0
}
// getSafeMinMaxUnsigned returns the inclusive bounds, expressed in the
// unsigned source type T, that are checked when converting to the integer
// type identified by target. The lower bound is always zero. For any target
// not listed here the upper bound is zero as well.
func getSafeMinMaxUnsigned[T constraints.Unsigned](target arrow.Type) (min, max T) {
	switch target {
	case arrow.INT8:
		return 0, getSafeMaxUnsignedSigned[T, int8]()
	case arrow.INT16:
		return 0, getSafeMaxUnsignedSigned[T, int16]()
	case arrow.INT32:
		return 0, getSafeMaxUnsignedSigned[T, int32]()
	case arrow.INT64:
		return 0, getSafeMaxUnsignedSigned[T, int64]()
	case arrow.UINT8:
		return 0, getSafeMaxSameSign[T, uint8]()
	case arrow.UINT16:
		return 0, getSafeMaxSameSign[T, uint16]()
	case arrow.UINT32:
		return 0, getSafeMaxSameSign[T, uint32]()
	case arrow.UINT64:
		return 0, getSafeMaxSameSign[T, uint64]()
	}
	return 0, 0
}
// intsCanFit reports whether every integer in data lies within the bounds
// computed for the target integer type.
//
// It returns an arrow.ErrInvalid error when target is not an integer type or
// when data's own type is not one of the eight fixed-width integer types;
// otherwise it returns whatever intsInRange reports for the computed bounds.
func intsCanFit(data *exec.ArraySpan, target arrow.Type) error {
	if !arrow.IsInteger(target) {
		return fmt.Errorf("%w: target type is not an integer type %s", arrow.ErrInvalid, target)
	}

	// Go generics need a concrete type per branch, hence one case per type.
	switch srcID := data.Type.ID(); srcID {
	case arrow.INT8:
		lo, hi := getSafeMinMaxSigned[int8](target)
		return intsInRange(data, lo, hi)
	case arrow.UINT8:
		lo, hi := getSafeMinMaxUnsigned[uint8](target)
		return intsInRange(data, lo, hi)
	case arrow.INT16:
		lo, hi := getSafeMinMaxSigned[int16](target)
		return intsInRange(data, lo, hi)
	case arrow.UINT16:
		lo, hi := getSafeMinMaxUnsigned[uint16](target)
		return intsInRange(data, lo, hi)
	case arrow.INT32:
		lo, hi := getSafeMinMaxSigned[int32](target)
		return intsInRange(data, lo, hi)
	case arrow.UINT32:
		lo, hi := getSafeMinMaxUnsigned[uint32](target)
		return intsInRange(data, lo, hi)
	case arrow.INT64:
		lo, hi := getSafeMinMaxSigned[int64](target)
		return intsInRange(data, lo, hi)
	case arrow.UINT64:
		lo, hi := getSafeMinMaxUnsigned[uint64](target)
		return intsInRange(data, lo, hi)
	default:
		return fmt.Errorf("%w: invalid type for int bounds checking", arrow.ErrInvalid)
	}
}
// intsInRange checks that every non-null value in data lies within
// [lowerBound, upperBound].
//
// It returns nil when all checked values are in range, and otherwise an
// arrow.ErrInvalid error naming the first offending value in array order.
// When the bounds already cover the whole range of T nothing is scanned.
//
// The array is processed in blocks from an optional bit-block counter over
// the validity bitmap. Whether a slot's validity bit is consulted is decided
// per block from that block's popcount rather than from data.Nulls. This
// avoids reporting a value stored under a null slot when data.Nulls is not a
// positive count, and avoids bitmap reads for fully valid blocks.
func intsInRange[T arrow.IntType | arrow.UintType](data *exec.ArraySpan, lowerBound, upperBound T) error {
	if MinOf[T]() >= lowerBound && MaxOf[T]() <= upperBound {
		return nil
	}

	isOutOfBounds := func(val T) bool {
		return val < lowerBound || val > upperBound
	}
	getError := func(val T) error {
		return fmt.Errorf("%w: integer value %d not in range: %d to %d",
			arrow.ErrInvalid, val, lowerBound, upperBound)
	}

	values := exec.GetSpanValues[T](data, 1)
	bitmap := data.Buffers[0].Buf
	bitCounter := bitutils.NewOptionalBitBlockCounter(bitmap, data.Offset, data.Len)

	// pos counts processed elements; offsetPos is the absolute bit index of
	// the current block's first element within bitmap.
	pos, offsetPos := 0, int(data.Offset)
	for pos < int(data.Len) {
		block := bitCounter.NextBlock()
		blockLen := int(block.Len)

		switch {
		case block.Popcnt == block.Len:
			// Every slot in the block is valid: no bitmap lookups needed.
			for i := 0; i < blockLen; i++ {
				if isOutOfBounds(values[i]) {
					return getError(values[i])
				}
			}
		case block.Popcnt > 0:
			// Mixed block: only valid slots are range checked.
			for i := 0; i < blockLen; i++ {
				if bitutil.BitIsSet(bitmap, offsetPos+i) && isOutOfBounds(values[i]) {
					return getError(values[i])
				}
			}
		}
		// A block with Popcnt == 0 holds only nulls and is skipped.

		values = values[blockLen:]
		pos += blockLen
		offsetPos += blockLen
	}
	return nil
}
// numeric is the type constraint for memCpySpan: the signed and unsigned
// integer types plus the floating point types in constraints.Float.
type numeric interface {
arrow .IntType | arrow .UintType | constraints .Float
}
// memCpySpan copies the values of in into out, viewing buffer 1 of both
// spans as []T. As with the built-in copy, the number of elements copied is
// the smaller of the two slice lengths.
func memCpySpan[T numeric](in, out *exec.ArraySpan) {
	src := exec.GetSpanValues[T](in, 1)
	dst := exec.GetSpanValues[T](out, 1)
	_ = copy(dst, src)
}
// castNumberMemCpy copies the values of in to out without conversion,
// choosing the element type from in's type ID. It is meant for the case
// where input and output have the same numeric type.
//
// FLOAT16 is copied through a 16-bit integer view because the numeric
// constraint has no half-float member; the copy is bit-for-bit, so the
// element type only determines the width. Type IDs not listed are ignored.
func castNumberMemCpy(in, out *exec.ArraySpan) {
	switch in.Type.ID() {
	case arrow.INT8:
		memCpySpan[int8](in, out)
	case arrow.UINT8:
		memCpySpan[uint8](in, out)
	case arrow.INT16:
		memCpySpan[int16](in, out)
	case arrow.UINT16:
		memCpySpan[uint16](in, out)
	case arrow.INT32:
		memCpySpan[int32](in, out)
	case arrow.UINT32:
		memCpySpan[uint32](in, out)
	case arrow.INT64:
		memCpySpan[int64](in, out)
	case arrow.UINT64:
		memCpySpan[uint64](in, out)
	case arrow.FLOAT16:
		// Same-type half-float casts reach this function too; without this
		// case the output would be left unwritten.
		memCpySpan[uint16](in, out)
	case arrow.FLOAT32:
		memCpySpan[float32](in, out)
	case arrow.FLOAT64:
		memCpySpan[float64](in, out)
	}
}
// castNumberToNumberUnsafe converts the numeric values of in into out
// without any range checking.
//
// Identical input and output type IDs are handled by castNumberMemCpy.
// Otherwise the raw value buffers, advanced by each span's offset in bytes,
// are passed to castNumericGo when either side is FLOAT16 and to
// castNumericUnsafe in all other cases, together with in.Len.
func castNumberToNumberUnsafe(in, out *exec.ArraySpan) {
	inID, outID := in.Type.ID(), out.Type.ID()
	if inID == outID {
		castNumberMemCpy(in, out)
		return
	}

	// Byte offsets of the first element in each buffer.
	inStart := in.Type.(arrow.FixedWidthDataType).Bytes() * int(in.Offset)
	outStart := out.Type.(arrow.FixedWidthDataType).Bytes() * int(out.Offset)
	src, dst := in.Buffers[1].Buf[inStart:], out.Buffers[1].Buf[outStart:]
	count := int(in.Len)

	if inID == arrow.FLOAT16 || outID == arrow.FLOAT16 {
		castNumericGo(inID, outID, src, dst, count)
		return
	}
	castNumericUnsafe(inID, outID, src, dst, count)
}
// MaxDecimalDigitsForInt returns the number of decimal digits associated
// with the integer type id: 3 for 8-bit, 5 for 16-bit, 10 for 32-bit, 19 for
// INT64 and 20 for UINT64. For any other type it returns -1 and an
// arrow.ErrInvalid error.
func MaxDecimalDigitsForInt(id arrow.Type) (int32, error) {
	var digits int32
	switch id {
	case arrow.INT8, arrow.UINT8:
		digits = 3
	case arrow.INT16, arrow.UINT16:
		digits = 5
	case arrow.INT32, arrow.UINT32:
		digits = 10
	case arrow.INT64:
		digits = 19
	case arrow.UINT64:
		digits = 20
	default:
		return -1, fmt.Errorf("%w: not an integer type: %s", arrow.ErrInvalid, id)
	}
	return digits, nil
}
// ResolveOutputFromOptions returns the ToType of the CastState stored in
// ctx.State as the kernel's output type. The argument types are ignored.
//
// If ctx.State does not hold a CastState, an arrow.ErrInvalid error is
// returned instead of panicking on the type assertion.
func ResolveOutputFromOptions(ctx *exec.KernelCtx, _ []arrow.DataType) (arrow.DataType, error) {
	opts, ok := ctx.State.(CastState)
	if !ok {
		return nil, fmt.Errorf("%w: cannot resolve output type, kernel state is %T rather than CastState",
			arrow.ErrInvalid, ctx.State)
	}
	return opts.ToType, nil
}
// OutputTargetType is a computed output type that resolves through
// ResolveOutputFromOptions.
var OutputTargetType = exec .NewComputedOutputType (ResolveOutputFromOptions )
// OutputFirstType is a computed output type that resolves to the type of the
// first argument. The resolver indexes args[0], so it needs at least one
// argument.
var OutputFirstType = exec .NewComputedOutputType (func (_ *exec .KernelCtx , args []arrow .DataType ) (arrow .DataType , error ) {
return args [0 ], nil
})
// OutputLastType is a computed output type that resolves to the type of the
// last argument. The resolver indexes args[len(args)-1], so it needs at
// least one argument.
var OutputLastType = exec .NewComputedOutputType (func (_ *exec .KernelCtx , args []arrow .DataType ) (arrow .DataType , error ) {
return args [len (args )-1 ], nil
})
// resolveDecimalBinaryOpOutput computes the output type of a binary decimal
// operation.
//
// The first two entries of types must be arrow.DecimalType (the assertions
// panic otherwise). Their precisions and scales are passed to resolver, and
// a decimal type with the left operand's type ID and the resolved precision
// and scale is built through arrow.NewDecimalType.
func resolveDecimalBinaryOpOutput(types []arrow.DataType, resolver func(prec1, scale1, prec2, scale2 int32) (prec, scale int32)) (arrow.DataType, error) {
	lhs := types[0].(arrow.DecimalType)
	rhs := types[1].(arrow.DecimalType)
	debug.Assert(lhs.ID() == rhs.ID(), "decimal binary ops should have casted to the same type")

	lhsPrec, lhsScale := lhs.GetPrecision(), lhs.GetScale()
	rhsPrec, rhsScale := rhs.GetPrecision(), rhs.GetScale()
	outPrec, outScale := resolver(lhsPrec, lhsScale, rhsPrec, rhsScale)
	return arrow.NewDecimalType(lhs.ID(), outPrec, outScale)
}
// resolveDecimalAddOrSubtractType resolves the output type of decimal
// addition or subtraction. Both operands are expected to share a scale
// (asserted in debug builds). The result keeps that scale, and its precision
// is the larger count of integer digits of the two operands, plus the scale,
// plus one.
func resolveDecimalAddOrSubtractType(_ *exec.KernelCtx, args []arrow.DataType) (arrow.DataType, error) {
	addSub := func(prec1, scale1, prec2, scale2 int32) (int32, int32) {
		debug.Assert(scale1 == scale2, "decimal operations should use the same scale")
		intDigits := exec.Max(prec1-scale1, prec2-scale2)
		return intDigits + scale1 + 1, scale1
	}
	return resolveDecimalBinaryOpOutput(args, addSub)
}
// resolveDecimalMultiplyOutput resolves the output type of decimal
// multiplication: the scale is the sum of the operand scales and the
// precision is the sum of the operand precisions plus one.
func resolveDecimalMultiplyOutput(_ *exec.KernelCtx, args []arrow.DataType) (arrow.DataType, error) {
	multiply := func(prec1, scale1, prec2, scale2 int32) (int32, int32) {
		return prec1 + prec2 + 1, scale1 + scale2
	}
	return resolveDecimalBinaryOpOutput(args, multiply)
}
// resolveDecimalDivideOutput resolves the output type of decimal division:
// the scale is the numerator scale minus the denominator scale, and the
// precision is the numerator's precision. The numerator scale is expected to
// be at least the denominator scale (asserted in debug builds).
func resolveDecimalDivideOutput(_ *exec.KernelCtx, args []arrow.DataType) (arrow.DataType, error) {
	divide := func(prec1, scale1, prec2, scale2 int32) (int32, int32) {
		debug.Assert(scale1 >= scale2, "when dividing decimal values numerator scale should be greater/equal to denom scale")
		return prec1, scale1 - scale2
	}
	return resolveDecimalBinaryOpOutput(args, divide)
}
// resolveTemporalOutput resolves the output type for an operation on two
// timestamps, which is a duration carrying the right operand's unit.
//
// Both arguments must be *arrow.TimestampType (the assertions panic
// otherwise) and are expected to share a unit (asserted in debug builds). An
// arrow.ErrInvalid error is returned when exactly one of the two has an empty
// time zone, since mixing zoned and non-zoned times is ambiguous.
func resolveTemporalOutput(_ *exec.KernelCtx, args []arrow.DataType) (arrow.DataType, error) {
	debug.Assert(args[0].ID() == args[1].ID(), "should only be used on the same types")
	lhs := args[0].(*arrow.TimestampType)
	rhs := args[1].(*arrow.TimestampType)
	debug.Assert(lhs.Unit == rhs.Unit, "should match units")

	// Exactly one side lacking a zone is the ambiguous combination.
	lhsNaive, rhsNaive := lhs.TimeZone == "", rhs.TimeZone == ""
	if lhsNaive != rhsNaive {
		return nil, fmt.Errorf("%w: subtraction of zoned and non-zoned times is ambiguous (%s, %s)",
			arrow.ErrInvalid, lhs.TimeZone, rhs.TimeZone)
	}

	return &arrow.DurationType{Unit: rhs.Unit}, nil
}
// OutputResolveTemporal is a computed output type that resolves through
// resolveTemporalOutput.
var OutputResolveTemporal = exec .NewComputedOutputType (resolveTemporalOutput )
// validityBuilder incrementally builds a bitmap one bit (or run of bits) at
// a time, tracking how many bits were appended and how many were false.
type validityBuilder struct {
// mem is the allocator used when the backing buffer is created.
mem memory .Allocator
// buffer is the backing buffer; it is nil until first needed and again
// after Finish.
buffer *memory .Buffer
// data is the byte slice that bits are written into, refreshed from buffer
// by Resize and Reserve.
data []byte
// bitLength is the number of bits appended so far.
bitLength int
// falseCount is the number of appended bits that were false.
falseCount int
}
// Resize sets the backing buffer to the number of bytes needed to hold n
// bits, without shrinking its capacity, creating the buffer first when there
// is none. The write slice is refreshed from the buffer's bytes afterwards.
func (v *validityBuilder) Resize(n int64) {
	if v.buffer == nil {
		v.buffer = memory.NewResizableBuffer(v.mem)
	}

	numBytes := int(bitutil.BytesForBits(n))
	v.buffer.ResizeNoShrink(numBytes)
	v.data = v.buffer.Bytes()
}
// Reserve ensures the backing buffer has room for n more bits beyond the
// bits already appended, creating the buffer first when there is none. The
// write slice is refreshed from the buffer afterwards.
//
// The requirement is computed from the current bit length rather than from
// the current capacity, so calls that already fit do not request a larger
// buffer. When growth is needed the request is at least double the current
// capacity, which keeps repeated Append calls amortized.
func (v *validityBuilder) Reserve(n int64) {
	if v.buffer == nil {
		v.buffer = memory.NewResizableBuffer(v.mem)
	}

	needed := int(bitutil.BytesForBits(int64(v.bitLength) + n))
	if current := v.buffer.Cap(); needed > current {
		if doubled := 2 * current; doubled > needed {
			needed = doubled
		}
		v.buffer.Reserve(needed)
	}
	v.data = v.buffer.Buf()
}
// UnsafeAppend writes val at the next bit position and updates the bit and
// false counters. It does not grow the buffer: the caller must have made
// room beforehand (see Reserve and Resize).
func (v *validityBuilder) UnsafeAppend(val bool) {
	bitutil.SetBitTo(v.data, v.bitLength, val)
	v.bitLength++
	if val {
		return
	}
	v.falseCount++
}
// UnsafeAppendN writes n copies of val starting at the next bit position and
// updates the bit and false counters. It does not grow the buffer: the
// caller must have made room beforehand (see Reserve and Resize).
func (v *validityBuilder) UnsafeAppendN(n int64, val bool) {
	start, count := int64(v.bitLength), int(n)
	bitutil.SetBitsTo(v.data, start, n, val)
	v.bitLength += count
	if val {
		return
	}
	v.falseCount += count
}
// Append makes room for one more bit via Reserve and then appends val.
func (v *validityBuilder) Append(val bool) {
	const oneBit = 1
	v.Reserve(oneBit)
	v.UnsafeAppend(val)
}
// AppendN makes room for n more bits via Reserve and then appends n copies
// of val.
func (v *validityBuilder) AppendN(n int64, val bool) {
	count := n
	v.Reserve(count)
	v.UnsafeAppendN(count, val)
}
// Finish hands the backing buffer over to the caller and resets the bit and
// false counters.
//
// If any bits were appended, the buffer is first resized to exactly the
// bytes needed for them. The builder drops its reference to the buffer, so
// the result is nil when no buffer was ever created.
func (v *validityBuilder) Finish() (buf *memory.Buffer) {
	if nbits := v.bitLength; nbits > 0 {
		v.buffer.Resize(int(bitutil.BytesForBits(int64(nbits))))
	}

	v.bitLength = 0
	v.falseCount = 0
	buf, v.buffer = v.buffer, nil
	return buf
}
// execBufBuilder incrementally builds a byte buffer, tracking the number of
// bytes written so far.
type execBufBuilder struct {
// mem is the allocator used when the backing buffer is created.
mem memory .Allocator
// buffer is the backing buffer; it is nil until reserve is first called and
// again after finish.
buffer *memory .Buffer
// data is the byte slice that appended bytes are copied into, refreshed
// from buffer by reserve.
data []byte
// sz is the number of bytes appended so far.
sz int
}
// reserve makes sure there is room for additional more bytes beyond those
// already written, creating the backing buffer first when there is none. If
// the current write slice already has enough capacity nothing else happens;
// otherwise the buffer is resized (without shrinking) to the required size
// and the write slice is refreshed from it.
func (bldr *execBufBuilder) reserve(additional int) {
	if bldr.buffer == nil {
		bldr.buffer = memory.NewResizableBuffer(bldr.mem)
	}

	if required := bldr.sz + additional; required > cap(bldr.data) {
		bldr.buffer.ResizeNoShrink(required)
		bldr.data = bldr.buffer.Buf()
	}
}
// unsafeAppend copies data to the current write position and advances the
// size by len(data). It does not grow the buffer: the caller must have
// reserved enough room, because the size advances by the full length even if
// fewer bytes fit.
func (bldr *execBufBuilder) unsafeAppend(data []byte) {
	dst := bldr.data[bldr.sz:]
	copy(dst, data)
	bldr.sz += len(data)
}
// finish hands the built buffer over to the caller and resets the builder
// so it can be reused.
//
// When no buffer was ever created, a new buffer wrapping nil bytes is
// returned. Otherwise the buffer is resized to the number of bytes written
// and returned. The write slice is cleared as well: leaving it in place
// would let a later reserve see the old capacity, skip allocating, and have
// appends write into the buffer that was already handed out.
func (bldr *execBufBuilder) finish() (buf *memory.Buffer) {
	if bldr.buffer == nil {
		buf = memory.NewBufferBytes(nil)
		return buf
	}
	bldr.buffer.Resize(bldr.sz)
	buf = bldr.buffer
	bldr.buffer, bldr.data, bldr.sz = nil, nil, 0
	return buf
}
// bufferBuilder is a typed layer over execBufBuilder whose methods count in
// elements of T rather than bytes.
type bufferBuilder[T arrow.FixedWidthType] struct {
	execBufBuilder
	// zero is never assigned; it exists so unsafe.Sizeof can measure T.
	zero T
}

// newBufferBuilder returns an empty bufferBuilder that will allocate from mem.
func newBufferBuilder[T arrow.FixedWidthType](mem memory.Allocator) *bufferBuilder[T] {
	bldr := new(bufferBuilder[T])
	bldr.mem = mem
	return bldr
}

// reserve makes room for additional more elements of T.
func (b *bufferBuilder[T]) reserve(additional int) {
	elemSize := int(unsafe.Sizeof(b.zero))
	b.execBufBuilder.reserve(additional * elemSize)
}

// unsafeAppend appends the bytes of a single value. Room must already have
// been reserved.
func (b *bufferBuilder[T]) unsafeAppend(value T) {
	single := []T{value}
	b.execBufBuilder.unsafeAppend(arrow.GetBytes(single))
}

// unsafeAppendSlice appends the bytes of every value in values. Room must
// already have been reserved.
func (b *bufferBuilder[T]) unsafeAppendSlice(values []T) {
	raw := arrow.GetBytes(values)
	b.execBufBuilder.unsafeAppend(raw)
}

// len returns the number of whole elements of T written so far.
func (b *bufferBuilder[T]) len() int {
	elemSize := int(unsafe.Sizeof(b.zero))
	return b.sz / elemSize
}

// cap returns the number of whole elements of T that fit in the current
// write slice's capacity.
func (b *bufferBuilder[T]) cap() int {
	elemSize := int(unsafe.Sizeof(b.zero))
	return cap(b.data) / elemSize
}
// checkIndexBoundsImpl verifies that every index visited in values lies in
// [0, upperLimit).
//
// Positions are visited in runs via bitutils.VisitSetBitRuns over the
// validity bitmap (buffer 0). The first visited index that is negative (for
// signed index types) or not below upperLimit produces an arrow.ErrIndex
// error naming it. For unsigned index types whose maximum value is below
// upperLimit no scan is needed and nil is returned immediately.
func checkIndexBoundsImpl[T arrow.IntType | arrow.UintType](values *exec.ArraySpan, upperLimit uint64) error {
	signed := !arrow.IsUnsignedInteger(values.Type.ID())
	if !signed && upperLimit > uint64(MaxOf[T]()) {
		return nil
	}

	indices := exec.GetSpanValues[T](values, 1)
	// invalid reports whether idx falls outside [0, upperLimit).
	invalid := func(idx T) bool {
		if idx < 0 {
			return signed
		}
		return uint64(idx) >= upperLimit
	}

	checkRun := func(pos, length int64) error {
		for end := pos + length; pos < end; pos++ {
			if idx := indices[pos]; invalid(idx) {
				return fmt.Errorf("%w: %d out of bounds",
					arrow.ErrIndex, idx)
			}
		}
		return nil
	}
	return bitutils.VisitSetBitRuns(values.Buffers[0].Buf, values.Offset, values.Len, checkRun)
}
// checkIndexBounds verifies that the indices in values lie in
// [0, upperLimit), dispatching on the span's integer type ID to the matching
// instantiation of checkIndexBoundsImpl. A span whose type is not one of the
// eight fixed-width integer types yields an arrow.ErrInvalid error.
func checkIndexBounds(values *exec.ArraySpan, upperLimit uint64) error {
	// Go generics need a concrete type per branch, hence one case per type.
	switch indexType := values.Type.ID(); indexType {
	case arrow.INT8:
		return checkIndexBoundsImpl[int8](values, upperLimit)
	case arrow.INT16:
		return checkIndexBoundsImpl[int16](values, upperLimit)
	case arrow.INT32:
		return checkIndexBoundsImpl[int32](values, upperLimit)
	case arrow.INT64:
		return checkIndexBoundsImpl[int64](values, upperLimit)
	case arrow.UINT8:
		return checkIndexBoundsImpl[uint8](values, upperLimit)
	case arrow.UINT16:
		return checkIndexBoundsImpl[uint16](values, upperLimit)
	case arrow.UINT32:
		return checkIndexBoundsImpl[uint32](values, upperLimit)
	case arrow.UINT64:
		return checkIndexBoundsImpl[uint64](values, upperLimit)
	}
	return fmt.Errorf("%w: invalid index type for bounds checking", arrow.ErrInvalid)
}
// checkIndexBoundsChunked runs checkIndexBounds over every chunk of values
// in order and returns the first error encountered, or nil when all chunks
// pass. A single ArraySpan is reused across chunks.
func checkIndexBoundsChunked(values *arrow.Chunked, upperLimit uint64) error {
	var (
		span   exec.ArraySpan
		chunks = values.Chunks()
	)
	for i := range chunks {
		span.SetMembers(chunks[i].Data())
		err := checkIndexBounds(&span, upperLimit)
		if err != nil {
			return err
		}
	}
	return nil
}
// packBits packs 32 values into the first four bytes of out, eight values
// per byte. Within each byte, value k of the group is shifted left by k and
// OR-ed in, so for 0/1 inputs the first value of a group lands in the least
// significant bit. Only the low eight bits of each combined group are kept.
// out must hold at least four bytes.
func packBits(vals [32]uint32, out []byte) {
	const (
		batchSize   = 32
		bitsPerByte = 8
	)
	for byteIdx := 0; byteIdx < batchSize/bitsPerByte; byteIdx++ {
		group := vals[byteIdx*bitsPerByte : (byteIdx+1)*bitsPerByte]
		var packed uint32
		for shift, v := range group {
			packed |= v << shift
		}
		out[byteIdx] = byte(packed)
	}
}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.