Ensure exceptions in handlers are handled equally for sync and async #4734
Conversation
Test failure in
```python
@@ -577,7 +575,7 @@ async def handle_stream(self, comm, extra=None, every_cycle=[]):

        for func in every_cycle:
            if is_coroutine_function(func):
                self.loop.add_callback(func)
```
I tracked down the dangling task: it was the ensure_computing callback scheduled here, which uses the async interface to offload deserialisation. This was introduced in #4307 just before the infamous 2020.12.0 release.
This situation is a bit awkward and I see two options.

Note: this problem pops up in many places in the worker, so solving 2.) once might benefit other situations as well. @mrocklin Any thoughts on this? Do we have any asyncio/tornado experts we can loop into this discussion?

Note: The PR grew again in scope. I ignored the warning in the gen.coroutine comment and figured it was easy to rewrite. It turns out this introduces race conditions during closing, since gen.coroutine and plain async coroutines differ in the way they are cancelled (or not). I will break this off into another PR. (At least I hope this is the explanation and the changes can be decoupled :/)
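To make the "dangling task" failure mode concrete, here is a minimal, self-contained sketch (the names `background_work` and `handle_stream_like` are hypothetical, not from the PR): an exception raised in a fire-and-forget task never reaches the caller and sits unobserved on the task object.

```python
import asyncio

async def background_work():
    # a handler body that fails; hypothetical name for illustration
    raise RuntimeError("boom")

async def handle_stream_like(loop):
    # fire-and-forget scheduling, comparable to self.loop.add_callback(func)
    return loop.create_task(background_work())

async def main():
    loop = asyncio.get_running_loop()
    task = await handle_stream_like(loop)
    await asyncio.sleep(0)  # let the task run; the caller sees no error
    # the exception is parked on the task, unobserved by the caller
    return task.done() and isinstance(task.exception(), RuntimeError)

print(asyncio.run(main()))  # True
```

If nothing ever calls `task.exception()`, the error only surfaces, if at all, as a "Task exception was never retrieved" log line when the task is garbage collected.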
I'm not sure that this is true. There is a handle_stream coroutine for every comm. Comms are most often used with the

```python
## caller side
await worker.some_handler(arg=foo)
```

The caller side generally waits until the call to the handler completes before moving on, so the backup won't be inside the socket on the receiver side; it will be in application code on the caller side, which seems safe. Previously this call would have completed very quickly and now it will take longer. That's probably ok. In summary, if things aren't breaking here I think that we're ok.

In principle this change feels safe to me. I wouldn't be surprised if it did break some things, but even if it did, those things were probably broken beforehand and relying on bad logic in the worker. For example, if the caller side was doing the following:

```python
await worker.some_handler(arg=1)
await worker.some_handler(arg=2)
await worker.some_handler(arg=3)
await worker.some_handler(arg=4)
```

then with async handlers this would have previously run much more quickly than it should have. In a situation like the following:

```python
await asyncio.gather(
    worker.some_handler(arg=1),
    worker.some_handler(arg=2),
    ...
)
```

the caller side will actually open many comms to the server to handle the requests in parallel, and so we'll get many.

(Usual disclaimer of "I'm a bit rusty on all of this, so please don't believe me")
I'm mostly concerned about the primary link between scheduler and worker, which is based on a single comm. Most of the communication between the two is done via this single comm using a

However, with the exception of
Yeah, anything sent by a BatchedSend is intended to run immediately. If you wanted to be careful though you could add a read into the
```python
@@ -422,7 +430,7 @@ async def handle_comm(self, comm):

        logger.debug("Connection from %r to %s", address, type(self).__name__)
        self._comms[comm] = op
        await self
```
This `await self`, without the guard I introduced in `__await__`, would allow an incoming comm to restart a scheduler that is currently shutting down. There is a time window, while the scheduler is handling its affairs before the actual server closes, in which it still accepts incoming comms. If a new comm is opened during this time, the scheduler is restarted even though it is trying to shut down.
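A minimal sketch of the kind of guard described here, using a hypothetical `ToyServer` class (not the actual `distributed.core.Server` implementation): `__await__` checks the status first, so awaiting the server while it is shutting down does not restart it.

```python
import asyncio
from enum import Enum

class Status(Enum):
    undefined = "undefined"
    running = "running"
    closing = "closing"

class ToyServer:
    # hypothetical miniature server, for illustration only
    def __init__(self):
        self.status = Status.undefined

    async def start(self):
        self.status = Status.running
        return self

    def __await__(self):
        async def _start():
            # guard: never (re)start while a shutdown is in progress
            if self.status == Status.closing:
                return self
            if self.status != Status.running:
                await self.start()
            return self
        return _start().__await__()

async def demo():
    s = ToyServer()
    s.status = Status.closing  # shutdown already in progress
    await s                    # without the guard this would restart it
    return s.status

print(asyncio.run(demo()))  # Status.closing
```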
distributed/distributed/scheduler.py
Lines 3759 to 3808 in d9bc3c6
```python
self.status = Status.closing
logger.info("Scheduler closing...")
setproctitle("dask-scheduler [closing]")

for preload in self.preloads:
    await preload.teardown()

if close_workers:
    await self.broadcast(msg={"op": "close_gracefully"}, nanny=True)
    for worker in parent._workers_dv:
        self.worker_send(worker, {"op": "close"})
    for i in range(20):  # wait a second for send signals to clear
        if parent._workers_dv:
            await asyncio.sleep(0.05)
        else:
            break

await asyncio.gather(*[plugin.close() for plugin in self.plugins])

for pc in self.periodic_callbacks.values():
    pc.stop()
self.periodic_callbacks.clear()

self.stop_services()

for ext in parent._extensions.values():
    with suppress(AttributeError):
        ext.teardown()
logger.info("Scheduler closing all comms")

futures = []
for w, comm in list(self.stream_comms.items()):
    if not comm.closed():
        comm.send({"op": "close", "report": False})
        comm.send({"op": "close-stream"})
    with suppress(AttributeError):
        futures.append(comm.close())

for future in futures:  # TODO: do all at once
    await future

for comm in self.client_comms.values():
    comm.abort()

await self.rpc.close()

self.status = Status.closed
self.stop()
await super().close()
```
I didn't add a dedicated test for this since there are actually many failing tests once the close coroutines are shielded.
```python
def shielded(func):
    """
    Shield the decorated method or function from cancellation. Note that the
    decorated coroutine will be scheduled immediately as a task when the
    decorated function is invoked.

    See also https://docs.python.org/3/library/asyncio-task.html#asyncio.shield
    """

    @wraps(func)
    def _(*args, **kwargs):
        return asyncio.shield(func(*args, **kwargs))

    return _
```
This is not absolutely required, but with this decorator we can define methods which are always shielded (e.g. close) instead of relying on remembering to shield them on every invocation.
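As an illustration of what the decorator buys, here is a self-contained example (the `close` coroutine below is a stand-in, not the real method): cancelling the outer future returned by a `@shielded` method does not cancel the inner task, which runs to completion.

```python
import asyncio
from functools import wraps

def shielded(func):
    # same shape as the decorator in this PR
    @wraps(func)
    def _(*args, **kwargs):
        return asyncio.shield(func(*args, **kwargs))
    return _

events = []

@shielded
async def close():  # stand-in for a real close() method
    events.append("started")
    await asyncio.sleep(0.01)
    events.append("finished")

async def main():
    fut = asyncio.ensure_future(close())
    await asyncio.sleep(0)     # let close() begin
    fut.cancel()               # cancels only the outer (shield) future
    await asyncio.sleep(0.05)  # the inner task runs to completion anyway
    return events

print(asyncio.run(main()))  # ['started', 'finished']
```

Without the shield, `fut.cancel()` would have cancelled `close()` mid-flight and `"finished"` would never be appended.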
I'm curious about the reason for a few of the changes here. Also, do we know why tests aren't happy yet?
distributed/deploy/spec.py
Outdated
```python
            for w in to_close
            if w in self.workers
        ]
    tasks = [self.workers[w].close() for w in to_close if w in self.workers]
```
Can we call asyncio.wait here and avoid the tasks variable entirely?
sure
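For illustration, a rough sketch of the suggested shape, with a hypothetical `_Worker` class standing in for the objects in `self.workers` (note that `asyncio.wait` raises on an empty collection, so a guard is still needed, and since passing bare coroutines to `asyncio.wait` is deprecated they are wrapped in futures here):

```python
import asyncio

class _Worker:
    # hypothetical stand-in for the worker objects in self.workers
    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True

async def close_selected(workers, to_close):
    # the suggested shape: feed the comprehension straight to asyncio.wait
    futs = [
        asyncio.ensure_future(workers[w].close())
        for w in to_close
        if w in workers
    ]
    if futs:  # asyncio.wait raises ValueError on an empty collection
        await asyncio.wait(futs)

workers = {"a": _Worker(), "b": _Worker()}
asyncio.run(close_selected(workers, ["a", "b", "missing"]))
print(workers["a"].closed, workers["b"].closed)  # True True
```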
```python
    try:
        if self.process is not None:
            await self.kill(timeout=timeout)
    except Exception:
        pass
    self.process = None
    await self.rpc.close()
```
Why this change?
Same argument as above about cleaning up.
```python
@@ -506,18 +509,15 @@ async def close(self, comm=None, timeout=5, report=None):
        for preload in self.preloads:
            await preload.teardown()

        self.stop()
```
Why this change?
It is redundant since we do the same in Server.close, and I felt removing it would reduce complexity. If there is some purpose to stopping earlier, I can revert this logic. This is only my sense of cleaning up.
```python
    await self.rpc.close()

    self.status = Status.closed
    self.stop()
```
Why this change?
This is performed by super().close(). Since nothing else happens in between, letting the server close itself felt like the right thing to do to reduce complexity.
```python
    *[
        self.remove_worker(address=w, safe=True, close=False)
        for w in worker_keys
    ]
```
Why this change?
This change adds close=False (the default is True). Without it, most situations would issue a close_worker call twice, since there is a dedicated close_worker call above. It might be more correct to change this to close=not close_workers to avoid the duplication.
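A toy model of the duplication argument (all names below are hypothetical stand-ins, not the scheduler's real methods): with close=not close_workers, workers that already received a dedicated close are not closed a second time by remove_worker.

```python
import asyncio

log = []

async def close_worker(addr):
    # stand-in for the dedicated close_worker call above
    log.append(("close", addr))

async def remove_worker(addr, close=True):
    if close:
        log.append(("close", addr))  # would close the worker a second time
    log.append(("remove", addr))

async def shutdown(worker_keys, close_workers=True):
    if close_workers:
        for w in worker_keys:
            await close_worker(w)
    # the suggested fix: close=not close_workers avoids the duplicate close
    await asyncio.gather(
        *[remove_worker(w, close=not close_workers) for w in worker_keys]
    )

asyncio.run(shutdown(["w1"]))
print(log)  # [('close', 'w1'), ('remove', 'w1')] -- exactly one close per worker
```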
I'm hitting a lot of known "flaky" tests here (cannot reproduce locally, yet)
```python
    def __await__(self):
        return self.start().__await__()
```
This MultiWorker is a bit strange since it isn't a Server, although Workers are always Servers.
Unless the task is entirely forgotten by the worker, it is confusing behaviour if the dependent of a task is removed once it finishes executing. If this information is required, a dedicated dynamic attribute should be used, like waiters on the scheduler side.
If not awaited, the server might serve handlers before it is properly initialized. For instance, plugins and preload modules might not be fully loaded before that happens.
I do not have a strong opinion at the moment about what should happen if a handler raises an exception, but regardless, I believe an async and a sync handler should show the same behaviour. Anything else is pretty hard to distinguish for any developer.

One easy way to achieve this is to simply await the handler during handle_stream. I am not sure if it was implemented this way in the hope that messages would be worked off faster, or what the motivation behind it was.

If we want to stick to the old add_callback we need to implement exception handling on the returned future/task.
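If the fire-and-forget route is kept, one conventional way to implement the missing exception handling is a done-callback that retrieves the task's exception. A minimal sketch under that assumption (`failing_handler`, `handle_stream_like`, and `_observe` are illustrative names, not PR code):

```python
import asyncio

captured = []

def _observe(task):
    # done-callback that retrieves the exception so it is not lost
    if not task.cancelled() and task.exception() is not None:
        captured.append(task.exception())

async def failing_handler():  # illustrative handler
    raise ValueError("handler failed")

async def handle_stream_like():
    # keep fire-and-forget scheduling, but observe the outcome
    task = asyncio.ensure_future(failing_handler())
    task.add_done_callback(_observe)
    await asyncio.sleep(0)  # give the task a chance to run

asyncio.run(handle_stream_like())
print(captured)  # [ValueError('handler failed')]
```

Whether the callback should log, re-raise, or close the comm is exactly the policy question raised above; the callback only guarantees the exception is observed at all.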