// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package variant

import (
	"bytes"
	"encoding/binary"
	"encoding/json"
	"errors"
	"fmt"
	"iter"
	"maps"
	"slices"
	"strings"
	"time"
	"unsafe"

	// NOTE(review): the module paths below are reconstructed from the package
	// identifiers used in this file (arrow, decimal, decimal128, debug, uuid);
	// confirm them against go.mod.
	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/decimal"
	"github.com/apache/arrow-go/v18/arrow/decimal128"
	"github.com/apache/arrow-go/v18/parquet/internal/debug"
	"github.com/google/uuid"
)

//go:generate go tool stringer -type=BasicType -linecomment -output=basic_type_stringer.go
//go:generate go tool stringer -type=PrimitiveType -linecomment -output=primitive_type_stringer.go

// BasicType represents the fundamental type category of a variant value.
//
// The numbering starts at -1 (iota - 1), so BasicUndefined is -1 and the four
// real categories BasicPrimitive, BasicShortString, BasicObject and BasicArray
// are 0 through 3, matching the two-bit field read by basicTypeFromHeader.
//
// The trailing line comments on the constants are consumed by stringer's
// -linecomment flag (see the go:generate directives in this file) and so
// define each constant's String() text; do not edit them casually.
type BasicType int

const (
	BasicUndefined   BasicType = iota - 1 // Unknown
	BasicPrimitive                        // Primitive
	BasicShortString                      // ShortString
	BasicObject                           // Object
	BasicArray                            // Array
)

// basicTypeFromHeader extracts the basic type from a value's header byte.
//
// The basic type lives in the bits selected by basicTypeMask. Because that
// mask is 0x3 the result is always in [0,3], which is exactly the range of the
// defined (non-undefined) BasicType constants, so no range check is needed.
func basicTypeFromHeader(hdr byte) BasicType {
	return BasicType(hdr & basicTypeMask)
}

// PrimitiveType represents specific primitive data types within the variant format.
//
// The numbering starts at -1 (iota - 1): PrimitiveInvalid is -1, PrimitiveNull
// is 0 and the values increase by one down to PrimitiveUUID, which is 20.
// These numbers are what primitiveTypeFromHeader produces from a header byte,
// so the order of the constants must not be changed.
//
// The trailing line comments are consumed by stringer's -linecomment flag
// (see the go:generate directives in this file) and define each constant's
// String() text.
type PrimitiveType int

const (
	PrimitiveInvalid            PrimitiveType = iota - 1 // Unknown
	PrimitiveNull                                        // Null
	PrimitiveBoolTrue                                    // BoolTrue
	PrimitiveBoolFalse                                   // BoolFalse
	PrimitiveInt8                                        // Int8
	PrimitiveInt16                                       // Int16
	PrimitiveInt32                                       // Int32
	PrimitiveInt64                                       // Int64
	PrimitiveDouble                                      // Double
	PrimitiveDecimal4                                    // Decimal32
	PrimitiveDecimal8                                    // Decimal64
	PrimitiveDecimal16                                   // Decimal128
	PrimitiveDate                                        // Date
	PrimitiveTimestampMicros                             // Timestamp(micros)
	PrimitiveTimestampMicrosNTZ                          // TimestampNTZ(micros)
	PrimitiveFloat                                       // Float
	PrimitiveBinary                                      // Binary
	PrimitiveString                                      // String
	PrimitiveTimeMicrosNTZ                               // TimeNTZ(micros)
	PrimitiveTimestampNanos                              // Timestamp(nanos)
	PrimitiveTimestampNanosNTZ                           // TimestampNTZ(nanos)
	PrimitiveUUID                                        // UUID
)

func primitiveTypeFromHeader( byte) PrimitiveType {
	return PrimitiveType(( >> basicTypeBits) & typeInfoMask)
}

// Type represents the high-level variant data type.
// This is what applications typically use to identify the type of a variant value.
//
// Unlike BasicType and PrimitiveType, these values are not read from the
// encoded bytes; they are numbered from 0 (Object) in declaration order.
// Several encodings collapse onto a single Type: both boolean primitives map
// to Bool, and both short strings and primitive strings map to String.
type Type int

const (
	Object Type = iota
	Array
	Null
	Bool
	Int8
	Int16
	Int32
	Int64
	String
	Double
	Decimal4
	Decimal8
	Decimal16
	Date
	TimestampMicros
	TimestampMicrosNTZ
	Float
	Binary
	Time
	TimestampNanos
	TimestampNanosNTZ
	UUID
)

// Bit-layout and size constants shared by the metadata and value decoders.
//
// Metadata header byte (first byte of the metadata buffer):
//   - versionMask selects the low four bits, the format version.
//   - sortedStrMask selects bit 4, the "sorted and unique" flag.
//   - shifting right by offsetSizeBitShift and masking with offsetSizeMask
//     yields the offset width minus one (so widths range from
//     minOffsetSizeBytes to maxOffsetSizeBytes).
//
// Value header byte (first byte of a value buffer):
//   - basicTypeMask selects the low two bits, the BasicType.
//   - shifting right by basicTypeBits and masking with typeInfoMask yields
//     the remaining six bits of type-specific header.
//
// hdrSizeBytes is the size of either header, supportedVersion is the only
// metadata version accepted, and maxShortStringSize is the largest length
// that fits in the six type-specific bits.
const (
	versionMask        uint8 = 0x0F
	sortedStrMask      uint8 = 0b10000
	basicTypeMask      uint8 = 0x3
	basicTypeBits      uint8 = 2
	typeInfoMask       uint8 = 0x3F
	hdrSizeBytes             = 1
	minOffsetSizeBytes       = 1
	maxOffsetSizeBytes       = 4

	// mask is applied after shift
	offsetSizeMask       uint8 = 0b11
	offsetSizeBitShift   uint8 = 6
	supportedVersion           = 1
	maxShortStringSize         = 0x3F
	metadataMaxSizeLimit       = 128 * 1024 * 1024 // 128MB
)

var (
	// EmptyMetadataBytes contains a minimal valid metadata section with no dictionary entries.
	// The three bytes are: a header byte of 0x1 (version 1, one-byte offsets,
	// sorted flag clear), a dictionary size of 0, and a single offset of 0.
	EmptyMetadataBytes = [3]byte{0x1, 0, 0}

	// ErrInvalidMetadata is the sentinel wrapped by errors reporting malformed
	// or unsupported metadata bytes; test for it with errors.Is.
	ErrInvalidMetadata = errors.New("invalid variant metadata")
)

// Metadata represents the dictionary part of a variant value, which stores
// the keys used in object values.
//
// data holds the raw encoded metadata bytes. keys holds one entry per
// dictionary id; each entry is a sub-slice of data (no key bytes are copied),
// so the two fields share the same backing array.
type Metadata struct {
	data []byte
	keys [][]byte
}

// NewMetadata creates a Metadata instance from a raw byte slice.
// It validates the metadata format and loads the key dictionary.
func ( []byte) (Metadata, error) {
	 := Metadata{data: }
	if len() < hdrSizeBytes+minOffsetSizeBytes*2 {
		return , fmt.Errorf("%w: too short: size=%d", ErrInvalidMetadata, len())
	}

	if .Version() != supportedVersion {
		return , fmt.Errorf("%w: unsupported version: %d", ErrInvalidMetadata, .Version())
	}

	 := .OffsetSize()
	return , .loadDictionary()
}

// Clone creates a deep copy of the metadata.
func ( *Metadata) () Metadata {
	return Metadata{
		data: bytes.Clone(.data),
		// shallow copy of the values, but the slice is copied
		// more efficient, and nothing should be mutating the keys
		// so it's probably safe, but something we should keep in mind
		keys: slices.Clone(.keys),
	}
}

func ( *Metadata) ( uint8) error {
	if int(+hdrSizeBytes) > len(.data) {
		return fmt.Errorf("%w: too short for dictionary size", ErrInvalidMetadata)
	}

	 := readLEU32(.data[hdrSizeBytes : hdrSizeBytes+])
	.keys = make([][]byte, )

	if  == 0 {
		return nil
	}

	// first offset is always 0
	,  := uint32(0), hdrSizeBytes+
	 := hdrSizeBytes + (+2)*uint32()
	if hdrSizeBytes+int(+1)*int() > len(.data) {
		return fmt.Errorf("%w: offset out of range: %d > %d",
			ErrInvalidMetadata, (+hdrSizeBytes)*uint32(), len(.data))
	}

	for  := range  {
		 += 
		 := readLEU32(.data[ : +])

		 :=  - 
		 :=  + 
		if + > uint32(len(.data)) {
			return fmt.Errorf("%w: string data out of range: %d + %d > %d",
				ErrInvalidMetadata, , , len(.data))
		}
		.keys[] = .data[ : +]
		 += 
	}

	return nil
}

// Bytes returns the raw byte representation of the metadata.
func ( Metadata) () []byte { return .data }

// Version returns the metadata format version.
func ( Metadata) () uint8 { return .data[0] & versionMask }

// SortedAndUnique returns whether the keys in the metadata dictionary are sorted and unique.
func ( Metadata) () bool { return .data[0]&sortedStrMask != 0 }

// OffsetSize returns the size in bytes used to store offsets in the metadata.
func ( Metadata) () uint8 {
	return ((.data[0] >> offsetSizeBitShift) & offsetSizeMask) + 1
}

// DictionarySize returns the number of keys in the metadata dictionary.
func ( Metadata) () uint32 { return uint32(len(.keys)) }

// KeyAt returns the string key at the given dictionary ID.
// Returns an error if the ID is out of range.
func ( Metadata) ( uint32) (string, error) {
	if  >= uint32(len(.keys)) {
		return "", fmt.Errorf("invalid variant metadata: id out of range: %d >= %d",
			, len(.keys))
	}

	return unsafe.String(&.keys[][0], len(.keys[])), nil
}

// IdFor returns the dictionary IDs for the given key.
// If the metadata is sorted and unique, this performs a binary search.
// Otherwise, it performs a linear search.
//
// If the metadata is not sorted and unique, then it's possible that multiple
// IDs will be returned for the same key.
func ( Metadata) ( string) []uint32 {
	 := unsafe.Slice(unsafe.StringData(), len())

	var  []uint32
	if .SortedAndUnique() {
		,  := slices.BinarySearchFunc(.keys, , bytes.Compare)
		if  {
			 = append(, uint32())
		}

		return 
	}

	for ,  := range .keys {
		if bytes.Equal(, ) {
			 = append(, uint32())
		}
	}

	return 
}

// DecimalValue represents a decimal number with a specified scale.
// The generic parameter T can be any supported variant decimal type (Decimal32, Decimal64, Decimal128).
type DecimalValue[ decimal.DecimalTypes] struct {
	Scale uint8
	Value decimal.Num[]
}

// MarshalJSON implements the json.Marshaler interface for DecimalValue.
func ( DecimalValue[]) () ([]byte, error) {
	return []byte(.Value.ToString(int32(.Scale))), nil
}

// ArrayValue represents an array of variant values.
//
// value is the complete encoded array (header included) and meta the
// metadata its elements are interpreted against. numElements is the element
// count, offsetStart the position in value of the first element offset,
// offsetSize the width in bytes of each offset, and dataStart the position in
// value that element offsets are relative to.
type ArrayValue struct {
	value []byte
	meta  Metadata

	numElements uint32
	dataStart   uint32
	offsetSize  uint8
	offsetStart uint8
}

// MarshalJSON implements the json.Marshaler interface for ArrayValue.
func ( ArrayValue) () ([]byte, error) {
	return json.Marshal(slices.Collect(.Values()))
}

// Len returns the number of elements in the array.
func ( ArrayValue) () uint32 { return .numElements }

// Values returns an iterator for the elements in the array, allowing
// for lazy evaluation of the offsets (for the situation where not all elements
// are iterated).
func ( ArrayValue) () iter.Seq[Value] {
	return func( func(Value) bool) {
		for  := range .numElements {
			 := uint32(.offsetStart) + *uint32(.offsetSize)
			 := readLEU32(.value[ : +uint32(.offsetSize)])

			 := .value[.dataStart+:]
			 := valueSize()
			 = [:] // trim to actual size

			if !(Value{value: , meta: .meta}) {
				return
			}
		}
	}
}

// Value returns the Value at the specified index.
// Returns an error if the index is out of range.
func ( ArrayValue) ( uint32) (Value, error) {
	if  >= .numElements {
		return Value{}, fmt.Errorf("%w: invalid array value: index out of range: %d >= %d",
			arrow.ErrIndex, , .numElements)
	}

	 := uint32(.offsetStart) + *uint32(.offsetSize)
	 := readLEU32(.value[ : +uint32(.offsetSize)])

	return Value{meta: .meta, value: .value[.dataStart+:]}, nil
}

// ObjectValue represents an object (map/dictionary) of key-value pairs.
//
// value is the complete encoded object (header included) and meta the
// metadata whose dictionary the field ids refer to. numElements is the field
// count. idStart is the position in value of the first field id and idSize
// the width in bytes of each id; offsetStart is the position of the first
// field offset and offsetSize the width of each offset; dataStart is the
// position that field offsets are relative to.
type ObjectValue struct {
	value []byte
	meta  Metadata

	numElements uint32
	offsetStart uint32
	dataStart   uint32
	idSize      uint8
	offsetSize  uint8
	idStart     uint8
}

// ObjectField represents a key-value pair in an object.
// Key is the field name resolved through the metadata dictionary and Value
// is the field's variant value.
type ObjectField struct {
	Key   string
	Value Value
}

// NumElements returns the number of fields in the object.
func ( ObjectValue) () uint32 { return .numElements }

// ValueByKey returns the field with the specified key.
// Returns arrow.ErrNotFound if the key doesn't exist.
func ( ObjectValue) ( string) (ObjectField, error) {
	 := .numElements

	// if total list size is smaller than threshold, linear search will
	// likely be faster than a binary search
	const  = 32
	if  <  {
		for  := range  {
			 := uint32(.idStart) + *uint32(.idSize)
			 := readLEU32(.value[ : +uint32(.idSize)])
			,  := .meta.KeyAt()
			if  != nil {
				return ObjectField{}, fmt.Errorf("invalid object value: fieldID at idx %d is not in metadata", )
			}
			if  ==  {
				 := uint32(.offsetStart) + uint32(.offsetSize)*
				 := readLEU32(.value[ : +uint32(.offsetSize)])
				return ObjectField{
					Key:   ,
					Value: Value{value: .value[.dataStart+:], meta: .meta}}, nil
			}
		}
		return ObjectField{}, arrow.ErrNotFound
	}

	,  := uint32(0), 
	for  <  {
		 := ( + ) >> 1
		 := uint32(.idStart) + *uint32(.idSize)
		 := readLEU32(.value[ : +uint32(.idSize)])
		,  := .meta.KeyAt()
		if  != nil {
			return ObjectField{}, fmt.Errorf("invalid object value: fieldID at idx %d is not in metadata", )
		}

		switch strings.Compare(, ) {
		case -1:
			 =  + 1
		case 0:
			 := uint32(.offsetStart) + uint32(.offsetSize)*
			 := readLEU32(.value[ : +uint32(.offsetSize)])

			return ObjectField{
				Key:   ,
				Value: Value{value: .value[.dataStart+:], meta: .meta}}, nil
		case 1:
			 =  - 1
		}
	}

	return ObjectField{}, arrow.ErrNotFound
}

// FieldAt returns the field at the specified index.
// Returns an error if the index is out of range.
func ( ObjectValue) ( uint32) (ObjectField, error) {
	if  >= .numElements {
		return ObjectField{}, fmt.Errorf("%w: invalid object value: index out of range: %d >= %d",
			arrow.ErrIndex, , .numElements)
	}

	 := uint32(.idStart) + *uint32(.idSize)
	 := readLEU32(.value[ : +uint32(.idSize)])
	,  := .meta.KeyAt()
	if  != nil {
		return ObjectField{}, fmt.Errorf("invalid object value: fieldID at idx %d is not in metadata", )
	}

	 := uint32(.offsetStart) + *uint32(.offsetSize)
	 := readLEU32(.value[ : +uint32(.offsetSize)])

	return ObjectField{
		Key:   ,
		Value: Value{value: .value[.dataStart+:], meta: .meta}}, nil
}

// Values returns an iterator over all key-value pairs in the object.
func ( ObjectValue) () iter.Seq2[string, Value] {
	return func( func(string, Value) bool) {
		for  := range .numElements {
			 := uint32(.idStart) + *uint32(.idSize)
			 := readLEU32(.value[ : +uint32(.idSize)])
			,  := .meta.KeyAt()
			if  != nil {
				return
			}

			 := uint32(.offsetStart) + *uint32(.offsetSize)
			 := readLEU32(.value[ : +uint32(.offsetSize)])

			 := .value[.dataStart+:]
			 := valueSize()
			if !(, Value{value: [:], meta: .meta}) {
				return
			}
		}
	}
}

// MarshalJSON implements the json.Marshaler interface for ObjectValue.
func ( ObjectValue) () ([]byte, error) {
	// for now we'll use a naive approach and just build a map
	// then marshal it. This is not the most efficient way to do this
	// but it is the simplest and most straightforward.
	 := make(map[string]Value)
	maps.Insert(, .Values())
	return json.Marshal()
}

// NullValue is a ready-made variant null: its metadata is the empty
// dictionary from EmptyMetadataBytes and its value is the single header byte
// 0, i.e. basic type 0 (primitive) with primitive type 0 (null).
var NullValue = Value{meta: Metadata{data: EmptyMetadataBytes[:]}, value: []byte{0}}

// Value represents a variant value of any type.
//
// value holds the encoded bytes, starting with the header byte that
// determines the value's type, and meta holds the metadata (key dictionary)
// that object field ids inside value refer to.
type Value struct {
	value []byte
	meta  Metadata
}

// NewWithMetadata creates a Value with the provided metadata and value bytes.
func ( Metadata,  []byte) (Value, error) {
	if len() == 0 {
		return Value{}, errors.New("invalid variant value: empty")
	}

	return Value{value: , meta: }, nil
}

// New creates a Value by parsing both the metadata and value bytes.
func (,  []byte) (Value, error) {
	,  := NewMetadata()
	if  != nil {
		return Value{}, 
	}

	return NewWithMetadata(, )
}

func ( Value) () string {
	,  := json.Marshal()
	return string()
}

// Bytes returns the raw byte representation of the value (excluding metadata).
func ( Value) () []byte { return .value }

// Clone creates a deep copy of the value including its metadata.
func ( Value) () Value {
	return Value{
		meta:  .meta.Clone(),
		value: bytes.Clone(.value),
	}
}

// Metadata returns the metadata associated with the value.
func ( Value) () Metadata { return .meta }

// BasicType returns the fundamental type category of the value.
func ( Value) () BasicType {
	return basicTypeFromHeader(.value[0])
}

// Type returns the specific data type of the value.
func ( Value) () Type {
	switch  := .BasicType();  {
	case BasicPrimitive:
		switch  := primitiveTypeFromHeader(.value[0]);  {
		case PrimitiveNull:
			return Null
		case PrimitiveBoolTrue, PrimitiveBoolFalse:
			return Bool
		case PrimitiveInt8:
			return Int8
		case PrimitiveInt16:
			return Int16
		case PrimitiveInt32:
			return Int32
		case PrimitiveInt64:
			return Int64
		case PrimitiveDouble:
			return Double
		case PrimitiveDecimal4:
			return Decimal4
		case PrimitiveDecimal8:
			return Decimal8
		case PrimitiveDecimal16:
			return Decimal16
		case PrimitiveDate:
			return Date
		case PrimitiveTimestampMicros:
			return TimestampMicros
		case PrimitiveTimestampMicrosNTZ:
			return TimestampMicrosNTZ
		case PrimitiveFloat:
			return Float
		case PrimitiveBinary:
			return Binary
		case PrimitiveString:
			return String
		case PrimitiveTimeMicrosNTZ:
			return Time
		case PrimitiveTimestampNanos:
			return TimestampNanos
		case PrimitiveTimestampNanosNTZ:
			return TimestampNanosNTZ
		case PrimitiveUUID:
			return UUID
		default:
			panic(fmt.Errorf("invalid primitive type found: %d", ))
		}
	case BasicShortString:
		return String
	case BasicObject:
		return Object
	case BasicArray:
		return Array
	default:
		panic(fmt.Errorf("invalid basic type found: %d", ))
	}
}

// Value returns the Go value representation of the variant.
// The returned type depends on the variant type:
//   - Null: nil
//   - Bool: bool
//   - Int8/16/32/64: corresponding int type
//   - Float/Double: float32/float64
//   - String: string
//   - Binary: []byte
//   - Decimal: DecimalValue
//   - Date: arrow.Date32
//   - Time: arrow.Time64
//   - Timestamp: arrow.Timestamp
//   - UUID: uuid.UUID
//   - Object: ObjectValue
//   - Array: ArrayValue
func ( Value) () any {
	switch  := .BasicType();  {
	case BasicPrimitive:
		switch  := primitiveTypeFromHeader(.value[0]);  {
		case PrimitiveNull:
			return nil
		case PrimitiveBoolTrue:
			return true
		case PrimitiveBoolFalse:
			return false
		case PrimitiveInt8:
			return readExact[int8](.value[1:])
		case PrimitiveInt16:
			return readExact[int16](.value[1:])
		case PrimitiveInt32:
			return readExact[int32](.value[1:])
		case PrimitiveInt64:
			return readExact[int64](.value[1:])
		case PrimitiveDouble:
			return readExact[float64](.value[1:])
		case PrimitiveFloat:
			return readExact[float32](.value[1:])
		case PrimitiveDate:
			return arrow.Date32(readExact[int32](.value[1:]))
		case PrimitiveTimestampMicros, PrimitiveTimestampMicrosNTZ,
			PrimitiveTimestampNanos, PrimitiveTimestampNanosNTZ:
			return arrow.Timestamp(readExact[int64](.value[1:]))
		case PrimitiveTimeMicrosNTZ:
			return arrow.Time64(readExact[int64](.value[1:]))
		case PrimitiveUUID:
			debug.Assert(len(.value[1:]) == 16, "invalid UUID length")
			return uuid.Must(uuid.FromBytes(.value[1:]))
		case PrimitiveBinary:
			 := binary.LittleEndian.Uint32(.value[1:5])
			return .value[5 : 5+]
		case PrimitiveString:
			 := binary.LittleEndian.Uint32(.value[1:5])
			return unsafe.String(&.value[5], )
		case PrimitiveDecimal4:
			 := uint8(.value[1])
			 := decimal.Decimal32(readExact[int32](.value[2:]))
			return DecimalValue[decimal.Decimal32]{Scale: , Value: }
		case PrimitiveDecimal8:
			 := uint8(.value[1])
			 := decimal.Decimal64(readExact[int64](.value[2:]))
			return DecimalValue[decimal.Decimal64]{Scale: , Value: }
		case PrimitiveDecimal16:
			 := uint8(.value[1])
			 := readLEU64(.value[2:10])
			 := readExact[int64](.value[10:])
			return DecimalValue[decimal.Decimal128]{
				Scale: ,
				Value: decimal128.New(, ),
			}
		}
	case BasicShortString:
		 := int(.value[0] >> 2)
		if  > 0 {
			return unsafe.String(&.value[1], )
		}
		return ""
	case BasicObject:
		 := (.value[0] >> basicTypeBits)
		 := ( & 0b11) + 1
		 := (( >> 2) & 0b11) + 1
		 := (( >> 4) & 0b1) == 1

		var  uint8 = 1
		if  {
			 = 4
		}

		debug.Assert(len(.value) >= int(1+), "invalid object value: too short")
		 := readLEU32(.value[1 : 1+])
		 := uint32(1 + )
		 :=  + *uint32()
		 :=  + (+1)*uint32()

		debug.Assert( <= uint32(len(.value)), "invalid object value: dataStart out of range")
		return ObjectValue{
			value:       .value,
			meta:        .meta,
			numElements: ,
			offsetStart: ,
			dataStart:   ,
			idSize:      ,
			offsetSize:  ,
			idStart:     uint8(),
		}
	case BasicArray:
		 := (.value[0] >> basicTypeBits)
		 := ( & 0b11) + 1
		 := ( & 0b1) == 1

		var (
			                     int
			,  int
		)

		if  {
			,  = int(readLEU32(.value[1:5])), 5
		} else {
			,  = int(.value[1]), 2
		}

		 =  + (+1)*int()
		debug.Assert( <= len(.value), "invalid array value: dataStart out of range")
		return ArrayValue{
			value:       .value,
			meta:        .meta,
			numElements: uint32(),
			dataStart:   uint32(),
			offsetSize:  ,
			offsetStart: uint8(),
		}
	}

	debug.Assert(false, "unsupported type")
	return nil
}

// MarshalJSON implements the json.Marshaler interface for Value.
func ( Value) () ([]byte, error) {
	 := .Value()
	switch t := .(type) {
	case arrow.Date32:
		 = .FormattedString()
	case arrow.Timestamp:
		switch  := primitiveTypeFromHeader(.value[0]);  {
		case PrimitiveTimestampMicros:
			 = .ToTime(arrow.Microsecond).Format("2006-01-02 15:04:05.999999Z0700")
		case PrimitiveTimestampMicrosNTZ:
			 = .ToTime(arrow.Microsecond).In(time.Local).Format("2006-01-02 15:04:05.999999Z0700")
		case PrimitiveTimestampNanos:
			 = .ToTime(arrow.Nanosecond).Format("2006-01-02 15:04:05.999999999Z0700")
		case PrimitiveTimestampNanosNTZ:
			 = .ToTime(arrow.Nanosecond).In(time.Local).Format("2006-01-02 15:04:05.999999999Z0700")
		}
	case arrow.Time64:
		 = .ToTime(arrow.Microsecond).In(time.Local).Format("15:04:05.999999Z0700")
	}

	return json.Marshal()
}