// Copyright 2020 Kentaro Hibino. All rights reserved.// Use of this source code is governed by a MIT license// that can be found in the LICENSE file.package asynqimport ()// Server is responsible for task processing and task lifecycle management.//// Server pulls tasks off queues and processes them.// If the processing of a task is unsuccessful, server will schedule it for a retry.//// A task will be retried until either the task gets processed successfully// or until it reaches its max retry count.//// If a task exhausts its retries, it will be moved to the archive and// will be kept in the archive set.// Note that the archive size is finite and once it reaches its max size,// oldest tasks in the archive will be deleted.typeServerstruct { logger *log.Logger broker base.Broker state *serverState// wait group to wait for all goroutines to finish. wg sync.WaitGroup forwarder *forwarder processor *processor syncer *syncer heartbeater *heartbeater subscriber *subscriber recoverer *recoverer healthchecker *healthchecker janitor *janitor aggregator *aggregator}type serverState struct { mu sync.Mutex value serverStateValue}type serverStateValue intconst (// StateNew represents a new server. Server begins in // this state and then transition to StatusActive when // Start or Run is callled. srvStateNew serverStateValue = iota// StateActive indicates the server is up and active. srvStateActive// StateStopped indicates the server is up but no longer processing new tasks. srvStateStopped// StateClosed indicates the server has been shutdown. srvStateClosed)var serverStates = []string{"new","active","stopped","closed",}func ( serverStateValue) () string {ifsrvStateNew <= && <= srvStateClosed {returnserverStates[] }return"unknown status"}// Config specifies the server's background-task processing behavior.typeConfigstruct {// Maximum number of concurrent processing of tasks. 
// // If set to a zero or negative value, NewServer will overwrite the value // to the number of CPUs usable by the current process. Concurrency int// BaseContext optionally specifies a function that returns the base context for Handler invocations on this server. // // If BaseContext is nil, the default is context.Background(). // If this is defined, then it MUST return a non-nil context BaseContext func() context.Context// Function to calculate retry delay for a failed task. // // By default, it uses exponential backoff algorithm to calculate the delay. RetryDelayFunc RetryDelayFunc// Predicate function to determine whether the error returned from Handler is a failure. // If the function returns false, Server will not increment the retried counter for the task, // and Server won't record the queue stats (processed and failed stats) to avoid skewing the error // rate of the queue. // // By default, if the given error is non-nil the function returns true. IsFailure func(error) bool// List of queues to process with given priority value. Keys are the names of the // queues and values are associated priority value. // // If set to nil or not specified, the server will process only the "default" queue. // // Priority is treated as follows to avoid starving low priority queues. // // Example: // // Queues: map[string]int{ // "critical": 6, // "default": 3, // "low": 1, // } // // With the above config and given that all queues are not empty, the tasks // in "critical", "default", "low" should be processed 60%, 30%, 10% of // the time respectively. // // If a queue has a zero or negative priority value, the queue will be ignored. Queues map[string]int// StrictPriority indicates whether the queue priority should be treated strictly. // // If set to true, tasks in the queue with the highest priority is processed first. // The tasks in lower priority queues are processed only when those queues with // higher priorities are empty. 
StrictPriority bool// ErrorHandler handles errors returned by the task handler. // // HandleError is invoked only if the task handler returns a non-nil error. // // Example: // // func reportError(ctx context, task *asynq.Task, err error) { // retried, _ := asynq.GetRetryCount(ctx) // maxRetry, _ := asynq.GetMaxRetry(ctx) // if retried >= maxRetry { // err = fmt.Errorf("retry exhausted for task %s: %w", task.Type, err) // } // errorReportingService.Notify(err) // }) // // ErrorHandler: asynq.ErrorHandlerFunc(reportError) ErrorHandler ErrorHandler// Logger specifies the logger used by the server instance. // // If unset, default logger is used. Logger Logger// LogLevel specifies the minimum log level to enable. // // If unset, InfoLevel is used by default. LogLevel LogLevel// ShutdownTimeout specifies the duration to wait to let workers finish their tasks // before forcing them to abort when stopping the server. // // If unset or zero, default timeout of 8 seconds is used. ShutdownTimeout time.Duration// HealthCheckFunc is called periodically with any errors encountered during ping to the // connected redis server. HealthCheckFunc func(error)// HealthCheckInterval specifies the interval between healthchecks. // // If unset or zero, the interval is set to 15 seconds. HealthCheckInterval time.Duration// DelayedTaskCheckInterval specifies the interval between checks run on 'scheduled' and 'retry' // tasks, and forwarding them to 'pending' state if they are ready to be processed. // // If unset or zero, the interval is set to 5 seconds. DelayedTaskCheckInterval time.Duration// GroupGracePeriod specifies the amount of time the server will wait for an incoming task before aggregating // the tasks in a group. If an incoming task is received within this period, the server will wait for another // period of the same length, up to GroupMaxDelay if specified. // // If unset or zero, the grace period is set to 1 minute. // Minimum duration for GroupGracePeriod is 1 second. 
If value specified is less than a second, the call to // NewServer will panic. GroupGracePeriod time.Duration// GroupMaxDelay specifies the maximum amount of time the server will wait for incoming tasks before aggregating // the tasks in a group. // // If unset or zero, no delay limit is used. GroupMaxDelay time.Duration// GroupMaxSize specifies the maximum number of tasks that can be aggregated into a single task within a group. // If GroupMaxSize is reached, the server will aggregate the tasks into one immediately. // // If unset or zero, no size limit is used. GroupMaxSize int// GroupAggregator specifies the aggregation function used to aggregate multiple tasks in a group into one task. // // If unset or nil, the group aggregation feature will be disabled on the server. GroupAggregator GroupAggregator}// GroupAggregator aggregates a group of tasks into one before the tasks are passed to the Handler.typeGroupAggregatorinterface {// Aggregate aggregates the given tasks in a group with the given group name, // and returns a new task which is the aggregation of those tasks. // // Use NewTask(typename, payload, opts...) to set any options for the aggregated task. 
// The Queue option, if provided, will be ignored and the aggregated task will always be enqueued // to the same queue the group belonged.Aggregate(group string, tasks []*Task) *Task}// The GroupAggregatorFunc type is an adapter to allow the use of ordinary functions as a GroupAggregator.// If f is a function with the appropriate signature, GroupAggregatorFunc(f) is a GroupAggregator that calls f.typeGroupAggregatorFuncfunc(group string, tasks []*Task) *Task// Aggregate calls fn(group, tasks)func ( GroupAggregatorFunc) ( string, []*Task) *Task {return (, )}// An ErrorHandler handles an error occurred during task processing.typeErrorHandlerinterface {HandleError(ctx context.Context, task *Task, err error)}// The ErrorHandlerFunc type is an adapter to allow the use of ordinary functions as a ErrorHandler.// If f is a function with the appropriate signature, ErrorHandlerFunc(f) is a ErrorHandler that calls f.typeErrorHandlerFuncfunc(ctx context.Context, task *Task, err error)// HandleError calls fn(ctx, task, err)func ( ErrorHandlerFunc) ( context.Context, *Task, error) { (, , )}// RetryDelayFunc calculates the retry delay duration for a failed task given// the retry count, error, and the task.//// n is the number of times the task has been retried.// e is the error returned by the task handler.// t is the task in question.typeRetryDelayFuncfunc(n int, e error, t *Task) time.Duration// Logger supports logging at various log levels.typeLoggerinterface {// Debug logs a message at Debug level.Debug(args ...interface{})// Info logs a message at Info level.Info(args ...interface{})// Warn logs a message at Warning level.Warn(args ...interface{})// Error logs a message at Error level.Error(args ...interface{})// Fatal logs a message at Fatal level // and process will exit with status set to 1.Fatal(args ...interface{})}// LogLevel represents logging level.//// It satisfies flag.Value interface.typeLogLevelint32const (// Note: reserving value zero to differentiate 
unspecified case. level_unspecified LogLevel = iota// DebugLevel is the lowest level of logging. // Debug logs are intended for debugging and development purposes.DebugLevel// InfoLevel is used for general informational log messages.InfoLevel// WarnLevel is used for undesired but relatively expected events, // which may indicate a problem.WarnLevel// ErrorLevel is used for undesired and unexpected events that // the program can recover from.ErrorLevel// FatalLevel is used for undesired and unexpected events that // the program cannot recover from.FatalLevel)// String is part of the flag.Value interface.func ( *LogLevel) () string {switch * {caseDebugLevel:return"debug"caseInfoLevel:return"info"caseWarnLevel:return"warn"caseErrorLevel:return"error"caseFatalLevel:return"fatal" }panic(fmt.Sprintf("asynq: unexpected log level: %v", *))}// Set is part of the flag.Value interface.func ( *LogLevel) ( string) error {switchstrings.ToLower() {case"debug": * = DebugLevelcase"info": * = InfoLevelcase"warn", "warning": * = WarnLevelcase"error": * = ErrorLevelcase"fatal": * = FatalLeveldefault:returnfmt.Errorf("asynq: unsupported log level %q", ) }returnnil}func toInternalLogLevel( LogLevel) log.Level {switch {caseDebugLevel:returnlog.DebugLevelcaseInfoLevel:returnlog.InfoLevelcaseWarnLevel:returnlog.WarnLevelcaseErrorLevel:returnlog.ErrorLevelcaseFatalLevel:returnlog.FatalLevel }panic(fmt.Sprintf("asynq: unexpected log level: %v", ))}// DefaultRetryDelayFunc is the default RetryDelayFunc used if one is not specified in Config.// It uses exponential back-off strategy to calculate the retry delay.func ( int, error, *Task) time.Duration { := rand.New(rand.NewSource(time.Now().UnixNano()))// Formula taken from https://github.com/mperham/sidekiq. 
:= int(math.Pow(float64(), 4)) + 15 + (.Intn(30) * ( + 1))returntime.Duration() * time.Second}func defaultIsFailureFunc( error) bool { return != nil }var defaultQueueConfig = map[string]int{base.DefaultQueueName: 1,}const ( defaultShutdownTimeout = 8 * time.Second defaultHealthCheckInterval = 15 * time.Second defaultDelayedTaskCheckInterval = 5 * time.Second defaultGroupGracePeriod = 1 * time.Minute)// NewServer returns a new Server given a redis connection option// and server configuration.func ( RedisConnOpt, Config) *Server { , := .MakeRedisClient().(redis.UniversalClient)if ! {panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", )) } := .BaseContextif == nil { = context.Background } := .Concurrencyif < 1 { = runtime.NumCPU() } := .RetryDelayFuncif == nil { = DefaultRetryDelayFunc } := .IsFailureif == nil { = defaultIsFailureFunc } := make(map[string]int)for , := range .Queues {if := base.ValidateQueueName(); != nil {continue// ignore invalid queue names }if > 0 { [] = } }iflen() == 0 { = defaultQueueConfig }var []stringfor := range { = append(, ) } := .ShutdownTimeoutif == 0 { = defaultShutdownTimeout } := .HealthCheckIntervalif == 0 { = defaultHealthCheckInterval }// TODO: Create a helper to check for zero value and fall back to default (e.g. 
getDurationOrDefault()) := .GroupGracePeriodif == 0 { = defaultGroupGracePeriod }if < time.Second {panic("GroupGracePeriod cannot be less than a second") } := log.NewLogger(.Logger) := .LogLevelif == level_unspecified { = InfoLevel } .SetLevel(toInternalLogLevel()) := rdb.NewRDB() := make(chan *workerInfo) := make(chan *base.TaskMessage) := make(chan *syncRequest) := &serverState{value: srvStateNew} := base.NewCancelations() := newSyncer(syncerParams{logger: ,requestsCh: ,interval: 5 * time.Second, }) := newHeartbeater(heartbeaterParams{logger: ,broker: ,interval: 5 * time.Second,concurrency: ,queues: ,strictPriority: .StrictPriority,state: ,starting: ,finished: , }) := .DelayedTaskCheckIntervalif == 0 { = defaultDelayedTaskCheckInterval } := newForwarder(forwarderParams{logger: ,broker: ,queues: ,interval: , }) := newSubscriber(subscriberParams{logger: ,broker: ,cancelations: , }) := newProcessor(processorParams{logger: ,broker: ,retryDelayFunc: ,baseCtxFn: ,isFailureFunc: ,syncCh: ,cancelations: ,concurrency: ,queues: ,strictPriority: .StrictPriority,errHandler: .ErrorHandler,shutdownTimeout: ,starting: ,finished: , }) := newRecoverer(recovererParams{logger: ,broker: ,retryDelayFunc: ,isFailureFunc: ,queues: ,interval: 1 * time.Minute, }) := newHealthChecker(healthcheckerParams{logger: ,broker: ,interval: ,healthcheckFunc: .HealthCheckFunc, }) := newJanitor(janitorParams{logger: ,broker: ,queues: ,interval: 8 * time.Second, }) := newAggregator(aggregatorParams{logger: ,broker: ,queues: ,gracePeriod: ,maxDelay: .GroupMaxDelay,maxSize: .GroupMaxSize,groupAggregator: .GroupAggregator, })return &Server{logger: ,broker: ,state: ,forwarder: ,processor: ,syncer: ,heartbeater: ,subscriber: ,recoverer: ,healthchecker: ,janitor: ,aggregator: , }}// A Handler processes tasks.//// ProcessTask should return nil if the processing of a task// is successful.//// If ProcessTask returns a non-nil error or panics, the task// will be retried after delay if retry-count is 
remaining,// otherwise the task will be archived.//// One exception to this rule is when ProcessTask returns a SkipRetry error.// If the returned error is SkipRetry or an error wraps SkipRetry, retry is// skipped and the task will be immediately archived instead.typeHandlerinterface {ProcessTask(context.Context, *Task) error}// The HandlerFunc type is an adapter to allow the use of// ordinary functions as a Handler. If f is a function// with the appropriate signature, HandlerFunc(f) is a// Handler that calls f.typeHandlerFuncfunc(context.Context, *Task) error// ProcessTask calls fn(ctx, task)func ( HandlerFunc) ( context.Context, *Task) error {return (, )}// ErrServerClosed indicates that the operation is now illegal because of the server has been shutdown.varErrServerClosed = errors.New("asynq: Server closed")// Run starts the task processing and blocks until// an os signal to exit the program is received. Once it receives// a signal, it gracefully shuts down all active workers and other// goroutines to process the tasks.//// Run returns any error encountered at server startup time.// If the server has already been shutdown, ErrServerClosed is returned.func ( *Server) ( Handler) error {if := .Start(); != nil {return } .waitForSignals() .Shutdown()returnnil}// Start starts the worker server. 
Once the server has started,// it pulls tasks off queues and starts a worker goroutine for each task// and then call Handler to process it.// Tasks are processed concurrently by the workers up to the number of// concurrency specified in Config.Concurrency.//// Start returns any error encountered at server startup time.// If the server has already been shutdown, ErrServerClosed is returned.func ( *Server) ( Handler) error {if == nil {returnfmt.Errorf("asynq: server cannot run with nil handler") } .processor.handler = if := .start(); != nil {return } .logger.Info("Starting processing") .heartbeater.start(&.wg) .healthchecker.start(&.wg) .subscriber.start(&.wg) .syncer.start(&.wg) .recoverer.start(&.wg) .forwarder.start(&.wg) .processor.start(&.wg) .janitor.start(&.wg) .aggregator.start(&.wg)returnnil}// Checks server state and returns an error if pre-condition is not met.// Otherwise it sets the server state to active.func ( *Server) () error { .state.mu.Lock()defer .state.mu.Unlock()switch .state.value {casesrvStateActive:returnfmt.Errorf("asynq: the server is already running")casesrvStateStopped:returnfmt.Errorf("asynq: the server is in the stopped state. Waiting for shutdown.")casesrvStateClosed:returnErrServerClosed } .state.value = srvStateActivereturnnil}// Shutdown gracefully shuts down the server.// It gracefully closes all active workers. The server will wait for// active workers to finish processing tasks for duration specified in Config.ShutdownTimeout.// If worker didn't finish processing a task during the timeout, the task will be pushed back to Redis.func ( *Server) () { .state.mu.Lock()if .state.value == srvStateNew || .state.value == srvStateClosed { .state.mu.Unlock()// server is not running, do nothing and return.return } .state.value = srvStateClosed .state.mu.Unlock() .logger.Info("Starting graceful shutdown")// Note: The order of shutdown is important. // Sender goroutines should be terminated before the receiver goroutines. 
// processor -> syncer (via syncCh) // processor -> heartbeater (via starting, finished channels) .forwarder.shutdown() .processor.shutdown() .recoverer.shutdown() .syncer.shutdown() .subscriber.shutdown() .janitor.shutdown() .aggregator.shutdown() .healthchecker.shutdown() .heartbeater.shutdown() .wg.Wait() .broker.Close() .logger.Info("Exiting")}// Stop signals the server to stop pulling new tasks off queues.// Stop can be used before shutting down the server to ensure that all// currently active tasks are processed before server shutdown.//// Stop does not shutdown the server, make sure to call Shutdown before exit.func ( *Server) () { .state.mu.Lock()if .state.value != srvStateActive {// Invalid calll to Stop, server can only go from Active state to Stopped state. .state.mu.Unlock()return } .state.value = srvStateStopped .state.mu.Unlock() .logger.Info("Stopping processor") .processor.stop() .logger.Info("Processor stopped")}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.