package physicalplan

import (
	
	
	
	

	
	
	

	
	
	
)

// OrderedSynchronizer implements synchronizing ordered input from multiple
// goroutines. The strategy used is that any input that calls Callback must wait
// for all the other inputs to call Callback, since an ordered result cannot
// be produced until all inputs have pushed data. Another strategy would be to
// store the pushed records, but that requires fully copying all the data for
// safety.
//
// The merged record is handed to the next plan while the internal mutex is
// held, so downstream Callback invocations issued by this type are serialized.
type OrderedSynchronizer struct {
	// pool is the allocator handed to arrowutils.MergeRecords when merging.
	pool         memory.Allocator
	// orderByExprs are the ordering expressions; each one is matched against
	// record field names when the merge schema is recalculated.
	orderByExprs []logicalplan.Expr
	// orderByCols is rebuilt on every schema recalculation and passed to
	// arrowutils.MergeRecords as the sorting columns.
	orderByCols  []arrowutils.SortingColumn

	// sync groups the state that is read and written under mtx.
	sync struct {
		mtx        sync.Mutex
		// lastSchema is the schema computed by the most recent schema
		// recalculation; incoming records are compared against it to decide
		// whether a recalculation is needed.
		lastSchema *arrow.Schema
		// data holds the records pushed by inputs since the last merge. It is
		// truncated to length zero after each successful merge.
		data       []arrow.Record
		// inputsWaiting is an integer that keeps track of the number of inputs
		// waiting on the wait channel. It cannot be an atomic because it
		// sometimes needs to be compared to inputsRunning.
		inputsWaiting int
		// inputsRunning is an integer that keeps track of the number of inputs
		// that have not called Finish yet.
		inputsRunning int
	}
	// wait is an unbuffered channel on which blocked inputs receive one
	// message each once a merge has completed.
	wait chan struct{}
	// next is the downstream plan that receives Close, Callback, Finish and
	// Draw calls.
	next PhysicalPlan
}

// Constructor for OrderedSynchronizer. It stores the allocator and the
// order-by expressions, creates the unbuffered wait channel, and seeds
// sync.inputsRunning from the int argument (the number of inputs expected to
// eventually call Finish). The next plan is left unset and orderByCols is
// left empty.
//
// NOTE(review): identifiers are missing from this listing; presumably this is
// NewOrderedSynchronizer(pool, inputs, orderByExprs) — confirm against the
// original file.
func ( memory.Allocator,  int,  []logicalplan.Expr) *OrderedSynchronizer {
	 := &OrderedSynchronizer{
		pool:         ,
		orderByExprs: ,
		wait:         make(chan struct{}),
	}
	// Set outside the literal because inputsRunning lives in the anonymous
	// nested sync struct.
	.sync.inputsRunning = 
	return 
}

// Forwards the close request to the next plan; no state on the synchronizer
// itself is touched.
//
// NOTE(review): the method name is missing from this listing; presumably
// Close, given the delegated call — confirm.
func ( *OrderedSynchronizer) () {
	.next.Close()
}

// Callback entry point for one input: it appends the given record to
// sync.data and counts the caller as waiting. If other running inputs have
// not yet pushed, the caller blocks until it is woken through the wait channel
// (returning nil) or until the context is done (returning the context's
// error). If the caller is the last running input to push, it merges all
// stored records and forwards the merged record to next.Callback, returning
// whatever error the merge or the downstream call produces.
//
// NOTE(review): identifiers are missing from this listing; the surrounding
// comments refer to this method as Callback.
func ( *OrderedSynchronizer) ( context.Context,  arrow.Record) error {
	.sync.mtx.Lock()
	.sync.data = append(.sync.data, )
	.sync.inputsWaiting++
	// Snapshot both counters under the lock so the comparison below is made
	// on a consistent pair of values.
	 := .sync.inputsWaiting
	 := .sync.inputsRunning
	.sync.mtx.Unlock()

	if  !=  {
		select {
		case <-.wait:
			// Woken by whoever performed the merge; that goroutine has
			// already reset inputsWaiting and sync.data.
			return nil
		case <-.Done():
			// NOTE(review): this path returns without undoing the
			// inputsWaiting increment or removing the record appended above.
			// A later merge sends one message per counted waiter on the
			// unbuffered wait channel while holding the mutex, so a waiter
			// that left here would leave that send blocked — confirm whether
			// callers guarantee all inputs share the same cancellation.
			return .Err()
		}
	}

	.sync.mtx.Lock()
	defer .sync.mtx.Unlock()
	// This caller does not wait on the channel, so remove it from the count
	// before the merge broadcasts to the remaining waiters.
	.sync.inputsWaiting--
	// This is the last input to call Callback, merge the records.
	,  := .mergeRecordsLocked()
	if  != nil {
		return 
	}

	// Note that we hold the mutex while calling Callback because we want to
	// ensure that Callback is called in an ordered fashion since we could race
	// with a call to Callback in Finish.
	return .next.Callback(, )
}

// Finish entry point for one input: it decrements sync.inputsRunning under
// the mutex and then acts on the new value:
//   - positive and equal to sync.inputsWaiting: every remaining input is
//     blocked waiting, so this call merges the stored records and forwards
//     the result to next.Callback;
//   - negative: returns an error, since more Finish calls were made than
//     inputs were declared;
//   - positive otherwise: returns nil, other inputs are still producing;
//   - zero: calls next.Finish with no arguments and returns its result.
//
// NOTE(review): identifiers are missing from this listing; the error string
// and surrounding comments refer to this method as Finish.
func ( *OrderedSynchronizer) ( context.Context) error {
	.sync.mtx.Lock()
	defer .sync.mtx.Unlock()
	.sync.inputsRunning--
	 := .sync.inputsRunning
	if  > 0 &&  == .sync.inputsWaiting {
		// All other goroutines are currently waiting to be woken up. We need to
		// merge the records.
		,  := .mergeRecordsLocked()
		if  != nil {
			return 
		}
		// The mutex is still held here (deferred unlock), which keeps this
		// downstream call ordered with respect to the one made in Callback.
		return .next.Callback(, )
	}
	if  < 0 {
		return errors.New("too many OrderedSynchronizer Finish calls")
	}
	if  > 0 {
		return nil
	}
	// Last running input: propagate completion downstream.
	return .next.Finish()
}

// mergeRecordsLocked must be called while holding o.sync.mtx. It merges the
// records found in o.sync.data and unblocks all the inputs waiting on o.wait.
//
// The stored records are first brought to a common schema, then merged with
// arrowutils.MergeRecords using the synchronizer's allocator and sorting
// columns. On success the merged record is returned, inputsWaiting is reset to
// zero and sync.data is truncated to length zero (its backing array is kept).
//
// NOTE(review): both error paths return before any waiter is woken and before
// the counters are reset, so inputs blocked on the wait channel stay blocked
// until their context is done — confirm this is intended.
func ( *OrderedSynchronizer) () (arrow.Record, error) {
	if  := .ensureSameSchema(.sync.data);  != nil {
		return nil, 
	}
	,  := arrowutils.MergeRecords(.pool, .sync.data, .orderByCols, 0)
	if  != nil {
		return nil, 
	}
	// Now that the records have been merged, we can wake up the other input
	// goroutines. Since we have exactly o.sync.inputsWaiting waiting on the
	// channel, send the corresponding number of messages. Since we are also
	// holding the records mutex during this broadcast, fast inputs won't be
	// able to re-enter Callback until the mutex is released, so won't
	// mistakenly read another input's signal.
	for  := 0;  < .sync.inputsWaiting; ++ {
		// The channel is unbuffered, so each send blocks until one waiter
		// receives it.
		.wait <- struct{}{}
	}
	// Reset inputsWaiting.
	.sync.inputsWaiting = 0
	.sync.data = .sync.data[:0]
	return , nil
}

// ensureSameSchema ensures that all the records have the same schema. In cases
// where the schema is not equal, virtual null columns are inserted in the
// records with the missing column. When we have static schemas in the execution
// engine, steps like these should be unnecessary.
//
// The function returns nil immediately when every record's schema equals
// sync.lastSchema. Otherwise it recomputes a merged schema from the fields of
// all records, rebuilds orderByCols, rewrites in place every element of the
// given slice whose schema differs from the merged one, and stores the merged
// schema in sync.lastSchema. It returns an error when a record contains more
// than one field with a name present in the merged schema.
//
// It reads and writes sync.lastSchema and orderByCols without locking;
// its only visible caller invokes it with the mutex held.
func ( *OrderedSynchronizer) ( []arrow.Record) error {
	// Fast path: skip the recalculation when no record deviates from the
	// schema computed last time.
	var  bool
	for  := range  {
		if ![].Schema().Equal(.sync.lastSchema) {
			 = true
			break
		}
	}
	if ! {
		return nil
	}

	// Bucket every field seen across all records: one map per order-by
	// expression for the fields that expression matches, plus one shared map
	// for the fields it does not match. Maps are keyed by field name, so the
	// same name seen in several records is kept once (last writer wins).
	//
	// NOTE(review): the non-matching branch runs once per expression, so with
	// more than one order-by expression a field matched by one expression
	// appears to also land in the shared map when tested against the others,
	// and would then be appended a second time further down — confirm.
	 := make([]map[string]arrow.Field, len(.orderByExprs))
	 := make(map[string]arrow.Field)
	for ,  := range .orderByExprs {
		[] = make(map[string]arrow.Field)
		for ,  := range  {
			for  := 0;  < .Schema().NumFields(); ++ {
				 := .Schema().Field()
				if  := .MatchColumn(.Name);  {
					[][.Name] = 
				} else {
					[.Name] = 
				}
			}
		}
	}

	// Lay out the sort fields first, following the order of the order-by
	// expressions.
	 := make([]arrow.Field, 0, len())
	for ,  := range  {
		if len() == 0 {
			// An expected order by field is missing from the records, this
			// field will just be considered to be null.
			continue
		}

		if len() == 1 {
			// Single match: the loop just extracts the map's only entry.
			for ,  := range  {
				 = append(, )
			}
			continue
		}
		// These columns are dynamic columns and should be merged to follow
		// the physical sort order.
		 := make([]string, 0, len())
		for  := range  {
			 = append(, )
		}
		// MergeDeduplicatedDynCols will return the dynamic column names in
		// the order that they sort physically.
		for ,  := range dynparquet.MergeDeduplicatedDynCols() {
			 = append(, [])
		}
	}

	// Rebuild the sorting columns positionally. This must happen before the
	// remaining fields are appended below, so that only the leading sort
	// fields receive a SortingColumn.
	.orderByCols = .orderByCols[:0]
	for  := range  {
		.orderByCols = append(.orderByCols, arrowutils.SortingColumn{Index: })
	}

	// Append the non-sort fields. NOTE(review): this ranges over a map, so
	// the relative order of these fields is not deterministic between
	// recalculations.
	for ,  := range  {
		 = append(, )
	}

	// This is the schema that all records must respect in order to be merged.
	 := arrow.NewSchema(, nil)

	for  := range  {
		 := [].Schema()
		if .Equal([].Schema()) {
			// Already in the merged layout; keep the record untouched.
			continue
		}

		// Build the column list in merged-schema order, looking each field
		// up by name in the record's own schema.
		var  []arrow.Array
		for  := 0;  < .NumFields(); ++ {
			 := .Field()
			if  := .FieldIndices(.Name);  != nil {
				if len() > 1 {
					// The second return value of FieldsByName is discarded;
					// the fields are only used to build the error message.
					,  := .FieldsByName(.Name)
					return fmt.Errorf(
						"found multiple fields %v for name %s",
						,
						.Name,
					)
				}
				 = append(, [].Column([0]))
			} else {
				// Note that this VirtualNullArray will be read from, but the
				// merged output will be a physical null array, so there is no
				// virtual->physical conversion necessary before we return data.
				 = append(, arrowutils.MakeVirtualNullArray(.Type, int([].NumRows())))
			}
		}

		// Replace the slice element in place, so the caller's slice now holds
		// the re-laid-out record.
		[] = array.NewRecord(, , [].NumRows())
	}
	.sync.lastSchema = 
	return nil
}

// Sets the downstream plan that later receives Close, Callback, Finish and
// Draw calls. The assignment is made without taking the mutex.
//
// NOTE(review): the method name is missing from this listing; presumably
// SetNext — confirm.
func ( *OrderedSynchronizer) ( PhysicalPlan) {
	.next = 
}

// Returns a Diagram node labelled "OrderedSynchronizer" whose child is the
// diagram produced by the next plan's Draw method. The next plan is
// dereferenced without a nil check.
//
// NOTE(review): the method name is missing from this listing; presumably
// Draw, given the delegated call — confirm.
func ( *OrderedSynchronizer) () *Diagram {
	return &Diagram{Details: "OrderedSynchronizer", Child: .next.Draw()}
}