diff --git a/CHANGELOG.md b/CHANGELOG.md index b3c97c41b..c2fc49f5f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Add command line tool `openeo-auth token-clear` to remove OIDC refresh token cache +- Add support for OIDC device authorization grant without PKCE nor client secret, + ([#225](https://github.com/Open-EO/openeo-python-client/issues/225), [openeo-api#410](https://github.com/Open-EO/openeo-api/issues/410)) ### Changed diff --git a/openeo/rest/auth/oidc.py b/openeo/rest/auth/oidc.py index a27b6c18c..f11f76146 100644 --- a/openeo/rest/auth/oidc.py +++ b/openeo/rest/auth/oidc.py @@ -222,11 +222,18 @@ class DefaultOidcClientGrant(enum.Enum): Enum with possible values for "grant_types" field of default OIDC clients provided by backend. """ IMPLICIT = "implicit" + AUTH_CODE = "authorization_code" AUTH_CODE_PKCE = "authorization_code+pkce" + DEVICE_CODE = "urn:ietf:params:oauth:grant-type:device_code" DEVICE_CODE_PKCE = "urn:ietf:params:oauth:grant-type:device_code+pkce" REFRESH_TOKEN = "refresh_token" +# Type hint for function that checks if given list of OIDC grant types (DefaultOidcClientGrant enum values) +# fulfills a criterion. +GrantsChecker = Union[List[DefaultOidcClientGrant], Callable[[List[DefaultOidcClientGrant]], bool]] + + class OidcProviderInfo: """OpenID Connect Provider information, as provided by an openEO back-end (endpoint `/credentials/oidc`)""" @@ -274,12 +281,28 @@ def get_scopes_string(self, request_refresh_token: bool = False): log.debug("Using scopes: {s}".format(s=scopes)) return " ".join(sorted(scopes)) - def get_default_client_id(self, grant_types: List[DefaultOidcClientGrant]) -> Union[str, None]: - """Get first default client supporting the given grant types""" + def get_default_client_id(self, grant_check: GrantsChecker) -> Union[str, None]: + """ + Get first default client that supports (as stated by provider's `grant_types`) + the desired grant types (as implemented by `grant_check`) + """ + if isinstance(grant_check, list): + # Simple `grant_check` mode: just provide list of grants that all must be supported. + desired_grants = grant_check + grant_check = lambda grants: all(g in grants for g in desired_grants) + + def normalize_grants(grants: List[str]): + for grant in grants: + try: + yield DefaultOidcClientGrant(grant) + except ValueError: + log.warning(f"Invalid OIDC grant type {grant!r}.") + for client in self.default_clients or []: client_id = client.get("id") supported_grants = client.get("grant_types") - if client_id and supported_grants and all(g.value in supported_grants for g in grant_types): + supported_grants = list(normalize_grants(supported_grants)) + if client_id and supported_grants and grant_check(supported_grants): return client_id @@ -296,6 +319,13 @@ def __init__(self, client_id: str, provider: OidcProviderInfo, client_secret: st # TODO: load from config file + def guess_device_flow_pkce_support(self): + """Best effort guess if PKCE should be used for device auth grant""" + # Check if this client is also defined as default client with device_code+pkce + default_clients = [c for c in self.provider.default_clients or [] if c["id"] == self.client_id] + grant_types = set(g for c in default_clients for g in c.get("grant_types", [])) + return any("device_code+pkce" in g for g in grant_types) + class OidcAuthenticator: """ @@ -614,13 +644,10 @@ def __init__( # Allow to specify/override device code URL for cases when it is not available in OIDC discovery doc. self._device_code_url = device_code_url or self._provider_config.get("device_authorization_endpoint") if not self._device_code_url: - raise OidcException("No support for device code flow") + raise OidcException("No support for device authorization grant") self._max_poll_time = max_poll_time if use_pkce is None: - # TODO: better auto-detection if PKCE should/can be used, e.g.: - # does OIDC provider supports device flow + PKCE? Get this from `OidcProviderInfo`? - # (also see https://github.com/Open-EO/openeo-api/pull/366) - use_pkce = client_info.client_secret is None + use_pkce = client_info.client_secret is None and client_info.guess_device_flow_pkce_support() self._pkce = PkceCode() if use_pkce else None def _get_verification_info(self, request_refresh_token: bool = False) -> VerificationInfo: diff --git a/openeo/rest/connection.py b/openeo/rest/connection.py index 4f15d9fbf..2cc39ed43 100644 --- a/openeo/rest/connection.py +++ b/openeo/rest/connection.py @@ -26,7 +26,7 @@ from openeo.rest.auth.config import RefreshTokenStore, AuthConfig from openeo.rest.auth.oidc import OidcClientCredentialsAuthenticator, OidcAuthCodePkceAuthenticator, \ OidcClientInfo, OidcAuthenticator, OidcRefreshTokenAuthenticator, OidcResourceOwnerPasswordAuthenticator, \ - OidcDeviceAuthenticator, OidcProviderInfo, OidcException, DefaultOidcClientGrant + OidcDeviceAuthenticator, OidcProviderInfo, OidcException, DefaultOidcClientGrant, GrantsChecker from openeo.rest.datacube import DataCube from openeo.rest.imagecollectionclient import ImageCollectionClient from openeo.rest.job import RESTJob @@ -338,7 +338,7 @@ def _get_oidc_provider(self, provider_id: Union[str, None] = None) -> Tuple[str, def _get_oidc_provider_and_client_info( self, provider_id: str, client_id: Union[str, None], client_secret: Union[str, None], - default_client_grant_types: Union[None, List[DefaultOidcClientGrant]] = None + default_client_grant_check: Union[None, GrantsChecker] = None ) -> Tuple[str, OidcClientInfo]: """ Resolve provider_id and client info (as given or from config) @@ -357,10 +357,10 @@ def _get_oidc_provider_and_client_info( ) if client_id: _log.info("Using client_id {c!r} from config (provider {p!r})".format(c=client_id, p=provider_id)) - if client_id is None and default_client_grant_types: + if client_id is None and default_client_grant_check: # Try "default_client" from backend's provider info. _log.debug("No client_id given: checking default client in backend's provider info") - client_id = provider.get_default_client_id(grant_types=default_client_grant_types) + client_id = provider.get_default_client_id(grant_check=default_client_grant_check) if client_id: _log.info("Using default client_id {c!r} from OIDC provider {p!r} info.".format( c=client_id, p=provider_id @@ -414,7 +414,7 @@ def authenticate_oidc_authorization_code( """ provider_id, client_info = self._get_oidc_provider_and_client_info( provider_id=provider_id, client_id=client_id, client_secret=client_secret, - default_client_grant_types=[DefaultOidcClientGrant.AUTH_CODE_PKCE], + default_client_grant_check=[DefaultOidcClientGrant.AUTH_CODE_PKCE], ) authenticator = OidcAuthCodePkceAuthenticator( client_info=client_info, @@ -466,7 +466,7 @@ def authenticate_oidc_refresh_token( """ provider_id, client_info = self._get_oidc_provider_and_client_info( provider_id=provider_id, client_id=client_id, client_secret=client_secret, - default_client_grant_types=[DefaultOidcClientGrant.REFRESH_TOKEN], + default_client_grant_check=[DefaultOidcClientGrant.REFRESH_TOKEN], ) if refresh_token is None: @@ -495,9 +495,10 @@ def authenticate_oidc_device( .. versionchanged:: 0.5.1 Add :py:obj:`use_pkce` argument """ + _g = DefaultOidcClientGrant # alias for compactness provider_id, client_info = self._get_oidc_provider_and_client_info( provider_id=provider_id, client_id=client_id, client_secret=client_secret, - default_client_grant_types=[DefaultOidcClientGrant.DEVICE_CODE_PKCE], + default_client_grant_check=(lambda grants: _g.DEVICE_CODE in grants or _g.DEVICE_CODE_PKCE in grants), ) authenticator = OidcDeviceAuthenticator(client_info=client_info, use_pkce=use_pkce, **kwargs) return self._authenticate_oidc(authenticator, provider_id=provider_id, store_refresh_token=store_refresh_token) @@ -506,16 +507,20 @@ def authenticate_oidc( self, provider_id: str = None, client_id: Union[str, None] = None, client_secret: Union[str, None] = None, - store_refresh_token: bool = True + store_refresh_token: bool = True, + use_pkce: Union[bool, None] = None, ): """ Do OpenID Connect authentication, first trying refresh tokens and falling back on device code flow. .. versionadded:: 0.6.0 """ + _g = DefaultOidcClientGrant # alias for compactness provider_id, client_info = self._get_oidc_provider_and_client_info( provider_id=provider_id, client_id=client_id, client_secret=client_secret, - default_client_grant_types=[DefaultOidcClientGrant.DEVICE_CODE_PKCE, DefaultOidcClientGrant.REFRESH_TOKEN] + default_client_grant_check=lambda grants: ( + _g.REFRESH_TOKEN in grants and (_g.DEVICE_CODE in grants or _g.DEVICE_CODE_PKCE in grants) + ) ) # Try refresh token first. @@ -539,7 +544,7 @@ def authenticate_oidc( # Fall back on device code flow # TODO: make it possible to do other fallback flows too? _log.info("Trying device code flow.") - authenticator = OidcDeviceAuthenticator(client_info=client_info) + authenticator = OidcDeviceAuthenticator(client_info=client_info, use_pkce=use_pkce) con = self._authenticate_oidc(authenticator, provider_id=provider_id, store_refresh_token=store_refresh_token) print("Authenticated using device code flow.") return con diff --git a/tests/rest/auth/test_oidc.py b/tests/rest/auth/test_oidc.py index fa6dbcbda..e7ab1d725 100644 --- a/tests/rest/auth/test_oidc.py +++ b/tests/rest/auth/test_oidc.py @@ -23,6 +23,9 @@ DEVICE_CODE_POLL_INTERVAL = 2 +# Sentinel object to indicate that a field should be absent. +ABSENT = object() + def handle_request(handler_class, path: str): """ @@ -128,11 +131,31 @@ def test_provider_info_scopes(requests_mock): def test_provider_info_default_client_none(requests_mock): requests_mock.get("https://authit.test/.well-known/openid-configuration", json={}) info = OidcProviderInfo(issuer="https://authit.test") - assert info.get_default_client_id(grant_types=[]) is None - assert info.get_default_client_id(grant_types=[DefaultOidcClientGrant.DEVICE_CODE_PKCE]) is None + assert info.get_default_client_id(grant_check=[]) is None + assert info.get_default_client_id(grant_check=lambda grants: True) is None + + +def test_provider_info_default_client_available_list(requests_mock): + requests_mock.get("https://authit.test/.well-known/openid-configuration", json={}) + default_client = { + "id": "jak4l0v3-45lsdfe3d", + "grant_types": ["urn:ietf:params:oauth:grant-type:device_code+pkce", "refresh_token"] + } + info = OidcProviderInfo(issuer="https://authit.test", default_clients=[default_client]) + # Alias for compactness + g = DefaultOidcClientGrant -def test_provider_info_default_client_available(requests_mock): + assert info.get_default_client_id(grant_check=[]) == "jak4l0v3-45lsdfe3d" + assert info.get_default_client_id(grant_check=[g.DEVICE_CODE_PKCE]) == "jak4l0v3-45lsdfe3d" + assert info.get_default_client_id(grant_check=[g.REFRESH_TOKEN]) == "jak4l0v3-45lsdfe3d" + assert info.get_default_client_id(grant_check=[g.DEVICE_CODE_PKCE, g.REFRESH_TOKEN]) == "jak4l0v3-45lsdfe3d" + + assert info.get_default_client_id(grant_check=[g.IMPLICIT]) is None + assert info.get_default_client_id(grant_check=[g.IMPLICIT, g.REFRESH_TOKEN]) is None + + +def test_provider_info_default_client_available_lambda(requests_mock): requests_mock.get("https://authit.test/.well-known/openid-configuration", json={}) default_client = { "id": "jak4l0v3-45lsdfe3d", @@ -140,16 +163,41 @@ def test_provider_info_default_client_available(requests_mock): } info = OidcProviderInfo(issuer="https://authit.test", default_clients=[default_client]) - assert info.get_default_client_id(grant_types=[]) == "jak4l0v3-45lsdfe3d" - assert info.get_default_client_id(grant_types=[DefaultOidcClientGrant.DEVICE_CODE_PKCE]) == "jak4l0v3-45lsdfe3d" - assert info.get_default_client_id(grant_types=[DefaultOidcClientGrant.REFRESH_TOKEN]) == "jak4l0v3-45lsdfe3d" - assert info.get_default_client_id(grant_types=[ - DefaultOidcClientGrant.DEVICE_CODE_PKCE, DefaultOidcClientGrant.REFRESH_TOKEN - ]) == "jak4l0v3-45lsdfe3d" - assert info.get_default_client_id(grant_types=[DefaultOidcClientGrant.IMPLICIT]) is None - assert info.get_default_client_id(grant_types=[ - DefaultOidcClientGrant.IMPLICIT, DefaultOidcClientGrant.REFRESH_TOKEN - ]) is None + # Alias for compactness + g = DefaultOidcClientGrant + + assert info.get_default_client_id(grant_check=lambda grants: True) == "jak4l0v3-45lsdfe3d" + assert info.get_default_client_id(grant_check=lambda grants: g.REFRESH_TOKEN in grants) == "jak4l0v3-45lsdfe3d" + assert info.get_default_client_id(grant_check=lambda grants: g.DEVICE_CODE_PKCE in grants) == "jak4l0v3-45lsdfe3d" + assert info.get_default_client_id( + grant_check=lambda grants: g.DEVICE_CODE_PKCE in grants and g.REFRESH_TOKEN in grants + ) == "jak4l0v3-45lsdfe3d" + + assert info.get_default_client_id(grant_check=lambda grants: False) is None + assert info.get_default_client_id(grant_check=lambda grants: g.IMPLICIT in grants) is None + assert info.get_default_client_id( + grant_check=lambda grants: g.IMPLICIT in grants and g.REFRESH_TOKEN in grants + ) is None + + assert info.get_default_client_id( + grant_check=lambda grants: g.IMPLICIT in grants or g.REFRESH_TOKEN in grants + ) == "jak4l0v3-45lsdfe3d" + + +def test_provider_info_default_client_invalid_grants(requests_mock, caplog): + requests_mock.get("https://authit.test/.well-known/openid-configuration", json={}) + default_client = { + "id": "jak4l0v3-45lsdfe3d", + "grant_types": ["refresh_token", "nope dis invalid"] + } + info = OidcProviderInfo(issuer="https://authit.test", default_clients=[default_client]) + + # Alias for compactness + g = DefaultOidcClientGrant + + with caplog.at_level(logging.WARNING): + assert info.get_default_client_id(grant_check=[g.REFRESH_TOKEN]) == "jak4l0v3-45lsdfe3d" + assert "Invalid OIDC grant type 'nope dis" in caplog.text @pytest.mark.parametrize( @@ -168,6 +216,31 @@ def test_provider_info_get_scopes_string_refresh_token_offline_access(requests_m assert p.get_scopes_string() == "openid" +def test_oidc_client_info_uess_device_flow_pkce_support(requests_mock): + oidc_discovery_url = "http://oidc.test/.well-known/openid-configuration" + oidc_mock = OidcMock( + requests_mock=requests_mock, + oidc_discovery_url=oidc_discovery_url, + expected_grant_type=None, + ) + provider = OidcProviderInfo(discovery_url=oidc_discovery_url, default_clients=[ + {"id": "c1", "grant_types": ["authorization_code+pkce"]}, + {"id": "c2", "grant_types": ["urn:ietf:params:oauth:grant-type:device_code"]}, + {"id": "c3", "grant_types": ["urn:ietf:params:oauth:grant-type:device_code+pkce"]}, + {"id": "c4", "grant_types": ["refresh_token", "urn:ietf:params:oauth:grant-type:device_code+pkce"]}, + ]) + + for client_id, expected in [ + ("c1", False), + ("c2", False), + ("c3", True), + ("c4", True), + ("foo", False) + ]: + client_info = OidcClientInfo(client_id=client_id, provider=provider) + assert client_info.guess_device_flow_pkce_support() is expected + + class OidcMock: """ Mock object to test OIDC flows @@ -279,8 +352,14 @@ def device_code_callback(self, request: requests_mock.request._RequestObjectProx self.state["user_code"] = random_string(length=6).upper() self.state["scope"] = params["scope"] if "code_challenge" in self.expected_fields: - assert "code_challenge" in params - self.state["code_challenge"] = params["code_challenge"] + expect_code_challenge = self.expected_fields.get("code_challenge") + if expect_code_challenge in [True]: + assert "code_challenge" in params + self.state["code_challenge"] = params["code_challenge"] + elif expect_code_challenge in [False, ABSENT]: + assert "code_challenge" not in params + else: + raise ValueError(expect_code_challenge) return json.dumps({ # TODO: also verification_url (google tweak) "verification_uri": self.provider_root_url + "/dc", @@ -294,12 +373,17 @@ def token_callback_device_code(self, params: dict, context): expected_client_secret = self.expected_fields.get("client_secret") if expected_client_secret: assert params["client_secret"] == expected_client_secret - expect_code_verifier = bool(self.expected_fields.get("code_verifier")) - if expect_code_verifier: + else: + assert "client_secret" not in params + expect_code_verifier = self.expected_fields.get("code_verifier") + if expect_code_verifier in [True]: assert PkceCode.sha256_hash(params["code_verifier"]) == self.state["code_challenge"] self.state["code_verifier"] = params["code_verifier"] - if bool(expected_client_secret) == expect_code_verifier: - pytest.fail("Token callback should either have client secret or PKCE code verifier") + elif expect_code_verifier in [False, None, ABSENT]: + assert "code_verifier" not in params + assert "code_challenge" not in self.state + else: + raise ValueError(expect_code_verifier) assert params["device_code"] == self.state["device_code"] assert params["grant_type"] == "urn:ietf:params:oauth:grant-type:device_code" # Fail with pending/too fast? @@ -368,6 +452,7 @@ def _build_token_response(self, sub="123", name="john", include_id_token=True) - @contextlib.contextmanager def assert_device_code_poll_sleep(): + """Fake sleeping, but check it was called with poll interval.""" with mock.patch("time.sleep") as sleep: yield sleep.assert_called_with(DEVICE_CODE_POLL_INTERVAL) @@ -514,13 +599,7 @@ def test_oidc_device_flow_with_pkce(requests_mock, caplog): ) -@pytest.mark.parametrize(["mode", "use_pkce", "client_secret", "expected_fields"], [ - ("client_secret explicit", False, "$3cr3t", {"scope": "df openid", "client_secret": "$3cr3t"}), - ("PKCE explicit", True, None, {"scope": "df openid", "code_challenge": True, "code_verifier": True}), - ("client_secret autodetect", None, "$3cr3t", {"scope": "df openid", "client_secret": "$3cr3t"}), - ("PKCE autodetect", None, None, {"scope": "df openid", "code_challenge": True, "code_verifier": True}), -]) -def test_oidc_device_flow_auto_detect(requests_mock, caplog, mode, use_pkce, client_secret, expected_fields): +def test_oidc_device_flow_without_pkce_nor_secret(requests_mock, caplog): client_id = "myclient" oidc_discovery_url = "http://oidc.test/.well-known/openid-configuration" oidc_mock = OidcMock( @@ -528,12 +607,89 @@ def test_oidc_device_flow_auto_detect(requests_mock, caplog, mode, use_pkce, cli expected_grant_type="urn:ietf:params:oauth:grant-type:device_code", expected_client_id=client_id, oidc_discovery_url=oidc_discovery_url, - expected_fields=expected_fields, + expected_fields={"scope": "df openid", "code_challenge": ABSENT, "code_verifier": ABSENT}, state={"device_code_callback_timeline": ["authorization_pending", "slow_down", "great success"]}, scopes_supported=["openid", "df"] ) provider = OidcProviderInfo(discovery_url=oidc_discovery_url, scopes=["df"]) display = [] + authenticator = OidcDeviceAuthenticator( + client_info=OidcClientInfo(client_id=client_id, provider=provider), + display=display.append, + ) + with mock.patch.object(openeo.rest.auth.oidc.time, "sleep") as sleep: + with caplog.at_level(logging.INFO): + tokens = authenticator.get_tokens() + assert oidc_mock.state["access_token"] == tokens.access_token + assert re.search( + r"visit https://auth\.test/dc and enter the user code {c!r}".format(c=oidc_mock.state['user_code']), + display[0] + ) + assert display[1] == "Authorized successfully." + assert sleep.mock_calls == [mock.call(2), mock.call(2), mock.call(7)] + assert re.search( + r"Authorization pending\..*Polling too fast, will slow down\..*Authorized successfully\.", + caplog.text, + flags=re.DOTALL + ) + + +@pytest.mark.parametrize(["mode", "client_id", "use_pkce", "client_secret", "expected_fields"], [ + ( + "client_secret, no PKCE", + "myclient", False, "$3cr3t", + {"scope": "df openid", "client_secret": "$3cr3t", "code_challenge": ABSENT, "code_verifier": ABSENT} + ), + ( + "client_secret, auto PKCE", + "myclient", None, "$3cr3t", + {"scope": "df openid", "client_secret": "$3cr3t", "code_challenge": ABSENT, "code_verifier": ABSENT} + ), + ( + "use PKCE", + "myclient", True, None, + {"scope": "df openid", "code_challenge": True, "code_verifier": True} + ), + ( + "auto PKCE", + "myclient", None, None, + {"scope": "df openid", "code_challenge": ABSENT, "code_verifier": ABSENT} + ), + ( + "auto PKCE, default client with PKCE", + "default-with-pkce", None, None, + {"scope": "df openid", "code_challenge": True, "code_verifier": True} + ), + ( + "auto PKCE, default client no PKCE", + "default-no-pkce", None, None, + {"scope": "df openid", "code_challenge": ABSENT, "code_verifier": ABSENT} + ), + ( + "auto PKCE, default client with PKCE, and secret", + "default-with-pkce", None, "$3cr3t", + {"scope": "df openid", "client_secret": "$3cr3t", "code_challenge": ABSENT, "code_verifier": ABSENT} + ), +]) +def test_oidc_device_flow_auto_detect( + requests_mock, caplog, mode, client_id, use_pkce, client_secret, expected_fields +): + """Autodetection of device auth grant mode: with secret, PKCE or neither.""" + oidc_discovery_url = "http://oidc.test/.well-known/openid-configuration" + oidc_mock = OidcMock( + requests_mock=requests_mock, + expected_grant_type="urn:ietf:params:oauth:grant-type:device_code", + expected_client_id=client_id, + oidc_discovery_url=oidc_discovery_url, + expected_fields=expected_fields, + state={"device_code_callback_timeline": ["authorization_pending", "slow_down", "great success"]}, + scopes_supported=["openid", "df"] + ) + provider = OidcProviderInfo(discovery_url=oidc_discovery_url, scopes=["df"], default_clients=[ + {"id": "default-with-pkce", "grant_types": ["urn:ietf:params:oauth:grant-type:device_code+pkce"]}, + {"id": "default-no-pkce", "grant_types": ["urn:ietf:params:oauth:grant-type:device_code"]}, + ]) + display = [] authenticator = OidcDeviceAuthenticator( client_info=OidcClientInfo(client_id=client_id, provider=provider, client_secret=client_secret), display=display.append, diff --git a/tests/rest/test_connection.py b/tests/rest/test_connection.py index 629f71ace..6b6585739 100644 --- a/tests/rest/test_connection.py +++ b/tests/rest/test_connection.py @@ -15,7 +15,7 @@ from openeo.rest.auth.oidc import OidcException from openeo.rest.connection import Connection, RestApiConnection, connect, paginate from .auth.test_cli import auth_config, refresh_token_store -from .auth.test_oidc import OidcMock, assert_device_code_poll_sleep +from .auth.test_oidc import OidcMock, assert_device_code_poll_sleep, ABSENT from .. import load_json_resource API_URL = "https://oeo.test/" @@ -818,7 +818,9 @@ def test_authenticate_oidc_resource_owner_password_credentials_client_from_confi (True, ["openid", "email", "offline_access"], "offline_access openid"), ] ) -def test_authenticate_oidc_device_flow(requests_mock, store_refresh_token, scopes_supported, expected_scopes): +def test_authenticate_oidc_device_flow_with_secret( + requests_mock, store_refresh_token, scopes_supported, expected_scopes +): requests_mock.get(API_URL, json={"api_version": "1.0.0"}) client_id = "myclient" client_secret = "$3cr3t" @@ -859,7 +861,7 @@ def test_authenticate_oidc_device_flow(requests_mock, store_refresh_token, scope assert refresh_token_store.mock_calls == [] -def test_authenticate_oidc_device_flow_client_from_config(requests_mock, auth_config, caplog): +def test_authenticate_oidc_device_flow_client_with_secret_from_config(requests_mock, auth_config, caplog): requests_mock.get(API_URL, json={"api_version": "1.0.0"}) client_id = "myclient" client_secret = "$3cr3t" @@ -924,11 +926,18 @@ def test_authenticate_oidc_device_flow_no_support(requests_mock, auth_config): refresh_token_store = mock.Mock() conn = Connection(API_URL, refresh_token_store=refresh_token_store) assert isinstance(conn.auth, NullAuth) - with pytest.raises(OidcException, match="No support for device code flow"): + with pytest.raises(OidcException, match="No support for device authorization grant"): conn.authenticate_oidc_device() -def test_authenticate_oidc_device_flow_multiple_providers_no_given(requests_mock, auth_config, caplog): +@pytest.mark.parametrize(["use_pkce", "expect_pkce"], [ + (None, False), + (True, True), + (False, False), +]) +def test_authenticate_oidc_device_flow_multiple_providers_no_given( + requests_mock, auth_config, caplog, use_pkce, expect_pkce +): """OIDC device flow + PKCE with multiple OIDC providers and none specified to use.""" requests_mock.get(API_URL, json={"api_version": "1.0.0"}) client_id = "myclient" @@ -943,7 +952,9 @@ def test_authenticate_oidc_device_flow_multiple_providers_no_given(requests_mock expected_grant_type="urn:ietf:params:oauth:grant-type:device_code", expected_client_id=client_id, expected_fields={ - "scope": "openid w", "code_verifier": True, "code_challenge": True + "scope": "openid w", + "code_verifier": True if expect_pkce else ABSENT, + "code_challenge": True if expect_pkce else ABSENT, }, scopes_supported=["openid", "w"], oidc_discovery_url="https://fauth.test/.well-known/openid-configuration", @@ -957,14 +968,21 @@ def test_authenticate_oidc_device_flow_multiple_providers_no_given(requests_mock assert isinstance(conn.auth, NullAuth) oidc_mock.state["device_code_callback_timeline"] = ["great success"] with assert_device_code_poll_sleep(): - conn.authenticate_oidc_device(client_id=client_id) + conn.authenticate_oidc_device(client_id=client_id, use_pkce=use_pkce) assert isinstance(conn.auth, BearerAuth) assert conn.auth.bearer == 'oidc/fauth/' + oidc_mock.state["access_token"] assert refresh_token_store.mock_calls == [] assert "No OIDC provider given. Using first provider 'fauth' as advertised by backend." in caplog.text -def test_authenticate_oidc_device_flow_multiple_provider_one_config_no_given(requests_mock, auth_config, caplog): +@pytest.mark.parametrize(["use_pkce", "expect_pkce"], [ + (None, False), + (True, True), + (False, False), +]) +def test_authenticate_oidc_device_flow_multiple_provider_one_config_no_given( + requests_mock, auth_config, caplog, use_pkce, expect_pkce +): """OIDC device flow + PKCE with multiple OIDC providers, one in config and none specified to use.""" requests_mock.get(API_URL, json={"api_version": "1.0.0"}) client_id = "myclient" @@ -979,7 +997,9 @@ def test_authenticate_oidc_device_flow_multiple_provider_one_config_no_given(req expected_grant_type="urn:ietf:params:oauth:grant-type:device_code", expected_client_id=client_id, expected_fields={ - "scope": "openid", "code_verifier": True, "code_challenge": True + "scope": "openid", + "code_verifier": True if expect_pkce else ABSENT, + "code_challenge": True if expect_pkce else ABSENT, }, scopes_supported=["openid"], oidc_discovery_url="https://fauth.test/.well-known/openid-configuration", @@ -994,7 +1014,7 @@ def test_authenticate_oidc_device_flow_multiple_provider_one_config_no_given(req assert isinstance(conn.auth, NullAuth) oidc_mock.state["device_code_callback_timeline"] = ["great success"] with assert_device_code_poll_sleep(): - conn.authenticate_oidc_device() + conn.authenticate_oidc_device(use_pkce=use_pkce) assert isinstance(conn.auth, BearerAuth) assert conn.auth.bearer == 'oidc/fauth/' + oidc_mock.state["access_token"] assert refresh_token_store.mock_calls == [] @@ -1045,6 +1065,55 @@ def test_authenticate_oidc_device_flow_multiple_provider_one_config_no_given_def assert refresh_token_store.mock_calls == [] +@pytest.mark.parametrize(["grant_types", "use_pkce", "expect_pkce"], [ + (["urn:ietf:params:oauth:grant-type:device_code+pkce"], True, True), + (["urn:ietf:params:oauth:grant-type:device_code+pkce"], None, True), + (["urn:ietf:params:oauth:grant-type:device_code+pkce", "refresh_token"], None, True), + (["urn:ietf:params:oauth:grant-type:device_code"], None, False), + (["urn:ietf:params:oauth:grant-type:device_code"], False, False), +]) +def test_authenticate_oidc_device_flow_default_client_handling(requests_mock, grant_types, use_pkce, expect_pkce): + """ + OIDC device authn grant + secret/PKCE/neither: default client grant_types handling + """ + requests_mock.get(API_URL, json={"api_version": "1.0.0"}) + default_client_id = "dadefaultklient" + requests_mock.get(API_URL + 'credentials/oidc', json={ + "providers": [ + { + "id": "auth", "issuer": "https://auth.test", "title": "Auth", "scopes": ["openid"], + "default_clients": [{"id": default_client_id, "grant_types": grant_types}] + }, + ] + }) + + expected_fields = { + "scope": "openid", + } + if expect_pkce: + expected_fields["code_verifier"] = True + expected_fields["code_challenge"] = True + oidc_mock = OidcMock( + requests_mock=requests_mock, + expected_grant_type="urn:ietf:params:oauth:grant-type:device_code", + expected_client_id=default_client_id, + expected_fields=expected_fields, + scopes_supported=["openid"], + oidc_discovery_url="https://auth.test/.well-known/openid-configuration", + ) + + # With all this set up, kick off the openid connect flow + refresh_token_store = mock.Mock() + conn = Connection(API_URL, refresh_token_store=refresh_token_store) + assert isinstance(conn.auth, NullAuth) + oidc_mock.state["device_code_callback_timeline"] = ["great success"] + with assert_device_code_poll_sleep(): + conn.authenticate_oidc_device(use_pkce=use_pkce) + assert isinstance(conn.auth, BearerAuth) + assert conn.auth.bearer == 'oidc/auth/' + oidc_mock.state["access_token"] + assert refresh_token_store.mock_calls == [] + + def test_authenticate_oidc_refresh_token(requests_mock): requests_mock.get(API_URL, json={"api_version": "1.0.0"}) client_id = "myclient" @@ -1131,7 +1200,12 @@ def test_authenticate_oidc_auto_with_existing_refresh_token(requests_mock, refre assert [r["grant_type"] for r in oidc_mock.grant_request_history] == ["refresh_token"] -def test_authenticate_oidc_auto_no_existing_refresh_token(requests_mock, refresh_token_store): +@pytest.mark.parametrize(["use_pkce", "expect_pkce"], [ + (None, False), + (True, True), + (False, False), +]) +def test_authenticate_oidc_auto_no_existing_refresh_token(requests_mock, refresh_token_store, use_pkce, expect_pkce): requests_mock.get(API_URL, json={"api_version": "1.0.0"}) client_id = "myclient" issuer = "https://oidc.test" @@ -1148,7 +1222,8 @@ def test_authenticate_oidc_auto_no_existing_refresh_token(requests_mock, refresh expected_fields={ "refresh_token": "unkn0wn", "scope": "openid", - "code_verifier": True, "code_challenge": True + "code_verifier": True if expect_pkce else ABSENT, + "code_challenge": True if expect_pkce else ABSENT, } ) @@ -1157,7 +1232,7 @@ def test_authenticate_oidc_auto_no_existing_refresh_token(requests_mock, refresh assert isinstance(conn.auth, NullAuth) oidc_mock.state["device_code_callback_timeline"] = ["great success"] with assert_device_code_poll_sleep(): - conn.authenticate_oidc(client_id=client_id) + conn.authenticate_oidc(client_id=client_id, use_pkce=use_pkce) assert isinstance(conn.auth, BearerAuth) assert conn.auth.bearer == 'oidc/oi/' + oidc_mock.state["access_token"] assert [r["grant_type"] for r in oidc_mock.grant_request_history] == [ @@ -1165,7 +1240,12 @@ def test_authenticate_oidc_auto_no_existing_refresh_token(requests_mock, refresh ] -def test_authenticate_oidc_auto_expired_refresh_token(requests_mock, refresh_token_store): +@pytest.mark.parametrize(["use_pkce", "expect_pkce"], [ + (None, False), + (True, True), + (False, False), +]) +def test_authenticate_oidc_auto_expired_refresh_token(requests_mock, refresh_token_store, use_pkce, expect_pkce): requests_mock.get(API_URL, json={"api_version": "1.0.0"}) client_id = "myclient" issuer = "https://oidc.test" @@ -1182,7 +1262,8 @@ def test_authenticate_oidc_auto_expired_refresh_token(requests_mock, refresh_tok expected_fields={ "refresh_token": "unkn0wn", "scope": "openid", - "code_verifier": True, "code_challenge": True + "code_verifier": True if expect_pkce else ABSENT, + "code_challenge": True if expect_pkce else ABSENT, } ) refresh_token_store.set_refresh_token(issuer=issuer, client_id=client_id, refresh_token="0ld.t0k3n") @@ -1192,7 +1273,7 @@ def test_authenticate_oidc_auto_expired_refresh_token(requests_mock, refresh_tok assert isinstance(conn.auth, NullAuth) oidc_mock.state["device_code_callback_timeline"] = ["great success"] with assert_device_code_poll_sleep(): - conn.authenticate_oidc(client_id=client_id) + conn.authenticate_oidc(client_id=client_id, use_pkce=use_pkce) assert isinstance(conn.auth, BearerAuth) assert conn.auth.bearer == 'oidc/oi/' + oidc_mock.state["access_token"] assert [r["grant_type"] for r in oidc_mock.grant_request_history] == [