diff --git a/distributed/comm/tcp.py b/distributed/comm/tcp.py index 258f4e88a5a..b54ee59f543 100644 --- a/distributed/comm/tcp.py +++ b/distributed/comm/tcp.py @@ -487,7 +487,7 @@ async def _handle_stream(self, stream, address): try: await self.on_connection(comm) except CommClosedError: - logger.info("Connection closed before handshake completed") + logger.info("Connection from %s closed before handshake completed", address) return await self.comm_handler(comm) diff --git a/distributed/core.py b/distributed/core.py index 6812ee1c163..843fc42dcfc 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -264,10 +264,17 @@ async def finished(self): def __await__(self): async def _(): + if self.status == Status.running: + return self + + if self.status in (Status.closing, Status.closed): + # We should never await an object which is already closing but + # we should also not start it up again otherwise we'd produce + # zombies + await self.finished() + timeout = getattr(self, "death_timeout", 0) async with self._startup_lock: - if self.status == Status.running: - return self if timeout: try: await asyncio.wait_for(self.start(), timeout=timeout) @@ -423,7 +430,7 @@ async def handle_comm(self, comm): logger.debug("Connection from %r to %s", address, type(self).__name__) self._comms[comm] = op - await self + try: while True: try: diff --git a/distributed/nanny.py b/distributed/nanny.py index a7fcd705476..a1779c5284d 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -385,7 +385,7 @@ async def instantiate(self, comm=None) -> Status: try: result = await self.process.start() except Exception: - await asyncio.shield(self.close()) + await self.close() raise return result @@ -747,6 +747,7 @@ def _run( loop.make_current() worker = Worker(**worker_kwargs) + @shielded async def do_stop(timeout=5, executor_wait=True): try: await worker.close( @@ -796,7 +797,7 @@ async def run(): # properly handled. See also # WorkerProcess._wait_until_connected (the 2 is for good # measure) - sync_sleep(cls._init_msg_interval * 2) + await asyncio.sleep(cls._init_msg_interval * 2) else: try: assert worker.address diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py index 81728c674ef..df30c4e6283 100644 --- a/distributed/tests/test_nanny.py +++ b/distributed/tests/test_nanny.py @@ -601,12 +601,6 @@ async def test_nanny_kill_handler(s, a): while a.process.status != Status.stopped: await asyncio.sleep(0.05) - while a.status != Status.stopped: - await asyncio.sleep(0.05) - - # already closed should be noop - await a.close() - @gen_cluster(nthreads=[("127.0.0.1", 0)], Worker=Nanny) async def test_nanny_close_gracefully_handler(s, a): diff --git a/distributed/worker.py b/distributed/worker.py index fc4f2315e65..a5cf9aa46d6 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1232,6 +1232,14 @@ async def close( ): await self.finished() return + + if self.status not in (Status.running, Status.closing_gracefully): + logger.info( + "Closed worker %s has not yet started: %s", + self.address, + self.status, + ) + self.status = Status.closing self.reconnect = False @@ -1241,8 +1249,6 @@ async def close( logger.info("Stopping worker at %s", self.address) except ValueError: # address not available if already closed logger.info("Stopping worker") - if self.status not in (Status.running, Status.closing_gracefully): - logger.info("Closed worker has not yet started: %s", self.status) for preload in self.preloads: await preload.teardown()