// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.

package asynq

import (
	
	
	
	

	
	
	
	
	
	
)

// A Scheduler kicks off tasks at regular intervals based on the user defined schedule.
//
// Each registered entry is added to an underlying cron instance as an
// enqueueJob, which enqueues the entry's task through the scheduler's Client.
// While started, a heartbeater goroutine writes a snapshot of the registered
// entries to redis every 5 seconds.
//
// Schedulers are safe for concurrent use by multiple goroutines.
type Scheduler struct {
	// id identifies this scheduler instance; it is produced by generateSchedulerID.
	id string

	// state holds the lifecycle state (new, active or closed), guarded by state.mu.
	state *serverState

	// done is closed on shutdown to stop the heartbeater goroutine, which wg
	// tracks. location is the time zone given to cron and used to timestamp
	// enqueue events. preEnqueueFunc, postEnqueueFunc and errHandler are
	// copied from SchedulerOpts and may be nil.
	logger          *log.Logger
	client          *Client
	rdb             *rdb.RDB
	cron            *cron.Cron
	location        *time.Location
	done            chan struct{}
	wg              sync.WaitGroup
	preEnqueueFunc  func(task *Task, opts []Option)
	postEnqueueFunc func(info *TaskInfo, err error)
	errHandler      func(task *Task, opts []Option, err error)

	// guards idmap
	mu sync.Mutex
	// idmap maps Scheduler's entry ID to cron.EntryID
	// to avoid using cron.EntryID as the public API of
	// the Scheduler.
	idmap map[string]cron.EntryID
}

// NewScheduler returns a new Scheduler instance given the redis connection option.
// The parameter opts is optional, defaults will be used if opts is set to nil
func ( RedisConnOpt,  *SchedulerOpts) *Scheduler {
	,  := .MakeRedisClient().(redis.UniversalClient)
	if ! {
		panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", ))
	}
	if  == nil {
		 = &SchedulerOpts{}
	}

	 := log.NewLogger(.Logger)
	 := .LogLevel
	if  == level_unspecified {
		 = InfoLevel
	}
	.SetLevel(toInternalLogLevel())

	 := .Location
	if  == nil {
		 = time.UTC
	}

	return &Scheduler{
		id:              generateSchedulerID(),
		state:           &serverState{value: srvStateNew},
		logger:          ,
		client:          NewClient(),
		rdb:             rdb.NewRDB(),
		cron:            cron.New(cron.WithLocation()),
		location:        ,
		done:            make(chan struct{}),
		preEnqueueFunc:  .PreEnqueueFunc,
		postEnqueueFunc: .PostEnqueueFunc,
		errHandler:      .EnqueueErrorHandler,
		idmap:           make(map[string]cron.EntryID),
	}
}

func generateSchedulerID() string {
	,  := os.Hostname()
	if  != nil {
		 = "unknown-host"
	}
	return fmt.Sprintf("%s:%d:%v", , os.Getpid(), uuid.New())
}

// SchedulerOpts specifies scheduler options.
type SchedulerOpts struct {
	// Logger specifies the logger used by the scheduler instance.
	//
	// If unset, the default logger is used.
	Logger Logger

	// LogLevel specifies the minimum log level to enable.
	//
	// If unset, InfoLevel is used by default.
	LogLevel LogLevel

	// Location specifies the time zone location.
	//
	// If unset, the UTC time zone (time.UTC) is used.
	Location *time.Location

	// PreEnqueueFunc, if provided, is called before a task gets enqueued by Scheduler.
	// The callback function should return quickly to not block the current thread.
	PreEnqueueFunc func(task *Task, opts []Option)

	// PostEnqueueFunc, if provided, is called after a task gets enqueued by Scheduler.
	// It is called with the result of the enqueue attempt whether or not the
	// attempt failed, so err may be non-nil.
	// The callback function should return quickly to not block the current thread.
	PostEnqueueFunc func(info *TaskInfo, err error)

	// EnqueueErrorHandler gets called when scheduler cannot enqueue a registered task
	// due to an error.
	//
	// Deprecated: Use PostEnqueueFunc instead.
	EnqueueErrorHandler func(task *Task, opts []Option, err error)
}

// enqueueJob encapsulates the job of enqueuing a task and recording the event.
//
// One enqueueJob is created per registered entry and handed to cron. The
// id field is the entry ID returned to the caller at registration time and is
// the key under which enqueue events are recorded; the remaining fields are
// copied from the owning Scheduler and the registration arguments. The three
// callback fields may be nil.
type enqueueJob struct {
	id              uuid.UUID
	cronspec        string
	task            *Task
	opts            []Option
	location        *time.Location
	logger          *log.Logger
	client          *Client
	rdb             *rdb.RDB
	preEnqueueFunc  func(task *Task, opts []Option)
	postEnqueueFunc func(info *TaskInfo, err error)
	errHandler      func(task *Task, opts []Option, err error)
}

func ( *enqueueJob) () {
	if .preEnqueueFunc != nil {
		.preEnqueueFunc(.task, .opts)
	}
	,  := .client.Enqueue(.task, .opts...)
	if .postEnqueueFunc != nil {
		.postEnqueueFunc(, )
	}
	if  != nil {
		if .errHandler != nil {
			.errHandler(.task, .opts, )
		}
		return
	}
	.logger.Debugf("scheduler enqueued a task: %+v", )
	 := &base.SchedulerEnqueueEvent{
		TaskID:     .ID,
		EnqueuedAt: time.Now().In(.location),
	}
	 = .rdb.RecordSchedulerEnqueueEvent(.id.String(), )
	if  != nil {
		.logger.Warnf("scheduler could not record enqueue event of enqueued task %s: %v", .ID, )
	}
}

// Register registers a task to be enqueued on the given schedule specified by the cronspec.
// It returns an ID of the newly registered entry.
func ( *Scheduler) ( string,  *Task,  ...Option) ( string,  error) {
	 := &enqueueJob{
		id:              uuid.New(),
		cronspec:        ,
		task:            ,
		opts:            ,
		location:        .location,
		client:          .client,
		rdb:             .rdb,
		logger:          .logger,
		preEnqueueFunc:  .preEnqueueFunc,
		postEnqueueFunc: .postEnqueueFunc,
		errHandler:      .errHandler,
	}
	,  := .cron.AddJob(, )
	if  != nil {
		return "", 
	}
	.mu.Lock()
	.idmap[.id.String()] = 
	.mu.Unlock()
	return .id.String(), nil
}

// Unregister removes a registered entry by entry ID.
// Unregister returns a non-nil error if no entries were found for the given entryID.
func ( *Scheduler) ( string) error {
	.mu.Lock()
	defer .mu.Unlock()
	,  := .idmap[]
	if ! {
		return fmt.Errorf("asynq: no scheduler entry found")
	}
	delete(.idmap, )
	.cron.Remove()
	return nil
}

// Run starts the scheduler until an os signal to exit the program is received.
// It returns an error if scheduler is already running or has been shutdown.
func ( *Scheduler) () error {
	if  := .Start();  != nil {
		return 
	}
	.waitForSignals()
	.Shutdown()
	return nil
}

// Start starts the scheduler.
// It returns an error if the scheduler is already running or has been shutdown.
func ( *Scheduler) () error {
	if  := .start();  != nil {
		return 
	}
	.logger.Info("Scheduler starting")
	.logger.Infof("Scheduler timezone is set to %v", .location)
	.cron.Start()
	.wg.Add(1)
	go .runHeartbeater()
	return nil
}

// Checks server state and returns an error if pre-condition is not met.
// Otherwise it sets the server state to active.
func ( *Scheduler) () error {
	.state.mu.Lock()
	defer .state.mu.Unlock()
	switch .state.value {
	case srvStateActive:
		return fmt.Errorf("asynq: the scheduler is already running")
	case srvStateClosed:
		return fmt.Errorf("asynq: the scheduler has already been stopped")
	}
	.state.value = srvStateActive
	return nil
}

// Shutdown stops and shuts down the scheduler.
func ( *Scheduler) () {
	.state.mu.Lock()
	if .state.value == srvStateNew || .state.value == srvStateClosed {
		// scheduler is not running, do nothing and return.
		.state.mu.Unlock()
		return
	}
	.state.value = srvStateClosed
	.state.mu.Unlock()

	.logger.Info("Scheduler shutting down")
	close(.done) // signal heartbeater to stop
	 := .cron.Stop()
	<-.Done()
	.wg.Wait()

	.clearHistory()
	.client.Close()
	.rdb.Close()
	.logger.Info("Scheduler stopped")
}

func ( *Scheduler) () {
	defer .wg.Done()
	 := time.NewTicker(5 * time.Second)
	for {
		select {
		case <-.done:
			.logger.Debugf("Scheduler heatbeater shutting down")
			.rdb.ClearSchedulerEntries(.id)
			.Stop()
			return
		case <-.C:
			.beat()
		}
	}
}

// beat writes a snapshot of entries to redis.
func ( *Scheduler) () {
	var  []*base.SchedulerEntry
	for ,  := range .cron.Entries() {
		 := .Job.(*enqueueJob)
		 := &base.SchedulerEntry{
			ID:      .id.String(),
			Spec:    .cronspec,
			Type:    .task.Type(),
			Payload: .task.Payload(),
			Opts:    stringifyOptions(.opts),
			Next:    .Next,
			Prev:    .Prev,
		}
		 = append(, )
	}
	.logger.Debugf("Writing entries %v", )
	if  := .rdb.WriteSchedulerEntries(.id, , 5*time.Second);  != nil {
		.logger.Warnf("Scheduler could not write heartbeat data: %v", )
	}
}

func stringifyOptions( []Option) []string {
	var  []string
	for ,  := range  {
		 = append(, .String())
	}
	return 
}

func ( *Scheduler) () {
	for ,  := range .cron.Entries() {
		 := .Job.(*enqueueJob)
		if  := .rdb.ClearSchedulerHistory(.id.String());  != nil {
			.logger.Warnf("Could not clear scheduler history for entry %q: %v", .id.String(), )
		}
	}
}