Skip to content

Commit

Permalink
Introduce HashManager to work around FIPS
Browse files Browse the repository at this point in the history
  • Loading branch information
Secrus committed Mar 3, 2024
1 parent 2416f5c commit 12d6849
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 12 deletions.
73 changes: 61 additions & 12 deletions src/poetry/publishing/uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import hashlib
import io

from contextlib import suppress
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any
from typing import NamedTuple

import requests

Expand Down Expand Up @@ -48,6 +50,59 @@ def __init__(self, error: ConnectionError | HTTPError | str) -> None:
super().__init__(message)


class Hexdigest(NamedTuple):
md5: str | None
sha256: str | None
blake2_256: str | None


class HashManager:
def __init__(self) -> None:
self._sha2_hasher = hashlib.sha256()

self._md5_hasher = None
with suppress(ValueError):
# FIPS mode disables MD5
self._md5_hasher = hashlib.md5()

self._blake_hasher = None
with suppress(ValueError, TypeError):
# FIPS mode disables blake2
self._blake_hasher = hashlib.blake2b(digest_size=256 // 8)

def _md5_update(self, content: bytes) -> None:
if self._md5_hasher is not None:
self._md5_hasher.update(content)

def _md5_hexdigest(self) -> str | None:
if self._md5_hasher is not None:
return self._md5_hasher.hexdigest()
return None

def _blake_update(self, content: bytes) -> None:
if self._blake_hasher is not None:
self._blake_hasher.update(content)

def _blake_hexdigest(self) -> str | None:
if self._blake_hasher is not None:
return self._blake_hasher.hexdigest()
return None

def hash(self, file: Path) -> None:
with file.open("rb") as fp:
for content in iter(lambda: fp.read(io.DEFAULT_BUFFER_SIZE), b""):
self._md5_update(content)
self._sha2_hasher.update(content)
self._blake_update(content)

def hexdigest(self) -> Hexdigest:
return Hexdigest(
self._md5_hexdigest(),
self._sha2_hasher.hexdigest(),
self._blake_hexdigest(),
)


class Uploader:
def __init__(self, poetry: Poetry, io: IO, dist_dir: Path | None = None) -> None:
self._poetry = poetry
Expand Down Expand Up @@ -126,19 +181,13 @@ def post_data(self, file: Path) -> dict[str, Any]:

file_type = self._get_type(file)

blake2_256_hash = hashlib.blake2b(digest_size=256 // 8)

md5_hash = hashlib.md5()
sha256_hash = hashlib.sha256()
with file.open("rb") as fp:
for content in iter(lambda: fp.read(io.DEFAULT_BUFFER_SIZE), b""):
md5_hash.update(content)
sha256_hash.update(content)
blake2_256_hash.update(content)
hash_manager = HashManager()
hash_manager.hash(file)
file_hashes = hash_manager.hexdigest()

md5_digest = md5_hash.hexdigest()
sha2_digest = sha256_hash.hexdigest()
blake2_256_digest = blake2_256_hash.hexdigest()
md5_digest = file_hashes.md5
sha2_digest = file_hashes.sha256
blake2_256_digest = file_hashes.blake2_256

py_version: str | None = None
if file_type == "bdist_wheel":
Expand Down
67 changes: 67 additions & 0 deletions tests/publishing/test_file_hashes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from __future__ import annotations

from typing import TYPE_CHECKING
from typing import Any

import pytest

from poetry.publishing.uploader import HashManager


if TYPE_CHECKING:
from pathlib import Path

from pytest_mock import MockerFixture

from tests.types import FixtureDirGetter


@pytest.fixture
def distributions_dir(fixture_dir: FixtureDirGetter) -> Path:
return fixture_dir("distributions")


@pytest.mark.parametrize(
"file, hashes",
(
(
"demo-0.1.0.tar.gz",
(
"d1912c917363a64e127318655f7d1fe7",
"9fa123ad707a5c6c944743bf3e11a0e80d86cb518d3cf25320866ca3ef43e2ad",
"cb638093d63df647e70b03e963bedc31e021cb088695e29101b69f525e3d5fef",
),
),
(
"demo-0.1.2-py2.py3-none-any.whl",
(
"53b4e10d2bfa81a4206221c4b87843d9",
"55dde4e6828081de7a1e429f33180459c333d9da593db62a3d75a8f5e505dde1",
"b35b9aab064e88fffe42309550ebe425907fb42ccb3b1d173b7d6b7509f38eac",
),
),
),
)
def test_file_hashes_returns_proper_hashes_for_file(
file: str, hashes: tuple[str, ...], distributions_dir: Path
) -> None:
manager = HashManager()
manager.hash(distributions_dir / file)
file_hashes = manager.hexdigest()
assert file_hashes == hashes


def test_file_hashes_returns_none_for_md5_with_fips(
mocker: MockerFixture, distributions_dir: Path
) -> None:
# disable md5
def fips_md5(*args: Any, **kwargs: Any) -> Any:
raise ValueError("Disabled by FIPS")

mocker.patch("hashlib.md5", new=fips_md5)

manager = HashManager()
manager.hash(distributions_dir / "demo-0.1.0.tar.gz")
file_hashes = manager.hexdigest()

assert file_hashes.md5 is None

0 comments on commit 12d6849

Please sign in to comment.