// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package schema

import (
	

	
	format 
	
	
)

// NodeType describes whether the Node is a Primitive or Group node
type NodeType int

// the available constants for NodeType
const (
	// Primitive is the NodeType set on nodes built by the primitive node
	// constructors in this file.
	Primitive NodeType = iota
	// Group is the NodeType set on nodes built by the group node constructors
	// in this file.
	Group
)

// Node is the interface for both Group and Primitive Nodes.
// A logical schema type has a name, repetition level, and optionally
// a logical type (converted type is the deprecated version of the logical
// type concept, which is maintained for forward compatibility)
//
// The interface includes the unexported toThrift method, so it can only be
// implemented by types declared in this package.
type Node interface {
	// Name returns the name of the node.
	Name() string
	// Type reports whether the node is a Primitive or a Group node.
	Type() NodeType
	// RepetitionType returns the repetition of the node.
	RepetitionType() parquet.Repetition
	// ConvertedType returns the converted type of the node.
	ConvertedType() ConvertedType
	// LogicalType returns the logical type of the node.
	LogicalType() LogicalType
	// FieldID returns the field id of the node. The toThrift implementations
	// in this file only write the id out when it is >= 0.
	FieldID() int32
	// Parent returns the parent of this node, or nil if none has been set.
	Parent() Node
	// SetParent sets the parent of this node. The group node constructors
	// call it on every field they are given.
	SetParent(Node)
	// Path returns the string form of the node's column path.
	Path() string
	// Equals reports whether this node and the given node are equivalent.
	Equals(Node) bool
	// Visit walks this node with the given Visitor.
	Visit(v Visitor)
	// toThrift converts the node to its thrift schema element.
	toThrift() *format.SchemaElement
}

// Visitor is an interface for creating functionality to walk the schema tree.
//
// A visitor can be passed to the Visit function of a Node in order to walk
// the tree. VisitPre is called the first time a node is encountered. If
// it is a group node, the return is checked and if it is false, the children
// will be skipped.
//
// VisitPost is called after visiting any children
type Visitor interface {
	// VisitPre is called before a node's children are visited. Only group
	// nodes look at the result; primitive nodes ignore it.
	VisitPre(Node) bool
	// VisitPost is called once for every visited node, including group nodes
	// whose children were skipped.
	VisitPost(Node)
}

// ColumnPathFromNode walks the parents of the given node to construct it's
// column path
func ( Node) parquet.ColumnPath {
	if  == nil {
		return nil
	}

	 := make([]string, 0)

	// build the path in reverse order as we traverse nodes to the top
	 := 
	for .Parent() != nil {
		 = append(, .Name())
		 = .Parent()
	}

	// reverse the order of the list in place so that our result
	// is in the proper, correct order.
	for  := len()/2 - 1;  >= 0; -- {
		 := len() - 1 - 
		[], [] = [], []
	}

	return 
}

// node is the base embedded struct for both group and primitive nodes
type node struct {
	// typ is Primitive or Group, set by the constructor of the embedding type.
	typ NodeType
	// parent is the enclosing node; nil until SetParent is called.
	parent Node

	name          string
	repetition    parquet.Repetition
	fieldID       int32
	logicalType   LogicalType
	convertedType ConvertedType
	// colPath caches the result of ColumnPathFromNode; it is filled in on the
	// first call to columnPath.
	colPath parquet.ColumnPath
}

func ( *node) () *format.SchemaElement    { return nil }
func ( *node) () string                       { return .name }
func ( *node) () NodeType                     { return .typ }
func ( *node) () parquet.Repetition { return .repetition }
func ( *node) () ConvertedType       { return .convertedType }
func ( *node) () LogicalType           { return .logicalType }
func ( *node) () int32                     { return .fieldID }
func ( *node) () Node                       { return .parent }
func ( *node) ( Node)                   { .parent =  }
func ( *node) () string {
	return .columnPath().String()
}
func ( *node) () parquet.ColumnPath {
	if .colPath == nil {
		.colPath = ColumnPathFromNode()
	}
	return .colPath
}

func ( *node) ( Node) bool {
	return .typ == .Type() &&
		.Name() == .Name() &&
		.RepetitionType() == .RepetitionType() &&
		.ConvertedType() == .ConvertedType() &&
		.FieldID() == .FieldID() &&
		.LogicalType().Equals(.LogicalType())
}

func ( *node) ( Visitor) {}

// A PrimitiveNode is a type that is one of the primitive Parquet storage types. In addition to
// the other type metadata (name, repetition level, logical type), also has the
// physical storage type and their type-specific metadata (byte width, decimal
// parameters)
type PrimitiveNode struct {
	node

	// ColumnOrder is not set by any constructor in this file.
	ColumnOrder parquet.ColumnOrder
	// physicalType is the storage type returned by PhysicalType.
	physicalType parquet.Type
	// typeLen is the length returned by TypeLength; it is only written to the
	// thrift element for FixedLenByteArray columns.
	typeLen int
	// decimalMetaData holds precision and scale; IsSet is true when they apply.
	decimalMetaData DecimalMetadata
}

// NewPrimitiveNodeLogical constructs a Primitive node using the provided logical type for a given
// physical type and typelength.
func ( string,  parquet.Repetition,  LogicalType,  parquet.Type,  int,  int32) (*PrimitiveNode, error) {
	 := &PrimitiveNode{
		node:         node{typ: Primitive, name: , repetition: , logicalType: , fieldID: },
		physicalType: ,
		typeLen:      ,
	}

	if  != nil {
		if !.IsNested() {
			if .IsApplicable(, int32()) {
				.convertedType, .decimalMetaData = .logicalType.ToConvertedType()
			} else {
				return nil, fmt.Errorf("%s cannot be applied to primitive type %s", , )
			}
		} else {
			return nil, fmt.Errorf("nested logical type %s cannot be applied to a non-group node", )
		}
	} else {
		.logicalType = NoLogicalType{}
		.convertedType, .decimalMetaData = .logicalType.ToConvertedType()
	}

	if !(.logicalType != nil && !.logicalType.IsNested() && .logicalType.IsCompatible(.convertedType, .decimalMetaData)) {
		return nil, fmt.Errorf("invalid logical type %s", .logicalType)
	}

	if .physicalType == parquet.Types.FixedLenByteArray && .typeLen <= 0 {
		return nil, xerrors.New("invalid fixed length byte array length")
	}
	return , nil
}

// NewPrimitiveNodeConverted constructs a primitive node from the given physical type and converted type,
// determining the logical type from the converted type.
func ( string,  parquet.Repetition,  parquet.Type,  ConvertedType, , ,  int,  int32) (*PrimitiveNode, error) {
	 := &PrimitiveNode{
		node:         node{typ: Primitive, name: , repetition: , convertedType: , fieldID: },
		physicalType: ,
		typeLen:      -1,
	}

	switch  {
	case ConvertedTypes.None:
	case ConvertedTypes.UTF8, ConvertedTypes.JSON, ConvertedTypes.BSON:
		if  != parquet.Types.ByteArray {
			return nil, fmt.Errorf("parquet: %s can only annotate BYTE_LEN fields", )
		}
	case ConvertedTypes.Decimal:
		switch  {
		case parquet.Types.Int32, parquet.Types.Int64, parquet.Types.ByteArray, parquet.Types.FixedLenByteArray:
		default:
			return nil, xerrors.New("parquet: DECIMAL can only annotate INT32, INT64, BYTE_ARRAY and FIXED")
		}

		switch {
		case  <= 0:
			return nil, fmt.Errorf("parquet: invalid decimal precision: %d, must be between 1 and 38 inclusive", )
		case  < 0:
			return nil, fmt.Errorf("parquet: invalid decimal scale: %d, must be a number between 0 and precision inclusive", )
		case  > :
			return nil, fmt.Errorf("parquet: invalid decimal scale %d, cannot be greater than precision: %d", , )
		}
		.decimalMetaData.IsSet = true
		.decimalMetaData.Precision = int32()
		.decimalMetaData.Scale = int32()
	case ConvertedTypes.Date,
		ConvertedTypes.TimeMillis,
		ConvertedTypes.Int8,
		ConvertedTypes.Int16,
		ConvertedTypes.Int32,
		ConvertedTypes.Uint8,
		ConvertedTypes.Uint16,
		ConvertedTypes.Uint32:
		if  != parquet.Types.Int32 {
			return nil, fmt.Errorf("parquet: %s can only annotate INT32", )
		}
	case ConvertedTypes.TimeMicros,
		ConvertedTypes.TimestampMicros,
		ConvertedTypes.TimestampMillis,
		ConvertedTypes.Int64,
		ConvertedTypes.Uint64:
		if  != parquet.Types.Int64 {
			return nil, fmt.Errorf("parquet: %s can only annotate INT64", )
		}
	case ConvertedTypes.Interval:
		if  != parquet.Types.FixedLenByteArray ||  != 12 {
			return nil, xerrors.New("parquet: INTERVAL can only annotate FIXED_LEN_BYTE_ARRAY(12)")
		}
	case ConvertedTypes.Enum:
		if  != parquet.Types.ByteArray {
			return nil, xerrors.New("parquet: ENUM can only annotate BYTE_ARRAY fields")
		}
	case ConvertedTypes.NA:
	default:
		return nil, fmt.Errorf("parquet: %s cannot be applied to a primitive type", .String())
	}

	.logicalType = .convertedType.ToLogicalType(.decimalMetaData)
	if !(.logicalType != nil && !.logicalType.IsNested() && .logicalType.IsCompatible(.convertedType, .decimalMetaData)) {
		return nil, fmt.Errorf("invalid logical type %s", .logicalType)
	}

	if .physicalType == parquet.Types.FixedLenByteArray {
		if  <= 0 {
			return nil, xerrors.New("invalid fixed len byte array length")
		}
		.typeLen = 
	}

	return , nil
}

// NOTE(review): the name of this function and all of its local identifiers are
// missing from this copy of the file, so the declaration does not parse as
// written. The name cannot be recovered from this file alone - restore it from
// version control before building.
//
// From the calls that survive, this builds a *PrimitiveNode from a thrift
// format.SchemaElement:
//   - an element with a logical type set is passed to NewPrimitiveNodeLogical,
//   - otherwise an element with a converted type set is passed to
//     NewPrimitiveNodeConverted together with its type length, precision and
//     scale,
//   - anything else is passed to NewPrimitiveNodeLogical with NoLogicalType{}.
func ( *format.SchemaElement) (*PrimitiveNode, error) {
	// Starts at -1 and is replaced by the element's field id when one is set;
	// presumably this is the value passed as the last argument of each
	// constructor call below - confirm against the original.
	 := int32(-1)
	if .IsSetFieldID() {
		 = .GetFieldID()
	}

	if .IsSetLogicalType() {
		return NewPrimitiveNodeLogical(.GetName(), parquet.Repetition(.GetRepetitionType()),
			getLogicalType(.GetLogicalType()), parquet.Type(.GetType()), int(.GetTypeLength()),
			)
	} else if .IsSetConvertedType() {
		return NewPrimitiveNodeConverted(.GetName(), parquet.Repetition(.GetRepetitionType()),
			parquet.Type(.GetType()), ConvertedType(.GetConvertedType()),
			int(.GetTypeLength()), int(.GetPrecision()), int(.GetScale()), )
	}
	return NewPrimitiveNodeLogical(.GetName(), parquet.Repetition(.GetRepetitionType()), NoLogicalType{}, parquet.Type(.GetType()), int(.GetTypeLength()), )
}

// NewPrimitiveNode constructs a primitive node with the ConvertedType of None and no logical type.
//
// Use NewPrimitiveNodeLogical and NewPrimitiveNodeConverted to specify the logical or converted type.
func ( string,  parquet.Repetition,  parquet.Type, ,  int32) (*PrimitiveNode, error) {
	return NewPrimitiveNodeLogical(, , nil, , int(), )
}

// Equals returns true if both nodes are primitive nodes with the same physical
// and converted/logical types.
func ( *PrimitiveNode) ( Node) bool {
	if !.node.Equals() {
		return false
	}

	 := .(*PrimitiveNode)
	if  ==  {
		return true
	}

	if .PhysicalType() != .PhysicalType() {
		return false
	}

	 := true
	if .ConvertedType() == ConvertedTypes.Decimal {
		 =  &&
			(.decimalMetaData.Precision == .decimalMetaData.Precision &&
				.decimalMetaData.Scale == .decimalMetaData.Scale)
	}
	if .PhysicalType() == parquet.Types.FixedLenByteArray {
		 =  && .TypeLength() == .TypeLength()
	}
	return 
}

// PhysicalType returns the proper Physical parquet.Type primitive that is used
// to store the values in this column.
func ( *PrimitiveNode) () parquet.Type { return .physicalType }

// SetTypeLength will change the type length of the node, has no effect if the
// physical type is not FixedLength Byte Array
func ( *PrimitiveNode) ( int) {
	if .PhysicalType() == parquet.Types.FixedLenByteArray {
		.typeLen = 
	}
}

// TypeLength will be -1 if not a FixedLenByteArray column, otherwise will be the
// length of the FixedLen Byte Array
func ( *PrimitiveNode) () int { return .typeLen }

// DecimalMetadata returns the current metadata for the node. If not a decimal
// typed column, the return should have IsSet == false.
func ( *PrimitiveNode) () DecimalMetadata { return .decimalMetaData }

// Visit is for implementing a Visitor pattern handler to walk a schema's tree. One
// example is the Schema Printer which walks the tree to print out the schema in order.
func ( *PrimitiveNode) ( Visitor) {
	.VisitPre()
	.VisitPost()
}

func ( *PrimitiveNode) () *format.SchemaElement {
	 := &format.SchemaElement{
		Name:           .Name(),
		RepetitionType: format.FieldRepetitionTypePtr(format.FieldRepetitionType(.RepetitionType())),
		Type:           format.TypePtr(format.Type(.PhysicalType())),
	}
	if .ConvertedType() != ConvertedTypes.None {
		.ConvertedType = format.ConvertedTypePtr(format.ConvertedType(.ConvertedType()))
	}
	if .FieldID() >= 0 {
		.FieldID = thrift.Int32Ptr(.FieldID())
	}
	if .logicalType != nil && .logicalType.IsSerialized() && !.logicalType.Equals(IntervalLogicalType{}) {
		.LogicalType = .logicalType.toThrift()
	}
	if .physicalType == parquet.Types.FixedLenByteArray {
		.TypeLength = thrift.Int32Ptr(int32(.typeLen))
	}
	if .decimalMetaData.IsSet {
		.Precision = &.decimalMetaData.Precision
		.Scale = &.decimalMetaData.Scale
	}
	return 
}

// FieldList is an alias for a slice of Nodes
type FieldList []Node

// Len is equivalent to len(fieldlist)
func ( FieldList) () int { return len() }

// GroupNode is for managing nested nodes like List, Map, etc.
type GroupNode struct {
	node
	// fields are the direct children of the group, in schema order.
	fields FieldList
	// nameToIdx maps a child name to the indexes in fields of the children
	// carrying that name; it is filled in by the group node constructors.
	nameToIdx strIntMultimap
}

// NewGroupNodeConverted constructs a group node with the provided fields and converted type,
// determining the logical type from that converted type.
func ( string,  parquet.Repetition,  FieldList,  ConvertedType,  int32) ( *GroupNode,  error) {
	 = &GroupNode{
		node:   node{typ: Group, name: , repetition: , convertedType: , fieldID: },
		fields: ,
	}
	.logicalType = .convertedType.ToLogicalType(DecimalMetadata{})
	if !(.logicalType != nil && (.logicalType.IsNested() || .logicalType.IsNone()) && .logicalType.IsCompatible(.convertedType, DecimalMetadata{})) {
		 = fmt.Errorf("invalid logical type %s", .logicalType.String())
		return
	}

	.nameToIdx = make(strIntMultimap)
	for ,  := range .fields {
		.SetParent()
		.nameToIdx.Add(.Name(), )
	}
	return
}

// NewGroupNodeLogical constructs a group node with the provided fields and logical type,
// determining the converted type from the provided logical type.
func ( string,  parquet.Repetition,  FieldList,  LogicalType,  int32) ( *GroupNode,  error) {
	 = &GroupNode{
		node:   node{typ: Group, name: , repetition: , logicalType: , fieldID: },
		fields: ,
	}

	if  != nil {
		if .IsNested() {
			.convertedType, _ = .ToConvertedType()
		} else {
			 = fmt.Errorf("logical type %s cannot be applied to group node", )
			return
		}
	} else {
		.logicalType = NoLogicalType{}
		.convertedType, _ = .logicalType.ToConvertedType()
	}

	if !(.logicalType != nil && (.logicalType.IsNested() || .logicalType.IsNone()) && .logicalType.IsCompatible(.convertedType, DecimalMetadata{})) {
		 = fmt.Errorf("invalid logical type %s", .logicalType)
		return
	}

	.nameToIdx = make(strIntMultimap)
	for ,  := range .fields {
		.SetParent()
		.nameToIdx.Add(.Name(), )
	}
	return
}

// NewGroupNode constructs a new group node with the provided fields,
// but with converted type None and No Logical Type
func ( string,  parquet.Repetition,  FieldList,  int32) (*GroupNode, error) {
	return NewGroupNodeConverted(, , , ConvertedTypes.None, )
}

// Must is a convenience function for the NewNode functions that return a Node
// and an error, panic'ing if err != nil or returning the node
func ( Node,  error) Node {
	if  != nil {
		panic()
	}
	return 
}

// MustGroup is like Must, except it casts the node to a *GroupNode, which will panic
// if it is a primitive node.
func ( Node,  error) *GroupNode {
	if  != nil {
		panic()
	}
	return .(*GroupNode)
}

// MustPrimitive is like Must except it casts the node to *PrimitiveNode which will panic
// if it is a group node.
func ( Node,  error) *PrimitiveNode {
	if  != nil {
		panic()
	}
	return .(*PrimitiveNode)
}

// NOTE(review): the name of this function and all of its local identifiers are
// missing from this copy of the file, so the declaration does not parse as
// written. The name cannot be recovered from this file alone - restore it from
// version control before building.
//
// From the calls that survive, this builds a *GroupNode from a thrift
// format.SchemaElement and a list of child fields: an element with a logical
// type set is passed to NewGroupNodeLogical, anything else is passed to
// NewGroupNodeConverted.
func ( *format.SchemaElement,  FieldList) (*GroupNode, error) {
	// Starts at -1 and is replaced by the element's field id when one is set;
	// presumably this is the value passed as the last argument of each
	// constructor call below - confirm against the original.
	 := int32(-1)
	if .IsSetFieldID() {
		 = .GetFieldID()
	}

	if .IsSetLogicalType() {
		return NewGroupNodeLogical(.GetName(), parquet.Repetition(.GetRepetitionType()), , getLogicalType(.GetLogicalType()), )
	}

	// Defaults to ConvertedTypes.None and is replaced by the element's
	// converted type when one is set.
	 := ConvertedTypes.None
	if .IsSetConvertedType() {
		 = ConvertedType(.GetConvertedType())
	}
	return NewGroupNodeConverted(.GetName(), parquet.Repetition(.GetRepetitionType()), , , )
}

func ( *GroupNode) () *format.SchemaElement {
	 := &format.SchemaElement{
		Name:           .name,
		NumChildren:    thrift.Int32Ptr(int32(len(.fields))),
		RepetitionType: format.FieldRepetitionTypePtr(format.FieldRepetitionType(.RepetitionType())),
	}
	if .convertedType != ConvertedTypes.None {
		.ConvertedType = format.ConvertedTypePtr(format.ConvertedType(.convertedType))
	}
	if .fieldID >= 0 {
		.FieldID = &.fieldID
	}
	if .logicalType != nil && .logicalType.IsSerialized() {
		.LogicalType = .logicalType.toThrift()
	}
	return 
}

// Equals will compare this node to the provided node and only return true if
// this node and all of it's children are the same as the passed in node and its
// children.
func ( *GroupNode) ( Node) bool {
	if !.node.Equals() {
		return false
	}

	 := .(*GroupNode)
	if  ==  {
		return true
	}
	if len(.fields) != len(.fields) {
		return false
	}

	for ,  := range .fields {
		if !.Equals(.fields[]) {
			return false
		}
	}
	return true
}

// NumFields returns the number of direct child fields for this group node
func ( *GroupNode) () int {
	return len(.fields)
}

// Field returns the node in the field list which is of the provided (0-based) index
func ( *GroupNode) ( int) Node {
	return .fields[]
}

// FieldIndexByName provides the index for the field of the given name. Returns
// -1 if not found.
//
// If there are more than one field of this name, it returns the index for the first one.
func ( *GroupNode) ( string) int {
	if ,  := .nameToIdx[];  {
		return [0]
	}
	return -1
}

// FieldIndexByField looks up the index child of this node. Returns -1
// if n isn't a child of this group
func ( *GroupNode) ( Node) int {
	if ,  := .nameToIdx[.Name()];  {
		for ,  := range  {
			if  == .fields[] {
				return 
			}
		}
	}
	return -1
}

// Visit is for implementing a Visitor pattern handler to walk a schema's tree. One
// example is the Schema Printer which walks the tree to print out the schema in order.
func ( *GroupNode) ( Visitor) {
	if .VisitPre() {
		for ,  := range .fields {
			.Visit()
		}
	}
	.VisitPost()
}

// HasRepeatedFields returns true if any of the children of this node have
// Repeated as its repetition type.
//
// This is recursive and will check the children of any group nodes that are children.
func ( *GroupNode) () bool {
	for ,  := range .fields {
		if .RepetitionType() == parquet.Repetitions.Repeated {
			return true
		}
		if .Type() == Group {
			return .(*GroupNode).()
		}
	}
	return false
}

// NewInt32Node is a convenience factory for constructing an Int32 Primitive Node
func ( string,  parquet.Repetition,  int32) *PrimitiveNode {
	return MustPrimitive(NewPrimitiveNode(, , parquet.Types.Int32, , -1))
}

// NewInt64Node is a convenience factory for constructing an Int64 Primitive Node
func ( string,  parquet.Repetition,  int32) *PrimitiveNode {
	return MustPrimitive(NewPrimitiveNode(, , parquet.Types.Int64, , -1))
}

// NewInt96Node is a convenience factory for constructing an Int96 Primitive Node
func ( string,  parquet.Repetition,  int32) *PrimitiveNode {
	return MustPrimitive(NewPrimitiveNode(, , parquet.Types.Int96, , -1))
}

// NewFloat32Node is a convenience factory for constructing an Float Primitive Node
func ( string,  parquet.Repetition,  int32) *PrimitiveNode {
	return MustPrimitive(NewPrimitiveNode(, , parquet.Types.Float, , -1))
}

// NewFloat64Node is a convenience factory for constructing an Double Primitive Node
func ( string,  parquet.Repetition,  int32) *PrimitiveNode {
	return MustPrimitive(NewPrimitiveNode(, , parquet.Types.Double, , -1))
}

// NewBooleanNode is a convenience factory for constructing an Boolean Primitive Node
func ( string,  parquet.Repetition,  int32) *PrimitiveNode {
	return MustPrimitive(NewPrimitiveNode(, , parquet.Types.Boolean, , -1))
}

// NewByteArrayNode is a convenience factory for constructing an Byte Array Primitive Node
func ( string,  parquet.Repetition,  int32) *PrimitiveNode {
	return MustPrimitive(NewPrimitiveNode(, , parquet.Types.ByteArray, , -1))
}

// NewFixedLenByteArrayNode is a convenience factory for constructing a Fixed Length
// Byte Array Primitive Node of the given length. The result goes through
// MustPrimitive, which panics when construction fails.
//
// NOTE(review): the function name and parameter names are missing from this
// copy of the file, so the declaration does not parse as written. The two
// int32 parameters are the length and a field id, but their order cannot be
// determined from this file - restore the declaration from version control
// rather than guessing, since swapping them would still compile.
func ( string,  parquet.Repetition,  int32,  int32) *PrimitiveNode {
	return MustPrimitive(NewPrimitiveNode(, , parquet.Types.FixedLenByteArray, , ))
}