package physicalplan
Import Path
github.com/polarsignals/frostdb/query/physicalplan (on go.dev)
Dependency Relation
imports 30 packages, and is imported by one package
Involved Source Files
aggregate.go
binaryscalarexpr.go
distinct.go
filter.go
limit.go
ordered_aggregate.go
ordered_synchronizer.go
physicalplan.go
planordering.go
project.go
regexpfilter.go
sampler.go
synchronize.go
Package-Level Type Names (total 36)
type Aggregation (struct)
Aggregation groups together some lower-level primitives for the column to be aggregated by its function.
func NewHashAggregate(pool memory.Allocator, tracer trace.Tracer, aggregations []Aggregation, groupByColumnMatchers []logicalplan.Expr, seed maphash.Seed, finalStage bool) *HashAggregate
func NewOrderedAggregate(pool memory.Allocator, tracer trace.Tracer, aggregation Aggregation, groupByColumnMatchers []logicalplan.Expr, finalStage bool) *OrderedAggregate
type AggregationFunction (interface)
( AggregationFunction) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error)
*AndAggregation
*CountAggregation
*MaxAggregation
*MinAggregation
*SumAggregation
*UniqueAggregation
type AndAggregation (struct)
(*AndAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error)
*AndAggregation : AggregationFunction
type AndExpr (struct)
Left BooleanExpression
Right BooleanExpression
(*AndExpr) Eval(r arrow.Record) (*Bitmap, error)
(*AndExpr) String() string
*AndExpr : BooleanExpression
*AndExpr : expvar.Var
*AndExpr : fmt.Stringer
type ArrayRef (struct)
ColumnName string
(*ArrayRef) ArrowArray(r arrow.Record) (arrow.Array, bool, error)
(*ArrayRef) String() string
*ArrayRef : expvar.Var
*ArrayRef : fmt.Stringer
type ArrayReference (struct)
type BinaryScalarExpr (struct)
Left *ArrayRef
Op logicalplan.Op
Right scalar.Scalar
( BinaryScalarExpr) Eval(r arrow.Record) (*Bitmap, error)
( BinaryScalarExpr) String() string
BinaryScalarExpr : BooleanExpression
BinaryScalarExpr : expvar.Var
BinaryScalarExpr : fmt.Stringer
type BooleanExpression (interface)
( BooleanExpression) Eval(r arrow.Record) (*Bitmap, error)
( BooleanExpression) String() string
*AndExpr
BinaryScalarExpr
*OrExpr
*RegExpFilter
BooleanExpression : expvar.Var
BooleanExpression : fmt.Stringer
type CountAggregation (struct)
(*CountAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error)
*CountAggregation : AggregationFunction
type Diagram (struct)
Child *Diagram
Details string
(*Diagram) String() string
*Diagram : expvar.Var
*Diagram : fmt.Stringer
func (*Distinction).Draw() *Diagram
func (*HashAggregate).Draw() *Diagram
func (*Limiter).Draw() *Diagram
func (*OrderedAggregate).Draw() *Diagram
func (*OrderedSynchronizer).Draw() *Diagram
func (*OutputPlan).Draw() *Diagram
func PhysicalPlan.Draw() *Diagram
func (*PredicateFilter).Draw() *Diagram
func (*Projection).Draw() *Diagram
func (*ReservoirSampler).Draw() *Diagram
func ScanPhysicalPlan.Draw() *Diagram
func (*SchemaScan).Draw() *Diagram
func (*Synchronizer).Draw() *Diagram
func (*TableScan).Draw() *Diagram
type Distinction (struct)
(*Distinction) Callback(ctx context.Context, r arrow.Record) error
(*Distinction) Close()
(*Distinction) Draw() *Diagram
(*Distinction) Finish(ctx context.Context) error
(*Distinction) SetNext(plan PhysicalPlan)
*Distinction : PhysicalPlan
func Distinct(pool memory.Allocator, tracer trace.Tracer, columns []logicalplan.Expr) *Distinction
type HashAggregate (struct)
(*HashAggregate) Callback(_ context.Context, r arrow.Record) error
(*HashAggregate) Close()
(*HashAggregate) Draw() *Diagram
(*HashAggregate) Finish(ctx context.Context) error
(*HashAggregate) SetNext(next PhysicalPlan)
*HashAggregate : PhysicalPlan
func NewHashAggregate(pool memory.Allocator, tracer trace.Tracer, aggregations []Aggregation, groupByColumnMatchers []logicalplan.Expr, seed maphash.Seed, finalStage bool) *HashAggregate
type Limiter (struct)
(*Limiter) Callback(ctx context.Context, r arrow.Record) error
(*Limiter) Close()
(*Limiter) Draw() *Diagram
(*Limiter) Finish(ctx context.Context) error
(*Limiter) SetNext(next PhysicalPlan)
*Limiter : PhysicalPlan
func Limit(pool memory.Allocator, tracer trace.Tracer, expr logicalplan.Expr) (*Limiter, error)
type MaxAggregation (struct)
(*MaxAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error)
*MaxAggregation : AggregationFunction
type MinAggregation (struct)
(*MinAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error)
*MinAggregation : AggregationFunction
type Option (func)
func WithOrderedAggregations() Option
func WithOverrideInput(input []PhysicalPlan) Option
func WithReadMode(m logicalplan.ReadMode) Option
func Build(ctx context.Context, pool memory.Allocator, tracer trace.Tracer, s *dynparquet.Schema, plan *logicalplan.LogicalPlan, options ...Option) (*OutputPlan, error)
type OrderedAggregate (struct)
OrderedAggregate is an aggregation operator that supports aggregations on
streams of data ordered by the group by columns. This is a more efficient
aggregation than aggregating by hash since a group can be determined as
completed once a different aggregation key is found in the ordered stream.
OrderedAggregate also supports partially ordered aggregations. This means
aggregating on keys that arrive in ordered sets of data that are not mutually
exclusive. For example, consider the group by columns: a, a, b, c, a, b, c.
The OrderedAggregate will perform the aggregation on the first ordered set
a, a, b, c and another one on the second a, b, c. The results of both
aggregations are then merged. Specifically, if the example is pushed to Callback
in two records (a, a, b, c) followed by (a, b, c), and assuming that the
aggregation values for each row are 1 for simplicity and we're using a sum
aggregation, after the first call to Callback the OrderedAggregate will store
[a, b, c], [2, 1, 1] but not emit anything. When the second record is pushed,
the OrderedAggregate will realize that the first value in the new record (a)
sorts before the "current group" (c), so it will store the aggregation results
of the second record as another ordered group [a, b, c], [1, 1, 1]. Only when
Finish is called will the OrderedAggregate be able to emit the merged
aggregation results. The merged results should be: [a, b, c], [3, 2, 2].
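The worked example above can be followed with a short, self-contained sketch. This is not the OrderedAggregate implementation; it only illustrates the bookkeeping the doc comment describes: each ordered set is summed on its own, and the per-set results are merged when Finish runs.

	package main

	import "fmt"

	// sumOrderedSet aggregates one ordered run of group-by keys, where every
	// row contributes the value 1, mirroring the example in the doc comment.
	func sumOrderedSet(keys []string) (groups []string, sums []int64) {
		for _, k := range keys {
			if n := len(groups); n > 0 && groups[n-1] == k {
				sums[n-1]++
				continue
			}
			groups = append(groups, k)
			sums = append(sums, 1)
		}
		return groups, sums
	}

	func main() {
		// Two records pushed to Callback: (a, a, b, c) followed by (a, b, c).
		g1, s1 := sumOrderedSet([]string{"a", "a", "b", "c"}) // [a b c], [2 1 1]
		g2, s2 := sumOrderedSet([]string{"a", "b", "c"})      // [a b c], [1 1 1]

		// What Finish conceptually does: merge the per-set results by key.
		merged := map[string]int64{}
		for i, g := range g1 {
			merged[g] += s1[i]
		}
		for i, g := range g2 {
			merged[g] += s2[i]
		}
		fmt.Println(merged) // map[a:3 b:2 c:2]
	}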
(*OrderedAggregate) Callback(_ context.Context, r arrow.Record) error
(*OrderedAggregate) Close()
(*OrderedAggregate) Draw() *Diagram
(*OrderedAggregate) Finish(ctx context.Context) error
(*OrderedAggregate) SetNext(next PhysicalPlan)
*OrderedAggregate : PhysicalPlan
func NewOrderedAggregate(pool memory.Allocator, tracer trace.Tracer, aggregation Aggregation, groupByColumnMatchers []logicalplan.Expr, finalStage bool) *OrderedAggregate
type OrderedSynchronizer (struct)
OrderedSynchronizer implements synchronizing ordered input from multiple
goroutines. The strategy used is that any input that calls Callback must wait
for all the other inputs to call Callback, since an ordered result cannot
be produced until all inputs have pushed data. Another strategy would be to
store the pushed records, but that requires fully copying all the data for
safety.
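A minimal sketch of the waiting strategy described above, assuming nothing about the package's internals: each input that pushes a batch blocks until every other input has pushed, and the last input to arrive performs the ordered merge for that round.

	package main

	import (
		"fmt"
		"sync"
	)

	// round models one synchronization round: every input must push once
	// before any of them may proceed, so the merge sees data from all inputs.
	type round struct {
		mu      sync.Mutex
		cond    *sync.Cond
		pending int
		batches []string // stands in for the records pushed by each input
	}

	func newRound(inputs int) *round {
		r := &round{pending: inputs}
		r.cond = sync.NewCond(&r.mu)
		return r
	}

	// push blocks until every input has contributed; the last input to arrive
	// performs the "ordered merge" for the round and releases the others.
	func (r *round) push(batch string) {
		r.mu.Lock()
		defer r.mu.Unlock()
		r.batches = append(r.batches, batch)
		r.pending--
		if r.pending > 0 {
			for r.pending > 0 {
				r.cond.Wait()
			}
			return
		}
		fmt.Println("all inputs pushed, merging:", r.batches)
		r.cond.Broadcast()
	}

	func main() {
		r := newRound(3)
		var wg sync.WaitGroup
		for _, batch := range []string{"input-0", "input-1", "input-2"} {
			wg.Add(1)
			go func(b string) {
				defer wg.Done()
				r.push(b)
			}(batch)
		}
		wg.Wait()
	}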
(*OrderedSynchronizer) Callback(ctx context.Context, r arrow.Record) error
(*OrderedSynchronizer) Close()
(*OrderedSynchronizer) Draw() *Diagram
(*OrderedSynchronizer) Finish(ctx context.Context) error
(*OrderedSynchronizer) SetNext(next PhysicalPlan)
*OrderedSynchronizer : PhysicalPlan
func NewOrderedSynchronizer(pool memory.Allocator, inputs int, orderByExprs []logicalplan.Expr) *OrderedSynchronizer
type OrExpr (struct)
Left BooleanExpression
Right BooleanExpression
(*OrExpr) Eval(r arrow.Record) (*Bitmap, error)
(*OrExpr) String() string
*OrExpr : BooleanExpression
*OrExpr : expvar.Var
*OrExpr : fmt.Stringer
type OutputPlan (struct)
(*OutputPlan) Callback(ctx context.Context, r arrow.Record) error
(*OutputPlan) Close()
(*OutputPlan) Draw() *Diagram
(*OutputPlan) DrawString() string
(*OutputPlan) Execute(ctx context.Context, pool memory.Allocator, callback func(ctx context.Context, r arrow.Record) error) error
(*OutputPlan) Finish(_ context.Context) error
(*OutputPlan) SetNext(_ PhysicalPlan)
(*OutputPlan) SetNextCallback(next func(ctx context.Context, r arrow.Record) error)
*OutputPlan : PhysicalPlan
func Build(ctx context.Context, pool memory.Allocator, tracer trace.Tracer, s *dynparquet.Schema, plan *logicalplan.LogicalPlan, options ...Option) (*OutputPlan, error)
type PhysicalPlan (interface)
( PhysicalPlan) Callback(ctx context.Context, r arrow.Record) error
( PhysicalPlan) Close()
( PhysicalPlan) Draw() *Diagram
( PhysicalPlan) Finish(ctx context.Context) error
( PhysicalPlan) SetNext(next PhysicalPlan)
*Distinction
*HashAggregate
*Limiter
*OrderedAggregate
*OrderedSynchronizer
*OutputPlan
*PredicateFilter
*Projection
*ReservoirSampler
*Synchronizer
func Aggregate(pool memory.Allocator, tracer trace.Tracer, agg *logicalplan.Aggregation, final bool, ordered bool, seed maphash.Seed) (PhysicalPlan, error)
func WithOverrideInput(input []PhysicalPlan) Option
func (*Distinction).SetNext(plan PhysicalPlan)
func (*HashAggregate).SetNext(next PhysicalPlan)
func (*Limiter).SetNext(next PhysicalPlan)
func (*OrderedAggregate).SetNext(next PhysicalPlan)
func (*OrderedSynchronizer).SetNext(next PhysicalPlan)
func (*OutputPlan).SetNext(_ PhysicalPlan)
func PhysicalPlan.SetNext(next PhysicalPlan)
func (*PredicateFilter).SetNext(next PhysicalPlan)
func (*Projection).SetNext(next PhysicalPlan)
func (*ReservoirSampler).SetNext(p PhysicalPlan)
func (*Synchronizer).SetNext(next PhysicalPlan)
func (*Synchronizer).SetNextPlan(nextPlan PhysicalPlan)
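To make the interface listed above concrete, here is a hedged sketch of a minimal pass-through operator: records arrive via Callback, are forwarded to the operator installed with SetNext, and Finish/Close propagate down the chain. The arrow import path and major version are assumptions to be matched against the frostdb version in use; only methods and fields shown in this listing (and the Diagram fields above) are relied on.

	package example

	import (
		"context"

		"github.com/apache/arrow/go/v14/arrow" // assumed arrow major version

		"github.com/polarsignals/frostdb/query/physicalplan"
	)

	// passThrough forwards every record unchanged to the next operator.
	type passThrough struct {
		next physicalplan.PhysicalPlan
	}

	// Compile-time check that the sketch satisfies the interface listed above.
	var _ physicalplan.PhysicalPlan = (*passThrough)(nil)

	func (p *passThrough) Callback(ctx context.Context, r arrow.Record) error {
		return p.next.Callback(ctx, r) // no transformation, just forward
	}

	func (p *passThrough) Finish(ctx context.Context) error { return p.next.Finish(ctx) }

	func (p *passThrough) Close() { p.next.Close() }

	func (p *passThrough) SetNext(next physicalplan.PhysicalPlan) { p.next = next }

	func (p *passThrough) Draw() *physicalplan.Diagram {
		d := &physicalplan.Diagram{Details: "PassThrough"}
		if p.next != nil {
			d.Child = p.next.Draw()
		}
		return d
	}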
type PostPlanVisitorFunc (func)
( PostPlanVisitorFunc) PostVisit(plan *logicalplan.LogicalPlan) bool
( PostPlanVisitorFunc) PreVisit(_ *logicalplan.LogicalPlan) bool
PostPlanVisitorFunc : github.com/polarsignals/frostdb/query/logicalplan.PlanVisitor
type PredicateFilter (struct)
(*PredicateFilter) Callback(ctx context.Context, r arrow.Record) error
(*PredicateFilter) Close()
(*PredicateFilter) Draw() *Diagram
(*PredicateFilter) Finish(ctx context.Context) error
(*PredicateFilter) SetNext(next PhysicalPlan)
*PredicateFilter : PhysicalPlan
func Filter(pool memory.Allocator, tracer trace.Tracer, filterExpr logicalplan.Expr) (*PredicateFilter, error)
type PreExprVisitorFunc (func)
( PreExprVisitorFunc) PostVisit(_ logicalplan.Expr) bool
( PreExprVisitorFunc) PreVisit(expr logicalplan.Expr) bool
( PreExprVisitorFunc) Visit(_ logicalplan.Expr) bool
PreExprVisitorFunc : github.com/polarsignals/frostdb/query/logicalplan.Visitor
type PrePlanVisitorFunc (func)
( PrePlanVisitorFunc) PostVisit(_ *logicalplan.LogicalPlan) bool
( PrePlanVisitorFunc) PreVisit(plan *logicalplan.LogicalPlan) bool
PrePlanVisitorFunc : github.com/polarsignals/frostdb/query/logicalplan.PlanVisitor
type Projection (struct)
(*Projection) Callback(ctx context.Context, r arrow.Record) error
(*Projection) Close()
(*Projection) Draw() *Diagram
(*Projection) Finish(ctx context.Context) error
(*Projection) Project(_ context.Context, r arrow.Record) (arrow.Record, error)
(*Projection) SetNext(next PhysicalPlan)
*Projection : PhysicalPlan
func Project(mem memory.Allocator, tracer trace.Tracer, exprs []logicalplan.Expr) (*Projection, error)
type RegExpFilter (struct)
(*RegExpFilter) Eval(r arrow.Record) (*Bitmap, error)
(*RegExpFilter) String() string
*RegExpFilter : BooleanExpression
*RegExpFilter : expvar.Var
*RegExpFilter : fmt.Stringer
type ReservoirSampler (struct)
Callback collects all the records to sample.
(*ReservoirSampler) Callback(ctx context.Context, r arrow.Record) error
(*ReservoirSampler) Close()
(*ReservoirSampler) Draw() *Diagram
Finish sends all the records in the reservoir to the next operator (a sketch of the sampling idea follows this type's entry).
(*ReservoirSampler) Finish(ctx context.Context) error
(*ReservoirSampler) SetNext(p PhysicalPlan)
*ReservoirSampler : PhysicalPlan
func NewReservoirSampler(size, limit int64, allocator memory.Allocator) *ReservoirSampler
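The size parameter suggests classic reservoir sampling. The sketch below shows the textbook algorithm (Algorithm R) over plain ints; it is an assumption about the idea rather than the operator's actual implementation, and it ignores the limit parameter and Arrow records entirely.

	package main

	import (
		"fmt"
		"math/rand"
	)

	// reservoir keeps a uniform random sample of at most size items seen so far.
	type reservoir struct {
		size int
		seen int
		rows []int // stands in for sampled rows
	}

	// offer applies the classic Algorithm R update: once the reservoir is full,
	// the i-th item seen replaces a random slot with probability size/i.
	func (r *reservoir) offer(row int) {
		r.seen++
		if len(r.rows) < r.size {
			r.rows = append(r.rows, row)
			return
		}
		if j := rand.Intn(r.seen); j < r.size {
			r.rows[j] = row
		}
	}

	func main() {
		r := &reservoir{size: 5}
		for row := 0; row < 1000; row++ {
			r.offer(row)
		}
		fmt.Println(r.rows) // 5 rows drawn uniformly from the 1000 seen
	}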
type ScanPhysicalPlan (interface)
( ScanPhysicalPlan) Draw() *Diagram
( ScanPhysicalPlan) Execute(ctx context.Context, pool memory.Allocator) error
*SchemaScan
*TableScan
type SchemaScan (struct)
(*SchemaScan) Draw() *Diagram
(*SchemaScan) Execute(ctx context.Context, pool memory.Allocator) error
*SchemaScan : ScanPhysicalPlan
type SumAggregation (struct)
(*SumAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error)
*SumAggregation : AggregationFunction
type Synchronizer (struct)
Synchronizer is used to combine the results of multiple parallel streams
into a single concurrent stream. It also forms a barrier on the finishers by
waiting to call the next plan's Finish until all previous parallel stages
have finished.
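A minimal sketch (assumed, not the actual implementation) of the Finish barrier described above: every parallel stage calls Finish on the synchronizer, and only the last call forwards Finish to the next plan, so downstream operators finish exactly once.

	package main

	import (
		"fmt"
		"sync"
		"sync/atomic"
	)

	type finishBarrier struct {
		remaining atomic.Int64
		next      func() // stands in for the next plan's Finish
	}

	func newFinishBarrier(concurrency int, next func()) *finishBarrier {
		b := &finishBarrier{next: next}
		b.remaining.Store(int64(concurrency))
		return b
	}

	// Finish is called by each parallel stage; only the last caller forwards it.
	func (b *finishBarrier) Finish() {
		if b.remaining.Add(-1) == 0 {
			b.next()
		}
	}

	func main() {
		b := newFinishBarrier(4, func() { fmt.Println("next plan's Finish called once") })
		var wg sync.WaitGroup
		for i := 0; i < 4; i++ {
			wg.Add(1)
			go func() { defer wg.Done(); b.Finish() }()
		}
		wg.Wait()
	}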
(*Synchronizer) Callback(ctx context.Context, r arrow.Record) error
(*Synchronizer) Close()
(*Synchronizer) Draw() *Diagram
(*Synchronizer) Finish(ctx context.Context) error
(*Synchronizer) SetNext(next PhysicalPlan)
(*Synchronizer) SetNextPlan(nextPlan PhysicalPlan)
*Synchronizer : PhysicalPlan
func Synchronize(concurrency int) *Synchronizer
Package-Level Functions (total 46)
func Aggregate(pool memory.Allocator, tracer trace.Tracer, agg *logicalplan.Aggregation, final bool, ordered bool, seed maphash.Seed) (PhysicalPlan, error)
func BinaryDictionaryArrayScalarRegexMatch(dict *array.Dictionary, left *array.Binary, right *regexp.Regexp) (*Bitmap, error)
func BinaryDictionaryArrayScalarRegexNotMatch(dict *array.Dictionary, left *array.Binary, right *regexp.Regexp) (*Bitmap, error)
func BinaryScalarOperation(left arrow.Array, right scalar.Scalar, operator logicalplan.Op) (*Bitmap, error)
func Build(ctx context.Context, pool memory.Allocator, tracer trace.Tracer, s *dynparquet.Schema, plan *logicalplan.LogicalPlan, options ...Option) (*OutputPlan, error)
func DictionaryArrayScalarContains(left *array.Dictionary, right scalar.Scalar, not bool) (*Bitmap, error)
func DictionaryArrayScalarEqual(left *array.Dictionary, right scalar.Scalar) (*Bitmap, error)
func DictionaryArrayScalarNotEqual(left *array.Dictionary, right scalar.Scalar) (*Bitmap, error)
func Distinct(pool memory.Allocator, tracer trace.Tracer, columns []logicalplan.Expr) *Distinction
func Filter(pool memory.Allocator, tracer trace.Tracer, filterExpr logicalplan.Expr) (*PredicateFilter, error)
func NewHashAggregate(pool memory.Allocator, tracer trace.Tracer, aggregations []Aggregation, groupByColumnMatchers []logicalplan.Expr, seed maphash.Seed, finalStage bool) *HashAggregate
func NewOrderedAggregate(pool memory.Allocator, tracer trace.Tracer, aggregation Aggregation, groupByColumnMatchers []logicalplan.Expr, finalStage bool) *OrderedAggregate
func NewOrderedSynchronizer(pool memory.Allocator, inputs int, orderByExprs []logicalplan.Expr) *OrderedSynchronizer
NewReservoirSampler will create a new ReservoirSampler operator that will sample up to size rows of all records seen by Callback.
func NewReservoirSampler(size, limit int64, allocator memory.Allocator) *ReservoirSampler
func Project(mem memory.Allocator, tracer trace.Tracer, exprs []logicalplan.Expr) (*Projection, error)
func Synchronize(concurrency int) *Synchronizer
func WithOrderedAggregations() Option
WithOverrideInput can be used to provide an input stage on top of which the
Build function can build the physical plan (a hedged usage sketch of Build and
OutputPlan.Execute follows this list).
func WithOverrideInput(input []PhysicalPlan) Option
func WithReadMode(m logicalplan.ReadMode) Option
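To show how these package-level entry points fit together, here is a hedged usage sketch: Build compiles a logical plan into an OutputPlan, and Execute drives it, pushing result records into a callback. The dynparquet and arrow import paths, the arrow major version, and the use of otel's noop tracer are assumptions to match against your own dependency versions; the schema and logical plan are taken as inputs rather than fabricated.

	package example

	import (
		"context"

		"github.com/apache/arrow/go/v14/arrow"        // assumed arrow major version
		"github.com/apache/arrow/go/v14/arrow/memory" // assumed arrow major version
		"go.opentelemetry.io/otel/trace/noop"

		"github.com/polarsignals/frostdb/dynparquet" // assumed import path
		"github.com/polarsignals/frostdb/query/logicalplan"
		"github.com/polarsignals/frostdb/query/physicalplan"
	)

	// runPlan builds the physical plan for an already-constructed logical plan
	// and schema, then executes it, receiving each result record in the callback.
	func runPlan(ctx context.Context, schema *dynparquet.Schema, plan *logicalplan.LogicalPlan) error {
		pool := memory.NewGoAllocator()
		tracer := noop.NewTracerProvider().Tracer("physicalplan-example")

		out, err := physicalplan.Build(ctx, pool, tracer, schema, plan)
		if err != nil {
			return err
		}
		return out.Execute(ctx, pool, func(ctx context.Context, r arrow.Record) error {
			_ = r.NumRows() // consume r here; retain or copy it if it must outlive the callback
			return nil
		})
	}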
Package-Level Variables (total 7)