Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Enable webauthn plugin for security keys #1528

Merged
merged 10 commits into from
Jun 6, 2024
64 changes: 64 additions & 0 deletions google/oauth2/challenges.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,19 @@

from google.auth import _helpers
from google.auth import exceptions
from google.oauth2 import webauthn_handler_factory
from google.oauth2.webauthn_types import (
AuthenticationExtensionsClientInputs,
GetRequest,
PublicKeyCredentialDescriptor,
)


REAUTH_ORIGIN = "https://accounts.google.com"
SAML_CHALLENGE_MESSAGE = (
"Please run `gcloud auth login` to complete reauthentication with SAML."
)
WEBAUTHN_TIMEOUT_MS = 120000 # Two minute timeout
cpisunyer marked this conversation as resolved.
Show resolved Hide resolved


def get_user_password(text):
Expand Down Expand Up @@ -110,6 +117,18 @@ def is_locally_eligible(self):

@_helpers.copy_docstring(ReauthChallenge)
def obtain_challenge_input(self, metadata):
# Check if there is an available Webauthn Handler, if not use pyu2f
try:
factory = webauthn_handler_factory.WebauthnHandlerFactory()
webauthn_handler = factory.get_handler()
if webauthn_handler is not None:
sys.stderr.write("Please insert and touch your security key\n")
return self._obtain_challenge_input_webauthn(metadata, webauthn_handler)
except Exception:
cpisunyer marked this conversation as resolved.
Show resolved Hide resolved
# Attempt pyu2f if exception in webauthn flow
# traceback.print_exc()
cpisunyer marked this conversation as resolved.
Show resolved Hide resolved
pass

try:
import pyu2f.convenience.authenticator # type: ignore
import pyu2f.errors # type: ignore
Expand Down Expand Up @@ -173,6 +192,51 @@ def obtain_challenge_input(self, metadata):
sys.stderr.write("No security key found.\n")
return None

def _obtain_challenge_input_webauthn(self, metadata, webauthn_handler):
sk = metadata["securityKey"]
challenges = sk["challenges"]
cpisunyer marked this conversation as resolved.
Show resolved Hide resolved
application_id = sk["applicationId"]
relying_party_id = sk["relyingPartyId"]

allow_credentials = []
for challenge in challenges:
cpisunyer marked this conversation as resolved.
Show resolved Hide resolved
key_handle = self._urlsafe_b64recode(challenge["keyHandle"])
allow_credentials.append(PublicKeyCredentialDescriptor(id=key_handle))

extension = AuthenticationExtensionsClientInputs(appid=application_id)

get_request = GetRequest(
origin=REAUTH_ORIGIN,
rpid=relying_party_id,
challenge=self._urlsafe_b64recode(challenges[0]["challenge"]),
cpisunyer marked this conversation as resolved.
Show resolved Hide resolved
timeout_ms=WEBAUTHN_TIMEOUT_MS,
allow_credentials=allow_credentials,
user_verification="required",
extensions=extension,
)

try:
get_response = webauthn_handler.get(get_request)
except Exception as e:
cpisunyer marked this conversation as resolved.
Show resolved Hide resolved
sys.stderr.write("Webauthn Error: {}.\n".format(e))
raise e

response = {
"clientData": get_response.response.client_data_json,
"authenticatorData": get_response.response.authenticator_data,
"signatureData": get_response.response.signature,
"applicationId": application_id,
"keyHandle": get_response.id,
"securityKeyReplyType": 2,
}
return {"securityKey": response}

def _urlsafe_b64recode(self, s):
cpisunyer marked this conversation as resolved.
Show resolved Hide resolved
"""Converts standard b64 encoded string to url safe b64 encoded string
with no padding."""
b = base64.urlsafe_b64decode(s)
return base64.urlsafe_b64encode(b).decode().rstrip("=")


class SamlChallenge(ReauthChallenge):
"""Challenge that asks the users to browse to their ID Providers.
Expand Down
16 changes: 16 additions & 0 deletions google/oauth2/webauthn_handler_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from typing import List, Optional

from google.oauth2.webauthn_handler import PluginHandler, WebAuthnHandler


class WebauthnHandlerFactory:
handlers: List[WebAuthnHandler]

def __init__(self):
self.handlers = [PluginHandler()]

def get_handler(self) -> Optional[WebAuthnHandler]:
for handler in self.handlers:
if handler.is_available():
return handler
return None
111 changes: 111 additions & 0 deletions tests/oauth2/test_challenges.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"""Tests for the reauth module."""

import base64
import os
import sys

import mock
Expand All @@ -23,6 +24,13 @@

from google.auth import exceptions
from google.oauth2 import challenges
from google.oauth2.webauthn_types import (
AuthenticationExtensionsClientInputs,
AuthenticatorAssertionResponse,
GetRequest,
GetResponse,
PublicKeyCredentialDescriptor,
)


def test_get_user_password():
Expand Down Expand Up @@ -54,6 +62,8 @@ def test_security_key():

# Test the case that security key challenge is passed with applicationId and
# relyingPartyId the same.
os.environ.pop('"GOOGLE_AUTH_WEBAUTHN_PLUGIN"', None)

with mock.patch("pyu2f.model.RegisteredKey", return_value=mock_key):
with mock.patch(
"pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate"
Expand All @@ -70,6 +80,19 @@ def test_security_key():
print_callback=sys.stderr.write,
)

# Test the case that webauthn plugin is available
os.environ["GOOGLE_AUTH_WEBAUTHN_PLUGIN"] = "plugin"

with mock.patch(
"google.oauth2.challenges.SecurityKeyChallenge._obtain_challenge_input_webauthn",
return_value={"securityKey": "security key response"},
):

assert challenge.obtain_challenge_input(metadata) == {
"securityKey": "security key response"
}
os.environ.pop('"GOOGLE_AUTH_WEBAUTHN_PLUGIN"', None)

# Test the case that security key challenge is passed with applicationId and
# relyingPartyId different, first call works.
metadata["securityKey"]["relyingPartyId"] = "security_key_relying_party_id"
Expand Down Expand Up @@ -173,6 +196,94 @@ def test_security_key():
assert excinfo.match(r"pyu2f dependency is required")


def test_security_key_webauthn():
metadata = {
"status": "READY",
"challengeId": 2,
"challengeType": "SECURITY_KEY",
"securityKey": {
"applicationId": "security_key_application_id",
"challenges": [
{
"keyHandle": "some_key",
"challenge": base64.urlsafe_b64encode(
"some_challenge".encode("ascii")
).decode("ascii"),
}
],
"relyingPartyId": "security_key_application_id",
},
}

challenge = challenges.SecurityKeyChallenge()

sk = metadata["securityKey"]
sk_challenges = sk["challenges"]

application_id = sk["applicationId"]

allow_credentials = []
for sk_challenge in sk_challenges:
allow_credentials.append(
PublicKeyCredentialDescriptor(id=sk_challenge["keyHandle"])
)

extension = AuthenticationExtensionsClientInputs(appid=application_id)

get_request = GetRequest(
origin=challenges.REAUTH_ORIGIN,
rpid=application_id,
challenge=challenge._urlsafe_b64recode(sk_challenge["challenge"]),
timeout_ms=challenges.WEBAUTHN_TIMEOUT_MS,
allow_credentials=allow_credentials,
user_verification="required",
extensions=extension,
)

assertion_resp = AuthenticatorAssertionResponse(
client_data_json="clientDataJSON",
authenticator_data="authenticatorData",
signature="signature",
user_handle="userHandle",
)
get_response = GetResponse(
id="id",
response=assertion_resp,
authenticator_attachment="authenticatorAttachment",
client_extension_results="clientExtensionResults",
)
response = {
"clientData": get_response.response.client_data_json,
"authenticatorData": get_response.response.authenticator_data,
"signatureData": get_response.response.signature,
"applicationId": "security_key_application_id",
"keyHandle": get_response.id,
"securityKeyReplyType": 2,
}

mock_handler = mock.Mock()
mock_handler.get.return_value = get_response

# Test success case
assert challenge._obtain_challenge_input_webauthn(metadata, mock_handler) == {
"securityKey": response
}
mock_handler.get.assert_called_with(get_request)

# Test exceptions
mock_handler.get.side_effect = exceptions.MalformedError
with pytest.raises(exceptions.MalformedError):
challenge._obtain_challenge_input_webauthn(metadata, mock_handler)

mock_handler.get.side_effect = exceptions.InvalidResource
with pytest.raises(exceptions.InvalidResource):
challenge._obtain_challenge_input_webauthn(metadata, mock_handler)

mock_handler.get.side_effect = exceptions.ReauthFailError
with pytest.raises(exceptions.ReauthFailError):
challenge._obtain_challenge_input_webauthn(metadata, mock_handler)


@mock.patch("getpass.getpass", return_value="foo")
def test_password_challenge(getpass_mock):
challenge = challenges.PasswordChallenge()
Expand Down
29 changes: 29 additions & 0 deletions tests/oauth2/test_webauthn_handler_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import mock
import pytest # type: ignore

from google.oauth2 import webauthn_handler
from google.oauth2 import webauthn_handler_factory


@pytest.fixture
def os_get_stub():
with mock.patch.object(
webauthn_handler.os.environ,
"get",
return_value="gcloud_webauthn_plugin",
name="fake os.environ.get",
) as mock_os_environ_get:
yield mock_os_environ_get


# Check that get_handler returns a value when env is set,
# that type is PluginHandler, and that no value is returned
# if env not set.
def test_WebauthHandlerFactory_get(os_get_stub):
factory = webauthn_handler_factory.WebauthnHandlerFactory()
assert factory.get_handler() is not None

assert isinstance(factory.get_handler(), webauthn_handler.PluginHandler)

os_get_stub.return_value = None
assert factory.get_handler() is None