From 04a749556a162d2070fcb890c945f3f00c762ea0 Mon Sep 17 00:00:00 2001 From: Raj Shah Date: Fri, 4 Dec 2020 00:33:14 -0800 Subject: [PATCH 1/2] Improve logging for distributed algorithms Wrap each `future.result()` call in a `try`/`except`. Then in the `except` block, log the exception but don't re-raise it. This keeps our distributed algorithms robust, but gives us exception logging. --- pottery/nextid.py | 22 ++++++++++++---------- pottery/redlock.py | 25 +++++++++++++++---------- 2 files changed, 27 insertions(+), 20 deletions(-) diff --git a/pottery/nextid.py b/pottery/nextid.py index 43ceb737..b222385d 100644 --- a/pottery/nextid.py +++ b/pottery/nextid.py @@ -24,8 +24,6 @@ from redis import Redis from redis.client import Script -from redis.exceptions import ConnectionError -from redis.exceptions import TimeoutError from typing_extensions import Final from .base import Primitive @@ -77,7 +75,8 @@ class NextId(Primitive): _set_id_script: ClassVar[Optional[Script]] = None def __init__(self, - *, key: str = KEY, + *, + key: str = KEY, num_tries: int = NUM_TRIES, masters: Iterable[Redis] = frozenset(), ) -> None: @@ -132,18 +131,19 @@ def __repr__(self) -> str: @property def __current_id(self) -> int: - futures, current_id, num_masters_gotten = set(), 0, 0 + futures, current_ids = set(), [] with concurrent.futures.ThreadPoolExecutor() as executor: for master in self.masters: futures.add(executor.submit(master.get, self.key)) for future in concurrent.futures.as_completed(futures): - with contextlib.suppress(TimeoutError, ConnectionError): - current_id = max(current_id, int(future.result())) - num_masters_gotten += 1 - if num_masters_gotten < len(self.masters) // 2 + 1: + try: + current_ids.append(int(future.result())) + except Exception as error: + _logger.error(error, exc_info=True) + if len(current_ids) < len(self.masters) // 2 + 1: raise QuorumNotAchieved(self.masters, self.key) else: - return current_id + return max(current_ids) @__current_id.setter def __current_id(self, value: int) -> None: @@ -158,8 +158,10 @@ def __current_id(self, value: int) -> None: ) futures.add(future) for future in concurrent.futures.as_completed(futures): - with contextlib.suppress(TimeoutError, ConnectionError): + try: num_masters_set += future.result() == value + except Exception as error: + _logger.error(error, exc_info=True) if num_masters_set < len(self.masters) // 2 + 1: raise QuorumNotAchieved(self.masters, self.key) diff --git a/pottery/redlock.py b/pottery/redlock.py index f506fdc3..b3dc340f 100644 --- a/pottery/redlock.py +++ b/pottery/redlock.py @@ -37,8 +37,6 @@ from redis import Redis from redis.client import Script -from redis.exceptions import ConnectionError -from redis.exceptions import TimeoutError from typing_extensions import Final from .base import Primitive @@ -254,8 +252,10 @@ def __acquire_masters(self) -> bool: for master in self.masters: futures.add(executor.submit(self.__acquire_master, master)) for future in concurrent.futures.as_completed(futures): - with contextlib.suppress(TimeoutError, ConnectionError): + try: num_masters_acquired += future.result() + except Exception as error: # pragma: no cover + _logger.error(error, exc_info=True) quorum = num_masters_acquired >= len(self.masters) // 2 + 1 elapsed = timer.elapsed() - self.__drift() validity_time = self.auto_release_time - elapsed @@ -351,16 +351,17 @@ def locked(self) -> int: >>> printer_lock_1.release() ''' - futures, num_masters_acquired, ttls = set(), 0, [] + futures, ttls = set(), [] with ContextTimer() as timer, \ concurrent.futures.ThreadPoolExecutor() as executor: for master in self.masters: futures.add(executor.submit(self.__acquired_master, master)) for future in concurrent.futures.as_completed(futures): - with contextlib.suppress(TimeoutError, ConnectionError): - ttl = future.result() - num_masters_acquired += ttl > 0 - ttls.append(ttl) + try: + ttls.append(future.result()) + except Exception as error: # pragma: no cover + _logger.error(error, exc_info=True) + num_masters_acquired = sum(1 for ttl in ttls if ttl > 0) quorum = num_masters_acquired >= len(self.masters) // 2 + 1 if quorum: ttls = sorted(ttls, reverse=True) @@ -398,8 +399,10 @@ def extend(self) -> None: for master in self.masters: futures.add(executor.submit(self.__extend_master, master)) for future in concurrent.futures.as_completed(futures): - with contextlib.suppress(TimeoutError, ConnectionError): + try: num_masters_extended += future.result() + except Exception as error: # pragma: no cover + _logger.error(error, exc_info=True) quorum = num_masters_extended >= len(self.masters) // 2 + 1 self._extension_num += quorum if not quorum: @@ -426,8 +429,10 @@ def release(self) -> None: for master in self.masters: futures.add(executor.submit(self.__release_master, master)) for future in concurrent.futures.as_completed(futures): - with contextlib.suppress(TimeoutError, ConnectionError): + try: num_masters_released += future.result() + except Exception as error: # pragma: no cover + _logger.error(error, exc_info=True) quorum = num_masters_released >= len(self.masters) // 2 + 1 if not quorum: raise ReleaseUnlockedLock(self.masters, self.key) From 647fe462a9e7ef142d5645c5ba2116736914b317 Mon Sep 17 00:00:00 2001 From: Raj Shah Date: Fri, 4 Dec 2020 00:41:01 -0800 Subject: [PATCH 2/2] Use well named variable to make code more readable --- pottery/nextid.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pottery/nextid.py b/pottery/nextid.py index b222385d..ef94c2a2 100644 --- a/pottery/nextid.py +++ b/pottery/nextid.py @@ -140,7 +140,8 @@ def __current_id(self) -> int: current_ids.append(int(future.result())) except Exception as error: _logger.error(error, exc_info=True) - if len(current_ids) < len(self.masters) // 2 + 1: + num_masters_gotten = len(current_ids) + if num_masters_gotten < len(self.masters) // 2 + 1: raise QuorumNotAchieved(self.masters, self.key) else: return max(current_ids)