From 0a2db7e4f3db68771aa95c629c0badfc792063dd Mon Sep 17 00:00:00 2001 From: Julian Tescher Date: Sun, 16 Aug 2020 13:07:34 -0700 Subject: [PATCH] Metrics refactor * Rename `global::handle` to `global::handle_error` * Fix race condition in env resource detector test * Split `LockedProcessor` into `Checkpointer` * Update for dimensionality reduction --- opentelemetry-otlp/src/proto/common.rs | 4 +- opentelemetry-otlp/src/proto/metrics.rs | 4 +- .../src/proto/metrics_service.rs | 4 +- opentelemetry-otlp/src/proto/resource.rs | 4 +- opentelemetry-otlp/src/proto/trace.rs | 4 +- opentelemetry-otlp/src/proto/trace_config.rs | 4 +- opentelemetry-otlp/src/proto/trace_service.rs | 4 +- opentelemetry-prometheus/src/lib.rs | 7 +- src/global/error_handler.rs | 4 +- src/global/mod.rs | 2 +- src/sdk/env.rs | 46 +++--- src/sdk/export/metrics/mod.rs | 31 ++-- src/sdk/metrics/controllers/pull.rs | 32 +++- src/sdk/metrics/controllers/push.rs | 14 +- src/sdk/metrics/mod.rs | 12 +- src/sdk/metrics/processors/basic.rs | 151 +++++++----------- src/sdk/mod.rs | 4 +- src/sdk/resource.rs | 36 +++-- 18 files changed, 190 insertions(+), 177 deletions(-) diff --git a/opentelemetry-otlp/src/proto/common.rs b/opentelemetry-otlp/src/proto/common.rs index 96f96774ba..27689afb53 100644 --- a/opentelemetry-otlp/src/proto/common.rs +++ b/opentelemetry-otlp/src/proto/common.rs @@ -1,4 +1,4 @@ -// This file is generated by rust-protobuf 2.16.2. Do not edit +// This file is generated by rust-protobuf 2.17.0. Do not edit // @generated // https://github.com/rust-lang/rust-clippy/issues/702 @@ -21,7 +21,7 @@ /// Generated files are compatible only with the same version /// of protobuf runtime. -// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_16_2; +// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_17_0; #[derive(PartialEq,Clone,Default)] #[cfg_attr(feature = "with-serde", derive(Serialize, Deserialize))] diff --git a/opentelemetry-otlp/src/proto/metrics.rs b/opentelemetry-otlp/src/proto/metrics.rs index ecfead15ee..459f92a876 100644 --- a/opentelemetry-otlp/src/proto/metrics.rs +++ b/opentelemetry-otlp/src/proto/metrics.rs @@ -1,4 +1,4 @@ -// This file is generated by rust-protobuf 2.16.2. Do not edit +// This file is generated by rust-protobuf 2.17.0. Do not edit // @generated // https://github.com/rust-lang/rust-clippy/issues/702 @@ -21,7 +21,7 @@ /// Generated files are compatible only with the same version /// of protobuf runtime. -// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_16_2; +// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_17_0; #[derive(PartialEq,Clone,Default)] #[cfg_attr(feature = "with-serde", derive(Serialize, Deserialize))] diff --git a/opentelemetry-otlp/src/proto/metrics_service.rs b/opentelemetry-otlp/src/proto/metrics_service.rs index 406d790bdd..ba71d16e07 100644 --- a/opentelemetry-otlp/src/proto/metrics_service.rs +++ b/opentelemetry-otlp/src/proto/metrics_service.rs @@ -1,4 +1,4 @@ -// This file is generated by rust-protobuf 2.16.2. Do not edit +// This file is generated by rust-protobuf 2.17.0. Do not edit // @generated // https://github.com/rust-lang/rust-clippy/issues/702 @@ -21,7 +21,7 @@ /// Generated files are compatible only with the same version /// of protobuf runtime. -// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_16_2; +// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_17_0; #[derive(PartialEq,Clone,Default)] #[cfg_attr(feature = "with-serde", derive(Serialize, Deserialize))] diff --git a/opentelemetry-otlp/src/proto/resource.rs b/opentelemetry-otlp/src/proto/resource.rs index 62da18efe3..5b0ee3c19b 100644 --- a/opentelemetry-otlp/src/proto/resource.rs +++ b/opentelemetry-otlp/src/proto/resource.rs @@ -1,4 +1,4 @@ -// This file is generated by rust-protobuf 2.16.2. Do not edit +// This file is generated by rust-protobuf 2.17.0. Do not edit // @generated // https://github.com/rust-lang/rust-clippy/issues/702 @@ -21,7 +21,7 @@ /// Generated files are compatible only with the same version /// of protobuf runtime. -// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_16_2; +// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_17_0; #[derive(PartialEq,Clone,Default)] #[cfg_attr(feature = "with-serde", derive(Serialize, Deserialize))] diff --git a/opentelemetry-otlp/src/proto/trace.rs b/opentelemetry-otlp/src/proto/trace.rs index d29223267e..41881dce69 100644 --- a/opentelemetry-otlp/src/proto/trace.rs +++ b/opentelemetry-otlp/src/proto/trace.rs @@ -1,4 +1,4 @@ -// This file is generated by rust-protobuf 2.16.2. Do not edit +// This file is generated by rust-protobuf 2.17.0. Do not edit // @generated // https://github.com/rust-lang/rust-clippy/issues/702 @@ -21,7 +21,7 @@ /// Generated files are compatible only with the same version /// of protobuf runtime. -// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_16_2; +// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_17_0; #[derive(PartialEq,Clone,Default)] #[cfg_attr(feature = "with-serde", derive(Serialize, Deserialize))] diff --git a/opentelemetry-otlp/src/proto/trace_config.rs b/opentelemetry-otlp/src/proto/trace_config.rs index 7a62243e51..cf86d76f0c 100644 --- a/opentelemetry-otlp/src/proto/trace_config.rs +++ b/opentelemetry-otlp/src/proto/trace_config.rs @@ -1,4 +1,4 @@ -// This file is generated by rust-protobuf 2.16.2. Do not edit +// This file is generated by rust-protobuf 2.17.0. Do not edit // @generated // https://github.com/rust-lang/rust-clippy/issues/702 @@ -21,7 +21,7 @@ /// Generated files are compatible only with the same version /// of protobuf runtime. -// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_16_2; +// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_17_0; #[derive(PartialEq,Clone,Default)] #[cfg_attr(feature = "with-serde", derive(Serialize, Deserialize))] diff --git a/opentelemetry-otlp/src/proto/trace_service.rs b/opentelemetry-otlp/src/proto/trace_service.rs index 8b9e50b85c..e875f588b4 100644 --- a/opentelemetry-otlp/src/proto/trace_service.rs +++ b/opentelemetry-otlp/src/proto/trace_service.rs @@ -1,4 +1,4 @@ -// This file is generated by rust-protobuf 2.16.2. Do not edit +// This file is generated by rust-protobuf 2.17.0. Do not edit // @generated // https://github.com/rust-lang/rust-clippy/issues/702 @@ -21,7 +21,7 @@ /// Generated files are compatible only with the same version /// of protobuf runtime. -// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_16_2; +// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_17_0; #[derive(PartialEq,Clone,Default)] #[cfg_attr(feature = "with-serde", derive(Serialize, Deserialize))] diff --git a/opentelemetry-prometheus/src/lib.rs b/opentelemetry-prometheus/src/lib.rs index ee768a559d..05fc9e5ab5 100644 --- a/opentelemetry-prometheus/src/lib.rs +++ b/opentelemetry-prometheus/src/lib.rs @@ -170,7 +170,8 @@ impl ExporterBuilder { .unwrap_or_else(|| vec![0.5, 0.9, 0.99]); let selector = Box::new(Selector::Histogram(default_histogram_boundaries.clone())); let mut controller_builder = controllers::pull(selector, Box::new(EXPORT_KIND)) - .with_cache_period(self.cache_period.unwrap_or(DEFAULT_CACHE_PERIOD)); + .with_cache_period(self.cache_period.unwrap_or(DEFAULT_CACHE_PERIOD)) + .with_memory(true); if let Some(resource) = self.resource { controller_builder = controller_builder.with_resource(resource); } @@ -279,7 +280,7 @@ impl prometheus::core::Collector for Collector { let mut metrics = Vec::new(); if let Err(err) = controller.collect() { - global::handle(err); + global::handle_error(err); return metrics; } @@ -301,7 +302,7 @@ impl prometheus::core::Collector for Collector { Ok(()) }) { - global::handle(err); + global::handle_error(err); } metrics diff --git a/src/global/error_handler.rs b/src/global/error_handler.rs index 3adb49ff15..14e7c76225 100644 --- a/src/global/error_handler.rs +++ b/src/global/error_handler.rs @@ -11,7 +11,7 @@ struct ErrorHandler(Box); /// Handle error using the globally configured error handler. /// /// Writes to stderr if unset. -pub fn handle(err: MetricsError) { +pub fn handle_error(err: MetricsError) { match GLOBAL_ERROR_HANDLER.read() { Ok(handler) if handler.is_some() => (handler.as_ref().unwrap().0)(err), _ => eprintln!("OpenTelemetry metrics error occurred {:?}", err), @@ -19,7 +19,7 @@ pub fn handle(err: MetricsError) { } /// Set global error handler. -pub fn set_handler(f: F) -> Result<()> +pub fn set_error_handler(f: F) -> Result<()> where F: Fn(MetricsError) + Send + Sync + 'static, { diff --git a/src/global/mod.rs b/src/global/mod.rs index d4836a22d0..0dbf16036f 100644 --- a/src/global/mod.rs +++ b/src/global/mod.rs @@ -78,7 +78,7 @@ mod propagation; mod trace; #[cfg(feature = "metrics")] -pub use error_handler::{handle, set_handler}; +pub use error_handler::{handle_error, set_error_handler}; #[cfg(feature = "metrics")] pub use metrics::{meter, meter_provider, set_meter_provider}; #[cfg(feature = "trace")] diff --git a/src/sdk/env.rs b/src/sdk/env.rs index b7b468702f..1ef63c6dce 100644 --- a/src/sdk/env.rs +++ b/src/sdk/env.rs @@ -1,13 +1,14 @@ //! EnvResourceDetector //! -//! Implementation of `ResourceDetector` to extract a `Resource` from environment variables. +//! Implementation of `ResourceDetector` to extract a `Resource` from environment +//! variables. +use crate::api::{Key, KeyValue, Value}; use crate::sdk::resource::ResourceDetector; use crate::sdk::Resource; -use crate::api::{KeyValue, Key, Value}; use std::env; use std::time::Duration; -const OTEL_RESOURCE_ATTRIBUTES: &str = "OTEL_RESOURCE_ATTRIBUTES"; +static OTEL_RESOURCE_ATTRIBUTES: &str = "OTEL_RESOURCE_ATTRIBUTES"; /// Resource detector implements ResourceDetector and is used to extract /// general SDK configuration from environment. @@ -32,9 +33,7 @@ impl ResourceDetector for EnvResourceDetector { impl EnvResourceDetector { /// Create `EnvResourceDetector` instance. pub fn new() -> Self { - EnvResourceDetector { - _private: (), - } + EnvResourceDetector { _private: () } } } @@ -44,7 +43,8 @@ impl Default for EnvResourceDetector { } } -/// Extract key value pairs and construct a resource from resources string like key1=value1,key2=value2,... +/// Extract key value pairs and construct a resource from resources string like +/// key1=value1,key2=value2,... fn construct_otel_resources(s: String) -> Resource { let mut key_values = vec![]; for entries in s.split_terminator(',') { @@ -64,11 +64,11 @@ fn construct_otel_resources(s: String) -> Resource { #[cfg(test)] mod tests { - use std::{env, time}; + use crate::api::{Key, KeyValue, Value}; use crate::sdk::env::OTEL_RESOURCE_ATTRIBUTES; - use crate::sdk::EnvResourceDetector; use crate::sdk::resource::{Resource, ResourceDetector}; - use crate::api::{KeyValue, Key, Value}; + use crate::sdk::EnvResourceDetector; + use std::{env, time}; #[test] fn test_read_from_env() { @@ -77,20 +77,24 @@ mod tests { let detector = EnvResourceDetector::new(); let resource = detector.detect(time::Duration::from_secs(5)); - assert_eq!(resource, Resource::new(vec![ - KeyValue::new(Key::new("key".to_string()), Value::String("value".to_string())), - KeyValue::new(Key::new("k".to_string()), Value::String("v".to_string())), - KeyValue::new(Key::new("a".to_string()), Value::String("x".to_string())), - KeyValue::new(Key::new("a".to_string()), Value::String("z".to_string())) - ])) - } + assert_eq!( + resource, + Resource::new(vec![ + KeyValue::new( + Key::new("key".to_string()), + Value::String("value".to_string()) + ), + KeyValue::new(Key::new("k".to_string()), Value::String("v".to_string())), + KeyValue::new(Key::new("a".to_string()), Value::String("x".to_string())), + KeyValue::new(Key::new("a".to_string()), Value::String("z".to_string())) + ]) + ); - #[test] - fn test_empty() { + // Test this case in same test to avoid race condition when running tests in parallel. env::set_var(OTEL_RESOURCE_ATTRIBUTES, ""); let detector = EnvResourceDetector::new(); let resource = detector.detect(time::Duration::from_secs(5)); - assert!(resource.is_empty()) + assert!(resource.is_empty()); } -} \ No newline at end of file +} diff --git a/src/sdk/export/metrics/mod.rs b/src/sdk/export/metrics/mod.rs index b8c9dc3d00..8ba81eab63 100644 --- a/src/sdk/export/metrics/mod.rs +++ b/src/sdk/export/metrics/mod.rs @@ -60,18 +60,6 @@ pub trait LockedProcessor { /// The Context argument originates from the controller that orchestrates /// collection. fn process(&mut self, accumulation: Accumulation) -> Result<()>; - - /// Allows a controller to access a complete checkpoint of aggregated metrics - /// from the `Processor`. This is passed to the Exporter which may then iterate - /// over the collection of aggregated metrics. - fn checkpoint_set(&mut self) -> &mut dyn CheckpointSet; - - /// Logic to be run at the start of an integration cycle. - fn start_collection(&mut self); - - /// Cleanup logic or other behavior that needs to be run by the processor after - /// collection is complete. - fn finish_collection(&mut self) -> Result<()>; } /// AggregatorSelector supports selecting the kind of `Aggregator` to use at @@ -90,6 +78,25 @@ pub trait AggregatorSelector: fmt::Debug { fn aggregator_for(&self, descriptor: &Descriptor) -> Option>; } +/// The interface used by a `Controller` to coordinate the `Processor` with +/// `Accumulator`(s) and `Exporter`(s). The `start_collection` and +/// `finish_collection` methods start and finish a collection interval. +/// `Controller`s call the `Accumulator`(s) during collection to process +/// `Accumulation`s. +pub trait Checkpointer: LockedProcessor { + /// A checkpoint of the current data set. This may be called before and after + /// collection. The implementation is required to return the same value + /// throughout its lifetime. + fn checkpoint_set(&mut self) -> &mut dyn CheckpointSet; + + /// Logic to be run at the start of a collection interval. + fn start_collection(&mut self); + + /// Cleanup logic or other behavior that needs to be run after a collection + /// interval is complete. + fn finish_collection(&mut self) -> Result<()>; +} + /// Aggregator implements a specific aggregation behavior, i.e., a behavior to /// track a sequence of updates to an instrument. Sum-only instruments commonly /// use a simple Sum aggregator, but for the distribution instruments diff --git a/src/sdk/metrics/controllers/pull.rs b/src/sdk/metrics/controllers/pull.rs index 9a6ecb2d81..d049f2a063 100644 --- a/src/sdk/metrics/controllers/pull.rs +++ b/src/sdk/metrics/controllers/pull.rs @@ -1,7 +1,7 @@ use crate::api::metrics::{registry, Result}; use crate::sdk::{ export::metrics::{ - AggregatorSelector, CheckpointSet, ExportKindSelector, LockedProcessor, Record, + AggregatorSelector, CheckpointSet, Checkpointer, ExportKindSelector, Record, }, metrics::{ accumulator, @@ -23,7 +23,11 @@ pub fn pull( PullControllerBuilder::with_selectors(aggregator_selector, export_selector) } -/// Controller manages access to an `Accumulator` and `Processor`. +/// Pull controllers are typically used in an environment where there are +/// multiple readers. It is common, therefore, when configuring a +/// `BasicProcessor` for use with this controller, to use a +/// `ExportKind::Cumulative` strategy and the `with_memory(true)` builder +/// option, which ensures that every `CheckpointSet` includes full state. #[derive(Debug)] pub struct PullController { accumulator: Accumulator, @@ -47,10 +51,10 @@ impl PullController { .map_or(true, |elapsed| elapsed > self.period) { self.last_collect = SystemTime::now(); - self.processor.lock().and_then(|mut locked_processor| { - locked_processor.start_collection(); - self.accumulator.0.collect(&mut locked_processor); - locked_processor.finish_collection() + self.processor.lock().and_then(|mut checkpointer| { + checkpointer.start_collection(); + self.accumulator.0.collect(&mut checkpointer); + checkpointer.finish_collection() }) } else { Ok(()) @@ -91,6 +95,12 @@ pub struct PullControllerBuilder { /// If the period is zero, caching of the result is disabled. The default value /// is 10 seconds. cache_period: Option, + + /// Memory controls whether the controller's processor remembers metric + /// instruments and label sets that were previously reported. When memory is + /// `true`, `CheckpointSet::try_for_each` will visit metrics that were not + /// updated in the most recent interval. Default true. + memory: bool, } impl PullControllerBuilder { @@ -104,6 +114,7 @@ impl PullControllerBuilder { export_selector, resource: None, cache_period: None, + memory: true, } } @@ -123,12 +134,19 @@ impl PullControllerBuilder { } } + /// Sets the memory behavior of the controller's `Processor`. If this is + /// `true`, the processor will report metric instruments and label sets that + /// were previously reported but not updated in the most recent interval. + pub fn with_memory(self, memory: bool) -> Self { + PullControllerBuilder { memory, ..self } + } + /// Build a new `PullController` from the current configuration. pub fn build(self) -> PullController { let processor = Arc::new(processors::basic( self.aggregator_selector, self.export_selector, - true, + self.memory, )); let accumulator = accumulator(processor.clone()) diff --git a/src/sdk/metrics/controllers/push.rs b/src/sdk/metrics/controllers/push.rs index 62de660622..f6418603db 100644 --- a/src/sdk/metrics/controllers/push.rs +++ b/src/sdk/metrics/controllers/push.rs @@ -1,7 +1,7 @@ use crate::api::metrics::registry; use crate::global; use crate::sdk::{ - export::metrics::{AggregatorSelector, ExportKindSelector, Exporter, LockedProcessor}, + export::metrics::{AggregatorSelector, Checkpointer, ExportKindSelector, Exporter}, metrics::{ self, processors::{self, BasicProcessor}, @@ -74,13 +74,13 @@ impl PushControllerWorker { fn on_tick(&mut self) { // TODO handle timeout - if let Err(err) = self.processor.lock().and_then(|mut locked_processor| { - locked_processor.start_collection(); - self.accumulator.0.collect(&mut locked_processor); - locked_processor.finish_collection()?; - self.exporter.export(locked_processor.checkpoint_set()) + if let Err(err) = self.processor.lock().and_then(|mut checkpointer| { + checkpointer.start_collection(); + self.accumulator.0.collect(&mut checkpointer); + checkpointer.finish_collection()?; + self.exporter.export(checkpointer.checkpoint_set()) }) { - global::handle(err) + global::handle_error(err) } } } diff --git a/src/sdk/metrics/mod.rs b/src/sdk/metrics/mod.rs index 6b82a28b35..98c6dbd88d 100644 --- a/src/sdk/metrics/mod.rs +++ b/src/sdk/metrics/mod.rs @@ -215,7 +215,7 @@ impl AccumulatorCore { if let (Some(current), Some(checkpoint)) = (&record.current, &record.checkpoint) { if let Err(err) = current.synchronized_move(checkpoint, record.instrument.descriptor()) { - global::handle(err); + global::handle_error(err); return 0; } @@ -227,7 +227,7 @@ impl AccumulatorCore { &checkpoint, ); if let Err(err) = locked_processor.process(accumulation) { - global::handle(err); + global::handle_error(err); } 1 @@ -261,7 +261,7 @@ impl AccumulatorCore { ); if let Err(err) = locked_processor.process(accumulation) { - global::handle(err); + global::handle_error(err); } checkpointed += 1; } @@ -369,11 +369,11 @@ struct AsyncInstrument { impl AsyncInstrument { fn observe(&self, number: &Number, labels: &LabelSet) { if let Err(err) = aggregators::range_test(number, &self.instrument.descriptor) { - global::handle(err); + global::handle_error(err); } if let Some(recorder) = self.get_recorder(labels) { if let Err(err) = recorder.update(number, &self.instrument.descriptor) { - global::handle(err) + global::handle_error(err) } } } @@ -488,7 +488,7 @@ impl sdk_api::SyncBoundInstrumentCore for Record { aggregators::range_test(&number, &self.instrument.instrument.descriptor) .and_then(|_| recorder.update(&number, &self.instrument.instrument.descriptor)) { - global::handle(err); + global::handle_error(err); return; } diff --git a/src/sdk/metrics/processors/basic.rs b/src/sdk/metrics/processors/basic.rs index 87606d99b5..16df535424 100644 --- a/src/sdk/metrics/processors/basic.rs +++ b/src/sdk/metrics/processors/basic.rs @@ -4,8 +4,8 @@ use crate::api::{ }; use crate::sdk::{ export::metrics::{ - self, Accumulation, Aggregator, AggregatorSelector, CheckpointSet, ExportKind, - ExportKindSelector, LockedProcessor, Processor, Record, Subtractor, + self, Accumulation, Aggregator, AggregatorSelector, CheckpointSet, Checkpointer, + ExportKind, ExportKindSelector, LockedProcessor, Processor, Record, Subtractor, }, metrics::aggregators::SumAggregator, Resource, @@ -107,72 +107,35 @@ impl<'a> LockedProcessor for BasicLockedProcessor<'a> { // Case (b) occurs when the variable `sameCollection` is true, // indicating that the stateKey for Accumulation has already // been seen in the same collection. When this happens, it - // implies that multiple Accumulators are being used because - // the Accumulator outputs a maximum of one Accumulation per - // instrument and label set. - // - // The following logic distinguishes between asynchronous and - // synchronous instruments in order to ensure that the use of - // multiple Accumulators does not change instrument semantics. - // To maintain the instrument semantics, multiple synchronous - // Accumulations should be merged, whereas when multiple - // asynchronous Accumulations are processed, the last value - // should be kept. + // implies that multiple Accumulators are being used, or that + // a single Accumulator has been configured with a label key + // filter. if !same_collection { - // This is the first Accumulation we've seen for this - // stateKey during this collection. Just keep a - // reference to the Accumulator's Aggregator. - value.current = agg.clone(); - return Ok(()); - } - if desc.instrument_kind().asynchronous() { - // The last value across multiple accumulators is taken. - // Just keep a reference to the Accumulator's Aggregator. - value.current = agg.clone(); - return Ok(()); + if !value.current_owned { + // This is the first Accumulation we've seen for this + // stateKey during this collection. Just keep a + // reference to the Accumulator's Aggregator. All the other cases + // copy Aggregator state. + value.current = agg.clone(); + return Ok(()); + } + return agg.synchronized_move(&value.current, desc); } - // The above two cases are keeping a reference to the - // Accumulator's Aggregator. The remaining cases address - // synchronous instruments, which always merge multiple - // Accumulations using `value.delta` for temporary storage. - - if value.delta.is_none() { - // The temporary `value.delta` may have been allocated - // already, either in a prior pass through this block of - // code or in the `!ok` branch above. It would be - // allocated in the `!ok` branch if this is stateful - // PrecomputedSum instrument (in which case the exporter - // is requesting a delta so we allocate it up front), - // and it would be allocated in this block when multiple - // accumulators are used and the first condition is not - // met. - value.delta = self.parent.aggregation_selector().aggregator_for(desc); - } - if Some(value.current.as_any().type_id()) - != value.delta.as_ref().map(|d| d.as_any().type_id()) - { - // If the current and delta Aggregators are not the same it - // implies that multiple Accumulators were used. The first - // Accumulation seen for a given stateKey will return in - // one of the cases above after assigning `value.current - // = agg` (i.e., after taking a reference to the - // Accumulator's Aggregator). - // - // The second time through this branch copies the - // Accumulator's Aggregator into `value.delta` and sets - // `value.current` appropriately to avoid this branch if - // a third Accumulator is used. - value - .current - .synchronized_move(value.delta.as_ref().unwrap(), desc)?; - value.current = value.delta.clone().unwrap(); + // If the current is not owned, take ownership of a copy + // before merging below. + if !value.current_owned { + let tmp = value.current.clone(); + if let Some(current) = self.parent.aggregation_selector().aggregator_for(desc) { + value.current = current; + value.current_owned = true; + tmp.synchronized_move(&value.current, &desc)?; + } } - // The two statements above ensures that `value.current` refers - // to `value.delta` and not to an Accumulator's Aggregator. Now - // combine this Accumulation with the prior Accumulation. - return value.delta.as_ref().unwrap().merge(agg.as_ref(), desc); + + // Combine this `Accumulation` with the prior `Accumulation`. + return value.current.merge(agg.as_ref(), desc); } let stateful = self @@ -184,13 +147,10 @@ impl<'a> LockedProcessor for BasicLockedProcessor<'a> { let mut delta = None; let cumulative = if stateful { if desc.instrument_kind().precomputed_sum() { - // If we know we need to compute deltas, allocate two aggregators. + // If we know we need to compute deltas, allocate one. delta = self.parent.aggregation_selector().aggregator_for(desc); } - // In this case we are not certain to need a delta, only allocate a - // cumulative aggregator. We _may_ need a delta accumulator if - // multiple synchronous Accumulators produce an Accumulation (handled - // below), which requires merging them into a temporary Aggregator. + // Always allocate a cumulative aggregator if stateful self.parent.aggregation_selector().aggregator_for(desc) } else { None @@ -199,20 +159,23 @@ impl<'a> LockedProcessor for BasicLockedProcessor<'a> { self.state.values.insert( key, StateValue { + descriptor: desc.clone(), + labels: accumulation.labels().clone(), + resource: accumulation.resource().clone(), + current_owned: false, current: agg.clone(), delta, cumulative, stateful, updated: finished_collection, - descriptor: desc.clone(), - labels: accumulation.labels().clone(), - resource: accumulation.resource().clone(), }, ); Ok(()) } +} +impl Checkpointer for BasicLockedProcessor<'_> { fn checkpoint_set(&mut self) -> &mut dyn CheckpointSet { &mut *self.state } @@ -418,6 +381,15 @@ struct StateKey(u64); #[derive(Debug)] struct StateValue { + /// Instrument descriptor + descriptor: Descriptor, + + /// Instrument labels + labels: LabelSet, + + /// Resource that created the instrument + resource: Resource, + /// Indicates the last sequence number when this value had process called by an /// accumulator. updated: u64, @@ -426,30 +398,25 @@ struct StateValue { /// process start time. stateful: bool, - // TODO: as seen in lengthy comments below, both the `current` and `delta` - // fields have multiple uses depending on the specific configuration of - // instrument, exporter, and accumulator. It is possible to simplify this - // situation by declaring explicit fields that are not used with a dual purpose. - // Improve this situation? - // - // 1. "delta" is used to combine deltas from multiple accumulators, and it is - // also used to store the output of subtraction when computing deltas of - // PrecomputedSum instruments. - // - // 2. "current" either refers to the Aggregator passed to process() by a single - // accumulator (when either there is just one Accumulator, or the instrument - // is Asynchronous), or it refers to "delta", depending on configuration. - // - /// Refers to single-accumulator checkpoint or delta. + /// Indicates that "current" was allocated + /// by the processor in order to merge results from + /// multiple `Accumulator`s during a single collection + /// round, which may happen either because: + /// + /// (1) multiple `Accumulator`s output the same `Accumulation. + /// (2) one `Accumulator` is configured with dimensionality reduction. + current_owned: bool, + + /// The output from a single `Accumulator` (if !current_owned) or an + /// `Aggregator` owned by the processor used to accumulate multiple values in a + /// single collection round. current: Arc, - /// Owned if multi accumulator else `None`. + /// If `Some`, refers to an `Aggregator` owned by the processor used to compute + /// deltas between precomputed sums. delta: Option>, - /// Owned if stateful else `None`. + /// If `Some`, refers to an `Aggregator` owned by the processor used to store + /// the last cumulative value. cumulative: Option>, - - descriptor: Descriptor, - labels: LabelSet, - resource: Resource, } diff --git a/src/sdk/mod.rs b/src/sdk/mod.rs index 3c5dfdba8d..b4832681cb 100644 --- a/src/sdk/mod.rs +++ b/src/sdk/mod.rs @@ -6,16 +6,16 @@ //! facilitates the delivery of telemetry data to storage systems //! through `Exporter`s. These can be configured on `Tracer` and //! `Meter` creation. +pub mod env; pub mod export; #[cfg(feature = "metrics")] pub mod metrics; pub mod resource; -pub mod env; #[cfg(feature = "trace")] pub mod trace; -pub use resource::Resource; pub use env::EnvResourceDetector; +pub use resource::Resource; #[cfg(feature = "trace")] pub use trace::{ config::Config, diff --git a/src/sdk/resource.rs b/src/sdk/resource.rs index 59bed0a7fc..5c2fc57005 100644 --- a/src/sdk/resource.rs +++ b/src/sdk/resource.rs @@ -15,10 +15,10 @@ //! [`Provider`]: ../../api/trace/provider/trait.Provider.html use crate::api; use crate::api::labels; +use crate::api::KeyValue; #[cfg(feature = "serialize")] use serde::{Deserialize, Serialize}; use std::collections::{btree_map, btree_map::Entry, BTreeMap}; -use crate::api::KeyValue; use std::time::Duration; /// Describes an entity about which identifying information and metadata is exposed. @@ -35,7 +35,7 @@ impl Resource { /// /// Values are de-duplicated by key, and the first key-value pair with a non-empty string value /// will be retained - pub fn new>(kvs: T) -> Self { + pub fn new>(kvs: T) -> Self { let mut resource = Resource::default(); for kv in kvs.into_iter() { @@ -190,9 +190,9 @@ pub trait ResourceDetector { mod tests { use super::Resource; use crate::api; + use crate::sdk::EnvResourceDetector; use std::collections::BTreeMap; use std::{env, time}; - use crate::sdk::EnvResourceDetector; #[test] fn new_resource() { @@ -243,12 +243,28 @@ mod tests { env::set_var("irrelevant".to_uppercase(), "20200810"); let detector = EnvResourceDetector::new(); - let resource = Resource::from_detectors(time::Duration::from_secs(5), vec![Box::new(detector)]); - assert_eq!(resource, Resource::new(vec![ - api::KeyValue::new(api::Key::new("key".to_string()), api::Value::String("value".to_string())), - api::KeyValue::new(api::Key::new("k".to_string()), api::Value::String("v".to_string())), - api::KeyValue::new(api::Key::new("a".to_string()), api::Value::String("x".to_string())), - api::KeyValue::new(api::Key::new("a".to_string()), api::Value::String("z".to_string())) - ])) + let resource = + Resource::from_detectors(time::Duration::from_secs(5), vec![Box::new(detector)]); + assert_eq!( + resource, + Resource::new(vec![ + api::KeyValue::new( + api::Key::new("key".to_string()), + api::Value::String("value".to_string()) + ), + api::KeyValue::new( + api::Key::new("k".to_string()), + api::Value::String("v".to_string()) + ), + api::KeyValue::new( + api::Key::new("a".to_string()), + api::Value::String("x".to_string()) + ), + api::KeyValue::new( + api::Key::new("a".to_string()), + api::Value::String("z".to_string()) + ) + ]) + ) } }