Skip to content

Commit

Permalink
Adapt interface I don't like to one I do like
Browse files Browse the repository at this point in the history
  • Loading branch information
brainix committed Dec 5, 2020
1 parent 96f9b8b commit 1bf7f19
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 51 deletions.
15 changes: 15 additions & 0 deletions pottery/executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# --------------------------------------------------------------------------- #
# executor.py #
# #
# Copyright © 2015-2020, Rajiv Bakulesh Shah, original author. #
# All rights reserved. #
# --------------------------------------------------------------------------- #


import concurrent.futures


class BailOutExecutor(concurrent.futures.ThreadPoolExecutor):
def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown(wait=False)
return False
41 changes: 21 additions & 20 deletions pottery/nextid.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

from .base import Primitive
from .exceptions import QuorumNotAchieved
from .executor import BailOutExecutor


_logger: Final[logging.Logger] = logging.getLogger('pottery')
Expand Down Expand Up @@ -149,26 +150,26 @@ def __current_id(self) -> int:

@__current_id.setter
def __current_id(self, value: int) -> None:
executor = concurrent.futures.ThreadPoolExecutor()
futures, num_masters_set, quorum = set(), 0, False
for master in self.masters:
future = executor.submit(
cast(Script, self._set_id_script),
keys=(self.key,),
args=(value,),
client=master,
)
futures.add(future)
for future in concurrent.futures.as_completed(futures):
try:
num_masters_set += future.result() == value
except RedisError as error:
_logger.error(error, exc_info=True)
else:
quorum = num_masters_set >= len(self.masters) // 2 + 1
if quorum: # pragma: no cover
break
executor.shutdown(wait=False)
quorum = False
with BailOutExecutor() as executor:
futures, num_masters_set = set(), 0
for master in self.masters:
future = executor.submit(
cast(Script, self._set_id_script),
keys=(self.key,),
args=(value,),
client=master,
)
futures.add(future)
for future in concurrent.futures.as_completed(futures):
try:
num_masters_set += future.result() == value
except RedisError as error:
_logger.error(error, exc_info=True)
else:
quorum = num_masters_set >= len(self.masters) // 2 + 1
if quorum: # pragma: no cover
break
if not quorum:
raise QuorumNotAchieved(self.masters, self.key)

Expand Down
61 changes: 30 additions & 31 deletions pottery/redlock.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from .exceptions import ExtendUnlockedLock
from .exceptions import ReleaseUnlockedLock
from .exceptions import TooManyExtensions
from .executor import BailOutExecutor
from .timer import ContextTimer


Expand Down Expand Up @@ -248,8 +249,7 @@ def __acquire_masters(self) -> bool:
self._value = os.urandom(self.num_random_bytes)
self._extension_num = 0
quorum, validity_time = False, 0.0
with ContextTimer() as timer:
executor = concurrent.futures.ThreadPoolExecutor()
with ContextTimer() as timer, BailOutExecutor() as executor:
futures, num_masters_acquired = set(), 0
for master in self.masters:
futures.add(executor.submit(self.__acquire_master, master))
Expand All @@ -264,7 +264,6 @@ def __acquire_masters(self) -> bool:
elapsed = timer.elapsed() - self.__drift()
validity_time = self.auto_release_time - elapsed
break
executor.shutdown(wait=False)
if quorum and max(validity_time, 0):
return True
else:
Expand Down Expand Up @@ -400,20 +399,20 @@ def extend(self) -> None:
if self._extension_num >= self.num_extensions:
raise TooManyExtensions(self.masters, self.key)
else:
executor = concurrent.futures.ThreadPoolExecutor()
futures, num_masters_extended, quorum = set(), 0, False
for master in self.masters:
futures.add(executor.submit(self.__extend_master, master))
for future in concurrent.futures.as_completed(futures):
try:
num_masters_extended += future.result()
except RedisError as error: # pragma: no cover
_logger.error(error, exc_info=True)
else:
quorum = num_masters_extended >= len(self.masters) // 2 + 1
if quorum:
break
executor.shutdown(wait=False)
quorum = False
with BailOutExecutor() as executor:
futures, num_masters_extended = set(), 0
for master in self.masters:
futures.add(executor.submit(self.__extend_master, master))
for future in concurrent.futures.as_completed(futures):
try:
num_masters_extended += future.result()
except RedisError as error: # pragma: no cover
_logger.error(error, exc_info=True)
else:
quorum = num_masters_extended >= len(self.masters) // 2 + 1
if quorum:
break
self._extension_num += quorum
if not quorum:
raise ExtendUnlockedLock(self.masters, self.key)
Expand All @@ -434,20 +433,20 @@ def release(self) -> None:
>>> bool(printer_lock.locked())
False
'''
executor = concurrent.futures.ThreadPoolExecutor()
futures, num_masters_released, quorum = set(), 0, False
for master in self.masters:
futures.add(executor.submit(self.__release_master, master))
for future in concurrent.futures.as_completed(futures):
try:
num_masters_released += future.result()
except RedisError as error: # pragma: no cover
_logger.error(error, exc_info=True)
else:
quorum = num_masters_released >= len(self.masters) // 2 + 1
if quorum:
break
executor.shutdown(wait=False)
quorum = False
with BailOutExecutor() as executor:
futures, num_masters_released = set(), 0
for master in self.masters:
futures.add(executor.submit(self.__release_master, master))
for future in concurrent.futures.as_completed(futures):
try:
num_masters_released += future.result()
except RedisError as error: # pragma: no cover
_logger.error(error, exc_info=True)
else:
quorum = num_masters_released >= len(self.masters) // 2 + 1
if quorum:
break
if not quorum:
raise ReleaseUnlockedLock(self.masters, self.key)

Expand Down

0 comments on commit 1bf7f19

Please sign in to comment.