Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor BatchLogProcessor #2494

Merged
152 changes: 120 additions & 32 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,8 @@
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum BatchMessage {
/// Export logs, called when the log is emitted.
ExportLog(Box<(LogRecord, InstrumentationScope)>),
/// This ONLY sent when the number of logs records in the data channel has reached `max_export_batch_size`.
ExportLog(Arc<AtomicBool>),
/// ForceFlush flushes the current buffer to the exporter.
ForceFlush(mpsc::SyncSender<ExportResult>),
/// Shut down the worker thread, push all logs in buffer to the exporter.
Expand All @@ -209,6 +209,8 @@
SetResource(Arc<Resource>),
}

type LogsData = Box<(LogRecord, InstrumentationScope)>;

/// The `BatchLogProcessor` collects finished logs in a buffer and exports them
/// in batches to the configured `LogExporter`. This processor is ideal for
/// high-throughput environments, as it minimizes the overhead of exporting logs
Expand Down Expand Up @@ -246,11 +248,15 @@
/// .build();
///
pub struct BatchLogProcessor {
message_sender: SyncSender<BatchMessage>,
logs_sender: SyncSender<LogsData>, // Data channel to store log records and instrumentation scopes
message_sender: SyncSender<BatchMessage>, // Control channel to store control messages for the worker thread
handle: Mutex<Option<thread::JoinHandle<()>>>,
forceflush_timeout: Duration,
shutdown_timeout: Duration,
is_shutdown: AtomicBool,
export_log_message_sent: Arc<AtomicBool>,
current_batch_size: Arc<AtomicUsize>,
max_export_batch_size: usize,

// Track dropped logs - we'll log this at shutdown
dropped_logs_count: AtomicUsize,
Expand Down Expand Up @@ -279,11 +285,8 @@
}

let result = self
.message_sender
.try_send(BatchMessage::ExportLog(Box::new((
record.clone(),
instrumentation.clone(),
))));
.logs_sender
.try_send(Box::new((record.clone(), instrumentation.clone())));

if result.is_err() {
// Increment dropped logs count. The first time we have to drop a log,
Expand All @@ -292,6 +295,37 @@
otel_warn!(name: "BatchLogProcessor.LogDroppingStarted",
message = "BatchLogProcessor dropped a LogRecord due to queue full/internal errors. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped.");
}
return;

Check warning on line 298 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L298

Added line #L298 was not covered by tests
}

// At this point, sending the log record to the data channel was successful.
// Increment the current batch size and check if it has reached the max export batch size.
if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1 >= self.max_export_batch_size
{
// Check if the a control message for exporting logs is already sent to the worker thread.
// If not, send a control message to export logs.
// `export_log_message_sent` is set to false ONLY when the worker thread has processed the control message.

if !self.export_log_message_sent.load(Ordering::Relaxed) {

Check warning on line 309 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L309

Added line #L309 was not covered by tests
// This is a cost-efficient check as atomic load operations do not require exclusive access to cache line.
// Perform atomic swap to `export_log_message_sent` ONLY when the atomic load operation above returns false.
// Atomic swap/compare_exchange operations require exclusive access to cache line on most processor architectures.
// We could have used compare_exchange as well here, but it's more verbose than swap.
if !self.export_log_message_sent.swap(true, Ordering::Relaxed) {
match self.message_sender.try_send(BatchMessage::ExportLog(
self.export_log_message_sent.clone(),
)) {
Ok(_) => {
// Control message sent successfully.
}
Err(_err) => {
// TODO: Log error
// If the control message could not be sent, reset the `export_log_message_sent` flag.
self.export_log_message_sent.store(false, Ordering::Relaxed);
}

Check warning on line 325 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L314-L325

Added lines #L314 - L325 were not covered by tests
}
}
}

Check warning on line 328 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L327-L328

Added lines #L327 - L328 were not covered by tests
}
}

Expand Down Expand Up @@ -388,8 +422,12 @@
where
E: LogExporter + Send + Sync + 'static,
{
let (message_sender, message_receiver) = mpsc::sync_channel(config.max_queue_size);
let (logs_sender, logs_receiver) = mpsc::sync_channel::<LogsData>(config.max_queue_size);
let (message_sender, message_receiver) = mpsc::sync_channel::<BatchMessage>(64); // Is this a reasonable bound?
lalitb marked this conversation as resolved.
Show resolved Hide resolved
let max_queue_size = config.max_queue_size;
let max_export_batch_size = config.max_export_batch_size;
let current_batch_size = Arc::new(AtomicUsize::new(0));
let current_batch_size_for_thread = current_batch_size.clone();

let handle = thread::Builder::new()
.name("OpenTelemetry.Logs.BatchProcessor".to_string())
Expand All @@ -402,6 +440,42 @@
);
let mut last_export_time = Instant::now();
let mut logs = Vec::with_capacity(config.max_export_batch_size);
let current_batch_size = current_batch_size_for_thread;

// This method gets upto `max_export_batch_size` amount of logs from the channel and exports them.
// It returns the result of the export operation.
// It expects the logs vec to be empty when it's called.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can get rid of logs vec and instead create a fresh array and pass its slice to export method.?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For using array, we need to know the size of the array beforehand which we don't because we don't know how many items are present in the channel. It might be less than max_export_batch_size.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can allocate an array of max_export_batch_size, and slice it at the required length. Anyway, this is not a blocker.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Always stack allocating a big array even to export very few logs seems wasteful.

What's the benefit of using an array instead of a vec here? We are reusing the same vec for each export anyway so allocation isn't a concern.

#[inline]
fn get_logs_and_export<E>(
logs_receiver: &mpsc::Receiver<LogsData>,
exporter: &E,
logs: &mut Vec<LogsData>,
last_export_time: &mut Instant,
current_batch_size: &AtomicUsize,
config: &BatchConfig,
) -> ExportResult
where
E: LogExporter + Send + Sync + 'static,
{
// Get upto `max_export_batch_size` amount of logs log records from the channel and push them to the logs vec
while let Ok(log) = logs_receiver.try_recv() {
logs.push(log);
if logs.len() == config.max_export_batch_size {
break;

Check warning on line 464 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L464

Added line #L464 was not covered by tests
}
}

let count_of_logs = logs.len(); // Count of logs that will be exported
let result = export_with_timeout_sync(
config.max_export_timeout,
exporter,
logs,
last_export_time,
); // This method clears the logs vec after exporting

current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed);
result
}

loop {
let remaining_time = config
Expand All @@ -410,37 +484,44 @@
.unwrap_or(config.scheduled_delay);

match message_receiver.recv_timeout(remaining_time) {
Ok(BatchMessage::ExportLog(log)) => {
logs.push(log);
if logs.len() == config.max_export_batch_size {
otel_debug!(
name: "BatchLogProcessor.ExportingDueToBatchSize",
);
let _ = export_with_timeout_sync(
config.max_export_timeout,
&mut exporter,
&mut logs,
&mut last_export_time,
);
}
Ok(BatchMessage::ExportLog(export_log_message_sent)) => {
otel_debug!(
name: "BatchLogProcessor.ExportingDueToBatchSize",
);

Check warning on line 490 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L487-L490

Added lines #L487 - L490 were not covered by tests

let _ = get_logs_and_export(
&logs_receiver,
&exporter,
&mut logs,
&mut last_export_time,
&current_batch_size,
&config,
);

// Reset the export log message sent flag now it has has been processed.
export_log_message_sent.store(false, Ordering::Relaxed);

Check warning on line 502 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L492-L502

Added lines #L492 - L502 were not covered by tests
}
Ok(BatchMessage::ForceFlush(sender)) => {
otel_debug!(name: "BatchLogProcessor.ExportingDueToForceFlush");
let result = export_with_timeout_sync(
config.max_export_timeout,
&mut exporter,
let result = get_logs_and_export(
utpilla marked this conversation as resolved.
Show resolved Hide resolved
&logs_receiver,
&exporter,
&mut logs,
&mut last_export_time,
&current_batch_size,
&config,
);
let _ = sender.send(result);
}
Ok(BatchMessage::Shutdown(sender)) => {
otel_debug!(name: "BatchLogProcessor.ExportingDueToShutdown");
let result = export_with_timeout_sync(
config.max_export_timeout,
&mut exporter,
let result = get_logs_and_export(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as flush - we need to ensure logs are continued to be exported until it finishes all logs received upto the point shutdown was invoked.

&logs_receiver,
&exporter,
&mut logs,
&mut last_export_time,
&current_batch_size,
&config,
);
let _ = sender.send(result);

Expand All @@ -460,11 +541,14 @@
otel_debug!(
name: "BatchLogProcessor.ExportingDueToTimer",
);
let _ = export_with_timeout_sync(
config.max_export_timeout,
&mut exporter,

let _ = get_logs_and_export(
&logs_receiver,
&exporter,

Check warning on line 547 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L545-L547

Added lines #L545 - L547 were not covered by tests
&mut logs,
&mut last_export_time,
&current_batch_size,
&config,

Check warning on line 551 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L550-L551

Added lines #L550 - L551 were not covered by tests
);
}
Err(RecvTimeoutError::Disconnected) => {
Expand All @@ -486,13 +570,17 @@

// Return batch processor with link to worker
BatchLogProcessor {
logs_sender,
message_sender,
handle: Mutex::new(Some(handle)),
forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable
is_shutdown: AtomicBool::new(false),
dropped_logs_count: AtomicUsize::new(0),
max_queue_size,
export_log_message_sent: Arc::new(AtomicBool::new(false)),
current_batch_size,
max_export_batch_size,
}
}

Expand All @@ -511,7 +599,7 @@
#[allow(clippy::vec_box)]
fn export_with_timeout_sync<E>(
_: Duration, // TODO, enforcing timeout in exporter.
exporter: &mut E,
exporter: &E,
batch: &mut Vec<Box<(LogRecord, InstrumentationScope)>>,
last_export_time: &mut Instant,
) -> ExportResult
Expand Down
Loading