diff --git a/src/zimscraperlib/zim/creator.py b/src/zimscraperlib/zim/creator.py
index 9e451877..e35db346 100644
--- a/src/zimscraperlib/zim/creator.py
+++ b/src/zimscraperlib/zim/creator.py
@@ -19,7 +19,9 @@
     - can be used to store a filepath and content read from it (not stored) """
 
 import datetime
+import hashlib
 import pathlib
+import re
 import weakref
 from typing import Any, Callable, Dict, Optional, Tuple, Union
 
@@ -104,6 +106,12 @@ def __init__(
 
         self.workaround_nocancel = workaround_nocancel
 
+        # compiled regexes; item paths matching any of them are deduplicated
+        self.autodedup_filters = []
+
+        # sha256 digest of content -> path of the first item recorded with it
+        self.dedup_items = {}
+
     def start(self):
         super().__enter__()
 
@@ -119,6 +127,54 @@ def update_metadata(self, **kwargs):
         for name, value in kwargs.items():
             self.add_metadata(name, value)
 
+    def add_autodedup_filter(self, filter_regex: str) -> None:
+        """Register a path pattern whose matching items are deduplicated
+
+        filter_regex is compiled once here and later tested against item
+        paths with `re.match` (anchored at the start of the path only)."""
+        self.autodedup_filters.append(re.compile(filter_regex))
+
+    def check_for_duplicate(
+        self,
+        path: str,
+        fpath: Optional[pathlib.Path] = None,
+        content: Optional[Union[str, bytes]] = None,
+    ) -> Optional[str]:
+        """Path of a previously recorded item with identical content, if any
+
+        Only paths matching one of the autodedup filters are considered; any
+        other path returns None and is not recorded. For a matching path, the
+        sha256 digest of content (or of the file at fpath when content is None)
+        is looked up: if already known, the path recorded for it is returned,
+        otherwise path is recorded for that digest and None is returned.
+
+        Raises ValueError if path matches a filter but neither fpath nor
+        content was supplied."""
+        if not any(regex.match(path) for regex in self.autodedup_filters):
+            return None
+
+        if content is not None:
+            # `is not None` so that empty content is hashed as-is instead of
+            # falling through to opening fpath (which may well be None)
+            if isinstance(content, str):
+                # hashlib only accepts bytes-like input
+                content = content.encode("utf-8")
+            digest = hashlib.sha256(content).digest()
+        elif fpath is not None:
+            hasher = hashlib.sha256()
+            with open(fpath, "rb") as fh:
+                # 64KiB chunks so large files are never fully loaded in memory
+                for chunk in iter(lambda: fh.read(65536), b""):
+                    hasher.update(chunk)
+            digest = hasher.digest()
+        else:
+            raise ValueError("One of fpath or content is required")
+
+        if digest in self.dedup_items:
+            return self.dedup_items[digest]
+        self.dedup_items[digest] = path
+        return None
+
     def add_item_for(
         self,
         path: str,
@@ -151,6 +207,19 @@ def add_item_for(
         if fpath is None and content is None:
             raise ValueError("One of fpath or content is required")
 
+        # same content already recorded under another path: redirect to it
+        duplicate_path = self.check_for_duplicate(
+            path=path, fpath=fpath, content=content
+        )
+        if duplicate_path:
+            self.add_redirect(
+                path=path,
+                target_path=duplicate_path,
+                title=title,
+                is_front=is_front,
+            )
+            return path
+
         mimetype = mimetype_for(
             path=path, content=content, fpath=fpath, mimetype=mimetype
         )
diff --git a/tests/zim/test_zim_creator.py b/tests/zim/test_zim_creator.py
index 77e12f55..1b8e55b1 100644
--- a/tests/zim/test_zim_creator.py
+++ b/tests/zim/test_zim_creator.py
@@ -126,6 +126,42 @@ def test_noindexlanguage(tmp_path):
     assert not reader.has_fulltext_index
 
 
+def test_duplicatefiles(tmp_path, png_image):
+    """Items under an autodedup filter with identical content become redirects"""
+    fpath = tmp_path / "test.zim"
+
+    with open(png_image, "rb") as fh:
+        png_data = fh.read()
+
+    with Creator(fpath, "welcome", "") as creator:
+        creator.add_autodedup_filter(r"^images/.*$")
+        # add a file not matching filter patterns
+        creator.add_item_for("other_folder1/yahoo0.png", "Image1", fpath=png_image)
+        # add same file but first matching filter patterns => will be added as-is
+        creator.add_item_for("images/yahoo1.png", "Image1", fpath=png_image)
+        # add same file but second matching filter patterns
+        # => will be replaced by a redirect
+        creator.add_item_for("images/yahoo2.png", "Image2", fpath=png_image)
+        # add same file but not matching filter patterns => will be added as-is
+        creator.add_item_for("other_folder2/yahoo3.png", "Image1", fpath=png_image)
+        # add same file matching filter patterns but with content instead of fpath
+        # => will be replaced by a redirect
+        creator.add_item_for("images/yahoo4.png", "Image3", content=png_data)
+
+    reader = Archive(fpath)
+    # make sure we have our image
+    assert reader.get_item("images/yahoo1.png")
+    assert not reader.get_entry_by_path("images/yahoo1.png").is_redirect
+    assert reader.get_item("images/yahoo2.png")
+    assert reader.get_entry_by_path("images/yahoo2.png").is_redirect
+    assert reader.get_item("images/yahoo4.png")
+    assert reader.get_entry_by_path("images/yahoo4.png").is_redirect
+    assert reader.get_item("other_folder1/yahoo0.png")
+    assert not reader.get_entry_by_path("other_folder1/yahoo0.png").is_redirect
+    assert reader.get_item("other_folder2/yahoo3.png")
+    assert not reader.get_entry_by_path("other_folder2/yahoo3.png").is_redirect
+
+
 def test_add_item_for(tmp_path):
     fpath = tmp_path / "test.zim"
     # test without mimetype