Skip to content

Commit

Permalink
refactor: make the block pruning more clear (#15079)
Browse files Browse the repository at this point in the history
chore: make the block pruning more clear
  • Loading branch information
BohuTANG authored Mar 24, 2024
1 parent eeed57b commit 707396d
Showing 1 changed file with 77 additions and 42 deletions.
119 changes: 77 additions & 42 deletions src/query/storages/fuse/src/pruning/block_pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,55 +49,90 @@ impl BlockPruner {
segment_location: SegmentLocation,
block_metas: Vec<Arc<BlockMeta>>,
) -> Result<Vec<(BlockMetaIndex, Arc<BlockMeta>)>> {
let mut block_meta_indexes: Vec<(usize, Arc<BlockMeta>)> =
block_metas.clone().into_iter().enumerate().collect();
// Apply internal column pruning.
let block_meta_indexes = self.internal_column_pruning(&block_metas);

if let Some(internal_column_pruner) = &self.pruning_ctx.internal_column_pruner {
block_meta_indexes = block_meta_indexes
.into_iter()
.filter(|(_, block_meta)| {
internal_column_pruner.should_keep(BLOCK_NAME_COL_NAME, &block_meta.location.0)
})
.collect::<Vec<_>>();
}
// Apply inverted index pruning.
let (block_meta_indexes, matched_rows_map) = self
.inverted_index_pruning(&segment_location, &block_metas, block_meta_indexes)
.await?;

let mut matched_rows_map = None;
if let Some(inverted_index_pruner) = &self.pruning_ctx.inverted_index_pruner {
let mut row_count = 0;
let mut row_counts = Vec::with_capacity(block_metas.len());
for block_meta in &block_metas {
row_count += block_meta.row_count as usize;
row_counts.push(row_count);
// Apply block pruning.
match &self.pruning_ctx.bloom_pruner {
// async pruning with bloom index.
Some(bloom_pruner) => {
self.block_pruning(
bloom_pruner,
segment_location,
block_metas,
block_meta_indexes,
matched_rows_map,
)
.await
}
let block_matched_rows_map = inverted_index_pruner
.should_keep_block(&segment_location.location.0, row_counts)?;

block_meta_indexes = block_meta_indexes
.into_iter()
.filter(|(block_idx, _)| block_matched_rows_map.contains_key(block_idx))
.collect::<Vec<_>>();

matched_rows_map = Some(block_matched_rows_map);
}

if let Some(bloom_pruner) = &self.pruning_ctx.bloom_pruner {
self.block_pruning(
bloom_pruner,
segment_location,
block_metas,
block_meta_indexes,
matched_rows_map,
)
.await
} else {
// if no available filter pruners, just prune the blocks by
// using zone map index, and do not spawn async tasks
self.block_pruning_sync(
// sync pruning without a bloom index.
None => self.block_pruning_sync(
segment_location,
block_metas,
block_meta_indexes,
matched_rows_map,
)
),
}
}

/// Apply internal column pruning.
fn internal_column_pruning(
&self,
block_metas: &[Arc<BlockMeta>],
) -> Vec<(usize, Arc<BlockMeta>)> {
match &self.pruning_ctx.internal_column_pruner {
Some(pruner) => block_metas
.iter()
.enumerate()
.filter(|(_, block_meta)| {
pruner.should_keep(BLOCK_NAME_COL_NAME, &block_meta.location.0)
})
.map(|(index, block_meta)| (index, block_meta.clone()))
.collect(),
None => block_metas
.iter()
.enumerate()
.map(|(index, block_meta)| (index, block_meta.clone()))
.collect(),
}
}

/// Apply inverted index pruning.
async fn inverted_index_pruning(
&self,
segment_location: &SegmentLocation,
block_metas: &[Arc<BlockMeta>],
block_meta_indexes: Vec<(usize, Arc<BlockMeta>)>,
) -> Result<(
Vec<(usize, Arc<BlockMeta>)>,
Option<BTreeMap<usize, Vec<(usize, F32)>>>,
)> {
match &self.pruning_ctx.inverted_index_pruner {
Some(inverted_index_pruner) => {
// Get the row count of each block.
let row_counts: Vec<usize> = block_metas
.iter()
.scan(0, |state, meta| {
*state += meta.row_count as usize;
Some(*state)
})
.collect();
let block_matched_rows_map = inverted_index_pruner
.should_keep_block(&segment_location.location.0, row_counts)?;

let filtered_block_meta_indexes = block_meta_indexes
.into_iter()
.filter(|(idx, _)| block_matched_rows_map.contains_key(idx))
.collect();

Ok((filtered_block_meta_indexes, Some(block_matched_rows_map)))
}
None => Ok((block_meta_indexes, None)),
}
}

Expand Down

0 comments on commit 707396d

Please sign in to comment.