diff --git a/requirements/main.in b/requirements/main.in index 1f1373b8a432..4f4510e55e3d 100644 --- a/requirements/main.in +++ b/requirements/main.in @@ -51,6 +51,7 @@ stdlib-list structlog transaction typeguard +webauthn whitenoise WTForms>=2.0.0 zope.sqlalchemy diff --git a/requirements/main.txt b/requirements/main.txt index ea8e016a6c84..e26050bc41f8 100644 --- a/requirements/main.txt +++ b/requirements/main.txt @@ -68,6 +68,9 @@ botocore==1.12.161 \ cachetools==3.1.1 \ --hash=sha256:428266a1c0d36dc5aca63a2d7c5942e88c2c898d72139fca0e97fdd2380517ae \ --hash=sha256:8ea2d3ce97850f31e4a08b0e2b5e6c34997d7216a9d2c98e0f3978630d4da69a +cbor2==4.1.2 \ + --hash=sha256:17b615da69964f87e48c5adb34ba63db3068f65b9cd14a7b099503d9f8a0e9ae \ + --hash=sha256:6391fd3d2a4e976ecf892638a0a2a88d85e6764124bf9f128a945bfefefe77dc celery-redbeat==0.13.0 \ --hash=sha256:8ba060f5fffa96fc6dbb0e37a10506cada03333f34ba502b134cbbdb08891266 celery[sqs]==4.3.0 \ @@ -185,6 +188,8 @@ elasticsearch==6.4.0 \ first==2.0.2 \ --hash=sha256:8d8e46e115ea8ac652c76123c0865e3ff18372aef6f03c22809ceefcea9dec86 \ --hash=sha256:ff285b08c55f8c97ce4ea7012743af2495c9f1291785f163722bd36f6af6d3bf +future==0.17.1 \ + --hash=sha256:67045236dcfd6816dc439556d009594abf643e5eb48992e36beac09c2ca659b8 google-api-core==1.11.1 \ --hash=sha256:18d58e87ce51046ad76961ba320903657182622e3e368e502381b11f63015c66 \ --hash=sha256:a1312fb2555516cfb55908e5d28762838315c21f7ce43d243ab3d11f215e2238 @@ -434,6 +439,9 @@ pycurl==7.43.0.2 \ pygments==2.4.2 \ --hash=sha256:71e430bc85c88a430f000ac1d9b331d2407f681d6f6aec95e8bcfbc3df5b0127 \ --hash=sha256:881c4c157e45f30af185c1ffe8d549d48ac9127433f2c380c24b84572ad66297 +pyOpenSSL==19.0.0 \ + --hash=sha256:aeca66338f6de19d1aa46ed634c3b9ae519a64b458f8468aec688e7e3c20f200 \ + --hash=sha256:c727930ad54b10fc157015014b666f2d8b41f70c0d03e83ab67624fd3dd5d1e6 pyparsing==2.4.0 \ --hash=sha256:1873c03321fc118f4e9746baf201ff990ceb915f433f23b395f5580d1840cb2a \ --hash=sha256:9b6323ef4ab914af344ba97510e966d64ba91055d6b9afa6b30799340e89cc03 @@ -532,6 +540,8 @@ venusian==1.2.0 \ vine==1.3.0 \ --hash=sha256:133ee6d7a9016f177ddeaf191c1f58421a1dcc6ee9a42c58b34bed40e1d2cd87 \ --hash=sha256:ea4947cc56d1fd6f2095c8d543ee25dad966f78692528e68b4fada11ba3f98af +webauthn==0.4.5 \ + --hash=sha256:db0bfc8b74896e209bfb290815c0a563f785ec5be321e1db17e9ac627ec3562f webencodings==0.5.1 \ --hash=sha256:a0af1213f3c2226497a97e2b3aa01a7e4bee4f403f95be16fc9acd2947514a78 \ --hash=sha256:b36a1c245f2d304965eb4e0a82848379241dc04b865afcc4aab16748587e1923 diff --git a/tests/unit/accounts/test_forms.py b/tests/unit/accounts/test_forms.py index fa1c8f4d5661..0a804088f14c 100644 --- a/tests/unit/accounts/test_forms.py +++ b/tests/unit/accounts/test_forms.py @@ -10,6 +10,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json + import pretend import pytest import wtforms @@ -17,6 +19,7 @@ from warehouse.accounts import forms from warehouse.accounts.interfaces import TooManyFailedLogins from warehouse.accounts.models import DisableReason +from warehouse.utils.webauthn import AuthenticationRejectedException class TestLoginForm: @@ -567,22 +570,22 @@ def test_password_breached(self): ) -class TestTwoFactorForm: +class TestTOTPAuthenticationForm: def test_creation(self): user_id = pretend.stub() user_service = pretend.stub() - form = forms.TwoFactorForm(user_id=user_id, user_service=user_service) + form = forms.TOTPAuthenticationForm(user_id=user_id, user_service=user_service) assert form.user_service is user_service def test_totp_secret_exists(self): - form = forms.TwoFactorForm( + form = forms.TOTPAuthenticationForm( data={"totp_value": ""}, user_id=pretend.stub(), user_service=pretend.stub() ) assert not form.validate() assert form.totp_value.errors.pop() == "This field is required." - form = forms.TwoFactorForm( + form = forms.TOTPAuthenticationForm( data={"totp_value": "not_a_real_value"}, user_id=pretend.stub(), user_service=pretend.stub(check_totp_value=lambda *a: True), @@ -590,7 +593,7 @@ def test_totp_secret_exists(self): assert not form.validate() assert form.totp_value.errors.pop() == "TOTP code must be 6 digits." - form = forms.TwoFactorForm( + form = forms.TOTPAuthenticationForm( data={"totp_value": "123456"}, user_id=pretend.stub(), user_service=pretend.stub(check_totp_value=lambda *a: False), @@ -598,9 +601,77 @@ def test_totp_secret_exists(self): assert not form.validate() assert form.totp_value.errors.pop() == "Invalid TOTP code." - form = forms.TwoFactorForm( + form = forms.TOTPAuthenticationForm( data={"totp_value": "123456"}, user_id=pretend.stub(), user_service=pretend.stub(check_totp_value=lambda *a: True), ) assert form.validate() + + +class TestWebAuthnAuthenticationForm: + def test_creation(self): + user_id = pretend.stub() + user_service = pretend.stub() + challenge = pretend.stub() + origin = pretend.stub() + icon_url = pretend.stub() + rp_id = pretend.stub() + + form = forms.WebAuthnAuthenticationForm( + user_id=user_id, + user_service=user_service, + challenge=challenge, + origin=origin, + icon_url=icon_url, + rp_id=rp_id, + ) + + assert form.challenge is challenge + + def test_credential_bad_payload(self): + form = forms.WebAuthnAuthenticationForm( + credential="not valid json", + user_id=pretend.stub(), + user_service=pretend.stub(), + challenge=pretend.stub(), + origin=pretend.stub(), + icon_url=pretend.stub(), + rp_id=pretend.stub(), + ) + assert not form.validate() + assert form.credential.errors.pop() == "Invalid WebAuthn assertion: Bad payload" + + def test_credential_invalid(self): + form = forms.WebAuthnAuthenticationForm( + credential=json.dumps({}), + user_id=pretend.stub(), + user_service=pretend.stub( + verify_webauthn_assertion=pretend.raiser( + AuthenticationRejectedException("foo") + ) + ), + challenge=pretend.stub(), + origin=pretend.stub(), + icon_url=pretend.stub(), + rp_id=pretend.stub(), + ) + assert not form.validate() + assert form.credential.errors.pop() == "foo" + + def test_credential_valid(self): + form = forms.WebAuthnAuthenticationForm( + credential=json.dumps({}), + user_id=pretend.stub(), + user_service=pretend.stub( + verify_webauthn_assertion=pretend.call_recorder( + lambda *a, **kw: ("foo", 123456) + ) + ), + challenge=pretend.stub(), + origin=pretend.stub(), + icon_url=pretend.stub(), + rp_id=pretend.stub(), + ) + assert form.validate() + assert form.validated_credential == ("foo", 123456) diff --git a/tests/unit/accounts/test_services.py b/tests/unit/accounts/test_services.py index b6a940af832b..10155937d0f6 100644 --- a/tests/unit/accounts/test_services.py +++ b/tests/unit/accounts/test_services.py @@ -22,6 +22,7 @@ from zope.interface.verify import verifyClass import warehouse.utils.otp as otp +import warehouse.utils.webauthn as webauthn from warehouse.accounts import services from warehouse.accounts.interfaces import ( @@ -351,6 +352,24 @@ def test_has_two_factor(self, user_service): user_service.update_user(user.id, totp_secret=b"foobar") assert user_service.has_two_factor(user.id) + def test_has_totp(self, user_service): + user = UserFactory.create() + assert not user_service.has_totp(user.id) + user_service.update_user(user.id, totp_secret=b"foobar") + assert user_service.has_totp(user.id) + + def test_has_webauthn(self, user_service): + user = UserFactory.create() + assert not user_service.has_webauthn(user.id) + user_service.add_webauthn( + user.id, + label="test_label", + credential_id="foo", + public_key="bar", + sign_count=1, + ) + assert user_service.has_webauthn(user.id) + @pytest.mark.parametrize("valid", [True, False]) def test_check_totp_value(self, user_service, monkeypatch, valid): verify_totp = pretend.call_recorder(lambda *a: valid) @@ -406,6 +425,184 @@ def test_check_totp_value_user_rate_limited(self, user_service, metrics): ), ] + @pytest.mark.parametrize( + ("challenge", "rp_name", "rp_id", "icon_url"), + ( + ["fake_challenge", "fake_rp_name", "fake_rp_id", "fake_icon_url"], + [None, None, None, None], + ), + ) + def test_get_webauthn_credential_options( + self, user_service, challenge, rp_name, rp_id, icon_url + ): + user = UserFactory.create() + options = user_service.get_webauthn_credential_options( + user.id, + challenge=challenge, + rp_name=rp_name, + rp_id=rp_id, + icon_url=icon_url, + ) + + assert options["user"]["id"] == str(user.id) + assert options["user"]["name"] == user.username + assert options["user"]["displayName"] == user.name + assert options["challenge"] == challenge + assert options["rp"]["name"] == rp_name + assert options["rp"]["id"] == rp_id + + if icon_url: + assert options["user"]["icon"] == icon_url + else: + assert "icon" not in options["user"] + + def test_get_webauthn_assertion_options(self, user_service): + user = UserFactory.create() + user_service.add_webauthn( + user.id, + label="test_label", + credential_id="foo", + public_key="bar", + sign_count=1, + ) + + options = user_service.get_webauthn_assertion_options( + user.id, + challenge="fake_challenge", + icon_url="fake_icon_url", + rp_id="fake_rp_id", + ) + + assert options["challenge"] == "fake_challenge" + assert options["rpId"] == "fake_rp_id" + assert options["allowCredentials"][0]["id"] == user.webauthn[0].credential_id + + def test_verify_webauthn_credential(self, user_service, monkeypatch): + user = UserFactory.create() + user_service.add_webauthn( + user.id, + label="test_label", + credential_id="foo", + public_key="bar", + sign_count=1, + ) + + fake_validated_credential = pretend.stub(credential_id=b"bar") + verify_registration_response = pretend.call_recorder( + lambda *a, **kw: fake_validated_credential + ) + monkeypatch.setattr( + webauthn, "verify_registration_response", verify_registration_response + ) + + validated_credential = user_service.verify_webauthn_credential( + pretend.stub(), + challenge=pretend.stub(), + rp_id=pretend.stub(), + origin=pretend.stub(), + ) + + assert validated_credential is fake_validated_credential + + def test_verify_webauthn_credential_already_in_use(self, user_service, monkeypatch): + user = UserFactory.create() + user_service.add_webauthn( + user.id, + label="test_label", + credential_id="foo", + public_key="bar", + sign_count=1, + ) + + fake_validated_credential = pretend.stub(credential_id=b"foo") + verify_registration_response = pretend.call_recorder( + lambda *a, **kw: fake_validated_credential + ) + monkeypatch.setattr( + webauthn, "verify_registration_response", verify_registration_response + ) + + with pytest.raises(webauthn.RegistrationRejectedException): + user_service.verify_webauthn_credential( + pretend.stub(), + challenge=pretend.stub(), + rp_id=pretend.stub(), + origin=pretend.stub(), + ) + + def test_verify_webauthn_assertion(self, user_service, monkeypatch): + user = UserFactory.create() + user_service.add_webauthn( + user.id, + label="test_label", + credential_id="foo", + public_key="bar", + sign_count=1, + ) + + verify_assertion_response = pretend.call_recorder(lambda *a, **kw: 2) + monkeypatch.setattr( + webauthn, "verify_assertion_response", verify_assertion_response + ) + + updated_sign_count = user_service.verify_webauthn_assertion( + user.id, + pretend.stub(), + challenge=pretend.stub(), + origin=pretend.stub(), + icon_url=pretend.stub(), + rp_id=pretend.stub(), + ) + assert updated_sign_count == 2 + + def test_get_webauthn_by_label(self, user_service): + user = UserFactory.create() + user_service.add_webauthn( + user.id, + label="test_label", + credential_id="foo", + public_key="bar", + sign_count=1, + ) + + webauthn = user_service.get_webauthn_by_label(user.id, "test_label") + assert webauthn is not None + assert webauthn.label == "test_label" + + webauthn = user_service.get_webauthn_by_label(user.id, "not_a_real_label") + assert webauthn is None + + other_user = UserFactory.create() + webauthn = user_service.get_webauthn_by_label(other_user.id, "test_label") + assert webauthn is None + + def test_get_webauthn_by_credential_id(self, user_service): + user = UserFactory.create() + user_service.add_webauthn( + user.id, + label="foo", + credential_id="test_credential_id", + public_key="bar", + sign_count=1, + ) + + webauthn = user_service.get_webauthn_by_credential_id( + user.id, "test_credential_id" + ) + assert webauthn is not None + assert webauthn.credential_id == "test_credential_id" + + webauthn = user_service.get_webauthn_by_credential_id( + user.id, "not_a_real_label" + ) + assert webauthn is None + + other_user = UserFactory.create() + webauthn = user_service.get_webauthn_by_credential_id( + other_user.id, "test_credential_id" + ) + assert webauthn is None + class TestTokenService: def test_verify_service(self): diff --git a/tests/unit/accounts/test_views.py b/tests/unit/accounts/test_views.py index 853cc892e336..482b92bbc13b 100644 --- a/tests/unit/accounts/test_views.py +++ b/tests/unit/accounts/test_views.py @@ -316,7 +316,7 @@ def test_two_factor_auth(self, pyramid_request, redirect_url, token_service): class TestTwoFactor: @pytest.mark.parametrize("redirect_url", [None, "/foo/bar/", "/wat/"]) - def test_get_returns_form(self, pyramid_request, redirect_url): + def test_get_returns_totp_form(self, pyramid_request, redirect_url): query_params = {"userid": 1} if redirect_url: query_params["redirect_to"] = redirect_url @@ -328,6 +328,8 @@ def test_get_returns_form(self, pyramid_request, redirect_url): user_service = pretend.stub( find_userid=pretend.call_recorder(lambda username: 1), update_user=lambda *a, **k: None, + has_totp=lambda uid: True, + has_webauthn=lambda uid: False, ) pyramid_request.find_service = lambda interface, **kwargs: { @@ -340,10 +342,12 @@ def test_get_returns_form(self, pyramid_request, redirect_url): form_obj = pretend.stub() form_class = pretend.call_recorder(lambda d, user_service, **kw: form_obj) - result = views.two_factor(pyramid_request, _form_class=form_class) + result = views.two_factor_and_totp_validate( + pyramid_request, _form_class=form_class + ) assert token_service.loads.calls == [pretend.call(pyramid_request.query_string)] - assert result == {"form": form_obj} + assert result == {"totp_form": form_obj} assert form_class.calls == [ pretend.call( pyramid_request.POST, @@ -353,8 +357,38 @@ def test_get_returns_form(self, pyramid_request, redirect_url): ) ] + @pytest.mark.parametrize("redirect_url", [None, "/foo/bar/", "/wat/"]) + def test_get_returns_webauthn(self, pyramid_request, redirect_url): + query_params = {"userid": 1} + if redirect_url: + query_params["redirect_to"] = redirect_url + + token_service = pretend.stub( + loads=pretend.call_recorder(lambda s: query_params) + ) + + user_service = pretend.stub( + find_userid=pretend.call_recorder(lambda username: 1), + update_user=lambda *a, **k: None, + has_totp=lambda uid: False, + has_webauthn=lambda uid: True, + ) + + pyramid_request.find_service = lambda interface, **kwargs: { + ITokenService: token_service, + IUserService: user_service, + }[interface] + + pyramid_request.query_string = pretend.stub() + result = views.two_factor_and_totp_validate( + pyramid_request, _form_class=pretend.stub() + ) + + assert token_service.loads.calls == [pretend.call(pyramid_request.query_string)] + assert result == {"has_webauthn": True} + @pytest.mark.parametrize("redirect_url", ["test_redirect_url", None]) - def test_two_factor_auth(self, monkeypatch, pyramid_request, redirect_url): + def test_totp_auth(self, monkeypatch, pyramid_request, redirect_url): remember = pretend.call_recorder(lambda request, user_id: [("foo", "bar")]) monkeypatch.setattr(views, "remember", remember) @@ -369,6 +403,8 @@ def test_two_factor_auth(self, monkeypatch, pyramid_request, redirect_url): user_service = pretend.stub( find_userid=pretend.call_recorder(lambda username: 1), update_user=lambda *a, **k: None, + has_totp=lambda userid: True, + has_webauthn=lambda userid: False, check_totp_value=lambda userid, totp_value: True, ) @@ -402,7 +438,9 @@ def test_two_factor_auth(self, monkeypatch, pyramid_request, redirect_url): pyramid_request.params = pretend.stub( get=pretend.call_recorder(lambda k: query_params.get(k)) ) - result = views.two_factor(pyramid_request, _form_class=form_class) + result = views.two_factor_and_totp_validate( + pyramid_request, _form_class=form_class + ) token_expected_data = {"userid": str(1)} if redirect_url: @@ -415,7 +453,7 @@ def test_two_factor_auth(self, monkeypatch, pyramid_request, redirect_url): assert pyramid_request.session.new_csrf_token.calls == [pretend.call()] @pytest.mark.parametrize("redirect_url", ["test_redirect_url", None]) - def test_two_factor_auth_invalid(self, pyramid_request, redirect_url): + def test_totp_auth_invalid(self, pyramid_request, redirect_url): query_params = {"userid": str(1)} if redirect_url: query_params["redirect_to"] = redirect_url @@ -427,6 +465,8 @@ def test_two_factor_auth_invalid(self, pyramid_request, redirect_url): user_service = pretend.stub( find_userid=pretend.call_recorder(lambda username: 1), update_user=lambda *a, **k: None, + has_totp=lambda userid: True, + has_webauthn=lambda userid: False, check_totp_value=lambda userid, totp_value: False, ) @@ -448,7 +488,9 @@ def test_two_factor_auth_invalid(self, pyramid_request, redirect_url): pyramid_request.params = pretend.stub( get=pretend.call_recorder(lambda k: query_params.get(k)) ) - result = views.two_factor(pyramid_request, _form_class=form_class) + result = views.two_factor_and_totp_validate( + pyramid_request, _form_class=form_class + ) token_expected_data = {"userid": str(1)} if redirect_url: @@ -456,43 +498,28 @@ def test_two_factor_auth_invalid(self, pyramid_request, redirect_url): assert isinstance(result, HTTPSeeOther) - def test_two_factor_auth_already_authed(self): + def test_totp_auth_already_authed(self): request = pretend.stub( authenticated_userid="not_none", route_path=pretend.call_recorder(lambda p: "redirect_to"), ) - result = views.two_factor(request) + result = views.two_factor_and_totp_validate(request) assert request.route_path.calls == [pretend.call("manage.projects")] assert isinstance(result, HTTPSeeOther) assert result.headers["Location"] == "redirect_to" - def test_two_factor_missing_userid(self): - token_service = pretend.stub(loads=pretend.call_recorder(lambda s: {})) - - request = pretend.stub( - session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)), - authenticated_userid=None, - route_path=pretend.call_recorder(lambda p: "redirect_to"), - find_service=lambda interface, **kwargs: {ITokenService: token_service}[ - interface - ], - query_string=pretend.stub(), - ) - result = views.two_factor(request) - - assert token_service.loads.calls == [pretend.call(request.query_string)] - assert request.route_path.calls == [pretend.call("accounts.login")] - assert request.session.flash.calls == [] - - assert isinstance(result, HTTPSeeOther) - assert result.headers["Location"] == "redirect_to" - - def test_two_factor_auth_form_invalid(self): + def test_totp_form_invalid(self): token_data = {"userid": 1} token_service = pretend.stub(loads=pretend.call_recorder(lambda s: token_data)) + user_service = pretend.stub( + has_totp=lambda userid: True, + has_webauthn=lambda userid: False, + check_totp_value=lambda userid, totp_value: False, + ) + request = pretend.stub( POST={}, method="POST", @@ -501,7 +528,7 @@ def test_two_factor_auth_form_invalid(self): route_path=pretend.call_recorder(lambda p: "redirect_to"), find_service=lambda interface, **kwargs: { ITokenService: token_service, - IUserService: pretend.stub(), + IUserService: user_service, }[interface], query_string=pretend.stub(), ) @@ -512,12 +539,35 @@ def test_two_factor_auth_form_invalid(self): ) form_class = pretend.call_recorder(lambda *a, **kw: form_obj) - result = views.two_factor(request, _form_class=form_class) + result = views.two_factor_and_totp_validate(request, _form_class=form_class) + + assert token_service.loads.calls == [pretend.call(request.query_string)] + assert result == {"totp_form": form_obj} + + def test_two_factor_token_missing_userid(self): + token_service = pretend.stub(loads=pretend.call_recorder(lambda s: {})) + + request = pretend.stub( + session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)), + authenticated_userid=None, + route_path=pretend.call_recorder(lambda p: "redirect_to"), + find_service=lambda interface, **kwargs: {ITokenService: token_service}[ + interface + ], + query_string=pretend.stub(), + ) + result = views.two_factor_and_totp_validate(request) assert token_service.loads.calls == [pretend.call(request.query_string)] - assert result == {"form": form_obj} + assert request.route_path.calls == [pretend.call("accounts.login")] + assert request.session.flash.calls == [ + pretend.call("Invalid or expired two factor login.", queue="error") + ] - def test_two_factor_auth_token_invalid(self): + assert isinstance(result, HTTPSeeOther) + assert result.headers["Location"] == "redirect_to" + + def test_two_factor_token_invalid(self): token_service = pretend.stub(loads=pretend.raiser(TokenException)) request = pretend.stub( session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)), @@ -529,7 +579,7 @@ def test_two_factor_auth_token_invalid(self): query_string=pretend.stub(), ) - result = views.two_factor(request) + result = views.two_factor_and_totp_validate(request) assert isinstance(result, HTTPSeeOther) assert result.headers["Location"] == "redirect_to" @@ -538,6 +588,166 @@ def test_two_factor_auth_token_invalid(self): ] +class TestWebAuthn: + def test_webauthn_get_options_already_authenticated(self): + request = pretend.stub(authenticated_userid=pretend.stub()) + result = views.webauthn_authentication_options(request) + + assert result == {"fail": {"errors": ["Already authenticated"]}} + + def test_webauthn_get_options_invalid_token(self, monkeypatch): + _get_two_factor_data = pretend.raiser(TokenException) + monkeypatch.setattr(views, "_get_two_factor_data", _get_two_factor_data) + + request = pretend.stub( + session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)), + authenticated_userid=None, + ) + result = views.webauthn_authentication_options(request) + + assert request.session.flash.calls == [ + pretend.call("Invalid or expired two factor login.", queue="error") + ] + assert result == {"fail": {"errors": ["Invalid two factor token"]}} + + def test_webauthn_get_options(self, monkeypatch): + _get_two_factor_data = pretend.call_recorder( + lambda r: {"redirect_to": "foobar", "userid": 1} + ) + monkeypatch.setattr(views, "_get_two_factor_data", _get_two_factor_data) + + user_service = pretend.stub( + get_webauthn_assertion_options=lambda *a, **kw: {"not": "real"} + ) + + request = pretend.stub( + session=pretend.stub( + flash=pretend.call_recorder(lambda *a, **kw: None), + get_webauthn_challenge=pretend.call_recorder(lambda: "not_real"), + ), + registry=pretend.stub(settings=pretend.stub(get=lambda *a: pretend.stub())), + domain=pretend.stub(), + authenticated_userid=None, + find_service=lambda interface, **kwargs: user_service, + ) + + result = views.webauthn_authentication_options(request) + + assert _get_two_factor_data.calls == [pretend.call(request)] + assert result == {"not": "real"} + + def test_webauthn_validate_already_authenticated(self): + request = pretend.stub(authenticated_userid=pretend.stub()) + result = views.webauthn_authentication_validate(request) + + assert result == {"fail": {"errors": ["Already authenticated"]}} + + def test_webauthn_validate_invalid_token(self, monkeypatch): + _get_two_factor_data = pretend.raiser(TokenException) + monkeypatch.setattr(views, "_get_two_factor_data", _get_two_factor_data) + + request = pretend.stub( + session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)), + authenticated_userid=None, + ) + result = views.webauthn_authentication_validate(request) + + assert request.session.flash.calls == [ + pretend.call("Invalid or expired two factor login.", queue="error") + ] + assert result == {"fail": {"errors": ["Invalid two factor token"]}} + + def test_webauthn_validate_invalid_form(self, monkeypatch): + _get_two_factor_data = pretend.call_recorder( + lambda r: {"redirect_to": "foobar", "userid": 1} + ) + monkeypatch.setattr(views, "_get_two_factor_data", _get_two_factor_data) + + request = pretend.stub( + authenticated_userid=None, + POST={}, + session=pretend.stub( + get_webauthn_challenge=pretend.call_recorder(lambda: "not_real"), + clear_webauthn_challenge=pretend.call_recorder(lambda: pretend.stub()), + ), + find_service=lambda *a, **kw: pretend.stub(), + host_url=pretend.stub(), + registry=pretend.stub(settings=pretend.stub(get=lambda *a: pretend.stub())), + rp_id=pretend.stub(), + domain=pretend.stub(), + ) + + form_obj = pretend.stub( + validate=pretend.call_recorder(lambda: False), + credential=pretend.stub(errors=["Fake validation failure"]), + ) + form_class = pretend.call_recorder(lambda *a, **kw: form_obj) + monkeypatch.setattr(views, "WebAuthnAuthenticationForm", form_class) + + result = views.webauthn_authentication_validate(request) + + assert _get_two_factor_data.calls == [pretend.call(request)] + assert request.session.get_webauthn_challenge.calls == [pretend.call()] + assert request.session.clear_webauthn_challenge.calls == [pretend.call()] + + assert result == {"fail": {"errors": ["Fake validation failure"]}} + + def test_webauthn_validate(self, monkeypatch): + _get_two_factor_data = pretend.call_recorder( + lambda r: {"redirect_to": "foobar", "userid": 1} + ) + monkeypatch.setattr(views, "_get_two_factor_data", _get_two_factor_data) + + _login_user = pretend.call_recorder(lambda req, uid: pretend.stub()) + monkeypatch.setattr(views, "_login_user", _login_user) + + user = pretend.stub(webauthn=pretend.stub(sign_count=pretend.stub())) + + user_service = pretend.stub( + get_user=pretend.call_recorder(lambda uid: user), + get_webauthn_by_credential_id=pretend.call_recorder( + lambda *a: pretend.stub() + ), + ) + + request = pretend.stub( + authenticated_userid=None, + POST={}, + session=pretend.stub( + get_webauthn_challenge=pretend.call_recorder(lambda: "not_real"), + clear_webauthn_challenge=pretend.call_recorder(lambda: pretend.stub()), + ), + find_service=lambda *a, **kw: user_service, + host_url=pretend.stub(), + registry=pretend.stub(settings=pretend.stub(get=lambda *a: pretend.stub())), + rp_id=pretend.stub(), + domain=pretend.stub(), + response=pretend.stub( + set_cookie=pretend.call_recorder(lambda *a: pretend.stub()) + ), + ) + + form_obj = pretend.stub( + validate=pretend.call_recorder(lambda: True), + credential=pretend.stub(errors=["Fake validation failure"]), + validated_credential=(pretend.stub(), pretend.stub()), + ) + form_class = pretend.call_recorder(lambda *a, **kw: form_obj) + monkeypatch.setattr(views, "WebAuthnAuthenticationForm", form_class) + + result = views.webauthn_authentication_validate(request) + + assert _get_two_factor_data.calls == [pretend.call(request)] + assert _login_user.calls == [pretend.call(request, 1)] + assert request.session.get_webauthn_challenge.calls == [pretend.call()] + assert request.session.clear_webauthn_challenge.calls == [pretend.call()] + + assert result == { + "success": "Successful WebAuthn assertion", + "redirect_to": "foobar", + } + + class TestLogout: @pytest.mark.parametrize("next_url", [None, "/foo/bar/", "/wat/"]) def test_get_returns_empty(self, pyramid_request, next_url): diff --git a/tests/unit/manage/test_forms.py b/tests/unit/manage/test_forms.py index cef9ce84053c..e5d5107b7b18 100644 --- a/tests/unit/manage/test_forms.py +++ b/tests/unit/manage/test_forms.py @@ -17,6 +17,7 @@ from webob.multidict import MultiDict import warehouse.utils.otp as otp +import warehouse.utils.webauthn as webauthn from warehouse.manage import forms @@ -165,3 +166,158 @@ def test_creation(self): form = forms.DeleteTOTPForm(user_service=user_service) assert form.user_service is user_service + + +class TestProvisionWebAuthnForm: + def test_creation(self): + user_service = pretend.stub() + user_id = pretend.stub() + challenge = pretend.stub() + rp_id = pretend.stub() + origin = pretend.stub() + form = forms.ProvisionWebAuthnForm( + user_service=user_service, + user_id=user_id, + challenge=challenge, + rp_id=rp_id, + origin=origin, + ) + + assert form.user_service is user_service + assert form.user_id is user_id + assert form.challenge is challenge + assert form.rp_id is rp_id + assert form.origin is origin + + def test_verify_assertion_invalid_json(self): + user_service = pretend.stub( + get_webauthn_by_label=pretend.call_recorder(lambda *a: None) + ) + + form = forms.ProvisionWebAuthnForm( + data={"credential": "invalid json", "label": "fake label"}, + user_service=user_service, + user_id=pretend.stub(), + challenge=pretend.stub(), + rp_id=pretend.stub(), + origin=pretend.stub(), + ) + + assert not form.validate() + assert ( + form.credential.errors.pop() == "Invalid WebAuthn credential: Bad payload" + ) + + def test_verify_assertion_invalid(self): + user_service = pretend.stub( + verify_webauthn_credential=pretend.raiser( + webauthn.RegistrationRejectedException("Fake exception") + ), + get_webauthn_by_label=pretend.call_recorder(lambda *a: None), + ) + form = forms.ProvisionWebAuthnForm( + data={"credential": "{}", "label": "fake label"}, + user_service=user_service, + user_id=pretend.stub(), + challenge=pretend.stub(), + rp_id=pretend.stub(), + origin=pretend.stub(), + ) + + assert not form.validate() + assert form.credential.errors.pop() == "Fake exception" + + def test_verify_label_missing(self): + user_service = pretend.stub( + verify_webauthn_credential=lambda *a, **kw: pretend.stub() + ) + form = forms.ProvisionWebAuthnForm( + data={"credential": "{}"}, + user_service=user_service, + user_id=pretend.stub(), + challenge=pretend.stub(), + rp_id=pretend.stub(), + origin=pretend.stub(), + ) + + assert not form.validate() + assert form.label.errors.pop() == "Specify a label" + + def test_verify_label_already_in_use(self): + user_service = pretend.stub( + verify_webauthn_credential=lambda *a, **kw: pretend.stub(), + get_webauthn_by_label=pretend.call_recorder(lambda *a: pretend.stub()), + ) + form = forms.ProvisionWebAuthnForm( + data={"credential": "{}", "label": "fake label"}, + user_service=user_service, + user_id=pretend.stub(), + challenge=pretend.stub(), + rp_id=pretend.stub(), + origin=pretend.stub(), + ) + + assert not form.validate() + assert form.label.errors.pop() == "Label 'fake label' already in use" + + def test_creates_validated_credential(self): + fake_validated_credential = object() + user_service = pretend.stub( + verify_webauthn_credential=lambda *a, **kw: fake_validated_credential, + get_webauthn_by_label=pretend.call_recorder(lambda *a: None), + ) + form = forms.ProvisionWebAuthnForm( + data={"credential": "{}", "label": "fake label"}, + user_service=user_service, + user_id=pretend.stub(), + challenge=pretend.stub(), + rp_id=pretend.stub(), + origin=pretend.stub(), + ) + + assert form.validate() + assert form.validated_credential is fake_validated_credential + + +class TestDeleteWebAuthnForm: + def test_creation(self): + user_service = pretend.stub() + user_id = pretend.stub() + form = forms.DeleteWebAuthnForm(user_service=user_service, user_id=user_id) + + assert form.user_service is user_service + + def test_validate_label_missing(self): + form = forms.DeleteWebAuthnForm( + user_service=pretend.stub(), user_id=pretend.stub() + ) + + assert not form.validate() + assert form.label.errors.pop() == "Specify a label" + + def test_validate_label_not_in_use(self): + user_service = pretend.stub( + get_webauthn_by_label=pretend.call_recorder(lambda *a: None) + ) + form = forms.DeleteWebAuthnForm( + data={"label": "fake label"}, + user_service=user_service, + user_id=pretend.stub(), + ) + + assert not form.validate() + assert form.label.errors.pop() == "No WebAuthn key with given label" + + def test_creates_webauthn_attribute(self): + fake_webauthn = object() + user_service = pretend.stub( + get_webauthn_by_label=pretend.call_recorder(lambda *a: fake_webauthn) + ) + form = forms.DeleteWebAuthnForm( + data={"label": "fake label"}, + user_service=user_service, + user_id=pretend.stub(), + ) + + assert form.validate() + assert form.webauthn is fake_webauthn diff --git a/tests/unit/manage/test_views.py b/tests/unit/manage/test_views.py index d0dffae84123..945a00d6a821 100644 --- a/tests/unit/manage/test_views.py +++ b/tests/unit/manage/test_views.py @@ -1088,6 +1088,218 @@ def test_delete_totp_two_factor_not_allowed(self): ] +class TestProvisionWebAuthn: + def test_get_webauthn_view(self): + user_service = pretend.stub() + request = pretend.stub(find_service=lambda *a, **kw: user_service) + + view = views.ProvisionWebAuthnViews(request) + result = view.webauthn_provision() + + assert result == {} + + def test_get_webauthn_options(self): + user_service = pretend.stub( + get_webauthn_credential_options=pretend.call_recorder( + lambda *a, **kw: {"not_real": "credential_options"} + ) + ) + request = pretend.stub( + user=pretend.stub(id=1234), + session=pretend.stub( + get_webauthn_challenge=pretend.call_recorder(lambda: "fake_challenge") + ), + find_service=lambda *a, **kw: user_service, + registry=pretend.stub( + settings={ + "site.name": "fake_site_name", + "warehouse.domain": "fake_domain", + } + ), + domain="fake_domain", + ) + + view = views.ProvisionWebAuthnViews(request) + result = view.webauthn_provision_options() + + assert result == {"not_real": "credential_options"} + assert user_service.get_webauthn_credential_options.calls == [ + pretend.call( + 1234, + challenge="fake_challenge", + rp_name=request.registry.settings["site.name"], + rp_id=request.domain, + icon_url=request.registry.settings["warehouse.domain"], + ) + ] + + def test_validate_webauthn_provision(self, monkeypatch): + user_service = pretend.stub( + add_webauthn=pretend.call_recorder(lambda *a, **kw: pretend.stub()) + ) + request = pretend.stub( + POST={}, + user=pretend.stub(id=1234, webauthn=None), + session=pretend.stub( + get_webauthn_challenge=pretend.call_recorder(lambda: "fake_challenge"), + clear_webauthn_challenge=pretend.call_recorder(lambda: pretend.stub()), + flash=pretend.call_recorder(lambda *a, **kw: None), + ), + find_service=lambda *a, **kw: user_service, + domain="fake_domain", + host_url="fake_host_url", + ) + + provision_webauthn_obj = pretend.stub( + validate=lambda: True, + validated_credential=pretend.stub( + credential_id=b"fake_credential_id", + public_key=b"fake_public_key", + sign_count=1, + ), + label=pretend.stub(data="fake_label"), + ) + provision_webauthn_cls = pretend.call_recorder( + lambda *a, **kw: provision_webauthn_obj + ) + monkeypatch.setattr(views, "ProvisionWebAuthnForm", provision_webauthn_cls) + + view = views.ProvisionWebAuthnViews(request) + result = view.validate_webauthn_provision() + + assert request.session.get_webauthn_challenge.calls == [pretend.call()] + assert request.session.clear_webauthn_challenge.calls == [pretend.call()] + assert user_service.add_webauthn.calls == [ + pretend.call( + 1234, + label="fake_label", + credential_id="fake_credential_id", + public_key="fake_public_key", + sign_count=1, + ) + ] + assert request.session.flash.calls == [ + pretend.call("WebAuthn successfully provisioned.", queue="success") + ] + assert result == {"success": "WebAuthn successfully provisioned"} + + def test_validate_webauthn_provision_invalid_form(self, monkeypatch): + user_service = pretend.stub( + add_webauthn=pretend.call_recorder(lambda *a, **kw: pretend.stub()) + ) + request = pretend.stub( + POST={}, + user=pretend.stub(id=1234, webauthn=None), + session=pretend.stub( + get_webauthn_challenge=pretend.call_recorder(lambda: "fake_challenge"), + clear_webauthn_challenge=pretend.call_recorder(lambda: pretend.stub()), + flash=pretend.call_recorder(lambda *a, **kw: None), + ), + find_service=lambda *a, **kw: user_service, + domain="fake_domain", + host_url="fake_host_url", + ) + + provision_webauthn_obj = pretend.stub( + validate=lambda: False, + errors=pretend.stub( + values=pretend.call_recorder(lambda: [["Not a real error"]]) + ), + ) + provision_webauthn_cls = pretend.call_recorder( + lambda *a, **kw: provision_webauthn_obj + ) + monkeypatch.setattr(views, "ProvisionWebAuthnForm", provision_webauthn_cls) + + view = views.ProvisionWebAuthnViews(request) + result = view.validate_webauthn_provision() + + assert request.session.get_webauthn_challenge.calls == [pretend.call()] + assert request.session.clear_webauthn_challenge.calls == [pretend.call()] + assert user_service.add_webauthn.calls == [] + assert result == {"fail": {"errors": ["Not a real error"]}} + + def test_delete_webauthn(self, monkeypatch): + request = pretend.stub( + POST={}, + user=pretend.stub( + id=1234, + username=pretend.stub(), + webauthn=pretend.stub( + __get__=pretend.call_recorder(lambda *a: [pretend.stub]), + __len__=pretend.call_recorder(lambda *a: 1), + remove=pretend.call_recorder(lambda *a: pretend.stub()), + ), + ), + session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)), + route_path=pretend.call_recorder(lambda x: "/foo/bar"), + find_service=lambda *a, **kw: pretend.stub(), + ) + + delete_webauthn_obj = pretend.stub( + validate=lambda: True, webauthn=pretend.stub() + ) + delete_webauthn_cls = pretend.call_recorder( + lambda *a, **kw: delete_webauthn_obj + ) + monkeypatch.setattr(views, "DeleteWebAuthnForm", delete_webauthn_cls) + + view = views.ProvisionWebAuthnViews(request) + result = view.delete_webauthn() + + assert request.session.flash.calls == [ + pretend.call("WebAuthn device deleted.", queue="success") + ] + assert request.route_path.calls == [pretend.call("manage.account")] + assert isinstance(result, HTTPSeeOther) + assert result.headers["Location"] == "/foo/bar" + + def test_delete_webauthn_not_provisioned(self): + request = pretend.stub( + user=pretend.stub(id=1234, webauthn=[]), + session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)), + route_path=pretend.call_recorder(lambda x: "/foo/bar"), + find_service=lambda *a, **kw: pretend.stub(), + ) + + view = views.ProvisionWebAuthnViews(request) + result = view.delete_webauthn() + + assert request.session.flash.calls == [ + pretend.call("No WebAuthhn device to delete.", queue="error") + ] + assert request.route_path.calls == [pretend.call("manage.account")] + assert isinstance(result, HTTPSeeOther) + assert result.headers["Location"] == "/foo/bar" + + def test_delete_webauthn_invalid_form(self, monkeypatch): + request = pretend.stub( + POST={}, + user=pretend.stub( + id=1234, username=pretend.stub(), webauthn=[pretend.stub()] + ), + session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)), + route_path=pretend.call_recorder(lambda x: "/foo/bar"), + find_service=lambda *a, **kw: pretend.stub(), + ) + + delete_webauthn_obj = pretend.stub(validate=lambda: False) + delete_webauthn_cls = pretend.call_recorder( + lambda *a, **kw: delete_webauthn_obj + ) + monkeypatch.setattr(views, "DeleteWebAuthnForm", delete_webauthn_cls) + + view = views.ProvisionWebAuthnViews(request) + result = view.delete_webauthn() + + assert request.session.flash.calls == [ + pretend.call("Invalid credentials.", queue="error") + ] + assert request.route_path.calls == [pretend.call("manage.account")] + assert isinstance(result, HTTPSeeOther) + assert result.headers["Location"] == "/foo/bar" + + class TestManageProjects: def test_manage_projects(self, db_request): older_release = ReleaseFactory(created=datetime.datetime(2015, 1, 1)) diff --git a/tests/unit/packaging/test_models.py b/tests/unit/packaging/test_models.py index 0e25e723690c..5a7388919ce6 100644 --- a/tests/unit/packaging/test_models.py +++ b/tests/unit/packaging/test_models.py @@ -124,11 +124,19 @@ def test_acl(self, db_session): assert acls == [ (Allow, "group:admins", "admin"), (Allow, "group:moderators", "moderator"), - (Allow, str(owner1.user.id), ["manage:project", "upload"]), - (Allow, str(owner2.user.id), ["manage:project", "upload"]), - (Allow, str(maintainer1.user.id), ["upload"]), - (Allow, str(maintainer2.user.id), ["upload"]), - ] + ] + sorted( + [ + (Allow, str(owner1.user.id), ["manage:project", "upload"]), + (Allow, str(owner2.user.id), ["manage:project", "upload"]), + ], + key=lambda x: x[1], + ) + sorted( + [ + (Allow, str(maintainer1.user.id), ["upload"]), + (Allow, str(maintainer2.user.id), ["upload"]), + ], + key=lambda x: x[1], + ) class TestRelease: @@ -293,11 +301,19 @@ def test_acl(self, db_session): assert acls == [ (Allow, "group:admins", "admin"), (Allow, "group:moderators", "moderator"), - (Allow, str(owner1.user.id), ["manage:project", "upload"]), - (Allow, str(owner2.user.id), ["manage:project", "upload"]), - (Allow, str(maintainer1.user.id), ["upload"]), - (Allow, str(maintainer2.user.id), ["upload"]), - ] + ] + sorted( + [ + (Allow, str(owner1.user.id), ["manage:project", "upload"]), + (Allow, str(owner2.user.id), ["manage:project", "upload"]), + ], + key=lambda x: x[1], + ) + sorted( + [ + (Allow, str(maintainer1.user.id), ["upload"]), + (Allow, str(maintainer2.user.id), ["upload"]), + ], + key=lambda x: x[1], + ) @pytest.mark.parametrize( ("home_page", "expected"), diff --git a/tests/unit/test_routes.py b/tests/unit/test_routes.py index 54d32eb8fba2..7bdc6bc985c7 100644 --- a/tests/unit/test_routes.py +++ b/tests/unit/test_routes.py @@ -125,6 +125,16 @@ def add_policy(name, filename): ), pretend.call("accounts.login", "/account/login/", domain=warehouse), pretend.call("accounts.two-factor", "/account/two-factor/", domain=warehouse), + pretend.call( + "accounts.webauthn-authenticate.options", + "/accounts/webauthn-authenticate/options", + domain=warehouse, + ), + pretend.call( + "accounts.webauthn-authenticate.validate", + "/accounts/webauthn-authenticate/validate", + domain=warehouse, + ), pretend.call("accounts.logout", "/account/logout/", domain=warehouse), pretend.call("accounts.register", "/account/register/", domain=warehouse), pretend.call( @@ -149,6 +159,26 @@ def add_policy(name, filename): "/manage/account/totp-provision/image", domain=warehouse, ), + pretend.call( + "manage.account.webauthn-provision", + "/manage/account/webauthn-provision", + domain=warehouse, + ), + pretend.call( + "manage.account.webauthn-provision.options", + "/manage/account/webauthn-provision/options", + domain=warehouse, + ), + pretend.call( + "manage.account.webauthn-provision.validate", + "/manage/account/webauthn-provision/validate", + domain=warehouse, + ), + pretend.call( + "manage.account.webauthn-provision.delete", + "/manage/account/webauthn-provision/delete", + domain=warehouse, + ), pretend.call("manage.projects", "/manage/projects/", domain=warehouse), pretend.call( "manage.project.settings", diff --git a/tests/unit/test_sessions.py b/tests/unit/test_sessions.py index e42e26603b4a..c5acc2680388 100644 --- a/tests/unit/test_sessions.py +++ b/tests/unit/test_sessions.py @@ -21,6 +21,7 @@ import warehouse.sessions import warehouse.utils.otp as otp +import warehouse.utils.webauthn as webauthn from warehouse.sessions import ( InvalidSession, @@ -250,7 +251,7 @@ def test_get_csrf_token_empty(self): assert session.get_csrf_token() == "123456" assert session.new_csrf_token.calls == [pretend.call()] - def test_get_totp_secret(self, monkeypatch): + def test_get_totp_secret(self): session = Session() session[session._totp_secret_key] = b"foobar" @@ -271,6 +272,31 @@ def test_clear_totp_secret(self): session.clear_totp_secret() assert not session[session._totp_secret_key] + def test_get_webauthn_challenge(self): + session = Session() + session[session._webauthn_challenge_key] = "not_a_real_challenge" + + assert session.get_webauthn_challenge() == "not_a_real_challenge" + + def test_get_webauthn_challenge_empty(self, monkeypatch): + generate_webauthn_challenge = pretend.call_recorder( + lambda: "not_a_real_challenge" + ) + monkeypatch.setattr( + webauthn, "generate_webauthn_challenge", generate_webauthn_challenge + ) + + session = Session() + assert session.get_webauthn_challenge() == "not_a_real_challenge" + assert session._webauthn_challenge_key in session + + def test_clear_webauthn_challenge(self): + session = Session() + session[session._webauthn_challenge_key] = "not_a_real_challenge" + + session.clear_webauthn_challenge() + assert not session[session._webauthn_challenge_key] + class TestSessionFactory: def test_initialize(self, monkeypatch): diff --git a/tests/unit/utils/test_webauthn.py b/tests/unit/utils/test_webauthn.py new file mode 100644 index 000000000000..85b163c520e4 --- /dev/null +++ b/tests/unit/utils/test_webauthn.py @@ -0,0 +1,121 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pretend +import pytest +import webauthn as pywebauthn + +import warehouse.utils.webauthn as webauthn + + +def test_generate_webauthn_challenge(): + challenge = webauthn.generate_webauthn_challenge() + + assert isinstance(challenge, str) + assert ( + challenge + == webauthn._webauthn_b64encode( + webauthn._webauthn_b64decode(challenge) + ).decode() + ) + + +def test_verify_registration_response(monkeypatch): + response_obj = pretend.stub( + verify=pretend.call_recorder(lambda: "not a real object") + ) + response_cls = pretend.call_recorder(lambda *a, **kw: response_obj) + monkeypatch.setattr(pywebauthn, "WebAuthnRegistrationResponse", response_cls) + + resp = webauthn.verify_registration_response( + {}, "not_a_real_challenge", rp_id="fake_rp_id", origin="fake_origin" + ) + + assert response_cls.calls == [ + pretend.call( + "fake_rp_id", + "fake_origin", + {}, + webauthn._webauthn_b64encode("not_a_real_challenge".encode()).decode(), + ) + ] + assert resp == "not a real object" + + +def test_verify_registration_response_failure(monkeypatch): + response_obj = pretend.stub( + verify=pretend.raiser(pywebauthn.webauthn.RegistrationRejectedException) + ) + response_cls = pretend.call_recorder(lambda *a, **kw: response_obj) + monkeypatch.setattr(pywebauthn, "WebAuthnRegistrationResponse", response_cls) + + with pytest.raises(webauthn.RegistrationRejectedException): + webauthn.verify_registration_response( + {}, "not_a_real_challenge", rp_id="fake_rp_id", origin="fake_origin" + ) + + +def test_verify_assertion_response(monkeypatch): + assertion_obj = pretend.stub(verify=pretend.call_recorder(lambda: 1234)) + assertion_cls = pretend.call_recorder(lambda *a, **kw: assertion_obj) + monkeypatch.setattr(pywebauthn, "WebAuthnAssertionResponse", assertion_cls) + + not_a_real_user = pretend.stub(credential_id="not_a_real_credential") + get_webauthn_users = pretend.call_recorder(lambda *a, **kw: [not_a_real_user]) + monkeypatch.setattr(webauthn, "_get_webauthn_users", get_webauthn_users) + + not_a_real_assertion = object() + resp = webauthn.verify_assertion_response( + not_a_real_assertion, + challenge="not_a_real_challenge", + user=not_a_real_user, + origin="fake_origin", + icon_url="fake_icon_url", + rp_id="fake_rp_id", + ) + + assert get_webauthn_users.calls == [ + pretend.call(not_a_real_user, icon_url="fake_icon_url", rp_id="fake_rp_id") + ] + assert assertion_cls.calls == [ + pretend.call( + not_a_real_user, + not_a_real_assertion, + webauthn._webauthn_b64encode("not_a_real_challenge".encode()).decode(), + "fake_origin", + allow_credentials=["not_a_real_credential"], + ) + ] + assert resp == ("not_a_real_credential", 1234) + + +def test_verify_assertion_response_failure(monkeypatch): + assertion_obj = pretend.stub( + verify=pretend.raiser(pywebauthn.webauthn.AuthenticationRejectedException) + ) + assertion_cls = pretend.call_recorder(lambda *a, **kw: assertion_obj) + monkeypatch.setattr(pywebauthn, "WebAuthnAssertionResponse", assertion_cls) + + get_webauthn_users = pretend.call_recorder( + lambda *a, **kw: [pretend.stub(credential_id=pretend.stub())] + ) + monkeypatch.setattr(webauthn, "_get_webauthn_users", get_webauthn_users) + + with pytest.raises(webauthn.AuthenticationRejectedException): + webauthn.verify_assertion_response( + pretend.stub(), + challenge="not_a_real_challenge", + user=pretend.stub(), + origin="fake_origin", + icon_url="fake_icon_url", + rp_id="fake_rp_id", + ) diff --git a/warehouse/accounts/forms.py b/warehouse/accounts/forms.py index 24a773b70f51..c238893d100d 100644 --- a/warehouse/accounts/forms.py +++ b/warehouse/accounts/forms.py @@ -10,11 +10,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json + import disposable_email_domains import jinja2 import wtforms import wtforms.fields.html5 +import warehouse.utils.webauthn as webauthn + from warehouse import forms from warehouse.accounts.interfaces import TooManyFailedLogins from warehouse.accounts.models import DisableReason @@ -46,6 +50,11 @@ class TOTPValueMixin: ) +class WebAuthnCredentialMixin: + + credential = wtforms.StringField(wtforms.validators.DataRequired()) + + class NewUsernameMixin: username = wtforms.StringField( @@ -242,18 +251,54 @@ def validate_password(self, field): ) -class TwoFactorForm(TOTPValueMixin, forms.Form): +class _TwoFactorAuthenticationForm(forms.Form): def __init__(self, *args, user_id, user_service, **kwargs): super().__init__(*args, **kwargs) self.user_id = user_id self.user_service = user_service + +class TOTPAuthenticationForm(TOTPValueMixin, _TwoFactorAuthenticationForm): def validate_totp_value(self, field): totp_value = field.data.encode("utf8") if not self.user_service.check_totp_value(self.user_id, totp_value): raise wtforms.validators.ValidationError("Invalid TOTP code.") +class WebAuthnAuthenticationForm(WebAuthnCredentialMixin, _TwoFactorAuthenticationForm): + __params__ = ["credential"] + + def __init__(self, *args, challenge, origin, icon_url, rp_id, **kwargs): + super().__init__(*args, **kwargs) + self.challenge = challenge + self.origin = origin + self.icon_url = icon_url + self.rp_id = rp_id + + def validate_credential(self, field): + try: + assertion_dict = json.loads(field.data.encode("utf8")) + except json.JSONDecodeError: + raise wtforms.validators.ValidationError( + f"Invalid WebAuthn assertion: Bad payload" + ) + + try: + validated_credential = self.user_service.verify_webauthn_assertion( + self.user_id, + assertion_dict, + challenge=self.challenge, + origin=self.origin, + icon_url=self.icon_url, + rp_id=self.rp_id, + ) + + except webauthn.AuthenticationRejectedException as e: + raise wtforms.validators.ValidationError(str(e)) + + self.validated_credential = validated_credential + + class RequestPasswordResetForm(forms.Form): username_or_email = wtforms.StringField( validators=[wtforms.validators.DataRequired()] diff --git a/warehouse/accounts/interfaces.py b/warehouse/accounts/interfaces.py index 8cb0cb2c7281..6cfa5c1a0cff 100644 --- a/warehouse/accounts/interfaces.py +++ b/warehouse/accounts/interfaces.py @@ -107,6 +107,16 @@ def has_two_factor(user_id): authentication and is allowed to use it. """ + def has_totp(user_id): + """ + Returns True if the user has a TOTP device provisioned. + """ + + def has_webauthn(user_id): + """ + Returns True if the user has a security key provisioned. + """ + def get_totp_secret(user_id): """ Returns the user's TOTP secret as bytes. @@ -120,6 +130,59 @@ def check_totp_value(user_id, totp_value, *, tags=None): Returns True if the given TOTP code is valid. """ + def add_webauthn(user_id, **kwargs): + """ + Adds a WebAuthn credential to the given user. + + Returns None if the user already has this credential. + """ + + def get_webauthn_credential_options( + user_id, *, challenge, rp_name, rp_id, icon_url + ): + """ + Returns a dictionary of credential options suitable for beginning the WebAuthn + provisioning process for the given user. + """ + + def get_webauthn_assertion_options(user_id, *, challenge, icon_url, rp_id): + """ + Returns a dictionary of assertion options suitable for beginning the WebAuthn + authentication process for the given user. + """ + + def verify_webauthn_credential(credential, *, challenge, rp_id, origin): + """ + Checks whether the given credential is valid, i.e. suitable for generating + assertions during authentication. + + Returns the validated credential on success, raises + webauthn.RegistrationRejectedException on failure. + """ + + def verify_webauthn_assertion( + user_id, assertion, *, challenge, origin, icon_url, rp_id + ): + """ + Checks whether the given assertion was produced by the given user's WebAuthn + device. + + Returns the updated signage count on success, raises + webauthn.AuthenticationRejectedException on failure. + """ + + def get_webauthn_by_label(user_id, label): + """ + Returns a WebAuthn credential for the given user by its label, + or None if no credential for the user has this label. + """ + + def get_webauthn_by_credential_id(user_id, credential_id): + """ + Returns a WebAuthn credential for the given user by its credential ID, + or None of the user doesn't have a credential with this ID. + """ + class ITokenService(Interface): def dumps(data): diff --git a/warehouse/accounts/models.py b/warehouse/accounts/models.py index 582f632104ea..54cfb592e664 100644 --- a/warehouse/accounts/models.py +++ b/warehouse/accounts/models.py @@ -80,8 +80,13 @@ class User(SitemapMixin, db.Model): Enum(DisableReason, values_callable=lambda x: [e.value for e in x]), nullable=True, ) + totp_secret = Column(Binary(length=20), nullable=True) + webauthn = orm.relationship( + "WebAuthn", backref="user", cascade="all, delete-orphan", lazy=False + ) + emails = orm.relationship( "Email", backref="user", cascade="all, delete-orphan", lazy=False ) @@ -107,15 +112,31 @@ def email(self): @property def has_two_factor(self): - # TODO: This is where user.u2f_provisioned et al. - # will also go. - return self.totp_secret is not None + return self.totp_secret is not None or len(self.webauthn) > 0 @property def two_factor_provisioning_allowed(self): return self.primary_email is not None and self.primary_email.verified +class WebAuthn(db.Model): + __tablename__ = "user_security_keys" + __table_args__ = ( + UniqueConstraint("label", name="user_security_keys_label_key"), + Index("user_security_keys_label_key", "user_id"), + ) + + user_id = Column( + UUID(as_uuid=True), + ForeignKey("users.id", deferrable=True, initially="DEFERRED"), + nullable=False, + ) + label = Column(String, nullable=False) + credential_id = Column(String, unique=True, nullable=False) + public_key = Column(String, unique=True, nullable=True) + sign_count = Column(Integer, default=0) + + class UnverifyReasons(enum.Enum): SpamComplaint = "spam complaint" diff --git a/warehouse/accounts/services.py b/warehouse/accounts/services.py index ff3ad6220ca7..d88b20c04428 100644 --- a/warehouse/accounts/services.py +++ b/warehouse/accounts/services.py @@ -26,6 +26,7 @@ from zope.interface import implementer import warehouse.utils.otp as otp +import warehouse.utils.webauthn as webauthn from warehouse.accounts.interfaces import ( IPasswordBreachedService, @@ -36,7 +37,7 @@ TokenMissing, TooManyFailedLogins, ) -from warehouse.accounts.models import Email, User +from warehouse.accounts.models import Email, User, WebAuthn from warehouse.metrics import IMetricsService from warehouse.rate_limiting import DummyRateLimiter, IRateLimiter from warehouse.utils.crypto import BadData, SignatureExpired, URLSafeTimedSerializer @@ -237,6 +238,22 @@ def has_two_factor(self, user_id): return user.has_two_factor + def has_totp(self, user_id): + """ + Returns True if the user has a TOTP device provisioned. + """ + user = self.get_user(user_id) + + return user.totp_secret is not None + + def has_webauthn(self, user_id): + """ + Returns True if the user has a security key provisioned. + """ + user = self.get_user(user_id) + + return len(user.webauthn) > 0 + def get_totp_secret(self, user_id): """ Returns the user's TOTP secret as bytes. @@ -300,6 +317,116 @@ def check_totp_value(self, user_id, totp_value, *, tags=None): return valid + def get_webauthn_credential_options( + self, user_id, *, challenge, rp_name, rp_id, icon_url + ): + """ + Returns a dictionary of credential options suitable for beginning the WebAuthn + provisioning process for the given user. + """ + user = self.get_user(user_id) + + return webauthn.get_credential_options( + user, challenge=challenge, rp_name=rp_name, rp_id=rp_id, icon_url=icon_url + ) + + def get_webauthn_assertion_options(self, user_id, *, challenge, icon_url, rp_id): + """ + Returns a dictionary of assertion options suitable for beginning the WebAuthn + authentication process for the given user. + """ + user = self.get_user(user_id) + + return webauthn.get_assertion_options( + user, challenge=challenge, icon_url=icon_url, rp_id=rp_id + ) + + def verify_webauthn_credential(self, credential, *, challenge, rp_id, origin): + """ + Checks whether the given credential is valid, i.e. suitable for generating + assertions during authentication. + + Returns the validated credential on success, raises + webauthn.RegistrationRejectedException on failure. + """ + validated_credential = webauthn.verify_registration_response( + credential, challenge=challenge, rp_id=rp_id, origin=origin + ) + + webauthn_cred = ( + self.db.query(WebAuthn) + .filter_by(credential_id=validated_credential.credential_id.decode()) + .first() + ) + + if webauthn_cred is not None: + raise webauthn.RegistrationRejectedException("Credential ID already in use") + + return validated_credential + + def verify_webauthn_assertion( + self, user_id, assertion, *, challenge, origin, icon_url, rp_id + ): + """ + Checks whether the given assertion was produced by the given user's WebAuthn + device. + + Returns the updated signage count on success, raises + webauthn.AuthenticationRejectedException on failure. + """ + user = self.get_user(user_id) + + return webauthn.verify_assertion_response( + assertion, + challenge=challenge, + user=user, + origin=origin, + icon_url=icon_url, + rp_id=rp_id, + ) + + def add_webauthn(self, user_id, **kwargs): + """ + Adds a WebAuthn credential to the given user. + + Returns None if the user already has this credential. + """ + user = self.get_user(user_id) + + webauthn = WebAuthn(user=user, **kwargs) + self.db.add(webauthn) + self.db.flush() # flush the db now so webauthn.id is available + + return webauthn + + def get_webauthn_by_label(self, user_id, label): + """ + Returns a WebAuthn credential for the given user by its label, + or None if no credential for the user has this label. + """ + user = self.get_user(user_id) + + return next( + (credential for credential in user.webauthn if credential.label == label), + None, + ) + + def get_webauthn_by_credential_id(self, user_id, credential_id): + """ + Returns a WebAuthn credential for the given user by its credential ID, + or None of the user doesn't have a credential with this ID. + """ + user = self.get_user(user_id) + + return next( + ( + credential + for credential in user.webauthn + if credential.credential_id == credential_id + ), + None, + ) + @implementer(ITokenService) class TokenService: diff --git a/warehouse/accounts/views.py b/warehouse/accounts/views.py index a85fcc8338b9..81bcddb27805 100644 --- a/warehouse/accounts/views.py +++ b/warehouse/accounts/views.py @@ -30,7 +30,8 @@ RegistrationForm, RequestPasswordResetForm, ResetPasswordForm, - TwoFactorForm, + TOTPAuthenticationForm, + WebAuthnAuthenticationForm, ) from warehouse.accounts.interfaces import ( IPasswordBreachedService, @@ -186,40 +187,35 @@ def login(request, redirect_field_name=REDIRECT_FIELD_NAME, _form_class=LoginFor require_csrf=True, require_methods=False, ) -def two_factor(request, _form_class=TwoFactorForm): +def two_factor_and_totp_validate(request, _form_class=TOTPAuthenticationForm): if request.authenticated_userid is not None: return HTTPSeeOther(request.route_path("manage.projects")) - token_service = request.find_service(ITokenService, name="two_factor") - try: - two_factor_data = token_service.loads(request.query_string) + two_factor_data = _get_two_factor_data(request) except TokenException: request.session.flash("Invalid or expired two factor login.", queue="error") return HTTPSeeOther(request.route_path("accounts.login")) userid = two_factor_data.get("userid") - if not userid: - return HTTPSeeOther(request.route_path("accounts.login")) - redirect_to = two_factor_data.get("redirect_to") user_service = request.find_service(IUserService, context=None) - form = _form_class( - request.POST, - user_id=userid, - user_service=user_service, - check_password_metrics_tags=["method:auth", "auth_method:login_form"], - ) + two_factor_state = {} + if user_service.has_totp(userid): + two_factor_state["totp_form"] = _form_class( + request.POST, + user_id=userid, + user_service=user_service, + check_password_metrics_tags=["method:auth", "auth_method:login_form"], + ) + if user_service.has_webauthn(userid): + two_factor_state["has_webauthn"] = True if request.method == "POST": + form = two_factor_state["totp_form"] if form.validate(): - # If the user-originating redirection url is not safe, then - # redirect to the index instead. - if not redirect_to or not is_safe_url(url=redirect_to, host=request.host): - redirect_to = request.route_path("manage.projects") - _login_user(request, userid) resp = HTTPSeeOther(redirect_to) @@ -234,7 +230,87 @@ def two_factor(request, _form_class=TwoFactorForm): else: form.totp_value.data = "" - return {"form": form} + return two_factor_state + + +@view_config( + uses_session=True, + request_method="GET", + route_name="accounts.webauthn-authenticate.options", + renderer="json", +) +def webauthn_authentication_options(request): + if request.authenticated_userid is not None: + return {"fail": {"errors": ["Already authenticated"]}} + + try: + two_factor_data = _get_two_factor_data(request) + except TokenException: + request.session.flash("Invalid or expired two factor login.", queue="error") + return {"fail": {"errors": ["Invalid two factor token"]}} + + userid = two_factor_data.get("userid") + user_service = request.find_service(IUserService, context=None) + return user_service.get_webauthn_assertion_options( + userid, + challenge=request.session.get_webauthn_challenge(), + icon_url=request.registry.settings.get("warehouse.domain", request.domain), + rp_id=request.domain, + ) + + +@view_config( + require_csrf=True, + require_methods=False, + uses_session=True, + request_method="POST", + request_param=WebAuthnAuthenticationForm.__params__, + route_name="accounts.webauthn-authenticate.validate", + renderer="json", +) +def webauthn_authentication_validate(request): + if request.authenticated_userid is not None: + return {"fail": {"errors": ["Already authenticated"]}} + + try: + two_factor_data = _get_two_factor_data(request) + except TokenException: + request.session.flash("Invalid or expired two factor login.", queue="error") + return {"fail": {"errors": ["Invalid two factor token"]}} + + redirect_to = two_factor_data.get("redirect_to") + userid = two_factor_data.get("userid") + + user_service = request.find_service(IUserService, context=None) + form = WebAuthnAuthenticationForm( + **request.POST, + user_id=userid, + user_service=user_service, + challenge=request.session.get_webauthn_challenge(), + origin=request.host_url, + icon_url=request.registry.settings.get("warehouse.domain", request.domain), + rp_id=request.domain, + ) + + request.session.clear_webauthn_challenge() + + if form.validate(): + credential_id, sign_count = form.validated_credential + webauthn = user_service.get_webauthn_by_credential_id(userid, credential_id) + webauthn.sign_count = sign_count + + _login_user(request, userid) + + request.response.set_cookie( + USER_ID_INSECURE_COOKIE, + hashlib.blake2b(str(userid).encode("ascii"), person=b"warehouse.userid") + .hexdigest() + .lower(), + ) + return {"success": "Successful WebAuthn assertion", "redirect_to": redirect_to} + + errors = [str(error) for error in form.credential.errors] + return {"fail": {"errors": errors}} @view_config( @@ -498,6 +574,22 @@ def _error(message): return HTTPSeeOther(request.route_path("manage.account")) +def _get_two_factor_data(request, _redirect_to="/"): + token_service = request.find_service(ITokenService, name="two_factor") + two_factor_data = token_service.loads(request.query_string) + + if two_factor_data.get("userid") is None: + raise TokenInvalid + + # If the user-originating redirection url is not safe, then + # redirect to the index instead. + redirect_to = two_factor_data.get("redirect_to") + if redirect_to is None or not is_safe_url(url=redirect_to, host=request.host): + two_factor_data["redirect_to"] = _redirect_to + + return two_factor_data + + def _login_user(request, userid): # We have a session factory associated with this request, so in order # to protect against session fixation attacks we're going to make sure diff --git a/warehouse/manage/forms.py b/warehouse/manage/forms.py index a5b56a9fd5a2..a00f26d06e87 100644 --- a/warehouse/manage/forms.py +++ b/warehouse/manage/forms.py @@ -10,9 +10,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json + import wtforms import warehouse.utils.otp as otp +import warehouse.utils.webauthn as webauthn from warehouse import forms from warehouse.accounts.forms import ( @@ -20,6 +23,7 @@ NewPasswordMixin, PasswordMixin, TOTPValueMixin, + WebAuthnCredentialMixin, ) @@ -104,3 +108,78 @@ def validate_totp_value(self, field): totp_value = field.data.encode("utf8") if not otp.verify_totp(self.totp_secret, totp_value): raise wtforms.validators.ValidationError("Invalid TOTP code. Try again?") + + +class DeleteWebAuthnForm(forms.Form): + __params__ = ["confirm_key_name"] + + label = wtforms.StringField( + validators=[ + wtforms.validators.DataRequired(message="Specify a label"), + wtforms.validators.Length( + max=64, message=("Label must be 64 characters or less") + ), + ] + ) + + def __init__(self, *args, user_service, user_id, **kwargs): + super().__init__(*args, **kwargs) + self.user_service = user_service + self.user_id = user_id + + def validate_label(self, field): + label = field.data + + webauthn = self.user_service.get_webauthn_by_label(self.user_id, label) + if webauthn is None: + raise wtforms.validators.ValidationError("No WebAuthn key with given label") + self.webauthn = webauthn + + +class ProvisionWebAuthnForm(WebAuthnCredentialMixin, forms.Form): + __params__ = ["label", "credential"] + + label = wtforms.StringField( + validators=[ + wtforms.validators.DataRequired(message="Specify a label"), + wtforms.validators.Length( + max=64, message=("Label must be 64 characters or less") + ), + ] + ) + + def __init__( + self, *args, user_service, user_id, challenge, rp_id, origin, **kwargs + ): + super().__init__(*args, **kwargs) + self.user_service = user_service + self.user_id = user_id + self.challenge = challenge + self.rp_id = rp_id + self.origin = origin + + def validate_credential(self, field): + try: + credential_dict = json.loads(field.data.encode("utf8")) + except json.JSONDecodeError: + raise wtforms.validators.ValidationError( + "Invalid WebAuthn credential: Bad payload" + ) + + try: + validated_credential = self.user_service.verify_webauthn_credential( + credential_dict, + challenge=self.challenge, + rp_id=self.rp_id, + origin=self.origin, + ) + except webauthn.RegistrationRejectedException as e: + raise wtforms.validators.ValidationError(str(e)) + + self.validated_credential = validated_credential + + def validate_label(self, field): + label = field.data + + if self.user_service.get_webauthn_by_label(self.user_id, label) is not None: + raise wtforms.validators.ValidationError(f"Label '{label}' already in use") diff --git a/warehouse/manage/views.py b/warehouse/manage/views.py index 427989ff032c..3e144bace990 100644 --- a/warehouse/manage/views.py +++ b/warehouse/manage/views.py @@ -43,7 +43,9 @@ ChangeRoleForm, CreateRoleForm, DeleteTOTPForm, + DeleteWebAuthnForm, ProvisionTOTPForm, + ProvisionWebAuthnForm, SaveAccountForm, ) from warehouse.packaging.models import File, JournalEntry, Project, Release, Role @@ -421,6 +423,104 @@ def delete_totp(self): return HTTPSeeOther(self.request.route_path("manage.account")) +@view_defaults( + uses_session=True, + require_csrf=True, + require_methods=False, + permission="manage:user", + http_cache=0, +) +class ProvisionWebAuthnViews: + def __init__(self, request): + self.request = request + self.user_service = request.find_service(IUserService, context=None) + + @view_config( + request_method="GET", + route_name="manage.account.webauthn-provision", + renderer="manage/account/webauthn-provision.html", + ) + def webauthn_provision(self): + return {} + + @view_config( + request_method="GET", + route_name="manage.account.webauthn-provision.options", + renderer="json", + ) + def webauthn_provision_options(self): + return self.user_service.get_webauthn_credential_options( + self.request.user.id, + challenge=self.request.session.get_webauthn_challenge(), + rp_name=self.request.registry.settings["site.name"], + rp_id=self.request.domain, + icon_url=self.request.registry.settings.get( + "warehouse.domain", self.request.domain + ), + ) + + @view_config( + request_method="POST", + request_param=ProvisionWebAuthnForm.__params__, + route_name="manage.account.webauthn-provision.validate", + renderer="json", + ) + def validate_webauthn_provision(self): + form = ProvisionWebAuthnForm( + **self.request.POST, + user_service=self.user_service, + user_id=self.request.user.id, + challenge=self.request.session.get_webauthn_challenge(), + rp_id=self.request.domain, + origin=self.request.host_url, + ) + + self.request.session.clear_webauthn_challenge() + + if form.validate(): + self.user_service.add_webauthn( + self.request.user.id, + label=form.label.data, + credential_id=form.validated_credential.credential_id.decode(), + public_key=form.validated_credential.public_key.decode(), + sign_count=form.validated_credential.sign_count, + ) + self.request.session.flash( + "WebAuthn successfully provisioned.", queue="success" + ) + return {"success": "WebAuthn successfully provisioned"} + + errors = [ + str(error) for error_list in form.errors.values() for error in error_list + ] + return {"fail": {"errors": errors}} + + @view_config( + request_method="POST", + request_param=DeleteWebAuthnForm.__params__, + route_name="manage.account.webauthn-provision.delete", + ) + def delete_webauthn(self): + if len(self.request.user.webauthn) == 0: + self.request.session.flash("No WebAuthhn device to delete.", queue="error") + return HTTPSeeOther(self.request.route_path("manage.account")) + + form = DeleteWebAuthnForm( + **self.request.POST, + username=self.request.user.username, + user_service=self.user_service, + user_id=self.request.user.id, + ) + + if form.validate(): + self.request.user.webauthn.remove(form.webauthn) + self.request.session.flash("WebAuthn device deleted.", queue="success") + else: + self.request.session.flash("Invalid credentials.", queue="error") + + return HTTPSeeOther(self.request.route_path("manage.account")) + + @view_config( route_name="manage.projects", renderer="manage/projects.html", diff --git a/warehouse/migrations/versions/af7dca2bb2fe_webauthn_model.py b/warehouse/migrations/versions/af7dca2bb2fe_webauthn_model.py new file mode 100644 index 000000000000..7627fcbe9d95 --- /dev/null +++ b/warehouse/migrations/versions/af7dca2bb2fe_webauthn_model.py @@ -0,0 +1,52 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +webauthn model + +Revision ID: af7dca2bb2fe +Revises: e1b493d3b171 +Create Date: 2019-05-06 15:58:35.922060 +""" + +import sqlalchemy as sa + +from alembic import op +from sqlalchemy.dialects import postgresql + +revision = "af7dca2bb2fe" +down_revision = "e1b493d3b171" + + +def upgrade(): + op.create_table( + "user_security_keys", + sa.Column( + "id", + postgresql.UUID(as_uuid=True), + server_default=sa.text("gen_random_uuid()"), + nullable=False, + ), + sa.Column("user_id", postgresql.UUID(as_uuid=True), nullable=False), + sa.Column("credential_id", sa.String, nullable=False), + sa.Column("public_key", sa.String, nullable=True), + sa.Column("sign_count", sa.Integer, nullable=True), + sa.ForeignKeyConstraint( + ["user_id"], ["users.id"], initially="DEFERRED", deferrable=True + ), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint("credential_id"), + sa.UniqueConstraint("public_key"), + ) + + +def downgrade(): + op.drop_table("user_security_keys") diff --git a/warehouse/migrations/versions/cdb2915fda5c_add_webauthn_labels.py b/warehouse/migrations/versions/cdb2915fda5c_add_webauthn_labels.py new file mode 100644 index 000000000000..ed77c6dbb506 --- /dev/null +++ b/warehouse/migrations/versions/cdb2915fda5c_add_webauthn_labels.py @@ -0,0 +1,37 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +add webauthn labels + +Revision ID: cdb2915fda5c +Revises: af7dca2bb2fe +Create Date: 2019-06-08 16:31:41.681380 +""" + +import sqlalchemy as sa + +from alembic import op + +revision = "cdb2915fda5c" +down_revision = "af7dca2bb2fe" + + +def upgrade(): + op.add_column("user_security_keys", sa.Column("label", sa.String(), nullable=False)) + op.create_index( + "user_security_keys_label_key", "user_security_keys", ["user_id"], unique=False + ) + + +def downgrade(): + op.drop_index("user_security_keys_label_key", table_name="user_security_keys") + op.drop_column("user_security_keys", "label") diff --git a/warehouse/packaging/models.py b/warehouse/packaging/models.py index c20d9f43737e..64c893afaa4d 100644 --- a/warehouse/packaging/models.py +++ b/warehouse/packaging/models.py @@ -174,6 +174,7 @@ def __acl__(self): query = session.query(Role).filter(Role.project == self) query = query.options(orm.lazyload("project")) query = query.options(orm.joinedload("user").lazyload("emails")) + query = query.join(User).order_by(User.id.asc()) for role in sorted( query.all(), key=lambda x: ["Owner", "Maintainer"].index(x.role_name) ): diff --git a/warehouse/routes.py b/warehouse/routes.py index 676f7c612100..9f02f2403dd7 100644 --- a/warehouse/routes.py +++ b/warehouse/routes.py @@ -98,6 +98,16 @@ def includeme(config): ) config.add_route("accounts.login", "/account/login/", domain=warehouse) config.add_route("accounts.two-factor", "/account/two-factor/", domain=warehouse) + config.add_route( + "accounts.webauthn-authenticate.options", + "/accounts/webauthn-authenticate/options", + domain=warehouse, + ) + config.add_route( + "accounts.webauthn-authenticate.validate", + "/accounts/webauthn-authenticate/validate", + domain=warehouse, + ) config.add_route("accounts.logout", "/account/logout/", domain=warehouse) config.add_route("accounts.register", "/account/register/", domain=warehouse) config.add_route( @@ -124,6 +134,26 @@ def includeme(config): "/manage/account/totp-provision/image", domain=warehouse, ) + config.add_route( + "manage.account.webauthn-provision", + "/manage/account/webauthn-provision", + domain=warehouse, + ) + config.add_route( + "manage.account.webauthn-provision.options", + "/manage/account/webauthn-provision/options", + domain=warehouse, + ) + config.add_route( + "manage.account.webauthn-provision.validate", + "/manage/account/webauthn-provision/validate", + domain=warehouse, + ) + config.add_route( + "manage.account.webauthn-provision.delete", + "/manage/account/webauthn-provision/delete", + domain=warehouse, + ) config.add_route("manage.projects", "/manage/projects/", domain=warehouse) config.add_route( "manage.project.settings", diff --git a/warehouse/sessions.py b/warehouse/sessions.py index 070a16b95c97..4ca0c721a69c 100644 --- a/warehouse/sessions.py +++ b/warehouse/sessions.py @@ -22,6 +22,7 @@ from zope.interface import implementer import warehouse.utils.otp as otp +import warehouse.utils.webauthn as webauthn from warehouse.cache.http import add_vary from warehouse.utils import crypto @@ -84,6 +85,7 @@ class Session(dict): _csrf_token_key = "_csrf_token" _flash_key = "_flash_messages" _totp_secret_key = "_totp_secret" + _webauthn_challenge_key = "_webauthn_challenge" # A number of our methods need to be decorated so that they also call # self.changed() @@ -180,6 +182,16 @@ def get_totp_secret(self): def clear_totp_secret(self): self[self._totp_secret_key] = None + def get_webauthn_challenge(self): + webauthn_challenge = self.get(self._webauthn_challenge_key) + if webauthn_challenge is None: + self[self._webauthn_challenge_key] = webauthn.generate_webauthn_challenge() + webauthn_challenge = self[self._webauthn_challenge_key] + return webauthn_challenge + + def clear_webauthn_challenge(self): + self[self._webauthn_challenge_key] = None + @implementer(ISessionFactory) class SessionFactory: diff --git a/warehouse/static/js/warehouse/index.js b/warehouse/static/js/warehouse/index.js index 10ce8b00b027..f97d3008ee89 100644 --- a/warehouse/static/js/warehouse/index.js +++ b/warehouse/static/js/warehouse/index.js @@ -39,6 +39,7 @@ import searchFilterToggle from "warehouse/utils/search-filter-toggle"; import YouTubeIframeLoader from "youtube-iframe"; import RepositoryInfo from "warehouse/utils/repository-info"; import BindModalKeys from "warehouse/utils/bind-modal-keys"; +import {AuthenticateWebAuthn, ProvisionWebAuthn} from "warehouse/utils/webauthn"; // Do this before anything else, to potentially capture errors down the line docReady(() => { @@ -216,6 +217,12 @@ docReady(bindDropdowns); // Get modal keypress event listeners ready docReady(BindModalKeys); +// Get WebAuthn provisioning ready +docReady(ProvisionWebAuthn); + +// Get WebAuthn authentication ready +docReady(AuthenticateWebAuthn); + // Bind again when client-side includes have been loaded (for the logged-in // user dropdown) document.addEventListener("CSILoaded", bindDropdowns); diff --git a/warehouse/static/js/warehouse/utils/webauthn.js b/warehouse/static/js/warehouse/utils/webauthn.js new file mode 100644 index 000000000000..595773647bae --- /dev/null +++ b/warehouse/static/js/warehouse/utils/webauthn.js @@ -0,0 +1,217 @@ +/* Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +const populateWebAuthnErrorList = (errors) => { + const errorList = document.getElementById("webauthn-errors"); + if (errorList === null) { + return; + } + + errorList.innerHTML = ""; + + errors.forEach((error) => { + const errorItem = document.createElement("li"); + errorItem.appendChild(document.createTextNode(error)); + errorList.appendChild(errorItem); + }); +}; + +const doWebAuthn = (buttonId, func) => { + const webAuthnButton = document.getElementById(buttonId); + if (webAuthnButton === null) { + return null; + } + + const csrfToken = webAuthnButton.getAttribute("csrf-token"); + if (csrfToken === null) { + return; + } + + if (!window.PublicKeyCredential) { + populateWebAuthnErrorList(["Your browser doesn't support WebAuthn."]); + return; + } + + webAuthnButton.disabled = false; + webAuthnButton.addEventListener("click", async () => { func(csrfToken); }); +}; + +const hexEncode = (buf) => { + return Array.from(buf).map((x) => { + return ("0" + x.toString(16)).substr(-2); + }).join(""); +}; + +const webAuthnBtoA = (encoded) => { + return btoa(encoded).replace(/\+/g, "-").replace(/\//g, "_").replace(/=/g, ""); +}; + +const webAuthnBase64Normalize = (encoded) => { + return encoded.replace(/_/g, "/").replace(/-/g, "+"); +}; + +const transformAssertionOptions = (assertionOptions) => { + let {challenge, allowCredentials} = assertionOptions; + + challenge = Uint8Array.from(challenge, c => c.charCodeAt(0)); + allowCredentials = allowCredentials.map(credentialDescriptor => { + let {id} = credentialDescriptor; + id = webAuthnBase64Normalize(id); + id = Uint8Array.from(atob(id), c => c.charCodeAt(0)); + return Object.assign({}, credentialDescriptor, {id}); + }); + + const transformedOptions = Object.assign( + {}, + assertionOptions, + {challenge, allowCredentials}); + + return transformedOptions; +}; + +const transformAssertion = (assertion) => { + const authData = new Uint8Array(assertion.response.authenticatorData); + const clientDataJSON = new Uint8Array(assertion.response.clientDataJSON); + const rawId = new Uint8Array(assertion.rawId); + const sig = new Uint8Array(assertion.response.signature); + const assertionClientExtensions = assertion.getClientExtensionResults(); + + return { + id: assertion.id, + rawId: webAuthnBtoA(rawId), + type: assertion.type, + authData: webAuthnBtoA(String.fromCharCode(...authData)), + clientData: webAuthnBtoA(String.fromCharCode(...clientDataJSON)), + signature: hexEncode(sig), + assertionClientExtensions: JSON.stringify(assertionClientExtensions), + }; +}; + +const transformCredentialOptions = (credentialOptions) => { + let {challenge, user} = credentialOptions; + user.id = Uint8Array.from(credentialOptions.user.id, c => c.charCodeAt(0)); + challenge = Uint8Array.from(credentialOptions.challenge, c => c.charCodeAt(0)); + + const transformedOptions = Object.assign({}, credentialOptions, {challenge, user}); + + return transformedOptions; +}; + +const transformCredential = (credential) => { + const attObj = new Uint8Array(credential.response.attestationObject); + const clientDataJSON = new Uint8Array(credential.response.clientDataJSON); + const rawId = new Uint8Array(credential.rawId); + const registrationClientExtensions = credential.getClientExtensionResults(); + + return { + id: credential.id, + rawId: webAuthnBtoA(rawId), + type: credential.type, + attObj: webAuthnBtoA(String.fromCharCode(...attObj)), + clientData: webAuthnBtoA(String.fromCharCode(...clientDataJSON)), + registrationClientExtensions: JSON.stringify(registrationClientExtensions), + }; +}; + +const postCredential = async (label, credential, token) => { + const formData = new FormData(); + formData.set("label", label); + formData.set("credential", JSON.stringify(credential)); + formData.set("csrf_token", token); + + const resp = await fetch( + "/manage/account/webauthn-provision/validate", { + method: "POST", + cache: "no-cache", + body: formData, + } + ); + + return await resp.json(); +}; + +const postAssertion = async (assertion, token) => { + const formData = new FormData(); + formData.set("credential", JSON.stringify(assertion)); + formData.set("csrf_token", token); + + const resp = await fetch( + "/accounts/webauthn-authenticate/validate" + window.location.search, { + method: "POST", + cache: "no-cache", + body: formData, + } + ); + + return await resp.json(); +}; + +export const ProvisionWebAuthn = () => { + doWebAuthn("webauthn-provision-begin", async (csrfToken) => { + const label = document.getElementById("webauthn-provision-label").value; + + // TODO(ww): Should probably find a way to use the route string here, + // not the actual endpoint. + const resp = await fetch( + "/manage/account/webauthn-provision/options", { + cache: "no-cache", + } + ); + + const credentialOptions = await resp.json(); + const transformedOptions = transformCredentialOptions(credentialOptions); + const credential = await navigator.credentials.create({ + publicKey: transformedOptions, + }); + const transformedCredential = transformCredential(credential); + + const status = await postCredential(label, transformedCredential, csrfToken); + if (status.fail) { + populateWebAuthnErrorList(status.fail.errors); + return; + } + + window.location.replace("/manage/account"); + }); +}; + +export const AuthenticateWebAuthn = () => { + doWebAuthn("webauthn-auth-begin", async (csrfToken) => { + const resp = await fetch( + "/accounts/webauthn-authenticate/options" + window.location.search, { + cache: "no-cache", + } + ); + + const assertionOptions = await resp.json(); + if (assertionOptions.fail) { + window.location.replace("/accounts/login"); + return; + } + + const transformedOptions = transformAssertionOptions(assertionOptions); + const assertion = await navigator.credentials.get({ + publicKey: transformedOptions, + }); + const transformedAssertion = transformAssertion(assertion); + + const status = await postAssertion(transformedAssertion, csrfToken); + if (status.fail) { + populateWebAuthnErrorList(status.fail.errors); + return; + } + + window.location.replace(status.redirect_to); + }); +}; diff --git a/warehouse/static/sass/blocks/_twofa-login.scss b/warehouse/static/sass/blocks/_twofa-login.scss new file mode 100644 index 000000000000..ce8fcbadecb8 --- /dev/null +++ b/warehouse/static/sass/blocks/_twofa-login.scss @@ -0,0 +1,57 @@ +/*! + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* +
+
+ // method here +
+
+ // add divider +
+
+*/ + +.twofa-login { + @media only screen and (min-width: $tablet) { + display: flex; + } + + &__method:first-of-type { + @media only screen and (min-width: $tablet) { + flex-shrink: 0; + } + } + + &__method--padded { + padding: $spacing-unit 0; + + @media only screen and (max-width: $tablet) { + padding: 0 0 $spacing-unit 0; + } + } + + &__divider { + margin: 0 $spacing-unit * 2; + width: 2px; + background-image: linear-gradient(to bottom, $border-color, $white); + + @media only screen and (max-width: $tablet) { + margin: 0 0 $spacing-unit 0; + width: 100%; + height: 2px; + background-image: linear-gradient(to right, $border-color, $white); + } + } +} diff --git a/warehouse/static/sass/warehouse.scss b/warehouse/static/sass/warehouse.scss index 4721a0492cb9..157726c41453 100644 --- a/warehouse/static/sass/warehouse.scss +++ b/warehouse/static/sass/warehouse.scss @@ -109,6 +109,7 @@ @import "blocks/table"; @import "blocks/tooltip"; @import "blocks/totp-form"; +@import "blocks/twofa-login"; @import "blocks/vertical-tabs"; @import "blocks/viewport-section"; diff --git a/warehouse/templates/accounts/two-factor.html b/warehouse/templates/accounts/two-factor.html index 4e69b956b621..0d5efe4ba178 100644 --- a/warehouse/templates/accounts/two-factor.html +++ b/warehouse/templates/accounts/two-factor.html @@ -24,38 +24,76 @@
-

Two-factor authentication - authenticate with app

+

Two-factor authentication

-
- +
diff --git a/warehouse/templates/manage/account.html b/warehouse/templates/manage/account.html index 7730eb1e1b2b..a6557fd2e3c4 100644 --- a/warehouse/templates/manage/account.html +++ b/warehouse/templates/manage/account.html @@ -280,43 +280,63 @@

Change password

{% if user.two_factor_provisioning_allowed %}
+ {% macro twofa_buttons() %} + {% if not user.totp_secret %} + Add 2FA by TOTP application + {% endif %} + Add 2FA with security key + {% endmacro %}

Two factor authentication

-

Two factor authentication adds an additional layer of security to your account. Learn more

+

Two factor authentication (2FA) adds an additional layer of security to your account. Learn more

+ {% if user.totp_secret or user.webauthn %} - - + + {% if user.totp_secret %} - + {% endif %} + {% for credential in user.webauthn %} + + + + + {% endfor %}
Two factor methodStatusTwo factor method
Authentication app (TOTP){% if user.totp_secret %} - Enabled - {% else %} - Disabled - {% endif %} - - {% if user.totp_secret %} - Disable - {% set title="Disable 2FA by TOTP application" %} - {% set confirm_button_label="Disable TOTP application" %} + Remove + {% set title="Remove 2FA by TOTP application" %} + {% set confirm_button_label="Remove TOTP application" %} {% set action="/manage/account/totp-provision" %} {{ confirm_modal(title=title, confirm_name="Username", confirm_string=user.username, confirm_button_label=confirm_button_label, slug="disable-totp", action=action, warning=False, confirm_string_in_title=False) }} - {% else %} - Enable - {% endif %}
"{{ credential.label }}" - Security key (WebAuthn) + Remove + {% set title="Remove 2FA key - " + credential.label %} + {% set confirm_button_label="Remove key" %} + {% set action=request.route_path('manage.account.webauthn-provision.delete') %} + {% set slug="disable-webauthn-" + credential.id | string %} + {% set extra_fields %} + + {% endset %} + {{ confirm_modal(title=title, confirm_name="Key name", confirm_string=credential.label, confirm_button_label=confirm_button_label, slug=slug, extra_fields=extra_fields, action=action, warning=False, confirm_string_in_title=False) }} +
+ {{ twofa_buttons() }} + {% else %} +
+

You have not enabled two factor authentication on your account.

+ {{ twofa_buttons() }} +
+ {% endif %}
- {% endif %}
diff --git a/warehouse/templates/manage/account/webauthn-provision.html b/warehouse/templates/manage/account/webauthn-provision.html new file mode 100644 index 000000000000..44dd184f3c68 --- /dev/null +++ b/warehouse/templates/manage/account/webauthn-provision.html @@ -0,0 +1,58 @@ +{# + # Licensed under the Apache License, Version 2.0 (the "License"); + # you may not use this file except in compliance with the License. + # You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. +-#} +{% extends "manage/manage_base.html" %} + +{% set user = request.user %} +{% set title = "Set up 2FA using a security key (WebAuthn)" %} + +{% block title %}{{ title }}{% endblock %} + +{# Hide mobile search on manager pages #} +{% block mobile_search %}{% endblock %} + +{% block main %} +

{{ title }}

+
+

How two factor authentication with WebAuthn works

+ +
    +
  1. When you sign into PyPI, you will be asked to provide your username and password, as usual
  2. +
  3. You will then see a screen asking for a secondary login method
  4. +
  5. Insert and activate your key (provisioned below), to access your PyPI account
  6. +
+ +
+ + + +
+ + +

PyPI supports adding multiple WebAuthn keys.
Please give this key a name.

+ +
+
+ +
+
+{% endblock %} diff --git a/warehouse/templates/pages/help.html b/warehouse/templates/pages/help.html index 4269ea244b2c..ac47d88be68b 100644 --- a/warehouse/templates/pages/help.html +++ b/warehouse/templates/pages/help.html @@ -29,6 +29,7 @@ {% macro compromised_password() %}Why is PyPI telling me my password is compromised?{% endmacro %} {% macro twoFA() %}How do I access my PyPI account using two factor authentication?{% endmacro %} {% macro totp() %}How do I generate a code through a TOTP application?{% endmacro %} +{% macro utfkey() %}How do I login with a security key?{% endmacro %} {% macro mirroring() %}How can I run a mirror of PyPI?{% endmacro %} {% macro APIs() %}Does PyPI have APIs I can use?{% endmacro %} @@ -83,6 +84,7 @@

My Account

  • {{ compromised_password() }}
  • {{ twoFA() }}
  • {{ totp() }}
  • +
  • {{ utfkey() }}
  • @@ -177,7 +179,7 @@

    {{ compromised_password() }}

    {{ twoFA() }}

    Users who have chosen to set up two factor authentication (2FA) on their PyPI account must provide a second method of identity verification (other than their username and password) to log in.

    -

    PyPI currently supports a single 2FA method: Generating a code through a TOTP application. +

    PyPI supports two 2FA methods: generating a code through a TOTP application, and using a U2F security key.

    {{ totp() }}

    When enabling two factor authentication (2FA) via TOTP in your account admin, you were asked to provision an application (usually a mobile phone app) in order to generate authentication codes. Popular applications include:

    @@ -189,6 +191,10 @@

    {{ totp() }}

  • Duo Mobile for Android and iPhone (proprietary)
  • Open the application of your choice to generate a code.

    + +

    {{ utfkey() }}

    +

    A security key (also known as a universal second factor, or U2F key) is hardware device that communicates via USB, NFC, or Bluetooth. Popular keys include Yubikey, Google Titan and Thetis. PyPI supports any FIDO U2F compatible key and follows the WebAuthn standard.

    +

    Users who have set up this second factor will be prompted to use their key (usually by inserting it into a USB port and pressing a button) when logging in.

    diff --git a/warehouse/utils/webauthn.py b/warehouse/utils/webauthn.py new file mode 100644 index 000000000000..37143d83b4aa --- /dev/null +++ b/warehouse/utils/webauthn.py @@ -0,0 +1,148 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import base64 +import os + +import webauthn as pywebauthn + +from webauthn.webauthn import ( + AuthenticationRejectedException as _AuthenticationRejectedException, + RegistrationRejectedException as _RegistrationRejectedException, +) + + +class AuthenticationRejectedException(Exception): + pass + + +class RegistrationRejectedException(Exception): + pass + + +WebAuthnCredential = pywebauthn.WebAuthnCredential + + +def _get_webauthn_users(user, *, icon_url, rp_id): + """ + Returns a webauthn.WebAuthnUser instance corresponding + to the given user model, with properties suitable for + usage within the webauthn API. + """ + return [ + pywebauthn.WebAuthnUser( + str(user.id), + user.username, + user.name, + icon_url, + credential.credential_id, + credential.public_key, + credential.sign_count, + rp_id, + ) + for credential in user.webauthn + ] + + +def _webauthn_b64decode(encoded): + padding = "=" * (len(encoded) % 4) + return base64.urlsafe_b64decode(encoded + padding) + + +def _webauthn_b64encode(source): + return base64.urlsafe_b64encode(source).rstrip(b"=") + + +def generate_webauthn_challenge(): + """ + Returns a random challenge suitable for use within + Webauthn's credential and configuration option objects. + + See: https://w3c.github.io/webauthn/#cryptographic-challenges + """ + # NOTE: Webauthn recommends at least 16 bytes of entropy, + # we go with 32 because it doesn't cost us anything. + return _webauthn_b64encode(os.urandom(32)).decode() + + +def get_credential_options(user, *, challenge, rp_name, rp_id, icon_url): + """ + Returns a dictionary of options for credential creation + on the client side. + """ + options = pywebauthn.WebAuthnMakeCredentialOptions( + challenge, rp_name, rp_id, str(user.id), user.username, user.name, icon_url + ) + + return options.registration_dict + + +def get_assertion_options(user, *, challenge, icon_url, rp_id): + """ + Returns a dictionary of options for assertion retrieval + on the client side. + """ + options = pywebauthn.WebAuthnAssertionOptions( + _get_webauthn_users(user, icon_url=icon_url, rp_id=rp_id), challenge + ) + + return options.assertion_dict + + +def verify_registration_response(response, challenge, *, rp_id, origin): + """ + Validates the challenge and attestation information + sent from the client during device registration. + + Returns a WebAuthnCredential on success. + Raises RegistrationRejectedException on failire. + """ + # NOTE: We re-encode the challenge below, because our + # response's clientData.challenge is encoded twice: + # first for the entire clientData payload, and then again + # for the individual challenge. + response = pywebauthn.WebAuthnRegistrationResponse( + rp_id, origin, response, _webauthn_b64encode(challenge.encode()).decode() + ) + try: + return response.verify() + except _RegistrationRejectedException as e: + raise RegistrationRejectedException(str(e)) + + +def verify_assertion_response(assertion, *, challenge, user, origin, icon_url, rp_id): + """ + Validates the challenge and assertion information + sent from the client during authentication. + + Returns an updated signage count on success. + Raises AuthenticationRejectedException on failure. + """ + webauthn_users = _get_webauthn_users(user, icon_url=icon_url, rp_id=rp_id) + cred_ids = [cred.credential_id for cred in webauthn_users] + + for webauthn_user in webauthn_users: + response = pywebauthn.WebAuthnAssertionResponse( + webauthn_user, + assertion, + _webauthn_b64encode(challenge.encode()).decode(), + origin, + allow_credentials=cred_ids, + ) + try: + return (webauthn_user.credential_id, response.verify()) + except _AuthenticationRejectedException: + pass + + # If we exit the loop, then we've failed to verify the assertion against + # any of the user's WebAuthn credentials. Fail. + raise AuthenticationRejectedException("Invalid WebAuthn credential")