package kernels
import (
"fmt"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/compute/exec"
"github.com/apache/arrow-go/v18/arrow/internal/debug"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/internal/bitutils"
"github.com/apache/arrow-go/v18/internal/hashing"
)
type HashState interface {
Reset () error
Flush (*exec .ExecResult ) error
FlushFinal (out *exec .ExecResult ) error
GetDictionary () (arrow .ArrayData , error )
ValueType () arrow .DataType
Append (*exec .KernelCtx , *exec .ArraySpan ) error
Allocator () memory .Allocator
}
type Action interface {
Reset () error
Reserve (int ) error
Flush (*exec .ExecResult ) error
FlushFinal (*exec .ExecResult ) error
ObserveFound (int )
ObserveNotFound (int ) error
ObserveNullFound (int )
ObserveNullNotFound (int ) error
ShouldEncodeNulls () bool
}
type emptyAction struct {
mem memory .Allocator
dt arrow .DataType
}
func (emptyAction ) Reset () error { return nil }
func (emptyAction ) Reserve (int ) error { return nil }
func (emptyAction ) Flush (*exec .ExecResult ) error { return nil }
func (emptyAction ) FlushFinal (*exec .ExecResult ) error { return nil }
func (emptyAction ) ObserveFound (int ) {}
func (emptyAction ) ObserveNotFound (int ) error { return nil }
func (emptyAction ) ObserveNullFound (int ) {}
func (emptyAction ) ObserveNullNotFound (int ) error { return nil }
func (emptyAction ) ShouldEncodeNulls () bool { return true }
type uniqueAction = emptyAction
type regularHashState struct {
mem memory .Allocator
typ arrow .DataType
memoTable hashing .MemoTable
action Action
doAppend func (Action , hashing .MemoTable , *exec .ArraySpan ) error
}
func (rhs *regularHashState ) Allocator () memory .Allocator { return rhs .mem }
func (rhs *regularHashState ) ValueType () arrow .DataType { return rhs .typ }
func (rhs *regularHashState ) Reset () error {
rhs .memoTable .Reset ()
return rhs .action .Reset ()
}
func (rhs *regularHashState ) Append (_ *exec .KernelCtx , arr *exec .ArraySpan ) error {
if err := rhs .action .Reserve (int (arr .Len )); err != nil {
return err
}
return rhs .doAppend (rhs .action , rhs .memoTable , arr )
}
func (rhs *regularHashState ) Flush (out *exec .ExecResult ) error { return rhs .action .Flush (out ) }
func (rhs *regularHashState ) FlushFinal (out *exec .ExecResult ) error {
return rhs .action .FlushFinal (out )
}
func (rhs *regularHashState ) GetDictionary () (arrow .ArrayData , error ) {
return array .GetDictArrayData (rhs .mem , rhs .typ , rhs .memoTable , 0 )
}
func doAppendBinary[OffsetT int32 | int64 ](action Action , memo hashing .MemoTable , arr *exec .ArraySpan ) error {
var (
bitmap = arr .Buffers [0 ].Buf
offsets = exec .GetSpanOffsets [OffsetT ](arr , 1 )
data = arr .Buffers [2 ].Buf
shouldEncodeNulls = action .ShouldEncodeNulls ()
)
return bitutils .VisitBitBlocksShort (bitmap , arr .Offset , arr .Len ,
func (pos int64 ) error {
v := data [offsets [pos ]:offsets [pos +1 ]]
idx , found , err := memo .GetOrInsert (v )
if err != nil {
return err
}
if found {
action .ObserveFound (idx )
return nil
}
return action .ObserveNotFound (idx )
},
func () error {
if !shouldEncodeNulls {
return action .ObserveNullNotFound (-1 )
}
idx , found := memo .GetOrInsertNull ()
if found {
action .ObserveNullFound (idx )
}
return action .ObserveNullNotFound (idx )
})
}
func doAppendFixedSize(action Action , memo hashing .MemoTable , arr *exec .ArraySpan ) error {
sz := int64 (arr .Type .(arrow .FixedWidthDataType ).Bytes ())
arrData := arr .Buffers [1 ].Buf [arr .Offset *sz :]
shouldEncodeNulls := action .ShouldEncodeNulls ()
return bitutils .VisitBitBlocksShort (arr .Buffers [0 ].Buf , arr .Offset , arr .Len ,
func (pos int64 ) error {
idx , found , err := memo .GetOrInsert (arrData [pos *sz : (pos +1 )*sz ])
if err != nil {
return err
}
if found {
action .ObserveFound (idx )
return nil
}
return action .ObserveNotFound (idx )
}, func () error {
if !shouldEncodeNulls {
return action .ObserveNullNotFound (-1 )
}
idx , found := memo .GetOrInsertNull ()
if found {
action .ObserveNullFound (idx )
}
return action .ObserveNullNotFound (idx )
})
}
func doAppendNumeric[T arrow .IntType | arrow .UintType | arrow .FloatType ](action Action , memo hashing .MemoTable , arr *exec .ArraySpan ) error {
arrData := exec .GetSpanValues [T ](arr , 1 )
shouldEncodeNulls := action .ShouldEncodeNulls ()
return bitutils .VisitBitBlocksShort (arr .Buffers [0 ].Buf , arr .Offset , arr .Len ,
func (pos int64 ) error {
idx , found , err := memo .GetOrInsert (arrData [pos ])
if err != nil {
return err
}
if found {
action .ObserveFound (idx )
return nil
}
return action .ObserveNotFound (idx )
}, func () error {
if !shouldEncodeNulls {
return action .ObserveNullNotFound (-1 )
}
idx , found := memo .GetOrInsertNull ()
if found {
action .ObserveNullFound (idx )
}
return action .ObserveNullNotFound (idx )
})
}
type nullHashState struct {
mem memory .Allocator
typ arrow .DataType
seenNull bool
action Action
}
func (nhs *nullHashState ) Allocator () memory .Allocator { return nhs .mem }
func (nhs *nullHashState ) ValueType () arrow .DataType { return nhs .typ }
func (nhs *nullHashState ) Reset () error {
return nhs .action .Reset ()
}
func (nhs *nullHashState ) Append (_ *exec .KernelCtx , arr *exec .ArraySpan ) (err error ) {
if err := nhs .action .Reserve (int (arr .Len )); err != nil {
return err
}
for i := 0 ; i < int (arr .Len ); i ++ {
if i == 0 {
nhs .seenNull = true
err = nhs .action .ObserveNullNotFound (0 )
} else {
nhs .action .ObserveNullFound (0 )
}
}
return
}
func (nhs *nullHashState ) Flush (out *exec .ExecResult ) error { return nhs .action .Flush (out ) }
func (nhs *nullHashState ) FlushFinal (out *exec .ExecResult ) error {
return nhs .action .FlushFinal (out )
}
func (nhs *nullHashState ) GetDictionary () (arrow .ArrayData , error ) {
var out arrow .Array
if nhs .seenNull {
out = array .NewNull (1 )
} else {
out = array .NewNull (0 )
}
data := out .Data ()
data .Retain ()
out .Release ()
return data , nil
}
type dictionaryHashState struct {
indicesKernel HashState
dictionary arrow .Array
dictValueType arrow .DataType
}
func (dhs *dictionaryHashState ) Allocator () memory .Allocator { return dhs .indicesKernel .Allocator () }
func (dhs *dictionaryHashState ) Reset () error { return dhs .indicesKernel .Reset () }
func (dhs *dictionaryHashState ) Flush (out *exec .ExecResult ) error {
return dhs .indicesKernel .Flush (out )
}
func (dhs *dictionaryHashState ) FlushFinal (out *exec .ExecResult ) error {
return dhs .indicesKernel .FlushFinal (out )
}
func (dhs *dictionaryHashState ) GetDictionary () (arrow .ArrayData , error ) {
return dhs .indicesKernel .GetDictionary ()
}
func (dhs *dictionaryHashState ) ValueType () arrow .DataType { return dhs .indicesKernel .ValueType () }
func (dhs *dictionaryHashState ) DictionaryValueType () arrow .DataType { return dhs .dictValueType }
func (dhs *dictionaryHashState ) Dictionary () arrow .Array { return dhs .dictionary }
func (dhs *dictionaryHashState ) Append (ctx *exec .KernelCtx , arr *exec .ArraySpan ) error {
arrDict := arr .Dictionary ().MakeArray ()
if dhs .dictionary == nil || array .Equal (dhs .dictionary , arrDict ) {
dhs .dictionary = arrDict
return dhs .indicesKernel .Append (ctx , arr )
}
defer arrDict .Release ()
unifier , err := array .NewDictionaryUnifier (dhs .indicesKernel .Allocator (), dhs .dictValueType )
if err != nil {
return err
}
defer unifier .Release ()
if err := unifier .Unify (dhs .dictionary ); err != nil {
return err
}
transposeMap , err := unifier .UnifyAndTranspose (arrDict )
if err != nil {
return err
}
defer transposeMap .Release ()
_ , outDict , err := unifier .GetResult ()
if err != nil {
return err
}
defer func () {
dhs .dictionary .Release ()
dhs .dictionary = outDict
}()
inDict := arr .MakeData ()
defer inDict .Release ()
tmp , err := array .TransposeDictIndices (dhs .Allocator (), inDict , arr .Type , arr .Type , outDict .Data (), arrow .Int32Traits .CastFromBytes (transposeMap .Bytes ()))
if err != nil {
return err
}
defer tmp .Release ()
var tmpSpan exec .ArraySpan
tmpSpan .SetMembers (tmp )
return dhs .indicesKernel .Append (ctx , &tmpSpan )
}
func nullHashInit(actionInit initAction ) exec .KernelInitFn {
return func (ctx *exec .KernelCtx , args exec .KernelInitArgs ) (exec .KernelState , error ) {
mem := exec .GetAllocator (ctx .Ctx )
ret := &nullHashState {
mem : mem ,
typ : args .Inputs [0 ],
action : actionInit (args .Inputs [0 ], args .Options , mem ),
}
ret .Reset ()
return ret , nil
}
}
func newMemoTable(mem memory .Allocator , dt arrow .Type ) (hashing .MemoTable , error ) {
switch dt {
case arrow .INT8 , arrow .UINT8 :
return hashing .NewMemoTable [uint8 ](0 ), nil
case arrow .INT16 , arrow .UINT16 :
return hashing .NewMemoTable [uint16 ](0 ), nil
case arrow .INT32 , arrow .UINT32 , arrow .FLOAT32 , arrow .DECIMAL32 ,
arrow .DATE32 , arrow .TIME32 , arrow .INTERVAL_MONTHS :
return hashing .NewMemoTable [uint32 ](0 ), nil
case arrow .INT64 , arrow .UINT64 , arrow .FLOAT64 , arrow .DECIMAL64 ,
arrow .DATE64 , arrow .TIME64 , arrow .TIMESTAMP ,
arrow .DURATION , arrow .INTERVAL_DAY_TIME :
return hashing .NewMemoTable [uint64 ](0 ), nil
case arrow .BINARY , arrow .STRING , arrow .FIXED_SIZE_BINARY , arrow .DECIMAL128 ,
arrow .DECIMAL256 , arrow .INTERVAL_MONTH_DAY_NANO :
return hashing .NewBinaryMemoTable (0 , 0 ,
array .NewBinaryBuilder (mem , arrow .BinaryTypes .Binary )), nil
case arrow .LARGE_BINARY , arrow .LARGE_STRING :
return hashing .NewBinaryMemoTable (0 , 0 ,
array .NewBinaryBuilder (mem , arrow .BinaryTypes .LargeBinary )), nil
default :
return nil , fmt .Errorf ("%w: unsupported type %s" , arrow .ErrNotImplemented , dt )
}
}
func regularHashInit(dt arrow .DataType , actionInit initAction , appendFn func (Action , hashing .MemoTable , *exec .ArraySpan ) error ) exec .KernelInitFn {
return func (ctx *exec .KernelCtx , args exec .KernelInitArgs ) (exec .KernelState , error ) {
mem := exec .GetAllocator (ctx .Ctx )
memoTable , err := newMemoTable (mem , dt .ID ())
if err != nil {
return nil , err
}
ret := ®ularHashState {
mem : mem ,
typ : args .Inputs [0 ],
memoTable : memoTable ,
action : actionInit (args .Inputs [0 ], args .Options , mem ),
doAppend : appendFn ,
}
ret .Reset ()
return ret , nil
}
}
func dictionaryHashInit(actionInit initAction ) exec .KernelInitFn {
return func (ctx *exec .KernelCtx , args exec .KernelInitArgs ) (exec .KernelState , error ) {
var (
dictType = args .Inputs [0 ].(*arrow .DictionaryType )
indicesHasher exec .KernelState
err error
)
switch dictType .IndexType .ID () {
case arrow .INT8 , arrow .UINT8 :
indicesHasher , err = getHashInit (arrow .UINT8 , actionInit )(ctx , args )
case arrow .INT16 , arrow .UINT16 :
indicesHasher , err = getHashInit (arrow .UINT16 , actionInit )(ctx , args )
case arrow .INT32 , arrow .UINT32 :
indicesHasher , err = getHashInit (arrow .UINT32 , actionInit )(ctx , args )
case arrow .INT64 , arrow .UINT64 :
indicesHasher , err = getHashInit (arrow .UINT64 , actionInit )(ctx , args )
default :
return nil , fmt .Errorf ("%w: unsupported dictionary index type" , arrow .ErrInvalid )
}
if err != nil {
return nil , err
}
return &dictionaryHashState {
indicesKernel : indicesHasher .(HashState ),
dictValueType : dictType .ValueType ,
}, nil
}
}
type initAction func (arrow .DataType , any , memory .Allocator ) Action
func getHashInit(typeID arrow .Type , actionInit initAction ) exec .KernelInitFn {
switch typeID {
case arrow .NULL :
return nullHashInit (actionInit )
case arrow .INT8 , arrow .UINT8 :
return regularHashInit (arrow .PrimitiveTypes .Uint8 , actionInit , doAppendNumeric [uint8 ])
case arrow .INT16 , arrow .UINT16 :
return regularHashInit (arrow .PrimitiveTypes .Uint16 , actionInit , doAppendNumeric [uint16 ])
case arrow .INT32 , arrow .UINT32 , arrow .FLOAT32 ,
arrow .DATE32 , arrow .TIME32 , arrow .INTERVAL_MONTHS :
return regularHashInit (arrow .PrimitiveTypes .Uint32 , actionInit , doAppendNumeric [uint32 ])
case arrow .INT64 , arrow .UINT64 , arrow .FLOAT64 ,
arrow .DATE64 , arrow .TIME64 , arrow .TIMESTAMP ,
arrow .DURATION , arrow .INTERVAL_DAY_TIME :
return regularHashInit (arrow .PrimitiveTypes .Uint64 , actionInit , doAppendNumeric [uint64 ])
case arrow .BINARY , arrow .STRING :
return regularHashInit (arrow .BinaryTypes .Binary , actionInit , doAppendBinary [int32 ])
case arrow .LARGE_BINARY , arrow .LARGE_STRING :
return regularHashInit (arrow .BinaryTypes .LargeBinary , actionInit , doAppendBinary [int64 ])
case arrow .FIXED_SIZE_BINARY , arrow .DECIMAL128 , arrow .DECIMAL256 :
return regularHashInit (arrow .BinaryTypes .Binary , actionInit , doAppendFixedSize )
case arrow .INTERVAL_MONTH_DAY_NANO :
return regularHashInit (arrow .FixedWidthTypes .MonthDayNanoInterval , actionInit , doAppendFixedSize )
default :
debug .Assert (false , "unsupported hash init type" )
return nil
}
}
func hashExec(ctx *exec .KernelCtx , batch *exec .ExecSpan , out *exec .ExecResult ) error {
impl , ok := ctx .State .(HashState )
if !ok {
return fmt .Errorf ("%w: bad initialization of hash state" , arrow .ErrInvalid )
}
if err := impl .Append (ctx , &batch .Values [0 ].Array ); err != nil {
return err
}
return impl .Flush (out )
}
func uniqueFinalize(ctx *exec .KernelCtx , results []*exec .ArraySpan ) ([]*exec .ArraySpan , error ) {
impl , ok := ctx .State .(HashState )
if !ok {
return nil , fmt .Errorf ("%w: HashState in invalid state" , arrow .ErrInvalid )
}
for _ , r := range results {
r .Release ()
}
uniques , err := impl .GetDictionary ()
if err != nil {
return nil , err
}
defer uniques .Release ()
var out exec .ArraySpan
out .TakeOwnership (uniques )
return []*exec .ArraySpan {&out }, nil
}
func ensureHashDictionary(_ *exec .KernelCtx , hash *dictionaryHashState ) (*exec .ArraySpan , error ) {
out := &exec .ArraySpan {}
if hash .dictionary != nil {
out .TakeOwnership (hash .dictionary .Data ())
hash .dictionary .Release ()
return out , nil
}
exec .FillZeroLength (hash .DictionaryValueType (), out )
return out , nil
}
func uniqueFinalizeDictionary(ctx *exec .KernelCtx , result []*exec .ArraySpan ) (out []*exec .ArraySpan , err error ) {
if out , err = uniqueFinalize (ctx , result ); err != nil {
return
}
hash , ok := ctx .State .(*dictionaryHashState )
if !ok {
return nil , fmt .Errorf ("%w: state should be *dictionaryHashState" , arrow .ErrInvalid )
}
dict , err := ensureHashDictionary (ctx , hash )
if err != nil {
return nil , err
}
out [0 ].SetDictionary (dict )
return
}
func addHashKernels(base exec .VectorKernel , actionInit initAction , outTy exec .OutputType ) []exec .VectorKernel {
kernels := make ([]exec .VectorKernel , 0 )
for _ , ty := range primitiveTypes {
base .Init = getHashInit (ty .ID (), actionInit )
base .Signature = &exec .KernelSignature {
InputTypes : []exec .InputType {exec .NewExactInput (ty )},
OutType : outTy ,
}
kernels = append (kernels , base )
}
parametricTypes := []arrow .Type {arrow .TIME32 , arrow .TIME64 , arrow .TIMESTAMP ,
arrow .DURATION , arrow .FIXED_SIZE_BINARY , arrow .DECIMAL128 , arrow .DECIMAL256 ,
arrow .INTERVAL_DAY_TIME , arrow .INTERVAL_MONTHS , arrow .INTERVAL_MONTH_DAY_NANO }
for _ , ty := range parametricTypes {
base .Init = getHashInit (ty , actionInit )
base .Signature = &exec .KernelSignature {
InputTypes : []exec .InputType {exec .NewIDInput (ty )},
OutType : outTy ,
}
kernels = append (kernels , base )
}
return kernels
}
func initUnique(dt arrow .DataType , _ any , mem memory .Allocator ) Action {
return uniqueAction {mem : mem , dt : dt }
}
func GetVectorHashKernels () (unique , valueCounts , dictEncode []exec .VectorKernel ) {
var base exec .VectorKernel
base .ExecFn = hashExec
base .Finalize = uniqueFinalize
base .OutputChunked = false
base .CanExecuteChunkWise = true
unique = addHashKernels (base , initUnique , OutputFirstType )
base .Init = dictionaryHashInit (initUnique )
base .Finalize = uniqueFinalizeDictionary
base .Signature = &exec .KernelSignature {
InputTypes : []exec .InputType {exec .NewIDInput (arrow .DICTIONARY )},
OutType : OutputFirstType ,
}
unique = append (unique , base )
return
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .