Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(bloom-filter): filter rows with segment precision #5286

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion src/index/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ fst.workspace = true
futures.workspace = true
greptime-proto.workspace = true
mockall.workspace = true
parquet.workspace = true
pin-project.workspace = true
prost.workspace = true
regex.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion src/index/src/bloom_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub struct BloomFilterMeta {
}

/// The location of the bloom filter segment in the file.
#[derive(Debug, Serialize, Deserialize, Clone, Hash, PartialEq, Eq)]
#[derive(Debug, Serialize, Deserialize, Clone, Copy, Hash, PartialEq, Eq)]
pub struct BloomFilterSegmentLocation {
/// The offset of the bloom filter segment in the file.
pub offset: u64,
Expand Down
203 changes: 113 additions & 90 deletions src/index/src/bloom_filter/applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashSet};

use parquet::arrow::arrow_reader::RowSelection;
use parquet::file::metadata::RowGroupMetaData;
use std::collections::HashSet;
use std::ops::Range;

use crate::bloom_filter::error::Result;
use crate::bloom_filter::reader::BloomFilterReader;
use crate::bloom_filter::{BloomFilterMeta, BloomFilterSegmentLocation, Bytes};

/// Enumerates types of predicates for value filtering.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Predicate {
/// Predicate for matching values in a list.
InList(InListPredicate),
}

/// `InListPredicate` contains a list of acceptable values. A value needs to match at least
/// one of the elements (logical OR semantic) for the predicate to be satisfied.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InListPredicate {
/// List of acceptable values.
pub list: HashSet<Bytes>,
}
use crate::bloom_filter::{BloomFilterMeta, Bytes};

pub struct BloomFilterApplier {
reader: Box<dyn BloomFilterReader + Send>,
Expand All @@ -48,86 +31,126 @@ impl BloomFilterApplier {
Ok(Self { reader, meta })
}

/// Searches for matching row groups using bloom filters.
///
/// This method applies bloom filter index to eliminate row groups that definitely
/// don't contain the searched values. It works by:
///
/// 1. Computing prefix sums for row counts
/// 2. Calculating bloom filter segment locations for each row group
/// 1. A row group may span multiple bloom filter segments
/// 3. Probing bloom filter segments
/// 4. Removing non-matching row groups from the basement
/// 1. If a row group doesn't match any bloom filter segment with any probe, it is removed
///
/// # Note
/// The method modifies the `basement` map in-place by removing row groups that
/// don't match the bloom filter criteria.
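/// Returns the row ranges inside `search_range` whose bloom filter segments report a
/// match for at least one of `probes`. Results are segment-granular (clamped to
/// `search_range`), and adjacent matching segments are merged into a single range.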
pub async fn search(
&mut self,
probes: &HashSet<Bytes>,
row_group_metas: &[RowGroupMetaData],
basement: &mut BTreeMap<usize, Option<RowSelection>>,
) -> Result<()> {
// 0. Fast path - if basement is empty return empty vec
if basement.is_empty() {
return Ok(());
}

// 1. Compute prefix sum for row counts
let mut sum = 0usize;
let mut prefix_sum = Vec::with_capacity(row_group_metas.len() + 1);
prefix_sum.push(0usize);
for meta in row_group_metas {
sum += meta.num_rows() as usize;
prefix_sum.push(sum);
}

// 2. Calculate bloom filter segment locations
let mut row_groups_to_remove = HashSet::new();
for &row_group_idx in basement.keys() {
// TODO(ruihang): support further filter over row selection

// todo: dedup & overlap
let rows_range_start = prefix_sum[row_group_idx] / self.meta.rows_per_segment;
let rows_range_end = (prefix_sum[row_group_idx + 1] as f64
/ self.meta.rows_per_segment as f64)
.ceil() as usize;

let mut is_any_range_hit = false;
for i in rows_range_start..rows_range_end {
// 3. Probe each bloom filter segment
let loc = BloomFilterSegmentLocation {
offset: self.meta.bloom_filter_segments[i].offset,
size: self.meta.bloom_filter_segments[i].size,
elem_count: self.meta.bloom_filter_segments[i].elem_count,
};
let bloom = self.reader.bloom_filter(&loc).await?;

// Check if any probe exists in bloom filter
let mut matches = false;
for probe in probes {
if bloom.contains(probe) {
matches = true;
break;
search_range: Range<usize>,
) -> Result<Vec<Range<usize>>> {
let rows_per_segment = self.meta.rows_per_segment;
let start_seg = search_range.start / rows_per_segment;
let end_seg = search_range.end.div_ceil(rows_per_segment);

let locs = &self.meta.bloom_filter_segments[start_seg..end_seg];
let bfs = self.reader.bloom_filter_vec(locs).await?;

let mut ranges: Vec<Range<usize>> = Vec::with_capacity(end_seg - start_seg);
for (seg_id, bloom) in (start_seg..end_seg).zip(bfs) {
let start = seg_id * rows_per_segment;
for probe in probes {
if bloom.contains(probe) {
let end = (start + rows_per_segment).min(search_range.end);
let start = start.max(search_range.start);

match ranges.last_mut() {
Some(last) if last.end == start => {
last.end = end;
}
_ => {
ranges.push(start..end);
}
}
}

is_any_range_hit |= matches;
if matches {
break;
}
}
if !is_any_range_hit {
row_groups_to_remove.insert(row_group_idx);
}
}

// 4. Remove row groups that do not match any bloom filter segment
for row_group_idx in row_groups_to_remove {
basement.remove(&row_group_idx);
Ok(ranges)
}
}

#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicUsize;
    use std::sync::Arc;

    use futures::io::Cursor;

    use super::*;
    use crate::bloom_filter::creator::BloomFilterCreator;
    use crate::bloom_filter::reader::BloomFilterReaderImpl;
    use crate::external_provider::MockExternalTempFileProvider;

    /// Builds a bloom filter index with 4 rows per segment over 12 rows (3 segments),
    /// then checks that `search` returns the expected segment-granular row ranges
    /// for a table of (probes, search range, expected ranges) cases.
    #[tokio::test]
    // Expected results such as `vec![0..4]` are intentionally one-element vectors of ranges.
    #[allow(clippy::single_range_in_vec_init)]
    async fn test_applier() {
        let mut writer = Cursor::new(Vec::new());
        let mut creator = BloomFilterCreator::new(
            4,
            Arc::new(MockExternalTempFileProvider::new()),
            Arc::new(AtomicUsize::new(0)),
            None,
        );

        // Each row carries three elements: a unique row id, a per-segment tag, and a
        // tag whose value changes in the middle of the second segment.
        let rows = vec![
            vec![b"row00".to_vec(), b"seg00".to_vec(), b"overl".to_vec()],
            vec![b"row01".to_vec(), b"seg00".to_vec(), b"overl".to_vec()],
            vec![b"row02".to_vec(), b"seg00".to_vec(), b"overl".to_vec()],
            vec![b"row03".to_vec(), b"seg00".to_vec(), b"overl".to_vec()],
            vec![b"row04".to_vec(), b"seg01".to_vec(), b"overl".to_vec()],
            vec![b"row05".to_vec(), b"seg01".to_vec(), b"overl".to_vec()],
            vec![b"row06".to_vec(), b"seg01".to_vec(), b"overp".to_vec()],
            vec![b"row07".to_vec(), b"seg01".to_vec(), b"overp".to_vec()],
            vec![b"row08".to_vec(), b"seg02".to_vec(), b"overp".to_vec()],
            vec![b"row09".to_vec(), b"seg02".to_vec(), b"overp".to_vec()],
            vec![b"row10".to_vec(), b"seg02".to_vec(), b"overp".to_vec()],
            vec![b"row11".to_vec(), b"seg02".to_vec(), b"overp".to_vec()],
        ];

        let cases = vec![
            (vec![b"row00".to_vec()], 0..12, vec![0..4]), // search one row in full range
            (vec![b"row05".to_vec()], 4..8, vec![4..8]), // search one row in partial range
            (vec![b"row03".to_vec()], 4..8, vec![]), // search for a row that doesn't exist in the partial range
            (
                vec![b"row01".to_vec(), b"row06".to_vec()],
                0..12,
                vec![0..8],
            ), // search multiple rows in adjacent segments (ranges are merged)
            (
                vec![b"row01".to_vec(), b"row11".to_vec()],
                0..12,
                vec![0..4, 8..12],
            ), // search multiple rows in non-adjacent segments
            (vec![b"row99".to_vec()], 0..12, vec![]), // search for a row that doesn't exist in the full range
            (vec![b"row00".to_vec()], 12..12, vec![]), // search in an empty range
            (
                vec![b"row04".to_vec(), b"row05".to_vec()],
                0..12,
                vec![4..8],
            ), // search multiple rows in same segment
            (vec![b"seg01".to_vec()], 0..12, vec![4..8]), // search rows in a segment
            (vec![b"seg01".to_vec()], 6..12, vec![6..8]), // search rows in a segment in partial range
            (vec![b"overl".to_vec()], 0..12, vec![0..8]), // search rows in multiple segments
            (vec![b"overl".to_vec()], 2..12, vec![2..8]), // search range starts from the middle of a segment
            (vec![b"overp".to_vec()], 0..10, vec![4..10]), // search range ends at the middle of a segment
        ];

        for row in rows {
            creator.push_row_elems(row).await.unwrap();
        }

        creator.finish(&mut writer).await.unwrap();

        let bytes = writer.into_inner();

        let reader = BloomFilterReaderImpl::new(bytes);

        let mut applier = BloomFilterApplier::new(Box::new(reader)).await.unwrap();

        for (probes, search_range, expected) in cases {
            let probes: HashSet<Bytes> = probes.into_iter().collect();
            let ranges = applier.search(&probes, search_range).await.unwrap();
            assert_eq!(ranges, expected);
        }
    }
}
25 changes: 25 additions & 0 deletions src/index/src/bloom_filter/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,31 @@ pub trait BloomFilterReader {
.expected_items(loc.elem_count);
Ok(bm)
}

/// Reads the bloom filters stored at the given segment locations.
///
/// All byte ranges are requested through a single `read_vec` call; each returned
/// buffer is then decoded as little-endian `u64` words and turned into a
/// `BloomFilter` seeded with `SEED` and sized by the location's `elem_count`.
/// The returned filters are in the same order as `locs`.
async fn bloom_filter_vec(
    &mut self,
    locs: &[BloomFilterSegmentLocation],
) -> Result<Vec<BloomFilter>> {
    // One byte range per segment: [offset, offset + size).
    let byte_ranges: Vec<_> = locs
        .iter()
        .map(|loc| loc.offset..loc.offset + loc.size)
        .collect();
    let buffers = self.read_vec(&byte_ranges).await?;

    let filters = buffers
        .into_iter()
        .zip(locs)
        .map(|(buf, loc)| {
            // `chunks_exact` yields only full 8-byte chunks, so the conversion to
            // `[u8; 8]` cannot fail.
            let words: Vec<u64> = buf
                .chunks_exact(std::mem::size_of::<u64>())
                .map(|chunk| u64::from_le_bytes(chunk.try_into().unwrap()))
                .collect();
            BloomFilter::from_vec(words)
                .seed(&SEED)
                .expected_items(loc.elem_count)
        })
        .collect();

    Ok(filters)
}
}

/// `BloomFilterReaderImpl` reads the bloom filter from the file.
Expand Down
6 changes: 6 additions & 0 deletions src/mito2/src/cache/index/bloom_filter_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Range;
use std::sync::Arc;

use async_trait::async_trait;
Expand Down Expand Up @@ -101,6 +102,11 @@ impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBl
.map(|b| b.into())
}

/// Reads the given byte ranges by delegating directly to the wrapped reader.
async fn read_vec(&mut self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
    // TODO(zhongzc): wait for #5276 to make it utilize cache.
    Ok(self.inner.read_vec(ranges).await?)
}

/// Reads the meta information of the bloom filter.
async fn metadata(&mut self) -> Result<BloomFilterMeta> {
if let Some(cached) = self.cache.get_metadata((self.file_id, self.column_id)) {
Expand Down
Loading
Loading