diff --git a/crates/re_arrow_store/src/store.rs b/crates/re_arrow_store/src/store.rs index dbd1615f9d02..14d7df4ed8ba 100644 --- a/crates/re_arrow_store/src/store.rs +++ b/crates/re_arrow_store/src/store.rs @@ -473,7 +473,7 @@ impl Clone for IndexedBucket { } impl IndexedBucket { - fn new(cluster_key: ComponentName, timeline: Timeline) -> Self { + pub(crate) fn new(cluster_key: ComponentName, timeline: Timeline) -> Self { Self { timeline, inner: RwLock::new(IndexedBucketInner::default()), diff --git a/crates/re_arrow_store/src/store_event.rs b/crates/re_arrow_store/src/store_event.rs index 697d190dbddd..8cf6710fe0e8 100644 --- a/crates/re_arrow_store/src/store_event.rs +++ b/crates/re_arrow_store/src/store_event.rs @@ -433,7 +433,8 @@ mod tests { view, ); - view.on_events(&store.gc(&GarbageCollectionOptions::gc_everything()).0); + let events = store.gc(&GarbageCollectionOptions::gc_everything()).0; + view.on_events(&events); similar_asserts::assert_eq!( GlobalCounts::new( diff --git a/crates/re_arrow_store/src/store_gc.rs b/crates/re_arrow_store/src/store_gc.rs index 8a912b9545fc..155d0a50f578 100644 --- a/crates/re_arrow_store/src/store_gc.rs +++ b/crates/re_arrow_store/src/store_gc.rs @@ -2,12 +2,16 @@ use std::collections::BTreeMap; use ahash::{HashMap, HashSet}; -use re_log_types::{EntityPath, RowId, TimeInt, TimeRange, Timeline, VecDequeRemovalExt as _}; +use nohash_hasher::IntMap; +use re_log_types::{ + EntityPath, EntityPathHash, RowId, TimeInt, TimePoint, TimeRange, Timeline, + VecDequeRemovalExt as _, +}; use re_types_core::{ComponentName, SizeBytes as _}; use crate::{ store::{ - ClusterCellCache, IndexedBucketInner, IndexedTable, PersistentIndexedTable, + ClusterCellCache, IndexedBucket, IndexedBucketInner, IndexedTable, PersistentIndexedTable, PersistentIndexedTableInner, }, DataStore, DataStoreStats, StoreDiff, StoreDiffKind, StoreEvent, @@ -42,6 +46,11 @@ pub struct GarbageCollectionOptions { /// Components which should not be protected from GC when using `protect_latest` pub dont_protect: HashSet, + + /// Whether to enable batched bucket drops. + /// + /// Disabled by default as it is currently slower in most cases (somehow). + pub enable_batching: bool, } impl GarbageCollectionOptions { @@ -52,6 +61,7 @@ impl GarbageCollectionOptions { protect_latest: 0, purge_empty_tables: true, dont_protect: Default::default(), + enable_batching: false, } } } @@ -97,8 +107,6 @@ impl DataStore { /// component on each timeline. The only practical guarantee this gives is that a latest-at query /// with a value of max-int will be unchanged. However, latest-at queries from other arbitrary /// points in time may provide different results pre- and post- GC. - // - // TODO(#1823): Workload specific optimizations. pub fn gc(&mut self, options: &GarbageCollectionOptions) -> (Vec, DataStoreStats) { re_tracing::profile_function!(); @@ -131,6 +139,7 @@ impl DataStore { ); self.gc_drop_at_least_num_bytes( + options.enable_batching, num_bytes_to_drop, options.gc_timeless, &protected_rows, @@ -146,7 +155,12 @@ impl DataStore { "starting GC" ); - self.gc_drop_at_least_num_bytes(f64::INFINITY, options.gc_timeless, &protected_rows) + self.gc_drop_at_least_num_bytes( + options.enable_batching, + f64::INFINITY, + options.gc_timeless, + &protected_rows, + ) } }; @@ -203,12 +217,9 @@ impl DataStore { /// Tries to drop _at least_ `num_bytes_to_drop` bytes of data from the store. /// /// Returns the list of `RowId`s that were purged from the store. - // - // TODO(jleibs): There are some easy optimizations here if we find GC taking too long: - // - If we know we are clearing almost everything, then we can batch-clear the rows from the - // the tables instead of needing to iterate over every single row incrementally. fn gc_drop_at_least_num_bytes( &mut self, + enable_batching: bool, mut num_bytes_to_drop: f64, include_timeless: bool, protected_rows: &HashSet, @@ -222,22 +233,179 @@ impl DataStore { // 2. Find all tables that potentially hold data associated with that `RowId` // 3. Drop the associated row and account for the space we got back - for (&row_id, (timepoint, entity_path_hash)) in &self.metadata_registry.registry { + let batch_size = (self.config.indexed_bucket_num_rows as usize).saturating_mul(2); + let batch_size = batch_size.clamp(64, 4096); + // let batch_size = 1; + let mut batch: Vec<(TimePoint, (EntityPathHash, RowId))> = Vec::with_capacity(batch_size); + let mut batch_is_protected = false; + + let Self { + cluster_key, + metadata_registry, + cluster_cell_cache, + tables, + timeless_tables, + .. + } = self; + + for (&row_id, (timepoint, entity_path_hash)) in &metadata_registry.registry { + if protected_rows.contains(&row_id) { + batch_is_protected = true; + continue; + } + + batch.push((timepoint.clone(), (*entity_path_hash, row_id))); + if batch.len() < batch_size { + continue; + } + + let dropped = Self::drop_batch( + enable_batching, + tables, + timeless_tables, + cluster_cell_cache, + *cluster_key, + include_timeless, + &mut num_bytes_to_drop, + &batch, + batch_is_protected, + ); + + // Only decrement the metadata size trackers if we're actually certain that we'll drop + // that RowId in the end. + for dropped in dropped { + let metadata_dropped_size_bytes = dropped.row_id.total_size_bytes() + + dropped.timepoint().total_size_bytes() + + dropped.entity_path.hash().total_size_bytes(); + metadata_registry.heap_size_bytes = metadata_registry + .heap_size_bytes + .checked_sub(metadata_dropped_size_bytes) + .unwrap_or_else(|| { + re_log::warn_once!( + "GC metadata_registry size tracker underflowed, this is a bug!" + ); + 0 + }); + num_bytes_to_drop -= metadata_dropped_size_bytes as f64; + + diffs.push(dropped); + } + if num_bytes_to_drop <= 0.0 { break; } - if protected_rows.contains(&row_id) { - continue; + batch.clear(); + batch_is_protected = false; + } + + // Handle leftovers. + { + let dropped = Self::drop_batch( + enable_batching, + tables, + timeless_tables, + cluster_cell_cache, + *cluster_key, + include_timeless, + &mut num_bytes_to_drop, + &batch, + batch_is_protected, + ); + + // Only decrement the metadata size trackers if we're actually certain that we'll drop + // that RowId in the end. + for dropped in dropped { + let metadata_dropped_size_bytes = dropped.row_id.total_size_bytes() + + dropped.timepoint().total_size_bytes() + + dropped.entity_path.hash().total_size_bytes(); + metadata_registry.heap_size_bytes = metadata_registry + .heap_size_bytes + .checked_sub(metadata_dropped_size_bytes) + .unwrap_or_else(|| { + re_log::warn_once!( + "GC metadata_registry size tracker underflowed, this is a bug!" + ); + 0 + }); + num_bytes_to_drop -= metadata_dropped_size_bytes as f64; + + diffs.push(dropped); + } + } + + // Purge the removed rows from the metadata_registry. + // This is safe because the entire GC process is driven by RowId-order. + for diff in &diffs { + metadata_registry.remove(&diff.row_id); + } + + diffs + } + + #[allow(clippy::too_many_arguments, clippy::fn_params_excessive_bools)] + fn drop_batch( + enable_batching: bool, + tables: &mut BTreeMap<(EntityPathHash, Timeline), IndexedTable>, + timeless_tables: &mut IntMap, + cluster_cell_cache: &ClusterCellCache, + cluster_key: ComponentName, + include_timeless: bool, + num_bytes_to_drop: &mut f64, + batch: &[(TimePoint, (EntityPathHash, RowId))], + batch_is_protected: bool, + ) -> Vec { + let mut diffs = Vec::new(); + + // NOTE: The batch is already sorted by definition since it's extracted from the registry's btreemap. + let max_row_id = batch.last().map(|(_, (_, row_id))| *row_id); + + if enable_batching && max_row_id.is_some() && !batch_is_protected { + // NOTE: unwrap cannot fail but just a precaution in case this code moves around… + let max_row_id = max_row_id.unwrap_or(RowId::ZERO); + + let mut batch_removed: HashMap = HashMap::default(); + let mut cur_entity_path_hash = None; + + // NOTE: We _must_ go through all tables no matter what, since the batch might contain + // any number of distinct entities. + for ((entity_path_hash, _), table) in &mut *tables { + let (removed, num_bytes_removed) = + table.try_drop_bucket(cluster_cell_cache, cluster_key, max_row_id); + + *num_bytes_to_drop -= num_bytes_removed as f64; + + if cur_entity_path_hash != Some(*entity_path_hash) { + diffs.extend(batch_removed.drain().map(|(_, diff)| diff)); + + cur_entity_path_hash = Some(*entity_path_hash); + } + + for mut removed in removed { + batch_removed + .entry(removed.row_id) + .and_modify(|diff| { + diff.times.extend(std::mem::take(&mut removed.times)); + }) + .or_insert(removed); + } } + diffs.extend(batch_removed.drain().map(|(_, diff)| diff)); + } + + if *num_bytes_to_drop <= 0.0 { + return diffs; + } + + for (timepoint, (entity_path_hash, row_id)) in batch { let mut diff: Option = None; // find all tables that could possibly contain this `RowId` for (&timeline, &time) in timepoint { - if let Some(table) = self.tables.get_mut(&(timeline, *entity_path_hash)) { + if let Some(table) = tables.get_mut(&(*entity_path_hash, timeline)) { let (removed, num_bytes_removed) = - table.try_drop_row(&self.cluster_cell_cache, row_id, time.as_i64()); + table.try_drop_row(cluster_cell_cache, *row_id, time.as_i64()); if let Some(inner) = diff.as_mut() { if let Some(removed) = removed { inner.times.extend(removed.times); @@ -245,17 +413,17 @@ impl DataStore { } else { diff = removed; } - num_bytes_to_drop -= num_bytes_removed as f64; + *num_bytes_to_drop -= num_bytes_removed as f64; } } // TODO(jleibs): This is a worst-case removal-order. Would be nice to collect all the rows // first and then remove them in one pass. if timepoint.is_timeless() && include_timeless { - for table in self.timeless_tables.values_mut() { + for table in timeless_tables.values_mut() { // let deleted_comps = deleted.timeless.entry(ent_path.clone()_hash).or_default(); let (removed, num_bytes_removed) = - table.try_drop_row(&self.cluster_cell_cache, row_id); + table.try_drop_row(cluster_cell_cache, *row_id); if let Some(inner) = diff.as_mut() { if let Some(removed) = removed { inner.times.extend(removed.times); @@ -263,36 +431,15 @@ impl DataStore { } else { diff = removed; } - num_bytes_to_drop -= num_bytes_removed as f64; + *num_bytes_to_drop -= num_bytes_removed as f64; } } - // Only decrement the metadata size trackers if we're actually certain that we'll drop - // that RowId in the end. - if diff.is_some() { - let metadata_dropped_size_bytes = row_id.total_size_bytes() - + timepoint.total_size_bytes() - + entity_path_hash.total_size_bytes(); - self.metadata_registry.heap_size_bytes = self - .metadata_registry - .heap_size_bytes - .checked_sub(metadata_dropped_size_bytes) - .unwrap_or_else(|| { - re_log::warn_once!( - "GC metadata_registry size tracker underflowed, this is a bug!" - ); - 0 - }); - num_bytes_to_drop -= metadata_dropped_size_bytes as f64; - } - diffs.extend(diff); - } - // Purge the removed rows from the metadata_registry. - // This is safe because the entire GC process is driven by RowId-order. - for diff in &diffs { - self.metadata_registry.remove(&diff.row_id); + if *num_bytes_to_drop <= 0.0 { + break; + } } diffs @@ -444,7 +591,7 @@ impl DataStore { }); // Drop any empty temporal tables that aren't backed by a timeless table - self.tables.retain(|(_, entity), table| { + self.tables.retain(|(entity, _), table| { // If the timeless table still exists, this table might be storing empty values // that hide the timeless values, so keep it around. if self.timeless_tables.contains_key(entity) { @@ -490,11 +637,120 @@ impl DataStore { false }); + // TODO(cmc): Hmm, this is dropping buckets but doesn't seem to handle the case where all + // buckets are removed (which is an illegal state). + // Doesn't seem to handle the case where the only bucket left isn't indexed at -inf either. + diffs.into_values() } } impl IndexedTable { + /// Try to drop an entire bucket at once if it doesn't contain any `RowId` greater than `max_row_id`. + fn try_drop_bucket( + &mut self, + cluster_cache: &ClusterCellCache, + cluster_key: ComponentName, + max_row_id: RowId, + ) -> (Vec, u64) { + re_tracing::profile_function!(); + + let ent_path = self.ent_path.clone(); + let timeline = self.timeline; + + let mut diffs: Vec = Vec::new(); + let mut dropped_num_bytes = 0u64; + let mut dropped_num_rows = 0u64; + + let mut dropped_bucket_times = Vec::new(); + + // TODO(cmc): scaling linearly with the number of buckets could be improved, although this + // is quite fast in practice because of the early check. + for (bucket_time, bucket) in &self.buckets { + let inner = &mut *bucket.inner.write(); + + if inner.col_time.is_empty() || max_row_id < inner.max_row_id { + continue; + } + + let IndexedBucketInner { + mut col_time, + mut col_row_id, + mut columns, + size_bytes, + .. + } = std::mem::take(inner); + + dropped_bucket_times.push(*bucket_time); + + while let Some(row_id) = col_row_id.pop_front() { + let mut diff = StoreDiff::deletion(row_id, ent_path.clone()); + + if let Some(time) = col_time.pop_front() { + diff.times.push((timeline, time.into())); + } + + for (component_name, column) in &mut columns { + if let Some(cell) = column.pop_front().flatten() { + if cell.component_name() == cluster_key { + if let Some(cached_cell) = cluster_cache.get(&cell.num_instances()) { + if std::ptr::eq(cell.as_ptr(), cached_cell.as_ptr()) { + // We don't fire events when inserting autogenerated cluster cells, and + // therefore must not fire when removing them either. + continue; + } + } + } + + diff.cells.insert(*component_name, cell); + } + } + + diffs.push(diff); + } + + dropped_num_bytes += size_bytes; + dropped_num_rows += col_time.len() as u64; + } + + for bucket_time in dropped_bucket_times { + let previous = self.buckets.remove(&bucket_time); + debug_assert!(previous.is_some()); + } + + if self.buckets.is_empty() { + let Self { + timeline, + ent_path: _, + cluster_key, + buckets, + all_components: _, // keep the history on purpose + buckets_num_rows, + buckets_size_bytes, + } = self; + + let bucket = IndexedBucket::new(*cluster_key, *timeline); + let size_bytes = bucket.total_size_bytes(); + + *buckets = [(i64::MIN.into(), bucket)].into(); + *buckets_num_rows = 0; + *buckets_size_bytes = size_bytes; + + return (diffs, dropped_num_bytes); + } + + // NOTE: Make sure the first bucket is responsible for `-∞`, which might or might not be + // the case now that we've been moving buckets around. + if let Some((_, bucket)) = self.buckets.pop_first() { + self.buckets.insert(TimeInt::MIN, bucket); + } + + self.buckets_num_rows -= dropped_num_rows; + self.buckets_size_bytes -= dropped_num_bytes; + + (diffs, dropped_num_bytes) + } + /// Tries to drop the given `row_id` from the table, which is expected to be found at the /// specified `time`. /// @@ -540,12 +796,10 @@ impl IndexedTable { dropped_num_bytes = bucket_num_bytes; self.buckets.remove(&bucket_key); - // NOTE: If this is the first bucket of the table that we've just removed, we need the - // next one to become responsible for `-∞`. - if bucket_key == TimeInt::MIN { - if let Some((_, bucket)) = self.buckets.pop_first() { - self.buckets.insert(TimeInt::MIN, bucket); - } + // NOTE: Make sure the first bucket is responsible for `-∞`, which might or might not be + // the case now that we've been moving buckets around. + if let Some((_, bucket)) = self.buckets.pop_first() { + self.buckets.insert(TimeInt::MIN, bucket); } }