From 158524a0fcbb6f86d9df72433c675e638ff21035 Mon Sep 17 00:00:00 2001 From: Utkarsh Umesan Pillai <66651184+utpilla@users.noreply.github.com> Date: Tue, 14 Jan 2025 17:00:32 +0000 Subject: [PATCH 1/2] Fix BatchLogProcessor --- opentelemetry-sdk/src/logs/log_processor.rs | 42 ++++++++++++--------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 4ad4c0342d..48a1263782 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -199,7 +199,7 @@ impl LogProcessor for SimpleLogProcessor { #[allow(clippy::large_enum_variant)] #[derive(Debug)] enum BatchMessage { - /// This ONLY sent when the number of logs records in the data channel has reached `max_export_batch_size`. + /// This is ONLY sent when the number of logs records in the data channel has reached `max_export_batch_size`. ExportLog(Arc), /// ForceFlush flushes the current buffer to the exporter. ForceFlush(mpsc::SyncSender), @@ -306,7 +306,7 @@ impl LogProcessor for BatchLogProcessor { // If not, send a control message to export logs. // `export_log_message_sent` is set to false ONLY when the worker thread has processed the control message. - if !self.export_log_message_sent.load(Ordering::Relaxed) { + if !self.export_log_message_sent.load(Ordering::Acquire) { // This is a cost-efficient check as atomic load operations do not require exclusive access to cache line. // Perform atomic swap to `export_log_message_sent` ONLY when the atomic load operation above returns false. // Atomic swap/compare_exchange operations require exclusive access to cache line on most processor architectures. @@ -457,23 +457,31 @@ impl BatchLogProcessor { where E: LogExporter + Send + Sync + 'static, { - // Get upto `max_export_batch_size` amount of logs log records from the channel and push them to the logs vec - while let Ok(log) = logs_receiver.try_recv() { - logs.push(log); - if logs.len() == config.max_export_batch_size { - break; + let target = current_batch_size.load(Ordering::Relaxed); // `target` is used to determine the stopping criteria for exporting logs. + let mut result = LogResult::Ok(()); + let mut total_exported_logs: usize = 0; + + while target > 0 && total_exported_logs < target { + // Get upto `max_export_batch_size` amount of logs log records from the channel and push them to the logs vec + while let Ok(log) = logs_receiver.try_recv() { + logs.push(log); + if logs.len() == config.max_export_batch_size { + break; + } } - } - let count_of_logs = logs.len(); // Count of logs that will be exported - let result = export_with_timeout_sync( - config.max_export_timeout, - exporter, - logs, - last_export_time, - ); // This method clears the logs vec after exporting + let count_of_logs = logs.len(); // Count of logs that will be exported + total_exported_logs += count_of_logs; - current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed); + result = export_with_timeout_sync( + config.max_export_timeout, + exporter, + logs, + last_export_time, + ); // This method clears the logs vec after exporting + + current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed); + } result } @@ -499,7 +507,7 @@ impl BatchLogProcessor { ); // Reset the export log message sent flag now it has has been processed. - export_log_message_sent.store(false, Ordering::Relaxed); + export_log_message_sent.store(false, Ordering::Release); } Ok(BatchMessage::ForceFlush(sender)) => { otel_debug!(name: "BatchLogProcessor.ExportingDueToForceFlush"); From c426db42334db03e62eac0e0247233a1cb09e696 Mon Sep 17 00:00:00 2001 From: Utkarsh Umesan Pillai <66651184+utpilla@users.noreply.github.com> Date: Tue, 14 Jan 2025 21:37:40 +0000 Subject: [PATCH 2/2] Address PR comments --- opentelemetry-sdk/src/logs/log_processor.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 48a1263782..be83244719 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -306,7 +306,7 @@ impl LogProcessor for BatchLogProcessor { // If not, send a control message to export logs. // `export_log_message_sent` is set to false ONLY when the worker thread has processed the control message. - if !self.export_log_message_sent.load(Ordering::Acquire) { + if !self.export_log_message_sent.load(Ordering::Relaxed) { // This is a cost-efficient check as atomic load operations do not require exclusive access to cache line. // Perform atomic swap to `export_log_message_sent` ONLY when the atomic load operation above returns false. // Atomic swap/compare_exchange operations require exclusive access to cache line on most processor architectures. @@ -493,6 +493,9 @@ impl BatchLogProcessor { match message_receiver.recv_timeout(remaining_time) { Ok(BatchMessage::ExportLog(export_log_message_sent)) => { + // Reset the export log message sent flag now it has has been processed. + export_log_message_sent.store(false, Ordering::Relaxed); + otel_debug!( name: "BatchLogProcessor.ExportingDueToBatchSize", ); @@ -505,9 +508,6 @@ impl BatchLogProcessor { ¤t_batch_size, &config, ); - - // Reset the export log message sent flag now it has has been processed. - export_log_message_sent.store(false, Ordering::Release); } Ok(BatchMessage::ForceFlush(sender)) => { otel_debug!(name: "BatchLogProcessor.ExportingDueToForceFlush");