Skip to content

Commit

Permalink
Add tests for pathlib._VirtualPath
Browse files Browse the repository at this point in the history
  • Loading branch information
barneygale committed Jul 2, 2023
1 parent 4b29e2e commit 8ce0139
Showing 1 changed file with 282 additions and 42 deletions.
324 changes: 282 additions & 42 deletions Lib/test/test_pathlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -1566,14 +1566,165 @@ def test_group(self):


#
# Tests for the concrete classes.
# Tests for the virtual classes.
#

class PathTest(unittest.TestCase):
"""Tests for the FS-accessing functionalities of the Path classes."""
class VirtualPathTest(PurePathTest):
cls = pathlib._VirtualPath

cls = pathlib.Path
can_symlink = os_helper.can_symlink()
def test_unsupported_operation(self):
P = self.cls
p = self.cls()
e = pathlib.UnsupportedOperation
self.assertRaises(e, p.stat)
self.assertRaises(e, p.lstat)
self.assertRaises(e, p.exists)
self.assertRaises(e, p.samefile, 'foo')
self.assertRaises(e, p.is_dir)
self.assertRaises(e, p.is_file)
self.assertRaises(e, p.is_mount)
self.assertRaises(e, p.is_symlink)
self.assertRaises(e, p.is_block_device)
self.assertRaises(e, p.is_char_device)
self.assertRaises(e, p.is_fifo)
self.assertRaises(e, p.is_socket)
self.assertRaises(e, p.is_junction)
self.assertRaises(e, p.open)
self.assertRaises(e, p.read_bytes)
self.assertRaises(e, p.read_text)
self.assertRaises(e, p.write_bytes, b'foo')
self.assertRaises(e, p.write_text, 'foo')
self.assertRaises(e, p.iterdir)
self.assertRaises(e, p.glob, '*')
self.assertRaises(e, p.rglob, '*')
self.assertRaises(e, lambda: list(p.walk()))
self.assertRaises(e, p.absolute)
self.assertRaises(e, P.cwd)
self.assertRaises(e, p.expanduser)
self.assertRaises(e, p.home)
self.assertRaises(e, p.readlink)
self.assertRaises(e, p.symlink_to, 'foo')
self.assertRaises(e, p.hardlink_to, 'foo')
self.assertRaises(e, p.mkdir)
self.assertRaises(e, p.touch)
self.assertRaises(e, p.rename, 'foo')
self.assertRaises(e, p.replace, 'foo')
self.assertRaises(e, p.chmod, 0o755)
self.assertRaises(e, p.lchmod, 0o755)
self.assertRaises(e, p.unlink)
self.assertRaises(e, p.rmdir)
self.assertRaises(e, p.owner)
self.assertRaises(e, p.group)
self.assertRaises(e, p.as_uri)

def test_as_uri_common(self):
e = pathlib.UnsupportedOperation
self.assertRaises(e, self.cls().as_uri)

def test_fspath_common(self):
self.assertRaises(TypeError, os.fspath, self.cls())

def test_as_bytes_common(self):
self.assertRaises(TypeError, bytes, self.cls())


class DummyVirtualPathIO(io.BytesIO):
"""
Used by DummyVirtualPath to implement `open('w')`
"""

def __init__(self, files, path):
super().__init__()
self.files = files
self.path = path

def close(self):
self.files[self.path] = self.getvalue()
super().close()


class DummyVirtualPath(pathlib._VirtualPath):
"""
Simple implementation of VirtualPath that keeps files and directories in
memory.
"""
_files = {}
_directories = {}
_symlinks = {}

def stat(self, *, follow_symlinks=True):
if follow_symlinks:
path = str(self.resolve())
else:
path = str(self.parent.resolve() / self.name)
if path in self._files:
st_mode = stat.S_IFREG
elif path in self._directories:
st_mode = stat.S_IFDIR
elif path in self._symlinks:
st_mode = stat.S_IFLNK
else:
raise FileNotFoundError(errno.ENOENT, "Not found", str(self))
return os.stat_result((st_mode, hash(str(self)), 0, 0, 0, 0, 0, 0, 0, 0))

def open(self, mode='r', buffering=-1, encoding=None,
errors=None, newline=None):
if buffering != -1:
raise NotImplementedError
path_obj = self.resolve()
path = str(path_obj)
name = path_obj.name
parent = str(path_obj.parent)
if path in self._directories:
raise IsADirectoryError(errno.EISDIR, "Is a directory", path)

text = 'b' not in mode
mode = ''.join(c for c in mode if c not in 'btU')
if mode == 'r':
if path not in self._files:
raise FileNotFoundError(errno.ENOENT, "File not found", path)
stream = io.BytesIO(self._files[path])
elif mode == 'w':
if parent not in self._directories:
raise FileNotFoundError(errno.ENOENT, "File not found", parent)
stream = DummyVirtualPathIO(self._files, path)
self._files[path] = b''
self._directories[parent].add(name)
else:
raise NotImplementedError
if text:
stream = io.TextIOWrapper(stream, encoding=encoding, errors=errors, newline=newline)
return stream

def iterdir(self):
path = str(self.resolve())
if path in self._files:
raise NotADirectoryError(errno.ENOTDIR, "Not a directory", path)
elif path in self._directories:
for name in self._directories[path]:
yield self / name
else:
raise FileNotFoundError(errno.ENOENT, "File not found", path)

def mkdir(self, mode=0o777, parents=False, exist_ok=False):
try:
self._directories[str(self.parent)].add(self.name)
self._directories[str(self)] = set()
except KeyError:
if not parents or self.parent == self:
raise FileNotFoundError(errno.ENOENT, "File not found", str(self.parent)) from None
self.parent.mkdir(parents=True, exist_ok=True)
self.mkdir(mode, parents=False, exist_ok=exist_ok)
except FileExistsError:
if not exist_ok:
raise


class DummyVirtualPathTest(unittest.TestCase):
"""Tests for VirtualPath methods that use stat(), open() and iterdir()."""

cls = DummyVirtualPath
can_symlink = False

# (BASE)
# |
Expand All @@ -1596,37 +1747,37 @@ class PathTest(unittest.TestCase):
#

def setUp(self):
def cleanup():
os.chmod(join('dirE'), 0o777)
os_helper.rmtree(BASE)
self.addCleanup(cleanup)
os.mkdir(BASE)
os.mkdir(join('dirA'))
os.mkdir(join('dirB'))
os.mkdir(join('dirC'))
os.mkdir(join('dirC', 'dirD'))
os.mkdir(join('dirE'))
with open(join('fileA'), 'wb') as f:
f.write(b"this is file A\n")
with open(join('dirB', 'fileB'), 'wb') as f:
f.write(b"this is file B\n")
with open(join('dirC', 'fileC'), 'wb') as f:
f.write(b"this is file C\n")
with open(join('dirC', 'novel.txt'), 'wb') as f:
f.write(b"this is a novel\n")
with open(join('dirC', 'dirD', 'fileD'), 'wb') as f:
f.write(b"this is file D\n")
os.chmod(join('dirE'), 0)
if self.can_symlink:
# Relative symlinks.
os.symlink('fileA', join('linkA'))
os.symlink('non-existing', join('brokenLink'))
os.symlink('dirB', join('linkB'), target_is_directory=True)
os.symlink(os.path.join('..', 'dirB'), join('dirA', 'linkC'), target_is_directory=True)
# This one goes upwards, creating a loop.
os.symlink(os.path.join('..', 'dirB'), join('dirB', 'linkD'), target_is_directory=True)
# Broken symlink (pointing to itself).
os.symlink('brokenLinkLoop', join('brokenLinkLoop'))
# note: this must be kept in sync with `PathTest.setUp()`
cls = self.cls
cls._files.clear()
cls._directories.clear()
cls._symlinks.clear()
cls._files.update({
f'{BASE}/fileA': b'this is file A\n',
f'{BASE}/dirB/fileB': b'this is file B\n',
f'{BASE}/dirC/fileC': b'this is file C\n',
f'{BASE}/dirC/dirD/fileD': b'this is file D\n',
f'{BASE}/dirC/novel.txt': b'this is a novel\n',
})
cls._directories.update({
BASE: {'dirA', 'dirB', 'dirC', 'dirE', 'fileA', },
f'{BASE}/dirA': set(),
f'{BASE}/dirB': {'fileB'},
f'{BASE}/dirC': {'dirD', 'fileC', 'novel.txt'},
f'{BASE}/dirC/dirD': {'fileD'},
f'{BASE}/dirE': {},
})
dirname = BASE
while True:
dirname, basename = os.path.split(dirname)
if not basename:
break
cls._directories[dirname] = {basename}

def tempdir(self):
path = self.cls(BASE).with_name('tmp-dirD')
path.mkdir()
return path

def assertFileNotFound(self, func, *args, **kwargs):
with self.assertRaises(FileNotFoundError) as cm:
Expand Down Expand Up @@ -1975,9 +2126,11 @@ def test_rglob_symlink_loop(self):
def test_glob_many_open_files(self):
depth = 30
P = self.cls
base = P(BASE) / 'deep'
p = P(base, *(['d']*depth))
p.mkdir(parents=True)
p = base = P(BASE) / 'deep'
p.mkdir()
for _ in range(depth):
p /= 'd'
p.mkdir()
pattern = '/'.join(['*'] * depth)
iters = [base.glob(pattern) for j in range(100)]
for it in iters:
Expand Down Expand Up @@ -2109,9 +2262,7 @@ def test_resolve_common(self):
# resolves to 'dirB/..' first before resolving to parent of dirB.
self._check_resolve_relative(p, P(BASE, 'foo', 'in', 'spam'), False)
# Now create absolute symlinks.
d = os_helper._longpath(tempfile.mkdtemp(suffix='-dirD',
dir=os.getcwd()))
self.addCleanup(os_helper.rmtree, d)
d = self.tempdir()
P(BASE, 'dirA', 'linkX').symlink_to(d)
P(BASE, str(d), 'linkY').symlink_to(join('dirB'))
p = P(BASE, 'dirA', 'linkX', 'linkY', 'fileB')
Expand Down Expand Up @@ -2353,6 +2504,10 @@ def _check_complex_symlinks(self, link0_target):
self.assertEqualNormCase(str(p), BASE)

# Resolve relative paths.
try:
self.cls().absolute()
except pathlib.UnsupportedOperation:
return
old_path = os.getcwd()
os.chdir(BASE)
try:
Expand Down Expand Up @@ -2380,6 +2535,91 @@ def test_complex_symlinks_relative(self):
def test_complex_symlinks_relative_dot_dot(self):
self._check_complex_symlinks(os.path.join('dirA', '..'))


class DummyVirtualPathWithSymlinks(DummyVirtualPath):
def readlink(self):
path = str(self)
if path in self._symlinks:
return self.with_segments(self._symlinks[path])
elif path in self._files or path in self._directories:
raise OSError(errno.EINVAL, "Not a symlink", path)
else:
raise FileNotFoundError(errno.ENOENT, "File not found", path)

def symlink_to(self, target, target_is_directory=False):
self._directories[str(self.parent)].add(self.name)
self._symlinks[str(self)] = str(target)


class DummyVirtualPathWithSymlinksTest(DummyVirtualPathTest):
cls = DummyVirtualPathWithSymlinks
can_symlink = True

def setUp(self):
super().setUp()
cls = self.cls
cls._symlinks.update({
f'{BASE}/linkA': 'fileA',
f'{BASE}/linkB': 'dirB',
f'{BASE}/dirA/linkC': '../dirB',
f'{BASE}/dirB/linkD': '../dirB',
f'{BASE}/brokenLink': 'non-existing',
f'{BASE}/brokenLinkLoop': 'brokenLinkLoop',
})
cls._directories[BASE].update({'linkA', 'linkB', 'brokenLink', 'brokenLinkLoop'})
cls._directories[f'{BASE}/dirA'].add('linkC')
cls._directories[f'{BASE}/dirB'].add('linkD')


#
# Tests for the concrete classes.
#

class PathTest(DummyVirtualPathTest):
"""Tests for the FS-accessing functionalities of the Path classes."""
cls = pathlib.Path
can_symlink = os_helper.can_symlink()

def setUp(self):
# note: this must be kept in sync with `DummyVirtualPathTest.setUp()`
def cleanup():
os.chmod(join('dirE'), 0o777)
os_helper.rmtree(BASE)
self.addCleanup(cleanup)
os.mkdir(BASE)
os.mkdir(join('dirA'))
os.mkdir(join('dirB'))
os.mkdir(join('dirC'))
os.mkdir(join('dirC', 'dirD'))
os.mkdir(join('dirE'))
with open(join('fileA'), 'wb') as f:
f.write(b"this is file A\n")
with open(join('dirB', 'fileB'), 'wb') as f:
f.write(b"this is file B\n")
with open(join('dirC', 'fileC'), 'wb') as f:
f.write(b"this is file C\n")
with open(join('dirC', 'novel.txt'), 'wb') as f:
f.write(b"this is a novel\n")
with open(join('dirC', 'dirD', 'fileD'), 'wb') as f:
f.write(b"this is file D\n")
os.chmod(join('dirE'), 0)
if self.can_symlink:
# Relative symlinks.
os.symlink('fileA', join('linkA'))
os.symlink('non-existing', join('brokenLink'))
os.symlink('dirB', join('linkB'), target_is_directory=True)
os.symlink(os.path.join('..', 'dirB'), join('dirA', 'linkC'), target_is_directory=True)
# This one goes upwards, creating a loop.
os.symlink(os.path.join('..', 'dirB'), join('dirB', 'linkD'), target_is_directory=True)
# Broken symlink (pointing to itself).
os.symlink('brokenLinkLoop', join('brokenLinkLoop'))

def tempdir(self):
d = os_helper._longpath(tempfile.mkdtemp(suffix='-dirD',
dir=os.getcwd()))
self.addCleanup(os_helper.rmtree, d)
return d

def test_concrete_class(self):
if self.cls is pathlib.Path:
expected = pathlib.WindowsPath if os.name == 'nt' else pathlib.PosixPath
Expand Down

0 comments on commit 8ce0139

Please sign in to comment.