Skip to content

Commit

Permalink
Merge pull request #3 from KarthikSubbarao/unstable
Browse files Browse the repository at this point in the history
Add unit testing for scaling & non scaling filters
  • Loading branch information
KarthikSubbarao authored Sep 18, 2024
2 parents a180f42 + 736a48a commit 6d0a4b6
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 5 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ bloomfilter = "1.0.13"
lazy_static = "1.4.0"
libc = "0.2"

[dev-dependencies]
rand = "0.8"

[lib]
crate-type = ["cdylib"]
name = "valkey_bloom"
Expand Down
4 changes: 2 additions & 2 deletions src/bloom/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
.as_str()
{
"CAPACITY" => Ok(ValkeyValue::Integer(val.capacity())),
"SIZE" => Ok(ValkeyValue::Integer(val.get_memory_usage() as i64)),
"SIZE" => Ok(ValkeyValue::Integer(val.memory_usage() as i64)),
"FILTERS" => Ok(ValkeyValue::Integer(val.filters.len() as i64)),
"ITEMS" => Ok(ValkeyValue::Integer(val.cardinality())),
"EXPANSION" => {
Expand All @@ -411,7 +411,7 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
ValkeyValue::SimpleStringStatic("Capacity"),
ValkeyValue::Integer(val.capacity()),
ValkeyValue::SimpleStringStatic("Size"),
ValkeyValue::Integer(val.get_memory_usage() as i64),
ValkeyValue::Integer(val.memory_usage() as i64),
ValkeyValue::SimpleStringStatic("Number of filters"),
ValkeyValue::Integer(val.filters.len() as i64),
ValkeyValue::SimpleStringStatic("Number of items inserted"),
Expand Down
195 changes: 194 additions & 1 deletion src/bloom/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub const ERROR_RATE_RANGE: &str = "ERR (0 < error rate range < 1)";
pub const CAPACITY_LARGER_THAN_0: &str = "ERR (capacity should be larger than 0)";
pub const MAX_NUM_SCALING_FILTERS: &str = "ERR max number of scaling filters reached";

#[derive(Debug, PartialEq)]
pub enum BloomError {
NonScalingFilterFull,
MaxNumScalingFilters,
Expand Down Expand Up @@ -69,7 +70,7 @@ impl BloomFilterType {
}

/// Return the total memory usage of the BloomFilterType object.
pub fn get_memory_usage(&self) -> usize {
pub fn memory_usage(&self) -> usize {
let mut mem: usize = std::mem::size_of::<BloomFilterType>();
for filter in &self.filters {
mem += filter.number_of_bytes();
Expand Down Expand Up @@ -220,3 +221,195 @@ impl BloomFilter {
)
}
}

#[cfg(test)]
mod tests {
use super::*;
use rand::{distributions::Alphanumeric, Rng};

/// Returns random string with specified number of characters.
fn random_prefix(len: usize) -> String {
rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(len)
.map(char::from)
.collect()
}

/// Loops until the capacity of the provided bloom filter is reached and adds a new item to it in every iteration.
/// The item name is rand_prefix + the index (starting from starting_item_idx).
/// With every add operation, fp_count is tracked as we expect the add operation to return 1, since it is a new item.
/// Returns the number of errors (false positives) and the final item index.
fn add_items_till_capacity(
bf: &mut BloomFilterType,
capacity_needed: i64,
starting_item_idx: i64,
rand_prefix: &String,
) -> (i64, i64) {
let mut new_item_idx = starting_item_idx;
let mut fp_count = 0;
while bf.cardinality() < capacity_needed {
let item = format!("{}{}", rand_prefix, new_item_idx);
let result = bf.add_item(item.as_bytes());
match result {
Ok(0) => {
fp_count += 1;
}
Ok(1) => {}
Ok(i64::MIN..=-1_i64) | Ok(2_i64..=i64::MAX) => {
panic!("We do not expect add_item to return any Integer other than 0 or 1.")
}
Err(e) => {
panic!("We do not expect add_item to throw errors on this scalable filter test, {:?}", e);
}
};
new_item_idx += 1;
}
(fp_count, new_item_idx)
}

/// Loops from the start index till the end index and uses the exists operation on the provided bloom filter.
/// The item name used in exists operations is rand_prefix + the index (based on the iteration).
/// The results are matched against the `expected_result` and an error_count tracks the wrong results.
/// Asserts that the error_count is within the expected false positive (+ margin) rate.
fn item_exists_test(
bf: &BloomFilterType,
start_idx: i64,
end_idx: i64,
expected_fp_rate: f32,
fp_margin: f32,
expected_result: bool,
rand_prefix: &String,
) {
let mut error_count = 0;
for i in start_idx..=end_idx {
let item = format!("{}{}", rand_prefix, i);
let result = bf.item_exists(item.as_bytes());
if result != expected_result {
error_count += 1;
}
}
let num_operations = (end_idx - start_idx) + 1;
// Validate that the real fp_rate is not much more than the configured fp_rate.
fp_assert(error_count, num_operations, expected_fp_rate, fp_margin);
}

fn fp_assert(error_count: i64, num_operations: i64, expected_fp_rate: f32, fp_margin: f32) {
let real_fp_rate = error_count as f32 / num_operations as f32;
let fp_rate_with_margin = expected_fp_rate + fp_margin;
assert!(
real_fp_rate < fp_rate_with_margin,
"The actual fp_rate, {}, is greater than the configured fp_rate with margin. {}.",
real_fp_rate,
fp_rate_with_margin
);
}

#[test]
fn test_non_scaling_filter() {
let rand_prefix = random_prefix(7);
// 1 in every 1000 operations is expected to be a false positive.
let expected_fp_rate: f32 = 0.001;
let initial_capacity = 10000;
// Expansion of 0 indicates non scaling.
let expansion = 0;
// Validate the non scaling behavior of the bloom filter.
let mut bf = BloomFilterType::new_reserved(expected_fp_rate, initial_capacity, expansion);
let (error_count, add_operation_idx) =
add_items_till_capacity(&mut bf, initial_capacity as i64, 1, &rand_prefix);
assert_eq!(
bf.add_item(b"new_item"),
Err(BloomError::NonScalingFilterFull)
);
assert_eq!(bf.capacity(), initial_capacity as i64);
assert_eq!(bf.cardinality(), initial_capacity as i64);
assert_eq!(bf.free_effort(), 1);
assert!(bf.memory_usage() > 0);
// Use a margin on the expected_fp_rate when asserting for correctness.
let fp_margin = 0.002;
// Validate that item "add" operations on bloom filters are ensuring correctness.
fp_assert(error_count, add_operation_idx, expected_fp_rate, fp_margin);
// Validate item "exists" operations on bloom filters are ensuring correctness.
// This tests for items already added to the filter and expects them to exist.
item_exists_test(
&bf,
1,
add_operation_idx,
expected_fp_rate,
fp_margin,
true,
&rand_prefix,
);
// This tests for items which are not added to the filter and expects them to not exist.
item_exists_test(
&bf,
add_operation_idx + 1,
add_operation_idx * 2,
expected_fp_rate,
fp_margin,
false,
&rand_prefix,
)
}

#[test]
fn test_scaling_filter() {
let rand_prefix = random_prefix(7);
// 1 in every 1000 operations is expected to be a false positive.
let expected_fp_rate: f32 = 0.001;
let initial_capacity = 10000;
let expansion = 2;
let num_filters_to_scale = 5;
let mut bf = BloomFilterType::new_reserved(expected_fp_rate, initial_capacity, expansion);
assert_eq!(bf.capacity(), initial_capacity as i64);
assert_eq!(bf.cardinality(), 0);
let mut total_error_count = 0;
let mut add_operation_idx = 1;
// Validate the scaling behavior of the bloom filter.
for filter_idx in 1..=num_filters_to_scale {
let expected_total_capacity = initial_capacity * (expansion.pow(filter_idx) - 1);
let (error_count, new_add_operation_idx) = add_items_till_capacity(
&mut bf,
expected_total_capacity as i64,
add_operation_idx,
&rand_prefix,
);
add_operation_idx = new_add_operation_idx;
total_error_count += error_count;
assert_eq!(bf.capacity(), expected_total_capacity as i64);
assert_eq!(bf.cardinality(), expected_total_capacity as i64);
assert_eq!(bf.free_effort(), filter_idx as usize);
assert!(bf.memory_usage() > 0);
}
// Use a margin on the expected_fp_rate when asserting for correctness.
let fp_margin = 0.002;
// Validate that item "add" operations on bloom filters are ensuring correctness.
fp_assert(
total_error_count,
add_operation_idx,
expected_fp_rate,
fp_margin,
);
// Validate item "exists" operations on bloom filters are ensuring correctness.
// This tests for items already added to the filter and expects them to exist.
item_exists_test(
&bf,
1,
add_operation_idx,
expected_fp_rate,
fp_margin,
true,
&rand_prefix,
);
// This tests for items which are not added to the filter and expects them to not exist.
item_exists_test(
&bf,
add_operation_idx + 1,
add_operation_idx * 2,
expected_fp_rate,
fp_margin,
false,
&rand_prefix,
)
}
}
2 changes: 1 addition & 1 deletion src/configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ lazy_static! {
}

/// Constants
pub const TIGHTENING_RATIO: f32 = 0.8515625;
pub const TIGHTENING_RATIO: f32 = 0.5;
pub const MAX_FILTERS_PER_OBJ: i32 = i32::MAX;
2 changes: 1 addition & 1 deletion src/wrapper/bloom_callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub unsafe extern "C" fn bloom_free(value: *mut c_void) {
/// Compute the memory usage for a bloom object.
pub unsafe extern "C" fn bloom_mem_usage(value: *const c_void) -> usize {
let item = &*value.cast::<BloomFilterType>();
item.get_memory_usage()
item.memory_usage()
}

/// # Safety
Expand Down

0 comments on commit 6d0a4b6

Please sign in to comment.