package pqarrow

import (
	"context"
	"errors"
	"fmt"
	"io"
	"sort"
	"sync"

	// NOTE: exact module paths/versions are assumed; adjust to match go.mod.
	"github.com/apache/arrow/go/v14/arrow"
	"github.com/apache/arrow/go/v14/arrow/array"
	"github.com/apache/arrow/go/v14/arrow/memory"
	"github.com/parquet-go/parquet-go"

	"github.com/polarsignals/frostdb/dynparquet"
	"github.com/polarsignals/frostdb/pqarrow/builder"
	"github.com/polarsignals/frostdb/pqarrow/convert"
	"github.com/polarsignals/frostdb/pqarrow/writer"
	"github.com/polarsignals/frostdb/query/logicalplan"
)

// ParquetRowGroupToArrowSchema converts a parquet row group to an arrow schema.
func ParquetRowGroupToArrowSchema(ctx context.Context, rg parquet.RowGroup, s *dynparquet.Schema, options logicalplan.IterOptions) (*arrow.Schema, error) {
	return ParquetSchemaToArrowSchema(ctx, rg.Schema(), s, options)
}

func ParquetSchemaToArrowSchema(ctx context.Context, schema *parquet.Schema, s *dynparquet.Schema, options logicalplan.IterOptions) (*arrow.Schema, error) {
	parquetFields := schema.Fields()

	if len(options.DistinctColumns) == 1 && options.Filter == nil {
		// We can use the faster path for a single distinct column by just
		// returning its dictionary.
		fields := make([]arrow.Field, 0, 1)
		for _, parquetField := range parquetFields {
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			default:
				af, err := parquetFieldToArrowField("", parquetField, options.DistinctColumns)
				if err != nil {
					return nil, err
				}
				if af.Name != "" {
					fields = append(fields, af)
				}
			}
		}
		return arrow.NewSchema(fields, nil), nil
	}

	fields := make([]arrow.Field, 0, len(parquetFields))
	for _, parquetField := range parquetFields {
		af, err := parquetFieldToArrowField("", parquetField, options.PhysicalProjection)
		if err != nil {
			return nil, err
		}
		if af.Name != "" {
			fields = append(fields, af)
		}
	}

	if len(options.DistinctColumns) > 0 {
		for _, distinctExpr := range options.DistinctColumns {
			if distinctExpr.Computed() {
				// Usually we would pass the logical query plan as the data type
				// finder, but we're here because of an intended layering
				// violation, which is pushing distinct queries down to the scan
				// layer. In this case there are no other possible physical types
				// other than the actual schema, so we can just implement a
				// simplified version of the type finder that doesn't need to
				// traverse the logical plan, since this is already the physical
				// scan layer execution.
				dataType, err := distinctExpr.DataType(&exprTypeFinder{s: s})
				if err != nil {
					return nil, err
				}
				fields = append(fields, arrow.Field{
					Name:     distinctExpr.Name(),
					Type:     dataType,
					Nullable: true, // TODO: This should be determined by the expression and underlying column(s).
				})
			}
		}

		// Need to sort as the distinct columns are just appended, but we need
		// the schema to be sorted. If we didn't sort them here, then
		// subsequent schemas would be in a different order as
		// `mergeArrowSchemas` sorts fields by name.
		sort.Slice(fields, func(i, j int) bool {
			return fields[i].Name < fields[j].Name
		})
	}

	return arrow.NewSchema(fields, nil), nil
}

type exprTypeFinder struct {
	s *dynparquet.Schema
}

// DataTypeForExpr implements the type-finding interface that Expr.DataType
// expects by resolving the expression's type against the physical schema.
func (t *exprTypeFinder) DataTypeForExpr(expr logicalplan.Expr) (arrow.DataType, error) {
	return logicalplan.DataTypeForExprWithSchema(expr, t.s)
}

func parquetFieldToArrowField(prefix string, field parquet.Field, projections []logicalplan.Expr) (arrow.Field, error) {
	if includedProjection(projections, fullPath(prefix, field)) {
		af, err := convert.ParquetFieldToArrowField(field)
		if err != nil {
			return arrow.Field{}, err
		}
		return af, nil
	}

	if !field.Leaf() && includedPathProjection(projections, fullPath(prefix, field)) {
		children := []arrow.Field{}
		for _, child := range field.Fields() {
			af, err := parquetFieldToArrowField(fullPath(prefix, field), child, projections)
			if err != nil {
				return arrow.Field{}, err
			}
			if af.Name != "" {
				children = append(children, af)
			}
		}
		if len(children) > 0 {
			return arrow.Field{
				Name:     field.Name(),
				Type:     arrow.StructOf(children...),
				Nullable: field.Optional(),
			}, nil
		}
	}

	return arrow.Field{}, nil
}

func fullPath(prefix string, field parquet.Field) string {
	if prefix == "" {
		return field.Name()
	}
	return prefix + "." + field.Name()
}

func includedPathProjection(projections []logicalplan.Expr, path string) bool {
	if len(projections) == 0 {
		return true
	}

	for _, projection := range projections {
		if projection.MatchPath(path) {
			return true
		}
	}
	return false
}

func includedProjection(projections []logicalplan.Expr, column string) bool {
	if len(projections) == 0 {
		return true
	}

	for _, projection := range projections {
		if projection.MatchColumn(column) {
			return true
		}
	}
	return false
}

type parquetConverterMode int

const (
	// normal is the ParquetConverter's normal execution mode. No special
	// optimizations are applied.
	normal parquetConverterMode = iota
	// singleDistinctColumn is an execution mode when a single distinct column
	// is specified with no filter.
	singleDistinctColumn
	// multiDistinctColumn is an execution mode where there are multiple
	// distinct columns specified with no filter. Note that only "simple"
	// distinct expressions are supported in this mode (i.e. multiple columns
	// are not specified in the same distinct expression).
	multiDistinctColumn
)

// singleDistinctColumn is unused for now, see TODO in execution code.
var _ = singleDistinctColumn

// distinctColInfo stores metadata for a distinct expression.
type distinctColInfo struct {
	// parquetIndex is the index of the physical parquet column the distinct
	// expression reads from.
	parquetIndex int

	// v may be used in cases to store a literal expression value.
	v *parquet.Value

	// w and b are fields that the output is written to.
	w writer.ValueWriter
	b builder.ColumnBuilder
}

// ParquetConverter converts parquet.RowGroups into arrow.Records. The converted
// results are accumulated in the converter and can be retrieved by calling
// NewRecord, at which point the converter is reset.
type ParquetConverter struct {
	mode parquetConverterMode

	pool memory.Allocator
	// iterOpts.DistinctColumns and distinctColInfos have a 1:1 mapping.
	distinctColInfos []*distinctColInfo

	// Output fields, for each outputSchema.Field(i) there will always be a
	// corresponding builder.Field(i).
	outputSchema *arrow.Schema
	iterOpts     logicalplan.IterOptions
	builder      *builder.RecordBuilder

	// writers are wrappers over a subset of builder.Fields().
	writers []MultiColumnWriter

	// prevSchema is stored to check for a different parquet schema on each
	// Convert call. This avoids performing duplicate work (e.g. finding
	// distinct column indices).
	prevSchema *parquet.Schema

	// scratchValues is an array of parquet.Values that is reused during
	// decoding to avoid allocations.
	scratchValues []parquet.Value
}

func NewParquetConverter(
	pool memory.Allocator,
	iterOpts logicalplan.IterOptions,
) *ParquetConverter {
	c := &ParquetConverter{
		mode:             normal,
		pool:             pool,
		iterOpts:         iterOpts,
		distinctColInfos: make([]*distinctColInfo, len(iterOpts.DistinctColumns)),
	}

	if iterOpts.Filter == nil && len(iterOpts.DistinctColumns) != 0 {
		multiColumnDistinct := true
		for _, distinctExpr := range iterOpts.DistinctColumns {
			if _, ok := distinctExpr.(*logicalplan.DynamicColumn); ok ||
				len(distinctExpr.ColumnsUsedExprs()) != 1 {
				multiColumnDistinct = false
				break
			}
		}
		if multiColumnDistinct {
			// TODO(asubiotto): Note that the singleDistinctColumn mode is not
			// used yet given a bug in the current optimization (it was never
			// executed).
			c.mode = multiDistinctColumn
		}
	}

	return c
}
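
// Typical usage of the converter (an illustrative sketch; ctx, rowGroups, and
// dynSchema are placeholders assumed to come from the caller):
//
//	converter := NewParquetConverter(memory.NewGoAllocator(), logicalplan.IterOptions{})
//	defer converter.Close()
//	for _, rg := range rowGroups {
//		if err := converter.Convert(ctx, rg, dynSchema); err != nil {
//			return err
//		}
//	}
//	record := converter.NewRecord()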

func (c *ParquetConverter) Convert(ctx context.Context, rg parquet.RowGroup, dynSchema *dynparquet.Schema) error {
	schema, err := ParquetRowGroupToArrowSchema(ctx, rg, dynSchema, c.iterOpts)
	if err != nil {
		return err
	}
	// If the schema has no fields we simply ignore this RowGroup that has no data.
	if schema.NumFields() == 0 {
		return nil
	}

	if c.outputSchema == nil {
		c.outputSchema = schema
		c.builder = builder.NewRecordBuilder(c.pool, c.outputSchema)
	} else if !schema.Equal(c.outputSchema) { // new output schema; append new fields onto record builder
		c.outputSchema = mergeArrowSchemas([]*arrow.Schema{c.outputSchema, schema})
		c.builder.ExpandSchema(c.outputSchema)
		// Since we expanded the schema we need to append nulls to the new
		// fields to match the maximum column length.
		if maxLen, _, anomaly := recordBuilderLength(c.builder); anomaly {
			for _, field := range c.builder.Fields() {
				if fieldLen := field.Len(); fieldLen < maxLen {
					if optBuilder, ok := field.(builder.OptimizedBuilder); ok {
						optBuilder.AppendNulls(maxLen - fieldLen)
						continue
					}
					// If the column is shorter than the maximum length column,
					// append NULLs until it reaches that length.
					// TODO: Is there a faster or better way?
					for i := 0; i < maxLen-fieldLen; i++ {
						field.AppendNull()
					}
				}
			}
		}
	}

	if mergedRowGroup, ok := rg.(*dynparquet.MergedRowGroup); ok {
		return rowBasedParquetRowGroupToArrowRecord(ctx, mergedRowGroup, c.outputSchema, c.builder)
	}

	parquetSchema := rg.Schema()
	parquetColumns := rg.ColumnChunks()
	parquetFields := parquetSchema.Fields()

	if !parquetSchemaEqual(c.prevSchema, parquetSchema) {
		if err := c.schemaChanged(parquetFields); err != nil {
			return err
		}
		c.prevSchema = parquetSchema
	}

	if c.mode == multiDistinctColumn {
		// Since we're not filtering, we can use a faster path for distinct
		// columns. If all the distinct columns are dictionary encoded, we can
		// check their dictionaries and if all of them have a single value, we
		// can just return a single row with each of their values.
		written, err := c.writeDistinctAllColumns(
			ctx,
			parquetFields,
			parquetColumns,
		)
		if err != nil {
			return err
		}

		if written {
			return nil
		}
		// If we get here, we couldn't use the fast path.
	}

	for _, w := range c.writers {
		for _, columnIndex := range w.colIdx {
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
				if err := c.writeColumnToArray(
					parquetFields[w.fieldIdx],
					parquetColumns[columnIndex],
					false,
					w.writer,
				); err != nil {
					return fmt.Errorf("convert parquet column to arrow array: %w", err)
				}
			}
		}
	}

	maxLen, _, anomaly := recordBuilderLength(c.builder)
	if !anomaly {
		return nil
	}

	for _, field := range c.builder.Fields() {
		if fieldLen := field.Len(); fieldLen < maxLen {
			if optBuilder, ok := field.(builder.OptimizedBuilder); ok {
				optBuilder.AppendNulls(maxLen - fieldLen)
				continue
			}
			// If the column is shorter than the maximum length column,
			// append NULLs until it reaches that length.
			// TODO: Is there a faster or better way?
			for i := 0; i < maxLen-fieldLen; i++ {
				field.AppendNull()
			}
		}
	}

	return nil
}

// Fields returns the column builders accumulated by the converter so far.
func (c *ParquetConverter) Fields() []builder.ColumnBuilder {
	if c.builder == nil {
		return nil
	}
	return c.builder.Fields()
}

// NumRows returns the number of rows currently accumulated in the converter.
func (c *ParquetConverter) NumRows() int {
	// NumRows assumes all fields have the same length. If not, this is a bug.
	return c.builder.Field(0).Len()
}

// NewRecord returns an arrow.Record of the accumulated data and resets the
// converter.
func (c *ParquetConverter) NewRecord() arrow.Record {
	if c.builder != nil {
		return c.builder.NewRecord()
	}

	return nil
}

// Reset clears the accumulated data so the converter can be reused.
func (c *ParquetConverter) Reset() {
	if c.builder != nil {
		c.builder.Reset()
	}
}

// Close releases the resources held by the converter.
func (c *ParquetConverter) Close() {
	if c.builder != nil {
		c.builder.Release()
	}
}

// numLeaves returns the number of leaf columns under the given parquet field.
// For example, a group field with children {a, b, {c, d}} has four leaves.
func numLeaves(field parquet.Field) int {
	if field.Leaf() {
		return 1
	}

	count := 0
	for _, child := range field.Fields() {
		switch child.Leaf() {
		case true:
			count++
		default:
			count += numLeaves(child)
		}
	}

	return count
}

// schemaChanged is called when a rowgroup to convert has a different schema
// than previously seen. This causes a recalculation of helper fields.
func (c *ParquetConverter) schemaChanged(fields []parquet.Field) error {
	c.writers = c.writers[:0]
	valueWriters := make(map[int]writer.ValueWriter)
	columnIndex := 0
	for i, field := range fields {
		indices := c.outputSchema.FieldIndices(field.Name())
		if len(indices) == 0 {
			// This column can be skipped, it's not needed by the output.
			columnIndex += numLeaves(field)
			continue
		}

		newValueWriter, err := convert.GetWriter(indices[0], field)
		if err != nil {
			return err
		}
		valueWriter := newValueWriter(c.builder.Field(indices[0]), 0)
		columnIndices := make([]int, numLeaves(field))
		for j := range columnIndices {
			columnIndices[j] = columnIndex
			columnIndex++
		}
		c.writers = append(c.writers, MultiColumnWriter{
			writer:   valueWriter,
			fieldIdx: i,
			colIdx:   columnIndices,
		})
		valueWriters[i] = valueWriter
	}

	if c.mode != multiDistinctColumn {
		return nil
	}

	// For distinct columns, we need to iterate to find the physical parquet
	// column to read from. Note that a sanity check has already been completed
	// in the constructor to ensure that only one column per distinct expression
	// is read in multiDistinctColumn mode.
	for i := range c.iterOpts.DistinctColumns {
		c.distinctColInfos[i] = nil
	}

	for i, distinctExpr := range c.iterOpts.DistinctColumns {
		for j, field := range fields {
			if !distinctExpr.ColumnsUsedExprs()[0].MatchColumn(field.Name()) {
				continue
			}

			c.distinctColInfos[i] = &distinctColInfo{
				parquetIndex: j,
				w:            valueWriters[j],
				b:            c.builder.Field(c.outputSchema.FieldIndices(field.Name())[0]),
			}
		}
	}
	return nil
}

func (c *ParquetConverter) writeDistinctAllColumns(
	ctx context.Context,
	fields []parquet.Field,
	columnChunks []parquet.ColumnChunk,
) (bool, error) {
	initialLength := c.NumRows()

	for i, info := range c.distinctColInfos {
		if info == nil {
			// The parquet field the distinct expression operates on was not
			// found in this row group, skip it.
			continue
		}
		select {
		case <-ctx.Done():
			return false, ctx.Err()
		default:
			written, err := c.writeDistinctSingleColumn(
				fields[info.parquetIndex],
				columnChunks[info.parquetIndex],
				c.iterOpts.DistinctColumns[i],
				info,
			)
			if err != nil {
				return false, err
			}

			if !written {
				// Builder must be reset to its initial length in case other
				// columns were processed before this one.
				for _, field := range c.builder.Fields() {
					if field.Len() == initialLength {
						continue
					}
					resetBuilderToLength(field, initialLength)
				}
				return false, nil
			}
		}
	}

	anyWrites := false
	for _, field := range c.builder.Fields() {
		if field.Len() > initialLength {
			anyWrites = true
			break
		}
	}
	if !anyWrites {
		// Exit early if no rows were written.
		return true, nil
	}

	maxLength := 0
	columnsWithMultipleValues := 0
	for _, field := range c.builder.Fields() {
		fieldLen := field.Len()
		if fieldLen > initialLength+1 {
			columnsWithMultipleValues++
			if columnsWithMultipleValues > 1 {
				break
			}
		}
		if fieldLen > maxLength {
			maxLength = fieldLen
		}
	}

	if columnsWithMultipleValues > 1 {
		// More than one column had more than one distinct value. This means
		// that there is no way to know the correct number of distinct values,
		// so we must fall back to the non-optimized path.
		for _, field := range c.builder.Fields() {
			if field.Len() == initialLength {
				continue
			}
			resetBuilderToLength(field, initialLength)
		}
		return false, nil
	}

	// At this point we know there is at most one column with more than one
	// row. Therefore we can repeat the values of the other columns as those
	// are the only possible combinations within the rowgroup.
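	// For example, if one column ends up with values ["a"], another with
	// [1, 2, 3], and a third with no new values at all, the first column
	// repeats "a" until it has three rows and the third is backfilled with
	// nulls.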

	for _, field := range c.builder.Fields() {
		// Columns that had no values are just backfilled with null values.
		if fieldLen := field.Len(); fieldLen == initialLength {
			if optBuilder, ok := field.(builder.OptimizedBuilder); ok {
				optBuilder.AppendNulls(maxLength - fieldLen)
				continue
			}
			for i := fieldLen; i < maxLength; i++ {
				field.AppendNull()
			}
		} else if fieldLen < maxLength {
			count := maxLength - fieldLen
			if optBuilder, ok := field.(builder.OptimizedBuilder); ok {
				if err := optBuilder.RepeatLastValue(count); err != nil {
					return false, err
				}
				continue
			}
			arr := field.NewArray()
			// TODO(asubiotto): NewArray resets the builder, copy all the values
			// again. There *must* be a better way to do this.
			copyArrToBuilder(field, arr, fieldLen)
			repeatLastValue(field, arr, count)
			arr.Release()
		}
	}
	return true, nil
}

// writeDistinctSingleColumn checks if the distinct expression can be optimized
// at the scan level and returns whether the optimization was successful or not.
// Writer and builder point to the same memory and are both passed in for
// convenience (TODO(asubiotto): This should be cleaned up by having
// binaryDistinctExpr write to a writer instead of a builder or extending the
// writer interface).
func (c *ParquetConverter) writeDistinctSingleColumn(
	node parquet.Node,
	columnChunk parquet.ColumnChunk,
	distinctExpr logicalplan.Expr,
	info *distinctColInfo,
) (bool, error) {
	switch expr := distinctExpr.(type) {
	case *logicalplan.BinaryExpr:
		return binaryDistinctExpr(
			node.Type(),
			columnChunk,
			expr,
			info,
		)
	case *logicalplan.Column:
		if err := c.writeColumnToArray(
			node,
			columnChunk,
			true,
			info.w,
		); err != nil {
			return false, err
		}
		return true, nil
	default:
		return false, nil
	}
}

var rowBufPool = &sync.Pool{
	New: func() interface{} {
		return make([]parquet.Row, 64) // Random guess.
	},
}

// rowBasedParquetRowGroupToArrowRecord converts a parquet row group to an arrow
// record row by row. The result is appended to b.
func rowBasedParquetRowGroupToArrowRecord(
	ctx context.Context,
	rg parquet.RowGroup,
	schema *arrow.Schema,
	b *builder.RecordBuilder,
) error {
	parquetFields := rg.Schema().Fields()

	if schema.NumFields() != len(parquetFields) {
		return fmt.Errorf("inconsistent schema between arrow and parquet")
	}

	// Create arrow writers from arrow and parquet schema
	writers := make([]writer.ValueWriter, len(parquetFields))
	for i, field := range b.Fields() {
		newValueWriter, err := convert.GetWriter(i, parquetFields[i])
		if err != nil {
			return err
		}
		writers[i] = newValueWriter(field, 0)
	}

	rows := rg.Rows()
	defer rows.Close()
	rowBuf := rowBufPool.Get().([]parquet.Row)
	//nolint:staticcheck
	defer rowBufPool.Put(rowBuf[:cap(rowBuf)])

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		rowBuf = rowBuf[:cap(rowBuf)]
		n, err := rows.ReadRows(rowBuf)
		if err == io.EOF && n == 0 {
			break
		}
		if err != nil && err != io.EOF {
			return fmt.Errorf("read row: %w", err)
		}
		rowBuf = rowBuf[:n]

		for i, w := range writers {
			for _, row := range rowBuf {
				values := dynparquet.ValuesForIndex(row, i)
				w.Write(values)
			}
		}
		if err == io.EOF {
			break
		}
	}

	return nil
}

// writeColumnToArray writes the values of a single parquet column to an arrow
// array. It will attempt to take shortcuts, if possible, to avoid reading the
// whole column. Possibilities why it might not read the whole column:
//
// * If it has been requested to only read the dictionary it will only do that
// (provided it's not a repeated type). Additionally, decompression of all pages
// is avoided if the column index indicates that there is only one value in the
// column.
//
// If the type is a repeated type it will also write the starting offsets of
// lists to the list builder.
func (c *ParquetConverter) writeColumnToArray(
	n parquet.Node,
	columnChunk parquet.ColumnChunk,
	dictionaryOnly bool,
	w writer.ValueWriter,
) error {
	repeated := n.Repeated()
	if !repeated && dictionaryOnly {
		// Check all the page indexes of the column chunk. If they are
		// trustworthy and there is only one value contained in the column
		// chunk, we can avoid reading any pages and construct a dictionary from
		// the index values.
		// TODO(asubiotto): This optimization can be applied at a finer
		// granularity at the page level as well.
		columnIndex, err := columnChunk.ColumnIndex()
		if err != nil {
			return err
		}
		columnType := columnChunk.Type()

		globalMinValue := columnIndex.MinValue(0)
		fallbackToSlowPath := false
		for pageIdx := 0; pageIdx < columnIndex.NumPages(); pageIdx++ {
			if columnIndex.NullCount(pageIdx) > 0 {
				// NULLs are not represented in the column index, so fall back
				// to the non-optimized path.
				// TODO(asubiotto): This is unexpected, verify upstream.
				fallbackToSlowPath = true
				break
			}

			minValue := globalMinValue
			if pageIdx != 0 {
				// MinValue/MaxValue are relatively expensive calls, so we avoid
				// them as much as possible.
				minValue = columnIndex.MinValue(pageIdx)
			}
			maxValue := columnIndex.MaxValue(pageIdx)
			if columnType.Length() == 0 {
				// Variable-length datatype. The index can only be trusted if
				// the size of the values is less than the column index size,
				// since we cannot otherwise know if the index values are
				// truncated.
				if len(minValue.Bytes()) >= dynparquet.ColumnIndexSize ||
					len(maxValue.Bytes()) >= dynparquet.ColumnIndexSize {
					fallbackToSlowPath = true
					break
				}
			}

			if columnType.Compare(minValue, maxValue) != 0 ||
				(pageIdx != 0 && columnType.Compare(globalMinValue, minValue) != 0) {
				fallbackToSlowPath = true
				break
			}
		}

		if !fallbackToSlowPath {
			w.Write([]parquet.Value{globalMinValue})
			return nil
		}
	}

	pages := columnChunk.Pages()
	defer pages.Close()
	for {
		p, err := pages.ReadPage()
		if err != nil {
			if err == io.EOF {
				break
			}
			return fmt.Errorf("read page: %w", )
		}

		dict := p.Dictionary()
		if dict != nil && dictionaryOnly {
			// We only want distinct values; write only the dictionary page.
			if p.NumNulls() > 0 {
				// Since dictionary pages do not represent nulls, write a null
				// value if the non-dictionary page has at least one null.
				w.Write([]parquet.Value{parquet.NullValue()})
			}
			p = dict.Page()
		}

		if pageWriter, ok := w.(writer.PageWriter); ok {
			err := pageWriter.WritePage(p)
			if err == nil {
				continue
			} else if !errors.Is(err, writer.ErrCannotWritePageDirectly) {
				return fmt.Errorf("write page: %w", err)
			}
			// Could not write page directly, fall through to slow path.
		}

		// Write values using the slow path.
		numValues := p.NumValues()
		if int64(cap(c.scratchValues)) < numValues {
			c.scratchValues = make([]parquet.Value, numValues)
		}
		c.scratchValues = c.scratchValues[:numValues]

		// We're reading all values in the page so we always expect an io.EOF.
		reader := p.Values()
		if _, err := reader.ReadValues(c.scratchValues); err != nil && err != io.EOF {
			return fmt.Errorf("read values: %w", err)
		}

		w.Write(c.scratchValues)
	}

	return nil
}

// SingleMatchingColumn returns true if there is only a single matching column for the given column matchers.
func SingleMatchingColumn(distinctColumns []logicalplan.Expr, fields []parquet.Field) bool {
	count := 0
	for _, col := range distinctColumns {
		for _, field := range fields {
			name := field.Name()
			if col.MatchColumn(name) {
				count++
				if count > 1 {
					return false
				}
			}
		}
	}

	return count == 1
}

// recordBuilderLength returns the maximum length of all of the
// array.RecordBuilder's fields, the number of columns that have this maximum
// length, and a boolean for convenience that is true when this last number
// differs from the number of fields in the RecordBuilder (i.e. there is an
// anomaly in the length of some field).
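// For example, builder fields with lengths [3, 3, 1] yield (3, 2, true).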
func recordBuilderLength(rb *builder.RecordBuilder) (maxLength, numFieldsAtMaxLength int, anomaly bool) {
	fields := rb.Fields()
	maxLength = fields[0].Len()
	numFieldsAtMaxLength = 0
	for _, field := range fields {
		if fieldLen := field.Len(); fieldLen != maxLength {
			if fieldLen > maxLength {
				numFieldsAtMaxLength = 1
				maxLength = fieldLen
			}
		} else {
			numFieldsAtMaxLength++
		}
	}
	return maxLength, numFieldsAtMaxLength, numFieldsAtMaxLength != len(rb.Fields())
}

// parquetSchemaEqual returns whether the two input schemas are equal. For now,
// only the field names are checked. In the future, it might be good to flesh
// out this check and commit it upstream.
func parquetSchemaEqual(schema1, schema2 *parquet.Schema) bool {
	switch {
	case schema1 == schema2:
		return true
	case schema1 == nil || schema2 == nil:
		return false
	case len(schema1.Fields()) != len(schema2.Fields()):
		return false
	}

	fields1 := schema1.Fields()
	fields2 := schema2.Fields()

	for i := range fields1 {
		if fields1[i].Name() != fields2[i].Name() {
			return false
		}
	}

	return true
}

type PreExprVisitorFunc func(expr logicalplan.Expr) bool

func (f PreExprVisitorFunc) PreVisit(expr logicalplan.Expr) bool {
	return f(expr)
}

func (f PreExprVisitorFunc) Visit(_ logicalplan.Expr) bool {
	return false
}

func (f PreExprVisitorFunc) PostVisit(_ logicalplan.Expr) bool {
	return false
}

// binaryDistinctExpr checks the columnChunk's column index to see if the
// expression can be evaluated on the index without reading the page values.
// Returns whether the optimization was successful or not.
func binaryDistinctExpr(
	typ parquet.Type,
	columnChunk parquet.ColumnChunk,
	expr *logicalplan.BinaryExpr,
	info *distinctColInfo,
) (bool, error) {
	if info.v == nil {
		var (
			value parquet.Value
			err   error
		)
		expr.Right.Accept(PreExprVisitorFunc(func(expr logicalplan.Expr) bool {
			switch e := expr.(type) {
			case *logicalplan.LiteralExpr:
				value, err = ArrowScalarToParquetValue(e.Value)
				return false
			}
			return true
		}))
		if err != nil {
			return false, err
		}
		info.v = &value
	}

	value := *info.v
	switch expr.Op {
	case logicalplan.OpGt:
		index, err := columnChunk.ColumnIndex()
		if err != nil {
			return false, err
		}
		allGreater, noneGreater := allOrNoneGreaterThan(
			typ,
			index,
			value,
		)

		if allGreater || noneGreater {
			b := info.b.(*builder.OptBooleanBuilder)
			if allGreater {
				b.AppendParquetValues([]parquet.Value{parquet.ValueOf(true)})
			}
			if noneGreater {
				b.AppendParquetValues([]parquet.Value{parquet.ValueOf(false)})
			}
			return true, nil
		}
	default:
		return false, nil
	}

	return false, nil
}

// allOrNoneGreaterThan returns two booleans: whether all values covered by the
// column index are greater than value, and whether none of them are.
func allOrNoneGreaterThan(
	typ parquet.Type,
	index parquet.ColumnIndex,
	value parquet.Value,
) (bool, bool) {
	numPages := index.NumPages()
	allTrue := true
	allFalse := true
	for i := 0; i < numPages; i++ {
		minValue := index.MinValue(i)
		maxValue := index.MaxValue(i)

		if typ.Compare(maxValue, value) <= 0 {
			allTrue = false
		}

		if typ.Compare(minValue, value) > 0 {
			allFalse = false
		}
	}

	return allTrue, allFalse
}

// resetBuilderToLength resets the builder to the given length, it is logically
// equivalent to b = b[0:l]. It is unfortunately pretty expensive, since there
// is currently no way to recreate a builder from a sliced array.
func resetBuilderToLength(cb builder.ColumnBuilder, l int) {
	if optBuilder, ok := cb.(builder.OptimizedBuilder); ok {
		optBuilder.ResetToLength(l)
		return
	}
	arr := cb.NewArray()
	copyArrToBuilder(cb, arr, l)
	arr.Release()
}

func copyArrToBuilder(cb builder.ColumnBuilder, arr arrow.Array, toCopy int) {
	// TODO(asubiotto): Is there a better way to do this in the arrow
	// library? Maybe by copying buffers over, but I'm not sure if it's
	// cheaper to convert the byte slices to valid slices/offsets.
	// In any case, we should probably move this to a utils file.
	// One other idea is to create a thin layer on top of a builder that
	// only flushes writes when told to (will help with all these
	// optimizations where we aren't sure we can apply them until the end).
	cb.Reserve(toCopy)
	switch arr := arr.(type) {
	case *array.Boolean:
		b := cb.(*array.BooleanBuilder)
		for i := 0; i < toCopy; i++ {
			if arr.IsNull(i) {
				b.UnsafeAppendBoolToBitmap(false)
			} else {
				b.UnsafeAppend(arr.Value(i))
			}
		}
	case *array.Binary:
		b := cb.(*array.BinaryBuilder)
		for i := 0; i < toCopy; i++ {
			if arr.IsNull(i) {
				// We cannot use unsafe appends with the binary builder
				// because offsets won't be appended.
				b.AppendNull()
			} else {
				b.Append(arr.Value(i))
			}
		}
	case *array.String:
		b := cb.(*array.BinaryBuilder)
		for i := 0; i < toCopy; i++ {
			if arr.IsNull(i) {
				// We cannot use unsafe appends with the binary builder
				// because offsets won't be appended.
				b.AppendNull()
			} else {
				b.AppendString(arr.Value(i))
			}
		}
	case *array.Int64:
		b := cb.(*array.Int64Builder)
		for i := 0; i < toCopy; i++ {
			if arr.IsNull(i) {
				b.UnsafeAppendBoolToBitmap(false)
			} else {
				b.UnsafeAppend(arr.Value(i))
			}
		}
	case *array.Uint64:
		b := cb.(*array.Uint64Builder)
		for i := 0; i < toCopy; i++ {
			if arr.IsNull(i) {
				b.UnsafeAppendBoolToBitmap(false)
			} else {
				b.UnsafeAppend(arr.Value(i))
			}
		}
	case *array.Float64:
		b := cb.(*array.Float64Builder)
		for i := 0; i < toCopy; i++ {
			if arr.IsNull(i) {
				b.UnsafeAppendBoolToBitmap(false)
			} else {
				b.UnsafeAppend(arr.Value(i))
			}
		}
	case *array.Dictionary:
		b := cb.(*array.BinaryDictionaryBuilder)
		switch dict := arr.Dictionary().(type) {
		case *array.Binary:
			for i := 0; i < toCopy; i++ {
				if arr.IsNull(i) {
					b.AppendNull()
				} else {
					if err := b.Append(dict.Value(arr.GetValueIndex(i))); err != nil {
						panic("failed to append to dictionary")
					}
				}
			}
		case *array.String:
			for i := 0; i < toCopy; i++ {
				if arr.IsNull(i) {
					b.AppendNull()
				} else {
					if err := b.AppendString(dict.Value(arr.GetValueIndex(i))); err != nil {
						panic("failed to append to dictionary")
					}
				}
			}
		default:
			panic(fmt.Sprintf("unsupported dictionary type: %T", dict))
		}
	default:
		panic(fmt.Sprintf("unsupported array type: %T", ))
	}
}

// repeatLastValue repeats arr's last value count times and writes it to
// builder.
func repeatLastValue(
	b builder.ColumnBuilder,
	arr arrow.Array,
	count int,
) {
	switch arr := arr.(type) {
	case *array.Boolean:
		repeatBooleanArray(b.(*array.BooleanBuilder), arr, count)
	case *array.Binary:
		repeatBinaryArray(b.(*array.BinaryBuilder), arr, count)
	case *array.Int64:
		repeatInt64Array(b.(*array.Int64Builder), arr, count)
	case *array.Uint64:
		repeatUint64Array(b.(*array.Uint64Builder), arr, count)
	case *array.Float64:
		repeatFloat64Array(b.(*array.Float64Builder), arr, count)
	case *array.Dictionary:
		repeatDictionaryArray(b.(array.DictionaryBuilder), arr, count)
	default:
		panic(fmt.Sprintf("unsupported array type: %T", arr))
	}
}

func repeatBooleanArray(
	b *array.BooleanBuilder,
	arr *array.Boolean,
	count int,
) {
	val := arr.Value(arr.Len() - 1)
	vals := make([]bool, count)
	for i := 0; i < count; i++ {
		vals[i] = val
	}
	// TODO(asubiotto): are we ignoring a possible null?
	b.AppendValues(vals, nil)
}

func repeatBinaryArray(
	b *array.BinaryBuilder,
	arr *array.Binary,
	count int,
) {
	val := arr.Value(arr.Len() - 1)
	vals := make([][]byte, count)
	for i := 0; i < count; i++ {
		vals[i] = val
	}
	b.AppendValues(vals, nil)
}

func repeatInt64Array(
	b *array.Int64Builder,
	arr *array.Int64,
	count int,
) {
	val := arr.Value(arr.Len() - 1)
	vals := make([]int64, count)
	for i := 0; i < count; i++ {
		vals[i] = val
	}
	b.AppendValues(vals, nil)
}

func repeatUint64Array(
	b *array.Uint64Builder,
	arr *array.Uint64,
	count int,
) {
	val := arr.Value(arr.Len() - 1)
	vals := make([]uint64, count)
	for i := 0; i < count; i++ {
		vals[i] = val
	}
	b.AppendValues(vals, nil)
}

func repeatFloat64Array(
	b *array.Float64Builder,
	arr *array.Float64,
	count int,
) {
	val := arr.Value(arr.Len() - 1)
	vals := make([]float64, count)
	for i := 0; i < count; i++ {
		vals[i] = val
	}
	b.AppendValues(vals, nil)
}

func repeatDictionaryArray(b array.DictionaryBuilder, arr *array.Dictionary, count int) {
	switch db := b.(type) {
	case *array.BinaryDictionaryBuilder:
		switch dict := arr.Dictionary().(type) {
		case *array.Binary:
			if arr.IsNull(arr.Len() - 1) {
				for i := 0; i < count; i++ {
					db.AppendNull()
				}
			} else {
				val := dict.Value(arr.GetValueIndex(arr.Len() - 1))
				for i := 0; i < count; i++ {
					if err := db.Append(val); err != nil {
						panic("failed to append value to dict")
					}
				}
			}
		default:
			panic(fmt.Sprintf("unsupported dictionary array for builder %T", db))
		}
	default:
		panic(fmt.Sprintf("unsupported dictionary builder %T", db))
	}
}

// mergeArrowSchemas merges the given schemas into a single schema containing
// the union of their fields, sorted by field name.
func mergeArrowSchemas(schemas []*arrow.Schema) *arrow.Schema {
	fieldNames := make([]string, 0, 16)
	fieldsMap := make(map[string]arrow.Field)

	for _, schema := range schemas {
		for i := 0; i < schema.NumFields(); i++ {
			field := schema.Field(i)
			if _, ok := fieldsMap[field.Name]; !ok {
				fieldNames = append(fieldNames, field.Name)
				fieldsMap[field.Name] = field
			}
		}
	}

	sort.Strings(fieldNames)

	fields := make([]arrow.Field, 0, len(fieldNames))
	for _, name := range fieldNames {
		fields = append(fields, fieldsMap[name])
	}

	return arrow.NewSchema(fields, nil)
}

type MultiColumnWriter struct {
	writer   writer.ValueWriter
	fieldIdx int
	colIdx   []int
}

// writerForColumn returns the writer responsible for the given leaf column
// index, or nil if no writer matches.
func writerForColumn(colIdx int, writers []MultiColumnWriter) writer.ValueWriter {
	for _, w := range writers {
		for _, idx := range w.colIdx {
			if idx == colIdx {
				return w.writer
			}
		}
	}

	return nil
}

// Project will project the record according to the given projections.
func Project(r arrow.Record, projections []logicalplan.Expr) arrow.Record {
	if len(projections) == 0 {
		r.Retain() // NOTE: we're creating another reference to this record, so retain it
		return r
	}

	columns := make([]arrow.Array, 0, r.Schema().NumFields())
	fields := make([]arrow.Field, 0, r.Schema().NumFields())
	for i := 0; i < r.Schema().NumFields(); i++ {
		for _, projection := range projections {
			if projection.MatchColumn(r.Schema().Field(i).Name) {
				columns = append(columns, r.Column(i))
				fields = append(fields, r.Schema().Field(i))
				break
			}
		}
	}

	// If the projection matches the entire record, return the record as is.
	if len(fields) == r.Schema().NumFields() {
		r.Retain() // NOTE: we're creating another reference to this record, so retain it
		return r
	}

	return array.NewRecord(arrow.NewSchema(fields, nil), columns, r.NumRows())
}