From 29661c0edca7b1f6b52a4d69d96a96f3a31cc4db Mon Sep 17 00:00:00 2001 From: Palash Nigam Date: Mon, 21 Nov 2022 19:54:32 +0530 Subject: [PATCH 1/3] Add Summary metric Signed-off-by: Palash Nigam --- Cargo.toml | 1 + src/encoding/text.rs | 80 ++++++++++++++++++++++ src/metrics.rs | 2 + src/metrics/summary.rs | 151 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 234 insertions(+) create mode 100644 src/metrics/summary.rs diff --git a/Cargo.toml b/Cargo.toml index a42a237f..3da3e9c6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ prometheus-client-derive-encode = { version = "0.3.0", path = "derive-encode" } prost = { version = "0.11.0", optional = true } prost-types = { version = "0.11.0", optional = true } void = { version = "1.0", optional = true } +quantiles = "0.7.1" [dev-dependencies] async-std = { version = "1", features = ["attributes"] } diff --git a/src/encoding/text.rs b/src/encoding/text.rs index 754b08a7..f6d61035 100644 --- a/src/encoding/text.rs +++ b/src/encoding/text.rs @@ -29,6 +29,7 @@ use crate::metrics::exemplar::{CounterWithExemplar, Exemplar, HistogramWithExemp use crate::metrics::family::{Family, MetricConstructor}; use crate::metrics::gauge::{self, Gauge}; use crate::metrics::histogram::Histogram; +use crate::metrics::summary::Summary; use crate::metrics::info::Info; use crate::metrics::{MetricType, TypedMetric}; use crate::registry::{Registry, Unit}; @@ -186,6 +187,7 @@ impl Encode for MetricType { MetricType::Histogram => "histogram", MetricType::Info => "info", MetricType::Unknown => "unknown", + MetricType::Summary => "summary", }; writer.write_all(t.as_bytes())?; @@ -323,6 +325,23 @@ impl<'a> BucketEncoder<'a> { }) } + /// Encode a quantile. Used for the [`Summary`] metric type. + pub fn encode_quantile(&mut self, quantile: f64) -> Result { + if self.opened_curly_brackets { + self.writer.write_all(b",")?; + } else { + self.writer.write_all(b"{")?; + } + + self.writer.write_all(b"quantile=\"")?; + quantile.encode(self.writer)?; + self.writer.write_all(b"\"}")?; + + Ok(ValueEncoder { + writer: self.writer, + }) + } + /// Signal that the metric type has no bucket. pub fn no_bucket(&mut self) -> Result { if self.opened_curly_brackets { @@ -579,6 +598,41 @@ fn encode_histogram_with_maybe_exemplars( Ok(()) } + +///////////////////////////////////////////////////////////////////////////////// +// Summary + +impl EncodeMetric for Summary { + fn encode(&self, mut encoder: Encoder) -> Result<(), std::io::Error> { + let (sum, count, quantiles) = self.get(); + + encoder + .encode_suffix("sum")? + .no_bucket()? + .encode_value(sum)? + .no_exemplar()?; + encoder + .encode_suffix("count")? + .no_bucket()? + .encode_value(count)? + .no_exemplar()?; + + for (_, (quantile, result)) in quantiles.iter().enumerate() { + let mut bucket_encoder = encoder.no_suffix()?; + let mut value_encoder = bucket_encoder.encode_quantile(*quantile)?; + let mut exemplar_encoder = value_encoder.encode_value(*result)?; + exemplar_encoder.no_exemplar()? + } + + Result::Ok(()) + } + + fn metric_type(&self) -> MetricType { + Self::TYPE + } +} + + ///////////////////////////////////////////////////////////////////////////////// // Info @@ -818,6 +872,32 @@ mod tests { parse_with_python_client(String::from_utf8(encoded).unwrap()); } + #[test] + fn encode_summary() { + let mut registry = Registry::default(); + let summary = Summary::new(3, 10, vec![0.5, 0.9, 0.99], 0.0); + registry.register("my_summary", "My summary", summary.clone()); + summary.observe(0.10); + summary.observe(0.20); + summary.observe(0.30); + + let mut encoded = Vec::new(); + + encode(&mut encoded, ®istry).unwrap(); + + let expected = "# HELP my_summary My summary.\n".to_owned() + + "# TYPE my_summary summary\n" + + "my_summary_sum 0.6000000000000001\n" + + "my_summary_count 3\n" + + "my_summary{quantile=\"0.5\"} 0.2\n" + + "my_summary{quantile=\"0.9\"} 0.3\n" + + "my_summary{quantile=\"0.99\"} 0.3\n" + + "# EOF\n"; + assert_eq!(expected, String::from_utf8(encoded.clone()).unwrap()); + + parse_with_python_client(String::from_utf8(encoded).unwrap()); + } + fn parse_with_python_client(input: String) { pyo3::prepare_freethreaded_python(); diff --git a/src/metrics.rs b/src/metrics.rs index 647fa5c7..e1a4cb2a 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -5,6 +5,7 @@ pub mod exemplar; pub mod family; pub mod gauge; pub mod histogram; +pub mod summary; pub mod info; /// A metric that is aware of its Open Metrics metric type. @@ -22,6 +23,7 @@ pub enum MetricType { Histogram, Info, Unknown, + Summary, // Not (yet) supported metric types. // // GaugeHistogram, diff --git a/src/metrics/summary.rs b/src/metrics/summary.rs new file mode 100644 index 00000000..b4a30c62 --- /dev/null +++ b/src/metrics/summary.rs @@ -0,0 +1,151 @@ +//! Module implementing an Open Metrics histogram. +//! +//! See [`Summary`] for details. + +use super::{MetricType, TypedMetric}; +//use owning_ref::OwningRef; +//use std::iter::{self, once}; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + +use quantiles::ckms::CKMS; + +/// Open Metrics [`Summary`] to measure distributions of discrete events. +#[derive(Debug)] +pub struct Summary { + target_quantile: Vec, + target_error: f64, + max_age_buckets: u64, + max_age_seconds: u64, + stream_duration: Duration, + inner: Arc>, +} + +impl Clone for Summary { + fn clone(&self) -> Self { + Summary { + target_quantile: self.target_quantile.clone(), + target_error: self.target_error, + max_age_buckets: self.max_age_buckets, + max_age_seconds: self.max_age_seconds, + stream_duration: self.stream_duration, + inner: self.inner.clone(), + } + } +} + +#[derive(Debug)] +pub(crate) struct InnerSummary { + sum: f64, + count: u64, + quantile_streams: Vec>, + // head_stream is like a cursor which carries the index + // of the stream in the quantile_streams that we want to query. + head_stream_idx: u64, + // timestamp at which the head_stream_idx was last rotated. + last_rotated_timestamp: Instant, +} + +impl Summary { + /// Create a new [`Summary`]. + pub fn new(max_age_buckets: u64, max_age_seconds: u64, target_quantile: Vec, target_error: f64) -> Self { + let mut streams: Vec> = Vec::new(); + for _ in 0..max_age_buckets { + streams.push(CKMS::new(target_error)); + } + + let stream_duration = Duration::from_secs(max_age_seconds / max_age_buckets); + let last_rotated_timestamp = Instant::now(); + + if target_quantile.iter().any(|&x| x > 1.0 || x < 0.0) { + panic!("Quantile value out of range"); + } + + Summary{ + max_age_buckets, + max_age_seconds, + stream_duration, + target_quantile, + target_error, + inner: Arc::new(Mutex::new(InnerSummary { + sum: Default::default(), + count: Default::default(), + quantile_streams: streams, + head_stream_idx: 0, + last_rotated_timestamp, + })) + } + } + + /// Observe the given value. + pub fn observe(&self, v: f64) { + self.rotate_buckets(); + + let mut inner = self.inner.lock().unwrap(); + inner.sum += v; + inner.count += 1; + + // insert quantiles into all streams/buckets. + for stream in inner.quantile_streams.iter_mut() { + stream.insert(v); + } + } + + /// Retrieve the values of the summary metric. + pub fn get(&self) -> (f64, u64, Vec<(f64, f64)>) { + self.rotate_buckets(); + + let inner = self.inner.lock().unwrap(); + let sum = inner.sum; + let count = inner.count; + let mut quantile_values: Vec<(f64, f64)> = Vec::new(); + + for q in self.target_quantile.iter() { + match inner.quantile_streams[inner.head_stream_idx as usize].query(*q) { + Some((_, v)) => quantile_values.push((*q, v)), + None => continue, + }; + } + (sum, count, quantile_values) + } + + fn rotate_buckets(&self) { + let mut inner = self.inner.lock().unwrap(); + if inner.last_rotated_timestamp.elapsed() >= self.stream_duration { + inner.last_rotated_timestamp = Instant::now(); + if inner.head_stream_idx == self.max_age_buckets { + inner.head_stream_idx = 0; + } else { + inner.head_stream_idx += 1; + } + }; + } +} + +impl TypedMetric for Summary { + const TYPE: MetricType = MetricType::Summary; +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn summary() { + let summary = Summary::new(5, 10, vec![0.5, 0.9, 0.99], 0.01); + summary.observe(1.0); + summary.observe(5.0); + summary.observe(10.0); + + let (s, c, q) = summary.get(); + assert_eq!(16.0, s); + assert_eq!(3, c); + assert_eq!(vec![(0.5, 5.0), (0.9, 10.0), (0.99, 10.0)], q); + } + + #[test] + #[should_panic(expected="Quantile value out of range")] + fn summary_panic() { + Summary::new(5, 10, vec![1.0, 5.0, 9.0], 0.01); + } +} From 9462cd6a2f95c086b142c9f8d11683953e1bbd2a Mon Sep 17 00:00:00 2001 From: Palash Nigam Date: Sun, 29 Jan 2023 00:41:35 +0530 Subject: [PATCH 2/3] Add serde style encoding for Summary Signed-off-by: Palash Nigam --- src/encoding.rs | 15 +++++++++ src/encoding/protobuf.rs | 73 ++++++++++++++++++++++++++++++++++++++++ src/encoding/text.rs | 42 ++++++++++++++++++++--- src/metrics.rs | 1 + src/metrics/summary.rs | 14 ++++++++ 5 files changed, 141 insertions(+), 4 deletions(-) diff --git a/src/encoding.rs b/src/encoding.rs index b59f2450..c912d14c 100644 --- a/src/encoding.rs +++ b/src/encoding.rs @@ -131,6 +131,21 @@ impl<'a, 'b> MetricEncoder<'a, 'b> { ) } + /// Encode a summary. + pub fn encode_summary( + &mut self, + sum: f64, + count: u64, + quantiles: &[(f64, f64)], + ) -> Result<(), std::fmt::Error> { + for_both_mut!( + self, + MetricEncoderInner, + e, + e.encode_summary::(sum, count, quantiles) + ) + } + /// Encode a metric family. pub fn encode_family<'c, 'd, S: EncodeLabelSet>( &'c mut self, diff --git a/src/encoding/protobuf.rs b/src/encoding/protobuf.rs index 500ded75..3951a7d6 100644 --- a/src/encoding/protobuf.rs +++ b/src/encoding/protobuf.rs @@ -99,6 +99,7 @@ impl From for openmetrics_data_model::MetricType { MetricType::Counter => openmetrics_data_model::MetricType::Counter, MetricType::Gauge => openmetrics_data_model::MetricType::Gauge, MetricType::Histogram => openmetrics_data_model::MetricType::Histogram, + MetricType::Summary => openmetrics_data_model::MetricType::Summary, MetricType::Info => openmetrics_data_model::MetricType::Info, MetricType::Unknown => openmetrics_data_model::MetricType::Unknown, } @@ -247,6 +248,44 @@ impl<'a> MetricEncoder<'a> { Ok(()) } + + pub fn encode_summary( + &mut self, + sum: f64, + count: u64, + quantile: &[(f64, f64)], + ) -> Result<(), std::fmt::Error> { + let quantiles = quantiles + .iter() + .enumerate() + .map(|(_, (quantile, value))| { + Ok(openmetrics_data_model::summary_value::Quantile { + quantile: *quantile, + value: *value, + }) + }) + .collect::, std::fmt::Error>>()?; + + self.family.push(openmetrics_data_model::Metric { + labels: self.labels.clone(), + metric_points: vec![openmetrics_data_model::MetricPoint { + value: Some(openmetrics_data_model::metric_point::Value::SummaryValue( + openmetrics_data_model::SummaryValue { + count, + created: None, + quantiles, + sum: Some(openmetrics_data_model::summary_value::Sum::DoubleValue( + sum, + )), + }, + )), + ..Default::default() + }], + }); + + Ok(()) + } + } impl TryFrom<&Exemplar> @@ -403,6 +442,7 @@ mod tests { use crate::metrics::family::Family; use crate::metrics::gauge::Gauge; use crate::metrics::histogram::{exponential_buckets, Histogram}; + use crate::metrics::summary::Summary; use crate::metrics::info::Info; use crate::registry::Unit; use std::borrow::Cow; @@ -643,6 +683,39 @@ mod tests { } } + #[test] + fn encode_summary() { + let mut registry = Registry::default(); + let summary = Summary::new(5, 10, vec![0.5, 0.9, 0.99], 0.01); + registry.register("my_summary", "My summary", summary.clone()); + summary.observe(1.0); + + let metric_set = encode(®istry).unwrap(); + + let family = metric_set.metric_families.first().unwrap(); + assert_eq!("my_summary", family.name); + assert_eq!("My summary.", family.help); + + assert_eq!( + openmetrics_data_model::MetricType::Summary as i32, + extract_metric_type(&metric_set) + ); + + match extract_metric_point_value(metric_set) { + openmetrics_data_model::metric_point::Value::SummaryValue(value) => { + assert_eq!( + Some(openmetrics_data_model::summary_value::Sum::DoubleValue( + 1.0 + )), + value.sum + ); + assert_eq!(1, value.count); + assert_eq!(11, value.quantile.len()); + } + _ => panic!("wrong value type"), + } + } + #[test] fn encode_histogram() { let mut registry = Registry::default(); diff --git a/src/encoding/text.rs b/src/encoding/text.rs index bcd24c85..1a26f101 100644 --- a/src/encoding/text.rs +++ b/src/encoding/text.rs @@ -211,6 +211,40 @@ impl<'a, 'b> MetricEncoder<'a, 'b> { }) } + pub fn encode_summary( + &mut self, + sum: f64, + count: u64, + quantiles: &[(f64, f64)], + ) -> Result<(), std::fmt::Error> { + self.write_name_and_unit()?; + self.write_suffix("sum")?; + self.encode_labels::<()>(None)?; + self.writer.write_str(" ")?; + self.writer.write_str(dtoa::Buffer::new().format(sum))?; + self.newline()?; + + self.write_name_and_unit()?; + self.write_suffix("count")?; + self.encode_labels::<()>(None)?; + self.writer.write_str(" ")?; + self.writer.write_str(itoa::Buffer::new().format(count))?; + self.newline()?; + + for (_, (quantile, result)) in quantiles.iter().enumerate() { + self.write_name_and_unit()?; + self.encode_labels(Some(&[("quantile", *quantile)]))?; + + self.writer.write_str(" ")?; + self.writer + .write_str(result.to_string().as_str())?; + + self.newline()?; + } + + Ok(()) + } + pub fn encode_histogram( &mut self, sum: f64, @@ -511,6 +545,7 @@ mod tests { use crate::metrics::family::Family; use crate::metrics::gauge::Gauge; use crate::metrics::histogram::{exponential_buckets, Histogram}; + use crate::metrics::summary::Summary; use crate::metrics::info::Info; use crate::metrics::{counter::Counter, exemplar::CounterWithExemplar}; use pyo3::{prelude::*, types::PyModule}; @@ -732,8 +767,7 @@ mod tests { summary.observe(0.20); summary.observe(0.30); - let mut encoded = Vec::new(); - + let mut encoded = String::new(); encode(&mut encoded, ®istry).unwrap(); let expected = "# HELP my_summary My summary.\n".to_owned() @@ -744,9 +778,9 @@ mod tests { + "my_summary{quantile=\"0.9\"} 0.3\n" + "my_summary{quantile=\"0.99\"} 0.3\n" + "# EOF\n"; - assert_eq!(expected, String::from_utf8(encoded.clone()).unwrap()); + assert_eq!(expected, encoded); - parse_with_python_client(String::from_utf8(encoded).unwrap()); + parse_with_python_client(encoded); } fn parse_with_python_client(input: String) { diff --git a/src/metrics.rs b/src/metrics.rs index a4683b5c..57500721 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -38,6 +38,7 @@ impl MetricType { MetricType::Counter => "counter", MetricType::Gauge => "gauge", MetricType::Histogram => "histogram", + MetricType::Summary => "summary", MetricType::Info => "info", MetricType::Unknown => "unknown", } diff --git a/src/metrics/summary.rs b/src/metrics/summary.rs index b4a30c62..2b39b217 100644 --- a/src/metrics/summary.rs +++ b/src/metrics/summary.rs @@ -2,6 +2,8 @@ //! //! See [`Summary`] for details. +use crate::encoding::{EncodeMetric, MetricEncoder}; + use super::{MetricType, TypedMetric}; //use owning_ref::OwningRef; //use std::iter::{self, once}; @@ -126,6 +128,18 @@ impl TypedMetric for Summary { const TYPE: MetricType = MetricType::Summary; } +impl EncodeMetric for Summary { + fn encode(&self, mut encoder: MetricEncoder) -> Result<(), std::fmt::Error> { + let (sum, count, quantiles) = self.get(); + encoder.encode_summary::<()>(sum, count, &quantiles) + } + + fn metric_type(&self) -> MetricType { + Self::TYPE + } +} + + #[cfg(test)] mod tests { use super::*; From 29f3d4bed7978ad3ae59d4d2088724d33cfab5ae Mon Sep 17 00:00:00 2001 From: Palash Nigam Date: Mon, 6 Mar 2023 12:48:39 +0530 Subject: [PATCH 3/3] Address review comments Signed-off-by: Palash Nigam --- src/metrics/summary.rs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/src/metrics/summary.rs b/src/metrics/summary.rs index 2b39b217..2f132334 100644 --- a/src/metrics/summary.rs +++ b/src/metrics/summary.rs @@ -1,13 +1,12 @@ -//! Module implementing an Open Metrics histogram. +//! Module implementing an Open Metrics summary. //! //! See [`Summary`] for details. use crate::encoding::{EncodeMetric, MetricEncoder}; use super::{MetricType, TypedMetric}; -//use owning_ref::OwningRef; -//use std::iter::{self, once}; -use std::sync::{Arc, Mutex}; +use parking_lot::RwLock; +use std::sync::Arc; use std::time::{Duration, Instant}; use quantiles::ckms::CKMS; @@ -20,7 +19,7 @@ pub struct Summary { max_age_buckets: u64, max_age_seconds: u64, stream_duration: Duration, - inner: Arc>, + inner: Arc>, } impl Clone for Summary { @@ -69,7 +68,7 @@ impl Summary { stream_duration, target_quantile, target_error, - inner: Arc::new(Mutex::new(InnerSummary { + inner: Arc::new(RwLock::new(InnerSummary { sum: Default::default(), count: Default::default(), quantile_streams: streams, @@ -83,7 +82,7 @@ impl Summary { pub fn observe(&self, v: f64) { self.rotate_buckets(); - let mut inner = self.inner.lock().unwrap(); + let mut inner = self.inner.write(); inner.sum += v; inner.count += 1; @@ -97,7 +96,7 @@ impl Summary { pub fn get(&self) -> (f64, u64, Vec<(f64, f64)>) { self.rotate_buckets(); - let inner = self.inner.lock().unwrap(); + let inner = self.inner.read(); let sum = inner.sum; let count = inner.count; let mut quantile_values: Vec<(f64, f64)> = Vec::new(); @@ -112,7 +111,7 @@ impl Summary { } fn rotate_buckets(&self) { - let mut inner = self.inner.lock().unwrap(); + let mut inner = self.inner.write(); if inner.last_rotated_timestamp.elapsed() >= self.stream_duration { inner.last_rotated_timestamp = Instant::now(); if inner.head_stream_idx == self.max_age_buckets {