From f3db49387b9929251eff5f6f8dd45857fd120c19 Mon Sep 17 00:00:00 2001 From: Vikram Patki 24489 Date: Tue, 26 Jan 2021 10:06:25 -0500 Subject: [PATCH 1/5] adding performance improvement for getters --- faust/stores/rocksdb.py | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/faust/stores/rocksdb.py b/faust/stores/rocksdb.py index 9d62711c6..fa913dd50 100644 --- a/faust/stores/rocksdb.py +++ b/faust/stores/rocksdb.py @@ -283,15 +283,24 @@ def _open_for_partition(self, partition: int) -> DB: return self.rocksdb_options.open(self.partition_path(partition)) def _get(self, key: bytes) -> Optional[bytes]: - dbvalue = self._get_bucket_for_key(key) - if dbvalue is None: - return None - db, value = dbvalue - - if value is None: - if db.key_may_exist(key)[0]: - return db.get(key) - return value + event = current_event() + if event is not None: + partition = event.message.partition + db = self._db_for_partition(partition) + value = db.get(key) + if value is not None : + self._key_index[key] = partition + return value + else: + dbvalue = self._get_bucket_for_key(key) + if dbvalue is None: + return None + db, value = dbvalue + + if value is None: + if db.key_may_exist(key)[0]: + return db.get(key) + return value def _get_bucket_for_key(self, key: bytes) -> Optional[_DBValueTuple]: dbs: Iterable[PartitionDB] From 9c3ac926fb0979f7a9dfb58b48bdb07900af3910 Mon Sep 17 00:00:00 2001 From: Vikram Patki 24489 Date: Tue, 26 Jan 2021 14:17:23 -0500 Subject: [PATCH 2/5] fix linting error --- faust/__init__.py | 2 +- faust/stores/rocksdb.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/faust/__init__.py b/faust/__init__.py index 0a8f2c465..83f965928 100644 --- a/faust/__init__.py +++ b/faust/__init__.py @@ -23,7 +23,7 @@ import typing from typing import Any, Mapping, NamedTuple, Optional, Sequence, Tuple -__version__ = "0.4.4" +__version__ = "0.4.4rc1" __author__ = "Robinhood Markets, Inc." __contact__ = "schrohm@gmail.com, vpatki@wayfair.com" __homepage__ = "https://github.com/faust-streaming/faust" diff --git a/faust/stores/rocksdb.py b/faust/stores/rocksdb.py index fa913dd50..e7f034535 100644 --- a/faust/stores/rocksdb.py +++ b/faust/stores/rocksdb.py @@ -288,7 +288,7 @@ def _get(self, key: bytes) -> Optional[bytes]: partition = event.message.partition db = self._db_for_partition(partition) value = db.get(key) - if value is not None : + if value is not None: self._key_index[key] = partition return value else: From 1ad2bc7e36fd5d132282d9bf895baa8b1bc9c799 Mon Sep 17 00:00:00 2001 From: Vikram Patki 24489 Date: Tue, 26 Jan 2021 18:13:48 -0500 Subject: [PATCH 3/5] using event context in _contains --- faust/__init__.py | 2 +- faust/stores/rocksdb.py | 18 ++++++++++++++---- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/faust/__init__.py b/faust/__init__.py index 83f965928..5cfbf5da7 100644 --- a/faust/__init__.py +++ b/faust/__init__.py @@ -23,7 +23,7 @@ import typing from typing import Any, Mapping, NamedTuple, Optional, Sequence, Tuple -__version__ = "0.4.4rc1" +__version__ = "0.4.4rc2" __author__ = "Robinhood Markets, Inc." __contact__ = "schrohm@gmail.com, vpatki@wayfair.com" __homepage__ = "https://github.com/faust-streaming/faust" diff --git a/faust/stores/rocksdb.py b/faust/stores/rocksdb.py index e7f034535..8d70d07f4 100644 --- a/faust/stores/rocksdb.py +++ b/faust/stores/rocksdb.py @@ -392,11 +392,21 @@ async def _try_open_db_for_partition( ... def _contains(self, key: bytes) -> bool: - for db in self._dbs_for_key(key): - # bloom filter: false positives possible, but not false negatives - if db.key_may_exist(key)[0] and db.get(key) is not None: + event = current_event() + if event is not None: + partition = event.message.partition + db = self._db_for_partition(partition) + value = db.get(key) + if value is not None: return True - return False + else: + return False + else: + for db in self._dbs_for_key(key): + # bloom filter: false positives possible, but not false negatives + if db.key_may_exist(key)[0] and db.get(key) is not None: + return True + return False def _dbs_for_key(self, key: bytes) -> Iterable[DB]: # Returns cached db if key is in index, otherwise all dbs From 2ed766f89eb1f8b324a5a9ee441e0cb548b4e96a Mon Sep 17 00:00:00 2001 From: Vikram Patki 24489 Date: Wed, 27 Jan 2021 13:19:08 -0500 Subject: [PATCH 4/5] closing all rocksdb files --- faust/__init__.py | 2 +- faust/stores/rocksdb.py | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/faust/__init__.py b/faust/__init__.py index 5cfbf5da7..9388a9b34 100644 --- a/faust/__init__.py +++ b/faust/__init__.py @@ -23,7 +23,7 @@ import typing from typing import Any, Mapping, NamedTuple, Optional, Sequence, Tuple -__version__ = "0.4.4rc2" +__version__ = "0.4.4rc3" __author__ = "Robinhood Markets, Inc." __contact__ = "schrohm@gmail.com, vpatki@wayfair.com" __homepage__ = "https://github.com/faust-streaming/faust" diff --git a/faust/stores/rocksdb.py b/faust/stores/rocksdb.py index 8d70d07f4..5182b9592 100644 --- a/faust/stores/rocksdb.py +++ b/faust/stores/rocksdb.py @@ -383,6 +383,8 @@ async def _try_open_db_for_partition( return self._db_for_partition(partition) except rocksdb.errors.RocksIOError as exc: if i == max_retries - 1 or "lock" not in repr(exc): + # release all the locks and crash + await self.stop() raise self.log.info( "DB for partition %r is locked! Retry in 1s...", partition From 139131a190a108b7a8f7f57c3f4a00d9bb181564 Mon Sep 17 00:00:00 2001 From: Vikram Patki 24489 Date: Thu, 28 Jan 2021 08:31:39 -0500 Subject: [PATCH 5/5] closing all rocksdb files --- faust/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/faust/__init__.py b/faust/__init__.py index 9388a9b34..0a8f2c465 100644 --- a/faust/__init__.py +++ b/faust/__init__.py @@ -23,7 +23,7 @@ import typing from typing import Any, Mapping, NamedTuple, Optional, Sequence, Tuple -__version__ = "0.4.4rc3" +__version__ = "0.4.4" __author__ = "Robinhood Markets, Inc." __contact__ = "schrohm@gmail.com, vpatki@wayfair.com" __homepage__ = "https://github.com/faust-streaming/faust"