From 1a6122bc0ee919fd468cc2460db4da3a639375d4 Mon Sep 17 00:00:00 2001
From: barneygale
Date: Mon, 3 Jul 2023 19:52:31 +0100
Subject: [PATCH] Add `tarfile.TarPath`

---
 Lib/tarfile.py | 273 ++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 272 insertions(+), 1 deletion(-)

diff --git a/Lib/tarfile.py b/Lib/tarfile.py
index df4e41f7a0d23a..a8ca2264040063 100755
--- a/Lib/tarfile.py
+++ b/Lib/tarfile.py
@@ -37,9 +37,13 @@
 # Imports
 #---------
 from builtins import open as bltn_open
+from collections import namedtuple
+import errno
 import sys
 import os
 import io
+import pathlib
+import posixpath
 import shutil
 import stat
 import time
@@ -69,7 +73,7 @@
            "DEFAULT_FORMAT", "open","fully_trusted_filter", "data_filter",
            "tar_filter", "FilterError", "AbsoluteLinkError",
            "OutsideDestinationError", "SpecialFileError", "AbsolutePathError",
-           "LinkOutsideDestinationError"]
+           "LinkOutsideDestinationError", "TarPath"]
 
 
 #---------------------------------------------------------
@@ -2772,6 +2776,273 @@ def __exit__(self, type, value, traceback):
                 self.fileobj.close()
             self.closed = True
+
+# Field names of _TarStatResult: the usual os.stat_result fields followed by
+# the owner and group names stored in the archive (st_uname, st_gname).
+_tar_stat_fields = ('st_mode st_ino st_dev st_nlink st_uid st_gid '
+                    'st_size st_atime st_mtime st_ctime st_uname st_gname')
+
+
+class _TarStatResult(namedtuple('_TarStatResult', _tar_stat_fields)):
+    """Tar-specific version of os.stat_result. Returned by TarPath.stat()."""
+    __slots__ = ()
+
+    @classmethod
+    def from_tarinfo(cls, tarfile, tarinfo):
+        """Create a _TarStatResult from TarFile and TarInfo objects.
+
+        The file type bits are chosen from tarinfo.type and OR-ed with
+        tarinfo.mode. ValueError is raised for any other member type.
+        """
+        if tarinfo.type in REGULAR_TYPES:
+            st_mode = stat.S_IFREG
+        elif tarinfo.type == DIRTYPE:
+            st_mode = stat.S_IFDIR
+        elif tarinfo.type == SYMTYPE or tarinfo.type == LNKTYPE:
+            # NOTE(review): hard links are reported as symlinks here, while
+            # TarPath.readlink() only accepts SYMTYPE members -- confirm.
+            st_mode = stat.S_IFLNK
+        elif tarinfo.type == FIFOTYPE:
+            st_mode = stat.S_IFIFO
+        elif tarinfo.type == CHRTYPE:
+            st_mode = stat.S_IFCHR
+        elif tarinfo.type == BLKTYPE:
+            st_mode = stat.S_IFBLK
+        else:
+            raise ValueError(tarinfo.type)
+        return cls(st_mode=tarinfo.mode | st_mode,
+                   st_ino=tarinfo.offset_data,
+                   st_dev=id(tarfile),
+                   st_nlink=0,
+                   st_uid=tarinfo.uid,
+                   st_gid=tarinfo.gid,
+                   st_size=tarinfo.size,
+                   st_atime=0,
+                   st_mtime=tarinfo.mtime,
+                   st_ctime=0,
+                   st_uname=tarinfo.uname,
+                   st_gname=tarinfo.gname)
+
+    @classmethod
+    def implied_directory(cls, tarfile, path):
+        """Create a _TarStatResult for a directory that is implied to exist
+        by another archive member's path.
+        """
+        return cls(stat.S_IFDIR, hash(path), id(tarfile), 0, 0, 0, 0, 0, 0, 0, None, None)
+
+
+class _TarPathWriter(io.BytesIO):
+    """File object that flushes its contents to a tar archive on close.
+    Returned by TarPath.open(mode="w").
+    """
+
+    def __init__(self, tarfile, path):
+        super().__init__()
+        self.tarfile = tarfile
+        self.path = path
+
+    def close(self):
+        """Add the buffered bytes to the archive as a member named self.path,
+        then close the buffer. Closing an already closed writer does nothing.
+        """
+        if self.closed:
+            return
+        try:
+            info = TarInfo(self.path)
+            # Measure from the end of the buffer: the current position may
+            # not be the end if the caller used seek().
+            info.size = self.seek(0, io.SEEK_END)
+            self.seek(0)
+            self.tarfile.addfile(info, self)
+        finally:
+            # Release the buffer even if writing to the archive failed.
+            super().close()
+
+
+class TarPath(pathlib._VirtualPath):
+    """A pathlib-compatible interface for tar files."""
+
+    __slots__ = ('tarfile',)
+    _flavour = posixpath
+
+    def __init__(self, *pathsegments, tarfile):
+        super().__init__(*pathsegments)
+        self.tarfile = tarfile
+
+    def __repr__(self):
+        return f"{type(self).__name__}({str(self)!r}, tarfile={self.tarfile!r})"
+
+    def __hash__(self):
+        return hash((id(self.tarfile), str(self)))
+
+    def __eq__(self, other):
+        if not isinstance(other, TarPath):
+            return NotImplemented
+        elif other.tarfile is not self.tarfile:
+            return False
+        return super().__eq__(other)
+
+    def __lt__(self, other):
+        if not isinstance(other, TarPath) or other.tarfile is not self.tarfile:
+            return NotImplemented
+        return super().__lt__(other)
+
+    def __le__(self, other):
+        if not isinstance(other, TarPath) or other.tarfile is not self.tarfile:
+            return NotImplemented
+        return super().__le__(other)
+
+    def __gt__(self, other):
+        if not isinstance(other, TarPath) or other.tarfile is not self.tarfile:
+            return NotImplemented
+        return super().__gt__(other)
+
+    def __ge__(self, other):
+        if not isinstance(other, TarPath) or other.tarfile is not self.tarfile:
+            return NotImplemented
+        return super().__ge__(other)
+
+    def with_segments(self, *pathsegments):
+        """Construct a new TarPath object with the same underlying TarFile
+        object from any number of path-like objects.
+        """
+        return type(self)(*pathsegments, tarfile=self.tarfile)
+
+    def stat(self, *, follow_symlinks=True):
+        """Return the path's status, similar to os.stat().
+
+        Members are searched in reverse archive order. A path that only
+        appears as a parent of other members is reported as an implied
+        directory. Raises FileNotFoundError if nothing matches.
+        """
+        if follow_symlinks:
+            resolved = self.resolve()
+        else:
+            resolved = self.parent.resolve() / self.name
+        implied_directory = False
+        for info in reversed(self.tarfile.getmembers()):
+            path = self.with_segments(info.name)
+            if path == resolved:
+                return _TarStatResult.from_tarinfo(self.tarfile, info)
+            elif resolved in path.parents:
+                implied_directory = True
+        if implied_directory:
+            return _TarStatResult.implied_directory(self.tarfile, str(resolved))
+        else:
+            raise FileNotFoundError(errno.ENOENT, "Not found", str(self))
+
+    def owner(self):
+        """Return the user name of the path owner."""
+        name = self.stat().st_uname
+        if name is not None:
+            return name
+        raise pathlib.UnsupportedOperation()
+
+    def group(self):
+        """Return the group name of the path owner."""
+        name = self.stat().st_gname
+        if name is not None:
+            return name
+        raise pathlib.UnsupportedOperation()
+
+    def open(self, mode='r', buffering=-1, encoding=None, errors=None, newline=None):
+        """Open the archive member pointed by this path and return a file
+        object, similar to the built-in open() function.
+
+        Only reading ('r') and writing ('w') are supported. Reading raises
+        FileNotFoundError for a missing member and IsADirectoryError for a
+        directory member.
+        """
+        if buffering != -1:
+            return super().open(mode, buffering, encoding, errors, newline)
+        action = ''.join(c for c in mode if c not in 'btU')
+        if action == 'r':
+            try:
+                member = self.tarfile.getmember(str(self.resolve()))
+                fileobj = self.tarfile.extractfile(member)
+            except KeyError:
+                # Missing member, or a link whose target is not in the archive.
+                raise FileNotFoundError(errno.ENOENT, "Not found", str(self)) from None
+            if fileobj is None:
+                # extractfile() returns None for members without file data.
+                if member.isdir():
+                    raise IsADirectoryError(errno.EISDIR, "Is a directory", str(self))
+                raise OSError(errno.EINVAL, "Not a regular file", str(self))
+        elif action == 'w':
+            fileobj = _TarPathWriter(self.tarfile, str(self.resolve()))
+        else:
+            raise pathlib.UnsupportedOperation()
+        if 'b' not in mode:
+            fileobj = io.TextIOWrapper(fileobj, encoding, errors, newline)
+        return fileobj
+
+    def iterdir(self):
+        """Yield path objects of the directory contents. The children are
+        yielded in arbitrary order.
+        """
+        resolved = self.resolve()
+        seen = set()
+        is_directory = False
+        for info in self.tarfile.getmembers():
+            path = self.with_segments(info.name)
+            if path == resolved:
+                if info.type != DIRTYPE:
+                    raise NotADirectoryError(errno.ENOTDIR, "Not a directory", str(self))
+                # An explicit directory member exists even when it is empty.
+                is_directory = True
+            while True:
+                parent = path.parent
+                if parent == path:
+                    break
+                elif parent == resolved:
+                    path_str = str(path)
+                    if path_str not in seen:
+                        seen.add(path_str)
+                        yield self / path.name
+                    break
+                path = parent
+        if not seen and not is_directory:
+            raise FileNotFoundError(errno.ENOENT, "File not found", str(self))
+
+    def readlink(self):
+        """Return the path to which the symbolic link points."""
+        for info in reversed(self.tarfile.getmembers()):
+            path = self.with_segments(info.name)
+            if path == self:
+                if info.issym():
+                    return self.with_segments(info.linkname)
+                else:
+                    raise OSError(errno.EINVAL, "Not a symlink", str(self))
+            elif self in path.parents:
+                raise OSError(errno.EINVAL, "Not a symlink", str(self))
+        raise FileNotFoundError(errno.ENOENT, "File not found", str(self))
+
+    def mkdir(self, mode=0o777, parents=False, exist_ok=False):
+        """Create a new directory at this given path.
+
+        A directory member is always appended to the archive; *parents* and
+        *exist_ok* are accepted but not consulted.
+        """
+        info = TarInfo(str(self))
+        info.type = DIRTYPE
+        info.mode = mode
+        self.tarfile.addfile(info)
+
+    def symlink_to(self, target, target_is_directory=False):
+        """Make this path a symlink pointing to the target path."""
+        info = TarInfo(str(self))
+        info.type = SYMTYPE
+        info.linkname = str(self.with_segments(target))
+        self.tarfile.addfile(info)
+
+    def hardlink_to(self, target):
+        """Make this path a hard link pointing to the target path."""
+        info = TarInfo(str(self))
+        info.type = LNKTYPE
+        info.linkname = str(self.with_segments(target))
+        self.tarfile.addfile(info)
+
+
 
 #--------------------
 # exported functions
 #--------------------