Skip to content

Commit

Permalink
rdb format optimization: using fixed seed for bloom filters (#2)
Browse files Browse the repository at this point in the history
RDB format optimization: Using a fixed seed for bloom filters

Signed-off-by: Vanessa Tang <yuetan@amazon.com>
  • Loading branch information
YueTang-Vanessa authored Sep 19, 2024
1 parent 145f9b2 commit bc178be
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 31 deletions.
34 changes: 15 additions & 19 deletions src/bloom/data_type.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use crate::bloom::utils::BloomFilter;
use crate::bloom::utils::BloomFilterType;
use crate::configs::{
FIXED_SIP_KEY_ONE_A, FIXED_SIP_KEY_ONE_B, FIXED_SIP_KEY_TWO_A, FIXED_SIP_KEY_TWO_B,
};
use crate::wrapper::bloom_callback;
use crate::MODULE_NAME;
use std::os::raw::c_int;
Expand Down Expand Up @@ -62,7 +65,7 @@ pub fn bloom_rdb_load_data_object(
return None;
};
let mut filters = Vec::new();
for _ in 0..num_filters {
for i in 0..num_filters {
let Ok(bitmap) = raw::load_string_buffer(rdb) else {
return None;
};
Expand All @@ -72,28 +75,21 @@ pub fn bloom_rdb_load_data_object(
let Ok(number_of_hash_functions) = raw::load_unsigned(rdb) else {
return None;
};
let Ok(sip_key_one_a) = raw::load_unsigned(rdb) else {
return None;
};
let Ok(sip_key_one_b) = raw::load_unsigned(rdb) else {
return None;
};
let Ok(sip_key_two_a) = raw::load_unsigned(rdb) else {
return None;
};
let Ok(sip_key_two_b) = raw::load_unsigned(rdb) else {
return None;
};
let Ok(num_items) = raw::load_unsigned(rdb) else {
return None;
};
let Ok(capacity) = raw::load_unsigned(rdb) else {
return None;
};

// Only load num_items when it's the last filter
let num_items = if i == num_filters - 1 {
match raw::load_unsigned(rdb) {
Ok(num_items) => num_items,
Err(_) => return None,
}
} else {
capacity
};
let sip_keys = [
(sip_key_one_a, sip_key_one_b),
(sip_key_two_a, sip_key_two_b),
(FIXED_SIP_KEY_ONE_A, FIXED_SIP_KEY_ONE_B),
(FIXED_SIP_KEY_TWO_A, FIXED_SIP_KEY_TWO_B),
];
let filter = BloomFilter::from_existing(
bitmap.as_ref(),
Expand Down
126 changes: 121 additions & 5 deletions src/bloom/utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::configs::MAX_FILTERS_PER_OBJ;
use crate::configs::TIGHTENING_RATIO;
use crate::configs::{FIXED_SEED, MAX_FILTERS_PER_OBJ, TIGHTENING_RATIO};
use bloomfilter;

/// KeySpace Notification Events
Expand Down Expand Up @@ -167,7 +166,11 @@ pub struct BloomFilter {
impl BloomFilter {
/// Instantiate empty BloomFilter object.
pub fn new(fp_rate: f32, capacity: u32) -> BloomFilter {
let bloom = bloomfilter::Bloom::new_for_fp_rate(capacity as usize, fp_rate as f64);
let bloom = bloomfilter::Bloom::new_for_fp_rate_with_seed(
capacity as usize,
fp_rate as f64,
&FIXED_SEED,
);
BloomFilter {
bloom,
num_items: 0,
Expand Down Expand Up @@ -225,6 +228,9 @@ impl BloomFilter {
#[cfg(test)]
mod tests {
use super::*;
use crate::configs::{
FIXED_SIP_KEY_ONE_A, FIXED_SIP_KEY_ONE_B, FIXED_SIP_KEY_TWO_A, FIXED_SIP_KEY_TWO_B,
};
use rand::{distributions::Alphanumeric, Rng};

/// Returns random string with specified number of characters.
Expand Down Expand Up @@ -305,6 +311,79 @@ mod tests {
);
}

fn verify_restored_items(
original_bloom_filter_type: &BloomFilterType,
restored_bloom_filter_type: &BloomFilterType,
add_operation_idx: i64,
expected_fp_rate: f32,
fp_margin: f32,
rand_prefix: &String,
) {
let expected_sip_keys = [
(FIXED_SIP_KEY_ONE_A, FIXED_SIP_KEY_ONE_B),
(FIXED_SIP_KEY_TWO_A, FIXED_SIP_KEY_TWO_B),
];
assert_eq!(
restored_bloom_filter_type.capacity(),
original_bloom_filter_type.capacity()
);
assert_eq!(
restored_bloom_filter_type.cardinality(),
original_bloom_filter_type.cardinality(),
);
assert_eq!(
restored_bloom_filter_type.free_effort(),
original_bloom_filter_type.free_effort()
);
assert_eq!(
restored_bloom_filter_type.memory_usage(),
original_bloom_filter_type.memory_usage()
);
assert!(restored_bloom_filter_type
.filters
.iter()
.all(|restore_filter| original_bloom_filter_type
.filters
.iter()
.any(
|filter| (filter.bloom.sip_keys() == restore_filter.bloom.sip_keys())
&& (restore_filter.bloom.sip_keys() == expected_sip_keys)
)));
assert!(restored_bloom_filter_type
.filters
.iter()
.all(|restore_filter| original_bloom_filter_type
.filters
.iter()
.any(|filter| filter.bloom.number_of_hash_functions()
== restore_filter.bloom.number_of_hash_functions())));
assert!(restored_bloom_filter_type
.filters
.iter()
.all(|restore_filter| original_bloom_filter_type
.filters
.iter()
.any(|filter| filter.bloom.bitmap() == restore_filter.bloom.bitmap())));
item_exists_test(
restored_bloom_filter_type,
1,
add_operation_idx,
expected_fp_rate,
fp_margin,
true,
rand_prefix,
);
item_exists_test(
restored_bloom_filter_type,
add_operation_idx + 1,
add_operation_idx * 2,
expected_fp_rate,
fp_margin,
false,
rand_prefix,
);
}

#[test]
fn test_non_scaling_filter() {
let rand_prefix = random_prefix(7);
Expand Down Expand Up @@ -349,7 +428,22 @@ mod tests {
fp_margin,
false,
&rand_prefix,
)
);

// Verify restore
let mut restore_bf = BloomFilterType::create_copy_from(&bf);
assert_eq!(
restore_bf.add_item(b"new_item"),
Err(BloomError::NonScalingFilterFull)
);
verify_restored_items(
&bf,
&restore_bf,
add_operation_idx,
expected_fp_rate,
fp_margin,
&rand_prefix,
);
}

#[test]
Expand Down Expand Up @@ -410,6 +504,28 @@ mod tests {
fp_margin,
false,
&rand_prefix,
)
);

// Verify restore
let restore_bloom_filter_type = BloomFilterType::create_copy_from(&bf);
verify_restored_items(
&bf,
&restore_bloom_filter_type,
add_operation_idx,
expected_fp_rate,
fp_margin,
&rand_prefix,
);
}

#[test]
fn test_sip_keys() {
// The value of sip keys generated by the sip_keys with fixed seed should be equal to the constant in configs.rs
let test_bloom_filter = BloomFilter::new(0.5_f32, 1000_u32);
let test_sip_keys = test_bloom_filter.bloom.sip_keys();
assert_eq!(test_sip_keys[0].0, FIXED_SIP_KEY_ONE_A);
assert_eq!(test_sip_keys[0].1, FIXED_SIP_KEY_ONE_B);
assert_eq!(test_sip_keys[1].0, FIXED_SIP_KEY_TWO_A);
assert_eq!(test_sip_keys[1].1, FIXED_SIP_KEY_TWO_B);
}
}
10 changes: 10 additions & 0 deletions src/configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,13 @@ lazy_static! {
/// Constants
pub const TIGHTENING_RATIO: f32 = 0.5;
pub const MAX_FILTERS_PER_OBJ: i32 = i32::MAX;
/// Below constants are fixed seed and sip keys to help create bloom objects using the same seed and to restore the bloom objects with the same hasher which
/// generated using rust crate bloomfilter https://crates.io/crates/bloomfilter
pub const FIXED_SEED: [u8; 32] = [
89, 15, 245, 34, 234, 120, 17, 218, 167, 20, 216, 9, 59, 62, 123, 217, 29, 137, 138, 115, 62,
152, 136, 135, 48, 127, 151, 205, 40, 7, 51, 131,
];
pub const FIXED_SIP_KEY_ONE_A: u64 = 15713473521876537177;
pub const FIXED_SIP_KEY_ONE_B: u64 = 15671187751654921383;
pub const FIXED_SIP_KEY_TWO_A: u64 = 9766223185946773789;
pub const FIXED_SIP_KEY_TWO_B: u64 = 9453907914610147120;
12 changes: 5 additions & 7 deletions src/wrapper/bloom_callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ pub unsafe extern "C" fn bloom_rdb_save(rdb: *mut raw::RedisModuleIO, value: *mu
raw::save_unsigned(rdb, v.expansion as u64);
raw::save_float(rdb, v.fp_rate);
let filter_list = &v.filters;
for filter in filter_list {
let mut filter_list_iter = filter_list.iter().peekable();
while let Some(filter) = filter_list_iter.next() {
let bloom = &filter.bloom;
let bitmap = bloom.bitmap();
raw::RedisModule_SaveStringBuffer.unwrap()(
Expand All @@ -26,13 +27,10 @@ pub unsafe extern "C" fn bloom_rdb_save(rdb: *mut raw::RedisModuleIO, value: *mu
);
raw::save_unsigned(rdb, bloom.number_of_bits());
raw::save_unsigned(rdb, bloom.number_of_hash_functions() as u64);
let sip_keys = bloom.sip_keys();
raw::save_unsigned(rdb, sip_keys[0].0);
raw::save_unsigned(rdb, sip_keys[0].1);
raw::save_unsigned(rdb, sip_keys[1].0);
raw::save_unsigned(rdb, sip_keys[1].1);
raw::save_unsigned(rdb, filter.num_items as u64);
raw::save_unsigned(rdb, filter.capacity as u64);
if filter_list_iter.peek().is_none() {
raw::save_unsigned(rdb, filter.num_items as u64);
}
}
}

Expand Down
40 changes: 40 additions & 0 deletions tests/test_save_and_restore.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import pytest, time
import os
from valkey_test_case import ValkeyTestCase

class TestBloomSaveRestore(ValkeyTestCase):

def get_custom_args(self):
self.set_server_version(os.environ['SERVER_VERSION'])
return {
'loadmodule': os.getenv('MODULE_PATH'),
}

def test_basic_save_and_restore(self):
client = self.server.get_new_client()
bf_add_result_1 = client.execute_command('BF.ADD testSave item')
assert bf_add_result_1 == 1
bf_exists_result_1 = client.execute_command('BF.EXISTS testSave item')
assert bf_exists_result_1 == 1
bf_info_result_1 = client.execute_command('BF.INFO testSave')
assert(len(bf_info_result_1)) != 0
curr_item_count_1 = client.info_obj().num_keys()

# save rdb, restart sever
client.bgsave()
self.server.wait_for_save_done()
uptime_in_sec_1 = self.client.info_obj().uptime_in_secs()
time.sleep(0.5)
self.server.restart(remove_rdb=False, remove_nodes_conf=False, connect_client=True)
uptime_in_sec_2 = self.client.info_obj().uptime_in_secs()
assert self.server.is_alive()
assert uptime_in_sec_1 > uptime_in_sec_2
assert self.server.is_rdb_done_loading()

# verify restore results
curr_item_count_2 = client.info_obj().num_keys()
assert curr_item_count_2 == curr_item_count_1
bf_exists_result_2 = client.execute_command('BF.EXISTS testSave item')
assert bf_exists_result_2 == 1
bf_info_result_2 = client.execute_command('BF.INFO testSave')
assert bf_info_result_2 == bf_info_result_1
6 changes: 6 additions & 0 deletions tests/valkeytests/valkey_test_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ def was_save_successful(self):
def used_memory(self):
return self.info['used_memory']

def uptime_in_secs(self):
return self.info['uptime_in_seconds']

# An extension of the StrictValkey client
# that supports additional Valkey functionality
Expand Down Expand Up @@ -364,6 +366,10 @@ def wait_for_save_in_progress(self):
def _wait_for_save_in_progress(self):
return self.client.info_obj().is_save_in_progress()

def is_rdb_done_loading(self):
rdb_load_log = "Done loading RDB"
return self.verify_string_in_logfile(rdb_load_log) == True

class ValkeyTestCaseBase:
testdir = "test-data"
rdbdir = "rdbs"
Expand Down

0 comments on commit bc178be

Please sign in to comment.