// Copyright 2022 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.

package asynq

import (
	
	
	
	
	
	
)

// PeriodicTaskManager manages scheduling of periodic tasks.
// It syncs scheduler's entries by calling the config provider periodically.
type PeriodicTaskManager struct {
	s            *Scheduler
	p            PeriodicTaskConfigProvider
	syncInterval time.Duration
	done         chan (struct{})
	wg           sync.WaitGroup
	m            map[string]string // map[hash]entryID
}

type PeriodicTaskManagerOpts struct {
	// Required: must be non nil
	PeriodicTaskConfigProvider PeriodicTaskConfigProvider

	// Required: must be non nil
	RedisConnOpt RedisConnOpt

	// Optional: scheduler options
	*SchedulerOpts

	// Optional: default is 3m
	SyncInterval time.Duration
}

const defaultSyncInterval = 3 * time.Minute

// NewPeriodicTaskManager returns a new PeriodicTaskManager instance.
// The given opts should specify the RedisConnOp and PeriodicTaskConfigProvider at minimum.
func ( PeriodicTaskManagerOpts) (*PeriodicTaskManager, error) {
	if .PeriodicTaskConfigProvider == nil {
		return nil, fmt.Errorf("PeriodicTaskConfigProvider cannot be nil")
	}
	if .RedisConnOpt == nil {
		return nil, fmt.Errorf("RedisConnOpt cannot be nil")
	}
	 := NewScheduler(.RedisConnOpt, .SchedulerOpts)
	 := .SyncInterval
	if  == 0 {
		 = defaultSyncInterval
	}
	return &PeriodicTaskManager{
		s:            ,
		p:            .PeriodicTaskConfigProvider,
		syncInterval: ,
		done:         make(chan struct{}),
		m:            make(map[string]string),
	}, nil
}

// PeriodicTaskConfigProvider provides configs for periodic tasks.
// GetConfigs will be called by a PeriodicTaskManager periodically to
// sync the scheduler's entries with the configs returned by the provider.
type PeriodicTaskConfigProvider interface {
	GetConfigs() ([]*PeriodicTaskConfig, error)
}

// PeriodicTaskConfig specifies the details of a periodic task.
type PeriodicTaskConfig struct {
	Cronspec string   // required: must be non empty string
	Task     *Task    // required: must be non nil
	Opts     []Option // optional: can be nil
}

func ( *PeriodicTaskConfig) () string {
	 := sha256.New()
	io.WriteString(, .Cronspec)
	io.WriteString(, .Task.Type())
	.Write(.Task.Payload())
	 := stringifyOptions(.Opts)
	sort.Strings()
	for ,  := range  {
		io.WriteString(, )
	}
	return fmt.Sprintf("%x", .Sum(nil))
}

func validatePeriodicTaskConfig( *PeriodicTaskConfig) error {
	if  == nil {
		return fmt.Errorf("PeriodicTaskConfig cannot be nil")
	}
	if .Task == nil {
		return fmt.Errorf("PeriodicTaskConfig.Task cannot be nil")
	}
	if .Cronspec == "" {
		return fmt.Errorf("PeriodicTaskConfig.Cronspec cannot be empty")
	}
	return nil
}

// Start starts a scheduler and background goroutine to sync the scheduler with the configs
// returned by the provider.
//
// Start returns any error encountered at start up time.
func ( *PeriodicTaskManager) () error {
	if .s == nil || .p == nil {
		panic("asynq: cannot start uninitialized PeriodicTaskManager; use NewPeriodicTaskManager to initialize")
	}
	if  := .initialSync();  != nil {
		return fmt.Errorf("asynq: %v", )
	}
	if  := .s.Start();  != nil {
		return fmt.Errorf("asynq: %v", )
	}
	.wg.Add(1)
	go func() {
		defer .wg.Done()
		 := time.NewTicker(.syncInterval)
		for {
			select {
			case <-.done:
				.s.logger.Debugf("Stopping syncer goroutine")
				.Stop()
				return
			case <-.C:
				.sync()
			}
		}
	}()
	return nil
}

// Shutdown gracefully shuts down the manager.
// It notifies a background syncer goroutine to stop and stops scheduler.
func ( *PeriodicTaskManager) () {
	close(.done)
	.wg.Wait()
	.s.Shutdown()
}

// Run starts the manager and blocks until an os signal to exit the program is received.
// Once it receives a signal, it gracefully shuts down the manager.
func ( *PeriodicTaskManager) () error {
	if  := .Start();  != nil {
		return 
	}
	.s.waitForSignals()
	.Shutdown()
	.s.logger.Debugf("PeriodicTaskManager exiting")
	return nil
}

func ( *PeriodicTaskManager) () error {
	,  := .p.GetConfigs()
	if  != nil {
		return fmt.Errorf("initial call to GetConfigs failed: %v", )
	}
	for ,  := range  {
		if  := validatePeriodicTaskConfig();  != nil {
			return fmt.Errorf("initial call to GetConfigs contained an invalid config: %v", )
		}
	}
	.add()
	return nil
}

func ( *PeriodicTaskManager) ( []*PeriodicTaskConfig) {
	for ,  := range  {
		,  := .s.Register(.Cronspec, .Task, .Opts...)
		if  != nil {
			.s.logger.Errorf("Failed to register periodic task: cronspec=%q task=%q",
				.Cronspec, .Task.Type())
			continue
		}
		.m[.hash()] = 
		.s.logger.Infof("Successfully registered periodic task: cronspec=%q task=%q, entryID=%s",
			.Cronspec, .Task.Type(), )
	}
}

func ( *PeriodicTaskManager) ( map[string]string) {
	for ,  := range  {
		if  := .s.Unregister();  != nil {
			.s.logger.Errorf("Failed to unregister periodic task: %v", )
			continue
		}
		delete(.m, )
		.s.logger.Infof("Successfully unregistered periodic task: entryID=%s", )
	}
}

func ( *PeriodicTaskManager) () {
	,  := .p.GetConfigs()
	if  != nil {
		.s.logger.Errorf("Failed to get periodic task configs: %v", )
		return
	}
	for ,  := range  {
		if  := validatePeriodicTaskConfig();  != nil {
			.s.logger.Errorf("Failed to sync: GetConfigs returned an invalid config: %v", )
			return
		}
	}
	// Diff and only register/unregister the newly added/removed entries.
	 := .diffRemoved()
	 := .diffAdded()
	.remove()
	.add()
}

// diffRemoved diffs the incoming configs with the registered config and returns
// a map containing hash and entryID of each config that was removed.
func ( *PeriodicTaskManager) ( []*PeriodicTaskConfig) map[string]string {
	 := make(map[string]string)
	for ,  := range  {
		[.hash()] = "" // empty value since we don't have entryID yet
	}
	 := make(map[string]string)
	for ,  := range .m {
		// test whether existing config is present in the incoming configs
		if ,  := []; ! {
			[] = 
		}
	}
	return 
}

// diffAdded diffs the incoming configs with the registered configs and returns
// a list of configs that were added.
func ( *PeriodicTaskManager) ( []*PeriodicTaskConfig) []*PeriodicTaskConfig {
	var  []*PeriodicTaskConfig
	for ,  := range  {
		if ,  := .m[.hash()]; ! {
			 = append(, )
		}
	}
	return 
}