// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package array

import (
	
	

	
	
	
)

// RecordEqual reports whether the two provided records are equal.
//
// Records whose NumCols or NumRows differ are reported unequal without
// looking at any column data. Otherwise the columns obtained through Column
// are compared with Equal and the first mismatch returns false. Only the
// column arrays are inspected here; no schema or field information is read.
func (,  arrow.RecordBatch) bool {
	switch {
	case .NumCols() != .NumCols():
		return false
	case .NumRows() != .NumRows():
		return false
	}

	// Shapes agree: compare column by column, stopping at the first difference.
	for  := range .Columns() {
		 := .Column()
		 := .Column()
		if !Equal(, ) {
			return false
		}
	}
	return true
}

// RecordApproxEqual reports whether the two provided records are approximately equal.
// For non-floating point columns, it is equivalent to RecordEqual.
//
// Records whose NumCols or NumRows differ are reported unequal without
// looking at any column data. The variadic EqualOption values are resolved
// once through newEqualOption and the resulting settings are passed to
// arrayApproxEqual for every column.
func (,  arrow.RecordBatch,  ...EqualOption) bool {
	switch {
	case .NumCols() != .NumCols():
		return false
	case .NumRows() != .NumRows():
		return false
	}

	// Resolve the options a single time rather than once per column.
	 := newEqualOption(...)

	for  := range .Columns() {
		 := .Column()
		 := .Column()
		if !arrayApproxEqual(, , ) {
			return false
		}
	}
	return true
}

// chunkedBinaryApply is a helper that evaluates a function over two chunked
// arrays that may have different chunk layouts. The callback is invoked for
// each pair of corresponding slices of the two chunked arrays; if it returns
// false the loop ends early.
//
// Every invocation receives one chunk from each input together with a pair of
// int64 bounds into that chunk. The span size is the smaller of the two
// values computed from the current chunks' lengths (see the min call below),
// so presumably a span never crosses a chunk boundary on either side.
//
// NOTE(review): the loop bound comes from a single Len() call, which
// presumably relies on callers having already verified that both inputs have
// the same total length — confirm.
func chunkedBinaryApply(,  *arrow.Chunked,  func( arrow.Array, ,  int64,  arrow.Array, ,  int64) bool) {
	var (
		               int64
		            = int64(.Len())
		,  int
		,  int64
	)

	for  <  {
		var ,  arrow.Array
		// Fetch the current chunk on each side. When a position has reached
		// the end of its chunk, reset it to zero, advance the chunk index and
		// fetch again; this also steps over zero-length chunks.
		for {
			,  = .Chunk(), .Chunk()
			if  == int64(.Len()) {
				 = 0
				++
				continue
			}
			if  == int64(.Len()) {
				 = 0
				++
				continue
			}
			break
		}

		// Size of the next span: the smaller of the two remaining lengths.
		 := int64(min(.Len()-int(), .Len()-int()))
		 += 
		if !(, , +, , , +) {
			return
		}

		 += 
		 += 
	}
}

// ChunkedEqual reports whether two chunked arrays are equal regardless of their chunkings.
//
// Identical pointers compare equal immediately. Otherwise the total length
// (Len), null count (NullN) and data type (arrow.TypeEqual) must all agree
// before the contents are compared span by span with SliceEqual, driven by
// chunkedBinaryApply.
func (,  *arrow.Chunked) bool {
	switch {
	case  == :
		return true
	case .Len() != .Len():
		return false
	case .NullN() != .NullN():
		return false
	case !arrow.TypeEqual(.DataType(), .DataType()):
		return false
	}

	// Starts as true, so the result is true when the callback is never invoked.
	var  = true
	chunkedBinaryApply(, , func( arrow.Array, ,  int64,  arrow.Array, ,  int64) bool {
		// Returning the comparison result stops the traversal at the first mismatch.
		 = SliceEqual(, , , , , )
		return 
	})

	return 
}

// ChunkedApproxEqual reports whether two chunked arrays are approximately equal regardless of their chunkings.
// For non-floating point arrays, this is equivalent to ChunkedEqual.
//
// Identical pointers compare equal immediately. Otherwise the total length
// (Len), null count (NullN) and data type (arrow.TypeEqual) must all agree
// before the contents are compared span by span with SliceApproxEqual, driven
// by chunkedBinaryApply. The EqualOption values are forwarded unchanged to
// every SliceApproxEqual call.
//
// NOTE(review): the result flag below is declared with its zero value (false),
// whereas ChunkedEqual initialises its flag to true. If chunkedBinaryApply
// never invokes the callback (presumably the case for two distinct
// zero-length inputs), this function returns false where ChunkedEqual returns
// true — confirm whether that asymmetry is intended.
func (,  *arrow.Chunked,  ...EqualOption) bool {
	switch {
	case  == :
		return true
	case .Len() != .Len():
		return false
	case .NullN() != .NullN():
		return false
	case !arrow.TypeEqual(.DataType(), .DataType()):
		return false
	}

	var  bool
	chunkedBinaryApply(, , func( arrow.Array, ,  int64,  arrow.Array, ,  int64) bool {
		// Returning the comparison result stops the traversal at the first mismatch.
		 = SliceApproxEqual(, , , , , , ...)
		return 
	})

	return 
}

// TableEqual reports whether the two tables have the same data in the same schema.
//
// Tables whose NumCols or NumRows differ are reported unequal. Otherwise, for
// each column, the column's Field must satisfy Field().Equal and its chunked
// data (Data) must satisfy ChunkedEqual; the first failure returns false.
func (,  arrow.Table) bool {
	switch {
	case .NumCols() != .NumCols():
		return false
	case .NumRows() != .NumRows():
		return false
	}

	// NumCols is compared as int64, hence the conversion of the loop index.
	for  := 0; int64() < .NumCols(); ++ {
		 := .Column()
		 := .Column()
		// Field description first, then the column contents.
		if !.Field().Equal(.Field()) {
			return false
		}

		if !ChunkedEqual(.Data(), .Data()) {
			return false
		}
	}
	return true
}

// TableApproxEqual reports whether the two tables have approximately equal data in the same schema.
//
// Tables whose NumCols or NumRows differ are reported unequal. Otherwise, for
// each column, the column's Field must satisfy Field().Equal and its chunked
// data (Data) must satisfy ChunkedApproxEqual under the supplied EqualOption
// values; the first failure returns false.
//
// NOTE(review): this comment previously began with "TableEqual", duplicating
// the exact-comparison function's comment. The name used here is inferred
// from the sibling *ApproxEqual functions — confirm against the declaration.
func (,  arrow.Table,  ...EqualOption) bool {
	switch {
	case .NumCols() != .NumCols():
		return false
	case .NumRows() != .NumRows():
		return false
	}

	// NumCols is compared as int64, hence the conversion of the loop index.
	for  := 0; int64() < .NumCols(); ++ {
		 := .Column()
		 := .Column()
		// Field description first, then the column contents.
		if !.Field().Equal(.Field()) {
			return false
		}

		if !ChunkedApproxEqual(.Data(), .Data(), ...) {
			return false
		}
	}
	return true
}

// Equal reports whether the two provided arrays are equal.
//
// baseArrayEqual runs first and rejects arrays that differ in length, null
// count, data type or validity bitmap. Arrays that pass that check and are
// either empty or entirely null are equal without inspecting any values.
// Otherwise the comparison is dispatched on the concrete array type to the
// matching arrayEqual* helper.
//
// Equal panics with an "unknown array type" error when the concrete type is
// not one of the cases below.
func (,  arrow.Array) bool {
	switch {
	case !baseArrayEqual(, ):
		return false
	case .Len() == 0:
		return true
	case .NullN() == .Len():
		return true
	}

	// at this point, we know both arrays have same type, same length, same number of nulls
	// and nulls at the same place.
	// compare the values.

	// Each case asserts the other operand to the same concrete type; the
	// assertions are unchecked and rely on the type equality established by
	// baseArrayEqual above.
	switch l := .(type) {
	case *Null:
		return true
	case *Boolean:
		 := .(*Boolean)
		return arrayEqualBoolean(, )
	case *FixedSizeBinary:
		 := .(*FixedSizeBinary)
		return arrayEqualFixedSizeBinary(, )
	case *Binary:
		 := .(*Binary)
		return arrayEqualBinary(, )
	case *String:
		 := .(*String)
		return arrayEqualString(, )
	case *LargeBinary:
		 := .(*LargeBinary)
		return arrayEqualLargeBinary(, )
	case *LargeString:
		 := .(*LargeString)
		return arrayEqualLargeString(, )
	case *BinaryView:
		 := .(*BinaryView)
		return arrayEqualBinaryView(, )
	case *StringView:
		 := .(*StringView)
		return arrayEqualStringView(, )
	case *Int8:
		 := .(*Int8)
		return arrayEqualFixedWidth(, )
	case *Int16:
		 := .(*Int16)
		return arrayEqualFixedWidth(, )
	case *Int32:
		 := .(*Int32)
		return arrayEqualFixedWidth(, )
	case *Int64:
		 := .(*Int64)
		return arrayEqualFixedWidth(, )
	case *Uint8:
		 := .(*Uint8)
		return arrayEqualFixedWidth(, )
	case *Uint16:
		 := .(*Uint16)
		return arrayEqualFixedWidth(, )
	case *Uint32:
		 := .(*Uint32)
		return arrayEqualFixedWidth(, )
	case *Uint64:
		 := .(*Uint64)
		return arrayEqualFixedWidth(, )
	case *Float16:
		 := .(*Float16)
		return arrayEqualFixedWidth(, )
	case *Float32:
		 := .(*Float32)
		return arrayEqualFixedWidth(, )
	case *Float64:
		 := .(*Float64)
		return arrayEqualFixedWidth(, )
	case *Decimal32:
		 := .(*Decimal32)
		return arrayEqualDecimal(, )
	case *Decimal64:
		 := .(*Decimal64)
		return arrayEqualDecimal(, )
	case *Decimal128:
		 := .(*Decimal128)
		return arrayEqualDecimal(, )
	case *Decimal256:
		 := .(*Decimal256)
		return arrayEqualDecimal(, )
	case *Date32:
		 := .(*Date32)
		return arrayEqualFixedWidth(, )
	case *Date64:
		 := .(*Date64)
		return arrayEqualFixedWidth(, )
	case *Time32:
		 := .(*Time32)
		return arrayEqualFixedWidth(, )
	case *Time64:
		 := .(*Time64)
		return arrayEqualFixedWidth(, )
	case *Timestamp:
		 := .(*Timestamp)
		return arrayEqualTimestamp(, )
	case *List:
		 := .(*List)
		return arrayEqualList(, )
	case *LargeList:
		 := .(*LargeList)
		return arrayEqualLargeList(, )
	case *ListView:
		 := .(*ListView)
		return arrayEqualListView(, )
	case *LargeListView:
		 := .(*LargeListView)
		return arrayEqualLargeListView(, )
	case *FixedSizeList:
		 := .(*FixedSizeList)
		return arrayEqualFixedSizeList(, )
	case *Struct:
		 := .(*Struct)
		return arrayEqualStruct(, )
	case *MonthInterval:
		 := .(*MonthInterval)
		return arrayEqualMonthInterval(, )
	case *DayTimeInterval:
		 := .(*DayTimeInterval)
		return arrayEqualDayTimeInterval(, )
	case *MonthDayNanoInterval:
		 := .(*MonthDayNanoInterval)
		return arrayEqualMonthDayNanoInterval(, )
	case *Duration:
		 := .(*Duration)
		return arrayEqualFixedWidth(, )
	case *Map:
		 := .(*Map)
		return arrayEqualMap(, )
	case ExtensionArray:
		 := .(ExtensionArray)
		return arrayEqualExtension(, )
	case *Dictionary:
		 := .(*Dictionary)
		return arrayEqualDict(, )
	case *SparseUnion:
		 := .(*SparseUnion)
		return arraySparseUnionEqual(, )
	case *DenseUnion:
		 := .(*DenseUnion)
		return arrayDenseUnionEqual(, )
	case *RunEndEncoded:
		 := .(*RunEndEncoded)
		return arrayRunEndEncodedEqual(, )
	default:
		panic(fmt.Errorf("arrow/array: unknown array type %T", ))
	}
}

// SliceEqual reports whether slices left[lbeg:lend] and right[rbeg:rend] are equal.
//
// Both ranges are materialised with NewSlice, compared with Equal, and then
// released through the deferred Release calls before the function returns.
func ( arrow.Array, ,  int64,  arrow.Array, ,  int64) bool {
	 := NewSlice(, , )
	defer .Release()
	 := NewSlice(, , )
	defer .Release()

	return Equal(, )
}

// SliceApproxEqual reports whether slices left[lbeg:lend] and right[rbeg:rend] are approximately equal.
//
// The EqualOption values are resolved with newEqualOption and the comparison
// itself is delegated to sliceApproxEqual.
func ( arrow.Array, ,  int64,  arrow.Array, ,  int64,  ...EqualOption) bool {
	 := newEqualOption(...)
	return sliceApproxEqual(, , , , , , )
}

// sliceApproxEqual is the internal counterpart of SliceApproxEqual that takes
// already-resolved comparison settings (equalOption) instead of variadic
// EqualOption values.
//
// Both ranges are materialised with NewSlice, compared with arrayApproxEqual,
// and then released through the deferred Release calls.
func sliceApproxEqual( arrow.Array, ,  int64,  arrow.Array, ,  int64,  equalOption) bool {
	 := NewSlice(, , )
	defer .Release()
	 := NewSlice(, , )
	defer .Release()

	return arrayApproxEqual(, , )
}

// defaultAbsoluteTolerance is the absolute tolerance that newEqualOption
// installs before applying any caller-supplied EqualOption.
const defaultAbsoluteTolerance = 1e-5

// equalOption holds the resolved settings used by the approximate comparison
// functions. It is built by newEqualOption and mutated by EqualOption values.
type equalOption struct {
	atol             float64 // absolute tolerance
	nansEq           bool    // whether NaNs are considered equal.
	unorderedMapKeys bool    // whether maps are allowed to have different entries order
}

// Half-precision comparator on equalOption (presumably the f16 method called
// by arrayApproxEqualFloat16). Both float16.Num operands are widened to
// float64 via Float32 and are considered equal when their absolute difference
// is within atol. When nansEq is set, two NaNs are also considered equal.
//
// NOTE(review): unlike the float32 and float64 comparators in this file,
// there is no exact-equality (==) short-circuit here. For operands whose
// difference is NaN, such as two infinities of the same sign, the tolerance
// test fails and they are reported unequal — confirm whether that is intended.
func ( equalOption) (,  float16.Num) bool {
	 := float64(.Float32())
	 := float64(.Float32())
	switch {
	case .nansEq:
		return math.Abs(-) <= .atol || (math.IsNaN() && math.IsNaN())
	default:
		return math.Abs(-) <= .atol
	}
}

// Single-precision comparator on equalOption (presumably the f32 method
// called by arrayApproxEqualFloat32). Both float32 operands are widened to
// float64 and are considered equal when they compare equal with == or when
// their absolute difference is within atol. When nansEq is set, two NaNs are
// also considered equal.
func ( equalOption) (,  float32) bool {
	 := float64()
	 := float64()
	switch {
	case .nansEq:
		return  ==  || math.Abs(-) <= .atol || (math.IsNaN() && math.IsNaN())
	default:
		return  ==  || math.Abs(-) <= .atol
	}
}

// Double-precision comparator on equalOption (presumably the f64 method
// called by arrayApproxEqualFloat64). The float64 operands are considered
// equal when they compare equal with == or when their absolute difference is
// within atol. When nansEq is set, two NaNs are also considered equal.
func ( equalOption) (,  float64) bool {
	switch {
	case .nansEq:
		return  ==  || math.Abs(-) <= .atol || (math.IsNaN() && math.IsNaN())
	default:
		return  ==  || math.Abs(-) <= .atol
	}
}

// newEqualOption builds the comparison settings used by the approximate
// comparison functions. It starts from the defaults (atol set to
// defaultAbsoluteTolerance, nansEq false, unorderedMapKeys left at its zero
// value) and then applies every supplied EqualOption in order, so later
// options override earlier ones that set the same field.
func newEqualOption( ...EqualOption) equalOption {
	 := equalOption{
		atol:   defaultAbsoluteTolerance,
		nansEq: false,
	}
	for ,  := range  {
		(&)
	}

	return 
}

// EqualOption is a functional option type used to configure how Records and Arrays are compared.
// Each option receives a pointer to the settings being built and mutates them in place.
type EqualOption func(*equalOption)

// WithNaNsEqual configures the comparison functions so that NaNs are considered equal.
// The returned option stores the supplied bool in the nansEq setting.
func ( bool) EqualOption {
	return func( *equalOption) {
		.nansEq = 
	}
}

// WithAbsTolerance configures the comparison functions so that 2 floating point values
// v1 and v2 are considered equal if |v1-v2| <= atol.
// The returned option stores the supplied float64 in the atol setting,
// replacing the default installed by newEqualOption.
func ( float64) EqualOption {
	return func( *equalOption) {
		.atol = 
	}
}

// WithUnorderedMapKeys configures the comparison functions so that Maps whose entries
// appear in a different order are considered equal.
// The returned option stores the supplied bool in the unorderedMapKeys setting.
func ( bool) EqualOption {
	return func( *equalOption) {
		.unorderedMapKeys = 
	}
}

// ApproxEqual reports whether the two provided arrays are approximately equal.
// For non-floating point arrays, it is equivalent to Equal.
//
// The EqualOption values are resolved with newEqualOption and the comparison
// itself is delegated to arrayApproxEqual.
func (,  arrow.Array,  ...EqualOption) bool {
	 := newEqualOption(...)
	return arrayApproxEqual(, , )
}

// arrayApproxEqual is the implementation behind ApproxEqual; it takes
// already-resolved comparison settings (equalOption).
//
// baseArrayEqual runs first and rejects arrays that differ in length, null
// count, data type or validity bitmap. Arrays that pass that check and are
// either empty or entirely null are equal without inspecting any values.
// Otherwise the comparison is dispatched on the concrete array type:
//
//   - Float16/Float32/Float64 use the tolerance-aware arrayApproxEqualFloat* helpers.
//   - String, LargeString and StringView use the arrayApproxEqual*String* helpers.
//   - List-like, Struct, Dictionary, Extension, Union and RunEndEncoded arrays
//     use their approximate helpers, which receive the settings.
//   - Map uses arrayApproxEqualMap when unorderedMapKeys is set, and otherwise
//     compares the embedded List with arrayApproxEqualList.
//   - All remaining types reuse the exact arrayEqual* helpers.
//
// It panics with an "unknown array type" error when the concrete type is not
// one of the cases below.
func arrayApproxEqual(,  arrow.Array,  equalOption) bool {
	switch {
	case !baseArrayEqual(, ):
		return false
	case .Len() == 0:
		return true
	case .NullN() == .Len():
		return true
	}

	// at this point, we know both arrays have same type, same length, same number of nulls
	// and nulls at the same place.
	// compare the values.

	// Each case asserts the other operand to the same concrete type; the
	// assertions are unchecked and rely on the type equality established by
	// baseArrayEqual above.
	switch l := .(type) {
	case *Null:
		return true
	case *Boolean:
		 := .(*Boolean)
		return arrayEqualBoolean(, )
	case *FixedSizeBinary:
		 := .(*FixedSizeBinary)
		return arrayEqualFixedSizeBinary(, )
	case *Binary:
		 := .(*Binary)
		return arrayEqualBinary(, )
	case *String:
		 := .(*String)
		return arrayApproxEqualString(, )
	case *LargeBinary:
		 := .(*LargeBinary)
		return arrayEqualLargeBinary(, )
	case *LargeString:
		 := .(*LargeString)
		return arrayApproxEqualLargeString(, )
	case *BinaryView:
		 := .(*BinaryView)
		return arrayEqualBinaryView(, )
	case *StringView:
		 := .(*StringView)
		return arrayApproxEqualStringView(, )
	case *Int8:
		 := .(*Int8)
		return arrayEqualFixedWidth(, )
	case *Int16:
		 := .(*Int16)
		return arrayEqualFixedWidth(, )
	case *Int32:
		 := .(*Int32)
		return arrayEqualFixedWidth(, )
	case *Int64:
		 := .(*Int64)
		return arrayEqualFixedWidth(, )
	case *Uint8:
		 := .(*Uint8)
		return arrayEqualFixedWidth(, )
	case *Uint16:
		 := .(*Uint16)
		return arrayEqualFixedWidth(, )
	case *Uint32:
		 := .(*Uint32)
		return arrayEqualFixedWidth(, )
	case *Uint64:
		 := .(*Uint64)
		return arrayEqualFixedWidth(, )
	case *Float16:
		 := .(*Float16)
		return arrayApproxEqualFloat16(, , )
	case *Float32:
		 := .(*Float32)
		return arrayApproxEqualFloat32(, , )
	case *Float64:
		 := .(*Float64)
		return arrayApproxEqualFloat64(, , )
	case *Decimal32:
		 := .(*Decimal32)
		return arrayEqualDecimal(, )
	case *Decimal64:
		 := .(*Decimal64)
		return arrayEqualDecimal(, )
	case *Decimal128:
		 := .(*Decimal128)
		return arrayEqualDecimal(, )
	case *Decimal256:
		 := .(*Decimal256)
		return arrayEqualDecimal(, )
	case *Date32:
		 := .(*Date32)
		return arrayEqualFixedWidth(, )
	case *Date64:
		 := .(*Date64)
		return arrayEqualFixedWidth(, )
	case *Time32:
		 := .(*Time32)
		return arrayEqualFixedWidth(, )
	case *Time64:
		 := .(*Time64)
		return arrayEqualFixedWidth(, )
	case *Timestamp:
		 := .(*Timestamp)
		return arrayEqualTimestamp(, )
	case *List:
		 := .(*List)
		return arrayApproxEqualList(, , )
	case *LargeList:
		 := .(*LargeList)
		return arrayApproxEqualLargeList(, , )
	case *ListView:
		 := .(*ListView)
		return arrayApproxEqualListView(, , )
	case *LargeListView:
		 := .(*LargeListView)
		return arrayApproxEqualLargeListView(, , )
	case *FixedSizeList:
		 := .(*FixedSizeList)
		return arrayApproxEqualFixedSizeList(, , )
	case *Struct:
		 := .(*Struct)
		return arrayApproxEqualStruct(, , )
	case *MonthInterval:
		 := .(*MonthInterval)
		return arrayEqualMonthInterval(, )
	case *DayTimeInterval:
		 := .(*DayTimeInterval)
		return arrayEqualDayTimeInterval(, )
	case *MonthDayNanoInterval:
		 := .(*MonthDayNanoInterval)
		return arrayEqualMonthDayNanoInterval(, )
	case *Duration:
		 := .(*Duration)
		return arrayEqualFixedWidth(, )
	case *Map:
		 := .(*Map)
		// Order-insensitive entry matching is opt-in; by default a map is
		// compared as its underlying list, i.e. entry order matters.
		if .unorderedMapKeys {
			return arrayApproxEqualMap(, , )
		}
		return arrayApproxEqualList(.List, .List, )
	case *Dictionary:
		 := .(*Dictionary)
		return arrayApproxEqualDict(, , )
	case ExtensionArray:
		 := .(ExtensionArray)
		return arrayApproxEqualExtension(, , )
	case *SparseUnion:
		 := .(*SparseUnion)
		return arraySparseUnionApproxEqual(, , )
	case *DenseUnion:
		 := .(*DenseUnion)
		return arrayDenseUnionApproxEqual(, , )
	case *RunEndEncoded:
		 := .(*RunEndEncoded)
		return arrayRunEndEncodedApproxEqual(, , )
	default:
		panic(fmt.Errorf("arrow/array: unknown array type %T", ))
	}
}

// baseArrayEqual performs the value-independent checks shared by Equal and
// arrayApproxEqual. It returns false when the arrays differ in length (Len),
// null count (NullN), data type (arrow.TypeEqual) or validity bitmap
// (validityBitmapEqual), and true otherwise. The checks run in that order, so
// the per-element bitmap comparison only happens when the others have passed.
func baseArrayEqual(,  arrow.Array) bool {
	switch {
	case .Len() != .Len():
		return false
	case .NullN() != .NullN():
		return false
	case !arrow.TypeEqual(.DataType(), .DataType()): // We do not check for metadata as in the C++ implementation.
		return false
	case !validityBitmapEqual(, ):
		return false
	}
	return true
}

// validityBitmapEqual reports whether the two arrays have nulls in the same
// positions. Arrays of different length are reported unequal; otherwise
// IsNull is compared element by element and the first disagreement returns
// false.
func validityBitmapEqual(,  arrow.Array) bool {
	// TODO(alexandreyc): make it faster by comparing byte slices of the validity bitmap?
	 := .Len()
	if  != .Len() {
		return false
	}
	for  := 0;  < ; ++ {
		if .IsNull() != .IsNull() {
			return false
		}
	}
	return true
}

// arrayApproxEqualString compares two String arrays element by element after
// passing each value through stripNulls. Slots for which IsNull reports true
// are skipped. stripNulls is defined elsewhere; presumably it removes NUL
// characters from the value — confirm.
//
// NOTE(review): only one IsNull check is made per slot, which presumably
// relies on the caller having verified that both arrays have identical
// validity bitmaps (as arrayApproxEqual does via baseArrayEqual).
func arrayApproxEqualString(,  *String) bool {
	for  := 0;  < .Len(); ++ {
		if .IsNull() {
			continue
		}
		if stripNulls(.Value()) != stripNulls(.Value()) {
			return false
		}
	}
	return true
}

// arrayApproxEqualLargeString compares two LargeString arrays element by
// element after passing each value through stripNulls. Slots for which IsNull
// reports true are skipped. stripNulls is defined elsewhere; presumably it
// removes NUL characters from the value — confirm.
//
// NOTE(review): only one IsNull check is made per slot, which presumably
// relies on the caller having verified that both arrays have identical
// validity bitmaps (as arrayApproxEqual does via baseArrayEqual).
func arrayApproxEqualLargeString(,  *LargeString) bool {
	for  := 0;  < .Len(); ++ {
		if .IsNull() {
			continue
		}
		if stripNulls(.Value()) != stripNulls(.Value()) {
			return false
		}
	}
	return true
}

// arrayApproxEqualStringView compares two StringView arrays element by
// element after passing each value through stripNulls. Slots for which IsNull
// reports true are skipped. stripNulls is defined elsewhere; presumably it
// removes NUL characters from the value — confirm.
//
// NOTE(review): only one IsNull check is made per slot, which presumably
// relies on the caller having verified that both arrays have identical
// validity bitmaps (as arrayApproxEqual does via baseArrayEqual).
func arrayApproxEqualStringView(,  *StringView) bool {
	for  := 0;  < .Len(); ++ {
		if .IsNull() {
			continue
		}
		if stripNulls(.Value()) != stripNulls(.Value()) {
			return false
		}
	}
	return true
}

// arrayApproxEqualFloat16 compares two Float16 arrays element by element
// using the settings' f16 comparator (absolute tolerance, optional NaN
// equality). Slots for which IsNull reports true are skipped; the first pair
// that fails the comparator returns false.
func arrayApproxEqualFloat16(,  *Float16,  equalOption) bool {
	for  := 0;  < .Len(); ++ {
		if .IsNull() {
			continue
		}
		if !.f16(.Value(), .Value()) {
			return false
		}
	}
	return true
}

// arrayApproxEqualFloat32 compares two Float32 arrays element by element
// using the settings' f32 comparator (absolute tolerance, optional NaN
// equality). Slots for which IsNull reports true are skipped; the first pair
// that fails the comparator returns false.
func arrayApproxEqualFloat32(,  *Float32,  equalOption) bool {
	for  := 0;  < .Len(); ++ {
		if .IsNull() {
			continue
		}
		if !.f32(.Value(), .Value()) {
			return false
		}
	}
	return true
}

// arrayApproxEqualFloat64 compares two Float64 arrays element by element
// using the settings' f64 comparator (absolute tolerance, optional NaN
// equality). Slots for which IsNull reports true are skipped; the first pair
// that fails the comparator returns false.
func arrayApproxEqualFloat64(,  *Float64,  equalOption) bool {
	for  := 0;  < .Len(); ++ {
		if .IsNull() {
			continue
		}
		if !.f64(.Value(), .Value()) {
			return false
		}
	}
	return true
}

// arrayApproxEqualList compares two List arrays slot by slot. Slots for which
// IsNull reports true are skipped. For every other slot the element arrays
// are obtained with newListValue on each side and compared recursively with
// arrayApproxEqual under the same settings.
func arrayApproxEqualList(,  *List,  equalOption) bool {
	for  := 0;  < .Len(); ++ {
		if .IsNull() {
			continue
		}
		// The comparison runs inside a closure so that the deferred Release
		// calls fire at the end of each iteration instead of accumulating
		// until the enclosing function returns.
		 := func() bool {
			 := .newListValue()
			defer .Release()
			 := .newListValue()
			defer .Release()
			return arrayApproxEqual(, , )
		}()
		if ! {
			return false
		}
	}
	return true
}

// arrayApproxEqualLargeList compares two LargeList arrays slot by slot. Slots
// for which IsNull reports true are skipped. For every other slot the element
// arrays are obtained with newListValue on each side and compared recursively
// with arrayApproxEqual under the same settings.
func arrayApproxEqualLargeList(,  *LargeList,  equalOption) bool {
	for  := 0;  < .Len(); ++ {
		if .IsNull() {
			continue
		}
		// The comparison runs inside a closure so that the deferred Release
		// calls fire at the end of each iteration instead of accumulating
		// until the enclosing function returns.
		 := func() bool {
			 := .newListValue()
			defer .Release()
			 := .newListValue()
			defer .Release()
			return arrayApproxEqual(, , )
		}()
		if ! {
			return false
		}
	}
	return true
}

// arrayApproxEqualListView compares two ListView arrays slot by slot. Slots
// for which IsNull reports true are skipped. For every other slot the element
// arrays are obtained with newListValue on each side and compared recursively
// with arrayApproxEqual under the same settings.
func arrayApproxEqualListView(,  *ListView,  equalOption) bool {
	for  := 0;  < .Len(); ++ {
		if .IsNull() {
			continue
		}
		// The comparison runs inside a closure so that the deferred Release
		// calls fire at the end of each iteration instead of accumulating
		// until the enclosing function returns.
		 := func() bool {
			 := .newListValue()
			defer .Release()
			 := .newListValue()
			defer .Release()
			return arrayApproxEqual(, , )
		}()
		if ! {
			return false
		}
	}
	return true
}

// arrayApproxEqualLargeListView compares two LargeListView arrays slot by
// slot. Slots for which IsNull reports true are skipped. For every other slot
// the element arrays are obtained with newListValue on each side and compared
// recursively with arrayApproxEqual under the same settings.
func arrayApproxEqualLargeListView(,  *LargeListView,  equalOption) bool {
	for  := 0;  < .Len(); ++ {
		if .IsNull() {
			continue
		}
		// The comparison runs inside a closure so that the deferred Release
		// calls fire at the end of each iteration instead of accumulating
		// until the enclosing function returns.
		 := func() bool {
			 := .newListValue()
			defer .Release()
			 := .newListValue()
			defer .Release()
			return arrayApproxEqual(, , )
		}()
		if ! {
			return false
		}
	}
	return true
}

// arrayApproxEqualFixedSizeList compares two FixedSizeList arrays slot by
// slot. Slots for which IsNull reports true are skipped. For every other slot
// the element arrays are obtained with newListValue on each side and compared
// recursively with arrayApproxEqual under the same settings.
func arrayApproxEqualFixedSizeList(,  *FixedSizeList,  equalOption) bool {
	for  := 0;  < .Len(); ++ {
		if .IsNull() {
			continue
		}
		// The comparison runs inside a closure so that the deferred Release
		// calls fire at the end of each iteration instead of accumulating
		// until the enclosing function returns.
		 := func() bool {
			 := .newListValue()
			defer .Release()
			 := .newListValue()
			defer .Release()
			return arrayApproxEqual(, , )
		}()
		if ! {
			return false
		}
	}
	return true
}

// arrayApproxEqualStruct compares two Struct arrays by walking a validity
// bitmap (NullBitmapBytes, starting at Offset and spanning Len entries) with
// bitutils.VisitSetBitRuns and handing each visited run to the visitor built
// by approxEqualStructRun. The arrays are approximately equal when the walk
// finishes without an error.
//
// NOTE(review): presumably VisitSetBitRuns invokes the visitor only for runs
// of set (valid) bits, so null slots are never compared — confirm against the
// bitutils documentation.
func arrayApproxEqualStruct(,  *Struct,  equalOption) bool {
	return bitutils.VisitSetBitRuns(
		.NullBitmapBytes(),
		int64(.Offset()), int64(.Len()),
		approxEqualStructRun(, , ),
	) == nil
}

// approxEqualStructRun returns the bitutils.VisitFn used by
// arrayApproxEqualStruct. The returned visitor receives two int64 values
// describing a run (presumably its start position and its length — confirm)
// and compares that range of every child field on both sides with
// sliceApproxEqual. It returns arrow.ErrInvalid at the first field whose
// range differs and nil otherwise; the error serves only as a "not equal"
// signal for the caller, which checks it against nil.
func approxEqualStructRun(,  *Struct,  equalOption) bitutils.VisitFn {
	return func( int64,  int64) error {
		for  := range .fields {
			if !sliceApproxEqual(.fields[], , +, .fields[], , +, ) {
				return arrow.ErrInvalid
			}
		}
		return nil
	}
}

// arrayApproxEqualMap compares two Map arrays without caring about the order
// of keys within each map value (in Go, map traversal order is undefined).
//
// Slots for which IsNull reports true are skipped. For every other slot the
// entries on each side are obtained with newListValue, asserted to *Struct,
// and handed to arrayApproxEqualSingleMapEntry, which takes care of releasing
// both structs.
func arrayApproxEqualMap(,  *Map,  equalOption) bool {
	for  := 0;  < .Len(); ++ {
		if .IsNull() {
			continue
		}
		if !arrayApproxEqualSingleMapEntry(.newListValue().(*Struct), .newListValue().(*Struct), ) {
			return false
		}
	}
	return true
}

// arrayApproxEqualSingleMapEntry is a helper function that checks whether the
// entries of a single pair of map values are approximately equal, ignoring
// the order in which the entries appear.
//
// Both arguments are entry structs in which field 0 holds the keys and
// field 1 holds the values. The structs passed in are released before the
// function returns.
//
// After the length, null-count and type checks, every non-null entry on one
// side is matched against a not-yet-used entry on the other side whose key
// and value both compare approximately equal. An entry without a match makes
// the result false. The final check requires the number of consumed entries
// to equal the struct length.
func arrayApproxEqualSingleMapEntry(,  *Struct,  equalOption) bool {
	defer .Release()
	defer .Release()

	// we don't compare the validity bitmap, but we want other checks from baseArrayEqual
	switch {
	case .Len() != .Len():
		return false
	case .NullN() != .NullN():
		return false
	case !arrow.TypeEqual(.DataType(), .DataType()): // We do not check for metadata as in the C++ implementation.
		return false
	case .NullN() == .Len():
		return true
	}

	// Indices of entries that have already been consumed: either matched to
	// an entry from the outer loop or found to be null.
	 := make(map[int]bool, .Len())
	for  := 0;  < .Len(); ++ {
		if .IsNull() {
			continue
		}

		 := false
		// Single-element range covering the current outer entry.
		,  := int64(), int64(+1)
		for  := 0;  < .Len(); ++ {
			if [] {
				continue
			}
			// Null entries are marked as consumed so that they count towards
			// the final length check and are not revisited.
			if .IsNull() {
				[] = true
				continue
			}

			// Single-element range covering the current inner entry.
			,  := int64(), int64(+1)

			// check keys (field 0)
			if !sliceApproxEqual(.Field(0), , , .Field(0), , , ) {
				continue
			}

			// only now check the values
			if sliceApproxEqual(.Field(1), , , .Field(1), , , ) {
				 = true
				[] = true
				break
			}
		}
		if ! {
			return false
		}
	}

	// Every entry must have been consumed for the two sides to be equal.
	return len() == .Len()
}