From 2b2091a25033d15dcfb00ad6b8139e76d1e037ca Mon Sep 17 00:00:00 2001 From: mbakiev Date: Sun, 14 Oct 2018 20:59:42 -0600 Subject: [PATCH] Remove delay from non-move events Only add delay to IN_MOVE_* events if they do not have a matching event in the event buffer already. In practice, most filesystems implement moves as atomic operations, so we don't expect to see any delays in processing of inode events. --- src/watchdog/observers/inotify_buffer.py | 50 ++++++++++++++++-------- src/watchdog/utils/delayed_queue.py | 31 ++++++--------- tests/test_delayed_queue.py | 15 ++++++- 3 files changed, 59 insertions(+), 37 deletions(-) diff --git a/src/watchdog/observers/inotify_buffer.py b/src/watchdog/observers/inotify_buffer.py index dce2ae122..a8d2ba05c 100644 --- a/src/watchdog/observers/inotify_buffer.py +++ b/src/watchdog/observers/inotify_buffer.py @@ -50,6 +50,33 @@ def close(self): self.stop() self.join() + def _group_events(self, event_list): + """Group any matching move events""" + grouped = [] + for inotify_event in event_list: + logger.debug("in-event %s", inotify_event) + def matching_from_event(event): + return (not isinstance(event, tuple) and event.is_moved_from + and event.cookie == inotify_event.cookie) + + if inotify_event.is_moved_to: + # Check if move_from is already in the buffer + for index, event in enumerate(grouped): + if matching_from_event(event): + grouped[index] = (event, inotify_event) + break + else: + # Check if move_from is in delayqueue already + from_event = self._queue.remove(matching_from_event) + if from_event is not None: + grouped.append((from_event, inotify_event)) + else: + logger.debug("could not find matching move_from event") + grouped.append(inotify_event) + else: + grouped.append(inotify_event) + return grouped + def run(self): """Read event from `inotify` and add them to `queue`. When reading a IN_MOVE_TO event, remove the previous added matching IN_MOVE_FROM event @@ -58,24 +85,13 @@ def run(self): deleted_self = False while self.should_keep_running() and not deleted_self: inotify_events = self._inotify.read_events() - for inotify_event in inotify_events: - logger.debug("in-event %s", inotify_event) - if inotify_event.is_moved_to: - - def matching_from_event(event): - return (not isinstance(event, tuple) and event.is_moved_from - and event.cookie == inotify_event.cookie) - - from_event = self._queue.remove(matching_from_event) - if from_event is not None: - self._queue.put((from_event, inotify_event)) - else: - logger.debug("could not find matching move_from event") - self._queue.put(inotify_event) - else: - self._queue.put(inotify_event) + grouped_events = self._group_events(inotify_events) + for inotify_event in grouped_events: + # Only add delay for unmatched move_from events + delay = not isinstance(inotify_event, tuple) and inotify_event.is_moved_from + self._queue.put(inotify_event, delay) - if inotify_event.is_delete_self and \ + if not isinstance(inotify_event, tuple) and inotify_event.is_delete_self and \ inotify_event.src_path == self._inotify.path: # Deleted the watched directory, stop watching for events deleted_self = True diff --git a/src/watchdog/utils/delayed_queue.py b/src/watchdog/utils/delayed_queue.py index 6d98a5046..e5d91de8c 100644 --- a/src/watchdog/utils/delayed_queue.py +++ b/src/watchdog/utils/delayed_queue.py @@ -22,16 +22,16 @@ class DelayedQueue(object): def __init__(self, delay): - self.delay = delay + self.delay_sec = delay self._lock = threading.Lock() self._not_empty = threading.Condition(self._lock) self._queue = deque() self._closed = False - def put(self, element): + def put(self, element, delay=False): """Add element to queue.""" self._lock.acquire() - self._queue.append((element, time.time())) + self._queue.append((element, time.time(), delay)) self._not_empty.notify() self._lock.release() @@ -56,33 +56,28 @@ def get(self): if self._closed: self._not_empty.release() return None - head, insert_time = self._queue[0] + head, insert_time, delay = self._queue[0] self._not_empty.release() - # wait for delay - time_left = insert_time + self.delay - time.time() - while time_left > 0: - time.sleep(time_left) - time_left = insert_time + self.delay - time.time() + # wait for delay if required + if delay: + time_left = insert_time + self.delay_sec - time.time() + while time_left > 0: + time.sleep(time_left) + time_left = insert_time + self.delay_sec - time.time() # return element if it's still in the queue - self._lock.acquire() - try: + with self._lock: if len(self._queue) > 0 and self._queue[0][0] is head: self._queue.popleft() return head - finally: - self._lock.release() def remove(self, predicate): """Remove and return the first items for which predicate is True, ignoring delay.""" - try: - self._lock.acquire() - for i, (elem, t) in enumerate(self._queue): + with self._lock: + for i, (elem, t, delay) in enumerate(self._queue): if predicate(elem): del self._queue[i] return elem - finally: - self._lock.release() return None diff --git a/tests/test_delayed_queue.py b/tests/test_delayed_queue.py index 29263d8ab..024a73438 100644 --- a/tests/test_delayed_queue.py +++ b/tests/test_delayed_queue.py @@ -18,11 +18,22 @@ from watchdog.utils.delayed_queue import DelayedQueue -def test_get(): +def test_delayed_get(): q = DelayedQueue(2) - q.put("") + q.put("", True) inserted = time() q.get() elapsed = time() - inserted # 2.10 instead of 2.05 for slow macOS slaves on Travis assert 2.10 > elapsed > 1.99 + +def test_nondelayed_get(): + q = DelayedQueue(2) + q.put("", False) + inserted = time() + q.get() + elapsed = time() - inserted + # Far less than 1 second + assert elapsed < 1 + +