package physicalplan

import (
	
	

	
	
	
	
	

	
	
)

type Limiter struct {
	pool   memory.Allocator
	tracer trace.Tracer
	next   PhysicalPlan

	count uint64
}

func ( memory.Allocator,  trace.Tracer,  logicalplan.Expr) (*Limiter, error) {
	,  := .(*logicalplan.LiteralExpr)
	if ! {
		return nil, fmt.Errorf("expected literal expression, got %T", )
	}

	var  uint64
	switch v := .Value.(type) {
	case *scalar.Uint64:
		 = .Value
	case *scalar.Int64:
		 = uint64(.Value)
	default:
		return nil, fmt.Errorf("expected limit count type, got %T", )
	}

	return &Limiter{
		pool:   ,
		tracer: ,
		count:  ,
	}, nil
}

func ( *Limiter) ( PhysicalPlan) { .next =  }

func ( *Limiter) ( context.Context) error { return .next.Finish() }

func ( *Limiter) () { .next.Close() }

func ( *Limiter) () *Diagram {
	var  *Diagram
	if .next != nil {
		 = .next.Draw()
	}
	 := fmt.Sprintf("Limit(%d)", .count)
	return &Diagram{Details: , Child: }
}

func ( *Limiter) ( context.Context,  arrow.Record) error {
	if .NumRows() == 0 {
		return .next.Callback(, )
	}
	if .count == 0 {
		 := array.NewRecord(.Schema(), nil, 0)
		return .next.Callback(, )
	}

	if uint64(.NumRows()) <= .count {
		return .next.Callback(, )
	}

	// TODO: We should figure out a way to avoid copying the record here.
	// Maybe we can use a different approach to limit the record.

	 := array.NewInt32Builder(.pool)
	defer .Release()

	for  := int32(0);  < int32(.count); ++ {
		.Append()
	}
	 := .NewInt32Array()
	defer .Release()

	,  := arrowutils.Take(, , )
	if  != nil {
		return 
	}

	if  := .next.Callback(, );  != nil {
		return 
	}

	return nil
}