package exec
import (
"sync/atomic"
"unsafe"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/bitutil"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/arrow/scalar"
)
// BufferSpan is a lightweight view over the bytes of a single buffer,
// optionally tied to the memory.Buffer that backs those bytes.
type BufferSpan struct {
// Buf is the raw byte view. Code in this file treats a nil Buf as
// "no buffer present".
Buf []byte
// Owner is the memory.Buffer backing Buf, when there is one. GetBuffer
// returns Owner in preference to wrapping Buf.
Owner *memory .Buffer
// SelfAlloc marks whether this span is responsible for releasing Owner:
// ArraySpan.Release and ArraySpan.MakeData only call Owner.Release when
// it is true. SetBuffer clears it, WrapBuffer sets it.
SelfAlloc bool
}
// SetBuffer points this span at buf and records that the span does NOT own
// it (SelfAlloc is cleared), so releasing the span will leave buf untouched.
func (b *BufferSpan) SetBuffer(buf *memory.Buffer) {
	b.Buf, b.Owner, b.SelfAlloc = buf.Bytes(), buf, false
}
// WrapBuffer points this span at buf and records that the span owns it
// (SelfAlloc is set), so ArraySpan.Release / ArraySpan.MakeData will call
// Release on buf on behalf of this span.
func (b *BufferSpan) WrapBuffer(buf *memory.Buffer) {
	*b = BufferSpan{
		Buf:       buf.Bytes(),
		Owner:     buf,
		SelfAlloc: true,
	}
}
// ArraySpan is a mutable, reusable description of an array: its type,
// length, null count, offset, up to three buffers and any child spans.
// It can be populated from an arrow.ArrayData (SetMembers, TakeOwnership)
// or a scalar (FillFromScalar) and converted back with MakeData/MakeArray.
type ArraySpan struct {
// Type is the data type of the values described by this span.
Type arrow .DataType
// Len is the number of elements in the span.
Len int64
// Nulls is the null count. It may hold array.UnknownNullCount, in which
// case UpdateNullCount computes it from Buffers[0]. Several methods read
// and write it through sync/atomic.
Nulls int64
// Offset is the element offset into the buffers; it is forwarded to
// array.NewData and used as the bit offset when counting validity bits.
Offset int64
// Buffers holds the span's buffers. Buffers[0] is used as the validity
// bitmap by UpdateNullCount and MayHaveNulls; NumBuffers reports how many
// entries are meaningful for Type.
Buffers [3 ]BufferSpan
// Scratch is 16 bytes of inline storage that FillFromScalar and
// FillZeroLength alias as buffer contents (offsets, union type codes,
// empty slices) instead of allocating.
Scratch [2 ]uint64
// Children holds the child spans. For dictionary types Children[0] is the
// dictionary (see Dictionary / SetDictionary).
Children []ArraySpan
}
// Release recursively releases every child span and then calls Release on
// the Owner of each buffer this span marked as self-allocated (SelfAlloc).
// Buffers that are merely borrowed (SelfAlloc == false) are left untouched.
//
// A buffer flagged SelfAlloc whose Owner is nil is skipped rather than
// dereferenced: FillZeroLength, for example, clears Owner on a reused span,
// and calling Release on a nil *memory.Buffer would panic.
func (a *ArraySpan) Release() {
	// Iterate by index so the (large) ArraySpan / BufferSpan values are not
	// copied on every iteration.
	for i := range a.Children {
		a.Children[i].Release()
	}

	for i := range a.Buffers {
		if b := &a.Buffers[i]; b.SelfAlloc && b.Owner != nil {
			b.Owner.Release()
		}
	}
}
// MayHaveNulls reports whether the span could contain nulls: the null count
// (read atomically) must be non-zero and a validity buffer (Buffers[0].Buf)
// must be present.
func (a *ArraySpan) MayHaveNulls() bool {
	if atomic.LoadInt64(&a.Nulls) == 0 {
		return false
	}
	return a.Buffers[0].Buf != nil
}
// UpdateNullCount returns the span's null count. When the stored count is
// array.UnknownNullCount it is computed as Len minus the number of set bits
// in Buffers[0] over [Offset, Offset+Len), stored back atomically, and
// returned; otherwise the stored value is returned unchanged.
func (a *ArraySpan) UpdateNullCount() int64 {
	if cached := atomic.LoadInt64(&a.Nulls); cached != array.UnknownNullCount {
		return cached
	}

	validCount := bitutil.CountSetBits(a.Buffers[0].Buf, int(a.Offset), int(a.Len))
	computed := a.Len - int64(validCount)
	atomic.StoreInt64(&a.Nulls, computed)
	return computed
}
// Dictionary returns a pointer to Children[0], the slot this file uses to
// hold the dictionary of a dictionary-typed span. It panics if the span has
// no children.
func (a *ArraySpan) Dictionary() *ArraySpan {
	dict := &a.Children[0]
	return dict
}
// NumBuffers returns how many entries of Buffers are meaningful for the
// span's current Type, as determined by getNumBuffers.
func (a *ArraySpan) NumBuffers() int {
	return getNumBuffers(a.Type)
}
// MakeData builds an arrow.ArrayData from the current contents of the span.
//
// Buffers the span marked as SelfAlloc are released (via defer) once the
// ArrayData has been constructed; borrowed buffers are passed through
// without a matching release. Child spans (or the dictionary, for
// dictionary types) are converted recursively and the intermediate
// ArrayData values are released on return.
func (a *ArraySpan) MakeData() arrow.ArrayData {
	var buffers [3]*memory.Buffer
	for idx := range buffers {
		buf := a.GetBuffer(idx)
		buffers[idx] = buf
		if buf == nil || !a.Buffers[idx].SelfAlloc {
			continue
		}
		// The span owns this buffer, so drop its reference after the
		// ArrayData below has been created.
		defer buf.Release()
	}

	nullCount := int(atomic.LoadInt64(&a.Nulls))
	length := int(a.Len)
	offset := int(a.Offset)

	switch {
	case a.Type.ID() == arrow.NULL:
		// Null arrays consist entirely of nulls.
		nullCount = length
	case len(a.Buffers[0].Buf) == 0:
		// No validity bitmap: report zero nulls.
		nullCount = 0
	}

	// Dispatch on the storage type for extension types, while a.Type (the
	// extension type itself) is still what gets passed to array.NewData.
	storage := a.Type
	if storage.ID() == arrow.EXTENSION {
		storage = storage.(arrow.ExtensionType).StorageType()
	}

	switch storage.ID() {
	case arrow.DICTIONARY:
		out := array.NewData(a.Type, length, buffers[:a.NumBuffers()], nil, nullCount, offset)
		dictData := a.Dictionary().MakeData()
		defer dictData.Release()
		out.SetDictionary(dictData)
		return out
	case arrow.DENSE_UNION, arrow.SPARSE_UNION:
		// Unions are emitted without a validity buffer and with zero nulls.
		buffers[0] = nil
		nullCount = 0
	}

	var childData []arrow.ArrayData
	if n := len(a.Children); n > 0 {
		childData = make([]arrow.ArrayData, n)
		for i := range a.Children {
			d := a.Children[i].MakeData()
			defer d.Release()
			childData[i] = d
		}
	}

	return array.NewData(a.Type, length, buffers[:a.NumBuffers()], childData, nullCount, offset)
}
// MakeArray converts the span into an arrow.Array by building its ArrayData
// with MakeData and handing that to array.MakeFromData. The intermediate
// ArrayData is released before returning.
func (a *ArraySpan) MakeArray() arrow.Array {
	data := a.MakeData()
	// Drop this function's reference to the intermediate data on return.
	defer data.Release()

	return array.MakeFromData(data)
}
// SetSlice re-targets the span at the window [off, off+length) and adjusts
// the null count accordingly:
//
//   - if the window equals the current one, nothing changes;
//   - for the null type, every element is null, so Nulls = length;
//   - a zero null count is kept as-is;
//   - an all-null span stays all-null (Nulls = length);
//   - otherwise the count becomes array.UnknownNullCount.
func (a *ArraySpan) SetSlice(off, length int64) {
	if a.Offset == off && a.Len == length {
		// Same window: leave the null count alone.
		return
	}

	switch {
	case a.Type.ID() == arrow.NULL:
		a.Nulls = length
	case a.Nulls == 0:
		// No nulls before slicing means no nulls afterwards.
	case a.Nulls == a.Len:
		a.Nulls = length
	default:
		a.Nulls = array.UnknownNullCount
	}

	a.Offset, a.Len = off, length
}
// GetBuffer returns a *memory.Buffer for Buffers[idx]: the Owner when one is
// set, otherwise a new buffer wrapping Buf via memory.NewBufferBytes, or nil
// when the slot has neither an owner nor bytes.
func (a *ArraySpan) GetBuffer(idx int) *memory.Buffer {
	span := &a.Buffers[idx]
	if span.Owner != nil {
		return span.Owner
	}
	if span.Buf == nil {
		return nil
	}
	return memory.NewBufferBytes(span.Buf)
}
// resizeChildren makes Children exactly i elements long. The existing
// backing array is re-sliced when it has enough capacity (its contents are
// kept); otherwise a fresh zeroed slice is allocated.
func (a *ArraySpan) resizeChildren(i int) {
	if i > cap(a.Children) {
		a.Children = make([]ArraySpan, i)
		return
	}
	a.Children = a.Children[:i]
}
// FillFromScalar populates the span so that it describes a length-1 array
// holding val. Type, Len and Nulls are set from the scalar, a one-byte
// validity buffer is installed for every type except unions and null, and
// the remaining buffers/children are filled according to the scalar's type.
//
// Offsets and union type codes are written into the span's Scratch storage
// rather than freshly allocated memory, so the resulting buffers alias the
// span itself. None of the buffers installed here are marked SelfAlloc.
//
// The type assertions below panic if val's concrete type does not match
// what its DataType implies.
func (a *ArraySpan) FillFromScalar(val scalar.Scalar) {
	var (
		trueBit  byte = 0x01
		falseBit byte = 0x00
	)

	a.Type = val.DataType()
	a.Len = 1
	typeID := a.Type.ID()
	if val.IsValid() {
		a.Nulls = 0
	} else {
		a.Nulls = 1
	}

	// Unions and the null type get no validity bitmap here; everything else
	// gets a single-byte bitmap reflecting the scalar's validity.
	if !arrow.IsUnion(typeID) && typeID != arrow.NULL {
		if val.IsValid() {
			a.Buffers[0].Buf = []byte{trueBit}
		} else {
			a.Buffers[0].Buf = []byte{falseBit}
		}
		a.Buffers[0].Owner = nil
		a.Buffers[0].SelfAlloc = false
	}

	switch {
	case typeID == arrow.BOOL:
		if val.(*scalar.Boolean).Value {
			a.Buffers[1].Buf = []byte{trueBit}
		} else {
			a.Buffers[1].Buf = []byte{falseBit}
		}
		a.Buffers[1].Owner = nil
		a.Buffers[1].SelfAlloc = false
	case arrow.IsPrimitive(typeID) || arrow.IsDecimal(typeID):
		sc := val.(scalar.PrimitiveScalar)
		a.Buffers[1].Buf = sc.Data()
		a.Buffers[1].Owner = nil
		a.Buffers[1].SelfAlloc = false
	case typeID == arrow.DICTIONARY:
		sc := val.(scalar.PrimitiveScalar)
		a.Buffers[1].Buf = sc.Data()
		a.Buffers[1].Owner = nil
		a.Buffers[1].SelfAlloc = false
		a.resizeChildren(1)
		a.Children[0].SetMembers(val.(*scalar.Dictionary).Value.Dict.Data())
	case arrow.IsBaseBinary(typeID):
		sc := val.(scalar.BinaryScalar)
		a.Buffers[1].Buf = arrow.Uint64Traits.CastToBytes(a.Scratch[:])
		a.Buffers[1].Owner = nil
		a.Buffers[1].SelfAlloc = false

		// Reset the data buffer's ownership unconditionally. Spans are
		// reused, and GetBuffer prefers Owner over Buf, so leaving a stale
		// Owner behind for a null scalar would expose a previous value's
		// buffer.
		var dataBuffer []byte
		a.Buffers[2].Owner = nil
		a.Buffers[2].SelfAlloc = false
		if sc.IsValid() {
			dataBuffer = sc.Data()
			a.Buffers[2].Owner = sc.Buffer()
		}
		// Offsets live in Scratch: 32-bit for binary/string, 64-bit for the
		// large variants.
		if arrow.IsBinaryLike(typeID) {
			setOffsetsForScalar(a,
				unsafe.Slice((*int32)(unsafe.Pointer(&a.Scratch[0])), 2),
				int64(len(dataBuffer)), 1)
		} else {
			setOffsetsForScalar(a,
				unsafe.Slice((*int64)(unsafe.Pointer(&a.Scratch[0])), 2),
				int64(len(dataBuffer)), 1)
		}
		a.Buffers[2].Buf = dataBuffer
	case typeID == arrow.FIXED_SIZE_BINARY:
		sc := val.(scalar.BinaryScalar)
		if !sc.IsValid() {
			// A null value still needs ByteWidth bytes of (zeroed) storage.
			a.Buffers[1].Buf = make([]byte, sc.DataType().(*arrow.FixedSizeBinaryType).ByteWidth)
			a.Buffers[1].Owner = nil
			a.Buffers[1].SelfAlloc = false
			break
		}
		a.Buffers[1].Buf = sc.Data()
		a.Buffers[1].Owner = sc.Buffer()
		a.Buffers[1].SelfAlloc = false
	case arrow.IsListLike(typeID):
		sc := val.(scalar.ListScalar)
		valueLen := 0
		a.resizeChildren(1)

		if sc.GetList() != nil {
			a.Children[0].SetMembers(sc.GetList().Data())
			valueLen = sc.GetList().Len()
		} else {
			// No list value: describe an empty child of the element type.
			FillZeroLength(sc.DataType().(arrow.NestedType).Fields()[0].Type, &a.Children[0])
		}

		switch typeID {
		case arrow.LIST, arrow.MAP:
			setOffsetsForScalar(a,
				unsafe.Slice((*int32)(unsafe.Pointer(&a.Scratch[0])), 2),
				int64(valueLen), 1)
		case arrow.LARGE_LIST:
			setOffsetsForScalar(a,
				unsafe.Slice((*int64)(unsafe.Pointer(&a.Scratch[0])), 2),
				int64(valueLen), 1)
		default:
			// Remaining list-like types carry no offsets buffer.
			a.Buffers[1].Buf, a.Buffers[1].Owner = nil, nil
			a.Buffers[1].SelfAlloc = false
		}
	case typeID == arrow.STRUCT:
		sc := val.(*scalar.Struct)
		a.Buffers[1].Buf = nil
		a.Buffers[1].Owner = nil
		a.Buffers[1].SelfAlloc = false
		a.resizeChildren(len(sc.Value))
		for i, v := range sc.Value {
			a.Children[i].FillFromScalar(v)
		}
	case arrow.IsUnion(typeID):
		// Unions have no validity bitmap; Buffers[1] is a single type code
		// stored in the first byte of Scratch.
		a.Buffers[0].Buf, a.Buffers[0].Owner = nil, nil
		a.Buffers[0].SelfAlloc = false
		a.Buffers[1].Buf = arrow.Uint64Traits.CastToBytes(a.Scratch[:])[:1]
		a.Buffers[1].Owner = nil
		a.Buffers[1].SelfAlloc = false
		codes := unsafe.Slice((*arrow.UnionTypeCode)(unsafe.Pointer(&a.Buffers[1].Buf[0])), 1)

		a.resizeChildren(len(a.Type.(arrow.UnionType).Fields()))
		switch sc := val.(type) {
		case *scalar.DenseUnion:
			codes[0] = sc.TypeCode
			// The offsets start Int32SizeBytes into Scratch so they do not
			// overlap the type code written at byte 0.
			off := unsafe.Slice((*int32)(unsafe.Add(unsafe.Pointer(&a.Scratch[0]), arrow.Int32SizeBytes)), 2)
			setOffsetsForScalar(a, off, 1, 2)
			// Only the child selected by the type code holds the value; all
			// other children are filled as zero-length spans.
			childIDS := a.Type.(arrow.UnionType).ChildIDs()
			for i, f := range a.Type.(arrow.UnionType).Fields() {
				if i == childIDS[sc.TypeCode] {
					a.Children[i].FillFromScalar(sc.Value)
				} else {
					FillZeroLength(f.Type, &a.Children[i])
				}
			}
		case *scalar.SparseUnion:
			codes[0] = sc.TypeCode
			for i, v := range sc.Value {
				a.Children[i].FillFromScalar(v)
			}
		}
	case typeID == arrow.EXTENSION:
		// Fill from the wrapped storage scalar, then restore the extension
		// type that the recursive call overwrote.
		sc := val.(*scalar.Extension)
		a.FillFromScalar(sc.Value)
		a.Type = val.DataType()
	case typeID == arrow.NULL:
		for i := range a.Buffers {
			a.Buffers[i].Buf = nil
			a.Buffers[i].Owner = nil
			a.Buffers[i].SelfAlloc = false
		}
	}
}
// SetDictionary installs *span as the dictionary (Children[0]) of this span.
// Children is resized to one element, whatever span occupied the slot is
// released first, and then the slot is overwritten with a copy of *span.
func (a *ArraySpan) SetDictionary(span *ArraySpan) {
	a.resizeChildren(1)

	slot := &a.Children[0]
	slot.Release()
	*slot = *span
}
// TakeOwnership populates the span from data like SetMembers does, but every
// non-nil buffer is wrapped as self-allocated (WrapBuffer) and retained, so
// the span holds its own reference that Release / MakeData will later drop.
// Children (or the dictionary, for dictionary types) are taken over
// recursively.
func (a *ArraySpan) TakeOwnership(data arrow.ArrayData) {
	a.Type = data.DataType()
	a.Len = int64(data.Len())
	if a.Type.ID() == arrow.NULL {
		a.Nulls = a.Len
	} else {
		a.Nulls = int64(data.NullN())
	}
	a.Offset = int64(data.Offset())

	srcBufs := data.Buffers()
	for i, buf := range srcBufs {
		if buf == nil {
			a.Buffers[i] = BufferSpan{}
			continue
		}
		a.Buffers[i].WrapBuffer(buf)
		buf.Retain()
	}

	// Without a validity bitmap there can be no nulls, except for the null
	// and union types whose null count is not derived from Buffers[0].
	typeID := a.Type.ID()
	if a.Buffers[0].Buf == nil &&
		typeID != arrow.NULL &&
		typeID != arrow.SPARSE_UNION &&
		typeID != arrow.DENSE_UNION {
		a.Nulls = 0
	}

	// Clear any buffer slots that data did not provide.
	for i := len(srcBufs); i < len(a.Buffers); i++ {
		a.Buffers[i] = BufferSpan{}
	}

	if typeID == arrow.DICTIONARY {
		a.resizeChildren(1)
		if dict := data.Dictionary(); dict != (*array.Data)(nil) {
			a.Children[0].TakeOwnership(dict)
		}
		return
	}

	srcChildren := data.Children()
	a.resizeChildren(len(srcChildren))
	for i, child := range srcChildren {
		a.Children[i].TakeOwnership(child)
	}
}
// SetMembers populates the span from data without taking ownership: each
// non-nil buffer is borrowed via SetBuffer (SelfAlloc stays false and no
// reference is retained). Children (or the dictionary, for dictionary types)
// are populated recursively the same way.
func (a *ArraySpan) SetMembers(data arrow.ArrayData) {
	a.Type = data.DataType()
	a.Len = int64(data.Len())
	if a.Type.ID() == arrow.NULL {
		a.Nulls = a.Len
	} else {
		a.Nulls = int64(data.NullN())
	}
	a.Offset = int64(data.Offset())

	srcBufs := data.Buffers()
	for i, buf := range srcBufs {
		if buf == nil {
			a.Buffers[i] = BufferSpan{}
			continue
		}
		a.Buffers[i].SetBuffer(buf)
	}

	// Without a validity bitmap there can be no nulls, except for the null
	// and union types whose null count is not derived from Buffers[0].
	typeID := a.Type.ID()
	if a.Buffers[0].Buf == nil &&
		typeID != arrow.NULL &&
		typeID != arrow.SPARSE_UNION &&
		typeID != arrow.DENSE_UNION {
		a.Nulls = 0
	}

	// Clear any buffer slots that data did not provide.
	for i := len(srcBufs); i < len(a.Buffers); i++ {
		a.Buffers[i] = BufferSpan{}
	}

	if typeID == arrow.DICTIONARY {
		a.resizeChildren(1)
		if dict := data.Dictionary(); dict != (*array.Data)(nil) {
			a.Children[0].SetMembers(dict)
		}
		return
	}

	srcChildren := data.Children()
	a.resizeChildren(len(srcChildren))
	for i, child := range srcChildren {
		a.Children[i].SetMembers(child)
	}
}
// ExecValue holds either an array (as an ArraySpan) or a scalar. The value
// is treated as a scalar whenever Scalar is non-nil, and as an array
// otherwise (see IsArray / IsScalar).
type ExecValue struct {
// Array is the array payload; it is the active member when Scalar is nil.
Array ArraySpan
// Scalar is the scalar payload, or nil when the value is an array.
Scalar scalar .Scalar
}
// IsArray reports whether the value is an array, i.e. no scalar is set.
func (e *ExecValue) IsArray() bool {
	return e.Scalar == nil
}
// IsScalar reports whether the value is a scalar, i.e. Scalar is non-nil.
// It is always the negation of IsArray.
func (e *ExecValue) IsScalar() bool {
	return e.Scalar != nil
}
// Type returns the data type of whichever payload is active: the scalar's
// DataType when a scalar is set, otherwise the array span's Type.
func (e *ExecValue) Type() arrow.DataType {
	if e.Scalar != nil {
		return e.Scalar.DataType()
	}
	return e.Array.Type
}
// ExecResult is an alias for ArraySpan; the two names are interchangeable.
type ExecResult = ArraySpan
// ExecSpan groups a set of ExecValues together with a length.
type ExecSpan struct {
// Len is the length associated with this span.
// NOTE(review): presumably the number of rows each array value covers —
// confirm against callers.
Len int64
// Values are the array-or-scalar values making up the span.
Values []ExecValue
}
// getNumBuffers returns how many buffers an array of type dt uses in an
// ArraySpan: 0 for run-end encoded, 1 for null/struct/fixed-size-list,
// 3 for the (large) binary/string types and dense unions, and 2 for
// everything else. Extension types report the count of their storage type.
func getNumBuffers(dt arrow.DataType) int {
	// Peel off extension wrappers until the underlying storage type is reached.
	for dt.ID() == arrow.EXTENSION {
		dt = dt.(arrow.ExtensionType).StorageType()
	}

	switch dt.ID() {
	case arrow.RUN_END_ENCODED:
		return 0
	case arrow.NULL, arrow.STRUCT, arrow.FIXED_SIZE_LIST:
		return 1
	case arrow.BINARY, arrow.LARGE_BINARY, arrow.STRING, arrow.LARGE_STRING, arrow.DENSE_UNION:
		return 3
	}
	return 2
}
// FillZeroLength populates span so that it describes a zero-length array of
// type dt. Scratch is zeroed, the buffers dt uses are set to empty (non-nil)
// slices aliasing Scratch, unused buffer slots are cleared, and children are
// filled recursively: the value type for dictionaries, each field for nested
// types, and none otherwise.
//
// Every buffer slot also has SelfAlloc cleared. The slot's Owner is dropped
// here, so leaving SelfAlloc set from an earlier use of the span would make
// ArraySpan.Release call Release on a nil owner.
func FillZeroLength(dt arrow.DataType, span *ArraySpan) {
	span.Scratch[0], span.Scratch[1] = 0, 0
	span.Type = dt
	span.Len = 0

	numBufs := getNumBuffers(dt)
	// Empty but non-nil view over Scratch, so the buffers count as present.
	empty := arrow.Uint64Traits.CastToBytes(span.Scratch[:])[:0]
	for i := 0; i < numBufs; i++ {
		span.Buffers[i].Buf = empty
		span.Buffers[i].Owner = nil
		span.Buffers[i].SelfAlloc = false
	}
	for i := numBufs; i < len(span.Buffers); i++ {
		span.Buffers[i] = BufferSpan{}
	}

	if dt.ID() == arrow.DICTIONARY {
		span.resizeChildren(1)
		FillZeroLength(dt.(*arrow.DictionaryType).ValueType, &span.Children[0])
		return
	}

	nt, ok := dt.(arrow.NestedType)
	if !ok {
		// Non-nested types have no children; keep the backing array for reuse.
		if len(span.Children) > 0 {
			span.Children = span.Children[:0]
		}
		return
	}

	span.resizeChildren(nt.NumFields())
	for i, f := range nt.Fields() {
		FillZeroLength(f.Type, &span.Children[i])
	}
}
// PromoteExecSpanScalars converts every scalar value in span into a
// length-1 array in place: the value's Array is filled from its Scalar and
// the Scalar is then set to nil. Values that are already arrays are left
// untouched. The Values slice is shared with the caller, so the changes are
// visible outside even though span is passed by value.
func PromoteExecSpanScalars(span ExecSpan) {
	for i := range span.Values {
		v := &span.Values[i]
		if v.Scalar == nil {
			continue
		}
		v.Array.FillFromScalar(v.Scalar)
		v.Scalar = nil
	}
}
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PR and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.