// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.

package asynq

import (
	"context"
	"fmt"
	"math"
	"math/rand"
	"runtime"
	"runtime/debug"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/hibiken/asynq/internal/base"
	asynqcontext "github.com/hibiken/asynq/internal/context"
	"github.com/hibiken/asynq/internal/errors"
	"github.com/hibiken/asynq/internal/log"
	"github.com/hibiken/asynq/internal/timeutil"
	"golang.org/x/time/rate"
)

type processor struct {
	logger *log.Logger
	broker base.Broker
	clock  timeutil.Clock

	handler   Handler
	baseCtxFn func() context.Context

	queueConfig map[string]int

	// orderedQueues is set only in strict-priority mode.
	orderedQueues []string

	retryDelayFunc RetryDelayFunc
	isFailureFunc  func(error) bool

	errHandler ErrorHandler

	shutdownTimeout time.Duration

	// channel via which to send sync requests to syncer.
	syncRequestCh chan<- *syncRequest

	// rate limiter to prevent spamming logs with a bunch of errors.
	errLogLimiter *rate.Limiter

	// sema is a counting semaphore to ensure the number of active workers
	// does not exceed the limit.
	sema chan struct{}

	// done channel is used to communicate back to the long-running "processor" goroutine;
	// once ensures a value is sent to the channel only once.
	done chan struct{}
	once sync.Once

	// quit channel is closed when the shutdown of the "processor" goroutine starts.
	quit chan struct{}

	// abort channel communicates to the in-flight worker goroutines to stop.
	abort chan struct{}

	// cancelations is a set of cancel functions for all active tasks.
	cancelations *base.Cancelations

	starting chan<- *workerInfo
	finished chan<- *base.TaskMessage
}

type processorParams struct {
	logger          *log.Logger
	broker          base.Broker
	baseCtxFn       func() context.Context
	retryDelayFunc  RetryDelayFunc
	isFailureFunc   func(error) bool
	syncCh          chan<- *syncRequest
	cancelations    *base.Cancelations
	concurrency     int
	queues          map[string]int
	strictPriority  bool
	errHandler      ErrorHandler
	shutdownTimeout time.Duration
	starting        chan<- *workerInfo
	finished        chan<- *base.TaskMessage
}

// newProcessor constructs a new processor.
func newProcessor(params processorParams) *processor {
	queues := normalizeQueues(params.queues)
	orderedQueues := []string(nil)
	if params.strictPriority {
		orderedQueues = sortByPriority(queues)
	}
	return &processor{
		logger:          params.logger,
		broker:          params.broker,
		baseCtxFn:       params.baseCtxFn,
		clock:           timeutil.NewRealClock(),
		queueConfig:     queues,
		orderedQueues:   orderedQueues,
		retryDelayFunc:  params.retryDelayFunc,
		isFailureFunc:   params.isFailureFunc,
		syncRequestCh:   params.syncCh,
		cancelations:    params.cancelations,
		errLogLimiter:   rate.NewLimiter(rate.Every(3*time.Second), 1),
		sema:            make(chan struct{}, params.concurrency),
		done:            make(chan struct{}),
		quit:            make(chan struct{}),
		abort:           make(chan struct{}),
		errHandler:      params.errHandler,
		handler:         HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }),
		shutdownTimeout: params.shutdownTimeout,
		starting:        params.starting,
		finished:        params.finished,
	}
}

// stop signals the "processor" goroutine to stop processing tasks from the queues.
// Note: this stops only the "processor" goroutine; it does not stop worker goroutines.
// It's safe to call this method multiple times.
func (p *processor) stop() {
	p.once.Do(func() {
		p.logger.Debug("Processor shutting down...")
		// Unblock if processor is waiting for sema token.
		close(p.quit)
		// Signal the processor goroutine to stop processing tasks
		// from the queue.
		p.done <- struct{}{}
	})
}

// shutdown stops the processor goroutine and waits for all workers to finish,
// aborting any that are still running after shutdownTimeout.
// NOTE: once shut down, the processor cannot be re-started.
func (p *processor) shutdown() {
	p.stop()

	time.AfterFunc(p.shutdownTimeout, func() { close(p.abort) })

	p.logger.Info("Waiting for all workers to finish...")
	// block until all workers have released the token
	for i := 0; i < cap(p.sema); i++ {
		p.sema <- struct{}{}
	}
	p.logger.Info("All workers have finished")
}

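// start starts the "processor" goroutine, which repeatedly calls exec until a
// value is received on the done channel. The goroutine is registered with the
// given WaitGroup.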
func (p *processor) start(wg *sync.WaitGroup) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-p.done:
				p.logger.Debug("Processor done")
				return
			default:
				p.exec()
			}
		}
	}()
}

// exec pulls a task out of the queue and starts a worker goroutine to
// process the task.
func (p *processor) exec() {
	select {
	case <-p.quit:
		return
	case p.sema <- struct{}{}: // acquire token
		qnames := p.queues()
		msg, leaseExpirationTime, err := p.broker.Dequeue(qnames...)
		switch {
		case errors.Is(err, errors.ErrNoProcessableTask):
			p.logger.Debug("All queues are empty")
			// Queues are empty; this is normal behavior.
			// Sleep to avoid slamming redis and let scheduler move tasks into queues.
			// Note: We are not using a blocking pop operation but polling the queues instead.
			// Without this sleep, polling would add significant load to redis.
			time.Sleep(time.Second)
			<-p.sema // release token
			return
		case err != nil:
			if p.errLogLimiter.Allow() {
				p.logger.Errorf("Dequeue error: %v", err)
			}
			<-p.sema // release token
			return
		}

		lease := base.NewLease(leaseExpirationTime)
		deadline := p.computeDeadline(msg)
		p.starting <- &workerInfo{msg, time.Now(), deadline, lease}
		go func() {
			defer func() {
				p.finished <- msg
				<-p.sema // release token
			}()

			ctx, cancel := asynqcontext.New(p.baseCtxFn(), msg, deadline)
			p.cancelations.Add(msg.ID, cancel)
			defer func() {
				cancel()
				p.cancelations.Delete(msg.ID)
			}()

			// check context before starting a worker goroutine.
			select {
			case <-ctx.Done():
				// already canceled (e.g. deadline exceeded).
				p.handleFailedMessage(ctx, lease, msg, ctx.Err())
				return
			default:
			}

			resCh := make(chan error, 1)
			go func() {
				task := newTask(
					msg.Type,
					msg.Payload,
					&ResultWriter{
						id:     msg.ID,
						qname:  msg.Queue,
						broker: p.broker,
						ctx:    ctx,
					},
				)
				resCh <- p.perform(ctx, task)
			}()

			select {
			case <-p.abort:
				// time is up, push the message back to queue and quit this worker goroutine.
				p.logger.Warnf("Quitting worker. task id=%s", msg.ID)
				p.requeue(lease, msg)
				return
			case <-lease.Done():
				cancel()
				p.handleFailedMessage(ctx, lease, msg, ErrLeaseExpired)
				return
			case <-ctx.Done():
				p.handleFailedMessage(ctx, lease, msg, ctx.Err())
				return
			case resErr := <-resCh:
				if resErr != nil {
					p.handleFailedMessage(ctx, lease, msg, resErr)
					return
				}
				p.handleSucceededMessage(lease, msg)
			}
		}()
	}
}

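// requeue pushes the dequeued task message back to its queue so that it can be
// picked up again later, e.g. when the worker is aborted during shutdown.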
func (p *processor) requeue(l *base.Lease, msg *base.TaskMessage) {
	if !l.IsValid() {
		// If lease is not valid, do not write to redis; let recoverer take care of it.
		return
	}
	ctx, _ := context.WithDeadline(context.Background(), l.Deadline())
	err := p.broker.Requeue(ctx, msg)
	if err != nil {
		p.logger.Errorf("Could not push task id=%s back to queue: %v", msg.ID, err)
	} else {
		p.logger.Infof("Pushed task id=%s back to queue", msg.ID)
	}
}

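// handleSucceededMessage marks the successfully processed task as complete if a
// retention period is set for it; otherwise it removes the task from the queue.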
func (p *processor) handleSucceededMessage(l *base.Lease, msg *base.TaskMessage) {
	if msg.Retention > 0 {
		p.markAsComplete(l, msg)
	} else {
		p.markAsDone(l, msg)
	}
}

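// markAsComplete moves the task from the active set to the completed set.
// If the write to redis fails, the operation is handed off to the syncer to be retried.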
func (p *processor) markAsComplete(l *base.Lease, msg *base.TaskMessage) {
	if !l.IsValid() {
		// If lease is not valid, do not write to redis; let recoverer take care of it.
		return
	}
	ctx, _ := context.WithDeadline(context.Background(), l.Deadline())
	err := p.broker.MarkAsComplete(ctx, msg)
	if err != nil {
		errMsg := fmt.Sprintf("Could not move task id=%s type=%q from %q to %q: %+v",
			msg.ID, msg.Type, base.ActiveKey(msg.Queue), base.CompletedKey(msg.Queue), err)
		p.logger.Warnf("%s; Will retry syncing", errMsg)
		p.syncRequestCh <- &syncRequest{
			fn: func() error {
				return p.broker.MarkAsComplete(ctx, msg)
			},
			errMsg:   errMsg,
			deadline: l.Deadline(),
		}
	}
}

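// markAsDone removes the task from the active set.
// If the write to redis fails, the operation is handed off to the syncer to be retried.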
func (p *processor) markAsDone(l *base.Lease, msg *base.TaskMessage) {
	if !l.IsValid() {
		// If lease is not valid, do not write to redis; let recoverer take care of it.
		return
	}
	ctx, _ := context.WithDeadline(context.Background(), l.Deadline())
	err := p.broker.Done(ctx, msg)
	if err != nil {
		errMsg := fmt.Sprintf("Could not remove task id=%s type=%q from %q err: %+v", msg.ID, msg.Type, base.ActiveKey(msg.Queue), err)
		p.logger.Warnf("%s; Will retry syncing", errMsg)
		p.syncRequestCh <- &syncRequest{
			fn: func() error {
				return p.broker.Done(ctx, msg)
			},
			errMsg:   errMsg,
			deadline: l.Deadline(),
		}
	}
}

// SkipRetry is used as a return value from Handler.ProcessTask to indicate that
// the task should not be retried and should be archived instead.
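// For example, a handler can wrap SkipRetry to archive a task on a
// non-retryable error (the handler and validate function below are illustrative):
//
//	func handle(ctx context.Context, t *asynq.Task) error {
//		if err := validate(t.Payload()); err != nil {
//			return fmt.Errorf("%v: %w", err, asynq.SkipRetry)
//		}
//		// ...
//		return nil
//	}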
var SkipRetry = errors.New("skip retry for the task")

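// handleFailedMessage either retries or archives the task, depending on whether
// the error counts as a failure (isFailureFunc), how many retries remain, and
// whether the error wraps SkipRetry. The registered ErrorHandler, if any, is
// notified first.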
func (p *processor) handleFailedMessage(ctx context.Context, l *base.Lease, msg *base.TaskMessage, err error) {
	if p.errHandler != nil {
		p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err)
	}
	if !p.isFailureFunc(err) {
		// retry the task without marking it as failed
		p.retry(l, msg, err, false /*isFailure*/)
		return
	}
	if msg.Retried >= msg.Retry || errors.Is(err, SkipRetry) {
		p.logger.Warnf("Retry exhausted for task id=%s", msg.ID)
		p.archive(l, msg, err)
	} else {
		p.retry(l, msg, err, true /*isFailure*/)
	}
}

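// retry moves the task to the retry state with a retry time computed by retryDelayFunc.
// If the write to redis fails, the operation is handed off to the syncer.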
func (p *processor) retry(l *base.Lease, msg *base.TaskMessage, e error, isFailure bool) {
	if !l.IsValid() {
		// If lease is not valid, do not write to redis; let recoverer take care of it.
		return
	}
	ctx, _ := context.WithDeadline(context.Background(), l.Deadline())
	d := p.retryDelayFunc(msg.Retried, e, NewTask(msg.Type, msg.Payload))
	retryAt := time.Now().Add(d)
	err := p.broker.Retry(ctx, msg, retryAt, e.Error(), isFailure)
	if err != nil {
		errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.ActiveKey(msg.Queue), base.RetryKey(msg.Queue))
		p.logger.Warnf("%s; Will retry syncing", errMsg)
		p.syncRequestCh <- &syncRequest{
			fn: func() error {
				return p.broker.Retry(ctx, msg, retryAt, e.Error(), isFailure)
			},
			errMsg:   errMsg,
			deadline: l.Deadline(),
		}
	}
}

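// archive moves the task to the archived state so it is not retried again.
// If the write to redis fails, the operation is handed off to the syncer.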
func (p *processor) archive(l *base.Lease, msg *base.TaskMessage, e error) {
	if !l.IsValid() {
		// If lease is not valid, do not write to redis; let recoverer take care of it.
		return
	}
	ctx, _ := context.WithDeadline(context.Background(), l.Deadline())
	err := p.broker.Archive(ctx, msg, e.Error())
	if err != nil {
		errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.ActiveKey(msg.Queue), base.ArchivedKey(msg.Queue))
		p.logger.Warnf("%s; Will retry syncing", errMsg)
		p.syncRequestCh <- &syncRequest{
			fn: func() error {
				return p.broker.Archive(ctx, msg, e.Error())
			},
			errMsg:   errMsg,
			deadline: l.Deadline(),
		}
	}
}

// queues returns a list of queues to query.
// Order of the queue names is based on the priority of each queue.
// Queue names are sorted by their priority level if strict-priority is true.
// If strict-priority is false, the order of queue names is roughly based on
// the priority level but randomized in order to avoid starving low priority queues.
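//
// For example, given queueConfig {"critical": 3, "default": 1}, the returned
// slice contains both names, with "critical" appearing first in roughly three
// out of four calls.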
func (p *processor) queues() []string {
	// skip the overhead of generating a list of queue names
	// if we are processing one queue.
	if len(p.queueConfig) == 1 {
		for qname := range p.queueConfig {
			return []string{qname}
		}
	}
	if p.orderedQueues != nil {
		return p.orderedQueues
	}
	var names []string
	for qname, priority := range p.queueConfig {
		for i := 0; i < priority; i++ {
			names = append(names, qname)
		}
	}
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	r.Shuffle(len(names), func(i, j int) { names[i], names[j] = names[j], names[i] })
	return uniq(names, len(p.queueConfig))
}

// perform calls the handler with the given task.
// If the call returns without panic, it simply returns the value,
// otherwise, it recovers from panic and returns an error.
func (p *processor) perform(ctx context.Context, task *Task) (err error) {
	defer func() {
		if x := recover(); x != nil {
			p.logger.Errorf("recovering from panic. See the stack trace below for details:\n%s", string(debug.Stack()))
			_, file, line, ok := runtime.Caller(1) // skip the first frame (panic itself)
			if ok && strings.Contains(file, "runtime/") {
				// The panic came from the runtime, most likely due to incorrect
				// map/slice usage. The parent frame should have the real trigger.
				_, file, line, ok = runtime.Caller(2)
			}

			// Include the file and line number info in the error, if runtime.Caller returned ok.
			if ok {
				err = fmt.Errorf("panic [%s:%d]: %v", file, line, x)
			} else {
				err = fmt.Errorf("panic: %v", x)
			}
		}
	}()
	return p.handler.ProcessTask(ctx, task)
}

// uniq dedupes elements and returns a slice of at most l unique names.
// Order of the output slice is based on the input list.
func uniq(names []string, l int) []string {
	var res []string
	seen := make(map[string]struct{})
	for _, s := range names {
		if _, ok := seen[s]; !ok {
			seen[s] = struct{}{}
			res = append(res, s)
		}
		if len(res) == l {
			break
		}
	}
	return res
}

// sortByPriority returns a list of queue names sorted by
// their priority level in descending order.
func sortByPriority(qcfg map[string]int) []string {
	var queues []*queue
	for qname, n := range qcfg {
		queues = append(queues, &queue{qname, n})
	}
	sort.Sort(sort.Reverse(byPriority(queues)))
	var res []string
	for _, q := range queues {
		res = append(res, q.name)
	}
	return res
}

type queue struct {
	name     string
	priority int
}

type byPriority []*queue

func (x byPriority) Len() int           { return len(x) }
func (x byPriority) Less(i, j int) bool { return x[i].priority < x[j].priority }
func (x byPriority) Swap(i, j int)      { x[i], x[j] = x[j], x[i] }

// normalizeQueues divides priority numbers by their greatest common divisor.
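// For example, {"critical": 6, "default": 4, "low": 2} is normalized to
// {"critical": 3, "default": 2, "low": 1}.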
func normalizeQueues(queues map[string]int) map[string]int {
	var xs []int
	for _, x := range queues {
		xs = append(xs, x)
	}
	d := gcd(xs...)
	res := make(map[string]int)
	for q, x := range queues {
		res[q] = x / d
	}
	return res
}

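// gcd returns the greatest common divisor of the given integers
// (using the Euclidean algorithm), e.g. gcd(6, 4, 2) == 2.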
func gcd(xs ...int) int {
	fn := func(x, y int) int {
		for y > 0 {
			x, y = y, x%y
		}
		return x
	}
	res := xs[0]
	for i := 0; i < len(xs); i++ {
		res = fn(xs[i], res)
		if res == 1 {
			return 1
		}
	}
	return res
}

// computeDeadline returns the time by which the given task must complete.
// It is derived from the task's Timeout and Deadline fields: if both are set,
// the earlier of the two is used; if neither is set, defaultTimeout is applied.
func (p *processor) computeDeadline(msg *base.TaskMessage) time.Time {
	if msg.Timeout == 0 && msg.Deadline == 0 {
		p.logger.Errorf("asynq: internal error: both timeout and deadline are not set for the task message: %s", msg.ID)
		return p.clock.Now().Add(defaultTimeout)
	}
	if msg.Timeout != 0 && msg.Deadline != 0 {
		deadlineUnix := math.Min(float64(p.clock.Now().Unix()+msg.Timeout), float64(msg.Deadline))
		return time.Unix(int64(deadlineUnix), 0)
	}
	if msg.Timeout != 0 {
		return p.clock.Now().Add(time.Duration(msg.Timeout) * time.Second)
	}
	return time.Unix(msg.Deadline, 0)
}