// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.

package asynq

import (
	
	
	

	
	
	
)

// recoverer holds the state of a background poller that, on every pass,
// handles tasks whose lease has expired and asks the broker to reclaim stale
// aggregation sets for the configured queues.
type recoverer struct {
	// logger receives the debug and warning messages emitted by the poller.
	logger         *log.Logger
	// broker is the backend used to list lease-expired tasks, to retry or
	// archive them, and to reclaim stale aggregation sets.
	broker         base.Broker
	// retryDelayFunc computes how long to wait before a lease-expired task
	// is retried.
	retryDelayFunc RetryDelayFunc
	// isFailureFunc is evaluated when a task is retried and its result is
	// handed to the broker's Retry call.
	isFailureFunc  func(error) bool

	// done is the channel used to tell the long-running polling goroutine
	// to stop.
	done chan struct{}

	// queues lists the queues that are inspected on every pass.
	queues []string

	// interval is the wait between the end of one pass and the start of the
	// next.
	interval time.Duration
}

// recovererParams bundles the arguments accepted by newRecoverer. Each field
// is copied into the recoverer field of the same name.
type recovererParams struct {
	logger         *log.Logger
	broker         base.Broker
	queues         []string
	interval       time.Duration
	retryDelayFunc RetryDelayFunc
	isFailureFunc  func(error) bool
}

// newRecoverer builds a recoverer from the supplied recovererParams. Every
// parameter field is copied across unchanged, and a fresh unbuffered done
// channel is allocated for stopping the polling goroutine.
//
// NOTE(review): the parameter name and the operand of each selector below
// look stripped in this copy of the file — confirm against the upstream source.
func newRecoverer( recovererParams) *recoverer {
	return &recoverer{
		logger:         .logger,
		broker:         .broker,
		done:           make(chan struct{}),
		queues:         .queues,
		interval:       .interval,
		retryDelayFunc: .retryDelayFunc,
		isFailureFunc:  .isFailureFunc,
	}
}

// Stops the recoverer: logs a debug message and then sends a value on the
// done channel. newRecoverer creates done unbuffered, so the send blocks
// until the polling goroutine receives it.
//
// NOTE(review): the receiver and method names look stripped in this copy of
// the file — confirm against the upstream source.
func ( *recoverer) () {
	.logger.Debug("Recoverer shutting down...")
	// Signal the recoverer goroutine to stop polling.
	.done <- struct{}{}
}

// Launches the polling goroutine and registers it on the given WaitGroup.
//
// The WaitGroup counter is incremented before the goroutine is spawned and
// decremented (via defer) when the goroutine returns. The goroutine performs
// one recovery pass immediately, then another each time the timer fires. The
// timer is re-armed only after a pass finishes, so interval measures the gap
// between the end of one pass and the start of the next. A value received on
// the done channel stops the timer and ends the goroutine.
//
// NOTE(review): the receiver, method, parameter and local names look stripped
// in this copy of the file — confirm against the upstream source.
func ( *recoverer) ( *sync.WaitGroup) {
	.Add(1)
	go func() {
		defer .Done()
		// Run one pass right away instead of waiting a full interval.
		.recover()
		 := time.NewTimer(.interval)
		for {
			select {
			case <-.done:
				.logger.Debug("Recoverer done")
				.Stop()
				return
			case <-.C:
				.recover()
				// Re-arm after the pass so passes never overlap.
				.Reset(.interval)
			}
		}
	}()
}

// ErrLeaseExpired error indicates that the task failed because the worker working on the task
// could not extend its lease due to missing heartbeats. The worker may have crashed or got cutoff from the network.
var ErrLeaseExpired = errors.New("asynq: task lease expired")

// Performs a single recovery pass: lease-expired tasks are handled first,
// then stale aggregation sets are reclaimed.
//
// NOTE(review): the receiver and method names look stripped in this copy of
// the file — confirm against the upstream source.
func ( *recoverer) () {
	.recoverLeaseExpiredTasks()
	.recoverStaleAggregationSets()
}

// Asks the broker for the lease-expired tasks in the configured queues and
// either archives or retries each one, recording ErrLeaseExpired as the
// reason. A task is archived when its Retried count has reached its Retry
// limit, and retried otherwise. If the listing fails, a warning is logged and
// the pass is abandoned.
//
// NOTE(review): the receiver, method and local names and several call
// arguments look stripped in this copy of the file — confirm against the
// upstream source.
func ( *recoverer) () {
	// Get all tasks which have expired 30 seconds ago or earlier to accommodate a certain amount of clock skew.
	 := time.Now().Add(-30 * time.Second)
	,  := .broker.ListLeaseExpired(, .queues...)
	if  != nil {
		.logger.Warnf("recoverer: could not list lease expired tasks: %v", )
		return
	}
	for ,  := range  {
		// No retries left: archive rather than reschedule.
		if .Retried >= .Retry {
			.archive(, ErrLeaseExpired)
		} else {
			.retry(, ErrLeaseExpired)
		}
	}
}

// Iterates over the configured queues and calls the broker's
// ReclaimStaleAggregationSets once per iteration. A failure is logged as a
// warning and does not stop the loop.
//
// NOTE(review): the receiver, method and loop-variable names and the call
// arguments look stripped in this copy of the file — presumably the queue
// name is passed to the broker; confirm against the upstream source.
func ( *recoverer) () {
	for ,  := range .queues {
		if  := .broker.ReclaimStaleAggregationSets();  != nil {
			.logger.Warnf("recoverer: could not reclaim stale aggregation sets in queue %q: %v", , )
		}
	}
}

// Reschedules a task through the broker. The delay comes from retryDelayFunc,
// which is given the task's Retried count and a Task built from the message's
// Type and Payload. The retry time is the current time plus that delay. The
// broker's Retry call also receives an error string and the result of
// isFailureFunc. A broker failure is logged as a warning and otherwise ignored.
//
// The call uses context.Background(), so it carries no deadline or
// cancellation.
//
// NOTE(review): the receiver, method, parameter and local names and several
// call arguments look stripped in this copy of the file — confirm against the
// upstream source.
func ( *recoverer) ( *base.TaskMessage,  error) {
	 := .retryDelayFunc(.Retried, , NewTask(.Type, .Payload))
	 := time.Now().Add()
	if  := .broker.Retry(context.Background(), , , .Error(), .isFailureFunc());  != nil {
		.logger.Warnf("recoverer: could not retry lease expired task: %v", )
	}
}

// Moves a task to the archive through the broker, passing an error string
// along with it. A broker failure is logged as a warning and otherwise
// ignored.
//
// The call uses context.Background(), so it carries no deadline or
// cancellation.
//
// NOTE(review): the receiver, method and parameter names and some call
// arguments look stripped in this copy of the file — confirm against the
// upstream source.
func ( *recoverer) ( *base.TaskMessage,  error) {
	if  := .broker.Archive(context.Background(), , .Error());  != nil {
		.logger.Warnf("recoverer: could not move task to archive: %v", )
	}
}