package base
import (
"context"
"crypto/md5"
"encoding/hex"
"fmt"
"strings"
"sync"
"time"
"github.com/golang/protobuf/ptypes"
"github.com/hibiken/asynq/internal/errors"
pb "github.com/hibiken/asynq/internal/proto"
"github.com/hibiken/asynq/internal/timeutil"
"github.com/redis/go-redis/v9"
"google.golang.org/protobuf/proto"
)
// Version is the version string of this library.
// NOTE(review): presumably kept in sync with the released asynq version — confirm.
const Version = "0.24.1"
// DefaultQueueName is the name of the default queue.
const DefaultQueueName = "default"
// DefaultQueue is the pending key of the default queue, i.e. the result of
// PendingKey(DefaultQueueName), computed once at package initialization.
var DefaultQueue = PendingKey (DefaultQueueName )
// Global key and channel names that are not scoped to a single queue.
// NOTE(review): presumably Redis key names (the package imports go-redis) — confirm
// the data type stored under each against the broker implementation.
const (
// AllServers is the key under which servers are tracked.
AllServers = "asynq:servers"
// AllWorkers is the key under which workers are tracked.
AllWorkers = "asynq:workers"
// AllSchedulers is the key under which schedulers are tracked.
AllSchedulers = "asynq:schedulers"
// AllQueues is the key under which queue names are tracked.
AllQueues = "asynq:queues"
// CancelChannel is the name of the channel used for cancelation messages.
CancelChannel = "asynq:cancel"
)
// TaskState denotes the state of a task.
type TaskState int

// Known task states. Values start at 1, so the zero value of TaskState is
// not a valid state.
const (
	TaskStateActive TaskState = iota + 1
	TaskStatePending
	TaskStateScheduled
	TaskStateRetry
	TaskStateArchived
	TaskStateCompleted
	TaskStateAggregating
)

// String returns the lowercase name of the state ("active", "pending", ...).
//
// It panics with an "internal error" message when s is not one of the
// declared TaskState constants.
func (s TaskState) String() string {
	// Table indexed by state value; index 0 is intentionally left empty
	// because no state has the value 0.
	names := [...]string{
		TaskStateActive:      "active",
		TaskStatePending:     "pending",
		TaskStateScheduled:   "scheduled",
		TaskStateRetry:       "retry",
		TaskStateArchived:    "archived",
		TaskStateCompleted:   "completed",
		TaskStateAggregating: "aggregating",
	}
	if s >= TaskStateActive && int(s) < len(names) {
		return names[s]
	}
	panic(fmt.Sprintf("internal error: unknown task state %d", s))
}
// TaskStateFromString converts the lowercase state name s (as produced by
// TaskState.String) into the matching TaskState.
//
// For any other input it returns 0 and a FailedPrecondition error.
func TaskStateFromString(s string) (TaskState, error) {
	var state TaskState
	switch s {
	case "active":
		state = TaskStateActive
	case "pending":
		state = TaskStatePending
	case "scheduled":
		state = TaskStateScheduled
	case "retry":
		state = TaskStateRetry
	case "archived":
		state = TaskStateArchived
	case "completed":
		state = TaskStateCompleted
	case "aggregating":
		state = TaskStateAggregating
	default:
		return 0, errors.E(errors.FailedPrecondition, fmt.Sprintf("%q is not supported task state", s))
	}
	return state, nil
}
// ValidateQueueName reports whether qname is usable as a queue name.
//
// It returns an error when qname is empty or consists only of whitespace,
// and nil otherwise.
func ValidateQueueName(qname string) error {
	if strings.TrimSpace(qname) == "" {
		return fmt.Errorf("queue name must contain one or more characters")
	}
	return nil
}
// QueueKeyPrefix returns the prefix shared by all keys of the given queue,
// in the form "asynq:{<qname>}:".
// NOTE(review): the braces presumably act as a Redis Cluster hash tag so that
// all keys of one queue map to the same slot — confirm.
func QueueKeyPrefix(qname string) string {
	const layout = "asynq:{%s}:"
	prefix := fmt.Sprintf(layout, qname)
	return prefix
}
func TaskKeyPrefix (qname string ) string {
return fmt .Sprintf ("%st:" , QueueKeyPrefix (qname ))
}
func TaskKey (qname , id string ) string {
return fmt .Sprintf ("%s%s" , TaskKeyPrefix (qname ), id )
}
func PendingKey (qname string ) string {
return fmt .Sprintf ("%spending" , QueueKeyPrefix (qname ))
}
func ActiveKey (qname string ) string {
return fmt .Sprintf ("%sactive" , QueueKeyPrefix (qname ))
}
func ScheduledKey (qname string ) string {
return fmt .Sprintf ("%sscheduled" , QueueKeyPrefix (qname ))
}
func RetryKey (qname string ) string {
return fmt .Sprintf ("%sretry" , QueueKeyPrefix (qname ))
}
func ArchivedKey (qname string ) string {
return fmt .Sprintf ("%sarchived" , QueueKeyPrefix (qname ))
}
func LeaseKey (qname string ) string {
return fmt .Sprintf ("%slease" , QueueKeyPrefix (qname ))
}
func CompletedKey (qname string ) string {
return fmt .Sprintf ("%scompleted" , QueueKeyPrefix (qname ))
}
func PausedKey (qname string ) string {
return fmt .Sprintf ("%spaused" , QueueKeyPrefix (qname ))
}
func ProcessedTotalKey (qname string ) string {
return fmt .Sprintf ("%sprocessed" , QueueKeyPrefix (qname ))
}
func FailedTotalKey (qname string ) string {
return fmt .Sprintf ("%sfailed" , QueueKeyPrefix (qname ))
}
func ProcessedKey (qname string , t time .Time ) string {
return fmt .Sprintf ("%sprocessed:%s" , QueueKeyPrefix (qname ), t .UTC ().Format ("2006-01-02" ))
}
func FailedKey (qname string , t time .Time ) string {
return fmt .Sprintf ("%sfailed:%s" , QueueKeyPrefix (qname ), t .UTC ().Format ("2006-01-02" ))
}
// ServerInfoKey returns the server-info key for the server identified by
// hostname, pid and serverID, in the form
// "asynq:servers:{<hostname>:<pid>:<serverID>}".
func ServerInfoKey(hostname string, pid int, serverID string) string {
	identity := fmt.Sprintf("%s:%d:%s", hostname, pid, serverID)
	return "asynq:servers:{" + identity + "}"
}
// WorkersKey returns the workers key for the server identified by hostname,
// pid and serverID, in the form
// "asynq:workers:{<hostname>:<pid>:<serverID>}".
func WorkersKey(hostname string, pid int, serverID string) string {
	identity := fmt.Sprintf("%s:%d:%s", hostname, pid, serverID)
	return "asynq:workers:{" + identity + "}"
}
// SchedulerEntriesKey returns the entries key for the scheduler with the
// given ID, in the form "asynq:schedulers:{<schedulerID>}".
func SchedulerEntriesKey(schedulerID string) string {
	return "asynq:schedulers:" + fmt.Sprintf("{%s}", schedulerID)
}
// SchedulerHistoryKey returns the history key for the scheduler entry with
// the given ID, in the form "asynq:scheduler_history:<entryID>".
func SchedulerHistoryKey(entryID string) string {
	const prefix = "asynq:scheduler_history:"
	return fmt.Sprintf("%s%s", prefix, entryID)
}
func UniqueKey (qname , tasktype string , payload []byte ) string {
if payload == nil {
return fmt .Sprintf ("%sunique:%s:" , QueueKeyPrefix (qname ), tasktype )
}
checksum := md5 .Sum (payload )
return fmt .Sprintf ("%sunique:%s:%s" , QueueKeyPrefix (qname ), tasktype , hex .EncodeToString (checksum [:]))
}
func GroupKeyPrefix (qname string ) string {
return fmt .Sprintf ("%sg:" , QueueKeyPrefix (qname ))
}
func GroupKey (qname , gkey string ) string {
return fmt .Sprintf ("%s%s" , GroupKeyPrefix (qname ), gkey )
}
func AggregationSetKey (qname , gname , setID string ) string {
return fmt .Sprintf ("%s:%s" , GroupKey (qname , gname ), setID )
}
func AllGroups (qname string ) string {
return fmt .Sprintf ("%sgroups" , QueueKeyPrefix (qname ))
}
func AllAggregationSets (qname string ) string {
return fmt .Sprintf ("%saggregation_sets" , QueueKeyPrefix (qname ))
}
// TaskMessage is the in-memory representation of a task. EncodeMessage and
// DecodeMessage convert it to and from its protobuf form (pb.TaskMessage).
type TaskMessage struct {
// Type is the task's type name.
Type string
// Payload is the task's raw payload bytes.
Payload []byte
// ID is the task's identifier.
ID string
// Queue is the name of the queue the task belongs to.
Queue string
// Retry is narrowed to int32 when encoded.
// NOTE(review): presumably the maximum number of retries — confirm.
Retry int
// Retried is narrowed to int32 when encoded.
// NOTE(review): presumably the number of retries performed so far — confirm.
Retried int
// ErrorMsg is an error message associated with the task.
// NOTE(review): presumably from the most recent failure — confirm.
ErrorMsg string
// LastFailedAt is a timestamp.
// NOTE(review): presumably Unix seconds, zero when the task never failed — confirm.
LastFailedAt int64
// Timeout is the task's timeout.
// NOTE(review): presumably in seconds, zero meaning none — confirm.
Timeout int64
// Deadline is the task's deadline.
// NOTE(review): presumably Unix seconds, zero meaning none — confirm.
Deadline int64
// UniqueKey is the task's uniqueness key (see UniqueKey); presumably empty
// when no uniqueness constraint applies — confirm.
UniqueKey string
// GroupKey identifies the group the task belongs to; presumably empty when
// the task is not grouped — confirm.
GroupKey string
// Retention is the task's retention period.
// NOTE(review): presumably in seconds — confirm.
Retention int64
// CompletedAt is a timestamp.
// NOTE(review): presumably Unix seconds, zero when not completed — confirm.
CompletedAt int64
}
// EncodeMessage serializes msg into its protobuf wire form.
//
// It returns an error when msg is nil or when marshaling fails. Retry and
// Retried are converted to int32 for the wire representation.
func EncodeMessage(msg *TaskMessage) ([]byte, error) {
	if msg == nil {
		return nil, fmt.Errorf("cannot encode nil message")
	}
	wire := &pb.TaskMessage{
		Type:         msg.Type,
		Payload:      msg.Payload,
		Id:           msg.ID,
		Queue:        msg.Queue,
		Retry:        int32(msg.Retry),
		Retried:      int32(msg.Retried),
		ErrorMsg:     msg.ErrorMsg,
		LastFailedAt: msg.LastFailedAt,
		Timeout:      msg.Timeout,
		Deadline:     msg.Deadline,
		UniqueKey:    msg.UniqueKey,
		GroupKey:     msg.GroupKey,
		Retention:    msg.Retention,
		CompletedAt:  msg.CompletedAt,
	}
	return proto.Marshal(wire)
}
// DecodeMessage parses protobuf-encoded data into a TaskMessage.
//
// It returns the unmarshal error unchanged when data cannot be decoded.
func DecodeMessage(data []byte) (*TaskMessage, error) {
	wire := new(pb.TaskMessage)
	if err := proto.Unmarshal(data, wire); err != nil {
		return nil, err
	}
	msg := &TaskMessage{
		Type:         wire.GetType(),
		Payload:      wire.GetPayload(),
		ID:           wire.GetId(),
		Queue:        wire.GetQueue(),
		Retry:        int(wire.GetRetry()),
		Retried:      int(wire.GetRetried()),
		ErrorMsg:     wire.GetErrorMsg(),
		LastFailedAt: wire.GetLastFailedAt(),
		Timeout:      wire.GetTimeout(),
		Deadline:     wire.GetDeadline(),
		UniqueKey:    wire.GetUniqueKey(),
		GroupKey:     wire.GetGroupKey(),
		Retention:    wire.GetRetention(),
		CompletedAt:  wire.GetCompletedAt(),
	}
	return msg, nil
}
// TaskInfo bundles a task message with additional information about the task.
type TaskInfo struct {
// Message is the task message itself.
Message *TaskMessage
// State is the task's state.
State TaskState
// NextProcessAt is a point in time.
// NOTE(review): presumably when the task is next due for processing — confirm.
NextProcessAt time .Time
// Result holds raw result bytes.
// NOTE(review): presumably data written via Broker.WriteResult — confirm.
Result []byte
}
// Z pairs a task message with an integer score.
// NOTE(review): presumably mirrors a Redis sorted-set entry (member + score) — confirm.
type Z struct {
// Message is the task message.
Message *TaskMessage
// Score is the score associated with Message.
Score int64
}
// ServerInfo holds information about a server process. EncodeServerInfo and
// DecodeServerInfo convert it to and from its protobuf form (pb.ServerInfo).
type ServerInfo struct {
// Host is the server's host name.
Host string
// PID is the server's process ID; it is narrowed to int32 when encoded.
PID int
// ServerID is the server's identifier.
ServerID string
// Concurrency is narrowed to int32 when encoded.
// NOTE(review): presumably the configured number of concurrent workers — confirm.
Concurrency int
// Queues maps a queue name to an integer, narrowed to int32 when encoded.
// NOTE(review): presumably the queue's priority — confirm.
Queues map [string ]int
// StrictPriority is a flag.
// NOTE(review): presumably whether queue priorities are treated strictly — confirm.
StrictPriority bool
// Status is the server's status string.
Status string
// Started is a point in time.
// NOTE(review): presumably when the server started — confirm.
Started time .Time
// ActiveWorkerCount is narrowed to int32 when encoded.
// NOTE(review): presumably the number of currently busy workers — confirm.
ActiveWorkerCount int
}
// EncodeServerInfo serializes info into its protobuf wire form.
//
// It returns an error when info is nil, when info.Started cannot be converted
// to a protobuf timestamp, or when marshaling fails. Integer fields and queue
// values are converted to int32 for the wire representation.
func EncodeServerInfo(info *ServerInfo) ([]byte, error) {
	if info == nil {
		return nil, fmt.Errorf("cannot encode nil server info")
	}
	startTime, err := ptypes.TimestampProto(info.Started)
	if err != nil {
		return nil, err
	}
	wireQueues := make(map[string]int32)
	for name, value := range info.Queues {
		wireQueues[name] = int32(value)
	}
	wire := &pb.ServerInfo{
		Host:              info.Host,
		Pid:               int32(info.PID),
		ServerId:          info.ServerID,
		Concurrency:       int32(info.Concurrency),
		Queues:            wireQueues,
		StrictPriority:    info.StrictPriority,
		Status:            info.Status,
		StartTime:         startTime,
		ActiveWorkerCount: int32(info.ActiveWorkerCount),
	}
	return proto.Marshal(wire)
}
// DecodeServerInfo parses protobuf-encoded bytes into a ServerInfo.
//
// It returns an error when b cannot be unmarshaled or when the encoded start
// time is not a valid timestamp. The returned Queues map is always non-nil.
func DecodeServerInfo(b []byte) (*ServerInfo, error) {
	wire := new(pb.ServerInfo)
	if err := proto.Unmarshal(b, wire); err != nil {
		return nil, err
	}
	queueValues := make(map[string]int)
	for name, value := range wire.GetQueues() {
		queueValues[name] = int(value)
	}
	started, err := ptypes.Timestamp(wire.GetStartTime())
	if err != nil {
		return nil, err
	}
	info := &ServerInfo{
		Host:              wire.GetHost(),
		PID:               int(wire.GetPid()),
		ServerID:          wire.GetServerId(),
		Concurrency:       int(wire.GetConcurrency()),
		Queues:            queueValues,
		StrictPriority:    wire.GetStrictPriority(),
		Status:            wire.GetStatus(),
		Started:           started,
		ActiveWorkerCount: int(wire.GetActiveWorkerCount()),
	}
	return info, nil
}
// WorkerInfo holds information about a worker and the task it is associated
// with. EncodeWorkerInfo and DecodeWorkerInfo convert it to and from its
// protobuf form (pb.WorkerInfo).
type WorkerInfo struct {
// Host is the host name.
Host string
// PID is the process ID; it is narrowed to int32 when encoded.
PID int
// ServerID is the identifier of the server.
ServerID string
// ID is encoded as the protobuf TaskId field.
ID string
// Type is encoded as the protobuf TaskType field.
Type string
// Payload is encoded as the protobuf TaskPayload field.
Payload []byte
// Queue is the queue name.
Queue string
// Started is encoded as the protobuf StartTime field.
// NOTE(review): presumably when the worker began processing the task — confirm.
Started time .Time
// Deadline is a point in time.
// NOTE(review): presumably the deadline of the task being processed — confirm.
Deadline time .Time
}
// EncodeWorkerInfo serializes info into its protobuf wire form.
//
// It returns an error when info is nil, when info.Started or info.Deadline
// cannot be converted to a protobuf timestamp, or when marshaling fails.
func EncodeWorkerInfo(info *WorkerInfo) ([]byte, error) {
	if info == nil {
		return nil, fmt.Errorf("cannot encode nil worker info")
	}
	// Started is converted before Deadline so that the first failing
	// conversion determines the returned error.
	started, err := ptypes.TimestampProto(info.Started)
	if err != nil {
		return nil, err
	}
	due, err := ptypes.TimestampProto(info.Deadline)
	if err != nil {
		return nil, err
	}
	wire := &pb.WorkerInfo{
		Host:        info.Host,
		Pid:         int32(info.PID),
		ServerId:    info.ServerID,
		TaskId:      info.ID,
		TaskType:    info.Type,
		TaskPayload: info.Payload,
		Queue:       info.Queue,
		StartTime:   started,
		Deadline:    due,
	}
	return proto.Marshal(wire)
}
// DecodeWorkerInfo parses protobuf-encoded bytes into a WorkerInfo.
//
// It returns an error when b cannot be unmarshaled or when the encoded start
// time or deadline is not a valid timestamp.
func DecodeWorkerInfo(b []byte) (*WorkerInfo, error) {
	wire := new(pb.WorkerInfo)
	if err := proto.Unmarshal(b, wire); err != nil {
		return nil, err
	}
	// The start time is converted before the deadline so that the first
	// failing conversion determines the returned error.
	started, err := ptypes.Timestamp(wire.GetStartTime())
	if err != nil {
		return nil, err
	}
	due, err := ptypes.Timestamp(wire.GetDeadline())
	if err != nil {
		return nil, err
	}
	info := &WorkerInfo{
		Host:     wire.GetHost(),
		PID:      int(wire.GetPid()),
		ServerID: wire.GetServerId(),
		ID:       wire.GetTaskId(),
		Type:     wire.GetTaskType(),
		Payload:  wire.GetTaskPayload(),
		Queue:    wire.GetQueue(),
		Started:  started,
		Deadline: due,
	}
	return info, nil
}
// SchedulerEntry holds information about a scheduler entry.
// EncodeSchedulerEntry and DecodeSchedulerEntry convert it to and from its
// protobuf form (pb.SchedulerEntry).
type SchedulerEntry struct {
// ID is the entry's identifier.
ID string
// Spec is the entry's schedule specification string.
// NOTE(review): presumably a cron-style spec — confirm.
Spec string
// Type is encoded as the protobuf TaskType field.
Type string
// Payload is encoded as the protobuf TaskPayload field.
Payload []byte
// Opts is encoded as the protobuf EnqueueOptions field.
// NOTE(review): presumably string renderings of enqueue options — confirm.
Opts []string
// Next is encoded as the protobuf NextEnqueueTime field.
Next time .Time
// Prev is encoded as the protobuf PrevEnqueueTime field.
// NOTE(review): presumably the zero time when the entry has never been enqueued — confirm.
Prev time .Time
}
// EncodeSchedulerEntry serializes entry into its protobuf wire form.
//
// It returns an error when entry is nil, when entry.Next or entry.Prev cannot
// be converted to a protobuf timestamp, or when marshaling fails.
func EncodeSchedulerEntry(entry *SchedulerEntry) ([]byte, error) {
	if entry == nil {
		return nil, fmt.Errorf("cannot encode nil scheduler entry")
	}
	// Next is converted before Prev so that the first failing conversion
	// determines the returned error.
	nextTime, err := ptypes.TimestampProto(entry.Next)
	if err != nil {
		return nil, err
	}
	prevTime, err := ptypes.TimestampProto(entry.Prev)
	if err != nil {
		return nil, err
	}
	wire := &pb.SchedulerEntry{
		Id:              entry.ID,
		Spec:            entry.Spec,
		TaskType:        entry.Type,
		TaskPayload:     entry.Payload,
		EnqueueOptions:  entry.Opts,
		NextEnqueueTime: nextTime,
		PrevEnqueueTime: prevTime,
	}
	return proto.Marshal(wire)
}
// DecodeSchedulerEntry parses protobuf-encoded bytes into a SchedulerEntry.
//
// It returns an error when b cannot be unmarshaled or when the encoded next
// or previous enqueue time is not a valid timestamp.
func DecodeSchedulerEntry(b []byte) (*SchedulerEntry, error) {
	wire := new(pb.SchedulerEntry)
	if err := proto.Unmarshal(b, wire); err != nil {
		return nil, err
	}
	// The next enqueue time is converted before the previous one so that the
	// first failing conversion determines the returned error.
	nextTime, err := ptypes.Timestamp(wire.GetNextEnqueueTime())
	if err != nil {
		return nil, err
	}
	prevTime, err := ptypes.Timestamp(wire.GetPrevEnqueueTime())
	if err != nil {
		return nil, err
	}
	entry := &SchedulerEntry{
		ID:      wire.GetId(),
		Spec:    wire.GetSpec(),
		Type:    wire.GetTaskType(),
		Payload: wire.GetTaskPayload(),
		Opts:    wire.GetEnqueueOptions(),
		Next:    nextTime,
		Prev:    prevTime,
	}
	return entry, nil
}
// SchedulerEnqueueEvent records a task ID together with a time.
// EncodeSchedulerEnqueueEvent and DecodeSchedulerEnqueueEvent convert it to
// and from its protobuf form (pb.SchedulerEnqueueEvent).
type SchedulerEnqueueEvent struct {
// TaskID is the ID of the task.
TaskID string
// EnqueuedAt is encoded as the protobuf EnqueueTime field.
// NOTE(review): presumably when the scheduler enqueued the task — confirm.
EnqueuedAt time .Time
}
// EncodeSchedulerEnqueueEvent serializes event into its protobuf wire form.
//
// It returns an error when event is nil, when event.EnqueuedAt cannot be
// converted to a protobuf timestamp, or when marshaling fails.
func EncodeSchedulerEnqueueEvent(event *SchedulerEnqueueEvent) ([]byte, error) {
	if event == nil {
		return nil, fmt.Errorf("cannot encode nil enqueue event")
	}
	enqueueTime, err := ptypes.TimestampProto(event.EnqueuedAt)
	if err != nil {
		return nil, err
	}
	wire := &pb.SchedulerEnqueueEvent{
		TaskId:      event.TaskID,
		EnqueueTime: enqueueTime,
	}
	return proto.Marshal(wire)
}
// DecodeSchedulerEnqueueEvent parses protobuf-encoded bytes into a
// SchedulerEnqueueEvent.
//
// It returns an error when b cannot be unmarshaled or when the encoded
// enqueue time is not a valid timestamp.
func DecodeSchedulerEnqueueEvent(b []byte) (*SchedulerEnqueueEvent, error) {
	wire := new(pb.SchedulerEnqueueEvent)
	if err := proto.Unmarshal(b, wire); err != nil {
		return nil, err
	}
	enqueueTime, err := ptypes.Timestamp(wire.GetEnqueueTime())
	if err != nil {
		return nil, err
	}
	event := &SchedulerEnqueueEvent{
		TaskID:     wire.GetTaskId(),
		EnqueuedAt: enqueueTime,
	}
	return event, nil
}
// Cancelations is a collection of cancel functions keyed by ID. All methods
// take the internal mutex, so a single instance may be shared between
// goroutines.
//
// Use NewCancelations to create one: Add writes to the internal map and
// would panic on a zero-value Cancelations.
type Cancelations struct {
	mu          sync.Mutex
	cancelFuncs map[string]context.CancelFunc
}

// NewCancelations returns an empty, ready-to-use Cancelations.
func NewCancelations() *Cancelations {
	registry := new(Cancelations)
	registry.cancelFuncs = make(map[string]context.CancelFunc)
	return registry
}

// Add stores fn under id, replacing any function previously stored under the
// same id.
func (c *Cancelations) Add(id string, fn context.CancelFunc) {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.cancelFuncs[id] = fn
}

// Delete removes the function stored under id. It is a no-op when nothing is
// stored under id.
func (c *Cancelations) Delete(id string) {
	c.mu.Lock()
	defer c.mu.Unlock()

	delete(c.cancelFuncs, id)
}

// Get returns the function stored under id. ok is false when nothing is
// stored under id.
func (c *Cancelations) Get(id string) (fn context.CancelFunc, ok bool) {
	c.mu.Lock()
	defer c.mu.Unlock()

	cancel, found := c.cancelFuncs[id]
	return cancel, found
}
// Lease represents an expiring lease. It tracks an expiration time and
// exposes a channel (see Done) that is closed once the expiration has been
// signalled through NotifyExpiration.
//
// The expiration time is guarded by an internal mutex, so the methods may be
// called from multiple goroutines. A Lease must not be copied after first
// use, and should be created with NewLease so that the channel and Clock are
// initialized.
type Lease struct {
	once sync.Once     // ensures ch is closed at most once
	ch   chan struct{} // closed by NotifyExpiration

	// Clock supplies the current time used for validity checks. NewLease sets
	// it to a real clock.
	// NOTE(review): presumably exported so tests can substitute a fake clock — confirm.
	Clock timeutil.Clock

	mu       sync.Mutex
	expireAt time.Time // guarded by mu
}

// NewLease returns a lease that expires at expirationTime and uses the real
// clock.
func NewLease(expirationTime time.Time) *Lease {
	return &Lease{
		ch:       make(chan struct{}),
		expireAt: expirationTime,
		Clock:    timeutil.NewRealClock(),
	}
}

// Reset moves the lease's expiration to expirationTime and returns true.
// If the lease has already expired it is left untouched and Reset returns
// false.
//
// The validity check and the update happen under a single lock acquisition.
// Checking first and locking afterwards would leave a window in which the
// lease could expire (and have its expiration notified) and then still be
// extended.
func (l *Lease) Reset(expirationTime time.Time) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	// Same condition as IsValid: a lease expiring exactly now is still valid.
	if l.expireAt.Before(l.Clock.Now()) {
		return false
	}
	l.expireAt = expirationTime
	return true
}

// NotifyExpiration closes the channel returned by Done if the lease has
// expired, and returns true. It returns false, without closing anything, if
// the lease is still valid. Calling it repeatedly is safe; the channel is
// closed only once.
func (l *Lease) NotifyExpiration() bool {
	if l.IsValid() {
		return false
	}
	l.once.Do(l.closeCh)
	return true
}

// closeCh closes the notification channel. It must only be invoked through
// l.once.
func (l *Lease) closeCh() {
	close(l.ch)
}

// Done returns a channel that is closed once NotifyExpiration has signalled
// the lease's expiration.
func (l *Lease) Done() <-chan struct{} {
	return l.ch
}

// Deadline returns the lease's current expiration time.
func (l *Lease) Deadline() time.Time {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.expireAt
}

// IsValid reports whether the lease has not yet expired, i.e. whether its
// expiration time is at or after the Clock's current time.
func (l *Lease) IsValid() bool {
	now := l.Clock.Now()
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.expireAt.After(now) || l.expireAt.Equal(now)
}
// Broker is the interface describing the message-broker operations: task
// lifecycle transitions, group/aggregation handling, lease management,
// server state bookkeeping and cancelation pub/sub.
//
// The comments below are derived from the method names and signatures only;
// the precise semantics are defined by the implementation, which is not part
// of this file.
type Broker interface {
// Ping checks connectivity with the broker (presumably — confirm).
Ping () error
// Close releases the broker's resources (presumably — confirm).
Close () error
// Enqueue adds msg for processing.
Enqueue (ctx context .Context , msg *TaskMessage ) error
// EnqueueUnique is like Enqueue with an additional ttl.
// NOTE(review): presumably the lifetime of the uniqueness lock — confirm.
EnqueueUnique (ctx context .Context , msg *TaskMessage , ttl time .Duration ) error
// Dequeue takes one or more queue names and returns a task message together
// with a time.
// NOTE(review): the returned time is presumably the lease expiration — confirm.
Dequeue (qnames ...string ) (*TaskMessage , time .Time , error )
// Done, MarkAsComplete and Requeue each act on a single task message.
// NOTE(review): presumably invoked for a task that was previously dequeued — confirm.
Done (ctx context .Context , msg *TaskMessage ) error
MarkAsComplete (ctx context .Context , msg *TaskMessage ) error
Requeue (ctx context .Context , msg *TaskMessage ) error
// Schedule registers msg together with the time processAt.
Schedule (ctx context .Context , msg *TaskMessage , processAt time .Time ) error
// ScheduleUnique is like Schedule with an additional ttl.
ScheduleUnique (ctx context .Context , msg *TaskMessage , processAt time .Time , ttl time .Duration ) error
// Retry registers msg for another attempt at processAt, carrying the error
// message errMsg and the isFailure flag.
Retry (ctx context .Context , msg *TaskMessage , processAt time .Time , errMsg string , isFailure bool ) error
// Archive archives msg, carrying the error message errMsg.
Archive (ctx context .Context , msg *TaskMessage , errMsg string ) error
// ForwardIfReady operates on the given queues.
// NOTE(review): presumably moves due scheduled/retry tasks to pending — confirm.
ForwardIfReady (qnames ...string ) error
// AddToGroup adds msg to the group gname.
AddToGroup (ctx context .Context , msg *TaskMessage , gname string ) error
// AddToGroupUnique is like AddToGroup with an additional ttl.
AddToGroupUnique (ctx context .Context , msg *TaskMessage , groupKey string , ttl time .Duration ) error
// ListGroups returns a list of strings for the given queue (presumably group names — confirm).
ListGroups (qname string ) ([]string , error )
// AggregationCheck evaluates the group gname of queue qname against the
// given time, grace period, maximum delay and maximum size, and returns an
// aggregation set ID.
// NOTE(review): presumably returns an empty ID when no set was created — confirm.
AggregationCheck (qname, gname string , t time .Time , gracePeriod, maxDelay time .Duration , maxSize int ) (aggregationSetID string , err error )
// ReadAggregationSet returns the task messages of the identified
// aggregation set together with a time.
// NOTE(review): the returned time is presumably the set's deadline — confirm.
ReadAggregationSet (qname, gname, aggregationSetID string ) ([]*TaskMessage , time .Time , error )
// DeleteAggregationSet deletes the identified aggregation set.
DeleteAggregationSet (ctx context .Context , qname, gname, aggregationSetID string ) error
// ReclaimStaleAggregationSets operates on the aggregation sets of queue qname.
ReclaimStaleAggregationSets (qname string ) error
// DeleteExpiredCompletedTasks operates on the completed tasks of queue qname.
DeleteExpiredCompletedTasks (qname string ) error
// ListLeaseExpired returns task messages from the given queues, selected
// relative to cutoff.
// NOTE(review): presumably those whose lease expired before cutoff — confirm.
ListLeaseExpired (cutoff time .Time , qnames ...string ) ([]*TaskMessage , error )
// ExtendLease takes a queue name and task IDs and returns a time.
// NOTE(review): presumably the new lease expiration — confirm.
ExtendLease (qname string , ids ...string ) (time .Time , error )
// WriteServerState records the given server and worker information with a ttl.
WriteServerState (info *ServerInfo , workers []*WorkerInfo , ttl time .Duration ) error
// ClearServerState removes the state recorded for the identified server (presumably — confirm).
ClearServerState (host string , pid int , serverID string ) error
// CancelationPubSub returns a Redis pub/sub handle.
// NOTE(review): presumably subscribed to CancelChannel — confirm.
CancelationPubSub () (*redis .PubSub , error )
// PublishCancelation publishes a cancelation for the given ID.
PublishCancelation (id string ) error
// WriteResult writes data for the task id in queue qname and returns a count n.
// NOTE(review): n is presumably the number of bytes written — confirm.
WriteResult (qname, id string , data []byte ) (n int , err error )
}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.