diff --git a/src/watchdog/utils/dirsnapshot.py b/src/watchdog/utils/dirsnapshot.py index abf060469..d18dbbe98 100644 --- a/src/watchdog/utils/dirsnapshot.py +++ b/src/watchdog/utils/dirsnapshot.py @@ -119,6 +119,26 @@ def __init__(self, ref, snapshot): self._files_modified = list(modified - set(self._dirs_modified)) self._files_moved = list(moved - set(self._dirs_moved)) + def __str__(self): + return self.__repr__() + + def __repr__(self): + fmt = ( + '<{0} files(created={1}, deleted={2}, modified={3}, moved={4}),' + ' folders(created={5}, deleted={6}, modified={7}, moved={8})>' + ) + return fmt.format( + type(self).__name__, + len(self._files_created), + len(self._files_deleted), + len(self._files_modified), + len(self._files_moved), + len(self._dirs_created), + len(self._dirs_deleted), + len(self._dirs_modified), + len(self._dirs_moved) + ) + @property def files_created(self): """List of files that were created.""" @@ -205,6 +225,11 @@ def __init__(self, path, recursive=True, walker_callback=(lambda p, s: None), stat=default_stat, listdir=scandir): + self.recursive = recursive + self.walker_callback = walker_callback + self.stat = stat + self.listdir = listdir + self._stat_info = {} self._inode_to_path = {} @@ -212,39 +237,48 @@ def __init__(self, path, recursive=True, self._stat_info[path] = st self._inode_to_path[(st.st_ino, st.st_dev)] = path - def walk(root): - try: - paths = [os.path.join(root, entry if isinstance(entry, str) else entry.name) - for entry in listdir(root)] - except OSError as e: - # Directory may have been deleted between finding it in the directory - # list of its parent and trying to delete its contents. If this - # happens we treat it as empty. Likewise if the directory was replaced - # with a file of the same name (less likely, but possible). - if e.errno in (errno.ENOENT, errno.ENOTDIR, errno.EINVAL): - return - else: - raise - entries = [] - for p in paths: - try: - entries.append((p, stat(p))) - except OSError: - continue - for _ in entries: - yield _ - if recursive: - for path, st in entries: - if S_ISDIR(st.st_mode): - for _ in walk(path): - yield _ - - for p, st in walk(path): + for p, st in self.walk(path): i = (st.st_ino, st.st_dev) self._inode_to_path[i] = p self._stat_info[p] = st walker_callback(p, st) + def walk(self, root): + try: + paths = [os.path.join(root, entry if isinstance(entry, str) else entry.name) + for entry in self.listdir(root)] + except OSError as e: + # Directory may have been deleted between finding it in the directory + # list of its parent and trying to delete its contents. If this + # happens we treat it as empty. Likewise if the directory was replaced + # with a file of the same name (less likely, but possible). + if e.errno in (errno.ENOENT, errno.ENOTDIR, errno.EINVAL): + return + else: + raise + + entries = [] + for p in paths: + try: + entry = (p, self.stat(p)) + entries.append(entry) + yield entry + except OSError: + continue + + if self.recursive: + for path, st in entries: + try: + if S_ISDIR(st.st_mode): + for entry in self.walk(path): + yield entry + except (IOError, OSError) as e: + # IOError for Python 2 + # OSError for Python 3 + # (should be only PermissionError when dropping Python 2 support) + if e.errno != errno.EACCES: + raise + @property def paths(self): """ diff --git a/tests/test_snapshot_diff.py b/tests/test_snapshot_diff.py index 1f8dbe50e..9c881c3d6 100644 --- a/tests/test_snapshot_diff.py +++ b/tests/test_snapshot_diff.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import errno import os import time from .shell import mkdir, touch, mv, rm @@ -124,3 +125,34 @@ def listdir_fcn(path): # Should NOT raise an OSError (ENOTDIR) DirectorySnapshot(p('root'), listdir=listdir_fcn) + + +def test_permission_error(monkeypatch, p): + # Test that unreadable folders are not raising exceptions + mkdir(p('a', 'b', 'c'), parents=True) + + ref = DirectorySnapshot(p('')) + + def walk(self, root): + """Generate a permission error on folder "a/b".""" + # Generate the permission error + if root.startswith(p('a', 'b')): + raise OSError(errno.EACCES, os.strerror(errno.EACCES)) + + # Mimic the original method + for entry in walk_orig(self, root): + yield entry + + walk_orig = DirectorySnapshot.walk + monkeypatch.setattr(DirectorySnapshot, "walk", walk) + + # Should NOT raise an OSError (EACCES) + new_snapshot = DirectorySnapshot(p('')) + + monkeypatch.undo() + + diff = DirectorySnapshotDiff(ref, new_snapshot) + assert repr(diff) + + # Children of a/b/ are no more accessible and so removed in the new snapshot + assert diff.dirs_deleted == [(p('a', 'b', 'c'))]