package flow

import (
	
	
	

	
)

// IdleRate is the rate at or below which we declare a meter idle (and stop
// tracking it until it's re-registered).
//
// The default ensures that 1 event every ~30s will keep the meter from going
// idle.
var IdleRate = 1e-13

// alpha is the smoothing factor for the EWMA, 1 - e^-1. It is applied once
// per rate update of a meter.
var alpha = 1 - math.Exp(-1.0)

// globalSweeper is the package-level sweeper instance.
var globalSweeper sweeper

// cl is the clock the sweeper uses for its ticker and for timestamps. It
// defaults to clock.New().
var cl = clock.New()

// SetClock sets a clock to use in the sweeper.
// This will probably only ever be useful for testing purposes.
//
// NOTE(review): the function name, the parameter name and the right-hand side
// of the assignment are missing from this copy of the file. Presumably this is
// SetClock taking a clock.Clock and assigning it to cl — confirm against the
// upstream file and restore before building.
func ( clock.Clock) {
	cl = 
}

// ewmaRate is the interval between sweeper ticks (one second). The rate
// calculation also uses it as the reference period that a measured interval
// is scaled to, and a tenth of it is the minimum interval accepted between
// two rate updates.
var ewmaRate = time.Second

// sweeper keeps a list of meters and refreshes their snapshots on every clock
// tick. The work is done by a goroutine that is started at most once, through
// sweepOnce.
type sweeper struct {
	// sweepOnce guards the one-time start of the sweeper goroutine.
	sweepOnce sync.Once

	// snapshotMu is held for writing for the whole of each snapshot update.
	// NOTE(review): no reader of this lock is visible in this file;
	// presumably snapshot readers elsewhere take the read side — confirm.
	snapshotMu   sync.RWMutex
	// meters is the list of tracked meters. It is appended to on
	// registration, compacted at the end of each update, and set to nil once
	// it becomes empty.
	meters       []*Meter
	// activeMeters is the number of leading entries of meters that were
	// already in the list at the end of the previous update. Entries past
	// that index were registered since then.
	activeMeters int

	// lastUpdateTime is the clock reading the next update measures its
	// elapsed time against.
	lastUpdateTime  time.Time
	// registerChannel carries meters waiting to be added to the list.
	registerChannel chan *Meter
}

// Creates the registration channel (buffered, capacity 16) and launches the
// run loop in a new goroutine.
//
// NOTE(review): the receiver name and the method name are missing from this
// copy of the file. This is presumably `start`, the method value handed to
// sweepOnce.Do — confirm and restore before building.
func ( *sweeper) () {
	.registerChannel = make(chan *Meter, 16)
	go .run()
}

// Body of the sweeper goroutine. It receives from registerChannel, calls
// register, and then calls runActive. Once runActive returns, the loop goes
// back to waiting on the channel. The loop only ends if registerChannel is
// closed.
//
// NOTE(review): the receiver name, the method name, the loop variable and the
// argument to register are missing from this copy of the file. Presumably this
// is `run` and the received meter is what gets passed to register — confirm
// and restore before building.
func ( *sweeper) () {
	for  := range .registerChannel {
		.register()
		.runActive()
	}
}

// Appends a meter to the meters list and sets its registered flag. If the
// flag is already set the call does nothing, so a meter is not appended twice
// while it stays registered.
//
// NOTE(review): the receiver name, the method name, the parameter name and
// the appended value are missing from this copy of the file. The sweeper
// struct has no `registered` field, so `.registered` is presumably read and
// written on the *Meter parameter and `.meters` on the receiver — confirm and
// restore before building.
func ( *sweeper) ( *Meter) {
	if .registered {
		// registered twice, move on.
		return
	}
	.registered = true
	.meters = append(.meters, )
}

// Drives the sweeper while the meters list is non-empty. It creates a ticker
// from cl with period ewmaRate, records the current time as lastUpdateTime,
// and then loops: each iteration waits for either a tick, which triggers
// update, or a value on registerChannel, which triggers register. When the
// list becomes empty it is set to nil, the ticker is stopped and the method
// returns.
//
// NOTE(review): the receiver name, the method name and all local identifiers
// (the ticker, the replacement slice, the received meter) are missing from
// this copy of the file. This is presumably `runActive` — confirm and restore
// before building.
func ( *sweeper) () {
	 := cl.Ticker(ewmaRate)
	defer .Stop()

	.lastUpdateTime = cl.Now()
	for len(.meters) > 0 {
		// Scale back allocation.
		// When less than half of the capacity is in use, copy the list into a
		// slice of exactly its length so the larger backing array can be
		// released.
		if len(.meters)*2 < cap(.meters) {
			 := make([]*Meter, len(.meters))
			copy(, .meters)
			.meters = 
		}

		select {
		case <-.C:
			.update()
		case  := <-.registerChannel:
			.register()
		}
	}
	.meters = nil
	// Till next time.
}

// Refreshes the snapshot of every meter in the list. It is called on each
// tick and holds snapshotMu for writing for the whole call.
//
// Steps:
//  1. Measure the time elapsed since lastUpdateTime. If it is negative, only
//     the snapshot totals are refreshed. If it is at most ewmaRate/10,
//     nothing is done at all.
//  2. For the first activeMeters entries, fold the amount accumulated since
//     the last snapshot into the EWMA rate. A meter whose rate is at or
//     below IdleRate is unregistered, unless a concurrent update is detected
//     and successfully undone.
//  3. For entries past activeMeters (registered since the previous update),
//     add the snapshot total back into the accumulator and store the result
//     as the new snapshot total. No rate is computed for them this round.
//  4. Compact the list to drop nil entries and mark everything that remains
//     as active.
//
// NOTE(review): the receiver name, the method name and every local identifier
// are missing from this copy of the file, including loop variables and
// several comparison and assignment operands. This is presumably `update`.
// The comments below describe the operands as they appear from context —
// confirm against the upstream file and restore before building.
func ( *sweeper) () {
	.snapshotMu.Lock()
	defer .snapshotMu.Unlock()

	 := cl.Now()
	 := .Sub(.lastUpdateTime)
	if  < 0 {
		// we went back in time, skip this update.
		// note: if we go _forward_ in time, we don't really care as
		// we'll just log really low bandwidth for a second.
		.lastUpdateTime = 

		// update the totals but leave the rates alone.
		for ,  := range .meters {
			.snapshot.Total = .accumulator.Load()
		}
		return
	} else if  <= ewmaRate/10 {
		// If the time-delta is too small, wait a bit. Otherwise, we can end up logging a
		// very large spike.
		//
		// This won't fix the case where a user passes a large update (spanning multiple
		// seconds) to `Meter.Mark`, but it will fix the case where the system fails to
		// accurately schedule the sweeper goroutine.
		return
	}

	.lastUpdateTime = 
	// Factor that scales an amount measured over this interval to one
	// ewmaRate period (the divisor is presumably the elapsed time computed
	// above).
	 := float64(ewmaRate) / float64()

	// Calculate the bandwidth for all active meters.
	for ,  := range .meters[:.activeMeters] {
		 := .accumulator.Load()
		 :=  - .snapshot.Total
		 :=  * float64()

		if  > 0 {
			.snapshot.LastUpdate = 
		}

		// A zero rate means there is no history to smooth against (rates are
		// reset to zero when a meter goes idle), so take the new value as-is.
		// Otherwise move the rate towards the new value by a factor of alpha.
		if .snapshot.Rate == 0 {
			.snapshot.Rate = 
		} else {
			.snapshot.Rate += alpha * ( - .snapshot.Rate)
		}
		.snapshot.Total = 

		// This is equivalent to one zeros, then one, then 30 zeros.
		// We'll consider that to be "idle".
		if .snapshot.Rate > IdleRate {
			continue
		}

		// Ok, so we are idle...

		// Mark this as idle by zeroing the accumulator.
		 := .accumulator.Swap(0)

		// So..., are we really idle?
		// (Presumably this compares the value just swapped out against the
		// total loaded at the top of the iteration; a larger value means the
		// accumulator was written to in between.)
		if  >  {
			// Not so idle...
			// Now we need to make sure this gets re-registered.

			// First, add back what we removed. If we can do this
			// fast enough, we can put it back before anyone
			// notices.
			 := .accumulator.Add()

			// Did we make it?
			if  ==  {
				// Yes! Nobody noticed, move along.
				continue
			}
			// No. Someone noticed and will (or has) put back into
			// the registration channel.
			//
			// Remove the snapshot total, it'll get added back on
			// registration.
			//
			// `^uint64(total - 1)` is the two's complement of
			// `total`. It's the "correct" way to subtract
			// atomically in go.
			.accumulator.Add(^uint64(.snapshot.Total - 1))
		}

		// Reset the rate, keep the total.
		// The slot is set to nil so the compaction pass below drops it.
		.registered = false
		.snapshot.Rate = 0
		.meters[] = nil
	}

	// Re-add the total to all the newly active accumulators and set the snapshot to the total.
	// 1. We don't do this on register to avoid having to take the snapshot lock.
	// 2. We skip calculating the bandwidth for this round so we get an _accurate_ bandwidth calculation.
	for ,  := range .meters[.activeMeters:] {
		 := .accumulator.Add(.snapshot.Total)
		if  > .snapshot.Total {
			.snapshot.LastUpdate = 
		}
		.snapshot.Total = 
	}

	// compress and trim the meter list
	// Non-nil entries are moved to the front in their existing order; the
	// counter ends up as the number of entries kept.
	var  int
	for ,  := range .meters {
		if  != nil {
			.meters[] = 
			++
		}
	}

	.meters = .meters[:]

	// Finally, mark all meters still in the list as "active".
	.activeMeters = len(.meters)
}

// Hands a meter to the sweeper. The first call starts the sweeper goroutine
// through sweepOnce; every call then sends on registerChannel. The send can
// block while the channel's buffer is full.
//
// NOTE(review): the receiver name, the method name, the parameter name and
// the value sent on the channel are missing from this copy of the file.
// Presumably the *Meter parameter is what gets sent. The method name cannot
// be recovered from this file — confirm against the callers and restore
// before building.
func ( *sweeper) ( *Meter) {
	.sweepOnce.Do(.start)
	.registerChannel <- 
}