Skip to content

Commit

Permalink
Make runtime id configurable in the trace exporter.
Browse files Browse the repository at this point in the history
  • Loading branch information
hoolioh committed Feb 6, 2025
1 parent b8cdf9e commit 13bcec4
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 26 deletions.
58 changes: 53 additions & 5 deletions data-pipeline/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub struct TelemetryClientBuilder {
language_version: Option<String>,
tracer_version: Option<String>,
config: ddtelemetry::config::Config,
runtime_id: Option<String>,
}

impl TelemetryClientBuilder {
Expand Down Expand Up @@ -68,17 +69,29 @@ impl TelemetryClientBuilder {
self
}

/// Sets runtime id for the telemetry client.
pub fn set_runtime_id(mut self, id: &str) -> Self {
self.runtime_id = Some(id.to_string());
self
}

/// Builds the telemetry client.
pub async fn build(self) -> Result<TelemetryClient, TelemetryError> {
let (worker, handle) = TelemetryWorkerBuilder::new_fetch_host(
let mut builder = TelemetryWorkerBuilder::new_fetch_host(
self.service_name.unwrap(),
self.language.unwrap(),
self.language_version.unwrap(),
self.tracer_version.unwrap(),
)
.spawn_with_config(self.config)
.await
.map_err(|e| TelemetryError::Builder(e.to_string()))?;
);

if let Some(id) = self.runtime_id {
builder.runtime_id = Some(id);
}

let (worker, handle) = builder
.spawn_with_config(self.config)
.await
.map_err(|e| TelemetryError::Builder(e.to_string()))?;

Ok(TelemetryClient {
handle,
Expand Down Expand Up @@ -534,4 +547,39 @@ mod tests {
let _ = client.handle.await;
telemetry_srv.assert_hits_async(1).await;
}

#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn runtime_id_test() {
let server = MockServer::start_async().await;

let telemetry_srv = server
.mock_async(|when, then| {
when.method(POST).body_contains(r#""runtime_id":"foo""#);
then.status(200).body("");
})
.await;

let result = TelemetryClientBuilder::default()
.set_service_name("test_service")
.set_language("test_language")
.set_language_version("test_language_version")
.set_tracer_version("test_tracer_version")
.set_url(&server.url("/"))
.set_hearbeat(100)
.set_runtime_id("foo")
.build()
.await;

assert!(result.is_ok());

let client = result.unwrap();

client.start().await;
client.shutdown().await;
let _ = client.handle.await;

// Check for 2 hits: app-started and app-closing.
telemetry_srv.assert_hits_async(2).await;
}
}
50 changes: 29 additions & 21 deletions data-pipeline/src/trace_exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -711,8 +711,9 @@ impl TraceExporter {
const DEFAULT_AGENT_URL: &str = "http://127.0.0.1:8126";

#[derive(Default)]
struct TelemetryConfig {
heartbeat: u64,
pub struct TelemetryConfig {
pub heartbeat: u64,
pub runtime_id: Option<String>,
}

#[allow(missing_docs)]
Expand Down Expand Up @@ -869,12 +870,12 @@ impl TraceExporterBuilder {
}

/// Enables sending telemetry metrics.
pub fn enable_telemetry(mut self, heartbeat_ms: Option<u64>) -> Self {
let mut config = TelemetryConfig::default();
if let Some(interval) = heartbeat_ms {
config.heartbeat = interval;
pub fn enable_telemetry(mut self, cfg: Option<TelemetryConfig>) -> Self {
if let Some(cfg) = cfg {
self.telemetry = Some(cfg);
} else {
self.telemetry = Some(TelemetryConfig::default());
}
self.telemetry = Some(config);
self
}

Expand Down Expand Up @@ -915,18 +916,19 @@ impl TraceExporterBuilder {
}

let telemetry = if let Some(telemetry_config) = self.telemetry {
Some(
runtime.block_on(
TelemetryClientBuilder::default()
.set_language(&self.language)
.set_language_version(&self.language_version)
.set_service_name(&self.service)
.set_tracer_version(&self.tracer_version)
.set_hearbeat(telemetry_config.heartbeat)
.set_url(base_url)
.build(),
)?,
)
Some(runtime.block_on(async {
let mut builder = TelemetryClientBuilder::default()
.set_language(&self.language)
.set_language_version(&self.language_version)
.set_service_name(&self.service)
.set_tracer_version(&self.tracer_version)
.set_hearbeat(telemetry_config.heartbeat)
.set_url(base_url);
if let Some(id) = telemetry_config.runtime_id {
builder = builder.set_runtime_id(&id);
}
builder.build().await
})?)
} else {
None
};
Expand Down Expand Up @@ -1003,7 +1005,10 @@ mod tests {
.set_input_format(TraceExporterInputFormat::Proxy)
.set_output_format(TraceExporterOutputFormat::V07)
.set_client_computed_stats()
.enable_telemetry(Some(1000))
.enable_telemetry(Some(TelemetryConfig {
heartbeat: 1000,
runtime_id: None,
}))
.build()
.unwrap();

Expand Down Expand Up @@ -1749,7 +1754,10 @@ mod tests {
.set_language("nodejs")
.set_language_version("1.0")
.set_language_interpreter("v8")
.enable_telemetry(Some(100))
.enable_telemetry(Some(TelemetryConfig {
heartbeat: 100,
..Default::default()
}))
.build()
.unwrap();

Expand Down

0 comments on commit 13bcec4

Please sign in to comment.