// Copyright 2022 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.

package asynq

import (
	"context"
	"sync"
	"time"

	"github.com/hibiken/asynq/internal/base"
	"github.com/hibiken/asynq/internal/log"
)

// An aggregator is responsible for checking groups and aggregating each group
// into one task if any of the aggregation conditions is met.
type aggregator struct {
	logger *log.Logger
	broker base.Broker
	client *Client

	// channel to communicate back to the long-running "aggregator" goroutine.
	done chan struct{}

	// list of queue names to check and aggregate.
	queues []string

	// Group configurations
	gracePeriod time.Duration
	maxDelay    time.Duration
	maxSize     int

	// User provided group aggregator.
	ga GroupAggregator

	// interval used to check for aggregation
	interval time.Duration

	// sema is a counting semaphore to ensure that the number of active aggregation checks
	// does not exceed the limit.
	sema chan struct{}
}

type aggregatorParams struct {
	logger          *log.Logger
	broker          base.Broker
	queues          []string
	gracePeriod     time.Duration
	maxDelay        time.Duration
	maxSize         int
	groupAggregator GroupAggregator
}

const (
	// Maximum number of aggregation checks in flight concurrently.
	maxConcurrentAggregationChecks = 3

	// Default interval used for aggregation checks. If the provided gracePeriod is less than
	// the default, use the gracePeriod.
	defaultAggregationCheckInterval = 7 * time.Second
)

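// newAggregator returns a new aggregator configured with the given params.
// The check interval defaults to defaultAggregationCheckInterval, or to
// params.gracePeriod if that is shorter.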
func newAggregator(params aggregatorParams) *aggregator {
	interval := defaultAggregationCheckInterval
	if params.gracePeriod < interval {
		interval = params.gracePeriod
	}
	return &aggregator{
		logger:      params.logger,
		broker:      params.broker,
		client:      &Client{broker: params.broker},
		done:        make(chan struct{}),
		queues:      params.queues,
		gracePeriod: params.gracePeriod,
		maxDelay:    params.maxDelay,
		maxSize:     params.maxSize,
		ga:          params.groupAggregator,
		sema:        make(chan struct{}, maxConcurrentAggregationChecks),
		interval:    interval,
	}
}

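// shutdown signals the aggregator goroutine to stop.
// It is a no-op when no GroupAggregator is configured.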
func (a *aggregator) shutdown() {
	if a.ga == nil {
		return
	}
	a.logger.Debug("Aggregator shutting down...")
	// Signal the aggregator goroutine to stop.
	a.done <- struct{}{}
}

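// start launches the aggregator goroutine, which runs an aggregation check on
// every tick of the configured interval until shutdown is called.
// It is a no-op when no GroupAggregator is configured.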
func (a *aggregator) start(wg *sync.WaitGroup) {
	if a.ga == nil {
		return
	}
	wg.Add(1)
	go func() {
		defer wg.Done()
		ticker := time.NewTicker(a.interval)
		for {
			select {
			case <-a.done:
				a.logger.Debug("Waiting for all aggregation checks to finish...")
				// Block until all in-flight aggregation checks have released their tokens.
				for i := 0; i < cap(a.sema); i++ {
					a.sema <- struct{}{}
				}
				a.logger.Debug("Aggregator done")
				ticker.Stop()
				return
			case t := <-ticker.C:
				a.exec(t)
			}
		}
	}()
}

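// exec kicks off a single aggregation check in a new goroutine, unless the
// maximum number of checks is already in flight.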
func (a *aggregator) exec(t time.Time) {
	select {
	case a.sema <- struct{}{}: // acquire token
		go a.aggregate(t)
	default:
		// If the semaphore blocks, then we are currently running the max number of
		// aggregation checks. Skip this round and log a warning.
		a.logger.Warnf("Max number of aggregation checks in flight. Skipping")
	}
}

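// aggregate walks every group in every configured queue and, for each group
// that satisfies an aggregation condition, combines its tasks via the
// user-provided GroupAggregator and enqueues the resulting task.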
func (a *aggregator) aggregate(t time.Time) {
	defer func() { <-a.sema /* release token */ }()
	for _, qname := range a.queues {
		groups, err := a.broker.ListGroups(qname)
		if err != nil {
			a.logger.Errorf("Failed to list groups in queue: %q", qname)
			continue
		}
		for _, gname := range groups {
			aggregationSetID, err := a.broker.AggregationCheck(
				qname, gname, t, a.gracePeriod, a.maxDelay, a.maxSize)
			if err != nil {
				a.logger.Errorf("Failed to run aggregation check: queue=%q group=%q", qname, gname)
				continue
			}
			if aggregationSetID == "" {
				a.logger.Debugf("No aggregation needed at this time: queue=%q group=%q", qname, gname)
				continue
			}

			// Aggregate and enqueue.
			msgs, deadline, err := a.broker.ReadAggregationSet(qname, gname, aggregationSetID)
			if err != nil {
				a.logger.Errorf("Failed to read aggregation set: queue=%q, group=%q, setID=%q",
					qname, gname, aggregationSetID)
				continue
			}
			tasks := make([]*Task, len(msgs))
			for i, m := range msgs {
				tasks[i] = NewTask(m.Type, m.Payload)
			}
			aggregatedTask := a.ga.Aggregate(gname, tasks)
			ctx, cancel := context.WithDeadline(context.Background(), deadline)
			if _, err := a.client.EnqueueContext(ctx, aggregatedTask, Queue(qname)); err != nil {
				a.logger.Errorf("Failed to enqueue aggregated task (queue=%q, group=%q, setID=%q): %v",
					qname, gname, aggregationSetID, err)
				cancel()
				continue
			}
			if err := a.broker.DeleteAggregationSet(ctx, qname, gname, aggregationSetID); err != nil {
				a.logger.Warnf("Failed to delete aggregation set: queue=%q, group=%q, setID=%q",
					qname, gname, aggregationSetID)
			}
			cancel()
		}
	}
}