Skip to content

Commit

Permalink
Add tarfile.TarPath
Browse files Browse the repository at this point in the history
  • Loading branch information
barneygale committed Jul 3, 2023
1 parent 596016f commit 1a6122b
Showing 1 changed file with 228 additions and 1 deletion.
229 changes: 228 additions & 1 deletion Lib/tarfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,13 @@
# Imports
#---------
from builtins import open as bltn_open
from collections import namedtuple
import errno
import sys
import os
import io
import pathlib
import posixpath
import shutil
import stat
import time
Expand Down Expand Up @@ -69,7 +73,7 @@
"DEFAULT_FORMAT", "open","fully_trusted_filter", "data_filter",
"tar_filter", "FilterError", "AbsoluteLinkError",
"OutsideDestinationError", "SpecialFileError", "AbsolutePathError",
"LinkOutsideDestinationError"]
"LinkOutsideDestinationError", "TarPath"]


#---------------------------------------------------------
Expand Down Expand Up @@ -2772,6 +2776,229 @@ def __exit__(self, type, value, traceback):
self.fileobj.close()
self.closed = True


# Field names of _TarStatResult in positional order: the os.stat_result-like
# fields first, followed by the tar-specific user and group names.
_tar_stat_fields = ' '.join((
    'st_mode', 'st_ino', 'st_dev', 'st_nlink', 'st_uid', 'st_gid',
    'st_size', 'st_atime', 'st_mtime', 'st_ctime', 'st_uname', 'st_gname',
))


class _TarStatResult(namedtuple('_TarStatResult', _tar_stat_fields)):
    """Stat-like record describing one archive member (see TarPath.stat()).

    Mirrors the common os.stat_result fields and adds st_uname and st_gname.
    """
    __slots__ = ()

    @classmethod
    def from_tarinfo(cls, tarfile, tarinfo):
        """Build a record from a TarFile and one of its TarInfo members.

        The file-type bits are derived from tarinfo.type and OR-ed with
        tarinfo.mode.  Symbolic and hard links both map to S_IFLNK.
        st_ino is the member's data offset and st_dev is id(tarfile).
        Raises ValueError(tarinfo.type) for an unrecognised member type.
        """
        # Checked in order; the first group containing the member type wins.
        type_table = (
            (REGULAR_TYPES, stat.S_IFREG),
            ((DIRTYPE,), stat.S_IFDIR),
            ((SYMTYPE, LNKTYPE), stat.S_IFLNK),
            ((FIFOTYPE,), stat.S_IFIFO),
            ((CHRTYPE,), stat.S_IFCHR),
            ((BLKTYPE,), stat.S_IFBLK),
        )
        for member_types, file_type in type_table:
            if tarinfo.type in member_types:
                break
        else:
            raise ValueError(tarinfo.type)
        return cls(
            tarinfo.mode | file_type,   # st_mode
            tarinfo.offset_data,        # st_ino
            id(tarfile),                # st_dev
            0,                          # st_nlink
            tarinfo.uid,                # st_uid
            tarinfo.gid,                # st_gid
            tarinfo.size,               # st_size
            0,                          # st_atime
            tarinfo.mtime,              # st_mtime
            0,                          # st_ctime
            tarinfo.uname,              # st_uname
            tarinfo.gname,              # st_gname
        )

    @classmethod
    def implied_directory(cls, tarfile, path):
        """Build a record for a directory that has no member of its own but
        is implied to exist by another archive member's path.

        Only the type bits, st_ino (hash(path)) and st_dev (id(tarfile)) carry
        information; the numeric fields are 0 and both names are None.
        """
        return cls(
            st_mode=stat.S_IFDIR,
            st_ino=hash(path),
            st_dev=id(tarfile),
            st_nlink=0,
            st_uid=0,
            st_gid=0,
            st_size=0,
            st_atime=0,
            st_mtime=0,
            st_ctime=0,
            st_uname=None,
            st_gname=None,
        )


class _TarPathWriter(io.BytesIO):
    """In-memory file object that is added to a tar archive when closed.

    Returned by TarPath.open(mode="w").  Everything written is buffered in
    memory; close() wraps the buffer in a TarInfo named *path* and passes it
    to tarfile.addfile().
    """

    def __init__(self, tarfile, path):
        """Remember the target archive (*tarfile*) and member name (*path*)."""
        super().__init__()
        self.tarfile = tarfile
        self.path = path

    def close(self):
        """Add the buffered data to the archive, then release the buffer.

        Calling close() again is a no-op, so the member is added only once.
        Any exception raised by tarfile.addfile() propagates, but the buffer
        is closed regardless.
        """
        if self.closed:
            return
        try:
            info = TarInfo(self.path)
            # Measure the whole buffer rather than the current position, so a
            # writer that seeked backwards does not truncate the member.
            info.size = self.seek(0, io.SEEK_END)
            self.seek(0)
            self.tarfile.addfile(info, self)
        finally:
            super().close()


class TarPath(pathlib._VirtualPath):
    """A pathlib-compatible interface for tar files.

    A TarPath pairs a POSIX-style member path with the TarFile object it is
    looked up in.  Two paths compare equal, or can be ordered, only when they
    refer to the very same TarFile object.
    """

    __slots__ = ('tarfile',)
    _flavour = posixpath

    def __init__(self, *pathsegments, tarfile):
        """Create a path from *pathsegments* inside the archive *tarfile*."""
        super().__init__(*pathsegments)
        self.tarfile = tarfile

    def __repr__(self):
        return f"{type(self).__name__}({str(self)!r}, tarfile={self.tarfile!r})"

    def __hash__(self):
        # Uses the archive's identity, mirroring the identity test in __eq__.
        return hash((id(self.tarfile), str(self)))

    def __eq__(self, other):
        if not isinstance(other, TarPath):
            return NotImplemented
        elif other.tarfile is not self.tarfile:
            return False
        return super().__eq__(other)

    # Ordering is only defined between paths of the same TarFile object;
    # anything else yields NotImplemented.

    def __lt__(self, other):
        if not isinstance(other, TarPath) or other.tarfile is not self.tarfile:
            return NotImplemented
        return super().__lt__(other)

    def __le__(self, other):
        if not isinstance(other, TarPath) or other.tarfile is not self.tarfile:
            return NotImplemented
        return super().__le__(other)

    def __gt__(self, other):
        if not isinstance(other, TarPath) or other.tarfile is not self.tarfile:
            return NotImplemented
        return super().__gt__(other)

    def __ge__(self, other):
        if not isinstance(other, TarPath) or other.tarfile is not self.tarfile:
            return NotImplemented
        return super().__ge__(other)

    def with_segments(self, *pathsegments):
        """Construct a new TarPath object with the same underlying TarFile
        object from any number of path-like objects.
        """
        return type(self)(*pathsegments, tarfile=self.tarfile)

    def stat(self, *, follow_symlinks=True):
        """Return the path's status, similar to os.stat().

        The result is a _TarStatResult.  A path that has no member of its own
        but is a parent of some member is reported as an implied directory.
        Raises FileNotFoundError when neither applies.  With
        follow_symlinks=False only the parent is resolved, so the final
        component itself is looked up as-is.
        """
        if follow_symlinks:
            resolved = self.resolve()
        else:
            resolved = self.parent.resolve() / self.name
        implied_directory = False
        # Scan in reverse so that the last member listed under a name wins.
        for info in reversed(self.tarfile.getmembers()):
            path = self.with_segments(info.name)
            if path == resolved:
                return _TarStatResult.from_tarinfo(self.tarfile, info)
            elif resolved in path.parents:
                implied_directory = True
        if implied_directory:
            return _TarStatResult.implied_directory(self.tarfile, str(resolved))
        else:
            raise FileNotFoundError(errno.ENOENT, "Not found", str(self))

    def owner(self):
        """Return the user name of the path owner.

        Raises pathlib.UnsupportedOperation when the stat result carries no
        user name (st_uname is None).
        """
        name = self.stat().st_uname
        if name is not None:
            return name
        raise pathlib.UnsupportedOperation()

    def group(self):
        """Return the group name of the path owner.

        Raises pathlib.UnsupportedOperation when the stat result carries no
        group name (st_gname is None).
        """
        name = self.stat().st_gname
        if name is not None:
            return name
        raise pathlib.UnsupportedOperation()

    def open(self, mode='r', buffering=-1, encoding=None, errors=None, newline=None):
        """Open the archive member pointed by this path and return a file
        object, similar to the built-in open() function.

        Only the 'r' and 'w' actions are supported (optionally combined with
        'b' or 't'); any other action raises pathlib.UnsupportedOperation.
        Writing returns a _TarPathWriter, which adds the member to the archive
        when it is closed.  A non-default *buffering* is delegated to the base
        class implementation.

        Raises IsADirectoryError when reading a directory member, and
        pathlib.UnsupportedOperation when reading any other member that has
        no data stream.
        """
        if buffering != -1:
            return super().open(mode, buffering, encoding, errors, newline)
        action = ''.join(c for c in mode if c not in 'btU')
        if action == 'r':
            name = str(self.resolve())
            fileobj = self.tarfile.extractfile(name)
            if fileobj is None:
                # extractfile() returns None for existing members that are
                # neither regular files nor links; never hand None back to
                # the caller or wrap it in a TextIOWrapper.
                if self.tarfile.getmember(name).isdir():
                    raise IsADirectoryError(errno.EISDIR, "Is a directory", str(self))
                raise pathlib.UnsupportedOperation(
                    f"cannot read from archive member {name!r}")
        elif action == 'w':
            fileobj = _TarPathWriter(self.tarfile, str(self.resolve()))
        else:
            raise pathlib.UnsupportedOperation()
        if 'b' not in mode:
            fileobj = io.TextIOWrapper(fileobj, encoding, errors, newline)
        return fileobj

    def iterdir(self):
        """Yield path objects of the directory contents. The children are
        yielded in arbitrary order.

        Raises NotADirectoryError if the archive has a non-directory member
        at this path, and FileNotFoundError if the path is neither an explicit
        directory member nor the parent of any member.  As this is a
        generator, these errors surface during iteration.
        """
        resolved = self.resolve()
        seen = set()
        # True once an explicit directory member for this path is found; such
        # a directory exists even when it has no children to yield.
        exists = False
        for info in self.tarfile.getmembers():
            path = self.with_segments(info.name)
            if path == resolved:
                if info.type != DIRTYPE:
                    raise NotADirectoryError(errno.ENOTDIR, "Not a directory", str(self))
                exists = True
                continue
            # Climb from the member towards the root, stopping at the
            # ancestor that is a direct child of this directory (if any).
            while True:
                parent = path.parent
                if parent == path:
                    break
                elif parent == resolved:
                    path_str = str(path)
                    if path_str not in seen:
                        seen.add(path_str)
                        yield self / path.name
                    break
                path = parent
        if not (exists or seen):
            raise FileNotFoundError(errno.ENOENT, "File not found", str(self))

    def readlink(self):
        """Return the path to which the symbolic link points.

        Raises OSError(EINVAL) if the path exists (explicitly, or as the
        parent of another member) but is not a symbolic link, and
        FileNotFoundError if it does not exist at all.
        """
        for info in reversed(self.tarfile.getmembers()):
            path = self.with_segments(info.name)
            if path == self:
                if info.issym():
                    return self.with_segments(info.linkname)
                else:
                    raise OSError(errno.EINVAL, "Not a symlink", str(self))
            elif self in path.parents:
                raise OSError(errno.EINVAL, "Not a symlink", str(self))
        raise FileNotFoundError(errno.ENOENT, "File not found", str(self))

    def mkdir(self, mode=0o777, parents=False, exist_ok=False):
        """Create a new directory at this given path.

        A directory member with the given *mode* is appended to the archive.
        *parents* and *exist_ok* are accepted but not consulted here.
        """
        info = TarInfo(str(self))
        info.type = DIRTYPE
        info.mode = mode
        self.tarfile.addfile(info)

    def symlink_to(self, target, target_is_directory=False):
        """Make this path a symlink pointing to the target path.

        *target_is_directory* is accepted but not consulted here.
        """
        info = TarInfo(str(self))
        info.type = SYMTYPE
        info.linkname = str(self.with_segments(target))
        self.tarfile.addfile(info)

    def hardlink_to(self, target):
        """Make this path a hard link pointing to the target path."""
        info = TarInfo(str(self))
        info.type = LNKTYPE
        info.linkname = str(self.with_segments(target))
        self.tarfile.addfile(info)


#--------------------
# exported functions
#--------------------
Expand Down

0 comments on commit 1a6122b

Please sign in to comment.