Source File
syncer.go
Belonging Package
github.com/hibiken/asynq
// Copyright 2020 Kentaro Hibino. All rights reserved.// Use of this source code is governed by a MIT license// that can be found in the LICENSE file.package asynqimport ()// syncer is responsible for queuing up failed requests to redis and retry// those requests to sync state between the background process and redis.type syncer struct {logger *log.LoggerrequestsCh <-chan *syncRequest// channel to communicate back to the long running "syncer" goroutine.done chan struct{}// interval between sync operations.interval time.Duration}type syncRequest struct {fn func() error // sync operationerrMsg string // error messagedeadline time.Time // request should be dropped if deadline has been exceeded}type syncerParams struct {logger *log.LoggerrequestsCh <-chan *syncRequestinterval time.Duration}func newSyncer( syncerParams) *syncer {return &syncer{logger: .logger,requestsCh: .requestsCh,done: make(chan struct{}),interval: .interval,}}func ( *syncer) () {.logger.Debug("Syncer shutting down...")// Signal the syncer goroutine to stop..done <- struct{}{}}func ( *syncer) ( *sync.WaitGroup) {.Add(1)go func() {defer .Done()var []*syncRequestfor {select {case <-.done:// Try sync one last time before shutting down.for , := range {if := .fn(); != nil {.logger.Error(.errMsg)}}.logger.Debug("Syncer done")returncase := <-.requestsCh:= append(, )case <-time.After(.interval):var []*syncRequestfor , := range {if .deadline.Before(time.Now()) {continue // drop stale request}if := .fn(); != nil {= append(, )}}=}}}()}
![]() |
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64) Golds is a Go 101 project developed by Tapir Liu. PR and bug reports are welcome and can be submitted to the issue list. Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds. |