package asynq
import (
"context"
"sync"
"time"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log"
)
type aggregator struct {
logger *log .Logger
broker base .Broker
client *Client
done chan struct {}
queues []string
gracePeriod time .Duration
maxDelay time .Duration
maxSize int
ga GroupAggregator
interval time .Duration
sema chan struct {}
}
type aggregatorParams struct {
logger *log .Logger
broker base .Broker
queues []string
gracePeriod time .Duration
maxDelay time .Duration
maxSize int
groupAggregator GroupAggregator
}
const (
maxConcurrentAggregationChecks = 3
defaultAggregationCheckInterval = 7 * time .Second
)
func newAggregator(params aggregatorParams ) *aggregator {
interval := defaultAggregationCheckInterval
if params .gracePeriod < interval {
interval = params .gracePeriod
}
return &aggregator {
logger : params .logger ,
broker : params .broker ,
client : &Client {broker : params .broker },
done : make (chan struct {}),
queues : params .queues ,
gracePeriod : params .gracePeriod ,
maxDelay : params .maxDelay ,
maxSize : params .maxSize ,
ga : params .groupAggregator ,
sema : make (chan struct {}, maxConcurrentAggregationChecks ),
interval : interval ,
}
}
func (a *aggregator ) shutdown () {
if a .ga == nil {
return
}
a .logger .Debug ("Aggregator shutting down..." )
a .done <- struct {}{}
}
func (a *aggregator ) start (wg *sync .WaitGroup ) {
if a .ga == nil {
return
}
wg .Add (1 )
go func () {
defer wg .Done ()
ticker := time .NewTicker (a .interval )
for {
select {
case <- a .done :
a .logger .Debug ("Waiting for all aggregation checks to finish..." )
for i := 0 ; i < cap (a .sema ); i ++ {
a .sema <- struct {}{}
}
a .logger .Debug ("Aggregator done" )
ticker .Stop ()
return
case t := <- ticker .C :
a .exec (t )
}
}
}()
}
func (a *aggregator ) exec (t time .Time ) {
select {
case a .sema <- struct {}{}:
go a .aggregate (t )
default :
a .logger .Warnf ("Max number of aggregation checks in flight. Skipping" )
}
}
func (a *aggregator ) aggregate (t time .Time ) {
defer func () { <-a .sema }()
for _ , qname := range a .queues {
groups , err := a .broker .ListGroups (qname )
if err != nil {
a .logger .Errorf ("Failed to list groups in queue: %q" , qname )
continue
}
for _ , gname := range groups {
aggregationSetID , err := a .broker .AggregationCheck (
qname , gname , t , a .gracePeriod , a .maxDelay , a .maxSize )
if err != nil {
a .logger .Errorf ("Failed to run aggregation check: queue=%q group=%q" , qname , gname )
continue
}
if aggregationSetID == "" {
a .logger .Debugf ("No aggregation needed at this time: queue=%q group=%q" , qname , gname )
continue
}
msgs , deadline , err := a .broker .ReadAggregationSet (qname , gname , aggregationSetID )
if err != nil {
a .logger .Errorf ("Failed to read aggregation set: queue=%q, group=%q, setID=%q" ,
qname , gname , aggregationSetID )
continue
}
tasks := make ([]*Task , len (msgs ))
for i , m := range msgs {
tasks [i ] = NewTask (m .Type , m .Payload )
}
aggregatedTask := a .ga .Aggregate (gname , tasks )
ctx , cancel := context .WithDeadline (context .Background (), deadline )
if _ , err := a .client .EnqueueContext (ctx , aggregatedTask , Queue (qname )); err != nil {
a .logger .Errorf ("Failed to enqueue aggregated task (queue=%q, group=%q, setID=%q): %v" ,
qname , gname , aggregationSetID , err )
cancel ()
continue
}
if err := a .broker .DeleteAggregationSet (ctx , qname , gname , aggregationSetID ); err != nil {
a .logger .Warnf ("Failed to delete aggregation set: queue=%q, group=%q, setID=%q" ,
qname , gname , aggregationSetID )
}
cancel ()
}
}
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .