// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package array

import (
	
	
	
	
	

	
	
	
	
	
	
	
)

// Concatenate creates a new arrow.Array which is the concatenation of the
// passed in arrays. Returns nil if an error is encountered.
//
// The passed in arrays still need to be released manually, and will not be
// released by this function.
func ( []arrow.Array,  memory.Allocator) ( arrow.Array,  error) {
	if len() == 0 {
		return nil, errors.New("array/concat: must pass at least one array")
	}

	// gather Data of inputs
	 := make([]arrow.ArrayData, len())
	for ,  := range  {
		if !arrow.TypeEqual(.DataType(), [0].DataType()) {
			return nil, fmt.Errorf("arrays to be concatenated must be identically typed, but %s and %s were encountered",
				[0].DataType(), .DataType())
		}
		[] = .Data()
	}

	,  := concat(, )
	if  != nil {
		return nil, 
	}

	defer .Release()
	return MakeFromData(), nil
}

// simple struct to hold ranges
type rng struct {
	offset, len int
}

// simple bitmap struct to reference a specific slice of a bitmap where the range
// offset and length are in bits
type bitmap struct {
	data []byte
	rng  rng
}

// gather up the bitmaps from the passed in data objects
func gatherBitmaps( []arrow.ArrayData,  int) []bitmap {
	 := make([]bitmap, len())
	for ,  := range  {
		if .Buffers()[] != nil {
			[].data = .Buffers()[].Bytes()
		}
		[].rng.offset = .Offset()
		[].rng.len = .Len()
	}
	return 
}

// gatherFixedBuffers gathers up the buffer objects of the given index, specifically
// returning only the slices of the buffers which are relevant to the passed in arrays
// in case they are themselves slices of other arrays. nil buffers are ignored and not
// in the output slice.
func gatherFixedBuffers( []arrow.ArrayData, ,  int) []*memory.Buffer {
	 := make([]*memory.Buffer, 0, len())
	for ,  := range  {
		 := .Buffers()[]
		if  == nil {
			continue
		}

		 = append(, memory.NewBufferBytes(.Bytes()[.Offset()*:(.Offset()+.Len())*]))
	}
	return 
}

// gatherBuffersFixedWidthType is like gatherFixedBuffers, but uses a datatype to determine the size
// to use for determining the byte slice rather than a passed in bytewidth.
func gatherBuffersFixedWidthType( []arrow.ArrayData,  int,  arrow.FixedWidthDataType) []*memory.Buffer {
	return gatherFixedBuffers(, , .BitWidth()/8)
}

// gatherBufferRanges requires that len(ranges) == len(data) and returns a list of buffers
// which represent the corresponding range of each buffer in the specified index of each
// data object.
func gatherBufferRanges( []arrow.ArrayData,  int,  []rng) []*memory.Buffer {
	 := make([]*memory.Buffer, 0, len())
	for ,  := range  {
		 := .Buffers()[]
		if  == nil {
			debug.Assert([].len == 0, "misaligned buffer value ranges")
			continue
		}

		 = append(, memory.NewBufferBytes(.Bytes()[[].offset:[].offset+[].len]))
	}
	return 
}

// gatherChildren gathers the children data objects for child of index idx for all of the data objects.
func gatherChildren( []arrow.ArrayData,  int) []arrow.ArrayData {
	return gatherChildrenMultiplier(, , 1)
}

// gatherChildrenMultiplier gathers the full data slice of the underlying values from the children data objects
// such as the values data for a list array so that it can return a slice of the buffer for a given
// index into the children.
func gatherChildrenMultiplier( []arrow.ArrayData, ,  int) []arrow.ArrayData {
	 := make([]arrow.ArrayData, len())
	for ,  := range  {
		[] = NewSliceData(.Children()[], int64(.Offset()*), int64(.Offset()+.Len())*int64())
	}
	return 
}

// gatherChildrenRanges returns a slice of Data objects which each represent slices of the given ranges from the
// child in the specified index from each data object.
func gatherChildrenRanges( []arrow.ArrayData,  int,  []rng) []arrow.ArrayData {
	debug.Assert(len() == len(), "mismatched children ranges for concat")
	 := make([]arrow.ArrayData, len())
	for ,  := range  {
		[] = NewSliceData(.Children()[], int64([].offset), int64([].offset+[].len))
	}
	return 
}

// creates a single contiguous buffer which contains the concatenation of all of the passed
// in buffer objects.
func concatBuffers( []*memory.Buffer,  memory.Allocator) *memory.Buffer {
	 := 0
	for ,  := range  {
		 += .Len()
	}
	 := memory.NewResizableBuffer()
	.Resize()

	 := .Bytes()
	for ,  := range  {
		copy(, .Bytes())
		 = [.Len():]
	}
	return 
}

func handle32BitOffsets( int,  []*memory.Buffer,  *memory.Buffer) (*memory.Buffer, []rng, error) {
	 := arrow.Int32Traits.CastFromBytes(.Bytes())
	 := make([]rng, len())
	 := int32(0)
	 := int(0)
	for ,  := range  {
		if .Len() == 0 {
			[].offset = 0
			[].len = 0
			continue
		}

		// when we gather our buffers, we sliced off the last offset from the buffer
		// so that we could count the lengths accurately
		 := arrow.Int32Traits.CastFromBytes(.Bytes())
		[].offset = int([0])
		// expand our slice to see that final offset
		 := [:len()+1]
		// compute the length of this range by taking the final offset and subtracting where we started.
		[].len = int([len()]) - [].offset

		if  > math.MaxInt32-int32([].len) {
			return nil, nil, errors.New("offset overflow while concatenating arrays")
		}

		// adjust each offset by the difference between our last ending point and our starting point
		 :=  - [0]
		for ,  := range  {
			[+] =  + 
		}

		// the next index for an element in the output buffer
		 += .Len() / arrow.Int32SizeBytes
		// update our offset counter to be the total current length of our output
		 += int32([].len)
	}

	// final offset should point to the end of the data
	[] = 
	return , , nil
}

func unifyDictionaries( memory.Allocator,  []arrow.ArrayData,  *arrow.DictionaryType) ([]*memory.Buffer, arrow.Array, error) {
	,  := NewDictionaryUnifier(, .ValueType)
	if  != nil {
		return nil, nil, 
	}
	defer .Release()

	 := make([]*memory.Buffer, len())
	for ,  := range  {
		 := MakeFromData(.Dictionary())
		defer .Release()
		[],  = .UnifyAndTranspose()
		if  != nil {
			return nil, nil, 
		}
	}

	,  := .GetResultWithIndexType(.IndexType)
	if  != nil {
		for ,  := range  {
			.Release()
		}
		return nil, nil, 
	}
	return , , nil
}

func concatDictIndices( memory.Allocator,  []arrow.ArrayData,  arrow.FixedWidthDataType,  []*memory.Buffer) ( *memory.Buffer,  error) {
	defer func() {
		if  != nil &&  != nil {
			.Release()
			 = nil
		}
	}()

	 := .BitWidth() / 8
	 := 0
	for ,  := range  {
		 += .Len()
		defer [].Release()
	}

	 = memory.NewResizableBuffer()
	.Resize( * )

	 := .Bytes()
	for ,  := range  {
		 := arrow.Int32Traits.CastFromBytes([].Bytes())
		 := .Buffers()[1].Bytes()
		if .Buffers()[0] == nil {
			if  = utils.TransposeIntsBuffers(, , , , .Offset(), 0, .Len(), );  != nil {
				return
			}
		} else {
			 := bitutils.NewBitRunReader(.Buffers()[0].Bytes(), int64(.Offset()), int64(.Len()))
			 := 0
			for {
				 := .NextRun()
				if .Len == 0 {
					break
				}

				if .Set {
					 = utils.TransposeIntsBuffers(, , , , .Offset()+, , int(.Len), )
					if  != nil {
						return
					}
				} else {
					memory.Set([:+(int(.Len)*)], 0x00)
				}

				 += int(.Len)
			}
		}
		 = [.Len()*:]
	}
	return
}

func handle64BitOffsets( int,  []*memory.Buffer,  *memory.Buffer) (*memory.Buffer, []rng, error) {
	 := arrow.Int64Traits.CastFromBytes(.Bytes())
	 := make([]rng, len())
	 := int64(0)
	 := int(0)
	for ,  := range  {
		if .Len() == 0 {
			[].offset = 0
			[].len = 0
			continue
		}

		// when we gather our buffers, we sliced off the last offset from the buffer
		// so that we could count the lengths accurately
		 := arrow.Int64Traits.CastFromBytes(.Bytes())
		[].offset = int([0])
		// expand our slice to see that final offset
		 := [:len()+1]
		// compute the length of this range by taking the final offset and subtracting where we started.
		[].len = int([len()]) - [].offset

		if  > math.MaxInt64-int64([].len) {
			return nil, nil, errors.New("offset overflow while concatenating arrays")
		}

		// adjust each offset by the difference between our last ending point and our starting point
		 :=  - [0]
		for ,  := range  {
			[+] =  + 
		}

		// the next index for an element in the output buffer
		 += .Len() / arrow.Int64SizeBytes
		// update our offset counter to be the total current length of our output
		 += int64([].len)
	}

	// final offset should point to the end of the data
	[] = 
	return , , nil
}

// concatOffsets creates a single offset buffer which represents the concatenation of all of the
// offsets buffers, adjusting the offsets appropriately to their new relative locations.
//
// It also returns the list of ranges that need to be fetched for the corresponding value buffers
// to construct the final concatenated value buffer.
func concatOffsets( []*memory.Buffer,  int,  memory.Allocator) (*memory.Buffer, []rng, error) {
	 := 0
	for ,  := range  {
		 += .Len() / 
	}

	 := memory.NewResizableBuffer()
	.Resize( * ( + 1))

	switch  {
	case arrow.Int64SizeBytes:
		return handle64BitOffsets(, , )
	default:
		return handle32BitOffsets(, , )
	}
}

func sumArraySizes( []arrow.ArrayData) int {
	 := 0
	for ,  := range  {
		 += .Len()
	}
	return 
}

func getListViewBufferValues[ int32 | int64]( arrow.ArrayData,  int) [] {
	 := .Buffers()[].Bytes()
	 := (*)(unsafe.Pointer(&[0]))
	 := unsafe.Slice(, .Offset()+.Len())
	return [.Offset():]
}

func putListViewOffsets32( arrow.ArrayData,  int32,  *memory.Buffer,  int) {
	debug.Assert(.DataType().ID() == arrow.LIST_VIEW, "putListViewOffsets32: expected LIST_VIEW data")
	,  := .Offset(), .Len()
	if  == 0 {
		return
	}
	 := .Buffers()[0]
	 := getListViewBufferValues[int32](, 1)
	 := getListViewBufferValues[int32](, 2)
	 := func( int) bool {
		return ( == nil || bitutil.BitIsSet(.Bytes(), +)) && [] > 0
	}

	 := arrow.Int32Traits.CastFromBytes(.Bytes())
	for ,  := range  {
		if () {
			// This is guaranteed by RangeOfValuesUsed returning the smallest offset
			// of valid and non-empty list-views.
			debug.Assert(+ >= 0, "putListViewOffsets32: offset underflow while concatenating arrays")
			[+] =  + 
		} else {
			[+] = 0
		}
	}
}

func putListViewOffsets64( arrow.ArrayData,  int64,  *memory.Buffer,  int) {
	debug.Assert(.DataType().ID() == arrow.LARGE_LIST_VIEW, "putListViewOffsets64: expected LARGE_LIST_VIEW data")
	,  := .Offset(), .Len()
	if  == 0 {
		return
	}
	 := .Buffers()[0]
	 := getListViewBufferValues[int64](, 1)
	 := getListViewBufferValues[int64](, 2)
	 := func( int) bool {
		return ( == nil || bitutil.BitIsSet(.Bytes(), +)) && [] > 0
	}

	 := arrow.Int64Traits.CastFromBytes(.Bytes())
	for ,  := range  {
		if () {
			// This is guaranteed by RangeOfValuesUsed returning the smallest offset
			// of valid and non-empty list-views.
			debug.Assert(+ >= 0, "putListViewOffsets64: offset underflow while concatenating arrays")
			[+] =  + 
		} else {
			[+] = 0
		}
	}
}

// Concatenate buffers holding list-view offsets into a single buffer of offsets
//
// valueRanges contains the relevant ranges of values in the child array actually
// referenced to by the views. Most commonly, these ranges will start from 0,
// but when that is not the case, we need to adjust the displacement of offsets.
// The concatenated child array does not contain values from the beginning
// if they are not referenced to by any view.
func concatListViewOffsets( []arrow.ArrayData,  int,  []rng,  memory.Allocator) (*memory.Buffer, error) {
	 := sumArraySizes()
	if  == 4 &&  > math.MaxInt32 {
		return nil, fmt.Errorf("%w: offset overflow while concatenating arrays", arrow.ErrInvalid)
	}
	 := memory.NewResizableBuffer()
	.Resize( * )

	,  := 0, 0
	for ,  := range  {
		 :=  - [].offset
		if  == 4 {
			putListViewOffsets32(, int32(), , )
		} else {
			putListViewOffsets64(, int64(), , )
		}
		 += .Len()
		 += [].len
	}
	debug.Assert( == , "implementation error")

	return , nil
}

func zeroNullListViewSizes[ int32 | int64]( arrow.ArrayData) {
	if .Len() == 0 || .Buffers()[0] == nil {
		return
	}
	 := .Buffers()[0].Bytes()
	 := getListViewBufferValues[](, 2)

	for  := 0;  < .Len(); ++ {
		if !bitutil.BitIsSet(, .Offset()+) {
			[] = 0
		}
	}
}

func concatListView( []arrow.ArrayData,  arrow.FixedWidthDataType,  *Data,  memory.Allocator) ( error) {
	// Calculate the ranges of values that each list-view array uses
	 := make([]rng, len())
	for ,  := range  {
		,  := rangeOfValuesUsed()
		[].offset = 
		[].len = 
	}

	// Gather the children ranges of each input array
	 := gatherChildrenRanges(, 0, )
	for ,  := range  {
		defer .Release()
	}

	// Concatenate the values
	,  := concat(, )
	if  != nil {
		return 
	}

	// Concatenate the offsets
	,  := concatListViewOffsets(, .Bytes(), , )
	if  != nil {
		return 
	}

	// Concatenate the sizes
	 := gatherBuffersFixedWidthType(, 2, )
	 := concatBuffers(, )

	.childData = []arrow.ArrayData{}
	.buffers[1] = 
	.buffers[2] = 

	// To make sure the sizes don't reference values that are not in the new
	// concatenated values array, we zero the sizes of null list-view values.
	if .ID() == arrow.INT32 {
		zeroNullListViewSizes[int32]()
	} else {
		zeroNullListViewSizes[int64]()
	}

	return nil
}

// concat is the implementation for actually performing the concatenation of the arrow.ArrayData
// objects that we can call internally for nested types.
func concat( []arrow.ArrayData,  memory.Allocator) ( arrow.ArrayData,  error) {
	 := &Data{dtype: [0].DataType(), nulls: 0}
	.refCount.Add(1)

	defer func() {
		if  := recover();  != nil {
			 = utils.FormatRecoveredError("arrow/concat", )
		}
		if  != nil {
			.Release()
		}
	}()
	for ,  := range  {
		.length += .Len()
		if .nulls == UnknownNullCount || .NullN() == UnknownNullCount {
			.nulls = UnknownNullCount
			continue
		}
		.nulls += .NullN()
	}

	.buffers = make([]*memory.Buffer, len([0].Buffers()))
	if .nulls != 0 && .dtype.ID() != arrow.NULL {
		,  := concatBitmaps(gatherBitmaps(, 0), )
		if  != nil {
			return nil, 
		}
		.buffers[0] = 
	}

	 := .dtype
	if .ID() == arrow.EXTENSION {
		 = .(arrow.ExtensionType).StorageType()
	}

	switch dt := .(type) {
	case *arrow.NullType:
	case *arrow.BooleanType:
		,  := concatBitmaps(gatherBitmaps(, 1), )
		if  != nil {
			return nil, 
		}
		.buffers[1] = 
	case *arrow.DictionaryType:
		 := .IndexType.(arrow.FixedWidthDataType)
		// two cases: all dictionaries are the same or we need to unify them
		 := true
		 := MakeFromData([0].Dictionary())
		defer .Release()
		for ,  := range  {
			 := MakeFromData(.Dictionary())
			if !Equal(, ) {
				.Release()
				 = false
				break
			}
			.Release()
		}

		 := gatherBuffersFixedWidthType(, 1, )
		if  {
			.dictionary = .Data().(*Data)
			.dictionary.Retain()
			.buffers[1] = concatBuffers(, )
			break
		}

		, ,  := unifyDictionaries(, , )
		if  != nil {
			return nil, 
		}
		defer .Release()
		.dictionary = .Data().(*Data)
		.dictionary.Retain()

		.buffers[1],  = concatDictIndices(, , , )
		if  != nil {
			return nil, 
		}
	case arrow.FixedWidthDataType:
		.buffers[1] = concatBuffers(gatherBuffersFixedWidthType(, 1, ), )
	case arrow.BinaryViewDataType:
		.buffers = .buffers[:2]
		for ,  := range  {
			for ,  := range .Buffers()[2:] {
				.Retain()
				.buffers = append(.buffers, )
			}
		}

		.buffers[1] = concatBuffers(gatherFixedBuffers(, 1, arrow.ViewHeaderSizeBytes), )

		var (
			                  = arrow.ViewHeaderTraits.CastFromBytes(.buffers[1].Bytes())
			                  = [0].Len()
			 int
		)

		for  := 1;  < len(); ++ {
			 += len([-1].Buffers()) - 2

			for  :=  + [].Len();  < ; ++ {
				if [].IsInline() {
					continue
				}

				 := [].BufferIndex() + int32()
				[].SetIndexOffset(, [].BufferOffset())
			}
		}
	case arrow.BinaryDataType:
		 := .Layout().Buffers[1].ByteWidth
		, ,  := concatOffsets(gatherFixedBuffers(, 1, ), , )
		if  != nil {
			return nil, 
		}
		.buffers[1] = 
		.buffers[2] = concatBuffers(gatherBufferRanges(, 2, ), )
	case *arrow.ListType:
		 := .Layout().Buffers[1].ByteWidth
		, ,  := concatOffsets(gatherFixedBuffers(, 1, ), , )
		if  != nil {
			return nil, 
		}
		 := gatherChildrenRanges(, 0, )
		for ,  := range  {
			defer .Release()
		}

		.buffers[1] = 
		.childData = make([]arrow.ArrayData, 1)
		.childData[0],  = (, )
		if  != nil {
			return nil, 
		}
	case *arrow.LargeListType:
		 := .Layout().Buffers[1].ByteWidth
		, ,  := concatOffsets(gatherFixedBuffers(, 1, ), , )
		if  != nil {
			return nil, 
		}
		 := gatherChildrenRanges(, 0, )
		for ,  := range  {
			defer .Release()
		}

		.buffers[1] = 
		.childData = make([]arrow.ArrayData, 1)
		.childData[0],  = (, )
		if  != nil {
			return nil, 
		}
	case *arrow.ListViewType:
		 := arrow.PrimitiveTypes.Int32.(arrow.FixedWidthDataType)
		 := concatListView(, , , )
		if  != nil {
			return nil, 
		}
	case *arrow.LargeListViewType:
		 := arrow.PrimitiveTypes.Int64.(arrow.FixedWidthDataType)
		 := concatListView(, , , )
		if  != nil {
			return nil, 
		}
	case *arrow.FixedSizeListType:
		 := gatherChildrenMultiplier(, 0, int(.Len()))
		for ,  := range  {
			defer .Release()
		}

		,  := (, )
		if  != nil {
			return nil, 
		}
		.childData = []arrow.ArrayData{}
	case *arrow.StructType:
		.childData = make([]arrow.ArrayData, .NumFields())
		for  := range .Fields() {
			 := gatherChildren(, )
			for ,  := range  {
				defer .Release()
			}

			,  := (, )
			if  != nil {
				return nil, 
			}
			.childData[] = 
		}
	case *arrow.MapType:
		 := .Layout().Buffers[1].ByteWidth
		, ,  := concatOffsets(gatherFixedBuffers(, 1, ), , )
		if  != nil {
			return nil, 
		}
		 := gatherChildrenRanges(, 0, )
		for ,  := range  {
			defer .Release()
		}

		.buffers[1] = 
		.childData = make([]arrow.ArrayData, 1)
		.childData[0],  = (, )
		if  != nil {
			return nil, 
		}
	case *arrow.RunEndEncodedType:
		,  := int(0), false
		// we can't use gatherChildren because the Offset and Len of
		// data doesn't correspond to the physical length or offset
		 := make([]arrow.ArrayData, len())
		 := make([]arrow.ArrayData, len())
		for ,  := range  {
			 := encoded.GetPhysicalLength()
			 := encoded.FindPhysicalOffset()

			[] = NewSliceData(.Children()[0], int64(), int64(+))
			defer [].Release()
			[] = NewSliceData(.Children()[1], int64(), int64(+))
			defer [].Release()

			,  = addOvf(, )
			if  {
				return nil, fmt.Errorf("%w: run end encoded array length must fit into a 32-bit signed integer",
					arrow.ErrInvalid)
			}
		}

		 := [0].DataType().(arrow.FixedWidthDataType).Bytes()
		 := gatherFixedBuffers(, 1, )
		 :=  * 
		 := memory.NewResizableBuffer()
		.Resize()
		defer .Release()

		if  := updateRunEnds(, , , );  != nil {
			return nil, 
		}

		.childData = make([]arrow.ArrayData, 2)
		.childData[0] = NewData([0].Children()[0].DataType(), int(),
			[]*memory.Buffer{nil, }, nil, 0, 0)

		var  error
		.childData[1],  = (, )
		if  != nil {
			.childData[0].Release()
			return nil, 
		}
	default:
		return nil, fmt.Errorf("concatenate not implemented for type %s", )
	}

	return , nil
}

// check overflow in the addition, taken from bits.Add but adapted for signed integers
// rather than unsigned integers. bits.UintSize will be either 32 or 64 based on
// whether our architecture is 32 bit or 64. The operation is the same for both cases,
// the only difference is how much we need to shift by 30 for 32 bit and 62 for 64 bit.
// Thus, bits.UintSize - 2 is how much we shift right by to check if we had an overflow
// in the signed addition.
//
// First return is the result of the sum, the second return is true if there was an overflow
func addOvf(,  int) (int, bool) {
	 :=  + 
	return , ((&)|((|)&^))>>(bits.UintSize-2) == 1
}

// concatenate bitmaps together and return a buffer with the combined bitmaps
func concatBitmaps( []bitmap,  memory.Allocator) (*memory.Buffer, error) {
	var (
		   int
		 bool
	)

	for ,  := range  {
		if ,  = addOvf(, .rng.len);  {
			return nil, errors.New("length overflow when concatenating arrays")
		}
	}

	 := memory.NewResizableBuffer()
	.Resize(int(bitutil.BytesForBits(int64())))
	 := .Bytes()

	 := 0
	for ,  := range  {
		if .data == nil { // if the bitmap is nil, that implies that the value is true for all elements
			bitutil.SetBitsTo(.Bytes(), int64(), int64(.rng.len), true)
		} else {
			bitutil.CopyBitmap(.data, .rng.offset, .rng.len, , )
		}
		 += .rng.len
	}
	return , nil
}

func updateRunEnds( int,  []arrow.ArrayData,  []*memory.Buffer,  *memory.Buffer) error {
	switch  {
	case 2:
		 := arrow.Int16Traits.CastFromBytes(.Bytes())
		return updateRunsInt16(, , )
	case 4:
		 := arrow.Int32Traits.CastFromBytes(.Bytes())
		return updateRunsInt32(, , )
	case 8:
		 := arrow.Int64Traits.CastFromBytes(.Bytes())
		return updateRunsInt64(, , )
	}
	return fmt.Errorf("%w: invalid dataType for RLE runEnds", arrow.ErrInvalid)
}

func updateRunsInt16( []arrow.ArrayData,  []*memory.Buffer,  []int16) error {
	// for now we will not attempt to optimize by checking if we
	// can fold the end and beginning of each array we're concatenating
	// into a single run
	 := 0
	for ,  := range  {
		if .Len() == 0 {
			continue
		}
		 := arrow.Int16Traits.CastFromBytes(.Bytes())
		if  == 0 {
			 += copy(, )
			continue
		}

		 := [-1]
		// we can check the last runEnd in the src and add it to the
		// last value that we're adjusting them all by to see if we
		// are going to overflow
		if int64()+int64(int([len()-1])-[].Offset()) > math.MaxInt16 {
			return fmt.Errorf("%w: overflow in run-length-encoded run ends concat", arrow.ErrInvalid)
		}

		// adjust all of the run ends by first normalizing them (e - data[i].offset)
		// then adding the previous value we ended on. Since the offset
		// is a logical length offset it should be accurate to just subtract
		// it from each value.
		for ,  := range  {
			[+] =  + int16(int()-[].Offset())
		}
		 += len()
	}
	return nil
}

func updateRunsInt32( []arrow.ArrayData,  []*memory.Buffer,  []int32) error {
	// for now we will not attempt to optimize by checking if we
	// can fold the end and beginning of each array we're concatenating
	// into a single run
	 := 0
	for ,  := range  {
		if .Len() == 0 {
			continue
		}
		 := arrow.Int32Traits.CastFromBytes(.Bytes())
		if  == 0 {
			 += copy(, )
			continue
		}

		 := [-1]
		// we can check the last runEnd in the src and add it to the
		// last value that we're adjusting them all by to see if we
		// are going to overflow
		if int64()+int64(int([len()-1])-[].Offset()) > math.MaxInt32 {
			return fmt.Errorf("%w: overflow in run-length-encoded run ends concat", arrow.ErrInvalid)
		}

		// adjust all of the run ends by first normalizing them (e - data[i].offset)
		// then adding the previous value we ended on. Since the offset
		// is a logical length offset it should be accurate to just subtract
		// it from each value.
		for ,  := range  {
			[+] =  + int32(int()-[].Offset())
		}
		 += len()
	}
	return nil
}

func updateRunsInt64( []arrow.ArrayData,  []*memory.Buffer,  []int64) error {
	// for now we will not attempt to optimize by checking if we
	// can fold the end and beginning of each array we're concatenating
	// into a single run
	 := 0
	for ,  := range  {
		if .Len() == 0 {
			continue
		}
		 := arrow.Int64Traits.CastFromBytes(.Bytes())
		if  == 0 {
			 += copy(, )
			continue
		}

		 := [-1]
		// we can check the last runEnd in the src and add it to the
		// last value that we're adjusting them all by to see if we
		// are going to overflow
		if uint64()+uint64(int([len()-1])-[].Offset()) > math.MaxInt64 {
			return fmt.Errorf("%w: overflow in run-length-encoded run ends concat", arrow.ErrInvalid)
		}

		// adjust all of the run ends by first normalizing them (e - data[i].offset)
		// then adding the previous value we ended on. Since the offset
		// is a logical length offset it should be accurate to just subtract
		// it from each value.
		for ,  := range  {
			[+] =  +  - int64([].Offset())
		}
		 += len()
	}
	return nil
}