Skip to content

Commit

Permalink
[span processor] Fix problem that panic when no env vars were set.
Browse files Browse the repository at this point in the history
  • Loading branch information
TommyCpp committed Sep 25, 2020
1 parent 866047b commit 0785d01
Showing 1 changed file with 57 additions and 29 deletions.
86 changes: 57 additions & 29 deletions src/sdk/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl api::SpanProcessor for BatchSpanProcessor {
#[allow(missing_debug_implementations)]
pub struct BatchSpanProcessorWorker {
exporter: Box<dyn exporter::trace::SpanExporter>,
messages: Pin<Box<dyn Stream<Item=BatchMessage> + Send>>,
messages: Pin<Box<dyn Stream<Item = BatchMessage> + Send>>,
config: BatchConfig,
buffer: Vec<Arc<exporter::trace::SpanData>>,
}
Expand Down Expand Up @@ -243,10 +243,10 @@ impl BatchSpanProcessor {
interval: I,
config: BatchConfig,
) -> Self
where
S: Fn(BatchSpanProcessorWorker) -> SO,
I: Fn(time::Duration) -> IS,
IS: Stream<Item=ISI> + Send + 'static,
where
S: Fn(BatchSpanProcessorWorker) -> SO,
I: Fn(time::Duration) -> IS,
IS: Stream<Item = ISI> + Send + 'static,
{
let (message_sender, message_receiver) = mpsc::channel(config.max_queue_size);
let ticker = interval(config.scheduled_delay).map(|_| BatchMessage::Tick);
Expand All @@ -271,10 +271,10 @@ impl BatchSpanProcessor {
spawn: S,
interval: I,
) -> BatchSpanProcessorBuilder<E, S, I>
where
E: exporter::trace::SpanExporter,
S: Fn(BatchSpanProcessorWorker) -> SO,
I: Fn(time::Duration) -> IO,
where
E: exporter::trace::SpanExporter,
S: Fn(BatchSpanProcessorWorker) -> SO,
I: Fn(time::Duration) -> IO,
{
BatchSpanProcessorBuilder {
exporter,
Expand All @@ -296,29 +296,29 @@ impl BatchSpanProcessor {
spawn: S,
interval: I,
) -> BatchSpanProcessorBuilder<E, S, I>
where
E: exporter::trace::SpanExporter,
S: Fn(BatchSpanProcessorWorker) -> SO,
I: Fn(time::Duration) -> IO,
where
E: exporter::trace::SpanExporter,
S: Fn(BatchSpanProcessorWorker) -> SO,
I: Fn(time::Duration) -> IO,
{
let mut config = BatchConfig::default();
let schedule_delay = std::env::var(OTEL_BSP_SCHEDULE_DELAY_MILLIS)
.and_then(|delay| u64::from_str(&delay).or(Ok(OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT)))
.unwrap();
.map(|delay| u64::from_str(&delay).unwrap_or(OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT))
.unwrap_or(OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT);
config.scheduled_delay = time::Duration::from_millis(schedule_delay);

let max_queue_size = std::env::var(OTEL_BSP_MAX_QUEUE_SIZE)
.and_then(|queue_size| {
usize::from_str(&queue_size).or(Ok(OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT))
.map(|queue_size| {
usize::from_str(&queue_size).unwrap_or(OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT)
})
.unwrap();
.unwrap_or(OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT);
config.max_queue_size = max_queue_size;

let max_export_batch_size = std::env::var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE)
.and_then(|batch_size| {
usize::from_str(&batch_size).or(Ok(OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT))
.map(|batch_size| {
usize::from_str(&batch_size).unwrap_or(OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT)
})
.unwrap();
.unwrap_or(OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT);
// max export batch size must be less or equal to max queue size.
// we set max export batch size to max queue size if it's larger than max queue size.
if max_export_batch_size > max_queue_size {
Expand Down Expand Up @@ -376,11 +376,11 @@ pub struct BatchSpanProcessorBuilder<E, S, I> {
}

impl<E, S, SO, I, IS, ISI> BatchSpanProcessorBuilder<E, S, I>
where
E: exporter::trace::SpanExporter + 'static,
S: Fn(BatchSpanProcessorWorker) -> SO,
I: Fn(time::Duration) -> IS,
IS: Stream<Item=ISI> + Send + 'static,
where
E: exporter::trace::SpanExporter + 'static,
S: Fn(BatchSpanProcessorWorker) -> SO,
I: Fn(time::Duration) -> IS,
IS: Stream<Item = ISI> + Send + 'static,
{
/// Set max queue size for batches
pub fn with_max_queue_size(self, size: usize) -> Self {
Expand All @@ -403,7 +403,7 @@ impl<E, S, SO, I, IS, ISI> BatchSpanProcessorBuilder<E, S, I>
/// If input is larger than max queue size, will lower it to be equal to max queue size
pub fn with_max_export_batch_size(self, size: usize) -> Self {
let mut config = self.config;
if size > config.max_queue_size{
if size > config.max_queue_size {
config.max_export_batch_size = config.max_queue_size;
} else {
config.max_export_batch_size = size;
Expand All @@ -426,10 +426,35 @@ impl<E, S, SO, I, IS, ISI> BatchSpanProcessorBuilder<E, S, I>
#[cfg(test)]
mod tests {
use crate::exporter::trace::stdout;
use crate::sdk::trace::span_processor::{OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BSP_SCHEDULE_DELAY_MILLIS};
use crate::sdk::trace::span_processor::{
OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BSP_SCHEDULE_DELAY_MILLIS,
OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT,
};
use crate::sdk::BatchSpanProcessor;
use std::time;

#[test]
fn test_build_batch_span_processor_without_env() {
let builder = BatchSpanProcessor::from_env(
stdout::Exporter::new(std::io::stdout(), true),
tokio::spawn,
tokio::time::interval,
);
assert_eq!(
builder.config.max_export_batch_size,
OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT
);
assert_eq!(
builder.config.max_queue_size,
OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT
);
assert_eq!(
builder.config.scheduled_delay,
std::time::Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT)
);
}

#[test]
fn test_build_batch_span_processor_from_env() {
std::env::set_var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE, "5000");
Expand All @@ -442,7 +467,10 @@ mod tests {
tokio::time::interval,
);
// export batch size cannot exceed max queue size
assert_eq!(builder.config.max_export_batch_size, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT);
assert_eq!(
builder.config.max_export_batch_size,
OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT
);
assert_eq!(
builder.config.scheduled_delay,
time::Duration::from_millis(120)
Expand Down

0 comments on commit 0785d01

Please sign in to comment.