package compute
import (
"context"
"fmt"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/compute/exec"
"github.com/apache/arrow-go/v18/arrow/internal/debug"
)
// haveChunkedArray reports whether at least one datum in values has the kind
// KindChunked. It returns false for a nil or empty slice.
func haveChunkedArray(values []Datum) bool {
	for idx := range values {
		if values[idx].Kind() != KindChunked {
			continue
		}
		return true
	}
	return false
}
// ExecSpanFromBatch converts batch into a newly allocated exec.ExecSpan with
// the same length and one ExecValue per batch value.
//
// A datum whose kind is KindScalar has its scalar stored in the Scalar field
// of the corresponding value. Every other datum is asserted to be an
// *ArrayDatum (the assertion panics otherwise); its array is loaded into the
// value with SetMembers and the Scalar field is set to nil.
func ExecSpanFromBatch(batch *ExecBatch) *exec.ExecSpan {
	span := &exec.ExecSpan{
		Len:    batch.Len,
		Values: make([]exec.ExecValue, len(batch.Values)),
	}
	for idx := range batch.Values {
		datum := batch.Values[idx]
		slot := &span.Values[idx]
		if datum.Kind() != KindScalar {
			slot.Array.SetMembers(datum.(*ArrayDatum).Value)
			slot.Scalar = nil
			continue
		}
		slot.Scalar = datum.(*ScalarDatum).Value
	}
	return span
}
// execInternal dispatches fn to a kernel matching the argument types and runs
// it over args, returning the wrapped result.
//
// When opts is nil, checkOptions is consulted first and fn.DefaultOptions()
// is used afterwards. Only FuncScalar and FuncVector functions are executed
// directly; every other kind fails with an error wrapping
// arrow.ErrNotImplemented.
//
// passedLen is the batch length supplied by the caller, with -1 meaning "not
// specified". It becomes the batch length when there are no arguments, and
// for scalar functions it must equal the length inferred from the arguments,
// otherwise an error wrapping arrow.ErrInvalid is returned.
//
// If the executor fails after a result was already produced, that result is
// released and nil is returned in its place, so callers never receive a
// released Datum together with the error.
func execInternal(ctx context.Context, fn Function, opts FunctionOptions, passedLen int64, args ...Datum) (result Datum, err error) {
	if opts == nil {
		if err = checkOptions(fn, opts); err != nil {
			return nil, err
		}
		opts = fn.DefaultOptions()
	}

	// checkAllIsValue is defined elsewhere; the ArrayLikeDatum assertions
	// below presumably rely on it having rejected any other kind of datum.
	if err = checkAllIsValue(args); err != nil {
		return nil, err
	}

	inTypes := make([]arrow.DataType, len(args))
	for i, a := range args {
		inTypes[i] = a.(ArrayLikeDatum).Type()
	}

	var (
		k        exec.Kernel
		executor KernelExecutor
	)

	// Borrow an executor from the pool matching the function kind; it is
	// cleared and handed back to that pool when this function returns.
	switch fn.Kind() {
	case FuncScalar:
		executor = scalarExecPool.Get().(*scalarExecutor)
		defer func() {
			executor.Clear()
			scalarExecPool.Put(executor.(*scalarExecutor))
		}()
	case FuncVector:
		executor = vectorExecPool.Get().(*vectorExecutor)
		defer func() {
			executor.Clear()
			vectorExecPool.Put(executor.(*vectorExecutor))
		}()
	default:
		return nil, fmt.Errorf("%w: direct execution of %s", arrow.ErrNotImplemented, fn.Kind())
	}

	// NOTE(review): the loop below re-compares inTypes with the argument
	// types, so DispatchBest presumably may rewrite inTypes in place — confirm.
	if k, err = fn.DispatchBest(inTypes...); err != nil {
		return nil, err
	}

	// Cast every argument whose type differs from the dispatched input type.
	// The argument slice is copied lazily so the caller's slice is never
	// modified.
	var newArgs []Datum
	for i, arg := range args {
		if !arrow.TypeEqual(inTypes[i], arg.(ArrayLikeDatum).Type()) {
			if newArgs == nil {
				newArgs = make([]Datum, len(args))
				copy(newArgs, args)
			}
			newArgs[i], err = CastDatum(ctx, arg, SafeCastOptions(inTypes[i]))
			if err != nil {
				return nil, err
			}
			// Deliberately deferred to function exit (not loop iteration
			// end): the casted datum is used as kernel input below.
			defer newArgs[i].Release()
		}
	}
	if newArgs != nil {
		args = newArgs
	}

	kctx := &exec.KernelCtx{Ctx: ctx, Kernel: k}
	initFn := k.GetInitFn()
	kinitArgs := exec.KernelInitArgs{Kernel: k, Inputs: inTypes, Options: opts}
	if initFn != nil {
		kctx.State, err = initFn(kctx, kinitArgs)
		if err != nil {
			return nil, err
		}
	}

	if err = executor.Init(kctx, kinitArgs); err != nil {
		return nil, err
	}

	input := ExecBatch{Values: args, Len: 0}
	if input.NumValues() == 0 {
		// No arguments to infer a length from: use the caller's, if given.
		if passedLen != -1 {
			input.Len = passedLen
		}
	} else {
		inferred, allSame := inferBatchLength(input.Values)
		input.Len = inferred
		switch fn.Kind() {
		case FuncScalar:
			if passedLen != -1 && passedLen != inferred {
				return nil, fmt.Errorf("%w: passed batch length for execution did not match actual length for scalar fn execution",
					arrow.ErrInvalid)
			}
		case FuncVector:
			vkernel := k.(*exec.VectorKernel)
			if !allSame && vkernel.CanExecuteChunkWise {
				return nil, fmt.Errorf("%w: vector kernel arguments must all be the same length", arrow.ErrInvalid)
			}
		}
	}

	ectx := GetExecCtx(ctx)

	// runCtx is cancelled by the producer goroutine when Execute fails, and
	// unconditionally when this function returns.
	// NOTE(review): it is derived from context.Background(), not from the
	// caller's ctx, so caller cancellation does not reach Execute or
	// WrapResults through this context — confirm this is intentional.
	runCtx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ch := make(chan Datum, ectx.ExecChannelSize)
	go func() {
		// The goroutine reports failure through the named result err, then
		// cancels runCtx; ch is closed last, on the way out.
		defer close(ch)
		if err = executor.Execute(runCtx, &input, ch); err != nil {
			cancel()
		}
	}()

	result = executor.WrapResults(runCtx, ch, haveChunkedArray(input.Values))
	if err == nil {
		debug.Assert(executor.CheckResultType(result) == nil, "invalid result type")
	}

	if runCtx.Err() == context.Canceled && result != nil {
		result.Release()
		// Do not hand a released Datum back to the caller.
		result = nil
	}

	return result, err
}
// CallFunction looks up funcName in the Registry of the execution context
// obtained from ctx via GetExecCtx and executes the function with opts and
// args, returning whatever its Execute method returns.
//
// If the registry has no function with that name, the returned Datum is nil
// and the error wraps arrow.ErrKey.
func CallFunction(ctx context.Context, funcName string, opts FunctionOptions, args ...Datum) (Datum, error) {
	registry := GetExecCtx(ctx).Registry
	if fn, found := registry.GetFunction(funcName); found {
		return fn.Execute(ctx, opts, args...)
	}
	return nil, fmt.Errorf("%w: function '%s' not found", arrow.ErrKey, funcName)
}
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PRs and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.