From c1ee5313d5a4cd56ad358f0fd4fa11ffb4597276 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Fri, 22 Dec 2023 16:13:10 +0100 Subject: [PATCH 01/10] BufferedSink should probably also warn when dropping data --- crates/re_sdk/src/log_sink.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/crates/re_sdk/src/log_sink.rs b/crates/re_sdk/src/log_sink.rs index 01daceb39f1d..c5cd86762f3a 100644 --- a/crates/re_sdk/src/log_sink.rs +++ b/crates/re_sdk/src/log_sink.rs @@ -42,6 +42,20 @@ pub trait LogSink: Send + Sync + 'static { #[derive(Default)] pub struct BufferedSink(parking_lot::Mutex>); +impl Drop for BufferedSink { + fn drop(&mut self) { + for msg in self.0.lock().iter() { + // Sinks intentionally end up with pending SetStoreInfo messages + // these are fine to drop safely. Anything else should produce a + // warning. + if !matches!(msg, LogMsg::SetStoreInfo(_)) { + re_log::warn!("Dropping data in BufferedSink"); + return; + } + } + } +} + impl BufferedSink { /// An empty buffer. #[inline] From d75d4fb8919b8a6fa88be0686fb20b206d47ee12 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Fri, 22 Dec 2023 16:14:45 +0100 Subject: [PATCH 02/10] introduce ArrowChunkReleaseCallback --- crates/re_log_types/src/arrow_msg.rs | 69 ++++++++++++++++++- crates/re_log_types/src/data_table.rs | 2 + crates/re_log_types/src/data_table_batcher.rs | 10 ++- crates/re_log_types/src/lib.rs | 2 +- crates/re_sdk/src/recording_stream.rs | 21 +++--- 5 files changed, 93 insertions(+), 11 deletions(-) diff --git a/crates/re_log_types/src/arrow_msg.rs b/crates/re_log_types/src/arrow_msg.rs index d7b8970dc08e..b7ed316fddce 100644 --- a/crates/re_log_types/src/arrow_msg.rs +++ b/crates/re_log_types/src/arrow_msg.rs @@ -3,12 +3,67 @@ //! We have custom implementations of [`serde::Serialize`] and [`serde::Deserialize`] that wraps //! the inner Arrow serialization of [`Schema`] and [`Chunk`]. +use std::sync::Arc; + use crate::{TableId, TimePoint}; use arrow2::{array::Array, chunk::Chunk, datatypes::Schema}; +/// An arbitrary callback to be run when an [`ArrowMsg`], and more specifically the +/// Arrow [`Chunk`] within it, goes out of scope. +/// +/// If the [`ArrowMsg`] has been cloned in a bunch of places, the callback will run for each and +/// every instance. +/// It is up to the callback implementer to handle this, if needed. +#[allow(clippy::type_complexity)] +#[derive(Clone)] +pub struct ArrowChunkReleaseCallback(Arc>) + Send + Sync>); + +impl std::ops::Deref for ArrowChunkReleaseCallback { + type Target = dyn Fn(Chunk>) + Send + Sync; + + fn deref(&self) -> &Self::Target { + &*self.0 + } +} + +impl From for ArrowChunkReleaseCallback +where + F: Fn(Chunk>) + Send + Sync + 'static, +{ + #[inline] + fn from(f: F) -> Self { + Self(Arc::new(f)) + } +} + +impl ArrowChunkReleaseCallback { + #[inline] + pub fn as_ptr(&self) -> *const () { + Arc::as_ptr(&self.0).cast::<()>() + } +} + +impl PartialEq for ArrowChunkReleaseCallback { + #[inline] + fn eq(&self, other: &Self) -> bool { + std::ptr::eq(self.as_ptr(), other.as_ptr()) + } +} + +impl Eq for ArrowChunkReleaseCallback {} + +impl std::fmt::Debug for ArrowChunkReleaseCallback { + #[inline] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_tuple("ArrowChunkReleaseCallback") + .field(&format!("{:p}", self.as_ptr())) + .finish() + } +} + /// Message containing an Arrow payload -#[must_use] #[derive(Clone, Debug, PartialEq)] +#[must_use] pub struct ArrowMsg { /// Unique identifier for the [`crate::DataTable`] in this message. pub table_id: TableId, @@ -24,6 +79,17 @@ pub struct ArrowMsg { /// Data for all control & data columns. pub chunk: Chunk>, + + // pub on_release: Option>, + pub on_release: Option, +} + +impl Drop for ArrowMsg { + fn drop(&mut self) { + if let Some(on_release) = self.on_release.take() { + (*on_release)(self.chunk.clone() /* shallow */); + } + } } #[cfg(feature = "serde")] @@ -127,6 +193,7 @@ impl<'de> serde::Deserialize<'de> for ArrowMsg { timepoint_max, schema, chunk, + on_release: None, }) } else { Err(serde::de::Error::custom( diff --git a/crates/re_log_types/src/data_table.rs b/crates/re_log_types/src/data_table.rs index 4c4c1349f686..0fc2567758f6 100644 --- a/crates/re_log_types/src/data_table.rs +++ b/crates/re_log_types/src/data_table.rs @@ -1048,6 +1048,7 @@ impl DataTable { timepoint_max: _, schema, chunk, + on_release: _, } = msg; Self::deserialize(*table_id, schema, chunk) @@ -1066,6 +1067,7 @@ impl DataTable { timepoint_max, schema, chunk, + on_release: None, }) } } diff --git a/crates/re_log_types/src/data_table_batcher.rs b/crates/re_log_types/src/data_table_batcher.rs index 89cf50e40e3e..ebe9f7ced3dd 100644 --- a/crates/re_log_types/src/data_table_batcher.rs +++ b/crates/re_log_types/src/data_table_batcher.rs @@ -37,7 +37,7 @@ pub type DataTableBatcherResult = Result; /// Defines the different thresholds of the associated [`DataTableBatcher`]. /// /// See [`Self::default`] and [`Self::from_env`]. -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct DataTableBatcherConfig { /// Duration of the periodic tick. // @@ -64,6 +64,11 @@ pub struct DataTableBatcherConfig { /// /// Unbounded if left unspecified. pub max_tables_in_flight: Option, + + /// Callback to be run when an Arrow [`Chunk`] goes out of scope. + /// + /// See [`crate::ArrowChunkReleaseCallback`] for more information. + pub on_release: Option, } impl Default for DataTableBatcherConfig { @@ -80,6 +85,7 @@ impl DataTableBatcherConfig { flush_num_rows: u64::MAX, max_commands_in_flight: None, max_tables_in_flight: None, + on_release: None, }; /// Always flushes ASAP. @@ -89,6 +95,7 @@ impl DataTableBatcherConfig { flush_num_rows: 0, max_commands_in_flight: None, max_tables_in_flight: None, + on_release: None, }; /// Never flushes unless manually told to. @@ -98,6 +105,7 @@ impl DataTableBatcherConfig { flush_num_rows: u64::MAX, max_commands_in_flight: None, max_tables_in_flight: None, + on_release: None, }; /// Environment variable to configure [`Self::flush_tick`]. diff --git a/crates/re_log_types/src/lib.rs b/crates/re_log_types/src/lib.rs index 329a4683fd2d..29e072541785 100644 --- a/crates/re_log_types/src/lib.rs +++ b/crates/re_log_types/src/lib.rs @@ -37,7 +37,7 @@ mod data_table_batcher; use std::sync::Arc; -pub use self::arrow_msg::ArrowMsg; +pub use self::arrow_msg::{ArrowChunkReleaseCallback, ArrowMsg}; pub use self::data_cell::{DataCell, DataCellError, DataCellInner, DataCellResult}; pub use self::data_row::{ DataCellRow, DataCellVec, DataReadError, DataReadResult, DataRow, DataRowError, DataRowResult, diff --git a/crates/re_sdk/src/recording_stream.rs b/crates/re_sdk/src/recording_stream.rs index 4e1c4743d4ab..028221e97871 100644 --- a/crates/re_sdk/src/recording_stream.rs +++ b/crates/re_sdk/src/recording_stream.rs @@ -5,9 +5,10 @@ use ahash::HashMap; use crossbeam::channel::{Receiver, Sender}; use re_log_types::{ - ApplicationId, DataCell, DataCellError, DataRow, DataTable, DataTableBatcher, - DataTableBatcherConfig, DataTableBatcherError, EntityPath, LogMsg, RowId, StoreId, StoreInfo, - StoreKind, StoreSource, Time, TimeInt, TimePoint, TimeType, Timeline, TimelineName, + ApplicationId, ArrowChunkReleaseCallback, DataCell, DataCellError, DataRow, DataTable, + DataTableBatcher, DataTableBatcherConfig, DataTableBatcherError, EntityPath, LogMsg, RowId, + StoreId, StoreInfo, StoreKind, StoreSource, Time, TimeInt, TimePoint, TimeType, Timeline, + TimelineName, }; use re_types_core::{components::InstanceKey, AsComponents, ComponentBatch, SerializationError}; @@ -610,6 +611,7 @@ impl RecordingStreamInner { batcher_config: DataTableBatcherConfig, sink: Box, ) -> RecordingStreamResult { + let on_release = batcher_config.on_release.clone(); let batcher = DataTableBatcher::new(batcher_config)?; { @@ -636,7 +638,7 @@ impl RecordingStreamInner { .spawn({ let info = info.clone(); let batcher = batcher.clone(); - move || forwarding_thread(info, sink, cmds_rx, batcher.tables()) + move || forwarding_thread(info, sink, cmds_rx, batcher.tables(), on_release) }) .map_err(|err| RecordingStreamError::SpawnThread { name: NAME, err })? }; @@ -956,6 +958,7 @@ fn forwarding_thread( mut sink: Box, cmds_rx: Receiver, tables: Receiver, + on_release: Option, ) { /// Returns `true` to indicate that processing can continue; i.e. `false` means immediate /// shutdown. @@ -1018,7 +1021,7 @@ fn forwarding_thread( // NOTE: Always pop tables first, this is what makes `Command::PopPendingTables` possible, // which in turns makes `RecordingStream::flush_blocking` well defined. while let Ok(table) = tables.try_recv() { - let table = match table.to_arrow_msg() { + let mut arrow_msg = match table.to_arrow_msg() { Ok(table) => table, Err(err) => { re_log::error!(%err, @@ -1026,7 +1029,8 @@ fn forwarding_thread( continue; } }; - sink.send(LogMsg::ArrowMsg(info.store_id.clone(), table)); + arrow_msg.on_release = on_release.clone(); + sink.send(LogMsg::ArrowMsg(info.store_id.clone(), arrow_msg)); } select! { @@ -1037,7 +1041,7 @@ fn forwarding_thread( re_log::trace!("Shutting down forwarding_thread: batcher is gone"); break; }; - let table = match table.to_arrow_msg() { + let mut arrow_msg = match table.to_arrow_msg() { Ok(table) => table, Err(err) => { re_log::error!(%err, @@ -1045,7 +1049,8 @@ fn forwarding_thread( continue; } }; - sink.send(LogMsg::ArrowMsg(info.store_id.clone(), table)); + arrow_msg.on_release = on_release.clone(); + sink.send(LogMsg::ArrowMsg(info.store_id.clone(), arrow_msg)); } recv(cmds_rx) -> res => { let Ok(cmd) = res else { From 167b2a16d6de9d55fe38a86459e147ac7881cbd0 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Fri, 22 Dec 2023 16:15:31 +0100 Subject: [PATCH 03/10] introduce Python SDK's garbage queue --- rerun_py/Cargo.toml | 1 + rerun_py/src/python_bridge.rs | 108 +++++++++++++++++++++++++++++----- 2 files changed, 94 insertions(+), 15 deletions(-) diff --git a/rerun_py/Cargo.toml b/rerun_py/Cargo.toml index 5b826afab50b..1b0298d9c2e8 100644 --- a/rerun_py/Cargo.toml +++ b/rerun_py/Cargo.toml @@ -53,6 +53,7 @@ re_viewer.workspace = true re_viewport.workspace = true arrow2 = { workspace = true, features = ["io_ipc", "io_print"] } +crossbeam.workspace = true document-features.workspace = true itertools = { workspace = true } mimalloc = { workspace = true, features = ["local_dynamic_tls"] } diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs index e09f0334c21d..4c3166f7a618 100644 --- a/rerun_py/src/python_bridge.rs +++ b/rerun_py/src/python_bridge.rs @@ -37,7 +37,7 @@ use re_ws_comms::RerunServerPort; // --- FFI --- -use once_cell::sync::OnceCell; +use once_cell::sync::{Lazy, OnceCell}; use parking_lot::Mutex; // The bridge needs to have complete control over the lifetimes of the individual recordings, @@ -51,6 +51,48 @@ fn all_recordings() -> parking_lot::MutexGuard<'static, HashMap>; +type GarbageSender = crossbeam::channel::Sender; +type GarbageReceiver = crossbeam::channel::Receiver; + +/// ## Release Callbacks +/// +/// When Arrow data gets logged from Python to Rust across FFI, it carries with it a `release` +/// callback (see Arrow spec) that will be run when the data gets dropped. +/// +/// This is an issue in this case because running that callback will likely try and grab the GIL, +/// which is something that should only happen at very specific times, else we end up with deadlocks, +/// segfaults, aborts... +/// +/// ## The garbage queue +/// +/// When a [`LogMsg`] that was logged from Python gets dropped on the Rust side, it will end up +/// in this queue. +/// +/// The mere fact that the data still exists in this queue prevents the underlying Arrow refcount +/// to go below one, which in turn prevents the associated FFI `release` callback to run, which +/// avoids the issue mentioned above. +/// +/// When the time is right, call [`flush_garbage_queue`] to flush the queue and deallocate all the +/// accumulated data for real. +// +// NOTE: `crossbeam` rather than `std` because we need a `Send` & `Sync` receiver. +static GARBAGE_QUEUE: Lazy<(GarbageSender, GarbageReceiver)> = + Lazy::new(crossbeam::channel::unbounded); + +/// Flushes the [`GARBAGE_QUEUE`], therefore running all the associated FFI `release` callbacks. +/// +/// Any time you release the GIL (e.g. `py.allow_threads()`), try to slip in a call to this +/// function so we don't accumulate too much garbage. +fn flush_garbage_queue() { + while GARBAGE_QUEUE.1.try_recv().is_ok() { + // Implicitly dropping chunks, therefore triggering their `release` callbacks, therefore + // triggering the native Python GC. + } +} + +// --- + #[cfg(feature = "web_viewer")] fn global_web_viewer_server( ) -> parking_lot::MutexGuard<'static, Option> { @@ -205,7 +247,14 @@ fn new_recording( default_store_id(py, StoreKind::Recording, &application_id) }; + let mut batcher_config = re_log_types::DataTableBatcherConfig::from_env().unwrap_or_default(); + let on_release = |chunk| { + GARBAGE_QUEUE.0.send(chunk).ok(); + }; + batcher_config.on_release = Some(on_release.into()); + let recording = RecordingStreamBuilder::new(application_id) + .batcher_config(batcher_config) .is_official_example(is_official_example) .store_id(recording_id.clone()) .store_source(re_log_types::StoreSource::PythonSdk(python_version(py))) @@ -255,7 +304,14 @@ fn new_blueprint( default_store_id(py, StoreKind::Blueprint, &application_id) }; + let mut batcher_config = re_log_types::DataTableBatcherConfig::from_env().unwrap_or_default(); + let on_release = |chunk| { + GARBAGE_QUEUE.0.send(chunk).ok(); + }; + batcher_config.on_release = Some(on_release.into()); + let blueprint = RecordingStreamBuilder::new(application_id) + .batcher_config(batcher_config) .store_id(blueprint_id.clone()) .store_source(re_log_types::StoreSource::PythonSdk(python_version(py))) .default_enabled(default_enabled) @@ -282,13 +338,14 @@ fn new_blueprint( } #[pyfunction] -fn shutdown(py: Python<'_>) { +fn shutdown() { re_log::debug!("Shutting down the Rerun SDK"); // Release the GIL in case any flushing behavior needs to cleanup a python object. py.allow_threads(|| { for (_, recording) in all_recordings().drain() { recording.disconnect(); } + flush_garbage_queue(); }); } @@ -360,11 +417,13 @@ fn set_global_data_recording( // NOTE: This cannot happen anymore with the new `ALL_RECORDINGS` thingy, but better safe than // sorry. py.allow_threads(|| { - RecordingStream::set_global( + let rec = RecordingStream::set_global( rerun::StoreKind::Recording, recording.map(|rec| rec.0.clone()), ) - .map(PyRecordingStream) + .map(PyRecordingStream); + flush_garbage_queue(); + rec }) } @@ -390,11 +449,13 @@ fn set_thread_local_data_recording( // NOTE: This cannot happen anymore with the new `ALL_RECORDINGS` thingy, but better safe than // sorry. py.allow_threads(|| { - RecordingStream::set_thread_local( + let rec = RecordingStream::set_thread_local( rerun::StoreKind::Recording, recording.map(|rec| rec.0.clone()), ) - .map(PyRecordingStream) + .map(PyRecordingStream); + flush_garbage_queue(); + rec }) } @@ -431,11 +492,13 @@ fn set_global_blueprint_recording( // NOTE: This cannot happen anymore with the new `ALL_RECORDINGS` thingy, but better safe than // sorry. py.allow_threads(|| { - RecordingStream::set_global( + let rec = RecordingStream::set_global( rerun::StoreKind::Blueprint, recording.map(|rec| rec.0.clone()), ) - .map(PyRecordingStream) + .map(PyRecordingStream); + flush_garbage_queue(); + rec }) } @@ -461,11 +524,13 @@ fn set_thread_local_blueprint_recording( // NOTE: This cannot happen anymore with the new `ALL_RECORDINGS` thingy, but better safe than // sorry. py.allow_threads(|| { - RecordingStream::set_thread_local( + let rec = RecordingStream::set_thread_local( rerun::StoreKind::Blueprint, recording.map(|rec| rec.0.clone()), ) - .map(PyRecordingStream) + .map(PyRecordingStream); + flush_garbage_queue(); + rec }) } @@ -507,6 +572,7 @@ fn connect( blueprint.connect_opts(addr, flush_timeout); }; } + flush_garbage_queue(); }); Ok(()) @@ -522,9 +588,11 @@ fn save(path: &str, recording: Option<&PyRecordingStream>, py: Python<'_>) -> Py // The call to save may internally flush. // Release the GIL in case any flushing behavior needs to cleanup a python object. py.allow_threads(|| { - recording + let res = recording .save(path) - .map_err(|err| PyRuntimeError::new_err(err.to_string())) + .map_err(|err| PyRuntimeError::new_err(err.to_string())); + flush_garbage_queue(); + res }) } @@ -538,9 +606,11 @@ fn stdout(recording: Option<&PyRecordingStream>, py: Python<'_>) -> PyResult<()> // The call to stdout may internally flush. // Release the GIL in case any flushing behavior needs to cleanup a python object. py.allow_threads(|| { - recording + let res = recording .stdout() - .map_err(|err| PyRuntimeError::new_err(err.to_string())) + .map_err(|err| PyRuntimeError::new_err(err.to_string())); + flush_garbage_queue(); + res }) } @@ -554,7 +624,11 @@ fn memory_recording( get_data_recording(recording).map(|rec| { // The call to memory may internally flush. // Release the GIL in case any flushing behavior needs to cleanup a python object. - let inner = py.allow_threads(|| rec.memory()); + let inner = py.allow_threads(|| { + let storage = rec.memory(); + flush_garbage_queue(); + storage + }); PyMemorySinkStorage { rec: rec.0, inner } }) } @@ -579,6 +653,7 @@ impl PyMemorySinkStorage { // Release the GIL in case any flushing behavior needs to cleanup a python object. py.allow_threads(|| { self.rec.flush_blocking(); + flush_garbage_queue(); }); MemorySinkStorage::concat_memory_sinks_as_bytes( @@ -599,6 +674,7 @@ impl PyMemorySinkStorage { // Release the GIL in case any flushing behavior needs to cleanup a python object. py.allow_threads(|| { self.rec.flush_blocking(); + flush_garbage_queue(); }); self.inner.num_msgs() @@ -669,6 +745,7 @@ fn disconnect(py: Python<'_>, recording: Option<&PyRecordingStream>) { // Release the GIL in case any flushing behavior needs to cleanup a python object. py.allow_threads(|| { recording.disconnect(); + flush_garbage_queue(); }); } @@ -685,6 +762,7 @@ fn flush(py: Python<'_>, blocking: bool, recording: Option<&PyRecordingStream>) } else { recording.flush_async(); } + flush_garbage_queue(); }); } From 36411e67aff999e7dc321ec25b604b1f771d0fc0 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Fri, 22 Dec 2023 16:16:42 +0100 Subject: [PATCH 04/10] Do we still need the ALL_RECORDINGS business then? --- rerun_py/src/python_bridge.rs | 36 ----------------------------------- 1 file changed, 36 deletions(-) diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs index 4c3166f7a618..dbd6681d9a16 100644 --- a/rerun_py/src/python_bridge.rs +++ b/rerun_py/src/python_bridge.rs @@ -40,17 +40,6 @@ use re_ws_comms::RerunServerPort; use once_cell::sync::{Lazy, OnceCell}; use parking_lot::Mutex; -// The bridge needs to have complete control over the lifetimes of the individual recordings, -// otherwise all the recording shutdown machinery (which includes deallocating C, Rust and Python -// data and joining a bunch of threads) can end up running at any time depending on what the -// Python GC is doing, which obviously leads to very bad things :tm:. -// -// TODO(#2116): drop unused recordings -fn all_recordings() -> parking_lot::MutexGuard<'static, HashMap> { - static ALL_RECORDINGS: OnceCell>> = OnceCell::new(); - ALL_RECORDINGS.get_or_init(Default::default).lock() -} - type GarbageChunk = arrow2::chunk::Chunk>; type GarbageSender = crossbeam::channel::Sender; type GarbageReceiver = crossbeam::channel::Receiver; @@ -275,9 +264,6 @@ fn new_recording( ); } - // NOTE: The Rust-side of the bindings must be in control of the lifetimes of the recordings! - all_recordings().insert(recording_id, recording.clone()); - Ok(PyRecordingStream(recording)) } @@ -331,22 +317,12 @@ fn new_blueprint( ); } - // NOTE: The Rust-side of the bindings must be in control of the lifetimes of the recordings! - all_recordings().insert(blueprint_id, blueprint.clone()); - Ok(PyRecordingStream(blueprint)) } #[pyfunction] fn shutdown() { re_log::debug!("Shutting down the Rerun SDK"); - // Release the GIL in case any flushing behavior needs to cleanup a python object. - py.allow_threads(|| { - for (_, recording) in all_recordings().drain() { - recording.disconnect(); - } - flush_garbage_queue(); - }); } // --- Recordings --- @@ -413,9 +389,6 @@ fn set_global_data_recording( // to zero, which means dropping it, which means flushing it, which potentially means // deallocating python-owned data, which means grabbing the GIL, thus we need to release the // GIL first. - // - // NOTE: This cannot happen anymore with the new `ALL_RECORDINGS` thingy, but better safe than - // sorry. py.allow_threads(|| { let rec = RecordingStream::set_global( rerun::StoreKind::Recording, @@ -445,9 +418,6 @@ fn set_thread_local_data_recording( // to zero, which means dropping it, which means flushing it, which potentially means // deallocating python-owned data, which means grabbing the GIL, thus we need to release the // GIL first. - // - // NOTE: This cannot happen anymore with the new `ALL_RECORDINGS` thingy, but better safe than - // sorry. py.allow_threads(|| { let rec = RecordingStream::set_thread_local( rerun::StoreKind::Recording, @@ -488,9 +458,6 @@ fn set_global_blueprint_recording( // to zero, which means dropping it, which means flushing it, which potentially means // deallocating python-owned blueprint, which means grabbing the GIL, thus we need to release the // GIL first. - // - // NOTE: This cannot happen anymore with the new `ALL_RECORDINGS` thingy, but better safe than - // sorry. py.allow_threads(|| { let rec = RecordingStream::set_global( rerun::StoreKind::Blueprint, @@ -520,9 +487,6 @@ fn set_thread_local_blueprint_recording( // to zero, which means dropping it, which means flushing it, which potentially means // deallocating python-owned blueprint, which means grabbing the GIL, thus we need to release the // GIL first. - // - // NOTE: This cannot happen anymore with the new `ALL_RECORDINGS` thingy, but better safe than - // sorry. py.allow_threads(|| { let rec = RecordingStream::set_thread_local( rerun::StoreKind::Blueprint, From 10a69ccc019eff5e1ee11e91904fe352c58f51ad Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Fri, 22 Dec 2023 16:19:22 +0100 Subject: [PATCH 05/10] flush garbage anytime we log data --- rerun_py/src/python_bridge.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs index dbd6681d9a16..82899f838064 100644 --- a/rerun_py/src/python_bridge.rs +++ b/rerun_py/src/python_bridge.rs @@ -902,6 +902,7 @@ fn set_auto_space_views(enabled: bool, blueprint: Option<&PyRecordingStream>) { recording=None, ))] fn log_arrow_msg( + py: Python<'_>, entity_path: &str, components: &PyDict, timeless: bool, @@ -924,6 +925,8 @@ fn log_arrow_msg( recording.record_row(row, !timeless); + py.allow_threads(flush_garbage_queue); + Ok(()) } From 09241e11ecdab76a296f7a1a77259185798b1c3d Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Fri, 22 Dec 2023 16:24:06 +0100 Subject: [PATCH 06/10] lints --- Cargo.lock | 21 +++++++-------------- rerun_py/src/python_bridge.rs | 8 ++------ 2 files changed, 9 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e2fbe7ead01c..b5e9edf684df 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2868,15 +2868,6 @@ dependencies = [ "either", ] -[[package]] -name = "itertools" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57" -dependencies = [ - "either", -] - [[package]] name = "itertools" version = "0.12.0" @@ -4239,7 +4230,7 @@ checksum = "c55e02e35260070b6f716a2423c2ff1c3bb1642ddca6f99e1f26d06268a0e2d2" dependencies = [ "bytes", "heck 0.4.1", - "itertools 0.11.0", + "itertools 0.10.5", "log", "multimap", "once_cell", @@ -4260,7 +4251,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "efb6c9a1dd1def8e2124d17e83a20af56f1570d6c2d2bd9e266ccb768df3840e" dependencies = [ "anyhow", - "itertools 0.11.0", + "itertools 0.10.5", "proc-macro2", "quote", "syn 2.0.32", @@ -5583,6 +5574,7 @@ name = "rerun_py" version = "0.12.0-alpha.1+dev" dependencies = [ "arrow2", + "crossbeam", "document-features", "itertools 0.12.0", "mimalloc", @@ -7418,13 +7410,14 @@ dependencies = [ [[package]] name = "which" -version = "4.4.0" +version = "4.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2441c784c52b289a054b7201fc93253e288f094e2f4be9058343127c4226a269" +checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7" dependencies = [ "either", - "libc", + "home", "once_cell", + "rustix 0.38.24", ] [[package]] diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs index 82899f838064..7fb0d065239e 100644 --- a/rerun_py/src/python_bridge.rs +++ b/rerun_py/src/python_bridge.rs @@ -2,7 +2,7 @@ #![allow(clippy::borrow_deref_ref)] // False positive due to #[pufunction] macro #![allow(unsafe_op_in_unsafe_fn)] // False positive due to #[pufunction] macro -use std::{collections::HashMap, path::PathBuf}; +use std::path::PathBuf; use itertools::Itertools; use pyo3::{ @@ -11,8 +11,6 @@ use pyo3::{ types::{PyBytes, PyDict}, }; -//use re_viewer_context::SpaceViewId; -//use re_viewport::{SpaceViewBlueprint, VIEWPORT_PATH}; use re_viewport::VIEWPORT_PATH; use re_log_types::{DataRow, EntityPathPart, StoreKind}; @@ -37,8 +35,7 @@ use re_ws_comms::RerunServerPort; // --- FFI --- -use once_cell::sync::{Lazy, OnceCell}; -use parking_lot::Mutex; +use once_cell::sync::Lazy; type GarbageChunk = arrow2::chunk::Chunk>; type GarbageSender = crossbeam::channel::Sender; @@ -648,7 +645,6 @@ impl PyMemorySinkStorage { #[cfg(feature = "web_viewer")] #[must_use = "the tokio_runtime guard must be kept alive while using tokio"] fn enter_tokio_runtime() -> tokio::runtime::EnterGuard<'static> { - use once_cell::sync::Lazy; static TOKIO_RUNTIME: Lazy = Lazy::new(|| tokio::runtime::Runtime::new().expect("Failed to create tokio runtime")); TOKIO_RUNTIME.enter() From 6a7bb504893378254e7b14cd79b0eac25f5511e9 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Fri, 22 Dec 2023 16:36:03 +0100 Subject: [PATCH 07/10] add stress test --- tests/python/gil_stress/main.py | 51 +++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 tests/python/gil_stress/main.py diff --git a/tests/python/gil_stress/main.py b/tests/python/gil_stress/main.py new file mode 100644 index 000000000000..14e266ed5abd --- /dev/null +++ b/tests/python/gil_stress/main.py @@ -0,0 +1,51 @@ +""" +Stress test for things that tend to GIL deadlock. + +Logs many large recordings that contain a lot of large rows. + +Usage: +``` +python main.py +""" +from __future__ import annotations + +import rerun as rr + +rec = rr.new_recording(application_id="test") + +rec = rr.new_recording(application_id="test") +rr.log("test", rr.Points3D([1, 2, 3]), recording=rec.inner) + +rec = rr.new_recording(application_id="test", make_default=True) +rr.log("test", rr.Points3D([1, 2, 3]), recording=rec.inner) + +rec = rr.new_recording(application_id="test", make_thread_default=True) +rr.log("test", rr.Points3D([1, 2, 3]), recording=rec.inner) + +rec = rr.new_recording(application_id="test") # this works +rr.set_global_data_recording(rec) +rr.log("test", rr.Points3D([1, 2, 3]), recording=rec.inner) + +rec = rr.new_recording(application_id="test") # this works +rr.set_thread_local_data_recording(rec) +rr.log("test", rr.Points3D([1, 2, 3]), recording=rec.inner) + +rec = rr.new_recording(application_id="test", spawn=True) +rr.log("test", rr.Points3D([1, 2, 3]), recording=rec.inner) + +rec = rr.new_recording(application_id="test") +rr.connect(recording=rec) +rr.log("test", rr.Points3D([1, 2, 3]), recording=rec.inner) + +rec = rr.new_recording(application_id="test") +rr.memory_recording(recording=rec) +rr.log("test", rr.Points3D([1, 2, 3]), recording=rec.inner) + +for _ in range(3): + rec = rr.new_recording(application_id="test", make_default=False, make_thread_default=False) + mem = rec.memory_recording() + rr.log("test", rr.Points3D([1, 2, 3]), recording=rec.inner) + +for _ in range(3): + rec = rr.new_recording(application_id="test", make_default=False, make_thread_default=False) + rr.log("test", rr.Points3D([1, 2, 3]), recording=rec.inner) From d47a124a8771c87ec423055b2dad4a243aff86b2 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Fri, 22 Dec 2023 16:36:09 +0100 Subject: [PATCH 08/10] more lints --- crates/re_log_types/src/arrow_msg.rs | 1 + crates/re_log_types/src/data_table_batcher.rs | 2 +- rerun_py/src/python_bridge.rs | 10 ++++++---- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/crates/re_log_types/src/arrow_msg.rs b/crates/re_log_types/src/arrow_msg.rs index b7ed316fddce..20d1ea574397 100644 --- a/crates/re_log_types/src/arrow_msg.rs +++ b/crates/re_log_types/src/arrow_msg.rs @@ -21,6 +21,7 @@ pub struct ArrowChunkReleaseCallback(Arc>) + Send + impl std::ops::Deref for ArrowChunkReleaseCallback { type Target = dyn Fn(Chunk>) + Send + Sync; + #[inline] fn deref(&self) -> &Self::Target { &*self.0 } diff --git a/crates/re_log_types/src/data_table_batcher.rs b/crates/re_log_types/src/data_table_batcher.rs index ebe9f7ced3dd..d5bbe6a00b10 100644 --- a/crates/re_log_types/src/data_table_batcher.rs +++ b/crates/re_log_types/src/data_table_batcher.rs @@ -65,7 +65,7 @@ pub struct DataTableBatcherConfig { /// Unbounded if left unspecified. pub max_tables_in_flight: Option, - /// Callback to be run when an Arrow [`Chunk`] goes out of scope. + /// Callback to be run when an Arrow Chunk` goes out of scope. /// /// See [`crate::ArrowChunkReleaseCallback`] for more information. pub on_release: Option, diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs index 7fb0d065239e..e27b87c6b23b 100644 --- a/rerun_py/src/python_bridge.rs +++ b/rerun_py/src/python_bridge.rs @@ -48,11 +48,11 @@ type GarbageReceiver = crossbeam::channel::Receiver; /// /// This is an issue in this case because running that callback will likely try and grab the GIL, /// which is something that should only happen at very specific times, else we end up with deadlocks, -/// segfaults, aborts... +/// segfaults, aborts… /// /// ## The garbage queue /// -/// When a [`LogMsg`] that was logged from Python gets dropped on the Rust side, it will end up +/// When a [`re_log_types::LogMsg`] that was logged from Python gets dropped on the Rust side, it will end up /// in this queue. /// /// The mere fact that the data still exists in this queue prevents the underlying Arrow refcount @@ -82,8 +82,10 @@ fn flush_garbage_queue() { #[cfg(feature = "web_viewer")] fn global_web_viewer_server( ) -> parking_lot::MutexGuard<'static, Option> { - static WEB_HANDLE: OnceCell>> = - OnceCell::new(); + use once_cell::sync::OnceCell; + static WEB_HANDLE: OnceCell< + parking_lot::Mutex>, + > = OnceCell::new(); WEB_HANDLE.get_or_init(Default::default).lock() } From eaf838324c3bbf8ae3b948d7fc5bef5fcb4c626e Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Fri, 22 Dec 2023 16:58:43 +0100 Subject: [PATCH 09/10] Yes, we still need the ALL_RECORDINGS business This reverts commit 29beb4be0165a124a49f3bffc8fd8a949bfda68b. --- rerun_py/src/python_bridge.rs | 43 ++++++++++++++++++++++++++++++++--- 1 file changed, 40 insertions(+), 3 deletions(-) diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs index e27b87c6b23b..35bd6da07a93 100644 --- a/rerun_py/src/python_bridge.rs +++ b/rerun_py/src/python_bridge.rs @@ -2,6 +2,7 @@ #![allow(clippy::borrow_deref_ref)] // False positive due to #[pufunction] macro #![allow(unsafe_op_in_unsafe_fn)] // False positive due to #[pufunction] macro +use std::collections::HashMap; use std::path::PathBuf; use itertools::Itertools; @@ -35,7 +36,19 @@ use re_ws_comms::RerunServerPort; // --- FFI --- -use once_cell::sync::Lazy; +use once_cell::sync::{Lazy, OnceCell}; + +// The bridge needs to have complete control over the lifetimes of the individual recordings, +// otherwise all the recording shutdown machinery (which includes deallocating C, Rust and Python +// data and joining a bunch of threads) can end up running at any time depending on what the +// Python GC is doing, which obviously leads to very bad things :tm:. +// +// TODO(#2116): drop unused recordings +fn all_recordings() -> parking_lot::MutexGuard<'static, HashMap> { + static ALL_RECORDINGS: OnceCell>> = + OnceCell::new(); + ALL_RECORDINGS.get_or_init(Default::default).lock() +} type GarbageChunk = arrow2::chunk::Chunk>; type GarbageSender = crossbeam::channel::Sender; @@ -82,7 +95,6 @@ fn flush_garbage_queue() { #[cfg(feature = "web_viewer")] fn global_web_viewer_server( ) -> parking_lot::MutexGuard<'static, Option> { - use once_cell::sync::OnceCell; static WEB_HANDLE: OnceCell< parking_lot::Mutex>, > = OnceCell::new(); @@ -263,6 +275,9 @@ fn new_recording( ); } + // NOTE: The Rust-side of the bindings must be in control of the lifetimes of the recordings! + all_recordings().insert(recording_id, recording.clone()); + Ok(PyRecordingStream(recording)) } @@ -316,12 +331,22 @@ fn new_blueprint( ); } + // NOTE: The Rust-side of the bindings must be in control of the lifetimes of the recordings! + all_recordings().insert(blueprint_id, blueprint.clone()); + Ok(PyRecordingStream(blueprint)) } #[pyfunction] -fn shutdown() { +fn shutdown(py: Python<'_>) { re_log::debug!("Shutting down the Rerun SDK"); + // Release the GIL in case any flushing behavior needs to cleanup a python object. + py.allow_threads(|| { + for (_, recording) in all_recordings().drain() { + recording.disconnect(); + } + flush_garbage_queue(); + }); } // --- Recordings --- @@ -388,6 +413,9 @@ fn set_global_data_recording( // to zero, which means dropping it, which means flushing it, which potentially means // deallocating python-owned data, which means grabbing the GIL, thus we need to release the // GIL first. + // + // NOTE: This cannot happen anymore with the new `ALL_RECORDINGS` thingy, but better safe than + // sorry. py.allow_threads(|| { let rec = RecordingStream::set_global( rerun::StoreKind::Recording, @@ -417,6 +445,9 @@ fn set_thread_local_data_recording( // to zero, which means dropping it, which means flushing it, which potentially means // deallocating python-owned data, which means grabbing the GIL, thus we need to release the // GIL first. + // + // NOTE: This cannot happen anymore with the new `ALL_RECORDINGS` thingy, but better safe than + // sorry. py.allow_threads(|| { let rec = RecordingStream::set_thread_local( rerun::StoreKind::Recording, @@ -457,6 +488,9 @@ fn set_global_blueprint_recording( // to zero, which means dropping it, which means flushing it, which potentially means // deallocating python-owned blueprint, which means grabbing the GIL, thus we need to release the // GIL first. + // + // NOTE: This cannot happen anymore with the new `ALL_RECORDINGS` thingy, but better safe than + // sorry. py.allow_threads(|| { let rec = RecordingStream::set_global( rerun::StoreKind::Blueprint, @@ -486,6 +520,9 @@ fn set_thread_local_blueprint_recording( // to zero, which means dropping it, which means flushing it, which potentially means // deallocating python-owned blueprint, which means grabbing the GIL, thus we need to release the // GIL first. + // + // NOTE: This cannot happen anymore with the new `ALL_RECORDINGS` thingy, but better safe than + // sorry. py.allow_threads(|| { let rec = RecordingStream::set_thread_local( rerun::StoreKind::Blueprint, From 40f8fa545efe7d27848a900ff22ccfe210c2236d Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Tue, 2 Jan 2024 08:51:55 +0100 Subject: [PATCH 10/10] get rid of legacy workarounds in BufferedClient --- crates/re_sdk_comms/src/buffered_client.rs | 49 +--------------------- 1 file changed, 1 insertion(+), 48 deletions(-) diff --git a/crates/re_sdk_comms/src/buffered_client.rs b/crates/re_sdk_comms/src/buffered_client.rs index 0778931be75b..326c0d78a583 100644 --- a/crates/re_sdk_comms/src/buffered_client.rs +++ b/crates/re_sdk_comms/src/buffered_client.rs @@ -43,10 +43,8 @@ pub struct Client { flushed_rx: Receiver, encode_quit_tx: Sender, send_quit_tx: Sender, - drop_quit_tx: Sender, encode_join: Option>, send_join: Option>, - drop_join: Option>, /// Only used for diagnostics, not for communication after `new()`. addr: SocketAddr, @@ -65,12 +63,10 @@ impl Client { // TODO(emilk): keep track of how much memory is in each pipe // and apply back-pressure to not use too much RAM. let (msg_tx, msg_rx) = crossbeam::channel::unbounded(); - let (msg_drop_tx, msg_drop_rx) = crossbeam::channel::unbounded(); let (packet_tx, packet_rx) = crossbeam::channel::unbounded(); let (flushed_tx, flushed_rx) = crossbeam::channel::unbounded(); let (encode_quit_tx, encode_quit_rx) = crossbeam::channel::unbounded(); let (send_quit_tx, send_quit_rx) = crossbeam::channel::unbounded(); - let (drop_quit_tx, drop_quit_rx) = crossbeam::channel::unbounded(); // We don't compress the stream because we assume the SDK // and server are on the same machine and compression @@ -80,13 +76,7 @@ impl Client { let encode_join = std::thread::Builder::new() .name("msg_encoder".into()) .spawn(move || { - msg_encode( - encoding_options, - &msg_rx, - &msg_drop_tx, - &encode_quit_rx, - &packet_tx, - ); + msg_encode(encoding_options, &msg_rx, &encode_quit_rx, &packet_tx); }) .expect("Failed to spawn thread"); @@ -97,22 +87,13 @@ impl Client { }) .expect("Failed to spawn thread"); - let drop_join = std::thread::Builder::new() - .name("msg_dropper".into()) - .spawn(move || { - msg_drop(&msg_drop_rx, &drop_quit_rx); - }) - .expect("Failed to spawn thread"); - Self { msg_tx, flushed_rx, encode_quit_tx, send_quit_tx, - drop_quit_tx, encode_join: Some(encode_join), send_join: Some(send_join), - drop_join: Some(drop_join), addr, } } @@ -166,9 +147,7 @@ impl Drop for Client { self.encode_join.take().map(|j| j.join().ok()); // Then the other threads: self.send_quit_tx.send(InterruptMsg::Quit).ok(); - self.drop_quit_tx.send(QuitMsg).ok(); self.send_join.take().map(|j| j.join().ok()); - self.drop_join.take().map(|j| j.join().ok()); re_log::debug!("TCP client has shut down."); } } @@ -182,31 +161,9 @@ impl fmt::Debug for Client { } } -// We drop messages in a separate thread because the PyO3 + Arrow memory model -// means in some cases these messages actually store pointers back to -// python-managed memory. We don't want to block our send-thread waiting for the -// GIL. -fn msg_drop(msg_drop_rx: &Receiver, quit_rx: &Receiver) { - loop { - select! { - recv(msg_drop_rx) -> msg_msg => { - if msg_msg.is_err() { - re_log::trace!("Shutting down msg dropper thread: channel has closed"); - return; - } - } - recv(quit_rx) -> _quit_msg => { - re_log::trace!("Shutting down msg dropper thread: quit message received"); - return; - } - } - } -} - fn msg_encode( encoding_options: re_log_encoding::EncodingOptions, msg_rx: &Receiver, - msg_drop_tx: &Sender, quit_rx: &Receiver, packet_tx: &Sender, ) { @@ -240,10 +197,6 @@ fn msg_encode( return; } } - if msg_drop_tx.send(msg_msg).is_err() { - re_log::error!("Failed to send message to msg_drop thread. Likely a shutdown race-condition"); - return; - } } recv(quit_rx) -> _quit_msg => { re_log::debug!("Shutting down msg_encode thread: quit received");