package avro

import (
	"errors"
	"fmt"
	"sync"
)

// recursionError is a placeholder stored in the compatibility cache while a
// reader/writer pair is still being evaluated. Finding it in the cache means
// the same pair was reached again through a recursive schema.
type recursionError struct{}

// Error implements the error interface. The message is empty because the
// value is only ever recognised by its type, never reported to a caller.
func (e recursionError) Error() string {
	return ""
}

// compatKey identifies a reader/writer schema pair in the compatibility cache.
type compatKey struct {
	// reader is the fingerprint of the reader schema.
	reader [32]byte
	// writer is the fingerprint of the writer schema.
	writer [32]byte
}

// SchemaCompatibility determines the compatibility of schemas.
//
// The outcome of every reader/writer pair that has been checked is kept in
// cache, keyed by the fingerprints of the two schemas.
type SchemaCompatibility struct {
	// cache holds, per compatKey, either nil (compatible), the error that made
	// the pair incompatible, or a recursionError placeholder while the pair is
	// still being evaluated.
	cache sync.Map // map[compatKey]error
}

// NewSchemaCompatibility creates a new schema compatibility instance.
func () *SchemaCompatibility {
	return &SchemaCompatibility{}
}

// Compatible determines the compatibility if the reader and writer schemas.
func ( *SchemaCompatibility) (,  Schema) error {
	return .compatible(, )
}

func ( *SchemaCompatibility) (,  Schema) error {
	 := compatKey{reader: .Fingerprint(), writer: .Fingerprint()}
	if ,  := .cache.Load();  {
		if ,  := .(recursionError);  {
			// Break the recursion here.
			return nil
		}

		if  == nil {
			return nil
		}

		return .(error)
	}

	.cache.Store(, recursionError{})
	 := .match(, )
	if  != nil {
		// We dont want to pay the cost of fmt.Errorf every time
		 = errors.New(.Error())
	}
	.cache.Store(, )
	return 
}

func ( *SchemaCompatibility) (,  Schema) error {
	// If the schema is a reference, get the actual schema
	if .Type() == Ref {
		 = .(*RefSchema).Schema()
	}
	if .Type() == Ref {
		 = .(*RefSchema).Schema()
	}

	if .Type() != .Type() {
		if .Type() == Union {
			// Reader must be compatible with all types in writer
			for ,  := range .(*UnionSchema).Types() {
				if  := .compatible(, );  != nil {
					return 
				}
			}

			return nil
		}

		if .Type() == Union {
			// Writer must be compatible with at least one reader schema
			var  error
			for ,  := range .(*UnionSchema).Types() {
				 = .compatible(, )
				if  == nil {
					return nil
				}
			}

			return fmt.Errorf("reader union lacking writer schema %s", .Type())
		}

		switch .Type() {
		case Int:
			if .Type() == Long || .Type() == Float || .Type() == Double {
				return nil
			}

		case Long:
			if .Type() == Float || .Type() == Double {
				return nil
			}

		case Float:
			if .Type() == Double {
				return nil
			}

		case String:
			if .Type() == Bytes {
				return nil
			}

		case Bytes:
			if .Type() == String {
				return nil
			}
		}

		return fmt.Errorf("reader schema %s not compatible with writer schema %s", .Type(), .Type())
	}

	switch .Type() {
	case Array:
		return .compatible(.(*ArraySchema).Items(), .(*ArraySchema).Items())

	case Map:
		return .compatible(.(*MapSchema).Values(), .(*MapSchema).Values())

	case Fixed:
		 := .(*FixedSchema)
		 := .(*FixedSchema)

		if  := .checkSchemaName(, );  != nil {
			return 
		}

		if  := .checkFixedSize(, );  != nil {
			return 
		}

	case Enum:
		 := .(*EnumSchema)
		 := .(*EnumSchema)

		if  := .checkSchemaName(, );  != nil {
			return 
		}

		if  := .checkEnumSymbols(, );  != nil {
			if .HasDefault() {
				return nil
			}
			return 
		}

	case Record:
		 := .(*RecordSchema)
		 := .(*RecordSchema)

		if  := .checkSchemaName(, );  != nil {
			return 
		}

		if  := .checkRecordFields(, );  != nil {
			return 
		}

	case Union:
		for ,  := range .(*UnionSchema).Types() {
			if  := .compatible(, );  != nil {
				return 
			}
		}
	}

	return nil
}

func ( *SchemaCompatibility) (,  NamedSchema) error {
	if .Name() != .Name() {
		if .contains(.Aliases(), .FullName()) {
			return nil
		}
		return fmt.Errorf("reader schema %s and writer schema %s  names do not match", .FullName(), .FullName())
	}

	return nil
}

func ( *SchemaCompatibility) (,  *FixedSchema) error {
	if .Size() != .Size() {
		return fmt.Errorf("%s reader and writer fixed sizes do not match", .FullName())
	}

	return nil
}

func ( *SchemaCompatibility) (,  *EnumSchema) error {
	for ,  := range .Symbols() {
		if !.contains(.Symbols(), ) {
			return fmt.Errorf("reader %s is missing symbol %s", .FullName(), )
		}
	}

	return nil
}

func ( *SchemaCompatibility) (,  *RecordSchema) error {
	for ,  := range .Fields() {
		,  := .getField(.Fields(), , func( *getFieldOptions) {
			.fieldAlias = true
		})
		if ! {
			if .HasDefault() {
				continue
			}

			return fmt.Errorf("reader field %s is missing in writer schema and has no default", .Name())
		}

		if  := .compatible(.Type(), .Type());  != nil {
			return 
		}
	}

	return nil
}

func ( *SchemaCompatibility) ( []string,  string) bool {
	for ,  := range  {
		if  ==  {
			return true
		}
	}

	return false
}

// getFieldOptions selects which alias lookups getField performs in addition
// to the plain name comparison. Both lookups are off by default.
type getFieldOptions struct {
	// fieldAlias enables the first alias lookup in getField.
	fieldAlias bool
	// elemAlias enables the second alias lookup in getField.
	elemAlias  bool
}

func ( *SchemaCompatibility) ( []*Field,  *Field,  ...func(*getFieldOptions)) (*Field, bool) {
	 := getFieldOptions{}
	for ,  := range  {
		(&)
	}
	for ,  := range  {
		if .Name() == .Name() {
			return , true
		}
		if .fieldAlias {
			if .contains(.Aliases(), .Name()) {
				return , true
			}
		}
		if .elemAlias {
			if .contains(.Aliases(), .Name()) {
				return , true
			}
		}
	}

	return nil, false
}

// Resolve returns a composite schema that allows decoding data written by the writer schema,
// and makes necessary adjustments to support the reader schema.
//
// It fails if the writer and reader schemas are not compatible.
func ( *SchemaCompatibility) (,  Schema) (Schema, error) {
	if  := .compatible(, );  != nil {
		return nil, 
	}

	, ,  := .resolve(, )
	return , 
}

// resolve requires the reader's schema to be already compatible with the writer's.
func ( *SchemaCompatibility) (,  Schema) ( Schema,  bool,  error) {
	if .Type() == Ref {
		 = .(*RefSchema).Schema()
	}
	if .Type() == Ref {
		 = .(*RefSchema).Schema()
	}

	if .Type() != .Type() {
		if .Type() == Union {
			for ,  := range .(*UnionSchema).Types() {
				// Compatibility is not guaranteed for every Union reader schema.
				// Therefore, we need to check compatibility in every iteration.
				if  := .compatible(, );  != nil {
					continue
				}
				, ,  := .(, )
				if  != nil {
					continue
				}
				return , true, nil
			}

			return nil, false, fmt.Errorf("reader union lacking writer schema %s", .Type())
		}

		if .Type() == Union {
			 := make([]Schema, 0)
			for ,  := range .(*UnionSchema).Types() {
				, ,  := .(, )
				if  != nil {
					return nil, false, 
				}
				 = append(, )
			}
			,  := NewUnionSchema(, withWriterFingerprint(.Fingerprint()))
			return , true, 
		}

		if isPromotable(.Type(), .Type()) {
			 := NewPrimitiveSchema(.Type(), .(*PrimitiveSchema).Logical(),
				withWriterFingerprint(.Fingerprint()),
			)
			.encodedType = .Type()
			return , true, nil
		}

		return nil, false, fmt.Errorf("failed to resolve composite schema for %s and %s", .Type(), .Type())
	}

	if isNative(.Type()) {
		return , false, nil
	}

	if .Type() == Enum {
		 := .(*EnumSchema)
		 := .(*EnumSchema)
		if  = .checkEnumSymbols(, );  != nil {
			if .HasDefault() {
				,  := NewEnumSchema(.Name(), .Namespace(), .Symbols(),
					WithAliases(.Aliases()),
					WithDefault(.Default()),
					withWriterFingerprint(.Fingerprint()),
				)
				.encodedSymbols = .Symbols()
				return , true, nil
			}

			return nil, false, 
		}
		return , false, nil
	}

	if .Type() == Fixed {
		return , false, nil
	}

	if .Type() == Union {
		 := make([]Schema, 0)
		for ,  := range .(*UnionSchema).Types() {
			, ,  := .(, )
			if  != nil {
				return nil, false, 
			}
			 = append(, )
			 =  || 
		}
		,  := NewUnionSchema(, withWriterFingerprintIfResolved(.Fingerprint(), ))
		if  != nil {
			return nil, false, 
		}
		return , , nil
	}

	if .Type() == Array {
		, ,  = .(.(*ArraySchema).Items(), .(*ArraySchema).Items())
		if  != nil {
			return nil, false, 
		}
		return NewArraySchema(, withWriterFingerprintIfResolved(.Fingerprint(), )), , nil
	}

	if .Type() == Map {
		, ,  = .(.(*MapSchema).Values(), .(*MapSchema).Values())
		if  != nil {
			return nil, false, 
		}
		return NewMapSchema(, withWriterFingerprintIfResolved(.Fingerprint(), )), , nil
	}

	if .Type() == Record {
		return .resolveRecord(, )
	}

	return nil, false, fmt.Errorf("failed to resolve composite schema for %s and %s", .Type(), .Type())
}

func ( *SchemaCompatibility) (,  Schema) (Schema, bool, error) {
	 := .(*RecordSchema)
	 := .(*RecordSchema)

	 := make([]*Field, 0)
	 := make(map[string]struct{})

	var  bool
	for ,  := range .Fields() {
		,  := .getField(.Fields(), , func( *getFieldOptions) {
			.elemAlias = true
		})
		if ! {
			// The field was not found in the reader schema, it should be ignored.
			,  := NewField(.Name(), .Type(), WithAliases(.aliases), WithOrder(.order))
			.def = .def
			.hasDef = .hasDef
			.action = FieldIgnore
			 = append(, )

			 = true
			continue
		}

		, ,  := .resolve(.Type(), .Type())
		if  != nil {
			return nil, false, 
		}
		,  := NewField(.Name(), , WithAliases(.aliases), WithOrder(.order))
		.def = .def
		.hasDef = .hasDef
		 = append(, )
		 =  || 

		[.Name()] = struct{}{}
	}

	for ,  := range .Fields() {
		if ,  := [.Name()];  {
			// This field has already been seen.
			continue
		}

		// The schemas are already known to be compatible, so there must be a default on
		// the field in the writer. Use the default.

		,  := NewField(.Name(), .Type(), WithAliases(.aliases), WithOrder(.order))
		.def = .def
		.hasDef = .hasDef
		.action = FieldSetDefault
		 = append(, )

		 = true
	}

	,  := NewRecordSchema(.Name(), .Namespace(), ,
		WithAliases(.Aliases()),
		withWriterFingerprintIfResolved(.Fingerprint(), ),
	)
	return , , 
}

// isNative reports whether typ is one of the primitive types: null, boolean,
// int, long, float, double, bytes or string.
func isNative(typ Type) bool {
	switch typ {
	case Null, Boolean, Int, Long, Float, Double, Bytes, String:
		return true
	default:
		return false
	}
}

func isPromotable(,  Type) bool {
	switch  {
	case Int:
		return  == Long ||  == Float ||  == Double
	case Long:
		return  == Float ||  == Double
	case Float:
		return  == Double
	case String:
		return  == Bytes
	case Bytes:
		return  == String
	default:
		return false
	}
}