From 8630322802de85151e43155d98bf20f15db6ffa0 Mon Sep 17 00:00:00 2001 From: alexanderoberegger Date: Thu, 15 Apr 2021 00:17:24 +0200 Subject: [PATCH 1/3] fix rocksdb use with global tables or tables that use_partitioner to produce to c hangelog topics --- faust/stores/rocksdb.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/faust/stores/rocksdb.py b/faust/stores/rocksdb.py index 9507afc62..00161ea3c 100644 --- a/faust/stores/rocksdb.py +++ b/faust/stores/rocksdb.py @@ -289,7 +289,10 @@ def _open_for_partition(self, partition: int) -> DB: def _get(self, key: bytes) -> Optional[bytes]: event = current_event() - if event is not None: + partition_from_message = (event is not None and + not self.table.is_global and + not self.table.use_partitioner) + if partition_from_message: partition = event.message.partition db = self._db_for_partition(partition) value = db.get(key) From c97e0738b896143c0c3cfa4bf27c8c19867290fa Mon Sep 17 00:00:00 2001 From: alexanderoberegger Date: Fri, 16 Apr 2021 19:16:02 +0200 Subject: [PATCH 2/3] fix linting --- faust/stores/rocksdb.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/faust/stores/rocksdb.py b/faust/stores/rocksdb.py index 00161ea3c..dd4399338 100644 --- a/faust/stores/rocksdb.py +++ b/faust/stores/rocksdb.py @@ -289,9 +289,11 @@ def _open_for_partition(self, partition: int) -> DB: def _get(self, key: bytes) -> Optional[bytes]: event = current_event() - partition_from_message = (event is not None and - not self.table.is_global and - not self.table.use_partitioner) + partition_from_message = ( + event is not None and + not self.table.is_global and + not self.table.use_partitioner + ) if partition_from_message: partition = event.message.partition db = self._db_for_partition(partition) From ab8a4c511e4cf237124ddd953b4e76e5addcfd0a Mon Sep 17 00:00:00 2001 From: alexanderoberegger Date: Fri, 16 Apr 2021 19:18:46 +0200 Subject: [PATCH 3/3] fix linting --- faust/stores/rocksdb.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/faust/stores/rocksdb.py b/faust/stores/rocksdb.py index dd4399338..1c0d64074 100644 --- a/faust/stores/rocksdb.py +++ b/faust/stores/rocksdb.py @@ -290,9 +290,9 @@ def _open_for_partition(self, partition: int) -> DB: def _get(self, key: bytes) -> Optional[bytes]: event = current_event() partition_from_message = ( - event is not None and - not self.table.is_global and - not self.table.use_partitioner + event is not None + and not self.table.is_global + and not self.table.use_partitioner ) if partition_from_message: partition = event.message.partition