From 6904b88275c5bb835be5071a19c4f137b9053448 Mon Sep 17 00:00:00 2001 From: zackcam Date: Sat, 25 Jan 2025 00:10:55 +0000 Subject: [PATCH] Fixing ATLEASTCAPACITY calculation as well as adding MAXCAPACITY functionality for info Signed-off-by: zackcam --- src/bloom/command_handler.rs | 59 +++++++++++++++--- src/bloom/utils.rs | 89 +++++++++++++++++++-------- tests/test_bloom_command.py | 8 ++- tests/test_bloom_correctness.py | 31 ++++++++++ tests/valkeytests/valkey_test_case.py | 6 -- 5 files changed, 152 insertions(+), 41 deletions(-) diff --git a/src/bloom/command_handler.rs b/src/bloom/command_handler.rs index bb3006b..75dca2c 100644 --- a/src/bloom/command_handler.rs +++ b/src/bloom/command_handler.rs @@ -462,7 +462,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey true => (None, true), false => (Some(configs::FIXED_SEED), false), }; - let mut wanted_capacity = -1; + let mut can_scale_to = -1; let mut nocreate = false; let mut items_provided = false; while idx < argc { @@ -554,12 +554,12 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey } }; } - "ATLEASTCAPACITY" => { + "CANSCALETO" => { if idx >= (argc - 1) { return Err(ValkeyError::WrongArity); } idx += 1; - wanted_capacity = match input_args[idx].to_string_lossy().parse::() { + can_scale_to = match input_args[idx].to_string_lossy().parse::() { Ok(num) if (BLOOM_CAPACITY_MIN..=BLOOM_CAPACITY_MAX).contains(&num) => num, Ok(0) => { return Err(ValkeyError::Str(utils::CAPACITY_LARGER_THAN_0)); @@ -585,16 +585,16 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey return Err(ValkeyError::WrongArity); } // Check if we have a wanted capacity and calculate if we can reach that capacity - if wanted_capacity > 0 { + if can_scale_to > 0 { if expansion == 0 { return Err(ValkeyError::Str( - utils::NON_SCALING_AND_WANTED_CAPACITY_IS_INVALID, + utils::NON_SCALING_AND_CAN_SCALE_TO_IS_INVALID, )); } - match utils::BloomObject::calculate_if_wanted_capacity_is_valid( + match utils::BloomObject::calculate_if_can_scale_to_is_valid( capacity, fp_rate, - wanted_capacity, + can_scale_to, tightening_ratio, expansion, ) { @@ -602,7 +602,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey Err(e) => { return Err(e); } - } + }; } // If the filter does not exist, create one let filter_key = ctx.open_key_writable(filter_name); @@ -714,12 +714,34 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe "SIZE" => Ok(ValkeyValue::Integer(val.memory_usage() as i64)), "FILTERS" => Ok(ValkeyValue::Integer(val.num_filters() as i64)), "ITEMS" => Ok(ValkeyValue::Integer(val.cardinality())), + "ERROR" => Ok(ValkeyValue::Float(val.fp_rate())), "EXPANSION" => { if val.expansion() == 0 { return Ok(ValkeyValue::Null); } Ok(ValkeyValue::Integer(val.expansion() as i64)) } + "MAXCAPACITY" => { + if val.expansion() == 0 { + return Ok(ValkeyValue::Integer(val.capacity())); + } + let max_capacity = match utils::BloomObject::calculate_if_can_scale_to_is_valid( + val.filters() + .first() + .expect("Filter will be populated") + .capacity(), + val.fp_rate(), + -1, + val.tightening_ratio(), + val.expansion(), + ) { + Ok(result) => result, + Err(e) => { + return Err(e); + } + }; + Ok(ValkeyValue::Integer(max_capacity)) + } _ => Err(ValkeyError::Str(utils::INVALID_INFO_VALUE)), } } @@ -733,6 +755,8 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe ValkeyValue::Integer(val.num_filters() as i64), ValkeyValue::SimpleStringStatic("Number of items inserted"), ValkeyValue::Integer(val.cardinality()), + ValkeyValue::SimpleStringStatic("Error rate"), + ValkeyValue::Float(val.fp_rate()), ValkeyValue::SimpleStringStatic("Expansion rate"), ]; if val.expansion() == 0 { @@ -740,6 +764,25 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe } else { result.push(ValkeyValue::Integer(val.expansion() as i64)); } + if val.expansion() != 0 { + let max_capacity = match utils::BloomObject::calculate_if_can_scale_to_is_valid( + val.filters() + .first() + .expect("Filter will be populated") + .capacity(), + val.fp_rate(), + -1, + val.tightening_ratio(), + val.expansion(), + ) { + Ok(result) => result, + Err(e) => { + return Err(e); + } + }; + result.push(ValkeyValue::SimpleStringStatic("Maximum Capacity")); + result.push(ValkeyValue::Integer(max_capacity)); + } Ok(ValkeyValue::Array(result)) } _ => Err(ValkeyError::Str(utils::NOT_FOUND)), diff --git a/src/bloom/utils.rs b/src/bloom/utils.rs index 67ef1d1..c738827 100644 --- a/src/bloom/utils.rs +++ b/src/bloom/utils.rs @@ -30,19 +30,20 @@ pub const ERROR_RATE_RANGE: &str = "ERR (0 < error rate range < 1)"; pub const BAD_TIGHTENING_RATIO: &str = "ERR bad tightening ratio"; pub const TIGHTENING_RATIO_RANGE: &str = "ERR (0 < tightening ratio range < 1)"; pub const CAPACITY_LARGER_THAN_0: &str = "ERR (capacity should be larger than 0)"; -pub const MAX_NUM_SCALING_FILTERS: &str = "ERR bloom object reached max number of filters"; +pub const FALSE_POSITIVE_DEGRADES_TO_O: &str = "ERR false positive degrades to 0 on scale out"; pub const UNKNOWN_ARGUMENT: &str = "ERR unknown argument received"; pub const EXCEEDS_MAX_BLOOM_SIZE: &str = "ERR operation exceeds bloom object memory limit"; -pub const WANTED_CAPACITY_EXCEEDS_MAX_SIZE: &str = - "ERR Wanted capacity would go beyond bloom object memory limit"; -pub const WANTED_CAPACITY_FALSE_POSITIVE_INVALID: &str = - "ERR False positive degrades too much to reach wanted capacity"; +pub const CAN_SCALE_TO_EXCEEDS_MAX_SIZE: &str = + "ERR provided CANSCALETO causes bloom object to exceed memory limit"; +pub const MAX_NUM_SCALING_FILTERS: &str = "ERR bloom object reached max number of filters"; +pub const CAN_SCALE_TO_FALSE_POSITIVE_INVALID: &str = + "ERR provided CANSCALETO causes false positive to degrades to 0"; pub const KEY_EXISTS: &str = "BUSYKEY Target key name already exists."; pub const DECODE_BLOOM_OBJECT_FAILED: &str = "ERR bloom object decoding failed"; pub const DECODE_UNSUPPORTED_VERSION: &str = "ERR bloom object decoding failed. Unsupported version"; -pub const NON_SCALING_AND_WANTED_CAPACITY_IS_INVALID: &str = - "ERR Specifying NONSCALING and ATLEASTCAPCITY is not allowed"; +pub const NON_SCALING_AND_CAN_SCALE_TO_IS_INVALID: &str = + "ERR cannot use NONSCALING and CANSCALETO options together"; /// Logging Error messages pub const ENCODE_BLOOM_OBJECT_FAILED: &str = "Failed to encode bloom object."; @@ -56,6 +57,8 @@ pub enum BloomError { DecodeUnsupportedVersion, ErrorRateRange, BadExpansion, + FalsePositiveReachesZero, + BadCapacity, } impl BloomError { @@ -69,6 +72,8 @@ impl BloomError { BloomError::DecodeUnsupportedVersion => DECODE_UNSUPPORTED_VERSION, BloomError::ErrorRateRange => ERROR_RATE_RANGE, BloomError::BadExpansion => BAD_EXPANSION, + BloomError::FalsePositiveReachesZero => FALSE_POSITIVE_DEGRADES_TO_O, + BloomError::BadCapacity => BAD_CAPACITY, } } } @@ -319,7 +324,7 @@ impl BloomObject { Some(new_capacity) => new_capacity, None => { // u32:max cannot be reached with 64MB memory usage limit per filter even with a high fp rate (e.g. 0.9). - return Err(BloomError::MaxNumScalingFilters); + return Err(BloomError::BadCapacity); } }; // Reject the request, if the operation will result in creation of a filter of size greater than what is allowed. @@ -373,7 +378,7 @@ impl BloomObject { ) -> Result { match fp_rate * tightening_ratio.powi(num_filters) { x if x > f64::MIN_POSITIVE => Ok(x), - _ => Err(BloomError::MaxNumScalingFilters), + _ => Err(BloomError::FalsePositiveReachesZero), } } @@ -463,42 +468,78 @@ impl BloomObject { } } - pub fn calculate_if_wanted_capacity_is_valid( + /// This method is called from two different bloom commands BF.INFO and BF.INSERT. The functionality varies slightly on which command it + /// is called from. When called from BF.INFO this method is used to find the maximum possible size that the bloom object could scale to + /// without throwing an error. When called from BF.INSERT this method is used to determine if it is possible to reach the capacity that + /// has been provided. + /// + /// # Arguments + /// + /// * `capacity` - The size of the initial filter in the bloom object. + /// * `fp_rate` - the false positive rate for the bloom object + /// * `can_scale_to` - the capcity we check to see if it can scale to. If this method is called from BF.INFO this is set as -1 as we + /// want to check the maximum size we could scale up till + /// * `tightening_ratio` - The tightening ratio of the object + /// * `expansion` - The expanison rate of the object + /// + /// # Returns + /// * i64 - The maximum capacity that can be reached if called from BF.INFO. If called from BF.INSERT the size it reached after going past max capacity + /// * ValkeyError - Can return two different errors: + /// CAN_SCALE_TO_EXCEEDS_MAX_SIZE: When scaling to the wanted capacity would go over the bloom object memory limit + /// CAN_SCALE_TO_FALSE_POSITIVE_INVALID: When scaling to the wanted capcity would cause the false positive rate to reach 0 + pub fn calculate_if_can_scale_to_is_valid( capacity: i64, fp_rate: f64, - wanted_capacity: i64, + can_scale_to: i64, tightening_ratio: f64, expansion: u32, - ) -> Result<(), ValkeyError> { - let mut curr_capacity = capacity; - let mut curr_num_filters: u64 = 1; - let mut curr_fp_rate = fp_rate; + ) -> Result { + let mut curr_filter_capacity = capacity; + let mut curr_total_capacity = capacity; + let mut curr_num_filters: u64 = 0; let mut filters_memory_usage = 0; - while curr_capacity < wanted_capacity { - curr_fp_rate = match BloomObject::calculate_fp_rate( - curr_fp_rate, + while curr_total_capacity < can_scale_to || can_scale_to == -1 { + curr_filter_capacity = match curr_filter_capacity.checked_mul(expansion.into()) { + Some(new_capacity) => new_capacity, + None => { + // u32:max cannot be reached with 64MB memory usage limit per filter even with a high fp rate (e.g. 0.9). + return Err(ValkeyError::Str(BAD_CAPACITY)); + } + }; + curr_total_capacity += curr_filter_capacity; + curr_num_filters += 1; + + // Check to see if scaling to the next filter will cause a degradation in FP to 0 + let curr_fp_rate = match BloomObject::calculate_fp_rate( + fp_rate, curr_num_filters as i32, tightening_ratio, ) { Ok(rate) => rate, Err(_) => { - return Err(ValkeyError::Str(WANTED_CAPACITY_FALSE_POSITIVE_INVALID)); + if can_scale_to == -1 { + return Ok(curr_total_capacity - curr_filter_capacity); + } + return Err(ValkeyError::Str(CAN_SCALE_TO_FALSE_POSITIVE_INVALID)); } }; - let curr_filter_size = BloomFilter::compute_size(curr_capacity, curr_fp_rate); + // Check that if it scales to this number of filters that the object won't exceed the memory limit + let curr_filter_size = BloomFilter::compute_size(curr_filter_capacity, curr_fp_rate); // For vectors of size < 4 the capacity of the vector is 4. However after that the capacity is always a power of two above or equal to the size let curr_object_size = BloomObject::compute_size( std::cmp::max(4, curr_num_filters).next_power_of_two() as usize, ) + filters_memory_usage + curr_filter_size; if !BloomObject::validate_size(curr_object_size) { - return Err(ValkeyError::Str(WANTED_CAPACITY_EXCEEDS_MAX_SIZE)); + if can_scale_to == -1 { + return Ok(curr_total_capacity - curr_filter_capacity); + } + return Err(ValkeyError::Str(CAN_SCALE_TO_EXCEEDS_MAX_SIZE)); } + // Update overall memory usage filters_memory_usage += curr_filter_size; - curr_capacity *= expansion as i64; - curr_num_filters += 1; } - Ok(()) + Ok(curr_total_capacity / expansion as i64) } } diff --git a/tests/test_bloom_command.py b/tests/test_bloom_command.py index 890f407..9332cd7 100644 --- a/tests/test_bloom_command.py +++ b/tests/test_bloom_command.py @@ -44,9 +44,9 @@ def test_bloom_command_error(self): ('BF.INSERT TEST_LIMIT EXPANSION 4294967299 ITEMS EXPAN', 'bad expansion'), ('BF.INSERT TEST_NOCREATE NOCREATE ITEMS A B', 'not found'), ('BF.INSERT KEY HELLO', 'unknown argument received'), - ('BF.INSERT KEY CAPACITY 1 ERROR 0.0000000001 ATLEASTCAPACITY 10000000 EXPANSION 1', 'False positive degrades too much to reach wanted capacity'), - ('BF.INSERT KEY ATLEASTCAPACITY 1000000000000', 'Wanted capacity would go beyond bloom object memory limit'), - ('BF.INSERT KEY ATLEASTCAPACITY 1000000000000 NONSCALING', 'Specifying NONSCALING and ATLEASTCAPCITY is not allowed'), + ('BF.INSERT KEY CAPACITY 1 ERROR 0.0000000001 CANSCALETO 10000000 EXPANSION 1', 'provided CANSCALETO causes false positive to degrades to 0'), + ('BF.INSERT KEY CANSCALETO 1000000000000', 'provided CANSCALETO causes bloom object to exceed memory limit'), + ('BF.INSERT KEY CANSCALETO 1000000000000 NONSCALING', 'cannot use NONSCALING and CANSCALETO options together'), ('BF.RESERVE KEY String 100', 'bad error rate'), ('BF.RESERVE KEY 0.99999999999999999 3000', '(0 < error rate range < 1)'), ('BF.RESERVE KEY 2 100', '(0 < error rate range < 1)'), @@ -120,6 +120,7 @@ def test_bloom_command_behavior(self): ('bf.info TEST expansion', 2), ('BF.INFO TEST_EXPANSION EXPANSION', 9), ('BF.INFO TEST_CAPACITY CAPACITY', 2000), + ('BF.INFO TEST MAXCAPACITY', 26214300), ('BF.CARD key', 3), ('BF.CARD hello', 5), ('BF.CARD TEST', 5), @@ -130,6 +131,7 @@ def test_bloom_command_behavior(self): ('BF.RESERVE bf_non 0.01 1000 NONSCALING', b'OK'), ('bf.info bf_exp expansion', 2), ('BF.INFO bf_non expansion', None), + ('BF.INFO bf_non MAXCAPACITY', 1000), ] for test_case in basic_behavior_test_case: diff --git a/tests/test_bloom_correctness.py b/tests/test_bloom_correctness.py index d389511..905bf75 100644 --- a/tests/test_bloom_correctness.py +++ b/tests/test_bloom_correctness.py @@ -25,6 +25,7 @@ def test_non_scaling_filter(self): assert info_dict[b'Number of filters'] == 1 assert info_dict[b'Size'] > 0 assert info_dict[b'Expansion rate'] == None + assert "Maximum Capacity" not in info_dict # Use a margin on the expected_fp_rate when asserting for correctness. fp_margin = 0.002 # Validate that item "add" operations on bloom filters are ensuring correctness. @@ -74,6 +75,7 @@ def test_scaling_filter(self): assert info_dict[b'Number of filters'] == 1 assert info_dict[b'Size'] > 0 assert info_dict[b'Expansion rate'] == expansion + assert info_dict[b'Maximum Capacity'] == 20470000 # Scale out by adding items. total_error_count = 0 @@ -92,6 +94,7 @@ def test_scaling_filter(self): assert info_dict[b'Number of filters'] == filter_idx assert info_dict[b'Size'] > 0 assert info_dict[b'Expansion rate'] == expansion + assert info_dict[b'Maximum Capacity'] == 20470000 # Use a margin on the expected_fp_rate when asserting for correctness. fp_margin = 0.002 @@ -127,3 +130,31 @@ def test_scaling_filter(self): info_dict = dict(zip(it, it)) # Validate correctness on a copy of a scaling bloom filter. self.validate_copied_bloom_correctness(client, filter_name, item_prefix, add_operation_idx, expected_fp_rate, fp_margin, info_dict) + + def test_max_and_can_scale_to_correctness(self): + can_scale_to_commands = [ + ('BF.INSERT key ERROR 0.00000001 CANSCALETO 13107101', "provided CANSCALETO causes bloom object to exceed memory limit" ), + ('BF.INSERT key EXPANSION 1 CANSCALETO 101601', "provided CANSCALETO causes false positive to degrades to 0" ) + ] + for cmd in can_scale_to_commands: + try: + self.client.execute_command(cmd[0]) + assert False, "Expect BF.INSERT to fail if the wanted capacity would cause an error" + except Exception as e: + print(cmd[0]) + assert cmd[1] == str(e), f"Unexpected error message: {e}" + self.client.execute_command('BF.INSERT MemLimitKey ERROR 0.00000001 CANSCALETO 13107100') + self.client.execute_command('BF.INSERT FPKey CANSCALETO 101600 EXPANSION 1') + FPKey_max_capacity = self.client.execute_command(f'BF.INFO FPKey MAXCAPACITY') + MemLimitKeyMaxCapacity = self.client.execute_command(f'BF.INFO MemLimitKey MAXCAPACITY') + self.add_items_till_capacity(self.client, "FPKey", 101600, 1, "item") + self.add_items_till_capacity(self.client, "MemLimitKey", 13107100, 1, "item") + key_names = [("MemLimitKey", MemLimitKeyMaxCapacity, "operation exceeds bloom object memory limit"), ("FPKey", FPKey_max_capacity, "false positive degrades to 0 on scale out")] + for key in key_names: + try: + self.add_items_till_capacity(self.client, key[0], key[1]+1, 1, "new_item") + assert False, "Expect adding to an item after reaching max capacity should fail" + except Exception as e: + assert key[2] in str(e) + # Check that max capacity doesnt change even after adding items. + assert self.client.execute_command(f'BF.INFO {key[0]} MAXCAPACITY') == key[1] diff --git a/tests/valkeytests/valkey_test_case.py b/tests/valkeytests/valkey_test_case.py index 284e53b..4ef3dc4 100644 --- a/tests/valkeytests/valkey_test_case.py +++ b/tests/valkeytests/valkey_test_case.py @@ -1,17 +1,11 @@ import subprocess import time -import random import os import pytest import re -import struct -import threading -import io -import socket from contextlib import contextmanager from functools import wraps from valkey import * -from valkey.client import Pipeline from util.waiters import * from enum import Enum