From dfa14fbdc0a1b9e282dc22c7d57bd846475ccd52 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Tue, 10 Dec 2024 08:53:06 +0100 Subject: [PATCH] [Turbopack] use thread local trace collecting (#73615) ### What? Collect trace events in a thread local buffer first and then send a batch of events to the writer thread. But there are some challenges to that: Events might be out of order in the trace file. Usually that should be fine since they have a timestamp, but * span ids might be reused faster than all events are send to the file. So we assign our own globally unique ids instead. * the trace server need to be able to process events out of order. So we queue up events when they are received for spans that are not created yet. And the thread local buffer might also have a problem when the thread is idle or hanging. This would cause events to never be written to the file. To fix that the writer thread will steal the outstanding events from the thread local buffers when it is idle for a second. This ensures that all events are eventually written, even in case of hanging threads. --- Cargo.lock | 10 +- .../src/reader/turbopack.rs | 462 ++++++++++++------ .../crates/turbopack-trace-utils/Cargo.toml | 3 + .../turbopack-trace-utils/src/flavor.rs | 16 +- .../turbopack-trace-utils/src/raw_trace.rs | 65 ++- .../turbopack-trace-utils/src/trace_writer.rs | 248 ++++++++-- 6 files changed, 573 insertions(+), 231 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index eae09d5475877..84c3e0f1073f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1546,12 +1546,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.16" +version = "0.8.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" -dependencies = [ - "cfg-if", -] +checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" [[package]] name = "crossterm" @@ -9262,10 +9259,13 @@ version = "0.1.0" dependencies = [ "anyhow", "crossbeam-channel", + "crossbeam-utils", "once_cell", + "parking_lot", "postcard", "rustc-hash 1.1.0", "serde", + "thread_local", "tokio", "tracing", "tracing-subscriber", diff --git a/turbopack/crates/turbopack-trace-server/src/reader/turbopack.rs b/turbopack/crates/turbopack-trace-server/src/reader/turbopack.rs index 1cef37cad175a..c944690f407b3 100644 --- a/turbopack/crates/turbopack-trace-server/src/reader/turbopack.rs +++ b/turbopack/crates/turbopack-trace-server/src/reader/turbopack.rs @@ -1,4 +1,5 @@ use std::{ + borrow::Cow, collections::{hash_map::Entry, HashMap, HashSet}, mem::transmute, ops::{Deref, DerefMut}, @@ -6,7 +7,7 @@ use std::{ }; use anyhow::Result; -use turbopack_trace_utils::tracing::TraceRow; +use turbopack_trace_utils::tracing::{TraceRow, TraceValue}; use super::TraceFormat; use crate::{ @@ -23,21 +24,124 @@ struct AllocationInfo { deallocation_count: u64, } +struct InternalRow<'a> { + id: Option, + ty: InternalRowType<'a>, +} + +impl InternalRow<'_> { + fn into_static(self) -> InternalRow<'static> { + InternalRow { + id: self.id, + ty: self.ty.into_static(), + } + } +} + +enum InternalRowType<'a> { + Start { + new_id: u64, + ts: u64, + name: Cow<'a, str>, + target: Cow<'a, str>, + values: Vec<(Cow<'a, str>, TraceValue<'a>)>, + }, + End { + ts: u64, + }, + SelfTime { + start: u64, + end: u64, + }, + Event { + ts: u64, + values: Vec<(Cow<'a, str>, TraceValue<'a>)>, + }, + Record { + values: Vec<(Cow<'a, str>, TraceValue<'a>)>, + }, + Allocation { + allocations: u64, + 
allocation_count: u64, + }, + Deallocation { + deallocations: u64, + deallocation_count: u64, + }, +} + +impl InternalRowType<'_> { + fn into_static(self) -> InternalRowType<'static> { + match self { + InternalRowType::Start { + ts, + new_id, + name, + target, + values, + } => InternalRowType::Start { + ts, + new_id, + name: name.into_owned().into(), + target: target.into_owned().into(), + values: values + .into_iter() + .map(|(k, v)| (k.into_owned().into(), v.into_static())) + .collect(), + }, + InternalRowType::End { ts } => InternalRowType::End { ts }, + InternalRowType::SelfTime { start, end } => InternalRowType::SelfTime { start, end }, + InternalRowType::Event { ts, values } => InternalRowType::Event { + ts, + values: values + .into_iter() + .map(|(k, v)| (k.into_owned().into(), v.into_static())) + .collect(), + }, + InternalRowType::Record { values } => InternalRowType::Record { + values: values + .into_iter() + .map(|(k, v)| (k.into_owned().into(), v.into_static())) + .collect(), + }, + InternalRowType::Allocation { + allocations, + allocation_count, + } => InternalRowType::Allocation { + allocations, + allocation_count, + }, + InternalRowType::Deallocation { + deallocations, + deallocation_count, + } => InternalRowType::Deallocation { + deallocations, + deallocation_count, + }, + } + } +} + +#[derive(Default)] +struct QueuedRows { + rows: Vec>, +} + pub struct TurbopackFormat { store: Arc, - active_ids: HashMap, - queued_rows: HashMap>>, + id_mapping: HashMap, + queued_rows: HashMap, outdated_spans: HashSet, - thread_stacks: HashMap>, + thread_stacks: HashMap>, thread_allocation_counters: HashMap, - self_time_started: HashMap<(SpanIndex, u64), u64>, + self_time_started: HashMap<(u64, u64), u64>, } impl TurbopackFormat { pub fn new(store: Arc) -> Self { Self { store, - active_ids: HashMap::new(), + id_mapping: HashMap::new(), queued_rows: HashMap::new(), outdated_spans: HashSet::new(), thread_stacks: HashMap::new(), @@ -56,98 +160,63 @@ impl TurbopackFormat { target, values, } => { - let parent = if let Some(parent) = parent { - if let Some(parent) = self.active_ids.get(&parent) { - Some(*parent) - } else { - self.queued_rows - .entry(parent) - .or_default() - .push(TraceRow::Start { - ts, - id, - parent: Some(parent), - name: name.into_owned().into(), - target: target.into_owned().into(), - values: values - .into_iter() - .map(|(k, v)| (k.into_owned().into(), v.into_static())) - .collect(), - }); - return; - } - } else { - None - }; - let span_id = store.add_span( - parent, - ts, - target.into_owned(), - name.into_owned(), - values - .iter() - .map(|(k, v)| (k.to_string(), v.to_string())) - .collect(), - &mut self.outdated_spans, + self.process_internal_row( + store, + InternalRow { + id: parent, + ty: InternalRowType::Start { + ts, + new_id: id, + name, + target, + values, + }, + }, ); - self.active_ids.insert(id, span_id); } TraceRow::Record { id, values } => { - let Some(&id) = self.active_ids.get(&id) else { - self.queued_rows - .entry(id) - .or_default() - .push(TraceRow::Record { - id, - values: values - .into_iter() - .map(|(k, v)| (k.into_owned().into(), v.into_static())) - .collect(), - }); - return; - }; - store.add_args( - id, - values - .iter() - .map(|(k, v)| (k.to_string(), v.to_string())) - .collect(), - &mut self.outdated_spans, + self.process_internal_row( + store, + InternalRow { + id: Some(id), + ty: InternalRowType::Record { values }, + }, ); } - TraceRow::End { ts: _, id } => { - // id might be reused - let index = self.active_ids.remove(&id); - if let 
Some(index) = index { - store.complete_span(index); - } + TraceRow::End { ts, id } => { + self.process_internal_row( + store, + InternalRow { + id: Some(id), + ty: InternalRowType::End { ts }, + }, + ); } TraceRow::Enter { ts, id, thread_id } => { - let Some(&id) = self.active_ids.get(&id) else { - self.queued_rows - .entry(id) - .or_default() - .push(TraceRow::Enter { ts, id, thread_id }); - return; - }; let stack = self.thread_stacks.entry(thread_id).or_default(); if let Some(&parent) = stack.last() { if let Some(parent_start) = self.self_time_started.remove(&(parent, thread_id)) { - store.add_self_time(parent, parent_start, ts, &mut self.outdated_spans); + stack.push(id); + self.process_internal_row( + store, + InternalRow { + id: Some(parent), + ty: InternalRowType::SelfTime { + start: parent_start, + end: ts, + }, + }, + ); + } else { + stack.push(id); } + } else { + stack.push(id); } - stack.push(id); self.self_time_started.insert((id, thread_id), ts); } TraceRow::Exit { ts, id, thread_id } => { - let Some(&id) = self.active_ids.get(&id) else { - self.queued_rows - .entry(id) - .or_default() - .push(TraceRow::Exit { ts, id, thread_id }); - return; - }; let stack = self.thread_stacks.entry(thread_id).or_default(); if let Some(pos) = stack.iter().rev().position(|&x| x == id) { let stack_index = stack.len() - pos - 1; @@ -158,58 +227,23 @@ impl TurbopackFormat { } } if let Some(start) = self.self_time_started.remove(&(id, thread_id)) { - store.add_self_time(id, start, ts, &mut self.outdated_spans); + self.process_internal_row( + store, + InternalRow { + id: Some(id), + ty: InternalRowType::SelfTime { start, end: ts }, + }, + ); } } TraceRow::Event { ts, parent, values } => { - let parent = if let Some(parent) = parent { - if let Some(parent) = self.active_ids.get(&parent) { - Some(*parent) - } else { - self.queued_rows - .entry(parent) - .or_default() - .push(TraceRow::Event { - ts, - parent: Some(parent), - values: values - .into_iter() - .map(|(k, v)| (k.into_owned().into(), v.into_static())) - .collect(), - }); - return; - } - } else { - None - }; - let mut values = values.into_iter().collect::>(); - let duration = values - .swap_remove("duration") - .and_then(|v| v.as_u64()) - .unwrap_or(0); - let name = values - .swap_remove("name") - .and_then(|v| v.as_str().map(|s| s.to_string())) - .unwrap_or("event".into()); - - let id = store.add_span( - parent, - ts.saturating_sub(duration), - "event".into(), - name, - values - .iter() - .map(|(k, v)| (k.to_string(), v.to_string())) - .collect(), - &mut self.outdated_spans, + self.process_internal_row( + store, + InternalRow { + id: parent, + ty: InternalRowType::Event { ts, values }, + }, ); - store.add_self_time( - id, - ts.saturating_sub(duration), - ts, - &mut self.outdated_spans, - ); - store.complete_span(id); } TraceRow::Allocation { ts: _, @@ -222,19 +256,27 @@ impl TurbopackFormat { let stack = self.thread_stacks.entry(thread_id).or_default(); if let Some(&id) = stack.last() { if allocations > 0 { - store.add_allocation( - id, - allocations, - allocation_count, - &mut self.outdated_spans, + self.process_internal_row( + store, + InternalRow { + id: Some(id), + ty: InternalRowType::Allocation { + allocations, + allocation_count, + }, + }, ); } if deallocations > 0 { - store.add_deallocation( - id, - deallocations, - deallocation_count, - &mut self.outdated_spans, + self.process_internal_row( + store, + InternalRow { + id: Some(id), + ty: InternalRowType::Deallocation { + deallocations, + deallocation_count, + }, + }, ); } } @@ -274,25 
+316,145 @@ impl TurbopackFormat { let stack = self.thread_stacks.entry(thread_id).or_default(); if let Some(&id) = stack.last() { if diff.allocations > 0 { - store.add_allocation( - id, - diff.allocations, - diff.allocation_count, - &mut self.outdated_spans, + self.process_internal_row( + store, + InternalRow { + id: Some(id), + ty: InternalRowType::Allocation { + allocations: diff.allocations, + allocation_count: diff.allocation_count, + }, + }, ); } if diff.deallocations > 0 { - store.add_deallocation( - id, - diff.deallocations, - diff.deallocation_count, - &mut self.outdated_spans, + self.process_internal_row( + store, + InternalRow { + id: Some(id), + ty: InternalRowType::Deallocation { + deallocations: diff.deallocations, + deallocation_count: diff.deallocation_count, + }, + }, ); } } } } } + + fn process_internal_row(&mut self, store: &mut StoreWriteGuard, row: InternalRow<'_>) { + let id = if let Some(id) = row.id { + if let Some(id) = self.id_mapping.get(&id) { + Some(*id) + } else { + self.queued_rows + .entry(id) + .or_default() + .rows + .push(row.into_static()); + return; + } + } else { + None + }; + match row.ty { + InternalRowType::Start { + ts, + new_id, + name, + target, + values, + } => { + let span_id = store.add_span( + id, + ts, + target.into_owned(), + name.into_owned(), + values + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(), + &mut self.outdated_spans, + ); + self.id_mapping.insert(new_id, span_id); + if let Some(QueuedRows { rows }) = self.queued_rows.remove(&new_id) { + for row in rows { + self.process_internal_row(store, row); + } + } + } + InternalRowType::Record { ref values } => { + store.add_args( + id.unwrap(), + values + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(), + &mut self.outdated_spans, + ); + } + InternalRowType::End { ts: _ } => { + store.complete_span(id.unwrap()); + } + InternalRowType::SelfTime { start, end } => { + store.add_self_time(id.unwrap(), start, end, &mut self.outdated_spans); + } + InternalRowType::Event { ts, values } => { + let mut values = values.into_iter().collect::>(); + let duration = values + .swap_remove("duration") + .and_then(|v| v.as_u64()) + .unwrap_or(0); + let name = values + .swap_remove("name") + .and_then(|v| v.as_str().map(|s| s.to_string())) + .unwrap_or("event".into()); + + let id = store.add_span( + id, + ts.saturating_sub(duration), + "event".into(), + name, + values + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(), + &mut self.outdated_spans, + ); + store.add_self_time( + id, + ts.saturating_sub(duration), + ts, + &mut self.outdated_spans, + ); + store.complete_span(id); + } + InternalRowType::Allocation { + allocations, + allocation_count, + } => { + store.add_allocation( + id.unwrap(), + allocations, + allocation_count, + &mut self.outdated_spans, + ); + } + InternalRowType::Deallocation { + deallocations, + deallocation_count, + } => { + store.add_deallocation( + id.unwrap(), + deallocations, + deallocation_count, + &mut self.outdated_spans, + ); + } + } + } } impl TraceFormat for TurbopackFormat { diff --git a/turbopack/crates/turbopack-trace-utils/Cargo.toml b/turbopack/crates/turbopack-trace-utils/Cargo.toml index 8135dbec5ef2e..dfa7172eca1d6 100644 --- a/turbopack/crates/turbopack-trace-utils/Cargo.toml +++ b/turbopack/crates/turbopack-trace-utils/Cargo.toml @@ -13,12 +13,15 @@ bench = false [dependencies] anyhow = { workspace = true } +crossbeam-utils = { version = "0.8.20" } crossbeam-channel = { workspace = true } once_cell = { 
workspace = true } +parking_lot = { workspace = true } postcard = { workspace = true, features = ["alloc", "use-std"] } rustc-hash = { workspace = true } serde = { workspace = true, features = ["derive"] } tokio = { workspace = true, features = ["macros", "signal", "sync", "rt"] } +thread_local = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } turbo-tasks-malloc = { workspace = true } diff --git a/turbopack/crates/turbopack-trace-utils/src/flavor.rs b/turbopack/crates/turbopack-trace-utils/src/flavor.rs index 688d5bea50813..0d2859cf42c2b 100644 --- a/turbopack/crates/turbopack-trace-utils/src/flavor.rs +++ b/turbopack/crates/turbopack-trace-utils/src/flavor.rs @@ -1,23 +1,25 @@ use postcard::ser_flavors::Flavor; -pub struct BufFlavor { - pub buf: Vec, +use crate::trace_writer::WriteGuard; + +pub struct WriteGuardFlavor<'l> { + pub guard: WriteGuard<'l>, } -impl Flavor for BufFlavor { - type Output = Vec; +impl Flavor for WriteGuardFlavor<'_> { + type Output = (); fn try_push(&mut self, data: u8) -> postcard::Result<()> { - self.buf.push(data); + self.guard.push(data); Ok(()) } fn finalize(self) -> postcard::Result { - Ok(self.buf) + Ok(()) } fn try_extend(&mut self, data: &[u8]) -> postcard::Result<()> { - self.buf.extend_from_slice(data); + self.guard.extend(data); Ok(()) } } diff --git a/turbopack/crates/turbopack-trace-utils/src/raw_trace.rs b/turbopack/crates/turbopack-trace-utils/src/raw_trace.rs index 9e4bc5f55f8ab..c57ef84218a14 100644 --- a/turbopack/crates/turbopack-trace-utils/src/raw_trace.rs +++ b/turbopack/crates/turbopack-trace-utils/src/raw_trace.rs @@ -1,4 +1,6 @@ -use std::{borrow::Cow, fmt::Write, marker::PhantomData, thread, time::Instant}; +use std::{ + borrow::Cow, fmt::Write, marker::PhantomData, sync::atomic::AtomicU64, thread, time::Instant, +}; use tracing::{ field::{display, Visit}, @@ -8,16 +10,35 @@ use tracing_subscriber::{registry::LookupSpan, Layer}; use turbo_tasks_malloc::TurboMalloc; use crate::{ - flavor::BufFlavor, + flavor::WriteGuardFlavor, trace_writer::TraceWriter, tracing::{TraceRow, TraceValue}, }; +pub struct RawTraceLayerOptions {} + +struct RawTraceLayerExtension { + id: u64, +} + +fn get_id LookupSpan<'a>>( + ctx: tracing_subscriber::layer::Context<'_, S>, + id: &span::Id, +) -> u64 { + ctx.span(id) + .unwrap() + .extensions() + .get::() + .unwrap() + .id +} + /// A tracing layer that writes raw trace data to a writer. The data format is /// defined by [FullTraceRow]. 
pub struct RawTraceLayer LookupSpan<'a>> { trace_writer: TraceWriter, start: Instant, + next_id: AtomicU64, _phantom: PhantomData, } @@ -26,16 +47,15 @@ impl LookupSpan<'a>> RawTraceLayer { Self { trace_writer, start: Instant::now(), + next_id: AtomicU64::new(1), _phantom: PhantomData, } } fn write(&self, data: TraceRow<'_>) { let start = TurboMalloc::allocation_counters(); - // Buffer is recycled - let buf = self.trace_writer.try_get_buffer().unwrap_or_default(); - let buf = postcard::serialize_with_flavor(&data, BufFlavor { buf }).unwrap(); - self.trace_writer.write(buf); + let guard = self.trace_writer.start_write(); + postcard::serialize_with_flavor(&data, WriteGuardFlavor { guard }).unwrap(); TurboMalloc::reset_allocation_counters(start); } @@ -62,13 +82,20 @@ impl LookupSpan<'a>> Layer for RawTraceLayer { let ts = self.start.elapsed().as_micros() as u64; let mut values = ValuesVisitor::new(); attrs.values().record(&mut values); + let external_id = self + .next_id + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + ctx.span(id) + .unwrap() + .extensions_mut() + .insert(RawTraceLayerExtension { id: external_id }); self.write(TraceRow::Start { ts, - id: id.into_u64(), + id: external_id, parent: if attrs.is_contextual() { - ctx.current_span().id().map(|p| p.into_u64()) + ctx.current_span().id().map(|p| get_id(ctx, p)) } else { - attrs.parent().map(|p| p.into_u64()) + attrs.parent().map(|p| get_id(ctx, p)) }, name: attrs.metadata().name().into(), target: attrs.metadata().target().into(), @@ -76,32 +103,32 @@ impl LookupSpan<'a>> Layer for RawTraceLayer { }); } - fn on_close(&self, id: span::Id, _ctx: tracing_subscriber::layer::Context<'_, S>) { + fn on_close(&self, id: span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) { let ts = self.start.elapsed().as_micros() as u64; self.write(TraceRow::End { ts, - id: id.into_u64(), + id: get_id(ctx, &id), }); } - fn on_enter(&self, id: &span::Id, _ctx: tracing_subscriber::layer::Context<'_, S>) { + fn on_enter(&self, id: &span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) { let ts = self.start.elapsed().as_micros() as u64; let thread_id = thread::current().id().as_u64().into(); self.report_allocations(ts, thread_id); self.write(TraceRow::Enter { ts, - id: id.into_u64(), + id: get_id(ctx, id), thread_id, }); } - fn on_exit(&self, id: &span::Id, _ctx: tracing_subscriber::layer::Context<'_, S>) { + fn on_exit(&self, id: &span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) { let ts = self.start.elapsed().as_micros() as u64; let thread_id = thread::current().id().as_u64().into(); self.report_allocations(ts, thread_id); self.write(TraceRow::Exit { ts, - id: id.into_u64(), + id: get_id(ctx, id), thread_id, }); } @@ -113,9 +140,9 @@ impl LookupSpan<'a>> Layer for RawTraceLayer { self.write(TraceRow::Event { ts, parent: if event.is_contextual() { - ctx.current_span().id().map(|p| p.into_u64()) + ctx.current_span().id().map(|p| get_id(ctx, p)) } else { - event.parent().map(|p| p.into_u64()) + event.parent().map(|p| get_id(ctx, p)) }, values: values.values, }); @@ -125,12 +152,12 @@ impl LookupSpan<'a>> Layer for RawTraceLayer { &self, id: &span::Id, record: &span::Record<'_>, - _ctx: tracing_subscriber::layer::Context<'_, S>, + ctx: tracing_subscriber::layer::Context<'_, S>, ) { let mut values = ValuesVisitor::new(); record.record(&mut values); self.write(TraceRow::Record { - id: id.into_u64(), + id: get_id(ctx, id), values: values.values, }); } diff --git a/turbopack/crates/turbopack-trace-utils/src/trace_writer.rs 
b/turbopack/crates/turbopack-trace-utils/src/trace_writer.rs index 94a781a297404..3449ccbf6c637 100644 --- a/turbopack/crates/turbopack-trace-utils/src/trace_writer.rs +++ b/turbopack/crates/turbopack-trace-utils/src/trace_writer.rs @@ -1,11 +1,47 @@ -use std::{debug_assert, io::Write, thread::JoinHandle}; +use std::{debug_assert, io::Write, sync::Arc, thread::JoinHandle, time::Duration}; -use crossbeam_channel::{bounded, unbounded, Receiver, Sender, TryRecvError}; +use crossbeam_channel::{bounded, unbounded, Receiver, RecvTimeoutError, Sender, TryRecvError}; +use crossbeam_utils::CachePadded; +use parking_lot::{Mutex, MutexGuard}; +use thread_local::ThreadLocal; -#[derive(Clone, Debug)] +type ThreadLocalState = CachePadded>>; + +/// The amount of data that is accumulated in the thread local buffer before it is sent to the +/// writer. The buffer might grow if a single write is larger than this size. +const THREAD_LOCAL_INITIAL_BUFFER_SIZE: usize = 1024 * 1024; +/// Data buffered by the write thread before issuing a filesystem write +const WRITE_BUFFER_SIZE: usize = 100 * 1024 * 1024; + +struct TraceInfoBuffer { + buffer: Vec, +} + +impl TraceInfoBuffer { + fn new(capacity: usize) -> Self { + Self { + buffer: Vec::with_capacity(capacity), + } + } + + fn push(&mut self, data: u8) { + self.buffer.push(data); + } + + fn extend(&mut self, data: &[u8]) { + self.buffer.extend_from_slice(data); + } + + fn clear(&mut self) { + self.buffer.clear(); + } +} + +#[derive(Clone)] pub struct TraceWriter { - data_tx: Sender>, - return_rx: Receiver>, + data_tx: Sender>, + return_rx: Receiver, + thread_locals: Arc>, } impl TraceWriter { @@ -15,53 +51,116 @@ impl TraceWriter { /// * It allows writing an owned Vec instead of a reference, so avoiding additional /// allocation. /// * It uses an unbounded channel to avoid slowing down the application at all (memory) cost. - /// * It issues less writes by buffering the data into chunks of ~1MB, when possible. + /// * It issues less writes by buffering the data into chunks of WRITE_BUFFER_SIZE, when + /// possible. 
pub fn new(mut writer: W) -> (Self, TraceWriterGuard) { - let (data_tx, data_rx) = unbounded::>(); - let (return_tx, return_rx) = bounded::>(1024 * 10); + let (data_tx, data_rx) = unbounded::>(); + let (return_tx, return_rx) = bounded::(1024); + let thread_locals: Arc> = Default::default(); + + let trace_writer = Self { + data_tx: data_tx.clone(), + return_rx: return_rx.clone(), + thread_locals: thread_locals.clone(), + }; + + fn steal_from_thread_locals( + thread_locals: &Arc>, + stolen_buffers: &mut Vec, + ) { + for state in thread_locals.iter() { + let mut buffer = state.lock(); + if let Some(buffer) = buffer.take() { + stolen_buffers.push(buffer); + } + } + } let handle: std::thread::JoinHandle<()> = std::thread::spawn(move || { let _ = writer.write(b"TRACEv0"); - let mut buf = Vec::with_capacity(1024 * 1024 * 1024); + let mut buf = Vec::with_capacity(WRITE_BUFFER_SIZE); + let mut stolen_buffers = Vec::new(); + let mut should_exit = false; 'outer: loop { if !buf.is_empty() { let _ = writer.write_all(&buf); let _ = writer.flush(); buf.clear(); } - let Ok(mut data) = data_rx.recv() else { - break 'outer; + + let recv = if should_exit { + Ok(None) + } else { + data_rx.recv_timeout(Duration::from_secs(1)) }; - if data.is_empty() { - break 'outer; - } - if data.len() > buf.capacity() { - let _ = writer.write_all(&data); + + let mut data = match recv { + Ok(Some(data)) => data, + result => { + if result.is_ok() { + // On exit signal + should_exit = true; + } + // When we receive no data for a second or we want to exit we poll the + // thread local buffers to steal some data. This + // prevents unsend data if a thread is hanging or the + // system just go into idle. + steal_from_thread_locals(&thread_locals, &mut stolen_buffers); + if let Some(data) = stolen_buffers.pop() { + data + } else { + match result { + Ok(Some(_)) => unreachable!(), + Ok(None) | Err(RecvTimeoutError::Disconnected) => { + // We should exit. 
+ break 'outer; + } + Err(RecvTimeoutError::Timeout) => { + // No data stolen, wait again + continue; + } + } + } + } + }; + if data.buffer.len() > buf.capacity() { + let _ = writer.write_all(&data.buffer); } else { - buf.extend_from_slice(&data); + buf.extend_from_slice(&data.buffer); } data.clear(); let _ = return_tx.try_send(data); loop { - match data_rx.try_recv() { - Ok(data) => { - if data.is_empty() { + let recv = stolen_buffers.pop().map(Some).ok_or(()).or_else(|_| { + if should_exit { + Ok(None) + } else { + data_rx.try_recv() + } + }); + match recv { + Ok(Some(mut data)) => { + let data_buffer = &data.buffer; + if data_buffer.is_empty() { break 'outer; } - if buf.len() + data.len() > buf.capacity() { + if buf.len() + data_buffer.len() > buf.capacity() { let _ = writer.write_all(&buf); buf.clear(); - if data.len() > buf.capacity() { - let _ = writer.write_all(&data); + if data_buffer.len() > buf.capacity() { + let _ = writer.write_all(data_buffer); } else { - buf.extend_from_slice(&data); + buf.extend_from_slice(data_buffer); } } else { - buf.extend_from_slice(&data); + buf.extend_from_slice(data_buffer); } + data.clear(); + let _ = return_tx.try_send(data); } - Err(TryRecvError::Disconnected) => { - break 'outer; + Ok(None) | Err(TryRecvError::Disconnected) => { + should_exit = true; + break; } Err(TryRecvError::Empty) => { break; @@ -69,47 +168,96 @@ impl TraceWriter { } } } - if !buf.is_empty() { - let _ = writer.write_all(&buf); - } - let _ = writer.flush(); drop(writer); }); - ( - Self { - data_tx: data_tx.clone(), - return_rx: return_rx.clone(), - }, - TraceWriterGuard { - data_tx: Some(data_tx), - return_rx: Some(return_rx), - handle: Some(handle), - }, - ) + let guard = TraceWriterGuard { + data_tx: Some(data_tx), + return_rx: Some(return_rx), + handle: Some(handle), + }; + (trace_writer, guard) + } + + fn send(&self, data: TraceInfoBuffer) { + debug_assert!(!data.buffer.is_empty()); + let _ = self.data_tx.send(Some(data)); } - pub fn write(&self, data: Vec) { - debug_assert!(!data.is_empty()); - let _ = self.data_tx.send(data); + fn get_empty_buffer(&self, capacity: usize) -> TraceInfoBuffer { + self.return_rx + .try_recv() + .ok() + .unwrap_or_else(|| TraceInfoBuffer::new(capacity)) } - pub fn try_get_buffer(&self) -> Option> { - self.return_rx.try_recv().ok() + pub fn start_write(&self) -> WriteGuard<'_> { + let thread_local_buffer = self.thread_locals.get_or_default(); + let buffer = thread_local_buffer.lock(); + WriteGuard::new(buffer, self) } } pub struct TraceWriterGuard { - data_tx: Option>>, - return_rx: Option>>, + data_tx: Option>>, + return_rx: Option>, handle: Option>, } impl Drop for TraceWriterGuard { fn drop(&mut self) { - let _ = self.data_tx.take().unwrap().send(Vec::new()); + // Send exit signal, we can't use disconnect since there is another instance in TraceWriter + let _ = self.data_tx.take().unwrap().send(None); + // Receive all return buffers and drop them here. The thread is already busy writing. 
let return_rx = self.return_rx.take().unwrap(); while return_rx.recv().is_ok() {} + // Wait for the thread to finish completely let _ = self.handle.take().unwrap().join(); } } + +pub struct WriteGuard<'l> { + // Safety: The buffer must not be None + buffer: MutexGuard<'l, Option>, + trace_writer: &'l TraceWriter, +} + +impl<'l> WriteGuard<'l> { + fn new( + mut buffer: MutexGuard<'l, Option>, + trace_writer: &'l TraceWriter, + ) -> Self { + // Safety: The buffer must not be None, so we initialize it here + if buffer.is_none() { + *buffer = Some(trace_writer.get_empty_buffer(THREAD_LOCAL_INITIAL_BUFFER_SIZE)); + }; + Self { + buffer, + trace_writer, + } + } + + fn buffer(&mut self) -> &mut TraceInfoBuffer { + // Safety: The struct invariant ensures that the buffer is not None + unsafe { self.buffer.as_mut().unwrap_unchecked() } + } + + pub fn push(&mut self, data: u8) { + self.buffer().push(data); + } + + pub fn extend(&mut self, data: &[u8]) { + self.buffer().extend(data); + } +} + +impl Drop for WriteGuard<'_> { + fn drop(&mut self) { + if self.buffer().buffer.capacity() * 2 < self.buffer().buffer.len() * 3 { + let capacity = self.buffer().buffer.capacity(); + let new_buffer = self.trace_writer.get_empty_buffer(capacity); + let buffer = std::mem::replace(self.buffer(), new_buffer); + self.trace_writer.send(buffer); + } + } +}
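
---

The per-thread buffering and idle-stealing scheme described in the PR description can be boiled down to a standalone sketch. This is a simplified, std-only illustration and **not** the code from this patch: the real implementation uses `thread_local`, `parking_lot`, and `crossbeam-channel`, recycles buffers through a return channel, and serializes directly into the buffer via a postcard flavor. All names here (`Shared`, `record_event`, `steal_all`, `FLUSH_THRESHOLD`) are hypothetical.

```rust
use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
    time::Duration,
};

/// Hypothetical flush threshold; the real code grows a ~1 MiB thread-local buffer.
const FLUSH_THRESHOLD: usize = 64 * 1024;

struct Shared {
    /// One slot per producer thread. A real implementation would use a thread-local
    /// registry (the `thread_local` crate) instead of a fixed Vec.
    buffers: Vec<Mutex<Vec<u8>>>,
}

/// Drain every thread-local buffer into `out`.
fn steal_all(shared: &Shared, out: &mut Vec<u8>) {
    for slot in &shared.buffers {
        let mut buf = slot.lock().unwrap();
        out.extend_from_slice(&buf);
        buf.clear();
    }
}

fn writer_loop(shared: &Shared, rx: mpsc::Receiver<Vec<u8>>, out: &mut Vec<u8>) {
    loop {
        match rx.recv_timeout(Duration::from_secs(1)) {
            // Normal path: a producer handed over a full buffer.
            Ok(chunk) => out.extend_from_slice(&chunk),
            // Idle for a second: steal whatever is sitting in the thread-local buffers
            // so a hanging or idle thread cannot delay the trace data indefinitely.
            Err(mpsc::RecvTimeoutError::Timeout) => steal_all(shared, out),
            // All senders gone: steal one last time, then exit.
            Err(mpsc::RecvTimeoutError::Disconnected) => {
                steal_all(shared, out);
                break;
            }
        }
    }
}

/// Append an event to this thread's buffer; hand it to the writer once it is large enough.
fn record_event(slot: &Mutex<Vec<u8>>, tx: &mpsc::Sender<Vec<u8>>, bytes: &[u8]) {
    let mut buf = slot.lock().unwrap();
    buf.extend_from_slice(bytes);
    if buf.len() >= FLUSH_THRESHOLD {
        let full = std::mem::take(&mut *buf);
        let _ = tx.send(full);
    }
}

fn main() {
    let shared = Arc::new(Shared {
        buffers: (0..4).map(|_| Mutex::new(Vec::new())).collect(),
    });
    let (tx, rx) = mpsc::channel::<Vec<u8>>();

    let writer_shared = Arc::clone(&shared);
    let writer = thread::spawn(move || {
        let mut file = Vec::new(); // stands in for the trace file
        writer_loop(&writer_shared, rx, &mut file);
        println!("wrote {} bytes", file.len());
    });

    let producers: Vec<_> = (0..4usize)
        .map(|i| {
            let shared = Arc::clone(&shared);
            let tx = tx.clone();
            thread::spawn(move || {
                for _ in 0..10_000 {
                    record_event(&shared.buffers[i], &tx, b"event");
                }
            })
        })
        .collect();
    for p in producers {
        p.join().unwrap();
    }
    drop(tx); // last sender gone -> writer drains the channel, steals once more, exits
    writer.join().unwrap();
}
```

The one-second `recv_timeout` is what bounds how long data produced by an idle or hanging thread can stay unflushed, which is the guarantee the PR description relies on.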
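The id remapping and out-of-order queueing that `process_internal_row` performs on the reading side can likewise be reduced to a small sketch. The types below (`Row`, `Reader`) are illustrative stand-ins for `InternalRow` and `TurbopackFormat`, and only Start/End rows are handled; the point is that a row referencing an id whose span has not been created yet is parked and replayed once the matching Start row arrives.

```rust
use std::collections::HashMap;

/// Simplified stand-in for the `InternalRow` variants in reader/turbopack.rs.
enum Row {
    Start { id: u64, name: String },
    End { id: u64 },
}

#[derive(Default)]
struct Reader {
    /// Maps the writer's globally unique ids to span indices in the store.
    id_mapping: HashMap<u64, usize>,
    /// Rows that referenced an id before its Start row was seen.
    queued: HashMap<u64, Vec<Row>>,
    /// (name, completed) pairs stand in for the span store.
    spans: Vec<(String, bool)>,
}

impl Reader {
    fn process(&mut self, row: Row) {
        match row {
            Row::Start { id, name } => {
                let index = self.spans.len();
                self.spans.push((name, false));
                self.id_mapping.insert(id, index);
                // Replay anything that arrived before this Start, now that the id resolves.
                if let Some(rows) = self.queued.remove(&id) {
                    for row in rows {
                        self.process(row);
                    }
                }
            }
            Row::End { id } => match self.id_mapping.get(&id) {
                Some(&index) => self.spans[index].1 = true,
                // Span not created yet: park the row until its Start shows up.
                None => self.queued.entry(id).or_default().push(Row::End { id }),
            },
        }
    }
}

fn main() {
    let mut reader = Reader::default();
    // The End arrives before its Start, as can happen with per-thread batching.
    reader.process(Row::End { id: 7 });
    reader.process(Row::Start { id: 7, name: "compile".to_string() });
    assert_eq!(reader.spans, vec![("compile".to_string(), true)]);
}
```

Because the writer now assigns its own monotonically increasing ids (rather than reusing `tracing`'s span ids), an id uniquely identifies one span for the lifetime of the trace, so parking rows by id and replaying them later is safe even when batches from different threads interleave arbitrarily.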