package parquet

import (
	
	
	
	

	
	
	
	
)

// Node values represent nodes of a parquet schema.
//
// Nodes carry the type of values, as well as properties like whether the values
// are optional or repeat. Nodes with one or more children represent parquet
// groups and therefore do not have a logical type.
//
// Nodes are immutable values and therefore safe to use concurrently from
// multiple goroutines.
type Node interface {
	// The id of this node in its parent node. Zero value is treated as id is not
	// set. ID only needs to be unique within its parent context.
	//
	// This is the same as parquet field_id
	ID() int

	// Returns a human-readable representation of the parquet node.
	String() string

	// For leaf nodes, returns the type of values of the parquet column.
	//
	// Calling this method on non-leaf nodes will panic.
	Type() Type

	// Returns whether the parquet column is optional.
	Optional() bool

	// Returns whether the parquet column is repeated.
	Repeated() bool

	// Returns whether the parquet column is required.
	Required() bool

	// Returns true if this a leaf node.
	Leaf() bool

	// Returns a mapping of the node's fields.
	//
	// As an optimization, the same slices may be returned by multiple calls to
	// this method, programs must treat the returned values as immutable.
	//
	// This method returns an empty mapping when called on leaf nodes.
	Fields() []Field

	// Returns the encoding used by the node.
	//
	// The method may return nil to indicate that no specific encoding was
	// configured on the node, in which case a default encoding might be used.
	Encoding() encoding.Encoding

	// Returns compression codec used by the node.
	//
	// The method may return nil to indicate that no specific compression codec
	// was configured on the node, in which case a default compression might be
	// used.
	Compression() compress.Codec

	// Returns the Go type that best represents the parquet node.
	//
	// For leaf nodes, this will be one of bool, int32, int64, deprecated.Int96,
	// float32, float64, string, []byte, or [N]byte.
	//
	// For groups, the method returns a struct type.
	//
	// If the method is called on a repeated node, the method returns a slice of
	// the underlying type.
	//
	// For optional nodes, the method returns a pointer of the underlying type.
	//
	// For nodes that were constructed from Go values (e.g. using SchemaOf), the
	// method returns the original Go type.
	GoType() reflect.Type
}

// Field instances represent fields of a parquet node, which associate a node to
// their name in their parent node.
type Field interface {
	Node

	// Returns the name of this field in its parent node.
	Name() string

	// Given a reference to the Go value matching the structure of the parent
	// node, returns the Go value of the field.
	Value(base reflect.Value) reflect.Value
}

// Encoded wraps the node passed as argument to use the given encoding.
//
// The function panics if it is called on a non-leaf node, or if the
// encoding does not support the node type.
func ( Node,  encoding.Encoding) Node {
	if !.Leaf() {
		panic("cannot add encoding to a non-leaf node")
	}
	if  != nil {
		 := .Type().Kind()
		if !canEncode(, ) {
			panic("cannot apply " + .Encoding().String() + " to node of type " + .String())
		}
	}
	return &encodedNode{
		Node:     ,
		encoding: ,
	}
}

type encodedNode struct {
	Node
	encoding encoding.Encoding
}

func ( *encodedNode) () encoding.Encoding {
	return .encoding
}

// Compressed wraps the node passed as argument to use the given compression
// codec.
//
// If the codec is nil, the node's compression is left unchanged.
//
// The function panics if it is called on a non-leaf node.
func ( Node,  compress.Codec) Node {
	if !.Leaf() {
		panic("cannot add compression codec to a non-leaf node")
	}
	return &compressedNode{
		Node:  ,
		codec: ,
	}
}

type compressedNode struct {
	Node
	codec compress.Codec
}

func ( *compressedNode) () compress.Codec {
	return .codec
}

// Optional wraps the given node to make it optional.
func ( Node) Node { return &optionalNode{} }

type optionalNode struct{ Node }

func ( *optionalNode) () bool       { return true }
func ( *optionalNode) () bool       { return false }
func ( *optionalNode) () bool       { return false }
func ( *optionalNode) () reflect.Type { return reflect.PtrTo(.Node.GoType()) }

// FieldID wraps a node to provide node field id
func ( Node,  int) Node { return &fieldIDNode{Node: , id: } }

type fieldIDNode struct {
	Node
	id int
}

func ( *fieldIDNode) () int { return .id }

// Repeated wraps the given node to make it repeated.
func ( Node) Node { return &repeatedNode{} }

type repeatedNode struct{ Node }

func ( *repeatedNode) () bool       { return false }
func ( *repeatedNode) () bool       { return true }
func ( *repeatedNode) () bool       { return false }
func ( *repeatedNode) () reflect.Type { return reflect.SliceOf(.Node.GoType()) }

// Required wraps the given node to make it required.
func ( Node) Node { return &requiredNode{} }

type requiredNode struct{ Node }

func ( *requiredNode) () bool       { return false }
func ( *requiredNode) () bool       { return false }
func ( *requiredNode) () bool       { return true }
func ( *requiredNode) () reflect.Type { return .Node.GoType() }

type node struct{}

// Leaf returns a leaf node of the given type.
func ( Type) Node {
	return &leafNode{typ: }
}

type leafNode struct{ typ Type }

func ( *leafNode) () int { return 0 }

func ( *leafNode) () string { return sprint("", ) }

func ( *leafNode) () Type { return .typ }

func ( *leafNode) () bool { return false }

func ( *leafNode) () bool { return false }

func ( *leafNode) () bool { return true }

func ( *leafNode) () bool { return true }

func ( *leafNode) () []Field { return nil }

func ( *leafNode) () encoding.Encoding { return nil }

func ( *leafNode) () compress.Codec { return nil }

func ( *leafNode) () reflect.Type { return goTypeOfLeaf() }

var repetitionTypes = [...]format.FieldRepetitionType{
	0: format.Required,
	1: format.Optional,
	2: format.Repeated,
}

func fieldRepetitionTypePtrOf( Node) *format.FieldRepetitionType {
	switch {
	case .Required():
		return &repetitionTypes[format.Required]
	case .Optional():
		return &repetitionTypes[format.Optional]
	case .Repeated():
		return &repetitionTypes[format.Repeated]
	default:
		return nil
	}
}

func fieldRepetitionTypeOf( Node) format.FieldRepetitionType {
	switch {
	case .Optional():
		return format.Optional
	case .Repeated():
		return format.Repeated
	default:
		return format.Required
	}
}

func applyFieldRepetitionType( format.FieldRepetitionType, ,  byte) (byte, byte) {
	switch  {
	case format.Optional:
		++
	case format.Repeated:
		++
		++
	}
	return , 
}

type Group map[string]Node

func ( Group) () int { return 0 }

func ( Group) () string { return sprint("", ) }

func ( Group) () Type { return groupType{} }

func ( Group) () bool { return false }

func ( Group) () bool { return false }

func ( Group) () bool { return true }

func ( Group) () bool { return false }

func ( Group) () []Field {
	 := make([]groupField, 0, len())
	for ,  := range  {
		 = append(, groupField{
			Node: ,
			name: ,
		})
	}
	sort.Slice(, func(,  int) bool {
		return [].name < [].name
	})
	 := make([]Field, len())
	for  := range  {
		[] = &[]
	}
	return 
}

func ( Group) () encoding.Encoding { return nil }

func ( Group) () compress.Codec { return nil }

func ( Group) () reflect.Type { return goTypeOfGroup() }

type groupField struct {
	Node
	name string
}

func ( *groupField) () string { return .name }

func ( *groupField) ( reflect.Value) reflect.Value {
	if .Kind() == reflect.Interface {
		if .IsNil() {
			return reflect.ValueOf(nil)
		}
		if  = .Elem(); .Kind() == reflect.Pointer && .IsNil() {
			return reflect.ValueOf(nil)
		}
	}
	return .MapIndex(reflect.ValueOf(&.name).Elem())
}

func goTypeOf( Node) reflect.Type {
	switch {
	case .Optional():
		return goTypeOfOptional()
	case .Repeated():
		return goTypeOfRepeated()
	default:
		return goTypeOfRequired()
	}
}

func goTypeOfOptional( Node) reflect.Type {
	return reflect.PtrTo(goTypeOfRequired())
}

func goTypeOfRepeated( Node) reflect.Type {
	return reflect.SliceOf(goTypeOfRequired())
}

func goTypeOfRequired( Node) reflect.Type {
	if .Leaf() {
		return goTypeOfLeaf()
	} else {
		return goTypeOfGroup()
	}
}

func goTypeOfLeaf( Node) reflect.Type {
	 := .Type()
	if ,  := .(interface{ () reflect.Type });  {
		return .()
	}
	switch .Kind() {
	case Boolean:
		return reflect.TypeOf(false)
	case Int32:
		return reflect.TypeOf(int32(0))
	case Int64:
		return reflect.TypeOf(int64(0))
	case Int96:
		return reflect.TypeOf(deprecated.Int96{})
	case Float:
		return reflect.TypeOf(float32(0))
	case Double:
		return reflect.TypeOf(float64(0))
	case ByteArray:
		return reflect.TypeOf(([]byte)(nil))
	case FixedLenByteArray:
		return reflect.ArrayOf(.Length(), reflect.TypeOf(byte(0)))
	default:
		panic("BUG: parquet type returned an unsupported kind")
	}
}

func goTypeOfGroup( Node) reflect.Type {
	 := .Fields()
	 := make([]reflect.StructField, len())
	for ,  := range  {
		[].Name = exportedStructFieldName(.Name())
		[].Type = .GoType()
		// TODO: can we reconstruct a struct tag that would be valid if a value
		// of this type were passed to SchemaOf?
	}
	return reflect.StructOf()
}

func exportedStructFieldName( string) string {
	,  := utf8.DecodeRuneInString()
	return string([]rune{unicode.ToUpper()}) + [:]
}

func isList( Node) bool {
	 := .Type().LogicalType()
	return  != nil && .List != nil
}

func isMap( Node) bool {
	 := .Type().LogicalType()
	return  != nil && .Map != nil
}

func numLeafColumnsOf( Node) int16 {
	return makeColumnIndex(numLeafColumns(, 0))
}

func numLeafColumns( Node,  int) int {
	if .Leaf() {
		return  + 1
	}
	for ,  := range .Fields() {
		 = (, )
	}
	return 
}

func listElementOf( Node) Node {
	if !.Leaf() {
		if  := fieldByName(, "list");  != nil {
			if  := fieldByName(, "element");  != nil {
				return 
			}
			// TODO: It should not be named "item", but some versions of pyarrow
			//       and some versions of polars used that instead of "element".
			//       https://issues.apache.org/jira/browse/ARROW-11497
			//       https://github.com/pola-rs/polars/issues/17100
			if  := fieldByName(, "item");  != nil {
				return 
			}
		}
	}
	panic("node with logical type LIST is not composed of a repeated .list.element")
}

func mapKeyValueOf( Node) Node {
	if !.Leaf() && (.Required() || .Optional()) {
		for ,  := range []string{"key_value", "map"} {
			if  := fieldByName(, );  != nil && !.Leaf() && .Repeated() {
				 := fieldByName(, "key")
				 := fieldByName(, "value")
				if  != nil &&  != nil && .Required() {
					return 
				}
			}
		}
	}
	panic("node with logical type MAP is not composed of a repeated .key_value group (or .map group) with key and value fields")
}

func encodingOf( Node) encoding.Encoding {
	 := .Encoding()
	// The parquet-format documentation states that the
	// DELTA_LENGTH_BYTE_ARRAY is always preferred to PLAIN when
	// encoding BYTE_ARRAY values. We apply it as a default if
	// none were explicitly specified, which gives the application
	// the opportunity to override this behavior if needed.
	//
	// https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-length-byte-array-delta_length_byte_array--6
	if .Type().Kind() == ByteArray &&  == nil {
		 = &DeltaLengthByteArray
	}
	if  == nil {
		 = &Plain
	}
	return 
}

func forEachNodeOf( string,  Node,  func(string, Node)) {
	(, )

	for ,  := range .Fields() {
		(.Name(), , )
	}
}

func fieldByName( Node,  string) Field {
	for ,  := range .Fields() {
		if .Name() ==  {
			return 
		}
	}
	return nil
}

func nodesAreEqual(,  Node) bool {
	if .Leaf() {
		return .Leaf() && leafNodesAreEqual(, )
	} else {
		return !.Leaf() && groupNodesAreEqual(, )
	}
}

func typesAreEqual(,  Type) bool {
	return .Kind() == .Kind() &&
		.Length() == .Length() &&
		reflect.DeepEqual(.LogicalType(), .LogicalType())
}

func repetitionsAreEqual(,  Node) bool {
	return .Optional() == .Optional() && .Repeated() == .Repeated()
}

func leafNodesAreEqual(,  Node) bool {
	return typesAreEqual(.Type(), .Type()) && repetitionsAreEqual(, )
}

func groupNodesAreEqual(,  Node) bool {
	 := .Fields()
	 := .Fields()

	if len() != len() {
		return false
	}

	if !repetitionsAreEqual(, ) {
		return false
	}

	for  := range  {
		 := []
		 := []

		if .Name() != .Name() {
			return false
		}

		if !nodesAreEqual(, ) {
			return false
		}
	}

	return true
}