package physicalplan
import (
"context"
"errors"
"fmt"
"sync"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/polarsignals/frostdb/dynparquet"
"github.com/polarsignals/frostdb/pqarrow/arrowutils"
"github.com/polarsignals/frostdb/query/logicalplan"
)
// OrderedSynchronizer is a physical-plan operator fed by several concurrently
// running inputs. Each input hands over one record at a time through Callback;
// once every running input has delivered its record, the buffered records are
// merged into a single record (sorted on the columns matched by orderByExprs)
// and passed on to the next operator.
type OrderedSynchronizer struct {
	pool         memory.Allocator
	orderByExprs []logicalplan.Expr
	// orderByCols indexes the sort-key columns in the normalized schema; it is
	// rebuilt whenever the schema of the buffered records changes.
	orderByCols []arrowutils.SortingColumn

	// sync is the per-round state shared by all inputs. Every field in it is
	// only read or written while mtx is held.
	sync struct {
		mtx        sync.Mutex
		lastSchema *arrow.Schema
		data       []arrow.Record
		// inputsWaiting counts inputs parked in Callback for the current round;
		// inputsRunning counts inputs that have not called Finish yet.
		inputsWaiting, inputsRunning int
	}

	// wait is an unbuffered channel; one send wakes one parked input.
	wait chan struct{}
	next PhysicalPlan
}
// NewOrderedSynchronizer creates a synchronizer that expects the given number
// of concurrent inputs. orderByExprs select the columns used as the sort key
// when the inputs' records are merged, and pool is the allocator used for the
// merged records.
func NewOrderedSynchronizer(pool memory.Allocator, inputs int, orderByExprs []logicalplan.Expr) *OrderedSynchronizer {
	s := new(OrderedSynchronizer)
	s.pool = pool
	s.orderByExprs = orderByExprs
	s.wait = make(chan struct{})
	// Every input counts as running until it calls Finish.
	s.sync.inputsRunning = inputs
	return s
}
// Close forwards the close request to the next operator in the plan.
func (o *OrderedSynchronizer) Close() {
	next := o.next
	next.Close()
}
// Callback buffers r for the current round and waits until the round is
// complete, i.e. until every still-running input is waiting with a record
// (the round can also be completed by Finish).
//
// An input that is not the last to arrive blocks until it receives a signal on
// o.wait and then returns nil, or returns ctx.Err() if ctx is done first. The
// input that completes the round merges all buffered records with
// mergeRecordsLocked (which also wakes the other waiting inputs) and forwards
// the merged record to the next operator, returning that operator's error.
//
// NOTE(review): an input that leaves through ctx.Done() stays counted in
// o.sync.inputsWaiting and its record stays in o.sync.data. A later merge will
// then attempt one more send on the unbuffered o.wait channel than there are
// receivers, blocking while o.sync.mtx is held. Confirm that cancellation
// always tears down all inputs before another merge can happen.
func (o *OrderedSynchronizer ) Callback (ctx context .Context , r arrow .Record ) error {
o .sync .mtx .Lock ()
o .sync .data = append (o .sync .data , r )
o .sync .inputsWaiting ++
// Snapshot the counters under the lock; the decision below uses these copies.
inputsWaiting := o .sync .inputsWaiting
inputsRunning := o .sync .inputsRunning
o .sync .mtx .Unlock ()
if inputsWaiting != inputsRunning {
// Not the last input of this round: park (without holding the lock) until
// the merging goroutine sends on o.wait.
select {
case <- o .wait :
return nil
case <- ctx .Done ():
return ctx .Err ()
}
}
o .sync .mtx .Lock ()
defer o .sync .mtx .Unlock ()
// This goroutine does the merge itself instead of parking, so it must not be
// counted among the waiters that mergeRecordsLocked wakes up.
o .sync .inputsWaiting --
mergedRecord , err := o .mergeRecordsLocked ()
if err != nil {
return err
}
// The next operator is invoked while o.sync.mtx is still held (deferred unlock).
return o .next .Callback (ctx , mergedRecord )
}
// Finish marks one input as done.
//
// If every input that is still running is already waiting in Callback, no
// further record can complete the current round, so the buffered records are
// merged and forwarded to the next operator right away. When the last input
// finishes, Finish is propagated to the next operator. Calling Finish more
// often than there are inputs returns an error.
func (o *OrderedSynchronizer) Finish(ctx context.Context) error {
	o.sync.mtx.Lock()
	defer o.sync.mtx.Unlock()

	o.sync.inputsRunning--
	remaining := o.sync.inputsRunning

	switch {
	case remaining < 0:
		return errors.New("too many OrderedSynchronizer Finish calls")
	case remaining == 0:
		return o.next.Finish(ctx)
	case remaining == o.sync.inputsWaiting:
		merged, err := o.mergeRecordsLocked()
		if err != nil {
			return err
		}
		return o.next.Callback(ctx, merged)
	default:
		// Some running inputs have not delivered their record yet.
		return nil
	}
}
// mergeRecordsLocked merges all records buffered for the current round into a
// single record, wakes every input parked in Callback, and resets the round
// state. The caller must hold o.sync.mtx.
//
// The buffered records are first brought to a common schema by
// ensureSameSchema (which may rewrite o.sync.data in place and recompute
// o.orderByCols), then combined by arrowutils.MergeRecords using o.orderByCols
// as the sort columns.
//
// On error no waiter is woken and neither the buffered records nor the
// counters are reset.
func (o *OrderedSynchronizer) mergeRecordsLocked() (arrow.Record, error) {
	if err := o.ensureSameSchema(o.sync.data); err != nil {
		return nil, err
	}
	mergedRecord, err := arrowutils.MergeRecords(o.pool, o.sync.data, o.orderByCols, 0)
	if err != nil {
		return nil, err
	}
	// o.wait is unbuffered, so each send hands off to exactly one parked input.
	for i := 0; i < o.sync.inputsWaiting; i++ {
		o.wait <- struct{}{}
	}
	o.sync.inputsWaiting = 0
	// Clear the slots before truncating: otherwise the backing array keeps the
	// merged-away records reachable until a later round overwrites each slot.
	for i := range o.sync.data {
		o.sync.data[i] = nil
	}
	o.sync.data = o.sync.data[:0]
	return mergedRecord, nil
}
// ensureSameSchema rewrites records in place so that they all share a single
// schema, and recomputes o.orderByCols for that schema.
//
// If every record already has the schema produced by the previous call
// (o.sync.lastSchema), nothing happens. Otherwise a new schema is built:
//   - first, the fields matched by each order-by expression, in expression
//     order (when one expression matches several columns, their order comes
//     from dynparquet.MergeDeduplicatedDynCols);
//   - then every other field, in the order it is first seen across records.
//
// Each field appears exactly once; a field matching several expressions is
// attributed to the first one. o.orderByCols is set to the leading order-by
// fields. Every record whose schema differs from the new one is rebuilt:
// existing columns are reused by name and missing ones are filled with
// arrowutils.MakeVirtualNullArray.
//
// It returns an error if a record has several fields with the name of a field
// in the new schema.
func (o *OrderedSynchronizer) ensureSameSchema(records []arrow.Record) error {
	var needSchemaRecalculation bool
	for i := range records {
		if !records[i].Schema().Equal(o.sync.lastSchema) {
			needSchemaRecalculation = true
			break
		}
	}
	if !needSchemaRecalculation {
		return nil
	}

	// orderCols[i] holds the fields matched by o.orderByExprs[i]. A matched field
	// is never also a leftover, so it cannot be emitted twice.
	orderCols := make([]map[string]arrow.Field, len(o.orderByExprs))
	for i := range orderCols {
		orderCols[i] = make(map[string]arrow.Field)
	}
	// Fields matched by no expression, kept in first-seen order so that the
	// resulting schema is deterministic. As with orderCols, a later field with
	// the same name replaces the earlier one.
	var leftoverCols []arrow.Field
	leftoverIdx := make(map[string]int)
	for _, r := range records {
		rs := r.Schema()
		for j := 0; j < rs.NumFields(); j++ {
			field := rs.Field(j)
			matched := false
			for i, orderCol := range o.orderByExprs {
				if orderCol.MatchColumn(field.Name) {
					orderCols[i][field.Name] = field
					matched = true
					break
				}
			}
			if matched {
				continue
			}
			if idx, ok := leftoverIdx[field.Name]; ok {
				leftoverCols[idx] = field
				continue
			}
			leftoverIdx[field.Name] = len(leftoverCols)
			leftoverCols = append(leftoverCols, field)
		}
	}

	newFields := make([]arrow.Field, 0, len(orderCols)+len(leftoverCols))
	for _, colsFound := range orderCols {
		if len(colsFound) == 0 {
			continue
		}
		if len(colsFound) == 1 {
			for _, field := range colsFound {
				newFields = append(newFields, field)
			}
			continue
		}
		colNames := make([]string, 0, len(colsFound))
		for name := range colsFound {
			colNames = append(colNames, name)
		}
		for _, name := range dynparquet.MergeDeduplicatedDynCols(colNames) {
			newFields = append(newFields, colsFound[name])
		}
	}

	// The sort key is the prefix of order-by fields collected so far.
	o.orderByCols = o.orderByCols[:0]
	for i := range newFields {
		o.orderByCols = append(o.orderByCols, arrowutils.SortingColumn{Index: i})
	}
	newFields = append(newFields, leftoverCols...)
	schema := arrow.NewSchema(newFields, nil)

	for i := range records {
		otherSchema := records[i].Schema()
		if schema.Equal(otherSchema) {
			continue
		}
		numRows := records[i].NumRows()
		columns := make([]arrow.Array, 0, schema.NumFields())
		for j := 0; j < schema.NumFields(); j++ {
			field := schema.Field(j)
			otherFields := otherSchema.FieldIndices(field.Name)
			if otherFields == nil {
				// The record lacks this column: pad it with nulls.
				columns = append(columns, arrowutils.MakeVirtualNullArray(field.Type, int(numRows)))
				continue
			}
			if len(otherFields) > 1 {
				// The ok flag is irrelevant: FieldIndices already proved the name exists.
				fieldsFound, _ := otherSchema.FieldsByName(field.Name)
				return fmt.Errorf(
					"found multiple fields %v for name %s",
					fieldsFound,
					field.Name,
				)
			}
			columns = append(columns, records[i].Column(otherFields[0]))
		}
		// NOTE(review): nothing in this file releases the records created here;
		// confirm their ownership and lifetime.
		records[i] = array.NewRecord(schema, columns, numRows)
	}
	o.sync.lastSchema = schema
	return nil
}
// SetNext sets the downstream operator: it receives the merged records and
// the Finish and Close calls made on this synchronizer.
func (o *OrderedSynchronizer) SetNext(next PhysicalPlan) {
	o.next = next
}
// Draw returns a diagram node labelled "OrderedSynchronizer" whose child is the
// diagram of the next operator.
func (o *OrderedSynchronizer) Draw() *Diagram {
	child := o.next.Draw()
	return &Diagram{
		Details: "OrderedSynchronizer",
		Child:   child,
	}
}
// These pages were generated with Golds v0.8.2 (GOOS=linux GOARCH=amd64).
// Golds is a Go 101 project developed by Tapir Liu.
// PRs and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.