From 1bf7f199061b84ba9e32989cf9b03b395e30d331 Mon Sep 17 00:00:00 2001 From: Raj Shah Date: Sat, 5 Dec 2020 09:10:08 -0800 Subject: [PATCH] Adapt interface I don't like to one I do like https://youtu.be/wf-BqAjZb8M --- pottery/executor.py | 15 +++++++++++ pottery/nextid.py | 41 +++++++++++++++--------------- pottery/redlock.py | 61 ++++++++++++++++++++++----------------------- 3 files changed, 66 insertions(+), 51 deletions(-) create mode 100644 pottery/executor.py diff --git a/pottery/executor.py b/pottery/executor.py new file mode 100644 index 00000000..f1a0bb0d --- /dev/null +++ b/pottery/executor.py @@ -0,0 +1,15 @@ +# --------------------------------------------------------------------------- # +# executor.py # +# # +# Copyright © 2015-2020, Rajiv Bakulesh Shah, original author. # +# All rights reserved. # +# --------------------------------------------------------------------------- # + + +import concurrent.futures + + +class BailOutExecutor(concurrent.futures.ThreadPoolExecutor): + def __exit__(self, exc_type, exc_val, exc_tb): + self.shutdown(wait=False) + return False diff --git a/pottery/nextid.py b/pottery/nextid.py index 0f42a136..81e4f62e 100644 --- a/pottery/nextid.py +++ b/pottery/nextid.py @@ -29,6 +29,7 @@ from .base import Primitive from .exceptions import QuorumNotAchieved +from .executor import BailOutExecutor _logger: Final[logging.Logger] = logging.getLogger('pottery') @@ -149,26 +150,26 @@ def __current_id(self) -> int: @__current_id.setter def __current_id(self, value: int) -> None: - executor = concurrent.futures.ThreadPoolExecutor() - futures, num_masters_set, quorum = set(), 0, False - for master in self.masters: - future = executor.submit( - cast(Script, self._set_id_script), - keys=(self.key,), - args=(value,), - client=master, - ) - futures.add(future) - for future in concurrent.futures.as_completed(futures): - try: - num_masters_set += future.result() == value - except RedisError as error: - _logger.error(error, exc_info=True) - else: - quorum = num_masters_set >= len(self.masters) // 2 + 1 - if quorum: # pragma: no cover - break - executor.shutdown(wait=False) + quorum = False + with BailOutExecutor() as executor: + futures, num_masters_set = set(), 0 + for master in self.masters: + future = executor.submit( + cast(Script, self._set_id_script), + keys=(self.key,), + args=(value,), + client=master, + ) + futures.add(future) + for future in concurrent.futures.as_completed(futures): + try: + num_masters_set += future.result() == value + except RedisError as error: + _logger.error(error, exc_info=True) + else: + quorum = num_masters_set >= len(self.masters) // 2 + 1 + if quorum: # pragma: no cover + break if not quorum: raise QuorumNotAchieved(self.masters, self.key) diff --git a/pottery/redlock.py b/pottery/redlock.py index 61104a7a..3a27fb7a 100644 --- a/pottery/redlock.py +++ b/pottery/redlock.py @@ -44,6 +44,7 @@ from .exceptions import ExtendUnlockedLock from .exceptions import ReleaseUnlockedLock from .exceptions import TooManyExtensions +from .executor import BailOutExecutor from .timer import ContextTimer @@ -248,8 +249,7 @@ def __acquire_masters(self) -> bool: self._value = os.urandom(self.num_random_bytes) self._extension_num = 0 quorum, validity_time = False, 0.0 - with ContextTimer() as timer: - executor = concurrent.futures.ThreadPoolExecutor() + with ContextTimer() as timer, BailOutExecutor() as executor: futures, num_masters_acquired = set(), 0 for master in self.masters: futures.add(executor.submit(self.__acquire_master, master)) @@ -264,7 +264,6 @@ def __acquire_masters(self) -> bool: elapsed = timer.elapsed() - self.__drift() validity_time = self.auto_release_time - elapsed break - executor.shutdown(wait=False) if quorum and max(validity_time, 0): return True else: @@ -400,20 +399,20 @@ def extend(self) -> None: if self._extension_num >= self.num_extensions: raise TooManyExtensions(self.masters, self.key) else: - executor = concurrent.futures.ThreadPoolExecutor() - futures, num_masters_extended, quorum = set(), 0, False - for master in self.masters: - futures.add(executor.submit(self.__extend_master, master)) - for future in concurrent.futures.as_completed(futures): - try: - num_masters_extended += future.result() - except RedisError as error: # pragma: no cover - _logger.error(error, exc_info=True) - else: - quorum = num_masters_extended >= len(self.masters) // 2 + 1 - if quorum: - break - executor.shutdown(wait=False) + quorum = False + with BailOutExecutor() as executor: + futures, num_masters_extended = set(), 0 + for master in self.masters: + futures.add(executor.submit(self.__extend_master, master)) + for future in concurrent.futures.as_completed(futures): + try: + num_masters_extended += future.result() + except RedisError as error: # pragma: no cover + _logger.error(error, exc_info=True) + else: + quorum = num_masters_extended >= len(self.masters) // 2 + 1 + if quorum: + break self._extension_num += quorum if not quorum: raise ExtendUnlockedLock(self.masters, self.key) @@ -434,20 +433,20 @@ def release(self) -> None: >>> bool(printer_lock.locked()) False ''' - executor = concurrent.futures.ThreadPoolExecutor() - futures, num_masters_released, quorum = set(), 0, False - for master in self.masters: - futures.add(executor.submit(self.__release_master, master)) - for future in concurrent.futures.as_completed(futures): - try: - num_masters_released += future.result() - except RedisError as error: # pragma: no cover - _logger.error(error, exc_info=True) - else: - quorum = num_masters_released >= len(self.masters) // 2 + 1 - if quorum: - break - executor.shutdown(wait=False) + quorum = False + with BailOutExecutor() as executor: + futures, num_masters_released = set(), 0 + for master in self.masters: + futures.add(executor.submit(self.__release_master, master)) + for future in concurrent.futures.as_completed(futures): + try: + num_masters_released += future.result() + except RedisError as error: # pragma: no cover + _logger.error(error, exc_info=True) + else: + quorum = num_masters_released >= len(self.masters) // 2 + 1 + if quorum: + break if not quorum: raise ReleaseUnlockedLock(self.masters, self.key)