// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package iceberg

import (
	
	
	
	
	

	
	
)

// Schema is an Iceberg table schema, represented as a struct with
// multiple fields. The fields are only exported via accessor methods
// rather than exposing the slice directly in order to ensure a schema
// as immutable.
type Schema struct {
	ID                 int   `json:"schema-id"`
	IdentifierFieldIDs []int `json:"identifier-field-ids"`

	fields []NestedField

	// the following maps are lazily populated as needed.
	// rather than have lock contention with a mutex, we can use
	// atomic pointers to Store/Load the values.
	idToName      atomic.Pointer[map[int]string]
	idToField     atomic.Pointer[map[int]NestedField]
	nameToID      atomic.Pointer[map[string]int]
	nameToIDLower atomic.Pointer[map[string]int]
}

// NewSchema constructs a new schema with the provided ID
// and list of fields.
func ( int,  ...NestedField) *Schema {
	return NewSchemaWithIdentifiers(, []int{}, ...)
}

// NewSchemaWithIdentifiers constructs a new schema with the provided ID
// and fields, along with a slice of field IDs to be listed as identifier
// fields.
func ( int,  []int,  ...NestedField) *Schema {
	return &Schema{ID: , fields: , IdentifierFieldIDs: }
}

func ( *Schema) () string {
	var  strings.Builder
	.WriteString("table {")
	for ,  := range .fields {
		.WriteString("\n\t")
		.WriteString(.String())
	}
	.WriteString("\n}")
	return .String()
}

func ( *Schema) () (map[string]int, error) {
	 := .nameToID.Load()
	if  != nil {
		return *, nil
	}

	,  := IndexByName()
	if  != nil {
		return nil, 
	}

	.nameToID.Store(&)
	return , nil
}

func ( *Schema) () (map[int]NestedField, error) {
	 := .idToField.Load()
	if  != nil {
		return *, nil
	}

	,  := IndexByID()
	if  != nil {
		return nil, 
	}

	.idToField.Store(&)
	return , nil
}

func ( *Schema) () (map[int]string, error) {
	 := .idToName.Load()
	if  != nil {
		return *, nil
	}

	,  := IndexNameByID()
	if  != nil {
		return nil, 
	}

	.idToName.Store(&)
	return , nil
}

func ( *Schema) () (map[string]int, error) {
	 := .nameToIDLower.Load()
	if  != nil {
		return *, nil
	}

	,  := .lazyNameToID()
	if  != nil {
		return nil, 
	}

	 := make(map[string]int)
	for ,  := range  {
		[strings.ToLower()] = 
	}

	.nameToIDLower.Store(&)
	return , nil
}

func ( *Schema) () string { return "struct" }

// AsStruct returns a Struct with the same fields as the schema which can
// then be used as a Type.
func ( *Schema) () StructType    { return StructType{FieldList: .fields} }
func ( *Schema) () int          { return len(.fields) }
func ( *Schema) ( int) NestedField { return .fields[] }
func ( *Schema) () []NestedField   { return slices.Clone(.fields) }

func ( *Schema) ( []byte) error {
	type  Schema
	 := struct {
		 []NestedField `json:"fields"`
		*
	}{: (*)()}

	if  := json.Unmarshal(, &);  != nil {
		return 
	}

	.fields = .
	if .IdentifierFieldIDs == nil {
		.IdentifierFieldIDs = []int{}
	}
	return nil
}

func ( *Schema) () ([]byte, error) {
	if .IdentifierFieldIDs == nil {
		.IdentifierFieldIDs = []int{}
	}

	type  Schema
	return json.Marshal(struct {
		   string        `json:"type"`
		 []NestedField `json:"fields"`
		*
	}{: "struct", : .fields, : (*)()})
}

// FindColumnName returns the name of the column identified by the
// passed in field id. The second return value reports whether or
// not the field id was found in the schema.
func ( *Schema) ( int) (string, bool) {
	,  := .lazyIDToName()
	,  := []
	return , 
}

// FindFieldByName returns the field identified by the name given,
// the second return value will be false if no field by this name
// is found.
//
// Note: This search is done in a case sensitive manner. To perform
// a case insensitive search, use [*Schema.FindFieldByNameCaseInsensitive].
func ( *Schema) ( string) (NestedField, bool) {
	,  := .lazyNameToID()

	,  := []
	if ! {
		return NestedField{}, false
	}

	return .FindFieldByID()
}

// FindFieldByNameCaseInsensitive is like [*Schema.FindFieldByName],
// but performs a case insensitive search.
func ( *Schema) ( string) (NestedField, bool) {
	,  := .lazyNameToIDLower()

	,  := [strings.ToLower()]
	if ! {
		return NestedField{}, false
	}

	return .FindFieldByID()
}

// FindFieldByID is like [*Schema.FindColumnName], but returns the whole
// field rather than just the field name.
func ( *Schema) ( int) (NestedField, bool) {
	,  := .lazyIDToField()
	,  := []
	return , 
}

// FindTypeByID is like [*Schema.FindFieldByID], but returns only the data
// type of the field.
func ( *Schema) ( int) (Type, bool) {
	,  := .FindFieldByID()
	if ! {
		return nil, false
	}

	return .Type, true
}

// FindTypeByName is a convenience function for calling [*Schema.FindFieldByName],
// and then returning just the type.
func ( *Schema) ( string) (Type, bool) {
	,  := .FindFieldByName()
	if ! {
		return nil, false
	}

	return .Type, true
}

// FindTypeByNameCaseInsensitive is like [*Schema.FindTypeByName] but
// performs a case insensitive search.
func ( *Schema) ( string) (Type, bool) {
	,  := .FindFieldByNameCaseInsensitive()
	if ! {
		return nil, false
	}

	return .Type, true
}

// Equals compares the fields and identifierIDs, but does not compare
// the schema ID itself.
func ( *Schema) ( *Schema) bool {
	if  == nil {
		return false
	}

	if  ==  {
		return true
	}

	if len(.fields) != len(.fields) {
		return false
	}

	if !slices.Equal(.IdentifierFieldIDs, .IdentifierFieldIDs) {
		return false
	}

	return slices.EqualFunc(.fields, .fields, func(,  NestedField) bool {
		return .Equals()
	})
}

// Merge combines two schemas into a single schema. It returns a schema with an ID that is one greater thatn the ID of the first schema.
// If the two schemas have the same fields, the first schema is returned.
func ( *Schema) ( *Schema) (*Schema, error) {
	if .Equals() {
		return , nil
	}

	if  == nil {
		return , nil
	}

	 := .fields
	for ,  := range .fields {
		if ,  := .FindFieldByName(.Name); ! {
			 = append(, )
		}
	}

	// Sort the schema by name
	sort.Slice(, func(,  int) bool {
		return [].Name < [].Name
	})

	for  := range  {
		[].ID = 
	}

	return NewSchemaWithIdentifiers(.ID+1, []int{}, ...), nil
}

// HighestFieldID returns the value of the numerically highest field ID
// in this schema.
func ( *Schema) () int {
	,  := Visit[int](, findLastFieldID{})
	return 
}

type Void = struct{}

var void = Void{}

// Select creates a new schema with just the fields identified by name
// passed in the order they are provided. If caseSensitive is false,
// then fields will be identified by case insensitive search.
//
// An error is returned if a requested name cannot be found.
func ( *Schema) ( bool,  ...string) (*Schema, error) {
	 := make(map[int]Void)
	if  {
		,  := .lazyNameToID()
		for ,  := range  {
			,  := []
			if ! {
				return nil, fmt.Errorf("%w: could not find column %s", ErrInvalidSchema, )
			}
			[] = void
		}
	} else {
		,  := .lazyNameToIDLower()
		for ,  := range  {
			,  := [strings.ToLower()]
			if ! {
				return nil, fmt.Errorf("%w: could not find column %s", ErrInvalidSchema, )
			}
			[] = void
		}
	}

	return PruneColumns(, , true)
}

// SchemaVisitor is an interface that can be implemented to allow for
// easy traversal and processing of a schema.
//
// A SchemaVisitor can also optionally implement the Before/After Field,
// ListElement, MapKey, or MapValue interfaces to allow them to get called
// at the appropriate points within schema traversal.
type SchemaVisitor[ any] interface {
	Schema(schema *Schema, structResult ) 
	Struct(st StructType, fieldResults []) 
	Field(field NestedField, fieldResult ) 
	List(list ListType, elemResult ) 
	Map(mapType MapType, keyResult, valueResult ) 
	Primitive(p PrimitiveType) 
}

type BeforeFieldVisitor interface {
	BeforeField(field NestedField)
}

type AfterFieldVisitor interface {
	AfterField(field NestedField)
}

type BeforeListElementVisitor interface {
	BeforeListElement(elem NestedField)
}

type AfterListElementVisitor interface {
	AfterListElement(elem NestedField)
}

type BeforeMapKeyVisitor interface {
	BeforeMapKey(key NestedField)
}

type AfterMapKeyVisitor interface {
	AfterMapKey(key NestedField)
}

type BeforeMapValueVisitor interface {
	BeforeMapValue(value NestedField)
}

type AfterMapValueVisitor interface {
	AfterMapValue(value NestedField)
}

// Visit accepts a visitor and performs a post-order traversal of the given schema.
func [ any]( *Schema,  SchemaVisitor[]) ( ,  error) {
	if  == nil {
		 = fmt.Errorf("%w: cannot visit nil schema", ErrInvalidArgument)
		return
	}

	defer func() {
		if  := recover();  != nil {
			switch e := .(type) {
			case string:
				 = fmt.Errorf("error encountered during schema visitor: %s", )
			case error:
				 = fmt.Errorf("error encountered during schema visitor: %w", )
			}
		}
	}()

	return .Schema(, visitStruct(.AsStruct(), )), nil
}

func visitStruct[ any]( StructType,  SchemaVisitor[])  {
	 := make([], len(.FieldList))

	,  := .(BeforeFieldVisitor)
	,  := .(AfterFieldVisitor)

	for ,  := range .FieldList {
		if  != nil {
			.BeforeField()
		}

		 := visitField(, )

		if  != nil {
			.AfterField()
		}

		[] = .Field(, )
	}

	return .Struct(, )
}

func visitList[ any]( ListType,  SchemaVisitor[])  {
	 := .ElementField()

	if ,  := .(BeforeListElementVisitor);  {
		.BeforeListElement()
	} else if ,  := .(BeforeFieldVisitor);  {
		.BeforeField()
	}

	 := visitField(, )

	if ,  := .(AfterListElementVisitor);  {
		.AfterListElement()
	} else if ,  := .(AfterFieldVisitor);  {
		.AfterField()
	}

	return .List(, )
}

func visitMap[ any]( MapType,  SchemaVisitor[])  {
	,  := .KeyField(), .ValueField()

	if ,  := .(BeforeMapKeyVisitor);  {
		.BeforeMapKey()
	} else if ,  := .(BeforeFieldVisitor);  {
		.BeforeField()
	}

	 := visitField(, )

	if ,  := .(AfterMapKeyVisitor);  {
		.AfterMapKey()
	} else if ,  := .(AfterFieldVisitor);  {
		.AfterField()
	}

	if ,  := .(BeforeMapValueVisitor);  {
		.BeforeMapValue()
	} else if ,  := .(BeforeFieldVisitor);  {
		.BeforeField()
	}

	 := visitField(, )

	if ,  := .(AfterMapValueVisitor);  {
		.AfterMapValue()
	} else if ,  := .(AfterFieldVisitor);  {
		.AfterField()
	}

	return .Map(, , )
}

func visitField[ any]( NestedField,  SchemaVisitor[])  {
	switch typ := .Type.(type) {
	case *StructType:
		return visitStruct(*, )
	case *ListType:
		return visitList(*, )
	case *MapType:
		return visitMap(*, )
	default: // primitive
		return .Primitive(.(PrimitiveType))
	}
}

// IndexByID performs a post-order traversal of the given schema and
// returns a mapping from field ID to field.
func ( *Schema) (map[int]NestedField, error) {
	return Visit[map[int]NestedField](, &indexByID{index: make(map[int]NestedField)})
}

type indexByID struct {
	index map[int]NestedField
}

func ( *indexByID) (*Schema, map[int]NestedField) map[int]NestedField {
	return .index
}

func ( *indexByID) (StructType, []map[int]NestedField) map[int]NestedField {
	return .index
}

func ( *indexByID) ( NestedField,  map[int]NestedField) map[int]NestedField {
	.index[.ID] = 
	return .index
}

func ( *indexByID) ( ListType,  map[int]NestedField) map[int]NestedField {
	.index[.ElementID] = .ElementField()
	return .index
}

func ( *indexByID) ( MapType, ,  map[int]NestedField) map[int]NestedField {
	.index[.KeyID] = .KeyField()
	.index[.ValueID] = .ValueField()
	return .index
}

func ( *indexByID) (PrimitiveType) map[int]NestedField {
	return .index
}

// IndexByName performs a post-order traversal of the schema and returns
// a mapping from field name to field ID.
func ( *Schema) (map[string]int, error) {
	if  == nil {
		return nil, fmt.Errorf("%w: cannot index nil schema", ErrInvalidArgument)
	}

	if len(.fields) > 0 {
		 := &indexByName{
			index:           make(map[string]int),
			shortNameId:     make(map[string]int),
			fieldNames:      make([]string, 0),
			shortFieldNames: make([]string, 0),
		}
		if ,  := Visit[map[string]int](, );  != nil {
			return nil, 
		}

		return .ByName(), nil
	}
	return map[string]int{}, nil
}

// IndexNameByID performs a post-order traversal of the schema and returns
// a mapping from field ID to field name.
func ( *Schema) (map[int]string, error) {
	 := &indexByName{
		index:           make(map[string]int),
		shortNameId:     make(map[string]int),
		fieldNames:      make([]string, 0),
		shortFieldNames: make([]string, 0),
	}
	if ,  := Visit[map[string]int](, );  != nil {
		return nil, 
	}
	return .ByID(), nil
}

type indexByName struct {
	index           map[string]int
	shortNameId     map[string]int
	combinedIndex   map[string]int
	fieldNames      []string
	shortFieldNames []string
}

func ( *indexByName) () map[int]string {
	 := make(map[int]string)
	for ,  := range .index {
		[] = 
	}
	return 
}

func ( *indexByName) () map[string]int {
	.combinedIndex = maps.Clone(.shortNameId)
	maps.Copy(.combinedIndex, .index)
	return .combinedIndex
}

func ( *indexByName) (PrimitiveType) map[string]int { return .index }
func ( *indexByName) ( string,  int) {
	 := 
	if len(.fieldNames) > 0 {
		 = strings.Join(.fieldNames, ".") + "." + 
	}

	if ,  := .index[];  {
		panic(fmt.Errorf("%w: multiple fields for name %s: %d and %d",
			ErrInvalidSchema, , .index[], ))
	}

	.index[] = 
	if len(.shortFieldNames) > 0 {
		 := strings.Join(.shortFieldNames, ".") + "." + 
		.shortNameId[] = 
	}
}

func ( *indexByName) (*Schema, map[string]int) map[string]int {
	return .index
}

func ( *indexByName) (StructType, []map[string]int) map[string]int {
	return .index
}

func ( *indexByName) ( NestedField,  map[string]int) map[string]int {
	.addField(.Name, .ID)
	return .index
}

func ( *indexByName) ( ListType,  map[string]int) map[string]int {
	.addField(.ElementField().Name, .ElementID)
	return .index
}

func ( *indexByName) ( MapType, ,  map[string]int) map[string]int {
	.addField(.KeyField().Name, .KeyID)
	.addField(.ValueField().Name, .ValueID)
	return .index
}

func ( *indexByName) ( NestedField) {
	if ,  := .Type.(*StructType); ! {
		.shortFieldNames = append(.shortFieldNames, .Name)
	}
	.fieldNames = append(.fieldNames, .Name)
}

func ( *indexByName) ( NestedField) {
	if ,  := .Type.(*StructType); ! {
		.shortFieldNames = .shortFieldNames[:len(.shortFieldNames)-1]
	}
	.fieldNames = .fieldNames[:len(.fieldNames)-1]
}

func ( *indexByName) ( NestedField) {
	.fieldNames = append(.fieldNames, .Name)
	.shortFieldNames = append(.shortFieldNames, .Name)
}

func ( *indexByName) ( NestedField) {
	.fieldNames = .fieldNames[:len(.fieldNames)-1]
	.shortFieldNames = .shortFieldNames[:len(.shortFieldNames)-1]
}

// PruneColumns visits a schema pruning any columns which do not exist in the
// provided selected set. Parent fields of a selected child will be retained.
func ( *Schema,  map[int]Void,  bool) (*Schema, error) {

	,  := Visit[Type](, &pruneColVisitor{selected: ,
		fullTypes: })
	if  != nil {
		return nil, 
	}

	,  := .(NestedType)
	if ! {
		 = &StructType{}
	}

	 := make([]int, 0, len(.IdentifierFieldIDs))
	for ,  := range .IdentifierFieldIDs {
		if ,  := [];  {
			 = append(, )
		}
	}

	return &Schema{
		fields:             .Fields(),
		ID:                 .ID,
		IdentifierFieldIDs: ,
	}, nil
}

type pruneColVisitor struct {
	selected  map[int]Void
	fullTypes bool
}

func ( *pruneColVisitor) ( *Schema,  Type) Type {
	return 
}

func ( *pruneColVisitor) ( StructType,  []Type) Type {
	,  := []NestedField{}, .FieldList
	 := true

	for ,  := range  {
		 := []
		if .Type ==  {
			 = append(, )
		} else if  != nil {
			 = false
			// type has changed, create a new field with the projected type
			 = append(, NestedField{
				ID:       .ID,
				Name:     .Name,
				Type:     ,
				Doc:      .Doc,
				Required: .Required,
			})
		}
	}

	if len() > 0 {
		if len() == len() &&  {
			// nothing changed, return the original
			return &
		} else {
			return &StructType{FieldList: }
		}
	}

	return nil
}

func ( *pruneColVisitor) ( NestedField,  Type) Type {
	,  := .selected[.ID]
	if ! {
		if  != nil {
			return 
		}

		return nil
	}

	if .fullTypes {
		return .Type
	}

	if ,  := .Type.(*StructType);  {
		return .projectSelectedStruct()
	}

	,  := .Type.(PrimitiveType)
	if ! {
		panic(fmt.Errorf("%w: cannot explicitly project List or Map types, %d:%s of type %s was selected",
			ErrInvalidSchema, .ID, .Name, .Type))
	}
	return 
}

func ( *pruneColVisitor) ( ListType,  Type) Type {
	,  := .selected[.ElementID]
	if ! {
		if  != nil {
			return .projectList(&, )
		}

		return nil
	}

	if .fullTypes {
		return &
	}

	_,  = .Element.(*StructType)
	if .Element != nil &&  {
		 := .projectSelectedStruct()
		return .projectList(&, )
	}

	if _,  = .Element.(PrimitiveType); ! {
		panic(fmt.Errorf("%w: cannot explicitly project List or Map types, %d of type %s was selected",
			ErrInvalidSchema, .ElementID, .Element))
	}

	return &
}

func ( *pruneColVisitor) ( MapType, ,  Type) Type {
	,  := .selected[.ValueID]
	if ! {
		if  != nil {
			return .projectMap(&, )
		}

		if _,  = .selected[.KeyID];  {
			return &
		}

		return nil
	}

	if .fullTypes {
		return &
	}

	_,  = .ValueType.(*StructType)
	if .ValueType != nil &&  {
		 := .projectSelectedStruct()
		return .projectMap(&, )
	}

	if _,  = .ValueType.(PrimitiveType); ! {
		panic(fmt.Errorf("%w: cannot explicitly project List or Map types, Map value %d of type %s was selected",
			ErrInvalidSchema, .ValueID, .ValueType))
	}

	return &
}

func ( *pruneColVisitor) ( PrimitiveType) Type { return nil }

func (*pruneColVisitor) ( Type) *StructType {
	if  == nil {
		return &StructType{}
	}

	if ,  := .(*StructType);  {
		return 
	}

	panic("expected a struct")
}

func (*pruneColVisitor) ( *ListType,  Type) *ListType {
	if .Element.Equals() {
		return 
	}

	return &ListType{ElementID: .ElementID, Element: ,
		ElementRequired: .ElementRequired}
}

func (*pruneColVisitor) ( *MapType,  Type) *MapType {
	if .ValueType.Equals() {
		return 
	}

	return &MapType{
		KeyID:         .KeyID,
		ValueID:       .ValueID,
		KeyType:       .KeyType,
		ValueType:     ,
		ValueRequired: .ValueRequired,
	}
}

type findLastFieldID struct{}

func (findLastFieldID) ( *Schema,  int) int {
	return 
}

func (findLastFieldID) ( StructType,  []int) int {
	return max(...)
}

func (findLastFieldID) ( NestedField,  int) int {
	return max(.ID, )
}

func (findLastFieldID) ( ListType,  int) int { return  }

func (findLastFieldID) ( MapType, ,  int) int {
	return max(, )
}

func (findLastFieldID) (PrimitiveType) int { return 0 }