package asynq
import (
"context"
"sync"
"time"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/errors"
"github.com/hibiken/asynq/internal/log"
)
type recoverer struct {
logger *log .Logger
broker base .Broker
retryDelayFunc RetryDelayFunc
isFailureFunc func (error ) bool
done chan struct {}
queues []string
interval time .Duration
}
type recovererParams struct {
logger *log .Logger
broker base .Broker
queues []string
interval time .Duration
retryDelayFunc RetryDelayFunc
isFailureFunc func (error ) bool
}
func newRecoverer(params recovererParams ) *recoverer {
return &recoverer {
logger : params .logger ,
broker : params .broker ,
done : make (chan struct {}),
queues : params .queues ,
interval : params .interval ,
retryDelayFunc : params .retryDelayFunc ,
isFailureFunc : params .isFailureFunc ,
}
}
func (r *recoverer ) shutdown () {
r .logger .Debug ("Recoverer shutting down..." )
r .done <- struct {}{}
}
func (r *recoverer ) start (wg *sync .WaitGroup ) {
wg .Add (1 )
go func () {
defer wg .Done ()
r .recover ()
timer := time .NewTimer (r .interval )
for {
select {
case <- r .done :
r .logger .Debug ("Recoverer done" )
timer .Stop ()
return
case <- timer .C :
r .recover ()
timer .Reset (r .interval )
}
}
}()
}
var ErrLeaseExpired = errors .New ("asynq: task lease expired" )
func (r *recoverer ) recover () {
r .recoverLeaseExpiredTasks ()
r .recoverStaleAggregationSets ()
}
func (r *recoverer ) recoverLeaseExpiredTasks () {
cutoff := time .Now ().Add (-30 * time .Second )
msgs , err := r .broker .ListLeaseExpired (cutoff , r .queues ...)
if err != nil {
r .logger .Warnf ("recoverer: could not list lease expired tasks: %v" , err )
return
}
for _ , msg := range msgs {
if msg .Retried >= msg .Retry {
r .archive (msg , ErrLeaseExpired )
} else {
r .retry (msg , ErrLeaseExpired )
}
}
}
func (r *recoverer ) recoverStaleAggregationSets () {
for _ , qname := range r .queues {
if err := r .broker .ReclaimStaleAggregationSets (qname ); err != nil {
r .logger .Warnf ("recoverer: could not reclaim stale aggregation sets in queue %q: %v" , qname , err )
}
}
}
func (r *recoverer ) retry (msg *base .TaskMessage , err error ) {
delay := r .retryDelayFunc (msg .Retried , err , NewTask (msg .Type , msg .Payload ))
retryAt := time .Now ().Add (delay )
if err := r .broker .Retry (context .Background (), msg , retryAt , err .Error(), r .isFailureFunc (err )); err != nil {
r .logger .Warnf ("recoverer: could not retry lease expired task: %v" , err )
}
}
func (r *recoverer ) archive (msg *base .TaskMessage , err error ) {
if err := r .broker .Archive (context .Background (), msg , err .Error()); err != nil {
r .logger .Warnf ("recoverer: could not move task to archive: %v" , err )
}
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .