// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18

package kernels

import (
	
	
	

	
	
	
	
	
	
)

func validateUtf8Fsb( *exec.ArraySpan) error {
	var (
		 = .Buffers[1].Buf
		     = int64(.Type.(*arrow.FixedSizeBinaryType).ByteWidth)
		    = .Buffers[0].Buf
	)

	return bitutils.VisitBitBlocksShort(, .Offset, .Len,
		func( int64) error {
			 += .Offset
			 :=  * 
			 := ( + 1) * 
			if !utf8.Valid([:]) {
				return fmt.Errorf("%w: invalid UTF8 bytes: %x", arrow.ErrInvalid, [:])
			}
			return nil
		}, func() error { return nil })
}

func validateUtf8[ int32 | int64]( *exec.ArraySpan) error {
	var (
		 = exec.GetSpanOffsets[](, 1)
		    = .Buffers[2].Buf
		       = .Buffers[0].Buf
	)

	return bitutils.VisitBitBlocksShort(, .Offset, .Len,
		func( int64) error {
			 := [[]:[+1]]
			if !utf8.Valid() {
				return fmt.Errorf("%w: invalid UTF8 bytes: %x", arrow.ErrInvalid, )
			}
			return nil
		}, func() error { return nil })
}

func ( *exec.KernelCtx,  *exec.ExecSpan,  *exec.ExecResult) error {
	 := .Values[0].Array.Type.(*arrow.FixedSizeBinaryType).ByteWidth
	 := .State.(CastState).ToType.(*arrow.FixedSizeBinaryType).ByteWidth

	if  !=  {
		return fmt.Errorf("%w: failed casting from %s to %s: widths must match",
			arrow.ErrInvalid, .Values[0].Array.Type, .Type)
	}

	return ZeroCopyCastExec(, , )
}

func [,  int32 | int64]( *exec.KernelCtx,  *exec.ExecSpan,  *exec.ExecResult) error {
	 := .State.(CastState)
	 := &.Values[0].Array

	if !.Type.(arrow.BinaryDataType).IsUtf8() && .Type.(arrow.BinaryDataType).IsUtf8() && !.AllowInvalidUtf8 {
		if  := validateUtf8[]();  != nil {
			return 
		}
	}

	// start with a zero-copy cast, then change the indices to the
	// expected size
	if  := ZeroCopyCastExec(, , );  != nil {
		return 
	}

	switch {
	case SizeOf[]() == SizeOf[]():
		// offsets are the same width, nothing more to do
		return nil
	case SizeOf[]() > SizeOf[]():
		// downcast from int64 -> int32
		 := exec.GetSpanOffsets[](, 1)

		// binary offsets are ascending, so it's enough to check
		// the last one for overflow
		if [.Len] > (MaxOf[]()) {
			return fmt.Errorf("%w: failed casting from %s to %s: input array too large",
				arrow.ErrInvalid, .Type, .Type)
		}

		 := .Allocate(.Type.(arrow.OffsetsDataType).OffsetTypeTraits().BytesRequired(int(.Len + .Offset + 1)))
		.Buffers[1].WrapBuffer()

		 := exec.GetSpanOffsets[](, 1)

		castNumericUnsafe(arrow.INT64, arrow.INT32,
			arrow.GetBytes(), arrow.GetBytes(), len())
		return nil
	default:
		// upcast from int32 -> int64
		 := .Allocate(.Type.(arrow.OffsetsDataType).OffsetTypeTraits().BytesRequired(int(.Len + .Offset + 1)))
		.Buffers[1].WrapBuffer()

		 := exec.GetSpanOffsets[](, 1)
		 := exec.GetSpanOffsets[](, 1)

		castNumericUnsafe(arrow.INT32, arrow.INT64,
			arrow.GetBytes(), arrow.GetBytes(), len())
		return nil
	}
}

func [ int32 | int64]( *exec.KernelCtx,  *exec.ExecSpan,  *exec.ExecResult) error {
	 := .State.(CastState)
	 := &.Values[0].Array

	if .Type.(arrow.BinaryDataType).IsUtf8() && !.AllowInvalidUtf8 {
		if  := validateUtf8Fsb();  != nil {
			return 
		}
	}

	// check for overflow
	 := int64(MaxOf[]())
	 := (.Type.(*arrow.FixedSizeBinaryType).ByteWidth)
	if (int64() * .Len) >  {
		return fmt.Errorf("%w: failed casting from %s to %s: input array too large",
			arrow.ErrInvalid, .Type, .Type)
	}

	.Len = .Len
	.Nulls = .Nulls
	if .Offset == .Offset {
		.Buffers[0].SetBuffer(.GetBuffer(0))
	} else {
		.Buffers[0].WrapBuffer(.AllocateBitmap(.Len))
		bitutil.CopyBitmap(.Buffers[0].Buf, int(.Offset), int(.Len), .Buffers[0].Buf, int(.Offset))
	}

	// this buffer is preallocated
	 := exec.GetSpanOffsets[](, 1)
	[0] = (.Offset) * 
	for  := 0;  < int(.Len); ++ {
		[+1] = [] + 
	}

	if len(.Buffers[1].Buf) > 0 {
		.Buffers[2] = .Buffers[1]
	}

	return nil
}

func addBinaryToBinaryCast[,  int32 | int64]( arrow.Type,  exec.OutputType) exec.ScalarKernel {
	return exec.NewScalarKernel([]exec.InputType{exec.NewIDInput()},
		, CastBinaryToBinary[, ], nil)
}

func addToBinaryKernels[ int32 | int64]( exec.OutputType,  []exec.ScalarKernel) []exec.ScalarKernel {
	return append(,
		addBinaryToBinaryCast[int32, ](arrow.STRING, ),
		addBinaryToBinaryCast[int32, ](arrow.BINARY, ),
		addBinaryToBinaryCast[int64, ](arrow.LARGE_STRING, ),
		addBinaryToBinaryCast[int64, ](arrow.LARGE_BINARY, ),
		exec.NewScalarKernel([]exec.InputType{exec.NewIDInput(arrow.FIXED_SIZE_BINARY)},
			, CastFsbToBinary[], nil),
	)
}

func () []exec.ScalarKernel {
	 := exec.NewComputedOutputType(resolveOutputFromOptions)
	 := GetCommonCastKernels(arrow.FIXED_SIZE_BINARY, )
	 := exec.NewScalarKernel([]exec.InputType{exec.NewIDInput(arrow.FIXED_SIZE_BINARY)},
		OutputFirstType, CastFsbToFsb, nil)
	.NullHandling = exec.NullComputedNoPrealloc
	return append(, )
}

func float16Formatter( float16.Num) string                 { return .String() }
func date32Formatter( arrow.Date32) string                 { return .FormattedString() }
func date64Formatter( arrow.Date64) string                 { return .FormattedString() }
func numericFormatterSigned[ arrow.IntType]( ) string    { return strconv.FormatInt(int64(), 10) }
func numericFormatterUnsigned[ arrow.UintType]( ) string { return strconv.FormatUint(uint64(), 10) }
func float32Formatter( float32) string                     { return strconv.FormatFloat(float64(), 'g', -1, 32) }
func float64Formatter( float64) string                     { return strconv.FormatFloat(, 'g', -1, 64) }

func boolToStringCastExec( *exec.KernelCtx,  *exec.ExecSpan,  *exec.ExecResult) error {
	var (
		 = &.Values[0].Array
		  = array.NewBuilder(exec.GetAllocator(.Ctx), .Type).(array.StringLikeBuilder)
	)
	defer .Release()

	bitutils.VisitBitBlocks(.Buffers[0].Buf, .Offset, .Len,
		func( int64) {
			.Append(strconv.FormatBool(bitutil.BitIsSet(.Buffers[1].Buf, int())))
		}, func() { .AppendNull() })

	 := .NewArray()
	.TakeOwnership(.Data())
	return nil
}

type timeIntrinsic interface {
	arrow.Time32 | arrow.Time64
	FormattedString(arrow.TimeUnit) string
}

func timeToStringCastExec[ timeIntrinsic]( *exec.KernelCtx,  *exec.ExecSpan,  *exec.ExecResult) error {
	var (
		     = &.Values[0].Array
		 = exec.GetSpanValues[](, 1)
		      = array.NewBuilder(exec.GetAllocator(.Ctx), .Type).(array.StringLikeBuilder)
		 = .Type.(arrow.TemporalWithUnit)
	)
	defer .Release()

	bitutils.VisitBitBlocks(.Buffers[0].Buf, .Offset, .Len,
		func( int64) {
			.Append([].FormattedString(.TimeUnit()))
		}, func() { .AppendNull() })

	 := .NewArray()
	.TakeOwnership(.Data())
	return nil
}

func numericToStringCastExec[ arrow.IntType | arrow.UintType | arrow.FloatType]( func() string) exec.ArrayKernelExec {
	return func( *exec.KernelCtx,  *exec.ExecSpan,  *exec.ExecResult) error {
		var (
			     = &.Values[0].Array
			 = exec.GetSpanValues[](, 1)
			      = array.NewBuilder(exec.GetAllocator(.Ctx), .Type).(array.StringLikeBuilder)
		)
		defer .Release()

		bitutils.VisitBitBlocks(.Buffers[0].Buf, .Offset, .Len,
			func( int64) {
				.Append(([]))
			}, func() { .AppendNull() })

		 := .NewArray()
		.TakeOwnership(.Data())
		return nil
	}
}

func castTimestampToString( *exec.KernelCtx,  *exec.ExecSpan,  *exec.ExecResult) error {
	var (
		     = &.Values[0].Array
		 = exec.GetSpanValues[arrow.Timestamp](, 1)
		 = .Type.(*arrow.TimestampType)
		      = array.NewBuilder(exec.GetAllocator(.Ctx), .Type).(array.StringLikeBuilder)
	)
	defer .Release()

	,  := .GetToTimeFunc()
	if  != nil {
		return 
	}

	// prealloc
	 := "2006-01-02 15:04:05"
	switch .Unit {
	case arrow.Millisecond:
		 += ".000"
	case arrow.Microsecond:
		 += ".000000"
	case arrow.Nanosecond:
		 += ".000000000"
	}

	switch .TimeZone {
	case "UTC":
		 += "Z"
	case "":
	default:
		 += "-0700"
	}

	 := len()
	.Reserve(int(.Len))
	.ReserveData(int(.Len-.Nulls) * )

	bitutils.VisitBitBlocks(.Buffers[0].Buf, .Offset, .Len,
		func( int64) {
			.Append(([]).Format())
		},
		func() { .AppendNull() })

	 := .NewArray()
	.TakeOwnership(.Data())
	return nil
}

func getNumericToStringCastExec( arrow.Type) exec.ArrayKernelExec {
	switch  {
	case arrow.INT8:
		return numericToStringCastExec(numericFormatterSigned[int8])
	case arrow.UINT8:
		return numericToStringCastExec(numericFormatterUnsigned[uint8])
	case arrow.INT16:
		return numericToStringCastExec(numericFormatterSigned[int16])
	case arrow.UINT16:
		return numericToStringCastExec(numericFormatterUnsigned[uint16])
	case arrow.INT32:
		return numericToStringCastExec(numericFormatterSigned[int32])
	case arrow.UINT32:
		return numericToStringCastExec(numericFormatterUnsigned[uint32])
	case arrow.INT64:
		return numericToStringCastExec(numericFormatterSigned[int64])
	case arrow.UINT64:
		return numericToStringCastExec(numericFormatterUnsigned[uint64])
	case arrow.FLOAT16:
		return numericToStringCastExec(float16Formatter)
	case arrow.FLOAT32:
		return numericToStringCastExec(float32Formatter)
	case arrow.FLOAT64:
		return numericToStringCastExec(float64Formatter)
	case arrow.BOOL:
		return boolToStringCastExec
	case arrow.DATE32:
		return numericToStringCastExec(date32Formatter)
	case arrow.DATE64:
		return numericToStringCastExec(date64Formatter)
	case arrow.TIME32:
		return timeToStringCastExec[arrow.Time32]
	case arrow.TIME64:
		return timeToStringCastExec[arrow.Time64]
	case arrow.TIMESTAMP:
		return castTimestampToString
	}
	panic("unimplemented cast: " + .String())
}

func addNumericAndTemporalToStringCasts( exec.OutputType,  []exec.ScalarKernel) []exec.ScalarKernel {
	 := exec.NewScalarKernel([]exec.InputType{exec.NewExactInput(arrow.FixedWidthTypes.Boolean)}, ,
		getNumericToStringCastExec(arrow.BOOL), nil)
	.NullHandling = exec.NullComputedNoPrealloc
	 = append(, )

	for ,  := range numericTypes {
		 = exec.NewScalarKernel([]exec.InputType{exec.NewExactInput()}, ,
			getNumericToStringCastExec(.ID()), nil)
		.NullHandling = exec.NullComputedNoPrealloc
		 = append(, )
	}

	for ,  := range []arrow.DataType{arrow.FixedWidthTypes.Date32, arrow.FixedWidthTypes.Date64} {
		 = exec.NewScalarKernel([]exec.InputType{exec.NewExactInput()}, ,
			getNumericToStringCastExec(.ID()), nil)
		.NullHandling = exec.NullComputedNoPrealloc
		 = append(, )
	}

	for ,  := range []arrow.Type{arrow.TIME32, arrow.TIME64, arrow.TIMESTAMP} {
		 = exec.NewScalarKernel([]exec.InputType{exec.NewIDInput()}, ,
			getNumericToStringCastExec(), nil)
		.NullHandling = exec.NullComputedNoPrealloc
		 = append(, )
	}

	return 
}

func ( arrow.DataType) []exec.ScalarKernel {
	if .ID() == arrow.FIXED_SIZE_BINARY {
		return nil
	}

	 := exec.NewOutputType()
	 := GetCommonCastKernels(.ID(), )

	switch .ID() {
	case arrow.BINARY:
		return addToBinaryKernels[int32](, )
	case arrow.LARGE_BINARY:
		return addToBinaryKernels[int64](, )
	case arrow.STRING:
		 = addToBinaryKernels[int32](, )
		return addNumericAndTemporalToStringCasts(, )
	case arrow.LARGE_STRING:
		 = addToBinaryKernels[int64](, )
		return addNumericAndTemporalToStringCasts(, )
	}
	return nil
}