package compute
import (
"context"
"errors"
"fmt"
"math"
"runtime"
"sync"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/bitutil"
"github.com/apache/arrow-go/v18/arrow/compute/exec"
"github.com/apache/arrow-go/v18/arrow/internal"
"github.com/apache/arrow-go/v18/arrow/internal/debug"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/arrow/scalar"
)
// ExecCtx holds per-call execution options for compute functions.
// Attach one to a context with SetExecCtx; GetExecCtx falls back to the
// package default populated in init.
type ExecCtx struct {
	// ChunkSize is the maximum number of rows per ExecSpan when a batch
	// is iterated (see iterateExecSpans).
	ChunkSize int64
	// PreallocContiguous, when true, lets eligible scalar kernels write
	// all spans into one contiguous preallocated output buffer.
	PreallocContiguous bool
	// Registry is the function registry used for lookups
	// (presumably by the function-calling layer; not referenced in this file).
	Registry FunctionRegistry
	// ExecChannelSize is the buffer size for result channels
	// (consumed elsewhere; not referenced in this file).
	ExecChannelSize int
	// NumParallel is the desired degree of parallelism
	// (consumed elsewhere; not referenced in this file).
	NumParallel int
}
// ctxExecKey is the unexported context key under which an ExecCtx is
// stored (see SetExecCtx / GetExecCtx).
type ctxExecKey struct{}

// DefaultMaxChunkSize effectively disables chunking: a batch is
// processed as a single maximal span unless a smaller size is set.
const DefaultMaxChunkSize = math.MaxInt64

var (
	// defaultExecCtx is the fallback ExecCtx returned by GetExecCtx when
	// the context carries none; it is populated in init.
	defaultExecCtx ExecCtx

	// WithAllocator / GetAllocator re-export the exec package helpers
	// for attaching and retrieving a memory allocator on a context.
	WithAllocator = exec.WithAllocator
	GetAllocator  = exec.GetAllocator
)
// DefaultExecCtx returns a copy of the package-level default execution
// options used when a context does not carry its own ExecCtx.
func DefaultExecCtx() ExecCtx {
	cp := defaultExecCtx
	return cp
}
// init populates the package default ExecCtx: unbounded chunk size,
// contiguous preallocation enabled, the global function registry, a
// result-channel buffer of 10, and parallelism equal to the CPU count.
func init() {
	defaultExecCtx.ChunkSize = DefaultMaxChunkSize
	defaultExecCtx.PreallocContiguous = true
	defaultExecCtx.Registry = GetFunctionRegistry()
	defaultExecCtx.ExecChannelSize = 10
	defaultExecCtx.NumParallel = runtime.NumCPU()
}
// SetExecCtx returns a child context carrying e, to be retrieved later
// via GetExecCtx.
func SetExecCtx(ctx context.Context, e ExecCtx) context.Context {
	key := ctxExecKey{}
	return context.WithValue(ctx, key, e)
}
// GetExecCtx returns the ExecCtx previously attached with SetExecCtx,
// or the package default when ctx carries none.
func GetExecCtx(ctx context.Context) ExecCtx {
	if e, ok := ctx.Value(ctxExecKey{}).(ExecCtx); ok {
		return e
	}
	return defaultExecCtx
}
// ExecBatch is a unit of work for a kernel: the argument Datums
// (arrays, chunked arrays, or scalars) plus the logical row count they
// represent.
type ExecBatch struct {
	// Values are the function arguments for this batch.
	Values []Datum
	// Len is the logical length of the batch in rows.
	Len int64
}
// NumValues reports how many argument Datums the batch carries.
func (e ExecBatch) NumValues() int {
	return len(e.Values)
}
// bufferPrealloc describes one output data buffer to allocate up front:
// the per-element bit width and any extra trailing elements (addLen),
// e.g. the +1 slot used by offsets buffers.
type bufferPrealloc struct {
	bitWidth int
	addLen   int
}
// allocateDataBuffer allocates an output buffer for length elements of
// the given bit width; width 1 is treated as a validity-style bitmap.
func allocateDataBuffer(ctx *exec.KernelCtx, length, bitWidth int) *memory.Buffer {
	if bitWidth == 1 {
		return ctx.AllocateBitmap(int64(length))
	}
	nbytes := bitutil.BytesForBits(int64(length * bitWidth))
	return ctx.Allocate(int(nbytes))
}
// addComputeDataPrealloc appends the data-buffer preallocation spec for
// dt to widths: the element width for fixed-width types, a 32/64-bit
// offsets buffer (with one extra slot) for varlen/list types, or the
// view-header width for view types. Other types append nothing.
func addComputeDataPrealloc(dt arrow.DataType, widths []bufferPrealloc) []bufferPrealloc {
	if fw, ok := dt.(arrow.FixedWidthDataType); ok {
		return append(widths, bufferPrealloc{bitWidth: fw.BitWidth()})
	}

	switch dt.ID() {
	case arrow.BINARY, arrow.STRING, arrow.LIST, arrow.MAP:
		// 32-bit offsets need length+1 entries
		widths = append(widths, bufferPrealloc{bitWidth: 32, addLen: 1})
	case arrow.LARGE_BINARY, arrow.LARGE_STRING, arrow.LARGE_LIST:
		// 64-bit offsets need length+1 entries
		widths = append(widths, bufferPrealloc{bitWidth: 64, addLen: 1})
	case arrow.STRING_VIEW, arrow.BINARY_VIEW:
		widths = append(widths, bufferPrealloc{bitWidth: arrow.ViewHeaderSizeBytes * 8})
	}
	return widths
}
// nullGeneralization classifies an input's contribution to output
// validity: it may contain nulls, is known all-valid, or is all-null.
type nullGeneralization int8

const (
	nullGenPerhapsNull nullGeneralization = iota
	nullGenAllValid
	nullGenAllNull
)
// getNullGen classifies val's validity: null-typed values are all-null,
// types without validity bitmaps are all-valid, scalars follow their
// IsValid flag, and arrays are inspected via null count and bitmap.
func getNullGen(val *exec.ExecValue) nullGeneralization {
	id := val.Type().ID()
	if id == arrow.NULL {
		return nullGenAllNull
	}
	if !internal.DefaultHasValidityBitmap(id) {
		return nullGenAllValid
	}
	if val.IsScalar() {
		if val.Scalar.IsValid() {
			return nullGenAllValid
		}
		return nullGenAllNull
	}

	arr := val.Array
	// no recorded nulls or no bitmap at all means everything is valid
	if arr.Nulls == 0 || arr.Buffers[0].Buf == nil {
		return nullGenAllValid
	}
	if arr.Nulls == arr.Len {
		return nullGenAllNull
	}
	return nullGenPerhapsNull
}
// getNullGenDatum adapts a Datum into an ExecValue and classifies its
// validity with getNullGen. Chunked datums are conservatively reported
// as possibly-null.
func getNullGenDatum(datum Datum) nullGeneralization {
	var v exec.ExecValue
	switch datum.Kind() {
	case KindArray:
		v.Array.SetMembers(datum.(*ArrayDatum).Value)
	case KindScalar:
		v.Scalar = datum.(*ScalarDatum).Value
	case KindChunked:
		// chunk-by-chunk inspection is not done here
		return nullGenPerhapsNull
	default:
		debug.Assert(false, "should be array, scalar, or chunked!")
		return nullGenPerhapsNull
	}
	return getNullGen(&v)
}
// propagateNulls computes out's validity bitmap as the intersection
// (AND) of the validity of the array inputs in batch. If out already
// has a preallocated validity buffer it is written in place; otherwise
// the function shares or slices an input's bitmap when possible, or
// allocates a fresh one.
func propagateNulls(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ArraySpan) (err error) {
	if out.Type.ID() == arrow.NULL {
		// null type has no validity bitmap to fill
		return
	}
	if out.Offset != 0 && out.Buffers[0].Buf == nil {
		return fmt.Errorf("%w: can only propagate nulls into pre-allocated memory when output offset is non-zero", arrow.ErrInvalid)
	}

	var (
		// array inputs that may actually contain nulls
		arrsWithNulls = make([]*exec.ArraySpan, 0, len(batch.Values))
		// set when any input is entirely null (forces all-null output)
		isAllNull bool
		prealloc  = out.Buffers[0].Buf != nil
	)
	for i := range batch.Values {
		v := &batch.Values[i]
		nullGen := getNullGen(v)
		if nullGen == nullGenAllNull {
			isAllNull = true
		}
		if nullGen != nullGenAllValid && v.IsArray() {
			arrsWithNulls = append(arrsWithNulls, &v.Array)
		}
	}

	outBitmap := out.Buffers[0].Buf
	if isAllNull {
		// any all-null input makes the intersection all-null
		out.Nulls = out.Len
		if prealloc {
			bitutil.SetBitsTo(outBitmap, out.Offset, out.Len, false)
			return
		}
		// try to share an existing all-null bitmap from an input
		for _, arr := range arrsWithNulls {
			if arr.Nulls == arr.Len && arr.Buffers[0].Owner != nil {
				buf := arr.GetBuffer(0)
				buf.Retain() // shared ownership with the input
				out.Buffers[0].Buf = buf.Bytes()
				out.Buffers[0].Owner = buf
				return
			}
		}
		// no shareable bitmap: allocate and zero a fresh one
		buf := ctx.AllocateBitmap(int64(out.Len))
		out.Buffers[0].Owner = buf
		out.Buffers[0].Buf = buf.Bytes()
		out.Buffers[0].SelfAlloc = true
		bitutil.SetBitsTo(out.Buffers[0].Buf, out.Offset, out.Len, false)
		return
	}

	out.Nulls = array.UnknownNullCount
	switch len(arrsWithNulls) {
	case 0:
		// no input can contribute nulls: output is all valid
		out.Nulls = 0
		if prealloc {
			bitutil.SetBitsTo(outBitmap, out.Offset, out.Len, true)
		}
	case 1:
		// single nullable input: copy, share, or slice its bitmap
		arr := arrsWithNulls[0]
		out.Nulls = arr.Nulls
		if prealloc {
			bitutil.CopyBitmap(arr.Buffers[0].Buf, int(arr.Offset), int(arr.Len), outBitmap, int(out.Offset))
			return
		}
		switch {
		case arr.Offset == 0:
			// zero offset: share the whole buffer
			out.Buffers[0] = arr.Buffers[0]
			out.Buffers[0].Owner.Retain()
		case arr.Offset%8 == 0:
			// byte-aligned offset: zero-copy slice of the buffer
			buf := memory.SliceBuffer(arr.GetBuffer(0), int(arr.Offset)/8, int(bitutil.BytesForBits(arr.Len)))
			out.Buffers[0].Buf = buf.Bytes()
			out.Buffers[0].Owner = buf
		default:
			// unaligned offset: must copy into a fresh bitmap
			buf := ctx.AllocateBitmap(int64(out.Len))
			out.Buffers[0].Owner = buf
			out.Buffers[0].Buf = buf.Bytes()
			out.Buffers[0].SelfAlloc = true
			bitutil.CopyBitmap(arr.Buffers[0].Buf, int(arr.Offset), int(arr.Len), out.Buffers[0].Buf, 0)
		}
		return
	default:
		// multiple nullable inputs: AND all bitmaps together
		if !prealloc {
			buf := ctx.AllocateBitmap(int64(out.Len))
			out.Buffers[0].Owner = buf
			out.Buffers[0].Buf = buf.Bytes()
			out.Buffers[0].SelfAlloc = true
			outBitmap = out.Buffers[0].Buf
		}
		acc := func(left, right *exec.ArraySpan) {
			debug.Assert(left.Buffers[0].Buf != nil, "invalid intersection for null propagation")
			debug.Assert(right.Buffers[0].Buf != nil, "invalid intersection for null propagation")
			bitutil.BitmapAnd(left.Buffers[0].Buf, right.Buffers[0].Buf, left.Offset, right.Offset, outBitmap, out.Offset, out.Len)
		}
		// seed with the first pair, then fold the remainder into out
		acc(arrsWithNulls[0], arrsWithNulls[1])
		for _, arr := range arrsWithNulls[2:] {
			acc(out, arr)
		}
	}
	return
}
// inferBatchLength returns the common length of the array/chunked
// values, reporting allSame=false on the first mismatch. A batch of
// only scalars has length 1; an empty batch has length 0.
func inferBatchLength(values []Datum) (length int64, allSame bool) {
	length, allSame = -1, true
	onlyScalars := true

	// observe records one array-like length; reports false on conflict.
	observe := func(argLen int64) bool {
		if length < 0 {
			length = argLen
		} else if length != argLen {
			return false
		}
		onlyScalars = false
		return true
	}

	for _, arg := range values {
		switch arg := arg.(type) {
		case *ArrayDatum:
			if !observe(arg.Len()) {
				allSame = false
				return
			}
		case *ChunkedDatum:
			if !observe(arg.Len()) {
				allSame = false
				return
			}
		}
	}

	if onlyScalars && len(values) > 0 {
		length = 1
	} else if length < 0 {
		length = 0
	}
	allSame = true
	return
}
// KernelExecutor drives a kernel over ExecBatches and delivers results
// on a channel.
type KernelExecutor interface {
	// Init prepares the executor with the kernel and its init args.
	Init(*exec.KernelCtx, exec.KernelInitArgs) error
	// Execute runs the kernel over the batch, sending results on the channel.
	Execute(context.Context, *ExecBatch, chan<- Datum) error
	// WrapResults collects the channel's results into a single Datum.
	WrapResults(ctx context.Context, out <-chan Datum, chunkedArgs bool) Datum
	// CheckResultType verifies out matches the resolved output type.
	CheckResultType(out Datum) error
	// Clear releases held references so the executor can be reused.
	Clear()
}
// nonAggExecImpl holds state shared by the scalar and vector
// (non-aggregate) kernel executors.
type nonAggExecImpl struct {
	ctx     *exec.KernelCtx
	ectx    ExecCtx
	kernel  exec.NonAggKernel
	outType arrow.DataType // output type resolved in Init
	// numOutBuf is the number of buffers in outType's layout.
	numOutBuf int
	// dataPrealloc describes data buffers to allocate in prepareOutput.
	dataPrealloc []bufferPrealloc
	// preallocValidity requests up-front allocation of the validity bitmap.
	preallocValidity bool
}
// Clear drops references retained from the previous execution so the
// executor can be pooled and reused; the prealloc slice keeps its
// capacity.
func (e *nonAggExecImpl) Clear() {
	e.ctx = nil
	e.kernel = nil
	e.outType = nil
	if e.dataPrealloc != nil {
		e.dataPrealloc = e.dataPrealloc[:0]
	}
}
// Init stores the kernel, resolves the output type from the input
// types, and captures the ExecCtx carried by the kernel context.
func (e *nonAggExecImpl) Init(ctx *exec.KernelCtx, args exec.KernelInitArgs) (err error) {
	e.ctx, e.kernel = ctx, args.Kernel.(exec.NonAggKernel)
	e.outType, err = e.kernel.GetSig().OutType.Resolve(ctx, args.Inputs)
	e.ectx = GetExecCtx(ctx.Ctx)
	return
}
// prepareOutput builds an ArraySpan of the given length for the kernel
// to write into, preallocating the validity bitmap (when requested) and
// any data buffers described by dataPrealloc.
func (e *nonAggExecImpl) prepareOutput(length int) *exec.ExecResult {
	var nullCount = array.UnknownNullCount
	if e.kernel.GetNullHandling() == exec.NullNoOutput {
		// the kernel never produces nulls
		nullCount = 0
	}

	output := &exec.ArraySpan{
		Type:  e.outType,
		Len:   int64(length),
		Nulls: int64(nullCount),
	}
	if e.preallocValidity {
		buf := e.ctx.AllocateBitmap(int64(length))
		output.Buffers[0].Owner = buf
		output.Buffers[0].Buf = buf.Bytes()
		output.Buffers[0].SelfAlloc = true
	}
	// data buffers start at index 1; index 0 is the validity bitmap
	for i, pre := range e.dataPrealloc {
		if pre.bitWidth >= 0 {
			buf := allocateDataBuffer(e.ctx, length+pre.addLen, pre.bitWidth)
			output.Buffers[i+1].Owner = buf
			output.Buffers[i+1].Buf = buf.Bytes()
			output.Buffers[i+1].SelfAlloc = true
		}
	}
	return output
}
// CheckResultType verifies the produced Datum's type matches the
// kernel's declared output type; a nil type is accepted.
func (e *nonAggExecImpl) CheckResultType(out Datum) error {
	typ := out.(ArrayLikeDatum).Type()
	if typ == nil || arrow.TypeEqual(e.outType, typ) {
		return nil
	}
	return fmt.Errorf("%w: kernel type result mismatch: declared as %s, actual is %s",
		arrow.ErrType, e.outType, typ)
}
// spanIterator yields successive ExecSpans together with the absolute
// position after the span; the bool is false once iteration ends.
type spanIterator func() (exec.ExecSpan, int64, bool)

// NewScalarExecutor returns a KernelExecutor for scalar kernels.
func NewScalarExecutor() KernelExecutor { return &scalarExecutor{} }

// scalarExecutor executes scalar kernels, optionally writing all spans
// into a single contiguous preallocated output.
type scalarExecutor struct {
	nonAggExecImpl
	// elideValidityBitmap is set when all inputs are known all-valid so
	// no output validity bitmap is needed.
	elideValidityBitmap bool
	// preallocAllBufs is true when every output buffer can be allocated
	// up front.
	preallocAllBufs bool
	// preallocContiguous is true when the whole output is one
	// contiguous allocation written slice by slice.
	preallocContiguous bool
	// allScalars records whether every input was a scalar.
	allScalars bool
	iter       spanIterator
	iterLen    int64
}
// Execute iterates batch in ChunkSize-bounded spans, runs the kernel on
// each, and sends results to data. A zero-length batch short-circuits
// to an empty result of the resolved output type without invoking the
// kernel.
func (s *scalarExecutor) Execute(ctx context.Context, batch *ExecBatch, data chan<- Datum) (err error) {
	s.allScalars, s.iter, err = iterateExecSpans(batch, s.ectx.ChunkSize, true)
	if err != nil {
		return
	}
	s.iterLen = batch.Len

	if batch.Len == 0 {
		// emit an empty array of the output type
		result := array.MakeArrayOfNull(exec.GetAllocator(s.ctx.Ctx), s.outType, 0)
		defer result.Release()
		out := &exec.ArraySpan{}
		out.SetMembers(result.Data())
		return s.emitResult(out, data)
	}

	if err = s.setupPrealloc(batch.Len, batch.Values); err != nil {
		return
	}
	return s.executeSpans(data)
}
// WrapResults collects the Datums produced by Execute from out into a
// single output: the lone result when there is exactly one, otherwise a
// chunked array of all non-empty results. Returns nil when ctx is
// cancelled, or when the channel closes before delivering any result.
func (s *scalarExecutor) WrapResults(ctx context.Context, out <-chan Datum, hasChunked bool) Datum {
	var (
		output Datum
		acc    []arrow.Array
	)

	// toChunked moves the single accumulated output into the chunk
	// accumulator so additional results can be appended.
	toChunked := func() {
		acc = output.(ArrayLikeDatum).Chunks()
		output.Release()
		output = nil
	}

	// wait for the first result
	select {
	case <-ctx.Done():
		return nil
	case output = <-out:
		// A receive from a closed channel yields nil; without this
		// guard toChunked/Release below would nil-deref. Mirrors the
		// identical guard in vectorExecutor.WrapResults.
		if output == nil || ctx.Err() != nil {
			return nil
		}
		if hasChunked {
			toChunked()
		}
	}

	for {
		select {
		case <-ctx.Done():
			// cancelled mid-stream: return what we have so far
			return output
		case o, ok := <-out:
			if !ok {
				// channel closed: finalize the output
				if output != nil {
					return output
				}
				for _, c := range acc {
					defer c.Release()
				}
				chkd := arrow.NewChunked(s.outType, acc)
				defer chkd.Release()
				return NewDatum(chkd)
			}
			// more than one result: switch to chunked accumulation
			if acc == nil {
				toChunked()
			}
			defer o.Release()
			if o.Len() == 0 {
				// drop empty chunks
				continue
			}
			acc = append(acc, o.(*ArrayDatum).MakeArray())
		}
	}
}
// executeSpans runs the kernel over every span yielded by s.iter. In
// contiguous mode one output covering iterLen rows is allocated and
// each span writes into its slice of it, emitting once at the end;
// otherwise each span gets its own output which is emitted immediately.
// Kernel cleanup errors are joined with any execution error.
func (s *scalarExecutor) executeSpans(data chan<- Datum) (err error) {
	defer func() {
		err = errors.Join(err, s.kernel.Cleanup())
	}()

	var (
		input  exec.ExecSpan
		output exec.ExecResult
		next   bool
	)

	if s.preallocContiguous {
		// one allocation for the entire result
		output := s.prepareOutput(int(s.iterLen))
		output.Offset = 0

		var resultOffset int64
		var nextOffset int64
		for err == nil {
			if input, nextOffset, next = s.iter(); !next {
				break
			}
			// point the output at this span's window of the allocation
			output.SetSlice(resultOffset, input.Len)
			err = s.executeSingleSpan(&input, output)
			resultOffset = nextOffset
		}
		if err != nil {
			output.Release()
			return
		}
		// restore the full-length view before emitting
		if output.Offset != 0 {
			output.SetSlice(0, s.iterLen)
		}
		return s.emitResult(output, data)
	}

	// per-span outputs: emit one result per iteration
	for err == nil {
		if input, _, next = s.iter(); !next {
			break
		}
		output = *s.prepareOutput(int(input.Len))
		if err = s.executeSingleSpan(&input, &output); err != nil {
			output.Release()
			return
		}
		err = s.emitResult(&output, data)
	}
	return
}
// executeSingleSpan applies the kernel's null-handling policy to out
// (all-null for null type, intersection of input validity, or no
// nulls), then invokes the kernel on the span.
func (s *scalarExecutor) executeSingleSpan(input *exec.ExecSpan, out *exec.ExecResult) error {
	switch {
	case out.Type.ID() == arrow.NULL:
		out.Nulls = out.Len
	case s.kernel.GetNullHandling() == exec.NullIntersection:
		if !s.elideValidityBitmap {
			// propagateNulls can fail (e.g. non-zero output offset with
			// no preallocated validity buffer); previously this error
			// was silently dropped. vectorExecutor.exec checks it too.
			if err := propagateNulls(s.ctx, input, out); err != nil {
				return err
			}
		}
	case s.kernel.GetNullHandling() == exec.NullNoOutput:
		out.Nulls = 0
	}
	return s.kernel.Exec(s.ctx, input, out)
}
// setupPrealloc decides, from the kernel's null-handling and memory
// allocation modes, which output buffers to preallocate and whether the
// entire output can be written as one contiguous allocation.
func (s *scalarExecutor) setupPrealloc(_ int64, args []Datum) error {
	s.numOutBuf = len(s.outType.Layout().Buffers)
	outTypeID := s.outType.ID()
	// validity bitmap preallocation (irrelevant for the null type)
	s.preallocValidity = false
	if outTypeID != arrow.NULL {
		switch s.kernel.GetNullHandling() {
		case exec.NullComputedPrealloc:
			s.preallocValidity = true
		case exec.NullIntersection:
			// elide the bitmap entirely when every input is all-valid
			s.elideValidityBitmap = true
			for _, a := range args {
				nullGen := getNullGenDatum(a) == nullGenAllValid
				s.elideValidityBitmap = s.elideValidityBitmap && nullGen
			}
			s.preallocValidity = !s.elideValidityBitmap
		case exec.NullNoOutput:
			s.elideValidityBitmap = true
		}
	}

	if s.kernel.GetMemAlloc() == exec.MemPrealloc {
		s.dataPrealloc = addComputeDataPrealloc(s.outType, s.dataPrealloc)
	}

	// all buffers must be preallocatable and the type flat and
	// non-dictionary for contiguous output to be possible
	s.preallocAllBufs =
		((s.preallocValidity || s.elideValidityBitmap) && len(s.dataPrealloc) == (s.numOutBuf-1) &&
			!arrow.IsNested(outTypeID) && outTypeID != arrow.DICTIONARY)
	// contiguous output additionally requires a slice-filling kernel
	// and the option enabled in the ExecCtx
	s.preallocContiguous =
		(s.ectx.PreallocContiguous && s.kernel.CanFillSlices() &&
			s.preallocAllBufs)
	return nil
}
// emitResult converts the finished ArraySpan into a Datum and sends it
// on data: a scalar when every input was scalar, otherwise array data.
func (s *scalarExecutor) emitResult(resultData *exec.ArraySpan, data chan<- Datum) error {
	var output Datum
	if len(resultData.Buffers[0].Buf) != 0 {
		// a validity bitmap exists; recompute the null count from it
		resultData.UpdateNullCount()
	}
	if s.allScalars {
		// all inputs were scalars: unbox the single-element array
		arr := resultData.MakeArray()
		defer arr.Release()
		sc, err := scalar.GetScalar(arr, 0)
		if err != nil {
			return err
		}
		if r, ok := sc.(scalar.Releasable); ok {
			defer r.Release()
		}
		output = NewDatum(sc)
	} else {
		d := resultData.MakeData()
		defer d.Release()
		output = NewDatum(d)
	}
	data <- output
	return nil
}
// checkAllIsValue returns an error if any of vals is not a value-kind
// Datum (array, chunked array, or scalar).
func checkAllIsValue(vals []Datum) error {
	for _, v := range vals {
		if DatumIsValue(v) {
			continue
		}
		return fmt.Errorf("%w: tried executing function with non-value type: %s",
			arrow.ErrInvalid, v)
	}
	return nil
}
// checkIfAllScalar reports whether batch is non-empty and every value
// in it is a scalar.
func checkIfAllScalar(batch *ExecBatch) bool {
	if batch.NumValues() == 0 {
		return false
	}
	for _, v := range batch.Values {
		if v.Kind() != KindScalar {
			return false
		}
	}
	return true
}
// iterateExecSpans validates that all array/chunked args agree with
// batch.Len, then returns whether every arg is scalar plus an iterator
// producing ExecSpans of at most maxChunkSize rows. Chunked args force
// span boundaries at chunk edges. When promoteIfAllScalar is true and
// all args are scalars, they are promoted to length-1 arrays.
func iterateExecSpans(batch *ExecBatch, maxChunkSize int64, promoteIfAllScalar bool) (haveAllScalars bool, itr spanIterator, err error) {
	if batch.NumValues() > 0 {
		inferred, allArgsSame := inferBatchLength(batch.Values)
		if inferred != batch.Len {
			return false, nil, fmt.Errorf("%w: value lengths differed from execbatch length", arrow.ErrInvalid)
		}
		if !allArgsSame {
			return false, nil, fmt.Errorf("%w: array args must all be the same length", arrow.ErrInvalid)
		}
	}

	var (
		args        = batch.Values
		haveChunked bool
		// per-arg index of the current chunk (chunked args only)
		chunkIdxes = make([]int, len(args))
		// per-arg position within the current chunk/array
		valuePositions = make([]int64, len(args))
		// per-arg data offset of the current chunk/array
		valueOffsets = make([]int64, len(args))
		pos, length  int64 = 0, batch.Len
	)
	haveAllScalars = checkIfAllScalar(batch)
	maxChunkSize = exec.Min(length, maxChunkSize)

	// seed the reusable span with each arg's initial value
	span := exec.ExecSpan{Values: make([]exec.ExecValue, len(args)), Len: 0}
	for i, a := range args {
		switch arg := a.(type) {
		case *ScalarDatum:
			span.Values[i].Scalar = arg.Value
		case *ArrayDatum:
			span.Values[i].Array.SetMembers(arg.Value)
			valueOffsets[i] = int64(arg.Value.Offset())
		case *ChunkedDatum:
			carr := arg.Value
			if len(carr.Chunks()) > 0 {
				arr := carr.Chunk(0).Data()
				span.Values[i].Array.SetMembers(arr)
				valueOffsets[i] = int64(arr.Offset())
			} else {
				// chunked array with no chunks: zero-length placeholder
				exec.FillZeroLength(carr.DataType(), &span.Values[i].Array)
			}
			haveChunked = true
		}
	}

	if haveAllScalars && promoteIfAllScalar {
		exec.PromoteExecSpanScalars(span)
	}

	// nextChunkSpan shrinks iterSz so the next span does not cross a
	// chunk boundary of any chunked arg, advancing each arg's chunk
	// index as its current chunk is exhausted.
	nextChunkSpan := func(iterSz int64, span exec.ExecSpan) int64 {
		for i := 0; i < len(args) && iterSz > 0; i++ {
			chunkedArg, ok := args[i].(*ChunkedDatum)
			if !ok {
				continue
			}

			arg := chunkedArg.Value
			if len(arg.Chunks()) == 0 {
				iterSz = 0
				continue
			}

			var curChunk arrow.Array
			for {
				curChunk = arg.Chunk(chunkIdxes[i])
				if valuePositions[i] == int64(curChunk.Len()) {
					// current chunk consumed; advance to the next one
					chunkIdxes[i]++
					curChunk = arg.Chunk(chunkIdxes[i])
					span.Values[i].Array.SetMembers(curChunk.Data())
					valuePositions[i] = 0
					valueOffsets[i] = int64(curChunk.Data().Offset())
					continue
				}
				break
			}
			iterSz = exec.Min(int64(curChunk.Len())-valuePositions[i], iterSz)
		}
		return iterSz
	}

	return haveAllScalars, func() (exec.ExecSpan, int64, bool) {
		if pos == length {
			return exec.ExecSpan{}, pos, false
		}

		iterationSize := exec.Min(length-pos, maxChunkSize)
		if haveChunked {
			iterationSize = nextChunkSpan(iterationSize, span)
		}

		span.Len = iterationSize
		for i, a := range args {
			if a.Kind() != KindScalar {
				// window each array arg onto this span's rows
				span.Values[i].Array.SetSlice(valuePositions[i]+valueOffsets[i], iterationSize)
				valuePositions[i] += iterationSize
			}
		}

		pos += iterationSize
		debug.Assert(pos <= length, "bad state for iteration exec span")
		return span, pos, true
	}, nil
}
var (
	// scalarExecPool and vectorExecPool recycle executors between calls;
	// executors are expected to be Clear()ed before reuse.
	scalarExecPool = sync.Pool{
		New: func() any { return &scalarExecutor{} },
	}
	vectorExecPool = sync.Pool{
		New: func() any { return &vectorExecutor{} },
	}
)
// checkCanExecuteChunked verifies that the vector kernel supports the
// whole-chunked-array execution path: it must define ExecChunked and
// must not rely on null intersection pre-propagation.
func checkCanExecuteChunked(k *exec.VectorKernel) error {
	switch {
	case k.ExecChunked == nil:
		return fmt.Errorf("%w: vector kernel cannot execute chunkwise and no chunked exec function defined", arrow.ErrInvalid)
	case k.NullHandling == exec.NullIntersection:
		return fmt.Errorf("%w: null pre-propagation is unsupported for chunkedarray execution in vector kernels", arrow.ErrInvalid)
	}
	return nil
}
// vectorExecutor executes vector kernels, optionally accumulating
// intermediate spans for a Finalize step.
type vectorExecutor struct {
	nonAggExecImpl
	iter spanIterator
	// results accumulates per-span outputs when the kernel defines a
	// Finalize function.
	results []*exec.ArraySpan
	iterLen int64
	// allScalars records whether every input was a scalar.
	allScalars bool
}
// Execute runs the vector kernel over batch. Kernels that can execute
// chunkwise are driven span by span; otherwise the batch is executed
// whole (via the ExecChunked path when any input is chunked). When the
// kernel defines Finalize, accumulated results are finalized and
// emitted at the end.
func (v *vectorExecutor) Execute(ctx context.Context, batch *ExecBatch, data chan<- Datum) (err error) {
	final := v.kernel.(*exec.VectorKernel).Finalize
	if final != nil {
		// intermediate results are retained for the finalizer
		if v.results == nil {
			v.results = make([]*exec.ArraySpan, 0, 1)
		} else {
			v.results = v.results[:0]
		}
	}

	hasChunked := haveChunkedArray(batch.Values)
	v.numOutBuf = len(v.outType.Layout().Buffers)
	// preallocate validity unless the kernel computes it itself without
	// preallocation or never outputs nulls
	v.preallocValidity = v.kernel.GetNullHandling() != exec.NullComputedNoPrealloc &&
		v.kernel.GetNullHandling() != exec.NullNoOutput
	if v.kernel.GetMemAlloc() == exec.MemPrealloc {
		v.dataPrealloc = addComputeDataPrealloc(v.outType, v.dataPrealloc)
	}

	if v.kernel.(*exec.VectorKernel).CanExecuteChunkWise {
		v.allScalars, v.iter, err = iterateExecSpans(batch, v.ectx.ChunkSize, true)
		v.iterLen = batch.Len

		var (
			input exec.ExecSpan
			next  bool
		)
		if v.iterLen == 0 {
			// zero-length batch: execute once with zero-length inputs
			input.Values = make([]exec.ExecValue, batch.NumValues())
			for i, v := range batch.Values {
				exec.FillZeroLength(v.(ArrayLikeDatum).Type(), &input.Values[i].Array)
			}
			err = v.exec(&input, data)
		}
		for err == nil {
			if input, _, next = v.iter(); !next {
				break
			}
			err = v.exec(&input, data)
		}
		if err != nil {
			return
		}
	} else {
		// kernel must see the whole batch at once
		if hasChunked {
			if err = v.execChunked(batch, data); err != nil {
				return
			}
		} else {
			span := ExecSpanFromBatch(batch)
			if checkIfAllScalar(batch) {
				exec.PromoteExecSpanScalars(*span)
			}
			if err = v.exec(span, data); err != nil {
				return
			}
		}
	}

	if final != nil {
		// finalize the accumulated spans and emit the finalized output
		output, err := final(v.ctx, v.results)
		if err != nil {
			return err
		}
		for _, r := range output {
			d := r.MakeData()
			defer d.Release()
			data <- NewDatum(d)
		}
	}
	return nil
}
// WrapResults gathers Execute's results from out. When the kernel does
// not declare chunked output, the single result is returned after the
// channel closes; otherwise results are accumulated into a chunked
// array (dropping empty chunks). Returns nil on cancellation.
func (v *vectorExecutor) WrapResults(ctx context.Context, out <-chan Datum, hasChunked bool) Datum {
	if !v.kernel.(*exec.VectorKernel).OutputChunked {
		var output Datum
		select {
		case <-ctx.Done():
			return nil
		case output = <-out:
		}

		// wait for close (or cancellation) so the producer is finished
		// before the single result is handed back
		select {
		case <-ctx.Done():
			output.Release()
			return nil
		case <-out:
			return output
		}
	}

	var (
		output Datum
		acc    []arrow.Array
	)

	// toChunked converts the first output into the chunk accumulator,
	// keeping only non-empty chunks.
	toChunked := func() {
		out := output.(ArrayLikeDatum).Chunks()
		acc = make([]arrow.Array, 0, len(out))
		for _, o := range out {
			if o.Len() > 0 {
				acc = append(acc, o)
			}
		}
		if output.Kind() != KindChunked {
			output.Release()
		}
		output = nil
	}

	// wait for the first result
	select {
	case <-ctx.Done():
		return nil
	case output = <-out:
		// a receive from a closed channel yields nil; treat it like
		// cancellation
		if output == nil || ctx.Err() != nil {
			return nil
		}
		if hasChunked {
			toChunked()
		}
	}

	for {
		select {
		case <-ctx.Done():
			// cancelled mid-stream: return what we have
			return output
		case o, ok := <-out:
			if !ok {
				// channel closed: finalize the output
				if output != nil {
					return output
				}
				for _, c := range acc {
					defer c.Release()
				}
				chkd := arrow.NewChunked(v.outType, acc)
				defer chkd.Release()
				return NewDatum(chkd)
			}
			// more than one result: switch to chunked accumulation
			if acc == nil {
				toChunked()
			}
			defer o.Release()
			if o.Len() == 0 {
				// drop empty results
				continue
			}
			acc = append(acc, o.(*ArrayDatum).MakeArray())
		}
	}
}
// exec prepares an output for the span, propagates input nulls when the
// kernel uses intersection handling, runs the kernel, and emits the
// result.
func (v *vectorExecutor) exec(span *exec.ExecSpan, data chan<- Datum) error {
	out := v.prepareOutput(int(span.Len))
	if v.kernel.GetNullHandling() == exec.NullIntersection {
		if err := propagateNulls(v.ctx, span, out); err != nil {
			return err
		}
	}
	if err := v.kernel.Exec(v.ctx, span, out); err != nil {
		return err
	}
	return v.emitResult(out, data)
}
// emitResult either retains result for the kernel's Finalize step or,
// when there is no finalizer, converts it to a Datum and sends it.
func (v *vectorExecutor) emitResult(result *exec.ArraySpan, data chan<- Datum) error {
	if v.kernel.(*exec.VectorKernel).Finalize != nil {
		// accumulate for finalization in Execute
		v.results = append(v.results, result)
		return nil
	}
	d := result.MakeData()
	defer d.Release()
	data <- NewDatum(d)
	return nil
}
// execChunked invokes the kernel's ExecChunked path: every input is
// lifted to a *arrow.Chunked and the kernel runs once over the whole
// batch. An empty result set is emitted as a single empty chunk of the
// output type.
func (v *vectorExecutor) execChunked(batch *ExecBatch, out chan<- Datum) error {
	if err := checkCanExecuteChunked(v.kernel.(*exec.VectorKernel)); err != nil {
		return err
	}

	output := v.prepareOutput(int(batch.Len))
	input := make([]*arrow.Chunked, len(batch.Values))
	for i, v := range batch.Values {
		switch val := v.(type) {
		case *ArrayDatum:
			// wrap the single array as a one-chunk chunked array
			chks := val.Chunks()
			input[i] = arrow.NewChunked(val.Type(), chks)
			chks[0].Release() // NewChunked retained the chunk
			defer input[i].Release()
		case *ChunkedDatum:
			input[i] = val.Value
		default:
			return fmt.Errorf("%w: handling with exec chunked", arrow.ErrNotImplemented)
		}
	}
	result, err := v.kernel.(*exec.VectorKernel).ExecChunked(v.ctx, input, output)
	if err != nil {
		return err
	}

	if len(result) == 0 {
		// kernel produced nothing: emit one empty chunk
		empty := output.MakeArray()
		defer empty.Release()
		out <- &ChunkedDatum{Value: arrow.NewChunked(output.Type, []arrow.Array{empty})}
		return nil
	}

	for _, r := range result {
		if err := v.emitResult(r, out); err != nil {
			return err
		}
	}
	return nil
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .