diff --git a/Cargo.toml b/Cargo.toml index f3d1aad8..bf35af1a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ parking_lot = "0.12" prometheus-client-derive-encode = { version = "0.4.1", path = "derive-encode" } prost = { version = "0.11.0", optional = true } prost-types = { version = "0.11.0", optional = true } +quantiles = "0.7.1" [dev-dependencies] async-std = { version = "1", features = ["attributes"] } diff --git a/src/encoding.rs b/src/encoding.rs index b59f2450..c912d14c 100644 --- a/src/encoding.rs +++ b/src/encoding.rs @@ -131,6 +131,21 @@ impl<'a, 'b> MetricEncoder<'a, 'b> { ) } + /// Encode a summary. + pub fn encode_summary( + &mut self, + sum: f64, + count: u64, + quantiles: &[(f64, f64)], + ) -> Result<(), std::fmt::Error> { + for_both_mut!( + self, + MetricEncoderInner, + e, + e.encode_summary::(sum, count, quantiles) + ) + } + /// Encode a metric family. pub fn encode_family<'c, 'd, S: EncodeLabelSet>( &'c mut self, diff --git a/src/encoding/protobuf.rs b/src/encoding/protobuf.rs index 500ded75..3951a7d6 100644 --- a/src/encoding/protobuf.rs +++ b/src/encoding/protobuf.rs @@ -99,6 +99,7 @@ impl From for openmetrics_data_model::MetricType { MetricType::Counter => openmetrics_data_model::MetricType::Counter, MetricType::Gauge => openmetrics_data_model::MetricType::Gauge, MetricType::Histogram => openmetrics_data_model::MetricType::Histogram, + MetricType::Summary => openmetrics_data_model::MetricType::Summary, MetricType::Info => openmetrics_data_model::MetricType::Info, MetricType::Unknown => openmetrics_data_model::MetricType::Unknown, } @@ -247,6 +248,44 @@ impl<'a> MetricEncoder<'a> { Ok(()) } + + pub fn encode_summary( + &mut self, + sum: f64, + count: u64, + quantile: &[(f64, f64)], + ) -> Result<(), std::fmt::Error> { + let quantiles = quantiles + .iter() + .enumerate() + .map(|(_, (quantile, value))| { + Ok(openmetrics_data_model::summary_value::Quantile { + quantile: *quantile, + value: *value, + }) + }) + .collect::, std::fmt::Error>>()?; + + self.family.push(openmetrics_data_model::Metric { + labels: self.labels.clone(), + metric_points: vec![openmetrics_data_model::MetricPoint { + value: Some(openmetrics_data_model::metric_point::Value::SummaryValue( + openmetrics_data_model::SummaryValue { + count, + created: None, + quantiles, + sum: Some(openmetrics_data_model::summary_value::Sum::DoubleValue( + sum, + )), + }, + )), + ..Default::default() + }], + }); + + Ok(()) + } + } impl TryFrom<&Exemplar> @@ -403,6 +442,7 @@ mod tests { use crate::metrics::family::Family; use crate::metrics::gauge::Gauge; use crate::metrics::histogram::{exponential_buckets, Histogram}; + use crate::metrics::summary::Summary; use crate::metrics::info::Info; use crate::registry::Unit; use std::borrow::Cow; @@ -643,6 +683,39 @@ mod tests { } } + #[test] + fn encode_summary() { + let mut registry = Registry::default(); + let summary = Summary::new(5, 10, vec![0.5, 0.9, 0.99], 0.01); + registry.register("my_summary", "My summary", summary.clone()); + summary.observe(1.0); + + let metric_set = encode(®istry).unwrap(); + + let family = metric_set.metric_families.first().unwrap(); + assert_eq!("my_summary", family.name); + assert_eq!("My summary.", family.help); + + assert_eq!( + openmetrics_data_model::MetricType::Summary as i32, + extract_metric_type(&metric_set) + ); + + match extract_metric_point_value(metric_set) { + openmetrics_data_model::metric_point::Value::SummaryValue(value) => { + assert_eq!( + Some(openmetrics_data_model::summary_value::Sum::DoubleValue( + 1.0 + )), + value.sum + ); + assert_eq!(1, value.count); + assert_eq!(11, value.quantile.len()); + } + _ => panic!("wrong value type"), + } + } + #[test] fn encode_histogram() { let mut registry = Registry::default(); diff --git a/src/encoding/text.rs b/src/encoding/text.rs index dea22e05..1a26f101 100644 --- a/src/encoding/text.rs +++ b/src/encoding/text.rs @@ -211,6 +211,40 @@ impl<'a, 'b> MetricEncoder<'a, 'b> { }) } + pub fn encode_summary( + &mut self, + sum: f64, + count: u64, + quantiles: &[(f64, f64)], + ) -> Result<(), std::fmt::Error> { + self.write_name_and_unit()?; + self.write_suffix("sum")?; + self.encode_labels::<()>(None)?; + self.writer.write_str(" ")?; + self.writer.write_str(dtoa::Buffer::new().format(sum))?; + self.newline()?; + + self.write_name_and_unit()?; + self.write_suffix("count")?; + self.encode_labels::<()>(None)?; + self.writer.write_str(" ")?; + self.writer.write_str(itoa::Buffer::new().format(count))?; + self.newline()?; + + for (_, (quantile, result)) in quantiles.iter().enumerate() { + self.write_name_and_unit()?; + self.encode_labels(Some(&[("quantile", *quantile)]))?; + + self.writer.write_str(" ")?; + self.writer + .write_str(result.to_string().as_str())?; + + self.newline()?; + } + + Ok(()) + } + pub fn encode_histogram( &mut self, sum: f64, @@ -511,6 +545,7 @@ mod tests { use crate::metrics::family::Family; use crate::metrics::gauge::Gauge; use crate::metrics::histogram::{exponential_buckets, Histogram}; + use crate::metrics::summary::Summary; use crate::metrics::info::Info; use crate::metrics::{counter::Counter, exemplar::CounterWithExemplar}; use pyo3::{prelude::*, types::PyModule}; @@ -723,6 +758,31 @@ mod tests { parse_with_python_client(encoded); } + #[test] + fn encode_summary() { + let mut registry = Registry::default(); + let summary = Summary::new(3, 10, vec![0.5, 0.9, 0.99], 0.0); + registry.register("my_summary", "My summary", summary.clone()); + summary.observe(0.10); + summary.observe(0.20); + summary.observe(0.30); + + let mut encoded = String::new(); + encode(&mut encoded, ®istry).unwrap(); + + let expected = "# HELP my_summary My summary.\n".to_owned() + + "# TYPE my_summary summary\n" + + "my_summary_sum 0.6000000000000001\n" + + "my_summary_count 3\n" + + "my_summary{quantile=\"0.5\"} 0.2\n" + + "my_summary{quantile=\"0.9\"} 0.3\n" + + "my_summary{quantile=\"0.99\"} 0.3\n" + + "# EOF\n"; + assert_eq!(expected, encoded); + + parse_with_python_client(encoded); + } + fn parse_with_python_client(input: String) { pyo3::prepare_freethreaded_python(); diff --git a/src/metrics.rs b/src/metrics.rs index cd389527..57500721 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -5,6 +5,7 @@ pub mod exemplar; pub mod family; pub mod gauge; pub mod histogram; +pub mod summary; pub mod info; /// A metric that is aware of its Open Metrics metric type. @@ -22,6 +23,7 @@ pub enum MetricType { Histogram, Info, Unknown, + Summary, // Not (yet) supported metric types. // // GaugeHistogram, @@ -36,6 +38,7 @@ impl MetricType { MetricType::Counter => "counter", MetricType::Gauge => "gauge", MetricType::Histogram => "histogram", + MetricType::Summary => "summary", MetricType::Info => "info", MetricType::Unknown => "unknown", } diff --git a/src/metrics/summary.rs b/src/metrics/summary.rs new file mode 100644 index 00000000..2f132334 --- /dev/null +++ b/src/metrics/summary.rs @@ -0,0 +1,164 @@ +//! Module implementing an Open Metrics summary. +//! +//! See [`Summary`] for details. + +use crate::encoding::{EncodeMetric, MetricEncoder}; + +use super::{MetricType, TypedMetric}; +use parking_lot::RwLock; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use quantiles::ckms::CKMS; + +/// Open Metrics [`Summary`] to measure distributions of discrete events. +#[derive(Debug)] +pub struct Summary { + target_quantile: Vec, + target_error: f64, + max_age_buckets: u64, + max_age_seconds: u64, + stream_duration: Duration, + inner: Arc>, +} + +impl Clone for Summary { + fn clone(&self) -> Self { + Summary { + target_quantile: self.target_quantile.clone(), + target_error: self.target_error, + max_age_buckets: self.max_age_buckets, + max_age_seconds: self.max_age_seconds, + stream_duration: self.stream_duration, + inner: self.inner.clone(), + } + } +} + +#[derive(Debug)] +pub(crate) struct InnerSummary { + sum: f64, + count: u64, + quantile_streams: Vec>, + // head_stream is like a cursor which carries the index + // of the stream in the quantile_streams that we want to query. + head_stream_idx: u64, + // timestamp at which the head_stream_idx was last rotated. + last_rotated_timestamp: Instant, +} + +impl Summary { + /// Create a new [`Summary`]. + pub fn new(max_age_buckets: u64, max_age_seconds: u64, target_quantile: Vec, target_error: f64) -> Self { + let mut streams: Vec> = Vec::new(); + for _ in 0..max_age_buckets { + streams.push(CKMS::new(target_error)); + } + + let stream_duration = Duration::from_secs(max_age_seconds / max_age_buckets); + let last_rotated_timestamp = Instant::now(); + + if target_quantile.iter().any(|&x| x > 1.0 || x < 0.0) { + panic!("Quantile value out of range"); + } + + Summary{ + max_age_buckets, + max_age_seconds, + stream_duration, + target_quantile, + target_error, + inner: Arc::new(RwLock::new(InnerSummary { + sum: Default::default(), + count: Default::default(), + quantile_streams: streams, + head_stream_idx: 0, + last_rotated_timestamp, + })) + } + } + + /// Observe the given value. + pub fn observe(&self, v: f64) { + self.rotate_buckets(); + + let mut inner = self.inner.write(); + inner.sum += v; + inner.count += 1; + + // insert quantiles into all streams/buckets. + for stream in inner.quantile_streams.iter_mut() { + stream.insert(v); + } + } + + /// Retrieve the values of the summary metric. + pub fn get(&self) -> (f64, u64, Vec<(f64, f64)>) { + self.rotate_buckets(); + + let inner = self.inner.read(); + let sum = inner.sum; + let count = inner.count; + let mut quantile_values: Vec<(f64, f64)> = Vec::new(); + + for q in self.target_quantile.iter() { + match inner.quantile_streams[inner.head_stream_idx as usize].query(*q) { + Some((_, v)) => quantile_values.push((*q, v)), + None => continue, + }; + } + (sum, count, quantile_values) + } + + fn rotate_buckets(&self) { + let mut inner = self.inner.write(); + if inner.last_rotated_timestamp.elapsed() >= self.stream_duration { + inner.last_rotated_timestamp = Instant::now(); + if inner.head_stream_idx == self.max_age_buckets { + inner.head_stream_idx = 0; + } else { + inner.head_stream_idx += 1; + } + }; + } +} + +impl TypedMetric for Summary { + const TYPE: MetricType = MetricType::Summary; +} + +impl EncodeMetric for Summary { + fn encode(&self, mut encoder: MetricEncoder) -> Result<(), std::fmt::Error> { + let (sum, count, quantiles) = self.get(); + encoder.encode_summary::<()>(sum, count, &quantiles) + } + + fn metric_type(&self) -> MetricType { + Self::TYPE + } +} + + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn summary() { + let summary = Summary::new(5, 10, vec![0.5, 0.9, 0.99], 0.01); + summary.observe(1.0); + summary.observe(5.0); + summary.observe(10.0); + + let (s, c, q) = summary.get(); + assert_eq!(16.0, s); + assert_eq!(3, c); + assert_eq!(vec![(0.5, 5.0), (0.9, 10.0), (0.99, 10.0)], q); + } + + #[test] + #[should_panic(expected="Quantile value out of range")] + fn summary_panic() { + Summary::new(5, 10, vec![1.0, 5.0, 9.0], 0.01); + } +}