package asynq
import (
"os"
"sync"
"time"
"github.com/google/uuid"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log"
"github.com/hibiken/asynq/internal/timeutil"
)
type heartbeater struct {
logger *log .Logger
broker base .Broker
clock timeutil .Clock
done chan struct {}
interval time .Duration
host string
pid int
serverID string
concurrency int
queues map [string ]int
strictPriority bool
started time .Time
workers map [string ]*workerInfo
state *serverState
starting <-chan *workerInfo
finished <-chan *base .TaskMessage
}
type heartbeaterParams struct {
logger *log .Logger
broker base .Broker
interval time .Duration
concurrency int
queues map [string ]int
strictPriority bool
state *serverState
starting <-chan *workerInfo
finished <-chan *base .TaskMessage
}
func newHeartbeater(params heartbeaterParams ) *heartbeater {
host , err := os .Hostname ()
if err != nil {
host = "unknown-host"
}
return &heartbeater {
logger : params .logger ,
broker : params .broker ,
clock : timeutil .NewRealClock (),
done : make (chan struct {}),
interval : params .interval ,
host : host ,
pid : os .Getpid (),
serverID : uuid .New ().String (),
concurrency : params .concurrency ,
queues : params .queues ,
strictPriority : params .strictPriority ,
state : params .state ,
workers : make (map [string ]*workerInfo ),
starting : params .starting ,
finished : params .finished ,
}
}
func (h *heartbeater ) shutdown () {
h .logger .Debug ("Heartbeater shutting down..." )
h .done <- struct {}{}
}
type workerInfo struct {
msg *base .TaskMessage
started time .Time
deadline time .Time
lease *base .Lease
}
func (h *heartbeater ) start (wg *sync .WaitGroup ) {
wg .Add (1 )
go func () {
defer wg .Done ()
h .started = h .clock .Now ()
h .beat ()
timer := time .NewTimer (h .interval )
for {
select {
case <- h .done :
h .broker .ClearServerState (h .host , h .pid , h .serverID )
h .logger .Debug ("Heartbeater done" )
timer .Stop ()
return
case <- timer .C :
h .beat ()
timer .Reset (h .interval )
case w := <- h .starting :
h .workers [w .msg .ID ] = w
case msg := <- h .finished :
delete (h .workers , msg .ID )
}
}
}()
}
func (h *heartbeater ) beat () {
h .state .mu .Lock ()
srvStatus := h .state .value .String ()
h .state .mu .Unlock ()
info := base .ServerInfo {
Host : h .host ,
PID : h .pid ,
ServerID : h .serverID ,
Concurrency : h .concurrency ,
Queues : h .queues ,
StrictPriority : h .strictPriority ,
Status : srvStatus ,
Started : h .started ,
ActiveWorkerCount : len (h .workers ),
}
var ws []*base .WorkerInfo
idsByQueue := make (map [string ][]string )
for id , w := range h .workers {
ws = append (ws , &base .WorkerInfo {
Host : h .host ,
PID : h .pid ,
ServerID : h .serverID ,
ID : id ,
Type : w .msg .Type ,
Queue : w .msg .Queue ,
Payload : w .msg .Payload ,
Started : w .started ,
Deadline : w .deadline ,
})
if w .lease .IsValid () {
idsByQueue [w .msg .Queue ] = append (idsByQueue [w .msg .Queue ], id )
} else {
w .lease .NotifyExpiration ()
}
}
if err := h .broker .WriteServerState (&info , ws , h .interval *2 ); err != nil {
h .logger .Errorf ("Failed to write server state data: %v" , err )
}
for qname , ids := range idsByQueue {
expirationTime , err := h .broker .ExtendLease (qname , ids ...)
if err != nil {
h .logger .Errorf ("Failed to extend lease for tasks %v: %v" , ids , err )
continue
}
for _ , id := range ids {
if l := h .workers [id ].lease ; !l .Reset (expirationTime ) {
h .logger .Warnf ("Lease reset failed for %s; lease deadline: %v" , id , l .Deadline ())
}
}
}
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .