From 3be545464f07cfdaf3e19a0c60d59a2d19433cdb Mon Sep 17 00:00:00 2001 From: Karthik Subbarao Date: Wed, 23 Oct 2024 01:39:50 +0000 Subject: [PATCH] Handle bloom object max allowed size limit, switch fp rate to f64, update defrag exemption logic and free effort logic --- .github/workflows/ci.yml | 4 +- build.sh | 2 +- src/bloom/command_handler.rs | 78 ++++++++++++++++++----- src/bloom/data_type.rs | 5 +- src/bloom/utils.rs | 117 +++++++++++++++++++++++----------- src/configs.rs | 20 ++++-- src/lib.rs | 1 + src/wrapper/bloom_callback.rs | 7 +- tests/test_basic.py | 38 +++++++++++ tests/test_bloom_command.py | 4 +- 10 files changed, 212 insertions(+), 64 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3c46f75..a73862c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -34,7 +34,7 @@ jobs: git clone "${{ env.VALKEY_REPO_URL }}" cd valkey git checkout ${{ matrix.server_version }} - make + make -j cp src/valkey-server ../binaries/${{ matrix.server_version }}/ - name: Set up Python uses: actions/setup-python@v3 @@ -87,7 +87,7 @@ jobs: git clone "${{ env.VALKEY_REPO_URL }}" cd valkey git checkout ${{ matrix.server_version }} - make SANITIZER=address SERVER_CFLAGS='-Werror' BUILD_TLS=module + make -j SANITIZER=address SERVER_CFLAGS='-Werror' BUILD_TLS=module cp src/valkey-server ../binaries/${{ matrix.server_version }}/ - name: Set up Python uses: actions/setup-python@v3 diff --git a/build.sh b/build.sh index 78b610b..d8d6781 100755 --- a/build.sh +++ b/build.sh @@ -42,7 +42,7 @@ else git clone "$REPO_URL" cd valkey git checkout "$SERVER_VERSION" - make + make -j cp src/valkey-server ../binaries/$SERVER_VERSION/ fi diff --git a/src/bloom/command_handler.rs b/src/bloom/command_handler.rs index 60f3fe5..9c29272 100644 --- a/src/bloom/command_handler.rs +++ b/src/bloom/command_handler.rs @@ -7,6 +7,7 @@ use crate::configs::{ BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN, }; use std::sync::atomic::Ordering; +use 
valkey_module::ContextFlags; use valkey_module::NotifyEvent; use valkey_module::{Context, ValkeyError, ValkeyResult, ValkeyString, ValkeyValue, VALKEY_OK}; @@ -17,12 +18,13 @@ fn handle_bloom_add( bf: &mut BloomFilterType, multi: bool, add_succeeded: &mut bool, + validate_size_limit: bool, ) -> Result { match multi { true => { let mut result = Vec::new(); for item in args.iter().take(argc).skip(item_idx) { - match bf.add_item(item.as_slice()) { + match bf.add_item(item.as_slice(), validate_size_limit) { Ok(add_result) => { if add_result == 1 { *add_succeeded = true; @@ -39,7 +41,7 @@ fn handle_bloom_add( } false => { let item = args[item_idx].as_slice(); - match bf.add_item(item) { + match bf.add_item(item, validate_size_limit) { Ok(add_result) => { *add_succeeded = add_result == 1; Ok(ValkeyValue::Integer(add_result)) @@ -88,16 +90,18 @@ pub fn bloom_filter_add_value( return Err(ValkeyError::Str(utils::ERROR)); } }; + let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED); let mut add_succeeded = false; match value { - Some(bf) => { + Some(bloom) => { let response = handle_bloom_add( input_args, argc, curr_cmd_idx, - bf, + bloom, multi, &mut add_succeeded, + validate_size_limit, ); replicate_and_notify_events(ctx, filter_name, add_succeeded, false); response @@ -107,16 +111,25 @@ pub fn bloom_filter_add_value( let fp_rate = configs::BLOOM_FP_RATE_DEFAULT; let capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed) as u32; let expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32; - let mut bf = BloomFilterType::new_reserved(fp_rate, capacity, expansion); + let mut bloom = match BloomFilterType::new_reserved( + fp_rate, + capacity, + expansion, + validate_size_limit, + ) { + Ok(bf) => bf, + Err(err) => return Err(ValkeyError::Str(err.as_str())), + }; let response = handle_bloom_add( input_args, argc, curr_cmd_idx, - &mut bf, + &mut bloom, multi, &mut add_succeeded, + validate_size_limit, ); - match 
filter_key.set_value(&BLOOM_FILTER_TYPE, bf) { + match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) { Ok(()) => { replicate_and_notify_events(ctx, filter_name, add_succeeded, true); response @@ -204,7 +217,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke let filter_name = &input_args[curr_cmd_idx]; curr_cmd_idx += 1; // Parse the error rate - let fp_rate = match input_args[curr_cmd_idx].to_string_lossy().parse::() { + let fp_rate = match input_args[curr_cmd_idx].to_string_lossy().parse::() { Ok(num) if num > BLOOM_FP_RATE_MIN && num < BLOOM_FP_RATE_MAX => num, Ok(num) if !(num > BLOOM_FP_RATE_MIN && num < BLOOM_FP_RATE_MAX) => { return Err(ValkeyError::Str(utils::ERROR_RATE_RANGE)); @@ -260,7 +273,16 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke match value { Some(_) => Err(ValkeyError::Str(utils::ITEM_EXISTS)), None => { - let bloom = BloomFilterType::new_reserved(fp_rate, capacity, expansion); + let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED); + let bloom = match BloomFilterType::new_reserved( + fp_rate, + capacity, + expansion, + validate_size_limit, + ) { + Ok(bf) => bf, + Err(err) => return Err(ValkeyError::Str(err.as_str())), + }; match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) { Ok(()) => { replicate_and_notify_events(ctx, filter_name, false, true); @@ -293,7 +315,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey return Err(ValkeyError::WrongArity); } idx += 1; - fp_rate = match input_args[idx].to_string_lossy().parse::() { + fp_rate = match input_args[idx].to_string_lossy().parse::() { Ok(num) if num > BLOOM_FP_RATE_MIN && num < BLOOM_FP_RATE_MAX => num, Ok(num) if !(num > BLOOM_FP_RATE_MIN && num < BLOOM_FP_RATE_MAX) => { return Err(ValkeyError::Str(utils::ERROR_RATE_RANGE)); @@ -358,10 +380,19 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey return 
Err(ValkeyError::Str(utils::ERROR)); } }; + let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED); let mut add_succeeded = false; match value { - Some(bf) => { - let response = handle_bloom_add(input_args, argc, idx, bf, true, &mut add_succeeded); + Some(bloom) => { + let response = handle_bloom_add( + input_args, + argc, + idx, + bloom, + true, + &mut add_succeeded, + validate_size_limit, + ); replicate_and_notify_events(ctx, filter_name, add_succeeded, false); response } @@ -369,10 +400,25 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey if nocreate { return Err(ValkeyError::Str(utils::NOT_FOUND)); } - let mut bf = BloomFilterType::new_reserved(fp_rate, capacity, expansion); - let response = - handle_bloom_add(input_args, argc, idx, &mut bf, true, &mut add_succeeded); - match filter_key.set_value(&BLOOM_FILTER_TYPE, bf) { + let mut bloom = match BloomFilterType::new_reserved( + fp_rate, + capacity, + expansion, + validate_size_limit, + ) { + Ok(bf) => bf, + Err(err) => return Err(ValkeyError::Str(err.as_str())), + }; + let response = handle_bloom_add( + input_args, + argc, + idx, + &mut bloom, + true, + &mut add_succeeded, + validate_size_limit, + ); + match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) { Ok(()) => { replicate_and_notify_events(ctx, filter_name, add_succeeded, true); response diff --git a/src/bloom/data_type.rs b/src/bloom/data_type.rs index 326333c..7c174c2 100644 --- a/src/bloom/data_type.rs +++ b/src/bloom/data_type.rs @@ -9,7 +9,7 @@ use std::os::raw::c_int; use valkey_module::native_types::ValkeyType; use valkey_module::{logging, raw}; -const BLOOM_FILTER_TYPE_ENCODING_VERSION: i32 = 0; +const BLOOM_FILTER_TYPE_ENCODING_VERSION: i32 = 1; pub static BLOOM_FILTER_TYPE: ValkeyType = ValkeyType::new( "bloomfltr", @@ -64,7 +64,7 @@ impl ValkeyDataType for BloomFilterType { let Ok(expansion) = raw::load_unsigned(rdb) else { return None; }; - let Ok(fp_rate) = raw::load_float(rdb) else { 
+ let Ok(fp_rate) = raw::load_double(rdb) else { return None; }; for i in 0..num_filters { @@ -93,6 +93,7 @@ impl ValkeyDataType for BloomFilterType { (FIXED_SIP_KEY_ONE_A, FIXED_SIP_KEY_ONE_B), (FIXED_SIP_KEY_TWO_A, FIXED_SIP_KEY_TWO_B), ]; + // Perform a check here ?? let filter = BloomFilter::from_existing( bitmap.as_ref(), number_of_bits, diff --git a/src/bloom/utils.rs b/src/bloom/utils.rs index 6a0fc2e..2be76b0 100644 --- a/src/bloom/utils.rs +++ b/src/bloom/utils.rs @@ -1,5 +1,6 @@ -use crate::configs::{FIXED_SEED, MAX_FILTERS_PER_OBJ, TIGHTENING_RATIO}; +use crate::configs; use bloomfilter; +use std::sync::atomic::Ordering; /// KeySpace Notification Events pub const ADD_EVENT: &str = "bloom.add"; @@ -16,13 +17,16 @@ pub const BAD_CAPACITY: &str = "ERR bad capacity"; pub const BAD_ERROR_RATE: &str = "ERR bad error rate"; pub const ERROR_RATE_RANGE: &str = "ERR (0 < error rate range < 1)"; pub const CAPACITY_LARGER_THAN_0: &str = "ERR (capacity should be larger than 0)"; -pub const MAX_NUM_SCALING_FILTERS: &str = "ERR max number of scaling filters reached"; +pub const MAX_NUM_SCALING_FILTERS: &str = "ERR bloom object reached max number of filters"; pub const UNKNOWN_ARGUMENT: &str = "ERR unknown argument received"; +pub const EXCEEDS_MAX_BLOOM_SIZE: &str = + "ERR operation results in filter allocation exceeding limit"; #[derive(Debug, PartialEq)] pub enum BloomError { NonScalingFilterFull, MaxNumScalingFilters, + ExceedsMaxBloomSize, } impl BloomError { @@ -30,6 +34,7 @@ impl BloomError { match self { BloomError::NonScalingFilterFull => NON_SCALING_FILTER_FULL, BloomError::MaxNumScalingFilters => MAX_NUM_SCALING_FILTERS, + BloomError::ExceedsMaxBloomSize => EXCEEDS_MAX_BLOOM_SIZE, } } } @@ -39,20 +44,32 @@ impl BloomError { /// This is a generic top level structure which is not coupled to any bloom crate. 
pub struct BloomFilterType { pub expansion: u32, - pub fp_rate: f32, + pub fp_rate: f64, pub filters: Vec, } impl BloomFilterType { /// Create a new BloomFilterType object. - pub fn new_reserved(fp_rate: f32, capacity: u32, expansion: u32) -> BloomFilterType { + pub fn new_reserved( + fp_rate: f64, + capacity: u32, + expansion: u32, + validate_size_limit: bool, + ) -> Result { + // Reject the request, if the operation will result in creation of a bloom object containing a filter + // of size greater than what is allowed. + if validate_size_limit && !BloomFilter::is_within_allowed_size(capacity, fp_rate) { + return Err(BloomError::ExceedsMaxBloomSize); + } + // Create the bloom filter and add to the main BloomFilter object. let bloom = BloomFilter::new(fp_rate, capacity); let filters = vec![bloom]; - BloomFilterType { + let bloom = BloomFilterType { expansion, fp_rate, filters, - } + }; + Ok(bloom) } /// Create a new BloomFilterType object from an existing one. @@ -83,9 +100,6 @@ impl BloomFilterType { /// Else, we return the number of filters as the free_effort. /// This is similar to how the core handles aggregated objects. pub fn free_effort(&self) -> usize { - if self.filters.is_empty() { - return 1; - } self.filters.len() } @@ -115,7 +129,7 @@ impl BloomFilterType { /// Add an item to the BloomFilterType object. /// If scaling is enabled, this can result in a new sub filter creation. - pub fn add_item(&mut self, item: &[u8]) -> Result { + pub fn add_item(&mut self, item: &[u8], validate_size_limit: bool) -> Result { // Check if item exists already. if self.item_exists(item) { return Ok(0); @@ -132,15 +146,28 @@ impl BloomFilterType { if self.expansion == 0 { return Err(BloomError::NonScalingFilterFull); } - if num_filters == MAX_FILTERS_PER_OBJ { + if num_filters == configs::MAX_FILTERS_PER_OBJ { return Err(BloomError::MaxNumScalingFilters); } - // Scale out by adding a new filter with capacity bounded within the u32 range. 
- let new_fp_rate = self.fp_rate * TIGHTENING_RATIO.powi(num_filters); + // Scale out by adding a new filter with capacity bounded within the u32 range. The false positive rate is also + // bounded within the range f64::MIN_POSITIVE < x < 1.0. + let new_fp_rate = match self.fp_rate * configs::TIGHTENING_RATIO.powi(num_filters) { + x if x > f64::MIN_POSITIVE => x, + _ => { + return Err(BloomError::MaxNumScalingFilters); + } + }; let new_capacity = match filter.capacity.checked_mul(self.expansion) { Some(new_capacity) => new_capacity, + // We could return MaxNumScalingFilters here. u32::MAX cannot be reached with the 64MB limit. None => u32::MAX, }; + // Reject the request, if the operation will result in creation of a filter of size greater than what is allowed. + if validate_size_limit + && !BloomFilter::is_within_allowed_size(new_capacity, new_fp_rate) + { + return Err(BloomError::ExceedsMaxBloomSize); + } let mut new_filter = BloomFilter::new(new_fp_rate, new_capacity); // Add item. new_filter.set(item); @@ -152,12 +179,12 @@ impl BloomFilterType { } } -// Structure representing a single bloom filter. 200 Bytes. -// Using Crate: "bloomfilter" -// The reason for using u32 for num_items and capacity is because -// we have a limit on the memory usage of a `BloomFilter` to be 64MB. -// Based on this, we expect the number of items on the `BloomFilter` to be -// well within the u32::MAX limit. +/// Structure representing a single bloom filter. 200 Bytes. +/// Using Crate: "bloomfilter" +/// The reason for using u32 for num_items and capacity is because +/// we have a limit on the memory usage of a `BloomFilter` to be 64MB. +/// Based on this, we expect the number of items on the `BloomFilter` to be +/// well within the u32::MAX limit. pub struct BloomFilter { pub bloom: bloomfilter::Bloom<[u8]>, pub num_items: u32, @@ -166,11 +193,11 @@ pub struct BloomFilter { impl BloomFilter { /// Instantiate empty BloomFilter object. 
- pub fn new(fp_rate: f32, capacity: u32) -> BloomFilter { + pub fn new(fp_rate: f64, capacity: u32) -> BloomFilter { let bloom = bloomfilter::Bloom::new_for_fp_rate_with_seed( capacity as usize, - fp_rate as f64, - &FIXED_SEED, + fp_rate, + &configs::FIXED_SEED, ); BloomFilter { bloom, @@ -205,6 +232,18 @@ impl BloomFilter { std::mem::size_of::() + (self.bloom.number_of_bits() / 8) as usize } + /// Calculates the number of bytes that the bloom filter will require to be allocated. This is used before actually + /// creating the bloom filter when checking if the filter is within the allowed size. + /// Returns whether the bloom filter is of a valid size or not. + pub fn is_within_allowed_size(capacity: u32, fp_rate: f64) -> bool { + let bytes = bloomfilter::Bloom::<[u8]>::compute_bitmap_size(capacity as usize, fp_rate) + + std::mem::size_of::(); + if bytes > configs::BLOOM_MAX_MEMORY_USAGE.load(Ordering::Relaxed) as usize { + return false; + } + true + } + pub fn check(&self, item: &[u8]) -> bool { self.bloom.check(item) } @@ -258,7 +297,7 @@ mod tests { let mut cardinality = bf.cardinality(); while cardinality < capacity_needed { let item = format!("{}{}", rand_prefix, new_item_idx); - let result = bf.add_item(item.as_bytes()); + let result = bf.add_item(item.as_bytes(), true); match result { Ok(0) => { fp_count += 1; @@ -302,8 +341,8 @@ mod tests { (error_count, num_operations) } - fn fp_assert(error_count: i64, num_operations: i64, expected_fp_rate: f32, fp_margin: f32) { - let real_fp_rate = error_count as f32 / num_operations as f32; + fn fp_assert(error_count: i64, num_operations: i64, expected_fp_rate: f64, fp_margin: f64) { + let real_fp_rate = error_count as f64 / num_operations as f64; let fp_rate_with_margin = expected_fp_rate + fp_margin; assert!( real_fp_rate < fp_rate_with_margin, @@ -317,8 +356,8 @@ mod tests { original_bloom_filter_type: &BloomFilterType, restored_bloom_filter_type: &BloomFilterType, add_operation_idx: i64, - expected_fp_rate: f32, - 
fp_margin: f32, + expected_fp_rate: f64, + fp_margin: f64, rand_prefix: &String, ) { let expected_sip_keys = [ @@ -388,21 +427,24 @@ mod tests { fn test_non_scaling_filter() { let rand_prefix = random_prefix(7); // 1 in every 1000 operations is expected to be a false positive. - let expected_fp_rate: f32 = 0.001; + let expected_fp_rate: f64 = 0.001; let initial_capacity = 10000; // Expansion of 0 indicates non scaling. let expansion = 0; // Validate the non scaling behavior of the bloom filter. - let mut bf = BloomFilterType::new_reserved(expected_fp_rate, initial_capacity, expansion); + let mut bf = + BloomFilterType::new_reserved(expected_fp_rate, initial_capacity, expansion, true) + .expect("Expect bloom creation to succeed"); let (error_count, add_operation_idx) = add_items_till_capacity(&mut bf, initial_capacity as i64, 1, &rand_prefix); assert_eq!( - bf.add_item(b"new_item"), + bf.add_item(b"new_item", true), Err(BloomError::NonScalingFilterFull) ); assert_eq!(bf.capacity(), initial_capacity as i64); assert_eq!(bf.cardinality(), initial_capacity as i64); - assert_eq!(bf.free_effort(), 1); + let expected_free_effort = 1; + assert_eq!(bf.free_effort(), expected_free_effort); assert!(bf.memory_usage() > 0); // Use a margin on the expected_fp_rate when asserting for correctness. let fp_margin = 0.002; @@ -426,7 +468,7 @@ mod tests { // Verify restore let mut restore_bf = BloomFilterType::create_copy_from(&bf); assert_eq!( - restore_bf.add_item(b"new_item"), + restore_bf.add_item(b"new_item", true), Err(BloomError::NonScalingFilterFull) ); verify_restored_items( @@ -443,11 +485,13 @@ mod tests { fn test_scaling_filter() { let rand_prefix = random_prefix(7); // 1 in every 1000 operations is expected to be a false positive. 
- let expected_fp_rate: f32 = 0.001; + let expected_fp_rate: f64 = 0.001; let initial_capacity = 10000; let expansion = 2; let num_filters_to_scale = 5; - let mut bf = BloomFilterType::new_reserved(expected_fp_rate, initial_capacity, expansion); + let mut bf = + BloomFilterType::new_reserved(expected_fp_rate, initial_capacity, expansion, true) + .expect("Expect bloom creation to succeed"); assert_eq!(bf.capacity(), initial_capacity as i64); assert_eq!(bf.cardinality(), 0); let mut total_error_count = 0; @@ -465,7 +509,8 @@ mod tests { total_error_count += error_count; assert_eq!(bf.capacity(), expected_total_capacity as i64); assert_eq!(bf.cardinality(), expected_total_capacity as i64); - assert_eq!(bf.free_effort(), filter_idx as usize); + let expected_free_effort = filter_idx as usize; + assert_eq!(bf.free_effort(), expected_free_effort); assert!(bf.memory_usage() > 0); } // Use a margin on the expected_fp_rate when asserting for correctness. @@ -507,7 +552,7 @@ mod tests { #[test] fn test_sip_keys() { // The value of sip keys generated by the sip_keys with fixed seed should be equal to the constant in configs.rs - let test_bloom_filter = BloomFilter::new(0.5_f32, 1000_u32); + let test_bloom_filter = BloomFilter::new(0.5_f64, 1000_u32); let test_sip_keys = test_bloom_filter.bloom.sip_keys(); assert_eq!(test_sip_keys[0].0, FIXED_SIP_KEY_ONE_A); assert_eq!(test_sip_keys[0].1, FIXED_SIP_KEY_ONE_B); diff --git a/src/configs.rs b/src/configs.rs index 6b11229..643f8d9 100644 --- a/src/configs.rs +++ b/src/configs.rs @@ -10,17 +10,29 @@ pub const BLOOM_EXPANSION_DEFAULT: i64 = 2; pub const BLOOM_EXPANSION_MIN: u32 = 1; pub const BLOOM_EXPANSION_MAX: u32 = 10; -pub const BLOOM_FP_RATE_DEFAULT: f32 = 0.001; -pub const BLOOM_FP_RATE_MIN: f32 = 0.0; -pub const BLOOM_FP_RATE_MAX: f32 = 1.0; +pub const BLOOM_FP_RATE_DEFAULT: f64 = 0.001; +pub const BLOOM_FP_RATE_MIN: f64 = 0.0; +pub const BLOOM_FP_RATE_MAX: f64 = 1.0; + +// Max Memory usage allowed per bloom filter within a 
bloom object (64MB). +// Beyond this threshold, a bloom object is classified as large and is exempt from defrag operations. +// Also, write operations that result in bloom object allocation larger than this size will be rejected. +pub const BLOOM_MAX_MEMORY_USAGE_DEFAULT: i64 = 64 * 1024 * 1024; +pub const BLOOM_MAX_MEMORY_USAGE_MIN: i64 = 0; +pub const BLOOM_MAX_MEMORY_USAGE_MAX: i64 = i64::MAX; lazy_static! { pub static ref BLOOM_CAPACITY: AtomicI64 = AtomicI64::new(BLOOM_CAPACITY_DEFAULT); pub static ref BLOOM_EXPANSION: AtomicI64 = AtomicI64::new(BLOOM_EXPANSION_DEFAULT); + pub static ref BLOOM_MAX_MEMORY_USAGE: AtomicI64 = + AtomicI64::new(BLOOM_MAX_MEMORY_USAGE_DEFAULT); } /// Constants -pub const TIGHTENING_RATIO: f32 = 0.5; +// Tightening ratio used during scale out for the calculation of fp_rate of every new filter within a bloom object to +// maintain the bloom object's overall fp_rate to the configured value. +pub const TIGHTENING_RATIO: f64 = 0.5; +// Max number of filters allowed within a bloom object. pub const MAX_FILTERS_PER_OBJ: i32 = i32::MAX; /// Below constants are fixed seed and sip keys to help create bloom objects using the same seed and to restore the bloom objects with the same hasher which /// generated using rust crate bloomfilter https://crates.io/crates/bloomfilter diff --git a/src/lib.rs b/src/lib.rs index bb3a3a7..7374723 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -82,6 +82,7 @@ valkey_module! 
{ i64: [ ["bloom-max-item-size", &*configs::BLOOM_CAPACITY, configs::BLOOM_CAPACITY_DEFAULT, configs::BLOOM_CAPACITY_MIN as i64, configs::BLOOM_CAPACITY_MAX as i64, ConfigurationFlags::DEFAULT, None], ["bloom-expansion-rate", &*configs::BLOOM_EXPANSION, configs::BLOOM_EXPANSION_DEFAULT, configs::BLOOM_EXPANSION_MIN as i64, configs::BLOOM_EXPANSION_MAX as i64, ConfigurationFlags::DEFAULT, None], + ["bloom-max-memory-usage", &*configs::BLOOM_MAX_MEMORY_USAGE, configs::BLOOM_MAX_MEMORY_USAGE_DEFAULT, configs::BLOOM_MAX_MEMORY_USAGE_MIN, configs::BLOOM_MAX_MEMORY_USAGE_MAX, ConfigurationFlags::DEFAULT, None], ], string: [ ], diff --git a/src/wrapper/bloom_callback.rs b/src/wrapper/bloom_callback.rs index 89489d5..c615bc8 100644 --- a/src/wrapper/bloom_callback.rs +++ b/src/wrapper/bloom_callback.rs @@ -1,8 +1,10 @@ use crate::bloom; use crate::bloom::data_type::ValkeyDataType; use crate::bloom::utils::BloomFilterType; +use crate::configs; use std::os::raw::{c_char, c_int, c_void}; use std::ptr::null_mut; +use std::sync::atomic::Ordering; use valkey_module::raw; use valkey_module::{RedisModuleDefragCtx, RedisModuleString}; @@ -15,7 +17,7 @@ pub unsafe extern "C" fn bloom_rdb_save(rdb: *mut raw::RedisModuleIO, value: *mu let v = &*value.cast::(); raw::save_unsigned(rdb, v.filters.len() as u64); raw::save_unsigned(rdb, v.expansion as u64); - raw::save_float(rdb, v.fp_rate); + raw::save_double(rdb, v.fp_rate); let filter_list = &v.filters; let mut filter_list_iter = filter_list.iter().peekable(); while let Some(filter) = filter_list_iter.next() { @@ -102,6 +104,9 @@ pub unsafe extern "C" fn bloom_defrag( value: *mut *mut c_void, ) -> i32 { let curr_item = &*(*value).cast::(); + if curr_item.memory_usage() > configs::BLOOM_MAX_MEMORY_USAGE.load(Ordering::Relaxed) as usize { + return 0; + } let new_item = BloomFilterType::create_copy_from(curr_item); let bb = Box::new(new_item); drop(Box::from_raw((*value).cast::())); diff --git a/tests/test_basic.py b/tests/test_basic.py 
index 9381fff..8701642 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -51,6 +51,44 @@ def test_memory_usage_cmd(self): info_size = client.execute_command('BF.INFO filter SIZE') assert memory_usage > info_size and info_size > 0 + def test_too_large_bloom_obj(self): + client = self.server.get_new_client() + assert client.execute_command('CONFIG SET bf.bloom-max-memory-usage 1000') == b'OK' + obj_exceeds_size_err = "operation results in filter allocation exceeding limit" + # Non Scaling + # Validate that when a cmd would have resulted in a bloom object creation with the starting filter with size + # greater than allowed limit, the cmd is rejected. + cmds = [ + 'BF.RESERVE filter 0.001 100000', + 'BF.INSERT filter error 0.00001 capacity 10000 items item1', + 'BF.ADD filter item1', + 'BF.MADD filter item1 item2', + ] + for cmd in cmds: + self.verify_error_response(self.client, cmd, obj_exceeds_size_err) + # Scaling + # Validate that when scaling would have resulted in a filter with size greater than allowed limit, the cmd + # is rejected. + cmds = [ + 'BF.INSERT filter items new_item1', + 'BF.ADD filter new_item1', + 'BF.MADD filter new_item1 new_item2', + ] + # Fill a filter to capacity. + assert client.execute_command('BF.RESERVE filter 0.001 100 EXPANSION 10') == b'OK' + error_count, add_operation_idx = self.add_items_till_capacity(client, "filter", 100, 1, "item_prefix") + assert client.execute_command('BF.INFO filter CAPACITY') == 100 + assert client.execute_command('BF.INFO filter ITEMS') == 100 + assert client.execute_command('BF.INFO filter SIZE') > 400 + assert client.execute_command('BF.INFO filter FILTERS') == 1 + # Validate that scale out is rejected with appropriate error. 
+ for cmd in cmds: + if "BF.ADD" in cmd: + self.verify_error_response(self.client, cmd, obj_exceeds_size_err) + else: + response = client.execute_command(cmd) + assert obj_exceeds_size_err in str(response[0]) + def test_large_allocation_when_below_maxmemory(self): two_megabytes = 2 * 1024 * 1024 # The command below will result in an allocation greater than 2 MB. diff --git a/tests/test_bloom_command.py b/tests/test_bloom_command.py index f27e348..e888693 100644 --- a/tests/test_bloom_command.py +++ b/tests/test_bloom_command.py @@ -37,13 +37,13 @@ def test_bloom_command_error(self): ('bf.insert key CAPACITY 10000 ERROR 0.01 EXPANSION 0.99 NOCREATE NONSCALING ITEMS test1 test2 test3', 'bad expansion'), ('BF.INSERT KEY HELLO WORLD', 'unknown argument received'), ('BF.INSERT KEY error 2 ITEMS test1', '(0 < error rate range < 1)'), - ('BF.INSERT TEST_LIMIT ERROR 0.999999999 ITEMS ERROR_RATE', '(0 < error rate range < 1)'), + ('BF.INSERT TEST_LIMIT ERROR 0.99999999999999999 ITEMS ERROR_RATE', '(0 < error rate range < 1)'), ('BF.INSERT TEST_LIMIT CAPACITY 4394967295 ITEMS CAP', 'bad capacity'), ('BF.INSERT TEST_LIMIT CAPACITY 0 ITEMS CAP0', '(capacity should be larger than 0)'), ('BF.INSERT TEST_LIMIT EXPANSION 11 ITEMS EXPAN', 'bad expansion'), ('BF.INSERT TEST_NOCREATE NOCREATE ITEMS A B', 'not found'), ('BF.RESERVE KEY String 100', 'bad error rate'), - ('BF.RESERVE KEY 0.999999999 3000', '(0 < error rate range < 1)'), + ('BF.RESERVE KEY 0.99999999999999999 3000', '(0 < error rate range < 1)'), ('BF.RESERVE KEY 2 100', '(0 < error rate range < 1)'), ('BF.RESERVE KEY 0.01 String', 'bad capacity'), ('BF.RESERVE KEY 0.01 0.01', 'bad capacity'),