Skip to content

Commit

Permalink
Merge branch 'valkey-io:unstable' into unstable
Browse files Browse the repository at this point in the history
  • Loading branch information
YueTang-Vanessa authored Nov 15, 2024
2 parents 36cfb90 + 9d61cad commit a4c387f
Show file tree
Hide file tree
Showing 10 changed files with 441 additions and 5 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ homepage = "https://github.com/valkey-io/valkey-bloom"
valkey-module = "0.1.2"
valkey-module-macros = "0"
linkme = "0"
bloomfilter = "1.0.13"
bloomfilter = { version = "1.0.13", features = ["serde"] }
lazy_static = "1.4.0"
libc = "0.2"
serde = { version = "1.0", features = ["derive"] }
bincode = "1.3"

[dev-dependencies]
rand = "0.8"
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ BF.CARD
BF.RESERVE
BF.INFO
BF.INSERT
BF.LOAD
```

Build instructions for Linux.
Expand Down
45 changes: 45 additions & 0 deletions src/bloom/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,3 +490,48 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
_ => Err(ValkeyError::Str(utils::NOT_FOUND)),
}
}

pub fn bloom_filter_load(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyResult {
let argc = input_args.len();
if argc != 3 {
return Err(ValkeyError::WrongArity);
}
let mut idx = 1;
let filter_name = &input_args[idx];
idx += 1;
let value = &input_args[idx];
// find filter
let filter_key = ctx.open_key_writable(filter_name);

let filter = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
Ok(v) => v,
Err(_) => {
// error
return Err(ValkeyError::Str(utils::ERROR));
}
};
match filter {
Some(_) => {
// if bloom exists, return exists error.
Err(ValkeyError::Str(utils::KEY_EXISTS))
}
None => {
// if filter not exists, create it.
let hex = value.to_vec();
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let bf = match BloomFilterType::decode_bloom_filter(&hex, validate_size_limit) {
Ok(v) => v,
Err(err) => {
return Err(ValkeyError::Str(err.as_str()));
}
};
match filter_key.set_value(&BLOOM_FILTER_TYPE, bf) {
Ok(_) => {
replicate_and_notify_events(ctx, filter_name, false, true);
VALKEY_OK
}
Err(_) => Err(ValkeyError::Str(utils::ERROR)),
}
}
}
}
7 changes: 5 additions & 2 deletions src/bloom/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ use std::os::raw::c_int;
use valkey_module::native_types::ValkeyType;
use valkey_module::{logging, raw};

/// Used for decoding and encoding `BloomFilterType`. Currently used in AOF Rewrite.
/// This value must be increased whenever the `BloomFilterType` struct changes.
pub const BLOOM_TYPE_VERSION: u8 = 1;

const BLOOM_FILTER_TYPE_ENCODING_VERSION: i32 = 1;

pub static BLOOM_FILTER_TYPE: ValkeyType = ValkeyType::new(
Expand All @@ -21,8 +25,7 @@ pub static BLOOM_FILTER_TYPE: ValkeyType = ValkeyType::new(
version: raw::REDISMODULE_TYPE_METHOD_VERSION as u64,
rdb_load: Some(bloom_callback::bloom_rdb_load),
rdb_save: Some(bloom_callback::bloom_rdb_save),
// TODO
aof_rewrite: None,
aof_rewrite: Some(bloom_callback::bloom_aof_rewrite),

mem_usage: Some(bloom_callback::bloom_mem_usage),
// TODO
Expand Down
237 changes: 236 additions & 1 deletion src/bloom/utils.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
use crate::{configs, metrics};
use crate::{
configs::{
self, BLOOM_EXPANSION_MAX, BLOOM_EXPANSION_MIN, BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN,
},
metrics,
};
use bloomfilter;
use serde::{Deserialize, Serialize};
use std::{mem, sync::atomic::Ordering};

use super::data_type::BLOOM_TYPE_VERSION;

/// KeySpace Notification Events
pub const ADD_EVENT: &str = "bloom.add";
pub const RESERVE_EVENT: &str = "bloom.reserve";
Expand All @@ -21,12 +29,22 @@ pub const MAX_NUM_SCALING_FILTERS: &str = "ERR bloom object reached max number o
pub const UNKNOWN_ARGUMENT: &str = "ERR unknown argument received";
pub const EXCEEDS_MAX_BLOOM_SIZE: &str =
"ERR operation results in filter allocation exceeding size limit";
pub const KEY_EXISTS: &str = "BUSYKEY Target key name already exists.";
pub const ENCODE_BLOOM_FILTER_FAILED: &str = "ERR encode bloom filter failed.";
pub const DECODE_BLOOM_FILTER_FAILED: &str = "ERR decode bloom filter failed.";
pub const DECODE_UNSUPPORTED_VERSION: &str =
"ERR decode bloom filter failed. Unsupported version.";

/// Errors produced by bloom object operations (creation, scaling, encode/decode).
#[derive(Debug, PartialEq)]
pub enum BloomError {
    /// A non-scaling filter reached capacity and cannot accept more items.
    NonScalingFilterFull,
    /// The object already holds the maximum number of scaling sub-filters.
    MaxNumScalingFilters,
    /// The operation would allocate a filter larger than the configured size limit.
    ExceedsMaxBloomSize,
    /// Serialization of the bloom object failed.
    EncodeBloomFilterFailed,
    /// Deserialization failed (corrupt or truncated bytes).
    DecodeBloomFilterFailed,
    /// The encoded payload carries a version byte this module does not support.
    DecodeUnsupportedVersion,
    /// The false-positive rate is outside the allowed range.
    ErrorRateRange,
    /// The expansion rate is outside the allowed range.
    BadExpansion,
}

impl BloomError {
Expand All @@ -35,13 +53,19 @@ impl BloomError {
BloomError::NonScalingFilterFull => NON_SCALING_FILTER_FULL,
BloomError::MaxNumScalingFilters => MAX_NUM_SCALING_FILTERS,
BloomError::ExceedsMaxBloomSize => EXCEEDS_MAX_BLOOM_SIZE,
BloomError::EncodeBloomFilterFailed => ENCODE_BLOOM_FILTER_FAILED,
BloomError::DecodeBloomFilterFailed => DECODE_BLOOM_FILTER_FAILED,
BloomError::DecodeUnsupportedVersion => DECODE_UNSUPPORTED_VERSION,
BloomError::ErrorRateRange => ERROR_RATE_RANGE,
BloomError::BadExpansion => BAD_EXPANSION,
}
}
}

/// The BloomFilterType structure. 40 bytes.
/// Can contain one or more filters.
/// This is a generic top level structure which is not coupled to any bloom crate.
#[derive(Serialize, Deserialize)]
pub struct BloomFilterType {
pub expansion: u32,
pub fp_rate: f64,
Expand Down Expand Up @@ -193,6 +217,91 @@ impl BloomFilterType {
}
Ok(0)
}

/// Serializes bloomFilter to a byte array.
pub fn encode_bloom_filter(&self) -> Result<Vec<u8>, BloomError> {
match bincode::serialize(self) {
Ok(vec) => {
let mut final_vec = Vec::with_capacity(1 + vec.len());
final_vec.push(BLOOM_TYPE_VERSION);
final_vec.extend(vec);
Ok(final_vec)
}
Err(_) => Err(BloomError::EncodeBloomFilterFailed),
}
}

    /// Deserialize a byte array to bloom filter.
    /// We will need to handle any current or previous version and deserializing the bytes into a bloom object of the running Module's current version `BLOOM_TYPE_VERSION`.
    ///
    /// The first byte of `decoded_bytes` is the encoding version written by
    /// `encode_bloom_filter`; the rest is the bincode payload. The payload is
    /// decoded as the tuple `(expansion, fp_rate, filters)` — relying on bincode
    /// serializing struct fields in declaration order — so new versions can add
    /// fields without breaking older decoders.
    ///
    /// `validate_size_limit`: when true, reject filters whose allocation exceeds
    /// the configured per-filter memory limit (disabled on the replicated path).
    pub fn decode_bloom_filter(
        decoded_bytes: &[u8],
        validate_size_limit: bool,
    ) -> Result<BloomFilterType, BloomError> {
        // An empty buffer has no version byte to read.
        if decoded_bytes.is_empty() {
            return Err(BloomError::DecodeBloomFilterFailed);
        }
        let version = decoded_bytes[0];
        match version {
            1 => {
                // always use new version to init bloomFilterType.
                // This is to ensure that the new fields can be recognized when the object is serialized and deserialized in the future.
                let (expansion, fp_rate, filters): (u32, f64, Vec<BloomFilter>) =
                    match bincode::deserialize::<(u32, f64, Vec<BloomFilter>)>(&decoded_bytes[1..])
                    {
                        Ok(values) => {
                            // Validate decoded properties against the same ranges
                            // enforced at creation time.
                            if !(BLOOM_EXPANSION_MIN..=BLOOM_EXPANSION_MAX).contains(&values.0) {
                                return Err(BloomError::BadExpansion);
                            }
                            if !(values.1 > BLOOM_FP_RATE_MIN && values.1 < BLOOM_FP_RATE_MAX) {
                                return Err(BloomError::ErrorRateRange);
                            }
                            // NOTE(review): `>=` rejects a payload with exactly
                            // MAX_FILTERS_PER_OBJ filters — confirm an object at the
                            // max filter count is not legitimately encodable.
                            if values.2.len() >= configs::MAX_FILTERS_PER_OBJ as usize {
                                return Err(BloomError::MaxNumScalingFilters);
                            }
                            for _filter in values.2.iter() {
                                // Reject the request, if the operation will result in creation of a filter of size greater than what is allowed.
                                if validate_size_limit
                                    && _filter.number_of_bytes()
                                        > configs::BLOOM_MEMORY_LIMIT_PER_FILTER
                                            .load(Ordering::Relaxed)
                                            as usize
                                {
                                    return Err(BloomError::ExceedsMaxBloomSize);
                                }
                            }
                            values
                        }
                        Err(_) => {
                            return Err(BloomError::DecodeBloomFilterFailed);
                        }
                    };
                let item = BloomFilterType {
                    expansion,
                    fp_rate,
                    filters,
                };
                // add bloom filter type metrics.
                metrics::BLOOM_NUM_OBJECTS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
                    mem::size_of::<BloomFilterType>(),
                    std::sync::atomic::Ordering::Relaxed,
                );
                // add bloom filter metrics.
                for filter in &item.filters {
                    metrics::BLOOM_NUM_FILTERS_ACROSS_OBJECTS
                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                    metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
                        filter.number_of_bytes(),
                        std::sync::atomic::Ordering::Relaxed,
                    );
                }

                Ok(item)
            }
            // Any other version byte is from a newer (or corrupt) encoder.
            _ => Err(BloomError::DecodeUnsupportedVersion),
        }
    }
}

/// Structure representing a single bloom filter. 200 Bytes.
Expand All @@ -201,6 +310,7 @@ impl BloomFilterType {
/// we have a limit on the memory usage of a `BloomFilter` to be 64MB.
/// Based on this, we expect the number of items on the `BloomFilter` to be
/// well within the u32::MAX limit.
#[derive(Serialize, Deserialize)]
pub struct BloomFilter {
pub bloom: bloomfilter::Bloom<[u8]>,
pub num_items: u32,
Expand Down Expand Up @@ -636,4 +746,129 @@ mod tests {
let result2 = BloomFilterType::new_reserved(0.001_f64, capacity, 1, true);
assert_eq!(result2.err(), Some(BloomError::ExceedsMaxBloomSize));
}

#[test]
fn test_bf_encode_and_decode() {
// arrange: prepare bloom filter
let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap();
let key = "key";
let _ = bf.add_item(key.as_bytes(), true);

// action
let encoder_result = bf.encode_bloom_filter();

// assert
// encoder sucess
assert!(encoder_result.is_ok());
let vec = encoder_result.unwrap();

// assert decode:
let new_bf_result = BloomFilterType::decode_bloom_filter(&vec, true);

let new_bf = new_bf_result.unwrap();

// verify new_bf and bf
assert_eq!(bf.fp_rate, new_bf.fp_rate);
assert_eq!(bf.expansion, new_bf.expansion);
assert_eq!(bf.capacity(), new_bf.capacity());

// contains key
assert!(new_bf.item_exists(key.as_bytes()));
}

#[test]
fn test_bf_decode_when_unsupported_version_should_failed() {
// arrange: prepare bloom filter
let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap();
let key = "key";
let _ = bf.add_item(key.as_bytes(), true).unwrap();

let encoder_result = bf.encode_bloom_filter();
assert!(encoder_result.is_ok());

// 1. unsupport version should return error
let mut vec = encoder_result.unwrap();
vec[0] = 10;

// assert decode:
// should return error
assert_eq!(
BloomFilterType::decode_bloom_filter(&vec, true).err(),
Some(BloomError::DecodeUnsupportedVersion)
);
}

#[test]
fn test_bf_decode_when_bytes_is_empty_should_failed() {
// arrange: prepare bloom filter
let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap();
let key = "key";
let _ = bf.add_item(key.as_bytes(), true);

let encoder_result = bf.encode_bloom_filter();
assert!(encoder_result.is_ok());

// 1. empty vec should return error
let vec: Vec<u8> = Vec::new();
// assert decode:
// should return error
assert_eq!(
BloomFilterType::decode_bloom_filter(&vec, true).err(),
Some(BloomError::DecodeBloomFilterFailed)
);
}

#[test]
fn test_bf_decode_when_bytes_is_exceed_limit_should_failed() {
// arrange: prepare bloom filter
let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap();
let key = "key";
let _ = bf.add_item(key.as_bytes(), true);
let origin_expansion = bf.expansion;
let origin_fp_rate = bf.fp_rate;
// unsupoort expansion
bf.expansion = 0;

let encoder_result = bf.encode_bloom_filter();

// 1. unsupport expansion
let vec = encoder_result.unwrap();
// assert decode:
// should return error
assert_eq!(
BloomFilterType::decode_bloom_filter(&vec, true).err(),
Some(BloomError::BadExpansion)
);

// 1.2 Exceeded the maximum expansion
bf.expansion = BLOOM_EXPANSION_MAX + 1;

let vec = bf.encode_bloom_filter().unwrap();
assert_eq!(
BloomFilterType::decode_bloom_filter(&vec, true).err(),
Some(BloomError::BadExpansion)
);
// recover
bf.expansion = origin_expansion;

// 2. unsupport fp_rate
bf.fp_rate = -0.5;
let vec = bf.encode_bloom_filter().unwrap();
// should return error
assert_eq!(
BloomFilterType::decode_bloom_filter(&vec, true).err(),
Some(BloomError::ErrorRateRange)
);
bf.fp_rate = origin_fp_rate;

// 3. build a larger than 64mb filter
let extra_large_filter =
BloomFilterType::new_reserved(0.01_f64, 57000000, 2, false).unwrap();
let vec = extra_large_filter.encode_bloom_filter().unwrap();
// should return error
assert_eq!(
BloomFilterType::decode_bloom_filter(&vec, true).err(),
Some(BloomError::ExceedsMaxBloomSize)
);
}
}
Loading

0 comments on commit a4c387f

Please sign in to comment.