diff --git a/src/bloom/command_handler.rs b/src/bloom/command_handler.rs index cd61763..dad87a0 100644 --- a/src/bloom/command_handler.rs +++ b/src/bloom/command_handler.rs @@ -4,7 +4,7 @@ use crate::bloom::utils::BloomFilterType; use crate::configs; use crate::configs::{ BLOOM_CAPACITY_MAX, BLOOM_CAPACITY_MIN, BLOOM_EXPANSION_MAX, BLOOM_EXPANSION_MIN, - BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN, + BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN, TIGHTENING_RATIO_MAX, TIGHTENING_RATIO_MIN, }; use std::sync::atomic::Ordering; use valkey_module::ContextFlags; @@ -110,10 +110,12 @@ pub fn bloom_filter_add_value( None => { // Instantiate empty bloom filter. let fp_rate = configs::BLOOM_FP_RATE_DEFAULT; + let tightening_ratio = configs::TIGHTENING_RATIO; let capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed) as u32; let expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32; let mut bloom = match BloomFilterType::new_reserved( fp_rate, + tightening_ratio, capacity, expansion, validate_size_limit, @@ -228,6 +230,16 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke } }; curr_cmd_idx += 1; + let tightening_ratio = match input_args[curr_cmd_idx].to_string_lossy().parse::<f64>() { + Ok(num) if num > TIGHTENING_RATIO_MIN && num < TIGHTENING_RATIO_MAX => num, + Ok(num) if !(num > TIGHTENING_RATIO_MIN && num < TIGHTENING_RATIO_MAX) => { + return Err(ValkeyError::Str(utils::ERROR_RATIO_RANGE)); + } + _ => { + return Err(ValkeyError::Str(utils::BAD_ERROR_RATIO)); + } + }; + curr_cmd_idx += 1; // Parse the capacity let capacity = match input_args[curr_cmd_idx].to_string_lossy().parse::<u32>() { Ok(num) if (BLOOM_CAPACITY_MIN..=BLOOM_CAPACITY_MAX).contains(&num) => num, @@ -278,6 +290,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED); let bloom = match BloomFilterType::new_reserved( fp_rate, + tightening_ratio, capacity, expansion, 
validate_size_limit, @@ -307,6 +320,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey let filter_name = &input_args[idx]; idx += 1; let mut fp_rate = configs::BLOOM_FP_RATE_DEFAULT; + let mut tightening_ratio = configs::TIGHTENING_RATIO; let mut capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed) as u32; let mut expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32; let mut nocreate = false; @@ -327,6 +341,21 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey } }; } + "RATIO" => { + if idx >= (argc - 1) { + return Err(ValkeyError::WrongArity); + } + idx += 1; + tightening_ratio = match input_args[idx].to_string_lossy().parse::<f64>() { + Ok(num) if num > TIGHTENING_RATIO_MIN && num < TIGHTENING_RATIO_MAX => num, + Ok(num) if !(num > TIGHTENING_RATIO_MIN && num < TIGHTENING_RATIO_MAX) => { + return Err(ValkeyError::Str(utils::ERROR_RATIO_RANGE)); + } + _ => { + return Err(ValkeyError::Str(utils::BAD_ERROR_RATIO)); + } + }; + } "CAPACITY" => { if idx >= (argc - 1) { return Err(ValkeyError::WrongArity); } @@ -405,6 +434,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey } let mut bloom = match BloomFilterType::new_reserved( fp_rate, + tightening_ratio, capacity, expansion, validate_size_limit, diff --git a/src/bloom/data_type.rs b/src/bloom/data_type.rs index 187afc5..0de5cbd 100644 --- a/src/bloom/data_type.rs +++ b/src/bloom/data_type.rs @@ -75,6 +75,9 @@ impl ValkeyDataType for BloomFilterType { let Ok(fp_rate) = raw::load_double(rdb) else { return None; }; + let Ok(tightening_ratio) = raw::load_double(rdb) else { + return None; + }; for i in 0..num_filters { let Ok(bitmap) = raw::load_string_buffer(rdb) else { return None; }; @@ -124,6 +127,7 @@ impl ValkeyDataType for BloomFilterType { let item = BloomFilterType { expansion: expansion as u32, fp_rate, + tightening_ratio, filters, }; Some(item) @@ -133,6 +137,7 @@ fn debug_digest(&self, mut dig: Digest) { 
dig.add_long_long(self.expansion.into()); dig.add_string_buffer(&self.fp_rate.to_le_bytes()); + // dig.add_string_buffer(&self.tightening_ratio.to_le_bytes()); for filter in &self.filters { dig.add_string_buffer(&filter.bloom.bitmap()); for &(key1, key2) in &filter.sip_keys() { diff --git a/src/bloom/utils.rs b/src/bloom/utils.rs index 84adbcb..0a4e1d9 100644 --- a/src/bloom/utils.rs +++ b/src/bloom/utils.rs @@ -1,6 +1,7 @@ use crate::{ configs::{ self, BLOOM_EXPANSION_MAX, BLOOM_EXPANSION_MIN, BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN, + TIGHTENING_RATIO_MAX, TIGHTENING_RATIO_MIN, }, metrics, }; @@ -24,6 +25,8 @@ pub const BAD_EXPANSION: &str = "ERR bad expansion"; pub const BAD_CAPACITY: &str = "ERR bad capacity"; pub const BAD_ERROR_RATE: &str = "ERR bad error rate"; pub const ERROR_RATE_RANGE: &str = "ERR (0 < error rate range < 1)"; +pub const BAD_ERROR_RATIO: &str = "ERR bad error ratio"; +pub const ERROR_RATIO_RANGE: &str = "ERR (0 < error ratio range < 1)"; pub const CAPACITY_LARGER_THAN_0: &str = "ERR (capacity should be larger than 0)"; pub const MAX_NUM_SCALING_FILTERS: &str = "ERR bloom object reached max number of filters"; pub const UNKNOWN_ARGUMENT: &str = "ERR unknown argument received"; @@ -69,6 +72,7 @@ impl BloomError { pub struct BloomFilterType { pub expansion: u32, pub fp_rate: f64, + pub tightening_ratio: f64, pub filters: Vec, } @@ -76,6 +80,7 @@ impl BloomFilterType { /// Create a new BloomFilterType object. pub fn new_reserved( fp_rate: f64, + tightening_ratio: f64, capacity: u32, expansion: u32, validate_size_limit: bool, @@ -97,6 +102,7 @@ impl BloomFilterType { let bloom = BloomFilterType { expansion, fp_rate, + tightening_ratio, filters, }; Ok(bloom) @@ -117,6 +123,7 @@ impl BloomFilterType { BloomFilterType { expansion: from_bf.expansion, fp_rate: from_bf.fp_rate, + tightening_ratio: from_bf.tightening_ratio, filters, } } @@ -245,39 +252,47 @@ impl BloomFilterType { 1 => { // always use new version to init bloomFilterType. 
// This is to ensure that the new fields can be recognized when the object is serialized and deserialized in the future. - let (expansion, fp_rate, filters): (u32, f64, Vec<BloomFilter>) = - match bincode::deserialize::<(u32, f64, Vec<BloomFilter>)>(&decoded_bytes[1..]) - { - Ok(values) => { - if !(BLOOM_EXPANSION_MIN..=BLOOM_EXPANSION_MAX).contains(&values.0) { - return Err(BloomError::BadExpansion); - } - if !(values.1 > BLOOM_FP_RATE_MIN && values.1 < BLOOM_FP_RATE_MAX) { - return Err(BloomError::ErrorRateRange); - } - if values.2.len() >= configs::MAX_FILTERS_PER_OBJ as usize { - return Err(BloomError::MaxNumScalingFilters); - } - for _filter in values.2.iter() { - // Reject the request, if the operation will result in creation of a filter of size greater than what is allowed. - if validate_size_limit - && _filter.number_of_bytes() - > configs::BLOOM_MEMORY_LIMIT_PER_FILTER - .load(Ordering::Relaxed) - as usize - { - return Err(BloomError::ExceedsMaxBloomSize); - } - } - values + let (expansion, fp_rate, tightening_ratio, filters): ( + u32, + f64, + f64, + Vec<BloomFilter>, + ) = match bincode::deserialize::<(u32, f64, f64, Vec<BloomFilter>)>( + &decoded_bytes[1..], + ) { + Ok(values) => { + if !(BLOOM_EXPANSION_MIN..=BLOOM_EXPANSION_MAX).contains(&values.0) { + return Err(BloomError::BadExpansion); + } + if !(values.1 > BLOOM_FP_RATE_MIN && values.1 < BLOOM_FP_RATE_MAX) { + return Err(BloomError::ErrorRateRange); } - Err(_) => { - return Err(BloomError::DecodeBloomFilterFailed); + if !(values.2 > TIGHTENING_RATIO_MIN && values.2 < TIGHTENING_RATIO_MAX) { + return Err(BloomError::ErrorRateRange); } - }; + if values.3.len() >= configs::MAX_FILTERS_PER_OBJ as usize { + return Err(BloomError::MaxNumScalingFilters); + } + for _filter in values.3.iter() { + // Reject the request, if the operation will result in creation of a filter of size greater than what is allowed. 
+ if validate_size_limit + && _filter.number_of_bytes() + > configs::BLOOM_MEMORY_LIMIT_PER_FILTER.load(Ordering::Relaxed) + as usize + { + return Err(BloomError::ExceedsMaxBloomSize); + } + } + values + } + Err(_) => { + return Err(BloomError::DecodeBloomFilterFailed); + } + }; let item = BloomFilterType { expansion, fp_rate, + tightening_ratio, filters, }; // add bloom filter type metrics. @@ -613,13 +628,19 @@ mod tests { let rand_prefix = random_prefix(7); // 1 in every 1000 operations is expected to be a false positive. let expected_fp_rate: f64 = 0.001; + let expected_tightening_ratio: f64 = 0.5; let initial_capacity = 10000; // Expansion of 0 indicates non scaling. let expansion = 0; // Validate the non scaling behavior of the bloom filter. - let mut bf = - BloomFilterType::new_reserved(expected_fp_rate, initial_capacity, expansion, true) - .expect("Expect bloom creation to succeed"); + let mut bf = BloomFilterType::new_reserved( + expected_fp_rate, + expected_tightening_ratio, + initial_capacity, + expansion, + true, + ) + .expect("Expect bloom creation to succeed"); let (error_count, add_operation_idx) = add_items_till_capacity(&mut bf, initial_capacity as i64, 1, &rand_prefix); assert_eq!( @@ -671,12 +692,18 @@ mod tests { let rand_prefix = random_prefix(7); // 1 in every 1000 operations is expected to be a false positive. 
let expected_fp_rate: f64 = 0.001; + let expected_tightening_ratio: f64 = 0.5; let initial_capacity = 10000; let expansion = 2; let num_filters_to_scale = 5; - let mut bf = - BloomFilterType::new_reserved(expected_fp_rate, initial_capacity, expansion, true) - .expect("Expect bloom creation to succeed"); + let mut bf = BloomFilterType::new_reserved( + expected_fp_rate, + expected_tightening_ratio, + initial_capacity, + expansion, + true, + ) + .expect("Expect bloom creation to succeed"); assert_eq!(bf.capacity(), initial_capacity as i64); assert_eq!(bf.cardinality(), 0); let mut total_error_count = 0; @@ -749,18 +776,18 @@ mod tests { fn test_exceeded_size_limit() { // Validate that bloom filter allocations within bloom objects are rejected if their memory usage would be beyond // the configured limit. - let result = BloomFilterType::new_reserved(0.5_f64, u32::MAX, 1, true); + let result = BloomFilterType::new_reserved(0.5_f64, 0.5_f64, u32::MAX, 1, true); assert_eq!(result.err(), Some(BloomError::ExceedsMaxBloomSize)); let capacity = 50000000; assert!(!BloomFilter::validate_size(capacity, 0.001_f64)); - let result2 = BloomFilterType::new_reserved(0.001_f64, capacity, 1, true); + let result2 = BloomFilterType::new_reserved(0.001_f64, 0.5_f64, capacity, 1, true); assert_eq!(result2.err(), Some(BloomError::ExceedsMaxBloomSize)); } #[test] fn test_bf_encode_and_decode() { // arrange: prepare bloom filter - let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap(); + let mut bf = BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true).unwrap(); let key = "key"; let _ = bf.add_item(key.as_bytes(), true); @@ -779,6 +806,7 @@ mod tests { // verify new_bf and bf assert_eq!(bf.fp_rate, new_bf.fp_rate); + assert_eq!(bf.tightening_ratio, new_bf.tightening_ratio); assert_eq!(bf.expansion, new_bf.expansion); assert_eq!(bf.capacity(), new_bf.capacity()); @@ -789,7 +817,7 @@ mod tests { #[test] fn 
test_bf_decode_when_unsupported_version_should_failed() { // arrange: prepare bloom filter - let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap(); + let mut bf = BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true).unwrap(); let key = "key"; let _ = bf.add_item(key.as_bytes(), true).unwrap(); @@ -811,7 +839,7 @@ mod tests { #[test] fn test_bf_decode_when_bytes_is_empty_should_failed() { // arrange: prepare bloom filter - let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap(); + let mut bf = BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true).unwrap(); let key = "key"; let _ = bf.add_item(key.as_bytes(), true); @@ -831,7 +859,7 @@ mod tests { #[test] fn test_bf_decode_when_bytes_is_exceed_limit_should_failed() { // arrange: prepare bloom filter - let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap(); + let mut bf = BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true).unwrap(); let key = "key"; let _ = bf.add_item(key.as_bytes(), true); let origin_expansion = bf.expansion; @@ -873,7 +901,7 @@ mod tests { // 3. build a larger than 64mb filter let extra_large_filter = - BloomFilterType::new_reserved(0.01_f64, 57000000, 2, false).unwrap(); + BloomFilterType::new_reserved(0.01_f64, 0.5_f64, 57000000, 2, false).unwrap(); let vec = extra_large_filter.encode_bloom_filter().unwrap(); // should return error assert_eq!( diff --git a/src/configs.rs b/src/configs.rs index db83832..f2e6d00 100644 --- a/src/configs.rs +++ b/src/configs.rs @@ -32,6 +32,8 @@ lazy_static! { // Tightening ratio used during scale out for the calculation of fp_rate of every new filter within a bloom object to // maintain the bloom object's overall fp_rate to the configured value. pub const TIGHTENING_RATIO: f64 = 0.5; +pub const TIGHTENING_RATIO_MIN: f64 = 0.001; +pub const TIGHTENING_RATIO_MAX: f64 = 1.0; // Max number of filters allowed within a bloom object. 
pub const MAX_FILTERS_PER_OBJ: i32 = i32::MAX; /// Below constants are fixed seed and sip keys to help create bloom objects using the same seed and to restore the bloom objects with the same hasher which