
Ensure exceptions in handlers are handled equally for sync and async #4734

Closed
fjetter wants to merge 8 commits from the ensure_handlers_behave_the_same branch

Conversation

@fjetter (Member) commented Apr 22, 2021

I do not have a strong opinion at the moment about what should happen if a handler raises an exception, but regardless, I believe an async and a sync handler should show the same behaviour. Anything else is pretty hard for any developer to reason about.

One easy way to achieve this is to simply await the handler during handle_stream. I am not sure whether it was implemented the current way in the hope that messages would be worked off faster, or what the motivation behind it was.
If we want to stick to the old add_callback, we need to implement exception handling on the returned future/task.
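
To make the contrast concrete, here is a minimal asyncio-only sketch (simplified names, not the actual handle_stream code, which goes through Tornado's IOLoop.add_callback): awaiting the handler lets its exception propagate to the caller, while scheduling it as a task leaves the exception sitting on the task object.

import asyncio

async def handler(msg):
    raise ValueError(f"handler failed on {msg!r}")

async def dispatch_awaited(msg):
    try:
        await handler(msg)  # the exception surfaces right here, as for a sync handler
    except ValueError as exc:
        print(f"caught in dispatch: {exc}")

async def dispatch_fire_and_forget(msg):
    task = asyncio.ensure_future(handler(msg))
    await asyncio.sleep(0)  # let the task run one step
    # The exception is now attached to `task`; nothing in this coroutine notices
    # it, and in a long-running server it only surfaces later in asyncio's logs.

asyncio.run(dispatch_awaited("x"))
asyncio.run(dispatch_fire_and_forget("x"))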

@fjetter (Member, Author) commented Apr 22, 2021

The test failure in test_no_dangling_asyncio_tasks is a genuine failure and I can reproduce it locally.

distributed/core.py (outdated)
@@ -577,7 +575,7 @@ async def handle_stream(self, comm, extra=None, every_cycle=[]):

for func in every_cycle:
    if is_coroutine_function(func):
        self.loop.add_callback(func)
@fjetter (Member, Author):

I tracked down the dangling task: it was the ensure_computing call scheduled here, which uses the async interface to offload deserialisation. This was introduced in #4307 just before the infamous 2020.12.0 release.

@fjetter fjetter force-pushed the ensure_handlers_behave_the_same branch from baf1281 to 2826308 Compare May 14, 2021 13:44
@fjetter (Member, Author) commented May 14, 2021

This situation is a bit awkward and I see two options:

  1. We await both the handlers and the every-cycle callbacks and let asyncio/Python deal with the exceptions themselves. This puts us at risk of a slow handler or a slow cycle callback. That could, for instance, cause the worker to accumulate messages in the queue faster than it can work them off. Even worse, I believe there is a chance for the comm / TCP socket to overflow, which would result in either weird exceptions or dropped messages; I'd need to research this a bit. In either case this would probably only happen under quite significant load on the entire system, in which case I'd wonder whether we're coming out of it alive anyhow. Not an expert, though.
  2. We implement the task offloading onto the loop ourselves and take care that the exceptions are properly handled. The way tornado deals with this is not 100% well suited to our use case here: in the places where we put tasks on the loop, we often would like to check whether they completed or erred, or even cancel them. Currently we do not have that possibility.

Note: this problem pops up in many places in the worker, so solving (2) once might benefit other situations as well.
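
A rough sketch of what (2) could look like (a hypothetical helper, not code in this PR): keep a handle to every offloaded task, log its exception in a done callback, and keep the handle around so the task can still be checked or cancelled later.

import asyncio
import logging

logger = logging.getLogger(__name__)

class OffloadedTasks:
    """Hypothetical bookkeeping for tasks put on the event loop."""

    def __init__(self):
        self._tasks = set()

    def spawn(self, coro):
        task = asyncio.ensure_future(coro)
        self._tasks.add(task)
        task.add_done_callback(self._on_done)
        return task  # the caller can await, inspect, or cancel it

    def _on_done(self, task):
        self._tasks.discard(task)
        if not task.cancelled() and task.exception() is not None:
            # Same treatment an exception in a sync handler would get: log it.
            logger.error("offloaded task failed", exc_info=task.exception())

    async def close(self):
        for task in self._tasks:
            task.cancel()
        await asyncio.gather(*self._tasks, return_exceptions=True)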

@mrocklin Any thoughts on this? Do we have any asyncio/tornado experts we can loop into this discussion?


Note: the PR grew in scope again. I ignored the warning in the gen.coroutine comment and figured it was easy to rewrite. It turns out this introduces race conditions during closing, since gen.coroutine and plain async coroutines differ in how they are cancelled (or not). I will break this off into another PR. (At least I hope this is the explanation and the changes can be decoupled :/ )

@fjetter fjetter force-pushed the ensure_handlers_behave_the_same branch from 2826308 to 6270e4f Compare May 14, 2021 13:53
@mrocklin (Member) commented:

This could, for instance, cause the worker to accumulate messages in the queue faster than it could work off

I'm not sure that this is true. There is a handle_stream coroutine for every comm. Comms are most often used with the rpc objects as follows:

## caller side
await worker.some_handler(arg=foo)

The caller side generally waits until the call to the handler completes before moving on, so the backup won't be inside the socket on the receiver side; it will be in application code on the caller side, which seems safe. Previously this call would have completed very quickly and now it will take longer. That's probably ok.

In summary, if things aren't breaking here I think that we're ok. In principle this change feels safe to me. I wouldn't be surprised if it did break some things, but even if it did, those things were probably broken beforehand and relying on bad logic in the worker. For example, if the caller side was doing the following:

await worker.some_handler(arg=1)
await worker.some_handler(arg=2)
await worker.some_handler(arg=3)
await worker.some_handler(arg=4)

Then with async handlers this would have previously run much more quickly than it should have. In a situation like the following:

await asyncio.gather(
    worker.some_handler(arg=1),
    worker.some_handler(arg=2),
    ...
)

The caller side will actually open many comms to the server to handle the requests in parallel, and so we'll get many handle_stream coroutines, and everything should behave nicely.

(Usual disclaimer of "I'm a bit rusty on all of this, so please don't believe me")

@fjetter (Member, Author) commented May 14, 2021

I'm not sure that this is true. There is a handle_stream coroutine for every comm. Comms are most often used with the rpc objects as follows:

I'm mostly concerned about the primary link between scheduler and worker, which is based on a single comm. Most of the communication between the two goes over this single comm using a BatchedSend. For instance, all "compute-task" and "forget-task" messages move along this single comm, and I'm wondering whether this becomes an issue.

However, with the exception of Worker.close all of our stream handlers seem to be sync anyhow, so I'm worrying unnecessarily 🤦

@mrocklin (Member) commented:

However, with the exception of Worker.close all of our stream handlers seem to be sync anyhow, so I'm worrying unnecessarily

Yeah, anything sent by a BatchedSend is intended to run immediately.

If you wanted to be careful, though, you could add a read into the BatchedSend._background_send method. This would close the loop and establish back pressure. Really we would want to allow a few of these to be in flight before awaiting. I'm not too worried about this though.
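
A heavily simplified sketch of that idea (hypothetical ack protocol and helper name, not the real BatchedSend._background_send): after writing a batch, wait for an acknowledgement from the peer before writing the next one, so the sender cannot run ahead of a slow, now-awaited handler on the other side.

import asyncio

async def background_send_sketch(comm, queue: asyncio.Queue):
    # Hypothetical batched-send loop with back pressure via acks.
    while True:
        batch = [await queue.get()]
        while not queue.empty():  # drain whatever accumulated in the meantime
            batch.append(queue.get_nowait())
        await comm.write(batch)
        # Close the loop: only continue once the receiver confirms it has
        # processed the batch. Allowing a few un-acked batches in flight,
        # as suggested above, would be the obvious refinement.
        ack = await comm.read()
        assert ack == {"op": "ack"}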

@fjetter fjetter force-pushed the ensure_handlers_behave_the_same branch 3 times, most recently from 5a71ee0 to 0f6a8e1 Compare May 27, 2021 16:38
@fjetter fjetter force-pushed the ensure_handlers_behave_the_same branch 2 times, most recently from b28bca2 to c2cb945 Compare May 31, 2021 09:37
@fjetter fjetter force-pushed the ensure_handlers_behave_the_same branch from 71d8b6f to 6dfaf28 Compare June 9, 2021 13:34
@fjetter fjetter force-pushed the ensure_handlers_behave_the_same branch from f2b38d4 to 284bced Compare June 21, 2021 13:18
@@ -422,7 +430,7 @@ async def handle_comm(self, comm):

logger.debug("Connection from %r to %s", address, type(self).__name__)
self._comms[comm] = op
await self
@fjetter (Member, Author):

This await self without the guard I introduced in __await__ would allow an incoming comm to restart a scheduler that is currently shutting down. There is a time window while the scheduler is handling its affairs, before the actual server closes, during which it still accepts incoming comms. If a new comm is opened during this time, the scheduler is restarted although it is currently trying to shut down.
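
Roughly the kind of guard meant here (a sketch, not the exact code in this PR): __await__ only starts the server when it is not already running or shutting down.

def __await__(self):
    async def _start_if_appropriate():
        if self.status in (Status.closing, Status.closed):
            # An incoming comm awaiting the server must not flip a
            # shutting-down server back into a running one.
            return self
        if self.status != Status.running:
            await self.start()
        return self

    return _start_if_appropriate().__await__()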

self.status = Status.closing
logger.info("Scheduler closing...")
setproctitle("dask-scheduler [closing]")

for preload in self.preloads:
    await preload.teardown()

if close_workers:
    await self.broadcast(msg={"op": "close_gracefully"}, nanny=True)
    for worker in parent._workers_dv:
        self.worker_send(worker, {"op": "close"})
    for i in range(20):  # wait a second for send signals to clear
        if parent._workers_dv:
            await asyncio.sleep(0.05)
        else:
            break

await asyncio.gather(*[plugin.close() for plugin in self.plugins])

for pc in self.periodic_callbacks.values():
    pc.stop()
self.periodic_callbacks.clear()
self.stop_services()

for ext in parent._extensions.values():
    with suppress(AttributeError):
        ext.teardown()
logger.info("Scheduler closing all comms")

futures = []
for w, comm in list(self.stream_comms.items()):
    if not comm.closed():
        comm.send({"op": "close", "report": False})
        comm.send({"op": "close-stream"})
    with suppress(AttributeError):
        futures.append(comm.close())

for future in futures:  # TODO: do all at once
    await future

for comm in self.client_comms.values():
    comm.abort()

await self.rpc.close()

self.status = Status.closed
self.stop()
await super().close()

@fjetter (Member, Author):

I didn't add a dedicated test for this since there are actually many failing tests once the close coroutines are shielded

Comment on lines +206 to +207
def shielded(func):
    """
    Shield the decorated method or function from cancellation. Note that the
    decorated coroutine will immediately be scheduled as a task when the
    decorated function is invoked.

    See also https://docs.python.org/3/library/asyncio-task.html#asyncio.shield
    """

    @wraps(func)
    def _(*args, **kwargs):
        return asyncio.shield(func(*args, **kwargs))

    return _
@fjetter (Member, Author):

This is not absolutely required, but with this decorator we can define methods that are always shielded (e.g. close) instead of relying on remembering to shield them on every invocation.
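
Usage would then look roughly like this (illustrative class and method names only):

class MyServer:
    @shielded
    async def close(self):
        # Even if the awaiting caller is cancelled, the task created by the
        # decorator keeps running, so the cleanup is not interrupted half-way.
        await self.cleanup_comms()
        await self.cleanup_services()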

@mrocklin (Member) left a comment:

I'm curious about the reason for a few of the changes here. Also, do we know why tests aren't happy yet?

        for w in to_close
        if w in self.workers
    ]
tasks = [self.workers[w].close() for w in to_close if w in self.workers]
@mrocklin (Member):

Can we call asyncio.wait here and avoid the tasks variable entirely?

@fjetter (Member, Author):

sure
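
The suggested change would look roughly like this (a sketch; note that asyncio.wait raises if it is given an empty collection, so an emptiness check may still be needed):

await asyncio.wait([self.workers[w].close() for w in to_close if w in self.workers])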

try:
    if self.process is not None:
        await self.kill(timeout=timeout)
except Exception:
    pass
self.process = None
await self.rpc.close()
@mrocklin (Member):

Why this change?

@fjetter (Member, Author):

same argument as above about cleaning up

@@ -506,18 +509,15 @@ async def close(self, comm=None, timeout=5, report=None):

for preload in self.preloads:
    await preload.teardown()

self.stop()
@mrocklin (Member):

Why this change?

@fjetter (Member, Author):

It is redundant since we do the same thing in Server.close, and I felt removing it would reduce complexity. If there is some purpose in stopping earlier, I can revert this. This is only my sense of cleaning up.

await self.rpc.close()

self.status = Status.closed
self.stop()
@mrocklin (Member):

Why this change?

@fjetter (Member, Author):

This is performed by super().close(). Since nothing else happens in between, letting the server close itself felt like the right thing to do to reduce complexity.

*[
    self.remove_worker(address=w, safe=True, close=False)
    for w in worker_keys
]
@mrocklin (Member):

Why this change?

@fjetter (Member, Author):

This change adds close=False (the default is True). With the default, most situations would issue a close_worker call twice, since there is a dedicated close_worker call above.

It might be the correct thing to change this to close=not close_workers to avoid the duplication.
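
That alternative would look roughly like this (a sketch, assuming the comprehension above is unpacked into something like an asyncio.gather call):

await asyncio.gather(
    *[
        self.remove_worker(address=w, safe=True, close=not close_workers)
        for w in worker_keys
    ]
)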

@fjetter fjetter force-pushed the ensure_handlers_behave_the_same branch from 4e36762 to 6ddf5b1 Compare June 24, 2021 07:31
@jrbourbeau (Member) commented:

Sorry @fjetter, it looks like #4966 caused some merge conflicts here

@fjetter fjetter force-pushed the ensure_handlers_behave_the_same branch 3 times, most recently from 853848f to fceb5ce Compare June 28, 2021 12:23
@fjetter (Member, Author) commented Jun 28, 2021

I'm hitting a lot of known "flaky" tests here (cannot reproduce locally, yet)

@fjetter fjetter force-pushed the ensure_handlers_behave_the_same branch 2 times, most recently from 1cee222 to 2fdb036 Compare June 30, 2021 15:51
@fjetter fjetter force-pushed the ensure_handlers_behave_the_same branch 2 times, most recently from 75f33ce to 353bd34 Compare July 8, 2021 13:16
Comment on lines +418 to +414
def __await__(self):
    return self.start().__await__()

@fjetter (Member, Author):

This MultiWorker is a bit strange since it isn't a Server although Workers are always Servers

Unless the task is entirely forgotten by the worker, it is confusing behaviour if the dependent or a task is removed once it has finished executing. If this information is required, a dedicated dynamic attribute should be used, like waiters on the scheduler side.

If not awaited, the server might serve handlers before the server is properly initialized. For instance, plugins and preload modules might not be fully loaded before that happens.