Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simpler usage of Observable instruments #1715

Merged
95 changes: 43 additions & 52 deletions examples/metrics-basic/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,21 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
);

// Create a ObservableCounter instrument and register a callback that reports the measurement.
let observable_counter = meter
let _observable_counter = meter
.u64_observable_counter("my_observable_counter")
.with_description("My observable counter example description")
.with_unit(Unit::new("myunit"))
.with_callback(|observer| {
observer.observe(
100,
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
&[
KeyValue::new("mykey1", "myvalue1"),
KeyValue::new("mykey2", "myvalue2"),
],
)
})
.init();

meter.register_callback(&[observable_counter.as_any()], move |observer| {
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
observer.observe_u64(
&observable_counter,
100,
&[
KeyValue::new("mykey1", "myvalue1"),
KeyValue::new("mykey2", "myvalue2"),
],
)
})?;

// Create a UpCounter Instrument.
let updown_counter = meter.i64_up_down_counter("my_updown_counter").init();

Expand All @@ -73,23 +71,21 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
);

// Create a Observable UpDownCounter instrument and register a callback that reports the measurement.
let observable_up_down_counter = meter
let _observable_up_down_counter = meter
.i64_observable_up_down_counter("my_observable_updown_counter")
.with_description("My observable updown counter example description")
.with_unit(Unit::new("myunit"))
.with_callback(|observer| {
observer.observe(
100,
&[
KeyValue::new("mykey1", "myvalue1"),
KeyValue::new("mykey2", "myvalue2"),
],
)
})
.init();

meter.register_callback(&[observable_up_down_counter.as_any()], move |observer| {
observer.observe_i64(
&observable_up_down_counter,
100,
&[
KeyValue::new("mykey1", "myvalue1"),
KeyValue::new("mykey2", "myvalue2"),
],
)
})?;

// Create a Histogram Instrument.
let histogram = meter
.f64_histogram("my_histogram")
Expand All @@ -108,41 +104,36 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
// Note that there is no ObservableHistogram instrument.

// Create a Gauge Instrument.
{
let gauge = meter
.f64_gauge("my_gauge")
.with_description("A gauge set to 1.0")
.with_unit(Unit::new("myunit"))
.init();

gauge.record(
1.0,
&[
KeyValue::new("mykey1", "myvalue1"),
KeyValue::new("mykey2", "myvalue2"),
],
);
}
let gauge = meter
.f64_gauge("my_gauge")
.with_description("A gauge set to 1.0")
.with_unit(Unit::new("myunit"))
.init();

gauge.record(
1.0,
&[
KeyValue::new("mykey1", "myvalue1"),
KeyValue::new("mykey2", "myvalue2"),
],
);

// Create a ObservableGauge instrument and register a callback that reports the measurement.
let observable_gauge = meter
let _observable_gauge = meter
.f64_observable_gauge("my_observable_gauge")
.with_description("An observable gauge set to 1.0")
.with_unit(Unit::new("myunit"))
.with_callback(|observer| {
observer.observe(
1.0,
&[
KeyValue::new("mykey1", "myvalue1"),
KeyValue::new("mykey2", "myvalue2"),
],
)
})
.init();

// Register a callback that reports the measurement.
meter.register_callback(&[observable_gauge.as_any()], move |observer| {
observer.observe_f64(
&observable_gauge,
1.0,
&[
KeyValue::new("mykey1", "myvalue1"),
KeyValue::new("mykey2", "myvalue2"),
],
)
})?;

// Metrics are exported by default every 30 seconds when using stdout exporter,
// however shutting down the MeterProvider here instantly flushes
// the metrics, instead of waiting for the 30 sec interval.
Expand Down
7 changes: 2 additions & 5 deletions opentelemetry-otlp/examples/basic-otlp/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,12 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let tracer = global::tracer("ex.com/basic");
let meter = global::meter("ex.com/basic");

let gauge = meter
let _gauge = meter
.f64_observable_gauge("ex.com.one")
.with_description("A gauge set to 1.0")
.with_callback(|observer| observer.observe(1.0, COMMON_ATTRIBUTES.as_ref()))
.init();

meter.register_callback(&[gauge.as_any()], move |observer| {
observer.observe_f64(&gauge, 1.0, COMMON_ATTRIBUTES.as_ref())
})?;

let histogram = meter.f64_histogram("ex.com.two").init();
histogram.record(5.5, COMMON_ATTRIBUTES.as_ref());

Expand Down
122 changes: 99 additions & 23 deletions opentelemetry-sdk/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,91 @@ mod tests {
);
}

// "multi_thread" tokio flavor must be used else flush won't
// be able to make progress!
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn observable_counter_aggregation() {
// Run this test with stdout enabled to see output.
// cargo test observable_counter_aggregation --features=metrics,testing -- --nocapture

// Arrange
let exporter = InMemoryMetricsExporter::default();
let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();

// Act
let meter = meter_provider.meter("test");
let _counter = meter
.u64_observable_counter("my_observable_counter")
.with_unit(Unit::new("my_unit"))
.with_callback(|observer| {
observer.observe(100, &[KeyValue::new("key1", "value1")]);
observer.observe(200, &[KeyValue::new("key1", "value2")]);
})
.init();

meter_provider.force_flush().unwrap();

// Assert
let resource_metrics = exporter
.get_finished_metrics()
.expect("metrics are expected to be exported.");
assert!(!resource_metrics.is_empty());
let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
assert_eq!(metric.name, "my_observable_counter");
assert_eq!(metric.unit.as_str(), "my_unit");
let sum = metric
.data
.as_any()
.downcast_ref::<data::Sum<u64>>()
.expect("Sum aggregation expected for ObservableCounter instruments by default");

// Expecting 2 time-series.
assert_eq!(sum.data_points.len(), 2);
assert!(sum.is_monotonic, "Counter should produce monotonic.");
assert_eq!(
sum.temporality,
data::Temporality::Cumulative,
"Should produce cumulative by default."
);

// find and validate key1=value1 datapoint
let mut data_point1 = None;
for datapoint in &sum.data_points {
if datapoint
.attributes
.iter()
.any(|(k, v)| k.as_str() == "key1" && v.as_str() == "value1")
{
data_point1 = Some(datapoint);
}
}
assert_eq!(
data_point1
.expect("datapoint with key1=value1 expected")
.value,
100
);

// find and validate key1=value2 datapoint
let mut data_point1 = None;
for datapoint in &sum.data_points {
if datapoint
.attributes
.iter()
.any(|(k, v)| k.as_str() == "key1" && v.as_str() == "value2")
{
data_point1 = Some(datapoint);
}
}
assert_eq!(
data_point1
.expect("datapoint with key1=value2 expected")
.value,
200
);
}

// "multi_thread" tokio flavor must be used else flush won't
// be able to make progress!
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
Expand Down Expand Up @@ -445,7 +530,7 @@ mod tests {
// "multi_thread" tokio flavor must be used else flush won't
// be able to make progress!
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[ignore = "Spatial aggregation is not yet implemented."]
// #[ignore = "Spatial aggregation is not yet implemented."]
async fn spatial_aggregation_when_view_drops_attributes_observable_counter() {
// cargo test spatial_aggregation_when_view_drops_attributes_observable_counter --features=metrics,testing

Expand All @@ -465,43 +550,34 @@ mod tests {

// Act
let meter = meter_provider.meter("test");
let observable_counter = meter.u64_observable_counter("my_observable_counter").init();

// Normally, these callbacks would generate 3 time-series, but since the view
// drops all attributes, we expect only 1 time-series.
meter
.register_callback(&[observable_counter.as_any()], move |observer| {
observer.observe_u64(
&observable_counter,
let _observable_counter = meter
.u64_observable_counter("my_observable_counter")
.with_callback(|observer| {
observer.observe(
100,
[
&[
KeyValue::new("statusCode", "200"),
KeyValue::new("verb", "get"),
]
.as_ref(),
],
);

observer.observe_u64(
&observable_counter,
observer.observe(
100,
[
&[
KeyValue::new("statusCode", "200"),
KeyValue::new("verb", "post"),
]
.as_ref(),
],
);

observer.observe_u64(
&observable_counter,
observer.observe(
100,
[
&[
KeyValue::new("statusCode", "500"),
KeyValue::new("verb", "get"),
]
.as_ref(),
],
);
})
.expect("Expected to register callback");
.init();

meter_provider.force_flush().unwrap();

Expand Down
8 changes: 4 additions & 4 deletions opentelemetry-sdk/src/metrics/periodic_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,12 +428,12 @@ mod tests {
// Act
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
let meter = meter_provider.meter("test");
let counter = meter.u64_observable_counter("testcounter").init();
meter
.register_callback(&[counter.as_any()], move |_| {
let _counter = meter
.u64_observable_counter("testcounter")
.with_callback(move |_| {
sender.send(()).expect("channel should still be open");
})
.expect("callback registration should succeed");
.init();

_ = meter_provider.force_flush();

Expand Down
Loading