From cb0d3aae5695722421372d68b19927e6b9501389 Mon Sep 17 00:00:00 2001 From: Kevin Bates Date: Thu, 29 Apr 2021 16:37:03 -0700 Subject: [PATCH] Fix for recursive symlink - Notebook 4670 This change also prevents permission-related exceptions from logging the file. Co-authored-by: Shane Canon Co-authored-by: Thomas Kluyver --- .../services/contents/filemanager.py | 37 ++++++++++++++----- .../tests/services/contents/test_manager.py | 37 +++++++++++++++---- 2 files changed, 56 insertions(+), 18 deletions(-) diff --git a/jupyter_server/services/contents/filemanager.py b/jupyter_server/services/contents/filemanager.py index 6f465cf9e8..5166969427 100644 --- a/jupyter_server/services/contents/filemanager.py +++ b/jupyter_server/services/contents/filemanager.py @@ -273,7 +273,7 @@ def _dir_model(self, path, content=True): # skip over broken symlinks in listing if e.errno == errno.ENOENT: self.log.warning("%s doesn't exist", os_path) - else: + elif e.errno != errno.EACCES: # Don't provide clues about protected files self.log.warning("Error stat-ing %s: %s", os_path, e) continue @@ -283,17 +283,25 @@ def _dir_model(self, path, content=True): self.log.debug("%s not a regular file", os_path) continue - if self.should_list(name): - if self.allow_hidden or not is_file_hidden(os_path, stat_res=st): - contents.append( + try: + if self.should_list(name): + if self.allow_hidden or not is_file_hidden(os_path, stat_res=st): + contents.append( self.get(path='%s/%s' % (path, name), content=False) + ) + except OSError as e: + # ELOOP: recursive symlink, also don't show failure due to permissions + if e.errno not in [errno.ELOOP, errno.EACCES]: + self.log.warning( + "Unknown error checking if file %r is hidden", + os_path, + exc_info=True, ) model['format'] = 'json' return model - def _file_model(self, path, content=True, format=None): """Build a model for a file @@ -585,7 +593,7 @@ async def _dir_model(self, path, content=True): # skip over broken symlinks in listing if e.errno == errno.ENOENT: self.log.warning("%s doesn't exist", os_path) - else: + elif e.errno != errno.EACCES: # Don't provide clues about protected files self.log.warning("Error stat-ing %s: %s", os_path, e) continue @@ -595,10 +603,19 @@ async def _dir_model(self, path, content=True): self.log.debug("%s not a regular file", os_path) continue - if self.should_list(name): - if self.allow_hidden or not is_file_hidden(os_path, stat_res=st): - contents.append( - await self.get(path='%s/%s' % (path, name), content=False) + try: + if self.should_list(name): + if self.allow_hidden or not is_file_hidden(os_path, stat_res=st): + contents.append( + await self.get(path='%s/%s' % (path, name), content=False) + ) + except OSError as e: + # ELOOP: recursive symlink, also don't show failure due to permissions + if e.errno not in [errno.ELOOP, errno.EACCES]: + self.log.warning( + "Unknown error checking if file %r is hidden", + os_path, + exc_info=True, ) model['format'] = 'json' diff --git a/jupyter_server/tests/services/contents/test_manager.py b/jupyter_server/tests/services/contents/test_manager.py index 27534dd8fc..98f358645c 100644 --- a/jupyter_server/tests/services/contents/test_manager.py +++ b/jupyter_server/tests/services/contents/test_manager.py @@ -2,18 +2,17 @@ import sys import time import pytest -import functools from traitlets import TraitError from tornado.web import HTTPError from itertools import combinations - from nbformat import v4 as nbformat from jupyter_server.services.contents.filemanager import AsyncFileContentsManager, FileContentsManager from jupyter_server.utils import ensure_async from ...utils import expected_http_error + @pytest.fixture(params=[(FileContentsManager, True), (FileContentsManager, False), (AsyncFileContentsManager, True), @@ -29,6 +28,7 @@ def file_contents_manager_class(request, tmp_path): # -------------- Functions ---------------------------- + def _make_dir(jp_contents_manager, api_path): """ Make a directory. @@ -99,6 +99,7 @@ async def check_populated_dir_files(jp_contents_manager, api_path): # ----------------- Tests ---------------------------------- + def test_root_dir(file_contents_manager_class, tmp_path): fm = file_contents_manager_class(root_dir=str(tmp_path)) assert fm.root_dir == str(tmp_path) @@ -116,6 +117,7 @@ def test_invalid_root_dir(file_contents_manager_class, tmp_path): with pytest.raises(TraitError): file_contents_manager_class(root_dir=str(temp_file)) + def test_get_os_path(file_contents_manager_class, tmp_path): fm = file_contents_manager_class(root_dir=str(tmp_path)) path = fm._get_os_path('/path/to/notebook/test.ipynb') @@ -146,10 +148,6 @@ def test_checkpoint_subdir(file_contents_manager_class, tmp_path): assert cp_dir == os.path.join(str(tmp_path), cpm.checkpoint_dir, cp_name) -@pytest.mark.skipif( - sys.platform == 'win32' and sys.version_info[0] < 3, - reason="System platform is Windows, version < 3" -) async def test_bad_symlink(file_contents_manager_class, tmp_path): td = str(tmp_path) @@ -172,9 +170,31 @@ async def test_bad_symlink(file_contents_manager_class, tmp_path): @pytest.mark.skipif( - sys.platform == 'win32' and sys.version_info[0] < 3, - reason="System platform is Windows, version < 3" + sys.platform.startswith('win'), + reason="Windows doesn't detect symlink loops" ) +async def test_recursive_symlink(file_contents_manager_class, tmp_path): + td = str(tmp_path) + + cm = file_contents_manager_class(root_dir=td) + path = 'test recursive symlink' + _make_dir(cm, path) + + file_model = await ensure_async(cm.new_untitled(path=path, ext='.txt')) + + # create recursive symlink + symlink(cm, '%s/%s' % (path, "recursive"), '%s/%s' % (path, "recursive")) + model = await ensure_async(cm.get(path)) + + contents = { + content['name']: content for content in model['content'] + } + assert 'untitled.txt' in contents + assert contents['untitled.txt'] == file_model + # recursive symlinks should not be shown in the contents manager + assert 'recursive' not in contents + + async def test_good_symlink(file_contents_manager_class, tmp_path): td = str(tmp_path) cm = file_contents_manager_class(root_dir=td) @@ -213,6 +233,7 @@ async def test_403(file_contents_manager_class, tmp_path): except HTTPError as e: assert e.status_code == 403 + async def test_escape_root(file_contents_manager_class, tmp_path): td = str(tmp_path) cm = file_contents_manager_class(root_dir=td)