// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.

package rdb

import (
	
	
	
	

	
	
	
	
)

// AllQueues returns a list of all queue names.
func ( *RDB) () ([]string, error) {
	return .client.SMembers(context.Background(), base.AllQueues).Result()
}

// Stats represents a state of queues at a certain time.
type Stats struct {
	// Name of the queue (e.g. "default", "critical").
	Queue string
	// MemoryUsage is the total number of bytes the queue and its tasks require
	// to be stored in redis. It is an approximate memory usage value in bytes
	// since the value is computed by sampling.
	MemoryUsage int64
	// Paused indicates whether the queue is paused.
	// If true, tasks in the queue should not be processed.
	Paused bool
	// Size is the total number of tasks in the queue.
	Size int

	// Groups is the total number of groups in the queue.
	Groups int

	// Number of tasks in each state.
	Pending     int
	Active      int
	Scheduled   int
	Retry       int
	Archived    int
	Completed   int
	Aggregating int

	// Number of tasks processed within the current date.
	// The number includes both succeeded and failed tasks.
	Processed int
	// Number of tasks failed within the current date.
	Failed int

	// Total number of tasks processed (both succeeded and failed) from this queue.
	ProcessedTotal int
	// Total number of tasks failed.
	FailedTotal int

	// Latency of the queue, measured by the oldest pending task in the queue.
	Latency time.Duration
	// Time this stats was taken.
	Timestamp time.Time
}

// DailyStats holds aggregate data for a given day.
type DailyStats struct {
	// Name of the queue (e.g. "default", "critical").
	Queue string
	// Total number of tasks processed during the given day.
	// The number includes both succeeded and failed tasks.
	Processed int
	// Total number of tasks failed during the given day.
	Failed int
	// Date this stats was taken.
	Time time.Time
}

// KEYS[1] ->  asynq:<qname>:pending
// KEYS[2] ->  asynq:<qname>:active
// KEYS[3] ->  asynq:<qname>:scheduled
// KEYS[4] ->  asynq:<qname>:retry
// KEYS[5] ->  asynq:<qname>:archived
// KEYS[6] ->  asynq:<qname>:completed
// KEYS[7] ->  asynq:<qname>:processed:<yyyy-mm-dd>
// KEYS[8] ->  asynq:<qname>:failed:<yyyy-mm-dd>
// KEYS[9] ->  asynq:<qname>:processed
// KEYS[10] -> asynq:<qname>:failed
// KEYS[11] -> asynq:<qname>:paused
// KEYS[12] -> asynq:<qname>:groups
// --------
// ARGV[1] -> task key prefix
// ARGV[2] -> group key prefix
var currentStatsCmd = redis.NewScript(`
local res = {}
local pendingTaskCount = redis.call("LLEN", KEYS[1])
table.insert(res, KEYS[1])
table.insert(res, pendingTaskCount)
table.insert(res, KEYS[2])
table.insert(res, redis.call("LLEN", KEYS[2]))
table.insert(res, KEYS[3])
table.insert(res, redis.call("ZCARD", KEYS[3]))
table.insert(res, KEYS[4])
table.insert(res, redis.call("ZCARD", KEYS[4]))
table.insert(res, KEYS[5])
table.insert(res, redis.call("ZCARD", KEYS[5]))
table.insert(res, KEYS[6])
table.insert(res, redis.call("ZCARD", KEYS[6]))
for i=7,10 do
    local count = 0
	local n = redis.call("GET", KEYS[i])
	if n then
	    count = tonumber(n)
	end
	table.insert(res, KEYS[i])
	table.insert(res, count)
end
table.insert(res, KEYS[11])
table.insert(res, redis.call("EXISTS", KEYS[11]))
table.insert(res, "oldest_pending_since")
if pendingTaskCount > 0 then
	local id = redis.call("LRANGE", KEYS[1], -1, -1)[1]
	table.insert(res, redis.call("HGET", ARGV[1] .. id, "pending_since"))
else
	table.insert(res, 0)
end
local group_names = redis.call("SMEMBERS", KEYS[12])
table.insert(res, "group_size")
table.insert(res, table.getn(group_names))
local aggregating_count = 0
for _, gname in ipairs(group_names) do
	aggregating_count = aggregating_count + redis.call("ZCARD", ARGV[2] .. gname)
end
table.insert(res, "aggregating_count")
table.insert(res, aggregating_count)
return res`)

// CurrentStats returns a current state of the queues.
func ( *RDB) ( string) (*Stats, error) {
	var  errors.Op = "rdb.CurrentStats"
	,  := .queueExists()
	if  != nil {
		return nil, errors.E(, errors.Unknown, )
	}
	if ! {
		return nil, errors.E(, errors.NotFound, &errors.QueueNotFoundError{Queue: })
	}
	 := .clock.Now()
	 := []string{
		base.PendingKey(),
		base.ActiveKey(),
		base.ScheduledKey(),
		base.RetryKey(),
		base.ArchivedKey(),
		base.CompletedKey(),
		base.ProcessedKey(, ),
		base.FailedKey(, ),
		base.ProcessedTotalKey(),
		base.FailedTotalKey(),
		base.PausedKey(),
		base.AllGroups(),
	}
	 := []interface{}{
		base.TaskKeyPrefix(),
		base.GroupKeyPrefix(),
	}
	,  := currentStatsCmd.Run(context.Background(), .client, , ...).Result()
	if  != nil {
		return nil, errors.E(, errors.Unknown, )
	}
	,  := cast.ToSliceE()
	if  != nil {
		return nil, errors.E(, errors.Internal, "cast error: unexpected return value from Lua script")
	}
	 := &Stats{
		Queue:     ,
		Timestamp: ,
	}
	 := 0
	for  := 0;  < len();  += 2 {
		 := cast.ToString([])
		 := cast.ToInt([+1])
		switch  {
		case base.PendingKey():
			.Pending = 
			 += 
		case base.ActiveKey():
			.Active = 
			 += 
		case base.ScheduledKey():
			.Scheduled = 
			 += 
		case base.RetryKey():
			.Retry = 
			 += 
		case base.ArchivedKey():
			.Archived = 
			 += 
		case base.CompletedKey():
			.Completed = 
			 += 
		case base.ProcessedKey(, ):
			.Processed = 
		case base.FailedKey(, ):
			.Failed = 
		case base.ProcessedTotalKey():
			.ProcessedTotal = 
		case base.FailedTotalKey():
			.FailedTotal = 
		case base.PausedKey():
			if  == 0 {
				.Paused = false
			} else {
				.Paused = true
			}
		case "oldest_pending_since":
			if  == 0 {
				.Latency = 0
			} else {
				.Latency = .clock.Now().Sub(time.Unix(0, int64()))
			}
		case "group_size":
			.Groups = 
		case "aggregating_count":
			.Aggregating = 
			 += 
		}
	}
	.Size = 
	,  := .memoryUsage()
	if  != nil {
		return nil, errors.E(, errors.CanonicalCode(), )
	}
	.MemoryUsage = 
	return , nil
}

// Computes memory usage for the given queue by sampling tasks
// from each redis list/zset. Returns approximate memory usage value
// in bytes.
//
// KEYS[1] -> asynq:{qname}:active
// KEYS[2] -> asynq:{qname}:pending
// KEYS[3] -> asynq:{qname}:scheduled
// KEYS[4] -> asynq:{qname}:retry
// KEYS[5] -> asynq:{qname}:archived
// KEYS[6] -> asynq:{qname}:completed
// KEYS[7] -> asynq:{qname}:groups
// -------
// ARGV[1] -> asynq:{qname}:t: (task key prefix)
// ARGV[2] -> task sample size per redis list/zset (e.g 20)
// ARGV[3] -> group sample size
// ARGV[4] -> asynq:{qname}:g: (group key prefix)
var memoryUsageCmd = redis.NewScript(`
local sample_size = tonumber(ARGV[2])
if sample_size <= 0 then
    return redis.error_reply("sample size must be a positive number")
end
local memusg = 0
for i=1,2 do
    local ids = redis.call("LRANGE", KEYS[i], 0, sample_size - 1)
    local sample_total = 0
    if (table.getn(ids) > 0) then
        for _, id in ipairs(ids) do
            local bytes = redis.call("MEMORY", "USAGE", ARGV[1] .. id)
            sample_total = sample_total + bytes
        end
        local n = redis.call("LLEN", KEYS[i])
        local avg = sample_total / table.getn(ids)
        memusg = memusg + (avg * n)
    end
    local m = redis.call("MEMORY", "USAGE", KEYS[i])
    if (m) then
        memusg = memusg + m
    end
end
for i=3,6 do
    local ids = redis.call("ZRANGE", KEYS[i], 0, sample_size - 1)
    local sample_total = 0
    if (table.getn(ids) > 0) then
        for _, id in ipairs(ids) do
            local bytes = redis.call("MEMORY", "USAGE", ARGV[1] .. id)
            sample_total = sample_total + bytes
        end
        local n = redis.call("ZCARD", KEYS[i])
        local avg = sample_total / table.getn(ids)
        memusg = memusg + (avg * n)
    end
    local m = redis.call("MEMORY", "USAGE", KEYS[i])
    if (m) then
        memusg = memusg + m
    end
end
local groups = redis.call("SMEMBERS", KEYS[7])
if table.getn(groups) > 0 then
	local agg_task_count = 0
	local agg_task_sample_total = 0
	local agg_task_sample_size = 0
	for i, gname in ipairs(groups) do
		local group_key = ARGV[4] .. gname
		agg_task_count = agg_task_count + redis.call("ZCARD", group_key)
		if i <= tonumber(ARGV[3]) then
			local ids = redis.call("ZRANGE", group_key, 0, sample_size - 1)
			for _, id in ipairs(ids) do
				local bytes = redis.call("MEMORY", "USAGE", ARGV[1] .. id)
				agg_task_sample_total = agg_task_sample_total + bytes
				agg_task_sample_size = agg_task_sample_size + 1
			end
		end
	end
	local avg = agg_task_sample_total / agg_task_sample_size
	memusg = memusg + (avg * agg_task_count)
end
return memusg
`)

func ( *RDB) ( string) (int64, error) {
	var  errors.Op = "rdb.memoryUsage"
	const (
		  = 20
		 = 5
	)

	 := []string{
		base.ActiveKey(),
		base.PendingKey(),
		base.ScheduledKey(),
		base.RetryKey(),
		base.ArchivedKey(),
		base.CompletedKey(),
		base.AllGroups(),
	}
	 := []interface{}{
		base.TaskKeyPrefix(),
		,
		,
		base.GroupKeyPrefix(),
	}
	,  := memoryUsageCmd.Run(context.Background(), .client, , ...).Result()
	if  != nil {
		return 0, errors.E(, errors.Unknown, fmt.Sprintf("redis eval error: %v", ))
	}
	,  := cast.ToInt64E()
	if  != nil {
		return 0, errors.E(, errors.Internal, fmt.Sprintf("could not cast script return value to int64"))
	}
	return , nil
}

var historicalStatsCmd = redis.NewScript(`
local res = {}
for _, key in ipairs(KEYS) do
	local n = redis.call("GET", key)
	if not n then
		n = 0
	end
	table.insert(res, tonumber(n))
end
return res`)

// HistoricalStats returns a list of stats from the last n days for the given queue.
func ( *RDB) ( string,  int) ([]*DailyStats, error) {
	var  errors.Op = "rdb.HistoricalStats"
	if  < 1 {
		return nil, errors.E(, errors.FailedPrecondition, "the number of days must be positive")
	}
	,  := .queueExists()
	if  != nil {
		return nil, errors.E(, errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: })
	}
	if ! {
		return nil, errors.E(, errors.NotFound, &errors.QueueNotFoundError{Queue: })
	}
	const  = 24 * time.Hour
	 := .clock.Now().UTC()
	var  []time.Time
	var  []string
	for  := 0;  < ; ++ {
		 := .Add(-time.Duration() * )
		 = append(, )
		 = append(, base.ProcessedKey(, ))
		 = append(, base.FailedKey(, ))
	}
	,  := historicalStatsCmd.Run(context.Background(), .client, ).Result()
	if  != nil {
		return nil, errors.E(, errors.Unknown, fmt.Sprintf("redis eval error: %v", ))
	}
	,  := cast.ToIntSliceE()
	if  != nil {
		return nil, errors.E(, errors.Internal, fmt.Sprintf("cast error: unexpected return value from Lua script: %v", ))
	}
	var  []*DailyStats
	for  := 0;  < len();  += 2 {
		 = append(, &DailyStats{
			Queue:     ,
			Processed: [],
			Failed:    [+1],
			Time:      [/2],
		})
	}
	return , nil
}

// RedisInfo returns a map of redis info.
func ( *RDB) () (map[string]string, error) {
	,  := .client.Info(context.Background()).Result()
	if  != nil {
		return nil, 
	}
	return parseInfo()
}

// RedisClusterInfo returns a map of redis cluster info.
func ( *RDB) () (map[string]string, error) {
	,  := .client.ClusterInfo(context.Background()).Result()
	if  != nil {
		return nil, 
	}
	return parseInfo()
}

func parseInfo( string) (map[string]string, error) {
	 := make(map[string]string)
	 := strings.Split(, "\r\n")
	for ,  := range  {
		 := strings.Split(, ":")
		if len() == 2 {
			[[0]] = [1]
		}
	}
	return , nil
}

// TODO: Use generics once available.
func reverse( []*base.TaskInfo) {
	for  := len()/2 - 1;  >= 0; -- {
		 := len() - 1 - 
		[], [] = [], []
	}
}

// checkQueueExists verifies whether the queue exists.
// It returns QueueNotFoundError if queue doesn't exist.
func ( *RDB) ( string) error {
	,  := .queueExists()
	if  != nil {
		return errors.E(errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: })
	}
	if ! {
		return errors.E(errors.Internal, &errors.QueueNotFoundError{Queue: })
	}
	return nil
}

// Input:
// KEYS[1] -> task key (asynq:{<qname>}:t:<taskid>)
// ARGV[1] -> task id
// ARGV[2] -> current time in Unix time (seconds)
// ARGV[3] -> queue key prefix (asynq:{<qname>}:)
//
// Output:
// Tuple of {msg, state, nextProcessAt, result}
// msg: encoded task message
// state: string describing the state of the task
// nextProcessAt: unix time in seconds, zero if not applicable.
// result: result data associated with the task
//
// If the task key doesn't exist, it returns error with a message "NOT FOUND"
var getTaskInfoCmd = redis.NewScript(`
	if redis.call("EXISTS", KEYS[1]) == 0 then
		return redis.error_reply("NOT FOUND")
	end
	local msg, state, result = unpack(redis.call("HMGET", KEYS[1], "msg", "state", "result"))
	if state == "scheduled" or state == "retry" then
		return {msg, state, redis.call("ZSCORE", ARGV[3] .. state, ARGV[1]), result}
	end
	if state == "pending" then
		return {msg, state, ARGV[2], result}
	end
	return {msg, state, 0, result}
`)

// GetTaskInfo returns a TaskInfo describing the task from the given queue.
func ( *RDB) (,  string) (*base.TaskInfo, error) {
	var  errors.Op = "rdb.GetTaskInfo"
	if  := .checkQueueExists();  != nil {
		return nil, errors.E(, errors.CanonicalCode(), )
	}
	 := []string{base.TaskKey(, )}
	 := []interface{}{
		,
		.clock.Now().Unix(),
		base.QueueKeyPrefix(),
	}
	,  := getTaskInfoCmd.Run(context.Background(), .client, , ...).Result()
	if  != nil {
		if .Error() == "NOT FOUND" {
			return nil, errors.E(, errors.NotFound, &errors.TaskNotFoundError{Queue: , ID: })
		}
		return nil, errors.E(, errors.Unknown, )
	}
	,  := cast.ToSliceE()
	if  != nil {
		return nil, errors.E(, errors.Internal, "unexpected value returned from Lua script")
	}
	if len() != 4 {
		return nil, errors.E(, errors.Internal, "unepxected number of values returned from Lua script")
	}
	,  := cast.ToStringE([0])
	if  != nil {
		return nil, errors.E(, errors.Internal, "unexpected value returned from Lua script")
	}
	,  := cast.ToStringE([1])
	if  != nil {
		return nil, errors.E(, errors.Internal, "unexpected value returned from Lua script")
	}
	,  := cast.ToInt64E([2])
	if  != nil {
		return nil, errors.E(, errors.Internal, "unexpected value returned from Lua script")
	}
	,  := cast.ToStringE([3])
	if  != nil {
		return nil, errors.E(, errors.Internal, "unexpected value returned from Lua script")
	}
	,  := base.DecodeMessage([]byte())
	if  != nil {
		return nil, errors.E(, errors.Internal, "could not decode task message")
	}
	,  := base.TaskStateFromString()
	if  != nil {
		return nil, errors.E(, errors.CanonicalCode(), )
	}
	var  time.Time
	if  != 0 {
		 = time.Unix(, 0)
	}
	var  []byte
	if len() > 0 {
		 = []byte()
	}
	return &base.TaskInfo{
		Message:       ,
		State:         ,
		NextProcessAt: ,
		Result:        ,
	}, nil
}

type GroupStat struct {
	// Name of the group.
	Group string

	// Size of the group.
	Size int
}

// KEYS[1] -> asynq:{<qname>}:groups
// -------
// ARGV[1] -> group key prefix
//
// Output:
// list of group name and size (e.g. group1 size1 group2 size2 ...)
//
// Time Complexity:
// O(N) where N being the number of groups in the given queue.
var groupStatsCmd = redis.NewScript(`
local res = {}
local group_names = redis.call("SMEMBERS", KEYS[1])
for _, gname in ipairs(group_names) do
	local size = redis.call("ZCARD", ARGV[1] .. gname)
	table.insert(res, gname)
	table.insert(res, size)
end
return res
`)

func ( *RDB) ( string) ([]*GroupStat, error) {
	var  errors.Op = "RDB.GroupStats"
	 := []string{base.AllGroups()}
	 := []interface{}{base.GroupKeyPrefix()}
	,  := groupStatsCmd.Run(context.Background(), .client, , ...).Result()
	if  != nil {
		return nil, errors.E(, errors.Unknown, )
	}
	,  := cast.ToSliceE()
	if  != nil {
		return nil, errors.E(, errors.Internal, "cast error: unexpected return value from Lua script")
	}
	var  []*GroupStat
	for  := 0;  < len();  += 2 {
		 = append(, &GroupStat{
			Group: [].(string),
			Size:  int([+1].(int64)),
		})
	}
	return , nil
}

// Pagination specifies the page size and page number
// for the list operation.
type Pagination struct {
	// Number of items in the page.
	Size int

	// Page number starting from zero.
	Page int
}

func ( Pagination) () int64 {
	return int64(.Size * .Page)
}

func ( Pagination) () int64 {
	return int64(.Size*.Page + .Size - 1)
}

// ListPending returns pending tasks that are ready to be processed.
func ( *RDB) ( string,  Pagination) ([]*base.TaskInfo, error) {
	var  errors.Op = "rdb.ListPending"
	,  := .queueExists()
	if  != nil {
		return nil, errors.E(, errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: })
	}
	if ! {
		return nil, errors.E(, errors.NotFound, &errors.QueueNotFoundError{Queue: })
	}
	,  := .listMessages(, base.TaskStatePending, )
	if  != nil {
		return nil, errors.E(, errors.CanonicalCode(), )
	}
	return , nil
}

// ListActive returns all tasks that are currently being processed for the given queue.
func ( *RDB) ( string,  Pagination) ([]*base.TaskInfo, error) {
	var  errors.Op = "rdb.ListActive"
	,  := .queueExists()
	if  != nil {
		return nil, errors.E(, errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: })
	}
	if ! {
		return nil, errors.E(, errors.NotFound, &errors.QueueNotFoundError{Queue: })
	}
	,  := .listMessages(, base.TaskStateActive, )
	if  != nil {
		return nil, errors.E(, errors.CanonicalCode(), )
	}
	return , nil
}

// KEYS[1] -> key for id list (e.g. asynq:{<qname>}:pending)
// ARGV[1] -> start offset
// ARGV[2] -> stop offset
// ARGV[3] -> task key prefix
var listMessagesCmd = redis.NewScript(`
local ids = redis.call("LRange", KEYS[1], ARGV[1], ARGV[2])
local data = {}
for _, id in ipairs(ids) do
	local key = ARGV[3] .. id
	local msg, result = unpack(redis.call("HMGET", key, "msg","result"))
	table.insert(data, msg)
	table.insert(data, result)
end
return data
`)

// listMessages returns a list of TaskInfo in Redis list with the given key.
func ( *RDB) ( string,  base.TaskState,  Pagination) ([]*base.TaskInfo, error) {
	var  string
	switch  {
	case base.TaskStateActive:
		 = base.ActiveKey()
	case base.TaskStatePending:
		 = base.PendingKey()
	default:
		panic(fmt.Sprintf("unsupported task state: %v", ))
	}
	// Note: Because we use LPUSH to redis list, we need to calculate the
	// correct range and reverse the list to get the tasks with pagination.
	 := -.start() - 1
	 := -.stop() - 1
	,  := listMessagesCmd.Run(context.Background(), .client,
		[]string{}, , , base.TaskKeyPrefix()).Result()
	if  != nil {
		return nil, errors.E(errors.Unknown, )
	}
	,  := cast.ToStringSliceE()
	if  != nil {
		return nil, errors.E(errors.Internal, fmt.Errorf("cast error: Lua script returned unexpected value: %v", ))
	}
	var  []*base.TaskInfo
	for  := 0;  < len();  += 2 {
		,  := base.DecodeMessage([]byte([]))
		if  != nil {
			continue // bad data, ignore and continue
		}
		var  []byte
		if len([+1]) > 0 {
			 = []byte([+1])
		}
		var  time.Time
		if  == base.TaskStatePending {
			 = .clock.Now()
		}
		 = append(, &base.TaskInfo{
			Message:       ,
			State:         ,
			NextProcessAt: ,
			Result:        ,
		})
	}
	reverse()
	return , nil

}

// ListScheduled returns all tasks from the given queue that are scheduled
// to be processed in the future.
func ( *RDB) ( string,  Pagination) ([]*base.TaskInfo, error) {
	var  errors.Op = "rdb.ListScheduled"
	,  := .queueExists()
	if  != nil {
		return nil, errors.E(, errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: })
	}
	if ! {
		return nil, errors.E(, errors.NotFound, &errors.QueueNotFoundError{Queue: })
	}
	,  := .listZSetEntries(, base.TaskStateScheduled, base.ScheduledKey(), )
	if  != nil {
		return nil, errors.E(, errors.CanonicalCode(), )
	}
	return , nil
}

// ListRetry returns all tasks from the given queue that have failed before
// and willl be retried in the future.
func ( *RDB) ( string,  Pagination) ([]*base.TaskInfo, error) {
	var  errors.Op = "rdb.ListRetry"
	,  := .queueExists()
	if  != nil {
		return nil, errors.E(, errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: })
	}
	if ! {
		return nil, errors.E(, errors.NotFound, &errors.QueueNotFoundError{Queue: })
	}
	,  := .listZSetEntries(, base.TaskStateRetry, base.RetryKey(), )
	if  != nil {
		return nil, errors.E(, errors.CanonicalCode(), )
	}
	return , nil
}

// ListArchived returns all tasks from the given queue that have exhausted its retry limit.
func ( *RDB) ( string,  Pagination) ([]*base.TaskInfo, error) {
	var  errors.Op = "rdb.ListArchived"
	,  := .queueExists()
	if  != nil {
		return nil, errors.E(, errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: })
	}
	if ! {
		return nil, errors.E(, errors.NotFound, &errors.QueueNotFoundError{Queue: })
	}
	,  := .listZSetEntries(, base.TaskStateArchived, base.ArchivedKey(), )
	if  != nil {
		return nil, errors.E(, errors.CanonicalCode(), )
	}
	return , nil
}

// ListCompleted returns all tasks from the given queue that have completed successfully.
func ( *RDB) ( string,  Pagination) ([]*base.TaskInfo, error) {
	var  errors.Op = "rdb.ListCompleted"
	,  := .queueExists()
	if  != nil {
		return nil, errors.E(, errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: })
	}
	if ! {
		return nil, errors.E(, errors.NotFound, &errors.QueueNotFoundError{Queue: })
	}
	,  := .listZSetEntries(, base.TaskStateCompleted, base.CompletedKey(), )
	if  != nil {
		return nil, errors.E(, errors.CanonicalCode(), )
	}
	return , nil
}

// ListAggregating returns all tasks from the given group.
func ( *RDB) (,  string,  Pagination) ([]*base.TaskInfo, error) {
	var  errors.Op = "rdb.ListAggregating"
	,  := .queueExists()
	if  != nil {
		return nil, errors.E(, errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: })
	}
	if ! {
		return nil, errors.E(, errors.NotFound, &errors.QueueNotFoundError{Queue: })
	}
	,  := .listZSetEntries(, base.TaskStateAggregating, base.GroupKey(, ), )
	if  != nil {
		return nil, errors.E(, errors.CanonicalCode(), )
	}
	return , nil
}

// Reports whether a queue with the given name exists.
func ( *RDB) ( string) (bool, error) {
	return .client.SIsMember(context.Background(), base.AllQueues, ).Result()
}

// KEYS[1] -> key for ids set (e.g. asynq:{<qname>}:scheduled)
// ARGV[1] -> min
// ARGV[2] -> max
// ARGV[3] -> task key prefix
//
// Returns an array populated with
// [msg1, score1, result1, msg2, score2, result2, ..., msgN, scoreN, resultN]
var listZSetEntriesCmd = redis.NewScript(`
local data = {}
local id_score_pairs = redis.call("ZRANGE", KEYS[1], ARGV[1], ARGV[2], "WITHSCORES")
for i = 1, table.getn(id_score_pairs), 2 do
	local id = id_score_pairs[i]
	local score = id_score_pairs[i+1]
	local key = ARGV[3] .. id
	local msg, res = unpack(redis.call("HMGET", key, "msg", "result"))
	table.insert(data, msg)
	table.insert(data, score)
	table.insert(data, res)
end
return data
`)

// listZSetEntries returns a list of message and score pairs in Redis sorted-set
// with the given key.
func ( *RDB) ( string,  base.TaskState,  string,  Pagination) ([]*base.TaskInfo, error) {
	,  := listZSetEntriesCmd.Run(context.Background(), .client, []string{},
		.start(), .stop(), base.TaskKeyPrefix()).Result()
	if  != nil {
		return nil, errors.E(errors.Unknown, )
	}
	,  := cast.ToSliceE()
	if  != nil {
		return nil, errors.E(errors.Internal, fmt.Errorf("cast error: Lua script returned unexpected value: %v", ))
	}
	var  []*base.TaskInfo
	for  := 0;  < len();  += 3 {
		,  := cast.ToStringE([])
		if  != nil {
			return nil, errors.E(errors.Internal, fmt.Errorf("cast error: Lua script returned unexpected value: %v", ))
		}
		,  := cast.ToInt64E([+1])
		if  != nil {
			return nil, errors.E(errors.Internal, fmt.Errorf("cast error: Lua script returned unexpected value: %v", ))
		}
		,  := cast.ToStringE([+2])
		if  != nil {
			return nil, errors.E(errors.Internal, fmt.Errorf("cast error: Lua script returned unexpected value: %v", ))
		}
		,  := base.DecodeMessage([]byte())
		if  != nil {
			continue // bad data, ignore and continue
		}
		var  time.Time
		if  == base.TaskStateScheduled ||  == base.TaskStateRetry {
			 = time.Unix(, 0)
		}
		var  []byte
		if len() > 0 {
			 = []byte()
		}
		 = append(, &base.TaskInfo{
			Message:       ,
			State:         ,
			NextProcessAt: ,
			Result:        ,
		})
	}
	return , nil
}

// RunAllScheduledTasks enqueues all scheduled tasks from the given queue
// and returns the number of tasks enqueued.
// If a queue with the given name doesn't exist, it returns QueueNotFoundError.
func ( *RDB) ( string) (int64, error) {
	var  errors.Op = "rdb.RunAllScheduledTasks"
	,  := .runAll(base.ScheduledKey(), )
	if errors.IsQueueNotFound() {
		return 0, errors.E(, errors.NotFound, )
	}
	if  != nil {
		return 0, errors.E(, errors.Unknown, )
	}
	return , nil
}

// RunAllRetryTasks enqueues all retry tasks from the given queue
// and returns the number of tasks enqueued.
// If a queue with the given name doesn't exist, it returns QueueNotFoundError.
func ( *RDB) ( string) (int64, error) {
	var  errors.Op = "rdb.RunAllRetryTasks"
	,  := .runAll(base.RetryKey(), )
	if errors.IsQueueNotFound() {
		return 0, errors.E(, errors.NotFound, )
	}
	if  != nil {
		return 0, errors.E(, errors.Unknown, )
	}
	return , nil
}

// RunAllArchivedTasks enqueues all archived tasks from the given queue
// and returns the number of tasks enqueued.
// If a queue with the given name doesn't exist, it returns QueueNotFoundError.
func ( *RDB) ( string) (int64, error) {
	var  errors.Op = "rdb.RunAllArchivedTasks"
	,  := .runAll(base.ArchivedKey(), )
	if errors.IsQueueNotFound() {
		return 0, errors.E(, errors.NotFound, )
	}
	if  != nil {
		return 0, errors.E(, errors.Unknown, )
	}
	return , nil
}

// runAllAggregatingCmd schedules all tasks in the group to run individually.
//
// Input:
// KEYS[1] -> asynq:{<qname>}:g:<gname>
// KEYS[2] -> asynq:{<qname>}:pending
// KEYS[3] -> asynq:{<qname>}:groups
// -------
// ARGV[1] -> task key prefix
// ARGV[2] -> group name
//
// Output:
// integer: number of tasks scheduled to run
var runAllAggregatingCmd = redis.NewScript(`
local ids = redis.call("ZRANGE", KEYS[1], 0, -1)
for _, id in ipairs(ids) do
	redis.call("LPUSH", KEYS[2], id)
	redis.call("HSET", ARGV[1] .. id, "state", "pending")
end
redis.call("DEL", KEYS[1])
redis.call("SREM", KEYS[3], ARGV[2])
return table.getn(ids)
`)

// RunAllAggregatingTasks schedules all tasks from the given queue to run
// and returns the number of tasks scheduled to run.
// If a queue with the given name doesn't exist, it returns QueueNotFoundError.
func ( *RDB) (,  string) (int64, error) {
	var  errors.Op = "rdb.RunAllAggregatingTasks"
	if  := .checkQueueExists();  != nil {
		return 0, errors.E(, errors.CanonicalCode(), )
	}
	 := []string{
		base.GroupKey(, ),
		base.PendingKey(),
		base.AllGroups(),
	}
	 := []interface{}{
		base.TaskKeyPrefix(),
		,
	}
	,  := runAllAggregatingCmd.Run(context.Background(), .client, , ...).Result()
	if  != nil {
		return 0, errors.E(, errors.Internal, )
	}
	,  := .(int64)
	if ! {
		return 0, errors.E(, errors.Internal, fmt.Sprintf("unexpected return value from script %v", ))
	}
	return , nil
}

// runTaskCmd is a Lua script that updates the given task to pending state.
//
// Input:
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
// KEYS[2] -> asynq:{<qname>}:pending
// KEYS[3] -> asynq:{<qname>}:groups
// --
// ARGV[1] -> task ID
// ARGV[2] -> queue key prefix; asynq:{<qname>}:
// ARGV[3] -> group key prefix
//
// Output:
// Numeric code indicating the status:
// Returns 1 if task is successfully updated.
// Returns 0 if task is not found.
// Returns -1 if task is in active state.
// Returns -2 if task is in pending state.
// Returns error reply if unexpected error occurs.
var runTaskCmd = redis.NewScript(`
if redis.call("EXISTS", KEYS[1]) == 0 then
	return 0
end
local state, group = unpack(redis.call("HMGET", KEYS[1], "state", "group"))
if state == "active" then
	return -1
elseif state == "pending" then
	return -2
elseif state == "aggregating" then
	local n = redis.call("ZREM", ARGV[3] .. group, ARGV[1])
	if n == 0 then
		return redis.error_reply("internal error: task id not found in zset " .. tostring(ARGV[3] .. group))
	end
	if redis.call("ZCARD", ARGV[3] .. group) == 0 then
		redis.call("SREM", KEYS[3], group)
	end
else
	local n = redis.call("ZREM", ARGV[2] .. state, ARGV[1])
	if n == 0 then
		return redis.error_reply("internal error: task id not found in zset " .. tostring(ARGV[2] .. state))
	end
end
redis.call("LPUSH", KEYS[2], ARGV[1])
redis.call("HSET", KEYS[1], "state", "pending")
return 1
`)

// RunTask finds a task that matches the id from the given queue and updates it to pending state.
// It returns nil if it successfully updated the task.
//
// If a queue with the given name doesn't exist, it returns QueueNotFoundError.
// If a task with the given id doesn't exist in the queue, it returns TaskNotFoundError
// If a task is in active or pending state it returns non-nil error with Code FailedPrecondition.
func ( *RDB) (,  string) error {
	var  errors.Op = "rdb.RunTask"
	if  := .checkQueueExists();  != nil {
		return errors.E(, errors.CanonicalCode(), )
	}
	 := []string{
		base.TaskKey(, ),
		base.PendingKey(),
		base.AllGroups(),
	}
	 := []interface{}{
		,
		base.QueueKeyPrefix(),
		base.GroupKeyPrefix(),
	}
	,  := runTaskCmd.Run(context.Background(), .client, , ...).Result()
	if  != nil {
		return errors.E(, errors.Unknown, )
	}
	,  := .(int64)
	if ! {
		return errors.E(, errors.Internal, fmt.Sprintf("cast error: unexpected return value from Lua script: %v", ))
	}
	switch  {
	case 1:
		return nil
	case 0:
		return errors.E(, errors.NotFound, &errors.TaskNotFoundError{Queue: , ID: })
	case -1:
		return errors.E(, errors.FailedPrecondition, "task is already running")
	case -2:
		return errors.E(, errors.FailedPrecondition, "task is already in pending state")
	default:
		return errors.E(, errors.Internal, fmt.Sprintf("unexpected return value from Lua script %d", ))
	}
}

// runAllCmd is a Lua script that moves all tasks in the given state
// (one of: scheduled, retry, archived) to pending state.
//
// Input:
// KEYS[1] -> zset which holds task ids (e.g. asynq:{<qname>}:scheduled)
// KEYS[2] -> asynq:{<qname>}:pending
// --
// ARGV[1] -> task key prefix
//
// Output:
// integer: number of tasks updated to pending state.
var runAllCmd = redis.NewScript(`
local ids = redis.call("ZRANGE", KEYS[1], 0, -1)
for _, id in ipairs(ids) do
	redis.call("LPUSH", KEYS[2], id)
	redis.call("HSET", ARGV[1] .. id, "state", "pending")
end
redis.call("DEL", KEYS[1])
return table.getn(ids)`)

func ( *RDB) (,  string) (int64, error) {
	if  := .checkQueueExists();  != nil {
		return 0, 
	}
	 := []string{
		,
		base.PendingKey(),
	}
	 := []interface{}{
		base.TaskKeyPrefix(),
	}
	,  := runAllCmd.Run(context.Background(), .client, , ...).Result()
	if  != nil {
		return 0, 
	}
	,  := .(int64)
	if ! {
		return 0, fmt.Errorf("could not cast %v to int64", )
	}
	if  == -1 {
		return 0, &errors.QueueNotFoundError{Queue: }
	}
	return , nil
}

// ArchiveAllRetryTasks archives all retry tasks from the given queue and
// returns the number of tasks that were moved.
// If a queue with the given name doesn't exist, it returns QueueNotFoundError.
func ( *RDB) ( string) (int64, error) {
	var  errors.Op = "rdb.ArchiveAllRetryTasks"
	,  := .archiveAll(base.RetryKey(), base.ArchivedKey(), )
	if errors.IsQueueNotFound() {
		return 0, errors.E(, errors.NotFound, )
	}
	if  != nil {
		return 0, errors.E(, errors.Internal, )
	}
	return , nil
}

// ArchiveAllScheduledTasks archives all scheduled tasks from the given queue and
// returns the number of tasks that were moved.
// If a queue with the given name doesn't exist, it returns QueueNotFoundError.
func ( *RDB) ( string) (int64, error) {
	var  errors.Op = "rdb.ArchiveAllScheduledTasks"
	,  := .archiveAll(base.ScheduledKey(), base.ArchivedKey(), )
	if errors.IsQueueNotFound() {
		return 0, errors.E(, errors.NotFound, )
	}
	if  != nil {
		return 0, errors.E(, errors.Internal, )
	}
	return , nil
}

// archiveAllAggregatingCmd archives all tasks in the given group.
//
// Input:
// KEYS[1] -> asynq:{<qname>}:g:<gname>
// KEYS[2] -> asynq:{<qname>}:archived
// KEYS[3] -> asynq:{<qname>}:groups
// -------
// ARGV[1] -> current timestamp
// ARGV[2] -> cutoff timestamp (e.g., 90 days ago)
// ARGV[3] -> max number of tasks in archive (e.g., 100)
// ARGV[4] -> task key prefix (asynq:{<qname>}:t:)
// ARGV[5] -> group name
//
// Output:
// integer: Number of tasks archived
var archiveAllAggregatingCmd = redis.NewScript(`
local ids = redis.call("ZRANGE", KEYS[1], 0, -1)
for _, id in ipairs(ids) do
	redis.call("ZADD", KEYS[2], ARGV[1], id)
	redis.call("HSET", ARGV[4] .. id, "state", "archived")
end
redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[2])
redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[3])
redis.call("DEL", KEYS[1])
redis.call("SREM", KEYS[3], ARGV[5])
return table.getn(ids)
`)

// ArchiveAllAggregatingTasks archives all aggregating tasks from the given group
// and returns the number of tasks archived.
// If a queue with the given name doesn't exist, it returns QueueNotFoundError.
func ( *RDB) (,  string) (int64, error) {
	var  errors.Op = "rdb.ArchiveAllAggregatingTasks"
	if  := .checkQueueExists();  != nil {
		return 0, errors.E(, errors.CanonicalCode(), )
	}
	 := []string{
		base.GroupKey(, ),
		base.ArchivedKey(),
		base.AllGroups(),
	}
	 := .clock.Now()
	 := []interface{}{
		.Unix(),
		.AddDate(0, 0, -archivedExpirationInDays).Unix(),
		maxArchiveSize,
		base.TaskKeyPrefix(),
		,
	}
	,  := archiveAllAggregatingCmd.Run(context.Background(), .client, , ...).Result()
	if  != nil {
		return 0, errors.E(, errors.Internal, )
	}
	,  := .(int64)
	if ! {
		return 0, errors.E(, errors.Internal, fmt.Sprintf("unexpected return value from script %v", ))
	}
	return , nil
}

// archiveAllPendingCmd is a Lua script that moves all pending tasks from
// the given queue to archived state.
//
// Input:
// KEYS[1] -> asynq:{<qname>}:pending
// KEYS[2] -> asynq:{<qname>}:archived
// --
// ARGV[1] -> current timestamp
// ARGV[2] -> cutoff timestamp (e.g., 90 days ago)
// ARGV[3] -> max number of tasks in archive (e.g., 100)
// ARGV[4] -> task key prefix (asynq:{<qname>}:t:)
//
// Output:
// integer: Number of tasks archived
var archiveAllPendingCmd = redis.NewScript(`
local ids = redis.call("LRANGE", KEYS[1], 0, -1)
for _, id in ipairs(ids) do
	redis.call("ZADD", KEYS[2], ARGV[1], id)
	redis.call("HSET", ARGV[4] .. id, "state", "archived")
end
redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[2])
redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[3])
redis.call("DEL", KEYS[1])
return table.getn(ids)`)

// ArchiveAllPendingTasks archives all pending tasks from the given queue and
// returns the number of tasks moved.
// If a queue with the given name doesn't exist, it returns QueueNotFoundError.
func ( *RDB) ( string) (int64, error) {
	var  errors.Op = "rdb.ArchiveAllPendingTasks"
	if  := .checkQueueExists();  != nil {
		return 0, errors.E(, errors.CanonicalCode(), )
	}
	 := []string{
		base.PendingKey(),
		base.ArchivedKey(),
	}
	 := .clock.Now()
	 := []interface{}{
		.Unix(),
		.AddDate(0, 0, -archivedExpirationInDays).Unix(),
		maxArchiveSize,
		base.TaskKeyPrefix(),
	}
	,  := archiveAllPendingCmd.Run(context.Background(), .client, , ...).Result()
	if  != nil {
		return 0, errors.E(, errors.Internal, )
	}
	,  := .(int64)
	if ! {
		return 0, errors.E(, errors.Internal, fmt.Sprintf("unexpected return value from script %v", ))
	}
	return , nil
}

// archiveTaskCmd is a Lua script that archives a task given a task id.
//
// Input:
// KEYS[1] -> task key (asynq:{<qname>}:t:<task_id>)
// KEYS[2] -> archived key (asynq:{<qname>}:archived)
// KEYS[3] -> all groups key (asynq:{<qname>}:groups)
// --
// ARGV[1] -> id of the task to archive
// ARGV[2] -> current timestamp
// ARGV[3] -> cutoff timestamp (e.g., 90 days ago)
// ARGV[4] -> max number of tasks in archived state (e.g., 100)
// ARGV[5] -> queue key prefix (asynq:{<qname>}:)
// ARGV[6] -> group key prefix (asynq:{<qname>}:g:)
//
// Output:
// Numeric code indicating the status:
// Returns 1 if task is successfully archived.
// Returns 0 if task is not found.
// Returns -1 if task is already archived.
// Returns -2 if task is in active state.
// Returns error reply if unexpected error occurs.
var archiveTaskCmd = redis.NewScript(`
if redis.call("EXISTS", KEYS[1]) == 0 then
	return 0
end
local state, group = unpack(redis.call("HMGET", KEYS[1], "state", "group"))
if state == "active" then
	return -2
end
if state == "archived" then
	return -1
end
if state == "pending" then
	if redis.call("LREM", ARGV[5] .. state, 1, ARGV[1]) == 0 then
		return redis.error_reply("task id not found in list " .. tostring(ARGV[5] .. state))
	end
elseif state == "aggregating" then
	if redis.call("ZREM", ARGV[6] .. group, ARGV[1]) == 0 then
		return redis.error_reply("task id not found in zset " .. tostring(ARGV[6] .. group))
	end
	if redis.call("ZCARD", ARGV[6] .. group) == 0 then
		redis.call("SREM", KEYS[3], group)
	end
else
	if redis.call("ZREM", ARGV[5] .. state, ARGV[1]) == 0 then
		return redis.error_reply("task id not found in zset " .. tostring(ARGV[5] .. state))
	end
end
redis.call("ZADD", KEYS[2], ARGV[2], ARGV[1])
redis.call("HSET", KEYS[1], "state", "archived")
redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[3])
redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[4])
return 1
`)

// ArchiveTask finds a task that matches the id from the given queue and archives it.
// It returns nil if it successfully archived the task.
//
// If a queue with the given name doesn't exist, it returns QueueNotFoundError.
// If a task with the given id doesn't exist in the queue, it returns TaskNotFoundError
// If a task is already archived, it returns TaskAlreadyArchivedError.
// If a task is in active state it returns non-nil error with FailedPrecondition code.
func ( *RDB) (,  string) error {
	var  errors.Op = "rdb.ArchiveTask"
	if  := .checkQueueExists();  != nil {
		return errors.E(, errors.CanonicalCode(), )
	}
	 := []string{
		base.TaskKey(, ),
		base.ArchivedKey(),
		base.AllGroups(),
	}
	 := .clock.Now()
	 := []interface{}{
		,
		.Unix(),
		.AddDate(0, 0, -archivedExpirationInDays).Unix(),
		maxArchiveSize,
		base.QueueKeyPrefix(),
		base.GroupKeyPrefix(),
	}
	,  := archiveTaskCmd.Run(context.Background(), .client, , ...).Result()
	if  != nil {
		return errors.E(, errors.Unknown, )
	}
	,  := .(int64)
	if ! {
		return errors.E(, errors.Internal, fmt.Sprintf("could not cast the return value %v from archiveTaskCmd to int64.", ))
	}
	switch  {
	case 1:
		return nil
	case 0:
		return errors.E(, errors.NotFound, &errors.TaskNotFoundError{Queue: , ID: })
	case -1:
		return errors.E(, errors.FailedPrecondition, &errors.TaskAlreadyArchivedError{Queue: , ID: })
	case -2:
		return errors.E(, errors.FailedPrecondition, "cannot archive task in active state. use CancelProcessing instead.")
	case -3:
		return errors.E(, errors.NotFound, &errors.QueueNotFoundError{Queue: })
	default:
		return errors.E(, errors.Internal, fmt.Sprintf("unexpected return value from archiveTaskCmd script: %d", ))
	}
}

// archiveAllCmd is a Lua script that archives all tasks in either scheduled
// or retry state from the given queue.
//
// Input:
// KEYS[1] -> ZSET to move task from (e.g., asynq:{<qname>}:retry)
// KEYS[2] -> asynq:{<qname>}:archived
// --
// ARGV[1] -> current timestamp
// ARGV[2] -> cutoff timestamp (e.g., 90 days ago)
// ARGV[3] -> max number of tasks in archive (e.g., 100)
// ARGV[4] -> task key prefix (asynq:{<qname>}:t:)
//
// Output:
// integer: number of tasks archived
var archiveAllCmd = redis.NewScript(`
local ids = redis.call("ZRANGE", KEYS[1], 0, -1)
for _, id in ipairs(ids) do
	redis.call("ZADD", KEYS[2], ARGV[1], id)
	redis.call("HSET", ARGV[4] .. id, "state", "archived")
end
redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[2])
redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[3])
redis.call("DEL", KEYS[1])
return table.getn(ids)`)

func ( *RDB) (, ,  string) (int64, error) {
	if  := .checkQueueExists();  != nil {
		return 0, 
	}
	 := []string{
		,
		,
	}
	 := .clock.Now()
	 := []interface{}{
		.Unix(),
		.AddDate(0, 0, -archivedExpirationInDays).Unix(),
		maxArchiveSize,
		base.TaskKeyPrefix(),
		,
	}
	,  := archiveAllCmd.Run(context.Background(), .client, , ...).Result()
	if  != nil {
		return 0, 
	}
	,  := .(int64)
	if ! {
		return 0, fmt.Errorf("unexpected return value from script: %v", )
	}
	if  == -1 {
		return 0, &errors.QueueNotFoundError{Queue: }
	}
	return , nil
}

// Input:
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
// KEYS[2] -> asynq:{<qname>}:groups
// --
// ARGV[1] -> task ID
// ARGV[2] -> queue key prefix
// ARGV[3] -> group key prefix
//
// Output:
// Numeric code indicating the status:
// Returns 1 if task is successfully deleted.
// Returns 0 if task is not found.
// Returns -1 if task is in active state.
var deleteTaskCmd = redis.NewScript(`
if redis.call("EXISTS", KEYS[1]) == 0 then
	return 0
end
local state, group = unpack(redis.call("HMGET", KEYS[1], "state", "group"))
if state == "active" then
	return -1
end
if state == "pending" then
	if redis.call("LREM", ARGV[2] .. state, 0, ARGV[1]) == 0 then
		return redis.error_reply("task is not found in list: " .. tostring(ARGV[2] .. state))
	end
elseif state == "aggregating" then
	if redis.call("ZREM", ARGV[3] .. group, ARGV[1]) == 0 then
		return redis.error_reply("task is not found in zset: " .. tostring(ARGV[3] .. group))
	end
	if redis.call("ZCARD", ARGV[3] .. group) == 0 then
		redis.call("SREM", KEYS[2], group)
	end
else
	if redis.call("ZREM", ARGV[2] .. state, ARGV[1]) == 0 then
		return redis.error_reply("task is not found in zset: " .. tostring(ARGV[2] .. state))
	end
end
local unique_key = redis.call("HGET", KEYS[1], "unique_key")
if unique_key and unique_key ~= "" and redis.call("GET", unique_key) == ARGV[1] then
	redis.call("DEL", unique_key)
end
return redis.call("DEL", KEYS[1])
`)

// DeleteTask finds a task that matches the id from the given queue and deletes it.
// It returns nil if it successfully archived the task.
//
// If a queue with the given name doesn't exist, it returns QueueNotFoundError.
// If a task with the given id doesn't exist in the queue, it returns TaskNotFoundError
// If a task is in active state it returns non-nil error with Code FailedPrecondition.
func ( *RDB) (,  string) error {
	var  errors.Op = "rdb.DeleteTask"
	if  := .checkQueueExists();  != nil {
		return errors.E(, errors.CanonicalCode(), )
	}
	 := []string{
		base.TaskKey(, ),
		base.AllGroups(),
	}
	 := []interface{}{
		,
		base.QueueKeyPrefix(),
		base.GroupKeyPrefix(),
	}
	,  := deleteTaskCmd.Run(context.Background(), .client, , ...).Result()
	if  != nil {
		return errors.E(, errors.Unknown, )
	}
	,  := .(int64)
	if ! {
		return errors.E(, errors.Internal, fmt.Sprintf("cast error: deleteTaskCmd script returned unexported value %v", ))
	}
	switch  {
	case 1:
		return nil
	case 0:
		return errors.E(, errors.NotFound, &errors.TaskNotFoundError{Queue: , ID: })
	case -1:
		return errors.E(, errors.FailedPrecondition, "cannot delete task in active state. use CancelProcessing instead.")
	default:
		return errors.E(, errors.Internal, fmt.Sprintf("unexpected return value from deleteTaskCmd script: %d", ))
	}
}

// DeleteAllArchivedTasks deletes all archived tasks from the given queue
// and returns the number of tasks deleted.
func ( *RDB) ( string) (int64, error) {
	var  errors.Op = "rdb.DeleteAllArchivedTasks"
	,  := .deleteAll(base.ArchivedKey(), )
	if errors.IsQueueNotFound() {
		return 0, errors.E(, errors.NotFound, )
	}
	if  != nil {
		return 0, errors.E(, errors.Unknown, )
	}
	return , nil
}

// DeleteAllRetryTasks deletes all retry tasks from the given queue
// and returns the number of tasks deleted.
func ( *RDB) ( string) (int64, error) {
	var  errors.Op = "rdb.DeleteAllRetryTasks"
	,  := .deleteAll(base.RetryKey(), )
	if errors.IsQueueNotFound() {
		return 0, errors.E(, errors.NotFound, )
	}
	if  != nil {
		return 0, errors.E(, errors.Unknown, )
	}
	return , nil
}

// DeleteAllScheduledTasks deletes all scheduled tasks from the given queue
// and returns the number of tasks deleted.
func ( *RDB) ( string) (int64, error) {
	var  errors.Op = "rdb.DeleteAllScheduledTasks"
	,  := .deleteAll(base.ScheduledKey(), )
	if errors.IsQueueNotFound() {
		return 0, errors.E(, errors.NotFound, )
	}
	if  != nil {
		return 0, errors.E(, errors.Unknown, )
	}
	return , nil
}

// DeleteAllCompletedTasks deletes all completed tasks from the given queue
// and returns the number of tasks deleted.
func ( *RDB) ( string) (int64, error) {
	var  errors.Op = "rdb.DeleteAllCompletedTasks"
	,  := .deleteAll(base.CompletedKey(), )
	if errors.IsQueueNotFound() {
		return 0, errors.E(, errors.NotFound, )
	}
	if  != nil {
		return 0, errors.E(, errors.Unknown, )
	}
	return , nil
}

// deleteAllCmd deletes tasks from the given zset.
//
// Input:
// KEYS[1] -> zset holding the task ids.
// --
// ARGV[1] -> task key prefix
//
// Output:
// integer: number of tasks deleted
var deleteAllCmd = redis.NewScript(`
local ids = redis.call("ZRANGE", KEYS[1], 0, -1)
for _, id in ipairs(ids) do
	local task_key = ARGV[1] .. id
	local unique_key = redis.call("HGET", task_key, "unique_key")
	if unique_key and unique_key ~= "" and redis.call("GET", unique_key) == id then
		redis.call("DEL", unique_key)
	end
	redis.call("DEL", task_key)
end
redis.call("DEL", KEYS[1])
return table.getn(ids)`)

func ( *RDB) (,  string) (int64, error) {
	if  := .checkQueueExists();  != nil {
		return 0, 
	}
	 := []interface{}{
		base.TaskKeyPrefix(),
		,
	}
	,  := deleteAllCmd.Run(context.Background(), .client, []string{}, ...).Result()
	if  != nil {
		return 0, 
	}
	,  := .(int64)
	if ! {
		return 0, fmt.Errorf("unexpected return value from Lua script: %v", )
	}
	return , nil
}

// deleteAllAggregatingCmd deletes all tasks from the given group.
//
// Input:
// KEYS[1] -> asynq:{<qname>}:g:<gname>
// KEYS[2] -> asynq:{<qname>}:groups
// -------
// ARGV[1] -> task key prefix
// ARGV[2] -> group name
var deleteAllAggregatingCmd = redis.NewScript(`
local ids = redis.call("ZRANGE", KEYS[1], 0, -1)
for _, id in ipairs(ids) do
	redis.call("DEL", ARGV[1] .. id)
end
redis.call("SREM", KEYS[2], ARGV[2])
redis.call("DEL", KEYS[1])
return table.getn(ids)
`)

// DeleteAllAggregatingTasks deletes all aggregating tasks from the given group
// and returns the number of tasks deleted.
func ( *RDB) (,  string) (int64, error) {
	var  errors.Op = "rdb.DeleteAllAggregatingTasks"
	if  := .checkQueueExists();  != nil {
		return 0, errors.E(, errors.CanonicalCode(), )
	}
	 := []string{
		base.GroupKey(, ),
		base.AllGroups(),
	}
	 := []interface{}{
		base.TaskKeyPrefix(),
		,
	}
	,  := deleteAllAggregatingCmd.Run(context.Background(), .client, , ...).Result()
	if  != nil {
		return 0, errors.E(, errors.Unknown, )
	}
	,  := .(int64)
	if ! {
		return 0, errors.E(, errors.Internal, "command error: unexpected return value %v", )
	}
	return , nil
}

// deleteAllPendingCmd deletes all pending tasks from the given queue.
//
// Input:
// KEYS[1] -> asynq:{<qname>}:pending
// --
// ARGV[1] -> task key prefix
//
// Output:
// integer: number of tasks deleted
var deleteAllPendingCmd = redis.NewScript(`
local ids = redis.call("LRANGE", KEYS[1], 0, -1)
for _, id in ipairs(ids) do
	redis.call("DEL", ARGV[1] .. id)
end
redis.call("DEL", KEYS[1])
return table.getn(ids)`)

// DeleteAllPendingTasks deletes all pending tasks from the given queue
// and returns the number of tasks deleted.
func ( *RDB) ( string) (int64, error) {
	var  errors.Op = "rdb.DeleteAllPendingTasks"
	if  := .checkQueueExists();  != nil {
		return 0, errors.E(, errors.CanonicalCode(), )
	}
	 := []string{
		base.PendingKey(),
	}
	 := []interface{}{
		base.TaskKeyPrefix(),
	}
	,  := deleteAllPendingCmd.Run(context.Background(), .client, , ...).Result()
	if  != nil {
		return 0, errors.E(, errors.Unknown, )
	}
	,  := .(int64)
	if ! {
		return 0, errors.E(, errors.Internal, "command error: unexpected return value %v", )
	}
	return , nil
}

// removeQueueForceCmd removes the given queue regardless of
// whether the queue is empty.
// It only check whether active queue is empty before removing.
//
// Input:
// KEYS[1] -> asynq:{<qname>}
// KEYS[2] -> asynq:{<qname>}:active
// KEYS[3] -> asynq:{<qname>}:scheduled
// KEYS[4] -> asynq:{<qname>}:retry
// KEYS[5] -> asynq:{<qname>}:archived
// KEYS[6] -> asynq:{<qname>}:lease
// --
// ARGV[1] -> task key prefix
//
// Output:
// Numeric code to indicate the status.
// Returns 1 if successfully removed.
// Returns -2 if the queue has active tasks.
var removeQueueForceCmd = redis.NewScript(`
local active = redis.call("LLEN", KEYS[2])
if active > 0 then
    return -2
end
for _, id in ipairs(redis.call("LRANGE", KEYS[1], 0, -1)) do
	redis.call("DEL", ARGV[1] .. id)
end
for _, id in ipairs(redis.call("LRANGE", KEYS[2], 0, -1)) do
	redis.call("DEL", ARGV[1] .. id)
end
for _, id in ipairs(redis.call("ZRANGE", KEYS[3], 0, -1)) do
	redis.call("DEL", ARGV[1] .. id)
end
for _, id in ipairs(redis.call("ZRANGE", KEYS[4], 0, -1)) do
	redis.call("DEL", ARGV[1] .. id)
end
for _, id in ipairs(redis.call("ZRANGE", KEYS[5], 0, -1)) do
	redis.call("DEL", ARGV[1] .. id)
end
for _, id in ipairs(redis.call("LRANGE", KEYS[1], 0, -1)) do
	redis.call("DEL", ARGV[1] .. id)
end
for _, id in ipairs(redis.call("LRANGE", KEYS[2], 0, -1)) do
	redis.call("DEL", ARGV[1] .. id)
end
for _, id in ipairs(redis.call("ZRANGE", KEYS[3], 0, -1)) do
	redis.call("DEL", ARGV[1] .. id)
end
for _, id in ipairs(redis.call("ZRANGE", KEYS[4], 0, -1)) do
	redis.call("DEL", ARGV[1] .. id)
end
for _, id in ipairs(redis.call("ZRANGE", KEYS[5], 0, -1)) do
	redis.call("DEL", ARGV[1] .. id)
end
redis.call("DEL", KEYS[1])
redis.call("DEL", KEYS[2])
redis.call("DEL", KEYS[3])
redis.call("DEL", KEYS[4])
redis.call("DEL", KEYS[5])
redis.call("DEL", KEYS[6])
return 1`)

// removeQueueCmd removes the given queue.
// It checks whether queue is empty before removing.
//
// Input:
// KEYS[1] -> asynq:{<qname>}:pending
// KEYS[2] -> asynq:{<qname>}:active
// KEYS[3] -> asynq:{<qname>}:scheduled
// KEYS[4] -> asynq:{<qname>}:retry
// KEYS[5] -> asynq:{<qname>}:archived
// KEYS[6] -> asynq:{<qname>}:lease
// --
// ARGV[1] -> task key prefix
//
// Output:
// Numeric code to indicate the status
// Returns 1 if successfully removed.
// Returns -1 if queue is not empty
var removeQueueCmd = redis.NewScript(`
local ids = {}
for _, id in ipairs(redis.call("LRANGE", KEYS[1], 0, -1)) do
	table.insert(ids, id)
end
for _, id in ipairs(redis.call("LRANGE", KEYS[2], 0, -1)) do
	table.insert(ids, id)
end
for _, id in ipairs(redis.call("ZRANGE", KEYS[3], 0, -1)) do
	table.insert(ids, id)
end
for _, id in ipairs(redis.call("ZRANGE", KEYS[4], 0, -1)) do
	table.insert(ids, id)
end
for _, id in ipairs(redis.call("ZRANGE", KEYS[5], 0, -1)) do
	table.insert(ids, id)
end
if table.getn(ids) > 0 then
	return -1
end
for _, id in ipairs(ids) do
	redis.call("DEL", ARGV[1] .. id)
end
for _, id in ipairs(ids) do
	redis.call("DEL", ARGV[1] .. id)
end
redis.call("DEL", KEYS[1])
redis.call("DEL", KEYS[2])
redis.call("DEL", KEYS[3])
redis.call("DEL", KEYS[4])
redis.call("DEL", KEYS[5])
redis.call("DEL", KEYS[6])
return 1`)

// RemoveQueue removes the specified queue.
//
// If force is set to true, it will remove the queue regardless
// as long as no tasks are active for the queue.
// If force is set to false, it will only remove the queue if
// the queue is empty.
func ( *RDB) ( string,  bool) error {
	var  errors.Op = "rdb.RemoveQueue"
	,  := .queueExists()
	if  != nil {
		return 
	}
	if ! {
		return errors.E(, errors.NotFound, &errors.QueueNotFoundError{Queue: })
	}
	var  *redis.Script
	if  {
		 = removeQueueForceCmd
	} else {
		 = removeQueueCmd
	}
	 := []string{
		base.PendingKey(),
		base.ActiveKey(),
		base.ScheduledKey(),
		base.RetryKey(),
		base.ArchivedKey(),
		base.LeaseKey(),
	}
	,  := .Run(context.Background(), .client, , base.TaskKeyPrefix()).Result()
	if  != nil {
		return errors.E(, errors.Unknown, )
	}
	,  := .(int64)
	if ! {
		return errors.E(, errors.Internal, fmt.Sprintf("unexpeced return value from Lua script: %v", ))
	}
	switch  {
	case 1:
		if  := .client.SRem(context.Background(), base.AllQueues, ).Err();  != nil {
			return errors.E(, errors.Unknown, )
		}
		return nil
	case -1:
		return errors.E(, errors.NotFound, &errors.QueueNotEmptyError{Queue: })
	case -2:
		return errors.E(, errors.FailedPrecondition, "cannot remove queue with active tasks")
	default:
		return errors.E(, errors.Unknown, fmt.Sprintf("unexpected return value from Lua script: %d", ))
	}
}

// Note: Script also removes stale keys.
var listServerKeysCmd = redis.NewScript(`
local now = tonumber(ARGV[1])
local keys = redis.call("ZRANGEBYSCORE", KEYS[1], now, "+inf")
redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1)
return keys`)

// ListServers returns the list of server info.
func ( *RDB) () ([]*base.ServerInfo, error) {
	 := .clock.Now()
	,  := listServerKeysCmd.Run(context.Background(), .client, []string{base.AllServers}, .Unix()).Result()
	if  != nil {
		return nil, 
	}
	,  := cast.ToStringSliceE()
	if  != nil {
		return nil, 
	}
	var  []*base.ServerInfo
	for ,  := range  {
		,  := .client.Get(context.Background(), ).Result()
		if  != nil {
			continue // skip bad data
		}
		,  := base.DecodeServerInfo([]byte())
		if  != nil {
			continue // skip bad data
		}
		 = append(, )
	}
	return , nil
}

// Note: Script also removes stale keys.
var listWorkersCmd = redis.NewScript(`
local now = tonumber(ARGV[1])
local keys = redis.call("ZRANGEBYSCORE", KEYS[1], now, "+inf")
redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1)
return keys`)

// ListWorkers returns the list of worker stats.
func ( *RDB) () ([]*base.WorkerInfo, error) {
	var  errors.Op = "rdb.ListWorkers"
	 := .clock.Now()
	,  := listWorkersCmd.Run(context.Background(), .client, []string{base.AllWorkers}, .Unix()).Result()
	if  != nil {
		return nil, errors.E(, errors.Unknown, )
	}
	,  := cast.ToStringSliceE()
	if  != nil {
		return nil, errors.E(, errors.Internal, fmt.Sprintf("unexpeced return value from Lua script: %v", ))
	}
	var  []*base.WorkerInfo
	for ,  := range  {
		,  := .client.HVals(context.Background(), ).Result()
		if  != nil {
			continue // skip bad data
		}
		for ,  := range  {
			,  := base.DecodeWorkerInfo([]byte())
			if  != nil {
				continue // skip bad data
			}
			 = append(, )
		}
	}
	return , nil
}

// Note: Script also removes stale keys.
var listSchedulerKeysCmd = redis.NewScript(`
local now = tonumber(ARGV[1])
local keys = redis.call("ZRANGEBYSCORE", KEYS[1], now, "+inf")
redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1)
return keys`)

// ListSchedulerEntries returns the list of scheduler entries.
func ( *RDB) () ([]*base.SchedulerEntry, error) {
	 := .clock.Now()
	,  := listSchedulerKeysCmd.Run(context.Background(), .client, []string{base.AllSchedulers}, .Unix()).Result()
	if  != nil {
		return nil, 
	}
	,  := cast.ToStringSliceE()
	if  != nil {
		return nil, 
	}
	var  []*base.SchedulerEntry
	for ,  := range  {
		,  := .client.LRange(context.Background(), , 0, -1).Result()
		if  != nil {
			continue // skip bad data
		}
		for ,  := range  {
			,  := base.DecodeSchedulerEntry([]byte())
			if  != nil {
				continue // skip bad data
			}
			 = append(, )
		}
	}
	return , nil
}

// ListSchedulerEnqueueEvents returns the list of scheduler enqueue events.
func ( *RDB) ( string,  Pagination) ([]*base.SchedulerEnqueueEvent, error) {
	 := base.SchedulerHistoryKey()
	,  := .client.ZRevRangeWithScores(context.Background(), , .start(), .stop()).Result()
	if  != nil {
		return nil, 
	}
	var  []*base.SchedulerEnqueueEvent
	for ,  := range  {
		,  := cast.ToStringE(.Member)
		if  != nil {
			return nil, 
		}
		,  := base.DecodeSchedulerEnqueueEvent([]byte())
		if  != nil {
			return nil, 
		}
		 = append(, )
	}
	return , nil
}

// Pause pauses processing of tasks from the given queue.
func ( *RDB) ( string) error {
	 := base.PausedKey()
	,  := .client.SetNX(context.Background(), , .clock.Now().Unix(), 0).Result()
	if  != nil {
		return 
	}
	if ! {
		return fmt.Errorf("queue %q is already paused", )
	}
	return nil
}

// Unpause resumes processing of tasks from the given queue.
func ( *RDB) ( string) error {
	 := base.PausedKey()
	,  := .client.Del(context.Background(), ).Result()
	if  != nil {
		return 
	}
	if  == 0 {
		return fmt.Errorf("queue %q is not paused", )
	}
	return nil
}

// ClusterKeySlot returns an integer identifying the hash slot the given queue hashes to.
func ( *RDB) ( string) (int64, error) {
	 := base.PendingKey()
	return .client.ClusterKeySlot(context.Background(), ).Result()
}

// ClusterNodes returns a list of nodes the given queue belongs to.
func ( *RDB) ( string) ([]redis.ClusterNode, error) {
	,  := .ClusterKeySlot()
	if  != nil {
		return nil, 
	}
	,  := .client.ClusterSlots(context.Background()).Result()
	if  != nil {
		return nil, 
	}
	for ,  := range  {
		if int64(.Start) <=  &&  <= int64(.End) {
			return .Nodes, nil
		}
	}
	return nil, fmt.Errorf("nodes not found")
}