Skip to content

Commit

Permalink
BatchSpanProcessor optimizations - Separate control signal queue, and…
Browse files Browse the repository at this point in the history
… wake up background thread only when required. (#2526)

Co-authored-by: Cijo Thomas <cijo.thomas@gmail.com>
  • Loading branch information
lalitb and cijothomas authored Jan 21, 2025
1 parent 78db32c commit d2a6b3b
Showing 1 changed file with 180 additions and 35 deletions.
215 changes: 180 additions & 35 deletions opentelemetry-sdk/src/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
/// `on_end` is called after a `Span` is ended (i.e., the end timestamp is
/// already set). This method is called synchronously within the `Span::end`
/// API, therefore it should not block or throw an exception.
/// TODO - This method should take reference to `SpanData`
fn on_end(&self, span: SpanData);
/// Force the spans lying in the cache to be exported.
fn force_flush(&self) -> TraceResult<()>;
Expand Down Expand Up @@ -163,6 +164,7 @@ impl SpanProcessor for SimpleSpanProcessor {
}
}

use crate::export::trace::ExportResult;
/// The `BatchSpanProcessor` collects finished spans in a buffer and exports them
/// in batches to the configured `SpanExporter`. This processor is ideal for
/// high-throughput environments, as it minimizes the overhead of exporting spans
Expand Down Expand Up @@ -217,16 +219,17 @@ impl SpanProcessor for SimpleSpanProcessor {
/// provider.shutdown();
/// }
/// ```
use futures_executor::block_on;
use std::sync::mpsc::sync_channel;
use std::sync::mpsc::Receiver;
use std::sync::mpsc::RecvTimeoutError;
use std::sync::mpsc::SyncSender;

/// Messages exchanged between the main thread and the background thread.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum BatchMessage {
ExportSpan(SpanData),
//ExportSpan(SpanData),
ExportSpan(Arc<AtomicBool>),
ForceFlush(SyncSender<TraceResult<()>>),
Shutdown(SyncSender<TraceResult<()>>),
SetResource(Arc<Resource>),
Expand All @@ -235,12 +238,17 @@ enum BatchMessage {
/// A batch span processor with a dedicated background thread.
#[derive(Debug)]
pub struct BatchSpanProcessor {
message_sender: SyncSender<BatchMessage>,
span_sender: SyncSender<SpanData>, // Data channel to store spans
message_sender: SyncSender<BatchMessage>, // Control channel to store control messages.
handle: Mutex<Option<thread::JoinHandle<()>>>,
forceflush_timeout: Duration,
shutdown_timeout: Duration,
is_shutdown: AtomicBool,
dropped_span_count: Arc<AtomicUsize>,
export_span_message_sent: Arc<AtomicBool>,
current_batch_size: Arc<AtomicUsize>,
max_export_batch_size: usize,
max_queue_size: usize,
}

impl BatchSpanProcessor {
Expand All @@ -255,7 +263,12 @@ impl BatchSpanProcessor {
where
E: SpanExporter + Send + 'static,
{
let (message_sender, message_receiver) = sync_channel(config.max_queue_size);
let (span_sender, span_receiver) = sync_channel::<SpanData>(config.max_queue_size);
let (message_sender, message_receiver) = sync_channel::<BatchMessage>(64); // Is this a reasonable bound?
let max_queue_size = config.max_queue_size;
let max_export_batch_size = config.max_export_batch_size;
let current_batch_size = Arc::new(AtomicUsize::new(0));
let current_batch_size_for_thread = current_batch_size.clone();

let handle = thread::Builder::new()
.name("OpenTelemetry.Traces.BatchProcessor".to_string())
Expand All @@ -268,7 +281,7 @@ impl BatchSpanProcessor {
);
let mut spans = Vec::with_capacity(config.max_export_batch_size);
let mut last_export_time = Instant::now();

let current_batch_size = current_batch_size_for_thread;
loop {
let remaining_time_option = config
.scheduled_delay
Expand All @@ -279,44 +292,71 @@ impl BatchSpanProcessor {
};
match message_receiver.recv_timeout(remaining_time) {
Ok(message) => match message {
BatchMessage::ExportSpan(span) => {
spans.push(span);
if spans.len() >= config.max_queue_size
|| last_export_time.elapsed() >= config.scheduled_delay
{
if let Err(err) = block_on(exporter.export(spans.split_off(0)))
{
otel_error!(
name: "BatchSpanProcessor.ExportError",
error = format!("{}", err)
);
}
last_export_time = Instant::now();
}
BatchMessage::ExportSpan(export_span_message_sent) => {
// Reset the export span message sent flag now that it has been processed.
export_span_message_sent.store(false, Ordering::Relaxed);
otel_debug!(
name: "BatchSpanProcessor.ExportingDueToBatchSize",
);
let _ = Self::get_spans_and_export(
&span_receiver,
&mut exporter,
&mut spans,
&mut last_export_time,
&current_batch_size,
&config,
);
}
BatchMessage::ForceFlush(sender) => {
let result = block_on(exporter.export(spans.split_off(0)));
otel_debug!(name: "BatchSpanProcessor.ExportingDueToForceFlush");
let result = Self::get_spans_and_export(
&span_receiver,
&mut exporter,
&mut spans,
&mut last_export_time,
&current_batch_size,
&config,
);
let _ = sender.send(result);
}
BatchMessage::Shutdown(sender) => {
let result = block_on(exporter.export(spans.split_off(0)));
otel_debug!(name: "BatchSpanProcessor.ExportingDueToShutdown");
let result = Self::get_spans_and_export(
&span_receiver,
&mut exporter,
&mut spans,
&mut last_export_time,
&current_batch_size,
&config,
);
let _ = sender.send(result);

otel_debug!(
name: "BatchSpanProcessor.ThreadExiting",
reason = "ShutdownRequested"
);
//
// break out the loop and return from the current background thread.
//
break;
}
BatchMessage::SetResource(resource) => {
exporter.set_resource(&resource);
}
},
Err(RecvTimeoutError::Timeout) => {
if last_export_time.elapsed() >= config.scheduled_delay {
if let Err(err) = block_on(exporter.export(spans.split_off(0))) {
otel_error!(
name: "BatchSpanProcessor.ExportError",
error = format!("{}", err)
);
}
last_export_time = Instant::now();
}
otel_debug!(
name: "BatchSpanProcessor.ExportingDueToTimer",
);

let _ = Self::get_spans_and_export(
&span_receiver,
&mut exporter,
&mut spans,
&mut last_export_time,
&current_batch_size,
&config,
);
}
Err(RecvTimeoutError::Disconnected) => {
// Channel disconnected, only thing to do is break
Expand All @@ -336,12 +376,17 @@ impl BatchSpanProcessor {
.expect("Failed to spawn thread"); //TODO: Handle thread spawn failure

Self {
span_sender,
message_sender,
handle: Mutex::new(Some(handle)),
forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable
is_shutdown: AtomicBool::new(false),
dropped_span_count: Arc::new(AtomicUsize::new(0)),
max_queue_size,
export_span_message_sent: Arc::new(AtomicBool::new(false)),
current_batch_size,
max_export_batch_size,
}
}

Expand All @@ -355,6 +400,72 @@ impl BatchSpanProcessor {
config: BatchConfig::default(),
}
}

/// Drains the span channel and exports what it finds, one batch at a time.
///
/// Spans are pulled from `spans_receiver` in chunks of at most
/// `config.max_export_batch_size` and each chunk is handed to
/// `export_with_timeout_sync`, which leaves `spans` empty again. Batches keep
/// being exported until either the backlog recorded in `current_batch_size`
/// at the start of the call has been exported, or a pass finds the channel
/// empty. This matters for the force-flush and shutdown paths, which invoke
/// this method only once: exporting a single batch there would leave the rest
/// of the queue unexported.
///
/// `spans` is expected to be empty when this method is called.
///
/// Returns `Ok(())` if every batch was exported successfully, otherwise the
/// error of the first batch that failed. Later batches are still attempted
/// after a failure.
#[inline]
fn get_spans_and_export<E>(
    spans_receiver: &Receiver<SpanData>,
    exporter: &mut E,
    spans: &mut Vec<SpanData>,
    last_export_time: &mut Instant,
    current_batch_size: &AtomicUsize,
    config: &BatchConfig,
) -> ExportResult
where
    E: SpanExporter + Send + Sync + 'static,
{
    // Snapshot of the backlog at the start of the call. It bounds the loop so
    // that producers that keep enqueuing cannot hold the worker here forever.
    let target = current_batch_size.load(Ordering::Relaxed);
    let mut exported_total: usize = 0;
    let mut overall: ExportResult = Ok(());

    loop {
        // Pull up to `max_export_batch_size` spans from the channel into the span vec.
        while let Ok(span) = spans_receiver.try_recv() {
            spans.push(span);
            if spans.len() >= config.max_export_batch_size {
                break;
            }
        }

        let count_of_spans = spans.len(); // Count of spans that will be exported
        let batch_result = Self::export_with_timeout_sync(
            config.max_export_timeout,
            exporter,
            spans,
            last_export_time,
        ); // This call leaves the spans vec empty after exporting

        current_batch_size.fetch_sub(count_of_spans, Ordering::Relaxed);

        // Remember the first failure, but keep exporting the remaining batches.
        if overall.is_ok() {
            overall = batch_result;
        }

        exported_total = exported_total.saturating_add(count_of_spans);

        // The counter is updated with relaxed ordering and may not match the
        // channel contents exactly, so an empty pass is also a stop condition.
        if count_of_spans == 0 || exported_total >= target {
            break;
        }
    }

    overall
}

/// Exports the contents of `batch` through `exporter`, blocking the calling
/// thread until the export future completes.
///
/// `last_export_time` is reset to the current instant before anything else, so
/// a call with an empty batch also restarts the export timer. An empty batch
/// returns `Ok(())` without calling the exporter. Otherwise the spans are moved
/// out of `batch` (leaving it empty) and exported; a failure is logged as
/// `BatchSpanProcessor.ExportError` and then returned to the caller.
///
/// The first parameter is a timeout that is currently ignored.
// NOTE(review): no `Vec<Box<_>>` appears in this signature, so this allow may be vestigial — confirm.
#[allow(clippy::vec_box)]
fn export_with_timeout_sync<E>(
    _: Duration, // TODO, enforcing timeout in exporter.
    exporter: &mut E,
    batch: &mut Vec<SpanData>,
    last_export_time: &mut Instant,
) -> ExportResult
where
    E: SpanExporter + Send + Sync + 'static,
{
    *last_export_time = Instant::now();

    // Nothing queued: skip the exporter entirely.
    if batch.is_empty() {
        return Ok(());
    }

    // Move the spans out of `batch`, leaving it empty for the next round.
    let pending = batch.split_off(0);

    futures_executor::block_on(exporter.export(pending))
        .map(|_| ())
        .map_err(|export_error| {
            otel_error!(
                name: "BatchSpanProcessor.ExportError",
                error = format!("{}", export_error)
            );
            export_error
        })
}
}

impl SpanProcessor for BatchSpanProcessor {
Expand All @@ -369,10 +480,11 @@ impl SpanProcessor for BatchSpanProcessor {
// this is a warning, as the user is trying to emit after the processor has been shutdown
otel_warn!(
name: "BatchSpanProcessor.Emit.ProcessorShutdown",
message = "BatchSpanProcessor has been shutdown. No further spans will be emitted."
);
return;
}
let result = self.message_sender.try_send(BatchMessage::ExportSpan(span));
let result = self.span_sender.try_send(span);

if result.is_err() {
// Increment dropped span count. The first time we have to drop a span,
Expand All @@ -382,6 +494,36 @@ impl SpanProcessor for BatchSpanProcessor {
message = "BatchSpanProcessor dropped a Span due to queue full/internal errors. No further internal log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total Spans dropped.");
}
}
// At this point, sending the span to the data channel was successful.
// Increment the current batch size and check if it has reached the max export batch size.
if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1 >= self.max_export_batch_size
{
// Check if a control message for exporting spans has already been sent to the worker thread.
// If not, send a control message to export spans.
// `export_span_message_sent` is set to false ONLY when the worker thread has processed the control message.

if !self.export_span_message_sent.load(Ordering::Relaxed) {
// This is a cost-efficient check as atomic load operations do not require exclusive access to cache line.
// Perform atomic swap to `export_span_message_sent` ONLY when the atomic load operation above returns false.
// Atomic swap/compare_exchange operations require exclusive access to cache line on most processor architectures.
// We could have used compare_exchange as well here, but it's more verbose than swap.
if !self.export_span_message_sent.swap(true, Ordering::Relaxed) {
match self.message_sender.try_send(BatchMessage::ExportSpan(
self.export_span_message_sent.clone(),
)) {
Ok(_) => {
// Control message sent successfully.
}
Err(_err) => {
// TODO: Log error
// If the control message could not be sent, reset the `export_span_message_sent` flag.
self.export_span_message_sent
.store(false, Ordering::Relaxed);
}
}
}
}
}
}

/// Flushes all pending spans.
Expand All @@ -401,17 +543,20 @@ impl SpanProcessor for BatchSpanProcessor {

/// Shuts down the processor.
fn shutdown(&self) -> TraceResult<()> {
if self.is_shutdown.swap(true, Ordering::Relaxed) {
return Err(TraceError::Other("Processor already shutdown".into()));
}
let dropped_spans = self.dropped_span_count.load(Ordering::Relaxed);
let max_queue_size = self.max_queue_size;
if dropped_spans > 0 {
otel_warn!(
name: "BatchSpanProcessor.LogsDropped",
name: "BatchSpanProcessor.SpansDropped",
dropped_span_count = dropped_spans,
max_queue_size = max_queue_size,
message = "Spans were dropped due to a queue being full or other error. The count represents the total count of spans dropped in the lifetime of this BatchSpanProcessor. Consider increasing the queue size and/or decrease delay between intervals."
);
}
if self.is_shutdown.swap(true, Ordering::Relaxed) {
return Err(TraceError::Other("Processor already shutdown".into()));
}

let (sender, receiver) = sync_channel(1);
self.message_sender
.try_send(BatchMessage::Shutdown(sender))
Expand Down

0 comments on commit d2a6b3b

Please sign in to comment.