diff --git a/google/oauth2/challenges.py b/google/oauth2/challenges.py index bb523e6ca..4eb665b8a 100644 --- a/google/oauth2/challenges.py +++ b/google/oauth2/challenges.py @@ -124,7 +124,16 @@ def obtain_challenge_input(self, metadata): ) sk = metadata["securityKey"] challenges = sk["challenges"] - app_id = sk["applicationId"] + # Read both 'applicationId' and 'relyingPartyId', if they are the same, use + # applicationId, if they are different, use relyingPartyId first and retry + # with applicationId + application_id = sk["applicationId"] + relying_party_id = sk["relyingPartyId"] + + if application_id != relying_party_id: + application_parameters = [relying_party_id, application_id] + else: + application_parameters = [application_id] challenge_data = [] for c in challenges: @@ -134,24 +143,37 @@ def obtain_challenge_input(self, metadata): challenge = base64.urlsafe_b64decode(challenge) challenge_data.append({"key": key, "challenge": challenge}) - try: - api = pyu2f.convenience.authenticator.CreateCompositeAuthenticator( - REAUTH_ORIGIN - ) - response = api.Authenticate( - app_id, challenge_data, print_callback=sys.stderr.write - ) - return {"securityKey": response} - except pyu2f.errors.U2FError as e: - if e.code == pyu2f.errors.U2FError.DEVICE_INELIGIBLE: - sys.stderr.write("Ineligible security key.\n") - elif e.code == pyu2f.errors.U2FError.TIMEOUT: - sys.stderr.write("Timed out while waiting for security key touch.\n") - else: - raise e - except pyu2f.errors.NoDeviceFoundError: - sys.stderr.write("No security key found.\n") - return None + # Track number of tries to suppress error message until all application_parameters + # are tried. + tries = 0 + for app_id in application_parameters: + try: + tries += 1 + api = pyu2f.convenience.authenticator.CreateCompositeAuthenticator( + REAUTH_ORIGIN + ) + response = api.Authenticate( + app_id, challenge_data, print_callback=sys.stderr.write + ) + return {"securityKey": response} + except pyu2f.errors.U2FError as e: + if e.code == pyu2f.errors.U2FError.DEVICE_INELIGIBLE: + # Only show error if all app_ids have been tried + if tries == len(application_parameters): + sys.stderr.write("Ineligible security key.\n") + return None + continue + if e.code == pyu2f.errors.U2FError.TIMEOUT: + sys.stderr.write( + "Timed out while waiting for security key touch.\n" + ) + else: + raise e + except pyu2f.errors.PluginError: + continue + except pyu2f.errors.NoDeviceFoundError: + sys.stderr.write("No security key found.\n") + return None class SamlChallenge(ReauthChallenge): diff --git a/tests/oauth2/test_challenges.py b/tests/oauth2/test_challenges.py index 9e35d88af..a06f55283 100644 --- a/tests/oauth2/test_challenges.py +++ b/tests/oauth2/test_challenges.py @@ -45,13 +45,15 @@ def test_security_key(): ).decode("ascii"), } ], + "relyingPartyId": "security_key_application_id", }, } mock_key = mock.Mock() challenge = challenges.SecurityKeyChallenge() - # Test the case that security key challenge is passed. + # Test the case that security key challenge is passed with applicationId and + # relyingPartyId the same. with mock.patch("pyu2f.model.RegisteredKey", return_value=mock_key): with mock.patch( "pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate" @@ -68,6 +70,56 @@ def test_security_key(): print_callback=sys.stderr.write, ) + # Test the case that security key challenge is passed with applicationId and + # relyingPartyId different, first call works. + metadata["securityKey"]["relyingPartyId"] = "security_key_relying_party_id" + sys.stderr.write("metadata=" + str(metadata) + "\n") + with mock.patch("pyu2f.model.RegisteredKey", return_value=mock_key): + with mock.patch( + "pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate" + ) as mock_authenticate: + mock_authenticate.return_value = "security key response" + assert challenge.name == "SECURITY_KEY" + assert challenge.is_locally_eligible + assert challenge.obtain_challenge_input(metadata) == { + "securityKey": "security key response" + } + mock_authenticate.assert_called_with( + "security_key_relying_party_id", + [{"key": mock_key, "challenge": b"some_challenge"}], + print_callback=sys.stderr.write, + ) + + # Test the case that security key challenge is passed with applicationId and + # relyingPartyId different, first call fails, requires retry. + metadata["securityKey"]["relyingPartyId"] = "security_key_relying_party_id" + with mock.patch("pyu2f.model.RegisteredKey", return_value=mock_key): + with mock.patch( + "pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate" + ) as mock_authenticate: + assert challenge.name == "SECURITY_KEY" + assert challenge.is_locally_eligible + mock_authenticate.side_effect = [ + pyu2f.errors.U2FError(pyu2f.errors.U2FError.DEVICE_INELIGIBLE), + "security key response", + ] + assert challenge.obtain_challenge_input(metadata) == { + "securityKey": "security key response" + } + calls = [ + mock.call( + "security_key_relying_party_id", + [{"key": mock_key, "challenge": b"some_challenge"}], + print_callback=sys.stderr.write, + ), + mock.call( + "security_key_application_id", + [{"key": mock_key, "challenge": b"some_challenge"}], + print_callback=sys.stderr.write, + ), + ] + mock_authenticate.assert_has_calls(calls) + # Test various types of exceptions. with mock.patch("pyu2f.model.RegisteredKey", return_value=mock_key): with mock.patch( @@ -86,6 +138,12 @@ def test_security_key(): ) assert challenge.obtain_challenge_input(metadata) is None + with mock.patch( + "pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate" + ) as mock_authenticate: + mock_authenticate.side_effect = pyu2f.errors.PluginError() + assert challenge.obtain_challenge_input(metadata) is None + with mock.patch( "pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate" ) as mock_authenticate: