diff --git a/src/bloom/command_handler.rs b/src/bloom/command_handler.rs
index 2fe7464..12c50f6 100644
--- a/src/bloom/command_handler.rs
+++ b/src/bloom/command_handler.rs
@@ -4,7 +4,7 @@ use crate::bloom::utils::BloomFilterType;
 use crate::configs;
 use crate::configs::{
     BLOOM_CAPACITY_MAX, BLOOM_CAPACITY_MIN, BLOOM_EXPANSION_MAX, BLOOM_EXPANSION_MIN,
-    BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN,
+    BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN, TIGHTENING_RATIO_MAX, TIGHTENING_RATIO_MIN,
 };
 use std::sync::atomic::Ordering;
 use valkey_module::ContextFlags;
@@ -110,11 +110,13 @@ pub fn bloom_filter_add_value(
         None => {
             // Instantiate empty bloom filter.
             let fp_rate = configs::BLOOM_FP_RATE_DEFAULT;
+            let tightening_ratio = configs::TIGHTENING_RATIO;
             let capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed) as u32;
             let expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32;
             let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed);
             let mut bloom = match BloomFilterType::new_reserved(
                 fp_rate,
+                tightening_ratio,
                 capacity,
                 expansion,
                 use_random_seed,
@@ -279,8 +281,10 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
     let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed);
     // Skip bloom filter size validation on replicated cmds.
     let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
+    let tightening_ratio = configs::TIGHTENING_RATIO;
     let bloom = match BloomFilterType::new_reserved(
         fp_rate,
+        tightening_ratio,
         capacity,
         expansion,
         use_random_seed,
@@ -302,6 +306,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
 
 pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyResult {
     let argc = input_args.len();
+    let replicated_cmd = ctx.get_flags().contains(ContextFlags::REPLICATED);
     // At the very least, we need: BF.INSERT <key> ITEMS <item>
     if argc < 4 {
         return Err(ValkeyError::WrongArity);
@@ -311,6 +316,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
     let filter_name = &input_args[idx];
     idx += 1;
     let mut fp_rate = configs::BLOOM_FP_RATE_DEFAULT;
+    let mut tightening_ratio = configs::TIGHTENING_RATIO;
     let mut capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed) as u32;
     let mut expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32;
     let mut nocreate = false;
@@ -331,6 +337,21 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
                     }
                 };
             }
+            "TIGHTENING" if replicated_cmd => {
+                if idx >= (argc - 1) {
+                    return Err(ValkeyError::WrongArity);
+                }
+                idx += 1;
+                tightening_ratio = match input_args[idx].to_string_lossy().parse::<f64>() {
+                    Ok(num) if num > TIGHTENING_RATIO_MIN && num < TIGHTENING_RATIO_MAX => num,
+                    Ok(num) if !(num > TIGHTENING_RATIO_MIN && num < TIGHTENING_RATIO_MAX) => {
+                        return Err(ValkeyError::Str(utils::ERROR_RATIO_RANGE));
+                    }
+                    _ => {
+                        return Err(ValkeyError::Str(utils::BAD_ERROR_RATIO));
+                    }
+                };
+            }
             "CAPACITY" => {
                 if idx >= (argc - 1) {
                     return Err(ValkeyError::WrongArity);
@@ -387,7 +408,6 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
         }
     };
     // Skip bloom filter size validation on replicated cmds.
-    let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
     let mut add_succeeded = false;
     match value {
         Some(bloom) => {
@@ -398,7 +418,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
                 bloom,
                 true,
                 &mut add_succeeded,
-                validate_size_limit,
+                !replicated_cmd,
             );
             replicate_and_notify_events(ctx, filter_name, add_succeeded, false);
             response
@@ -410,10 +430,11 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
             let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed);
             let mut bloom = match BloomFilterType::new_reserved(
                 fp_rate,
+                tightening_ratio,
                 capacity,
                 expansion,
                 use_random_seed,
-                validate_size_limit,
+                !replicated_cmd,
             ) {
                 Ok(bf) => bf,
                 Err(err) => return Err(ValkeyError::Str(err.as_str())),
@@ -425,7 +446,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
                 &mut bloom,
                 true,
                 &mut add_succeeded,
-                validate_size_limit,
+                !replicated_cmd,
             );
             match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) {
                 Ok(()) => {
diff --git a/src/bloom/data_type.rs b/src/bloom/data_type.rs
index 58131b4..a985db1 100644
--- a/src/bloom/data_type.rs
+++ b/src/bloom/data_type.rs
@@ -71,6 +71,9 @@ impl ValkeyDataType for BloomFilterType {
         let Ok(fp_rate) = raw::load_double(rdb) else {
             return None;
         };
+        let Ok(tightening_ratio) = raw::load_double(rdb) else {
+            return None;
+        };
         let mut filters: Vec<BloomFilter> = Vec::with_capacity(num_filters as usize);
         let Ok(is_seed_random_u64) = raw::load_unsigned(rdb) else {
             return None;
@@ -121,6 +124,7 @@ impl ValkeyDataType for BloomFilterType {
         let item = BloomFilterType {
             expansion: expansion as u32,
             fp_rate,
+            tightening_ratio,
             is_seed_random,
             filters,
         };
@@ -131,6 +135,7 @@
     fn debug_digest(&self, mut dig: Digest) {
         dig.add_long_long(self.expansion.into());
         dig.add_string_buffer(&self.fp_rate.to_le_bytes());
+        dig.add_string_buffer(&self.tightening_ratio.to_le_bytes());
         for filter in &self.filters {
             dig.add_string_buffer(filter.bloom.as_slice());
             dig.add_long_long(filter.num_items.into());
diff --git a/src/bloom/utils.rs b/src/bloom/utils.rs
index 5894a24..89ba9ca 100644
--- a/src/bloom/utils.rs
+++ b/src/bloom/utils.rs
@@ -1,6 +1,9 @@
 use super::data_type::BLOOM_TYPE_VERSION;
 use crate::{
-    configs::{self, BLOOM_EXPANSION_MAX, BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN},
+    configs::{
+        self, BLOOM_EXPANSION_MAX, BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN, TIGHTENING_RATIO_MAX,
+        TIGHTENING_RATIO_MIN,
+    },
     metrics,
 };
 use bloomfilter;
@@ -22,6 +25,8 @@ pub const BAD_EXPANSION: &str = "ERR bad expansion";
 pub const BAD_CAPACITY: &str = "ERR bad capacity";
 pub const BAD_ERROR_RATE: &str = "ERR bad error rate";
 pub const ERROR_RATE_RANGE: &str = "ERR (0 < error rate range < 1)";
+pub const BAD_ERROR_RATIO: &str = "ERR bad error ratio";
+pub const ERROR_RATIO_RANGE: &str = "ERR (0 < error ratio range < 1)";
 pub const CAPACITY_LARGER_THAN_0: &str = "ERR (capacity should be larger than 0)";
 pub const MAX_NUM_SCALING_FILTERS: &str = "ERR bloom object reached max number of filters";
 pub const UNKNOWN_ARGUMENT: &str = "ERR unknown argument received";
@@ -67,6 +72,7 @@ impl BloomError {
 pub struct BloomFilterType {
     pub expansion: u32,
     pub fp_rate: f64,
+    pub tightening_ratio: f64,
     pub is_seed_random: bool,
     pub filters: Vec<BloomFilter>,
 }
@@ -75,6 +81,7 @@ impl BloomFilterType {
     /// Create a new BloomFilterType object.
     pub fn new_reserved(
         fp_rate: f64,
+        tightening_ratio: f64,
         capacity: u32,
         expansion: u32,
         use_random_seed: bool,
@@ -99,6 +106,7 @@ impl BloomFilterType {
         let bloom = BloomFilterType {
             expansion,
             fp_rate,
+            tightening_ratio,
             filters,
             is_seed_random: use_random_seed,
         };
@@ -120,6 +128,7 @@ impl BloomFilterType {
         BloomFilterType {
             expansion: from_bf.expansion,
             fp_rate: from_bf.fp_rate,
+            tightening_ratio: from_bf.tightening_ratio,
             is_seed_random: from_bf.is_seed_random,
             filters,
         }
@@ -264,12 +273,13 @@ impl BloomFilterType {
             1 => {
                 // always use new version to init bloomFilterType.
                 // This is to ensure that the new fields can be recognized when the object is serialized and deserialized in the future.
-                let (expansion, fp_rate, is_seed_random, filters): (
+                let (expansion, fp_rate, tightening_ratio, is_seed_random, filters): (
                     u32,
                     f64,
+                    f64,
                     bool,
                     Vec<BloomFilter>,
-                ) = match bincode::deserialize::<(u32, f64, bool, Vec<BloomFilter>)>(
+                ) = match bincode::deserialize::<(u32, f64, f64, bool, Vec<BloomFilter>)>(
                     &decoded_bytes[1..],
                 ) {
                     Ok(values) => {
@@ -281,10 +291,13 @@ impl BloomFilterType {
                         if !(values.1 > BLOOM_FP_RATE_MIN && values.1 < BLOOM_FP_RATE_MAX) {
                             return Err(BloomError::ErrorRateRange);
                         }
-                        if values.3.len() >= configs::MAX_FILTERS_PER_OBJ as usize {
+                        if !(values.2 > TIGHTENING_RATIO_MIN && values.2 < TIGHTENING_RATIO_MAX) {
+                            return Err(BloomError::ErrorRateRange);
+                        }
+                        if values.4.len() >= configs::MAX_FILTERS_PER_OBJ as usize {
                             return Err(BloomError::MaxNumScalingFilters);
                         }
-                        for _filter in values.3.iter() {
+                        for _filter in values.4.iter() {
                             // Reject the request, if the operation will result in creation of a filter of size greater than what is allowed.
                             if validate_size_limit
                                 && _filter.number_of_bytes()
@@ -303,6 +316,7 @@ impl BloomFilterType {
                         let item = BloomFilterType {
                             expansion,
                             fp_rate,
+                            tightening_ratio,
                             is_seed_random,
                             filters,
                         };
@@ -579,6 +593,14 @@ mod tests {
                 .any(|filter| (filter.seed() == restore_filter.seed())
                     && (restore_filter.seed() == configs::FIXED_SEED))));
         }
+        assert_eq!(
+            restored_bloom_filter_type.fp_rate,
+            original_bloom_filter_type.fp_rate
+        );
+        assert_eq!(
+            restored_bloom_filter_type.tightening_ratio,
+            original_bloom_filter_type.tightening_ratio
+        );
         assert_eq!(
             restored_bloom_filter_type.capacity(),
             original_bloom_filter_type.capacity()
@@ -633,12 +655,14 @@ mod tests {
         let rand_prefix = random_prefix(7);
         // 1 in every 1000 operations is expected to be a false positive.
         let expected_fp_rate: f64 = 0.001;
+        let tightening_ratio: f64 = 0.5;
         let initial_capacity = 10000;
         // Expansion of 0 indicates non scaling.
         let expansion = 0;
         // Validate the non scaling behavior of the bloom filter.
         let mut bf = BloomFilterType::new_reserved(
             expected_fp_rate,
+            tightening_ratio,
             initial_capacity,
             expansion,
             is_seed_random,
@@ -695,11 +719,13 @@ mod tests {
         let rand_prefix = random_prefix(7);
         // 1 in every 1000 operations is expected to be a false positive.
         let expected_fp_rate: f64 = 0.001;
+        let tightening_ratio: f64 = 0.5;
         let initial_capacity = 10000;
         let expansion = 2;
         let num_filters_to_scale = 5;
         let mut bf = BloomFilterType::new_reserved(
             expected_fp_rate,
+            tightening_ratio,
             initial_capacity,
             expansion,
             is_seed_random,
@@ -779,18 +805,19 @@ mod tests {
     fn test_exceeded_size_limit() {
         // Validate that bloom filter allocations within bloom objects are rejected if their memory usage would be beyond
         // the configured limit.
-        let result = BloomFilterType::new_reserved(0.5_f64, u32::MAX, 1, true, true);
+        let result = BloomFilterType::new_reserved(0.5_f64, 0.5_f64, u32::MAX, 1, true, true);
         assert_eq!(result.err(), Some(BloomError::ExceedsMaxBloomSize));
         let capacity = 50000000;
         assert!(!BloomFilter::validate_size(capacity, 0.001_f64));
-        let result2 = BloomFilterType::new_reserved(0.001_f64, capacity, 1, true, true);
+        let result2 = BloomFilterType::new_reserved(0.001_f64, 0.5_f64, capacity, 1, true, true);
         assert_eq!(result2.err(), Some(BloomError::ExceedsMaxBloomSize));
     }
 
     #[rstest(expansion, case::nonscaling(0), case::scaling(2))]
     fn test_bf_encode_and_decode(expansion: u32) {
         let mut bf =
-            BloomFilterType::new_reserved(0.5_f64, 1000_u32, expansion, true, true).unwrap();
+            BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, expansion, true, true)
+                .unwrap();
         let item = "item1";
         let _ = bf.add_item(item.as_bytes(), true);
         // action
@@ -803,6 +830,7 @@ mod tests {
         let new_bf = new_bf_result.unwrap();
         // verify new_bf and bf
         assert_eq!(bf.fp_rate, new_bf.fp_rate);
+        assert_eq!(bf.tightening_ratio, new_bf.tightening_ratio);
         assert_eq!(bf.expansion, new_bf.expansion);
         assert_eq!(bf.capacity(), new_bf.capacity());
         // verify item1 exists.
@@ -812,7 +840,8 @@ mod tests {
     #[test]
     fn test_bf_decode_when_unsupported_version_should_failed() {
         // arrange: prepare bloom filter
-        let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true, true).unwrap();
+        let mut bf =
+            BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true, true).unwrap();
         let key = "key";
         let _ = bf.add_item(key.as_bytes(), true).unwrap();
 
@@ -834,7 +863,8 @@ mod tests {
     #[test]
     fn test_bf_decode_when_bytes_is_empty_should_failed() {
         // arrange: prepare bloom filter
-        let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true, true).unwrap();
+        let mut bf =
+            BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true, true).unwrap();
         let key = "key";
         let _ = bf.add_item(key.as_bytes(), true);
 
@@ -854,7 +884,8 @@ mod tests {
     #[test]
     fn test_bf_decode_when_bytes_is_exceed_limit_should_failed() {
         // arrange: prepare bloom filter
-        let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true, true).unwrap();
+        let mut bf =
+            BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true, true).unwrap();
         let key = "key";
         let _ = bf.add_item(key.as_bytes(), true);
         let origin_expansion = bf.expansion;
@@ -882,7 +913,7 @@ mod tests {
 
         // 3. build a larger than 64mb filter
         let extra_large_filter =
-            BloomFilterType::new_reserved(0.01_f64, 57000000, 2, true, false).unwrap();
+            BloomFilterType::new_reserved(0.01_f64, 0.5_f64, 57000000, 2, true, false).unwrap();
         let vec = extra_large_filter.encode_bloom_filter().unwrap();
         // should return error
         assert_eq!(
diff --git a/src/configs.rs b/src/configs.rs
index 7ede53e..e16fc11 100644
--- a/src/configs.rs
+++ b/src/configs.rs
@@ -36,6 +36,8 @@ lazy_static! {
 // Tightening ratio used during scale out for the calculation of fp_rate of every new filter within a bloom object to
 // maintain the bloom object's overall fp_rate to the configured value.
 pub const TIGHTENING_RATIO: f64 = 0.5;
+pub const TIGHTENING_RATIO_MIN: f64 = 0.0;
+pub const TIGHTENING_RATIO_MAX: f64 = 1.0;
 // Max number of filters allowed within a bloom object.
 pub const MAX_FILTERS_PER_OBJ: i32 = i32::MAX;
 /// Below constants are fixed seed and sip keys to help create bloom objects using the same seed and to restore the bloom objects with the same hasher which
diff --git a/src/wrapper/bloom_callback.rs b/src/wrapper/bloom_callback.rs
index a1e9a53..bf4d13d 100644
--- a/src/wrapper/bloom_callback.rs
+++ b/src/wrapper/bloom_callback.rs
@@ -22,6 +22,7 @@ pub unsafe extern "C" fn bloom_rdb_save(rdb: *mut raw::RedisModuleIO, value: *mu
     raw::save_unsigned(rdb, v.filters.len() as u64);
     raw::save_unsigned(rdb, v.expansion as u64);
     raw::save_double(rdb, v.fp_rate);
+    raw::save_double(rdb, v.tightening_ratio);
     let mut is_seed_random = 0;
     if v.is_seed_random {
         is_seed_random = 1;
diff --git a/tests/test_bloom_metrics.py b/tests/test_bloom_metrics.py
index 4f33ed1..ddf7e8d 100644
--- a/tests/test_bloom_metrics.py
+++ b/tests/test_bloom_metrics.py
@@ -4,7 +4,7 @@
 from valkeytests.conftest import resource_port_tracker
 from util.waiters import *
 
-DEFAULT_BLOOM_FILTER_SIZE = 179952
+DEFAULT_BLOOM_FILTER_SIZE = 179960
 DEFAULT_BLOOM_FILTER_CAPACITY = 100000
 
 class TestBloomMetrics(ValkeyBloomTestCaseBase):
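
Note (not part of the patch): a minimal sketch of how a tightening ratio is commonly applied when a scaling bloom object adds a new sub-filter, following the standard scalable-bloom approach. The helper name and exact formula below are illustrative assumptions, not functions introduced by this diff.

// Hypothetical helper: target false-positive rate for the next sub-filter, assuming
// the usual scalable-bloom rule fp_rate_i = fp_rate * tightening_ratio^i.
fn fp_rate_for_new_filter(fp_rate: f64, tightening_ratio: f64, num_existing_filters: u32) -> f64 {
    fp_rate * tightening_ratio.powi(num_existing_filters as i32)
}

fn main() {
    // With the values used in the tests above (fp_rate 0.001, tightening ratio 0.5),
    // the third sub-filter created during scale-out would target 0.001 * 0.5^2 = 0.00025.
    let rate = fp_rate_for_new_filter(0.001, 0.5, 2);
    assert!((rate - 0.00025).abs() < 1e-12);
    println!("fp_rate for 3rd sub-filter: {rate}");
}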