diff --git a/distributed/process.py b/distributed/process.py index a0d462d407..1c11dd2e3d 100644 --- a/distributed/process.py +++ b/distributed/process.py @@ -1,4 +1,3 @@ -import atexit import logging import os from queue import Queue as PyQueue @@ -81,8 +80,10 @@ def __init__(self, loop=None, target=None, name=None, args=(), kwargs={}): dask.config.global_config, ), ) - _dangling.add(self._process) self._name = self._process.name + self._proc_finalizer = weakref.finalize( + self, _asyncprocess_finalizer, self._process + ) self._watch_q = PyQueue() self._exit_future = Future() self._exit_callback = None @@ -118,8 +119,8 @@ def stop_thread(q): # We don't join the thread here as a finalizer can be called # asynchronously from anywhere - self._finalizer = weakref.finalize(self, stop_thread, q=self._watch_q) - self._finalizer.atexit = False + self._thread_finalizer = weakref.finalize(self, stop_thread, q=self._watch_q) + self._thread_finalizer.atexit = False def _on_exit(self, exitcode): # Called from the event loop when the child process exited @@ -292,7 +293,7 @@ def close(self): immediately and does not ensure the child process has exited. """ if not self._closed: - self._finalizer() + self._thread_finalizer() self._process = None self._closed = True @@ -334,15 +335,10 @@ def daemon(self, value): self._process.daemon = value -_dangling = weakref.WeakSet() - - -@atexit.register -def _cleanup_dangling(): - for proc in list(_dangling): - if proc.is_alive(): - try: - logger.info("reaping stray process %s" % (proc,)) - proc.terminate() - except OSError: - pass +def _asyncprocess_finalizer(proc): + if proc.is_alive(): + try: + logger.info("reaping stray process %s" % (proc,)) + proc.terminate() + except OSError: + pass diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 1a373aeac2..85b4f17d60 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -44,7 +44,6 @@ from .core import connect, rpc, CommClosedError, Status from .deploy import SpecCluster from .metrics import time -from .process import _cleanup_dangling from .proctitle import enable_proctitle_on_children from .security import Security from .utils import ( @@ -1451,7 +1450,6 @@ def check_process_leak(check=True): else: assert not mp_context.active_children() - _cleanup_dangling() for proc in mp_context.active_children(): proc.terminate()