From 29393f418428e88f23b42ece03f3a9d44bb9a3e7 Mon Sep 17 00:00:00 2001 From: Ethan Van Der Heijden Date: Sun, 22 Sep 2024 03:09:30 -0400 Subject: [PATCH] fix: properly clean up threads when stopping Inotify. Improve Eventlet tests. (#1070) * Improve cleaning up Inotify threads and add eventlet test cases. * Align SkipRepeatsQueue with Eventlet's Queue implementation. * Only run eventlet tests in Linux. --- src/watchdog/observers/inotify_c.py | 42 +++++++++++++++++--- src/watchdog/utils/bricks.py | 13 +++--- tests/isolated/eventlet_observer_stops.py | 30 ++++++++++++++ tests/isolated/eventlet_skip_repeat_queue.py | 33 +++++++++++++++ tests/markers.py | 7 ---- tests/test_inotify_c.py | 11 ++++- tests/test_isolated.py | 24 +++++++++++ tests/test_skip_repeats_queue.py | 17 +------- tests/utils.py | 28 +++++++++++++ 9 files changed, 168 insertions(+), 37 deletions(-) create mode 100644 tests/isolated/eventlet_observer_stops.py create mode 100644 tests/isolated/eventlet_skip_repeat_queue.py delete mode 100644 tests/markers.py create mode 100644 tests/test_isolated.py diff --git a/src/watchdog/observers/inotify_c.py b/src/watchdog/observers/inotify_c.py index d6765e14..74c74a6f 100644 --- a/src/watchdog/observers/inotify_c.py +++ b/src/watchdog/observers/inotify_c.py @@ -5,6 +5,7 @@ import ctypes.util import errno import os +import select import struct import threading from ctypes import c_char_p, c_int, c_uint32 @@ -148,6 +149,9 @@ def __init__(self, path: bytes, *, recursive: bool = False, event_mask: int | No Inotify._raise_error() self._inotify_fd = inotify_fd self._lock = threading.Lock() + self._closed = False + self._waiting_to_read = True + self._kill_r, self._kill_w = os.pipe() # Stores the watch descriptor for a given path. self._wd_for_path: dict[bytes, int] = {} @@ -230,13 +234,19 @@ def remove_watch(self, path: bytes) -> None: def close(self) -> None: """Closes the inotify instance and removes all associated watches.""" with self._lock: - if self._path in self._wd_for_path: - wd = self._wd_for_path[self._path] - inotify_rm_watch(self._inotify_fd, wd) + if not self._closed: + self._closed = True - # descriptor may be invalid because file was deleted - with contextlib.suppress(OSError): - os.close(self._inotify_fd) + if self._path in self._wd_for_path: + wd = self._wd_for_path[self._path] + inotify_rm_watch(self._inotify_fd, wd) + + if self._waiting_to_read: + # inotify_rm_watch() should write data to _inotify_fd and wake + # the thread, but writing to the kill channel will gaurentee this + os.write(self._kill_w, b'!') + else: + self._close_resources() def read_events(self, *, event_buffer_size: int = DEFAULT_EVENT_BUFFER_SIZE) -> list[InotifyEvent]: """Reads events from inotify and yields them.""" @@ -276,6 +286,21 @@ def _recursive_simulate(src_path: bytes) -> list[InotifyEvent]: event_buffer = None while True: try: + with self._lock: + if self._closed: + return [] + + self._waiting_to_read = True + + select.select([self._inotify_fd, self._kill_r], [], []) + + with self._lock: + self._waiting_to_read = False + + if self._closed: + self._close_resources() + return [] + event_buffer = os.read(self._inotify_fd, event_buffer_size) except OSError as e: if e.errno == errno.EINTR: @@ -340,6 +365,11 @@ def _recursive_simulate(src_path: bytes) -> list[InotifyEvent]: return event_list + def _close_resources(self): + os.close(self._inotify_fd) + os.close(self._kill_r) + os.close(self._kill_w) + # Non-synchronized methods. def _add_dir_watch(self, path: bytes, mask: int, *, recursive: bool) -> None: """Adds a watch (optionally recursively) for the given directory path diff --git a/src/watchdog/utils/bricks.py b/src/watchdog/utils/bricks.py index 8dd0afa7..cdf9af23 100644 --- a/src/watchdog/utils/bricks.py +++ b/src/watchdog/utils/bricks.py @@ -72,14 +72,13 @@ def _init(self, maxsize: int) -> None: super()._init(maxsize) self._last_item = None - def _put(self, item: Any) -> None: + def put(self, item: Any, block: bool = True, timeout: float | None = None) -> None: if self._last_item is None or item != self._last_item: - super()._put(item) - self._last_item = item - else: - # `put` increments `unfinished_tasks` even if we did not put - # anything into the queue here - self.unfinished_tasks -= 1 + super().put(item, block, timeout) + + def _put(self, item: Any) -> None: + super()._put(item) + self._last_item = item def _get(self) -> Any: item = super()._get() diff --git a/tests/isolated/eventlet_observer_stops.py b/tests/isolated/eventlet_observer_stops.py new file mode 100644 index 00000000..1cf82bdd --- /dev/null +++ b/tests/isolated/eventlet_observer_stops.py @@ -0,0 +1,30 @@ +if __name__ == '__main__': + import eventlet + + eventlet.monkey_patch() + + import signal + import sys + import tempfile + + from watchdog.observers import Observer + from watchdog.events import LoggingEventHandler + + with tempfile.TemporaryDirectory() as temp_dir: + def run_observer(): + event_handler = LoggingEventHandler() + observer = Observer() + observer.schedule(event_handler, temp_dir) + observer.start() + eventlet.sleep(1) + observer.stop() + + def on_alarm(signum, frame): + print("Observer.stop() never finished!", file=sys.stderr) + sys.exit(1) + + signal.signal(signal.SIGALRM, on_alarm) + signal.alarm(4) + + thread = eventlet.spawn(run_observer) + thread.wait() diff --git a/tests/isolated/eventlet_skip_repeat_queue.py b/tests/isolated/eventlet_skip_repeat_queue.py new file mode 100644 index 00000000..05373934 --- /dev/null +++ b/tests/isolated/eventlet_skip_repeat_queue.py @@ -0,0 +1,33 @@ +if __name__ == '__main__': + import eventlet + + eventlet.monkey_patch() + + from watchdog.utils.bricks import SkipRepeatsQueue + + q = SkipRepeatsQueue(10) + q.put('A') + q.put('A') + q.put('A') + q.put('A') + q.put('B') + q.put('A') + + value = q.get() + assert value == 'A' + q.task_done() + + assert q.unfinished_tasks == 2 + + value = q.get() + assert value == 'B' + q.task_done() + + assert q.unfinished_tasks == 1 + + value = q.get() + assert value == 'A' + q.task_done() + + assert q.empty() + assert q.unfinished_tasks == 0 diff --git a/tests/markers.py b/tests/markers.py deleted file mode 100644 index 66291fca..00000000 --- a/tests/markers.py +++ /dev/null @@ -1,7 +0,0 @@ -from __future__ import annotations - -from platform import python_implementation - -import pytest - -cpython_only = pytest.mark.skipif(python_implementation() != "CPython", reason="CPython only.") diff --git a/tests/test_inotify_c.py b/tests/test_inotify_c.py index 5b34e6c5..8d4b59d4 100644 --- a/tests/test_inotify_c.py +++ b/tests/test_inotify_c.py @@ -11,6 +11,7 @@ import errno import logging import os +import select import struct from typing import TYPE_CHECKING from unittest.mock import patch @@ -56,6 +57,13 @@ def test_late_double_deletion(helper: Helper, p: P, event_queue: TestEventQueue, + struct_inotify(wd=3, mask=const.IN_IGNORED) ) + select_bkp = select.select + + def fakeselect(read_list, *args, **kwargs): + if inotify_fd in read_list: + return [inotify_fd], [], [] + return select_bkp(read_list, *args, **kwargs) + os_read_bkp = os.read def fakeread(fd, length): @@ -92,8 +100,9 @@ def inotify_rm_watch(fd, wd): mock3 = patch.object(inotify_c, "inotify_init", new=inotify_init) mock4 = patch.object(inotify_c, "inotify_add_watch", new=inotify_add_watch) mock5 = patch.object(inotify_c, "inotify_rm_watch", new=inotify_rm_watch) + mock6 = patch.object(select, "select", new=fakeselect) - with mock1, mock2, mock3, mock4, mock5: + with mock1, mock2, mock3, mock4, mock5, mock6: start_watching(path=p("")) # Watchdog Events for evt_cls in [DirCreatedEvent, DirDeletedEvent] * 2: diff --git a/tests/test_isolated.py b/tests/test_isolated.py new file mode 100644 index 00000000..2d3ff972 --- /dev/null +++ b/tests/test_isolated.py @@ -0,0 +1,24 @@ +import pytest +import importlib + +from watchdog.utils import platform + +from .utils import run_isolated_test + + +# Kqueue isn't supported by Eventlet, so BSD is out +# Current usage ReadDirectoryChangesW on Windows is blocking, though async may be possible +@pytest.mark.skipif(not platform.is_linux(), reason="Eventlet only supported in Linux") +def test_observer_stops_in_eventlet(): + if not importlib.util.find_spec('eventlet'): + pytest.skip("eventlet not installed") + + run_isolated_test('eventlet_observer_stops.py') + + +@pytest.mark.skipif(not platform.is_linux(), reason="Eventlet only supported in Linux") +def test_eventlet_skip_repeat_queue(): + if not importlib.util.find_spec('eventlet'): + pytest.skip("eventlet not installed") + + run_isolated_test('eventlet_skip_repeat_queue.py') diff --git a/tests/test_skip_repeats_queue.py b/tests/test_skip_repeats_queue.py index 27b1dfe4..d6f0f411 100644 --- a/tests/test_skip_repeats_queue.py +++ b/tests/test_skip_repeats_queue.py @@ -1,14 +1,10 @@ from __future__ import annotations -import pytest - from watchdog import events from watchdog.utils.bricks import SkipRepeatsQueue -from .markers import cpython_only - -def basic_actions(): +def test_basic_queue(): q = SkipRepeatsQueue() e1 = (2, "fred") @@ -25,10 +21,6 @@ def basic_actions(): assert q.empty() -def test_basic_queue(): - basic_actions() - - def test_allow_nonconsecutive(): q = SkipRepeatsQueue() @@ -86,10 +78,3 @@ def test_consecutives_allowed_across_empties(): q.put(e1) # this repeat is allowed because 'last' added is now gone from queue assert e1 == q.get() assert q.empty() - - -@cpython_only -def test_eventlet_monkey_patching(): - eventlet = pytest.importorskip("eventlet") - eventlet.monkey_patch() - basic_actions() diff --git a/tests/utils.py b/tests/utils.py index 8b82f3a8..d8f05b15 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -2,6 +2,8 @@ import dataclasses import os +import subprocess +import sys from queue import Queue from typing import Protocol @@ -97,3 +99,29 @@ def close(self) -> None: alive = [emitter.is_alive() for emitter in self.emitters] self.emitters = [] assert alive == [False] * len(alive) + + +def run_isolated_test(path): + ISOALTED_TEST_PREFIX = os.path.join('tests', 'isolated') + path = os.path.abspath(os.path.join(ISOALTED_TEST_PREFIX, path)) + + src_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'src') + new_env = os.environ.copy() + new_env['PYTHONPATH'] = os.pathsep.join(sys.path + [src_dir]) + + new_argv = [sys.executable, path] + + p = subprocess.Popen( + new_argv, + env=new_env, + ) + + # in case test goes haywire, don't let it run forever + timeout = 10 + try: + p.communicate(timeout=timeout) + except subprocess.TimeoutExpired: + p.kill() + assert False, 'timed out' + + assert p.returncode == 0