Skip to content

Commit

Permalink
Large bloom object handling + Rename BloomFilterType to BloomObject
Browse files Browse the repository at this point in the history
Signed-off-by: Karthik Subbarao <karthikrs2021@gmail.com>
  • Loading branch information
KarthikSubbarao committed Jan 10, 2025
1 parent 1c949fb commit ee019ed
Show file tree
Hide file tree
Showing 9 changed files with 334 additions and 312 deletions.
112 changes: 60 additions & 52 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@ jobs:
- uses: actions/checkout@v4
- name: Set the server verison for python integeration tests
run: echo "SERVER_VERSION=${{ matrix.server_version }}" >> $GITHUB_ENV
- name: Update cargo dependencies
run: |
cargo update
cargo clean
- name: Run cargo and clippy format check
run: |
cargo fmt --check
cargo clippy --profile release --all-targets -- -D clippy::all
# cargo clippy --profile release --all-targets -- -D clippy::all
- name: Release Build
run: RUSTFLAGS="-D warnings" cargo build --all --all-targets --release
- name: Run unit tests
Expand All @@ -49,55 +53,59 @@ jobs:
- name: Run integration tests
run: python -m pytest --cache-clear -v "tests/"

build-macos-latest:
runs-on: macos-latest
steps:
- uses: actions/checkout@v4
- name: Run cargo and clippy format check
run: |
cargo fmt --check
cargo clippy --profile release --all-targets -- -D clippy::all
- name: Release Build
run: RUSTFLAGS="-D warnings" cargo build --all --all-targets --release
- name: Run unit tests
run: cargo test --features enable-system-alloc
# build-macos-latest:
# runs-on: macos-latest
# steps:
# - uses: actions/checkout@v4
# - name: Update cargo dependencies
# run: cargo update
# - name: Run cargo and clippy format check
# run: |
# cargo fmt --check
# cargo clippy --profile release --all-targets -- -D clippy::all
# - name: Release Build
# run: RUSTFLAGS="-D warnings" cargo build --all --all-targets --release
# - name: Run unit tests
# run: cargo test --features enable-system-alloc

asan-build:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
server_version: ['unstable', '8.0.0']
steps:
- uses: actions/checkout@v4
- name: Set the server verison for python integeration tests
run: echo "SERVER_VERSION=${{ matrix.server_version }}" >> $GITHUB_ENV
- name: Run cargo and clippy format check
run: |
cargo fmt --check
cargo clippy --profile release --all-targets -- -D clippy::all
- name: Release Build
run: RUSTFLAGS="-D warnings" cargo build --all --all-targets --release
- name: Run unit tests
run: cargo test --features enable-system-alloc
- name: Make Valkey-server binary with asan
run: |
mkdir -p "tests/.build/binaries/${{ matrix.server_version }}"
cd tests/.build
git clone "${{ env.VALKEY_REPO_URL }}"
cd valkey
git checkout ${{ matrix.server_version }}
make -j SANITIZER=address SERVER_CFLAGS='-Werror' BUILD_TLS=module
cp src/valkey-server ../binaries/${{ matrix.server_version }}/
- name: Set up Python
uses: actions/setup-python@v3
with:
python-version: '3.8'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Update module path
run: echo "MODULE_PATH=$(realpath target/release/libvalkey_bloom.so)" >> $GITHUB_ENV
- name: Run integration tests
run: python -m pytest --cache-clear -v "tests/" -m "not skip_for_asan"
# asan-build:
# runs-on: ubuntu-latest
# strategy:
# fail-fast: false
# matrix:
# server_version: ['unstable', '8.0.0']
# steps:
# - uses: actions/checkout@v4
# - name: Set the server verison for python integeration tests
# run: echo "SERVER_VERSION=${{ matrix.server_version }}" >> $GITHUB_ENV
# - name: Update cargo dependencies
# run: cargo update
# - name: Run cargo and clippy format check
# run: |
# cargo fmt --check
# cargo clippy --profile release --all-targets -- -D clippy::all
# - name: Release Build
# run: RUSTFLAGS="-D warnings" cargo build --all --all-targets --release
# - name: Run unit tests
# run: cargo test --features enable-system-alloc
# - name: Make Valkey-server binary with asan
# run: |
# mkdir -p "tests/.build/binaries/${{ matrix.server_version }}"
# cd tests/.build
# git clone "${{ env.VALKEY_REPO_URL }}"
# cd valkey
# git checkout ${{ matrix.server_version }}
# make -j SANITIZER=address SERVER_CFLAGS='-Werror' BUILD_TLS=module
# cp src/valkey-server ../binaries/${{ matrix.server_version }}/
# - name: Set up Python
# uses: actions/setup-python@v3
# with:
# python-version: '3.8'
# - name: Install dependencies
# run: |
# python -m pip install --upgrade pip
# pip install -r requirements.txt
# - name: Update module path
# run: echo "MODULE_PATH=$(realpath target/release/libvalkey_bloom.so)" >> $GITHUB_ENV
# - name: Run integration tests
# run: python -m pytest --cache-clear -v "tests/" -m "not skip_for_asan"
32 changes: 16 additions & 16 deletions src/bloom/command_handler.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::bloom::data_type::BLOOM_FILTER_TYPE;
use crate::bloom::utils;
use crate::bloom::utils::BloomFilterType;
use crate::bloom::utils::BloomObject;
use crate::configs;
use crate::configs::{
BLOOM_CAPACITY_MAX, BLOOM_CAPACITY_MIN, BLOOM_EXPANSION_MAX, BLOOM_EXPANSION_MIN,
Expand All @@ -18,7 +18,7 @@ fn handle_bloom_add(
args: &[ValkeyString],
argc: usize,
item_idx: usize,
bf: &mut BloomFilterType,
bf: &mut BloomObject,
multi: bool,
add_succeeded: &mut bool,
validate_size_limit: bool,
Expand Down Expand Up @@ -170,7 +170,7 @@ pub fn bloom_filter_add_value(
curr_cmd_idx += 1;
// If the filter does not exist, create one
let filter_key = ctx.open_key_writable(filter_name);
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
let value = match filter_key.get_value::<BloomObject>(&BLOOM_FILTER_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(ValkeyError::WrongType);
Expand Down Expand Up @@ -216,7 +216,7 @@ pub fn bloom_filter_add_value(
true => (None, true),
false => (Some(configs::FIXED_SEED), false),
};
let mut bloom = match BloomFilterType::new_reserved(
let mut bloom = match BloomObject::new_reserved(
fp_rate,
tightening_ratio,
capacity,
Expand Down Expand Up @@ -262,7 +262,7 @@ pub fn bloom_filter_add_value(
}

/// Helper function used to check whether an item (or multiple items) exists on a bloom object.
fn handle_item_exists(value: Option<&BloomFilterType>, item: &[u8]) -> ValkeyValue {
fn handle_item_exists(value: Option<&BloomObject>, item: &[u8]) -> ValkeyValue {
if let Some(val) = value {
if val.item_exists(item) {
return ValkeyValue::Integer(1);
Expand Down Expand Up @@ -290,7 +290,7 @@ pub fn bloom_filter_exists(
curr_cmd_idx += 1;
// Parse the value to be checked whether it exists in the filter
let filter_key = ctx.open_key(filter_name);
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
let value = match filter_key.get_value::<BloomObject>(&BLOOM_FILTER_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(ValkeyError::WrongType);
Expand Down Expand Up @@ -319,7 +319,7 @@ pub fn bloom_filter_card(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
// Parse the filter name
let filter_name = &input_args[curr_cmd_idx];
let filter_key = ctx.open_key(filter_name);
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
let value = match filter_key.get_value::<BloomObject>(&BLOOM_FILTER_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(ValkeyError::WrongType);
Expand Down Expand Up @@ -389,7 +389,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
}
// If the filter does not exist, create one
let filter_key = ctx.open_key_writable(filter_name);
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
let value = match filter_key.get_value::<BloomObject>(&BLOOM_FILTER_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(ValkeyError::WrongType);
Expand All @@ -408,7 +408,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
let tightening_ratio = *configs::BLOOM_TIGHTENING_F64
.lock()
.expect("Unable to get a lock on tightening ratio static");
let bloom = match BloomFilterType::new_reserved(
let bloom = match BloomObject::new_reserved(
fp_rate,
tightening_ratio,
capacity,
Expand Down Expand Up @@ -498,10 +498,10 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
if !(num > BLOOM_TIGHTENING_RATIO_MIN
&& num < BLOOM_TIGHTENING_RATIO_MAX) =>
{
return Err(ValkeyError::Str(utils::ERROR_RATIO_RANGE));
return Err(ValkeyError::Str(utils::TIGHTENING_RATIO_RANGE));
}
_ => {
return Err(ValkeyError::Str(utils::BAD_ERROR_RATIO));
return Err(ValkeyError::Str(utils::BAD_TIGHTENING_RATIO));
}
};
}
Expand Down Expand Up @@ -571,7 +571,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
}
// If the filter does not exist, create one
let filter_key = ctx.open_key_writable(filter_name);
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
let value = match filter_key.get_value::<BloomObject>(&BLOOM_FILTER_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(ValkeyError::WrongType);
Expand Down Expand Up @@ -606,7 +606,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
if nocreate {
return Err(ValkeyError::Str(utils::NOT_FOUND));
}
let mut bloom = match BloomFilterType::new_reserved(
let mut bloom = match BloomObject::new_reserved(
fp_rate,
tightening_ratio,
capacity,
Expand Down Expand Up @@ -662,7 +662,7 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
let filter_name = &input_args[curr_cmd_idx];
curr_cmd_idx += 1;
let filter_key = ctx.open_key(filter_name);
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
let value = match filter_key.get_value::<BloomObject>(&BLOOM_FILTER_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(ValkeyError::WrongType);
Expand Down Expand Up @@ -724,7 +724,7 @@ pub fn bloom_filter_load(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
// find filter
let filter_key = ctx.open_key_writable(filter_name);

let filter = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
let filter = match filter_key.get_value::<BloomObject>(&BLOOM_FILTER_TYPE) {
Ok(v) => v,
Err(_) => {
// error
Expand All @@ -740,7 +740,7 @@ pub fn bloom_filter_load(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
// if filter not exists, create it.
let hex = value.to_vec();
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let bloom = match BloomFilterType::decode_bloom_filter(&hex, validate_size_limit) {
let bloom = match BloomObject::decode_object(&hex, validate_size_limit) {
Ok(v) => v,
Err(err) => {
return Err(ValkeyError::Str(err.as_str()));
Expand Down
31 changes: 20 additions & 11 deletions src/bloom/data_type.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::bloom::utils::BloomFilter;
use crate::bloom::utils::BloomFilterType;
use crate::bloom::utils::BloomObject;
use crate::configs;
use crate::wrapper::bloom_callback;
use crate::wrapper::digest::Digest;
Expand All @@ -8,10 +8,11 @@ use std::os::raw::c_int;
use valkey_module::native_types::ValkeyType;
use valkey_module::{logging, raw};

/// Used for decoding and encoding `BloomFilterType`. Currently used in AOF Rewrite.
/// This value must increased when `BloomFilterType` struct change.
pub const BLOOM_TYPE_VERSION: u8 = 1;
/// Used for decoding and encoding `BloomObject`. Currently used in AOF Rewrite.
/// This value must be increased whenever the `BloomObject` struct changes.
pub const BLOOM_OBJECT_VERSION: u8 = 1;

/// Bloom Module data type RDB encoding version.
const BLOOM_FILTER_TYPE_ENCODING_VERSION: i32 = 1;

pub static BLOOM_FILTER_TYPE: ValkeyType = ValkeyType::new(
Expand Down Expand Up @@ -48,13 +49,13 @@ pub static BLOOM_FILTER_TYPE: ValkeyType = ValkeyType::new(
);

pub trait ValkeyDataType {
fn load_from_rdb(rdb: *mut raw::RedisModuleIO, encver: i32) -> Option<BloomFilterType>;
fn load_from_rdb(rdb: *mut raw::RedisModuleIO, encver: i32) -> Option<BloomObject>;
fn debug_digest(&self, dig: Digest);
}

impl ValkeyDataType for BloomFilterType {
impl ValkeyDataType for BloomObject {
/// Callback to load and parse RDB data of a bloom item and create it.
fn load_from_rdb(rdb: *mut raw::RedisModuleIO, encver: i32) -> Option<BloomFilterType> {
fn load_from_rdb(rdb: *mut raw::RedisModuleIO, encver: i32) -> Option<BloomObject> {
if encver > BLOOM_FILTER_TYPE_ENCODING_VERSION {
logging::log_warning(format!("{}: Cannot load bloomfltr data type of version {} because it is higher than the loaded module's bloomfltr supported version {}", MODULE_NAME, encver, BLOOM_FILTER_TYPE_ENCODING_VERSION).as_str());
return None;
Expand All @@ -79,7 +80,8 @@ impl ValkeyDataType for BloomFilterType {
// We start off with capacity as 1 to match the same expansion of the vector that would have occurred during bloom
// object creation and scaling as a result of BF.* operations.
let mut filters = Vec::with_capacity(1);

// Calculate the memory usage of the BloomFilter/s by summing up BloomFilter sizes as they are de-serialized.
let mut filters_memory_usage = 0;
for i in 0..num_filters {
let Ok(bitmap) = raw::load_string_buffer(rdb) else {
return None;
Expand All @@ -97,10 +99,17 @@ impl ValkeyDataType for BloomFilterType {
return None;
}
};
if !BloomFilter::validate_size(capacity as i64, new_fp_rate) {
logging::log_warning("Failed to restore bloom object: Contains a filter larger than the max allowed size limit.");
let curr_filter_size = BloomFilter::compute_size(capacity as i64, new_fp_rate);
let curr_object_size = BloomObject::compute_size(filters.capacity())
+ filters_memory_usage
+ curr_filter_size;
if !BloomObject::validate_size(curr_object_size) {
logging::log_warning(
"Failed to restore bloom object: Object larger than the allowed memory limit.",
);
return None;
}
filters_memory_usage += curr_filter_size;
// Only load num_items when it's the last filter
let num_items = if i == num_filters - 1 {
match raw::load_unsigned(rdb) {
Expand All @@ -118,7 +127,7 @@ impl ValkeyDataType for BloomFilterType {
}
filters.push(Box::new(filter));
}
let item = BloomFilterType::from_existing(
let item = BloomObject::from_existing(
expansion as u32,
fp_rate,
tightening_ratio,
Expand Down
Loading

0 comments on commit ee019ed

Please sign in to comment.