Skip to content

Commit

Permalink
feature: add global labels support for dogstatsd exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
nappairam committed Feb 4, 2025
1 parent 56936bf commit ad09659
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 15 deletions.
13 changes: 13 additions & 0 deletions metrics-exporter-dogstatsd/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{fmt, net::SocketAddr, sync::Arc, time::Duration};

use metrics::Label;
use thiserror::Error;
use tracing::debug;

Expand Down Expand Up @@ -97,6 +98,7 @@ pub struct DogStatsDBuilder {
histogram_sampling: bool,
histogram_reservoir_size: usize,
histograms_as_distributions: bool,
global_labels: Vec<Label>,
}

impl DogStatsDBuilder {
Expand Down Expand Up @@ -229,6 +231,15 @@ impl DogStatsDBuilder {
self
}

/// Set Global labels for all metrics to this exporter
///
/// Global labels are applied to all metrics.
#[must_use]
pub fn with_global_labels(mut self, labels: Vec<Label>) -> Self {
self.global_labels = labels;
self
}

/// Sets whether or not to enable telemetry for the exporter.
///
/// When enabled, additional metrics will be sent to the configured remote server that provide insight into the
Expand Down Expand Up @@ -339,6 +350,7 @@ impl DogStatsDBuilder {
max_payload_len,
flush_interval,
write_timeout: self.write_timeout,
global_labels: Arc::new(self.global_labels),
};

if self.synchronous {
Expand Down Expand Up @@ -387,6 +399,7 @@ impl Default for DogStatsDBuilder {
histogram_sampling: false,
histogram_reservoir_size: DEFAULT_HISTOGRAM_RESERVOIR_SIZE,
histograms_as_distributions: true,
global_labels: Default::default()
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions metrics-exporter-dogstatsd/src/forwarder/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
#[cfg(target_os = "linux")]
use std::path::PathBuf;
use std::{
fmt,
net::{SocketAddr, ToSocketAddrs as _},
time::Duration,
fmt, net::{SocketAddr, ToSocketAddrs as _}, sync::Arc, time::Duration
};

use metrics::Label;

pub mod sync;

#[derive(Clone, Debug, Eq, PartialEq)]
Expand Down Expand Up @@ -108,6 +108,7 @@ pub(crate) struct ForwarderConfiguration {
pub max_payload_len: usize,
pub flush_interval: Duration,
pub write_timeout: Duration,
pub global_labels: Arc<Vec<Label>>,
}

impl ForwarderConfiguration {
Expand Down
2 changes: 1 addition & 1 deletion metrics-exporter-dogstatsd/src/forwarder/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ impl Forwarder {
pub fn run(mut self) {
let mut flush_state = FlushState::default();
let mut writer =
PayloadWriter::new(self.config.max_payload_len, self.config.is_length_prefixed());
PayloadWriter::new(self.config.max_payload_len, self.config.is_length_prefixed(), self.config.global_labels.clone());
let mut telemetry_update = TelemetryUpdate::default();

let mut next_flush = Instant::now() + self.config.flush_interval;
Expand Down
27 changes: 16 additions & 11 deletions metrics-exporter-dogstatsd/src/writer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use metrics::Key;
use std::{slice::Iter, sync::Arc};

use metrics::{Key, Label};

pub struct WriteResult {
payloads_written: u64,
Expand Down Expand Up @@ -45,11 +47,12 @@ pub(super) struct PayloadWriter {
trailer_buf: Vec<u8>,
offsets: Vec<usize>,
with_length_prefix: bool,
global_labels: Arc<Vec<Label>>,
}

impl PayloadWriter {
/// Creates a new `PayloadWriter` with the given maximum payload length.
pub fn new(max_payload_len: usize, with_length_prefix: bool) -> Self {
pub fn new(max_payload_len: usize, with_length_prefix: bool, global_labels: Arc<Vec<Label>>) -> Self {
// NOTE: This should also be handled in the builder, but we want to just double check here that we're getting a
// properly sanitized value.
assert!(
Expand All @@ -63,6 +66,7 @@ impl PayloadWriter {
trailer_buf: Vec::new(),
offsets: Vec::new(),
with_length_prefix,
global_labels,
};

writer.prepare_for_write();
Expand Down Expand Up @@ -122,7 +126,7 @@ impl PayloadWriter {
}

fn write_trailing(&mut self, key: &Key, timestamp: Option<u64>) {
write_metric_trailer(key, timestamp, &mut self.buf, None);
write_metric_trailer(key, timestamp, &mut self.buf, None, self.global_labels.iter());
}

/// Writes a counter payload.
Expand Down Expand Up @@ -210,7 +214,7 @@ impl PayloadWriter {
//
// We do this for efficiency reasons, but also to calculate the minimum payload length.
self.trailer_buf.clear();
write_metric_trailer(key, None, &mut self.trailer_buf, maybe_sample_rate);
write_metric_trailer(key, None, &mut self.trailer_buf, maybe_sample_rate, self.global_labels.iter());

// Calculate the minimum payload length, which is the key name, the metric trailer, and the metric type
// substring (`|<metric type>`). This is the minimum amount of space we need to write out the metric without
Expand Down Expand Up @@ -333,6 +337,7 @@ fn write_metric_trailer(
maybe_timestamp: Option<u64>,
buf: &mut Vec<u8>,
maybe_sample_rate: Option<f64>,
global_labels: Iter<Label>
) {
// Write the sample rate if it's not 1.0, as that is the implied default.
if let Some(sample_rate) = maybe_sample_rate {
Expand All @@ -346,7 +351,7 @@ fn write_metric_trailer(
// Write the metric tags first.
let tags = key.labels();
let mut wrote_tag = false;
for tag in tags {
for tag in global_labels.chain(tags) {
// If we haven't written a tag yet, write out the tags prefix first.
//
// Otherwise, write a tag separator.
Expand Down Expand Up @@ -466,7 +471,7 @@ mod tests {
];

for (key, value, ts, expected) in cases {
let mut writer = PayloadWriter::new(8192, false);
let mut writer = PayloadWriter::new(8192, false, Default::default());
let result = writer.write_counter(&key, value, ts);
assert_eq!(result.payloads_written(), 1);

Expand Down Expand Up @@ -496,7 +501,7 @@ mod tests {
];

for (key, value, ts, expected) in cases {
let mut writer = PayloadWriter::new(8192, false);
let mut writer = PayloadWriter::new(8192, false, Default::default());
let result = writer.write_gauge(&key, value, ts);
assert_eq!(result.payloads_written(), 1);

Expand Down Expand Up @@ -528,7 +533,7 @@ mod tests {
];

for (key, values, expected) in cases {
let mut writer = PayloadWriter::new(8192, false);
let mut writer = PayloadWriter::new(8192, false, Default::default());
let result = writer.write_histogram(&key, values.iter().copied(), None);
assert_eq!(result.payloads_written(), 1);

Expand Down Expand Up @@ -560,7 +565,7 @@ mod tests {
];

for (key, values, expected) in cases {
let mut writer = PayloadWriter::new(8192, false);
let mut writer = PayloadWriter::new(8192, false, Default::default());
let result = writer.write_distribution(&key, values.iter().copied(), None);
assert_eq!(result.payloads_written(), 1);

Expand Down Expand Up @@ -599,7 +604,7 @@ mod tests {
];

for (key, values, expected) in cases {
let mut writer = PayloadWriter::new(8192, true);
let mut writer = PayloadWriter::new(8192, true, Default::default());
let result = writer.write_distribution(&key, values.iter().copied(), None);
assert_eq!(result.payloads_written(), 1);

Expand All @@ -613,7 +618,7 @@ mod tests {
fn property_test_gauntlet(payload_limit in 0..16384usize, inputs in arb_vec(arb_metric(), 1..128)) {
// TODO: Parameterize reservoir size so we can exercise the sample rate stuff.[]

let mut writer = PayloadWriter::new(payload_limit, false);
let mut writer = PayloadWriter::new(payload_limit, false, Default::default());
let mut total_input_points: u64 = 0;
let mut payloads_written = 0;
let mut points_dropped = 0;
Expand Down

0 comments on commit ad09659

Please sign in to comment.