Skip to content

Commit

Permalink
Add measure flag and agent sampling
Browse files Browse the repository at this point in the history
  • Loading branch information
Hartigan committed Dec 3, 2023
1 parent 3f7d5ca commit a5afbb8
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 2 deletions.
2 changes: 2 additions & 0 deletions opentelemetry-datadog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ all-features = true
rustdoc-args = ["--cfg", "docsrs"]

[features]
measure = []
agent-sampling = []
reqwest-blocking-client = ["reqwest/blocking", "opentelemetry-http/reqwest"]
reqwest-client = ["reqwest", "opentelemetry-http/reqwest"]
surf-client = ["surf", "opentelemetry-http/surf"]
Expand Down
4 changes: 4 additions & 0 deletions opentelemetry-datadog/src/exporter/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ mod v05;
// https://github.com/DataDog/dd-trace-js/blob/c89a35f7d27beb4a60165409376e170eacb194c5/packages/dd-trace/src/constants.js#L4
static SAMPLING_PRIORITY_KEY: &str = "_sampling_priority_v1";

// https://github.com/DataDog/datadog-agent/blob/ec96f3c24173ec66ba235bda7710504400d9a000/pkg/trace/traceutil/span.go#L20
#[cfg(feature = "measure")]
static DD_MEASURED_KEY: &str = "_dd.measured";

/// Custom mapping between opentelemetry spans and datadog spans.
///
/// User can provide custom function to change the mapping. It currently supports customizing the following
Expand Down
33 changes: 31 additions & 2 deletions opentelemetry-datadog/src/exporter/model/v05.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
use crate::exporter::intern::StringInterner;
#[cfg(feature = "measure")]
use crate::exporter::model::DD_MEASURED_KEY;
use crate::exporter::model::SAMPLING_PRIORITY_KEY;
use crate::exporter::{Error, ModelConfig};
#[cfg(feature = "agent-sampling")]
use crate::propagator::TRACE_STATE_PRIORITY_SAMPLING;
#[cfg(feature = "measure")]
use crate::propagator::TRACE_STATE_MEASURE;
use opentelemetry::trace::Status;
use opentelemetry_sdk::export::trace::SpanData;
use std::time::SystemTime;
Expand Down Expand Up @@ -204,16 +210,39 @@ where
rmp::encode::write_u32(&mut encoded, interner.intern(kv.key.as_str()))?;
rmp::encode::write_u32(&mut encoded, interner.intern(kv.value.as_str().as_ref()))?;
}
rmp::encode::write_map_len(&mut encoded, 1)?;

const MEASURE_ENTRY : u32 = if cfg!(feature = "measure") { 1 } else { 0 };
const METRICS_LEN : u32 = 1 + MEASURE_ENTRY;

#[cfg(not(feature = "agent-sampling"))]
let sampling_priority = true;
#[cfg(feature = "agent-sampling")]
let sampling_priority = span.span_context
.trace_state()
.get(TRACE_STATE_PRIORITY_SAMPLING)
.map(|x| x == "1")
.unwrap_or(false);

rmp::encode::write_map_len(&mut encoded, METRICS_LEN)?;
rmp::encode::write_u32(&mut encoded, interner.intern(SAMPLING_PRIORITY_KEY))?;
rmp::encode::write_f64(
&mut encoded,
if span.span_context.is_sampled() {
if sampling_priority {
1.0
} else {
0.0
},
)?;
#[cfg(feature = "measure")]
{
let is_measure = span.span_context
.trace_state()
.get(TRACE_STATE_MEASURE)
.map(|x| if x == "1" { 1.0 } else { 0.0 })
.unwrap_or(0.0);
rmp::encode::write_u32(&mut encoded, interner.intern(DD_MEASURED_KEY))?;
rmp::encode::write_f64(&mut encoded, is_measure)?;
}
rmp::encode::write_u32(&mut encoded, span_type)?;
}
}
Expand Down
24 changes: 24 additions & 0 deletions opentelemetry-datadog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ pub use exporter::{
ModelConfig,
};
pub use propagator::DatadogPropagator;
#[cfg(feature = "agent-sampling")]
pub use propagator::TRACE_STATE_PRIORITY_SAMPLING;
#[cfg(feature = "measure")]
pub use propagator::TRACE_STATE_MEASURE;

mod propagator {
use once_cell::sync::Lazy;
Expand All @@ -155,6 +159,10 @@ mod propagator {
const DATADOG_SAMPLING_PRIORITY_HEADER: &str = "x-datadog-sampling-priority";

const TRACE_FLAG_DEFERRED: TraceFlags = TraceFlags::new(0x02);
#[cfg(feature = "agent-sampling")]
pub const TRACE_STATE_PRIORITY_SAMPLING: &str = "psr";
#[cfg(feature = "measure")]
pub const TRACE_STATE_MEASURE: &str = "m";

static DATADOG_HEADER_FIELDS: Lazy<[String; 3]> = Lazy::new(|| {
[
Expand Down Expand Up @@ -262,7 +270,12 @@ mod propagator {
Err(_) => TRACE_FLAG_DEFERRED,
};

#[cfg(not(feature = "agent-sampling"))]
let trace_state = TraceState::default();
#[cfg(feature = "agent-sampling")]
let trace_state = TraceState::from_key_value(
[(TRACE_STATE_PRIORITY_SAMPLING, if sampled.is_sampled() { "1" } else { "0" })]
).unwrap_or_default();

Ok(SpanContext::new(
trace_id,
Expand All @@ -289,11 +302,22 @@ mod propagator {
);

if span_context.trace_flags() & TRACE_FLAG_DEFERRED != TRACE_FLAG_DEFERRED {
#[cfg(not(feature = "agent-sampling"))]
let sampling_priority = if span_context.is_sampled() {
SamplingPriority::AutoKeep
} else {
SamplingPriority::AutoReject
};
#[cfg(feature = "agent-sampling")]
let sampling_priority =
if span_context
.trace_state()
.get(TRACE_STATE_PRIORITY_SAMPLING)
.unwrap_or("0") == "1" {
SamplingPriority::AutoKeep
} else {
SamplingPriority::AutoReject
};

injector.set(
DATADOG_SAMPLING_PRIORITY_HEADER,
Expand Down

0 comments on commit a5afbb8

Please sign in to comment.