Skip to content

Commit

Permalink
[Mac] Performance improvements for the fsevents module (#680)
Browse files Browse the repository at this point in the history
* Enable file-level watches in fsevents

* Remove obsolete snapshot attribute

* Add Changelog entry
  • Loading branch information
CCP-Aporia authored Oct 10, 2020
1 parent d95692b commit 3b9904c
Show file tree
Hide file tree
Showing 3 changed files with 204 additions and 41 deletions.
3 changes: 2 additions & 1 deletion changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ Changelog

- Add logger parameter for the LoggingEventHandler (`#676 <https://github.com/gorakhargosh/watchdog/pull/676>`_)
- Replace mutable default arguments with ``if None`` implementation (`#677 <https://github.com/gorakhargosh/watchdog/pull/677>`_)
- Thanks to our beloved contributors: @Sraw
- [mac] Performance improvements for the `fsevents` module (`#680 <https://github.com/gorakhargosh/watchdog/pull/680>`_)
- Thanks to our beloved contributors: @Sraw, @CCP-Aporia


0.10.3
Expand Down
83 changes: 53 additions & 30 deletions src/watchdog/observers/fsevents.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

from __future__ import with_statement

import os
import sys
import threading
import unicodedata
Expand All @@ -41,7 +42,6 @@
DirMovedEvent
)

from watchdog.utils.dirsnapshot import DirectorySnapshot
from watchdog.observers.api import (
BaseObserver,
EventEmitter,
Expand Down Expand Up @@ -70,7 +70,6 @@ class FSEventsEmitter(EventEmitter):
def __init__(self, event_queue, watch, timeout=DEFAULT_EMITTER_TIMEOUT):
EventEmitter.__init__(self, event_queue, watch, timeout)
self._lock = threading.Lock()
self.snapshot = DirectorySnapshot(watch.path, watch.is_recursive)

def on_thread_stop(self):
if self.watch:
Expand All @@ -80,37 +79,61 @@ def on_thread_stop(self):

def queue_events(self, timeout):
with self._lock:
if (not self.watch.is_recursive
and self.watch.path not in self.pathnames):
return
new_snapshot = DirectorySnapshot(self.watch.path,
self.watch.is_recursive)
events = new_snapshot - self.snapshot
self.snapshot = new_snapshot

# Files.
for src_path in events.files_deleted:
self.queue_event(FileDeletedEvent(src_path))
for src_path in events.files_modified:
self.queue_event(FileModifiedEvent(src_path))
for src_path in events.files_created:
self.queue_event(FileCreatedEvent(src_path))
for src_path, dest_path in events.files_moved:
self.queue_event(FileMovedEvent(src_path, dest_path))

# Directories.
for src_path in events.dirs_deleted:
self.queue_event(DirDeletedEvent(src_path))
for src_path in events.dirs_modified:
self.queue_event(DirModifiedEvent(src_path))
for src_path in events.dirs_created:
self.queue_event(DirCreatedEvent(src_path))
for src_path, dest_path in events.dirs_moved:
self.queue_event(DirMovedEvent(src_path, dest_path))
events = self.native_events
i = 0
while i < len(events):
event = events[i]

# For some reason the create and remove flags are sometimes also
# set for rename and modify type events, so let those take
# precedence.
if event.is_renamed:
# Internal moves appears to always be consecutive in the same
# buffer and have IDs differ by exactly one (while others
# don't) making it possible to pair up the two events coming
# from a singe move operation. (None of this is documented!)
# Otherwise, guess whether file was moved in or out.
# TODO: handle id wrapping
if (i + 1 < len(events) and events[i + 1].is_renamed
and events[i + 1].event_id == event.event_id + 1):
cls = DirMovedEvent if event.is_directory else FileMovedEvent
self.queue_event(cls(event.path, events[i + 1].path))
self.queue_event(DirModifiedEvent(os.path.dirname(event.path)))
self.queue_event(DirModifiedEvent(os.path.dirname(events[i + 1].path)))
i += 1
elif os.path.exists(event.path):
cls = DirCreatedEvent if event.is_directory else FileCreatedEvent
self.queue_event(cls(event.path))
self.queue_event(DirModifiedEvent(os.path.dirname(event.path)))
else:
cls = DirDeletedEvent if event.is_directory else FileDeletedEvent
self.queue_event(cls(event.path))
self.queue_event(DirModifiedEvent(os.path.dirname(event.path)))
# TODO: generate events for tree

elif event.is_modified or event.is_inode_meta_mod or event.is_xattr_mod :
cls = DirModifiedEvent if event.is_directory else FileModifiedEvent
self.queue_event(cls(event.path))

elif event.is_created:
cls = DirCreatedEvent if event.is_directory else FileCreatedEvent
self.queue_event(cls(event.path))
self.queue_event(DirModifiedEvent(os.path.dirname(event.path)))

elif event.is_removed:
cls = DirDeletedEvent if event.is_directory else FileDeletedEvent
self.queue_event(cls(event.path))
self.queue_event(DirModifiedEvent(os.path.dirname(event.path)))
i += 1

def run(self):
try:
def callback(pathnames, flags, emitter=self):
def callback(pathnames, flags, ids, emitter=self):
with emitter._lock:
emitter.native_events = [
_fsevents.NativeEvent(event_path, event_flags, event_id)
for event_path, event_flags, event_id in zip(pathnames, flags, ids)
]
emitter.queue_events(emitter.timeout)

# for pathname, flag in zip(pathnames, flags):
Expand Down
159 changes: 149 additions & 10 deletions src/watchdog_fsevents.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@
#define G_RETURN_IF_NOT(condition) do { if (!condition) { return; } } while (0)
#define UNUSED(x) (void)x

#if PY_MAJOR_VERSION < 3
#define AS_PYTHON_STRING(x) PyString_FromString(x)
#else /* PY_MAJOR_VERSION < 3 */
#define AS_PYTHON_STRING(x) PyUnicode_FromString(x)
#endif /* PY_MAJOR_VERSION < 3 */

/* Error message definitions. */
#define ERROR_CANNOT_CALL_CALLBACK "Unable to call Python callback."

Expand All @@ -56,7 +62,7 @@ typedef struct {
* function must accept 2 arguments, both of which
* are Python lists::
*
* def python_callback(event_paths, event_flags):
* def python_callback(event_paths, event_flags, event_ids):
* pass
*/
PyObject *python_callback;
Expand All @@ -77,6 +83,116 @@ typedef struct {
} StreamCallbackInfo;


/**
* NativeEvent type so that we don't need to expose the FSEvents constants to Python land
*/
typedef struct {
PyObject_HEAD
const char *path;
FSEventStreamEventFlags flags;
FSEventStreamEventId id;
} NativeEventObject;

PyObject* NativeEventTypeString(PyObject* instance, void* closure)
{
UNUSED(closure);
NativeEventObject *self = (NativeEventObject*)instance;
if (self->flags & kFSEventStreamEventFlagItemCreated)
return AS_PYTHON_STRING("Created");
if (self->flags & kFSEventStreamEventFlagItemRemoved)
return AS_PYTHON_STRING("Removed");
if (self->flags & kFSEventStreamEventFlagItemRenamed)
return AS_PYTHON_STRING("Renamed");
if (self->flags & kFSEventStreamEventFlagItemModified)
return AS_PYTHON_STRING("Modified");

return AS_PYTHON_STRING("Unknown");
}

PyObject* NativeEventTypeFlags(PyObject* instance, void* closure)
{
UNUSED(closure);
NativeEventObject *self = (NativeEventObject*)instance;
#if PY_MAJOR_VERSION < 3
return PyInt_FromLong(self->flags);
#else /* PY_MAJOR_VERSION < 3 */
return PyLong_FromLong(self->flags);
#endif /* PY_MAJOR_VERSION < 3 */
}

PyObject* NativeEventTypePath(PyObject* instance, void* closure)
{
UNUSED(closure);
NativeEventObject *self = (NativeEventObject*)instance;
return AS_PYTHON_STRING(self->path);
}

PyObject* NativeEventTypeID(PyObject* instance, void* closure)
{
UNUSED(closure);
NativeEventObject *self = (NativeEventObject*)instance;
#if PY_MAJOR_VERSION < 3
return PyInt_FromLong(self->id);
#else /* PY_MAJOR_VERSION < 3 */
return PyLong_FromLong(self->id);
#endif /* PY_MAJOR_VERSION < 3 */
}

#define FLAG_PROPERTY(suffix, flag) \
PyObject* NativeEventType##suffix(PyObject* instance, void* closure) \
{ \
UNUSED(closure); \
NativeEventObject *self = (NativeEventObject*)instance; \
if (self->flags & flag) { \
Py_RETURN_TRUE; \
} \
Py_RETURN_FALSE; \
}

FLAG_PROPERTY(IsCreated, kFSEventStreamEventFlagItemCreated)
FLAG_PROPERTY(IsRemoved, kFSEventStreamEventFlagItemRemoved)
FLAG_PROPERTY(IsRenamed, kFSEventStreamEventFlagItemRenamed)
FLAG_PROPERTY(IsModified, kFSEventStreamEventFlagItemModified)
FLAG_PROPERTY(IsDirectory, kFSEventStreamEventFlagItemIsDir)

static int NativeEventInit(NativeEventObject *self, PyObject *args, PyObject *kwds)
{
static char *kwlist[] = {"path", "flags", "id", NULL};

if (!PyArg_ParseTupleAndKeywords(args, kwds, "|sIL", kwlist, &self->path, &self->flags, &self->id)) {
return -1;
}

return 0;
}

static PyGetSetDef NativeEventProperties[] = {
{"_event_type", NativeEventTypeString, NULL, "Textual representation of the native event that occurred", NULL},
{"flags", NativeEventTypeFlags, NULL, "The raw mask of flags as returend by FSEvents", NULL},
{"path", NativeEventTypePath, NULL, "The path for which this event was generated", NULL},
{"id", NativeEventTypeID, NULL, "The id of the generated event", NULL},
{"is_created", NativeEventTypeIsCreated, NULL, "True if self.path was created on the filesystem", NULL},
{"is_removed", NativeEventTypeIsRemoved, NULL, "True if self.path was removed from the filesystem", NULL},
{"is_renamed", NativeEventTypeIsRenamed, NULL, "True if self.path was renamed on the filesystem", NULL},
{"is_modified", NativeEventTypeIsModified, NULL, "True if self.path was modified", NULL},
{"is_directory", NativeEventTypeIsDirectory, NULL, "True if self.path is a directory", NULL},
{NULL, NULL, NULL, NULL, NULL},
};


static PyTypeObject NativeEventType = {
PyVarObject_HEAD_INIT(NULL, 0)
.tp_name = "_watchdog_fsevents.NativeEvent",
.tp_doc = "A wrapper around native FSEvents events",
.tp_basicsize = sizeof(NativeEventObject),
.tp_itemsize = 0,
.tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
.tp_new = PyType_GenericNew,
.tp_getset = NativeEventProperties,
.tp_init = (initproc) NativeEventInit,
};


/**
* Dictionary to keep track of which run loop
* belongs to which emitter thread.
Expand Down Expand Up @@ -136,12 +252,13 @@ watchdog_FSEventStreamCallback(ConstFSEventStreamRef stream_ref,
const FSEventStreamEventId event_ids[])
{
UNUSED(stream_ref);
UNUSED(event_ids);
size_t i = 0;
PyObject *callback_result = NULL;
PyObject *path = NULL;
PyObject *id = NULL;
PyObject *flags = NULL;
PyObject *py_event_flags = NULL;
PyObject *py_event_ids = NULL;
PyObject *py_event_paths = NULL;
PyThreadState *saved_thread_state = NULL;

Expand All @@ -152,41 +269,46 @@ watchdog_FSEventStreamCallback(ConstFSEventStreamRef stream_ref,
/* Convert event flags and paths to Python ints and strings. */
py_event_paths = PyList_New(num_events);
py_event_flags = PyList_New(num_events);
if (G_NOT(py_event_paths && py_event_flags))
py_event_ids = PyList_New(num_events);
if (G_NOT(py_event_paths && py_event_flags && py_event_ids))
{
Py_DECREF(py_event_paths);
Py_DECREF(py_event_ids);
Py_DECREF(py_event_flags);
return /*NULL*/;
}
for (i = 0; i < num_events; ++i)
{
id = PyLong_FromLongLong(event_flags[i]);
#if PY_MAJOR_VERSION >= 3
path = PyUnicode_FromString(event_paths[i]);
flags = PyLong_FromLong(event_flags[i]);
#else
path = PyString_FromString(event_paths[i]);
flags = PyInt_FromLong(event_flags[i]);
#endif
if (G_NOT(path && flags))
if (G_NOT(path && flags && id))
{
Py_DECREF(py_event_paths);
Py_DECREF(py_event_flags);
Py_DECREF(py_event_ids);
return /*NULL*/;
}
PyList_SET_ITEM(py_event_paths, i, path);
PyList_SET_ITEM(py_event_flags, i, flags);
PyList_SET_ITEM(py_event_ids, i, id);
}

/* Call the Python callback function supplied by the stream information
* struct. The Python callback function should accept two arguments,
* both being Python lists:
*
* def python_callback(event_paths, event_flags):
* def python_callback(event_paths, event_flags, event_ids):
* pass
*/
callback_result = \
PyObject_CallFunction(stream_callback_info_ref->python_callback,
"OO", py_event_paths, py_event_flags);
"OOO", py_event_paths, py_event_flags, py_event_ids);
if (G_IS_NULL(callback_result))
{
if (G_NOT(PyErr_Occurred()))
Expand Down Expand Up @@ -306,7 +428,7 @@ watchdog_FSEventStreamCreate(StreamCallbackInfo *stream_callback_info_ref,
paths,
kFSEventStreamEventIdSinceNow,
stream_latency,
kFSEventStreamCreateFlagNoDefer);
kFSEventStreamCreateFlagNoDefer | kFSEventStreamCreateFlagFileEvents);
CFRelease(paths);
return stream_ref;
}
Expand All @@ -322,9 +444,9 @@ PyDoc_STRVAR(watchdog_add_watch__doc__,
:param callback:\n\
The callback function to call when an event occurs.\n\n\
Example::\n\n\
def callback(paths, flags):\n\
for path, flag in zip(paths, flags):\n\
print(\"%s=%ul\" % (path, flag))\n\
def callback(paths, flags, ids):\n\
for path, flag, event_id in zip(paths, flags, ids):\n\
print(\"%d: %s=%ul\" % (event_id, path, flag))\n\
:param paths:\n\
A list of paths to monitor.\n");
static PyObject *
Expand Down Expand Up @@ -589,9 +711,18 @@ watchdog_module_add_attributes(PyObject *module)
void
init_watchdog_fsevents(void)
{
NativeEventType.tp_new = PyType_GenericNew;
G_RETURN_IF(PyType_Ready(&NativeEventType) < 0);
PyObject *module = Py_InitModule3(MODULE_NAME,
watchdog_fsevents_methods,
watchdog_fsevents_module__doc__);
G_RETURN_IF(module == NULL);
Py_INCREF(&NativeEventType);
if (PyModule_AddObject(module, "NativeEvent", (PyObject*)&NativeEventType) < 0) {
Py_DECREF(&NativeEventType);
Py_DECREF(module);
return;
}
watchdog_module_add_attributes(module);
watchdog_module_init();
}
Expand All @@ -615,7 +746,15 @@ static struct PyModuleDef watchdog_fsevents_module = {
*/
PyMODINIT_FUNC
PyInit__watchdog_fsevents(void){
G_RETURN_NULL_IF(PyType_Ready(&NativeEventType) < 0);
PyObject *module = PyModule_Create(&watchdog_fsevents_module);
G_RETURN_NULL_IF_NULL(module);
Py_INCREF(&NativeEventType);
if (PyModule_AddObject(module, "NativeEvent", (PyObject*)&NativeEventType) < 0) {
Py_DECREF(&NativeEventType);
Py_DECREF(module);
return NULL;
}
watchdog_module_add_attributes(module);
watchdog_module_init();
return module;
Expand Down

0 comments on commit 3b9904c

Please sign in to comment.