Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ NEW: Allow parsing ZipPath -> putfile #13

Merged
merged 1 commit into from
Nov 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 42 additions & 3 deletions archive_path/zip_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
)

NOTSET = ()
NotSetType = Any # once python 3.7 dropped: Literal[NOTSET]


class ZipPath:
Expand Down Expand Up @@ -271,7 +272,12 @@ def __truediv__(self, path: str) -> "ZipPath":

@contextmanager # noqa: A003
def open( # noqa: A003
self, mode: str = "rb", *, compression=NOTSET, level=NOTSET, comment=NOTSET
self,
mode: str = "rb",
*,
compression: Union[NotSetType, int] = NOTSET,
level: Union[NotSetType, int] = NOTSET,
comment: Union[NotSetType, bytes] = NOTSET,
):
"""Open the file pointed by this path and return a file object.

Expand Down Expand Up @@ -300,6 +306,10 @@ def open( # noqa: A003
zinfo.comment = comment
elif mode == "rb":
zinfo = self.at
try:
self._zipfile.getinfo(self.at)
except KeyError:
raise FileNotFoundError(self.at)
else:
raise ValueError('open() requires mode "rb" or "wb"')
with self.root.open(zinfo, mode=mode[0]) as handle:
Expand Down Expand Up @@ -401,11 +411,40 @@ def glob(self, pattern: str, include_virtual: bool = True):

# shutil like interface

def putfile(self, path: Union[str, Path]):
"""Copy a file's bytes to this path in the zip file."""
def _putpath(self, path: "ZipPath") -> None:
"""Copy a file's bytes from another open `ZipPath`.

This method propagates compression type/level and comments.
"""
if "r" in self.root.mode: # type: ignore
raise IOError("Cannot write a file in read ('r') mode")
try:
info = path._zipfile.getinfo(path.at)
except KeyError:
raise FileNotFoundError(f"Source file not found: {path}")
if info.is_dir():
raise IsADirectoryError(f"Source is not a file: {path}")
with path.open("rb") as handle:
with self.open(
"wb",
compression=info.compress_type,
level=info._compresslevel, # type: ignore[attr-defined]
comment=info.comment,
) as new_handle:
shutil.copyfileobj(handle, new_handle)

def putfile(self, path: Union[str, Path, "ZipPath"]) -> None:
"""Copy a file's bytes to this path in the zip file.

If a `ZipPath`, then compression type+level and comments are propagated.
"""
if "r" in self.root.mode: # type: ignore
raise IOError("Cannot write a file in read ('r') mode")

if isinstance(path, ZipPath):
self._putpath(path)
return

path = cast(Path, Path(path))
if not path.exists():
raise FileNotFoundError(f"Source file not found: {path}")
Expand Down
18 changes: 18 additions & 0 deletions tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,3 +247,21 @@ def test_zip_mkdir(tmp_path):
folder = tmp_path / "extracted" / "folder"
assert folder.exists()
assert folder.is_dir()


def test_putfile_zip(tmp_path):
"""Test copying a file from one zip file to another."""
with ZipPath(tmp_path / "test.zip", mode="w") as path:
with path.joinpath("name").open(
"wb", compression=zipfile.ZIP_DEFLATED, level=3, comment=b"comment"
) as handle:
handle.write(b"test")
with ZipPath(tmp_path / "test.zip", mode="r") as path:
with ZipPath(
tmp_path / "test2.zip", mode="w", compression=zipfile.ZIP_STORED
) as new_path:
new_path.joinpath("name").putfile(path.joinpath("name"))
with zipfile.ZipFile(tmp_path / "test.zip", mode="r") as handle:
info = handle.getinfo("name")
assert info.compress_type == zipfile.ZIP_DEFLATED
assert info.comment == b"comment"