Skip to content

Commit

Permalink
Acoid zombie workers
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter committed Jun 9, 2021
1 parent 155d2c0 commit 6dfaf28
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 14 deletions.
2 changes: 1 addition & 1 deletion distributed/comm/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ async def _handle_stream(self, stream, address):
try:
await self.on_connection(comm)
except CommClosedError:
logger.info("Connection closed before handshake completed")
logger.info("Connection from %s closed before handshake completed", address)
return

await self.comm_handler(comm)
Expand Down
13 changes: 10 additions & 3 deletions distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,17 @@ async def finished(self):

def __await__(self):
async def _():
if self.status == Status.running:
return self

if self.status in (Status.closing, Status.closed):
# We should never await an object which is already closing but
# we should also not start it up again otherwise we'd produce
# zombies
await self.finished()

timeout = getattr(self, "death_timeout", 0)
async with self._startup_lock:
if self.status == Status.running:
return self
if timeout:
try:
await asyncio.wait_for(self.start(), timeout=timeout)
Expand Down Expand Up @@ -423,7 +430,7 @@ async def handle_comm(self, comm):

logger.debug("Connection from %r to %s", address, type(self).__name__)
self._comms[comm] = op
await self

try:
while True:
try:
Expand Down
5 changes: 3 additions & 2 deletions distributed/nanny.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ async def instantiate(self, comm=None) -> Status:
try:
result = await self.process.start()
except Exception:
await asyncio.shield(self.close())
await self.close()
raise
return result

Expand Down Expand Up @@ -747,6 +747,7 @@ def _run(
loop.make_current()
worker = Worker(**worker_kwargs)

@shielded
async def do_stop(timeout=5, executor_wait=True):
try:
await worker.close(
Expand Down Expand Up @@ -796,7 +797,7 @@ async def run():
# properly handled. See also
# WorkerProcess._wait_until_connected (the 2 is for good
# measure)
sync_sleep(cls._init_msg_interval * 2)
await asyncio.sleep(cls._init_msg_interval * 2)
else:
try:
assert worker.address
Expand Down
6 changes: 0 additions & 6 deletions distributed/tests/test_nanny.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,12 +601,6 @@ async def test_nanny_kill_handler(s, a):
while a.process.status != Status.stopped:
await asyncio.sleep(0.05)

while a.status != Status.stopped:
await asyncio.sleep(0.05)

# already closed should be noop
await a.close()


@gen_cluster(nthreads=[("127.0.0.1", 0)], Worker=Nanny)
async def test_nanny_close_gracefully_handler(s, a):
Expand Down
10 changes: 8 additions & 2 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1232,6 +1232,14 @@ async def close(
):
await self.finished()
return

if self.status not in (Status.running, Status.closing_gracefully):
logger.info(
"Closed worker %s has not yet started: %s",
self.address,
self.status,
)

self.status = Status.closing

self.reconnect = False
Expand All @@ -1241,8 +1249,6 @@ async def close(
logger.info("Stopping worker at %s", self.address)
except ValueError: # address not available if already closed
logger.info("Stopping worker")
if self.status not in (Status.running, Status.closing_gracefully):
logger.info("Closed worker has not yet started: %s", self.status)

for preload in self.preloads:
await preload.teardown()
Expand Down

0 comments on commit 6dfaf28

Please sign in to comment.