package base

Import Path
	github.com/hibiken/asynq/internal/base (on go.dev)

Dependency Relation
	imports 13 packages, and imported by 3 packages

Involved Source Files
	Package base defines foundational types and constants used in asynq package.
Package-Level Type Names (total 11)
Broker is a message broker that supports operations to manage task queues. See rdb.RDB as a reference implementation.
	Group aggregation related methods
	( Broker) AddToGroupUnique(ctx context.Context, msg *TaskMessage, groupKey string, ttl time.Duration) error
	( Broker) AggregationCheck(qname, gname string, t time.Time, gracePeriod, maxDelay time.Duration, maxSize int) (aggregationSetID string, err error)
	( Broker) Archive(ctx context.Context, msg *TaskMessage, errMsg string) error
	Cancelation related methods // TODO: Need to decouple from redis to support other brokers
	( Broker) ClearServerState(host string, pid int, serverID string) error
	( Broker) Close() error
	( Broker) DeleteAggregationSet(ctx context.Context, qname, gname, aggregationSetID string) error
	Task retention related method
	( Broker) Dequeue(qnames ...string) (*TaskMessage, time.Time, error)
	( Broker) Done(ctx context.Context, msg *TaskMessage) error
	( Broker) Enqueue(ctx context.Context, msg *TaskMessage) error
	( Broker) EnqueueUnique(ctx context.Context, msg *TaskMessage, ttl time.Duration) error
	( Broker) ExtendLease(qname string, ids ...string) (time.Time, error)
	( Broker) ForwardIfReady(qnames ...string) error
	( Broker) ListGroups(qname string) ([]string, error)
	Lease related methods
	( Broker) MarkAsComplete(ctx context.Context, msg *TaskMessage) error
	( Broker) Ping() error
	( Broker) PublishCancelation(id string) error
	( Broker) ReadAggregationSet(qname, gname, aggregationSetID string) ([]*TaskMessage, time.Time, error)
	( Broker) ReclaimStaleAggregationSets(qname string) error
	( Broker) Requeue(ctx context.Context, msg *TaskMessage) error
	( Broker) Retry(ctx context.Context, msg *TaskMessage, processAt time.Time, errMsg string, isFailure bool) error
	( Broker) Schedule(ctx context.Context, msg *TaskMessage, processAt time.Time) error
	( Broker) ScheduleUnique(ctx context.Context, msg *TaskMessage, processAt time.Time, ttl time.Duration) error
	( Broker) WriteResult(qname, id string, data []byte) (n int, err error)
	State snapshot related methods
	*github.com/hibiken/asynq/internal/rdb.RDB
	Broker : github.com/prometheus/common/expfmt.Closer
	Broker : io.Closer
Cancelations is a collection that holds cancel functions for all active tasks. Cancelations are safe for concurrent use by multiple goroutines.
	Add adds a new cancel func to the collection.
	Delete deletes a cancel func from the collection given an id.
	Get returns a cancel func given an id.
	func NewCancelations() *Cancelations
Lease is a time-bound lease for a worker to process a task. It provides a communication channel between lessor and lessee about lease expiration.
	Clock timeutil.Clock
	Deadline returns the expiration time of the lease.
	Done returns a communication channel from which the lessee can read to get notified when the lessor notifies about lease expiration.
	IsValid returns true if the lease's expiration time is in the future or is equal to the current time, returns false otherwise.
	Sends a notification to the lessee about the expired lease. Returns true if the notification was sent, returns false if the lease is still valid and the notification was not sent.
	Reset changes the lease to expire at the given time. It returns true if the lease is still valid and the reset operation was successful, false if the lease had already expired.
	*Lease : database/sql/driver.Validator
	func NewLease(expirationTime time.Time) *Lease
SchedulerEnqueueEvent holds information about an enqueue event by a scheduler.
	Time the task was enqueued.
	ID of the task that was enqueued.
	func DecodeSchedulerEnqueueEvent(b []byte) (*SchedulerEnqueueEvent, error)
	func github.com/hibiken/asynq/internal/rdb.(*RDB).ListSchedulerEnqueueEvents(entryID string, pgn rdb.Pagination) ([]*SchedulerEnqueueEvent, error)
	func EncodeSchedulerEnqueueEvent(event *SchedulerEnqueueEvent) ([]byte, error)
	func github.com/hibiken/asynq/internal/rdb.(*RDB).RecordSchedulerEnqueueEvent(entryID string, event *SchedulerEnqueueEvent) error
SchedulerEntry holds information about a periodic task registered with a scheduler.
	Identifier of this entry.
	Next shows the next time the task will be enqueued.
	Opts is the options for the periodic task.
	Payload is the payload of the periodic task.
	Prev shows the last time the task was enqueued. Zero time if task was never enqueued.
	Spec describes the schedule of this entry.
	Type is the task type of the periodic task.
	func DecodeSchedulerEntry(b []byte) (*SchedulerEntry, error)
	func github.com/hibiken/asynq/internal/rdb.(*RDB).ListSchedulerEntries() ([]*SchedulerEntry, error)
	func EncodeSchedulerEntry(entry *SchedulerEntry) ([]byte, error)
	func github.com/hibiken/asynq/internal/rdb.(*RDB).WriteSchedulerEntries(schedulerID string, entries []*SchedulerEntry, ttl time.Duration) error
ServerInfo holds information about a running server.
	ActiveWorkerCount int
	Concurrency int
	Host string
	PID int
	Queues map[string]int
	ServerID string
	Started time.Time
	Status string
	StrictPriority bool
	func DecodeServerInfo(b []byte) (*ServerInfo, error)
	func github.com/hibiken/asynq/internal/rdb.(*RDB).ListServers() ([]*ServerInfo, error)
	func EncodeServerInfo(info *ServerInfo) ([]byte, error)
	func Broker.WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error
	func github.com/hibiken/asynq/internal/rdb.(*RDB).WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error
TaskInfo describes a task message and its metadata.
	Message *TaskMessage
	NextProcessAt time.Time
	Result []byte
	State TaskState
	func github.com/hibiken/asynq/internal/rdb.(*RDB).GetTaskInfo(qname, id string) (*TaskInfo, error)
	func github.com/hibiken/asynq/internal/rdb.(*RDB).ListActive(qname string, pgn rdb.Pagination) ([]*TaskInfo, error)
	func github.com/hibiken/asynq/internal/rdb.(*RDB).ListAggregating(qname, gname string, pgn rdb.Pagination) ([]*TaskInfo, error)
	func github.com/hibiken/asynq/internal/rdb.(*RDB).ListArchived(qname string, pgn rdb.Pagination) ([]*TaskInfo, error)
	func github.com/hibiken/asynq/internal/rdb.(*RDB).ListCompleted(qname string, pgn rdb.Pagination) ([]*TaskInfo, error)
	func github.com/hibiken/asynq/internal/rdb.(*RDB).ListPending(qname string, pgn rdb.Pagination) ([]*TaskInfo, error)
	func github.com/hibiken/asynq/internal/rdb.(*RDB).ListRetry(qname string, pgn rdb.Pagination) ([]*TaskInfo, error)
	func github.com/hibiken/asynq/internal/rdb.(*RDB).ListScheduled(qname string, pgn rdb.Pagination) ([]*TaskInfo, error)
TaskMessage is the internal representation of a task with additional metadata fields. Serialized data of this type gets written to redis.
	CompletedAt is the time the task was processed successfully in Unix time, the number of seconds elapsed since January 1, 1970 UTC. Use zero to indicate no value.
	Deadline specifies the deadline for the task in Unix time, the number of seconds elapsed since January 1, 1970 UTC. If task processing doesn't complete before the deadline, the task will be retried if retry count is remaining. Otherwise it will be moved to the archive. Use zero to indicate no deadline.
	ErrorMsg holds the error message from the last failure.
	GroupKey holds the group key used for task aggregation. Empty string indicates no aggregation is used for this task.
	ID is a unique identifier for each task.
	Time of last failure in Unix time, the number of seconds elapsed since January 1, 1970 UTC. Use zero to indicate no last failure.
	Payload holds data needed to process the task.
	Queue is a name this message should be enqueued to.
	Retention specifies the number of seconds the task should be retained after completion.
	Retried is the number of times we've retried this task so far.
	Retry is the max number of retry for this task.
	Timeout specifies timeout in seconds. If task processing doesn't complete within the timeout, the task will be retried if retry count is remaining. Otherwise it will be moved to the archive. Use zero to indicate no timeout.
	Type indicates the kind of the task to be performed.
	UniqueKey holds the redis key used for uniqueness lock for this task. Empty string indicates that no uniqueness lock was used.
	func DecodeMessage(data []byte) (*TaskMessage, error)
	func Broker.Dequeue(qnames ...string) (*TaskMessage, time.Time, error)
	func Broker.ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*TaskMessage, error)
	func Broker.ReadAggregationSet(qname, gname, aggregationSetID string) ([]*TaskMessage, time.Time, error)
	func github.com/hibiken/asynq/internal/rdb.(*RDB).Dequeue(qnames ...string) (msg *TaskMessage, leaseExpirationTime time.Time, err error)
	func github.com/hibiken/asynq/internal/rdb.(*RDB).ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*TaskMessage, error)
	func github.com/hibiken/asynq/internal/rdb.(*RDB).ReadAggregationSet(qname, gname, setID string) ([]*TaskMessage, time.Time, error)
	func EncodeMessage(msg *TaskMessage) ([]byte, error)
	func Broker.AddToGroup(ctx context.Context, msg *TaskMessage, gname string) error
	func Broker.AddToGroupUnique(ctx context.Context, msg *TaskMessage, groupKey string, ttl time.Duration) error
	func Broker.Archive(ctx context.Context, msg *TaskMessage, errMsg string) error
	func Broker.Done(ctx context.Context, msg *TaskMessage) error
	func Broker.Enqueue(ctx context.Context, msg *TaskMessage) error
	func Broker.EnqueueUnique(ctx context.Context, msg *TaskMessage, ttl time.Duration) error
	func Broker.MarkAsComplete(ctx context.Context, msg *TaskMessage) error
	func Broker.Requeue(ctx context.Context, msg *TaskMessage) error
	func Broker.Retry(ctx context.Context, msg *TaskMessage, processAt time.Time, errMsg string, isFailure bool) error
	func Broker.Schedule(ctx context.Context, msg *TaskMessage, processAt time.Time) error
	func Broker.ScheduleUnique(ctx context.Context, msg *TaskMessage, processAt time.Time, ttl time.Duration) error
	func github.com/hibiken/asynq/internal/context.New(base context.Context, msg *TaskMessage, deadline time.Time) (context.Context, context.CancelFunc)
	func github.com/hibiken/asynq/internal/rdb.(*RDB).AddToGroup(ctx context.Context, msg *TaskMessage, groupKey string) error
	func github.com/hibiken/asynq/internal/rdb.(*RDB).AddToGroupUnique(ctx context.Context, msg *TaskMessage, groupKey string, ttl time.Duration) error
	func github.com/hibiken/asynq/internal/rdb.(*RDB).Archive(ctx context.Context, msg *TaskMessage, errMsg string) error
	func github.com/hibiken/asynq/internal/rdb.(*RDB).Done(ctx context.Context, msg *TaskMessage) error
	func github.com/hibiken/asynq/internal/rdb.(*RDB).Enqueue(ctx context.Context, msg *TaskMessage) error
	func github.com/hibiken/asynq/internal/rdb.(*RDB).EnqueueUnique(ctx context.Context, msg *TaskMessage, ttl time.Duration) error
	func github.com/hibiken/asynq/internal/rdb.(*RDB).MarkAsComplete(ctx context.Context, msg *TaskMessage) error
	func github.com/hibiken/asynq/internal/rdb.(*RDB).Requeue(ctx context.Context, msg *TaskMessage) error
	func github.com/hibiken/asynq/internal/rdb.(*RDB).Retry(ctx context.Context, msg *TaskMessage, processAt time.Time, errMsg string, isFailure bool) error
	func github.com/hibiken/asynq/internal/rdb.(*RDB).Schedule(ctx context.Context, msg *TaskMessage, processAt time.Time) error
	func github.com/hibiken/asynq/internal/rdb.(*RDB).ScheduleUnique(ctx context.Context, msg *TaskMessage, processAt time.Time, ttl time.Duration) error
TaskState denotes the state of a task.
	( TaskState) String() string
	TaskState : expvar.Var
	TaskState : fmt.Stringer
	func TaskStateFromString(s string) (TaskState, error)
	const TaskStateActive
	const TaskStateAggregating
	const TaskStateArchived
	const TaskStateCompleted
	const TaskStatePending
	const TaskStateRetry
	const TaskStateScheduled
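The String and TaskStateFromString pair form a round trip between a state value and its name. The sketch below shows one way such a pair can be built. Everything in it is an assumption made for illustration: the local type taskState, the lowercase names, the ordering of the constants, and the handling of unknown values. The only value taken from this page is that the aggregating state equals 7.

```go
package main

import "fmt"

// taskState is a hypothetical stand-in for TaskState.
type taskState int

// The ordering below is assumed; it is chosen so that the aggregating
// state has the value 7, as listed in the constants section.
const (
	stateActive taskState = iota + 1
	statePending
	stateScheduled
	stateRetry
	stateArchived
	stateCompleted
	stateAggregating
)

// stateNames maps each state to an assumed lowercase name.
var stateNames = map[taskState]string{
	stateActive:      "active",
	statePending:     "pending",
	stateScheduled:   "scheduled",
	stateRetry:       "retry",
	stateArchived:    "archived",
	stateCompleted:   "completed",
	stateAggregating: "aggregating",
}

// String implements fmt.Stringer.
func (s taskState) String() string {
	if name, ok := stateNames[s]; ok {
		return name
	}
	return fmt.Sprintf("unknown(%d)", int(s))
}

// stateFromString is the inverse of String for known states.
func stateFromString(s string) (taskState, error) {
	for st, name := range stateNames {
		if name == s {
			return st, nil
		}
	}
	return 0, fmt.Errorf("%q is not a supported task state", s)
}

func main() {
	fmt.Println(stateAggregating, int(stateAggregating)) // aggregating 7

	st, err := stateFromString("retry")
	fmt.Println(st, err) // retry <nil>

	_, err = stateFromString("bogus")
	fmt.Println(err != nil) // true
}
```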
WorkerInfo holds information about a running worker.
	Deadline time.Time
	Host string
	ID string
	PID int
	Payload []byte
	Queue string
	ServerID string
	Started time.Time
	Type string
	func DecodeWorkerInfo(b []byte) (*WorkerInfo, error)
	func github.com/hibiken/asynq/internal/rdb.(*RDB).ListWorkers() ([]*WorkerInfo, error)
	func EncodeWorkerInfo(info *WorkerInfo) ([]byte, error)
	func Broker.WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error
	func github.com/hibiken/asynq/internal/rdb.(*RDB).WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error
Z represents sorted set member.
	Message *TaskMessage
	Score int64
Package-Level Functions (total 39)
ActiveKey returns a redis key for the active tasks.
AggregationSetKey returns a redis key used for an aggregation set.
AllAggregationSets returns a redis key used to store all aggregation sets (set of tasks staged to be aggregated) in a given queue.
AllGroups returns a redis key used to store all group keys used in a given queue.
ArchivedKey returns a redis key for the archived tasks.
DecodeMessage unmarshals the given bytes and returns a decoded task message.
DecodeSchedulerEnqueueEvent unmarshals the given bytes and returns a decoded SchedulerEnqueueEvent.
DecodeSchedulerEntry unmarshals the given bytes and returns a decoded SchedulerEntry.
DecodeServerInfo decodes the given bytes into ServerInfo.
DecodeWorkerInfo decodes the given bytes into WorkerInfo.
EncodeMessage marshals the given task message and returns the encoded bytes.
EncodeSchedulerEnqueueEvent marshals the given event and returns the encoded bytes.
EncodeSchedulerEntry marshals the given entry and returns the encoded bytes.
EncodeServerInfo marshals the given ServerInfo and returns the encoded bytes.
EncodeWorkerInfo marshals the given WorkerInfo and returns the encoded bytes.
FailedKey returns a redis key for failure count for the given day for the queue.
FailedTotalKey returns a redis key for total failure count for the given queue.
GroupKey returns a redis key used to group tasks that belong to the same group.
GroupKeyPrefix returns a prefix for group key.
LeaseKey returns a redis key for the lease.
NewCancelations returns a Cancelations instance.
func NewLease(expirationTime time.Time) *Lease
PausedKey returns a redis key to indicate that the given queue is paused.
PendingKey returns a redis key for the given queue name.
ProcessedKey returns a redis key for processed count for the given day for the queue.
ProcessedTotalKey returns a redis key for total processed count for the given queue.
QueueKeyPrefix returns a prefix for all keys in the given queue.
RetryKey returns a redis key for the retry tasks.
ScheduledKey returns a redis key for the scheduled tasks.
SchedulerEntriesKey returns a redis key for the scheduler entries given scheduler ID.
SchedulerHistoryKey returns a redis key for the scheduler's history for the given entry.
ServerInfoKey returns a redis key for process info.
TaskKey returns a redis key for the given task message.
TaskKeyPrefix returns a prefix for task key.
UniqueKey returns a redis key with the given type, payload, and queue name.
ValidateQueueName validates a given qname to be used as a queue name. Returns nil if valid, otherwise returns non-nil error.
WorkersKey returns a redis key for the workers given hostname, pid, and server ID.
Package-Level Variables (only one)
DefaultQueue is the redis key for the default queue.
Package-Level Constants (total 14)
Global Redis keys.
DefaultQueueName is the queue name used if none are specified by user.
const TaskStateAggregating TaskState = 7 // describes a state where task is waiting in a group to be aggregated
Version of asynq library and CLI.