Source File
subscriber.go
Belonging Package
github.com/hibiken/asynq
// NOTE(review): the line below is the entire text of subscriber.go (package
// asynq) flattened onto a single line. Because it begins with "//", Go treats
// the whole line as a comment. Identifiers also appear to have been dropped
// during extraction: the import list is empty, and receiver, parameter, method
// and local variable names are missing. It cannot compile as shown; restore it
// from the original file before use.
//
// What the visible text establishes:
//   - subscriber holds a logger, a broker, a done channel used to signal the
//     long-running goroutine, a cancelations value holding cancel functions
//     for active tasks, and a retryTimeout.
//   - newSubscriber copies logger, broker and cancelations from a
//     subscriberParams value, creates an unbuffered done channel, and sets
//     retryTimeout to 5 seconds.
//   - The first method (name missing) logs "Subscriber shutting down..." and
//     sends one value on done; since done is unbuffered, that send blocks
//     until the goroutine receives it.
//   - The second method (name missing) takes a *sync.WaitGroup, calls Add(1)
//     (presumably on that WaitGroup; the receiver of the call is missing) and
//     launches a goroutine with a deferred Done(). The goroutine retries
//     broker.CancelationPubSub() until the error is nil, logging each failure
//     and waiting retryTimeout between attempts, or returning early when done
//     is signalled. It then loops on a select: on done it calls Close(), logs
//     "Subscriber done" and returns; on a received message it calls
//     cancelations.Get with the message's Payload and, presumably when the
//     second result is true, invokes the first result.
// Copyright 2020 Kentaro Hibino. All rights reserved.// Use of this source code is governed by a MIT license// that can be found in the LICENSE file.package asynqimport ()type subscriber struct {logger *log.Loggerbroker base.Broker// channel to communicate back to the long running "subscriber" goroutine.done chan struct{}// cancelations hold cancel functions for all active tasks.cancelations *base.Cancelations// time to wait before retrying to connect to redis.retryTimeout time.Duration}type subscriberParams struct {logger *log.Loggerbroker base.Brokercancelations *base.Cancelations}func newSubscriber( subscriberParams) *subscriber {return &subscriber{logger: .logger,broker: .broker,done: make(chan struct{}),cancelations: .cancelations,retryTimeout: 5 * time.Second,}}func ( *subscriber) () {.logger.Debug("Subscriber shutting down...")// Signal the subscriber goroutine to stop..done <- struct{}{}}func ( *subscriber) ( *sync.WaitGroup) {.Add(1)go func() {defer .Done()var (*redis.PubSuberror)// Try until successfully connect to Redis.for {, = .broker.CancelationPubSub()if != nil {.logger.Errorf("cannot subscribe to cancelation channel: %v", )select {case <-time.After(.retryTimeout):continuecase <-.done:.logger.Debug("Subscriber done")return}}break}:= .Channel()for {select {case <-.done:.Close().logger.Debug("Subscriber done")returncase := <-:, := .cancelations.Get(.Payload)if {()}}}}()}
![]() |
The pages are generated with Golds v0.8.2 (GOOS=linux GOARCH=amd64). Golds is a Go 101 project developed by Tapir Liu. PRs and bug reports are welcome and can be submitted to the issue list. Please follow @zigo_101 (reachable via the QR code on the left) to get the latest news about Golds. |