// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package util

import (
	
	

	
	
	
	
	
	
	
)

// ProtobufTypeHandler provides options on how protobuf fields should be handled in the conversion to arrow.
// The same integer type carries both the oneOf choices and the enum choices declared in the const block below.
type ProtobufTypeHandler int

// Handler values. In this file the OneOf* values are compared against
// schemaOptions.oneOfHandler and the Enum* values against
// schemaOptions.enumHandler.
const (
	// OneOfNull means do not wrap oneOfs in a union; the members are treated as separate fields.
	// It is the zero value of ProtobufTypeHandler.
	OneOfNull ProtobufTypeHandler = iota
	// OneOfDenseUnion maps the protobuf OneOf to an arrow.DENSE_UNION
	OneOfDenseUnion
	// EnumNumber uses the Enum numeric value (mapped to arrow.INT32)
	EnumNumber
	// EnumValue uses the Enum string value (mapped to arrow.STRING)
	EnumValue
	// EnumDictionary uses both the numeric and string value and maps to an arrow.Dictionary
	EnumDictionary
)

// schemaOptions groups the settings that steer schema generation. It is
// embedded in both ProtobufFieldReflection and ProtobufMessageReflection so
// the same settings are carried from a message down to its fields.
type schemaOptions struct {
	// exclusionPolicy is consulted by the field generators; a field is skipped when it returns true.
	exclusionPolicy    func(pfr *ProtobufFieldReflection) bool
	// fieldNameFormatter is applied to the protobuf field (or oneOf) name to produce the arrow field name.
	fieldNameFormatter func(str string) string
	// oneOfHandler is compared against OneOfNull / OneOfDenseUnion.
	oneOfHandler       ProtobufTypeHandler
	// enumHandler is compared against EnumNumber / EnumValue / EnumDictionary.
	enumHandler        ProtobufTypeHandler
}

// ProtobufFieldReflection represents the metadata and values of a protobuf field.
// It carries the value twice: once as a protoreflect.Value and once as a Go reflect.Value.
type ProtobufFieldReflection struct {
	// parent is the message reflection this field was looked up from.
	parent     *ProtobufMessageReflection
	// descriptor is the protobuf descriptor of the field.
	descriptor protoreflect.FieldDescriptor
	// prValue is the field value as seen through protoreflect; it may be invalid when only a schema is being derived.
	prValue    protoreflect.Value
	// rValue is the field value as seen through Go reflection; it may be invalid when only a schema is being derived.
	rValue     reflect.Value
	schemaOptions
	arrow.Field
	// isListItem marks a reflection that stands for one element of a repeated field,
	// which makes the list check report false for it.
	isListItem bool
}

// NOTE(review): identifier names are missing in this copy of the file; the
// protobufReflection interface and the call sites suggest this is isNull — confirm.
//
// Reports true when the field holds no value: a nil pointer is met while
// dereferencing rValue, or rValue / prValue is not valid. Otherwise false.
// As a side effect rValue is replaced by its pointed-to element for every
// pointer level that is walked.
func ( *ProtobufFieldReflection) () bool {
	// Walk through pointer indirections; a nil pointer anywhere means "null".
	for .rValue.Kind() == reflect.Ptr {
		if .rValue.IsNil() {
			return true
		}
		.rValue = .rValue.Elem()
	}

	// Either view of the value being invalid is also treated as null.
	if !.rValue.IsValid() || !.prValue.IsValid() {
		return true
	}
	return false
}

// NOTE(review): method name missing in this copy; call sites suggest arrowField — confirm.
//
// Builds the arrow.Field for this protobuf field from name() and
// getDataType(). The field is always marked Nullable.
func ( *ProtobufFieldReflection) () arrow.Field {
	return arrow.Field{
		Name:     .name(),
		Type:     .getDataType(),
		Nullable: true,
	}
}

// NOTE(review): method name missing in this copy; the protobufReflection
// interface suggests protoreflectValue — confirm.
//
// Returns the stored prValue unchanged.
func ( *ProtobufFieldReflection) () protoreflect.Value {
	return .prValue
}

// NOTE(review): method name missing in this copy; the protobufReflection
// interface suggests reflectValue — confirm.
//
// Returns the stored rValue unchanged.
func ( *ProtobufFieldReflection) () reflect.Value {
	return .rValue
}

// NOTE(review): method name missing in this copy; the protobufReflection
// interface suggests GetDescriptor — confirm.
//
// Returns the protobuf field descriptor held by this reflection.
func ( *ProtobufFieldReflection) () protoreflect.FieldDescriptor {
	return .descriptor
}

// NOTE(review): method name missing in this copy; call sites suggest name — confirm.
//
// Returns the arrow field name, always passed through fieldNameFormatter.
// For a oneOf member with any oneOfHandler other than OneOfNull the name of
// the containing oneOf is used; otherwise the field's own name.
func ( *ProtobufFieldReflection) () string {
	if .isOneOf() && .oneOfHandler != OneOfNull {
		return .fieldNameFormatter(string(.descriptor.ContainingOneof().Name()))
	}
	return .fieldNameFormatter(string(.descriptor.Name()))
}

// NOTE(review): method name missing in this copy; call sites suggest arrowType — confirm.
//
// Maps the protobuf field to an arrow.Type ID. Checks run in a fixed order and
// the first match wins: oneOf-as-dense-union, enum, struct, map, list, then the
// scalar protobuf kind. Any kind not listed in the final switch yields arrow.NULL.
func ( *ProtobufFieldReflection) () arrow.Type {
	if .isOneOf() && .oneOfHandler == OneOfDenseUnion {
		return arrow.DENSE_UNION
	}
	if .isEnum() {
		// An enumHandler outside these three values falls through; the kind
		// switch below has no EnumKind case, so the result is then arrow.NULL.
		switch .enumHandler {
		case EnumNumber:
			return arrow.INT32
		case EnumValue:
			return arrow.STRING
		case EnumDictionary:
			return arrow.DICTIONARY
		}
	}
	if .isStruct() {
		return arrow.STRUCT
	}
	if .isMap() {
		return arrow.MAP
	}
	if .isList() {
		return arrow.LIST
	}
	// Scalar kinds: signed variants map to INT32/INT64, unsigned and plain
	// fixed-width variants to UINT32/UINT64.
	switch .descriptor.Kind() {
	case protoreflect.Int32Kind:
		return arrow.INT32
	case protoreflect.Int64Kind:
		return arrow.INT64
	case protoreflect.Sint32Kind:
		return arrow.INT32
	case protoreflect.Sint64Kind:
		return arrow.INT64
	case protoreflect.Uint32Kind:
		return arrow.UINT32
	case protoreflect.Uint64Kind:
		return arrow.UINT64
	case protoreflect.Fixed32Kind:
		return arrow.UINT32
	case protoreflect.Fixed64Kind:
		return arrow.UINT64
	case protoreflect.Sfixed32Kind:
		return arrow.INT32
	case protoreflect.Sfixed64Kind:
		return arrow.INT64
	case protoreflect.FloatKind:
		return arrow.FLOAT32
	case protoreflect.DoubleKind:
		return arrow.FLOAT64
	case protoreflect.StringKind:
		return arrow.STRING
	case protoreflect.BytesKind:
		return arrow.BINARY
	case protoreflect.BoolKind:
		return arrow.BOOL
	}
	return arrow.NULL
}

// NOTE(review): method name missing in this copy; presumably isOneOf — confirm.
//
// Reports whether the field's descriptor has a containing oneOf.
func ( *ProtobufFieldReflection) () bool {
	return .descriptor.ContainingOneof() != nil
}

// NOTE(review): method name missing in this copy; presumably isEnum — confirm.
//
// Reports whether the field's protobuf kind is EnumKind.
func ( *ProtobufFieldReflection) () bool {
	return .descriptor.Kind() == protoreflect.EnumKind
}

// NOTE(review): method name missing in this copy; presumably isStruct — confirm.
//
// Reports whether the field is a message that is neither a map nor a list
// (as decided by isList, which also honours isListItem).
func ( *ProtobufFieldReflection) () bool {
	return .descriptor.Kind() == protoreflect.MessageKind && !.descriptor.IsMap() && !.isList()
}

// NOTE(review): method name missing in this copy; presumably isMap — confirm.
//
// Reports whether the field is of message kind and its descriptor says it is a map.
func ( *ProtobufFieldReflection) () bool {
	return .descriptor.Kind() == protoreflect.MessageKind && .descriptor.IsMap()
}

// NOTE(review): method name missing in this copy; presumably isList — confirm.
//
// Reports whether the descriptor is a list and this reflection is not
// flagged as a single list item.
func ( *ProtobufFieldReflection) () bool {
	return .descriptor.IsList() && !.isListItem
}

// ProtobufMessageReflection represents the metadata and values of a protobuf message
type ProtobufMessageReflection struct {
	// descriptor is the protobuf descriptor of the message.
	descriptor protoreflect.MessageDescriptor
	// message is the protoreflect view of the message; it may be nil
	// (field lookup checks for that before reading values).
	message    protoreflect.Message
	// rValue is the Go reflection view of the message.
	rValue     reflect.Value
	schemaOptions
	// fields holds the per-field reflections collected by the constructor.
	fields []ProtobufMessageFieldReflection
}

// NOTE(review): method name missing in this copy; the call site suggests unmarshallAny — confirm.
//
// When the message descriptor is google.protobuf.Any and rValue is valid, the
// Any is unpacked with UnmarshalNew and a new ProtobufMessageReflection is
// returned for the contained message, keeping the same schemaOptions.
// In every other case the receiver is returned as is.
//
// NOTE(review): the second result of the type assertion and the second result
// of UnmarshalNew are both bound but never checked in this block — presumably
// an ok flag and an error; confirm how a failed assertion or unmarshal should
// be handled.
func ( ProtobufMessageReflection) () ProtobufMessageReflection {
	if .descriptor.FullName() == "google.protobuf.Any" && .rValue.IsValid() {
		// Strip pointer levels so the value can be asserted as an anypb.Any value.
		for .rValue.Type().Kind() == reflect.Ptr {
			.rValue = reflect.Indirect(.rValue)
		}
		,  := .rValue.Interface().(anypb.Any)
		,  := .UnmarshalNew()

		// Same pointer stripping for the reflect view of the unpacked message.
		 := reflect.ValueOf()
		for .Kind() == reflect.Ptr {
			 = reflect.Indirect()
		}

		return ProtobufMessageReflection{
			descriptor:    .ProtoReflect().Descriptor(),
			message:       .ProtoReflect(),
			rValue:        ,
			schemaOptions: .schemaOptions,
		}
	} else {
		return 
	}
}

// NOTE(review): method name missing in this copy; the call in the struct
// getDataType suggests getArrowFields — confirm.
//
// Collects the arrow.Field of every field produced by generateStructFields(),
// in the order the channel delivers them. The result is nil when nothing is produced.
func ( ProtobufMessageReflection) () []arrow.Field {
	var  []arrow.Field

	for  := range .generateStructFields() {
		 = append(, .arrowField())
	}

	return 
}

// protobufListReflection is a ProtobufFieldReflection viewed as a repeated
// (list) field; it adds list-specific methods on top of the embedded type.
type protobufListReflection struct {
	ProtobufFieldReflection
}

// NOTE(review): method name missing in this copy; the protobufReflection
// interface suggests asList — confirm.
//
// Wraps a copy of the dereferenced field reflection in a protobufListReflection.
func ( *ProtobufFieldReflection) () protobufListReflection {
	return protobufListReflection{*}
}

// NOTE(review): method name missing in this copy; presumably getDataType — confirm.
//
// Returns the arrow list type for this repeated field. The element type is
// resolved from a fresh field reflection that shares the descriptor and
// options but has isListItem set, so the element is not classified as a list again.
func ( protobufListReflection) () arrow.DataType {
	 := ProtobufFieldReflection{
		descriptor:    .descriptor,
		schemaOptions: .schemaOptions,
		isListItem:    true,
	}
	return arrow.ListOf(.getDataType())
}

// protobufUnionReflection is a ProtobufFieldReflection viewed as a member of
// a oneOf; it adds the union-specific methods on top of the embedded type.
type protobufUnionReflection struct {
	ProtobufFieldReflection
}

// NOTE(review): method name missing in this copy; the protobufReflection
// interface suggests asUnion — confirm.
//
// Wraps a copy of the dereferenced field reflection in a protobufUnionReflection.
func ( *ProtobufFieldReflection) () protobufUnionReflection {
	return protobufUnionReflection{*}
}

// NOTE(review): method name missing in this copy; call sites suggest isThisOne — confirm.
//
// Decides whether this oneOf member is the one currently set, by comparing
// the string form of the first struct field of rValue with the string form
// of prValue.
//
// NOTE(review): presumably rValue is the generated oneOf wrapper struct with a
// single field; Field(0) panics if rValue does not end up as a struct with at
// least one field — confirm against callers.
func ( protobufUnionReflection) () bool {
	// Unwrap pointers and interfaces to reach the concrete value.
	for .rValue.Kind() == reflect.Ptr || .rValue.Kind() == reflect.Interface {
		.rValue = .rValue.Elem()
	}
	return .rValue.Field(0).String() == .prValue.String()
}

// NOTE(review): method name missing in this copy; call sites suggest whichOne — confirm.
//
// Walks the members of the containing oneOf in declaration order, resolves
// each through parent.getFieldByName, and returns the union type code of the
// first member for which isThisOne() is true. Returns -1 when none is set.
func ( protobufUnionReflection) () arrow.UnionTypeCode {
	 := .descriptor.ContainingOneof().Fields()
	for  := 0;  < .Len(); ++ {
		 := .parent.getFieldByName(string(.Get().Name()))
		if .asUnion().isThisOne() {
			return .getUnionTypeCode(int32(.descriptor.Number()))
		}
	}
	// i.e. all null
	return -1
}

// NOTE(review): method name missing in this copy; the call in the DENSE_UNION
// branch of the append method suggests getField — confirm.
//
// Walks the members of the containing oneOf in declaration order and returns
// the field reflection of the first member for which isThisOne() is true.
// Returns nil when none is set.
func ( protobufUnionReflection) () *ProtobufFieldReflection {
	 := .descriptor.ContainingOneof().Fields()
	for  := 0;  < .Len(); ++ {
		 := .parent.getFieldByName(string(.Get().Name()))
		if .asUnion().isThisOne() {
			return 
		}
	}
	// i.e. all null
	return nil
}

// NOTE(review): method name missing in this copy; the call site suggests getUnionTypeCode — confirm.
//
// Translates a protobuf field number into an arrow union type code by
// searching the containing oneOf for a member with that number. Returns -1
// when no member matches.
//
// NOTE(review): the returned value is an int8 conversion of an expression
// whose operand is missing here — presumably the loop index; confirm.
func ( protobufUnionReflection) ( int32) arrow.UnionTypeCode {
	// We use the index of the field within the oneOf rather than its protobuf
	// number, as there is a limit on the arrow.UnionTypeCode (127) which a
	// protobuf Number could realistically exceed.
	 := .descriptor.ContainingOneof().Fields()
	for  := 0;  < .Len(); ++ {
		if  == int32(.Get().Number()) {
			return int8()
		}
	}
	return -1
}

// NOTE(review): method name missing in this copy; the range in the union
// getArrowFields suggests generateUnionFields — confirm.
//
// Streams the members of the containing oneOf over an unbuffered channel from
// a new goroutine; the channel is closed once all members have been visited.
// Members rejected by exclusionPolicy are skipped. Because the channel is
// unbuffered, the goroutine stays blocked on a send until the caller receives,
// so callers need to drain the channel.
func ( protobufUnionReflection) () chan *ProtobufFieldReflection {
	 := make(chan *ProtobufFieldReflection)
	go func() {
		defer close()
		 := .descriptor.ContainingOneof().Fields()
		for  := 0;  < .Len(); ++ {
			 := .parent.getFieldByName(string(.Get().Name()))
			// Do not get stuck in a recursion loop: with OneOfNull the member
			// is no longer classified as a dense union itself.
			.oneOfHandler = OneOfNull
			if .exclusionPolicy() {
				continue
			}
			 <- 
		}
	}()

	return 
}

// NOTE(review): method name missing in this copy; the call in the union
// getDataType suggests getArrowFields — confirm.
//
// Collects the arrow.Field of every member produced by generateUnionFields(),
// in delivery order. The result is nil when nothing is produced.
func ( protobufUnionReflection) () []arrow.Field {
	var  []arrow.Field

	for  := range .generateUnionFields() {
		 = append(, .arrowField())
	}

	return 
}

// NOTE(review): method name missing in this copy; presumably getDataType — confirm.
//
// Builds the arrow dense union type from the union's arrow fields, with one
// type code per field.
//
// NOTE(review): the value assigned to each type code is missing in this copy —
// presumably the loop index, i.e. codes 0..len-1; confirm. If so, codes are
// positions among the non-excluded members, whereas getUnionTypeCode indexes
// all members of the oneOf — verify the two stay aligned when a member is excluded.
func ( protobufUnionReflection) () arrow.DataType {
	 := .getArrowFields()
	 := make([]arrow.UnionTypeCode, len())
	for  := 0;  < len(); ++ {
		[] = arrow.UnionTypeCode()
	}
	return arrow.DenseUnionOf(, )
}

// protobufDictReflection is a ProtobufFieldReflection viewed as an enum that
// is mapped to an arrow dictionary.
type protobufDictReflection struct {
	ProtobufFieldReflection
}

// NOTE(review): method name missing in this copy; the protobufReflection
// interface suggests asDictionary — confirm.
//
// Wraps a copy of the dereferenced field reflection in a protobufDictReflection.
func ( *ProtobufFieldReflection) () protobufDictReflection {
	return protobufDictReflection{*}
}

// NOTE(review): method name missing in this copy; presumably getDataType — confirm.
//
// Returns the arrow dictionary type used for enums: Int32 indices, String
// values, unordered. The result does not depend on the receiver.
func ( protobufDictReflection) () arrow.DataType {
	return &arrow.DictionaryType{
		IndexType: arrow.PrimitiveTypes.Int32,
		ValueType: arrow.BinaryTypes.String,
		Ordered:   false,
	}
}

// NOTE(review): method name missing in this copy; the call in the DICTIONARY
// branch of the append method suggests getDictValues — confirm.
//
// Builds an arrow string array holding the name of every value declared on the
// field's enum, in declaration order.
//
// NOTE(review): no Release call on the string builder is visible in this
// block — confirm whether the builder should be released after NewArray.
func ( protobufDictReflection) ( memory.Allocator) arrow.Array {
	 := .descriptor.Enum().Values()
	 := array.NewStringBuilder()
	for  := 0;  < .Len(); ++ {
		.Append(string(.Get().Name()))
	}
	return .NewArray()
}

// protobufMapReflection is a ProtobufFieldReflection viewed as a protobuf map field.
type protobufMapReflection struct {
	ProtobufFieldReflection
}

// NOTE(review): method name missing in this copy; the protobufReflection
// interface suggests asMap — confirm.
//
// Wraps a copy of the dereferenced field reflection in a protobufMapReflection.
func ( *ProtobufFieldReflection) () protobufMapReflection {
	return protobufMapReflection{*}
}

// NOTE(review): method name missing in this copy; presumably getDataType — confirm.
//
// Returns the arrow map type for this field. The type is taken from the first
// key/value pair delivered by generateKeyValuePairs(); when the channel yields
// nothing, it is derived from a pair built from the map's key and value
// descriptors alone.
//
// NOTE(review): the loop returns on the first received pair without draining
// the channel. generateKeyValuePairs sends on an unbuffered channel, so when
// the map holds more than one entry the producing goroutine is left blocked
// on its next send — looks like a goroutine leak; confirm.
func ( protobufMapReflection) () arrow.DataType {
	for  := range .generateKeyValuePairs() {
		return .getDataType()
	}
	// Nothing was produced: fall back to a descriptor-only pair.
	return protobufMapKeyValuePairReflection{
		k: ProtobufFieldReflection{
			parent:        .parent,
			descriptor:    .descriptor.MapKey(),
			schemaOptions: .schemaOptions,
		},
		v: ProtobufFieldReflection{
			parent:        .parent,
			descriptor:    .descriptor.MapValue(),
			schemaOptions: .schemaOptions,
		},
	}.getDataType()
}

// protobufMapKeyValuePairReflection pairs the field reflections of one map
// entry: k for the key and v for the value.
type protobufMapKeyValuePairReflection struct {
	k ProtobufFieldReflection
	v ProtobufFieldReflection
}

// NOTE(review): method name missing in this copy; presumably getDataType — confirm.
//
// Returns arrow.MapOf built from the data types of the key and value reflections.
func ( protobufMapKeyValuePairReflection) () arrow.DataType {
	return arrow.MapOf(.k.getDataType(), .v.getDataType())
}

// NOTE(review): method name missing in this copy; call sites suggest generateKeyValuePairs — confirm.
//
// Streams the entries of the map field over an unbuffered channel from a new
// goroutine; the channel is closed when the goroutine finishes.
//
//   - When rValue is not valid, exactly one pair is sent that carries only the
//     key and value descriptors (no values), which is enough to derive a type.
//   - Otherwise one pair is sent per key returned by rValue.MapKeys(); the
//     reflect package does not specify the order of those keys.
//
// Because the channel is unbuffered, the goroutine stays blocked on a send
// until the caller receives, so callers need to drain the channel.
func ( protobufMapReflection) () chan protobufMapKeyValuePairReflection {
	 := make(chan protobufMapKeyValuePairReflection)

	go func() {
		defer close()
		if !.rValue.IsValid() {
			// No Go value available: emit a descriptor-only pair.
			 := protobufMapKeyValuePairReflection{
				k: ProtobufFieldReflection{
					parent:        .parent,
					descriptor:    .descriptor.MapKey(),
					schemaOptions: .schemaOptions,
				},
				v: ProtobufFieldReflection{
					parent:        .parent,
					descriptor:    .descriptor.MapValue(),
					schemaOptions: .schemaOptions,
				},
			}
			 <- 
			return
		}
		for ,  := range .rValue.MapKeys() {
			// The key is converted to a protoreflect value via getMapKey, and
			// the same conversion is used to look the value up in the
			// protoreflect map; the reflect view uses MapIndex.
			 := protobufMapKeyValuePairReflection{
				k: ProtobufFieldReflection{
					parent:        .parent,
					descriptor:    .descriptor.MapKey(),
					prValue:       getMapKey(),
					rValue:        ,
					schemaOptions: .schemaOptions,
				},
				v: ProtobufFieldReflection{
					parent:        .parent,
					descriptor:    .descriptor.MapValue(),
					prValue:       .prValue.Map().Get(protoreflect.MapKey(getMapKey())),
					rValue:        .rValue.MapIndex(),
					schemaOptions: .schemaOptions,
				},
			}
			 <- 
		}
	}()

	return 
}

// getMapKey converts a Go reflect.Value holding a map key into a
// protoreflect.Value. String, Int32/Int64, Bool and Uint32/Uint64 kinds are
// supported; any other kind panics.
//
// NOTE(review): reflect's Int() and Uint() return 64-bit values, so 32-bit
// keys are widened before being wrapped — confirm this matches what the
// protoreflect map lookup expects for int32/uint32 keyed maps.
func getMapKey( reflect.Value) protoreflect.Value {
	switch .Kind() {
	case reflect.String:
		return protoreflect.ValueOf(.String())
	case reflect.Int32, reflect.Int64:
		return protoreflect.ValueOf(.Int())
	case reflect.Bool:
		return protoreflect.ValueOf(.Bool())
	case reflect.Uint32, reflect.Uint64:
		return protoreflect.ValueOf(.Uint())
	default:
		panic("Unmapped protoreflect map key type")
	}
}

// NOTE(review): method name missing in this copy. This file calls both
// generateStructFields and generateFields, and two methods with this exact
// body exist; which one this is cannot be told from here — confirm.
//
// Streams the message's fields, in descriptor order, over an unbuffered
// channel from a new goroutine; the channel is closed when all fields have
// been visited. Fields rejected by exclusionPolicy are skipped. For fields
// whose arrow type is DENSE_UNION only the first member of the containing
// oneOf is emitted, so each oneOf appears once.
// Because the channel is unbuffered, callers need to drain it.
func ( ProtobufMessageReflection) () chan *ProtobufFieldReflection {
	 := make(chan *ProtobufFieldReflection)

	go func() {
		defer close()
		 := .descriptor.Fields()
		for  := 0;  < .Len(); ++ {
			 := .getFieldByName(string(.Get().Name()))
			if .exclusionPolicy() {
				continue
			}
			if .arrowType() == arrow.DENSE_UNION {
				// Only the oneOf's first member (index 0) represents the union.
				if .descriptor.Number() != .descriptor.ContainingOneof().Fields().Get(0).Number() {
					continue
				}
			}
			 <- 
		}
	}()

	return 
}

// NOTE(review): method name missing in this copy. This file calls both
// generateStructFields and generateFields, and two methods with this exact
// body exist; which one this is cannot be told from here — confirm. The
// duplication also looks like a candidate for sharing one implementation.
//
// Streams the message's fields, in descriptor order, over an unbuffered
// channel from a new goroutine; the channel is closed when all fields have
// been visited. Fields rejected by exclusionPolicy are skipped. For fields
// whose arrow type is DENSE_UNION only the first member of the containing
// oneOf is emitted, so each oneOf appears once.
// Because the channel is unbuffered, callers need to drain it.
func ( ProtobufMessageReflection) () chan *ProtobufFieldReflection {
	 := make(chan *ProtobufFieldReflection)

	go func() {
		defer close()
		 := .descriptor.Fields()
		for  := 0;  < .Len(); ++ {
			 := .getFieldByName(string(.Get().Name()))
			if .exclusionPolicy() {
				continue
			}
			if .arrowType() == arrow.DENSE_UNION {
				// Only the oneOf's first member (index 0) represents the union.
				if .descriptor.Number() != .descriptor.ContainingOneof().Fields().Get(0).Number() {
					continue
				}
			}
			 <- 
		}
	}()

	return 
}

// NOTE(review): method name missing in this copy; the protobufReflection
// interface suggests asStruct — confirm.
//
// Views this message-typed field as a ProtobufMessageReflection built from the
// field's message descriptor, rValue and schemaOptions. The protoreflect
// message is only set when prValue is valid. The result is passed through
// unmarshallAny() before being returned.
func ( *ProtobufFieldReflection) () ProtobufMessageReflection {
	 := ProtobufMessageReflection{
		descriptor:    .descriptor.Message(),
		rValue:        .rValue,
		schemaOptions: .schemaOptions,
	}
	if .prValue.IsValid() {
		.message = .prValue.Message()
	}
	 = .unmarshallAny()
	return 
}

// NOTE(review): method name missing in this copy; presumably getDataType — confirm.
//
// Returns the arrow struct type made of the message's arrow fields.
func ( ProtobufMessageReflection) () arrow.DataType {
	return arrow.StructOf(.getArrowFields()...)
}

// NOTE(review): method name missing in this copy; call sites suggest getFieldByName — confirm.
//
// Builds a ProtobufFieldReflection for the named field of this message.
// The descriptor is looked up by the snake_case form of the name. When the
// message's rValue is valid and non-zero, the Go struct field is located by
// the UpperCamelCase form of the name — or of the containing oneOf's name for
// oneOf members — and pointer levels are stripped from it. When the
// protoreflect message is non-nil the field's protoreflect value is read too.
//
// NOTE(review): no nil check on the ByTextName result is visible here;
// presumably callers only pass names that exist on the descriptor — confirm.
func ( ProtobufMessageReflection) ( string) *ProtobufFieldReflection {
	 := .descriptor.Fields().ByTextName(strcase.SnakeCase())
	 := .rValue
	if .IsValid() {
		if !.IsZero() {
			// Reach the concrete struct behind pointers/interfaces.
			for .Kind() == reflect.Ptr || .Kind() == reflect.Interface {
				 = .Elem()
			}
			// oneOf members are looked up under the oneOf's own name in the Go struct.
			if .ContainingOneof() != nil {
				 = string(.ContainingOneof().Name())
			}
			 = .FieldByName(strcase.UpperCamelCase())
			for .Kind() == reflect.Ptr {
				 = .Elem()
			}
		}
	}
	 := ProtobufFieldReflection{
		parent:        &,
		descriptor:    ,
		rValue:        ,
		schemaOptions: .schemaOptions,
	}
	if .message != nil {
		.prValue = .message.Get()
	}
	return &
}

// NOTE(review): method name missing in this copy; the range in the LIST branch
// of the append method suggests generateListItems — confirm.
//
// Streams one ProtobufFieldReflection per element of the repeated field over
// an unbuffered channel from a new goroutine; the channel is closed after the
// last element. Each item shares the list's descriptor and options and takes
// its values from prValue.List().Get and rValue.Index.
//
// NOTE(review): isListItem is not set on the emitted items in this block, and
// parent is left nil — confirm that is intended for nested message elements.
func ( protobufListReflection) () chan ProtobufFieldReflection {
	 := make(chan ProtobufFieldReflection)

	go func() {
		defer close()
		for  := 0;  < .prValue.List().Len(); ++ {
			 <- ProtobufFieldReflection{
				descriptor:    .descriptor,
				prValue:       .prValue.List().Get(),
				rValue:        .rValue.Index(),
				schemaOptions: .schemaOptions,
			}
		}
	}()

	return 
}

// NOTE(review): method name missing in this copy; call sites suggest getDataType — confirm.
//
// Resolves the full arrow.DataType for the field from its arrowType().
// Nested types (union, dictionary, list, map, struct) delegate to the matching
// as*() view; scalar IDs map to the predefined arrow type values.
// Returns nil for any other type ID, which includes arrow.NULL.
func ( *ProtobufFieldReflection) () arrow.DataType {
	switch .arrowType() {
	case arrow.DENSE_UNION:
		return .asUnion().getDataType()
	case arrow.DICTIONARY:
		return .asDictionary().getDataType()
	case arrow.LIST:
		return .asList().getDataType()
	case arrow.MAP:
		return .asMap().getDataType()
	case arrow.STRUCT:
		return .asStruct().getDataType()
	case arrow.INT32:
		return arrow.PrimitiveTypes.Int32
	case arrow.INT64:
		return arrow.PrimitiveTypes.Int64
	case arrow.UINT32:
		return arrow.PrimitiveTypes.Uint32
	case arrow.UINT64:
		return arrow.PrimitiveTypes.Uint64
	case arrow.FLOAT32:
		return arrow.PrimitiveTypes.Float32
	case arrow.FLOAT64:
		return arrow.PrimitiveTypes.Float64
	case arrow.STRING:
		return arrow.BinaryTypes.String
	case arrow.BINARY:
		return arrow.BinaryTypes.Binary
	case arrow.BOOL:
		return arrow.FixedWidthTypes.Boolean
	}
	return nil
}

// protobufReflection is the set of operations the append logic needs from a
// field: its name and arrow type, both views of its value, its descriptor,
// a null check, and an is*/as* pair for each nested shape (enum/dictionary,
// list, map, struct, oneOf/union). It is embedded in
// ProtobufMessageFieldReflection.
type protobufReflection interface {
	name() string
	arrowType() arrow.Type
	protoreflectValue() protoreflect.Value
	reflectValue() reflect.Value
	GetDescriptor() protoreflect.FieldDescriptor
	isNull() bool
	isEnum() bool
	asDictionary() protobufDictReflection
	isList() bool
	asList() protobufListReflection
	isMap() bool
	asMap() protobufMapReflection
	isStruct() bool
	asStruct() ProtobufMessageReflection
	isOneOf() bool
	asUnion() protobufUnionReflection
}

// ProtobufMessageFieldReflection links together the message and its fields.
// It combines the owning message, the field's reflection operations and the
// arrow.Field derived for it.
type ProtobufMessageFieldReflection struct {
	parent *ProtobufMessageReflection
	protobufReflection
	arrow.Field
}

// Schema returns an arrow.Schema representing a protobuf message.
// The schema is made of the arrow.Field of each entry in the message's
// fields slice, in order, with nil metadata.
func ( ProtobufMessageReflection) () *arrow.Schema {
	var  []arrow.Field
	for ,  := range .fields {
		 = append(, .Field)
	}
	return arrow.NewSchema(, nil)
}

// Record returns an arrow.RecordBatch for a protobuf message.
// A nil allocator is replaced with memory.NewGoAllocator(). Each field's value
// is appended to a record builder created for Schema(); the resulting arrays
// are assembled into a struct array, which is converted to the record batch.
//
// NOTE(review): AppendValueOrNull returns an error, but it is called here as a
// bare statement, so that error is discarded. The second result of
// array.NewStructArray is bound but never checked in this block — presumably
// an error as well. Confirm whether failures should be surfaced.
//
// NOTE(review): no Release call on the record builder or the intermediate
// arrays is visible in this block — confirm the intended ownership.
func ( ProtobufMessageReflection) ( memory.Allocator) arrow.RecordBatch {
	if  == nil {
		 = memory.NewGoAllocator()
	}

	 := .Schema()

	 := array.NewRecordBuilder(, )

	// Append one value per field and remember the field names for the struct array.
	var  []string
	for ,  := range .fields {
		.AppendValueOrNull(.Field(), )
		 = append(, .name())
	}

	// Finish every column builder into an array.
	var  []arrow.Array
	for ,  := range .Fields() {
		 := .NewArray()
		 = append(, )
	}

	,  := array.NewStructArray(, )

	return array.RecordFromStructArray(, )
}

// NewProtobufMessageReflection initialises a ProtobufMessageReflection, which
// can be used to convert a protobuf message into an arrow Record.
//
// Defaults, before options are applied: no field is excluded, field names are
// left unchanged, oneOfs use OneOfNull and enums use EnumDictionary.
// Options are applied in the order given, and the per-field reflections are
// collected afterwards, so they see the final settings.
func ( proto.Message,  ...option) *ProtobufMessageReflection {
	// Reflect view of the message with pointer levels stripped.
	 := reflect.ValueOf()
	for .Kind() == reflect.Ptr {
		 = .Elem()
	}
	// Default exclusion policy: exclude nothing.
	 := func( *ProtobufFieldReflection) bool {
		return false
	}
	// Default name formatter: identity.
	 := func( string) string {
		return 
	}
	 := &ProtobufMessageReflection{
		descriptor: .ProtoReflect().Descriptor(),
		message:    .ProtoReflect(),
		rValue:     ,
		schemaOptions: schemaOptions{
			exclusionPolicy:    ,
			fieldNameFormatter: ,
			oneOfHandler:       OneOfNull,
			enumHandler:        EnumDictionary,
		},
	}

	for ,  := range  {
		()
	}

	var  []ProtobufMessageFieldReflection

	for  := range .generateFields() {
		 = append(, ProtobufMessageFieldReflection{
			parent:             ,
			protobufReflection: ,
			Field:              .arrowField(),
		})
	}

	.fields = 

	return 
}

// option mutates a ProtobufMessageReflection during construction; values are
// produced by the With* functions below and applied by the constructor.
type option func(*ProtobufMessageReflection)

// WithExclusionPolicy is an option for a ProtobufMessageReflection.
// WithExclusionPolicy acts as a deny filter on the fields of a protobuf message,
// i.e. fields for which the function returns true are not included in the schema.
// A use case for this is to exclude fields containing PII.
func ( func( *ProtobufFieldReflection) bool) option {
	return func( *ProtobufMessageReflection) {
		.exclusionPolicy = 
	}
}

// WithFieldNameFormatter is an option for a ProtobufMessageReflection.
// WithFieldNameFormatter enables customisation of the field names in the arrow schema.
// By default, the field names are taken from the protobuf message (.proto file) unchanged.
func ( func( string) string) option {
	return func( *ProtobufMessageReflection) {
		.fieldNameFormatter = 
	}
}

// WithOneOfHandler is an option for a ProtobufMessageReflection.
// WithOneOfHandler enables customisation of the protobuf oneOf type in the arrow schema.
// By default, the oneOfs are mapped to separate columns (OneOfNull).
func ( ProtobufTypeHandler) option {
	return func( *ProtobufMessageReflection) {
		.oneOfHandler = 
	}
}

// WithEnumHandler is an option for a ProtobufMessageReflection.
// WithEnumHandler enables customisation of the protobuf Enum type in the arrow schema.
// By default, the Enums are mapped to arrow.Dictionary (EnumDictionary).
func ( ProtobufTypeHandler) option {
	return func( *ProtobufMessageReflection) {
		.enumHandler = 
	}
}

// AppendValueOrNull adds the value of a protobuf field to an arrow array builder.
// A null field (isNull) appends a null. Otherwise the builder's type ID picks
// the branch: scalar branches append the converted protoreflect value, and the
// nested branches (union, struct, list, map) build a child
// ProtobufMessageFieldReflection and append through the child builder —
// presumably by recursing into this same method (the called name is missing in
// this copy; confirm).
// Returns an error for a builder type with no branch, or the first error
// reported by a nested append or by the dictionary builder.
//
// NOTE(review): there is no arrow.FLOAT32 branch, although the data type
// mapping elsewhere in this file can produce Float32 for protobuf float
// fields; such a builder would reach the default branch and return an error —
// confirm.
//
// NOTE(review): the builders are converted with unchecked type assertions,
// which panic if the builder's concrete type differs from the one expected
// for its type ID.
func ( ProtobufMessageFieldReflection) ( array.Builder,  memory.Allocator) error {
	 := .protoreflectValue()
	 := .GetDescriptor()

	if .isNull() {
		.AppendNull()
		return nil
	}

	switch .Type().ID() {
	case arrow.STRING:
		// Enums handled as EnumValue arrive here: append the enum value's name.
		if .isEnum() {
			.(*array.StringBuilder).Append(string(.Enum().Values().ByNumber(.Enum()).Name()))
		} else {
			.(*array.StringBuilder).Append(.String())
		}
	case arrow.BINARY:
		.(*array.BinaryBuilder).Append(.Bytes())
	case arrow.INT32:
		// Enums handled as EnumNumber arrive here: the number is read from the Go reflect value.
		if .isEnum() {
			.(*array.Int32Builder).Append(int32(.reflectValue().Int()))
		} else {
			.(*array.Int32Builder).Append(int32(.Int()))
		}
	case arrow.INT64:
		.(*array.Int64Builder).Append(.Int())
	case arrow.FLOAT64:
		.(*array.Float64Builder).Append(.Float())
	case arrow.UINT32:
		.(*array.Uint32Builder).Append(uint32(.Uint()))
	case arrow.UINT64:
		.(*array.Uint64Builder).Append(.Uint())
	case arrow.BOOL:
		.(*array.BooleanBuilder).Append(.Bool())
	case arrow.DENSE_UNION:
		 := .(array.UnionBuilder)
		 := .asUnion()
		// whichOne() == -1 means no member of the oneOf is set.
		// NOTE(review): whichOne() is evaluated up to three times in this
		// branch, each time walking the oneOf members.
		if .whichOne() == -1 {
			.AppendNull()
			break
		}
		.Append(.whichOne())
		 := .Child(int(.whichOne()))
		// Append the set member's value into the matching child builder.
		 := ProtobufMessageFieldReflection{
			parent:             .parent,
			protobufReflection: .getField(),
			Field:              .arrowField(),
		}.(, )
		if  != nil {
			return 
		}
	case arrow.DICTIONARY:
		 := .asDictionary()
		 := .(*array.BinaryDictionaryBuilder)
		// Seed the dictionary with every enum value name, then append the
		// current value by name.
		 := .InsertStringDictValues(.getDictValues().(*array.String))
		if  != nil {
			return 
		}
		 := int(.reflectValue().Int())
		 := .Enum().Values().ByNumber(protoreflect.EnumNumber()).Name()
		 = .AppendValueFromString(string())
		if  != nil {
			return 
		}
	case arrow.STRUCT:
		 := .(*array.StructBuilder)
		.Append(true)
		 := ProtobufMessageFieldReflection{
			parent: .parent,
		}
		// One nested append per field of the arrow struct type; the protobuf
		// field is looked up by the arrow field's name.
		for ,  := range .Field.Type.(*arrow.StructType).Fields() {
			.protobufReflection = .asStruct().getFieldByName(.Name)
			.Field = 
			 := .(.FieldBuilder(), )
			if  != nil {
				return 
			}
		}
	case arrow.LIST:
		 := .(*array.ListBuilder)
		 := .List().Len()
		if  == 0 {
			.AppendEmptyValue()
			break
		}
		.ValueBuilder().Reserve()
		.Append(true)
		 := ProtobufMessageFieldReflection{
			parent: .parent,
			Field:  .Field.Type.(*arrow.ListType).ElemField(),
		}
		// Append every list element through the list's value builder.
		for  := range .asList().generateListItems() {
			.protobufReflection = &
			 := .(.ValueBuilder(), )
			if  != nil {
				return 
			}
		}
	case arrow.MAP:
		 := .(*array.MapBuilder)
		 := .Map().Len()
		if  == 0 {
			.AppendEmptyValue()
			break
		}
		.KeyBuilder().Reserve()
		.ItemBuilder().Reserve()
		.Append(true)
		 := ProtobufMessageFieldReflection{
			parent: .parent,
			Field:  .Field.Type.(*arrow.MapType).KeyField(),
		}
		 := ProtobufMessageFieldReflection{
			parent: .parent,
			Field:  .Field.Type.(*arrow.MapType).ItemField(),
		}
		// Append key then value for every pair, through the key and item builders.
		// NOTE(review): returning on error here (and in the LIST branch) leaves
		// the producing goroutine blocked on its unbuffered channel if items
		// remain — confirm.
		for  := range .asMap().generateKeyValuePairs() {
			.protobufReflection = &.k
			 := .(.KeyBuilder(), )
			if  != nil {
				return 
			}
			.protobufReflection = &.v
			 = .(.ItemBuilder(), )
			if  != nil {
				return 
			}
		}
	default:
		return fmt.Errorf("not able to appendValueOrNull for type %s", .Type().ID())
	}
	return nil
}