From 1896ee6a0649f3c70c410c0a59182713226386be Mon Sep 17 00:00:00 2001 From: "zhongyang.wu" Date: Tue, 22 Sep 2020 21:58:27 -0400 Subject: [PATCH 1/4] [env] Add environment configuration for BatchSpanProcessor. --- src/sdk/trace/span_processor.rs | 104 ++++++++++++++++++++++++++------ 1 file changed, 87 insertions(+), 17 deletions(-) diff --git a/src/sdk/trace/span_processor.rs b/src/sdk/trace/span_processor.rs index 6202e3868e..4922f5f430 100644 --- a/src/sdk/trace/span_processor.rs +++ b/src/sdk/trace/span_processor.rs @@ -97,9 +97,23 @@ use futures::{ Future, Stream, StreamExt, }; use std::pin::Pin; +use std::str::FromStr; use std::sync::{Arc, Mutex}; use std::time; +/// Delay interval between two consecutive exports, default to be 5000. +const OTEL_BSP_SCHEDULE_DELAY_MILLIS: &str = "OTEL_BSP_SCHEDULE_DELAY_MILLIS"; +/// Default delay interval between two consecutive exports. +const OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT: u64 = 5000; +/// Maximum queue size, default to be 2048 +const OTEL_BSP_MAX_QUEUE_SIZE: &str = "OTEL_BSP_MAX_QUEUE_SIZE"; +/// Default maximum queue size +const OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT: usize = 2048; +/// Maximum batch size, must be less than or equal to OTEL_BSP_MAX_QUEUE_SIZE, default to be 512 +const OTEL_BSP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE"; +/// Default maximum batch size +const OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512; + /// A [`SpanProcessor`] that exports synchronously when spans are finished. /// /// [`SpanProcessor`]: ../../../api/trace/span_processor/trait.SpanProcessor.html @@ -165,7 +179,7 @@ impl api::SpanProcessor for BatchSpanProcessor { #[allow(missing_debug_implementations)] pub struct BatchSpanProcessorWorker { exporter: Box, - messages: Pin + Send>>, + messages: Pin + Send>>, config: BatchConfig, buffer: Vec>, } @@ -229,10 +243,10 @@ impl BatchSpanProcessor { interval: I, config: BatchConfig, ) -> Self - where - S: Fn(BatchSpanProcessorWorker) -> SO, - I: Fn(time::Duration) -> IS, - IS: Stream + Send + 'static, + where + S: Fn(BatchSpanProcessorWorker) -> SO, + I: Fn(time::Duration) -> IS, + IS: Stream + Send + 'static, { let (message_sender, message_receiver) = mpsc::channel(config.max_queue_size); let ticker = interval(config.scheduled_delay).map(|_| BatchMessage::Tick); @@ -257,10 +271,10 @@ impl BatchSpanProcessor { spawn: S, interval: I, ) -> BatchSpanProcessorBuilder - where - E: exporter::trace::SpanExporter, - S: Fn(BatchSpanProcessorWorker) -> SO, - I: Fn(time::Duration) -> IO, + where + E: exporter::trace::SpanExporter, + S: Fn(BatchSpanProcessorWorker) -> SO, + I: Fn(time::Duration) -> IO, { BatchSpanProcessorBuilder { exporter, @@ -292,9 +306,9 @@ pub struct BatchConfig { impl Default for BatchConfig { fn default() -> Self { BatchConfig { - max_queue_size: 2048, - scheduled_delay: time::Duration::from_secs(5), - max_export_batch_size: 512, + max_queue_size: OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, + scheduled_delay: time::Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT), + max_export_batch_size: OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT, } } } @@ -311,12 +325,48 @@ pub struct BatchSpanProcessorBuilder { } impl BatchSpanProcessorBuilder -where - E: exporter::trace::SpanExporter + 'static, - S: Fn(BatchSpanProcessorWorker) -> SO, - I: Fn(time::Duration) -> IS, - IS: Stream + Send + 'static, + where + E: exporter::trace::SpanExporter + 'static, + S: Fn(BatchSpanProcessorWorker) -> SO, + I: Fn(time::Duration) -> IS, + IS: Stream + Send + 'static, { + /// Detect configurations from environment variable + pub fn with_env(self) -> Self { + let mut config = self.config; + let schedule_delay = u64::from_str( + std::env::var(OTEL_BSP_SCHEDULE_DELAY_MILLIS) + .unwrap_or_else(|_| "".to_string()) + .as_str(), + ) + .unwrap_or(OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT); + config.scheduled_delay = time::Duration::from_millis(schedule_delay); + + let max_queue_size = usize::from_str( + std::env::var(OTEL_BSP_MAX_QUEUE_SIZE) + .unwrap_or_else(|_| "".to_string()) + .as_str(), + ) + .unwrap_or(OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT); + config.max_queue_size = max_queue_size; + + let max_export_batch_size = usize::from_str( + std::env::var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE) + .unwrap_or_else(|_| "".to_string()) + .as_str(), + ) + .unwrap_or(OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT); + // max export batch size must be less or equal to max queue size. + // we set max export batch size to max queue size if it's larger than max queue size. + if max_export_batch_size > max_queue_size { + config.max_export_batch_size = max_queue_size; + } else { + config.max_export_batch_size = max_export_batch_size; + } + + BatchSpanProcessorBuilder { config, ..self } + } + /// Set max queue size for batches pub fn with_max_queue_size(self, size: usize) -> Self { let mut config = self.config; @@ -351,3 +401,23 @@ where ) } } + +#[cfg(test)] +mod tests { + use crate::sdk::trace::span_processor::{OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_SCHEDULE_DELAY_MILLIS}; + use crate::sdk::BatchSpanProcessor; + use crate::exporter::trace::stdout; + use tokio; + use std::time; + + #[test] + fn test_build_batch_span_processor_from_env() { + std::env::set_var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE, "5000"); + std::env::set_var(OTEL_BSP_SCHEDULE_DELAY_MILLIS, "120"); + + let mut builder = BatchSpanProcessor::builder(stdout::Builder::default().init(), tokio::spawn, tokio::time::interval); + builder = builder.with_env(); + assert_eq!(builder.config.max_export_batch_size, 2048); + assert_eq!(builder.config.scheduled_delay, time::Duration::from_millis(120)); + } +} From df45f923147d59d147e28a4c572bdc8fc4714d89 Mon Sep 17 00:00:00 2001 From: "zhongyang.wu" Date: Wed, 23 Sep 2020 20:30:47 -0400 Subject: [PATCH 2/4] [env] Move from_env method from BatchSpanProcessorBuilder to BatchSpanProcessor. --- src/sdk/trace/span_processor.rs | 127 +++++++++++++++++++------------- 1 file changed, 74 insertions(+), 53 deletions(-) diff --git a/src/sdk/trace/span_processor.rs b/src/sdk/trace/span_processor.rs index 4922f5f430..1ac030754c 100644 --- a/src/sdk/trace/span_processor.rs +++ b/src/sdk/trace/span_processor.rs @@ -179,7 +179,7 @@ impl api::SpanProcessor for BatchSpanProcessor { #[allow(missing_debug_implementations)] pub struct BatchSpanProcessorWorker { exporter: Box, - messages: Pin + Send>>, + messages: Pin + Send>>, config: BatchConfig, buffer: Vec>, } @@ -243,10 +243,10 @@ impl BatchSpanProcessor { interval: I, config: BatchConfig, ) -> Self - where - S: Fn(BatchSpanProcessorWorker) -> SO, - I: Fn(time::Duration) -> IS, - IS: Stream + Send + 'static, + where + S: Fn(BatchSpanProcessorWorker) -> SO, + I: Fn(time::Duration) -> IS, + IS: Stream + Send + 'static, { let (message_sender, message_receiver) = mpsc::channel(config.max_queue_size); let ticker = interval(config.scheduled_delay).map(|_| BatchMessage::Tick); @@ -270,17 +270,67 @@ impl BatchSpanProcessor { exporter: E, spawn: S, interval: I, + ) -> BatchSpanProcessorBuilder + where + E: exporter::trace::SpanExporter, + S: Fn(BatchSpanProcessorWorker) -> SO, + I: Fn(time::Duration) -> IO, + { + BatchSpanProcessorBuilder { + exporter, + spawn, + interval, + config: Default::default(), + } + } + + /// Create a new batch processor builder and set the config value based on environment variables. + pub fn from_env( + exporter: E, + spawn: S, + interval: I, ) -> BatchSpanProcessorBuilder where E: exporter::trace::SpanExporter, S: Fn(BatchSpanProcessorWorker) -> SO, I: Fn(time::Duration) -> IO, { + let mut config = BatchConfig::default(); + let schedule_delay = u64::from_str( + std::env::var(OTEL_BSP_SCHEDULE_DELAY_MILLIS) + .unwrap_or_else(|_| "".to_string()) + .as_str(), + ) + .unwrap_or(OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT); + config.scheduled_delay = time::Duration::from_millis(schedule_delay); + + let max_queue_size = usize::from_str( + std::env::var(OTEL_BSP_MAX_QUEUE_SIZE) + .unwrap_or_else(|_| "".to_string()) + .as_str(), + ) + .unwrap_or(OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT); + config.max_queue_size = max_queue_size; + + let max_export_batch_size = usize::from_str( + std::env::var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE) + .unwrap_or_else(|_| "".to_string()) + .as_str(), + ) + .unwrap_or(OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT); + // max export batch size must be less or equal to max queue size. + // we set max export batch size to max queue size if it's larger than max queue size. + if max_export_batch_size > max_queue_size { + config.max_export_batch_size = max_queue_size; + } else { + config.max_export_batch_size = max_export_batch_size; + } + BatchSpanProcessorBuilder { + config, exporter, spawn, interval, - config: Default::default(), } } } @@ -325,48 +375,12 @@ pub struct BatchSpanProcessorBuilder { } impl BatchSpanProcessorBuilder - where - E: exporter::trace::SpanExporter + 'static, - S: Fn(BatchSpanProcessorWorker) -> SO, - I: Fn(time::Duration) -> IS, - IS: Stream + Send + 'static, +where + E: exporter::trace::SpanExporter + 'static, + S: Fn(BatchSpanProcessorWorker) -> SO, + I: Fn(time::Duration) -> IS, + IS: Stream + Send + 'static, { - /// Detect configurations from environment variable - pub fn with_env(self) -> Self { - let mut config = self.config; - let schedule_delay = u64::from_str( - std::env::var(OTEL_BSP_SCHEDULE_DELAY_MILLIS) - .unwrap_or_else(|_| "".to_string()) - .as_str(), - ) - .unwrap_or(OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT); - config.scheduled_delay = time::Duration::from_millis(schedule_delay); - - let max_queue_size = usize::from_str( - std::env::var(OTEL_BSP_MAX_QUEUE_SIZE) - .unwrap_or_else(|_| "".to_string()) - .as_str(), - ) - .unwrap_or(OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT); - config.max_queue_size = max_queue_size; - - let max_export_batch_size = usize::from_str( - std::env::var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE) - .unwrap_or_else(|_| "".to_string()) - .as_str(), - ) - .unwrap_or(OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT); - // max export batch size must be less or equal to max queue size. - // we set max export batch size to max queue size if it's larger than max queue size. - if max_export_batch_size > max_queue_size { - config.max_export_batch_size = max_queue_size; - } else { - config.max_export_batch_size = max_export_batch_size; - } - - BatchSpanProcessorBuilder { config, ..self } - } - /// Set max queue size for batches pub fn with_max_queue_size(self, size: usize) -> Self { let mut config = self.config; @@ -404,20 +418,27 @@ impl BatchSpanProcessorBuilder #[cfg(test)] mod tests { - use crate::sdk::trace::span_processor::{OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_SCHEDULE_DELAY_MILLIS}; - use crate::sdk::BatchSpanProcessor; use crate::exporter::trace::stdout; - use tokio; + use crate::sdk::trace::span_processor::{ + OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_SCHEDULE_DELAY_MILLIS, + }; use std::time; + use crate::sdk::BatchSpanProcessor; #[test] fn test_build_batch_span_processor_from_env() { std::env::set_var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE, "5000"); std::env::set_var(OTEL_BSP_SCHEDULE_DELAY_MILLIS, "120"); - let mut builder = BatchSpanProcessor::builder(stdout::Builder::default().init(), tokio::spawn, tokio::time::interval); - builder = builder.with_env(); + let builder = BatchSpanProcessor::from_env( + stdout::Builder::default().init(), + tokio::spawn, + tokio::time::interval, + ); assert_eq!(builder.config.max_export_batch_size, 2048); - assert_eq!(builder.config.scheduled_delay, time::Duration::from_millis(120)); + assert_eq!( + builder.config.scheduled_delay, + time::Duration::from_millis(120) + ); } } From 866047b664a20a848dcbb00373ff0f2f926259f7 Mon Sep 17 00:00:00 2001 From: "zhongyang.wu" Date: Thu, 24 Sep 2020 20:43:24 -0400 Subject: [PATCH 3/4] [trace] Address comments. Make sure max export batch size will be smaller than max queue size. * Add validation in `with_max_export_batch_size`, when input is larger than max queue size. Will lower it to be equal to max queue size. * Address comments. --- src/sdk/trace/span_processor.rs | 91 ++++++++++++++++++--------------- 1 file changed, 51 insertions(+), 40 deletions(-) diff --git a/src/sdk/trace/span_processor.rs b/src/sdk/trace/span_processor.rs index 1ac030754c..a437d2cf03 100644 --- a/src/sdk/trace/span_processor.rs +++ b/src/sdk/trace/span_processor.rs @@ -179,7 +179,7 @@ impl api::SpanProcessor for BatchSpanProcessor { #[allow(missing_debug_implementations)] pub struct BatchSpanProcessorWorker { exporter: Box, - messages: Pin + Send>>, + messages: Pin + Send>>, config: BatchConfig, buffer: Vec>, } @@ -243,10 +243,10 @@ impl BatchSpanProcessor { interval: I, config: BatchConfig, ) -> Self - where - S: Fn(BatchSpanProcessorWorker) -> SO, - I: Fn(time::Duration) -> IS, - IS: Stream + Send + 'static, + where + S: Fn(BatchSpanProcessorWorker) -> SO, + I: Fn(time::Duration) -> IS, + IS: Stream + Send + 'static, { let (message_sender, message_receiver) = mpsc::channel(config.max_queue_size); let ticker = interval(config.scheduled_delay).map(|_| BatchMessage::Tick); @@ -271,10 +271,10 @@ impl BatchSpanProcessor { spawn: S, interval: I, ) -> BatchSpanProcessorBuilder - where - E: exporter::trace::SpanExporter, - S: Fn(BatchSpanProcessorWorker) -> SO, - I: Fn(time::Duration) -> IO, + where + E: exporter::trace::SpanExporter, + S: Fn(BatchSpanProcessorWorker) -> SO, + I: Fn(time::Duration) -> IO, { BatchSpanProcessorBuilder { exporter, @@ -285,6 +285,12 @@ impl BatchSpanProcessor { } /// Create a new batch processor builder and set the config value based on environment variables. + /// + /// If the value in environment variables is illegal, will fall back to use default value. + /// + /// Note that export batch size should be less than or equals to max queue size. + /// If export batch size is larger than max queue size, we will lower to be the same as max + /// queue size pub fn from_env( exporter: E, spawn: S, @@ -296,28 +302,23 @@ impl BatchSpanProcessor { I: Fn(time::Duration) -> IO, { let mut config = BatchConfig::default(); - let schedule_delay = u64::from_str( - std::env::var(OTEL_BSP_SCHEDULE_DELAY_MILLIS) - .unwrap_or_else(|_| "".to_string()) - .as_str(), - ) - .unwrap_or(OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT); + let schedule_delay = std::env::var(OTEL_BSP_SCHEDULE_DELAY_MILLIS) + .and_then(|delay| u64::from_str(&delay).or(Ok(OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT))) + .unwrap(); config.scheduled_delay = time::Duration::from_millis(schedule_delay); - let max_queue_size = usize::from_str( - std::env::var(OTEL_BSP_MAX_QUEUE_SIZE) - .unwrap_or_else(|_| "".to_string()) - .as_str(), - ) - .unwrap_or(OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT); + let max_queue_size = std::env::var(OTEL_BSP_MAX_QUEUE_SIZE) + .and_then(|queue_size| { + usize::from_str(&queue_size).or(Ok(OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT)) + }) + .unwrap(); config.max_queue_size = max_queue_size; - let max_export_batch_size = usize::from_str( - std::env::var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE) - .unwrap_or_else(|_| "".to_string()) - .as_str(), - ) - .unwrap_or(OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT); + let max_export_batch_size = std::env::var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE) + .and_then(|batch_size| { + usize::from_str(&batch_size).or(Ok(OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT)) + }) + .unwrap(); // max export batch size must be less or equal to max queue size. // we set max export batch size to max queue size if it's larger than max queue size. if max_export_batch_size > max_queue_size { @@ -375,11 +376,11 @@ pub struct BatchSpanProcessorBuilder { } impl BatchSpanProcessorBuilder -where - E: exporter::trace::SpanExporter + 'static, - S: Fn(BatchSpanProcessorWorker) -> SO, - I: Fn(time::Duration) -> IS, - IS: Stream + Send + 'static, + where + E: exporter::trace::SpanExporter + 'static, + S: Fn(BatchSpanProcessorWorker) -> SO, + I: Fn(time::Duration) -> IS, + IS: Stream + Send + 'static, { /// Set max queue size for batches pub fn with_max_queue_size(self, size: usize) -> Self { @@ -397,10 +398,16 @@ where BatchSpanProcessorBuilder { config, ..self } } - /// Set max export size for batches + /// Set max export size for batches, should always less than or equals to max queue size. + /// + /// If input is larger than max queue size, will lower it to be equal to max queue size pub fn with_max_export_batch_size(self, size: usize) -> Self { let mut config = self.config; - config.max_export_batch_size = size; + if size > config.max_queue_size{ + config.max_export_batch_size = config.max_queue_size; + } else { + config.max_export_batch_size = size; + } BatchSpanProcessorBuilder { config, ..self } } @@ -419,26 +426,30 @@ where #[cfg(test)] mod tests { use crate::exporter::trace::stdout; - use crate::sdk::trace::span_processor::{ - OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_SCHEDULE_DELAY_MILLIS, - }; - use std::time; + use crate::sdk::trace::span_processor::{OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BSP_SCHEDULE_DELAY_MILLIS}; use crate::sdk::BatchSpanProcessor; + use std::time; #[test] fn test_build_batch_span_processor_from_env() { std::env::set_var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE, "5000"); std::env::set_var(OTEL_BSP_SCHEDULE_DELAY_MILLIS, "120"); + std::env::set_var(OTEL_BSP_MAX_QUEUE_SIZE, "I am not number"); let builder = BatchSpanProcessor::from_env( - stdout::Builder::default().init(), + stdout::Exporter::new(std::io::stdout(), true), tokio::spawn, tokio::time::interval, ); - assert_eq!(builder.config.max_export_batch_size, 2048); + // export batch size cannot exceed max queue size + assert_eq!(builder.config.max_export_batch_size, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT); assert_eq!( builder.config.scheduled_delay, time::Duration::from_millis(120) ); + assert_eq!( + builder.config.max_queue_size, + OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT + ); } } From 9b83e31d308688fa600e6f8bc09d2b99508662a8 Mon Sep 17 00:00:00 2001 From: "zhongyang.wu" Date: Fri, 25 Sep 2020 19:37:54 -0400 Subject: [PATCH 4/4] [span processor] Fix problem that panic when no env vars were set. --- src/sdk/trace/span_processor.rs | 81 +++++++++++++++++++-------------- 1 file changed, 47 insertions(+), 34 deletions(-) diff --git a/src/sdk/trace/span_processor.rs b/src/sdk/trace/span_processor.rs index a437d2cf03..6a5bf9d58e 100644 --- a/src/sdk/trace/span_processor.rs +++ b/src/sdk/trace/span_processor.rs @@ -179,7 +179,7 @@ impl api::SpanProcessor for BatchSpanProcessor { #[allow(missing_debug_implementations)] pub struct BatchSpanProcessorWorker { exporter: Box, - messages: Pin + Send>>, + messages: Pin + Send>>, config: BatchConfig, buffer: Vec>, } @@ -243,10 +243,10 @@ impl BatchSpanProcessor { interval: I, config: BatchConfig, ) -> Self - where - S: Fn(BatchSpanProcessorWorker) -> SO, - I: Fn(time::Duration) -> IS, - IS: Stream + Send + 'static, + where + S: Fn(BatchSpanProcessorWorker) -> SO, + I: Fn(time::Duration) -> IS, + IS: Stream + Send + 'static, { let (message_sender, message_receiver) = mpsc::channel(config.max_queue_size); let ticker = interval(config.scheduled_delay).map(|_| BatchMessage::Tick); @@ -271,10 +271,10 @@ impl BatchSpanProcessor { spawn: S, interval: I, ) -> BatchSpanProcessorBuilder - where - E: exporter::trace::SpanExporter, - S: Fn(BatchSpanProcessorWorker) -> SO, - I: Fn(time::Duration) -> IO, + where + E: exporter::trace::SpanExporter, + S: Fn(BatchSpanProcessorWorker) -> SO, + I: Fn(time::Duration) -> IO, { BatchSpanProcessorBuilder { exporter, @@ -296,29 +296,29 @@ impl BatchSpanProcessor { spawn: S, interval: I, ) -> BatchSpanProcessorBuilder - where - E: exporter::trace::SpanExporter, - S: Fn(BatchSpanProcessorWorker) -> SO, - I: Fn(time::Duration) -> IO, + where + E: exporter::trace::SpanExporter, + S: Fn(BatchSpanProcessorWorker) -> SO, + I: Fn(time::Duration) -> IO, { let mut config = BatchConfig::default(); let schedule_delay = std::env::var(OTEL_BSP_SCHEDULE_DELAY_MILLIS) - .and_then(|delay| u64::from_str(&delay).or(Ok(OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT))) - .unwrap(); + .map(|delay| u64::from_str(&delay).unwrap_or(OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT)) + .unwrap_or(OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT); config.scheduled_delay = time::Duration::from_millis(schedule_delay); let max_queue_size = std::env::var(OTEL_BSP_MAX_QUEUE_SIZE) - .and_then(|queue_size| { - usize::from_str(&queue_size).or(Ok(OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT)) + .map(|queue_size| { + usize::from_str(&queue_size).unwrap_or(OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT) }) - .unwrap(); + .unwrap_or(OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT); config.max_queue_size = max_queue_size; let max_export_batch_size = std::env::var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE) - .and_then(|batch_size| { - usize::from_str(&batch_size).or(Ok(OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT)) + .map(|batch_size| { + usize::from_str(&batch_size).unwrap_or(OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT) }) - .unwrap(); + .unwrap_or(OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT); // max export batch size must be less or equal to max queue size. // we set max export batch size to max queue size if it's larger than max queue size. if max_export_batch_size > max_queue_size { @@ -376,11 +376,11 @@ pub struct BatchSpanProcessorBuilder { } impl BatchSpanProcessorBuilder - where - E: exporter::trace::SpanExporter + 'static, - S: Fn(BatchSpanProcessorWorker) -> SO, - I: Fn(time::Duration) -> IS, - IS: Stream + Send + 'static, +where + E: exporter::trace::SpanExporter + 'static, + S: Fn(BatchSpanProcessorWorker) -> SO, + I: Fn(time::Duration) -> IS, + IS: Stream + Send + 'static, { /// Set max queue size for batches pub fn with_max_queue_size(self, size: usize) -> Self { @@ -403,7 +403,7 @@ impl BatchSpanProcessorBuilder /// If input is larger than max queue size, will lower it to be equal to max queue size pub fn with_max_export_batch_size(self, size: usize) -> Self { let mut config = self.config; - if size > config.max_queue_size{ + if size > config.max_queue_size { config.max_export_batch_size = config.max_queue_size; } else { config.max_export_batch_size = size; @@ -426,30 +426,43 @@ impl BatchSpanProcessorBuilder #[cfg(test)] mod tests { use crate::exporter::trace::stdout; - use crate::sdk::trace::span_processor::{OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BSP_SCHEDULE_DELAY_MILLIS}; + use crate::sdk::trace::span_processor::{ + OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE, + OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BSP_SCHEDULE_DELAY_MILLIS, + OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT, + }; use crate::sdk::BatchSpanProcessor; use std::time; #[test] fn test_build_batch_span_processor_from_env() { - std::env::set_var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE, "5000"); - std::env::set_var(OTEL_BSP_SCHEDULE_DELAY_MILLIS, "120"); - std::env::set_var(OTEL_BSP_MAX_QUEUE_SIZE, "I am not number"); + std::env::set_var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE, "500"); + std::env::set_var(OTEL_BSP_SCHEDULE_DELAY_MILLIS, "I am not number"); - let builder = BatchSpanProcessor::from_env( + let mut builder = BatchSpanProcessor::from_env( stdout::Exporter::new(std::io::stdout(), true), tokio::spawn, tokio::time::interval, ); // export batch size cannot exceed max queue size - assert_eq!(builder.config.max_export_batch_size, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT); + assert_eq!(builder.config.max_export_batch_size, 500); assert_eq!( builder.config.scheduled_delay, - time::Duration::from_millis(120) + time::Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT) ); assert_eq!( builder.config.max_queue_size, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT ); + + std::env::set_var(OTEL_BSP_MAX_QUEUE_SIZE, "120"); + builder = BatchSpanProcessor::from_env( + stdout::Exporter::new(std::io::stdout(), true), + tokio::spawn, + tokio::time::interval, + ); + + assert_eq!(builder.config.max_export_batch_size, 120); + assert_eq!(builder.config.max_queue_size, 120); } }