feat(compaction): support filter data by delete-range (#5998)
* support filter data by delete-range
* fix bug for watermark
* add test
* remove inf
* remove inf
* add more test
* refactor delete range algorithm
* fix bug
* add more test
* refactor for read
* fix format
* fix delete range
* add more test
* refactor some variable
* fix format
* fix format
* add test
* add comment

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
Little-Wallace and mergify[bot] authored Nov 3, 2022
1 parent a6c88e9 commit db6691b
Showing 11 changed files with 608 additions and 50 deletions.
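
The commit teaches the compactor to drop key versions that are covered by a range delete (delete-range tombstone) while it rewrites SSTables. As a rough orientation before the diff, the sketch below illustrates the question the new DeleteRangeAggregator answers for each key the merge iterator yields. The types and names here (RangeTombstone, DeleteRangeSketch) are illustrative stand-ins, not RisingWave's API, and the real implementation streams tombstones alongside the iterator instead of scanning a Vec per key.

struct RangeTombstone {
    start_user_key: Vec<u8>, // inclusive
    end_user_key: Vec<u8>,   // exclusive
    epoch: u64,              // epoch at which the range delete was written
}

struct DeleteRangeSketch {
    tombstones: Vec<RangeTombstone>,
}

impl DeleteRangeSketch {
    fn new(mut tombstones: Vec<RangeTombstone>) -> Self {
        // Mirrors `aggregator.sort()` in the diff; the real code advances a cursor
        // alongside the merge iterator instead of re-scanning the whole list.
        tombstones.sort_by(|a, b| a.start_user_key.cmp(&b.start_user_key));
        Self { tombstones }
    }

    // A key version written at `epoch` is shadowed if some range tombstone covers
    // its user key and was written at a strictly later epoch. The actual compactor
    // additionally gates this check on the task watermark (see compactor/mod.rs below).
    fn should_delete(&self, user_key: &[u8], epoch: u64) -> bool {
        self.tombstones.iter().any(|t| {
            t.start_user_key.as_slice() <= user_key
                && user_key < t.end_user_key.as_slice()
                && epoch < t.epoch
        })
    }
}

fn main() {
    let agg = DeleteRangeSketch::new(vec![RangeTombstone {
        start_user_key: b"a".to_vec(),
        end_user_key: b"m".to_vec(),
        epoch: 100,
    }]);
    assert!(agg.should_delete(b"foo", 50));   // covered and older than the tombstone
    assert!(!agg.should_delete(b"zzz", 50));  // outside the deleted range
    assert!(!agg.should_delete(b"foo", 150)); // version written after the range delete
}
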
15 changes: 9 additions & 6 deletions src/meta/src/hummock/compaction/overlap_strategy.rs
@@ -13,8 +13,8 @@
// limitations under the License.

use itertools::Itertools;
use risingwave_hummock_sdk::key::user_key;
use risingwave_hummock_sdk::key_range::KeyRangeCommon;
use risingwave_hummock_sdk::VersionedComparator;
use risingwave_pb::hummock::{KeyRange, SstableInfo};

pub trait OverlapInfo {
@@ -76,16 +76,19 @@ impl OverlapInfo for RangeOverlapInfo {
Some(key_range) => {
let mut tables = vec![];
let overlap_begin = others.partition_point(|table_status| {
user_key(&table_status.key_range.as_ref().unwrap().right)
< user_key(&key_range.left)
VersionedComparator::less_than(
&table_status.key_range.as_ref().unwrap().right,
&key_range.left,
)
});
if overlap_begin >= others.len() {
return vec![];
}
for table in &others[overlap_begin..] {
if user_key(&table.key_range.as_ref().unwrap().left)
> user_key(&key_range.right)
{
if VersionedComparator::less_than(
&key_range.right,
&table.key_range.as_ref().unwrap().left,
) {
break;
}
tables.push(table.clone());
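
In the meta service's overlap strategy, SST key-range bounds are now compared as full keys via VersionedComparator::less_than rather than by user_key alone, so the binary search over sorted SSTs uses the same ordering the SSTs are stored in (user key ascending, then the epoch suffix compared in reverse). Below is a rough, self-contained sketch of that ordering and of the partition_point usage it feeds; the 8-byte big-endian epoch suffix and all helper names are assumptions of the sketch, not the crate's API.

use std::cmp::Ordering;

const EPOCH_LEN: usize = 8; // assumed width of the epoch suffix

// Compare the user-key prefix ascending, then the epoch suffix in reverse,
// echoing `l_p.cmp(r_p).then_with(|| r_s.cmp(l_s))` in version_cmp.rs.
fn compare_full_key(l: &[u8], r: &[u8]) -> Ordering {
    let (l_uk, l_ep) = l.split_at(l.len() - EPOCH_LEN);
    let (r_uk, r_ep) = r.split_at(r.len() - EPOCH_LEN);
    l_uk.cmp(r_uk).then_with(|| r_ep.cmp(l_ep))
}

fn less_than(l: &[u8], r: &[u8]) -> bool {
    compare_full_key(l, r) == Ordering::Less
}

fn full_key(user_key: &[u8], epoch: u64) -> Vec<u8> {
    let mut k = user_key.to_vec();
    k.extend_from_slice(&epoch.to_be_bytes());
    k
}

fn main() {
    // `partition_point` skips SSTs whose right bound is strictly before the query's
    // left bound, mirroring the rewritten check in RangeOverlapInfo above.
    let sst_right_bounds = vec![full_key(b"b", 5), full_key(b"d", 9), full_key(b"g", 3)];
    let query_left = full_key(b"c", u64::MAX);
    let overlap_begin = sst_right_bounds.partition_point(|right| less_than(right, &query_left));
    assert_eq!(overlap_begin, 1); // only the first SST ends before "c" and is skipped
}
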
10 changes: 10 additions & 0 deletions src/storage/hummock_sdk/src/key.rs
@@ -72,6 +72,16 @@ pub fn user_key(full_key: &[u8]) -> &[u8] {
split_key_epoch(full_key).0
}

#[inline(always)]
/// Extract user key without epoch part but allow empty slice
pub fn get_user_key(full_key: &[u8]) -> Vec<u8> {
if full_key.is_empty() {
vec![]
} else {
user_key(full_key).to_vec()
}
}

/// Extract table id in key prefix
#[inline(always)]
pub fn get_table_id(full_key: &[u8]) -> u32 {
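
The new get_user_key helper differs from user_key in that it returns an owned Vec<u8> and tolerates an empty slice (an empty bound typically stands for an unbounded key-range endpoint, as with KeyRange::inf() later in this diff) instead of assuming an epoch suffix is present. A hypothetical mirror of it, with an assumed 8-byte epoch suffix:

// Hypothetical mirror of the new helper; EPOCH_LEN = 8 is an assumption of this sketch.
fn get_user_key_sketch(full_key: &[u8]) -> Vec<u8> {
    const EPOCH_LEN: usize = 8;
    if full_key.is_empty() {
        vec![] // an empty bound stays empty instead of underflowing on a missing epoch
    } else {
        full_key[..full_key.len() - EPOCH_LEN].to_vec()
    }
}

fn main() {
    assert!(get_user_key_sketch(b"").is_empty());
    let mut full = b"user-key".to_vec();
    full.extend_from_slice(&42u64.to_be_bytes());
    assert_eq!(get_user_key_sketch(&full), b"user-key");
}
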
5 changes: 5 additions & 0 deletions src/storage/hummock_sdk/src/version_cmp.rs
@@ -29,6 +29,11 @@ impl VersionedComparator {
l_p.cmp(r_p).then_with(|| r_s.cmp(l_s))
}

#[inline]
pub fn less_than(lhs: &[u8], rhs: &[u8]) -> bool {
Self::compare_key(lhs, rhs) == cmp::Ordering::Less
}

#[inline]
pub fn same_user_key(lhs: &[u8], rhs: &[u8]) -> bool {
user_key(lhs) == user_key(rhs)
55 changes: 52 additions & 3 deletions src/storage/src/hummock/compactor/compactor_runner.rs
@@ -18,7 +18,7 @@ use bytes::Bytes;
use itertools::Itertools;
use risingwave_hummock_sdk::can_concat;
use risingwave_hummock_sdk::filter_key_extractor::FilterKeyExtractorImpl;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon};
use risingwave_pb::hummock::{CompactTask, LevelType};

use super::task_progress::TaskProgress;
@@ -27,13 +27,16 @@ use crate::hummock::compactor::{
CompactOutput, CompactionFilter, Compactor, CompactorContext, CompactorSstableStoreRef,
};
use crate::hummock::iterator::{Forward, HummockIterator, UnorderedMergeIteratorInner};
use crate::hummock::sstable::DeleteRangeAggregator;
use crate::hummock::{CachePolicy, CompressionAlgorithm, HummockResult, SstableBuilderOptions};
use crate::monitor::StoreLocalStatistic;

#[derive(Clone)]
pub struct CompactorRunner {
compact_task: CompactTask,
compactor: Compactor,
sstable_store: CompactorSstableStoreRef,
key_range: KeyRange,
split_index: usize,
}

@@ -64,7 +67,7 @@ impl CompactorRunner {
let compactor = Compactor::new(
context.context.clone(),
options,
key_range,
key_range.clone(),
CachePolicy::NotFill,
task.gc_delete_keys,
task.watermark,
@@ -74,6 +77,7 @@
compactor,
compact_task: task,
sstable_store: context.sstable_store.clone(),
key_range,
split_index,
}
}
@@ -85,18 +89,50 @@
task_progress: Arc<TaskProgress>,
) -> HummockResult<CompactOutput> {
let iter = self.build_sst_iter()?;
let del_agg = self.build_delete_range_iter().await?;
let ssts = self
.compactor
.compact_key_range(
iter,
compaction_filter,
del_agg,
filter_key_extractor,
Some(task_progress),
)
.await?;
Ok((self.split_index, ssts))
}

async fn build_delete_range_iter(&self) -> HummockResult<Arc<DeleteRangeAggregator>> {
let mut aggregator = DeleteRangeAggregator::new(
self.key_range.clone(),
self.compact_task.watermark,
self.compact_task.gc_delete_keys,
);
let mut local_stats = StoreLocalStatistic::default();
for level in &self.compact_task.input_ssts {
if level.table_infos.is_empty() {
continue;
}

for table_info in &level.table_infos {
let key_range = KeyRange::from(table_info.key_range.as_ref().unwrap());
if !self.key_range.full_key_overlap(&key_range) {
continue;
}
let table = self
.compactor
.context
.sstable_store
.sstable(table_info, &mut local_stats)
.await?;
aggregator.add_tombstone(table.value().meta.range_tombstone_list.clone());
}
}
aggregator.sort();
Ok(Arc::new(aggregator))
}

/// Build the merge iterator based on the given input ssts.
fn build_sst_iter(&self) -> HummockResult<impl HummockIterator<Direction = Forward>> {
let mut table_iters = Vec::new();
@@ -109,13 +145,26 @@
// Do not need to filter the table because manager has done it.
if level.level_type == LevelType::Nonoverlapping as i32 {
debug_assert!(can_concat(&level.table_infos.iter().collect_vec()));
let tables = level
.table_infos
.iter()
.filter(|info| {
let key_range = KeyRange::from(info.key_range.as_ref().unwrap());
self.key_range.full_key_overlap(&key_range)
})
.cloned()
.collect_vec();
table_iters.push(ConcatSstableIterator::new(
level.table_infos.clone(),
tables,
self.compactor.task_config.key_range.clone(),
self.sstable_store.clone(),
));
} else {
for table_info in &level.table_infos {
let key_range = KeyRange::from(table_info.key_range.as_ref().unwrap());
if !self.key_range.full_key_overlap(&key_range) {
continue;
}
table_iters.push(ConcatSstableIterator::new(
vec![table_info.clone()],
self.compactor.task_config.key_range.clone(),
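
Two things change in the compactor runner above: build_delete_range_iter loads the metadata of every input SST that overlaps the task's key range and feeds its range_tombstone_list into one DeleteRangeAggregator (sorted once at the end), and build_sst_iter now skips input SSTs that do not overlap the task's key range at all. Below is a condensed, synchronous sketch of the tombstone collection, with stand-in types (SstMeta, Tombstone) and an assumed empty-bound-means-unbounded convention in place of key_range.full_key_overlap.

#[derive(Clone)]
struct Tombstone {
    start_user_key: Vec<u8>,
    end_user_key: Vec<u8>,
    epoch: u64,
}

struct SstMeta {
    smallest_key: Vec<u8>,
    largest_key: Vec<u8>,
    range_tombstone_list: Vec<Tombstone>,
}

// Stand-in for the key-range overlap test; an empty bound is treated as unbounded.
fn overlaps(task_left: &[u8], task_right: &[u8], sst: &SstMeta) -> bool {
    (task_right.is_empty() || sst.smallest_key.as_slice() <= task_right)
        && (task_left.is_empty() || task_left <= sst.largest_key.as_slice())
}

fn collect_tombstones(
    task_left: &[u8],
    task_right: &[u8],
    input_ssts: &[SstMeta],
) -> Vec<Tombstone> {
    let mut all: Vec<Tombstone> = input_ssts
        .iter()
        .filter(|sst| overlaps(task_left, task_right, sst))
        .flat_map(|sst| sst.range_tombstone_list.iter().cloned())
        .collect();
    // Mirrors `aggregator.sort()` before the aggregator is shared with the SST builders.
    all.sort_by(|a, b| a.start_user_key.cmp(&b.start_user_key).then(b.epoch.cmp(&a.epoch)));
    all
}

fn main() {
    let ssts = vec![SstMeta {
        smallest_key: b"a".to_vec(),
        largest_key: b"k".to_vec(),
        range_tombstone_list: vec![Tombstone {
            start_user_key: b"c".to_vec(),
            end_user_key: b"f".to_vec(),
            epoch: 7,
        }],
    }];
    // A task whose key range starts at "m" skips this SST entirely.
    assert!(collect_tombstones(b"m", b"", &ssts).is_empty());
    assert_eq!(collect_tombstones(b"", b"", &ssts).len(), 1);
}
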
24 changes: 16 additions & 8 deletions src/storage/src/hummock/compactor/mod.rs
@@ -65,6 +65,7 @@ use crate::hummock::compactor::compactor_runner::CompactorRunner;
use crate::hummock::compactor::task_progress::TaskProgressGuard;
use crate::hummock::iterator::{Forward, HummockIterator};
use crate::hummock::multi_builder::{SplitTableOutput, TableBuilderFactory};
use crate::hummock::sstable::DeleteRangeAggregator;
use crate::hummock::utils::MemoryLimiter;
use crate::hummock::vacuum::Vacuum;
use crate::hummock::{
@@ -131,6 +132,7 @@ pub struct Compactor {
context: Arc<Context>,
task_config: TaskConfig,
options: SstableBuilderOptions,
get_id_time: Arc<AtomicU64>,
}

pub type CompactOutput = (usize, Vec<SstableInfo>);
@@ -533,6 +535,7 @@ impl Compactor {
let mut last_key = BytesMut::new();
let mut watermark_can_see_last_key = false;
let mut local_stats = StoreLocalStatistic::default();
let mut del_iter = sst_builder.del_agg.iter();

while iter.is_valid() {
let iter_key = iter.key();
@@ -566,12 +569,14 @@
// in our design, frontend avoid to access keys which had be deleted, so we dont
// need to consider the epoch when the compaction_filter match (it
// means that mv had drop)
let current_user_key = user_key(iter_key);
if (epoch <= task_config.watermark && task_config.gc_delete_keys && value.is_delete())
|| (epoch < task_config.watermark && watermark_can_see_last_key)
|| (epoch < task_config.watermark
&& (watermark_can_see_last_key
|| del_iter.should_delete(current_user_key, epoch)))
{
drop = true;
}

if !drop && compaction_filter.should_delete(iter_key) {
drop = true;
}
@@ -617,6 +622,7 @@
gc_delete_keys,
watermark,
},
get_id_time: Arc::new(AtomicU64::new(0)),
}
}

@@ -628,10 +634,10 @@
&self,
iter: impl HummockIterator<Direction = Forward>,
compaction_filter: impl CompactionFilter,
del_agg: Arc<DeleteRangeAggregator>,
filter_key_extractor: Arc<FilterKeyExtractorImpl>,
task_progress: Option<Arc<TaskProgress>>,
) -> HummockResult<Vec<SstableInfo>> {
let get_id_time = Arc::new(AtomicU64::new(0));
// Monitor time cost building shared buffer to SSTs.
let compact_timer = if self.context.is_share_buffer_compact {
self.context.stats.write_build_l0_sst_duration.start_timer()
@@ -646,8 +652,8 @@
StreamingSstableWriterFactory::new(self.context.sstable_store.clone()),
iter,
compaction_filter,
del_agg,
filter_key_extractor,
get_id_time.clone(),
task_progress.clone(),
)
.await?
@@ -656,8 +662,8 @@
BatchSstableWriterFactory::new(self.context.sstable_store.clone()),
iter,
compaction_filter,
del_agg,
filter_key_extractor,
get_id_time.clone(),
task_progress.clone(),
)
.await?
@@ -704,7 +710,7 @@
self.context
.stats
.get_table_id_total_time_duration
.observe(get_id_time.load(Ordering::Relaxed) as f64 / 1000.0 / 1000.0);
.observe(self.get_id_time.load(Ordering::Relaxed) as f64 / 1000.0 / 1000.0);

debug_assert!(ssts
.iter()
@@ -717,16 +723,16 @@
writer_factory: F,
iter: impl HummockIterator<Direction = Forward>,
compaction_filter: impl CompactionFilter,
del_agg: Arc<DeleteRangeAggregator>,
filter_key_extractor: Arc<FilterKeyExtractorImpl>,
get_id_time: Arc<AtomicU64>,
task_progress: Option<Arc<TaskProgress>>,
) -> HummockResult<Vec<SplitTableOutput>> {
let builder_factory = RemoteBuilderFactory {
sstable_id_manager: self.context.sstable_id_manager.clone(),
limiter: self.context.read_memory_limiter.clone(),
options: self.options.clone(),
policy: self.task_config.cache_policy,
remote_rpc_cost: get_id_time,
remote_rpc_cost: self.get_id_time.clone(),
filter_key_extractor,
sstable_writer_factory: writer_factory,
};
@@ -735,6 +741,8 @@
builder_factory,
self.context.stats.clone(),
task_progress,
del_agg,
self.task_config.key_range.clone(),
);
Compactor::compact_and_build_sst(
&mut sst_builder,
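
The drop condition in compact_and_build_sst above gains a third way for an old version to become garbage: being covered by a delete-range tombstone. A compact sketch of the predicate, mirroring the rewritten condition (the real loop also consults the pluggable compaction filter afterwards):

struct TaskConfig {
    watermark: u64,
    gc_delete_keys: bool,
}

fn should_drop(
    cfg: &TaskConfig,
    epoch: u64,
    is_delete: bool,                  // the value itself is a point tombstone
    watermark_can_see_last_key: bool, // a newer visible version of this user key exists
    covered_by_range_delete: bool,    // `del_iter.should_delete(user_key, epoch)` in the diff
) -> bool {
    (epoch <= cfg.watermark && cfg.gc_delete_keys && is_delete)
        || (epoch < cfg.watermark && (watermark_can_see_last_key || covered_by_range_delete))
}

fn main() {
    let cfg = TaskConfig { watermark: 100, gc_delete_keys: false };
    // An old version shadowed only by a delete-range is now dropped:
    assert!(should_drop(&cfg, 50, false, false, true));
    // Without the range delete (and with no newer visible version) it is kept:
    assert!(!should_drop(&cfg, 50, false, false, false));
}

As a side refactor in the same file, the get_id_time counter (fed into remote_rpc_cost and reported as get_table_id_total_time_duration) moves from a per-call local in compact_key_range to a field on Compactor, so it no longer has to be threaded through compact_key_range_impl.
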
12 changes: 10 additions & 2 deletions src/storage/src/hummock/compactor/shared_buffer_compact.rs
@@ -33,7 +33,7 @@ use crate::hummock::compactor::{CompactOutput, Compactor};
use crate::hummock::iterator::{Forward, HummockIterator};
use crate::hummock::shared_buffer::shared_buffer_uploader::UploadTaskPayload;
use crate::hummock::shared_buffer::{build_ordered_merge_iter, UncommittedData};
use crate::hummock::sstable::SstableIteratorReadOptions;
use crate::hummock::sstable::{DeleteRangeAggregator, SstableIteratorReadOptions};
use crate::hummock::{
CachePolicy, ForwardIter, HummockError, HummockResult, SstableBuilderOptions,
};
@@ -291,9 +291,17 @@ impl SharedBufferCompactRunner {
filter_key_extractor: Arc<FilterKeyExtractorImpl>,
) -> HummockResult<CompactOutput> {
let dummy_compaction_filter = DummyCompactionFilter {};
// TODO: add delete-range-tombstone from shared-buffer-batch.
let del_agg = Arc::new(DeleteRangeAggregator::new(KeyRange::inf(), 0, false));
let ssts = self
.compactor
.compact_key_range(iter, dummy_compaction_filter, filter_key_extractor, None)
.compact_key_range(
iter,
dummy_compaction_filter,
del_agg,
filter_key_extractor,
None,
)
.await?;
Ok((self.split_index, ssts))
}
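
Note that the shared-buffer flush path above still builds a placeholder aggregator (unbounded key range via KeyRange::inf(), watermark 0, gc_delete_keys disabled), so shared-buffer compaction does not yet drop data by delete-range; per the TODO in the diff, tombstones carried in shared-buffer batches are left to a follow-up, and the new filtering only takes effect in regular compaction tasks.
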