// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.

package asynq

import (
	
	
	
	
	
	
	
	

	
	
)

// Task represents a unit of work to be performed.
//
// All fields are unexported; a Task is built with NewTask (or the internal
// newTask) and read through its accessor methods.
type Task struct {
	// typename indicates the type of task to be performed.
	typename string

	// payload holds data needed to perform the task.
	payload []byte

	// opts holds options for the task.
	opts []Option

	// w is the ResultWriter for the task.
	// It is left nil by NewTask and set only by newTask.
	w *ResultWriter
}

func ( *Task) () string    { return .typename }
func ( *Task) () []byte { return .payload }

// ResultWriter returns a pointer to the ResultWriter associated with the task.
//
// Nil pointer is returned if called on a newly created task (i.e. task created by calling NewTask).
// Only the tasks passed to Handler.ProcessTask have a valid ResultWriter pointer.
func ( *Task) () *ResultWriter { return .w }

// NewTask returns a new Task given a type name and payload data.
// Options can be passed to configure task processing behavior.
func ( string,  []byte,  ...Option) *Task {
	return &Task{
		typename: ,
		payload:  ,
		opts:     ,
	}
}

// newTask creates a task with the given typename, payload and ResultWriter.
func newTask( string,  []byte,  *ResultWriter) *Task {
	return &Task{
		typename: ,
		payload:  ,
		w:        ,
	}
}

// A TaskInfo describes a task and its metadata.
type TaskInfo struct {
	// ID is the identifier of the task.
	ID string

	// Queue is the name of the queue in which the task belongs.
	Queue string

	// Type is the type name of the task.
	Type string

	// Payload is the payload data of the task.
	Payload []byte

	// State indicates the task state.
	State TaskState

	// MaxRetry is the maximum number of times the task can be retried.
	MaxRetry int

	// Retried is the number of times the task has been retried so far.
	Retried int

	// LastErr is the error message from the last failure.
	LastErr string

	// LastFailedAt is the time of the last failure, if any.
	// If the task has no failures, LastFailedAt is zero time (i.e. time.Time{}).
	LastFailedAt time.Time

	// Timeout is the duration the task can be processed by Handler before being retried,
	// zero if not specified.
	Timeout time.Duration

	// Deadline is the deadline for the task, zero value if not specified.
	Deadline time.Time

	// Group is the name of the group in which the task belongs.
	//
	// Tasks in the same queue can be grouped together by Group name and will be aggregated into one task
	// by a Server processing the queue.
	//
	// Empty string (default) indicates the task does not belong to any group, and no aggregation will be applied to the task.
	Group string

	// NextProcessAt is the time the task is scheduled to be processed,
	// zero if not applicable.
	NextProcessAt time.Time

	// IsOrphaned describes whether the task is left in active state with no worker processing it.
	// An orphaned task indicates that the worker has crashed or experienced network failures and was not able to
	// extend its lease on the task.
	//
	// This task will be recovered by running a server against the queue the task is in.
	// This field is only applicable to tasks with TaskStateActive.
	IsOrphaned bool

	// Retention is the duration of the retention period after the task is successfully processed.
	Retention time.Duration

	// CompletedAt is the time when the task was processed successfully.
	// Zero value (i.e. time.Time{}) indicates no value.
	CompletedAt time.Time

	// Result holds the result data associated with the task.
	// Use ResultWriter to write result data from the Handler.
	Result []byte
}

// fromUnixTimeOrZero converts t, a Unix time in seconds, to a time.Time.
//
// If t is non-zero, it returns the time converted from t as Unix time in seconds.
// If t is zero, it returns the zero value of time.Time (not the Unix epoch), so
// that callers can use IsZero to detect "not set".
func fromUnixTimeOrZero(t int64) time.Time {
	if t == 0 {
		return time.Time{}
	}
	return time.Unix(t, 0)
}

func newTaskInfo( *base.TaskMessage,  base.TaskState,  time.Time,  []byte) *TaskInfo {
	 := TaskInfo{
		ID:            .ID,
		Queue:         .Queue,
		Type:          .Type,
		Payload:       .Payload, // Do we need to make a copy?
		MaxRetry:      .Retry,
		Retried:       .Retried,
		LastErr:       .ErrorMsg,
		Group:         .GroupKey,
		Timeout:       time.Duration(.Timeout) * time.Second,
		Deadline:      fromUnixTimeOrZero(.Deadline),
		Retention:     time.Duration(.Retention) * time.Second,
		NextProcessAt: ,
		LastFailedAt:  fromUnixTimeOrZero(.LastFailedAt),
		CompletedAt:   fromUnixTimeOrZero(.CompletedAt),
		Result:        ,
	}

	switch  {
	case base.TaskStateActive:
		.State = TaskStateActive
	case base.TaskStatePending:
		.State = TaskStatePending
	case base.TaskStateScheduled:
		.State = TaskStateScheduled
	case base.TaskStateRetry:
		.State = TaskStateRetry
	case base.TaskStateArchived:
		.State = TaskStateArchived
	case base.TaskStateCompleted:
		.State = TaskStateCompleted
	case base.TaskStateAggregating:
		.State = TaskStateAggregating
	default:
		panic(fmt.Sprintf("internal error: unknown state: %d", ))
	}
	return &
}

// TaskState denotes the state of a task.
//
// Valid values start at 1, so the zero value of TaskState is not a valid state.
type TaskState int

const (
	// TaskStateActive indicates that the task is currently being processed by Handler.
	TaskStateActive TaskState = iota + 1

	// TaskStatePending indicates that the task is ready to be processed by Handler.
	TaskStatePending

	// TaskStateScheduled indicates that the task is scheduled to be processed some time in the future.
	TaskStateScheduled

	// TaskStateRetry indicates that the task has previously failed and is scheduled to be processed some time in the future.
	TaskStateRetry

	// TaskStateArchived indicates that the task is archived and stored for inspection purposes.
	TaskStateArchived

	// TaskStateCompleted indicates that the task is processed successfully and retained until the retention TTL expires.
	TaskStateCompleted

	// TaskStateAggregating indicates that the task is waiting in a group to be aggregated into one task.
	TaskStateAggregating
)

// String returns the lowercase name of the state (e.g. "active", "pending").
// It panics if s is not one of the declared TaskState constants.
func (s TaskState) String() string {
	switch s {
	case TaskStateActive:
		return "active"
	case TaskStatePending:
		return "pending"
	case TaskStateScheduled:
		return "scheduled"
	case TaskStateRetry:
		return "retry"
	case TaskStateArchived:
		return "archived"
	case TaskStateCompleted:
		return "completed"
	case TaskStateAggregating:
		return "aggregating"
	}
	panic("asynq: unknown task state")
}

// RedisConnOpt is a discriminated union of types that represent Redis connection configuration options.
//
// RedisConnOpt represents a sum of the following types:
//
//   - RedisClientOpt
//   - RedisFailoverClientOpt
//   - RedisClusterClientOpt
type RedisConnOpt interface {
	// MakeRedisClient returns a new redis client instance.
	// The return value is intentionally opaque to hide the implementation detail of the redis client.
	MakeRedisClient() interface{}
}

// RedisClientOpt is used to create a redis client that connects
// to a redis server directly.
type RedisClientOpt struct {
	// Network type to use, either tcp or unix.
	// Default is tcp.
	Network string

	// Redis server address in "host:port" format.
	Addr string

	// Username to authenticate the current connection when Redis ACLs are used.
	// See: https://redis.io/commands/auth.
	Username string

	// Password to authenticate the current connection.
	// See: https://redis.io/commands/auth.
	Password string

	// Redis DB to select after connecting to a server.
	// See: https://redis.io/commands/select.
	DB int

	// Dial timeout for establishing new connections.
	// Default is 5 seconds.
	DialTimeout time.Duration

	// Timeout for socket reads.
	// If timeout is reached, read commands will fail with a timeout error
	// instead of blocking.
	//
	// Use value -1 for no timeout and 0 for default.
	// Default is 3 seconds.
	ReadTimeout time.Duration

	// Timeout for socket writes.
	// If timeout is reached, write commands will fail with a timeout error
	// instead of blocking.
	//
	// Use value -1 for no timeout and 0 for default.
	// Default is ReadTimeout.
	WriteTimeout time.Duration

	// Maximum number of socket connections.
	// Default is 10 connections per every CPU as reported by runtime.NumCPU.
	PoolSize int

	// TLS Config used to connect to a server.
	// TLS will be negotiated only if this field is set.
	TLSConfig *tls.Config
}

func ( RedisClientOpt) () interface{} {
	return redis.NewClient(&redis.Options{
		Network:      .Network,
		Addr:         .Addr,
		Username:     .Username,
		Password:     .Password,
		DB:           .DB,
		DialTimeout:  .DialTimeout,
		ReadTimeout:  .ReadTimeout,
		WriteTimeout: .WriteTimeout,
		PoolSize:     .PoolSize,
		TLSConfig:    .TLSConfig,
	})
}

// RedisFailoverClientOpt is used to create a redis client that talks
// to redis sentinels for service discovery and has an automatic failover
// capability.
type RedisFailoverClientOpt struct {
	// Redis master name that is monitored by sentinels.
	MasterName string

	// Addresses of sentinels in "host:port" format.
	// Use at least three sentinels to avoid problems described in
	// https://redis.io/topics/sentinel.
	SentinelAddrs []string

	// Redis sentinel password.
	SentinelPassword string

	// Username to authenticate the current connection when Redis ACLs are used.
	// See: https://redis.io/commands/auth.
	Username string

	// Password to authenticate the current connection.
	// See: https://redis.io/commands/auth.
	Password string

	// Redis DB to select after connecting to a server.
	// See: https://redis.io/commands/select.
	DB int

	// Dial timeout for establishing new connections.
	// Default is 5 seconds.
	DialTimeout time.Duration

	// Timeout for socket reads.
	// If timeout is reached, read commands will fail with a timeout error
	// instead of blocking.
	//
	// Use value -1 for no timeout and 0 for default.
	// Default is 3 seconds.
	ReadTimeout time.Duration

	// Timeout for socket writes.
	// If timeout is reached, write commands will fail with a timeout error
	// instead of blocking.
	//
	// Use value -1 for no timeout and 0 for default.
	// Default is ReadTimeout.
	WriteTimeout time.Duration

	// Maximum number of socket connections.
	// Default is 10 connections per every CPU as reported by runtime.NumCPU.
	PoolSize int

	// TLS Config used to connect to a server.
	// TLS will be negotiated only if this field is set.
	TLSConfig *tls.Config
}

func ( RedisFailoverClientOpt) () interface{} {
	return redis.NewFailoverClient(&redis.FailoverOptions{
		MasterName:       .MasterName,
		SentinelAddrs:    .SentinelAddrs,
		SentinelPassword: .SentinelPassword,
		Username:         .Username,
		Password:         .Password,
		DB:               .DB,
		DialTimeout:      .DialTimeout,
		ReadTimeout:      .ReadTimeout,
		WriteTimeout:     .WriteTimeout,
		PoolSize:         .PoolSize,
		TLSConfig:        .TLSConfig,
	})
}

// RedisClusterClientOpt is used to create a redis client that connects to
// a redis cluster.
type RedisClusterClientOpt struct {
	// A seed list of host:port addresses of cluster nodes.
	Addrs []string

	// The maximum number of retries before giving up.
	// Command is retried on network errors and MOVED/ASK redirects.
	// Default is 8 retries.
	MaxRedirects int

	// Username to authenticate the current connection when Redis ACLs are used.
	// See: https://redis.io/commands/auth.
	Username string

	// Password to authenticate the current connection.
	// See: https://redis.io/commands/auth.
	Password string

	// Dial timeout for establishing new connections.
	// Default is 5 seconds.
	DialTimeout time.Duration

	// Timeout for socket reads.
	// If timeout is reached, read commands will fail with a timeout error
	// instead of blocking.
	//
	// Use value -1 for no timeout and 0 for default.
	// Default is 3 seconds.
	ReadTimeout time.Duration

	// Timeout for socket writes.
	// If timeout is reached, write commands will fail with a timeout error
	// instead of blocking.
	//
	// Use value -1 for no timeout and 0 for default.
	// Default is ReadTimeout.
	WriteTimeout time.Duration

	// TLS Config used to connect to a server.
	// TLS will be negotiated only if this field is set.
	TLSConfig *tls.Config
}

func ( RedisClusterClientOpt) () interface{} {
	return redis.NewClusterClient(&redis.ClusterOptions{
		Addrs:        .Addrs,
		MaxRedirects: .MaxRedirects,
		Username:     .Username,
		Password:     .Password,
		DialTimeout:  .DialTimeout,
		ReadTimeout:  .ReadTimeout,
		WriteTimeout: .WriteTimeout,
		TLSConfig:    .TLSConfig,
	})
}

// ParseRedisURI parses redis uri string and returns RedisConnOpt if uri is valid.
// It returns a non-nil error if uri cannot be parsed.
//
// Three URI schemes are supported, which are redis:, rediss:, redis-socket:, and redis-sentinel:.
// Supported formats are:
//     redis://[:password@]host[:port][/dbnumber]
//     rediss://[:password@]host[:port][/dbnumber]
//     redis-socket://[:password@]path[?db=dbnumber]
//     redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName]
func ( string) (RedisConnOpt, error) {
	,  := url.Parse()
	if  != nil {
		return nil, fmt.Errorf("asynq: could not parse redis uri: %v", )
	}
	switch .Scheme {
	case "redis", "rediss":
		return parseRedisURI()
	case "redis-socket":
		return parseRedisSocketURI()
	case "redis-sentinel":
		return parseRedisSentinelURI()
	default:
		return nil, fmt.Errorf("asynq: unsupported uri scheme: %q", .Scheme)
	}
}

func parseRedisURI( *url.URL) (RedisConnOpt, error) {
	var  int
	var  error
	var  RedisClientOpt

	if len(.Path) > 0 {
		 := strings.Split(strings.Trim(.Path, "/"), "/")
		,  = strconv.Atoi([0])
		if  != nil {
			return nil, fmt.Errorf("asynq: could not parse redis uri: database number should be the first segment of the path")
		}
	}
	var  string
	if ,  := .User.Password();  {
		 = 
	}

	if .Scheme == "rediss" {
		, ,  := net.SplitHostPort(.Host)
		if  != nil {
			 = .Host
		}
		.TLSConfig = &tls.Config{ServerName: }
	}

	.Addr = .Host
	.Password = 
	.DB = 

	return , nil
}

func parseRedisSocketURI( *url.URL) (RedisConnOpt, error) {
	const  = "asynq: could not parse redis socket uri"
	if len(.Path) == 0 {
		return nil, fmt.Errorf("%s: path does not exist", )
	}
	 := .Query()
	var  int
	var  error
	if  := .Get("db");  != "" {
		,  = strconv.Atoi()
		if  != nil {
			return nil, fmt.Errorf("%s: query param `db` should be a number", )
		}
	}
	var  string
	if ,  := .User.Password();  {
		 = 
	}
	return RedisClientOpt{Network: "unix", Addr: .Path, DB: , Password: }, nil
}

func parseRedisSentinelURI( *url.URL) (RedisConnOpt, error) {
	 := strings.Split(.Host, ",")
	 := .Query().Get("master")
	var  string
	if ,  := .User.Password();  {
		 = 
	}
	return RedisFailoverClientOpt{MasterName: , SentinelAddrs: , SentinelPassword: }, nil
}

// ResultWriter is a client interface to write result data for a task.
// It writes the data to the redis instance the server is connected to.
//
// A ResultWriter is bound to a single task, identified by its queue name and task ID.
type ResultWriter struct {
	id     string // task ID this writer is responsible for
	qname  string // queue name the task belongs to
	broker base.Broker
	ctx    context.Context // context associated with the task
}

// Write writes the given data as a result of the task the ResultWriter is associated with.
func ( *ResultWriter) ( []byte) ( int,  error) {
	select {
	case <-.ctx.Done():
		return 0, fmt.Errorf("failed to result task result: %v", .ctx.Err())
	default:
	}
	return .broker.WriteResult(.qname, .id, )
}

// TaskID returns the ID of the task the ResultWriter is associated with.
func ( *ResultWriter) () string {
	return .id
}