From c09ffa3f39754fc93e0c6ce667369a5c2fe2bc10 Mon Sep 17 00:00:00 2001 From: zaidoon Date: Mon, 1 Sep 2025 09:02:56 -0400 Subject: [PATCH 1/2] Add prefix bloom filter support --- benches/run_reader.rs | 267 +++ src/abstract.rs | 4 +- src/blob_tree/mod.rs | 4 +- src/compaction/worker.rs | 1 + src/config.rs | 34 +- src/lib.rs | 3 + src/metrics.rs | 27 +- src/multi_reader.rs | 12 +- src/prefix.rs | 300 +++ src/range.rs | 14 +- src/run_reader.rs | 240 ++- src/segment/filter/blocked_bloom/mod.rs | 23 + src/segment/filter/standard_bloom/mod.rs | 23 + src/segment/inner.rs | 6 +- src/segment/mod.rs | 248 ++- src/segment/writer/mod.rs | 24 +- src/tree/ingest.rs | 1 + src/tree/mod.rs | 12 +- tests/mvcc_slab.rs | 4 +- tests/prefix_filter.rs | 2211 ++++++++++++++++++++++ 20 files changed, 3389 insertions(+), 69 deletions(-) create mode 100644 benches/run_reader.rs create mode 100644 src/prefix.rs create mode 100644 tests/prefix_filter.rs diff --git a/benches/run_reader.rs b/benches/run_reader.rs new file mode 100644 index 00000000..6f15d6ac --- /dev/null +++ b/benches/run_reader.rs @@ -0,0 +1,267 @@ +use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion}; +use lsm_tree::prefix::FixedPrefixExtractor; +use lsm_tree::{AbstractTree, Config}; +use std::sync::Arc; +use std::time::Instant; +use tempfile::TempDir; + +fn create_tree_with_segments( + segment_count: usize, + with_prefix_extractor: bool, +) -> (TempDir, lsm_tree::Tree) { + let tempdir = tempfile::tempdir().unwrap(); + + let mut config = Config::new(&tempdir); + if with_prefix_extractor { + config = config.prefix_extractor(Arc::new(FixedPrefixExtractor::new(8))); + } + + let tree = config.open().unwrap(); + + // Create segments with distinct prefixes + for segment_idx in 0..segment_count { + let prefix = format!("seg{:04}", segment_idx); + + // Add 100 keys per segment + for key_idx in 0..100 { + let key = format!("{}_{:04}", prefix, key_idx); + tree.insert(key.as_bytes(), vec![0u8; 100], 0); + } + + // Flush to create a segment + tree.flush_active_memtable(0).unwrap(); + } + + (tempdir, tree) +} + +fn benchmark_range_query(c: &mut Criterion) { + let mut group = c.benchmark_group("range_query"); + + // Test different segment counts + for segment_count in [10, 100, 500, 1000] { + // Benchmark without prefix extractor + group.bench_with_input( + BenchmarkId::new("no_prefix", segment_count), + &segment_count, + |b, &count| { + let (_tempdir, tree) = create_tree_with_segments(count, false); + + b.iter(|| { + // Query for a range that doesn't exist + let start: &[u8] = b"zzz_0000"; + let end: &[u8] = b"zzz_9999"; + let iter = tree.range(start..=end, 0, None); + // Force evaluation by counting + let count = iter.count(); + black_box(count); + }); + }, + ); + + // Benchmark with prefix extractor + group.bench_with_input( + BenchmarkId::new("with_prefix", segment_count), + &segment_count, + |b, &count| { + let (_tempdir, tree) = create_tree_with_segments(count, true); + + b.iter(|| { + // Query for a range that doesn't exist (will check filters) + let start: &[u8] = b"zzz_0000"; + let end: &[u8] = b"zzz_9999"; + let iter = tree.range(start..=end, 0, None); + // Force evaluation by counting + let count = iter.count(); + black_box(count); + }); + }, + ); + + // Benchmark with prefix extractor - existing prefix + group.bench_with_input( + BenchmarkId::new("with_prefix_exists", segment_count), + &segment_count, + |b, &count| { + let (_tempdir, tree) = create_tree_with_segments(count, true); + + b.iter(|| { + // Query for a range 
that exists in the middle + let mid = count / 2; + let prefix = format!("seg{:04}", mid); + let start_str = format!("{}_0000", prefix); + let end_str = format!("{}_0099", prefix); + let start: &[u8] = start_str.as_bytes(); + let end: &[u8] = end_str.as_bytes(); + let iter = tree.range(start..=end, 0, None); + // Force evaluation by counting + let count = iter.count(); + black_box(count); + }); + }, + ); + } + + group.finish(); +} + +fn benchmark_timing_comparison(_c: &mut Criterion) { + println!("\n=== RunReader Performance Benchmark ==="); + println!("Testing impact of prefix filter checks on large runs\n"); + + for segment_count in [100, 500, 1000] { + println!("\n--- Testing with {} segments ---", segment_count); + + // Test without prefix extractor + let (_tempdir_no_prefix, tree_no_prefix) = create_tree_with_segments(segment_count, false); + + let start = Instant::now(); + for _ in 0..100 { + let start_key: &[u8] = b"zzz_0000"; + let end_key: &[u8] = b"zzz_9999"; + let iter = tree_no_prefix.range(start_key..=end_key, 0, None); + let _ = iter.count(); + } + let no_prefix_time = start.elapsed(); + let avg_no_prefix = no_prefix_time.as_nanos() / 100; + + println!(" Without prefix extractor: {:>8} ns/query", avg_no_prefix); + + // Test with prefix extractor + let (_tempdir_with_prefix, tree_with_prefix) = + create_tree_with_segments(segment_count, true); + + let start = Instant::now(); + for _ in 0..100 { + let start_key: &[u8] = b"zzz_0000"; + let end_key: &[u8] = b"zzz_9999"; + let iter = tree_with_prefix.range(start_key..=end_key, 0, None); + let _ = iter.count(); + } + let with_prefix_time = start.elapsed(); + let avg_with_prefix = with_prefix_time.as_nanos() / 100; + + println!( + " With prefix extractor: {:>8} ns/query", + avg_with_prefix + ); + + if avg_with_prefix > avg_no_prefix { + let overhead = avg_with_prefix - avg_no_prefix; + println!( + " Overhead: {} ns ({:.1}%)", + overhead, + (overhead as f64 / avg_no_prefix as f64) * 100.0 + ); + } else { + let savings = avg_no_prefix - avg_with_prefix; + println!( + " Savings: {} ns ({:.1}%)", + savings, + (savings as f64 / avg_no_prefix as f64) * 100.0 + ); + } + + // Check CPU cost per segment + if segment_count > 0 { + let per_segment_overhead = if avg_with_prefix > avg_no_prefix { + (avg_with_prefix - avg_no_prefix) / segment_count as u128 + } else { + 0 + }; + println!(" Per-segment overhead: ~{} ns", per_segment_overhead); + } + } + + println!("\n=== Summary ==="); + println!("MAX_UPFRONT_CHECKS optimization limits overhead to checking at most 10 segments."); + println!( + "For runs with >10 segments, remaining segments are filtered lazily during iteration.\n" + ); +} + +fn run_timing_benchmark() { + println!("\n=== RunReader Performance Benchmark ==="); + println!("Testing impact of prefix filter checks on large runs\n"); + + for segment_count in [100, 500, 1000] { + println!("\n--- Testing with {} segments ---", segment_count); + + // Test without prefix extractor + let (_tempdir_no_prefix, tree_no_prefix) = create_tree_with_segments(segment_count, false); + + let start = Instant::now(); + for _ in 0..100 { + let start_key: &[u8] = b"zzz_0000"; + let end_key: &[u8] = b"zzz_9999"; + let iter = tree_no_prefix.range(start_key..=end_key, 0, None); + let _ = iter.count(); + } + let no_prefix_time = start.elapsed(); + let avg_no_prefix = no_prefix_time.as_nanos() / 100; + + println!(" Without prefix extractor: {:>8} ns/query", avg_no_prefix); + + // Test with prefix extractor + let (_tempdir_with_prefix, tree_with_prefix) = + 
create_tree_with_segments(segment_count, true); + + let start = Instant::now(); + for _ in 0..100 { + let start_key: &[u8] = b"zzz_0000"; + let end_key: &[u8] = b"zzz_9999"; + let iter = tree_with_prefix.range(start_key..=end_key, 0, None); + let _ = iter.count(); + } + let with_prefix_time = start.elapsed(); + let avg_with_prefix = with_prefix_time.as_nanos() / 100; + + println!( + " With prefix extractor: {:>8} ns/query", + avg_with_prefix + ); + + if avg_with_prefix > avg_no_prefix { + let overhead = avg_with_prefix - avg_no_prefix; + println!( + " Overhead: {} ns ({:.1}%)", + overhead, + (overhead as f64 / avg_no_prefix as f64) * 100.0 + ); + } else { + let savings = avg_no_prefix - avg_with_prefix; + println!( + " Savings: {} ns ({:.1}%)", + savings, + (savings as f64 / avg_no_prefix as f64) * 100.0 + ); + } + + // Check CPU cost per segment + if segment_count > 0 { + let per_segment_overhead = if avg_with_prefix > avg_no_prefix { + (avg_with_prefix - avg_no_prefix) / segment_count as u128 + } else { + 0 + }; + println!(" Per-segment overhead: ~{} ns", per_segment_overhead); + } + } + + println!("\n=== Summary ==="); + println!("MAX_UPFRONT_CHECKS optimization limits overhead to checking at most 10 segments."); + println!( + "For runs with >10 segments, remaining segments are filtered lazily during iteration.\n" + ); +} + +fn benchmark_all(c: &mut Criterion) { + // Run standard benchmarks + benchmark_range_query(c); + + // Run the detailed timing comparison + run_timing_benchmark(); +} + +criterion_group!(benches, benchmark_range_query); +criterion_main!(benches); diff --git a/src/abstract.rs b/src/abstract.rs index 8b35c36e..22a6871c 100644 --- a/src/abstract.rs +++ b/src/abstract.rs @@ -85,8 +85,8 @@ pub trait AbstractTree { /// Will return `Err` if an IO error occurs. fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()>; - /// Gets the memory usage of all pinned bloom filters in the tree. - fn pinned_bloom_filter_size(&self) -> usize; + /// Gets the memory usage of all pinned filters in the tree. + fn pinned_filter_size(&self) -> usize; /// Gets the memory usage of all pinned index blocks in the tree. 
fn pinned_block_index_size(&self) -> usize; diff --git a/src/blob_tree/mod.rs b/src/blob_tree/mod.rs index a42c7cca..26d1076b 100644 --- a/src/blob_tree/mod.rs +++ b/src/blob_tree/mod.rs @@ -395,8 +395,8 @@ impl AbstractTree for BlobTree { })) } - fn pinned_bloom_filter_size(&self) -> usize { - self.index.pinned_bloom_filter_size() + fn pinned_filter_size(&self) -> usize { + self.index.pinned_filter_size() } fn pinned_block_index_size(&self) -> usize { diff --git a/src/compaction/worker.rs b/src/compaction/worker.rs index c01c563f..09cb5d9b 100644 --- a/src/compaction/worker.rs +++ b/src/compaction/worker.rs @@ -364,6 +364,7 @@ fn merge_segments( opts.tree_id, opts.config.cache.clone(), opts.config.descriptor_table.clone(), + opts.config.prefix_extractor.clone(), payload.dest_level <= 1, // TODO: look at configuration payload.dest_level <= 2, // TODO: look at configuration #[cfg(feature = "metrics")] diff --git a/src/config.rs b/src/config.rs index a39d1200..6ff615a0 100644 --- a/src/config.rs +++ b/src/config.rs @@ -2,7 +2,10 @@ // This source code is licensed under both the Apache 2.0 and MIT License // (found in the LICENSE-* files in the repository) -use crate::{path::absolute_path, BlobTree, Cache, CompressionType, DescriptorTable, Tree}; +use crate::{ + path::absolute_path, prefix::SharedPrefixExtractor, BlobTree, Cache, CompressionType, + DescriptorTable, Tree, +}; use std::{ path::{Path, PathBuf}, sync::Arc, @@ -94,6 +97,10 @@ pub struct Config { /// Descriptor table to use #[doc(hidden)] pub descriptor_table: Arc, + + /// Prefix extractor for filters + #[doc(hidden)] + pub prefix_extractor: Option, } impl Default for Config { @@ -115,6 +122,7 @@ impl Default for Config { compression: CompressionType::None, blob_compression: CompressionType::None, bloom_bits_per_key: 10, + prefix_extractor: None, blob_file_target_size: /* 64 MiB */ 64 * 1_024 * 1_024, blob_file_separation_threshold: /* 4 KiB */ 4 * 1_024, @@ -312,6 +320,30 @@ impl Config { self } + /// Sets the prefix extractor for filters. + /// + /// A prefix extractor allows filters to index prefixes of keys + /// instead of (or in addition to) the full keys. This enables efficient + /// filtering for prefix-based queries. + /// + /// # Example + /// + /// ``` + /// # use lsm_tree::Config; + /// use lsm_tree::prefix::FixedPrefixExtractor; + /// use std::sync::Arc; + /// + /// # let path = tempfile::tempdir()?; + /// let config = Config::new(path) + /// .prefix_extractor(Arc::new(FixedPrefixExtractor::new(8))); + /// # Ok::<(), Box>(()) + /// ``` + #[must_use] + pub fn prefix_extractor(mut self, extractor: SharedPrefixExtractor) -> Self { + self.prefix_extractor = Some(extractor); + self + } + /// Opens a tree using the config. 
/// /// # Errors diff --git a/src/lib.rs b/src/lib.rs index 0411c38a..99e123e0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -182,6 +182,9 @@ pub mod mvcc_stream; mod path; +/// Prefix extraction for filters +pub mod prefix; + #[doc(hidden)] pub mod range; diff --git a/src/metrics.rs b/src/metrics.rs index 5c56fc90..53dda3c8 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -13,11 +13,11 @@ pub struct Metrics { /// Number of blocks that were read from block cache pub(crate) block_load_cached: AtomicUsize, - /// Number of bloom filter queries that were performed - pub(crate) bloom_filter_queries: AtomicUsize, + /// Number of filter queries that were performed + pub(crate) filter_queries: AtomicUsize, - /// Number of IOs that were skipped due to bloom filter hits - pub(crate) bloom_filter_hits: AtomicUsize, + /// Number of IOs that were skipped due to filter + pub(crate) io_skipped_by_filter: AtomicUsize, } #[allow(clippy::cast_precision_loss)] @@ -40,9 +40,20 @@ impl Metrics { } /// Filter efficiency in percent (0.0 - 1.0). - pub fn bloom_filter_efficiency(&self) -> f64 { - let queries = self.bloom_filter_queries.load(Relaxed) as f64; - let hits = self.bloom_filter_hits.load(Relaxed) as f64; - hits / queries + /// Represents the ratio of I/O operations avoided due to filter. + pub fn filter_efficiency(&self) -> f64 { + let queries = self.filter_queries.load(Relaxed) as f64; + let io_skipped = self.io_skipped_by_filter.load(Relaxed) as f64; + io_skipped / queries + } + + /// Number of filter queries performed. + pub fn filter_queries(&self) -> usize { + self.filter_queries.load(Relaxed) + } + + /// Number of I/O operations skipped by filter. + pub fn io_skipped_by_filter(&self) -> usize { + self.io_skipped_by_filter.load(Relaxed) } } diff --git a/src/multi_reader.rs b/src/multi_reader.rs index 9ce0fe5a..2ff363d2 100644 --- a/src/multi_reader.rs +++ b/src/multi_reader.rs @@ -89,7 +89,9 @@ mod tests { let mut readers: VecDeque<_> = VecDeque::new(); for segment in &segments { - readers.push_back(segment.iter()); + if let Some(iter) = segment.iter() { + readers.push_back(iter); + } } let multi_reader = MultiReader::new(readers); @@ -115,7 +117,9 @@ mod tests { let mut readers: VecDeque<_> = VecDeque::new(); for segment in &segments { - readers.push_back(segment.iter()); + if let Some(iter) = segment.iter() { + readers.push_back(iter); + } } let multi_reader = MultiReader::new(readers); @@ -141,7 +145,9 @@ mod tests { let mut readers: VecDeque<_> = VecDeque::new(); for segment in &segments { - readers.push_back(segment.iter()); + if let Some(iter) = segment.iter() { + readers.push_back(iter); + } } let multi_reader = MultiReader::new(readers); diff --git a/src/prefix.rs b/src/prefix.rs new file mode 100644 index 00000000..11fc6aca --- /dev/null +++ b/src/prefix.rs @@ -0,0 +1,300 @@ +// Copyright (c) 2024-present, fjall-rs +// This source code is licensed under both the Apache 2.0 and MIT License +// (found in the LICENSE-* files in the repository) + +use std::sync::Arc; + +/// Trait for extracting prefixes from keys for prefix filters. +/// +/// A prefix extractor allows the filter to index prefixes of keys +/// instead of (or in addition to) the full keys. This enables efficient +/// filtering for prefix-based queries. 
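+///
+/// Keys for which `extract` yields no prefixes are "out of domain": they are
+/// still written to the segment, but nothing is added to the filter for them,
+/// and filter checks for such keys return no verdict rather than a false
+/// negative, so reads fall back to searching the segment.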
+/// +/// # Examples +/// +/// ## Simple fixed-length prefix: +/// ``` +/// use lsm_tree::prefix::PrefixExtractor; +/// +/// struct FixedPrefixExtractor(usize); +/// +/// impl PrefixExtractor for FixedPrefixExtractor { +/// fn extract<'a>(&self, key: &'a [u8]) -> Box + 'a> { +/// Box::new(std::iter::once(key.get(0..self.0).unwrap_or(key))) +/// } +/// +/// fn name(&self) -> &str { +/// "fixed_prefix" +/// } +/// } +/// ``` +/// +/// ## Segmented prefixes (e.g., `account_id#user_id)`: +/// ``` +/// use lsm_tree::prefix::PrefixExtractor; +/// +/// struct SegmentedPrefixExtractor; +/// +/// impl PrefixExtractor for SegmentedPrefixExtractor { +/// fn extract<'a>(&self, key: &'a [u8]) -> Box + 'a> { +/// let mut prefixes = vec![]; +/// let mut end = 0; +/// for (i, &byte) in key.iter().enumerate() { +/// if byte == b'#' { +/// prefixes.push(&key[0..i]); +/// end = i; +/// } +/// } +/// if end < key.len() { +/// prefixes.push(key); +/// } +/// Box::new(prefixes.into_iter()) +/// } +/// +/// fn name(&self) -> &str { +/// "segmented_prefix" +/// } +/// } +/// ``` +pub trait PrefixExtractor: Send + Sync { + /// Extracts zero or more prefixes from a key. + /// + /// All prefixes will be added to the filter during segment construction. + /// + /// An empty iterator means the key is "out of domain" and won't be added to the filter. + fn extract<'a>(&self, key: &'a [u8]) -> Box + 'a>; + + /// Returns a unique name for this prefix extractor. + fn name(&self) -> &str; +} + +/// A prefix extractor that returns the full key. +/// +/// This is the default behavior if no prefix extractor is specified. +pub struct FullKeyExtractor; + +impl PrefixExtractor for FullKeyExtractor { + fn extract<'a>(&self, key: &'a [u8]) -> Box + 'a> { + Box::new(std::iter::once(key)) + } + + fn name(&self) -> &'static str { + "full_key" + } +} + +/// A prefix extractor that returns a fixed-length prefix. +/// +/// If the key is shorter than the prefix length, returns the full key. +pub struct FixedPrefixExtractor { + length: usize, +} + +impl FixedPrefixExtractor { + /// Creates a new fixed-length prefix extractor. + #[must_use] + pub fn new(length: usize) -> Self { + Self { length } + } +} + +impl PrefixExtractor for FixedPrefixExtractor { + fn extract<'a>(&self, key: &'a [u8]) -> Box + 'a> { + if key.len() <= self.length { + Box::new(std::iter::once(key)) + } else if let Some(prefix) = key.get(0..self.length) { + Box::new(std::iter::once(prefix)) + } else { + Box::new(std::iter::empty()) + } + } + + fn name(&self) -> &'static str { + "fixed_prefix" + } +} + +/// A prefix extractor that requires keys to be at least a certain length. +/// +/// Keys shorter than the required length are considered "out of domain" +/// and won't be added to the filter. This matches `RocksDB`'s behavior. +pub struct FixedLengthExtractor { + length: usize, +} + +impl FixedLengthExtractor { + /// Creates a new fixed-length extractor. + #[must_use] + pub fn new(length: usize) -> Self { + Self { length } + } +} + +impl PrefixExtractor for FixedLengthExtractor { + fn extract<'a>(&self, key: &'a [u8]) -> Box + 'a> { + if key.len() < self.length { + // Key is too short - out of domain + Box::new(std::iter::empty()) + } else if let Some(prefix) = key.get(0..self.length) { + Box::new(std::iter::once(prefix)) + } else { + Box::new(std::iter::empty()) + } + } + + fn name(&self) -> &'static str { + "fixed_length" + } +} + +/// Examples of custom multi-prefix extractors. +/// +/// Users can implement their own prefix extractors that return multiple prefixes. 
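+/// Returning several prefixes per key lets a single filter serve both coarse
+/// range scans (via a short shared prefix) and exact point lookups (via the
+/// full key).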
+/// The filter will include all returned prefixes. +/// +/// # Example +/// +/// ``` +/// use lsm_tree::prefix::PrefixExtractor; +/// use std::sync::Arc; +/// +/// // Example 1: Hierarchical prefix extractor based on delimiter +/// // For key "user/123/data" with delimiter '/', generates: +/// // - "user" +/// // - "user/123" +/// // - "user/123/data" (full key) +/// struct HierarchicalPrefixExtractor { +/// delimiter: u8, +/// } +/// +/// impl PrefixExtractor for HierarchicalPrefixExtractor { +/// fn extract<'a>(&self, key: &'a [u8]) -> Box + 'a> { +/// let delimiter = self.delimiter; +/// let mut prefixes = Vec::new(); +/// +/// // Generate all prefixes up to each delimiter +/// for (i, &byte) in key.iter().enumerate() { +/// if byte == delimiter { +/// prefixes.push(&key[0..i]); +/// } +/// } +/// +/// // Always include the full key +/// prefixes.push(key); +/// +/// Box::new(prefixes.into_iter()) +/// } +/// +/// fn name(&self) -> &str { +/// "hierarchical_prefix" +/// } +/// } +/// +/// // Example 2: Extract domain prefix for flipped email keys +/// // For "example.com@user", this extracts: +/// // - "example.com" (domain prefix for range scans) +/// // - "example.com@user" (full key for point lookups) +/// struct DomainPrefixExtractor; +/// +/// impl PrefixExtractor for DomainPrefixExtractor { +/// fn extract<'a>(&self, key: &'a [u8]) -> Box + 'a> { +/// if let Ok(key_str) = std::str::from_utf8(key) { +/// if let Some(at_pos) = key_str.find('@') { +/// // Return both domain prefix and full key +/// let domain_prefix = &key[..at_pos]; +/// return Box::new(vec![domain_prefix, key].into_iter()); +/// } +/// } +/// // If not a flipped email format, just return the full key +/// Box::new(std::iter::once(key)) +/// } +/// +/// fn name(&self) -> &str { +/// "domain_prefix" +/// } +/// } +/// +/// // Usage: +/// # let path = tempfile::tempdir()?; +/// let tree = lsm_tree::Config::new(path) +/// .prefix_extractor(Arc::new(HierarchicalPrefixExtractor { delimiter: b'/' })) +/// .open()?; +/// # Ok::<(), Box>(()) +/// ``` +/// Type alias for a shared prefix extractor +pub type SharedPrefixExtractor = Arc; + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_full_key_extractor() { + let extractor = FullKeyExtractor; + let key = b"test_key"; + let prefixes: Vec<_> = extractor.extract(key).collect(); + assert_eq!(prefixes.len(), 1); + assert_eq!(prefixes.first(), Some(&b"test_key".as_ref())); + } + + #[test] + fn test_fixed_prefix_extractor() { + let extractor = FixedPrefixExtractor::new(5); + + // Key longer than prefix + let key = b"longer_key"; + let prefixes: Vec<_> = extractor.extract(key).collect(); + assert_eq!(prefixes.len(), 1); + assert_eq!(prefixes.first(), Some(&b"longe".as_ref())); + + // Key shorter than prefix + let key = b"key"; + let prefixes: Vec<_> = extractor.extract(key).collect(); + assert_eq!(prefixes.len(), 1); + assert_eq!(prefixes.first(), Some(&b"key".as_ref())); + + // Key exactly prefix length + let key = b"exact"; + let prefixes: Vec<_> = extractor.extract(key).collect(); + assert_eq!(prefixes.len(), 1); + assert_eq!(prefixes.first(), Some(&b"exact".as_ref())); + } + + #[test] + fn test_empty_key() { + let full_key = FullKeyExtractor; + let fixed = FixedPrefixExtractor::new(5); + + let key = b""; + + let prefixes: Vec<_> = full_key.extract(key).collect(); + assert_eq!(prefixes.len(), 1); + assert_eq!(prefixes.first(), Some(&b"".as_ref())); + + let prefixes: Vec<_> = fixed.extract(key).collect(); + assert_eq!(prefixes.len(), 1); + 
assert_eq!(prefixes.first(), Some(&b"".as_ref())); + } + + #[test] + fn test_fixed_length_extractor() { + let extractor = FixedLengthExtractor::new(5); + + // Key shorter than required length - out of domain + let key = b"abc"; + let prefixes: Vec<_> = extractor.extract(key).collect(); + assert_eq!(prefixes.len(), 0); // Empty iterator + + // Key exactly required length + let key = b"exact"; + let prefixes: Vec<_> = extractor.extract(key).collect(); + assert_eq!(prefixes.len(), 1); + assert_eq!(prefixes.first(), Some(&b"exact".as_ref())); + + // Key longer than required length + let key = b"longer_key"; + let prefixes: Vec<_> = extractor.extract(key).collect(); + assert_eq!(prefixes.len(), 1); + assert_eq!(prefixes.first(), Some(&b"longe".as_ref())); + } +} diff --git a/src/range.rs b/src/range.rs index 3449234a..99602edc 100644 --- a/src/range.rs +++ b/src/range.rs @@ -227,15 +227,15 @@ impl TreeIter { range.start_bound().map(|x| &*x.user_key), range.end_bound().map(|x| &*x.user_key), )) { - let reader = segment.range(( + if let Some(reader) = segment.range(( range.start_bound().map(|x| &x.user_key).cloned(), range.end_bound().map(|x| &x.user_key).cloned(), - )); - - iters.push(Box::new(reader.filter(move |item| match item { - Ok(item) => seqno_filter(item.key.seqno, seqno), - Err(_) => true, - }))); + )) { + iters.push(Box::new(reader.filter(move |item| match item { + Ok(item) => seqno_filter(item.key.seqno, seqno), + Err(_) => true, + }))); + } } } _ => { diff --git a/src/run_reader.rs b/src/run_reader.rs index e5eb75d9..109a2a5d 100644 --- a/src/run_reader.rs +++ b/src/run_reader.rs @@ -29,6 +29,45 @@ impl RunReader { let (lo, hi) = run.range_indexes(&range)?; + // Early optimization: Skip prefix filter checks if no prefix extractor is configured + // Check the first segment to see if any segments have a prefix extractor + // (all segments in a run should have the same configuration) + let has_prefix_extractor = run + .get(lo) + .map(|seg| seg.has_prefix_extractor()) + .unwrap_or(false); + + if has_prefix_extractor { + // Only perform prefix filter checks if a prefix extractor is configured + // This avoids unnecessary CPU work for large scans when prefix filtering isn't used + let segments_in_range = run.get(lo..=hi)?; + let mut has_potential_match = false; + + // For large scans, we limit the number of segments we check upfront + // to avoid excessive CPU usage. The lazy loading during iteration + // will handle filtering the rest. + const MAX_UPFRONT_CHECKS: usize = 10; + + for (idx, segment) in segments_in_range.iter().enumerate() { + // Check if segment might contain data for this range + if segment.might_contain_range(&range) { + has_potential_match = true; + break; + } + + // For very large runs, don't check all segments upfront + // The lazy iterator will handle skipping segments as needed + if idx >= MAX_UPFRONT_CHECKS { + has_potential_match = true; // Assume there might be matches + break; + } + } + + if !has_potential_match { + return None; + } + } + Some(Self::culled(run, range, (Some(lo), Some(hi)), cache_policy)) } @@ -44,14 +83,15 @@ impl RunReader { // TODO: lazily init readers? let lo_segment = run.deref().get(lo).expect("should exist"); - let lo_reader = lo_segment.range(range.clone())/* .cache_policy(cache_policy) */; + let lo_reader = lo_segment + .range(range.clone()) /* .cache_policy(cache_policy) */ + .map(|x| Box::new(x) as BoxedIterator); - // TODO: lazily init readers? 
let hi_reader = if hi > lo { let hi_segment = run.deref().get(hi).expect("should exist"); - Some( - hi_segment.range(range), /* .cache_policy(cache_policy) */ - ) + hi_segment + .range(range) /* .cache_policy(cache_policy) */ + .map(|x| Box::new(x) as BoxedIterator) } else { None }; @@ -60,8 +100,8 @@ impl RunReader { run, lo, hi, - lo_reader: Some(Box::new(lo_reader)), - hi_reader: hi_reader.map(|x| Box::new(x) as BoxedIterator), + lo_reader, + hi_reader, cache_policy, } } @@ -82,9 +122,22 @@ impl Iterator for RunReader { self.lo += 1; if self.lo < self.hi { - self.lo_reader = Some(Box::new( - self.run.get(self.lo).expect("should exist").iter(), - ) /* .cache_policy(self.cache_policy) */); + // Lazily check next segment for potential matches + // This avoids unnecessary I/O for segments that won't contain our prefix + loop { + if self.lo >= self.hi { + break; + } + + let segment = self.run.get(self.lo).expect("should exist"); + if let Some(reader) = segment.iter() { + self.lo_reader = Some(Box::new(reader) as BoxedIterator); + break; + } + + // Skip this segment as it doesn't contain our range + self.lo += 1; + } } } else if let Some(hi_reader) = &mut self.hi_reader { // NOTE: We reached the hi marker, so consume from it instead @@ -111,9 +164,21 @@ impl DoubleEndedIterator for RunReader { self.hi -= 1; if self.lo < self.hi { - self.hi_reader = Some(Box::new( - self.run.get(self.hi).expect("should exist").iter(), - ) /* .cache_policy(self.cache_policy) */); + // Lazily check prev segment for potential matches + loop { + if self.hi <= self.lo { + break; + } + + let segment = self.run.get(self.hi).expect("should exist"); + if let Some(reader) = segment.iter() { + self.hi_reader = Some(Box::new(reader) as BoxedIterator); + break; + } + + // Skip this segment as it doesn't contain our range + self.hi -= 1; + } } } else if let Some(lo_reader) = &mut self.lo_reader { // NOTE: We reached the lo marker, so consume from it instead @@ -131,7 +196,8 @@ impl DoubleEndedIterator for RunReader { #[allow(clippy::expect_used)] mod tests { use super::*; - use crate::{AbstractTree, Slice}; + use crate::{AbstractTree, Config, Slice}; + use std::sync::Arc; use test_log::test; #[test] @@ -292,4 +358,150 @@ mod tests { Ok(()) } + + #[test] + fn test_run_reader_prefix_filtering() -> crate::Result<()> { + use crate::prefix::FixedPrefixExtractor; + + let tempdir = tempfile::tempdir()?; + let tree = Config::new(&tempdir) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(3))) + .open()?; + + // Create segments with different prefixes + let prefixes = [ + ["aaa_1", "aaa_2", "aaa_3"], + ["bbb_1", "bbb_2", "bbb_3"], + ["ccc_1", "ccc_2", "ccc_3"], + ["ddd_1", "ddd_2", "ddd_3"], + ]; + + for batch in prefixes { + for id in batch { + tree.insert(id, vec![], 0); + } + tree.flush_active_memtable(0)?; + } + + let segments = tree + .manifest + .read() + .expect("lock is poisoned") + .iter() + .cloned() + .collect::>(); + + let run = Arc::new(Run::new(segments)); + + // Test 1: Query for non-existent prefix should return None + assert!( + RunReader::new( + run.clone(), + UserKey::from("zzz_1")..=UserKey::from("zzz_9"), + CachePolicy::Read + ) + .is_none(), + "Should return None for non-existent prefix" + ); + + // Test 2: Query for existing prefix should return reader + let reader = RunReader::new( + run.clone(), + UserKey::from("bbb_1")..=UserKey::from("bbb_3"), + CachePolicy::Read, + ); + assert!(reader.is_some(), "Should return reader for existing prefix"); + + if let Some(reader) = reader { + let items: Vec<_> = 
reader.flatten().map(|item| item.key.user_key).collect(); + assert_eq!(items.len(), 3); + assert_eq!(items.first(), Some(&Slice::from(*b"bbb_1"))); + assert_eq!(items.get(1), Some(&Slice::from(*b"bbb_2"))); + assert_eq!(items.get(2), Some(&Slice::from(*b"bbb_3"))); + } + + // Test 3: Range query across prefixes with no common prefix + let reader = RunReader::new( + run, + UserKey::from("aaa_3")..=UserKey::from("bbb_1"), + CachePolicy::Read, + ); + // Should still work since segments contain the range + assert!(reader.is_some()); + + Ok(()) + } + + #[test] + fn test_run_reader_lazy_segment_loading() -> crate::Result<()> { + use crate::prefix::FixedPrefixExtractor; + + let tempdir = tempfile::tempdir()?; + let tree = Config::new(&tempdir) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(4))) + .open()?; + + // Create many segments with distinct prefixes + let prefixes = [ + ["pre1_a", "pre1_b", "pre1_c"], + ["pre2_a", "pre2_b", "pre2_c"], + ["pre3_a", "pre3_b", "pre3_c"], + ["pre4_a", "pre4_b", "pre4_c"], + ["pre5_a", "pre5_b", "pre5_c"], + ["pre6_a", "pre6_b", "pre6_c"], + ]; + + for batch in prefixes { + for id in batch { + tree.insert(id, vec![], 0); + } + tree.flush_active_memtable(0)?; + } + + let segments = tree + .manifest + .read() + .expect("lock is poisoned") + .iter() + .cloned() + .collect::>(); + + let run = Arc::new(Run::new(segments)); + + // Query for a specific prefix in the middle + // Should skip segments without the prefix lazily + let reader = RunReader::new( + run.clone(), + UserKey::from("pre4_a")..=UserKey::from("pre4_c"), + CachePolicy::Read, + ); + + assert!(reader.is_some()); + + if let Some(reader) = reader { + let items: Vec<_> = reader.flatten().map(|item| item.key.user_key).collect(); + assert_eq!(items.len(), 3); + assert_eq!(items.first(), Some(&Slice::from(*b"pre4_a"))); + assert_eq!(items.get(1), Some(&Slice::from(*b"pre4_b"))); + assert_eq!(items.get(2), Some(&Slice::from(*b"pre4_c"))); + } + + // Query for prefix at the beginning + let reader = RunReader::new( + run.clone(), + UserKey::from("pre1_a")..=UserKey::from("pre1_c"), + CachePolicy::Read, + ); + assert!(reader.is_some()); + + // Query for prefix at the end + let reader = RunReader::new( + run, + UserKey::from("pre6_a")..=UserKey::from("pre6_c"), + CachePolicy::Read, + ); + assert!(reader.is_some()); + + Ok(()) + } } diff --git a/src/segment/filter/blocked_bloom/mod.rs b/src/segment/filter/blocked_bloom/mod.rs index 70d50234..58c6daa0 100644 --- a/src/segment/filter/blocked_bloom/mod.rs +++ b/src/segment/filter/blocked_bloom/mod.rs @@ -128,6 +128,29 @@ impl<'a> BlockedBloomFilterReader<'a> { pub fn contains(&self, key: &[u8]) -> bool { self.contains_hash(Self::get_hash(key)) } + + /// Returns `true` if any prefix of the key may be contained. + /// + /// Returns `None` if the key is out of domain. 
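+    ///
+    /// A sketch of how a caller interprets the three outcomes:
+    ///
+    /// ```ignore
+    /// match filter.contains_prefix(key, extractor) {
+    ///     Some(false) => { /* no extracted prefix matched: safe to skip this segment */ }
+    ///     Some(true) => { /* a prefix may match: the segment must be read */ }
+    ///     None => { /* key is out of domain: the filter gives no verdict */ }
+    /// }
+    /// ```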
+ #[must_use] + pub fn contains_prefix( + &self, + key: &[u8], + extractor: &dyn crate::prefix::PrefixExtractor, + ) -> Option { + let mut prefixes = extractor.extract(key); + + // Check if iterator is empty (out of domain) + let first = prefixes.next()?; + + // Check first prefix + if self.contains_hash(Self::get_hash(first)) { + return Some(true); + } + + // Check remaining prefixes + Some(prefixes.any(|prefix| self.contains_hash(Self::get_hash(prefix)))) + } } // impl<'a> Encode for BlockedBloomFilter<'a> { diff --git a/src/segment/filter/standard_bloom/mod.rs b/src/segment/filter/standard_bloom/mod.rs index b505a949..ec5d8102 100644 --- a/src/segment/filter/standard_bloom/mod.rs +++ b/src/segment/filter/standard_bloom/mod.rs @@ -110,6 +110,29 @@ impl<'a> StandardBloomFilterReader<'a> { self.contains_hash(Self::get_hash(key)) } + /// Returns `true` if any prefix of the key may be contained. + /// + /// Returns `None` if the key is out of domain. + #[must_use] + pub fn contains_prefix( + &self, + key: &[u8], + extractor: &dyn crate::prefix::PrefixExtractor, + ) -> Option { + let mut prefixes = extractor.extract(key); + + // Check if iterator is empty (out of domain) + let first = prefixes.next()?; + + // Check first prefix + if self.contains_hash(Self::get_hash(first)) { + return Some(true); + } + + // Check remaining prefixes + Some(prefixes.any(|prefix| self.contains_hash(Self::get_hash(prefix)))) + } + /// Returns `true` if the bit at `idx` is `1`. fn has_bit(&self, idx: usize) -> bool { self.inner.get(idx) diff --git a/src/segment/inner.rs b/src/segment/inner.rs index a41db211..739345e3 100644 --- a/src/segment/inner.rs +++ b/src/segment/inner.rs @@ -7,7 +7,8 @@ use crate::metrics::Metrics; use super::{block_index::BlockIndexImpl, meta::ParsedMeta, regions::ParsedRegions, Block}; use crate::{ - cache::Cache, descriptor_table::DescriptorTable, tree::inner::TreeId, GlobalSegmentId, + cache::Cache, descriptor_table::DescriptorTable, prefix::SharedPrefixExtractor, + tree::inner::TreeId, GlobalSegmentId, }; use std::{ path::PathBuf, @@ -43,6 +44,9 @@ pub struct Inner { /// Pinned AMQ filter pub pinned_filter_block: Option, + /// Prefix extractor for filters + pub prefix_extractor: Option, + // /// Pinned filter // #[doc(hidden)] // pub bloom_filter: Option, diff --git a/src/segment/mod.rs b/src/segment/mod.rs index ad335037..228e2ed6 100644 --- a/src/segment/mod.rs +++ b/src/segment/mod.rs @@ -31,6 +31,7 @@ use crate::metrics::Metrics; use crate::{ cache::Cache, descriptor_table::DescriptorTable, + prefix::{PrefixExtractor, SharedPrefixExtractor}, segment::block::{BlockType, ParsedItem}, CompressionType, InternalValue, SeqNo, TreeId, UserKey, }; @@ -104,7 +105,7 @@ impl Segment { } #[must_use] - pub fn pinned_bloom_filter_size(&self) -> usize { + pub fn pinned_filter_size(&self) -> usize { self.pinned_filter_block .as_ref() .map(Block::size) @@ -179,11 +180,22 @@ impl Segment { let filter = StandardBloomFilterReader::new(&block.data)?; #[cfg(feature = "metrics")] - self.metrics.bloom_filter_queries.fetch_add(1, Relaxed); + self.metrics.filter_queries.fetch_add(1, Relaxed); + + let may_contain = if let Some(ref extractor) = self.prefix_extractor { + // If prefix extractor is configured, use prefix-based filtering + // None means out-of-domain - these keys bypass filter + filter + .contains_prefix(key, extractor.as_ref()) + .unwrap_or(true) + } else { + // No prefix extractor, use standard hash-based filtering + filter.contains_hash(key_hash) + }; - if !filter.contains_hash(key_hash) { + if 
!may_contain { #[cfg(feature = "metrics")] - self.metrics.bloom_filter_hits.fetch_add(1, Relaxed); + self.metrics.io_skipped_by_filter.fetch_add(1, Relaxed); return Ok(None); } @@ -196,11 +208,22 @@ impl Segment { let filter = StandardBloomFilterReader::new(&block.data)?; #[cfg(feature = "metrics")] - self.metrics.bloom_filter_queries.fetch_add(1, Relaxed); + self.metrics.filter_queries.fetch_add(1, Relaxed); + + let may_contain = if let Some(ref extractor) = self.prefix_extractor { + // If prefix extractor is configured, use prefix-based filtering + // None means out-of-domain - these keys bypass filter + filter + .contains_prefix(key, extractor.as_ref()) + .unwrap_or(true) + } else { + // No prefix extractor, use standard hash-based filtering + filter.contains_hash(key_hash) + }; - if !filter.contains_hash(key_hash) { + if !may_contain { #[cfg(feature = "metrics")] - self.metrics.bloom_filter_hits.fetch_add(1, Relaxed); + self.metrics.io_skipped_by_filter.fetch_add(1, Relaxed); return Ok(None); } @@ -296,11 +319,144 @@ impl Segment { #[must_use] #[allow(clippy::iter_without_into_iter)] #[doc(hidden)] - pub fn iter(&self) -> impl DoubleEndedIterator> { + pub fn iter(&self) -> Option>> { self.range(..) } + /// Returns true if the prefix filter indicates the prefix doesn't exist. + /// This is used to potentially skip segments during range queries. + /// Only works when a prefix extractor is configured. + fn should_skip_by_prefix_filter(&self, key: &[u8]) -> bool { + use filter::standard_bloom::StandardBloomFilterReader; + #[cfg(feature = "metrics")] + use std::sync::atomic::Ordering::Relaxed; + + let Some(ref prefix_extractor) = self.prefix_extractor else { + return false; + }; + + // Try pinned filter block first + if let Some(block) = &self.pinned_filter_block { + if let Ok(filter) = StandardBloomFilterReader::new(&block.data) { + #[cfg(feature = "metrics")] + self.metrics.filter_queries.fetch_add(1, Relaxed); + + // Returns true if prefix is NOT in filter (should skip) + return !filter + .contains_prefix(key, prefix_extractor.as_ref()) + .unwrap_or(true); + } + } + + // Fall back to loading filter block from disk + if let Some(filter_block_handle) = &self.regions.filter { + if let Ok(block) = self.load_block( + filter_block_handle, + BlockType::Filter, + CompressionType::None, + ) { + if let Ok(filter) = StandardBloomFilterReader::new(&block.data) { + #[cfg(feature = "metrics")] + self.metrics.filter_queries.fetch_add(1, Relaxed); + + return !filter + .contains_prefix(key, prefix_extractor.as_ref()) + .unwrap_or(true); + } + } + } + + false + } + + /// Extracts the common prefix from a range's start and end bounds for filter checking. + /// Returns the prefix as a slice if both bounds share the same prefix. 
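+    ///
+    /// For example, with `FixedPrefixExtractor::new(3)`, the bounds of
+    /// `"abc_1"..="abc_9"` both extract `"abc"`, so `"abc"` is returned for the
+    /// filter check, while the bounds of `"abc_5"..="abd_5"` extract different
+    /// prefixes and `None` is returned.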
+ fn extract_common_prefix_for_filter<'a, R: RangeBounds>( + &self, + range: &'a R, + prefix_extractor: &dyn PrefixExtractor, + ) -> Option<&'a [u8]> { + let (start_key, end_key) = match (range.start_bound(), range.end_bound()) { + (Bound::Included(s) | Bound::Excluded(s), Bound::Included(e) | Bound::Excluded(e)) => { + (s.as_ref(), Some(e.as_ref())) + } + (Bound::Included(s) | Bound::Excluded(s), Bound::Unbounded) => (s.as_ref(), None), + (Bound::Unbounded, Bound::Included(e) | Bound::Excluded(e)) => (e.as_ref(), None), + _ => return None, + }; + + // For single bound or when end is unbounded, use that key's prefix + if end_key.is_none() { + return prefix_extractor.extract(start_key).next(); + } + + // Both bounds exist - check if they share the same prefix + if let Some(end) = end_key { + let start_prefix = prefix_extractor.extract(start_key).next()?; + let end_prefix = prefix_extractor.extract(end).next()?; + + if start_prefix == end_prefix { + return Some(start_prefix); + } + } + + None + } + + /// Checks if this segment can be skipped for the given range based on prefix filter. + /// Returns true if the segment should be skipped. + /// Only applicable when a prefix extractor is configured. + fn should_skip_range_by_prefix_filter>(&self, range: &R) -> bool { + // Early return if no prefix extractor is configured + let Some(ref prefix_extractor) = self.prefix_extractor else { + return false; + }; + + // First try: Check filter using common prefix from range bounds + if let Some(common_prefix) = + self.extract_common_prefix_for_filter(range, &**prefix_extractor) + { + if self.should_skip_by_prefix_filter(common_prefix) { + #[cfg(feature = "metrics")] + self.metrics + .io_skipped_by_filter + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + return true; + } + } else { + // Second try: No common prefix, but we can still try to optimize using the start bound + if let Some(start_key) = match range.start_bound() { + Bound::Included(key) | Bound::Excluded(key) => Some(key.as_ref()), + Bound::Unbounded => None, + } { + // Extract prefix from start bound + if let Some(start_prefix) = prefix_extractor.extract(start_key).next() { + // Check if this segment's minimum key would fall in the prefix range + // If the segment's min key >= start_key and the start prefix doesn't exist, + // we can potentially skip this segment + let min_key = self.metadata.key_range.min(); + + // Extract prefix from segment's minimum key + if let Some(min_prefix) = prefix_extractor.extract(min_key).next() { + if min_prefix == start_prefix + && self.should_skip_by_prefix_filter(start_prefix) + { + #[cfg(feature = "metrics")] + self.metrics + .io_skipped_by_filter + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + return true; + } + } + } + } + } + + false + } + /// Creates a ranged iterator over the `Segment`. + /// Returns None if the filter indicates no keys with the common prefix exist. 
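+    ///
+    /// A minimal caller sketch (`lo`/`hi` are hypothetical bounds):
+    ///
+    /// ```ignore
+    /// match segment.range(lo..=hi) {
+    ///     Some(iter) => {
+    ///         for item in iter {
+    ///             // ...
+    ///         }
+    ///     }
+    ///     None => {
+    ///         // filter ruled the range out; nothing to read in this segment
+    ///     }
+    /// }
+    /// ```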
/// /// # Errors /// @@ -311,10 +467,15 @@ impl Segment { pub fn range>( &self, range: R, - ) -> impl DoubleEndedIterator> { + ) -> Option>> { use crate::fallible_clipping_iter::FallibleClippingIter; use block_index::iter::create_index_block_reader; + // Check prefix filter to see if we can skip this segment entirely + if self.should_skip_range_by_prefix_filter(&range) { + return None; + } + // TODO: enum_dispatch BlockIndex::iter let index_block = match &*self.block_index { BlockIndexImpl::Full(idx) => idx.inner(), @@ -332,7 +493,6 @@ impl Segment { }; let index_iter = create_index_block_reader(index_block.clone()); - let mut iter = Iter::new( self.global_id(), self.path.clone(), @@ -344,21 +504,16 @@ impl Segment { self.metrics.clone(), ); - match range.start_bound() { - Bound::Excluded(key) | Bound::Included(key) => { - iter.set_lower_bound(key.clone()); - } - Bound::Unbounded => {} + // Set normal iterator bounds based on range + if let Bound::Excluded(key) | Bound::Included(key) = range.start_bound() { + iter.set_lower_bound(key.clone()); } - match range.end_bound() { - Bound::Excluded(key) | Bound::Included(key) => { - iter.set_upper_bound(key.clone()); - } - Bound::Unbounded => {} + if let Bound::Excluded(key) | Bound::Included(key) = range.end_bound() { + iter.set_upper_bound(key.clone()); } - FallibleClippingIter::new(iter, range) + Some(FallibleClippingIter::new(iter, range)) } /// Tries to recover a segment from a file. @@ -367,6 +522,7 @@ impl Segment { tree_id: TreeId, cache: Arc, descriptor_table: Arc, + prefix_extractor: Option, pin_filter: bool, pin_index: bool, #[cfg(feature = "metrics")] metrics: Arc, @@ -470,6 +626,8 @@ impl Segment { pinned_filter_block, + prefix_extractor, + is_deleted: AtomicBool::default(), #[cfg(feature = "metrics")] @@ -495,10 +653,36 @@ impl Segment { self.metadata.key_range.overlaps_with_bounds(bounds) } + /// Returns the seqno range of the `Segment`. + #[must_use] + pub fn seqno_range(&self) -> (SeqNo, SeqNo) { + self.0.metadata.seqnos + } + /// Returns the highest sequence number in the segment. #[must_use] pub fn get_highest_seqno(&self) -> SeqNo { - self.metadata.seqnos.1 + self.0.metadata.seqnos.1 + } + + /// Returns true if this segment has a prefix extractor configured. + #[must_use] + pub fn has_prefix_extractor(&self) -> bool { + self.prefix_extractor.is_some() + } + + /// Checks if this segment might contain data for the given range. + /// Returns false only if we can definitively rule out the segment using filters. + /// Returns true if the segment might contain data (or if we can't determine). + #[must_use] + pub fn might_contain_range>(&self, range: &R) -> bool { + // If no prefix extractor, we can't use filter optimization + if self.prefix_extractor.is_none() { + return true; + } + + // Check if we can skip this segment based on filter + !self.should_skip_range_by_prefix_filter(range) } /// Returns the amount of tombstone markers in the `Segment`. 
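Taken together, `has_prefix_extractor` and `might_contain_range` give callers a
cheap pre-check before opening a segment. A minimal sketch of the intended call
pattern, mirroring the upfront check in `RunReader::new` above (`segments` and
`range` are illustrative locals, not names from this patch):

    for segment in &segments {
        // `might_contain_range` returns false only when the prefix filter can
        // definitively rule the range out, so skipping is safe.
        if segment.has_prefix_extractor() && !segment.might_contain_range(&range) {
            continue; // no key with this range's prefix can live here
        }
        // otherwise the segment must actually be read
    }
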
@@ -550,6 +734,7 @@ mod tests { 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Arc::new(DescriptorTable::new(10)), + None, true, true, #[cfg(feature = "metrics")] @@ -646,6 +831,7 @@ mod tests { 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Arc::new(DescriptorTable::new(10)), + None, true, true, #[cfg(feature = "metrics")] @@ -703,6 +889,7 @@ mod tests { 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Arc::new(DescriptorTable::new(10)), + None, true, true, #[cfg(feature = "metrics")] @@ -718,10 +905,12 @@ mod tests { "should use full index, so only TLI exists", ); - assert_eq!(items, &*segment.iter().flatten().collect::>()); + let iter = segment.iter().unwrap(); + assert_eq!(items, &*iter.flatten().collect::>()); + let iter = segment.iter().unwrap(); assert_eq!( items.iter().rev().cloned().collect::>(), - &*segment.iter().rev().flatten().collect::>(), + &*iter.rev().flatten().collect::>(), ); } @@ -759,6 +948,7 @@ mod tests { 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Arc::new(DescriptorTable::new(10)), + None, true, true, #[cfg(feature = "metrics")] @@ -778,6 +968,7 @@ mod tests { items.iter().skip(1).cloned().collect::>(), &*segment .range(UserKey::from("b")..) + .unwrap() .flatten() .collect::>() ); @@ -786,6 +977,7 @@ mod tests { items.iter().skip(1).rev().cloned().collect::>(), &*segment .range(UserKey::from("b")..) + .unwrap() .rev() .flatten() .collect::>(), @@ -826,6 +1018,7 @@ mod tests { 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Arc::new(DescriptorTable::new(10)), + None, true, true, #[cfg(feature = "metrics")] @@ -842,7 +1035,8 @@ mod tests { ); let mut iter = segment - .range(UserKey::from(5u64.to_be_bytes())..UserKey::from(10u64.to_be_bytes())); + .range(UserKey::from(5u64.to_be_bytes())..UserKey::from(10u64.to_be_bytes())) + .unwrap(); let mut count = 0; @@ -901,6 +1095,7 @@ mod tests { 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Arc::new(DescriptorTable::new(10)), + None, true, true, #[cfg(feature = "metrics")] @@ -920,6 +1115,7 @@ mod tests { items.iter().skip(1).take(3).cloned().collect::>(), &*segment .range(UserKey::from("b")..=UserKey::from("d")) + .unwrap() .flatten() .collect::>() ); @@ -934,6 +1130,7 @@ mod tests { .collect::>(), &*segment .range(UserKey::from("b")..=UserKey::from("d")) + .unwrap() .rev() .flatten() .collect::>(), @@ -970,6 +1167,7 @@ mod tests { 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Arc::new(DescriptorTable::new(10)), + None, false, true, #[cfg(feature = "metrics")] diff --git a/src/segment/writer/mod.rs b/src/segment/writer/mod.rs index 56a4386a..85473783 100644 --- a/src/segment/writer/mod.rs +++ b/src/segment/writer/mod.rs @@ -8,6 +8,7 @@ use super::{ use crate::{ coding::Encode, file::fsync_directory, + prefix::SharedPrefixExtractor, segment::{filter::standard_bloom::Builder, index_block::BlockHandle, regions::ParsedRegions}, time::unix_timestamp, CompressionType, InternalValue, SegmentId, UserKey, @@ -62,6 +63,9 @@ pub struct Writer { /// /// using enhanced double hashing, so we got two u64s pub bloom_hash_buffer: Vec, + + /// Prefix extractor for filters + pub prefix_extractor: Option, } impl Writer { @@ -99,6 +103,7 @@ impl Writer { bloom_policy: BloomConstructionPolicy::default(), bloom_hash_buffer: Vec::new(), + prefix_extractor: None, }) } @@ -131,6 +136,12 @@ impl Writer { self } + #[must_use] + pub fn use_prefix_extractor(mut self, extractor: SharedPrefixExtractor) -> Self { + self.prefix_extractor = Some(extractor); + self + } + /// Writes an item. 
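+    ///
+    /// When a prefix extractor is configured, every prefix extracted from the
+    /// key is hashed into the filter (an empty extraction adds nothing);
+    /// otherwise the full key is hashed, as before.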
/// /// # Note @@ -153,8 +164,17 @@ impl Writer { // of the same key if self.bloom_policy.is_active() { - self.bloom_hash_buffer - .push(Builder::get_hash(&item.key.user_key)); + if let Some(ref extractor) = self.prefix_extractor { + // Add all prefixes to filter + // If extract returns empty iterator (out of domain), nothing is added + for prefix in extractor.extract(&item.key.user_key) { + self.bloom_hash_buffer.push(Builder::get_hash(prefix)); + } + } else { + // Default behavior: add full key hash + self.bloom_hash_buffer + .push(Builder::get_hash(&item.key.user_key)); + } } } diff --git a/src/tree/ingest.rs b/src/tree/ingest.rs index 63ec5932..bf6ab873 100644 --- a/src/tree/ingest.rs +++ b/src/tree/ingest.rs @@ -71,6 +71,7 @@ impl<'a> Ingestion<'a> { self.tree.id, self.tree.config.cache.clone(), self.tree.config.descriptor_table.clone(), + self.tree.config.prefix_extractor.clone(), false, false, #[cfg(feature = "metrics")] diff --git a/src/tree/mod.rs b/src/tree/mod.rs index a96026a2..b265c2af 100644 --- a/src/tree/mod.rs +++ b/src/tree/mod.rs @@ -186,13 +186,13 @@ impl AbstractTree for Tree { Ok(self.get(key, seqno)?.map(|x| x.len() as u32)) } - fn pinned_bloom_filter_size(&self) -> usize { + fn pinned_filter_size(&self) -> usize { self.manifest .read() .expect("lock is poisoned") .current_version() .iter_segments() - .map(Segment::pinned_bloom_filter_size) + .map(Segment::pinned_filter_size) .sum() } @@ -263,6 +263,10 @@ impl AbstractTree for Tree { } }); + if let Some(ref extractor) = self.config.prefix_extractor { + segment_writer = segment_writer.use_prefix_extractor(extractor.clone()); + } + let iter = memtable.iter().map(Ok); let compaction_filter = CompactionStream::new(iter, seqno_threshold); @@ -564,6 +568,7 @@ impl Tree { self.id, self.config.cache.clone(), self.config.descriptor_table.clone(), + self.config.prefix_extractor.clone(), true, // TODO: look at configuration true, // TODO: look at configuration #[cfg(feature = "metrics")] @@ -858,6 +863,7 @@ impl Tree { tree_id, &config.cache, &config.descriptor_table, + &config.prefix_extractor, #[cfg(feature = "metrics")] &metrics, )?; @@ -922,6 +928,7 @@ impl Tree { tree_id: TreeId, cache: &Arc, descriptor_table: &Arc, + prefix_extractor: &Option, #[cfg(feature = "metrics")] metrics: &Arc, ) -> crate::Result { use crate::{file::fsync_directory, file::SEGMENTS_FOLDER, SegmentId}; @@ -988,6 +995,7 @@ impl Tree { tree_id, cache.clone(), descriptor_table.clone(), + prefix_extractor.clone(), level_idx <= 2, // TODO: look at configuration level_idx <= 2, // TODO: look at configuration #[cfg(feature = "metrics")] diff --git a/tests/mvcc_slab.rs b/tests/mvcc_slab.rs index 21d2a0ef..01b75547 100644 --- a/tests/mvcc_slab.rs +++ b/tests/mvcc_slab.rs @@ -32,7 +32,7 @@ fn segment_reader_mvcc_slab() -> lsm_tree::Result<()> { .first() .expect("segment should exist"); - let reader = segment.iter(); + let reader = segment.iter().unwrap(); assert_eq!(reader.count(), ITEM_COUNT + 1); Ok(()) @@ -69,7 +69,7 @@ fn segment_reader_mvcc_slab_blob() -> lsm_tree::Result<()> { .first() .expect("segment should exist"); - let reader = segment.iter(); + let reader = segment.iter().unwrap(); assert_eq!(reader.count(), ITEM_COUNT + 1); Ok(()) diff --git a/tests/prefix_filter.rs b/tests/prefix_filter.rs new file mode 100644 index 00000000..4d85e409 --- /dev/null +++ b/tests/prefix_filter.rs @@ -0,0 +1,2211 @@ +use lsm_tree::{ + prefix::{FixedLengthExtractor, FixedPrefixExtractor, FullKeyExtractor, PrefixExtractor}, + AbstractTree, Config, Guard, +}; +use 
std::sync::Arc; + +// Helper function to generate test keys with prefixes +fn generate_test_key(prefix: &str, suffix: &str) -> Vec { + format!("{}{}", prefix, suffix).into_bytes() +} + +#[test] +fn test_prefix_filter_with_fixed_prefix() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let prefix_len = 8; + + let tree = Config::new(&folder) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(prefix_len))) + .open()?; + + // Insert keys with common prefixes + let prefix1 = "prefix01"; + let prefix2 = "prefix02"; + + for i in 0..100 { + let key1 = generate_test_key(prefix1, &format!("_{:04}", i)); + let key2 = generate_test_key(prefix2, &format!("_{:04}", i)); + + tree.insert(key1, b"value1", 0); + tree.insert(key2, b"value2", 0); + } + + tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.0.metrics.filter_queries(); + + // Test that keys with matching prefixes are found + for i in 0..100 { + let key1 = generate_test_key(prefix1, &format!("_{:04}", i)); + let key2 = generate_test_key(prefix2, &format!("_{:04}", i)); + + assert!(tree.contains_key(&key1, u64::MAX)?); + assert!(tree.contains_key(&key2, u64::MAX)?); + } + + // Test that keys with non-matching prefixes work correctly + let non_existent_key = generate_test_key("prefix99", "_0000"); + assert!(!tree.contains_key(&non_existent_key, u64::MAX)?); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.0.metrics.filter_queries(); + + // We should have at least 201 filter queries (200 existing keys + 1 non-existent) + assert!( + final_queries > initial_queries, + "filter queries should have increased" + ); + } + + Ok(()) +} + +#[test] +fn test_prefix_filter_with_fixed_length() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let required_len = 10; + + let tree = Config::new(&folder) + .prefix_extractor(Arc::new(FixedLengthExtractor::new(required_len))) + .open()?; + + // Insert keys with exactly the required length prefix + for i in 0..50 { + let key = format!("exactlen{:02}_suffix_{}", i, i); + tree.insert(key.as_bytes(), b"value", 0); + } + + // Insert keys that are too short (out of domain) + for i in 0..20 { + let short_key = format!("key{}", i); + tree.insert(short_key.as_bytes(), b"short_value", 0); + } + + tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.0.metrics.filter_queries(); + + // Verify keys with matching length are found + for i in 0..50 { + let key = format!("exactlen{:02}_suffix_{}", i, i); + assert!(tree.contains_key(key.as_bytes(), u64::MAX)?); + } + + // Verify short keys are also found (they're stored but not in filter) + for i in 0..20 { + let short_key = format!("key{}", i); + assert!(tree.contains_key(short_key.as_bytes(), u64::MAX)?); + } + + // Verify non-existent prefix is quickly rejected + // Use a key that matches the required length to ensure it's in-domain + let range = tree.range("nonexist00".."nonexist99", u64::MAX, None); + assert_eq!(range.count(), 0); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.0.metrics.filter_queries(); + // Should have filter queries for all lookups + assert!( + final_queries > initial_queries, + "filter queries should have increased" + ); + } + + Ok(()) +} + +#[test] +fn test_prefix_filter_full_key() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + + // Using FullKeyExtractor (default behavior) + let tree = Config::new(&folder) + .prefix_extractor(Arc::new(FullKeyExtractor)) + .open()?; + + // Insert various keys + let 
keys = vec![ + b"apple".to_vec(), + b"banana".to_vec(), + b"cherry".to_vec(), + b"date".to_vec(), + b"elderberry".to_vec(), + ]; + + for key in &keys { + tree.insert(key.clone(), b"value", 0); + } + + tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.0.metrics.filter_queries(); + + // All keys should be found + for key in &keys { + assert!(tree.contains_key(key, u64::MAX)?); + } + + // Non-existent key test + assert!(!tree.contains_key(b"fig", u64::MAX)?); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.0.metrics.filter_queries(); + + // Should have filter queries for in-domain keys + assert!( + final_queries > initial_queries, + "filter queries should increase for in-domain keys" + ); + } + assert!(!tree.contains_key(b"kiwi", u64::MAX)?); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.0.metrics.filter_queries(); + + // Should have queries for all lookups (5 existing + 2 non-existent) + assert!( + final_queries > initial_queries, + "filter queries should have increased" + ); + } + + Ok(()) +} + +#[test] +fn test_prefix_filter_range_queries() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let prefix_len = 5; + + let tree = Config::new(&folder) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(prefix_len))) + .open()?; + + // Insert keys with common prefixes + let prefixes = vec!["user_", "post_", "comm_"]; + + for prefix in &prefixes { + for i in 0..20 { + let key = format!("{}{:04}", prefix, i); + tree.insert(key.as_bytes(), format!("value_{}", i).as_bytes(), 0); + } + } + + tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.0.metrics.filter_queries(); + + // Test prefix iteration + for prefix in &prefixes { + let start_key = prefix.to_string(); + let end_key = format!("{}~", prefix); // '~' is after all digits and letters + + let count = tree + .range(start_key.as_bytes()..end_key.as_bytes(), u64::MAX, None) + .count(); + assert_eq!(count, 20); + } + + // Test non-existent prefix range + let count = tree + .range(&b"none_"[..]..&b"none~"[..], u64::MAX, None) + .count(); + assert_eq!(count, 0); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.0.metrics.filter_queries(); + assert!( + final_queries > initial_queries, + "filter queries should have increased" + ); + } + + Ok(()) +} + +#[test] +fn test_prefix_filter_after_compaction() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let prefix_len = 6; + + let tree = Config::new(&folder) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(prefix_len))) + .open()?; + + // Insert first batch of keys + for i in 0..50 { + let key = format!("batch1_{:04}", i); + tree.insert(key.as_bytes(), b"value1", 0); + } + + tree.flush_active_memtable(0)?; + + // Insert second batch with overlapping keys + for i in 25..75 { + let key = format!("batch1_{:04}", i); + tree.insert(key.as_bytes(), b"value2", 0); + } + + tree.flush_active_memtable(0)?; + + // Force compaction + use lsm_tree::compaction::Leveled; + tree.compact(Arc::new(Leveled::default()), 0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.0.metrics.filter_queries(); + #[cfg(feature = "metrics")] + let initial_hits = tree.0.metrics.io_skipped_by_filter(); + + // All keys should still be found after compaction + for i in 0..75 { + let key = format!("batch1_{:04}", i); + assert!(tree.contains_key(key.as_bytes(), u64::MAX)?); + } + + #[cfg(feature = "metrics")] + { + let final_queries = tree.0.metrics.filter_queries(); + 
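+        // `filter_queries` counts every filter probe; `io_skipped_by_filter`
+        // counts only probes whose negative answer let the read skip the
+        // segment entirely.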
let final_hits = tree.0.metrics.io_skipped_by_filter();
+
+        // Should have filter queries for post-compaction lookups
+        assert!(
+            final_queries > initial_queries,
+            "filter queries should have increased after compaction"
+        );
+
+        // All keys exist, so hits should not increase
+        assert_eq!(
+            final_hits, initial_hits,
+            "filter hits should not increase for existing keys"
+        );
+    }
+
+    Ok(())
+}
+
+#[test]
+fn test_prefix_filter_with_deletions() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+    let prefix_len = 7;
+
+    let tree = Config::new(&folder)
+        .prefix_extractor(Arc::new(FixedPrefixExtractor::new(prefix_len)))
+        .open()?;
+
+    // Insert keys
+    for i in 0..100 {
+        let key = format!("deltest_{:04}", i);
+        tree.insert(key.as_bytes(), b"value", 0);
+    }
+
+    tree.flush_active_memtable(0)?;
+
+    // Delete some keys
+    for i in (0..100).step_by(2) {
+        let key = format!("deltest_{:04}", i);
+        tree.remove(key.as_bytes(), 0);
+    }
+
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree.0.metrics.filter_queries();
+    #[cfg(feature = "metrics")]
+    let initial_hits = tree.0.metrics.io_skipped_by_filter();
+
+    // Verify deletions
+    for i in 0..100 {
+        let key = format!("deltest_{:04}", i);
+        if i % 2 == 0 {
+            assert!(!tree.contains_key(key.as_bytes(), u64::MAX)?);
+        } else {
+            assert!(tree.contains_key(key.as_bytes(), u64::MAX)?);
+        }
+    }
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.0.metrics.filter_queries();
+        let final_hits = tree.0.metrics.io_skipped_by_filter();
+
+        // Should have filter queries for all lookups after deletions
+        assert!(
+            final_queries > initial_queries,
+            "filter queries should have increased for deletion checks"
+        );
+
+        // Deleted keys still pass filter (tombstones), so hits should not increase
+        assert_eq!(
+            final_hits, initial_hits,
+            "filter hits should not increase (deleted keys still in filter)"
+        );
+    }
+
+    Ok(())
+}
+
+#[test]
+fn test_prefix_filter_edge_cases() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+
+    // Test with prefix length of 1
+    let tree = Config::new(folder.path().join("test1"))
+        .prefix_extractor(Arc::new(FixedPrefixExtractor::new(1)))
+        .open()?;
+
+    tree.insert(b"a", b"value", 0);
+    tree.insert(b"b", b"value", 0);
+    tree.insert(b"ab", b"value", 0);
+    tree.insert(b"ba", b"value", 0);
+
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree.0.metrics.filter_queries();
+
+    assert!(tree.contains_key(b"a", u64::MAX)?);
+    assert!(tree.contains_key(b"b", u64::MAX)?);
+    assert!(tree.contains_key(b"ab", u64::MAX)?);
+    assert!(tree.contains_key(b"ba", u64::MAX)?);
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.0.metrics.filter_queries();
+
+        // Should have queries for the point lookups above
+        assert!(
+            final_queries > initial_queries,
+            "filter queries should have increased for point lookups"
+        );
+    }
+
+    // Test with keys shorter and longer than the configured prefix length
+    let tree2 = Config::new(folder.path().join("test2"))
+        .prefix_extractor(Arc::new(FixedPrefixExtractor::new(5)))
+        .open()?;
+
+    tree2.insert(b"test", b"short_key", 0);
+    tree2.insert(b"longer_key", b"long_key", 0);
+
+    tree2.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries2 = tree2.0.metrics.filter_queries();
+
+    assert!(tree2.contains_key(b"test", u64::MAX)?);
+    assert!(tree2.contains_key(b"longer_key", u64::MAX)?);
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries2 = tree2.0.metrics.filter_queries();
+        assert!(
+            final_queries2 >
initial_queries2, + "filter queries should have increased for short/long key lookups" + ); + } + + Ok(()) +} + +#[test] +fn test_prefix_filter_large_dataset() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let prefix_len = 12; + + let tree = Config::new(&folder) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(prefix_len))) + .open()?; + + // Insert a large number of keys with various prefixes + let prefixes = vec![ + "transaction_", + "userprofile_", + "sessiondata_", + "logentryval_", + ]; + + for prefix in &prefixes { + for i in 0..1000 { + let key = format!("{}{:08}", prefix, i); + let value = format!("data_{}", i); + tree.insert(key.as_bytes(), value.as_bytes(), 0); + + // Flush periodically to create multiple segments + if i % 250 == 249 { + tree.flush_active_memtable(0)?; + } + } + } + + // Final flush + tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.0.metrics.filter_queries(); + + // Verify all keys are found + for prefix in &prefixes { + for i in 0..1000 { + let key = format!("{}{:08}", prefix, i); + assert!( + tree.contains_key(key.as_bytes(), u64::MAX)?, + "Key {} not found", + key + ); + } + } + + // Test non-existent keys with matching prefixes + for prefix in &prefixes { + let non_existent_key = format!("{}{:08}", prefix, 9999); + assert!(!tree.contains_key(non_existent_key.as_bytes(), u64::MAX)?); + } + + #[cfg(feature = "metrics")] + { + let final_queries = tree.0.metrics.filter_queries(); + + // With multiple segments, we should have many filter queries + assert!( + final_queries > initial_queries, + "filter queries should have increased for large dataset" + ); + } + + Ok(()) +} + +#[test] +fn test_prefix_filter_recovery() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let prefix_len = 10; + + // Create and populate tree + { + let tree = Config::new(&folder) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(prefix_len))) + .open()?; + + for i in 0..100 { + let key = format!("persistent_{:04}", i); + tree.insert(key.as_bytes(), b"value", 0); + } + + tree.flush_active_memtable(0)?; + } + + // Reopen tree and verify filter still works + { + let tree = Config::new(&folder) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(prefix_len))) + .open()?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.0.metrics.filter_queries(); + + for i in 0..100 { + let key = format!("persistent_{:04}", i); + assert!(tree.contains_key(key.as_bytes(), u64::MAX)?); + } + + // Non-existent keys should still be filtered + let non_existent = b"persistent_9999"; + assert!(!tree.contains_key(non_existent, u64::MAX)?); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.0.metrics.filter_queries(); + + // After recovery, filters should still be working + assert!( + final_queries > initial_queries, + "filter queries should work after recovery" + ); + } + } + + Ok(()) +} + +#[test] +fn test_prefix_filter_concurrent_access() -> lsm_tree::Result<()> { + use std::thread; + + let folder = tempfile::tempdir()?; + let prefix_len = 8; + + let tree = Arc::new( + Config::new(&folder) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(prefix_len))) + .open()?, + ); + + // Spawn multiple threads to insert data + let handles: Vec<_> = (0..4) + .map(|thread_id| { + let tree = Arc::clone(&tree); + thread::spawn(move || { + for i in 0..250 { + let key = format!("thread{:02}_{:04}", thread_id, i); + tree.insert(key.as_bytes(), b"value", 0); + } + }) + }) + .collect(); + + // Wait for all threads to 
complete
+    for handle in handles {
+        handle.join().unwrap();
+    }
+
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree.0.metrics.filter_queries();
+    #[cfg(feature = "metrics")]
+    let initial_hits = tree.0.metrics.io_skipped_by_filter();
+
+    // Verify all keys from all threads
+    for thread_id in 0..4 {
+        for i in 0..250 {
+            let key = format!("thread{:02}_{:04}", thread_id, i);
+            assert!(tree.contains_key(key.as_bytes(), u64::MAX)?);
+        }
+    }
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.0.metrics.filter_queries();
+        let final_hits = tree.0.metrics.io_skipped_by_filter();
+
+        // Should have filter queries for all concurrent lookups
+        assert!(
+            final_queries > initial_queries,
+            "filter queries should have increased for concurrent access"
+        );
+
+        // All keys exist, so hits should not increase
+        assert_eq!(
+            final_hits, initial_hits,
+            "filter hits should not increase for existing keys"
+        );
+    }
+
+    Ok(())
+}
+
+#[test]
+fn test_prefix_filter_sequence_consistency() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+    let prefix_len = 9;
+
+    let tree = Config::new(&folder)
+        .prefix_extractor(Arc::new(FixedPrefixExtractor::new(prefix_len)))
+        .open()?;
+
+    // Insert initial data with sequence numbers 0-49
+    for i in 0..50 {
+        let key = format!("seqtest1_{:04}", i);
+        tree.insert(key.as_bytes(), b"v1", i as u64);
+    }
+
+    tree.flush_active_memtable(0)?;
+
+    // Insert more data with sequence numbers 50-99
+    for i in 50..100 {
+        let key = format!("seqtest1_{:04}", i);
+        tree.insert(key.as_bytes(), b"v2", i as u64);
+    }
+
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree.0.metrics.filter_queries();
+
+    // Verify that at sequence number 50, only the first 50 keys are visible
+    // (a snapshot at seqno N sees only items written with a strictly smaller seqno)
+    for i in 0..50 {
+        let key = format!("seqtest1_{:04}", i);
+        assert!(tree.contains_key(key.as_bytes(), 50)?);
+    }
+
+    for i in 50..100 {
+        let key = format!("seqtest1_{:04}", i);
+        assert!(!tree.contains_key(key.as_bytes(), 50)?);
+    }
+
+    // Verify tree sees all data at max sequence number
+    for i in 0..100 {
+        let key = format!("seqtest1_{:04}", i);
+        assert!(tree.contains_key(key.as_bytes(), u64::MAX)?);
+    }
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.0.metrics.filter_queries();
+
+        // filter should be used for all lookups
+        assert!(
+            final_queries > initial_queries,
+            "filter queries should have increased for sequence consistency checks"
+        );
+    }
+
+    Ok(())
+}
+
+#[test]
+fn test_prefix_filter_seek_optimization() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+    let prefix_len = 8;
+
+    let tree = Config::new(&folder)
+        .prefix_extractor(Arc::new(FixedPrefixExtractor::new(prefix_len)))
+        .open()?;
+
+    // Insert keys with specific prefixes
+    for i in 0..100 {
+        let key = format!("prefix_a_{:04}", i);
+        tree.insert(key.as_bytes(), b"value_a", 0);
+    }
+
+    for i in 0..100 {
+        let key = format!("prefix_b_{:04}", i);
+        tree.insert(key.as_bytes(), b"value_b", 0);
+    }
+
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree.0.metrics.filter_queries();
+
+    // Seek with existing prefix should find keys
+    let range_a = tree.range("prefix_a_0000".."prefix_a_9999", u64::MAX, None);
+    assert_eq!(range_a.count(), 100);
+
+    // Seek with non-existent prefix should return empty (optimized via filter)
+    let range_c = tree.range("prefix_c_0000".."prefix_c_9999", u64::MAX, None);
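+    // Both bounds share the 8-byte prefix "prefix_c", which no segment stores,
+    // so the filter lets the range skip the segment without touching data blocks.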
+    assert_eq!(range_c.count(), 0);
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.0.metrics.filter_queries();
+
+        // Range queries should trigger filter checks
+        assert!(
+            final_queries > initial_queries,
+            "filter queries should have increased for range operations"
+        );
+    }
+
+    // Verify partial prefix matches work
+    let range_partial = tree.range("prefix_a_0050".."prefix_a_0060", u64::MAX, None);
+    assert_eq!(range_partial.count(), 10);
+
+    Ok(())
+}
+
+#[test]
+fn test_no_prefix_extractor() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+
+    // Create tree without prefix extractor (default behavior)
+    let tree = Config::new(&folder).open()?;
+
+    // Insert various keys
+    for i in 0..100 {
+        let key = format!("noprefix_{:04}", i);
+        tree.insert(key.as_bytes(), b"value", 0);
+    }
+
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree.0.metrics.filter_queries();
+    #[cfg(feature = "metrics")]
+    let initial_hits = tree.0.metrics.io_skipped_by_filter();
+
+    // All keys should be found (full key matching)
+    for i in 0..100 {
+        let key = format!("noprefix_{:04}", i);
+        assert!(tree.contains_key(key.as_bytes(), u64::MAX)?);
+    }
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.0.metrics.filter_queries();
+        let final_hits = tree.0.metrics.io_skipped_by_filter();
+
+        // Should still have filter queries even without prefix extractor (uses full key)
+        assert!(
+            final_queries > initial_queries,
+            "filter queries should work without prefix extractor"
+        );
+
+        // All keys exist, so hits should not increase
+        assert_eq!(
+            final_hits, initial_hits,
+            "filter hits should not increase for existing keys"
+        );
+    }
+
+    Ok(())
+}
+
+// Custom segmented prefix extractor for account_id#user_id pattern
+struct SegmentedPrefixExtractor {
+    delimiter: u8,
+}
+
+impl SegmentedPrefixExtractor {
+    fn new(delimiter: u8) -> Self {
+        Self { delimiter }
+    }
+}
+
+impl PrefixExtractor for SegmentedPrefixExtractor {
+    fn extract<'a>(&self, key: &'a [u8]) -> Box<dyn Iterator<Item = &'a [u8]> + 'a> {
+        let mut prefixes = Vec::new();
+
+        // Find the first delimiter position
+        if let Some(first_delim_pos) = key.iter().position(|&b| b == self.delimiter) {
+            // Add the prefix up to the first delimiter (account_id)
+            prefixes.push(&key[..first_delim_pos]);
+
+            // Find the second delimiter position
+            if let Some(second_delim_pos) = key[first_delim_pos + 1..]
+ .iter() + .position(|&b| b == self.delimiter) + { + // Add the prefix up to the second delimiter (account_id#user_id) + let full_prefix_end = first_delim_pos + 1 + second_delim_pos; + prefixes.push(&key[..full_prefix_end]); + } else { + // If no second delimiter, use the entire key as prefix + prefixes.push(key); + } + } else { + // No delimiter found, use the entire key + prefixes.push(key); + } + + Box::new(prefixes.into_iter()) + } + + fn name(&self) -> &str { + "SegmentedPrefixExtractor" + } +} + +#[test] +fn test_prefix_filter_segmented_extractor() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let delimiter = b'#'; + + let tree = Config::new(&folder) + .prefix_extractor(Arc::new(SegmentedPrefixExtractor::new(delimiter))) + .open()?; + + // Insert keys with account_id#user_id#data pattern + let account1 = "acc001"; + let account2 = "acc002"; + + // Insert users for account1 + for user_id in 1..=5 { + for data_id in 1..=10 { + let key = format!("{}#user{:03}#data{:04}", account1, user_id, data_id); + let value = format!("value_{}_{}", user_id, data_id); + tree.insert(key.as_bytes(), value.as_bytes(), 0); + } + } + + // Insert users for account2 + for user_id in 1..=3 { + for data_id in 1..=10 { + let key = format!("{}#user{:03}#data{:04}", account2, user_id, data_id); + let value = format!("value_{}_{}", user_id, data_id); + tree.insert(key.as_bytes(), value.as_bytes(), 0); + } + } + + tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.0.metrics.filter_queries(); + + // Test 1: Query for specific user within account1 + let user_key = format!("{}#user002#data0005", account1); + assert!(tree.contains_key(user_key.as_bytes(), u64::MAX)?); + + // Test 2: Query for all data of a specific user (prefix range query) + let user_prefix_start = format!("{}#user002#", account1); + let user_prefix_end = format!("{}#user002~", account1); // ~ is after # + let user_range = tree.range( + user_prefix_start.as_bytes()..user_prefix_end.as_bytes(), + u64::MAX, + None, + ); + assert_eq!(user_range.count(), 10); // Should find 10 data items for this user + + // Test 3: Query for all users in account1 (account-level prefix) + let account_prefix_start = format!("{}#", account1); + let account_prefix_end = format!("{}~", account1); // ~ is after # + let account_range = tree.range( + account_prefix_start.as_bytes()..account_prefix_end.as_bytes(), + u64::MAX, + None, + ); + assert_eq!(account_range.count(), 50); // 5 users * 10 data items + + // Test 4: Query for non-existent account + let non_existent_start = "acc999#"; + let non_existent_end = "acc999~"; + let non_existent_range = tree.range( + non_existent_start.as_bytes()..non_existent_end.as_bytes(), + u64::MAX, + None, + ); + assert_eq!(non_existent_range.count(), 0); + + // Test 5: Query for non-existent user in existing account + let non_user_key = format!("{}#user999#data0001", account1); + assert!(!tree.contains_key(non_user_key.as_bytes(), u64::MAX)?); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.0.metrics.filter_queries(); + // Should have filter queries for all lookups + assert!( + final_queries > initial_queries, + "filter queries should have increased for segmented lookups" + ); + } + + Ok(()) +} + +#[test] +fn test_prefix_filter_single_byte_keys() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + + let tree = Config::new(&folder) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(2))) + .open()?; + + // Insert single-byte keys + for i in 0u8..10 { + 
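+        // These one-byte keys are shorter than the 2-byte prefix; the test
+        // verifies that such keys still round-trip through insert and lookup.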
tree.insert([i], format!("value_{}", i).as_bytes(), 0); + } + + // Insert two-byte keys + for i in 0u8..10 { + tree.insert([i, i], format!("value_{}{}", i, i).as_bytes(), 0); + } + + tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.0.metrics.filter_queries(); + + // All keys should be found + for i in 0u8..10 { + assert!(tree.contains_key([i], u64::MAX)?); + assert!(tree.contains_key([i, i], u64::MAX)?); + } + + // Non-existent single-byte key + assert!(!tree.contains_key([255], u64::MAX)?); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.0.metrics.filter_queries(); + + // Should have queries for all lookups + assert!( + final_queries > initial_queries, + "filter queries should increase for single/two-byte key lookups" + ); + } + + Ok(()) +} + +#[test] +fn test_prefix_filter_null_bytes() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + + let tree = Config::new(&folder) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(3))) + .open()?; + + // Insert keys with null bytes + tree.insert(b"\0\0\0data", b"null_prefix", 0); + tree.insert(b"pre\0fix", b"null_middle", 0); + tree.insert(b"suffix\0", b"null_end", 0); + tree.insert(b"\0", b"single_null", 0); + + tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.0.metrics.filter_queries(); + + // All keys should be found + assert!(tree.contains_key(b"\0\0\0data", u64::MAX)?); + assert!(tree.contains_key(b"pre\0fix", u64::MAX)?); + assert!(tree.contains_key(b"suffix\0", u64::MAX)?); + assert!(tree.contains_key(b"\0", u64::MAX)?); + + // Non-existent keys with null bytes + assert!(!tree.contains_key(b"\0\0\0missing", u64::MAX)?); + assert!(!tree.contains_key(b"pre\0missing", u64::MAX)?); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.0.metrics.filter_queries(); + + assert!( + final_queries > initial_queries, + "filter queries should increase for null byte key lookups" + ); + } + + Ok(()) +} + +#[test] +fn test_prefix_filter_non_ascii() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + + let tree = Config::new(&folder) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(6))) + .open()?; + + // Insert keys with UTF-8 characters + tree.insert("prefix_测试_data".as_bytes(), b"chinese", 0); + tree.insert("prefix_тест_data".as_bytes(), b"cyrillic", 0); + tree.insert("prefix_🦀_data".as_bytes(), b"emoji", 0); + tree.insert("prefix_café".as_bytes(), b"accented", 0); + + // Insert binary keys (non-UTF8) + tree.insert([0xFF, 0xFE, 0xFD, 0xFC, 0xFB, 0xFA], b"binary", 0); + + tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.0.metrics.filter_queries(); + + // All keys should be found + assert!(tree.contains_key("prefix_测试_data".as_bytes(), u64::MAX)?); + assert!(tree.contains_key("prefix_тест_data".as_bytes(), u64::MAX)?); + assert!(tree.contains_key("prefix_🦀_data".as_bytes(), u64::MAX)?); + assert!(tree.contains_key("prefix_café".as_bytes(), u64::MAX)?); + assert!(tree.contains_key([0xFF, 0xFE, 0xFD, 0xFC, 0xFB, 0xFA], u64::MAX)?); + + // Non-existent keys + assert!(!tree.contains_key("prefix_missing".as_bytes(), u64::MAX)?); + assert!(!tree.contains_key([0xFF, 0xFE, 0xFD, 0x00, 0x00, 0x00], u64::MAX)?); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.0.metrics.filter_queries(); + + assert!( + final_queries > initial_queries, + "filter queries should increase for non-ASCII key lookups" + ); + } + + Ok(()) +} + +#[test] +fn 
test_prefix_filter_keys_as_prefixes() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + + let tree = Config::new(&folder) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(4))) + .open()?; + + // Insert keys where some are prefixes of others + tree.insert(b"a", b"value1", 0); + tree.insert(b"ab", b"value2", 0); + tree.insert(b"abc", b"value3", 0); + tree.insert(b"abcd", b"value4", 0); + tree.insert(b"abcde", b"value5", 0); + tree.insert(b"abcdef", b"value6", 0); + + tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.0.metrics.filter_queries(); + + // All keys should be found regardless of prefix relationships + assert!(tree.contains_key(b"a", u64::MAX)?); + assert!(tree.contains_key(b"ab", u64::MAX)?); + assert!(tree.contains_key(b"abc", u64::MAX)?); + assert!(tree.contains_key(b"abcd", u64::MAX)?); + assert!(tree.contains_key(b"abcde", u64::MAX)?); + assert!(tree.contains_key(b"abcdef", u64::MAX)?); + + // Non-existent keys with same prefix + assert!(!tree.contains_key(b"abcdx", u64::MAX)?); + assert!(!tree.contains_key(b"abx", u64::MAX)?); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.0.metrics.filter_queries(); + + assert!( + final_queries > initial_queries, + "filter queries should increase for prefix-related key lookups" + ); + } + + Ok(()) +} + +#[test] +fn test_prefix_filter_very_long_keys() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + + let tree = Config::new(&folder) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(10))) + .open()?; + + // Create very long keys + let long_key1 = vec![b'a'; 10000]; + let long_key2 = vec![b'b'; 10000]; + let mut long_key3 = vec![b'c'; 5000]; + long_key3.extend(vec![b'd'; 5000]); + + tree.insert(&long_key1, b"long1", 0); + tree.insert(&long_key2, b"long2", 0); + tree.insert(&long_key3, b"long3", 0); + + tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.0.metrics.filter_queries(); + + // All long keys should be found + assert!(tree.contains_key(&long_key1, u64::MAX)?); + assert!(tree.contains_key(&long_key2, u64::MAX)?); + assert!(tree.contains_key(&long_key3, u64::MAX)?); + + // Non-existent long key + let non_existent = vec![b'x'; 10000]; + assert!(!tree.contains_key(&non_existent, u64::MAX)?); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.0.metrics.filter_queries(); + + assert!( + final_queries > initial_queries, + "filter queries should increase for very long key lookups" + ); + } + + Ok(()) +} + +#[test] +fn test_prefix_filter_all_same_byte() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + + let tree = Config::new(&folder) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(5))) + .open()?; + + // Insert keys that are all the same byte + for len in 1..=10 { + let key = vec![b'x'; len]; + tree.insert(&key, format!("value_{}", len).as_bytes(), 0); + } + + tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.0.metrics.filter_queries(); + + // All keys should be found + for len in 1..=10 { + let key = vec![b'x'; len]; + assert!(tree.contains_key(&key, u64::MAX)?); + } + + // Non-existent key with same pattern + assert!(!tree.contains_key(vec![b'x'; 15], u64::MAX)?); + assert!(!tree.contains_key(vec![b'y'; 5], u64::MAX)?); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.0.metrics.filter_queries(); + + assert!( + final_queries > initial_queries, + "filter queries should increase for same-byte key lookups" + ); + } + 
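+    // Note: `vec![b'x'; 15]` shares the stored 5-byte prefix "xxxxx", so only a
+    // data block read can reject it, while `vec![b'y'; 5]` misses in the filter itself.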
+    Ok(())
+}
+
+// Custom extractor that returns many prefixes for stress testing
+struct ManyPrefixExtractor;
+
+impl PrefixExtractor for ManyPrefixExtractor {
+    fn extract<'a>(&self, key: &'a [u8]) -> Box<dyn Iterator<Item = &'a [u8]> + 'a> {
+        let mut prefixes = Vec::new();
+
+        // Generate all possible prefixes (up to 20 or key length)
+        for i in 1..=key.len().min(20) {
+            prefixes.push(&key[0..i]);
+        }
+
+        // Also add the full key if it was truncated above
+        if key.len() > 20 {
+            prefixes.push(key);
+        }
+
+        Box::new(prefixes.into_iter())
+    }
+
+    fn name(&self) -> &str {
+        "ManyPrefixExtractor"
+    }
+}
+
+#[test]
+fn test_prefix_filter_many_prefixes() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+
+    let tree = Config::new(&folder)
+        .prefix_extractor(Arc::new(ManyPrefixExtractor))
+        .open()?;
+
+    // Insert keys that will generate many prefixes
+    tree.insert(b"this_is_a_very_long_key_for_testing", b"value1", 0);
+    tree.insert(b"another_long_key_with_many_prefixes", b"value2", 0);
+    tree.insert(b"short", b"value3", 0);
+
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree.0.metrics.filter_queries();
+
+    // All keys should be found
+    assert!(tree.contains_key(b"this_is_a_very_long_key_for_testing", u64::MAX)?);
+    assert!(tree.contains_key(b"another_long_key_with_many_prefixes", u64::MAX)?);
+    assert!(tree.contains_key(b"short", u64::MAX)?);
+
+    // Test non-existent key
+    assert!(!tree.contains_key(b"non_existent_key_with_many_prefixes", u64::MAX)?);
+
+    // Range queries should work with many prefixes
+    let range = tree.range(b"this".as_ref().., u64::MAX, None);
+    assert!(range.count() > 0);
+
+    let range = tree.range(b"anot".as_ref().., u64::MAX, None);
+    assert!(range.count() > 0);
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.0.metrics.filter_queries();
+
+        assert!(
+            final_queries > initial_queries,
+            "filter queries should increase for many-prefix extractor"
+        );
+    }
+
+    Ok(())
+}
+
+#[test]
+fn test_prefix_filter_disabled() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+
+    // Create tree with filter disabled
+    let tree = Config::new(&folder)
+        .prefix_extractor(Arc::new(FixedPrefixExtractor::new(5)))
+        .bloom_bits_per_key(0) // Disable filter
+        .open()?;
+
+    // Insert some keys
+    for i in 0..100 {
+        let key = format!("disabled_{:04}", i);
+        tree.insert(key.as_bytes(), b"value", 0);
+    }
+
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree.0.metrics.filter_queries();
+
+    // Keys should still be found (via actual disk lookups)
+    for i in 0..100 {
+        let key = format!("disabled_{:04}", i);
+        assert!(tree.contains_key(key.as_bytes(), u64::MAX)?);
+    }
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.0.metrics.filter_queries();
+        // Should have no filter queries when disabled
+        assert_eq!(
+            final_queries, initial_queries,
+            "No filter queries when disabled"
+        );
+    }
+
+    Ok(())
+}
+
+#[test]
+fn test_prefix_filter_false_positive_rate() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+
+    // Use higher bits per key for lower false positive rate
+    let tree = Config::new(&folder)
+        .prefix_extractor(Arc::new(FixedPrefixExtractor::new(8)))
+        .bloom_bits_per_key(20) // Higher bits for lower FP rate
+        .open()?;
+
+    // Insert a specific set of keys
+    for i in 0..1000 {
+        let key = format!("fptest_{:06}", i * 2); // Even numbers only
+        tree.insert(key.as_bytes(), b"value", 0);
+    }
+
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries =
tree.0.metrics.filter_queries();
+    #[cfg(feature = "metrics")]
+    let initial_hits = tree.0.metrics.io_skipped_by_filter();
+
+    let mut false_positives = 0;
+    let total_checks = 1000;
+
+    // Check for non-existent keys (odd numbers)
+    for i in 0..total_checks {
+        let key = format!("fptest_{:06}", i * 2 + 1);
+        if tree.contains_key(key.as_bytes(), u64::MAX)? {
+            false_positives += 1;
+        }
+    }
+
+    // With 20 bits per key, false positive rate should be very low
+    let fp_rate = false_positives as f64 / total_checks as f64;
+    assert!(
+        fp_rate < 0.01,
+        "False positive rate {} should be less than 1% with 20 bits per key",
+        fp_rate
+    );
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.0.metrics.filter_queries();
+        let final_hits = tree.0.metrics.io_skipped_by_filter();
+
+        // Should have queries for all lookups
+        assert!(
+            final_queries > initial_queries,
+            "filter queries should increase for false-positive rate test"
+        );
+
+        // True negatives are pruned by the filter, so hits should increase by
+        // roughly the number of rejected lookups; only false positives slip past
+        assert!(
+            final_hits >= initial_hits + (total_checks - false_positives) as usize,
+            "filter hits should increase for filtered-out lookups"
+        );
+    }
+
+    Ok(())
+}
+
+#[test]
+fn test_prefix_filter_mixed_domain_keys() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+
+    let tree = Config::new(&folder)
+        .prefix_extractor(Arc::new(FixedLengthExtractor::new(8)))
+        .open()?;
+
+    // Mix of in-domain and out-of-domain keys
+    tree.insert(b"12345678_data", b"in_domain", 0); // In domain
+    tree.insert(b"short", b"out_of_domain", 0); // Out of domain
+    tree.insert(b"12345678", b"exact_length", 0); // Exact length
+    tree.insert(b"1234567", b"too_short", 0); // Out of domain
+    tree.insert(b"123456789", b"longer", 0); // In domain
+
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree.0.metrics.filter_queries();
+
+    // All keys should be found
+    assert!(tree.contains_key(b"12345678_data", u64::MAX)?);
+    assert!(tree.contains_key(b"short", u64::MAX)?);
+    assert!(tree.contains_key(b"12345678", u64::MAX)?);
+    assert!(tree.contains_key(b"1234567", u64::MAX)?);
+    assert!(tree.contains_key(b"123456789", u64::MAX)?);
+
+    // Non-existent keys with different domain status
+    assert!(!tree.contains_key(b"12345678_missing", u64::MAX)?); // Would be in domain
+    assert!(!tree.contains_key(b"tiny", u64::MAX)?); // Would be out of domain
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.0.metrics.filter_queries();
+
+        assert!(
+            final_queries > initial_queries,
+            "filter queries should increase for mixed domain key lookups"
+        );
+    }
+
+    Ok(())
+}
+
+/// Test that range queries don't incorrectly skip segments when the start bound
+/// doesn't exist in the filter but other keys in the range do exist
+#[test]
+fn test_prefix_filter_range_with_missing_start_bound() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+
+    // Use full key as prefix (FullKeyExtractor)
+    let tree = Config::new(&folder)
+        .prefix_extractor(Arc::new(FullKeyExtractor))
+        .open()?;
+
+    // Insert keys b and c, but not a
+    tree.insert(b"b", b"value_b", 0);
+    tree.insert(b"c", b"value_c", 0);
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree.0.metrics.filter_queries();
+
+    // Query range a..=c
+    // Extract common prefix from both bounds (empty for "a" and "c")
+    // But now we check if start bound "a" exists - it doesn't, but segment starts with "b"
+    // So we can't skip the segment (different prefixes)
+    let mut results = Vec::new();
+    for item in tree.range(&b"a"[..]..=&b"c"[..], u64::MAX, None) {
+        results.push(item.key()?.to_vec());
+    }
+
+    // Should return b and c (even though a doesn't exist)
+    assert_eq!(results.len(), 2);
+    assert_eq!(results[0], b"b");
+    assert_eq!(results[1], b"c");
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.0.metrics.filter_queries();
+
+        assert_eq!(
+            final_queries, initial_queries,
+            "filter should not be queried"
+        );
+    }
+
+    Ok(())
+}
+
+/// Test the new optimization: when range has no common prefix but start bound prefix doesn't exist
+#[test]
+fn test_prefix_filter_range_start_prefix_optimization() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+
+    // Use a fixed prefix extractor
+    let tree = Config::new(&folder)
+        .prefix_extractor(Arc::new(FixedPrefixExtractor::new(3)))
+        .open()?;
+
+    // Insert keys that all start with "bbb"
+    tree.insert(b"bbb_1", b"value1", 0);
+    tree.insert(b"bbb_2", b"value2", 0);
+    tree.insert(b"bbb_3", b"value3", 0);
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree.0.metrics.filter_queries();
+
+    // Query range aaa..zzz (no common prefix)
+    // The segment starts with "bbb" and "aaa" doesn't exist
+    // But since segment min ("bbb") != start prefix ("aaa"), we can't skip
+    let mut results = Vec::new();
+    for item in tree.range(&b"aaa"[..]..&b"zzz"[..], u64::MAX, None) {
+        results.push(item.key()?.to_vec());
+    }
+    assert_eq!(results.len(), 3, "Should find all bbb keys");
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.0.metrics.filter_queries();
+
+        assert_eq!(
+            final_queries, initial_queries,
+            "filter should not be queried"
+        );
+    }
+
+    // Now test where we CAN skip: segment that starts with same prefix as missing start bound
+    let tree2 = Config::new(folder.path().join("test2"))
+        .prefix_extractor(Arc::new(FixedPrefixExtractor::new(3)))
+        .open()?;
+
+    // Create a tree with keys having prefix "aaa" and "aac" but not "aab"
+    tree2.insert(b"aaa_1", b"value1", 0);
+    tree2.insert(b"aaa_2", b"value2", 0);
+    tree2.insert(b"aac_1", b"value3", 0);
+    tree2.insert(b"aac_2", b"value4", 0);
+    tree2.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree2.0.metrics.filter_queries();
+
+    // First verify the tree has data
+    assert!(tree2.contains_key(b"aaa_1", u64::MAX)?);
+    assert!(tree2.contains_key(b"aac_1", u64::MAX)?);
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree2.0.metrics.filter_queries();
+
+        assert!(
+            final_queries > initial_queries,
+            "filter queries should increase for point lookups"
+        );
+    }
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree2.0.metrics.filter_queries();
+
+    // Query for range with common prefix "aab" - no keys exist with this prefix
+    // Range: aab_1..aab_9 has common prefix "aab"
+    // The segment contains "aaa" and "aac" keys, so it overlaps the range
+    // filter will be checked for "aab" and should indicate it doesn't exist
+    let range_iter = tree2.range(&b"aab_1"[..]..&b"aab_9"[..], u64::MAX, None);
+    let results: Vec<_> = range_iter.collect();
+    assert_eq!(
+        results.len(),
+        0,
+        "No keys should match since aab prefix doesn't exist"
+    );
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree2.0.metrics.filter_queries();
+
+        assert!(
+            final_queries > initial_queries,
+            "filter queries should increase for range operations"
+        );
+    }
+
+    Ok(())
+}
+
+/// Test that range queries correctly
handle different prefix scenarios: +/// same prefix, different prefixes, and non-existent prefixes +#[test] +fn test_prefix_filter_range_across_different_prefixes() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + + let tree = Config::new(folder.path()) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(5))) + .open()?; + + // Store keys with same prefix + tree.insert("user1_a", "v1", 0); + tree.insert("user1_b", "v2", 0); + tree.flush_active_memtable(0)?; + + tree.insert("user2_a", "v3", 1); + tree.insert("user2_b", "v4", 1); + tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.0.metrics.filter_queries(); + + // Query with common prefix "user1" - should find entries + let count = tree + .range("user1_a"..="user1_z", lsm_tree::SeqNo::MAX, None) + .count(); + assert_eq!(count, 2, "Should find user1 entries"); + + // Query with non-existent prefix - should return nothing + let count = tree + .range("user3_a"..="user3_z", lsm_tree::SeqNo::MAX, None) + .count(); + assert_eq!(count, 0, "Should find no user3 entries"); + + // Query across different prefixes - no common prefix + let count = tree + .range("user1_a"..="user2_b", lsm_tree::SeqNo::MAX, None) + .count(); + assert_eq!(count, 4, "Should find all entries when no common prefix"); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.0.metrics.filter_queries(); + + // Range queries with common prefix should trigger filter + assert!( + final_queries > initial_queries, + "filter queries should increase for range operations" + ); + } + + Ok(()) +} + +/// Test range queries with reversed bounds (should return empty) +#[test] +fn test_prefix_filter_range_reversed_bounds() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + + let tree = Config::new(folder.path()) + .prefix_extractor(Arc::new(FullKeyExtractor)) + .open()?; + + // Insert some keys + tree.insert("a", "value_a", 0); + tree.insert("b", "value_b", 0); + tree.insert("c", "value_c", 0); + tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.0.metrics.filter_queries(); + + // Query with reversed bounds - should return empty + let count = tree.range("c".."a", lsm_tree::SeqNo::MAX, None).count(); + assert_eq!(count, 0, "Reversed bounds should return empty"); + + // Also test with excluded bounds reversed + use std::ops::Bound; + let count = tree + .range::<&str, _>( + (Bound::Excluded("c"), Bound::Included("a")), + lsm_tree::SeqNo::MAX, + None, + ) + .count(); + assert_eq!(count, 0, "Reversed excluded bounds should return empty"); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.0.metrics.filter_queries(); + // Reversed bounds may skip filter entirely + assert_eq!( + final_queries, initial_queries, + "filter should not be queried for reversed (empty) ranges" + ); + } + + Ok(()) +} + +/// Test range with same key but different bound types +#[test] +fn test_prefix_filter_range_same_key_different_bounds() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + + let tree = Config::new(folder.path()) + .prefix_extractor(Arc::new(FullKeyExtractor)) + .open()?; + + tree.insert("key", "value", 0); + tree.insert("key2", "value2", 0); + tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.0.metrics.filter_queries(); + #[cfg(feature = "metrics")] + let initial_hits = tree.0.metrics.io_skipped_by_filter(); + + // Included..Excluded with same key (empty range) + use std::ops::Bound; + let count = tree + .range::<&str, 
_>(
+            (Bound::Included("key"), Bound::Excluded("key")),
+            lsm_tree::SeqNo::MAX,
+            None,
+        )
+        .count();
+    assert_eq!(count, 0, "Included..Excluded same key should be empty");
+
+    // Excluded..Included with same key (empty range)
+    let count = tree
+        .range::<&str, _>(
+            (Bound::Excluded("key"), Bound::Included("key")),
+            lsm_tree::SeqNo::MAX,
+            None,
+        )
+        .count();
+    assert_eq!(count, 0, "Excluded..Included same key should be empty");
+
+    // Included..Included with same key (single item)
+    let count = tree
+        .range::<&str, _>(
+            (Bound::Included("key"), Bound::Included("key")),
+            lsm_tree::SeqNo::MAX,
+            None,
+        )
+        .count();
+    assert_eq!(count, 1, "Included..Included same key should return 1");
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.0.metrics.filter_queries();
+        let final_hits = tree.0.metrics.io_skipped_by_filter();
+
+        // Range queries should use filter even with same key bounds
+        assert!(
+            final_queries > initial_queries,
+            "filter queries should increase for same-key range operations"
+        );
+
+        // Keys exist, hits should not increase
+        assert_eq!(
+            final_hits, initial_hits,
+            "filter hits should not increase for existing keys"
+        );
+    }
+
+    Ok(())
+}
+
+/// Test range with non-consecutive keys having common prefix
+#[test]
+fn test_prefix_filter_range_non_consecutive_keys() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+
+    let tree = Config::new(folder.path())
+        .prefix_extractor(Arc::new(FixedPrefixExtractor::new(3)))
+        .open()?;
+
+    // Insert non-consecutive keys with same prefix
+    tree.insert("app_1", "v1", 0);
+    tree.insert("app_3", "v3", 0);
+    tree.insert("app_5", "v5", 0);
+    tree.insert("app_7", "v7", 0);
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree.0.metrics.filter_queries();
+    #[cfg(feature = "metrics")]
+    let initial_hits = tree.0.metrics.io_skipped_by_filter();
+
+    // Query for range that includes missing keys
+    let count = tree
+        .range("app_2"..="app_6", lsm_tree::SeqNo::MAX, None)
+        .count();
+    assert_eq!(count, 2, "Should find app_3 and app_5");
+
+    // Query for range entirely between existing keys
+    let count = tree
+        .range("app_4".."app_5", lsm_tree::SeqNo::MAX, None)
+        .count();
+    assert_eq!(count, 0, "No keys in range app_4..app_5");
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.0.metrics.filter_queries();
+        let final_hits = tree.0.metrics.io_skipped_by_filter();
+
+        // Range queries over gaps should still consult the filter
+        assert!(
+            final_queries > initial_queries,
+            "filter queries should increase for non-consecutive range queries"
+        );
+
+        // The probed prefix "app" exists, so hits should not increase
+        assert_eq!(
+            final_hits, initial_hits,
+            "filter hits should not increase (queried prefix exists)"
+        );
+    }
+
+    Ok(())
+}
+
+/// Test range queries across multiple segments with different prefixes
+#[test]
+fn test_prefix_filter_range_multiple_segments() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+
+    let tree = Config::new(folder.path())
+        .prefix_extractor(Arc::new(FixedPrefixExtractor::new(4)))
+        .open()?;
+
+    // Create first segment with user prefix
+    tree.insert("user_001", "v1", 0);
+    tree.insert("user_002", "v2", 0);
+    tree.flush_active_memtable(0)?;
+
+    // Create second segment with item prefix
+    tree.insert("item_001", "v3", 1);
+    tree.insert("item_002", "v4", 1);
+    tree.flush_active_memtable(0)?;
+
+    // Create third segment with both prefixes
+    tree.insert("user_003", "v5", 2);
+    tree.insert("item_003", "v6", 2);
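+    // The third segment contains both prefixes, so range queries touching
+    // either "user" or "item" keys must also consult this segment.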
tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.0.metrics.filter_queries(); + + // Query across all segments + let count = tree + .range("item_001"..="user_003", lsm_tree::SeqNo::MAX, None) + .count(); + assert_eq!(count, 6, "Should find all items and users in range"); + + // Query for non-existent prefix across segments + let count = tree + .range("test_001"..="test_999", lsm_tree::SeqNo::MAX, None) + .count(); + assert_eq!(count, 0, "Non-existent prefix should return nothing"); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.0.metrics.filter_queries(); + // Queries across multiple segments should check filters + assert!( + final_queries > initial_queries, + "filter queries should increase for multi-segment range queries" + ); + } + + Ok(()) +} + +/// Test range with keys where prefix changes at segment boundary +#[test] +fn test_prefix_filter_range_prefix_boundary() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + + let tree = Config::new(folder.path()) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(3))) + .open()?; + + // First segment ends with "aaz" + tree.insert("aax_1", "v1", 0); + tree.insert("aay_1", "v2", 0); + tree.insert("aaz_1", "v3", 0); + tree.flush_active_memtable(0)?; + + // Second segment starts with "aba" (different prefix) + tree.insert("aba_1", "v4", 1); + tree.insert("abb_1", "v5", 1); + tree.insert("abc_1", "v6", 1); + tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.0.metrics.filter_queries(); + + // Query across the boundary + let count = tree + .range("aay_1"..="abb_1", lsm_tree::SeqNo::MAX, None) + .count(); + assert_eq!(count, 4, "Should find keys from both segments"); + + // Query that spans missing prefix between segments + let count = tree + .range("aaz_2"..="aba_0", lsm_tree::SeqNo::MAX, None) + .count(); + assert_eq!(count, 0, "No keys in the gap between segments"); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.0.metrics.filter_queries(); + // Common prefix is only 2 chars ("aa" and "ab"), less than extractor length (3) + // So filter may be bypassed + assert_eq!( + final_queries, initial_queries, + "filter should be bypassed when common prefix is shorter than extractor" + ); + } + + Ok(()) +} + +/// Test range with no prefix extractor (should not use filter optimization) +#[test] +fn test_prefix_filter_range_no_extractor() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + + // Create tree without prefix extractor + let tree = Config::new(folder.path()).open()?; + + // Insert various keys + tree.insert("a", "v1", 0); + tree.insert("b", "v2", 0); + tree.insert("c", "v3", 0); + tree.insert("d", "v4", 0); + tree.insert("e", "v5", 0); + tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.0.metrics.filter_queries(); + + // Range queries should work normally without filter optimization + let count = tree.range("a"..="c", lsm_tree::SeqNo::MAX, None).count(); + assert_eq!(count, 3, "Should find a, b, c"); + + let count = tree.range("b"..="d", lsm_tree::SeqNo::MAX, None).count(); + assert_eq!(count, 3, "Should find b, c, d"); + + // Empty range + let count = tree.range("f"..="z", lsm_tree::SeqNo::MAX, None).count(); + assert_eq!(count, 0, "Should find nothing"); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.0.metrics.filter_queries(); + // Without prefix extractor, filter optimization is not used for ranges + assert_eq!( + final_queries, 
initial_queries,
+            "filter should not be used for ranges without prefix extractor"
+        );
+    }
+
+    Ok(())
+}
+
+/// Test range with both bounds excluded
+#[test]
+fn test_prefix_filter_range_both_excluded() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+
+    let tree = Config::new(folder.path())
+        .prefix_extractor(Arc::new(FullKeyExtractor))
+        .open()?;
+
+    // Insert keys
+    for key in ["a", "b", "c", "d", "e"] {
+        tree.insert(key, "value", 0);
+    }
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree.0.metrics.filter_queries();
+    #[cfg(feature = "metrics")]
+    let initial_hits = tree.0.metrics.io_skipped_by_filter();
+
+    // Test with both bounds excluded
+    use std::ops::Bound;
+    let count = tree
+        .range::<&str, _>(
+            (Bound::Excluded("a"), Bound::Excluded("e")),
+            lsm_tree::SeqNo::MAX,
+            None,
+        )
+        .count();
+    assert_eq!(count, 3, "Should return b, c, d");
+
+    // Edge case: adjacent keys with both excluded
+    let count = tree
+        .range::<&str, _>(
+            (Bound::Excluded("b"), Bound::Excluded("c")),
+            lsm_tree::SeqNo::MAX,
+            None,
+        )
+        .count();
+    assert_eq!(count, 0, "No keys between adjacent excluded bounds");
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.0.metrics.filter_queries();
+        let final_hits = tree.0.metrics.io_skipped_by_filter();
+
+        // Range queries with excluded bounds may or may not use filter
+        // depending on prefix extraction logic
+        assert!(
+            final_queries >= initial_queries,
+            "filter queries should not decrease for excluded bound ranges"
+        );
+
+        // All keys exist, hits should not increase
+        assert_eq!(
+            final_hits, initial_hits,
+            "filter hits should not increase for existing keys"
+        );
+    }
+
+    Ok(())
+}
+
+/// Test range queries over multiple segments with prefix filters
+/// (major compaction itself is intentionally skipped below)
+#[test]
+fn test_prefix_filter_range_after_compaction() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+
+    let tree = Config::new(folder.path())
+        .prefix_extractor(Arc::new(FixedPrefixExtractor::new(4)))
+        .open()?;
+
+    // Create multiple segments
+    for i in 0..3 {
+        tree.insert(format!("user_{}", i), format!("v{}", i), i);
+        tree.insert(format!("item_{}", i), format!("i{}", i), i);
+        tree.flush_active_memtable(0)?;
+    }
+
+    // Major compaction is not invoked here, so the three segments remain
+    // tree.major_compact(u64::MAX)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree.0.metrics.filter_queries();
+
+    // Verify range queries work across the flushed segments
+    let count = tree
+        .range("user_0"..="user_2", lsm_tree::SeqNo::MAX, None)
+        .count();
+    assert_eq!(count, 3, "Should find all user keys across segments");
+
+    let count = tree
+        .range("item_0"..="item_2", lsm_tree::SeqNo::MAX, None)
+        .count();
+    assert_eq!(count, 3, "Should find all item keys across segments");
+
+    // Query across prefixes
+    let count = tree
+        .range("item_1"..="user_1", lsm_tree::SeqNo::MAX, None)
+        .count();
+    assert_eq!(count, 4, "Should find mixed keys across segments");
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.0.metrics.filter_queries();
+        // Range queries with common prefix should use filter
+        assert!(
+            final_queries > initial_queries,
+            "filter queries should increase for range operations across segments"
+        );
+    }
+
+    Ok(())
+}
+
+/// Test range with Unicode/UTF-8 prefix boundaries
+#[test]
+fn test_prefix_filter_range_utf8_boundaries() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+
+    let tree = Config::new(folder.path())
+        .prefix_extractor(Arc::new(FixedPrefixExtractor::new(6))) // 6 bytes covers one 4-byte emoji plus half of the next
+        .open()?;
+
+    // Insert keys with emoji prefixes (each of these emojis is 4 bytes in UTF-8)
+    tree.insert("🎈🎈_001", "v1", 0);
+    tree.insert("🎈🎈_002", "v2", 0);
+    tree.insert("🎉🎉_001", "v3", 0);
+    tree.insert("🎉🎉_002", "v4", 0);
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree.0.metrics.filter_queries();
+
+    // Query within same emoji prefix
+    let count = tree
+        .range("🎈🎈_001"..="🎈🎈_002", lsm_tree::SeqNo::MAX, None)
+        .count();
+    assert_eq!(count, 2, "Should find keys with balloon prefix");
+
+    // Query across different emoji prefixes
+    let count = tree
+        .range("🎈🎈_002"..="🎉🎉_001", lsm_tree::SeqNo::MAX, None)
+        .count();
+    assert_eq!(count, 2, "Should find keys across emoji boundaries");
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.0.metrics.filter_queries();
+        // Emoji prefixes should trigger filter checks
+        assert!(
+            final_queries > initial_queries,
+            "filter queries should increase for UTF-8 boundary range queries"
+        );
+    }
+
+    Ok(())
+}
+
+/// Test with custom extractor returning multiple prefixes
+#[test]
+fn test_prefix_filter_range_multi_prefix_extractor() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+
+    // Custom extractor that returns multiple prefixes
+    struct MultiPrefixExtractor;
+    impl PrefixExtractor for MultiPrefixExtractor {
+        fn extract<'a>(&self, key: &'a [u8]) -> Box<dyn Iterator<Item = &'a [u8]> + 'a> {
+            if key.len() >= 6 {
+                // Return both 3-byte and 6-byte prefixes
+                Box::new(vec![&key[..3], &key[..6]].into_iter())
+            } else if key.len() >= 3 {
+                Box::new(std::iter::once(&key[..3]))
+            } else {
+                Box::new(std::iter::once(key))
+            }
+        }
+        fn name(&self) -> &str {
+            "MultiPrefixExtractor"
+        }
+    }
+
+    let tree = Config::new(folder.path())
+        .prefix_extractor(Arc::new(MultiPrefixExtractor))
+        .open()?;
+
+    tree.insert("abc123_data", "v1", 0);
+    tree.insert("abc456_data", "v2", 0);
+    tree.insert("def123_data", "v3", 0);
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree.0.metrics.filter_queries();
+    #[cfg(feature = "metrics")]
+    let initial_hits = tree.0.metrics.io_skipped_by_filter();
+
+    // Query should work with common 3-byte prefix
+    let count = tree
+        .range("abc000"..="abc999", lsm_tree::SeqNo::MAX, None)
+        .count();
+    assert_eq!(count, 2, "Should find keys with abc prefix");
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.0.metrics.filter_queries();
+        let final_hits = tree.0.metrics.io_skipped_by_filter();
+
+        // The multi-prefix extractor should use filters for prefix matching
+        assert!(
+            final_queries > initial_queries,
+            "filter queries should work with multi-prefix extractor"
+        );
+
+        // All keys exist, so hits should not increase
+        assert_eq!(
+            final_hits, initial_hits,
+            "filter hits should not increase for existing keys"
+        );
+    }
+
+    Ok(())
+}
+
+/// Test range with bytes at UTF-8 boundary splitting
+#[test]
+fn test_prefix_filter_range_utf8_split() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+
+    // Use a fixed byte extractor that might split UTF-8 chars
+    let tree = Config::new(folder.path())
+        .prefix_extractor(Arc::new(FixedPrefixExtractor::new(2)))
+        .open()?;
+
+    // Insert keys with multi-byte UTF-8 characters
+    tree.insert("中文_1", "v1", 0);
+    tree.insert("中文_2", "v2", 0);
+    tree.insert("日本_1", "v3", 0);
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree.0.metrics.filter_queries();
+    #[cfg(feature = "metrics")]
+    let initial_hits = tree.0.metrics.io_skipped_by_filter();
+
+    // The prefix
will be the first 2 bytes, which splits the UTF-8 character + // This tests that the implementation handles partial UTF-8 correctly + let count = tree + .range("中文_1"..="中文_2", lsm_tree::SeqNo::MAX, None) + .count(); + assert_eq!(count, 2, "Should find keys despite UTF-8 splitting"); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.0.metrics.filter_queries(); + let final_hits = tree.0.metrics.io_skipped_by_filter(); + + // Range query should use filter even with UTF-8 split + assert!( + final_queries > initial_queries, + "filter queries should increase for UTF-8 split range" + ); + + // Keys exist, hits should not increase + assert_eq!( + final_hits, initial_hits, + "filter hits should not increase for existing keys" + ); + } + + Ok(()) +} + +/// Test empty range (start > end after normalization) +#[test] +fn test_prefix_filter_empty_normalized_range() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + + let tree = Config::new(folder.path()) + .prefix_extractor(Arc::new(FullKeyExtractor)) + .open()?; + + tree.insert("b", "value", 0); + tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.0.metrics.filter_queries(); + #[cfg(feature = "metrics")] + let initial_hits = tree.0.metrics.io_skipped_by_filter(); + + // Create a range that becomes empty after normalization + use std::ops::Bound; + let count = tree + .range::<&str, _>( + (Bound::Excluded("b"), Bound::Excluded("b")), + lsm_tree::SeqNo::MAX, + None, + ) + .count(); + assert_eq!(count, 0, "Empty normalized range should return nothing"); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.0.metrics.filter_queries(); + let final_hits = tree.0.metrics.io_skipped_by_filter(); + + // Empty normalized range may skip filter + assert_eq!( + final_queries, initial_queries, + "filter should not be queried for empty normalized range" + ); + + assert_eq!( + final_hits, initial_hits, + "filter hits should not change for empty range" + ); + } + + Ok(()) +} From 92253d7db8bbf88f3816cf0d5c877941764accce Mon Sep 17 00:00:00 2001 From: zaidoon Date: Thu, 18 Sep 2025 23:57:30 -0400 Subject: [PATCH 2/2] add prefix extractor compatibility check for filters --- src/segment/inner.rs | 4 + src/segment/meta.rs | 11 + src/segment/mod.rs | 121 ++++++++-- src/segment/writer/mod.rs | 11 +- tests/prefix_filter.rs | 449 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 572 insertions(+), 24 deletions(-) diff --git a/src/segment/inner.rs b/src/segment/inner.rs index 739345e3..5e88018a 100644 --- a/src/segment/inner.rs +++ b/src/segment/inner.rs @@ -47,6 +47,10 @@ pub struct Inner { /// Prefix extractor for filters pub prefix_extractor: Option, + /// Whether the prefix extractor is compatible with the one used during segment creation + /// If false, prefix filter should not be used for this segment + pub prefix_extractor_compatible: bool, + // /// Pinned filter // #[doc(hidden)] // pub bloom_filter: Option, diff --git a/src/segment/meta.rs b/src/segment/meta.rs index c2ef1109..3711b73d 100644 --- a/src/segment/meta.rs +++ b/src/segment/meta.rs @@ -44,6 +44,10 @@ pub struct ParsedMeta { pub tombstone_count: u64, pub data_block_compression: CompressionType, + + /// Name of the prefix extractor used when creating this segment + /// None if no prefix extractor was configured + pub prefix_extractor_name: Option, } impl ParsedMeta { @@ -179,6 +183,12 @@ impl ParsedMeta { CompressionType::decode_from(&mut bytes)? 
}; + let prefix_extractor_name = { + block + .point_read(b"#prefix_extractor", SeqNo::MAX) + .map(|bytes| String::from_utf8_lossy(&bytes.value).into_owned()) + }; + Ok(Self { id, created_at, @@ -190,6 +200,7 @@ impl ParsedMeta { item_count, tombstone_count, data_block_compression, + prefix_extractor_name, }) } } diff --git a/src/segment/mod.rs b/src/segment/mod.rs index 228e2ed6..0714c2a9 100644 --- a/src/segment/mod.rs +++ b/src/segment/mod.rs @@ -179,18 +179,32 @@ impl Segment { if let Some(block) = &self.pinned_filter_block { let filter = StandardBloomFilterReader::new(&block.data)?; - #[cfg(feature = "metrics")] - self.metrics.filter_queries.fetch_add(1, Relaxed); - let may_contain = if let Some(ref extractor) = self.prefix_extractor { - // If prefix extractor is configured, use prefix-based filtering - // None means out-of-domain - these keys bypass filter - filter - .contains_prefix(key, extractor.as_ref()) - .unwrap_or(true) + if self.prefix_extractor_compatible { + #[cfg(feature = "metrics")] + self.metrics.filter_queries.fetch_add(1, Relaxed); + + // If prefix extractor is configured and compatible, use prefix-based filtering + // None means out-of-domain - these keys bypass filter + filter + .contains_prefix(key, extractor.as_ref()) + .unwrap_or(true) + } else { + // Extractor exists but is incompatible - disable filter entirely + true + } } else { - // No prefix extractor, use standard hash-based filtering - filter.contains_hash(key_hash) + // No prefix extractor configured now + if self.prefix_extractor_compatible { + #[cfg(feature = "metrics")] + self.metrics.filter_queries.fetch_add(1, Relaxed); + + // Segment was also created without prefix extractor - use standard hash-based filtering + filter.contains_hash(key_hash) + } else { + // Segment was created with prefix extractor, but none configured now - disable filter + true + } }; if !may_contain { @@ -207,18 +221,32 @@ impl Segment { )?; let filter = StandardBloomFilterReader::new(&block.data)?; - #[cfg(feature = "metrics")] - self.metrics.filter_queries.fetch_add(1, Relaxed); - let may_contain = if let Some(ref extractor) = self.prefix_extractor { - // If prefix extractor is configured, use prefix-based filtering - // None means out-of-domain - these keys bypass filter - filter - .contains_prefix(key, extractor.as_ref()) - .unwrap_or(true) + if self.prefix_extractor_compatible { + #[cfg(feature = "metrics")] + self.metrics.filter_queries.fetch_add(1, Relaxed); + + // If prefix extractor is configured and compatible, use prefix-based filtering + // None means out-of-domain - these keys bypass filter + filter + .contains_prefix(key, extractor.as_ref()) + .unwrap_or(true) + } else { + // Extractor exists but is incompatible - disable filter entirely + true + } } else { - // No prefix extractor, use standard hash-based filtering - filter.contains_hash(key_hash) + // No prefix extractor configured now + if self.prefix_extractor_compatible { + #[cfg(feature = "metrics")] + self.metrics.filter_queries.fetch_add(1, Relaxed); + + // Segment was also created without prefix extractor - use standard hash-based filtering + filter.contains_hash(key_hash) + } else { + // Segment was created with prefix extractor, but none configured now - disable filter + true + } }; if !may_contain { @@ -335,6 +363,11 @@ impl Segment { return false; }; + // Don't use prefix filtering if extractor is incompatible + if !self.prefix_extractor_compatible { + return false; + } + // Try pinned filter block first if let Some(block) = &self.pinned_filter_block { 
             if let Ok(filter) = StandardBloomFilterReader::new(&block.data) {
@@ -412,6 +445,11 @@ impl Segment {
             return false;
         };
 
+        // Don't use prefix filtering if extractor is incompatible
+        if !self.prefix_extractor_compatible {
+            return false;
+        }
+
         // First try: Check filter using common prefix from range bounds
         if let Some(common_prefix) =
             self.extract_common_prefix_for_filter(range, &**prefix_extractor)
@@ -548,6 +586,44 @@ impl Segment {
         log::debug!("Reading meta block, with meta_ptr={:?}", regions.metadata);
         let metadata = ParsedMeta::load_with_handle(&file, &regions.metadata)?;
 
+        // Check prefix extractor compatibility
+        let prefix_extractor_compatible = match (&metadata.prefix_extractor_name, &prefix_extractor)
+        {
+            // No extractor configured on either side - compatible
+            (None, None) => true,
+
+            (None, Some(_)) => {
+                log::warn!(
+                    "Segment {} was created without prefix extractor, but one is now configured. Prefix filter will be disabled for this segment.",
+                    metadata.id
+                );
+                false
+            }
+
+            (Some(_), None) => {
+                log::warn!(
+                    "Segment {} was created with prefix extractor, but none is configured now. Prefix filter will be disabled for this segment.",
+                    metadata.id
+                );
+                false
+            }
+
+            (Some(stored_name), Some(current_extractor)) => {
+                let current_name = current_extractor.name();
+                if stored_name == current_name {
+                    true
+                } else {
+                    log::warn!(
+                        "Segment {} was created with prefix extractor '{}', but current extractor is '{}'. Prefix filter will be disabled for this segment.",
+                        metadata.id,
+                        stored_name,
+                        current_name
+                    );
+                    false
+                }
+            }
+        };
+
         let block_index = if let Some(index_block_handle) = regions.index {
             log::debug!(
                 "Creating partitioned block index, with tli_ptr={:?}, index_block_ptr={index_block_handle:?}",
@@ -627,6 +703,7 @@ impl Segment {
             pinned_filter_block,
 
             prefix_extractor,
+            prefix_extractor_compatible,
 
             is_deleted: AtomicBool::default(),
 
@@ -676,8 +753,8 @@ impl Segment {
 
     /// Returns true if the segment might contain data (or if we can't determine).
     #[must_use]
     pub fn might_contain_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: &R) -> bool {
-        // If no prefix extractor, we can't use filter optimization
-        if self.prefix_extractor.is_none() {
+        // If no prefix extractor or extractor is incompatible, we can't use filter optimization
+        if self.prefix_extractor.is_none() || !self.prefix_extractor_compatible {
             return true;
         }
diff --git a/src/segment/writer/mod.rs b/src/segment/writer/mod.rs
index 85473783..40a079e1 100644
--- a/src/segment/writer/mod.rs
+++ b/src/segment/writer/mod.rs
@@ -355,7 +355,7 @@ impl Writer {
             InternalValue::from_components(key, value, 0, crate::ValueType::Value)
         }
 
-        let meta_items = [
+        let mut meta_items = vec![
             meta("#checksum_type", b"xxh3"),
             meta("#compression#data", &self.compression.encode_into_vec()),
             meta("#compression#index", &self.compression.encode_into_vec()),
@@ -380,6 +380,13 @@ impl Writer {
                 self.meta.first_key.as_ref().expect("should exist"),
             ),
             meta("#key_count", &(self.meta.key_count as u64).to_le_bytes()),
+        ];
+
+        if let Some(ref extractor) = self.prefix_extractor {
+            meta_items.push(meta("#prefix_extractor", extractor.name().as_bytes()));
+        }
+
+        meta_items.extend([
             meta("#prefix_truncation#data", &[1]),
             meta("#prefix_truncation#index", &[0]),
             meta("#seqno#max", &self.meta.highest_seqno.to_le_bytes()),
@@ -396,7 +403,7 @@ impl Writer {
             meta("v#lsmt", env!("CARGO_PKG_VERSION").as_bytes()),
             meta("v#table", b"3"), // TODO: tli_handle_count
-        ];
+        ]);
 
         // NOTE: Just to make sure the items are definitely sorted
         #[cfg(debug_assertions)]
diff --git a/tests/prefix_filter.rs b/tests/prefix_filter.rs
index 4d85e409..060a88b5 100644
--- a/tests/prefix_filter.rs
+++ b/tests/prefix_filter.rs
@@ -2209,3 +2209,452 @@ fn test_prefix_filter_empty_normalized_range() -> lsm_tree::Result<()> {
 
     Ok(())
 }
+
+/// A test prefix extractor that extracts a fixed prefix with a custom name
+struct TestPrefixExtractor {
+    length: usize,
+    name: String,
+}
+
+impl TestPrefixExtractor {
+    fn new(length: usize, name: &str) -> Self {
+        Self {
+            length,
+            name: name.to_string(),
+        }
+    }
+}
+
+impl PrefixExtractor for TestPrefixExtractor {
+    fn extract<'a>(&self, key: &'a [u8]) -> Box<dyn Iterator<Item = &'a [u8]> + 'a> {
+        if key.len() >= self.length {
+            Box::new(std::iter::once(&key[..self.length]))
+        } else {
+            Box::new(std::iter::once(key))
+        }
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+#[test]
+fn test_same_extractor_compatibility() -> lsm_tree::Result<()> {
+    let temp_dir = tempfile::tempdir()?;
+    let path = temp_dir.path();
+
+    let extractor = Arc::new(TestPrefixExtractor::new(4, "test_extractor"));
+
+    // Create a tree with prefix extractor
+    {
+        let tree = Config::new(path)
+            .prefix_extractor(extractor.clone())
+            .open()?;
+
+        tree.insert("user_key1", "value1", 0);
+        tree.insert("user_key2", "value2", 0);
+        tree.insert("data_key1", "value3", 0);
+        tree.flush_active_memtable(0)?;
+    }
+
+    // Reopen with the same extractor - should work fine with prefix filtering
+    {
+        let tree = Config::new(path).prefix_extractor(extractor).open()?;
+
+        // Should be able to use prefix filtering
+        #[cfg(feature = "metrics")]
+        let initial_queries = tree.0.metrics.filter_queries();
+
+        assert_eq!(
+            &*tree.get("user_key1", lsm_tree::SeqNo::MAX)?.unwrap(),
+            b"value1"
+        );
+        assert_eq!(
+            &*tree.get("user_key2", lsm_tree::SeqNo::MAX)?.unwrap(),
+            b"value2"
+        );
+        assert_eq!(
+            &*tree.get("data_key1", lsm_tree::SeqNo::MAX)?.unwrap(),
+            b"value3"
+        );
+
+        #[cfg(feature = "metrics")]
+        {
+            let final_queries = tree.0.metrics.filter_queries();
+            // Should have incremented filter queries since extractor is compatible
+            assert!(
+                final_queries > initial_queries,
+                "Compatible extractor should increment filter queries: {} -> {}",
+                initial_queries,
+                final_queries
+            );
+        }
+
+        // Test range queries with prefix filtering optimization
+        let items: Vec<_> = tree
+            .range("user"..="user_zzzz", lsm_tree::SeqNo::MAX, None)
+            .collect();
+        assert_eq!(items.len(), 2);
+    }
+
+    Ok(())
+}
+
+#[test]
+fn test_different_extractor_incompatible() -> lsm_tree::Result<()> {
+    let temp_dir = tempfile::tempdir()?;
+    let path = temp_dir.path();
+
+    let extractor1 = Arc::new(TestPrefixExtractor::new(4, "test_extractor_v1"));
+    let extractor2 = Arc::new(TestPrefixExtractor::new(4, "test_extractor_v2"));
+
+    // Create a tree with first extractor
+    {
+        let tree = Config::new(path).prefix_extractor(extractor1).open()?;
+
+        tree.insert("user_key1", "value1", 0);
+        tree.insert("user_key2", "value2", 0);
+        tree.insert("data_key1", "value3", 0);
+        tree.flush_active_memtable(0)?;
+    }
+
+    // Reopen with different extractor - should disable prefix filtering for old segments
+    {
+        let tree = Config::new(path).prefix_extractor(extractor2).open()?;
+
+        // Should still work, but without prefix filtering optimization for old segments
+        // The incompatible extractor means filter is completely bypassed
+        #[cfg(feature = "metrics")]
+        let initial_queries = tree.0.metrics.filter_queries();
+
+        assert_eq!(
+            &*tree.get("user_key1", lsm_tree::SeqNo::MAX)?.unwrap(),
+            b"value1"
+        );
+        assert_eq!(
+            &*tree.get("user_key2", lsm_tree::SeqNo::MAX)?.unwrap(),
+            b"value2"
+        );
+        assert_eq!(
+            &*tree.get("data_key1", lsm_tree::SeqNo::MAX)?.unwrap(),
+            b"value3"
+        );
+
+        #[cfg(feature = "metrics")]
+        {
+            let final_queries = tree.0.metrics.filter_queries();
+            // Should NOT have incremented filter queries since extractor is incompatible
+            assert_eq!(
+                final_queries, initial_queries,
+                "Incompatible extractor should not increment filter queries: {} -> {}",
+                initial_queries, final_queries
+            );
+        }
+
+        // Range queries should still work correctly (but without optimization for old segments)
+        let items: Vec<_> = tree
+            .range("user"..="user_zzzz", lsm_tree::SeqNo::MAX, None)
+            .collect();
+        assert_eq!(items.len(), 2);
+
+        // New writes should use the new extractor
+        tree.insert("test_key1", "value4", 1);
+        tree.flush_active_memtable(0)?;
+
+        assert_eq!(
+            &*tree.get("test_key1", lsm_tree::SeqNo::MAX)?.unwrap(),
+            b"value4"
+        );
+    }
+
+    Ok(())
+}
+
+#[test]
+fn test_no_extractor_to_extractor() -> lsm_tree::Result<()> {
+    let temp_dir = tempfile::tempdir()?;
+    let path = temp_dir.path();
+
+    // Create a tree without prefix extractor
+    {
+        let tree = Config::new(path).open()?;
+
+        tree.insert("user_key1", "value1", 0);
+        tree.insert("user_key2", "value2", 0);
+        tree.insert("data_key1", "value3", 0);
+        tree.flush_active_memtable(0)?;
+    }
+
+    // Reopen with prefix extractor - should disable prefix filtering for old segments
+    {
+        let extractor = Arc::new(TestPrefixExtractor::new(4, "test_extractor"));
+        let tree = Config::new(path).prefix_extractor(extractor).open()?;
+
+        // Should still work, but old segments won't use prefix filtering
+        assert_eq!(
+            &*tree.get("user_key1", lsm_tree::SeqNo::MAX)?.unwrap(),
+            b"value1"
+        );
+        assert_eq!(
+            &*tree.get("user_key2", lsm_tree::SeqNo::MAX)?.unwrap(),
+            b"value2"
+        );
+        assert_eq!(
+            &*tree.get("data_key1", lsm_tree::SeqNo::MAX)?.unwrap(),
+            b"value3"
+        );
+
+        // New writes should use prefix extractor
+        tree.insert("test_key1", "value4", 1);
+        tree.flush_active_memtable(0)?;
+
+        assert_eq!(
+            &*tree.get("test_key1", lsm_tree::SeqNo::MAX)?.unwrap(),
+            b"value4"
+        );
+    }
+
+    Ok(())
+}
+
+#[test]
+fn test_extractor_to_no_extractor() -> lsm_tree::Result<()> {
+    let temp_dir = tempfile::tempdir()?;
+    let path = temp_dir.path();
+
+    let extractor = Arc::new(TestPrefixExtractor::new(4, "test_extractor"));
+
+    // Create a tree with prefix extractor
+    {
+        let tree = Config::new(path).prefix_extractor(extractor).open()?;
+
+        tree.insert("user_key1", "value1", 0);
+        tree.insert("user_key2", "value2", 0);
+        tree.insert("data_key1", "value3", 0);
+        tree.flush_active_memtable(0)?;
+    }
+
+    // Reopen without prefix extractor - should disable prefix filtering for old segments
+    {
+        let tree = Config::new(path).open()?;
+
+        // Should still work, but old segments won't use prefix filtering
+        assert_eq!(
+            &*tree.get("user_key1", lsm_tree::SeqNo::MAX)?.unwrap(),
+            b"value1"
+        );
+        assert_eq!(
+            &*tree.get("user_key2", lsm_tree::SeqNo::MAX)?.unwrap(),
+            b"value2"
+        );
+        assert_eq!(
+            &*tree.get("data_key1", lsm_tree::SeqNo::MAX)?.unwrap(),
+            b"value3"
+        );
+
+        // Range queries should still work
+        let items: Vec<_> = tree
+            .range("user"..="user_zzzz", lsm_tree::SeqNo::MAX, None)
+            .collect();
+        assert_eq!(items.len(), 2);
+    }
+
+    Ok(())
+}
+
+#[test]
+fn test_builtin_extractors_compatibility() -> lsm_tree::Result<()> {
+    let temp_dir = tempfile::tempdir()?;
+    let path = temp_dir.path();
+
+    // Create with FixedPrefixExtractor
+    {
+        let tree = Config::new(path)
+            .prefix_extractor(Arc::new(FixedPrefixExtractor::new(4)))
+            .open()?;
+
+        tree.insert("user_key1", "value1", 0);
+        tree.insert("user_key2", "value2", 0);
+        tree.flush_active_memtable(0)?;
+    }
+
+    // Reopen with FixedLengthExtractor (different name) - should be incompatible
+    {
+        let tree = Config::new(path)
+            .prefix_extractor(Arc::new(FixedLengthExtractor::new(4)))
+            .open()?;
+
+        // Should work but without prefix filtering for old segments
+        assert_eq!(
+            &*tree.get("user_key1", lsm_tree::SeqNo::MAX)?.unwrap(),
+            b"value1"
+        );
+        assert_eq!(
+            &*tree.get("user_key2", lsm_tree::SeqNo::MAX)?.unwrap(),
+            b"value2"
+        );
+    }
+
+    // Reopen with same type (FixedPrefixExtractor) - should be compatible
+    {
+        let tree = Config::new(path)
+            .prefix_extractor(Arc::new(FixedPrefixExtractor::new(4)))
+            .open()?;
+
+        // Should work with prefix filtering for old segments
+        assert_eq!(
+            &*tree.get("user_key1", lsm_tree::SeqNo::MAX)?.unwrap(),
+            b"value1"
+        );
+        assert_eq!(
+            &*tree.get("user_key2", lsm_tree::SeqNo::MAX)?.unwrap(),
+            b"value2"
+        );
+    }
+
+    Ok(())
+}
+
+#[test]
+fn test_new_segments_use_new_extractor() -> lsm_tree::Result<()> {
+    let temp_dir = tempfile::tempdir()?;
+    let path = temp_dir.path();
+
+    let extractor1 = Arc::new(TestPrefixExtractor::new(4, "old_extractor"));
+    let extractor2 = Arc::new(TestPrefixExtractor::new(4, "new_extractor"));
+
+    // Create first segment with old extractor
+    {
+        let tree = Config::new(path).prefix_extractor(extractor1).open()?;
+
+        tree.insert("old_key1", "value1", 0);
+        tree.insert("old_key2", "value2", 0);
+        tree.flush_active_memtable(0)?;
+    }
+
+    // Reopen with new extractor and create new segment
+    {
+        let tree = Config::new(path).prefix_extractor(extractor2).open()?;
+
+        // Add data to create a new segment with the new extractor
+        tree.insert("new_key1", "value3", 1);
+        tree.insert("new_key2", "value4", 1);
+        tree.flush_active_memtable(0)?;
+
+        // Test that old segment uses no filtering (extractor incompatible)
+        #[cfg(feature = "metrics")]
+        let initial_queries = tree.0.metrics.filter_queries();
+
+        // Query old keys - should NOT increment filter queries (incompatible extractor)
+        assert_eq!(
+            &*tree.get("old_key1", lsm_tree::SeqNo::MAX)?.unwrap(),
+            b"value1"
+        );
+        assert_eq!(
+            &*tree.get("old_key2", lsm_tree::SeqNo::MAX)?.unwrap(),
+            b"value2"
+        );
+
+        #[cfg(feature = "metrics")]
+        let after_old_queries = tree.0.metrics.filter_queries();
+
+        // Query new keys - SHOULD increment filter queries (compatible extractor)
+        assert_eq!(
+            &*tree.get("new_key1", lsm_tree::SeqNo::MAX)?.unwrap(),
+            b"value3"
+        );
+        assert_eq!(
+            &*tree.get("new_key2", lsm_tree::SeqNo::MAX)?.unwrap(),
+            b"value4"
+        );
+
+        #[cfg(feature = "metrics")]
+        {
+            let final_queries = tree.0.metrics.filter_queries();
+
+            // Old keys should not have incremented filter queries
+            assert_eq!(
+                after_old_queries, initial_queries,
+                "Old keys should not increment filter queries due to incompatible extractor"
+            );
+
+            // New keys should have incremented filter queries
+            assert!(
+                final_queries > after_old_queries,
+                "New keys should increment filter queries with compatible extractor: {} -> {}",
+                after_old_queries,
+                final_queries
+            );
+        }
+    }
+
+    Ok(())
+}
+
+#[test]
+fn test_multiple_extractor_changes() -> lsm_tree::Result<()> {
+    let temp_dir = tempfile::tempdir()?;
+    let path = temp_dir.path();
+
+    let extractor1 = Arc::new(TestPrefixExtractor::new(2, "v1"));
+    let extractor2 = Arc::new(TestPrefixExtractor::new(2, "v2"));
+    let extractor3 = Arc::new(TestPrefixExtractor::new(2, "v3"));
+
+    // Create segments with different extractors over time
+    {
+        let tree = Config::new(path).prefix_extractor(extractor1).open()?;
+        tree.insert("aa_data1", "value1", 0);
+        tree.flush_active_memtable(0)?;
+    }
+
+    {
+        let tree = Config::new(path).prefix_extractor(extractor2).open()?;
+        tree.insert("bb_data2", "value2", 0);
+        tree.flush_active_memtable(0)?;
+    }
+
+    {
+        let tree = Config::new(path).prefix_extractor(extractor3).open()?;
+        tree.insert("cc_data3", "value3", 0);
+        tree.flush_active_memtable(0)?;
+
+        // Only the last segment should use filtering
+        #[cfg(feature = "metrics")]
+        let initial_queries = tree.0.metrics.filter_queries();
+
+        // These should not increment filter queries (incompatible)
+        assert_eq!(
+            &*tree.get("aa_data1", lsm_tree::SeqNo::MAX)?.unwrap(),
+            b"value1"
+        );
+        assert_eq!(
+            &*tree.get("bb_data2", lsm_tree::SeqNo::MAX)?.unwrap(),
+            b"value2"
+        );
+
+        #[cfg(feature = "metrics")]
+        let middle_queries = tree.0.metrics.filter_queries();
+
+        // This should increment filter queries (compatible)
+        assert_eq!(
+            &*tree.get("cc_data3", lsm_tree::SeqNo::MAX)?.unwrap(),
+            b"value3"
+        );
+
+        #[cfg(feature = "metrics")]
+        {
+            let final_queries = tree.0.metrics.filter_queries();
+            assert_eq!(
+                middle_queries, initial_queries,
+                "Old segments should not increment metrics"
+            );
+            assert!(
+                final_queries > middle_queries,
+                "New segment should increment metrics"
+            );
+        }
+    }
+
+    Ok(())
+}
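
Reviewer note: the compatibility rule applied during segment loading above reduces to a pure function over the stored extractor name (the "#prefix_extractor" meta item) and the currently configured one. A minimal standalone sketch for illustration only; the helper name `is_compatible` and the "fixed.8"-style names are mine, not part of this patch:

    // Mirrors the four-case match in segment loading: an exact name match
    // keeps the prefix filter usable; adding, removing, or renaming the
    // extractor disables the filter for previously written segments.
    fn is_compatible(stored: Option<&str>, current: Option<&str>) -> bool {
        match (stored, current) {
            (None, None) => true,         // neither side uses an extractor
            (Some(s), Some(c)) => s == c, // same name: same key-to-prefix mapping assumed
            _ => false,                   // extractor added or removed
        }
    }

    fn main() {
        assert!(is_compatible(None, None));
        assert!(is_compatible(Some("fixed.8"), Some("fixed.8")));
        assert!(!is_compatible(Some("fixed.8"), Some("fixed.4")));
        assert!(!is_compatible(None, Some("fixed.8")));
    }

Name equality is intentionally the only signal: two extractors that behave identically but report different names are treated as incompatible, which errs on the side of correctness, since the filter is bypassed rather than consulted with the wrong prefixes.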