Metrics refactor #179

Merged: 1 commit, merged Aug 17, 2020
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/proto/common.rs
@@ -1,4 +1,4 @@
// This file is generated by rust-protobuf 2.16.2. Do not edit
// This file is generated by rust-protobuf 2.17.0. Do not edit
// @generated

// https://github.com/rust-lang/rust-clippy/issues/702
@@ -21,7 +21,7 @@

/// Generated files are compatible only with the same version
/// of protobuf runtime.
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_16_2;
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_17_0;

#[derive(PartialEq,Clone,Default)]
#[cfg_attr(feature = "with-serde", derive(Serialize, Deserialize))]
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/proto/metrics.rs
@@ -1,4 +1,4 @@
// This file is generated by rust-protobuf 2.16.2. Do not edit
// This file is generated by rust-protobuf 2.17.0. Do not edit
// @generated

// https://github.com/rust-lang/rust-clippy/issues/702
@@ -21,7 +21,7 @@

/// Generated files are compatible only with the same version
/// of protobuf runtime.
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_16_2;
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_17_0;

#[derive(PartialEq,Clone,Default)]
#[cfg_attr(feature = "with-serde", derive(Serialize, Deserialize))]
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/proto/metrics_service.rs
@@ -1,4 +1,4 @@
// This file is generated by rust-protobuf 2.16.2. Do not edit
// This file is generated by rust-protobuf 2.17.0. Do not edit
// @generated

// https://github.com/rust-lang/rust-clippy/issues/702
@@ -21,7 +21,7 @@

/// Generated files are compatible only with the same version
/// of protobuf runtime.
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_16_2;
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_17_0;

#[derive(PartialEq,Clone,Default)]
#[cfg_attr(feature = "with-serde", derive(Serialize, Deserialize))]
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/proto/resource.rs
@@ -1,4 +1,4 @@
// This file is generated by rust-protobuf 2.16.2. Do not edit
// This file is generated by rust-protobuf 2.17.0. Do not edit
// @generated

// https://github.com/rust-lang/rust-clippy/issues/702
@@ -21,7 +21,7 @@

/// Generated files are compatible only with the same version
/// of protobuf runtime.
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_16_2;
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_17_0;

#[derive(PartialEq,Clone,Default)]
#[cfg_attr(feature = "with-serde", derive(Serialize, Deserialize))]
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/proto/trace.rs
@@ -1,4 +1,4 @@
// This file is generated by rust-protobuf 2.16.2. Do not edit
// This file is generated by rust-protobuf 2.17.0. Do not edit
// @generated

// https://github.com/rust-lang/rust-clippy/issues/702
@@ -21,7 +21,7 @@

/// Generated files are compatible only with the same version
/// of protobuf runtime.
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_16_2;
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_17_0;

#[derive(PartialEq,Clone,Default)]
#[cfg_attr(feature = "with-serde", derive(Serialize, Deserialize))]
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/proto/trace_config.rs
@@ -1,4 +1,4 @@
// This file is generated by rust-protobuf 2.16.2. Do not edit
// This file is generated by rust-protobuf 2.17.0. Do not edit
// @generated

// https://github.com/rust-lang/rust-clippy/issues/702
@@ -21,7 +21,7 @@

/// Generated files are compatible only with the same version
/// of protobuf runtime.
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_16_2;
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_17_0;

#[derive(PartialEq,Clone,Default)]
#[cfg_attr(feature = "with-serde", derive(Serialize, Deserialize))]
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/proto/trace_service.rs
@@ -1,4 +1,4 @@
// This file is generated by rust-protobuf 2.16.2. Do not edit
// This file is generated by rust-protobuf 2.17.0. Do not edit
// @generated

// https://github.com/rust-lang/rust-clippy/issues/702
@@ -21,7 +21,7 @@

/// Generated files are compatible only with the same version
/// of protobuf runtime.
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_16_2;
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_17_0;

#[derive(PartialEq,Clone,Default)]
#[cfg_attr(feature = "with-serde", derive(Serialize, Deserialize))]
7 changes: 4 additions & 3 deletions opentelemetry-prometheus/src/lib.rs
@@ -170,7 +170,8 @@ impl ExporterBuilder {
.unwrap_or_else(|| vec![0.5, 0.9, 0.99]);
let selector = Box::new(Selector::Histogram(default_histogram_boundaries.clone()));
let mut controller_builder = controllers::pull(selector, Box::new(EXPORT_KIND))
.with_cache_period(self.cache_period.unwrap_or(DEFAULT_CACHE_PERIOD));
.with_cache_period(self.cache_period.unwrap_or(DEFAULT_CACHE_PERIOD))
.with_memory(true);
if let Some(resource) = self.resource {
controller_builder = controller_builder.with_resource(resource);
}
@@ -279,7 +280,7 @@ impl prometheus::core::Collector for Collector {
let mut metrics = Vec::new();

if let Err(err) = controller.collect() {
global::handle(err);
global::handle_error(err);
return metrics;
}

@@ -301,7 +302,7 @@

Ok(())
}) {
global::handle(err);
global::handle_error(err);
}

metrics
4 changes: 2 additions & 2 deletions src/global/error_handler.rs
@@ -11,15 +11,15 @@ struct ErrorHandler(Box<dyn Fn(MetricsError) + Send + Sync>);
/// Handle error using the globally configured error handler.
///
/// Writes to stderr if unset.
pub fn handle(err: MetricsError) {
pub fn handle_error(err: MetricsError) {
match GLOBAL_ERROR_HANDLER.read() {
Ok(handler) if handler.is_some() => (handler.as_ref().unwrap().0)(err),
_ => eprintln!("OpenTelemetry metrics error occurred {:?}", err),
}
}

/// Set global error handler.
pub fn set_handler<F>(f: F) -> Result<()>
pub fn set_error_handler<F>(f: F) -> Result<()>
where
F: Fn(MetricsError) + Send + Sync + 'static,
{
2 changes: 1 addition & 1 deletion src/global/mod.rs
@@ -78,7 +78,7 @@ mod propagation;
mod trace;

#[cfg(feature = "metrics")]
pub use error_handler::{handle, set_handler};
pub use error_handler::{handle_error, set_error_handler};
#[cfg(feature = "metrics")]
pub use metrics::{meter, meter_provider, set_meter_provider};
#[cfg(feature = "trace")]
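Note on the rename above: `global::handle` and `global::set_handler` become `global::handle_error` and `global::set_error_handler`, still gated behind the `metrics` feature. A minimal usage sketch follows; the `opentelemetry::api::metrics::MetricsError` path and its `Other` variant are assumptions for illustration, not part of this diff.

```rust
// Sketch only: exercises the renamed global error-handling API.
// Assumes the `metrics` feature is enabled and that `MetricsError` exposes an
// `Other(String)` variant; adjust paths/variants to the actual crate version.
use opentelemetry::api::metrics::MetricsError;
use opentelemetry::global;

fn main() {
    // Install a custom handler; if none is set, errors go to stderr.
    global::set_error_handler(|err: MetricsError| {
        eprintln!("otel metrics error: {:?}", err);
    })
    .expect("failed to set error handler");

    // SDK components (e.g. the Prometheus collector above) now report through
    // the renamed function.
    global::handle_error(MetricsError::Other("example failure".to_string()));
}
```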
46 changes: 25 additions & 21 deletions src/sdk/env.rs
@@ -1,13 +1,14 @@
//! EnvResourceDetector
//!
//! Implementation of `ResourceDetector` to extract a `Resource` from environment variables.
//! Implementation of `ResourceDetector` to extract a `Resource` from environment
//! variables.
use crate::api::{Key, KeyValue, Value};
use crate::sdk::resource::ResourceDetector;
use crate::sdk::Resource;
use crate::api::{KeyValue, Key, Value};
use std::env;
use std::time::Duration;

const OTEL_RESOURCE_ATTRIBUTES: &str = "OTEL_RESOURCE_ATTRIBUTES";
static OTEL_RESOURCE_ATTRIBUTES: &str = "OTEL_RESOURCE_ATTRIBUTES";

/// Resource detector implements ResourceDetector and is used to extract
/// general SDK configuration from environment.
@@ -32,9 +33,7 @@ impl ResourceDetector for EnvResourceDetector {
impl EnvResourceDetector {
/// Create `EnvResourceDetector` instance.
pub fn new() -> Self {
EnvResourceDetector {
_private: (),
}
EnvResourceDetector { _private: () }
}
}

@@ -44,7 +43,8 @@ impl Default for EnvResourceDetector {
}
}

/// Extract key value pairs and construct a resource from resources string like key1=value1,key2=value2,...
/// Extract key value pairs and construct a resource from resources string like
/// key1=value1,key2=value2,...
fn construct_otel_resources(s: String) -> Resource {
let mut key_values = vec![];
for entries in s.split_terminator(',') {
@@ -64,11 +64,11 @@ fn construct_otel_resources(s: String) -> Resource {

#[cfg(test)]
mod tests {
use std::{env, time};
use crate::api::{Key, KeyValue, Value};
use crate::sdk::env::OTEL_RESOURCE_ATTRIBUTES;
use crate::sdk::EnvResourceDetector;
use crate::sdk::resource::{Resource, ResourceDetector};
use crate::api::{KeyValue, Key, Value};
use crate::sdk::EnvResourceDetector;
use std::{env, time};

#[test]
fn test_read_from_env() {
@@ -77,20 +77,24 @@ mod tests {

let detector = EnvResourceDetector::new();
let resource = detector.detect(time::Duration::from_secs(5));
assert_eq!(resource, Resource::new(vec![
KeyValue::new(Key::new("key".to_string()), Value::String("value".to_string())),
KeyValue::new(Key::new("k".to_string()), Value::String("v".to_string())),
KeyValue::new(Key::new("a".to_string()), Value::String("x".to_string())),
KeyValue::new(Key::new("a".to_string()), Value::String("z".to_string()))
]))
}
assert_eq!(
resource,
Resource::new(vec![
KeyValue::new(
Key::new("key".to_string()),
Value::String("value".to_string())
),
KeyValue::new(Key::new("k".to_string()), Value::String("v".to_string())),
KeyValue::new(Key::new("a".to_string()), Value::String("x".to_string())),
KeyValue::new(Key::new("a".to_string()), Value::String("z".to_string()))
])
);

#[test]
fn test_empty() {
// Test this case in same test to avoid race condition when running tests in parallel.
env::set_var(OTEL_RESOURCE_ATTRIBUTES, "");

let detector = EnvResourceDetector::new();
let resource = detector.detect(time::Duration::from_secs(5));
assert!(resource.is_empty())
assert!(resource.is_empty());
}
}
}
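For reference, a minimal sketch of using the detector from outside the crate, mirroring `test_read_from_env` above. The `opentelemetry::sdk` import paths are assumed from the `crate::sdk` paths in this diff.

```rust
// Sketch: detect a Resource from the OTEL_RESOURCE_ATTRIBUTES variable.
// Paths assume the crate layout shown in this diff (crate::sdk::...).
use opentelemetry::sdk::resource::ResourceDetector;
use opentelemetry::sdk::EnvResourceDetector;
use std::{env, time::Duration};

fn main() {
    // Comma-separated key=value pairs, as parsed by construct_otel_resources.
    env::set_var(
        "OTEL_RESOURCE_ATTRIBUTES",
        "service.name=my-service,deployment.environment=dev",
    );

    let detector = EnvResourceDetector::new();
    // Detect with a 5s timeout, mirroring the test above.
    let resource = detector.detect(Duration::from_secs(5));
    assert!(!resource.is_empty());
}
```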
31 changes: 19 additions & 12 deletions src/sdk/export/metrics/mod.rs
@@ -60,18 +60,6 @@ pub trait LockedProcessor {
/// The Context argument originates from the controller that orchestrates
/// collection.
fn process(&mut self, accumulation: Accumulation) -> Result<()>;

/// Allows a controller to access a complete checkpoint of aggregated metrics
/// from the `Processor`. This is passed to the Exporter which may then iterate
/// over the collection of aggregated metrics.
fn checkpoint_set(&mut self) -> &mut dyn CheckpointSet;

/// Logic to be run at the start of an integration cycle.
fn start_collection(&mut self);

/// Cleanup logic or other behavior that needs to be run by the processor after
/// collection is complete.
fn finish_collection(&mut self) -> Result<()>;
}

/// AggregatorSelector supports selecting the kind of `Aggregator` to use at
@@ -90,6 +78,25 @@ pub trait AggregatorSelector: fmt::Debug {
fn aggregator_for(&self, descriptor: &Descriptor) -> Option<Arc<dyn Aggregator + Send + Sync>>;
}

/// The interface used by a `Controller` to coordinate the `Processor` with
/// `Accumulator`(s) and `Exporter`(s). The `start_collection` and
/// `finish_collection` methods start and finish a collection interval.
/// `Controller`s call the `Accumulator`(s) during collection to process
/// `Accumulation`s.
pub trait Checkpointer: LockedProcessor {
/// A checkpoint of the current data set. This may be called before and after
/// collection. The implementation is required to return the same value
/// throughout its lifetime.
fn checkpoint_set(&mut self) -> &mut dyn CheckpointSet;

/// Logic to be run at the start of a collection interval.
fn start_collection(&mut self);

/// Cleanup logic or other behavior that needs to be run after a collection
/// interval is complete.
fn finish_collection(&mut self) -> Result<()>;
}

/// Aggregator implements a specific aggregation behavior, i.e., a behavior to
/// track a sequence of updates to an instrument. Sum-only instruments commonly
/// use a simple Sum aggregator, but for the distribution instruments
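The change above splits the collection-interval methods out of `LockedProcessor` into the new `Checkpointer` subtrait. The following self-contained toy sketch only illustrates the shape of that split; every type here is a simplified stand-in, not the crate's own `Accumulation`, `CheckpointSet`, or error types.

```rust
// Toy sketch of the LockedProcessor / Checkpointer split. All types are
// simplified stand-ins for illustration; none are the crate's own.
#[derive(Debug)]
struct Accumulation(&'static str, f64);

trait LockedProcessor {
    /// Process a single accumulation produced during collection.
    fn process(&mut self, accumulation: Accumulation) -> Result<(), String>;
}

/// The interval-management methods now live on a subtrait used by controllers.
trait Checkpointer: LockedProcessor {
    fn start_collection(&mut self);
    fn finish_collection(&mut self) -> Result<(), String>;
}

#[derive(Default)]
struct ToyProcessor {
    collecting: bool,
    checkpoint: Vec<Accumulation>,
}

impl LockedProcessor for ToyProcessor {
    fn process(&mut self, accumulation: Accumulation) -> Result<(), String> {
        if !self.collecting {
            return Err("process called outside a collection interval".to_string());
        }
        self.checkpoint.push(accumulation);
        Ok(())
    }
}

impl Checkpointer for ToyProcessor {
    fn start_collection(&mut self) {
        self.checkpoint.clear();
        self.collecting = true;
    }

    fn finish_collection(&mut self) -> Result<(), String> {
        self.collecting = false;
        Ok(())
    }
}

// A controller drives one interval: start, feed accumulations, finish, export.
fn main() -> Result<(), String> {
    let mut processor = ToyProcessor::default();
    processor.start_collection();
    processor.process(Accumulation("requests", 3.0))?;
    processor.finish_collection()?;
    println!("checkpoint: {:?}", processor.checkpoint);
    Ok(())
}
```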
32 changes: 25 additions & 7 deletions src/sdk/metrics/controllers/pull.rs
@@ -1,7 +1,7 @@
use crate::api::metrics::{registry, Result};
use crate::sdk::{
export::metrics::{
AggregatorSelector, CheckpointSet, ExportKindSelector, LockedProcessor, Record,
AggregatorSelector, CheckpointSet, Checkpointer, ExportKindSelector, Record,
},
metrics::{
accumulator,
@@ -23,7 +23,11 @@ pub fn pull(
PullControllerBuilder::with_selectors(aggregator_selector, export_selector)
}

/// Controller manages access to an `Accumulator` and `Processor`.
/// Pull controllers are typically used in an environment where there are
/// multiple readers. It is common, therefore, when configuring a
/// `BasicProcessor` for use with this controller, to use a
/// `ExportKind::Cumulative` strategy and the `with_memory(true)` builder
/// option, which ensures that every `CheckpointSet` includes full state.
#[derive(Debug)]
pub struct PullController {
accumulator: Accumulator,
@@ -47,10 +51,10 @@ impl PullController {
.map_or(true, |elapsed| elapsed > self.period)
{
self.last_collect = SystemTime::now();
self.processor.lock().and_then(|mut locked_processor| {
locked_processor.start_collection();
self.accumulator.0.collect(&mut locked_processor);
locked_processor.finish_collection()
self.processor.lock().and_then(|mut checkpointer| {
checkpointer.start_collection();
self.accumulator.0.collect(&mut checkpointer);
checkpointer.finish_collection()
})
} else {
Ok(())
@@ -91,6 +95,12 @@ pub struct PullControllerBuilder {
/// If the period is zero, caching of the result is disabled. The default value
/// is 10 seconds.
cache_period: Option<Duration>,

/// Memory controls whether the controller's processor remembers metric
/// instruments and label sets that were previously reported. When memory is
/// `true`, `CheckpointSet::try_for_each` will visit metrics that were not
/// updated in the most recent interval. Default true.
memory: bool,
}

impl PullControllerBuilder {
@@ -104,6 +114,7 @@ impl PullControllerBuilder {
export_selector,
resource: None,
cache_period: None,
memory: true,
}
}

@@ -123,12 +134,19 @@ impl PullControllerBuilder {
}
}

/// Sets the memory behavior of the controller's `Processor`. If this is
/// `true`, the processor will report metric instruments and label sets that
/// were previously reported but not updated in the most recent interval.
pub fn with_memory(self, memory: bool) -> Self {
PullControllerBuilder { memory, ..self }
}

/// Build a new `PullController` from the current configuration.
pub fn build(self) -> PullController {
let processor = Arc::new(processors::basic(
self.aggregator_selector,
self.export_selector,
true,
self.memory,
));

let accumulator = accumulator(processor.clone())
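To show the new builder option in context, here is a hedged configuration sketch mirroring how the Prometheus exporter wires up the pull controller earlier in this PR. The `selectors::simple::Selector` path and the `ExportKindSelector::Cumulative` variant are assumptions about this crate version, not part of the diff.

```rust
// Sketch: configuring a pull controller with the new with_memory option.
// Import paths and the Cumulative selector variant are assumptions.
use opentelemetry::sdk::export::metrics::ExportKindSelector;
use opentelemetry::sdk::metrics::{controllers, selectors::simple::Selector};
use std::time::Duration;

fn main() {
    let controller = controllers::pull(
        // Same default histogram boundaries the Prometheus exporter uses.
        Box::new(Selector::Histogram(vec![0.5, 0.9, 0.99])),
        Box::new(ExportKindSelector::Cumulative),
    )
    // Cache collected results between reader pulls.
    .with_cache_period(Duration::from_secs(10))
    // Keep previously-reported instruments so each CheckpointSet has full state.
    .with_memory(true)
    .build();

    // A reader (e.g. a Prometheus scrape handler) would call collect() on the
    // controller before iterating its checkpoint set.
    drop(controller);
}
```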