Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Complete fix rocksdb #144

Merged
merged 4 commits into from
May 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion faust/stores/rocksdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,12 @@ async def _try_open_db_for_partition(

def _contains(self, key: bytes) -> bool:
event = current_event()
if event is not None:
partition_from_message = (
event is not None
and not self.table.is_global
and not self.table.use_partitioner
)
if partition_from_message:
partition = event.message.partition
db = self._db_for_partition(partition)
value = db.get(key)
Expand Down
154 changes: 154 additions & 0 deletions tests/unit/stores/test_rocksdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ def test__get__missing(self, *, store):
assert store._get(b"key") is None

def test__get(self, *, store):
# Tests scenario where event is None, (db, value) contains value
db = Mock(name="db")
value = b"foo"
store._get_bucket_for_key = Mock(name="get_bucket_for_key")
Expand All @@ -266,6 +267,76 @@ def test__get__dbvalue_is_None(self, *, store):
db.get.return_value = b"bar"
assert store._get(b"key") == b"bar"

def test__get__has_event(self, *, store, current_event):
partition = 1
message = Mock(name="message")
message.partition.return_value = partition

current_event.return_value = message

db = Mock(name="db")
store._db_for_partition = Mock("_db_for_partition")
store._db_for_partition.return_value = db
db.get.return_value = b"value"
store.table = Mock(name="table")
store.table.is_global = False
store.table.use_partitioner = False

assert store._get(b"key") == b"value"

db.get.return_value = None
assert store._get(b"key2") is None

def test__get__has_event_value_diff_partition(self, *, store, current_event):
partition = 1
message = Mock(name="message")
message.partition.return_value = partition

current_event.return_value = message

dbs = {}
event_partition = current_event.message.partition
next_partition = event_partition + 1
dbs[event_partition] = Mock(name="db")
dbs[next_partition] = Mock(name="db")

dbs[event_partition].get.return_value = None
dbs[next_partition].get.return_value = b"value"

store._db_for_partition = Mock("_db_for_partition")
store._db_for_partition.return_value = dbs[current_event.message.partition]

store._dbs.update(dbs)
store._get_bucket_for_key = Mock(name="get_bucket_for_key")
store._get_bucket_for_key.return_value = (dbs[next_partition], b"value")

store.table = Mock(name="table")
store.table.is_global = False
store.table.use_partitioner = False

# A _get call from a stream, to a non-global, non-partitioner, table
# uses partition of event
# Which in this intentional case, is the wrong partition
assert store._get(b"key") is None

store.table.is_global = True
store.table.use_partitioner = False

# A global table ignores the event partition and pulls from the proper db
assert store._get(b"key") == b"value"

store.table.is_global = False
store.table.use_partitioner = True

# A custom-partitioned table also ignores the event partition
assert store._get(b"key") == b"value"

store.table.is_global = True
store.table.use_partitioner = True

# A global, custom-partitioned table will also ignore the event partition
assert store._get(b"key") == b"value"

def test_get_bucket_for_key__is_in_index(self, *, store):
store._key_index[b"key"] = 30
db = store._dbs[30] = Mock(name="db-p30")
Expand Down Expand Up @@ -415,6 +486,89 @@ def test__contains(self, *, store):
db2.get.return_value = b"value"
assert store._contains(b"key")

def test__contains__has_event(self, *, store, current_event):
# Test "happy-path", call comes in from stream on same partition as key
partition = 1
message = Mock(name="message")
message.partition.return_value = partition

current_event.return_value = message

store.table = Mock(name="table")
store.table.is_global = False
store.table.use_partitioner = False

dbs = {}
event_partition = current_event.message.partition
next_partition = event_partition + 1
dbs[event_partition] = Mock(name="db")
dbs[next_partition] = Mock(name="db")

dbs[event_partition].get.return_value = b"value"
dbs[event_partition].key_may_exist.return_value = (True,)
dbs[next_partition].get.return_value = False
dbs[next_partition].key_may_exist.return_value = (False,)

store._db_for_partition = Mock("_db_for_partition")
store._db_for_partition.return_value = dbs[current_event.message.partition]

# A _get call from a stream, to a non-global, non-partitioner, table
# uses partition of event
# Which in this intentional case, is the wrong partition
assert store._contains(b"key")

def test__contains__has_event_value_diff_partition(self, *, store, current_event):
partition = 1
message = Mock(name="message")
message.partition.return_value = partition

current_event.return_value = message

dbs = {}
event_partition = current_event.message.partition
next_partition = event_partition + 1
dbs[event_partition] = Mock(name="db")
dbs[next_partition] = Mock(name="db")

dbs[event_partition].get.return_value = None
dbs[event_partition].key_may_exist.return_value = False
dbs[next_partition].get.return_value = b"value"
dbs[next_partition].key_may_exist.return_value = (True,)

store._db_for_partition = Mock("_db_for_partition")
store._db_for_partition.return_value = dbs[current_event.message.partition]

store._dbs.update(dbs)
store._dbs_for_key = Mock(name="_dbs_for_key")
store._dbs_for_key.return_value = [dbs[next_partition]]

store.table = Mock(name="table")
store.table.is_global = False
store.table.use_partitioner = False

# A _get call from a stream, to a non-global, non-partitioner, table
# uses partition of event
# Which in this intentional case, is the wrong partition
assert not store._contains(b"key")

store.table.is_global = True
store.table.use_partitioner = False

# A global table ignores the event partition and pulls from the proper db
assert store._contains(b"key")

store.table.is_global = False
store.table.use_partitioner = True

# A custom-partitioned table also ignores the event partition
assert store._contains(b"key")

store.table.is_global = True
store.table.use_partitioner = True

# A global, custom-partitioned table will also ignore the event partition
assert store._contains(b"key")

def test__dbs_for_key(self, *, store):
dbs = store._dbs = {
1: self.new_db("db1"),
Expand Down