// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18

package exec

import (
	
	

	
	
	
	
	
)

// BufferSpan is a lightweight Buffer holder for ArraySpans that does not
// take ownership of the underlying memory.Buffer at all, and which can
// also be used to reference raw byte slices instead.
type BufferSpan struct {
	// Buf should be the byte slice representing this buffer. If this is
	// nil then this BufferSpan should be considered empty.
	Buf []byte
	// Owner should point to an underlying parent memory.Buffer if this
	// memory is owned by a different, existing, buffer. Retain is not
	// called on this buffer, so it must not be released as long as
	// this BufferSpan refers to it.
	Owner *memory.Buffer
	// SelfAlloc tracks whether or not this BufferSpan is the only owner
	// of the owning memory.Buffer. This happens when preallocating
	// memory or if a kernel allocates its own buffer for a result.
	// In these cases, we have to know so we can properly maintain the
	// refcount if this is later turned into an ArrayData object.
	SelfAlloc bool
}

// SetBuffer sets the given buffer into this BufferSpan and marks
// SelfAlloc as false. This should be called when setting a buffer
// that is externally owned/created.
//
// Buf is set to the buffer's Bytes() and Owner to the buffer itself.
// The buffer is not retained here.
func ( *BufferSpan) ( *memory.Buffer) {
	.Buf = .Bytes()
	.Owner = 
	.SelfAlloc = false
}

// WrapBuffer wraps this BufferSpan around a buffer and marks
// SelfAlloc as true. This should be called when setting a buffer
// that was allocated as part of an execution rather than just
// re-using an existing buffer from an input array.
//
// Buf is set to the buffer's Bytes() and Owner to the buffer itself.
// The buffer is not retained here; the only difference from SetBuffer
// is the SelfAlloc flag.
func ( *BufferSpan) ( *memory.Buffer) {
	.Buf = .Bytes()
	.Owner = 
	.SelfAlloc = true
}

// ArraySpan is a light-weight, non-owning version of arrow.ArrayData
// for more efficient handling with computation and engines. We use
// explicit Go arrays to define the buffers and some scratch space
// for easily populating and shifting around pointers to memory without
// having to worry about and deal with retain/release during calculations.
type ArraySpan struct {
	// Type is the data type of the values described by this span.
	Type    arrow.DataType
	// Len is the number of elements in the span.
	Len     int64
	// Nulls is the null count. It may hold array.UnknownNullCount until
	// it is computed from the bitmap in Buffers[0]. Several methods read
	// and write it through sync/atomic.
	Nulls   int64
	// Offset is the element offset into the buffers at which this span
	// begins.
	Offset  int64
	// Buffers holds up to three buffers. Index 0 is used as the validity
	// (null) bitmap; the meaning of the others depends on Type.
	Buffers [3]BufferSpan

	// Scratch is a holding spot for things such as
	// offsets or union type codes when converting from scalars
	Scratch [2]uint64

	// Children holds the child spans of nested types. For dictionary
	// types the dictionary is stored as the first (and only) child.
	Children []ArraySpan
}

// Release should be called on a preallocated span if an error is
// encountered, to ensure it releases any self-allocated buffers. It will
// not call Release on buffers it doesn't own (SelfAlloc != true).
//
// Children are processed first, then this span's own buffers.
func ( *ArraySpan) () {
	// each child is visited before this span's buffers
	// (presumably a recursive call to this same method)
	for ,  := range .Children {
		.()
	}

	// only buffers flagged SelfAlloc have their Owner released
	for ,  := range .Buffers {
		if .SelfAlloc {
			.Owner.Release()
		}
	}
}

// Reports whether this span may contain null values: the (atomically
// loaded) null count is non-zero and a validity bitmap is present in
// Buffers[0]. Any non-zero count satisfies the first condition, so a count
// that has not been computed yet still reports true when a bitmap exists.
func ( *ArraySpan) () bool {
	return atomic.LoadInt64(&.Nulls) != 0 && .Buffers[0].Buf != nil
}

// UpdateNullCount will count the bits in the null bitmap and update the
// number of nulls if the current null count is unknown, otherwise it just
// returns the value of a.Nulls
//
// The null count is read and written with atomic loads/stores.
func ( *ArraySpan) () int64 {
	 := atomic.LoadInt64(&.Nulls)
	if  != array.UnknownNullCount {
		// already known, nothing to compute
		return 
	}

	// nulls = length - number of set (valid) bits in the validity bitmap
	// over the range [Offset, Offset+Len)
	 := .Len - int64(bitutil.CountSetBits(.Buffers[0].Buf, int(.Offset), int(.Len)))
	atomic.StoreInt64(&.Nulls, )
	return 
}

// Dictionary returns a pointer to the array span for the dictionary which
// we will always place as the first (and only) child if it exists.
//
// No bounds check is performed: Children[0] is indexed directly, so this
// panics if Children is empty.
func ( *ArraySpan) () *ArraySpan { return &.Children[0] }

// NumBuffers returns the number of expected buffers for this span's Type,
// as computed by getNumBuffers.
func ( *ArraySpan) () int { return getNumBuffers(.Type) }

// MakeData generates an arrow.ArrayData object for this ArraySpan,
// properly updating the buffer ref count if necessary.
//
// Buffers are obtained through GetBuffer and truncated to NumBuffers()
// entries before being handed to array.NewData. Children (or the
// dictionary, for dictionary types) are converted and attached as well.
func ( *ArraySpan) () arrow.ArrayData {
	var  [3]*memory.Buffer
	for  := range  {
		 := .GetBuffer()
		[] = 
		if  != nil && .Buffers[].SelfAlloc {
			// Buffers that merely point at another existing buffer
			// (SelfAlloc == false) never had their refcount bumped by us,
			// so they are not released here; the call to array.NewData
			// is left to update their ref counts.
			// This branch handles the other case: the buffer was
			// allocated during calculation (such as during prealloc or by
			// a kernel itself), so we release it after we create the
			// ArrayData so that it maintains the correct refcount of 1,
			// giving the resulting ArrayData object ownership of this
			// buffer. The defer runs when this function returns.
			defer .Release()
		}
	}

	var (
		    = int(atomic.LoadInt64(&.Nulls))
		   = int(.Len)
		      = int(.Offset)
		       = .Type
		 []arrow.ArrayData
	)

	// the null count is overridden for NULL-typed spans, and forced to
	// zero when there is no validity bitmap at all
	if .Type.ID() == arrow.NULL {
		 = 
	} else if len(.Buffers[0].Buf) == 0 {
		 = 0
	}

	// we use a.Type for the NewData call at the end, so we can
	// handle extension types by using dt to point to the storage type
	// and let the proper extension type get set into the ArrayData
	// object we return.
	if .ID() == arrow.EXTENSION {
		 = .(arrow.ExtensionType).StorageType()
	}

	if .ID() == arrow.DICTIONARY {
		// dictionary arrays carry no children; instead the dictionary
		// span (first child) is converted and attached to the result.
		// The local reference to the dictionary data is released on return.
		 := array.NewData(.Type, , [:.NumBuffers()], nil, , )
		 := .Dictionary().()
		defer .Release()
		.SetDictionary()
		return 
	} else if .ID() == arrow.DENSE_UNION || .ID() == arrow.SPARSE_UNION {
		// for unions the first buffer slot is cleared and the null count
		// is reported as zero
		[0] = nil
		 = 0
	}

	if len(.Children) > 0 {
		 = make([]arrow.ArrayData, len(.Children))
		for ,  := range .Children {
			// each child's data is released on return, after array.NewData
			// below has been handed the slice
			 := .()
			defer .Release()
			[] = 
		}
	}
	return array.NewData(.Type, , [:.NumBuffers()], , , )
}

// MakeArray is a convenience function for calling array.MakeFromData(a.MakeData())
//
// The intermediate ArrayData is released before returning, so the returned
// arrow.Array is the only thing left referencing it from this call.
func ( *ArraySpan) () arrow.Array {
	 := .MakeData()
	defer .Release()
	return array.MakeFromData()
}

// SetSlice updates the offset and length of this ArraySpan to refer to
// a specific slice of the underlying buffers.
//
// The first argument becomes the new Offset and the second the new Len.
// The null count is adjusted as follows for non-NULL types:
//   - a null count of 0 stays 0
//   - if every element was null (Nulls == Len) the new count is derived
//     directly from the slice
//   - otherwise the count is reset to array.UnknownNullCount so it gets
//     recomputed lazily
func ( *ArraySpan) (,  int64) {
	if  == .Offset &&  == .Len {
		// don't modify the nulls if the slice is the entire span
		return
	}

	if .Type.ID() != arrow.NULL {
		if .Nulls != 0 {
			if .Nulls == .Len {
				// all-null span: any slice of it is all-null too
				// NOTE(review): presumably assigns the new length — confirm
				.Nulls = 
			} else {
				.Nulls = array.UnknownNullCount
			}
		}
	} else {
		// NULL-typed spans take their null count directly from the slice
		// NOTE(review): presumably assigns the new length — confirm
		.Nulls = 
	}

	.Offset, .Len = , 
}

// GetBuffer returns the buffer for the requested index. If this buffer
// is owned by another array/arrayspan the owning buffer is returned,
// otherwise if this slice has no owning buffer, we call NewBufferBytes
// to wrap it as a memory.Buffer. Can also return nil if there is no
// buffer in this index.
//
// Retain is not called on the returned Owner buffer here. The index is
// used directly on the 3-element Buffers array, so it must be in [0, 3).
func ( *ArraySpan) ( int) *memory.Buffer {
	 := .Buffers[]
	switch {
	case .Owner != nil:
		return .Owner
	case .Buf != nil:
		// raw bytes without an owner: wrap them in a new memory.Buffer
		return memory.NewBufferBytes(.Buf)
	}
	return nil
}

// resizeChildren is a convenience function to resize the children slice
// if necessary, or just re-slice it without re-allocating if there's
// enough capacity already.
//
// When the existing capacity is reused, the elements keep whatever
// contents they previously had; when a new slice is allocated, all
// elements are zero-valued and the old children are dropped.
func ( *ArraySpan) ( int) {
	if cap(.Children) >=  {
		.Children = .Children[:]
	} else {
		.Children = make([]ArraySpan, )
	}
}

// FillFromScalar populates this ArraySpan as if it were a 1 length array
// with the single value equal to the passed in Scalar.
//
// Type is taken from the scalar, Len is set to 1 and Nulls to 0 or 1
// depending on the scalar's validity. The buffers are then filled according
// to the scalar's type ID; offsets and union type codes are stored in the
// span's own Scratch space rather than in newly allocated memory.
// None of the buffers set here are marked SelfAlloc.
func ( *ArraySpan) ( scalar.Scalar) {
	// single-byte validity bitmaps: bit 0 set (valid) or cleared (null)
	var (
		  byte = 0x01
		 byte = 0x00
	)

	.Type = .DataType()
	.Len = 1
	 := .Type.ID()
	if .IsValid() {
		.Nulls = 0
	} else {
		.Nulls = 1
	}

	// unions and the NULL type carry no validity bitmap; everything else
	// gets a one-byte bitmap in Buffers[0]
	if !arrow.IsUnion() &&  != arrow.NULL {
		if .IsValid() {
			.Buffers[0].Buf = []byte{}
		} else {
			.Buffers[0].Buf = []byte{}
		}
		.Buffers[0].Owner = nil
		.Buffers[0].SelfAlloc = false
	}

	switch {
	case  == arrow.BOOL:
		// a boolean value is a single bit, stored in a one-byte data buffer
		if .(*scalar.Boolean).Value {
			.Buffers[1].Buf = []byte{}
		} else {
			.Buffers[1].Buf = []byte{}
		}
		.Buffers[1].Owner = nil
		.Buffers[1].SelfAlloc = false
	case arrow.IsPrimitive() || arrow.IsDecimal():
		// the data buffer references the scalar's own bytes
		 := .(scalar.PrimitiveScalar)
		.Buffers[1].Buf = .Data()
		.Buffers[1].Owner = nil
		.Buffers[1].SelfAlloc = false
	case  == arrow.DICTIONARY:
		// the data buffer references the scalar's own bytes, and the
		// dictionary array becomes the single child span
		 := .(scalar.PrimitiveScalar)
		.Buffers[1].Buf = .Data()
		.Buffers[1].Owner = nil
		.Buffers[1].SelfAlloc = false
		.resizeChildren(1)
		.Children[0].SetMembers(.(*scalar.Dictionary).Value.Dict.Data())
	case arrow.IsBaseBinary():
		// the offsets buffer is backed by Scratch; the data buffer (if the
		// scalar is valid) references the scalar's bytes
		 := .(scalar.BinaryScalar)
		.Buffers[1].Buf = arrow.Uint64Traits.CastToBytes(.Scratch[:])
		.Buffers[1].Owner = nil
		.Buffers[1].SelfAlloc = false

		var  []byte
		if .IsValid() {
			 = .Data()
			.Buffers[2].Owner = .Buffer()
			.Buffers[2].SelfAlloc = false
		}
		// NOTE(review): when the scalar is not valid, Buffers[2].Owner and
		// Buffers[2].SelfAlloc keep whatever values they had before this
		// call, while Buffers[2].Buf is set to nil below — confirm a stale
		// Owner cannot be observed through GetBuffer(2).
		if arrow.IsBinaryLike() {
			// 32-bit offsets written into Scratch
			setOffsetsForScalar(,
				unsafe.Slice((*int32)(unsafe.Pointer(&.Scratch[0])), 2),
				int64(len()), 1)
		} else {
			// large_binary_like
			// 64-bit offsets written into Scratch
			setOffsetsForScalar(,
				unsafe.Slice((*int64)(unsafe.Pointer(&.Scratch[0])), 2),
				int64(len()), 1)
		}
		.Buffers[2].Buf = 
	case  == arrow.FIXED_SIZE_BINARY:
		 := .(scalar.BinaryScalar)
		if !.IsValid() {
			// a null value still needs ByteWidth bytes of (zeroed) data
			.Buffers[1].Buf = make([]byte, .DataType().(*arrow.FixedSizeBinaryType).ByteWidth)
			.Buffers[1].Owner = nil
			.Buffers[1].SelfAlloc = false
			break
		}
		.Buffers[1].Buf = .Data()
		.Buffers[1].Owner = .Buffer()
		.Buffers[1].SelfAlloc = false
	case arrow.IsListLike():
		 := .(scalar.ListScalar)
		 := 0
		.resizeChildren(1)

		if .GetList() != nil {
			.Children[0].SetMembers(.GetList().Data())
			 = .GetList().Len()
		} else {
			// even when the value is null, we must populate
			// child data to yield a valid array. ugh
			FillZeroLength(.DataType().(arrow.NestedType).Fields()[0].Type, &.Children[0])
		}

		switch  {
		case arrow.LIST, arrow.MAP:
			// 32-bit offsets written into Scratch
			setOffsetsForScalar(,
				unsafe.Slice((*int32)(unsafe.Pointer(&.Scratch[0])), 2),
				int64(), 1)
		case arrow.LARGE_LIST:
			// 64-bit offsets written into Scratch
			setOffsetsForScalar(,
				unsafe.Slice((*int64)(unsafe.Pointer(&.Scratch[0])), 2),
				int64(), 1)
		default:
			// fixed size list has no second buffer
			.Buffers[1].Buf, .Buffers[1].Owner = nil, nil
			.Buffers[1].SelfAlloc = false
		}
	case  == arrow.STRUCT:
		// no data buffer; one child span per struct field value
		 := .(*scalar.Struct)
		.Buffers[1].Buf = nil
		.Buffers[1].Owner = nil
		.Buffers[1].SelfAlloc = false
		.resizeChildren(len(.Value))
		for ,  := range .Value {
			.Children[].()
		}
	case arrow.IsUnion():
		// first buffer is kept null since unions have no validity vector
		.Buffers[0].Buf, .Buffers[0].Owner = nil, nil
		.Buffers[0].SelfAlloc = false

		// the type-code buffer is the first byte of Scratch
		.Buffers[1].Buf = arrow.Uint64Traits.CastToBytes(.Scratch[:])[:1]
		.Buffers[1].Owner = nil
		.Buffers[1].SelfAlloc = false
		 := unsafe.Slice((*arrow.UnionTypeCode)(unsafe.Pointer(&.Buffers[1].Buf[0])), 1)

		.resizeChildren(len(.Type.(arrow.UnionType).Fields()))
		switch sc := .(type) {
		case *scalar.DenseUnion:
			[0] = .TypeCode
			// has offset, start 4 bytes in so it's aligned to the 32-bit boundaries
			 := unsafe.Slice((*int32)(unsafe.Add(unsafe.Pointer(&.Scratch[0]), arrow.Int32SizeBytes)), 2)
			setOffsetsForScalar(, , 1, 2)
			// we can't "see" the other arrays in the union, but we put the "active"
			// union array in the right place and fill zero-length arrays for
			// the others.
			 := .Type.(arrow.UnionType).ChildIDs()
			for ,  := range .Type.(arrow.UnionType).Fields() {
				if  == [.TypeCode] {
					.Children[].(.Value)
				} else {
					FillZeroLength(.Type, &.Children[])
				}
			}
		case *scalar.SparseUnion:
			[0] = .TypeCode
			// sparse union scalars have a full complement of child values
			// even though only one of them is relevant, so we just fill them
			// in here
			for ,  := range .Value {
				.Children[].()
			}
		}
	case  == arrow.EXTENSION:
		// pass through storage
		 := .(*scalar.Extension)
		.(.Value)
		// restore the extension type
		.Type = .DataType()
	case  == arrow.NULL:
		// the NULL type has no buffers at all
		for  := range .Buffers {
			.Buffers[].Buf = nil
			.Buffers[].Owner = nil
			.Buffers[].SelfAlloc = false
		}
	}
}

// Replaces the first (and only) child of this span — the slot Dictionary()
// returns — with a shallow copy of the given span.
//
// Before the copy, Release is called on the current first child so that any
// self-allocated buffers it holds are released. The copied span shares its
// buffers and children with the argument.
func ( *ArraySpan) ( *ArraySpan) {
	.resizeChildren(1)
	.Children[0].Release()
	.Children[0] = *
}

// TakeOwnership is like SetMembers only this takes ownership of
// the buffers by calling Retain on them so that the passed in
// ArrayData can be released without negatively affecting this
// ArraySpan
//
// Every non-nil buffer is wrapped with WrapBuffer (which marks it
// SelfAlloc) and retained, so a later Release on this span releases them.
func ( *ArraySpan) ( arrow.ArrayData) {
	.Type = .DataType()
	.Len = int64(.Len())
	if .Type.ID() == arrow.NULL {
		// every element of a NULL-typed array is null
		.Nulls = .Len
	} else {
		.Nulls = int64(.NullN())
	}
	.Offset = int64(.Offset())

	// NOTE(review): Buffers is a fixed 3-element array, so data with more
	// than three buffers would index out of range here — confirm no
	// supported type has more.
	for ,  := range .Buffers() {
		if  != nil {
			.Buffers[].WrapBuffer()
			.Retain()
		} else {
			.Buffers[].Buf = nil
			.Buffers[].Owner = nil
			.Buffers[].SelfAlloc = false
		}
	}

	 := .Type.ID()
	if .Buffers[0].Buf == nil {
		// no validity bitmap means no nulls, except for NULL and union
		// types, whose null count is left untouched
		switch  {
		case arrow.NULL, arrow.SPARSE_UNION, arrow.DENSE_UNION:
		default:
			// should already be zero, but we make sure
			.Nulls = 0
		}
	}

	// clear any buffer slots beyond what the data provided
	for  := len(.Buffers());  < 3; ++ {
		.Buffers[].Buf = nil
		.Buffers[].Owner = nil
		.Buffers[].SelfAlloc = false
	}

	if  == arrow.DICTIONARY {
		.resizeChildren(1)
		 := .Dictionary()
		// guards against a typed-nil *array.Data.
		// NOTE(review): if Dictionary() returns an interface, an untyped
		// nil would still pass this comparison — confirm.
		if  != (*array.Data)(nil) {
			.Children[0].()
		}
	} else {
		.resizeChildren(len(.Children()))
		for ,  := range .Children() {
			// presumably recurses with this same method so children are
			// owned as well — confirm
			.Children[].()
		}
	}
}

// SetMembers populates this ArraySpan from the given ArrayData object.
// As this is a non-owning reference, the ArrayData object must not
// be fully released while this ArraySpan is in use, otherwise any buffers
// referenced will be released too
//
// Every non-nil buffer is set with SetBuffer, so none of them are marked
// SelfAlloc and none are retained.
func ( *ArraySpan) ( arrow.ArrayData) {
	.Type = .DataType()
	.Len = int64(.Len())
	if .Type.ID() == arrow.NULL {
		// every element of a NULL-typed array is null
		.Nulls = .Len
	} else {
		.Nulls = int64(.NullN())
	}
	.Offset = int64(.Offset())

	// NOTE(review): Buffers is a fixed 3-element array, so data with more
	// than three buffers would index out of range here — confirm no
	// supported type has more.
	for ,  := range .Buffers() {
		if  != nil {
			.Buffers[].SetBuffer()
		} else {
			.Buffers[].Buf = nil
			.Buffers[].Owner = nil
			.Buffers[].SelfAlloc = false
		}
	}

	 := .Type.ID()
	if .Buffers[0].Buf == nil {
		// no validity bitmap means no nulls, except for NULL and union
		// types, whose null count is left untouched
		switch  {
		case arrow.NULL, arrow.SPARSE_UNION, arrow.DENSE_UNION:
		default:
			// should already be zero, but we make sure
			.Nulls = 0
		}
	}

	// clear any buffer slots beyond what the data provided
	for  := len(.Buffers());  < 3; ++ {
		.Buffers[].Buf = nil
		.Buffers[].Owner = nil
		.Buffers[].SelfAlloc = false
	}

	if  == arrow.DICTIONARY {
		.resizeChildren(1)
		 := .Dictionary()
		// guards against a typed-nil *array.Data.
		// NOTE(review): if Dictionary() returns an interface, an untyped
		// nil would still pass this comparison — confirm.
		if  != (*array.Data)(nil) {
			.Children[0].()
		}
	} else {
		// NOTE(review): this is the same logic as the resizeChildren
		// helper, written inline.
		if cap(.Children) >= len(.Children()) {
			.Children = .Children[:len(.Children())]
		} else {
			.Children = make([]ArraySpan, len(.Children()))
		}
		for ,  := range .Children() {
			// presumably recurses with this same method — confirm
			.Children[].()
		}
	}
}

// ExecValue represents a single input to an execution which could
// be either an Array (ArraySpan) or a Scalar value
type ExecValue struct {
	// Array holds the array input; it is the active member when Scalar
	// is nil.
	Array  ArraySpan
	// Scalar holds the scalar input, or nil if this value is an array.
	Scalar scalar.Scalar
}

// Reports whether this value is an array, i.e. its Scalar member is nil.
func ( *ExecValue) () bool  { return .Scalar == nil }
// Reports whether this value is a scalar; the negation of IsArray.
func ( *ExecValue) () bool { return !.IsArray() }

// Returns the data type of this value: the ArraySpan's Type when this is
// an array, otherwise the scalar's DataType().
func ( *ExecValue) () arrow.DataType {
	if .IsArray() {
		return .Array.Type
	}
	return .Scalar.DataType()
}

// ExecResult is the result of a kernel execution and should be populated
// by the execution functions and/or a kernel. For now this is just an
// alias of ArraySpan, so the two types are interchangeable.
type ExecResult = ArraySpan

// ExecSpan represents a slice of inputs and is used to provide slices
// of input values to iterate over.
//
// Len is the length of the span (all elements in Values should either
// be scalar or an array with a length + offset of at least Len).
type ExecSpan struct {
	// Len is the number of elements covered by this span.
	Len    int64
	// Values are the inputs, each either an array or a scalar.
	Values []ExecValue
}

// getNumBuffers returns the number of buffers expected for the given type:
//   - 0 for run-end encoded
//   - 1 for null, struct and fixed-size list
//   - 3 for binary, large binary, string, large string and dense union
//   - the storage type's count for extension types
//   - 2 for everything else
func getNumBuffers( arrow.DataType) int {
	switch .ID() {
	case arrow.RUN_END_ENCODED:
		return 0
	case arrow.NULL, arrow.STRUCT, arrow.FIXED_SIZE_LIST:
		return 1
	case arrow.BINARY, arrow.LARGE_BINARY, arrow.STRING, arrow.LARGE_STRING, arrow.DENSE_UNION:
		return 3
	case arrow.EXTENSION:
		// extension types defer to their storage type
		// (presumably a recursive call to getNumBuffers)
		return (.(arrow.ExtensionType).StorageType())
	default:
		return 2
	}
}

// FillZeroLength fills an ArraySpan with the appropriate information for
// a Zero Length Array of the provided type.
//
// Scratch is zeroed, Type is set and Len becomes 0. The buffers expected
// for the type become empty (zero-length, non-nil) slices backed by
// Scratch, and the remaining buffer slots are cleared. Children are filled
// recursively for dictionary and nested types.
//
// NOTE(review): SelfAlloc is not reset on any buffer, and Nulls and Offset
// are left unchanged — confirm callers only pass spans where that is safe.
func ( arrow.DataType,  *ArraySpan) {
	.Scratch[0], .Scratch[1] = 0, 0
	.Type = 
	.Len = 0
	 := getNumBuffers()
	for  := 0;  < ; ++ {
		// zero-length but non-nil, so the buffer is not treated as absent
		.Buffers[].Buf = arrow.Uint64Traits.CastToBytes(.Scratch[:])[:0]
		.Buffers[].Owner = nil
	}

	for  := ;  < 3; ++ {
		.Buffers[].Buf, .Buffers[].Owner = nil, nil
	}

	if .ID() == arrow.DICTIONARY {
		// the dictionary (single child) is a zero-length array of the
		// dictionary's value type
		.resizeChildren(1)
		(.(*arrow.DictionaryType).ValueType, &.Children[0])
		return
	}

	,  := .(arrow.NestedType)
	if ! {
		// non-nested types have no children; truncate while keeping capacity
		if len(.Children) > 0 {
			.Children = .Children[:0]
		}
		return
	}

	// nested types get one zero-length child per field
	.resizeChildren(.NumFields())
	for ,  := range .Fields() {
		(.Type, &.Children[])
	}
}

// PromoteExecSpanScalars promotes the values of the passed in ExecSpan
// from scalars to Arrays of length 1 for each value.
//
// Each value whose Scalar is non-nil has its Array filled via
// FillFromScalar and its Scalar set to nil. Although the ExecSpan is
// passed by value, its Values slice shares backing storage with the
// caller's, so the changes are visible to the caller.
func ( ExecSpan) {
	for  := range .Values {
		if .Values[].Scalar != nil {
			.Values[].Array.FillFromScalar(.Values[].Scalar)
			.Values[].Scalar = nil
		}
	}
}