package physicalplan

import (
	
	
	
	

	
	
	
	

	
	
	
)

// groupColInfo pairs the schema field of a group by column with the array
// that holds that column's values in the record currently being processed.
// It is the value type of the scratch groupByMap, keyed by field name.
type groupColInfo struct {
	field arrow.Field
	arr   arrow.Array
}

// OrderedAggregate is an aggregation operator that supports aggregations on
// streams of data ordered by the group by columns. This is a more efficient
// aggregation than aggregating by hash since a group can be determined as
// completed once a different aggregation key is found in the ordered stream.
// OrderedAggregate also supports partially ordered aggregations. This means
// aggregating on keys that arrive in ordered sets of data that are not mutually
// exclusive. For example consider the group by columns: a, a, b, c, a, b, c.
// The OrderedAggregate will perform the aggregation on the first ordered set
// a, a, b, c and another one on the second a, b, c. The result of both
// aggregations is merged. Specifically, if the example is pushed to Callback
// in two records (a, a, b, c) followed by (a, b, c), and assuming that the
// aggregation values for each row are 1 for simplicity and we're using a sum
// aggregation, after the first call to Callback the OrderedAggregate will store
// [a, b, c], [2, 1, 1] but not emit anything. When the second record is pushed,
// the OrderedAggregate will realize that the first value in the new record (a)
// sorts before the "current group" (c), so will store the aggregation results
// of the second record as another ordered group [a, b, c], [1, 1, 1]. Only when
// Finish is called, will the OrderedAggregate be able to emit the merged
// aggregation results. The merged results should be: [a, b, c], [3, 2, 2].
type OrderedAggregate struct {
	// Fields that are constant throughout execution. All of them except next
	// are set by the constructor; next is assigned separately afterwards.
	pool                  memory.Allocator
	tracer                trace.Tracer
	resultColumnName      string
	groupByColumnMatchers []logicalplan.Expr
	aggregationFunction   logicalplan.AggFunc
	next                  PhysicalPlan
	columnToAggregate     logicalplan.Expr
	// finalStage indicates whether this is the last aggregation or whether
	// this is an aggregation with another aggregation to follow after
	// synchronizing. It also selects the name of the result column: the
	// configured resultColumnName when true, the name of columnToAggregate
	// otherwise.
	finalStage bool

	// groupColOrdering is needed to maintain a deterministic order of the group
	// by columns, since the names are stored in a map.
	groupColOrdering []arrow.Field

	// notFirstCall is set to true once a record has been processed by
	// Callback. While it is false, Callback initializes curGroup from the
	// first row of the record, and Finish skips straight to finishing the
	// next operator.
	notFirstCall bool
	// curGroup is used for comparisons against the groupResults found in each
	// record. It is initialized to the first group of the first record and
	// updated as new groupResults are found. The key is the field name, as in
	// groupBuilders below (this will hopefully change once we have static
	// schemas in the execution engine).
	curGroup map[string]any

	// groupBuilders is a map from the group by field name to the group column
	// builders **for the current ordered set**. If a new ordered set is found,
	// the builders flush the array to groupResults below.
	groupBuilders map[string]builder.ColumnBuilder

	// groupResults are the group columns. groupResults[i] represents the group
	// columns of ordered set i.
	groupResults [][]arrow.Array

	// arrayToAggCarry is used to carry over the values to aggregate for the
	// last group in a record since we cannot know whether that group continues
	// in the next record.
	arrayToAggCarry builder.ColumnBuilder

	// aggResultBuilder is a builder of the aggregation results for the current
	// ordered set (i.e. each element in this builder is the aggregation result
	// for one group in the ordered set). When the end of the ordered set is
	// found, the values in this builder are appended to aggregationResults
	// below.
	aggResultBuilder arrowutils.ArrayConcatenator

	// aggregationResults are the results of aggregating the values across
	// multiple calls to Callback. aggregationResults[i] is the arrow array that
	// belongs to ordered set i.
	aggregationResults []arrow.Array

	// scratch holds buffers that are emptied and refilled on every call to
	// Callback so that they do not have to be reallocated per record.
	scratch struct {
		// groupByMap is a scratch map that helps store a mapping from the
		// field names of the group by columns found on each call to Callback to
		// their corresponding fields/arrays.
		groupByMap    map[string]groupColInfo
		groupByArrays []arrow.Array
		curGroup      []any
		// indexes is used as scratch space to unroll group/set range indexes.
		indexes []int64
	}
}

// Constructor for OrderedAggregate. It takes, in order: the allocator used
// for every builder and array created by the operator, a tracer, the
// Aggregation to perform (its resultName, expr and function fields are read),
// the expressions matching the group by columns, and the flag stored as
// finalStage. The returned operator has no next operator set yet.
//
// NOTE(review): the function name, parameter names and the local variable
// name are missing from this definition as it appears in the file; they need
// to be restored before it can build.
func (
	 memory.Allocator,
	 trace.Tracer,
	 Aggregation,
	 []logicalplan.Expr,
	 bool,
) *OrderedAggregate {
	 := &OrderedAggregate{
		pool:              ,
		tracer:            ,
		resultColumnName:  .resultName,
		columnToAggregate: .expr,
		// TODO: Matchers can be optimized to be something like a radix tree or
		// just a fast-lookup data structure for exact matches or prefix
		// matches.
		groupByColumnMatchers: ,
		aggregationFunction:   .function,
		finalStage:            ,
		curGroup:              make(map[string]any, 10),

		groupBuilders: make(map[string]builder.ColumnBuilder),

		aggregationResults: make([]arrow.Array, 0, 1),
	}
	// The scratch buffers are allocated once here with an initial capacity of
	// 10 and reused on every subsequent record.
	.scratch.groupByMap = make(map[string]groupColInfo, 10)
	.scratch.groupByArrays = make([]arrow.Array, 0, 10)
	.scratch.curGroup = make([]any, 0, 10)
	return 
}

// Forwards the close request to the next operator; nothing owned by the
// OrderedAggregate itself is released here.
//
// NOTE(review): next is used without a nil check, unlike in the diagram
// method below — confirm that next is always set before this is called.
// NOTE(review): the method and receiver names are missing from this
// definition as it appears in the file.
func ( *OrderedAggregate) () {
	.next.Close()
}

// Stores the given PhysicalPlan as next, the operator that receives the
// aggregated records and the finish/close notifications of this operator.
//
// NOTE(review): the method, receiver and parameter names are missing from
// this definition as it appears in the file.
func ( *OrderedAggregate) ( PhysicalPlan) {
	.next = 
}

// Returns the Diagram node describing this operator. Details has the form
// "OrderedAggregate (<column to aggregate> by <group by names>)", where the
// group by names are the comma-separated names of the group by matchers.
// Child is the diagram drawn by the next operator, or nil when no next
// operator has been set.
//
// NOTE(review): the method, receiver and local variable names are missing
// from this definition as it appears in the file.
func ( *OrderedAggregate) () *Diagram {
	var  *Diagram
	if .next != nil {
		 = .next.Draw()
	}

	// Collect the matcher names in declaration order for the description.
	var  []string
	for ,  := range .groupByColumnMatchers {
		 = append(, .Name())
	}

	 := fmt.Sprintf(
		"OrderedAggregate (%s by %s)",
		.columnToAggregate.Name(),
		strings.Join(, ","),
	)
	return &Diagram{Details: , Child: }
}

// Consumes one record. It locates the group by columns and the column to
// aggregate in the record's schema, splits the rows into groups and ordered
// sets with arrowutils.GetGroupsAndOrderedSetRanges, aggregates every group
// whose end was found, and buffers the outcome in groupResults and
// aggregationResults. Nothing is forwarded to the next operator from here.
//
// State that cannot be closed within this record is carried over: the values
// of the trailing group go to arrayToAggCarry, the results of the trailing
// ordered set go to aggResultBuilder, and curGroup is updated (in a deferred
// function) to the last group seen.
//
// An error is returned when no column of the record matches the column to
// aggregate, or when one of the helpers called along the way fails.
//
// NOTE(review): the method, receiver, parameter and local variable names are
// missing from this definition as it appears in the file; they need to be
// restored before it can build.
func ( *OrderedAggregate) ( context.Context,  arrow.Record) error {
	// Generates high volume of spans. Comment out if needed during development.
	// ctx, span := a.tracer.Start(ctx, "OrderedAggregate/Callback")
	// defer span.End()

	// Empty the scratch map: it must only describe the columns of the record
	// being processed now.
	for  := range .scratch.groupByMap {
		delete(.scratch.groupByMap, )
	}

	// TODO(asubiotto): Explore a static schema in the execution engine, all
	// this should be initialization code.

	var  arrow.Array
	 := false
	 := false
	for  := 0;  < .Schema().NumFields(); ++ {
		 := .Schema().Field()
		for ,  := range .groupByColumnMatchers {
			if .MatchColumn(.Name) {
				.scratch.groupByMap[.Name] = groupColInfo{field: , arr: .Column()}
				// A group by column without a builder has not been seen in
				// any earlier record: register it and create its builder.
				if ,  := .groupBuilders[.Name]; ! {
					.groupColOrdering = append(.groupColOrdering, )
					 := builder.NewBuilder(.pool, .Type)
					.groupBuilders[.Name] = 
					 = true
					if .notFirstCall {
						// This builder needs to be filled up with NULLs up to the
						// number of groups we currently have buffered.
						for  := 0;  < .groupBuilders[.groupColOrdering[0].Name].Len(); ++ {
							.AppendNull()
						}
					}
				}
			}
		}

		if .columnToAggregate.MatchColumn(.Name) {
			 = .Column()
			// The carry builder is created lazily because its type is only
			// known once the column to aggregate has been seen.
			if .arrayToAggCarry == nil {
				.arrayToAggCarry = builder.NewBuilder(.pool, .DataType())
			}
			 = true
		}
	}

	if ! {
		return errors.New("aggregate field not found, aggregations are not possible without it")
	}

	if  {
		// Previous group results need to be updated with physical null columns
		// for the new columns found. We can't use virtual null columns here
		// because other operators aren't equipped to handle them.
		for  := range .groupResults {
			for  := len(.groupResults[]);  < len(.groupColOrdering); ++ {
				.groupResults[] = append(
					.groupResults[],
					arrowutils.MakeNullArray(
						.pool,
						.groupColOrdering[].Type,
						.groupResults[][0].Len(),
					),
				)
			}
		}
	}

	// Build groupByArrays with one array for every group column seen over all
	// Callback calls, in groupColOrdering order.
	.scratch.groupByArrays = .scratch.groupByArrays[:0]
	// The scratch curGroup is a slice that holds the current group value of
	// each group column at the same index as in groupColOrdering, so that the
	// group values can be accessed without hashing by the field name.
	.scratch.curGroup = .scratch.curGroup[:0]
	for ,  := range .groupColOrdering {
		,  := .scratch.groupByMap[.Name]
		var  arrow.Array
		if ! {
			// If a column that was previously seen in a record is not seen,
			// add a virtual null column in its place.
			 = arrowutils.MakeVirtualNullArray(.Type, int(.NumRows()))
		} else {
			 = .arr
		}
		.scratch.groupByArrays = append(.scratch.groupByArrays, )
		if !.notFirstCall {
			// Initialize curGroup to the first value in each column.
			// NOTE(review): this reads row 0, so the first record is
			// presumably expected to be non-empty — confirm.
			 := .GetOneForMarshal(0)
			switch concreteV := .(type) {
			case []byte:
				// Safe copy.
				.curGroup[.Name] = append([]byte(nil), ...)
			default:
				.curGroup[.Name] = 
			}
		}
		.scratch.curGroup = append(.scratch.curGroup, .curGroup[.Name])
	}
	.notFirstCall = true

	, , ,  := arrowutils.GetGroupsAndOrderedSetRanges(
		.scratch.curGroup,
		.scratch.groupByArrays,
	)
	if  != nil {
		return 
	}
	// Don't update curGroup to lastGroup yet, given that the end of the
	// curGroup from the last record might have been found at the zeroth index
	// and we need to know what values to append to the group builders.
	defer func() {
		for ,  := range  {
			.curGroup[.groupColOrdering[].Name] = 
		}
	}()

	 := .Unwrap(.scratch.indexes)

	// Aggregate the values for all groups found.
	 := make([]arrow.Array, 0, .Len())

	// arraysToAggregateSetIdxs keeps track of the idxs in arraysToAggregate
	// that represent new ordered sets. This is essentially a "conversion" of
	// the setRanges which refer to ranges of individual values in the input
	// record while arraysToAggregateSetIdxs refer to ranges of groups.
	var  []int64
	for ,  := int64(-1), 0; ; {
		,  := .PopNextNotEqual()
		// groupStart is initialized to -1 to not ignore groupEnd == 0, after
		// the first pop, it should now be set to 0.
		if  == -1 {
			 = 0
		}
		if ! {
			// All groups have been processed.
			// The values corresponding to the last group need to be carried
			// over to the next aggregation since we can't determine that the
			// last group is closed until we know the first value of the next
			// record passed to Callback.
			// Note that the current group values should already be set in
			// a.curGroup.
			// TODO(asubiotto): We don't handle NULL values in aggregation
			// columns in aggregation functions so disregard them here as well
			// for now. We should eventually care about this.
			// TODO(asubiotto): Instead of doing this copy, what would the
			// performance difference be if we just merged the aggregation?
			if  := builder.AppendArray(
				.arrayToAggCarry,
				array.NewSlice(, , int64(.Len())),
			);  != nil {
				return 
			}
			break
		}

		// Append the values to aggregate.
		var  arrow.Array
		if  == 0 {
			// End of the group found in the last record, the only data to
			// aggregate was carried over.
			 = .arrayToAggCarry.NewArray()
		} else {
			 = array.NewSlice(, , )
			// Values carried over from the previous record belong to this
			// group as well, so prepend them by appending the slice to the
			// carry builder and taking the combined array.
			if .arrayToAggCarry.Len() > 0 {
				if  := builder.AppendArray(.arrayToAggCarry, );  != nil {
					return 
				}
				 = .arrayToAggCarry.NewArray()
			}
		}
		 = append(, )

		// Append the groups.
		 := false
		if len() > 0 &&  < len() && [] ==  {
			++
			 = true
			 = append(, int64(len()))
			// This group is the last one of the current ordered set. Flush
			// it to the results. The corresponding aggregation results are
			// flushed in a loop below.
			.groupResults = append(.groupResults, nil)
		}
		for ,  := range .groupColOrdering {
			var  any
			if  == 0 {
				// End of the current group of the last record.
				 = .curGroup[.Name]
			} else {
				 = .scratch.groupByArrays[].GetOneForMarshal(int())
			}
			if  := builder.AppendGoValue(
				.groupBuilders[.Name],
				,
			);  != nil {
				return 
			}

			if  {
				 := len(.groupResults) - 1
				 := .groupBuilders[.Name].NewArray()
				// Since we're accumulating the group results until the call
				// to Finish, it is unsafe to reuse this builder since the
				// underlying buffers are reused, so allocate a new one.
				.groupBuilders[.Name] = builder.NewBuilder(.pool, .DataType())
				.groupResults[] = append(.groupResults[], )
			}
		}

		 = 
	}

	if len() == 0 {
		// No new groups or sets were found, carry on.
		return nil
	}

	,  := runAggregation(.finalStage, .aggregationFunction, .pool, )
	if  != nil {
		return 
	}

	// Supporting partial ordering implies the need to accumulate all the
	// results since any group might reoccur at any point in future records.
	// If we can determine that the ordering is global at plan time, we could
	// directly flush the results.
	 := int64(0)
	for ,  := range  {
		 := array.NewSlice(, , )
		if .aggResultBuilder.Len() > 0 {
			// This is the end of an ordered set that started in the last
			// record.
			.aggResultBuilder.Add()
			var  error
			,  = .aggResultBuilder.NewArray(.pool)
			if  != nil {
				return 
			}
		}
		.aggregationResults = append(.aggregationResults, )
		 = 
	}
	// The last ordered set cannot be determined to close within this
	// record, so carry it over.
	.aggResultBuilder.Add(array.NewSlice(, , int64(.Len())))
	return nil
}

// Emits the buffered aggregation results to the next operator and then
// finishes it. The work is wrapped in an "OrderedAggregate/Finish" span.
//
// If no record was ever processed, the next operator is finished right away.
// Otherwise the group carried over from the last record is aggregated and
// appended to the last ordered set, and one record is built per ordered set
// (group columns followed by the aggregation result column). A single record
// is forwarded as is. Several records are merged with
// arrowutils.MergeRecords, the merged rows are split into groups again, one
// row per group is kept for the group columns, and the aggregation is run a
// second time over the per-group slices of the result column (with true
// passed as the first argument of runAggregation instead of finalStage)
// before the merged record is forwarded.
//
// The first error returned by a helper or by the next operator is returned.
//
// NOTE(review): aggregationResults[0] is indexed unconditionally when the
// schema is built; this presumably relies on every processed record having
// contributed at least one carried-over value — confirm.
// NOTE(review): the method, receiver, parameter and local variable names are
// missing from this definition as it appears in the file; they need to be
// restored before it can build.
func ( *OrderedAggregate) ( context.Context) error {
	,  := .tracer.Start(, "OrderedAggregate/Finish")
	defer .End()

	if !.notFirstCall {
		// Callback was never called, simply call Finish.
		return .next.Finish()
	}

	if .arrayToAggCarry.Len() > 0 {
		// Aggregate the last group.
		.groupResults = append(.groupResults, nil)
		 := len(.groupResults) - 1
		for ,  := range .groupColOrdering {
			 := .groupBuilders[.Name]
			if  := builder.AppendGoValue(
				, .curGroup[.Name],
			);  != nil {
				return 
			}
			.groupResults[] = append(.groupResults[], .NewArray())
		}

		,  := runAggregation(
			.finalStage, .aggregationFunction, .pool, []arrow.Array{.arrayToAggCarry.NewArray()},
		)
		if  != nil {
			return 
		}

		var  arrow.Array
		if .aggResultBuilder.Len() > 0 {
			// Append the results to the last ordered set.
			.aggResultBuilder.Add()
			var  error
			,  = .aggResultBuilder.NewArray(.pool)
			if  != nil {
				return 
			}
		} else {
			 = 
		}
		.aggregationResults = append(.aggregationResults, )
	}

	// The output schema is the group columns in groupColOrdering order
	// followed by the result column, typed after the first result array.
	 := arrow.NewSchema(
		append(
			.groupColOrdering,
			arrow.Field{Name: .getResultColumnName(), Type: .aggregationResults[0].DataType()},
		),
		nil,
	)

	// Build one record per ordered set.
	 := make([]arrow.Record, 0, len(.groupResults))
	for  := range .groupResults {
		 = append(
			,
			array.NewRecord(
				,
				append(
					.groupResults[],
					.aggregationResults[],
				),
				int64(.aggregationResults[].Len()),
			),
		)
	}

	if len() == 1 {
		if  := .next.Callback(, [0]);  != nil {
			return 
		}
	} else {
		// The aggregation results must be merged.
		 := make([]arrowutils.SortingColumn, len(.groupColOrdering))
		for  := range  {
			[] = arrowutils.SortingColumn{Index: }
		}
		,  := arrowutils.MergeRecords(.pool, , , 0)
		if  != nil {
			return 
		}
		// Seed the group detection with the group values of the first merged
		// row.
		 := make([]any, len(.groupColOrdering))
		 := .Columns()[:len(.groupColOrdering)]
		for ,  := range  {
			[] = .GetOneForMarshal(0)
		}
		, , ,  := arrowutils.GetGroupsAndOrderedSetRanges(, )
		if  != nil {
			return 
		}
		 := .Unwrap(.scratch.indexes)
		// Close the last range to iterate over all groupResults.
		 = append(, .NumRows())

		// For better performance, the result is built a column at a time.
		for ,  := range .groupColOrdering {
			 := int64(0)
			for ,  := range  {
				if  := builder.AppendValue(
					.groupBuilders[.Name], .Column(), int(),
				);  != nil {
					return 
				}
				 = 
			}
		}

		// The array of aggregation values is the first column index after the
		// group fields.
		 := .Columns()[len(.groupColOrdering)]
		 := int64(0)
		 := make([]arrow.Array, 0, len())
		for ,  := range  {
			 = append(, array.NewSlice(, , ))
			 = 
		}

		,  := runAggregation(true, .aggregationFunction, .pool, )
		if  != nil {
			return 
		}

		 := make([]arrow.Array, 0, len(.groupBuilders))
		for ,  := range .groupColOrdering {
			 = append(, .groupBuilders[.Name].NewArray())
		}
		if  := .next.Callback(
			,
			array.NewRecord(
				,
				append(, ),
				int64(.Len()),
			),
		);  != nil {
			return 
		}
	}

	return .next.Finish()
}

// Returns the name to give to the aggregation result column: the configured
// resultColumnName when this is the final stage, and the name of the column
// being aggregated otherwise.
//
// NOTE(review): the method, receiver and local variable names are missing
// from this definition as it appears in the file.
func ( *OrderedAggregate) () string {
	 := .columnToAggregate.Name()
	if .finalStage {
		 = .resultColumnName
	}
	return 
}