Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Karthik Subbarao <karthikrs2021@gmail.com>
  • Loading branch information
KarthikSubbarao committed Oct 15, 2024
1 parent 6377437 commit 83c191d
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 143 deletions.
54 changes: 27 additions & 27 deletions tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def test_bloom_obj_access(self):
assert client.execute_command('BF.ADD key2 val2') == 1
assert client.execute_command('TOUCH key1 key2') == 2
assert client.execute_command('TOUCH key3') == 0
self.verify_key_number(client, 2)
self.verify_server_key_count(client, 2)
assert client.execute_command('DBSIZE') == 2
random_key = client.execute_command('RANDOMKEY')
assert random_key == b"key1" or random_key == b"key2"
Expand All @@ -84,9 +84,9 @@ def test_bloom_transaction(self):
assert client.execute_command('DEL M1') == b'QUEUED'
assert client.execute_command('BF.EXISTS M1 V1') == b'QUEUED'
assert client.execute_command('EXEC') == [1, 1, 1, 1, 0]
self.verify_bloom_filter_existence(client, 'M2', 'V2')
self.verify_bloom_filter_existence(client, 'M1', 'V1', should_exist=False)
self.verify_key_number(client, 1)
self.verify_bloom_filter_item_existence(client, 'M2', 'V2')
self.verify_bloom_filter_item_existence(client, 'M1', 'V1', should_exist=False)
self.verify_server_key_count(client, 1)

def test_bloom_lua(self):
client = self.server.get_new_client()
Expand All @@ -98,62 +98,62 @@ def test_bloom_lua(self):
"""
client.eval(load_filter, 0)
assert client.execute_command('BF.MEXISTS LUA2 ITEM1 ITEM3 ITEM4') == [0, 1, 1]
self.verify_key_number(client, 2)
self.verify_server_key_count(client, 2)

def test_bloom_deletes(self):
client = self.server.get_new_client()
# delete
assert client.execute_command('BF.ADD filter1 item1') == 1
self.verify_bloom_filter_existence(client, 'filter1', 'item1')
self.verify_key_number(client, 1)
self.verify_bloom_filter_item_existence(client, 'filter1', 'item1')
self.verify_server_key_count(client, 1)
assert client.execute_command('DEL filter1') == 1
self.verify_bloom_filter_existence(client, 'filter1', 'item1', should_exist=False)
self.verify_key_number(client, 0)
self.verify_bloom_filter_item_existence(client, 'filter1', 'item1', should_exist=False)
self.verify_server_key_count(client, 0)

# flush
self.insert_bloom_filter(client, number_of_bf=10)
self.verify_key_number(client, 10)
self.create_bloom_filters_and_add_items(client, number_of_bf=10)
self.verify_server_key_count(client, 10)
assert client.execute_command('FLUSHALL')
self.verify_key_number(client, 0)
self.verify_server_key_count(client, 0)

# unlink
assert client.execute_command('BF.ADD A ITEMA') == 1
assert client.execute_command('BF.ADD B ITEMB') == 1
self.verify_bloom_filter_existence(client, 'A', 'ITEMA')
self.verify_bloom_filter_existence(client, 'B', 'ITEMB')
self.verify_bloom_filter_existence(client, 'C', 'ITEMC', should_exist=False)
self.verify_key_number(client, 2)
self.verify_bloom_filter_item_existence(client, 'A', 'ITEMA')
self.verify_bloom_filter_item_existence(client, 'B', 'ITEMB')
self.verify_bloom_filter_item_existence(client, 'C', 'ITEMC', should_exist=False)
self.verify_server_key_count(client, 2)
assert client.execute_command('UNLINK A B C') == 2
assert client.execute_command('BF.MEXISTS A ITEMA ITEMB') == [0, 0]
self.verify_bloom_filter_existence(client, 'A', 'ITEMA', should_exist=False)
self.verify_bloom_filter_existence(client, 'B', 'ITEMB', should_exist=False)
self.verify_key_number(client, 0)
self.verify_bloom_filter_item_existence(client, 'A', 'ITEMA', should_exist=False)
self.verify_bloom_filter_item_existence(client, 'B', 'ITEMB', should_exist=False)
self.verify_server_key_count(client, 0)

def test_bloom_expiration(self):
client = self.server.get_new_client()
# expiration
# cmd object idletime
self.verify_key_number(client, 0)
self.verify_server_key_count(client, 0)
assert client.execute_command('BF.ADD TEST_IDLE val3') == 1
self.verify_bloom_filter_existence(client, 'TEST_IDLE', 'val3')
self.verify_key_number(client, 1)
self.verify_bloom_filter_item_existence(client, 'TEST_IDLE', 'val3')
self.verify_server_key_count(client, 1)
time.sleep(1)
assert client.execute_command('OBJECT IDLETIME test_idle') == None
assert client.execute_command('OBJECT IDLETIME TEST_IDLE') > 0
# cmd ttl, expireat
assert client.execute_command('BF.ADD TEST_EXP ITEM') == 1
assert client.execute_command('TTL TEST_EXP') == -1
self.verify_bloom_filter_existence(client, 'TEST_EXP', 'ITEM')
self.verify_key_number(client, 2)
self.verify_bloom_filter_item_existence(client, 'TEST_EXP', 'ITEM')
self.verify_server_key_count(client, 2)
curr_time = int(time.time())
assert client.execute_command(f'EXPIREAT TEST_EXP {curr_time + 5}') == 1
wait_for_equal(lambda: client.execute_command('BF.EXISTS TEST_EXP ITEM'), 0)
self.verify_key_number(client, 1)
self.verify_server_key_count(client, 1)
# cmd persist
assert client.execute_command('BF.ADD TEST_PERSIST ITEM') == 1
assert client.execute_command('TTL TEST_PERSIST') == -1
self.verify_bloom_filter_existence(client, 'TEST_PERSIST', 'ITEM')
self.verify_key_number(client, 2)
self.verify_bloom_filter_item_existence(client, 'TEST_PERSIST', 'ITEM')
self.verify_server_key_count(client, 2)
assert client.execute_command(f'EXPIREAT TEST_PERSIST {curr_time + 100000}') == 1
assert client.execute_command('TTL TEST_PERSIST') > 0
assert client.execute_command('PERSIST TEST_PERSIST') == 1
Expand Down
130 changes: 18 additions & 112 deletions tests/test_correctness.py
Original file line number Diff line number Diff line change
@@ -1,113 +1,19 @@
import time
import pytest
from valkey import ResponseError
from valkeytests.valkey_test_case import ValkeyTestCase
from valkeytests.conftest import resource_port_tracker
import logging
import os
import random
import string
from valkey_bloom_test_case import ValkeyBloomTestCaseBase

def generate_random_string(length=7):
characters = string.ascii_letters + string.digits
random_string = ''.join(random.choice(characters) for _ in range(length))
return random_string

def add_items_till_capacity(client, filter_name, capacity_needed, starting_item_idx, rand_prefix, batch_size=1000):
new_item_idx = starting_item_idx
fp_count = 0
cardinality = client.execute_command(f'BF.CARD {filter_name}')
while cardinality < capacity_needed:
# Calculate how many more items we need to add.
remaining_capacity = capacity_needed - cardinality
batch_to_add = min(batch_size, remaining_capacity)
# Prepare a batch of items
items = [f"{rand_prefix}{new_item_idx + i}" for i in range(batch_to_add)]
new_item_idx += batch_to_add
result = client.execute_command(f'BF.MADD {filter_name} ' + ' '.join(items))
# Process results
for res in result:
if res == 0:
fp_count += 1
elif res == 1:
cardinality += 1
else:
raise RuntimeError(f"Unexpected return value from add_item: {res}")
return fp_count, new_item_idx - 1

def check_items_exist(client, filter_name, start_idx, end_idx, expected_result, rand_prefix, batch_size=1000):
error_count = 0
num_operations = (end_idx - start_idx) + 1
# Check that items exist in batches.
for batch_start in range(start_idx, end_idx + 1, batch_size):
batch_end = min(batch_start + batch_size - 1, end_idx)
# Execute BF.MEXISTS with the batch of items
items = [f"{rand_prefix}{i}" for i in range(batch_start, batch_end + 1)]
result = client.execute_command(f'BF.MEXISTS {filter_name} ' + ' '.join(items))
# Check the results
for item_result in result:
if item_result != expected_result:
error_count += 1
return error_count, num_operations

def fp_assert(error_count, num_operations, expected_fp_rate, fp_margin):
real_fp_rate = error_count / num_operations
fp_rate_with_margin = expected_fp_rate + fp_margin

assert real_fp_rate < fp_rate_with_margin, f"The actual fp_rate, {real_fp_rate}, is greater than the configured fp_rate with margin. {fp_rate_with_margin}."

def validate_copied_bloom_correctness(client, original_filter_name, item_prefix, add_operation_idx, expected_fp_rate, fp_margin, original_info_dict):
copy_filter_name = "filter_copy"
assert client.execute_command(f'COPY {original_filter_name} {copy_filter_name}') == 1
assert client.execute_command('DBSIZE') == 2
copy_info = client.execute_command(f'BF.INFO {copy_filter_name}')
copy_it = iter(copy_info)
copy_info_dict = dict(zip(copy_it, copy_it))
assert copy_info_dict[b'Capacity'] == original_info_dict[b'Capacity']
assert copy_info_dict[b'Number of items inserted'] == original_info_dict[b'Number of items inserted']
assert copy_info_dict[b'Number of filters'] == original_info_dict[b'Number of filters']
assert copy_info_dict[b'Size'] == original_info_dict[b'Size']
assert copy_info_dict[b'Expansion rate'] == original_info_dict[b'Expansion rate']
# Items added to the original filter should still exist on the copy. False Negatives are not possible.
error_count, num_operations = check_items_exist(
client,
copy_filter_name,
1,
add_operation_idx,
True,
item_prefix,
)
assert error_count == 0
# Items not added to the original filter should not exist on the copy. False Positives should be close to configured fp_rate.
error_count, num_operations = check_items_exist(
client,
copy_filter_name,
add_operation_idx + 1,
add_operation_idx * 2,
False,
item_prefix,
)
fp_assert(error_count, num_operations, expected_fp_rate, fp_margin)

class TestBloomCorrectness(ValkeyTestCase):

def get_custom_args(self):
self.set_server_version(os.environ['SERVER_VERSION'])
return {
'loadmodule': os.getenv('MODULE_PATH'),
}
class TestBloomCorrectness(ValkeyBloomTestCaseBase):

def test_non_scaling_filter(self):
client = self.server.get_new_client()
item_prefix = generate_random_string()
item_prefix = self.generate_random_string()
# 1 in every 1000 operations is expected to be a false positive.
expected_fp_rate = 0.001
capacity = 10000
# Create a non scaling bloom filter and validate its behavior.
filter_name = "filter1"
assert client.execute_command(f'BF.RESERVE {filter_name} {expected_fp_rate} {capacity} NONSCALING') == b"OK"
# Add items and fill the filter to capacity.
error_count, add_operation_idx = add_items_till_capacity(client, filter_name, capacity, 1, item_prefix)
error_count, add_operation_idx = self.add_items_till_capacity(client, filter_name, capacity, 1, item_prefix)
with pytest.raises(Exception, match="non scaling filter is full"):
client.execute_command(f'BF.ADD {filter_name} new_item')
# Validate that is is filled.
Expand All @@ -123,11 +29,11 @@ def test_non_scaling_filter(self):
fp_margin = 0.002
# Validate that item "add" operations on bloom filters are ensuring correctness.
# False positives should be close to the configured fp_rate.
fp_assert(error_count, add_operation_idx, expected_fp_rate, fp_margin)
self.fp_assert(error_count, add_operation_idx, expected_fp_rate, fp_margin)
# Validate item "exists" operations on bloom filters are ensuring correctness.
# This tests for items already added to the filter and expects them to exist.
# False negatives should not be possible.
error_count, num_operations = check_items_exist(
error_count, num_operations = self.check_items_exist(
client,
filter_name,
1,
Expand All @@ -138,21 +44,21 @@ def test_non_scaling_filter(self):
assert error_count == 0
# This tests for items which are not added to the filter and expects them to not exist.
# False positives should be close to the configured fp_rate.
error_count, num_operations = check_items_exist(
error_count, num_operations = self.check_items_exist(
client,
filter_name,
add_operation_idx + 1,
add_operation_idx * 2,
False,
item_prefix,
)
fp_assert(error_count, num_operations, expected_fp_rate, fp_margin)
# Create a copy of the non scaling bloom filter.
validate_copied_bloom_correctness(client, filter_name, item_prefix, add_operation_idx, expected_fp_rate, fp_margin, info_dict)
self.fp_assert(error_count, num_operations, expected_fp_rate, fp_margin)
# Validate correctness on a copy of a non scaling bloom filter.
self.validate_copied_bloom_correctness(client, filter_name, item_prefix, add_operation_idx, expected_fp_rate, fp_margin, info_dict)

def test_scaling_filter(self):
client = self.server.get_new_client()
item_prefix = generate_random_string()
item_prefix = self.generate_random_string()
expected_fp_rate = 0.001
initial_capacity = 10000
expansion = 2
Expand All @@ -175,7 +81,7 @@ def test_scaling_filter(self):
add_operation_idx = 0
for filter_idx in range(1, num_filters_to_scale + 1):
expected_total_capacity = initial_capacity * ((expansion ** filter_idx) - 1)
error_count, new_add_operation_idx = add_items_till_capacity(client, filter_name, expected_total_capacity, add_operation_idx + 1, item_prefix)
error_count, new_add_operation_idx = self.add_items_till_capacity(client, filter_name, expected_total_capacity, add_operation_idx + 1, item_prefix)
add_operation_idx = new_add_operation_idx
total_error_count += error_count
# Validate from BF.INFO that is filter is scaling correctly.
Expand All @@ -192,11 +98,11 @@ def test_scaling_filter(self):
fp_margin = 0.002
# Validate that item "add" operations on bloom filters are ensuring correctness.
# False positives should be close to the configured fp_rate.
fp_assert(total_error_count, add_operation_idx, expected_fp_rate, fp_margin)
self.fp_assert(total_error_count, add_operation_idx, expected_fp_rate, fp_margin)
# Validate item "exists" operations on bloom filters are ensuring correctness.
# This tests for items already added to the filter and expects them to exist.
# False negatives should not be possible.
error_count, num_operations = check_items_exist(
error_count, num_operations = self.check_items_exist(
client,
filter_name,
1,
Expand All @@ -207,18 +113,18 @@ def test_scaling_filter(self):
assert error_count == 0
# This tests for items which are not added to the filter and expects them to not exist.
# False positives should be close to the configured fp_rate.
error_count, num_operations = check_items_exist(
error_count, num_operations = self.check_items_exist(
client,
filter_name,
add_operation_idx + 1,
add_operation_idx * 2,
False,
item_prefix,
)
fp_assert(error_count, num_operations, expected_fp_rate, fp_margin)

self.fp_assert(error_count, num_operations, expected_fp_rate, fp_margin)
# Track INFO on the scaled out bloom filter.
info = client.execute_command(f'BF.INFO {filter_name}')
it = iter(info)
info_dict = dict(zip(it, it))
validate_copied_bloom_correctness(client, filter_name, item_prefix, add_operation_idx, expected_fp_rate, fp_margin, info_dict)
# Validate correctness on a copy of a scaling bloom filter.
self.validate_copied_bloom_correctness(client, filter_name, item_prefix, add_operation_idx, expected_fp_rate, fp_margin, info_dict)
Loading

0 comments on commit 83c191d

Please sign in to comment.