// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18

package compute

import (
	
	

	
	
	
	
	
)

// Documentation and dispatch closures for the "filter" and "take" meta
// functions.
//
// NOTE(review): the closure parameter names and most local identifiers in
// this block are missing from the text, so it will not compile as shown.
// The comments below describe each value by its role.
var (
	// filterDoc is the user-facing documentation attached to the "filter"
	// meta function.
	filterDoc = FunctionDoc{
		Summary: "Filter with a boolean selection filter",
		Description: `The output is populated with values from the input at positions
where the selection filter is non-zero. Nulls in the selection filter
are handled based on FilterOptions.`,
		ArgNames:    []string{"input", "selection_filter"},
		OptionsType: "FilterOptions",
	}
	// filterMetaFunc is the binary "filter" meta function. Its closure
	// requires the second argument to be boolean typed and then dispatches
	// on the kind of the first argument: record batches go to
	// FilterRecordBatch, tables go to FilterTable, and every other kind is
	// forwarded to the "array_filter" function.
	filterMetaFunc = NewMetaFunction("filter", Binary(), filterDoc,
		func( context.Context,  FunctionOptions,  ...Datum) (Datum, error) {
			// The type assertion below is unchecked: a second argument that
			// is not an ArrayLikeDatum panics instead of returning an error.
			if [1].(ArrayLikeDatum).Type().ID() != arrow.BOOL {
				return nil, fmt.Errorf("%w: filter argument must be boolean type",
					arrow.ErrNotImplemented)
			}

			switch [0].Kind() {
			case KindRecord:
				// Record batches need concrete *FilterOptions.
				,  := .(*FilterOptions)
				if ! {
					return nil, fmt.Errorf("%w: invalid options type", arrow.ErrInvalid)
				}

				// Only a plain array filter is supported for record batches;
				// the array built from the datum is released on return.
				if ,  := [1].(*ArrayDatum);  {
					 := .MakeArray()
					defer .Release()
					,  := FilterRecordBatch(, [0].(*RecordDatum).Value, , )
					if  != nil {
						return nil, 
					}
					return &RecordDatum{Value: }, nil
				}
				return nil, fmt.Errorf("%w: record batch filtering only implemented for Array filter", arrow.ErrNotImplemented)
			case KindTable:
				// Tables also need concrete *FilterOptions; the filter datum
				// is handed to FilterTable as-is.
				,  := .(*FilterOptions)
				if ! {
					return nil, fmt.Errorf("%w: invalid options type", arrow.ErrInvalid)
				}

				,  := FilterTable(, [0].(*TableDatum).Value, [1], )
				if  != nil {
					return nil, 
				}
				return &TableDatum{Value: }, nil

			default:
				// Every other kind is delegated to the "array_filter" function.
				return CallFunction(, "array_filter", , ...)
			}
		})
	// takeDoc is the user-facing documentation attached to the "take" meta
	// function.
	takeDoc = FunctionDoc{
		Summary: "Select values from an input based on indices from another array",
		Description: `The output is populated with values from the input at positions
given by "indices". Nulls in "indices" emit null in the output`,
		ArgNames:    []string{"input", "indices"},
		OptionsType: "TakeOptions",
	}
	// takeMetaFunc is the binary "take" meta function. The second argument
	// must be an array or a chunked array; the kind of the first argument
	// then selects one of takeArrayImpl, takeChunkedImpl, takeRecordImpl or
	// takeTableImpl. Any other combination yields arrow.ErrNotImplemented.
	takeMetaFunc = NewMetaFunction("take", Binary(), takeDoc,
		func( context.Context,  FunctionOptions,  ...Datum) (Datum, error) {
			// Validate the kind of the second (indices) argument first.
			 := [1].Kind()
			if  != KindArray &&  != KindChunked {
				return nil, fmt.Errorf("%w: unsupported types for take operation: values=%s, indices=%s",
					arrow.ErrNotImplemented, [0], [1])
			}

			switch [0].Kind() {
			case KindArray:
				return takeArrayImpl(, , ...)
			case KindChunked:
				return takeChunkedImpl(, , ...)
			case KindRecord:
				return takeRecordImpl(, , ...)
			case KindTable:
				return takeTableImpl(, , ...)
			}

			// The first argument's kind is not one of the four handled above.
			return nil, fmt.Errorf("%w: unsupported types for take operation: values=%s, indices=%s",
				arrow.ErrNotImplemented, [0], [1])
		})
)

// takeTableImpl handles "take" when the first argument is a table.
//
// Each column of the table is wrapped as a ChunkedDatum and passed, together
// with the second argument, to the "take" function. The per-column calls run
// on an errgroup whose concurrency is capped at the exec context's
// NumParallel, and the resulting columns are assembled into a new table that
// is returned as a TableDatum. The first error from any column is returned.
//
// NOTE(review): local identifiers in this block are missing from the text
// (it will not compile as shown); comments describe values by role.
func takeTableImpl( context.Context,  FunctionOptions,  ...Datum) (Datum, error) {
	 := [0].(*TableDatum).Value
	 := int(.NumCols())
	 := make([]arrow.Column, )
	// Release every slot of the column slice on return.
	// NOTE(review): unlike the nil-guarded cleanup in takeRecordImpl, nothing
	// here skips slots left unfilled after an error — confirm that Release is
	// safe on a zero-value arrow.Column.
	defer func() {
		for ,  := range  {
			.Release()
		}
	}()

	,  := errgroup.WithContext()
	.SetLimit(GetExecCtx().NumParallel)
	for  := 0;  < ; ++ {
		// Per-iteration copy of the loop index for the goroutine closure.
		 := 
		.Go(func() error {
			 := .Column()
			,  := CallFunction(, "take", ,
				&ChunkedDatum{Value: .Data()},
				[1])
			if  != nil {
				return 
			}
			defer .Release()
			 := .(ArrayLikeDatum)
			 := .Chunks()
			// For a single-array result the one chunk is released once it has
			// been wrapped below (presumably Chunks() on an array datum
			// creates a fresh reference — confirm).
			if .Kind() == KindArray {
				defer [0].Release()
			}
			 := arrow.NewChunked(.Type(), )
			defer .Release()
			[] = *arrow.NewColumn(.Field(), )
			return nil
		})
	}

	if  := .Wait();  != nil {
		return nil, 
	}

	// -1 is passed as the row count; presumably NewTable then derives it from
	// the columns — confirm.
	 := array.NewTable(.Schema(), , -1)
	return &TableDatum{Value: }, nil
}

// takeRecordImpl handles "take" when the first argument is a record batch.
//
// Chunked indices are first concatenated into a single array. Every column
// of the record is then passed to "array_take" with those indices, in
// parallel on an errgroup capped at the exec context's NumParallel, and the
// results are assembled into a new record returned as a RecordDatum.
//
// NOTE(review): local identifiers in this block are missing from the text
// (it will not compile as shown); comments describe values by role.
func takeRecordImpl( context.Context,  FunctionOptions,  ...Datum) (Datum, error) {
	 := [1]
	if .Kind() == KindChunked {
		// Flatten chunked indices into one array; the concatenated array is
		// released when this function returns.
		,  := array.Concatenate(.(*ChunkedDatum).Chunks(), exec.GetAllocator())
		if  != nil {
			return nil, 
		}
		defer .Release()
		 = &ArrayDatum{Value: .Data()}
	}

	 := [0].(*RecordDatum).Value
	 := .NumCols()
	// The output row count is the length of the original second argument.
	 := [1].(ArrayLikeDatum).Len()
	 := make([]arrow.Array, )
	// Release whichever output columns were produced, on success and on
	// error alike; slots never filled stay nil and are skipped.
	defer func() {
		for ,  := range  {
			if  != nil {
				.Release()
			}
		}
	}()

	,  := errgroup.WithContext()
	.SetLimit(GetExecCtx().NumParallel)
	for  := range .Columns() {
		// Per-iteration copy of the loop index for the goroutine closure.
		 := 
		.Go(func() error {
			,  := CallFunction(, "array_take", , &ArrayDatum{Value: .Column().Data()}, )
			if  != nil {
				return 
			}
			defer .Release()
			[] = .(*ArrayDatum).MakeArray()
			return nil
		})
	}

	if  := .Wait();  != nil {
		return nil, 
	}

	 := array.NewRecord(.Schema(), , )
	return &RecordDatum{Value: }, nil
}

// takeArrayImpl handles "take" when the first argument is a single array.
//
// With array indices the call is forwarded unchanged to "array_take". With
// chunked indices, "array_take" is invoked once per index chunk (in parallel,
// capped at the exec context's NumParallel) and the per-chunk results are
// returned as a ChunkedDatum typed like the first argument. Any other kind of
// second argument yields arrow.ErrNotImplemented.
//
// NOTE(review): local identifiers in this block are missing from the text
// (it will not compile as shown); comments describe values by role.
func takeArrayImpl( context.Context,  FunctionOptions,  ...Datum) (Datum, error) {
	switch [1].Kind() {
	case KindArray:
		return CallFunction(, "array_take", , ...)
	case KindChunked:
		 := [1].(*ChunkedDatum).Chunks()
		 := make([]arrow.Array, len())
		// Release the produced arrays on return; unfilled slots are nil and
		// skipped.
		defer func() {
			for ,  := range  {
				if  != nil {
					.Release()
				}
			}
		}()

		,  := errgroup.WithContext()
		.SetLimit(GetExecCtx().NumParallel)
		for  := range  {
			// Per-iteration copy of the loop index for the goroutine closure.
			 := 
			.Go(func() error {
				,  := CallFunction(, "array_take", , [0], &ArrayDatum{Value: [].Data()})
				if  != nil {
					return 
				}
				defer .Release()
				[] = .(*ArrayDatum).MakeArray()
				return nil
			})
		}
		if  := .Wait();  != nil {
			return nil, 
		}
		return &ChunkedDatum{
			Value: arrow.NewChunked([0].(*ArrayDatum).Type(), )}, nil
	}

	return nil, fmt.Errorf("%w: unsupported types for take operation: values=%s, indices=%s",
		arrow.ErrNotImplemented, [0], [1])
}

// takeChunkedImpl handles "take" when the first argument is a chunked array.
//
// When the second argument is an *ArrayDatum there are three cases:
//   - at most one chunk: "array_take" runs on that chunk (or on an empty
//     null array when there are no chunks) and the result is wrapped as a
//     one-chunk ChunkedDatum;
//   - kernels.ChunkedTakeSupported reports true for the data type: the index
//     array is wrapped into a chunked array and handled by the common path
//     at the bottom of the function;
//   - otherwise: the chunks are concatenated into one array, "array_take"
//     runs on it, and the result is wrapped as a one-chunk ChunkedDatum.
//
// When the second argument is not an *ArrayDatum it is asserted to be a
// *ChunkedDatum. The common path then either calls "array_take" with chunked
// indices (when ChunkedTakeSupported) or concatenates the chunks and
// re-enters "take" with the resulting array.
//
// NOTE(review): local identifiers in this block are missing from the text
// (it will not compile as shown); comments describe values by role.
func takeChunkedImpl( context.Context,  FunctionOptions,  ...Datum) (Datum, error) {
	 := [0].(*ChunkedDatum).Value
	var  *arrow.Chunked
	if ,  := [1].(*ArrayDatum);  {
		switch {
		case len(.Chunks()) <= 1:
			var  arrow.Array
			if len(.Chunks()) == 1 {
				 = .Chunk(0)
			} else {
				// no chunks, create an empty one!
				 = array.MakeArrayOfNull(exec.GetAllocator(), .DataType(), 0)
				defer .Release()
			}
			,  := CallFunction(, "array_take", , &ArrayDatum{Value: .Data()}, )
			if  != nil {
				return nil, 
			}
			defer .Release()
			// The single result chunk is released after being wrapped in the
			// returned chunked array.
			 := .(*ArrayDatum).Chunks()
			defer [0].Release()
			return &ChunkedDatum{Value: arrow.NewChunked([0].DataType(), )}, nil
		case kernels.ChunkedTakeSupported(.DataType()):
			// Promote the index array to a chunked array; both the chunk and
			// the chunked wrapper are released when this function returns.
			 := .Chunks()
			defer [0].Release()
			 = arrow.NewChunked(.Type(), )
			defer .Release()
		default:
			// NOTE(review): this call uses GetAllocator while the empty-array
			// branch above uses exec.GetAllocator — confirm both are intended.
			,  := array.Concatenate(.Chunks(), GetAllocator())
			if  != nil {
				return nil, 
			}
			defer .Release()
			,  := CallFunction(, "array_take", , &ArrayDatum{Value: .Data()}, )
			if  != nil {
				return nil, 
			}
			defer .Release()
			 := .(*ArrayDatum).Chunks()
			defer [0].Release()
			return &ChunkedDatum{Value: arrow.NewChunked([0].DataType(), )}, nil
		}
	} else {
		// Unchecked assertion: anything that is neither an *ArrayDatum nor a
		// *ChunkedDatum panics here.
		 = [1].(*ChunkedDatum).Value
	}

	if kernels.ChunkedTakeSupported(.DataType()) {
		return CallFunction(, "array_take", , [0], &ChunkedDatum{Value: })
	}

	// No chunked kernel for this type: concatenate the chunks and call "take"
	// again with a single array and the chunked indices.
	,  := array.Concatenate(.Chunks(), GetAllocator())
	if  != nil {
		return nil, 
	}
	defer .Release()
	return CallFunction(, "take", , &ArrayDatum{Value: .Data()}, &ChunkedDatum{Value: })
}

// Convenience wrapper that invokes the "take" function through CallFunction,
// passing a pointer to the supplied TakeOptions and the two datums.
//
// NOTE(review): the function name and parameter names are missing from the
// text. A call elsewhere in this file with this argument shape uses the name
// Take — confirm before restoring it.
func ( context.Context,  TakeOptions, ,  Datum) (Datum, error) {
	return CallFunction(, "take", &, , )
}

// Convenience wrapper around the "array_take" function for plain arrays: both
// arrays are wrapped in datums (released on return), "array_take" is called
// with nil options, and the result is converted back into an arrow.Array.
//
// NOTE(review): the function name and all identifiers are missing from the
// text; by analogy with the options-taking variant in this file the name is
// presumably TakeArray — confirm.
func ( context.Context, ,  arrow.Array) (arrow.Array, error) {
	 := NewDatum()
	 := NewDatum()
	defer .Release()
	defer .Release()

	,  := CallFunction(, "array_take", nil, , )
	if  != nil {
		return nil, 
	}
	// The result datum is released here; MakeArray below returns the array
	// handed to the caller.
	defer .Release()

	return .(*ArrayDatum).MakeArray(), nil
}

// Convenience wrapper around the "array_take" function for plain arrays with
// explicit options: both arrays are wrapped in datums (released on return),
// "array_take" is called with a pointer to the supplied TakeOptions, and the
// result is converted back into an arrow.Array.
//
// NOTE(review): the function name and all identifiers are missing from the
// text. Calls elsewhere in this file with this argument shape use the name
// TakeArrayOpts — confirm before restoring it.
func ( context.Context, ,  arrow.Array,  TakeOptions) (arrow.Array, error) {
	 := NewDatum()
	 := NewDatum()
	defer .Release()
	defer .Release()

	,  := CallFunction(, "array_take", &, , )
	if  != nil {
		return nil, 
	}
	// The result datum is released here; MakeArray below returns the array
	// handed to the caller.
	defer .Release()

	return .(*ArrayDatum).MakeArray(), nil
}

// listArr is an arrow.Array that also exposes its child values through
// ListValues. selectListImpl asserts its input array to this interface so it
// can reach the child values without depending on a concrete array type.
type listArr interface {
	arrow.Array
	// ListValues returns the child values array.
	ListValues() arrow.Array
}

// selectListImpl wraps a list selection kernel. The returned kernel first
// runs the wrapped kernel, then takes from the input's child values (via
// ListValues) using the indices that kernel left in the first child of the
// output, and finally replaces that child with the taken values.
//
// NOTE(review): local identifiers in this block are missing from the text
// (it will not compile as shown); comments describe values by role.
func selectListImpl( exec.ArrayKernelExec) exec.ArrayKernelExec {
	return func( *exec.KernelCtx,  *exec.ExecSpan,  *exec.ExecResult) error {
		if  := (, , );  != nil {
			return 
		}

		// out.Children[0] contains the child indexes of values that we
		// want to take after processing.
		// The assertion to listArr is unchecked and panics for an array type
		// without a ListValues method.
		 := .Values[0].Array.MakeArray().(listArr)
		defer .Release()

		 := .Children[0].MakeArray()
		defer .Release()

		// Bounds checking is disabled for this take.
		,  := TakeArrayOpts(.Ctx, .ListValues(), , kernels.TakeOptions{BoundsCheck: false})
		if  != nil {
			return 
		}
		defer .Release()

		// Swap the index child for the selected values.
		.Children[0].TakeOwnership(.Data())
		return nil
	}
}

// denseUnionImpl wraps a dense-union selection kernel. The returned kernel
// first runs the wrapped kernel, then, for every field of the input union in
// parallel (capped at the exec context's NumParallel), takes from that field
// using the matching output child as indices and replaces the child with the
// result. The first error from any field is returned.
//
// NOTE(review): local identifiers in this block are missing from the text
// (it will not compile as shown); comments describe values by role.
func denseUnionImpl( exec.ArrayKernelExec) exec.ArrayKernelExec {
	return func( *exec.KernelCtx,  *exec.ExecSpan,  *exec.ExecResult) error {
		if  := (, , );  != nil {
			return 
		}

		// Unchecked assertion: the first input must be a *array.DenseUnion.
		 := .Values[0].Array.MakeArray().(*array.DenseUnion)
		defer .Release()

		,  := errgroup.WithContext(.Ctx)
		.SetLimit(GetExecCtx(.Ctx).NumParallel)

		for  := 0;  < .NumFields(); ++ {
			// Per-iteration copy of the loop index for the goroutine closure.
			 := 
			.Go(func() error {
				 := .Field()
				 := .Children[].MakeArray()
				defer .Release()
				// Zero-value TakeOptions are used here, unlike the explicit
				// BoundsCheck: false used by the list and struct kernels.
				,  := TakeArrayOpts(, , , kernels.TakeOptions{})
				if  != nil {
					return 
				}
				defer .Release()
				.Children[].TakeOwnership(.Data())
				return nil
			})
		}

		return .Wait()
	}
}

// extensionFilterImpl is the filter kernel for extension arrays. It filters
// the extension array's storage with FilterArray, using options converted
// from the kernel's FilterState, moves the filtered data into the output, and
// then overwrites the output's Type field.
//
// NOTE(review): local identifiers in this block are missing from the text
// (it will not compile as shown); comments describe values by role.
func extensionFilterImpl( *exec.KernelCtx,  *exec.ExecSpan,  *exec.ExecResult) error {
	// Unchecked assertion: the first input must be an array.ExtensionArray.
	 := .Values[0].Array.MakeArray().(array.ExtensionArray)
	defer .Release()

	 := .Values[1].Array.MakeArray()
	defer .Release()
	,  := FilterArray(.Ctx, .Storage(), , FilterOptions(.State.(kernels.FilterState)))
	if  != nil {
		return 
	}
	defer .Release()

	.TakeOwnership(.Data())
	// The receiver of DataType() is not visible here; presumably this puts
	// the extension type back on the output after it received storage-typed
	// data — confirm.
	.Type = .DataType()
	return nil
}

// extensionTakeImpl is the take kernel for extension arrays. It takes from
// the extension array's storage with TakeArrayOpts, using options converted
// from the kernel's TakeState, moves the resulting data into the output, and
// then overwrites the output's Type field.
//
// NOTE(review): local identifiers in this block are missing from the text
// (it will not compile as shown); comments describe values by role.
func extensionTakeImpl( *exec.KernelCtx,  *exec.ExecSpan,  *exec.ExecResult) error {
	// Unchecked assertion: the first input must be an array.ExtensionArray.
	 := .Values[0].Array.MakeArray().(array.ExtensionArray)
	defer .Release()

	 := .Values[1].Array.MakeArray()
	defer .Release()
	,  := TakeArrayOpts(.Ctx, .Storage(), , TakeOptions(.State.(kernels.TakeState)))
	if  != nil {
		return 
	}
	defer .Release()

	.TakeOwnership(.Data())
	// The receiver of DataType() is not visible here; presumably this puts
	// the extension type back on the output after it received storage-typed
	// data — confirm.
	.Type = .DataType()
	return nil
}

// structFilter is the filter kernel for struct arrays. Rather than filtering
// directly, it converts the boolean filter (second input) into take indices,
// honouring the NullSelection setting from the kernel's FilterState, and then
// runs Take on the first input with bounds checking disabled. The result's
// data is moved into the output.
//
// NOTE(review): local identifiers in this block are missing from the text
// (it will not compile as shown); comments describe values by role.
func structFilter( *exec.KernelCtx,  *exec.ExecSpan,  *exec.ExecResult) error {
	// transform filter to selection indices and use take
	,  := kernels.GetTakeIndices(exec.GetAllocator(.Ctx),
		&.Values[1].Array, .State.(kernels.FilterState).NullSelection)
	if  != nil {
		return 
	}
	defer .Release()

	// Wrap a value in a datum for the Take call below (presumably the
	// indices just computed — the argument name is not visible).
	 := NewDatum()
	defer .Release()

	// Materialise the first input as array data and wrap it in a datum too.
	 := .Values[0].Array.MakeData()
	defer .Release()

	 := NewDatum()
	defer .Release()

	,  := Take(.Ctx, kernels.TakeOptions{BoundsCheck: false}, , )
	if  != nil {
		return 
	}
	defer .Release()

	.TakeOwnership(.(*ArrayDatum).Value)
	return nil
}

// structTake is the take kernel for struct arrays. It first runs the generic
// struct take kernel (kernels.TakeExec(kernels.StructImpl)), then allocates
// one output child per struct field and fills each child in parallel (capped
// at the exec context's NumParallel) by taking from the matching field with
// the index array from the second input, with bounds checking disabled.
//
// NOTE(review): local identifiers in this block are missing from the text
// (it will not compile as shown); comments describe values by role.
func structTake( *exec.KernelCtx,  *exec.ExecSpan,  *exec.ExecResult) error {
	// generate top level validity bitmap
	if  := kernels.TakeExec(kernels.StructImpl)(, , );  != nil {
		return 
	}

	// Unchecked assertion: the first input must be a *array.Struct.
	 := .Values[0].Array.MakeArray().(*array.Struct)
	defer .Release()

	// select from children without bounds checking
	.Children = make([]exec.ArraySpan, .NumField())
	,  := errgroup.WithContext(.Ctx)
	.SetLimit(GetExecCtx(.Ctx).NumParallel)

	// The index array is built once and shared by every field's take.
	 := .Values[1].Array.MakeArray()
	defer .Release()

	for  := range .Children {
		// Per-iteration copy of the loop index for the goroutine closure.
		 := 
		.Go(func() error {
			,  := TakeArrayOpts(, .Field(), , kernels.TakeOptions{BoundsCheck: false})
			if  != nil {
				return 
			}
			defer .Release()

			.Children[].TakeOwnership(.Data())
			return nil
		})
	}

	return .Wait()
}

// RegisterVectorSelection registers functions that select specific
// values from arrays such as Take and Filter
//
// It installs the "filter" and "take" meta functions with their default
// options, then builds two vector functions: "array_filter", whose second
// input must be exactly boolean, and "array_take", whose second input must
// match an integer type. Both receive the kernels returned by
// kernels.GetVectorSelectionKernels plus the list, large-list,
// fixed-size-list, dense-union, extension and struct kernels defined in this
// file.
//
// NOTE(review): the function name and local identifiers are missing from the
// text (it will not compile as shown); comments describe values by role.
func ( FunctionRegistry) {
	filterMetaFunc.defaultOpts = DefaultFilterOptions()
	takeMetaFunc.defaultOpts = DefaultTakeOptions()
	.AddFunction(filterMetaFunc, false)
	.AddFunction(takeMetaFunc, false)
	// Two kernel lists come back; judging by the FilterExec/TakeExec wrappers
	// appended below, the first is for filter and the second for take.
	,  := kernels.GetVectorSelectionKernels()

	// Filter kernels for nested and extension types.
	 = append(, []kernels.SelectionKernelData{
		{In: exec.NewIDInput(arrow.LIST), Exec: selectListImpl(kernels.FilterExec(kernels.ListImpl[int32]))},
		{In: exec.NewIDInput(arrow.LARGE_LIST), Exec: selectListImpl(kernels.FilterExec(kernels.ListImpl[int64]))},
		{In: exec.NewIDInput(arrow.FIXED_SIZE_LIST), Exec: selectListImpl(kernels.FilterExec(kernels.FSLImpl))},
		{In: exec.NewIDInput(arrow.DENSE_UNION), Exec: denseUnionImpl(kernels.FilterExec(kernels.DenseUnionImpl))},
		{In: exec.NewIDInput(arrow.EXTENSION), Exec: extensionFilterImpl},
		{In: exec.NewIDInput(arrow.STRUCT), Exec: structFilter},
	}...)

	// Take kernels for the same set of types.
	 = append(, []kernels.SelectionKernelData{
		{In: exec.NewIDInput(arrow.LIST), Exec: selectListImpl(kernels.TakeExec(kernels.ListImpl[int32]))},
		{In: exec.NewIDInput(arrow.LARGE_LIST), Exec: selectListImpl(kernels.TakeExec(kernels.ListImpl[int64]))},
		{In: exec.NewIDInput(arrow.FIXED_SIZE_LIST), Exec: selectListImpl(kernels.TakeExec(kernels.FSLImpl))},
		{In: exec.NewIDInput(arrow.DENSE_UNION), Exec: denseUnionImpl(kernels.TakeExec(kernels.DenseUnionImpl))},
		{In: exec.NewIDInput(arrow.EXTENSION), Exec: extensionTakeImpl},
		{In: exec.NewIDInput(arrow.STRUCT), Exec: structTake},
	}...)

	 := NewVectorFunction("array_filter", Binary(), EmptyFuncDoc)
	.defaultOpts = &kernels.FilterOptions{}

	// One kernel value is reused as a template: its signature and exec
	// functions are overwritten for each entry before AddKernel is called.
	 := exec.NewExactInput(arrow.FixedWidthTypes.Boolean)
	 := exec.NewVectorKernelWithSig(nil, nil, exec.OptionsInit[kernels.FilterState])
	for ,  := range  {
		.Signature = &exec.KernelSignature{
			InputTypes: []exec.InputType{.In, },
			OutType:    kernels.OutputFirstType,
		}
		.ExecFn = .Exec
		.ExecChunked = .Chunked
		.AddKernel()
	}
	.AddFunction(, false)

	// The same variables are reassigned to build "array_take".
	 = NewVectorFunction("array_take", Binary(), EmptyFuncDoc)
	.defaultOpts = DefaultTakeOptions()

	 = exec.NewMatchedInput(exec.Integer())
	 = exec.NewVectorKernelWithSig(nil, nil, exec.OptionsInit[kernels.TakeState])
	// Take kernels are flagged as not executable chunk-wise.
	.CanExecuteChunkWise = false
	for ,  := range  {
		.Signature = &exec.KernelSignature{
			InputTypes: []exec.InputType{.In, },
			OutType:    kernels.OutputFirstType,
		}

		.ExecFn = .Exec
		.ExecChunked = .Chunked
		.AddKernel()
	}
	.AddFunction(, false)
}

// Filter is a wrapper convenience that is equivalent to calling
// CallFunction(ctx, "filter", &options, values, filter) for filtering
// an input array (values) by a boolean array (filter). The two inputs
// must be the same length.
//
// NOTE(review): the function name and parameter names are missing from the
// text below (it will not compile as shown); the name Filter is taken from
// this comment.
func ( context.Context, ,  Datum,  FilterOptions) (Datum, error) {
	return CallFunction(, "filter", &, , )
}

// FilterArray is a convenience method for calling Filter without having
// to manually construct the intervening Datum objects (they will be
// created for you internally here).
//
// Both input datums and the result datum are released before returning; the
// array handed back to the caller comes from MakeArray on the result.
//
// NOTE(review): the function name and local identifiers are missing from the
// text below (it will not compile as shown); the name FilterArray is taken
// from this comment.
func ( context.Context, ,  arrow.Array,  FilterOptions) (arrow.Array, error) {
	 := NewDatum()
	 := NewDatum()
	defer .Release()
	defer .Release()

	,  := Filter(, , , )
	if  != nil {
		return nil, 
	}

	// Unchecked assertion: the result is expected to be an *ArrayDatum.
	defer .Release()
	return .(*ArrayDatum).MakeArray(), nil
}

// Filters a record batch with a boolean array. The filter is converted to
// take indices once (honouring the options' NullSelection), and every column
// is then taken with those indices in parallel, capped at the exec context's
// NumParallel, with bounds checking disabled. The batch's row count and the
// filter's length must be equal, otherwise arrow.ErrInvalid is returned.
//
// NOTE(review): the function name and local identifiers are missing from the
// text (it will not compile as shown). The "filter" meta function in this
// file calls FilterRecordBatch with this argument shape — confirm the name.
func ( context.Context,  arrow.RecordBatch,  arrow.Array,  *FilterOptions) (arrow.RecordBatch, error) {
	if .NumRows() != int64(.Len()) {
		return nil, fmt.Errorf("%w: filter inputs must all be the same length", arrow.ErrInvalid)
	}

	// View the filter's data as an ArraySpan, as GetTakeIndices expects.
	var  exec.ArraySpan
	.SetMembers(.Data())

	,  := kernels.GetTakeIndices(exec.GetAllocator(), &, .NullSelection)
	if  != nil {
		return nil, 
	}
	defer .Release()

	// Materialise the index data as an array shared by every column's take.
	 := array.MakeFromData()
	defer .Release()

	 := make([]arrow.Array, .NumCols())
	// Release the output columns on return, on success and on error alike;
	// slots never filled stay nil and are skipped.
	defer func() {
		for ,  := range  {
			if  != nil {
				.Release()
			}
		}
	}()
	,  := errgroup.WithContext()
	.SetLimit(GetExecCtx().NumParallel)
	for ,  := range .Columns() {
		// Per-iteration copies of the loop variables for the goroutine closure.
		,  := , 
		.Go(func() error {
			,  := TakeArrayOpts(, , , kernels.TakeOptions{BoundsCheck: false})
			if  != nil {
				return 
			}
			[] = 
			return nil
		})
	}

	if  := .Wait();  != nil {
		return nil, 
	}

	// The row count comes from a Len() call whose receiver is not visible;
	// presumably the number of take indices — confirm.
	return array.NewRecord(.Schema(), , int64(.Len())), nil
}

// Filters a table with an array-like boolean datum.
//
// The table's row count must equal the filter's length (arrow.ErrInvalid
// otherwise), and a table with zero rows is rebuilt from its existing
// columns. For the general case the column chunks and the filter chunks are
// rechunked consistently; each filter chunk is converted to take indices and
// every column chunk is taken with them in parallel, capped at the exec
// context's NumParallel. The taken chunks are then assembled into columns and
// a new table. A filter that is neither an *ArrayDatum nor a *ChunkedDatum
// yields arrow.ErrNotImplemented.
//
// NOTE(review): the function name and local identifiers are missing from the
// text (it will not compile as shown). The "filter" meta function in this
// file calls FilterTable with this argument shape — confirm the name.
func ( context.Context,  arrow.Table,  Datum,  *FilterOptions) (arrow.Table, error) {
	if .NumRows() != .Len() {
		return nil, fmt.Errorf("%w: filter inputs must all be the same length", arrow.ErrInvalid)
	}

	// Zero-row shortcut: build the result table directly from the existing
	// columns, with a row count of 0.
	if .NumRows() == 0 {
		 := make([]arrow.Column, .NumCols())
		for  := 0;  < int(.NumCols()); ++ {
			[] = *.Column()
		}
		return array.NewTable(.Schema(), , 0), nil
	}

	// last input element will be the filter array
	 := .NumCols()
	 := make([][]arrow.Array, +1)
	for  := int64(0);  < ; ++ {
		[] = .Column(int()).Data().Chunks()
	}

	switch ft := .(type) {
	case *ArrayDatum:
		// The single chunk obtained from an array datum is released when
		// this function returns.
		[] = .Chunks()
		defer [][0].Release()
	case *ChunkedDatum:
		[] = .Chunks()
	default:
		return nil, fmt.Errorf("%w: filter should be array-like", arrow.ErrNotImplemented)
	}

	// rechunk inputs to allow consistent iteration over the respective chunks
	 = exec.RechunkArraysConsistently()

	// instead of filtering each column with the boolean filter
	// (which would be slow if the table has a large number of columns)
	// convert each filter chunk to indices and take() the column
	 := GetAllocator()
	 := make([][]arrow.Array, )
	// pre-size the output
	 := len([])
	for  := range  {
		[] = make([]arrow.Array, )
	}
	// Running total of index-chunk lengths; it is passed to NewTable as the
	// final argument at the end of the function.
	var  int64
	// A cancellable context is set up and cancelled on return.
	var  context.CancelFunc
	,  = context.WithCancel()
	defer ()

	,  := errgroup.WithContext()
	.SetLimit(GetExecCtx().NumParallel)

	// A single ArraySpan is reused to view each filter chunk in turn.
	var  exec.ArraySpan
	for ,  := range [] {
		.SetMembers(.Data())
		,  := kernels.GetTakeIndices(, &, .NullSelection)
		if  != nil {
			return nil, 
		}
		// NOTE(review): the receivers of the next two Release calls are not
		// visible; presumably the computed indices (deferred) and the filter
		// chunk (immediate) — confirm. Defers inside this loop only run when
		// the whole function returns.
		defer .Release()
		.Release()
		// Nothing selected by this chunk: release the matching chunk of
		// every column and move on to the next one.
		if .Len() == 0 {
			for  := int64(0);  < ; ++ {
				[][].Release()
			}
			continue
		}

		// take from all input columns
		 += int64(.Len())
		 := NewDatum()
		defer .Release()

		for  := int64(0);  < ; ++ {
			 := [][]
			defer .Release()
			// Per-iteration copies of the loop indices for the goroutine closure.
			 := 
			 := 
			.Go(func() error {
				 := NewDatum()
				defer .Release()
				,  := Take(, kernels.TakeOptions{BoundsCheck: false}, , )
				if  != nil {
					return 
				}
				defer .Release()
				[][] = .(*ArrayDatum).MakeArray()
				return nil
			})
		}
	}

	if  := .Wait();  != nil {
		return nil, 
	}

	// Assemble one column per input column from its taken chunks. Each
	// column is released on return and its chunked array immediately after
	// being wrapped.
	 := make([]arrow.Column, )
	for ,  := range  {
		 := arrow.NewChunked(.Column().DataType(), )
		[] = *arrow.NewColumn(.Schema().Field(), )
		defer [].Release()
		.Release()
	}

	return array.NewTable(.Schema(), , ), nil
}