// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package compute

import (
	
	
	
	
	
	
	

	
	
)

// Sentinel errors reported by the field path / field reference helpers in
// this file. Where they are wrapped with %w, callers can test for them with
// errors.Is.
//
//   - ErrEmpty: a zero-length FieldPath was traversed.
//   - ErrNoChildren: a path tried to descend into something with no children.
//   - ErrIndexRange: a path index was negative or past the end of the current level.
//   - ErrMultipleMatches: a lookup that allows at most one match found several.
//   - ErrNoMatch: a lookup that requires a match found none.
//   - ErrInvalid: a dot path could not be parsed into a field reference.
var (
	ErrEmpty           = errors.New("cannot traverse empty field path")
	ErrNoChildren      = errors.New("trying to get child of type with no children")
	ErrIndexRange      = errors.New("index out of range")
	ErrMultipleMatches = errors.New("multiple matches")
	ErrNoMatch         = errors.New("no match")
	ErrInvalid         = errors.New("field ref invalid")
)

// getFields returns the child fields of a data type. Only types implementing
// arrow.NestedType have children; for every other type the result is nil.
func getFields( arrow.DataType) []arrow.Field {
	if ,  := .(arrow.NestedType);  {
		return .Fields()
	}
	return nil
}

// listvals is satisfied by any array that exposes a single child array through
// ListValues. getChildren uses it to descend into such arrays without naming
// their concrete types.
type listvals interface {
	ListValues() arrow.Array
}

// getChildren returns the immediate child arrays of the given array:
//   - for a *array.Struct, one entry per struct field (NumField entries);
//   - for anything implementing listvals, a one-element slice holding ListValues();
//   - for every other array type, nil (the named result is never assigned).
func getChildren( arrow.Array) ( []arrow.Array) {
	switch arr := .(type) {
	case *array.Struct:
		// a struct array exposes one child per field
		 = make([]arrow.Array, .NumField())
		for  := 0;  < .NumField(); ++ {
			[] = .Field()
		}
	case listvals:
		// list-like arrays expose exactly one child: their values array
		 = []arrow.Array{.ListValues()}
	}
	// naked return of the named result; it is still nil if no case matched
	return
}

// FieldPath represents a path to a nested field using indices of child fields.
// For example, given the indices {5, 9, 3} the field could be retrieved with:
// schema.Field(5).Type().(*arrow.StructType).Field(9).Type().(*arrow.StructType).Field(3)
//
// Attempting to retrieve a child field using a FieldPath which is not valid for a given
// schema produces an error: ErrEmpty for a zero-length path, ErrIndexRange for an
// index outside the current level, or ErrNoChildren when the path descends into
// something that has no children.
//
// For convenience, a FieldPath can drill down to potentially nested children starting
// from a slice of fields, a schema, or a datatype (which should contain child fields).
//
// A FieldPath can also be used to retrieve a child arrow.Array or column from a record batch.
type FieldPath []int

// Renders the path as text: "FieldPath(empty)" for a zero-length path,
// otherwise "FieldPath(" followed by the indices separated by single spaces
// and a closing ")".
//
// NOTE(review): the method name is blank in this copy of the file; it is
// presumably String, since FieldPath is stored as a refImpl and refImpl embeds
// fmt.Stringer — confirm.
func ( FieldPath) () string {
	if len() == 0 {
		return "FieldPath(empty)"
	}

	var  strings.Builder
	.WriteString("FieldPath(")
	for ,  := range  {
		// every index is followed by a space, including the last one
		fmt.Fprint(&, )
		.WriteByte(' ')
	}
	 := .String()
	// drop the trailing space written after the final index before closing
	return [:len()-1] + ")"
}

// Get retrieves the corresponding nested child field by drilling through the schema's
// fields as per the field path. The schema's top-level fields form the first layer;
// the result and error are exactly those of GetFieldFromSlice on that slice.
func ( FieldPath) ( *arrow.Schema) (*arrow.Field, error) {
	return .GetFieldFromSlice(.Fields())
}

// GetFieldFromSlice treats the slice as the top layer of fields, so the first value
// in the field path will index into the slice, and then drill down from there.
//
// Errors:
//   - ErrEmpty if the path has no elements;
//   - ErrNoChildren (wrapped) if the current layer has no fields to index into;
//   - ErrIndexRange (wrapped, with the offending path prefix) if an index is
//     negative or not less than the number of fields in the current layer.
//
// NOTE(review): the result pointer is declared without an initial value and is
// only assigned after the first successful index. If the slice passed in is
// itself empty, the ErrNoChildren branch appears to read .Type through that
// still-nil pointer on the first iteration — looks like a nil dereference;
// confirm and guard if so.
func ( FieldPath) ( []arrow.Field) (*arrow.Field, error) {
	if len() == 0 {
		return nil, ErrEmpty
	}

	var (
		 = 0
		   *arrow.Field
	)
	for ,  := range  {
		// nothing left to descend into at this layer
		if len() == 0 {
			return nil, fmt.Errorf("%w: %s", ErrNoChildren, .Type)
		}

		if  < 0 ||  >= len() {
			// report the path up to and including the failing index
			return nil, fmt.Errorf("%w: indices=%s", ErrIndexRange, [:+1])
		}

		// step into the selected field; its children become the next layer
		 = &[]
		 = getFields(.Type)
		++
	}

	return , nil
}

// getArray is the array counterpart of GetFieldFromSlice: it treats the given
// slice as the top layer of arrays, indexes into it with the first path value
// and then descends through getChildren for each remaining value.
// (The name is taken from the call in GetColumn.)
//
// Errors:
//   - ErrEmpty if the path has no elements;
//   - ErrNoChildren (wrapped) if the current layer has no arrays;
//   - ErrIndexRange (wrapped, with the offending path prefix) if an index is
//     out of bounds for the current layer.
//
// NOTE(review): the result variable starts as a nil arrow.Array. If the slice
// passed in is itself empty, the ErrNoChildren branch appears to call
// .DataType() on that nil value during the first iteration — confirm and guard.
// NOTE(review): the ErrIndexRange message here uses "%w. indices=" whereas
// GetFieldFromSlice uses "%w: indices=" — probably unintentional.
func ( FieldPath) ( []arrow.Array) (arrow.Array, error) {
	if len() == 0 {
		return nil, ErrEmpty
	}

	var (
		 = 0
		   arrow.Array
	)
	for ,  := range  {
		// nothing left to descend into at this layer
		if len() == 0 {
			return nil, fmt.Errorf("%w: %s", ErrNoChildren, .DataType())
		}

		if  < 0 ||  >= len() {
			// report the path up to and including the failing index
			return nil, fmt.Errorf("%w. indices=%s", ErrIndexRange, [:+1])
		}

		// step into the selected array; its children become the next layer
		 = []
		 = getChildren()
		++
	}
	return , nil
}

// GetFieldFromType returns the nested field from a datatype by drilling into its
// child fields. The datatype's children (as reported by getFields, which is nil
// for non-nested types) are used as the top layer for GetFieldFromSlice.
func ( FieldPath) ( arrow.DataType) (*arrow.Field, error) {
	return .GetFieldFromSlice(getFields())
}

// GetField is equivalent to GetFieldFromType(field.Type): the path is resolved
// against the children of the given field's type, not against the field itself.
func ( FieldPath) ( arrow.Field) (*arrow.Field, error) {
	return .GetFieldFromType(.Type)
}

// GetColumn will return the correct child array by traversing the fieldpath
// going to the nested arrays of the columns in the record batch. The batch's
// Columns() form the top layer; the result and error come from getArray.
func ( FieldPath) ( arrow.RecordBatch) (arrow.Array, error) {
	return .getArray(.Columns())
}

// Reports whether this path resolves against the given fields: it calls
// GetFieldFromSlice and returns a non-nil []FieldPath when that succeeds,
// or nil when it returns an error. The resolved field itself is discarded.
//
// NOTE(review): the method name is blank in this copy; it is presumably
// findAll, matching the refImpl signature findAll([]arrow.Field) []FieldPath.
// The contents of the returned slice literal are also not visible here
// (presumably the path itself) — confirm.
func ( FieldPath) ( []arrow.Field) []FieldPath {
	,  := .GetFieldFromSlice()
	if  == nil {
		return []FieldPath{}
	}
	return nil
}

// nameRef is a field reference by name: the string value is the name of the
// field being referenced.
type nameRef string

// Renders the reference as "Name(" + the name + ")".
//
// NOTE(review): the method name is blank in this copy; presumably String,
// as required by fmt.Stringer in refImpl — confirm.
func ( nameRef) () string {
	return "Name(" + string() + ")"
}

// Scans the given fields (this layer only, no recursion) and appends one
// FieldPath to the result for every field whose Name equals this reference.
// Because names may repeat, several paths can be returned. The result is a
// non-nil slice even when nothing matches.
//
// NOTE(review): the method name is blank in this copy; presumably findAll,
// matching the refImpl interface. The element placed in each appended
// FieldPath is not visible here (presumably the matching field's index) —
// confirm.
func ( nameRef) ( []arrow.Field) []FieldPath {
	 := []FieldPath{}
	for ,  := range  {
		if .Name == string() {
			 = append(, FieldPath{})
		}
	}
	return 
}

// Feeds the referenced name into the given hash via WriteString.
// NOTE(review): method name is blank in this copy; presumably hash, per refImpl.
func ( nameRef) ( *maphash.Hash) { .WriteString(string()) }

// matches accumulates the intermediate results of resolving a refList.
// The two slices are appended to together by add, so they stay the same
// length and entry i of one corresponds to entry i of the other.
type matches struct {
	// prefixes holds the full path found so far for each match
	prefixes []FieldPath
	// refs holds the field each corresponding path resolved to
	refs     []*arrow.Field
}

// add records one match (the name is taken from the .add(...) call sites in
// this file). It resolves a FieldPath against the given fields with
// GetFieldFromSlice, appends the resulting field to refs, and appends the
// concatenation of its two FieldPath arguments to prefixes.
//
// It panics if GetFieldFromSlice returns an error.
//
// NOTE(review): the inner append concatenates directly onto its first
// argument. If that slice has spare capacity and the same prefix is passed
// for more than one match, the stored paths may share (and overwrite) one
// backing array — confirm whether a copy is needed.
func ( *matches) (,  FieldPath,  []arrow.Field) {
	,  := .GetFieldFromSlice()
	if  != nil {
		panic()
	}

	.refs = append(.refs, )
	.prefixes = append(.prefixes, append(, ...))
}

// refList represents a list of references used to determine which nested
// field is being referenced, allowing combinations of field indices and names.
// Each element applies one level deeper than the element before it.
type refList []FieldRef

// Renders the list as "Nested(" followed by the string form of each element
// separated by single spaces, and a closing ")".
//
// NOTE(review): the method name is blank in this copy; presumably String,
// per fmt.Stringer in refImpl — confirm.
// NOTE(review): there is no guard for an empty list. With no elements the
// builder holds only "Nested(", so trimming the last byte would remove the
// "(" and yield "Nested)" — confirm whether an empty refList can reach here.
func ( refList) () string {
	var  strings.Builder
	.WriteString("Nested(")
	for ,  := range  {
		// every element is followed by a space, including the last one
		fmt.Fprint(&, )
		.WriteByte(' ')
	}
	 := .String()
	// drop the final byte (the trailing space when the list is non-empty)
	return [:len()-1] + ")"
}

// Feeds every element of the list into the given hash, in list order, by
// calling each element's hash method.
// NOTE(review): method name is blank in this copy; presumably hash, per refImpl.
func ( refList) ( *maphash.Hash) {
	for ,  := range  {
		.hash()
	}
}

// Resolves the whole list against the given fields, one level per element,
// and returns the full paths of everything that matched.
//
// An empty list yields nil. Otherwise the first element's FindAll results
// seed the match set; each later element is then matched, via FindAllField,
// against every field matched so far, and each hit is recorded through
// matches.add together with the prefix it extends. A level with no hits
// leaves an empty match set, so later levels find nothing.
//
// NOTE(review): the method name is blank in this copy; presumably findAll,
// per refImpl. matches.add panics on a resolution error, so this can too.
func ( refList) ( []arrow.Field) []FieldPath {
	if len() == 0 {
		return nil
	}

	// seed with matches of the first reference against the top-level fields
	 := matches{}
	for ,  := range [0].FindAll() {
		.add(FieldPath{}, , )
	}

	// each remaining reference narrows/extends the previous level's matches
	for ,  := range [1:] {
		 := matches{}
		for ,  := range .refs {
			for ,  := range .FindAllField(*) {
				.add(.prefixes[], , getFields(.Type))
			}
		}
		// the new level replaces the previous one
		 = 
	}
	return .prefixes
}

// refImpl is the behaviour a concrete reference kind must provide to back a
// FieldRef. In this file FieldPath, nameRef and refList are stored in
// FieldRef.impl.
type refImpl interface {
	// String form of the reference, used by FieldRef's own string rendering.
	fmt.Stringer
	// findAll returns every path in fields that this reference matches.
	findAll(fields []arrow.Field) []FieldPath
	// hash feeds the reference's identity into h.
	hash(h *maphash.Hash)
}

// FieldRef is a descriptor of a (potentially nested) field within a schema.
//
// Unlike FieldPath (which is exclusively indices of child fields), FieldRef
// may reference a field by name. It can be constructed from either
// a field index, field name, or field path.
//
// Nested fields can be referenced as well, given the schema:
//
//	arrow.NewSchema([]arrow.Field{
//		{Name: "a", Type: arrow.StructOf(arrow.Field{Name: "n", Type: arrow.Null})},
//		{Name: "b", Type: arrow.PrimitiveTypes.Int32},
//	})
//
// the following all indicate the nested field named "n":
//
//	FieldRefPath(FieldPath{0, 0})
//	FieldRefList("a", 0)
//	FieldRefList("a", "n")
//	FieldRefList(0, "n")
//	NewFieldRefFromDotPath(".a[0]")
//
// FieldPaths matching a FieldRef are retrieved with the FindAll* functions.
// Multiple matches are possible because field names may be duplicated within
// a schema. For example (error handling elided):
//
//	aIsAmbiguous := arrow.NewSchema([]arrow.Field{
//		{Name: "a", Type: arrow.PrimitiveTypes.Int32},
//		{Name: "a", Type: arrow.PrimitiveTypes.Float32},
//	})
//	matches := FieldRefName("a").FindAll(aIsAmbiguous.Fields())
//	assert.Len(matches, 2)
//	assert.True(matches[0].Get(aIsAmbiguous).Equals(aIsAmbiguous.Field(0)))
//	assert.True(matches[1].Get(aIsAmbiguous).Equals(aIsAmbiguous.Field(1)))
type FieldRef struct {
	// impl is the concrete reference kind; the zero FieldRef has a nil impl,
	// and the methods that call through impl do not check for that.
	impl refImpl
}

// FieldRefPath constructs a FieldRef from a given FieldPath. The path is
// stored as the reference's implementation; it is not validated here.
func ( FieldPath) FieldRef {
	return FieldRef{impl: }
}

// FieldRefIndex is a convenience function to construct a FieldPath reference
// of a single index. The index is not range-checked here.
func ( int) FieldRef {
	return FieldRef{impl: FieldPath{}}
}

// FieldRefName constructs a FieldRef by name; the name is wrapped in a nameRef.
func ( string) FieldRef {
	return FieldRef{impl: nameRef()}
}

// FieldRefList takes an arbitrary number of arguments which can be either
// strings or ints and builds a nested reference from them: each string becomes
// a FieldRefName and each int a FieldRefIndex, in argument order.
//
// NOTE(review): the type switch below has no default branch, so an argument of
// any other type does not panic here. Its slot in the pre-sized list appears to
// be left as the zero FieldRef (nil impl), which would only fail later when
// the reference is used. The earlier comment promised a panic at construction —
// confirm which behaviour is intended.
func ( ...interface{}) FieldRef {
	 := make(refList, len())
	for ,  := range  {
		switch e := .(type) {
		case string:
			[] = FieldRefName()
		case int:
			[] = FieldRefIndex()
		}
	}
	return FieldRef{impl: }
}

// NewFieldRefFromDotPath parses a dot path into a field ref.
//
// dot_path = '.' name
//
//	| '[' digit+ ']'
//	| dot_path+
//
// Examples
//
//	".alpha" => FieldRefName("alpha")
//	"[2]" => FieldRefIndex(2)
//	".beta[3]" => FieldRefList("beta", 3)
//	"[5].gamma.delta[7]" => FieldRefList(5, "gamma", "delta", 7)
//	".hello world" => FieldRefName("hello world")
//	`.\[y\]\\tho\.\` => FieldRefName(`[y]\tho.\`)
//
// Note: when parsing a name, a '\' preceding any other character will be
// dropped from the resulting name. Therefore if a name must contain the characters
// '.', '\', '[' or ']' then they must be escaped with a preceding '\'.
// A lone '\' at the very end of the path is kept as part of the name.
//
// Errors (all wrapping ErrInvalid):
//   - the dot path is empty;
//   - an element starts with something other than '.' or '[';
//   - a '[' is not followed by digits and then ']'.
//
// The parsed elements are handed to flatten, so a path with a single element
// yields that element directly rather than a one-element list.
//
// NOTE(review): the result of strconv.Atoi is used without any error check.
// For "[]" the digit run is empty and for very long digit runs the value
// overflows; in both cases the error appears to be dropped — confirm.
// NOTE(review): the path is consumed as it is parsed, so the value
// interpolated into the error messages looks like the unparsed remainder
// rather than the caller's original string — confirm.
func ( string) ( FieldRef,  error) {
	if len() == 0 {
		return , fmt.Errorf("%w dotpath was empty", ErrInvalid)
	}

	// helper closure: consumes one (possibly escaped) name from the front of
	// the remaining path and returns it with the escapes removed
	 := func() string {
		var  string
		for {
			// look for the next character that needs a decision: an escape,
			// or the start of the next element
			 := strings.IndexAny(, `\[.`)
			if  == -1 {
				// none left: the rest of the path is the name
				 += 
				 = ""
				break
			}

			if [] != '\\' {
				// unescaped '[' or '.': the name ends here and the next
				// element starts at this character
				 += [:]
				 = [:]
				break
			}

			if len() == +1 {
				// dotpath ends with a backslash; consume it all
				 += 
				 = ""
				break
			}

			// append all characters before backslash, then the character which follows it
			 += [:] + string([+1])
			 = [+2:]
		}
		return 
	}

	 := make([]FieldRef, 0)

	for len() > 0 {
		// the leading byte selects the element kind and is consumed
		 := [0]
		 = [1:]
		switch  {
		case '.':
			// next element is a name
			 = append(, FieldRef{nameRef(())})
		case '[':
			// find the first non-digit; it must exist and must be ']'
			 := strings.IndexFunc(, func( rune) bool { return !unicode.IsDigit() })
			if  == -1 || [] != ']' {
				return , fmt.Errorf("%w: dot path '%s' contained an unterminated index", ErrInvalid, )
			}
			,  := strconv.Atoi([:])
			 = append(, FieldRef{FieldPath{}})
			// skip past the closing ']'
			 = [+1:]
		default:
			return , fmt.Errorf("%w: dot path must begin with '[' or '.' got '%s'", ErrInvalid, )
		}
	}

	// collapse the parsed elements into the named result, then return it
	.flatten()
	return
}

// Forwards to the underlying implementation's hash method.
// NOTE(review): method name is blank in this copy; presumably hash, given the .hash(...) calls on FieldRef values in this file.
func ( FieldRef) ( *maphash.Hash) { .impl.hash() }

// Hash produces a hash of this field reference. It creates a fresh
// maphash.Hash, sets the caller-supplied seed on it, feeds the reference into
// it and returns the 64-bit sum. The seed is a parameter so that callers can
// reuse one seed wherever hashes need to be comparable with each other.
func ( FieldRef) ( maphash.Seed) uint64 {
	 := maphash.Hash{}
	.SetSeed()
	.hash(&)
	return .Sum64()
}

// IsName returns true if this fieldref is a name reference, i.e. its
// implementation is a nameRef.
func ( *FieldRef) () bool {
	,  := .impl.(nameRef)
	return 
}

// IsFieldPath returns true if this FieldRef uses a fieldpath, i.e. its
// implementation is a FieldPath.
func ( *FieldRef) () bool {
	,  := .impl.(FieldPath)
	return 
}

// IsNested returns true if this FieldRef expects to represent
// a nested field:
//   - a name reference is never nested;
//   - a FieldPath is nested when it has more than one index;
//   - any other implementation (including a nil impl) is reported as nested.
func ( *FieldRef) () bool {
	switch impl := .impl.(type) {
	case nameRef:
		return false
	case FieldPath:
		return len() > 1
	default:
		return true
	}
}

// Name returns the name of the field this references if it is
// a Name reference, otherwise the empty string. The type assertion's ok
// result is ignored, so a non-name reference yields the zero nameRef.
func ( *FieldRef) () string {
	,  := .impl.(nameRef)
	return string()
}

// FieldPath returns the fieldpath that this FieldRef uses, otherwise
// an empty FieldPath if it's not a FieldPath reference. The type assertion's
// ok result is ignored, so the "otherwise" value is the zero (nil) FieldPath.
func ( *FieldRef) () FieldPath {
	,  := .impl.(FieldPath)
	return 
}

// Reports whether this reference and the given one are equal, defined as
// reflect.DeepEqual on their implementations: same concrete kind and, for
// lists and paths, the same elements in the same order.
//
// NOTE(review): the method name is blank in this copy and nothing else in the
// file establishes it; presumably an equality method such as Equals — confirm.
func ( *FieldRef) ( FieldRef) bool {
	return reflect.DeepEqual(.impl, .impl)
}

// flatten replaces this reference's implementation with a flattened form of
// the given children (the name is taken from the .flatten(...) call in
// NewFieldRefFromDotPath).
//
// Nested refLists among the children are expanded recursively, so the
// collected sequence contains only nameRef and FieldPath references, in their
// original order. Children with any other implementation (including nil) are
// dropped. If exactly one reference remains, its implementation is used
// directly; otherwise (zero or several) the implementation becomes a refList
// of the collected references.
func ( *FieldRef) ( []FieldRef) {
	 := make([]FieldRef, 0, len())

	// declared before assignment so the closure can call itself recursively
	var  func(refImpl)
	 = func( refImpl) {
		switch r := .(type) {
		case nameRef:
			 = append(, FieldRef{})
		case FieldPath:
			 = append(, FieldRef{})
		case refList:
			// descend into nested lists
			for ,  := range  {
				(.impl)
			}
		}
	}

	// treat the children as one list so the same walker handles the top level
	(refList())

	if len() == 1 {
		.impl = [0].impl
	} else {
		.impl = refList()
	}
}

// FindAll returns all the fieldpaths which this FieldRef matches in the given
// slice of fields. It forwards directly to the implementation's findAll.
func ( FieldRef) ( []arrow.Field) []FieldPath {
	return .impl.findAll()
}

// FindAllField returns all the fieldpaths that this FieldRef matches against
// the type of the given field, i.e. against the child fields of field.Type
// (none if the type is not nested).
func ( FieldRef) ( arrow.Field) []FieldPath {
	return .impl.findAll(getFields(.Type))
}

// FindOneOrNone is a convenience helper over FindAll on the schema's fields.
// It returns the single matching fieldpath, or a nil fieldpath with a nil error
// when nothing matches. More than one match yields an error wrapping
// ErrMultipleMatches.
func ( FieldRef) ( *arrow.Schema) (FieldPath, error) {
	 := .FindAll(.Fields())
	if len() > 1 {
		return nil, fmt.Errorf("%w for %s in %s", ErrMultipleMatches, , )
	}
	if len() == 0 {
		return nil, nil
	}
	return [0], nil
}

// FindOneOrNoneRecord is like FindOneOrNone but for the schema of a record,
// returning an error only if there are multiple matches.
func ( FieldRef) ( arrow.RecordBatch) (FieldPath, error) {
	return .FindOneOrNone(.Schema())
}

// FindOne returns the path to the single match for this reference among the
// schema's fields. It returns an error wrapping ErrNoMatch if nothing matches
// and an error wrapping ErrMultipleMatches if more than one path matches.
func ( FieldRef) ( *arrow.Schema) (FieldPath, error) {
	 := .FindAll(.Fields())
	if len() == 0 {
		return nil, fmt.Errorf("%w for %s in %s", ErrNoMatch, , )
	}
	if len() > 1 {
		return nil, fmt.Errorf("%w for %s in %s", ErrMultipleMatches, , )
	}
	return [0], nil
}

// GetAllColumns gets all the matching column arrays from the given record that
// this FieldRef references. Every path found by FindAll on the record's schema
// fields is resolved with GetColumn; the first resolution error aborts the
// call and is returned with a nil slice. With no matches the result is an
// empty, non-nil slice and a nil error.
func ( FieldRef) ( arrow.RecordBatch) ([]arrow.Array, error) {
	 := make([]arrow.Array, 0)
	for ,  := range .FindAll(.Schema().Fields()) {
		,  := .GetColumn()
		if  != nil {
			return nil, 
		}
		 = append(, )
	}
	return , nil
}

// GetOneField will return a pointer to a field or an error if it is not found
// or if there are multiple matches. The path comes from FindOne and is then
// resolved against the schema's fields with GetFieldFromSlice.
func ( FieldRef) ( *arrow.Schema) (*arrow.Field, error) {
	,  := .FindOne()
	if  != nil {
		return nil, 
	}

	return .GetFieldFromSlice(.Fields())
}

// GetOneOrNone returns the referenced field if exactly one match is found, or
// nil with a nil error if there is no match. It only errors from the lookup
// when there are multiple matches.
func ( FieldRef) ( *arrow.Schema) (*arrow.Field, error) {
	,  := .FindOneOrNone()
	if  != nil {
		return nil, 
	}
	// an empty path means "no match", which is not an error here
	if len() == 0 {
		return nil, nil
	}
	return .GetFieldFromSlice(.Fields())
}

// GetOneColumnOrNone returns the referenced array if exactly one match is
// found in the record, or nil with a nil error if there is no match. The
// lookup itself errors only when the reference is ambiguous (multiple matches).
func ( FieldRef) ( arrow.RecordBatch) (arrow.Array, error) {
	,  := .FindOneOrNoneRecord()
	if  != nil {
		return nil, 
	}
	// an empty path means "no match", which is not an error here
	if len() == 0 {
		return nil, nil
	}
	return .GetColumn()
}

// Renders the reference as "FieldRef." followed by the implementation's own
// string form, e.g. "FieldRef.Name(...)" for a name reference.
//
// NOTE(review): the method name is blank in this copy; presumably String,
// since FieldRef values are passed to fmt verbs elsewhere in this file — confirm.
func ( FieldRef) () string {
	return "FieldRef." + .impl.String()
}