diff --git a/opentelemetry-sdk/src/metrics/internal/aggregate.rs b/opentelemetry-sdk/src/metrics/internal/aggregate.rs index aaad464747..fc9d5975c3 100644 --- a/opentelemetry-sdk/src/metrics/internal/aggregate.rs +++ b/opentelemetry-sdk/src/metrics/internal/aggregate.rs @@ -27,15 +27,6 @@ pub(crate) trait Measure: Send + Sync + 'static { fn call(&self, measurement: T, attrs: &[KeyValue]); } -impl Measure for F -where - F: Fn(T, &[KeyValue]) + Send + Sync + 'static, -{ - fn call(&self, measurement: T, attrs: &[KeyValue]) { - self(measurement, attrs) - } -} - /// Stores the aggregate of measurements into the aggregation and returns the number /// of aggregate data-points output. pub(crate) trait ComputeAggregation: Send + Sync + 'static { @@ -47,15 +38,23 @@ pub(crate) trait ComputeAggregation: Send + Sync + 'static { fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option>); } -impl ComputeAggregation for T +/// Separate `measure` and `collect` functions for an aggregate. +pub(crate) struct AggregateFns { + pub(crate) measure: Arc>, + pub(crate) collect: Arc, +} + +/// Creates aggregate functions out of aggregate instance +impl From for AggregateFns where - T: Fn(Option<&mut dyn Aggregation>) -> (usize, Option>) - + Send - + Sync - + 'static, + A: Measure + ComputeAggregation, { - fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option>) { - self(dest) + fn from(value: A) -> Self { + let inst = Arc::new(value); + Self { + measure: inst.clone(), + collect: inst, + } } } @@ -144,30 +143,18 @@ impl AggregateBuilder { } /// Builds a last-value aggregate function input and output. - pub(crate) fn last_value(&self) -> (impl Measure, impl ComputeAggregation) { - let lv = Arc::new(LastValue::new(self.temporality, self.filter.clone())); - (lv.clone(), lv) + pub(crate) fn last_value(&self) -> AggregateFns { + LastValue::new(self.temporality, self.filter.clone()).into() } /// Builds a precomputed sum aggregate function input and output. - pub(crate) fn precomputed_sum( - &self, - monotonic: bool, - ) -> (impl Measure, impl ComputeAggregation) { - let s = Arc::new(PrecomputedSum::new( - self.temporality, - self.filter.clone(), - monotonic, - )); - - (s.clone(), s) + pub(crate) fn precomputed_sum(&self, monotonic: bool) -> AggregateFns { + PrecomputedSum::new(self.temporality, self.filter.clone(), monotonic).into() } /// Builds a sum aggregate function input and output. - pub(crate) fn sum(&self, monotonic: bool) -> (impl Measure, impl ComputeAggregation) { - let s = Arc::new(Sum::new(self.temporality, self.filter.clone(), monotonic)); - - (s.clone(), s) + pub(crate) fn sum(&self, monotonic: bool) -> AggregateFns { + Sum::new(self.temporality, self.filter.clone(), monotonic).into() } /// Builds a histogram aggregate function input and output. @@ -176,16 +163,15 @@ impl AggregateBuilder { boundaries: Vec, record_min_max: bool, record_sum: bool, - ) -> (impl Measure, impl ComputeAggregation) { - let h = Arc::new(Histogram::new( + ) -> AggregateFns { + Histogram::new( self.temporality, self.filter.clone(), boundaries, record_min_max, record_sum, - )); - - (h.clone(), h) + ) + .into() } /// Builds an exponential histogram aggregate function input and output. @@ -195,17 +181,16 @@ impl AggregateBuilder { max_scale: i8, record_min_max: bool, record_sum: bool, - ) -> (impl Measure, impl ComputeAggregation) { - let h = Arc::new(ExpoHistogram::new( + ) -> AggregateFns { + ExpoHistogram::new( self.temporality, self.filter.clone(), max_size, max_scale, record_min_max, record_sum, - )); - - (h.clone(), h) + ) + .into() } } @@ -221,7 +206,7 @@ mod tests { #[test] fn last_value_aggregation() { - let (measure, agg) = + let AggregateFns { measure, collect } = AggregateBuilder::::new(Temporality::Cumulative, None).last_value(); let mut a = Gauge { data_points: vec![GaugeDataPoint { @@ -235,7 +220,7 @@ mod tests { let new_attributes = [KeyValue::new("b", 2)]; measure.call(2, &new_attributes[..]); - let (count, new_agg) = agg.call(Some(&mut a)); + let (count, new_agg) = collect.call(Some(&mut a)); assert_eq!(count, 1); assert!(new_agg.is_none()); @@ -247,7 +232,7 @@ mod tests { #[test] fn precomputed_sum_aggregation() { for temporality in [Temporality::Delta, Temporality::Cumulative] { - let (measure, agg) = + let AggregateFns { measure, collect } = AggregateBuilder::::new(temporality, None).precomputed_sum(true); let mut a = Sum { data_points: vec![ @@ -274,7 +259,7 @@ mod tests { let new_attributes = [KeyValue::new("b", 2)]; measure.call(3, &new_attributes[..]); - let (count, new_agg) = agg.call(Some(&mut a)); + let (count, new_agg) = collect.call(Some(&mut a)); assert_eq!(count, 1); assert!(new_agg.is_none()); @@ -289,7 +274,8 @@ mod tests { #[test] fn sum_aggregation() { for temporality in [Temporality::Delta, Temporality::Cumulative] { - let (measure, agg) = AggregateBuilder::::new(temporality, None).sum(true); + let AggregateFns { measure, collect } = + AggregateBuilder::::new(temporality, None).sum(true); let mut a = Sum { data_points: vec![ SumDataPoint { @@ -315,7 +301,7 @@ mod tests { let new_attributes = [KeyValue::new("b", 2)]; measure.call(3, &new_attributes[..]); - let (count, new_agg) = agg.call(Some(&mut a)); + let (count, new_agg) = collect.call(Some(&mut a)); assert_eq!(count, 1); assert!(new_agg.is_none()); @@ -330,7 +316,7 @@ mod tests { #[test] fn explicit_bucket_histogram_aggregation() { for temporality in [Temporality::Delta, Temporality::Cumulative] { - let (measure, agg) = AggregateBuilder::::new(temporality, None) + let AggregateFns { measure, collect } = AggregateBuilder::::new(temporality, None) .explicit_bucket_histogram(vec![1.0], true, true); let mut a = Histogram { data_points: vec![HistogramDataPoint { @@ -354,7 +340,7 @@ mod tests { let new_attributes = [KeyValue::new("b", 2)]; measure.call(3, &new_attributes[..]); - let (count, new_agg) = agg.call(Some(&mut a)); + let (count, new_agg) = collect.call(Some(&mut a)); assert_eq!(count, 1); assert!(new_agg.is_none()); @@ -373,7 +359,7 @@ mod tests { #[test] fn exponential_histogram_aggregation() { for temporality in [Temporality::Delta, Temporality::Cumulative] { - let (measure, agg) = AggregateBuilder::::new(temporality, None) + let AggregateFns { measure, collect } = AggregateBuilder::::new(temporality, None) .exponential_bucket_histogram(4, 20, true, true); let mut a = ExponentialHistogram { data_points: vec![ExponentialHistogramDataPoint { @@ -406,7 +392,7 @@ mod tests { let new_attributes = [KeyValue::new("b", 2)]; measure.call(3, &new_attributes[..]); - let (count, new_agg) = agg.call(Some(&mut a)); + let (count, new_agg) = collect.call(Some(&mut a)); assert_eq!(count, 1); assert!(new_agg.is_none()); diff --git a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs index b54d78bace..170f4a068d 100644 --- a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs +++ b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs @@ -1,9 +1,4 @@ -use std::{ - f64::consts::LOG2_E, - mem::replace, - ops::DerefMut, - sync::{Arc, Mutex}, -}; +use std::{f64::consts::LOG2_E, mem::replace, ops::DerefMut, sync::Mutex}; use opentelemetry::{otel_debug, KeyValue}; use std::sync::OnceLock; @@ -500,7 +495,7 @@ impl ExpoHistogram { } } -impl Measure for Arc> +impl Measure for ExpoHistogram where T: Number, { @@ -518,7 +513,7 @@ where } } -impl ComputeAggregation for Arc> +impl ComputeAggregation for ExpoHistogram where T: Number, { @@ -529,11 +524,12 @@ where } } } - #[cfg(test)] mod tests { use std::{ops::Neg, time::SystemTime}; + use tests::internal::AggregateFns; + use crate::metrics::internal::{self, AggregateBuilder}; use super::*; @@ -695,14 +691,14 @@ mod tests { ]; for test in test_cases { - let h = Arc::new(ExpoHistogram::new( + let h = ExpoHistogram::new( Temporality::Cumulative, AttributeSetFilter::new(None), 4, 20, true, true, - )); + ); for v in test.values { Measure::call(&h, v, &[]); } @@ -751,14 +747,14 @@ mod tests { ]; for test in test_cases { - let h = Arc::new(ExpoHistogram::new( + let h = ExpoHistogram::new( Temporality::Cumulative, AttributeSetFilter::new(None), 4, 20, true, true, - )); + ); for v in test.values { Measure::call(&h, v, &[]); } @@ -1252,15 +1248,6 @@ mod tests { hist_aggregation::(); } - fn box_val( - (m, ca): (impl internal::Measure, impl internal::ComputeAggregation), - ) -> ( - Box>, - Box, - ) { - (Box::new(m), Box::new(ca)) - } - fn hist_aggregation>() { let max_size = 4; let max_scale = 20; @@ -1270,12 +1257,7 @@ mod tests { #[allow(clippy::type_complexity)] struct TestCase { name: &'static str, - build: Box< - dyn Fn() -> ( - Box>, - Box, - ), - >, + build: Box AggregateFns>, input: Vec>, want: data::ExponentialHistogram, want_count: usize, @@ -1284,14 +1266,11 @@ mod tests { TestCase { name: "Delta Single", build: Box::new(move || { - box_val( - AggregateBuilder::new(Temporality::Delta, None) - .exponential_bucket_histogram( - max_size, - max_scale, - record_min_max, - record_sum, - ), + AggregateBuilder::new(Temporality::Delta, None).exponential_bucket_histogram( + max_size, + max_scale, + record_min_max, + record_sum, ) }), input: vec![vec![4, 4, 4, 2, 16, 1] @@ -1327,15 +1306,13 @@ mod tests { TestCase { name: "Cumulative Single", build: Box::new(move || { - box_val( - internal::AggregateBuilder::new(Temporality::Cumulative, None) - .exponential_bucket_histogram( - max_size, - max_scale, - record_min_max, - record_sum, - ), - ) + internal::AggregateBuilder::new(Temporality::Cumulative, None) + .exponential_bucket_histogram( + max_size, + max_scale, + record_min_max, + record_sum, + ) }), input: vec![vec![4, 4, 4, 2, 16, 1] .into_iter() @@ -1370,15 +1347,13 @@ mod tests { TestCase { name: "Delta Multiple", build: Box::new(move || { - box_val( - internal::AggregateBuilder::new(Temporality::Delta, None) - .exponential_bucket_histogram( - max_size, - max_scale, - record_min_max, - record_sum, - ), - ) + internal::AggregateBuilder::new(Temporality::Delta, None) + .exponential_bucket_histogram( + max_size, + max_scale, + record_min_max, + record_sum, + ) }), input: vec![ vec![2, 3, 8].into_iter().map(Into::into).collect(), @@ -1416,15 +1391,13 @@ mod tests { TestCase { name: "Cumulative Multiple ", build: Box::new(move || { - box_val( - internal::AggregateBuilder::new(Temporality::Cumulative, None) - .exponential_bucket_histogram( - max_size, - max_scale, - record_min_max, - record_sum, - ), - ) + internal::AggregateBuilder::new(Temporality::Cumulative, None) + .exponential_bucket_histogram( + max_size, + max_scale, + record_min_max, + record_sum, + ) }), input: vec![ vec![2, 3, 8].into_iter().map(Into::into).collect(), @@ -1462,7 +1435,7 @@ mod tests { ]; for test in test_cases { - let (in_fn, out_fn) = (test.build)(); + let AggregateFns { measure, collect } = (test.build)(); let mut got: Box = Box::new(data::ExponentialHistogram:: { data_points: vec![], @@ -1473,9 +1446,9 @@ mod tests { let mut count = 0; for n in test.input { for v in n { - in_fn.call(v, &[]) + measure.call(v, &[]) } - count = out_fn.call(Some(got.as_mut())).0 + count = collect.call(Some(got.as_mut())).0 } assert_aggregation_eq::(Box::new(test.want), got, test.name); diff --git a/opentelemetry-sdk/src/metrics/internal/histogram.rs b/opentelemetry-sdk/src/metrics/internal/histogram.rs index c79f9e4899..988f8cf359 100644 --- a/opentelemetry-sdk/src/metrics/internal/histogram.rs +++ b/opentelemetry-sdk/src/metrics/internal/histogram.rs @@ -1,6 +1,5 @@ use std::mem::replace; use std::ops::DerefMut; -use std::sync::Arc; use std::sync::Mutex; use crate::metrics::data::HistogramDataPoint; @@ -209,7 +208,7 @@ impl Histogram { } } -impl Measure for Arc> +impl Measure for Histogram where T: Number, { @@ -228,7 +227,7 @@ where } } -impl ComputeAggregation for Arc> +impl ComputeAggregation for Histogram where T: Number, { @@ -246,13 +245,13 @@ mod tests { #[test] fn check_buckets_are_selected_correctly() { - let hist = Arc::new(Histogram::::new( + let hist = Histogram::::new( Temporality::Cumulative, AttributeSetFilter::new(None), vec![1.0, 3.0, 6.0], false, false, - )); + ); for v in 1..11 { Measure::call(&hist, v, &[]); } diff --git a/opentelemetry-sdk/src/metrics/internal/last_value.rs b/opentelemetry-sdk/src/metrics/internal/last_value.rs index 8fbd92f44a..cc2176b897 100644 --- a/opentelemetry-sdk/src/metrics/internal/last_value.rs +++ b/opentelemetry-sdk/src/metrics/internal/last_value.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use crate::metrics::{ data::{self, Aggregation, GaugeDataPoint}, Temporality, @@ -128,7 +126,7 @@ impl LastValue { } } -impl Measure for Arc> +impl Measure for LastValue where T: Number, { @@ -139,7 +137,7 @@ where } } -impl ComputeAggregation for Arc> +impl ComputeAggregation for LastValue where T: Number, { diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs index 1b5a6a4de5..6316c97b23 100644 --- a/opentelemetry-sdk/src/metrics/internal/mod.rs +++ b/opentelemetry-sdk/src/metrics/internal/mod.rs @@ -13,7 +13,7 @@ use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering} use std::sync::{Arc, OnceLock, RwLock}; use aggregate::{is_under_cardinality_limit, STREAM_CARDINALITY_LIMIT}; -pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure}; +pub(crate) use aggregate::{AggregateBuilder, AggregateFns, ComputeAggregation, Measure}; pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE}; use opentelemetry::{otel_warn, KeyValue}; diff --git a/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs index 6421d85f94..b2f478e078 100644 --- a/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs @@ -6,7 +6,6 @@ use crate::metrics::Temporality; use super::aggregate::{AggregateTimeInitiator, AttributeSetFilter}; use super::{last_value::Assign, AtomicTracker, Number, ValueMap}; use super::{ComputeAggregation, Measure}; -use std::sync::Arc; use std::{collections::HashMap, sync::Mutex}; /// Summarizes a set of pre-computed sums as their arithmetic sum. @@ -124,7 +123,7 @@ impl PrecomputedSum { } } -impl Measure for Arc> +impl Measure for PrecomputedSum where T: Number, { @@ -135,7 +134,7 @@ where } } -impl ComputeAggregation for Arc> +impl ComputeAggregation for PrecomputedSum where T: Number, { diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index 7de2f7d2b5..24b656dc1f 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -1,6 +1,3 @@ -use std::sync::Arc; -use std::vec; - use crate::metrics::data::{self, Aggregation, SumDataPoint}; use crate::metrics::Temporality; use opentelemetry::KeyValue; @@ -143,7 +140,7 @@ impl Sum { } } -impl Measure for Arc> +impl Measure for Sum where T: Number, { @@ -154,7 +151,7 @@ where } } -impl ComputeAggregation for Arc> +impl ComputeAggregation for Sum where T: Number, { diff --git a/opentelemetry-sdk/src/metrics/pipeline.rs b/opentelemetry-sdk/src/metrics/pipeline.rs index 5ba4bba75f..8212d872ad 100644 --- a/opentelemetry-sdk/src/metrics/pipeline.rs +++ b/opentelemetry-sdk/src/metrics/pipeline.rs @@ -22,6 +22,8 @@ use crate::{ Resource, }; +use self::internal::AggregateFns; + use super::Aggregation; /// Connects all of the instruments created by a meter provider to a [MetricReader]. @@ -176,7 +178,7 @@ struct InstrumentSync { name: Cow<'static, str>, description: Cow<'static, str>, unit: Cow<'static, str>, - comp_agg: Box, + comp_agg: Arc, } impl fmt::Debug for InstrumentSync { @@ -385,9 +387,9 @@ where .map(|allowed| Arc::new(move |kv: &KeyValue| allowed.contains(&kv.key)) as Arc<_>); let b = AggregateBuilder::new(self.pipeline.reader.temporality(kind), filter); - let (m, ca) = match aggregate_fn(b, &agg, kind) { - Ok(Some((m, ca))) => (m, ca), - other => return other.map(|fs| fs.map(|(m, _)| m)), // Drop aggregator or error + let AggregateFns { measure, collect } = match aggregate_fn(b, &agg, kind) { + Ok(Some(inst)) => inst, + other => return other.map(|fs| fs.map(|inst| inst.measure)), // Drop aggregator or error }; self.pipeline.add_sync( @@ -396,11 +398,11 @@ where name: stream.name, description: stream.description, unit: stream.unit, - comp_agg: ca, + comp_agg: collect, }, ); - Ok(Some(m)) + Ok(Some(measure)) }); match cached { @@ -476,11 +478,6 @@ fn default_aggregation_selector(kind: InstrumentKind) -> Aggregation { } } -type AggregateFns = ( - Arc>, - Box, -); - /// Returns new aggregate functions for the given params. /// /// If the aggregation is unknown or temporality is invalid, an error is returned. @@ -489,25 +486,16 @@ fn aggregate_fn( agg: &aggregation::Aggregation, kind: InstrumentKind, ) -> MetricResult>> { - fn box_val( - (m, ca): (impl internal::Measure, impl internal::ComputeAggregation), - ) -> ( - Arc>, - Box, - ) { - (Arc::new(m), Box::new(ca)) - } - match agg { Aggregation::Default => aggregate_fn(b, &default_aggregation_selector(kind), kind), Aggregation::Drop => Ok(None), - Aggregation::LastValue => Ok(Some(box_val(b.last_value()))), + Aggregation::LastValue => Ok(Some(b.last_value())), Aggregation::Sum => { let fns = match kind { - InstrumentKind::ObservableCounter => box_val(b.precomputed_sum(true)), - InstrumentKind::ObservableUpDownCounter => box_val(b.precomputed_sum(false)), - InstrumentKind::Counter | InstrumentKind::Histogram => box_val(b.sum(true)), - _ => box_val(b.sum(false)), + InstrumentKind::ObservableCounter => b.precomputed_sum(true), + InstrumentKind::ObservableUpDownCounter => b.precomputed_sum(false), + InstrumentKind::Counter | InstrumentKind::Histogram => b.sum(true), + _ => b.sum(false), }; Ok(Some(fns)) } @@ -521,11 +509,11 @@ fn aggregate_fn( | InstrumentKind::ObservableUpDownCounter | InstrumentKind::ObservableGauge ); - Ok(Some(box_val(b.explicit_bucket_histogram( + Ok(Some(b.explicit_bucket_histogram( boundaries.to_vec(), *record_min_max, record_sum, - )))) + ))) } Aggregation::Base2ExponentialHistogram { max_size, @@ -538,12 +526,12 @@ fn aggregate_fn( | InstrumentKind::ObservableUpDownCounter | InstrumentKind::ObservableGauge ); - Ok(Some(box_val(b.exponential_bucket_histogram( + Ok(Some(b.exponential_bucket_histogram( *max_size, *max_scale, *record_min_max, record_sum, - )))) + ))) } } }