diff --git a/src/poetry/publishing/uploader.py b/src/poetry/publishing/uploader.py index 03f7e325064..4a1ae3ace65 100644 --- a/src/poetry/publishing/uploader.py +++ b/src/poetry/publishing/uploader.py @@ -3,9 +3,11 @@ import hashlib import io +from contextlib import suppress from pathlib import Path from typing import TYPE_CHECKING from typing import Any +from typing import NamedTuple import requests @@ -48,6 +50,59 @@ def __init__(self, error: ConnectionError | HTTPError | str) -> None: super().__init__(message) +class Hexdigest(NamedTuple): + md5: str | None + sha256: str | None + blake2_256: str | None + + +class HashManager: + def __init__(self) -> None: + self._sha2_hasher = hashlib.sha256() + + self._md5_hasher = None + with suppress(ValueError): + # FIPS mode disables MD5 + self._md5_hasher = hashlib.md5() + + self._blake_hasher = None + with suppress(ValueError, TypeError): + # FIPS mode disables blake2 + self._blake_hasher = hashlib.blake2b(digest_size=256 // 8) + + def _md5_update(self, content: bytes) -> None: + if self._md5_hasher is not None: + self._md5_hasher.update(content) + + def _md5_hexdigest(self) -> str | None: + if self._md5_hasher is not None: + return self._md5_hasher.hexdigest() + return None + + def _blake_update(self, content: bytes) -> None: + if self._blake_hasher is not None: + self._blake_hasher.update(content) + + def _blake_hexdigest(self) -> str | None: + if self._blake_hasher is not None: + return self._blake_hasher.hexdigest() + return None + + def hash(self, file: Path) -> None: + with file.open("rb") as fp: + for content in iter(lambda: fp.read(io.DEFAULT_BUFFER_SIZE), b""): + self._md5_update(content) + self._sha2_hasher.update(content) + self._blake_update(content) + + def hexdigest(self) -> Hexdigest: + return Hexdigest( + self._md5_hexdigest(), + self._sha2_hasher.hexdigest(), + self._blake_hexdigest(), + ) + + class Uploader: def __init__(self, poetry: Poetry, io: IO, dist_dir: Path | None = None) -> None: self._poetry = poetry @@ -126,19 +181,13 @@ def post_data(self, file: Path) -> dict[str, Any]: file_type = self._get_type(file) - blake2_256_hash = hashlib.blake2b(digest_size=256 // 8) - - md5_hash = hashlib.md5() - sha256_hash = hashlib.sha256() - with file.open("rb") as fp: - for content in iter(lambda: fp.read(io.DEFAULT_BUFFER_SIZE), b""): - md5_hash.update(content) - sha256_hash.update(content) - blake2_256_hash.update(content) + hash_manager = HashManager() + hash_manager.hash(file) + file_hashes = hash_manager.hexdigest() - md5_digest = md5_hash.hexdigest() - sha2_digest = sha256_hash.hexdigest() - blake2_256_digest = blake2_256_hash.hexdigest() + md5_digest = file_hashes.md5 + sha2_digest = file_hashes.sha256 + blake2_256_digest = file_hashes.blake2_256 py_version: str | None = None if file_type == "bdist_wheel": diff --git a/tests/publishing/test_file_hashes.py b/tests/publishing/test_file_hashes.py new file mode 100644 index 00000000000..af36b8c4588 --- /dev/null +++ b/tests/publishing/test_file_hashes.py @@ -0,0 +1,65 @@ +from __future__ import annotations + +import hashlib + +from typing import TYPE_CHECKING +from typing import Any + +import pytest + +from poetry.publishing.uploader import HashManager + + +if TYPE_CHECKING: + from pathlib import Path + + from tests.types import FixtureDirGetter + + +@pytest.fixture +def distributions_dir(fixture_dir: FixtureDirGetter) -> Path: + return fixture_dir("distributions") + + +@pytest.mark.parametrize( + "file, hashes", + ( + ( + "demo-0.1.0.tar.gz", + ( + "d1912c917363a64e127318655f7d1fe7", + "9fa123ad707a5c6c944743bf3e11a0e80d86cb518d3cf25320866ca3ef43e2ad", + "cb638093d63df647e70b03e963bedc31e021cb088695e29101b69f525e3d5fef", + ), + ), + ( + "demo-0.1.2-py2.py3-none-any.whl", + ( + "53b4e10d2bfa81a4206221c4b87843d9", + "55dde4e6828081de7a1e429f33180459c333d9da593db62a3d75a8f5e505dde1", + "b35b9aab064e88fffe42309550ebe425907fb42ccb3b1d173b7d6b7509f38eac", + ), + ), + ), +) +def test_file_hashes_returns_proper_hashes_for_file( + file: str, hashes: tuple[str, ...], distributions_dir: Path +) -> None: + manager = HashManager() + manager.hash(distributions_dir / file) + file_hashes = manager.hexdigest() + assert file_hashes == hashes + + +def test_file_hashes_returns_none_for_md5_with_fips(distributions_dir: Path) -> None: + # disable md5 + def fips_md5(*args: Any, **kwargs: Any) -> Any: + raise ValueError("Disabled by FIPS") + + hashlib.md5 = fips_md5 + + manager = HashManager() + manager.hash(distributions_dir / "demo-0.1.0.tar.gz") + file_hashes = manager.hexdigest() + + assert file_hashes.md5 is None