// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package array

import (
	
	
	
	

	
	
	
	
	
	
)

// RunEndEncoded is an array made up of two children: a run ends array
// (int16, int32 or int64, without nulls) marking where each run stops, and
// a values array holding the value of each run.
type RunEndEncoded struct {
	array

	// ends and values are the arrays built from child 0 and child 1 of the
	// underlying array data, respectively.
	ends, values arrow.Array
}

func (,  arrow.Array, ,  int) *RunEndEncoded {
	 := NewData(arrow.RunEndEncodedOf(.DataType(), .DataType()), ,
		[]*memory.Buffer{nil}, []arrow.ArrayData{.Data(), .Data()}, 0, )
	defer .Release()
	return NewRunEndEncodedData()
}

func ( arrow.ArrayData) *RunEndEncoded {
	 := &RunEndEncoded{}
	.refCount.Add(1)
	.setData(.(*Data))
	return 
}

func ( *RunEndEncoded) () arrow.Array     { return .values }
func ( *RunEndEncoded) () arrow.Array { return .ends }

func ( *RunEndEncoded) () {
	.array.Retain()
	.values.Retain()
	.ends.Retain()
}

func ( *RunEndEncoded) () {
	.array.Release()
	.values.Release()
	.ends.Release()
}

// LogicalValuesArray returns an array holding the values of each
// run, only over the range of run values inside the logical offset/length
// range of the parent array.
//
// # Example
//
// For this array:
//
//	RunEndEncoded: { Offset: 150, Length: 1500 }
//	    RunEnds: [ 1, 2, 4, 6, 10, 1000, 1750, 2000 ]
//	    Values:  [ "a", "b", "c", "d", "e", "f", "g", "h" ]
//
// LogicalValuesArray will return the following array:
//
//	[ "f", "g" ]
//
// This is because the offset of 150 tells it to skip the values until
// "f" which corresponds with the logical offset (the run from 10 - 1000),
// and stops after "g" because the length + offset goes to 1650 which is
// within the run from 1000 - 1750, corresponding to the "g" value.
//
// # Note
//
// The return from this needs to be Released.
func ( *RunEndEncoded) () arrow.Array {
	 := .GetPhysicalOffset()
	 := .GetPhysicalLength()
	 := NewSliceData(.data.Children()[1], int64(), int64(+))
	defer .Release()
	return MakeFromData()
}

// LogicalRunEndsArray returns an array holding the logical indexes
// of each run end, only over the range of run end values relative
// to the logical offset/length range of the parent array.
//
// For arrays with an offset, this is not a slice of the existing
// internal run ends array. Instead a new array is created with run-ends
// that are adjusted so the new array can have an offset of 0. As a result
// this method can be expensive to call for an array with a non-zero offset.
//
// # Example
//
// For this array:
//
//	RunEndEncoded: { Offset: 150, Length: 1500 }
//	    RunEnds: [ 1, 2, 4, 6, 10, 1000, 1750, 2000 ]
//	    Values:  [ "a", "b", "c", "d", "e", "f", "g", "h" ]
//
// LogicalRunEndsArray will return the following array:
//
//	[ 850, 1500 ]
//
// This is because the offset of 150 tells us to skip all run-ends less
// than 150 (by finding the physical offset), and we adjust the run-ends
// accordingly (1000 - 150 = 850). The logical length of the array is 1500,
// so we know we don't want to go past the 1750 run end. Thus the last
// run-end is determined by doing: min(1750 - 150, 1500) = 1500.
//
// # Note
//
// The return from this needs to be Released
func ( *RunEndEncoded) ( memory.Allocator) arrow.Array {
	 := .GetPhysicalOffset()
	 := .GetPhysicalLength()

	if .data.offset == 0 {
		 := NewSliceData(.data.childData[0], 0, int64())
		defer .Release()
		return MakeFromData()
	}

	 := NewBuilder(, .data.childData[0].DataType())
	defer .Release()
	.Resize()

	switch e := .ends.(type) {
	case *Int16:
		for ,  := range .Int16Values()[ : +] {
			 -= int16(.data.offset)
			 = int16(utils.Min(int(), .data.length))
			.(*Int16Builder).Append()
		}
	case *Int32:
		for ,  := range .Int32Values()[ : +] {
			 -= int32(.data.offset)
			 = int32(utils.Min(int(), .data.length))
			.(*Int32Builder).Append()
		}
	case *Int64:
		for ,  := range .Int64Values()[ : +] {
			 -= int64(.data.offset)
			 = int64(utils.Min(int(), .data.length))
			.(*Int64Builder).Append()
		}
	}

	return .NewArray()
}

func ( *RunEndEncoded) ( *Data) {
	if len(.childData) != 2 {
		panic(fmt.Errorf("%w: arrow/array: RLE array must have exactly 2 children", arrow.ErrInvalid))
	}
	debug.Assert(.dtype.ID() == arrow.RUN_END_ENCODED, "invalid type for RunLengthEncoded")
	if !.dtype.(*arrow.RunEndEncodedType).ValidRunEndsType(.childData[0].DataType()) {
		panic(fmt.Errorf("%w: arrow/array: run ends array must be int16, int32, or int64", arrow.ErrInvalid))
	}
	if .childData[0].NullN() > 0 {
		panic(fmt.Errorf("%w: arrow/array: run ends array cannot contain nulls", arrow.ErrInvalid))
	}

	.array.setData()

	.ends = MakeFromData(.data.childData[0])
	.values = MakeFromData(.data.childData[1])
}

func ( *RunEndEncoded) () int {
	return encoded.FindPhysicalOffset(.data)
}

func ( *RunEndEncoded) () int {
	return encoded.GetPhysicalLength(.data)
}

// GetPhysicalIndex can be used to get the run-encoded value instead of costly LogicalValuesArray
// in the following way:
//
//	r.Values().(valuetype).Value(r.GetPhysicalIndex(i))
func ( *RunEndEncoded) ( int) int {
	return encoded.FindPhysicalIndex(.data, +.data.offset)
}

// ValueStr will return the str representation of the value at the logical offset i.
func ( *RunEndEncoded) ( int) string {
	return .values.ValueStr(.GetPhysicalIndex())
}

func ( *RunEndEncoded) () string {
	var  bytes.Buffer
	.WriteByte('[')
	for  := 0;  < .ends.Len(); ++ {
		if  != 0 {
			.WriteByte(',')
		}

		 := .values.GetOneForMarshal()
		if ,  := .(json.RawMessage);  {
			 = string()
		}
		fmt.Fprintf(&, "{%d -> %v}", .ends.GetOneForMarshal(), )
	}

	.WriteByte(']')
	return .String()
}

func ( *RunEndEncoded) ( int) interface{} {
	return .values.GetOneForMarshal(.GetPhysicalIndex())
}

func ( *RunEndEncoded) () ([]byte, error) {
	var  bytes.Buffer
	 := json.NewEncoder(&)
	.WriteByte('[')
	for  := 0;  < .Len(); ++ {
		if  != 0 {
			.WriteByte(',')
		}
		if  := .Encode(.GetOneForMarshal());  != nil {
			return nil, 
		}
	}
	.WriteByte(']')
	return .Bytes(), nil
}

func arrayRunEndEncodedEqual(,  *RunEndEncoded) bool {
	// types were already checked before getting here, so we know
	// the encoded types are equal
	 := encoded.NewMergedRuns([2]arrow.Array{, })
	for .Next() {
		 := .IndexIntoArray(0)
		 := .IndexIntoArray(1)
		if !SliceEqual(.values, , +1, .values, , +1) {
			return false
		}
	}
	return true
}

func arrayRunEndEncodedApproxEqual(,  *RunEndEncoded,  equalOption) bool {
	// types were already checked before getting here, so we know
	// the encoded types are equal
	 := encoded.NewMergedRuns([2]arrow.Array{, })
	for .Next() {
		 := .IndexIntoArray(0)
		 := .IndexIntoArray(1)
		if !sliceApproxEqual(.values, , +1, .values, , +1, ) {
			return false
		}
	}
	return true
}

// RunEndEncodedBuilder builds RunEndEncoded arrays from a run ends builder
// and a values builder.
type RunEndEncodedBuilder struct {
	builder

	dt arrow.DataType

	// runEnds and values build the two children of the resulting array.
	runEnds, values Builder

	// maxRunEnd is the largest total length the run ends type can represent.
	maxRunEnd uint64

	// Mixing AppendValueFromString and UnmarshalOne is currently
	// unsupported. lastUnmarshalled is the value of the run opened by the
	// most recent UnmarshalOne call.
	lastUnmarshalled interface{}

	// unmarshalled tracks whether UnmarshalOne was called, which
	// lastUnmarshalled cannot tell on its own when that value is nil.
	unmarshalled bool

	// lastStr is the string of the run opened by the most recent
	// AppendValueFromString call.
	lastStr *string
}

func ( memory.Allocator, ,  arrow.DataType) *RunEndEncodedBuilder {
	 := arrow.RunEndEncodedOf(, )
	if !.ValidRunEndsType() {
		panic("arrow/ree: invalid runEnds type for run length encoded array")
	}

	var  uint64
	switch .ID() {
	case arrow.INT16:
		 = math.MaxInt16
	case arrow.INT32:
		 = math.MaxInt32
	case arrow.INT64:
		 = math.MaxInt64
	}
	 := &RunEndEncodedBuilder{
		builder:          builder{mem: },
		dt:               ,
		runEnds:          NewBuilder(, ),
		values:           NewBuilder(, ),
		maxRunEnd:        ,
		lastUnmarshalled: nil,
	}
	.refCount.Add(1)
	return 
}

func ( *RunEndEncodedBuilder) () arrow.DataType {
	return .dt
}

func ( *RunEndEncodedBuilder) () {
	debug.Assert(.refCount.Load() > 0, "too many releases")

	if .refCount.Add(-1) == 0 {
		.values.Release()
		.runEnds.Release()
	}
}

func ( *RunEndEncodedBuilder) ( uint64) {
	if uint64(.length)+ > .maxRunEnd {
		panic(fmt.Errorf("%w: %s array length must fit be less than %d", arrow.ErrInvalid, .dt, .maxRunEnd))
	}

	.length += int()
}

func ( *RunEndEncodedBuilder) () {
	.lastUnmarshalled = nil
	.lastStr = nil
	.unmarshalled = false
	if .length == 0 {
		return
	}

	switch bldr := .runEnds.(type) {
	case *Int16Builder:
		.Append(int16(.length))
	case *Int32Builder:
		.Append(int32(.length))
	case *Int64Builder:
		.Append(int64(.length))
	}
}

func ( *RunEndEncodedBuilder) () Builder { return .values }

func ( *RunEndEncodedBuilder) ( uint64) {
	.finishRun()
	.addLength()
}

func ( *RunEndEncodedBuilder) ( []uint64) {
	for ,  := range  {
		.finishRun()
		.addLength()
	}
}

func ( *RunEndEncodedBuilder) ( uint64) {
	.addLength()
}

func ( *RunEndEncodedBuilder) () {
	.finishRun()
	.values.AppendNull()
	.addLength(1)
}

func ( *RunEndEncodedBuilder) ( int) {
	for  := 0;  < ; ++ {
		.AppendNull()
	}
}

func ( *RunEndEncodedBuilder) () int {
	return UnknownNullCount
}

func ( *RunEndEncodedBuilder) () {
	.AppendNull()
}

func ( *RunEndEncodedBuilder) ( int) {
	.AppendNulls()
}

func ( *RunEndEncodedBuilder) ( int) {
	.values.Reserve()
	.runEnds.Reserve()
}

func ( *RunEndEncodedBuilder) ( int) {
	.values.Resize()
	.runEnds.Resize()
}

func ( *RunEndEncodedBuilder) () *RunEndEncoded {
	 := .newData()
	defer .Release()
	return NewRunEndEncodedData()
}

func ( *RunEndEncodedBuilder) () arrow.Array {
	return .NewRunEndEncodedArray()
}

func ( *RunEndEncodedBuilder) () ( *Data) {
	.finishRun()
	 := .values.NewArray()
	defer .Release()
	 := .runEnds.NewArray()
	defer .Release()

	 = NewData(
		.dt, .length, []*memory.Buffer{},
		[]arrow.ArrayData{.Data(), .Data()}, 0, 0)
	.reset()
	return
}

// AppendValueFromString can't be used in conjunction with UnmarshalOne
func ( *RunEndEncodedBuilder) ( string) error {
	// we don't support mixing AppendValueFromString & UnmarshalOne
	if .unmarshalled {
		return fmt.Errorf("%w: mixing AppendValueFromString & UnmarshalOne not yet implemented", arrow.ErrNotImplemented)
	}

	if  == NullValueStr {
		.AppendNull()
		return nil
	}

	if .lastStr != nil &&  == *.lastStr {
		.ContinueRun(1)
		return nil
	}

	.Append(1)
	 := 
	.lastStr = &
	return .ValueBuilder().AppendValueFromString()
}

// UnmarshalOne can't be used in conjunction with AppendValueFromString
func ( *RunEndEncodedBuilder) ( *json.Decoder) error {
	// we don't support mixing AppendValueFromString & UnmarshalOne
	if .lastStr != nil {
		return fmt.Errorf("%w: mixing AppendValueFromString & UnmarshalOne not yet implemented", arrow.ErrNotImplemented)
	}

	var  interface{}
	if  := .Decode(&);  != nil {
		return 
	}

	// if we unmarshalled the same value as the previous one, we want to
	// continue the run. However, there's an edge case. At the start of
	// unmarshalling, lastUnmarshalled will be nil, but we might get
	// nil as the first value we unmarshal. In that case we want to
	// make sure we add a new run instead. We can detect that case by
	// checking that the number of runEnds matches the number of values
	// we have, which means no matter what we have to start a new run
	if reflect.DeepEqual(, .lastUnmarshalled) && ( != nil || .runEnds.Len() != .values.Len()) {
		.ContinueRun(1)
		return nil
	}

	,  := json.Marshal()
	if  != nil {
		return 
	}

	.Append(1)
	.lastUnmarshalled = 
	.unmarshalled = true
	return .ValueBuilder().UnmarshalOne(json.NewDecoder(bytes.NewReader()))
}

// Unmarshal can't be used in conjunction with AppendValueFromString (as it calls UnmarshalOne)
func ( *RunEndEncodedBuilder) ( *json.Decoder) error {
	.finishRun()
	for .More() {
		if  := .UnmarshalOne();  != nil {
			return 
		}
	}
	return nil
}

// UnmarshalJSON can't be used in conjunction with AppendValueFromString (as it calls UnmarshalOne)
func ( *RunEndEncodedBuilder) ( []byte) error {
	 := json.NewDecoder(bytes.NewReader())
	,  := .Token()
	if  != nil {
		return 
	}

	if ,  := .(json.Delim); ! ||  != '[' {
		return fmt.Errorf("list builder must unpack from json array, found %s", )
	}

	return .Unmarshal()
}

// Compile-time checks that the array and builder types satisfy the
// arrow.Array and Builder interfaces.
var _ arrow.Array = (*RunEndEncoded)(nil)
var _ Builder = (*RunEndEncodedBuilder)(nil)