Skip to content

Commit

Permalink
fix: properly clean up threads when stopping Inotify. Improve Eventle…
Browse files Browse the repository at this point in the history
…t tests. (#1070)

* Improve cleaning up Inotify threads and add eventlet test cases.

* Align SkipRepeatsQueue with Eventlet's Queue implementation.

* Only run eventlet tests in Linux.
  • Loading branch information
ethan-vanderheijden authored Sep 22, 2024
1 parent 4e9a86d commit 29393f4
Show file tree
Hide file tree
Showing 9 changed files with 168 additions and 37 deletions.
42 changes: 36 additions & 6 deletions src/watchdog/observers/inotify_c.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import ctypes.util
import errno
import os
import select
import struct
import threading
from ctypes import c_char_p, c_int, c_uint32
Expand Down Expand Up @@ -148,6 +149,9 @@ def __init__(self, path: bytes, *, recursive: bool = False, event_mask: int | No
Inotify._raise_error()
self._inotify_fd = inotify_fd
self._lock = threading.Lock()
self._closed = False
self._waiting_to_read = True
self._kill_r, self._kill_w = os.pipe()

# Stores the watch descriptor for a given path.
self._wd_for_path: dict[bytes, int] = {}
Expand Down Expand Up @@ -230,13 +234,19 @@ def remove_watch(self, path: bytes) -> None:
def close(self) -> None:
"""Closes the inotify instance and removes all associated watches."""
with self._lock:
if self._path in self._wd_for_path:
wd = self._wd_for_path[self._path]
inotify_rm_watch(self._inotify_fd, wd)
if not self._closed:
self._closed = True

# descriptor may be invalid because file was deleted
with contextlib.suppress(OSError):
os.close(self._inotify_fd)
if self._path in self._wd_for_path:
wd = self._wd_for_path[self._path]
inotify_rm_watch(self._inotify_fd, wd)

if self._waiting_to_read:
# inotify_rm_watch() should write data to _inotify_fd and wake
# the thread, but writing to the kill channel will gaurentee this
os.write(self._kill_w, b'!')
else:
self._close_resources()

def read_events(self, *, event_buffer_size: int = DEFAULT_EVENT_BUFFER_SIZE) -> list[InotifyEvent]:
"""Reads events from inotify and yields them."""
Expand Down Expand Up @@ -276,6 +286,21 @@ def _recursive_simulate(src_path: bytes) -> list[InotifyEvent]:
event_buffer = None
while True:
try:
with self._lock:
if self._closed:
return []

self._waiting_to_read = True

select.select([self._inotify_fd, self._kill_r], [], [])

with self._lock:
self._waiting_to_read = False

if self._closed:
self._close_resources()
return []

event_buffer = os.read(self._inotify_fd, event_buffer_size)
except OSError as e:
if e.errno == errno.EINTR:
Expand Down Expand Up @@ -340,6 +365,11 @@ def _recursive_simulate(src_path: bytes) -> list[InotifyEvent]:

return event_list

def _close_resources(self):
os.close(self._inotify_fd)
os.close(self._kill_r)
os.close(self._kill_w)

# Non-synchronized methods.
def _add_dir_watch(self, path: bytes, mask: int, *, recursive: bool) -> None:
"""Adds a watch (optionally recursively) for the given directory path
Expand Down
13 changes: 6 additions & 7 deletions src/watchdog/utils/bricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,13 @@ def _init(self, maxsize: int) -> None:
super()._init(maxsize)
self._last_item = None

def _put(self, item: Any) -> None:
def put(self, item: Any, block: bool = True, timeout: float | None = None) -> None:
if self._last_item is None or item != self._last_item:
super()._put(item)
self._last_item = item
else:
# `put` increments `unfinished_tasks` even if we did not put
# anything into the queue here
self.unfinished_tasks -= 1
super().put(item, block, timeout)

def _put(self, item: Any) -> None:
super()._put(item)
self._last_item = item

def _get(self) -> Any:
item = super()._get()
Expand Down
30 changes: 30 additions & 0 deletions tests/isolated/eventlet_observer_stops.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
if __name__ == '__main__':
import eventlet

eventlet.monkey_patch()

import signal
import sys
import tempfile

from watchdog.observers import Observer
from watchdog.events import LoggingEventHandler

with tempfile.TemporaryDirectory() as temp_dir:
def run_observer():
event_handler = LoggingEventHandler()
observer = Observer()
observer.schedule(event_handler, temp_dir)
observer.start()
eventlet.sleep(1)
observer.stop()

def on_alarm(signum, frame):
print("Observer.stop() never finished!", file=sys.stderr)
sys.exit(1)

signal.signal(signal.SIGALRM, on_alarm)
signal.alarm(4)

thread = eventlet.spawn(run_observer)
thread.wait()
33 changes: 33 additions & 0 deletions tests/isolated/eventlet_skip_repeat_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
if __name__ == '__main__':
import eventlet

eventlet.monkey_patch()

from watchdog.utils.bricks import SkipRepeatsQueue

q = SkipRepeatsQueue(10)
q.put('A')
q.put('A')
q.put('A')
q.put('A')
q.put('B')
q.put('A')

value = q.get()
assert value == 'A'
q.task_done()

assert q.unfinished_tasks == 2

value = q.get()
assert value == 'B'
q.task_done()

assert q.unfinished_tasks == 1

value = q.get()
assert value == 'A'
q.task_done()

assert q.empty()
assert q.unfinished_tasks == 0
7 changes: 0 additions & 7 deletions tests/markers.py

This file was deleted.

11 changes: 10 additions & 1 deletion tests/test_inotify_c.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import errno
import logging
import os
import select
import struct
from typing import TYPE_CHECKING
from unittest.mock import patch
Expand Down Expand Up @@ -56,6 +57,13 @@ def test_late_double_deletion(helper: Helper, p: P, event_queue: TestEventQueue,
+ struct_inotify(wd=3, mask=const.IN_IGNORED)
)

select_bkp = select.select

def fakeselect(read_list, *args, **kwargs):
if inotify_fd in read_list:
return [inotify_fd], [], []
return select_bkp(read_list, *args, **kwargs)

os_read_bkp = os.read

def fakeread(fd, length):
Expand Down Expand Up @@ -92,8 +100,9 @@ def inotify_rm_watch(fd, wd):
mock3 = patch.object(inotify_c, "inotify_init", new=inotify_init)
mock4 = patch.object(inotify_c, "inotify_add_watch", new=inotify_add_watch)
mock5 = patch.object(inotify_c, "inotify_rm_watch", new=inotify_rm_watch)
mock6 = patch.object(select, "select", new=fakeselect)

with mock1, mock2, mock3, mock4, mock5:
with mock1, mock2, mock3, mock4, mock5, mock6:
start_watching(path=p(""))
# Watchdog Events
for evt_cls in [DirCreatedEvent, DirDeletedEvent] * 2:
Expand Down
24 changes: 24 additions & 0 deletions tests/test_isolated.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import pytest
import importlib

from watchdog.utils import platform

from .utils import run_isolated_test


# Kqueue isn't supported by Eventlet, so BSD is out
# Current usage ReadDirectoryChangesW on Windows is blocking, though async may be possible
@pytest.mark.skipif(not platform.is_linux(), reason="Eventlet only supported in Linux")
def test_observer_stops_in_eventlet():
if not importlib.util.find_spec('eventlet'):
pytest.skip("eventlet not installed")

run_isolated_test('eventlet_observer_stops.py')


@pytest.mark.skipif(not platform.is_linux(), reason="Eventlet only supported in Linux")
def test_eventlet_skip_repeat_queue():
if not importlib.util.find_spec('eventlet'):
pytest.skip("eventlet not installed")

run_isolated_test('eventlet_skip_repeat_queue.py')
17 changes: 1 addition & 16 deletions tests/test_skip_repeats_queue.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
from __future__ import annotations

import pytest

from watchdog import events
from watchdog.utils.bricks import SkipRepeatsQueue

from .markers import cpython_only


def basic_actions():
def test_basic_queue():
q = SkipRepeatsQueue()

e1 = (2, "fred")
Expand All @@ -25,10 +21,6 @@ def basic_actions():
assert q.empty()


def test_basic_queue():
basic_actions()


def test_allow_nonconsecutive():
q = SkipRepeatsQueue()

Expand Down Expand Up @@ -86,10 +78,3 @@ def test_consecutives_allowed_across_empties():
q.put(e1) # this repeat is allowed because 'last' added is now gone from queue
assert e1 == q.get()
assert q.empty()


@cpython_only
def test_eventlet_monkey_patching():
eventlet = pytest.importorskip("eventlet")
eventlet.monkey_patch()
basic_actions()
28 changes: 28 additions & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import dataclasses
import os
import subprocess
import sys
from queue import Queue
from typing import Protocol

Expand Down Expand Up @@ -97,3 +99,29 @@ def close(self) -> None:
alive = [emitter.is_alive() for emitter in self.emitters]
self.emitters = []
assert alive == [False] * len(alive)


def run_isolated_test(path):
ISOALTED_TEST_PREFIX = os.path.join('tests', 'isolated')
path = os.path.abspath(os.path.join(ISOALTED_TEST_PREFIX, path))

src_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'src')
new_env = os.environ.copy()
new_env['PYTHONPATH'] = os.pathsep.join(sys.path + [src_dir])

new_argv = [sys.executable, path]

p = subprocess.Popen(
new_argv,
env=new_env,
)

# in case test goes haywire, don't let it run forever
timeout = 10
try:
p.communicate(timeout=timeout)
except subprocess.TimeoutExpired:
p.kill()
assert False, 'timed out'

assert p.returncode == 0

0 comments on commit 29393f4

Please sign in to comment.