# GC improvements 4: index EntityPathHashes in metadata registry (#4398)
Indexes `EntityPathHash`es alongside `TimePoint`s in the metadata
registry to avoid running full scans during garbage collection.

This yields further significant wins in the common case, as shown in the sketch and benchmarks below.
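
In sketch form, the change boils down to the following. This is a minimal, self-contained illustration with simplified stand-in types; `RowId`, `TimePoint`, `Table`, etc. here are toy aliases, not the real rerun types:

```rust
use std::collections::{BTreeMap, HashMap};

// Simplified stand-ins for the real store types (illustrative only).
type RowId = u64;
type Timeline = &'static str;
type EntityPathHash = u64;
type TimePoint = BTreeMap<Timeline, i64>;
struct Table; // stand-in for a (timeline, entity) indexed table

struct Store {
    // Before this PR the registry mapped `RowId -> TimePoint`, so dropping a
    // row meant scanning *every* table to find the ones containing it.
    // Storing the entity-path hash alongside lets the GC jump straight to
    // the (timeline, entity) tables that can actually hold the row.
    metadata_registry: BTreeMap<RowId, (TimePoint, EntityPathHash)>,
    tables: HashMap<(Timeline, EntityPathHash), Table>,
}

impl Store {
    fn drop_row(&mut self, row_id: RowId) {
        if let Some((timepoint, entity_path_hash)) = self.metadata_registry.remove(&row_id) {
            for (timeline, _time) in timepoint {
                // Direct lookup instead of a full scan over `self.tables`.
                if let Some(_table) = self.tables.get_mut(&(timeline, entity_path_hash)) {
                    // …drop the row from this table and account for the bytes…
                }
            }
        }
    }
}

fn main() {
    let mut store = Store {
        metadata_registry: BTreeMap::from([(
            1,
            (TimePoint::from([("frame_nr", 42)]), 0xbeef),
        )]),
        tables: HashMap::from([(("frame_nr", 0xbeef), Table)]),
    };
    store.drop_row(1);
}
```

Per dropped row, the work goes from iterating over every table in the store to one map lookup per timeline the row was logged on.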

### Benchmarks

Compared to `main`:
```
group                                                     gc_improvements_0                       gc_improvements_4
-----                                                     -----------------                       -----------------
.../plotting_dashboard/drop_at_least=0.3/bucketsz=1024    10.32  1084.0±4.47ms 54.1 KElem/sec     1.00    105.0±0.91ms 558.1 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=2048    19.80      2.1±0.02s 27.6 KElem/sec     1.00    107.3±0.83ms 546.2 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=256     4.38    465.8±2.50ms 125.8 KElem/sec    1.00    106.3±0.74ms 551.3 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=512     6.16    655.3±2.61ms 89.4 KElem/sec     1.00    106.4±0.94ms 550.6 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/default          6.34    652.8±4.12ms 89.8 KElem/sec     1.00    102.9±0.75ms 569.4 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=1024         37.12      2.4±0.05s 24.2 KElem/sec     1.00     65.3±0.81ms 897.6 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=2048         37.54      2.4±0.03s 24.1 KElem/sec     1.00     64.9±1.07ms 903.2 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=256          38.81      2.5±0.08s 23.5 KElem/sec     1.00     64.4±0.99ms 910.2 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=512          37.00      2.4±0.02s 24.5 KElem/sec     1.00     64.6±1.08ms 906.9 KElem/sec
.../timeless_logs/drop_at_least=0.3/default               36.82      2.4±0.03s 24.4 KElem/sec     1.00     65.3±1.29ms 897.3 KElem/sec
```

Compared to the previous PR:
```
group                                                     gc_improvements_3                        gc_improvements_4
-----                                                     -----------------                        -----------------
.../plotting_dashboard/drop_at_least=0.3/bucketsz=1024    2.30    241.0±1.66ms 243.1 KElem/sec     1.00    105.0±0.91ms 558.1 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=2048    2.24    239.9±2.70ms 244.3 KElem/sec     1.00    107.3±0.83ms 546.2 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=256     2.33    247.4±3.94ms 236.8 KElem/sec     1.00    106.3±0.74ms 551.3 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=512     2.27    241.2±2.06ms 243.0 KElem/sec     1.00    106.4±0.94ms 550.6 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/default          2.33    239.6±1.98ms 244.6 KElem/sec     1.00    102.9±0.75ms 569.4 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=1024         1.00     60.3±1.16ms 972.3 KElem/sec     1.08     65.3±0.81ms 897.6 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=2048         1.00     60.8±1.14ms 964.3 KElem/sec     1.07     64.9±1.07ms 903.2 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=256          1.00     61.0±1.99ms 960.9 KElem/sec     1.06     64.4±0.99ms 910.2 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=512          1.00     60.6±1.45ms 966.9 KElem/sec     1.07     64.6±1.08ms 906.9 KElem/sec
.../timeless_logs/drop_at_least=0.3/default               1.00     57.6±0.35ms 1018.1 KElem/sec    1.13     65.3±1.29ms 897.3 KElem/sec
```

---

Part of the GC improvements series:
- #4394
- #4395
- #4396
- #4397
- #4398
- #4399
- #4400
- #4401
teh-cmc authored Dec 2, 2023
1 parent 0d81047 commit 1ce4563
Showing 9 changed files with 89 additions and 35 deletions.
#### `crates/re_arrow_store/src/store.rs` (2 additions, 4 deletions)

```diff
@@ -104,7 +104,7 @@ pub struct MetadataRegistry<T: Clone> {
     pub heap_size_bytes: u64,
 }
 
-impl Default for MetadataRegistry<TimePoint> {
+impl Default for MetadataRegistry<(TimePoint, EntityPathHash)> {
     fn default() -> Self {
         let mut this = Self {
             registry: Default::default(),
@@ -201,9 +201,7 @@ pub struct DataStore {
     pub(crate) type_registry: DataTypeRegistry,
 
     /// Keeps track of arbitrary per-row metadata.
-    ///
-    /// Only used to map `RowId`s to their original [`TimePoint`]s at the moment.
-    pub(crate) metadata_registry: MetadataRegistry<TimePoint>,
+    pub(crate) metadata_registry: MetadataRegistry<(TimePoint, EntityPathHash)>,
 
     /// Used to cache auto-generated cluster cells (`[0]`, `[0, 1]`, `[0, 1, 2]`, …)
     /// so that they can be properly deduplicated on insertion.
```

#### `crates/re_arrow_store/src/store_gc.rs` (9 additions, 10 deletions)

```diff
@@ -205,8 +205,6 @@ impl DataStore {
     /// Returns the list of `RowId`s that were purged from the store.
     //
     // TODO(jleibs): There are some easy optimizations here if we find GC taking too long:
-    // - If we stored the entity_path_hash along with timepoints in the metadata_registry we could jump
-    //   directly to the relevant tables instead of needing to iterate over all tables.
     // - If we know we are clearing almost everything, then we can batch-clear the rows from the
     //   the tables instead of needing to iterate over every single row incrementally.
     fn gc_drop_at_least_num_bytes(
@@ -224,22 +222,22 @@ impl DataStore {
         // 2. Find all tables that potentially hold data associated with that `RowId`
         // 3. Drop the associated row and account for the space we got back
 
-        for (row_id, timepoint) in &self.metadata_registry.registry {
+        for (&row_id, (timepoint, entity_path_hash)) in &self.metadata_registry.registry {
             if num_bytes_to_drop <= 0.0 {
                 break;
             }
 
-            if protected_rows.contains(row_id) {
+            if protected_rows.contains(&row_id) {
                 continue;
             }
 
             let mut diff: Option<StoreDiff> = None;
 
             // find all tables that could possibly contain this `RowId`
-            for ((timeline, _), table) in &mut self.tables {
-                if let Some(time) = timepoint.get(timeline) {
+            for (&timeline, &time) in timepoint {
+                if let Some(table) = self.tables.get_mut(&(timeline, *entity_path_hash)) {
                     let (removed, num_bytes_removed) =
-                        table.try_drop_row(&self.cluster_cell_cache, *row_id, time.as_i64());
+                        table.try_drop_row(&self.cluster_cell_cache, row_id, time.as_i64());
                     if let Some(inner) = diff.as_mut() {
                         if let Some(removed) = removed {
                             diff = inner.union(&removed);
@@ -257,7 +255,7 @@ impl DataStore {
             for table in self.timeless_tables.values_mut() {
                 // let deleted_comps = deleted.timeless.entry(ent_path.clone()_hash).or_default();
                 let (removed, num_bytes_removed) =
-                    table.try_drop_row(&self.cluster_cell_cache, *row_id);
+                    table.try_drop_row(&self.cluster_cell_cache, row_id);
                 if let Some(inner) = diff.as_mut() {
                     if let Some(removed) = removed {
                         diff = inner.union(&removed);
@@ -272,8 +270,9 @@ impl DataStore {
             // Only decrement the metadata size trackers if we're actually certain that we'll drop
            // that RowId in the end.
             if diff.is_some() {
-                let metadata_dropped_size_bytes =
-                    row_id.total_size_bytes() + timepoint.total_size_bytes();
+                let metadata_dropped_size_bytes = row_id.total_size_bytes()
+                    + timepoint.total_size_bytes()
+                    + entity_path_hash.total_size_bytes();
                 self.metadata_registry.heap_size_bytes = self
                     .metadata_registry
                     .heap_size_bytes
```

#### `crates/re_arrow_store/src/store_read.rs` (5 additions, 4 deletions)

```diff
@@ -2,7 +2,9 @@ use std::{collections::VecDeque, ops::RangeBounds, sync::atomic::Ordering};
 
 use itertools::Itertools;
 use re_log::trace;
-use re_log_types::{DataCell, EntityPath, RowId, TimeInt, TimePoint, TimeRange, Timeline};
+use re_log_types::{
+    DataCell, EntityPath, EntityPathHash, RowId, TimeInt, TimePoint, TimeRange, Timeline,
+};
 use re_types_core::{ComponentName, ComponentNameSet};
 
 use crate::{
@@ -495,9 +497,8 @@ impl DataStore {
         }
     }
 
-    pub fn get_msg_metadata(&self, row_id: &RowId) -> Option<&TimePoint> {
-        re_tracing::profile_function!();
-
+    #[inline]
+    pub fn get_msg_metadata(&self, row_id: &RowId) -> Option<&(TimePoint, EntityPathHash)> {
         self.metadata_registry.get(row_id)
     }
 
```

#### `crates/re_arrow_store/src/store_stats.rs` (2 additions, 2 deletions)

```diff
@@ -1,5 +1,5 @@
 use nohash_hasher::IntMap;
-use re_log_types::{TimePoint, TimeRange};
+use re_log_types::{EntityPathHash, TimePoint, TimeRange};
 use re_types_core::{ComponentName, SizeBytes};
 
 use crate::{
@@ -183,7 +183,7 @@ impl SizeBytes for DataTypeRegistry {
     }
 }
 
-impl SizeBytes for MetadataRegistry<TimePoint> {
+impl SizeBytes for MetadataRegistry<(TimePoint, EntityPathHash)> {
     #[inline]
     fn heap_size_bytes(&self) -> u64 {
         self.heap_size_bytes
```

#### `crates/re_arrow_store/src/store_write.rs` (16 additions, 13 deletions)

```diff
@@ -5,8 +5,8 @@ use parking_lot::RwLock;
 
 use re_log::{debug, trace};
 use re_log_types::{
-    DataCell, DataCellColumn, DataCellError, DataRow, RowId, TimeInt, TimePoint, TimeRange,
-    VecDequeRemovalExt as _,
+    DataCell, DataCellColumn, DataCellError, DataRow, EntityPathHash, RowId, TimeInt, TimePoint,
+    TimeRange, VecDequeRemovalExt as _,
 };
 use re_types_core::{
     components::InstanceKey, ComponentName, ComponentNameSet, Loggable, SizeBytes as _,
@@ -72,12 +72,13 @@ impl DataStore {
         let DataRow {
             row_id,
             timepoint,
-            entity_path: ent_path,
+            entity_path,
             num_instances,
             cells,
         } = row;
 
-        self.metadata_registry.upsert(*row_id, timepoint.clone())?;
+        self.metadata_registry
+            .upsert(*row_id, (timepoint.clone(), entity_path.hash()))?;
 
         re_tracing::profile_function!();
 
@@ -113,7 +114,7 @@ impl DataStore {
             }
         }
 
-        let ent_path_hash = ent_path.hash();
+        let ent_path_hash = entity_path.hash();
         let num_instances = *num_instances;
 
         trace!(
@@ -123,7 +124,7 @@ impl DataStore {
             timelines = ?timepoint.iter()
                 .map(|(timeline, time)| (timeline.name(), timeline.typ().format_utc(*time)))
                 .collect::<Vec<_>>(),
-            entity = %ent_path,
+            entity = %entity_path,
             components = ?cells.iter().map(|cell| cell.component_name()).collect_vec(),
             "insertion started…"
         );
@@ -165,12 +166,14 @@ impl DataStore {
             let index = self
                 .timeless_tables
                 .entry(ent_path_hash)
-                .or_insert_with(|| PersistentIndexedTable::new(self.cluster_key, ent_path.clone()));
+                .or_insert_with(|| {
+                    PersistentIndexedTable::new(self.cluster_key, entity_path.clone())
+                });
 
             index.insert_row(insert_id, generated_cluster_cell.clone(), row);
         } else {
             for (timeline, time) in timepoint.iter() {
-                let ent_path = ent_path.clone(); // shallow
+                let ent_path = entity_path.clone(); // shallow
                 let index = self
                     .tables
                     .entry((*timeline, ent_path_hash))
@@ -186,7 +189,7 @@ impl DataStore {
             }
         }
 
-        let diff = StoreDiff::addition(*row_id, ent_path.clone())
+        let diff = StoreDiff::addition(*row_id, entity_path.clone())
             .at_timepoint(timepoint.clone())
             .with_cells(cells.iter().cloned());
 
@@ -249,16 +252,16 @@
     }
 }
 
-impl MetadataRegistry<TimePoint> {
-    fn upsert(&mut self, row_id: RowId, timepoint: TimePoint) -> WriteResult<()> {
+impl MetadataRegistry<(TimePoint, EntityPathHash)> {
+    fn upsert(&mut self, row_id: RowId, data: (TimePoint, EntityPathHash)) -> WriteResult<()> {
         match self.entry(row_id) {
             std::collections::btree_map::Entry::Occupied(_) => Err(WriteError::ReusedRowId(row_id)),
             std::collections::btree_map::Entry::Vacant(entry) => {
                 // NOTE: In a map, thus on the heap!
-                let added_size_bytes = row_id.total_size_bytes() + timepoint.total_size_bytes();
+                let added_size_bytes = row_id.total_size_bytes() + data.total_size_bytes();
 
                 // This is valuable information even for a timeless timepoint!
-                entry.insert(timepoint);
+                entry.insert(data);
 
                 self.heap_size_bytes += added_size_bytes;
 
```

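As a usage note on the write path above: despite the name, `upsert` is insert-once, so re-registering a `RowId` fails. A rough sketch of that contract with toy types (all names and payloads here are illustrative, not the real API):

```rust
use std::collections::BTreeMap;

#[derive(Debug, PartialEq)]
enum WriteError {
    ReusedRowId(u64),
}

/// Illustrative stand-in for `MetadataRegistry<(TimePoint, EntityPathHash)>`.
#[derive(Default)]
struct Registry {
    registry: BTreeMap<u64, (Vec<(String, i64)>, u64)>,
    heap_size_bytes: u64,
}

impl Registry {
    fn upsert(&mut self, row_id: u64, data: (Vec<(String, i64)>, u64)) -> Result<(), WriteError> {
        match self.registry.entry(row_id) {
            // A `RowId` may only ever be registered once.
            std::collections::btree_map::Entry::Occupied(_) => {
                Err(WriteError::ReusedRowId(row_id))
            }
            std::collections::btree_map::Entry::Vacant(entry) => {
                // Rough size accounting; the real code sums `total_size_bytes()`
                // over the row id, the timepoint, and the entity-path hash.
                self.heap_size_bytes += std::mem::size_of_val(&data) as u64;
                entry.insert(data);
                Ok(())
            }
        }
    }
}

fn main() {
    let mut reg = Registry::default();
    let data = (vec![("frame_nr".to_owned(), 42)], 0xbeef);
    assert_eq!(reg.upsert(1, data.clone()), Ok(()));
    assert_eq!(reg.upsert(1, data), Err(WriteError::ReusedRowId(1)));
}
```
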
#### `crates/re_log_types/src/hash.rs` (7 additions, 0 deletions)

```diff
@@ -11,6 +11,13 @@ use std::hash::BuildHasher;
 #[derive(Copy, Clone, Eq, PartialOrd, Ord)]
 pub struct Hash64(u64);
 
+impl re_types_core::SizeBytes for Hash64 {
+    #[inline]
+    fn heap_size_bytes(&self) -> u64 {
+        0
+    }
+}
+
 impl Hash64 {
     pub const ZERO: Hash64 = Hash64(0);
 
```

#### `crates/re_log_types/src/path/entity_path.rs` (7 additions, 0 deletions)

```diff
@@ -10,6 +10,13 @@ use crate::{hash::Hash64, path::entity_path_impl::EntityPathImpl, EntityPathPart
 #[derive(Copy, Clone, Eq, PartialOrd, Ord)]
 pub struct EntityPathHash(Hash64);
 
+impl re_types_core::SizeBytes for EntityPathHash {
+    #[inline]
+    fn heap_size_bytes(&self) -> u64 {
+        self.0.heap_size_bytes()
+    }
+}
+
 impl EntityPathHash {
     /// Sometimes used as the hash of `None`.
     pub const NONE: EntityPathHash = EntityPathHash(Hash64::ZERO);
```

#### `crates/re_space_view_text_log/src/space_view_class.rs` (2 additions, 2 deletions)

```diff
@@ -251,10 +251,10 @@ impl ViewTextFilters {
 // ---
 
 fn get_time_point(ctx: &ViewerContext<'_>, entry: &Entry) -> Option<TimePoint> {
-    if let Some(time_point) = ctx.store_db.store().get_msg_metadata(&entry.row_id) {
+    if let Some((time_point, _)) = ctx.store_db.store().get_msg_metadata(&entry.row_id) {
         Some(time_point.clone())
     } else {
-        re_log::warn_once!("Missing meta-data for {:?}", entry.entity_path);
+        re_log::warn_once!("Missing metadata for {:?}", entry.entity_path);
         None
     }
 }
```

#### `crates/re_types_core/src/size_bytes.rs` (39 additions, 0 deletions)

```diff
@@ -115,6 +115,45 @@ macro_rules! impl_size_bytes_pod {
 
 impl_size_bytes_pod!(u8, u16, u32, u64, u128, i8, i16, i32, i64, i128, bool, f32, f64);
 
+impl<T, U> SizeBytes for (T, U)
+where
+    T: SizeBytes,
+    U: SizeBytes,
+{
+    #[inline]
+    fn heap_size_bytes(&self) -> u64 {
+        let (a, b) = self;
+        a.heap_size_bytes() + b.heap_size_bytes()
+    }
+}
+
+impl<T, U, V> SizeBytes for (T, U, V)
+where
+    T: SizeBytes,
+    U: SizeBytes,
+    V: SizeBytes,
+{
+    #[inline]
+    fn heap_size_bytes(&self) -> u64 {
+        let (a, b, c) = self;
+        a.heap_size_bytes() + b.heap_size_bytes() + c.heap_size_bytes()
+    }
+}
+
+impl<T, U, V, W> SizeBytes for (T, U, V, W)
+where
+    T: SizeBytes,
+    U: SizeBytes,
+    V: SizeBytes,
+    W: SizeBytes,
+{
+    #[inline]
+    fn heap_size_bytes(&self) -> u64 {
+        let (a, b, c, d) = self;
+        a.heap_size_bytes() + b.heap_size_bytes() + c.heap_size_bytes() + d.heap_size_bytes()
+    }
+}
+
 // --- Arrow ---
 
 impl SizeBytes for DataType {
```

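These blanket tuple impls are what let the registry account for its `(TimePoint, EntityPathHash)` values with a single `total_size_bytes()` call. A condensed, self-contained sketch of how they compose (the trait here is a simplified stand-in for `re_types_core::SizeBytes`):

```rust
// Simplified stand-in for `re_types_core::SizeBytes` (assumption: the real
// trait splits stack and heap accounting in more detail).
trait SizeBytes {
    fn heap_size_bytes(&self) -> u64;

    fn total_size_bytes(&self) -> u64 {
        std::mem::size_of_val(self) as u64 + self.heap_size_bytes()
    }
}

// Plain-old-data: nothing on the heap.
impl SizeBytes for u64 {
    fn heap_size_bytes(&self) -> u64 {
        0
    }
}

impl SizeBytes for String {
    fn heap_size_bytes(&self) -> u64 {
        self.capacity() as u64
    }
}

// The blanket 2-tuple impl: a tuple's heap size is the sum of its fields'.
impl<T: SizeBytes, U: SizeBytes> SizeBytes for (T, U) {
    fn heap_size_bytes(&self) -> u64 {
        let (a, b) = self;
        a.heap_size_bytes() + b.heap_size_bytes()
    }
}

fn main() {
    let value = (String::from("a timepoint would live here"), 0xbeef_u64);
    // One call covers stack and heap across both fields.
    println!("{} bytes", value.total_size_bytes());
}
```
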
