diff --git a/tests/functional/forklift/test_legacy.py b/tests/functional/forklift/test_legacy.py index 8088b20475d7..d520d6b09558 100644 --- a/tests/functional/forklift/test_legacy.py +++ b/tests/functional/forklift/test_legacy.py @@ -17,6 +17,8 @@ import pymacaroons import pytest +from webob.multidict import MultiDict + from warehouse.macaroons import caveats from ...common.db.accounts import UserFactory @@ -58,7 +60,14 @@ def test_remove_doc_upload(webtest): ) -def test_file_upload(webtest): +@pytest.mark.parametrize( + ("upload_url", "additional_data"), + [ + ("/legacy/?:action=file_upload", {}), + ("/legacy/", {":action": "file_upload", "protocol_version": "1"}), + ], +) +def test_file_upload(webtest, upload_url, additional_data): user = UserFactory.create( with_verified_primary_email=True, password=( # 'password' @@ -90,10 +99,8 @@ def test_file_upload(webtest): with open("./tests/functional/_fixtures/sampleproject-3.0.0.tar.gz", "rb") as f: content = f.read() - webtest.post( - "/legacy/?:action=file_upload", - headers={"Authorization": f"Basic {credentials}"}, - params={ + params = MultiDict( + { "name": "sampleproject", "sha256_digest": ( "117ed88e5db073bb92969a7545745fd977ee85b7019706dd256a64058f70963d" @@ -101,7 +108,18 @@ def test_file_upload(webtest): "filetype": "sdist", "metadata_version": "2.1", "version": "3.0.0", - }, + } + ) + params.update(additional_data) + params.add("project-url", "https://example.com/foo") + params.add("project-url", "https://example.com/bar") + params.add("classifiers", "Programming Language :: Python :: 3.10") + params.add("classifiers", "Programming Language :: Python :: 3.11") + + webtest.post( + upload_url, + headers={"Authorization": f"Basic {credentials}"}, + params=params, upload_files=[("content", "sampleproject-3.0.0.tar.gz", content)], status=HTTPStatus.OK, ) diff --git a/tests/functional/test_config.py b/tests/functional/test_config.py index c69c95ff321c..9cc3133aa7c7 100644 --- a/tests/functional/test_config.py +++ b/tests/functional/test_config.py @@ -36,4 +36,6 @@ def test_rejects_duplicate_post_keys(webtest, socket_enabled): body.add("foo", "bar") body.add("foo", "baz") - webtest.post("/account/login", params=body, status=HTTPStatus.BAD_REQUEST) + resp = webtest.post("/account/login", params=body, status=HTTPStatus.BAD_REQUEST) + assert "POST body may not contain duplicate keys" in resp.body.decode() + assert "(URL: 'http://localhost/account/login')" in resp.body.decode() diff --git a/warehouse/config.py b/warehouse/config.py index ec71b3278b27..753bb9aaee76 100644 --- a/warehouse/config.py +++ b/warehouse/config.py @@ -274,11 +274,13 @@ def reject_duplicate_post_keys_view(view, info): @functools.wraps(view) def wrapped(context, request): if request.POST: - # Attempt to cast to a dict to catch duplicate keys - try: - dict(**request.POST) - except TypeError: - return HTTPBadRequest("POST body may not contain duplicate keys") + # Determine if there are any duplicate keys + keys = list(request.POST.keys()) + if len(keys) != len(set(keys)): + return HTTPBadRequest( + "POST body may not contain duplicate keys " + f"(URL: {request.url!r})" + ) # Casting succeeded, so just return the regular view return view(context, request)