Skip to content

Commit

Permalink
Fixing ATLEASTCAPACITY calculation as well as adding MAXCAPACITY functionality for info
Browse files Browse the repository at this point in the history

Signed-off-by: zackcam <zackcam@amazon.com>
  • Loading branch information
zackcam committed Jan 29, 2025
1 parent 2be839e commit a2e007b
Show file tree
Hide file tree
Showing 6 changed files with 222 additions and 52 deletions.
55 changes: 45 additions & 10 deletions src/bloom/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
true => (None, true),
false => (Some(configs::FIXED_SEED), false),
};
let mut wanted_capacity = -1;
let mut validate_scale_to = -1;
let mut nocreate = false;
let mut items_provided = false;
while idx < argc {
Expand Down Expand Up @@ -554,12 +554,12 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
}
};
}
"ATLEASTCAPACITY" => {
"VALIDATESCALETO" => {
if idx >= (argc - 1) {
return Err(ValkeyError::WrongArity);
}
idx += 1;
wanted_capacity = match input_args[idx].to_string_lossy().parse::<i64>() {
validate_scale_to = match input_args[idx].to_string_lossy().parse::<i64>() {
Ok(num) if (BLOOM_CAPACITY_MIN..=BLOOM_CAPACITY_MAX).contains(&num) => num,
Ok(0) => {
return Err(ValkeyError::Str(utils::CAPACITY_LARGER_THAN_0));
Expand All @@ -584,25 +584,25 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
// When the `ITEMS` argument is provided, we expect additional item arg/s to be provided.
return Err(ValkeyError::WrongArity);
}
// Check if we have a wanted capacity and calculate if we can reach that capacity
if wanted_capacity > 0 {
// Check if we have a wanted capacity and calculate if we can reach that capacity. Using VALIDATESCALETO and NONSCALING options together is invalid.
if validate_scale_to > 0 {
if expansion == 0 {
return Err(ValkeyError::Str(
utils::NON_SCALING_AND_WANTED_CAPACITY_IS_INVALID,
utils::NON_SCALING_AND_VALIDATE_SCALE_TO_IS_INVALID,
));
}
match utils::BloomObject::calculate_if_wanted_capacity_is_valid(
match utils::BloomObject::calculate_max_scaled_capacity(
capacity,
fp_rate,
wanted_capacity,
validate_scale_to,
tightening_ratio,
expansion,
) {
Ok(result) => result,
Ok(_) => (),
Err(e) => {
return Err(e);
}
}
};
}
// If the filter does not exist, create one
let filter_key = ctx.open_key_writable(filter_name);
Expand Down Expand Up @@ -714,12 +714,29 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
"SIZE" => Ok(ValkeyValue::Integer(val.memory_usage() as i64)),
"FILTERS" => Ok(ValkeyValue::Integer(val.num_filters() as i64)),
"ITEMS" => Ok(ValkeyValue::Integer(val.cardinality())),
"ERROR" => Ok(ValkeyValue::Float(val.fp_rate())),
"EXPANSION" => {
if val.expansion() == 0 {
return Ok(ValkeyValue::Null);
}
Ok(ValkeyValue::Integer(val.expansion() as i64))
}
// Only calculate and expose MAXSCALEDCAPACITY for scaling bloom objects.
"MAXSCALEDCAPACITY" if val.expansion() > 0 => {
let max_capacity = match utils::BloomObject::calculate_max_scaled_capacity(
val.starting_capacity(),
val.fp_rate(),
-1,
val.tightening_ratio(),
val.expansion(),
) {
Ok(result) => result,
Err(e) => {
return Err(e);
}
};
Ok(ValkeyValue::Integer(max_capacity))
}
_ => Err(ValkeyError::Str(utils::INVALID_INFO_VALUE)),
}
}
Expand All @@ -733,13 +750,31 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
ValkeyValue::Integer(val.num_filters() as i64),
ValkeyValue::SimpleStringStatic("Number of items inserted"),
ValkeyValue::Integer(val.cardinality()),
ValkeyValue::SimpleStringStatic("Error rate"),
ValkeyValue::Float(val.fp_rate()),
ValkeyValue::SimpleStringStatic("Expansion rate"),
];
if val.expansion() == 0 {
result.push(ValkeyValue::Null);
} else {
result.push(ValkeyValue::Integer(val.expansion() as i64));
}
if val.expansion() != 0 {
let max_capacity = match utils::BloomObject::calculate_max_scaled_capacity(
val.starting_capacity(),
val.fp_rate(),
-1,
val.tightening_ratio(),
val.expansion(),
) {
Ok(result) => result,
Err(e) => {
return Err(e);
}
};
result.push(ValkeyValue::SimpleStringStatic("Max scaled capacity"));
result.push(ValkeyValue::Integer(max_capacity));
}
Ok(ValkeyValue::Array(result))
}
_ => Err(ValkeyError::Str(utils::NOT_FOUND)),
Expand Down
19 changes: 9 additions & 10 deletions src/bloom/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,15 @@ impl ValkeyDataType for BloomObject {
let Ok(capacity) = raw::load_unsigned(rdb) else {
return None;
};
let new_fp_rate =
match Self::calculate_fp_rate(fp_rate, num_filters as i32, tightening_ratio) {
Ok(rate) => rate,
Err(_) => {
logging::log_warning(
"Failed to restore bloom object: Reached max number of filters",
);
return None;
}
};
let new_fp_rate = match Self::calculate_fp_rate(fp_rate, i as i32, tightening_ratio) {
Ok(rate) => rate,
Err(_) => {
logging::log_warning(
"Failed to restore bloom object: Reached max number of filters",
);
return None;
}
};
let curr_filter_size = BloomFilter::compute_size(capacity as i64, new_fp_rate);
let curr_object_size = BloomObject::compute_size(filters.capacity())
+ filters_memory_usage
Expand Down
154 changes: 131 additions & 23 deletions src/bloom/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,20 @@ pub const ERROR_RATE_RANGE: &str = "ERR (0 < error rate range < 1)";
pub const BAD_TIGHTENING_RATIO: &str = "ERR bad tightening ratio";
pub const TIGHTENING_RATIO_RANGE: &str = "ERR (0 < tightening ratio range < 1)";
pub const CAPACITY_LARGER_THAN_0: &str = "ERR (capacity should be larger than 0)";
pub const FALSE_POSITIVE_DEGRADES_TO_O: &str = "ERR false positive degrades to 0 on scale out";
pub const UNKNOWN_ARGUMENT: &str = "ERR unknown argument received";
pub const EXCEEDS_MAX_BLOOM_SIZE: &str = "ERR operation exceeds bloom object memory limit";
pub const VALIDATE_SCALE_TO_EXCEEDS_MAX_SIZE: &str =
    "ERR provided VALIDATESCALETO causes bloom object to exceed memory limit";
pub const MAX_NUM_SCALING_FILTERS: &str = "ERR bloom object reached max number of filters";
pub const VALIDATE_SCALE_TO_FALSE_POSITIVE_INVALID: &str =
    "ERR provided VALIDATESCALETO causes false positive to degrade to 0";
pub const KEY_EXISTS: &str = "BUSYKEY Target key name already exists.";
pub const DECODE_BLOOM_OBJECT_FAILED: &str = "ERR bloom object decoding failed";
pub const DECODE_UNSUPPORTED_VERSION: &str =
    "ERR bloom object decoding failed. Unsupported version";
pub const NON_SCALING_AND_VALIDATE_SCALE_TO_IS_INVALID: &str =
    "ERR cannot use NONSCALING and VALIDATESCALETO options together";
// Superseded by the VALIDATE_SCALE_TO_* / NON_SCALING_AND_VALIDATE_SCALE_TO_IS_INVALID
// constants above; retained so older call sites still compile. (Typo "ATLEASTCAPCITY"
// in the original message corrected to "ATLEASTCAPACITY".)
pub const WANTED_CAPACITY_EXCEEDS_MAX_SIZE: &str =
    "ERR Wanted capacity would go beyond bloom object memory limit";
pub const WANTED_CAPACITY_FALSE_POSITIVE_INVALID: &str =
    "ERR False positive degrades too much to reach wanted capacity";
pub const NON_SCALING_AND_WANTED_CAPACITY_IS_INVALID: &str =
    "ERR Specifying NONSCALING and ATLEASTCAPACITY is not allowed";
/// Logging Error messages
pub const ENCODE_BLOOM_OBJECT_FAILED: &str = "Failed to encode bloom object.";

Expand All @@ -56,6 +57,8 @@ pub enum BloomError {
DecodeUnsupportedVersion,
ErrorRateRange,
BadExpansion,
FalsePositiveReachesZero,
BadCapacity,
}

impl BloomError {
Expand All @@ -69,6 +72,8 @@ impl BloomError {
BloomError::DecodeUnsupportedVersion => DECODE_UNSUPPORTED_VERSION,
BloomError::ErrorRateRange => ERROR_RATE_RANGE,
BloomError::BadExpansion => BAD_EXPANSION,
BloomError::FalsePositiveReachesZero => FALSE_POSITIVE_DEGRADES_TO_O,
BloomError::BadCapacity => BAD_CAPACITY,
}
}
}
Expand Down Expand Up @@ -248,6 +253,13 @@ impl BloomObject {
.expect("Every BloomObject is expected to have at least one filter")
.seed()
}
/// Return the starting capacity of the bloom object.
///
/// The starting capacity is the capacity that the very first (oldest) filter
/// was created with, so it is read off `filters.first()`.
pub fn starting_capacity(&self) -> i64 {
    let first_filter = self
        .filters
        .first()
        .expect("Every BloomObject is expected to have at least one filter");
    first_filter.capacity()
}

/// Return the expansion of the bloom object.
pub fn expansion(&self) -> u32 {
Expand Down Expand Up @@ -319,7 +331,7 @@ impl BloomObject {
Some(new_capacity) => new_capacity,
None => {
// u32:max cannot be reached with 64MB memory usage limit per filter even with a high fp rate (e.g. 0.9).
return Err(BloomError::MaxNumScalingFilters);
return Err(BloomError::BadCapacity);
}
};
// Reject the request, if the operation will result in creation of a filter of size greater than what is allowed.
Expand Down Expand Up @@ -373,7 +385,7 @@ impl BloomObject {
) -> Result<f64, BloomError> {
match fp_rate * tightening_ratio.powi(num_filters) {
x if x > f64::MIN_POSITIVE => Ok(x),
_ => Err(BloomError::MaxNumScalingFilters),
_ => Err(BloomError::FalsePositiveReachesZero),
}
}

Expand Down Expand Up @@ -463,42 +475,76 @@ impl BloomObject {
}
}

pub fn calculate_if_wanted_capacity_is_valid(
/// This method is called from two different bloom commands: BF.INFO and BF.INSERT. The functionality varies slightly on which command it
/// is called from. When called from BF.INFO, this method is used to find the maximum possible size that the bloom object could scale to
/// without throwing an error. When called from BF.INSERT, this method is used to determine if it is possible to reach the provided `validate_scale_to`.
///
/// # Arguments
///
/// * `capacity` - The size of the initial filter in the bloom object.
/// * `fp_rate` - the false positive rate for the bloom object
/// * `validate_scale_to` - the capacity we check to see if it can scale to. If this method is called from BF.INFO this is set as -1 as we
/// want to check the maximum size we could scale up till
/// * `tightening_ratio` - The tightening ratio of the object
/// * `expansion` - The expanison rate of the object
///
/// # Returns
/// * i64 - The maximum capacity that can be reached if called from BF.INFO. If called from BF.INSERT the size it reached when it became greater than `validate_scale_to`
/// * ValkeyError - Can return two different errors:
/// VALIDATE_SCALE_TO_EXCEEDS_MAX_SIZE: When scaling to the wanted capacity would go over the bloom object memory limit
/// VALIDATE_SCALE_TO_FALSE_POSITIVE_INVALID: When scaling to the wanted capacity would cause the false positive rate to reach 0
pub fn calculate_max_scaled_capacity(
capacity: i64,
fp_rate: f64,
wanted_capacity: i64,
validate_scale_to: i64,
tightening_ratio: f64,
expansion: u32,
) -> Result<(), ValkeyError> {
let mut curr_capacity = capacity;
let mut curr_num_filters: u64 = 1;
let mut curr_fp_rate = fp_rate;
) -> Result<i64, ValkeyError> {
let mut curr_filter_capacity = capacity;
let mut curr_total_capacity = 0;
let mut curr_num_filters: u64 = 0;
let mut filters_memory_usage = 0;
while curr_capacity < wanted_capacity {
curr_fp_rate = match BloomObject::calculate_fp_rate(
curr_fp_rate,
while curr_total_capacity < validate_scale_to || validate_scale_to == -1 {
// Check to see if scaling to the next filter will cause a degradation in FP to 0
let curr_fp_rate = match BloomObject::calculate_fp_rate(
fp_rate,
curr_num_filters as i32,
tightening_ratio,
) {
Ok(rate) => rate,
Err(_) => {
return Err(ValkeyError::Str(WANTED_CAPACITY_FALSE_POSITIVE_INVALID));
if validate_scale_to == -1 {
return Ok(curr_total_capacity);
}
return Err(ValkeyError::Str(VALIDATE_SCALE_TO_FALSE_POSITIVE_INVALID));
}
};
let curr_filter_size = BloomFilter::compute_size(curr_capacity, curr_fp_rate);
// Check that if it scales to this number of filters that the object won't exceed the memory limit
let curr_filter_size = BloomFilter::compute_size(curr_filter_capacity, curr_fp_rate);
// For vectors of size < 4 the capacity of the vector is 4. However after that the capacity is always a power of two above or equal to the size
let curr_object_size = BloomObject::compute_size(
std::cmp::max(4, curr_num_filters).next_power_of_two() as usize,
) + filters_memory_usage
+ curr_filter_size;
if !BloomObject::validate_size(curr_object_size) {
return Err(ValkeyError::Str(WANTED_CAPACITY_EXCEEDS_MAX_SIZE));
if validate_scale_to == -1 {
return Ok(curr_total_capacity);
}
return Err(ValkeyError::Str(VALIDATE_SCALE_TO_EXCEEDS_MAX_SIZE));
}
// Update overall memory usage
filters_memory_usage += curr_filter_size;
curr_capacity *= expansion as i64;
curr_total_capacity += curr_filter_capacity;
curr_filter_capacity = match curr_filter_capacity.checked_mul(expansion.into()) {
Some(new_capacity) => new_capacity,
None => {
// u32:max cannot be reached with 64MB memory usage limit per filter even with a high fp rate (e.g. 0.9).
return Err(ValkeyError::Str(BAD_CAPACITY));
}
};
curr_num_filters += 1;
}
Ok(())
Ok(curr_total_capacity)
}
}

Expand Down Expand Up @@ -658,6 +704,7 @@ impl Drop for BloomFilter {
#[cfg(test)]
mod tests {
use super::*;
use crate::configs::TIGHTENING_RATIO_DEFAULT;
use configs;
use rand::{distributions::Alphanumeric, Rng};
use rstest::rstest;
Expand Down Expand Up @@ -1006,6 +1053,10 @@ mod tests {
let test_bloom_filter2 = BloomFilter::with_random_seed(0.5_f64, 1000_i64);
let test_seed2 = test_bloom_filter2.seed();
assert_ne!(test_seed2, configs::FIXED_SEED);
// Check that the random seed changes for each BloomFilter
let test_bloom_filter3 = BloomFilter::with_random_seed(0.5_f64, 1000_i64);
let test_seed3 = test_bloom_filter3.seed();
assert_ne!(test_seed2, test_seed3);
}

#[test]
Expand All @@ -1024,6 +1075,63 @@ mod tests {
assert_eq!(result2.err(), Some(BloomError::ExceedsMaxBloomSize));
}

// Parameterized test for BloomObject::calculate_max_scaled_capacity.
// Each case is (capacity, fp_rate, validate_scale_to, expansion, expected_total_capacity).
// NOTE(review): the expansion == 1 case is relied on below to exercise the
// false-positive-degradation error path, and expansion > 1 the memory-limit path —
// confirm the case values keep triggering those branches if they are edited.
#[rstest]
#[case(1000, 0.01, 10000, 2, 15000)]
#[case(10000, 0.001, 100000, 4, 210000)]
#[case(50000, 0.0001, 500000, 3, 650000)]
#[case(100000, 0.00001, 1000000, 2, 1500000)]
#[case(100, 0.00001, 1000, 1, 1000)]
fn test_calculate_max_scaled_capacity(
#[case] capacity: i64,
#[case] fp_rate: f64,
#[case] validate_scale_to: i64,
#[case] expansion: u32,
#[case] resulting_size: i64,
) {
// Validate that max scaled capacity returns the correct capacity reached when a valid can_scale to is provided
let returned_size = BloomObject::calculate_max_scaled_capacity(
capacity,
fp_rate,
validate_scale_to,
TIGHTENING_RATIO_DEFAULT
.parse()
.expect("global config should always be 0.5"),
expansion,
);
assert_eq!(resulting_size, returned_size.unwrap());
if expansion == 1 {
// Validate that when false positive rate reaches 0 we get the correct error message returned
// (a 10000x larger target forces enough stacked filters to degrade the fp rate to 0).
let failed_returned_size = BloomObject::calculate_max_scaled_capacity(
capacity,
fp_rate,
validate_scale_to * 10000,
TIGHTENING_RATIO_DEFAULT
.parse()
.expect("global config should always be 0.5"),
expansion,
);
assert!(failed_returned_size
.unwrap_err()
.to_string()
.contains("provided VALIDATESCALETO causes false positive to degrade to 0"));
} else {
// Validate that when going over the max capacity in validate_scale_to then we get the correct error message returned
// (with expansion > 1 the per-filter size blows past the memory limit before fp reaches 0).
let failed_returned_size = BloomObject::calculate_max_scaled_capacity(
capacity,
fp_rate,
validate_scale_to * 10000,
TIGHTENING_RATIO_DEFAULT
.parse()
.expect("global config should always be 0.5"),
expansion,
);
assert!(failed_returned_size
.unwrap_err()
.to_string()
.contains("provided VALIDATESCALETO causes bloom object to exceed memory limit"));
}
}

#[rstest(expansion, case::nonscaling(0), case::scaling(2))]
fn test_bf_encode_and_decode(expansion: u32) {
let mut bf =
Expand Down
Loading

0 comments on commit a2e007b

Please sign in to comment.