// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18

package exec

import (
	
	
	
	
	
	

	
	
	
	
)

// GetSpanValues returns a properly typed slice by reinterpreting
// the buffer at index i using unsafe.Slice. This will take into account
// the offset of the given ArraySpan.
//
// It returns nil when the selected buffer's Buf is empty. Otherwise the
// typed view is built over Offset+Len elements starting at the first byte
// of Buf and then re-sliced from Offset, so the result aliases the
// buffer's memory rather than copying it.
func [ arrow.FixedWidthType]( *ArraySpan,  int) [] {
	// Guard first: taking &Buf[0] below would panic on an empty buffer.
	if len(.Buffers[].Buf) == 0 {
		return nil
	}
	 := unsafe.Slice((*)(unsafe.Pointer(&.Buffers[].Buf[0])), .Offset+.Len)
	return [.Offset:]
}

// GetSpanOffsets is like GetSpanValues, except it is only for int32
// or int64 and adds the additional 1 expected value for an offset
// buffer (ie. len(output) == span.Len+1)
//
// The typed view covers Offset+Len+1 elements from the start of Buf and
// is then re-sliced from Offset; the result aliases the buffer's memory.
//
// NOTE(review): there is no empty-buffer check here, so &Buf[0] panics
// when the selected buffer's Buf has length zero — confirm callers never
// pass such a span, or add a guard.
func [ int32 | int64]( *ArraySpan,  int) [] {
	 := unsafe.Slice((*)(unsafe.Pointer(&.Buffers[].Buf[0])), .Offset+.Len+1)
	return [.Offset:]
}

// Generic two-argument helper for any cmp.Ordered type: it compares its
// arguments with `<` and returns one of them depending on the result.
//
// NOTE(review): the identifiers are missing from this view. The `<`
// comparison, together with a call to Min elsewhere in this file,
// suggests this is Min (returns the smaller argument) — confirm.
func [ cmp.Ordered](,  )  {
	if  <  {
		return 
	}
	return 
}

// Generic two-argument helper for any cmp.Ordered type: it compares its
// arguments with `>` and returns one of them depending on the result.
//
// NOTE(review): the identifiers are missing from this view; the `>`
// comparison suggests this returns the larger argument (a Max helper)
// — confirm.
func [ cmp.Ordered](,  )  {
	if  >  {
		return 
	}
	return 
}

// OptionsInit should be used in the case where a KernelState is simply
// represented with a specific type by value (instead of pointer).
// This will initialize the KernelState as a value-copied instance of
// the passed in function options argument to ensure separation
// and allow the kernel to manipulate the options if necessary without
// any negative consequences since it will have its own copy of the options.
//
// The Options field of the KernelInitArgs is type-asserted to a pointer
// to the type parameter. On success the pointed-to value is dereferenced
// and returned as the KernelState; otherwise a nil state is returned
// together with an error wrapping arrow.ErrInvalid.
func [ any]( *KernelCtx,  KernelInitArgs) (KernelState, error) {
	// Two-value assertion: a mismatched options type falls through to the
	// error return instead of panicking.
	if ,  := .Options.(*);  {
		return *, nil
	}

	return nil, fmt.Errorf("%w: attempted to initialize kernel state from invalid function options",
		arrow.ErrInvalid)
}

// arrayBuilder is a generic interface over arrow.NumericType | bool. It
// embeds array.Builder and adds a single-argument Append plus an
// AppendValues that takes a slice of the element type and a []bool.
// Builders returned by array.NewBuilder are asserted to this interface
// by the slice-to-array helpers in this file.
//
// NOTE(review): the []bool argument is presumably the per-value validity
// mask — confirm against the concrete builders.
type arrayBuilder[ arrow.NumericType | bool] interface {
	array.Builder
	Append()
	AppendValues([], []bool)
}

// Builds an arrow.Array from a slice of numeric or bool values using the
// supplied memory.Allocator.
//
// A builder is created with array.NewBuilder for the data type reported
// by arrow.GetDataType for the element type, all values are appended in
// one AppendValues call with nil as the []bool argument, and the result
// of NewArray is returned. The builder is released when the function
// returns.
//
// NOTE(review): the assertion to arrayBuilder is the single-value form,
// so it panics if the builder does not implement that interface.
// NOTE(review): passing nil for the []bool argument presumably means
// "all values valid" — confirm against the builder implementation.
func [ arrow.NumericType | bool]( memory.Allocator,  []) arrow.Array {
	 := array.NewBuilder(, arrow.GetDataType[]()).(arrayBuilder[])
	defer .Release()

	.AppendValues(, nil)
	return .NewArray()
}

// Builds an arrow.Array from a slice of numeric or bool values plus a
// caller-supplied []bool, using the supplied memory.Allocator.
//
// A builder is created with array.NewBuilder for the data type reported
// by arrow.GetDataType for the element type, the values and the []bool
// are forwarded to a single AppendValues call, and the result of NewArray
// is returned. The builder is released when the function returns.
//
// NOTE(review): the assertion to arrayBuilder is the single-value form,
// so it panics if the builder does not implement that interface.
// NOTE(review): the []bool is presumably a validity mask with one entry
// per value — confirm against the builder implementation.
func [ arrow.NumericType | bool]( memory.Allocator,  [],  []bool) arrow.Array {
	 := array.NewBuilder(, arrow.GetDataType[]()).(arrayBuilder[])
	defer .Release()

	.AppendValues(, )
	return .NewArray()
}

// Takes several vectors of arrays ([][]arrow.Array) and, per the comments
// in the body, scans them all at once, rechunking along the way so that
// each output vector is cut at the same positions.
//
// Behaviour visible in the code:
//   - when there is at most one vector, the function returns immediately;
//   - the total length is the sum of Len() over the arrays of the first
//     vector only; when that total is zero the function returns early;
//   - otherwise each pass picks the smallest remaining length across all
//     vectors (via Min) and appends one piece of that length per vector.
//     A piece that covers an entire source array is Retain()ed and reused;
//     any other piece is created with array.NewSlice.
//
// NOTE(review): only the first vector's total length is computed, so this
// presumably assumes every vector has the same total length — confirm.
// NOTE(review): identifiers are missing from this view, so it is not
// visible here whether the two assignments at the end of the first inner
// loop write back into the caller's input slice — confirm before relying
// on the input being left untouched.
func ( [][]arrow.Array) [][]arrow.Array {
	if len() <= 1 {
		return 
	}

	var  int
	for ,  := range [0] {
		 += .Len()
	}

	if  == 0 {
		return 
	}

	 := make([][]arrow.Array, len())
	 := make([]int64, len())
	// scan all array vectors at once, rechunking along the way
	var  int64
	for  < int64() {
		// first compute max possible length for next chunk
		var  int64 = math.MaxInt64
		for ,  := range  {
			 := []
			// skip any done arrays including 0-length
			for  == int64([0].Len()) {
				 = [1:]
				 = 0
			}
			 := [0]
			 = Min(, int64(.Len())-)

			[] = 
			[] = 
		}

		// now slice all the arrays along this chunk size
		for ,  := range  {
			 := []
			 := [0]
			if  == 0 && int64(.Len()) ==  {
				// slice spans entire array
				.Retain()
				[] = append([], )
			} else {
				[] = append([], array.NewSlice(, int64(), int64(+)))
			}
			[] += 
		}

		 += int64()
	}
	return 
}

// ChunkResolver maps an int64 index to a pair of int64 results using a
// table of offsets. Judging by the type name these are presumably a chunk
// number and a position inside that chunk — confirm.
type ChunkResolver struct {
	// offsets is the lookup table searched with slices.BinarySearch by the
	// lookup method, so it is expected to be in ascending order.
	offsets []int64
	// cached holds the first result of the most recent non-cached lookup;
	// it is read and written atomically.
	cached  atomic.Int64
}

// Constructs a ChunkResolver from a slice of arrays.
//
// An offsets table with len(input)+1 entries is allocated. While ranging
// over the arrays a running total of Len() is accumulated, one table
// entry is written per array, and the final entry (index len(input)) is
// written after the loop.
//
// NOTE(review): identifiers are missing from this view. The
// save-then-accumulate shape of the loop suggests entry i is the starting
// offset of array i and the last entry is the total length — confirm.
func ( []arrow.Array) *ChunkResolver {
	 := make([]int64, len()+1)
	var  int64
	for ,  := range  {
		 := 
		 += int64(.Len())
		[] = 
	}
	[len()] = 
	return &ChunkResolver{offsets: }
}

// Lookup method of ChunkResolver: takes an int64 and returns two int64
// results computed from the offsets table.
//
// Steps visible in the code:
//   - with at most one entry in offsets, it returns 0 as the first result
//     without consulting the table;
//   - otherwise it first tests whether the input lies in the half-open
//     range delimited by the cached entry and the entry after it, and if
//     so answers from the cache;
//   - on a cache miss it runs slices.BinarySearch over offsets, steps back
//     one position when there is no exact match, returns that position
//     and the input minus the offset at that position, and stores the
//     position in the cache.
//
// NOTE(review): the input is not range-checked. An input at or beyond the
// last offset appears to leave the last table position in the cache, in
// which case the next call's read of the entry after the cached one would
// be out of range — confirm callers only pass in-range indexes.
// NOTE(review): if offsets contains repeated values (e.g. from
// zero-length arrays), an exact match may land on the earliest of the
// equal entries — confirm that is the intended result.
func ( *ChunkResolver) ( int64) (,  int64) {
	// some algorithms consecutively access indexes that are a
	// relatively small distance from each other, falling into
	// the same chunk.
	// This is trivial when merging (assuming each side of the
	// merge uses its own resolver), but also in the inner
	// recursive invocations of partitioning.
	if len(.offsets) <= 1 {
		return 0, 
	}

	 := .cached.Load()
	 :=  >= .offsets[] &&  < .offsets[+1]
	if  {
		return ,  - .offsets[]
	}

	,  := slices.BinarySearch(.offsets, )
	if ! {
		--
	}

	,  = int64(), -.offsets[]
	.cached.Store()
	return
}

// arrayTypes is the type constraint used by ArrayIter: the union of
// arrow.FixedWidthType, arrow.TemporalType, bool, string and []byte.
type arrayTypes interface {
	arrow.FixedWidthType | arrow.TemporalType | bool | string | []byte
}

// ArrayIter is a generic single-method iterator over values of a type
// satisfying arrayTypes.
//
// Next returns one value per call. Its signature has no bool or error
// result, so the interface itself gives no way to signal exhaustion.
type ArrayIter[ arrayTypes] interface {
	Next() 
}

// BoolIter yields bool values read from a bitmap through a
// bitutil.BitmapReader.
type BoolIter struct {
	// Rdr is the reader that is queried and advanced on each step.
	Rdr *bitutil.BitmapReader
}

// Creates an ArrayIter[bool] backed by a BoolIter for the given ArraySpan.
// The reader is built with bitutil.NewBitmapReader over Buffers[1].Buf,
// passing the span's Offset and Len (converted to int).
func ( *ArraySpan) ArrayIter[bool] {
	return &BoolIter{
		Rdr: bitutil.NewBitmapReader(.Buffers[1].Buf, int(.Offset), int(.Len)),
	}
}

// Returns the result of Rdr.Set() at the reader's current position and
// then advances the reader with Rdr.Next(). No end-of-data check is made
// here.
func ( *BoolIter) () ( bool) {
	 = .Rdr.Set()
	.Rdr.Next()
	return
}

// PrimitiveIter yields fixed-width values one at a time from a typed
// slice.
type PrimitiveIter[ arrow.FixedWidthType] struct {
	// Values holds the elements not yet returned; it shrinks from the
	// front as the iterator advances.
	Values []
}

// Creates an ArrayIter backed by a PrimitiveIter whose Values are
// obtained from GetSpanValues on buffer index 1 of the given ArraySpan.
func [ arrow.FixedWidthType]( *ArraySpan) ArrayIter[] {
	return &PrimitiveIter[]{Values: GetSpanValues[](, 1)}
}

// Returns the first element of Values and drops it from the front of the
// slice. There is no emptiness check, so calling this when Values is
// empty panics with an index-out-of-range error.
func ( *PrimitiveIter[]) () ( ) {
	 = .Values[0]
	.Values = .Values[1:]
	return
}

// VarBinaryIter yields []byte values cut out of Data using consecutive
// entries of Offsets (int32 or int64).
type VarBinaryIter[ int32 | int64] struct {
	// Offsets delimits the values: a value spans from the entry at the
	// current position to the entry after it.
	Offsets []
	// Data is the byte buffer the returned slices point into.
	Data    []byte
	// Pos is the index into Offsets of the next value to return.
	Pos     int64
}

// Creates an ArrayIter[[]byte] backed by a VarBinaryIter for the given
// ArraySpan. Offsets come from GetSpanOffsets on buffer index 1 and Data
// is Buffers[2].Buf; Pos is left at its zero value.
func [ int32 | int64]( *ArraySpan) ArrayIter[[]byte] {
	return &VarBinaryIter[]{
		Offsets: GetSpanOffsets[](, 1),
		Data:    .Buffers[2].Buf,
	}
}

// Returns the bytes of Data between the offset at the current position
// and the offset at the next position, then leaves Pos advanced by one.
// The result is a sub-slice of Data, not a copy. No bounds check is made
// against the length of Offsets.
func ( *VarBinaryIter[]) () []byte {
	 := .Pos
	.Pos++
	return .Data[.Offsets[]:.Offsets[.Pos]]
}

// FSBIter yields consecutive fixed-size []byte values of Width bytes each
// from Data.
type FSBIter struct {
	// Data is the byte buffer the returned slices point into.
	Data  []byte
	// Width is the size in bytes of each value.
	Width int
	// Pos is the index of the next value to return.
	Pos   int64
}

// Creates an ArrayIter[[]byte] backed by an FSBIter for the given
// ArraySpan. Data is Buffers[1].Buf and Width is the Bytes() of the
// span's Type asserted to arrow.FixedWidthDataType; Pos is left at zero.
//
// NOTE(review): the type assertion is the single-value form, so it panics
// if Type is not an arrow.FixedWidthDataType.
// NOTE(review): the span's Offset is not referenced here, and iteration
// starts at byte 0 of the buffer — confirm whether spans with a non-zero
// Offset can reach this constructor.
func ( *ArraySpan) ArrayIter[[]byte] {
	return &FSBIter{
		Data:  .Buffers[1].Buf,
		Width: .Type.(arrow.FixedWidthDataType).Bytes(),
	}
}

// Returns the Width-byte window of Data that starts at Width*Pos and then
// leaves Pos advanced by one. The result is a sub-slice of Data, not a
// copy. No bounds check is made against len(Data).
func ( *FSBIter) () []byte {
	 := .Width * int(.Pos)
	.Pos++
	return .Data[ : +.Width]
}