package ipc

import (
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"math"
	"sync"
	"unsafe"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/bitutil"
	"github.com/apache/arrow-go/v18/arrow/internal"
	"github.com/apache/arrow-go/v18/arrow/internal/debug"
	"github.com/apache/arrow-go/v18/arrow/internal/dictutils"
	"github.com/apache/arrow-go/v18/arrow/internal/flatbuf"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/apache/arrow-go/v18/internal/utils"
)
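
// streamWriter is the default PayloadWriter used by NewWriter. It writes
// encapsulated IPC messages to the underlying io.Writer, tracks the current
// byte position, and emits the end-of-stream marker (kEOS) on Close.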
type streamWriter struct {
	w   io.Writer
	pos int64
}

func (w *streamWriter) Start() error { return nil }

func (w *streamWriter) Close() error {
	_, err := w.Write(kEOS[:])
	return err
}

func (w *streamWriter) WritePayload(p Payload) error {
	_, err := writeIPCPayload(w, p)
	if err != nil {
		return err
	}
	return nil
}

func (w *streamWriter) Write(p []byte) (int, error) {
	n, err := w.w.Write(p)
	w.pos += int64(n)
	return n, err
}

func hasNestedDict(data arrow.ArrayData) bool {
	if data.DataType().ID() == arrow.DICTIONARY {
		return true
	}
	for _, c := range data.Children() {
		if hasNestedDict(c) {
			return true
		}
	}
	return false
}
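
// Writer is an Arrow IPC stream writer. It writes the schema message first,
// then dictionary batches as needed, followed by one record batch message per
// call to Write.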
type Writer struct {
	w io.Writer

	mem memory.Allocator
	pw  PayloadWriter

	started         bool
	schema          *arrow.Schema
	mapper          dictutils.Mapper
	codec           flatbuf.CompressionType
	compressNP      int
	compressors     []compressor
	minSpaceSavings *float64

	lastWrittenDicts map[int64]arrow.Array
	emitDictDeltas   bool
}
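
// NewWriterWithPayloadWriter constructs a writer that sends its encoded
// payloads to the provided PayloadWriter instead of an io.Writer. It honors
// the same Options as NewWriter (schema, allocator, compression and
// dictionary-delta settings, via Option constructors such as WithSchema and
// WithAllocator defined elsewhere in this package).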
func NewWriterWithPayloadWriter(pw PayloadWriter, opts ...Option) *Writer {
	cfg := newConfig(opts...)
	return &Writer{
		mem:             cfg.alloc,
		pw:              pw,
		schema:          cfg.schema,
		codec:           cfg.codec,
		compressNP:      cfg.compressNP,
		minSpaceSavings: cfg.minSpaceSavings,
		emitDictDeltas:  cfg.emitDictDeltas,
		compressors:     make([]compressor, cfg.compressNP),
	}
}
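
// NewWriter returns a writer that emits the Arrow IPC stream format to w,
// using the default streamWriter as its PayloadWriter.
//
// Minimal usage sketch (assumes the arrow, array, memory, and bytes packages;
// the record construction below is illustrative and not part of this file):
//
//	mem := memory.NewGoAllocator()
//	schema := arrow.NewSchema([]arrow.Field{{Name: "x", Type: arrow.PrimitiveTypes.Int64}}, nil)
//
//	bldr := array.NewRecordBuilder(mem, schema)
//	defer bldr.Release()
//	bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
//	rec := bldr.NewRecord()
//	defer rec.Release()
//
//	var buf bytes.Buffer
//	wr := NewWriter(&buf, WithSchema(schema), WithAllocator(mem))
//	if err := wr.Write(rec); err != nil {
//		// handle error
//	}
//	if err := wr.Close(); err != nil {
//		// handle error
//	}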
func NewWriter(w io.Writer, opts ...Option) *Writer {
	cfg := newConfig(opts...)
	return &Writer{
		w:              w,
		mem:            cfg.alloc,
		pw:             &streamWriter{w: w},
		schema:         cfg.schema,
		codec:          cfg.codec,
		emitDictDeltas: cfg.emitDictDeltas,
		compressNP:     cfg.compressNP,
		compressors:    make([]compressor, cfg.compressNP),
	}
}

func (w *Writer) Close() error {
	if !w.started {
		err := w.start()
		if err != nil {
			return err
		}
	}

	if w.pw == nil {
		return nil
	}

	err := w.pw.Close()
	if err != nil {
		return fmt.Errorf("arrow/ipc: could not close payload writer: %w", err)
	}
	w.pw = nil

	for _, d := range w.lastWrittenDicts {
		d.Release()
	}
	return nil
}
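
// Write encodes rec to the stream. On first use it lazily writes the schema
// message (adopting rec's schema if none was configured), then writes any new
// or changed dictionaries, followed by the record batch itself. The record's
// schema must match the writer's schema.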
func (w *Writer) Write(rec arrow.RecordBatch) (err error) {
	defer func() {
		if pErr := recover(); pErr != nil {
			err = utils.FormatRecoveredError("arrow/ipc: unknown error while writing", pErr)
		}
	}()

	incomingSchema := rec.Schema()
	if !w.started {
		if w.schema == nil {
			w.schema = incomingSchema
		}
		err := w.start()
		if err != nil {
			return err
		}
	}

	if incomingSchema == nil || !incomingSchema.Equal(w.schema) {
		return errInconsistentSchema
	}

	const allow64b = true
	var (
		data = Payload{msg: MessageRecordBatch}
		enc  = newRecordEncoder(
			w.mem,
			0,
			kMaxNestingDepth,
			allow64b,
			w.codec,
			w.compressNP,
			w.minSpaceSavings,
			w.compressors,
		)
	)
	defer data.Release()

	err = writeDictionaryPayloads(w.mem, rec, false, w.emitDictDeltas, &w.mapper, w.lastWrittenDicts, w.pw, enc)
	if err != nil {
		return fmt.Errorf("arrow/ipc: failure writing dictionary batches: %w", err)
	}

	enc.reset()
	if err := enc.Encode(&data, rec); err != nil {
		return fmt.Errorf("arrow/ipc: could not encode record to payload: %w", err)
	}

	return w.pw.WritePayload(data)
}
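
// writeDictionaryPayloads collects every dictionary referenced by batch and
// writes a dictionary batch for each one that is new or has changed since the
// last call. If rows were only appended to a previously written dictionary and
// delta emission is enabled, just the new tail is written as a delta batch;
// any other replacement is rejected for the file format, which allows only one
// dictionary per field. A sketch of enabling deltas on a stream writer,
// assuming the package's WithDictionaryDeltas option (which sets
// emitDictDeltas):
//
//	wr := NewWriter(out, WithSchema(schema), WithDictionaryDeltas(true))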
func writeDictionaryPayloads(mem memory.Allocator, batch arrow.RecordBatch, isFileFormat bool, emitDictDeltas bool, mapper *dictutils.Mapper, lastWrittenDicts map[int64]arrow.Array, pw PayloadWriter, encoder *recordEncoder) error {
	dictionaries, err := dictutils.CollectDictionaries(batch, mapper)
	if err != nil {
		return err
	}
	defer func() {
		for _, d := range dictionaries {
			d.Dict.Release()
		}
	}()

	eqopt := array.WithNaNsEqual(true)
	for _, pair := range dictionaries {
		encoder.reset()
		var (
			deltaStart int64
			enc        = dictEncoder{encoder}
		)

		lastDict, exists := lastWrittenDicts[pair.ID]
		if exists {
			if lastDict.Data() == pair.Dict.Data() {
				continue
			}

			newLen, lastLen := pair.Dict.Len(), lastDict.Len()
			if lastLen == newLen && array.ApproxEqual(lastDict, pair.Dict, eqopt) {
				// same dictionary by value: nothing new to write
				continue
			}

			if isFileFormat {
				return errors.New("arrow/ipc: dictionary replacement detected when writing the IPC file format; the Arrow IPC file format only supports a single dictionary per field")
			}

			// emit a delta only if the new dictionary is a pure extension of
			// the previously written one
			if newLen > lastLen &&
				emitDictDeltas &&
				!hasNestedDict(pair.Dict.Data()) &&
				(array.SliceApproxEqual(lastDict, 0, int64(lastLen), pair.Dict, 0, int64(lastLen), eqopt)) {
				deltaStart = int64(lastLen)
			}
		}

		var data = Payload{msg: MessageDictionaryBatch}
		defer data.Release()

		dict := pair.Dict
		if deltaStart > 0 {
			dict = array.NewSlice(dict, deltaStart, int64(dict.Len()))
			defer dict.Release()
		}
		if err := enc.Encode(&data, pair.ID, deltaStart > 0, dict); err != nil {
			return err
		}
		if err := pw.WritePayload(data); err != nil {
			return err
		}

		lastWrittenDicts[pair.ID] = pair.Dict
		if lastDict != nil {
			lastDict.Release()
		}
		pair.Dict.Retain()
	}
	return nil
}

func (w *Writer) start() error {
	w.started = true

	w.mapper.ImportSchema(w.schema)
	w.lastWrittenDicts = make(map[int64]arrow.Array)

	ps := payloadFromSchema(w.schema, w.mem, &w.mapper)
	defer ps.Release()

	for _, data := range ps {
		err := w.pw.WritePayload(data)
		if err != nil {
			return err
		}
	}
	return nil
}

type dictEncoder struct {
	*recordEncoder
}

func (d *dictEncoder) encodeMetadata(p *Payload, isDelta bool, id, nrows int64) error {
	p.meta = writeDictionaryMessage(d.mem, id, isDelta, nrows, p.size, d.fields, d.meta, d.codec, d.variadicCounts)
	return nil
}

func (d *dictEncoder) Encode(p *Payload, id int64, isDelta bool, dict arrow.Array) error {
	d.start = 0
	defer func() {
		d.start = 0
	}()

	schema := arrow.NewSchema([]arrow.Field{{Name: "dictionary", Type: dict.DataType(), Nullable: true}}, nil)
	batch := array.NewRecord(schema, []arrow.Array{dict}, int64(dict.Len()))
	defer batch.Release()

	if err := d.encode(p, batch); err != nil {
		return err
	}

	return d.encodeMetadata(p, isDelta, id, batch.NumRows())
}

type recordEncoder struct {
	mem memory.Allocator

	fields         []fieldMetadata
	meta           []bufferMetadata
	variadicCounts []int64

	depth           int64
	start           int64
	allow64b        bool
	codec           flatbuf.CompressionType
	compressNP      int
	compressors     []compressor
	minSpaceSavings *float64
}

func newRecordEncoder(
	mem memory.Allocator,
	startOffset,
	maxDepth int64,
	allow64b bool,
	codec flatbuf.CompressionType,
	compressNP int,
	minSpaceSavings *float64,
	compressors []compressor,
) *recordEncoder {
	return &recordEncoder{
		mem:             mem,
		start:           startOffset,
		depth:           maxDepth,
		allow64b:        allow64b,
		codec:           codec,
		compressNP:      compressNP,
		compressors:     compressors,
		minSpaceSavings: minSpaceSavings,
	}
}

func (w *recordEncoder) shouldCompress(uncompressed, compressed int) bool {
	debug.Assert(uncompressed > 0, "uncompressed size is 0")
	if w.minSpaceSavings == nil {
		return true
	}
	savings := 1.0 - float64(compressed)/float64(uncompressed)
	return savings >= *w.minSpaceSavings
}

func (w *recordEncoder) reset() {
	w.start = 0
	w.fields = make([]fieldMetadata, 0)
}

func (w *recordEncoder) getCompressor(id int) compressor {
	if w.compressors[id] == nil {
		w.compressors[id] = getCompressor(w.codec)
	}
	return w.compressors[id]
}
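
// compressBodyBuffers compresses every body buffer of the payload, using a
// single compressor or a pool of w.compressNP workers. Each compressed buffer
// is framed with an 8-byte little-endian prefix holding the uncompressed
// length; if compression would not meet the configured minimum space savings,
// the raw bytes are stored instead and the prefix is set to -1. A reader-side
// sketch of interpreting that prefix (hypothetical helper, not part of this
// file):
//
//	func uncompressedLen(framed []byte) (n int64, isCompressed bool) {
//		n = int64(binary.LittleEndian.Uint64(framed[:8]))
//		return n, n != -1
//	}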
func (w *recordEncoder) compressBodyBuffers(p *Payload) error {
	compress := func(idx int, codec compressor) error {
		if p.body[idx] == nil || p.body[idx].Len() == 0 {
			return nil
		}
		buf := memory.NewResizableBuffer(w.mem)
		buf.Reserve(codec.MaxCompressedLen(p.body[idx].Len()) + arrow.Int64SizeBytes)
		binary.LittleEndian.PutUint64(buf.Buf(), uint64(p.body[idx].Len()))

		bw := &bufferWriter{buf: buf, pos: arrow.Int64SizeBytes}
		codec.Reset(bw)

		n, err := codec.Write(p.body[idx].Bytes())
		if err != nil {
			return err
		}
		if err := codec.Close(); err != nil {
			return err
		}

		finalLen := bw.pos
		compressedLen := bw.pos - arrow.Int64SizeBytes
		if !w.shouldCompress(n, compressedLen) {
			n = copy(buf.Buf()[arrow.Int64SizeBytes:], p.body[idx].Bytes())
			var noprefix int64 = -1
			binary.LittleEndian.PutUint64(buf.Buf(), uint64(noprefix))
			finalLen = n + arrow.Int64SizeBytes
		}
		bw.buf.Resize(finalLen)
		p.body[idx].Release()
		p.body[idx] = buf
		return nil
	}

	if w.compressNP <= 1 {
		codec := w.getCompressor(0)
		for idx := range p.body {
			if err := compress(idx, codec); err != nil {
				return err
			}
		}
		return nil
	}

	var (
		wg sync.WaitGroup
		ch = make(chan int)
		// buffered so a worker reporting an error never blocks before wg.Wait
		errch       = make(chan error, w.compressNP)
		ctx, cancel = context.WithCancel(context.Background())
	)
	defer cancel()

	for workerID := 0; workerID < w.compressNP; workerID++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			codec := w.getCompressor(id)
			for {
				select {
				case idx, ok := <-ch:
					if !ok {
						return
					}
					if err := compress(idx, codec); err != nil {
						errch <- err
						cancel()
						return
					}
				case <-ctx.Done():
					return
				}
			}
		}(workerID)
	}

	// feed the workers, stopping early if one of them reports an error
feed:
	for idx := range p.body {
		select {
		case ch <- idx:
		case <-ctx.Done():
			break feed
		}
	}
	close(ch)
	wg.Wait()
	close(errch)

	return <-errch
}

func (w *recordEncoder) encode(p *Payload, rec arrow.RecordBatch) error {
	for i, col := range rec.Columns() {
		err := w.visit(p, col)
		if err != nil {
			return fmt.Errorf("arrow/ipc: could not encode column %d (%q): %w", i, rec.ColumnName(i), err)
		}
	}

	if w.codec != -1 {
		if w.minSpaceSavings != nil {
			pct := *w.minSpaceSavings
			if pct < 0 || pct > 1 {
				p.Release()
				return fmt.Errorf("%w: minSpaceSavings not in range [0,1]. Provided %.05f",
					arrow.ErrInvalid, pct)
			}
		}
		if err := w.compressBodyBuffers(p); err != nil {
			return err
		}
	}

	// position the buffer metadata and compute the aligned body size
	offset := w.start
	w.meta = make([]bufferMetadata, len(p.body))
	for i, buf := range p.body {
		var (
			size    int64
			padding int64
		)
		if buf != nil {
			size = int64(buf.Len())
			padding = bitutil.CeilByte64(size) - size
		}
		w.meta[i] = bufferMetadata{
			Offset: offset,
			Len:    size,
		}
		offset += size + padding
	}

	p.size = offset - w.start
	if !bitutil.IsMultipleOf8(p.size) {
		panic("not aligned")
	}

	return nil
}
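
// visit appends the field metadata and body buffers for arr (and, recursively,
// its children) to the payload, following the Arrow columnar IPC layout: an
// optional validity bitmap first, then the type-specific buffers (values,
// offsets, sizes, type codes, ...), truncated and re-based as needed so that
// sliced arrays are written without their unused prefix.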
func (w *recordEncoder) visit(p *Payload, arr arrow.Array) error {
	if w.depth <= 0 {
		return errMaxRecursion
	}

	if !w.allow64b && arr.Len() > math.MaxInt32 {
		return errBigArray
	}

	if arr.DataType().ID() == arrow.EXTENSION {
		arr := arr.(array.ExtensionArray)
		err := w.visit(p, arr.Storage())
		if err != nil {
			return fmt.Errorf("failed visiting storage array for %T: %w", arr, err)
		}
		return nil
	}

	if arr.DataType().ID() == arrow.DICTIONARY {
		arr := arr.(*array.Dictionary)
		return w.visit(p, arr.Indices())
	}

	w.fields = append(w.fields, fieldMetadata{
		Len:    int64(arr.Len()),
		Nulls:  int64(arr.NullN()),
		Offset: 0,
	})

	if arr.DataType().ID() == arrow.NULL {
		return nil
	}

	if internal.HasValidityBitmap(arr.DataType().ID(), flatbuf.MetadataVersion(currentMetadataVersion)) {
		switch arr.NullN() {
		case 0:
			p.body = append(p.body, nil)
		default:
			data := arr.Data()
			var bitmap *memory.Buffer
			if data.NullN() == data.Len() {
				bitmap = memory.NewResizableBuffer(w.mem)
				minLength := paddedLength(bitutil.BytesForBits(int64(data.Len())), kArrowAlignment)
				bitmap.Resize(int(minLength))
			} else {
				bitmap = newTruncatedBitmap(w.mem, int64(data.Offset()), int64(data.Len()), data.Buffers()[0])
			}
			p.body = append(p.body, bitmap)
		}
	}

	switch dtype := arr.DataType().(type) {
	case *arrow.NullType:

	case *arrow.BooleanType:
		var (
			data = arr.Data()
			bitm *memory.Buffer
		)
		if data.Len() != 0 {
			bitm = newTruncatedBitmap(w.mem, int64(data.Offset()), int64(data.Len()), data.Buffers()[1])
		}
		p.body = append(p.body, bitm)

	case arrow.FixedWidthDataType:
		data := arr.Data()
		values := data.Buffers()[1]
		arrLen := int64(arr.Len())
		typeWidth := int64(dtype.BitWidth() / 8)
		minLength := paddedLength(arrLen*typeWidth, kArrowAlignment)

		switch {
		case needTruncate(int64(data.Offset()), values, minLength):
			offset := int64(data.Offset()) * typeWidth
			len := min(bitutil.CeilByte64(arrLen*typeWidth), int64(values.Len())-offset)
			values = memory.NewBufferBytes(values.Bytes()[offset : offset+len])
		default:
			if values != nil {
				values.Retain()
			}
		}
		p.body = append(p.body, values)

	case *arrow.BinaryType, *arrow.LargeBinaryType, *arrow.StringType, *arrow.LargeStringType:
		arr := arr.(array.BinaryLike)
		voffsets := w.getZeroBasedValueOffsets(arr)
		data := arr.Data()
		values := data.Buffers()[2]

		var totalDataBytes int64
		if voffsets != nil {
			totalDataBytes = int64(len(arr.ValueBytes()))
		}

		switch {
		case needTruncate(int64(data.Offset()), values, totalDataBytes):
			var (
				beg int64 = 0
				len       = min(paddedLength(totalDataBytes, kArrowAlignment), int64(totalDataBytes))
			)
			if arr.Len() > 0 {
				beg = arr.ValueOffset64(0)
			}
			values = memory.NewBufferBytes(data.Buffers()[2].Bytes()[beg : beg+len])
		default:
			if values != nil {
				values.Retain()
			}
		}
		p.body = append(p.body, voffsets)
		p.body = append(p.body, values)

	case arrow.BinaryViewDataType:
		data := arr.Data()
		values := data.Buffers()[1]
		arrLen := int64(arr.Len())
		typeWidth := int64(arrow.ViewHeaderSizeBytes)
		minLength := paddedLength(arrLen*typeWidth, kArrowAlignment)

		switch {
		case needTruncate(int64(data.Offset()), values, minLength):
			offset := data.Offset() * int(typeWidth)
			len := int(min(bitutil.CeilByte64(arrLen*typeWidth), int64(values.Len()-offset)))
			values = memory.SliceBuffer(values, offset, len)
		default:
			if values != nil {
				values.Retain()
			}
		}
		p.body = append(p.body, values)

		w.variadicCounts = append(w.variadicCounts, int64(len(data.Buffers())-2))
		for _, b := range data.Buffers()[2:] {
			b.Retain()
			p.body = append(p.body, b)
		}

	case *arrow.StructType:
		w.depth--
		arr := arr.(*array.Struct)
		for i := 0; i < arr.NumField(); i++ {
			err := w.visit(p, arr.Field(i))
			if err != nil {
				return fmt.Errorf("could not visit field %d of struct-array: %w", i, err)
			}
		}
		w.depth++

	case *arrow.SparseUnionType:
		offset, length := arr.Data().Offset(), arr.Len()
		arr := arr.(*array.SparseUnion)
		typeCodes := getTruncatedBuffer(int64(offset), int64(length), int32(unsafe.Sizeof(arrow.UnionTypeCode(0))), arr.TypeCodes())
		p.body = append(p.body, typeCodes)

		w.depth--
		for i := 0; i < arr.NumFields(); i++ {
			err := w.visit(p, arr.Field(i))
			if err != nil {
				return fmt.Errorf("could not visit field %d of sparse union array: %w", i, err)
			}
		}
		w.depth++

	case *arrow.DenseUnionType:
		offset, length := arr.Data().Offset(), arr.Len()
		arr := arr.(*array.DenseUnion)
		typeCodes := getTruncatedBuffer(int64(offset), int64(length), int32(unsafe.Sizeof(arrow.UnionTypeCode(0))), arr.TypeCodes())
		p.body = append(p.body, typeCodes)

		w.depth--
		dt := arr.UnionType()
		maxCode := dt.MaxTypeCode()

		offsets := make([]int32, maxCode+1)
		lengths := make([]int32, maxCode+1)
		offsets[0], lengths[0] = -1, 0
		for i := 1; i < len(offsets); i *= 2 {
			copy(offsets[i:], offsets[:i])
			copy(lengths[i:], lengths[:i])
		}

		var valueOffsets *memory.Buffer
		if offset != 0 {
			valueOffsets = w.rebaseDenseUnionValueOffsets(arr, offsets, lengths)
		} else {
			valueOffsets = getTruncatedBuffer(int64(offset), int64(length), int32(arrow.Int32SizeBytes), arr.ValueOffsets())
		}
		p.body = append(p.body, valueOffsets)

		for i := range dt.Fields() {
			child := arr.Field(i)
			if offset != 0 {
				code := dt.TypeCodes()[i]
				childOffset := offsets[code]
				childLen := lengths[code]

				if childOffset > 0 {
					child = array.NewSlice(child, int64(childOffset), int64(childOffset+childLen))
					defer child.Release()
				} else if childLen < int32(child.Len()) {
					child = array.NewSlice(child, 0, int64(childLen))
					defer child.Release()
				}
			}
			if err := w.visit(p, child); err != nil {
				return fmt.Errorf("could not visit field %d of dense union array: %w", i, err)
			}
		}
		w.depth++

	case *arrow.MapType, *arrow.ListType, *arrow.LargeListType:
		arr := arr.(array.ListLike)
		voffsets := w.getZeroBasedValueOffsets(arr)
		p.body = append(p.body, voffsets)

		w.depth--
		var (
			values        = arr.ListValues()
			mustRelease   = false
			values_offset int64
			values_end    int64
		)
		defer func() {
			if mustRelease {
				values.Release()
			}
		}()

		if arr.Len() > 0 && voffsets != nil {
			values_offset, _ = arr.ValueOffsets(0)
			_, values_end = arr.ValueOffsets(arr.Len() - 1)
		}

		if arr.Len() != 0 || values_end < int64(values.Len()) {
			values = array.NewSlice(values, values_offset, values_end)
			mustRelease = true
		}
		err := w.visit(p, values)
		if err != nil {
			return fmt.Errorf("could not visit list element for array %T: %w", arr, err)
		}
		w.depth++

	case *arrow.ListViewType, *arrow.LargeListViewType:
		arr := arr.(array.VarLenListLike)
		voffsets, minOffset, maxEnd := w.getZeroBasedListViewOffsets(arr)
		vsizes := w.getListViewSizes(arr)
		p.body = append(p.body, voffsets)
		p.body = append(p.body, vsizes)

		w.depth--
		var (
			values = arr.ListValues()
		)
		if minOffset != 0 || maxEnd < int64(values.Len()) {
			values = array.NewSlice(values, minOffset, maxEnd)
			defer values.Release()
		}
		err := w.visit(p, values)
		if err != nil {
			return fmt.Errorf("could not visit list element for array %T: %w", arr, err)
		}
		w.depth++

	case *arrow.FixedSizeListType:
		arr := arr.(*array.FixedSizeList)
		w.depth--

		size := int64(arr.DataType().(*arrow.FixedSizeListType).Len())
		beg := int64(arr.Offset()) * size
		end := int64(arr.Offset()+arr.Len()) * size

		values := array.NewSlice(arr.ListValues(), beg, end)
		defer values.Release()

		err := w.visit(p, values)
		if err != nil {
			return fmt.Errorf("could not visit list element for array %T: %w", arr, err)
		}
		w.depth++

	case *arrow.RunEndEncodedType:
		arr := arr.(*array.RunEndEncoded)
		w.depth--

		child := arr.LogicalRunEndsArray(w.mem)
		defer child.Release()
		if err := w.visit(p, child); err != nil {
			return err
		}

		child = arr.LogicalValuesArray()
		defer child.Release()
		if err := w.visit(p, child); err != nil {
			return err
		}
		w.depth++

	default:
		panic(fmt.Errorf("arrow/ipc: unknown array %T (dtype=%T)", arr, dtype))
	}

	return nil
}
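
// getZeroBasedValueOffsets returns the offsets buffer for a binary- or
// list-like array, re-based so the first offset is zero and truncated to
// exactly len+1 entries; it returns nil when the array has no offsets buffer.
// For example, if an int32 offsets buffer holds [0, 3, 5, 9] and the array is
// a slice with data.Offset() == 1 and data.Len() == 2, the returned buffer
// holds [0, 2, 6]. When no shift or truncation is needed the original buffer
// is retained and returned as-is.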
func (w *recordEncoder) getZeroBasedValueOffsets(arr arrow.Array) *memory.Buffer {
	data := arr.Data()
	voffsets := data.Buffers()[1]
	offsetTraits := arr.DataType().(arrow.OffsetsDataType).OffsetTypeTraits()
	offsetBytesNeeded := offsetTraits.BytesRequired(data.Len() + 1)

	if voffsets == nil || voffsets.Len() == 0 {
		return nil
	}

	dataTypeWidth := arr.DataType().Layout().Buffers[1].ByteWidth

	hasNonZeroOffset := data.Offset() != 0
	hasMoreOffsetsThanValues := offsetBytesNeeded < voffsets.Len()
	var firstOffset int64
	if dataTypeWidth == 8 {
		firstOffset = arrow.Int64Traits.CastFromBytes(voffsets.Bytes())[0]
	} else {
		firstOffset = int64(arrow.Int32Traits.CastFromBytes(voffsets.Bytes())[0])
	}
	offsetsDoNotStartFromZero := firstOffset != 0
	needsTruncateAndShift := hasNonZeroOffset || hasMoreOffsetsThanValues || offsetsDoNotStartFromZero

	if needsTruncateAndShift {
		shiftedOffsets := memory.NewResizableBuffer(w.mem)
		shiftedOffsets.Resize(offsetBytesNeeded)

		switch dataTypeWidth {
		case 8:
			dest := arrow.Int64Traits.CastFromBytes(shiftedOffsets.Bytes())
			offsets := arrow.Int64Traits.CastFromBytes(voffsets.Bytes())[data.Offset() : data.Offset()+data.Len()+1]

			startOffset := offsets[0]
			for i, o := range offsets {
				dest[i] = o - startOffset
			}
		default:
			debug.Assert(arr.DataType().Layout().Buffers[1].ByteWidth == 4, "invalid offset bytewidth")
			dest := arrow.Int32Traits.CastFromBytes(shiftedOffsets.Bytes())
			offsets := arrow.Int32Traits.CastFromBytes(voffsets.Bytes())[data.Offset() : data.Offset()+data.Len()+1]

			startOffset := offsets[0]
			for i, o := range offsets {
				dest[i] = o - startOffset
			}
		}
		voffsets = shiftedOffsets
	} else {
		voffsets.Retain()
	}

	return voffsets
}
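
// getZeroBasedListViewOffsets builds the offsets buffer for a list-view array.
// When the array is not sliced, the existing buffer is reused (truncated to
// arr.Len() entries if it is longer). Otherwise it scans the value ranges to
// find the minimum offset and maximum end, then writes offsets re-based to
// that minimum, so the child values can be sliced to [minOffset, maxEnd).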
func getZeroBasedListViewOffsets[OffsetT int32 | int64](mem memory.Allocator, arr array.VarLenListLike) (valueOffsets *memory.Buffer, minOffset, maxEnd OffsetT) {
	requiredBytes := int(unsafe.Sizeof(minOffset)) * arr.Len()
	if arr.Data().Offset() == 0 {
		minOffset, maxEnd = 0, OffsetT(arr.ListValues().Len())
		valueOffsets = arr.Data().Buffers()[1]
		if valueOffsets.Len() > requiredBytes {
			valueOffsets = memory.SliceBuffer(valueOffsets, 0, requiredBytes)
		} else {
			valueOffsets.Retain()
		}
		return
	}

	valueOffsets = memory.NewResizableBuffer(mem)
	valueOffsets.Resize(requiredBytes)

	if arr.Len() > 0 {
		minOffset = (^OffsetT(0)) << ((8 * unsafe.Sizeof(minOffset)) - 1)
		for i := 0; i < arr.Len(); i++ {
			start, end := arr.ValueOffsets(i)
			minOffset = utils.Min(minOffset, OffsetT(start))
			maxEnd = utils.Max(maxEnd, OffsetT(end))
		}
	}

	offsets := arrow.GetData[OffsetT](arr.Data().Buffers()[1].Bytes())[arr.Data().Offset():]
	destOffset := arrow.GetData[OffsetT](valueOffsets.Bytes())
	for i := 0; i < arr.Len(); i++ {
		destOffset[i] = offsets[i] - minOffset
	}
	return
}

func getListViewSizes[OffsetT int32 | int64](arr array.VarLenListLike) *memory.Buffer {
	var z OffsetT
	requiredBytes := int(unsafe.Sizeof(z)) * arr.Len()
	sizes := arr.Data().Buffers()[2]
	if arr.Data().Offset() != 0 || sizes.Len() > requiredBytes {
		offsetBytes := arr.Data().Offset() * int(unsafe.Sizeof(z))
		sizes = memory.SliceBuffer(sizes, offsetBytes, requiredBytes)
	} else {
		sizes.Retain()
	}
	return sizes
}

func (w *recordEncoder) getZeroBasedListViewOffsets(arr array.VarLenListLike) (*memory.Buffer, int64, int64) {
	if arr.Len() == 0 {
		return nil, 0, 0
	}

	var (
		outOffsets     *memory.Buffer
		minOff, maxEnd int64
	)
	switch v := arr.(type) {
	case *array.ListView:
		voffsets, outOff, outEnd := getZeroBasedListViewOffsets[int32](w.mem, v)
		outOffsets = voffsets
		minOff, maxEnd = int64(outOff), int64(outEnd)
	case *array.LargeListView:
		outOffsets, minOff, maxEnd = getZeroBasedListViewOffsets[int64](w.mem, v)
	}
	return outOffsets, minOff, maxEnd
}

func (w *recordEncoder) getListViewSizes(arr array.VarLenListLike) *memory.Buffer {
	if arr.Len() == 0 {
		return nil
	}

	switch v := arr.(type) {
	case *array.ListView:
		return getListViewSizes[int32](v)
	case *array.LargeListView:
		return getListViewSizes[int64](v)
	}
	return nil
}
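
// rebaseDenseUnionValueOffsets rewrites the value offsets of a sliced dense
// union so that each child's offsets restart at zero. As a side effect it
// records, per type code, the first referenced offset and the number of rows
// referenced, which visit then uses to slice each child before encoding it.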
func (w *recordEncoder) rebaseDenseUnionValueOffsets(arr *array.DenseUnion, offsets, lengths []int32) *memory.Buffer {
	unshiftedOffsets := arr.RawValueOffsets()
	codes := arr.RawTypeCodes()

	shiftedOffsetsBuf := memory.NewResizableBuffer(w.mem)
	shiftedOffsetsBuf.Resize(arrow.Int32Traits.BytesRequired(arr.Len()))
	shiftedOffsets := arrow.Int32Traits.CastFromBytes(shiftedOffsetsBuf.Bytes())

	for i, c := range codes {
		if offsets[c] == -1 {
			offsets[c] = unshiftedOffsets[i]
			shiftedOffsets[i] = 0
		} else {
			shiftedOffsets[i] = unshiftedOffsets[i] - offsets[c]
		}
		lengths[c] = max(lengths[c], shiftedOffsets[i]+1)
	}
	return shiftedOffsetsBuf
}

func (w *recordEncoder) Encode(p *Payload, rec arrow.RecordBatch) error {
	if err := w.encode(p, rec); err != nil {
		return err
	}
	return w.encodeMetadata(p, rec.NumRows())
}

func (w *recordEncoder) encodeMetadata(p *Payload, nrows int64) error {
	p.meta = writeRecordMessage(w.mem, nrows, p.size, w.fields, w.meta, w.codec, w.variadicCounts)
	return nil
}

func newTruncatedBitmap(mem memory.Allocator, offset, length int64, input *memory.Buffer) *memory.Buffer {
	if input == nil {
		return nil
	}

	minLength := paddedLength(bitutil.BytesForBits(length), kArrowAlignment)
	switch {
	case offset != 0 || minLength < int64(input.Len()):
		buf := memory.NewResizableBuffer(mem)
		buf.Resize(int(minLength))
		bitutil.CopyBitmap(input.Bytes(), int(offset), int(length), buf.Bytes(), 0)
		return buf
	default:
		input.Retain()
		return input
	}
}

func getTruncatedBuffer(offset, length int64, byteWidth int32, buf *memory.Buffer) *memory.Buffer {
	if buf == nil {
		return buf
	}

	paddedLen := paddedLength(length*int64(byteWidth), kArrowAlignment)
	if offset != 0 || paddedLen < int64(buf.Len()) {
		return memory.SliceBuffer(buf, int(offset*int64(byteWidth)), int(min(paddedLen, int64(buf.Len()))))
	}
	buf.Retain()
	return buf
}

func needTruncate(offset int64, buf *memory.Buffer, minLength int64) bool {
	if buf == nil {
		return false
	}
	return offset != 0 || minLength < int64(buf.Len())
}
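
// GetRecordBatchPayload encodes a single record batch into a Payload without
// writing it anywhere, which is useful when the caller wants to drive a
// PayloadWriter directly. A minimal sketch, assuming the package's
// WithAllocator and WithZstd options and a batch built elsewhere:
//
//	payload, err := GetRecordBatchPayload(batch, WithAllocator(mem), WithZstd())
//	if err != nil {
//		// handle error
//	}
//	defer payload.Release()
//	// hand payload to a custom PayloadWriter via its WritePayload method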
func GetRecordBatchPayload(batch arrow.RecordBatch, opts ...Option) (Payload, error) {
	cfg := newConfig(opts...)
	var (
		data = Payload{msg: MessageRecordBatch}
		enc  = newRecordEncoder(
			cfg.alloc,
			0,
			kMaxNestingDepth,
			true,
			cfg.codec,
			cfg.compressNP,
			cfg.minSpaceSavings,
			make([]compressor, cfg.compressNP),
		)
	)

	err := enc.Encode(&data, batch)
	if err != nil {
		return Payload{}, err
	}
	return data, nil
}

func GetSchemaPayload(schema *arrow.Schema, mem memory.Allocator) Payload {
	var mapper dictutils.Mapper
	mapper.ImportSchema(schema)
	ps := payloadFromSchema(schema, mem, &mapper)
	return ps[0]
}