// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.

package asynq

import (
	
	
	
	

	
	
	
	
	
)

// A Client is responsible for scheduling tasks.
//
// A Client is used to register tasks that should be processed
// immediately or some time in the future.
//
// Clients are safe for concurrent use by multiple goroutines.
type Client struct {
	// broker is the base.Broker every client operation is delegated to:
	// enqueueing, scheduling, adding to a group, and closing.
	broker base.Broker
}

// NOTE(review): some identifiers (function name, parameter, locals) appear to
// be missing from the code below; restore them before building.

// NewClient returns a new Client instance given a redis connection option.
//
// The value produced by the option's MakeRedisClient method must be a
// redis.UniversalClient; otherwise NewClient panics with an
// "asynq: unsupported RedisConnOpt type" message. The broker of the returned
// Client is created with rdb.NewRDB.
func ( RedisConnOpt) *Client {
	,  := .MakeRedisClient().(redis.UniversalClient)
	if ! {
		panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", ))
	}
	return &Client{broker: rdb.NewRDB()}
}

// OptionType identifies the kind of an Option. It is the value returned by
// the Option interface's Type method.
type OptionType int

// Option kinds. Each constant is reported by exactly one of the internal
// option representations declared in this file.
const (
	// MaxRetryOpt is the type of a max-retry option (retryOption).
	MaxRetryOpt OptionType = iota
	// QueueOpt is the type of a queue-name option (queueOption).
	QueueOpt
	// TimeoutOpt is the type of a timeout option (timeoutOption).
	TimeoutOpt
	// DeadlineOpt is the type of a deadline option (deadlineOption).
	DeadlineOpt
	// UniqueOpt is the type of a uniqueness-TTL option (uniqueOption).
	UniqueOpt
	// ProcessAtOpt is the type of an absolute process-time option (processAtOption).
	ProcessAtOpt
	// ProcessInOpt is the type of a relative process-time option (processInOption).
	ProcessInOpt
	// TaskIDOpt is the type of a task-ID option (taskIDOption).
	TaskIDOpt
	// RetentionOpt is the type of a retention-period option (retentionOption).
	RetentionOpt
	// GroupOpt is the type of a group-key option (groupOption).
	GroupOpt
)

// Option specifies the task processing behavior.
//
// The implementations in this file are thin wrappers around a single value
// (an int, a string, a time.Duration or a time.Time).
type Option interface {
	// String returns a string representation of the option.
	// The implementations in this file format it like the constructor call
	// that produced the option, e.g. "MaxRetry(3)" or "Queue(\"default\")".
	String() string

	// Type describes the type of the option.
	Type() OptionType

	// Value returns a value used to create this option.
	// The dynamic type of the result depends on the option: the
	// implementations in this file return int, string, time.Duration or
	// time.Time.
	Value() interface{}
}

// Internal option representations.
//
// Each type wraps the raw value of one kind of option and is matched by type
// in composeOptions when the options are merged.
type (
	// retryOption is a max retry count (MaxRetryOpt).
	retryOption     int
	// queueOption is a queue name (QueueOpt).
	queueOption     string
	// taskIDOption is a task ID (TaskIDOpt).
	taskIDOption    string
	// timeoutOption is a task timeout (TimeoutOpt).
	timeoutOption   time.Duration
	// deadlineOption is a task deadline (DeadlineOpt).
	deadlineOption  time.Time
	// uniqueOption is a uniqueness TTL (UniqueOpt).
	uniqueOption    time.Duration
	// processAtOption is an absolute processing time (ProcessAtOpt).
	processAtOption time.Time
	// processInOption is a processing delay relative to now (ProcessInOpt).
	processInOption time.Duration
	// retentionOption is a retention period (RetentionOpt).
	retentionOption time.Duration
	// groupOption is a group key (GroupOpt).
	groupOption     string
)

// NOTE(review): some identifiers (names, receivers, arguments) appear to be
// missing from the code below; restore them before building.

// MaxRetry returns an option to specify the max number of times
// the task will be retried.
//
// Negative retry count is treated as zero retry: the value is clamped to 0
// before the option is created.
func ( int) Option {
	if  < 0 {
		 = 0
	}
	return retryOption()
}

// Option implementation for retryOption: the string form is "MaxRetry(<n>)",
// the type is MaxRetryOpt, and the value is the retry count as an int.
func ( retryOption) () string     { return fmt.Sprintf("MaxRetry(%d)", int()) }
func ( retryOption) () OptionType   { return MaxRetryOpt }
func ( retryOption) () interface{} { return int() }

// NOTE(review): some identifiers (names, receivers, arguments) appear to be
// missing from the code below; restore them before building.

// Queue returns an option to specify the queue to enqueue the task into.
//
// The name is not checked here; composeOptions validates it with
// base.ValidateQueueName and returns that error when the name is rejected.
func ( string) Option {
	return queueOption()
}

// Option implementation for queueOption: the string form is "Queue(<quoted name>)",
// the type is QueueOpt, and the value is the queue name as a string.
func ( queueOption) () string     { return fmt.Sprintf("Queue(%q)", string()) }
func ( queueOption) () OptionType   { return QueueOpt }
func ( queueOption) () interface{} { return string() }

// NOTE(review): some identifiers (names, receivers, arguments) appear to be
// missing from the code below; restore them before building.

// TaskID returns an option to specify the task ID.
//
// The ID is not checked here; composeOptions rejects an ID that is empty or
// consists only of whitespace with the error "task ID cannot be empty".
// When no TaskID option is given, composeOptions generates an ID with
// uuid.NewString.
func ( string) Option {
	return taskIDOption()
}

// Option implementation for taskIDOption: the string form is "TaskID(<quoted id>)",
// the type is TaskIDOpt, and the value is the ID as a string.
func ( taskIDOption) () string     { return fmt.Sprintf("TaskID(%q)", string()) }
func ( taskIDOption) () OptionType   { return TaskIDOpt }
func ( taskIDOption) () interface{} { return string() }

// NOTE(review): some identifiers (names, receivers, arguments) appear to be
// missing from the code below; restore them before building.

// Timeout returns an option to specify how long a task may run.
// If the timeout elapses before the Handler returns, then the task
// will be retried.
//
// Zero duration means no limit. Note, however, that the enqueue path treats
// a zero timeout as "not set": when no deadline is set either, it falls back
// to defaultTimeout (30 minutes).
//
// If there's a conflicting Deadline option, whichever comes earliest
// will be used.
func ( time.Duration) Option {
	return timeoutOption()
}

// Option implementation for timeoutOption: the string form is "Timeout(<duration>)",
// the type is TimeoutOpt, and the value is the timeout as a time.Duration.
func ( timeoutOption) () string     { return fmt.Sprintf("Timeout(%v)", time.Duration()) }
func ( timeoutOption) () OptionType   { return TimeoutOpt }
func ( timeoutOption) () interface{} { return time.Duration() }

// NOTE(review): some identifiers (names, receivers, arguments) appear to be
// missing from the code below; restore them before building.

// Deadline returns an option to specify the deadline for the given task.
// If it reaches the deadline before the Handler returns, then the task
// will be retried.
//
// A zero time.Time is treated by the enqueue path as "no deadline".
//
// If there's a conflicting Timeout option, whichever comes earliest
// will be used.
func ( time.Time) Option {
	return deadlineOption()
}

// Option implementation for deadlineOption: the string form is
// "Deadline(<time>)" with the time formatted using the time.UnixDate layout,
// the type is DeadlineOpt, and the value is the deadline as a time.Time.
func ( deadlineOption) () string {
	return fmt.Sprintf("Deadline(%v)", time.Time().Format(time.UnixDate))
}
func ( deadlineOption) () OptionType   { return DeadlineOpt }
func ( deadlineOption) () interface{} { return time.Time() }

// NOTE(review): some identifiers (names, receivers, arguments) appear to be
// missing from the code below; restore them before building.

// Unique returns an option to enqueue a task only if the given task is unique.
// Task enqueued with this option is guaranteed to be unique within the given ttl.
// Once the task gets processed successfully or once the TTL has expired,
// another task with the same uniqueness may be enqueued.
// ErrDuplicateTask error is returned when enqueueing a duplicate task.
// TTL duration must be greater than or equal to 1 second; composeOptions
// rejects a shorter TTL with the error "Unique TTL cannot be less than 1s".
//
// Uniqueness of a task is based on the following properties:
//   - Task Type
//   - Task Payload
//   - Queue Name
func ( time.Duration) Option {
	return uniqueOption()
}

// Option implementation for uniqueOption: the string form is "Unique(<ttl>)",
// the type is UniqueOpt, and the value is the TTL as a time.Duration.
func ( uniqueOption) () string     { return fmt.Sprintf("Unique(%v)", time.Duration()) }
func ( uniqueOption) () OptionType   { return UniqueOpt }
func ( uniqueOption) () interface{} { return time.Duration() }

// NOTE(review): some identifiers (names, receivers, arguments) appear to be
// missing from the code below; restore them before building.

// ProcessAt returns an option to specify when to process the given task.
//
// A time that is not after the current time at enqueue does not schedule the
// task; it is enqueued (or added to its group) right away instead.
//
// If there's a conflicting ProcessIn option, the last option passed to Enqueue overrides the others.
func ( time.Time) Option {
	return processAtOption()
}

// Option implementation for processAtOption: the string form is
// "ProcessAt(<time>)" with the time formatted using the time.UnixDate layout,
// the type is ProcessAtOpt, and the value is the time as a time.Time.
func ( processAtOption) () string {
	return fmt.Sprintf("ProcessAt(%v)", time.Time().Format(time.UnixDate))
}
func ( processAtOption) () OptionType   { return ProcessAtOpt }
func ( processAtOption) () interface{} { return time.Time() }

// NOTE(review): some identifiers (names, receivers, arguments) appear to be
// missing from the code below; restore them before building.

// ProcessIn returns an option to specify when to process the given task relative to the current time.
//
// The delay is turned into an absolute time (time.Now plus the delay) when
// the options are composed, not when this option is created.
//
// If there's a conflicting ProcessAt option, the last option passed to Enqueue overrides the others.
func ( time.Duration) Option {
	return processInOption()
}

// Option implementation for processInOption: the string form is
// "ProcessIn(<duration>)", the type is ProcessInOpt, and the value is the
// delay as a time.Duration.
func ( processInOption) () string     { return fmt.Sprintf("ProcessIn(%v)", time.Duration()) }
func ( processInOption) () OptionType   { return ProcessInOpt }
func ( processInOption) () interface{} { return time.Duration() }

// NOTE(review): some identifiers (names, receivers, arguments) appear to be
// missing from the code below; restore them before building.

// Retention returns an option to specify the duration of retention period for the task.
// If this option is provided, the task will be stored as a completed task after successful processing.
// A completed task will be deleted after the specified duration elapses.
//
// The enqueue path stores the period in the task message as a whole number
// of seconds, so any sub-second part is dropped.
func ( time.Duration) Option {
	return retentionOption()
}

// Option implementation for retentionOption: the string form is
// "Retention(<duration>)", the type is RetentionOpt, and the value is the
// period as a time.Duration.
func ( retentionOption) () string     { return fmt.Sprintf("Retention(%v)", time.Duration()) }
func ( retentionOption) () OptionType   { return RetentionOpt }
func ( retentionOption) () interface{} { return time.Duration() }

// NOTE(review): some identifiers (names, receivers, arguments) appear to be
// missing from the code below; restore them before building.

// Group returns an option to specify the group used for the task.
// Tasks in a given queue with the same group will be aggregated into one task before passed to Handler.
//
// The key is not checked here; composeOptions rejects a key that is empty or
// consists only of whitespace with the error "group key cannot be empty".
func ( string) Option {
	return groupOption()
}

// Option implementation for groupOption: the string form is "Group(<quoted key>)",
// the type is GroupOpt, and the value is the group key as a string.
func ( groupOption) () string     { return fmt.Sprintf("Group(%q)", string()) }
func ( groupOption) () OptionType   { return GroupOpt }
func ( groupOption) () interface{} { return string() }

// Sentinel errors reported when a task cannot be enqueued.
var (
	// ErrDuplicateTask is returned when the given task could not be enqueued
	// because it is a duplicate of another task.
	//
	// It only applies to tasks enqueued with a Unique option.
	ErrDuplicateTask = errors.New("task already exists")

	// ErrTaskIDConflict is returned when the given task could not be enqueued
	// because its task ID already exists.
	//
	// It only applies to tasks enqueued with a TaskID option.
	ErrTaskIDConflict = errors.New("task ID conflicts with another task")
)

// option is the merged result of all options applied to a task, as produced
// by composeOptions.
type option struct {
	// retry is the max retry count; composeOptions defaults it to defaultMaxRetry.
	retry     int
	// queue is the target queue name; composeOptions defaults it to base.DefaultQueueName.
	queue     string
	// taskID is the task ID; composeOptions defaults it to a value from uuid.NewString.
	taskID    string
	// timeout is the task timeout; zero means it was not set.
	timeout   time.Duration
	// deadline is the task deadline; the zero time means it was not set.
	deadline  time.Time
	// uniqueTTL is the uniqueness TTL; zero means the task is not enqueued as unique.
	uniqueTTL time.Duration
	// processAt is when the task should be processed; composeOptions defaults it to time.Now().
	processAt time.Time
	// retention is how long a completed task is kept; zero means it was not set.
	retention time.Duration
	// group is the aggregation group key; empty means the task is not grouped.
	group     string
}

// NOTE(review): the parameter and local identifiers appear to be missing from
// the code below; restore them before building.

// composeOptions merges user provided options into the default options
// and returns the composed option.
// It also validates the user provided options and returns an error if any of
// the user provided options fail the validations.
//
// Options are applied in order, so a later option of the same kind overrides
// an earlier one; ProcessAt and ProcessIn both write the same processAt field.
// Options of an unrecognized concrete type are ignored.
//
// Validation failures (a zero option value is returned with the error):
//   - queue name rejected by base.ValidateQueueName
//   - task ID that is empty or whitespace-only
//   - unique TTL shorter than one second
//   - group key that is empty or whitespace-only
func composeOptions( ...Option) (option, error) {
	// Defaults; uniqueTTL, retention and group start at their zero values.
	 := option{
		retry:     defaultMaxRetry,
		queue:     base.DefaultQueueName,
		taskID:    uuid.NewString(),
		timeout:   0, // do not set to defaultTimeout here
		deadline:  time.Time{},
		processAt: time.Now(),
	}
	for ,  := range  {
		switch opt := .(type) {
		case retryOption:
			.retry = int()
		case queueOption:
			 := string()
			if  := base.ValidateQueueName();  != nil {
				return option{}, 
			}
			.queue = 
		case taskIDOption:
			 := string()
			if isBlank() {
				return option{}, errors.New("task ID cannot be empty")
			}
			.taskID = 
		case timeoutOption:
			.timeout = time.Duration()
		case deadlineOption:
			.deadline = time.Time()
		case uniqueOption:
			 := time.Duration()
			if  < 1*time.Second {
				return option{}, errors.New("Unique TTL cannot be less than 1s")
			}
			.uniqueTTL = 
		case processAtOption:
			.processAt = time.Time()
		case processInOption:
			// The relative delay is resolved against the clock here, at compose time.
			.processAt = time.Now().Add(time.Duration())
		case retentionOption:
			.retention = time.Duration()
		case groupOption:
			 := string()
			if isBlank() {
				return option{}, errors.New("group key cannot be empty")
			}
			.group = 
		default:
			// ignore unexpected option
		}
	}
	return , nil
}

// isBlank reports whether s is empty or consists solely of white space,
// as determined by strings.TrimSpace.
func isBlank(s string) bool {
	return strings.TrimSpace(s) == ""
}

// defaultMaxRetry is the max retry count applied when no retry count is specified.
const defaultMaxRetry = 25

// defaultTimeout is the timeout applied when neither a timeout nor a deadline
// is specified.
const defaultTimeout = 30 * time.Minute

// Sentinels meaning "no timeout" and "no deadline".
//
// noTimeout is the zero duration. noDeadline is the Unix epoch, so its Unix()
// value is 0; it is not the zero time.Time.
var (
	noTimeout  = time.Duration(0)
	noDeadline = time.Unix(0, 0)
)

// NOTE(review): the receiver and method name appear to be missing from the
// code below; restore them before building.

// Close closes the connection with redis.
//
// It delegates to the broker's Close method and returns its error unchanged.
func ( *Client) () error {
	return .broker.Close()
}

// NOTE(review): the receiver, method name and parameter identifiers appear to
// be missing from the code below; restore them before building.

// Enqueue enqueues the given task to a queue.
//
// Enqueue returns TaskInfo and nil error if the task is enqueued successfully, otherwise returns a non-nil error.
//
// The argument opts specifies the behavior of task processing.
// If there are conflicting Option values the last one overrides others.
// Any options provided to NewTask can be overridden by options passed to Enqueue.
// By default, max retry is set to 25 and timeout is set to 30 minutes.
//
// If no ProcessAt or ProcessIn options are provided, the task will be pending immediately.
//
// Enqueue uses context.Background internally; to specify the context, use EnqueueContext.
// It is otherwise identical to EnqueueContext, to which it forwards its arguments.
func ( *Client) ( *Task,  ...Option) (*TaskInfo, error) {
	return .EnqueueContext(context.Background(), , ...)
}

// NOTE(review): the receiver, method name, parameter and local identifiers
// appear to be missing from the code below; restore them before building.

// EnqueueContext enqueues the given task to a queue.
//
// EnqueueContext returns TaskInfo and nil error if the task is enqueued successfully, otherwise returns a non-nil error.
//
// The argument opts specifies the behavior of task processing.
// If there are conflicting Option values the last one overrides others.
// Any options provided to NewTask can be overridden by options passed to Enqueue.
// By default, max retry is set to 25 and timeout is set to 30 minutes.
//
// If no ProcessAt or ProcessIn options are provided, the task will be pending immediately.
//
// The first argument context applies to the enqueue operation. To specify task timeout and deadline, use Timeout and Deadline option instead.
//
// The task ends up in one of three states:
//   - scheduled, when the composed process-at time is after the current time;
//   - aggregating, when it is not and a group key is set;
//   - pending otherwise.
//
// Errors:
//   - a nil task or a task whose type name is empty or whitespace-only is rejected;
//   - validation errors from composeOptions are returned unchanged;
//   - a duplicate-task or task-ID-conflict error from the lower layer is
//     reported as an error wrapping ErrDuplicateTask or ErrTaskIDConflict, so
//     errors.Is matches them;
//   - any other error is returned unchanged.
func ( *Client) ( context.Context,  *Task,  ...Option) (*TaskInfo, error) {
	if  == nil {
		return nil, fmt.Errorf("task cannot be nil")
	}
	if strings.TrimSpace(.Type()) == "" {
		return nil, fmt.Errorf("task typename cannot be empty")
	}
	// merge task options with the options provided at enqueue time.
	// NOTE(review): this appends onto the task's own opts slice; if that slice
	// has spare capacity the append writes into its backing array — confirm
	// this is safe when one task is enqueued concurrently.
	 = append(.opts, ...)
	,  := composeOptions(...)
	if  != nil {
		return nil, 
	}
	// Resolve the effective deadline and timeout, starting from the
	// "not set" sentinels.
	 := noDeadline
	if !.deadline.IsZero() {
		 = .deadline
	}
	 := noTimeout
	if .timeout != 0 {
		 = .timeout
	}
	if .Equal(noDeadline) &&  == noTimeout {
		// If neither deadline nor timeout are set, use default timeout.
		 = defaultTimeout
	}
	// A unique key is only computed when a uniqueness TTL was requested.
	var  string
	if .uniqueTTL > 0 {
		 = base.UniqueKey(.queue, .Type(), .Payload())
	}
	// Deadline is stored as Unix seconds; Timeout and Retention as whole
	// seconds (any sub-second part is dropped by the int64 conversion).
	 := &base.TaskMessage{
		ID:        .taskID,
		Type:      .Type(),
		Payload:   .Payload(),
		Queue:     .queue,
		Retry:     .retry,
		Deadline:  .Unix(),
		Timeout:   int64(.Seconds()),
		UniqueKey: ,
		GroupKey:  .group,
		Retention: int64(.retention.Seconds()),
	}
	 := time.Now()
	var  base.TaskState
	if .processAt.After() {
		 = .schedule(, , .processAt, .uniqueTTL)
		 = base.TaskStateScheduled
	} else if .group != "" {
		// Use zero value for processAt since we don't know when the task will be aggregated and processed.
		.processAt = time.Time{}
		 = .addToGroup(, , .group, .uniqueTTL)
		 = base.TaskStateAggregating
	} else {
		.processAt = 
		 = .enqueue(, , .uniqueTTL)
		 = base.TaskStatePending
	}
	// Translate lower-level sentinel errors into this package's exported ones.
	switch {
	case errors.Is(, errors.ErrDuplicateTask):
		return nil, fmt.Errorf("%w", ErrDuplicateTask)
	case errors.Is(, errors.ErrTaskIdConflict):
		return nil, fmt.Errorf("%w", ErrTaskIDConflict)
	case  != nil:
		return nil, 
	}
	return newTaskInfo(, , .processAt, nil), nil
}

// NOTE(review): the receiver, method name and parameter identifiers appear to
// be missing from the code below; restore them before building. Judging by
// the matching call in the enqueue path, this is presumably the enqueue helper.

// Hands a task message to the broker for immediate processing.
// When the time.Duration parameter is positive the broker's EnqueueUnique is
// used; otherwise the broker's Enqueue is used. The broker's error is returned
// unchanged.
func ( *Client) ( context.Context,  *base.TaskMessage,  time.Duration) error {
	if  > 0 {
		return .broker.EnqueueUnique(, , )
	}
	return .broker.Enqueue(, )
}

// NOTE(review): the receiver, method name and parameter identifiers appear to
// be missing from the code below; restore them before building. Judging by
// the matching call in the enqueue path, this is presumably the schedule helper.

// Hands a task message to the broker to be processed at a later time.
// When the time.Duration parameter is positive the broker's ScheduleUnique is
// used, with a TTL recomputed relative to time.Now(); otherwise the broker's
// Schedule is used. The broker's error is returned unchanged.
func ( *Client) ( context.Context,  *base.TaskMessage,  time.Time,  time.Duration) error {
	if  > 0 {
		// presumably the process-at time plus the uniqueness TTL, measured from
		// now, so the lock also covers the wait before processing — TODO confirm
		// once the identifiers are restored.
		 := .Add().Sub(time.Now())
		return .broker.ScheduleUnique(, , , )
	}
	return .broker.Schedule(, , )
}

// NOTE(review): the receiver, method name and parameter identifiers appear to
// be missing from the code below; restore them before building. Judging by
// the matching call in the enqueue path, this is presumably the addToGroup helper.

// Hands a task message to the broker to be added to a group.
// When the time.Duration parameter is positive the broker's AddToGroupUnique
// is used; otherwise the broker's AddToGroup is used. The broker's error is
// returned unchanged.
func ( *Client) ( context.Context,  *base.TaskMessage,  string,  time.Duration) error {
	if  > 0 {
		return .broker.AddToGroupUnique(, , , )
	}
	return .broker.AddToGroup(, , )
}