Source File
asynq.go
Belonging Package
github.com/hibiken/asynq
// Copyright 2020 Kentaro Hibino. All rights reserved.// Use of this source code is governed by a MIT license// that can be found in the LICENSE file.package asynqimport ()// Task represents a unit of work to be performed.type Task struct {// typename indicates the type of task to be performed.typename string// payload holds data needed to perform the task.payload []byte// opts holds options for the task.opts []Option// w is the ResultWriter for the task.w *ResultWriter}func ( *Task) () string { return .typename }func ( *Task) () []byte { return .payload }// ResultWriter returns a pointer to the ResultWriter associated with the task.//// Nil pointer is returned if called on a newly created task (i.e. task created by calling NewTask).// Only the tasks passed to Handler.ProcessTask have a valid ResultWriter pointer.func ( *Task) () *ResultWriter { return .w }// NewTask returns a new Task given a type name and payload data.// Options can be passed to configure task processing behavior.func ( string, []byte, ...Option) *Task {return &Task{typename: ,payload: ,opts: ,}}// newTask creates a task with the given typename, payload and ResultWriter.func newTask( string, []byte, *ResultWriter) *Task {return &Task{typename: ,payload: ,w: ,}}// A TaskInfo describes a task and its metadata.type TaskInfo struct {// ID is the identifier of the task.ID string// Queue is the name of the queue in which the task belongs.Queue string// Type is the type name of the task.Type string// Payload is the payload data of the task.Payload []byte// State indicates the task state.State TaskState// MaxRetry is the maximum number of times the task can be retried.MaxRetry int// Retried is the number of times the task has retried so far.Retried int// LastErr is the error message from the last failure.LastErr string// LastFailedAt is the time time of the last failure if any.// If the task has no failures, LastFailedAt is zero time (i.e. time.Time{}).LastFailedAt time.Time// Timeout is the duration the task can be processed by Handler before being retried,// zero if not specifiedTimeout time.Duration// Deadline is the deadline for the task, zero value if not specified.Deadline time.Time// Group is the name of the group in which the task belongs.//// Tasks in the same queue can be grouped together by Group name and will be aggregated into one task// by a Server processing the queue.//// Empty string (default) indicates task does not belong to any groups, and no aggregation will be applied to the task.Group string// NextProcessAt is the time the task is scheduled to be processed,// zero if not applicable.NextProcessAt time.Time// IsOrphaned describes whether the task is left in active state with no worker processing it.// An orphaned task indicates that the worker has crashed or experienced network failures and was not able to// extend its lease on the task.//// This task will be recovered by running a server against the queue the task is in.// This field is only applicable to tasks with TaskStateActive.IsOrphaned bool// Retention is duration of the retention period after the task is successfully processed.Retention time.Duration// CompletedAt is the time when the task is processed successfully.// Zero value (i.e. time.Time{}) indicates no value.CompletedAt time.Time// Result holds the result data associated with the task.// Use ResultWriter to write result data from the Handler.Result []byte}// If t is non-zero, returns time converted from t as unix time in seconds.// If t is zero, returns zero value of time.Time.func fromUnixTimeOrZero( int64) time.Time {if == 0 {return time.Time{}}return time.Unix(, 0)}func newTaskInfo( *base.TaskMessage, base.TaskState, time.Time, []byte) *TaskInfo {:= TaskInfo{ID: .ID,Queue: .Queue,Type: .Type,Payload: .Payload, // Do we need to make a copy?MaxRetry: .Retry,Retried: .Retried,LastErr: .ErrorMsg,Group: .GroupKey,Timeout: time.Duration(.Timeout) * time.Second,Deadline: fromUnixTimeOrZero(.Deadline),Retention: time.Duration(.Retention) * time.Second,NextProcessAt: ,LastFailedAt: fromUnixTimeOrZero(.LastFailedAt),CompletedAt: fromUnixTimeOrZero(.CompletedAt),Result: ,}switch {case base.TaskStateActive:.State = TaskStateActivecase base.TaskStatePending:.State = TaskStatePendingcase base.TaskStateScheduled:.State = TaskStateScheduledcase base.TaskStateRetry:.State = TaskStateRetrycase base.TaskStateArchived:.State = TaskStateArchivedcase base.TaskStateCompleted:.State = TaskStateCompletedcase base.TaskStateAggregating:.State = TaskStateAggregatingdefault:panic(fmt.Sprintf("internal error: unknown state: %d", ))}return &}// TaskState denotes the state of a task.type TaskState intconst (// Indicates that the task is currently being processed by Handler.TaskStateActive TaskState = iota + 1// Indicates that the task is ready to be processed by Handler.TaskStatePending// Indicates that the task is scheduled to be processed some time in the future.TaskStateScheduled// Indicates that the task has previously failed and scheduled to be processed some time in the future.TaskStateRetry// Indicates that the task is archived and stored for inspection purposes.TaskStateArchived// Indicates that the task is processed successfully and retained until the retention TTL expires.TaskStateCompleted// Indicates that the task is waiting in a group to be aggregated into one task.TaskStateAggregating)func ( TaskState) () string {switch {case TaskStateActive:return "active"case TaskStatePending:return "pending"case TaskStateScheduled:return "scheduled"case TaskStateRetry:return "retry"case TaskStateArchived:return "archived"case TaskStateCompleted:return "completed"case TaskStateAggregating:return "aggregating"}panic("asynq: unknown task state")}// RedisConnOpt is a discriminated union of types that represent Redis connection configuration option.//// RedisConnOpt represents a sum of following types://// - RedisClientOpt// - RedisFailoverClientOpt// - RedisClusterClientOpttype RedisConnOpt interface {// MakeRedisClient returns a new redis client instance.// Return value is intentionally opaque to hide the implementation detail of redis client.MakeRedisClient() interface{}}// RedisClientOpt is used to create a redis client that connects// to a redis server directly.type RedisClientOpt struct {// Network type to use, either tcp or unix.// Default is tcp.Network string// Redis server address in "host:port" format.Addr string// Username to authenticate the current connection when Redis ACLs are used.// See: https://redis.io/commands/auth.Username string// Password to authenticate the current connection.// See: https://redis.io/commands/auth.Password string// Redis DB to select after connecting to a server.// See: https://redis.io/commands/select.DB int// Dial timeout for establishing new connections.// Default is 5 seconds.DialTimeout time.Duration// Timeout for socket reads.// If timeout is reached, read commands will fail with a timeout error// instead of blocking.//// Use value -1 for no timeout and 0 for default.// Default is 3 seconds.ReadTimeout time.Duration// Timeout for socket writes.// If timeout is reached, write commands will fail with a timeout error// instead of blocking.//// Use value -1 for no timeout and 0 for default.// Default is ReadTimout.WriteTimeout time.Duration// Maximum number of socket connections.// Default is 10 connections per every CPU as reported by runtime.NumCPU.PoolSize int// TLS Config used to connect to a server.// TLS will be negotiated only if this field is set.TLSConfig *tls.Config}func ( RedisClientOpt) () interface{} {return redis.NewClient(&redis.Options{Network: .Network,Addr: .Addr,Username: .Username,Password: .Password,DB: .DB,DialTimeout: .DialTimeout,ReadTimeout: .ReadTimeout,WriteTimeout: .WriteTimeout,PoolSize: .PoolSize,TLSConfig: .TLSConfig,})}// RedisFailoverClientOpt is used to creates a redis client that talks// to redis sentinels for service discovery and has an automatic failover// capability.type RedisFailoverClientOpt struct {// Redis master name that monitored by sentinels.MasterName string// Addresses of sentinels in "host:port" format.// Use at least three sentinels to avoid problems described in// https://redis.io/topics/sentinel.SentinelAddrs []string// Redis sentinel password.SentinelPassword string// Username to authenticate the current connection when Redis ACLs are used.// See: https://redis.io/commands/auth.Username string// Password to authenticate the current connection.// See: https://redis.io/commands/auth.Password string// Redis DB to select after connecting to a server.// See: https://redis.io/commands/select.DB int// Dial timeout for establishing new connections.// Default is 5 seconds.DialTimeout time.Duration// Timeout for socket reads.// If timeout is reached, read commands will fail with a timeout error// instead of blocking.//// Use value -1 for no timeout and 0 for default.// Default is 3 seconds.ReadTimeout time.Duration// Timeout for socket writes.// If timeout is reached, write commands will fail with a timeout error// instead of blocking.//// Use value -1 for no timeout and 0 for default.// Default is ReadTimeoutWriteTimeout time.Duration// Maximum number of socket connections.// Default is 10 connections per every CPU as reported by runtime.NumCPU.PoolSize int// TLS Config used to connect to a server.// TLS will be negotiated only if this field is set.TLSConfig *tls.Config}func ( RedisFailoverClientOpt) () interface{} {return redis.NewFailoverClient(&redis.FailoverOptions{MasterName: .MasterName,SentinelAddrs: .SentinelAddrs,SentinelPassword: .SentinelPassword,Username: .Username,Password: .Password,DB: .DB,DialTimeout: .DialTimeout,ReadTimeout: .ReadTimeout,WriteTimeout: .WriteTimeout,PoolSize: .PoolSize,TLSConfig: .TLSConfig,})}// RedisClusterClientOpt is used to creates a redis client that connects to// redis cluster.type RedisClusterClientOpt struct {// A seed list of host:port addresses of cluster nodes.Addrs []string// The maximum number of retries before giving up.// Command is retried on network errors and MOVED/ASK redirects.// Default is 8 retries.MaxRedirects int// Username to authenticate the current connection when Redis ACLs are used.// See: https://redis.io/commands/auth.Username string// Password to authenticate the current connection.// See: https://redis.io/commands/auth.Password string// Dial timeout for establishing new connections.// Default is 5 seconds.DialTimeout time.Duration// Timeout for socket reads.// If timeout is reached, read commands will fail with a timeout error// instead of blocking.//// Use value -1 for no timeout and 0 for default.// Default is 3 seconds.ReadTimeout time.Duration// Timeout for socket writes.// If timeout is reached, write commands will fail with a timeout error// instead of blocking.//// Use value -1 for no timeout and 0 for default.// Default is ReadTimeout.WriteTimeout time.Duration// TLS Config used to connect to a server.// TLS will be negotiated only if this field is set.TLSConfig *tls.Config}func ( RedisClusterClientOpt) () interface{} {return redis.NewClusterClient(&redis.ClusterOptions{Addrs: .Addrs,MaxRedirects: .MaxRedirects,Username: .Username,Password: .Password,DialTimeout: .DialTimeout,ReadTimeout: .ReadTimeout,WriteTimeout: .WriteTimeout,TLSConfig: .TLSConfig,})}// ParseRedisURI parses redis uri string and returns RedisConnOpt if uri is valid.// It returns a non-nil error if uri cannot be parsed.//// Three URI schemes are supported, which are redis:, rediss:, redis-socket:, and redis-sentinel:.// Supported formats are:// redis://[:password@]host[:port][/dbnumber]// rediss://[:password@]host[:port][/dbnumber]// redis-socket://[:password@]path[?db=dbnumber]// redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName]func ( string) (RedisConnOpt, error) {, := url.Parse()if != nil {return nil, fmt.Errorf("asynq: could not parse redis uri: %v", )}switch .Scheme {case "redis", "rediss":return parseRedisURI()case "redis-socket":return parseRedisSocketURI()case "redis-sentinel":return parseRedisSentinelURI()default:return nil, fmt.Errorf("asynq: unsupported uri scheme: %q", .Scheme)}}func parseRedisURI( *url.URL) (RedisConnOpt, error) {var intvar errorvar RedisClientOptif len(.Path) > 0 {:= strings.Split(strings.Trim(.Path, "/"), "/"), = strconv.Atoi([0])if != nil {return nil, fmt.Errorf("asynq: could not parse redis uri: database number should be the first segment of the path")}}var stringif , := .User.Password(); {=}if .Scheme == "rediss" {, , := net.SplitHostPort(.Host)if != nil {= .Host}.TLSConfig = &tls.Config{ServerName: }}.Addr = .Host.Password =.DB =return , nil}func parseRedisSocketURI( *url.URL) (RedisConnOpt, error) {const = "asynq: could not parse redis socket uri"if len(.Path) == 0 {return nil, fmt.Errorf("%s: path does not exist", )}:= .Query()var intvar errorif := .Get("db"); != "" {, = strconv.Atoi()if != nil {return nil, fmt.Errorf("%s: query param `db` should be a number", )}}var stringif , := .User.Password(); {=}return RedisClientOpt{Network: "unix", Addr: .Path, DB: , Password: }, nil}func parseRedisSentinelURI( *url.URL) (RedisConnOpt, error) {:= strings.Split(.Host, ","):= .Query().Get("master")var stringif , := .User.Password(); {=}return RedisFailoverClientOpt{MasterName: , SentinelAddrs: , SentinelPassword: }, nil}// ResultWriter is a client interface to write result data for a task.// It writes the data to the redis instance the server is connected to.type ResultWriter struct {id string // task ID this writer is responsible forqname string // queue name the task belongs tobroker base.Brokerctx context.Context // context associated with the task}// Write writes the given data as a result of the task the ResultWriter is associated with.func ( *ResultWriter) ( []byte) ( int, error) {select {case <-.ctx.Done():return 0, fmt.Errorf("failed to result task result: %v", .ctx.Err())default:}return .broker.WriteResult(.qname, .id, )}// TaskID returns the ID of the task the ResultWriter is associated with.func ( *ResultWriter) () string {return .id}
![]() |
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64) Golds is a Go 101 project developed by Tapir Liu. PR and bug reports are welcome and can be submitted to the issue list. Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds. |