Skip to content

Commit

Permalink
Merge branch 'main' into http-client-request-body-bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
TommyCpp authored Jan 8, 2025
2 parents e0156e6 + e05979d commit c643683
Show file tree
Hide file tree
Showing 12 changed files with 112 additions and 170 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ for specific dates and for Zoom meeting links. "OTel Rust SIG" is the name of
meeting for this group.

Meeting notes are available as a public [Google
doc](https://docs.google.com/document/d/1tGKuCsSnyT2McDncVJrMgg74_z8V06riWZa0Sr79I_4/edit).
doc](https://docs.google.com/document/d/12upOzNk8c3SFTjsL6IRohCWMgzLKoknSCOOdMakbWo4/edit).
If you have trouble accessing the doc, please get in touch on
[Slack](https://cloud-native.slack.com/archives/C03GDP0H023).

Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,15 @@
//! * `gzip-tonic`: Use gzip compression for `tonic` grpc layer.
//! * `zstd-tonic`: Use zstd compression for `tonic` grpc layer.
//! * `tls-roots`: Adds system trust roots to rustls-based gRPC clients using the rustls-native-certs crate
//! * `tls-webkpi-roots`: Embeds Mozilla's trust roots to rustls-based gRPC clients using the webkpi-roots crate
//! * `tls-webpki-roots`: Embeds Mozilla's trust roots to rustls-based gRPC clients using the webpki-roots crate
//!
//! The following feature flags offer additional configurations on http:
//!
//! * `http-proto`: Use http as transport layer, protobuf as body format.
//! * `reqwest-blocking-client`: Use reqwest blocking http client.
//! * `reqwest-client`: Use reqwest http client.
//! * `reqwest-rustls`: Use reqwest with TLS with system trust roots via `rustls-native-certs` crate.
//! * `reqwest-rustls-webkpi-roots`: Use reqwest with TLS with Mozilla's trust roots via `webkpi-roots` crate.
//! * `reqwest-rustls-webpki-roots`: Use reqwest with TLS with Mozilla's trust roots via `webpki-roots` crate.
//!
//! # Kitchen Sink Full Configuration
//!
Expand Down
1 change: 1 addition & 0 deletions opentelemetry-otlp/tests/integration_test/tests/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ mod logtests {
Ok(())
}

#[ignore = "TODO: [Fix Me] Failing on CI. Needs to be investigated and resolved."]
#[test]
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
pub fn logs_batch_non_tokio_main() -> Result<()> {
Expand Down
1 change: 1 addition & 0 deletions opentelemetry-otlp/tests/integration_test/tests/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ pub fn test_serde() -> Result<()> {
Ok(())
}

#[ignore = "TODO: [Fix Me] Failing on CI. Needs to be investigated and resolved."]
#[test]
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
pub fn span_batch_non_tokio_main() -> Result<()> {
Expand Down
94 changes: 40 additions & 54 deletions opentelemetry-sdk/src/metrics/internal/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,6 @@ pub(crate) trait Measure<T>: Send + Sync + 'static {
fn call(&self, measurement: T, attrs: &[KeyValue]);
}

impl<F, T> Measure<T> for F
where
F: Fn(T, &[KeyValue]) + Send + Sync + 'static,
{
fn call(&self, measurement: T, attrs: &[KeyValue]) {
self(measurement, attrs)
}
}

/// Stores the aggregate of measurements into the aggregation and returns the number
/// of aggregate data-points output.
pub(crate) trait ComputeAggregation: Send + Sync + 'static {
Expand All @@ -47,15 +38,23 @@ pub(crate) trait ComputeAggregation: Send + Sync + 'static {
fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>);
}

impl<T> ComputeAggregation for T
/// Separate `measure` and `collect` functions for an aggregate.
pub(crate) struct AggregateFns<T> {
pub(crate) measure: Arc<dyn Measure<T>>,
pub(crate) collect: Arc<dyn ComputeAggregation>,
}

/// Creates aggregate functions out of aggregate instance
impl<A, T> From<A> for AggregateFns<T>
where
T: Fn(Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>)
+ Send
+ Sync
+ 'static,
A: Measure<T> + ComputeAggregation,
{
fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
self(dest)
fn from(value: A) -> Self {
let inst = Arc::new(value);
Self {
measure: inst.clone(),
collect: inst,
}
}
}

Expand Down Expand Up @@ -144,30 +143,18 @@ impl<T: Number> AggregateBuilder<T> {
}

/// Builds a last-value aggregate function input and output.
pub(crate) fn last_value(&self) -> (impl Measure<T>, impl ComputeAggregation) {
let lv = Arc::new(LastValue::new(self.temporality, self.filter.clone()));
(lv.clone(), lv)
pub(crate) fn last_value(&self) -> AggregateFns<T> {
LastValue::new(self.temporality, self.filter.clone()).into()
}

/// Builds a precomputed sum aggregate function input and output.
pub(crate) fn precomputed_sum(
&self,
monotonic: bool,
) -> (impl Measure<T>, impl ComputeAggregation) {
let s = Arc::new(PrecomputedSum::new(
self.temporality,
self.filter.clone(),
monotonic,
));

(s.clone(), s)
pub(crate) fn precomputed_sum(&self, monotonic: bool) -> AggregateFns<T> {
PrecomputedSum::new(self.temporality, self.filter.clone(), monotonic).into()
}

/// Builds a sum aggregate function input and output.
pub(crate) fn sum(&self, monotonic: bool) -> (impl Measure<T>, impl ComputeAggregation) {
let s = Arc::new(Sum::new(self.temporality, self.filter.clone(), monotonic));

(s.clone(), s)
pub(crate) fn sum(&self, monotonic: bool) -> AggregateFns<T> {
Sum::new(self.temporality, self.filter.clone(), monotonic).into()
}

/// Builds a histogram aggregate function input and output.
Expand All @@ -176,16 +163,15 @@ impl<T: Number> AggregateBuilder<T> {
boundaries: Vec<f64>,
record_min_max: bool,
record_sum: bool,
) -> (impl Measure<T>, impl ComputeAggregation) {
let h = Arc::new(Histogram::new(
) -> AggregateFns<T> {
Histogram::new(
self.temporality,
self.filter.clone(),
boundaries,
record_min_max,
record_sum,
));

(h.clone(), h)
)
.into()
}

/// Builds an exponential histogram aggregate function input and output.
Expand All @@ -195,17 +181,16 @@ impl<T: Number> AggregateBuilder<T> {
max_scale: i8,
record_min_max: bool,
record_sum: bool,
) -> (impl Measure<T>, impl ComputeAggregation) {
let h = Arc::new(ExpoHistogram::new(
) -> AggregateFns<T> {
ExpoHistogram::new(
self.temporality,
self.filter.clone(),
max_size,
max_scale,
record_min_max,
record_sum,
));

(h.clone(), h)
)
.into()
}
}

Expand All @@ -221,7 +206,7 @@ mod tests {

#[test]
fn last_value_aggregation() {
let (measure, agg) =
let AggregateFns { measure, collect } =
AggregateBuilder::<u64>::new(Temporality::Cumulative, None).last_value();
let mut a = Gauge {
data_points: vec![GaugeDataPoint {
Expand All @@ -235,7 +220,7 @@ mod tests {
let new_attributes = [KeyValue::new("b", 2)];
measure.call(2, &new_attributes[..]);

let (count, new_agg) = agg.call(Some(&mut a));
let (count, new_agg) = collect.call(Some(&mut a));

assert_eq!(count, 1);
assert!(new_agg.is_none());
Expand All @@ -247,7 +232,7 @@ mod tests {
#[test]
fn precomputed_sum_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let (measure, agg) =
let AggregateFns { measure, collect } =
AggregateBuilder::<u64>::new(temporality, None).precomputed_sum(true);
let mut a = Sum {
data_points: vec![
Expand All @@ -274,7 +259,7 @@ mod tests {
let new_attributes = [KeyValue::new("b", 2)];
measure.call(3, &new_attributes[..]);

let (count, new_agg) = agg.call(Some(&mut a));
let (count, new_agg) = collect.call(Some(&mut a));

assert_eq!(count, 1);
assert!(new_agg.is_none());
Expand All @@ -289,7 +274,8 @@ mod tests {
#[test]
fn sum_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let (measure, agg) = AggregateBuilder::<u64>::new(temporality, None).sum(true);
let AggregateFns { measure, collect } =
AggregateBuilder::<u64>::new(temporality, None).sum(true);
let mut a = Sum {
data_points: vec![
SumDataPoint {
Expand All @@ -315,7 +301,7 @@ mod tests {
let new_attributes = [KeyValue::new("b", 2)];
measure.call(3, &new_attributes[..]);

let (count, new_agg) = agg.call(Some(&mut a));
let (count, new_agg) = collect.call(Some(&mut a));

assert_eq!(count, 1);
assert!(new_agg.is_none());
Expand All @@ -330,7 +316,7 @@ mod tests {
#[test]
fn explicit_bucket_histogram_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let (measure, agg) = AggregateBuilder::<u64>::new(temporality, None)
let AggregateFns { measure, collect } = AggregateBuilder::<u64>::new(temporality, None)
.explicit_bucket_histogram(vec![1.0], true, true);
let mut a = Histogram {
data_points: vec![HistogramDataPoint {
Expand All @@ -354,7 +340,7 @@ mod tests {
let new_attributes = [KeyValue::new("b", 2)];
measure.call(3, &new_attributes[..]);

let (count, new_agg) = agg.call(Some(&mut a));
let (count, new_agg) = collect.call(Some(&mut a));

assert_eq!(count, 1);
assert!(new_agg.is_none());
Expand All @@ -373,7 +359,7 @@ mod tests {
#[test]
fn exponential_histogram_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let (measure, agg) = AggregateBuilder::<u64>::new(temporality, None)
let AggregateFns { measure, collect } = AggregateBuilder::<u64>::new(temporality, None)
.exponential_bucket_histogram(4, 20, true, true);
let mut a = ExponentialHistogram {
data_points: vec![ExponentialHistogramDataPoint {
Expand Down Expand Up @@ -406,7 +392,7 @@ mod tests {
let new_attributes = [KeyValue::new("b", 2)];
measure.call(3, &new_attributes[..]);

let (count, new_agg) = agg.call(Some(&mut a));
let (count, new_agg) = collect.call(Some(&mut a));

assert_eq!(count, 1);
assert!(new_agg.is_none());
Expand Down
Loading

0 comments on commit c643683

Please sign in to comment.