package array
import (
"bytes"
"fmt"
"iter"
"strings"
"sync/atomic"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/internal/debug"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/internal/json"
)
// RecordReader reads a stream of record batches.
//
// The per-method notes describe the contract as the readers in this file
// implement it.
type RecordReader interface {
// Retain increases the reader's reference count by 1.
Retain ()
// Release decreases the reference count by 1; the readers in this file
// release the batches they hold when the count reaches zero.
Release ()
// Schema returns the schema associated with the reader.
Schema () *arrow .Schema
// Next advances to the next batch and reports whether one is available.
Next () bool
// RecordBatch returns the current batch, as selected by the last call to Next.
RecordBatch () arrow .RecordBatch
// Record returns the current batch; both readers in this file simply
// forward to RecordBatch.
Record () arrow .Record
// Err returns the error, if any, encountered while iterating.
Err () error
}
// simpleRecords is a RecordReader over an in-memory slice of record batches.
type simpleRecords struct {
refCount atomic .Int64 // reference count of the reader itself
schema *arrow .Schema // schema every batch is compared against on construction
recs []arrow .RecordBatch // batches not yet handed out by Next
cur arrow .RecordBatch // batch selected by the last successful Next; nil before that
}
// NewRecordReader returns a RecordReader that yields recs in order.
//
// Every batch in recs is retained by the reader. If any batch has a schema
// that is not equal to schema, the references taken here are released again
// and an error is returned.
func NewRecordReader(schema *arrow.Schema, recs []arrow.RecordBatch) (RecordReader, error) {
	rs := &simpleRecords{schema: schema, recs: recs}
	rs.refCount.Add(1)

	// Take a reference on every batch up front so that Release can drop
	// them uniformly, including on the error path below.
	for i := range rs.recs {
		rs.recs[i].Retain()
	}

	for i := range recs {
		if recs[i].Schema().Equal(rs.schema) {
			continue
		}
		rs.Release()
		return nil, fmt.Errorf("arrow/array: mismatch schema")
	}
	return rs, nil
}
// Retain increases the reference count by 1.
func (rs *simpleRecords) Retain() {
	_ = rs.refCount.Add(1) // the updated count is not needed here
}
// Release decreases the reference count by 1.
// When the count reaches zero, the current batch (if any) and every batch
// not yet handed out by Next are released.
func (rs *simpleRecords) Release() {
	debug.Assert(rs.refCount.Load() > 0, "too many releases")

	if remaining := rs.refCount.Add(-1); remaining != 0 {
		return
	}
	if cur := rs.cur; cur != nil {
		cur.Release()
	}
	for i := range rs.recs {
		rs.recs[i].Release()
	}
	rs.recs = nil
}
// Schema returns the schema the reader was created with.
func (rs *simpleRecords) Schema() *arrow.Schema {
	return rs.schema
}
// RecordBatch returns the batch selected by the most recent successful call
// to Next, or nil if Next has not succeeded yet.
func (rs *simpleRecords) RecordBatch() arrow.RecordBatch {
	return rs.cur
}
// Record returns the same batch as RecordBatch.
func (rs *simpleRecords) Record() arrow.Record {
	return rs.RecordBatch()
}
// Next advances the reader to the next batch. It returns false, leaving the
// current batch untouched, once no batches remain.
func (rs *simpleRecords) Next() bool {
	if len(rs.recs) == 0 {
		return false
	}

	// The reader's reference on the previous batch is dropped before
	// moving on to the next one.
	if prev := rs.cur; prev != nil {
		prev.Release()
	}
	rs.cur, rs.recs = rs.recs[0], rs.recs[1:]
	return true
}
// Err always returns nil; this reader never records an error.
func (rs *simpleRecords) Err() error {
	return nil
}
// simpleRecord is the in-memory implementation of arrow.RecordBatch used by
// this file: a schema plus one array per column.
type simpleRecord struct {
refCount atomic .Int64 // reference count of the batch itself
schema *arrow .Schema // supplies the name and type of each column
rows int64 // number of rows reported by NumRows
arrs []arrow .Array // column arrays, each retained by the batch
}
// NewRecordBatch returns a record batch with the given schema and columns.
//
// The cols slice is copied and every column is retained. A negative nrows
// means "infer": the row count is taken from the first column, or is zero
// when there are no columns.
//
// NewRecordBatch panics if the columns do not pass validation against the
// schema and row count; the references taken here are released first.
func NewRecordBatch(schema *arrow.Schema, cols []arrow.Array, nrows int64) arrow.RecordBatch {
	owned := make([]arrow.Array, len(cols))
	copy(owned, cols)
	for i := range owned {
		owned[i].Retain()
	}

	rec := &simpleRecord{schema: schema, rows: nrows, arrs: owned}
	rec.refCount.Add(1)

	if rec.rows < 0 {
		if len(rec.arrs) == 0 {
			rec.rows = 0
		} else {
			rec.rows = int64(rec.arrs[0].Len())
		}
	}

	if err := rec.validate(); err != nil {
		rec.Release()
		panic(err)
	}
	return rec
}
// NewRecord forwards to NewRecordBatch with the same arguments and returns
// its result as an arrow.Record.
func NewRecord(schema *arrow.Schema, cols []arrow.Array, nrows int64) arrow.Record {
	batch := NewRecordBatch(schema, cols, nrows)
	return batch
}
// SetColumn returns a new record batch equal to rec except that column i is
// replaced by arr. The receiver is not modified.
//
// An error is returned if i is out of range, if arr does not have exactly
// as many elements as the batch has rows, or if arr's data type differs from
// the type of schema field i.
func (rec *simpleRecord) SetColumn(i int, arr arrow.Array) (arrow.RecordBatch, error) {
	ncols := len(rec.arrs)
	if !(0 <= i && i < ncols) {
		return nil, fmt.Errorf("arrow/array: column index out of range [0, %d): got=%d", ncols, i)
	}

	field := rec.schema.Field(i)
	switch {
	case arr.Len() != int(rec.rows):
		return nil, fmt.Errorf("arrow/array: mismatch number of rows in column %q: got=%d, want=%d",
			field.Name,
			arr.Len(), rec.rows,
		)
	case !arrow.TypeEqual(field.Type, arr.DataType()):
		return nil, fmt.Errorf("arrow/array: column %q type mismatch: got=%v, want=%v",
			field.Name,
			arr.DataType(), field.Type,
		)
	}

	// Work on a copy of the column list so the receiver stays untouched.
	replaced := make([]arrow.Array, ncols)
	copy(replaced, rec.arrs)
	replaced[i] = arr
	return NewRecordBatch(rec.schema, replaced, rec.rows), nil
}
// validate checks that the columns are consistent with the schema and the
// row count.
//
// A batch with zero rows and zero columns is always accepted. Otherwise the
// number of columns must equal the number of schema fields, every column
// must hold at least rec.rows elements, and every column's data type must
// equal the type of the corresponding field.
func (rec *simpleRecord) validate() error {
	ncols := len(rec.arrs)
	if ncols == 0 && rec.rows == 0 {
		return nil
	}
	if rec.schema.NumFields() != ncols {
		return fmt.Errorf("arrow/array: number of columns/fields mismatch")
	}

	for i := 0; i < ncols; i++ {
		col, field := rec.arrs[i], rec.schema.Field(i)
		switch {
		case int64(col.Len()) < rec.rows:
			return fmt.Errorf("arrow/array: mismatch number of rows in column %q: got=%d, want=%d",
				field.Name,
				col.Len(), rec.rows,
			)
		case !arrow.TypeEqual(field.Type, col.DataType()):
			return fmt.Errorf("arrow/array: column %q type mismatch: got=%v, want=%v",
				field.Name,
				col.DataType(), field.Type,
			)
		}
	}
	return nil
}
// Retain increases the reference count by 1.
func (rec *simpleRecord) Retain() {
	_ = rec.refCount.Add(1) // the updated count is not needed here
}
// Release decreases the reference count by 1.
// When the count reaches zero, every column array is released and the
// column list is cleared.
func (rec *simpleRecord) Release() {
	debug.Assert(rec.refCount.Load() > 0, "too many releases")

	if rec.refCount.Add(-1) != 0 {
		return
	}
	for i := range rec.arrs {
		rec.arrs[i].Release()
	}
	rec.arrs = nil
}
// Schema returns the schema of the batch.
func (rec *simpleRecord) Schema() *arrow.Schema {
	return rec.schema
}
// NumRows returns the number of rows in the batch.
func (rec *simpleRecord) NumRows() int64 {
	return rec.rows
}
// NumCols returns the number of columns held by the batch.
func (rec *simpleRecord) NumCols() int64 {
	n := len(rec.arrs)
	return int64(n)
}
// Columns returns the batch's column arrays. The returned slice is the
// batch's own slice, not a copy.
func (rec *simpleRecord) Columns() []arrow.Array {
	return rec.arrs
}
// Column returns the i-th column array. It panics if i is out of range.
func (rec *simpleRecord) Column(i int) arrow.Array {
	col := rec.arrs[i]
	return col
}
// ColumnName returns the name of the i-th field of the batch's schema.
func (rec *simpleRecord) ColumnName(i int) string {
	field := rec.schema.Field(i)
	return field.Name
}
// NewSlice returns a new record batch whose columns are the [i, j) slices of
// this batch's columns, with j-i rows.
func (rec *simpleRecord) NewSlice(i, j int64) arrow.RecordBatch {
	sliced := make([]arrow.Array, len(rec.arrs))
	for idx := range rec.arrs {
		sliced[idx] = NewSlice(rec.arrs[idx], i, j)
	}

	// NewRecordBatch retains each column it is given, so the references
	// created by slicing are dropped once the new batch has been built.
	defer func() {
		for idx := range sliced {
			sliced[idx].Release()
		}
	}()

	return NewRecordBatch(rec.schema, sliced, j-i)
}
// String returns a multi-line, human-readable dump of the batch: the schema,
// the row count, and one line per column with its index, name and contents.
func (rec *simpleRecord) String() string {
	var sb strings.Builder

	fmt.Fprintf(&sb, "record:\n %v\n", rec.schema)
	fmt.Fprintf(&sb, " rows: %d\n", rec.rows)
	for idx := range rec.arrs {
		name := rec.schema.Field(idx).Name
		fmt.Fprintf(&sb, " col[%d][%s]: %v\n", idx, name, rec.arrs[idx])
	}
	return sb.String()
}
// MarshalJSON encodes the batch as JSON by converting it to a struct array
// with RecordToStructArray and marshalling that array. The temporary array
// is released before returning.
func (rec *simpleRecord) MarshalJSON() ([]byte, error) {
	structArr := RecordToStructArray(rec)
	defer structArr.Release()

	encoded, err := structArr.MarshalJSON()
	return encoded, err
}
// RecordBuilder builds record batches column by column, using one Builder
// per schema field.
type RecordBuilder struct {
refCount atomic .Int64 // reference count of the builder itself
mem memory .Allocator // allocator handed to every field builder
schema *arrow .Schema // schema of the batches to build
fields []Builder // one builder per schema field, in schema order
}
// NewRecordBuilder returns a builder for the given schema. One field builder
// is created per schema field, each using mem as its allocator.
func NewRecordBuilder(mem memory.Allocator, schema *arrow.Schema) *RecordBuilder {
	builders := make([]Builder, schema.NumFields())
	for i := range builders {
		builders[i] = NewBuilder(mem, schema.Field(i).Type)
	}

	b := &RecordBuilder{mem: mem, schema: schema, fields: builders}
	b.refCount.Add(1)
	return b
}
// Retain increases the reference count by 1.
func (b *RecordBuilder) Retain() {
	_ = b.refCount.Add(1) // the updated count is not needed here
}
// Release decreases the reference count by 1.
// When the count reaches zero, every field builder is released and the
// builder list is cleared.
func (b *RecordBuilder) Release() {
	debug.Assert(b.refCount.Load() > 0, "too many releases")

	if b.refCount.Add(-1) != 0 {
		return
	}
	for i := range b.fields {
		b.fields[i].Release()
	}
	b.fields = nil
}
// Schema returns the schema the builder was created with.
func (b *RecordBuilder) Schema() *arrow.Schema {
	return b.schema
}
// Fields returns the field builders. The returned slice is the builder's
// own slice, not a copy.
func (b *RecordBuilder) Fields() []Builder {
	return b.fields
}
// Field returns the builder for the i-th field. It panics if i is out of
// range.
func (b *RecordBuilder) Field(i int) Builder {
	fb := b.fields[i]
	return fb
}
// Reserve calls Reserve(size) on every field builder.
func (b *RecordBuilder) Reserve(size int) {
	for i := range b.fields {
		b.fields[i].Reserve(size)
	}
}
// NewRecordBatch builds one array from each field builder and assembles the
// arrays into a new record batch.
//
// It panics if the field builders do not all produce the same number of
// rows. The arrays built here are released before returning, in both the
// normal and the panicking case; the returned batch holds its own
// references.
func (b *RecordBuilder) NewRecordBatch() arrow.RecordBatch {
	cols := make([]arrow.Array, len(b.fields))

	// Entries are still nil for builders that were never reached when a
	// panic cuts the loop short, hence the nil check.
	defer func() {
		for _, col := range cols {
			if col != nil {
				col.Release()
			}
		}
	}()

	var rows int64
	for i, fb := range b.fields {
		col := fb.NewArray()
		cols[i] = col

		n := int64(col.Len())
		if i > 0 && n != rows {
			panic(fmt.Errorf("arrow/array: field %d has %d rows. want=%d", i, n, rows))
		}
		rows = n
	}

	return NewRecordBatch(b.schema, cols, rows)
}
// NewRecord forwards to NewRecordBatch and returns its result as an
// arrow.Record.
func (b *RecordBuilder) NewRecord() arrow.Record {
	batch := b.NewRecordBatch()
	return batch
}
// UnmarshalJSON decodes a single JSON object (one row) from data and appends
// one value to each field builder.
//
// Object keys are matched against schema field names:
//   - a key with no matching field is decoded and discarded;
//   - a key that appears twice in the object is an error;
//   - when several schema fields share a name, only the first of them
//     receives the value;
//   - every field that did not receive a value gets a null appended, so all
//     field builders grow by exactly one element on success.
//
// NOTE(review): when an error is returned part-way through the object, the
// field builders already visited keep the value appended to them, so the
// builders may be left with unequal lengths.
func (b *RecordBuilder) UnmarshalJSON(data []byte) error {
	dec := json.NewDecoder(bytes.NewReader(data))

	t, err := dec.Token()
	if err != nil {
		return err
	}
	if delim, ok := t.(json.Delim); !ok || delim != '{' {
		// %v, not %s: the offending token may be a number, bool or nil,
		// which %s would render as "%!s(...)".
		return fmt.Errorf("record should start with '{', not %v", t)
	}

	nfields := b.schema.NumFields()
	keylist := make(map[string]bool) // keys seen so far, for duplicate detection
	filled := make([]bool, nfields)  // fields that received a value from the object

	for dec.More() {
		keyTok, err := dec.Token()
		if err != nil {
			return err
		}
		key, ok := keyTok.(string)
		if !ok {
			// Report a non-string key token instead of panicking on it.
			return fmt.Errorf("record key should be a string, not %v", keyTok)
		}
		if keylist[key] {
			return fmt.Errorf("key %s shows up twice in row to be decoded", key)
		}
		keylist[key] = true

		indices := b.schema.FieldIndices(key)
		if len(indices) == 0 {
			// Unknown key: consume its value so decoding can continue.
			var extra any
			if err := dec.Decode(&extra); err != nil {
				return err
			}
			continue
		}

		idx := indices[0]
		if err := b.fields[idx].UnmarshalOne(dec); err != nil {
			return err
		}
		filled[idx] = true
	}

	// Pad by field index rather than by field name: with duplicate field
	// names, only the first same-named field was filled above, and the
	// remaining ones still need a null to keep all columns the same length.
	for i := 0; i < nfields; i++ {
		if !filled[i] {
			b.fields[i].AppendNull()
		}
	}
	return nil
}
// iterReader adapts a pull-style iterator of (batch, error) pairs to the
// RecordReader interface.
type iterReader struct {
refCount atomic .Int64 // reference count of the reader itself
schema *arrow .Schema // schema reported by Schema
cur arrow .RecordBatch // batch returned by the last call to next
next func () (arrow .RecordBatch , error , bool ) // pull function obtained from iter.Pull2
stop func () // stop function obtained from iter.Pull2
err error // error returned by the last call to next
}
// Schema returns the schema the reader was created with, or nil once the
// reader has been fully released.
func (ir *iterReader) Schema() *arrow.Schema {
	return ir.schema
}
// Retain increases the reference count by 1.
func (ir *iterReader) Retain() {
	ir.refCount.Add(1)
}
// Release decreases the reference count by 1.
// When the count reaches zero, the underlying iterator is stopped, the
// schema and pull function are cleared, and the current batch (if any) is
// released.
func (ir *iterReader) Release() {
	debug.Assert(ir.refCount.Load() > 0, "too many releases")

	if ir.refCount.Add(-1) != 0 {
		return
	}

	ir.stop()
	ir.schema = nil
	ir.next = nil
	if cur := ir.cur; cur != nil {
		cur.Release()
	}
}
// RecordBatch returns the batch obtained by the most recent call to Next.
func (ir *iterReader) RecordBatch() arrow.RecordBatch {
	return ir.cur
}
// Record returns the same batch as RecordBatch.
func (ir *iterReader) Record() arrow.Record {
	return ir.RecordBatch()
}
// Err returns the error produced by the most recent call to Next, if any.
func (ir *iterReader) Err() error {
	return ir.err
}
// Next releases the current batch (if any) and pulls the next
// (batch, error) pair from the underlying iterator.
//
// If the pair carries an error, the iterator is stopped, the error becomes
// available through Err, and Next returns false. Otherwise Next returns
// whatever the pull function reported as its "ok" result.
func (ir *iterReader) Next() bool {
	if prev := ir.cur; prev != nil {
		prev.Release()
	}

	rec, err, more := ir.next()
	ir.cur, ir.err = rec, err
	if err == nil {
		return more
	}

	ir.stop()
	return false
}
// ReaderFromIter wraps itr in a RecordReader that reports the given schema.
// The iterator is converted to pull style with iter.Pull2; it is stopped
// when the reader hits an error or is fully released.
func ReaderFromIter(schema *arrow.Schema, itr iter.Seq2[arrow.RecordBatch, error]) RecordReader {
	rdr := &iterReader{schema: schema}
	rdr.next, rdr.stop = iter.Pull2(itr)
	rdr.refCount.Add(1)
	return rdr
}
// IterFromReader returns an iterator over the batches produced by rdr.
//
// rdr is retained when IterFromReader is called and released when a run of
// the returned iterator finishes. Each batch is yielded with a nil error;
// if the reader reports an error after its last batch, one final
// (nil, err) pair is yielded.
func IterFromReader(rdr RecordReader) iter.Seq2[arrow.RecordBatch, error] {
	rdr.Retain()

	return func(yield func(arrow.RecordBatch, error) bool) {
		defer rdr.Release()

		for {
			if !rdr.Next() {
				break
			}
			if keepGoing := yield(rdr.RecordBatch(), nil); !keepGoing {
				return
			}
		}

		if rdr.Err() == nil {
			return
		}
		yield(nil, rdr.Err())
	}
}
// Compile-time assertions that the concrete types in this file satisfy the
// interfaces they are returned as.
var (
_ arrow .RecordBatch = (*simpleRecord )(nil )
_ RecordReader = (*simpleRecords )(nil )
)
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PRs and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.