Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix BatchLogProcessor #2510

Merged
merged 2 commits into from
Jan 14, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 26 additions & 18 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum BatchMessage {
/// This ONLY sent when the number of logs records in the data channel has reached `max_export_batch_size`.
/// This is ONLY sent when the number of logs records in the data channel has reached `max_export_batch_size`.
ExportLog(Arc<AtomicBool>),
/// ForceFlush flushes the current buffer to the exporter.
ForceFlush(mpsc::SyncSender<ExportResult>),
Expand Down Expand Up @@ -457,23 +457,31 @@
where
E: LogExporter + Send + Sync + 'static,
{
// Get upto `max_export_batch_size` amount of logs log records from the channel and push them to the logs vec
while let Ok(log) = logs_receiver.try_recv() {
logs.push(log);
if logs.len() == config.max_export_batch_size {
break;
let target = current_batch_size.load(Ordering::Relaxed); // `target` is used to determine the stopping criteria for exporting logs.
let mut result = LogResult::Ok(());
let mut total_exported_logs: usize = 0;

while target > 0 && total_exported_logs < target {
// Get upto `max_export_batch_size` amount of logs log records from the channel and push them to the logs vec
while let Ok(log) = logs_receiver.try_recv() {
logs.push(log);
if logs.len() == config.max_export_batch_size {
break;

Check warning on line 469 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L469

Added line #L469 was not covered by tests
}
}
}

let count_of_logs = logs.len(); // Count of logs that will be exported
let result = export_with_timeout_sync(
config.max_export_timeout,
exporter,
logs,
last_export_time,
); // This method clears the logs vec after exporting
let count_of_logs = logs.len(); // Count of logs that will be exported
total_exported_logs += count_of_logs;

result = export_with_timeout_sync(
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
config.max_export_timeout,
exporter,
logs,
last_export_time,
); // This method clears the logs vec after exporting

current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed);
current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed);
}
result
}

Expand All @@ -485,6 +493,9 @@

match message_receiver.recv_timeout(remaining_time) {
Ok(BatchMessage::ExportLog(export_log_message_sent)) => {
// Reset the export log message sent flag now it has has been processed.
export_log_message_sent.store(false, Ordering::Relaxed);

Check warning on line 498 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L496-L498

Added lines #L496 - L498 were not covered by tests
otel_debug!(
name: "BatchLogProcessor.ExportingDueToBatchSize",
);
Expand All @@ -497,9 +508,6 @@
&current_batch_size,
&config,
);

// Reset the export log message sent flag now it has has been processed.
export_log_message_sent.store(false, Ordering::Relaxed);
}
Ok(BatchMessage::ForceFlush(sender)) => {
otel_debug!(name: "BatchLogProcessor.ExportingDueToForceFlush");
Expand Down
Loading