// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.

// Package base defines foundational types and constants used in asynq package.
package base import ( pb ) // Version of asynq library and CLI. const Version = "0.24.1" // DefaultQueueName is the queue name used if none are specified by user. const DefaultQueueName = "default" // DefaultQueue is the redis key for the default queue. var DefaultQueue = PendingKey(DefaultQueueName) // Global Redis keys. const ( AllServers = "asynq:servers" // ZSET AllWorkers = "asynq:workers" // ZSET AllSchedulers = "asynq:schedulers" // ZSET AllQueues = "asynq:queues" // SET CancelChannel = "asynq:cancel" // PubSub channel ) // TaskState denotes the state of a task. type TaskState int const ( TaskStateActive TaskState = iota + 1 TaskStatePending TaskStateScheduled TaskStateRetry TaskStateArchived TaskStateCompleted TaskStateAggregating // describes a state where task is waiting in a group to be aggregated ) func ( TaskState) () string { switch { case TaskStateActive: return "active" case TaskStatePending: return "pending" case TaskStateScheduled: return "scheduled" case TaskStateRetry: return "retry" case TaskStateArchived: return "archived" case TaskStateCompleted: return "completed" case TaskStateAggregating: return "aggregating" } panic(fmt.Sprintf("internal error: unknown task state %d", )) } func ( string) (TaskState, error) { switch { case "active": return TaskStateActive, nil case "pending": return TaskStatePending, nil case "scheduled": return TaskStateScheduled, nil case "retry": return TaskStateRetry, nil case "archived": return TaskStateArchived, nil case "completed": return TaskStateCompleted, nil case "aggregating": return TaskStateAggregating, nil } return 0, errors.E(errors.FailedPrecondition, fmt.Sprintf("%q is not supported task state", )) } // ValidateQueueName validates a given qname to be used as a queue name. // Returns nil if valid, otherwise returns non-nil error. func ( string) error { if len(strings.TrimSpace()) == 0 { return fmt.Errorf("queue name must contain one or more characters") } return nil } // QueueKeyPrefix returns a prefix for all keys in the given queue. func ( string) string { return fmt.Sprintf("asynq:{%s}:", ) } // TaskKeyPrefix returns a prefix for task key. func ( string) string { return fmt.Sprintf("%st:", QueueKeyPrefix()) } // TaskKey returns a redis key for the given task message. func (, string) string { return fmt.Sprintf("%s%s", TaskKeyPrefix(), ) } // PendingKey returns a redis key for the given queue name. func ( string) string { return fmt.Sprintf("%spending", QueueKeyPrefix()) } // ActiveKey returns a redis key for the active tasks. func ( string) string { return fmt.Sprintf("%sactive", QueueKeyPrefix()) } // ScheduledKey returns a redis key for the scheduled tasks. func ( string) string { return fmt.Sprintf("%sscheduled", QueueKeyPrefix()) } // RetryKey returns a redis key for the retry tasks. func ( string) string { return fmt.Sprintf("%sretry", QueueKeyPrefix()) } // ArchivedKey returns a redis key for the archived tasks. func ( string) string { return fmt.Sprintf("%sarchived", QueueKeyPrefix()) } // LeaseKey returns a redis key for the lease. func ( string) string { return fmt.Sprintf("%slease", QueueKeyPrefix()) } func ( string) string { return fmt.Sprintf("%scompleted", QueueKeyPrefix()) } // PausedKey returns a redis key to indicate that the given queue is paused. func ( string) string { return fmt.Sprintf("%spaused", QueueKeyPrefix()) } // ProcessedTotalKey returns a redis key for total processed count for the given queue. func ( string) string { return fmt.Sprintf("%sprocessed", QueueKeyPrefix()) } // FailedTotalKey returns a redis key for total failure count for the given queue. func ( string) string { return fmt.Sprintf("%sfailed", QueueKeyPrefix()) } // ProcessedKey returns a redis key for processed count for the given day for the queue. func ( string, time.Time) string { return fmt.Sprintf("%sprocessed:%s", QueueKeyPrefix(), .UTC().Format("2006-01-02")) } // FailedKey returns a redis key for failure count for the given day for the queue. func ( string, time.Time) string { return fmt.Sprintf("%sfailed:%s", QueueKeyPrefix(), .UTC().Format("2006-01-02")) } // ServerInfoKey returns a redis key for process info. func ( string, int, string) string { return fmt.Sprintf("asynq:servers:{%s:%d:%s}", , , ) } // WorkersKey returns a redis key for the workers given hostname, pid, and server ID. func ( string, int, string) string { return fmt.Sprintf("asynq:workers:{%s:%d:%s}", , , ) } // SchedulerEntriesKey returns a redis key for the scheduler entries given scheduler ID. func ( string) string { return fmt.Sprintf("asynq:schedulers:{%s}", ) } // SchedulerHistoryKey returns a redis key for the scheduler's history for the given entry. func ( string) string { return fmt.Sprintf("asynq:scheduler_history:%s", ) } // UniqueKey returns a redis key with the given type, payload, and queue name. func (, string, []byte) string { if == nil { return fmt.Sprintf("%sunique:%s:", QueueKeyPrefix(), ) } := md5.Sum() return fmt.Sprintf("%sunique:%s:%s", QueueKeyPrefix(), , hex.EncodeToString([:])) } // GroupKeyPrefix returns a prefix for group key. func ( string) string { return fmt.Sprintf("%sg:", QueueKeyPrefix()) } // GroupKey returns a redis key used to group tasks belong in the same group. func (, string) string { return fmt.Sprintf("%s%s", GroupKeyPrefix(), ) } // AggregationSetKey returns a redis key used for an aggregation set. func (, , string) string { return fmt.Sprintf("%s:%s", GroupKey(, ), ) } // AllGroups return a redis key used to store all group keys used in a given queue. func ( string) string { return fmt.Sprintf("%sgroups", QueueKeyPrefix()) } // AllAggregationSets returns a redis key used to store all aggregation sets (set of tasks staged to be aggregated) // in a given queue. func ( string) string { return fmt.Sprintf("%saggregation_sets", QueueKeyPrefix()) } // TaskMessage is the internal representation of a task with additional metadata fields. // Serialized data of this type gets written to redis. type TaskMessage struct { // Type indicates the kind of the task to be performed. Type string // Payload holds data needed to process the task. Payload []byte // ID is a unique identifier for each task. ID string // Queue is a name this message should be enqueued to. Queue string // Retry is the max number of retry for this task. Retry int // Retried is the number of times we've retried this task so far. Retried int // ErrorMsg holds the error message from the last failure. ErrorMsg string // Time of last failure in Unix time, // the number of seconds elapsed since January 1, 1970 UTC. // // Use zero to indicate no last failure LastFailedAt int64 // Timeout specifies timeout in seconds. // If task processing doesn't complete within the timeout, the task will be retried // if retry count is remaining. Otherwise it will be moved to the archive. // // Use zero to indicate no timeout. Timeout int64 // Deadline specifies the deadline for the task in Unix time, // the number of seconds elapsed since January 1, 1970 UTC. // If task processing doesn't complete before the deadline, the task will be retried // if retry count is remaining. Otherwise it will be moved to the archive. // // Use zero to indicate no deadline. Deadline int64 // UniqueKey holds the redis key used for uniqueness lock for this task. // // Empty string indicates that no uniqueness lock was used. UniqueKey string // GroupKey holds the group key used for task aggregation. // // Empty string indicates no aggregation is used for this task. GroupKey string // Retention specifies the number of seconds the task should be retained after completion. Retention int64 // CompletedAt is the time the task was processed successfully in Unix time, // the number of seconds elapsed since January 1, 1970 UTC. // // Use zero to indicate no value. CompletedAt int64 } // EncodeMessage marshals the given task message and returns an encoded bytes. func ( *TaskMessage) ([]byte, error) { if == nil { return nil, fmt.Errorf("cannot encode nil message") } return proto.Marshal(&pb.TaskMessage{ Type: .Type, Payload: .Payload, Id: .ID, Queue: .Queue, Retry: int32(.Retry), Retried: int32(.Retried), ErrorMsg: .ErrorMsg, LastFailedAt: .LastFailedAt, Timeout: .Timeout, Deadline: .Deadline, UniqueKey: .UniqueKey, GroupKey: .GroupKey, Retention: .Retention, CompletedAt: .CompletedAt, }) } // DecodeMessage unmarshals the given bytes and returns a decoded task message. func ( []byte) (*TaskMessage, error) { var pb.TaskMessage if := proto.Unmarshal(, &); != nil { return nil, } return &TaskMessage{ Type: .GetType(), Payload: .GetPayload(), ID: .GetId(), Queue: .GetQueue(), Retry: int(.GetRetry()), Retried: int(.GetRetried()), ErrorMsg: .GetErrorMsg(), LastFailedAt: .GetLastFailedAt(), Timeout: .GetTimeout(), Deadline: .GetDeadline(), UniqueKey: .GetUniqueKey(), GroupKey: .GetGroupKey(), Retention: .GetRetention(), CompletedAt: .GetCompletedAt(), }, nil } // TaskInfo describes a task message and its metadata. type TaskInfo struct { Message *TaskMessage State TaskState NextProcessAt time.Time Result []byte } // Z represents sorted set member. type Z struct { Message *TaskMessage Score int64 } // ServerInfo holds information about a running server. type ServerInfo struct { Host string PID int ServerID string Concurrency int Queues map[string]int StrictPriority bool Status string Started time.Time ActiveWorkerCount int } // EncodeServerInfo marshals the given ServerInfo and returns the encoded bytes. func ( *ServerInfo) ([]byte, error) { if == nil { return nil, fmt.Errorf("cannot encode nil server info") } := make(map[string]int32) for , := range .Queues { [] = int32() } , := ptypes.TimestampProto(.Started) if != nil { return nil, } return proto.Marshal(&pb.ServerInfo{ Host: .Host, Pid: int32(.PID), ServerId: .ServerID, Concurrency: int32(.Concurrency), Queues: , StrictPriority: .StrictPriority, Status: .Status, StartTime: , ActiveWorkerCount: int32(.ActiveWorkerCount), }) } // DecodeServerInfo decodes the given bytes into ServerInfo. func ( []byte) (*ServerInfo, error) { var pb.ServerInfo if := proto.Unmarshal(, &); != nil { return nil, } := make(map[string]int) for , := range .GetQueues() { [] = int() } , := ptypes.Timestamp(.GetStartTime()) if != nil { return nil, } return &ServerInfo{ Host: .GetHost(), PID: int(.GetPid()), ServerID: .GetServerId(), Concurrency: int(.GetConcurrency()), Queues: , StrictPriority: .GetStrictPriority(), Status: .GetStatus(), Started: , ActiveWorkerCount: int(.GetActiveWorkerCount()), }, nil } // WorkerInfo holds information about a running worker. type WorkerInfo struct { Host string PID int ServerID string ID string Type string Payload []byte Queue string Started time.Time Deadline time.Time } // EncodeWorkerInfo marshals the given WorkerInfo and returns the encoded bytes. func ( *WorkerInfo) ([]byte, error) { if == nil { return nil, fmt.Errorf("cannot encode nil worker info") } , := ptypes.TimestampProto(.Started) if != nil { return nil, } , := ptypes.TimestampProto(.Deadline) if != nil { return nil, } return proto.Marshal(&pb.WorkerInfo{ Host: .Host, Pid: int32(.PID), ServerId: .ServerID, TaskId: .ID, TaskType: .Type, TaskPayload: .Payload, Queue: .Queue, StartTime: , Deadline: , }) } // DecodeWorkerInfo decodes the given bytes into WorkerInfo. func ( []byte) (*WorkerInfo, error) { var pb.WorkerInfo if := proto.Unmarshal(, &); != nil { return nil, } , := ptypes.Timestamp(.GetStartTime()) if != nil { return nil, } , := ptypes.Timestamp(.GetDeadline()) if != nil { return nil, } return &WorkerInfo{ Host: .GetHost(), PID: int(.GetPid()), ServerID: .GetServerId(), ID: .GetTaskId(), Type: .GetTaskType(), Payload: .GetTaskPayload(), Queue: .GetQueue(), Started: , Deadline: , }, nil } // SchedulerEntry holds information about a periodic task registered with a scheduler. type SchedulerEntry struct { // Identifier of this entry. ID string // Spec describes the schedule of this entry. Spec string // Type is the task type of the periodic task. Type string // Payload is the payload of the periodic task. Payload []byte // Opts is the options for the periodic task. Opts []string // Next shows the next time the task will be enqueued. Next time.Time // Prev shows the last time the task was enqueued. // Zero time if task was never enqueued. Prev time.Time } // EncodeSchedulerEntry marshals the given entry and returns an encoded bytes. func ( *SchedulerEntry) ([]byte, error) { if == nil { return nil, fmt.Errorf("cannot encode nil scheduler entry") } , := ptypes.TimestampProto(.Next) if != nil { return nil, } , := ptypes.TimestampProto(.Prev) if != nil { return nil, } return proto.Marshal(&pb.SchedulerEntry{ Id: .ID, Spec: .Spec, TaskType: .Type, TaskPayload: .Payload, EnqueueOptions: .Opts, NextEnqueueTime: , PrevEnqueueTime: , }) } // DecodeSchedulerEntry unmarshals the given bytes and returns a decoded SchedulerEntry. func ( []byte) (*SchedulerEntry, error) { var pb.SchedulerEntry if := proto.Unmarshal(, &); != nil { return nil, } , := ptypes.Timestamp(.GetNextEnqueueTime()) if != nil { return nil, } , := ptypes.Timestamp(.GetPrevEnqueueTime()) if != nil { return nil, } return &SchedulerEntry{ ID: .GetId(), Spec: .GetSpec(), Type: .GetTaskType(), Payload: .GetTaskPayload(), Opts: .GetEnqueueOptions(), Next: , Prev: , }, nil } // SchedulerEnqueueEvent holds information about an enqueue event by a scheduler. type SchedulerEnqueueEvent struct { // ID of the task that was enqueued. TaskID string // Time the task was enqueued. EnqueuedAt time.Time } // EncodeSchedulerEnqueueEvent marshals the given event // and returns an encoded bytes. func ( *SchedulerEnqueueEvent) ([]byte, error) { if == nil { return nil, fmt.Errorf("cannot encode nil enqueue event") } , := ptypes.TimestampProto(.EnqueuedAt) if != nil { return nil, } return proto.Marshal(&pb.SchedulerEnqueueEvent{ TaskId: .TaskID, EnqueueTime: , }) } // DecodeSchedulerEnqueueEvent unmarshals the given bytes // and returns a decoded SchedulerEnqueueEvent. func ( []byte) (*SchedulerEnqueueEvent, error) { var pb.SchedulerEnqueueEvent if := proto.Unmarshal(, &); != nil { return nil, } , := ptypes.Timestamp(.GetEnqueueTime()) if != nil { return nil, } return &SchedulerEnqueueEvent{ TaskID: .GetTaskId(), EnqueuedAt: , }, nil } // Cancelations is a collection that holds cancel functions for all active tasks. // // Cancelations are safe for concurrent use by multiple goroutines. type Cancelations struct { mu sync.Mutex cancelFuncs map[string]context.CancelFunc } // NewCancelations returns a Cancelations instance. func () *Cancelations { return &Cancelations{ cancelFuncs: make(map[string]context.CancelFunc), } } // Add adds a new cancel func to the collection. func ( *Cancelations) ( string, context.CancelFunc) { .mu.Lock() defer .mu.Unlock() .cancelFuncs[] = } // Delete deletes a cancel func from the collection given an id. func ( *Cancelations) ( string) { .mu.Lock() defer .mu.Unlock() delete(.cancelFuncs, ) } // Get returns a cancel func given an id. func ( *Cancelations) ( string) ( context.CancelFunc, bool) { .mu.Lock() defer .mu.Unlock() , = .cancelFuncs[] return , } // Lease is a time bound lease for worker to process task. // It provides a communication channel between lessor and lessee about lease expiration. type Lease struct { once sync.Once ch chan struct{} Clock timeutil.Clock mu sync.Mutex expireAt time.Time // guarded by mu } func ( time.Time) *Lease { return &Lease{ ch: make(chan struct{}), expireAt: , Clock: timeutil.NewRealClock(), } } // Reset changes the lease to expire at the given time. // It returns true if the lease is still valid and reset operation was successful, false if the lease had been expired. func ( *Lease) ( time.Time) bool { if !.IsValid() { return false } .mu.Lock() defer .mu.Unlock() .expireAt = return true } // Sends a notification to lessee about expired lease // Returns true if notification was sent, returns false if the lease is still valid and notification was not sent. func ( *Lease) () bool { if .IsValid() { return false } .once.Do(.closeCh) return true } func ( *Lease) () { close(.ch) } // Done returns a communication channel from which the lessee can read to get notified when lessor notifies about lease expiration. func ( *Lease) () <-chan struct{} { return .ch } // Deadline returns the expiration time of the lease. func ( *Lease) () time.Time { .mu.Lock() defer .mu.Unlock() return .expireAt } // IsValid returns true if the lease's expiration time is in the future or equals to the current time, // returns false otherwise. func ( *Lease) () bool { := .Clock.Now() .mu.Lock() defer .mu.Unlock() return .expireAt.After() || .expireAt.Equal() } // Broker is a message broker that supports operations to manage task queues. // // See rdb.RDB as a reference implementation. type Broker interface { Ping() error Close() error Enqueue(ctx context.Context, msg *TaskMessage) error EnqueueUnique(ctx context.Context, msg *TaskMessage, ttl time.Duration) error Dequeue(qnames ...string) (*TaskMessage, time.Time, error) Done(ctx context.Context, msg *TaskMessage) error MarkAsComplete(ctx context.Context, msg *TaskMessage) error Requeue(ctx context.Context, msg *TaskMessage) error Schedule(ctx context.Context, msg *TaskMessage, processAt time.Time) error ScheduleUnique(ctx context.Context, msg *TaskMessage, processAt time.Time, ttl time.Duration) error Retry(ctx context.Context, msg *TaskMessage, processAt time.Time, errMsg string, isFailure bool) error Archive(ctx context.Context, msg *TaskMessage, errMsg string) error ForwardIfReady(qnames ...string) error // Group aggregation related methods AddToGroup(ctx context.Context, msg *TaskMessage, gname string) error AddToGroupUnique(ctx context.Context, msg *TaskMessage, groupKey string, ttl time.Duration) error ListGroups(qname string) ([]string, error) AggregationCheck(qname, gname string, t time.Time, gracePeriod, maxDelay time.Duration, maxSize int) (aggregationSetID string, err error) ReadAggregationSet(qname, gname, aggregationSetID string) ([]*TaskMessage, time.Time, error) DeleteAggregationSet(ctx context.Context, qname, gname, aggregationSetID string) error ReclaimStaleAggregationSets(qname string) error // Task retention related method DeleteExpiredCompletedTasks(qname string) error // Lease related methods ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*TaskMessage, error) ExtendLease(qname string, ids ...string) (time.Time, error) // State snapshot related methods WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error ClearServerState(host string, pid int, serverID string) error // Cancelation related methods CancelationPubSub() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers PublishCancelation(id string) error WriteResult(qname, id string, data []byte) (n int, err error) }