From ae47f0deab864f883cdb91ddeaf711c09083878e Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Fri, 3 Jan 2025 11:02:30 +0000 Subject: [PATCH 1/3] fix(bloom-filter): filter rows with segment precision Signed-off-by: Zhenchi --- Cargo.lock | 1 - src/index/Cargo.toml | 1 - src/index/src/bloom_filter.rs | 2 +- src/index/src/bloom_filter/applier.rs | 198 +++-- src/index/src/bloom_filter/reader.rs | 25 + .../src/cache/index/bloom_filter_index.rs | 6 + .../src/sst/index/bloom_filter/applier.rs | 770 +++++++----------- .../sst/index/bloom_filter/applier/builder.rs | 531 ++++++++++++ .../src/sst/index/bloom_filter/creator.rs | 10 +- src/mito2/src/sst/parquet/reader.rs | 110 +-- src/mito2/src/test_util/sst_util.rs | 1 - src/mito2/src/test_util/version_util.rs | 1 - 12 files changed, 1026 insertions(+), 630 deletions(-) create mode 100644 src/mito2/src/sst/index/bloom_filter/applier/builder.rs diff --git a/Cargo.lock b/Cargo.lock index 6c7e56398e4b..93499ed4268a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5290,7 +5290,6 @@ dependencies = [ "futures", "greptime-proto", "mockall", - "parquet", "pin-project", "prost 0.12.6", "rand", diff --git a/src/index/Cargo.toml b/src/index/Cargo.toml index 898be43b7d56..f46c64a17606 100644 --- a/src/index/Cargo.toml +++ b/src/index/Cargo.toml @@ -22,7 +22,6 @@ fst.workspace = true futures.workspace = true greptime-proto.workspace = true mockall.workspace = true -parquet.workspace = true pin-project.workspace = true prost.workspace = true regex.workspace = true diff --git a/src/index/src/bloom_filter.rs b/src/index/src/bloom_filter.rs index 600f6e80e84d..43eebe592e30 100644 --- a/src/index/src/bloom_filter.rs +++ b/src/index/src/bloom_filter.rs @@ -45,7 +45,7 @@ pub struct BloomFilterMeta { } /// The location of the bloom filter segment in the file. -#[derive(Debug, Serialize, Deserialize, Clone, Hash, PartialEq, Eq)] +#[derive(Debug, Serialize, Deserialize, Clone, Copy, Hash, PartialEq, Eq)] pub struct BloomFilterSegmentLocation { /// The offset of the bloom filter segment in the file. pub offset: u64, diff --git a/src/index/src/bloom_filter/applier.rs b/src/index/src/bloom_filter/applier.rs index 2750cbb92b6b..47fc9926318f 100644 --- a/src/index/src/bloom_filter/applier.rs +++ b/src/index/src/bloom_filter/applier.rs @@ -12,29 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, HashSet}; - -use parquet::arrow::arrow_reader::RowSelection; -use parquet::file::metadata::RowGroupMetaData; +use std::collections::HashSet; +use std::ops::Range; use crate::bloom_filter::error::Result; use crate::bloom_filter::reader::BloomFilterReader; -use crate::bloom_filter::{BloomFilterMeta, BloomFilterSegmentLocation, Bytes}; - -/// Enumerates types of predicates for value filtering. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum Predicate { - /// Predicate for matching values in a list. - InList(InListPredicate), -} - -/// `InListPredicate` contains a list of acceptable values. A value needs to match at least -/// one of the elements (logical OR semantic) for the predicate to be satisfied. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct InListPredicate { - /// List of acceptable values. - pub list: HashSet, -} +use crate::bloom_filter::{BloomFilterMeta, Bytes}; pub struct BloomFilterApplier { reader: Box, @@ -48,86 +31,121 @@ impl BloomFilterApplier { Ok(Self { reader, meta }) } - /// Searches for matching row groups using bloom filters. 
-    ///
-    /// This method applies bloom filter index to eliminate row groups that definitely
-    /// don't contain the searched values. It works by:
-    ///
-    /// 1. Computing prefix sums for row counts
-    /// 2. Calculating bloom filter segment locations for each row group
-    ///     1. A row group may span multiple bloom filter segments
-    /// 3. Probing bloom filter segments
-    /// 4. Removing non-matching row groups from the basement
-    ///     1. If a row group doesn't match any bloom filter segment with any probe, it is removed
-    ///
-    /// # Note
-    /// The method modifies the `basement` map in-place by removing row groups that
-    /// don't match the bloom filter criteria.
+    /// Searches ranges of rows that match the given probes in the given search range.
     pub async fn search(
         &mut self,
         probes: &HashSet<Bytes>,
-        row_group_metas: &[RowGroupMetaData],
-        basement: &mut BTreeMap<usize, Option<RowSelection>>,
-    ) -> Result<()> {
-        // 0. Fast path - if basement is empty return empty vec
-        if basement.is_empty() {
-            return Ok(());
-        }
-
-        // 1. Compute prefix sum for row counts
-        let mut sum = 0usize;
-        let mut prefix_sum = Vec::with_capacity(row_group_metas.len() + 1);
-        prefix_sum.push(0usize);
-        for meta in row_group_metas {
-            sum += meta.num_rows() as usize;
-            prefix_sum.push(sum);
-        }
-
-        // 2. Calculate bloom filter segment locations
-        let mut row_groups_to_remove = HashSet::new();
-        for &row_group_idx in basement.keys() {
-            // TODO(ruihang): support further filter over row selection
-
-            // todo: dedup & overlap
-            let rows_range_start = prefix_sum[row_group_idx] / self.meta.rows_per_segment;
-            let rows_range_end = (prefix_sum[row_group_idx + 1] as f64
-                / self.meta.rows_per_segment as f64)
-                .ceil() as usize;
-
-            let mut is_any_range_hit = false;
-            for i in rows_range_start..rows_range_end {
-                // 3. Probe each bloom filter segment
-                let loc = BloomFilterSegmentLocation {
-                    offset: self.meta.bloom_filter_segments[i].offset,
-                    size: self.meta.bloom_filter_segments[i].size,
-                    elem_count: self.meta.bloom_filter_segments[i].elem_count,
-                };
-                let bloom = self.reader.bloom_filter(&loc).await?;
-
-                // Check if any probe exists in bloom filter
-                let mut matches = false;
-                for probe in probes {
-                    if bloom.contains(probe) {
-                        matches = true;
-                        break;
+        search_range: Range<usize>,
+    ) -> Result<Vec<Range<usize>>> {
+        let rows_per_segment = self.meta.rows_per_segment;
+        let start_seg = search_range.start / rows_per_segment;
+        let end_seg = search_range.end.div_ceil(rows_per_segment);
+
+        let locs = &self.meta.bloom_filter_segments[start_seg..end_seg];
+        let bfs = self.reader.bloom_filter_vec(locs).await?;
+
+        let mut ranges: Vec<Range<usize>> = Vec::with_capacity(end_seg - start_seg);
+        for (seg_id, bloom) in (start_seg..end_seg).zip(bfs) {
+            let start = seg_id * rows_per_segment;
+            for probe in probes {
+                if bloom.contains(probe) {
+                    let end = (start + rows_per_segment).min(search_range.end);
+                    let start = start.max(search_range.start);
+
+                    match ranges.last_mut() {
+                        Some(last) if last.end == start => {
+                            last.end = end;
+                        }
+                        _ => {
+                            ranges.push(start..end);
+                        }
                     }
-                }
-
-                is_any_range_hit |= matches;
-                if matches {
                     break;
                 }
             }
-            if !is_any_range_hit {
-                row_groups_to_remove.insert(row_group_idx);
-            }
         }
 
-        // 4. 
Remove row groups that do not match any bloom filter segment
-        for row_group_idx in row_groups_to_remove {
-            basement.remove(&row_group_idx);
+        Ok(ranges)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::atomic::AtomicUsize;
+    use std::sync::Arc;
+
+    use futures::io::Cursor;
+
+    use super::*;
+    use crate::bloom_filter::creator::BloomFilterCreator;
+    use crate::bloom_filter::reader::BloomFilterReaderImpl;
+    use crate::external_provider::MockExternalTempFileProvider;
+
+    #[tokio::test]
+    #[allow(clippy::single_range_in_vec_init)]
+    async fn test_applier() {
+        let mut writer = Cursor::new(Vec::new());
+        let mut creator = BloomFilterCreator::new(
+            4,
+            Arc::new(MockExternalTempFileProvider::new()),
+            Arc::new(AtomicUsize::new(0)),
+            None,
+        );
+
+        let rows = vec![
+            vec![b"row00".to_vec(), b"seg00".to_vec(), b"overl".to_vec()],
+            vec![b"row01".to_vec(), b"seg00".to_vec(), b"overl".to_vec()],
+            vec![b"row02".to_vec(), b"seg00".to_vec(), b"overl".to_vec()],
+            vec![b"row03".to_vec(), b"seg00".to_vec(), b"overl".to_vec()],
+            vec![b"row04".to_vec(), b"seg01".to_vec(), b"overl".to_vec()],
+            vec![b"row05".to_vec(), b"seg01".to_vec(), b"overl".to_vec()],
+            vec![b"row06".to_vec(), b"seg01".to_vec(), b"overp".to_vec()],
+            vec![b"row07".to_vec(), b"seg01".to_vec(), b"overp".to_vec()],
+            vec![b"row08".to_vec(), b"seg02".to_vec(), b"overp".to_vec()],
+            vec![b"row09".to_vec(), b"seg02".to_vec(), b"overp".to_vec()],
+            vec![b"row10".to_vec(), b"seg02".to_vec(), b"overp".to_vec()],
+            vec![b"row11".to_vec(), b"seg02".to_vec(), b"overp".to_vec()],
+        ];
+
+        let cases = vec![
+            (vec![b"row00".to_vec()], 0..12, vec![0..4]), // search one row in full range
+            (vec![b"row05".to_vec()], 4..8, vec![4..8]), // search one row in partial range
+            (vec![b"row03".to_vec()], 4..8, vec![]), // search for a row that doesn't exist in the partial range
+            (
+                vec![b"row01".to_vec(), b"row06".to_vec()],
+                0..12,
+                vec![0..8],
+            ), // search multiple rows in multiple ranges
+            (vec![b"row99".to_vec()], 0..12, vec![]), // search for a row that doesn't exist in the full range
+            (vec![b"row00".to_vec()], 12..12, vec![]), // search in an empty range
+            (
+                vec![b"row04".to_vec(), b"row05".to_vec()],
+                0..12,
+                vec![4..8],
+            ), // search multiple rows in same segment
+            (vec![b"seg01".to_vec()], 0..12, vec![4..8]), // search rows in a segment
+            (vec![b"seg01".to_vec()], 6..12, vec![6..8]), // search rows in a segment in partial range
+            (vec![b"overl".to_vec()], 0..12, vec![0..8]), // search rows in multiple segments
+            (vec![b"overl".to_vec()], 2..12, vec![2..8]), // search range starts from the middle of a segment
+            (vec![b"overp".to_vec()], 0..10, vec![4..10]), // search range ends at the middle of a segment
+        ];
+
+        for row in rows {
+            creator.push_row_elems(row).await.unwrap();
+        }
+        creator.finish(&mut writer).await.unwrap();
+
+        let bytes = writer.into_inner();
+
+        let reader = BloomFilterReaderImpl::new(bytes);
+
+        let mut applier = BloomFilterApplier::new(Box::new(reader)).await.unwrap();
+
+        for (probes, search_range, expected) in cases {
+            let probes: HashSet<Bytes> = probes.into_iter().collect();
+            let ranges = applier.search(&probes, search_range).await.unwrap();
+            assert_eq!(ranges, expected);
+        }
+    }
+}
diff --git a/src/index/src/bloom_filter/reader.rs b/src/index/src/bloom_filter/reader.rs
index 02085fa671f7..3c1fc6d77d99 100644
--- a/src/index/src/bloom_filter/reader.rs
+++ b/src/index/src/bloom_filter/reader.rs
@@ -63,6 +63,31 @@ pub trait BloomFilterReader {
             .expected_items(loc.elem_count);
         Ok(bm)
     }
+
+    async fn 
bloom_filter_vec(
+        &mut self,
+        locs: &[BloomFilterSegmentLocation],
+    ) -> Result<Vec<BloomFilter>> {
+        let ranges = locs
+            .iter()
+            .map(|l| l.offset..l.offset + l.size)
+            .collect::<Vec<_>>();
+        let bss = self.read_vec(&ranges).await?;
+
+        let mut result = Vec::with_capacity(bss.len());
+        for (bs, loc) in bss.into_iter().zip(locs.iter()) {
+            let vec = bs
+                .chunks_exact(std::mem::size_of::<u64>())
+                .map(|chunk| u64::from_le_bytes(chunk.try_into().unwrap()))
+                .collect();
+            let bm = BloomFilter::from_vec(vec)
+                .seed(&SEED)
+                .expected_items(loc.elem_count);
+            result.push(bm);
+        }
+
+        Ok(result)
+    }
 }
 
 /// `BloomFilterReaderImpl` reads the bloom filter from the file.
diff --git a/src/mito2/src/cache/index/bloom_filter_index.rs b/src/mito2/src/cache/index/bloom_filter_index.rs
index 08ac7e8273d1..f01d3ab65e64 100644
--- a/src/mito2/src/cache/index/bloom_filter_index.rs
+++ b/src/mito2/src/cache/index/bloom_filter_index.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::ops::Range;
 use std::sync::Arc;
 
 use async_trait::async_trait;
@@ -101,6 +102,11 @@ impl BloomFilterReader for CachedBloomFilterIndexBl
         .map(|b| b.into())
     }
 
+    async fn read_vec(&mut self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
+        // TODO(zhongzc): wait for #5276 to make it utilize cache.
+        Ok(self.inner.read_vec(ranges).await?)
+    }
+
     /// Reads the meta information of the bloom filter.
     async fn metadata(&mut self) -> Result<BloomFilterMeta> {
         if let Some(cached) = self.cache.get_metadata((self.file_id, self.column_id)) {
diff --git a/src/mito2/src/sst/index/bloom_filter/applier.rs b/src/mito2/src/sst/index/bloom_filter/applier.rs
index 1e7533a7044c..218431a2b8af 100644
--- a/src/mito2/src/sst/index/bloom_filter/applier.rs
+++ b/src/mito2/src/sst/index/bloom_filter/applier.rs
@@ -12,25 +12,20 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::{BTreeMap, HashMap, HashSet};
+mod builder;
+
+use std::collections::HashMap;
+use std::ops::Range;
 use std::sync::Arc;
 
 use common_base::range_read::RangeReader;
 use common_telemetry::warn;
-use datafusion_common::ScalarValue;
-use datafusion_expr::expr::InList;
-use datafusion_expr::{BinaryExpr, Expr, Operator};
-use datatypes::data_type::ConcreteDataType;
-use datatypes::value::Value;
-use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate, Predicate};
+use index::bloom_filter::applier::BloomFilterApplier;
 use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
 use object_store::ObjectStore;
-use parquet::arrow::arrow_reader::RowSelection;
-use parquet::file::metadata::RowGroupMetaData;
 use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
 use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader};
-use snafu::{OptionExt, ResultExt};
-use store_api::metadata::RegionMetadata;
+use snafu::ResultExt;
 use store_api::storage::{ColumnId, RegionId};
 
 use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
@@ -38,32 +33,49 @@ use crate::cache::index::bloom_filter_index::{
     BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader,
 };
 use crate::error::{
-    ApplyBloomFilterIndexSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, Error, MetadataSnafu,
-    PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result,
+    ApplyBloomFilterIndexSnafu, Error, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu,
+    Result,
 };
 use crate::metrics::INDEX_APPLY_ELAPSED;
-use crate::row_converter::SortField;
 use crate::sst::file::FileId;
+pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexApplierBuilder;
+use crate::sst::index::bloom_filter::applier::builder::Predicate;
 use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
-use crate::sst::index::codec::IndexValueCodec;
 use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
 use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;
 use crate::sst::location;
 
 pub(crate) type BloomFilterIndexApplierRef = Arc<BloomFilterIndexApplier>;
 
+/// `BloomFilterIndexApplier` applies bloom filter predicates to the SST file.
 pub struct BloomFilterIndexApplier {
+    /// Directory of the region.
     region_dir: String,
+
+    /// ID of the region.
     region_id: RegionId,
+
+    /// Object store to read the index file.
     object_store: ObjectStore,
+
+    /// File cache to read the index file.
     file_cache: Option<FileCacheRef>,
+
+    /// Factory to create puffin manager.
     puffin_manager_factory: PuffinManagerFactory,
+
+    /// Cache for puffin metadata.
     puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
+
+    /// Cache for bloom filter index.
     bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
+
+    /// Bloom filter predicates.
     filters: HashMap<ColumnId, Vec<Predicate>>,
 }
 
 impl BloomFilterIndexApplier {
+    /// Creates a new `BloomFilterIndexApplier`.
     pub fn new(
         region_dir: String,
         region_id: RegionId,
@@ -104,19 +116,38 @@ impl BloomFilterIndexApplier {
         self
     }
 
-    /// Applies bloom filter predicates to the provided SST file and returns a bitmap
-    /// indicating which segments may contain matching rows
+    /// Applies bloom filter predicates to the provided SST file and returns a
+    /// list of row group ranges that match the predicates.
+    ///
+    /// The `row_groups` iterator provides the row group lengths and whether to search in the row group.
     
pub async fn apply(
         &self,
         file_id: FileId,
         file_size_hint: Option<u64>,
-        row_group_metas: &[RowGroupMetaData],
-        basement: &mut BTreeMap<usize, Option<RowSelection>>,
-    ) -> Result<()> {
+        row_groups: impl Iterator<Item = (usize, bool)>,
+    ) -> Result<Vec<(usize, Vec<Range<usize>>)>> {
         let _timer = INDEX_APPLY_ELAPSED
             .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
             .start_timer();
 
+        // Calculates row groups' ranges based on the start of the file.
+        let mut input = Vec::with_capacity(row_groups.size_hint().0);
+        let mut start = 0;
+        for (i, (len, to_search)) in row_groups.enumerate() {
+            let end = start + len;
+            if to_search {
+                input.push((i, start..end));
+            }
+            start = end;
+        }
+
+        // Initializes output with the input ranges. These ranges are based on the start of
+        // the file rather than the row group, so they are adjusted at the end.
+        let mut output = input
+            .iter()
+            .map(|(i, range)| (*i, vec![range.clone()]))
+            .collect::<Vec<_>>();
+
         for (column_id, predicates) in &self.filters {
             let mut blob = match self
                 .blob_reader(file_id, *column_id, file_size_hint)
                 .await
@@ -136,18 +167,28 @@ impl BloomFilterIndexApplier {
                 BloomFilterReaderImpl::new(blob),
                 bloom_filter_cache.clone(),
             );
-            self.apply_filters(reader, predicates, row_group_metas, basement)
+            self.apply_filters(reader, predicates, &input, &mut output)
                 .await
                 .context(ApplyBloomFilterIndexSnafu)?;
         } else {
             let reader = BloomFilterReaderImpl::new(blob);
-            self.apply_filters(reader, predicates, row_group_metas, basement)
+            self.apply_filters(reader, predicates, &input, &mut output)
                 .await
                 .context(ApplyBloomFilterIndexSnafu)?;
         }
     }
 
-        Ok(())
+        // Adjusts ranges to be relative to the start of each row group.
+        for ((_, output), (_, input)) in output.iter_mut().zip(input) {
+            let start = input.start;
+            for range in output.iter_mut() {
+                range.start -= start;
+                range.end -= start;
+            }
+        }
+        output.retain(|(_, ranges)| !ranges.is_empty());
+
+        Ok(output)
     }
 
     /// Creates a blob reader from the cached or remote index file.
@@ -159,7 +200,10 @@ impl BloomFilterIndexApplier {
         column_id: ColumnId,
         file_size_hint: Option<u64>,
     ) -> Result<BlobReader> {
-        let reader = match self.cached_blob_reader(file_id, column_id).await {
+        let reader = match self
+            .cached_blob_reader(file_id, column_id, file_size_hint)
+            .await
+        {
             Ok(Some(puffin_reader)) => puffin_reader,
             other => {
                 if let Err(err) = other {
@@ -192,6 +236,7 @@ impl BloomFilterIndexApplier {
         &self,
         file_id: FileId,
         column_id: ColumnId,
+        file_size_hint: Option<u64>,
     ) -> Result<Option<BlobReader>> {
         let Some(file_cache) = &self.file_cache else {
             return Ok(None);
@@ -209,6 +254,7 @@ impl BloomFilterIndexApplier {
             .reader(&puffin_file_name)
             .await
             .context(PuffinBuildReaderSnafu)?
+            .with_file_size_hint(file_size_hint)
             .blob(&Self::column_blob_name(column_id))
             .await
             .context(PuffinReadBlobSnafu)?
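
To make the range bookkeeping in `apply` concrete: row-group lengths become file-absolute
ranges, matched ranges are clipped to each searched row group, and the result is rebased to
row-group-relative coordinates. Below is a minimal standalone sketch of that arithmetic (the
`rebase` helper and its shape are illustrative, not part of the patch):

use std::ops::Range;

/// Hypothetical helper mirroring the input/output bookkeeping of `apply`.
fn rebase(
    row_groups: &[(usize, bool)],
    matched: &[Range<usize>],
) -> Vec<(usize, Vec<Range<usize>>)> {
    // Turn (length, to_search) pairs into (index, file-absolute range) for searched groups.
    let mut start = 0;
    let mut input = Vec::new();
    for (i, &(len, to_search)) in row_groups.iter().enumerate() {
        let end = start + len;
        if to_search {
            input.push((i, start..end));
        }
        start = end;
    }

    // Clip matched ranges to each group, then shift them to group-relative coordinates.
    let mut output = Vec::new();
    for (i, rg) in input {
        let ranges: Vec<_> = matched
            .iter()
            .filter_map(|r| {
                let (s, e) = (r.start.max(rg.start), r.end.min(rg.end));
                (s < e).then(|| s - rg.start..e - rg.start)
            })
            .collect();
        if !ranges.is_empty() {
            output.push((i, ranges));
        }
    }
    output
}

fn main() {
    // Two 5-row groups; rows 3..7 (file-absolute) match some predicate:
    // row group 0 keeps its local rows 3..5, row group 1 keeps 0..2.
    assert_eq!(
        rebase(&[(5, true), (5, true)], &[3..7]),
        vec![(0, vec![3..5]), (1, vec![0..2])]
    );
}

This is also why `apply_filters` in the next hunk can intersect per-predicate results
directly: until the final adjustment, every range lives in the same file-absolute
coordinate space.
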
@@ -253,17 +299,31 @@ impl BloomFilterIndexApplier { &self, reader: R, predicates: &[Predicate], - row_group_metas: &[RowGroupMetaData], - basement: &mut BTreeMap>, + input: &[(usize, Range)], + output: &mut [(usize, Vec>)], ) -> std::result::Result<(), index::bloom_filter::error::Error> { let mut applier = BloomFilterApplier::new(Box::new(reader)).await?; - for predicate in predicates { - match predicate { - Predicate::InList(in_list) => { - applier - .search(&in_list.list, row_group_metas, basement) - .await?; + for ((_, r), (_, output)) in input.iter().zip(output.iter_mut()) { + // All rows are filtered out, skip the search + if output.is_empty() { + continue; + } + + for predicate in predicates { + match predicate { + Predicate::InList(in_list) => { + let res = applier.search(&in_list.list, r.clone()).await?; + if res.is_empty() { + output.clear(); + break; + } + + *output = intersect_ranges(output, &res); + if output.is_empty() { + break; + } + } } } } @@ -272,491 +332,241 @@ impl BloomFilterIndexApplier { } } -fn is_blob_not_found(err: &Error) -> bool { - matches!( - err, - Error::PuffinBuildReader { - source: puffin::error::Error::BlobNotFound { .. }, - .. - } - ) -} - -pub struct BloomFilterIndexApplierBuilder<'a> { - region_dir: String, - object_store: ObjectStore, - metadata: &'a RegionMetadata, - puffin_manager_factory: PuffinManagerFactory, - file_cache: Option, - puffin_metadata_cache: Option, - bloom_filter_index_cache: Option, - output: HashMap>, -} - -impl<'a> BloomFilterIndexApplierBuilder<'a> { - pub fn new( - region_dir: String, - object_store: ObjectStore, - metadata: &'a RegionMetadata, - puffin_manager_factory: PuffinManagerFactory, - ) -> Self { - Self { - region_dir, - object_store, - metadata, - puffin_manager_factory, - file_cache: None, - puffin_metadata_cache: None, - bloom_filter_index_cache: None, - output: HashMap::default(), - } - } - - pub fn with_file_cache(mut self, file_cache: Option) -> Self { - self.file_cache = file_cache; - self - } - - pub fn with_puffin_metadata_cache( - mut self, - puffin_metadata_cache: Option, - ) -> Self { - self.puffin_metadata_cache = puffin_metadata_cache; - self - } - - pub fn with_bloom_filter_index_cache( - mut self, - bloom_filter_index_cache: Option, - ) -> Self { - self.bloom_filter_index_cache = bloom_filter_index_cache; - self - } - - /// Builds the applier with given filter expressions - pub fn build(mut self, exprs: &[Expr]) -> Result> { - for expr in exprs { - self.traverse_and_collect(expr); - } - - if self.output.is_empty() { - return Ok(None); - } - - let applier = BloomFilterIndexApplier::new( - self.region_dir, - self.metadata.region_id, - self.object_store, - self.puffin_manager_factory, - self.output, - ) - .with_file_cache(self.file_cache) - .with_puffin_metadata_cache(self.puffin_metadata_cache) - .with_bloom_filter_cache(self.bloom_filter_index_cache); - - Ok(Some(applier)) - } - - /// Recursively traverses expressions to collect bloom filter predicates - fn traverse_and_collect(&mut self, expr: &Expr) { - let res = match expr { - Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op { - Operator::And => { - self.traverse_and_collect(left); - self.traverse_and_collect(right); - Ok(()) - } - Operator::Eq => self.collect_eq(left, right), - _ => Ok(()), - }, - Expr::InList(in_list) => self.collect_in_list(in_list), - _ => Ok(()), - }; +/// Intersects two lists of ranges and returns the intersection. +/// +/// The input lists are assumed to be sorted and non-overlapping. 
+fn intersect_ranges(lhs: &[Range], rhs: &[Range]) -> Vec> { + let mut i = 0; + let mut j = 0; - if let Err(err) = res { - warn!(err; "Failed to collect bloom filter predicates, ignore it. expr: {expr}"); - } - } - - /// Helper function to get the column id and type - fn column_id_and_type( - &self, - column_name: &str, - ) -> Result> { - let column = self - .metadata - .column_by_name(column_name) - .context(ColumnNotFoundSnafu { - column: column_name, - })?; - - Ok(Some(( - column.column_id, - column.column_schema.data_type.clone(), - ))) - } - - /// Collects an equality expression (column = value) - fn collect_eq(&mut self, left: &Expr, right: &Expr) -> Result<()> { - let (col, lit) = match (left, right) { - (Expr::Column(col), Expr::Literal(lit)) => (col, lit), - (Expr::Literal(lit), Expr::Column(col)) => (col, lit), - _ => return Ok(()), - }; - if lit.is_null() { - return Ok(()); - } - let Some((column_id, data_type)) = self.column_id_and_type(&col.name)? else { - return Ok(()); - }; - let value = encode_lit(lit, data_type)?; - - // Create bloom filter predicate - let mut set = HashSet::new(); - set.insert(value); - let predicate = Predicate::InList(InListPredicate { list: set }); - - // Add to output predicates - self.output.entry(column_id).or_default().push(predicate); - - Ok(()) - } - - /// Collects an in list expression in the form of `column IN (lit, lit, ...)`. - fn collect_in_list(&mut self, in_list: &InList) -> Result<()> { - // Only collect InList predicates if they reference a column - let Expr::Column(column) = &in_list.expr.as_ref() else { - return Ok(()); - }; - if in_list.list.is_empty() || in_list.negated { - return Ok(()); - } + let mut output = Vec::new(); + while i < lhs.len() && j < rhs.len() { + let r1 = &lhs[i]; + let r2 = &rhs[j]; - let Some((column_id, data_type)) = self.column_id_and_type(&column.name)? else { - return Ok(()); - }; + // Find intersection if exists + let start = r1.start.max(r2.start); + let end = r1.end.min(r2.end); - // Convert all non-null literals to predicates - let predicates = in_list - .list - .iter() - .filter_map(Self::nonnull_lit) - .map(|lit| encode_lit(lit, data_type.clone())); - - // Collect successful conversions - let mut valid_predicates = HashSet::new(); - for predicate in predicates { - match predicate { - Ok(p) => { - valid_predicates.insert(p); - } - Err(e) => warn!(e; "Failed to convert value in InList"), - } + if start < end { + output.push(start..end); } - if !valid_predicates.is_empty() { - self.output - .entry(column_id) - .or_default() - .push(Predicate::InList(InListPredicate { - list: valid_predicates, - })); + // Move forward the range that ends first + if r1.end < r2.end { + i += 1; + } else { + j += 1; } - - Ok(()) } - /// Helper function to get non-null literal value - fn nonnull_lit(expr: &Expr) -> Option<&ScalarValue> { - match expr { - Expr::Literal(lit) if !lit.is_null() => Some(lit), - _ => None, - } - } + output } -// TODO(ruihang): extract this and the one under inverted_index into a common util mod. -/// Helper function to encode a literal into bytes. -fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result> { - let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?; - let mut bytes = vec![]; - let field = SortField::new(data_type); - IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)?; - Ok(bytes) +fn is_blob_not_found(err: &Error) -> bool { + matches!( + err, + Error::PuffinBuildReader { + source: puffin::error::Error::BlobNotFound { .. }, + .. 
+ } + ) } #[cfg(test)] mod tests { - use api::v1::SemanticType; - use datafusion_common::Column; - use datatypes::schema::ColumnSchema; - use object_store::services::Memory; - use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder}; - use super::*; - - fn test_region_metadata() -> RegionMetadata { - let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678)); - builder - .push_column_metadata(ColumnMetadata { - column_schema: ColumnSchema::new( - "column1", - ConcreteDataType::string_datatype(), - false, - ), - semantic_type: SemanticType::Tag, - column_id: 1, - }) - .push_column_metadata(ColumnMetadata { - column_schema: ColumnSchema::new( - "column2", - ConcreteDataType::int64_datatype(), - false, - ), - semantic_type: SemanticType::Field, - column_id: 2, - }) - .push_column_metadata(ColumnMetadata { - column_schema: ColumnSchema::new( - "column3", - ConcreteDataType::timestamp_millisecond_datatype(), - false, - ), - semantic_type: SemanticType::Timestamp, - column_id: 3, - }) - .primary_key(vec![1]); - builder.build().unwrap() - } + use datafusion_expr::{col, lit, Expr}; + use futures::future::BoxFuture; + use puffin::puffin_manager::PuffinWriter; + use store_api::metadata::RegionMetadata; - fn test_object_store() -> ObjectStore { - ObjectStore::new(Memory::default()).unwrap().finish() - } + use super::*; + use crate::sst::index::bloom_filter::creator::tests::{ + mock_object_store, mock_region_metadata, new_batch, new_intm_mgr, + }; + use crate::sst::index::bloom_filter::creator::BloomFilterIndexer; - fn column(name: &str) -> Expr { - Expr::Column(Column { - relation: None, - name: name.to_string(), - }) - } + #[allow(clippy::type_complexity)] + fn tester( + region_dir: String, + object_store: ObjectStore, + metadata: &RegionMetadata, + puffin_manager_factory: PuffinManagerFactory, + file_id: FileId, + ) -> impl Fn(&[Expr], Vec<(usize, bool)>) -> BoxFuture<'static, Vec<(usize, Vec>)>> + + use<'_> { + move |exprs, row_groups| { + let region_dir = region_dir.clone(); + let object_store = object_store.clone(); + let metadata = metadata.clone(); + let puffin_manager_factory = puffin_manager_factory.clone(); + let exprs = exprs.to_vec(); + + Box::pin(async move { + let builder = BloomFilterIndexApplierBuilder::new( + region_dir, + object_store, + &metadata, + puffin_manager_factory, + ); - fn string_lit(s: impl Into) -> Expr { - Expr::Literal(ScalarValue::Utf8(Some(s.into()))) + let applier = builder.build(&exprs).unwrap().unwrap(); + applier + .apply(file_id, None, row_groups.into_iter()) + .await + .unwrap() + }) + } } - #[test] - fn test_build_with_exprs() { - let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_exprs_"); - let metadata = test_region_metadata(); - let builder = BloomFilterIndexApplierBuilder::new( - "test".to_string(), - test_object_store(), - &metadata, - factory, + #[tokio::test] + #[allow(clippy::single_range_in_vec_init)] + async fn test_bloom_filter_applier() { + // tag_str: + // - type: string + // - index: bloom filter + // - granularity: 2 + // - column_id: 1 + // + // ts: + // - type: timestamp + // - index: time index + // - column_id: 2 + // + // field_u64: + // - type: uint64 + // - index: bloom filter + // - granularity: 4 + // - column_id: 3 + let region_metadata = mock_region_metadata(); + let prefix = "test_bloom_filter_applier_"; + let object_store = mock_object_store(); + let intm_mgr = new_intm_mgr(prefix).await; + let memory_usage_threshold = Some(1024); + let file_id = FileId::random(); + 
let region_dir = "region_dir".to_string(); + let path = location::index_file_path(®ion_dir, file_id); + + let mut indexer = + BloomFilterIndexer::new(file_id, ®ion_metadata, intm_mgr, memory_usage_threshold) + .unwrap() + .unwrap(); + + // push 20 rows + let batch = new_batch("tag1", 0..10); + indexer.update(&batch).await.unwrap(); + let batch = new_batch("tag2", 10..20); + indexer.update(&batch).await.unwrap(); + + let (_d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await; + let puffin_manager = factory.build(object_store.clone()); + + let mut puffin_writer = puffin_manager.writer(&path).await.unwrap(); + indexer.finish(&mut puffin_writer).await.unwrap(); + puffin_writer.finish().await.unwrap(); + + let tester = tester( + region_dir.clone(), + object_store.clone(), + ®ion_metadata, + factory.clone(), + file_id, ); - let exprs = vec![Expr::BinaryExpr(BinaryExpr { - left: Box::new(column("column1")), - op: Operator::Eq, - right: Box::new(string_lit("value1")), - })]; - - let result = builder.build(&exprs).unwrap(); - assert!(result.is_some()); - - let filters = result.unwrap().filters; - assert_eq!(filters.len(), 1); - - let column_predicates = filters.get(&1).unwrap(); - assert_eq!(column_predicates.len(), 1); - - let expected = encode_lit( - &ScalarValue::Utf8(Some("value1".to_string())), - ConcreteDataType::string_datatype(), + // rows 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19 + // row group: | o row group | o row group | o row group | o row group | + // tag_str: | o pred | x pred | + let res = tester( + &[col("tag_str").eq(lit("tag1"))], + vec![(5, true), (5, true), (5, true), (5, true)], ) - .unwrap(); - match &column_predicates[0] { - Predicate::InList(p) => { - assert_eq!(p.list.iter().next().unwrap(), &expected); - } - } - } - - fn int64_lit(i: i64) -> Expr { - Expr::Literal(ScalarValue::Int64(Some(i))) + .await; + assert_eq!(res, vec![(0, vec![0..5]), (1, vec![0..5])]); + + // rows 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19 + // row group: | o row group | x row group | o row group | o row group | + // tag_str: | o pred | x pred | + let res = tester( + &[col("tag_str").eq(lit("tag1"))], + vec![(5, true), (5, false), (5, true), (5, true)], + ) + .await; + assert_eq!(res, vec![(0, vec![0..5])]); + + // rows 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19 + // row group: | o row group | o row group | o row group | o row group | + // tag_str: | o pred | x pred | + // field_u64: | o pred | x pred | x pred | x pred | x pred | + let res = tester( + &[ + col("tag_str").eq(lit("tag1")), + col("field_u64").eq(lit(1u64)), + ], + vec![(5, true), (5, true), (5, true), (5, true)], + ) + .await; + assert_eq!(res, vec![(0, vec![0..4])]); + + // rows 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19 + // row group: | o row group | o row group | x row group | o row group | + // field_u64: | o pred | x pred | o pred | x pred | x pred | + let res = tester( + &[col("field_u64").in_list(vec![lit(1u64), lit(11u64)], false)], + vec![(5, true), (5, true), (5, false), (5, true)], + ) + .await; + assert_eq!(res, vec![(0, vec![0..4]), (1, vec![3..5])]); } #[test] - fn test_build_with_in_list() { - let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_in_list_"); - let metadata = test_region_metadata(); - let builder = BloomFilterIndexApplierBuilder::new( - "test".to_string(), - test_object_store(), - &metadata, - factory, + #[allow(clippy::single_range_in_vec_init)] + fn 
test_intersect_ranges() { + // empty inputs + assert_eq!(intersect_ranges(&[], &[]), Vec::>::new()); + assert_eq!(intersect_ranges(&[1..5], &[]), Vec::>::new()); + assert_eq!(intersect_ranges(&[], &[1..5]), Vec::>::new()); + + // no overlap + assert_eq!( + intersect_ranges(&[1..3, 5..7], &[3..5, 7..9]), + Vec::>::new() ); - let exprs = vec![Expr::InList(InList { - expr: Box::new(column("column2")), - list: vec![int64_lit(1), int64_lit(2), int64_lit(3)], - negated: false, - })]; + // single overlap + assert_eq!(intersect_ranges(&[1..5], &[3..7]), vec![3..5]); - let result = builder.build(&exprs).unwrap(); - assert!(result.is_some()); - - let filters = result.unwrap().filters; - let column_predicates = filters.get(&2).unwrap(); - assert_eq!(column_predicates.len(), 1); - - match &column_predicates[0] { - Predicate::InList(p) => { - assert_eq!(p.list.len(), 3); - } - } - } - - #[test] - fn test_build_with_and_expressions() { - let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_and_"); - let metadata = test_region_metadata(); - let builder = BloomFilterIndexApplierBuilder::new( - "test".to_string(), - test_object_store(), - &metadata, - factory, + // multiple overlaps + assert_eq!( + intersect_ranges(&[1..5, 7..10, 12..15], &[2..6, 8..13]), + vec![2..5, 8..10, 12..13] ); - let exprs = vec![Expr::BinaryExpr(BinaryExpr { - left: Box::new(Expr::BinaryExpr(BinaryExpr { - left: Box::new(column("column1")), - op: Operator::Eq, - right: Box::new(string_lit("value1")), - })), - op: Operator::And, - right: Box::new(Expr::BinaryExpr(BinaryExpr { - left: Box::new(column("column2")), - op: Operator::Eq, - right: Box::new(int64_lit(42)), - })), - })]; - - let result = builder.build(&exprs).unwrap(); - assert!(result.is_some()); - - let filters = result.unwrap().filters; - assert_eq!(filters.len(), 2); - assert!(filters.contains_key(&1)); - assert!(filters.contains_key(&2)); - } - - #[test] - fn test_build_with_null_values() { - let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_null_"); - let metadata = test_region_metadata(); - let builder = BloomFilterIndexApplierBuilder::new( - "test".to_string(), - test_object_store(), - &metadata, - factory, + // exact overlap + assert_eq!( + intersect_ranges(&[1..3, 5..7], &[1..3, 5..7]), + vec![1..3, 5..7] ); - let exprs = vec![ - Expr::BinaryExpr(BinaryExpr { - left: Box::new(column("column1")), - op: Operator::Eq, - right: Box::new(Expr::Literal(ScalarValue::Utf8(None))), - }), - Expr::InList(InList { - expr: Box::new(column("column2")), - list: vec![ - int64_lit(1), - Expr::Literal(ScalarValue::Int64(None)), - int64_lit(3), - ], - negated: false, - }), - ]; - - let result = builder.build(&exprs).unwrap(); - assert!(result.is_some()); - - let filters = result.unwrap().filters; - assert!(!filters.contains_key(&1)); // Null equality should be ignored - let column2_predicates = filters.get(&2).unwrap(); - match &column2_predicates[0] { - Predicate::InList(p) => { - assert_eq!(p.list.len(), 2); // Only non-null values should be included - } - } - } - - #[test] - fn test_build_with_invalid_expressions() { - let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_invalid_"); - let metadata = test_region_metadata(); - let builder = BloomFilterIndexApplierBuilder::new( - "test".to_string(), - test_object_store(), - &metadata, - factory, + // contained ranges + assert_eq!( + intersect_ranges(&[1..10], &[2..4, 5..7, 8..9]), + vec![2..4, 5..7, 8..9] ); - let exprs = vec![ - // Non-equality operator - 
Expr::BinaryExpr(BinaryExpr { - left: Box::new(column("column1")), - op: Operator::Gt, - right: Box::new(string_lit("value1")), - }), - // Non-existent column - Expr::BinaryExpr(BinaryExpr { - left: Box::new(column("non_existent")), - op: Operator::Eq, - right: Box::new(string_lit("value")), - }), - // Negated IN list - Expr::InList(InList { - expr: Box::new(column("column2")), - list: vec![int64_lit(1), int64_lit(2)], - negated: true, - }), - ]; - - let result = builder.build(&exprs).unwrap(); - assert!(result.is_none()); - } + // partial overlaps + assert_eq!( + intersect_ranges(&[1..4, 6..9], &[2..7, 8..10]), + vec![2..4, 6..7, 8..9] + ); - #[test] - fn test_build_with_multiple_predicates_same_column() { - let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_multiple_"); - let metadata = test_region_metadata(); - let builder = BloomFilterIndexApplierBuilder::new( - "test".to_string(), - test_object_store(), - &metadata, - factory, + // single point overlap + assert_eq!( + intersect_ranges(&[1..3], &[3..5]), + Vec::>::new() ); - let exprs = vec![ - Expr::BinaryExpr(BinaryExpr { - left: Box::new(column("column1")), - op: Operator::Eq, - right: Box::new(string_lit("value1")), - }), - Expr::InList(InList { - expr: Box::new(column("column1")), - list: vec![string_lit("value2"), string_lit("value3")], - negated: false, - }), - ]; - - let result = builder.build(&exprs).unwrap(); - assert!(result.is_some()); - - let filters = result.unwrap().filters; - let column_predicates = filters.get(&1).unwrap(); - assert_eq!(column_predicates.len(), 2); + // large ranges + assert_eq!(intersect_ranges(&[0..100], &[50..150]), vec![50..100]); } } diff --git a/src/mito2/src/sst/index/bloom_filter/applier/builder.rs b/src/mito2/src/sst/index/bloom_filter/applier/builder.rs new file mode 100644 index 000000000000..14b55cb047e6 --- /dev/null +++ b/src/mito2/src/sst/index/bloom_filter/applier/builder.rs @@ -0,0 +1,531 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{HashMap, HashSet}; + +use common_telemetry::warn; +use datafusion_common::ScalarValue; +use datafusion_expr::expr::InList; +use datafusion_expr::{BinaryExpr, Expr, Operator}; +use datatypes::data_type::ConcreteDataType; +use datatypes::value::Value; +use index::bloom_filter::Bytes; +use object_store::ObjectStore; +use puffin::puffin_manager::cache::PuffinMetadataCacheRef; +use snafu::{OptionExt, ResultExt}; +use store_api::metadata::RegionMetadata; +use store_api::storage::ColumnId; + +use crate::cache::file_cache::FileCacheRef; +use crate::cache::index::bloom_filter_index::BloomFilterIndexCacheRef; +use crate::error::{ColumnNotFoundSnafu, ConvertValueSnafu, Result}; +use crate::row_converter::SortField; +use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplier; +use crate::sst::index::codec::IndexValueCodec; +use crate::sst::index::puffin_manager::PuffinManagerFactory; + +/// Enumerates types of predicates for value filtering. 
+#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Predicate { + /// Predicate for matching values in a list. + InList(InListPredicate), +} + +/// `InListPredicate` contains a list of acceptable values. A value needs to match at least +/// one of the elements (logical OR semantic) for the predicate to be satisfied. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct InListPredicate { + /// List of acceptable values. + pub list: HashSet, +} + +pub struct BloomFilterIndexApplierBuilder<'a> { + region_dir: String, + object_store: ObjectStore, + metadata: &'a RegionMetadata, + puffin_manager_factory: PuffinManagerFactory, + file_cache: Option, + puffin_metadata_cache: Option, + bloom_filter_index_cache: Option, + output: HashMap>, +} + +impl<'a> BloomFilterIndexApplierBuilder<'a> { + pub fn new( + region_dir: String, + object_store: ObjectStore, + metadata: &'a RegionMetadata, + puffin_manager_factory: PuffinManagerFactory, + ) -> Self { + Self { + region_dir, + object_store, + metadata, + puffin_manager_factory, + file_cache: None, + puffin_metadata_cache: None, + bloom_filter_index_cache: None, + output: HashMap::default(), + } + } + + pub fn with_file_cache(mut self, file_cache: Option) -> Self { + self.file_cache = file_cache; + self + } + + pub fn with_puffin_metadata_cache( + mut self, + puffin_metadata_cache: Option, + ) -> Self { + self.puffin_metadata_cache = puffin_metadata_cache; + self + } + + pub fn with_bloom_filter_index_cache( + mut self, + bloom_filter_index_cache: Option, + ) -> Self { + self.bloom_filter_index_cache = bloom_filter_index_cache; + self + } + + /// Builds the applier with given filter expressions + pub fn build(mut self, exprs: &[Expr]) -> Result> { + for expr in exprs { + self.traverse_and_collect(expr); + } + + if self.output.is_empty() { + return Ok(None); + } + + let applier = BloomFilterIndexApplier::new( + self.region_dir, + self.metadata.region_id, + self.object_store, + self.puffin_manager_factory, + self.output, + ) + .with_file_cache(self.file_cache) + .with_puffin_metadata_cache(self.puffin_metadata_cache) + .with_bloom_filter_cache(self.bloom_filter_index_cache); + + Ok(Some(applier)) + } + + /// Recursively traverses expressions to collect bloom filter predicates + fn traverse_and_collect(&mut self, expr: &Expr) { + let res = match expr { + Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op { + Operator::And => { + self.traverse_and_collect(left); + self.traverse_and_collect(right); + Ok(()) + } + Operator::Eq => self.collect_eq(left, right), + _ => Ok(()), + }, + Expr::InList(in_list) => self.collect_in_list(in_list), + _ => Ok(()), + }; + + if let Err(err) = res { + warn!(err; "Failed to collect bloom filter predicates, ignore it. expr: {expr}"); + } + } + + /// Helper function to get the column id and type + fn column_id_and_type( + &self, + column_name: &str, + ) -> Result> { + let column = self + .metadata + .column_by_name(column_name) + .context(ColumnNotFoundSnafu { + column: column_name, + })?; + + Ok(Some(( + column.column_id, + column.column_schema.data_type.clone(), + ))) + } + + /// Collects an equality expression (column = value) + fn collect_eq(&mut self, left: &Expr, right: &Expr) -> Result<()> { + let (col, lit) = match (left, right) { + (Expr::Column(col), Expr::Literal(lit)) => (col, lit), + (Expr::Literal(lit), Expr::Column(col)) => (col, lit), + _ => return Ok(()), + }; + if lit.is_null() { + return Ok(()); + } + let Some((column_id, data_type)) = self.column_id_and_type(&col.name)? 
else { + return Ok(()); + }; + let value = encode_lit(lit, data_type)?; + + // Create bloom filter predicate + let mut set = HashSet::new(); + set.insert(value); + let predicate = Predicate::InList(InListPredicate { list: set }); + + // Add to output predicates + self.output.entry(column_id).or_default().push(predicate); + + Ok(()) + } + + /// Collects an in list expression in the form of `column IN (lit, lit, ...)`. + fn collect_in_list(&mut self, in_list: &InList) -> Result<()> { + // Only collect InList predicates if they reference a column + let Expr::Column(column) = &in_list.expr.as_ref() else { + return Ok(()); + }; + if in_list.list.is_empty() || in_list.negated { + return Ok(()); + } + + let Some((column_id, data_type)) = self.column_id_and_type(&column.name)? else { + return Ok(()); + }; + + // Convert all non-null literals to predicates + let predicates = in_list + .list + .iter() + .filter_map(Self::nonnull_lit) + .map(|lit| encode_lit(lit, data_type.clone())); + + // Collect successful conversions + let mut valid_predicates = HashSet::new(); + for predicate in predicates { + match predicate { + Ok(p) => { + valid_predicates.insert(p); + } + Err(e) => warn!(e; "Failed to convert value in InList"), + } + } + + if !valid_predicates.is_empty() { + self.output + .entry(column_id) + .or_default() + .push(Predicate::InList(InListPredicate { + list: valid_predicates, + })); + } + + Ok(()) + } + + /// Helper function to get non-null literal value + fn nonnull_lit(expr: &Expr) -> Option<&ScalarValue> { + match expr { + Expr::Literal(lit) if !lit.is_null() => Some(lit), + _ => None, + } + } +} + +// TODO(ruihang): extract this and the one under inverted_index into a common util mod. +/// Helper function to encode a literal into bytes. +fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result> { + let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?; + let mut bytes = vec![]; + let field = SortField::new(data_type); + IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)?; + Ok(bytes) +} + +#[cfg(test)] +mod tests { + use api::v1::SemanticType; + use datafusion_common::Column; + use datatypes::schema::ColumnSchema; + use object_store::services::Memory; + use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder}; + use store_api::storage::RegionId; + + use super::*; + + fn test_region_metadata() -> RegionMetadata { + let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "column1", + ConcreteDataType::string_datatype(), + false, + ), + semantic_type: SemanticType::Tag, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "column2", + ConcreteDataType::int64_datatype(), + false, + ), + semantic_type: SemanticType::Field, + column_id: 2, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "column3", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 3, + }) + .primary_key(vec![1]); + builder.build().unwrap() + } + + fn test_object_store() -> ObjectStore { + ObjectStore::new(Memory::default()).unwrap().finish() + } + + fn column(name: &str) -> Expr { + Expr::Column(Column { + relation: None, + name: name.to_string(), + }) + } + + fn string_lit(s: impl Into) -> Expr { + Expr::Literal(ScalarValue::Utf8(Some(s.into()))) + } + + #[test] + fn test_build_with_exprs() 
{ + let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_exprs_"); + let metadata = test_region_metadata(); + let builder = BloomFilterIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + &metadata, + factory, + ); + + let exprs = vec![Expr::BinaryExpr(BinaryExpr { + left: Box::new(column("column1")), + op: Operator::Eq, + right: Box::new(string_lit("value1")), + })]; + + let result = builder.build(&exprs).unwrap(); + assert!(result.is_some()); + + let filters = result.unwrap().filters; + assert_eq!(filters.len(), 1); + + let column_predicates = filters.get(&1).unwrap(); + assert_eq!(column_predicates.len(), 1); + + let expected = encode_lit( + &ScalarValue::Utf8(Some("value1".to_string())), + ConcreteDataType::string_datatype(), + ) + .unwrap(); + match &column_predicates[0] { + Predicate::InList(p) => { + assert_eq!(p.list.iter().next().unwrap(), &expected); + } + } + } + + fn int64_lit(i: i64) -> Expr { + Expr::Literal(ScalarValue::Int64(Some(i))) + } + + #[test] + fn test_build_with_in_list() { + let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_in_list_"); + let metadata = test_region_metadata(); + let builder = BloomFilterIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + &metadata, + factory, + ); + + let exprs = vec![Expr::InList(InList { + expr: Box::new(column("column2")), + list: vec![int64_lit(1), int64_lit(2), int64_lit(3)], + negated: false, + })]; + + let result = builder.build(&exprs).unwrap(); + assert!(result.is_some()); + + let filters = result.unwrap().filters; + let column_predicates = filters.get(&2).unwrap(); + assert_eq!(column_predicates.len(), 1); + + match &column_predicates[0] { + Predicate::InList(p) => { + assert_eq!(p.list.len(), 3); + } + } + } + + #[test] + fn test_build_with_and_expressions() { + let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_and_"); + let metadata = test_region_metadata(); + let builder = BloomFilterIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + &metadata, + factory, + ); + + let exprs = vec![Expr::BinaryExpr(BinaryExpr { + left: Box::new(Expr::BinaryExpr(BinaryExpr { + left: Box::new(column("column1")), + op: Operator::Eq, + right: Box::new(string_lit("value1")), + })), + op: Operator::And, + right: Box::new(Expr::BinaryExpr(BinaryExpr { + left: Box::new(column("column2")), + op: Operator::Eq, + right: Box::new(int64_lit(42)), + })), + })]; + + let result = builder.build(&exprs).unwrap(); + assert!(result.is_some()); + + let filters = result.unwrap().filters; + assert_eq!(filters.len(), 2); + assert!(filters.contains_key(&1)); + assert!(filters.contains_key(&2)); + } + + #[test] + fn test_build_with_null_values() { + let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_null_"); + let metadata = test_region_metadata(); + let builder = BloomFilterIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + &metadata, + factory, + ); + + let exprs = vec![ + Expr::BinaryExpr(BinaryExpr { + left: Box::new(column("column1")), + op: Operator::Eq, + right: Box::new(Expr::Literal(ScalarValue::Utf8(None))), + }), + Expr::InList(InList { + expr: Box::new(column("column2")), + list: vec![ + int64_lit(1), + Expr::Literal(ScalarValue::Int64(None)), + int64_lit(3), + ], + negated: false, + }), + ]; + + let result = builder.build(&exprs).unwrap(); + assert!(result.is_some()); + + let filters = result.unwrap().filters; + assert!(!filters.contains_key(&1)); // Null 
equality should be ignored + let column2_predicates = filters.get(&2).unwrap(); + match &column2_predicates[0] { + Predicate::InList(p) => { + assert_eq!(p.list.len(), 2); // Only non-null values should be included + } + } + } + + #[test] + fn test_build_with_invalid_expressions() { + let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_invalid_"); + let metadata = test_region_metadata(); + let builder = BloomFilterIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + &metadata, + factory, + ); + + let exprs = vec![ + // Non-equality operator + Expr::BinaryExpr(BinaryExpr { + left: Box::new(column("column1")), + op: Operator::Gt, + right: Box::new(string_lit("value1")), + }), + // Non-existent column + Expr::BinaryExpr(BinaryExpr { + left: Box::new(column("non_existent")), + op: Operator::Eq, + right: Box::new(string_lit("value")), + }), + // Negated IN list + Expr::InList(InList { + expr: Box::new(column("column2")), + list: vec![int64_lit(1), int64_lit(2)], + negated: true, + }), + ]; + + let result = builder.build(&exprs).unwrap(); + assert!(result.is_none()); + } + + #[test] + fn test_build_with_multiple_predicates_same_column() { + let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_multiple_"); + let metadata = test_region_metadata(); + let builder = BloomFilterIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + &metadata, + factory, + ); + + let exprs = vec![ + Expr::BinaryExpr(BinaryExpr { + left: Box::new(column("column1")), + op: Operator::Eq, + right: Box::new(string_lit("value1")), + }), + Expr::InList(InList { + expr: Box::new(column("column1")), + list: vec![string_lit("value2"), string_lit("value3")], + negated: false, + }), + ]; + + let result = builder.build(&exprs).unwrap(); + assert!(result.is_some()); + + let filters = result.unwrap().filters; + let column_predicates = filters.get(&1).unwrap(); + assert_eq!(column_predicates.len(), 2); + } +} diff --git a/src/mito2/src/sst/index/bloom_filter/creator.rs b/src/mito2/src/sst/index/bloom_filter/creator.rs index d1b73a0bde25..18c4d6dc8cba 100644 --- a/src/mito2/src/sst/index/bloom_filter/creator.rs +++ b/src/mito2/src/sst/index/bloom_filter/creator.rs @@ -321,7 +321,7 @@ impl BloomFilterIndexer { } #[cfg(test)] -mod tests { +pub(crate) mod tests { use std::iter; use api::v1::SemanticType; @@ -341,11 +341,11 @@ mod tests { use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; use crate::sst::index::puffin_manager::PuffinManagerFactory; - fn mock_object_store() -> ObjectStore { + pub fn mock_object_store() -> ObjectStore { ObjectStore::new(Memory::default()).unwrap().finish() } - async fn new_intm_mgr(path: impl AsRef) -> IntermediateManager { + pub async fn new_intm_mgr(path: impl AsRef) -> IntermediateManager { IntermediateManager::init_fs(path).await.unwrap() } @@ -365,7 +365,7 @@ mod tests { /// - index: bloom filter /// - granularity: 4 /// - column_id: 3 - fn mock_region_metadata() -> RegionMetadataRef { + pub fn mock_region_metadata() -> RegionMetadataRef { let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2)); builder .push_column_metadata(ColumnMetadata { @@ -410,7 +410,7 @@ mod tests { Arc::new(builder.build().unwrap()) } - fn new_batch(str_tag: impl AsRef, u64_field: impl IntoIterator) -> Batch { + pub fn new_batch(str_tag: impl AsRef, u64_field: impl IntoIterator) -> Batch { let fields = vec![SortField::new(ConcreteDataType::string_datatype())]; let codec = McmpRowCodec::new(fields); let row: [ValueRef; 
1] = [str_tag.as_ref().into()]; diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 579f80a433cc..f15f2759df0e 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -570,6 +570,66 @@ impl ParquetReaderBuilder { true } + async fn prune_row_groups_by_bloom_filter( + &self, + parquet_meta: &ParquetMetaData, + output: &mut BTreeMap>, + metrics: &mut ReaderFilterMetrics, + ) -> bool { + let Some(index_applier) = &self.bloom_filter_index_applier else { + return false; + }; + + if !self.file_handle.meta_ref().bloom_filter_index_available() { + return false; + } + + let file_size_hint = self.file_handle.meta_ref().bloom_filter_index_size(); + let apply_output = match index_applier + .apply( + self.file_handle.file_id(), + file_size_hint, + parquet_meta + .row_groups() + .iter() + .enumerate() + .map(|(i, rg)| (rg.num_rows() as usize, output.contains_key(&i))), + ) + .await + { + Ok(apply_output) => apply_output, + Err(err) => { + if cfg!(any(test, feature = "test")) { + panic!( + "Failed to apply bloom filter index, region_id: {}, file_id: {}, err: {:?}", + self.file_handle.region_id(), + self.file_handle.file_id(), + err + ); + } else { + warn!( + err; "Failed to apply bloom filter index, region_id: {}, file_id: {}", + self.file_handle.region_id(), self.file_handle.file_id() + ); + } + + return false; + } + }; + + Self::prune_row_groups_by_ranges( + parquet_meta, + apply_output + .into_iter() + .map(|(rg, ranges)| (rg, ranges.into_iter())), + output, + &mut metrics.rg_bloom_filtered, + &mut metrics.rows_bloom_filtered, + ); + + true + } + /// Prunes row groups by rows. The `rows_in_row_groups` is like a map from row group to /// a list of row ids to keep. fn prune_row_groups_by_rows( @@ -623,56 +683,6 @@ impl ParquetReaderBuilder { *output = res; } - async fn prune_row_groups_by_bloom_filter( - &self, - parquet_meta: &ParquetMetaData, - output: &mut BTreeMap>, - metrics: &mut ReaderFilterMetrics, - ) -> bool { - let Some(index_applier) = &self.bloom_filter_index_applier else { - return false; - }; - - if !self.file_handle.meta_ref().bloom_filter_index_available() { - return false; - } - - let before_rg = output.len(); - - let file_size_hint = self.file_handle.meta_ref().bloom_filter_index_size(); - if let Err(err) = index_applier - .apply( - self.file_handle.file_id(), - file_size_hint, - parquet_meta.row_groups(), - output, - ) - .await - { - if cfg!(any(test, feature = "test")) { - panic!( - "Failed to apply bloom filter index, region_id: {}, file_id: {}, err: {:?}", - self.file_handle.region_id(), - self.file_handle.file_id(), - err - ); - } else { - warn!( - err; "Failed to apply bloom filter index, region_id: {}, file_id: {}", - self.file_handle.region_id(), self.file_handle.file_id() - ); - } - - return false; - }; - - let after_rg = output.len(); - // Update metrics. - metrics.rg_bloom_filtered += before_rg - after_rg; - - true - } - /// Prunes row groups by ranges. The `ranges_in_row_groups` is like a map from row group to /// a list of row ranges to keep. fn prune_row_groups_by_ranges( diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs index 63c3fc09d621..ecce870d851e 100644 --- a/src/mito2/src/test_util/sst_util.rs +++ b/src/mito2/src/test_util/sst_util.rs @@ -14,7 +14,6 @@ //! Utilities for testing SSTs. 
-use std::num::NonZeroU64; use std::sync::Arc; use api::v1::{OpType, SemanticType}; diff --git a/src/mito2/src/test_util/version_util.rs b/src/mito2/src/test_util/version_util.rs index 68534d34eeb8..9b98fbf026f4 100644 --- a/src/mito2/src/test_util/version_util.rs +++ b/src/mito2/src/test_util/version_util.rs @@ -15,7 +15,6 @@ //! Utilities to mock version. use std::collections::HashMap; -use std::num::NonZeroU64; use std::sync::Arc; use api::v1::value::ValueData; From 72b299eabde054ccd094f63d94c6fc6a1f67d854 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Mon, 6 Jan 2025 07:02:45 +0000 Subject: [PATCH 2/3] add case Signed-off-by: Zhenchi --- src/index/src/bloom_filter/applier.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/index/src/bloom_filter/applier.rs b/src/index/src/bloom_filter/applier.rs index 47fc9926318f..2fd7126b3860 100644 --- a/src/index/src/bloom_filter/applier.rs +++ b/src/index/src/bloom_filter/applier.rs @@ -116,6 +116,11 @@ mod tests { 0..12, vec![0..8], ), // search multiple rows in multiple ranges + ( + vec![b"row01".to_vec(), b"row11".to_vec()], + 0..12, + vec![0..4, 8..12], + ), // search multiple rows in multiple ranges (vec![b"row99".to_vec()], 0..12, vec![]), // search for a row that doesn't exist in the full range (vec![b"row00".to_vec()], 12..12, vec![]), // search in an empty range ( From b2484294b65cfff2715a765148954a3ae204b222 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Mon, 6 Jan 2025 09:04:22 +0000 Subject: [PATCH 3/3] address TODO Signed-off-by: Zhenchi --- src/index/src/bloom_filter/reader.rs | 2 +- .../src/cache/index/bloom_filter_index.rs | 20 ++++++++++++++++--- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/src/index/src/bloom_filter/reader.rs b/src/index/src/bloom_filter/reader.rs index 4a49d321ecd5..2a52f9d0711b 100644 --- a/src/index/src/bloom_filter/reader.rs +++ b/src/index/src/bloom_filter/reader.rs @@ -65,7 +65,7 @@ pub trait BloomFilterReader: Sync { } async fn bloom_filter_vec( - &mut self, + &self, locs: &[BloomFilterSegmentLocation], ) -> Result> { let ranges = locs diff --git a/src/mito2/src/cache/index/bloom_filter_index.rs b/src/mito2/src/cache/index/bloom_filter_index.rs index 1ba2fa405996..1d600336a4b6 100644 --- a/src/mito2/src/cache/index/bloom_filter_index.rs +++ b/src/mito2/src/cache/index/bloom_filter_index.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use async_trait::async_trait; use bytes::Bytes; +use futures::future::try_join_all; use index::bloom_filter::error::Result; use index::bloom_filter::reader::BloomFilterReader; use index::bloom_filter::BloomFilterMeta; @@ -102,9 +103,22 @@ impl BloomFilterReader for CachedBloomFilterIndexBl .map(|b| b.into()) } - async fn read_vec(&mut self, ranges: &[Range]) -> Result> { - // TODO(zhongzc): wait for #5276 to make it utilize cache. - Ok(self.inner.read_vec(ranges).await?) + async fn read_vec(&self, ranges: &[Range]) -> Result> { + let fetch = ranges.iter().map(|range| { + let inner = &self.inner; + self.cache.get_or_load( + (self.file_id, self.column_id), + self.blob_size, + range.start, + (range.end - range.start) as u32, + move |ranges| async move { inner.read_vec(&ranges).await }, + ) + }); + Ok(try_join_all(fetch) + .await? + .into_iter() + .map(Bytes::from) + .collect::>()) } /// Reads the meta information of the bloom filter.
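
Taken together, the three patches replace row-group-level pruning with segment-precision row
ranges. The core of the new semantics is the segment arithmetic in `search`: a probe hit in
segment `i` may contain rows `i * rows_per_segment .. (i + 1) * rows_per_segment`, clamped to
the search range, with adjacent hits merged into one range. A minimal standalone sketch of
that math (`hit_segments` stands in for real bloom filter probe results; the function name is
illustrative):

use std::ops::Range;

fn matched_ranges(
    rows_per_segment: usize,
    hit_segments: &[usize],
    search_range: Range<usize>,
) -> Vec<Range<usize>> {
    // Only segments overlapping the search range are considered.
    let start_seg = search_range.start / rows_per_segment;
    let end_seg = search_range.end.div_ceil(rows_per_segment);

    let mut ranges: Vec<Range<usize>> = Vec::new();
    for seg in start_seg..end_seg {
        if !hit_segments.contains(&seg) {
            continue;
        }
        // A hit covers the whole segment, clamped to the search range.
        let start = (seg * rows_per_segment).max(search_range.start);
        let end = ((seg + 1) * rows_per_segment).min(search_range.end);
        match ranges.last_mut() {
            // Merge with the previous range when they touch.
            Some(last) if last.end == start => last.end = end,
            _ => ranges.push(start..end),
        }
    }
    ranges
}

fn main() {
    // 4 rows per segment, hits in segments 0 and 1, searching rows 2..12:
    // segment 0 is clamped to 2..4 and merges with segment 1 into 2..8.
    assert_eq!(matched_ranges(4, &[0, 1], 2..12), vec![2..8]);
    // A hit in the last overlapping segment is kept as its own range.
    assert_eq!(matched_ranges(4, &[2], 2..12), vec![8..12]);
}

This mirrors the applier test cases above (for example, probing `b"overl"` over `2..12`
yields `2..8`), and it is what lets `prune_row_groups_by_bloom_filter` keep sub-row-group row
ranges instead of discarding or keeping whole row groups.
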