diff --git a/metrics-exporter-prometheus/src/builder.rs b/metrics-exporter-prometheus/src/builder.rs index 33d37040..649e126e 100644 --- a/metrics-exporter-prometheus/src/builder.rs +++ b/metrics-exporter-prometheus/src/builder.rs @@ -47,6 +47,7 @@ use metrics_util::{ use crate::common::Matcher; use crate::distribution::DistributionBuilder; use crate::recorder::{Inner, PrometheusRecorder}; +use crate::registry::AtomicStorage; use crate::{common::BuildError, PrometheusHandle}; #[cfg(any(feature = "http-listener", feature = "push-gateway"))] @@ -508,7 +509,7 @@ impl PrometheusBuilder { pub(crate) fn build_with_clock(self, clock: Clock) -> PrometheusRecorder { let inner = Inner { - registry: Registry::new(GenerationalStorage::atomic()), + registry: Registry::new(GenerationalStorage::new(AtomicStorage)), recency: Recency::new(clock, self.recency_mask, self.idle_timeout), distributions: RwLock::new(HashMap::new()), distribution_builder: DistributionBuilder::new( @@ -543,7 +544,8 @@ mod tests { #[test] fn test_render() { - let recorder = PrometheusBuilder::new().build_recorder(); + let recorder = + PrometheusBuilder::new().set_quantiles(&[0.0, 1.0]).unwrap().build_recorder(); let key = Key::from_name("basic_counter"); let counter1 = recorder.register_counter(&key); @@ -575,11 +577,6 @@ mod tests { let histogram_data = concat!( "# TYPE basic_histogram summary\n", "basic_histogram{quantile=\"0\"} 12\n", - "basic_histogram{quantile=\"0.5\"} 12\n", - "basic_histogram{quantile=\"0.9\"} 12\n", - "basic_histogram{quantile=\"0.95\"} 12\n", - "basic_histogram{quantile=\"0.99\"} 12\n", - "basic_histogram{quantile=\"0.999\"} 12\n", "basic_histogram{quantile=\"1\"} 12\n", "basic_histogram_sum 12\n", "basic_histogram_count 1\n", @@ -685,6 +682,8 @@ mod tests { let recorder = PrometheusBuilder::new() .idle_timeout(MetricKindMask::ALL, Some(Duration::from_secs(10))) + .set_quantiles(&[0.0, 1.0]) + .unwrap() .build_with_clock(clock); let key = Key::from_name("basic_counter"); @@ -708,11 +707,6 @@ mod tests { "basic_gauge -3.14\n\n", "# TYPE basic_histogram summary\n", "basic_histogram{quantile=\"0\"} 1\n", - "basic_histogram{quantile=\"0.5\"} 1\n", - "basic_histogram{quantile=\"0.9\"} 1\n", - "basic_histogram{quantile=\"0.95\"} 1\n", - "basic_histogram{quantile=\"0.99\"} 1\n", - "basic_histogram{quantile=\"0.999\"} 1\n", "basic_histogram{quantile=\"1\"} 1\n", "basic_histogram_sum 1\n", "basic_histogram_count 1\n\n", @@ -738,6 +732,8 @@ mod tests { MetricKindMask::COUNTER | MetricKindMask::HISTOGRAM, Some(Duration::from_secs(10)), ) + .set_quantiles(&[0.0, 1.0]) + .unwrap() .build_with_clock(clock); let key = Key::from_name("basic_counter"); @@ -761,11 +757,6 @@ mod tests { "basic_gauge -3.14\n\n", "# TYPE basic_histogram summary\n", "basic_histogram{quantile=\"0\"} 1\n", - "basic_histogram{quantile=\"0.5\"} 1\n", - "basic_histogram{quantile=\"0.9\"} 1\n", - "basic_histogram{quantile=\"0.95\"} 1\n", - "basic_histogram{quantile=\"0.99\"} 1\n", - "basic_histogram{quantile=\"0.999\"} 1\n", "basic_histogram{quantile=\"1\"} 1\n", "basic_histogram_sum 1\n", "basic_histogram_count 1\n\n", @@ -790,6 +781,8 @@ mod tests { let recorder = PrometheusBuilder::new() .idle_timeout(MetricKindMask::ALL, Some(Duration::from_secs(10))) + .set_quantiles(&[0.0, 1.0]) + .unwrap() .build_with_clock(clock); let key = Key::from_name("basic_counter"); @@ -813,11 +806,6 @@ mod tests { "basic_gauge -3.14\n\n", "# TYPE basic_histogram summary\n", "basic_histogram{quantile=\"0\"} 1\n", - "basic_histogram{quantile=\"0.5\"} 1\n", - "basic_histogram{quantile=\"0.9\"} 1\n", - "basic_histogram{quantile=\"0.95\"} 1\n", - "basic_histogram{quantile=\"0.99\"} 1\n", - "basic_histogram{quantile=\"0.999\"} 1\n", "basic_histogram{quantile=\"1\"} 1\n", "basic_histogram_sum 1\n", "basic_histogram_count 1\n\n", @@ -840,20 +828,10 @@ mod tests { "basic_gauge -3.14\n\n", "# TYPE basic_histogram summary\n", "basic_histogram{quantile=\"0\"} 1\n", - "basic_histogram{quantile=\"0.5\"} 1\n", - "basic_histogram{quantile=\"0.9\"} 1\n", - "basic_histogram{quantile=\"0.95\"} 1\n", - "basic_histogram{quantile=\"0.99\"} 1\n", - "basic_histogram{quantile=\"0.999\"} 1\n", "basic_histogram{quantile=\"1\"} 1\n", "basic_histogram_sum 1\n", "basic_histogram_count 1\n", "basic_histogram{type=\"special\",quantile=\"0\"} 2\n", - "basic_histogram{type=\"special\",quantile=\"0.5\"} 2\n", - "basic_histogram{type=\"special\",quantile=\"0.9\"} 2\n", - "basic_histogram{type=\"special\",quantile=\"0.95\"} 2\n", - "basic_histogram{type=\"special\",quantile=\"0.99\"} 2\n", - "basic_histogram{type=\"special\",quantile=\"0.999\"} 2\n", "basic_histogram{type=\"special\",quantile=\"1\"} 2\n", "basic_histogram_sum{type=\"special\"} 2\n", "basic_histogram_count{type=\"special\"} 1\n\n", @@ -864,11 +842,6 @@ mod tests { let expected_after = concat!( "# TYPE basic_histogram summary\n", "basic_histogram{type=\"special\",quantile=\"0\"} 2\n", - "basic_histogram{type=\"special\",quantile=\"0.5\"} 2\n", - "basic_histogram{type=\"special\",quantile=\"0.9\"} 2\n", - "basic_histogram{type=\"special\",quantile=\"0.95\"} 2\n", - "basic_histogram{type=\"special\",quantile=\"0.99\"} 2\n", - "basic_histogram{type=\"special\",quantile=\"0.999\"} 2\n", "basic_histogram{type=\"special\",quantile=\"1\"} 2\n", "basic_histogram_sum{type=\"special\"} 2\n", "basic_histogram_count{type=\"special\"} 1\n\n", diff --git a/metrics-exporter-prometheus/src/distribution.rs b/metrics-exporter-prometheus/src/distribution.rs index 06f7478b..5dcc7984 100644 --- a/metrics-exporter-prometheus/src/distribution.rs +++ b/metrics-exporter-prometheus/src/distribution.rs @@ -1,5 +1,9 @@ +use std::num::NonZeroU32; +use std::time::Duration; use std::{collections::HashMap, sync::Arc}; +use quanta::Instant; + use crate::common::Matcher; use metrics_util::{Histogram, Quantile, Summary}; @@ -18,7 +22,7 @@ pub enum Distribution { /// Computes and exposes value quantiles directly to Prometheus i.e. 50% of /// requests were faster than 200ms, and 99% of requests were faster than /// 1000ms, etc. - Summary(Summary, Arc>, f64), + Summary(RollingSummary, Arc>, f64), } impl Distribution { @@ -30,17 +34,19 @@ impl Distribution { /// Creates a summary distribution. pub fn new_summary(quantiles: Arc>) -> Distribution { - let summary = Summary::with_defaults(); + let summary = RollingSummary::default(); Distribution::Summary(summary, quantiles, 0.0) } /// Records the given `samples` in the current distribution. - pub fn record_samples(&mut self, samples: &[f64]) { + pub fn record_samples(&mut self, samples: &[(f64, Instant)]) { match self { - Distribution::Histogram(hist) => hist.record_many(samples), + Distribution::Histogram(hist) => { + hist.record_many(samples.iter().map(|(sample, _ts)| sample)); + } Distribution::Summary(hist, _, sum) => { - for sample in samples { - hist.add(*sample); + for (sample, ts) in samples { + hist.add(*sample, *ts); *sum += *sample; } } @@ -108,3 +114,302 @@ impl DistributionBuilder { "summary" } } + +#[derive(Clone)] +struct Bucket { + begin: Instant, + summary: Summary, +} + +/// A `RollingSummary` manages a list of [Summary] so that old results can be expired. +#[derive(Clone)] +pub struct RollingSummary { + // Buckets are ordered with the latest buckets first. The buckets are kept in alignment based + // on the instant of the first added bucket and the bucket_duration. There may be gaps in the + // bucket list. + buckets: Vec, + // Maximum number of buckets to track. + max_buckets: usize, + // Duration of values stored per bucket. + bucket_duration: Duration, + // This is the maximum duration a bucket will be kept. + max_bucket_duration: Duration, + // Total samples since creation of this summary. This is separate from the Summary since it is + // never reset. + count: usize, +} + +impl Default for RollingSummary { + fn default() -> Self { + RollingSummary::new(NonZeroU32::new(3).unwrap(), Duration::from_secs(20)) + } +} + +impl RollingSummary { + /// Create a new `RollingSummary` with the given number of `buckets` and `bucket-duration`. + /// + /// The summary will store quantiles over `buckets * bucket_duration` seconds. + pub fn new(buckets: std::num::NonZeroU32, bucket_duration: Duration) -> RollingSummary { + assert!(!bucket_duration.is_zero()); + let max_bucket_duration = bucket_duration * buckets.get(); + let max_buckets = buckets.get() as usize; + + RollingSummary { + buckets: Vec::with_capacity(max_buckets), + max_buckets, + bucket_duration, + max_bucket_duration, + count: 0, + } + } + + /// Add a sample `value` to the `RollingSummary` at the time `now`. + /// + /// Any values that expire at the `value_ts` are removed from the `RollingSummary`. + pub fn add(&mut self, value: f64, now: Instant) { + // The count is incremented even if this value is too old to be saved in any bucket. + self.count += 1; + + // If we can find a bucket that this value belongs in, then we can just add it in and be + // done. + for bucket in &mut self.buckets { + let end = bucket.begin + self.bucket_duration; + + // If this value belongs in a future bucket... + if now > bucket.begin + self.bucket_duration { + break; + } + + if now >= bucket.begin && now < end { + bucket.summary.add(value); + return; + } + } + + // Remove any expired buckets. + if let Some(cutoff) = now.checked_sub(self.max_bucket_duration) { + self.buckets.retain(|b| b.begin > cutoff); + } + + if self.buckets.is_empty() { + let mut summary = Summary::with_defaults(); + summary.add(value); + self.buckets.push(Bucket { begin: now, summary }); + return; + } + + // Take the first bucket time as a reference. Other buckets will be created at an offset + // of this time. We know this time is close to the value_ts, if it were much older the + // bucket would have been removed. + let reftime = self.buckets[0].begin; + + let mut summary = Summary::with_defaults(); + summary.add(value); + + // If the value is newer than the first bucket then count upwards to the new bucket time. + let mut begin; + if now > reftime { + begin = reftime + self.bucket_duration; + let mut end = begin + self.bucket_duration; + while now < begin || now >= end { + begin += self.bucket_duration; + end += self.bucket_duration; + } + + self.buckets.truncate(self.max_buckets - 1); + self.buckets.insert(0, Bucket { begin, summary }); + } else { + begin = reftime - self.bucket_duration; + while now < begin { + begin -= self.bucket_duration; + } + + self.buckets.truncate(self.max_buckets - 1); + self.buckets.push(Bucket { begin, summary }); + self.buckets.sort_unstable_by(|a, b| b.begin.cmp(&a.begin)); + } + } + + /// Return a merged Summary of all items that are valid at `now`. + /// + /// # Warning + /// + /// The snapshot `Summary::count()` contains the total number of values considered in the + /// Snapshot, which is not the full count of the `RollingSummary`. Use `RollingSummary::count()` + /// instead. + pub fn snapshot(&self, now: Instant) -> Summary { + let cutoff = now.checked_sub(self.max_bucket_duration); + let mut acc = Summary::with_defaults(); + self.buckets + .iter() + .filter(|b| if let Some(cutoff) = cutoff { b.begin > cutoff } else { true }) + .map(|b| &b.summary) + .fold(&mut acc, |acc, item| { + acc.merge(item).expect("merge can only fail if summary config inconsistent"); + acc + }); + acc + } + + /// Whether or not this summary is empty. + pub fn is_empty(&self) -> bool { + self.count() == 0 + } + + /// Gets the totoal number of samples this summary has seen so far. + pub fn count(&self) -> usize { + self.count + } + + #[cfg(test)] + fn buckets(&self) -> &Vec { + &self.buckets + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use quanta::Clock; + + #[test] + fn new_rolling_summary() { + let summary = RollingSummary::default(); + + assert_eq!(0, summary.buckets().len()); + assert_eq!(0, summary.count()); + assert!(summary.is_empty()); + } + + #[test] + fn empty_snapshot() { + let (clock, _mock) = Clock::mock(); + let summary = RollingSummary::default(); + let snapshot = summary.snapshot(clock.now()); + + assert_eq!(0, snapshot.count()); + assert_eq!(f64::INFINITY, snapshot.min()); + assert_eq!(f64::NEG_INFINITY, snapshot.max()); + assert_eq!(None, snapshot.quantile(0.5)); + } + + #[test] + fn snapshot() { + let (clock, mock) = Clock::mock(); + mock.increment(Duration::from_secs(3600)); + + let mut summary = RollingSummary::default(); + summary.add(42.0, clock.now()); + mock.increment(Duration::from_secs(20)); + summary.add(42.0, clock.now()); + mock.increment(Duration::from_secs(20)); + summary.add(42.0, clock.now()); + + let snapshot = summary.snapshot(clock.now()); + + assert_eq!(42.0, snapshot.min()); + assert_eq!(42.0, snapshot.max()); + // 42 +/- (42 * 0.0001) + assert!(Some(41.9958) < snapshot.quantile(0.5)); + assert!(Some(42.0042) > snapshot.quantile(0.5)); + } + + #[test] + fn add_first_value() { + let (clock, mock) = Clock::mock(); + mock.increment(Duration::from_secs(3600)); + + let mut summary = RollingSummary::default(); + summary.add(42.0, clock.now()); + + assert_eq!(1, summary.buckets().len()); + assert_eq!(1, summary.count()); + assert!(!summary.is_empty()); + } + + #[test] + fn add_new_head() { + let (clock, mock) = Clock::mock(); + mock.increment(Duration::from_secs(3600)); + + let mut summary = RollingSummary::default(); + summary.add(42.0, clock.now()); + mock.increment(Duration::from_secs(20)); + summary.add(42.0, clock.now()); + + assert_eq!(2, summary.buckets().len()); + } + + #[test] + fn truncate_old_buckets() { + let (clock, mock) = Clock::mock(); + mock.increment(Duration::from_secs(3600)); + + let mut summary = RollingSummary::default(); + summary.add(42.0, clock.now()); + + for _ in 0..3 { + mock.increment(Duration::from_secs(20)); + summary.add(42.0, clock.now()); + } + + assert_eq!(3, summary.buckets().len()); + } + + #[test] + fn add_to_tail() { + let (clock, mock) = Clock::mock(); + mock.increment(Duration::from_secs(3600)); + + let mut summary = RollingSummary::default(); + summary.add(42.0, clock.now()); + let mut expected = Vec::new(); + expected.push(clock.now()); + mock.decrement(Duration::from_secs(20)); + summary.add(42.0, clock.now()); + expected.push(clock.now()); + + let actual: Vec = summary.buckets().iter().map(|b| b.begin).collect(); + assert_eq!(expected, actual); + } + + #[test] + fn add_to_tail_with_gap() { + let (clock, mock) = Clock::mock(); + mock.increment(Duration::from_secs(3600)); + + let mut summary = RollingSummary::default(); + summary.add(42.0, clock.now()); + let mut expected = Vec::new(); + expected.push(clock.now()); + mock.decrement(Duration::from_secs(40)); + summary.add(42.0, clock.now()); + expected.push(clock.now()); + + let actual: Vec = summary.buckets().iter().map(|b| b.begin).collect(); + assert_eq!(expected, actual); + } + + #[test] + fn add_to_middle_gap() { + let (clock, mock) = Clock::mock(); + mock.increment(Duration::from_secs(3600)); + + let mut expected = Vec::new(); + expected.resize(3, Instant::now()); + + let mut summary = RollingSummary::default(); + summary.add(42.0, clock.now()); + expected[0] = clock.now(); + mock.decrement(Duration::from_secs(40)); + summary.add(42.0, clock.now()); + expected[2] = clock.now(); + mock.increment(Duration::from_secs(20)); + summary.add(42.0, clock.now()); + expected[1] = clock.now(); + + let actual: Vec = summary.buckets().iter().map(|b| b.begin).collect(); + assert_eq!(expected, actual); + } +} diff --git a/metrics-exporter-prometheus/src/lib.rs b/metrics-exporter-prometheus/src/lib.rs index 8f6de32c..399d942b 100644 --- a/metrics-exporter-prometheus/src/lib.rs +++ b/metrics-exporter-prometheus/src/lib.rs @@ -112,4 +112,6 @@ pub use self::builder::PrometheusBuilder; pub mod formatting; mod recorder; +mod registry; + pub use self::recorder::{PrometheusHandle, PrometheusRecorder}; diff --git a/metrics-exporter-prometheus/src/recorder.rs b/metrics-exporter-prometheus/src/recorder.rs index 4ada6e2b..74b3a061 100644 --- a/metrics-exporter-prometheus/src/recorder.rs +++ b/metrics-exporter-prometheus/src/recorder.rs @@ -4,14 +4,16 @@ use std::sync::Arc; use indexmap::IndexMap; use metrics::{Counter, Gauge, Histogram, Key, KeyName, Recorder, Unit}; -use metrics_util::registry::{GenerationalAtomicStorage, Recency, Registry}; +use metrics_util::registry::{Recency, Registry}; use parking_lot::RwLock; +use quanta::Instant; use crate::common::Snapshot; use crate::distribution::{Distribution, DistributionBuilder}; use crate::formatting::{ key_to_parts, sanitize_metric_name, write_help_line, write_metric_line, write_type_line, }; +use crate::registry::GenerationalAtomicStorage; pub(crate) struct Inner { pub registry: Registry, @@ -136,8 +138,9 @@ impl Inner { for (labels, distribution) in by_labels.drain(..) { let (sum, count) = match distribution { Distribution::Summary(summary, quantiles, sum) => { + let snapshot = summary.snapshot(Instant::now()); for quantile in quantiles.iter() { - let value = summary.quantile(quantile.value()).unwrap_or(0.0); + let value = snapshot.quantile(quantile.value()).unwrap_or(0.0); write_metric_line( &mut output, &name, diff --git a/metrics-exporter-prometheus/src/registry.rs b/metrics-exporter-prometheus/src/registry.rs new file mode 100644 index 00000000..b82cba67 --- /dev/null +++ b/metrics-exporter-prometheus/src/registry.rs @@ -0,0 +1,55 @@ +use std::sync::atomic::AtomicU64; +use std::sync::Arc; + +use metrics::HistogramFn; +use metrics_util::registry::GenerationalStorage; +use metrics_util::AtomicBucket; +use quanta::Instant; + +pub type GenerationalAtomicStorage = GenerationalStorage; + +/// Atomic metric storage for the prometheus exporter. +pub struct AtomicStorage; + +impl metrics_util::registry::Storage for AtomicStorage { + type Counter = Arc; + type Gauge = Arc; + type Histogram = Arc>; + + fn counter(&self, _: &K) -> Self::Counter { + Arc::new(AtomicU64::new(0)) + } + + fn gauge(&self, _: &K) -> Self::Gauge { + Arc::new(AtomicU64::new(0)) + } + + fn histogram(&self, _: &K) -> Self::Histogram { + Arc::new(AtomicBucketInstant::new()) + } +} + +/// An `AtomicBucket` newtype wrapper that tracks the time of value insertion. +pub struct AtomicBucketInstant { + inner: AtomicBucket<(T, Instant)>, +} + +impl AtomicBucketInstant { + fn new() -> AtomicBucketInstant { + Self { inner: AtomicBucket::new() } + } + + pub fn clear_with(&self, f: F) + where + F: FnMut(&[(T, Instant)]), + { + self.inner.clear_with(f); + } +} + +impl HistogramFn for AtomicBucketInstant { + fn record(&self, value: f64) { + let now = Instant::now(); + self.inner.push((value, now)); + } +} diff --git a/metrics-util/Cargo.toml b/metrics-util/Cargo.toml index 0d5602a8..4214a0db 100644 --- a/metrics-util/Cargo.toml +++ b/metrics-util/Cargo.toml @@ -54,7 +54,7 @@ aho-corasick = { version = "0.7", default-features = false, optional = true, fea indexmap = { version = "1", default-features = false, optional = true } parking_lot = { version = "0.11", default-features = false, optional = true } quanta = { version = "0.9.3", default-features = false, optional = true } -sketches-ddsketch = { version = "0.1.3", default-features = false, optional = true } +sketches-ddsketch = { version = "0.2.0", default-features = false, optional = true } radix_trie = { version = "0.2", default-features = false, optional = true } ordered-float = { version = "2.0", default-features = false, optional = true } num_cpus = { version = "1", default-features = false, optional = true } @@ -68,7 +68,7 @@ rand = { version = "0.8", features = ["small_rng"] } rand_distr = "0.4" getopts = "0.2" hdrhistogram = { version = "7.2", default-features = false } -sketches-ddsketch = "0.1.3" +sketches-ddsketch = "0.2.0" ndarray = "0.15" ndarray-stats = "0.5" noisy_float = "0.2" diff --git a/metrics-util/src/registry/recency.rs b/metrics-util/src/registry/recency.rs index aeb15b46..ed8ed13d 100644 --- a/metrics-util/src/registry/recency.rs +++ b/metrics-util/src/registry/recency.rs @@ -248,12 +248,15 @@ where /// method will return `true` and will update the last update time internally. If the given key /// has not been updated recently enough, the key will be removed from the given registry if the /// given generation also matches. - pub fn should_store_counter( + pub fn should_store_counter( &self, key: &K, gen: Generation, - registry: &Registry, - ) -> bool { + registry: &Registry, + ) -> bool + where + S: Storage, + { self.should_store(key, gen, registry, MetricKind::Counter, |registry, key| { registry.delete_counter(key) }) @@ -265,12 +268,10 @@ where /// method will return `true` and will update the last update time internally. If the given key /// has not been updated recently enough, the key will be removed from the given registry if the /// given generation also matches. - pub fn should_store_gauge( - &self, - key: &K, - gen: Generation, - registry: &Registry, - ) -> bool { + pub fn should_store_gauge(&self, key: &K, gen: Generation, registry: &Registry) -> bool + where + S: Storage, + { self.should_store(key, gen, registry, MetricKind::Gauge, |registry, key| { registry.delete_gauge(key) }) @@ -282,27 +283,31 @@ where /// method will return `true` and will update the last update time internally. If the given key /// has not been updated recently enough, the key will be removed from the given registry if the /// given generation also matches. - pub fn should_store_histogram( + pub fn should_store_histogram( &self, key: &K, gen: Generation, - registry: &Registry, - ) -> bool { + registry: &Registry, + ) -> bool + where + S: Storage, + { self.should_store(key, gen, registry, MetricKind::Histogram, |registry, key| { registry.delete_histogram(key) }) } - fn should_store( + fn should_store( &self, key: &K, gen: Generation, - registry: &Registry, + registry: &Registry, kind: MetricKind, delete_op: F, ) -> bool where - F: Fn(&Registry, &K) -> bool, + F: Fn(&Registry, &K) -> bool, + S: Storage, { if let Some(idle_timeout) = self.idle_timeout { if self.mask.matches(kind) { diff --git a/metrics-util/src/summary.rs b/metrics-util/src/summary.rs index 6c1772f1..cd4c5cc2 100644 --- a/metrics-util/src/summary.rs +++ b/metrics-util/src/summary.rs @@ -1,4 +1,5 @@ use sketches_ddsketch::{Config, DDSketch}; +use std::fmt; /// A quantile sketch with relative-error guarantees. /// @@ -41,12 +42,7 @@ use sketches_ddsketch::{Config, DDSketch}; /// [hdrhistogram]: https://docs.rs/hdrhistogram #[derive(Clone)] pub struct Summary { - negative: DDSketch, - positive: DDSketch, - min_value: f64, - zeroes: usize, - min: f64, - max: f64, + sketch: DDSketch, } impl Summary { @@ -68,14 +64,7 @@ impl Summary { pub fn new(alpha: f64, max_buckets: u32, min_value: f64) -> Summary { let config = Config::new(alpha, max_buckets, min_value.abs()); - Summary { - negative: DDSketch::new(config), - positive: DDSketch::new(config), - min_value: min_value.abs(), - zeroes: 0, - min: f64::INFINITY, - max: f64::NEG_INFINITY, - } + Summary { sketch: DDSketch::new(config) } } /// Creates a new [`Summary`] with default values. @@ -100,21 +89,7 @@ impl Summary { return; } - if value < self.min { - self.min = value; - } - - if value > self.max { - self.max = value; - } - - if value > self.min_value { - self.positive.add(value); - } else if value < -self.min_value { - self.negative.add(-value); - } else { - self.zeroes += 1; - } + self.sketch.add(value); } /// Gets the estimated value at the given quantile. @@ -127,40 +102,30 @@ impl Summary { pub fn quantile(&self, q: f64) -> Option { if !(0.0..=1.0).contains(&q) || self.count() == 0 { return None; - } else if q == 0.0 { - return Some(self.min()); - } else if q == 1.0 { - return Some(self.max()); } - let ncount = self.negative.count() as f64; - let pcount = self.positive.count() as f64; - let zcount = self.zeroes as f64; - // Defer rounding to the underlying sketch - let rank = q * (ncount + pcount + zcount); - - if rank <= ncount { - // Quantile lands in the negative side. - let nq = 1.0 - (rank / ncount as f64); - self.negative.quantile(nq).expect("quantile should be valid at this point").map(|v| -v) - } else if rank <= (ncount + zcount) { - // Quantile lands in the zero band. - Some(0.0) - } else { - // Quantile lands in the positive side. - let pq = (rank - (ncount + zcount)) / pcount; - self.positive.quantile(pq).expect("quantile should be valid at this point") - } + self.sketch.quantile(q).expect("quantile should be valid at this point") + } + + /// Merge another Summary into this one. + /// + /// # Errors + /// + /// This function will return an error if the other Summary was not created with the same + /// parameters. + pub fn merge(&mut self, other: &Summary) -> Result<(), MergeError> { + self.sketch.merge(&other.sketch).map_err(|_| MergeError {})?; + Ok(()) } /// Gets the minimum value this summary has seen so far. pub fn min(&self) -> f64 { - self.min + self.sketch.min().unwrap_or(f64::INFINITY) } /// Gets the maximum value this summary has seen so far. pub fn max(&self) -> f64 { - self.max + self.sketch.max().unwrap_or(f64::NEG_INFINITY) } /// Whether or not this summary is empty. @@ -170,12 +135,7 @@ impl Summary { /// Gets the number of samples in this summary. pub fn count(&self) -> usize { - self.negative.count() + self.positive.count() + self.zeroes - } - - /// Gets the number of samples in this summary by zeroes, negative, and positive counts. - pub fn detailed_count(&self) -> (usize, usize, usize) { - (self.zeroes, self.negative.count(), self.positive.count()) + self.sketch.count() } /// Gets the estimized size of this summary, in bytes. @@ -183,10 +143,21 @@ impl Summary { /// In practice, this value should be very close to the actual size, but will not be entirely /// precise. pub fn estimated_size(&self) -> usize { - std::mem::size_of::() + ((self.positive.length() + self.negative.length()) * 8) + std::mem::size_of::() + (self.sketch.length() * 8) + } +} + +#[derive(Copy, Clone, Debug)] +pub struct MergeError {} + +impl fmt::Display for MergeError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "merge error") } } +impl std::error::Error for MergeError {} + #[cfg(test)] mod tests { use super::Summary; @@ -217,30 +188,30 @@ mod tests { assert_eq!(summary.count(), 1); assert_relative_eq!(summary.min(), -420.42); assert_relative_eq!(summary.max(), -420.42); - let alpha = 0.001; let test_cases = vec![(0.1, -420.42), (0.5, -420.42), (0.9, -420.42)]; for (q, val) in test_cases { assert_relative_eq!( summary.quantile(q).expect("value should exist"), val, - max_relative = 2.0 * alpha * val + max_relative = alpha ); } summary.add(420.42); + assert_eq!(summary.count(), 2); assert_relative_eq!(summary.min(), -420.42); assert_relative_eq!(summary.max(), 420.42); assert_relative_eq!( summary.quantile(0.5).expect("value should exist"), -420.42, - max_relative = 2.0 * alpha * 420.42 + max_relative = alpha ); assert_relative_eq!( summary.quantile(0.51).expect("value should exist"), - 420.42, - max_relative = 2.0 * alpha * 420.42 + -420.42, + max_relative = alpha ); summary.add(42.42); @@ -250,16 +221,16 @@ mod tests { let test_cases = vec![ (0.333333, -420.42), - (0.333334, 42.42), + (0.333334, -420.42), (0.666666, 42.42), - (0.666667, 420.42), - (0.999999, 420.42), + (0.666667, 42.42), + (0.999999, 42.42), ]; for (q, val) in test_cases { assert_relative_eq!( summary.quantile(q).expect("value should exist"), val, - max_relative = 2.0 * alpha * val + max_relative = alpha ); } }