// Copyright 2020 Kentaro Hibino. All rights reserved.// Use of this source code is governed by a MIT license// that can be found in the LICENSE file.
// Package rdb encapsulates the interactions with redis.
package rdbimport ()const statsTTL = 90 * 24 * time.Hour// 90 days// LeaseDuration is the duration used to initially create a lease and to extend it thereafter.constLeaseDuration = 30 * time.Second// RDB is a client interface to query and mutate task queues.typeRDBstruct { client redis.UniversalClient clock timeutil.Clock}// NewRDB returns a new instance of RDB.func ( redis.UniversalClient) *RDB {return &RDB{client: ,clock: timeutil.NewRealClock(), }}// Close closes the connection with redis server.func ( *RDB) () error {return .client.Close()}// Client returns the reference to underlying redis client.func ( *RDB) () redis.UniversalClient {return .client}// SetClock sets the clock used by RDB to the given clock.//// Use this function to set the clock to SimulatedClock in tests.func ( *RDB) ( timeutil.Clock) { .clock = }// Ping checks the connection with redis server.func ( *RDB) () error {return .client.Ping(context.Background()).Err()}func ( *RDB) ( context.Context, errors.Op, *redis.Script, []string, ...interface{}) error {if := .Run(, .client, , ...).Err(); != nil {returnerrors.E(, errors.Internal, fmt.Sprintf("redis eval error: %v", )) }returnnil}// Runs the given script with keys and args and returns the script's return value as int64.func ( *RDB) ( context.Context, errors.Op, *redis.Script, []string, ...interface{}) (int64, error) { , := .Run(, .client, , ...).Result()if != nil {return0, errors.E(, errors.Unknown, fmt.Sprintf("redis eval error: %v", )) } , := .(int64)if ! {return0, errors.E(, errors.Internal, fmt.Sprintf("unexpected return value from Lua script: %v", )) }return , nil}// enqueueCmd enqueues a given task message.//// Input:// KEYS[1] -> asynq:{<qname>}:t:<task_id>// KEYS[2] -> asynq:{<qname>}:pending// --// ARGV[1] -> task message data// ARGV[2] -> task ID// ARGV[3] -> current unix time in nsec//// Output:// Returns 1 if successfully enqueued// Returns 0 if task ID already existsvar enqueueCmd = redis.NewScript(`if redis.call("EXISTS", KEYS[1]) == 1 then return 0endredis.call("HSET", KEYS[1], "msg", ARGV[1], "state", "pending", "pending_since", ARGV[3])redis.call("LPUSH", KEYS[2], ARGV[2])return 1`)// Enqueue adds the given task to the pending list of the queue.func ( *RDB) ( context.Context, *base.TaskMessage) error {varerrors.Op = "rdb.Enqueue" , := base.EncodeMessage()if != nil {returnerrors.E(, errors.Unknown, fmt.Sprintf("cannot encode message: %v", )) }if := .client.SAdd(, base.AllQueues, .Queue).Err(); != nil {returnerrors.E(, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: }) } := []string{base.TaskKey(.Queue, .ID),base.PendingKey(.Queue), } := []interface{}{ , .ID, .clock.Now().UnixNano(), } , := .runScriptWithErrorCode(, , enqueueCmd, , ...)if != nil {return }if == 0 {returnerrors.E(, errors.AlreadyExists, errors.ErrTaskIdConflict) }returnnil}// enqueueUniqueCmd enqueues the task message if the task is unique.//// KEYS[1] -> unique key// KEYS[2] -> asynq:{<qname>}:t:<taskid>// KEYS[3] -> asynq:{<qname>}:pending// --// ARGV[1] -> task ID// ARGV[2] -> uniqueness lock TTL// ARGV[3] -> task message data// ARGV[4] -> current unix time in nsec//// Output:// Returns 1 if successfully enqueued// Returns 0 if task ID conflicts with another task// Returns -1 if task unique key already existsvar enqueueUniqueCmd = redis.NewScript(`local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2])if not ok then return -1endif redis.call("EXISTS", KEYS[2]) == 1 then return 0endredis.call("HSET", KEYS[2], "msg", ARGV[3], "state", "pending", "pending_since", ARGV[4], "unique_key", KEYS[1])redis.call("LPUSH", KEYS[3], ARGV[1])return 1`)// EnqueueUnique inserts the given task if the task's uniqueness lock can be acquired.// It returns ErrDuplicateTask if the lock cannot be acquired.func ( *RDB) ( context.Context, *base.TaskMessage, time.Duration) error {varerrors.Op = "rdb.EnqueueUnique" , := base.EncodeMessage()if != nil {returnerrors.E(, errors.Internal, "cannot encode task message: %v", ) }if := .client.SAdd(, base.AllQueues, .Queue).Err(); != nil {returnerrors.E(, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: }) } := []string{ .UniqueKey,base.TaskKey(.Queue, .ID),base.PendingKey(.Queue), } := []interface{}{ .ID,int(.Seconds()), , .clock.Now().UnixNano(), } , := .runScriptWithErrorCode(, , enqueueUniqueCmd, , ...)if != nil {return }if == -1 {returnerrors.E(, errors.AlreadyExists, errors.ErrDuplicateTask) }if == 0 {returnerrors.E(, errors.AlreadyExists, errors.ErrTaskIdConflict) }returnnil}// Input:// KEYS[1] -> asynq:{<qname>}:pending// KEYS[2] -> asynq:{<qname>}:paused// KEYS[3] -> asynq:{<qname>}:active// KEYS[4] -> asynq:{<qname>}:lease// --// ARGV[1] -> initial lease expiration Unix time// ARGV[2] -> task key prefix//// Output:// Returns nil if no processable task is found in the given queue.// Returns an encoded TaskMessage.//// Note: dequeueCmd checks whether a queue is paused first, before// calling RPOPLPUSH to pop a task from the queue.var dequeueCmd = redis.NewScript(`if redis.call("EXISTS", KEYS[2]) == 0 then local id = redis.call("RPOPLPUSH", KEYS[1], KEYS[3]) if id then local key = ARGV[2] .. id redis.call("HSET", key, "state", "active") redis.call("HDEL", key, "pending_since") redis.call("ZADD", KEYS[4], ARGV[1], id) return redis.call("HGET", key, "msg") endendreturn nil`)// Dequeue queries given queues in order and pops a task message// off a queue if one exists and returns the message and its lease expiration time.// Dequeue skips a queue if the queue is paused.// If all queues are empty, ErrNoProcessableTask error is returned.func ( *RDB) ( ...string) ( *base.TaskMessage, time.Time, error) {varerrors.Op = "rdb.Dequeue"for , := range { := []string{base.PendingKey(),base.PausedKey(),base.ActiveKey(),base.LeaseKey(), } = .clock.Now().Add(LeaseDuration) := []interface{}{ .Unix(),base.TaskKeyPrefix(), } , := dequeueCmd.Run(context.Background(), .client, , ...).Result()if == redis.Nil {continue } elseif != nil {returnnil, time.Time{}, errors.E(, errors.Unknown, fmt.Sprintf("redis eval error: %v", )) } , := cast.ToStringE()if != nil {returnnil, time.Time{}, errors.E(, errors.Internal, fmt.Sprintf("cast error: unexpected return value from Lua script: %v", )) }if , = base.DecodeMessage([]byte()); != nil {returnnil, time.Time{}, errors.E(, errors.Internal, fmt.Sprintf("cannot decode message: %v", )) }return , , nil }returnnil, time.Time{}, errors.E(, errors.NotFound, errors.ErrNoProcessableTask)}// KEYS[1] -> asynq:{<qname>}:active// KEYS[2] -> asynq:{<qname>}:lease// KEYS[3] -> asynq:{<qname>}:t:<task_id>// KEYS[4] -> asynq:{<qname>}:processed:<yyyy-mm-dd>// KEYS[5] -> asynq:{<qname>}:processed// -------// ARGV[1] -> task ID// ARGV[2] -> stats expiration timestamp// ARGV[3] -> max int64 valuevar doneCmd = redis.NewScript(`if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then return redis.error_reply("NOT FOUND")endif redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then return redis.error_reply("NOT FOUND")endif redis.call("DEL", KEYS[3]) == 0 then return redis.error_reply("NOT FOUND")endlocal n = redis.call("INCR", KEYS[4])if tonumber(n) == 1 then redis.call("EXPIREAT", KEYS[4], ARGV[2])endlocal total = redis.call("GET", KEYS[5])if tonumber(total) == tonumber(ARGV[3]) then redis.call("SET", KEYS[5], 1)else redis.call("INCR", KEYS[5])endreturn redis.status_reply("OK")`)// KEYS[1] -> asynq:{<qname>}:active// KEYS[2] -> asynq:{<qname>}:lease// KEYS[3] -> asynq:{<qname>}:t:<task_id>// KEYS[4] -> asynq:{<qname>}:processed:<yyyy-mm-dd>// KEYS[5] -> asynq:{<qname>}:processed// KEYS[6] -> unique key// -------// ARGV[1] -> task ID// ARGV[2] -> stats expiration timestamp// ARGV[3] -> max int64 valuevar doneUniqueCmd = redis.NewScript(`if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then return redis.error_reply("NOT FOUND")endif redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then return redis.error_reply("NOT FOUND")endif redis.call("DEL", KEYS[3]) == 0 then return redis.error_reply("NOT FOUND")endlocal n = redis.call("INCR", KEYS[4])if tonumber(n) == 1 then redis.call("EXPIREAT", KEYS[4], ARGV[2])endlocal total = redis.call("GET", KEYS[5])if tonumber(total) == tonumber(ARGV[3]) then redis.call("SET", KEYS[5], 1)else redis.call("INCR", KEYS[5])endif redis.call("GET", KEYS[6]) == ARGV[1] then redis.call("DEL", KEYS[6])endreturn redis.status_reply("OK")`)// Done removes the task from active queue and deletes the task.// It removes a uniqueness lock acquired by the task, if any.func ( *RDB) ( context.Context, *base.TaskMessage) error {varerrors.Op = "rdb.Done" := .clock.Now() := .Add(statsTTL) := []string{base.ActiveKey(.Queue),base.LeaseKey(.Queue),base.TaskKey(.Queue, .ID),base.ProcessedKey(.Queue, ),base.ProcessedTotalKey(.Queue), } := []interface{}{ .ID, .Unix(),int64(math.MaxInt64), }// Note: We cannot pass empty unique key when running this script in redis-cluster.iflen(.UniqueKey) > 0 { = append(, .UniqueKey)return .runScript(, , doneUniqueCmd, , ...) }return .runScript(, , doneCmd, , ...)}// KEYS[1] -> asynq:{<qname>}:active// KEYS[2] -> asynq:{<qname>}:lease// KEYS[3] -> asynq:{<qname>}:completed// KEYS[4] -> asynq:{<qname>}:t:<task_id>// KEYS[5] -> asynq:{<qname>}:processed:<yyyy-mm-dd>// KEYS[6] -> asynq:{<qname>}:processed//// ARGV[1] -> task ID// ARGV[2] -> stats expiration timestamp// ARGV[3] -> task expiration time in unix time// ARGV[4] -> task message data// ARGV[5] -> max int64 valuevar markAsCompleteCmd = redis.NewScript(`if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then return redis.error_reply("NOT FOUND")endif redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then return redis.error_reply("NOT FOUND")endif redis.call("ZADD", KEYS[3], ARGV[3], ARGV[1]) ~= 1 then return redis.error_reply("INTERNAL")endredis.call("HSET", KEYS[4], "msg", ARGV[4], "state", "completed")local n = redis.call("INCR", KEYS[5])if tonumber(n) == 1 then redis.call("EXPIREAT", KEYS[5], ARGV[2])endlocal total = redis.call("GET", KEYS[6])if tonumber(total) == tonumber(ARGV[5]) then redis.call("SET", KEYS[6], 1)else redis.call("INCR", KEYS[6])endreturn redis.status_reply("OK")`)// KEYS[1] -> asynq:{<qname>}:active// KEYS[2] -> asynq:{<qname>}:lease// KEYS[3] -> asynq:{<qname>}:completed// KEYS[4] -> asynq:{<qname>}:t:<task_id>// KEYS[5] -> asynq:{<qname>}:processed:<yyyy-mm-dd>// KEYS[6] -> asynq:{<qname>}:processed// KEYS[7] -> asynq:{<qname>}:unique:{<checksum>}//// ARGV[1] -> task ID// ARGV[2] -> stats expiration timestamp// ARGV[3] -> task expiration time in unix time// ARGV[4] -> task message data// ARGV[5] -> max int64 valuevar markAsCompleteUniqueCmd = redis.NewScript(`if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then return redis.error_reply("NOT FOUND")endif redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then return redis.error_reply("NOT FOUND")endif redis.call("ZADD", KEYS[3], ARGV[3], ARGV[1]) ~= 1 then return redis.error_reply("INTERNAL")endredis.call("HSET", KEYS[4], "msg", ARGV[4], "state", "completed")local n = redis.call("INCR", KEYS[5])if tonumber(n) == 1 then redis.call("EXPIREAT", KEYS[5], ARGV[2])endlocal total = redis.call("GET", KEYS[6])if tonumber(total) == tonumber(ARGV[5]) then redis.call("SET", KEYS[6], 1)else redis.call("INCR", KEYS[6])endif redis.call("GET", KEYS[7]) == ARGV[1] then redis.call("DEL", KEYS[7])endreturn redis.status_reply("OK")`)// MarkAsComplete removes the task from active queue to mark the task as completed.// It removes a uniqueness lock acquired by the task, if any.func ( *RDB) ( context.Context, *base.TaskMessage) error {varerrors.Op = "rdb.MarkAsComplete" := .clock.Now() := .Add(statsTTL) .CompletedAt = .Unix() , := base.EncodeMessage()if != nil {returnerrors.E(, errors.Unknown, fmt.Sprintf("cannot encode message: %v", )) } := []string{base.ActiveKey(.Queue),base.LeaseKey(.Queue),base.CompletedKey(.Queue),base.TaskKey(.Queue, .ID),base.ProcessedKey(.Queue, ),base.ProcessedTotalKey(.Queue), } := []interface{}{ .ID, .Unix(), .Unix() + .Retention, ,int64(math.MaxInt64), }// Note: We cannot pass empty unique key when running this script in redis-cluster.iflen(.UniqueKey) > 0 { = append(, .UniqueKey)return .runScript(, , markAsCompleteUniqueCmd, , ...) }return .runScript(, , markAsCompleteCmd, , ...)}// KEYS[1] -> asynq:{<qname>}:active// KEYS[2] -> asynq:{<qname>}:lease// KEYS[3] -> asynq:{<qname>}:pending// KEYS[4] -> asynq:{<qname>}:t:<task_id>// ARGV[1] -> task ID// Note: Use RPUSH to push to the head of the queue.var requeueCmd = redis.NewScript(`if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then return redis.error_reply("NOT FOUND")endif redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then return redis.error_reply("NOT FOUND")endredis.call("RPUSH", KEYS[3], ARGV[1])redis.call("HSET", KEYS[4], "state", "pending")return redis.status_reply("OK")`)// Requeue moves the task from active queue to the specified queue.func ( *RDB) ( context.Context, *base.TaskMessage) error {varerrors.Op = "rdb.Requeue" := []string{base.ActiveKey(.Queue),base.LeaseKey(.Queue),base.PendingKey(.Queue),base.TaskKey(.Queue, .ID), }return .runScript(, , requeueCmd, , .ID)}// KEYS[1] -> asynq:{<qname>}:t:<task_id>// KEYS[2] -> asynq:{<qname>}:g:<group_key>// KEYS[3] -> asynq:{<qname>}:groups// -------// ARGV[1] -> task message data// ARGV[2] -> task ID// ARGV[3] -> current time in Unix time// ARGV[4] -> group key//// Output:// Returns 1 if successfully added// Returns 0 if task ID already existsvar addToGroupCmd = redis.NewScript(`if redis.call("EXISTS", KEYS[1]) == 1 then return 0endredis.call("HSET", KEYS[1], "msg", ARGV[1], "state", "aggregating", "group", ARGV[4])redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2])redis.call("SADD", KEYS[3], ARGV[4])return 1`)func ( *RDB) ( context.Context, *base.TaskMessage, string) error {varerrors.Op = "rdb.AddToGroup" , := base.EncodeMessage()if != nil {returnerrors.E(, errors.Unknown, fmt.Sprintf("cannot encode message: %v", )) }if := .client.SAdd(, base.AllQueues, .Queue).Err(); != nil {returnerrors.E(, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: }) } := []string{base.TaskKey(.Queue, .ID),base.GroupKey(.Queue, ),base.AllGroups(.Queue), } := []interface{}{ , .ID, .clock.Now().Unix(), , } , := .runScriptWithErrorCode(, , addToGroupCmd, , ...)if != nil {return }if == 0 {returnerrors.E(, errors.AlreadyExists, errors.ErrTaskIdConflict) }returnnil}// KEYS[1] -> asynq:{<qname>}:t:<task_id>// KEYS[2] -> asynq:{<qname>}:g:<group_key>// KEYS[3] -> asynq:{<qname>}:groups// KEYS[4] -> unique key// -------// ARGV[1] -> task message data// ARGV[2] -> task ID// ARGV[3] -> current time in Unix time// ARGV[4] -> group key// ARGV[5] -> uniqueness lock TTL//// Output:// Returns 1 if successfully added// Returns 0 if task ID already exists// Returns -1 if task unique key already existsvar addToGroupUniqueCmd = redis.NewScript(`local ok = redis.call("SET", KEYS[4], ARGV[2], "NX", "EX", ARGV[5])if not ok then return -1endif redis.call("EXISTS", KEYS[1]) == 1 then return 0endredis.call("HSET", KEYS[1], "msg", ARGV[1], "state", "aggregating", "group", ARGV[4])redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2])redis.call("SADD", KEYS[3], ARGV[4])return 1`)func ( *RDB) ( context.Context, *base.TaskMessage, string, time.Duration) error {varerrors.Op = "rdb.AddToGroupUnique" , := base.EncodeMessage()if != nil {returnerrors.E(, errors.Unknown, fmt.Sprintf("cannot encode message: %v", )) }if := .client.SAdd(, base.AllQueues, .Queue).Err(); != nil {returnerrors.E(, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: }) } := []string{base.TaskKey(.Queue, .ID),base.GroupKey(.Queue, ),base.AllGroups(.Queue),base.UniqueKey(.Queue, .Type, .Payload), } := []interface{}{ , .ID, .clock.Now().Unix(), ,int(.Seconds()), } , := .runScriptWithErrorCode(, , addToGroupUniqueCmd, , ...)if != nil {return }if == -1 {returnerrors.E(, errors.AlreadyExists, errors.ErrDuplicateTask) }if == 0 {returnerrors.E(, errors.AlreadyExists, errors.ErrTaskIdConflict) }returnnil}// KEYS[1] -> asynq:{<qname>}:t:<task_id>// KEYS[2] -> asynq:{<qname>}:scheduled// -------// ARGV[1] -> task message data// ARGV[2] -> process_at time in Unix time// ARGV[3] -> task ID//// Output:// Returns 1 if successfully enqueued// Returns 0 if task ID already existsvar scheduleCmd = redis.NewScript(`if redis.call("EXISTS", KEYS[1]) == 1 then return 0endredis.call("HSET", KEYS[1], "msg", ARGV[1], "state", "scheduled")redis.call("ZADD", KEYS[2], ARGV[2], ARGV[3])return 1`)// Schedule adds the task to the scheduled set to be processed in the future.func ( *RDB) ( context.Context, *base.TaskMessage, time.Time) error {varerrors.Op = "rdb.Schedule" , := base.EncodeMessage()if != nil {returnerrors.E(, errors.Unknown, fmt.Sprintf("cannot encode message: %v", )) }if := .client.SAdd(, base.AllQueues, .Queue).Err(); != nil {returnerrors.E(, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: }) } := []string{base.TaskKey(.Queue, .ID),base.ScheduledKey(.Queue), } := []interface{}{ , .Unix(), .ID, } , := .runScriptWithErrorCode(, , scheduleCmd, , ...)if != nil {return }if == 0 {returnerrors.E(, errors.AlreadyExists, errors.ErrTaskIdConflict) }returnnil}// KEYS[1] -> unique key// KEYS[2] -> asynq:{<qname>}:t:<task_id>// KEYS[3] -> asynq:{<qname>}:scheduled// -------// ARGV[1] -> task ID// ARGV[2] -> uniqueness lock TTL// ARGV[3] -> score (process_at timestamp)// ARGV[4] -> task message//// Output:// Returns 1 if successfully scheduled// Returns 0 if task ID already exists// Returns -1 if task unique key already existsvar scheduleUniqueCmd = redis.NewScript(`local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2])if not ok then return -1endif redis.call("EXISTS", KEYS[2]) == 1 then return 0endredis.call("HSET", KEYS[2], "msg", ARGV[4], "state", "scheduled", "unique_key", KEYS[1])redis.call("ZADD", KEYS[3], ARGV[3], ARGV[1])return 1`)// ScheduleUnique adds the task to the backlog queue to be processed in the future if the uniqueness lock can be acquired.// It returns ErrDuplicateTask if the lock cannot be acquired.func ( *RDB) ( context.Context, *base.TaskMessage, time.Time, time.Duration) error {varerrors.Op = "rdb.ScheduleUnique" , := base.EncodeMessage()if != nil {returnerrors.E(, errors.Internal, fmt.Sprintf("cannot encode task message: %v", )) }if := .client.SAdd(, base.AllQueues, .Queue).Err(); != nil {returnerrors.E(, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: }) } := []string{ .UniqueKey,base.TaskKey(.Queue, .ID),base.ScheduledKey(.Queue), } := []interface{}{ .ID,int(.Seconds()), .Unix(), , } , := .runScriptWithErrorCode(, , scheduleUniqueCmd, , ...)if != nil {return }if == -1 {returnerrors.E(, errors.AlreadyExists, errors.ErrDuplicateTask) }if == 0 {returnerrors.E(, errors.AlreadyExists, errors.ErrTaskIdConflict) }returnnil}// KEYS[1] -> asynq:{<qname>}:t:<task_id>// KEYS[2] -> asynq:{<qname>}:active// KEYS[3] -> asynq:{<qname>}:lease// KEYS[4] -> asynq:{<qname>}:retry// KEYS[5] -> asynq:{<qname>}:processed:<yyyy-mm-dd>// KEYS[6] -> asynq:{<qname>}:failed:<yyyy-mm-dd>// KEYS[7] -> asynq:{<qname>}:processed// KEYS[8] -> asynq:{<qname>}:failed// -------// ARGV[1] -> task ID// ARGV[2] -> updated base.TaskMessage value// ARGV[3] -> retry_at UNIX timestamp// ARGV[4] -> stats expiration timestamp// ARGV[5] -> is_failure (bool)// ARGV[6] -> max int64 valuevar retryCmd = redis.NewScript(`if redis.call("LREM", KEYS[2], 0, ARGV[1]) == 0 then return redis.error_reply("NOT FOUND")endif redis.call("ZREM", KEYS[3], ARGV[1]) == 0 then return redis.error_reply("NOT FOUND")endredis.call("ZADD", KEYS[4], ARGV[3], ARGV[1])redis.call("HSET", KEYS[1], "msg", ARGV[2], "state", "retry")if tonumber(ARGV[5]) == 1 then local n = redis.call("INCR", KEYS[5]) if tonumber(n) == 1 then redis.call("EXPIREAT", KEYS[5], ARGV[4]) end local m = redis.call("INCR", KEYS[6]) if tonumber(m) == 1 then redis.call("EXPIREAT", KEYS[6], ARGV[4]) end local total = redis.call("GET", KEYS[7]) if tonumber(total) == tonumber(ARGV[6]) then redis.call("SET", KEYS[7], 1) redis.call("SET", KEYS[8], 1) else redis.call("INCR", KEYS[7]) redis.call("INCR", KEYS[8]) endendreturn redis.status_reply("OK")`)// Retry moves the task from active to retry queue.// It also annotates the message with the given error message and// if isFailure is true increments the retried counter.func ( *RDB) ( context.Context, *base.TaskMessage, time.Time, string, bool) error {varerrors.Op = "rdb.Retry" := .clock.Now() := *if { .Retried++ } .ErrorMsg = .LastFailedAt = .Unix() , := base.EncodeMessage(&)if != nil {returnerrors.E(, errors.Internal, fmt.Sprintf("cannot encode message: %v", )) } := .Add(statsTTL) := []string{base.TaskKey(.Queue, .ID),base.ActiveKey(.Queue),base.LeaseKey(.Queue),base.RetryKey(.Queue),base.ProcessedKey(.Queue, ),base.FailedKey(.Queue, ),base.ProcessedTotalKey(.Queue),base.FailedTotalKey(.Queue), } := []interface{}{ .ID, , .Unix(), .Unix(), ,int64(math.MaxInt64), }return .runScript(, , retryCmd, , ...)}const ( maxArchiveSize = 10000// maximum number of tasks in archive archivedExpirationInDays = 90// number of days before an archived task gets deleted permanently)// KEYS[1] -> asynq:{<qname>}:t:<task_id>// KEYS[2] -> asynq:{<qname>}:active// KEYS[3] -> asynq:{<qname>}:lease// KEYS[4] -> asynq:{<qname>}:archived// KEYS[5] -> asynq:{<qname>}:processed:<yyyy-mm-dd>// KEYS[6] -> asynq:{<qname>}:failed:<yyyy-mm-dd>// KEYS[7] -> asynq:{<qname>}:processed// KEYS[8] -> asynq:{<qname>}:failed// -------// ARGV[1] -> task ID// ARGV[2] -> updated base.TaskMessage value// ARGV[3] -> died_at UNIX timestamp// ARGV[4] -> cutoff timestamp (e.g., 90 days ago)// ARGV[5] -> max number of tasks in archive (e.g., 100)// ARGV[6] -> stats expiration timestamp// ARGV[7] -> max int64 valuevar archiveCmd = redis.NewScript(`if redis.call("LREM", KEYS[2], 0, ARGV[1]) == 0 then return redis.error_reply("NOT FOUND")endif redis.call("ZREM", KEYS[3], ARGV[1]) == 0 then return redis.error_reply("NOT FOUND")endredis.call("ZADD", KEYS[4], ARGV[3], ARGV[1])redis.call("ZREMRANGEBYSCORE", KEYS[4], "-inf", ARGV[4])redis.call("ZREMRANGEBYRANK", KEYS[4], 0, -ARGV[5])redis.call("HSET", KEYS[1], "msg", ARGV[2], "state", "archived")local n = redis.call("INCR", KEYS[5])if tonumber(n) == 1 then redis.call("EXPIREAT", KEYS[5], ARGV[6])endlocal m = redis.call("INCR", KEYS[6])if tonumber(m) == 1 then redis.call("EXPIREAT", KEYS[6], ARGV[6])endlocal total = redis.call("GET", KEYS[7])if tonumber(total) == tonumber(ARGV[7]) then redis.call("SET", KEYS[7], 1) redis.call("SET", KEYS[8], 1)else redis.call("INCR", KEYS[7]) redis.call("INCR", KEYS[8])endreturn redis.status_reply("OK")`)// Archive sends the given task to archive, attaching the error message to the task.// It also trims the archive by timestamp and set size.func ( *RDB) ( context.Context, *base.TaskMessage, string) error {varerrors.Op = "rdb.Archive" := .clock.Now() := * .ErrorMsg = .LastFailedAt = .Unix() , := base.EncodeMessage(&)if != nil {returnerrors.E(, errors.Internal, fmt.Sprintf("cannot encode message: %v", )) } := .AddDate(0, 0, -archivedExpirationInDays) := .Add(statsTTL) := []string{base.TaskKey(.Queue, .ID),base.ActiveKey(.Queue),base.LeaseKey(.Queue),base.ArchivedKey(.Queue),base.ProcessedKey(.Queue, ),base.FailedKey(.Queue, ),base.ProcessedTotalKey(.Queue),base.FailedTotalKey(.Queue), } := []interface{}{ .ID, , .Unix(), .Unix(),maxArchiveSize, .Unix(),int64(math.MaxInt64), }return .runScript(, , archiveCmd, , ...)}// ForwardIfReady checks scheduled and retry sets of the given queues// and move any tasks that are ready to be processed to the pending set.func ( *RDB) ( ...string) error {varerrors.Op = "rdb.ForwardIfReady"for , := range {if := .forwardAll(); != nil {returnerrors.E(, errors.CanonicalCode(), ) } }returnnil}// KEYS[1] -> source queue (e.g. asynq:{<qname>:scheduled or asynq:{<qname>}:retry})// KEYS[2] -> asynq:{<qname>}:pending// ARGV[1] -> current unix time in seconds// ARGV[2] -> task key prefix// ARGV[3] -> current unix time in nsec// ARGV[4] -> group key prefix// Note: Script moves tasks up to 100 at a time to keep the runtime of script short.var forwardCmd = redis.NewScript(`local ids = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 100)for _, id in ipairs(ids) do local taskKey = ARGV[2] .. id local group = redis.call("HGET", taskKey, "group") if group and group ~= '' then redis.call("ZADD", ARGV[4] .. group, ARGV[1], id) redis.call("ZREM", KEYS[1], id) redis.call("HSET", taskKey, "state", "aggregating") else redis.call("LPUSH", KEYS[2], id) redis.call("ZREM", KEYS[1], id) redis.call("HSET", taskKey, "state", "pending", "pending_since", ARGV[3]) endendreturn table.getn(ids)`)// forward moves tasks with a score less than the current unix time from the delayed (i.e. scheduled | retry) zset// to the pending list or group set.// It returns the number of tasks moved.func ( *RDB) (, , , string) (int, error) { := .clock.Now() := []string{, } := []interface{}{ .Unix(), , .UnixNano(), , } , := forwardCmd.Run(context.Background(), .client, , ...).Result()if != nil {return0, errors.E(errors.Internal, fmt.Sprintf("redis eval error: %v", )) } , := cast.ToIntE()if != nil {return0, errors.E(errors.Internal, fmt.Sprintf("cast error: Lua script returned unexpected value: %v", )) }return , nil}// forwardAll checks for tasks in scheduled/retry state that are ready to be run, and updates// their state to "pending" or "aggregating".func ( *RDB) ( string) ( error) { := []string{base.ScheduledKey(), base.RetryKey()} := base.PendingKey() := base.TaskKeyPrefix() := base.GroupKeyPrefix()for , := range { := 1for != 0 { , = .forward(, , , )if != nil {return } } }returnnil}// ListGroups returns a list of all known groups in the given queue.func ( *RDB) ( string) ([]string, error) {varerrors.Op = "RDB.ListGroups" , := .client.SMembers(context.Background(), base.AllGroups()).Result()if != nil {returnnil, errors.E(, errors.Unknown, &errors.RedisCommandError{Command: "smembers", Err: }) }return , nil}// aggregationCheckCmd checks the given group for whether to create an aggregation set.// An aggregation set is created if one of the aggregation criteria is met:// 1) group has reached or exceeded its max size// 2) group's oldest task has reached or exceeded its max delay// 3) group's latest task has reached or exceeded its grace period// if aggreation criteria is met, the command moves those tasks from the group// and put them in an aggregation set. Additionally, if the creation of aggregation set// empties the group, it will clear the group name from the all groups set.//// KEYS[1] -> asynq:{<qname>}:g:<gname>// KEYS[2] -> asynq:{<qname>}:g:<gname>:<aggregation_set_id>// KEYS[3] -> asynq:{<qname>}:aggregation_sets// KEYS[4] -> asynq:{<qname>}:groups// -------// ARGV[1] -> max group size// ARGV[2] -> max group delay in unix time// ARGV[3] -> start time of the grace period// ARGV[4] -> aggregation set expire time// ARGV[5] -> current time in unix time// ARGV[6] -> group name//// Output:// Returns 0 if no aggregation set was created// Returns 1 if an aggregation set was created//// Time Complexity:// O(log(N) + M) with N being the number tasks in the group zset// and M being the max size.var aggregationCheckCmd = redis.NewScript(`local size = redis.call("ZCARD", KEYS[1])if size == 0 then return 0endlocal maxSize = tonumber(ARGV[1])if maxSize ~= 0 and size >= maxSize then local res = redis.call("ZRANGE", KEYS[1], 0, maxSize-1, "WITHSCORES") for i=1, table.getn(res)-1, 2 do redis.call("ZADD", KEYS[2], tonumber(res[i+1]), res[i]) end redis.call("ZREMRANGEBYRANK", KEYS[1], 0, maxSize-1) redis.call("ZADD", KEYS[3], ARGV[4], KEYS[2]) if size == maxSize then redis.call("SREM", KEYS[4], ARGV[6]) end return 1endlocal maxDelay = tonumber(ARGV[2])local currentTime = tonumber(ARGV[5])if maxDelay ~= 0 then local oldestEntry = redis.call("ZRANGE", KEYS[1], 0, 0, "WITHSCORES") local oldestEntryScore = tonumber(oldestEntry[2]) local maxDelayTime = currentTime - maxDelay if oldestEntryScore <= maxDelayTime then local res = redis.call("ZRANGE", KEYS[1], 0, maxSize-1, "WITHSCORES") for i=1, table.getn(res)-1, 2 do redis.call("ZADD", KEYS[2], tonumber(res[i+1]), res[i]) end redis.call("ZREMRANGEBYRANK", KEYS[1], 0, maxSize-1) redis.call("ZADD", KEYS[3], ARGV[4], KEYS[2]) if size <= maxSize or maxSize == 0 then redis.call("SREM", KEYS[4], ARGV[6]) end return 1 endendlocal latestEntry = redis.call("ZREVRANGE", KEYS[1], 0, 0, "WITHSCORES")local latestEntryScore = tonumber(latestEntry[2])local gracePeriodStartTime = currentTime - tonumber(ARGV[3])if latestEntryScore <= gracePeriodStartTime then local res = redis.call("ZRANGE", KEYS[1], 0, maxSize-1, "WITHSCORES") for i=1, table.getn(res)-1, 2 do redis.call("ZADD", KEYS[2], tonumber(res[i+1]), res[i]) end redis.call("ZREMRANGEBYRANK", KEYS[1], 0, maxSize-1) redis.call("ZADD", KEYS[3], ARGV[4], KEYS[2]) if size <= maxSize or maxSize == 0 then redis.call("SREM", KEYS[4], ARGV[6]) end return 1endreturn 0`)// Task aggregation should finish within this timeout.// Otherwise an aggregation set should be reclaimed by the recoverer.const aggregationTimeout = 2 * time.Minute// AggregationCheck checks the group identified by the given queue and group name to see if the tasks in the// group are ready to be aggregated. If so, it moves the tasks to be aggregated to a aggregation set and returns// the set ID. If not, it returns an empty string for the set ID.// The time for gracePeriod and maxDelay is computed relative to the time t.//// Note: It assumes that this function is called at frequency less than or equal to the gracePeriod. In other words,// the function only checks the most recently added task against the given gracePeriod.func ( *RDB) (, string, time.Time, , time.Duration, int) (string, error) {varerrors.Op = "RDB.AggregationCheck" := uuid.NewString() := .clock.Now().Add(aggregationTimeout) := []string{base.GroupKey(, ),base.AggregationSetKey(, , ),base.AllAggregationSets(),base.AllGroups(), } := []interface{}{ ,int64(.Seconds()),int64(.Seconds()), .Unix(), .Unix(), , } , := .runScriptWithErrorCode(context.Background(), , aggregationCheckCmd, , ...)if != nil {return"", }switch {case0:return"", nilcase1:return , nildefault:return"", errors.E(, errors.Internal, fmt.Sprintf("unexpected return value from lua script: %d", )) }}// KEYS[1] -> asynq:{<qname>}:g:<gname>:<aggregation_set_id>// ------// ARGV[1] -> task key prefix//// Output:// Array of encoded task messages//// Time Complexity:// O(N) with N being the number of tasks in the aggregation set.var readAggregationSetCmd = redis.NewScript(`local msgs = {}local ids = redis.call("ZRANGE", KEYS[1], 0, -1)for _, id in ipairs(ids) do local key = ARGV[1] .. id table.insert(msgs, redis.call("HGET", key, "msg"))endreturn msgs`)// ReadAggregationSet retrieves members of an aggregation set and returns a list of tasks in the set and// the deadline for aggregating those tasks.func ( *RDB) (, , string) ([]*base.TaskMessage, time.Time, error) {varerrors.Op = "RDB.ReadAggregationSet" := context.Background() := base.AggregationSetKey(, , ) , := readAggregationSetCmd.Run(, .client, []string{}, base.TaskKeyPrefix()).Result()if != nil {returnnil, time.Time{}, errors.E(, errors.Unknown, fmt.Sprintf("redis eval error: %v", )) } , := cast.ToStringSliceE()if != nil {returnnil, time.Time{}, errors.E(, errors.Internal, fmt.Sprintf("cast error: Lua script returned unexpected value: %v", )) }var []*base.TaskMessagefor , := range { , := base.DecodeMessage([]byte())if != nil {returnnil, time.Time{}, errors.E(, errors.Internal, fmt.Sprintf("cannot decode message: %v", )) } = append(, ) } , := .client.ZScore(, base.AllAggregationSets(), ).Result()if != nil {returnnil, time.Time{}, errors.E(, errors.Unknown, &errors.RedisCommandError{Command: "zscore", Err: }) }return , time.Unix(int64(), 0), nil}// KEYS[1] -> asynq:{<qname>}:g:<gname>:<aggregation_set_id>// KEYS[2] -> asynq:{<qname>}:aggregation_sets// -------// ARGV[1] -> task key prefix//// Output:// Redis status reply//// Time Complexity:// max(O(N), O(log(M))) with N being the number of tasks in the aggregation set// and M being the number of elements in the all-aggregation-sets list.var deleteAggregationSetCmd = redis.NewScript(`local ids = redis.call("ZRANGE", KEYS[1], 0, -1)for _, id in ipairs(ids) do redis.call("DEL", ARGV[1] .. id)endredis.call("DEL", KEYS[1])redis.call("ZREM", KEYS[2], KEYS[1])return redis.status_reply("OK")`)// DeleteAggregationSet deletes the aggregation set and its members identified by the parameters.func ( *RDB) ( context.Context, , , string) error {varerrors.Op = "RDB.DeleteAggregationSet" := []string{base.AggregationSetKey(, , ),base.AllAggregationSets(), }return .runScript(, , deleteAggregationSetCmd, , base.TaskKeyPrefix())}// KEYS[1] -> asynq:{<qname>}:aggregation_sets// -------// ARGV[1] -> current time in unix timevar reclaimStateAggregationSetsCmd = redis.NewScript(`local staleSetKeys = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])for _, key in ipairs(staleSetKeys) do local idx = string.find(key, ":[^:]*$") local groupKey = string.sub(key, 1, idx-1) local res = redis.call("ZRANGE", key, 0, -1, "WITHSCORES") for i=1, table.getn(res)-1, 2 do redis.call("ZADD", groupKey, tonumber(res[i+1]), res[i]) end redis.call("DEL", key)endredis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])return redis.status_reply("OK")`)// ReclaimStateAggregationSets checks for any stale aggregation sets in the given queue, and// reclaim tasks in the stale aggregation set by putting them back in the group.func ( *RDB) ( string) error {varerrors.Op = "RDB.ReclaimStaleAggregationSets"return .runScript(context.Background(), , reclaimStateAggregationSetsCmd, []string{base.AllAggregationSets()}, .clock.Now().Unix())}// KEYS[1] -> asynq:{<qname>}:completed// ARGV[1] -> current time in unix time// ARGV[2] -> task key prefix// ARGV[3] -> batch size (i.e. maximum number of tasks to delete)//// Returns the number of tasks deleted.var deleteExpiredCompletedTasksCmd = redis.NewScript(`local ids = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, tonumber(ARGV[3]))for _, id in ipairs(ids) do redis.call("DEL", ARGV[2] .. id) redis.call("ZREM", KEYS[1], id)endreturn table.getn(ids)`)// DeleteExpiredCompletedTasks checks for any expired tasks in the given queue's completed set,// and delete all expired tasks.func ( *RDB) ( string) error {// Note: Do this operation in fix batches to prevent long running script.const = 100for { , := .deleteExpiredCompletedTasks(, )if != nil {return }if == 0 {returnnil } }}// deleteExpiredCompletedTasks runs the lua script to delete expired deleted task with the specified// batch size. It reports the number of tasks deleted.func ( *RDB) ( string, int) (int64, error) {varerrors.Op = "rdb.DeleteExpiredCompletedTasks" := []string{base.CompletedKey()} := []interface{}{ .clock.Now().Unix(),base.TaskKeyPrefix(), , } , := deleteExpiredCompletedTasksCmd.Run(context.Background(), .client, , ...).Result()if != nil {return0, errors.E(, errors.Internal, fmt.Sprintf("redis eval error: %v", )) } , := .(int64)if ! {return0, errors.E(, errors.Internal, fmt.Sprintf("unexpected return value from Lua script: %v", )) }return , nil}// KEYS[1] -> asynq:{<qname>}:lease// ARGV[1] -> cutoff in unix time// ARGV[2] -> task key prefixvar listLeaseExpiredCmd = redis.NewScript(`local res = {}local ids = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])for _, id in ipairs(ids) do local key = ARGV[2] .. id table.insert(res, redis.call("HGET", key, "msg"))endreturn res`)// ListLeaseExpired returns a list of task messages with an expired lease from the given queues.func ( *RDB) ( time.Time, ...string) ([]*base.TaskMessage, error) {varerrors.Op = "rdb.ListLeaseExpired"var []*base.TaskMessagefor , := range { , := listLeaseExpiredCmd.Run(context.Background(), .client, []string{base.LeaseKey()}, .Unix(), base.TaskKeyPrefix()).Result()if != nil {returnnil, errors.E(, errors.Internal, fmt.Sprintf("redis eval error: %v", )) } , := cast.ToStringSliceE()if != nil {returnnil, errors.E(, errors.Internal, fmt.Sprintf("cast error: Lua script returned unexpected value: %v", )) }for , := range { , := base.DecodeMessage([]byte())if != nil {returnnil, errors.E(, errors.Internal, fmt.Sprintf("cannot decode message: %v", )) } = append(, ) } }return , nil}// ExtendLease extends the lease for the given tasks by LeaseDuration (30s).// It returns a new expiration time if the operation was successful.func ( *RDB) ( string, ...string) ( time.Time, error) { := .clock.Now().Add(LeaseDuration)var []redis.Zfor , := range { = append(, redis.Z{Member: , Score: float64(.Unix())}) }// Use XX option to only update elements that already exist; Don't add new elements // TODO: Consider adding GT option to ensure we only "extend" the lease. Ceveat is that GT is supported from redis v6.2.0 or above. = .client.ZAddXX(context.Background(), base.LeaseKey(), ...).Err()if != nil {returntime.Time{}, }return , nil}// KEYS[1] -> asynq:servers:{<host:pid:sid>}// KEYS[2] -> asynq:workers:{<host:pid:sid>}// ARGV[1] -> TTL in seconds// ARGV[2] -> server info// ARGV[3:] -> alternate key-value pair of (worker id, worker data)// Note: Add key to ZSET with expiration time as score.// ref: https://github.com/antirez/redis/issues/135#issuecomment-2361996var writeServerStateCmd = redis.NewScript(`redis.call("SETEX", KEYS[1], ARGV[1], ARGV[2])redis.call("DEL", KEYS[2])for i = 3, table.getn(ARGV)-1, 2 do redis.call("HSET", KEYS[2], ARGV[i], ARGV[i+1])endredis.call("EXPIRE", KEYS[2], ARGV[1])return redis.status_reply("OK")`)// WriteServerState writes server state data to redis with expiration set to the value ttl.func ( *RDB) ( *base.ServerInfo, []*base.WorkerInfo, time.Duration) error {varerrors.Op = "rdb.WriteServerState" := context.Background() , := base.EncodeServerInfo()if != nil {returnerrors.E(, errors.Internal, fmt.Sprintf("cannot encode server info: %v", )) } := .clock.Now().Add().UTC() := []interface{}{.Seconds(), } // args to the lua scriptfor , := range { , := base.EncodeWorkerInfo()if != nil {continue// skip bad data } = append(, .ID, ) } := base.ServerInfoKey(.Host, .PID, .ServerID) := base.WorkersKey(.Host, .PID, .ServerID)if := .client.ZAdd(, base.AllServers, redis.Z{Score: float64(.Unix()), Member: }).Err(); != nil {returnerrors.E(, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: }) }if := .client.ZAdd(, base.AllWorkers, redis.Z{Score: float64(.Unix()), Member: }).Err(); != nil {returnerrors.E(, errors.Unknown, &errors.RedisCommandError{Command: "zadd", Err: }) }return .runScript(, , writeServerStateCmd, []string{, }, ...)}// KEYS[1] -> asynq:servers:{<host:pid:sid>}// KEYS[2] -> asynq:workers:{<host:pid:sid>}var clearServerStateCmd = redis.NewScript(`redis.call("DEL", KEYS[1])redis.call("DEL", KEYS[2])return redis.status_reply("OK")`)// ClearServerState deletes server state data from redis.func ( *RDB) ( string, int, string) error {varerrors.Op = "rdb.ClearServerState" := context.Background() := base.ServerInfoKey(, , ) := base.WorkersKey(, , )if := .client.ZRem(, base.AllServers, ).Err(); != nil {returnerrors.E(, errors.Internal, &errors.RedisCommandError{Command: "zrem", Err: }) }if := .client.ZRem(, base.AllWorkers, ).Err(); != nil {returnerrors.E(, errors.Internal, &errors.RedisCommandError{Command: "zrem", Err: }) }return .runScript(, , clearServerStateCmd, []string{, })}// KEYS[1] -> asynq:schedulers:{<schedulerID>}// ARGV[1] -> TTL in seconds// ARGV[2:] -> schedler entriesvar writeSchedulerEntriesCmd = redis.NewScript(`redis.call("DEL", KEYS[1])for i = 2, #ARGV do redis.call("LPUSH", KEYS[1], ARGV[i])endredis.call("EXPIRE", KEYS[1], ARGV[1])return redis.status_reply("OK")`)// WriteSchedulerEntries writes scheduler entries data to redis with expiration set to the value ttl.func ( *RDB) ( string, []*base.SchedulerEntry, time.Duration) error {varerrors.Op = "rdb.WriteSchedulerEntries" := context.Background() := []interface{}{.Seconds()}for , := range { , := base.EncodeSchedulerEntry()if != nil {continue// skip bad data } = append(, ) } := .clock.Now().Add().UTC() := base.SchedulerEntriesKey() := .client.ZAdd(, base.AllSchedulers, redis.Z{Score: float64(.Unix()), Member: }).Err()if != nil {returnerrors.E(, errors.Unknown, &errors.RedisCommandError{Command: "zadd", Err: }) }return .runScript(, , writeSchedulerEntriesCmd, []string{}, ...)}// ClearSchedulerEntries deletes scheduler entries data from redis.func ( *RDB) ( string) error {varerrors.Op = "rdb.ClearSchedulerEntries" := context.Background() := base.SchedulerEntriesKey()if := .client.ZRem(, base.AllSchedulers, ).Err(); != nil {returnerrors.E(, errors.Unknown, &errors.RedisCommandError{Command: "zrem", Err: }) }if := .client.Del(, ).Err(); != nil {returnerrors.E(, errors.Unknown, &errors.RedisCommandError{Command: "del", Err: }) }returnnil}// CancelationPubSub returns a pubsub for cancelation messages.func ( *RDB) () (*redis.PubSub, error) {varerrors.Op = "rdb.CancelationPubSub" := context.Background() := .client.Subscribe(, base.CancelChannel) , := .Receive()if != nil {returnnil, errors.E(, errors.Unknown, fmt.Sprintf("redis pubsub receive error: %v", )) }return , nil}// PublishCancelation publish cancelation message to all subscribers.// The message is the ID for the task to be canceled.func ( *RDB) ( string) error {varerrors.Op = "rdb.PublishCancelation" := context.Background()if := .client.Publish(, base.CancelChannel, ).Err(); != nil {returnerrors.E(, errors.Unknown, fmt.Sprintf("redis pubsub publish error: %v", )) }returnnil}// KEYS[1] -> asynq:scheduler_history:<entryID>// ARGV[1] -> enqueued_at timestamp// ARGV[2] -> serialized SchedulerEnqueueEvent data// ARGV[3] -> max number of events to be persistedvar recordSchedulerEnqueueEventCmd = redis.NewScript(`redis.call("ZREMRANGEBYRANK", KEYS[1], 0, -ARGV[3])redis.call("ZADD", KEYS[1], ARGV[1], ARGV[2])return redis.status_reply("OK")`)// Maximum number of enqueue events to store per entry.const maxEvents = 1000// RecordSchedulerEnqueueEvent records the time when the given task was enqueued.func ( *RDB) ( string, *base.SchedulerEnqueueEvent) error {varerrors.Op = "rdb.RecordSchedulerEnqueueEvent" := context.Background() , := base.EncodeSchedulerEnqueueEvent()if != nil {returnerrors.E(, errors.Internal, fmt.Sprintf("cannot encode scheduler enqueue event: %v", )) } := []string{base.SchedulerHistoryKey(), } := []interface{}{ .EnqueuedAt.Unix(), ,maxEvents, }return .runScript(, , recordSchedulerEnqueueEventCmd, , ...)}// ClearSchedulerHistory deletes the enqueue event history for the given scheduler entry.func ( *RDB) ( string) error {varerrors.Op = "rdb.ClearSchedulerHistory" := context.Background() := base.SchedulerHistoryKey()if := .client.Del(, ).Err(); != nil {returnerrors.E(, errors.Unknown, &errors.RedisCommandError{Command: "del", Err: }) }returnnil}// WriteResult writes the given result data for the specified task.func ( *RDB) (, string, []byte) (int, error) {varerrors.Op = "rdb.WriteResult" := context.Background() := base.TaskKey(, )if := .client.HSet(, , "result", ).Err(); != nil {return0, errors.E(, errors.Unknown, &errors.RedisCommandError{Command: "hset", Err: }) }returnlen(), nil}
The pages are generated with Goldsv0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.