// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package extensions

import (
	
	
	
	
	
	
	

	
	
	
	
	
	
	
	
	
	
)

// VariantType is the arrow extension type for representing Variant values as
// defined by the Parquet Variant specification for encoding and shredding values.
// The underlying storage must be a struct type with a minimum of two fields
// ("metadata" and "value") and an optional third field ("typed_value").
//
// See the documentation for [NewVariantType] for the rules for creating a variant
// type.
type VariantType struct {
	arrow.ExtensionBase

	// Positions of the well-known fields inside the storage struct. The
	// accessor methods treat an index of -1 as "field not present".
	metadataFieldIdx   int
	valueFieldIdx      int
	typedValueFieldIdx int
}

// NewDefaultVariantType creates a basic, non-shredded variant type. The underlying
// storage type will be struct<metadata: binary non-null, value: binary non-null>.
//
// Only the *VariantType result of NewVariantType is returned; an error from that
// call is not surfaced to the caller.
func () *VariantType {
	 := arrow.StructOf(
		arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary, Nullable: false},
		arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: false})

	,  := NewVariantType()
	return 
}

// createShreddedField maps a requested shredding type onto the storage layout
// used for a "typed_value" field:
//   - list-like types become a list with non-nullable elements of
//     struct<value: binary (nullable), typed_value: ... (nullable)>;
//   - struct types become a struct whose fields are each a non-nullable
//     struct<value: binary (nullable), typed_value: ... (nullable)>, keeping the
//     original field name and metadata;
//   - every other type takes the default branch.
//
// NOTE(review): the nested typed_value types are produced by a call on the
// element / field type — presumably a recursive call to this function; confirm.
func createShreddedField( arrow.DataType) arrow.DataType {
	switch t := .(type) {
	case arrow.ListLikeType:
		return arrow.ListOfNonNullable(arrow.StructOf(
			arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
			arrow.Field{Name: "typed_value", Type: (.Elem()), Nullable: true},
		))
	case *arrow.StructType:
		// One wrapped field is emitted per input field, in the same order.
		 := make([]arrow.Field, 0, .NumFields())
		for  := range .NumFields() {
			 := .Field()
			 = append(, arrow.Field{
				Name: .Name,
				Type: arrow.StructOf(arrow.Field{
					Name:     "value",
					Type:     arrow.BinaryTypes.Binary,
					Nullable: true,
				}, arrow.Field{
					Name:     "typed_value",
					Type:     (.Type),
					Nullable: true,
				}),
				Nullable: false,
				Metadata: .Metadata,
			})
		}
		return arrow.StructOf(...)
	default:
		return 
	}
}

// NewShreddedVariantType creates a new VariantType extension type using the provided
// type to define a shredded schema by setting the `typed_value` field accordingly and
// properly constructing the shredded fields for structs, lists and so on.
//
// For example:
//
//	NewShreddedVariantType(arrow.StructOf(
//	     arrow.Field{Name: "latitude", Type: arrow.PrimitiveTypes.Float64},
//	     arrow.Field{Name: "longitude", Type: arrow.PrimitiveTypes.Float32}))
//
// Will create a variant type with the following structure:
//
//	arrow.StructOf(
//	     arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary, Nullable: false},
//	     arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
//	     arrow.Field{Name: "typed_value", Type: arrow.StructOf(
//	       arrow.Field{Name: "latitude", Type: arrow.StructOf(
//	         arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
//	         arrow.Field{Name: "typed_value", Type: arrow.PrimitiveTypes.Float64, Nullable: true}),
//	         Nullable: false},
//	     arrow.Field{Name: "longitude", Type: arrow.StructOf(
//	         arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
//	         arrow.Field{Name: "typed_value", Type: arrow.PrimitiveTypes.Float32, Nullable: true}),
//	         Nullable: false},
//	 ), Nullable: true})
//
// This is intended to be a convenient way to create a shredded variant type from a definition
// of the fields to shred. If the provided data type is nil, it will create a default
// variant type.
//
// Only the *VariantType result of NewVariantType is returned; an error from that
// call is not surfaced to the caller.
func ( arrow.DataType) *VariantType {
	if  == nil {
		return NewDefaultVariantType()
	}

	// Shredded layout: non-nullable metadata, nullable value, nullable typed_value
	// whose type is derived by createShreddedField.
	,  := NewVariantType(arrow.StructOf(
		arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary, Nullable: false},
		arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
		arrow.Field{
			Name:     "typed_value",
			Type:     createShreddedField(),
			Nullable: true,
		}))
	return 
}

// NewVariantType creates a new variant type based on the provided storage type.
//
// The rules for a variant storage type are:
//  1. MUST be a struct
//  2. MUST have a non-nullable field named "metadata" that is binary/largebinary/binary_view
//  3. Must satisfy exactly one of the following:
//     a. MUST have a non-nullable field named "value" that is binary/largebinary/binary_view
//     b. MUST have a nullable field named "value" that is binary/largebinary/binary_view
//     and another nullable field named "typed_value" that is either a primitive type or
//     a list/large_list/list_view or struct which also satisfies the following requirements:
//     i. The elements must be NON-NULLABLE
//     ii. There must either be a single NON-NULLABLE field named "value" which is
//     binary/largebinary/binary_view or have a nullable "value" field and a nullable
//     "typed_value" field that follows the rules laid out in (b).
//
// The metadata field may also be dictionary encoded.
//
// Every rejection is reported as an error wrapping arrow.ErrInvalid.
func ( arrow.DataType) (*VariantType, error) {
	// Rule 1: the storage type must be a struct.
	,  := .(*arrow.StructType)
	if ! {
		return nil, fmt.Errorf("%w: bad storage type %s for variant type", arrow.ErrInvalid, )
	}

	var (
		   = -1
		      = -1
		 = -1
	)

	// "metadata" is mandatory.
	if ,  = .FieldIdx("metadata"); ! {
		return nil, fmt.Errorf("%w: missing non-nullable field 'metadata' in variant storage type %s", arrow.ErrInvalid, )
	}

	var ,  bool
	,  = .FieldIdx("value")
	,  = .FieldIdx("typed_value")

	// At least one of "value" / "typed_value" has to be present.
	if ! && ! {
		return nil, fmt.Errorf("%w: there must be at least one of 'value' or 'typed_value' fields in variant storage type %s", arrow.ErrInvalid, )
	}

	// With exactly three fields, both of them must be present (no stray field).
	if .NumFields() == 3 && (! || !) {
		return nil, fmt.Errorf("%w: has 3 fields, but missing one of 'value' or 'typed_value' fields, %s", arrow.ErrInvalid, )
	}

	if .NumFields() > 3 {
		return nil, fmt.Errorf("%w: too many fields in variant storage type %s, expected 2 or 3", arrow.ErrInvalid, )
	}

	 := .Field()
	if .Nullable {
		return nil, fmt.Errorf("%w: metadata field must be non-nullable binary type, got %s", arrow.ErrInvalid, .Type)
	}

	// Metadata must be binary-like, or a dictionary whose value type is binary-like.
	if !isBinary(.Type) {
		if .Type.ID() != arrow.DICTIONARY || !isBinary(.Type.(*arrow.DictionaryType).ValueType) {
			return nil, fmt.Errorf("%w: metadata field must be non-nullable binary type, got %s", arrow.ErrInvalid, .Type)
		}
	}

	// NOTE(review): only the type of the "value" field is checked here; its
	// nullability (rules 3a/3b above) is not inspected — confirm intended.
	if  {
		 := .Field()
		if !isBinary(.Type) {
			return nil, fmt.Errorf("%w: value field must be binary type, got %s", arrow.ErrInvalid, .Type)
		}
	}

	// No typed_value: this is an unshredded variant.
	if ! {
		return &VariantType{
			ExtensionBase:      arrow.ExtensionBase{Storage: },
			metadataFieldIdx:   ,
			valueFieldIdx:      ,
			typedValueFieldIdx: -1,
		}, nil
	}

	 := .Field()
	if !.Nullable {
		return nil, fmt.Errorf("%w: typed_value field must be nullable, got %s", arrow.ErrInvalid, .Type)
	}

	// Extension-typed typed_value fields are validated through their storage type.
	 := .Type
	if .ID() == arrow.EXTENSION {
		 = .(arrow.ExtensionType).StorageType()
	}

	// Only nested types get structural validation; anything that is not an
	// arrow.NestedType is accepted without further checks at this point.
	if ,  := .(arrow.NestedType);  {
		if !validNestedType() {
			return nil, fmt.Errorf("%w: typed_value field must be a valid nested type, got %s", arrow.ErrInvalid, .Type)
		}
	}

	return &VariantType{
		ExtensionBase:      arrow.ExtensionBase{Storage: },
		metadataFieldIdx:   ,
		valueFieldIdx:      ,
		typedValueFieldIdx: ,
	}, nil
}

// Returns the reflect.Type of VariantArray.
// NOTE(review): presumably the array-type hook of arrow.ExtensionType — confirm.
func (*VariantType) () reflect.Type {
	return reflect.TypeOf(VariantArray{})
}

// Returns the storage struct's field located at metadataFieldIdx.
func ( *VariantType) () arrow.Field {
	return .StorageType().(*arrow.StructType).Field(.metadataFieldIdx)
}

// Returns the storage struct's field located at valueFieldIdx, or a zero
// arrow.Field when valueFieldIdx is -1.
func ( *VariantType) () arrow.Field {
	if .valueFieldIdx == -1 {
		return arrow.Field{}
	}
	return .StorageType().(*arrow.StructType).Field(.valueFieldIdx)
}

// Returns the storage struct's field located at typedValueFieldIdx, or a zero
// arrow.Field when typedValueFieldIdx is -1.
func ( *VariantType) () arrow.Field {
	if .typedValueFieldIdx == -1 {
		return arrow.Field{}
	}

	return .StorageType().(*arrow.StructType).Field(.typedValueFieldIdx)
}

// Returns the constant name "parquet.variant".
func (*VariantType) () string { return "parquet.variant" }

// Returns the textual form "extension<NAME>", where NAME is the result of
// ExtensionName().
func ( *VariantType) () string {
	return fmt.Sprintf("extension<%s>", .ExtensionName())
}

// Reports whether the other extension type has the same extension name and a
// storage type that is equal according to arrow.TypeEqual.
func ( *VariantType) ( arrow.ExtensionType) bool {
	return .ExtensionName() == .ExtensionName() &&
		arrow.TypeEqual(.Storage, .StorageType())
}

// Always returns the empty string.
// NOTE(review): presumably the extension serialization hook, i.e. the type
// carries no serialized parameters — confirm.
func (*VariantType) () string { return "" }
// Builds a variant type by delegating to NewVariantType with a single argument.
// NOTE(review): NewVariantType accepts only a storage type, so the string
// parameter appears to be unused — confirm.
func (*VariantType) ( arrow.DataType,  string) (arrow.ExtensionType, error) {
	return NewVariantType()
}

// Returns schema.VariantLogicalType{} as the Parquet logical type.
func (*VariantType) () schema.LogicalType {
	return schema.VariantLogicalType{}
}

// Returns an array.Builder for this type, created through NewVariantBuilder
// with the given allocator.
func ( *VariantType) ( memory.Allocator) array.Builder {
	return NewVariantBuilder(, )
}

// isBinary reports whether the data type's ID is BINARY, LARGE_BINARY or
// BINARY_VIEW.
func isBinary( arrow.DataType) bool {
	return .ID() == arrow.BINARY || .ID() == arrow.LARGE_BINARY ||
		.ID() == arrow.BINARY_VIEW
}

// validStruct checks the shape of the struct that wraps a shredded list element
// or object field:
//   - one field: either a binary-typed "value" or a field named "typed_value";
//   - two fields: a nullable binary "value" plus a nullable "typed_value"; a
//     nested typed_value whose Name() is not "extension" is validated further
//     by validNestedType;
//   - any other field count is invalid.
//
// NOTE(review): the single-field case does not inspect nullability, and a lone
// "typed_value" is accepted without looking at its type — confirm intended.
func validStruct( *arrow.StructType) bool {
	switch .NumFields() {
	case 1:
		 := .Field(0)
		return (.Name == "value" && isBinary(.Type)) || .Name == "typed_value"
	case 2:
		,  := .FieldByName("value")
		if ! || !.Nullable || !isBinary(.Type) {
			return false
		}

		,  := .FieldByName("typed_value")
		if ! || !.Nullable {
			return false
		}

		// Non-nested typed_value types are accepted as-is.
		if ,  := .Type.(arrow.NestedType);  && .Name() != "extension" {
			return validNestedType()
		}

		return true
	default:
		return false
	}
}

// validNestedType validates a nested typed_value type:
//   - list-like: the element field must be non-nullable and the element type
//     must be a struct accepted by validStruct;
//   - struct: must have at least one field, and every field must be a
//     non-nullable struct accepted by validStruct;
//   - any other nested type is rejected.
func validNestedType( arrow.NestedType) bool {
	switch t := .(type) {
	case arrow.ListLikeType:
		if .ElemField().Nullable {
			return false
		}

		,  := .Elem().(*arrow.StructType)
		if ! {
			return false
		}

		return validStruct()
	case *arrow.StructType:
		if .NumFields() == 0 {
			return false
		}

		for  := range .NumFields() {
			 := .Field()
			if .Nullable {
				return false
			}

			,  := .Type.(*arrow.StructType)
			if ! {
				return false
			}

			if !validStruct() {
				return false
			}
		}

		return true
	default:
		return false
	}
}

// VariantArray is an extension Array type containing Variant values which may
// potentially be shredded into multiple fields.
type VariantArray struct {
	array.ExtensionArrayBase

	// rdr and rdrErr are populated at most once, guarded by initRdr; rdrErr
	// holds the error produced while building the reader, if any.
	rdr     variantReader
	rdrErr  error
	initRdr sync.Once
}

// Lazily builds the variantReader used to read values. The work runs inside
// initRdr (a sync.Once), so it happens at most once; a failure to build the
// typed-value reader is stored in rdrErr instead of being returned.
func ( *VariantArray) () {
	// initialize a reader that coalesces shredded fields back into a variant
	// or just returns the basic variants if the array is not shredded.
	.initRdr.Do(func() {
		 := .ExtensionType().(*VariantType)
		 := .Storage().(*array.Struct)
		 := .Field(.metadataFieldIdx)
		,  := .(arrow.TypedArray[[]byte])
		if ! {
			// we already validated that if the metadata field isn't a binary
			// type directly, it must be a dictionary with a binary value type.
			, _ = array.NewDictWrapper[[]byte](.(*array.Dictionary))
		}

		// Stays nil when the storage has no "value" field.
		var  arrow.TypedArray[[]byte]
		if .valueFieldIdx != -1 {
			 := .Field(.valueFieldIdx)
			 = .(arrow.TypedArray[[]byte])
		}

		var  typedValReader
		var  error
		if .typedValueFieldIdx != -1 {
			// Shredded: combine metadata, value and the typed_value reader.
			,  = getReader(.Field(.typedValueFieldIdx))
			if  != nil {
				.rdrErr = 
				return
			}
			.rdr = &shreddedVariantReader{
				metadata:   ,
				value:      ,
				typedValue: ,
			}
		} else {
			.rdr = &basicVariantReader{
				metadata: ,
				value:    ,
			}
		}
	})
}

// Metadata returns the metadata column of the variant array, containing the
// metadata for each variant value.
//
// NOTE(review): the column is type-asserted directly to
// arrow.TypedArray[[]byte]; the reader initialization wraps a dictionary-encoded
// metadata column with array.NewDictWrapper instead, so a dictionary-encoded
// column would presumably fail this assertion — confirm.
func ( *VariantArray) () arrow.TypedArray[[]byte] {
	 := .ExtensionType().(*VariantType)
	return .Storage().(*array.Struct).Field(.metadataFieldIdx).(arrow.TypedArray[[]byte])
}

// UntypedValues returns the untyped variant values for each element of the array,
// if the array is not shredded this will contain the variant bytes for each value.
// If the array is shredded, this will contain any variant values that are either
// partially shredded objects or are not shredded at all (e.g. a value that doesn't
// match the types of the shredding).
//
// The shredded array and the untyped values array together are used to encode a
// single value. If this is not encoding shredded object fields, then a given index
// will never be null in both arrays. (A null value will be an encoded null variant value
// in this array with a null in the shredded array).
//
// If both arrays are null for a given index (only valid for shredded object fields),
// it means that the value is missing entirely (as opposed to existing and having a
// value of null).
//
// Returns nil when the storage has no "value" field (valueFieldIdx is -1).
func ( *VariantArray) () arrow.TypedArray[[]byte] {
	 := .ExtensionType().(*VariantType)
	if .valueFieldIdx == -1 {
		return nil
	}
	return .Storage().(*array.Struct).Field(.valueFieldIdx).(arrow.TypedArray[[]byte])
}

// Shredded returns the typed array for the shredded values of the variant array,
// following the rules of the Parquet Variant specification. As such, this array will
// always be either a struct, a list, or a primitive array.
//
// The reason for exposing this is to allow users to quickly access one of the shredded
// fields without having to decode the entire variant value.
//
// Returns nil when the array is not shredded (typedValueFieldIdx is -1).
func ( *VariantArray) () arrow.Array {
	 := .ExtensionType().(*VariantType)
	if .typedValueFieldIdx == -1 {
		return nil
	}

	return .Storage().(*array.Struct).Field(.typedValueFieldIdx)
}

// IsShredded returns true if the variant has shredded columns, i.e. the
// extension type's typedValueFieldIdx is not -1.
func ( *VariantArray) () bool {
	return .ExtensionType().(*VariantType).typedValueFieldIdx != -1
}

// IsNull will also take into account the special case where there is an
// encoded null variant in the untyped values array for this index and return
// appropriately.
//
// Order of checks: a null storage slot is null; a non-null typed_value slot is
// not null; otherwise the slot is null only if the value bytes are exactly the
// single byte 0 (the encoded variant null).
//
// NOTE(review): the value column is indexed with valueFieldIdx without a -1
// guard, and its own validity is not consulted before reading the bytes — the
// shredded reader's IsNull does check the value column's validity first;
// confirm both are intended.
func ( *VariantArray) ( int) bool {
	if .Storage().IsNull() {
		return true
	}

	 := .ExtensionType().(*VariantType)
	if .typedValueFieldIdx != -1 {
		 := .Storage().(*array.Struct).Field(.typedValueFieldIdx)
		if !.IsNull() {
			return false
		}
	}

	 := .Storage().(*array.Struct).Field(.valueFieldIdx)
	 := .(arrow.TypedArray[[]byte]).Value()
	return len() == 1 && [0] == 0 // variant null
}

// Reports the logical negation of IsNull for the given index.
func ( *VariantArray) ( int) bool {
	return !.IsNull()
}

// Renders the array as "VariantArray[...]" with entries separated by a single
// space. Null entries are written as array.NullValueStr, entries whose value
// cannot be read are written as "error: <err>", and all others use the value's
// String() form.
func ( *VariantArray) () string {
	 := new(strings.Builder)
	.WriteString("VariantArray[")
	for  := 0;  < .Len(); ++ {
		if  > 0 {
			.WriteString(" ")
		}
		if .IsNull() {
			.WriteString(array.NullValueStr)
			continue
		}

		,  := .Value()
		if  != nil {
			fmt.Fprintf(, "error: %v", )
			continue
		}

		.WriteString(.String())
	}
	.WriteString("]")
	return .String()
}

// Returns the variant value at the given index. The reader is initialized on
// first use via initReader; if that initialization recorded an error in rdrErr,
// a zero variant.Value and that error are returned for every index.
func ( *VariantArray) ( int) (variant.Value, error) {
	.initReader()
	if .rdrErr != nil {
		return variant.Value{}, .rdrErr
	}

	return .rdr.Value()
}

// Returns every value of the array as a slice of length Len(). The first read
// failure aborts the loop and is returned wrapped with the failing index; the
// slice result is nil in that case.
func ( *VariantArray) () ([]variant.Value, error) {
	 := make([]variant.Value, .Len())
	for  := range .Len() {
		,  := .Value()
		if  != nil {
			return nil, fmt.Errorf("error getting value at index %d: %w", , )
		}
		[] = 
	}
	return , nil
}

// Returns the string form of the entry at the given index: array.NullValueStr
// for nulls, "error: <err>" when the value cannot be read, otherwise the
// value's String() result.
func ( *VariantArray) ( int) string {
	if .IsNull() {
		return array.NullValueStr
	}

	,  := .Value()
	if  != nil {
		return fmt.Sprintf("error: %v", )
	}

	return .String()
}

// Encodes the whole array with json.Marshal as a list of Len() entries: nil for
// null slots, the string "error: <err>" for slots whose value cannot be read,
// and the result of the variant's Value() otherwise. Read failures are thus
// embedded in the output rather than returned as an error.
func ( *VariantArray) () ([]byte, error) {
	 := make([]any, .Len())
	for  := range .Len() {
		if .IsNull() {
			[] = nil
			continue
		}

		,  := .Value()
		if  != nil {
			[] = fmt.Sprintf("error: %v", )
			continue
		}

		[] = .Value()
	}
	return json.Marshal()
}

// Returns the marshal-ready representation of a single entry: nil for a null
// slot, the string "error: <err>" when the value cannot be read, otherwise the
// result of the variant's Value().
func ( *VariantArray) ( int) any {
	if .IsNull() {
		return nil
	}

	,  := .Value()
	if  != nil {
		return fmt.Sprintf("error: %v", )
	}

	return .Value()
}

// variantReader reads variant values by index. It is implemented in this file
// by basicVariantReader and shreddedVariantReader.
type variantReader interface {
	// IsNull reports whether the entry at index i is null.
	IsNull(i int) bool
	// Value returns the variant value at index i.
	Value(i int) (variant.Value, error)
}

// basicVariantReader reads values from an unshredded variant array: one
// metadata column and one value column.
type basicVariantReader struct {
	metadata arrow.TypedArray[[]byte]
	value    arrow.TypedArray[[]byte]
}

// Reports whether the entry is null: either the value slot is null, or its
// bytes are exactly the single byte 0 (the encoded variant null).
func ( *basicVariantReader) ( int) bool {
	if .value.IsNull() {
		return true
	}

	// special case for null variant
	 := .value.Value()
	return len() == 1 && [0] == 0
}

// Returns variant.NullValue for null entries; otherwise constructs the value
// with variant.New from the metadata and value bytes at the index.
func ( *basicVariantReader) ( int) (variant.Value, error) {
	if .IsNull() {
		return variant.NullValue, nil
	}

	 := .metadata.Value()
	 := .value.Value()

	return variant.New(, )
}

// createPrimitiveVariantReader wraps a primitive (non-struct, non-list) array in
// an asVariantReader for the matching Go value type. Arrays with no matching
// case produce an error wrapping arrow.ErrInvalid.
//
// Restrictions visible below: Time64 must use microseconds; Timestamp must use
// microseconds or nanoseconds; there is no *array.Uint64 case.
//
// NOTE(review): decimal scales are narrowed with uint8(...) without a range
// check — confirm scales are always within 0..255 here.
func createPrimitiveVariantReader( arrow.Array) (typedValReader, error) {
	switch a := .(type) {
	case *array.Boolean:
		return asVariantReader[bool]{typedVal: }, nil
	case *array.Int8:
		return asVariantReader[int8]{typedVal: }, nil
	case *array.Uint8:
		return asVariantReader[uint8]{typedVal: }, nil
	case *array.Int16:
		return asVariantReader[int16]{typedVal: }, nil
	case *array.Uint16:
		return asVariantReader[uint16]{typedVal: }, nil
	case *array.Int32:
		return asVariantReader[int32]{typedVal: }, nil
	case *array.Uint32:
		return asVariantReader[uint32]{typedVal: }, nil
	case *array.Int64:
		return asVariantReader[int64]{typedVal: }, nil
	case *array.Float32:
		return asVariantReader[float32]{typedVal: }, nil
	case *array.Float64:
		return asVariantReader[float64]{typedVal: }, nil
	case array.StringLike:
		return asVariantReader[string]{typedVal: }, nil
	case arrow.TypedArray[[]byte]:
		return asVariantReader[[]byte]{typedVal: }, nil
	case *array.Date32:
		return asVariantReader[arrow.Date32]{typedVal: }, nil
	case *array.Time64:
		if .DataType().(*arrow.Time64Type).Unit != arrow.Microsecond {
			return nil, fmt.Errorf("%w: unsupported time64 unit %s for variant",
				arrow.ErrInvalid, .DataType().(*arrow.Time64Type).Unit)
		}
		return asVariantReader[arrow.Time64]{typedVal: }, nil
	case *array.Timestamp:
		// Encoding options are derived from the timestamp unit and time zone.
		var  variant.AppendOpt
		 := .DataType().(*arrow.TimestampType)
		switch .Unit {
		case arrow.Microsecond:
		case arrow.Nanosecond:
			 |= variant.OptTimestampNano
		default:
			return nil, fmt.Errorf("%w: unsupported timestamp unit %s for variant",
				arrow.ErrInvalid, .DataType().(*arrow.TimestampType).Unit)
		}

		// NOTE(review): only the exact string "UTC" selects the UTC option here,
		// while variantTypeFromArrow also treats an empty time zone as UTC —
		// confirm which is intended.
		if .TimeZone == "UTC" {
			 |= variant.OptTimestampUTC
		}

		return asVariantReader[arrow.Timestamp]{typedVal: , opts: }, nil
	case *UUIDArray:
		return asVariantReader[uuid.UUID]{typedVal: }, nil
	case *array.Decimal32:
		return asVariantReader[variant.DecimalValue[decimal.Decimal32]]{
			typedVal: decimal32Wrapper{
				Decimal32: ,
				scale:     uint8(.DataType().(*arrow.Decimal32Type).Scale),
			},
		}, nil
	case *array.Decimal64:
		return asVariantReader[variant.DecimalValue[decimal.Decimal64]]{
			typedVal: decimal64Wrapper{
				Decimal64: ,
				scale:     uint8(.DataType().(*arrow.Decimal64Type).Scale),
			},
		}, nil
	case *array.Decimal128:
		return asVariantReader[variant.DecimalValue[decimal.Decimal128]]{
			typedVal: decimal128Wrapper{
				Decimal128: ,
				scale:      uint8(.DataType().(*arrow.Decimal128Type).Scale),
			},
		}, nil
	}

	return nil, fmt.Errorf("%w: unsupported primitive type %s for variant",
		arrow.ErrInvalid, .DataType().String())
}

// decimal32Wrapper pairs a Decimal32 array with a fixed scale so that its
// elements can be returned as variant.DecimalValue.
type decimal32Wrapper struct {
	*array.Decimal32

	scale uint8
}

// Returns the element at the index as a variant.DecimalValue carrying the
// wrapper's scale.
func ( decimal32Wrapper) ( int) variant.DecimalValue[decimal.Decimal32] {
	return variant.DecimalValue[decimal.Decimal32]{
		Value: .Decimal32.Value(),
		Scale: .scale,
	}
}

// decimal64Wrapper pairs a Decimal64 array with a fixed scale so that its
// elements can be returned as variant.DecimalValue.
type decimal64Wrapper struct {
	*array.Decimal64

	scale uint8
}

// Returns the element at the index as a variant.DecimalValue carrying the
// wrapper's scale.
func ( decimal64Wrapper) ( int) variant.DecimalValue[decimal.Decimal64] {
	return variant.DecimalValue[decimal.Decimal64]{
		Value: .Decimal64.Value(),
		Scale: .scale,
	}
}

// decimal128Wrapper pairs a Decimal128 array with a fixed scale so that its
// elements can be returned as variant.DecimalValue.
type decimal128Wrapper struct {
	*array.Decimal128

	scale uint8
}

// Returns the element at the index as a variant.DecimalValue carrying the
// wrapper's scale.
func ( decimal128Wrapper) ( int) variant.DecimalValue[decimal.Decimal128] {
	return variant.DecimalValue[decimal.Decimal128]{
		Value: .Decimal128.Value(),
		Scale: .scale,
	}
}

// arrowVariantPrimitiveType is the type-set constraint for the Go value types
// that asVariantReader can hand to variant.Encode: arrow numeric types, bool,
// string, []byte, uuid.UUID and the three decimal value wrappers.
type arrowVariantPrimitiveType interface {
	arrow.NumericType | bool | string | []byte | uuid.UUID |
		variant.DecimalValue[decimal.Decimal32] |
		variant.DecimalValue[decimal.Decimal64] |
		variant.DecimalValue[decimal.Decimal128]
}

// typedArr is an arrow.Array that additionally exposes a typed Value(int)
// accessor for one of the arrowVariantPrimitiveType types.
type typedArr[ arrowVariantPrimitiveType] interface {
	arrow.Array
	Value(int) 
}

// asVariantReader adapts a typed primitive array to the typedValReader
// interface by encoding each element with variant.Encode.
type asVariantReader[ arrowVariantPrimitiveType] struct {
	typedVal typedArr[]

	// opts is passed to variant.Encode for every value.
	opts variant.AppendOpt
}

// Reports whether the underlying array slot is null.
func ( asVariantReader[]) ( int) bool {
	return .typedVal.IsNull()
}

// Returns (nil, nil) for a null slot; otherwise the result of variant.Encode
// for the element, using the reader's opts.
func ( asVariantReader[]) ( variant.Metadata,  int) (any, error) {
	if .typedVal.IsNull() {
		return nil, nil
	}

	return variant.Encode(.typedVal.Value(), .opts)
}

// getReader builds the typedValReader for a typed_value array:
//   - *array.Struct: a typedObjReader holding, per field name, the optional
//     "value" column and the optional reader for the "typed_value" column;
//   - array.ListLike: a typedListReader over the element struct's optional
//     "value" column and optional "typed_value" reader;
//   - anything else: delegated to createPrimitiveVariantReader.
//
// Struct fields and list elements are type-asserted to *array.Struct without a
// checked assertion, so the layout is expected to have been validated already.
//
// NOTE(review): nested typed_value readers are obtained through a call on the
// child column — presumably a recursive call to this function; confirm.
func getReader( arrow.Array) (typedValReader, error) {
	switch arr := .(type) {
	case *array.Struct:
		 := make(map[string]fieldReaderPair)
		 := .DataType().(*arrow.StructType).Fields()
		for  := range .NumField() {
			 := .Field().(*array.Struct)
			 := .DataType().(*arrow.StructType)

			,  := .FieldIdx("value")
			var  arrow.TypedArray[[]byte]
			if  != -1 {
				 = .Field().(arrow.TypedArray[[]byte])
			}

			// A field without "typed_value" is registered with a nil reader.
			,  := .FieldIdx("typed_value")
			if ! {
				[[].Name] = fieldReaderPair{
					values:   ,
					typedVal: nil,
				}
				continue
			}

			,  := (.Field())
			if  != nil {
				return nil, fmt.Errorf("error getting typed value reader for field %s: %w", [].Name, )
			}

			[[].Name] = fieldReaderPair{
				values:   ,
				typedVal: ,
			}
		}

		return &typedObjReader{
			objArr:    ,
			fieldRdrs: ,
		}, nil
	case array.ListLike:
		 := .ListValues().(*array.Struct)
		 := .DataType().(*arrow.StructType)

		var  arrow.TypedArray[[]byte]
		var  typedValReader

		,  := .FieldIdx("value")
		if  != -1 {
			 = .Field().(arrow.TypedArray[[]byte])
		}

		// The typed reader stays nil when the element struct has no "typed_value".
		,  := .FieldIdx("typed_value")
		if  != -1 {
			var  error
			,  = (.Field())
			if  != nil {
				return nil, fmt.Errorf("error getting typed value reader: %w", )
			}
		}

		return &typedListReader{
			listArr:  ,
			valueArr: ,
			typedVal: ,
		}, nil
	default:
		return createPrimitiveVariantReader()
	}
}

// typedPair carries the two halves read for one shredded slot: the raw bytes of
// the "value" column and the decoded content of the "typed_value" column.
type typedPair struct {
	Value      []byte
	TypedValue any
}

// constructVariant appends one variant value to the builder, dispatching on the
// dynamic type of the typed value:
//   - nil: appends a null when the accompanying bytes are empty, otherwise
//     appends those bytes as already-encoded data;
//   - map[string]typedPair: builds an object from the shredded fields, then
//     merges in the fields of an object decoded with variant.NewWithMetadata;
//   - []typedPair: builds an array from the shredded elements;
//   - []byte: appends the encoded bytes, rejecting the case where a conflicting
//     value is also present;
//   - anything else: error wrapping arrow.ErrInvalid.
//
// NOTE(review): nested fields and elements are built through a call with the
// pair's Value and TypedValue — presumably a recursive call to this function;
// confirm.
func constructVariant( *variant.Builder,  variant.Metadata,  []byte,  any) error {
	switch v := .(type) {
	case nil:
		if len() == 0 {
			.AppendNull()
			return nil
		}

		return .UnsafeAppendEncoded()
	case map[string]typedPair:
		 := make([]variant.FieldEntry, 0, len())
		 := .Offset()
		// Pairs where both halves are empty are skipped entirely.
		for ,  := range  {
			if .TypedValue != nil || len(.Value) != 0 {
				 = append(, .NextField(, ))
				if  := (, , .Value, .TypedValue);  != nil {
					return 
				}
			}
		}

		if len() > 0 {
			,  := variant.NewWithMetadata(, )
			if  != nil {
				return 
			}

			,  := .Value().(variant.ObjectValue)
			if ! {
				return fmt.Errorf("%w: expected object value, got %T", arrow.ErrInvalid, .Value())
			}

			for ,  := range .Values() {
				 = append(, .NextField(, ))
				if  := .UnsafeAppendEncoded(.Bytes());  != nil {
					return fmt.Errorf("error appending field %s: %w", , )
				}
			}
		}

		return .FinishObject(, )
	case []typedPair:
		debug.Assert(len() == 0, "shredded array must not conflict with variant value")

		 := make([]int, 0, len())
		 := .Offset()
		for ,  := range  {
			 = append(, .NextElement())
			if  := (, , .Value, .TypedValue);  != nil {
				return 
			}
		}

		return .FinishArray(, )
	case []byte:
		if len() > 0 {
			return errors.New("invalid variant, conflicting value and typed_value")
		}
		return .UnsafeAppendEncoded()
	default:
		return fmt.Errorf("%w: unsupported typed value type %T for variant", arrow.ErrInvalid, )
	}
}

// typedValReader reads the content of a typed_value column. The implementations
// in this file return nil for null slots, encoded data for primitives,
// map[string]typedPair for objects and []typedPair for lists.
type typedValReader interface {
	// Value returns the decoded typed value at index i.
	Value(meta variant.Metadata, i int) (any, error)
	// IsNull reports whether the typed_value slot at index i is null.
	IsNull(i int) bool
}

// fieldReaderPair holds the readers for one shredded object field. Either
// member may be nil when the corresponding column is absent.
type fieldReaderPair struct {
	values   arrow.TypedArray[[]byte]
	typedVal typedValReader
}

// typedObjReader reads a shredded object: objArr is the typed_value struct
// array and fieldRdrs maps each field name to its readers.
type typedObjReader struct {
	objArr    *array.Struct
	fieldRdrs map[string]fieldReaderPair
}

// Reports whether the struct slot at the index is null.
func ( *typedObjReader) ( int) bool {
	return .objArr.IsNull()
}

// Returns nil for a null struct slot; otherwise a map[string]typedPair with one
// entry per registered field. A missing typed reader or values column leaves
// the corresponding half of the pair at its zero value.
func ( *typedObjReader) ( variant.Metadata,  int) (any, error) {
	if .objArr.IsNull() {
		return nil, nil
	}

	var  error
	 := make(map[string]typedPair)
	for ,  := range .fieldRdrs {
		var  any
		if .typedVal != nil {
			,  = .typedVal.Value(, )
			if  != nil {
				return nil, fmt.Errorf("error reading typed value for field %s at index %d: %w", , , )
			}
		}

		var  []byte
		if .values != nil {
			 = .values.Value()
		}

		[] = typedPair{
			Value:      ,
			TypedValue: ,
		}
	}
	return , nil
}

// typedListReader reads a shredded list: listArr supplies the offsets, while
// valueArr and typedVal read the element struct's "value" and "typed_value"
// columns.
type typedListReader struct {
	listArr array.ListLike

	valueArr arrow.TypedArray[[]byte]
	typedVal typedValReader
}

// Reports whether the list slot at the index is null.
func ( *typedListReader) ( int) bool {
	return .listArr.IsNull()
}

// Returns nil for a null list slot, an empty []typedPair for an empty list, and
// otherwise one typedPair per element between the slot's value offsets.
//
// NOTE(review): typedVal is called unconditionally, but getReader leaves it nil
// when the element struct has no "typed_value" field (typedObjReader guards the
// equivalent call with a nil check) — confirm that layout cannot reach here.
func ( *typedListReader) ( variant.Metadata,  int) (any, error) {
	if .listArr.IsNull() {
		return nil, nil
	}

	,  := .listArr.ValueOffsets()
	if  ==  {
		return []typedPair{}, nil
	}

	 := make([]typedPair, 0, -)
	for  := ;  < ; ++ {
		var  []byte
		if .valueArr != nil {
			 = .valueArr.Value(int())
		}

		,  := .typedVal.Value(, int())
		if  != nil {
			return nil, fmt.Errorf("error reading typed value at index %d: %w", , )
		}

		 = append(, typedPair{
			Value:      ,
			TypedValue: ,
		})
	}

	return , nil
}

// shreddedVariantReader reassembles variant values from a shredded array: the
// metadata column, the optional residual value column and the typed_value
// reader.
type shreddedVariantReader struct {
	metadata arrow.TypedArray[[]byte]
	value    arrow.TypedArray[[]byte]

	typedValue typedValReader
}

// Reports whether the entry is null: the typed value must be null, and the
// value slot must be either null or exactly the single byte 0 (variant null).
//
// NOTE(review): value is dereferenced here without the nil check that the
// value-reading method of this type performs — confirm value cannot be nil.
func ( *shreddedVariantReader) ( int) bool {
	if !.typedValue.IsNull() {
		return false
	}

	if .value.IsNull() {
		return true
	}

	 := .value.Value()
	return len() == 1 && [0] == 0 // variant null
}

// Rebuilds the variant at the index: parses the metadata, reads the typed
// value, reads the residual value bytes when a value column exists, and feeds
// both to constructVariant on a builder seeded from the metadata. Each failing
// step returns variant.NullValue together with an error that names the index.
func ( *shreddedVariantReader) ( int) (variant.Value, error) {
	 := .metadata.Value()
	,  := variant.NewMetadata()
	if  != nil {
		return variant.NullValue, fmt.Errorf("error reading metadata at index %d: %w", , )
	}

	 := variant.NewBuilderFromMeta()
	.SetAllowDuplicates(true)
	,  := .typedValue.Value(, )
	if  != nil {
		return variant.NullValue, fmt.Errorf("error reading typed value at index %d: %w", , )
	}

	var  []byte
	if .value != nil {
		 = .value.Value()
	}
	if  := constructVariant(, , , );  != nil {
		return variant.NullValue, fmt.Errorf("error constructing variant at index %d: %w", , )
	}
	return .Build()
}

// VariantBuilder is an array builder for both shredded or non-shredded variant extension
// arrays. It allows you to append variant values, and will appropriately shred them
// if it is able to do so based on the underlying storage type.
type VariantBuilder struct {
	*array.ExtensionBuilder
	shreddedSchema shreddedSchema

	// structBldr is the storage builder; metaBldr and valueBldr are its
	// "metadata" and "value" field builders. typedBldr is nil when the type has
	// no typed_value field.
	structBldr *array.StructBuilder
	metaBldr   array.BinaryLikeBuilder
	valueBldr  array.BinaryLikeBuilder
	typedBldr  shreddedBuilder
}

// binaryDictBuilderAdapter wraps a BinaryDictionaryBuilder with the extra
// methods below so that NewVariantBuilder can store it as an
// array.BinaryLikeBuilder.
type binaryDictBuilderAdapter struct {
	*array.BinaryDictionaryBuilder
}

// Intentionally a no-op: accepts an int and does nothing.
func ( *binaryDictBuilderAdapter) (int) {}

// Appends a batch of values. With an empty validity slice every value is
// appended; otherwise a false validity entry appends a null and a true entry
// appends the value at the same position.
func ( *binaryDictBuilderAdapter) ( [][]byte,  []bool) {
	if len() == 0 {
		for ,  := range  {
			.Append()
		}
		return
	}

	for ,  := range  {
		if ! {
			.AppendNull()
		} else {
			.Append([])
		}
	}
}

// Forwards to the adapter's Append.
func ( *binaryDictBuilderAdapter) ( []byte) {
	.Append()
}

// Appends through the wrapped BinaryDictionaryBuilder and panics if that
// builder reports an error.
func ( *binaryDictBuilderAdapter) ( []byte) {
	if  := .BinaryDictionaryBuilder.Append();  != nil {
		panic(fmt.Sprintf("error appending value %s to binary dictionary builder: %v", string(), ))
	}
}

// NewVariantBuilder creates a new VariantBuilder for the given variant type which may
// or may not be shredded.
//
// NOTE(review): the value builder is looked up with variantIdx unconditionally,
// and the metadata switch has no default branch — confirm that a type without a
// "value" field, or with another metadata builder kind, cannot reach here.
func ( memory.Allocator,  *VariantType) *VariantBuilder {
	 := getShreddedSchema()
	 := array.NewExtensionBuilder(, )

	 := .StorageBuilder().(*array.StructBuilder)
	// A shredded builder exists only when the type has a typed_value field.
	var  shreddedBuilder
	if .typedIdx != -1 {
		 = createShreddedBuilder(.typedSchema, .FieldBuilder(.typedIdx))
	}

	// A dictionary metadata builder is wrapped in an adapter; a binary-like
	// builder is used directly.
	var  array.BinaryLikeBuilder
	switch b := .FieldBuilder(.metadataIdx).(type) {
	case *array.BinaryDictionaryBuilder:
		 = &binaryDictBuilderAdapter{BinaryDictionaryBuilder: }
	case array.BinaryLikeBuilder:
		 = 
	}

	return &VariantBuilder{
		ExtensionBuilder: ,
		shreddedSchema:   ,
		structBldr:       ,
		metaBldr:         ,
		valueBldr:        .FieldBuilder(.variantIdx).(array.BinaryLikeBuilder),
		typedBldr:        ,
	}
}

// Appends one variant value. The metadata bytes are always written. Without a
// typed builder the full value bytes go to the value column; with one, the
// value is offered to tryTyped and only a non-empty residual is written to the
// value column (otherwise that column gets a null).
func ( *VariantBuilder) ( variant.Value) {
	.structBldr.Append(true)
	.metaBldr.Append(.Metadata().Bytes())
	if .typedBldr == nil {
		.valueBldr.Append(.Bytes())
		return
	}

	 := .typedBldr.tryTyped()
	if len() > 0 {
		.valueBldr.Append()
	} else {
		.valueBldr.AppendNull()
	}
}

// Consumes values from the decoder while More() reports true, appending each
// one via UnmarshalOne; stops at and returns the first error.
func ( *VariantBuilder) ( *json.Decoder) error {
	for .More() {
		if  := .UnmarshalOne();  != nil {
			return 
		}
	}
	return nil
}

// Appends the elements of a JSON array to the builder. The input must start
// with the '[' delimiter; numbers are decoded with UseNumber, and the remaining
// elements are handed to Unmarshal.
func ( *VariantBuilder) ( []byte) error {
	 := json.NewDecoder(bytes.NewReader())
	.UseNumber()

	,  := .Token()
	if  != nil {
		return 
	}

	if ,  := .(json.Delim); ! ||  != '[' {
		return fmt.Errorf("variant builder must unpack from json array, found %s", )
	}

	return .Unmarshal()
}

// Decodes a single value with variant.Unmarshal and appends it; decode failures
// are returned wrapped.
func ( *VariantBuilder) ( *json.Decoder) error {
	,  := variant.Unmarshal(, false)
	if  != nil {
		return fmt.Errorf("error unmarshalling variant value: %w", )
	}

	.Append()
	return nil
}

// variantTypeFromArrow maps an Arrow data type to the variant primitive type
// used for shredding. Unsigned integers map to the next wider signed variant
// type (UINT8 -> Int16, UINT16 -> Int32, UINT32 -> Int64); there is no UINT64
// case.
//
// Panics for any type without a mapping, including TIME64 with a unit other
// than microseconds, TIMESTAMP with a unit other than micro/nanoseconds, and
// extension types other than "arrow.uuid" / "arrow.json".
func variantTypeFromArrow( arrow.DataType) variant.Type {
	switch .ID() {
	case arrow.BOOL:
		return variant.Bool
	case arrow.INT8:
		return variant.Int8
	case arrow.INT16, arrow.UINT8:
		return variant.Int16
	case arrow.INT32, arrow.UINT16:
		return variant.Int32
	case arrow.INT64, arrow.UINT32:
		return variant.Int64
	case arrow.FLOAT32:
		return variant.Float
	case arrow.FLOAT64:
		return variant.Double
	case arrow.STRING, arrow.LARGE_STRING, arrow.STRING_VIEW:
		return variant.String
	case arrow.BINARY, arrow.LARGE_BINARY, arrow.BINARY_VIEW:
		return variant.Binary
	case arrow.DATE32:
		return variant.Date
	case arrow.TIME64:
		if .(*arrow.Time64Type).Unit == arrow.Microsecond {
			return variant.Time
		}
	case arrow.TIMESTAMP:
		 := .(*arrow.TimestampType)
		// NOTE(review): an empty time zone is grouped with "UTC" here, whereas
		// createPrimitiveVariantReader only treats "UTC" as UTC — confirm.
		 := .TimeZone == "" || .TimeZone == "UTC"
		switch .Unit {
		case arrow.Microsecond:
			if  {
				return variant.TimestampMicros
			}
			return variant.TimestampMicrosNTZ
		case arrow.Nanosecond:
			if  {
				return variant.TimestampNanos
			}
			return variant.TimestampNanosNTZ
		}
	case arrow.DECIMAL32:
		return variant.Decimal4
	case arrow.DECIMAL64:
		return variant.Decimal8
	case arrow.DECIMAL128:
		return variant.Decimal16
	case arrow.EXTENSION:
		,  := .(arrow.ExtensionType)
		if ! {
			break
		}

		switch .ExtensionName() {
		case "arrow.uuid":
			return variant.UUID
		case "arrow.json":
			return variant.String
		}
	}

	// Reached by every case above that did not return.
	panic(fmt.Sprintf("unsupported arrow type %s for variant", .String()))
}

// variantSchema describes one node of a shredding schema. It is implemented in
// this file by shreddedObjSchema, shreddedArraySchema and
// shreddedPrimitiveSchema.
type variantSchema interface {
	// Type returns the variant type this schema node shreds into.
	Type() variant.Type
}

// objFieldSchema describes one shredded object field: its name, the schema of
// its typed_value, and the positions of the "value" and "typed_value" columns
// inside the field's wrapper struct.
type objFieldSchema struct {
	fieldName string
	schema    variantSchema

	variantIdx int
	typedIdx   int
}

// shreddedObjSchema is the schema node for a shredded object. schemaMap maps a
// field name to an int (presumably its index in fields — confirm).
type shreddedObjSchema struct {
	fields    []objFieldSchema
	schemaMap map[string]int
}

// Always returns variant.Object.
func (shreddedObjSchema) () variant.Type {
	return variant.Object
}

// shreddedArraySchema is the schema node for a shredded list: the element
// schema plus the positions of the "value" and "typed_value" columns inside the
// element struct.
type shreddedArraySchema struct {
	elemSchema variantSchema

	elemVariantIdx int
	elemTypedIdx   int
}

// Always returns variant.Array.
func (shreddedArraySchema) () variant.Type {
	return variant.Array
}

// shreddedPrimitiveSchema is the schema node for a shredded primitive value.
type shreddedPrimitiveSchema struct {
	typ variant.Type
}

// Returns the stored primitive variant type.
func ( shreddedPrimitiveSchema) () variant.Type {
	return .typ
}

// shreddedSchema is the top-level description of a variant storage struct:
// positions of the metadata, value and typed_value fields plus the schema of
// typed_value. getShreddedSchema leaves typedSchema nil when the type has no
// typed_value field.
type shreddedSchema struct {
	metadataIdx int
	variantIdx  int
	typedIdx    int

	typedSchema variantSchema
}

// getVariantSchema derives the shredding schema for a typed_value data type:
//   - struct: a shreddedObjSchema with one objFieldSchema per field;
//   - list-like: a shreddedArraySchema describing the element struct;
//   - anything else: a shreddedPrimitiveSchema from variantTypeFromArrow, which
//     panics for unsupported types.
//
// Field and element types are asserted to *arrow.StructType without a checked
// assertion, and the "typed_value" index is used without testing whether the
// field exists.
//
// NOTE(review): confirm a wrapper struct lacking "typed_value" (which
// validStruct accepts) cannot reach this function.
func getVariantSchema( arrow.DataType) variantSchema {
	switch dt := .(type) {
	case *arrow.StructType:
		 := make([]objFieldSchema, 0, .NumFields())
		 := make(map[string]int)

		for ,  := range .Fields() {
			 := .Type.(*arrow.StructType)

			,  := .FieldIdx("value")
			,  := .FieldIdx("typed_value")

			 = append(, objFieldSchema{
				fieldName:  .Name,
				schema:     (.Field().Type),
				variantIdx: ,
				typedIdx:   ,
			})

			[.Name] = 
		}

		return shreddedObjSchema{
			fields:    ,
			schemaMap: ,
		}
	case arrow.ListLikeType:
		 := .Elem().(*arrow.StructType)

		,  := .FieldIdx("value")
		,  := .FieldIdx("typed_value")

		 := (.Field().Type)
		return shreddedArraySchema{
			elemSchema:     ,
			elemVariantIdx: ,
			elemTypedIdx:   ,
		}
	default:
		return shreddedPrimitiveSchema{typ: variantTypeFromArrow()}
	}
}

// getShreddedSchema summarizes a VariantType as a shreddedSchema by copying its
// three field indices. The typed schema is computed with getVariantSchema only
// when the type has a typed_value field; otherwise it stays nil.
func getShreddedSchema( *VariantType) shreddedSchema {
	 := .StorageType().(*arrow.StructType)

	var  variantSchema
	if .typedValueFieldIdx != -1 {
		 = getVariantSchema(.Field(.typedValueFieldIdx).Type)
	}
	return shreddedSchema{
		metadataIdx: .metadataFieldIdx,
		variantIdx:  .valueFieldIdx,
		typedIdx:    .typedValueFieldIdx,
		typedSchema: ,
	}
}

// shreddedBuilder writes values into a typed_value column.
type shreddedBuilder interface {
	// AppendMissing appends a placeholder slot for a value that is absent.
	AppendMissing()
	// tryTyped attempts to store v in the typed column and returns the bytes
	// that still have to be written to the untyped "value" column (empty when
	// nothing remains).
	tryTyped(v variant.Value) (residual []byte)
}

type shreddedArrayBuilder struct {
	listBldr *array.ListBuilder
	elemBldr *array.StructBuilder

	valueBldr array.BinaryLikeBuilder
	typedBldr shreddedBuilder
}

func ( *shreddedArrayBuilder) () {
	.listBldr.Append(true)
	.elemBldr.Append(true)
	.valueBldr.AppendNull()
	.typedBldr.AppendMissing()
}

// Attempts to shred a variant value as an array.
// NOTE(review): the method name is not visible in this copy — presumably
// tryTyped from shreddedBuilder.
//
// If the value is not a variant.Array, a null list entry is appended and the
// value's full encoded bytes are returned as the residual. Otherwise every
// element is passed to the element typed builder, and that element's own
// residual (or a null when there is none) is written to the element value
// builder. An array that was shredded always returns a nil residual.
func ( *shreddedArrayBuilder) ( variant.Value) ( []byte) {
	if .Type() != variant.Array {
		.listBldr.AppendNull()
		return .Bytes()
	}

	.listBldr.Append(true)
	 := .Value().(variant.ArrayValue)
	if .Len() == 0 {
		// NOTE(review): Append(true) has already been called above; if
		// AppendEmptyValue also starts a new list slot this would emit two
		// entries for one empty array — verify against array.ListBuilder.
		.listBldr.AppendEmptyValue()
		return nil
	}

	for  := range .Values() {
		.elemBldr.Append(true)
		// Each element keeps its own residual alongside its typed value.
		 = .typedBldr.tryTyped()
		if len() > 0 {
			.valueBldr.Append()
		} else {
			.valueBldr.AppendNull()
		}
	}

	return nil
}

// shreddedPrimitiveBuilder writes variant values into a single typed arrow
// builder.
type shreddedPrimitiveBuilder struct {
	// typedBldr is the arrow builder for the typed column; its concrete type
	// decides which variant values can be stored in it.
	typedBldr array.Builder
}

// Appends a null to the typed builder.
// NOTE(review): the method name is not visible in this copy — presumably
// AppendMissing from shreddedBuilder.
func ( *shreddedPrimitiveBuilder) () {
	.typedBldr.AppendNull()
}

// typedBuilder is a generic constraint-style interface over builders that
// expose their arrow data type and an Append method. The type parameter is
// constrained to arrow.ValueType.
// NOTE(review): the type parameter name and Append's parameter are not
// visible in this copy — presumably Append takes a value of the type
// parameter; confirm.
type typedBuilder[ arrow.ValueType] interface {
	Type() arrow.DataType
	Append()
}

// appendToTarget appends an int64 to a builder of a narrower (or equal)
// integer type, but only when the value survives a round trip through that
// type: the converted value is widened back to int64 and compared with the
// original. It returns true when the value was appended and false when the
// conversion would have changed it, in which case nothing is appended.
func appendToTarget[ int8 | uint8 | int16 | uint16 | int32 | uint32 | int64]( typedBuilder[],  int64) bool {
	// Round-trip check rejects values that overflow or change sign.
	if int64(()) ==  {
		.Append(())
		return true
	}

	return false
}

// appendNumericToTarget appends a variant integer value to an integer
// builder. It handles values whose Go representation is int8, int16, int32
// or int64 by widening them to int64 and delegating to appendToTarget, which
// only appends when the value fits the target type. It returns false for any
// other value type or when the value does not fit.
func appendNumericToTarget[ int8 | uint8 | int16 | uint16 | int32 | uint32 | int64]( typedBuilder[],  variant.Value) bool {
	switch val := .Value().(type) {
	case int8:
		return appendToTarget(, int64())
	case int16:
		return appendToTarget(, int64())
	case int32:
		return appendToTarget(, int64())
	case int64:
		return appendToTarget(, )
	}

	return false
}

// decimalCanFit reports whether a variant decimal value can be stored in a
// column of the given arrow decimal type. The scales must match exactly (no
// rescaling is attempted), and the value must fit within the target type's
// precision according to FitsInPrecision.
func decimalCanFit[ decimal.Decimal32 | decimal.Decimal64 | decimal.Decimal128]( arrow.DecimalType,  variant.DecimalValue[]) bool {
	if .GetScale() != int32(.Scale) {
		return false
	}

	return .Value.FitsInPrecision(.GetPrecision())
}

// Attempts to store a variant value in the typed builder.
// NOTE(review): the method name is not visible in this copy — presumably
// tryTyped from shreddedBuilder.
//
// The concrete type of typedBldr selects the conversion rules. When the
// value is appended to the typed column, nil is returned. In every other
// situation (a variant null, a type mismatch, or a value that does not fit
// the target type) a null is appended to the typed column and the value's
// full encoded bytes are returned as the residual.
//
// The value type assertions inside the cases are unchecked; they rely on the
// variant type tag matching the Go type returned by Value().
func ( *shreddedPrimitiveBuilder) ( variant.Value) ( []byte) {
	// A variant null is never stored in the typed column.
	if .Type() == variant.Null {
		.typedBldr.AppendNull()
		return .Bytes()
	}

	switch bldr := .typedBldr.(type) {
	// Integer targets: accept any signed variant integer that fits the
	// target width (see appendNumericToTarget).
	case *array.Int8Builder:
		if appendNumericToTarget(, ) {
			return nil
		}
	case *array.Uint8Builder:
		if appendNumericToTarget(, ) {
			return nil
		}
	case *array.Int16Builder:
		if appendNumericToTarget(, ) {
			return nil
		}
	case *array.Uint16Builder:
		if appendNumericToTarget(, ) {
			return nil
		}
	case *array.Int32Builder:
		if appendNumericToTarget(, ) {
			return nil
		}
	case *array.Uint32Builder:
		if appendNumericToTarget(, ) {
			return nil
		}
	case *array.Int64Builder:
		if appendNumericToTarget(, ) {
			return nil
		}
	case *array.Float32Builder:
		switch .Type() {
		case variant.Float:
			.Append(.Value().(float32))
			return nil
		case variant.Double:
			// Only narrow doubles within the finite float32 range. Values
			// outside it (and NaN, for which both comparisons are false) fall
			// through to the residual path. The narrowing may lose precision.
			 := .Value().(float64)
			if  >= -math.MaxFloat32 &&  <= math.MaxFloat32 {
				.Append(float32())
				return nil
			}
		}
	case *array.Float64Builder:
		switch .Type() {
		case variant.Float:
			// Widening float32 to float64 always succeeds.
			.Append(float64(.Value().(float32)))
			return nil
		case variant.Double:
			.Append(.Value().(float64))
			return nil
		}
	case *array.BooleanBuilder:
		if .Type() == variant.Bool {
			.Append(.Value().(bool))
			return nil
		}
	// NOTE(review): the string-like case is listed before the binary-like
	// case; presumably the order matters because a builder may satisfy both
	// interfaces — confirm before reordering.
	case array.StringLikeBuilder:
		if .Type() == variant.String {
			.Append(.Value().(string))
			return nil
		}
	case array.BinaryLikeBuilder:
		if .Type() == variant.Binary {
			.Append(.Value().([]byte))
			return nil
		}
	case *array.Date32Builder:
		if .Type() == variant.Date {
			.Append(.Value().(arrow.Date32))
			return nil
		}
	case *array.Time64Builder:
		// Only microsecond-unit Time64 columns accept variant time values.
		if .Type() == variant.Time && .Type().(*arrow.Time64Type).Unit == arrow.Microsecond {
			.Append(.Value().(arrow.Time64))
			return nil
		}
	case *UUIDBuilder:
		if .Type() == variant.UUID {
			.Append(.Value().(uuid.UUID))
			return nil
		}
	case *array.TimestampBuilder:
		// Timezone-aware variant timestamps require a "UTC" column and NTZ
		// variants require an empty timezone. Microsecond values may go into
		// microsecond or nanosecond columns; nanosecond values only into
		// nanosecond columns.
		 := .Type().(*arrow.TimestampType)
		switch .Type() {
		case variant.TimestampMicros:
			if .TimeZone != "UTC" {
				// Leaves the inner switch; falls through to the residual path.
				break
			}

			switch .Unit {
			case arrow.Microsecond:
				.Append(.Value().(arrow.Timestamp))
				return nil
			case arrow.Nanosecond:
				// Micros -> nanos.
				// NOTE(review): the multiplication is not range-checked here
				// and could overflow for extreme timestamps — confirm.
				.Append(.Value().(arrow.Timestamp) * 1000)
				return nil
			}
		case variant.TimestampMicrosNTZ:
			if .TimeZone != "" {
				// Leaves the inner switch; falls through to the residual path.
				break
			}

			switch .Unit {
			case arrow.Microsecond:
				.Append(.Value().(arrow.Timestamp))
				return nil
			case arrow.Nanosecond:
				// Micros -> nanos; same unchecked multiplication as above.
				.Append(.Value().(arrow.Timestamp) * 1000)
				return nil
			}
		case variant.TimestampNanos:
			if .TimeZone == "UTC" && .Unit == arrow.Nanosecond {
				.Append(.Value().(arrow.Timestamp))
				return nil
			}
		case variant.TimestampNanosNTZ:
			if .TimeZone == "" && .Unit == arrow.Nanosecond {
				.Append(.Value().(arrow.Timestamp))
				return nil
			}
		}
	// Decimal targets: a value of any decimal width is accepted when
	// decimalCanFit approves it (same scale, fits the target precision); it
	// is then converted to the target width.
	case *array.Decimal32Builder:
		 := .Type().(*arrow.Decimal32Type)
		switch val := .Value().(type) {
		case variant.DecimalValue[decimal.Decimal32]:
			if decimalCanFit(, ) {
				.Append(.Value.(decimal.Decimal32))
				return nil
			}
		case variant.DecimalValue[decimal.Decimal64]:
			if decimalCanFit(, ) {
				.Append(decimal.Decimal32(.Value.(decimal.Decimal64)))
				return nil
			}
		case variant.DecimalValue[decimal.Decimal128]:
			if decimalCanFit(, ) {
				// Narrowing via the low 64 bits; guarded by the fit check.
				.Append(decimal.Decimal32(.Value.(decimal.Decimal128).LowBits()))
				return nil
			}
		}
	case *array.Decimal64Builder:
		 := .Type().(*arrow.Decimal64Type)
		switch val := .Value().(type) {
		case variant.DecimalValue[decimal.Decimal32]:
			if decimalCanFit(, ) {
				.Append(decimal.Decimal64(.Value.(decimal.Decimal32)))
				return nil
			}
		case variant.DecimalValue[decimal.Decimal64]:
			if decimalCanFit(, ) {
				.Append(.Value.(decimal.Decimal64))
				return nil
			}
		case variant.DecimalValue[decimal.Decimal128]:
			if decimalCanFit(, ) {
				// Narrowing via the low 64 bits; guarded by the fit check.
				.Append(decimal.Decimal64(.Value.(decimal.Decimal128).LowBits()))
				return nil
			}
		}
	case *array.Decimal128Builder:
		 := .Type().(*arrow.Decimal128Type)
		switch val := .Value().(type) {
		case variant.DecimalValue[decimal.Decimal32]:
			if decimalCanFit(, ) {
				.Append(decimal128.FromI64(int64(.Value.(decimal.Decimal32))))
				return nil
			}
		case variant.DecimalValue[decimal.Decimal64]:
			if decimalCanFit(, ) {
				.Append(decimal128.FromI64(int64(.Value.(decimal.Decimal64))))
				return nil
			}
		case variant.DecimalValue[decimal.Decimal128]:
			if decimalCanFit(, ) {
				.Append(.Value.(decimal.Decimal128))
				return nil
			}
		}
	}

	// Nothing matched or the value did not fit: keep the typed column aligned
	// with a null and hand the whole encoded value back as the residual.
	.typedBldr.AppendNull()
	return .Bytes()
}

// shreddedFieldBuilder groups the builders used for one shredded object
// field.
type shreddedFieldBuilder struct {
	// structBldr is the struct builder for the field itself.
	structBldr *array.StructBuilder
	// valueBldr receives the field's residual (untyped) bytes.
	valueBldr array.BinaryLikeBuilder
	// typedBldr receives the field's typed representation.
	typedBldr shreddedBuilder
}

// shreddedObjBuilder writes variant objects into a shredded struct column
// that has one child struct per shredded field.
type shreddedObjBuilder struct {
	// structBldr is the struct builder for the typed object column.
	structBldr *array.StructBuilder

	// fieldBuilders maps a shredded field name to that field's builders.
	fieldBuilders map[string]shreddedFieldBuilder
}

// Appends one invalid (null) entry to the object struct and, to keep every
// child aligned, appends to each shredded field a valid struct with a null
// untyped value and a missing typed value.
// NOTE(review): the method name is not visible in this copy — presumably
// AppendMissing from shreddedBuilder.
func ( *shreddedObjBuilder) () {
	.structBldr.AppendValues([]bool{false})
	for ,  := range .fieldBuilders {
		.structBldr.Append(true)
		.valueBldr.AppendNull()
		.typedBldr.AppendMissing()
	}
}

// Attempts to shred a variant value as an object.
// NOTE(review): the method name is not visible in this copy — presumably
// tryTyped from shreddedBuilder.
//
// If the value is not a variant.Object, it is recorded as missing and its
// full encoded bytes are returned as the residual. Otherwise each field that
// has a shredded builder goes to that builder (its own residual goes to the
// field's value column). Fields without a shredded builder are re-encoded
// into a new object, which is returned as the residual. Shredded fields that
// the value lacks get a null value and a missing typed entry. A nil residual
// is returned when every field of the value was shredded.
//
// Errors from the variant builder cause a panic.
func ( *shreddedObjBuilder) ( variant.Value) ( []byte) {
	if .Type() != variant.Object {
		.AppendMissing()
		return .Bytes()
	}

	.structBldr.Append(true)

	// create a variant builder for any field existing in 'v' but not in
	// the shredding schema
	 := variant.NewBuilderFromMeta(.Metadata())
	 := .Value().(variant.ObjectValue)

	 := .Offset()
	 := make([]variant.FieldEntry, 0, .NumElements())
	// Set of shredded field names seen in this value.
	 := make(map[string]struct{})
	for ,  := range .Values() {
		,  := .fieldBuilders[]
		if ! {
			// field is not shredded, put it in the untyped value col
			 = append(, .NextField(, ))
			if  := .UnsafeAppendEncoded(.Bytes());  != nil {
				panic(fmt.Sprintf("error appending field %s: %v", , ))
			}
		} else {
			[] = struct{}{}
			.structBldr.Append(true)
			// The field's own residual is kept next to its typed value.
			 := .typedBldr.tryTyped()
			if len() > 0 {
				.valueBldr.Append()
			} else {
				.valueBldr.AppendNull()
			}
		}
	}

	if len() < len(.fieldBuilders) {
		// set missing fields appropriately
		// (map iteration order is irrelevant here: each field appends to its
		// own builders)
		for ,  := range .fieldBuilders {
			if ,  := []; ! {
				.structBldr.Append(true)
				.valueBldr.AppendNull()
				.typedBldr.AppendMissing()
			}
		}
	}

	// Only emit a residual object when at least one field was not shredded.
	if len() > 0 {
		if  := .FinishObject(, );  != nil {
			panic(fmt.Sprintf("error finishing object: %v", ))
		}

		return .BuildWithoutMeta()
	}

	return nil
}

// createShreddedBuilder wraps an arrow builder in the shreddedBuilder that
// matches the given schema:
//   - shreddedObjSchema: a *shreddedObjBuilder with one shreddedFieldBuilder
//     per schema field, keyed by field name.
//   - shreddedArraySchema: a *shreddedArrayBuilder over the list builder and
//     its element struct builder.
//   - shreddedPrimitiveSchema: a *shreddedPrimitiveBuilder around the arrow
//     builder itself.
//
// Any other schema (including nil) returns a nil shreddedBuilder.
//
// The builder type assertions are unchecked, so a builder whose concrete
// type does not match the schema causes a panic.
func createShreddedBuilder( variantSchema,  array.Builder) shreddedBuilder {
	switch s := .(type) {
	case shreddedObjSchema:
		 := .(*array.StructBuilder)
		 := make(map[string]shreddedFieldBuilder)
		for ,  := range .fields {
			// Each object field is a nested struct of (value, typed_value).
			 := .FieldBuilder().(*array.StructBuilder)
			// NOTE(review): the callee name for typedBldr is not visible
			// here; presumably a recursive call for the nested schema.
			[.fieldName] = shreddedFieldBuilder{
				structBldr: ,
				valueBldr:  .FieldBuilder(.variantIdx).(array.BinaryLikeBuilder),
				typedBldr:  (.schema, .FieldBuilder(.typedIdx)),
			}
		}
		return &shreddedObjBuilder{
			structBldr:    ,
			fieldBuilders: ,
		}
	case shreddedArraySchema:
		 := .(*array.ListBuilder)
		 := .ValueBuilder().(*array.StructBuilder)

		// NOTE(review): callee name for typedBldr not visible; presumably a
		// recursive call for the element schema.
		return &shreddedArrayBuilder{
			listBldr:  ,
			elemBldr:  ,
			valueBldr: .FieldBuilder(.elemVariantIdx).(array.BinaryLikeBuilder),
			typedBldr: (.elemSchema, .FieldBuilder(.elemTypedIdx)),
		}
	case shreddedPrimitiveSchema:
		return &shreddedPrimitiveBuilder{typedBldr: }
	}

	// non-shredded
	return nil
}