// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package encoded

import (
	
	

	
)

// FindPhysicalIndex performs a binary search on the run-ends to return
// the appropriate physical offset into the values/run-ends that corresponds
// with the logical index provided when called. If the array's logical offset
// is provided, this is equivalent to calling FindPhysicalOffset.
//
// For example, an array with run-ends [10, 20, 30, 40, 50] and a logicalIdx
// of 25 will return the value 2. This returns the smallest offset
// whose run-end is greater than the logicalIdx requested, which would
// also be the index into the values that contains the correct value.
//
// This function assumes it receives Run End Encoded array data
func ( arrow.ArrayData,  int) int {
	 := .Children()[0]
	if .Len() == 0 {
		return 0
	}

	switch .DataType().ID() {
	case arrow.INT16:
		 := arrow.Int16Traits.CastFromBytes(.Buffers()[1].Bytes())
		 = [.Offset() : .Offset()+.Len()]
		return sort.Search(len(), func( int) bool { return [] > int16() })
	case arrow.INT32:
		 := arrow.Int32Traits.CastFromBytes(.Buffers()[1].Bytes())
		 = [.Offset() : .Offset()+.Len()]
		return sort.Search(len(), func( int) bool { return [] > int32() })
	case arrow.INT64:
		 := arrow.Int64Traits.CastFromBytes(.Buffers()[1].Bytes())
		 = [.Offset() : .Offset()+.Len()]
		return sort.Search(len(), func( int) bool { return [] > int64() })
	default:
		panic("only int16, int32, and int64 are allowed for the run-ends")
	}
}

// FindPhysicalOffset performs a binary search on the run-ends to return
// the appropriate physical offset into the values/run-ends that corresponds
// with the logical offset defined in the array.
//
// For example, an array with run-ends [10, 20, 30, 40, 50] and a logical
// offset of 25 will return the value 2. This returns the smallest offset
// whose run-end is greater than the logical offset, which would also be the
// offset index into the values that contains the correct value.
//
// This function assumes it receives Run End Encoded array data
func ( arrow.ArrayData) int {
	return FindPhysicalIndex(, .Offset())
}

// GetPhysicalLength returns the physical number of values which are in
// the passed in RunEndEncoded array data. This will take into account
// the offset and length of the array as reported in the array data
// (so that it properly handles slices).
//
// This function assumes it receives Run End Encoded array data
func ( arrow.ArrayData) int {
	if .Len() == 0 {
		return 0
	}

	 := .Children()[0]
	 := FindPhysicalOffset()
	,  := .Offset()+, .Len()-
	 := .Offset() + .Len() - 1

	switch .DataType().ID() {
	case arrow.INT16:
		 := arrow.Int16Traits.CastFromBytes(.Buffers()[1].Bytes())
		 = [ : +]
		return sort.Search(len(), func( int) bool { return [] > int16() }) + 1
	case arrow.INT32:
		 := arrow.Int32Traits.CastFromBytes(.Buffers()[1].Bytes())
		 = [ : +]
		return sort.Search(len(), func( int) bool { return [] > int32() }) + 1
	case arrow.INT64:
		 := arrow.Int64Traits.CastFromBytes(.Buffers()[1].Bytes())
		 = [ : +]
		return sort.Search(len(), func( int) bool { return [] > int64() }) + 1
	default:
		panic("arrow/rle: can only get rle.PhysicalLength for int16/int32/int64 run ends array")
	}
}

func getRunEnds( arrow.ArrayData) func(int64) int64 {
	switch .DataType().ID() {
	case arrow.INT16:
		 := arrow.Int16Traits.CastFromBytes(.Buffers()[1].Bytes())
		 = [.Offset() : .Offset()+.Len()]
		return func( int64) int64 { return int64([]) }
	case arrow.INT32:
		 := arrow.Int32Traits.CastFromBytes(.Buffers()[1].Bytes())
		 = [.Offset() : .Offset()+.Len()]
		return func( int64) int64 { return int64([]) }
	case arrow.INT64:
		 := arrow.Int64Traits.CastFromBytes(.Buffers()[1].Bytes())
		 = [.Offset() : .Offset()+.Len()]
		return func( int64) int64 { return int64([]) }
	default:
		panic("only int16, int32, and int64 are allowed for the run-ends")
	}
}

// MergedRuns is used to take two Run End Encoded arrays and iterate
// them in lockstep, finding the correct physical indices to correspond
// with the runs. Each step covers the longest logical span over which
// both inputs stay within a single run.
type MergedRuns struct {
	// the two arrays being iterated
	inputs       [2]arrow.Array
	// physical index of the current run of each input, relative to the
	// offset of that input's run-ends child
	runIndex     [2]int64
	// accessors returning the run-end at a physical index for each input
	inputRunEnds [2]func(int64) int64
	// logical end of the current run of each input, relative to the input's
	// logical offset and capped at logicalLen
	runEnds      [2]int64
	// logical length shared by both inputs
	logicalLen   int
	// logical position at which the current merged run starts
	logicalPos   int
	// logical end of the current merged run (the smaller of the two runEnds)
	mergedEnd    int64
}

// NewMergedRuns takes two RunEndEncoded arrays and returns a MergedRuns
// object that will allow iterating over the physical indices of the runs.
func ( [2]arrow.Array) *MergedRuns {
	if len() == 0 {
		return &MergedRuns{logicalLen: 0}
	}

	 := &MergedRuns{inputs: , logicalLen: [0].Len()}
	for ,  := range  {
		if .DataType().ID() != arrow.RUN_END_ENCODED {
			panic("arrow/rle: NewMergedRuns can only be called with RunLengthEncoded arrays")
		}
		if .Len() != .logicalLen {
			panic("arrow/rle: can only merge runs of RLE arrays of the same length")
		}

		.inputRunEnds[] = getRunEnds(.Data().Children()[0])
		// initialize the runIndex at the physical offset - 1 so the first
		// call to Next will increment it to the correct initial offset
		// since the initial state is logicalPos == 0 and mergedEnd == 0
		.runIndex[] = int64(FindPhysicalOffset(.Data())) - 1
	}

	return 
}

// Next returns true if there are more values/runs to iterate and false
// when one of the arrays has reached the end.
func ( *MergedRuns) () bool {
	.logicalPos = int(.mergedEnd)
	if .isEnd() {
		return false
	}

	for  := range .inputs {
		if .logicalPos == int(.runEnds[]) {
			.runIndex[]++
		}
	}
	.findMergedRun()

	return true
}

// IndexIntoBuffer returns the physical index into the value buffer of
// the passed in array index (ie: 0 for the first array and 1 for the second)
// this takes into account the offset of the array so it is the true physical
// index into the value *buffer* in the child.
func ( *MergedRuns) ( int) int64 {
	return .runIndex[] + int64(.inputs[].Data().Children()[1].Offset())
}

// IndexIntoArray is like IndexIntoBuffer but it doesn't take into account
// the array offset and instead is the index that can be used with the .Value
// method on the array to get the correct value.
func ( *MergedRuns) ( int) int64 { return .runIndex[] }

// RunLength returns the logical length of the current merged run being looked at.
func ( *MergedRuns) () int64 { return .mergedEnd - int64(.logicalPos) }

// AccumulatedRunLength returns the logical run end of the current merged run.
func ( *MergedRuns) () int64 { return .mergedEnd }

func ( *MergedRuns) () {
	.mergedEnd = int64(math.MaxInt64)
	for ,  := range .inputs {
		// logical indices of the end of the run we are currently in each input
		.runEnds[] = int64(.inputRunEnds[](.runIndex[]) - int64(.Data().Offset()))
		// the logical length may end in the middle of a run, in case the array was sliced
		if .logicalLen < int(.runEnds[]) {
			.runEnds[] = int64(.logicalLen)
		}
		if .runEnds[] < .mergedEnd {
			.mergedEnd = .runEnds[]
		}
	}
}

func ( *MergedRuns) () bool { return .logicalPos == .logicalLen }