package exec
import (
"context"
"fmt"
"hash/maphash"
"slices"
"strings"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/bitutil"
"github.com/apache/arrow-go/v18/arrow/internal/debug"
"github.com/apache/arrow-go/v18/arrow/memory"
)
// hashSeed seeds every maphash-based hash computed in this file
// (InputType.Hash and KernelSignature.Hash). maphash.MakeSeed returns a
// random seed, so hash values differ between processes and must never be
// persisted or compared across runs.
var hashSeed = maphash.MakeSeed()
// ctxAllocKey is the unexported context key under which WithAllocator stores
// a memory.Allocator. An unexported struct type cannot collide with keys
// defined by other packages.
type ctxAllocKey struct{}

// WithAllocator returns a child of ctx carrying mem, so that GetAllocator on
// the returned context (or any context derived from it) yields mem.
func WithAllocator(ctx context.Context, mem memory.Allocator) context.Context {
	key := ctxAllocKey{}
	return context.WithValue(ctx, key, mem)
}

// GetAllocator returns the memory.Allocator stored in ctx by WithAllocator.
// If ctx carries no allocator (or a nil one), memory.DefaultAllocator is
// returned instead.
func GetAllocator(ctx context.Context) memory.Allocator {
	if mem, ok := ctx.Value(ctxAllocKey{}).(memory.Allocator); ok {
		return mem
	}
	return memory.DefaultAllocator
}
// Kernel is the minimal interface shared by every compute kernel in this
// package: access to its initialization function and to its signature.
type Kernel interface {
	// GetInitFn returns the function used to build the kernel's state.
	GetInitFn() KernelInitFn
	// GetSig returns the kernel's input/output signature.
	GetSig() *KernelSignature
}
// NonAggKernel is a Kernel that executes directly over spans of input data.
// In this file it is satisfied by *ScalarKernel and *VectorKernel.
type NonAggKernel interface {
	Kernel

	// Exec runs the kernel over the given span, writing into the result.
	Exec(*KernelCtx, *ExecSpan, *ExecResult) error
	// GetNullHandling reports how the kernel's output nulls are produced.
	GetNullHandling() NullHandling
	// GetMemAlloc reports the kernel's output buffer allocation mode.
	GetMemAlloc() MemAlloc
	// CanFillSlices reports whether the kernel may write into slices of a
	// preallocated output.
	CanFillSlices() bool
	// Cleanup releases any per-kernel resources.
	Cleanup() error
}
// KernelCtx carries per-invocation information for a kernel: the context
// used to look up the allocator (see GetAllocator), the kernel being run,
// and its initialized state.
type KernelCtx struct {
	Ctx    context.Context
	Kernel Kernel
	State  KernelState
}

// Allocate returns a new resizable buffer sized to bufsize bytes, drawn from
// the allocator attached to k.Ctx. The buffer's contents are not explicitly
// cleared here.
// NOTE(review): the caller presumably owns the buffer and must Release it.
func (k *KernelCtx) Allocate(bufsize int) *memory.Buffer {
	alloc := GetAllocator(k.Ctx)
	out := memory.NewResizableBuffer(alloc)
	out.Resize(bufsize)
	return out
}

// AllocateBitmap allocates (via Allocate) a buffer just large enough to hold
// nbits bits, i.e. bitutil.BytesForBits(nbits) bytes.
func (k *KernelCtx) AllocateBitmap(nbits int64) *memory.Buffer {
	return k.Allocate(int(bitutil.BytesForBits(nbits)))
}
// TypeMatcher is a predicate over arrow data types, used by InputType to
// describe a family of acceptable input types.
type TypeMatcher interface {
	fmt.Stringer

	// Matches reports whether typ satisfies the matcher.
	Matches(typ arrow.DataType) bool
	// Equals reports whether other is an equivalent matcher.
	Equals(other TypeMatcher) bool
}
// sameTypeIDMatcher matches any data type whose type ID equals accepted. It
// only compares IDs, so type parameters (unit, width, ...) are ignored.
type sameTypeIDMatcher struct {
	accepted arrow.Type
}

// Matches reports whether typ has the accepted type ID.
func (s sameTypeIDMatcher) Matches(typ arrow.DataType) bool { return s.accepted == typ.ID() }

// Equals reports whether other is a sameTypeIDMatcher, held either by value
// or by pointer, that accepts the same type ID. A nil *sameTypeIDMatcher is
// never equal; previously it was dereferenced and panicked.
func (s sameTypeIDMatcher) Equals(other TypeMatcher) bool {
	switch o := other.(type) {
	case *sameTypeIDMatcher:
		return o != nil && s.accepted == o.accepted
	case sameTypeIDMatcher:
		return s.accepted == o.accepted
	default:
		return false
	}
}

// String renders the matcher as "Type::<ID>".
func (s sameTypeIDMatcher) String() string {
	return "Type::" + s.accepted.String()
}

// SameTypeID returns a TypeMatcher accepting every type whose ID is id.
func SameTypeID(id arrow.Type) TypeMatcher { return &sameTypeIDMatcher{id} }
// timeUnitMatcher matches types with type ID id whose time unit is unit
// (e.g. timestamps in milliseconds).
type timeUnitMatcher struct {
	id   arrow.Type
	unit arrow.TimeUnit
}

// Matches reports whether typ has the matcher's type ID and time unit. A type
// with the right ID that does not expose a time unit does not match; the
// previous unchecked type assertion would have panicked.
func (s timeUnitMatcher) Matches(typ arrow.DataType) bool {
	if typ.ID() != s.id {
		return false
	}
	tu, ok := typ.(arrow.TemporalWithUnit)
	return ok && tu.TimeUnit() == s.unit
}

// String renders the matcher as e.g. "timestamp(ms)".
func (s timeUnitMatcher) String() string {
	return strings.ToLower(s.id.String()) + "(" + s.unit.String() + ")"
}

// Equals reports whether other is a *timeUnitMatcher with the same type ID
// and unit. Nil pointers are equal only to each other and are never
// dereferenced.
func (s *timeUnitMatcher) Equals(other TypeMatcher) bool {
	if s == other {
		return true
	}
	o, ok := other.(*timeUnitMatcher)
	if !ok || o == nil || s == nil {
		return false
	}
	return o.id == s.id && o.unit == s.unit
}

// TimestampTypeUnit matches TIMESTAMP types with the given unit.
func TimestampTypeUnit(unit arrow.TimeUnit) TypeMatcher {
	return &timeUnitMatcher{arrow.TIMESTAMP, unit}
}

// Time32TypeUnit matches TIME32 types with the given unit.
func Time32TypeUnit(unit arrow.TimeUnit) TypeMatcher {
	return &timeUnitMatcher{arrow.TIME32, unit}
}

// Time64TypeUnit matches TIME64 types with the given unit.
func Time64TypeUnit(unit arrow.TimeUnit) TypeMatcher {
	return &timeUnitMatcher{arrow.TIME64, unit}
}

// DurationTypeUnit matches DURATION types with the given unit.
func DurationTypeUnit(unit arrow.TimeUnit) TypeMatcher {
	return &timeUnitMatcher{arrow.DURATION, unit}
}
// integerMatcher accepts every type whose ID arrow.IsInteger reports as an
// integer.
type integerMatcher struct{}

func (integerMatcher) String() string                  { return "integer" }
func (integerMatcher) Matches(typ arrow.DataType) bool { return arrow.IsInteger(typ.ID()) }

// Equals is true for any other integerMatcher; the type is stateless.
func (integerMatcher) Equals(other TypeMatcher) bool {
	switch other.(type) {
	case integerMatcher:
		return true
	default:
		return false
	}
}

// binaryLikeMatcher accepts every type whose ID arrow.IsBinaryLike reports as
// binary-like.
type binaryLikeMatcher struct{}

func (binaryLikeMatcher) String() string                  { return "binary-like" }
func (binaryLikeMatcher) Matches(typ arrow.DataType) bool { return arrow.IsBinaryLike(typ.ID()) }

// Equals is true for any other binaryLikeMatcher; the type is stateless.
func (binaryLikeMatcher) Equals(other TypeMatcher) bool {
	switch other.(type) {
	case binaryLikeMatcher:
		return true
	default:
		return false
	}
}

// largeBinaryLikeMatcher accepts every type whose ID arrow.IsLargeBinaryLike
// reports as large-binary-like.
type largeBinaryLikeMatcher struct{}

func (largeBinaryLikeMatcher) String() string { return "large-binary-like" }
func (largeBinaryLikeMatcher) Matches(typ arrow.DataType) bool {
	return arrow.IsLargeBinaryLike(typ.ID())
}

// Equals is true for any other largeBinaryLikeMatcher; the type is stateless.
func (largeBinaryLikeMatcher) Equals(other TypeMatcher) bool {
	switch other.(type) {
	case largeBinaryLikeMatcher:
		return true
	default:
		return false
	}
}

// fsbLikeMatcher accepts every type whose ID arrow.IsFixedSizeBinary accepts.
type fsbLikeMatcher struct{}

func (fsbLikeMatcher) String() string                  { return "fixed-size-binary-like" }
func (fsbLikeMatcher) Matches(typ arrow.DataType) bool { return arrow.IsFixedSizeBinary(typ.ID()) }

// Equals is true for any other fsbLikeMatcher; the type is stateless.
func (fsbLikeMatcher) Equals(other TypeMatcher) bool {
	switch other.(type) {
	case fsbLikeMatcher:
		return true
	default:
		return false
	}
}

// Integer returns a TypeMatcher accepting any integer type.
func Integer() TypeMatcher { return integerMatcher{} }

// BinaryLike returns a TypeMatcher accepting any binary-like type.
func BinaryLike() TypeMatcher { return binaryLikeMatcher{} }

// LargeBinaryLike returns a TypeMatcher accepting any large-binary-like type.
func LargeBinaryLike() TypeMatcher { return largeBinaryLikeMatcher{} }

// FixedSizeBinaryLike returns a TypeMatcher accepting fixed-size-binary-like types.
func FixedSizeBinaryLike() TypeMatcher { return fsbLikeMatcher{} }
// primitiveMatcher accepts every type whose ID arrow.IsPrimitive reports as
// primitive.
type primitiveMatcher struct{}

func (primitiveMatcher) String() string                  { return "primitive" }
func (primitiveMatcher) Matches(typ arrow.DataType) bool { return arrow.IsPrimitive(typ.ID()) }

// Equals is true for any other primitiveMatcher; the type is stateless.
func (primitiveMatcher) Equals(other TypeMatcher) bool {
	switch other.(type) {
	case primitiveMatcher:
		return true
	default:
		return false
	}
}

// Primitive returns a TypeMatcher accepting any primitive type.
func Primitive() TypeMatcher { return primitiveMatcher{} }
// reeMatcher accepts RUN_END_ENCODED types whose run-ends type satisfies
// runEndsMatcher and whose encoded (values) type satisfies encodedMatcher.
type reeMatcher struct {
	runEndsMatcher TypeMatcher
	encodedMatcher TypeMatcher
}

// Matches checks the type ID first, then applies both child matchers
// (run ends first, short-circuiting).
func (r reeMatcher) Matches(typ arrow.DataType) bool {
	if typ.ID() == arrow.RUN_END_ENCODED {
		ree := typ.(*arrow.RunEndEncodedType)
		return r.runEndsMatcher.Matches(ree.RunEnds()) && r.encodedMatcher.Matches(ree.Encoded())
	}
	return false
}

// Equals reports whether other is a reeMatcher whose child matchers are
// pairwise equal.
func (r reeMatcher) Equals(other TypeMatcher) bool {
	rhs, isREE := other.(reeMatcher)
	if isREE {
		return r.runEndsMatcher.Equals(rhs.runEndsMatcher) && r.encodedMatcher.Equals(rhs.encodedMatcher)
	}
	return false
}

// String renders e.g. "run_end_encoded(run_ends=<m1>, values=<m2>)".
func (r reeMatcher) String() string {
	runEnds := r.runEndsMatcher.String()
	values := r.encodedMatcher.String()
	return "run_end_encoded(run_ends=" + runEnds + ", values=" + values + ")"
}

// RunEndEncoded builds a matcher for run-end-encoded types from matchers for
// the run-ends type and the encoded values type.
func RunEndEncoded(runEndsMatcher, encodedMatcher TypeMatcher) TypeMatcher {
	return reeMatcher{runEndsMatcher: runEndsMatcher, encodedMatcher: encodedMatcher}
}
// InputKind selects how an InputType decides whether a data type matches.
type InputKind int8

const (
	// InputAny accepts every data type.
	InputAny InputKind = iota
	// InputExact accepts only types equal (arrow.TypeEqual) to InputType.Type.
	InputExact
	// InputUseMatcher delegates the decision to InputType.Matcher.
	InputUseMatcher
)
// InputType constrains one kernel input. Which field is consulted depends on
// Kind: InputExact uses Type, InputUseMatcher uses Matcher, and InputAny uses
// neither.
type InputType struct {
	Kind    InputKind
	Type    arrow.DataType
	Matcher TypeMatcher
}

// NewExactInput returns an InputType accepting only types equal to dt.
func NewExactInput(dt arrow.DataType) InputType {
	return InputType{Type: dt, Kind: InputExact}
}

// NewMatchedInput returns an InputType accepting types for which match
// reports a match.
func NewMatchedInput(match TypeMatcher) InputType {
	return InputType{Matcher: match, Kind: InputUseMatcher}
}

// NewIDInput returns an InputType accepting any type whose type ID is id.
func NewIDInput(id arrow.Type) InputType {
	return InputType{Kind: InputUseMatcher, Matcher: SameTypeID(id)}
}
// MatchID returns the single type ID this InputType accepts. It is meaningful
// only for InputExact inputs and for InputUseMatcher inputs built from
// SameTypeID. For any other input it fails a debug assertion and returns -1.
func (it InputType) MatchID() arrow.Type {
	if it.Kind == InputExact {
		return it.Type.ID()
	}
	if it.Kind == InputUseMatcher {
		if m, isID := it.Matcher.(*sameTypeIDMatcher); isID {
			return m.accepted
		}
	}
	debug.Assert(false, "MatchID called on non-id matching InputType")
	return -1
}
// String renders the input constraint: "any", the exact type's name, or the
// matcher's description. An unknown Kind renders as "".
func (it InputType) String() string {
	if it.Kind == InputAny {
		return "any"
	}
	if it.Kind == InputUseMatcher {
		return it.Matcher.String()
	}
	if it.Kind == InputExact {
		return it.Type.String()
	}
	return ""
}
// Equals reports whether it and other describe the same input constraint:
// the same Kind, plus equal types (arrow.TypeEqual) for InputExact or
// equivalent matchers for InputUseMatcher.
//
// A nil *InputType is equal only to another nil, and for InputUseMatcher a
// nil Matcher is equal only to another nil Matcher. Previously both cases
// panicked with a nil dereference.
func (it *InputType) Equals(other *InputType) bool {
	if it == other {
		return true
	}
	if it == nil || other == nil {
		return false
	}
	if it.Kind != other.Kind {
		return false
	}
	switch it.Kind {
	case InputAny:
		return true
	case InputExact:
		return arrow.TypeEqual(it.Type, other.Type)
	case InputUseMatcher:
		if it.Matcher == nil || other.Matcher == nil {
			// At least one side is a nil interface, so this comparison only
			// checks dynamic types and cannot panic.
			return it.Matcher == other.Matcher
		}
		return it.Matcher.Equals(other.Matcher)
	default:
		return false
	}
}
// Hash returns a hash that is stable within this process. It combines a
// seed-derived base value with the Kind and, for InputExact inputs only, the
// hash of Type. Matcher inputs hash by Kind alone, which keeps Hash consistent
// with Equals.
func (it InputType) Hash() uint64 {
	var hasher maphash.Hash
	hasher.SetSeed(hashSeed)
	hash := HashCombine(hasher.Sum64(), uint64(it.Kind))
	if it.Kind == InputExact {
		hash = HashCombine(hash, arrow.HashType(hashSeed, it.Type))
	}
	return hash
}
// Matches reports whether dt satisfies this input constraint. An unknown Kind
// fails a debug assertion and, outside debug builds, matches everything.
func (it InputType) Matches(dt arrow.DataType) bool {
	if it.Kind == InputAny {
		return true
	}
	if it.Kind == InputExact {
		return arrow.TypeEqual(it.Type, dt)
	}
	if it.Kind == InputUseMatcher {
		return it.Matcher.Matches(dt)
	}
	debug.Assert(false, "invalid InputKind")
	return true
}
// ResolveKind selects how an OutputType produces its data type.
type ResolveKind int8

const (
	// ResolveFixed: the output type is OutputType.Type.
	ResolveFixed ResolveKind = iota
	// ResolveComputed: the output type is produced by OutputType.Resolver.
	ResolveComputed
)
// TypeResolver computes a kernel's output type from the kernel context and
// the input types.
type TypeResolver = func(*KernelCtx, []arrow.DataType) (arrow.DataType, error)

// OutputType describes a kernel's output type: either a fixed Type
// (ResolveFixed) or one computed by Resolver (ResolveComputed).
type OutputType struct {
	Kind     ResolveKind
	Type     arrow.DataType
	Resolver TypeResolver
}

// NewOutputType returns an OutputType that always resolves to dt.
func NewOutputType(dt arrow.DataType) OutputType {
	return OutputType{Type: dt, Kind: ResolveFixed}
}

// NewComputedOutputType returns an OutputType resolved by calling resolver.
func NewComputedOutputType(resolver TypeResolver) OutputType {
	return OutputType{Resolver: resolver, Kind: ResolveComputed}
}

// String renders the fixed type's name, or "computed" for any other kind.
func (o OutputType) String() string {
	if o.Kind != ResolveFixed {
		return "computed"
	}
	return o.Type.String()
}
// Resolve returns the concrete output type for a kernel invoked with the given
// input types.
//
// For ResolveFixed it returns o.Type. Otherwise it calls o.Resolver with ctx
// and types. If no Resolver is set, Resolve returns an error wrapping
// arrow.ErrInvalid; previously a nil Resolver panicked on a nil function call.
func (o OutputType) Resolve(ctx *KernelCtx, types []arrow.DataType) (arrow.DataType, error) {
	if o.Kind == ResolveFixed {
		return o.Type, nil
	}
	if o.Resolver == nil {
		return nil, fmt.Errorf("%w: computed output type has no resolver", arrow.ErrInvalid)
	}
	return o.Resolver(ctx, types)
}
// NullHandling describes how a kernel's output nulls are produced.
// NOTE(review): the code that interprets these values is not in this file;
// the per-constant notes below are inferred from the names — confirm against
// the executor.
type NullHandling int8

const (
	// NullIntersection: presumably output is null wherever any input is null.
	NullIntersection NullHandling = iota
	// NullComputedPrealloc: presumably the kernel computes validity into a
	// preallocated bitmap.
	NullComputedPrealloc
	// NullComputedNoPrealloc: presumably the kernel computes and allocates
	// validity itself.
	NullComputedNoPrealloc
	// NullNoOutput: presumably the output carries no validity bitmap.
	NullNoOutput
)
// MemAlloc describes whether a kernel's output data buffers are preallocated.
// NOTE(review): the consumer of this setting is outside this file; the
// notes below are inferred from the names.
type MemAlloc int8

const (
	// MemPrealloc: presumably output buffers are allocated before Exec runs.
	MemPrealloc MemAlloc = iota
	// MemNoPrealloc: presumably the kernel allocates its own output buffers.
	MemNoPrealloc
)
// KernelState is opaque, kernel-defined state returned by a KernelInitFn.
type KernelState any

// KernelInitArgs bundles the arguments handed to a KernelInitFn: the kernel,
// the input types it is being initialized for, and caller-supplied options.
type KernelInitArgs struct {
	Kernel  Kernel
	Inputs  []arrow.DataType
	Options any
}

// KernelInitFn builds a kernel's KernelState (presumably before the kernel
// is executed).
type KernelInitFn = func(*KernelCtx, KernelInitArgs) (KernelState, error)
// KernelSignature describes the inputs a kernel accepts and the output type it
// produces.
type KernelSignature struct {
	// InputTypes constrains each positional argument.
	InputTypes []InputType
	// OutType determines the output type.
	OutType OutputType
	// IsVarArgs makes the last entry of InputTypes apply to every argument at
	// or beyond its position (see MatchesInputs).
	IsVarArgs bool
	// hashCode caches the result of Hash; 0 means "not yet computed".
	hashCode uint64
}
// String renders the signature as "(a, b) -> out", or as
// "varargs[a, b*] -> out" for varargs signatures.
func (k KernelSignature) String() string {
	open, closing := "(", ")"
	if k.IsVarArgs {
		open, closing = "varargs[", "*]"
	}

	parts := make([]string, len(k.InputTypes))
	for i, t := range k.InputTypes {
		parts[i] = t.String()
	}
	return open + strings.Join(parts, ", ") + closing + " -> " + k.OutType.String()
}
// Equals reports whether two signatures have the same varargs flag and
// pairwise-equal input types. OutType is not compared.
func (k KernelSignature) Equals(other KernelSignature) bool {
	if k.IsVarArgs != other.IsVarArgs || len(k.InputTypes) != len(other.InputTypes) {
		return false
	}
	for i := range k.InputTypes {
		// Compare local copies so pointer identity in InputType.Equals never
		// short-circuits on shared backing arrays.
		lhs, rhs := k.InputTypes[i], other.InputTypes[i]
		if !lhs.Equals(&rhs) {
			return false
		}
	}
	return true
}
// Hash returns a process-local hash of the signature's input types. IsVarArgs
// and OutType are not included.
//
// The first non-zero result is cached in k.hashCode, so later changes to
// InputTypes are not reflected. The cache is written without
// synchronization, and a computed hash of 0 is recomputed on every call.
func (k *KernelSignature) Hash() uint64 {
	if cached := k.hashCode; cached != 0 {
		return cached
	}

	var hasher maphash.Hash
	hasher.SetSeed(hashSeed)
	sum := hasher.Sum64()
	for i := range k.InputTypes {
		sum = HashCombine(sum, k.InputTypes[i].Hash())
	}
	k.hashCode = sum
	return sum
}
// MatchesInputs reports whether the given argument types satisfy the
// signature.
//
// Non-varargs: there must be exactly len(InputTypes) arguments, each matching
// the InputType at the same position.
//
// Varargs: there must be at least len(InputTypes)-1 arguments. Argument i is
// checked against InputTypes[i], and every argument past the last declared
// InputType is checked against that last one. A varargs signature with no
// declared InputTypes matches only an empty argument list; previously it
// indexed InputTypes[-1] and panicked.
func (k KernelSignature) MatchesInputs(types []arrow.DataType) bool {
	if !k.IsVarArgs {
		if len(types) != len(k.InputTypes) {
			return false
		}
		for i, t := range types {
			if !k.InputTypes[i].Matches(t) {
				return false
			}
		}
		return true
	}

	if len(k.InputTypes) == 0 {
		// Nothing to match extra arguments against.
		return len(types) == 0
	}

	last := len(k.InputTypes) - 1
	// Every non-repeated (leading) parameter must be supplied.
	if len(types) < last {
		return false
	}
	for i, t := range types {
		if !k.InputTypes[min(i, last)].Matches(t) {
			return false
		}
	}
	return true
}
// ArrayKernelExec is the function type behind ScalarKernel.ExecFn and
// VectorKernel.ExecFn.
type ArrayKernelExec = func(*KernelCtx, *ExecSpan, *ExecResult) error

// kernel holds the fields shared by ScalarKernel and VectorKernel. Through
// embedding, it supplies the Kernel interface methods for both.
type kernel struct {
	Init           KernelInitFn
	Signature      *KernelSignature
	Data           KernelState
	Parallelizable bool
}

// GetInitFn returns the stored initialization function, which may be nil.
func (kr kernel) GetInitFn() KernelInitFn { return kr.Init }

// GetSig returns the stored signature.
func (kr kernel) GetSig() *KernelSignature { return kr.Signature }
// ScalarKernel is a NonAggKernel whose execution is delegated to ExecFn. An
// optional CleanupFn is run on the kernel's Data by Cleanup.
type ScalarKernel struct {
	kernel

	ExecFn             ArrayKernelExec
	CanWriteIntoSlices bool
	NullHandling       NullHandling
	MemAlloc           MemAlloc
	CleanupFn          func(KernelState) error
}

// NewScalarKernel builds a ScalarKernel from input/output types; see
// NewScalarKernelWithSig for the defaults it applies.
func NewScalarKernel(in []InputType, out OutputType, exec ArrayKernelExec, init KernelInitFn) ScalarKernel {
	sig := &KernelSignature{InputTypes: in, OutType: out}
	return NewScalarKernelWithSig(sig, exec, init)
}

// NewScalarKernelWithSig builds a ScalarKernel with the given signature.
// Defaults: Parallelizable, CanWriteIntoSlices, NullIntersection and
// MemPrealloc.
func NewScalarKernelWithSig(sig *KernelSignature, exec ArrayKernelExec, init KernelInitFn) ScalarKernel {
	return ScalarKernel{
		kernel: kernel{
			Init:           init,
			Signature:      sig,
			Parallelizable: true,
		},
		ExecFn:             exec,
		NullHandling:       NullIntersection,
		MemAlloc:           MemPrealloc,
		CanWriteIntoSlices: true,
	}
}

// Cleanup calls CleanupFn with the kernel's Data when CleanupFn is set, and
// otherwise does nothing.
func (s *ScalarKernel) Cleanup() error {
	if s.CleanupFn == nil {
		return nil
	}
	return s.CleanupFn(s.Data)
}

// Exec forwards to ExecFn.
func (s *ScalarKernel) Exec(ctx *KernelCtx, sp *ExecSpan, out *ExecResult) error {
	return s.ExecFn(ctx, sp, out)
}

func (s ScalarKernel) GetNullHandling() NullHandling { return s.NullHandling }
func (s ScalarKernel) GetMemAlloc() MemAlloc         { return s.MemAlloc }
func (s ScalarKernel) CanFillSlices() bool           { return s.CanWriteIntoSlices }
// ChunkedExec is the function type stored in VectorKernel.ExecChunked; it
// receives its inputs as *arrow.Chunked values.
type ChunkedExec func(*KernelCtx, []*arrow.Chunked, *ExecResult) ([]*ExecResult, error)

// FinalizeFunc is the function type stored in VectorKernel.Finalize; it maps
// a slice of ArraySpans to a new slice.
type FinalizeFunc func(*KernelCtx, []*ArraySpan) ([]*ArraySpan, error)

// VectorKernel is a NonAggKernel whose execution is delegated to ExecFn. It
// can also carry chunked-execution and finalization hooks.
type VectorKernel struct {
	kernel

	ExecFn              ArrayKernelExec
	ExecChunked         ChunkedExec
	Finalize            FinalizeFunc
	NullHandling        NullHandling
	MemAlloc            MemAlloc
	CanWriteIntoSlices  bool
	CanExecuteChunkWise bool
	OutputChunked       bool
}

// NewVectorKernel builds a VectorKernel from input/output types; see
// NewVectorKernelWithSig for the defaults it applies.
func NewVectorKernel(inTypes []InputType, outType OutputType, exec ArrayKernelExec, init KernelInitFn) VectorKernel {
	sig := &KernelSignature{InputTypes: inTypes, OutType: outType}
	return NewVectorKernelWithSig(sig, exec, init)
}

// NewVectorKernelWithSig builds a VectorKernel with the given signature.
// Defaults: Parallelizable, CanWriteIntoSlices, CanExecuteChunkWise,
// OutputChunked, NullComputedNoPrealloc and MemNoPrealloc.
func NewVectorKernelWithSig(sig *KernelSignature, exec ArrayKernelExec, init KernelInitFn) VectorKernel {
	return VectorKernel{
		kernel: kernel{
			Init:           init,
			Signature:      sig,
			Parallelizable: true,
		},
		ExecFn:              exec,
		NullHandling:        NullComputedNoPrealloc,
		MemAlloc:            MemNoPrealloc,
		CanWriteIntoSlices:  true,
		CanExecuteChunkWise: true,
		OutputChunked:       true,
	}
}

// Exec forwards to ExecFn.
func (s *VectorKernel) Exec(ctx *KernelCtx, sp *ExecSpan, out *ExecResult) error {
	return s.ExecFn(ctx, sp, out)
}

func (s VectorKernel) GetNullHandling() NullHandling { return s.NullHandling }
func (s VectorKernel) GetMemAlloc() MemAlloc         { return s.MemAlloc }
func (s VectorKernel) CanFillSlices() bool           { return s.CanWriteIntoSlices }

// Cleanup is a no-op; VectorKernel has no cleanup hook.
func (s VectorKernel) Cleanup() error { return nil }
// The pages are generated with Golds v0.8.2 (GOOS=linux GOARCH=amd64).
// Golds is a Go 101 project developed by Tapir Liu.
// PRs and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.