Add tightening_ratio support in BloomFilterType structure
Signed-off-by: Nihal Mehta <nnmehta@amazon.com>
nnmehta committed Dec 5, 2024
1 parent 1b0d819 commit de6ec44
Showing 6 changed files with 78 additions and 18 deletions.
31 changes: 26 additions & 5 deletions src/bloom/command_handler.rs
@@ -4,7 +4,7 @@ use crate::bloom::utils::BloomFilterType;
use crate::configs;
use crate::configs::{
BLOOM_CAPACITY_MAX, BLOOM_CAPACITY_MIN, BLOOM_EXPANSION_MAX, BLOOM_EXPANSION_MIN,
BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN,
BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN, TIGHTENING_RATIO_MAX, TIGHTENING_RATIO_MIN,
};
use std::sync::atomic::Ordering;
use valkey_module::ContextFlags;
@@ -110,11 +110,13 @@ pub fn bloom_filter_add_value(
None => {
// Instantiate empty bloom filter.
let fp_rate = configs::BLOOM_FP_RATE_DEFAULT;
let tightening_ratio = configs::TIGHTENING_RATIO;
let capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed) as u32;
let expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32;
let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed);
let mut bloom = match BloomFilterType::new_reserved(
fp_rate,
tightening_ratio,
capacity,
expansion,
use_random_seed,
@@ -279,8 +281,10 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed);
// Skip bloom filter size validation on replicated cmds.
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let tightening_ratio = configs::TIGHTENING_RATIO;
let bloom = match BloomFilterType::new_reserved(
fp_rate,
tightening_ratio,
capacity,
expansion,
use_random_seed,
@@ -302,6 +306,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke

pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyResult {
let argc = input_args.len();
let replicated_cmd = ctx.get_flags().contains(ContextFlags::REPLICATED);
// At the very least, we need: BF.INSERT <key> ITEMS <item>
if argc < 4 {
return Err(ValkeyError::WrongArity);
@@ -311,6 +316,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
let filter_name = &input_args[idx];
idx += 1;
let mut fp_rate = configs::BLOOM_FP_RATE_DEFAULT;
let mut tightening_ratio = configs::TIGHTENING_RATIO;
let mut capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed) as u32;
let mut expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32;
let mut nocreate = false;
@@ -331,6 +337,21 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
}
};
}
"TIGHTENING" if replicated_cmd => {
if idx >= (argc - 1) {
return Err(ValkeyError::WrongArity);
}
idx += 1;
tightening_ratio = match input_args[idx].to_string_lossy().parse::<f64>() {
Ok(num) if num > TIGHTENING_RATIO_MIN && num < TIGHTENING_RATIO_MAX => num,
Ok(num) if !(num > TIGHTENING_RATIO_MIN && num < TIGHTENING_RATIO_MAX) => {
return Err(ValkeyError::Str(utils::ERROR_RATIO_RANGE));
}
_ => {
return Err(ValkeyError::Str(utils::BAD_ERROR_RATIO));
}
};
}
"CAPACITY" => {
if idx >= (argc - 1) {
return Err(ValkeyError::WrongArity);
@@ -387,7 +408,6 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
}
};
// Skip bloom filter size validation on replicated cmds.
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let mut add_succeeded = false;
match value {
Some(bloom) => {
@@ -398,7 +418,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
bloom,
true,
&mut add_succeeded,
validate_size_limit,
!replicated_cmd,
);
replicate_and_notify_events(ctx, filter_name, add_succeeded, false);
response
@@ -410,10 +430,11 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed);
let mut bloom = match BloomFilterType::new_reserved(
fp_rate,
tightening_ratio,
capacity,
expansion,
use_random_seed,
validate_size_limit,
!replicated_cmd,
) {
Ok(bf) => bf,
Err(err) => return Err(ValkeyError::Str(err.as_str())),
@@ -425,7 +446,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
&mut bloom,
true,
&mut add_succeeded,
validate_size_limit,
!replicated_cmd,
);
match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) {
Ok(()) => {
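Note on the BF.INSERT changes above: the new TIGHTENING argument is only honored when the command arrives via replication (`replicated_cmd`), and the former `validate_size_limit` local is replaced by `!replicated_cmd`, so replicated writes skip the size check. A minimal standalone sketch of the same parse-and-validate pattern used for the ratio (the helper name is hypothetical; the real handler inlines this logic in its argument loop):

```rust
/// Hypothetical helper mirroring the TIGHTENING validation above:
/// accept only values strictly between the configured min and max.
fn parse_tightening_ratio(arg: &str, min: f64, max: f64) -> Result<f64, &'static str> {
    match arg.parse::<f64>() {
        // Parsed and strictly inside (min, max): accept.
        Ok(num) if num > min && num < max => Ok(num),
        // Parsed but out of range (or NaN): range error.
        Ok(_) => Err("ERR (0 < error ratio range < 1)"),
        // Not a number: bad-ratio error.
        Err(_) => Err("ERR bad error ratio"),
    }
}

fn main() {
    assert_eq!(parse_tightening_ratio("0.5", 0.0, 1.0), Ok(0.5));
    assert!(parse_tightening_ratio("1.5", 0.0, 1.0).is_err());
    assert!(parse_tightening_ratio("abc", 0.0, 1.0).is_err());
}
```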
5 changes: 5 additions & 0 deletions src/bloom/data_type.rs
@@ -71,6 +71,9 @@ impl ValkeyDataType for BloomFilterType {
let Ok(fp_rate) = raw::load_double(rdb) else {
return None;
};
let Ok(tightening_ratio) = raw::load_double(rdb) else {
return None;
};
let mut filters: Vec<BloomFilter> = Vec::with_capacity(num_filters as usize);
let Ok(is_seed_random_u64) = raw::load_unsigned(rdb) else {
return None;
@@ -121,6 +124,7 @@ impl ValkeyDataType for BloomFilterType {
let item = BloomFilterType {
expansion: expansion as u32,
fp_rate,
tightening_ratio,
is_seed_random,
filters,
};
@@ -131,6 +135,7 @@ impl ValkeyDataType for BloomFilterType {
fn debug_digest(&self, mut dig: Digest) {
dig.add_long_long(self.expansion.into());
dig.add_string_buffer(&self.fp_rate.to_le_bytes());
dig.add_string_buffer(&self.tightening_ratio.to_le_bytes());
for filter in &self.filters {
dig.add_string_buffer(filter.bloom.as_slice());
dig.add_long_long(filter.num_items.into());
55 changes: 43 additions & 12 deletions src/bloom/utils.rs
@@ -1,6 +1,9 @@
use super::data_type::BLOOM_TYPE_VERSION;
use crate::{
configs::{self, BLOOM_EXPANSION_MAX, BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN},
configs::{
self, BLOOM_EXPANSION_MAX, BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN, TIGHTENING_RATIO_MAX,
TIGHTENING_RATIO_MIN,
},
metrics,
};
use bloomfilter;
@@ -22,6 +25,8 @@ pub const BAD_EXPANSION: &str = "ERR bad expansion";
pub const BAD_CAPACITY: &str = "ERR bad capacity";
pub const BAD_ERROR_RATE: &str = "ERR bad error rate";
pub const ERROR_RATE_RANGE: &str = "ERR (0 < error rate range < 1)";
pub const BAD_ERROR_RATIO: &str = "ERR bad error ratio";
pub const ERROR_RATIO_RANGE: &str = "ERR (0 < error ratio range < 1)";
pub const CAPACITY_LARGER_THAN_0: &str = "ERR (capacity should be larger than 0)";
pub const MAX_NUM_SCALING_FILTERS: &str = "ERR bloom object reached max number of filters";
pub const UNKNOWN_ARGUMENT: &str = "ERR unknown argument received";
@@ -67,6 +72,7 @@ impl BloomError {
pub struct BloomFilterType {
pub expansion: u32,
pub fp_rate: f64,
pub tightening_ratio: f64,
pub is_seed_random: bool,
pub filters: Vec<BloomFilter>,
}
@@ -75,6 +81,7 @@ impl BloomFilterType {
/// Create a new BloomFilterType object.
pub fn new_reserved(
fp_rate: f64,
tightening_ratio: f64,
capacity: u32,
expansion: u32,
use_random_seed: bool,
@@ -99,6 +106,7 @@ impl BloomFilterType {
let bloom = BloomFilterType {
expansion,
fp_rate,
tightening_ratio,
filters,
is_seed_random: use_random_seed,
};
@@ -120,6 +128,7 @@ impl BloomFilterType {
BloomFilterType {
expansion: from_bf.expansion,
fp_rate: from_bf.fp_rate,
tightening_ratio: from_bf.tightening_ratio,
is_seed_random: from_bf.is_seed_random,
filters,
}
@@ -264,12 +273,13 @@ impl BloomFilterType {
1 => {
// always use new version to init bloomFilterType.
// This is to ensure that the new fields can be recognized when the object is serialized and deserialized in the future.
let (expansion, fp_rate, is_seed_random, filters): (
let (expansion, fp_rate, tightening_ratio, is_seed_random, filters): (
u32,
f64,
f64,
bool,
Vec<BloomFilter>,
) = match bincode::deserialize::<(u32, f64, bool, Vec<BloomFilter>)>(
) = match bincode::deserialize::<(u32, f64, f64, bool, Vec<BloomFilter>)>(
&decoded_bytes[1..],
) {
Ok(values) => {
@@ -281,10 +291,13 @@ impl BloomFilterType {
if !(values.1 > BLOOM_FP_RATE_MIN && values.1 < BLOOM_FP_RATE_MAX) {
return Err(BloomError::ErrorRateRange);
}
if values.3.len() >= configs::MAX_FILTERS_PER_OBJ as usize {
if !(values.2 > TIGHTENING_RATIO_MIN && values.2 < TIGHTENING_RATIO_MAX) {
return Err(BloomError::ErrorRateRange);
}
if values.4.len() >= configs::MAX_FILTERS_PER_OBJ as usize {
return Err(BloomError::MaxNumScalingFilters);
}
for _filter in values.3.iter() {
for _filter in values.4.iter() {
// Reject the request, if the operation will result in creation of a filter of size greater than what is allowed.
if validate_size_limit
&& _filter.number_of_bytes()
@@ -303,6 +316,7 @@ impl BloomFilterType {
let item = BloomFilterType {
expansion,
fp_rate,
tightening_ratio,
is_seed_random,
filters,
};
@@ -579,6 +593,14 @@ mod tests {
.any(|filter| (filter.seed() == restore_filter.seed())
&& (restore_filter.seed() == configs::FIXED_SEED))));
}
assert_eq!(
restored_bloom_filter_type.fp_rate,
original_bloom_filter_type.fp_rate
);
assert_eq!(
restored_bloom_filter_type.tightening_ratio,
original_bloom_filter_type.tightening_ratio
);
assert_eq!(
restored_bloom_filter_type.capacity(),
original_bloom_filter_type.capacity()
@@ -633,12 +655,14 @@ mod tests {
let rand_prefix = random_prefix(7);
// 1 in every 1000 operations is expected to be a false positive.
let expected_fp_rate: f64 = 0.001;
let tightening_ratio: f64 = 0.5;
let initial_capacity = 10000;
// Expansion of 0 indicates non scaling.
let expansion = 0;
// Validate the non scaling behavior of the bloom filter.
let mut bf = BloomFilterType::new_reserved(
expected_fp_rate,
tightening_ratio,
initial_capacity,
expansion,
is_seed_random,
@@ -695,11 +719,13 @@ mod tests {
let rand_prefix = random_prefix(7);
// 1 in every 1000 operations is expected to be a false positive.
let expected_fp_rate: f64 = 0.001;
let tightening_ratio: f64 = 0.5;
let initial_capacity = 10000;
let expansion = 2;
let num_filters_to_scale = 5;
let mut bf = BloomFilterType::new_reserved(
expected_fp_rate,
tightening_ratio,
initial_capacity,
expansion,
is_seed_random,
@@ -779,18 +805,19 @@ mod tests {
fn test_exceeded_size_limit() {
// Validate that bloom filter allocations within bloom objects are rejected if their memory usage would be beyond
// the configured limit.
let result = BloomFilterType::new_reserved(0.5_f64, u32::MAX, 1, true, true);
let result = BloomFilterType::new_reserved(0.5_f64, 0.5_f64, u32::MAX, 1, true, true);
assert_eq!(result.err(), Some(BloomError::ExceedsMaxBloomSize));
let capacity = 50000000;
assert!(!BloomFilter::validate_size(capacity, 0.001_f64));
let result2 = BloomFilterType::new_reserved(0.001_f64, capacity, 1, true, true);
let result2 = BloomFilterType::new_reserved(0.001_f64, 0.5_f64, capacity, 1, true, true);
assert_eq!(result2.err(), Some(BloomError::ExceedsMaxBloomSize));
}

#[rstest(expansion, case::nonscaling(0), case::scaling(2))]
fn test_bf_encode_and_decode(expansion: u32) {
let mut bf =
BloomFilterType::new_reserved(0.5_f64, 1000_u32, expansion, true, true).unwrap();
BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, expansion, true, true)
.unwrap();
let item = "item1";
let _ = bf.add_item(item.as_bytes(), true);
// action
@@ -803,6 +830,7 @@ mod tests {
let new_bf = new_bf_result.unwrap();
// verify new_bf and bf
assert_eq!(bf.fp_rate, new_bf.fp_rate);
assert_eq!(bf.tightening_ratio, new_bf.tightening_ratio);
assert_eq!(bf.expansion, new_bf.expansion);
assert_eq!(bf.capacity(), new_bf.capacity());
// verify item1 exists.
@@ -812,7 +840,8 @@ mod tests {
#[test]
fn test_bf_decode_when_unsupported_version_should_failed() {
// arrange: prepare bloom filter
let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true, true).unwrap();
let mut bf =
BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true, true).unwrap();
let key = "key";
let _ = bf.add_item(key.as_bytes(), true).unwrap();

Expand All @@ -834,7 +863,8 @@ mod tests {
#[test]
fn test_bf_decode_when_bytes_is_empty_should_failed() {
// arrange: prepare bloom filter
let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true, true).unwrap();
let mut bf =
BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true, true).unwrap();
let key = "key";
let _ = bf.add_item(key.as_bytes(), true);

Expand All @@ -854,7 +884,8 @@ mod tests {
#[test]
fn test_bf_decode_when_bytes_is_exceed_limit_should_failed() {
// arrange: prepare bloom filter
let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true, true).unwrap();
let mut bf =
BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true, true).unwrap();
let key = "key";
let _ = bf.add_item(key.as_bytes(), true);
let origin_expansion = bf.expansion;
Expand Down Expand Up @@ -882,7 +913,7 @@ mod tests {

// 3. build a larger than 64mb filter
let extra_large_filter =
BloomFilterType::new_reserved(0.01_f64, 57000000, 2, true, false).unwrap();
BloomFilterType::new_reserved(0.01_f64, 0.5_f64, 57000000, 2, true, false).unwrap();
let vec = extra_large_filter.encode_bloom_filter().unwrap();
// should return error
assert_eq!(
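The utils.rs changes above widen the bincode-encoded tuple from `(u32, f64, bool, Vec<BloomFilter>)` to `(u32, f64, f64, bool, Vec<BloomFilter>)`, placing `tightening_ratio` right after `fp_rate` and validating it against the same exclusive (0, 1) bounds. A minimal round-trip sketch of that layout, assuming serde and bincode 1.x as dependencies (`DummyFilter` and the `main` driver are hypothetical stand-ins, not the module's real types):

```rust
use serde::{Deserialize, Serialize};

// Simplified stand-in for the module's BloomFilter payload.
#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct DummyFilter {
    num_items: u32,
}

fn main() {
    // Field order after this commit: (expansion, fp_rate, tightening_ratio, is_seed_random, filters).
    let original: (u32, f64, f64, bool, Vec<DummyFilter>) =
        (2, 0.001, 0.5, true, vec![DummyFilter { num_items: 7 }]);

    // Encode, then decode with the same tuple shape; a reader using the old
    // 4-element shape would misinterpret the bytes, hence the leading version byte.
    let bytes = bincode::serialize(&original).expect("encode");
    let decoded: (u32, f64, f64, bool, Vec<DummyFilter>) =
        bincode::deserialize(&bytes).expect("decode");
    assert_eq!(original, decoded);
}
```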
2 changes: 2 additions & 0 deletions src/configs.rs
@@ -36,6 +36,8 @@ lazy_static! {
// Tightening ratio used during scale out for the calculation of fp_rate of every new filter within a bloom object to
// maintain the bloom object's overall fp_rate to the configured value.
pub const TIGHTENING_RATIO: f64 = 0.5;
pub const TIGHTENING_RATIO_MIN: f64 = 0.0;
pub const TIGHTENING_RATIO_MAX: f64 = 1.0;
// Max number of filters allowed within a bloom object.
pub const MAX_FILTERS_PER_OBJ: i32 = i32::MAX;
/// Below constants are fixed seed and sip keys to help create bloom objects using the same seed and to restore the bloom objects with the same hasher which
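The configs.rs comment above states the intent: during scale-out, each new sub-filter gets a tighter false-positive target so the bloom object's overall rate stays near the configured fp_rate. The scale-out arithmetic itself is not part of this diff, so the following is only a sketch of the usual scalable-bloom-filter formula under that reading (function name and driver are hypothetical):

```rust
// Sketch only: with 0 < tightening_ratio < 1, filter i (0-based) targets
// base_fp_rate * tightening_ratio^i, so the per-filter rates form a
// geometric series and the object's aggregate rate stays bounded.
fn fp_rate_for_filter(base_fp_rate: f64, tightening_ratio: f64, filter_index: u32) -> f64 {
    base_fp_rate * tightening_ratio.powi(filter_index as i32)
}

fn main() {
    let (base, ratio) = (0.001_f64, 0.5_f64);
    for i in 0..4u32 {
        println!("filter {i}: target fp rate = {:.6}", fp_rate_for_filter(base, ratio, i));
    }
}
```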
1 change: 1 addition & 0 deletions src/wrapper/bloom_callback.rs
@@ -22,6 +22,7 @@ pub unsafe extern "C" fn bloom_rdb_save(rdb: *mut raw::RedisModuleIO, value: *mu
raw::save_unsigned(rdb, v.filters.len() as u64);
raw::save_unsigned(rdb, v.expansion as u64);
raw::save_double(rdb, v.fp_rate);
raw::save_double(rdb, v.tightening_ratio);
let mut is_seed_random = 0;
if v.is_seed_random {
is_seed_random = 1;
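The save hook above writes `tightening_ratio` immediately after `fp_rate`, and the loader added in src/bloom/data_type.rs reads the two doubles back in the same order; the RDB stream carries no field names, so the orderings must stay mirrored. A schematic sketch of that invariant (`Writer`/`Reader` are hypothetical stand-ins for the module's raw RDB I/O, not its real API):

```rust
// Schematic only: save order and load order must match field for field.
struct Writer(Vec<f64>);
struct Reader(std::vec::IntoIter<f64>);

impl Writer {
    fn save_double(&mut self, v: f64) {
        self.0.push(v);
    }
}

impl Reader {
    fn load_double(&mut self) -> Option<f64> {
        self.0.next()
    }
}

fn main() {
    let (fp_rate, tightening_ratio) = (0.001_f64, 0.5_f64);

    // Save side (wrapper/bloom_callback.rs): fp_rate first, then tightening_ratio.
    let mut w = Writer(Vec::new());
    w.save_double(fp_rate);
    w.save_double(tightening_ratio);

    // Load side (bloom/data_type.rs): read back in the same order.
    let mut r = Reader(w.0.into_iter());
    assert_eq!(r.load_double(), Some(fp_rate));
    assert_eq!(r.load_double(), Some(tightening_ratio));
}
```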
2 changes: 1 addition & 1 deletion tests/test_bloom_metrics.py
@@ -4,7 +4,7 @@
from valkeytests.conftest import resource_port_tracker
from util.waiters import *

DEFAULT_BLOOM_FILTER_SIZE = 179952
DEFAULT_BLOOM_FILTER_SIZE = 179960
DEFAULT_BLOOM_FILTER_CAPACITY = 100000
class TestBloomMetrics(ValkeyBloomTestCaseBase):

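The only test change is the expected default bloom object size growing from 179952 to 179960 bytes. The 8-byte delta is consistent with one additional f64 field now being counted in the object's reported memory usage (an inference from the diff, not something the commit states). A trivial check of the arithmetic:

```rust
fn main() {
    // 179960 - 179952 = 8 bytes, the size of one f64 (the new tightening_ratio field).
    assert_eq!(179960usize - 179952usize, std::mem::size_of::<f64>());
}
```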
