package parquet
import (
"fmt"
"math"
"reflect"
"strconv"
"strings"
"sync"
"time"
"github.com/google/uuid"
"github.com/parquet-go/parquet-go/compress"
"github.com/parquet-go/parquet-go/deprecated"
"github.com/parquet-go/parquet-go/encoding"
)
// Schema describes a parquet layout rooted at a Node, together with the
// precomputed functions used to convert between Go values and per-column
// values.
type Schema struct {
	// name of the schema; schemaOf uses the Go type's name.
	name string
	// root is the top-level node of the schema tree.
	root Node
	// deconstruct splits a Go value into column values (nil when root is a leaf).
	deconstruct deconstructFunc
	// reconstruct rebuilds a Go value from column values (nil when root is a leaf).
	reconstruct reconstructFunc
	// mapping resolves column paths to leaf columns; used by Lookup.
	mapping columnMapping
	// columns holds the leaf column paths; len(columns) is the column count.
	columns [][]string
}
// SchemaOf returns the parquet schema derived from the Go type of model.
// Pointer indirections are stripped first, so T, *T and **T all map to the
// same schema. Schemas are built once per type and cached.
//
// SchemaOf panics if the dereferenced type is not a struct.
func SchemaOf(model interface{}) *Schema {
	modelType := reflect.TypeOf(model)
	return schemaOf(dereference(modelType))
}
// cachedSchemas memoizes the *Schema built for each Go struct type, keyed by
// its reflect.Type.
var cachedSchemas sync.Map

// schemaOf returns the schema for struct type model, building and caching it
// on first use. When several goroutines build the same schema concurrently,
// the first one stored in the cache is returned to all of them.
//
// schemaOf panics if model is not a struct type.
func schemaOf(model reflect.Type) *Schema {
	if v, ok := cachedSchemas.Load(model); ok {
		if hit, _ := v.(*Schema); hit != nil {
			return hit
		}
	}

	if model.Kind() != reflect.Struct {
		panic("cannot construct parquet schema from value of type " + model.String())
	}

	built := NewSchema(model.Name(), nodeOf(model, nil))
	// LoadOrStore yields the previously cached value if another goroutine
	// won the race, otherwise the schema we just built.
	stored, _ := cachedSchemas.LoadOrStore(model, built)
	return stored.(*Schema)
}
// NewSchema constructs a schema called name whose tree is rooted at root.
// The column mapping, leaf column paths and the row deconstruct/reconstruct
// functions are all derived from root up front.
func NewSchema(name string, root Node) *Schema {
	s := &Schema{name: name, root: root}
	s.mapping, s.columns = columnMappingOf(root)
	s.deconstruct = makeDeconstructFunc(root)
	s.reconstruct = makeReconstructFunc(root)
	return s
}
// dereference strips every level of pointer indirection from t and returns
// the first non-pointer type (for example **T becomes T).
func dereference(t reflect.Type) reflect.Type {
	if t.Kind() != reflect.Ptr {
		return t
	}
	return dereference(t.Elem())
}
// makeDeconstructFunc returns the function that splits Go values into column
// values for node. A *Schema node reuses its precomputed function; a leaf
// node yields a nil function.
func makeDeconstructFunc(node Node) (deconstruct deconstructFunc) {
	if s, ok := node.(*Schema); ok && s != nil {
		return s.deconstruct
	}
	if node.Leaf() {
		return nil
	}
	_, deconstruct = deconstructFuncOf(0, node)
	return deconstruct
}
// makeReconstructFunc returns the function that rebuilds Go values from
// column values for node. A *Schema node reuses its precomputed function; a
// leaf node yields a nil function.
func makeReconstructFunc(node Node) (reconstruct reconstructFunc) {
	if s, ok := node.(*Schema); ok && s != nil {
		return s.reconstruct
	}
	if node.Leaf() {
		return nil
	}
	_, reconstruct = reconstructFuncOf(0, node)
	return reconstruct
}
// ConfigureRowGroup sets s as the schema of the row group configuration, so a
// *Schema can be passed directly as a RowGroupOption.
func (s *Schema) ConfigureRowGroup(config *RowGroupConfig) {
	config.Schema = s
}

// ConfigureReader sets s as the schema of the reader configuration, so a
// *Schema can be passed directly as a ReaderOption.
func (s *Schema) ConfigureReader(config *ReaderConfig) {
	config.Schema = s
}

// ConfigureWriter sets s as the schema of the writer configuration, so a
// *Schema can be passed directly as a WriterOption.
func (s *Schema) ConfigureWriter(config *WriterConfig) {
	config.Schema = s
}

// ID returns the ID reported by the root node.
func (s *Schema) ID() int {
	return s.root.ID()
}

// String returns the textual form of the schema as produced by sprint.
func (s *Schema) String() string {
	return sprint(s.name, s.root)
}

// Name returns the name the schema was created with.
func (s *Schema) Name() string {
	return s.name
}

// Type delegates to the root node.
func (s *Schema) Type() Type {
	return s.root.Type()
}

// Optional delegates to the root node.
func (s *Schema) Optional() bool {
	return s.root.Optional()
}

// Repeated delegates to the root node.
func (s *Schema) Repeated() bool {
	return s.root.Repeated()
}

// Required delegates to the root node.
func (s *Schema) Required() bool {
	return s.root.Required()
}

// Leaf delegates to the root node.
func (s *Schema) Leaf() bool {
	return s.root.Leaf()
}

// Fields returns the child fields of the root node.
func (s *Schema) Fields() []Field {
	return s.root.Fields()
}

// Encoding delegates to the root node.
func (s *Schema) Encoding() encoding.Encoding {
	return s.root.Encoding()
}

// Compression delegates to the root node.
func (s *Schema) Compression() compress.Codec {
	return s.root.Compression()
}

// GoType returns the Go type reported by the root node.
func (s *Schema) GoType() reflect.Type {
	return s.root.GoType()
}
// Deconstruct flattens value into one slice of values per leaf column,
// following the schema layout, and appends them to row via appendRow.
func (s *Schema) Deconstruct(row Row, value interface{}) Row {
	n := len(s.columns)
	// All columns share one backing array: column i starts as an empty slice
	// with capacity 1 over slot i, so a single value per column fits without
	// another allocation.
	backing := make([]Value, n)
	columns := make([][]Value, n)
	for i := 0; i < n; i++ {
		columns[i] = backing[i:i:i+1]
	}
	s.deconstructValueToColumns(columns, reflect.ValueOf(value))
	return appendRow(row, columns)
}
// deconstructValueToColumns unwraps any pointers and interfaces around value
// and passes the result to the schema's deconstruct function, starting with
// zero levels. A nil pointer or interface in the chain is passed on as the
// zero reflect.Value.
func (s *Schema) deconstructValueToColumns(columns [][]Value, value reflect.Value) {
	for {
		kind := value.Kind()
		if kind != reflect.Ptr && kind != reflect.Interface {
			break
		}
		if value.IsNil() {
			value = reflect.Value{}
			break
		}
		value = value.Elem()
	}
	s.deconstruct(columns, levels{}, value)
}
// Reconstruct writes the column values of row into the Go value that value
// points to, following the schema layout.
//
// value must be a non-nil pointer; Reconstruct panics otherwise. Nested nil
// pointers below it (e.g. the *T inside a **T) are allocated. Column indexes
// in row beyond the schema's column count are ignored. The returned error is
// the one produced by the schema's reconstruct function.
func (s *Schema) Reconstruct(value interface{}, row Row) error {
	target := reflect.ValueOf(value)
	switch {
	case !target.IsValid():
		panic("cannot reconstruct row into go value of type <nil>")
	case target.Kind() != reflect.Ptr:
		panic("cannot reconstruct row into go value of non-pointer type " + target.Type().String())
	case target.IsNil():
		panic("cannot reconstruct row into nil pointer of type " + target.Type().String())
	}

	for target.Kind() == reflect.Ptr {
		if target.IsNil() {
			target.Set(reflect.New(target.Type().Elem()))
		}
		target = target.Elem()
	}

	buf := valuesSliceBufferPool.Get().(*valuesSliceBuffer)
	columns := buf.reserve(len(s.columns))
	row.Range(func(columnIndex int, columnValues []Value) bool {
		if columnIndex < len(columns) {
			columns[columnIndex] = columnValues
		}
		return true
	})

	err := s.reconstruct(target, levels{}, columns)
	buf.release()
	return err
}
// valuesSliceBuffer is a reusable scratch buffer of per-column value slices,
// recycled through valuesSliceBufferPool.
type valuesSliceBuffer struct {
	values [][]Value
}

// reserve returns a slice of n column entries, all nil.
//
// The backing array is reused when its capacity suffices. Entries are cleared
// explicitly because buffers are shared through a pool. Without clearing, a
// column the caller does not populate would expose values left over from an
// earlier use, possibly one made with a different schema.
func (v *valuesSliceBuffer) reserve(n int) [][]Value {
	if n > cap(v.values) {
		v.values = make([][]Value, n)
		return v.values
	}
	v.values = v.values[:n]
	for i := range v.values {
		v.values[i] = nil
	}
	return v.values
}

// release clears the buffer's entries and returns the buffer to the pool.
// Clearing stops the pooled buffer from keeping the last row's column values
// alive. The buffer must not be used after release.
func (v *valuesSliceBuffer) release() {
	for i := range v.values {
		v.values[i] = nil
	}
	v.values = v.values[:0]
	valuesSliceBufferPool.Put(v)
}

// valuesSliceBufferPool recycles valuesSliceBuffer instances; new buffers
// start with capacity for 64 columns.
var valuesSliceBufferPool = &sync.Pool{
	New: func() interface{} {
		return &valuesSliceBuffer{
			values: make([][]Value, 0, 64),
		}
	},
}
// Lookup resolves path to a leaf column of the schema. It returns the leaf's
// node, path, column index and maximum repetition/definition levels. The
// boolean result is false when the mapping has no node for path.
func (s *Schema) Lookup(path ...string) (LeafColumn, bool) {
	leaf := s.mapping.lookup(path)
	found := leaf.node != nil
	column := LeafColumn{
		Node:               leaf.node,
		Path:               leaf.path,
		ColumnIndex:        int(leaf.columnIndex),
		MaxRepetitionLevel: int(leaf.maxRepetitionLevel),
		MaxDefinitionLevel: int(leaf.maxDefinitionLevel),
	}
	return column, found
}
// Columns returns the paths of the schema's leaf columns. The returned slice
// is the schema's own storage; callers should treat it as read-only.
func (s *Schema) Columns() [][]string {
	paths := s.columns
	return paths
}

// Comparator returns a function ordering rows of this schema according to
// sortingColumns.
func (s *Schema) Comparator(sortingColumns ...SortingColumn) func(Row, Row) int {
	cmp := compareRowsFuncOf(s, sortingColumns)
	return cmp
}

// forEachNode calls do for the schema itself (under its name) and its nodes,
// as visited by forEachNodeOf.
func (s *Schema) forEachNode(do func(name string, node Node)) {
	name := s.Name()
	forEachNodeOf(name, s, do)
}
// structNode is the group node built from a Go struct type by structNodeOf.
type structNode struct {
	// gotype is the Go struct type the node was built from.
	gotype reflect.Type
	// fields are the child fields in struct field order, with fields of
	// embedded structs flattened in place.
	fields []structField
}
// structNodeOf builds the group node for Go struct type t. Each exported
// field, including fields promoted from embedded structs, becomes a child.
// The child's node is derived from the field's Go type and its "parquet",
// "parquet-key" and "parquet-value" struct tags.
func structNodeOf(t reflect.Type) *structNode {
	goFields := structFieldsOf(t)
	node := &structNode{
		gotype: t,
		fields: make([]structField, len(goFields)),
	}
	for i, gf := range goFields {
		tags := []string{
			gf.Tag.Get("parquet"),
			gf.Tag.Get("parquet-key"),
			gf.Tag.Get("parquet-value"),
		}
		node.fields[i] = structField{
			Node:  makeNodeOf(gf.Type, gf.Name, tags),
			name:  gf.Name,
			index: gf.Index,
		}
	}
	return node
}
// structFieldsOf returns the flattened fields of struct type t, as computed by
// appendStructFields. Any field whose "parquet" tag begins with a non-empty
// name is renamed to that name.
func structFieldsOf(t reflect.Type) []reflect.StructField {
	fields := appendStructFields(t, nil, nil, 0)
	for i := range fields {
		tag := fields[i].Tag.Get("parquet")
		if tag == "" {
			continue
		}
		if name, _ := split(tag); name != "" {
			fields[i].Name = name
		}
	}
	return fields
}
// appendStructFields appends the exported fields of struct type t to fields
// and returns the result.
//
// index is the reflect field-index path of t within the root struct, and
// offset is t's byte offset within it. Each appended field gets Index set to
// its full path and Offset made relative to the root struct.
//
// Fields of embedded struct types are flattened into the parent. Embedded
// fields of any other kind (a pointer, a named scalar, an interface) are
// treated as regular fields. Recursing into them would make reflect panic in
// NumField.
//
// Fields whose "parquet" tag name is "-" are skipped. The exact tag "-," keeps
// a field that is literally named "-".
func appendStructFields(t reflect.Type, fields []reflect.StructField, index []int, offset uintptr) []reflect.StructField {
	for i, n := 0, t.NumField(); i < n; i++ {
		f := t.Field(i)

		if tag := f.Tag.Get("parquet"); tag != "" {
			name, _ := split(tag)
			if tag != "-," && name == "-" {
				continue
			}
		}

		// Full slice expression so the append always copies. Sibling fields then
		// never share, and overwrite, the same backing array for their paths.
		fieldIndex := index[:len(index):len(index)]
		fieldIndex = append(fieldIndex, i)

		f.Offset += offset

		switch {
		case f.Anonymous && f.Type.Kind() == reflect.Struct:
			fields = appendStructFields(f.Type, fields, fieldIndex, f.Offset)
		case f.IsExported():
			f.Index = fieldIndex
			fields = append(fields, f)
		}
	}
	return fields
}
// Optional reports false: a struct group is always required.
func (s *structNode) Optional() bool {
	return false
}

// Repeated reports false: a struct group is always required.
func (s *structNode) Repeated() bool {
	return false
}

// Required reports true: a struct group is always required.
func (s *structNode) Required() bool {
	return true
}

// Leaf reports false: a struct is a group node.
func (s *structNode) Leaf() bool {
	return false
}

// Encoding returns nil; a group node carries no encoding of its own.
func (s *structNode) Encoding() encoding.Encoding {
	return nil
}

// Compression returns nil; a group node carries no codec of its own.
func (s *structNode) Compression() compress.Codec {
	return nil
}

// GoType returns the Go struct type the node was built from.
func (s *structNode) GoType() reflect.Type {
	return s.gotype
}

// ID returns 0; struct nodes carry no field ID.
func (s *structNode) ID() int {
	return 0
}

// String returns the textual form of the node as produced by sprint.
func (s *structNode) String() string {
	return sprint("", s)
}

// Type returns the group type.
func (s *structNode) Type() Type {
	return groupType{}
}

// Fields returns the child fields as Field values that point into s.fields.
func (s *structNode) Fields() []Field {
	out := make([]Field, 0, len(s.fields))
	for i := range s.fields {
		out = append(out, &s.fields[i])
	}
	return out
}
// fieldByIndex walks the field index path through struct value v and returns
// the value reached.
//
// Whenever the field reached at a step is a pointer or an interface, it is
// dereferenced. A nil one is first replaced by a newly allocated value of its
// element type, and the walk then stops: that new value is returned even if
// index has remaining entries.
//
// NOTE(review): for a nil interface-typed field, v.Type().Elem() panics,
// because reflect only defines Elem for Array, Chan, Map, Pointer and Slice
// types. Reaching a nil interface here therefore panics.
// NOTE(review): the early break means a nil pointer in the middle of the path
// returns the freshly allocated intermediate value, not the target field.
// appendStructFields only flattens embedded struct types, so pointers
// presumably appear only at the last step. Confirm with callers.
func fieldByIndex(v reflect.Value, index []int) reflect.Value {
	for _, i := range index {
		if v = v.Field(i); v.Kind() == reflect.Ptr || v.Kind() == reflect.Interface {
			if v.IsNil() {
				// Requires v to be addressable (settable).
				v.Set(reflect.New(v.Type().Elem()))
				v = v.Elem()
				break
			} else {
				v = v.Elem()
			}
		}
	}
	return v
}
// structField is one child of a structNode: its parquet node, column name,
// and reflect index path within the parent struct.
type structField struct {
	Node
	name  string
	index []int
}

// Name returns the field's column name: the Go field name, or the name given
// in its parquet tag.
func (f *structField) Name() string {
	return f.name
}

// Value returns the reflect.Value of this field within base:
//   - map: the entry keyed by the field name (the zero Value when absent);
//   - pointer: a nil pointer is allocated first, then the field is resolved
//     with fieldByIndex on the pointed-to struct;
//   - otherwise base is a struct: a direct field is read with Field, a
//     promoted field (multi-step index) via fieldByIndex.
func (f *structField) Value(base reflect.Value) reflect.Value {
	if base.Kind() == reflect.Map {
		key := reflect.ValueOf(&f.name).Elem()
		return base.MapIndex(key)
	}
	if base.Kind() == reflect.Ptr {
		if base.IsNil() {
			base.Set(reflect.New(base.Type().Elem()))
		}
		return fieldByIndex(base.Elem(), f.index)
	}
	if len(f.index) != 1 {
		return fieldByIndex(base, f.index)
	}
	return base.Field(f.index[0])
}
// nodeString describes a struct field for panic messages, in the form
// "<name> <go type> [<tags...>]".
func nodeString(t reflect.Type, name string, tag ...string) string {
	return name + " " + t.String() + " " + fmt.Sprint(tag)
}

// throwInvalidTag panics because tag is not applicable to the field.
func throwInvalidTag(t reflect.Type, name string, tag string) {
	msg := tag + " is an invalid parquet tag: " + nodeString(t, name, tag)
	panic(msg)
}

// throwUnknownTag panics because tag is not a recognized option.
func throwUnknownTag(t reflect.Type, name string, tag string) {
	msg := tag + " is an unrecognized parquet tag: " + nodeString(t, name, tag)
	panic(msg)
}

// throwInvalidNode panics with msg, followed by a description of the field.
func throwInvalidNode(t reflect.Type, msg, name string, tag ...string) {
	full := msg + ": " + nodeString(t, name, tag...)
	panic(full)
}
// decimalFixedLenByteArraySize returns the number of bytes of a fixed-length
// byte array needed to hold a signed decimal with the given precision
// (number of base-10 digits).
func decimalFixedLenByteArraySize(precision int) int {
	// log10(2) accounts for the sign bit; log10(256) is the number of decimal
	// digits each byte can hold.
	digits := math.Log10(2) + float64(precision)
	digitsPerByte := math.Log10(256)
	return int(math.Ceil(digits / digitsPerByte))
}
// forEachStructTagOption calls do for every option in sf's "parquet" tag,
// skipping the leading name element. Each option is split into its name and
// argument string; args is "()" when there are none. do receives sf's type
// with one level of pointer indirection removed.
func forEachStructTagOption(sf reflect.StructField, do func(t reflect.Type, option, args string)) {
	tag := sf.Tag.Get("parquet")
	if tag == "" {
		return
	}
	fieldType := sf.Type
	if fieldType.Kind() == reflect.Ptr {
		fieldType = fieldType.Elem()
	}
	for _, rest := split(tag); rest != ""; {
		var raw string
		raw, rest = split(rest)
		option, args := splitOptionArgs(raw)
		do(fieldType, option, args)
	}
}
// nodeOf builds the parquet node for Go type t.
//
// deprecated.Int96, uuid.UUID and time.Time map directly to their parquet
// nodes, and structs map to a structNode. All other supported kinds produce a
// node wrapped in a goNode, so GoType reports t.
//
// tag is only used for maps. tag[0] is the map's own "parquet" tag, tag[1] its
// "parquet-key" tag and tag[2] its "parquet-value" tag, and any trailing entry
// may be absent. A map tag containing "json" yields a JSON node. The map tag
// also accepts the "optional" and "id(n)" options.
//
// nodeOf panics if t cannot be represented (e.g. a non-byte array, chan or
// func), or if the map tag has an unknown or malformed option.
func nodeOf(t reflect.Type, tag []string) Node {
	switch t {
	case reflect.TypeOf(deprecated.Int96{}):
		return Leaf(Int96Type)
	case reflect.TypeOf(uuid.UUID{}):
		return UUID()
	case reflect.TypeOf(time.Time{}):
		return Timestamp(Nanosecond)
	}

	var n Node
	switch t.Kind() {
	case reflect.Bool:
		n = Leaf(BooleanType)

	case reflect.Int, reflect.Int64:
		n = Int(64)

	case reflect.Int8, reflect.Int16, reflect.Int32:
		n = Int(t.Bits())

	case reflect.Uint, reflect.Uintptr, reflect.Uint64:
		n = Uint(64)

	case reflect.Uint8, reflect.Uint16, reflect.Uint32:
		n = Uint(t.Bits())

	case reflect.Float32:
		n = Leaf(FloatType)

	case reflect.Float64:
		n = Leaf(DoubleType)

	case reflect.String:
		n = String()

	case reflect.Ptr:
		n = Optional(nodeOf(t.Elem(), nil))

	case reflect.Slice:
		// []byte is one BYTE_ARRAY value; any other slice is a repeated field.
		if elem := t.Elem(); elem.Kind() == reflect.Uint8 {
			n = Leaf(ByteArrayType)
		} else {
			n = Repeated(nodeOf(elem, nil))
		}

	case reflect.Array:
		// Only byte arrays are supported; other arrays leave n nil and hit the
		// panic below.
		if t.Elem().Kind() == reflect.Uint8 {
			n = Leaf(FixedLenByteArrayType(t.Len()))
		}

	case reflect.Map:
		var mapTag, keyTag, valueTag string
		// Each index is guarded by its own length check; tag[2] exists only
		// when len(tag) > 2.
		if len(tag) > 0 {
			mapTag = tag[0]
		}
		if len(tag) > 1 {
			keyTag = tag[1]
		}
		if len(tag) > 2 {
			valueTag = tag[2]
		}

		if strings.Contains(mapTag, "json") {
			n = JSON()
		} else {
			n = Map(
				makeNodeOf(t.Key(), t.Name(), []string{keyTag}),
				makeNodeOf(t.Elem(), t.Name(), []string{valueTag}),
			)
		}

		forEachTagOption([]string{mapTag}, func(option, args string) {
			switch option {
			case "", "json":
				return
			case "optional":
				n = Optional(n)
			case "id":
				id, err := parseIDArgs(args)
				if err != nil {
					throwInvalidTag(t, "map", option)
				}
				n = FieldID(n, id)
			default:
				throwUnknownTag(t, "map", option)
			}
		})

	case reflect.Struct:
		return structNodeOf(t)
	}

	if n == nil {
		panic("cannot create parquet node from go value of type " + t.String())
	}

	return &goNode{Node: n, gotype: t}
}
// split cuts s at its first comma. It returns the text before the comma as
// head and the text after it as tail; when s has no comma, head is s and tail
// is empty.
func split(s string) (head, tail string) {
	i := strings.IndexByte(s, ',')
	if i < 0 {
		return s, ""
	}
	return s[:i], s[i+1:]
}
// splitOptionArgs separates a tag option such as "decimal(2:10)" into its
// name ("decimal") and its parenthesized argument string ("(2:10)"). When the
// option has no '(' the argument string defaults to "()".
func splitOptionArgs(s string) (option, args string) {
	i := strings.IndexByte(s, '(')
	if i < 0 {
		return s, "()"
	}
	return s[:i], s[i:]
}
// parseDecimalArgs parses decimal tag arguments of the form "(scale:precision)"
// and returns both values. It fails when the parentheses or the single ':'
// separator are missing, or when either number is not a valid 32-bit integer.
func parseDecimalArgs(args string) (scale, precision int, err error) {
	n := len(args)
	if n < 2 || args[0] != '(' || args[n-1] != ')' {
		return 0, 0, fmt.Errorf("malformed decimal args: %s", args)
	}
	inner := args[1 : n-1]
	parts := strings.Split(inner, ":")
	if len(parts) != 2 {
		return 0, 0, fmt.Errorf("malformed decimal args: (%s)", inner)
	}
	var values [2]int
	for i, part := range parts {
		v, perr := strconv.ParseInt(part, 10, 32)
		if perr != nil {
			return 0, 0, perr
		}
		values[i] = int(v)
	}
	return values[0], values[1], nil
}
// parseIDArgs parses field-id tag arguments of the form "(n)" and returns n.
// It fails when the parentheses are missing or n is not a valid int.
func parseIDArgs(args string) (int, error) {
	wellFormed := strings.HasPrefix(args, "(") && strings.HasSuffix(args, ")")
	if !wellFormed {
		return 0, fmt.Errorf("malformed id args: %s", args)
	}
	// A well-formed args has at least two bytes: '(' and ')' are distinct.
	return strconv.Atoi(args[1 : len(args)-1])
}
// parseTimestampArgs parses timestamp tag arguments of the form "(unit)",
// where unit is "millisecond", "microsecond" or "nanosecond". An empty unit
// "()" selects Millisecond. Anything else is an error.
func parseTimestampArgs(args string) (TimeUnit, error) {
	wellFormed := strings.HasPrefix(args, "(") && strings.HasSuffix(args, ")")
	if !wellFormed {
		return nil, fmt.Errorf("malformed timestamp args: %s", args)
	}
	switch unit := args[1 : len(args)-1]; unit {
	case "", "millisecond":
		return Millisecond, nil
	case "microsecond":
		return Microsecond, nil
	case "nanosecond":
		return Nanosecond, nil
	default:
		return nil, fmt.Errorf("unknown time unit: %s", unit)
	}
}
// goNode wraps a Node so that GoType reports a specific Go type; every other
// method is provided by the embedded Node.
type goNode struct {
	Node
	gotype reflect.Type
}

// GoType returns the Go type recorded when the node was created.
func (n *goNode) GoType() reflect.Type {
	return n.gotype
}
// Compile-time checks that *Schema can be passed directly as a row group,
// reader or writer option.
var _ RowGroupOption = (*Schema)(nil)
var _ ReaderOption = (*Schema)(nil)
var _ WriterOption = (*Schema)(nil)
// makeNodeOf builds the parquet node for a struct field (or map key/value)
// named name with Go type t, applying the options found in tag.
//
// tag holds the raw struct tags to consult; forEachTagOption skips the leading
// name element of each one. The recognized options are:
//   - "optional";
//   - codecs: "snappy", "gzip", "brotli", "lz4", "zstd", "uncompressed";
//   - encodings: "plain", "dict", "delta", "split";
//   - logical types: "json", "list", "enum", "uuid", "decimal(scale:precision)",
//     "date", "timestamp(unit)";
//   - "id(n)".
//
// Options not listed above are silently ignored. When t is a map, option
// handling is delegated entirely to nodeOf.
//
// After options are collected, wrappers are applied in this order:
// Compressed, Encoded, List, Optional, FieldID.
//
// makeNodeOf panics (through throwInvalidTag / throwInvalidNode, or a direct
// panic) in three cases: an option does not apply to t, its arguments are
// malformed, or a category (logical type, optional, list, encoding,
// compression) is declared twice.
func makeNodeOf(t reflect.Type, name string, tag []string) Node {
	var (
		node       Node
		optional   bool
		list       bool
		encoded    encoding.Encoding
		compressed compress.Codec
		fieldID    int
	)

	setNode := func(n Node) {
		if node != nil {
			throwInvalidNode(t, "struct field has multiple logical parquet types declared", name, tag...)
		}
		node = n
	}

	setOptional := func() {
		if optional {
			throwInvalidNode(t, "struct field has multiple declaration of the optional tag", name, tag...)
		}
		optional = true
	}

	setList := func() {
		if list {
			throwInvalidNode(t, "struct field has multiple declaration of the list tag", name, tag...)
		}
		list = true
	}

	setEncoding := func(e encoding.Encoding) {
		if encoded != nil {
			throwInvalidNode(t, "struct field has encoding declared multiple time", name, tag...)
		}
		encoded = e
	}

	setCompression := func(c compress.Codec) {
		if compressed != nil {
			throwInvalidNode(t, "struct field has compression codecs declared multiple times", name, tag...)
		}
		compressed = c
	}

	forEachTagOption(tag, func(option, args string) {
		// Maps interpret their own options (json, optional, id) inside nodeOf
		// from the full tag list; the node is rebuilt for each option seen.
		if t.Kind() == reflect.Map {
			node = nodeOf(t, tag)
			return
		}
		switch option {
		case "":
			return
		case "optional":
			setOptional()
		case "snappy":
			setCompression(&Snappy)
		case "gzip":
			setCompression(&Gzip)
		case "brotli":
			setCompression(&Brotli)
		case "lz4":
			setCompression(&Lz4Raw)
		case "zstd":
			setCompression(&Zstd)
		case "uncompressed":
			setCompression(&Uncompressed)
		case "plain":
			setEncoding(&Plain)
		case "dict":
			setEncoding(&RLEDictionary)
		case "json":
			setNode(JSON())
		case "delta":
			// Integers (and time.Time) get DELTA_BINARY_PACKED; strings and
			// byte slices/arrays get DELTA_BYTE_ARRAY.
			switch t.Kind() {
			case reflect.Int, reflect.Int32, reflect.Int64, reflect.Uint, reflect.Uint32, reflect.Uint64:
				setEncoding(&DeltaBinaryPacked)
			case reflect.String:
				setEncoding(&DeltaByteArray)
			case reflect.Slice:
				if t.Elem().Kind() == reflect.Uint8 {
					setEncoding(&DeltaByteArray)
				} else {
					throwInvalidTag(t, name, option)
				}
			case reflect.Array:
				if t.Elem().Kind() == reflect.Uint8 {
					setEncoding(&DeltaByteArray)
				} else {
					throwInvalidTag(t, name, option)
				}
			default:
				switch t {
				case reflect.TypeOf(time.Time{}):
					setEncoding(&DeltaBinaryPacked)
				default:
					throwInvalidTag(t, name, option)
				}
			}
		case "split":
			switch t.Kind() {
			case reflect.Float32, reflect.Float64:
				setEncoding(&ByteStreamSplit)
			default:
				throwInvalidTag(t, name, option)
			}
		case "list":
			// Record the element node here; it is wrapped with List after the
			// option loop.
			switch t.Kind() {
			case reflect.Slice:
				element := nodeOf(t.Elem(), nil)
				setNode(element)
				setList()
			default:
				throwInvalidTag(t, name, option)
			}
		case "enum":
			switch t.Kind() {
			case reflect.String:
				setNode(Enum())
			default:
				throwInvalidTag(t, name, option)
			}
		case "uuid":
			// NOTE(review): this only validates that t is a [16]byte array and
			// never calls setNode(UUID()). A tagged [16]byte field therefore
			// falls through to nodeOf and gets a plain
			// FixedLenByteArrayType(16) leaf, not the UUID logical type
			// (uuid.UUID itself is mapped to UUID() by nodeOf). Confirm whether
			// setNode(UUID()) is intended here.
			switch t.Kind() {
			case reflect.Array:
				if t.Elem().Kind() != reflect.Uint8 || t.Len() != 16 {
					throwInvalidTag(t, name, option)
				}
			default:
				throwInvalidTag(t, name, option)
			}
		case "decimal":
			scale, precision, err := parseDecimalArgs(args)
			if err != nil {
				throwInvalidTag(t, name, option+args)
			}
			// throwInvalidTag panics, so baseType is always set when Decimal
			// is called below.
			var baseType Type
			switch t.Kind() {
			case reflect.Int32:
				baseType = Int32Type
			case reflect.Int64:
				baseType = Int64Type
			case reflect.Array, reflect.Slice:
				baseType = FixedLenByteArrayType(decimalFixedLenByteArraySize(precision))
			default:
				throwInvalidTag(t, name, option)
			}
			setNode(Decimal(scale, precision, baseType))
		case "date":
			switch t.Kind() {
			case reflect.Int32:
				setNode(Date())
			default:
				throwInvalidTag(t, name, option)
			}
		case "timestamp":
			switch t.Kind() {
			case reflect.Int64:
				timeUnit, err := parseTimestampArgs(args)
				if err != nil {
					throwInvalidTag(t, name, option)
				}
				setNode(Timestamp(timeUnit))
			default:
				switch t {
				case reflect.TypeOf(time.Time{}):
					timeUnit, err := parseTimestampArgs(args)
					if err != nil {
						throwInvalidTag(t, name, option)
					}
					setNode(Timestamp(timeUnit))
				default:
					throwInvalidTag(t, name, option)
				}
			}
		case "id":
			id, err := parseIDArgs(args)
			if err != nil {
				throwInvalidNode(t, "struct field has field id that is not a valid int", name, tag...)
			}
			fieldID = id
		}
	})

	// "optional" on a non-byte slice makes the elements optional rather than
	// the slice itself, so the outer Optional wrapper is suppressed.
	if node == nil && t.Kind() == reflect.Slice {
		isUint8 := t.Elem().Kind() == reflect.Uint8
		if optional && !isUint8 {
			node = Repeated(Optional(nodeOf(t.Elem(), tag)))
			optional = false
		}
	}

	// No logical type was selected by the options: derive it from the Go type.
	if node == nil {
		node = nodeOf(t, tag)
	}

	if compressed != nil {
		node = Compressed(node, compressed)
	}

	if encoded != nil {
		node = Encoded(node, encoded)
	}

	if list {
		node = List(node)
	}

	// A repeated node whose element is itself a non-byte slice (e.g. [][]int)
	// is rejected unless the list tag was used.
	if node.Repeated() && !list {
		repeated := node.GoType().Elem()
		if repeated.Kind() == reflect.Slice {
			if repeated.Elem().Kind() != reflect.Uint8 {
				panic("unhandled nested slice on parquet schema without list tag")
			}
		}
	}

	if optional {
		node = Optional(node)
	}

	// A field id of 0 is indistinguishable from "no id" and is not applied.
	if fieldID != 0 {
		node = FieldID(node, fieldID)
	}

	return node
}
// forEachTagOption calls do for every option of every tag in tags. The first
// comma-separated element of each tag (the name) is skipped. Each remaining
// element is split into an option name and an argument string; args is "()"
// when the option has no parenthesized arguments.
func forEachTagOption(tags []string, do func(option, args string)) {
	for _, tag := range tags {
		_, rest := split(tag)
		for rest != "" {
			var raw string
			raw, rest = split(rest)
			do(splitOptionArgs(raw))
		}
	}
}
The pages are generated with Golds v0.8.4 (GOOS=linux GOARCH=amd64).
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.