package ipc
import (
"errors"
"fmt"
"io"
"sync/atomic"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/endian"
"github.com/apache/arrow-go/v18/arrow/internal/debug"
"github.com/apache/arrow-go/v18/arrow/internal/dictutils"
"github.com/apache/arrow-go/v18/arrow/internal/flatbuf"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/internal/utils"
)
// Reader pulls messages from a MessageReader and exposes the record batches
// of an Arrow IPC stream one at a time.
type Reader struct {
	// r is the message source; schema is the schema decoded from it
	// (nil until a schema message has been read successfully).
	r      MessageReader
	schema *arrow.Schema

	// refCount is adjusted by Retain and Release.
	refCount atomic.Int64

	// rec is the batch produced by the latest successful read; err is the
	// error recorded by the reader, reported through Err.
	rec arrow.RecordBatch
	err error

	// memo holds the dictionaries read so far. readInitialDicts is set once
	// the leading dictionary batches have been consumed, and done once the
	// reader has stopped (end of stream or failure).
	memo             dictutils.Memo
	readInitialDicts bool
	done             bool

	// swapEndianness is switched on by readSchema when ensureNativeEndian is
	// set and the stream's schema is not native-endian.
	swapEndianness     bool
	ensureNativeEndian bool

	// expectedSchema, when non-nil, is what the stream's schema is compared
	// against; mem is the allocator handed to the decoding helpers.
	expectedSchema *arrow.Schema
	mem            memory.Allocator
}
// NewReaderFromMessageReader builds a Reader on top of an existing
// MessageReader, applying opts to a fresh configuration.
//
// Unless the configuration disables automatic schema reading, the schema
// message is consumed right away and compared with the configured schema (if
// one was given); a failure there is returned and no Reader is produced.
// A panic raised during construction is recovered and reported through err.
func NewReaderFromMessageReader(r MessageReader, opts ...Option) (reader *Reader, err error) {
	defer func() {
		pErr := recover()
		if pErr == nil {
			return
		}
		err = utils.FormatRecoveredError("arrow/ipc: unknown error while reading", pErr)
	}()

	cfg := newConfig()
	for i := range opts {
		opts[i](cfg)
	}

	// The zero value of the counter is 0; the new reader starts with one
	// reference.
	rr := &Reader{
		r:                  r,
		memo:               dictutils.NewMemo(),
		mem:                cfg.alloc,
		ensureNativeEndian: cfg.ensureNativeEndian,
		expectedSchema:     cfg.schema,
	}
	rr.refCount.Add(1)

	if cfg.noAutoSchema {
		return rr, nil
	}
	if schemaErr := rr.readSchema(cfg.schema); schemaErr != nil {
		return nil, schemaErr
	}
	return rr, nil
}
// NewReader returns a Reader that decodes messages from r, passing opts to
// both the underlying message reader and the Reader itself.
//
// If the Reader cannot be constructed, the message reader created here is
// released before the error is returned, since the caller has no handle to it.
func NewReader(r io.Reader, opts ...Option) (*Reader, error) {
	mr := NewMessageReader(r, opts...)
	rdr, err := NewReaderFromMessageReader(mr, opts...)
	if err != nil {
		// Ownership of mr was never taken over by a Reader, so nothing else
		// will release it.
		mr.Release()
		return nil, err
	}
	return rdr, nil
}
// Err returns the error currently recorded on the reader, or nil if there is
// none.
func (r *Reader) Err() error {
	return r.err
}
// Schema returns the schema of the stream. If no schema has been read yet it
// attempts to read one first; when that attempt fails the error is recorded
// (see Err) and the reader is marked as done.
func (r *Reader) Schema() *arrow.Schema {
	if r.schema != nil {
		return r.schema
	}

	err := r.readSchema(r.expectedSchema)
	if err != nil {
		r.err = fmt.Errorf("arrow/ipc: could not read schema from stream: %w", err)
		r.done = true
	}
	return r.schema
}
// readSchema reads the next message, which must be a schema message, decodes
// it and stores the result in r.schema.
//
// When schema is non-nil the decoded schema must be equal to it, otherwise
// errInconsistentSchema is returned. When native endianness is requested and
// the decoded schema is not native-endian, byte swapping is enabled and the
// stored schema is converted to native endianness.
//
// r.schema is only assigned once every check has passed, so a failed call
// never leaves a rejected schema on the reader.
func (r *Reader) readSchema(schema *arrow.Schema) error {
	msg, err := r.r.Message()
	if err != nil {
		return fmt.Errorf("arrow/ipc: could not read message schema: %w", err)
	}

	if msg.Type() != MessageSchema {
		return fmt.Errorf("arrow/ipc: invalid message type (got=%v, want=%v)", msg.Type(), MessageSchema)
	}

	var schemaFB flatbuf.Schema
	initFB(&schemaFB, msg.msg.Header)

	decoded, err := schemaFromFB(&schemaFB, &r.memo)
	if err != nil {
		return fmt.Errorf("arrow/ipc: could not decode schema from message schema: %w", err)
	}

	// Compare against the schema as it appears on the wire, before any
	// endianness conversion.
	if schema != nil && !schema.Equal(decoded) {
		return errInconsistentSchema
	}

	if r.ensureNativeEndian && !decoded.IsNativeEndian() {
		r.swapEndianness = true
		decoded = decoded.WithEndianness(endian.NativeEndian)
	}

	r.schema = decoded
	return nil
}
// Retain adds one to the reader's reference count.
func (r *Reader) Retain() { r.refCount.Add(1) }
// Release subtracts one from the reader's reference count. When the count
// drops to zero the current record batch and the message reader are released
// (if present) and the dictionary memo is cleared.
func (r *Reader) Release() {
	debug.Assert(r.refCount.Load() > 0, "too many releases")

	if remaining := r.refCount.Add(-1); remaining != 0 {
		return
	}

	if rec := r.rec; rec != nil {
		rec.Release()
		r.rec = nil
	}
	if mr := r.r; mr != nil {
		mr.Release()
		r.r = nil
	}
	r.memo.Clear()
}
// Next advances to the next record batch and reports whether one is
// available. The batch from the previous call is released first. Once an
// error has been recorded or the reader is done, Next returns false without
// reading further.
func (r *Reader) Next() bool {
	if prev := r.rec; prev != nil {
		prev.Release()
		r.rec = nil
	}

	if r.done || r.err != nil {
		return false
	}
	return r.next()
}
// getInitialDicts reads one dictionary batch for every dictionary known to
// the memo's mapper and loads each into the memo. It reports whether all of
// them were read; on success readInitialDicts is set.
//
// On any failure the reader is marked done and false is returned:
//   - end of stream before the first dictionary is not an error (r.err is
//     left nil);
//   - end of stream after some dictionaries were read is reported as an
//     incomplete stream;
//   - a message that is not a dictionary batch is reported as an error and is
//     not passed on to the dictionary decoder;
//   - any other read or decode error is recorded as-is.
func (r *Reader) getInitialDicts() bool {
	var msg *Message

	numDicts := r.memo.Mapper.NumDicts()
	for i := 0; i < numDicts; i++ {
		msg, r.err = r.r.Message()
		if r.err != nil {
			r.done = true
			// errors.Is matches the EOF check used by next().
			if errors.Is(r.err, io.EOF) {
				if i == 0 {
					r.err = nil
				} else {
					r.err = fmt.Errorf("arrow/ipc: IPC stream ended without reading the expected (%d) dictionaries", numDicts)
				}
			}
			return false
		}

		if msg.Type() != MessageDictionaryBatch {
			// Stop here: decoding a non-dictionary message as a dictionary
			// would either fail confusingly or let this error be lost.
			r.done = true
			r.err = fmt.Errorf("arrow/ipc: IPC stream did not have the expected (%d) dictionaries at the start of the stream", numDicts)
			return false
		}

		if _, err := readDictionary(&r.memo, msg.meta, msg.body, r.swapEndianness, r.mem); err != nil {
			r.done = true
			r.err = err
			return false
		}
	}

	r.readInitialDicts = true
	return true
}
// next reads messages until it has produced the next record batch in r.rec
// and reports whether it succeeded.
//
// It reads the schema first if none is present, then the initial
// dictionaries if they have not been read, then loads any dictionary batches
// that precede the next record batch into the memo. End of stream marks the
// reader done without recording an error; every other failure is recorded in
// r.err. A panic raised while decoding is recovered and recorded in r.err.
func (r *Reader) next() bool {
	defer func() {
		if pErr := recover(); pErr != nil {
			r.err = utils.FormatRecoveredError("arrow/ipc: unknown error while reading", pErr)
		}
	}()

	if r.schema == nil {
		if err := r.readSchema(r.expectedSchema); err != nil {
			r.err = fmt.Errorf("arrow/ipc: could not read schema from stream: %w", err)
			r.done = true
			return false
		}
	}

	if !r.readInitialDicts && !r.getInitialDicts() {
		return false
	}

	var msg *Message
	msg, r.err = r.r.Message()

	// Dictionary batches may appear between record batches; consume them
	// until something else (or an error) shows up.
	for msg != nil && msg.Type() == MessageDictionaryBatch {
		if _, r.err = readDictionary(&r.memo, msg.meta, msg.body, r.swapEndianness, r.mem); r.err != nil {
			r.done = true
			return false
		}
		msg, r.err = r.r.Message()
	}

	if r.err != nil {
		r.done = true
		if errors.Is(r.err, io.EOF) {
			// A clean end of stream is not an error.
			r.err = nil
		}
		return false
	}

	if got, want := msg.Type(), MessageRecordBatch; got != want {
		r.err = fmt.Errorf("arrow/ipc: invalid message type (got=%v, want=%v)", got, want)
		// Mark the reader done, as every other terminal error path does.
		r.done = true
		return false
	}

	r.rec = newRecordBatch(r.schema, &r.memo, msg.meta, msg.body, r.swapEndianness, r.mem)
	return true
}
// RecordBatch returns the batch stored by the most recent successful read,
// or nil when there is none. Next, Read and the final Release all release
// this batch.
func (r *Reader) RecordBatch() arrow.RecordBatch {
	current := r.rec
	return current
}
// Record returns the same value as RecordBatch, typed as arrow.Record.
func (r *Reader) Record() arrow.Record {
	rec := r.RecordBatch()
	return rec
}
// Read releases the previously returned batch, then reads and returns the
// next one. When the reader is done and no error was recorded it returns
// io.EOF; otherwise a failed read returns the recorded error.
func (r *Reader) Read() (arrow.RecordBatch, error) {
	if prev := r.rec; prev != nil {
		prev.Release()
		r.rec = nil
	}

	if r.next() {
		return r.rec, nil
	}

	if r.err == nil && r.done {
		return nil, io.EOF
	}
	return nil, r.err
}
// Compile-time assertion that *Reader implements array.RecordReader.
var _ array .RecordReader = (*Reader )(nil )
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PRs and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.