package physicalplan

import (
	
	
	
	

	
)

// Synchronizer is used to combine the results of multiple parallel streams
// into a single stream concurrent stream. It also forms a barrier on the
// finishers, by waiting to call next plan's finish until all previous parallel
// stages have finished.
type Synchronizer struct {
	next    PhysicalPlan
	nextMtx sync.Mutex
	running *atomic.Int64
	open    *atomic.Int64
}

func ( int) *Synchronizer {
	 := &atomic.Int64{}
	.Add(int64())
	 := &atomic.Int64{}
	.Add(int64())
	return &Synchronizer{running: , open: }
}

func ( *Synchronizer) ( context.Context,  arrow.Record) error {
	// multiple threads can emit the results to the next step, but they will do
	// it synchronously
	.nextMtx.Lock()
	defer .nextMtx.Unlock()

	 := .next.Callback(, )
	if  != nil {
		return 
	}
	return nil
}

func ( *Synchronizer) ( context.Context) error {
	 := .running.Add(-1)
	if  < 0 {
		return errors.New("too many Synchronizer Finish calls")
	}
	if  > 0 {
		return nil
	}
	return .next.Finish()
}

func ( *Synchronizer) ( PhysicalPlan) {
	.next = 
}

func ( *Synchronizer) ( PhysicalPlan) {
	.next = 
}

func ( *Synchronizer) () *Diagram {
	return &Diagram{Details: "Synchronizer", Child: .next.Draw()}
}

func ( *Synchronizer) () {
	 := .open.Add(-1)
	if  < 0 {
		panic("too many Synchronizer Close calls")
	}
	if  > 0 {
		return
	}
	.next.Close()
}