package schema
import (
"fmt"
"github.com/apache/arrow-go/v18/parquet"
format "github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet"
"github.com/apache/thrift/lib/go/thrift"
"golang.org/x/xerrors"
)
// NodeType distinguishes the two kinds of schema node: leaf (primitive)
// nodes and group nodes that contain other nodes.
type NodeType int
const (
// Primitive marks a leaf node; it is the zero value of NodeType.
Primitive NodeType = iota
// Group marks a node that holds a list of child fields.
Group
)
// Node is the common interface of schema nodes. Because it contains the
// unexported toThrift method, it can only be implemented by types declared
// in this package.
type Node interface {
// Name returns the node's name.
Name () string
// Type reports whether the node is Primitive or Group.
Type () NodeType
// RepetitionType returns the repetition of this node.
RepetitionType () parquet .Repetition
// ConvertedType returns the converted-type annotation of this node.
ConvertedType () ConvertedType
// LogicalType returns the logical-type annotation of this node.
LogicalType () LogicalType
// FieldID returns the field id; negative ids are left unset when the
// node is converted to its thrift form.
FieldID () int32
// Parent returns the parent node, or nil when the node has none.
Parent () Node
// SetParent replaces the node's parent.
SetParent (Node )
// Path returns the string form of the node's column path.
Path () string
// Equals reports whether the given node is equal to this one.
Equals (Node ) bool
// Visit walks this node with the provided Visitor.
Visit (v Visitor )
// toThrift converts the node to its thrift SchemaElement form.
toThrift() *format .SchemaElement
}
// Visitor is the callback interface used by Node.Visit.
type Visitor interface {
// VisitPre is called before a node's children are visited. GroupNode
// only descends into its fields when this returns true; PrimitiveNode
// ignores the result.
VisitPre (Node ) bool
// VisitPost is called after the node (and, for groups, any visited
// children) has been processed. It is called regardless of what
// VisitPre returned.
VisitPost (Node )
}
// ColumnPathFromNode builds the column path for n by walking up the Parent
// chain. The path lists node names from the outermost ancestor that itself
// has a parent down to n; the top-most node (the one whose Parent is nil) is
// not included.
//
// A nil n yields a nil path. A node without a parent yields an empty,
// non-nil path.
func ColumnPathFromNode(n Node) parquet.ColumnPath {
	if n == nil {
		return nil
	}

	// First pass: count how many nodes on the chain have a parent, so the
	// path can be allocated at its exact final size.
	depth := 0
	for cur := n; cur.Parent() != nil; cur = cur.Parent() {
		depth++
	}

	// Second pass: fill from the back so the result is already ordered from
	// outermost to innermost and needs no reversal.
	path := make(parquet.ColumnPath, depth)
	cur := n
	for i := depth - 1; i >= 0; i-- {
		path[i] = cur.Name()
		cur = cur.Parent()
	}
	return path
}
// node holds the state shared by PrimitiveNode and GroupNode, both of which
// embed it.
type node struct {
// typ is Primitive or Group.
typ NodeType
// parent is the enclosing node, nil when there is none.
parent Node
name string
repetition parquet .Repetition
// fieldID is only written to thrift when it is non-negative.
fieldID int32
logicalType LogicalType
convertedType ConvertedType
// colPath caches the result of ColumnPathFromNode; it is filled in lazily
// by columnPath on first use.
colPath parquet .ColumnPath
}
// toThrift on the bare node returns nil; PrimitiveNode and GroupNode define
// their own toThrift that produces a real SchemaElement.
func (n *node ) toThrift () *format .SchemaElement { return nil }
// Name returns the name of the node.
func (n *node ) Name () string { return n .name }
// Type returns the node kind (Primitive or Group).
func (n *node ) Type () NodeType { return n .typ }
// RepetitionType returns the repetition stored on the node.
func (n *node ) RepetitionType () parquet .Repetition { return n .repetition }
// ConvertedType returns the converted type stored on the node.
func (n *node ) ConvertedType () ConvertedType { return n .convertedType }
// LogicalType returns the logical type stored on the node.
func (n *node ) LogicalType () LogicalType { return n .logicalType }
// FieldID returns the field id stored on the node.
func (n *node ) FieldID () int32 { return n .fieldID }
// Parent returns the parent of this node, or nil if none has been set.
func (n *node ) Parent () Node { return n .parent }
// SetParent sets the parent of this node.
//
// The lazily cached column path is discarded as well: a path computed while
// the node had a different parent (or none) would otherwise keep being
// returned by Path. Only this node's cache is reset; paths already cached by
// its descendants are not touched here.
func (n *node) SetParent(p Node) {
	n.parent = p
	n.colPath = nil
}
// Path returns the string rendering of this node's column path. The path is
// obtained through columnPath, which caches it on the node.
func (n *node) Path() string {
	path := n.columnPath()
	return path.String()
}
// columnPath returns the column path of the node, computing it with
// ColumnPathFromNode on the first call and reusing the stored value on
// subsequent calls.
func (n *node) columnPath() parquet.ColumnPath {
	if n.colPath != nil {
		return n.colPath
	}
	n.colPath = ColumnPathFromNode(n)
	return n.colPath
}
// Equals reports whether rhs matches this node on the attributes shared by
// all node kinds: node type, name, repetition, converted type, field id and
// logical type. It does not look at the parent.
//
// A nil rhs is never equal (previously it caused a nil-interface panic).
func (n *node) Equals(rhs Node) bool {
	if rhs == nil {
		return false
	}
	return n.typ == rhs.Type() &&
		n.Name() == rhs.Name() &&
		n.RepetitionType() == rhs.RepetitionType() &&
		n.ConvertedType() == rhs.ConvertedType() &&
		n.FieldID() == rhs.FieldID() &&
		n.LogicalType().Equals(rhs.LogicalType())
}
// Visit on the bare node does nothing; PrimitiveNode and GroupNode provide
// their own Visit implementations.
func (n *node ) Visit (v Visitor ) {}
// PrimitiveNode is a leaf schema node: it embeds the shared node state and
// adds a physical type, a type length and decimal metadata.
type PrimitiveNode struct {
node
// ColumnOrder is the column order associated with this node.
ColumnOrder parquet .ColumnOrder
physicalType parquet .Type
// typeLen is the type length; it is written to thrift only for
// FixedLenByteArray nodes.
typeLen int
// decimalMetaData holds precision and scale; they are written to thrift
// only when IsSet is true.
decimalMetaData DecimalMetadata
}
// NewPrimitiveNodeLogical creates a primitive node annotated with the given
// logical type; the converted type and decimal metadata are derived from it.
//
// A nil logicalType is treated as NoLogicalType{}. An error is returned when
// the logical type is nested, when it is not applicable to physicalType and
// typeLen, when it is not compatible with the converted type derived from
// it, or when physicalType is FixedLenByteArray and typeLen is not positive.
func NewPrimitiveNodeLogical(name string, repetition parquet.Repetition, logicalType LogicalType, physicalType parquet.Type, typeLen int, id int32) (*PrimitiveNode, error) {
	prim := &PrimitiveNode{
		node:         node{typ: Primitive, name: name, repetition: repetition, logicalType: logicalType, fieldID: id},
		physicalType: physicalType,
		typeLen:      typeLen,
	}

	// Reject unusable annotations up front; a missing annotation falls back
	// to the explicit "no logical type" value.
	switch {
	case logicalType == nil:
		prim.logicalType = NoLogicalType{}
	case logicalType.IsNested():
		return nil, fmt.Errorf("nested logical type %s cannot be applied to a non-group node", logicalType)
	case !logicalType.IsApplicable(physicalType, int32(typeLen)):
		return nil, fmt.Errorf("%s cannot be applied to primitive type %s", logicalType, physicalType)
	}
	prim.convertedType, prim.decimalMetaData = prim.logicalType.ToConvertedType()

	if prim.logicalType == nil || prim.logicalType.IsNested() ||
		!prim.logicalType.IsCompatible(prim.convertedType, prim.decimalMetaData) {
		return nil, fmt.Errorf("invalid logical type %s", prim.logicalType)
	}

	if prim.physicalType == parquet.Types.FixedLenByteArray && prim.typeLen <= 0 {
		return nil, xerrors.New("invalid fixed length byte array length")
	}
	return prim, nil
}
// NewPrimitiveNodeConverted creates a primitive node annotated with a
// converted type; the logical type is derived from the converted type and,
// for DECIMAL, from precision and scale.
//
// The converted type must be valid for the physical type typ:
//   - UTF8, JSON, BSON and ENUM require ByteArray
//   - DECIMAL requires Int32, Int64, ByteArray or FixedLenByteArray, a
//     positive precision and a scale in [0, precision]
//   - DATE, TIME_MILLIS and the (U)INT8/16/32 types require Int32
//   - TIME_MICROS, TIMESTAMP_* and (U)INT64 require Int64
//   - INTERVAL requires FixedLenByteArray with typeLen 12
//
// typeLen is only stored for FixedLenByteArray nodes, where it must be
// positive; for every other physical type the stored length is -1.
// precision and scale are only used for DECIMAL. A nil node is returned with
// every error.
func NewPrimitiveNodeConverted(name string, repetition parquet.Repetition, typ parquet.Type, converted ConvertedType, typeLen, precision, scale int, id int32) (*PrimitiveNode, error) {
	n := &PrimitiveNode{
		node:         node{typ: Primitive, name: name, repetition: repetition, convertedType: converted, fieldID: id},
		physicalType: typ,
		typeLen:      -1,
	}

	switch converted {
	case ConvertedTypes.None:
	case ConvertedTypes.UTF8, ConvertedTypes.JSON, ConvertedTypes.BSON:
		if typ != parquet.Types.ByteArray {
			// Report the annotation being rejected (not the physical type)
			// and the physical type it actually requires.
			return nil, fmt.Errorf("parquet: %s can only annotate BYTE_ARRAY fields", converted)
		}
	case ConvertedTypes.Decimal:
		switch typ {
		case parquet.Types.Int32, parquet.Types.Int64, parquet.Types.ByteArray, parquet.Types.FixedLenByteArray:
		default:
			return nil, xerrors.New("parquet: DECIMAL can only annotate INT32, INT64, BYTE_ARRAY and FIXED")
		}

		// Only the lower bound of precision is enforced here.
		switch {
		case precision <= 0:
			return nil, fmt.Errorf("parquet: invalid decimal precision: %d, must be between 1 and 38 inclusive", precision)
		case scale < 0:
			return nil, fmt.Errorf("parquet: invalid decimal scale: %d, must be a number between 0 and precision inclusive", scale)
		case scale > precision:
			return nil, fmt.Errorf("parquet: invalid decimal scale %d, cannot be greater than precision: %d", scale, precision)
		}
		n.decimalMetaData.IsSet = true
		n.decimalMetaData.Precision = int32(precision)
		n.decimalMetaData.Scale = int32(scale)
	case ConvertedTypes.Date,
		ConvertedTypes.TimeMillis,
		ConvertedTypes.Int8,
		ConvertedTypes.Int16,
		ConvertedTypes.Int32,
		ConvertedTypes.Uint8,
		ConvertedTypes.Uint16,
		ConvertedTypes.Uint32:
		if typ != parquet.Types.Int32 {
			return nil, fmt.Errorf("parquet: %s can only annotate INT32", converted)
		}
	case ConvertedTypes.TimeMicros,
		ConvertedTypes.TimestampMicros,
		ConvertedTypes.TimestampMillis,
		ConvertedTypes.Int64,
		ConvertedTypes.Uint64:
		if typ != parquet.Types.Int64 {
			return nil, fmt.Errorf("parquet: %s can only annotate INT64", converted)
		}
	case ConvertedTypes.Interval:
		if typ != parquet.Types.FixedLenByteArray || typeLen != 12 {
			return nil, xerrors.New("parquet: INTERVAL can only annotate FIXED_LEN_BYTE_ARRAY(12)")
		}
	case ConvertedTypes.Enum:
		if typ != parquet.Types.ByteArray {
			return nil, xerrors.New("parquet: ENUM can only annotate BYTE_ARRAY fields")
		}
	case ConvertedTypes.NA:
	default:
		return nil, fmt.Errorf("parquet: %s cannot be applied to a primitive type", converted.String())
	}

	n.logicalType = n.convertedType.ToLogicalType(n.decimalMetaData)
	if !(n.logicalType != nil && !n.logicalType.IsNested() && n.logicalType.IsCompatible(n.convertedType, n.decimalMetaData)) {
		// %v rather than %s: the logical type may be nil on this path and
		// %v renders that as "<nil>" instead of a formatting-error marker.
		return nil, fmt.Errorf("invalid logical type %v", n.logicalType)
	}

	if n.physicalType == parquet.Types.FixedLenByteArray {
		if typeLen <= 0 {
			return nil, xerrors.New("invalid fixed len byte array length")
		}
		n.typeLen = typeLen
	}
	return n, nil
}
// PrimitiveNodeFromThrift builds a PrimitiveNode from a thrift SchemaElement.
//
// If the element has a logical type it is used; otherwise a converted type,
// if set, is used together with the element's precision and scale; otherwise
// the node is created with NoLogicalType{}. The field id is -1 when the
// element does not set one.
func PrimitiveNodeFromThrift(elem *format.SchemaElement) (*PrimitiveNode, error) {
	fieldID := int32(-1)
	if elem.IsSetFieldID() {
		fieldID = elem.GetFieldID()
	}

	// Attributes needed by every construction path.
	var (
		name     = elem.GetName()
		rep      = parquet.Repetition(elem.GetRepetitionType())
		physical = parquet.Type(elem.GetType())
		typeLen  = int(elem.GetTypeLength())
	)

	switch {
	case elem.IsSetLogicalType():
		return NewPrimitiveNodeLogical(name, rep, getLogicalType(elem.GetLogicalType()), physical, typeLen, fieldID)
	case elem.IsSetConvertedType():
		return NewPrimitiveNodeConverted(name, rep, physical, ConvertedType(elem.GetConvertedType()),
			typeLen, int(elem.GetPrecision()), int(elem.GetScale()), fieldID)
	default:
		return NewPrimitiveNodeLogical(name, rep, NoLogicalType{}, physical, typeLen, fieldID)
	}
}
// NewPrimitiveNode creates a primitive node without a logical type
// annotation. It delegates to NewPrimitiveNodeLogical with a nil logical
// type, which that constructor replaces with NoLogicalType{}.
//
// Note the parameter order: fieldID comes before typeLength.
func NewPrimitiveNode (name string , repetition parquet .Repetition , typ parquet .Type , fieldID , typeLength int32 ) (*PrimitiveNode , error ) {
return NewPrimitiveNodeLogical (name , repetition , nil , typ , int (typeLength ), fieldID )
}
// Equals reports whether rhs is a *PrimitiveNode equal to p.
//
// On top of the attributes compared by the embedded node, two primitive
// nodes must share the same physical type. Precision and scale are compared
// only when the converted type is DECIMAL, and the type length only when the
// physical type is FixedLenByteArray.
//
// A rhs that is not a *PrimitiveNode is reported as not equal instead of
// triggering a type-assertion panic.
func (p *PrimitiveNode) Equals(rhs Node) bool {
	if !p.node.Equals(rhs) {
		return false
	}

	other, ok := rhs.(*PrimitiveNode)
	if !ok {
		return false
	}
	if p == other {
		return true
	}

	if p.PhysicalType() != other.PhysicalType() {
		return false
	}

	if p.ConvertedType() == ConvertedTypes.Decimal &&
		(p.decimalMetaData.Precision != other.decimalMetaData.Precision ||
			p.decimalMetaData.Scale != other.decimalMetaData.Scale) {
		return false
	}

	if p.PhysicalType() == parquet.Types.FixedLenByteArray && p.TypeLength() != other.TypeLength() {
		return false
	}
	return true
}
// PhysicalType returns the physical type of this primitive node.
func (p *PrimitiveNode ) PhysicalType () parquet .Type { return p .physicalType }
// SetTypeLength updates the stored type length, but only for nodes whose
// physical type is FixedLenByteArray. For any other physical type the call
// is a no-op. The length itself is not validated.
func (p *PrimitiveNode) SetTypeLength(length int) {
	if p.PhysicalType() != parquet.Types.FixedLenByteArray {
		return
	}
	p.typeLen = length
}
// TypeLength returns the stored type length of this node.
func (p *PrimitiveNode ) TypeLength () int { return p .typeLen }
// DecimalMetadata returns a copy of the decimal metadata stored on the node.
func (p *PrimitiveNode ) DecimalMetadata () DecimalMetadata { return p .decimalMetaData }
// Visit calls VisitPre and then VisitPost on the visitor with this node.
// The boolean returned by VisitPre is ignored here.
func (p *PrimitiveNode ) Visit (v Visitor ) {
v .VisitPre (p )
v .VisitPost (p )
}
// toThrift converts the node to its thrift SchemaElement form.
//
// Name, repetition and physical type are always set. The optional fields
// are only populated when they carry information:
//   - ConvertedType when it is not None
//   - FieldID when it is non-negative
//   - LogicalType when it is serialized and is not the interval type
//   - TypeLength for FixedLenByteArray nodes
//   - Precision and Scale when decimal metadata is set
//
// Every pointer field of the result refers to a fresh copy, so changing the
// returned element cannot alter the node.
func (p *PrimitiveNode) toThrift() *format.SchemaElement {
	elem := &format.SchemaElement{
		Name:           p.Name(),
		RepetitionType: format.FieldRepetitionTypePtr(format.FieldRepetitionType(p.RepetitionType())),
		Type:           format.TypePtr(format.Type(p.PhysicalType())),
	}
	if p.ConvertedType() != ConvertedTypes.None {
		elem.ConvertedType = format.ConvertedTypePtr(format.ConvertedType(p.ConvertedType()))
	}
	if p.FieldID() >= 0 {
		elem.FieldID = thrift.Int32Ptr(p.FieldID())
	}
	if p.logicalType != nil && p.logicalType.IsSerialized() && !p.logicalType.Equals(IntervalLogicalType{}) {
		elem.LogicalType = p.logicalType.toThrift()
	}
	if p.physicalType == parquet.Types.FixedLenByteArray {
		elem.TypeLength = thrift.Int32Ptr(int32(p.typeLen))
	}
	if p.decimalMetaData.IsSet {
		// Copy rather than take the address of the node's own fields.
		elem.Precision = thrift.Int32Ptr(p.decimalMetaData.Precision)
		elem.Scale = thrift.Int32Ptr(p.decimalMetaData.Scale)
	}
	return elem
}
// FieldList is a list of nodes, used for the children of a GroupNode.
type FieldList []Node
// Len returns the number of nodes in the list.
func (f FieldList ) Len () int { return len (f ) }
// GroupNode is a schema node that contains an ordered list of child fields.
type GroupNode struct {
node
// fields holds the children in order.
fields FieldList
// nameToIdx maps a field name to the index (or indexes) in fields that
// carry that name; it is populated by the constructors.
nameToIdx strIntMultimap
}
// NewGroupNodeConverted creates a group node annotated with a converted
// type; the logical type is derived from the converted type.
//
// The derived logical type must be nested or none and must be compatible
// with the converted type, otherwise an error is returned together with a
// nil node. On success every entry of fields has its parent set to the new
// group and is indexed by name.
func NewGroupNodeConverted(name string, repetition parquet.Repetition, fields FieldList, converted ConvertedType, id int32) (n *GroupNode, err error) {
	n = &GroupNode{
		node:   node{typ: Group, name: name, repetition: repetition, convertedType: converted, fieldID: id},
		fields: fields,
	}

	n.logicalType = n.convertedType.ToLogicalType(DecimalMetadata{})
	if !(n.logicalType != nil && (n.logicalType.IsNested() || n.logicalType.IsNone()) && n.logicalType.IsCompatible(n.convertedType, DecimalMetadata{})) {
		// The logical type may be nil on this path, so it is formatted with
		// %v instead of calling String() on it, which would panic.
		return nil, fmt.Errorf("invalid logical type %v", n.logicalType)
	}

	n.nameToIdx = make(strIntMultimap)
	for idx, f := range n.fields {
		f.SetParent(n)
		n.nameToIdx.Add(f.Name(), idx)
	}
	return n, nil
}
// NewGroupNodeLogical creates a group node annotated with the given logical
// type; the converted type is derived from it.
//
// A nil logical type is treated as NoLogicalType{}. A non-nil logical type
// must be nested. On any error the returned node is nil. On success every
// entry of fields has its parent set to the new group and is indexed by
// name.
func NewGroupNodeLogical(name string, repetition parquet.Repetition, fields FieldList, logical LogicalType, id int32) (n *GroupNode, err error) {
	n = &GroupNode{
		node:   node{typ: Group, name: name, repetition: repetition, logicalType: logical, fieldID: id},
		fields: fields,
	}

	switch {
	case logical == nil:
		n.logicalType = NoLogicalType{}
	case !logical.IsNested():
		return nil, fmt.Errorf("logical type %s cannot be applied to group node", logical)
	}
	// GroupNode has no decimal metadata field, so the second result is
	// dropped; compatibility is checked against the zero value below.
	n.convertedType, _ = n.logicalType.ToConvertedType()

	if !(n.logicalType != nil && (n.logicalType.IsNested() || n.logicalType.IsNone()) && n.logicalType.IsCompatible(n.convertedType, DecimalMetadata{})) {
		return nil, fmt.Errorf("invalid logical type %s", n.logicalType)
	}

	n.nameToIdx = make(strIntMultimap)
	for idx, f := range n.fields {
		f.SetParent(n)
		n.nameToIdx.Add(f.Name(), idx)
	}
	return n, nil
}
// NewGroupNode creates a group node without any annotation by delegating to
// NewGroupNodeConverted with ConvertedTypes.None.
func NewGroupNode (name string , repetition parquet .Repetition , fields FieldList , fieldID int32 ) (*GroupNode , error ) {
return NewGroupNodeConverted (name , repetition , fields , ConvertedTypes .None , fieldID )
}
// Must wraps a call that returns (Node, error): it panics with err when err
// is non-nil and returns n otherwise.
func Must (n Node , err error ) Node {
if err != nil {
panic (err )
}
return n
}
// MustGroup is like Must but returns the node as a *GroupNode. It panics
// when err is non-nil, and the unchecked type assertion also panics when n
// is not a *GroupNode.
func MustGroup (n Node , err error ) *GroupNode {
if err != nil {
panic (err )
}
return n .(*GroupNode )
}
// MustPrimitive is like Must but returns the node as a *PrimitiveNode. It
// panics when err is non-nil, and the unchecked type assertion also panics
// when n is not a *PrimitiveNode.
func MustPrimitive (n Node , err error ) *PrimitiveNode {
if err != nil {
panic (err )
}
return n .(*PrimitiveNode )
}
// GroupNodeFromThrift builds a GroupNode from a thrift SchemaElement and the
// already constructed child fields.
//
// A logical type on the element takes precedence; otherwise the element's
// converted type is used, defaulting to ConvertedTypes.None when it is not
// set. The field id is -1 when the element does not set one.
func GroupNodeFromThrift(elem *format.SchemaElement, fields FieldList) (*GroupNode, error) {
	id := int32(-1)
	if elem.IsSetFieldID() {
		id = elem.GetFieldID()
	}

	if !elem.IsSetLogicalType() {
		converted := ConvertedTypes.None
		if elem.IsSetConvertedType() {
			converted = ConvertedType(elem.GetConvertedType())
		}
		return NewGroupNodeConverted(elem.GetName(), parquet.Repetition(elem.GetRepetitionType()), fields, converted, id)
	}

	logical := getLogicalType(elem.GetLogicalType())
	return NewGroupNodeLogical(elem.GetName(), parquet.Repetition(elem.GetRepetitionType()), fields, logical, id)
}
// toThrift converts the group to its thrift SchemaElement form.
//
// Name, child count and repetition are always set. ConvertedType is set when
// it is not None, FieldID when it is non-negative, and LogicalType when the
// logical type is serialized.
//
// FieldID points at a copy of the id (as in PrimitiveNode.toThrift) rather
// than at the node's own field, so changing the returned element cannot
// alter the node.
func (g *GroupNode) toThrift() *format.SchemaElement {
	elem := &format.SchemaElement{
		Name:           g.name,
		NumChildren:    thrift.Int32Ptr(int32(len(g.fields))),
		RepetitionType: format.FieldRepetitionTypePtr(format.FieldRepetitionType(g.RepetitionType())),
	}
	if g.convertedType != ConvertedTypes.None {
		elem.ConvertedType = format.ConvertedTypePtr(format.ConvertedType(g.convertedType))
	}
	if g.fieldID >= 0 {
		elem.FieldID = thrift.Int32Ptr(g.fieldID)
	}
	if g.logicalType != nil && g.logicalType.IsSerialized() {
		elem.LogicalType = g.logicalType.toThrift()
	}
	return elem
}
// Equals reports whether rhs is a *GroupNode equal to g: the attributes
// compared by the embedded node must match, both groups must have the same
// number of fields, and the fields must be pairwise equal in order.
//
// A rhs that is not a *GroupNode is reported as not equal instead of
// triggering a type-assertion panic.
func (g *GroupNode) Equals(rhs Node) bool {
	if !g.node.Equals(rhs) {
		return false
	}

	other, ok := rhs.(*GroupNode)
	if !ok {
		return false
	}
	if g == other {
		return true
	}

	if len(g.fields) != len(other.fields) {
		return false
	}
	for idx, field := range g.fields {
		if !field.Equals(other.fields[idx]) {
			return false
		}
	}
	return true
}
// NumFields returns the number of direct child fields of this group.
func (g *GroupNode ) NumFields () int {
return len (g .fields )
}
// Field returns the child at index i. The index is not range-checked here,
// so an out-of-range i panics.
func (g *GroupNode ) Field (i int ) Node {
return g .fields [i ]
}
// FieldIndexByName returns the first index recorded for the given field
// name, or -1 when no field with that name is indexed.
func (g *GroupNode) FieldIndexByName(name string) int {
	indices, found := g.nameToIdx[name]
	if !found {
		return -1
	}
	return indices[0]
}
// FieldIndexByField returns the index of the given node among this group's
// fields, or -1 when it is not one of them. Candidates are looked up by the
// node's name and then matched by identity (interface equality), not by
// Equals.
func (g *GroupNode) FieldIndexByField(n Node) int {
	candidates, found := g.nameToIdx[n.Name()]
	if !found {
		return -1
	}
	for _, pos := range candidates {
		if g.fields[pos] == n {
			return pos
		}
	}
	return -1
}
// Visit walks the group with v: VisitPre is called on the group first, the
// fields are visited in order only when VisitPre returned true, and
// VisitPost is called on the group last in either case.
func (g *GroupNode) Visit(v Visitor) {
	descend := v.VisitPre(g)
	if descend {
		children := g.fields
		for i := 0; i < len(children); i++ {
			children[i].Visit(v)
		}
	}
	v.VisitPost(g)
}
// HasRepeatedFields reports whether any field of this group, at any nesting
// depth, has the Repeated repetition type.
//
// Every field is examined: a nested group that contains no repeated fields
// does not stop the search, so repeated fields appearing after it are still
// found.
func (g *GroupNode) HasRepeatedFields() bool {
	for _, field := range g.fields {
		if field.RepetitionType() == parquet.Repetitions.Repeated {
			return true
		}
		// Only a positive answer from a nested group ends the search early;
		// a negative one must fall through to the remaining siblings.
		if sub, ok := field.(*GroupNode); ok && sub.HasRepeatedFields() {
			return true
		}
	}
	return false
}
// NewInt32Node creates an unannotated Int32 primitive node, passing -1 as
// the type length. It panics (through MustPrimitive) if construction fails.
func NewInt32Node (name string , rep parquet .Repetition , fieldID int32 ) *PrimitiveNode {
return MustPrimitive (NewPrimitiveNode (name , rep , parquet .Types .Int32 , fieldID , -1 ))
}
// NewInt64Node creates an unannotated Int64 primitive node, passing -1 as
// the type length. It panics (through MustPrimitive) if construction fails.
func NewInt64Node (name string , rep parquet .Repetition , fieldID int32 ) *PrimitiveNode {
return MustPrimitive (NewPrimitiveNode (name , rep , parquet .Types .Int64 , fieldID , -1 ))
}
// NewInt96Node creates an unannotated Int96 primitive node, passing -1 as
// the type length. It panics (through MustPrimitive) if construction fails.
func NewInt96Node (name string , rep parquet .Repetition , fieldID int32 ) *PrimitiveNode {
return MustPrimitive (NewPrimitiveNode (name , rep , parquet .Types .Int96 , fieldID , -1 ))
}
// NewFloat32Node creates an unannotated primitive node with the Float
// physical type, passing -1 as the type length. It panics (through
// MustPrimitive) if construction fails.
func NewFloat32Node (name string , rep parquet .Repetition , fieldID int32 ) *PrimitiveNode {
return MustPrimitive (NewPrimitiveNode (name , rep , parquet .Types .Float , fieldID , -1 ))
}
// NewFloat64Node creates an unannotated primitive node with the Double
// physical type, passing -1 as the type length. It panics (through
// MustPrimitive) if construction fails.
func NewFloat64Node (name string , rep parquet .Repetition , fieldID int32 ) *PrimitiveNode {
return MustPrimitive (NewPrimitiveNode (name , rep , parquet .Types .Double , fieldID , -1 ))
}
// NewBooleanNode creates an unannotated Boolean primitive node, passing -1
// as the type length. It panics (through MustPrimitive) if construction
// fails.
func NewBooleanNode (name string , rep parquet .Repetition , fieldID int32 ) *PrimitiveNode {
return MustPrimitive (NewPrimitiveNode (name , rep , parquet .Types .Boolean , fieldID , -1 ))
}
// NewByteArrayNode creates an unannotated ByteArray primitive node, passing
// -1 as the type length. It panics (through MustPrimitive) if construction
// fails.
func NewByteArrayNode (name string , rep parquet .Repetition , fieldID int32 ) *PrimitiveNode {
return MustPrimitive (NewPrimitiveNode (name , rep , parquet .Types .ByteArray , fieldID , -1 ))
}
// NewFixedLenByteArrayNode creates an unannotated FixedLenByteArray
// primitive node with the given length. Note that length comes before
// fieldID in this signature. It panics (through MustPrimitive) if
// construction fails, which includes a length that is zero or negative.
func NewFixedLenByteArrayNode (name string , rep parquet .Repetition , length int32 , fieldID int32 ) *PrimitiveNode {
return MustPrimitive (NewPrimitiveNode (name , rep , parquet .Types .FixedLenByteArray , fieldID , length ))
}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.