// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by an MIT license
// that can be found in the LICENSE file.

// Package rdb encapsulates the interactions with redis.
package rdb

import (
	"context"
	"fmt"
	"math"
	"time"

	"github.com/google/uuid"
	"github.com/hibiken/asynq/internal/base"
	"github.com/hibiken/asynq/internal/errors"
	"github.com/hibiken/asynq/internal/timeutil"
	"github.com/redis/go-redis/v9"
	"github.com/spf13/cast"
)

const statsTTL = 90 * 24 * time.Hour // 90 days

// LeaseDuration is the duration used to initially create a lease and to extend it thereafter.
const LeaseDuration = 30 * time.Second

// RDB is a client interface to query and mutate task queues.
type RDB struct {
	client redis.UniversalClient
	clock  timeutil.Clock
}

// NewRDB returns a new instance of RDB.
func NewRDB(client redis.UniversalClient) *RDB {
	return &RDB{
		client: client,
		clock:  timeutil.NewRealClock(),
	}
}

// Close closes the connection with redis server.
func (r *RDB) Close() error {
	return r.client.Close()
}

// Client returns the reference to underlying redis client.
func (r *RDB) Client() redis.UniversalClient {
	return r.client
}

// SetClock sets the clock used by RDB to the given clock.
//
// Use this function to set the clock to SimulatedClock in tests.
func (r *RDB) SetClock(c timeutil.Clock) {
	r.clock = c
}

// Ping checks the connection with redis server.
func (r *RDB) Ping() error {
	return r.client.Ping(context.Background()).Err()
}

func (r *RDB) runScript(ctx context.Context, op errors.Op, script *redis.Script, keys []string, args ...interface{}) error {
	if err := script.Run(ctx, r.client, keys, args...).Err(); err != nil {
		return errors.E(op, errors.Internal, fmt.Sprintf("redis eval error: %v", err))
	}
	return nil
}

// Runs the given script with keys and args and returns the script's return value as int64.
func (r *RDB) runScriptWithErrorCode(ctx context.Context, op errors.Op, script *redis.Script, keys []string, args ...interface{}) (int64, error) {
	res, err := script.Run(ctx, r.client, keys, args...).Result()
	if err != nil {
		return 0, errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err))
	}
	n, ok := res.(int64)
	if !ok {
		return 0, errors.E(op, errors.Internal, fmt.Sprintf("unexpected return value from Lua script: %v", res))
	}
	return n, nil
}

// enqueueCmd enqueues a given task message.
//
// Input:
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
// KEYS[2] -> asynq:{<qname>}:pending
// --
// ARGV[1] -> task message data
// ARGV[2] -> task ID
// ARGV[3] -> current unix time in nsec
//
// Output:
// Returns 1 if successfully enqueued
// Returns 0 if task ID already exists
var enqueueCmd = redis.NewScript(`
if redis.call("EXISTS", KEYS[1]) == 1 then
	return 0
end
redis.call("HSET", KEYS[1],
           "msg", ARGV[1],
           "state", "pending",
           "pending_since", ARGV[3])
redis.call("LPUSH", KEYS[2], ARGV[2])
return 1
`)

// Enqueue adds the given task to the pending list of the queue.
func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error {
	var op errors.Op = "rdb.Enqueue"
	encoded, err := base.EncodeMessage(msg)
	if err != nil {
		return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
	}
	if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
		return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
	}
	keys := []string{
		base.TaskKey(msg.Queue, msg.ID),
		base.PendingKey(msg.Queue),
	}
	argv := []interface{}{
		encoded,
		msg.ID,
		r.clock.Now().UnixNano(),
	}
	n, err := r.runScriptWithErrorCode(ctx, op, enqueueCmd, keys, argv...)
	if err != nil {
		return err
	}
	if n == 0 {
		return errors.E(op, errors.AlreadyExists, errors.ErrTaskIdConflict)
	}
	return nil
}
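// The sketch below is illustrative only and not part of the package API: it
// shows how a caller might enqueue a task with this RDB client. The task type,
// queue name, and retry count are hypothetical values.
func enqueueSketch(r *RDB) error {
	msg := &base.TaskMessage{
		ID:      uuid.NewString(),
		Type:    "email:welcome", // hypothetical task type
		Payload: nil,
		Queue:   "default",
		Retry:   25,
	}
	// Enqueue returns a wrapped errors.ErrTaskIdConflict if a task with the
	// same ID already exists.
	return r.Enqueue(context.Background(), msg)
}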
// enqueueUniqueCmd enqueues the task message if the task is unique.
//
// KEYS[1] -> unique key
// KEYS[2] -> asynq:{<qname>}:t:<taskid>
// KEYS[3] -> asynq:{<qname>}:pending
// --
// ARGV[1] -> task ID
// ARGV[2] -> uniqueness lock TTL
// ARGV[3] -> task message data
// ARGV[4] -> current unix time in nsec
//
// Output:
// Returns 1 if successfully enqueued
// Returns 0 if task ID conflicts with another task
// Returns -1 if task unique key already exists
var enqueueUniqueCmd = redis.NewScript(`
local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2])
if not ok then
	return -1
end
if redis.call("EXISTS", KEYS[2]) == 1 then
	return 0
end
redis.call("HSET", KEYS[2],
           "msg", ARGV[3],
           "state", "pending",
           "pending_since", ARGV[4],
           "unique_key", KEYS[1])
redis.call("LPUSH", KEYS[3], ARGV[1])
return 1
`)

// EnqueueUnique inserts the given task if the task's uniqueness lock can be acquired.
// It returns ErrDuplicateTask if the lock cannot be acquired.
func (r *RDB) EnqueueUnique(ctx context.Context, msg *base.TaskMessage, ttl time.Duration) error {
	var op errors.Op = "rdb.EnqueueUnique"
	encoded, err := base.EncodeMessage(msg)
	if err != nil {
		return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode task message: %v", err))
	}
	if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
		return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
	}
	keys := []string{
		msg.UniqueKey,
		base.TaskKey(msg.Queue, msg.ID),
		base.PendingKey(msg.Queue),
	}
	argv := []interface{}{
		msg.ID,
		int(ttl.Seconds()),
		encoded,
		r.clock.Now().UnixNano(),
	}
	n, err := r.runScriptWithErrorCode(ctx, op, enqueueUniqueCmd, keys, argv...)
	if err != nil {
		return err
	}
	if n == -1 {
		return errors.E(op, errors.AlreadyExists, errors.ErrDuplicateTask)
	}
	if n == 0 {
		return errors.E(op, errors.AlreadyExists, errors.ErrTaskIdConflict)
	}
	return nil
}

// Input:
// KEYS[1] -> asynq:{<qname>}:pending
// KEYS[2] -> asynq:{<qname>}:paused
// KEYS[3] -> asynq:{<qname>}:active
// KEYS[4] -> asynq:{<qname>}:lease
// --
// ARGV[1] -> initial lease expiration Unix time
// ARGV[2] -> task key prefix
//
// Output:
// Returns nil if no processable task is found in the given queue.
// Returns an encoded TaskMessage.
//
// Note: dequeueCmd checks whether a queue is paused first, before
// calling RPOPLPUSH to pop a task from the queue.
var dequeueCmd = redis.NewScript(`
if redis.call("EXISTS", KEYS[2]) == 0 then
	local id = redis.call("RPOPLPUSH", KEYS[1], KEYS[3])
	if id then
		local key = ARGV[2] .. id
		redis.call("HSET", key, "state", "active")
		redis.call("HDEL", key, "pending_since")
		redis.call("ZADD", KEYS[4], ARGV[1], id)
		return redis.call("HGET", key, "msg")
	end
end
return nil`)

// Dequeue queries given queues in order and pops a task message
// off a queue if one exists and returns the message and its lease expiration time.
// Dequeue skips a queue if the queue is paused.
// If all queues are empty, ErrNoProcessableTask error is returned.
func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, leaseExpirationTime time.Time, err error) {
	var op errors.Op = "rdb.Dequeue"
	for _, qname := range qnames {
		keys := []string{
			base.PendingKey(qname),
			base.PausedKey(qname),
			base.ActiveKey(qname),
			base.LeaseKey(qname),
		}
		leaseExpirationTime = r.clock.Now().Add(LeaseDuration)
		argv := []interface{}{
			leaseExpirationTime.Unix(),
			base.TaskKeyPrefix(qname),
		}
		res, err := dequeueCmd.Run(context.Background(), r.client, keys, argv...).Result()
		if err == redis.Nil {
			continue
		} else if err != nil {
			return nil, time.Time{}, errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err))
		}
		encoded, err := cast.ToStringE(res)
		if err != nil {
			return nil, time.Time{}, errors.E(op, errors.Internal, fmt.Sprintf("cast error: unexpected return value from Lua script: %v", res))
		}
		if msg, err = base.DecodeMessage([]byte(encoded)); err != nil {
			return nil, time.Time{}, errors.E(op, errors.Internal, fmt.Sprintf("cannot decode message: %v", err))
		}
		return msg, leaseExpirationTime, nil
	}
	return nil, time.Time{}, errors.E(op, errors.NotFound, errors.ErrNoProcessableTask)
}

// KEYS[1] -> asynq:{<qname>}:active
// KEYS[2] -> asynq:{<qname>}:lease
// KEYS[3] -> asynq:{<qname>}:t:<task_id>
// KEYS[4] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
// KEYS[5] -> asynq:{<qname>}:processed
// -------
// ARGV[1] -> task ID
// ARGV[2] -> stats expiration timestamp
// ARGV[3] -> max int64 value
var doneCmd = redis.NewScript(`
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
	return redis.error_reply("NOT FOUND")
end
if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then
	return redis.error_reply("NOT FOUND")
end
if redis.call("DEL", KEYS[3]) == 0 then
	return redis.error_reply("NOT FOUND")
end
local n = redis.call("INCR", KEYS[4])
if tonumber(n) == 1 then
	redis.call("EXPIREAT", KEYS[4], ARGV[2])
end
local total = redis.call("GET", KEYS[5])
if tonumber(total) == tonumber(ARGV[3]) then
	redis.call("SET", KEYS[5], 1)
else
	redis.call("INCR", KEYS[5])
end
return redis.status_reply("OK")
`)

// KEYS[1] -> asynq:{<qname>}:active
// KEYS[2] -> asynq:{<qname>}:lease
// KEYS[3] -> asynq:{<qname>}:t:<task_id>
// KEYS[4] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
// KEYS[5] -> asynq:{<qname>}:processed
// KEYS[6] -> unique key
// -------
// ARGV[1] -> task ID
// ARGV[2] -> stats expiration timestamp
// ARGV[3] -> max int64 value
var doneUniqueCmd = redis.NewScript(`
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
	return redis.error_reply("NOT FOUND")
end
if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then
	return redis.error_reply("NOT FOUND")
end
if redis.call("DEL", KEYS[3]) == 0 then
	return redis.error_reply("NOT FOUND")
end
local n = redis.call("INCR", KEYS[4])
if tonumber(n) == 1 then
	redis.call("EXPIREAT", KEYS[4], ARGV[2])
end
local total = redis.call("GET", KEYS[5])
if tonumber(total) == tonumber(ARGV[3]) then
	redis.call("SET", KEYS[5], 1)
else
	redis.call("INCR", KEYS[5])
end
if redis.call("GET", KEYS[6]) == ARGV[1] then
	redis.call("DEL", KEYS[6])
end
return redis.status_reply("OK")
`)

// Done removes the task from active queue and deletes the task.
// It removes a uniqueness lock acquired by the task, if any.
func (r *RDB) Done(ctx context.Context, msg *base.TaskMessage) error {
	var op errors.Op = "rdb.Done"
	now := r.clock.Now()
	expireAt := now.Add(statsTTL)
	keys := []string{
		base.ActiveKey(msg.Queue),
		base.LeaseKey(msg.Queue),
		base.TaskKey(msg.Queue, msg.ID),
		base.ProcessedKey(msg.Queue, now),
		base.ProcessedTotalKey(msg.Queue),
	}
	argv := []interface{}{
		msg.ID,
		expireAt.Unix(),
		int64(math.MaxInt64),
	}
	// Note: We cannot pass empty unique key when running this script in redis-cluster.
	if len(msg.UniqueKey) > 0 {
		keys = append(keys, msg.UniqueKey)
		return r.runScript(ctx, op, doneUniqueCmd, keys, argv...)
	}
	return r.runScript(ctx, op, doneCmd, keys, argv...)
}
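// The sketch below is illustrative only: a simplified worker loop that pops a
// task with Dequeue, "processes" it, and acknowledges it with Done. Lease
// extension and retry/archive decisions are omitted; the queue name is a
// hypothetical value.
func workerLoopSketch(r *RDB, stop <-chan struct{}) {
	for {
		select {
		case <-stop:
			return
		default:
		}
		msg, _, err := r.Dequeue("default")
		if errors.Is(err, errors.ErrNoProcessableTask) {
			time.Sleep(time.Second) // nothing to do; back off briefly
			continue
		}
		if err != nil {
			continue // transient redis error; a real worker would log it
		}
		// ... process msg.Payload here ...
		_ = r.Done(context.Background(), msg)
	}
}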
// KEYS[1] -> asynq:{<qname>}:active
// KEYS[2] -> asynq:{<qname>}:lease
// KEYS[3] -> asynq:{<qname>}:completed
// KEYS[4] -> asynq:{<qname>}:t:<task_id>
// KEYS[5] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
// KEYS[6] -> asynq:{<qname>}:processed
//
// ARGV[1] -> task ID
// ARGV[2] -> stats expiration timestamp
// ARGV[3] -> task expiration time in unix time
// ARGV[4] -> task message data
// ARGV[5] -> max int64 value
var markAsCompleteCmd = redis.NewScript(`
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
	return redis.error_reply("NOT FOUND")
end
if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then
	return redis.error_reply("NOT FOUND")
end
if redis.call("ZADD", KEYS[3], ARGV[3], ARGV[1]) ~= 1 then
	return redis.error_reply("INTERNAL")
end
redis.call("HSET", KEYS[4], "msg", ARGV[4], "state", "completed")
local n = redis.call("INCR", KEYS[5])
if tonumber(n) == 1 then
	redis.call("EXPIREAT", KEYS[5], ARGV[2])
end
local total = redis.call("GET", KEYS[6])
if tonumber(total) == tonumber(ARGV[5]) then
	redis.call("SET", KEYS[6], 1)
else
	redis.call("INCR", KEYS[6])
end
return redis.status_reply("OK")
`)

// KEYS[1] -> asynq:{<qname>}:active
// KEYS[2] -> asynq:{<qname>}:lease
// KEYS[3] -> asynq:{<qname>}:completed
// KEYS[4] -> asynq:{<qname>}:t:<task_id>
// KEYS[5] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
// KEYS[6] -> asynq:{<qname>}:processed
// KEYS[7] -> asynq:{<qname>}:unique:{<checksum>}
//
// ARGV[1] -> task ID
// ARGV[2] -> stats expiration timestamp
// ARGV[3] -> task expiration time in unix time
// ARGV[4] -> task message data
// ARGV[5] -> max int64 value
var markAsCompleteUniqueCmd = redis.NewScript(`
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
	return redis.error_reply("NOT FOUND")
end
if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then
	return redis.error_reply("NOT FOUND")
end
if redis.call("ZADD", KEYS[3], ARGV[3], ARGV[1]) ~= 1 then
	return redis.error_reply("INTERNAL")
end
redis.call("HSET", KEYS[4], "msg", ARGV[4], "state", "completed")
local n = redis.call("INCR", KEYS[5])
if tonumber(n) == 1 then
	redis.call("EXPIREAT", KEYS[5], ARGV[2])
end
local total = redis.call("GET", KEYS[6])
if tonumber(total) == tonumber(ARGV[5]) then
	redis.call("SET", KEYS[6], 1)
else
	redis.call("INCR", KEYS[6])
end
if redis.call("GET", KEYS[7]) == ARGV[1] then
	redis.call("DEL", KEYS[7])
end
return redis.status_reply("OK")
`)

// MarkAsComplete removes the task from active queue to mark the task as completed.
// It removes a uniqueness lock acquired by the task, if any.
func (r *RDB) MarkAsComplete(ctx context.Context, msg *base.TaskMessage) error {
	var op errors.Op = "rdb.MarkAsComplete"
	now := r.clock.Now()
	statsExpireAt := now.Add(statsTTL)
	msg.CompletedAt = now.Unix()
	encoded, err := base.EncodeMessage(msg)
	if err != nil {
		return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
	}
	keys := []string{
		base.ActiveKey(msg.Queue),
		base.LeaseKey(msg.Queue),
		base.CompletedKey(msg.Queue),
		base.TaskKey(msg.Queue, msg.ID),
		base.ProcessedKey(msg.Queue, now),
		base.ProcessedTotalKey(msg.Queue),
	}
	argv := []interface{}{
		msg.ID,
		statsExpireAt.Unix(),
		now.Unix() + msg.Retention,
		encoded,
		int64(math.MaxInt64),
	}
	// Note: We cannot pass empty unique key when running this script in redis-cluster.
	if len(msg.UniqueKey) > 0 {
		keys = append(keys, msg.UniqueKey)
		return r.runScript(ctx, op, markAsCompleteUniqueCmd, keys, argv...)
	}
	return r.runScript(ctx, op, markAsCompleteCmd, keys, argv...)
}
// KEYS[1] -> asynq:{<qname>}:active
// KEYS[2] -> asynq:{<qname>}:lease
// KEYS[3] -> asynq:{<qname>}:pending
// KEYS[4] -> asynq:{<qname>}:t:<task_id>
// ARGV[1] -> task ID
// Note: Use RPUSH to push to the head of the queue.
var requeueCmd = redis.NewScript(`
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
	return redis.error_reply("NOT FOUND")
end
if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then
	return redis.error_reply("NOT FOUND")
end
redis.call("RPUSH", KEYS[3], ARGV[1])
redis.call("HSET", KEYS[4], "state", "pending")
return redis.status_reply("OK")`)

// Requeue moves the task from active queue to the specified queue.
func (r *RDB) Requeue(ctx context.Context, msg *base.TaskMessage) error {
	var op errors.Op = "rdb.Requeue"
	keys := []string{
		base.ActiveKey(msg.Queue),
		base.LeaseKey(msg.Queue),
		base.PendingKey(msg.Queue),
		base.TaskKey(msg.Queue, msg.ID),
	}
	return r.runScript(ctx, op, requeueCmd, keys, msg.ID)
}

// KEYS[1] -> asynq:{<qname>}:t:<task_id>
// KEYS[2] -> asynq:{<qname>}:g:<group_key>
// KEYS[3] -> asynq:{<qname>}:groups
// -------
// ARGV[1] -> task message data
// ARGV[2] -> task ID
// ARGV[3] -> current time in Unix time
// ARGV[4] -> group key
//
// Output:
// Returns 1 if successfully added
// Returns 0 if task ID already exists
var addToGroupCmd = redis.NewScript(`
if redis.call("EXISTS", KEYS[1]) == 1 then
	return 0
end
redis.call("HSET", KEYS[1],
           "msg", ARGV[1],
           "state", "aggregating",
           "group", ARGV[4])
redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2])
redis.call("SADD", KEYS[3], ARGV[4])
return 1
`)

// AddToGroup adds the given task to the group identified by the given group key.
func (r *RDB) AddToGroup(ctx context.Context, msg *base.TaskMessage, groupKey string) error {
	var op errors.Op = "rdb.AddToGroup"
	encoded, err := base.EncodeMessage(msg)
	if err != nil {
		return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
	}
	if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
		return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
	}
	keys := []string{
		base.TaskKey(msg.Queue, msg.ID),
		base.GroupKey(msg.Queue, groupKey),
		base.AllGroups(msg.Queue),
	}
	argv := []interface{}{
		encoded,
		msg.ID,
		r.clock.Now().Unix(),
		groupKey,
	}
	n, err := r.runScriptWithErrorCode(ctx, op, addToGroupCmd, keys, argv...)
	if err != nil {
		return err
	}
	if n == 0 {
		return errors.E(op, errors.AlreadyExists, errors.ErrTaskIdConflict)
	}
	return nil
}
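// The sketch below is illustrative only: it shows how a producer might add a
// task to a group so that it is aggregated with other tasks in the same group
// instead of being enqueued immediately. The task type and group key are
// hypothetical values.
func addToGroupSketch(r *RDB) error {
	msg := &base.TaskMessage{
		ID:       uuid.NewString(),
		Type:     "notification:digest", // hypothetical task type
		Queue:    "default",
		GroupKey: "user:1234", // hypothetical group key
	}
	return r.AddToGroup(context.Background(), msg, msg.GroupKey)
}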
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
// KEYS[2] -> asynq:{<qname>}:g:<group_key>
// KEYS[3] -> asynq:{<qname>}:groups
// KEYS[4] -> unique key
// -------
// ARGV[1] -> task message data
// ARGV[2] -> task ID
// ARGV[3] -> current time in Unix time
// ARGV[4] -> group key
// ARGV[5] -> uniqueness lock TTL
//
// Output:
// Returns 1 if successfully added
// Returns 0 if task ID already exists
// Returns -1 if task unique key already exists
var addToGroupUniqueCmd = redis.NewScript(`
local ok = redis.call("SET", KEYS[4], ARGV[2], "NX", "EX", ARGV[5])
if not ok then
	return -1
end
if redis.call("EXISTS", KEYS[1]) == 1 then
	return 0
end
redis.call("HSET", KEYS[1],
           "msg", ARGV[1],
           "state", "aggregating",
           "group", ARGV[4])
redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2])
redis.call("SADD", KEYS[3], ARGV[4])
return 1
`)

// AddToGroupUnique adds the given task to the group identified by the given group key
// if the task's uniqueness lock can be acquired.
// It returns ErrDuplicateTask if the lock cannot be acquired.
func (r *RDB) AddToGroupUnique(ctx context.Context, msg *base.TaskMessage, groupKey string, ttl time.Duration) error {
	var op errors.Op = "rdb.AddToGroupUnique"
	encoded, err := base.EncodeMessage(msg)
	if err != nil {
		return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
	}
	if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
		return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
	}
	keys := []string{
		base.TaskKey(msg.Queue, msg.ID),
		base.GroupKey(msg.Queue, groupKey),
		base.AllGroups(msg.Queue),
		base.UniqueKey(msg.Queue, msg.Type, msg.Payload),
	}
	argv := []interface{}{
		encoded,
		msg.ID,
		r.clock.Now().Unix(),
		groupKey,
		int(ttl.Seconds()),
	}
	n, err := r.runScriptWithErrorCode(ctx, op, addToGroupUniqueCmd, keys, argv...)
	if err != nil {
		return err
	}
	if n == -1 {
		return errors.E(op, errors.AlreadyExists, errors.ErrDuplicateTask)
	}
	if n == 0 {
		return errors.E(op, errors.AlreadyExists, errors.ErrTaskIdConflict)
	}
	return nil
}

// KEYS[1] -> asynq:{<qname>}:t:<task_id>
// KEYS[2] -> asynq:{<qname>}:scheduled
// -------
// ARGV[1] -> task message data
// ARGV[2] -> process_at time in Unix time
// ARGV[3] -> task ID
//
// Output:
// Returns 1 if successfully enqueued
// Returns 0 if task ID already exists
var scheduleCmd = redis.NewScript(`
if redis.call("EXISTS", KEYS[1]) == 1 then
	return 0
end
redis.call("HSET", KEYS[1],
           "msg", ARGV[1],
           "state", "scheduled")
redis.call("ZADD", KEYS[2], ARGV[2], ARGV[3])
return 1
`)

// Schedule adds the task to the scheduled set to be processed in the future.
func (r *RDB) Schedule(ctx context.Context, msg *base.TaskMessage, processAt time.Time) error {
	var op errors.Op = "rdb.Schedule"
	encoded, err := base.EncodeMessage(msg)
	if err != nil {
		return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
	}
	if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
		return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
	}
	keys := []string{
		base.TaskKey(msg.Queue, msg.ID),
		base.ScheduledKey(msg.Queue),
	}
	argv := []interface{}{
		encoded,
		processAt.Unix(),
		msg.ID,
	}
	n, err := r.runScriptWithErrorCode(ctx, op, scheduleCmd, keys, argv...)
	if err != nil {
		return err
	}
	if n == 0 {
		return errors.E(op, errors.AlreadyExists, errors.ErrTaskIdConflict)
	}
	return nil
}

// KEYS[1] -> unique key
// KEYS[2] -> asynq:{<qname>}:t:<task_id>
// KEYS[3] -> asynq:{<qname>}:scheduled
// -------
// ARGV[1] -> task ID
// ARGV[2] -> uniqueness lock TTL
// ARGV[3] -> score (process_at timestamp)
// ARGV[4] -> task message
//
// Output:
// Returns 1 if successfully scheduled
// Returns 0 if task ID already exists
// Returns -1 if task unique key already exists
var scheduleUniqueCmd = redis.NewScript(`
local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2])
if not ok then
	return -1
end
if redis.call("EXISTS", KEYS[2]) == 1 then
	return 0
end
redis.call("HSET", KEYS[2],
           "msg", ARGV[4],
           "state", "scheduled",
           "unique_key", KEYS[1])
redis.call("ZADD", KEYS[3], ARGV[3], ARGV[1])
return 1
`)
// ScheduleUnique adds the task to the backlog queue to be processed in the future if the uniqueness lock can be acquired.
// It returns ErrDuplicateTask if the lock cannot be acquired.
func (r *RDB) ScheduleUnique(ctx context.Context, msg *base.TaskMessage, processAt time.Time, ttl time.Duration) error {
	var op errors.Op = "rdb.ScheduleUnique"
	encoded, err := base.EncodeMessage(msg)
	if err != nil {
		return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode task message: %v", err))
	}
	if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
		return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
	}
	keys := []string{
		msg.UniqueKey,
		base.TaskKey(msg.Queue, msg.ID),
		base.ScheduledKey(msg.Queue),
	}
	argv := []interface{}{
		msg.ID,
		int(ttl.Seconds()),
		processAt.Unix(),
		encoded,
	}
	n, err := r.runScriptWithErrorCode(ctx, op, scheduleUniqueCmd, keys, argv...)
	if err != nil {
		return err
	}
	if n == -1 {
		return errors.E(op, errors.AlreadyExists, errors.ErrDuplicateTask)
	}
	if n == 0 {
		return errors.E(op, errors.AlreadyExists, errors.ErrTaskIdConflict)
	}
	return nil
}

// KEYS[1] -> asynq:{<qname>}:t:<task_id>
// KEYS[2] -> asynq:{<qname>}:active
// KEYS[3] -> asynq:{<qname>}:lease
// KEYS[4] -> asynq:{<qname>}:retry
// KEYS[5] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
// KEYS[6] -> asynq:{<qname>}:failed:<yyyy-mm-dd>
// KEYS[7] -> asynq:{<qname>}:processed
// KEYS[8] -> asynq:{<qname>}:failed
// -------
// ARGV[1] -> task ID
// ARGV[2] -> updated base.TaskMessage value
// ARGV[3] -> retry_at UNIX timestamp
// ARGV[4] -> stats expiration timestamp
// ARGV[5] -> is_failure (bool)
// ARGV[6] -> max int64 value
var retryCmd = redis.NewScript(`
if redis.call("LREM", KEYS[2], 0, ARGV[1]) == 0 then
	return redis.error_reply("NOT FOUND")
end
if redis.call("ZREM", KEYS[3], ARGV[1]) == 0 then
	return redis.error_reply("NOT FOUND")
end
redis.call("ZADD", KEYS[4], ARGV[3], ARGV[1])
redis.call("HSET", KEYS[1], "msg", ARGV[2], "state", "retry")
if tonumber(ARGV[5]) == 1 then
	local n = redis.call("INCR", KEYS[5])
	if tonumber(n) == 1 then
		redis.call("EXPIREAT", KEYS[5], ARGV[4])
	end
	local m = redis.call("INCR", KEYS[6])
	if tonumber(m) == 1 then
		redis.call("EXPIREAT", KEYS[6], ARGV[4])
	end
	local total = redis.call("GET", KEYS[7])
	if tonumber(total) == tonumber(ARGV[6]) then
		redis.call("SET", KEYS[7], 1)
		redis.call("SET", KEYS[8], 1)
	else
		redis.call("INCR", KEYS[7])
		redis.call("INCR", KEYS[8])
	end
end
return redis.status_reply("OK")`)

// Retry moves the task from active to retry queue.
// It also annotates the message with the given error message and,
// if isFailure is true, increments the retried counter.
func (r *RDB) Retry(ctx context.Context, msg *base.TaskMessage, processAt time.Time, errMsg string, isFailure bool) error {
	var op errors.Op = "rdb.Retry"
	now := r.clock.Now()
	modified := *msg
	if isFailure {
		modified.Retried++
	}
	modified.ErrorMsg = errMsg
	modified.LastFailedAt = now.Unix()
	encoded, err := base.EncodeMessage(&modified)
	if err != nil {
		return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode message: %v", err))
	}
	expireAt := now.Add(statsTTL)
	keys := []string{
		base.TaskKey(msg.Queue, msg.ID),
		base.ActiveKey(msg.Queue),
		base.LeaseKey(msg.Queue),
		base.RetryKey(msg.Queue),
		base.ProcessedKey(msg.Queue, now),
		base.FailedKey(msg.Queue, now),
		base.ProcessedTotalKey(msg.Queue),
		base.FailedTotalKey(msg.Queue),
	}
	argv := []interface{}{
		msg.ID,
		encoded,
		processAt.Unix(),
		expireAt.Unix(),
		isFailure,
		int64(math.MaxInt64),
	}
	return r.runScript(ctx, op, retryCmd, keys, argv...)
}
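// The sketch below is illustrative only: after a handler fails, a worker might
// move the task to the retry set with a delay. The backoff formula here is a
// hypothetical example; the actual retry delay policy is decided by the
// caller, not by this package.
func retrySketch(r *RDB, msg *base.TaskMessage, handlerErr error) error {
	delay := time.Duration(msg.Retried+1) * time.Second // hypothetical linear backoff
	retryAt := time.Now().Add(delay)
	return r.Retry(context.Background(), msg, retryAt, handlerErr.Error(), true /* isFailure */)
}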
const (
	maxArchiveSize           = 10000 // maximum number of tasks in archive
	archivedExpirationInDays = 90    // number of days before an archived task gets deleted permanently
)

// KEYS[1] -> asynq:{<qname>}:t:<task_id>
// KEYS[2] -> asynq:{<qname>}:active
// KEYS[3] -> asynq:{<qname>}:lease
// KEYS[4] -> asynq:{<qname>}:archived
// KEYS[5] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
// KEYS[6] -> asynq:{<qname>}:failed:<yyyy-mm-dd>
// KEYS[7] -> asynq:{<qname>}:processed
// KEYS[8] -> asynq:{<qname>}:failed
// -------
// ARGV[1] -> task ID
// ARGV[2] -> updated base.TaskMessage value
// ARGV[3] -> died_at UNIX timestamp
// ARGV[4] -> cutoff timestamp (e.g., 90 days ago)
// ARGV[5] -> max number of tasks in archive (e.g., 100)
// ARGV[6] -> stats expiration timestamp
// ARGV[7] -> max int64 value
var archiveCmd = redis.NewScript(`
if redis.call("LREM", KEYS[2], 0, ARGV[1]) == 0 then
	return redis.error_reply("NOT FOUND")
end
if redis.call("ZREM", KEYS[3], ARGV[1]) == 0 then
	return redis.error_reply("NOT FOUND")
end
redis.call("ZADD", KEYS[4], ARGV[3], ARGV[1])
redis.call("ZREMRANGEBYSCORE", KEYS[4], "-inf", ARGV[4])
redis.call("ZREMRANGEBYRANK", KEYS[4], 0, -ARGV[5])
redis.call("HSET", KEYS[1], "msg", ARGV[2], "state", "archived")
local n = redis.call("INCR", KEYS[5])
if tonumber(n) == 1 then
	redis.call("EXPIREAT", KEYS[5], ARGV[6])
end
local m = redis.call("INCR", KEYS[6])
if tonumber(m) == 1 then
	redis.call("EXPIREAT", KEYS[6], ARGV[6])
end
local total = redis.call("GET", KEYS[7])
if tonumber(total) == tonumber(ARGV[7]) then
	redis.call("SET", KEYS[7], 1)
	redis.call("SET", KEYS[8], 1)
else
	redis.call("INCR", KEYS[7])
	redis.call("INCR", KEYS[8])
end
return redis.status_reply("OK")`)

// Archive sends the given task to archive, attaching the error message to the task.
// It also trims the archive by timestamp and set size.
func (r *RDB) Archive(ctx context.Context, msg *base.TaskMessage, errMsg string) error {
	var op errors.Op = "rdb.Archive"
	now := r.clock.Now()
	modified := *msg
	modified.ErrorMsg = errMsg
	modified.LastFailedAt = now.Unix()
	encoded, err := base.EncodeMessage(&modified)
	if err != nil {
		return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode message: %v", err))
	}
	cutoff := now.AddDate(0, 0, -archivedExpirationInDays)
	expireAt := now.Add(statsTTL)
	keys := []string{
		base.TaskKey(msg.Queue, msg.ID),
		base.ActiveKey(msg.Queue),
		base.LeaseKey(msg.Queue),
		base.ArchivedKey(msg.Queue),
		base.ProcessedKey(msg.Queue, now),
		base.FailedKey(msg.Queue, now),
		base.ProcessedTotalKey(msg.Queue),
		base.FailedTotalKey(msg.Queue),
	}
	argv := []interface{}{
		msg.ID,
		encoded,
		now.Unix(),
		cutoff.Unix(),
		maxArchiveSize,
		expireAt.Unix(),
		int64(math.MaxInt64),
	}
	return r.runScript(ctx, op, archiveCmd, keys, argv...)
}

// ForwardIfReady checks scheduled and retry sets of the given queues
// and moves any tasks that are ready to be processed to the pending set.
func (r *RDB) ForwardIfReady(qnames ...string) error {
	var op errors.Op = "rdb.ForwardIfReady"
	for _, qname := range qnames {
		if err := r.forwardAll(qname); err != nil {
			return errors.E(op, errors.CanonicalCode(err), err)
		}
	}
	return nil
}

// KEYS[1] -> source queue (e.g. asynq:{<qname>}:scheduled or asynq:{<qname>}:retry)
// KEYS[2] -> asynq:{<qname>}:pending
// ARGV[1] -> current unix time in seconds
// ARGV[2] -> task key prefix
// ARGV[3] -> current unix time in nsec
// ARGV[4] -> group key prefix
// Note: Script moves tasks up to 100 at a time to keep the runtime of the script short.
var forwardCmd = redis.NewScript(`
local ids = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 100)
for _, id in ipairs(ids) do
	local taskKey = ARGV[2] .. id
	local group = redis.call("HGET", taskKey, "group")
	if group and group ~= '' then
		redis.call("ZADD", ARGV[4] .. group, ARGV[1], id)
		redis.call("ZREM", KEYS[1], id)
		redis.call("HSET", taskKey, "state", "aggregating")
	else
		redis.call("LPUSH", KEYS[2], id)
		redis.call("ZREM", KEYS[1], id)
		redis.call("HSET", taskKey,
		           "state", "pending",
		           "pending_since", ARGV[3])
	end
end
return table.getn(ids)`)
// forward moves tasks with a score less than the current unix time from the delayed (i.e. scheduled | retry) zset
// to the pending list or group set.
// It returns the number of tasks moved.
func (r *RDB) forward(delayedKey, pendingKey, taskKeyPrefix, groupKeyPrefix string) (int, error) {
	now := r.clock.Now()
	keys := []string{delayedKey, pendingKey}
	argv := []interface{}{
		now.Unix(),
		taskKeyPrefix,
		now.UnixNano(),
		groupKeyPrefix,
	}
	res, err := forwardCmd.Run(context.Background(), r.client, keys, argv...).Result()
	if err != nil {
		return 0, errors.E(errors.Internal, fmt.Sprintf("redis eval error: %v", err))
	}
	n, err := cast.ToIntE(res)
	if err != nil {
		return 0, errors.E(errors.Internal, fmt.Sprintf("cast error: Lua script returned unexpected value: %v", res))
	}
	return n, nil
}

// forwardAll checks for tasks in scheduled/retry state that are ready to be run, and updates
// their state to "pending" or "aggregating".
func (r *RDB) forwardAll(qname string) (err error) {
	delayedKeys := []string{base.ScheduledKey(qname), base.RetryKey(qname)}
	pendingKey := base.PendingKey(qname)
	taskKeyPrefix := base.TaskKeyPrefix(qname)
	groupKeyPrefix := base.GroupKeyPrefix(qname)
	for _, delayedKey := range delayedKeys {
		n := 1
		for n != 0 {
			n, err = r.forward(delayedKey, pendingKey, taskKeyPrefix, groupKeyPrefix)
			if err != nil {
				return err
			}
		}
	}
	return nil
}

// ListGroups returns a list of all known groups in the given queue.
func (r *RDB) ListGroups(qname string) ([]string, error) {
	var op errors.Op = "RDB.ListGroups"
	groups, err := r.client.SMembers(context.Background(), base.AllGroups(qname)).Result()
	if err != nil {
		return nil, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "smembers", Err: err})
	}
	return groups, nil
}
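// The sketch below is illustrative only: a background goroutine that
// periodically calls ForwardIfReady so that scheduled and retry tasks become
// pending (or aggregating) once their scores pass the current time. The
// polling interval and queue names are hypothetical values.
func forwarderSketch(r *RDB, stop <-chan struct{}) {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			_ = r.ForwardIfReady("default", "critical") // a real forwarder would log errors
		}
	}
}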
// aggregationCheckCmd checks the given group for whether to create an aggregation set.
// An aggregation set is created if one of the aggregation criteria is met:
// 1) group has reached or exceeded its max size
// 2) group's oldest task has reached or exceeded its max delay
// 3) group's latest task has reached or exceeded its grace period
// If an aggregation criterion is met, the command moves those tasks from the group
// and puts them in an aggregation set. Additionally, if the creation of the aggregation set
// empties the group, it will clear the group name from the all groups set.
//
// KEYS[1] -> asynq:{<qname>}:g:<gname>
// KEYS[2] -> asynq:{<qname>}:g:<gname>:<aggregation_set_id>
// KEYS[3] -> asynq:{<qname>}:aggregation_sets
// KEYS[4] -> asynq:{<qname>}:groups
// -------
// ARGV[1] -> max group size
// ARGV[2] -> max group delay in unix time
// ARGV[3] -> start time of the grace period
// ARGV[4] -> aggregation set expire time
// ARGV[5] -> current time in unix time
// ARGV[6] -> group name
//
// Output:
// Returns 0 if no aggregation set was created
// Returns 1 if an aggregation set was created
//
// Time Complexity:
// O(log(N) + M) with N being the number of tasks in the group zset
// and M being the max size.
var aggregationCheckCmd = redis.NewScript(`
local size = redis.call("ZCARD", KEYS[1])
if size == 0 then
	return 0
end
local maxSize = tonumber(ARGV[1])
if maxSize ~= 0 and size >= maxSize then
	local res = redis.call("ZRANGE", KEYS[1], 0, maxSize-1, "WITHSCORES")
	for i=1, table.getn(res)-1, 2 do
		redis.call("ZADD", KEYS[2], tonumber(res[i+1]), res[i])
	end
	redis.call("ZREMRANGEBYRANK", KEYS[1], 0, maxSize-1)
	redis.call("ZADD", KEYS[3], ARGV[4], KEYS[2])
	if size == maxSize then
		redis.call("SREM", KEYS[4], ARGV[6])
	end
	return 1
end
local maxDelay = tonumber(ARGV[2])
local currentTime = tonumber(ARGV[5])
if maxDelay ~= 0 then
	local oldestEntry = redis.call("ZRANGE", KEYS[1], 0, 0, "WITHSCORES")
	local oldestEntryScore = tonumber(oldestEntry[2])
	local maxDelayTime = currentTime - maxDelay
	if oldestEntryScore <= maxDelayTime then
		local res = redis.call("ZRANGE", KEYS[1], 0, maxSize-1, "WITHSCORES")
		for i=1, table.getn(res)-1, 2 do
			redis.call("ZADD", KEYS[2], tonumber(res[i+1]), res[i])
		end
		redis.call("ZREMRANGEBYRANK", KEYS[1], 0, maxSize-1)
		redis.call("ZADD", KEYS[3], ARGV[4], KEYS[2])
		if size <= maxSize or maxSize == 0 then
			redis.call("SREM", KEYS[4], ARGV[6])
		end
		return 1
	end
end
local latestEntry = redis.call("ZREVRANGE", KEYS[1], 0, 0, "WITHSCORES")
local latestEntryScore = tonumber(latestEntry[2])
local gracePeriodStartTime = currentTime - tonumber(ARGV[3])
if latestEntryScore <= gracePeriodStartTime then
	local res = redis.call("ZRANGE", KEYS[1], 0, maxSize-1, "WITHSCORES")
	for i=1, table.getn(res)-1, 2 do
		redis.call("ZADD", KEYS[2], tonumber(res[i+1]), res[i])
	end
	redis.call("ZREMRANGEBYRANK", KEYS[1], 0, maxSize-1)
	redis.call("ZADD", KEYS[3], ARGV[4], KEYS[2])
	if size <= maxSize or maxSize == 0 then
		redis.call("SREM", KEYS[4], ARGV[6])
	end
	return 1
end
return 0
`)

// Task aggregation should finish within this timeout.
// Otherwise an aggregation set should be reclaimed by the recoverer.
const aggregationTimeout = 2 * time.Minute

// AggregationCheck checks the group identified by the given queue and group name to see if the tasks in the
// group are ready to be aggregated. If so, it moves the tasks to be aggregated to an aggregation set and returns
// the set ID. If not, it returns an empty string for the set ID.
// The time for gracePeriod and maxDelay is computed relative to the time t.
//
// Note: It assumes that this function is called at frequency less than or equal to the gracePeriod. In other words,
// the function only checks the most recently added task against the given gracePeriod.
func (r *RDB) AggregationCheck(qname, gname string, t time.Time, gracePeriod, maxDelay time.Duration, maxSize int) (string, error) {
	var op errors.Op = "RDB.AggregationCheck"
	aggregationSetID := uuid.NewString()
	expireTime := r.clock.Now().Add(aggregationTimeout)
	keys := []string{
		base.GroupKey(qname, gname),
		base.AggregationSetKey(qname, gname, aggregationSetID),
		base.AllAggregationSets(qname),
		base.AllGroups(qname),
	}
	argv := []interface{}{
		maxSize,
		int64(maxDelay.Seconds()),
		int64(gracePeriod.Seconds()),
		expireTime.Unix(),
		t.Unix(),
		gname,
	}
	n, err := r.runScriptWithErrorCode(context.Background(), op, aggregationCheckCmd, keys, argv...)
	if err != nil {
		return "", err
	}
	switch n {
	case 0:
		return "", nil
	case 1:
		return aggregationSetID, nil
	default:
		return "", errors.E(op, errors.Internal, fmt.Sprintf("unexpected return value from lua script: %d", n))
	}
}
// KEYS[1] -> asynq:{<qname>}:g:<gname>:<aggregation_set_id>
// ------
// ARGV[1] -> task key prefix
//
// Output:
// Array of encoded task messages
//
// Time Complexity:
// O(N) with N being the number of tasks in the aggregation set.
var readAggregationSetCmd = redis.NewScript(`
local msgs = {}
local ids = redis.call("ZRANGE", KEYS[1], 0, -1)
for _, id in ipairs(ids) do
	local key = ARGV[1] .. id
	table.insert(msgs, redis.call("HGET", key, "msg"))
end
return msgs
`)

// ReadAggregationSet retrieves members of an aggregation set and returns a list of tasks in the set and
// the deadline for aggregating those tasks.
func (r *RDB) ReadAggregationSet(qname, gname, setID string) ([]*base.TaskMessage, time.Time, error) {
	var op errors.Op = "RDB.ReadAggregationSet"
	ctx := context.Background()
	aggSetKey := base.AggregationSetKey(qname, gname, setID)
	res, err := readAggregationSetCmd.Run(ctx, r.client, []string{aggSetKey}, base.TaskKeyPrefix(qname)).Result()
	if err != nil {
		return nil, time.Time{}, errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err))
	}
	data, err := cast.ToStringSliceE(res)
	if err != nil {
		return nil, time.Time{}, errors.E(op, errors.Internal, fmt.Sprintf("cast error: Lua script returned unexpected value: %v", res))
	}
	var msgs []*base.TaskMessage
	for _, s := range data {
		msg, err := base.DecodeMessage([]byte(s))
		if err != nil {
			return nil, time.Time{}, errors.E(op, errors.Internal, fmt.Sprintf("cannot decode message: %v", err))
		}
		msgs = append(msgs, msg)
	}
	deadlineUnix, err := r.client.ZScore(ctx, base.AllAggregationSets(qname), aggSetKey).Result()
	if err != nil {
		return nil, time.Time{}, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "zscore", Err: err})
	}
	return msgs, time.Unix(int64(deadlineUnix), 0), nil
}

// KEYS[1] -> asynq:{<qname>}:g:<gname>:<aggregation_set_id>
// KEYS[2] -> asynq:{<qname>}:aggregation_sets
// -------
// ARGV[1] -> task key prefix
//
// Output:
// Redis status reply
//
// Time Complexity:
// max(O(N), O(log(M))) with N being the number of tasks in the aggregation set
// and M being the number of elements in the all-aggregation-sets list.
var deleteAggregationSetCmd = redis.NewScript(`
local ids = redis.call("ZRANGE", KEYS[1], 0, -1)
for _, id in ipairs(ids) do
	redis.call("DEL", ARGV[1] .. id)
end
redis.call("DEL", KEYS[1])
redis.call("ZREM", KEYS[2], KEYS[1])
return redis.status_reply("OK")
`)

// DeleteAggregationSet deletes the aggregation set and its members identified by the parameters.
func (r *RDB) DeleteAggregationSet(ctx context.Context, qname, gname, setID string) error {
	var op errors.Op = "RDB.DeleteAggregationSet"
	keys := []string{
		base.AggregationSetKey(qname, gname, setID),
		base.AllAggregationSets(qname),
	}
	return r.runScript(ctx, op, deleteAggregationSetCmd, keys, base.TaskKeyPrefix(qname))
}

// KEYS[1] -> asynq:{<qname>}:aggregation_sets
// -------
// ARGV[1] -> current time in unix time
var reclaimStateAggregationSetsCmd = redis.NewScript(`
local staleSetKeys = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
for _, key in ipairs(staleSetKeys) do
	local idx = string.find(key, ":[^:]*$")
	local groupKey = string.sub(key, 1, idx-1)
	local res = redis.call("ZRANGE", key, 0, -1, "WITHSCORES")
	for i=1, table.getn(res)-1, 2 do
		redis.call("ZADD", groupKey, tonumber(res[i+1]), res[i])
	end
	redis.call("DEL", key)
end
redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
return redis.status_reply("OK")
`)

// ReclaimStaleAggregationSets checks for any stale aggregation sets in the given queue, and
// reclaims tasks in the stale aggregation sets by putting them back in the group.
func (r *RDB) ReclaimStaleAggregationSets(qname string) error {
	var op errors.Op = "RDB.ReclaimStaleAggregationSets"
	return r.runScript(context.Background(), op, reclaimStateAggregationSetsCmd,
		[]string{base.AllAggregationSets(qname)}, r.clock.Now().Unix())
}
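// The sketch below is illustrative only: the flow an aggregator goroutine
// might follow for a single group. The grace period, max delay, and max size
// are hypothetical values, and the "combine into one task" step is left as a
// comment since combining semantics live outside this package.
func aggregatorSketch(r *RDB, qname, gname string) error {
	setID, err := r.AggregationCheck(qname, gname, r.clock.Now(), 1*time.Minute, 10*time.Minute, 100)
	if err != nil || setID == "" {
		return err // no aggregation set was created (or an error occurred)
	}
	msgs, _, err := r.ReadAggregationSet(qname, gname, setID)
	if err != nil {
		return err
	}
	_ = msgs // ... combine msgs into a single aggregated task and enqueue it ...
	// Delete the aggregation set once the aggregated task has been enqueued.
	return r.DeleteAggregationSet(context.Background(), qname, gname, setID)
}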
// KEYS[1] -> asynq:{<qname>}:completed
// ARGV[1] -> current time in unix time
// ARGV[2] -> task key prefix
// ARGV[3] -> batch size (i.e. maximum number of tasks to delete)
//
// Returns the number of tasks deleted.
var deleteExpiredCompletedTasksCmd = redis.NewScript(`
local ids = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, tonumber(ARGV[3]))
for _, id in ipairs(ids) do
	redis.call("DEL", ARGV[2] .. id)
	redis.call("ZREM", KEYS[1], id)
end
return table.getn(ids)`)

// DeleteExpiredCompletedTasks checks for any expired tasks in the given queue's completed set,
// and deletes all expired tasks.
func (r *RDB) DeleteExpiredCompletedTasks(qname string) error {
	// Note: Do this operation in fixed batches to prevent a long running script.
	const batchSize = 100
	for {
		n, err := r.deleteExpiredCompletedTasks(qname, batchSize)
		if err != nil {
			return err
		}
		if n == 0 {
			return nil
		}
	}
}

// deleteExpiredCompletedTasks runs the lua script to delete expired completed tasks with the specified
// batch size. It reports the number of tasks deleted.
func (r *RDB) deleteExpiredCompletedTasks(qname string, batchSize int) (int64, error) {
	var op errors.Op = "rdb.DeleteExpiredCompletedTasks"
	keys := []string{base.CompletedKey(qname)}
	argv := []interface{}{
		r.clock.Now().Unix(),
		base.TaskKeyPrefix(qname),
		batchSize,
	}
	res, err := deleteExpiredCompletedTasksCmd.Run(context.Background(), r.client, keys, argv...).Result()
	if err != nil {
		return 0, errors.E(op, errors.Internal, fmt.Sprintf("redis eval error: %v", err))
	}
	n, ok := res.(int64)
	if !ok {
		return 0, errors.E(op, errors.Internal, fmt.Sprintf("unexpected return value from Lua script: %v", res))
	}
	return n, nil
}

// KEYS[1] -> asynq:{<qname>}:lease
// ARGV[1] -> cutoff in unix time
// ARGV[2] -> task key prefix
var listLeaseExpiredCmd = redis.NewScript(`
local res = {}
local ids = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
for _, id in ipairs(ids) do
	local key = ARGV[2] .. id
	table.insert(res, redis.call("HGET", key, "msg"))
end
return res
`)

// ListLeaseExpired returns a list of task messages with an expired lease from the given queues.
func (r *RDB) ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*base.TaskMessage, error) {
	var op errors.Op = "rdb.ListLeaseExpired"
	var msgs []*base.TaskMessage
	for _, qname := range qnames {
		res, err := listLeaseExpiredCmd.Run(context.Background(), r.client,
			[]string{base.LeaseKey(qname)},
			cutoff.Unix(), base.TaskKeyPrefix(qname)).Result()
		if err != nil {
			return nil, errors.E(op, errors.Internal, fmt.Sprintf("redis eval error: %v", err))
		}
		data, err := cast.ToStringSliceE(res)
		if err != nil {
			return nil, errors.E(op, errors.Internal, fmt.Sprintf("cast error: Lua script returned unexpected value: %v", res))
		}
		for _, s := range data {
			msg, err := base.DecodeMessage([]byte(s))
			if err != nil {
				return nil, errors.E(op, errors.Internal, fmt.Sprintf("cannot decode message: %v", err))
			}
			msgs = append(msgs, msg)
		}
	}
	return msgs, nil
}

// ExtendLease extends the lease for the given tasks by LeaseDuration (30s).
// It returns a new expiration time if the operation was successful.
func (r *RDB) ExtendLease(qname string, ids ...string) (expirationTime time.Time, err error) {
	expireAt := r.clock.Now().Add(LeaseDuration)
	var zs []redis.Z
	for _, id := range ids {
		zs = append(zs, redis.Z{Member: id, Score: float64(expireAt.Unix())})
	}
	// Use XX option to only update elements that already exist; don't add new elements.
	// TODO: Consider adding GT option to ensure we only "extend" the lease. Caveat is that GT is supported from redis v6.2.0 or above.
	err = r.client.ZAddXX(context.Background(), base.LeaseKey(qname), zs...).Err()
	if err != nil {
		return time.Time{}, err
	}
	return expireAt, nil
}
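// The sketch below is illustrative only: a heartbeat goroutine that
// periodically extends the leases of the tasks a worker currently has in
// flight, so the recoverer does not reclaim them while they are still being
// processed. The interval is hypothetical; it just needs to be comfortably
// shorter than LeaseDuration, and activeIDs is a caller-supplied callback.
func leaseHeartbeatSketch(r *RDB, qname string, activeIDs func() []string, stop <-chan struct{}) {
	ticker := time.NewTicker(LeaseDuration / 3)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			if ids := activeIDs(); len(ids) > 0 {
				_, _ = r.ExtendLease(qname, ids...) // a real heartbeat would log errors
			}
		}
	}
}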
// KEYS[1] -> asynq:servers:{<host:pid:sid>}
// KEYS[2] -> asynq:workers:{<host:pid:sid>}
// ARGV[1] -> TTL in seconds
// ARGV[2] -> server info
// ARGV[3:] -> alternate key-value pair of (worker id, worker data)
// Note: Add key to ZSET with expiration time as score.
// ref: https://github.com/antirez/redis/issues/135#issuecomment-2361996
var writeServerStateCmd = redis.NewScript(`
redis.call("SETEX", KEYS[1], ARGV[1], ARGV[2])
redis.call("DEL", KEYS[2])
for i = 3, table.getn(ARGV)-1, 2 do
	redis.call("HSET", KEYS[2], ARGV[i], ARGV[i+1])
end
redis.call("EXPIRE", KEYS[2], ARGV[1])
return redis.status_reply("OK")`)

// WriteServerState writes server state data to redis with expiration set to the value ttl.
func (r *RDB) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo, ttl time.Duration) error {
	var op errors.Op = "rdb.WriteServerState"
	ctx := context.Background()
	bytes, err := base.EncodeServerInfo(info)
	if err != nil {
		return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode server info: %v", err))
	}
	exp := r.clock.Now().Add(ttl).UTC()
	args := []interface{}{ttl.Seconds(), bytes} // args to the lua script
	for _, w := range workers {
		bytes, err := base.EncodeWorkerInfo(w)
		if err != nil {
			continue // skip bad data
		}
		args = append(args, w.ID, bytes)
	}
	skey := base.ServerInfoKey(info.Host, info.PID, info.ServerID)
	wkey := base.WorkersKey(info.Host, info.PID, info.ServerID)
	if err := r.client.ZAdd(ctx, base.AllServers, redis.Z{Score: float64(exp.Unix()), Member: skey}).Err(); err != nil {
		return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "zadd", Err: err})
	}
	if err := r.client.ZAdd(ctx, base.AllWorkers, redis.Z{Score: float64(exp.Unix()), Member: wkey}).Err(); err != nil {
		return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "zadd", Err: err})
	}
	return r.runScript(ctx, op, writeServerStateCmd, []string{skey, wkey}, args...)
}

// KEYS[1] -> asynq:servers:{<host:pid:sid>}
// KEYS[2] -> asynq:workers:{<host:pid:sid>}
var clearServerStateCmd = redis.NewScript(`
redis.call("DEL", KEYS[1])
redis.call("DEL", KEYS[2])
return redis.status_reply("OK")`)

// ClearServerState deletes server state data from redis.
func (r *RDB) ClearServerState(host string, pid int, serverID string) error {
	var op errors.Op = "rdb.ClearServerState"
	ctx := context.Background()
	skey := base.ServerInfoKey(host, pid, serverID)
	wkey := base.WorkersKey(host, pid, serverID)
	if err := r.client.ZRem(ctx, base.AllServers, skey).Err(); err != nil {
		return errors.E(op, errors.Internal, &errors.RedisCommandError{Command: "zrem", Err: err})
	}
	if err := r.client.ZRem(ctx, base.AllWorkers, wkey).Err(); err != nil {
		return errors.E(op, errors.Internal, &errors.RedisCommandError{Command: "zrem", Err: err})
	}
	return r.runScript(ctx, op, clearServerStateCmd, []string{skey, wkey})
}

// KEYS[1] -> asynq:schedulers:{<schedulerID>}
// ARGV[1] -> TTL in seconds
// ARGV[2:] -> scheduler entries
var writeSchedulerEntriesCmd = redis.NewScript(`
redis.call("DEL", KEYS[1])
for i = 2, #ARGV do
	redis.call("LPUSH", KEYS[1], ARGV[i])
end
redis.call("EXPIRE", KEYS[1], ARGV[1])
return redis.status_reply("OK")`)

// WriteSchedulerEntries writes scheduler entries data to redis with expiration set to the value ttl.
func (r *RDB) WriteSchedulerEntries(schedulerID string, entries []*base.SchedulerEntry, ttl time.Duration) error {
	var op errors.Op = "rdb.WriteSchedulerEntries"
	ctx := context.Background()
	args := []interface{}{ttl.Seconds()}
	for _, e := range entries {
		bytes, err := base.EncodeSchedulerEntry(e)
		if err != nil {
			continue // skip bad data
		}
		args = append(args, bytes)
	}
	exp := r.clock.Now().Add(ttl).UTC()
	key := base.SchedulerEntriesKey(schedulerID)
	err := r.client.ZAdd(ctx, base.AllSchedulers, redis.Z{Score: float64(exp.Unix()), Member: key}).Err()
	if err != nil {
		return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "zadd", Err: err})
	}
	return r.runScript(ctx, op, writeSchedulerEntriesCmd, []string{key}, args...)
}

// ClearSchedulerEntries deletes scheduler entries data from redis.
func (r *RDB) ClearSchedulerEntries(schedulerID string) error {
	var op errors.Op = "rdb.ClearSchedulerEntries"
	ctx := context.Background()
	key := base.SchedulerEntriesKey(schedulerID)
	if err := r.client.ZRem(ctx, base.AllSchedulers, key).Err(); err != nil {
		return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "zrem", Err: err})
	}
	if err := r.client.Del(ctx, key).Err(); err != nil {
		return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "del", Err: err})
	}
	return nil
}

// CancelationPubSub returns a pubsub for cancelation messages.
func (r *RDB) CancelationPubSub() (*redis.PubSub, error) {
	var op errors.Op = "rdb.CancelationPubSub"
	ctx := context.Background()
	pubsub := r.client.Subscribe(ctx, base.CancelChannel)
	_, err := pubsub.Receive(ctx)
	if err != nil {
		return nil, errors.E(op, errors.Unknown, fmt.Sprintf("redis pubsub receive error: %v", err))
	}
	return pubsub, nil
}

// PublishCancelation publishes a cancelation message to all subscribers.
// The message is the ID for the task to be canceled.
func (r *RDB) PublishCancelation(id string) error {
	var op errors.Op = "rdb.PublishCancelation"
	ctx := context.Background()
	if err := r.client.Publish(ctx, base.CancelChannel, id).Err(); err != nil {
		return errors.E(op, errors.Unknown, fmt.Sprintf("redis pubsub publish error: %v", err))
	}
	return nil
}

// KEYS[1] -> asynq:scheduler_history:<entryID>
// ARGV[1] -> enqueued_at timestamp
// ARGV[2] -> serialized SchedulerEnqueueEvent data
// ARGV[3] -> max number of events to be persisted
var recordSchedulerEnqueueEventCmd = redis.NewScript(`
redis.call("ZREMRANGEBYRANK", KEYS[1], 0, -ARGV[3])
redis.call("ZADD", KEYS[1], ARGV[1], ARGV[2])
return redis.status_reply("OK")`)

// Maximum number of enqueue events to store per entry.
const maxEvents = 1000

// RecordSchedulerEnqueueEvent records the time when the given task was enqueued.
func (r *RDB) RecordSchedulerEnqueueEvent(entryID string, event *base.SchedulerEnqueueEvent) error {
	var op errors.Op = "rdb.RecordSchedulerEnqueueEvent"
	ctx := context.Background()
	data, err := base.EncodeSchedulerEnqueueEvent(event)
	if err != nil {
		return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode scheduler enqueue event: %v", err))
	}
	keys := []string{
		base.SchedulerHistoryKey(entryID),
	}
	argv := []interface{}{
		event.EnqueuedAt.Unix(),
		data,
		maxEvents,
	}
	return r.runScript(ctx, op, recordSchedulerEnqueueEventCmd, keys, argv...)
}

// ClearSchedulerHistory deletes the enqueue event history for the given scheduler entry.
func (r *RDB) ClearSchedulerHistory(entryID string) error {
	var op errors.Op = "rdb.ClearSchedulerHistory"
	ctx := context.Background()
	key := base.SchedulerHistoryKey(entryID)
	if err := r.client.Del(ctx, key).Err(); err != nil {
		return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "del", Err: err})
	}
	return nil
}

// WriteResult writes the given result data for the specified task.
func (r *RDB) WriteResult(qname, taskID string, data []byte) (int, error) {
	var op errors.Op = "rdb.WriteResult"
	ctx := context.Background()
	taskKey := base.TaskKey(qname, taskID)
	if err := r.client.HSet(ctx, taskKey, "result", data).Err(); err != nil {
		return 0, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "hset", Err: err})
	}
	return len(data), nil
}
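// The sketch below is illustrative only: a subscriber loop that listens for
// task cancelation messages published via PublishCancelation and invokes a
// caller-supplied cancel function for each received task ID.
func cancelationListenerSketch(r *RDB, cancel func(taskID string), stop <-chan struct{}) error {
	pubsub, err := r.CancelationPubSub()
	if err != nil {
		return err
	}
	defer pubsub.Close()
	ch := pubsub.Channel()
	for {
		select {
		case <-stop:
			return nil
		case m, ok := <-ch:
			if !ok {
				return nil
			}
			cancel(m.Payload) // the message payload is the task ID to cancel
		}
	}
}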