diff --git a/Cargo.toml b/Cargo.toml index 555174a..d205dbc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ homepage = "https://github.com/valkey-io/valkey-bloom" valkey-module = "0.1.2" valkey-module-macros = "0" linkme = "0" -bloomfilter = { version = "1.0.13", features = ["serde"] } +bloomfilter = { version = "3.0.1", features = ["serde"] } lazy_static = "1.4.0" libc = "0.2" serde = { version = "1.0", features = ["derive"] } diff --git a/src/bloom/data_type.rs b/src/bloom/data_type.rs index 187afc5..c8af83a 100644 --- a/src/bloom/data_type.rs +++ b/src/bloom/data_type.rs @@ -1,8 +1,5 @@ use crate::bloom::utils::BloomFilter; use crate::bloom::utils::BloomFilterType; -use crate::configs::{ - FIXED_SIP_KEY_ONE_A, FIXED_SIP_KEY_ONE_B, FIXED_SIP_KEY_TWO_A, FIXED_SIP_KEY_TWO_B, -}; use crate::metrics::BLOOM_NUM_OBJECTS; use crate::metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES; use crate::wrapper::bloom_callback; @@ -61,7 +58,6 @@ pub trait ValkeyDataType { impl ValkeyDataType for BloomFilterType { /// Callback to load and parse RDB data of a bloom item and create it. fn load_from_rdb(rdb: *mut raw::RedisModuleIO, encver: i32) -> Option { - let mut filters = Vec::new(); if encver > BLOOM_FILTER_TYPE_ENCODING_VERSION { logging::log_warning(format!("{}: Cannot load bloomfltr data type of version {} because it is higher than the loaded module's bloomfltr supported version {}", MODULE_NAME, encver, BLOOM_FILTER_TYPE_ENCODING_VERSION).as_str()); return None; @@ -75,24 +71,28 @@ impl ValkeyDataType for BloomFilterType { let Ok(fp_rate) = raw::load_double(rdb) else { return None; }; + let mut filters: Vec = Vec::with_capacity(num_filters as usize); + for i in 0..num_filters { let Ok(bitmap) = raw::load_string_buffer(rdb) else { return None; }; - let Ok(number_of_bits) = raw::load_unsigned(rdb) else { + let Ok(capacity) = raw::load_unsigned(rdb) else { return None; }; - // Reject RDB Load if any bloom filter within a bloom object of a size greater than what is allowed. - if !BloomFilter::validate_size_with_bits(number_of_bits) { + let new_fp_rate = match Self::calculate_fp_rate(fp_rate, num_filters as i32) { + Ok(rate) => rate, + Err(_) => { + logging::log_warning( + "Failed to restore bloom object: Reached max number of filters", + ); + return None; + } + }; + if !BloomFilter::validate_size(capacity as u32, new_fp_rate) { logging::log_warning("Failed to restore bloom object because it contains a filter larger than the max allowed size limit."); return None; } - let Ok(number_of_hash_functions) = raw::load_unsigned(rdb) else { - return None; - }; - let Ok(capacity) = raw::load_unsigned(rdb) else { - return None; - }; // Only load num_items when it's the last filter let num_items = if i == num_filters - 1 { match raw::load_unsigned(rdb) { @@ -102,18 +102,8 @@ impl ValkeyDataType for BloomFilterType { } else { capacity }; - let sip_keys = [ - (FIXED_SIP_KEY_ONE_A, FIXED_SIP_KEY_ONE_B), - (FIXED_SIP_KEY_TWO_A, FIXED_SIP_KEY_TWO_B), - ]; - let filter = BloomFilter::from_existing( - bitmap.as_ref(), - number_of_bits, - number_of_hash_functions as u32, - sip_keys, - num_items as u32, - capacity as u32, - ); + let filter = + BloomFilter::from_existing(bitmap.as_ref(), num_items as u32, capacity as u32); filters.push(filter); } BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add( @@ -134,11 +124,7 @@ impl ValkeyDataType for BloomFilterType { dig.add_long_long(self.expansion.into()); dig.add_string_buffer(&self.fp_rate.to_le_bytes()); for filter in &self.filters { - dig.add_string_buffer(&filter.bloom.bitmap()); - for &(key1, key2) in &filter.sip_keys() { - dig.add_long_long(key1 as i64); - dig.add_long_long(key2 as i64); - } + dig.add_string_buffer(filter.bloom.as_slice()); dig.add_long_long(filter.num_items.into()); dig.add_long_long(filter.capacity.into()); } diff --git a/src/bloom/utils.rs b/src/bloom/utils.rs index 84adbcb..f066d05 100644 --- a/src/bloom/utils.rs +++ b/src/bloom/utils.rs @@ -5,10 +5,11 @@ use crate::{ metrics, }; use bloomfilter; +use bloomfilter::{deserialize, serialize}; use serde::{Deserialize, Serialize}; -use std::{mem, sync::atomic::Ordering}; use super::data_type::BLOOM_TYPE_VERSION; +use std::{mem, sync::atomic::Ordering}; /// KeySpace Notification Events pub const ADD_EVENT: &str = "bloom.add"; @@ -104,7 +105,7 @@ impl BloomFilterType { /// Create a new BloomFilterType object from an existing one. pub fn create_copy_from(from_bf: &BloomFilterType) -> BloomFilterType { - let mut filters = Vec::new(); + let mut filters: Vec = Vec::with_capacity(from_bf.filters.len()); metrics::BLOOM_NUM_OBJECTS.fetch_add(1, Ordering::Relaxed); metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add( mem::size_of::(), @@ -188,11 +189,9 @@ impl BloomFilterType { } // Scale out by adding a new filter with capacity bounded within the u32 range. false positive rate is also // bound within the range f64::MIN_POSITIVE <= x < 1.0. - let new_fp_rate = match self.fp_rate * configs::TIGHTENING_RATIO.powi(num_filters) { - x if x > f64::MIN_POSITIVE => x, - _ => { - return Err(BloomError::MaxNumScalingFilters); - } + let new_fp_rate = match Self::calculate_fp_rate(self.fp_rate, num_filters) { + Ok(rate) => rate, + Err(e) => return Err(e), }; let new_capacity = match filter.capacity.checked_mul(self.expansion) { Some(new_capacity) => new_capacity, @@ -231,6 +230,14 @@ impl BloomFilterType { } } + /// Calculate the false positive rate for the Nth filter using tightening ratio. + pub fn calculate_fp_rate(fp_rate: f64, num_filters: i32) -> Result { + match fp_rate * configs::TIGHTENING_RATIO.powi(num_filters) { + x if x > f64::MIN_POSITIVE => Ok(x), + _ => Err(BloomError::MaxNumScalingFilters), + } + } + /// Deserialize a byte array to bloom filter. /// We will need to handle any current or previous version and deserializing the bytes into a bloom object of the running Module's current version `BLOOM_TYPE_VERSION`. pub fn decode_bloom_filter( @@ -318,6 +325,7 @@ impl BloomFilterType { /// well within the u32::MAX limit. #[derive(Serialize, Deserialize)] pub struct BloomFilter { + #[serde(serialize_with = "serialize", deserialize_with = "deserialize")] pub bloom: bloomfilter::Bloom<[u8]>, pub num_items: u32, pub capacity: u32, @@ -330,7 +338,8 @@ impl BloomFilter { capacity as usize, fp_rate, &configs::FIXED_SEED, - ); + ) + .expect("We expect bloomfilter::Bloom<[u8]> creation to succeed"); let fltr = BloomFilter { bloom, num_items: 0, @@ -346,20 +355,10 @@ impl BloomFilter { } /// Create a new BloomFilter from dumped information (RDB load). - pub fn from_existing( - bitmap: &[u8], - number_of_bits: u64, - number_of_hash_functions: u32, - sip_keys: [(u64, u64); 2], - num_items: u32, - capacity: u32, - ) -> BloomFilter { - let bloom = bloomfilter::Bloom::from_existing( - bitmap, - number_of_bits, - number_of_hash_functions, - sip_keys, - ); + pub fn from_existing(bitmap: &[u8], num_items: u32, capacity: u32) -> BloomFilter { + let bloom = bloomfilter::Bloom::from_slice(bitmap) + .expect("We expect bloomfilter::Bloom<[u8]> creation to succeed"); + let fltr = BloomFilter { bloom, num_items, @@ -377,7 +376,7 @@ impl BloomFilter { } pub fn number_of_bytes(&self) -> usize { - std::mem::size_of::() + (self.bloom.number_of_bits() / 8) as usize + std::mem::size_of::() + (self.bloom.len() / 8) as usize } /// Caculates the number of bytes that the bloom filter will require to be allocated. @@ -392,17 +391,6 @@ impl BloomFilter { true } - /// Caculates the number of bytes that the bloom filter will require to be allocated using provided `number_of_bits`. - /// This is used before actually creating the bloom filter when checking if the filter is within the allowed size. - /// Returns whether the bloom filter is of a valid size or not. - pub fn validate_size_with_bits(number_of_bits: u64) -> bool { - let bytes = std::mem::size_of::() as u64 + number_of_bits; - if bytes > configs::BLOOM_MEMORY_LIMIT_PER_FILTER.load(Ordering::Relaxed) as u64 { - return false; - } - true - } - pub fn check(&self, item: &[u8]) -> bool { self.bloom.check(item) } @@ -411,20 +399,9 @@ impl BloomFilter { self.bloom.set(item) } - pub fn sip_keys(&self) -> [(u64, u64); 2] { - self.bloom.sip_keys() - } - /// Create a new BloomFilter from an existing BloomFilter object (COPY command). pub fn create_copy_from(bf: &BloomFilter) -> BloomFilter { - BloomFilter::from_existing( - &bf.bloom.bitmap(), - bf.bloom.number_of_bits(), - bf.bloom.number_of_hash_functions(), - bf.bloom.sip_keys(), - bf.num_items, - bf.capacity, - ) + BloomFilter::from_existing(&bf.bloom.to_bytes(), bf.num_items, bf.capacity) } } @@ -453,9 +430,7 @@ impl Drop for BloomFilter { #[cfg(test)] mod tests { use super::*; - use crate::configs::{ - FIXED_SIP_KEY_ONE_A, FIXED_SIP_KEY_ONE_B, FIXED_SIP_KEY_TWO_A, FIXED_SIP_KEY_TWO_B, - }; + use configs::FIXED_SEED; use rand::{distributions::Alphanumeric, Rng}; /// Returns random string with specified number of characters. @@ -545,10 +520,6 @@ mod tests { fp_margin: f64, rand_prefix: &String, ) { - let expected_sip_keys = [ - (FIXED_SIP_KEY_ONE_A, FIXED_SIP_KEY_ONE_B), - (FIXED_SIP_KEY_TWO_A, FIXED_SIP_KEY_TWO_B), - ]; assert_eq!( restored_bloom_filter_type.capacity(), original_bloom_filter_type.capacity() @@ -572,8 +543,8 @@ mod tests { .filters .iter() .any( - |filter| (filter.bloom.sip_keys() == restore_filter.bloom.sip_keys()) - && (restore_filter.bloom.sip_keys() == expected_sip_keys) + |filter| (filter.bloom.seed() == restore_filter.bloom.seed()) + && (restore_filter.bloom.seed() == FIXED_SEED) ))); assert!(restored_bloom_filter_type .filters @@ -589,7 +560,7 @@ mod tests { .all(|restore_filter| original_bloom_filter_type .filters .iter() - .any(|filter| filter.bloom.bitmap() == restore_filter.bloom.bitmap()))); + .any(|filter| filter.bloom.as_slice() == restore_filter.bloom.as_slice()))); let (error_count, _) = check_items_exist( restored_bloom_filter_type, 1, @@ -735,14 +706,11 @@ mod tests { } #[test] - fn test_sip_keys() { + fn test_seed() { // The value of sip keys generated by the sip_keys with fixed seed should be equal to the constant in configs.rs let test_bloom_filter = BloomFilter::new(0.5_f64, 1000_u32); - let test_sip_keys = test_bloom_filter.bloom.sip_keys(); - assert_eq!(test_sip_keys[0].0, FIXED_SIP_KEY_ONE_A); - assert_eq!(test_sip_keys[0].1, FIXED_SIP_KEY_ONE_B); - assert_eq!(test_sip_keys[1].0, FIXED_SIP_KEY_TWO_A); - assert_eq!(test_sip_keys[1].1, FIXED_SIP_KEY_TWO_B); + let seed = test_bloom_filter.bloom.seed(); + assert_eq!(seed, FIXED_SEED); } #[test] diff --git a/src/wrapper/bloom_callback.rs b/src/wrapper/bloom_callback.rs index e2bde8e..da9e21d 100644 --- a/src/wrapper/bloom_callback.rs +++ b/src/wrapper/bloom_callback.rs @@ -25,14 +25,12 @@ pub unsafe extern "C" fn bloom_rdb_save(rdb: *mut raw::RedisModuleIO, value: *mu let mut filter_list_iter = filter_list.iter().peekable(); while let Some(filter) = filter_list_iter.next() { let bloom = &filter.bloom; - let bitmap = bloom.bitmap(); + let bitmap = bloom.to_bytes(); raw::RedisModule_SaveStringBuffer.unwrap()( rdb, bitmap.as_ptr().cast::(), bitmap.len(), ); - raw::save_unsigned(rdb, bloom.number_of_bits()); - raw::save_unsigned(rdb, bloom.number_of_hash_functions() as u64); raw::save_unsigned(rdb, filter.capacity as u64); if filter_list_iter.peek().is_none() { raw::save_unsigned(rdb, filter.num_items as u64); diff --git a/tests/test_bloom_metrics.py b/tests/test_bloom_metrics.py index 29eb4e6..3df634c 100644 --- a/tests/test_bloom_metrics.py +++ b/tests/test_bloom_metrics.py @@ -4,7 +4,7 @@ from valkeytests.conftest import resource_port_tracker from util.waiters import * -DEFAULT_BLOOM_FILTER_SIZE = 179960 +DEFAULT_BLOOM_FILTER_SIZE = 179952 DEFAULT_BLOOM_FILTER_CAPACITY = 100000 class TestBloomMetrics(ValkeyBloomTestCaseBase):