Involved Source Filesaggregator.goasynq.goclient.gocontext.go Package asynq provides a framework for Redis based distrubted task queue.
Asynq uses Redis as a message broker. To connect to redis,
specify the connection using one of RedisConnOpt types.
redisConnOpt = asynq.RedisClientOpt{
Addr: "127.0.0.1:6379",
Password: "xxxxx",
DB: 2,
}
The Client is used to enqueue a task.
client := asynq.NewClient(redisConnOpt)
// Task is created with two parameters: its type and payload.
// Payload data is simply an array of bytes. It can be encoded in JSON, Protocol Buffer, Gob, etc.
b, err := json.Marshal(ExamplePayload{UserID: 42})
if err != nil {
log.Fatal(err)
}
task := asynq.NewTask("example", b)
// Enqueue the task to be processed immediately.
info, err := client.Enqueue(task)
// Schedule the task to be processed after one minute.
info, err = client.Enqueue(t, asynq.ProcessIn(1*time.Minute))
The Server is used to run the task processing workers with a given
handler.
srv := asynq.NewServer(redisConnOpt, asynq.Config{
Concurrency: 10,
})
if err := srv.Run(handler); err != nil {
log.Fatal(err)
}
Handler is an interface type with a method which
takes a task and returns an error. Handler should return nil if
the processing is successful, otherwise return a non-nil error.
If handler panics or returns a non-nil error, the task will be retried in the future.
Example of a type that implements the Handler interface.
type TaskHandler struct {
// ...
}
func (h *TaskHandler) ProcessTask(ctx context.Context, task *asynq.Task) error {
switch task.Type {
case "example":
var data ExamplePayload
if err := json.Unmarshal(task.Payload(), &data); err != nil {
return err
}
// perform task with the data
default:
return fmt.Errorf("unexpected task type %q", task.Type)
}
return nil
}forwarder.gohealthcheck.goheartbeat.goinspector.gojanitor.goperiodic_task_manager.goprocessor.gorecoverer.goscheduler.goservemux.goserver.gosignals_unix.gosubscriber.gosyncer.go
Code Examples
package main
import (
"fmt"
"log"
"github.com/hibiken/asynq"
)
func main() {
rconn, err := asynq.ParseRedisURI("redis://localhost:6379/10")
if err != nil {
log.Fatal(err)
}
r, ok := rconn.(asynq.RedisClientOpt)
if !ok {
log.Fatal("unexpected type")
}
fmt.Println(r.Addr)
fmt.Println(r.DB)
}
package main
import (
"context"
"fmt"
"log"
"github.com/hibiken/asynq"
)
func main() {
// ResultWriter is only accessible in Handler.
h := func(ctx context.Context, task *asynq.Task) error {
// .. do task processing work
res := []byte("task result data")
n, err := task.ResultWriter().Write(res) // implements io.Writer
if err != nil {
return fmt.Errorf("failed to write task result: %v", err)
}
log.Printf(" %d bytes written", n)
return nil
}
_ = h
}
package main
import (
"log"
"time"
"github.com/hibiken/asynq"
)
func main() {
scheduler := asynq.NewScheduler(
asynq.RedisClientOpt{Addr: ":6379"},
&asynq.SchedulerOpts{Location: time.Local},
)
if _, err := scheduler.Register("* * * * *", asynq.NewTask("task1", nil)); err != nil {
log.Fatal(err)
}
if _, err := scheduler.Register("@every 30s", asynq.NewTask("task2", nil)); err != nil {
log.Fatal(err)
}
// Run blocks and waits for os signal to terminate the program.
if err := scheduler.Run(); err != nil {
log.Fatal(err)
}
}
package main
import (
"log"
"github.com/hibiken/asynq"
)
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: ":6379"},
asynq.Config{Concurrency: 20},
)
h := asynq.NewServeMux()
// ... Register handlers
// Run blocks and waits for os signal to terminate the program.
if err := srv.Run(h); err != nil {
log.Fatal(err)
}
}
package main
import (
"log"
"os"
"os/signal"
"github.com/hibiken/asynq"
"golang.org/x/sys/unix"
)
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: ":6379"},
asynq.Config{Concurrency: 20},
)
h := asynq.NewServeMux()
// ... Register handlers
if err := srv.Start(h); err != nil {
log.Fatal(err)
}
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, unix.SIGTERM, unix.SIGINT)
<-sigs // wait for termination signal
srv.Shutdown()
}
package main
import (
"log"
"os"
"os/signal"
"github.com/hibiken/asynq"
"golang.org/x/sys/unix"
)
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: ":6379"},
asynq.Config{Concurrency: 20},
)
h := asynq.NewServeMux()
// ... Register handlers
if err := srv.Start(h); err != nil {
log.Fatal(err)
}
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, unix.SIGTERM, unix.SIGINT, unix.SIGTSTP)
// Handle SIGTERM, SIGINT to exit the program.
// Handle SIGTSTP to stop processing new tasks.
for {
s := <-sigs
if s == unix.SIGTSTP {
srv.Stop() // stop processing new tasks
continue
}
break // received SIGTERM or SIGINT signal
}
srv.Shutdown()
}
Package-Level Type Names (total 40)
/* sort by: | */
A Client is responsible for scheduling tasks.
A Client is used to register tasks that should be processed
immediately or some time in the future.
Clients are safe for concurrent use by multiple goroutines. Close closes the connection with redis. Enqueue enqueues the given task to a queue.
Enqueue returns TaskInfo and nil error if the task is enqueued successfully, otherwise returns a non-nil error.
The argument opts specifies the behavior of task processing.
If there are conflicting Option values the last one overrides others.
Any options provided to NewTask can be overridden by options passed to Enqueue.
By default, max retry is set to 25 and timeout is set to 30 minutes.
If no ProcessAt or ProcessIn options are provided, the task will be pending immediately.
Enqueue uses context.Background internally; to specify the context, use EnqueueContext. EnqueueContext enqueues the given task to a queue.
EnqueueContext returns TaskInfo and nil error if the task is enqueued successfully, otherwise returns a non-nil error.
The argument opts specifies the behavior of task processing.
If there are conflicting Option values the last one overrides others.
Any options provided to NewTask can be overridden by options passed to Enqueue.
By default, max retry is set to 25 and timeout is set to 30 minutes.
If no ProcessAt or ProcessIn options are provided, the task will be pending immediately.
The first argument context applies to the enqueue operation. To specify task timeout and deadline, use Timeout and Deadline option instead.
*Client : github.com/prometheus/common/expfmt.Closer
*Client : io.Closer
func NewClient(r RedisConnOpt) *Client
ClusterNode describes a node in redis cluster. Address of the node. Node ID in the cluster.
func (*Inspector).ClusterNodes(queue string) ([]*ClusterNode, error)
Config specifies the server's background-task processing behavior. BaseContext optionally specifies a function that returns the base context for Handler invocations on this server.
If BaseContext is nil, the default is context.Background().
If this is defined, then it MUST return a non-nil context Maximum number of concurrent processing of tasks.
If set to a zero or negative value, NewServer will overwrite the value
to the number of CPUs usable by the current process. DelayedTaskCheckInterval specifies the interval between checks run on 'scheduled' and 'retry'
tasks, and forwarding them to 'pending' state if they are ready to be processed.
If unset or zero, the interval is set to 5 seconds. ErrorHandler handles errors returned by the task handler.
HandleError is invoked only if the task handler returns a non-nil error.
Example:
func reportError(ctx context, task *asynq.Task, err error) {
retried, _ := asynq.GetRetryCount(ctx)
maxRetry, _ := asynq.GetMaxRetry(ctx)
if retried >= maxRetry {
err = fmt.Errorf("retry exhausted for task %s: %w", task.Type, err)
}
errorReportingService.Notify(err)
})
ErrorHandler: asynq.ErrorHandlerFunc(reportError) GroupAggregator specifies the aggregation function used to aggregate multiple tasks in a group into one task.
If unset or nil, the group aggregation feature will be disabled on the server. GroupGracePeriod specifies the amount of time the server will wait for an incoming task before aggregating
the tasks in a group. If an incoming task is received within this period, the server will wait for another
period of the same length, up to GroupMaxDelay if specified.
If unset or zero, the grace period is set to 1 minute.
Minimum duration for GroupGracePeriod is 1 second. If value specified is less than a second, the call to
NewServer will panic. GroupMaxDelay specifies the maximum amount of time the server will wait for incoming tasks before aggregating
the tasks in a group.
If unset or zero, no delay limit is used. GroupMaxSize specifies the maximum number of tasks that can be aggregated into a single task within a group.
If GroupMaxSize is reached, the server will aggregate the tasks into one immediately.
If unset or zero, no size limit is used. HealthCheckFunc is called periodically with any errors encountered during ping to the
connected redis server. HealthCheckInterval specifies the interval between healthchecks.
If unset or zero, the interval is set to 15 seconds. Predicate function to determine whether the error returned from Handler is a failure.
If the function returns false, Server will not increment the retried counter for the task,
and Server won't record the queue stats (processed and failed stats) to avoid skewing the error
rate of the queue.
By default, if the given error is non-nil the function returns true. LogLevel specifies the minimum log level to enable.
If unset, InfoLevel is used by default. Logger specifies the logger used by the server instance.
If unset, default logger is used. List of queues to process with given priority value. Keys are the names of the
queues and values are associated priority value.
If set to nil or not specified, the server will process only the "default" queue.
Priority is treated as follows to avoid starving low priority queues.
Example:
Queues: map[string]int{
"critical": 6,
"default": 3,
"low": 1,
}
With the above config and given that all queues are not empty, the tasks
in "critical", "default", "low" should be processed 60%, 30%, 10% of
the time respectively.
If a queue has a zero or negative priority value, the queue will be ignored. Function to calculate retry delay for a failed task.
By default, it uses exponential backoff algorithm to calculate the delay. ShutdownTimeout specifies the duration to wait to let workers finish their tasks
before forcing them to abort when stopping the server.
If unset or zero, default timeout of 8 seconds is used. StrictPriority indicates whether the queue priority should be treated strictly.
If set to true, tasks in the queue with the highest priority is processed first.
The tasks in lower priority queues are processed only when those queues with
higher priorities are empty.
func NewServer(r RedisConnOpt, cfg Config) *Server
DailyStats holds aggregate data for a given day for a given queue. Date this stats was taken. Total number of tasks failed to be processed during the given date. Total number of tasks being processed during the given date.
The number includes both succeeded and failed tasks. Name of the queue.
func (*Inspector).History(queue string, n int) ([]*DailyStats, error)
The ErrorHandlerFunc type is an adapter to allow the use of ordinary functions as a ErrorHandler.
If f is a function with the appropriate signature, ErrorHandlerFunc(f) is a ErrorHandler that calls f. HandleError calls fn(ctx, task, err)
ErrorHandlerFunc : ErrorHandler
GroupAggregator aggregates a group of tasks into one before the tasks are passed to the Handler. Aggregate aggregates the given tasks in a group with the given group name,
and returns a new task which is the aggregation of those tasks.
Use NewTask(typename, payload, opts...) to set any options for the aggregated task.
The Queue option, if provided, will be ignored and the aggregated task will always be enqueued
to the same queue the group belonged.GroupAggregatorFunc
The GroupAggregatorFunc type is an adapter to allow the use of ordinary functions as a GroupAggregator.
If f is a function with the appropriate signature, GroupAggregatorFunc(f) is a GroupAggregator that calls f. Aggregate calls fn(group, tasks)
GroupAggregatorFunc : GroupAggregator
GroupInfo represents a state of a group at a certain time. Name of the group. Size of the group.
func (*Inspector).Groups(queue string) ([]*GroupInfo, error)
A Handler processes tasks.
ProcessTask should return nil if the processing of a task
is successful.
If ProcessTask returns a non-nil error or panics, the task
will be retried after delay if retry-count is remaining,
otherwise the task will be archived.
One exception to this rule is when ProcessTask returns a SkipRetry error.
If the returned error is SkipRetry or an error wraps SkipRetry, retry is
skipped and the task will be immediately archived instead.( Handler) ProcessTask(context.Context, *Task) errorHandlerFunc
*ServeMux
func NotFoundHandler() Handler
func (*ServeMux).Handler(t *Task) (h Handler, pattern string)
func (*ServeMux).Handle(pattern string, handler Handler)
func (*Server).Run(handler Handler) error
func (*Server).Start(handler Handler) error
The HandlerFunc type is an adapter to allow the use of
ordinary functions as a Handler. If f is a function
with the appropriate signature, HandlerFunc(f) is a
Handler that calls f. ProcessTask calls fn(ctx, task)
HandlerFunc : Handler
Inspector is a client interface to inspect and mutate the state of
queues and tasks. ArchiveAllAggregatingTasks archives all tasks from the given group,
and reports the number of tasks archived. ArchiveAllPendingTasks archives all pending tasks from the given queue,
and reports the number of tasks archived. ArchiveAllRetryTasks archives all retry tasks from the given queue,
and reports the number of tasks archiveed. ArchiveAllScheduledTasks archives all scheduled tasks from the given queue,
and reports the number of tasks archiveed. ArchiveTask archives a task with the given id in the given queue.
The task needs to be in pending, scheduled, or retry state, otherwise ArchiveTask
will return an error.
If a queue with the given name doesn't exist, it returns an error wrapping ErrQueueNotFound.
If a task with the given id doesn't exist in the queue, it returns an error wrapping ErrTaskNotFound.
If the task is in already archived, it returns a non-nil error. CancelProcessing sends a signal to cancel processing of the task
given a task id. CancelProcessing is best-effort, which means that it does not
guarantee that the task with the given id will be canceled. The return
value only indicates whether the cancelation signal has been sent. Close closes the connection with redis. ClusterKeySlot returns an integer identifying the hash slot the given queue hashes to. ClusterNodes returns a list of nodes the given queue belongs to.
Only relevant if task queues are stored in redis cluster. DeleteAllAggregatingTasks deletes all tasks from the specified group,
and reports the number of tasks deleted. DeleteAllArchivedTasks deletes all archived tasks from the specified queue,
and reports the number tasks deleted. DeleteAllCompletedTasks deletes all completed tasks from the specified queue,
and reports the number tasks deleted. DeleteAllPendingTasks deletes all pending tasks from the specified queue,
and reports the number tasks deleted. DeleteAllRetryTasks deletes all retry tasks from the specified queue,
and reports the number tasks deleted. DeleteAllScheduledTasks deletes all scheduled tasks from the specified queue,
and reports the number tasks deleted. DeleteQueue removes the specified queue.
If force is set to true, DeleteQueue will remove the queue regardless of
the queue size as long as no tasks are active in the queue.
If force is set to false, DeleteQueue will remove the queue only if
the queue is empty.
If the specified queue does not exist, DeleteQueue returns ErrQueueNotFound.
If force is set to false and the specified queue is not empty, DeleteQueue
returns ErrQueueNotEmpty. DeleteTask deletes a task with the given id from the given queue.
The task needs to be in pending, scheduled, retry, or archived state,
otherwise DeleteTask will return an error.
If a queue with the given name doesn't exist, it returns an error wrapping ErrQueueNotFound.
If a task with the given id doesn't exist in the queue, it returns an error wrapping ErrTaskNotFound.
If the task is in active state, it returns a non-nil error. GetQueueInfo returns current information of the given queue. GetTaskInfo retrieves task information given a task id and queue name.
Returns an error wrapping ErrQueueNotFound if a queue with the given name doesn't exist.
Returns an error wrapping ErrTaskNotFound if a task with the given id doesn't exist in the queue. Groups returns a list of all groups within the given queue. History returns a list of stats from the last n days. ListActiveTasks retrieves active tasks from the specified queue.
By default, it retrieves the first 30 tasks. ListAggregatingTasks retrieves scheduled tasks from the specified group.
By default, it retrieves the first 30 tasks. ListArchivedTasks retrieves archived tasks from the specified queue.
Tasks are sorted by LastFailedAt in descending order.
By default, it retrieves the first 30 tasks. ListCompletedTasks retrieves completed tasks from the specified queue.
Tasks are sorted by expiration time (i.e. CompletedAt + Retention) in descending order.
By default, it retrieves the first 30 tasks. ListPendingTasks retrieves pending tasks from the specified queue.
By default, it retrieves the first 30 tasks. ListRetryTasks retrieves retry tasks from the specified queue.
Tasks are sorted by NextProcessAt in ascending order.
By default, it retrieves the first 30 tasks. ListScheduledTasks retrieves scheduled tasks from the specified queue.
Tasks are sorted by NextProcessAt in ascending order.
By default, it retrieves the first 30 tasks. ListSchedulerEnqueueEvents retrieves a list of enqueue events from the specified scheduler entry.
By default, it retrieves the first 30 tasks. PauseQueue pauses task processing on the specified queue.
If the queue is already paused, it will return a non-nil error. Queues returns a list of all queue names. RunAllAggregatingTasks schedules all tasks from the given grou to run.
and reports the number of tasks scheduled to run. RunAllArchivedTasks schedules all archived tasks from the given queue to run,
and reports the number of tasks scheduled to run. RunAllRetryTasks schedules all retry tasks from the given queue to run,
and reports the number of tasks scheduled to run. RunAllScheduledTasks schedules all scheduled tasks from the given queue to run,
and reports the number of tasks scheduled to run. RunTask updates the task to pending state given a queue name and task id.
The task needs to be in scheduled, retry, or archived state, otherwise RunTask
will return an error.
If a queue with the given name doesn't exist, it returns an error wrapping ErrQueueNotFound.
If a task with the given id doesn't exist in the queue, it returns an error wrapping ErrTaskNotFound.
If the task is in pending or active state, it returns a non-nil error. SchedulerEntries returns a list of all entries registered with
currently running schedulers. Servers return a list of running servers' information. UnpauseQueue resumes task processing on the specified queue.
If the queue is not paused, it will return a non-nil error.
*Inspector : github.com/prometheus/common/expfmt.Closer
*Inspector : io.Closer
func NewInspector(r RedisConnOpt) *Inspector
Logger supports logging at various log levels. Debug logs a message at Debug level. Error logs a message at Error level. Fatal logs a message at Fatal level
and process will exit with status set to 1. Info logs a message at Info level. Warn logs a message at Warning level.
github.com/hibiken/asynq/internal/log.Base(interface)
*github.com/hibiken/asynq/internal/log.Logger
github.com/ipfs/go-log/v2.EventLogger(interface)
github.com/ipfs/go-log/v2.StandardLogger(interface)
*github.com/ipfs/go-log/v2.ZapEventLogger
*go.uber.org/zap.SugaredLogger
Logger : github.com/hibiken/asynq/internal/log.Base
LogLevel represents logging level.
It satisfies flag.Value interface. Set is part of the flag.Value interface. String is part of the flag.Value interface.
*LogLevel : expvar.Var
*LogLevel : flag.Value
*LogLevel : fmt.Stringer
const DebugLevel
const ErrorLevel
const FatalLevel
const InfoLevel
const WarnLevel
MiddlewareFunc is a function which receives an asynq.Handler and returns another asynq.Handler.
Typically, the returned handler is a closure which does something with the context and task passed
to it, and then calls the handler passed as parameter to the MiddlewareFunc.
func (*ServeMux).Use(mws ...MiddlewareFunc)
PeriodicTaskConfig specifies the details of a periodic task. // required: must be non empty string // optional: can be nil // required: must be non nil
func PeriodicTaskConfigProvider.GetConfigs() ([]*PeriodicTaskConfig, error)
PeriodicTaskConfigProvider provides configs for periodic tasks.
GetConfigs will be called by a PeriodicTaskManager periodically to
sync the scheduler's entries with the configs returned by the provider.( PeriodicTaskConfigProvider) GetConfigs() ([]*PeriodicTaskConfig, error)
PeriodicTaskManager manages scheduling of periodic tasks.
It syncs scheduler's entries by calling the config provider periodically. Run starts the manager and blocks until an os signal to exit the program is received.
Once it receives a signal, it gracefully shuts down the manager. Shutdown gracefully shuts down the manager.
It notifies a background syncer goroutine to stop and stops scheduler. Start starts a scheduler and background goroutine to sync the scheduler with the configs
returned by the provider.
Start returns any error encountered at start up time.
func NewPeriodicTaskManager(opts PeriodicTaskManagerOpts) (*PeriodicTaskManager, error)
Required: must be non nil Required: must be non nil Optional: scheduler options Deprecated: Use PostEnqueueFunc instead
EnqueueErrorHandler gets called when scheduler cannot enqueue a registered task
due to an error. Location specifies the time zone location.
If unset, the UTC time zone (time.UTC) is used. LogLevel specifies the minimum log level to enable.
If unset, InfoLevel is used by default. Logger specifies the logger used by the scheduler instance.
If unset, the default logger is used. PostEnqueueFunc, if provided, is called after a task gets enqueued by Scheduler.
The callback function should return quickly to not block the current thread. PreEnqueueFunc, if provided, is called before a task gets enqueued by Scheduler.
The callback function should return quickly to not block the current thread. Optional: default is 3m
func NewPeriodicTaskManager(opts PeriodicTaskManagerOpts) (*PeriodicTaskManager, error)
QueueInfo represents a state of a queue at a certain time. Number of active tasks. Number of aggregating tasks. Number of archived tasks. Number of stored completed tasks. Total number of tasks failed to be processed within the given date (counter resets daily). Total number of tasks failed (cumulative). Groups is the total number of groups in the queue. Latency of the queue, measured by the oldest pending task in the queue. Total number of bytes that the queue and its tasks require to be stored in redis.
It is an approximate memory usage value in bytes since the value is computed by sampling. Paused indicates whether the queue is paused.
If true, tasks in the queue will not be processed. Number of pending tasks. Total number of tasks being processed within the given date (counter resets daily).
The number includes both succeeded and failed tasks. Total number of tasks processed (cumulative). Name of the queue. Number of retry tasks. Number of scheduled tasks. Size is the total number of tasks in the queue.
The value is the sum of Pending, Active, Scheduled, Retry, Aggregating and Archived. Time when this queue info snapshot was taken.
func (*Inspector).GetQueueInfo(queue string) (*QueueInfo, error)
RedisClientOpt is used to create a redis client that connects
to a redis server directly. Redis server address in "host:port" format. Redis DB to select after connecting to a server.
See: https://redis.io/commands/select. Dial timeout for establishing new connections.
Default is 5 seconds. Network type to use, either tcp or unix.
Default is tcp. Password to authenticate the current connection.
See: https://redis.io/commands/auth. Maximum number of socket connections.
Default is 10 connections per every CPU as reported by runtime.NumCPU. Timeout for socket reads.
If timeout is reached, read commands will fail with a timeout error
instead of blocking.
Use value -1 for no timeout and 0 for default.
Default is 3 seconds. TLS Config used to connect to a server.
TLS will be negotiated only if this field is set. Username to authenticate the current connection when Redis ACLs are used.
See: https://redis.io/commands/auth. Timeout for socket writes.
If timeout is reached, write commands will fail with a timeout error
instead of blocking.
Use value -1 for no timeout and 0 for default.
Default is ReadTimout.( RedisClientOpt) MakeRedisClient() interface{}
RedisClientOpt : RedisConnOpt
RedisClusterClientOpt is used to creates a redis client that connects to
redis cluster. A seed list of host:port addresses of cluster nodes. Dial timeout for establishing new connections.
Default is 5 seconds. The maximum number of retries before giving up.
Command is retried on network errors and MOVED/ASK redirects.
Default is 8 retries. Password to authenticate the current connection.
See: https://redis.io/commands/auth. Timeout for socket reads.
If timeout is reached, read commands will fail with a timeout error
instead of blocking.
Use value -1 for no timeout and 0 for default.
Default is 3 seconds. TLS Config used to connect to a server.
TLS will be negotiated only if this field is set. Username to authenticate the current connection when Redis ACLs are used.
See: https://redis.io/commands/auth. Timeout for socket writes.
If timeout is reached, write commands will fail with a timeout error
instead of blocking.
Use value -1 for no timeout and 0 for default.
Default is ReadTimeout.( RedisClusterClientOpt) MakeRedisClient() interface{}
RedisClusterClientOpt : RedisConnOpt
RedisFailoverClientOpt is used to creates a redis client that talks
to redis sentinels for service discovery and has an automatic failover
capability. Redis DB to select after connecting to a server.
See: https://redis.io/commands/select. Dial timeout for establishing new connections.
Default is 5 seconds. Redis master name that monitored by sentinels. Password to authenticate the current connection.
See: https://redis.io/commands/auth. Maximum number of socket connections.
Default is 10 connections per every CPU as reported by runtime.NumCPU. Timeout for socket reads.
If timeout is reached, read commands will fail with a timeout error
instead of blocking.
Use value -1 for no timeout and 0 for default.
Default is 3 seconds. Addresses of sentinels in "host:port" format.
Use at least three sentinels to avoid problems described in
https://redis.io/topics/sentinel. Redis sentinel password. TLS Config used to connect to a server.
TLS will be negotiated only if this field is set. Username to authenticate the current connection when Redis ACLs are used.
See: https://redis.io/commands/auth. Timeout for socket writes.
If timeout is reached, write commands will fail with a timeout error
instead of blocking.
Use value -1 for no timeout and 0 for default.
Default is ReadTimeout( RedisFailoverClientOpt) MakeRedisClient() interface{}
RedisFailoverClientOpt : RedisConnOpt
ResultWriter is a client interface to write result data for a task.
It writes the data to the redis instance the server is connected to. TaskID returns the ID of the task the ResultWriter is associated with. Write writes the given data as a result of the task the ResultWriter is associated with.
*ResultWriter : github.com/miekg/dns.Writer
*ResultWriter : internal/bisect.Writer
*ResultWriter : io.Writer
func (*Task).ResultWriter() *ResultWriter
RetryDelayFunc calculates the retry delay duration for a failed task given
the retry count, error, and the task.
n is the number of times the task has been retried.
e is the error returned by the task handler.
t is the task in question.
A Scheduler kicks off tasks at regular intervals based on the user defined schedule.
Schedulers are safe for concurrent use by multiple goroutines. Register registers a task to be enqueued on the given schedule specified by the cronspec.
It returns an ID of the newly registered entry. Run starts the scheduler until an os signal to exit the program is received.
It returns an error if scheduler is already running or has been shutdown. Shutdown stops and shuts down the scheduler. Start starts the scheduler.
It returns an error if the scheduler is already running or has been shutdown. Unregister removes a registered entry by entry ID.
Unregister returns a non-nil error if no entries were found for the given entryID.
func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler
SchedulerEnqueueEvent holds information about an enqueue event by a scheduler. Time the task was enqueued. ID of the task that was enqueued.
func (*Inspector).ListSchedulerEnqueueEvents(entryID string, opts ...ListOption) ([]*SchedulerEnqueueEvent, error)
SchedulerEntry holds information about a periodic task registered with a scheduler. Identifier of this entry. Next shows the next time the task will be enqueued. Opts is the options for the periodic task. Prev shows the last time the task was enqueued.
Zero time if task was never enqueued. Spec describes the schedule of this entry. Periodic Task registered for this entry.
func (*Inspector).SchedulerEntries() ([]*SchedulerEntry, error)
SchedulerOpts specifies scheduler options. Deprecated: Use PostEnqueueFunc instead
EnqueueErrorHandler gets called when scheduler cannot enqueue a registered task
due to an error. Location specifies the time zone location.
If unset, the UTC time zone (time.UTC) is used. LogLevel specifies the minimum log level to enable.
If unset, InfoLevel is used by default. Logger specifies the logger used by the scheduler instance.
If unset, the default logger is used. PostEnqueueFunc, if provided, is called after a task gets enqueued by Scheduler.
The callback function should return quickly to not block the current thread. PreEnqueueFunc, if provided, is called before a task gets enqueued by Scheduler.
The callback function should return quickly to not block the current thread.
func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler
ServeMux is a multiplexer for asynchronous tasks.
It matches the type of each task against a list of registered patterns
and calls the handler for the pattern that most closely matches the
task's type name.
Longer patterns take precedence over shorter ones, so that if there are
handlers registered for both "images" and "images:thumbnails",
the latter handler will be called for tasks with a type name beginning with
"images:thumbnails" and the former will receive tasks with type name beginning
with "images". Handle registers the handler for the given pattern.
If a handler already exists for pattern, Handle panics. HandleFunc registers the handler function for the given pattern. Handler returns the handler to use for the given task.
It always return a non-nil handler.
Handler also returns the registered pattern that matches the task.
If there is no registered handler that applies to the task,
handler returns a 'not found' handler which returns an error. ProcessTask dispatches the task to the handler whose
pattern most closely matches the task type. Use appends a MiddlewareFunc to the chain.
Middlewares are executed in the order that they are applied to the ServeMux.
*ServeMux : Handler
func NewServeMux() *ServeMux
Server is responsible for task processing and task lifecycle management.
Server pulls tasks off queues and processes them.
If the processing of a task is unsuccessful, server will schedule it for a retry.
A task will be retried until either the task gets processed successfully
or until it reaches its max retry count.
If a task exhausts its retries, it will be moved to the archive and
will be kept in the archive set.
Note that the archive size is finite and once it reaches its max size,
oldest tasks in the archive will be deleted. Run starts the task processing and blocks until
an os signal to exit the program is received. Once it receives
a signal, it gracefully shuts down all active workers and other
goroutines to process the tasks.
Run returns any error encountered at server startup time.
If the server has already been shutdown, ErrServerClosed is returned. Shutdown gracefully shuts down the server.
It gracefully closes all active workers. The server will wait for
active workers to finish processing tasks for duration specified in Config.ShutdownTimeout.
If worker didn't finish processing a task during the timeout, the task will be pushed back to Redis. Start starts the worker server. Once the server has started,
it pulls tasks off queues and starts a worker goroutine for each task
and then call Handler to process it.
Tasks are processed concurrently by the workers up to the number of
concurrency specified in Config.Concurrency.
Start returns any error encountered at server startup time.
If the server has already been shutdown, ErrServerClosed is returned. Stop signals the server to stop pulling new tasks off queues.
Stop can be used before shutting down the server to ensure that all
currently active tasks are processed before server shutdown.
Stop does not shutdown the server, make sure to call Shutdown before exit.
func NewServer(r RedisConnOpt, cfg Config) *Server
ServerInfo describes a running Server instance. A List of active workers currently processing tasks. Server configuration details.
See Config doc for field descriptions. Host machine on which the server is running. Unique Identifier for the server. PID of the process in which the server is running.Queuesmap[string]int Time the server started. Status indicates the status of the server.
TODO: Update comment with more details.StrictPrioritybool
func (*Inspector).Servers() ([]*ServerInfo, error)
A TaskInfo describes a task and its metadata. CompletedAt is the time when the task is processed successfully.
Zero value (i.e. time.Time{}) indicates no value. Deadline is the deadline for the task, zero value if not specified. Group is the name of the group in which the task belongs.
Tasks in the same queue can be grouped together by Group name and will be aggregated into one task
by a Server processing the queue.
Empty string (default) indicates task does not belong to any groups, and no aggregation will be applied to the task. ID is the identifier of the task. IsOrphaned describes whether the task is left in active state with no worker processing it.
An orphaned task indicates that the worker has crashed or experienced network failures and was not able to
extend its lease on the task.
This task will be recovered by running a server against the queue the task is in.
This field is only applicable to tasks with TaskStateActive. LastErr is the error message from the last failure. LastFailedAt is the time time of the last failure if any.
If the task has no failures, LastFailedAt is zero time (i.e. time.Time{}). MaxRetry is the maximum number of times the task can be retried. NextProcessAt is the time the task is scheduled to be processed,
zero if not applicable. Payload is the payload data of the task. Queue is the name of the queue in which the task belongs. Result holds the result data associated with the task.
Use ResultWriter to write result data from the Handler. Retention is duration of the retention period after the task is successfully processed. Retried is the number of times the task has retried so far. State indicates the task state. Timeout is the duration the task can be processed by Handler before being retried,
zero if not specified Type is the type name of the task.
func (*Client).Enqueue(task *Task, opts ...Option) (*TaskInfo, error)
func (*Client).EnqueueContext(ctx context.Context, task *Task, opts ...Option) (*TaskInfo, error)
func (*Inspector).GetTaskInfo(queue, id string) (*TaskInfo, error)
func (*Inspector).ListActiveTasks(queue string, opts ...ListOption) ([]*TaskInfo, error)
func (*Inspector).ListAggregatingTasks(queue, group string, opts ...ListOption) ([]*TaskInfo, error)
func (*Inspector).ListArchivedTasks(queue string, opts ...ListOption) ([]*TaskInfo, error)
func (*Inspector).ListCompletedTasks(queue string, opts ...ListOption) ([]*TaskInfo, error)
func (*Inspector).ListPendingTasks(queue string, opts ...ListOption) ([]*TaskInfo, error)
func (*Inspector).ListRetryTasks(queue string, opts ...ListOption) ([]*TaskInfo, error)
func (*Inspector).ListScheduledTasks(queue string, opts ...ListOption) ([]*TaskInfo, error)
WorkerInfo describes a running worker processing a task. Time the worker needs to finish processing the task by. Queue from which the worker got its task. Time the worker started processing the task. ID of the task the worker is processing. Payload of the task the worker is processing. Type of the task the worker is processing.
Package-Level Functions (total 27)
Deadline returns an option to specify the deadline for the given task.
If it reaches the deadline before the Handler returns, then the task
will be retried.
If there's a conflicting Timeout option, whichever comes earliest
will be used.
DefaultRetryDelayFunc is the default RetryDelayFunc used if one is not specified in Config.
It uses exponential back-off strategy to calculate the retry delay.
GetMaxRetry extracts maximum retry from a context, if any.
Return value n indicates the maximum number of times the associated task
can be retried if ProcessTask returns a non-nil error.
GetQueueName extracts queue name from a context, if any.
Return value queue indicates which queue the task was pulled from.
GetRetryCount extracts retry count from a context, if any.
Return value n indicates the number of times associated task has been
retried so far.
GetTaskID extracts a task ID from a context, if any.
ID of a task is guaranteed to be unique.
ID of a task doesn't change if the task is being retried.
Group returns an option to specify the group used for the task.
Tasks in a given queue with the same group will be aggregated into one task before passed to Handler.
MaxRetry returns an option to specify the max number of times
the task will be retried.
Negative retry count is treated as zero retry.
NewClient returns a new Client instance given a redis connection option.
New returns a new instance of Inspector.
NewPeriodicTaskManager returns a new PeriodicTaskManager instance.
The given opts should specify the RedisConnOp and PeriodicTaskConfigProvider at minimum.
NewScheduler returns a new Scheduler instance given the redis connection option.
The parameter opts is optional, defaults will be used if opts is set to nil
NewServeMux allocates and returns a new ServeMux.
NewServer returns a new Server given a redis connection option
and server configuration.
NewTask returns a new Task given a type name and payload data.
Options can be passed to configure task processing behavior.
NotFound returns an error indicating that the handler was not found for the given task.
NotFoundHandler returns a simple task handler that returns a ``not found`` error.
Page returns an option to specify the page number for list operation.
The value 1 fetches the first page.
Negative page number is treated as one.
PageSize returns an option to specify the page size for list operation.
Negative page size is treated as zero.
ParseRedisURI parses redis uri string and returns RedisConnOpt if uri is valid.
It returns a non-nil error if uri cannot be parsed.
Three URI schemes are supported, which are redis:, rediss:, redis-socket:, and redis-sentinel:.
Supported formats are:
redis://[:password@]host[:port][/dbnumber]
rediss://[:password@]host[:port][/dbnumber]
redis-socket://[:password@]path[?db=dbnumber]
redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName]
ProcessAt returns an option to specify when to process the given task.
If there's a conflicting ProcessIn option, the last option passed to Enqueue overrides the others.
ProcessIn returns an option to specify when to process the given task relative to the current time.
If there's a conflicting ProcessAt option, the last option passed to Enqueue overrides the others.
Queue returns an option to specify the queue to enqueue the task into.
Retention returns an option to specify the duration of retention period for the task.
If this option is provided, the task will be stored as a completed task after successful processing.
A completed task will be deleted after the specified duration elapses.
TaskID returns an option to specify the task ID.
Timeout returns an option to specify how long a task may run.
If the timeout elapses before the Handler returns, then the task
will be retried.
Zero duration means no limit.
If there's a conflicting Deadline option, whichever comes earliest
will be used.
Unique returns an option to enqueue a task only if the given task is unique.
Task enqueued with this option is guaranteed to be unique within the given ttl.
Once the task gets processed successfully or once the TTL has expired,
another task with the same uniqueness may be enqueued.
ErrDuplicateTask error is returned when enqueueing a duplicate task.
TTL duration must be greater than or equal to 1 second.
Uniqueness of a task is based on the following properties:
- Task Type
- Task Payload
- Queue Name
Package-Level Variables (total 8)
ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task.
ErrDuplicateTask error only applies to tasks enqueued with a Unique option.
ErrLeaseExpired error indicates that the task failed because the worker working on the task
could not extend its lease due to missing heartbeats. The worker may have crashed or got cutoff from the network.
ErrQueueNotEmpty indicates that the specified queue is not empty.
ErrQueueNotFound indicates that the specified queue does not exist.
ErrServerClosed indicates that the operation is now illegal because of the server has been shutdown.
ErrTaskIDConflict indicates that the given task could not be enqueued since its task ID already exists.
ErrTaskIDConflict error only applies to tasks enqueued with a TaskID option.
ErrTaskNotFound indicates that the specified task cannot be found in the queue.
SkipRetry is used as a return value from Handler.ProcessTask to indicate that
the task should not be retried and should be archived instead.
WarnLevel is used for undesired but relatively expected events,
which may indicate a problem.
The pages are generated with Goldsv0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.