package exec
import (
"cmp"
"fmt"
"math"
"slices"
"sync/atomic"
"unsafe"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/bitutil"
"github.com/apache/arrow-go/v18/arrow/memory"
)
// GetSpanValues reinterprets buffer i of span as a slice of T and returns
// the elements starting at the span's offset.
//
// The result has span.Len elements and aliases the buffer's memory: it is
// built with unsafe.Slice, so nothing is copied. A nil slice is returned
// when the buffer is empty.
func GetSpanValues[T arrow.FixedWidthType](span *ArraySpan, i int) []T {
	raw := span.Buffers[i].Buf
	if len(raw) == 0 {
		return nil
	}

	// View offset+len elements from the start of the buffer, then drop the
	// leading elements that sit before the span's offset.
	end := span.Offset + span.Len
	typed := unsafe.Slice((*T)(unsafe.Pointer(&raw[0])), end)
	return typed[span.Offset:]
}
// GetSpanOffsets reinterprets buffer i of span as a slice of offsets of
// type T and returns the entries starting at the span's offset.
//
// The result has span.Len+1 elements, so that element k and element k+1
// bracket the k-th value. It aliases the buffer's memory (built with
// unsafe.Slice, nothing is copied).
//
// A nil slice is returned when the buffer is empty; taking the address of
// the first byte of an empty buffer would otherwise panic.
func GetSpanOffsets[T int32 | int64](span *ArraySpan, i int) []T {
	raw := span.Buffers[i].Buf
	if len(raw) == 0 {
		return nil
	}

	ret := unsafe.Slice((*T)(unsafe.Pointer(&raw[0])), span.Offset+span.Len+1)
	return ret[span.Offset:]
}
// Min returns a when a < b and b otherwise.
//
// Because the decision is the single comparison a < b, b is returned
// whenever that comparison is false, which includes equal arguments and
// floating-point NaN on either side.
func Min[T cmp.Ordered](a, b T) T {
	smaller := b
	if a < b {
		smaller = a
	}
	return smaller
}
// Max returns a when a > b and b otherwise.
//
// Because the decision is the single comparison a > b, b is returned
// whenever that comparison is false, which includes equal arguments and
// floating-point NaN on either side.
func Max[T cmp.Ordered](a, b T) T {
	larger := b
	if a > b {
		larger = a
	}
	return larger
}
// OptionsInit is a generic kernel-state initializer: when args.Options
// holds a non-nil *T, it returns a copy of the pointed-to options value as
// the kernel state.
//
// An error wrapping arrow.ErrInvalid is returned when args.Options is not
// a *T, or is a nil *T. The nil check matters because a typed nil pointer
// stored in the interface still satisfies the type assertion, and
// dereferencing it would panic.
func OptionsInit[T any](_ *KernelCtx, args KernelInitArgs) (KernelState, error) {
	if opts, ok := args.Options.(*T); ok && opts != nil {
		return *opts, nil
	}

	return nil, fmt.Errorf("%w: attempted to initialize kernel state from invalid function options",
		arrow.ErrInvalid)
}
// arrayBuilder is the subset of a typed array builder needed by
// ArrayFromSlice and ArrayFromSliceWithValid: the generic array.Builder
// interface plus typed append methods for element type T.
type arrayBuilder[T arrow .NumericType | bool ] interface {
array .Builder
// Append adds a single value of type T.
Append(T )
// AppendValues adds a batch of values; the second argument is the
// per-value validity slice (callers in this file pass nil or a
// caller-supplied slice).
AppendValues([]T , []bool )
}
// ArrayFromSlice builds an arrow.Array holding the values of data, using
// mem for allocation and the Arrow data type that arrow.GetDataType
// reports for T.
//
// The values are appended with a nil validity slice. The temporary builder
// is released before returning; the returned array is owned by the caller.
func ArrayFromSlice[T arrow.NumericType | bool](mem memory.Allocator, data []T) arrow.Array {
	dt := arrow.GetDataType[T]()
	b := array.NewBuilder(mem, dt).(arrayBuilder[T])
	defer b.Release()

	b.AppendValues(data, nil)
	return b.NewArray()
}
// ArrayFromSliceWithValid builds an arrow.Array holding the values of
// data, using mem for allocation and the Arrow data type that
// arrow.GetDataType reports for T.
//
// valid is forwarded unchanged to the builder's AppendValues as the
// per-value validity slice. The temporary builder is released before
// returning; the returned array is owned by the caller.
func ArrayFromSliceWithValid[T arrow.NumericType | bool](mem memory.Allocator, data []T, valid []bool) arrow.Array {
	dt := arrow.GetDataType[T]()
	b := array.NewBuilder(mem, dt).(arrayBuilder[T])
	defer b.Release()

	b.AppendValues(data, valid)
	return b.NewArray()
}
// RechunkArraysConsistently takes several groups of chunks and re-slices
// them so that every group ends up with the same chunk boundaries.
//
// Each element of groups is one chunked sequence. The total length is
// taken from groups[0]; every other group is assumed to have the same
// total length (a shorter group would be indexed past its end).
//
// When there are fewer than two groups, or the total length is zero,
// groups is returned unchanged and nothing is retained. Otherwise a new
// set of groups is returned in which every array is either a retained
// original chunk or a new slice of one, so the caller is responsible for
// releasing them.
//
// The caller's groups slice is not modified.
func RechunkArraysConsistently(groups [][]arrow.Array) [][]arrow.Array {
	if len(groups) <= 1 {
		return groups
	}

	var totalLen int
	for _, a := range groups[0] {
		totalLen += a.Len()
	}
	if totalLen == 0 {
		return groups
	}

	// Track the unconsumed chunks of each group in a private copy of the
	// outer slice. Advancing the caller's slice in place would hide the
	// consumed chunks from the caller (e.g. from a later release loop).
	remaining := slices.Clone(groups)
	rechunked := make([][]arrow.Array, len(groups))
	// offsets[i] is the position inside remaining[i][0] where the next
	// output chunk starts.
	offsets := make([]int64, len(groups))

	var start int64
	for start < int64(totalLen) {
		// First pass: the next chunk can be at most as long as what is
		// left of the current chunk in every group.
		var chunkLength int64 = math.MaxInt64
		for i, g := range remaining {
			offset := offsets[i]
			// Skip chunks that are fully consumed, including empty ones.
			for offset == int64(g[0].Len()) {
				g = g[1:]
				offset = 0
			}
			chunkLength = Min(chunkLength, int64(g[0].Len())-offset)
			offsets[i] = offset
			remaining[i] = g
		}

		// Second pass: cut a piece of that length from every group.
		for i, g := range remaining {
			offset := offsets[i]
			arr := g[0]
			if offset == 0 && int64(arr.Len()) == chunkLength {
				// The piece is the whole chunk: share it instead of slicing.
				arr.Retain()
				rechunked[i] = append(rechunked[i], arr)
			} else {
				rechunked[i] = append(rechunked[i], array.NewSlice(arr, offset, offset+chunkLength))
			}
			offsets[i] += chunkLength
		}

		start += chunkLength
	}
	return rechunked
}
// ChunkResolver maps a logical index over a sequence of chunks to a
// (chunk, index-within-chunk) pair. See NewChunkResolver and Resolve.
type ChunkResolver struct {
// offsets[i] is the logical index at which chunk i starts; the final
// entry is the total length of all chunks (len(offsets) == len(chunks)+1).
offsets []int64
// cached holds the chunk number found by the most recent binary search
// in Resolve, which checks it first on the next call. It is read and
// written atomically.
cached atomic .Int64
}
// NewChunkResolver builds a ChunkResolver for chunks.
//
// It records the running total of chunk lengths: entry i is the logical
// index where chunk i begins, and the extra final entry is the combined
// length of all chunks.
func NewChunkResolver(chunks []arrow.Array) *ChunkResolver {
	// starts[0] is already zero; each later entry is the previous start
	// plus the previous chunk's length.
	starts := make([]int64, len(chunks)+1)
	for i := range chunks {
		starts[i+1] = starts[i] + int64(chunks[i].Len())
	}
	return &ChunkResolver{offsets: starts}
}
// Resolve translates the logical index idx into the number of the chunk
// that contains it and the index within that chunk.
//
// The chunk found by the previous binary search is checked first, so runs
// of lookups that land in the same chunk skip the search.
//
// Zero-length chunks are never returned for an in-range idx: the search
// picks the last chunk whose start is <= idx, which skips over chunks that
// share a start offset with their successor.
//
// If idx is at or beyond the total length, chunk is the number of chunks
// (one past the last valid chunk) and index is idx minus the total length;
// such a result is not cached. If the resolver has no chunks, (0, idx) is
// returned. idx must be non-negative; a negative idx panics.
func (c *ChunkResolver) Resolve(idx int64) (chunk, index int64) {
	if len(c.offsets) <= 1 {
		return 0, idx
	}
	numChunks := int64(len(c.offsets) - 1)

	// Fast path: same chunk as the last search. cached is always a valid
	// chunk number, so cached+1 is a valid index into offsets.
	cached := c.cached.Load()
	if idx >= c.offsets[cached] && idx < c.offsets[cached+1] {
		return cached, idx - c.offsets[cached]
	}

	// Find the first offset strictly greater than idx (an upper bound).
	// The comparison never reports equality, so entries equal to idx are
	// treated as preceding it and duplicates from empty chunks are skipped.
	pos, _ := slices.BinarySearchFunc(c.offsets, idx, func(start, target int64) int {
		if start <= target {
			return -1
		}
		return 1
	})

	chunk = int64(pos - 1)
	index = idx - c.offsets[chunk]
	// Only remember chunks that really exist; caching the one-past-the-end
	// value would make the fast path read beyond the end of offsets.
	if chunk < numChunks {
		c.cached.Store(chunk)
	}
	return chunk, index
}
// arrayTypes is the type constraint for the element type of ArrayIter:
// any Arrow fixed-width or temporal type, plus bool, string and []byte.
type arrayTypes interface {
arrow .FixedWidthType | arrow .TemporalType | bool | string | []byte
}
// ArrayIter yields values of type T one at a time.
//
// The interface has no way to report exhaustion, and the implementations
// in this file do no bounds checking of their own.
// NOTE(review): presumably callers invoke Next at most once per element of
// the underlying span; confirm against callers.
type ArrayIter [T arrayTypes ] interface {
// Next returns the current value and advances the iterator.
Next () T
}
// BoolIter is an ArrayIter[bool] that reads successive bits through a
// bitmap reader.
type BoolIter struct {
// Rdr is the bitmap reader the values are read from; Next queries it
// and then advances it by one bit.
Rdr *bitutil .BitmapReader
}
// NewBoolIter returns an iterator over the boolean values of arr.
//
// The values are read as bits from arr.Buffers[1], starting at the span's
// offset and covering the span's length.
func NewBoolIter(arr *ArraySpan) ArrayIter[bool] {
	bits := arr.Buffers[1].Buf
	rdr := bitutil.NewBitmapReader(bits, int(arr.Offset), int(arr.Len))
	return &BoolIter{Rdr: rdr}
}
// Next returns whether the reader's current bit is set, then moves the
// reader on to the following bit.
func (b *BoolIter) Next() (out bool) {
	rdr := b.Rdr
	out = rdr.Set()
	rdr.Next()
	return out
}
// PrimitiveIter is an ArrayIter over fixed-width values of type T.
type PrimitiveIter [T arrow .FixedWidthType ] struct {
// Values holds the values not yet returned; Next returns Values[0] and
// re-slices Values to drop it.
Values []T
}
// NewPrimitiveIter returns an iterator over the fixed-width values of arr,
// taken from buffer 1 of the span via GetSpanValues.
func NewPrimitiveIter[T arrow.FixedWidthType](arr *ArraySpan) ArrayIter[T] {
	vals := GetSpanValues[T](arr, 1)
	return &PrimitiveIter[T]{Values: vals}
}
// Next returns the first remaining value and removes it from Values.
// It panics if no values remain.
func (p *PrimitiveIter[T]) Next() (v T) {
	// Both right-hand operands are evaluated before either assignment, so
	// the head is read before Values is shortened.
	v, p.Values = p.Values[0], p.Values[1:]
	return v
}
// VarBinaryIter is an ArrayIter[[]byte] over variable-length values that
// are stored back to back in Data and delimited by Offsets.
type VarBinaryIter [OffsetT int32 | int64 ] struct {
// Offsets[k] and Offsets[k+1] are the start and end of value k in Data.
Offsets []OffsetT
// Data is the byte buffer the values are sliced from.
Data []byte
// Pos is the index of the next value to return; Next increments it.
Pos int64
}
// NewVarBinaryIter returns an iterator over the variable-length values of
// arr. Offsets come from buffer 1 of the span (via GetSpanOffsets) and the
// value bytes from buffer 2; iteration starts at position 0.
func NewVarBinaryIter[OffsetT int32 | int64](arr *ArraySpan) ArrayIter[[]byte] {
	it := new(VarBinaryIter[OffsetT])
	it.Offsets = GetSpanOffsets[OffsetT](arr, 1)
	it.Data = arr.Buffers[2].Buf
	return it
}
// Next returns the bytes of the value at the current position and
// advances the position by one. The result is a sub-slice of Data, not a
// copy.
func (v *VarBinaryIter[OffsetT]) Next() []byte {
	first := v.Pos
	// Advance before reading the offsets, so Pos moves even if the lookup
	// below panics.
	v.Pos = first + 1

	lo, hi := v.Offsets[first], v.Offsets[first+1]
	return v.Data[lo:hi]
}
// FSBIter is an ArrayIter[[]byte] over values that all occupy the same
// number of bytes in Data.
type FSBIter struct {
// Data is the byte buffer the values are sliced from.
Data []byte
// Width is the size of every value in bytes.
Width int
// Pos is the index of the next value to return, counted in units of
// Width bytes from the start of Data; Next increments it.
Pos int64
}
// NewFSBIter returns an iterator over the fixed-size binary values of arr.
//
// The value bytes come from buffer 1 of the span and the width of each
// value from the span's data type, which must implement
// arrow.FixedWidthDataType (the type assertion panics otherwise).
//
// Iteration starts at the span's offset, so a span that begins partway
// into its buffer yields its own first element rather than the buffer's.
func NewFSBIter(arr *ArraySpan) ArrayIter[[]byte] {
	return &FSBIter{
		Data:  arr.Buffers[1].Buf,
		Width: arr.Type.(arrow.FixedWidthDataType).Bytes(),
		// Pos counts whole elements from the start of Data, so the span's
		// element offset is the correct starting position.
		Pos: int64(arr.Offset),
	}
}
// Next returns the Width bytes of the value at the current position and
// advances the position by one. The result is a sub-slice of Data, not a
// copy.
func (f *FSBIter) Next() []byte {
	lo := int(f.Pos) * f.Width
	hi := lo + f.Width
	f.Pos++
	return f.Data[lo:hi]
}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.