Skip to content

Commit

Permalink
Add config options from env for BatchSpanProcessor (#228)
Browse files Browse the repository at this point in the history
  • Loading branch information
TommyCpp authored Sep 26, 2020
1 parent 5c64f66 commit 1689c73
Showing 1 changed file with 120 additions and 5 deletions.
125 changes: 120 additions & 5 deletions src/sdk/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,23 @@ use futures::{
Future, Stream, StreamExt,
};
use std::pin::Pin;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time;

/// Delay interval between two consecutive exports, default to be 5000.
const OTEL_BSP_SCHEDULE_DELAY_MILLIS: &str = "OTEL_BSP_SCHEDULE_DELAY_MILLIS";
/// Default delay interval between two consecutive exports.
const OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT: u64 = 5000;
/// Maximum queue size, default to be 2048
const OTEL_BSP_MAX_QUEUE_SIZE: &str = "OTEL_BSP_MAX_QUEUE_SIZE";
/// Default maximum queue size
const OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT: usize = 2048;
/// Maximum batch size, must be less than or equal to OTEL_BSP_MAX_QUEUE_SIZE, default to be 512
const OTEL_BSP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE";
/// Default maximum batch size
const OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;

/// A [`SpanProcessor`] that exports synchronously when spans are finished.
///
/// [`SpanProcessor`]: ../../../api/trace/span_processor/trait.SpanProcessor.html
Expand Down Expand Up @@ -269,6 +283,57 @@ impl BatchSpanProcessor {
config: Default::default(),
}
}

/// Create a new batch processor builder and set the config value based on environment variables.
///
/// If the value in environment variables is illegal, will fall back to use default value.
///
/// Note that export batch size should be less than or equals to max queue size.
/// If export batch size is larger than max queue size, we will lower to be the same as max
/// queue size
pub fn from_env<E, S, SO, I, IO>(
exporter: E,
spawn: S,
interval: I,
) -> BatchSpanProcessorBuilder<E, S, I>
where
E: exporter::trace::SpanExporter,
S: Fn(BatchSpanProcessorWorker) -> SO,
I: Fn(time::Duration) -> IO,
{
let mut config = BatchConfig::default();
let schedule_delay = std::env::var(OTEL_BSP_SCHEDULE_DELAY_MILLIS)
.map(|delay| u64::from_str(&delay).unwrap_or(OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT))
.unwrap_or(OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT);
config.scheduled_delay = time::Duration::from_millis(schedule_delay);

let max_queue_size = std::env::var(OTEL_BSP_MAX_QUEUE_SIZE)
.map(|queue_size| {
usize::from_str(&queue_size).unwrap_or(OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT)
})
.unwrap_or(OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT);
config.max_queue_size = max_queue_size;

let max_export_batch_size = std::env::var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE)
.map(|batch_size| {
usize::from_str(&batch_size).unwrap_or(OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT)
})
.unwrap_or(OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT);
// max export batch size must be less or equal to max queue size.
// we set max export batch size to max queue size if it's larger than max queue size.
if max_export_batch_size > max_queue_size {
config.max_export_batch_size = max_queue_size;
} else {
config.max_export_batch_size = max_export_batch_size;
}

BatchSpanProcessorBuilder {
config,
exporter,
spawn,
interval,
}
}
}

/// Batch span processor configuration
Expand All @@ -292,9 +357,9 @@ pub struct BatchConfig {
impl Default for BatchConfig {
fn default() -> Self {
BatchConfig {
max_queue_size: 2048,
scheduled_delay: time::Duration::from_secs(5),
max_export_batch_size: 512,
max_queue_size: OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT,
scheduled_delay: time::Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT),
max_export_batch_size: OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
}
}
}
Expand Down Expand Up @@ -333,10 +398,16 @@ where
BatchSpanProcessorBuilder { config, ..self }
}

/// Set max export size for batches
/// Set max export size for batches, should always less than or equals to max queue size.
///
/// If input is larger than max queue size, will lower it to be equal to max queue size
pub fn with_max_export_batch_size(self, size: usize) -> Self {
let mut config = self.config;
config.max_export_batch_size = size;
if size > config.max_queue_size {
config.max_export_batch_size = config.max_queue_size;
} else {
config.max_export_batch_size = size;
}

BatchSpanProcessorBuilder { config, ..self }
}
Expand All @@ -351,3 +422,47 @@ where
)
}
}

#[cfg(test)]
mod tests {
use crate::exporter::trace::stdout;
use crate::sdk::trace::span_processor::{
OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE,
OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BSP_SCHEDULE_DELAY_MILLIS,
OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT,
};
use crate::sdk::BatchSpanProcessor;
use std::time;

#[test]
fn test_build_batch_span_processor_from_env() {
std::env::set_var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE, "500");
std::env::set_var(OTEL_BSP_SCHEDULE_DELAY_MILLIS, "I am not number");

let mut builder = BatchSpanProcessor::from_env(
stdout::Exporter::new(std::io::stdout(), true),
tokio::spawn,
tokio::time::interval,
);
// export batch size cannot exceed max queue size
assert_eq!(builder.config.max_export_batch_size, 500);
assert_eq!(
builder.config.scheduled_delay,
time::Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT)
);
assert_eq!(
builder.config.max_queue_size,
OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT
);

std::env::set_var(OTEL_BSP_MAX_QUEUE_SIZE, "120");
builder = BatchSpanProcessor::from_env(
stdout::Exporter::new(std::io::stdout(), true),
tokio::spawn,
tokio::time::interval,
);

assert_eq!(builder.config.max_export_batch_size, 120);
assert_eq!(builder.config.max_queue_size, 120);
}
}

0 comments on commit 1689c73

Please sign in to comment.