package schema
import (
"fmt"
"reflect"
"strconv"
"strings"
"github.com/apache/arrow-go/v18/arrow/float16"
"github.com/apache/arrow-go/v18/internal/utils"
"github.com/apache/arrow-go/v18/parquet"
format "github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet"
)
// taggedInfo carries the settings parsed out of a field's `parquet` struct
// tag. Most settings come in up to three flavours: the plain one describes the
// field itself, while the Key*/Value* variants are moved into the plain slots
// by CopyForKey and CopyForValue.
type taggedInfo struct {
	// Name replaces the Go field name when it is non-empty.
	Name string

	// Physical types; newTaggedInfo initialises them to Undefined.
	Type, KeyType, ValueType parquet.Type

	// Numeric settings taken verbatim from the tag.
	Length, KeyLength, ValueLength          int32
	Scale, KeyScale, ValueScale             int32
	Precision, KeyPrecision, ValuePrecision int32
	FieldID, KeyFieldID, ValueFieldID       int32

	// Repetition overrides. There is no key repetition because CopyForKey
	// always forces Required.
	RepetitionType, ValueRepetition parquet.Repetition

	// Converted types; newTaggedInfo initialises them to ConvertedTypes.NA.
	Converted, KeyConverted, ValueConverted ConvertedType

	// Raw "logical" / "logical.<param>" settings collected from the tag. The
	// "type" entry holds the logical type name.
	LogicalFields, KeyLogicalFields, ValueLogicalFields map[string]string

	// Logical types derived from the maps above by UpdateLogicalTypes.
	LogicalType, KeyLogicalType, ValueLogicalType LogicalType

	// Exclude is set when the whole tag is "-".
	Exclude bool
}
// CopyForKey returns a shallow copy of t in which the plain settings (type,
// length, scale, precision, field id, converted and logical type) are replaced
// by their Key* counterparts. The repetition of the copy is always Required.
// Every other field, including Name and the logical-field maps, is carried
// over unchanged (the maps are shared with t).
func (t *taggedInfo) CopyForKey() (ret taggedInfo) {
	keyInfo := *t

	keyInfo.Type, keyInfo.Converted, keyInfo.LogicalType = t.KeyType, t.KeyConverted, t.KeyLogicalType
	keyInfo.Length, keyInfo.Scale, keyInfo.Precision = t.KeyLength, t.KeyScale, t.KeyPrecision
	keyInfo.FieldID = t.KeyFieldID
	keyInfo.RepetitionType = parquet.Repetitions.Required

	return keyInfo
}
// CopyForValue returns a shallow copy of t in which the plain settings (type,
// length, scale, precision, field id, repetition, converted and logical type)
// are replaced by their Value* counterparts. Every other field, including
// Name and the logical-field maps, is carried over unchanged (the maps are
// shared with t).
func (t *taggedInfo) CopyForValue() (ret taggedInfo) {
	valueInfo := *t

	valueInfo.Type, valueInfo.Converted, valueInfo.LogicalType = t.ValueType, t.ValueConverted, t.ValueLogicalType
	valueInfo.Length, valueInfo.Scale, valueInfo.Precision = t.ValueLength, t.ValueScale, t.ValuePrecision
	valueInfo.FieldID = t.ValueFieldID
	valueInfo.RepetitionType = t.ValueRepetition

	return valueInfo
}
// UpdateLogicalTypes recomputes LogicalType, KeyLogicalType and
// ValueLogicalType from the corresponding *LogicalFields maps.
//
// Each map is expected to hold the logical type name under "type" plus any
// type-specific parameters ("precision", "scale", "unit", "isadjustedutc",
// "bitwidth", "signed"). A map without a "type" entry yields NoLogicalType.
// The type name and the time/timestamp unit are matched case-insensitively.
//
// It panics when the type name is unknown, when a required parameter is
// missing ("unit" for time/timestamp, "bitwidth" for integer), or when a
// numeric/boolean parameter cannot be parsed.
func (t *taggedInfo) UpdateLogicalTypes() {
	// precision and scale are the tag-level defaults for decimals; explicit
	// "precision"/"scale" entries in fields take priority over them.
	processLogicalType := func(fields map[string]string, precision, scale int32) LogicalType {
		typeName, ok := fields["type"]
		if !ok {
			return NoLogicalType{}
		}

		switch strings.ToLower(typeName) {
		case "string":
			return StringLogicalType{}
		case "map":
			return MapLogicalType{}
		case "list":
			return ListLogicalType{}
		case "enum":
			return EnumLogicalType{}
		case "decimal":
			if v, ok := fields["precision"]; ok {
				precision = int32FromType(v)
			}
			if v, ok := fields["scale"]; ok {
				scale = int32FromType(v)
			}
			return NewDecimalLogicalType(precision, scale)
		case "date":
			return DateLogicalType{}
		case "time":
			unit, ok := fields["unit"]
			if !ok {
				panic("must specify unit for time logical type")
			}
			adjustedToUtc, ok := fields["isadjustedutc"]
			if !ok {
				adjustedToUtc = "true"
			}
			return NewTimeLogicalType(boolFromStr(adjustedToUtc), timeUnitFromString(strings.ToLower(unit)))
		case "timestamp":
			unit, ok := fields["unit"]
			if !ok {
				panic("must specify unit for timestamp logical type")
			}
			adjustedToUtc, ok := fields["isadjustedutc"]
			if !ok {
				adjustedToUtc = "true"
			}
			// Lower-case the unit exactly as the "time" case does so that
			// both logical types accept the same spellings.
			return NewTimestampLogicalType(boolFromStr(adjustedToUtc), timeUnitFromString(strings.ToLower(unit)))
		case "integer":
			width, ok := fields["bitwidth"]
			if !ok {
				panic("must specify bitwidth if explicitly setting integer logical type")
			}
			signed, ok := fields["signed"]
			if !ok {
				signed = "true"
			}
			return NewIntLogicalType(int8(int32FromType(width)), boolFromStr(signed))
		case "null":
			return NullLogicalType{}
		case "json":
			return JSONLogicalType{}
		case "bson":
			return BSONLogicalType{}
		case "uuid":
			return UUIDLogicalType{}
		case "float16":
			return Float16LogicalType{}
		default:
			panic(fmt.Errorf("invalid logical type specified: %s", typeName))
		}
	}

	t.LogicalType = processLogicalType(t.LogicalFields, t.Precision, t.Scale)
	t.KeyLogicalType = processLogicalType(t.KeyLogicalFields, t.KeyPrecision, t.KeyScale)
	t.ValueLogicalType = processLogicalType(t.ValueLogicalFields, t.ValuePrecision, t.ValueScale)
}
// newTaggedInfo returns a taggedInfo holding the "nothing specified"
// defaults: undefined physical types and repetitions, NA converted types,
// field ids of -1, empty (non-nil) logical-field maps and NoLogicalType for
// every logical type. All remaining fields keep their zero value.
func newTaggedInfo() taggedInfo {
	var info taggedInfo

	info.Type, info.KeyType, info.ValueType =
		parquet.Types.Undefined, parquet.Types.Undefined, parquet.Types.Undefined
	info.RepetitionType, info.ValueRepetition =
		parquet.Repetitions.Undefined, parquet.Repetitions.Undefined
	info.Converted, info.KeyConverted, info.ValueConverted =
		ConvertedTypes.NA, ConvertedTypes.NA, ConvertedTypes.NA
	info.FieldID, info.KeyFieldID, info.ValueFieldID = -1, -1, -1

	info.LogicalFields = map[string]string{}
	info.KeyLogicalFields = map[string]string{}
	info.ValueLogicalFields = map[string]string{}

	info.LogicalType = NoLogicalType{}
	info.KeyLogicalType = NoLogicalType{}
	info.ValueLogicalType = NoLogicalType{}

	return info
}
// int32FromType parses v as a base-10 integer that must fit in an int32.
//
// It panics with the strconv error when v is not a valid integer or lies
// outside the int32 range, instead of silently wrapping around.
var int32FromType = func(v string) int32 {
	// bitSize 32 makes ParseInt report out-of-range input as an error.
	val, err := strconv.ParseInt(v, 10, 32)
	if err != nil {
		panic(err)
	}
	return int32(val)
}
// boolFromStr parses v with strconv.ParseBool and panics with the resulting
// error when v is not a recognised boolean spelling.
var boolFromStr = func(v string) bool {
	parsed, parseErr := strconv.ParseBool(v)
	if parseErr == nil {
		return parsed
	}
	panic(parseErr)
}
// infoFromTags parses the `parquet` entry of the struct tag f.
//
// It returns nil when the tag has no `parquet` entry. A tag of exactly "-"
// yields an info with Exclude set and nothing else parsed. Otherwise the tag
// is treated as a comma-separated list of key=value settings: tabs are
// removed, keys are lower-cased, and surrounding whitespace is trimmed from
// keys and values. Keys that are not recognised are ignored. After all
// settings are applied the logical types are resolved via UpdateLogicalTypes.
//
// It panics when a setting is not of the form key=value, or when a value
// cannot be converted to the type its key requires.
func infoFromTags(f reflect.StructTag) *taggedInfo {
	typeFromStr := func(v string) parquet.Type {
		t, err := format.TypeFromString(strings.ToUpper(v))
		if err != nil {
			panic(fmt.Errorf("invalid type specified: %s", v))
		}
		return parquet.Type(t)
	}

	repFromStr := func(v string) parquet.Repetition {
		r, err := format.FieldRepetitionTypeFromString(strings.ToUpper(v))
		if err != nil {
			panic(err)
		}
		return parquet.Repetition(r)
	}

	convertedFromStr := func(v string) ConvertedType {
		c, err := format.ConvertedTypeFromString(strings.ToUpper(v))
		if err != nil {
			panic(err)
		}
		return ConvertedType(c)
	}

	ptags, ok := f.Lookup("parquet")
	if !ok {
		return nil
	}

	info := newTaggedInfo()
	if ptags == "-" {
		info.Exclude = true
		return &info
	}

	for _, tag := range strings.Split(strings.ReplaceAll(ptags, "\t", ""), ",") {
		tag = strings.TrimSpace(tag)
		rawKey, rawValue, found := strings.Cut(tag, "=")
		if !found {
			// Fail with a message naming the offending setting rather than
			// an index-out-of-range runtime error.
			panic(fmt.Errorf("invalid parquet tag setting %q: expected key=value", tag))
		}
		key := strings.TrimSpace(strings.ToLower(rawKey))
		value := strings.TrimSpace(rawValue)

		switch key {
		case "name":
			info.Name = value
		case "type":
			info.Type = typeFromStr(value)
		case "keytype":
			info.KeyType = typeFromStr(value)
		case "valuetype":
			info.ValueType = typeFromStr(value)
		case "length":
			info.Length = int32FromType(value)
		case "keylength":
			info.KeyLength = int32FromType(value)
		case "valuelength":
			info.ValueLength = int32FromType(value)
		case "scale":
			info.Scale = int32FromType(value)
		case "keyscale":
			info.KeyScale = int32FromType(value)
		case "valuescale":
			info.ValueScale = int32FromType(value)
		case "precision":
			info.Precision = int32FromType(value)
		case "keyprecision":
			info.KeyPrecision = int32FromType(value)
		case "valueprecision":
			info.ValuePrecision = int32FromType(value)
		case "fieldid":
			info.FieldID = int32FromType(value)
		case "keyfieldid":
			info.KeyFieldID = int32FromType(value)
		case "valuefieldid":
			info.ValueFieldID = int32FromType(value)
		case "repetition":
			info.RepetitionType = repFromStr(value)
		case "valuerepetition":
			info.ValueRepetition = repFromStr(value)
		case "converted":
			info.Converted = convertedFromStr(value)
		case "keyconverted":
			info.KeyConverted = convertedFromStr(value)
		case "valueconverted":
			info.ValueConverted = convertedFromStr(value)
		case "logical":
			info.LogicalFields["type"] = value
		case "keylogical":
			info.KeyLogicalFields["type"] = value
		case "valuelogical":
			info.ValueLogicalFields["type"] = value
		default:
			// "logical.<param>" style settings are stored under <param>;
			// anything else is silently ignored.
			switch {
			case strings.HasPrefix(key, "logical."):
				info.LogicalFields[strings.TrimPrefix(key, "logical.")] = value
			case strings.HasPrefix(key, "keylogical."):
				info.KeyLogicalFields[strings.TrimPrefix(key, "keylogical.")] = value
			case strings.HasPrefix(key, "valuelogical."):
				info.ValueLogicalFields[strings.TrimPrefix(key, "valuelogical.")] = value
			}
		}
	}

	info.UpdateLogicalTypes()
	return &info
}
// typeToNode builds the schema Node describing the Go type typ.
//
// name and repType are the defaults for the node; a non-nil info overrides
// them (when info.Name / info.RepetitionType are set) and also supplies the
// field id, physical type, converted/logical type, length, precision and
// scale. Pointers become Optional nodes, structs become group nodes built
// from their fields (honouring each field's `parquet` tag), maps and
// non-byte slices become map/list groups, and the remaining supported kinds
// become primitive nodes.
//
// It panics when the tag settings are inconsistent with the Go type, when a
// node constructor fails (via Must/MustPrimitive), and when typ is of a kind
// that has no schema mapping.
func typeToNode(name string, typ reflect.Type, repType parquet.Repetition, info *taggedInfo) Node {
	var (
		converted             = ConvertedTypes.None
		logical   LogicalType = NoLogicalType{}
		fieldID               = int32(-1)
		physical              = parquet.Types.Undefined
		typeLen               = 0
		precision             = 0
		scale                 = 0
	)

	if info != nil {
		fieldID = info.FieldID
		// NA means "not specified in the tag"; keep the None default then.
		if info.Converted != ConvertedTypes.NA {
			converted = info.Converted
		}
		logical = info.LogicalType
		physical = info.Type
		typeLen = int(info.Length)
		precision = int(info.Precision)
		scale = int(info.Scale)
		if info.Name != "" {
			name = info.Name
		}
		if info.RepetitionType != parquet.Repetitions.Undefined {
			repType = info.RepetitionType
		}
	}

	switch typ.Kind() {
	case reflect.Map:
		if !logical.IsNone() && !logical.Equals(MapLogicalType{}) {
			panic("cannot set logical type to something other than map for a map")
		}

		// NOTE(review): CopyForValue/CopyForKey carry info.Name over, so a
		// map field tagged with name=... produces key/value nodes with that
		// name instead of "key"/"value" — confirm MapOf accepts this.
		infoCopy := newTaggedInfo()
		if info != nil {
			infoCopy = info.CopyForValue()
		}
		value := typeToNode("value", typ.Elem(), parquet.Repetitions.Required, &infoCopy)
		if info != nil {
			infoCopy = info.CopyForKey()
		}
		key := typeToNode("key", typ.Key(), parquet.Repetitions.Required, &infoCopy)
		if key.RepetitionType() != parquet.Repetitions.Required {
			panic("key type of map must be Required")
		}
		return Must(MapOf(name, key, value, repType, fieldID))
	case reflect.Struct:
		// float16.Num is a struct in Go but is stored as a 2-byte primitive.
		if typ == reflect.TypeOf(float16.Num{}) {
			return MustPrimitive(NewPrimitiveNodeLogical(name, repType, Float16LogicalType{}, parquet.Types.FixedLenByteArray, 2, fieldID))
		}

		fields := make(FieldList, 0)
		for i := 0; i < typ.NumField(); i++ {
			f := typ.Field(i)
			tags := infoFromTags(f.Tag)
			if tags == nil || !tags.Exclude {
				fields = append(fields, typeToNode(f.Name, f.Type, parquet.Repetitions.Required, tags))
			}
		}

		if physical != parquet.Types.Undefined {
			panic("cannot specify custom type on struct")
		}
		if converted != ConvertedTypes.None {
			panic("cannot specify converted types for a struct")
		}
		if !logical.IsNone() {
			panic("cannot specify logicaltype for a struct")
		}
		return Must(NewGroupNode(name, repType, fields, fieldID))
	case reflect.Ptr:
		return typeToNode(name, typ.Elem(), parquet.Repetitions.Optional, info)
	case reflect.Array:
		if typ == reflect.TypeOf(parquet.Int96{}) {
			return NewInt96Node(name, repType, fieldID)
		}

		if typ.Elem() == reflect.TypeOf(byte(0)) {
			// Byte arrays default to a fixed-length byte array whose length
			// is the Go array length unless the tag says otherwise.
			if physical == parquet.Types.Undefined {
				physical = parquet.Types.FixedLenByteArray
			}
			if typeLen == 0 {
				typeLen = typ.Len()
			}
			if !logical.IsNone() {
				return MustPrimitive(NewPrimitiveNodeLogical(name, repType, logical, physical, typeLen, fieldID))
			}
			return MustPrimitive(NewPrimitiveNodeConverted(name, repType, physical, converted, typeLen, precision, scale, fieldID))
		}
		// Arrays of anything else are handled exactly like slices.
		fallthrough
	case reflect.Slice:
		switch {
		case repType == parquet.Repetitions.Repeated:
			// An explicitly repeated field maps to a repeated element node
			// rather than a list group.
			return typeToNode(name, typ.Elem(), parquet.Repetitions.Repeated, info)
		case physical == parquet.Types.FixedLenByteArray || physical == parquet.Types.ByteArray:
			if typ.Elem() != reflect.TypeOf(byte(0)) {
				panic("slice with physical type ByteArray or FixedLenByteArray must be []byte")
			}
			fallthrough
		case typ.Elem() == reflect.TypeOf(byte(0)):
			if physical == parquet.Types.Undefined {
				physical = parquet.Types.ByteArray
			}
			if !logical.IsNone() {
				return MustPrimitive(NewPrimitiveNodeLogical(name, repType, logical, physical, typeLen, fieldID))
			}
			return MustPrimitive(NewPrimitiveNodeConverted(name, repType, physical, converted, typeLen, precision, scale, fieldID))
		default:
			var elemInfo *taggedInfo
			if info != nil {
				elemInfo = &taggedInfo{}
				*elemInfo = info.CopyForValue()
			}

			if !logical.IsNone() && !logical.Equals(ListLogicalType{}) {
				panic("slice must either be repeated or a List type")
			}
			if converted != ConvertedTypes.None && converted != ConvertedTypes.List {
				panic("slice must either be repeated or a List type")
			}
			return Must(ListOf(typeToNode(name, typ.Elem(), parquet.Repetitions.Required, elemInfo), repType, fieldID))
		}
	case reflect.String:
		t := parquet.Types.ByteArray
		switch physical {
		case parquet.Types.Undefined, parquet.Types.ByteArray:
		case parquet.Types.FixedLenByteArray:
			t = parquet.Types.FixedLenByteArray
		default:
			panic("string fields should be of type bytearray or fixedlenbytearray only")
		}

		if !logical.IsNone() {
			return MustPrimitive(NewPrimitiveNodeLogical(name, repType, logical, t, typeLen, fieldID))
		}
		return MustPrimitive(NewPrimitiveNodeConverted(name, repType, t, converted, typeLen, precision, scale, fieldID))
	case reflect.Int, reflect.Int32, reflect.Int8, reflect.Int16, reflect.Int64:
		// 64-bit Go integers use Int64, everything narrower uses Int32,
		// unless the tag names a physical type explicitly.
		ptyp := parquet.Types.Int32
		if typ.Bits() == 64 {
			ptyp = parquet.Types.Int64
		}
		if physical != parquet.Types.Undefined {
			ptyp = physical
		}

		if !logical.IsNone() {
			return MustPrimitive(NewPrimitiveNodeLogical(name, repType, logical, ptyp, typeLen, fieldID))
		}

		bitwidth := int8(typ.Bits())
		if physical != parquet.Types.Undefined {
			// An explicit physical type dictates the annotated width.
			switch ptyp {
			case parquet.Types.Int32:
				bitwidth = 32
			case parquet.Types.Int64:
				bitwidth = 64
			}
		}

		if converted != ConvertedTypes.None {
			return MustPrimitive(NewPrimitiveNodeConverted(name, repType, ptyp, converted, 0, precision, scale, fieldID))
		}
		return MustPrimitive(NewPrimitiveNodeLogical(name, repType, NewIntLogicalType(bitwidth, true), ptyp, 0, fieldID))
	case reflect.Uint, reflect.Uint32, reflect.Uint8, reflect.Uint16, reflect.Uint64:
		// Same rules as the signed case, but annotated as unsigned.
		ptyp := parquet.Types.Int32
		if typ.Bits() == 64 {
			ptyp = parquet.Types.Int64
		}
		if physical != parquet.Types.Undefined {
			ptyp = physical
		}

		if !logical.IsNone() {
			return MustPrimitive(NewPrimitiveNodeLogical(name, repType, logical, ptyp, typeLen, fieldID))
		}

		bitwidth := int8(typ.Bits())
		if physical != parquet.Types.Undefined {
			switch ptyp {
			case parquet.Types.Int32:
				bitwidth = 32
			case parquet.Types.Int64:
				bitwidth = 64
			}
		}

		if converted != ConvertedTypes.None {
			return MustPrimitive(NewPrimitiveNodeConverted(name, repType, ptyp, converted, 0, precision, scale, fieldID))
		}
		return MustPrimitive(NewPrimitiveNodeLogical(name, repType, NewIntLogicalType(bitwidth, false), ptyp, 0, fieldID))
	case reflect.Bool:
		if !logical.IsNone() {
			return MustPrimitive(NewPrimitiveNodeLogical(name, repType, logical, parquet.Types.Boolean, typeLen, fieldID))
		}
		return MustPrimitive(NewPrimitiveNodeConverted(name, repType, parquet.Types.Boolean, converted, typeLen, precision, scale, fieldID))
	case reflect.Float32:
		if !logical.IsNone() {
			return MustPrimitive(NewPrimitiveNodeLogical(name, repType, logical, parquet.Types.Float, typeLen, fieldID))
		}
		return MustPrimitive(NewPrimitiveNodeConverted(name, repType, parquet.Types.Float, converted, typeLen, precision, scale, fieldID))
	case reflect.Float64:
		if !logical.IsNone() {
			return MustPrimitive(NewPrimitiveNodeLogical(name, repType, logical, parquet.Types.Double, typeLen, fieldID))
		}
		return MustPrimitive(NewPrimitiveNodeConverted(name, repType, parquet.Types.Double, converted, typeLen, precision, scale, fieldID))
	}

	// Every supported kind returned above. Fail loudly here instead of
	// handing a nil Node back to callers that would dereference it.
	panic(fmt.Errorf("unsupported type %s (kind %s) for parquet schema field %q", typ, typ.Kind(), name))
}
// NewSchemaFromStruct builds a Schema from the Go type of obj, which should
// be a struct or a pointer to a struct. Field `parquet` tags are honoured as
// described by infoFromTags. The root node is named after the struct type and
// is created with Repeated repetition.
//
// It returns an error, and a nil schema, when obj is nil, when its type does
// not map to a group node, or when schema construction panics.
func NewSchemaFromStruct(obj interface{}) (sc *Schema, err error) {
	// Install the recovery first so that every panic raised while inspecting
	// the type is turned into an error.
	defer func() {
		if r := recover(); r != nil {
			sc = nil
			err = utils.FormatRecoveredError("unknown panic", r)
		}
	}()

	ot := reflect.TypeOf(obj)
	if ot == nil {
		// reflect.TypeOf returns nil for a nil interface value.
		return nil, fmt.Errorf("parquet: cannot create schema from nil value")
	}
	if ot.Kind() == reflect.Ptr {
		ot = ot.Elem()
	}

	root, ok := typeToNode(ot.Name(), ot, parquet.Repetitions.Repeated, nil).(*GroupNode)
	if !ok {
		return nil, fmt.Errorf("parquet: cannot create schema from type %s: root is not a group node", ot)
	}
	return NewSchema(root), nil
}
// parquetTypeToReflect maps each parquet physical type to the Go type used
// to represent it when a struct type is generated from a schema.
var parquetTypeToReflect = map[parquet.Type]reflect.Type{
	// Types with a native Go counterpart.
	parquet.Types.Boolean: reflect.TypeOf(false),
	parquet.Types.Int32:   reflect.TypeOf(int32(0)),
	parquet.Types.Int64:   reflect.TypeOf(int64(0)),
	parquet.Types.Float:   reflect.TypeOf(float32(0)),
	parquet.Types.Double:  reflect.TypeOf(float64(0)),

	// Types represented by the parquet package's own value types.
	parquet.Types.Int96:             reflect.TypeOf(parquet.Int96{}),
	parquet.Types.ByteArray:         reflect.TypeOf(parquet.ByteArray{}),
	parquet.Types.FixedLenByteArray: reflect.TypeOf(parquet.FixedLenByteArray{}),
}
// typeFromNode derives a Go reflect.Type from the schema node n.
//
// Primitive nodes map through parquetTypeToReflect, except that nodes
// annotated as strings (String logical type or UTF8 converted type) become
// Go strings. Group nodes become slices for List, maps for Map/MapKeyValue
// and generated struct types otherwise. Optional nodes are wrapped in a
// pointer; repeated primitives and repeated plain groups are wrapped in a
// slice.
//
// It panics on malformed list/map groups and on nodes that are neither
// primitive nor group.
func typeFromNode(n Node) reflect.Type {
	switch n.Type() {
	case Primitive:
		goType := parquetTypeToReflect[n.(*PrimitiveNode).PhysicalType()]
		isString := n.LogicalType().Equals(StringLogicalType{}) || n.ConvertedType() == ConvertedTypes.UTF8
		if isString {
			goType = reflect.TypeOf("")
		}

		switch n.RepetitionType() {
		case parquet.Repetitions.Optional:
			return reflect.PointerTo(goType)
		case parquet.Repetitions.Repeated:
			return reflect.SliceOf(goType)
		}
		return goType
	case Group:
		group := n.(*GroupNode)
		switch group.ConvertedType() {
		case ConvertedTypes.List:
			if group.fields.Len() != 1 {
				panic("invalid list node, should have exactly 1 child.")
			}
			repeated := group.fields[0]
			if repeated.RepetitionType() != parquet.Repetitions.Repeated {
				panic("invalid list node, child should be repeated")
			}

			// The repeated child is itself the element when it is a
			// primitive, a group with several fields, or a single-field
			// group named "array" / "<list>_tuple". Otherwise it is only a
			// wrapper around the real element.
			childIsElement := repeated.Type() == Primitive
			if !childIsElement {
				numFields := repeated.(*GroupNode).fields.Len()
				childIsElement = numFields > 1 ||
					(numFields == 1 && (repeated.Name() == "array" || repeated.Name() == group.Name()+"_tuple"))
			}

			var listType reflect.Type
			if childIsElement {
				listType = typeFromNode(repeated)
				// Such an element must be required, so drop any pointer.
				if listType.Kind() == reflect.Ptr {
					listType = listType.Elem()
				}
			} else {
				listType = reflect.SliceOf(typeFromNode(repeated.(*GroupNode).fields[0]))
			}

			if group.RepetitionType() == parquet.Repetitions.Optional {
				return reflect.PointerTo(listType)
			}
			return listType
		case ConvertedTypes.Map, ConvertedTypes.MapKeyValue:
			if group.fields.Len() != 1 {
				panic("invalid map node, should have exactly 1 child")
			}
			if group.fields[0].Type() != Group {
				panic("invalid map node, child should be a group node")
			}
			entries := group.fields[0].(*GroupNode)

			// Fall back to the first field when none is named "key".
			keyPos := entries.FieldIndexByName("key")
			if keyPos == -1 {
				keyPos = 0
			}
			keyType := typeFromNode(entries.fields[keyPos])
			if keyType.Kind() == reflect.Ptr {
				keyType = keyType.Elem()
			}
			// Byte-array keys are represented as Go strings.
			switch keyType {
			case reflect.TypeOf(parquet.ByteArray{}), reflect.TypeOf(parquet.FixedLenByteArray{}):
				keyType = reflect.TypeOf("")
			}

			// A key-only map gets bool values.
			valueType := reflect.TypeOf(true)
			if entries.fields.Len() > 1 {
				// Fall back to the second field when none is named "value".
				valuePos := entries.FieldIndexByName("value")
				if valuePos == -1 {
					valuePos = 1
				}
				valueType = typeFromNode(entries.fields[valuePos])
			}

			result := reflect.MapOf(keyType, valueType)
			if group.RepetitionType() == parquet.Repetitions.Optional {
				result = reflect.PointerTo(result)
			}
			return result
		default:
			members := make([]reflect.StructField, 0, len(group.fields))
			for _, child := range group.fields {
				member := reflect.StructField{
					Name:    child.Name(),
					Type:    typeFromNode(child),
					PkgPath: "parquet",
				}
				members = append(members, member)
			}

			generated := reflect.StructOf(members)
			switch group.RepetitionType() {
			case parquet.Repetitions.Repeated:
				return reflect.SliceOf(generated)
			case parquet.Repetitions.Optional:
				return reflect.PointerTo(generated)
			}
			return generated
		}
	}
	panic("what happened?")
}
// NewStructFromSchema generates a Go type for the root of sc using
// typeFromNode. When the root maps to a slice or pointer type, the element
// type is returned instead. Any panic raised during generation is recovered
// and reported as an error together with a nil type.
func NewStructFromSchema(sc *Schema) (t reflect.Type, err error) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		t = nil
		err = utils.FormatRecoveredError("unknown panic", r)
	}()

	t = typeFromNode(sc.root)
	switch t.Kind() {
	case reflect.Slice, reflect.Ptr:
		// Unwrap the container that the root's repetition added.
		t = t.Elem()
	}
	return t, nil
}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.