Skip to content

Commit

Permalink
Handle bloom object max allowed size limit, switch fp rate to f64, up…
Browse files Browse the repository at this point in the history
…date defrag exemption logic and free effort logic
  • Loading branch information
KarthikSubbarao committed Oct 23, 2024
1 parent 57a7901 commit 3be5454
Show file tree
Hide file tree
Showing 10 changed files with 212 additions and 64 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
git clone "${{ env.VALKEY_REPO_URL }}"
cd valkey
git checkout ${{ matrix.server_version }}
make
make -j
cp src/valkey-server ../binaries/${{ matrix.server_version }}/
- name: Set up Python
uses: actions/setup-python@v3
Expand Down Expand Up @@ -87,7 +87,7 @@ jobs:
git clone "${{ env.VALKEY_REPO_URL }}"
cd valkey
git checkout ${{ matrix.server_version }}
make SANITIZER=address SERVER_CFLAGS='-Werror' BUILD_TLS=module
make -j SANITIZER=address SERVER_CFLAGS='-Werror' BUILD_TLS=module
cp src/valkey-server ../binaries/${{ matrix.server_version }}/
- name: Set up Python
uses: actions/setup-python@v3
Expand Down
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ else
git clone "$REPO_URL"
cd valkey
git checkout "$SERVER_VERSION"
make
make -j
cp src/valkey-server ../binaries/$SERVER_VERSION/
fi

Expand Down
78 changes: 62 additions & 16 deletions src/bloom/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::configs::{
BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN,
};
use std::sync::atomic::Ordering;
use valkey_module::ContextFlags;
use valkey_module::NotifyEvent;
use valkey_module::{Context, ValkeyError, ValkeyResult, ValkeyString, ValkeyValue, VALKEY_OK};

Expand All @@ -17,12 +18,13 @@ fn handle_bloom_add(
bf: &mut BloomFilterType,
multi: bool,
add_succeeded: &mut bool,
validate_size_limit: bool,
) -> Result<ValkeyValue, ValkeyError> {
match multi {
true => {
let mut result = Vec::new();
for item in args.iter().take(argc).skip(item_idx) {
match bf.add_item(item.as_slice()) {
match bf.add_item(item.as_slice(), validate_size_limit) {
Ok(add_result) => {
if add_result == 1 {
*add_succeeded = true;
Expand All @@ -39,7 +41,7 @@ fn handle_bloom_add(
}
false => {
let item = args[item_idx].as_slice();
match bf.add_item(item) {
match bf.add_item(item, validate_size_limit) {
Ok(add_result) => {
*add_succeeded = add_result == 1;
Ok(ValkeyValue::Integer(add_result))
Expand Down Expand Up @@ -88,16 +90,18 @@ pub fn bloom_filter_add_value(
return Err(ValkeyError::Str(utils::ERROR));
}
};
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let mut add_succeeded = false;
match value {
Some(bf) => {
Some(bloom) => {
let response = handle_bloom_add(
input_args,
argc,
curr_cmd_idx,
bf,
bloom,
multi,
&mut add_succeeded,
validate_size_limit,
);
replicate_and_notify_events(ctx, filter_name, add_succeeded, false);
response
Expand All @@ -107,16 +111,25 @@ pub fn bloom_filter_add_value(
let fp_rate = configs::BLOOM_FP_RATE_DEFAULT;
let capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed) as u32;
let expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32;
let mut bf = BloomFilterType::new_reserved(fp_rate, capacity, expansion);
let mut bloom = match BloomFilterType::new_reserved(
fp_rate,
capacity,
expansion,
validate_size_limit,
) {
Ok(bf) => bf,
Err(err) => return Err(ValkeyError::Str(err.as_str())),
};
let response = handle_bloom_add(
input_args,
argc,
curr_cmd_idx,
&mut bf,
&mut bloom,
multi,
&mut add_succeeded,
validate_size_limit,
);
match filter_key.set_value(&BLOOM_FILTER_TYPE, bf) {
match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) {
Ok(()) => {
replicate_and_notify_events(ctx, filter_name, add_succeeded, true);
response
Expand Down Expand Up @@ -204,7 +217,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
let filter_name = &input_args[curr_cmd_idx];
curr_cmd_idx += 1;
// Parse the error rate
let fp_rate = match input_args[curr_cmd_idx].to_string_lossy().parse::<f32>() {
let fp_rate = match input_args[curr_cmd_idx].to_string_lossy().parse::<f64>() {
Ok(num) if num > BLOOM_FP_RATE_MIN && num < BLOOM_FP_RATE_MAX => num,
Ok(num) if !(num > BLOOM_FP_RATE_MIN && num < BLOOM_FP_RATE_MAX) => {
return Err(ValkeyError::Str(utils::ERROR_RATE_RANGE));
Expand Down Expand Up @@ -260,7 +273,16 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
match value {
Some(_) => Err(ValkeyError::Str(utils::ITEM_EXISTS)),
None => {
let bloom = BloomFilterType::new_reserved(fp_rate, capacity, expansion);
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let bloom = match BloomFilterType::new_reserved(
fp_rate,
capacity,
expansion,
validate_size_limit,
) {
Ok(bf) => bf,
Err(err) => return Err(ValkeyError::Str(err.as_str())),
};
match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) {
Ok(()) => {
replicate_and_notify_events(ctx, filter_name, false, true);
Expand Down Expand Up @@ -293,7 +315,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
return Err(ValkeyError::WrongArity);
}
idx += 1;
fp_rate = match input_args[idx].to_string_lossy().parse::<f32>() {
fp_rate = match input_args[idx].to_string_lossy().parse::<f64>() {
Ok(num) if num > BLOOM_FP_RATE_MIN && num < BLOOM_FP_RATE_MAX => num,
Ok(num) if !(num > BLOOM_FP_RATE_MIN && num < BLOOM_FP_RATE_MAX) => {
return Err(ValkeyError::Str(utils::ERROR_RATE_RANGE));
Expand Down Expand Up @@ -358,21 +380,45 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
return Err(ValkeyError::Str(utils::ERROR));
}
};
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let mut add_succeeded = false;
match value {
Some(bf) => {
let response = handle_bloom_add(input_args, argc, idx, bf, true, &mut add_succeeded);
Some(bloom) => {
let response = handle_bloom_add(
input_args,
argc,
idx,
bloom,
true,
&mut add_succeeded,
validate_size_limit,
);
replicate_and_notify_events(ctx, filter_name, add_succeeded, false);
response
}
None => {
if nocreate {
return Err(ValkeyError::Str(utils::NOT_FOUND));
}
let mut bf = BloomFilterType::new_reserved(fp_rate, capacity, expansion);
let response =
handle_bloom_add(input_args, argc, idx, &mut bf, true, &mut add_succeeded);
match filter_key.set_value(&BLOOM_FILTER_TYPE, bf) {
let mut bloom = match BloomFilterType::new_reserved(
fp_rate,
capacity,
expansion,
validate_size_limit,
) {
Ok(bf) => bf,
Err(err) => return Err(ValkeyError::Str(err.as_str())),
};
let response = handle_bloom_add(
input_args,
argc,
idx,
&mut bloom,
true,
&mut add_succeeded,
validate_size_limit,
);
match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) {
Ok(()) => {
replicate_and_notify_events(ctx, filter_name, add_succeeded, true);
response
Expand Down
5 changes: 3 additions & 2 deletions src/bloom/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::os::raw::c_int;
use valkey_module::native_types::ValkeyType;
use valkey_module::{logging, raw};

const BLOOM_FILTER_TYPE_ENCODING_VERSION: i32 = 0;
const BLOOM_FILTER_TYPE_ENCODING_VERSION: i32 = 1;

pub static BLOOM_FILTER_TYPE: ValkeyType = ValkeyType::new(
"bloomfltr",
Expand Down Expand Up @@ -64,7 +64,7 @@ impl ValkeyDataType for BloomFilterType {
let Ok(expansion) = raw::load_unsigned(rdb) else {
return None;
};
let Ok(fp_rate) = raw::load_float(rdb) else {
let Ok(fp_rate) = raw::load_double(rdb) else {
return None;
};
for i in 0..num_filters {
Expand Down Expand Up @@ -93,6 +93,7 @@ impl ValkeyDataType for BloomFilterType {
(FIXED_SIP_KEY_ONE_A, FIXED_SIP_KEY_ONE_B),
(FIXED_SIP_KEY_TWO_A, FIXED_SIP_KEY_TWO_B),
];
// Perform a check here ??
let filter = BloomFilter::from_existing(
bitmap.as_ref(),
number_of_bits,
Expand Down
Loading

0 comments on commit 3be5454

Please sign in to comment.