package asynq
import (
"context"
"fmt"
"math"
"math/rand"
"runtime"
"runtime/debug"
"sort"
"strings"
"sync"
"time"
"github.com/hibiken/asynq/internal/base"
asynqcontext "github.com/hibiken/asynq/internal/context"
"github.com/hibiken/asynq/internal/errors"
"github.com/hibiken/asynq/internal/log"
"github.com/hibiken/asynq/internal/timeutil"
"golang.org/x/time/rate"
)
// processor dequeues task messages from the configured queues and dispatches
// each one to a worker goroutine, bounded by a counting semaphore.
type processor struct {
	logger *log.Logger
	broker base.Broker
	clock  timeutil.Clock

	// handler processes each dequeued task; initialized to a "handler not set"
	// stub by newProcessor until the real handler is installed.
	handler Handler

	// baseCtxFn supplies the base context from which each task's context is derived.
	baseCtxFn func() context.Context

	// queueConfig maps queue name -> priority weight (normalized in newProcessor).
	queueConfig map[string]int

	// orderedQueues is the fixed dequeue order; non-nil only when strict
	// priority mode is enabled.
	orderedQueues []string

	retryDelayFunc RetryDelayFunc
	isFailureFunc  func(error) bool

	errHandler ErrorHandler

	shutdownTimeout time.Duration

	// syncRequestCh carries broker writes that failed so they can be retried
	// later (see markAsComplete, markAsDone, retry, archive).
	syncRequestCh chan<- *syncRequest

	// errLogLimiter rate-limits dequeue-error logging (see exec).
	errLogLimiter *rate.Limiter

	// sema is a counting semaphore bounding the number of active workers;
	// its capacity is the configured concurrency.
	sema chan struct{}

	// done receives one value from stop() to make the loop in start() return.
	done chan struct{}

	// once guarantees stop() performs its shutdown signaling exactly once.
	once sync.Once

	// quit is closed when shutdown begins; it unblocks exec().
	quit chan struct{}

	// abort is closed after shutdownTimeout elapses; it tells in-flight
	// workers to requeue their tasks and exit.
	abort chan struct{}

	// cancelations tracks the context cancel function of every active task,
	// keyed by task ID.
	cancelations *base.Cancelations

	// starting receives worker info each time a task begins processing.
	starting chan<- *workerInfo

	// finished receives each task message when its worker completes.
	finished chan<- *base.TaskMessage
}
// processorParams bundles the dependencies and configuration required to
// construct a processor via newProcessor.
type processorParams struct {
	logger          *log.Logger
	broker          base.Broker
	baseCtxFn       func() context.Context
	retryDelayFunc  RetryDelayFunc
	isFailureFunc   func(error) bool
	syncCh          chan<- *syncRequest
	cancelations    *base.Cancelations
	concurrency     int            // max number of concurrent workers (capacity of sema)
	queues          map[string]int // queue name -> priority weight
	strictPriority  bool           // if true, always dequeue in fixed priority order
	errHandler      ErrorHandler
	shutdownTimeout time.Duration
	starting        chan<- *workerInfo
	finished        chan<- *base.TaskMessage
}
// newProcessor builds a processor from the given params. Queue priority
// weights are normalized to their smallest equivalent integers, and a fixed
// dequeue order is precomputed when strict priority is requested. The task
// handler starts out as a stub that always errors; the caller is expected to
// install the real Handler before the processor runs.
func newProcessor(params processorParams) *processor {
	queues := normalizeQueues(params.queues)
	var ordered []string
	if params.strictPriority {
		ordered = sortByPriority(queues)
	}
	placeholder := HandlerFunc(func(ctx context.Context, t *Task) error {
		return fmt.Errorf("handler not set")
	})
	return &processor{
		logger:          params.logger,
		broker:          params.broker,
		baseCtxFn:       params.baseCtxFn,
		clock:           timeutil.NewRealClock(),
		queueConfig:     queues,
		orderedQueues:   ordered,
		retryDelayFunc:  params.retryDelayFunc,
		isFailureFunc:   params.isFailureFunc,
		syncRequestCh:   params.syncCh,
		cancelations:    params.cancelations,
		errLogLimiter:   rate.NewLimiter(rate.Every(3*time.Second), 1),
		sema:            make(chan struct{}, params.concurrency),
		done:            make(chan struct{}),
		quit:            make(chan struct{}),
		abort:           make(chan struct{}),
		errHandler:      params.errHandler,
		handler:         placeholder,
		shutdownTimeout: params.shutdownTimeout,
		starting:        params.starting,
		finished:        params.finished,
	}
}
// stop signals the processor to cease dequeuing new tasks. It is safe to call
// from multiple goroutines and multiple times; p.once ensures the signaling
// runs exactly once.
func (p *processor) stop() {
	p.once.Do(func() {
		p.logger.Debug("Processor shutting down...")
		// Closing quit unblocks any exec() call that is waiting in its
		// select (for a semaphore token).
		close(p.quit)
		// Sending on done makes the loop started by start() return.
		p.done <- struct{}{}
	})
}
// shutdown stops the processor and waits for all in-flight workers to finish,
// allowing up to shutdownTimeout. When the timeout fires, the abort channel is
// closed, which tells remaining workers (see exec) to requeue their tasks and
// exit.
func (p *processor) shutdown() {
	p.stop()

	time.AfterFunc(p.shutdownTimeout, func() { close(p.abort) })

	p.logger.Info("Waiting for all workers to finish...")
	// Acquire every token of the semaphore: once we hold the full capacity,
	// no worker goroutine can still be active.
	for i := 0; i < cap(p.sema); i++ {
		p.sema <- struct{}{}
	}
	p.logger.Info("All workers have finished")
}
// start launches the processor's main loop in a new goroutine, registering it
// with wg so callers can wait for it to exit. The loop repeatedly runs exec()
// until a value arrives on p.done (sent by stop()).
func (p *processor) start(wg *sync.WaitGroup) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// Non-blocking check for the shutdown signal; if absent, run one
			// dequeue-and-dispatch cycle.
			select {
			case <-p.done:
				p.logger.Debug("Processor done")
				return
			default:
			}
			p.exec()
		}
	}()
}
// exec dequeues one task message and, if one is available, processes it in a
// new worker goroutine. It blocks until either a semaphore token is acquired
// or the processor begins quitting.
func (p *processor) exec() {
	select {
	case <-p.quit:
		return
	case p.sema <- struct{}{}: // acquire a worker token
		qnames := p.queues()
		msg, leaseExpirationTime, err := p.broker.Dequeue(qnames...)
		switch {
		case errors.Is(err, errors.ErrNoProcessableTask):
			p.logger.Debug("All queues are empty")
			// Nothing to do: back off for a second before polling again so we
			// do not hammer the broker in a tight loop.
			time.Sleep(time.Second)
			<-p.sema // release token
			return
		case err != nil:
			// Rate-limit dequeue error logs so a broker outage does not flood
			// the log output.
			if p.errLogLimiter.Allow() {
				p.logger.Errorf("Dequeue error: %v", err)
			}
			<-p.sema // release token
			return
		}

		lease := base.NewLease(leaseExpirationTime)
		deadline := p.computeDeadline(msg)
		// Report that a worker has started on this message.
		p.starting <- &workerInfo{msg, time.Now(), deadline, lease}
		go func() {
			defer func() {
				// Report completion and release the semaphore token, whatever
				// path the worker took.
				p.finished <- msg
				<-p.sema // release token
			}()

			ctx, cancel := asynqcontext.New(p.baseCtxFn(), msg, deadline)
			p.cancelations.Add(msg.ID, cancel)
			defer func() {
				cancel()
				p.cancelations.Delete(msg.ID)
			}()

			// The deadline may already have passed; check before spawning the
			// goroutine that runs the handler.
			select {
			case <-ctx.Done():
				p.handleFailedMessage(ctx, lease, msg, ctx.Err())
				return
			default:
			}

			// Buffered so the handler goroutine can always deliver its result
			// and exit, even if no one is left to receive it.
			resCh := make(chan error, 1)
			go func() {
				task := newTask(
					msg.Type,
					msg.Payload,
					&ResultWriter{
						id:     msg.ID,
						qname:  msg.Queue,
						broker: p.broker,
						ctx:    ctx,
					},
				)
				resCh <- p.perform(ctx, task)
			}()

			select {
			case <-p.abort:
				// Shutdown timeout elapsed: push the task back to its queue so
				// it can be processed again later.
				p.logger.Warnf("Quitting worker. task id=%s", msg.ID)
				p.requeue(lease, msg)
				return
			case <-lease.Done():
				cancel()
				p.handleFailedMessage(ctx, lease, msg, ErrLeaseExpired)
				return
			case <-ctx.Done():
				p.handleFailedMessage(ctx, lease, msg, ctx.Err())
				return
			case resErr := <-resCh:
				if resErr != nil {
					p.handleFailedMessage(ctx, lease, msg, resErr)
					return
				}
				p.handleSucceededMessage(lease, msg)
			}
		}()
	}
}
// requeue pushes a task that was aborted mid-processing back onto its queue so
// it can be picked up again later. If the lease on the message is no longer
// valid, no write is attempted.
func (p *processor) requeue(l *base.Lease, msg *base.TaskMessage) {
	if !l.IsValid() {
		// Lease expired or lost; skip the write.
		return
	}
	// Bound the broker call by the lease deadline. Unlike the sync-retry
	// paths (markAsDone etc.), ctx does not escape this function, so we can
	// release its resources promptly; discarding the CancelFunc here would
	// be a context leak (flagged by `go vet`).
	ctx, cancel := context.WithDeadline(context.Background(), l.Deadline())
	defer cancel()
	if err := p.broker.Requeue(ctx, msg); err != nil {
		p.logger.Errorf("Could not push task id=%s back to queue: %v", msg.ID, err)
	} else {
		p.logger.Infof("Pushed task id=%s back to queue", msg.ID)
	}
}
// handleSucceededMessage finalizes a task that completed without error.
// Tasks with a positive retention period are moved to the completed set and
// kept around; all others are removed outright.
func (p *processor) handleSucceededMessage(l *base.Lease, msg *base.TaskMessage) {
	if msg.Retention <= 0 {
		p.markAsDone(l, msg)
		return
	}
	p.markAsComplete(l, msg)
}
// markAsComplete moves msg from the active set to the completed set (see the
// ActiveKey/CompletedKey references in the error message). If the broker write
// fails, the operation is handed off via syncRequestCh to be retried until the
// lease deadline.
func (p *processor) markAsComplete(l *base.Lease, msg *base.TaskMessage) {
	if !l.IsValid() {
		// Lease expired or lost; skip the write.
		return
	}
	// The CancelFunc is deliberately discarded: ctx is captured by the retry
	// closure sent on syncRequestCh below and must remain usable until the
	// lease deadline.
	ctx, _ := context.WithDeadline(context.Background(), l.Deadline())
	err := p.broker.MarkAsComplete(ctx, msg)
	if err != nil {
		errMsg := fmt.Sprintf("Could not move task id=%s type=%q from %q to %q: %+v",
			msg.ID, msg.Type, base.ActiveKey(msg.Queue), base.CompletedKey(msg.Queue), err)
		p.logger.Warnf("%s; Will retry syncing", errMsg)
		p.syncRequestCh <- &syncRequest{
			fn: func() error {
				// Retried later with the same deadline-bound context.
				return p.broker.MarkAsComplete(ctx, msg)
			},
			errMsg:   errMsg,
			deadline: l.Deadline(),
		}
	}
}
// markAsDone removes a successfully processed msg from the active set. If the
// broker write fails, the operation is handed off via syncRequestCh to be
// retried until the lease deadline.
func (p *processor) markAsDone(l *base.Lease, msg *base.TaskMessage) {
	if !l.IsValid() {
		// Lease expired or lost; skip the write.
		return
	}
	// The CancelFunc is deliberately discarded: ctx is captured by the retry
	// closure sent on syncRequestCh below and must remain usable until the
	// lease deadline.
	ctx, _ := context.WithDeadline(context.Background(), l.Deadline())
	err := p.broker.Done(ctx, msg)
	if err != nil {
		errMsg := fmt.Sprintf("Could not remove task id=%s type=%q from %q err: %+v", msg.ID, msg.Type, base.ActiveKey(msg.Queue), err)
		p.logger.Warnf("%s; Will retry syncing", errMsg)
		p.syncRequestCh <- &syncRequest{
			fn: func() error {
				// Retried later with the same deadline-bound context.
				return p.broker.Done(ctx, msg)
			},
			errMsg:   errMsg,
			deadline: l.Deadline(),
		}
	}
}
// SkipRetry is a sentinel error: when a handler returns an error that is (or
// wraps) SkipRetry, handleFailedMessage archives the task immediately instead
// of scheduling a retry.
var SkipRetry = errors.New("skip retry for the task")
// handleFailedMessage routes a task that finished with an error. The error is
// first reported to the user-supplied ErrorHandler (if any). Errors that
// isFailureFunc does not classify as failures are retried without counting
// against the retry budget. Otherwise the task is retried, or archived once
// its retries are exhausted or the error wraps SkipRetry.
func (p *processor) handleFailedMessage(ctx context.Context, l *base.Lease, msg *base.TaskMessage, err error) {
	if p.errHandler != nil {
		p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err)
	}
	if !p.isFailureFunc(err) {
		// Not a failure: retry without incrementing the failure count.
		p.retry(l, msg, err, false /* isFailure */)
		return
	}
	if msg.Retried >= msg.Retry || errors.Is(err, SkipRetry) {
		p.logger.Warnf("Retry exhausted for task id=%s", msg.ID)
		p.archive(l, msg, err)
		return
	}
	p.retry(l, msg, err, true /* isFailure */)
}
// retry schedules msg to run again after the delay produced by retryDelayFunc.
// isFailure controls whether this attempt counts against the task's retry
// budget (false for errors that isFailureFunc rejects). If the broker write
// fails, the operation is handed off via syncRequestCh to be retried until the
// lease deadline.
func (p *processor) retry(l *base.Lease, msg *base.TaskMessage, e error, isFailure bool) {
	if !l.IsValid() {
		// Lease expired or lost; skip the write.
		return
	}
	// The CancelFunc is deliberately discarded: ctx is captured by the retry
	// closure sent on syncRequestCh below and must remain usable until the
	// lease deadline.
	ctx, _ := context.WithDeadline(context.Background(), l.Deadline())
	d := p.retryDelayFunc(msg.Retried, e, NewTask(msg.Type, msg.Payload))
	retryAt := time.Now().Add(d)
	err := p.broker.Retry(ctx, msg, retryAt, e.Error(), isFailure)
	if err != nil {
		errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.ActiveKey(msg.Queue), base.RetryKey(msg.Queue))
		p.logger.Warnf("%s; Will retry syncing", errMsg)
		p.syncRequestCh <- &syncRequest{
			fn: func() error {
				// Retried later with the same deadline-bound context and the
				// originally computed retryAt.
				return p.broker.Retry(ctx, msg, retryAt, e.Error(), isFailure)
			},
			errMsg:   errMsg,
			deadline: l.Deadline(),
		}
	}
}
// archive moves msg from the active set to the archived set, recording the
// error text. If the broker write fails, the operation is handed off via
// syncRequestCh to be retried until the lease deadline.
func (p *processor) archive(l *base.Lease, msg *base.TaskMessage, e error) {
	if !l.IsValid() {
		// Lease expired or lost; skip the write.
		return
	}
	// The CancelFunc is deliberately discarded: ctx is captured by the retry
	// closure sent on syncRequestCh below and must remain usable until the
	// lease deadline.
	ctx, _ := context.WithDeadline(context.Background(), l.Deadline())
	err := p.broker.Archive(ctx, msg, e.Error())
	if err != nil {
		errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.ActiveKey(msg.Queue), base.ArchivedKey(msg.Queue))
		p.logger.Warnf("%s; Will retry syncing", errMsg)
		p.syncRequestCh <- &syncRequest{
			fn: func() error {
				// Retried later with the same deadline-bound context.
				return p.broker.Archive(ctx, msg, e.Error())
			},
			errMsg:   errMsg,
			deadline: l.Deadline(),
		}
	}
}
// queues returns the list of queue names to attempt dequeuing from, in order:
//
//   - a single configured queue is returned directly;
//   - in strict-priority mode the precomputed order is returned;
//   - otherwise each name is repeated by its priority weight, the resulting
//     list is shuffled, and duplicates are removed, so higher-weight queues
//     are more likely to appear earlier while the order varies per call.
func (p *processor) queues() []string {
	// Fast path: only one queue configured.
	if len(p.queueConfig) == 1 {
		for qname := range p.queueConfig {
			return []string{qname}
		}
	}
	if p.orderedQueues != nil {
		return p.orderedQueues
	}
	// Build the weighted list: priority N contributes N copies of the name.
	var names []string
	for qname, priority := range p.queueConfig {
		for i := 0; i < priority; i++ {
			names = append(names, qname)
		}
	}
	// Shuffle with a fresh time-seeded source, then keep each name's first
	// occurrence only.
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	r.Shuffle(len(names), func(i, j int) { names[i], names[j] = names[j], names[i] })
	return uniq(names, len(p.queueConfig))
}
// perform invokes the registered handler with the given task. If the handler
// panics, perform recovers and converts the panic into a returned error so the
// task is treated as failed rather than crashing the process.
func (p *processor) perform(ctx context.Context, task *Task) (err error) {
	defer func() {
		if x := recover(); x != nil {
			p.logger.Errorf("recovering from panic. See the stack trace below for details:\n%s", string(debug.Stack()))
			// Try to locate the panicking frame for the error message.
			// Caller(1) may land inside the runtime's panic machinery; if the
			// file path looks like runtime code, skip one more frame.
			_, file, line, ok := runtime.Caller(1)
			if ok && strings.Contains(file, "runtime/") {
				_, file, line, ok = runtime.Caller(2)
			}
			if ok {
				err = fmt.Errorf("panic [%s:%d]: %v", file, line, x)
			} else {
				err = fmt.Errorf("panic: %v", x)
			}
		}
	}()
	return p.handler.ProcessTask(ctx, task)
}
// uniq returns at most the first l distinct values of names, preserving the
// order of first appearance. The length check runs after each append, matching
// the original contract (an l of 0 therefore returns all distinct names).
func uniq(names []string, l int) []string {
	var out []string
	seen := make(map[string]struct{})
	for _, name := range names {
		if _, dup := seen[name]; !dup {
			seen[name] = struct{}{}
			out = append(out, name)
		}
		if len(out) == l {
			break
		}
	}
	return out
}
// sortByPriority returns the queue names of qcfg ordered from highest to
// lowest priority weight.
func sortByPriority(qcfg map[string]int) []string {
	pairs := make([]*queue, 0, len(qcfg))
	for name, weight := range qcfg {
		pairs = append(pairs, &queue{name, weight})
	}
	// byPriority sorts ascending; reverse it for highest-first.
	sort.Sort(sort.Reverse(byPriority(pairs)))
	var res []string
	for _, q := range pairs {
		res = append(res, q.name)
	}
	return res
}
// queue pairs a queue name with its priority weight.
type queue struct {
	name     string
	priority int
}

// byPriority implements sort.Interface for []*queue, ordering by ascending
// priority weight.
type byPriority []*queue

func (x byPriority) Len() int           { return len(x) }
func (x byPriority) Less(i, j int) bool { return x[i].priority < x[j].priority }
func (x byPriority) Swap(i, j int)      { x[i], x[j] = x[j], x[i] }
// normalizeQueues divides every priority weight by the greatest common divisor
// of all weights, reducing them to the smallest equivalent integers
// (e.g. {"a": 20, "b": 10} becomes {"a": 2, "b": 1}).
func normalizeQueues(queues map[string]int) map[string]int {
	weights := make([]int, 0, len(queues))
	for _, w := range queues {
		weights = append(weights, w)
	}
	divisor := gcd(weights...)
	normalized := make(map[string]int, len(queues))
	for name, w := range queues {
		normalized[name] = w / divisor
	}
	return normalized
}
// gcd returns the greatest common divisor of xs using the Euclidean
// algorithm, short-circuiting as soon as the running result reaches 1.
// It requires at least one argument.
func gcd(xs ...int) int {
	gcd2 := func(a, b int) int {
		for b > 0 {
			a, b = b, a%b
		}
		return a
	}
	res := xs[0]
	for _, x := range xs {
		res = gcd2(x, res)
		if res == 1 {
			return 1
		}
	}
	return res
}
// computeDeadline derives the context deadline for msg from its Timeout
// (seconds from now) and Deadline (unix seconds) fields.
func (p *processor) computeDeadline(msg *base.TaskMessage) time.Time {
	switch {
	case msg.Timeout == 0 && msg.Deadline == 0:
		// Neither set: log the internal inconsistency and fall back to the
		// package default.
		p.logger.Errorf("asynq: internal error: both timeout and deadline are not set for the task message: %s", msg.ID)
		return p.clock.Now().Add(defaultTimeout)
	case msg.Timeout != 0 && msg.Deadline != 0:
		// Both set: use whichever instant comes first.
		earlier := math.Min(float64(p.clock.Now().Unix()+msg.Timeout), float64(msg.Deadline))
		return time.Unix(int64(earlier), 0)
	case msg.Timeout != 0:
		return p.clock.Now().Add(time.Duration(msg.Timeout) * time.Second)
	default:
		return time.Unix(msg.Deadline, 0)
	}
}
The pages are generated with Golds v0.8.2 (GOOS=linux, GOARCH=amd64).
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.