diff --git a/src/bloom/data_type.rs b/src/bloom/data_type.rs index 42a488f..7bb34aa 100644 --- a/src/bloom/data_type.rs +++ b/src/bloom/data_type.rs @@ -1,5 +1,8 @@ use crate::bloom::utils::BloomFilter; use crate::bloom::utils::BloomFilterType; +use crate::configs::{ + FIXED_SIP_KEY_ONE_A, FIXED_SIP_KEY_ONE_B, FIXED_SIP_KEY_TWO_A, FIXED_SIP_KEY_TWO_B, +}; use crate::wrapper::bloom_callback; use crate::MODULE_NAME; use std::os::raw::c_int; @@ -62,7 +65,7 @@ pub fn bloom_rdb_load_data_object( return None; }; let mut filters = Vec::new(); - for _ in 0..num_filters { + for i in 0..num_filters { let Ok(bitmap) = raw::load_string_buffer(rdb) else { return None; }; @@ -72,28 +75,21 @@ pub fn bloom_rdb_load_data_object( let Ok(number_of_hash_functions) = raw::load_unsigned(rdb) else { return None; }; - let Ok(sip_key_one_a) = raw::load_unsigned(rdb) else { - return None; - }; - let Ok(sip_key_one_b) = raw::load_unsigned(rdb) else { - return None; - }; - let Ok(sip_key_two_a) = raw::load_unsigned(rdb) else { - return None; - }; - let Ok(sip_key_two_b) = raw::load_unsigned(rdb) else { - return None; - }; - let Ok(num_items) = raw::load_unsigned(rdb) else { - return None; - }; let Ok(capacity) = raw::load_unsigned(rdb) else { return None; }; - + // Only load num_items when it's the last filter + let num_items = if i == num_filters - 1 { + match raw::load_unsigned(rdb) { + Ok(num_items) => num_items, + Err(_) => return None, + } + } else { + capacity + }; let sip_keys = [ - (sip_key_one_a, sip_key_one_b), - (sip_key_two_a, sip_key_two_b), + (FIXED_SIP_KEY_ONE_A, FIXED_SIP_KEY_ONE_B), + (FIXED_SIP_KEY_TWO_A, FIXED_SIP_KEY_TWO_B), ]; let filter = BloomFilter::from_existing( bitmap.as_ref(), diff --git a/src/bloom/utils.rs b/src/bloom/utils.rs index 116232b..4e35bdc 100644 --- a/src/bloom/utils.rs +++ b/src/bloom/utils.rs @@ -1,5 +1,4 @@ -use crate::configs::MAX_FILTERS_PER_OBJ; -use crate::configs::TIGHTENING_RATIO; +use crate::configs::{FIXED_SEED, MAX_FILTERS_PER_OBJ, TIGHTENING_RATIO}; use bloomfilter; /// KeySpace Notification Events @@ -167,7 +166,11 @@ pub struct BloomFilter { impl BloomFilter { /// Instantiate empty BloomFilter object. pub fn new(fp_rate: f32, capacity: u32) -> BloomFilter { - let bloom = bloomfilter::Bloom::new_for_fp_rate(capacity as usize, fp_rate as f64); + let bloom = bloomfilter::Bloom::new_for_fp_rate_with_seed( + capacity as usize, + fp_rate as f64, + &FIXED_SEED, + ); BloomFilter { bloom, num_items: 0, @@ -225,6 +228,9 @@ impl BloomFilter { #[cfg(test)] mod tests { use super::*; + use crate::configs::{ + FIXED_SIP_KEY_ONE_A, FIXED_SIP_KEY_ONE_B, FIXED_SIP_KEY_TWO_A, FIXED_SIP_KEY_TWO_B, + }; use rand::{distributions::Alphanumeric, Rng}; /// Returns random string with specified number of characters. @@ -305,6 +311,79 @@ mod tests { ); } + fn verify_restored_items( + original_bloom_filter_type: &BloomFilterType, + restored_bloom_filter_type: &BloomFilterType, + add_operation_idx: i64, + expected_fp_rate: f32, + fp_margin: f32, + rand_prefix: &String, + ) { + let expected_sip_keys = [ + (FIXED_SIP_KEY_ONE_A, FIXED_SIP_KEY_ONE_B), + (FIXED_SIP_KEY_TWO_A, FIXED_SIP_KEY_TWO_B), + ]; + assert_eq!( + restored_bloom_filter_type.capacity(), + original_bloom_filter_type.capacity() + ); + assert_eq!( + restored_bloom_filter_type.cardinality(), + original_bloom_filter_type.cardinality(), + ); + assert_eq!( + restored_bloom_filter_type.free_effort(), + original_bloom_filter_type.free_effort() + ); + assert_eq!( + restored_bloom_filter_type.memory_usage(), + original_bloom_filter_type.memory_usage() + ); + assert!(restored_bloom_filter_type + .filters + .iter() + .all(|restore_filter| original_bloom_filter_type + .filters + .iter() + .any( + |filter| (filter.bloom.sip_keys() == restore_filter.bloom.sip_keys()) + && (restore_filter.bloom.sip_keys() == expected_sip_keys) + ))); + assert!(restored_bloom_filter_type + .filters + .iter() + .all(|restore_filter| original_bloom_filter_type + .filters + .iter() + .any(|filter| filter.bloom.number_of_hash_functions() + == restore_filter.bloom.number_of_hash_functions()))); + assert!(restored_bloom_filter_type + .filters + .iter() + .all(|restore_filter| original_bloom_filter_type + .filters + .iter() + .any(|filter| filter.bloom.bitmap() == restore_filter.bloom.bitmap()))); + item_exists_test( + restored_bloom_filter_type, + 1, + add_operation_idx, + expected_fp_rate, + fp_margin, + true, + rand_prefix, + ); + item_exists_test( + restored_bloom_filter_type, + add_operation_idx + 1, + add_operation_idx * 2, + expected_fp_rate, + fp_margin, + false, + rand_prefix, + ); + } + #[test] fn test_non_scaling_filter() { let rand_prefix = random_prefix(7); @@ -349,7 +428,22 @@ mod tests { fp_margin, false, &rand_prefix, - ) + ); + + // Verify restore + let mut restore_bf = BloomFilterType::create_copy_from(&bf); + assert_eq!( + restore_bf.add_item(b"new_item"), + Err(BloomError::NonScalingFilterFull) + ); + verify_restored_items( + &bf, + &restore_bf, + add_operation_idx, + expected_fp_rate, + fp_margin, + &rand_prefix, + ); } #[test] @@ -410,6 +504,28 @@ mod tests { fp_margin, false, &rand_prefix, - ) + ); + + // Verify restore + let restore_bloom_filter_type = BloomFilterType::create_copy_from(&bf); + verify_restored_items( + &bf, + &restore_bloom_filter_type, + add_operation_idx, + expected_fp_rate, + fp_margin, + &rand_prefix, + ); + } + + #[test] + fn test_sip_keys() { + // The value of sip keys generated by the sip_keys with fixed seed should be equal to the constant in configs.rs + let test_bloom_filter = BloomFilter::new(0.5_f32, 1000_u32); + let test_sip_keys = test_bloom_filter.bloom.sip_keys(); + assert_eq!(test_sip_keys[0].0, FIXED_SIP_KEY_ONE_A); + assert_eq!(test_sip_keys[0].1, FIXED_SIP_KEY_ONE_B); + assert_eq!(test_sip_keys[1].0, FIXED_SIP_KEY_TWO_A); + assert_eq!(test_sip_keys[1].1, FIXED_SIP_KEY_TWO_B); } } diff --git a/src/configs.rs b/src/configs.rs index fc5a240..6b11229 100644 --- a/src/configs.rs +++ b/src/configs.rs @@ -22,3 +22,13 @@ lazy_static! { /// Constants pub const TIGHTENING_RATIO: f32 = 0.5; pub const MAX_FILTERS_PER_OBJ: i32 = i32::MAX; +/// Below constants are fixed seed and sip keys to help create bloom objects using the same seed and to restore the bloom objects with the same hasher which +/// generated using rust crate bloomfilter https://crates.io/crates/bloomfilter +pub const FIXED_SEED: [u8; 32] = [ + 89, 15, 245, 34, 234, 120, 17, 218, 167, 20, 216, 9, 59, 62, 123, 217, 29, 137, 138, 115, 62, + 152, 136, 135, 48, 127, 151, 205, 40, 7, 51, 131, +]; +pub const FIXED_SIP_KEY_ONE_A: u64 = 15713473521876537177; +pub const FIXED_SIP_KEY_ONE_B: u64 = 15671187751654921383; +pub const FIXED_SIP_KEY_TWO_A: u64 = 9766223185946773789; +pub const FIXED_SIP_KEY_TWO_B: u64 = 9453907914610147120; diff --git a/src/wrapper/bloom_callback.rs b/src/wrapper/bloom_callback.rs index afa6bc2..3aa03a3 100644 --- a/src/wrapper/bloom_callback.rs +++ b/src/wrapper/bloom_callback.rs @@ -16,7 +16,8 @@ pub unsafe extern "C" fn bloom_rdb_save(rdb: *mut raw::RedisModuleIO, value: *mu raw::save_unsigned(rdb, v.expansion as u64); raw::save_float(rdb, v.fp_rate); let filter_list = &v.filters; - for filter in filter_list { + let mut filter_list_iter = filter_list.iter().peekable(); + while let Some(filter) = filter_list_iter.next() { let bloom = &filter.bloom; let bitmap = bloom.bitmap(); raw::RedisModule_SaveStringBuffer.unwrap()( @@ -26,13 +27,10 @@ pub unsafe extern "C" fn bloom_rdb_save(rdb: *mut raw::RedisModuleIO, value: *mu ); raw::save_unsigned(rdb, bloom.number_of_bits()); raw::save_unsigned(rdb, bloom.number_of_hash_functions() as u64); - let sip_keys = bloom.sip_keys(); - raw::save_unsigned(rdb, sip_keys[0].0); - raw::save_unsigned(rdb, sip_keys[0].1); - raw::save_unsigned(rdb, sip_keys[1].0); - raw::save_unsigned(rdb, sip_keys[1].1); - raw::save_unsigned(rdb, filter.num_items as u64); raw::save_unsigned(rdb, filter.capacity as u64); + if filter_list_iter.peek().is_none() { + raw::save_unsigned(rdb, filter.num_items as u64); + } } } diff --git a/tests/test_save_and_restore.py b/tests/test_save_and_restore.py new file mode 100644 index 0000000..4787ba9 --- /dev/null +++ b/tests/test_save_and_restore.py @@ -0,0 +1,40 @@ +import pytest, time +import os +from valkey_test_case import ValkeyTestCase + +class TestBloomSaveRestore(ValkeyTestCase): + + def get_custom_args(self): + self.set_server_version(os.environ['SERVER_VERSION']) + return { + 'loadmodule': os.getenv('MODULE_PATH'), + } + + def test_basic_save_and_restore(self): + client = self.server.get_new_client() + bf_add_result_1 = client.execute_command('BF.ADD testSave item') + assert bf_add_result_1 == 1 + bf_exists_result_1 = client.execute_command('BF.EXISTS testSave item') + assert bf_exists_result_1 == 1 + bf_info_result_1 = client.execute_command('BF.INFO testSave') + assert(len(bf_info_result_1)) != 0 + curr_item_count_1 = client.info_obj().num_keys() + + # save rdb, restart sever + client.bgsave() + self.server.wait_for_save_done() + uptime_in_sec_1 = self.client.info_obj().uptime_in_secs() + time.sleep(0.5) + self.server.restart(remove_rdb=False, remove_nodes_conf=False, connect_client=True) + uptime_in_sec_2 = self.client.info_obj().uptime_in_secs() + assert self.server.is_alive() + assert uptime_in_sec_1 > uptime_in_sec_2 + assert self.server.is_rdb_done_loading() + + # verify restore results + curr_item_count_2 = client.info_obj().num_keys() + assert curr_item_count_2 == curr_item_count_1 + bf_exists_result_2 = client.execute_command('BF.EXISTS testSave item') + assert bf_exists_result_2 == 1 + bf_info_result_2 = client.execute_command('BF.INFO testSave') + assert bf_info_result_2 == bf_info_result_1 diff --git a/tests/valkeytests/valkey_test_case.py b/tests/valkeytests/valkey_test_case.py index 90b964e..b7b0699 100644 --- a/tests/valkeytests/valkey_test_case.py +++ b/tests/valkeytests/valkey_test_case.py @@ -93,6 +93,8 @@ def was_save_successful(self): def used_memory(self): return self.info['used_memory'] + def uptime_in_secs(self): + return self.info['uptime_in_seconds'] # An extension of the StrictValkey client # that supports additional Valkey functionality @@ -364,6 +366,10 @@ def wait_for_save_in_progress(self): def _wait_for_save_in_progress(self): return self.client.info_obj().is_save_in_progress() + def is_rdb_done_loading(self): + rdb_load_log = "Done loading RDB" + return self.verify_string_in_logfile(rdb_load_log) == True + class ValkeyTestCaseBase: testdir = "test-data" rdbdir = "rdbs"