Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/unstable' into unstable
Browse files Browse the repository at this point in the history
  • Loading branch information
YueTang-Vanessa committed Sep 18, 2024
2 parents 3b38005 + 6d0a4b6 commit 17e10b3
Show file tree
Hide file tree
Showing 9 changed files with 276 additions and 61 deletions.
12 changes: 9 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,22 @@ authors = ["Karthik Subbarao"]
version = "0.1.0"
edition = "2021"
license = "BSD-3-Clause"
repository = "https://github.com/KarthikSubbarao/valkey-bloom"
repository = "https://github.com/valkey-io/valkey-bloom"
readme = "README.md"
description = "A bloom filter module for Valkey"
homepage = "https://github.com/KarthikSubbarao/valkey-bloom"
homepage = "https://github.com/valkey-io/valkey-bloom"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
valkey-module = "0.1.1"
valkey-module = "0.1.2"
bloomfilter = "1.0.13"
lazy_static = "1.4.0"
libc = "0.2"

[dev-dependencies]
rand = "0.8"

[lib]
crate-type = ["cdylib"]
name = "valkey_bloom"
Expand All @@ -25,3 +28,6 @@ name = "valkey_bloom"
opt-level = 0
debug = 2
debug-assertions = true

[features]
enable-system-alloc = ["valkey-module/enable-system-alloc"]
12 changes: 7 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,17 @@ curl https://sh.rustup.rs -sSf | sh
sudo yum install clang
git clone https://github.com/KarthikSubbarao/valkey-bloom.git
cd valkey-bloom
cargo fmt --check
cargo clippy --profile release --all-targets -- -D clippy::all
cargo build --all --all-targets --release
find . -name "libvalkey_bloom.so"
valkey-server --loadmodule ./target/release/libvalkey_bloom.so
```

Script to build, run tests, and for release
Local development script to build, run format checks, run unit / integration tests, and for cargo release:
```
# Builds the valkey-server (unstable) for integration testing.
VERSION=unstable
./build.sh
# Builds the valkey-server (7.2.6) for integration testing.
VERSION=7.2.6
./build.sh
```

Expand Down Expand Up @@ -111,4 +113,4 @@ db0:keys=1,expires=0,avg_ttl=0
1) "key"
127.0.0.1:6379> bf.exists key item
(integer) 1
```
```
7 changes: 3 additions & 4 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@ cargo clippy --profile release --all-targets -- -D clippy::all
echo "Running cargo build release..."
cargo build --all --all-targets --release

# We have waiting on a new feature in the valkey-module-rs to be released which will allow unit testing of Valkey Rust Modules.
# echo "Running unit tests..."
# cargo test
echo "Running unit tests..."
cargo test --features enable-system-alloc

# Ensure VERSION environment variable is set
if [ -z "$VERSION" ]; then
Expand All @@ -26,7 +25,7 @@ if [ -z "$VERSION" ]; then
fi

if [ "$VERSION" != "unstable" ] && [ "$VERSION" != "7.2.6" ] && [ "$VERSION" != "7.2.5" ] ; then
echo "ERROR: Unsupported version - $VERSIOn"
echo "ERROR: Unsupported version - $VERSION"
exit 1
fi

Expand Down
38 changes: 20 additions & 18 deletions src/bloom/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ use crate::bloom::data_type::BLOOM_FILTER_TYPE;
use crate::bloom::utils;
use crate::bloom::utils::BloomFilterType;
use crate::configs;
use crate::configs::BLOOM_CAPACITY_MAX;
use crate::configs::BLOOM_EXPANSION_MAX;
use crate::configs::{
BLOOM_CAPACITY_MAX, BLOOM_CAPACITY_MIN, BLOOM_EXPANSION_MAX, BLOOM_EXPANSION_MIN,
BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN,
};
use std::sync::atomic::Ordering;
use valkey_module::NotifyEvent;
use valkey_module::{Context, ValkeyError, ValkeyResult, ValkeyString, ValkeyValue, VALKEY_OK};
Expand Down Expand Up @@ -48,7 +50,7 @@ fn handle_bloom_add(
}
}

fn replicate_and_post_events(
fn replicate_and_notify_events(
ctx: &Context,
key_name: &ValkeyString,
add_operation: bool,
Expand Down Expand Up @@ -97,7 +99,7 @@ pub fn bloom_filter_add_value(
multi,
&mut add_succeeded,
);
replicate_and_post_events(ctx, filter_name, add_succeeded, false);
replicate_and_notify_events(ctx, filter_name, add_succeeded, false);
response
}
None => {
Expand All @@ -116,7 +118,7 @@ pub fn bloom_filter_add_value(
);
match filter_key.set_value(&BLOOM_FILTER_TYPE, bf) {
Ok(_) => {
replicate_and_post_events(ctx, filter_name, add_succeeded, true);
replicate_and_notify_events(ctx, filter_name, add_succeeded, true);
response
}
Err(_) => Err(ValkeyError::Str(utils::ERROR)),
Expand Down Expand Up @@ -203,8 +205,8 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
curr_cmd_idx += 1;
// Parse the error rate
let fp_rate = match input_args[curr_cmd_idx].to_string_lossy().parse::<f32>() {
Ok(num) if (0.0..1.0).contains(&num) => num,
Ok(num) if !(0.0..1.0).contains(&num) => {
Ok(num) if num > BLOOM_FP_RATE_MIN && num < BLOOM_FP_RATE_MAX => num,
Ok(num) if !(num > BLOOM_FP_RATE_MIN && num < BLOOM_FP_RATE_MAX) => {
return Err(ValkeyError::Str(utils::ERROR_RATE_RANGE));
}
_ => {
Expand All @@ -214,7 +216,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
curr_cmd_idx += 1;
// Parse the capacity
let capacity = match input_args[curr_cmd_idx].to_string_lossy().parse::<u32>() {
Ok(num) if num > 0 && num < BLOOM_CAPACITY_MAX => num,
Ok(num) if (BLOOM_CAPACITY_MIN..=BLOOM_CAPACITY_MAX).contains(&num) => num,
Ok(0) => {
return Err(ValkeyError::Str(utils::CAPACITY_LARGER_THAN_0));
}
Expand All @@ -236,7 +238,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
"EXPANSION" if argc == 6 => {
curr_cmd_idx += 1;
expansion = match input_args[curr_cmd_idx].to_string_lossy().parse::<u32>() {
Ok(num) if num > 0 && num <= BLOOM_EXPANSION_MAX => num,
Ok(num) if (BLOOM_EXPANSION_MIN..=BLOOM_EXPANSION_MAX).contains(&num) => num,
_ => {
return Err(ValkeyError::Str(utils::BAD_EXPANSION));
}
Expand All @@ -261,7 +263,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
let bloom = BloomFilterType::new_reserved(fp_rate, capacity, expansion);
match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) {
Ok(_v) => {
replicate_and_post_events(ctx, filter_name, false, true);
replicate_and_notify_events(ctx, filter_name, false, true);
VALKEY_OK
}
Err(_) => Err(ValkeyError::Str(utils::ERROR)),
Expand Down Expand Up @@ -289,8 +291,8 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
"ERROR" if idx < (argc - 1) => {
idx += 1;
fp_rate = match input_args[idx].to_string_lossy().parse::<f32>() {
Ok(num) if (0.0..1.0).contains(&num) => num,
Ok(num) if !(0.0..1.0).contains(&num) => {
Ok(num) if num > BLOOM_FP_RATE_MIN && num < BLOOM_FP_RATE_MAX => num,
Ok(num) if !(num > BLOOM_FP_RATE_MIN && num < BLOOM_FP_RATE_MAX) => {
return Err(ValkeyError::Str(utils::ERROR_RATE_RANGE));
}
_ => {
Expand All @@ -301,7 +303,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
"CAPACITY" if idx < (argc - 1) => {
idx += 1;
capacity = match input_args[idx].to_string_lossy().parse::<u32>() {
Ok(num) if num > 0 && num < BLOOM_CAPACITY_MAX => num,
Ok(num) if (BLOOM_CAPACITY_MIN..=BLOOM_CAPACITY_MAX).contains(&num) => num,
Ok(0) => {
return Err(ValkeyError::Str(utils::CAPACITY_LARGER_THAN_0));
}
Expand All @@ -319,7 +321,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
"EXPANSION" if idx < (argc - 1) => {
idx += 1;
expansion = match input_args[idx].to_string_lossy().parse::<u32>() {
Ok(num) if num > 0 && num <= BLOOM_EXPANSION_MAX => num,
Ok(num) if (BLOOM_EXPANSION_MIN..=BLOOM_EXPANSION_MAX).contains(&num) => num,
_ => {
return Err(ValkeyError::Str(utils::BAD_EXPANSION));
}
Expand Down Expand Up @@ -347,7 +349,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
match value {
Some(bf) => {
let response = handle_bloom_add(input_args, argc, idx, bf, true, &mut add_succeeded);
replicate_and_post_events(ctx, filter_name, add_succeeded, false);
replicate_and_notify_events(ctx, filter_name, add_succeeded, false);
response
}
None => {
Expand All @@ -359,7 +361,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
handle_bloom_add(input_args, argc, idx, &mut bf, true, &mut add_succeeded);
match filter_key.set_value(&BLOOM_FILTER_TYPE, bf) {
Ok(_) => {
replicate_and_post_events(ctx, filter_name, add_succeeded, true);
replicate_and_notify_events(ctx, filter_name, add_succeeded, true);
response
}
Err(_) => Err(ValkeyError::Str(utils::ERROR)),
Expand Down Expand Up @@ -392,7 +394,7 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
.as_str()
{
"CAPACITY" => Ok(ValkeyValue::Integer(val.capacity())),
"SIZE" => Ok(ValkeyValue::Integer(val.get_memory_usage() as i64)),
"SIZE" => Ok(ValkeyValue::Integer(val.memory_usage() as i64)),
"FILTERS" => Ok(ValkeyValue::Integer(val.filters.len() as i64)),
"ITEMS" => Ok(ValkeyValue::Integer(val.cardinality())),
"EXPANSION" => {
Expand All @@ -409,7 +411,7 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
ValkeyValue::SimpleStringStatic("Capacity"),
ValkeyValue::Integer(val.capacity()),
ValkeyValue::SimpleStringStatic("Size"),
ValkeyValue::Integer(val.get_memory_usage() as i64),
ValkeyValue::Integer(val.memory_usage() as i64),
ValkeyValue::SimpleStringStatic("Number of filters"),
ValkeyValue::Integer(val.filters.len() as i64),
ValkeyValue::SimpleStringStatic("Number of items inserted"),
Expand Down
4 changes: 2 additions & 2 deletions src/bloom/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ pub fn bloom_rdb_load_data_object(
encver: i32,
) -> Option<BloomFilterType> {
if encver > BLOOM_FILTER_TYPE_ENCODING_VERSION {
logging::log_warning(format!("{}: Cannot load bloomfilter type version {} because it is higher than the current module's string type version {}", MODULE_NAME, encver, BLOOM_FILTER_TYPE_ENCODING_VERSION).as_str());
logging::log_warning(format!("{}: Cannot load bloomfltr data type of version {} because it is higher than the loaded module's bloomfltr supported version {}", MODULE_NAME, encver, BLOOM_FILTER_TYPE_ENCODING_VERSION).as_str());
return None;
}
let Ok(num_filters) = raw::load_unsigned(rdb) else {
return None;
};
let Ok(expansion) = raw::load_signed(rdb) else {
let Ok(expansion) = raw::load_unsigned(rdb) else {
return None;
};
let Ok(fp_rate) = raw::load_float(rdb) else {
Expand Down
Loading

0 comments on commit 17e10b3

Please sign in to comment.