From 4761fcc6294ddf253482344b2553095cc00a7e58 Mon Sep 17 00:00:00 2001
From: Matthew Boddewyn <31598686+mattbodd@users.noreply.github.com>
Date: Wed, 25 Sep 2024 11:02:04 -0700
Subject: [PATCH] Emit metric points as unique ETW events (#111)

---
 opentelemetry-etw-metrics/src/exporter/mod.rs | 433 +++++++++++++++++-
 1 file changed, 408 insertions(+), 25 deletions(-)

diff --git a/opentelemetry-etw-metrics/src/exporter/mod.rs b/opentelemetry-etw-metrics/src/exporter/mod.rs
index 84083d91..14d8bc59 100644
--- a/opentelemetry-etw-metrics/src/exporter/mod.rs
+++ b/opentelemetry-etw-metrics/src/exporter/mod.rs
@@ -4,7 +4,10 @@ use opentelemetry::{
 };
 use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
 use opentelemetry_sdk::metrics::{
-    data::{ResourceMetrics, Temporality},
+    data::{
+        self, ExponentialBucket, ExponentialHistogramDataPoint, Metric, ResourceMetrics,
+        ScopeMetrics, Temporality,
+    },
     exporter::PushMetricsExporter,
     reader::{AggregationSelector, DefaultAggregationSelector, TemporalitySelector},
     Aggregation, InstrumentKind,
@@ -63,32 +66,278 @@ impl Debug for MetricsExporter {
 
 #[async_trait]
 impl PushMetricsExporter for MetricsExporter {
     async fn export(&self, metrics: &mut ResourceMetrics) -> Result<()> {
-        let proto_message: ExportMetricsServiceRequest = (&*metrics).into();
-
-        let mut byte_array = Vec::new();
-        proto_message
-            .encode(&mut byte_array)
-            .map_err(|err| MetricsError::Other(err.to_string()))?;
-
-        if (byte_array.len()) > etw::MAX_EVENT_SIZE {
-            global::handle_error(MetricsError::Other(format!(
-                "Exporting failed due to event size {} exceeding the maximum size of {} bytes",
-                byte_array.len(),
-                etw::MAX_EVENT_SIZE
-            )));
-        } else {
-            let result = etw::write(&byte_array);
-            // TODO: Better logging/internal metrics needed here for non-failure
-            // case Uncomment the line below to see the exported bytes until a
-            // better logging solution is implemented
-            // println!("Exported {} bytes to ETW", byte_array.len());
-            if result != 0 {
-                global::handle_error(MetricsError::Other(format!(
-                    "Failed to write ETW event with error code: {}",
-                    result
-                )));
+        for scope_metric in &metrics.scope_metrics {
+            for metric in &scope_metric.metrics {
+                let mut resource_metrics = Vec::new();
+
+                let data = &metric.data.as_any();
+                if let Some(hist) = data.downcast_ref::<data::Histogram<u64>>() {
+                    for data_point in &hist.data_points {
+                        let resource_metric = ResourceMetrics {
+                            resource: metrics.resource.clone(),
+                            scope_metrics: vec![ScopeMetrics {
+                                scope: scope_metric.scope.clone(),
+                                metrics: vec![Metric {
+                                    name: metric.name.clone(),
+                                    description: metric.description.clone(),
+                                    unit: metric.unit.clone(),
+                                    data: Box::new(data::Histogram {
+                                        temporality: hist.temporality,
+                                        data_points: vec![data_point.clone()],
+                                    }),
+                                }],
+                            }],
+                        };
+                        resource_metrics.push(resource_metric);
+                    }
+                } else if let Some(hist) = data.downcast_ref::<data::Histogram<f64>>() {
+                    for data_point in &hist.data_points {
+                        let resource_metric = ResourceMetrics {
+                            resource: metrics.resource.clone(),
+                            scope_metrics: vec![ScopeMetrics {
+                                scope: scope_metric.scope.clone(),
+                                metrics: vec![Metric {
+                                    name: metric.name.clone(),
+                                    description: metric.description.clone(),
+                                    unit: metric.unit.clone(),
+                                    data: Box::new(data::Histogram {
+                                        temporality: hist.temporality,
+                                        data_points: vec![data_point.clone()],
+                                    }),
+                                }],
+                            }],
+                        };
+                        resource_metrics.push(resource_metric);
+                    }
+                } else if let Some(hist) = data.downcast_ref::<data::ExponentialHistogram<u64>>() {
+                    for data_point in &hist.data_points {
+                        let resource_metric = ResourceMetrics {
+                            resource: metrics.resource.clone(),
+                            scope_metrics: vec![ScopeMetrics {
+                                scope: scope_metric.scope.clone(),
+                                metrics: vec![Metric {
+                                    name: metric.name.clone(),
+                                    description: metric.description.clone(),
+                                    unit: metric.unit.clone(),
+                                    data: Box::new(data::ExponentialHistogram {
+                                        temporality: hist.temporality,
+                                        data_points: vec![ExponentialHistogramDataPoint {
+                                            attributes: data_point.attributes.clone(),
+                                            count: data_point.count,
+                                            start_time: data_point.start_time,
+                                            time: data_point.time,
+                                            min: data_point.min,
+                                            max: data_point.max,
+                                            sum: data_point.sum,
+                                            scale: data_point.scale,
+                                            zero_count: data_point.zero_count,
+                                            zero_threshold: data_point.zero_threshold,
+                                            positive_bucket: ExponentialBucket {
+                                                offset: data_point.positive_bucket.offset,
+                                                counts: data_point.positive_bucket.counts.clone(),
+                                            },
+                                            negative_bucket: ExponentialBucket {
+                                                offset: data_point.negative_bucket.offset,
+                                                counts: data_point.negative_bucket.counts.clone(),
+                                            },
+                                            exemplars: data_point.exemplars.clone(),
+                                        }],
+                                    }),
+                                }],
+                            }],
+                        };
+                        resource_metrics.push(resource_metric);
+                    }
+                } else if let Some(hist) = data.downcast_ref::<data::ExponentialHistogram<f64>>() {
+                    for data_point in &hist.data_points {
+                        let resource_metric = ResourceMetrics {
+                            resource: metrics.resource.clone(),
+                            scope_metrics: vec![ScopeMetrics {
+                                scope: scope_metric.scope.clone(),
+                                metrics: vec![Metric {
+                                    name: metric.name.clone(),
+                                    description: metric.description.clone(),
+                                    unit: metric.unit.clone(),
+                                    data: Box::new(data::ExponentialHistogram {
+                                        temporality: hist.temporality,
+                                        data_points: vec![ExponentialHistogramDataPoint {
+                                            attributes: data_point.attributes.clone(),
+                                            count: data_point.count,
+                                            start_time: data_point.start_time,
+                                            time: data_point.time,
+                                            min: data_point.min,
+                                            max: data_point.max,
+                                            sum: data_point.sum,
+                                            scale: data_point.scale,
+                                            zero_count: data_point.zero_count,
+                                            zero_threshold: data_point.zero_threshold,
+                                            positive_bucket: ExponentialBucket {
+                                                offset: data_point.positive_bucket.offset,
+                                                counts: data_point.positive_bucket.counts.clone(),
+                                            },
+                                            negative_bucket: ExponentialBucket {
+                                                offset: data_point.negative_bucket.offset,
+                                                counts: data_point.negative_bucket.counts.clone(),
+                                            },
+                                            exemplars: data_point.exemplars.clone(),
+                                        }],
+                                    }),
+                                }],
+                            }],
+                        };
+                        resource_metrics.push(resource_metric);
+                    }
+                } else if let Some(sum) = data.downcast_ref::<data::Sum<u64>>() {
+                    for data_point in &sum.data_points {
+                        let resource_metric = ResourceMetrics {
+                            resource: metrics.resource.clone(),
+                            scope_metrics: vec![ScopeMetrics {
+                                scope: scope_metric.scope.clone(),
+                                metrics: vec![Metric {
+                                    name: metric.name.clone(),
+                                    description: metric.description.clone(),
+                                    unit: metric.unit.clone(),
+                                    data: Box::new(data::Sum {
+                                        temporality: sum.temporality,
+                                        data_points: vec![data_point.clone()],
+                                        is_monotonic: sum.is_monotonic,
+                                    }),
+                                }],
+                            }],
+                        };
+                        resource_metrics.push(resource_metric);
+                    }
+                } else if let Some(sum) = data.downcast_ref::<data::Sum<i64>>() {
+                    for data_point in &sum.data_points {
+                        let resource_metric = ResourceMetrics {
+                            resource: metrics.resource.clone(),
+                            scope_metrics: vec![ScopeMetrics {
+                                scope: scope_metric.scope.clone(),
+                                metrics: vec![Metric {
+                                    name: metric.name.clone(),
+                                    description: metric.description.clone(),
+                                    unit: metric.unit.clone(),
+                                    data: Box::new(data::Sum {
+                                        temporality: sum.temporality,
+                                        data_points: vec![data_point.clone()],
+                                        is_monotonic: sum.is_monotonic,
+                                    }),
+                                }],
+                            }],
+                        };
+                        resource_metrics.push(resource_metric);
+                    }
+                } else if let Some(sum) = data.downcast_ref::<data::Sum<f64>>() {
+                    for data_point in &sum.data_points {
+                        let resource_metric = ResourceMetrics {
+                            resource: metrics.resource.clone(),
+                            scope_metrics: vec![ScopeMetrics {
+                                scope: scope_metric.scope.clone(),
+                                metrics: vec![Metric {
+                                    name: metric.name.clone(),
+                                    description: metric.description.clone(),
+                                    unit: metric.unit.clone(),
+                                    data: Box::new(data::Sum {
+                                        temporality: sum.temporality,
+                                        data_points: vec![data_point.clone()],
+                                        is_monotonic: sum.is_monotonic,
+                                    }),
+                                }],
+                            }],
+                        };
+                        resource_metrics.push(resource_metric);
+                    }
+                } else if let Some(gauge) = data.downcast_ref::<data::Gauge<u64>>() {
+                    for data_point in &gauge.data_points {
+                        let resource_metric = ResourceMetrics {
+                            resource: metrics.resource.clone(),
+                            scope_metrics: vec![ScopeMetrics {
+                                scope: scope_metric.scope.clone(),
+                                metrics: vec![Metric {
+                                    name: metric.name.clone(),
+                                    description: metric.description.clone(),
+                                    unit: metric.unit.clone(),
+                                    data: Box::new(data::Gauge {
+                                        data_points: vec![data_point.clone()],
+                                    }),
+                                }],
+                            }],
+                        };
+                        resource_metrics.push(resource_metric);
+                    }
+                } else if let Some(gauge) = data.downcast_ref::<data::Gauge<i64>>() {
+                    for data_point in &gauge.data_points {
+                        let resource_metric = ResourceMetrics {
+                            resource: metrics.resource.clone(),
+                            scope_metrics: vec![ScopeMetrics {
+                                scope: scope_metric.scope.clone(),
+                                metrics: vec![Metric {
+                                    name: metric.name.clone(),
+                                    description: metric.description.clone(),
+                                    unit: metric.unit.clone(),
+                                    data: Box::new(data::Gauge {
+                                        data_points: vec![data_point.clone()],
+                                    }),
+                                }],
+                            }],
+                        };
+                        resource_metrics.push(resource_metric);
+                    }
+                } else if let Some(gauge) = data.downcast_ref::<data::Gauge<f64>>() {
+                    for data_point in &gauge.data_points {
+                        let resource_metric = ResourceMetrics {
+                            resource: metrics.resource.clone(),
+                            scope_metrics: vec![ScopeMetrics {
+                                scope: scope_metric.scope.clone(),
+                                metrics: vec![Metric {
+                                    name: metric.name.clone(),
+                                    description: metric.description.clone(),
+                                    unit: metric.unit.clone(),
+                                    data: Box::new(data::Gauge {
+                                        data_points: vec![data_point.clone()],
+                                    }),
+                                }],
+                            }],
+                        };
+                        resource_metrics.push(resource_metric);
+                    }
+                } else {
+                    global::handle_error(MetricsError::Other(format!(
+                        "Unsupported aggregation type: {:?}",
+                        data
+                    )));
+                }
+
+                for resource_metric in resource_metrics {
+                    let mut byte_array = Vec::new();
+                    let proto_message: ExportMetricsServiceRequest = (&resource_metric).into();
+                    proto_message
+                        .encode(&mut byte_array)
+                        .map_err(|err| MetricsError::Other(err.to_string()))?;
+
+                    if (byte_array.len()) > etw::MAX_EVENT_SIZE {
+                        global::handle_error(MetricsError::Other(format!(
+                            "Exporting failed due to event size {} exceeding the maximum size of {} bytes",
+                            byte_array.len(),
+                            etw::MAX_EVENT_SIZE
+                        )));
+                    } else {
+                        let result = etw::write(&byte_array);
+                        // TODO: Better logging/internal metrics needed here for non-failure
+                        // case Uncomment the line below to see the exported bytes until a
+                        // better logging solution is implemented
+                        // println!("Exported {} bytes to ETW", byte_array.len());
+                        if result != 0 {
+                            global::handle_error(MetricsError::Other(format!(
+                                "Failed to write ETW event with error code: {}",
+                                result
+                            )));
+                        }
+                    }
+                }
             }
         }
+
         Ok(())
     }
@@ -102,3 +351,137 @@ impl PushMetricsExporter for MetricsExporter {
         Ok(())
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use opentelemetry::{metrics::MeterProvider as _, KeyValue};
+    use opentelemetry_sdk::{
+        metrics::{PeriodicReader, SdkMeterProvider},
+        runtime, Resource,
+    };
+
+    use crate::etw;
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn emit_metrics_that_combined_exceed_etw_max_event_size() {
+        let exporter = super::MetricsExporter::new();
+        let reader = PeriodicReader::builder(exporter, runtime::Tokio).build();
+        let meter_provider = SdkMeterProvider::builder()
+            .with_resource(Resource::new(vec![KeyValue::new(
+                "service.name",
+                "service-name",
+            )]))
+            .with_reader(reader)
+            .build();
+
+        let meter = meter_provider.meter("user-event-test");
+
+        let u64_histogram = meter
+            .u64_histogram("Testu64Histogram")
+            .with_description("u64_histogram_test_description")
+            .with_unit("u64_histogram_test_unit")
+            .init();
+
+        let f64_histogram = meter
+            .f64_histogram("TestHistogram")
+            .with_description("f64_histogram_test_description")
+            .with_unit("f64_histogram_test_unit")
+            .init();
+
+        let u64_counter = meter
+            .u64_counter("Testu64Counter")
+            .with_description("u64_counter_test_description")
+            .with_unit("u64_counter_test_units")
+            .init();
+
+        let f64_counter = meter
+            .f64_counter("Testf64Counter")
+            .with_description("f64_counter_test_description")
+            .with_unit("f64_counter_test_units")
+            .init();
+
+        let i64_counter = meter
+            .i64_up_down_counter("Testi64Counter")
+            .with_description("i64_counter_test_description")
+            .with_unit("i64_counter_test_units")
+            .init();
+
+        let u64_gauge = meter
+            .u64_gauge("Testu64Gauge")
+            .with_description("u64_gauge_test_description")
+            .with_unit("u64_gauge_test_unit")
+            .init();
+
+        let i64_gauge = meter
+            .i64_gauge("Testi64Gauge")
+            .with_description("i64_gauge_test_description")
+            .with_unit("i64_gauge_test_unit")
+            .init();
+
+        let f64_gauge = meter
+            .f64_gauge("Testf64Gauge")
+            .with_description("f64_gauge_test_description")
+            .with_unit("f64_gauge_test_unit")
+            .init();
+
+        // Create a key that is 1/10th the size of the MAX_EVENT_SIZE
+        let key_size = etw::MAX_EVENT_SIZE / 10;
+        let large_key = "a".repeat(key_size);
+
+        for index in 0..11 {
+            u64_histogram.record(
+                1,
+                [KeyValue::new(large_key.clone(), format!("{index}"))].as_ref(),
+            );
+        }
+
+        for index in 0..11 {
+            f64_histogram.record(
+                1.0,
+                [KeyValue::new(large_key.clone(), format!("{index}"))].as_ref(),
+            );
+        }
+
+        for index in 0..11 {
+            u64_counter.add(
+                1,
+                [KeyValue::new(large_key.clone(), format!("{index}"))].as_ref(),
+            );
+        }
+
+        for index in 0..11 {
+            f64_counter.add(
+                1.0,
+                [KeyValue::new(large_key.clone(), format!("{index}"))].as_ref(),
+            );
+        }
+
+        for index in 0..11 {
+            i64_counter.add(
+                1,
+                [KeyValue::new(large_key.clone(), format!("{index}"))].as_ref(),
+            );
+        }
+
+        for index in 0..11 {
+            u64_gauge.record(
+                1,
+                [KeyValue::new(large_key.clone(), format!("{index}"))].as_ref(),
+            );
+        }
+
+        for index in 0..11 {
+            i64_gauge.record(
+                1,
+                [KeyValue::new(large_key.clone(), format!("{index}"))].as_ref(),
+            );
+        }
+
+        for index in 0..11 {
+            f64_gauge.record(
+                1.0,
+                [KeyValue::new(large_key.clone(), format!("{index}"))].as_ref(),
+            );
+        }
+    }
+}