package asynq
import (
"crypto/sha256"
"fmt"
"io"
"sort"
"sync"
"time"
)
type PeriodicTaskManager struct {
s *Scheduler
p PeriodicTaskConfigProvider
syncInterval time .Duration
done chan (struct {})
wg sync .WaitGroup
m map [string ]string
}
type PeriodicTaskManagerOpts struct {
PeriodicTaskConfigProvider PeriodicTaskConfigProvider
RedisConnOpt RedisConnOpt
*SchedulerOpts
SyncInterval time .Duration
}
const defaultSyncInterval = 3 * time .Minute
func NewPeriodicTaskManager (opts PeriodicTaskManagerOpts ) (*PeriodicTaskManager , error ) {
if opts .PeriodicTaskConfigProvider == nil {
return nil , fmt .Errorf ("PeriodicTaskConfigProvider cannot be nil" )
}
if opts .RedisConnOpt == nil {
return nil , fmt .Errorf ("RedisConnOpt cannot be nil" )
}
scheduler := NewScheduler (opts .RedisConnOpt , opts .SchedulerOpts )
syncInterval := opts .SyncInterval
if syncInterval == 0 {
syncInterval = defaultSyncInterval
}
return &PeriodicTaskManager {
s : scheduler ,
p : opts .PeriodicTaskConfigProvider ,
syncInterval : syncInterval ,
done : make (chan struct {}),
m : make (map [string ]string ),
}, nil
}
type PeriodicTaskConfigProvider interface {
GetConfigs () ([]*PeriodicTaskConfig , error )
}
type PeriodicTaskConfig struct {
Cronspec string
Task *Task
Opts []Option
}
func (c *PeriodicTaskConfig ) hash () string {
h := sha256 .New ()
io .WriteString (h , c .Cronspec )
io .WriteString (h , c .Task .Type ())
h .Write (c .Task .Payload ())
opts := stringifyOptions (c .Opts )
sort .Strings (opts )
for _ , opt := range opts {
io .WriteString (h , opt )
}
return fmt .Sprintf ("%x" , h .Sum (nil ))
}
func validatePeriodicTaskConfig(c *PeriodicTaskConfig ) error {
if c == nil {
return fmt .Errorf ("PeriodicTaskConfig cannot be nil" )
}
if c .Task == nil {
return fmt .Errorf ("PeriodicTaskConfig.Task cannot be nil" )
}
if c .Cronspec == "" {
return fmt .Errorf ("PeriodicTaskConfig.Cronspec cannot be empty" )
}
return nil
}
func (mgr *PeriodicTaskManager ) Start () error {
if mgr .s == nil || mgr .p == nil {
panic ("asynq: cannot start uninitialized PeriodicTaskManager; use NewPeriodicTaskManager to initialize" )
}
if err := mgr .initialSync (); err != nil {
return fmt .Errorf ("asynq: %v" , err )
}
if err := mgr .s .Start (); err != nil {
return fmt .Errorf ("asynq: %v" , err )
}
mgr .wg .Add (1 )
go func () {
defer mgr .wg .Done ()
ticker := time .NewTicker (mgr .syncInterval )
for {
select {
case <- mgr .done :
mgr .s .logger .Debugf ("Stopping syncer goroutine" )
ticker .Stop ()
return
case <- ticker .C :
mgr .sync ()
}
}
}()
return nil
}
func (mgr *PeriodicTaskManager ) Shutdown () {
close (mgr .done )
mgr .wg .Wait ()
mgr .s .Shutdown ()
}
func (mgr *PeriodicTaskManager ) Run () error {
if err := mgr .Start (); err != nil {
return err
}
mgr .s .waitForSignals ()
mgr .Shutdown ()
mgr .s .logger .Debugf ("PeriodicTaskManager exiting" )
return nil
}
func (mgr *PeriodicTaskManager ) initialSync () error {
configs , err := mgr .p .GetConfigs ()
if err != nil {
return fmt .Errorf ("initial call to GetConfigs failed: %v" , err )
}
for _ , c := range configs {
if err := validatePeriodicTaskConfig (c ); err != nil {
return fmt .Errorf ("initial call to GetConfigs contained an invalid config: %v" , err )
}
}
mgr .add (configs )
return nil
}
func (mgr *PeriodicTaskManager ) add (configs []*PeriodicTaskConfig ) {
for _ , c := range configs {
entryID , err := mgr .s .Register (c .Cronspec , c .Task , c .Opts ...)
if err != nil {
mgr .s .logger .Errorf ("Failed to register periodic task: cronspec=%q task=%q" ,
c .Cronspec , c .Task .Type ())
continue
}
mgr .m [c .hash ()] = entryID
mgr .s .logger .Infof ("Successfully registered periodic task: cronspec=%q task=%q, entryID=%s" ,
c .Cronspec , c .Task .Type (), entryID )
}
}
func (mgr *PeriodicTaskManager ) remove (removed map [string ]string ) {
for hash , entryID := range removed {
if err := mgr .s .Unregister (entryID ); err != nil {
mgr .s .logger .Errorf ("Failed to unregister periodic task: %v" , err )
continue
}
delete (mgr .m , hash )
mgr .s .logger .Infof ("Successfully unregistered periodic task: entryID=%s" , entryID )
}
}
func (mgr *PeriodicTaskManager ) sync () {
configs , err := mgr .p .GetConfigs ()
if err != nil {
mgr .s .logger .Errorf ("Failed to get periodic task configs: %v" , err )
return
}
for _ , c := range configs {
if err := validatePeriodicTaskConfig (c ); err != nil {
mgr .s .logger .Errorf ("Failed to sync: GetConfigs returned an invalid config: %v" , err )
return
}
}
removed := mgr .diffRemoved (configs )
added := mgr .diffAdded (configs )
mgr .remove (removed )
mgr .add (added )
}
func (mgr *PeriodicTaskManager ) diffRemoved (configs []*PeriodicTaskConfig ) map [string ]string {
newConfigs := make (map [string ]string )
for _ , c := range configs {
newConfigs [c .hash ()] = ""
}
removed := make (map [string ]string )
for k , v := range mgr .m {
if _ , found := newConfigs [k ]; !found {
removed [k ] = v
}
}
return removed
}
func (mgr *PeriodicTaskManager ) diffAdded (configs []*PeriodicTaskConfig ) []*PeriodicTaskConfig {
var added []*PeriodicTaskConfig
for _ , c := range configs {
if _ , found := mgr .m [c .hash ()]; !found {
added = append (added , c )
}
}
return added
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .