// Package quantile computes approximate quantiles over an unbounded data// stream within low memory and CPU bounds.//// A small amount of accuracy is traded to achieve the above properties.//// Multiple streams can be merged before calling Query to generate a single set// of results. This is meaningful when the streams represent the same type of// data. See Merge and Samples.//// For more detailed information about the algorithm used, see://// Effective Computation of Biased Quantiles over Data Streams//// http://www.cs.rutgers.edu/~muthu/bquant.pdf
package quantileimport ()// Sample holds an observed value and meta information for compression. JSON// tags have been added for convenience.typeSamplestruct { Value float64`json:",string"` Width float64`json:",string"` Delta float64`json:",string"`}// Samples represents a slice of samples. It implements sort.Interface.typeSamples []Samplefunc ( Samples) () int { returnlen() }func ( Samples) (, int) bool { return [].Value < [].Value }func ( Samples) (, int) { [], [] = [], [] }type invariant func(s *stream, r float64) float64// NewLowBiased returns an initialized Stream for low-biased quantiles// (e.g. 0.01, 0.1, 0.5) where the needed quantiles are not known a priori, but// error guarantees can still be given even for the lower ranks of the data// distribution.//// The provided epsilon is a relative error, i.e. the true quantile of a value// returned by a query is guaranteed to be within (1±Epsilon)*Quantile.//// See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error// properties.func ( float64) *Stream { := func( *stream, float64) float64 {return2 * * }returnnewStream()}// NewHighBiased returns an initialized Stream for high-biased quantiles// (e.g. 0.01, 0.1, 0.5) where the needed quantiles are not known a priori, but// error guarantees can still be given even for the higher ranks of the data// distribution.//// The provided epsilon is a relative error, i.e. the true quantile of a value// returned by a query is guaranteed to be within 1-(1±Epsilon)*(1-Quantile).//// See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error// properties.func ( float64) *Stream { := func( *stream, float64) float64 {return2 * * (.n - ) }returnnewStream()}// NewTargeted returns an initialized Stream concerned with a particular set of// quantile values that are supplied a priori. Knowing these a priori reduces// space and computation time. The targets map maps the desired quantiles to// their absolute errors, i.e. the true quantile of a value returned by a query// is guaranteed to be within (Quantile±Epsilon).//// See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error properties.func ( map[float64]float64) *Stream {// Convert map to slice to avoid slow iterations on a map. // ƒ is called on the hot path, so converting the map to a slice // beforehand results in significant CPU savings. := targetMapToSlice() := func( *stream, float64) float64 {var = math.MaxFloat64varfloat64for , := range {if .quantile*.n <= { = (2 * .epsilon * ) / .quantile } else { = (2 * .epsilon * (.n - )) / (1 - .quantile) }if < { = } }return }returnnewStream()}type target struct { quantile float64 epsilon float64}func targetMapToSlice( map[float64]float64) []target { := make([]target, 0, len())for , := range { := target{quantile: ,epsilon: , } = append(, ) }return}// Stream computes quantiles for a stream of float64s. It is not thread-safe by// design. Take care when using across multiple goroutines.typeStreamstruct { *stream b Samples sorted bool}func newStream( invariant) *Stream { := &stream{ƒ: }return &Stream{, make(Samples, 0, 500), true}}// Insert inserts v into the stream.func ( *Stream) ( float64) { .insert(Sample{Value: , Width: 1})}func ( *Stream) ( Sample) { .b = append(.b, ) .sorted = falseiflen(.b) == cap(.b) { .flush() }}// Query returns the computed qth percentiles value. If s was created with// NewTargeted, and q is not in the set of quantiles provided a priori, Query// will return an unspecified result.func ( *Stream) ( float64) float64 {if !.flushed() {// Fast path when there hasn't been enough data for a flush; // this also yields better accuracy for small sets of data. := len(.b)if == 0 {return0 } := int(math.Ceil(float64() * ))if > 0 { -= 1 } .maybeSort()return .b[].Value } .flush()return .stream.query()}// Merge merges samples into the underlying streams samples. This is handy when// merging multiple streams from separate threads, database shards, etc.//// ATTENTION: This method is broken and does not yield correct results. The// underlying algorithm is not capable of merging streams correctly.func ( *Stream) ( Samples) {sort.Sort() .stream.merge()}// Reset reinitializes and clears the list reusing the samples buffer memory.func ( *Stream) () { .stream.reset() .b = .b[:0]}// Samples returns stream samples held by s.func ( *Stream) () Samples {if !.flushed() {return .b } .flush()return .stream.samples()}// Count returns the total number of samples observed in the stream// since initialization.func ( *Stream) () int {returnlen(.b) + .stream.count()}func ( *Stream) () { .maybeSort() .stream.merge(.b) .b = .b[:0]}func ( *Stream) () {if !.sorted { .sorted = truesort.Sort(.b) }}func ( *Stream) () bool {returnlen(.stream.l) > 0}type stream struct { n float64 l []Sample ƒ invariant}func ( *stream) () { .l = .l[:0] .n = 0}func ( *stream) ( float64) { .merge(Samples{{, 1, 0}})}func ( *stream) ( Samples) {// TODO(beorn7): This tries to merge not only individual samples, but // whole summaries. The paper doesn't mention merging summaries at // all. Unittests show that the merging is inaccurate. Find out how to // do merges properly.varfloat64 := 0for , := range {for ; < len(.l); ++ { := .l[]if .Value > .Value {// Insert at position i. .l = append(.l, Sample{})copy(.l[+1:], .l[:]) .l[] = Sample{ .Value, .Width,math.Max(.Delta, math.Floor(.ƒ(, ))-1),// TODO(beorn7): How to calculate delta correctly? } ++goto } += .Width } .l = append(.l, Sample{.Value, .Width, 0}) ++ : .n += .Width += .Width } .compress()}func ( *stream) () int {returnint(.n)}func ( *stream) ( float64) float64 { := math.Ceil( * .n) += math.Ceil(.ƒ(, ) / 2) := .l[0]varfloat64for , := range .l[1:] { += .Widthif +.Width+.Delta > {return .Value } = }return .Value}func ( *stream) () {iflen(.l) < 2 {return } := .l[len(.l)-1] := len(.l) - 1 := .n - 1 - .Widthfor := len(.l) - 2; >= 0; -- { := .l[]if .Width+.Width+.Delta <= .ƒ(, ) { .Width += .Width .l[] = // Remove element at i.copy(.l[:], .l[+1:]) .l = .l[:len(.l)-1] -= 1 } else { = = } -= .Width }}func ( *stream) () Samples { := make(Samples, len(.l))copy(, .l)return}
The pages are generated with Goldsv0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.