// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.

package rdb

import (
	"context"
	"fmt"
	"strings"
	"time"

	"github.com/go-redis/redis/v8"
	"github.com/hibiken/asynq/internal/base"
	"github.com/hibiken/asynq/internal/errors"
	"github.com/spf13/cast"
)

// AllQueues returns a list of all queue names.
func (r *RDB) AllQueues() ([]string, error) {
	return r.client.SMembers(context.Background(), base.AllQueues).Result()
}

// Stats represents a state of queues at a certain time.
type Stats struct {
	// Name of the queue (e.g. "default", "critical").
	Queue string
	// MemoryUsage is the total number of bytes the queue and its tasks require
	// to be stored in redis. It is an approximate memory usage value in bytes
	// since the value is computed by sampling.
	MemoryUsage int64
	// Paused indicates whether the queue is paused.
	// If true, tasks in the queue should not be processed.
	Paused bool
	// Size is the total number of tasks in the queue.
	Size int
	// Groups is the total number of groups in the queue.
	Groups int
	// Number of tasks in each state.
	Pending     int
	Active      int
	Scheduled   int
	Retry       int
	Archived    int
	Completed   int
	Aggregating int
	// Number of tasks processed within the current date.
	// The number includes both succeeded and failed tasks.
	Processed int
	// Number of tasks failed within the current date.
	Failed int
	// Total number of tasks processed (both succeeded and failed) from this queue.
	ProcessedTotal int
	// Total number of tasks failed.
	FailedTotal int
	// Latency of the queue, measured by the oldest pending task in the queue.
	Latency time.Duration
	// Time this stats was taken.
	Timestamp time.Time
}

// DailyStats holds aggregate data for a given day.
type DailyStats struct {
	// Name of the queue (e.g. "default", "critical").
	Queue string
	// Total number of tasks processed during the given day.
	// The number includes both succeeded and failed tasks.
	Processed int
	// Total number of tasks failed during the given day.
	Failed int
	// Date this stats was taken.
	Time time.Time
}

// KEYS[1] -> asynq:<qname>:pending
// KEYS[2] -> asynq:<qname>:active
// KEYS[3] -> asynq:<qname>:scheduled
// KEYS[4] -> asynq:<qname>:retry
// KEYS[5] -> asynq:<qname>:archived
// KEYS[6] -> asynq:<qname>:completed
// KEYS[7] -> asynq:<qname>:processed:<yyyy-mm-dd>
// KEYS[8] -> asynq:<qname>:failed:<yyyy-mm-dd>
// KEYS[9] -> asynq:<qname>:processed
// KEYS[10] -> asynq:<qname>:failed
// KEYS[11] -> asynq:<qname>:paused
// KEYS[12] -> asynq:<qname>:groups
// --------
// ARGV[1] -> task key prefix
// ARGV[2] -> group key prefix
var currentStatsCmd = redis.NewScript(`
local res = {}
local pendingTaskCount = redis.call("LLEN", KEYS[1])
table.insert(res, KEYS[1])
table.insert(res, pendingTaskCount)
table.insert(res, KEYS[2])
table.insert(res, redis.call("LLEN", KEYS[2]))
table.insert(res, KEYS[3])
table.insert(res, redis.call("ZCARD", KEYS[3]))
table.insert(res, KEYS[4])
table.insert(res, redis.call("ZCARD", KEYS[4]))
table.insert(res, KEYS[5])
table.insert(res, redis.call("ZCARD", KEYS[5]))
table.insert(res, KEYS[6])
table.insert(res, redis.call("ZCARD", KEYS[6]))
for i=7,10 do
	local count = 0
	local n = redis.call("GET", KEYS[i])
	if n then
		count = tonumber(n)
	end
	table.insert(res, KEYS[i])
	table.insert(res, count)
end
table.insert(res, KEYS[11])
table.insert(res, redis.call("EXISTS", KEYS[11]))
table.insert(res, "oldest_pending_since")
if pendingTaskCount > 0 then
	local id = redis.call("LRANGE", KEYS[1], -1, -1)[1]
	table.insert(res, redis.call("HGET", ARGV[1] .. id, "pending_since"))
id, "pending_since"))else table.insert(res, 0)endlocal group_names = redis.call("SMEMBERS", KEYS[12])table.insert(res, "group_size")table.insert(res, table.getn(group_names))local aggregating_count = 0for _, gname in ipairs(group_names) do aggregating_count = aggregating_count + redis.call("ZCARD", ARGV[2] .. gname)endtable.insert(res, "aggregating_count")table.insert(res, aggregating_count)return res`)// CurrentStats returns a current state of the queues.func ( *RDB) ( string) (*Stats, error) {varerrors.Op = "rdb.CurrentStats" , := .queueExists()if != nil {returnnil, errors.E(, errors.Unknown, ) }if ! {returnnil, errors.E(, errors.NotFound, &errors.QueueNotFoundError{Queue: }) } := .clock.Now() := []string{base.PendingKey(),base.ActiveKey(),base.ScheduledKey(),base.RetryKey(),base.ArchivedKey(),base.CompletedKey(),base.ProcessedKey(, ),base.FailedKey(, ),base.ProcessedTotalKey(),base.FailedTotalKey(),base.PausedKey(),base.AllGroups(), } := []interface{}{base.TaskKeyPrefix(),base.GroupKeyPrefix(), } , := currentStatsCmd.Run(context.Background(), .client, , ...).Result()if != nil {returnnil, errors.E(, errors.Unknown, ) } , := cast.ToSliceE()if != nil {returnnil, errors.E(, errors.Internal, "cast error: unexpected return value from Lua script") } := &Stats{Queue: ,Timestamp: , } := 0for := 0; < len(); += 2 { := cast.ToString([]) := cast.ToInt([+1])switch {casebase.PendingKey(): .Pending = += casebase.ActiveKey(): .Active = += casebase.ScheduledKey(): .Scheduled = += casebase.RetryKey(): .Retry = += casebase.ArchivedKey(): .Archived = += casebase.CompletedKey(): .Completed = += casebase.ProcessedKey(, ): .Processed = casebase.FailedKey(, ): .Failed = casebase.ProcessedTotalKey(): .ProcessedTotal = casebase.FailedTotalKey(): .FailedTotal = casebase.PausedKey():if == 0 { .Paused = false } else { .Paused = true }case"oldest_pending_since":if == 0 { .Latency = 0 } else { .Latency = .clock.Now().Sub(time.Unix(0, int64())) }case"group_size": .Groups = case"aggregating_count": .Aggregating = += } } .Size = , := .memoryUsage()if != nil {returnnil, errors.E(, errors.CanonicalCode(), ) } .MemoryUsage = return , nil}// Computes memory usage for the given queue by sampling tasks// from each redis list/zset. Returns approximate memory usage value// in bytes.//// KEYS[1] -> asynq:{qname}:active// KEYS[2] -> asynq:{qname}:pending// KEYS[3] -> asynq:{qname}:scheduled// KEYS[4] -> asynq:{qname}:retry// KEYS[5] -> asynq:{qname}:archived// KEYS[6] -> asynq:{qname}:completed// KEYS[7] -> asynq:{qname}:groups// -------// ARGV[1] -> asynq:{qname}:t: (task key prefix)// ARGV[2] -> task sample size per redis list/zset (e.g 20)// ARGV[3] -> group sample size// ARGV[4] -> asynq:{qname}:g: (group key prefix)var memoryUsageCmd = redis.NewScript(`local sample_size = tonumber(ARGV[2])if sample_size <= 0 then return redis.error_reply("sample size must be a positive number")endlocal memusg = 0for i=1,2 do local ids = redis.call("LRANGE", KEYS[i], 0, sample_size - 1) local sample_total = 0 if (table.getn(ids) > 0) then for _, id in ipairs(ids) do local bytes = redis.call("MEMORY", "USAGE", ARGV[1] .. 
			sample_total = sample_total + bytes
		end
		local n = redis.call("LLEN", KEYS[i])
		local avg = sample_total / table.getn(ids)
		memusg = memusg + (avg * n)
	end
	local m = redis.call("MEMORY", "USAGE", KEYS[i])
	if (m) then
		memusg = memusg + m
	end
end
for i=3,6 do
	local ids = redis.call("ZRANGE", KEYS[i], 0, sample_size - 1)
	local sample_total = 0
	if (table.getn(ids) > 0) then
		for _, id in ipairs(ids) do
			local bytes = redis.call("MEMORY", "USAGE", ARGV[1] .. id)
			sample_total = sample_total + bytes
		end
		local n = redis.call("ZCARD", KEYS[i])
		local avg = sample_total / table.getn(ids)
		memusg = memusg + (avg * n)
	end
	local m = redis.call("MEMORY", "USAGE", KEYS[i])
	if (m) then
		memusg = memusg + m
	end
end
local groups = redis.call("SMEMBERS", KEYS[7])
if table.getn(groups) > 0 then
	local agg_task_count = 0
	local agg_task_sample_total = 0
	local agg_task_sample_size = 0
	for i, gname in ipairs(groups) do
		local group_key = ARGV[4] .. gname
		agg_task_count = agg_task_count + redis.call("ZCARD", group_key)
		if i <= tonumber(ARGV[3]) then
			local ids = redis.call("ZRANGE", group_key, 0, sample_size - 1)
			for _, id in ipairs(ids) do
				local bytes = redis.call("MEMORY", "USAGE", ARGV[1] .. id)
				agg_task_sample_total = agg_task_sample_total + bytes
				agg_task_sample_size = agg_task_sample_size + 1
			end
		end
	end
	local avg = agg_task_sample_total / agg_task_sample_size
	memusg = memusg + (avg * agg_task_count)
end
return memusg`)

func (r *RDB) memoryUsage(qname string) (int64, error) {
	var op errors.Op = "rdb.memoryUsage"
	const (
		sampleSize      = 20
		groupSampleSize = 5
	)
	keys := []string{
		base.ActiveKey(qname),
		base.PendingKey(qname),
		base.ScheduledKey(qname),
		base.RetryKey(qname),
		base.ArchivedKey(qname),
		base.CompletedKey(qname),
		base.AllGroups(qname),
	}
	argv := []interface{}{
		base.TaskKeyPrefix(qname),
		sampleSize,
		groupSampleSize,
		base.GroupKeyPrefix(qname),
	}
	res, err := memoryUsageCmd.Run(context.Background(), r.client, keys, argv...).Result()
	if err != nil {
		return 0, errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err))
	}
	usg, err := cast.ToInt64E(res)
	if err != nil {
		return 0, errors.E(op, errors.Internal, "could not cast script return value to int64")
	}
	return usg, nil
}

var historicalStatsCmd = redis.NewScript(`
local res = {}
for _, key in ipairs(KEYS) do
	local n = redis.call("GET", key)
	if not n then
		n = 0
	end
	table.insert(res, tonumber(n))
end
return res`)

// HistoricalStats returns a list of stats from the last n days for the given queue.
func (r *RDB) HistoricalStats(qname string, n int) ([]*DailyStats, error) {
	var op errors.Op = "rdb.HistoricalStats"
	if n < 1 {
		return nil, errors.E(op, errors.FailedPrecondition, "the number of days must be positive")
	}
	exists, err := r.queueExists(qname)
	if err != nil {
		return nil, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: err})
	}
	if !exists {
		return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
	}
	const day = 24 * time.Hour
	now := r.clock.Now().UTC()
	var days []time.Time
	var keys []string
	for i := 0; i < n; i++ {
		ts := now.Add(-time.Duration(i) * day)
		days = append(days, ts)
		keys = append(keys, base.ProcessedKey(qname, ts))
		keys = append(keys, base.FailedKey(qname, ts))
	}
	res, err := historicalStatsCmd.Run(context.Background(), r.client, keys).Result()
	if err != nil {
		return nil, errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err))
	}
	data, err := cast.ToIntSliceE(res)
	if err != nil {
		return nil, errors.E(op, errors.Internal, fmt.Sprintf("cast error: unexpected return value from Lua script: %v", res))
	}
	var stats []*DailyStats
	for i := 0; i < len(data); i += 2 {
		stats = append(stats, &DailyStats{
			Queue:     qname,
			Processed: data[i],
			Failed:    data[i+1],
			Time:      days[i/2],
		})
	}
	return stats, nil
}

// RedisInfo returns a map of redis info.
func (r *RDB) RedisInfo() (map[string]string, error) {
	res, err := r.client.Info(context.Background()).Result()
	if err != nil {
		return nil, err
	}
	return parseInfo(res)
}

// RedisClusterInfo returns a map of redis cluster info.
func (r *RDB) RedisClusterInfo() (map[string]string, error) {
	res, err := r.client.ClusterInfo(context.Background()).Result()
	if err != nil {
		return nil, err
	}
	return parseInfo(res)
}

func parseInfo(infoStr string) (map[string]string, error) {
	info := make(map[string]string)
	lines := strings.Split(infoStr, "\r\n")
	for _, l := range lines {
		kv := strings.Split(l, ":")
		if len(kv) == 2 {
			info[kv[0]] = kv[1]
		}
	}
	return info, nil
}

// TODO: Use generics once available.
func reverse(x []*base.TaskInfo) {
	for i := len(x)/2 - 1; i >= 0; i-- {
		opp := len(x) - 1 - i
		x[i], x[opp] = x[opp], x[i]
	}
}

// checkQueueExists verifies whether the queue exists.
// It returns QueueNotFoundError if queue doesn't exist.
func (r *RDB) checkQueueExists(qname string) error {
	exists, err := r.queueExists(qname)
	if err != nil {
		return errors.E(errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: err})
	}
	if !exists {
		return errors.E(errors.Internal, &errors.QueueNotFoundError{Queue: qname})
	}
	return nil
}

// Input:
// KEYS[1] -> task key (asynq:{<qname>}:t:<taskid>)
// ARGV[1] -> task id
// ARGV[2] -> current time in Unix time (seconds)
// ARGV[3] -> queue key prefix (asynq:{<qname>}:)
//
// Output:
// Tuple of {msg, state, nextProcessAt, result}
// msg: encoded task message
// state: string describing the state of the task
// nextProcessAt: unix time in seconds, zero if not applicable.
// result: result data associated with the task
//
// If the task key doesn't exist, it returns error with a message "NOT FOUND"
var getTaskInfoCmd = redis.NewScript(`
	if redis.call("EXISTS", KEYS[1]) == 0 then
		return redis.error_reply("NOT FOUND")
	end
	local msg, state, result = unpack(redis.call("HMGET", KEYS[1], "msg", "state", "result"))
	if state == "scheduled" or state == "retry" then
		return {msg, state, redis.call("ZSCORE", ARGV[3] .. state, ARGV[1]), result}
	end
	if state == "pending" then
		return {msg, state, ARGV[2], result}
	end
	return {msg, state, 0, result}`)

// GetTaskInfo returns a TaskInfo describing the task from the given queue.
func (r *RDB) GetTaskInfo(qname, id string) (*base.TaskInfo, error) {
	var op errors.Op = "rdb.GetTaskInfo"
	if err := r.checkQueueExists(qname); err != nil {
		return nil, errors.E(op, errors.CanonicalCode(err), err)
	}
	keys := []string{base.TaskKey(qname, id)}
	argv := []interface{}{
		id,
		r.clock.Now().Unix(),
		base.QueueKeyPrefix(qname),
	}
	res, err := getTaskInfoCmd.Run(context.Background(), r.client, keys, argv...).Result()
	if err != nil {
		if err.Error() == "NOT FOUND" {
			return nil, errors.E(op, errors.NotFound, &errors.TaskNotFoundError{Queue: qname, ID: id})
		}
		return nil, errors.E(op, errors.Unknown, err)
	}
	vals, err := cast.ToSliceE(res)
	if err != nil {
		return nil, errors.E(op, errors.Internal, "unexpected value returned from Lua script")
	}
	if len(vals) != 4 {
		return nil, errors.E(op, errors.Internal, "unexpected number of values returned from Lua script")
	}
	encoded, err := cast.ToStringE(vals[0])
	if err != nil {
		return nil, errors.E(op, errors.Internal, "unexpected value returned from Lua script")
	}
	stateStr, err := cast.ToStringE(vals[1])
	if err != nil {
		return nil, errors.E(op, errors.Internal, "unexpected value returned from Lua script")
	}
	processAtUnix, err := cast.ToInt64E(vals[2])
	if err != nil {
		return nil, errors.E(op, errors.Internal, "unexpected value returned from Lua script")
	}
	resultStr, err := cast.ToStringE(vals[3])
	if err != nil {
		return nil, errors.E(op, errors.Internal, "unexpected value returned from Lua script")
	}
	msg, err := base.DecodeMessage([]byte(encoded))
	if err != nil {
		return nil, errors.E(op, errors.Internal, "could not decode task message")
	}
	state, err := base.TaskStateFromString(stateStr)
	if err != nil {
		return nil, errors.E(op, errors.CanonicalCode(err), err)
	}
	var nextProcessAt time.Time
	if processAtUnix != 0 {
		nextProcessAt = time.Unix(processAtUnix, 0)
	}
	var result []byte
	if len(resultStr) > 0 {
		result = []byte(resultStr)
	}
	return &base.TaskInfo{
		Message:       msg,
		State:         state,
		NextProcessAt: nextProcessAt,
		Result:        result,
	}, nil
}

type GroupStat struct {
	// Name of the group.
	Group string

	// Size of the group.
	Size int
}

// KEYS[1] -> asynq:{<qname>}:groups
// -------
// ARGV[1] -> group key prefix
//
// Output:
// list of group name and size (e.g. group1 size1 group2 size2 ...)
//
// Time Complexity:
// O(N) where N being the number of groups in the given queue.
var groupStatsCmd = redis.NewScript(`
local res = {}
local group_names = redis.call("SMEMBERS", KEYS[1])
for _, gname in ipairs(group_names) do
	local size = redis.call("ZCARD", ARGV[1] .. gname)
	table.insert(res, gname)
	table.insert(res, size)
end
return res`)

func (r *RDB) GroupStats(qname string) ([]*GroupStat, error) {
	var op errors.Op = "RDB.GroupStats"
	keys := []string{base.AllGroups(qname)}
	argv := []interface{}{base.GroupKeyPrefix(qname)}
	res, err := groupStatsCmd.Run(context.Background(), r.client, keys, argv...).Result()
	if err != nil {
		return nil, errors.E(op, errors.Unknown, err)
	}
	data, err := cast.ToSliceE(res)
	if err != nil {
		return nil, errors.E(op, errors.Internal, "cast error: unexpected return value from Lua script")
	}
	var stats []*GroupStat
	for i := 0; i < len(data); i += 2 {
		stats = append(stats, &GroupStat{
			Group: data[i].(string),
			Size:  int(data[i+1].(int64)),
		})
	}
	return stats, nil
}

// Pagination specifies the page size and page number
// for the list operation.
type Pagination struct {
	// Number of items in the page.
	Size int

	// Page number starting from zero.
	Page int
}

func (p Pagination) start() int64 {
	return int64(p.Size * p.Page)
}

func (p Pagination) stop() int64 {
	return int64(p.Size*p.Page + p.Size - 1)
}

// ListPending returns pending tasks that are ready to be processed.
func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskInfo, error) {
	var op errors.Op = "rdb.ListPending"
	exists, err := r.queueExists(qname)
	if err != nil {
		return nil, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: err})
	}
	if !exists {
		return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
	}
	res, err := r.listMessages(qname, base.TaskStatePending, pgn)
	if err != nil {
		return nil, errors.E(op, errors.CanonicalCode(err), err)
	}
	return res, nil
}

// ListActive returns all tasks that are currently being processed for the given queue.
func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskInfo, error) {
	var op errors.Op = "rdb.ListActive"
	exists, err := r.queueExists(qname)
	if err != nil {
		return nil, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: err})
	}
	if !exists {
		return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
	}
	res, err := r.listMessages(qname, base.TaskStateActive, pgn)
	if err != nil {
		return nil, errors.E(op, errors.CanonicalCode(err), err)
	}
	return res, nil
}

// KEYS[1] -> key for id list (e.g. asynq:{<qname>}:pending)
// ARGV[1] -> start offset
// ARGV[2] -> stop offset
// ARGV[3] -> task key prefix
var listMessagesCmd = redis.NewScript(`
local ids = redis.call("LRange", KEYS[1], ARGV[1], ARGV[2])
local data = {}
for _, id in ipairs(ids) do
	local key = ARGV[3] .. id
	local msg, result = unpack(redis.call("HMGET", key, "msg", "result"))
	table.insert(data, msg)
	table.insert(data, result)
end
return data`)

// listMessages returns a list of TaskInfo in Redis list with the given key.
func (r *RDB) listMessages(qname string, state base.TaskState, pgn Pagination) ([]*base.TaskInfo, error) {
	var key string
	switch state {
	case base.TaskStateActive:
		key = base.ActiveKey(qname)
	case base.TaskStatePending:
		key = base.PendingKey(qname)
	default:
		panic(fmt.Sprintf("unsupported task state: %v", state))
	}
	// Note: Because we use LPUSH to redis list, we need to calculate the
	// correct range and reverse the list to get the tasks with pagination.
	stop := -pgn.start() - 1
	start := -pgn.stop() - 1
	res, err := listMessagesCmd.Run(context.Background(), r.client,
		[]string{key}, start, stop, base.TaskKeyPrefix(qname)).Result()
	if err != nil {
		return nil, errors.E(errors.Unknown, err)
	}
	data, err := cast.ToStringSliceE(res)
	if err != nil {
		return nil, errors.E(errors.Internal, fmt.Errorf("cast error: Lua script returned unexpected value: %v", res))
	}
	var tasks []*base.TaskInfo
	for i := 0; i < len(data); i += 2 {
		msg, err := base.DecodeMessage([]byte(data[i]))
		if err != nil {
			continue // bad data, ignore and continue
		}
		var result []byte
		if len(data[i+1]) > 0 {
			result = []byte(data[i+1])
		}
		var nextProcessAt time.Time
		if state == base.TaskStatePending {
			nextProcessAt = r.clock.Now()
		}
		tasks = append(tasks, &base.TaskInfo{
			Message:       msg,
			State:         state,
			NextProcessAt: nextProcessAt,
			Result:        result,
		})
	}
	reverse(tasks)
	return tasks, nil
}

// ListScheduled returns all tasks from the given queue that are scheduled
// to be processed in the future.
func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]*base.TaskInfo, error) {
	var op errors.Op = "rdb.ListScheduled"
	exists, err := r.queueExists(qname)
	if err != nil {
		return nil, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: err})
	}
	if !exists {
		return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
	}
	res, err := r.listZSetEntries(qname, base.TaskStateScheduled, base.ScheduledKey(qname), pgn)
	if err != nil {
		return nil, errors.E(op, errors.CanonicalCode(err), err)
	}
	return res, nil
}

// ListRetry returns all tasks from the given queue that have failed before
// and will be retried in the future.
func (r *RDB) ListRetry(qname string, pgn Pagination) ([]*base.TaskInfo, error) {
	var op errors.Op = "rdb.ListRetry"
	exists, err := r.queueExists(qname)
	if err != nil {
		return nil, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: err})
	}
	if !exists {
		return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
	}
	res, err := r.listZSetEntries(qname, base.TaskStateRetry, base.RetryKey(qname), pgn)
	if err != nil {
		return nil, errors.E(op, errors.CanonicalCode(err), err)
	}
	return res, nil
}

// ListArchived returns all tasks from the given queue that have exhausted its retry limit.
func (r *RDB) ListArchived(qname string, pgn Pagination) ([]*base.TaskInfo, error) {
	var op errors.Op = "rdb.ListArchived"
	exists, err := r.queueExists(qname)
	if err != nil {
		return nil, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: err})
	}
	if !exists {
		return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
	}
	res, err := r.listZSetEntries(qname, base.TaskStateArchived, base.ArchivedKey(qname), pgn)
	if err != nil {
		return nil, errors.E(op, errors.CanonicalCode(err), err)
	}
	return res, nil
}

// ListCompleted returns all tasks from the given queue that have completed successfully.
func (r *RDB) ListCompleted(qname string, pgn Pagination) ([]*base.TaskInfo, error) {
	var op errors.Op = "rdb.ListCompleted"
	exists, err := r.queueExists(qname)
	if err != nil {
		return nil, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: err})
	}
	if !exists {
		return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
	}
	res, err := r.listZSetEntries(qname, base.TaskStateCompleted, base.CompletedKey(qname), pgn)
	if err != nil {
		return nil, errors.E(op, errors.CanonicalCode(err), err)
	}
	return res, nil
}

// ListAggregating returns all tasks from the given group.
func (r *RDB) ListAggregating(qname, gname string, pgn Pagination) ([]*base.TaskInfo, error) {
	var op errors.Op = "rdb.ListAggregating"
	exists, err := r.queueExists(qname)
	if err != nil {
		return nil, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: err})
	}
	if !exists {
		return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
	}
	res, err := r.listZSetEntries(qname, base.TaskStateAggregating, base.GroupKey(qname, gname), pgn)
	if err != nil {
		return nil, errors.E(op, errors.CanonicalCode(err), err)
	}
	return res, nil
}

// Reports whether a queue with the given name exists.
func (r *RDB) queueExists(qname string) (bool, error) {
	return r.client.SIsMember(context.Background(), base.AllQueues, qname).Result()
}

// KEYS[1] -> key for ids set (e.g. asynq:{<qname>}:scheduled)
// ARGV[1] -> min
// ARGV[2] -> max
// ARGV[3] -> task key prefix
//
// Returns an array populated with
// [msg1, score1, result1, msg2, score2, result2, ..., msgN, scoreN, resultN]
var listZSetEntriesCmd = redis.NewScript(`
local data = {}
local id_score_pairs = redis.call("ZRANGE", KEYS[1], ARGV[1], ARGV[2], "WITHSCORES")
for i = 1, table.getn(id_score_pairs), 2 do
	local id = id_score_pairs[i]
	local score = id_score_pairs[i+1]
	local key = ARGV[3] .. id
	local msg, res = unpack(redis.call("HMGET", key, "msg", "result"))
	table.insert(data, msg)
	table.insert(data, score)
	table.insert(data, res)
end
return data`)

// listZSetEntries returns a list of message and score pairs in Redis sorted-set
// with the given key.
func (r *RDB) listZSetEntries(qname string, state base.TaskState, key string, pgn Pagination) ([]*base.TaskInfo, error) {
	res, err := listZSetEntriesCmd.Run(context.Background(), r.client, []string{key},
		pgn.start(), pgn.stop(), base.TaskKeyPrefix(qname)).Result()
	if err != nil {
		return nil, errors.E(errors.Unknown, err)
	}
	data, err := cast.ToSliceE(res)
	if err != nil {
		return nil, errors.E(errors.Internal, fmt.Errorf("cast error: Lua script returned unexpected value: %v", res))
	}
	var tasks []*base.TaskInfo
	for i := 0; i < len(data); i += 3 {
		encoded, err := cast.ToStringE(data[i])
		if err != nil {
			return nil, errors.E(errors.Internal, fmt.Errorf("cast error: Lua script returned unexpected value: %v", res))
		}
		score, err := cast.ToInt64E(data[i+1])
		if err != nil {
			return nil, errors.E(errors.Internal, fmt.Errorf("cast error: Lua script returned unexpected value: %v", res))
		}
		resultStr, err := cast.ToStringE(data[i+2])
		if err != nil {
			return nil, errors.E(errors.Internal, fmt.Errorf("cast error: Lua script returned unexpected value: %v", res))
		}
		msg, err := base.DecodeMessage([]byte(encoded))
		if err != nil {
			continue // bad data, ignore and continue
		}
		var nextProcessAt time.Time
		if state == base.TaskStateScheduled || state == base.TaskStateRetry {
			nextProcessAt = time.Unix(score, 0)
		}
		var result []byte
		if len(resultStr) > 0 {
			result = []byte(resultStr)
		}
		tasks = append(tasks, &base.TaskInfo{
			Message:       msg,
			State:         state,
			NextProcessAt: nextProcessAt,
			Result:        result,
		})
	}
	return tasks, nil
}

// RunAllScheduledTasks enqueues all scheduled tasks from the given queue
// and returns the number of tasks enqueued.
// If a queue with the given name doesn't exist, it returns QueueNotFoundError.
func (r *RDB) RunAllScheduledTasks(qname string) (int64, error) {
	var op errors.Op = "rdb.RunAllScheduledTasks"
	n, err := r.runAll(base.ScheduledKey(qname), qname)
	if errors.IsQueueNotFound(err) {
		return 0, errors.E(op, errors.NotFound, err)
	}
	if err != nil {
		return 0, errors.E(op, errors.Unknown, err)
	}
	return n, nil
}

// RunAllRetryTasks enqueues all retry tasks from the given queue
// and returns the number of tasks enqueued.
// If a queue with the given name doesn't exist, it returns QueueNotFoundError.
func (r *RDB) RunAllRetryTasks(qname string) (int64, error) {
	var op errors.Op = "rdb.RunAllRetryTasks"
	n, err := r.runAll(base.RetryKey(qname), qname)
	if errors.IsQueueNotFound(err) {
		return 0, errors.E(op, errors.NotFound, err)
	}
	if err != nil {
		return 0, errors.E(op, errors.Unknown, err)
	}
	return n, nil
}

// RunAllArchivedTasks enqueues all archived tasks from the given queue
// and returns the number of tasks enqueued.
// If a queue with the given name doesn't exist, it returns QueueNotFoundError.
func (r *RDB) RunAllArchivedTasks(qname string) (int64, error) {
	var op errors.Op = "rdb.RunAllArchivedTasks"
	n, err := r.runAll(base.ArchivedKey(qname), qname)
	if errors.IsQueueNotFound(err) {
		return 0, errors.E(op, errors.NotFound, err)
	}
	if err != nil {
		return 0, errors.E(op, errors.Unknown, err)
	}
	return n, nil
}

// runAllAggregatingCmd schedules all tasks in the group to run individually.
//
// Input:
// KEYS[1] -> asynq:{<qname>}:g:<gname>
// KEYS[2] -> asynq:{<qname>}:pending
// KEYS[3] -> asynq:{<qname>}:groups
// -------
// ARGV[1] -> task key prefix
// ARGV[2] -> group name
//
// Output:
// integer: number of tasks scheduled to run
var runAllAggregatingCmd = redis.NewScript(`
local ids = redis.call("ZRANGE", KEYS[1], 0, -1)
for _, id in ipairs(ids) do
	redis.call("LPUSH", KEYS[2], id)
	redis.call("HSET", ARGV[1] .. id, "state", "pending")
id, "state", "pending")endredis.call("DEL", KEYS[1])redis.call("SREM", KEYS[3], ARGV[2])return table.getn(ids)`)// RunAllAggregatingTasks schedules all tasks from the given queue to run// and returns the number of tasks scheduled to run.// If a queue with the given name doesn't exist, it returns QueueNotFoundError.func ( *RDB) (, string) (int64, error) {varerrors.Op = "rdb.RunAllAggregatingTasks"if := .checkQueueExists(); != nil {return0, errors.E(, errors.CanonicalCode(), ) } := []string{base.GroupKey(, ),base.PendingKey(),base.AllGroups(), } := []interface{}{base.TaskKeyPrefix(), , } , := runAllAggregatingCmd.Run(context.Background(), .client, , ...).Result()if != nil {return0, errors.E(, errors.Internal, ) } , := .(int64)if ! {return0, errors.E(, errors.Internal, fmt.Sprintf("unexpected return value from script %v", )) }return , nil}// runTaskCmd is a Lua script that updates the given task to pending state.//// Input:// KEYS[1] -> asynq:{<qname>}:t:<task_id>// KEYS[2] -> asynq:{<qname>}:pending// KEYS[3] -> asynq:{<qname>}:groups// --// ARGV[1] -> task ID// ARGV[2] -> queue key prefix; asynq:{<qname>}:// ARGV[3] -> group key prefix//// Output:// Numeric code indicating the status:// Returns 1 if task is successfully updated.// Returns 0 if task is not found.// Returns -1 if task is in active state.// Returns -2 if task is in pending state.// Returns error reply if unexpected error occurs.var runTaskCmd = redis.NewScript(`if redis.call("EXISTS", KEYS[1]) == 0 then return 0endlocal state, group = unpack(redis.call("HMGET", KEYS[1], "state", "group"))if state == "active" then return -1elseif state == "pending" then return -2elseif state == "aggregating" then local n = redis.call("ZREM", ARGV[3] .. group, ARGV[1]) if n == 0 then return redis.error_reply("internal error: task id not found in zset " .. tostring(ARGV[3] .. group)) end if redis.call("ZCARD", ARGV[3] .. group) == 0 then redis.call("SREM", KEYS[3], group) endelse local n = redis.call("ZREM", ARGV[2] .. state, ARGV[1]) if n == 0 then return redis.error_reply("internal error: task id not found in zset " .. tostring(ARGV[2] .. state)) endendredis.call("LPUSH", KEYS[2], ARGV[1])redis.call("HSET", KEYS[1], "state", "pending")return 1`)// RunTask finds a task that matches the id from the given queue and updates it to pending state.// It returns nil if it successfully updated the task.//// If a queue with the given name doesn't exist, it returns QueueNotFoundError.// If a task with the given id doesn't exist in the queue, it returns TaskNotFoundError// If a task is in active or pending state it returns non-nil error with Code FailedPrecondition.func ( *RDB) (, string) error {varerrors.Op = "rdb.RunTask"if := .checkQueueExists(); != nil {returnerrors.E(, errors.CanonicalCode(), ) } := []string{base.TaskKey(, ),base.PendingKey(),base.AllGroups(), } := []interface{}{ ,base.QueueKeyPrefix(),base.GroupKeyPrefix(), } , := runTaskCmd.Run(context.Background(), .client, , ...).Result()if != nil {returnerrors.E(, errors.Unknown, ) } , := .(int64)if ! 
		return errors.E(op, errors.Internal, fmt.Sprintf("cast error: unexpected return value from Lua script: %v", res))
	}
	switch n {
	case 1:
		return nil
	case 0:
		return errors.E(op, errors.NotFound, &errors.TaskNotFoundError{Queue: qname, ID: id})
	case -1:
		return errors.E(op, errors.FailedPrecondition, "task is already running")
	case -2:
		return errors.E(op, errors.FailedPrecondition, "task is already in pending state")
	default:
		return errors.E(op, errors.Internal, fmt.Sprintf("unexpected return value from Lua script %d", n))
	}
}

// runAllCmd is a Lua script that moves all tasks in the given state
// (one of: scheduled, retry, archived) to pending state.
//
// Input:
// KEYS[1] -> zset which holds task ids (e.g. asynq:{<qname>}:scheduled)
// KEYS[2] -> asynq:{<qname>}:pending
// --
// ARGV[1] -> task key prefix
//
// Output:
// integer: number of tasks updated to pending state.
var runAllCmd = redis.NewScript(`
local ids = redis.call("ZRANGE", KEYS[1], 0, -1)
for _, id in ipairs(ids) do
	redis.call("LPUSH", KEYS[2], id)
	redis.call("HSET", ARGV[1] .. id, "state", "pending")
end
redis.call("DEL", KEYS[1])
return table.getn(ids)`)

func (r *RDB) runAll(zset, qname string) (int64, error) {
	if err := r.checkQueueExists(qname); err != nil {
		return 0, err
	}
	keys := []string{
		zset,
		base.PendingKey(qname),
	}
	argv := []interface{}{
		base.TaskKeyPrefix(qname),
	}
	res, err := runAllCmd.Run(context.Background(), r.client, keys, argv...).Result()
	if err != nil {
		return 0, err
	}
	n, ok := res.(int64)
	if !ok {
		return 0, fmt.Errorf("could not cast %v to int64", res)
	}
	if n == -1 {
		return 0, &errors.QueueNotFoundError{Queue: qname}
	}
	return n, nil
}

// ArchiveAllRetryTasks archives all retry tasks from the given queue and
// returns the number of tasks that were moved.
// If a queue with the given name doesn't exist, it returns QueueNotFoundError.
func (r *RDB) ArchiveAllRetryTasks(qname string) (int64, error) {
	var op errors.Op = "rdb.ArchiveAllRetryTasks"
	n, err := r.archiveAll(base.RetryKey(qname), base.ArchivedKey(qname), qname)
	if errors.IsQueueNotFound(err) {
		return 0, errors.E(op, errors.NotFound, err)
	}
	if err != nil {
		return 0, errors.E(op, errors.Internal, err)
	}
	return n, nil
}

// ArchiveAllScheduledTasks archives all scheduled tasks from the given queue and
// returns the number of tasks that were moved.
// If a queue with the given name doesn't exist, it returns QueueNotFoundError.
func (r *RDB) ArchiveAllScheduledTasks(qname string) (int64, error) {
	var op errors.Op = "rdb.ArchiveAllScheduledTasks"
	n, err := r.archiveAll(base.ScheduledKey(qname), base.ArchivedKey(qname), qname)
	if errors.IsQueueNotFound(err) {
		return 0, errors.E(op, errors.NotFound, err)
	}
	if err != nil {
		return 0, errors.E(op, errors.Internal, err)
	}
	return n, nil
}

// archiveAllAggregatingCmd archives all tasks in the given group.
//
// Input:
// KEYS[1] -> asynq:{<qname>}:g:<gname>
// KEYS[2] -> asynq:{<qname>}:archived
// KEYS[3] -> asynq:{<qname>}:groups
// -------
// ARGV[1] -> current timestamp
// ARGV[2] -> cutoff timestamp (e.g., 90 days ago)
// ARGV[3] -> max number of tasks in archive (e.g., 100)
// ARGV[4] -> task key prefix (asynq:{<qname>}:t:)
// ARGV[5] -> group name
//
// Output:
// integer: Number of tasks archived
var archiveAllAggregatingCmd = redis.NewScript(`
local ids = redis.call("ZRANGE", KEYS[1], 0, -1)
for _, id in ipairs(ids) do
	redis.call("ZADD", KEYS[2], ARGV[1], id)
	redis.call("HSET", ARGV[4] .. id, "state", "archived")
id, "state", "archived")endredis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[2])redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[3])redis.call("DEL", KEYS[1])redis.call("SREM", KEYS[3], ARGV[5])return table.getn(ids)`)// ArchiveAllAggregatingTasks archives all aggregating tasks from the given group// and returns the number of tasks archived.// If a queue with the given name doesn't exist, it returns QueueNotFoundError.func ( *RDB) (, string) (int64, error) {varerrors.Op = "rdb.ArchiveAllAggregatingTasks"if := .checkQueueExists(); != nil {return0, errors.E(, errors.CanonicalCode(), ) } := []string{base.GroupKey(, ),base.ArchivedKey(),base.AllGroups(), } := .clock.Now() := []interface{}{ .Unix(), .AddDate(0, 0, -archivedExpirationInDays).Unix(),maxArchiveSize,base.TaskKeyPrefix(), , } , := archiveAllAggregatingCmd.Run(context.Background(), .client, , ...).Result()if != nil {return0, errors.E(, errors.Internal, ) } , := .(int64)if ! {return0, errors.E(, errors.Internal, fmt.Sprintf("unexpected return value from script %v", )) }return , nil}// archiveAllPendingCmd is a Lua script that moves all pending tasks from// the given queue to archived state.//// Input:// KEYS[1] -> asynq:{<qname>}:pending// KEYS[2] -> asynq:{<qname>}:archived// --// ARGV[1] -> current timestamp// ARGV[2] -> cutoff timestamp (e.g., 90 days ago)// ARGV[3] -> max number of tasks in archive (e.g., 100)// ARGV[4] -> task key prefix (asynq:{<qname>}:t:)//// Output:// integer: Number of tasks archivedvar archiveAllPendingCmd = redis.NewScript(`local ids = redis.call("LRANGE", KEYS[1], 0, -1)for _, id in ipairs(ids) do redis.call("ZADD", KEYS[2], ARGV[1], id) redis.call("HSET", ARGV[4] .. id, "state", "archived")endredis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[2])redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[3])redis.call("DEL", KEYS[1])return table.getn(ids)`)// ArchiveAllPendingTasks archives all pending tasks from the given queue and// returns the number of tasks moved.// If a queue with the given name doesn't exist, it returns QueueNotFoundError.func ( *RDB) ( string) (int64, error) {varerrors.Op = "rdb.ArchiveAllPendingTasks"if := .checkQueueExists(); != nil {return0, errors.E(, errors.CanonicalCode(), ) } := []string{base.PendingKey(),base.ArchivedKey(), } := .clock.Now() := []interface{}{ .Unix(), .AddDate(0, 0, -archivedExpirationInDays).Unix(),maxArchiveSize,base.TaskKeyPrefix(), } , := archiveAllPendingCmd.Run(context.Background(), .client, , ...).Result()if != nil {return0, errors.E(, errors.Internal, ) } , := .(int64)if ! 
		return 0, errors.E(op, errors.Internal, fmt.Sprintf("unexpected return value from script %v", res))
	}
	return n, nil
}

// archiveTaskCmd is a Lua script that archives a task given a task id.
//
// Input:
// KEYS[1] -> task key (asynq:{<qname>}:t:<task_id>)
// KEYS[2] -> archived key (asynq:{<qname>}:archived)
// KEYS[3] -> all groups key (asynq:{<qname>}:groups)
// --
// ARGV[1] -> id of the task to archive
// ARGV[2] -> current timestamp
// ARGV[3] -> cutoff timestamp (e.g., 90 days ago)
// ARGV[4] -> max number of tasks in archived state (e.g., 100)
// ARGV[5] -> queue key prefix (asynq:{<qname>}:)
// ARGV[6] -> group key prefix (asynq:{<qname>}:g:)
//
// Output:
// Numeric code indicating the status:
// Returns 1 if task is successfully archived.
// Returns 0 if task is not found.
// Returns -1 if task is already archived.
// Returns -2 if task is in active state.
// Returns error reply if unexpected error occurs.
var archiveTaskCmd = redis.NewScript(`
if redis.call("EXISTS", KEYS[1]) == 0 then
	return 0
end
local state, group = unpack(redis.call("HMGET", KEYS[1], "state", "group"))
if state == "active" then
	return -2
end
if state == "archived" then
	return -1
end
if state == "pending" then
	if redis.call("LREM", ARGV[5] .. state, 1, ARGV[1]) == 0 then
		return redis.error_reply("task id not found in list " .. tostring(ARGV[5] .. state))
	end
elseif state == "aggregating" then
	if redis.call("ZREM", ARGV[6] .. group, ARGV[1]) == 0 then
		return redis.error_reply("task id not found in zset " .. tostring(ARGV[6] .. group))
	end
	if redis.call("ZCARD", ARGV[6] .. group) == 0 then
		redis.call("SREM", KEYS[3], group)
	end
else
	if redis.call("ZREM", ARGV[5] .. state, ARGV[1]) == 0 then
		return redis.error_reply("task id not found in zset " .. tostring(ARGV[5] .. state))
	end
end
redis.call("ZADD", KEYS[2], ARGV[2], ARGV[1])
redis.call("HSET", KEYS[1], "state", "archived")
redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[3])
redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[4])
return 1`)

// ArchiveTask finds a task that matches the id from the given queue and archives it.
// It returns nil if it successfully archived the task.
//
// If a queue with the given name doesn't exist, it returns QueueNotFoundError.
// If a task with the given id doesn't exist in the queue, it returns TaskNotFoundError
// If a task is already archived, it returns TaskAlreadyArchivedError.
// If a task is in active state it returns non-nil error with FailedPrecondition code.
func (r *RDB) ArchiveTask(qname, id string) error {
	var op errors.Op = "rdb.ArchiveTask"
	if err := r.checkQueueExists(qname); err != nil {
		return errors.E(op, errors.CanonicalCode(err), err)
	}
	keys := []string{
		base.TaskKey(qname, id),
		base.ArchivedKey(qname),
		base.AllGroups(qname),
	}
	now := r.clock.Now()
	argv := []interface{}{
		id,
		now.Unix(),
		now.AddDate(0, 0, -archivedExpirationInDays).Unix(),
		maxArchiveSize,
		base.QueueKeyPrefix(qname),
		base.GroupKeyPrefix(qname),
	}
	res, err := archiveTaskCmd.Run(context.Background(), r.client, keys, argv...).Result()
	if err != nil {
		return errors.E(op, errors.Unknown, err)
	}
	n, ok := res.(int64)
	if !ok {
		return errors.E(op, errors.Internal, fmt.Sprintf("could not cast the return value %v from archiveTaskCmd to int64.", res))
	}
	switch n {
	case 1:
		return nil
	case 0:
		return errors.E(op, errors.NotFound, &errors.TaskNotFoundError{Queue: qname, ID: id})
	case -1:
		return errors.E(op, errors.FailedPrecondition, &errors.TaskAlreadyArchivedError{Queue: qname, ID: id})
	case -2:
		return errors.E(op, errors.FailedPrecondition, "cannot archive task in active state. use CancelProcessing instead.")
use CancelProcessing instead.")case -3:returnerrors.E(, errors.NotFound, &errors.QueueNotFoundError{Queue: })default:returnerrors.E(, errors.Internal, fmt.Sprintf("unexpected return value from archiveTaskCmd script: %d", )) }}// archiveAllCmd is a Lua script that archives all tasks in either scheduled// or retry state from the given queue.//// Input:// KEYS[1] -> ZSET to move task from (e.g., asynq:{<qname>}:retry)// KEYS[2] -> asynq:{<qname>}:archived// --// ARGV[1] -> current timestamp// ARGV[2] -> cutoff timestamp (e.g., 90 days ago)// ARGV[3] -> max number of tasks in archive (e.g., 100)// ARGV[4] -> task key prefix (asynq:{<qname>}:t:)//// Output:// integer: number of tasks archivedvar archiveAllCmd = redis.NewScript(`local ids = redis.call("ZRANGE", KEYS[1], 0, -1)for _, id in ipairs(ids) do redis.call("ZADD", KEYS[2], ARGV[1], id) redis.call("HSET", ARGV[4] .. id, "state", "archived")endredis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[2])redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[3])redis.call("DEL", KEYS[1])return table.getn(ids)`)func ( *RDB) (, , string) (int64, error) {if := .checkQueueExists(); != nil {return0, } := []string{ , , } := .clock.Now() := []interface{}{ .Unix(), .AddDate(0, 0, -archivedExpirationInDays).Unix(),maxArchiveSize,base.TaskKeyPrefix(), , } , := archiveAllCmd.Run(context.Background(), .client, , ...).Result()if != nil {return0, } , := .(int64)if ! {return0, fmt.Errorf("unexpected return value from script: %v", ) }if == -1 {return0, &errors.QueueNotFoundError{Queue: } }return , nil}// Input:// KEYS[1] -> asynq:{<qname>}:t:<task_id>// KEYS[2] -> asynq:{<qname>}:groups// --// ARGV[1] -> task ID// ARGV[2] -> queue key prefix// ARGV[3] -> group key prefix//// Output:// Numeric code indicating the status:// Returns 1 if task is successfully deleted.// Returns 0 if task is not found.// Returns -1 if task is in active state.var deleteTaskCmd = redis.NewScript(`if redis.call("EXISTS", KEYS[1]) == 0 then return 0endlocal state, group = unpack(redis.call("HMGET", KEYS[1], "state", "group"))if state == "active" then return -1endif state == "pending" then if redis.call("LREM", ARGV[2] .. state, 0, ARGV[1]) == 0 then return redis.error_reply("task is not found in list: " .. tostring(ARGV[2] .. state)) endelseif state == "aggregating" then if redis.call("ZREM", ARGV[3] .. group, ARGV[1]) == 0 then return redis.error_reply("task is not found in zset: " .. tostring(ARGV[3] .. group)) end if redis.call("ZCARD", ARGV[3] .. group) == 0 then redis.call("SREM", KEYS[2], group) endelse if redis.call("ZREM", ARGV[2] .. state, ARGV[1]) == 0 then return redis.error_reply("task is not found in zset: " .. tostring(ARGV[2] .. 
local unique_key = redis.call("HGET", KEYS[1], "unique_key")
if unique_key and unique_key ~= "" and redis.call("GET", unique_key) == ARGV[1] then
	redis.call("DEL", unique_key)
end
return redis.call("DEL", KEYS[1])`)

// DeleteTask finds a task that matches the id from the given queue and deletes it.
// It returns nil if it successfully deleted the task.
//
// If a queue with the given name doesn't exist, it returns QueueNotFoundError.
// If a task with the given id doesn't exist in the queue, it returns TaskNotFoundError
// If a task is in active state it returns non-nil error with Code FailedPrecondition.
func (r *RDB) DeleteTask(qname, id string) error {
	var op errors.Op = "rdb.DeleteTask"
	if err := r.checkQueueExists(qname); err != nil {
		return errors.E(op, errors.CanonicalCode(err), err)
	}
	keys := []string{
		base.TaskKey(qname, id),
		base.AllGroups(qname),
	}
	argv := []interface{}{
		id,
		base.QueueKeyPrefix(qname),
		base.GroupKeyPrefix(qname),
	}
	res, err := deleteTaskCmd.Run(context.Background(), r.client, keys, argv...).Result()
	if err != nil {
		return errors.E(op, errors.Unknown, err)
	}
	n, ok := res.(int64)
	if !ok {
		return errors.E(op, errors.Internal, fmt.Sprintf("cast error: deleteTaskCmd script returned unexpected value %v", res))
	}
	switch n {
	case 1:
		return nil
	case 0:
		return errors.E(op, errors.NotFound, &errors.TaskNotFoundError{Queue: qname, ID: id})
	case -1:
		return errors.E(op, errors.FailedPrecondition, "cannot delete task in active state. use CancelProcessing instead.")
	default:
		return errors.E(op, errors.Internal, fmt.Sprintf("unexpected return value from deleteTaskCmd script: %d", n))
	}
}

// DeleteAllArchivedTasks deletes all archived tasks from the given queue
// and returns the number of tasks deleted.
func (r *RDB) DeleteAllArchivedTasks(qname string) (int64, error) {
	var op errors.Op = "rdb.DeleteAllArchivedTasks"
	n, err := r.deleteAll(base.ArchivedKey(qname), qname)
	if errors.IsQueueNotFound(err) {
		return 0, errors.E(op, errors.NotFound, err)
	}
	if err != nil {
		return 0, errors.E(op, errors.Unknown, err)
	}
	return n, nil
}

// DeleteAllRetryTasks deletes all retry tasks from the given queue
// and returns the number of tasks deleted.
func (r *RDB) DeleteAllRetryTasks(qname string) (int64, error) {
	var op errors.Op = "rdb.DeleteAllRetryTasks"
	n, err := r.deleteAll(base.RetryKey(qname), qname)
	if errors.IsQueueNotFound(err) {
		return 0, errors.E(op, errors.NotFound, err)
	}
	if err != nil {
		return 0, errors.E(op, errors.Unknown, err)
	}
	return n, nil
}

// DeleteAllScheduledTasks deletes all scheduled tasks from the given queue
// and returns the number of tasks deleted.
func (r *RDB) DeleteAllScheduledTasks(qname string) (int64, error) {
	var op errors.Op = "rdb.DeleteAllScheduledTasks"
	n, err := r.deleteAll(base.ScheduledKey(qname), qname)
	if errors.IsQueueNotFound(err) {
		return 0, errors.E(op, errors.NotFound, err)
	}
	if err != nil {
		return 0, errors.E(op, errors.Unknown, err)
	}
	return n, nil
}

// DeleteAllCompletedTasks deletes all completed tasks from the given queue
// and returns the number of tasks deleted.
func (r *RDB) DeleteAllCompletedTasks(qname string) (int64, error) {
	var op errors.Op = "rdb.DeleteAllCompletedTasks"
	n, err := r.deleteAll(base.CompletedKey(qname), qname)
	if errors.IsQueueNotFound(err) {
		return 0, errors.E(op, errors.NotFound, err)
	}
	if err != nil {
		return 0, errors.E(op, errors.Unknown, err)
	}
	return n, nil
}

// deleteAllCmd deletes tasks from the given zset.
//
// Input:
// KEYS[1] -> zset holding the task ids.
// --
// ARGV[1] -> task key prefix
//
// Output:
// integer: number of tasks deleted
var deleteAllCmd = redis.NewScript(`
local ids = redis.call("ZRANGE", KEYS[1], 0, -1)
for _, id in ipairs(ids) do
	local task_key = ARGV[1] .. id
	local unique_key = redis.call("HGET", task_key, "unique_key")
	if unique_key and unique_key ~= "" and redis.call("GET", unique_key) == id then
		redis.call("DEL", unique_key)
	end
	redis.call("DEL", task_key)
end
redis.call("DEL", KEYS[1])
return table.getn(ids)`)

func (r *RDB) deleteAll(key, qname string) (int64, error) {
	if err := r.checkQueueExists(qname); err != nil {
		return 0, err
	}
	argv := []interface{}{
		base.TaskKeyPrefix(qname),
		qname,
	}
	res, err := deleteAllCmd.Run(context.Background(), r.client, []string{key}, argv...).Result()
	if err != nil {
		return 0, err
	}
	n, ok := res.(int64)
	if !ok {
		return 0, fmt.Errorf("unexpected return value from Lua script: %v", res)
	}
	return n, nil
}

// deleteAllAggregatingCmd deletes all tasks from the given group.
//
// Input:
// KEYS[1] -> asynq:{<qname>}:g:<gname>
// KEYS[2] -> asynq:{<qname>}:groups
// -------
// ARGV[1] -> task key prefix
// ARGV[2] -> group name
var deleteAllAggregatingCmd = redis.NewScript(`
local ids = redis.call("ZRANGE", KEYS[1], 0, -1)
for _, id in ipairs(ids) do
	redis.call("DEL", ARGV[1] .. id)
end
redis.call("SREM", KEYS[2], ARGV[2])
redis.call("DEL", KEYS[1])
return table.getn(ids)`)

// DeleteAllAggregatingTasks deletes all aggregating tasks from the given group
// and returns the number of tasks deleted.
func (r *RDB) DeleteAllAggregatingTasks(qname, gname string) (int64, error) {
	var op errors.Op = "rdb.DeleteAllAggregatingTasks"
	if err := r.checkQueueExists(qname); err != nil {
		return 0, errors.E(op, errors.CanonicalCode(err), err)
	}
	keys := []string{
		base.GroupKey(qname, gname),
		base.AllGroups(qname),
	}
	argv := []interface{}{
		base.TaskKeyPrefix(qname),
		gname,
	}
	res, err := deleteAllAggregatingCmd.Run(context.Background(), r.client, keys, argv...).Result()
	if err != nil {
		return 0, errors.E(op, errors.Unknown, err)
	}
	n, ok := res.(int64)
	if !ok {
		return 0, errors.E(op, errors.Internal, "command error: unexpected return value %v", res)
	}
	return n, nil
}

// deleteAllPendingCmd deletes all pending tasks from the given queue.
//
// Input:
// KEYS[1] -> asynq:{<qname>}:pending
// --
// ARGV[1] -> task key prefix
//
// Output:
// integer: number of tasks deleted
var deleteAllPendingCmd = redis.NewScript(`
local ids = redis.call("LRANGE", KEYS[1], 0, -1)
for _, id in ipairs(ids) do
	redis.call("DEL", ARGV[1] .. id)
end
redis.call("DEL", KEYS[1])
return table.getn(ids)`)

// DeleteAllPendingTasks deletes all pending tasks from the given queue
// and returns the number of tasks deleted.
func (r *RDB) DeleteAllPendingTasks(qname string) (int64, error) {
	var op errors.Op = "rdb.DeleteAllPendingTasks"
	if err := r.checkQueueExists(qname); err != nil {
		return 0, errors.E(op, errors.CanonicalCode(err), err)
	}
	keys := []string{
		base.PendingKey(qname),
	}
	argv := []interface{}{
		base.TaskKeyPrefix(qname),
	}
	res, err := deleteAllPendingCmd.Run(context.Background(), r.client, keys, argv...).Result()
	if err != nil {
		return 0, errors.E(op, errors.Unknown, err)
	}
	n, ok := res.(int64)
	if !ok {
		return 0, errors.E(op, errors.Internal, "command error: unexpected return value %v", res)
	}
	return n, nil
}

// removeQueueForceCmd removes the given queue regardless of
// whether the queue is empty.
// It only checks whether the active queue is empty before removing.
//
// Input:
// KEYS[1] -> asynq:{<qname>}:pending
// KEYS[2] -> asynq:{<qname>}:active
// KEYS[3] -> asynq:{<qname>}:scheduled
// KEYS[4] -> asynq:{<qname>}:retry
// KEYS[5] -> asynq:{<qname>}:archived
// KEYS[6] -> asynq:{<qname>}:lease
// --
// ARGV[1] -> task key prefix
//
// Output:
// Numeric code to indicate the status.
// Returns 1 if successfully removed.
// Returns -2 if the queue has active tasks.
var removeQueueForceCmd = redis.NewScript(`
local active = redis.call("LLEN", KEYS[2])
if active > 0 then
	return -2
end
for _, id in ipairs(redis.call("LRANGE", KEYS[1], 0, -1)) do
	redis.call("DEL", ARGV[1] .. id)
end
for _, id in ipairs(redis.call("LRANGE", KEYS[2], 0, -1)) do
	redis.call("DEL", ARGV[1] .. id)
end
for _, id in ipairs(redis.call("ZRANGE", KEYS[3], 0, -1)) do
	redis.call("DEL", ARGV[1] .. id)
end
for _, id in ipairs(redis.call("ZRANGE", KEYS[4], 0, -1)) do
	redis.call("DEL", ARGV[1] .. id)
end
for _, id in ipairs(redis.call("ZRANGE", KEYS[5], 0, -1)) do
	redis.call("DEL", ARGV[1] .. id)
end
redis.call("DEL", KEYS[1])
redis.call("DEL", KEYS[2])
redis.call("DEL", KEYS[3])
redis.call("DEL", KEYS[4])
redis.call("DEL", KEYS[5])
redis.call("DEL", KEYS[6])
return 1`)

// removeQueueCmd removes the given queue.
// It checks whether the queue is empty before removing.
//
// Input:
// KEYS[1] -> asynq:{<qname>}:pending
// KEYS[2] -> asynq:{<qname>}:active
// KEYS[3] -> asynq:{<qname>}:scheduled
// KEYS[4] -> asynq:{<qname>}:retry
// KEYS[5] -> asynq:{<qname>}:archived
// KEYS[6] -> asynq:{<qname>}:lease
// --
// ARGV[1] -> task key prefix
//
// Output:
// Numeric code to indicate the status
// Returns 1 if successfully removed.
// Returns -1 if queue is not empty
var removeQueueCmd = redis.NewScript(`
local ids = {}
for _, id in ipairs(redis.call("LRANGE", KEYS[1], 0, -1)) do
	table.insert(ids, id)
end
for _, id in ipairs(redis.call("LRANGE", KEYS[2], 0, -1)) do
	table.insert(ids, id)
end
for _, id in ipairs(redis.call("ZRANGE", KEYS[3], 0, -1)) do
	table.insert(ids, id)
end
for _, id in ipairs(redis.call("ZRANGE", KEYS[4], 0, -1)) do
	table.insert(ids, id)
end
for _, id in ipairs(redis.call("ZRANGE", KEYS[5], 0, -1)) do
	table.insert(ids, id)
end
if table.getn(ids) > 0 then
	return -1
end
for _, id in ipairs(ids) do
	redis.call("DEL", ARGV[1] .. id)
end
redis.call("DEL", KEYS[1])
redis.call("DEL", KEYS[2])
redis.call("DEL", KEYS[3])
redis.call("DEL", KEYS[4])
redis.call("DEL", KEYS[5])
redis.call("DEL", KEYS[6])
return 1`)

// RemoveQueue removes the specified queue.
//
// If force is set to true, it will remove the queue regardless
// as long as no tasks are active for the queue.
// If force is set to false, it will only remove the queue if
// the queue is empty.
func (r *RDB) RemoveQueue(qname string, force bool) error {
	var op errors.Op = "rdb.RemoveQueue"
	exists, err := r.queueExists(qname)
	if err != nil {
		return err
	}
	if !exists {
		return errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
	}
	var script *redis.Script
	if force {
		script = removeQueueForceCmd
	} else {
		script = removeQueueCmd
	}
	keys := []string{
		base.PendingKey(qname),
		base.ActiveKey(qname),
		base.ScheduledKey(qname),
		base.RetryKey(qname),
		base.ArchivedKey(qname),
		base.LeaseKey(qname),
	}
	res, err := script.Run(context.Background(), r.client, keys, base.TaskKeyPrefix(qname)).Result()
	if err != nil {
		return errors.E(op, errors.Unknown, err)
	}
	n, ok := res.(int64)
	if !ok {
		return errors.E(op, errors.Internal, fmt.Sprintf("unexpected return value from Lua script: %v", res))
	}
	switch n {
	case 1:
		if err := r.client.SRem(context.Background(), base.AllQueues, qname).Err(); err != nil {
			return errors.E(op, errors.Unknown, err)
		}
		return nil
	case -1:
		return errors.E(op, errors.NotFound, &errors.QueueNotEmptyError{Queue: qname})
	case -2:
		return errors.E(op, errors.FailedPrecondition, "cannot remove queue with active tasks")
	default:
		return errors.E(op, errors.Unknown, fmt.Sprintf("unexpected return value from Lua script: %d", n))
	}
}

// Note: Script also removes stale keys.
var listServerKeysCmd = redis.NewScript(`
local now = tonumber(ARGV[1])
local keys = redis.call("ZRANGEBYSCORE", KEYS[1], now, "+inf")
redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1)
return keys`)

// ListServers returns the list of server info.
func (r *RDB) ListServers() ([]*base.ServerInfo, error) {
	now := r.clock.Now()
	res, err := listServerKeysCmd.Run(context.Background(), r.client, []string{base.AllServers}, now.Unix()).Result()
	if err != nil {
		return nil, err
	}
	keys, err := cast.ToStringSliceE(res)
	if err != nil {
		return nil, err
	}
	var servers []*base.ServerInfo
	for _, key := range keys {
		data, err := r.client.Get(context.Background(), key).Result()
		if err != nil {
			continue // skip bad data
		}
		info, err := base.DecodeServerInfo([]byte(data))
		if err != nil {
			continue // skip bad data
		}
		servers = append(servers, info)
	}
	return servers, nil
}

// Note: Script also removes stale keys.
var listWorkersCmd = redis.NewScript(`
local now = tonumber(ARGV[1])
local keys = redis.call("ZRANGEBYSCORE", KEYS[1], now, "+inf")
redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1)
return keys`)

// ListWorkers returns the list of worker stats.
func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error) {
	var op errors.Op = "rdb.ListWorkers"
	now := r.clock.Now()
	res, err := listWorkersCmd.Run(context.Background(), r.client, []string{base.AllWorkers}, now.Unix()).Result()
	if err != nil {
		return nil, errors.E(op, errors.Unknown, err)
	}
	keys, err := cast.ToStringSliceE(res)
	if err != nil {
		return nil, errors.E(op, errors.Internal, fmt.Sprintf("unexpected return value from Lua script: %v", res))
	}
	var workers []*base.WorkerInfo
	for _, key := range keys {
		data, err := r.client.HVals(context.Background(), key).Result()
		if err != nil {
			continue // skip bad data
		}
		for _, s := range data {
			w, err := base.DecodeWorkerInfo([]byte(s))
			if err != nil {
				continue // skip bad data
			}
			workers = append(workers, w)
		}
	}
	return workers, nil
}

// Note: Script also removes stale keys.
var listSchedulerKeysCmd = redis.NewScript(`
local now = tonumber(ARGV[1])
local keys = redis.call("ZRANGEBYSCORE", KEYS[1], now, "+inf")
redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1)
return keys`)

// ListSchedulerEntries returns the list of scheduler entries.
func (r *RDB) ListSchedulerEntries() ([]*base.SchedulerEntry, error) {
	now := r.clock.Now()
	res, err := listSchedulerKeysCmd.Run(context.Background(), r.client, []string{base.AllSchedulers}, now.Unix()).Result()
	if err != nil {
		return nil, err
	}
	keys, err := cast.ToStringSliceE(res)
	if err != nil {
		return nil, err
	}
	var entries []*base.SchedulerEntry
	for _, key := range keys {
		data, err := r.client.LRange(context.Background(), key, 0, -1).Result()
		if err != nil {
			continue // skip bad data
		}
		for _, s := range data {
			e, err := base.DecodeSchedulerEntry([]byte(s))
			if err != nil {
				continue // skip bad data
			}
			entries = append(entries, e)
		}
	}
	return entries, nil
}

// ListSchedulerEnqueueEvents returns the list of scheduler enqueue events.
func (r *RDB) ListSchedulerEnqueueEvents(entryID string, pgn Pagination) ([]*base.SchedulerEnqueueEvent, error) {
	key := base.SchedulerHistoryKey(entryID)
	zs, err := r.client.ZRevRangeWithScores(context.Background(), key, pgn.start(), pgn.stop()).Result()
	if err != nil {
		return nil, err
	}
	var events []*base.SchedulerEnqueueEvent
	for _, z := range zs {
		data, err := cast.ToStringE(z.Member)
		if err != nil {
			return nil, err
		}
		e, err := base.DecodeSchedulerEnqueueEvent([]byte(data))
		if err != nil {
			return nil, err
		}
		events = append(events, e)
	}
	return events, nil
}

// Pause pauses processing of tasks from the given queue.
func (r *RDB) Pause(qname string) error {
	key := base.PausedKey(qname)
	ok, err := r.client.SetNX(context.Background(), key, r.clock.Now().Unix(), 0).Result()
	if err != nil {
		return err
	}
	if !ok {
		return fmt.Errorf("queue %q is already paused", qname)
	}
	return nil
}

// Unpause resumes processing of tasks from the given queue.
func (r *RDB) Unpause(qname string) error {
	key := base.PausedKey(qname)
	deleted, err := r.client.Del(context.Background(), key).Result()
	if err != nil {
		return err
	}
	if deleted == 0 {
		return fmt.Errorf("queue %q is not paused", qname)
	}
	return nil
}

// ClusterKeySlot returns an integer identifying the hash slot the given queue hashes to.
func (r *RDB) ClusterKeySlot(qname string) (int64, error) {
	key := base.PendingKey(qname)
	return r.client.ClusterKeySlot(context.Background(), key).Result()
}

// ClusterNodes returns a list of nodes the given queue belongs to.
func (r *RDB) ClusterNodes(qname string) ([]redis.ClusterNode, error) {
	keyslot, err := r.ClusterKeySlot(qname)
	if err != nil {
		return nil, err
	}
	clusterSlots, err := r.client.ClusterSlots(context.Background()).Result()
	if err != nil {
		return nil, err
	}
	for _, slotRange := range clusterSlots {
		if int64(slotRange.Start) <= keyslot && keyslot <= int64(slotRange.End) {
			return slotRange.Nodes, nil
		}
	}
	return nil, fmt.Errorf("nodes not found")
}
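
// Illustrative usage sketch (not part of the original file): a minimal example of
// how a caller might read queue statistics via RDB.CurrentStats. It assumes an
// *RDB already constructed elsewhere (e.g. via NewRDB with a go-redis client);
// the function name printQueueStats is hypothetical.
//
//	func printQueueStats(r *RDB, qname string) error {
//		stats, err := r.CurrentStats(qname)
//		if err != nil {
//			return err
//		}
//		fmt.Printf("queue=%s size=%d pending=%d active=%d latency=%v paused=%t\n",
//			stats.Queue, stats.Size, stats.Pending, stats.Active, stats.Latency, stats.Paused)
//		return nil
//	}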