// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kernels

import (
	

	
	
	
	
	
	
	
)

// NullMatchingBehavior selects how null values are treated by the set-lookup
// kernels in this file. It is carried in SetLookupOptions.NullBehavior, copied
// into SetLookupState during Init, and consulted by isInKernelExec.
type NullMatchingBehavior int8

const (
	// NullMatchingMatch: in isInKernelExec a null input sets both output
	// bits when the value set contained a null; when it did not, the null
	// input takes the same branch as NullMatchingSkip.
	NullMatchingMatch NullMatchingBehavior = iota
	// NullMatchingSkip: Init leaves NullIndex at -1 even if the value set
	// contains a null, and a null input in isInKernelExec clears one output
	// bit and sets the other (presumably a valid "false" result - confirm).
	NullMatchingSkip
	// NullMatchingEmitNull: a null input clears both output bits in
	// isInKernelExec (presumably yielding a null result - confirm).
	NullMatchingEmitNull
	// NullMatchingInconclusive: null inputs are handled like
	// NullMatchingEmitNull; in addition, a non-null input that is not found
	// clears both output bits when the value set contained a null.
	NullMatchingInconclusive
)

// visitBinary walks the elements of a variable-length binary span. The type
// parameter (int32 or int64) is the offset width used when reading offsets
// through exec.GetSpanOffsets.
//
// It takes the span, a callback receiving a []byte, and a no-argument
// callback. A zero-length span returns nil without invoking either callback.
// Otherwise the traversal is delegated to bitutils.VisitBitBlocksShort over
// Buffers[0] (presumably the validity bitmap): the first callback is handed
// the bytes of Buffers[2] lying between two consecutive offsets, and the
// second callback is passed through unchanged (presumably invoked for null
// slots). Any error is returned from VisitBitBlocksShort.
//
// NOTE(review): parameter, type-parameter and local identifiers are missing
// from this block as reviewed; confirm against the upstream file.
func visitBinary[ int32 | int64]( *exec.ArraySpan,  func([]byte) error,  func() error) error {
	if .Len == 0 {
		return nil
	}

	 := .Buffers[2].Buf
	 := exec.GetSpanOffsets[](, 1)
	return bitutils.VisitBitBlocksShort(.Buffers[0].Buf, .Offset, .Len,
		func( int64) error {
			return ([[]:[+1]])
		}, )
}

// visitNumeric walks the elements of a fixed-width span whose values are
// read as a typed slice via exec.GetSpanValues (buffer index 1). The type
// parameter is constrained by arrow.FixedWidthType.
//
// It takes the span and two callbacks. A zero-length span returns nil without
// invoking either callback. Otherwise bitutils.VisitBitBlocksShort drives the
// traversal over Buffers[0] (presumably the validity bitmap): the first
// callback is invoked with the element at the visited position, and the
// second callback is passed through unchanged (presumably invoked for null
// slots).
//
// NOTE(review): parameter, type-parameter and local identifiers are missing
// from this block as reviewed; confirm against the upstream file.
func visitNumeric[ arrow.FixedWidthType]( *exec.ArraySpan,  func() error,  func() error) error {
	if .Len == 0 {
		return nil
	}

	 := exec.GetSpanValues[](, 1)
	return bitutils.VisitBitBlocksShort(.Buffers[0].Buf, .Offset, .Len,
		func( int64) error {
			return ([])
		}, )
}
// visitFSB walks the elements of a span whose type is an
// arrow.FixedWidthDataType, handing each element to the first callback as a
// []byte slice of Bytes() length cut from Buffers[1].
//
// A zero-length span returns nil without invoking either callback. The
// traversal is delegated to bitutils.VisitBitBlocksShort over Buffers[0]
// (presumably the validity bitmap); the second callback is passed through
// unchanged (presumably invoked for null slots). The type assertion to
// arrow.FixedWidthDataType is unchecked and panics for any other type.
//
// NOTE(review): unlike visitBinary and visitNumeric, which read their data
// through exec.GetSpanOffsets / exec.GetSpanValues, this function slices
// Buffers[1] directly by position * width. Confirm whether the span's Offset
// needs to be applied for sliced inputs.
//
// NOTE(review): parameter and local identifiers are missing from this block
// as reviewed; confirm against the upstream file.
func visitFSB( *exec.ArraySpan,  func([]byte) error,  func() error) error {
	if .Len == 0 {
		return nil
	}

	 := int64(.Type.(arrow.FixedWidthDataType).Bytes())
	 := .Buffers[1].Buf

	return bitutils.VisitBitBlocksShort(.Buffers[0].Buf, .Offset, .Len,
		func( int64) error {
			return ([* : (+1)*])
		}, )
}

// SetLookupOptions describes the value set used to initialize a lookup state.
//
// ValueSetType is the data type of the value set; extension types are
// unwrapped to their storage type by the code that consumes these options.
// TotalLen is used as the initial capacity of the memo-index-to-value-index
// mapping (presumably the combined length of all spans in ValueSet).
// ValueSet holds the spans whose elements are inserted into the memo table,
// in order. NullBehavior selects how nulls are matched.
type SetLookupOptions struct {
	ValueSetType arrow.DataType
	TotalLen     int64
	ValueSet     []exec.ArraySpan
	NullBehavior NullMatchingBehavior
}

// lookupState is the type-erased view of a SetLookupState instantiation. It
// lets the constructor in this file build a state for any supported element
// type and initialize it from SetLookupOptions through a single Init call.
type lookupState interface {
	// Init populates the state from the given options and reports any
	// error encountered while doing so.
	Init(SetLookupOptions) error
}

// This function builds and initializes the kernel state for a set-lookup
// function from the given options and allocator.
//
// The value-set type is first unwrapped to its storage type when it is an
// extension type. The concrete SetLookupState instantiation is then chosen
// from the type:
//   - arrow.BinaryDataType: SetLookupState[[]byte] with visitBinary[int32] or
//     visitBinary[int64], depending on whether layout buffer 1 has a byte
//     width of 4 or 8.
//   - arrow.FixedWidthDataType: SetLookupState over uint8/uint16/uint32/uint64
//     with visitNumeric for widths 1/2/4/8, and SetLookupState[[]byte] with
//     visitFSB for any other width.
//   - anything else: an error wrapping arrow.ErrInvalid.
//
// It returns the state together with the result of calling Init on it.
//
// NOTE(review): the BinaryDataType switch has no default case. If the byte
// width is neither 4 nor 8 the state variable appears to stay a nil
// interface, and the Init call in the final return would panic. Confirm
// whether such a type can reach this point.
//
// NOTE(review): the function name and the parameter and local identifiers
// are missing from this block as reviewed; confirm against the upstream file.
func ( SetLookupOptions,  memory.Allocator) (exec.KernelState, error) {
	 := .ValueSetType
	if .ID() == arrow.EXTENSION {
		 = .(arrow.ExtensionType).StorageType()
	}

	var  lookupState
	switch ty := .(type) {
	case arrow.BinaryDataType:
		switch .Layout().Buffers[1].ByteWidth {
		case 4:
			 = &SetLookupState[[]byte]{
				Alloc:   ,
				visitFn: visitBinary[int32],
			}
		case 8:
			 = &SetLookupState[[]byte]{
				Alloc:   ,
				visitFn: visitBinary[int64],
			}
		}
	case arrow.FixedWidthDataType:
		switch .Bytes() {
		case 1:
			 = &SetLookupState[uint8]{
				Alloc:   ,
				visitFn: visitNumeric[uint8],
			}
		case 2:
			 = &SetLookupState[uint16]{
				Alloc:   ,
				visitFn: visitNumeric[uint16],
			}
		case 4:
			 = &SetLookupState[uint32]{
				Alloc:   ,
				visitFn: visitNumeric[uint32],
			}
		case 8:
			 = &SetLookupState[uint64]{
				Alloc:   ,
				visitFn: visitNumeric[uint64],
			}
		default:
			 = &SetLookupState[[]byte]{
				Alloc:   ,
				visitFn: visitFSB,
			}
		}

	default:
		return nil, fmt.Errorf("%w: unsupported type %s for SetLookup functions", arrow.ErrInvalid, .ValueSetType)
	}

	return , .Init()
}

// SetLookupState is the kernel state shared by the set-lookup functions,
// parameterized over a hashing.MemoTypes element type.
//
// visitFn walks an input span, calling one callback per visited value and
// another with no arguments (see visitBinary, visitNumeric and visitFSB).
// ValueSetType and NullBehavior are copied from SetLookupOptions by Init.
// Alloc is handed to newMemoTable when the memo table is created. Lookup is
// the memo table holding the distinct values of the value set. NullIndex is
// -1 unless Init records an index for a null in the value set.
type SetLookupState[ hashing.MemoTypes] struct {
	visitFn      func(*exec.ArraySpan, func() error, func() error) error
	ValueSetType arrow.DataType
	Alloc        memory.Allocator
	Lookup       hashing.TypedMemoTable[]
	// When there are duplicates in value set, memotable indices
	// must be mapped back to indices in the value set
	MemoIndexToValueIndex []int32
	NullIndex             int32
	NullBehavior          NullMatchingBehavior
}

// This method returns the ValueSetType field, i.e. the value-set data type
// stored on the state by Init.
//
// NOTE(review): the method name and receiver identifier are missing from
// this block as reviewed; confirm against the upstream file.
func ( *SetLookupState[]) () arrow.DataType {
	return .ValueSetType
}

// This method (presumably Init, satisfying lookupState) populates the state
// from the given options:
//
//  1. Copies ValueSetType and NullBehavior, allocates MemoIndexToValueIndex
//     with capacity TotalLen, and resets NullIndex to -1.
//  2. Resolves the type ID, using the storage type's ID for extension types,
//     and creates the memo table with newMemoTable.
//  3. Adds every span of ValueSet through AddArrayValueSet, passing a running
//     index that advances by each span's Len.
//  4. Queries the memo table with GetNull and records NullIndex unless the
//     behavior is NullMatchingSkip or the compared value is negative.
//
// It returns the first error from newMemoTable or AddArrayValueSet, or nil.
//
// NOTE(review): the conversion to hashing.TypedMemoTable is a single-value
// type assertion, so a mismatch panics instead of reaching the nil check
// below it; that check looks unreachable. Confirm the intended behavior.
//
// NOTE(review): the method name and the receiver, parameter and local
// identifiers are missing from this block as reviewed; confirm against the
// upstream file.
func ( *SetLookupState[]) ( SetLookupOptions) error {
	.ValueSetType = .ValueSetType
	.NullBehavior = .NullBehavior
	.MemoIndexToValueIndex = make([]int32, 0, .TotalLen)
	.NullIndex = -1
	 := .ValueSetType.ID()
	if  == arrow.EXTENSION {
		 = .ValueSetType.(arrow.ExtensionType).StorageType().ID()
	}
	,  := newMemoTable(.Alloc, )
	if  != nil {
		return 
	}
	.Lookup = .(hashing.TypedMemoTable[])
	if .Lookup == nil {
		return fmt.Errorf("unsupported type %s for SetLookup functions", .ValueSetType)
	}

	var  int64
	for ,  := range .ValueSet {
		if  := .AddArrayValueSet(&, );  != nil {
			return 
		}
		 += .Len
	}

	,  := .Lookup.GetNull()
	if .NullBehavior != NullMatchingSkip &&  >= 0 {
		.NullIndex = int32()
	}
	return nil
}

// This method (presumably AddArrayValueSet, as called from Init) inserts the
// elements of one value-set span into the memo table. The int64 argument is
// copied into a local counter that is incremented once per visited element
// (presumably the element's index within the whole value set).
//
// The span is walked with visitFn. For each value the first callback calls
// Lookup.InsertOrGet and returns its error, if any; the second callback
// calls Lookup.GetOrInsertNull. In both callbacks, when the negated boolean
// result holds (presumably "not previously present"), the current counter is
// appended to MemoIndexToValueIndex, so duplicates keep the index of their
// first occurrence. The debug assertions check the memo index against the
// mapping's length before the append.
//
// NOTE(review): the method name and the receiver, parameter and local
// identifiers are missing from this block as reviewed; confirm against the
// upstream file.
func ( *SetLookupState[]) ( *exec.ArraySpan,  int64) error {
	 := 
	return .visitFn(,
		func( ) error {
			 := len(.MemoIndexToValueIndex)
			, ,  := .Lookup.InsertOrGet()
			if  != nil {
				return 
			}

			if ! {
				debug.Assert( == , "inconsistent memo index and size")
				.MemoIndexToValueIndex = append(.MemoIndexToValueIndex, int32())
			} else {
				debug.Assert( < , "inconsistent memo index and size")
			}

			++
			return nil
		}, func() error {
			 := len(.MemoIndexToValueIndex)
			,  := .Lookup.GetOrInsertNull()
			if ! {
				debug.Assert( == , "inconsistent memo index and size")
				.MemoIndexToValueIndex = append(.MemoIndexToValueIndex, int32())
			} else {
				debug.Assert( < , "inconsistent memo index and size")
			}

			++
			return nil
		})
}

// This function dispatches the is_in kernel to the isInKernelExec
// instantiation that matches a data type, given a lookupState, an input span
// and an output result.
//
// The type is unwrapped to its storage type when it is an extension type.
// The lookupState is then asserted to the concrete SetLookupState for that
// type: []byte for arrow.BinaryDataType; uint8/uint16/uint32/uint64 for
// arrow.FixedWidthDataType widths 1/2/4/8; []byte for any other fixed width.
// This mirrors the selection made when the state is constructed. Any other
// type yields an error wrapping arrow.ErrInvalid.
//
// NOTE(review): the type assertions on the state are unchecked, so a state
// built for a different element type than the one selected here would panic.
//
// NOTE(review): the function name and the parameter and local identifiers
// are missing from this block as reviewed; confirm against the upstream file.
func ( lookupState,  *exec.ArraySpan,  *exec.ExecResult) error {
	 := .Type
	if .ID() == arrow.EXTENSION {
		 = .(arrow.ExtensionType).StorageType()
	}

	switch ty := .(type) {
	case arrow.BinaryDataType:
		return isInKernelExec(.(*SetLookupState[[]byte]), , )
	case arrow.FixedWidthDataType:
		switch .Bytes() {
		case 1:
			return isInKernelExec(.(*SetLookupState[uint8]), , )
		case 2:
			return isInKernelExec(.(*SetLookupState[uint16]), , )
		case 4:
			return isInKernelExec(.(*SetLookupState[uint32]), , )
		case 8:
			return isInKernelExec(.(*SetLookupState[uint64]), , )
		default:
			return isInKernelExec(.(*SetLookupState[[]byte]), , )
		}
	default:
		return fmt.Errorf("%w: unsupported type %s for is_in function", arrow.ErrInvalid, .Type)
	}
}

// isInKernelExec evaluates set membership for every element of the input
// span and writes the outcome into the output result.
//
// Two bitmap writers are created, one over Buffers[1] and one over
// Buffers[0] (presumably the boolean data and the validity bitmap
// respectively); both are finished on return. A flag records whether the
// state's NullIndex differs from -1, i.e. whether Init recorded a null in
// the value set. The input is walked with the state's visitFn, and both
// writers advance once per element.
//
// For a visited value:
//   - present in Lookup: both bits are set;
//   - absent, NullMatchingInconclusive and the flag is true: both bits are
//     cleared;
//   - otherwise: one bit is cleared and the other set (presumably a valid
//     "false" - confirm).
//
// For the no-argument (presumably null) callback:
//   - NullMatchingMatch and the flag is true: both bits are set;
//   - NullMatchingSkip, or NullMatchingMatch with the flag false: one bit is
//     cleared and the other set;
//   - otherwise: both bits are cleared.
//
// The callbacks always return nil, so any error comes from visitFn itself.
//
// NOTE(review): parameter, type-parameter and local identifiers are missing
// from this block as reviewed, so which writer each Set/Clear call targets
// cannot be read from this copy; confirm against the upstream file.
func isInKernelExec[ hashing.MemoTypes]( *SetLookupState[],  *exec.ArraySpan,  *exec.ExecResult) error {
	 := bitutil.NewBitmapWriter(.Buffers[1].Buf, int(.Offset), int(.Len))
	defer .Finish()
	 := bitutil.NewBitmapWriter(.Buffers[0].Buf, int(.Offset), int(.Len))
	defer .Finish()
	 := .NullIndex != -1
	return .visitFn(,
		func( ) error {
			switch {
			case .Lookup.Exists():
				.Set()
				.Set()
			case .NullBehavior == NullMatchingInconclusive && :
				.Clear()
				.Clear()
			default:
				.Clear()
				.Set()
			}

			.Next()
			.Next()
			return nil
		}, func() error {
			switch {
			case .NullBehavior == NullMatchingMatch && :
				.Set()
				.Set()
			case .NullBehavior == NullMatchingSkip || (! && .NullBehavior == NullMatchingMatch):
				.Clear()
				.Set()
			default:
				.Clear()
				.Clear()
			}

			.Next()
			.Next()
			return nil
		})
}