Skip to content

Commit

Permalink
Remove delay from non-move events
Browse files Browse the repository at this point in the history
Only add delay to IN_MOVE_* events if they do not have a matching
event in the event buffer already. In practice, most filesystems
implement moves as atomic operations, so we don't expect to see
any delays in processing of inode events.
  • Loading branch information
mbakiev authored and BoboTiG committed Jan 12, 2019
1 parent 6cf6d5d commit 2b2091a
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 37 deletions.
50 changes: 33 additions & 17 deletions src/watchdog/observers/inotify_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,33 @@ def close(self):
self.stop()
self.join()

def _group_events(self, event_list):
"""Group any matching move events"""
grouped = []
for inotify_event in event_list:
logger.debug("in-event %s", inotify_event)
def matching_from_event(event):
return (not isinstance(event, tuple) and event.is_moved_from
and event.cookie == inotify_event.cookie)

if inotify_event.is_moved_to:
# Check if move_from is already in the buffer
for index, event in enumerate(grouped):
if matching_from_event(event):
grouped[index] = (event, inotify_event)
break
else:
# Check if move_from is in delayqueue already
from_event = self._queue.remove(matching_from_event)
if from_event is not None:
grouped.append((from_event, inotify_event))
else:
logger.debug("could not find matching move_from event")
grouped.append(inotify_event)
else:
grouped.append(inotify_event)
return grouped

def run(self):
"""Read event from `inotify` and add them to `queue`. When reading a
IN_MOVE_TO event, remove the previous added matching IN_MOVE_FROM event
Expand All @@ -58,24 +85,13 @@ def run(self):
deleted_self = False
while self.should_keep_running() and not deleted_self:
inotify_events = self._inotify.read_events()
for inotify_event in inotify_events:
logger.debug("in-event %s", inotify_event)
if inotify_event.is_moved_to:

def matching_from_event(event):
return (not isinstance(event, tuple) and event.is_moved_from
and event.cookie == inotify_event.cookie)

from_event = self._queue.remove(matching_from_event)
if from_event is not None:
self._queue.put((from_event, inotify_event))
else:
logger.debug("could not find matching move_from event")
self._queue.put(inotify_event)
else:
self._queue.put(inotify_event)
grouped_events = self._group_events(inotify_events)
for inotify_event in grouped_events:
# Only add delay for unmatched move_from events
delay = not isinstance(inotify_event, tuple) and inotify_event.is_moved_from
self._queue.put(inotify_event, delay)

if inotify_event.is_delete_self and \
if not isinstance(inotify_event, tuple) and inotify_event.is_delete_self and \
inotify_event.src_path == self._inotify.path:
# Deleted the watched directory, stop watching for events
deleted_self = True
31 changes: 13 additions & 18 deletions src/watchdog/utils/delayed_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@
class DelayedQueue(object):

def __init__(self, delay):
self.delay = delay
self.delay_sec = delay
self._lock = threading.Lock()
self._not_empty = threading.Condition(self._lock)
self._queue = deque()
self._closed = False

def put(self, element):
def put(self, element, delay=False):
"""Add element to queue."""
self._lock.acquire()
self._queue.append((element, time.time()))
self._queue.append((element, time.time(), delay))
self._not_empty.notify()
self._lock.release()

Expand All @@ -56,33 +56,28 @@ def get(self):
if self._closed:
self._not_empty.release()
return None
head, insert_time = self._queue[0]
head, insert_time, delay = self._queue[0]
self._not_empty.release()

# wait for delay
time_left = insert_time + self.delay - time.time()
while time_left > 0:
time.sleep(time_left)
time_left = insert_time + self.delay - time.time()
# wait for delay if required
if delay:
time_left = insert_time + self.delay_sec - time.time()
while time_left > 0:
time.sleep(time_left)
time_left = insert_time + self.delay_sec - time.time()

# return element if it's still in the queue
self._lock.acquire()
try:
with self._lock:
if len(self._queue) > 0 and self._queue[0][0] is head:
self._queue.popleft()
return head
finally:
self._lock.release()

def remove(self, predicate):
"""Remove and return the first items for which predicate is True,
ignoring delay."""
try:
self._lock.acquire()
for i, (elem, t) in enumerate(self._queue):
with self._lock:
for i, (elem, t, delay) in enumerate(self._queue):
if predicate(elem):
del self._queue[i]
return elem
finally:
self._lock.release()
return None
15 changes: 13 additions & 2 deletions tests/test_delayed_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,22 @@
from watchdog.utils.delayed_queue import DelayedQueue


def test_get():
def test_delayed_get():
q = DelayedQueue(2)
q.put("")
q.put("", True)
inserted = time()
q.get()
elapsed = time() - inserted
# 2.10 instead of 2.05 for slow macOS slaves on Travis
assert 2.10 > elapsed > 1.99

def test_nondelayed_get():
q = DelayedQueue(2)
q.put("", False)
inserted = time()
q.get()
elapsed = time() - inserted
# Far less than 1 second
assert elapsed < 1


0 comments on commit 2b2091a

Please sign in to comment.