// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.

package asynq

import (
	
	

	
	
)

// A forwarder is responsible for moving scheduled and retry tasks to pending state
// so that the tasks get processed by the workers.
type forwarder struct {
	logger *log.Logger
	broker base.Broker

	// channel to communicate back to the long running "forwarder" goroutine.
	done chan struct{}

	// list of queue names to check and enqueue.
	queues []string

	// poll interval on average
	avgInterval time.Duration
}

type forwarderParams struct {
	logger   *log.Logger
	broker   base.Broker
	queues   []string
	interval time.Duration
}

func newForwarder( forwarderParams) *forwarder {
	return &forwarder{
		logger:      .logger,
		broker:      .broker,
		done:        make(chan struct{}),
		queues:      .queues,
		avgInterval: .interval,
	}
}

func ( *forwarder) () {
	.logger.Debug("Forwarder shutting down...")
	// Signal the forwarder goroutine to stop polling.
	.done <- struct{}{}
}

// start starts the "forwarder" goroutine.
func ( *forwarder) ( *sync.WaitGroup) {
	.Add(1)
	go func() {
		defer .Done()
		 := time.NewTimer(.avgInterval)
		for {
			select {
			case <-.done:
				.logger.Debug("Forwarder done")
				return
			case <-.C:
				.exec()
				.Reset(.avgInterval)
			}
		}
	}()
}

func ( *forwarder) () {
	if  := .broker.ForwardIfReady(.queues...);  != nil {
		.logger.Errorf("Failed to forward scheduled tasks: %v", )
	}
}