GC improvements 6: introduce batched GC #4400

Merged
merged 9 commits on Dec 2, 2023
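For orientation, a minimal sketch of how the new option is used from the caller's side, pieced together from the benchmark and test call sites in this diff; `GarbageCollectionOptions::gc_everything()` and the `(events, stats_diff)` return shape appear below, while the surrounding setup and the chosen values are illustrative:

    // Hypothetical caller-side usage of the new flag (not part of this diff).
    let mut options = GarbageCollectionOptions::gc_everything();
    options.enable_batching = true; // new in this PR; the benchmarks toggle it per run
    let (events, stats_diff) = store.gc(&options);
    // `events` can be forwarded to store views; `stats_diff` summarizes what was removed.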
2 changes: 2 additions & 0 deletions crates/re_arrow_store/benches/data_store.rs
@@ -291,6 +291,7 @@ fn gc(c: &mut Criterion) {
protect_latest: 0,
purge_empty_tables: false,
dont_protect: Default::default(),
enable_batching: false,
});
stats_diff
});
@@ -315,6 +316,7 @@ fn gc(c: &mut Criterion) {
protect_latest: 0,
purge_empty_tables: false,
dont_protect: Default::default(),
enable_batching: false,
});
stats_diff
});
106 changes: 70 additions & 36 deletions crates/re_arrow_store/benches/gc.rs
@@ -33,6 +33,17 @@ mod constants {

use constants::{NUM_ENTITY_PATHS, NUM_ROWS_PER_ENTITY_PATH};

fn gc_batching() -> &'static [bool] {
#[cfg(feature = "core_benchmarks_only")]
{
&[false]
}
#[cfg(not(feature = "core_benchmarks_only"))]
{
&[false, true]
}
}

fn num_rows_per_bucket() -> &'static [u64] {
#[cfg(feature = "core_benchmarks_only")]
{
@@ -63,8 +74,10 @@ fn plotting_dashboard(c: &mut Criterion) {
protect_latest: 1,
purge_empty_tables: false,
dont_protect: Default::default(),
enable_batching: false,
};

// NOTE: insert in multiple timelines to more closely match real world scenarios.
let mut timegen = |i| {
[
build_log_time(Time::from_seconds_since_epoch(i as _)),
@@ -98,26 +111,37 @@ fn plotting_dashboard(c: &mut Criterion) {

// Emulate more or fewer buckets
for &num_rows_per_bucket in num_rows_per_bucket() {
group.bench_function(format!("bucketsz={num_rows_per_bucket}"), |b| {
let store = build_store(
DataStoreConfig {
indexed_bucket_num_rows: num_rows_per_bucket,
..Default::default()
for &gc_batching in gc_batching() {
group.bench_function(
if gc_batching {
format!("bucketsz={num_rows_per_bucket}/gc_batching=true")
} else {
format!("bucketsz={num_rows_per_bucket}")
},
InstanceKey::name(),
false,
&mut timegen,
&mut datagen,
);
b.iter_batched(
|| store.clone(),
|mut store| {
let (_, stats_diff) = store.gc(&gc_settings);
stats_diff
|b| {
let store = build_store(
DataStoreConfig {
indexed_bucket_num_rows: num_rows_per_bucket,
..Default::default()
},
InstanceKey::name(),
false,
&mut timegen,
&mut datagen,
);
let mut gc_settings = gc_settings.clone();
gc_settings.enable_batching = gc_batching;
b.iter_batched(
|| store.clone(),
|mut store| {
let (_, stats_diff) = store.gc(&gc_settings);
stats_diff
},
BatchSize::LargeInput,
);
},
BatchSize::LargeInput,
);
});
}
}
}

@@ -138,6 +162,7 @@ fn timeless_logs(c: &mut Criterion) {
protect_latest: 1,
purge_empty_tables: false,
dont_protect: Default::default(),
enable_batching: false,
};

let mut timegen = |_| TimePoint::timeless();
@@ -165,28 +190,38 @@
);
});

// Emulate more or fewer buckets
for &num_rows_per_bucket in num_rows_per_bucket() {
group.bench_function(format!("bucketsz={num_rows_per_bucket}"), |b| {
let store = build_store(
DataStoreConfig {
indexed_bucket_num_rows: num_rows_per_bucket,
..Default::default()
for &gc_batching in gc_batching() {
group.bench_function(
if gc_batching {
format!("bucketsz={num_rows_per_bucket}/gc_batching=true")
} else {
format!("bucketsz={num_rows_per_bucket}")
},
InstanceKey::name(),
false,
&mut timegen,
&mut datagen,
);
b.iter_batched(
|| store.clone(),
|mut store| {
let (_, stats_diff) = store.gc(&gc_settings);
stats_diff
|b| {
let store = build_store(
DataStoreConfig {
indexed_bucket_num_rows: num_rows_per_bucket,
..Default::default()
},
InstanceKey::name(),
false,
&mut timegen,
&mut datagen,
);
let mut gc_settings = gc_settings.clone();
gc_settings.enable_batching = gc_batching;
b.iter_batched(
|| store.clone(),
|mut store| {
let (_, stats_diff) = store.gc(&gc_settings);
stats_diff
},
BatchSize::LargeInput,
);
},
BatchSize::LargeInput,
);
});
}
}
}

@@ -241,7 +276,6 @@ where
(0..NUM_ROWS_PER_ENTITY_PATH).map(move |i| {
DataRow::from_component_batches(
RowId::random(),
// NOTE: insert in multiple timelines to more closely match real world scenarios.
timegen(i),
entity_path.clone(),
datagen(i)
52 changes: 45 additions & 7 deletions crates/re_arrow_store/src/store.rs
@@ -1,7 +1,6 @@
use std::collections::{BTreeMap, VecDeque};
use std::sync::atomic::AtomicU64;

use ahash::HashMap;
use arrow2::datatypes::DataType;
use nohash_hasher::IntMap;
use parking_lot::RwLock;
@@ -210,7 +209,7 @@ pub struct DataStore {
/// All temporal [`IndexedTable`]s for all entities on all timelines.
///
/// See also [`Self::timeless_tables`].
pub(crate) tables: HashMap<(Timeline, EntityPathHash), IndexedTable>,
pub(crate) tables: BTreeMap<(EntityPathHash, Timeline), IndexedTable>,

/// All timeless indexed tables for all entities. Never garbage collected.
///
@@ -335,9 +334,9 @@ impl DataStore {
/// Do _not_ use this to try and assert the internal state of the datastore.
pub fn iter_indices(
&self,
) -> impl ExactSizeIterator<Item = ((Timeline, EntityPath), &IndexedTable)> {
self.tables.iter().map(|((timeline, _), table)| {
((*timeline, table.ent_path.clone() /* shallow */), table)
) -> impl ExactSizeIterator<Item = ((EntityPath, Timeline), &IndexedTable)> {
self.tables.iter().map(|((_, timeline), table)| {
((table.ent_path.clone() /* shallow */, *timeline), table)
})
}
}
@@ -439,13 +438,44 @@ impl IndexedTable {
Self {
timeline,
ent_path,
buckets: [(i64::MIN.into(), bucket)].into(),
buckets: [(TimeInt::MIN, bucket)].into(),
cluster_key,
all_components: Default::default(),
buckets_num_rows: 0,
buckets_size_bytes,
}
}

/// Makes sure bucketing invariants are upheld, and takes necessary actions if not.
///
/// Invariants are:
/// 1. There must always be at least one bucket alive.
/// 2. The first bucket must always have an _indexing time_ `-∞`.
pub(crate) fn uphold_indexing_invariants(&mut self) {
if self.buckets.is_empty() {
let Self {
timeline,
ent_path: _,
cluster_key,
buckets,
all_components: _, // keep the history on purpose
buckets_num_rows,
buckets_size_bytes,
} = self;

let bucket = IndexedBucket::new(*cluster_key, *timeline);
let size_bytes = bucket.total_size_bytes();

*buckets = [(TimeInt::MIN, bucket)].into();
*buckets_num_rows = 0;
*buckets_size_bytes = size_bytes;
}
// NOTE: Make sure the first bucket is responsible for `-∞`, which might or might not be
// the case now if we've been moving buckets around.
else if let Some((_, bucket)) = self.buckets.pop_first() {
self.buckets.insert(TimeInt::MIN, bucket);
}
}
}

/// An `IndexedBucket` holds a chunk of rows from an [`IndexedTable`]
@@ -474,7 +504,7 @@ impl Clone for IndexedBucket {
}

impl IndexedBucket {
fn new(cluster_key: ComponentName, timeline: Timeline) -> Self {
pub(crate) fn new(cluster_key: ComponentName, timeline: Timeline) -> Self {
Self {
timeline,
inner: RwLock::new(IndexedBucketInner::default()),
@@ -510,6 +540,13 @@ pub struct IndexedBucketInner {
/// Keeps track of the unique identifier for each row that was generated by the clients.
pub col_row_id: RowIdVec,

/// Keeps track of the latest/newest [`RowId`] present in this bucket.
///
/// Useful to batch GC buckets.
///
/// `RowId::ZERO` for empty buckets.
pub max_row_id: RowId,

/// The entire column of `num_instances`.
///
/// Keeps track of the expected number of instances in each row.
@@ -539,6 +576,7 @@ impl Default for IndexedBucketInner {
col_time: Default::default(),
col_insert_id: Default::default(),
col_row_id: Default::default(),
max_row_id: RowId::ZERO,
col_num_instances: Default::default(),
columns: Default::default(),
size_bytes: 0, // NOTE: computed below
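The new `max_row_id` field exists to make batched GC cheap: if even the newest row in a bucket is older than the GC cut-off, the entire bucket can presumably be dropped in one step instead of row by row. A rough sketch of the write-path bookkeeping this implies, assuming the insertion code updates the field next to `col_row_id` (the variable names are illustrative and not part of this diff):

    // Assumed insertion-side maintenance (not shown in this diff):
    // `max_row_id` only ever grows, and stays `RowId::ZERO` while the bucket is empty.
    inner.col_row_id.push(row_id);
    if row_id > inner.max_row_id {
        inner.max_row_id = row_id;
    }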
1 change: 1 addition & 0 deletions crates/re_arrow_store/src/store_arrow.rs
@@ -36,6 +36,7 @@ impl IndexedBucket {
col_time,
col_insert_id,
col_row_id,
max_row_id: _,
col_num_instances,
columns,
size_bytes: _,
2 changes: 2 additions & 0 deletions crates/re_arrow_store/src/store_dump.rs
@@ -118,6 +118,7 @@ impl DataStore {
col_time,
col_insert_id: _,
col_row_id,
max_row_id: _,
col_num_instances,
columns,
size_bytes: _,
@@ -169,6 +170,7 @@ impl DataStore {
col_time,
col_insert_id: _,
col_row_id,
max_row_id: _,
col_num_instances,
columns,
size_bytes: _,
3 changes: 2 additions & 1 deletion crates/re_arrow_store/src/store_event.rs
@@ -437,7 +437,8 @@ mod tests {
view,
);

view.on_events(&store.gc(&GarbageCollectionOptions::gc_everything()).0);
let events = store.gc(&GarbageCollectionOptions::gc_everything()).0;
view.on_events(&events);

similar_asserts::assert_eq!(
GlobalCounts::new(