package asynq
import (
"fmt"
"os"
"sync"
"time"
"github.com/redis/go-redis/v9"
"github.com/google/uuid"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log"
"github.com/hibiken/asynq/internal/rdb"
"github.com/robfig/cron/v3"
)
// Scheduler enqueues registered tasks according to their cron specs.
//
// It drives a cron.Cron instance whose jobs enqueue tasks through client, and
// it uses rdb to publish heartbeat data and per-entry enqueue history.
type Scheduler struct {
// id identifies this scheduler instance (see generateSchedulerID).
id string
// state holds the lifecycle value (new, active, closed) checked by
// start and Shutdown; it is guarded by its own mutex.
state *serverState
logger *log .Logger
// client is handed to every registered job and is closed by Shutdown.
client *Client
// rdb is used for scheduler entries and enqueue history; closed by Shutdown.
rdb *rdb .RDB
cron *cron .Cron
// location is the time zone given to cron and used to stamp enqueue events.
location *time .Location
// done is closed by Shutdown to tell the heartbeater goroutine to exit.
done chan struct {}
// wg tracks the heartbeater goroutine so Shutdown can wait for it.
wg sync .WaitGroup
// The three hooks below are copied from SchedulerOpts and may be nil.
preEnqueueFunc func (task *Task , opts []Option )
postEnqueueFunc func (info *TaskInfo , err error )
errHandler func (task *Task , opts []Option , err error )
// mu guards idmap.
mu sync .Mutex
// idmap maps the entry ID returned by Register to the cron entry ID.
idmap map [string ]cron .EntryID
}
// NewScheduler builds a Scheduler from the given Redis connection option.
//
// opts may be nil, in which case all defaults apply: the log level falls back
// to InfoLevel when unspecified and the location falls back to time.UTC.
// It panics if r does not produce a redis.UniversalClient.
func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
	redisClient, isUniversal := r.MakeRedisClient().(redis.UniversalClient)
	if !isUniversal {
		panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
	}
	if opts == nil {
		opts = &SchedulerOpts{}
	}

	// Resolve the logger and its level.
	lg := log.NewLogger(opts.Logger)
	lvl := opts.LogLevel
	if lvl == level_unspecified {
		lvl = InfoLevel
	}
	lg.SetLevel(toInternalLogLevel(lvl))

	// Resolve the time zone used by cron and for event timestamps.
	tz := opts.Location
	if tz == nil {
		tz = time.UTC
	}

	sched := &Scheduler{
		id:              generateSchedulerID(),
		state:           &serverState{value: srvStateNew},
		logger:          lg,
		client:          NewClient(r),
		rdb:             rdb.NewRDB(redisClient),
		cron:            cron.New(cron.WithLocation(tz)),
		location:        tz,
		done:            make(chan struct{}),
		preEnqueueFunc:  opts.PreEnqueueFunc,
		postEnqueueFunc: opts.PostEnqueueFunc,
		errHandler:      opts.EnqueueErrorHandler,
		idmap:           make(map[string]cron.EntryID),
	}
	return sched
}
// generateSchedulerID returns an identifier of the form "host:pid:uuid".
// If the hostname cannot be determined, "unknown-host" is used in its place.
func generateSchedulerID() string {
	hostname, hostErr := os.Hostname()
	if hostErr != nil {
		hostname = "unknown-host"
	}
	pid := os.Getpid()
	return fmt.Sprintf("%s:%d:%v", hostname, pid, uuid.New())
}
// SchedulerOpts specifies optional settings for a Scheduler.
// A nil *SchedulerOpts passed to NewScheduler is treated as the zero value.
type SchedulerOpts struct {
// Logger is passed to the internal logger constructor by NewScheduler.
Logger Logger
// LogLevel sets the logger's level; when left unspecified NewScheduler
// uses InfoLevel.
LogLevel LogLevel
// Location is the time zone for the cron schedule; when nil NewScheduler
// uses time.UTC.
Location *time .Location
// PreEnqueueFunc, if set, is called with the task and options right
// before each scheduled enqueue.
PreEnqueueFunc func (task *Task , opts []Option )
// PostEnqueueFunc, if set, is called with the result of each scheduled
// enqueue, whether it succeeded or failed.
PostEnqueueFunc func (info *TaskInfo , err error )
// EnqueueErrorHandler, if set, is called when a scheduled enqueue
// returns an error.
EnqueueErrorHandler func (task *Task , opts []Option , err error )
}
// enqueueJob is the job value handed to cron for each registered entry.
// Its Run method enqueues task with opts and records the enqueue event.
type enqueueJob struct {
// id is the entry ID; its string form is what Register returns.
id uuid .UUID
// cronspec is the schedule string the job was registered with.
cronspec string
task *Task
opts []Option
// location is the time zone applied to the recorded enqueue timestamp.
location *time .Location
logger *log .Logger
client *Client
rdb *rdb .RDB
// Optional hooks copied from the owning Scheduler; any of them may be nil.
preEnqueueFunc func (task *Task , opts []Option )
postEnqueueFunc func (info *TaskInfo , err error )
errHandler func (task *Task , opts []Option , err error )
}
// Run enqueues the job's task and records the enqueue event.
//
// The pre-enqueue hook (if any) runs first. The post-enqueue hook (if any)
// always runs with the enqueue result. On enqueue failure the error handler
// (if any) is invoked and nothing is recorded. A failure to record the event
// is only logged as a warning.
func (j *enqueueJob) Run() {
	if pre := j.preEnqueueFunc; pre != nil {
		pre(j.task, j.opts)
	}

	info, err := j.client.Enqueue(j.task, j.opts...)

	if post := j.postEnqueueFunc; post != nil {
		post(info, err)
	}

	if err != nil {
		if onErr := j.errHandler; onErr != nil {
			onErr(j.task, j.opts, err)
		}
		return
	}

	j.logger.Debugf("scheduler enqueued a task: %+v", info)

	// Stamp the event in the scheduler's configured time zone.
	enqueued := &base.SchedulerEnqueueEvent{
		TaskID:     info.ID,
		EnqueuedAt: time.Now().In(j.location),
	}
	if recErr := j.rdb.RecordSchedulerEnqueueEvent(j.id.String(), enqueued); recErr != nil {
		j.logger.Warnf("scheduler could not record enqueue event of enqueued task %s: %v", info.ID, recErr)
	}
}
// Register adds a task to be enqueued on the schedule described by cronspec.
//
// It returns the ID of the new entry, which can later be passed to
// Unregister, or the error reported by cron when the job cannot be added.
func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entryID string, err error) {
	jobID := uuid.New()
	job := &enqueueJob{
		id:              jobID,
		cronspec:        cronspec,
		task:            task,
		opts:            opts,
		location:        s.location,
		client:          s.client,
		rdb:             s.rdb,
		logger:          s.logger,
		preEnqueueFunc:  s.preEnqueueFunc,
		postEnqueueFunc: s.postEnqueueFunc,
		errHandler:      s.errHandler,
	}

	cronID, err := s.cron.AddJob(cronspec, job)
	if err != nil {
		return "", err
	}

	// Remember which cron entry backs this ID so Unregister can remove it.
	key := jobID.String()
	s.mu.Lock()
	s.idmap[key] = cronID
	s.mu.Unlock()

	return jobID.String(), nil
}
// Unregister removes the entry previously returned by Register.
// It returns an error if entryID is not known to this scheduler.
func (s *Scheduler) Unregister(entryID string) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if cronID, found := s.idmap[entryID]; found {
		delete(s.idmap, entryID)
		s.cron.Remove(cronID)
		return nil
	}
	return fmt.Errorf("asynq: no scheduler entry found")
}
// Run starts the scheduler, blocks in waitForSignals, and then shuts the
// scheduler down. It returns an error only if the scheduler fails to start.
func (s *Scheduler) Run() error {
	startErr := s.Start()
	if startErr == nil {
		s.waitForSignals()
		s.Shutdown()
	}
	return startErr
}
// Start moves the scheduler to the active state, starts cron, and launches
// the heartbeater goroutine. It does not block.
//
// It returns an error if the scheduler is already running or has already
// been stopped.
func (s *Scheduler) Start() error {
	err := s.start()
	if err != nil {
		return err
	}

	s.logger.Info("Scheduler starting")
	s.logger.Infof("Scheduler timezone is set to %v", s.location)

	s.cron.Start()

	// Register the heartbeater with wg before spawning it so Shutdown
	// can wait for it to finish.
	s.wg.Add(1)
	go s.runHeartbeater()

	return nil
}
// start performs the state transition to active under the state mutex.
// It rejects the transition when the scheduler is already active or closed.
func (s *Scheduler) start() error {
	s.state.mu.Lock()
	defer s.state.mu.Unlock()

	if current := s.state.value; current == srvStateActive {
		return fmt.Errorf("asynq: the scheduler is already running")
	} else if current == srvStateClosed {
		return fmt.Errorf("asynq: the scheduler has already been stopped")
	}

	s.state.value = srvStateActive
	return nil
}
// Shutdown stops the scheduler and releases its resources.
//
// It is a no-op when the scheduler was never started or is already closed.
// Otherwise it marks the scheduler closed, signals the heartbeater, waits for
// cron and the heartbeater to finish, clears each entry's history, and closes
// the client and the rdb handle.
func (s *Scheduler) Shutdown() {
	s.state.mu.Lock()
	switch s.state.value {
	case srvStateNew, srvStateClosed:
		s.state.mu.Unlock()
		return
	}
	s.state.value = srvStateClosed
	s.state.mu.Unlock()

	s.logger.Info("Scheduler shutting down")

	// Signal the heartbeater to exit.
	close(s.done)

	// Stop cron and block until its returned context is done.
	stopCtx := s.cron.Stop()
	<-stopCtx.Done()

	s.wg.Wait()

	s.clearHistory()
	s.client.Close()
	s.rdb.Close()

	s.logger.Info("Scheduler stopped")
}
// runHeartbeater calls beat every five seconds until done is closed.
// On shutdown it clears this scheduler's entries via rdb, stops the ticker,
// and marks itself finished on wg.
func (s *Scheduler) runHeartbeater() {
	defer s.wg.Done()

	const interval = 5 * time.Second
	tick := time.NewTicker(interval)
	// Deferred after wg.Done, so the ticker is stopped before wg is released.
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			s.beat()
		case <-s.done:
			s.logger.Debugf("Scheduler heatbeater shutting down")
			s.rdb.ClearSchedulerEntries(s.id)
			return
		}
	}
}
// beat publishes a snapshot of every registered cron entry as this
// scheduler's heartbeat record.
//
// For each cron entry it captures the entry ID, cron spec, task type and
// payload, the stringified options, and the next/previous run times. A write
// failure is logged as a warning and otherwise ignored; the next beat retries.
func (s *Scheduler) beat() {
	// heartbeatTTL is twice the 5-second tick used by runHeartbeater. With a
	// TTL equal to the tick the record could expire just before the next beat
	// rewrites it, so the scheduler's entries would briefly disappear.
	// NOTE(review): assumes the third argument of WriteSchedulerEntries is
	// the record's expiry — confirm against the rdb package.
	const heartbeatTTL = 2 * 5 * time.Second

	var entries []*base.SchedulerEntry
	for _, entry := range s.cron.Entries() {
		job := entry.Job.(*enqueueJob)
		entries = append(entries, &base.SchedulerEntry{
			ID:      job.id.String(),
			Spec:    job.cronspec,
			Type:    job.task.Type(),
			Payload: job.task.Payload(),
			Opts:    stringifyOptions(job.opts),
			Next:    entry.Next,
			Prev:    entry.Prev,
		})
	}

	s.logger.Debugf("Writing entries %v", entries)
	if err := s.rdb.WriteSchedulerEntries(s.id, entries, heartbeatTTL); err != nil {
		s.logger.Warnf("Scheduler could not write heartbeat data: %v", err)
	}
}
// stringifyOptions returns the String() form of each option, in order.
// The result is nil when opts is empty.
func stringifyOptions(opts []Option) []string {
	var out []string
	for i := range opts {
		out = append(out, opts[i].String())
	}
	return out
}
// clearHistory asks rdb to clear the enqueue history of every entry currently
// registered with cron. Failures are logged as warnings and do not stop the
// remaining entries from being processed.
func (s *Scheduler) clearHistory() {
	for _, cronEntry := range s.cron.Entries() {
		ej := cronEntry.Job.(*enqueueJob)
		err := s.rdb.ClearSchedulerHistory(ej.id.String())
		if err == nil {
			continue
		}
		s.logger.Warnf("Could not clear scheduler history for entry %q: %v", ej.id.String(), err)
	}
}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.