Skip to content

Commit

Permalink
Replace AsyncProcess exit handler by weakref.finalize (#4184)
Browse files Browse the repository at this point in the history
(cherry picked from commit 8612473)
  • Loading branch information
pentschev authored and jrbourbeau committed Oct 30, 2020
1 parent a1dc5f4 commit e927771
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 19 deletions.
30 changes: 13 additions & 17 deletions distributed/process.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import atexit
import logging
import os
from queue import Queue as PyQueue
Expand Down Expand Up @@ -81,8 +80,10 @@ def __init__(self, loop=None, target=None, name=None, args=(), kwargs={}):
dask.config.global_config,
),
)
_dangling.add(self._process)
self._name = self._process.name
self._proc_finalizer = weakref.finalize(
self, _asyncprocess_finalizer, self._process
)
self._watch_q = PyQueue()
self._exit_future = Future()
self._exit_callback = None
Expand Down Expand Up @@ -118,8 +119,8 @@ def stop_thread(q):
# We don't join the thread here as a finalizer can be called
# asynchronously from anywhere

self._finalizer = weakref.finalize(self, stop_thread, q=self._watch_q)
self._finalizer.atexit = False
self._thread_finalizer = weakref.finalize(self, stop_thread, q=self._watch_q)
self._thread_finalizer.atexit = False

def _on_exit(self, exitcode):
# Called from the event loop when the child process exited
Expand Down Expand Up @@ -292,7 +293,7 @@ def close(self):
immediately and does not ensure the child process has exited.
"""
if not self._closed:
self._finalizer()
self._thread_finalizer()
self._process = None
self._closed = True

Expand Down Expand Up @@ -334,15 +335,10 @@ def daemon(self, value):
self._process.daemon = value


_dangling = weakref.WeakSet()


@atexit.register
def _cleanup_dangling():
for proc in list(_dangling):
if proc.is_alive():
try:
logger.info("reaping stray process %s" % (proc,))
proc.terminate()
except OSError:
pass
def _asyncprocess_finalizer(proc):
if proc.is_alive():
try:
logger.info("reaping stray process %s" % (proc,))
proc.terminate()
except OSError:
pass
2 changes: 0 additions & 2 deletions distributed/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
from .core import connect, rpc, CommClosedError, Status
from .deploy import SpecCluster
from .metrics import time
from .process import _cleanup_dangling
from .proctitle import enable_proctitle_on_children
from .security import Security
from .utils import (
Expand Down Expand Up @@ -1451,7 +1450,6 @@ def check_process_leak(check=True):
else:
assert not mp_context.active_children()

_cleanup_dangling()
for proc in mp_context.active_children():
proc.terminate()

Expand Down

0 comments on commit e927771

Please sign in to comment.