diff --git a/crates/re_arrow_store/benches/data_store.rs b/crates/re_arrow_store/benches/data_store.rs index 6717efccb155..e2ff39e483f1 100644 --- a/crates/re_arrow_store/benches/data_store.rs +++ b/crates/re_arrow_store/benches/data_store.rs @@ -291,6 +291,7 @@ fn gc(c: &mut Criterion) { protect_latest: 0, purge_empty_tables: false, dont_protect: Default::default(), + enable_batching: false, }); stats_diff }); @@ -315,6 +316,7 @@ fn gc(c: &mut Criterion) { protect_latest: 0, purge_empty_tables: false, dont_protect: Default::default(), + enable_batching: false, }); stats_diff }); diff --git a/crates/re_arrow_store/benches/gc.rs b/crates/re_arrow_store/benches/gc.rs index 8504602aefb9..7f88804e8ed8 100644 --- a/crates/re_arrow_store/benches/gc.rs +++ b/crates/re_arrow_store/benches/gc.rs @@ -33,6 +33,17 @@ mod constants { use constants::{NUM_ENTITY_PATHS, NUM_ROWS_PER_ENTITY_PATH}; +fn gc_batching() -> &'static [bool] { + #[cfg(feature = "core_benchmarks_only")] + { + &[false] + } + #[cfg(not(feature = "core_benchmarks_only"))] + { + &[false, true] + } +} + fn num_rows_per_bucket() -> &'static [u64] { #[cfg(feature = "core_benchmarks_only")] { @@ -63,8 +74,10 @@ fn plotting_dashboard(c: &mut Criterion) { protect_latest: 1, purge_empty_tables: false, dont_protect: Default::default(), + enable_batching: false, }; + // NOTE: insert in multiple timelines to more closely match real world scenarios. let mut timegen = |i| { [ build_log_time(Time::from_seconds_since_epoch(i as _)), @@ -98,26 +111,37 @@ fn plotting_dashboard(c: &mut Criterion) { // Emulate more or less bucket for &num_rows_per_bucket in num_rows_per_bucket() { - group.bench_function(format!("bucketsz={num_rows_per_bucket}"), |b| { - let store = build_store( - DataStoreConfig { - indexed_bucket_num_rows: num_rows_per_bucket, - ..Default::default() + for &gc_batching in gc_batching() { + group.bench_function( + if gc_batching { + format!("bucketsz={num_rows_per_bucket}/gc_batching=true") + } else { + format!("bucketsz={num_rows_per_bucket}") }, - InstanceKey::name(), - false, - &mut timegen, - &mut datagen, - ); - b.iter_batched( - || store.clone(), - |mut store| { - let (_, stats_diff) = store.gc(&gc_settings); - stats_diff + |b| { + let store = build_store( + DataStoreConfig { + indexed_bucket_num_rows: num_rows_per_bucket, + ..Default::default() + }, + InstanceKey::name(), + false, + &mut timegen, + &mut datagen, + ); + let mut gc_settings = gc_settings.clone(); + gc_settings.enable_batching = gc_batching; + b.iter_batched( + || store.clone(), + |mut store| { + let (_, stats_diff) = store.gc(&gc_settings); + stats_diff + }, + BatchSize::LargeInput, + ); }, - BatchSize::LargeInput, ); - }); + } } } @@ -138,6 +162,7 @@ fn timeless_logs(c: &mut Criterion) { protect_latest: 1, purge_empty_tables: false, dont_protect: Default::default(), + enable_batching: false, }; let mut timegen = |_| TimePoint::timeless(); @@ -165,28 +190,38 @@ fn timeless_logs(c: &mut Criterion) { ); }); - // Emulate more or less bucket for &num_rows_per_bucket in num_rows_per_bucket() { - group.bench_function(format!("bucketsz={num_rows_per_bucket}"), |b| { - let store = build_store( - DataStoreConfig { - indexed_bucket_num_rows: num_rows_per_bucket, - ..Default::default() + for &gc_batching in gc_batching() { + group.bench_function( + if gc_batching { + format!("bucketsz={num_rows_per_bucket}/gc_batching=true") + } else { + format!("bucketsz={num_rows_per_bucket}") }, - InstanceKey::name(), - false, - &mut timegen, - &mut datagen, - ); - b.iter_batched( - || store.clone(), - |mut store| { - let (_, stats_diff) = store.gc(&gc_settings); - stats_diff + |b| { + let store = build_store( + DataStoreConfig { + indexed_bucket_num_rows: num_rows_per_bucket, + ..Default::default() + }, + InstanceKey::name(), + false, + &mut timegen, + &mut datagen, + ); + let mut gc_settings = gc_settings.clone(); + gc_settings.enable_batching = gc_batching; + b.iter_batched( + || store.clone(), + |mut store| { + let (_, stats_diff) = store.gc(&gc_settings); + stats_diff + }, + BatchSize::LargeInput, + ); }, - BatchSize::LargeInput, ); - }); + } } } @@ -241,7 +276,6 @@ where (0..NUM_ROWS_PER_ENTITY_PATH).map(move |i| { DataRow::from_component_batches( RowId::random(), - // NOTE: insert in multiple timelines to more closely match real world scenarios. timegen(i), entity_path.clone(), datagen(i) diff --git a/crates/re_arrow_store/src/store.rs b/crates/re_arrow_store/src/store.rs index 3880f59d4e88..cd4583a8e055 100644 --- a/crates/re_arrow_store/src/store.rs +++ b/crates/re_arrow_store/src/store.rs @@ -1,7 +1,6 @@ use std::collections::{BTreeMap, VecDeque}; use std::sync::atomic::AtomicU64; -use ahash::HashMap; use arrow2::datatypes::DataType; use nohash_hasher::IntMap; use parking_lot::RwLock; @@ -210,7 +209,7 @@ pub struct DataStore { /// All temporal [`IndexedTable`]s for all entities on all timelines. /// /// See also [`Self::timeless_tables`]. - pub(crate) tables: HashMap<(Timeline, EntityPathHash), IndexedTable>, + pub(crate) tables: BTreeMap<(EntityPathHash, Timeline), IndexedTable>, /// All timeless indexed tables for all entities. Never garbage collected. /// @@ -335,9 +334,9 @@ impl DataStore { /// Do _not_ use this to try and assert the internal state of the datastore. pub fn iter_indices( &self, - ) -> impl ExactSizeIterator { - self.tables.iter().map(|((timeline, _), table)| { - ((*timeline, table.ent_path.clone() /* shallow */), table) + ) -> impl ExactSizeIterator { + self.tables.iter().map(|((_, timeline), table)| { + ((table.ent_path.clone() /* shallow */, *timeline), table) }) } } @@ -439,13 +438,44 @@ impl IndexedTable { Self { timeline, ent_path, - buckets: [(i64::MIN.into(), bucket)].into(), + buckets: [(TimeInt::MIN, bucket)].into(), cluster_key, all_components: Default::default(), buckets_num_rows: 0, buckets_size_bytes, } } + + /// Makes sure bucketing invariants are upheld, and takes necessary actions if not. + /// + /// Invariants are: + /// 1. There must always be at least one bucket alive. + /// 2. The first bucket must always have an _indexing time_ `-∞`. + pub(crate) fn uphold_indexing_invariants(&mut self) { + if self.buckets.is_empty() { + let Self { + timeline, + ent_path: _, + cluster_key, + buckets, + all_components: _, // keep the history on purpose + buckets_num_rows, + buckets_size_bytes, + } = self; + + let bucket = IndexedBucket::new(*cluster_key, *timeline); + let size_bytes = bucket.total_size_bytes(); + + *buckets = [(TimeInt::MIN, bucket)].into(); + *buckets_num_rows = 0; + *buckets_size_bytes = size_bytes; + } + // NOTE: Make sure the first bucket is responsible for `-∞`, which might or might not be + // the case now if we've been moving buckets around. + else if let Some((_, bucket)) = self.buckets.pop_first() { + self.buckets.insert(TimeInt::MIN, bucket); + } + } } /// An `IndexedBucket` holds a chunk of rows from an [`IndexedTable`] @@ -474,7 +504,7 @@ impl Clone for IndexedBucket { } impl IndexedBucket { - fn new(cluster_key: ComponentName, timeline: Timeline) -> Self { + pub(crate) fn new(cluster_key: ComponentName, timeline: Timeline) -> Self { Self { timeline, inner: RwLock::new(IndexedBucketInner::default()), @@ -510,6 +540,13 @@ pub struct IndexedBucketInner { /// Keeps track of the unique identifier for each row that was generated by the clients. pub col_row_id: RowIdVec, + /// Keeps track of the latest/newest [`RowId`] present in this bucket. + /// + /// Useful to batch GC buckets. + /// + /// `RowId::ZERO` for empty buckets. + pub max_row_id: RowId, + /// The entire column of `num_instances`. /// /// Keeps track of the expected number of instances in each row. @@ -539,6 +576,7 @@ impl Default for IndexedBucketInner { col_time: Default::default(), col_insert_id: Default::default(), col_row_id: Default::default(), + max_row_id: RowId::ZERO, col_num_instances: Default::default(), columns: Default::default(), size_bytes: 0, // NOTE: computed below diff --git a/crates/re_arrow_store/src/store_arrow.rs b/crates/re_arrow_store/src/store_arrow.rs index 8148ede60b54..77e97d135dcc 100644 --- a/crates/re_arrow_store/src/store_arrow.rs +++ b/crates/re_arrow_store/src/store_arrow.rs @@ -36,6 +36,7 @@ impl IndexedBucket { col_time, col_insert_id, col_row_id, + max_row_id: _, col_num_instances, columns, size_bytes: _, diff --git a/crates/re_arrow_store/src/store_dump.rs b/crates/re_arrow_store/src/store_dump.rs index 6f8bc31461dc..c08b1e1b385c 100644 --- a/crates/re_arrow_store/src/store_dump.rs +++ b/crates/re_arrow_store/src/store_dump.rs @@ -118,6 +118,7 @@ impl DataStore { col_time, col_insert_id: _, col_row_id, + max_row_id: _, col_num_instances, columns, size_bytes: _, @@ -169,6 +170,7 @@ impl DataStore { col_time, col_insert_id: _, col_row_id, + max_row_id: _, col_num_instances, columns, size_bytes: _, diff --git a/crates/re_arrow_store/src/store_event.rs b/crates/re_arrow_store/src/store_event.rs index 0a9f02ace8c7..5a4002c83f82 100644 --- a/crates/re_arrow_store/src/store_event.rs +++ b/crates/re_arrow_store/src/store_event.rs @@ -437,7 +437,8 @@ mod tests { view, ); - view.on_events(&store.gc(&GarbageCollectionOptions::gc_everything()).0); + let events = store.gc(&GarbageCollectionOptions::gc_everything()).0; + view.on_events(&events); similar_asserts::assert_eq!( GlobalCounts::new( diff --git a/crates/re_arrow_store/src/store_gc.rs b/crates/re_arrow_store/src/store_gc.rs index 19686e8b9e86..a561e99bfc5c 100644 --- a/crates/re_arrow_store/src/store_gc.rs +++ b/crates/re_arrow_store/src/store_gc.rs @@ -2,7 +2,10 @@ use std::collections::BTreeMap; use ahash::{HashMap, HashSet}; -use re_log_types::{EntityPath, RowId, TimeInt, TimeRange, Timeline, VecDequeRemovalExt as _}; +use nohash_hasher::IntMap; +use re_log_types::{ + EntityPath, EntityPathHash, RowId, TimePoint, TimeRange, Timeline, VecDequeRemovalExt as _, +}; use re_types_core::{ComponentName, SizeBytes as _}; use crate::{ @@ -42,6 +45,11 @@ pub struct GarbageCollectionOptions { /// Components which should not be protected from GC when using `protect_latest` pub dont_protect: HashSet, + + /// Whether to enable batched bucket drops. + /// + /// Disabled by default as it is currently slower in most cases (somehow). + pub enable_batching: bool, } impl GarbageCollectionOptions { @@ -52,6 +60,7 @@ impl GarbageCollectionOptions { protect_latest: 0, purge_empty_tables: true, dont_protect: Default::default(), + enable_batching: false, } } } @@ -97,8 +106,6 @@ impl DataStore { /// component on each timeline. The only practical guarantee this gives is that a latest-at query /// with a value of max-int will be unchanged. However, latest-at queries from other arbitrary /// points in time may provide different results pre- and post- GC. - // - // TODO(#1823): Workload specific optimizations. pub fn gc(&mut self, options: &GarbageCollectionOptions) -> (Vec, DataStoreStats) { re_tracing::profile_function!(); @@ -131,6 +138,7 @@ impl DataStore { ); self.gc_drop_at_least_num_bytes( + options.enable_batching, num_bytes_to_drop, options.gc_timeless, &protected_rows, @@ -146,7 +154,12 @@ impl DataStore { "starting GC" ); - self.gc_drop_at_least_num_bytes(f64::INFINITY, options.gc_timeless, &protected_rows) + self.gc_drop_at_least_num_bytes( + options.enable_batching, + f64::INFINITY, + options.gc_timeless, + &protected_rows, + ) } }; @@ -201,14 +214,9 @@ impl DataStore { } /// Tries to drop _at least_ `num_bytes_to_drop` bytes of data from the store. - /// - /// Returns the list of `RowId`s that were purged from the store. - // - // TODO(jleibs): There are some easy optimizations here if we find GC taking too long: - // - If we know we are clearing almost everything, then we can batch-clear the rows from the - // the tables instead of needing to iterate over every single row incrementally. fn gc_drop_at_least_num_bytes( &mut self, + enable_batching: bool, mut num_bytes_to_drop: f64, include_timeless: bool, protected_rows: &HashSet, @@ -218,26 +226,192 @@ impl DataStore { let mut diffs = Vec::new(); // The algorithm is straightforward: - // 1. Find the oldest `RowId` that is not protected - // 2. Find all tables that potentially hold data associated with that `RowId` - // 3. Drop the associated row and account for the space we got back + // 1. Accumulate a bunch of `RowId`s in ascending order, starting from the beginning of time. + // 2. Check if any `RowId` in the batch is protected, in which case the entire batch is + // considered protected and cannot be dropped all at once. + // 3. Send the batch to `drop_batch` to handle the actual deletion. + // 4. Removed the dropped rows from the metadata registry. + + let batch_size = (self.config.indexed_bucket_num_rows as usize).saturating_mul(2); + let batch_size = batch_size.clamp(64, 4096); + + let mut batch: Vec<(TimePoint, (EntityPathHash, RowId))> = Vec::with_capacity(batch_size); + let mut batch_is_protected = false; + + let Self { + cluster_key, + metadata_registry, + cluster_cell_cache, + tables, + timeless_tables, + .. + } = self; + + for (&row_id, (timepoint, entity_path_hash)) in &metadata_registry.registry { + if protected_rows.contains(&row_id) { + batch_is_protected = true; + continue; + } + + batch.push((timepoint.clone(), (*entity_path_hash, row_id))); + if batch.len() < batch_size { + continue; + } + + let dropped = Self::drop_batch( + enable_batching, + tables, + timeless_tables, + cluster_cell_cache, + *cluster_key, + include_timeless, + &mut num_bytes_to_drop, + &batch, + batch_is_protected, + ); + + // Only decrement the metadata size trackers if we're actually certain that we'll drop + // that RowId in the end. + for dropped in dropped { + let metadata_dropped_size_bytes = dropped.row_id.total_size_bytes() + + dropped.timepoint().total_size_bytes() + + dropped.entity_path.hash().total_size_bytes(); + metadata_registry.heap_size_bytes = metadata_registry + .heap_size_bytes + .checked_sub(metadata_dropped_size_bytes) + .unwrap_or_else(|| { + re_log::warn_once!( + "GC metadata_registry size tracker underflowed, this is a bug!" + ); + 0 + }); + num_bytes_to_drop -= metadata_dropped_size_bytes as f64; + + diffs.push(dropped); + } - for (&row_id, (timepoint, entity_path_hash)) in &self.metadata_registry.registry { if num_bytes_to_drop <= 0.0 { break; } - if protected_rows.contains(&row_id) { - continue; + batch.clear(); + batch_is_protected = false; + } + + // Handle leftovers. + { + let dropped = Self::drop_batch( + enable_batching, + tables, + timeless_tables, + cluster_cell_cache, + *cluster_key, + include_timeless, + &mut num_bytes_to_drop, + &batch, + batch_is_protected, + ); + + // Only decrement the metadata size trackers if we're actually certain that we'll drop + // that RowId in the end. + for dropped in dropped { + let metadata_dropped_size_bytes = dropped.row_id.total_size_bytes() + + dropped.timepoint().total_size_bytes() + + dropped.entity_path.hash().total_size_bytes(); + metadata_registry.heap_size_bytes = metadata_registry + .heap_size_bytes + .checked_sub(metadata_dropped_size_bytes) + .unwrap_or_else(|| { + re_log::warn_once!( + "GC metadata_registry size tracker underflowed, this is a bug!" + ); + 0 + }); + num_bytes_to_drop -= metadata_dropped_size_bytes as f64; + + diffs.push(dropped); + } + } + + // Purge the removed rows from the metadata_registry. + // This is safe because the entire GC process is driven by RowId-order. + for diff in &diffs { + metadata_registry.remove(&diff.row_id); + } + + diffs + } + + #[allow(clippy::too_many_arguments, clippy::fn_params_excessive_bools)] + fn drop_batch( + enable_batching: bool, + tables: &mut BTreeMap<(EntityPathHash, Timeline), IndexedTable>, + timeless_tables: &mut IntMap, + cluster_cell_cache: &ClusterCellCache, + cluster_key: ComponentName, + include_timeless: bool, + num_bytes_to_drop: &mut f64, + batch: &[(TimePoint, (EntityPathHash, RowId))], + batch_is_protected: bool, + ) -> Vec { + let mut diffs = Vec::new(); + + // The algorithm is straightforward: + // 1. If the batch isn't protected, find and drop all buckets that are guaranteed to + // contain only rows older than the ones in the batch. + // 2. Check how many bytes were dropped; continue if we haven't met our objective. + // 3. Fallback to deletion of individual rows. + // 4. Check how many bytes were dropped; continue if we haven't met our objective. + + // NOTE: The batch is already sorted by definition since it's extracted from the registry's btreemap. + let max_row_id = batch.last().map(|(_, (_, row_id))| *row_id); + + if enable_batching && max_row_id.is_some() && !batch_is_protected { + // NOTE: unwrap cannot fail but just a precaution in case this code moves around… + let max_row_id = max_row_id.unwrap_or(RowId::ZERO); + + let mut batch_removed: HashMap = HashMap::default(); + let mut cur_entity_path_hash = None; + + // NOTE: We _must_ go through all tables no matter what, since the batch might contain + // any number of distinct entities. + for ((entity_path_hash, _), table) in &mut *tables { + let (removed, num_bytes_removed) = + table.try_drop_bucket(cluster_cell_cache, cluster_key, max_row_id); + + *num_bytes_to_drop -= num_bytes_removed as f64; + + if cur_entity_path_hash != Some(*entity_path_hash) { + diffs.extend(batch_removed.drain().map(|(_, diff)| diff)); + + cur_entity_path_hash = Some(*entity_path_hash); + } + + for mut removed in removed { + batch_removed + .entry(removed.row_id) + .and_modify(|diff| { + diff.times.extend(std::mem::take(&mut removed.times)); + }) + .or_insert(removed); + } } + diffs.extend(batch_removed.drain().map(|(_, diff)| diff)); + } + + if *num_bytes_to_drop <= 0.0 { + return diffs; + } + + for (timepoint, (entity_path_hash, row_id)) in batch { let mut diff: Option = None; // find all tables that could possibly contain this `RowId` for (&timeline, &time) in timepoint { - if let Some(table) = self.tables.get_mut(&(timeline, *entity_path_hash)) { + if let Some(table) = tables.get_mut(&(*entity_path_hash, timeline)) { let (removed, num_bytes_removed) = - table.try_drop_row(&self.cluster_cell_cache, row_id, time.as_i64()); + table.try_drop_row(cluster_cell_cache, *row_id, time.as_i64()); if let Some(inner) = diff.as_mut() { if let Some(removed) = removed { inner.times.extend(removed.times); @@ -245,17 +419,17 @@ impl DataStore { } else { diff = removed; } - num_bytes_to_drop -= num_bytes_removed as f64; + *num_bytes_to_drop -= num_bytes_removed as f64; } } // TODO(jleibs): This is a worst-case removal-order. Would be nice to collect all the rows // first and then remove them in one pass. if timepoint.is_timeless() && include_timeless { - for table in self.timeless_tables.values_mut() { + for table in timeless_tables.values_mut() { // let deleted_comps = deleted.timeless.entry(ent_path.clone()_hash).or_default(); let (removed, num_bytes_removed) = - table.try_drop_row(&self.cluster_cell_cache, row_id); + table.try_drop_row(cluster_cell_cache, *row_id); if let Some(inner) = diff.as_mut() { if let Some(removed) = removed { inner.times.extend(removed.times); @@ -263,36 +437,15 @@ impl DataStore { } else { diff = removed; } - num_bytes_to_drop -= num_bytes_removed as f64; + *num_bytes_to_drop -= num_bytes_removed as f64; } } - // Only decrement the metadata size trackers if we're actually certain that we'll drop - // that RowId in the end. - if diff.is_some() { - let metadata_dropped_size_bytes = row_id.total_size_bytes() - + timepoint.total_size_bytes() - + entity_path_hash.total_size_bytes(); - self.metadata_registry.heap_size_bytes = self - .metadata_registry - .heap_size_bytes - .checked_sub(metadata_dropped_size_bytes) - .unwrap_or_else(|| { - re_log::warn_once!( - "GC metadata_registry size tracker underflowed, this is a bug!" - ); - 0 - }); - num_bytes_to_drop -= metadata_dropped_size_bytes as f64; - } - diffs.extend(diff); - } - // Purge the removed rows from the metadata_registry. - // This is safe because the entire GC process is driven by RowId-order. - for diff in &diffs { - self.metadata_registry.remove(&diff.row_id); + if *num_bytes_to_drop <= 0.0 { + break; + } } diffs @@ -444,7 +597,7 @@ impl DataStore { }); // Drop any empty temporal tables that aren't backed by a timeless table - self.tables.retain(|(_, entity), table| { + self.tables.retain(|(entity, _), table| { // If the timeless table still exists, this table might be storing empty values // that hide the timeless values, so keep it around. if self.timeless_tables.contains_key(entity) { @@ -495,6 +648,84 @@ impl DataStore { } impl IndexedTable { + /// Try to drop an entire bucket at once if it doesn't contain any `RowId` greater than `max_row_id`. + fn try_drop_bucket( + &mut self, + cluster_cache: &ClusterCellCache, + cluster_key: ComponentName, + max_row_id: RowId, + ) -> (Vec, u64) { + re_tracing::profile_function!(); + + let ent_path = self.ent_path.clone(); + let timeline = self.timeline; + + let mut diffs: Vec = Vec::new(); + let mut dropped_num_bytes = 0u64; + let mut dropped_num_rows = 0u64; + + let mut dropped_bucket_times = HashSet::default(); + + // TODO(cmc): scaling linearly with the number of buckets could be improved, although this + // is quite fast in practice because of the early check. + for (bucket_time, bucket) in &self.buckets { + let inner = &mut *bucket.inner.write(); + + if inner.col_time.is_empty() || max_row_id < inner.max_row_id { + continue; + } + + let IndexedBucketInner { + mut col_time, + mut col_row_id, + mut columns, + size_bytes, + .. + } = std::mem::take(inner); + + dropped_bucket_times.insert(*bucket_time); + + while let Some(row_id) = col_row_id.pop_front() { + let mut diff = StoreDiff::deletion(row_id, ent_path.clone()); + + if let Some(time) = col_time.pop_front() { + diff.times.push((timeline, time.into())); + } + + for (component_name, column) in &mut columns { + if let Some(cell) = column.pop_front().flatten() { + if cell.component_name() == cluster_key { + if let Some(cached_cell) = cluster_cache.get(&cell.num_instances()) { + if std::ptr::eq(cell.as_ptr(), cached_cell.as_ptr()) { + // We don't fire events when inserting autogenerated cluster cells, and + // therefore must not fire when removing them either. + continue; + } + } + } + + diff.cells.insert(*component_name, cell); + } + } + + diffs.push(diff); + } + + dropped_num_bytes += size_bytes; + dropped_num_rows += col_time.len() as u64; + } + + self.buckets + .retain(|bucket_time, _| !dropped_bucket_times.contains(bucket_time)); + + self.uphold_indexing_invariants(); + + self.buckets_num_rows -= dropped_num_rows; + self.buckets_size_bytes -= dropped_num_bytes; + + (diffs, dropped_num_bytes) + } + /// Tries to drop the given `row_id` from the table, which is expected to be found at the /// specified `time`. /// @@ -540,13 +771,7 @@ impl IndexedTable { dropped_num_bytes = bucket_num_bytes; self.buckets.remove(&bucket_key); - // NOTE: If this is the first bucket of the table that we've just removed, we need the - // next one to become responsible for `-∞`. - if bucket_key == TimeInt::MIN { - if let Some((_, bucket)) = self.buckets.pop_first() { - self.buckets.insert(TimeInt::MIN, bucket); - } - } + self.uphold_indexing_invariants(); } self.buckets_size_bytes -= dropped_num_bytes; @@ -578,6 +803,7 @@ impl IndexedBucketInner { col_time, col_insert_id, col_row_id, + max_row_id, col_num_instances, columns, size_bytes, @@ -664,6 +890,12 @@ impl IndexedBucketInner { } } + if *max_row_id == removed_row_id { + // NOTE: We _have_ to fullscan here: the bucket is sorted by `(Time, RowId)`, there + // could very well be a greater lurking in a lesser entry. + *max_row_id = col_row_id.iter().max().copied().unwrap_or(RowId::ZERO); + } + // NOTE: A single `RowId` cannot possibly have more than one datapoint for // a single timeline. break; diff --git a/crates/re_arrow_store/src/store_polars.rs b/crates/re_arrow_store/src/store_polars.rs index 5417236789d9..bb7f7dc9e005 100644 --- a/crates/re_arrow_store/src/store_polars.rs +++ b/crates/re_arrow_store/src/store_polars.rs @@ -114,7 +114,7 @@ impl DataStore { let timelines: BTreeSet<&str> = self .tables .keys() - .map(|(timeline, _)| timeline.name().as_str()) + .map(|(_, timeline)| timeline.name().as_str()) .collect(); let df = sort_df_columns(&df, self.config.store_insert_ids, &timelines); @@ -221,6 +221,7 @@ impl IndexedBucket { col_time, col_insert_id, col_row_id, + max_row_id: _, col_num_instances, columns, size_bytes: _, diff --git a/crates/re_arrow_store/src/store_read.rs b/crates/re_arrow_store/src/store_read.rs index 05f98aa9136c..a63f7b5b4c88 100644 --- a/crates/re_arrow_store/src/store_read.rs +++ b/crates/re_arrow_store/src/store_read.rs @@ -117,7 +117,7 @@ impl DataStore { let temporal = self .tables - .get(&(*timeline, ent_path_hash)) + .get(&(ent_path_hash, *timeline)) .map(|table| &table.all_components); let components = match (timeless, temporal) { @@ -170,7 +170,7 @@ impl DataStore { // Otherwise see if it exists in the specified timeline self.tables - .get(&(*timeline, ent_path_hash)) + .get(&(ent_path_hash, *timeline)) .map_or(false, |table| table.all_components.contains(component)) } @@ -185,7 +185,7 @@ impl DataStore { let min_time = self .tables - .get(&(*timeline, ent_path_hash))? + .get(&(ent_path_hash, *timeline))? .buckets .first_key_value()? .1 @@ -284,7 +284,7 @@ impl DataStore { let cells = self .tables - .get(&(query.timeline, ent_path_hash)) + .get(&(ent_path_hash, query.timeline)) .and_then(|table| { let cells = table.latest_at(query.at, primary, components); trace!( @@ -474,7 +474,7 @@ impl DataStore { let temporal = self .tables - .get(&(query.timeline, ent_path_hash)) + .get(&(ent_path_hash, query.timeline)) .map(|index| index.range(query.range, components)) .into_iter() .flatten() @@ -733,6 +733,7 @@ impl IndexedBucket { col_time, col_insert_id: _, col_row_id, + max_row_id: _, col_num_instances: _, columns, size_bytes: _, @@ -846,6 +847,7 @@ impl IndexedBucket { col_time, col_insert_id: _, col_row_id, + max_row_id: _, col_num_instances: _, columns, size_bytes: _, @@ -958,6 +960,7 @@ impl IndexedBucketInner { col_time, col_insert_id, col_row_id, + max_row_id: _, col_num_instances, columns, size_bytes: _, diff --git a/crates/re_arrow_store/src/store_sanity.rs b/crates/re_arrow_store/src/store_sanity.rs index 50e79008d3bb..770bfe387050 100644 --- a/crates/re_arrow_store/src/store_sanity.rs +++ b/crates/re_arrow_store/src/store_sanity.rs @@ -18,6 +18,11 @@ pub enum SanityError { )] TimeRangeOutOfSync { expected: TimeRange, got: TimeRange }, + #[error( + "Reported max RowId for indexed bucket is out of sync: got {got}, expected {expected}" + )] + MaxRowIdOutOfSync { expected: RowId, got: RowId }, + #[error("Reported size for {origin} is out of sync: got {got}, expected {expected}")] SizeOutOfSync { origin: &'static str, @@ -163,6 +168,7 @@ impl IndexedBucket { col_time, col_insert_id, col_row_id, + max_row_id, col_num_instances, columns, size_bytes: _, @@ -185,6 +191,17 @@ impl IndexedBucket { } } + // Make sure `max_row_id` isn't out of sync + { + let expected = col_row_id.iter().max().copied().unwrap_or(RowId::ZERO); + if expected != *max_row_id { + return Err(SanityError::MaxRowIdOutOfSync { + expected, + got: *max_row_id, + }); + } + } + // All columns should be `Self::num_rows` long. { const COLUMN_TIMEPOINT: &str = "rerun.controls.TimePoint"; diff --git a/crates/re_arrow_store/src/store_stats.rs b/crates/re_arrow_store/src/store_stats.rs index e3bacb64c362..929f07416f6d 100644 --- a/crates/re_arrow_store/src/store_stats.rs +++ b/crates/re_arrow_store/src/store_stats.rs @@ -252,7 +252,7 @@ impl DataStore { timeline: re_log_types::Timeline, entity_path_hash: re_log_types::EntityPathHash, ) -> EntityStats { - let mut entity_stats = self.tables.get(&(timeline, entity_path_hash)).map_or( + let mut entity_stats = self.tables.get(&(entity_path_hash, timeline)).map_or( EntityStats::default(), |table| EntityStats { num_rows: table.buckets_num_rows, @@ -394,6 +394,7 @@ impl IndexedBucketInner { col_time, col_insert_id, col_row_id, + max_row_id, col_num_instances, columns, size_bytes, @@ -404,6 +405,7 @@ impl IndexedBucketInner { + col_time.total_size_bytes() + col_insert_id.total_size_bytes() + col_row_id.total_size_bytes() + + max_row_id.total_size_bytes() + col_num_instances.total_size_bytes() + columns.total_size_bytes() + size_bytes.total_size_bytes(); diff --git a/crates/re_arrow_store/src/store_write.rs b/crates/re_arrow_store/src/store_write.rs index 46681e6b224c..024ee9777b82 100644 --- a/crates/re_arrow_store/src/store_write.rs +++ b/crates/re_arrow_store/src/store_write.rs @@ -176,7 +176,7 @@ impl DataStore { let ent_path = entity_path.clone(); // shallow let index = self .tables - .entry((*timeline, ent_path_hash)) + .entry((ent_path_hash, *timeline)) .or_insert_with(|| IndexedTable::new(self.cluster_key, *timeline, ent_path)); index.insert_row( @@ -443,6 +443,7 @@ impl IndexedBucket { col_time, col_insert_id, col_row_id, + max_row_id, col_num_instances, columns, size_bytes, @@ -465,6 +466,7 @@ impl IndexedBucket { size_bytes_added += insert_id.total_size_bytes(); } col_row_id.push_back(row.row_id()); + *max_row_id = RowId::max(*max_row_id, row.row_id()); size_bytes_added += row.row_id().total_size_bytes(); col_num_instances.push_back(row.num_instances()); size_bytes_added += row.num_instances().total_size_bytes(); @@ -576,6 +578,7 @@ impl IndexedBucket { col_time: col_time1, col_insert_id: col_insert_id1, col_row_id: col_row_id1, + max_row_id: max_row_id1, col_num_instances: col_num_instances1, columns: columns1, size_bytes: _, // NOTE: recomputed below @@ -604,6 +607,9 @@ impl IndexedBucket { col_num_instances1.split_off_or_default(split_idx), ) }; + // NOTE: We _have_ to fullscan here: the bucket is sorted by `(Time, RowId)`, there + // could very well be a greater lurking in a lesser entry. + *max_row_id1 = col_row_id1.iter().max().copied().unwrap_or(RowId::ZERO); // this updates `columns1` in-place! let columns2: IntMap<_, _> = { @@ -623,12 +629,16 @@ impl IndexedBucket { }; let inner2 = { + // NOTE: We _have_ to fullscan here: the bucket is sorted by `(Time, RowId)`, there + // could very well be a greater lurking in a lesser entry. + let max_row_id2 = col_row_id2.iter().max().copied().unwrap_or(RowId::ZERO); let mut inner2 = IndexedBucketInner { is_sorted: true, time_range: time_range2, col_time: col_time2, col_insert_id: col_insert_id2, col_row_id: col_row_id2, + max_row_id: max_row_id2, col_num_instances: col_num_instances2, columns: columns2, size_bytes: 0, // NOTE: computed below diff --git a/crates/re_arrow_store/tests/correctness.rs b/crates/re_arrow_store/tests/correctness.rs index 7fb01f4fd127..a456824d17f8 100644 --- a/crates/re_arrow_store/tests/correctness.rs +++ b/crates/re_arrow_store/tests/correctness.rs @@ -563,39 +563,43 @@ fn check_still_readable(_store: &DataStore) { // getting the confirmation that the row was really removed. #[test] fn gc_metadata_size() -> anyhow::Result<()> { - let mut store = DataStore::new( - re_log_types::StoreId::random(re_log_types::StoreKind::Recording), - InstanceKey::name(), - Default::default(), - ); + for enable_batching in [false, true] { + let mut store = DataStore::new( + re_log_types::StoreId::random(re_log_types::StoreKind::Recording), + InstanceKey::name(), + Default::default(), + ); - let point = MyPoint::new(1.0, 1.0); + let point = MyPoint::new(1.0, 1.0); - for _ in 0..3 { - let row = DataRow::from_component_batches( - RowId::random(), - TimePoint::timeless(), - "xxx".into(), - [&[point] as _], - )?; - store.insert_row(&row).unwrap(); - } + for _ in 0..3 { + let row = DataRow::from_component_batches( + RowId::random(), + TimePoint::timeless(), + "xxx".into(), + [&[point] as _], + )?; + store.insert_row(&row).unwrap(); + } - for _ in 0..2 { - _ = store.gc(&GarbageCollectionOptions { - target: re_arrow_store::GarbageCollectionTarget::DropAtLeastFraction(1.0), - gc_timeless: false, - protect_latest: 1, - purge_empty_tables: false, - dont_protect: Default::default(), - }); - _ = store.gc(&GarbageCollectionOptions { - target: re_arrow_store::GarbageCollectionTarget::DropAtLeastFraction(1.0), - gc_timeless: false, - protect_latest: 1, - purge_empty_tables: false, - dont_protect: Default::default(), - }); + for _ in 0..2 { + _ = store.gc(&GarbageCollectionOptions { + target: re_arrow_store::GarbageCollectionTarget::DropAtLeastFraction(1.0), + gc_timeless: false, + protect_latest: 1, + purge_empty_tables: false, + dont_protect: Default::default(), + enable_batching, + }); + _ = store.gc(&GarbageCollectionOptions { + target: re_arrow_store::GarbageCollectionTarget::DropAtLeastFraction(1.0), + gc_timeless: false, + protect_latest: 1, + purge_empty_tables: false, + dont_protect: Default::default(), + enable_batching, + }); + } } Ok(()) diff --git a/crates/re_arrow_store/tests/data_store.rs b/crates/re_arrow_store/tests/data_store.rs index 7fc6f1a33199..fe71ed7cf206 100644 --- a/crates/re_arrow_store/tests/data_store.rs +++ b/crates/re_arrow_store/tests/data_store.rs @@ -932,6 +932,7 @@ fn gc_impl(store: &mut DataStore) { protect_latest: 0, purge_empty_tables: false, dont_protect: Default::default(), + enable_batching: false, }); for event in store_events { assert!(store.get_msg_metadata(&event.row_id).is_none()); @@ -1011,6 +1012,7 @@ fn protected_gc_impl(store: &mut DataStore) { protect_latest: 1, purge_empty_tables: true, dont_protect: Default::default(), + enable_batching: false, }); let mut assert_latest_components = |frame_nr: TimeInt, rows: &[(ComponentName, &DataRow)]| { @@ -1107,6 +1109,7 @@ fn protected_gc_clear_impl(store: &mut DataStore) { protect_latest: 1, purge_empty_tables: true, dont_protect: Default::default(), + enable_batching: false, }); let mut assert_latest_components = |frame_nr: TimeInt, rows: &[(ComponentName, &DataRow)]| { @@ -1149,6 +1152,7 @@ fn protected_gc_clear_impl(store: &mut DataStore) { protect_latest: 1, purge_empty_tables: true, dont_protect: Default::default(), + enable_batching: false, }); // No rows should remain because the table should have been purged diff --git a/crates/re_data_store/src/store_db.rs b/crates/re_data_store/src/store_db.rs index 7e63d12e8fad..f1cbe400ce43 100644 --- a/crates/re_data_store/src/store_db.rs +++ b/crates/re_data_store/src/store_db.rs @@ -433,6 +433,7 @@ impl StoreDb { ] .into_iter() .collect(), + enable_batching: false, }); } @@ -449,6 +450,7 @@ impl StoreDb { protect_latest: 1, purge_empty_tables: false, dont_protect: Default::default(), + enable_batching: false, }); }