Source File
sweeper.go
Belonging Package
github.com/libp2p/go-flow-metrics
package flow

import (
	"math"
	"sync"
	"time"

	// NOTE(review): the import list was emptied by the page extraction. The
	// stdlib entries follow from math.Exp, sync.Once/RWMutex and time.Second
	// below; the clock package path is presumed to be benbjohnson/clock
	// (New, Clock, Ticker, Now) — confirm against the module's go.mod.
	"github.com/benbjohnson/clock"
)

// IdleRate the rate at which we declare a meter idle (and stop tracking it
// until it's re-registered).
//
// The default ensures that 1 event every ~30s will keep the meter from going
// idle.
var IdleRate = 1e-13

// Alpha for EWMA of 1s
var alpha = 1 - math.Exp(-1.0)

// The global sweeper.
var globalSweeper sweeper

// cl is the clock the sweeper reads the time and creates its ticker from.
var cl = clock.New()

// SetClock sets a clock to use in the sweeper.
// This will probably only ever be useful for testing purposes.
func SetClock(c clock.Clock) {
	cl = c
}

// We tick every second.
var ewmaRate = time.Second

// sweeper periodically walks the registered meters and refreshes their
// snapshots (total, rate, last update time).
type sweeper struct {
	// sweepOnce guards the one-time start of the sweeper goroutine.
	sweepOnce sync.Once

	// snapshotMu is held (write-locked) for the whole of update.
	snapshotMu sync.RWMutex
	// meters holds every tracked meter; the first activeMeters entries have
	// already been through at least one update pass.
	meters       []*Meter
	activeMeters int

	// lastUpdateTime is the clock reading taken by the last update that was
	// not skipped for being too close to the previous one.
	lastUpdateTime time.Time
	// registerChannel carries meters waiting to be (re-)registered.
	registerChannel chan *Meter
}

// start creates the registration channel and launches the sweeper goroutine.
// It is invoked through sweepOnce, so it runs at most once per sweeper.
func (sw *sweeper) start() {
	sw.registerChannel = make(chan *Meter, 16)
	go sw.run()
}

// run is the body of the sweeper goroutine: it waits for a meter to be
// registered and then sweeps until the meter list is empty again.
func (sw *sweeper) run() {
	for m := range sw.registerChannel {
		sw.register(m)
		sw.runActive()
	}
}

// register appends m to the meter list unless it is already flagged as
// registered.
func (sw *sweeper) register(m *Meter) {
	if m.registered {
		// registered twice, move on.
		return
	}
	m.registered = true
	sw.meters = append(sw.meters, m)
}

// runActive ticks every ewmaRate, updating all meters on each tick and
// accepting new registrations in between, until no meters remain.
func (sw *sweeper) runActive() {
	ticker := cl.Ticker(ewmaRate)
	defer ticker.Stop()

	sw.lastUpdateTime = cl.Now()
	for len(sw.meters) > 0 {
		// Scale back allocation.
		if len(sw.meters)*2 < cap(sw.meters) {
			shrunk := make([]*Meter, len(sw.meters))
			copy(shrunk, sw.meters)
			sw.meters = shrunk
		}

		select {
		case <-ticker.C:
			sw.update()
		case m := <-sw.registerChannel:
			sw.register(m)
		}
	}
	sw.meters = nil
	// Till next time.
}

// update performs one sweep under snapshotMu:
//
//  1. recompute the rate of every already-active meter and drop the ones
//     whose rate fell to IdleRate or below,
//  2. fold the stored total back into the accumulators of newly registered
//     meters,
//  3. compact the meter list and mark everything left as active.
func (sw *sweeper) update() {
	sw.snapshotMu.Lock()
	defer sw.snapshotMu.Unlock()

	now := cl.Now()
	elapsed := now.Sub(sw.lastUpdateTime)
	if elapsed < 0 {
		// we went back in time, skip this update.
		// note: if we go _forward_ in time, we don't really care as
		// we'll just log really low bandwidth for a second.
		sw.lastUpdateTime = now

		// update the totals but leave the rates alone.
		for _, m := range sw.meters {
			m.snapshot.Total = m.accumulator.Load()
		}
		return
	}
	if elapsed <= ewmaRate/10 {
		// If the time-delta is too small, wait a bit. Otherwise, we can end up logging a
		// very large spike.
		//
		// This won't fix the case where a user passes a large update (spanning multiple
		// seconds) to `Meter.Mark`, but it will fix the case where the system fails to
		// accurately schedule the sweeper goroutine.
		return
	}

	sw.lastUpdateTime = now
	// Scales a per-elapsed delta to a per-ewmaRate delta.
	timeMultiplier := float64(ewmaRate) / float64(elapsed)

	// Calculate the bandwidth for all active meters.
	for i, m := range sw.meters[:sw.activeMeters] {
		total := m.accumulator.Load()
		diff := total - m.snapshot.Total
		instant := timeMultiplier * float64(diff)

		if diff > 0 {
			m.snapshot.LastUpdate = now
		}

		if m.snapshot.Rate == 0 {
			// No previous rate: seed the average with the instantaneous value.
			m.snapshot.Rate = instant
		} else {
			m.snapshot.Rate += alpha * (instant - m.snapshot.Rate)
		}
		m.snapshot.Total = total

		// This is equivalent to one zeros, then one, then 30 zeros.
		// We'll consider that to be "idle".
		if m.snapshot.Rate > IdleRate {
			continue
		}

		// Ok, so we are idle...

		// Mark this as idle by zeroing the accumulator.
		swapped := m.accumulator.Swap(0)

		// So..., are we really idle?
		if swapped > total {
			// Not so idle...
			// Now we need to make sure this gets re-registered.

			// First, add back what we removed. If we can do this
			// fast enough, we can put it back before anyone
			// notices.
			current := m.accumulator.Add(swapped)

			// Did we make it?
			if current == swapped {
				// Yes! Nobody noticed, move along.
				continue
			}
			// No. Someone noticed and will (or has) put back into
			// the registration channel.
			//
			// Remove the snapshot total, it'll get added back on
			// registration.
			//
			// `^uint64(total - 1)` is the two's complement of
			// `total`. It's the "correct" way to subtract
			// atomically in go.
			m.accumulator.Add(^uint64(m.snapshot.Total - 1))
		}

		// Reset the rate, keep the total.
		m.registered = false
		m.snapshot.Rate = 0
		sw.meters[i] = nil
	}

	// Re-add the total to all the newly active accumulators and set the snapshot to the total.
	// 1. We don't do this on register to avoid having to take the snapshot lock.
	// 2. We skip calculating the bandwidth for this round so we get an _accurate_ bandwidth calculation.
	for _, m := range sw.meters[sw.activeMeters:] {
		total := m.accumulator.Add(m.snapshot.Total)
		if total > m.snapshot.Total {
			m.snapshot.LastUpdate = now
		}
		m.snapshot.Total = total
	}

	// compress and trim the meter list
	var kept int
	for _, m := range sw.meters {
		if m != nil {
			sw.meters[kept] = m
			kept++
		}
	}
	sw.meters = sw.meters[:kept]

	// Finally, mark all meters still in the list as "active".
	sw.activeMeters = len(sw.meters)
}

// Register starts the sweeper goroutine on first use and queues m on the
// registration channel.
//
// NOTE(review): the method name was lost in the page extraction and nothing in
// this file calls it; "Register" is presumed from the upstream package —
// confirm against the callers in the Meter implementation.
func (sw *sweeper) Register(m *Meter) {
	sw.sweepOnce.Do(sw.start)
	sw.registerChannel <- m
}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64) Golds is a Go 101 project developed by Tapir Liu. PRs and bug reports are welcome and can be submitted to the issue list. Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.