package parquet
import (
"errors"
"fmt"
"io"
"reflect"
)
const (
// defaultRowBufferSize is the number of rows allocated for the intermediate
// buffer used by copyRows when the caller does not supply one.
defaultRowBufferSize = 42
)
// Row is a flat sequence of values making up one row.
//
// As built by AppendRow and walked by Range, the values of a column are stored
// contiguously, columns appear in increasing order, and each value carries the
// bitwise complement of its column position in its columnIndex field.
type Row []Value
// MakeRow builds a new Row holding the values of the given columns, in order.
//
// It is equivalent to calling AppendRow with a nil destination row, and it
// panics under the same conditions as AppendRow.
func MakeRow(columns ...[]Value) Row {
	var row Row
	return AppendRow(row, columns...)
}
// AppendRow appends the values of columns to row and returns the extended row.
//
// The i-th argument must only contain values whose columnIndex field is the
// bitwise complement of i; the function panics on the first value that does
// not satisfy this. All columns are validated before anything is appended.
//
// When row lacks the spare capacity to hold every new value, it is copied once
// into a buffer of exactly the required size.
func AppendRow(row Row, columns ...[]Value) Row {
	total := 0

	for index := range columns {
		want := ^int16(index)
		total += len(columns[index])

		for i := range columns[index] {
			if v := columns[index][i]; v.columnIndex != want {
				panic(fmt.Sprintf("value of column %d has column index %d", index, v.Column()))
			}
		}
	}

	if cap(row)-len(row) < total {
		grown := make(Row, len(row), len(row)+total)
		copy(grown, row)
		row = grown
	}

	return appendRow(row, columns)
}
// appendRow appends the values of every column to row, in column order,
// without validating column indexes.
func appendRow(row Row, columns [][]Value) Row {
	for i := range columns {
		row = append(row, columns[i]...)
	}
	return row
}
// Clone returns a new Row of the same length in which every element is the
// result of calling Clone on the corresponding value of row.
func (row Row) Clone() Row {
	n := len(row)
	clone := make(Row, n)
	for i := 0; i < n; i++ {
		clone[i] = row[i].Clone()
	}
	return clone
}
// Equal reports whether row and other have the same length and, at every
// position, hold values that are equal according to the package-level Equal
// function and that share the same repetition level, definition level and
// column index.
func (row Row) Equal(other Row) bool {
	if len(row) != len(other) {
		return false
	}

	for i := range row {
		a, b := &row[i], &other[i]

		// Cases are evaluated in order and stop at the first mismatch.
		switch {
		case !Equal(*a, *b),
			a.repetitionLevel != b.repetitionLevel,
			a.definitionLevel != b.definitionLevel,
			a.columnIndex != b.columnIndex:
			return false
		}
	}

	return true
}
// Range calls f once per column of the row, in order, passing the column
// position and the contiguous run of values belonging to it. Iteration stops
// early when f returns false.
//
// Each run always contains at least one value: the value at the start of the
// run is attributed to the current column without inspecting its column index,
// and the run extends over the following values whose columnIndex field is the
// bitwise complement of the current column position. The slice passed to f has
// its capacity clipped to its length.
func (row Row) Range(f func(columnIndex int, columnValues []Value) bool) {
	start := 0

	for columnIndex := 0; start < len(row); columnIndex++ {
		want := ^int16(columnIndex)
		end := start + 1

		for end < len(row) {
			if row[end].columnIndex != want {
				break
			}
			end++
		}

		if !f(columnIndex, row[start:end:end]) {
			return
		}

		start = end
	}
}
// RowSeeker is implemented by types that can be repositioned to a given row.
type RowSeeker interface {
// SeekToRow takes a row index and returns an error when the position cannot
// be reached.
// NOTE(review): presumably the index is absolute and zero-based — confirm
// against the implementations.
SeekToRow (int64 ) error
}
// RowReader is implemented by types that produce rows.
type RowReader interface {
// ReadRows receives a slice of rows to populate and returns a count and an
// error. copyRows in this file treats the count as the number of leading
// rows that were populated, and an error matching io.EOF as the normal end
// of the sequence.
ReadRows ([]Row ) (int , error )
}
// RowReaderFrom is implemented by row destinations that can pull rows from a
// RowReader themselves. copyRows delegates to ReadRowsFrom when its
// destination implements this interface.
type RowReaderFrom interface {
// ReadRowsFrom consumes the given reader and returns a row count and an
// error.
ReadRowsFrom (RowReader ) (int64 , error )
}
// RowReaderWithSchema is a RowReader that also exposes a schema.
// sourceSchemaOf uses it to discover the schema of a row source.
type RowReaderWithSchema interface {
RowReader
// Schema returns the schema associated with the reader.
Schema () *Schema
}
// RowReadSeeker groups the RowReader and RowSeeker interfaces.
type RowReadSeeker interface {
RowReader
RowSeeker
}
// RowWriter is implemented by types that consume rows.
type RowWriter interface {
// WriteRows receives the rows to write and returns a count and an error.
// multiRowWriter in this file treats a count different from len(rows) with
// a nil error as a short write.
WriteRows ([]Row ) (int , error )
}
// RowWriterTo is implemented by row sources that can push their rows to a
// RowWriter themselves. copyRows delegates to WriteRowsTo when its source
// implements this interface.
type RowWriterTo interface {
// WriteRowsTo writes to the given writer and returns a row count and an
// error.
WriteRowsTo (RowWriter ) (int64 , error )
}
// RowWriterWithSchema is a RowWriter that also exposes a schema.
// targetSchemaOf uses it to discover the schema of a row destination.
type RowWriterWithSchema interface {
RowWriter
// Schema returns the schema associated with the writer.
Schema () *Schema
}
// RowReaderFunc adapts a plain function to the RowReader interface.
type RowReaderFunc func([]Row) (int, error)

// ReadRows calls f with rows and returns its results unchanged.
func (f RowReaderFunc) ReadRows(rows []Row) (int, error) {
	n, err := f(rows)
	return n, err
}
// RowWriterFunc adapts a plain function to the RowWriter interface.
type RowWriterFunc func([]Row) (int, error)

// WriteRows calls f with rows and returns its results unchanged.
func (f RowWriterFunc) WriteRows(rows []Row) (int, error) {
	n, err := f(rows)
	return n, err
}
// MultiRowWriter returns a RowWriter that forwards every write to each of the
// given writers, in the order they were passed.
//
// The writers are copied into a private slice, so later changes to the
// caller's slice do not affect the returned writer.
func MultiRowWriter(writers ...RowWriter) RowWriter {
	private := make([]RowWriter, 0, len(writers))
	private = append(private, writers...)
	return &multiRowWriter{writers: private}
}
// multiRowWriter is the RowWriter returned by MultiRowWriter.
type multiRowWriter struct {
	writers []RowWriter
}

// WriteRows writes rows to each underlying writer in turn.
//
// It stops at the first writer that returns an error, or that reports a count
// different from len(rows) (reported as io.ErrShortWrite), and returns that
// writer's count. Writers that come after the failing one are not called.
func (m *multiRowWriter) WriteRows(rows []Row) (int, error) {
	want := len(rows)

	for i := range m.writers {
		switch n, err := m.writers[i].WriteRows(rows); {
		case err != nil:
			return n, err
		case n != want:
			return n, io.ErrShortWrite
		}
	}

	return want, nil
}
// forwardRowSeeker adds forward-only seeking on top of a RowReader by reading
// and discarding rows until the requested position is reached.
type forwardRowSeeker struct {
	rows  RowReader // underlying source of rows
	seek  int64     // index of the first row that must be returned to the caller
	index int64     // number of rows consumed from the underlying reader so far
}

// ReadRows reads rows from the underlying reader, dropping every row located
// before the position last requested through SeekToRow.
//
// Batches that lie entirely before the seek position are discarded and the
// underlying reader is read again. When the seek position falls inside a
// batch, the remaining rows are moved to the front of rows and only those are
// counted in the result.
func (r *forwardRowSeeker) ReadRows(rows []Row) (int, error) {
	for {
		n, err := r.rows.ReadRows(rows)

		if n > 0 {
			// Always track the position so that a later SeekToRow computes its
			// skip from the real number of rows consumed.
			first := r.index
			r.index += int64(n)

			if first < r.seek {
				skip := r.seek - first

				if skip >= int64(n) {
					// The whole batch is skipped; surface the error (including
					// io.EOF) instead of reading past it.
					if err != nil {
						return 0, err
					}
					continue
				}

				// Shift the rows that follow the seek position to the front.
				// Values are copied into the destination row's own buffer so
				// the rows keep distinct backing arrays.
				for i, j := 0, int(skip); j < n; i, j = i+1, j+1 {
					rows[i] = append(rows[i][:0], rows[j]...)
				}

				n -= int(skip)
			}
		}

		return n, err
	}
}

// SeekToRow records rowIndex as the next row to return from ReadRows.
//
// Only forward seeks are supported: an error is returned when rowIndex is
// lower than the number of rows already consumed.
func (r *forwardRowSeeker) SeekToRow(rowIndex int64) error {
	if rowIndex >= r.index {
		r.seek = rowIndex
		return nil
	}
	return fmt.Errorf(
		"SeekToRow: %T does not implement parquet.RowSeeker: cannot seek backward from row %d to %d",
		r.rows,
		r.index,
		rowIndex,
	)
}
// CopyRows copies rows from src to dst and returns the number of rows written
// along with any error that interrupted the copy.
//
// It delegates to copyRows without a caller-provided buffer.
func CopyRows(dst RowWriter, src RowReader) (int64, error) {
	var noBuffer []Row
	return copyRows(dst, src, noBuffer)
}
// copyRows copies rows from src to dst until src is exhausted, returning the
// number of rows written.
//
// When both ends expose a schema and the schemas differ, src is wrapped in a
// converting reader first. The copy is then delegated to src when it
// implements RowWriterTo, or to dst when it implements RowReaderFrom.
// Otherwise rows are moved through buf, which is allocated with
// defaultRowBufferSize rows when empty and cleared before returning.
//
// Reaching io.EOF on src is the normal termination and yields a nil error. A
// read that returns no rows and no error yields io.ErrNoProgress. A write that
// accepts fewer rows than it was given without reporting an error yields
// io.ErrShortWrite. Rows accepted by dst before a write failure are included
// in the returned count.
func copyRows(dst RowWriter, src RowReader, buf []Row) (written int64, err error) {
	targetSchema := targetSchemaOf(dst)
	sourceSchema := sourceSchemaOf(src)

	if targetSchema != nil && sourceSchema != nil {
		if !nodesAreEqual(targetSchema, sourceSchema) {
			conv, convErr := Convert(targetSchema, sourceSchema)
			if convErr != nil {
				return 0, convErr
			}
			src = ConvertRowReader(src, conv)
		}
	}

	if wt, ok := src.(RowWriterTo); ok {
		return wt.WriteRowsTo(dst)
	}

	if rf, ok := dst.(RowReaderFrom); ok {
		return rf.ReadRowsFrom(src)
	}

	if len(buf) == 0 {
		buf = make([]Row, defaultRowBufferSize)
	}

	defer clearRows(buf)

	for {
		rn, readErr := src.ReadRows(buf)

		// Rows returned alongside an error (including io.EOF) are still
		// written before the error is handled.
		if rn > 0 {
			wn, writeErr := dst.WriteRows(buf[:rn])
			written += int64(wn)

			if writeErr != nil {
				return written, writeErr
			}
			if wn != rn {
				return written, io.ErrShortWrite
			}
		}

		if readErr != nil {
			if errors.Is(readErr, io.EOF) {
				readErr = nil
			}
			return written, readErr
		}

		if rn == 0 {
			return written, io.ErrNoProgress
		}
	}
}
// makeRows returns n empty rows carved out of a single allocation of n values.
//
// Row i has length zero and capacity one, backed by the i-th value of the
// shared buffer, so appending a first value to a row does not allocate and
// cannot overwrite a neighbouring row.
func makeRows(n int) []Row {
	values := make([]Value, n)
	rows := make([]Row, 0, n)
	for i := 0; i < n; i++ {
		rows = append(rows, values[i:i:i+1])
	}
	return rows
}
// clearRows passes the values of every row to clearValues and truncates each
// row to length zero, keeping its backing array for reuse.
func clearRows(rows []Row) {
	for i := range rows {
		clearValues(rows[i])
		rows[i] = rows[i][:0]
	}
}
// sourceSchemaOf returns the schema exposed by r when it implements
// RowReaderWithSchema, and nil otherwise.
func sourceSchemaOf(r RowReader) *Schema {
	withSchema, ok := r.(RowReaderWithSchema)
	if !ok {
		return nil
	}
	return withSchema.Schema()
}
// targetSchemaOf returns the schema exposed by w when it implements
// RowWriterWithSchema, and nil otherwise.
func targetSchemaOf(w RowWriter) *Schema {
	withSchema, ok := w.(RowWriterWithSchema)
	if !ok {
		return nil
	}
	return withSchema.Schema()
}
// levels carries the nesting state passed down while deconstructing Go values
// into columns or reconstructing them from columns.
type levels struct {
// repetitionDepth is incremented each time a repeated node or a map is
// entered.
repetitionDepth byte
// repetitionLevel is the repetition level stamped on the next leaf value;
// it is set to repetitionDepth after each element of a repeated node or map
// has been processed.
repetitionLevel byte
// definitionLevel is the definition level stamped on the next leaf value;
// it is incremented when entering a non-zero optional value or a non-empty
// repeated node or map.
definitionLevel byte
}
// deconstructFunc appends the leaf values found in a Go value to the matching
// entries of a slice of columns, using the given levels as the starting
// repetition and definition state. An invalid reflect.Value stands for an
// absent value.
type deconstructFunc func ([][]Value , levels , reflect .Value )
// deconstructFuncOf selects the deconstruction strategy for node and returns
// the column index following the node's columns along with the function
// deconstructing values of that node.
//
// The checks are made in this order: optional, repeated, list, map, and
// finally required.
func deconstructFuncOf(columnIndex int16, node Node) (int16, deconstructFunc) {
	if node.Optional() {
		return deconstructFuncOfOptional(columnIndex, node)
	}
	if node.Repeated() {
		return deconstructFuncOfRepeated(columnIndex, node)
	}
	if isList(node) {
		return deconstructFuncOfList(columnIndex, node)
	}
	if isMap(node) {
		return deconstructFuncOfMap(columnIndex, node)
	}
	return deconstructFuncOfRequired(columnIndex, node)
}
// deconstructFuncOfOptional wraps the deconstruction of the required form of
// node with the handling of absent values.
//
// A valid but zero value is forwarded as an invalid reflect.Value, leaving the
// definition level untouched. Any other valid value has its definition level
// incremented and, when it is a pointer, is dereferenced before being
// forwarded. Invalid values are forwarded as-is.
func deconstructFuncOfOptional(columnIndex int16, node Node) (int16, deconstructFunc) {
	nextColumnIndex, deconstructValue := deconstructFuncOf(columnIndex, Required(node))

	return nextColumnIndex, func(columns [][]Value, levels levels, value reflect.Value) {
		switch {
		case !value.IsValid():
			// Already absent, nothing to adjust.
		case value.IsZero():
			value = reflect.Value{}
		default:
			if value.Kind() == reflect.Ptr {
				value = value.Elem()
			}
			levels.definitionLevel++
		}
		deconstructValue(columns, levels, value)
	}
}
// deconstructFuncOfRepeated wraps the deconstruction of the required form of
// node so that it is applied to every element of a sequence.
//
// Interface values are unwrapped first. An invalid or empty sequence is
// forwarded once as an invalid reflect.Value with unchanged levels. Otherwise
// the repetition depth and definition level are incremented, and every element
// after the first is deconstructed with the repetition level set to the new
// repetition depth.
func deconstructFuncOfRepeated(columnIndex int16, node Node) (int16, deconstructFunc) {
	nextColumnIndex, deconstructElem := deconstructFuncOf(columnIndex, Required(node))

	return nextColumnIndex, func(columns [][]Value, levels levels, value reflect.Value) {
		if value.Kind() == reflect.Interface {
			value = value.Elem()
		}

		count := 0
		if value.IsValid() {
			count = value.Len()
		}
		if count == 0 {
			deconstructElem(columns, levels, reflect.Value{})
			return
		}

		levels.repetitionDepth++
		levels.definitionLevel++

		for i := 0; i < count; i++ {
			deconstructElem(columns, levels, value.Index(i))
			levels.repetitionLevel = levels.repetitionDepth
		}
	}
}
// deconstructFuncOfRequired dispatches to the leaf deconstruction when node is
// a leaf, and to the group deconstruction otherwise.
func deconstructFuncOfRequired(columnIndex int16, node Node) (int16, deconstructFunc) {
	if node.Leaf() {
		return deconstructFuncOfLeaf(columnIndex, node)
	}
	return deconstructFuncOfGroup(columnIndex, node)
}
// deconstructFuncOfList deconstructs a list node as a repeated sequence of its
// element node.
func deconstructFuncOfList(columnIndex int16, node Node) (int16, deconstructFunc) {
	element := listElementOf(node)
	return deconstructFuncOf(columnIndex, Repeated(element))
}
// deconstructFuncOfMap returns a function deconstructing Go maps as a sequence
// of key/value entries.
//
// The entry type is the element type of the Go type reported by the map's
// key-value node; its first field receives the key and its second field the
// value, each converted to the field's type. An invalid or empty map is
// forwarded once as an invalid reflect.Value with unchanged levels. Otherwise
// the repetition depth and definition level are incremented, and every entry
// after the first is deconstructed with the repetition level set to the new
// repetition depth. Entries are visited in the order returned by MapKeys.
func deconstructFuncOfMap(columnIndex int16, node Node) (int16, deconstructFunc) {
	entryType := mapKeyValueOf(node).GoType().Elem()
	keyType := entryType.Field(0).Type
	valueType := entryType.Field(1).Type
	nextColumnIndex, deconstructEntry := deconstructFuncOf(columnIndex, schemaOf(entryType))

	return nextColumnIndex, func(columns [][]Value, levels levels, mapValue reflect.Value) {
		if !mapValue.IsValid() || mapValue.Len() == 0 {
			deconstructEntry(columns, levels, reflect.Value{})
			return
		}

		levels.repetitionDepth++
		levels.definitionLevel++

		// A single scratch entry is reused for every key/value pair.
		entry := reflect.New(entryType).Elem()
		entryKey, entryValue := entry.Field(0), entry.Field(1)

		keys := mapValue.MapKeys()
		for i := range keys {
			entryKey.Set(keys[i].Convert(keyType))
			entryValue.Set(mapValue.MapIndex(keys[i]).Convert(valueType))
			deconstructEntry(columns, levels, entry)
			levels.repetitionLevel = levels.repetitionDepth
		}
	}
}
// deconstructFuncOfGroup builds one deconstruction function per field of node,
// assigning consecutive column indexes to the fields, and returns a function
// applying them in field order.
//
// For a valid group value each field function receives the field's value as
// extracted by the field's Value method; for an invalid group value every
// field function receives the invalid value itself.
func deconstructFuncOfGroup(columnIndex int16, node Node) (int16, deconstructFunc) {
	fields := node.Fields()
	funcs := make([]deconstructFunc, 0, len(fields))

	for _, field := range fields {
		var f deconstructFunc
		columnIndex, f = deconstructFuncOf(columnIndex, field)
		funcs = append(funcs, f)
	}

	return columnIndex, func(columns [][]Value, levels levels, value reflect.Value) {
		if !value.IsValid() {
			for _, f := range funcs {
				f(columns, levels, value)
			}
			return
		}
		for i, f := range funcs {
			f(columns, levels, fields[i].Value(value))
		}
	}
}
// deconstructFuncOfLeaf returns a function appending one value to the column
// at columnIndex for every Go value it is given.
//
// A valid Go value is converted with makeValue using the kind and logical type
// of the node's type; an invalid one produces a zero Value. In both cases the
// value is stamped with the current repetition and definition levels and with
// the bitwise complement of the column index.
//
// It panics when columnIndex exceeds MaxColumnIndex.
func deconstructFuncOfLeaf(columnIndex int16, node Node) (int16, deconstructFunc) {
	if columnIndex > MaxColumnIndex {
		panic("row cannot be deconstructed because it has more than 127 columns")
	}

	leafType := node.Type()
	kind, logicalType := leafType.Kind(), leafType.LogicalType()
	encodedIndex := ^columnIndex

	return columnIndex + 1, func(columns [][]Value, levels levels, value reflect.Value) {
		var v Value
		if value.IsValid() {
			v = makeValue(kind, logicalType, value)
		}
		v.repetitionLevel = levels.repetitionLevel
		v.definitionLevel = levels.definitionLevel
		v.columnIndex = encodedIndex
		columns[columnIndex] = append(columns[columnIndex], v)
	}
}
// reconstructFunc assigns to a Go value the content described by a slice of
// columns, using the given levels as the starting repetition and definition
// state, and returns an error when the value cannot be rebuilt.
type reconstructFunc func (reflect .Value , levels , [][]Value ) error
// reconstructFuncOf selects the reconstruction strategy for node and returns
// the column index following the node's columns along with the function
// reconstructing values of that node.
//
// The checks are made in this order: optional, repeated, list, map, and
// finally required.
func reconstructFuncOf(columnIndex int16, node Node) (int16, reconstructFunc) {
	if node.Optional() {
		return reconstructFuncOfOptional(columnIndex, node)
	}
	if node.Repeated() {
		return reconstructFuncOfRepeated(columnIndex, node)
	}
	if isList(node) {
		return reconstructFuncOfList(columnIndex, node)
	}
	if isMap(node) {
		return reconstructFuncOfMap(columnIndex, node)
	}
	return reconstructFuncOfRequired(columnIndex, node)
}
// reconstructFuncOfOptional wraps the reconstruction of the required form of
// node with the handling of absent values.
//
// The definition level is incremented, and when the first value of the first
// column has a lower definition level the destination is reset to its zero
// value. Otherwise a pointer destination is allocated if nil and dereferenced,
// and the wrapped reconstruction is applied.
func reconstructFuncOfOptional(columnIndex int16, node Node) (int16, reconstructFunc) {
	nextColumnIndex, reconstructValue := reconstructFuncOf(columnIndex, Required(node))

	return nextColumnIndex, func(value reflect.Value, levels levels, columns [][]Value) error {
		levels.definitionLevel++

		if columns[0][0].definitionLevel >= levels.definitionLevel {
			target := value
			if target.Kind() == reflect.Ptr {
				if target.IsNil() {
					target.Set(reflect.New(target.Type().Elem()))
				}
				target = target.Elem()
			}
			return reconstructValue(target, levels, columns)
		}

		value.Set(reflect.Zero(value.Type()))
		return nil
	}
}
// setMakeSlice allocates a slice of length and capacity n, stores it in v, and
// returns the new slice value.
//
// The slice has the type of v, except when v is of interface kind, in which
// case a []interface{} is created.
func setMakeSlice(v reflect.Value, n int) reflect.Value {
	sliceType := v.Type()
	if sliceType.Kind() == reflect.Interface {
		sliceType = reflect.TypeOf([]interface{}(nil))
	}
	made := reflect.MakeSlice(sliceType, n, n)
	v.Set(made)
	return made
}
// reconstructFuncOfRepeated returns the column index following node and a
// function rebuilding a repeated node into a slice, reconstructing each
// element with the function obtained for the required form of node.
func reconstructFuncOfRepeated(columnIndex int16 , node Node ) (int16 , reconstructFunc ) {
nextColumnIndex , reconstruct := reconstructFuncOf (columnIndex , Required (node ))
return nextColumnIndex , func (value reflect .Value , levels levels , columns [][]Value ) error {
levels .repetitionDepth ++
levels .definitionLevel ++
// The first value of the first column tells whether the sequence holds any
// element; if not, the destination receives an empty slice.
if columns [0 ][0 ].definitionLevel < levels .definitionLevel {
setMakeSlice (value , 0 )
return nil
}
values := make ([][]Value , len (columns ))
column := columns [0 ]
n := 0
// Each entry of values is a window over the matching column. It starts
// empty at the beginning of the column, with a capacity spanning the whole
// column.
for i , column := range columns {
values [i ] = column [0 :0 :len (column )]
}
// Count the elements using the first column: an element starts at a value
// and extends over the following values whose repetition level is greater
// than the current repetition depth.
for i := 0 ; i < len (column ); {
i ++
n ++
for i < len (column ) && column [i ].repetitionLevel > levels .repetitionDepth {
i ++
}
}
value = setMakeSlice (value , n )
for i := 0 ; i < n ; i ++ {
// Grow every window so that it covers the values of one element.
for j , column := range values {
column = column [:cap (column )]
// Columns with no remaining values keep an empty window.
if len (column ) == 0 {
continue
}
k := 1
for k < len (column ) && column [k ].repetitionLevel > levels .repetitionDepth {
k ++
}
values [j ] = column [:k ]
}
if err := reconstruct (value .Index (i ), levels , values ); err != nil {
return err
}
// Move every window past the values that were just consumed, leaving it
// empty with the rest of the column as capacity.
for j , column := range values {
values [j ] = column [len (column ):len (column ):cap (column )]
}
levels .repetitionLevel = levels .repetitionDepth
}
return nil
}
}
// reconstructFuncOfRequired dispatches to the leaf reconstruction when node is
// a leaf, and to the group reconstruction otherwise.
func reconstructFuncOfRequired(columnIndex int16, node Node) (int16, reconstructFunc) {
	if node.Leaf() {
		return reconstructFuncOfLeaf(columnIndex, node)
	}
	return reconstructFuncOfGroup(columnIndex, node)
}
// reconstructFuncOfList reconstructs a list node as a repeated sequence of its
// element node.
func reconstructFuncOfList(columnIndex int16, node Node) (int16, reconstructFunc) {
	element := listElementOf(node)
	return reconstructFuncOf(columnIndex, Repeated(element))
}
// reconstructFuncOfMap returns the column index following node and a function
// rebuilding a Go map from the columns of a map node.
//
// Entries are reconstructed one at a time into a scratch value of the
// key-value element type, whose first field is the key and second field the
// value; both are converted to the key and element types of the destination
// map before being inserted.
//
// When the destination is of interface kind a map[string]any is created and
// stored in it, replacing any value it held. A nil map destination is
// allocated; a non-nil map destination is reused and receives the entries in
// addition to those it already holds. When the first value of the first column
// shows that the map has no entries, the destination is set to an empty map.
func reconstructFuncOfMap(columnIndex int16, node Node) (int16, reconstructFunc) {
	keyValue := mapKeyValueOf(node)
	keyValueType := keyValue.GoType()
	keyValueElem := keyValueType.Elem()
	keyValueZero := reflect.Zero(keyValueElem)
	nextColumnIndex, reconstruct := reconstructFuncOf(columnIndex, schemaOf(keyValueElem))

	return nextColumnIndex, func(value reflect.Value, levels levels, columns [][]Value) error {
		levels.repetitionDepth++
		levels.definitionLevel++

		// Resolve the concrete map type up front so that the empty-map path
		// also works for interface destinations; reflect.MakeMap panics when
		// given a non-map type.
		mapType := value.Type()
		isInterface := mapType.Kind() == reflect.Interface
		if isInterface {
			mapType = reflect.TypeOf((map[string]any)(nil))
		}

		if columns[0][0].definitionLevel < levels.definitionLevel {
			value.Set(reflect.MakeMap(mapType))
			return nil
		}

		keyType := mapType.Key()
		elemType := mapType.Elem()

		// Each entry of values is a window over the matching column, starting
		// empty with a capacity spanning the whole column.
		values := make([][]Value, len(columns))
		for i, column := range columns {
			values[i] = column[0:0:len(column)]
		}

		// Count the entries using the first column: an entry starts at a value
		// and extends over the following values whose repetition level is
		// greater than the current repetition depth.
		first := columns[0]
		n := 0
		for i := 0; i < len(first); {
			i++
			n++
			for i < len(first) && first[i].repetitionLevel > levels.repetitionDepth {
				i++
			}
		}

		// SetMapIndex requires a map value, so interface destinations always
		// get a fresh map, as do nil maps.
		if isInterface || value.IsNil() {
			m := reflect.MakeMapWithSize(mapType, n)
			value.Set(m)
			value = m
		}

		elem := reflect.New(keyValueElem).Elem()
		for i := 0; i < n; i++ {
			// Grow every window so that it covers the values of one entry.
			for j, column := range values {
				column = column[:cap(column)]
				// Columns with no remaining values keep an empty window rather
				// than being sliced out of range.
				if len(column) == 0 {
					continue
				}

				end := 1
				for end < len(column) && column[end].repetitionLevel > levels.repetitionDepth {
					end++
				}

				values[j] = column[:end]
			}

			if err := reconstruct(elem, levels, values); err != nil {
				return err
			}

			// Move every window past the values that were just consumed.
			for j, column := range values {
				values[j] = column[len(column):len(column):cap(column)]
			}

			value.SetMapIndex(elem.Field(0).Convert(keyType), elem.Field(1).Convert(elemType))
			elem.Set(keyValueZero)
			levels.repetitionLevel = levels.repetitionDepth
		}

		return nil
	}
}
// reconstructFuncOfGroup builds one reconstruction function per field of node,
// assigning consecutive column indexes to the fields, and returns a function
// applying them in field order. Each field function only sees the columns
// belonging to its field.
//
// The destination may be:
//   - of interface kind, in which case a new map[string]interface{} is stored
//     in it and populated;
//   - a map, which is populated with one entry per field keyed by the field
//     name; a nil map is allocated first and a non-empty map is replaced by a
//     fresh one;
//   - any other value, in which case each field's destination is obtained from
//     the field's Value method.
//
// Errors returned by a field function are wrapped with the field name.
func reconstructFuncOfGroup(columnIndex int16, node Node) (int16, reconstructFunc) {
	fields := node.Fields()
	funcs := make([]reconstructFunc, len(fields))
	columnOffsets := make([]int16, len(fields))
	firstColumnIndex := columnIndex

	for i, field := range fields {
		columnIndex, funcs[i] = reconstructFuncOf(columnIndex, field)
		// Offset of the end of the field's columns, relative to the group.
		columnOffsets[i] = columnIndex - firstColumnIndex
	}

	return columnIndex, func(value reflect.Value, levels levels, columns [][]Value) error {
		if value.Kind() == reflect.Interface {
			value.Set(reflect.MakeMap(reflect.TypeOf((map[string]interface{})(nil))))
			value = value.Elem()
		}

		if value.Kind() != reflect.Map {
			off := int16(0)
			for i, f := range funcs {
				end := columnOffsets[i]
				if err := f(fields[i].Value(value), levels, columns[off:end:end]); err != nil {
					return fmt.Errorf("%s → %w", fields[i].Name(), err)
				}
				off = end
			}
			return nil
		}

		elemType := value.Type().Elem()
		name := reflect.New(reflect.TypeOf("")).Elem()
		elem := reflect.New(elemType).Elem()
		zero := reflect.Zero(elemType)

		// Writing to a nil map panics, so allocate one; a map that already has
		// entries is replaced to drop the stale ones. The map obtained above
		// from an interface destination is neither nil nor non-empty and is
		// left untouched (it is not settable).
		if value.IsNil() || value.Len() > 0 {
			value.Set(reflect.MakeMap(value.Type()))
		}

		off := int16(0)
		for i, f := range funcs {
			name.SetString(fields[i].Name())
			end := columnOffsets[i]
			if err := f(elem, levels, columns[off:end:end]); err != nil {
				return fmt.Errorf("%s → %w", name, err)
			}
			off = end
			value.SetMapIndex(name, elem)
			// Reset the scratch element so state does not leak between fields.
			elem.Set(zero)
		}
		return nil
	}
}
// reconstructFuncOfLeaf returns a function assigning the first value of the
// first column it is given to the destination, using the AssignValue method of
// the node's type.
//
// The function returns an error mentioning columnIndex when that column holds
// no values.
func reconstructFuncOfLeaf(columnIndex int16, node Node) (int16, reconstructFunc) {
	leafType := node.Type()

	return columnIndex + 1, func(value reflect.Value, _ levels, columns [][]Value) error {
		if column := columns[0]; len(column) > 0 {
			return leafType.AssignValue(value, column[0])
		}
		return fmt.Errorf("no values found in parquet row for column %d", columnIndex)
	}
}
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PR and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.