Skip to content

Commit

Permalink
✨ NEW: Add extract_tree method (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisjsewell authored Nov 9, 2020
1 parent 0832fd7 commit f33d5a9
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 2 deletions.
1 change: 0 additions & 1 deletion archive_path/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,5 @@ def match_glob(base: str, pattern: str, iterator: Iterable[str]) -> Iterable[str
continue
if name_parts[:at_parts_len] != at_parts:
continue
print(name, name_parts, at_parts_len, match)
if fnmatch(name_parts[-1], match):
yield name
44 changes: 44 additions & 0 deletions archive_path/tar_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,50 @@ def puttree(
)
self._tarfile.add(str(subpath), tarpath, recursive=False)

def extract_tree(
    self,
    outpath: Union[str, Path],
    *,
    pattern: str = "**/*",
    allow_dev: bool = False,
    allow_symlink: bool = False,
    callback: Optional[Callable[[str, Any], None]] = None,
    cb_descript: str = "Extracting objects",
):
    """Extract the archive path (and recursive children) to an external path.

    :param outpath: The path to output to
    :param pattern: the glob pattern for selecting children to extract
    :param allow_dev: also extract members for which ``TarInfo.isdev()`` is true
        (character devices, block devices and FIFOs); skipped by default
    :param allow_symlink: also extract link members, i.e. hard links
        (``TarInfo.islnk()``) and symbolic links (``TarInfo.issym()``);
        skipped by default
    :param callback: a callback to report on the process, ``callback(action, value)``,
        with the following callback signatures:

        - ``callback('init', {'total': <int>, 'description': <str>})``,
          to signal the start of a process, its total iterations and description
        - ``callback('update', <int>)``,
          to signal an update to the process and the number of iterations to progress

    :param cb_descript: the description to return in the callback
    """

    def _discard(action: str, value: Any) -> None:
        """Swallow progress events when the caller supplied no callback."""

    if callback is None:
        report = _discard
        # Nobody needs a total, so keep streaming the matches lazily.
        matches = self.glob(pattern, include_virtual=False)
    else:
        report = callback
        report("init", {"total": 1, "description": "Counting objects to extract"})
        # Materialise the matches once: the total is needed up front, and this
        # avoids walking the archive listing a second time for the extraction.
        matches = list(self.glob(pattern, include_virtual=False))
        report("init", {"total": len(matches), "description": cb_descript})

    for path in matches:
        # Progress is reported for every matched member, including skipped ones,
        # so the number of updates always adds up to the announced total.
        report("update", 1)
        info = self._tarfile.getmember(path.at)
        if (not allow_dev) and info.isdev():
            continue
        if (not allow_symlink) and (info.islnk() or info.issym()):
            continue
        # NOTE(review): no extraction ``filter`` is passed to ``TarFile.extract``;
        # archives from untrusted sources should be inspected before extraction.
        self._tarfile.extract(path=outpath, member=info)


def read_file_in_tar(
filepath: str, path: str, encoding: Optional[str] = "utf8", mode="r:*"
Expand Down
41 changes: 41 additions & 0 deletions archive_path/zip_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,47 @@ def puttree(
)
self._zipfile.write(subpath, zippath)

def extract_tree(
    self,
    outpath: Union[str, Path],
    *,
    pattern: str = "**/*",
    callback: Optional[Callable[[str, Any], None]] = None,
    cb_descript: str = "Extracting objects",
):
    """Extract the archive path (and recursive children) to an external path.

    :param outpath: The path to output to (made absolute before extraction)
    :param pattern: the glob pattern for selecting children to extract
    :param callback: a callback to report on the process, ``callback(action, value)``,
        with the following callback signatures:

        - ``callback('init', {'total': <int>, 'description': <str>})``,
          to signal the start of a process, its total iterations and description
        - ``callback('update', <int>)``,
          to signal an update to the process and the number of iterations to progress

    :param cb_descript: the description to return in the callback
    """
    outpath = cast(str, os.path.abspath(outpath))

    def _discard(action: str, value: Any) -> None:
        """Swallow progress events when the caller supplied no callback."""

    if callback is None:
        report = _discard
        # Nobody needs a total, so keep streaming the matches lazily.
        matches = self.glob(pattern, include_virtual=False)
    else:
        report = callback
        report("init", {"total": 1, "description": "Counting objects to extract"})
        # Materialise the matches once: the total is needed up front, and this
        # avoids walking the archive listing a second time for the extraction.
        matches = list(self.glob(pattern, include_virtual=False))
        report("init", {"total": len(matches), "description": cb_descript})

    for path in matches:
        report("update", 1)
        try:
            info = self._zipfile.getinfo(path.at)
        except KeyError:
            # No entry under the bare name: retry with a trailing slash, the
            # form under which directory entries are looked up here.
            info = self._zipfile.getinfo(path.at + "/")
        self._zipfile.extract(path=outpath, member=info)


class FileList(Sequence):
"""A list of ``zipfile.ZipInfo`` which mirrors the ``zipfile.ZipFile.NameToInfo`` mapping.
Expand Down
50 changes: 49 additions & 1 deletion tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
# For further information on the license, see the LICENSE file #
###########################################################################
"""Test compression utilities"""
from typing import Type, Union

import pytest

from archive_path import TarPath, ZipPath, read_file_in_tar, read_file_in_zip
Expand All @@ -19,7 +21,14 @@
],
ids=("zip", "tar.gz"),
)
def test_path(tmp_path, klass, filename, write_mode, read_mode, read_func):
def test_path(
tmp_path,
klass: Union[Type[TarPath], Type[ZipPath]],
filename,
write_mode,
read_mode,
read_func,
):
"""Test basic functionality and equivalence of ``ZipPath`` and ``TarPath``."""

# test write
Expand Down Expand Up @@ -133,3 +142,42 @@ def test_path(tmp_path, klass, filename, write_mode, read_mode, read_func):
assert (klass(tmp_path / filename) / "a") == klass(tmp_path / filename).joinpath(
"a"
)

# test extract_tree
file_read.extract_tree(tmp_path / "extract_tree_all")
assert {
p.as_posix().replace(tmp_path.as_posix(), "")
for p in (tmp_path / "extract_tree_all").glob("**/*")
} == {
"/extract_tree_all/new_file.txt",
"/extract_tree_all/other_folder",
"/extract_tree_all/other_folder/sub_file.txt",
"/extract_tree_all/folder",
"/extract_tree_all/folder/other_file.txt",
"/extract_tree_all/other_folder/nested1",
"/extract_tree_all/bytes.exe",
"/extract_tree_all/other_folder/nested1/nested2",
"/extract_tree_all/bytes2.exe",
"/extract_tree_all/other_folder/sub_folder",
}

file_read.joinpath("folder").extract_tree(tmp_path / "extract_tree_folder")
assert {
p.as_posix().replace(tmp_path.as_posix(), "")
for p in (tmp_path / "extract_tree_folder").glob("**/*")
} == {
"/extract_tree_folder/folder",
"/extract_tree_folder/folder/other_file.txt",
}

file_read.extract_tree(tmp_path / "extract_tree_txt", pattern="**/*.txt")
assert {
p.as_posix().replace(tmp_path.as_posix(), "")
for p in (tmp_path / "extract_tree_txt").glob("**/*")
} == {
"/extract_tree_txt/new_file.txt",
"/extract_tree_txt/other_folder",
"/extract_tree_txt/other_folder/sub_file.txt",
"/extract_tree_txt/folder",
"/extract_tree_txt/folder/other_file.txt",
}

0 comments on commit f33d5a9

Please sign in to comment.