diff --git a/requirements/main.in b/requirements/main.in index 1f1373b8a432..4f4510e55e3d 100644 --- a/requirements/main.in +++ b/requirements/main.in @@ -51,6 +51,7 @@ stdlib-list structlog transaction typeguard +webauthn whitenoise WTForms>=2.0.0 zope.sqlalchemy diff --git a/requirements/main.txt b/requirements/main.txt index ea8e016a6c84..e26050bc41f8 100644 --- a/requirements/main.txt +++ b/requirements/main.txt @@ -68,6 +68,9 @@ botocore==1.12.161 \ cachetools==3.1.1 \ --hash=sha256:428266a1c0d36dc5aca63a2d7c5942e88c2c898d72139fca0e97fdd2380517ae \ --hash=sha256:8ea2d3ce97850f31e4a08b0e2b5e6c34997d7216a9d2c98e0f3978630d4da69a +cbor2==4.1.2 \ + --hash=sha256:17b615da69964f87e48c5adb34ba63db3068f65b9cd14a7b099503d9f8a0e9ae \ + --hash=sha256:6391fd3d2a4e976ecf892638a0a2a88d85e6764124bf9f128a945bfefefe77dc celery-redbeat==0.13.0 \ --hash=sha256:8ba060f5fffa96fc6dbb0e37a10506cada03333f34ba502b134cbbdb08891266 celery[sqs]==4.3.0 \ @@ -185,6 +188,8 @@ elasticsearch==6.4.0 \ first==2.0.2 \ --hash=sha256:8d8e46e115ea8ac652c76123c0865e3ff18372aef6f03c22809ceefcea9dec86 \ --hash=sha256:ff285b08c55f8c97ce4ea7012743af2495c9f1291785f163722bd36f6af6d3bf +future==0.17.1 \ + --hash=sha256:67045236dcfd6816dc439556d009594abf643e5eb48992e36beac09c2ca659b8 google-api-core==1.11.1 \ --hash=sha256:18d58e87ce51046ad76961ba320903657182622e3e368e502381b11f63015c66 \ --hash=sha256:a1312fb2555516cfb55908e5d28762838315c21f7ce43d243ab3d11f215e2238 @@ -434,6 +439,9 @@ pycurl==7.43.0.2 \ pygments==2.4.2 \ --hash=sha256:71e430bc85c88a430f000ac1d9b331d2407f681d6f6aec95e8bcfbc3df5b0127 \ --hash=sha256:881c4c157e45f30af185c1ffe8d549d48ac9127433f2c380c24b84572ad66297 +pyOpenSSL==19.0.0 \ + --hash=sha256:aeca66338f6de19d1aa46ed634c3b9ae519a64b458f8468aec688e7e3c20f200 \ + --hash=sha256:c727930ad54b10fc157015014b666f2d8b41f70c0d03e83ab67624fd3dd5d1e6 pyparsing==2.4.0 \ --hash=sha256:1873c03321fc118f4e9746baf201ff990ceb915f433f23b395f5580d1840cb2a \ --hash=sha256:9b6323ef4ab914af344ba97510e966d64ba91055d6b9afa6b30799340e89cc03 @@ -532,6 +540,8 @@ venusian==1.2.0 \ vine==1.3.0 \ --hash=sha256:133ee6d7a9016f177ddeaf191c1f58421a1dcc6ee9a42c58b34bed40e1d2cd87 \ --hash=sha256:ea4947cc56d1fd6f2095c8d543ee25dad966f78692528e68b4fada11ba3f98af +webauthn==0.4.5 \ + --hash=sha256:db0bfc8b74896e209bfb290815c0a563f785ec5be321e1db17e9ac627ec3562f webencodings==0.5.1 \ --hash=sha256:a0af1213f3c2226497a97e2b3aa01a7e4bee4f403f95be16fc9acd2947514a78 \ --hash=sha256:b36a1c245f2d304965eb4e0a82848379241dc04b865afcc4aab16748587e1923 diff --git a/tests/unit/accounts/test_forms.py b/tests/unit/accounts/test_forms.py index fa1c8f4d5661..0a804088f14c 100644 --- a/tests/unit/accounts/test_forms.py +++ b/tests/unit/accounts/test_forms.py @@ -10,6 +10,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json + import pretend import pytest import wtforms @@ -17,6 +19,7 @@ from warehouse.accounts import forms from warehouse.accounts.interfaces import TooManyFailedLogins from warehouse.accounts.models import DisableReason +from warehouse.utils.webauthn import AuthenticationRejectedException class TestLoginForm: @@ -567,22 +570,22 @@ def test_password_breached(self): ) -class TestTwoFactorForm: +class TestTOTPAuthenticationForm: def test_creation(self): user_id = pretend.stub() user_service = pretend.stub() - form = forms.TwoFactorForm(user_id=user_id, user_service=user_service) + form = forms.TOTPAuthenticationForm(user_id=user_id, user_service=user_service) assert form.user_service is user_service def test_totp_secret_exists(self): - form = forms.TwoFactorForm( + form = forms.TOTPAuthenticationForm( data={"totp_value": ""}, user_id=pretend.stub(), user_service=pretend.stub() ) assert not form.validate() assert form.totp_value.errors.pop() == "This field is required." - form = forms.TwoFactorForm( + form = forms.TOTPAuthenticationForm( data={"totp_value": "not_a_real_value"}, user_id=pretend.stub(), user_service=pretend.stub(check_totp_value=lambda *a: True), @@ -590,7 +593,7 @@ def test_totp_secret_exists(self): assert not form.validate() assert form.totp_value.errors.pop() == "TOTP code must be 6 digits." - form = forms.TwoFactorForm( + form = forms.TOTPAuthenticationForm( data={"totp_value": "123456"}, user_id=pretend.stub(), user_service=pretend.stub(check_totp_value=lambda *a: False), @@ -598,9 +601,77 @@ def test_totp_secret_exists(self): assert not form.validate() assert form.totp_value.errors.pop() == "Invalid TOTP code." - form = forms.TwoFactorForm( + form = forms.TOTPAuthenticationForm( data={"totp_value": "123456"}, user_id=pretend.stub(), user_service=pretend.stub(check_totp_value=lambda *a: True), ) assert form.validate() + + +class TestWebAuthnAuthenticationForm: + def test_creation(self): + user_id = pretend.stub() + user_service = pretend.stub() + challenge = pretend.stub() + origin = pretend.stub() + icon_url = pretend.stub() + rp_id = pretend.stub() + + form = forms.WebAuthnAuthenticationForm( + user_id=user_id, + user_service=user_service, + challenge=challenge, + origin=origin, + icon_url=icon_url, + rp_id=rp_id, + ) + + assert form.challenge is challenge + + def test_credential_bad_payload(self): + form = forms.WebAuthnAuthenticationForm( + credential="not valid json", + user_id=pretend.stub(), + user_service=pretend.stub(), + challenge=pretend.stub(), + origin=pretend.stub(), + icon_url=pretend.stub(), + rp_id=pretend.stub(), + ) + assert not form.validate() + assert form.credential.errors.pop() == "Invalid WebAuthn assertion: Bad payload" + + def test_credential_invalid(self): + form = forms.WebAuthnAuthenticationForm( + credential=json.dumps({}), + user_id=pretend.stub(), + user_service=pretend.stub( + verify_webauthn_assertion=pretend.raiser( + AuthenticationRejectedException("foo") + ) + ), + challenge=pretend.stub(), + origin=pretend.stub(), + icon_url=pretend.stub(), + rp_id=pretend.stub(), + ) + assert not form.validate() + assert form.credential.errors.pop() == "foo" + + def test_credential_valid(self): + form = forms.WebAuthnAuthenticationForm( + credential=json.dumps({}), + user_id=pretend.stub(), + user_service=pretend.stub( + verify_webauthn_assertion=pretend.call_recorder( + lambda *a, **kw: ("foo", 123456) + ) + ), + challenge=pretend.stub(), + origin=pretend.stub(), + icon_url=pretend.stub(), + rp_id=pretend.stub(), + ) + assert form.validate() + assert form.validated_credential == ("foo", 123456) diff --git a/tests/unit/accounts/test_services.py b/tests/unit/accounts/test_services.py index b6a940af832b..10155937d0f6 100644 --- a/tests/unit/accounts/test_services.py +++ b/tests/unit/accounts/test_services.py @@ -22,6 +22,7 @@ from zope.interface.verify import verifyClass import warehouse.utils.otp as otp +import warehouse.utils.webauthn as webauthn from warehouse.accounts import services from warehouse.accounts.interfaces import ( @@ -351,6 +352,24 @@ def test_has_two_factor(self, user_service): user_service.update_user(user.id, totp_secret=b"foobar") assert user_service.has_two_factor(user.id) + def test_has_totp(self, user_service): + user = UserFactory.create() + assert not user_service.has_totp(user.id) + user_service.update_user(user.id, totp_secret=b"foobar") + assert user_service.has_totp(user.id) + + def test_has_webauthn(self, user_service): + user = UserFactory.create() + assert not user_service.has_webauthn(user.id) + user_service.add_webauthn( + user.id, + label="test_label", + credential_id="foo", + public_key="bar", + sign_count=1, + ) + assert user_service.has_webauthn(user.id) + @pytest.mark.parametrize("valid", [True, False]) def test_check_totp_value(self, user_service, monkeypatch, valid): verify_totp = pretend.call_recorder(lambda *a: valid) @@ -406,6 +425,184 @@ def test_check_totp_value_user_rate_limited(self, user_service, metrics): ), ] + @pytest.mark.parametrize( + ("challenge", "rp_name", "rp_id", "icon_url"), + ( + ["fake_challenge", "fake_rp_name", "fake_rp_id", "fake_icon_url"], + [None, None, None, None], + ), + ) + def test_get_webauthn_credential_options( + self, user_service, challenge, rp_name, rp_id, icon_url + ): + user = UserFactory.create() + options = user_service.get_webauthn_credential_options( + user.id, + challenge=challenge, + rp_name=rp_name, + rp_id=rp_id, + icon_url=icon_url, + ) + + assert options["user"]["id"] == str(user.id) + assert options["user"]["name"] == user.username + assert options["user"]["displayName"] == user.name + assert options["challenge"] == challenge + assert options["rp"]["name"] == rp_name + assert options["rp"]["id"] == rp_id + + if icon_url: + assert options["user"]["icon"] == icon_url + else: + assert "icon" not in options["user"] + + def test_get_webauthn_assertion_options(self, user_service): + user = UserFactory.create() + user_service.add_webauthn( + user.id, + label="test_label", + credential_id="foo", + public_key="bar", + sign_count=1, + ) + + options = user_service.get_webauthn_assertion_options( + user.id, + challenge="fake_challenge", + icon_url="fake_icon_url", + rp_id="fake_rp_id", + ) + + assert options["challenge"] == "fake_challenge" + assert options["rpId"] == "fake_rp_id" + assert options["allowCredentials"][0]["id"] == user.webauthn[0].credential_id + + def test_verify_webauthn_credential(self, user_service, monkeypatch): + user = UserFactory.create() + user_service.add_webauthn( + user.id, + label="test_label", + credential_id="foo", + public_key="bar", + sign_count=1, + ) + + fake_validated_credential = pretend.stub(credential_id=b"bar") + verify_registration_response = pretend.call_recorder( + lambda *a, **kw: fake_validated_credential + ) + monkeypatch.setattr( + webauthn, "verify_registration_response", verify_registration_response + ) + + validated_credential = user_service.verify_webauthn_credential( + pretend.stub(), + challenge=pretend.stub(), + rp_id=pretend.stub(), + origin=pretend.stub(), + ) + + assert validated_credential is fake_validated_credential + + def test_verify_webauthn_credential_already_in_use(self, user_service, monkeypatch): + user = UserFactory.create() + user_service.add_webauthn( + user.id, + label="test_label", + credential_id="foo", + public_key="bar", + sign_count=1, + ) + + fake_validated_credential = pretend.stub(credential_id=b"foo") + verify_registration_response = pretend.call_recorder( + lambda *a, **kw: fake_validated_credential + ) + monkeypatch.setattr( + webauthn, "verify_registration_response", verify_registration_response + ) + + with pytest.raises(webauthn.RegistrationRejectedException): + user_service.verify_webauthn_credential( + pretend.stub(), + challenge=pretend.stub(), + rp_id=pretend.stub(), + origin=pretend.stub(), + ) + + def test_verify_webauthn_assertion(self, user_service, monkeypatch): + user = UserFactory.create() + user_service.add_webauthn( + user.id, + label="test_label", + credential_id="foo", + public_key="bar", + sign_count=1, + ) + + verify_assertion_response = pretend.call_recorder(lambda *a, **kw: 2) + monkeypatch.setattr( + webauthn, "verify_assertion_response", verify_assertion_response + ) + + updated_sign_count = user_service.verify_webauthn_assertion( + user.id, + pretend.stub(), + challenge=pretend.stub(), + origin=pretend.stub(), + icon_url=pretend.stub(), + rp_id=pretend.stub(), + ) + assert updated_sign_count == 2 + + def test_get_webauthn_by_label(self, user_service): + user = UserFactory.create() + user_service.add_webauthn( + user.id, + label="test_label", + credential_id="foo", + public_key="bar", + sign_count=1, + ) + + webauthn = user_service.get_webauthn_by_label(user.id, "test_label") + assert webauthn is not None + assert webauthn.label == "test_label" + + webauthn = user_service.get_webauthn_by_label(user.id, "not_a_real_label") + assert webauthn is None + + other_user = UserFactory.create() + webauthn = user_service.get_webauthn_by_label(other_user.id, "test_label") + assert webauthn is None + + def test_get_webauthn_by_credential_id(self, user_service): + user = UserFactory.create() + user_service.add_webauthn( + user.id, + label="foo", + credential_id="test_credential_id", + public_key="bar", + sign_count=1, + ) + + webauthn = user_service.get_webauthn_by_credential_id( + user.id, "test_credential_id" + ) + assert webauthn is not None + assert webauthn.credential_id == "test_credential_id" + + webauthn = user_service.get_webauthn_by_credential_id( + user.id, "not_a_real_label" + ) + assert webauthn is None + + other_user = UserFactory.create() + webauthn = user_service.get_webauthn_by_credential_id( + other_user.id, "test_credential_id" + ) + assert webauthn is None + class TestTokenService: def test_verify_service(self): diff --git a/tests/unit/accounts/test_views.py b/tests/unit/accounts/test_views.py index 853cc892e336..482b92bbc13b 100644 --- a/tests/unit/accounts/test_views.py +++ b/tests/unit/accounts/test_views.py @@ -316,7 +316,7 @@ def test_two_factor_auth(self, pyramid_request, redirect_url, token_service): class TestTwoFactor: @pytest.mark.parametrize("redirect_url", [None, "/foo/bar/", "/wat/"]) - def test_get_returns_form(self, pyramid_request, redirect_url): + def test_get_returns_totp_form(self, pyramid_request, redirect_url): query_params = {"userid": 1} if redirect_url: query_params["redirect_to"] = redirect_url @@ -328,6 +328,8 @@ def test_get_returns_form(self, pyramid_request, redirect_url): user_service = pretend.stub( find_userid=pretend.call_recorder(lambda username: 1), update_user=lambda *a, **k: None, + has_totp=lambda uid: True, + has_webauthn=lambda uid: False, ) pyramid_request.find_service = lambda interface, **kwargs: { @@ -340,10 +342,12 @@ def test_get_returns_form(self, pyramid_request, redirect_url): form_obj = pretend.stub() form_class = pretend.call_recorder(lambda d, user_service, **kw: form_obj) - result = views.two_factor(pyramid_request, _form_class=form_class) + result = views.two_factor_and_totp_validate( + pyramid_request, _form_class=form_class + ) assert token_service.loads.calls == [pretend.call(pyramid_request.query_string)] - assert result == {"form": form_obj} + assert result == {"totp_form": form_obj} assert form_class.calls == [ pretend.call( pyramid_request.POST, @@ -353,8 +357,38 @@ def test_get_returns_form(self, pyramid_request, redirect_url): ) ] + @pytest.mark.parametrize("redirect_url", [None, "/foo/bar/", "/wat/"]) + def test_get_returns_webauthn(self, pyramid_request, redirect_url): + query_params = {"userid": 1} + if redirect_url: + query_params["redirect_to"] = redirect_url + + token_service = pretend.stub( + loads=pretend.call_recorder(lambda s: query_params) + ) + + user_service = pretend.stub( + find_userid=pretend.call_recorder(lambda username: 1), + update_user=lambda *a, **k: None, + has_totp=lambda uid: False, + has_webauthn=lambda uid: True, + ) + + pyramid_request.find_service = lambda interface, **kwargs: { + ITokenService: token_service, + IUserService: user_service, + }[interface] + + pyramid_request.query_string = pretend.stub() + result = views.two_factor_and_totp_validate( + pyramid_request, _form_class=pretend.stub() + ) + + assert token_service.loads.calls == [pretend.call(pyramid_request.query_string)] + assert result == {"has_webauthn": True} + @pytest.mark.parametrize("redirect_url", ["test_redirect_url", None]) - def test_two_factor_auth(self, monkeypatch, pyramid_request, redirect_url): + def test_totp_auth(self, monkeypatch, pyramid_request, redirect_url): remember = pretend.call_recorder(lambda request, user_id: [("foo", "bar")]) monkeypatch.setattr(views, "remember", remember) @@ -369,6 +403,8 @@ def test_two_factor_auth(self, monkeypatch, pyramid_request, redirect_url): user_service = pretend.stub( find_userid=pretend.call_recorder(lambda username: 1), update_user=lambda *a, **k: None, + has_totp=lambda userid: True, + has_webauthn=lambda userid: False, check_totp_value=lambda userid, totp_value: True, ) @@ -402,7 +438,9 @@ def test_two_factor_auth(self, monkeypatch, pyramid_request, redirect_url): pyramid_request.params = pretend.stub( get=pretend.call_recorder(lambda k: query_params.get(k)) ) - result = views.two_factor(pyramid_request, _form_class=form_class) + result = views.two_factor_and_totp_validate( + pyramid_request, _form_class=form_class + ) token_expected_data = {"userid": str(1)} if redirect_url: @@ -415,7 +453,7 @@ def test_two_factor_auth(self, monkeypatch, pyramid_request, redirect_url): assert pyramid_request.session.new_csrf_token.calls == [pretend.call()] @pytest.mark.parametrize("redirect_url", ["test_redirect_url", None]) - def test_two_factor_auth_invalid(self, pyramid_request, redirect_url): + def test_totp_auth_invalid(self, pyramid_request, redirect_url): query_params = {"userid": str(1)} if redirect_url: query_params["redirect_to"] = redirect_url @@ -427,6 +465,8 @@ def test_two_factor_auth_invalid(self, pyramid_request, redirect_url): user_service = pretend.stub( find_userid=pretend.call_recorder(lambda username: 1), update_user=lambda *a, **k: None, + has_totp=lambda userid: True, + has_webauthn=lambda userid: False, check_totp_value=lambda userid, totp_value: False, ) @@ -448,7 +488,9 @@ def test_two_factor_auth_invalid(self, pyramid_request, redirect_url): pyramid_request.params = pretend.stub( get=pretend.call_recorder(lambda k: query_params.get(k)) ) - result = views.two_factor(pyramid_request, _form_class=form_class) + result = views.two_factor_and_totp_validate( + pyramid_request, _form_class=form_class + ) token_expected_data = {"userid": str(1)} if redirect_url: @@ -456,43 +498,28 @@ def test_two_factor_auth_invalid(self, pyramid_request, redirect_url): assert isinstance(result, HTTPSeeOther) - def test_two_factor_auth_already_authed(self): + def test_totp_auth_already_authed(self): request = pretend.stub( authenticated_userid="not_none", route_path=pretend.call_recorder(lambda p: "redirect_to"), ) - result = views.two_factor(request) + result = views.two_factor_and_totp_validate(request) assert request.route_path.calls == [pretend.call("manage.projects")] assert isinstance(result, HTTPSeeOther) assert result.headers["Location"] == "redirect_to" - def test_two_factor_missing_userid(self): - token_service = pretend.stub(loads=pretend.call_recorder(lambda s: {})) - - request = pretend.stub( - session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)), - authenticated_userid=None, - route_path=pretend.call_recorder(lambda p: "redirect_to"), - find_service=lambda interface, **kwargs: {ITokenService: token_service}[ - interface - ], - query_string=pretend.stub(), - ) - result = views.two_factor(request) - - assert token_service.loads.calls == [pretend.call(request.query_string)] - assert request.route_path.calls == [pretend.call("accounts.login")] - assert request.session.flash.calls == [] - - assert isinstance(result, HTTPSeeOther) - assert result.headers["Location"] == "redirect_to" - - def test_two_factor_auth_form_invalid(self): + def test_totp_form_invalid(self): token_data = {"userid": 1} token_service = pretend.stub(loads=pretend.call_recorder(lambda s: token_data)) + user_service = pretend.stub( + has_totp=lambda userid: True, + has_webauthn=lambda userid: False, + check_totp_value=lambda userid, totp_value: False, + ) + request = pretend.stub( POST={}, method="POST", @@ -501,7 +528,7 @@ def test_two_factor_auth_form_invalid(self): route_path=pretend.call_recorder(lambda p: "redirect_to"), find_service=lambda interface, **kwargs: { ITokenService: token_service, - IUserService: pretend.stub(), + IUserService: user_service, }[interface], query_string=pretend.stub(), ) @@ -512,12 +539,35 @@ def test_two_factor_auth_form_invalid(self): ) form_class = pretend.call_recorder(lambda *a, **kw: form_obj) - result = views.two_factor(request, _form_class=form_class) + result = views.two_factor_and_totp_validate(request, _form_class=form_class) + + assert token_service.loads.calls == [pretend.call(request.query_string)] + assert result == {"totp_form": form_obj} + + def test_two_factor_token_missing_userid(self): + token_service = pretend.stub(loads=pretend.call_recorder(lambda s: {})) + + request = pretend.stub( + session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)), + authenticated_userid=None, + route_path=pretend.call_recorder(lambda p: "redirect_to"), + find_service=lambda interface, **kwargs: {ITokenService: token_service}[ + interface + ], + query_string=pretend.stub(), + ) + result = views.two_factor_and_totp_validate(request) assert token_service.loads.calls == [pretend.call(request.query_string)] - assert result == {"form": form_obj} + assert request.route_path.calls == [pretend.call("accounts.login")] + assert request.session.flash.calls == [ + pretend.call("Invalid or expired two factor login.", queue="error") + ] - def test_two_factor_auth_token_invalid(self): + assert isinstance(result, HTTPSeeOther) + assert result.headers["Location"] == "redirect_to" + + def test_two_factor_token_invalid(self): token_service = pretend.stub(loads=pretend.raiser(TokenException)) request = pretend.stub( session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)), @@ -529,7 +579,7 @@ def test_two_factor_auth_token_invalid(self): query_string=pretend.stub(), ) - result = views.two_factor(request) + result = views.two_factor_and_totp_validate(request) assert isinstance(result, HTTPSeeOther) assert result.headers["Location"] == "redirect_to" @@ -538,6 +588,166 @@ def test_two_factor_auth_token_invalid(self): ] +class TestWebAuthn: + def test_webauthn_get_options_already_authenticated(self): + request = pretend.stub(authenticated_userid=pretend.stub()) + result = views.webauthn_authentication_options(request) + + assert result == {"fail": {"errors": ["Already authenticated"]}} + + def test_webauthn_get_options_invalid_token(self, monkeypatch): + _get_two_factor_data = pretend.raiser(TokenException) + monkeypatch.setattr(views, "_get_two_factor_data", _get_two_factor_data) + + request = pretend.stub( + session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)), + authenticated_userid=None, + ) + result = views.webauthn_authentication_options(request) + + assert request.session.flash.calls == [ + pretend.call("Invalid or expired two factor login.", queue="error") + ] + assert result == {"fail": {"errors": ["Invalid two factor token"]}} + + def test_webauthn_get_options(self, monkeypatch): + _get_two_factor_data = pretend.call_recorder( + lambda r: {"redirect_to": "foobar", "userid": 1} + ) + monkeypatch.setattr(views, "_get_two_factor_data", _get_two_factor_data) + + user_service = pretend.stub( + get_webauthn_assertion_options=lambda *a, **kw: {"not": "real"} + ) + + request = pretend.stub( + session=pretend.stub( + flash=pretend.call_recorder(lambda *a, **kw: None), + get_webauthn_challenge=pretend.call_recorder(lambda: "not_real"), + ), + registry=pretend.stub(settings=pretend.stub(get=lambda *a: pretend.stub())), + domain=pretend.stub(), + authenticated_userid=None, + find_service=lambda interface, **kwargs: user_service, + ) + + result = views.webauthn_authentication_options(request) + + assert _get_two_factor_data.calls == [pretend.call(request)] + assert result == {"not": "real"} + + def test_webauthn_validate_already_authenticated(self): + request = pretend.stub(authenticated_userid=pretend.stub()) + result = views.webauthn_authentication_validate(request) + + assert result == {"fail": {"errors": ["Already authenticated"]}} + + def test_webauthn_validate_invalid_token(self, monkeypatch): + _get_two_factor_data = pretend.raiser(TokenException) + monkeypatch.setattr(views, "_get_two_factor_data", _get_two_factor_data) + + request = pretend.stub( + session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)), + authenticated_userid=None, + ) + result = views.webauthn_authentication_validate(request) + + assert request.session.flash.calls == [ + pretend.call("Invalid or expired two factor login.", queue="error") + ] + assert result == {"fail": {"errors": ["Invalid two factor token"]}} + + def test_webauthn_validate_invalid_form(self, monkeypatch): + _get_two_factor_data = pretend.call_recorder( + lambda r: {"redirect_to": "foobar", "userid": 1} + ) + monkeypatch.setattr(views, "_get_two_factor_data", _get_two_factor_data) + + request = pretend.stub( + authenticated_userid=None, + POST={}, + session=pretend.stub( + get_webauthn_challenge=pretend.call_recorder(lambda: "not_real"), + clear_webauthn_challenge=pretend.call_recorder(lambda: pretend.stub()), + ), + find_service=lambda *a, **kw: pretend.stub(), + host_url=pretend.stub(), + registry=pretend.stub(settings=pretend.stub(get=lambda *a: pretend.stub())), + rp_id=pretend.stub(), + domain=pretend.stub(), + ) + + form_obj = pretend.stub( + validate=pretend.call_recorder(lambda: False), + credential=pretend.stub(errors=["Fake validation failure"]), + ) + form_class = pretend.call_recorder(lambda *a, **kw: form_obj) + monkeypatch.setattr(views, "WebAuthnAuthenticationForm", form_class) + + result = views.webauthn_authentication_validate(request) + + assert _get_two_factor_data.calls == [pretend.call(request)] + assert request.session.get_webauthn_challenge.calls == [pretend.call()] + assert request.session.clear_webauthn_challenge.calls == [pretend.call()] + + assert result == {"fail": {"errors": ["Fake validation failure"]}} + + def test_webauthn_validate(self, monkeypatch): + _get_two_factor_data = pretend.call_recorder( + lambda r: {"redirect_to": "foobar", "userid": 1} + ) + monkeypatch.setattr(views, "_get_two_factor_data", _get_two_factor_data) + + _login_user = pretend.call_recorder(lambda req, uid: pretend.stub()) + monkeypatch.setattr(views, "_login_user", _login_user) + + user = pretend.stub(webauthn=pretend.stub(sign_count=pretend.stub())) + + user_service = pretend.stub( + get_user=pretend.call_recorder(lambda uid: user), + get_webauthn_by_credential_id=pretend.call_recorder( + lambda *a: pretend.stub() + ), + ) + + request = pretend.stub( + authenticated_userid=None, + POST={}, + session=pretend.stub( + get_webauthn_challenge=pretend.call_recorder(lambda: "not_real"), + clear_webauthn_challenge=pretend.call_recorder(lambda: pretend.stub()), + ), + find_service=lambda *a, **kw: user_service, + host_url=pretend.stub(), + registry=pretend.stub(settings=pretend.stub(get=lambda *a: pretend.stub())), + rp_id=pretend.stub(), + domain=pretend.stub(), + response=pretend.stub( + set_cookie=pretend.call_recorder(lambda *a: pretend.stub()) + ), + ) + + form_obj = pretend.stub( + validate=pretend.call_recorder(lambda: True), + credential=pretend.stub(errors=["Fake validation failure"]), + validated_credential=(pretend.stub(), pretend.stub()), + ) + form_class = pretend.call_recorder(lambda *a, **kw: form_obj) + monkeypatch.setattr(views, "WebAuthnAuthenticationForm", form_class) + + result = views.webauthn_authentication_validate(request) + + assert _get_two_factor_data.calls == [pretend.call(request)] + assert _login_user.calls == [pretend.call(request, 1)] + assert request.session.get_webauthn_challenge.calls == [pretend.call()] + assert request.session.clear_webauthn_challenge.calls == [pretend.call()] + + assert result == { + "success": "Successful WebAuthn assertion", + "redirect_to": "foobar", + } + + class TestLogout: @pytest.mark.parametrize("next_url", [None, "/foo/bar/", "/wat/"]) def test_get_returns_empty(self, pyramid_request, next_url): diff --git a/tests/unit/manage/test_forms.py b/tests/unit/manage/test_forms.py index cef9ce84053c..e5d5107b7b18 100644 --- a/tests/unit/manage/test_forms.py +++ b/tests/unit/manage/test_forms.py @@ -17,6 +17,7 @@ from webob.multidict import MultiDict import warehouse.utils.otp as otp +import warehouse.utils.webauthn as webauthn from warehouse.manage import forms @@ -165,3 +166,158 @@ def test_creation(self): form = forms.DeleteTOTPForm(user_service=user_service) assert form.user_service is user_service + + +class TestProvisionWebAuthnForm: + def test_creation(self): + user_service = pretend.stub() + user_id = pretend.stub() + challenge = pretend.stub() + rp_id = pretend.stub() + origin = pretend.stub() + form = forms.ProvisionWebAuthnForm( + user_service=user_service, + user_id=user_id, + challenge=challenge, + rp_id=rp_id, + origin=origin, + ) + + assert form.user_service is user_service + assert form.user_id is user_id + assert form.challenge is challenge + assert form.rp_id is rp_id + assert form.origin is origin + + def test_verify_assertion_invalid_json(self): + user_service = pretend.stub( + get_webauthn_by_label=pretend.call_recorder(lambda *a: None) + ) + + form = forms.ProvisionWebAuthnForm( + data={"credential": "invalid json", "label": "fake label"}, + user_service=user_service, + user_id=pretend.stub(), + challenge=pretend.stub(), + rp_id=pretend.stub(), + origin=pretend.stub(), + ) + + assert not form.validate() + assert ( + form.credential.errors.pop() == "Invalid WebAuthn credential: Bad payload" + ) + + def test_verify_assertion_invalid(self): + user_service = pretend.stub( + verify_webauthn_credential=pretend.raiser( + webauthn.RegistrationRejectedException("Fake exception") + ), + get_webauthn_by_label=pretend.call_recorder(lambda *a: None), + ) + form = forms.ProvisionWebAuthnForm( + data={"credential": "{}", "label": "fake label"}, + user_service=user_service, + user_id=pretend.stub(), + challenge=pretend.stub(), + rp_id=pretend.stub(), + origin=pretend.stub(), + ) + + assert not form.validate() + assert form.credential.errors.pop() == "Fake exception" + + def test_verify_label_missing(self): + user_service = pretend.stub( + verify_webauthn_credential=lambda *a, **kw: pretend.stub() + ) + form = forms.ProvisionWebAuthnForm( + data={"credential": "{}"}, + user_service=user_service, + user_id=pretend.stub(), + challenge=pretend.stub(), + rp_id=pretend.stub(), + origin=pretend.stub(), + ) + + assert not form.validate() + assert form.label.errors.pop() == "Specify a label" + + def test_verify_label_already_in_use(self): + user_service = pretend.stub( + verify_webauthn_credential=lambda *a, **kw: pretend.stub(), + get_webauthn_by_label=pretend.call_recorder(lambda *a: pretend.stub()), + ) + form = forms.ProvisionWebAuthnForm( + data={"credential": "{}", "label": "fake label"}, + user_service=user_service, + user_id=pretend.stub(), + challenge=pretend.stub(), + rp_id=pretend.stub(), + origin=pretend.stub(), + ) + + assert not form.validate() + assert form.label.errors.pop() == "Label 'fake label' already in use" + + def test_creates_validated_credential(self): + fake_validated_credential = object() + user_service = pretend.stub( + verify_webauthn_credential=lambda *a, **kw: fake_validated_credential, + get_webauthn_by_label=pretend.call_recorder(lambda *a: None), + ) + form = forms.ProvisionWebAuthnForm( + data={"credential": "{}", "label": "fake label"}, + user_service=user_service, + user_id=pretend.stub(), + challenge=pretend.stub(), + rp_id=pretend.stub(), + origin=pretend.stub(), + ) + + assert form.validate() + assert form.validated_credential is fake_validated_credential + + +class TestDeleteWebAuthnForm: + def test_creation(self): + user_service = pretend.stub() + user_id = pretend.stub() + form = forms.DeleteWebAuthnForm(user_service=user_service, user_id=user_id) + + assert form.user_service is user_service + + def test_validate_label_missing(self): + form = forms.DeleteWebAuthnForm( + user_service=pretend.stub(), user_id=pretend.stub() + ) + + assert not form.validate() + assert form.label.errors.pop() == "Specify a label" + + def test_validate_label_not_in_use(self): + user_service = pretend.stub( + get_webauthn_by_label=pretend.call_recorder(lambda *a: None) + ) + form = forms.DeleteWebAuthnForm( + data={"label": "fake label"}, + user_service=user_service, + user_id=pretend.stub(), + ) + + assert not form.validate() + assert form.label.errors.pop() == "No WebAuthn key with given label" + + def test_creates_webauthn_attribute(self): + fake_webauthn = object() + user_service = pretend.stub( + get_webauthn_by_label=pretend.call_recorder(lambda *a: fake_webauthn) + ) + form = forms.DeleteWebAuthnForm( + data={"label": "fake label"}, + user_service=user_service, + user_id=pretend.stub(), + ) + + assert form.validate() + assert form.webauthn is fake_webauthn diff --git a/tests/unit/manage/test_views.py b/tests/unit/manage/test_views.py index d0dffae84123..945a00d6a821 100644 --- a/tests/unit/manage/test_views.py +++ b/tests/unit/manage/test_views.py @@ -1088,6 +1088,218 @@ def test_delete_totp_two_factor_not_allowed(self): ] +class TestProvisionWebAuthn: + def test_get_webauthn_view(self): + user_service = pretend.stub() + request = pretend.stub(find_service=lambda *a, **kw: user_service) + + view = views.ProvisionWebAuthnViews(request) + result = view.webauthn_provision() + + assert result == {} + + def test_get_webauthn_options(self): + user_service = pretend.stub( + get_webauthn_credential_options=pretend.call_recorder( + lambda *a, **kw: {"not_real": "credential_options"} + ) + ) + request = pretend.stub( + user=pretend.stub(id=1234), + session=pretend.stub( + get_webauthn_challenge=pretend.call_recorder(lambda: "fake_challenge") + ), + find_service=lambda *a, **kw: user_service, + registry=pretend.stub( + settings={ + "site.name": "fake_site_name", + "warehouse.domain": "fake_domain", + } + ), + domain="fake_domain", + ) + + view = views.ProvisionWebAuthnViews(request) + result = view.webauthn_provision_options() + + assert result == {"not_real": "credential_options"} + assert user_service.get_webauthn_credential_options.calls == [ + pretend.call( + 1234, + challenge="fake_challenge", + rp_name=request.registry.settings["site.name"], + rp_id=request.domain, + icon_url=request.registry.settings["warehouse.domain"], + ) + ] + + def test_validate_webauthn_provision(self, monkeypatch): + user_service = pretend.stub( + add_webauthn=pretend.call_recorder(lambda *a, **kw: pretend.stub()) + ) + request = pretend.stub( + POST={}, + user=pretend.stub(id=1234, webauthn=None), + session=pretend.stub( + get_webauthn_challenge=pretend.call_recorder(lambda: "fake_challenge"), + clear_webauthn_challenge=pretend.call_recorder(lambda: pretend.stub()), + flash=pretend.call_recorder(lambda *a, **kw: None), + ), + find_service=lambda *a, **kw: user_service, + domain="fake_domain", + host_url="fake_host_url", + ) + + provision_webauthn_obj = pretend.stub( + validate=lambda: True, + validated_credential=pretend.stub( + credential_id=b"fake_credential_id", + public_key=b"fake_public_key", + sign_count=1, + ), + label=pretend.stub(data="fake_label"), + ) + provision_webauthn_cls = pretend.call_recorder( + lambda *a, **kw: provision_webauthn_obj + ) + monkeypatch.setattr(views, "ProvisionWebAuthnForm", provision_webauthn_cls) + + view = views.ProvisionWebAuthnViews(request) + result = view.validate_webauthn_provision() + + assert request.session.get_webauthn_challenge.calls == [pretend.call()] + assert request.session.clear_webauthn_challenge.calls == [pretend.call()] + assert user_service.add_webauthn.calls == [ + pretend.call( + 1234, + label="fake_label", + credential_id="fake_credential_id", + public_key="fake_public_key", + sign_count=1, + ) + ] + assert request.session.flash.calls == [ + pretend.call("WebAuthn successfully provisioned.", queue="success") + ] + assert result == {"success": "WebAuthn successfully provisioned"} + + def test_validate_webauthn_provision_invalid_form(self, monkeypatch): + user_service = pretend.stub( + add_webauthn=pretend.call_recorder(lambda *a, **kw: pretend.stub()) + ) + request = pretend.stub( + POST={}, + user=pretend.stub(id=1234, webauthn=None), + session=pretend.stub( + get_webauthn_challenge=pretend.call_recorder(lambda: "fake_challenge"), + clear_webauthn_challenge=pretend.call_recorder(lambda: pretend.stub()), + flash=pretend.call_recorder(lambda *a, **kw: None), + ), + find_service=lambda *a, **kw: user_service, + domain="fake_domain", + host_url="fake_host_url", + ) + + provision_webauthn_obj = pretend.stub( + validate=lambda: False, + errors=pretend.stub( + values=pretend.call_recorder(lambda: [["Not a real error"]]) + ), + ) + provision_webauthn_cls = pretend.call_recorder( + lambda *a, **kw: provision_webauthn_obj + ) + monkeypatch.setattr(views, "ProvisionWebAuthnForm", provision_webauthn_cls) + + view = views.ProvisionWebAuthnViews(request) + result = view.validate_webauthn_provision() + + assert request.session.get_webauthn_challenge.calls == [pretend.call()] + assert request.session.clear_webauthn_challenge.calls == [pretend.call()] + assert user_service.add_webauthn.calls == [] + assert result == {"fail": {"errors": ["Not a real error"]}} + + def test_delete_webauthn(self, monkeypatch): + request = pretend.stub( + POST={}, + user=pretend.stub( + id=1234, + username=pretend.stub(), + webauthn=pretend.stub( + __get__=pretend.call_recorder(lambda *a: [pretend.stub]), + __len__=pretend.call_recorder(lambda *a: 1), + remove=pretend.call_recorder(lambda *a: pretend.stub()), + ), + ), + session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)), + route_path=pretend.call_recorder(lambda x: "/foo/bar"), + find_service=lambda *a, **kw: pretend.stub(), + ) + + delete_webauthn_obj = pretend.stub( + validate=lambda: True, webauthn=pretend.stub() + ) + delete_webauthn_cls = pretend.call_recorder( + lambda *a, **kw: delete_webauthn_obj + ) + monkeypatch.setattr(views, "DeleteWebAuthnForm", delete_webauthn_cls) + + view = views.ProvisionWebAuthnViews(request) + result = view.delete_webauthn() + + assert request.session.flash.calls == [ + pretend.call("WebAuthn device deleted.", queue="success") + ] + assert request.route_path.calls == [pretend.call("manage.account")] + assert isinstance(result, HTTPSeeOther) + assert result.headers["Location"] == "/foo/bar" + + def test_delete_webauthn_not_provisioned(self): + request = pretend.stub( + user=pretend.stub(id=1234, webauthn=[]), + session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)), + route_path=pretend.call_recorder(lambda x: "/foo/bar"), + find_service=lambda *a, **kw: pretend.stub(), + ) + + view = views.ProvisionWebAuthnViews(request) + result = view.delete_webauthn() + + assert request.session.flash.calls == [ + pretend.call("No WebAuthhn device to delete.", queue="error") + ] + assert request.route_path.calls == [pretend.call("manage.account")] + assert isinstance(result, HTTPSeeOther) + assert result.headers["Location"] == "/foo/bar" + + def test_delete_webauthn_invalid_form(self, monkeypatch): + request = pretend.stub( + POST={}, + user=pretend.stub( + id=1234, username=pretend.stub(), webauthn=[pretend.stub()] + ), + session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)), + route_path=pretend.call_recorder(lambda x: "/foo/bar"), + find_service=lambda *a, **kw: pretend.stub(), + ) + + delete_webauthn_obj = pretend.stub(validate=lambda: False) + delete_webauthn_cls = pretend.call_recorder( + lambda *a, **kw: delete_webauthn_obj + ) + monkeypatch.setattr(views, "DeleteWebAuthnForm", delete_webauthn_cls) + + view = views.ProvisionWebAuthnViews(request) + result = view.delete_webauthn() + + assert request.session.flash.calls == [ + pretend.call("Invalid credentials.", queue="error") + ] + assert request.route_path.calls == [pretend.call("manage.account")] + assert isinstance(result, HTTPSeeOther) + assert result.headers["Location"] == "/foo/bar" + + class TestManageProjects: def test_manage_projects(self, db_request): older_release = ReleaseFactory(created=datetime.datetime(2015, 1, 1)) diff --git a/tests/unit/packaging/test_models.py b/tests/unit/packaging/test_models.py index 0e25e723690c..5a7388919ce6 100644 --- a/tests/unit/packaging/test_models.py +++ b/tests/unit/packaging/test_models.py @@ -124,11 +124,19 @@ def test_acl(self, db_session): assert acls == [ (Allow, "group:admins", "admin"), (Allow, "group:moderators", "moderator"), - (Allow, str(owner1.user.id), ["manage:project", "upload"]), - (Allow, str(owner2.user.id), ["manage:project", "upload"]), - (Allow, str(maintainer1.user.id), ["upload"]), - (Allow, str(maintainer2.user.id), ["upload"]), - ] + ] + sorted( + [ + (Allow, str(owner1.user.id), ["manage:project", "upload"]), + (Allow, str(owner2.user.id), ["manage:project", "upload"]), + ], + key=lambda x: x[1], + ) + sorted( + [ + (Allow, str(maintainer1.user.id), ["upload"]), + (Allow, str(maintainer2.user.id), ["upload"]), + ], + key=lambda x: x[1], + ) class TestRelease: @@ -293,11 +301,19 @@ def test_acl(self, db_session): assert acls == [ (Allow, "group:admins", "admin"), (Allow, "group:moderators", "moderator"), - (Allow, str(owner1.user.id), ["manage:project", "upload"]), - (Allow, str(owner2.user.id), ["manage:project", "upload"]), - (Allow, str(maintainer1.user.id), ["upload"]), - (Allow, str(maintainer2.user.id), ["upload"]), - ] + ] + sorted( + [ + (Allow, str(owner1.user.id), ["manage:project", "upload"]), + (Allow, str(owner2.user.id), ["manage:project", "upload"]), + ], + key=lambda x: x[1], + ) + sorted( + [ + (Allow, str(maintainer1.user.id), ["upload"]), + (Allow, str(maintainer2.user.id), ["upload"]), + ], + key=lambda x: x[1], + ) @pytest.mark.parametrize( ("home_page", "expected"), diff --git a/tests/unit/test_routes.py b/tests/unit/test_routes.py index 54d32eb8fba2..7bdc6bc985c7 100644 --- a/tests/unit/test_routes.py +++ b/tests/unit/test_routes.py @@ -125,6 +125,16 @@ def add_policy(name, filename): ), pretend.call("accounts.login", "/account/login/", domain=warehouse), pretend.call("accounts.two-factor", "/account/two-factor/", domain=warehouse), + pretend.call( + "accounts.webauthn-authenticate.options", + "/accounts/webauthn-authenticate/options", + domain=warehouse, + ), + pretend.call( + "accounts.webauthn-authenticate.validate", + "/accounts/webauthn-authenticate/validate", + domain=warehouse, + ), pretend.call("accounts.logout", "/account/logout/", domain=warehouse), pretend.call("accounts.register", "/account/register/", domain=warehouse), pretend.call( @@ -149,6 +159,26 @@ def add_policy(name, filename): "/manage/account/totp-provision/image", domain=warehouse, ), + pretend.call( + "manage.account.webauthn-provision", + "/manage/account/webauthn-provision", + domain=warehouse, + ), + pretend.call( + "manage.account.webauthn-provision.options", + "/manage/account/webauthn-provision/options", + domain=warehouse, + ), + pretend.call( + "manage.account.webauthn-provision.validate", + "/manage/account/webauthn-provision/validate", + domain=warehouse, + ), + pretend.call( + "manage.account.webauthn-provision.delete", + "/manage/account/webauthn-provision/delete", + domain=warehouse, + ), pretend.call("manage.projects", "/manage/projects/", domain=warehouse), pretend.call( "manage.project.settings", diff --git a/tests/unit/test_sessions.py b/tests/unit/test_sessions.py index e42e26603b4a..c5acc2680388 100644 --- a/tests/unit/test_sessions.py +++ b/tests/unit/test_sessions.py @@ -21,6 +21,7 @@ import warehouse.sessions import warehouse.utils.otp as otp +import warehouse.utils.webauthn as webauthn from warehouse.sessions import ( InvalidSession, @@ -250,7 +251,7 @@ def test_get_csrf_token_empty(self): assert session.get_csrf_token() == "123456" assert session.new_csrf_token.calls == [pretend.call()] - def test_get_totp_secret(self, monkeypatch): + def test_get_totp_secret(self): session = Session() session[session._totp_secret_key] = b"foobar" @@ -271,6 +272,31 @@ def test_clear_totp_secret(self): session.clear_totp_secret() assert not session[session._totp_secret_key] + def test_get_webauthn_challenge(self): + session = Session() + session[session._webauthn_challenge_key] = "not_a_real_challenge" + + assert session.get_webauthn_challenge() == "not_a_real_challenge" + + def test_get_webauthn_challenge_empty(self, monkeypatch): + generate_webauthn_challenge = pretend.call_recorder( + lambda: "not_a_real_challenge" + ) + monkeypatch.setattr( + webauthn, "generate_webauthn_challenge", generate_webauthn_challenge + ) + + session = Session() + assert session.get_webauthn_challenge() == "not_a_real_challenge" + assert session._webauthn_challenge_key in session + + def test_clear_webauthn_challenge(self): + session = Session() + session[session._webauthn_challenge_key] = "not_a_real_challenge" + + session.clear_webauthn_challenge() + assert not session[session._webauthn_challenge_key] + class TestSessionFactory: def test_initialize(self, monkeypatch): diff --git a/tests/unit/utils/test_webauthn.py b/tests/unit/utils/test_webauthn.py new file mode 100644 index 000000000000..85b163c520e4 --- /dev/null +++ b/tests/unit/utils/test_webauthn.py @@ -0,0 +1,121 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pretend +import pytest +import webauthn as pywebauthn + +import warehouse.utils.webauthn as webauthn + + +def test_generate_webauthn_challenge(): + challenge = webauthn.generate_webauthn_challenge() + + assert isinstance(challenge, str) + assert ( + challenge + == webauthn._webauthn_b64encode( + webauthn._webauthn_b64decode(challenge) + ).decode() + ) + + +def test_verify_registration_response(monkeypatch): + response_obj = pretend.stub( + verify=pretend.call_recorder(lambda: "not a real object") + ) + response_cls = pretend.call_recorder(lambda *a, **kw: response_obj) + monkeypatch.setattr(pywebauthn, "WebAuthnRegistrationResponse", response_cls) + + resp = webauthn.verify_registration_response( + {}, "not_a_real_challenge", rp_id="fake_rp_id", origin="fake_origin" + ) + + assert response_cls.calls == [ + pretend.call( + "fake_rp_id", + "fake_origin", + {}, + webauthn._webauthn_b64encode("not_a_real_challenge".encode()).decode(), + ) + ] + assert resp == "not a real object" + + +def test_verify_registration_response_failure(monkeypatch): + response_obj = pretend.stub( + verify=pretend.raiser(pywebauthn.webauthn.RegistrationRejectedException) + ) + response_cls = pretend.call_recorder(lambda *a, **kw: response_obj) + monkeypatch.setattr(pywebauthn, "WebAuthnRegistrationResponse", response_cls) + + with pytest.raises(webauthn.RegistrationRejectedException): + webauthn.verify_registration_response( + {}, "not_a_real_challenge", rp_id="fake_rp_id", origin="fake_origin" + ) + + +def test_verify_assertion_response(monkeypatch): + assertion_obj = pretend.stub(verify=pretend.call_recorder(lambda: 1234)) + assertion_cls = pretend.call_recorder(lambda *a, **kw: assertion_obj) + monkeypatch.setattr(pywebauthn, "WebAuthnAssertionResponse", assertion_cls) + + not_a_real_user = pretend.stub(credential_id="not_a_real_credential") + get_webauthn_users = pretend.call_recorder(lambda *a, **kw: [not_a_real_user]) + monkeypatch.setattr(webauthn, "_get_webauthn_users", get_webauthn_users) + + not_a_real_assertion = object() + resp = webauthn.verify_assertion_response( + not_a_real_assertion, + challenge="not_a_real_challenge", + user=not_a_real_user, + origin="fake_origin", + icon_url="fake_icon_url", + rp_id="fake_rp_id", + ) + + assert get_webauthn_users.calls == [ + pretend.call(not_a_real_user, icon_url="fake_icon_url", rp_id="fake_rp_id") + ] + assert assertion_cls.calls == [ + pretend.call( + not_a_real_user, + not_a_real_assertion, + webauthn._webauthn_b64encode("not_a_real_challenge".encode()).decode(), + "fake_origin", + allow_credentials=["not_a_real_credential"], + ) + ] + assert resp == ("not_a_real_credential", 1234) + + +def test_verify_assertion_response_failure(monkeypatch): + assertion_obj = pretend.stub( + verify=pretend.raiser(pywebauthn.webauthn.AuthenticationRejectedException) + ) + assertion_cls = pretend.call_recorder(lambda *a, **kw: assertion_obj) + monkeypatch.setattr(pywebauthn, "WebAuthnAssertionResponse", assertion_cls) + + get_webauthn_users = pretend.call_recorder( + lambda *a, **kw: [pretend.stub(credential_id=pretend.stub())] + ) + monkeypatch.setattr(webauthn, "_get_webauthn_users", get_webauthn_users) + + with pytest.raises(webauthn.AuthenticationRejectedException): + webauthn.verify_assertion_response( + pretend.stub(), + challenge="not_a_real_challenge", + user=pretend.stub(), + origin="fake_origin", + icon_url="fake_icon_url", + rp_id="fake_rp_id", + ) diff --git a/warehouse/accounts/forms.py b/warehouse/accounts/forms.py index 24a773b70f51..c238893d100d 100644 --- a/warehouse/accounts/forms.py +++ b/warehouse/accounts/forms.py @@ -10,11 +10,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json + import disposable_email_domains import jinja2 import wtforms import wtforms.fields.html5 +import warehouse.utils.webauthn as webauthn + from warehouse import forms from warehouse.accounts.interfaces import TooManyFailedLogins from warehouse.accounts.models import DisableReason @@ -46,6 +50,11 @@ class TOTPValueMixin: ) +class WebAuthnCredentialMixin: + + credential = wtforms.StringField(wtforms.validators.DataRequired()) + + class NewUsernameMixin: username = wtforms.StringField( @@ -242,18 +251,54 @@ def validate_password(self, field): ) -class TwoFactorForm(TOTPValueMixin, forms.Form): +class _TwoFactorAuthenticationForm(forms.Form): def __init__(self, *args, user_id, user_service, **kwargs): super().__init__(*args, **kwargs) self.user_id = user_id self.user_service = user_service + +class TOTPAuthenticationForm(TOTPValueMixin, _TwoFactorAuthenticationForm): def validate_totp_value(self, field): totp_value = field.data.encode("utf8") if not self.user_service.check_totp_value(self.user_id, totp_value): raise wtforms.validators.ValidationError("Invalid TOTP code.") +class WebAuthnAuthenticationForm(WebAuthnCredentialMixin, _TwoFactorAuthenticationForm): + __params__ = ["credential"] + + def __init__(self, *args, challenge, origin, icon_url, rp_id, **kwargs): + super().__init__(*args, **kwargs) + self.challenge = challenge + self.origin = origin + self.icon_url = icon_url + self.rp_id = rp_id + + def validate_credential(self, field): + try: + assertion_dict = json.loads(field.data.encode("utf8")) + except json.JSONDecodeError: + raise wtforms.validators.ValidationError( + f"Invalid WebAuthn assertion: Bad payload" + ) + + try: + validated_credential = self.user_service.verify_webauthn_assertion( + self.user_id, + assertion_dict, + challenge=self.challenge, + origin=self.origin, + icon_url=self.icon_url, + rp_id=self.rp_id, + ) + + except webauthn.AuthenticationRejectedException as e: + raise wtforms.validators.ValidationError(str(e)) + + self.validated_credential = validated_credential + + class RequestPasswordResetForm(forms.Form): username_or_email = wtforms.StringField( validators=[wtforms.validators.DataRequired()] diff --git a/warehouse/accounts/interfaces.py b/warehouse/accounts/interfaces.py index 8cb0cb2c7281..6cfa5c1a0cff 100644 --- a/warehouse/accounts/interfaces.py +++ b/warehouse/accounts/interfaces.py @@ -107,6 +107,16 @@ def has_two_factor(user_id): authentication and is allowed to use it. """ + def has_totp(user_id): + """ + Returns True if the user has a TOTP device provisioned. + """ + + def has_webauthn(user_id): + """ + Returns True if the user has a security key provisioned. + """ + def get_totp_secret(user_id): """ Returns the user's TOTP secret as bytes. @@ -120,6 +130,59 @@ def check_totp_value(user_id, totp_value, *, tags=None): Returns True if the given TOTP code is valid. """ + def add_webauthn(user_id, **kwargs): + """ + Adds a WebAuthn credential to the given user. + + Returns None if the user already has this credential. + """ + + def get_webauthn_credential_options( + user_id, *, challenge, rp_name, rp_id, icon_url + ): + """ + Returns a dictionary of credential options suitable for beginning the WebAuthn + provisioning process for the given user. + """ + + def get_webauthn_assertion_options(user_id, *, challenge, icon_url, rp_id): + """ + Returns a dictionary of assertion options suitable for beginning the WebAuthn + authentication process for the given user. + """ + + def verify_webauthn_credential(credential, *, challenge, rp_id, origin): + """ + Checks whether the given credential is valid, i.e. suitable for generating + assertions during authentication. + + Returns the validated credential on success, raises + webauthn.RegistrationRejectedException on failure. + """ + + def verify_webauthn_assertion( + user_id, assertion, *, challenge, origin, icon_url, rp_id + ): + """ + Checks whether the given assertion was produced by the given user's WebAuthn + device. + + Returns the updated signage count on success, raises + webauthn.AuthenticationRejectedException on failure. + """ + + def get_webauthn_by_label(user_id, label): + """ + Returns a WebAuthn credential for the given user by its label, + or None if no credential for the user has this label. + """ + + def get_webauthn_by_credential_id(user_id, credential_id): + """ + Returns a WebAuthn credential for the given user by its credential ID, + or None of the user doesn't have a credential with this ID. + """ + class ITokenService(Interface): def dumps(data): diff --git a/warehouse/accounts/models.py b/warehouse/accounts/models.py index 582f632104ea..54cfb592e664 100644 --- a/warehouse/accounts/models.py +++ b/warehouse/accounts/models.py @@ -80,8 +80,13 @@ class User(SitemapMixin, db.Model): Enum(DisableReason, values_callable=lambda x: [e.value for e in x]), nullable=True, ) + totp_secret = Column(Binary(length=20), nullable=True) + webauthn = orm.relationship( + "WebAuthn", backref="user", cascade="all, delete-orphan", lazy=False + ) + emails = orm.relationship( "Email", backref="user", cascade="all, delete-orphan", lazy=False ) @@ -107,15 +112,31 @@ def email(self): @property def has_two_factor(self): - # TODO: This is where user.u2f_provisioned et al. - # will also go. - return self.totp_secret is not None + return self.totp_secret is not None or len(self.webauthn) > 0 @property def two_factor_provisioning_allowed(self): return self.primary_email is not None and self.primary_email.verified +class WebAuthn(db.Model): + __tablename__ = "user_security_keys" + __table_args__ = ( + UniqueConstraint("label", name="user_security_keys_label_key"), + Index("user_security_keys_label_key", "user_id"), + ) + + user_id = Column( + UUID(as_uuid=True), + ForeignKey("users.id", deferrable=True, initially="DEFERRED"), + nullable=False, + ) + label = Column(String, nullable=False) + credential_id = Column(String, unique=True, nullable=False) + public_key = Column(String, unique=True, nullable=True) + sign_count = Column(Integer, default=0) + + class UnverifyReasons(enum.Enum): SpamComplaint = "spam complaint" diff --git a/warehouse/accounts/services.py b/warehouse/accounts/services.py index ff3ad6220ca7..d88b20c04428 100644 --- a/warehouse/accounts/services.py +++ b/warehouse/accounts/services.py @@ -26,6 +26,7 @@ from zope.interface import implementer import warehouse.utils.otp as otp +import warehouse.utils.webauthn as webauthn from warehouse.accounts.interfaces import ( IPasswordBreachedService, @@ -36,7 +37,7 @@ TokenMissing, TooManyFailedLogins, ) -from warehouse.accounts.models import Email, User +from warehouse.accounts.models import Email, User, WebAuthn from warehouse.metrics import IMetricsService from warehouse.rate_limiting import DummyRateLimiter, IRateLimiter from warehouse.utils.crypto import BadData, SignatureExpired, URLSafeTimedSerializer @@ -237,6 +238,22 @@ def has_two_factor(self, user_id): return user.has_two_factor + def has_totp(self, user_id): + """ + Returns True if the user has a TOTP device provisioned. + """ + user = self.get_user(user_id) + + return user.totp_secret is not None + + def has_webauthn(self, user_id): + """ + Returns True if the user has a security key provisioned. + """ + user = self.get_user(user_id) + + return len(user.webauthn) > 0 + def get_totp_secret(self, user_id): """ Returns the user's TOTP secret as bytes. @@ -300,6 +317,116 @@ def check_totp_value(self, user_id, totp_value, *, tags=None): return valid + def get_webauthn_credential_options( + self, user_id, *, challenge, rp_name, rp_id, icon_url + ): + """ + Returns a dictionary of credential options suitable for beginning the WebAuthn + provisioning process for the given user. + """ + user = self.get_user(user_id) + + return webauthn.get_credential_options( + user, challenge=challenge, rp_name=rp_name, rp_id=rp_id, icon_url=icon_url + ) + + def get_webauthn_assertion_options(self, user_id, *, challenge, icon_url, rp_id): + """ + Returns a dictionary of assertion options suitable for beginning the WebAuthn + authentication process for the given user. + """ + user = self.get_user(user_id) + + return webauthn.get_assertion_options( + user, challenge=challenge, icon_url=icon_url, rp_id=rp_id + ) + + def verify_webauthn_credential(self, credential, *, challenge, rp_id, origin): + """ + Checks whether the given credential is valid, i.e. suitable for generating + assertions during authentication. + + Returns the validated credential on success, raises + webauthn.RegistrationRejectedException on failure. + """ + validated_credential = webauthn.verify_registration_response( + credential, challenge=challenge, rp_id=rp_id, origin=origin + ) + + webauthn_cred = ( + self.db.query(WebAuthn) + .filter_by(credential_id=validated_credential.credential_id.decode()) + .first() + ) + + if webauthn_cred is not None: + raise webauthn.RegistrationRejectedException("Credential ID already in use") + + return validated_credential + + def verify_webauthn_assertion( + self, user_id, assertion, *, challenge, origin, icon_url, rp_id + ): + """ + Checks whether the given assertion was produced by the given user's WebAuthn + device. + + Returns the updated signage count on success, raises + webauthn.AuthenticationRejectedException on failure. + """ + user = self.get_user(user_id) + + return webauthn.verify_assertion_response( + assertion, + challenge=challenge, + user=user, + origin=origin, + icon_url=icon_url, + rp_id=rp_id, + ) + + def add_webauthn(self, user_id, **kwargs): + """ + Adds a WebAuthn credential to the given user. + + Returns None if the user already has this credential. + """ + user = self.get_user(user_id) + + webauthn = WebAuthn(user=user, **kwargs) + self.db.add(webauthn) + self.db.flush() # flush the db now so webauthn.id is available + + return webauthn + + def get_webauthn_by_label(self, user_id, label): + """ + Returns a WebAuthn credential for the given user by its label, + or None if no credential for the user has this label. + """ + user = self.get_user(user_id) + + return next( + (credential for credential in user.webauthn if credential.label == label), + None, + ) + + def get_webauthn_by_credential_id(self, user_id, credential_id): + """ + Returns a WebAuthn credential for the given user by its credential ID, + or None of the user doesn't have a credential with this ID. + """ + user = self.get_user(user_id) + + return next( + ( + credential + for credential in user.webauthn + if credential.credential_id == credential_id + ), + None, + ) + @implementer(ITokenService) class TokenService: diff --git a/warehouse/accounts/views.py b/warehouse/accounts/views.py index a85fcc8338b9..81bcddb27805 100644 --- a/warehouse/accounts/views.py +++ b/warehouse/accounts/views.py @@ -30,7 +30,8 @@ RegistrationForm, RequestPasswordResetForm, ResetPasswordForm, - TwoFactorForm, + TOTPAuthenticationForm, + WebAuthnAuthenticationForm, ) from warehouse.accounts.interfaces import ( IPasswordBreachedService, @@ -186,40 +187,35 @@ def login(request, redirect_field_name=REDIRECT_FIELD_NAME, _form_class=LoginFor require_csrf=True, require_methods=False, ) -def two_factor(request, _form_class=TwoFactorForm): +def two_factor_and_totp_validate(request, _form_class=TOTPAuthenticationForm): if request.authenticated_userid is not None: return HTTPSeeOther(request.route_path("manage.projects")) - token_service = request.find_service(ITokenService, name="two_factor") - try: - two_factor_data = token_service.loads(request.query_string) + two_factor_data = _get_two_factor_data(request) except TokenException: request.session.flash("Invalid or expired two factor login.", queue="error") return HTTPSeeOther(request.route_path("accounts.login")) userid = two_factor_data.get("userid") - if not userid: - return HTTPSeeOther(request.route_path("accounts.login")) - redirect_to = two_factor_data.get("redirect_to") user_service = request.find_service(IUserService, context=None) - form = _form_class( - request.POST, - user_id=userid, - user_service=user_service, - check_password_metrics_tags=["method:auth", "auth_method:login_form"], - ) + two_factor_state = {} + if user_service.has_totp(userid): + two_factor_state["totp_form"] = _form_class( + request.POST, + user_id=userid, + user_service=user_service, + check_password_metrics_tags=["method:auth", "auth_method:login_form"], + ) + if user_service.has_webauthn(userid): + two_factor_state["has_webauthn"] = True if request.method == "POST": + form = two_factor_state["totp_form"] if form.validate(): - # If the user-originating redirection url is not safe, then - # redirect to the index instead. - if not redirect_to or not is_safe_url(url=redirect_to, host=request.host): - redirect_to = request.route_path("manage.projects") - _login_user(request, userid) resp = HTTPSeeOther(redirect_to) @@ -234,7 +230,87 @@ def two_factor(request, _form_class=TwoFactorForm): else: form.totp_value.data = "" - return {"form": form} + return two_factor_state + + +@view_config( + uses_session=True, + request_method="GET", + route_name="accounts.webauthn-authenticate.options", + renderer="json", +) +def webauthn_authentication_options(request): + if request.authenticated_userid is not None: + return {"fail": {"errors": ["Already authenticated"]}} + + try: + two_factor_data = _get_two_factor_data(request) + except TokenException: + request.session.flash("Invalid or expired two factor login.", queue="error") + return {"fail": {"errors": ["Invalid two factor token"]}} + + userid = two_factor_data.get("userid") + user_service = request.find_service(IUserService, context=None) + return user_service.get_webauthn_assertion_options( + userid, + challenge=request.session.get_webauthn_challenge(), + icon_url=request.registry.settings.get("warehouse.domain", request.domain), + rp_id=request.domain, + ) + + +@view_config( + require_csrf=True, + require_methods=False, + uses_session=True, + request_method="POST", + request_param=WebAuthnAuthenticationForm.__params__, + route_name="accounts.webauthn-authenticate.validate", + renderer="json", +) +def webauthn_authentication_validate(request): + if request.authenticated_userid is not None: + return {"fail": {"errors": ["Already authenticated"]}} + + try: + two_factor_data = _get_two_factor_data(request) + except TokenException: + request.session.flash("Invalid or expired two factor login.", queue="error") + return {"fail": {"errors": ["Invalid two factor token"]}} + + redirect_to = two_factor_data.get("redirect_to") + userid = two_factor_data.get("userid") + + user_service = request.find_service(IUserService, context=None) + form = WebAuthnAuthenticationForm( + **request.POST, + user_id=userid, + user_service=user_service, + challenge=request.session.get_webauthn_challenge(), + origin=request.host_url, + icon_url=request.registry.settings.get("warehouse.domain", request.domain), + rp_id=request.domain, + ) + + request.session.clear_webauthn_challenge() + + if form.validate(): + credential_id, sign_count = form.validated_credential + webauthn = user_service.get_webauthn_by_credential_id(userid, credential_id) + webauthn.sign_count = sign_count + + _login_user(request, userid) + + request.response.set_cookie( + USER_ID_INSECURE_COOKIE, + hashlib.blake2b(str(userid).encode("ascii"), person=b"warehouse.userid") + .hexdigest() + .lower(), + ) + return {"success": "Successful WebAuthn assertion", "redirect_to": redirect_to} + + errors = [str(error) for error in form.credential.errors] + return {"fail": {"errors": errors}} @view_config( @@ -498,6 +574,22 @@ def _error(message): return HTTPSeeOther(request.route_path("manage.account")) +def _get_two_factor_data(request, _redirect_to="/"): + token_service = request.find_service(ITokenService, name="two_factor") + two_factor_data = token_service.loads(request.query_string) + + if two_factor_data.get("userid") is None: + raise TokenInvalid + + # If the user-originating redirection url is not safe, then + # redirect to the index instead. + redirect_to = two_factor_data.get("redirect_to") + if redirect_to is None or not is_safe_url(url=redirect_to, host=request.host): + two_factor_data["redirect_to"] = _redirect_to + + return two_factor_data + + def _login_user(request, userid): # We have a session factory associated with this request, so in order # to protect against session fixation attacks we're going to make sure diff --git a/warehouse/manage/forms.py b/warehouse/manage/forms.py index a5b56a9fd5a2..a00f26d06e87 100644 --- a/warehouse/manage/forms.py +++ b/warehouse/manage/forms.py @@ -10,9 +10,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json + import wtforms import warehouse.utils.otp as otp +import warehouse.utils.webauthn as webauthn from warehouse import forms from warehouse.accounts.forms import ( @@ -20,6 +23,7 @@ NewPasswordMixin, PasswordMixin, TOTPValueMixin, + WebAuthnCredentialMixin, ) @@ -104,3 +108,78 @@ def validate_totp_value(self, field): totp_value = field.data.encode("utf8") if not otp.verify_totp(self.totp_secret, totp_value): raise wtforms.validators.ValidationError("Invalid TOTP code. Try again?") + + +class DeleteWebAuthnForm(forms.Form): + __params__ = ["confirm_key_name"] + + label = wtforms.StringField( + validators=[ + wtforms.validators.DataRequired(message="Specify a label"), + wtforms.validators.Length( + max=64, message=("Label must be 64 characters or less") + ), + ] + ) + + def __init__(self, *args, user_service, user_id, **kwargs): + super().__init__(*args, **kwargs) + self.user_service = user_service + self.user_id = user_id + + def validate_label(self, field): + label = field.data + + webauthn = self.user_service.get_webauthn_by_label(self.user_id, label) + if webauthn is None: + raise wtforms.validators.ValidationError("No WebAuthn key with given label") + self.webauthn = webauthn + + +class ProvisionWebAuthnForm(WebAuthnCredentialMixin, forms.Form): + __params__ = ["label", "credential"] + + label = wtforms.StringField( + validators=[ + wtforms.validators.DataRequired(message="Specify a label"), + wtforms.validators.Length( + max=64, message=("Label must be 64 characters or less") + ), + ] + ) + + def __init__( + self, *args, user_service, user_id, challenge, rp_id, origin, **kwargs + ): + super().__init__(*args, **kwargs) + self.user_service = user_service + self.user_id = user_id + self.challenge = challenge + self.rp_id = rp_id + self.origin = origin + + def validate_credential(self, field): + try: + credential_dict = json.loads(field.data.encode("utf8")) + except json.JSONDecodeError: + raise wtforms.validators.ValidationError( + "Invalid WebAuthn credential: Bad payload" + ) + + try: + validated_credential = self.user_service.verify_webauthn_credential( + credential_dict, + challenge=self.challenge, + rp_id=self.rp_id, + origin=self.origin, + ) + except webauthn.RegistrationRejectedException as e: + raise wtforms.validators.ValidationError(str(e)) + + self.validated_credential = validated_credential + + def validate_label(self, field): + label = field.data + + if self.user_service.get_webauthn_by_label(self.user_id, label) is not None: + raise wtforms.validators.ValidationError(f"Label '{label}' already in use") diff --git a/warehouse/manage/views.py b/warehouse/manage/views.py index 427989ff032c..3e144bace990 100644 --- a/warehouse/manage/views.py +++ b/warehouse/manage/views.py @@ -43,7 +43,9 @@ ChangeRoleForm, CreateRoleForm, DeleteTOTPForm, + DeleteWebAuthnForm, ProvisionTOTPForm, + ProvisionWebAuthnForm, SaveAccountForm, ) from warehouse.packaging.models import File, JournalEntry, Project, Release, Role @@ -421,6 +423,104 @@ def delete_totp(self): return HTTPSeeOther(self.request.route_path("manage.account")) +@view_defaults( + uses_session=True, + require_csrf=True, + require_methods=False, + permission="manage:user", + http_cache=0, +) +class ProvisionWebAuthnViews: + def __init__(self, request): + self.request = request + self.user_service = request.find_service(IUserService, context=None) + + @view_config( + request_method="GET", + route_name="manage.account.webauthn-provision", + renderer="manage/account/webauthn-provision.html", + ) + def webauthn_provision(self): + return {} + + @view_config( + request_method="GET", + route_name="manage.account.webauthn-provision.options", + renderer="json", + ) + def webauthn_provision_options(self): + return self.user_service.get_webauthn_credential_options( + self.request.user.id, + challenge=self.request.session.get_webauthn_challenge(), + rp_name=self.request.registry.settings["site.name"], + rp_id=self.request.domain, + icon_url=self.request.registry.settings.get( + "warehouse.domain", self.request.domain + ), + ) + + @view_config( + request_method="POST", + request_param=ProvisionWebAuthnForm.__params__, + route_name="manage.account.webauthn-provision.validate", + renderer="json", + ) + def validate_webauthn_provision(self): + form = ProvisionWebAuthnForm( + **self.request.POST, + user_service=self.user_service, + user_id=self.request.user.id, + challenge=self.request.session.get_webauthn_challenge(), + rp_id=self.request.domain, + origin=self.request.host_url, + ) + + self.request.session.clear_webauthn_challenge() + + if form.validate(): + self.user_service.add_webauthn( + self.request.user.id, + label=form.label.data, + credential_id=form.validated_credential.credential_id.decode(), + public_key=form.validated_credential.public_key.decode(), + sign_count=form.validated_credential.sign_count, + ) + self.request.session.flash( + "WebAuthn successfully provisioned.", queue="success" + ) + return {"success": "WebAuthn successfully provisioned"} + + errors = [ + str(error) for error_list in form.errors.values() for error in error_list + ] + return {"fail": {"errors": errors}} + + @view_config( + request_method="POST", + request_param=DeleteWebAuthnForm.__params__, + route_name="manage.account.webauthn-provision.delete", + ) + def delete_webauthn(self): + if len(self.request.user.webauthn) == 0: + self.request.session.flash("No WebAuthhn device to delete.", queue="error") + return HTTPSeeOther(self.request.route_path("manage.account")) + + form = DeleteWebAuthnForm( + **self.request.POST, + username=self.request.user.username, + user_service=self.user_service, + user_id=self.request.user.id, + ) + + if form.validate(): + self.request.user.webauthn.remove(form.webauthn) + self.request.session.flash("WebAuthn device deleted.", queue="success") + else: + self.request.session.flash("Invalid credentials.", queue="error") + + return HTTPSeeOther(self.request.route_path("manage.account")) + + @view_config( route_name="manage.projects", renderer="manage/projects.html", diff --git a/warehouse/migrations/versions/af7dca2bb2fe_webauthn_model.py b/warehouse/migrations/versions/af7dca2bb2fe_webauthn_model.py new file mode 100644 index 000000000000..7627fcbe9d95 --- /dev/null +++ b/warehouse/migrations/versions/af7dca2bb2fe_webauthn_model.py @@ -0,0 +1,52 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +webauthn model + +Revision ID: af7dca2bb2fe +Revises: e1b493d3b171 +Create Date: 2019-05-06 15:58:35.922060 +""" + +import sqlalchemy as sa + +from alembic import op +from sqlalchemy.dialects import postgresql + +revision = "af7dca2bb2fe" +down_revision = "e1b493d3b171" + + +def upgrade(): + op.create_table( + "user_security_keys", + sa.Column( + "id", + postgresql.UUID(as_uuid=True), + server_default=sa.text("gen_random_uuid()"), + nullable=False, + ), + sa.Column("user_id", postgresql.UUID(as_uuid=True), nullable=False), + sa.Column("credential_id", sa.String, nullable=False), + sa.Column("public_key", sa.String, nullable=True), + sa.Column("sign_count", sa.Integer, nullable=True), + sa.ForeignKeyConstraint( + ["user_id"], ["users.id"], initially="DEFERRED", deferrable=True + ), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint("credential_id"), + sa.UniqueConstraint("public_key"), + ) + + +def downgrade(): + op.drop_table("user_security_keys") diff --git a/warehouse/migrations/versions/cdb2915fda5c_add_webauthn_labels.py b/warehouse/migrations/versions/cdb2915fda5c_add_webauthn_labels.py new file mode 100644 index 000000000000..ed77c6dbb506 --- /dev/null +++ b/warehouse/migrations/versions/cdb2915fda5c_add_webauthn_labels.py @@ -0,0 +1,37 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +add webauthn labels + +Revision ID: cdb2915fda5c +Revises: af7dca2bb2fe +Create Date: 2019-06-08 16:31:41.681380 +""" + +import sqlalchemy as sa + +from alembic import op + +revision = "cdb2915fda5c" +down_revision = "af7dca2bb2fe" + + +def upgrade(): + op.add_column("user_security_keys", sa.Column("label", sa.String(), nullable=False)) + op.create_index( + "user_security_keys_label_key", "user_security_keys", ["user_id"], unique=False + ) + + +def downgrade(): + op.drop_index("user_security_keys_label_key", table_name="user_security_keys") + op.drop_column("user_security_keys", "label") diff --git a/warehouse/packaging/models.py b/warehouse/packaging/models.py index c20d9f43737e..64c893afaa4d 100644 --- a/warehouse/packaging/models.py +++ b/warehouse/packaging/models.py @@ -174,6 +174,7 @@ def __acl__(self): query = session.query(Role).filter(Role.project == self) query = query.options(orm.lazyload("project")) query = query.options(orm.joinedload("user").lazyload("emails")) + query = query.join(User).order_by(User.id.asc()) for role in sorted( query.all(), key=lambda x: ["Owner", "Maintainer"].index(x.role_name) ): diff --git a/warehouse/routes.py b/warehouse/routes.py index 676f7c612100..9f02f2403dd7 100644 --- a/warehouse/routes.py +++ b/warehouse/routes.py @@ -98,6 +98,16 @@ def includeme(config): ) config.add_route("accounts.login", "/account/login/", domain=warehouse) config.add_route("accounts.two-factor", "/account/two-factor/", domain=warehouse) + config.add_route( + "accounts.webauthn-authenticate.options", + "/accounts/webauthn-authenticate/options", + domain=warehouse, + ) + config.add_route( + "accounts.webauthn-authenticate.validate", + "/accounts/webauthn-authenticate/validate", + domain=warehouse, + ) config.add_route("accounts.logout", "/account/logout/", domain=warehouse) config.add_route("accounts.register", "/account/register/", domain=warehouse) config.add_route( @@ -124,6 +134,26 @@ def includeme(config): "/manage/account/totp-provision/image", domain=warehouse, ) + config.add_route( + "manage.account.webauthn-provision", + "/manage/account/webauthn-provision", + domain=warehouse, + ) + config.add_route( + "manage.account.webauthn-provision.options", + "/manage/account/webauthn-provision/options", + domain=warehouse, + ) + config.add_route( + "manage.account.webauthn-provision.validate", + "/manage/account/webauthn-provision/validate", + domain=warehouse, + ) + config.add_route( + "manage.account.webauthn-provision.delete", + "/manage/account/webauthn-provision/delete", + domain=warehouse, + ) config.add_route("manage.projects", "/manage/projects/", domain=warehouse) config.add_route( "manage.project.settings", diff --git a/warehouse/sessions.py b/warehouse/sessions.py index 070a16b95c97..4ca0c721a69c 100644 --- a/warehouse/sessions.py +++ b/warehouse/sessions.py @@ -22,6 +22,7 @@ from zope.interface import implementer import warehouse.utils.otp as otp +import warehouse.utils.webauthn as webauthn from warehouse.cache.http import add_vary from warehouse.utils import crypto @@ -84,6 +85,7 @@ class Session(dict): _csrf_token_key = "_csrf_token" _flash_key = "_flash_messages" _totp_secret_key = "_totp_secret" + _webauthn_challenge_key = "_webauthn_challenge" # A number of our methods need to be decorated so that they also call # self.changed() @@ -180,6 +182,16 @@ def get_totp_secret(self): def clear_totp_secret(self): self[self._totp_secret_key] = None + def get_webauthn_challenge(self): + webauthn_challenge = self.get(self._webauthn_challenge_key) + if webauthn_challenge is None: + self[self._webauthn_challenge_key] = webauthn.generate_webauthn_challenge() + webauthn_challenge = self[self._webauthn_challenge_key] + return webauthn_challenge + + def clear_webauthn_challenge(self): + self[self._webauthn_challenge_key] = None + @implementer(ISessionFactory) class SessionFactory: diff --git a/warehouse/static/js/warehouse/index.js b/warehouse/static/js/warehouse/index.js index 10ce8b00b027..f97d3008ee89 100644 --- a/warehouse/static/js/warehouse/index.js +++ b/warehouse/static/js/warehouse/index.js @@ -39,6 +39,7 @@ import searchFilterToggle from "warehouse/utils/search-filter-toggle"; import YouTubeIframeLoader from "youtube-iframe"; import RepositoryInfo from "warehouse/utils/repository-info"; import BindModalKeys from "warehouse/utils/bind-modal-keys"; +import {AuthenticateWebAuthn, ProvisionWebAuthn} from "warehouse/utils/webauthn"; // Do this before anything else, to potentially capture errors down the line docReady(() => { @@ -216,6 +217,12 @@ docReady(bindDropdowns); // Get modal keypress event listeners ready docReady(BindModalKeys); +// Get WebAuthn provisioning ready +docReady(ProvisionWebAuthn); + +// Get WebAuthn authentication ready +docReady(AuthenticateWebAuthn); + // Bind again when client-side includes have been loaded (for the logged-in // user dropdown) document.addEventListener("CSILoaded", bindDropdowns); diff --git a/warehouse/static/js/warehouse/utils/webauthn.js b/warehouse/static/js/warehouse/utils/webauthn.js new file mode 100644 index 000000000000..595773647bae --- /dev/null +++ b/warehouse/static/js/warehouse/utils/webauthn.js @@ -0,0 +1,217 @@ +/* Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +const populateWebAuthnErrorList = (errors) => { + const errorList = document.getElementById("webauthn-errors"); + if (errorList === null) { + return; + } + + errorList.innerHTML = ""; + + errors.forEach((error) => { + const errorItem = document.createElement("li"); + errorItem.appendChild(document.createTextNode(error)); + errorList.appendChild(errorItem); + }); +}; + +const doWebAuthn = (buttonId, func) => { + const webAuthnButton = document.getElementById(buttonId); + if (webAuthnButton === null) { + return null; + } + + const csrfToken = webAuthnButton.getAttribute("csrf-token"); + if (csrfToken === null) { + return; + } + + if (!window.PublicKeyCredential) { + populateWebAuthnErrorList(["Your browser doesn't support WebAuthn."]); + return; + } + + webAuthnButton.disabled = false; + webAuthnButton.addEventListener("click", async () => { func(csrfToken); }); +}; + +const hexEncode = (buf) => { + return Array.from(buf).map((x) => { + return ("0" + x.toString(16)).substr(-2); + }).join(""); +}; + +const webAuthnBtoA = (encoded) => { + return btoa(encoded).replace(/\+/g, "-").replace(/\//g, "_").replace(/=/g, ""); +}; + +const webAuthnBase64Normalize = (encoded) => { + return encoded.replace(/_/g, "/").replace(/-/g, "+"); +}; + +const transformAssertionOptions = (assertionOptions) => { + let {challenge, allowCredentials} = assertionOptions; + + challenge = Uint8Array.from(challenge, c => c.charCodeAt(0)); + allowCredentials = allowCredentials.map(credentialDescriptor => { + let {id} = credentialDescriptor; + id = webAuthnBase64Normalize(id); + id = Uint8Array.from(atob(id), c => c.charCodeAt(0)); + return Object.assign({}, credentialDescriptor, {id}); + }); + + const transformedOptions = Object.assign( + {}, + assertionOptions, + {challenge, allowCredentials}); + + return transformedOptions; +}; + +const transformAssertion = (assertion) => { + const authData = new Uint8Array(assertion.response.authenticatorData); + const clientDataJSON = new Uint8Array(assertion.response.clientDataJSON); + const rawId = new Uint8Array(assertion.rawId); + const sig = new Uint8Array(assertion.response.signature); + const assertionClientExtensions = assertion.getClientExtensionResults(); + + return { + id: assertion.id, + rawId: webAuthnBtoA(rawId), + type: assertion.type, + authData: webAuthnBtoA(String.fromCharCode(...authData)), + clientData: webAuthnBtoA(String.fromCharCode(...clientDataJSON)), + signature: hexEncode(sig), + assertionClientExtensions: JSON.stringify(assertionClientExtensions), + }; +}; + +const transformCredentialOptions = (credentialOptions) => { + let {challenge, user} = credentialOptions; + user.id = Uint8Array.from(credentialOptions.user.id, c => c.charCodeAt(0)); + challenge = Uint8Array.from(credentialOptions.challenge, c => c.charCodeAt(0)); + + const transformedOptions = Object.assign({}, credentialOptions, {challenge, user}); + + return transformedOptions; +}; + +const transformCredential = (credential) => { + const attObj = new Uint8Array(credential.response.attestationObject); + const clientDataJSON = new Uint8Array(credential.response.clientDataJSON); + const rawId = new Uint8Array(credential.rawId); + const registrationClientExtensions = credential.getClientExtensionResults(); + + return { + id: credential.id, + rawId: webAuthnBtoA(rawId), + type: credential.type, + attObj: webAuthnBtoA(String.fromCharCode(...attObj)), + clientData: webAuthnBtoA(String.fromCharCode(...clientDataJSON)), + registrationClientExtensions: JSON.stringify(registrationClientExtensions), + }; +}; + +const postCredential = async (label, credential, token) => { + const formData = new FormData(); + formData.set("label", label); + formData.set("credential", JSON.stringify(credential)); + formData.set("csrf_token", token); + + const resp = await fetch( + "/manage/account/webauthn-provision/validate", { + method: "POST", + cache: "no-cache", + body: formData, + } + ); + + return await resp.json(); +}; + +const postAssertion = async (assertion, token) => { + const formData = new FormData(); + formData.set("credential", JSON.stringify(assertion)); + formData.set("csrf_token", token); + + const resp = await fetch( + "/accounts/webauthn-authenticate/validate" + window.location.search, { + method: "POST", + cache: "no-cache", + body: formData, + } + ); + + return await resp.json(); +}; + +export const ProvisionWebAuthn = () => { + doWebAuthn("webauthn-provision-begin", async (csrfToken) => { + const label = document.getElementById("webauthn-provision-label").value; + + // TODO(ww): Should probably find a way to use the route string here, + // not the actual endpoint. + const resp = await fetch( + "/manage/account/webauthn-provision/options", { + cache: "no-cache", + } + ); + + const credentialOptions = await resp.json(); + const transformedOptions = transformCredentialOptions(credentialOptions); + const credential = await navigator.credentials.create({ + publicKey: transformedOptions, + }); + const transformedCredential = transformCredential(credential); + + const status = await postCredential(label, transformedCredential, csrfToken); + if (status.fail) { + populateWebAuthnErrorList(status.fail.errors); + return; + } + + window.location.replace("/manage/account"); + }); +}; + +export const AuthenticateWebAuthn = () => { + doWebAuthn("webauthn-auth-begin", async (csrfToken) => { + const resp = await fetch( + "/accounts/webauthn-authenticate/options" + window.location.search, { + cache: "no-cache", + } + ); + + const assertionOptions = await resp.json(); + if (assertionOptions.fail) { + window.location.replace("/accounts/login"); + return; + } + + const transformedOptions = transformAssertionOptions(assertionOptions); + const assertion = await navigator.credentials.get({ + publicKey: transformedOptions, + }); + const transformedAssertion = transformAssertion(assertion); + + const status = await postAssertion(transformedAssertion, csrfToken); + if (status.fail) { + populateWebAuthnErrorList(status.fail.errors); + return; + } + + window.location.replace(status.redirect_to); + }); +}; diff --git a/warehouse/static/sass/blocks/_twofa-login.scss b/warehouse/static/sass/blocks/_twofa-login.scss new file mode 100644 index 000000000000..ce8fcbadecb8 --- /dev/null +++ b/warehouse/static/sass/blocks/_twofa-login.scss @@ -0,0 +1,57 @@ +/*! + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* +