From 6b86ab6342405e4066400d9eb35c31faefd9dab5 Mon Sep 17 00:00:00 2001 From: Julian Tescher Date: Sat, 25 Mar 2023 11:48:23 +0100 Subject: [PATCH 1/2] fix: Backport code from main repo This is pretty much a backport from the code that @jtescher did on the main repo Signed-off-by: Jayson Reis --- Cargo.toml | 8 +- README.md | 4 +- src/layer.rs | 146 ++++++++++-------- src/lib.rs | 4 +- src/metrics.rs | 93 +++++++----- src/tracer.rs | 26 ++-- tests/metrics_publishing.rs | 247 +++++++++++++++---------------- tests/trace_state_propagation.rs | 21 ++- 8 files changed, 287 insertions(+), 262 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 63c1967..82f2ac3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ categories = [ keywords = ["tracing", "opentelemetry", "jaeger", "zipkin", "async"] license = "MIT" edition = "2018" -rust-version = "1.46.0" +rust-version = "1.56.0" [features] default = ["tracing-log", "metrics"] @@ -25,7 +25,7 @@ default = ["tracing-log", "metrics"] metrics = ["opentelemetry/metrics"] [dependencies] -opentelemetry = { version = "0.17.0", default-features = false, features = ["trace"] } +opentelemetry = { version = "0.18.0", default-features = false, features = ["trace"] } tracing = { version = "0.1.35", default-features = false, features = ["std"] } tracing-core = "0.1.28" tracing-subscriber = { version = "0.3.0", default-features = false, features = ["registry", "std"] } @@ -39,7 +39,7 @@ thiserror = { version = "1.0.31", optional = true } [dev-dependencies] async-trait = "0.1.56" criterion = { version = "0.3.6", default-features = false } -opentelemetry-jaeger = "0.16.0" +opentelemetry-jaeger = "0.17.0" futures-util = { version = "0.3", default-features = false } tokio = { version = "1", features = ["full"] } tokio-stream = "0.1" @@ -53,4 +53,4 @@ harness = false [package.metadata.docs.rs] all-features = true -rustdoc-args = ["--cfg", "docsrs"] \ No newline at end of file +rustdoc-args = ["--cfg", "docsrs"] diff --git a/README.md b/README.md index 4fcb658..9260d80 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,7 @@ The crate provides the following types: [`tracing`]: https://crates.io/crates/tracing [OpenTelemetry]: https://opentelemetry.io/ -*Compiler support: [requires `rustc` 1.49+][msrv]* +*Compiler support: [requires `rustc` 1.56+][msrv]* [msrv]: #supported-rust-versions @@ -110,7 +110,7 @@ $ firefox http://localhost:16686/ ## Supported Rust Versions Tracing Opentelemetry is built against the latest stable release. The minimum -supported version is 1.46. The current Tracing version is not guaranteed to +supported version is 1.56. The current Tracing version is not guaranteed to build on Rust versions earlier than the minimum supported version. Tracing follows the same compiler support policies as the rest of the Tokio diff --git a/src/layer.rs b/src/layer.rs index 0e6af5c..8e6424b 100644 --- a/src/layer.rs +++ b/src/layer.rs @@ -1,11 +1,10 @@ use crate::{OtelData, PreSampledTracer}; use once_cell::unsync; use opentelemetry::{ - trace::{self as otel, noop, TraceContextExt}, - Context as OtelContext, Key, KeyValue, Value, + trace::{self as otel, noop, OrderMap, TraceContextExt}, + Context as OtelContext, Key, KeyValue, StringValue, Value, }; use std::any::TypeId; -use std::borrow::Cow; use std::fmt; use std::marker; use std::thread; @@ -106,12 +105,11 @@ fn str_to_span_kind(s: &str) -> Option { } } -fn str_to_status_code(s: &str) -> Option { +fn str_to_status(s: &str) -> otel::Status { match s { - s if s.eq_ignore_ascii_case("unset") => Some(otel::StatusCode::Unset), - s if s.eq_ignore_ascii_case("ok") => Some(otel::StatusCode::Ok), - s if s.eq_ignore_ascii_case("error") => Some(otel::StatusCode::Error), - _ => None, + s if s.eq_ignore_ascii_case("ok") => otel::Status::Ok, + s if s.eq_ignore_ascii_case("error") => otel::Status::error(""), + _ => otel::Status::Unset, } } @@ -217,11 +215,11 @@ impl<'a, 'b> field::Visit for SpanEventVisitor<'a, 'b> { field: &tracing_core::Field, value: &(dyn std::error::Error + 'static), ) { - let mut chain = Vec::new(); + let mut chain: Vec = Vec::new(); let mut next_err = value.source(); while let Some(err) = next_err { - chain.push(Cow::Owned(err.to_string())); + chain.push(err.to_string().into()); next_err = err.source(); } @@ -246,7 +244,10 @@ impl<'a, 'b> field::Visit for SpanEventVisitor<'a, 'b> { if self.exception_config.propagate { if let Some(span) = &mut self.span_builder { if let Some(attrs) = span.attributes.as_mut() { - attrs.push(Key::new(FIELD_EXCEPTION_MESSAGE).string(error_msg.clone())); + attrs.insert( + Key::new(FIELD_EXCEPTION_MESSAGE), + Value::String(error_msg.clone().into()), + ); // NOTE: This is actually not the stacktrace of the exception. This is // the "source chain". It represents the heirarchy of errors from the @@ -254,7 +255,10 @@ impl<'a, 'b> field::Visit for SpanEventVisitor<'a, 'b> { // of the callsites in the code that led to the error happening. // `std::error::Error::backtrace` is a nightly-only API and cannot be // used here until the feature is stabilized. - attrs.push(Key::new(FIELD_EXCEPTION_STACKTRACE).array(chain.clone())); + attrs.insert( + Key::new(FIELD_EXCEPTION_STACKTRACE), + Value::Array(chain.clone().into()), + ); } } } @@ -289,7 +293,7 @@ impl<'a> SpanAttributeVisitor<'a> { fn record(&mut self, attribute: KeyValue) { debug_assert!(self.span_builder.attributes.is_some()); if let Some(v) = self.span_builder.attributes.as_mut() { - v.push(attribute); + v.insert(attribute.key, attribute.value); } } } @@ -323,9 +327,9 @@ impl<'a> field::Visit for SpanAttributeVisitor<'a> { match field.name() { SPAN_NAME_FIELD => self.span_builder.name = value.to_string().into(), SPAN_KIND_FIELD => self.span_builder.span_kind = str_to_span_kind(value), - SPAN_STATUS_CODE_FIELD => self.span_builder.status_code = str_to_status_code(value), + SPAN_STATUS_CODE_FIELD => self.span_builder.status = str_to_status(value), SPAN_STATUS_MESSAGE_FIELD => { - self.span_builder.status_message = Some(value.to_owned().into()) + self.span_builder.status = otel::Status::error(value.to_string()) } _ => self.record(KeyValue::new(field.name(), value.to_string())), } @@ -342,10 +346,10 @@ impl<'a> field::Visit for SpanAttributeVisitor<'a> { self.span_builder.span_kind = str_to_span_kind(&format!("{:?}", value)) } SPAN_STATUS_CODE_FIELD => { - self.span_builder.status_code = str_to_status_code(&format!("{:?}", value)) + self.span_builder.status = str_to_status(&format!("{:?}", value)) } SPAN_STATUS_MESSAGE_FIELD => { - self.span_builder.status_message = Some(format!("{:?}", value).into()) + self.span_builder.status = otel::Status::error(format!("{:?}", value)) } _ => self.record(Key::new(field.name()).string(format!("{:?}", value))), } @@ -360,11 +364,11 @@ impl<'a> field::Visit for SpanAttributeVisitor<'a> { field: &tracing_core::Field, value: &(dyn std::error::Error + 'static), ) { - let mut chain = Vec::new(); + let mut chain: Vec = Vec::new(); let mut next_err = value.source(); while let Some(err) = next_err { - chain.push(Cow::Owned(err.to_string())); + chain.push(err.to_string().into()); next_err = err.source(); } @@ -406,7 +410,7 @@ where /// use tracing_subscriber::Registry; /// /// // Create a jaeger exporter pipeline for a `trace_demo` service. - /// let tracer = opentelemetry_jaeger::new_pipeline() + /// let tracer = opentelemetry_jaeger::new_agent_pipeline() /// .with_service_name("trace_demo") /// .install_simple() /// .expect("Error initializing Jaeger exporter"); @@ -447,7 +451,7 @@ where /// use tracing_subscriber::Registry; /// /// // Create a jaeger exporter pipeline for a `trace_demo` service. - /// let tracer = opentelemetry_jaeger::new_pipeline() + /// let tracer = opentelemetry_jaeger::new_agent_pipeline() /// .with_service_name("trace_demo") /// .install_simple() /// .expect("Error initializing Jaeger exporter"); @@ -685,7 +689,7 @@ where builder.trace_id = Some(self.tracer.new_trace_id()); } - let builder_attrs = builder.attributes.get_or_insert(Vec::with_capacity( + let builder_attrs = builder.attributes.get_or_insert(OrderMap::with_capacity( attrs.fields().len() + self.extra_span_attrs(), )); @@ -693,26 +697,26 @@ where let meta = attrs.metadata(); if let Some(filename) = meta.file() { - builder_attrs.push(KeyValue::new("code.filepath", filename)); + builder_attrs.insert("code.filepath".into(), filename.into()); } if let Some(module) = meta.module_path() { - builder_attrs.push(KeyValue::new("code.namespace", module)); + builder_attrs.insert("code.namespace".into(), module.into()); } if let Some(line) = meta.line() { - builder_attrs.push(KeyValue::new("code.lineno", line as i64)); + builder_attrs.insert("code.lineno".into(), (line as i64).into()); } } if self.with_threads { - THREAD_ID.with(|id| builder_attrs.push(KeyValue::new("thread.id", **id as i64))); + THREAD_ID.with(|id| builder_attrs.insert("thread.id".into(), (**id as i64).into())); if let Some(name) = std::thread::current().name() { // TODO(eliza): it's a bummer that we have to allocate here, but // we can't easily get the string as a `static`. it would be // nice if `opentelemetry` could also take `Arc`s as // `String` values... - builder_attrs.push(KeyValue::new("thread.name", name.to_owned())); + builder_attrs.insert("thread.name".into(), name.to_owned().into()); } } @@ -846,8 +850,10 @@ where }); if let Some(OtelData { builder, .. }) = extensions.get_mut::() { - if builder.status_code.is_none() && *meta.level() == tracing_core::Level::ERROR { - builder.status_code = Some(otel::StatusCode::Error); + if builder.status == otel::Status::Unset + && *meta.level() == tracing_core::Level::ERROR + { + builder.status = otel::Status::error("") } if self.location { @@ -905,15 +911,14 @@ where if self.tracked_inactivity { // Append busy/idle timings when enabled. if let Some(timings) = extensions.get_mut::() { - let busy_ns = KeyValue::new("busy_ns", timings.busy); - let idle_ns = KeyValue::new("idle_ns", timings.idle); - - if let Some(ref mut attributes) = builder.attributes { - attributes.push(busy_ns); - attributes.push(idle_ns); - } else { - builder.attributes = Some(vec![busy_ns, idle_ns]); - } + let busy_ns = Key::new("busy_ns"); + let idle_ns = Key::new("idle_ns"); + + let attributes = builder + .attributes + .get_or_insert_with(|| OrderMap::with_capacity(2)); + attributes.insert(busy_ns, timings.busy.into()); + attributes.insert(idle_ns, timings.idle.into()); } } @@ -966,7 +971,7 @@ fn thread_id_integer(id: thread::ThreadId) -> u64 { mod tests { use super::*; use crate::OtelData; - use opentelemetry::trace::{noop, SpanKind, TraceFlags}; + use opentelemetry::trace::{noop, TraceFlags}; use std::{ borrow::Cow, collections::HashMap, @@ -1044,7 +1049,7 @@ mod tests { false } fn set_attribute(&mut self, _attribute: KeyValue) {} - fn set_status(&mut self, _code: otel::StatusCode, _message: String) {} + fn set_status(&mut self, _status: otel::Status) {} fn update_name>>(&mut self, _new_name: T) {} fn end_with_timestamp(&mut self, _timestamp: SystemTime) {} } @@ -1088,6 +1093,16 @@ mod tests { tracing::subscriber::with_default(subscriber, || { tracing::debug_span!("static_name", otel.name = dynamic_name.as_str()); }); + let recorded_status = tracer + .0 + .lock() + .unwrap() + .as_ref() + .unwrap() + .builder + .status + .clone(); + assert_eq!(recorded_status, otel::Status::Unset); let recorded_name = tracer .0 @@ -1104,7 +1119,7 @@ mod tests { let subscriber = tracing_subscriber::registry().with(layer().with_tracer(tracer.clone())); tracing::subscriber::with_default(subscriber, || { - tracing::debug_span!("request", otel.kind = %SpanKind::Server); + tracing::debug_span!("request", otel.kind = "server"); }); let recorded_kind = tracer.with_data(|data| data.builder.span_kind.clone()); @@ -1117,11 +1132,11 @@ mod tests { let subscriber = tracing_subscriber::registry().with(layer().with_tracer(tracer.clone())); tracing::subscriber::with_default(subscriber, || { - tracing::debug_span!("request", otel.status_code = ?otel::StatusCode::Ok); + tracing::debug_span!("request", otel.status_code = ?otel::Status::Ok); }); - let recorded_status_code = tracer.with_data(|data| data.builder.status_code); - assert_eq!(recorded_status_code, Some(otel::StatusCode::Ok)) + let recorded_status = tracer.with_data(|data| data.builder.status.clone()); + assert_eq!(recorded_status, otel::Status::Ok) } #[test] @@ -1135,8 +1150,17 @@ mod tests { tracing::debug_span!("request", otel.status_message = message); }); - let recorded_status_message = tracer.with_data(|data| data.builder.status_message.clone()); - assert_eq!(recorded_status_message, Some(message.into())) + let recorded_status_message = tracer + .0 + .lock() + .unwrap() + .as_ref() + .unwrap() + .builder + .status + .clone(); + + assert_eq!(recorded_status_message, otel::Status::error(message)) } #[test] @@ -1154,7 +1178,7 @@ mod tests { let _g = existing_cx.attach(); tracing::subscriber::with_default(subscriber, || { - tracing::debug_span!("request", otel.kind = %SpanKind::Server); + tracing::debug_span!("request", otel.kind = "server"); }); let recorded_trace_id = @@ -1178,7 +1202,7 @@ mod tests { let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone()); let keys = attributes .iter() - .map(|attr| attr.key.as_str()) + .map(|(key, _)| key.as_str()) .collect::>(); assert!(keys.contains(&"idle_ns")); assert!(keys.contains(&"busy_ns")); @@ -1218,7 +1242,7 @@ mod tests { let key_values = attributes .into_iter() - .map(|attr| (attr.key.as_str().to_owned(), attr.value)) + .map(|(key, value)| (key.as_str().to_owned(), value)) .collect::>(); assert_eq!(key_values["error"].as_str(), "user error"); @@ -1226,8 +1250,8 @@ mod tests { key_values["error.chain"], Value::Array( vec![ - Cow::Borrowed("intermediate error"), - Cow::Borrowed("base error") + StringValue::from("intermediate error"), + StringValue::from("base error") ] .into() ) @@ -1238,8 +1262,8 @@ mod tests { key_values[FIELD_EXCEPTION_STACKTRACE], Value::Array( vec![ - Cow::Borrowed("intermediate error"), - Cow::Borrowed("base error") + StringValue::from("intermediate error"), + StringValue::from("base error") ] .into() ) @@ -1259,7 +1283,7 @@ mod tests { let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone()); let keys = attributes .iter() - .map(|attr| attr.key.as_str()) + .map(|(key, _)| key.as_str()) .collect::>(); assert!(keys.contains(&"code.filepath")); assert!(keys.contains(&"code.namespace")); @@ -1279,7 +1303,7 @@ mod tests { let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone()); let keys = attributes .iter() - .map(|attr| attr.key.as_str()) + .map(|(key, _)| key.as_str()) .collect::>(); assert!(!keys.contains(&"code.filepath")); assert!(!keys.contains(&"code.namespace")); @@ -1291,7 +1315,7 @@ mod tests { let thread = thread::current(); let expected_name = thread .name() - .map(|name| Value::String(Cow::Owned(name.to_owned()))); + .map(|name| Value::String(name.to_owned().into())); let expected_id = Value::I64(thread_id_integer(thread.id()) as i64); let tracer = TestTracer(Arc::new(Mutex::new(None))); @@ -1305,7 +1329,7 @@ mod tests { let attributes = tracer .with_data(|data| data.builder.attributes.as_ref().unwrap().clone()) .drain(..) - .map(|keyval| (keyval.key.as_str().to_string(), keyval.value)) + .map(|(key, value)| (key.as_str().to_string(), value)) .collect::>(); assert_eq!(attributes.get("thread.name"), expected_name.as_ref()); assert_eq!(attributes.get("thread.id"), Some(&expected_id)); @@ -1324,7 +1348,7 @@ mod tests { let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone()); let keys = attributes .iter() - .map(|attr| attr.key.as_str()) + .map(|(key, _)| key.as_str()) .collect::>(); assert!(!keys.contains(&"thread.name")); assert!(!keys.contains(&"thread.id")); @@ -1366,7 +1390,7 @@ mod tests { let key_values = attributes .into_iter() - .map(|attr| (attr.key.as_str().to_owned(), attr.value)) + .map(|(key, value)| (key.as_str().to_owned(), value)) .collect::>(); assert_eq!(key_values[FIELD_EXCEPTION_MESSAGE].as_str(), "user error"); @@ -1374,8 +1398,8 @@ mod tests { key_values[FIELD_EXCEPTION_STACKTRACE], Value::Array( vec![ - Cow::Borrowed("intermediate error"), - Cow::Borrowed("base error") + StringValue::from("intermediate error"), + StringValue::from("base error") ] .into() ) diff --git a/src/lib.rs b/src/lib.rs index 5cd725d..d3bc5bb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,7 +9,7 @@ //! [OpenTelemetry]: https://opentelemetry.io //! [`tracing`]: https://github.com/tokio-rs/tracing //! -//! *Compiler support: [requires `rustc` 1.49+][msrv]* +//! *Compiler support: [requires `rustc` 1.56+][msrv]* //! //! [msrv]: #supported-rust-versions //! @@ -86,7 +86,7 @@ //! ## Supported Rust Versions //! //! Tracing is built against the latest stable release. The minimum supported -//! version is 1.49. The current Tracing version is not guaranteed to build on +//! version is 1.56. The current Tracing version is not guaranteed to build on //! Rust versions earlier than the minimum supported version. //! //! Tracing follows the same compiler support policies as the rest of the Tokio diff --git a/src/metrics.rs b/src/metrics.rs index 37df62c..91b9e63 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -3,8 +3,9 @@ use tracing::{field::Visit, Subscriber}; use tracing_core::Field; use opentelemetry::{ - metrics::{Counter, Meter, MeterProvider, UpDownCounter, ValueRecorder}, - sdk::metrics::PushController, + metrics::{Counter, Histogram, Meter, MeterProvider, UpDownCounter}, + sdk::metrics::controllers::BasicController, + Context as OtelContext, }; use tracing_subscriber::{layer::Context, registry::LookupSpan, Layer}; @@ -13,7 +14,7 @@ const INSTRUMENTATION_LIBRARY_NAME: &str = "tracing/tracing-opentelemetry"; const METRIC_PREFIX_MONOTONIC_COUNTER: &str = "monotonic_counter."; const METRIC_PREFIX_COUNTER: &str = "counter."; -const METRIC_PREFIX_VALUE: &str = "value."; +const METRIC_PREFIX_HISTOGRAM: &str = "histogram."; const I64_MAX: u64 = i64::MAX as u64; #[derive(Default)] @@ -22,9 +23,9 @@ pub(crate) struct Instruments { f64_counter: MetricsMap>, i64_up_down_counter: MetricsMap>, f64_up_down_counter: MetricsMap>, - u64_value_recorder: MetricsMap>, - i64_value_recorder: MetricsMap>, - f64_value_recorder: MetricsMap>, + u64_histogram: MetricsMap>, + i64_histogram: MetricsMap>, + f64_histogram: MetricsMap>, } type MetricsMap = RwLock>; @@ -35,14 +36,15 @@ pub(crate) enum InstrumentType { CounterF64(f64), UpDownCounterI64(i64), UpDownCounterF64(f64), - ValueRecorderU64(u64), - ValueRecorderI64(i64), - ValueRecorderF64(f64), + HistogramU64(u64), + HistogramI64(i64), + HistogramF64(f64), } impl Instruments { pub(crate) fn update_metric( &self, + cx: &OtelContext, meter: &Meter, instrument_type: InstrumentType, metric_name: &'static str, @@ -76,7 +78,7 @@ impl Instruments { &self.u64_counter, metric_name, || meter.u64_counter(metric_name).init(), - |ctr| ctr.add(value, &[]), + |ctr| ctr.add(cx, value, &[]), ); } InstrumentType::CounterF64(value) => { @@ -84,7 +86,7 @@ impl Instruments { &self.f64_counter, metric_name, || meter.f64_counter(metric_name).init(), - |ctr| ctr.add(value, &[]), + |ctr| ctr.add(cx, value, &[]), ); } InstrumentType::UpDownCounterI64(value) => { @@ -92,7 +94,7 @@ impl Instruments { &self.i64_up_down_counter, metric_name, || meter.i64_up_down_counter(metric_name).init(), - |ctr| ctr.add(value, &[]), + |ctr| ctr.add(cx, value, &[]), ); } InstrumentType::UpDownCounterF64(value) => { @@ -100,31 +102,31 @@ impl Instruments { &self.f64_up_down_counter, metric_name, || meter.f64_up_down_counter(metric_name).init(), - |ctr| ctr.add(value, &[]), + |ctr| ctr.add(cx, value, &[]), ); } - InstrumentType::ValueRecorderU64(value) => { + InstrumentType::HistogramU64(value) => { update_or_insert( - &self.u64_value_recorder, + &self.u64_histogram, metric_name, - || meter.u64_value_recorder(metric_name).init(), - |rec| rec.record(value, &[]), + || meter.u64_histogram(metric_name).init(), + |rec| rec.record(cx, value, &[]), ); } - InstrumentType::ValueRecorderI64(value) => { + InstrumentType::HistogramI64(value) => { update_or_insert( - &self.i64_value_recorder, + &self.i64_histogram, metric_name, - || meter.i64_value_recorder(metric_name).init(), - |rec| rec.record(value, &[]), + || meter.i64_histogram(metric_name).init(), + |rec| rec.record(cx, value, &[]), ); } - InstrumentType::ValueRecorderF64(value) => { + InstrumentType::HistogramF64(value) => { update_or_insert( - &self.f64_value_recorder, + &self.f64_histogram, metric_name, - || meter.f64_value_recorder(metric_name).init(), - |rec| rec.record(value, &[]), + || meter.f64_histogram(metric_name).init(), + |rec| rec.record(cx, value, &[]), ); } }; @@ -142,8 +144,10 @@ impl<'a> Visit for MetricVisitor<'a> { } fn record_u64(&mut self, field: &Field, value: u64) { + let cx = OtelContext::current(); if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) { self.instruments.update_metric( + &cx, self.meter, InstrumentType::CounterU64(value), metric_name, @@ -151,6 +155,7 @@ impl<'a> Visit for MetricVisitor<'a> { } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) { if value <= I64_MAX { self.instruments.update_metric( + &cx, self.meter, InstrumentType::UpDownCounterI64(value as i64), metric_name, @@ -163,54 +168,63 @@ impl<'a> Visit for MetricVisitor<'a> { value ); } - } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_VALUE) { + } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_HISTOGRAM) { self.instruments.update_metric( + &cx, self.meter, - InstrumentType::ValueRecorderU64(value), + InstrumentType::HistogramU64(value), metric_name, ); } } fn record_f64(&mut self, field: &Field, value: f64) { + let cx = OtelContext::current(); if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) { self.instruments.update_metric( + &cx, self.meter, InstrumentType::CounterF64(value), metric_name, ); } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) { self.instruments.update_metric( + &cx, self.meter, InstrumentType::UpDownCounterF64(value), metric_name, ); - } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_VALUE) { + } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_HISTOGRAM) { self.instruments.update_metric( + &cx, self.meter, - InstrumentType::ValueRecorderF64(value), + InstrumentType::HistogramF64(value), metric_name, ); } } fn record_i64(&mut self, field: &Field, value: i64) { + let cx = OtelContext::current(); if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) { self.instruments.update_metric( + &cx, self.meter, InstrumentType::CounterU64(value as u64), metric_name, ); } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) { self.instruments.update_metric( + &cx, self.meter, InstrumentType::UpDownCounterI64(value), metric_name, ); - } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_VALUE) { + } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_HISTOGRAM) { self.instruments.update_metric( + &cx, self.meter, - InstrumentType::ValueRecorderI64(value), + InstrumentType::HistogramI64(value), metric_name, ); } @@ -232,14 +246,14 @@ impl<'a> Visit for MetricVisitor<'a> { /// use tracing_opentelemetry::MetricsLayer; /// use tracing_subscriber::layer::SubscriberExt; /// use tracing_subscriber::Registry; -/// # use opentelemetry::sdk::metrics::PushController; +/// # use opentelemetry::sdk::metrics::controllers::BasicController; /// -/// // Constructing a PushController is out-of-scope for the docs here, but there +/// // Constructing a BasicController is out-of-scope for the docs here, but there /// // are examples in the opentelemetry repository. See: -/// // https://github.com/open-telemetry/opentelemetry-rust/blob/c13a11e62a68eacd8c41a0742a0d097808e28fbd/examples/basic-otlp/src/main.rs#L39-L53 -/// # let push_controller: PushController = unimplemented!(); +/// // https://github.com/open-telemetry/opentelemetry-rust/blob/d4b9befea04bcc7fc19319a6ebf5b5070131c486/examples/basic-otlp/src/main.rs#L35-L52 +/// # let controller: BasicController = unimplemented!(); /// -/// let opentelemetry_metrics = MetricsLayer::new(push_controller); +/// let opentelemetry_metrics = MetricsLayer::new(controller); /// let subscriber = Registry::default().with(opentelemetry_metrics); /// tracing::subscriber::set_global_default(subscriber).unwrap(); /// ``` @@ -329,10 +343,9 @@ pub struct MetricsLayer { impl MetricsLayer { /// Create a new instance of MetricsLayer. - pub fn new(push_controller: PushController) -> Self { - let meter = push_controller - .provider() - .meter(INSTRUMENTATION_LIBRARY_NAME, Some(CARGO_PKG_VERSION)); + pub fn new(controller: BasicController) -> Self { + let meter = + controller.versioned_meter(INSTRUMENTATION_LIBRARY_NAME, Some(CARGO_PKG_VERSION), None); MetricsLayer { meter, instruments: Default::default(), diff --git a/src/tracer.rs b/src/tracer.rs index 9cd6d37..66c52fe 100644 --- a/src/tracer.rs +++ b/src/tracer.rs @@ -1,9 +1,10 @@ -use opentelemetry::sdk::trace::{SamplingDecision, SamplingResult, Tracer, TracerProvider}; +use opentelemetry::sdk::trace::{Tracer, TracerProvider}; +use opentelemetry::trace::OrderMap; use opentelemetry::{ trace as otel, trace::{ - noop, SpanBuilder, SpanContext, SpanId, SpanKind, TraceContextExt, TraceFlags, TraceId, - TraceState, + noop, SamplingDecision, SamplingResult, SpanBuilder, SpanContext, SpanId, SpanKind, + TraceContextExt, TraceFlags, TraceId, TraceState, }, Context as OtelContext, }; @@ -74,19 +75,18 @@ impl PreSampledTracer for Tracer { let builder = &mut data.builder; // Gather trace state - let (no_parent, trace_id, remote_parent, parent_trace_flags) = - current_trace_state(builder, parent_cx, &provider); + let (trace_id, parent_trace_flags) = current_trace_state(builder, parent_cx, &provider); // Sample or defer to existing sampling decisions let (flags, trace_state) = if let Some(result) = &builder.sampling_result { process_sampling_result(result, parent_trace_flags) - } else if no_parent || remote_parent { + } else { builder.sampling_result = Some(provider.config().sampler.should_sample( Some(parent_cx), trace_id, &builder.name, builder.span_kind.as_ref().unwrap_or(&SpanKind::Internal), - builder.attributes.as_deref().unwrap_or(&[]), + builder.attributes.as_ref().unwrap_or(&OrderMap::default()), builder.links.as_deref().unwrap_or(&[]), self.instrumentation_library(), )); @@ -95,12 +95,6 @@ impl PreSampledTracer for Tracer { builder.sampling_result.as_ref().unwrap(), parent_trace_flags, ) - } else { - // has parent that is local - Some(( - parent_trace_flags, - parent_cx.span().span_context().trace_state().clone(), - )) } .unwrap_or_default(); @@ -126,18 +120,16 @@ fn current_trace_state( builder: &SpanBuilder, parent_cx: &OtelContext, provider: &TracerProvider, -) -> (bool, TraceId, bool, TraceFlags) { +) -> (TraceId, TraceFlags) { if parent_cx.has_active_span() { let span = parent_cx.span(); let sc = span.span_context(); - (false, sc.trace_id(), sc.is_remote(), sc.trace_flags()) + (sc.trace_id(), sc.trace_flags()) } else { ( - true, builder .trace_id .unwrap_or_else(|| provider.config().id_generator.new_trace_id()), - false, Default::default(), ) } diff --git a/tests/metrics_publishing.rs b/tests/metrics_publishing.rs index 9db53fc..7616b8c 100644 --- a/tests/metrics_publishing.rs +++ b/tests/metrics_publishing.rs @@ -1,26 +1,21 @@ -#![cfg(feature = "metrics")] -use async_trait::async_trait; -use futures_util::{Stream, StreamExt as _}; use opentelemetry::{ - metrics::{Descriptor, InstrumentKind}, - metrics::{Number, NumberKind}, + metrics::MetricsError, sdk::{ - export::{ - metrics::{ - CheckpointSet, ExportKind, ExportKindFor, ExportKindSelector, - Exporter as MetricsExporter, Points, Sum, - }, - trace::{SpanData, SpanExporter}, + export::metrics::{ + aggregation::{self, Histogram, Sum, TemporalitySelector}, + InstrumentationLibraryReader, }, metrics::{ - aggregators::{ArrayAggregator, SumAggregator}, - selectors::simple::Selector, + aggregators::{HistogramAggregator, SumAggregator}, + controllers::BasicController, + processors, + sdk_api::{Descriptor, InstrumentKind, Number, NumberKind}, + selectors, }, }, - Key, Value, + Context, }; use std::cmp::Ordering; -use std::time::Duration; use tracing::Subscriber; use tracing_opentelemetry::MetricsLayer; use tracing_subscriber::prelude::*; @@ -30,7 +25,7 @@ const INSTRUMENTATION_LIBRARY_NAME: &str = "tracing/tracing-opentelemetry"; #[tokio::test] async fn u64_counter_is_exported() { - let subscriber = init_subscriber( + let (subscriber, exporter) = init_subscriber( "hello_world".to_string(), InstrumentKind::Counter, NumberKind::U64, @@ -40,11 +35,13 @@ async fn u64_counter_is_exported() { tracing::subscriber::with_default(subscriber, || { tracing::info!(monotonic_counter.hello_world = 1_u64); }); + + exporter.export().unwrap(); } #[tokio::test] async fn u64_counter_is_exported_i64_at_instrumentation_point() { - let subscriber = init_subscriber( + let (subscriber, exporter) = init_subscriber( "hello_world2".to_string(), InstrumentKind::Counter, NumberKind::U64, @@ -54,11 +51,13 @@ async fn u64_counter_is_exported_i64_at_instrumentation_point() { tracing::subscriber::with_default(subscriber, || { tracing::info!(monotonic_counter.hello_world2 = 1_i64); }); + + exporter.export().unwrap(); } #[tokio::test] async fn f64_counter_is_exported() { - let subscriber = init_subscriber( + let (subscriber, exporter) = init_subscriber( "float_hello_world".to_string(), InstrumentKind::Counter, NumberKind::F64, @@ -68,11 +67,13 @@ async fn f64_counter_is_exported() { tracing::subscriber::with_default(subscriber, || { tracing::info!(monotonic_counter.float_hello_world = 1.000000123_f64); }); + + exporter.export().unwrap(); } #[tokio::test] async fn i64_up_down_counter_is_exported() { - let subscriber = init_subscriber( + let (subscriber, exporter) = init_subscriber( "pebcak".to_string(), InstrumentKind::UpDownCounter, NumberKind::I64, @@ -82,11 +83,13 @@ async fn i64_up_down_counter_is_exported() { tracing::subscriber::with_default(subscriber, || { tracing::info!(counter.pebcak = -5_i64); }); + + exporter.export().unwrap(); } #[tokio::test] async fn i64_up_down_counter_is_exported_u64_at_instrumentation_point() { - let subscriber = init_subscriber( + let (subscriber, exporter) = init_subscriber( "pebcak2".to_string(), InstrumentKind::UpDownCounter, NumberKind::I64, @@ -96,11 +99,13 @@ async fn i64_up_down_counter_is_exported_u64_at_instrumentation_point() { tracing::subscriber::with_default(subscriber, || { tracing::info!(counter.pebcak2 = 5_u64); }); + + exporter.export().unwrap(); } #[tokio::test] async fn f64_up_down_counter_is_exported() { - let subscriber = init_subscriber( + let (subscriber, exporter) = init_subscriber( "pebcak_blah".to_string(), InstrumentKind::UpDownCounter, NumberKind::F64, @@ -110,13 +115,15 @@ async fn f64_up_down_counter_is_exported() { tracing::subscriber::with_default(subscriber, || { tracing::info!(counter.pebcak_blah = 99.123_f64); }); + + exporter.export().unwrap(); } #[tokio::test] -async fn u64_value_is_exported() { - let subscriber = init_subscriber( +async fn u64_histogram_is_exported() { + let (subscriber, exporter) = init_subscriber( "abcdefg".to_string(), - InstrumentKind::ValueRecorder, + InstrumentKind::Histogram, NumberKind::U64, Number::from(9_u64), ); @@ -124,13 +131,15 @@ async fn u64_value_is_exported() { tracing::subscriber::with_default(subscriber, || { tracing::info!(value.abcdefg = 9_u64); }); + + exporter.export().unwrap(); } #[tokio::test] -async fn i64_value_is_exported() { - let subscriber = init_subscriber( +async fn i64_histogram_is_exported() { + let (subscriber, exporter) = init_subscriber( "abcdefg_auenatsou".to_string(), - InstrumentKind::ValueRecorder, + InstrumentKind::Histogram, NumberKind::I64, Number::from(-19_i64), ); @@ -138,13 +147,15 @@ async fn i64_value_is_exported() { tracing::subscriber::with_default(subscriber, || { tracing::info!(value.abcdefg_auenatsou = -19_i64); }); + + exporter.export().unwrap(); } #[tokio::test] -async fn f64_value_is_exported() { - let subscriber = init_subscriber( +async fn f64_histogram_is_exported() { + let (subscriber, exporter) = init_subscriber( "abcdefg_racecar".to_string(), - InstrumentKind::ValueRecorder, + InstrumentKind::Histogram, NumberKind::F64, Number::from(777.0012_f64), ); @@ -152,6 +163,8 @@ async fn f64_value_is_exported() { tracing::subscriber::with_default(subscriber, || { tracing::info!(value.abcdefg_racecar = 777.0012_f64); }); + + exporter.export().unwrap(); } fn init_subscriber( @@ -159,24 +172,24 @@ fn init_subscriber( expected_instrument_kind: InstrumentKind, expected_number_kind: NumberKind, expected_value: Number, -) -> impl Subscriber + 'static { +) -> (impl Subscriber + 'static, TestExporter) { + let controller = opentelemetry::sdk::metrics::controllers::basic(processors::factory( + selectors::simple::histogram(vec![-10.0, 100.0]), + aggregation::cumulative_temporality_selector(), + )) + .build(); let exporter = TestExporter { expected_metric_name, expected_instrument_kind, expected_number_kind, expected_value, + controller: controller.clone(), }; - let push_controller = opentelemetry::sdk::metrics::controllers::push( - Selector::Exact, - ExportKindSelector::Stateless, + ( + tracing_subscriber::registry().with(MetricsLayer::new(controller)), exporter, - tokio::spawn, - delayed_interval, ) - .build(); - - tracing_subscriber::registry().with(MetricsLayer::new(push_controller)) } #[derive(Clone, Debug)] @@ -185,100 +198,84 @@ struct TestExporter { expected_instrument_kind: InstrumentKind, expected_number_kind: NumberKind, expected_value: Number, + controller: BasicController, } -#[async_trait] -impl SpanExporter for TestExporter { - async fn export( - &mut self, - mut _batch: Vec, - ) -> opentelemetry::sdk::export::trace::ExportResult { - Ok(()) - } -} +impl TestExporter { + fn export(&self) -> Result<(), MetricsError> { + self.controller.collect(&Context::current())?; + self.controller.try_for_each(&mut |library, reader| { + reader.try_for_each(self, &mut |record| { + assert_eq!(self.expected_metric_name, record.descriptor().name()); + assert_eq!( + self.expected_instrument_kind, + *record.descriptor().instrument_kind() + ); + assert_eq!( + self.expected_number_kind, + *record.descriptor().number_kind() + ); + match self.expected_instrument_kind { + InstrumentKind::Counter | InstrumentKind::UpDownCounter => { + let number = record + .aggregator() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .sum() + .unwrap(); + + assert_eq!( + Ordering::Equal, + number + .partial_cmp(&NumberKind::U64, &self.expected_value) + .unwrap() + ); + } + InstrumentKind::Histogram => { + let histogram = record + .aggregator() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .histogram() + .unwrap(); + + let counts = histogram.counts(); + if dbg!(self.expected_value.to_i64(&self.expected_number_kind)) > 100 { + assert_eq!(counts, &[0.0, 0.0, 1.0]); + } else if self.expected_value.to_i64(&self.expected_number_kind) > 0 { + assert_eq!(counts, &[0.0, 1.0, 0.0]); + } else { + assert_eq!(counts, &[1.0, 0.0, 0.0]); + } + } + _ => panic!( + "InstrumentKind {:?} not currently supported!", + self.expected_instrument_kind + ), + }; -impl MetricsExporter for TestExporter { - fn export(&self, checkpoint_set: &mut dyn CheckpointSet) -> opentelemetry::metrics::Result<()> { - checkpoint_set.try_for_each(self, &mut |record| { - assert_eq!(self.expected_metric_name, record.descriptor().name()); - assert_eq!( - self.expected_instrument_kind, - *record.descriptor().instrument_kind() - ); - assert_eq!( - self.expected_number_kind, - *record.descriptor().number_kind() - ); - let number = match self.expected_instrument_kind { - InstrumentKind::Counter | InstrumentKind::UpDownCounter => record - .aggregator() - .unwrap() - .as_any() - .downcast_ref::() - .unwrap() - .sum() - .unwrap(), - InstrumentKind::ValueRecorder => record - .aggregator() - .unwrap() - .as_any() - .downcast_ref::() - .unwrap() - .points() - .unwrap()[0] - .clone(), - _ => panic!( - "InstrumentKind {:?} not currently supported!", - self.expected_instrument_kind - ), - }; - assert_eq!( - Ordering::Equal, - number - .partial_cmp(&NumberKind::U64, &self.expected_value) - .unwrap() - ); - - // The following are the same regardless of the individual metric. - assert_eq!( - INSTRUMENTATION_LIBRARY_NAME, - record.descriptor().instrumentation_library().name - ); - assert_eq!( - CARGO_PKG_VERSION, - record.descriptor().instrumentation_version().unwrap() - ); - assert_eq!( - Value::String("unknown_service".into()), - record - .resource() - .get(Key::new("service.name".to_string())) - .unwrap() - ); - - opentelemetry::metrics::Result::Ok(()) + // The following are the same regardless of the individual metric. + assert_eq!(INSTRUMENTATION_LIBRARY_NAME, library.name); + assert_eq!(CARGO_PKG_VERSION, library.version.as_ref().unwrap()); + + Ok(()) + }) }) } } -impl ExportKindFor for TestExporter { - fn export_kind_for(&self, _descriptor: &Descriptor) -> ExportKind { +impl TemporalitySelector for TestExporter { + fn temporality_for( + &self, + _descriptor: &Descriptor, + _kind: &aggregation::AggregationKind, + ) -> aggregation::Temporality { // I don't think the value here makes a difference since // we are just testing a single metric. - ExportKind::Cumulative + aggregation::Temporality::Cumulative } } - -// From opentelemetry::sdk::util:: -// For some reason I can't pull it in from the other crate, it gives -// could not find `util` in `sdk` -/// Helper which wraps `tokio::time::interval` and makes it return a stream -fn tokio_interval_stream(period: std::time::Duration) -> tokio_stream::wrappers::IntervalStream { - tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(period)) -} - -// https://github.com/open-telemetry/opentelemetry-rust/blob/2585d109bf90d53d57c91e19c758dca8c36f5512/examples/basic-otlp/src/main.rs#L34-L37 -// Skip first immediate tick from tokio, not needed for async_std. -fn delayed_interval(duration: Duration) -> impl Stream { - tokio_interval_stream(duration).skip(0) -} diff --git a/tests/trace_state_propagation.rs b/tests/trace_state_propagation.rs index 0f92bc4..49200b4 100644 --- a/tests/trace_state_propagation.rs +++ b/tests/trace_state_propagation.rs @@ -1,8 +1,8 @@ -use async_trait::async_trait; +use futures_util::future::BoxFuture; use opentelemetry::{ propagation::TextMapPropagator, sdk::{ - export::trace::{SpanData, SpanExporter}, + export::trace::{ExportResult, SpanData, SpanExporter}, propagation::{BaggagePropagator, TextMapCompositePropagator, TraceContextPropagator}, trace::{Tracer, TracerProvider}, }, @@ -158,15 +158,14 @@ fn build_sampled_context() -> (Context, impl Subscriber, TestExporter, TracerPro #[derive(Clone, Default, Debug)] struct TestExporter(Arc>>); -#[async_trait] impl SpanExporter for TestExporter { - async fn export( - &mut self, - mut batch: Vec, - ) -> opentelemetry::sdk::export::trace::ExportResult { - if let Ok(mut inner) = self.0.lock() { - inner.append(&mut batch); - } - Ok(()) + fn export(&mut self, mut batch: Vec) -> BoxFuture<'static, ExportResult> { + let spans = self.0.clone(); + Box::pin(async move { + if let Ok(mut inner) = spans.lock() { + inner.append(&mut batch); + } + Ok(()) + }) } } From 1e3fd32c888902d0d4f0c9025c5c4dda7d144d2d Mon Sep 17 00:00:00 2001 From: Jayson Reis Date: Wed, 3 May 2023 19:11:43 +0200 Subject: [PATCH 2/2] chore: Fix example's jaeger initialization Signed-off-by: Jayson Reis --- examples/opentelemetry.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/opentelemetry.rs b/examples/opentelemetry.rs index b536c89..23dc9a3 100644 --- a/examples/opentelemetry.rs +++ b/examples/opentelemetry.rs @@ -18,7 +18,7 @@ fn main() -> Result<(), Box> { // Install an otel pipeline with a simple span processor that exports data one at a time when // spans end. See the `install_batch` option on each exporter's pipeline builder to see how to // export in batches. - let tracer = opentelemetry_jaeger::new_pipeline() + let tracer = opentelemetry_jaeger::new_agent_pipeline() .with_service_name("report_example") .install_simple()?; let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer);