Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle bloom object max allowed size limit, switch fp rate to f64 #18

Merged
merged 4 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
git clone "${{ env.VALKEY_REPO_URL }}"
cd valkey
git checkout ${{ matrix.server_version }}
make
make -j
cp src/valkey-server ../binaries/${{ matrix.server_version }}/
- name: Set up Python
uses: actions/setup-python@v3
Expand Down Expand Up @@ -87,7 +87,7 @@ jobs:
git clone "${{ env.VALKEY_REPO_URL }}"
cd valkey
git checkout ${{ matrix.server_version }}
make SANITIZER=address SERVER_CFLAGS='-Werror' BUILD_TLS=module
make -j SANITIZER=address SERVER_CFLAGS='-Werror' BUILD_TLS=module
cp src/valkey-server ../binaries/${{ matrix.server_version }}/
- name: Set up Python
uses: actions/setup-python@v3
Expand Down
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ else
git clone "$REPO_URL"
cd valkey
git checkout "$SERVER_VERSION"
make
make -j
cp src/valkey-server ../binaries/$SERVER_VERSION/
fi

Expand Down
81 changes: 65 additions & 16 deletions src/bloom/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::configs::{
BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN,
};
use std::sync::atomic::Ordering;
use valkey_module::ContextFlags;
use valkey_module::NotifyEvent;
use valkey_module::{Context, ValkeyError, ValkeyResult, ValkeyString, ValkeyValue, VALKEY_OK};

Expand All @@ -17,12 +18,13 @@ fn handle_bloom_add(
bf: &mut BloomFilterType,
multi: bool,
add_succeeded: &mut bool,
validate_size_limit: bool,
) -> Result<ValkeyValue, ValkeyError> {
match multi {
true => {
let mut result = Vec::new();
for item in args.iter().take(argc).skip(item_idx) {
match bf.add_item(item.as_slice()) {
match bf.add_item(item.as_slice(), validate_size_limit) {
Ok(add_result) => {
if add_result == 1 {
*add_succeeded = true;
Expand All @@ -39,7 +41,7 @@ fn handle_bloom_add(
}
false => {
let item = args[item_idx].as_slice();
match bf.add_item(item) {
match bf.add_item(item, validate_size_limit) {
Ok(add_result) => {
*add_succeeded = add_result == 1;
Ok(ValkeyValue::Integer(add_result))
Expand Down Expand Up @@ -88,16 +90,19 @@ pub fn bloom_filter_add_value(
return Err(ValkeyError::Str(utils::ERROR));
}
};
// Skip bloom filter size validation on replicated cmds.
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let mut add_succeeded = false;
match value {
Some(bf) => {
Some(bloom) => {
let response = handle_bloom_add(
input_args,
argc,
curr_cmd_idx,
bf,
bloom,
multi,
&mut add_succeeded,
validate_size_limit,
);
replicate_and_notify_events(ctx, filter_name, add_succeeded, false);
response
Expand All @@ -107,16 +112,25 @@ pub fn bloom_filter_add_value(
let fp_rate = configs::BLOOM_FP_RATE_DEFAULT;
let capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed) as u32;
let expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32;
let mut bf = BloomFilterType::new_reserved(fp_rate, capacity, expansion);
let mut bloom = match BloomFilterType::new_reserved(
fp_rate,
capacity,
expansion,
validate_size_limit,
) {
Ok(bf) => bf,
Err(err) => return Err(ValkeyError::Str(err.as_str())),
};
let response = handle_bloom_add(
input_args,
argc,
curr_cmd_idx,
&mut bf,
&mut bloom,
multi,
&mut add_succeeded,
validate_size_limit,
);
match filter_key.set_value(&BLOOM_FILTER_TYPE, bf) {
match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) {
Ok(()) => {
replicate_and_notify_events(ctx, filter_name, add_succeeded, true);
response
Expand Down Expand Up @@ -204,7 +218,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
let filter_name = &input_args[curr_cmd_idx];
curr_cmd_idx += 1;
// Parse the error rate
let fp_rate = match input_args[curr_cmd_idx].to_string_lossy().parse::<f32>() {
let fp_rate = match input_args[curr_cmd_idx].to_string_lossy().parse::<f64>() {
Ok(num) if num > BLOOM_FP_RATE_MIN && num < BLOOM_FP_RATE_MAX => num,
Ok(num) if !(num > BLOOM_FP_RATE_MIN && num < BLOOM_FP_RATE_MAX) => {
return Err(ValkeyError::Str(utils::ERROR_RATE_RANGE));
Expand Down Expand Up @@ -260,7 +274,17 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
match value {
Some(_) => Err(ValkeyError::Str(utils::ITEM_EXISTS)),
None => {
let bloom = BloomFilterType::new_reserved(fp_rate, capacity, expansion);
// Skip bloom filter size validation on replicated cmds.
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let bloom = match BloomFilterType::new_reserved(
fp_rate,
capacity,
expansion,
validate_size_limit,
) {
Ok(bf) => bf,
Err(err) => return Err(ValkeyError::Str(err.as_str())),
};
match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) {
Ok(()) => {
replicate_and_notify_events(ctx, filter_name, false, true);
Expand Down Expand Up @@ -293,7 +317,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
return Err(ValkeyError::WrongArity);
}
idx += 1;
fp_rate = match input_args[idx].to_string_lossy().parse::<f32>() {
fp_rate = match input_args[idx].to_string_lossy().parse::<f64>() {
Ok(num) if num > BLOOM_FP_RATE_MIN && num < BLOOM_FP_RATE_MAX => num,
Ok(num) if !(num > BLOOM_FP_RATE_MIN && num < BLOOM_FP_RATE_MAX) => {
return Err(ValkeyError::Str(utils::ERROR_RATE_RANGE));
Expand Down Expand Up @@ -358,21 +382,46 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
return Err(ValkeyError::Str(utils::ERROR));
}
};
// Skip bloom filter size validation on replicated cmds.
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let mut add_succeeded = false;
match value {
Some(bf) => {
let response = handle_bloom_add(input_args, argc, idx, bf, true, &mut add_succeeded);
Some(bloom) => {
let response = handle_bloom_add(
input_args,
argc,
idx,
bloom,
true,
&mut add_succeeded,
validate_size_limit,
);
replicate_and_notify_events(ctx, filter_name, add_succeeded, false);
response
}
None => {
if nocreate {
return Err(ValkeyError::Str(utils::NOT_FOUND));
}
let mut bf = BloomFilterType::new_reserved(fp_rate, capacity, expansion);
let response =
handle_bloom_add(input_args, argc, idx, &mut bf, true, &mut add_succeeded);
match filter_key.set_value(&BLOOM_FILTER_TYPE, bf) {
let mut bloom = match BloomFilterType::new_reserved(
fp_rate,
capacity,
expansion,
validate_size_limit,
) {
Ok(bf) => bf,
Err(err) => return Err(ValkeyError::Str(err.as_str())),
};
let response = handle_bloom_add(
input_args,
argc,
idx,
&mut bloom,
true,
&mut add_succeeded,
validate_size_limit,
);
match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) {
Ok(()) => {
replicate_and_notify_events(ctx, filter_name, add_succeeded, true);
response
Expand Down
9 changes: 7 additions & 2 deletions src/bloom/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::os::raw::c_int;
use valkey_module::native_types::ValkeyType;
use valkey_module::{logging, raw};

const BLOOM_FILTER_TYPE_ENCODING_VERSION: i32 = 0;
const BLOOM_FILTER_TYPE_ENCODING_VERSION: i32 = 1;

pub static BLOOM_FILTER_TYPE: ValkeyType = ValkeyType::new(
"bloomfltr",
Expand Down Expand Up @@ -64,7 +64,7 @@ impl ValkeyDataType for BloomFilterType {
let Ok(expansion) = raw::load_unsigned(rdb) else {
return None;
};
let Ok(fp_rate) = raw::load_float(rdb) else {
let Ok(fp_rate) = raw::load_double(rdb) else {
return None;
};
for i in 0..num_filters {
Expand All @@ -74,6 +74,11 @@ impl ValkeyDataType for BloomFilterType {
let Ok(number_of_bits) = raw::load_unsigned(rdb) else {
return None;
};
// Reject RDB Load if any bloom filter within a bloom object of a size greater than what is allowed.
if !BloomFilter::validate_size_with_bits(number_of_bits) {
logging::log_warning("Failed to restore bloom object because it contains a filter larger than the max allowed size limit.");
return None;
}
let Ok(number_of_hash_functions) = raw::load_unsigned(rdb) else {
return None;
};
Expand Down
Loading
Loading