package asynq
import (
"fmt"
"strconv"
"strings"
"time"
"github.com/redis/go-redis/v9"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/errors"
"github.com/hibiken/asynq/internal/rdb"
)
// Inspector is a client for inspecting and mutating the state of queues and
// tasks. All operations are delegated to the wrapped rdb.RDB handle.
type Inspector struct {
	rdb *rdb.RDB
}
// NewInspector returns a new Inspector backed by a Redis client created
// from r.
//
// It panics if the client produced by r.MakeRedisClient is not a
// redis.UniversalClient.
func NewInspector(r RedisConnOpt) *Inspector {
	c, ok := r.MakeRedisClient().(redis.UniversalClient)
	if !ok {
		// Use the same "asynq:" prefix as every other message emitted from
		// this file so the origin of the panic is easy to identify.
		panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
	}
	return &Inspector{rdb: rdb.NewRDB(c)}
}
// Close closes the inspector's underlying rdb handle and returns the error
// reported by it, if any.
func (i *Inspector) Close() error {
	err := i.rdb.Close()
	return err
}
// Queues returns the names of all queues as reported by rdb.AllQueues.
func (i *Inspector) Queues() ([]string, error) {
	names, err := i.rdb.AllQueues()
	return names, err
}
// Groups returns one GroupInfo per group stat that rdb.GroupStats reports
// for the given queue. The result is nil when there are no groups.
func (i *Inspector) Groups(queue string) ([]*GroupInfo, error) {
	groupStats, err := i.rdb.GroupStats(queue)
	if err != nil {
		return nil, err
	}
	var groups []*GroupInfo
	for _, gs := range groupStats {
		info := &GroupInfo{Group: gs.Group, Size: gs.Size}
		groups = append(groups, info)
	}
	return groups, nil
}
// GroupInfo describes a single task group within a queue.
type GroupInfo struct {
	// Group is the name of the group.
	Group string

	// Size is the size reported for the group (presumably the number of
	// tasks it holds — confirm against rdb.GroupStats).
	Size int
}
// QueueInfo represents the statistics of a queue at a point in time.
//
// GetQueueInfo fills every field from the same-named field of the stats
// returned by rdb.CurrentStats.
type QueueInfo struct {
	// Queue is the name of the queue.
	Queue string

	// MemoryUsage is the memory usage reported for the queue (presumably in
	// bytes — confirm against rdb.CurrentStats).
	MemoryUsage int64

	// Latency is the latency reported for the queue.
	Latency time.Duration

	// Size is the size reported for the queue (presumably the total number
	// of tasks across all states — confirm).
	Size int

	// Groups is the number of groups reported for the queue.
	Groups int

	// Task counts, one per task state.
	Pending     int
	Active      int
	Scheduled   int
	Retry       int
	Archived    int
	Completed   int
	Aggregating int

	// Processed and Failed are the counters reported for the current stats
	// window (presumably the current day — confirm); the *Total variants are
	// presumably all-time counters.
	Processed      int
	Failed         int
	ProcessedTotal int
	FailedTotal    int

	// Paused reports whether the queue is paused.
	Paused bool

	// Timestamp is the time reported together with the stats.
	Timestamp time.Time
}
// GetQueueInfo returns the current statistics of the given queue.
//
// The queue name is validated first; a validation error or an error from
// rdb.CurrentStats is returned unchanged.
func (i *Inspector) GetQueueInfo(queue string) (*QueueInfo, error) {
	if err := base.ValidateQueueName(queue); err != nil {
		return nil, err
	}
	stats, err := i.rdb.CurrentStats(queue)
	if err != nil {
		return nil, err
	}

	// Copy the stats field by field into the public representation.
	info := new(QueueInfo)
	info.Queue = stats.Queue
	info.MemoryUsage = stats.MemoryUsage
	info.Latency = stats.Latency
	info.Size = stats.Size
	info.Groups = stats.Groups
	info.Pending = stats.Pending
	info.Active = stats.Active
	info.Scheduled = stats.Scheduled
	info.Retry = stats.Retry
	info.Archived = stats.Archived
	info.Completed = stats.Completed
	info.Aggregating = stats.Aggregating
	info.Processed = stats.Processed
	info.Failed = stats.Failed
	info.ProcessedTotal = stats.ProcessedTotal
	info.FailedTotal = stats.FailedTotal
	info.Paused = stats.Paused
	info.Timestamp = stats.Timestamp
	return info, nil
}
// DailyStats holds the processed/failed counters of a queue for one date.
type DailyStats struct {
	// Queue is the name of the queue.
	Queue string

	// Processed is the processed counter reported for Date.
	Processed int

	// Failed is the failed counter reported for Date.
	Failed int

	// Date is the time the counters belong to.
	Date time.Time
}
// History returns the historical stats of the given queue, converting each
// entry returned by rdb.HistoricalStats(queue, n) into a DailyStats.
//
// The queue name is validated first. The result is nil when rdb reports no
// entries.
func (i *Inspector) History(queue string, n int) ([]*DailyStats, error) {
	if err := base.ValidateQueueName(queue); err != nil {
		return nil, err
	}
	history, err := i.rdb.HistoricalStats(queue, n)
	if err != nil {
		return nil, err
	}
	var days []*DailyStats
	for _, h := range history {
		day := &DailyStats{
			Queue:     h.Queue,
			Processed: h.Processed,
			Failed:    h.Failed,
			Date:      h.Time,
		}
		days = append(days, day)
	}
	return days, nil
}
// Sentinel errors returned (wrapped) by Inspector methods; match them with
// errors.Is.
var (
	// ErrQueueNotFound indicates that the specified queue does not exist.
	ErrQueueNotFound = errors.New("queue not found")

	// ErrQueueNotEmpty indicates that the specified queue is not empty.
	ErrQueueNotEmpty = errors.New("queue is not empty")

	// ErrTaskNotFound indicates that the specified task cannot be found in
	// the queue.
	ErrTaskNotFound = errors.New("task not found")
)
// DeleteQueue removes the given queue via rdb.RemoveQueue, passing force
// through unchanged.
//
// A queue-not-found failure is returned as an error wrapping
// ErrQueueNotFound, and a queue-not-empty failure as an error wrapping
// ErrQueueNotEmpty. Any other result of rdb.RemoveQueue (including nil) is
// returned as is.
func (i *Inspector) DeleteQueue(queue string, force bool) error {
	err := i.rdb.RemoveQueue(queue, force)
	switch {
	case errors.IsQueueNotFound(err):
		return fmt.Errorf("%w: queue=%q", ErrQueueNotFound, queue)
	case errors.IsQueueNotEmpty(err):
		return fmt.Errorf("%w: queue=%q", ErrQueueNotEmpty, queue)
	default:
		return err
	}
}
// GetTaskInfo retrieves the task with the given id from the given queue.
//
// It returns an error wrapping ErrQueueNotFound or ErrTaskNotFound when rdb
// reports the corresponding condition; any other failure is returned as an
// "asynq: ..." error that does not wrap the cause.
func (i *Inspector) GetTaskInfo(queue, id string) (*TaskInfo, error) {
	info, err := i.rdb.GetTaskInfo(queue, id)
	if errors.IsQueueNotFound(err) {
		return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
	}
	if errors.IsTaskNotFound(err) {
		return nil, fmt.Errorf("asynq: %w", ErrTaskNotFound)
	}
	if err != nil {
		return nil, fmt.Errorf("asynq: %v", err)
	}
	task := newTaskInfo(info.Message, info.State, info.NextProcessAt, info.Result)
	return task, nil
}
// ListOption specifies the behavior of a list operation. Values are created
// with PageSize and Page; any other value passed as a ListOption is ignored.
type ListOption interface{}

// Internal option representations.
type (
	pageSizeOpt int
	pageNumOpt  int
)

// listOption is the resolved set of options for a single list call.
type listOption struct {
	pageSize int
	pageNum  int // 1-based page number
}

const (
	// defaultPageSize is the page size used when no PageSize option is given.
	defaultPageSize = 30

	// defaultPageNum is the page number used when no Page option is given.
	defaultPageNum = 1
)

// composeListOptions folds opts into a listOption, starting from the
// defaults. When the same kind of option appears more than once, the last
// one wins. Values that are neither a page-size nor a page-number option
// are ignored.
func composeListOptions(opts ...ListOption) listOption {
	res := listOption{
		pageSize: defaultPageSize,
		pageNum:  defaultPageNum,
	}
	for _, opt := range opts {
		switch opt := opt.(type) {
		case pageSizeOpt:
			res.pageSize = int(opt)
		case pageNumOpt:
			res.pageNum = int(opt)
		default:
			// ignore unexpected option
		}
	}
	return res
}

// PageSize returns an option to specify the page size for a list operation.
//
// A negative page size is treated as zero.
func PageSize(n int) ListOption {
	if n < 0 {
		n = 0
	}
	return pageSizeOpt(n)
}

// Page returns an option to specify the page number for a list operation.
// The value 1 fetches the first page.
//
// Any page number below one (zero or negative) is treated as one. Callers
// convert the 1-based page number to a zero-based index by subtracting one,
// so letting zero through would yield a page index of -1.
func Page(n int) ListOption {
	if n < 1 {
		n = 1
	}
	return pageNumOpt(n)
}
// ListPendingTasks retrieves pending tasks from the specified queue.
//
// Without options it requests the first page with the default page size.
// It returns an error wrapping ErrQueueNotFound when rdb reports that the
// queue does not exist.
func (i *Inspector) ListPendingTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
	if err := base.ValidateQueueName(queue); err != nil {
		return nil, fmt.Errorf("asynq: %v", err)
	}
	lo := composeListOptions(opts...)
	// The public page number is 1-based; rdb.Pagination takes pageNum-1.
	page := rdb.Pagination{Size: lo.pageSize, Page: lo.pageNum - 1}
	infos, err := i.rdb.ListPending(queue, page)
	if errors.IsQueueNotFound(err) {
		return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
	}
	if err != nil {
		return nil, fmt.Errorf("asynq: %v", err)
	}
	var tasks []*TaskInfo
	for _, info := range infos {
		tasks = append(tasks, newTaskInfo(info.Message, info.State, info.NextProcessAt, info.Result))
	}
	return tasks, nil
}
// ListActiveTasks retrieves active tasks from the specified queue.
//
// Without options it requests the first page with the default page size.
// A returned task has IsOrphaned set when its ID appears among the messages
// returned by rdb.ListLeaseExpired for the current time.
func (i *Inspector) ListActiveTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
	if err := base.ValidateQueueName(queue); err != nil {
		return nil, fmt.Errorf("asynq: %v", err)
	}
	lo := composeListOptions(opts...)
	// The public page number is 1-based; rdb.Pagination takes pageNum-1.
	page := rdb.Pagination{Size: lo.pageSize, Page: lo.pageNum - 1}
	infos, err := i.rdb.ListActive(queue, page)
	if errors.IsQueueNotFound(err) {
		return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
	}
	if err != nil {
		return nil, fmt.Errorf("asynq: %v", err)
	}

	expired, err := i.rdb.ListLeaseExpired(time.Now(), queue)
	if err != nil {
		return nil, fmt.Errorf("asynq: %v", err)
	}
	// Index the expired message IDs for constant-time membership checks.
	orphaned := make(map[string]struct{})
	for _, msg := range expired {
		orphaned[msg.ID] = struct{}{}
	}

	var tasks []*TaskInfo
	for _, info := range infos {
		task := newTaskInfo(info.Message, info.State, info.NextProcessAt, info.Result)
		_, isExpired := orphaned[info.Message.ID]
		if isExpired {
			task.IsOrphaned = true
		}
		tasks = append(tasks, task)
	}
	return tasks, nil
}
// ListAggregatingTasks retrieves aggregating tasks of the given group from
// the specified queue.
//
// Without options it requests the first page with the default page size.
// It returns an error wrapping ErrQueueNotFound when rdb reports that the
// queue does not exist.
func (i *Inspector) ListAggregatingTasks(queue, group string, opts ...ListOption) ([]*TaskInfo, error) {
	if err := base.ValidateQueueName(queue); err != nil {
		return nil, fmt.Errorf("asynq: %v", err)
	}
	lo := composeListOptions(opts...)
	// The public page number is 1-based; rdb.Pagination takes pageNum-1.
	page := rdb.Pagination{Size: lo.pageSize, Page: lo.pageNum - 1}
	infos, err := i.rdb.ListAggregating(queue, group, page)
	if errors.IsQueueNotFound(err) {
		return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
	}
	if err != nil {
		return nil, fmt.Errorf("asynq: %v", err)
	}
	var tasks []*TaskInfo
	for _, info := range infos {
		tasks = append(tasks, newTaskInfo(info.Message, info.State, info.NextProcessAt, info.Result))
	}
	return tasks, nil
}
// ListScheduledTasks retrieves scheduled tasks from the specified queue.
//
// Without options it requests the first page with the default page size.
// It returns an error wrapping ErrQueueNotFound when rdb reports that the
// queue does not exist.
func (i *Inspector) ListScheduledTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
	if err := base.ValidateQueueName(queue); err != nil {
		return nil, fmt.Errorf("asynq: %v", err)
	}
	lo := composeListOptions(opts...)
	// The public page number is 1-based; rdb.Pagination takes pageNum-1.
	page := rdb.Pagination{Size: lo.pageSize, Page: lo.pageNum - 1}
	infos, err := i.rdb.ListScheduled(queue, page)
	if errors.IsQueueNotFound(err) {
		return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
	}
	if err != nil {
		return nil, fmt.Errorf("asynq: %v", err)
	}
	var tasks []*TaskInfo
	for _, info := range infos {
		tasks = append(tasks, newTaskInfo(info.Message, info.State, info.NextProcessAt, info.Result))
	}
	return tasks, nil
}
// ListRetryTasks retrieves retry tasks from the specified queue.
//
// Without options it requests the first page with the default page size.
// It returns an error wrapping ErrQueueNotFound when rdb reports that the
// queue does not exist.
func (i *Inspector) ListRetryTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
	if err := base.ValidateQueueName(queue); err != nil {
		return nil, fmt.Errorf("asynq: %v", err)
	}
	lo := composeListOptions(opts...)
	// The public page number is 1-based; rdb.Pagination takes pageNum-1.
	page := rdb.Pagination{Size: lo.pageSize, Page: lo.pageNum - 1}
	infos, err := i.rdb.ListRetry(queue, page)
	if errors.IsQueueNotFound(err) {
		return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
	}
	if err != nil {
		return nil, fmt.Errorf("asynq: %v", err)
	}
	var tasks []*TaskInfo
	for _, info := range infos {
		tasks = append(tasks, newTaskInfo(info.Message, info.State, info.NextProcessAt, info.Result))
	}
	return tasks, nil
}
// ListArchivedTasks retrieves archived tasks from the specified queue.
//
// Without options it requests the first page with the default page size.
// It returns an error wrapping ErrQueueNotFound when rdb reports that the
// queue does not exist.
func (i *Inspector) ListArchivedTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
	if err := base.ValidateQueueName(queue); err != nil {
		return nil, fmt.Errorf("asynq: %v", err)
	}
	lo := composeListOptions(opts...)
	// The public page number is 1-based; rdb.Pagination takes pageNum-1.
	page := rdb.Pagination{Size: lo.pageSize, Page: lo.pageNum - 1}
	infos, err := i.rdb.ListArchived(queue, page)
	if errors.IsQueueNotFound(err) {
		return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
	}
	if err != nil {
		return nil, fmt.Errorf("asynq: %v", err)
	}
	var tasks []*TaskInfo
	for _, info := range infos {
		tasks = append(tasks, newTaskInfo(info.Message, info.State, info.NextProcessAt, info.Result))
	}
	return tasks, nil
}
// ListCompletedTasks retrieves completed tasks from the specified queue.
//
// Without options it requests the first page with the default page size.
// It returns an error wrapping ErrQueueNotFound when rdb reports that the
// queue does not exist.
func (i *Inspector) ListCompletedTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
	if err := base.ValidateQueueName(queue); err != nil {
		return nil, fmt.Errorf("asynq: %v", err)
	}
	lo := composeListOptions(opts...)
	// The public page number is 1-based; rdb.Pagination takes pageNum-1.
	page := rdb.Pagination{Size: lo.pageSize, Page: lo.pageNum - 1}
	infos, err := i.rdb.ListCompleted(queue, page)
	if errors.IsQueueNotFound(err) {
		return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
	}
	if err != nil {
		return nil, fmt.Errorf("asynq: %v", err)
	}
	var tasks []*TaskInfo
	for _, info := range infos {
		tasks = append(tasks, newTaskInfo(info.Message, info.State, info.NextProcessAt, info.Result))
	}
	return tasks, nil
}
// DeleteAllPendingTasks deletes all pending tasks from the specified queue
// and returns the count reported by rdb.DeleteAllPendingTasks, converted to
// int.
//
// An invalid queue name yields (0, validation error).
func (i *Inspector) DeleteAllPendingTasks(queue string) (int, error) {
	if invalid := base.ValidateQueueName(queue); invalid != nil {
		return 0, invalid
	}
	deleted, err := i.rdb.DeleteAllPendingTasks(queue)
	return int(deleted), err
}
// DeleteAllScheduledTasks deletes all scheduled tasks from the specified
// queue and returns the count reported by rdb.DeleteAllScheduledTasks,
// converted to int.
//
// An invalid queue name yields (0, validation error).
func (i *Inspector) DeleteAllScheduledTasks(queue string) (int, error) {
	if invalid := base.ValidateQueueName(queue); invalid != nil {
		return 0, invalid
	}
	deleted, err := i.rdb.DeleteAllScheduledTasks(queue)
	return int(deleted), err
}
// DeleteAllRetryTasks deletes all retry tasks from the specified queue and
// returns the count reported by rdb.DeleteAllRetryTasks, converted to int.
//
// An invalid queue name yields (0, validation error).
func (i *Inspector) DeleteAllRetryTasks(queue string) (int, error) {
	if invalid := base.ValidateQueueName(queue); invalid != nil {
		return 0, invalid
	}
	deleted, err := i.rdb.DeleteAllRetryTasks(queue)
	return int(deleted), err
}
// DeleteAllArchivedTasks deletes all archived tasks from the specified queue
// and returns the count reported by rdb.DeleteAllArchivedTasks, converted to
// int.
//
// An invalid queue name yields (0, validation error).
func (i *Inspector) DeleteAllArchivedTasks(queue string) (int, error) {
	if invalid := base.ValidateQueueName(queue); invalid != nil {
		return 0, invalid
	}
	deleted, err := i.rdb.DeleteAllArchivedTasks(queue)
	return int(deleted), err
}
// DeleteAllCompletedTasks deletes all completed tasks from the specified
// queue and returns the count reported by rdb.DeleteAllCompletedTasks,
// converted to int.
//
// An invalid queue name yields (0, validation error).
func (i *Inspector) DeleteAllCompletedTasks(queue string) (int, error) {
	if invalid := base.ValidateQueueName(queue); invalid != nil {
		return 0, invalid
	}
	deleted, err := i.rdb.DeleteAllCompletedTasks(queue)
	return int(deleted), err
}
// DeleteAllAggregatingTasks deletes all tasks of the given group from the
// specified queue and returns the count reported by
// rdb.DeleteAllAggregatingTasks, converted to int.
//
// An invalid queue name yields (0, validation error).
func (i *Inspector) DeleteAllAggregatingTasks(queue, group string) (int, error) {
	if invalid := base.ValidateQueueName(queue); invalid != nil {
		return 0, invalid
	}
	deleted, err := i.rdb.DeleteAllAggregatingTasks(queue, group)
	return int(deleted), err
}
// DeleteTask deletes the task with the given id from the given queue.
//
// It returns an error wrapping ErrQueueNotFound or ErrTaskNotFound when rdb
// reports the corresponding condition. A validation failure or any other
// rdb failure is returned as an "asynq: ..." error that does not wrap the
// cause.
func (i *Inspector) DeleteTask(queue, id string) error {
	if err := base.ValidateQueueName(queue); err != nil {
		return fmt.Errorf("asynq: %v", err)
	}
	err := i.rdb.DeleteTask(queue, id)
	if errors.IsQueueNotFound(err) {
		return fmt.Errorf("asynq: %w", ErrQueueNotFound)
	}
	if errors.IsTaskNotFound(err) {
		return fmt.Errorf("asynq: %w", ErrTaskNotFound)
	}
	if err != nil {
		return fmt.Errorf("asynq: %v", err)
	}
	return nil
}
// RunAllScheduledTasks asks rdb to run all scheduled tasks in the given
// queue and returns the count reported by rdb.RunAllScheduledTasks,
// converted to int.
//
// An invalid queue name yields (0, validation error).
func (i *Inspector) RunAllScheduledTasks(queue string) (int, error) {
	if invalid := base.ValidateQueueName(queue); invalid != nil {
		return 0, invalid
	}
	count, err := i.rdb.RunAllScheduledTasks(queue)
	return int(count), err
}
// RunAllRetryTasks asks rdb to run all retry tasks in the given queue and
// returns the count reported by rdb.RunAllRetryTasks, converted to int.
//
// An invalid queue name yields (0, validation error).
func (i *Inspector) RunAllRetryTasks(queue string) (int, error) {
	if invalid := base.ValidateQueueName(queue); invalid != nil {
		return 0, invalid
	}
	count, err := i.rdb.RunAllRetryTasks(queue)
	return int(count), err
}
// RunAllArchivedTasks asks rdb to run all archived tasks in the given queue
// and returns the count reported by rdb.RunAllArchivedTasks, converted to
// int.
//
// An invalid queue name yields (0, validation error).
func (i *Inspector) RunAllArchivedTasks(queue string) (int, error) {
	if invalid := base.ValidateQueueName(queue); invalid != nil {
		return 0, invalid
	}
	count, err := i.rdb.RunAllArchivedTasks(queue)
	return int(count), err
}
// RunAllAggregatingTasks asks rdb to run all tasks of the given group in the
// given queue and returns the count reported by rdb.RunAllAggregatingTasks,
// converted to int.
//
// An invalid queue name yields (0, validation error).
func (i *Inspector) RunAllAggregatingTasks(queue, group string) (int, error) {
	if invalid := base.ValidateQueueName(queue); invalid != nil {
		return 0, invalid
	}
	count, err := i.rdb.RunAllAggregatingTasks(queue, group)
	return int(count), err
}
// RunTask asks rdb to run the task with the given id from the given queue.
//
// It returns an error wrapping ErrQueueNotFound or ErrTaskNotFound when rdb
// reports the corresponding condition. A validation failure or any other
// rdb failure is returned as an "asynq: ..." error that does not wrap the
// cause.
func (i *Inspector) RunTask(queue, id string) error {
	if err := base.ValidateQueueName(queue); err != nil {
		return fmt.Errorf("asynq: %v", err)
	}
	err := i.rdb.RunTask(queue, id)
	if errors.IsQueueNotFound(err) {
		return fmt.Errorf("asynq: %w", ErrQueueNotFound)
	}
	if errors.IsTaskNotFound(err) {
		return fmt.Errorf("asynq: %w", ErrTaskNotFound)
	}
	if err != nil {
		return fmt.Errorf("asynq: %v", err)
	}
	return nil
}
// ArchiveAllPendingTasks archives all pending tasks in the given queue and
// returns the count reported by rdb.ArchiveAllPendingTasks, converted to
// int.
//
// An invalid queue name yields (0, validation error).
func (i *Inspector) ArchiveAllPendingTasks(queue string) (int, error) {
	if invalid := base.ValidateQueueName(queue); invalid != nil {
		return 0, invalid
	}
	archived, err := i.rdb.ArchiveAllPendingTasks(queue)
	return int(archived), err
}
// ArchiveAllScheduledTasks archives all scheduled tasks in the given queue
// and returns the count reported by rdb.ArchiveAllScheduledTasks, converted
// to int.
//
// An invalid queue name yields (0, validation error).
func (i *Inspector) ArchiveAllScheduledTasks(queue string) (int, error) {
	if invalid := base.ValidateQueueName(queue); invalid != nil {
		return 0, invalid
	}
	archived, err := i.rdb.ArchiveAllScheduledTasks(queue)
	return int(archived), err
}
// ArchiveAllRetryTasks archives all retry tasks in the given queue and
// returns the count reported by rdb.ArchiveAllRetryTasks, converted to int.
//
// An invalid queue name yields (0, validation error).
func (i *Inspector) ArchiveAllRetryTasks(queue string) (int, error) {
	if invalid := base.ValidateQueueName(queue); invalid != nil {
		return 0, invalid
	}
	archived, err := i.rdb.ArchiveAllRetryTasks(queue)
	return int(archived), err
}
// ArchiveAllAggregatingTasks archives all tasks of the given group in the
// given queue and returns the count reported by
// rdb.ArchiveAllAggregatingTasks, converted to int.
//
// An invalid queue name yields (0, validation error).
func (i *Inspector) ArchiveAllAggregatingTasks(queue, group string) (int, error) {
	if invalid := base.ValidateQueueName(queue); invalid != nil {
		return 0, invalid
	}
	archived, err := i.rdb.ArchiveAllAggregatingTasks(queue, group)
	return int(archived), err
}
// ArchiveTask archives the task with the given id in the given queue.
//
// It returns an error wrapping ErrQueueNotFound or ErrTaskNotFound when rdb
// reports the corresponding condition. A validation failure or any other
// rdb failure is returned as an "asynq: ..." error that carries the cause's
// message but does not wrap it.
func (i *Inspector) ArchiveTask(queue, id string) error {
	if err := base.ValidateQueueName(queue); err != nil {
		// Include the validation error in the message; previously the
		// literal text "err" was returned and the cause was lost.
		return fmt.Errorf("asynq: %v", err)
	}
	err := i.rdb.ArchiveTask(queue, id)
	switch {
	case errors.IsQueueNotFound(err):
		return fmt.Errorf("asynq: %w", ErrQueueNotFound)
	case errors.IsTaskNotFound(err):
		return fmt.Errorf("asynq: %w", ErrTaskNotFound)
	case err != nil:
		return fmt.Errorf("asynq: %v", err)
	}
	return nil
}
// CancelProcessing publishes a cancelation for the task with the given id
// through rdb.PublishCancelation and returns its error, if any.
func (i *Inspector) CancelProcessing(id string) error {
	err := i.rdb.PublishCancelation(id)
	return err
}
// PauseQueue pauses the given queue via rdb.Pause.
//
// The queue name is validated first; a validation error is returned
// unchanged.
func (i *Inspector) PauseQueue(queue string) error {
	invalid := base.ValidateQueueName(queue)
	if invalid != nil {
		return invalid
	}
	return i.rdb.Pause(queue)
}
// UnpauseQueue resumes the given queue via rdb.Unpause.
//
// The queue name is validated first; a validation error is returned
// unchanged.
func (i *Inspector) UnpauseQueue(queue string) error {
	invalid := base.ValidateQueueName(queue)
	if invalid != nil {
		return invalid
	}
	return i.rdb.Unpause(queue)
}
// Servers returns one ServerInfo per server reported by rdb.ListServers,
// each with the workers reported by rdb.ListWorkers attached to it.
//
// Workers whose ServerID matches no listed server are skipped. The order of
// the returned slice is unspecified because it is produced by iterating a
// map. The result is nil when no servers are listed.
func (i *Inspector) Servers() ([]*ServerInfo, error) {
	servers, err := i.rdb.ListServers()
	if err != nil {
		return nil, err
	}
	workers, err := i.rdb.ListWorkers()
	if err != nil {
		return nil, err
	}

	// Index servers by ID so workers can be attached to their owner.
	byID := make(map[string]*ServerInfo)
	for _, s := range servers {
		info := new(ServerInfo)
		info.ID = s.ServerID
		info.Host = s.Host
		info.PID = s.PID
		info.Concurrency = s.Concurrency
		info.Queues = s.Queues
		info.StrictPriority = s.StrictPriority
		info.Started = s.Started
		info.Status = s.Status
		info.ActiveWorkers = make([]*WorkerInfo, 0)
		byID[s.ServerID] = info
	}

	for _, w := range workers {
		if owner, found := byID[w.ServerID]; found {
			owner.ActiveWorkers = append(owner.ActiveWorkers, &WorkerInfo{
				TaskID:      w.ID,
				TaskType:    w.Type,
				TaskPayload: w.Payload,
				Queue:       w.Queue,
				Started:     w.Started,
				Deadline:    w.Deadline,
			})
		}
	}

	var out []*ServerInfo
	for _, info := range byID {
		out = append(out, info)
	}
	return out, nil
}
// ServerInfo describes a running server instance and the workers currently
// attached to it.
type ServerInfo struct {
	// ID is the identifier of the server.
	ID string

	// Host is the host reported by the server.
	Host string

	// PID is the process ID reported by the server.
	PID int

	// Concurrency is the concurrency setting reported by the server.
	Concurrency int

	// Queues maps queue names to an integer reported by the server
	// (presumably the queue priority — confirm).
	Queues map[string]int

	// StrictPriority is the strict-priority setting reported by the server.
	StrictPriority bool

	// Started is the start time reported by the server.
	Started time.Time

	// Status is the status string reported by the server.
	Status string

	// ActiveWorkers lists the workers attached to this server. Servers
	// initializes it to an empty, non-nil slice.
	ActiveWorkers []*WorkerInfo
}
// WorkerInfo describes a worker and the task it is handling.
type WorkerInfo struct {
	// TaskID is the ID of the task being handled.
	TaskID string

	// TaskType is the type of the task being handled.
	TaskType string

	// TaskPayload is the payload of the task being handled.
	TaskPayload []byte

	// Queue is the queue the task belongs to.
	Queue string

	// Started is the start time reported for the worker.
	Started time.Time

	// Deadline is the deadline reported for the worker.
	Deadline time.Time
}
// ClusterKeySlot returns the value reported by rdb.ClusterKeySlot for the
// given queue (presumably the Redis Cluster hash slot of the queue's keys —
// confirm against rdb).
func (i *Inspector) ClusterKeySlot(queue string) (int64, error) {
	slot, err := i.rdb.ClusterKeySlot(queue)
	return slot, err
}
// ClusterNode describes a node in a Redis cluster.
type ClusterNode struct {
	// ID is the node's identifier.
	ID string

	// Addr is the node's address.
	Addr string
}
// ClusterNodes returns the cluster nodes reported by rdb.ClusterNodes for
// the given queue, each converted to a ClusterNode carrying the node's ID
// and Addr. The result is nil when rdb reports no nodes.
func (i *Inspector) ClusterNodes(queue string) ([]*ClusterNode, error) {
	raw, err := i.rdb.ClusterNodes(queue)
	if err != nil {
		return nil, err
	}
	var nodes []*ClusterNode
	for _, n := range raw {
		node := &ClusterNode{ID: n.ID, Addr: n.Addr}
		nodes = append(nodes, node)
	}
	return nodes, nil
}
// SchedulerEntry holds information about a registered scheduler entry.
type SchedulerEntry struct {
	// ID is the identifier of the entry.
	ID string

	// Spec is the entry's schedule specification string.
	Spec string

	// Task is the task built from the entry's type and payload.
	Task *Task

	// Opts holds the entry's options that could be parsed from their string
	// form.
	Opts []Option

	// Next is the entry's Next time (presumably the next scheduled run —
	// confirm).
	Next time.Time

	// Prev is the entry's Prev time (presumably the previous run — confirm).
	Prev time.Time
}
// SchedulerEntries returns the scheduler entries reported by
// rdb.ListSchedulerEntries.
//
// Each entry's option strings are parsed with parseOption; strings that
// fail to parse are skipped. The result is nil when there are no entries.
func (i *Inspector) SchedulerEntries() ([]*SchedulerEntry, error) {
	var entries []*SchedulerEntry
	raw, err := i.rdb.ListSchedulerEntries()
	if err != nil {
		return nil, err
	}
	for _, e := range raw {
		var opts []Option
		for _, optStr := range e.Opts {
			parsed, perr := parseOption(optStr)
			if perr != nil {
				// Best effort: ignore option strings that cannot be parsed.
				continue
			}
			opts = append(opts, parsed)
		}
		entry := &SchedulerEntry{
			ID:   e.ID,
			Spec: e.Spec,
			Task: NewTask(e.Type, e.Payload),
			Opts: opts,
			Next: e.Next,
			Prev: e.Prev,
		}
		entries = append(entries, entry)
	}
	return entries, nil
}
// parseOption interprets a string of the form `Name(arg)` and returns the
// corresponding Option.
//
// Supported names and the format of their argument:
//   - Queue: a Go-quoted string (decoded with strconv.Unquote)
//   - MaxRetry: a decimal integer
//   - Timeout, Unique, ProcessIn, Retention: a time.ParseDuration string
//   - Deadline, ProcessAt: a time in time.UnixDate layout
//
// It returns an error when the argument cannot be parsed or the name is not
// one of the above.
func parseOption(s string) (Option, error) {
	fn, arg := parseOptionFunc(s), parseOptionArg(s)
	switch fn {
	case "Queue":
		queue, err := strconv.Unquote(arg)
		if err != nil {
			return nil, err
		}
		return Queue(queue), nil
	case "MaxRetry":
		n, err := strconv.Atoi(arg)
		if err != nil {
			return nil, err
		}
		return MaxRetry(n), nil
	case "Timeout":
		d, err := time.ParseDuration(arg)
		if err != nil {
			return nil, err
		}
		return Timeout(d), nil
	case "Deadline":
		t, err := time.Parse(time.UnixDate, arg)
		if err != nil {
			return nil, err
		}
		return Deadline(t), nil
	case "Unique":
		d, err := time.ParseDuration(arg)
		if err != nil {
			return nil, err
		}
		return Unique(d), nil
	case "ProcessAt":
		t, err := time.Parse(time.UnixDate, arg)
		if err != nil {
			return nil, err
		}
		return ProcessAt(t), nil
	case "ProcessIn":
		d, err := time.ParseDuration(arg)
		if err != nil {
			return nil, err
		}
		return ProcessIn(d), nil
	case "Retention":
		d, err := time.ParseDuration(arg)
		if err != nil {
			return nil, err
		}
		return Retention(d), nil
	default:
		// The message previously read "cannot not parse", a double negative.
		return nil, fmt.Errorf("cannot parse option string %q", s)
	}
}
// parseOptionFunc returns the function-name part of an option string, i.e.
// everything before the first "(". For "MaxRetry(3)" it returns "MaxRetry".
//
// It returns the empty string when s contains no "(", instead of panicking
// on an out-of-range slice.
func parseOptionFunc(s string) string {
	i := strings.Index(s, "(")
	if i < 0 {
		// No opening parenthesis: s is not a `Name(arg)` string.
		return ""
	}
	return s[:i]
}
// parseOptionArg returns the argument part of an option string, i.e. the
// text between the first "(" and the last ")". For "MaxRetry(3)" it returns
// "3".
//
// The last ")" is used so that an argument which itself contains ")" — for
// example the quoted name in `Queue("a)b")` — is returned intact.
//
// It returns the empty string when s has no "(" or has no ")" after it.
func parseOptionArg(s string) string {
	open := strings.Index(s, "(")
	if open < 0 {
		return ""
	}
	closing := strings.LastIndex(s, ")")
	if closing <= open {
		return ""
	}
	return s[open+1 : closing]
}
// SchedulerEnqueueEvent holds information about a single enqueue event
// recorded for a scheduler entry.
type SchedulerEnqueueEvent struct {
	// TaskID is the ID of the task that was enqueued.
	TaskID string

	// EnqueuedAt is the time the task was enqueued.
	EnqueuedAt time.Time
}
// ListSchedulerEnqueueEvents retrieves the enqueue events recorded for the
// scheduler entry with the given ID.
//
// Without options it requests the first page with the default page size.
// Errors from rdb are returned unchanged. The result is nil when there are
// no events.
func (i *Inspector) ListSchedulerEnqueueEvents(entryID string, opts ...ListOption) ([]*SchedulerEnqueueEvent, error) {
	lo := composeListOptions(opts...)
	// The public page number is 1-based; rdb.Pagination takes pageNum-1.
	page := rdb.Pagination{Size: lo.pageSize, Page: lo.pageNum - 1}
	raw, err := i.rdb.ListSchedulerEnqueueEvents(entryID, page)
	if err != nil {
		return nil, err
	}
	var events []*SchedulerEnqueueEvent
	for _, ev := range raw {
		event := &SchedulerEnqueueEvent{TaskID: ev.TaskID, EnqueuedAt: ev.EnqueuedAt}
		events = append(events, event)
	}
	return events, nil
}
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PRs and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.