-
-
Notifications
You must be signed in to change notification settings - Fork 718
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ensure exceptions in handlers are handled equally for sync and async #4734
Changes from all commits
09ac30a
ecc2cc2
81f9408
e831b3a
eb4c1e8
b9087cf
f7d9931
62825c2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,7 +14,6 @@ | |
|
||
import tblib | ||
from tlz import merge | ||
from tornado import gen | ||
from tornado.ioloop import IOLoop, PeriodicCallback | ||
|
||
import dask | ||
|
@@ -37,6 +36,7 @@ | |
get_traceback, | ||
has_keyword, | ||
is_coroutine_function, | ||
shielded, | ||
truncate_exception, | ||
) | ||
|
||
|
@@ -161,6 +161,7 @@ def __init__( | |
self.counters = None | ||
self.digests = None | ||
self._ongoing_coroutines = weakref.WeakSet() | ||
|
||
self._event_finished = asyncio.Event() | ||
|
||
self.listeners = [] | ||
|
@@ -223,7 +224,7 @@ def set_thread_ident(): | |
self.thread_id = threading.get_ident() | ||
|
||
self.io_loop.add_callback(set_thread_ident) | ||
self._startup_lock = asyncio.Lock() | ||
self._startup_fut = None | ||
self.status = Status.undefined | ||
|
||
self.rpc = ConnectionPool( | ||
|
@@ -264,30 +265,46 @@ async def finished(self): | |
|
||
def __await__(self): | ||
async def _(): | ||
timeout = getattr(self, "death_timeout", 0) | ||
async with self._startup_lock: | ||
if self.status == Status.running: | ||
return self | ||
if self.status in (Status.closing, Status.closed): | ||
# We should never await an object which is already closing but | ||
# we should also not start it up again otherwise we'd produce | ||
# zombies | ||
await self.finished() | ||
return | ||
|
||
timeout = getattr(self, "death_timeout", None) | ||
if self._startup_fut is None: | ||
self._startup_fut = asyncio.ensure_future( | ||
asyncio.wait_for(self._start(), timeout=timeout) | ||
) | ||
await self.rpc.start() | ||
|
||
try: | ||
await self._startup_fut | ||
except Exception: | ||
await self.rpc.close() | ||
# Suppress all exception since the objects might not have been | ||
# properly initialized for close to be successful. | ||
with suppress(Exception): | ||
await self.close() | ||
if timeout: | ||
try: | ||
await asyncio.wait_for(self.start(), timeout=timeout) | ||
self.status = Status.running | ||
except Exception: | ||
await self.close(timeout=1) | ||
raise TimeoutError( | ||
"{} failed to start in {} seconds".format( | ||
type(self).__name__, timeout | ||
) | ||
) | ||
raise TimeoutError( | ||
f"{type(self).__name__} failed to start in {timeout} seconds" | ||
) | ||
else: | ||
await self.start() | ||
self.status = Status.running | ||
raise | ||
return self | ||
|
||
return _().__await__() | ||
|
||
async def _start(self): | ||
"""Child specific logic to define startup behaviour.""" | ||
self.status = Status.running | ||
|
||
async def start(self): | ||
await self.rpc.start() | ||
"""Start the server. Child classes are supposed to overwrite | ||
Server._start for custom startup logic.""" | ||
await self | ||
|
||
async def __aenter__(self): | ||
await self | ||
|
@@ -426,7 +443,11 @@ async def handle_comm(self, comm): | |
|
||
logger.debug("Connection from %r to %s", address, type(self).__name__) | ||
self._comms[comm] = op | ||
|
||
# The server might already be listening even though it is not properly | ||
# started, yet. (e.g. preload modules not done) | ||
await self | ||
|
||
try: | ||
while True: | ||
try: | ||
|
@@ -569,17 +590,15 @@ async def handle_stream(self, comm, extra=None, every_cycle=[]): | |
break | ||
handler = self.stream_handlers[op] | ||
if is_coroutine_function(handler): | ||
self.loop.add_callback(handler, **merge(extra, msg)) | ||
await gen.sleep(0) | ||
await handler(**merge(extra, msg)) | ||
else: | ||
handler(**merge(extra, msg)) | ||
else: | ||
logger.error("odd message %s", msg) | ||
await asyncio.sleep(0) | ||
|
||
for func in every_cycle: | ||
if is_coroutine_function(func): | ||
self.loop.add_callback(func) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I tracked down the dangling task and this was the |
||
await func() | ||
else: | ||
func() | ||
|
||
|
@@ -597,32 +616,39 @@ async def handle_stream(self, comm, extra=None, every_cycle=[]): | |
await comm.close() | ||
assert comm.closed() | ||
|
||
@gen.coroutine | ||
def close(self): | ||
@shielded | ||
async def close(self): | ||
self.status = Status.closing | ||
self.stop() | ||
await self.rpc.close() | ||
|
||
for pc in self.periodic_callbacks.values(): | ||
pc.stop() | ||
self.__stopped = True | ||
for listener in self.listeners: | ||
future = listener.stop() | ||
if inspect.isawaitable(future): | ||
yield future | ||
for i in range(20): | ||
await future | ||
for _ in range(20): | ||
# If there are still handlers running at this point, give them a | ||
# second to finish gracefully themselves, otherwise... | ||
if any(self._comms.values()): | ||
yield asyncio.sleep(0.05) | ||
await asyncio.sleep(0.05) | ||
else: | ||
break | ||
yield [comm.close() for comm in list(self._comms)] # then forcefully close | ||
await asyncio.gather( | ||
*[comm.close() for comm in list(self._comms)] | ||
) # then forcefully close | ||
for cb in self._ongoing_coroutines: | ||
cb.cancel() | ||
for i in range(10): | ||
for _ in range(10): | ||
if all(c.cancelled() for c in self._ongoing_coroutines): | ||
break | ||
else: | ||
yield asyncio.sleep(0.01) | ||
await asyncio.sleep(0.01) | ||
|
||
self._event_finished.set() | ||
self.status = Status.closed | ||
|
||
|
||
def pingpong(comm): | ||
|
@@ -993,6 +1019,8 @@ async def _(): | |
return _().__await__() | ||
|
||
async def start(self): | ||
if self.status is not Status.init: | ||
return | ||
# Invariant: semaphore._value == limit - open - _n_connecting | ||
self.semaphore = asyncio.Semaphore(self.limit) | ||
self.status = Status.running | ||
|
@@ -1022,7 +1050,6 @@ async def connect(self, addr, timeout=None): | |
raise CommClosedError( | ||
f"ConnectionPool not running. Status: {self.status}" | ||
) | ||
|
||
fut = asyncio.ensure_future( | ||
connect( | ||
addr, | ||
|
@@ -1126,6 +1153,7 @@ async def close(self): | |
# run into an exception and raise a commclosed | ||
while self._n_connecting: | ||
await asyncio.sleep(0.005) | ||
await asyncio.sleep(0) | ||
|
||
|
||
def coerce_to_address(o): | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -409,6 +409,9 @@ def __str__(self): | |
|
||
__repr__ = __str__ | ||
|
||
def __await__(self): | ||
return self.start().__await__() | ||
|
||
Comment on lines
+412
to
+414
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This |
||
async def start(self): | ||
await asyncio.gather(*self.workers) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This
await self
without the guard I introduced in__await__
would allow an incoming comm to restart a currently shutting down scheduler. There is a time window while the scheduler is handling its affairs, before the actual server closes, where it still accepts incoming comms. If during this time a new comm is opened, the scheduler is restarted although it is currently trying to shutdown.distributed/distributed/scheduler.py
Lines 3759 to 3808 in d9bc3c6
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't add a dedicated test for this since there are actually many failing tests once the
close
coroutines are shielded