From ccb46c04e77185be539e4f4d63630a89afae2534 Mon Sep 17 00:00:00 2001 From: Greg Trent Date: Mon, 3 May 2021 13:16:54 -0600 Subject: [PATCH 1/2] fix _contains in rocksdb for global tables --- faust/stores/rocksdb.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/faust/stores/rocksdb.py b/faust/stores/rocksdb.py index 1c0d64074..94440ad75 100644 --- a/faust/stores/rocksdb.py +++ b/faust/stores/rocksdb.py @@ -433,7 +433,12 @@ async def _try_open_db_for_partition( def _contains(self, key: bytes) -> bool: event = current_event() - if event is not None: + partition_from_message = ( + event is not None + and not self.table.is_global + and not self.table.use_partitioner + ) + if partition_from_message: partition = event.message.partition db = self._db_for_partition(partition) value = db.get(key) From 27dc3c161ab98a0dd612e1659b3be2e63c00f7a8 Mon Sep 17 00:00:00 2001 From: Greg Trent Date: Mon, 3 May 2021 23:35:03 -0600 Subject: [PATCH 2/2] Add tests covering global tables in rocksdb --- tests/unit/stores/test_rocksdb.py | 154 ++++++++++++++++++++++++++++++ 1 file changed, 154 insertions(+) diff --git a/tests/unit/stores/test_rocksdb.py b/tests/unit/stores/test_rocksdb.py index 8041922e6..bde79685b 100644 --- a/tests/unit/stores/test_rocksdb.py +++ b/tests/unit/stores/test_rocksdb.py @@ -244,6 +244,7 @@ def test__get__missing(self, *, store): assert store._get(b"key") is None def test__get(self, *, store): + # Tests scenario where event is None, (db, value) contains value db = Mock(name="db") value = b"foo" store._get_bucket_for_key = Mock(name="get_bucket_for_key") @@ -266,6 +267,76 @@ def test__get__dbvalue_is_None(self, *, store): db.get.return_value = b"bar" assert store._get(b"key") == b"bar" + def test__get__has_event(self, *, store, current_event): + partition = 1 + message = Mock(name="message") + message.partition.return_value = partition + + current_event.return_value = message + + db = Mock(name="db") + store._db_for_partition = Mock("_db_for_partition") + store._db_for_partition.return_value = db + db.get.return_value = b"value" + store.table = Mock(name="table") + store.table.is_global = False + store.table.use_partitioner = False + + assert store._get(b"key") == b"value" + + db.get.return_value = None + assert store._get(b"key2") is None + + def test__get__has_event_value_diff_partition(self, *, store, current_event): + partition = 1 + message = Mock(name="message") + message.partition.return_value = partition + + current_event.return_value = message + + dbs = {} + event_partition = current_event.message.partition + next_partition = event_partition + 1 + dbs[event_partition] = Mock(name="db") + dbs[next_partition] = Mock(name="db") + + dbs[event_partition].get.return_value = None + dbs[next_partition].get.return_value = b"value" + + store._db_for_partition = Mock("_db_for_partition") + store._db_for_partition.return_value = dbs[current_event.message.partition] + + store._dbs.update(dbs) + store._get_bucket_for_key = Mock(name="get_bucket_for_key") + store._get_bucket_for_key.return_value = (dbs[next_partition], b"value") + + store.table = Mock(name="table") + store.table.is_global = False + store.table.use_partitioner = False + + # A _get call from a stream, to a non-global, non-partitioner, table + # uses partition of event + # Which in this intentional case, is the wrong partition + assert store._get(b"key") is None + + store.table.is_global = True + store.table.use_partitioner = False + + # A global table ignores the event partition and pulls from the proper db + assert store._get(b"key") == b"value" + + store.table.is_global = False + store.table.use_partitioner = True + + # A custom-partitioned table also ignores the event partition + assert store._get(b"key") == b"value" + + store.table.is_global = True + store.table.use_partitioner = True + + # A global, custom-partitioned table will also ignore the event partition + assert store._get(b"key") == b"value" + def test_get_bucket_for_key__is_in_index(self, *, store): store._key_index[b"key"] = 30 db = store._dbs[30] = Mock(name="db-p30") @@ -415,6 +486,89 @@ def test__contains(self, *, store): db2.get.return_value = b"value" assert store._contains(b"key") + def test__contains__has_event(self, *, store, current_event): + # Test "happy-path", call comes in from stream on same partition as key + partition = 1 + message = Mock(name="message") + message.partition.return_value = partition + + current_event.return_value = message + + store.table = Mock(name="table") + store.table.is_global = False + store.table.use_partitioner = False + + dbs = {} + event_partition = current_event.message.partition + next_partition = event_partition + 1 + dbs[event_partition] = Mock(name="db") + dbs[next_partition] = Mock(name="db") + + dbs[event_partition].get.return_value = b"value" + dbs[event_partition].key_may_exist.return_value = (True,) + dbs[next_partition].get.return_value = False + dbs[next_partition].key_may_exist.return_value = (False,) + + store._db_for_partition = Mock("_db_for_partition") + store._db_for_partition.return_value = dbs[current_event.message.partition] + + # A _get call from a stream, to a non-global, non-partitioner, table + # uses partition of event + # Which in this intentional case, is the wrong partition + assert store._contains(b"key") + + def test__contains__has_event_value_diff_partition(self, *, store, current_event): + partition = 1 + message = Mock(name="message") + message.partition.return_value = partition + + current_event.return_value = message + + dbs = {} + event_partition = current_event.message.partition + next_partition = event_partition + 1 + dbs[event_partition] = Mock(name="db") + dbs[next_partition] = Mock(name="db") + + dbs[event_partition].get.return_value = None + dbs[event_partition].key_may_exist.return_value = False + dbs[next_partition].get.return_value = b"value" + dbs[next_partition].key_may_exist.return_value = (True,) + + store._db_for_partition = Mock("_db_for_partition") + store._db_for_partition.return_value = dbs[current_event.message.partition] + + store._dbs.update(dbs) + store._dbs_for_key = Mock(name="_dbs_for_key") + store._dbs_for_key.return_value = [dbs[next_partition]] + + store.table = Mock(name="table") + store.table.is_global = False + store.table.use_partitioner = False + + # A _get call from a stream, to a non-global, non-partitioner, table + # uses partition of event + # Which in this intentional case, is the wrong partition + assert not store._contains(b"key") + + store.table.is_global = True + store.table.use_partitioner = False + + # A global table ignores the event partition and pulls from the proper db + assert store._contains(b"key") + + store.table.is_global = False + store.table.use_partitioner = True + + # A custom-partitioned table also ignores the event partition + assert store._contains(b"key") + + store.table.is_global = True + store.table.use_partitioner = True + + # A global, custom-partitioned table will also ignore the event partition + assert store._contains(b"key") + def test__dbs_for_key(self, *, store): dbs = store._dbs = { 1: self.new_db("db1"),