Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding performance improvement for getters #87

Merged
merged 5 commits into from
Jan 28, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 33 additions & 12 deletions faust/stores/rocksdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,15 +283,24 @@ def _open_for_partition(self, partition: int) -> DB:
return self.rocksdb_options.open(self.partition_path(partition))

def _get(self, key: bytes) -> Optional[bytes]:
dbvalue = self._get_bucket_for_key(key)
if dbvalue is None:
return None
db, value = dbvalue
event = current_event()
if event is not None:
partition = event.message.partition
db = self._db_for_partition(partition)
value = db.get(key)
if value is not None:
self._key_index[key] = partition
return value
else:
dbvalue = self._get_bucket_for_key(key)
if dbvalue is None:
return None
db, value = dbvalue

if value is None:
if db.key_may_exist(key)[0]:
return db.get(key)
return value
if value is None:
if db.key_may_exist(key)[0]:
return db.get(key)
return value

def _get_bucket_for_key(self, key: bytes) -> Optional[_DBValueTuple]:
dbs: Iterable[PartitionDB]
Expand Down Expand Up @@ -374,6 +383,8 @@ async def _try_open_db_for_partition(
return self._db_for_partition(partition)
except rocksdb.errors.RocksIOError as exc:
if i == max_retries - 1 or "lock" not in repr(exc):
# release all the locks and crash
await self.stop()
raise
self.log.info(
"DB for partition %r is locked! Retry in 1s...", partition
Expand All @@ -383,11 +394,21 @@ async def _try_open_db_for_partition(
...

def _contains(self, key: bytes) -> bool:
for db in self._dbs_for_key(key):
# bloom filter: false positives possible, but not false negatives
if db.key_may_exist(key)[0] and db.get(key) is not None:
event = current_event()
if event is not None:
partition = event.message.partition
db = self._db_for_partition(partition)
value = db.get(key)
if value is not None:
return True
return False
else:
return False
else:
for db in self._dbs_for_key(key):
# bloom filter: false positives possible, but not false negatives
if db.key_may_exist(key)[0] and db.get(key) is not None:
return True
return False

def _dbs_for_key(self, key: bytes) -> Iterable[DB]:
# Returns cached db if key is in index, otherwise all dbs
Expand Down