From de742c3e0016a541b0bb40765055df4dcecb18b5 Mon Sep 17 00:00:00 2001 From: Chris Sewell Date: Tue, 23 Nov 2021 16:29:25 +0100 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20NEW:=20Allow=20ZipPath=20->=20putfi?= =?UTF-8?q?le?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Propagates compression type+level and comment --- archive_path/zip_path.py | 45 +++++++++++++++++++++++++++++++++++++--- tests/test_basic.py | 18 ++++++++++++++++ 2 files changed, 60 insertions(+), 3 deletions(-) diff --git a/archive_path/zip_path.py b/archive_path/zip_path.py index 86a7bdd..c95cc32 100644 --- a/archive_path/zip_path.py +++ b/archive_path/zip_path.py @@ -54,6 +54,7 @@ ) NOTSET = () +NotSetType = Any # once python 3.7 dropped: Literal[NOTSET] class ZipPath: @@ -271,7 +272,12 @@ def __truediv__(self, path: str) -> "ZipPath": @contextmanager # noqa: A003 def open( # noqa: A003 - self, mode: str = "rb", *, compression=NOTSET, level=NOTSET, comment=NOTSET + self, + mode: str = "rb", + *, + compression: Union[NotSetType, int] = NOTSET, + level: Union[NotSetType, int] = NOTSET, + comment: Union[NotSetType, bytes] = NOTSET, ): """Open the file pointed by this path and return a file object. @@ -300,6 +306,10 @@ def open( # noqa: A003 zinfo.comment = comment elif mode == "rb": zinfo = self.at + try: + self._zipfile.getinfo(self.at) + except KeyError: + raise FileNotFoundError(self.at) else: raise ValueError('open() requires mode "rb" or "wb"') with self.root.open(zinfo, mode=mode[0]) as handle: @@ -401,11 +411,40 @@ def glob(self, pattern: str, include_virtual: bool = True): # shutil like interface - def putfile(self, path: Union[str, Path]): - """Copy a file's bytes to this path in the zip file.""" + def _putpath(self, path: "ZipPath") -> None: + """Copy a file's bytes from another open `ZipPath`. + + This method propagates compression type/level and comments. + """ + if "r" in self.root.mode: # type: ignore + raise IOError("Cannot write a file in read ('r') mode") + try: + info = path._zipfile.getinfo(path.at) + except KeyError: + raise FileNotFoundError(f"Source file not found: {path}") + if info.is_dir(): + raise IsADirectoryError(f"Source is not a file: {path}") + with path.open("rb") as handle: + with self.open( + "wb", + compression=info.compress_type, + level=info._compresslevel, # type: ignore[attr-defined] + comment=info.comment, + ) as new_handle: + shutil.copyfileobj(handle, new_handle) + + def putfile(self, path: Union[str, Path, "ZipPath"]) -> None: + """Copy a file's bytes to this path in the zip file. + + If a `ZipPath`, then compression type+level and comments are propagated. + """ if "r" in self.root.mode: # type: ignore raise IOError("Cannot write a file in read ('r') mode") + if isinstance(path, ZipPath): + self._putpath(path) + return + path = cast(Path, Path(path)) if not path.exists(): raise FileNotFoundError(f"Source file not found: {path}") diff --git a/tests/test_basic.py b/tests/test_basic.py index e427318..2c56a4b 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -247,3 +247,21 @@ def test_zip_mkdir(tmp_path): folder = tmp_path / "extracted" / "folder" assert folder.exists() assert folder.is_dir() + + +def test_putfile_zip(tmp_path): + """Test copying a file from one zip file to another.""" + with ZipPath(tmp_path / "test.zip", mode="w") as path: + with path.joinpath("name").open( + "wb", compression=zipfile.ZIP_DEFLATED, level=3, comment=b"comment" + ) as handle: + handle.write(b"test") + with ZipPath(tmp_path / "test.zip", mode="r") as path: + with ZipPath( + tmp_path / "test2.zip", mode="w", compression=zipfile.ZIP_STORED + ) as new_path: + new_path.joinpath("name").putfile(path.joinpath("name")) + with zipfile.ZipFile(tmp_path / "test.zip", mode="r") as handle: + info = handle.getinfo("name") + assert info.compress_type == zipfile.ZIP_DEFLATED + assert info.comment == b"comment"