// Package quantile computes approximate quantiles over an unbounded data
// stream within low memory and CPU bounds.
//
// A small amount of accuracy is traded to achieve the above properties.
//
// Multiple streams can be merged before calling Query to generate a single set
// of results. This is meaningful when the streams represent the same type of
// data. See Merge and Samples.
//
// For more detailed information about the algorithm used, see:
//
// Effective Computation of Biased Quantiles over Data Streams
//
// http://www.cs.rutgers.edu/~muthu/bquant.pdf
package quantile import ( ) // Sample holds an observed value and meta information for compression. JSON // tags have been added for convenience. type Sample struct { Value float64 `json:",string"` Width float64 `json:",string"` Delta float64 `json:",string"` } // Samples represents a slice of samples. It implements sort.Interface. type Samples []Sample func ( Samples) () int { return len() } func ( Samples) (, int) bool { return [].Value < [].Value } func ( Samples) (, int) { [], [] = [], [] } type invariant func(s *stream, r float64) float64 // NewLowBiased returns an initialized Stream for low-biased quantiles // (e.g. 0.01, 0.1, 0.5) where the needed quantiles are not known a priori, but // error guarantees can still be given even for the lower ranks of the data // distribution. // // The provided epsilon is a relative error, i.e. the true quantile of a value // returned by a query is guaranteed to be within (1±Epsilon)*Quantile. // // See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error // properties. func ( float64) *Stream { := func( *stream, float64) float64 { return 2 * * } return newStream() } // NewHighBiased returns an initialized Stream for high-biased quantiles // (e.g. 0.01, 0.1, 0.5) where the needed quantiles are not known a priori, but // error guarantees can still be given even for the higher ranks of the data // distribution. // // The provided epsilon is a relative error, i.e. the true quantile of a value // returned by a query is guaranteed to be within 1-(1±Epsilon)*(1-Quantile). // // See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error // properties. func ( float64) *Stream { := func( *stream, float64) float64 { return 2 * * (.n - ) } return newStream() } // NewTargeted returns an initialized Stream concerned with a particular set of // quantile values that are supplied a priori. Knowing these a priori reduces // space and computation time. 
The targets map maps the desired quantiles to // their absolute errors, i.e. the true quantile of a value returned by a query // is guaranteed to be within (Quantile±Epsilon). // // See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error properties. func ( map[float64]float64) *Stream { // Convert map to slice to avoid slow iterations on a map. // ƒ is called on the hot path, so converting the map to a slice // beforehand results in significant CPU savings. := targetMapToSlice() := func( *stream, float64) float64 { var = math.MaxFloat64 var float64 for , := range { if .quantile*.n <= { = (2 * .epsilon * ) / .quantile } else { = (2 * .epsilon * (.n - )) / (1 - .quantile) } if < { = } } return } return newStream() } type target struct { quantile float64 epsilon float64 } func targetMapToSlice( map[float64]float64) []target { := make([]target, 0, len()) for , := range { := target{ quantile: , epsilon: , } = append(, ) } return } // Stream computes quantiles for a stream of float64s. It is not thread-safe by // design. Take care when using across multiple goroutines. type Stream struct { *stream b Samples sorted bool } func newStream( invariant) *Stream { := &stream{ƒ: } return &Stream{, make(Samples, 0, 500), true} } // Insert inserts v into the stream. func ( *Stream) ( float64) { .insert(Sample{Value: , Width: 1}) } func ( *Stream) ( Sample) { .b = append(.b, ) .sorted = false if len(.b) == cap(.b) { .flush() } } // Query returns the computed qth percentiles value. If s was created with // NewTargeted, and q is not in the set of quantiles provided a priori, Query // will return an unspecified result. func ( *Stream) ( float64) float64 { if !.flushed() { // Fast path when there hasn't been enough data for a flush; // this also yields better accuracy for small sets of data. 
:= len(.b) if == 0 { return 0 } := int(math.Ceil(float64() * )) if > 0 { -= 1 } .maybeSort() return .b[].Value } .flush() return .stream.query() } // Merge merges samples into the underlying streams samples. This is handy when // merging multiple streams from separate threads, database shards, etc. // // ATTENTION: This method is broken and does not yield correct results. The // underlying algorithm is not capable of merging streams correctly. func ( *Stream) ( Samples) { sort.Sort() .stream.merge() } // Reset reinitializes and clears the list reusing the samples buffer memory. func ( *Stream) () { .stream.reset() .b = .b[:0] } // Samples returns stream samples held by s. func ( *Stream) () Samples { if !.flushed() { return .b } .flush() return .stream.samples() } // Count returns the total number of samples observed in the stream // since initialization. func ( *Stream) () int { return len(.b) + .stream.count() } func ( *Stream) () { .maybeSort() .stream.merge(.b) .b = .b[:0] } func ( *Stream) () { if !.sorted { .sorted = true sort.Sort(.b) } } func ( *Stream) () bool { return len(.stream.l) > 0 } type stream struct { n float64 l []Sample ƒ invariant } func ( *stream) () { .l = .l[:0] .n = 0 } func ( *stream) ( float64) { .merge(Samples{{, 1, 0}}) } func ( *stream) ( Samples) { // TODO(beorn7): This tries to merge not only individual samples, but // whole summaries. The paper doesn't mention merging summaries at // all. Unittests show that the merging is inaccurate. Find out how to // do merges properly. var float64 := 0 for , := range { for ; < len(.l); ++ { := .l[] if .Value > .Value { // Insert at position i. .l = append(.l, Sample{}) copy(.l[+1:], .l[:]) .l[] = Sample{ .Value, .Width, math.Max(.Delta, math.Floor(.ƒ(, ))-1), // TODO(beorn7): How to calculate delta correctly? 
} ++ goto } += .Width } .l = append(.l, Sample{.Value, .Width, 0}) ++ : .n += .Width += .Width } .compress() } func ( *stream) () int { return int(.n) } func ( *stream) ( float64) float64 { := math.Ceil( * .n) += math.Ceil(.ƒ(, ) / 2) := .l[0] var float64 for , := range .l[1:] { += .Width if +.Width+.Delta > { return .Value } = } return .Value } func ( *stream) () { if len(.l) < 2 { return } := .l[len(.l)-1] := len(.l) - 1 := .n - 1 - .Width for := len(.l) - 2; >= 0; -- { := .l[] if .Width+.Width+.Delta <= .ƒ(, ) { .Width += .Width .l[] = // Remove element at i. copy(.l[:], .l[+1:]) .l = .l[:len(.l)-1] -= 1 } else { = = } -= .Width } } func ( *stream) () Samples { := make(Samples, len(.l)) copy(, .l) return }