// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18

package kernels

import (
	
	
	
	

	
	
	
	
	
	
	
	
)

type RunEndEncodeState struct {
	RunEndType arrow.DataType
}

func (RunEndEncodeState) () string {
	return "RunEndEncodeOptions"
}

type RunEndsType interface {
	int16 | int32 | int64
}

func readFixedWidthVal[ arrow.FixedWidthType](,  []byte,  int64,  *) bool {
	 := int64(unsafe.Sizeof(*))
	* = *(*)(unsafe.Pointer(&[*]))
	return bitutil.BitIsSet(, int())
}

func writeFixedWidthVal[ arrow.FixedWidthType]( *exec.ExecResult,  int64,  bool,  ) {
	if len(.Buffers[0].Buf) != 0 {
		bitutil.SetBitTo(.Buffers[0].Buf, int(), )
	}

	 := arrow.GetData[](.Buffers[1].Buf)
	[] = 
}

func readBoolVal(,  []byte,  int64,  *bool) bool {
	* = bitutil.BitIsSet(, int())
	return bitutil.BitIsSet(, int())
}

func writeBoolVal( *exec.ExecResult,  int64,  bool,  bool) {
	if len(.Buffers[0].Buf) != 0 {
		bitutil.SetBitTo(.Buffers[0].Buf, int(), )
	}
	bitutil.SetBitTo(.Buffers[1].Buf, int(), )
}

type runEndEncodeLoopFixedWidth[ RunEndsType,  arrow.FixedWidthType | bool] struct {
	inputLen, inputOffset int64
	inputValidity         []byte
	inputValues           []byte
	valueType             arrow.DataType

	readValue  func(inputValidity, inputValues []byte, offset int64, out *) bool
	writeValue func(*exec.ExecResult, int64, bool, )
}

func ( *runEndEncodeLoopFixedWidth[, ]) ( *exec.ExecResult) int64 {
	 := arrow.GetData[](.Children[0].Buffers[1].Buf)

	 := .inputOffset
	var  
	 := .readValue(.inputValidity, .inputValues, , &)
	++

	var  int64
	var  
	for  < .inputOffset+.inputLen {
		 := .readValue(.inputValidity, .inputValues, , &)
		if  !=  ||  !=  {
			// close the current run by writing it out
			.writeValue(&.Children[1], , , )
			 := ( - .inputOffset)
			[] = 
			++
			,  = , 
		}
		++
	}

	.writeValue(&.Children[1], , , )
	[] = (.inputLen)
	return  + 1
}

func ( *runEndEncodeLoopFixedWidth[, ]) () (,  int64) {
	 := .inputOffset
	var  
	 := .readValue(.inputValidity, .inputValues, , &)
	++

	if  {
		 = 1
	}
	 = 1

	var  
	for  < .inputOffset+.inputLen {
		 := .readValue(.inputValidity, .inputValues, , &)
		++
		// new run
		if  !=  ||  !=  {
			 = 
			 = 

			++
			if  {
				++
			}
		}
	}
	return
}

func ( *runEndEncodeLoopFixedWidth[, ]) ( *exec.KernelCtx,  int64,  *exec.ExecResult) {
	 := .Allocate(int() * int(SizeOf[]()))
	var  *memory.Buffer
	if len(.inputValidity) > 0 {
		 = .AllocateBitmap()
	}

	var  *memory.Buffer
	 := .valueType.Layout().Buffers[1]
	if .Kind == arrow.KindBitmap {
		 = .AllocateBitmap()
	} else {
		 = .Allocate(int() * .ByteWidth)
	}

	 := arrow.RunEndEncodedOf(arrow.GetDataType[](), .valueType)
	.Release()

	* = exec.ExecResult{
		Type:   ,
		Len:    .inputLen,
		Nulls:  0,
		Offset: 0,
		Children: []exec.ArraySpan{
			{
				Type: .RunEnds(),
				Len:  ,
			},
			{
				Type: .Encoded(),
				Len:  ,
			},
		},
	}

	.Children[0].Buffers[1].WrapBuffer()
	if  != nil {
		.Children[1].Buffers[0].WrapBuffer()
	}
	.Children[1].Buffers[1].WrapBuffer()
}

type runEndEncodeFSB[ RunEndsType] struct {
	inputLen, inputOffset      int64
	inputValidity, inputValues []byte
	valueType                  arrow.DataType
	width                      int
}

func ( *runEndEncodeFSB[]) ( int64) ([]byte, bool) {
	if len(.inputValidity) > 0 && bitutil.BitIsNotSet(.inputValidity, int()) {
		return nil, false
	}

	,  := *int64(.width), (+1)*int64(.width)
	return .inputValues[:], true
}

func ( *runEndEncodeFSB[]) () (,  int64) {
	 := .inputOffset
	,  := .readValue()
	++

	if  {
		++
	}
	 = 1

	for  < .inputOffset+.inputLen {
		,  := .readValue()
		++
		if  !=  || !bytes.Equal(, ) {
			,  = , 
			++
			if  {
				++
			}
		}
	}
	return
}

func ( *runEndEncodeFSB[]) ( *exec.KernelCtx,  int64,  *exec.ExecResult) {
	 := .Allocate(int() * int(SizeOf[]()))
	var  *memory.Buffer
	if len(.inputValidity) > 0 {
		 = .AllocateBitmap()
	}

	 := .Allocate(.width * int())
	 := arrow.RunEndEncodedOf(arrow.GetDataType[](), .valueType)
	.Release()

	* = exec.ExecResult{
		Type:   ,
		Len:    .inputLen,
		Nulls:  0,
		Offset: 0,
		Children: []exec.ArraySpan{
			{
				Type: .RunEnds(),
				Len:  ,
			},
			{
				Type: .Encoded(),
				Len:  ,
			},
		},
	}

	.Children[0].Buffers[1].WrapBuffer()
	if  != nil {
		.Children[1].Buffers[0].WrapBuffer()
	}
	.Children[1].Buffers[1].WrapBuffer()
}

func ( *runEndEncodeFSB[]) ( *exec.ExecResult) int64 {
	 := arrow.GetData[](.Children[0].Buffers[1].Buf)
	 := .Children[1].Buffers[1].Buf

	 := .inputOffset
	,  := .readValue()
	++

	var  int64
	 := .Children[1].Buffers[0].Buf
	 := func( bool) {}
	if len() > 0 {
		 = func( bool) {
			bitutil.SetBitTo(, int(), )
		}
	}

	 := func( bool,  []byte) {
		()
		 :=  * int64(.width)
		copy([:], )
	}

	for  < .inputOffset+.inputLen {
		,  := .readValue()

		if  !=  || !bytes.Equal(, ) {
			(, )
			 := ( - .inputOffset)
			[] = 
			++
			,  = , 
		}

		++
	}

	(, )
	[] = (.inputLen)
	return  + 1
}

type runEndEncodeLoopBinary[ RunEndsType,  int32 | int64] struct {
	inputLen, inputOffset      int64
	inputValidity, inputValues []byte
	offsetValues               []
	valueType                  arrow.DataType

	estimatedValuesLen int64
}

func ( *runEndEncodeLoopBinary[, ]) ( int64) ([]byte, bool) {
	if len(.inputValidity) > 0 && bitutil.BitIsNotSet(.inputValidity, int(+.inputOffset)) {
		return nil, false
	}

	,  := .offsetValues[], .offsetValues[+1]
	return .inputValues[:], true
}

func ( *runEndEncodeLoopBinary[, ]) () (,  int64) {
	.estimatedValuesLen = 0
	// re.offsetValues already accounts for the input.Offset so we don't
	// need to use it as the initial value for `offset` here.
	var  int64
	,  := .readValue()
	++

	if  {
		 = 1
		.estimatedValuesLen += int64(len())
	}
	 = 1

	for  < .inputLen {
		,  := .readValue()
		++
		// new run
		if  !=  || !bytes.Equal(, ) {
			if  {
				.estimatedValuesLen += int64(len())
			}

			 = 
			 = 

			++
			if  {
				++
			}
		}
	}
	return
}

func ( *runEndEncodeLoopBinary[, ]) ( *exec.KernelCtx,  int64,  *exec.ExecResult) {
	 := .Allocate(int() * int(SizeOf[]()))
	var  *memory.Buffer
	if len(.inputValidity) > 0 {
		 = .AllocateBitmap()
	}

	 := .Allocate(int(.estimatedValuesLen))
	 := .Allocate(int(+1) * int(SizeOf[]()))

	 := arrow.RunEndEncodedOf(arrow.GetDataType[](), .valueType)
	* = exec.ExecResult{
		Type:   ,
		Len:    .inputLen,
		Nulls:  0,
		Offset: 0,
		Children: []exec.ArraySpan{
			{
				Type: .RunEnds(),
				Len:  ,
			},
			{
				Type: .Encoded(),
				Len:  ,
			},
		},
	}

	.Children[0].Buffers[1].WrapBuffer()
	if  != nil {
		.Children[1].Buffers[0].WrapBuffer()
	}
	.Children[1].Buffers[1].WrapBuffer()
	.Children[1].Buffers[2].WrapBuffer()
}

func ( *runEndEncodeLoopBinary[, ]) ( *exec.ExecResult) int64 {
	 := arrow.GetData[](.Children[0].Buffers[1].Buf)
	 := exec.GetSpanOffsets[](&.Children[1], 1)
	 := .Children[1].Buffers[2].Buf

	// re.offsetValues already accounts for the input.offset so we don't
	// need to initialize readOffset to re.inputOffset
	var  int64
	,  := .readValue()
	++

	var ,  int64
	 := .Children[1].Buffers[0].Buf
	 := func( bool) {}
	if len() > 0 {
		 = func( bool) {
			bitutil.SetBitTo(, int(), )
		}
	}

	[0],  = 0, [1:]

	 := func( bool,  []byte) {
		()
		 += int64(copy([:], ))
		[] = ()
	}

	for  < .inputLen {
		,  := .readValue()

		if  !=  || !bytes.Equal(, ) {
			(, )
			 := ()
			[] = 
			++
			,  = , 
		}
		++
	}

	(, )
	[] = (.inputLen)
	return  + 1
}

func validateRunEndType[ RunEndsType]( int64) error {
	 := MaxOf[]()
	if  > int64() {
		return fmt.Errorf("%w: cannot run-end encode arrays with more elements than the run end type can hold: %d",
			arrow.ErrInvalid, )
	}
	return nil
}

func createEncoder[ RunEndsType,  arrow.FixedWidthType]( *exec.ArraySpan) *runEndEncodeLoopFixedWidth[, ] {
	return &runEndEncodeLoopFixedWidth[, ]{
		inputLen:      .Len,
		inputOffset:   .Offset,
		inputValidity: .Buffers[0].Buf,
		inputValues:   .Buffers[1].Buf,
		valueType:     .Type,
		readValue:     readFixedWidthVal[],
		writeValue:    writeFixedWidthVal[],
	}
}

func createVarBinaryEncoder[ RunEndsType,  int32 | int64]( *exec.ArraySpan) *runEndEncodeLoopBinary[, ] {
	return &runEndEncodeLoopBinary[, ]{
		inputLen:      .Len,
		inputOffset:   .Offset,
		inputValidity: .Buffers[0].Buf,
		inputValues:   .Buffers[2].Buf,
		// exec.GetSpanOffsets applies input.Offset to the resulting slice
		offsetValues: exec.GetSpanOffsets[](, 1),
		valueType:    .Type,
	}
}

func newEncoder[ RunEndsType]( *exec.ArraySpan) encoder {
	switch .Type.ID() {
	case arrow.BOOL:
		return &runEndEncodeLoopFixedWidth[, bool]{
			inputLen:      .Len,
			inputOffset:   .Offset,
			inputValidity: .Buffers[0].Buf,
			inputValues:   .Buffers[1].Buf,
			valueType:     .Type,
			readValue:     readBoolVal,
			writeValue:    writeBoolVal,
		}
	// for the other fixed size types, we only need to
	// handle the different physical representations.
	case arrow.INT8, arrow.UINT8:
		return createEncoder[, uint8]()
	case arrow.INT16, arrow.UINT16:
		return createEncoder[, uint16]()
	case arrow.INT32, arrow.UINT32, arrow.DATE32,
		arrow.TIME32, arrow.INTERVAL_MONTHS:
		return createEncoder[, uint32]()
	case arrow.INT64, arrow.UINT64, arrow.DATE64,
		arrow.TIME64, arrow.DURATION, arrow.TIMESTAMP:
		return createEncoder[, uint64]()
	case arrow.FLOAT16:
		return createEncoder[, float16.Num]()
	case arrow.FLOAT32:
		return createEncoder[, float32]()
	case arrow.FLOAT64:
		return createEncoder[, float64]()
	case arrow.DECIMAL128:
		return createEncoder[, decimal128.Num]()
	case arrow.DECIMAL256:
		return createEncoder[, decimal256.Num]()
	case arrow.INTERVAL_DAY_TIME:
		return createEncoder[, arrow.DayTimeInterval]()
	case arrow.INTERVAL_MONTH_DAY_NANO:
		return createEncoder[, arrow.MonthDayNanoInterval]()
	case arrow.BINARY, arrow.STRING:
		return createVarBinaryEncoder[, int32]()
	case arrow.LARGE_BINARY, arrow.LARGE_STRING:
		return createVarBinaryEncoder[, int64]()
	case arrow.FIXED_SIZE_BINARY:
		return &runEndEncodeFSB[]{
			inputLen:      .Len,
			inputOffset:   .Offset,
			inputValidity: .Buffers[0].Buf,
			inputValues:   .Buffers[1].Buf,
			valueType:     .Type,
			width:         .Type.(*arrow.FixedSizeBinaryType).ByteWidth,
		}
	}
	return nil
}

type encoder interface {
	CountNumberOfRuns() (numValid, numOutput int64)
	PreallocOutput(*exec.KernelCtx, int64, *exec.ExecResult)
	WriteEncodedRuns(*exec.ExecResult) int64
}

func runEndEncodeImpl[ RunEndsType]( *exec.KernelCtx,  *exec.ExecSpan,  *exec.ExecResult) error {
	// first pass: count the number of runs
	var (
		      = &.Values[0].Array
		      = .Len
		 int64
		  int64
		           encoder
	)

	if  == 0 {
		 := arrow.RunEndEncodedOf(arrow.GetDataType[](), .Type)
		* = exec.ExecResult{
			Type: ,
			Children: []exec.ArraySpan{
				{Type: .RunEnds()}, {Type: .Encoded()},
			},
		}
		return nil
	}

	if  := validateRunEndType[]();  != nil {
		return 
	}

	 = newEncoder[]()
	,  = .CountNumberOfRuns()
	.PreallocOutput(, , )

	.Children[1].Nulls =  - 

	 := .WriteEncodedRuns()
	debug.Assert( == , "mismatch number of written values")
	return nil
}

func runEndEncodeExec( *exec.KernelCtx,  *exec.ExecSpan,  *exec.ExecResult) error {
	 := .State.(RunEndEncodeState).RunEndType
	switch .ID() {
	case arrow.INT16:
		return runEndEncodeImpl[int16](, , )
	case arrow.INT32:
		return runEndEncodeImpl[int32](, , )
	case arrow.INT64:
		return runEndEncodeImpl[int64](, , )
	}

	return fmt.Errorf("%w: bad run end type %s", arrow.ErrInvalid, )
}

type decodeBool[ RunEndsType] struct {
	inputLen, inputOffset int64
	inputRunEnds          []

	inputPhysicalOffset int64
	inputValidity       []byte
	inputValues         []byte
	inputValueOffset    int64
}

func ( *decodeBool[]) ( *exec.KernelCtx,  *exec.ExecResult) {
	* = exec.ExecResult{
		Type: arrow.FixedWidthTypes.Boolean,
		Len:  .inputLen,
	}

	if len(.inputValidity) != 0 {
		.Buffers[0].WrapBuffer(.AllocateBitmap(.inputLen))
	}

	.Buffers[1].WrapBuffer(.AllocateBitmap(.inputLen))
}

func ( *decodeBool[]) ( *exec.ExecResult) int64 {
	var (
		         int64
		,  int64
		        = .Buffers[1].Buf
		          = (.inputOffset)
		         = len(.inputValidity) != 0 && len(.Buffers[0].Buf) != 0
	)

	for ,  := range .inputRunEnds[.inputPhysicalOffset:] {
		,  = int64(-), 
		// if this run is a null, clear the bits and update writeOffset
		if  {
			if bitutil.BitIsNotSet(.inputValidity, int(.inputValueOffset+.inputPhysicalOffset)+) {
				bitutil.SetBitsTo(.Buffers[0].Buf, , , false)
				 += 
				continue
			}

			// if the output has a validity bitmap, update it with 1s
			bitutil.SetBitsTo(.Buffers[0].Buf, , , true)
		}

		// get the value for this run + where to start writing
		 := bitutil.BitIsSet(.inputValues, int(.inputValueOffset+.inputPhysicalOffset)+)
		bitutil.SetBitsTo(, , , )
		 += 
		 += 
	}

	return 
}

type decodeFixedWidth[ RunEndsType] struct {
	inputLen, inputOffset int64
	inputRunEnds          []

	inputPhysicalOffset int64
	inputValidity       []byte
	inputValues         []byte
	inputValueOffset    int64

	valueType arrow.DataType
}

func ( *decodeFixedWidth[]) ( *exec.KernelCtx,  *exec.ExecResult) {
	* = exec.ExecResult{
		Type: .valueType,
		Len:  .inputLen,
	}

	if len(.inputValidity) != 0 {
		.Buffers[0].WrapBuffer(.AllocateBitmap(.inputLen))
	}

	.Buffers[1].WrapBuffer(.Allocate(int(.inputLen) * .valueType.(arrow.FixedWidthDataType).Bytes()))
}

func ( *decodeFixedWidth[]) ( *exec.ExecResult) int64 {
	var (
		         int64
		,  int64
		        = .Buffers[1].Buf
		               = .valueType.(arrow.FixedWidthDataType).Bytes()
		         = .inputValues[(.inputValueOffset+.inputPhysicalOffset)*int64():]
		          = (.inputOffset)
		         = len(.inputValidity) != 0 && len(.Buffers[0].Buf) != 0
	)

	for ,  := range .inputRunEnds[.inputPhysicalOffset:] {
		,  = int64(-), 
		// if this run is a null, clear the bits and update writeOffset
		if  {
			if bitutil.BitIsNotSet(.inputValidity, int(.inputValueOffset+.inputPhysicalOffset)+) {
				bitutil.SetBitsTo(.Buffers[0].Buf, , , false)
				 += 
				continue
			}

			// if the output has a validity bitmap, update it with 1s
			bitutil.SetBitsTo(.Buffers[0].Buf, , , true)
		}

		// get the value for this run + where to start writing
		var (
			       = [* : (+1)*]
			 =  * int64()
		)
		 += 
		 += 

		// get the slice of our output buffer we want to fill
		// just incrementally duplicate the bytes until we've filled
		// the slice with runLength copies of the value
		 := [ : *int64()]
		copy(, )
		for  := ;  < len();  *= 2 {
			copy([:], [:])
		}
	}

	return 
}

type decodeBinary[ RunEndsType,  int32 | int64] struct {
	inputLen, inputLogicalOffset int64
	inputRunEnds                 []

	inputPhysicalOffset int64
	inputValuesOffset   int64
	inputValidity       []byte
	inputValues         []byte
	inputOffsets        []

	valueType arrow.DataType
}

func ( *decodeBinary[, ]) ( *exec.KernelCtx,  *exec.ExecResult) {
	var (
		  int64
		 = (.inputLogicalOffset)
		  int
	)

	for ,  := range .inputRunEnds[.inputPhysicalOffset:] {
		,  = int64(-), 

		 := .inputOffsets[.inputPhysicalOffset+int64()]
		 := .inputOffsets[.inputPhysicalOffset+int64()+1]

		 += int(-) * int()
	}

	* = exec.ExecResult{
		Type: .valueType,
		Len:  .inputLen,
	}

	if len(.inputValidity) != 0 {
		.Buffers[0].WrapBuffer(.AllocateBitmap(.inputLen))
	}

	.Buffers[1].WrapBuffer(.Allocate(int(.inputLen+1) * int(SizeOf[]())))
	.Buffers[2].WrapBuffer(.Allocate())
}

func ( *decodeBinary[, ]) ( *exec.ExecResult) int64 {
	var (
		,  int64
		,            int64
		                 = exec.GetSpanOffsets[](, 1)
		                  = .Buffers[2].Buf
		                    = (.inputLogicalOffset)
		                   = len(.inputValidity) != 0 && len(.Buffers[0].Buf) != 0
	)

	for ,  := range .inputRunEnds[.inputPhysicalOffset:] {
		,  = int64(-), 

		// if this run is a null, clear the bits and update writeOffset
		if  && bitutil.BitIsNotSet(.inputValidity, int(.inputValuesOffset+.inputPhysicalOffset)+) {
			bitutil.SetBitsTo(.Buffers[0].Buf, , , false)
		} else {
			 += 
			if  {
				bitutil.SetBitsTo(.Buffers[0].Buf, , , true)
			}
		}

		// get the value for this run + where to start writing
		// de.inputOffsets already accounts for inputOffset so we don't
		// need to add it here, we can just use the physicaloffset and that's
		// sufficient to get the correct values.
		var (
			 = .inputOffsets[.inputPhysicalOffset+int64()]
			   = .inputOffsets[.inputPhysicalOffset+int64()+1]
			 = .inputValues[:]

			 =  + int64(len()*int())
		)

		// get the slice of our output buffer we want to fill
		// just incrementally duplicate the bytes until we've filled
		// the slice with runLength copies of the value
		 := [:]
		copy(, )
		for  := len();  < len();  *= 2 {
			copy([:], [:])
		}

		for  := int64(0);  < ; ++ {
			[+] = ()
			 += int64(len())
		}

		 += 
	}

	[] = ()
	return 
}

type decoder interface {
	PreallocOutput(*exec.KernelCtx, *exec.ExecResult)
	ExpandAllRuns(*exec.ExecResult) int64
}

func newDecoder[ RunEndsType]( *exec.ArraySpan) decoder {
	 := (.Offset)
	 := exec.GetSpanValues[](&.Children[0], 1)
	 := sort.Search(len(), func( int) bool { return [] >  })

	switch dt := .Children[1].Type.(type) {
	case *arrow.BooleanType:
		return &decodeBool[]{
			inputLen:            .Len,
			inputOffset:         .Offset,
			inputValidity:       .Children[1].Buffers[0].Buf,
			inputValues:         .Children[1].Buffers[1].Buf,
			inputValueOffset:    .Children[1].Offset,
			inputPhysicalOffset: int64(),
			inputRunEnds:        ,
		}
	case *arrow.BinaryType, *arrow.StringType:
		return &decodeBinary[, int32]{
			inputLen:            .Len,
			inputLogicalOffset:  .Offset,
			inputRunEnds:        ,
			inputPhysicalOffset: int64(),
			inputValuesOffset:   .Children[1].Offset,
			inputValidity:       .Children[1].Buffers[0].Buf,
			inputValues:         .Children[1].Buffers[2].Buf,
			inputOffsets:        exec.GetSpanOffsets[int32](&.Children[1], 1),
			valueType:           .Children[1].Type,
		}
	case *arrow.LargeBinaryType, *arrow.LargeStringType:
		return &decodeBinary[, int64]{
			inputLen:            .Len,
			inputLogicalOffset:  .Offset,
			inputRunEnds:        ,
			inputPhysicalOffset: int64(),
			inputValuesOffset:   .Children[1].Offset,
			inputValidity:       .Children[1].Buffers[0].Buf,
			inputValues:         .Children[1].Buffers[2].Buf,
			inputOffsets:        exec.GetSpanOffsets[int64](&.Children[1], 1),
			valueType:           .Children[1].Type,
		}
	case arrow.FixedWidthDataType:
		return &decodeFixedWidth[]{
			inputLen:            .Len,
			inputOffset:         .Offset,
			inputRunEnds:        ,
			inputPhysicalOffset: int64(),
			inputValidity:       .Children[1].Buffers[0].Buf,
			inputValues:         .Children[1].Buffers[1].Buf,
			inputValueOffset:    .Children[1].Offset,
			valueType:           ,
		}
	}

	return nil
}

func runEndDecodeImpl[ RunEndsType]( *exec.KernelCtx,  *exec.ExecSpan,  *exec.ExecResult) error {
	 := &.Values[0].Array

	if .Len == 0 {
		return nil
	}

	 := newDecoder[]()
	.PreallocOutput(, )
	.Nulls = .Len - .ExpandAllRuns()
	return nil
}

func runEndDecodeExec( *exec.KernelCtx,  *exec.ExecSpan,  *exec.ExecResult) error {
	 := .Values[0].Type().(*arrow.RunEndEncodedType)
	switch .RunEnds().ID() {
	case arrow.INT16:
		return runEndDecodeImpl[int16](, , )
	case arrow.INT32:
		return runEndDecodeImpl[int32](, , )
	case arrow.INT64:
		return runEndDecodeImpl[int64](, , )
	}

	return fmt.Errorf("%w: bad run end type %s", arrow.ErrInvalid, .RunEnds())
}

func runEndEncodeOutputTypeResolver( *exec.KernelCtx,  []arrow.DataType) (arrow.DataType, error) {
	 := .State.(RunEndEncodeState).RunEndType
	return arrow.RunEndEncodedOf(, [0]), nil
}

func runEndDecodeOutputTypeResolver( *exec.KernelCtx,  []arrow.DataType) (arrow.DataType, error) {
	 := [0].(*arrow.RunEndEncodedType)
	return .Encoded(), nil
}

func () (,  []exec.VectorKernel) {
	 := exec.VectorKernel{
		NullHandling:        exec.NullNoOutput,
		MemAlloc:            exec.MemNoPrealloc,
		CanExecuteChunkWise: true,
		ExecFn:              runEndEncodeExec,
		OutputChunked:       true,
	}

	 := exec.VectorKernel{
		NullHandling:        exec.NullNoOutput,
		MemAlloc:            exec.MemNoPrealloc,
		CanExecuteChunkWise: true,
		ExecFn:              runEndDecodeExec,
		OutputChunked:       true,
	}

	.Init = exec.OptionsInit[RunEndEncodeState]

	,  = make([]exec.VectorKernel, 0), make([]exec.VectorKernel, 0)
	 := func( arrow.Type) {
		.Signature = &exec.KernelSignature{
			InputTypes: []exec.InputType{exec.NewIDInput()},
			OutType:    exec.NewComputedOutputType(runEndEncodeOutputTypeResolver),
		}
		 = append(, )

		.Signature = &exec.KernelSignature{
			InputTypes: []exec.InputType{exec.NewMatchedInput(
				exec.RunEndEncoded(exec.Integer(), exec.SameTypeID()))},
			OutType: exec.NewComputedOutputType(runEndDecodeOutputTypeResolver),
		}
		 = append(, )
	}

	for ,  := range primitiveTypes {
		(.ID())
	}
	(arrow.BOOL)

	 := []arrow.Type{
		arrow.FLOAT16, arrow.DECIMAL128, arrow.DECIMAL256,
		arrow.TIME32, arrow.TIME64, arrow.TIMESTAMP,
		arrow.INTERVAL_DAY_TIME, arrow.INTERVAL_MONTHS,
		arrow.INTERVAL_MONTH_DAY_NANO,
		arrow.FIXED_SIZE_BINARY,
	}

	for ,  := range  {
		()
	}

	return
}