Add tightening_ratio support #26

Merged 1 commit on Dec 6, 2024
31 changes: 26 additions & 5 deletions src/bloom/command_handler.rs
@@ -4,7 +4,7 @@ use crate::bloom::utils::BloomFilterType;
use crate::configs;
use crate::configs::{
BLOOM_CAPACITY_MAX, BLOOM_CAPACITY_MIN, BLOOM_EXPANSION_MAX, BLOOM_EXPANSION_MIN,
BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN,
BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN, TIGHTENING_RATIO_MAX, TIGHTENING_RATIO_MIN,
};
use std::sync::atomic::Ordering;
use valkey_module::ContextFlags;
@@ -110,11 +110,13 @@ pub fn bloom_filter_add_value(
None => {
// Instantiate empty bloom filter.
let fp_rate = configs::BLOOM_FP_RATE_DEFAULT;
let tightening_ratio = configs::TIGHTENING_RATIO;
let capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed) as u32;
let expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32;
let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed);
let mut bloom = match BloomFilterType::new_reserved(
fp_rate,
tightening_ratio,
capacity,
expansion,
use_random_seed,
@@ -279,8 +281,10 @@ pub fn bloom_filter_reserve(
let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed);
// Skip bloom filter size validation on replicated cmds.
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let tightening_ratio = configs::TIGHTENING_RATIO;
let bloom = match BloomFilterType::new_reserved(
fp_rate,
tightening_ratio,
capacity,
expansion,
use_random_seed,
@@ -302,6 +306,7 @@

pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyResult {
let argc = input_args.len();
let replicated_cmd = ctx.get_flags().contains(ContextFlags::REPLICATED);
// At the very least, we need: BF.INSERT <key> ITEMS <item>
if argc < 4 {
return Err(ValkeyError::WrongArity);
@@ -311,6 +316,7 @@
let filter_name = &input_args[idx];
idx += 1;
let mut fp_rate = configs::BLOOM_FP_RATE_DEFAULT;
let mut tightening_ratio = configs::TIGHTENING_RATIO;
let mut capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed) as u32;
let mut expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32;
let mut nocreate = false;
@@ -331,6 +337,21 @@
}
};
}
"TIGHTENING" if replicated_cmd => {
if idx >= (argc - 1) {
return Err(ValkeyError::WrongArity);
}
idx += 1;
tightening_ratio = match input_args[idx].to_string_lossy().parse::<f64>() {
Ok(num) if num > TIGHTENING_RATIO_MIN && num < TIGHTENING_RATIO_MAX => num,
Ok(num) if !(num > TIGHTENING_RATIO_MIN && num < TIGHTENING_RATIO_MAX) => {
return Err(ValkeyError::Str(utils::ERROR_RATIO_RANGE));
}
_ => {
return Err(ValkeyError::Str(utils::BAD_ERROR_RATIO));
}
};
}
"CAPACITY" => {
if idx >= (argc - 1) {
return Err(ValkeyError::WrongArity);
@@ -387,7 +408,6 @@ pub fn bloom_filter_insert(
}
};
// Skip bloom filter size validation on replicated cmds.
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let mut add_succeeded = false;
match value {
Some(bloom) => {
@@ -398,7 +418,7 @@
bloom,
true,
&mut add_succeeded,
validate_size_limit,
!replicated_cmd,
);
replicate_and_notify_events(ctx, filter_name, add_succeeded, false);
response
@@ -410,10 +430,11 @@
let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed);
let mut bloom = match BloomFilterType::new_reserved(
fp_rate,
tightening_ratio,
capacity,
expansion,
use_random_seed,
validate_size_limit,
!replicated_cmd,
) {
Ok(bf) => bf,
Err(err) => return Err(ValkeyError::Str(err.as_str())),
@@ -425,7 +446,7 @@
&mut bloom,
true,
&mut add_succeeded,
validate_size_limit,
!replicated_cmd,
);
match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) {
Ok(()) => {
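A note on the parsing above: the new TIGHTENING keyword is only accepted on replicated commands, and its value is checked against an open interval, exactly like the error rate. A standalone sketch of that check (illustrative only; parse_tightening is a hypothetical helper, the bounds mirror TIGHTENING_RATIO_MIN/TIGHTENING_RATIO_MAX, and the messages mirror BAD_ERROR_RATIO/ERROR_RATIO_RANGE from this PR):

    const TIGHTENING_RATIO_MIN: f64 = 0.0;
    const TIGHTENING_RATIO_MAX: f64 = 1.0;

    fn parse_tightening(arg: &str) -> Result<f64, &'static str> {
        match arg.parse::<f64>() {
            // Open interval: 0.0 and 1.0 themselves are rejected.
            Ok(num) if num > TIGHTENING_RATIO_MIN && num < TIGHTENING_RATIO_MAX => Ok(num),
            // Out of range, including NaN (the guard above is false for NaN).
            Ok(_) => Err("ERR (0 < error ratio range < 1)"),
            // Not parseable as an f64 at all.
            Err(_) => Err("ERR bad error ratio"),
        }
    }

    fn main() {
        assert_eq!(parse_tightening("0.5"), Ok(0.5));
        assert_eq!(parse_tightening("1.0"), Err("ERR (0 < error ratio range < 1)"));
        assert_eq!(parse_tightening("abc"), Err("ERR bad error ratio"));
    }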
5 changes: 5 additions & 0 deletions src/bloom/data_type.rs
@@ -71,6 +71,9 @@ impl ValkeyDataType for BloomFilterType {
let Ok(fp_rate) = raw::load_double(rdb) else {
return None;
};
let Ok(tightening_ratio) = raw::load_double(rdb) else {
return None;
};
let mut filters: Vec<BloomFilter> = Vec::with_capacity(num_filters as usize);
let Ok(is_seed_random_u64) = raw::load_unsigned(rdb) else {
return None;
@@ -121,6 +124,7 @@ impl ValkeyDataType for BloomFilterType {
let item = BloomFilterType {
expansion: expansion as u32,
fp_rate,
tightening_ratio,
is_seed_random,
filters,
};
@@ -131,6 +135,7 @@
fn debug_digest(&self, mut dig: Digest) {
dig.add_long_long(self.expansion.into());
dig.add_string_buffer(&self.fp_rate.to_le_bytes());
dig.add_string_buffer(&self.tightening_ratio.to_le_bytes());
for filter in &self.filters {
dig.add_string_buffer(filter.bloom.as_slice());
dig.add_long_long(filter.num_items.into());
55 changes: 43 additions & 12 deletions src/bloom/utils.rs
@@ -1,6 +1,9 @@
use super::data_type::BLOOM_TYPE_VERSION;
use crate::{
configs::{self, BLOOM_EXPANSION_MAX, BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN},
configs::{
self, BLOOM_EXPANSION_MAX, BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN, TIGHTENING_RATIO_MAX,
TIGHTENING_RATIO_MIN,
},
metrics,
};
use bloomfilter;
@@ -22,6 +25,8 @@ pub const BAD_EXPANSION: &str = "ERR bad expansion";
pub const BAD_CAPACITY: &str = "ERR bad capacity";
pub const BAD_ERROR_RATE: &str = "ERR bad error rate";
pub const ERROR_RATE_RANGE: &str = "ERR (0 < error rate range < 1)";
pub const BAD_ERROR_RATIO: &str = "ERR bad error ratio";
pub const ERROR_RATIO_RANGE: &str = "ERR (0 < error ratio range < 1)";
Review comment (Contributor): Would a more descriptive error message, something like "ratio must be in the range (0, 1)", be better here, or is it better to follow the convention of the existing error rate range message?

pub const CAPACITY_LARGER_THAN_0: &str = "ERR (capacity should be larger than 0)";
pub const MAX_NUM_SCALING_FILTERS: &str = "ERR bloom object reached max number of filters";
pub const UNKNOWN_ARGUMENT: &str = "ERR unknown argument received";
@@ -67,6 +72,7 @@ impl BloomError {
pub struct BloomFilterType {
pub expansion: u32,
pub fp_rate: f64,
pub tightening_ratio: f64,
pub is_seed_random: bool,
pub filters: Vec<BloomFilter>,
}
@@ -75,6 +81,7 @@
/// Create a new BloomFilterType object.
pub fn new_reserved(
fp_rate: f64,
tightening_ratio: f64,
capacity: u32,
expansion: u32,
use_random_seed: bool,
@@ -99,6 +106,7 @@
let bloom = BloomFilterType {
expansion,
fp_rate,
tightening_ratio,
filters,
is_seed_random: use_random_seed,
};
@@ -120,6 +128,7 @@
BloomFilterType {
expansion: from_bf.expansion,
fp_rate: from_bf.fp_rate,
tightening_ratio: from_bf.tightening_ratio,
is_seed_random: from_bf.is_seed_random,
filters,
}
@@ -264,12 +273,13 @@ impl BloomFilterType {
1 => {
// always use new version to init bloomFilterType.
// This is to ensure that the new fields can be recognized when the object is serialized and deserialized in the future.
let (expansion, fp_rate, is_seed_random, filters): (
let (expansion, fp_rate, tightening_ratio, is_seed_random, filters): (
u32,
f64,
f64,
bool,
Vec<BloomFilter>,
) = match bincode::deserialize::<(u32, f64, bool, Vec<BloomFilter>)>(
) = match bincode::deserialize::<(u32, f64, f64, bool, Vec<BloomFilter>)>(
&decoded_bytes[1..],
) {
Ok(values) => {
@@ -281,10 +291,13 @@
if !(values.1 > BLOOM_FP_RATE_MIN && values.1 < BLOOM_FP_RATE_MAX) {
return Err(BloomError::ErrorRateRange);
}
if values.3.len() >= configs::MAX_FILTERS_PER_OBJ as usize {
if !(values.2 > TIGHTENING_RATIO_MIN && values.2 < TIGHTENING_RATIO_MAX) {
return Err(BloomError::ErrorRateRange);
}
if values.4.len() >= configs::MAX_FILTERS_PER_OBJ as usize {
return Err(BloomError::MaxNumScalingFilters);
}
for _filter in values.3.iter() {
for _filter in values.4.iter() {
// Reject the request, if the operation will result in creation of a filter of size greater than what is allowed.
if validate_size_limit
&& _filter.number_of_bytes()
@@ -303,6 +316,7 @@
let item = BloomFilterType {
expansion,
fp_rate,
tightening_ratio,
is_seed_random,
filters,
};
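The decode path above is versioned: the first byte of the encoded object identifies the layout, and the rest is a bincode-encoded tuple that now carries the tightening ratio as an additional f64, which is why the tuple type and the values.N indices all shift by one. A minimal sketch of the version-prefix pattern (not the module's actual encode/decode; the Meta struct and FORMAT_VERSION constant are hypothetical stand-ins, and serde/bincode are assumed as dependencies):

    use serde::{Deserialize, Serialize};

    const FORMAT_VERSION: u8 = 1;

    #[derive(Serialize, Deserialize, Debug, PartialEq)]
    struct Meta {
        expansion: u32,
        fp_rate: f64,
        tightening_ratio: f64,
    }

    fn encode(meta: &Meta) -> Vec<u8> {
        // The version byte goes first so readers can detect newer layouts.
        let mut out = vec![FORMAT_VERSION];
        out.extend(bincode::serialize(meta).expect("serialization should not fail"));
        out
    }

    fn decode(bytes: &[u8]) -> Result<Meta, String> {
        match bytes.split_first() {
            // Known layout: decode the remaining payload.
            Some((&FORMAT_VERSION, rest)) => bincode::deserialize(rest).map_err(|e| e.to_string()),
            // Unknown layout: refuse rather than misinterpret the bytes.
            Some((version, _)) => Err(format!("unsupported encoding version: {version}")),
            None => Err("empty payload".to_string()),
        }
    }

    fn main() {
        let meta = Meta { expansion: 2, fp_rate: 0.001, tightening_ratio: 0.5 };
        assert_eq!(decode(&encode(&meta)).unwrap(), meta);
    }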
@@ -579,6 +593,14 @@ mod tests {
.any(|filter| (filter.seed() == restore_filter.seed())
&& (restore_filter.seed() == configs::FIXED_SEED))));
}
assert_eq!(
restored_bloom_filter_type.fp_rate,
original_bloom_filter_type.fp_rate
);
assert_eq!(
restored_bloom_filter_type.tightening_ratio,
original_bloom_filter_type.tightening_ratio
);
assert_eq!(
restored_bloom_filter_type.capacity(),
original_bloom_filter_type.capacity()
@@ -633,12 +655,14 @@ mod tests {
let rand_prefix = random_prefix(7);
// 1 in every 1000 operations is expected to be a false positive.
let expected_fp_rate: f64 = 0.001;
let tightening_ratio: f64 = 0.5;
let initial_capacity = 10000;
// Expansion of 0 indicates non scaling.
let expansion = 0;
// Validate the non scaling behavior of the bloom filter.
let mut bf = BloomFilterType::new_reserved(
expected_fp_rate,
tightening_ratio,
initial_capacity,
expansion,
is_seed_random,
@@ -695,11 +719,13 @@ mod tests {
let rand_prefix = random_prefix(7);
// 1 in every 1000 operations is expected to be a false positive.
let expected_fp_rate: f64 = 0.001;
let tightening_ratio: f64 = 0.5;
let initial_capacity = 10000;
let expansion = 2;
let num_filters_to_scale = 5;
let mut bf = BloomFilterType::new_reserved(
expected_fp_rate,
tightening_ratio,
initial_capacity,
expansion,
is_seed_random,
@@ -779,18 +805,19 @@ mod tests {
fn test_exceeded_size_limit() {
// Validate that bloom filter allocations within bloom objects are rejected if their memory usage would be beyond
// the configured limit.
let result = BloomFilterType::new_reserved(0.5_f64, u32::MAX, 1, true, true);
let result = BloomFilterType::new_reserved(0.5_f64, 0.5_f64, u32::MAX, 1, true, true);
assert_eq!(result.err(), Some(BloomError::ExceedsMaxBloomSize));
let capacity = 50000000;
assert!(!BloomFilter::validate_size(capacity, 0.001_f64));
let result2 = BloomFilterType::new_reserved(0.001_f64, capacity, 1, true, true);
let result2 = BloomFilterType::new_reserved(0.001_f64, 0.5_f64, capacity, 1, true, true);
assert_eq!(result2.err(), Some(BloomError::ExceedsMaxBloomSize));
}

#[rstest(expansion, case::nonscaling(0), case::scaling(2))]
fn test_bf_encode_and_decode(expansion: u32) {
let mut bf =
BloomFilterType::new_reserved(0.5_f64, 1000_u32, expansion, true, true).unwrap();
BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, expansion, true, true)
.unwrap();
let item = "item1";
let _ = bf.add_item(item.as_bytes(), true);
// action
@@ -803,6 +830,7 @@
let new_bf = new_bf_result.unwrap();
// verify new_bf and bf
assert_eq!(bf.fp_rate, new_bf.fp_rate);
assert_eq!(bf.tightening_ratio, new_bf.tightening_ratio);
Review comment (Member): In the verify_restored_items function, can we also assert on this? The same assertion can be added for the false positive rate:

        assert_eq!(
            restored_bloom_filter_type.tightening_ratio,
            original_bloom_filter_type.tightening_ratio
        );

assert_eq!(bf.expansion, new_bf.expansion);
assert_eq!(bf.capacity(), new_bf.capacity());
// verify item1 exists.
@@ -812,7 +840,8 @@
#[test]
fn test_bf_decode_when_unsupported_version_should_failed() {
// arrange: prepare bloom filter
let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true, true).unwrap();
let mut bf =
BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true, true).unwrap();
let key = "key";
let _ = bf.add_item(key.as_bytes(), true).unwrap();

@@ -834,7 +863,8 @@
#[test]
fn test_bf_decode_when_bytes_is_empty_should_failed() {
// arrange: prepare bloom filter
let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true, true).unwrap();
let mut bf =
BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true, true).unwrap();
let key = "key";
let _ = bf.add_item(key.as_bytes(), true);

@@ -854,7 +884,8 @@
#[test]
fn test_bf_decode_when_bytes_is_exceed_limit_should_failed() {
// arrange: prepare bloom filter
let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true, true).unwrap();
let mut bf =
BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true, true).unwrap();
let key = "key";
let _ = bf.add_item(key.as_bytes(), true);
let origin_expansion = bf.expansion;
@@ -882,7 +913,7 @@ mod tests {

// 3. build a larger than 64mb filter
let extra_large_filter =
BloomFilterType::new_reserved(0.01_f64, 57000000, 2, true, false).unwrap();
BloomFilterType::new_reserved(0.01_f64, 0.5_f64, 57000000, 2, true, false).unwrap();
let vec = extra_large_filter.encode_bloom_filter().unwrap();
// should return error
assert_eq!(
2 changes: 2 additions & 0 deletions src/configs.rs
@@ -36,6 +36,8 @@ lazy_static! {
// Tightening ratio used during scale out for the calculation of fp_rate of every new filter within a bloom object to
// maintain the bloom object's overall fp_rate to the configured value.
pub const TIGHTENING_RATIO: f64 = 0.5;
pub const TIGHTENING_RATIO_MIN: f64 = 0.0;
pub const TIGHTENING_RATIO_MAX: f64 = 1.0;
// Max number of filters allowed within a bloom object.
pub const MAX_FILTERS_PER_OBJ: i32 = i32::MAX;
/// Below constants are fixed seed and sip keys to help create bloom objects using the same seed and to restore the bloom objects with the same hasher which
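For intuition on what these constants control: in a scaling bloom object, each newly created sub-filter is given a tighter false-positive rate than the previous one so that the compounded rate across all sub-filters stays close to the configured target. A rough illustration under the standard scalable-bloom-filter construction (not code from this module; per_filter_fp_rates is a hypothetical helper):

    // With tightening ratio r, sub-filter i can use fp_rate * r^i, and the
    // compounded false-positive probability is bounded by the geometric
    // series fp_rate / (1 - r).
    fn per_filter_fp_rates(fp_rate: f64, tightening_ratio: f64, num_filters: u32) -> Vec<f64> {
        (0..num_filters)
            .map(|i| fp_rate * tightening_ratio.powi(i as i32))
            .collect()
    }

    fn main() {
        let rates = per_filter_fp_rates(0.001, 0.5, 4);
        println!("{rates:?}"); // [0.001, 0.0005, 0.00025, 0.000125]
        println!("overall bound: {}", 0.001 / (1.0 - 0.5)); // 0.002
    }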
1 change: 1 addition & 0 deletions src/wrapper/bloom_callback.rs
@@ -22,6 +22,7 @@ pub unsafe extern "C" fn bloom_rdb_save(
raw::save_unsigned(rdb, v.filters.len() as u64);
raw::save_unsigned(rdb, v.expansion as u64);
raw::save_double(rdb, v.fp_rate);
raw::save_double(rdb, v.tightening_ratio);
let mut is_seed_random = 0;
if v.is_seed_random {
is_seed_random = 1;
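These two RDB changes only work together because of ordering: the save callback here writes tightening_ratio immediately after fp_rate, and the load path in data_type.rs reads the two doubles back in the same order. A tiny sketch of why that matters for a length-free binary stream (illustrative; the writer/reader pair below is hypothetical, not the module's RDB API):

    fn save(buf: &mut Vec<u8>, fp_rate: f64, tightening_ratio: f64) {
        // Fields are appended with no tags, so order is the only schema.
        buf.extend_from_slice(&fp_rate.to_le_bytes());
        buf.extend_from_slice(&tightening_ratio.to_le_bytes());
    }

    fn load(buf: &[u8]) -> Option<(f64, f64)> {
        // The reader must consume fields in exactly the order they were written.
        let fp_rate = f64::from_le_bytes(buf.get(0..8)?.try_into().ok()?);
        let tightening_ratio = f64::from_le_bytes(buf.get(8..16)?.try_into().ok()?);
        Some((fp_rate, tightening_ratio))
    }

    fn main() {
        let mut buf = Vec::new();
        save(&mut buf, 0.001, 0.5);
        assert_eq!(load(&buf), Some((0.001, 0.5)));
    }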
2 changes: 1 addition & 1 deletion tests/test_bloom_metrics.py
@@ -4,7 +4,7 @@
from valkeytests.conftest import resource_port_tracker
from util.waiters import *

DEFAULT_BLOOM_FILTER_SIZE = 179952
DEFAULT_BLOOM_FILTER_SIZE = 179960
DEFAULT_BLOOM_FILTER_CAPACITY = 100000
class TestBloomMetrics(ValkeyBloomTestCaseBase):

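The expected default size in this test grows by exactly 8 bytes, which is consistent with one extra f64 now being counted in the object's reported memory usage (an inference from the diff, not something stated in the PR):

    fn main() {
        // 179960 - 179952 == 8 == size of the added f64 field.
        assert_eq!(std::mem::size_of::<f64>(), 8);
        assert_eq!(179960 - 179952, 8);
    }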