package asynq
import (
"context"
"fmt"
"strings"
"time"
"github.com/redis/go-redis/v9"
"github.com/google/uuid"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/errors"
"github.com/hibiken/asynq/internal/rdb"
)
// Client enqueues tasks by delegating to a broker.
//
// Instances are created with NewClient and should be released with Close.
type Client struct {
// broker performs the actual enqueue, schedule and group operations.
broker base .Broker
}
// NewClient builds a Client from the given Redis connection option.
//
// It panics when r.MakeRedisClient does not produce a redis.UniversalClient.
func NewClient(r RedisConnOpt) *Client {
	redisClient, isUniversal := r.MakeRedisClient().(redis.UniversalClient)
	if isUniversal {
		return &Client{broker: rdb.NewRDB(redisClient)}
	}
	panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
}
// OptionType identifies the kind of an Option; it is the value returned by
// Option.Type.
type OptionType int
// One constant per option constructor defined in this file. Values are
// assigned sequentially starting at zero.
const (
MaxRetryOpt OptionType = iota
QueueOpt
TimeoutOpt
DeadlineOpt
UniqueOpt
ProcessAtOpt
ProcessInOpt
TaskIDOpt
RetentionOpt
GroupOpt
)
// Option configures how a task is enqueued.
type Option interface {
// String returns a textual form of the option, e.g. MaxRetry(3).
String () string
// Type reports which kind of option this is.
Type () OptionType
// Value returns the option's underlying value.
Value () interface {}
}
// Concrete option types. Each one implements Option and is produced by the
// exported constructor of the corresponding name (MaxRetry, Queue, ...).
type (
retryOption int
queueOption string
taskIDOption string
timeoutOption time .Duration
deadlineOption time .Time
uniqueOption time .Duration
processAtOption time .Time
processInOption time .Duration
retentionOption time .Duration
groupOption string
)
// MaxRetry returns an option that sets the retry count recorded on the task.
//
// A negative n is treated as zero.
func MaxRetry(n int) Option {
	if n >= 0 {
		return retryOption(n)
	}
	return retryOption(0)
}

// String renders the option as MaxRetry(<n>).
func (o retryOption) String() string {
	count := int(o)
	return fmt.Sprintf("MaxRetry(%d)", count)
}

// Type reports MaxRetryOpt.
func (o retryOption) Type() OptionType {
	return MaxRetryOpt
}

// Value returns the retry count as an int.
func (o retryOption) Value() interface{} {
	return int(o)
}
// Queue returns an option that selects the queue a task is enqueued to.
//
// The name is validated later, when the options are composed.
func Queue(name string) Option {
	var o queueOption = queueOption(name)
	return o
}

// String renders the option as Queue("<name>").
func (o queueOption) String() string {
	qname := string(o)
	return fmt.Sprintf("Queue(%q)", qname)
}

// Type reports QueueOpt.
func (o queueOption) Type() OptionType {
	return QueueOpt
}

// Value returns the queue name as a string.
func (o queueOption) Value() interface{} {
	return string(o)
}
// TaskID returns an option that sets an explicit task ID instead of the
// generated default.
func TaskID(id string) Option {
	var o taskIDOption = taskIDOption(id)
	return o
}

// String renders the option as TaskID("<id>").
func (o taskIDOption) String() string {
	taskID := string(o)
	return fmt.Sprintf("TaskID(%q)", taskID)
}

// Type reports TaskIDOpt.
func (o taskIDOption) Type() OptionType {
	return TaskIDOpt
}

// Value returns the task ID as a string.
func (o taskIDOption) Value() interface{} {
	return string(o)
}
// Timeout returns an option that sets the task's timeout duration.
//
// A zero duration leaves the timeout unset.
func Timeout(d time.Duration) Option {
	var o timeoutOption = timeoutOption(d)
	return o
}

// String renders the option as Timeout(<duration>).
func (o timeoutOption) String() string {
	dur := time.Duration(o)
	return fmt.Sprintf("Timeout(%v)", dur)
}

// Type reports TimeoutOpt.
func (o timeoutOption) Type() OptionType {
	return TimeoutOpt
}

// Value returns the timeout as a time.Duration.
func (o timeoutOption) Value() interface{} {
	return time.Duration(o)
}
// Deadline returns an option that sets the task's deadline.
//
// A zero time.Time leaves the deadline unset.
func Deadline(t time.Time) Option {
	var o deadlineOption = deadlineOption(t)
	return o
}

// String renders the option as Deadline(<time in UnixDate layout>).
func (o deadlineOption) String() string {
	formatted := time.Time(o).Format(time.UnixDate)
	return fmt.Sprintf("Deadline(%v)", formatted)
}

// Type reports DeadlineOpt.
func (o deadlineOption) Type() OptionType {
	return DeadlineOpt
}

// Value returns the deadline as a time.Time.
func (o deadlineOption) Value() interface{} {
	return time.Time(o)
}
// Unique returns an option that enqueues the task only if no task with the
// same queue, type and payload holds the uniqueness key, for the given TTL.
//
// A TTL shorter than one second is rejected when the options are composed.
func Unique(ttl time.Duration) Option {
	var o uniqueOption = uniqueOption(ttl)
	return o
}

// String renders the option as Unique(<duration>).
func (o uniqueOption) String() string {
	dur := time.Duration(o)
	return fmt.Sprintf("Unique(%v)", dur)
}

// Type reports UniqueOpt.
func (o uniqueOption) Type() OptionType {
	return UniqueOpt
}

// Value returns the uniqueness TTL as a time.Duration.
func (o uniqueOption) Value() interface{} {
	return time.Duration(o)
}
// ProcessAt returns an option that sets the time at which the task should be
// processed.
//
// When combined with ProcessIn, whichever appears later in the option list wins.
func ProcessAt(t time.Time) Option {
	var o processAtOption = processAtOption(t)
	return o
}

// String renders the option as ProcessAt(<time in UnixDate layout>).
func (o processAtOption) String() string {
	formatted := time.Time(o).Format(time.UnixDate)
	return fmt.Sprintf("ProcessAt(%v)", formatted)
}

// Type reports ProcessAtOpt.
func (o processAtOption) Type() OptionType {
	return ProcessAtOpt
}

// Value returns the processing time as a time.Time.
func (o processAtOption) Value() interface{} {
	return time.Time(o)
}
// ProcessIn returns an option that sets the processing time relative to the
// moment the options are composed.
//
// When combined with ProcessAt, whichever appears later in the option list wins.
func ProcessIn(d time.Duration) Option {
	var o processInOption = processInOption(d)
	return o
}

// String renders the option as ProcessIn(<duration>).
func (o processInOption) String() string {
	dur := time.Duration(o)
	return fmt.Sprintf("ProcessIn(%v)", dur)
}

// Type reports ProcessInOpt.
func (o processInOption) Type() OptionType {
	return ProcessInOpt
}

// Value returns the delay as a time.Duration.
func (o processInOption) Value() interface{} {
	return time.Duration(o)
}
// Retention returns an option that sets the retention duration recorded on
// the task message (stored in whole seconds).
func Retention(d time.Duration) Option {
	var o retentionOption = retentionOption(d)
	return o
}

// String renders the option as Retention(<duration>).
func (o retentionOption) String() string {
	dur := time.Duration(o)
	return fmt.Sprintf("Retention(%v)", dur)
}

// Type reports RetentionOpt.
func (o retentionOption) Type() OptionType {
	return RetentionOpt
}

// Value returns the retention period as a time.Duration.
func (o retentionOption) Value() interface{} {
	return time.Duration(o)
}
// Group returns an option that assigns the task to the named group.
//
// A blank name is rejected when the options are composed.
func Group(name string) Option {
	var o groupOption = groupOption(name)
	return o
}

// String renders the option as Group("<name>").
func (o groupOption) String() string {
	groupName := string(o)
	return fmt.Sprintf("Group(%q)", groupName)
}

// Type reports GroupOpt.
func (o groupOption) Type() OptionType {
	return GroupOpt
}

// Value returns the group name as a string.
func (o groupOption) Value() interface{} {
	return string(o)
}
// ErrDuplicateTask is wrapped and returned by EnqueueContext when the broker
// reports that the task already exists.
//
// Check for it with errors.Is.
var ErrDuplicateTask = errors .New ("task already exists" )
// ErrTaskIDConflict is wrapped and returned by EnqueueContext when the broker
// reports that the task's ID conflicts with another task.
//
// Check for it with errors.Is.
var ErrTaskIDConflict = errors .New ("task ID conflicts with another task" )
// option is the merged result of a list of Option values, as produced by
// composeOptions.
type option struct {
// retry is copied to the task message's Retry field.
retry int
// queue is the validated queue name.
queue string
// taskID is either the caller-supplied ID or a generated UUID.
taskID string
// timeout is zero when no Timeout option was given.
timeout time .Duration
// deadline is the zero time when no Deadline option was given.
deadline time .Time
// uniqueTTL enables the uniqueness key when greater than zero.
uniqueTTL time .Duration
// processAt is compared against the current time to decide whether the
// task is scheduled or enqueued immediately.
processAt time .Time
// retention is stored on the task message in whole seconds.
retention time .Duration
// group, when non-empty, routes a non-scheduled task to a group.
group string
}
// composeOptions folds the given options into a single option value.
//
// Options are applied in order, so a later option overrides an earlier one of
// the same kind (ProcessAt and ProcessIn override each other). Option values
// of an unrecognized concrete type are ignored.
//
// An error is returned for an invalid queue name, a blank task ID, a blank
// group key, or a Unique TTL below one second.
func composeOptions(opts ...Option) (option, error) {
	composed := option{
		retry:     defaultMaxRetry,
		queue:     base.DefaultQueueName,
		taskID:    uuid.NewString(),
		timeout:   0, // left unset here; EnqueueContext decides whether the default applies
		deadline:  time.Time{},
		processAt: time.Now(),
	}
	for i := range opts {
		switch v := opts[i].(type) {
		case retryOption:
			composed.retry = int(v)
		case queueOption:
			name := string(v)
			err := base.ValidateQueueName(name)
			if err != nil {
				return option{}, err
			}
			composed.queue = name
		case taskIDOption:
			taskID := string(v)
			if isBlank(taskID) {
				return option{}, errors.New("task ID cannot be empty")
			}
			composed.taskID = taskID
		case timeoutOption:
			composed.timeout = time.Duration(v)
		case deadlineOption:
			composed.deadline = time.Time(v)
		case uniqueOption:
			lockTTL := time.Duration(v)
			if lockTTL < time.Second {
				return option{}, errors.New("Unique TTL cannot be less than 1s")
			}
			composed.uniqueTTL = lockTTL
		case processAtOption:
			composed.processAt = time.Time(v)
		case processInOption:
			delay := time.Duration(v)
			composed.processAt = time.Now().Add(delay)
		case retentionOption:
			composed.retention = time.Duration(v)
		case groupOption:
			groupKey := string(v)
			if isBlank(groupKey) {
				return option{}, errors.New("group key cannot be empty")
			}
			composed.group = groupKey
		}
	}
	return composed, nil
}
// isBlank reports whether s is empty or consists solely of white space.
func isBlank(s string) bool {
	trimmed := strings.TrimSpace(s)
	return len(trimmed) == 0
}
const (
// defaultMaxRetry is the retry count used when no MaxRetry option is given.
defaultMaxRetry = 25
// defaultTimeout is applied by EnqueueContext when neither a timeout nor a
// deadline has been specified.
defaultTimeout = 30 * time .Minute
)
// Sentinel values meaning "not set" for timeout and deadline.
var (
noTimeout time .Duration = 0
noDeadline time .Time = time .Unix (0 , 0 )
)
// Close closes the client's underlying broker and returns its error, if any.
func (c *Client) Close() error {
	broker := c.broker
	return broker.Close()
}
// Enqueue enqueues the task using a background context.
//
// It is shorthand for EnqueueContext(context.Background(), task, opts...).
func (c *Client) Enqueue(task *Task, opts ...Option) (*TaskInfo, error) {
	ctx := context.Background()
	return c.EnqueueContext(ctx, task, opts...)
}
// EnqueueContext enqueues the given task and returns information about it.
//
// The task's own options are applied first, followed by opts, so options
// passed here override those attached to the task. If neither a timeout nor a
// deadline is set, defaultTimeout is used.
//
// Depending on the composed options the task is:
//   - scheduled, when its processing time is in the future;
//   - added to a group, when a group is set and it is not scheduled;
//   - enqueued for immediate processing otherwise.
//
// It returns an error if task is nil, its type name is blank, any option is
// invalid, or the broker fails. Broker duplicate-task and ID-conflict errors
// are reported as ErrDuplicateTask and ErrTaskIDConflict respectively.
func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option) (*TaskInfo, error) {
	if task == nil {
		return nil, fmt.Errorf("task cannot be nil")
	}
	if strings.TrimSpace(task.Type()) == "" {
		return nil, fmt.Errorf("task typename cannot be empty")
	}
	// Merge into a freshly allocated slice. Appending directly to task.opts
	// could write into spare capacity of the task's backing array, letting
	// concurrent enqueues of the same task overwrite each other's options.
	merged := make([]Option, 0, len(task.opts)+len(opts))
	merged = append(merged, task.opts...)
	merged = append(merged, opts...)
	opt, err := composeOptions(merged...)
	if err != nil {
		return nil, err
	}
	deadline := noDeadline
	if !opt.deadline.IsZero() {
		deadline = opt.deadline
	}
	timeout := noTimeout
	if opt.timeout != 0 {
		timeout = opt.timeout
	}
	if deadline.Equal(noDeadline) && timeout == noTimeout {
		// Neither limit was specified; fall back to the default timeout.
		timeout = defaultTimeout
	}
	var uniqueKey string
	if opt.uniqueTTL > 0 {
		uniqueKey = base.UniqueKey(opt.queue, task.Type(), task.Payload())
	}
	msg := &base.TaskMessage{
		ID:        opt.taskID,
		Type:      task.Type(),
		Payload:   task.Payload(),
		Queue:     opt.queue,
		Retry:     opt.retry,
		Deadline:  deadline.Unix(),
		Timeout:   int64(timeout.Seconds()),
		UniqueKey: uniqueKey,
		GroupKey:  opt.group,
		Retention: int64(opt.retention.Seconds()),
	}
	now := time.Now()
	var state base.TaskState
	if opt.processAt.After(now) {
		err = c.schedule(ctx, msg, opt.processAt, opt.uniqueTTL)
		state = base.TaskStateScheduled
	} else if opt.group != "" {
		// The processing time of a grouped task is not known at this point,
		// so the zero time is reported.
		opt.processAt = time.Time{}
		err = c.addToGroup(ctx, msg, opt.group, opt.uniqueTTL)
		state = base.TaskStateAggregating
	} else {
		opt.processAt = now
		err = c.enqueue(ctx, msg, opt.uniqueTTL)
		state = base.TaskStatePending
	}
	switch {
	case errors.Is(err, errors.ErrDuplicateTask):
		return nil, fmt.Errorf("%w", ErrDuplicateTask)
	case errors.Is(err, errors.ErrTaskIdConflict):
		return nil, fmt.Errorf("%w", ErrTaskIDConflict)
	case err != nil:
		return nil, err
	}
	return newTaskInfo(msg, state, opt.processAt, nil), nil
}
// enqueue hands msg to the broker for immediate processing, using the
// uniqueness-enforcing variant when uniqueTTL is positive.
func (c *Client) enqueue(ctx context.Context, msg *base.TaskMessage, uniqueTTL time.Duration) error {
	if uniqueTTL <= 0 {
		return c.broker.Enqueue(ctx, msg)
	}
	return c.broker.EnqueueUnique(ctx, msg, uniqueTTL)
}
// schedule asks the broker to process msg at time t.
//
// When uniqueTTL is positive the uniqueness-enforcing variant is used, with a
// TTL that covers the wait until t plus uniqueTTL.
func (c *Client) schedule(ctx context.Context, msg *base.TaskMessage, t time.Time, uniqueTTL time.Duration) error {
	if uniqueTTL <= 0 {
		return c.broker.Schedule(ctx, msg, t)
	}
	lockExpiry := t.Add(uniqueTTL)
	return c.broker.ScheduleUnique(ctx, msg, t, time.Until(lockExpiry))
}
// addToGroup hands msg to the broker for the named group, using the
// uniqueness-enforcing variant when uniqueTTL is positive.
func (c *Client) addToGroup(ctx context.Context, msg *base.TaskMessage, group string, uniqueTTL time.Duration) error {
	if uniqueTTL <= 0 {
		return c.broker.AddToGroup(ctx, msg, group)
	}
	return c.broker.AddToGroupUnique(ctx, msg, group, uniqueTTL)
}
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PRs and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.