Skip to content

Commit

Permalink
fix: parse distribution filenames consistently in upload and delete
Browse files Browse the repository at this point in the history
  • Loading branch information
mdwint committed Jan 1, 2024
1 parent 4ea5d5b commit cdeeb01
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 45 deletions.
48 changes: 19 additions & 29 deletions s3pypi/core.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import email
import logging
import re
from contextlib import suppress
Expand All @@ -7,7 +6,6 @@
from operator import attrgetter
from pathlib import Path
from typing import List
from zipfile import ZipFile

import boto3

Expand All @@ -19,18 +17,20 @@

log = logging.getLogger(__prog__)

PackageMetadata = email.message.Message


@dataclass
class Config:
s3: S3Config


@dataclass
class Distribution:
class DistributionId:
name: str
version: str


@dataclass
class Distribution(DistributionId):
local_path: Path


Expand Down Expand Up @@ -76,19 +76,19 @@ def upload_packages(


def parse_distribution(path: Path) -> Distribution:
d = parse_distribution_id(path.name)
return Distribution(d.name, d.version, path)


def parse_distribution_id(filename: str) -> DistributionId:
extensions = (".whl", ".tar.gz", ".tar.bz2", ".tar.xz", ".zip")

ext = next((ext for ext in extensions if path.name.endswith(ext)), "")
ext = next((ext for ext in extensions if filename.endswith(ext)), "")
if not ext:
raise S3PyPiError(f"Unknown file type: {path}")

if ext == ".whl":
meta = extract_wheel_metadata(path)
name, version = meta["Name"], meta["Version"]
else:
name, version = path.name[: -len(ext)].rsplit("-", 1)
raise S3PyPiError(f"Unknown file type: {filename}")

return Distribution(name, version, path)
name, version = filename[: -len(ext)].split("-", 2)[:2]
return DistributionId(name, version)


def parse_distributions(paths: List[Path]) -> List[Distribution]:
Expand All @@ -110,26 +110,16 @@ def parse_distributions(paths: List[Path]) -> List[Distribution]:
return dists


def extract_wheel_metadata(path: Path) -> PackageMetadata:
with ZipFile(path, "r") as whl:
try:
text = next(
whl.open(fname).read().decode()
for fname in whl.namelist()
if fname.endswith("METADATA")
)
except StopIteration:
raise S3PyPiError(f"No wheel metadata found in {path}") from None

return email.message_from_string(text)


def delete_package(cfg: Config, name: str, version: str) -> None:
storage = S3Storage(cfg.s3)
directory = normalize_package_name(name)

with storage.locked_index(directory) as index:
filenames = [f for f in index.filenames if f.split("-", 2)[1] == version]
filenames = [
filename
for filename in index.filenames
if parse_distribution_id(filename).version == version
]
if not filenames:
raise S3PyPiError(f"Package not found: {name} {version}")

Expand Down
Empty file.
1 change: 1 addition & 0 deletions tests/integration/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ def assert_pkg_exists(pkg: str, filename: str):
for deleted_key in [
"hello-world/",
"hello-world/hello_world-0.1.0-py3-none-any.whl",
"hello-world/hello_world-0.1.0.tar.gz",
]:
with pytest.raises(s3_bucket.meta.client.exceptions.NoSuchKey):
s3_bucket.Object(deleted_key).get()
Expand Down
21 changes: 5 additions & 16 deletions tests/unit/test_core.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from pathlib import Path

import pytest

from s3pypi import core
Expand All @@ -18,20 +16,11 @@ def test_normalize_package_name(name, normalized):


@pytest.mark.parametrize(
"dist",
"filename, dist",
[
core.Distribution(
name="hello-world",
version="0.1.0",
local_path=Path("dist/hello-world-0.1.0.tar.gz"),
),
core.Distribution(
name="foo-bar",
version="1.2.3",
local_path=Path("dist/foo-bar-1.2.3.zip"),
),
("hello_world-0.1.0.tar.gz", core.DistributionId("hello_world", "0.1.0")),
("foo_bar-1.2.3-py3-none-any.whl", core.DistributionId("foo_bar", "1.2.3")),
],
)
def test_parse_distribution(dist):
path = dist.local_path
assert core.parse_distribution(path) == dist
def test_parse_distribution_id(filename, dist):
assert core.parse_distribution_id(filename) == dist

0 comments on commit cdeeb01

Please sign in to comment.