package avro

import (
	
	
	
	
	
	

	
	jsoniter 
)

// DefaultSchemaCache is the default cache for schemas.
//
// It is the cache passed to ParseBytesWithCache when a byte slice is parsed
// without an explicit cache.
var DefaultSchemaCache = &SchemaCache{}

// SkipNameValidation sets whether to skip name validation.
// The Avro spec imposes a strict naming convention on names and aliases; however, the official Avro tools do not
// follow it.
// More info:
// https://lists.apache.org/thread/39v98os6wdpyr6w31xdkz0yzol51fsrr
// https://github.com/apache/avro/pull/1995
var SkipNameValidation = false

// Parse parses a schema string.
//
// The string is converted to a byte slice and handed to ParseBytes; the
// Schema and error it produces are returned unchanged.
//
// NOTE(review): identifier names are missing from this copy of the block;
// restore them before building.
func ( string) (Schema, error) {
	return ParseBytes([]byte())
}

// ParseWithCache parses a schema string using the given namespace and schema cache.
//
// It is the string counterpart of ParseBytesWithCache: the schema text is
// converted to a byte slice and the call is forwarded with the remaining two
// arguments.
//
// NOTE(review): identifier names are missing from this copy of the block;
// restore them before building.
func (,  string,  *SchemaCache) (Schema, error) {
	return ParseBytesWithCache([]byte(), , )
}

// MustParse parses a schema string, panicking if there is an error.
//
// It calls Parse and panics when the returned error is non-nil; otherwise the
// parsed Schema is returned.
//
// NOTE(review): identifier names are missing from this copy of the block;
// restore them before building.
func ( string) Schema {
	,  := Parse()
	if  != nil {
		panic()
	}

	return 
}

// ParseFiles parses the schemas in the files, in the order they appear, returning the last schema.
//
// This is useful when your schemas rely on other schemas.
//
// Each path is cleaned with filepath.Clean and read with os.ReadFile, and the
// file contents are parsed with ParseBytes. The first read or parse error stops
// processing and is returned together with a nil Schema.
//
// NOTE(review): identifier names are missing from this copy of the block;
// restore them before building.
func ( ...string) (Schema, error) {
	var  Schema
	for ,  := range  {
		// Clean the path before reading the whole file into memory.
		,  := os.ReadFile(filepath.Clean())
		if  != nil {
			return nil, 
		}

		// Plain assignment (not :=): the result variable declared before the
		// loop is overwritten on every iteration.
		,  = ParseBytes()
		if  != nil {
			return nil, 
		}
	}

	return , nil
}

// ParseBytes parses a schema byte slice.
//
// It forwards to ParseBytesWithCache with an empty namespace and
// DefaultSchemaCache.
//
// NOTE(review): identifier names are missing from this copy of the block;
// restore them before building.
func ( []byte) (Schema, error) {
	return ParseBytesWithCache(, "", DefaultSchemaCache)
}

// ParseBytesWithCache parses a schema byte slice using the given namespace and schema cache.
//
// The input is first decoded as JSON; if that fails, the raw text is used as a
// plain string value instead. The value is then parsed with parseType and the
// result is passed through derefSchema before being returned. A parse error is
// returned with a nil Schema.
//
// NOTE(review): identifier names are missing from this copy of the block;
// restore them before building. With the receivers stripped, the direction of
// the two AddAll calls cannot be read from here; presumably the first seeds a
// private working cache from the caller's cache and the second copies the
// working cache back after a successful parse — confirm.
func ( []byte,  string,  *SchemaCache) (Schema, error) {
	var  any
	// Input that is not valid JSON falls back to its raw text as a string.
	if  := jsoniter.Unmarshal(, &);  != nil {
		 = string()
	}

	// A fresh cache is created for this parse run.
	 := &SchemaCache{}
	.AddAll()

	// Tracks names declared during this run so duplicates can be rejected.
	 := seenCache{}
	,  := parseType(, , , )
	if  != nil {
		return nil, 
	}

	// Only reached when parsing succeeded.
	.AddAll()

	return derefSchema(), nil
}

// parseType parses a decoded schema value by dispatching on its dynamic type:
//
//   - nil            -> a NullSchema
//   - string         -> parsePrimitiveType
//   - map[string]any -> parseComplexType
//   - []any          -> parseUnion
//
// Any other dynamic type yields an "avro: unknown type" error.
//
// NOTE(review): identifier names are missing from this copy of the block;
// restore them before building.
func parseType( string,  any,  seenCache,  *SchemaCache) (Schema, error) {
	switch val := .(type) {
	case nil:
		return &NullSchema{}, nil

	case string:
		return parsePrimitiveType(, , )

	case map[string]any:
		return parseComplexType(, , , )

	case []any:
		return parseUnion(, , , )
	}

	return nil, fmt.Errorf("avro: unknown type: %v", )
}

// parsePrimitiveType resolves a schema given as a bare string.
//
// The string is converted to a Type. Null yields a NullSchema; the other
// primitive types are built by parsePrimitive with no extra properties. Any
// other string is looked up in the schema cache under the name produced by
// fullName; a cache miss is reported as an "avro: unknown type" error.
//
// NOTE(review): identifier names are missing from this copy of the block;
// restore them before building.
func parsePrimitiveType(,  string,  *SchemaCache) (Schema, error) {
	 := Type()
	switch  {
	case Null:
		return &NullSchema{}, nil

	case String, Bytes, Int, Long, Float, Double, Boolean:
		return parsePrimitive(, nil)

	default:
		// Not a primitive: look the name up in the cache.
		 := .Get(fullName(, ))
		if  != nil {
			return , nil
		}

		return nil, fmt.Errorf("avro: unknown type: %s", )
	}
}

// parseComplexType parses a schema given as a JSON object, selecting the
// parser from the object's "type" property.
//
// A "type" that is a slice is parsed as a union (see the note below). A "type"
// that is not a string is rejected with an "avro: unknown type" error.
// Otherwise the string is converted to a Type and dispatched: primitives to
// parsePrimitive, Record and Error to parseRecord, and Enum, Array, Map and
// Fixed to their respective parsers. Any other type string is passed back to
// parseType as a plain string.
//
// NOTE(review): identifier names are missing from this copy of the block;
// restore them before building.
func parseComplexType( string,  map[string]any,  seenCache,  *SchemaCache) (Schema, error) {
	if ,  := ["type"].([]any);  {
		// Note: According to the spec, this is not allowed:
		// 		https://avro.apache.org/docs/1.12.0/specification/#schema-declaration
		// The "type" property in an object must be a string. A union type will be a slice,
		// but NOT an object with a "type" property that is a slice.
		// Might be advisable to remove this call (tradeoff between better conformance
		// with the spec vs. possible backwards-compatibility issue).
		return parseUnion(, , , )
	}

	,  := ["type"].(string)
	if ! {
		return nil, fmt.Errorf("avro: unknown type: %+v", )
	}
	 := Type()

	switch  {
	case String, Bytes, Int, Long, Float, Double, Boolean, Null:
		return parsePrimitive(, )

	case Record, Error:
		return parseRecord(, , , , )

	case Enum:
		return parseEnum(, , , )

	case Array:
		return parseArray(, , , )

	case Map:
		return parseMap(, , , )

	case Fixed:
		return parseFixed(, , , )

	default:
		// Not a built-in type keyword: re-enter parseType with the type
		// string.
		return parseType(, string(), , )
	}
}

// primitiveSchema is the mapstructure decode target for a primitive schema
// written as a JSON object.
type primitiveSchema struct {
	// Type is the value of the "type" key.
	Type  string         `mapstructure:"type"`
	// Props is tagged ",remain", which in mapstructure collects the keys not
	// matched by another field.
	Props map[string]any `mapstructure:",remain"`
}

// parsePrimitive builds the schema for a primitive type, optionally described
// by a JSON object carrying extra properties.
//
// With an empty or nil map the result is a bare NullSchema (for Null) or a
// primitive schema without a logical type or properties. Otherwise the map is
// decoded into a primitiveSchema; a decode failure is wrapped as
// "avro: error decoding primitive". If the remaining properties contain a
// string "logicalType", parsePrimitiveLogicalType is consulted, and the
// "logicalType" property is removed only when a logical schema was recognised.
// The remaining properties are attached to the result via WithProps.
//
// NOTE(review): identifier names are missing from this copy of the block;
// restore them before building.
func parsePrimitive( Type,  map[string]any) (Schema, error) {
	// Fast path: nothing to decode.
	if len() == 0 {
		if  == Null {
			return &NullSchema{}, nil
		}
		return NewPrimitiveSchema(, nil), nil
	}

	var (
		    primitiveSchema
		 mapstructure.Metadata
	)
	if  := decodeMap(, &, &);  != nil {
		return nil, fmt.Errorf("avro: error decoding primitive: %w", )
	}

	var  LogicalSchema
	if  := logicalTypeProperty(.Props);  != "" {
		 = parsePrimitiveLogicalType(, , .Props)
		if  != nil {
			// The logical type was consumed, so it is not also kept as a
			// free-form property.
			delete(.Props, "logicalType")
		}
	}

	if  == Null {
		return NewNullSchema(WithProps(.Props)), nil
	}
	return NewPrimitiveSchema(, , WithProps(.Props)), nil
}

// parsePrimitiveLogicalType returns the logical schema for a logical type name
// applied to a primitive type, or nil when the combination is not recognised.
//
// Accepted combinations:
//
//   - String with UUID
//   - Int with Date or TimeMillis
//   - Long with TimeMicros, TimestampMillis, TimestampMicros,
//     LocalTimestampMillis or LocalTimestampMicros
//   - Bytes with Decimal, delegated to parseDecimalLogicalType with size -1
//
// NOTE(review): identifier names are missing from this copy of the block;
// restore them before building.
func parsePrimitiveLogicalType( Type,  string,  map[string]any) LogicalSchema {
	 := LogicalType()
	if ( == String &&  == UUID) ||
		( == Int &&  == Date) ||
		( == Int &&  == TimeMillis) ||
		( == Long &&  == TimeMicros) ||
		( == Long &&  == TimestampMillis) ||
		( == Long &&  == TimestampMicros) ||
		( == Long &&  == LocalTimestampMillis) ||
		( == Long &&  == LocalTimestampMicros) {
		return NewPrimitiveLogicalSchema()
	}

	if  == Bytes &&  == Decimal {
		// -1 is passed as the size argument for the bytes-backed decimal.
		return parseDecimalLogicalType(-1, )
	}

	return nil // otherwise, not a recognized logical type
}

// recordSchema is the mapstructure decode target for a record (or error)
// schema object.
type recordSchema struct {
	Type      string           `mapstructure:"type"`
	Name      string           `mapstructure:"name"`
	Namespace string           `mapstructure:"namespace"`
	Aliases   []string         `mapstructure:"aliases"`
	Doc       string           `mapstructure:"doc"`
	// Fields holds the raw field objects; parseRecord parses each one with
	// parseField.
	Fields    []map[string]any `mapstructure:"fields"`
	// Props is tagged ",remain", which in mapstructure collects the keys not
	// matched by another field.
	Props     map[string]any   `mapstructure:",remain"`
}

// parseRecord parses a record or error schema object.
//
// Steps, in order:
//
//  1. Decode the object into a recordSchema (failure is wrapped as
//     "avro: error decoding record").
//  2. Require a non-empty name; an empty namespace is replaced by a value
//     passed in by the caller.
//  3. Require the "fields" key to be present in the decode metadata.
//  4. Build the schema with NewRecordSchema or NewErrorRecordSchema, depending
//     on the Type argument.
//  5. Record the full name in the seen-cache, failing on a duplicate.
//  6. Register a RefSchema under the full name and every alias.
//  7. Parse each field with parseField, rejecting duplicate field names.
//
// The reference is registered before the fields are parsed, presumably so
// that a field may refer to the enclosing record — confirm. The value returned
// on success is the reference created in step 6.
//
// NOTE(review): identifier names are missing from this copy of the block;
// restore them before building.
func parseRecord( Type,  string,  map[string]any,  seenCache,  *SchemaCache) (Schema, error) {
	var (
		    recordSchema
		 mapstructure.Metadata
	)
	if  := decodeMap(, &, &);  != nil {
		return nil, fmt.Errorf("avro: error decoding record: %w", )
	}

	if  := checkParsedName(.Name);  != nil {
		return nil, 
	}
	if .Namespace == "" {
		.Namespace = 
	}

	// The metadata key list distinguishes a missing "fields" key from an
	// empty array.
	if !hasKey(.Keys, "fields") {
		return nil, errors.New("avro: record must have an array of fields")
	}
	// Sized to the number of raw field objects; appears to be filled by index
	// in the loop at the end.
	 := make([]*Field, len(.Fields))

	var (
		 *RecordSchema
		 error
	)
	switch  {
	case Record:
		,  = NewRecordSchema(.Name, .Namespace, ,
			WithAliases(.Aliases), WithDoc(.Doc), WithProps(.Props),
		)
	case Error:
		,  = NewErrorRecordSchema(.Name, .Namespace, ,
			WithAliases(.Aliases), WithDoc(.Doc), WithProps(.Props),
		)
	}
	if  != nil {
		return nil, 
	}

	// Reject a name that was already declared during this parse run.
	if  = .Add(.FullName());  != nil {
		return nil, 
	}

	 := NewRefSchema()
	.Add(.FullName(), )
	for ,  := range .Aliases() {
		.Add(, )
	}

	// Set of field names seen so far, used for duplicate detection.
	 := make(map[string]struct{})
	for ,  := range .Fields {
		,  := parseField(.namespace, , , )
		if  != nil {
			return nil, 
		}

		if ,  := [.name];  {
			return nil, fmt.Errorf("avro: duplicate field name %q", .name)
		}
		[.name] = struct{}{}

		[] = 
	}

	return , nil
}

// fieldSchema is the mapstructure decode target for a single record field
// object.
type fieldSchema struct {
	Name    string         `mapstructure:"name"`
	Aliases []string       `mapstructure:"aliases"`
	// Type is kept as a raw decoded value and parsed with parseType.
	Type    any            `mapstructure:"type"`
	Doc     string         `mapstructure:"doc"`
	// Default is replaced with NoDefault by parseField when the "default" key
	// is absent from the field object.
	Default any            `mapstructure:"default"`
	Order   Order          `mapstructure:"order"`
	// Props is tagged ",remain", which in mapstructure collects the keys not
	// matched by another field.
	Props   map[string]any `mapstructure:",remain"`
}

// parseField parses one record field object into a *Field.
//
// The object is decoded into a fieldSchema (failure is wrapped as
// "avro: error decoding field"). The name must be non-empty and the "type" key
// must be present; the type value is parsed with parseType. When no "default"
// key was decoded, the default is set to NoDefault so that a missing default
// can be told apart from an explicit one. The field is then built with
// NewField, and any error it reports is returned.
//
// NOTE(review): identifier names are missing from this copy of the block;
// restore them before building.
func parseField( string,  map[string]any,  seenCache,  *SchemaCache) (*Field, error) {
	var (
		    fieldSchema
		 mapstructure.Metadata
	)
	if  := decodeMap(, &, &);  != nil {
		return nil, fmt.Errorf("avro: error decoding field: %w", )
	}

	if  := checkParsedName(.Name);  != nil {
		return nil, 
	}

	if !hasKey(.Keys, "type") {
		return nil, errors.New("avro: field requires a type")
	}
	,  := parseType(, .Type, , )
	if  != nil {
		return nil, 
	}

	// Key presence, not the decoded value, decides whether a default exists.
	if !hasKey(.Keys, "default") {
		.Default = NoDefault
	}

	,  := NewField(.Name, ,
		WithDefault(.Default), WithAliases(.Aliases), WithDoc(.Doc), WithOrder(.Order), WithProps(.Props),
	)
	if  != nil {
		return nil, 
	}

	return , nil
}

// enumSchema is the mapstructure decode target for an enum schema object.
type enumSchema struct {
	Name      string         `mapstructure:"name"`
	Namespace string         `mapstructure:"namespace"`
	Aliases   []string       `mapstructure:"aliases"`
	Type      string         `mapstructure:"type"`
	Doc       string         `mapstructure:"doc"`
	Symbols   []string       `mapstructure:"symbols"`
	Default   string         `mapstructure:"default"`
	// Props is tagged ",remain", which in mapstructure collects the keys not
	// matched by another field.
	Props     map[string]any `mapstructure:",remain"`
}

// parseEnum parses an enum schema object.
//
// The object is decoded into an enumSchema (failure is wrapped as
// "avro: error decoding enum"). The name must be non-empty, and an empty
// namespace is replaced by a value passed in by the caller. The schema is built
// with NewEnumSchema, its full name is recorded in the seen-cache (a duplicate
// is an error), and a RefSchema is registered under the full name and every
// alias. That reference is the value returned on success.
//
// NOTE(review): identifier names are missing from this copy of the block;
// restore them before building.
func parseEnum( string,  map[string]any,  seenCache,  *SchemaCache) (Schema, error) {
	var (
		    enumSchema
		 mapstructure.Metadata
	)
	if  := decodeMap(, &, &);  != nil {
		return nil, fmt.Errorf("avro: error decoding enum: %w", )
	}

	if  := checkParsedName(.Name);  != nil {
		return nil, 
	}
	if .Namespace == "" {
		.Namespace = 
	}

	,  := NewEnumSchema(.Name, .Namespace, .Symbols,
		WithDefault(.Default), WithAliases(.Aliases), WithDoc(.Doc), WithProps(.Props),
	)
	if  != nil {
		return nil, 
	}

	// Reject a name that was already declared during this parse run.
	if  = .Add(.FullName());  != nil {
		return nil, 
	}

	 := NewRefSchema()
	.Add(.FullName(), )
	for ,  := range .Aliases() {
		.Add(, )
	}

	return , nil
}

// arraySchema is the mapstructure decode target for an array schema object.
type arraySchema struct {
	Type  string         `mapstructure:"type"`
	// Items is kept as a raw decoded value and parsed with parseType.
	Items any            `mapstructure:"items"`
	// Props is tagged ",remain", which in mapstructure collects the keys not
	// matched by another field.
	Props map[string]any `mapstructure:",remain"`
}

// parseArray parses an array schema object.
//
// The object is decoded into an arraySchema (failure is wrapped as
// "avro: error decoding array"). The "items" key must be present; its value is
// parsed with parseType and wrapped with NewArraySchema, carrying over the
// remaining properties via WithProps.
//
// NOTE(review): identifier names are missing from this copy of the block;
// restore them before building.
func parseArray( string,  map[string]any,  seenCache,  *SchemaCache) (Schema, error) {
	var (
		    arraySchema
		 mapstructure.Metadata
	)
	if  := decodeMap(, &, &);  != nil {
		return nil, fmt.Errorf("avro: error decoding array: %w", )
	}

	if !hasKey(.Keys, "items") {
		return nil, errors.New("avro: array must have an items key")
	}
	,  := parseType(, .Items, , )
	if  != nil {
		return nil, 
	}

	return NewArraySchema(, WithProps(.Props)), nil
}

// mapSchema is the mapstructure decode target for a map schema object.
type mapSchema struct {
	Type   string         `mapstructure:"type"`
	// Values is kept as a raw decoded value and parsed with parseType.
	Values any            `mapstructure:"values"`
	// Props is tagged ",remain", which in mapstructure collects the keys not
	// matched by another field.
	Props  map[string]any `mapstructure:",remain"`
}

// parseMap parses a map schema object.
//
// The object is decoded into a mapSchema (failure is wrapped as
// "avro: error decoding map"). The "values" key must be present; its value is
// parsed with parseType and wrapped with NewMapSchema, carrying over the
// remaining properties via WithProps.
//
// NOTE(review): identifier names are missing from this copy of the block;
// restore them before building.
func parseMap( string,  map[string]any,  seenCache,  *SchemaCache) (Schema, error) {
	var (
		   mapSchema
		 mapstructure.Metadata
	)
	if  := decodeMap(, &, &);  != nil {
		return nil, fmt.Errorf("avro: error decoding map: %w", )
	}

	if !hasKey(.Keys, "values") {
		return nil, errors.New("avro: map must have an values key")
	}
	,  := parseType(, .Values, , )
	if  != nil {
		return nil, 
	}

	return NewMapSchema(, WithProps(.Props)), nil
}

// parseUnion parses a union given as a slice of member schema values.
//
// A result slice of the same length is allocated and each member is parsed
// with parseType, in order; the first error stops processing. The parsed
// members are handed to NewUnionSchema, whose result and error are returned
// as-is.
//
// NOTE(review): identifier names are missing from this copy of the block;
// restore them before building.
func parseUnion( string,  []any,  seenCache,  *SchemaCache) (Schema, error) {
	var  error
	 := make([]Schema, len())
	for  := range  {
		[],  = parseType(, [], , )
		if  != nil {
			return nil, 
		}
	}

	return NewUnionSchema()
}

// fixedSchema is the mapstructure decode target for a fixed schema object.
type fixedSchema struct {
	Name      string         `mapstructure:"name"`
	Namespace string         `mapstructure:"namespace"`
	Aliases   []string       `mapstructure:"aliases"`
	Type      string         `mapstructure:"type"`
	// Size is passed to NewFixedSchema and to parseFixedLogicalType.
	Size      int            `mapstructure:"size"`
	// Props is tagged ",remain", which in mapstructure collects the keys not
	// matched by another field.
	Props     map[string]any `mapstructure:",remain"`
}

// parseFixed parses a fixed schema object.
//
// The object is decoded into a fixedSchema (failure is wrapped as
// "avro: error decoding fixed"). The name must be non-empty, an empty namespace
// is replaced by a value passed in by the caller, and the "size" key must be
// present. If the remaining properties contain a string "logicalType",
// parseFixedLogicalType is consulted, and the "logicalType" property is removed
// only when a logical schema was recognised. The schema is built with
// NewFixedSchema, its full name is recorded in the seen-cache (a duplicate is
// an error), and a RefSchema is registered under the full name and every
// alias. That reference is the value returned on success.
//
// NOTE(review): identifier names are missing from this copy of the block;
// restore them before building.
func parseFixed( string,  map[string]any,  seenCache,  *SchemaCache) (Schema, error) {
	var (
		    fixedSchema
		 mapstructure.Metadata
	)
	if  := decodeMap(, &, &);  != nil {
		return nil, fmt.Errorf("avro: error decoding fixed: %w", )
	}

	if  := checkParsedName(.Name);  != nil {
		return nil, 
	}
	if .Namespace == "" {
		.Namespace = 
	}

	// Key presence is checked through the decode metadata, so an explicit
	// size of 0 is not mistaken for a missing key here.
	if !hasKey(.Keys, "size") {
		return nil, errors.New("avro: fixed must have a size")
	}

	var  LogicalSchema
	if  := logicalTypeProperty(.Props);  != "" {
		 = parseFixedLogicalType(.Size, , .Props)
		if  != nil {
			// The logical type was consumed, so it is not also kept as a
			// free-form property.
			delete(.Props, "logicalType")
		}
	}

	,  := NewFixedSchema(.Name, .Namespace, .Size, , WithAliases(.Aliases), WithProps(.Props))
	if  != nil {
		return nil, 
	}

	// Reject a name that was already declared during this parse run.
	if  = .Add(.FullName());  != nil {
		return nil, 
	}

	 := NewRefSchema()
	.Add(.FullName(), )
	for ,  := range .Aliases() {
		.Add(, )
	}

	return , nil
}

// parseFixedLogicalType returns the logical schema for a logical type name
// applied to a fixed type, or nil when it is not recognised.
//
// Duration is accepted only together with the value 12 (presumably the fixed
// size in bytes — confirm). Decimal is delegated to parseDecimalLogicalType.
//
// NOTE(review): identifier names are missing from this copy of the block;
// restore them before building.
func parseFixedLogicalType( int,  string,  map[string]any) LogicalSchema {
	 := LogicalType()
	switch {
	case  == Duration &&  == 12:
		return NewPrimitiveLogicalSchema(Duration)
	case  == Decimal:
		return parseDecimalLogicalType(, )
	}

	return nil
}

// decimalSchema is the mapstructure decode target for the "precision" and
// "scale" properties of a decimal logical type.
type decimalSchema struct {
	Precision int `mapstructure:"precision"`
	Scale     int `mapstructure:"scale"`
}

// parseDecimalLogicalType builds a decimal logical schema from a property map.
//
// The map is decoded into a decimalSchema; a decode failure yields nil rather
// than an error. The decoded precision and scale are validated by
// newDecimalLogicalType together with the int argument. When a logical schema
// is produced, the "precision" and "scale" keys are deleted from the map that
// was passed in, so the caller's map is mutated.
//
// NOTE(review): identifier names are missing from this copy of the block;
// restore them before building.
func parseDecimalLogicalType( int,  map[string]any) LogicalSchema {
	var (
		    decimalSchema
		 mapstructure.Metadata
	)
	if  := decodeMap(, &, &);  != nil {
		return nil
	}
	 := newDecimalLogicalType(, .Precision, .Scale)
	if  != nil {
		// Remove the properties that we consumed
		delete(, "precision")
		delete(, "scale")
	}
	return 
}

// newDecimalLogicalType validates decimal parameters and returns the decimal
// logical schema, or nil when any check fails.
//
// parseDecimalLogicalType calls it with (size, precision, scale), where size is
// -1 for the bytes-backed decimal. The function applies four rejections in
// order: a value <= 0, an upper bound that is only evaluated when a value is
// > 0, a value < 0, and finally scale greater than precision.
//
// NOTE(review): identifier names are missing from this copy of the block, so
// which parameter each comparison reads cannot be confirmed from here.
// Presumably precision must be positive, the bound applies to precision when
// size is positive, and scale must be non-negative — confirm when restoring
// the names.
func newDecimalLogicalType(, ,  int) LogicalSchema {
	if  <= 0 {
		return nil
	}

	if  > 0 {
		// floor(log10(2) * (8*n - 1)): presumably the largest decimal
		// precision that fits in n bytes — confirm.
		 := int(math.Round(math.Floor(math.Log10(2) * (8*float64() - 1))))
		if  >  {
			return nil
		}
	}

	if  < 0 {
		return nil
	}

	// Scale may not be bigger than precision
	if  >  {
		return nil
	}

	return NewDecimalLogicalSchema(, )
}

// fullName returns the fully-qualified name for name within namespace.
//
// A name that already contains a dot is treated as fully qualified and is
// returned unchanged, as is any name when namespace is empty. Otherwise the
// result is namespace + "." + name.
func fullName(namespace, name string) string {
	if len(namespace) == 0 || strings.ContainsRune(name, '.') {
		return name
	}

	return namespace + "." + name
}

// checkParsedName reports an error when a parsed "name" value is empty.
//
// It returns nil for any non-empty name; no other validation is performed
// here.
func checkParsedName(name string) error {
	if name == "" {
		return errors.New("avro: non-empty name key required")
	}
	return nil
}

// hasKey reports whether k is present in keys.
//
// The parsers use it with the decode metadata's key list to tell a key that
// was absent from the input apart from one that was present with a zero value.
func hasKey(keys []string, k string) bool {
	for _, key := range keys {
		if key == k {
			return true
		}
	}
	return false
}

// decodeMap decodes a value into a result using mapstructure, recording decode
// metadata in the supplied Metadata.
//
// The decoder is configured with ZeroFields set to true. The error returned is
// the one produced by Decode.
//
// NOTE(review): identifier names are missing from this copy of the block;
// restore them before building. The second result of mapstructure.NewDecoder
// is never inspected before Decode is called; confirm this is deliberate
// (presumably the configuration is known to be valid).
func decodeMap(,  any,  *mapstructure.Metadata) error {
	 := &mapstructure.DecoderConfig{
		ZeroFields: true,
		Metadata:   ,
		Result:     ,
	}

	,  := mapstructure.NewDecoder()
	return .Decode()
}

// derefSchema walks a schema with walkSchema and normalises named schemas and
// references, so that within one walk the first occurrence of each full name
// is the concrete schema and later occurrences are references.
//
// A set of full names seen during the walk drives the callback:
//
//   - A NamedSchema whose full name was already seen is replaced by a new
//     RefSchema; otherwise its name is recorded and it is returned as is.
//   - A *RefSchema whose target's full name has not been seen is replaced by
//     the target schema, and the name is recorded; otherwise the reference is
//     kept.
//   - Any other schema is returned unchanged.
//
// NOTE(review): identifier names are missing from this copy of the block;
// restore them before building.
func derefSchema( Schema) Schema {
	 := map[string]struct{}{}

	return walkSchema(, func( Schema) Schema {
		if ,  := .(NamedSchema);  {
			if ,  := [.FullName()];  {
				// This NamedSchema has been seen in this run, it needs
				// to be turned into a reference. It is possible it was
				// dereferenced in a previous run.
				return NewRefSchema()
			}

			[.FullName()] = struct{}{}
			return 
		}

		,  := .(*RefSchema)
		if ! {
			return 
		}

		// First occurrence of this name: substitute the referenced schema.
		if ,  := [.Schema().FullName()]; ! {
			[.Schema().FullName()] = struct{}{}
			return .Schema()
		}
		return 
	})
}

// seenCache is the set of schema names declared during a single parse run. It
// is used to reject a name that is declared more than once.
type seenCache map[string]struct{}

// Add records name in the set.
//
// It returns a "duplicate name" error, leaving the set unchanged, when name
// was already recorded.
func (c seenCache) Add(name string) error {
	if _, ok := c[name]; ok {
		return fmt.Errorf("duplicate name %q", name)
	}
	c[name] = struct{}{}
	return nil
}

// logicalTypeProperty returns the "logicalType" entry of props when it is a
// string.
//
// It returns "" when the key is absent, when its value is not a string, or
// when props is nil.
func logicalTypeProperty(props map[string]any) string {
	if lt, ok := props["logicalType"].(string); ok {
		return lt
	}
	return ""
}