// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.

package asynq

import (
	
	
	
	

	
	
	
	
)

// Inspector is a client interface to inspect and mutate the state of
// queues and tasks.
//
// The methods declared in this file operate through the wrapped rdb.RDB value.
type Inspector struct {
	// rdb is the handle the Inspector methods use for every redis operation.
	rdb *rdb.RDB
}

// New returns a new instance of Inspector.
//
// The redis client is obtained by calling MakeRedisClient on the given
// RedisConnOpt and must be a redis.UniversalClient; any other type causes
// a panic.
//
// NOTE(review): the panic message is prefixed with "inspeq:" whereas the error
// messages elsewhere in this file use "asynq:" — confirm which is intended.
func ( RedisConnOpt) *Inspector {
	// Type-assert the client; the second value reports whether it succeeded.
	,  := .MakeRedisClient().(redis.UniversalClient)
	if ! {
		panic(fmt.Sprintf("inspeq: unsupported RedisConnOpt type %T", ))
	}
	return &Inspector{
		rdb: rdb.NewRDB(),
	}
}

// Close closes the connection with redis.
//
// The returned error is whatever the Close method of the rdb field reports.
func ( *Inspector) () error {
	return .rdb.Close()
}

// Queues returns a list of all queue names.
//
// Both return values are passed through unchanged from the AllQueues method
// of the rdb field.
func ( *Inspector) () ([]string, error) {
	return .rdb.AllQueues()
}

// Groups returns a list of all groups within the given queue.
//
// Each entry reported by GroupStats is converted into a GroupInfo holding the
// group name and its size. An error from GroupStats is returned unchanged
// together with a nil slice. When GroupStats reports no entries the returned
// slice is nil.
func ( *Inspector) ( string) ([]*GroupInfo, error) {
	,  := .rdb.GroupStats()
	if  != nil {
		return nil, 
	}
	var  []*GroupInfo
	for ,  := range  {
		// Copy only the public-facing fields into the exported type.
		 = append(, &GroupInfo{
			Group: .Group,
			Size:  .Size,
		})
	}
	return , nil
}

// GroupInfo represents a state of a group at a certain time.
type GroupInfo struct {
	// Group is the name of the group.
	Group string

	// Size is the total number of tasks in the group.
	Size int
}

// QueueInfo represents a state of a queue at a certain time.
type QueueInfo struct {
	// Queue is the name of the queue.
	Queue string

	// MemoryUsage is the total number of bytes that the queue and its tasks
	// require to be stored in redis.
	// It is an approximate value since it is computed by sampling.
	MemoryUsage int64

	// Latency of the queue, measured by the oldest pending task in the queue.
	Latency time.Duration

	// Size is the total number of tasks in the queue.
	// The value is the sum of Pending, Active, Scheduled, Retry, Aggregating and Archived.
	Size int

	// Groups is the total number of groups in the queue.
	Groups int

	// Pending is the number of pending tasks.
	Pending int
	// Active is the number of active tasks.
	Active int
	// Scheduled is the number of scheduled tasks.
	Scheduled int
	// Retry is the number of retry tasks.
	Retry int
	// Archived is the number of archived tasks.
	Archived int
	// Completed is the number of stored completed tasks.
	Completed int
	// Aggregating is the number of aggregating tasks.
	Aggregating int

	// Processed is the total number of tasks processed within the given date
	// (counter resets daily).
	// The number includes both succeeded and failed tasks.
	Processed int
	// Failed is the total number of tasks that failed to be processed within
	// the given date (counter resets daily).
	Failed int

	// ProcessedTotal is the total number of tasks processed (cumulative).
	ProcessedTotal int
	// FailedTotal is the total number of tasks failed (cumulative).
	FailedTotal int

	// Paused indicates whether the queue is paused.
	// If true, tasks in the queue will not be processed.
	Paused bool

	// Timestamp is the time when this queue info snapshot was taken.
	Timestamp time.Time
}

// GetQueueInfo returns current information of the given queue.
//
// The queue name is first checked with base.ValidateQueueName. A validation
// error, or an error from CurrentStats, is returned as-is with a nil result.
// On success the fields of the stats value are copied one-to-one into a new
// QueueInfo.
func ( *Inspector) ( string) (*QueueInfo, error) {
	if  := base.ValidateQueueName();  != nil {
		return nil, 
	}
	,  := .rdb.CurrentStats()
	if  != nil {
		return nil, 
	}
	return &QueueInfo{
		Queue:          .Queue,
		MemoryUsage:    .MemoryUsage,
		Latency:        .Latency,
		Size:           .Size,
		Groups:         .Groups,
		Pending:        .Pending,
		Active:         .Active,
		Scheduled:      .Scheduled,
		Retry:          .Retry,
		Archived:       .Archived,
		Completed:      .Completed,
		Aggregating:    .Aggregating,
		Processed:      .Processed,
		Failed:         .Failed,
		ProcessedTotal: .ProcessedTotal,
		FailedTotal:    .FailedTotal,
		Paused:         .Paused,
		Timestamp:      .Timestamp,
	}, nil
}

// DailyStats holds aggregate data for a given day for a given queue.
type DailyStats struct {
	// Queue is the name of the queue.
	Queue string
	// Processed is the total number of tasks processed during the given date.
	// The number includes both succeeded and failed tasks.
	Processed int
	// Failed is the total number of tasks that failed to be processed during
	// the given date.
	Failed int
	// Date is the date these stats were taken.
	Date time.Time
}

// History returns a list of stats from the last n days.
//
// The queue name is checked with base.ValidateQueueName; a validation error or
// an error from HistoricalStats is returned as-is with a nil slice. Each entry
// from HistoricalStats is converted into a DailyStats, with the entry's Time
// field stored as Date.
func ( *Inspector) ( string,  int) ([]*DailyStats, error) {
	if  := base.ValidateQueueName();  != nil {
		return nil, 
	}
	,  := .rdb.HistoricalStats(, )
	if  != nil {
		return nil, 
	}
	var  []*DailyStats
	for ,  := range  {
		 = append(, &DailyStats{
			Queue:     .Queue,
			Processed: .Processed,
			Failed:    .Failed,
			Date:      .Time,
		})
	}
	return , nil
}

// Sentinel errors reported by Inspector methods. Methods in this file wrap
// them with the %w verb before returning them.
var (
	// ErrQueueNotFound indicates that the specified queue does not exist.
	ErrQueueNotFound = errors.New("queue not found")

	// ErrQueueNotEmpty indicates that the specified queue is not empty.
	ErrQueueNotEmpty = errors.New("queue is not empty")

	// ErrTaskNotFound indicates that the specified task cannot be found in the queue.
	ErrTaskNotFound = errors.New("task not found")
)

// DeleteQueue removes the specified queue.
//
// If force is set to true, DeleteQueue will remove the queue regardless of
// the queue size as long as no tasks are active in the queue.
// If force is set to false, DeleteQueue will remove the queue only if
// the queue is empty.
//
// If the specified queue does not exist, DeleteQueue returns an error wrapping
// ErrQueueNotFound. If force is set to false and the specified queue is not
// empty, DeleteQueue returns an error wrapping ErrQueueNotEmpty. In both cases
// the message also carries the quoted queue name. Any other result of
// RemoveQueue, including nil, is returned unchanged.
func ( *Inspector) ( string,  bool) error {
	 := .rdb.RemoveQueue(, )
	if errors.IsQueueNotFound() {
		return fmt.Errorf("%w: queue=%q", ErrQueueNotFound, )
	}
	if errors.IsQueueNotEmpty() {
		return fmt.Errorf("%w: queue=%q", ErrQueueNotEmpty, )
	}
	return 
}

// GetTaskInfo retrieves task information given a task id and queue name.
//
// Returns an error wrapping ErrQueueNotFound if a queue with the given name doesn't exist.
// Returns an error wrapping ErrTaskNotFound if a task with the given id doesn't exist in the queue.
// Any other failure is formatted with %v under the "asynq:" prefix, so the
// underlying error is not wrapped.
func ( *Inspector) (,  string) (*TaskInfo, error) {
	,  := .rdb.GetTaskInfo(, )
	switch {
	case errors.IsQueueNotFound():
		return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
	case errors.IsTaskNotFound():
		return nil, fmt.Errorf("asynq: %w", ErrTaskNotFound)
	case  != nil:
		return nil, fmt.Errorf("asynq: %v", )
	}
	// Convert the internal representation into the exported TaskInfo.
	return newTaskInfo(.Message, .State, .NextProcessAt, .Result), nil
}

// ListOption specifies behavior of list operation.
//
// Values are created with PageSize and Page. Any other value passed where a
// ListOption is expected is ignored.
type ListOption interface{}

// Internal list option representations.
type (
	// pageSizeOpt carries the requested number of items per page.
	pageSizeOpt int
	// pageNumOpt carries the requested 1-based page number.
	pageNumOpt int
)

// listOption is the resolved set of options for a single list operation.
type listOption struct {
	// pageSize is the number of items per page.
	pageSize int
	// pageNum is the 1-based page number.
	pageNum int
}

const (
	// Page size used by default in list operation.
	defaultPageSize = 30

	// Page number used by default in list operation.
	defaultPageNum = 1
)

// composeListOptions folds the given options into a listOption, starting from
// defaultPageSize and defaultPageNum. When the same kind of option is given
// more than once the last one wins. Values that are neither a page size nor a
// page number option are ignored.
func composeListOptions(opts ...ListOption) listOption {
	res := listOption{
		pageSize: defaultPageSize,
		pageNum:  defaultPageNum,
	}
	for _, opt := range opts {
		switch opt := opt.(type) {
		case pageSizeOpt:
			res.pageSize = int(opt)
		case pageNumOpt:
			res.pageNum = int(opt)
		default:
			// ignore unexpected option
		}
	}
	return res
}

// PageSize returns an option to specify the page size for list operation.
//
// Negative page size is treated as zero.
func PageSize(n int) ListOption {
	if n < 0 {
		n = 0
	}
	return pageSizeOpt(n)
}

// Page returns an option to specify the page number for list operation.
// The value 1 fetches the first page.
//
// Page numbers less than one (zero or negative) are treated as one.
func Page(n int) ListOption {
	// Callers subtract one to get a zero-based page index, so anything below
	// one would produce a negative index; clamp it to the first page.
	if n < 1 {
		n = 1
	}
	return pageNumOpt(n)
}

// ListPendingTasks retrieves pending tasks from the specified queue.
//
// By default, it retrieves the first 30 tasks.
//
// An invalid queue name yields an "asynq:"-prefixed error. If the queue does
// not exist the returned error wraps ErrQueueNotFound; any other failure from
// ListPending is formatted with %v and therefore not wrapped.
//
// NOTE(review): unlike the sibling List* methods, the final return statement
// does not return the literal nil as its second value. The switch above has
// already returned for every non-nil error, so confirm the value is nil here.
func ( *Inspector) ( string,  ...ListOption) ([]*TaskInfo, error) {
	if  := base.ValidateQueueName();  != nil {
		return nil, fmt.Errorf("asynq: %v", )
	}
	 := composeListOptions(...)
	// The public page number is 1-based; rdb.Pagination receives it minus one.
	 := rdb.Pagination{Size: .pageSize, Page: .pageNum - 1}
	,  := .rdb.ListPending(, )
	switch {
	case errors.IsQueueNotFound():
		return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
	case  != nil:
		return nil, fmt.Errorf("asynq: %v", )
	}
	var  []*TaskInfo
	for ,  := range  {
		 = append(, newTaskInfo(
			.Message,
			.State,
			.NextProcessAt,
			.Result,
		))
	}
	return , 
}

// ListActiveTasks retrieves active tasks from the specified queue.
//
// By default, it retrieves the first 30 tasks.
//
// In addition to listing the active tasks, it asks ListLeaseExpired for the
// entries whose lease has expired as of time.Now() and sets IsOrphaned on
// every returned task whose message ID appears among them.
//
// An invalid queue name yields an "asynq:"-prefixed error. If the queue does
// not exist the returned error wraps ErrQueueNotFound; other failures are
// formatted with %v and therefore not wrapped.
func ( *Inspector) ( string,  ...ListOption) ([]*TaskInfo, error) {
	if  := base.ValidateQueueName();  != nil {
		return nil, fmt.Errorf("asynq: %v", )
	}
	 := composeListOptions(...)
	// The public page number is 1-based; rdb.Pagination receives it minus one.
	 := rdb.Pagination{Size: .pageSize, Page: .pageNum - 1}
	,  := .rdb.ListActive(, )
	switch {
	case errors.IsQueueNotFound():
		return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
	case  != nil:
		return nil, fmt.Errorf("asynq: %v", )
	}
	,  := .rdb.ListLeaseExpired(time.Now(), )
	if  != nil {
		return nil, fmt.Errorf("asynq: %v", )
	}
	 := make(map[string]struct{}) // set of expired message IDs
	for ,  := range  {
		[.ID] = struct{}{}
	}
	var  []*TaskInfo
	for ,  := range  {
		 := newTaskInfo(
			.Message,
			.State,
			.NextProcessAt,
			.Result,
		)
		// A task whose ID is in the expired set is flagged as orphaned.
		if ,  := [.Message.ID];  {
			.IsOrphaned = true
		}
		 = append(, )
	}
	return , nil
}

// ListAggregatingTasks retrieves aggregating tasks from the specified group.
//
// By default, it retrieves the first 30 tasks.
//
// An invalid queue name yields an "asynq:"-prefixed error. If the queue does
// not exist the returned error wraps ErrQueueNotFound; any other failure from
// ListAggregating is formatted with %v and therefore not wrapped.
func ( *Inspector) (,  string,  ...ListOption) ([]*TaskInfo, error) {
	if  := base.ValidateQueueName();  != nil {
		return nil, fmt.Errorf("asynq: %v", )
	}
	 := composeListOptions(...)
	// The public page number is 1-based; rdb.Pagination receives it minus one.
	 := rdb.Pagination{Size: .pageSize, Page: .pageNum - 1}
	,  := .rdb.ListAggregating(, , )
	switch {
	case errors.IsQueueNotFound():
		return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
	case  != nil:
		return nil, fmt.Errorf("asynq: %v", )
	}
	var  []*TaskInfo
	for ,  := range  {
		 = append(, newTaskInfo(
			.Message,
			.State,
			.NextProcessAt,
			.Result,
		))
	}
	return , nil
}

// ListScheduledTasks retrieves scheduled tasks from the specified queue.
// Tasks are sorted by NextProcessAt in ascending order.
//
// By default, it retrieves the first 30 tasks.
//
// An invalid queue name yields an "asynq:"-prefixed error. If the queue does
// not exist the returned error wraps ErrQueueNotFound; any other failure from
// ListScheduled is formatted with %v and therefore not wrapped.
func ( *Inspector) ( string,  ...ListOption) ([]*TaskInfo, error) {
	if  := base.ValidateQueueName();  != nil {
		return nil, fmt.Errorf("asynq: %v", )
	}
	 := composeListOptions(...)
	// The public page number is 1-based; rdb.Pagination receives it minus one.
	 := rdb.Pagination{Size: .pageSize, Page: .pageNum - 1}
	,  := .rdb.ListScheduled(, )
	switch {
	case errors.IsQueueNotFound():
		return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
	case  != nil:
		return nil, fmt.Errorf("asynq: %v", )
	}
	var  []*TaskInfo
	for ,  := range  {
		 = append(, newTaskInfo(
			.Message,
			.State,
			.NextProcessAt,
			.Result,
		))
	}
	return , nil
}

// ListRetryTasks retrieves retry tasks from the specified queue.
// Tasks are sorted by NextProcessAt in ascending order.
//
// By default, it retrieves the first 30 tasks.
//
// An invalid queue name yields an "asynq:"-prefixed error. If the queue does
// not exist the returned error wraps ErrQueueNotFound; any other failure from
// ListRetry is formatted with %v and therefore not wrapped.
func ( *Inspector) ( string,  ...ListOption) ([]*TaskInfo, error) {
	if  := base.ValidateQueueName();  != nil {
		return nil, fmt.Errorf("asynq: %v", )
	}
	 := composeListOptions(...)
	// The public page number is 1-based; rdb.Pagination receives it minus one.
	 := rdb.Pagination{Size: .pageSize, Page: .pageNum - 1}
	,  := .rdb.ListRetry(, )
	switch {
	case errors.IsQueueNotFound():
		return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
	case  != nil:
		return nil, fmt.Errorf("asynq: %v", )
	}
	var  []*TaskInfo
	for ,  := range  {
		 = append(, newTaskInfo(
			.Message,
			.State,
			.NextProcessAt,
			.Result,
		))
	}
	return , nil
}

// ListArchivedTasks retrieves archived tasks from the specified queue.
// Tasks are sorted by LastFailedAt in descending order.
//
// By default, it retrieves the first 30 tasks.
//
// An invalid queue name yields an "asynq:"-prefixed error. If the queue does
// not exist the returned error wraps ErrQueueNotFound; any other failure from
// ListArchived is formatted with %v and therefore not wrapped.
func ( *Inspector) ( string,  ...ListOption) ([]*TaskInfo, error) {
	if  := base.ValidateQueueName();  != nil {
		return nil, fmt.Errorf("asynq: %v", )
	}
	 := composeListOptions(...)
	// The public page number is 1-based; rdb.Pagination receives it minus one.
	 := rdb.Pagination{Size: .pageSize, Page: .pageNum - 1}
	,  := .rdb.ListArchived(, )
	switch {
	case errors.IsQueueNotFound():
		return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
	case  != nil:
		return nil, fmt.Errorf("asynq: %v", )
	}
	var  []*TaskInfo
	for ,  := range  {
		 = append(, newTaskInfo(
			.Message,
			.State,
			.NextProcessAt,
			.Result,
		))
	}
	return , nil
}

// ListCompletedTasks retrieves completed tasks from the specified queue.
// Tasks are sorted by expiration time (i.e. CompletedAt + Retention) in descending order.
//
// By default, it retrieves the first 30 tasks.
//
// An invalid queue name yields an "asynq:"-prefixed error. If the queue does
// not exist the returned error wraps ErrQueueNotFound; any other failure from
// ListCompleted is formatted with %v and therefore not wrapped.
func ( *Inspector) ( string,  ...ListOption) ([]*TaskInfo, error) {
	if  := base.ValidateQueueName();  != nil {
		return nil, fmt.Errorf("asynq: %v", )
	}
	 := composeListOptions(...)
	// The public page number is 1-based; rdb.Pagination receives it minus one.
	 := rdb.Pagination{Size: .pageSize, Page: .pageNum - 1}
	,  := .rdb.ListCompleted(, )
	switch {
	case errors.IsQueueNotFound():
		return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
	case  != nil:
		return nil, fmt.Errorf("asynq: %v", )
	}
	var  []*TaskInfo
	for ,  := range  {
		 = append(, newTaskInfo(
			.Message,
			.State,
			.NextProcessAt,
			.Result,
		))
	}
	return , nil
}

// DeleteAllPendingTasks deletes all pending tasks from the specified queue,
// and reports the number of tasks deleted.
//
// A queue name rejected by base.ValidateQueueName yields a zero count and the
// validation error. Otherwise the count from the rdb call is converted to int
// and returned together with that call's error.
func ( *Inspector) ( string) (int, error) {
	if  := base.ValidateQueueName();  != nil {
		return 0, 
	}
	,  := .rdb.DeleteAllPendingTasks()
	return int(), 
}

// DeleteAllScheduledTasks deletes all scheduled tasks from the specified queue,
// and reports the number of tasks deleted.
//
// A queue name rejected by base.ValidateQueueName yields a zero count and the
// validation error. Otherwise the count from the rdb call is converted to int
// and returned together with that call's error.
func ( *Inspector) ( string) (int, error) {
	if  := base.ValidateQueueName();  != nil {
		return 0, 
	}
	,  := .rdb.DeleteAllScheduledTasks()
	return int(), 
}

// DeleteAllRetryTasks deletes all retry tasks from the specified queue,
// and reports the number of tasks deleted.
//
// A queue name rejected by base.ValidateQueueName yields a zero count and the
// validation error. Otherwise the count from the rdb call is converted to int
// and returned together with that call's error.
func ( *Inspector) ( string) (int, error) {
	if  := base.ValidateQueueName();  != nil {
		return 0, 
	}
	,  := .rdb.DeleteAllRetryTasks()
	return int(), 
}

// DeleteAllArchivedTasks deletes all archived tasks from the specified queue,
// and reports the number of tasks deleted.
//
// A queue name rejected by base.ValidateQueueName yields a zero count and the
// validation error. Otherwise the count from the rdb call is converted to int
// and returned together with that call's error.
func ( *Inspector) ( string) (int, error) {
	if  := base.ValidateQueueName();  != nil {
		return 0, 
	}
	,  := .rdb.DeleteAllArchivedTasks()
	return int(), 
}

// DeleteAllCompletedTasks deletes all completed tasks from the specified queue,
// and reports the number of tasks deleted.
//
// A queue name rejected by base.ValidateQueueName yields a zero count and the
// validation error. Otherwise the count from the rdb call is converted to int
// and returned together with that call's error.
func ( *Inspector) ( string) (int, error) {
	if  := base.ValidateQueueName();  != nil {
		return 0, 
	}
	,  := .rdb.DeleteAllCompletedTasks()
	return int(), 
}

// DeleteAllAggregatingTasks deletes all tasks from the specified group,
// and reports the number of tasks deleted.
//
// A queue name rejected by base.ValidateQueueName yields a zero count and the
// validation error. Otherwise the count from the rdb call is converted to int
// and returned together with that call's error.
func ( *Inspector) (,  string) (int, error) {
	if  := base.ValidateQueueName();  != nil {
		return 0, 
	}
	,  := .rdb.DeleteAllAggregatingTasks(, )
	return int(), 
}

// DeleteTask deletes a task with the given id from the given queue.
// The task needs to be in pending, scheduled, retry, or archived state,
// otherwise DeleteTask will return an error.
//
// If the queue name is invalid, it returns an "asynq:"-prefixed error describing the failure.
// If a queue with the given name doesn't exist, it returns an error wrapping ErrQueueNotFound.
// If a task with the given id doesn't exist in the queue, it returns an error wrapping ErrTaskNotFound.
// If the task is in active state, it returns a non-nil error.
// Any other failure is formatted with %v and therefore not wrapped.
func ( *Inspector) (,  string) error {
	if  := base.ValidateQueueName();  != nil {
		return fmt.Errorf("asynq: %v", )
	}
	 := .rdb.DeleteTask(, )
	switch {
	case errors.IsQueueNotFound():
		return fmt.Errorf("asynq: %w", ErrQueueNotFound)
	case errors.IsTaskNotFound():
		return fmt.Errorf("asynq: %w", ErrTaskNotFound)
	case  != nil:
		return fmt.Errorf("asynq: %v", )
	}
	return nil

}

// RunAllScheduledTasks schedules all scheduled tasks from the given queue to run,
// and reports the number of tasks scheduled to run.
//
// A queue name rejected by base.ValidateQueueName yields a zero count and the
// validation error. Otherwise the count from the rdb call is converted to int
// and returned together with that call's error.
func ( *Inspector) ( string) (int, error) {
	if  := base.ValidateQueueName();  != nil {
		return 0, 
	}
	,  := .rdb.RunAllScheduledTasks()
	return int(), 
}

// RunAllRetryTasks schedules all retry tasks from the given queue to run,
// and reports the number of tasks scheduled to run.
//
// A queue name rejected by base.ValidateQueueName yields a zero count and the
// validation error. Otherwise the count from the rdb call is converted to int
// and returned together with that call's error.
func ( *Inspector) ( string) (int, error) {
	if  := base.ValidateQueueName();  != nil {
		return 0, 
	}
	,  := .rdb.RunAllRetryTasks()
	return int(), 
}

// RunAllArchivedTasks schedules all archived tasks from the given queue to run,
// and reports the number of tasks scheduled to run.
//
// A queue name rejected by base.ValidateQueueName yields a zero count and the
// validation error. Otherwise the count from the rdb call is converted to int
// and returned together with that call's error.
func ( *Inspector) ( string) (int, error) {
	if  := base.ValidateQueueName();  != nil {
		return 0, 
	}
	,  := .rdb.RunAllArchivedTasks()
	return int(), 
}

// RunAllAggregatingTasks schedules all tasks from the given group to run,
// and reports the number of tasks scheduled to run.
//
// A queue name rejected by base.ValidateQueueName yields a zero count and the
// validation error. Otherwise the count from the rdb call is converted to int
// and returned together with that call's error.
func ( *Inspector) (,  string) (int, error) {
	if  := base.ValidateQueueName();  != nil {
		return 0, 
	}
	,  := .rdb.RunAllAggregatingTasks(, )
	return int(), 
}

// RunTask updates the task to pending state given a queue name and task id.
// The task needs to be in scheduled, retry, or archived state, otherwise RunTask
// will return an error.
//
// If the queue name is invalid, it returns an "asynq:"-prefixed error describing the failure.
// If a queue with the given name doesn't exist, it returns an error wrapping ErrQueueNotFound.
// If a task with the given id doesn't exist in the queue, it returns an error wrapping ErrTaskNotFound.
// If the task is in pending or active state, it returns a non-nil error.
// Any other failure is formatted with %v and therefore not wrapped.
func ( *Inspector) (,  string) error {
	if  := base.ValidateQueueName();  != nil {
		return fmt.Errorf("asynq: %v", )
	}
	 := .rdb.RunTask(, )
	switch {
	case errors.IsQueueNotFound():
		return fmt.Errorf("asynq: %w", ErrQueueNotFound)
	case errors.IsTaskNotFound():
		return fmt.Errorf("asynq: %w", ErrTaskNotFound)
	case  != nil:
		return fmt.Errorf("asynq: %v", )
	}
	return nil
}

// ArchiveAllPendingTasks archives all pending tasks from the given queue,
// and reports the number of tasks archived.
//
// A queue name rejected by base.ValidateQueueName yields a zero count and the
// validation error. Otherwise the count from the rdb call is converted to int
// and returned together with that call's error.
func ( *Inspector) ( string) (int, error) {
	if  := base.ValidateQueueName();  != nil {
		return 0, 
	}
	,  := .rdb.ArchiveAllPendingTasks()
	return int(), 
}

// ArchiveAllScheduledTasks archives all scheduled tasks from the given queue,
// and reports the number of tasks archived.
//
// A queue name rejected by base.ValidateQueueName yields a zero count and the
// validation error. Otherwise the count from the rdb call is converted to int
// and returned together with that call's error.
func ( *Inspector) ( string) (int, error) {
	if  := base.ValidateQueueName();  != nil {
		return 0, 
	}
	,  := .rdb.ArchiveAllScheduledTasks()
	return int(), 
}

// ArchiveAllRetryTasks archives all retry tasks from the given queue,
// and reports the number of tasks archived.
//
// A queue name rejected by base.ValidateQueueName yields a zero count and the
// validation error. Otherwise the count from the rdb call is converted to int
// and returned together with that call's error.
func ( *Inspector) ( string) (int, error) {
	if  := base.ValidateQueueName();  != nil {
		return 0, 
	}
	,  := .rdb.ArchiveAllRetryTasks()
	return int(), 
}

// ArchiveAllAggregatingTasks archives all tasks from the given group,
// and reports the number of tasks archived.
//
// A queue name rejected by base.ValidateQueueName yields a zero count and the
// validation error. Otherwise the count from the rdb call is converted to int
// and returned together with that call's error.
func ( *Inspector) (,  string) (int, error) {
	if  := base.ValidateQueueName();  != nil {
		return 0, 
	}
	,  := .rdb.ArchiveAllAggregatingTasks(, )
	return int(), 
}

// ArchiveTask archives a task with the given id in the given queue.
// The task needs to be in pending, scheduled, or retry state, otherwise ArchiveTask
// will return an error.
//
// If a queue with the given name doesn't exist, it returns an error wrapping ErrQueueNotFound.
// If a task with the given id doesn't exist in the queue, it returns an error wrapping ErrTaskNotFound.
// If the task is in already archived, it returns a non-nil error.
func ( *Inspector) (,  string) error {
	if  := base.ValidateQueueName();  != nil {
		return fmt.Errorf("asynq: err")
	}
	 := .rdb.ArchiveTask(, )
	switch {
	case errors.IsQueueNotFound():
		return fmt.Errorf("asynq: %w", ErrQueueNotFound)
	case errors.IsTaskNotFound():
		return fmt.Errorf("asynq: %w", ErrTaskNotFound)
	case  != nil:
		return fmt.Errorf("asynq: %v", )
	}
	return nil
}

// CancelProcessing sends a signal to cancel processing of the task
// given a task id. CancelProcessing is best-effort, which means that it does not
// guarantee that the task with the given id will be canceled. The return
// value only indicates whether the cancelation signal has been sent.
//
// The signal is sent through the PublishCancelation method of the rdb field.
func ( *Inspector) ( string) error {
	return .rdb.PublishCancelation()
}

// PauseQueue pauses task processing on the specified queue.
// If the queue is already paused, it will return a non-nil error.
//
// A queue name rejected by base.ValidateQueueName is reported with the
// validation error unchanged; otherwise the result of the rdb Pause call is
// returned.
func ( *Inspector) ( string) error {
	if  := base.ValidateQueueName();  != nil {
		return 
	}
	return .rdb.Pause()
}

// UnpauseQueue resumes task processing on the specified queue.
// If the queue is not paused, it will return a non-nil error.
//
// A queue name rejected by base.ValidateQueueName is reported with the
// validation error unchanged; otherwise the result of the rdb Unpause call is
// returned.
func ( *Inspector) ( string) error {
	if  := base.ValidateQueueName();  != nil {
		return 
	}
	return .rdb.Unpause()
}

// Servers returns a list of running servers' information.
//
// It lists servers and workers separately, builds one ServerInfo per server
// keyed by server ID, and then attaches each worker to the server it reports
// as its ServerID. Workers whose server is not in the listing are skipped.
// Every ServerInfo has a non-nil (possibly empty) ActiveWorkers slice.
//
// The result is collected by ranging over a map, so the order of the returned
// slice is unspecified. An error from either listing call is returned as-is.
func ( *Inspector) () ([]*ServerInfo, error) {
	,  := .rdb.ListServers()
	if  != nil {
		return nil, 
	}
	,  := .rdb.ListWorkers()
	if  != nil {
		return nil, 
	}
	 := make(map[string]*ServerInfo) // ServerInfo keyed by serverID
	for ,  := range  {
		[.ServerID] = &ServerInfo{
			ID:             .ServerID,
			Host:           .Host,
			PID:            .PID,
			Concurrency:    .Concurrency,
			Queues:         .Queues,
			StrictPriority: .StrictPriority,
			Started:        .Started,
			Status:         .Status,
			ActiveWorkers:  make([]*WorkerInfo, 0),
		}
	}
	for ,  := range  {
		,  := [.ServerID]
		if ! {
			// No matching server entry for this worker; leave it out.
			continue
		}
		 := &WorkerInfo{
			TaskID:      .ID,
			TaskType:    .Type,
			TaskPayload: .Payload,
			Queue:       .Queue,
			Started:     .Started,
			Deadline:    .Deadline,
		}
		.ActiveWorkers = append(.ActiveWorkers, )
	}
	// Flatten the map values into the returned slice.
	var  []*ServerInfo
	for ,  := range  {
		 = append(, )
	}
	return , nil
}

// ServerInfo describes a running Server instance.
type ServerInfo struct {
	// ID is the unique identifier for the server.
	ID string
	// Host is the host machine on which the server is running.
	Host string
	// PID is the ID of the process in which the server is running.
	PID int

	// Server configuration details.
	// See Config doc for field descriptions.
	Concurrency    int
	Queues         map[string]int
	StrictPriority bool

	// Started is the time the server started.
	Started time.Time
	// Status indicates the status of the server.
	// TODO: Update comment with more details.
	Status string
	// ActiveWorkers is a list of active workers currently processing tasks.
	ActiveWorkers []*WorkerInfo
}

// WorkerInfo describes a running worker processing a task.
type WorkerInfo struct {
	// TaskID is the ID of the task the worker is processing.
	TaskID string
	// TaskType is the type of the task the worker is processing.
	TaskType string
	// TaskPayload is the payload of the task the worker is processing.
	TaskPayload []byte
	// Queue is the queue from which the worker got its task.
	Queue string
	// Started is the time the worker started processing the task.
	Started time.Time
	// Deadline is the time the worker needs to finish processing the task by.
	Deadline time.Time
}

// ClusterKeySlot returns an integer identifying the hash slot the given queue hashes to.
//
// Both return values are passed through unchanged from the ClusterKeySlot
// method of the rdb field.
func ( *Inspector) ( string) (int64, error) {
	return .rdb.ClusterKeySlot()
}

// ClusterNode describes a node in redis cluster.
type ClusterNode struct {
	// ID is the node ID in the cluster.
	ID string

	// Addr is the address of the node.
	Addr string
}

// ClusterNodes returns a list of nodes the given queue belongs to.
//
// Only relevant if task queues are stored in redis cluster.
//
// Each node reported by the rdb call is converted into a ClusterNode carrying
// its ID and address. An error from that call is returned as-is with a nil
// slice.
func ( *Inspector) ( string) ([]*ClusterNode, error) {
	,  := .rdb.ClusterNodes()
	if  != nil {
		return nil, 
	}
	var  []*ClusterNode
	for ,  := range  {
		 = append(, &ClusterNode{ID: .ID, Addr: .Addr})
	}
	return , nil
}

// SchedulerEntry holds information about a periodic task registered with a scheduler.
type SchedulerEntry struct {
	// ID is the identifier of this entry.
	ID string

	// Spec describes the schedule of this entry.
	Spec string

	// Task is the periodic task registered for this entry.
	Task *Task

	// Opts is the options for the periodic task.
	Opts []Option

	// Next shows the next time the task will be enqueued.
	Next time.Time

	// Prev shows the last time the task was enqueued.
	// Zero time if task was never enqueued.
	Prev time.Time
}

// SchedulerEntries returns a list of all entries registered with
// currently running schedulers.
//
// For each stored entry a Task is rebuilt with NewTask from the entry's type
// and payload, and each stored option string is parsed with parseOption.
// Option strings that fail to parse are silently dropped. An error from
// ListSchedulerEntries is returned as-is with a nil slice.
func ( *Inspector) () ([]*SchedulerEntry, error) {
	var  []*SchedulerEntry
	,  := .rdb.ListSchedulerEntries()
	if  != nil {
		return nil, 
	}
	for ,  := range  {
		 := NewTask(.Type, .Payload)
		var  []Option
		for ,  := range .Opts {
			// Keep only the options that parse; bad data is ignored.
			if ,  := parseOption();  == nil {
				// ignore bad data
				 = append(, )
			}
		}
		 = append(, &SchedulerEntry{
			ID:   .ID,
			Spec: .Spec,
			Task: ,
			Opts: ,
			Next: .Next,
			Prev: .Prev,
		})
	}
	return , nil
}

// parseOption interprets a string s as an Option and returns the Option if parsing is successful,
// otherwise returns non-nil error.
func parseOption( string) (Option, error) {
	,  := parseOptionFunc(), parseOptionArg()
	switch  {
	case "Queue":
		,  := strconv.Unquote()
		if  != nil {
			return nil, 
		}
		return Queue(), nil
	case "MaxRetry":
		,  := strconv.Atoi()
		if  != nil {
			return nil, 
		}
		return MaxRetry(), nil
	case "Timeout":
		,  := time.ParseDuration()
		if  != nil {
			return nil, 
		}
		return Timeout(), nil
	case "Deadline":
		,  := time.Parse(time.UnixDate, )
		if  != nil {
			return nil, 
		}
		return Deadline(), nil
	case "Unique":
		,  := time.ParseDuration()
		if  != nil {
			return nil, 
		}
		return Unique(), nil
	case "ProcessAt":
		,  := time.Parse(time.UnixDate, )
		if  != nil {
			return nil, 
		}
		return ProcessAt(), nil
	case "ProcessIn":
		,  := time.ParseDuration()
		if  != nil {
			return nil, 
		}
		return ProcessIn(), nil
	case "Retention":
		,  := time.ParseDuration()
		if  != nil {
			return nil, 
		}
		return Retention(), nil
	default:
		return nil, fmt.Errorf("cannot not parse option string %q", )
	}
}

// parseOptionFunc returns the function-name part of an option string, i.e.
// everything before the first "(". For example `MaxRetry(3)` yields "MaxRetry".
//
// If s contains no "(" it is not a well-formed option string and the empty
// string is returned, rather than slicing with a negative index and panicking.
func parseOptionFunc(s string) string {
	i := strings.Index(s, "(")
	if i < 0 {
		return ""
	}
	return s[:i]
}

// parseOptionArg returns the argument part of an option string, i.e. the text
// between the first "(" and the last ")". For example `MaxRetry(3)` yields "3".
//
// The last ")" is used so that an argument which itself contains ")" (such as
// a quoted queue name) is returned whole. If s has no "(" or no ")" after it,
// the empty string is returned.
func parseOptionArg(s string) string {
	open := strings.Index(s, "(")
	if open < 0 {
		return ""
	}
	closing := strings.LastIndex(s, ")")
	if closing <= open {
		return ""
	}
	return s[open+1 : closing]
}

// SchedulerEnqueueEvent holds information about an enqueue event by a scheduler.
type SchedulerEnqueueEvent struct {
	// TaskID is the ID of the task that was enqueued.
	TaskID string

	// EnqueuedAt is the time the task was enqueued.
	EnqueuedAt time.Time
}

// ListSchedulerEnqueueEvents retrieves a list of enqueue events from the specified scheduler entry.
//
// By default, it retrieves the first 30 events.
//
// Each event reported by the rdb call is converted into a
// SchedulerEnqueueEvent. An error from that call is returned as-is with a nil
// slice.
func ( *Inspector) ( string,  ...ListOption) ([]*SchedulerEnqueueEvent, error) {
	 := composeListOptions(...)
	// The public page number is 1-based; rdb.Pagination receives it minus one.
	 := rdb.Pagination{Size: .pageSize, Page: .pageNum - 1}
	,  := .rdb.ListSchedulerEnqueueEvents(, )
	if  != nil {
		return nil, 
	}
	var  []*SchedulerEnqueueEvent
	for ,  := range  {
		 = append(, &SchedulerEnqueueEvent{TaskID: .TaskID, EnqueuedAt: .EnqueuedAt})
	}
	return , nil
}