package array

import (
	"errors"
	"fmt"
	"io"
	"strings"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/bitutil"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/apache/arrow-go/v18/internal/hashing"
	"github.com/apache/arrow-go/v18/internal/json"
)

// min returns the smaller of two ints.
func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}

type fromJSONCfg struct {
	multiDocument bool
	startOffset   int64
	useNumber     bool
}

// FromJSONOption configures FromJSON, RecordFromJSON and the other
// JSON-decoding helpers in this file.
type FromJSONOption func(*fromJSONCfg)

// WithMultipleDocs tells the decoder to expect a stream of individual JSON
// documents instead of a single JSON array of values.
func WithMultipleDocs() FromJSONOption {
	return func(c *fromJSONCfg) {
		c.multiDocument = true
	}
}

// WithStartOffset seeks the reader to the given byte offset before decoding
// begins. The reader must implement io.ReadSeeker or decoding fails.
func WithStartOffset(off int64) FromJSONOption {
	return func(c *fromJSONCfg) {
		c.startOffset = off
	}
}

// WithUseNumber enables the json.Decoder UseNumber mode so numbers are
// decoded as json.Number instead of float64.
func WithUseNumber() FromJSONOption {
	return func(c *fromJSONCfg) {
		c.useNumber = true
	}
}

// FromJSON decodes JSON from r into a new arrow.Array of the given data type.
// By default the input must be a single JSON array of values; with
// WithMultipleDocs the input is treated as a stream of individual documents.
// It returns the array, the decoder's input offset when decoding stopped, and
// any error encountered. An io.EOF from the decoder is reported as
// io.ErrUnexpectedEOF since the input ended before a complete value was read.
func FromJSON(mem memory.Allocator, dt arrow.DataType, r io.Reader, opts ...FromJSONOption) (arr arrow.Array, offset int64, err error) {
	var cfg fromJSONCfg
	for _, o := range opts {
		o(&cfg)
	}

	if cfg.startOffset != 0 {
		seeker, ok := r.(io.ReadSeeker)
		if !ok {
			return nil, 0, errors.New("using StartOffset option requires reader to be a ReadSeeker, cannot seek")
		}
		if _, err := seeker.Seek(cfg.startOffset, io.SeekStart); err != nil {
			return nil, 0, fmt.Errorf("failed to seek to start offset %d: %w", cfg.startOffset, err)
		}
	}

	bldr := NewBuilder(mem, dt)
	defer bldr.Release()

	dec := json.NewDecoder(r)
	defer func() {
		if errors.Is(err, io.EOF) {
			err = fmt.Errorf("failed parsing json: %w", io.ErrUnexpectedEOF)
		}
	}()

	if cfg.useNumber {
		dec.UseNumber()
	}

	if !cfg.multiDocument {
		// consume the opening '[' of the JSON array
		t, err := dec.Token()
		if err != nil {
			return nil, dec.InputOffset(), err
		}

		if delim, ok := t.(json.Delim); !ok || delim != '[' {
			return nil, dec.InputOffset(), fmt.Errorf("json doc must be an array, found %s", delim)
		}
	}

	if err = bldr.Unmarshal(dec); err != nil {
		return nil, dec.InputOffset(), err
	}

	if !cfg.multiDocument {
		// consume the closing ']' of the JSON array
		if _, err = dec.Token(); err != nil {
			return nil, dec.InputOffset(), err
		}
	}

	return bldr.NewArray(), dec.InputOffset(), nil
}
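
// Illustrative usage sketch (not part of the original source): decoding a JSON
// array of values into an int32 Arrow array with FromJSON. Assumes the usual
// arrow, array, memory and strings imports.
//
//	arr, _, err := array.FromJSON(
//		memory.DefaultAllocator,
//		arrow.PrimitiveTypes.Int32,
//		strings.NewReader(`[1, 2, null, 4]`),
//	)
//	if err != nil {
//		// handle error
//	}
//	defer arr.Release()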

// RecordToStructArray constructs a struct array from the columns of the record
// batch, one struct field per column, sharing the underlying column data.
func RecordToStructArray(rec arrow.RecordBatch) *Struct {
	cols := make([]arrow.ArrayData, rec.NumCols())
	for i, c := range rec.Columns() {
		cols[i] = c.Data()
	}

	data := NewData(arrow.StructOf(rec.Schema().Fields()...), int(rec.NumRows()), []*memory.Buffer{nil}, cols, 0, 0)
	defer data.Release()

	return NewStructData(data)
}

// RecordFromStructArray is the inverse of RecordToStructArray: it builds a
// record batch from the fields of a struct array. If schema is nil, one is
// derived from the struct type.
func RecordFromStructArray(in *Struct, schema *arrow.Schema) arrow.RecordBatch {
	if schema == nil {
		schema = arrow.NewSchema(in.DataType().(*arrow.StructType).Fields(), nil)
	}

	return NewRecord(schema, in.fields, int64(in.Len()))
}
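
// Illustrative round-trip sketch (not part of the original source), assuming
// rec is an existing arrow.RecordBatch:
//
//	st := array.RecordToStructArray(rec)
//	defer st.Release()
//	rec2 := array.RecordFromStructArray(st, nil)
//	defer rec2.Release()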

// RecordFromJSON creates a record batch for the given schema by decoding JSON
// from r. By default the input must be a single JSON array of objects, one
// object per row; with WithMultipleDocs it is a stream of JSON objects, one
// document per row. It returns the record batch, the decoder's input offset
// when decoding stopped, and any error encountered.
func RecordFromJSON(mem memory.Allocator, schema *arrow.Schema, r io.Reader, opts ...FromJSONOption) (arrow.RecordBatch, int64, error) {
	var cfg fromJSONCfg
	for _, o := range opts {
		o(&cfg)
	}

	if cfg.startOffset != 0 {
		seeker, ok := r.(io.ReadSeeker)
		if !ok {
			return nil, 0, errors.New("using StartOffset option requires reader to be a ReadSeeker, cannot seek")
		}
		if _, err := seeker.Seek(cfg.startOffset, io.SeekStart); err != nil {
			return nil, 0, fmt.Errorf("failed to seek to start offset %d: %w", cfg.startOffset, err)
		}
	}

	if mem == nil {
		mem = memory.DefaultAllocator
	}

	bldr := NewRecordBuilder(mem, schema)
	defer bldr.Release()

	dec := json.NewDecoder(r)
	if cfg.useNumber {
		dec.UseNumber()
	}

	if !cfg.multiDocument {
		// single document: expect '[' obj, obj, ... ']'
		t, err := dec.Token()
		if err != nil {
			return nil, dec.InputOffset(), err
		}

		if delim, ok := t.(json.Delim); !ok || delim != '[' {
			return nil, dec.InputOffset(), fmt.Errorf("json doc must be an array, found %s", delim)
		}

		for dec.More() {
			if err := dec.Decode(bldr); err != nil {
				return nil, dec.InputOffset(), fmt.Errorf("failed to decode json: %w", err)
			}
		}

		// consume the closing ']'
		if _, err = dec.Token(); err != nil {
			return nil, dec.InputOffset(), fmt.Errorf("failed to decode json: %w", err)
		}

		return bldr.NewRecord(), dec.InputOffset(), nil
	}

	// multiple documents: decode objects until EOF
	for {
		err := dec.Decode(bldr)
		if err != nil {
			if errors.Is(err, io.EOF) {
				break
			}
			return nil, dec.InputOffset(), fmt.Errorf("failed to decode json: %w", err)
		}
	}

	return bldr.NewRecord(), dec.InputOffset(), nil
}
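
// Illustrative usage sketch (not part of the original source): building a
// record batch from a JSON array of row objects. The schema and field names
// here are made up for the example.
//
//	schema := arrow.NewSchema([]arrow.Field{
//		{Name: "id", Type: arrow.PrimitiveTypes.Int64},
//		{Name: "name", Type: arrow.BinaryTypes.String, Nullable: true},
//	}, nil)
//	rec, _, err := array.RecordFromJSON(memory.DefaultAllocator, schema,
//		strings.NewReader(`[{"id": 1, "name": "a"}, {"id": 2, "name": null}]`))
//	if err != nil {
//		// handle error
//	}
//	defer rec.Release()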

// RecordToJSON writes the record batch to w as newline-delimited JSON: one
// JSON object per row, with the column names as keys.
func RecordToJSON(rec arrow.RecordBatch, w io.Writer) error {
	enc := json.NewEncoder(w)

	fields := rec.Schema().Fields()

	cols := make(map[string]interface{})
	for i := 0; int64(i) < rec.NumRows(); i++ {
		for j, c := range rec.Columns() {
			cols[fields[j].Name] = c.GetOneForMarshal(i)
		}
		if err := enc.Encode(cols); err != nil {
			return err
		}
	}
	return nil
}
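
// Illustrative usage sketch (not part of the original source), assuming rec is
// an existing arrow.RecordBatch:
//
//	var buf bytes.Buffer
//	if err := array.RecordToJSON(rec, &buf); err != nil {
//		// handle error
//	}
//	// buf now contains one JSON object per row, separated by newlines.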

// TableFromJSON creates a table from a slice of JSON documents, decoding each
// document into one record batch with RecordFromJSON and combining the batches
// under the given schema.
func TableFromJSON(mem memory.Allocator, sc *arrow.Schema, recJSON []string, opt ...FromJSONOption) (arrow.Table, error) {
	batches := make([]arrow.RecordBatch, len(recJSON))
	for i, batchJSON := range recJSON {
		batch, _, err := RecordFromJSON(mem, sc, strings.NewReader(batchJSON), opt...)
		if err != nil {
			return nil, err
		}
		defer batch.Release()
		batches[i] = batch
	}

	return NewTableFromRecords(sc, batches), nil
}
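
// Illustrative usage sketch (not part of the original source): each string
// becomes one record batch of the resulting table. The schema is assumed to be
// the one from the RecordFromJSON sketch above.
//
//	tbl, err := array.TableFromJSON(memory.DefaultAllocator, schema, []string{
//		`[{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]`,
//		`[{"id": 3, "name": null}]`,
//	})
//	if err != nil {
//		// handle error
//	}
//	defer tbl.Release()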

// GetDictArrayData creates dictionary array data from the memo table, copying
// only the values inserted at or after startOffset.
func GetDictArrayData(mem memory.Allocator, valueType arrow.DataType, memoTable hashing.MemoTable, startOffset int) (*Data, error) {
	dictLen := memoTable.Size() - startOffset
	buffers := []*memory.Buffer{nil, nil}

	buffers[1] = memory.NewResizableBuffer(mem)
	defer buffers[1].Release()

	switch tbl := memoTable.(type) {
	case hashing.NumericMemoTable:
		nbytes := tbl.TypeTraits().BytesRequired(dictLen)
		buffers[1].Resize(nbytes)
		tbl.WriteOutSubset(startOffset, buffers[1].Bytes())
	case *hashing.BinaryMemoTable:
		switch valueType.ID() {
		case arrow.BINARY, arrow.STRING:
			buffers = append(buffers, memory.NewResizableBuffer(mem))
			defer buffers[2].Release()

			// 32-bit offsets followed by the variable-length value bytes
			buffers[1].Resize(arrow.Int32Traits.BytesRequired(dictLen + 1))
			offsets := arrow.Int32Traits.CastFromBytes(buffers[1].Bytes())
			tbl.CopyOffsetsSubset(startOffset, offsets)

			valuesz := offsets[len(offsets)-1] - offsets[0]
			buffers[2].Resize(int(valuesz))
			tbl.CopyValuesSubset(startOffset, buffers[2].Bytes())
		case arrow.LARGE_BINARY, arrow.LARGE_STRING:
			buffers = append(buffers, memory.NewResizableBuffer(mem))
			defer buffers[2].Release()

			// 64-bit offsets followed by the variable-length value bytes
			buffers[1].Resize(arrow.Int64Traits.BytesRequired(dictLen + 1))
			offsets := arrow.Int64Traits.CastFromBytes(buffers[1].Bytes())
			tbl.CopyLargeOffsetsSubset(startOffset, offsets)

			valuesz := offsets[len(offsets)-1] - offsets[0]
			buffers[2].Resize(int(valuesz))
			tbl.CopyValuesSubset(startOffset, buffers[2].Bytes())
		default: // fixed-width values
			bw := int(bitutil.BytesForBits(int64(valueType.(arrow.FixedWidthDataType).BitWidth())))
			buffers[1].Resize(dictLen * bw)
			tbl.CopyFixedWidthValues(startOffset, bw, buffers[1].Bytes())
		}
	default:
		return nil, fmt.Errorf("arrow/array: dictionary unifier unimplemented type: %s", valueType)
	}

	var nullcount int
	if idx, ok := memoTable.GetNull(); ok && idx >= startOffset {
		// build a validity bitmap with a single cleared bit for the null entry
		buffers[0] = memory.NewResizableBuffer(mem)
		defer buffers[0].Release()
		nullcount = 1
		buffers[0].Resize(int(bitutil.BytesForBits(int64(dictLen))))
		memory.Set(buffers[0].Bytes(), 0xFF)
		bitutil.ClearBit(buffers[0].Bytes(), idx)
	}

	return NewData(valueType, dictLen, buffers, nil, nullcount, 0), nil
}

// DictArrayFromJSON creates a dictionary array for the given dictionary type
// by decoding the indices and dictionary values from two separate JSON arrays.
func DictArrayFromJSON(mem memory.Allocator, dt *arrow.DictionaryType, indicesJSON, dictJSON string) (arrow.Array, error) {
	indices, _, err := FromJSON(mem, dt.IndexType, strings.NewReader(indicesJSON))
	if err != nil {
		return nil, err
	}
	defer indices.Release()

	dict, _, err := FromJSON(mem, dt.ValueType, strings.NewReader(dictJSON))
	if err != nil {
		return nil, err
	}
	defer dict.Release()

	return NewDictionaryArray(dt, indices, dict), nil
}
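
// Illustrative usage sketch (not part of the original source): string values
// dictionary-encoded with int16 indices.
//
//	dt := &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int16, ValueType: arrow.BinaryTypes.String}
//	arr, err := array.DictArrayFromJSON(memory.DefaultAllocator, dt, `[0, 1, 0, null]`, `["foo", "bar"]`)
//	if err != nil {
//		// handle error
//	}
//	defer arr.Release()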

// ChunkedFromJSON creates a chunked array of the given data type, decoding one
// chunk from each JSON document in chunkStrs.
func ChunkedFromJSON(mem memory.Allocator, dt arrow.DataType, chunkStrs []string, opts ...FromJSONOption) (*arrow.Chunked, error) {
	chunks := make([]arrow.Array, len(chunkStrs))
	defer func() {
		for _, c := range chunks {
			if c != nil {
				c.Release()
			}
		}
	}()

	var err error
	for i, c := range chunkStrs {
		chunks[i], _, err = FromJSON(mem, dt, strings.NewReader(c), opts...)
		if err != nil {
			return nil, err
		}
	}

	return arrow.NewChunked(dt, chunks), nil
}
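
// Illustrative usage sketch (not part of the original source): two chunks of
// float64 values.
//
//	chunked, err := array.ChunkedFromJSON(memory.DefaultAllocator, arrow.PrimitiveTypes.Float64,
//		[]string{`[1.5, 2.5]`, `[3.5, null]`})
//	if err != nil {
//		// handle error
//	}
//	defer chunked.Release()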

// getMaxBufferLen computes the maximum buffer size, in bytes, needed by any
// single buffer of an all-null array of the given type and length, so that one
// shared buffer can back every buffer of the null array.
func getMaxBufferLen(dt arrow.DataType, length int) int {
	bufferLen := int(bitutil.BytesForBits(int64(length)))

	maxOf := func(bl int) int {
		if bl > bufferLen {
			return bl
		}
		return bufferLen
	}

	switch dt := dt.(type) {
	case *arrow.DictionaryType:
		bufferLen = maxOf(getMaxBufferLen(dt.ValueType, length))
		return maxOf(getMaxBufferLen(dt.IndexType, length))
	case *arrow.FixedSizeBinaryType:
		return maxOf(dt.ByteWidth * length)
	case arrow.FixedWidthDataType:
		return maxOf(int(bitutil.BytesForBits(int64(dt.BitWidth()))) * length)
	case *arrow.StructType:
		for _, f := range dt.Fields() {
			bufferLen = maxOf(getMaxBufferLen(f.Type, length))
		}
		return bufferLen
	case *arrow.SparseUnionType:
		// type-id buffer is 1 byte per element
		bufferLen = maxOf(length)
		for _, f := range dt.Fields() {
			bufferLen = maxOf(getMaxBufferLen(f.Type, length))
		}
		return bufferLen
	case *arrow.DenseUnionType:
		// type-id buffer (1 byte per element) plus int32 offsets buffer;
		// each child of a dense-union null array has only one element
		bufferLen = maxOf(length)
		bufferLen = maxOf(arrow.Int32SizeBytes * length)
		for _, f := range dt.Fields() {
			bufferLen = maxOf(getMaxBufferLen(f.Type, 1))
		}
		return bufferLen
	case arrow.OffsetsDataType:
		return maxOf(dt.OffsetTypeTraits().BytesRequired(length + 1))
	case *arrow.FixedSizeListType:
		return maxOf(getMaxBufferLen(dt.Elem(), int(dt.Len())*length))
	case arrow.ExtensionType:
		return maxOf(getMaxBufferLen(dt.StorageType(), length))
	default:
		panic(fmt.Errorf("arrow/array: arrayofnull not implemented for type %s", dt))
	}
}

// nullArrayFactory builds ArrayData for an array whose elements are all null,
// reusing one zeroed buffer of sufficient size for every buffer of the array.
type nullArrayFactory struct {
	mem memory.Allocator
	dt  arrow.DataType
	len int
	buf *memory.Buffer
}

func (n *nullArrayFactory) create() *Data {
	if n.buf == nil {
		bufLen := getMaxBufferLen(n.dt, n.len)
		n.buf = memory.NewResizableBuffer(n.mem)
		n.buf.Resize(bufLen)
		defer n.buf.Release()
	}

	var (
		dt        = n.dt
		bufs      = []*memory.Buffer{memory.SliceBuffer(n.buf, 0, int(bitutil.BytesForBits(int64(n.len))))}
		childData []arrow.ArrayData
		dictData  arrow.ArrayData
	)
	defer bufs[0].Release()

	if ex, ok := dt.(arrow.ExtensionType); ok {
		dt = ex.StorageType()
	}

	if nf, ok := dt.(arrow.NestedType); ok {
		childData = make([]arrow.ArrayData, nf.NumFields())
	}

	switch dt := dt.(type) {
	case *arrow.NullType:
	case *arrow.DictionaryType:
		bufs = append(bufs, n.buf)
		arr := MakeArrayOfNull(n.mem, dt.ValueType, 0)
		defer arr.Release()
		dictData = arr.Data()
	case arrow.FixedWidthDataType:
		bufs = append(bufs, n.buf)
	case arrow.BinaryDataType:
		bufs = append(bufs, n.buf, n.buf)
	case arrow.OffsetsDataType:
		bufs = append(bufs, n.buf)
		childData[0] = n.createChild(dt, 0, 0)
		defer childData[0].Release()
	case *arrow.FixedSizeListType:
		childData[0] = n.createChild(dt, 0, n.len*int(dt.Len()))
		defer childData[0].Release()
	case *arrow.StructType:
		for i := range dt.Fields() {
			childData[i] = n.createChild(dt, i, n.len)
			defer childData[i].Release()
		}
	case *arrow.RunEndEncodedType:
		// a single run covering the whole length, pointing at one null value
		bldr := NewBuilder(n.mem, dt.RunEnds())
		defer bldr.Release()

		switch b := bldr.(type) {
		case *Int16Builder:
			b.Append(int16(n.len))
		case *Int32Builder:
			b.Append(int32(n.len))
		case *Int64Builder:
			b.Append(int64(n.len))
		}

		childData[0] = bldr.newData()
		defer childData[0].Release()
		childData[1] = n.createChild(dt.Encoded(), 1, 1)
		defer childData[1].Release()
	case arrow.UnionType:
		// unions have no validity bitmap; the first buffer is the type-id buffer
		bufs[0].Release()
		bufs[0] = nil
		bufs = append(bufs, n.buf)
		// if the first type code is non-zero, the zeroed shared buffer cannot
		// serve as the type-id buffer, so allocate and fill a dedicated one
		if dt.TypeCodes()[0] != 0 {
			bufs[1] = memory.NewResizableBuffer(n.mem)
			bufs[1].Resize(n.len)
			defer bufs[1].Release()
			memory.Set(bufs[1].Bytes(), byte(dt.TypeCodes()[0]))
		}

		childLen := n.len
		if dt.Mode() == arrow.DenseMode {
			// dense unions also need an offsets buffer and only one element per child
			bufs = append(bufs, n.buf)
			childLen = 1
		}
		for i := range dt.Fields() {
			childData[i] = n.createChild(dt, i, childLen)
			defer childData[i].Release()
		}
	}

	out := NewData(n.dt, n.len, bufs, childData, n.len, 0)
	if dictData != nil {
		out.SetDictionary(dictData)
	}
	return out
}

func (n *nullArrayFactory) createChild(_ arrow.DataType, i, length int) *Data {
	childFactory := &nullArrayFactory{
		mem: n.mem, dt: n.dt.(arrow.NestedType).Fields()[i].Type,
		len: length, buf: n.buf}
	return childFactory.create()
}

// MakeArrayOfNull creates an array of the given data type in which every
// element is null.
func MakeArrayOfNull(mem memory.Allocator, dt arrow.DataType, length int) arrow.Array {
	if dt.ID() == arrow.NULL {
		return NewNull(length)
	}

	data := (&nullArrayFactory{mem: mem, dt: dt, len: length}).create()
	defer data.Release()
	return MakeFromData(data)
}
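
// Illustrative usage sketch (not part of the original source): a 5-element
// all-null int32 array.
//
//	arr := array.MakeArrayOfNull(memory.DefaultAllocator, arrow.PrimitiveTypes.Int32, 5)
//	defer arr.Release()
//	// arr.Len() == 5 and arr.NullN() == 5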

// stripNulls trims trailing NUL (0x00) bytes from s.
func stripNulls(s string) string {
	return strings.TrimRight(s, "\x00")
}