package physicalplan

import (
	
	
	
	
	

	
	
	
	
	

	
	
	
)

type Distinction struct {
	pool     memory.Allocator
	tracer   trace.Tracer
	next     PhysicalPlan
	columns  []logicalplan.Expr
	hashSeed maphash.Seed

	mtx  *sync.RWMutex
	seen map[uint64]struct{}
}

func ( *Distinction) () *Diagram {
	var  *Diagram
	if .next != nil {
		 = .next.Draw()
	}

	var  []string
	for ,  := range .columns {
		 = append(, .Name())
	}

	return &Diagram{Details: fmt.Sprintf("Distinction (%s)", strings.Join(, ",")), Child: }
}

func ( memory.Allocator,  trace.Tracer,  []logicalplan.Expr) *Distinction {
	return &Distinction{
		pool:     ,
		tracer:   ,
		columns:  ,
		hashSeed: maphash.MakeSeed(),

		mtx:  &sync.RWMutex{},
		seen: make(map[uint64]struct{}),
	}
}

func ( *Distinction) ( PhysicalPlan) {
	.next = 
}

func ( *Distinction) ( context.Context) error {
	return .next.Finish()
}

func ( *Distinction) () {
	.next.Close()
}

func ( *Distinction) ( context.Context,  arrow.Record) error {
	// Generates high volume of spans. Comment out if needed during development.
	// ctx, span := d.tracer.Start(ctx, "Distinction/Callback")
	// defer span.End()

	 := make([]arrow.Field, 0, 10)
	 := make([]uint64, 0, 10)
	 := make([]arrow.Array, 0, 10)

	for  := 0;  < .Schema().NumFields(); ++ {
		 := .Schema().Field()
		for ,  := range .columns {
			if .MatchColumn(.Name) {
				 = append(, )
				 = append(, scalar.Hash(.hashSeed, scalar.NewStringScalar(.Name)))
				 = append(, .Column())
			}
		}
	}

	 := make([]builder.ColumnBuilder, 0, len())
	defer func() {
		for ,  := range  {
			.Release()
		}
	}()
	for ,  := range  {
		 = append(, builder.NewBuilder(.pool, .DataType()))
	}
	 := int64(0)

	 := int(.NumRows())

	 := make([][]uint64, len())
	for ,  := range  {
		[] = dynparquet.HashArray()
	}

	for  := 0;  < ; ++ {
		 := uint64(0)
		for  := range  {
			if [][] == 0 {
				continue
			}

			 = hashCombine(
				,
				hashCombine(
					[],
					[][],
				),
			)
		}

		.mtx.RLock()
		if ,  := .seen[];  {
			.mtx.RUnlock()
			continue
		}
		.mtx.RUnlock()

		for ,  := range  {
			 := builder.AppendValue([], , )
			if  != nil {
				return 
			}
		}

		++
		.mtx.Lock()
		.seen[] = struct{}{}
		.mtx.Unlock()
	}

	if  == 0 {
		// No need to call anything further down the chain, no new values were
		// seen so we can skip.
		return nil
	}

	 := make([]arrow.Array, 0, len())
	defer func() {
		for ,  := range  {
			.Release()
		}
	}()
	for ,  := range  {
		 = append(, .NewArray())
	}

	 := arrow.NewSchema(, nil)

	 := array.NewRecord(
		,
		,
		,
	)

	defer .Release()
	return .next.Callback(, )
}