diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 53ee2f9bc0..87bf76b9c2 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -85,6 +85,7 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug { /// `on_end` is called after a `Span` is ended (i.e., the end timestamp is /// already set). This method is called synchronously within the `Span::end` /// API, therefore it should not block or throw an exception. + /// TODO - This method should take reference to `SpanData` fn on_end(&self, span: SpanData); /// Force the spans lying in the cache to be exported. fn force_flush(&self) -> TraceResult<()>; @@ -163,6 +164,7 @@ impl SpanProcessor for SimpleSpanProcessor { } } +use crate::export::trace::ExportResult; /// The `BatchSpanProcessor` collects finished spans in a buffer and exports them /// in batches to the configured `SpanExporter`. This processor is ideal for /// high-throughput environments, as it minimizes the overhead of exporting spans @@ -217,8 +219,8 @@ impl SpanProcessor for SimpleSpanProcessor { /// provider.shutdown(); /// } /// ``` -use futures_executor::block_on; use std::sync::mpsc::sync_channel; +use std::sync::mpsc::Receiver; use std::sync::mpsc::RecvTimeoutError; use std::sync::mpsc::SyncSender; @@ -226,7 +228,8 @@ use std::sync::mpsc::SyncSender; #[allow(clippy::large_enum_variant)] #[derive(Debug)] enum BatchMessage { - ExportSpan(SpanData), + //ExportSpan(SpanData), + ExportSpan(Arc), ForceFlush(SyncSender>), Shutdown(SyncSender>), SetResource(Arc), @@ -235,12 +238,17 @@ enum BatchMessage { /// A batch span processor with a dedicated background thread. #[derive(Debug)] pub struct BatchSpanProcessor { - message_sender: SyncSender, + span_sender: SyncSender, // Data channel to store spans + message_sender: SyncSender, // Control channel to store control messages. handle: Mutex>>, forceflush_timeout: Duration, shutdown_timeout: Duration, is_shutdown: AtomicBool, dropped_span_count: Arc, + export_span_message_sent: Arc, + current_batch_size: Arc, + max_export_batch_size: usize, + max_queue_size: usize, } impl BatchSpanProcessor { @@ -255,7 +263,12 @@ impl BatchSpanProcessor { where E: SpanExporter + Send + 'static, { - let (message_sender, message_receiver) = sync_channel(config.max_queue_size); + let (span_sender, span_receiver) = sync_channel::(config.max_queue_size); + let (message_sender, message_receiver) = sync_channel::(64); // Is this a reasonable bound? + let max_queue_size = config.max_queue_size; + let max_export_batch_size = config.max_export_batch_size; + let current_batch_size = Arc::new(AtomicUsize::new(0)); + let current_batch_size_for_thread = current_batch_size.clone(); let handle = thread::Builder::new() .name("OpenTelemetry.Traces.BatchProcessor".to_string()) @@ -268,7 +281,7 @@ impl BatchSpanProcessor { ); let mut spans = Vec::with_capacity(config.max_export_batch_size); let mut last_export_time = Instant::now(); - + let current_batch_size = current_batch_size_for_thread; loop { let remaining_time_option = config .scheduled_delay @@ -279,28 +292,52 @@ impl BatchSpanProcessor { }; match message_receiver.recv_timeout(remaining_time) { Ok(message) => match message { - BatchMessage::ExportSpan(span) => { - spans.push(span); - if spans.len() >= config.max_queue_size - || last_export_time.elapsed() >= config.scheduled_delay - { - if let Err(err) = block_on(exporter.export(spans.split_off(0))) - { - otel_error!( - name: "BatchSpanProcessor.ExportError", - error = format!("{}", err) - ); - } - last_export_time = Instant::now(); - } + BatchMessage::ExportSpan(export_span_message_sent) => { + // Reset the export span message sent flag now it has has been processed. + export_span_message_sent.store(false, Ordering::Relaxed); + otel_debug!( + name: "BatchSpanProcessor.ExportingDueToBatchSize", + ); + let _ = Self::get_spans_and_export( + &span_receiver, + &mut exporter, + &mut spans, + &mut last_export_time, + ¤t_batch_size, + &config, + ); } BatchMessage::ForceFlush(sender) => { - let result = block_on(exporter.export(spans.split_off(0))); + otel_debug!(name: "BatchSpanProcessor.ExportingDueToForceFlush"); + let result = Self::get_spans_and_export( + &span_receiver, + &mut exporter, + &mut spans, + &mut last_export_time, + ¤t_batch_size, + &config, + ); let _ = sender.send(result); } BatchMessage::Shutdown(sender) => { - let result = block_on(exporter.export(spans.split_off(0))); + otel_debug!(name: "BatchSpanProcessor.ExportingDueToShutdown"); + let result = Self::get_spans_and_export( + &span_receiver, + &mut exporter, + &mut spans, + &mut last_export_time, + ¤t_batch_size, + &config, + ); let _ = sender.send(result); + + otel_debug!( + name: "BatchSpanProcessor.ThreadExiting", + reason = "ShutdownRequested" + ); + // + // break out the loop and return from the current background thread. + // break; } BatchMessage::SetResource(resource) => { @@ -308,15 +345,18 @@ impl BatchSpanProcessor { } }, Err(RecvTimeoutError::Timeout) => { - if last_export_time.elapsed() >= config.scheduled_delay { - if let Err(err) = block_on(exporter.export(spans.split_off(0))) { - otel_error!( - name: "BatchSpanProcessor.ExportError", - error = format!("{}", err) - ); - } - last_export_time = Instant::now(); - } + otel_debug!( + name: "BatchSpanProcessor.ExportingDueToTimer", + ); + + let _ = Self::get_spans_and_export( + &span_receiver, + &mut exporter, + &mut spans, + &mut last_export_time, + ¤t_batch_size, + &config, + ); } Err(RecvTimeoutError::Disconnected) => { // Channel disconnected, only thing to do is break @@ -336,12 +376,17 @@ impl BatchSpanProcessor { .expect("Failed to spawn thread"); //TODO: Handle thread spawn failure Self { + span_sender, message_sender, handle: Mutex::new(Some(handle)), forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable is_shutdown: AtomicBool::new(false), dropped_span_count: Arc::new(AtomicUsize::new(0)), + max_queue_size, + export_span_message_sent: Arc::new(AtomicBool::new(false)), + current_batch_size, + max_export_batch_size, } } @@ -355,6 +400,72 @@ impl BatchSpanProcessor { config: BatchConfig::default(), } } + + // This method gets upto `max_export_batch_size` amount of spans from the channel and exports them. + // It returns the result of the export operation. + // It expects the span vec to be empty when it's called. + #[inline] + fn get_spans_and_export( + spans_receiver: &Receiver, + exporter: &mut E, + spans: &mut Vec, + last_export_time: &mut Instant, + current_batch_size: &AtomicUsize, + config: &BatchConfig, + ) -> ExportResult + where + E: SpanExporter + Send + Sync + 'static, + { + // Get upto `max_export_batch_size` amount of spans from the channel and push them to the span vec + while let Ok(span) = spans_receiver.try_recv() { + spans.push(span); + if spans.len() == config.max_export_batch_size { + break; + } + } + + let count_of_spans = spans.len(); // Count of spans that will be exported + let result = Self::export_with_timeout_sync( + config.max_export_timeout, + exporter, + spans, + last_export_time, + ); // This method clears the spans vec after exporting + + current_batch_size.fetch_sub(count_of_spans, Ordering::Relaxed); + result + } + + #[allow(clippy::vec_box)] + fn export_with_timeout_sync( + _: Duration, // TODO, enforcing timeout in exporter. + exporter: &mut E, + batch: &mut Vec, + last_export_time: &mut Instant, + ) -> ExportResult + where + E: SpanExporter + Send + Sync + 'static, + { + *last_export_time = Instant::now(); + + if batch.is_empty() { + return TraceResult::Ok(()); + } + + let export = exporter.export(batch.split_off(0)); + let export_result = futures_executor::block_on(export); + + match export_result { + Ok(_) => TraceResult::Ok(()), + Err(err) => { + otel_error!( + name: "BatchSpanProcessor.ExportError", + error = format!("{}", err) + ); + TraceResult::Err(err) + } + } + } } impl SpanProcessor for BatchSpanProcessor { @@ -369,10 +480,11 @@ impl SpanProcessor for BatchSpanProcessor { // this is a warning, as the user is trying to emit after the processor has been shutdown otel_warn!( name: "BatchSpanProcessor.Emit.ProcessorShutdown", + message = "BatchSpanProcessor has been shutdown. No further spans will be emitted." ); return; } - let result = self.message_sender.try_send(BatchMessage::ExportSpan(span)); + let result = self.span_sender.try_send(span); if result.is_err() { // Increment dropped span count. The first time we have to drop a span, @@ -382,6 +494,36 @@ impl SpanProcessor for BatchSpanProcessor { message = "BatchSpanProcessor dropped a Span due to queue full/internal errors. No further internal log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total Spans dropped."); } } + // At this point, sending the span to the data channel was successful. + // Increment the current batch size and check if it has reached the max export batch size. + if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1 >= self.max_export_batch_size + { + // Check if the a control message for exporting spans is already sent to the worker thread. + // If not, send a control message to export spans. + // `export_span_message_sent` is set to false ONLY when the worker thread has processed the control message. + + if !self.export_span_message_sent.load(Ordering::Relaxed) { + // This is a cost-efficient check as atomic load operations do not require exclusive access to cache line. + // Perform atomic swap to `export_span_message_sent` ONLY when the atomic load operation above returns false. + // Atomic swap/compare_exchange operations require exclusive access to cache line on most processor architectures. + // We could have used compare_exchange as well here, but it's more verbose than swap. + if !self.export_span_message_sent.swap(true, Ordering::Relaxed) { + match self.message_sender.try_send(BatchMessage::ExportSpan( + self.export_span_message_sent.clone(), + )) { + Ok(_) => { + // Control message sent successfully. + } + Err(_err) => { + // TODO: Log error + // If the control message could not be sent, reset the `export_span_message_sent` flag. + self.export_span_message_sent + .store(false, Ordering::Relaxed); + } + } + } + } + } } /// Flushes all pending spans. @@ -401,17 +543,20 @@ impl SpanProcessor for BatchSpanProcessor { /// Shuts down the processor. fn shutdown(&self) -> TraceResult<()> { + if self.is_shutdown.swap(true, Ordering::Relaxed) { + return Err(TraceError::Other("Processor already shutdown".into())); + } let dropped_spans = self.dropped_span_count.load(Ordering::Relaxed); + let max_queue_size = self.max_queue_size; if dropped_spans > 0 { otel_warn!( - name: "BatchSpanProcessor.LogsDropped", + name: "BatchSpanProcessor.SpansDropped", dropped_span_count = dropped_spans, + max_queue_size = max_queue_size, message = "Spans were dropped due to a queue being full or other error. The count represents the total count of spans dropped in the lifetime of this BatchSpanProcessor. Consider increasing the queue size and/or decrease delay between intervals." ); } - if self.is_shutdown.swap(true, Ordering::Relaxed) { - return Err(TraceError::Other("Processor already shutdown".into())); - } + let (sender, receiver) = sync_channel(1); self.message_sender .try_send(BatchMessage::Shutdown(sender))