package parquet
import (
"errors"
"fmt"
"io"
"reflect"
)
// GenericReader reads rows from a parquet file or row group and reconstructs
// them into Go values of type T.
type GenericReader[T any] struct {
	// base is the untyped reader that produces the rows reconstructed into T.
	base Reader
	// read is the implementation behind Read, chosen once by readFuncOf.
	read readFunc[T]
}

// NewGenericReader returns a GenericReader[T] reading from input.
//
// input is used directly when it is a *File; otherwise it is opened as a
// parquet file, which requires its size to be discoverable (see sizeOf).
// Unless a schema is supplied through options, the schema is derived from T,
// or taken from the file when typeOf[T]() yields no type (presumably for
// interface type arguments such as any — confirm in typeOf).
//
// NewGenericReader panics if the options are invalid, the input cannot be
// opened, the file cannot be converted to the schema, or T is a type that
// readFuncOf does not support.
func NewGenericReader[T any](input io.ReaderAt, options ...ReaderOption) *GenericReader[T] {
	c, err := NewReaderConfig(options...)
	if err != nil {
		panic(err)
	}
	f, err := openFile(input)
	if err != nil {
		panic(err)
	}
	return makeGenericReader[T](c.Schema, fileRowGroupOf(f), f.schema)
}

// NewGenericRowGroupReader returns a GenericReader[T] reading the rows of
// rowGroup. Schema selection and panics follow the same rules as
// NewGenericReader, with rowGroup's schema standing in for the file schema.
func NewGenericRowGroupReader[T any](rowGroup RowGroup, options ...ReaderOption) *GenericReader[T] {
	c, err := NewReaderConfig(options...)
	if err != nil {
		panic(err)
	}
	return makeGenericReader[T](c.Schema, rowGroup, rowGroup.Schema())
}

// makeGenericReader holds the construction logic shared by the public
// constructors.
//
// schema is the schema requested through options (nil when none was given).
// sourceSchema is the schema that schema is compared against. When they
// differ, rowGroup is wrapped in a conversion to schema, and
// convertRowGroupTo panics if the conversion is impossible.
func makeGenericReader[T any](schema *Schema, rowGroup RowGroup, sourceSchema *Schema) *GenericReader[T] {
	t := typeOf[T]()
	if schema == nil {
		if t == nil {
			schema = rowGroup.Schema()
		} else {
			schema = schemaOf(dereference(t))
		}
	}

	r := &GenericReader[T]{
		base: Reader{
			file: reader{
				schema:   schema,
				rowGroup: rowGroup,
			},
		},
	}

	if !nodesAreEqual(schema, sourceSchema) {
		r.base.file.rowGroup = convertRowGroupTo(r.base.file.rowGroup, schema)
	}

	r.base.read.init(r.base.file.schema, r.base.file.rowGroup)
	r.read = readFuncOf[T](t, r.base.file.schema)
	return r
}
// Reset resets the underlying Reader (see Reader.Reset).
func (r *GenericReader[T]) Reset() { r.base.Reset() }

// Read fills rows with up to len(rows) values using the read function that
// readFuncOf selected for T, and reports how many values were populated.
func (r *GenericReader[T]) Read(rows []T) (int, error) { return r.read(r, rows) }

// ReadRows reads untyped rows through the underlying Reader.
func (r *GenericReader[T]) ReadRows(rows []Row) (int, error) { return r.base.ReadRows(rows) }

// Schema returns the schema of the rows produced by the reader.
func (r *GenericReader[T]) Schema() *Schema { return r.base.Schema() }

// NumRows returns the row count reported by the underlying Reader.
func (r *GenericReader[T]) NumRows() int64 { return r.base.NumRows() }

// SeekToRow positions the underlying Reader at rowIndex.
func (r *GenericReader[T]) SeekToRow(rowIndex int64) error { return r.base.SeekToRow(rowIndex) }

// Close closes the underlying Reader.
func (r *GenericReader[T]) Close() error { return r.base.Close() }
// readRows reads up to len(rows) rows and reconstructs each one into the
// corresponding element of rows.
//
// A single ReadRows call may return fewer rows than requested, for example
// when it stops at a page boundary. The loop therefore keeps asking for the
// remainder until the request is satisfied, nothing more is returned, or an
// error occurs. Each request is capped at the remaining count, so the reused
// row buffer never yields more rows than the caller asked for.
//
// It returns the number of elements populated. When reconstruction fails,
// the count covers only the rows successfully reconstructed before the
// failure.
func (r *GenericReader[T]) readRows(rows []T) (int, error) {
	want := len(rows)
	if cap(r.base.rowbuf) >= want {
		r.base.rowbuf = r.base.rowbuf[:want]
	} else {
		r.base.rowbuf = make([]Row, want)
	}

	// The schema field is not modified by ReadRows, so it is fetched once.
	schema := r.base.Schema()
	done := 0
	for {
		got, err := r.base.ReadRows(r.base.rowbuf[:want-done])
		for i, row := range r.base.rowbuf[:got] {
			if rerr := schema.Reconstruct(&rows[done+i], row); rerr != nil {
				return done + i, rerr
			}
		}
		done += got
		if got == 0 || done == want || err != nil {
			return done, err
		}
	}
}
// Compile-time checks that GenericReader implements Rows and
// RowReaderWithSchema for a representative set of type arguments (interface,
// struct, and map). The *Reader assertions live next to the Reader type.
var (
	_ Rows                = (*GenericReader[any])(nil)
	_ RowReaderWithSchema = (*GenericReader[any])(nil)

	_ Rows                = (*GenericReader[struct{}])(nil)
	_ RowReaderWithSchema = (*GenericReader[struct{}])(nil)

	_ Rows                = (*GenericReader[map[struct{}]struct{}])(nil)
	_ RowReaderWithSchema = (*GenericReader[map[struct{}]struct{}])(nil)
)
// readFunc is the signature of the function that GenericReader.Read delegates to.
type readFunc[T any] func(*GenericReader[T], []T) (int, error)

// readFuncOf returns the read implementation for values of type t.
//
// A nil t, interface, map, and struct kinds, and pointers to structs are all
// served by readRows. Any other type causes a panic. The schema parameter is
// currently unused.
func readFuncOf[T any](t reflect.Type, schema *Schema) readFunc[T] {
	ok := t == nil
	if !ok {
		switch t.Kind() {
		case reflect.Interface, reflect.Map, reflect.Struct:
			ok = true
		case reflect.Pointer:
			ok = t.Elem().Kind() == reflect.Struct
		}
	}
	if !ok {
		panic("cannot create reader for values of type " + t.String())
	}
	return (*GenericReader[T]).readRows
}
// Reader reads rows from a parquet file or row group.
type Reader struct {
	// seen is the Go struct type most recently passed to Read. The schema of
	// read was built for it by updateReadSchema.
	seen reflect.Type
	// file backs ReadRows and SeekToRow. read backs Read and may carry a
	// schema converted for seen.
	file, read reader
	// rowIndex is the index of the next row to return.
	rowIndex int64
	// rowbuf is a scratch row buffer reused across calls.
	rowbuf []Row
}
// NewReader returns a Reader for the parquet file read from input.
//
// input is used directly when it is a *File and is opened otherwise. When a
// schema is supplied through options, rows are converted to it. Otherwise
// the file schema is used. NewReader panics on invalid options, on failure
// to open the input, or when the requested schema cannot be converted from
// the file schema.
func NewReader(input io.ReaderAt, options ...ReaderOption) *Reader {
	config, err := NewReaderConfig(options...)
	if err != nil {
		panic(err)
	}

	file, err := openFile(input)
	if err != nil {
		panic(err)
	}

	schema, rowGroup := file.schema, fileRowGroupOf(file)
	if config.Schema != nil {
		schema = config.Schema
		rowGroup = convertRowGroupTo(rowGroup, config.Schema)
	}

	r := &Reader{file: reader{schema: schema, rowGroup: rowGroup}}
	r.read.init(r.file.schema, r.file.rowGroup)
	return r
}
// openFile returns input itself when it is a non-nil *File. Otherwise it
// opens input as a parquet file, sized by sizeOf.
func openFile(input io.ReaderAt) (*File, error) {
	if file, ok := input.(*File); ok && file != nil {
		return file, nil
	}
	size, err := sizeOf(input)
	if err != nil {
		return nil, err
	}
	return OpenFile(input, size)
}
// fileRowGroupOf presents all row groups of f as a single RowGroup.
//
// A file with exactly one row group returns it unchanged. A file with none
// gets an empty row group carrying the file schema. Several row groups are
// combined with newMultiRowGroup using the file's configured read mode.
func fileRowGroupOf(f *File) RowGroup {
	groups := f.RowGroups()
	if len(groups) == 1 {
		return groups[0]
	}
	if len(groups) == 0 {
		return newEmptyRowGroup(f.Schema())
	}
	return newMultiRowGroup(f.config.ReadMode, groups...)
}
// NewRowGroupReader returns a Reader over the rows of rowGroup.
//
// When a schema is supplied through options, rowGroup is converted to it.
// NewRowGroupReader panics on invalid options or an impossible conversion.
func NewRowGroupReader(rowGroup RowGroup, options ...ReaderOption) *Reader {
	config, err := NewReaderConfig(options...)
	if err != nil {
		panic(err)
	}

	if schema := config.Schema; schema != nil {
		rowGroup = convertRowGroupTo(rowGroup, schema)
	}

	r := new(Reader)
	r.file.schema = rowGroup.Schema()
	r.file.rowGroup = rowGroup
	r.read.init(r.file.schema, r.file.rowGroup)
	return r
}
// convertRowGroupTo returns rowGroup viewed through schema. The row group is
// returned unchanged when its schema already equals schema. It panics when
// no conversion between the two schemas exists.
func convertRowGroupTo(rowGroup RowGroup, schema *Schema) RowGroup {
	source := rowGroup.Schema()
	if nodesAreEqual(schema, source) {
		return rowGroup
	}
	conv, err := Convert(schema, source)
	if err != nil {
		panic(err)
	}
	return ConvertRowGroup(rowGroup, conv)
}
// sizeOf returns the total size in bytes of r.
//
// Values that expose a Size() int64 method report it directly. Otherwise,
// an io.Seeker is measured by seeking to its end and then restoring the
// original offset. Any other type yields an error. Whenever an error is
// returned, the size is 0.
func sizeOf(r io.ReaderAt) (int64, error) {
	switch f := r.(type) {
	case interface{ Size() int64 }:
		return f.Size(), nil
	case io.Seeker:
		off, err := f.Seek(0, io.SeekCurrent)
		if err != nil {
			return 0, err
		}
		end, err := f.Seek(0, io.SeekEnd)
		if err != nil {
			return 0, err
		}
		// If the caller's offset cannot be restored, the seeker is left in an
		// unexpected position. Report that failure instead of a size.
		if _, err := f.Seek(off, io.SeekStart); err != nil {
			return 0, err
		}
		return end, nil
	default:
		return 0, fmt.Errorf("cannot determine length of %T", r)
	}
}
// Reset rewinds the reader's row index to zero, resets both internal
// readers, and clears the scratch row buffer.
func (r *Reader) Reset() {
	r.rowIndex = 0
	r.file.Reset()
	r.read.Reset()
	clearRows(r.rowbuf)
}
// Read reads the next row and reconstructs it into row. row is handed to
// Schema.Reconstruct, so it presumably needs to be a pointer to the target
// value (confirm against Reconstruct).
//
// If the dereferenced type of row is a struct type different from the one
// seen on the previous call, the read schema is rebuilt for it first. Read
// returns io.EOF after the reader has been closed. Otherwise, when no row
// can be read, it returns whatever error the underlying rows reported.
func (r *Reader) Read(row interface{}) error {
	rowType := dereference(reflect.TypeOf(row))
	if rowType.Kind() == reflect.Struct && rowType != r.seen {
		if err := r.updateReadSchema(rowType); err != nil {
			return fmt.Errorf("cannot read parquet row into go value of type %T: %w", row, err)
		}
	}

	if err := r.read.SeekToRow(r.rowIndex); err != nil {
		// reader.SeekToRow reports io.ErrClosedPipe once the reader is closed.
		if errors.Is(err, io.ErrClosedPipe) {
			return io.EOF
		}
		return fmt.Errorf("seeking reader to row %d: %w", r.rowIndex, err)
	}

	// Reuse a one-element buffer across calls instead of allocating each time.
	if cap(r.rowbuf) > 0 {
		r.rowbuf = r.rowbuf[:1]
	} else {
		r.rowbuf = make([]Row, 1)
	}

	n, err := r.read.ReadRows(r.rowbuf)
	if n == 0 {
		return err
	}
	r.rowIndex++
	return r.read.schema.Reconstruct(row, r.rowbuf[0])
}
// updateReadSchema rebuilds the Read-side reader for the struct type rowType.
//
// The file row group is used as-is when rowType's schema matches the file
// schema. Otherwise it is wrapped in a conversion, and an error is returned
// if no conversion exists. On success, rowType is remembered in r.seen.
func (r *Reader) updateReadSchema(rowType reflect.Type) error {
	schema := schemaOf(rowType)
	rowGroup := r.file.rowGroup
	if !nodesAreEqual(schema, r.file.schema) {
		conv, err := Convert(schema, r.file.schema)
		if err != nil {
			return err
		}
		rowGroup = ConvertRowGroup(rowGroup, conv)
	}
	r.read.init(schema, rowGroup)
	r.seen = rowType
	return nil
}
// ReadRows reads up to len(rows) rows through the file-side reader, starting
// at the reader's current row index. It advances that index by the number
// of rows read.
func (r *Reader) ReadRows(rows []Row) (n int, err error) {
	if err = r.file.SeekToRow(r.rowIndex); err != nil {
		return 0, err
	}
	n, err = r.file.ReadRows(rows)
	r.rowIndex += int64(n)
	return n, err
}

// Schema returns the schema used by ReadRows.
func (r *Reader) Schema() *Schema {
	return r.file.schema
}

// NumRows returns the number of rows in the file-side row group.
func (r *Reader) NumRows() int64 {
	return r.file.rowGroup.NumRows()
}

// SeekToRow positions the reader at rowIndex. The reader's row index is
// updated only when the seek succeeds.
func (r *Reader) SeekToRow(rowIndex int64) error {
	err := r.file.SeekToRow(rowIndex)
	if err == nil {
		r.rowIndex = rowIndex
	}
	return err
}
// Close closes both internal readers.
//
// Both readers are always closed. Previously, a failure closing the
// Read-side reader skipped closing the file-side one and leaked its rows.
// The first error encountered is returned.
func (r *Reader) Close() error {
	readErr := r.read.Close()
	fileErr := r.file.Close()
	if readErr != nil {
		return readErr
	}
	return fileErr
}
// reader tracks a read position within a row group and lazily opens the
// Rows used to read from it.
type reader struct {
	// schema is the schema associated with rows read from rowGroup.
	schema *Schema
	// rowGroup is the source of rows; Close sets it to nil, after which
	// ReadRows returns io.EOF and SeekToRow returns io.ErrClosedPipe.
	rowGroup RowGroup
	// rows is opened on first ReadRows and is nil until then.
	rows Rows
	// rowIndex is the index of the next row to read.
	rowIndex int64
}
// init points the reader at rowGroup, associates it with schema, and rewinds
// it to row 0.
//
// Rows opened on a previous row group are closed and discarded rather than
// reset. Reset would rewind them in place, and later reads would keep
// returning rows of the old row group instead of rowGroup. This happens when
// updateReadSchema re-initializes the reader with a converted row group. The
// next ReadRows opens fresh Rows on rowGroup.
func (r *reader) init(schema *Schema, rowGroup RowGroup) {
	if r.rows != nil {
		// Best effort, consistent with Reset: init has no error to return.
		_ = r.rows.Close()
		r.rows = nil
	}
	r.schema = schema
	r.rowGroup = rowGroup
	r.Reset()
}
// Reset rewinds the reader to row 0.
//
// Open rows that implement Reset are rewound in place so they can be reused.
// Rows without it are closed (best effort, error ignored) and dropped, so
// that the next ReadRows reopens them.
func (r *reader) Reset() {
	r.rowIndex = 0
	switch rows := r.rows.(type) {
	case nil:
		// Nothing has been opened yet.
	case interface{ Reset() }:
		rows.Reset()
	default:
		_ = rows.Close()
		r.rows = nil
	}
}
// ReadRows reads up to len(rows) rows from the row group and advances the
// row index by the number read.
//
// Rows are opened on first use and positioned at the current row index when
// that index is non-zero. After Close, ReadRows returns io.EOF.
func (r *reader) ReadRows(rows []Row) (int, error) {
	if r.rowGroup == nil {
		return 0, io.EOF
	}
	if r.rows == nil {
		r.rows = r.rowGroup.Rows()
		if start := r.rowIndex; start > 0 {
			if err := r.rows.SeekToRow(start); err != nil {
				return 0, err
			}
		}
	}
	count, err := r.rows.ReadRows(rows)
	r.rowIndex += int64(count)
	return count, err
}
// SeekToRow moves the reader to rowIndex.
//
// The seek is forwarded to already-open rows. When rows are not open yet,
// only the index is recorded and ReadRows applies it when it opens them.
// After Close, SeekToRow returns io.ErrClosedPipe.
func (r *reader) SeekToRow(rowIndex int64) error {
	switch {
	case r.rowGroup == nil:
		return io.ErrClosedPipe
	case rowIndex == r.rowIndex:
		return nil
	}
	if r.rows != nil {
		if err := r.rows.SeekToRow(rowIndex); err != nil {
			return err
		}
	}
	r.rowIndex = rowIndex
	return nil
}
// Close detaches the reader from its row group and closes any open rows.
//
// The rows are dropped after being closed, which makes Close idempotent.
// Without this, a second Close would close the same Rows again, and a later
// Reset would try to rewind rows that are already closed.
func (r *reader) Close() error {
	r.rowGroup = nil
	if r.rows == nil {
		return nil
	}
	err := r.rows.Close()
	r.rows = nil
	return err
}
var (
_ Rows = (*Reader )(nil )
_ RowReaderWithSchema = (*Reader )(nil )
_ RowReader = (*reader )(nil )
_ RowSeeker = (*reader )(nil )
)
The pages are generated with Golds v0.8.4 (GOOS=linux GOARCH=amd64).
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.