diff --git a/pyproject.toml b/pyproject.toml index 26d6959b..1b503f55 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ build-backend = "setuptools.build_meta" [metadata] name = "idpyoidc" -version = "1.1.1" +version = "1.2.0" author = "Roland Hedberg" author_email = "roland@catalogix.se" description = "Everything OAuth2 and OIDC" diff --git a/src/idpyoidc/__init__.py b/src/idpyoidc/__init__.py index 29e24514..3f84ad51 100644 --- a/src/idpyoidc/__init__.py +++ b/src/idpyoidc/__init__.py @@ -1,5 +1,5 @@ __author__ = "Roland Hedberg" -__version__ = "1.1.1" +__version__ = "1.2.0" import os from typing import Dict diff --git a/src/idpyoidc/client/client_auth.py b/src/idpyoidc/client/client_auth.py index a9efd464..258f7c23 100755 --- a/src/idpyoidc/client/client_auth.py +++ b/src/idpyoidc/client/client_auth.py @@ -302,11 +302,13 @@ def construct(self, request=None, service=None, http_args=None, **kwargs): if service.service_name == "refresh_token": _acc_token = find_token(request, "refresh_token", service, **kwargs) + elif service.service_name == "token_exchange": + _acc_token = find_token(request, "subject_token", service, **kwargs) else: _acc_token = find_token(request, "access_token", service, **kwargs) if not _acc_token: - raise KeyError("No access or refresh token available") + raise KeyError("No bearer token available") # The authorization value starts with 'Bearer' when bearer tokens # are used diff --git a/src/idpyoidc/client/defaults.py b/src/idpyoidc/client/defaults.py index f029efd4..3237cd61 100644 --- a/src/idpyoidc/client/defaults.py +++ b/src/idpyoidc/client/defaults.py @@ -18,7 +18,7 @@ } DEFAULT_OAUTH2_SERVICES = { - "discovery": {"class": "idpyoidc.client.oauth2.provider_info_discovery.ProviderInfoDiscovery"}, + "discovery": {"class": "idpyoidc.client.oauth2.server_metadata.ServerMetadata"}, "authorization": {"class": "idpyoidc.client.oauth2.authorization.Authorization"}, "access_token": {"class": "idpyoidc.client.oauth2.access_token.AccessToken"}, "refresh_access_token": { diff --git a/src/idpyoidc/client/oauth2/provider_info_discovery.py b/src/idpyoidc/client/oauth2/server_metadata.py similarity index 96% rename from src/idpyoidc/client/oauth2/provider_info_discovery.py rename to src/idpyoidc/client/oauth2/server_metadata.py index 7b9ff584..bf32700e 100644 --- a/src/idpyoidc/client/oauth2/provider_info_discovery.py +++ b/src/idpyoidc/client/oauth2/server_metadata.py @@ -12,14 +12,14 @@ LOGGER = logging.getLogger(__name__) -class ProviderInfoDiscovery(Service): - """The service that talks to the OAuth2 provider info discovery endpoint.""" +class ServerMetadata(Service): + """The service that talks to the OAuth2 server metadata endpoint.""" msg_type = oauth2.Message response_cls = oauth2.ASConfigurationResponse error_msg = ResponseMessage synchronous = True - service_name = "provider_info" + service_name = "server_metadata" http_method = "GET" metadata_attributes = {} diff --git a/src/idpyoidc/client/oauth2/token_exchange.py b/src/idpyoidc/client/oauth2/token_exchange.py new file mode 100644 index 00000000..f583ac7a --- /dev/null +++ b/src/idpyoidc/client/oauth2/token_exchange.py @@ -0,0 +1,74 @@ +"""Implements the service that can exchange one token for another.""" +import logging + +from idpyoidc.client.oauth2.utils import get_state_parameter +from idpyoidc.client.service import Service +from idpyoidc.exception import MissingParameter +from idpyoidc.exception import MissingRequiredAttribute +from idpyoidc.message import oauth2 +from idpyoidc.message.oauth2 import ResponseMessage +from idpyoidc.time_util import time_sans_frac + +LOGGER = logging.getLogger(__name__) + + +class TokenExchange(Service): + """The token exchange service.""" + + msg_type = oauth2.TokenExchangeRequest + response_cls = oauth2.TokenExchangeResponse + error_msg = ResponseMessage + endpoint_name = "token_endpoint" + synchronous = True + service_name = "token_exchange" + default_authn_method = "client_secret_basic" + http_method = "POST" + request_body_type = "urlencoded" + response_body_type = "json" + + + def __init__(self, client_get, conf=None): + Service.__init__(self, client_get, conf=conf) + self.pre_construct.append(self.oauth_pre_construct) + + def update_service_context(self, resp, key="", **kwargs): + if "expires_in" in resp: + resp["__expires_at"] = time_sans_frac() + int(resp["expires_in"]) + self.client_get("service_context").state.store_item(resp, "token_response", key) + + def oauth_pre_construct(self, request_args=None, post_args=None, **kwargs): + """ + + :param request_args: Initial set of request arguments + :param kwargs: Extra keyword arguments + :return: Request arguments + """ + if request_args is None: + request_args = {} + + if 'subject_token' not in request_args: + try: + _key = get_state_parameter(request_args, kwargs) + except MissingParameter: + raise MissingRequiredAttribute("subject_token") + + parameters = {'access_token', 'scope'} + + _state = self.client_get("service_context").state + + _args = _state.extend_request_args( + {}, oauth2.AuthorizationResponse, "auth_response", _key, parameters + ) + _args = _state.extend_request_args( + _args, oauth2.AccessTokenResponse, "token_response", _key, parameters + ) + _args = _state.extend_request_args( + _args, oauth2.AccessTokenResponse, "refresh_token_response", _key, parameters + ) + + request_args["subject_token"] = _args["access_token"] + request_args["subject_token_type"] = 'urn:ietf:params:oauth:token-type:access_token' + if 'scope' not in request_args and "scope" in _args: + request_args["scope"] = _args["scope"] + + return request_args, post_args diff --git a/src/idpyoidc/client/oidc/provider_info_discovery.py b/src/idpyoidc/client/oidc/provider_info_discovery.py index a4f98778..6caa4370 100644 --- a/src/idpyoidc/client/oidc/provider_info_discovery.py +++ b/src/idpyoidc/client/oidc/provider_info_discovery.py @@ -1,7 +1,7 @@ import logging from idpyoidc.client.exception import ConfigurationError -from idpyoidc.client.oauth2 import provider_info_discovery +from idpyoidc.client.oauth2 import server_metadata from idpyoidc.message import oidc from idpyoidc.message.oauth2 import ResponseMessage @@ -61,14 +61,16 @@ def add_redirect_uris(request_args, service=None, **kwargs): return request_args, {} -class ProviderInfoDiscovery(provider_info_discovery.ProviderInfoDiscovery): +class ProviderInfoDiscovery(server_metadata.ServerMetadata): msg_type = oidc.Message response_cls = oidc.ProviderConfigurationResponse error_msg = ResponseMessage + service_name = "provider_info" + metadata_attributes = {} def __init__(self, client_get, conf=None): - provider_info_discovery.ProviderInfoDiscovery.__init__(self, client_get, conf=conf) + server_metadata.ServerMetadata.__init__(self, client_get, conf=conf) def update_service_context(self, resp, **kwargs): _context = self.client_get("service_context") diff --git a/src/idpyoidc/client/service.py b/src/idpyoidc/client/service.py index 7e0eee23..66b20dc4 100644 --- a/src/idpyoidc/client/service.py +++ b/src/idpyoidc/client/service.py @@ -16,14 +16,13 @@ from idpyoidc.message.oauth2 import ResponseMessage from idpyoidc.message.oauth2 import is_error_message from idpyoidc.util import importer - -from ..constant import JOSE_ENCODED -from ..constant import JSON_ENCODED -from ..constant import URL_ENCODED from .configure import Configuration from .exception import ResponseError from .util import get_http_body from .util import get_http_url +from ..constant import JOSE_ENCODED +from ..constant import JSON_ENCODED +from ..constant import URL_ENCODED __author__ = "Roland Hedberg" @@ -452,6 +451,7 @@ def get_request_parameters( content_type = JSON_ENCODED _info["body"] = get_http_body(request, content_type) + _headers.update({"Content-Type": content_type}) if _headers: @@ -655,7 +655,7 @@ def construct_uris(self, base_url, hex): uri = self.usage_to_uri_map.get(usage) if uri and uri not in self.metadata: self.metadata[uri] = self.get_uri(base_url, self.callback_path[uri], - hex) + hex) def get_metadata(self, attribute, default=None): try: diff --git a/src/idpyoidc/server/oauth2/server_metadata.py b/src/idpyoidc/server/oauth2/server_metadata.py new file mode 100755 index 00000000..ccc1922c --- /dev/null +++ b/src/idpyoidc/server/oauth2/server_metadata.py @@ -0,0 +1,37 @@ +import logging + +from idpyoidc.message import oauth2 + +from idpyoidc.message import oidc +from idpyoidc.server.endpoint import Endpoint + +logger = logging.getLogger(__name__) + + +class ServerMetadata(Endpoint): + request_cls = oauth2.Message + response_cls = oauth2.ASConfigurationResponse + request_format = "" + response_format = "json" + name = "server_metadata" + + def __init__(self, server_get, **kwargs): + Endpoint.__init__(self, server_get=server_get, **kwargs) + self.pre_construct.append(self.add_endpoints) + + def add_endpoints(self, request, client_id, endpoint_context, **kwargs): + for endpoint in [ + "authorization_endpoint", + "registration_endpoint", + "token_endpoint", + "userinfo_endpoint", + "end_session_endpoint", + ]: + endp_instance = self.server_get("endpoint", endpoint) + if endp_instance: + request[endpoint] = endp_instance.endpoint_path + + return request + + def process_request(self, request=None, **kwargs): + return {"response_args": self.server_get("endpoint_context").provider_info} diff --git a/src/idpyoidc/server/oauth2/token.py b/src/idpyoidc/server/oauth2/token.py index 1600db92..08b050aa 100755 --- a/src/idpyoidc/server/oauth2/token.py +++ b/src/idpyoidc/server/oauth2/token.py @@ -13,6 +13,7 @@ from idpyoidc.server.exception import ProcessError from idpyoidc.server.oauth2.token_helper import AccessTokenHelper from idpyoidc.server.oauth2.token_helper import RefreshTokenHelper +from idpyoidc.server.session import MintingNotAllowed from idpyoidc.server.session.token import TOKEN_TYPES_MAPPING from idpyoidc.util import importer @@ -121,6 +122,8 @@ def process_request(self, request: Optional[Union[Message, dict]] = None, **kwar ) except JWEException as err: return self.error_cls(error="invalid_request", error_description="%s" % err) + except MintingNotAllowed as err: + return self.error_cls(error="invalid_request", error_description="%s" % err) if isinstance(response_args, ResponseMessage): return response_args diff --git a/tests/test_client_40_dpop.py b/tests/test_client_40_dpop.py index 1929d96a..906aa266 100644 --- a/tests/test_client_40_dpop.py +++ b/tests/test_client_40_dpop.py @@ -88,7 +88,7 @@ def create_client(self): services = { "discovery": { - "class": "idpyoidc.client.oauth2.provider_info_discovery.ProviderInfoDiscovery" + "class": "idpyoidc.client.oauth2.server_metadata.ServerMetadata" }, "authorization": {"class": "idpyoidc.client.oauth2.authorization.Authorization"}, "access_token": {"class": "idpyoidc.client.oauth2.access_token.AccessToken"}, diff --git a/tests/test_client_55_token_exchange.py b/tests/test_client_55_token_exchange.py new file mode 100644 index 00000000..707dd14d --- /dev/null +++ b/tests/test_client_55_token_exchange.py @@ -0,0 +1,92 @@ +import os + +from cryptojwt.key_jar import init_key_jar +import pytest + +from idpyoidc.client.entity import Entity +from idpyoidc.message import Message +from idpyoidc.message.oauth2 import AccessTokenResponse +from idpyoidc.message.oauth2 import AuthorizationResponse +from idpyoidc.message.oidc import IdToken +from tests.test_client_21_oidc_service import make_keyjar + +KEYSPEC = [ + {"type": "RSA", "use": ["sig"]}, + {"type": "EC", "crv": "P-256", "use": ["sig"]}, +] + +_dirname = os.path.dirname(os.path.abspath(__file__)) + +ISS = "https://example.com" + +ISS_KEY = init_key_jar( + public_path="{}/pub_iss.jwks".format(_dirname), + private_path="{}/priv_iss.jwks".format(_dirname), + key_defs=KEYSPEC, + issuer_id=ISS, + read_only=False, +) + +ISS_KEY.import_jwks_as_json(open("{}/pub_client.jwks".format(_dirname)).read(), "client_id") + + +def create_jws(val): + lifetime = 3600 + + idts = IdToken(**val) + + return idts.to_jwt( + key=ISS_KEY.get_signing_key("ec", issuer_id=ISS), algorithm="ES256", lifetime=lifetime + ) + + +class TestUserInfo(object): + @pytest.fixture(autouse=True) + def create_request(self): + self._iss = ISS + client_config = { + "client_id": "client_id", + "client_secret": "a longesh password", + "redirect_uris": ["https://example.com/cli/authz_cb"], + "issuer": self._iss, + "requests_dir": "requests", + "base_url": "https://example.com/cli/", + } + entity = Entity(keyjar=make_keyjar(), config=client_config, + services={ + "discovery": { + "class": + "idpyoidc.client.oauth2.server_metadata.ServerMetadata"}, + "authorization": { + "class": "idpyoidc.client.oauth2.authorization.Authorization"}, + "access_token": { + "class": "idpyoidc.client.oauth2.access_token.AccessToken"}, + "token_exchange": { + "class": + "idpyoidc.client.oauth2.token_exchange.TokenExchange" + }, + } + ) + entity.client_get("service_context").issuer = "https://example.com" + self.service = entity.client_get("service", "token_exchange") + + _state_interface = self.service.client_get("service_context").state + # Add history + auth_response = AuthorizationResponse(code="access_code").to_json() + _state_interface.store_item(auth_response, "auth_response", "abcde") + + idtval = {"nonce": "KUEYfRM2VzKDaaKD", "sub": "diana", "iss": ISS, "aud": "client_id"} + idt = create_jws(idtval) + + ver_idt = IdToken().from_jwt(idt, make_keyjar()) + + token_response = AccessTokenResponse( + access_token="access_token", id_token=idt, __verified_id_token=ver_idt + ).to_json() + _state_interface.store_item(token_response, "token_response", "abcde") + + def test_construct(self): + _req = self.service.construct(state="abcde") + assert isinstance(_req, Message) + assert len(_req) == 2 + assert "subject_token" in _req diff --git a/tests/test_server_24_oauth2_token_endpoint.py b/tests/test_server_24_oauth2_token_endpoint.py index 0e6d52db..51f0742b 100644 --- a/tests/test_server_24_oauth2_token_endpoint.py +++ b/tests/test_server_24_oauth2_token_endpoint.py @@ -689,8 +689,9 @@ def test_do_refresh_access_token_not_allowed(self): _request = REFRESH_TOKEN_REQ.copy() _request["refresh_token"] = _resp["response_args"]["refresh_token"] _req = self.token_endpoint.parse_request(_request.to_json()) - with pytest.raises(MintingNotAllowed): - self.token_endpoint.process_request(_req) + res = self.token_endpoint.process_request(_req) + assert "error" in res + assert res["error_description"] == 'Minting of access_token not supported' def test_do_refresh_access_token_revoked(self): areq = AUTH_REQ.copy() diff --git a/tests/test_server_35_oidc_token_endpoint.py b/tests/test_server_35_oidc_token_endpoint.py index 1ce18887..10dc34e3 100755 --- a/tests/test_server_35_oidc_token_endpoint.py +++ b/tests/test_server_35_oidc_token_endpoint.py @@ -905,8 +905,8 @@ def test_do_refresh_access_token_not_allowed(self): _request = REFRESH_TOKEN_REQ.copy() _request["refresh_token"] = _resp["response_args"]["refresh_token"] _req = self.token_endpoint.parse_request(_request.to_urlencoded()) - with pytest.raises(MintingNotAllowed): - self.token_endpoint.process_request(_req) + res = self.token_endpoint.process_request(_req) + assert "error" in res def test_do_refresh_access_token_revoked(self): areq = AUTH_REQ.copy() diff --git a/tests/test_tandem_10_token_exchange.py b/tests/test_tandem_10_token_exchange.py new file mode 100644 index 00000000..bf2c2649 --- /dev/null +++ b/tests/test_tandem_10_token_exchange.py @@ -0,0 +1,689 @@ +import json +import os + +from cryptojwt.key_jar import build_keyjar +import pytest + +from idpyoidc.client.oauth2 import Client +from idpyoidc.message.oauth2 import is_error_message +from idpyoidc.message.oidc import AccessTokenRequest +from idpyoidc.message.oidc import AuthorizationRequest +from idpyoidc.message.oidc import RefreshAccessTokenRequest +from idpyoidc.server import Server +from idpyoidc.server.authz import AuthzHandling +from idpyoidc.server.client_authn import verify_client +from idpyoidc.server.configure import ASConfiguration +from idpyoidc.server.cookie_handler import CookieHandler +from idpyoidc.server.user_authn.authn_context import INTERNETPROTOCOLPASSWORD +from idpyoidc.server.user_info import UserInfo +from idpyoidc.util import rndstr +from tests import CRYPT_CONFIG +from tests import SESSION_PARAMS + +KEYDEFS = [ + {"type": "RSA", "key": "", "use": ["sig"]}, + {"type": "EC", "crv": "P-256", "use": ["sig"]}, +] + +CLIENT_KEYJAR = build_keyjar(KEYDEFS) + +COOKIE_KEYDEFS = [ + {"type": "oct", "kid": "sig", "use": ["sig"]}, + {"type": "oct", "kid": "enc", "use": ["enc"]}, +] + +RESPONSE_TYPES_SUPPORTED = [ + ["code"], + ["token"], + ["id_token"], + ["code", "token"], + ["code", "id_token"], + ["id_token", "token"], + ["code", "token", "id_token"], + ["none"], +] + +CAPABILITIES = { + "subject_types_supported": ["public", "pairwise", "ephemeral"], + "grant_types_supported": [ + "authorization_code", + "implicit", + "urn:ietf:params:oauth:grant-type:jwt-bearer", + "refresh_token", + ], +} + +AUTH_REQ = AuthorizationRequest( + client_id="client_1", + redirect_uri="https://example.com/cb", + scope=["openid"], + state="STATE", + response_type="code", +) + +TOKEN_REQ = AccessTokenRequest( + client_id="client_1", + redirect_uri="https://example.com/cb", + state="STATE", + grant_type="authorization_code", + client_secret="hemligt", +) + +REFRESH_TOKEN_REQ = RefreshAccessTokenRequest( + grant_type="refresh_token", client_id="https://example.com/", client_secret="hemligt" +) + +TOKEN_REQ_DICT = TOKEN_REQ.to_dict() + +BASEDIR = os.path.abspath(os.path.dirname(__file__)) + + +def full_path(local_file): + return os.path.join(BASEDIR, local_file) + + +USERINFO = UserInfo(json.loads(open(full_path("users.json")).read())) + +_OAUTH2_SERVICES = { + "metadata": {"class": "idpyoidc.client.oauth2.server_metadata.ServerMetadata"}, + "authorization": {"class": "idpyoidc.client.oauth2.authorization.Authorization"}, + "access_token": {"class": "idpyoidc.client.oauth2.access_token.AccessToken"}, + "refresh_access_token": { + "class": "idpyoidc.client.oauth2.refresh_access_token.RefreshAccessToken" + }, + "token_exchange": { + "class": "idpyoidc.client.oauth2.token_exchange.TokenExchange" + } +} + + +class TestEndpoint(object): + @pytest.fixture(autouse=True) + def create_endpoint(self): + server_conf = { + "issuer": "https://example.com/", + "httpc_params": {"verify": False, "timeout": 1}, + "capabilities": CAPABILITIES, + "cookie_handler": { + "class": CookieHandler, + "kwargs": {"keys": {"key_defs": COOKIE_KEYDEFS}}, + }, + "keys": {"uri_path": "jwks.json", "key_defs": KEYDEFS}, + "endpoint": { + "provider_config": { + "path": ".well-known/openid-configuration", + "class": "idpyoidc.server.oauth2.server_metadata.ServerMetadata", + "kwargs": {}, + }, + "authorization": { + "path": "authorization", + "class": "idpyoidc.server.oauth2.authorization.Authorization", + "kwargs": {}, + }, + "token": { + "path": "token", + "class": "idpyoidc.server.oidc.token.Token", + "kwargs": { + "client_authn_method": [ + "client_secret_basic", + "client_secret_post", + "client_secret_jwt", + "private_key_jwt", + ], + }, + }, + }, + "authentication": { + "anon": { + "acr": INTERNETPROTOCOLPASSWORD, + "class": "idpyoidc.server.user_authn.user.NoAuthn", + "kwargs": {"user": "diana"}, + } + }, + "userinfo": {"class": UserInfo, "kwargs": {"db": {}}}, + "client_authn": verify_client, + "template_dir": "template", + "authz": { + "class": AuthzHandling, + "kwargs": { + "grant_config": { + "usage_rules": { + "authorization_code": { + "supports_minting": ["access_token", "refresh_token"], + "max_usage": 1, + }, + "access_token": { + "supports_minting": ["access_token", "refresh_token"], + "expires_in": 600, + }, + "refresh_token": { + "supports_minting": ["access_token"], + "audience": ["https://example.com", "https://example2.com"], + "expires_in": 43200, + }, + }, + "expires_in": 43200, + } + }, + }, + "token_handler_args": { + "jwks_file": "private/token_jwks.json", + "code": {"lifetime": 600, "kwargs": {"crypt_conf": CRYPT_CONFIG}}, + "token": { + "class": "idpyoidc.server.token.jwt_token.JWTToken", + "kwargs": { + "lifetime": 3600, + "add_claims_by_scope": True, + "aud": ["https://example.org/appl"], + }, + }, + "refresh": { + "class": "idpyoidc.server.token.jwt_token.JWTToken", + "kwargs": { + "lifetime": 3600, + "aud": ["https://example.org/appl"], + }, + }, + }, + "session_params": SESSION_PARAMS, + } + self.server = Server(ASConfiguration(conf=server_conf, base_path=BASEDIR), cwd=BASEDIR) + + client_1_config = { + "issuer": server_conf["issuer"], + "client_secret": "hemligt", + "client_id": "client_1", + "redirect_uris": ["https://example.com/cb"], + "client_salt": "salted", + "token_endpoint_auth_method": "client_secret_post", + "response_types": ["code", "token", "code id_token", "id_token"], + "allowed_scopes": ["openid", "profile", "offline_access"], + } + client_2_config = { + "issuer": server_conf["issuer"], + "client_id": "client_2", + "client_secret": "hemligt", + "redirect_uris": ["https://example.com/cb"], + "client_salt": "salted", + "token_endpoint_auth_method": "client_secret_post", + "response_types": ["code", "token", "code id_token", "id_token"], + "allowed_scopes": ["openid", "profile", "offline_access"], + } + self.client_1 = Client(client_type='oauth2', config=client_1_config, + keyjar=build_keyjar(KEYDEFS), + services=_OAUTH2_SERVICES) + self.client_2 = Client(client_type='oauth2', config=client_2_config, + keyjar=build_keyjar(KEYDEFS), + services=_OAUTH2_SERVICES) + + self.endpoint_context = self.server.endpoint_context + self.endpoint_context.cdb["client_1"] = client_1_config + self.endpoint_context.cdb["client_2"] = client_2_config + self.endpoint_context.keyjar.import_jwks( + self.client_1.get_service_context().keyjar.export_jwks(), "client_1") + self.endpoint_context.keyjar.import_jwks( + self.client_2.get_service_context().keyjar.export_jwks(), "client_2") + + # self.endpoint = self.server.server_get("endpoint", "token") + # self.introspection_endpoint = self.server.server_get("endpoint", "introspection") + self.session_manager = self.endpoint_context.session_manager + self.user_id = "diana" + + def do_query(self, service_type, endpoint_type, request_args, state): + _client = self.client_1.get_service(service_type) + req_info = _client.get_request_parameters(request_args=request_args) + + areq = req_info.get("request") + headers = req_info.get("headers") + + _server = self.server.get_endpoint(endpoint_type) + if areq: + if headers: + argv = {"http_info": {"headers": headers}} + else: + argv = {} + areq.lax = True + _pr_resp = _server.parse_request(areq.to_urlencoded(), **argv) + else: + _pr_resp = _server.parse_request(areq) + + if is_error_message(_pr_resp): + return areq, _pr_resp + + _resp = _server.process_request(_pr_resp) + if is_error_message(_resp): + return areq, _resp + + _response = _server.do_response(**_resp) + + resp = _client.parse_response(_response["response"]) + _client.update_service_context(_resp["response_args"], key=state) + return areq, resp + + def process_setup(self, token=None, scope=None): + # ***** Discovery ********* + + _req, _resp = self.do_query('server_metadata', 'server_metadata', {}, '') + + # ***** Authorization Request ********** + _nonce = rndstr(24), + _context = self.client_1.get_service_context() + # Need a new state for a new authorization request + _state = _context.state.create_state(_context.get("issuer")) + _context.state.store_nonce2state(_nonce, _state) + + req_args = { + "response_type": ["code"], + "nonce": _nonce, + "state": _state + } + + if scope: + _scope = scope + else: + _scope = ["openid"] + + if token and list(token.keys())[0] == "refresh_token": + _scope = ["openid", "offline_access"] + + req_args["scope"] = _scope + + areq, auth_response = self.do_query('authorization', 'authorization', req_args, _state) + + # ***** Token Request ********** + + req_args = { + "code": auth_response["code"], + "state": auth_response["state"], + "redirect_uri": areq["redirect_uri"], + "grant_type": "authorization_code", + "client_id": self.client_1.get_client_id(), + "client_secret": _context.get("client_secret"), + } + + _token_request, resp = self.do_query("accesstoken", 'token', req_args, _state) + + return resp, _state, _scope + + @pytest.mark.parametrize( + "token", + [ + {"access_token": "urn:ietf:params:oauth:token-type:access_token"}, + {"refresh_token": "urn:ietf:params:oauth:token-type:refresh_token"}, + ], + ) + def test_token_exchange(self, token): + """ + Test that token exchange requests work correctly + """ + + resp, _state, _scope = self.process_setup(token) + + # ****** Token Exchange Request ********** + + req_args = { + "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", + "requested_token_type": token[list(token.keys())[0]], + "subject_token": resp["access_token"], + "subject_token_type": 'urn:ietf:params:oauth:token-type:access_token', + "state": _state + } + + _token_exchange_request, _te_resp = self.do_query("token_exchange", "token", req_args, + _state) + + assert set(_te_resp.keys()) == { + "access_token", + "token_type", + "scope", + "expires_in", + "issued_token_type", + } + + assert _te_resp["issued_token_type"] == list(token.keys())[0] + assert _te_resp["scope"] == _scope + + @pytest.mark.parametrize( + "token", + [ + {"access_token": "urn:ietf:params:oauth:token-type:access_token"}, + {"refresh_token": "urn:ietf:params:oauth:token-type:refresh_token"}, + ], + ) + def test_token_exchange_per_client(self, token): + """ + Test that per-client token exchange configuration works correctly + """ + self.endpoint_context.cdb["client_1"]["token_exchange"] = { + "subject_token_types_supported": [ + "urn:ietf:params:oauth:token-type:access_token", + "urn:ietf:params:oauth:token-type:refresh_token", + ], + "requested_token_types_supported": [ + "urn:ietf:params:oauth:token-type:access_token", + "urn:ietf:params:oauth:token-type:refresh_token", + ], + "policy": { + "": { + "callable": + "idpyoidc.server.oauth2.token_helper.validate_token_exchange_policy", + "kwargs": {"scope": ["openid"]}, + } + }, + } + + resp, _state, _scope = self.process_setup(token) + + # ****** Token Exchange Request ********** + + req_args = { + "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", + "requested_token_type": token[list(token.keys())[0]], + "subject_token": resp["access_token"], + "subject_token_type": 'urn:ietf:params:oauth:token-type:access_token', + "state": _state + } + + _token_exchange_request, _te_resp = self.do_query("token_exchange", "token", req_args, + _state) + + assert set(_te_resp.keys()) == { + "access_token", + "token_type", + "scope", + "expires_in", + "issued_token_type", + } + + assert _te_resp["issued_token_type"] == list(token.keys())[0] + assert _te_resp["scope"] == _scope + + def test_additional_parameters(self): + """ + Test that a token exchange with additional parameters including + scope, audience and subject_token_type works. + """ + endp = self.server.get_endpoint("token") + conf = endp.helper["urn:ietf:params:oauth:grant-type:token-exchange"].config + conf["policy"][""]["kwargs"] = {} + conf["policy"][""]["kwargs"]["audience"] = ["https://example.com"] + conf["policy"][""]["kwargs"]["resource"] = ["https://example.com"] + + resp, _state, _scope = self.process_setup() + + # ****** Token Exchange Request ********** + + req_args = { + "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", + "subject_token": resp["access_token"], + "subject_token_type": "urn:ietf:params:oauth:token-type:access_token", + "requested_token_type": "urn:ietf:params:oauth:token-type:access_token", + "audience": ["https://example.com"], + "resource": ["https://example.com"], + } + + _token_exchange_request, _te_resp = self.do_query("token_exchange", "token", req_args, + _state) + + assert set(_te_resp.keys()) == { + "access_token", + "token_type", + "expires_in", + "issued_token_type", + "scope", + } + + def test_token_exchange_fails_if_disabled(self): + """ + Test that token exchange fails if it's not included in Token's + grant_types_supported (that are set in its helper attribute). + """ + endpoint = self.server.get_endpoint("token") + del endpoint.helper["urn:ietf:params:oauth:grant-type:token-exchange"] + + resp, _state, _scope = self.process_setup() + + # ****** Token Exchange Request ********** + + req_args = { + "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", + "subject_token": resp["access_token"], + "subject_token_type": "urn:ietf:params:oauth:token-type:access_token", + "resource": ["https://example.com/api"] + } + + _te_request, _te_resp = self.do_query("token_exchange", "token", req_args, _state) + + assert _te_resp["error"] == "invalid_request" + assert ( + _te_resp["error_description"] + == "Unsupported grant_type: urn:ietf:params:oauth:grant-type:token-exchange" + ) + + def test_wrong_resource(self): + """ + Test that requesting a token for an unknown resource fails. + """ + endpoint = self.server.get_endpoint("token") + + conf = endpoint.helper["urn:ietf:params:oauth:grant-type:token-exchange"].config + conf["policy"][""]["kwargs"] = {} + conf["policy"][""]["kwargs"]["resource"] = ["https://example.com"] + + resp, _state, _scope = self.process_setup() + + # ****** Token Exchange Request ********** + + req_args = { + "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", + "subject_token": resp["access_token"], + "subject_token_type": "urn:ietf:params:oauth:token-type:access_token", + "resource": ["https://unknown-resource.com/api"], + } + + _te_request, _te_resp = self.do_query("token_exchange", "token", req_args, _state) + + assert set(_te_resp.keys()) == {"error", "error_description"} + assert _te_resp["error"] == "invalid_target" + assert _te_resp["error_description"] == "Unknown resource" + + def test_refresh_token_audience(self): + """ + Test that requesting a refresh token with audience fails. + """ + + resp, _state, _scope = self.process_setup( + {"refresh_token": "urn:ietf:params:oauth:token-type:refresh_token"}) + + # ****** Token Exchange Request ********** + + req_args = { + "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", + "subject_token": resp["refresh_token"], + "subject_token_type": "urn:ietf:params:oauth:token-type:refresh_token", + "audience": ["https://example.com"], + } + + _te_request, _te_resp = self.do_query("token_exchange", "token", req_args, _state) + + assert set(_te_resp.keys()) == {"error", "error_description"} + assert _te_resp["error"] == "invalid_target" + assert _te_resp["error_description"] == "Refresh token has single owner" + + def test_wrong_audience(self): + """ + Test that requesting a token for an unknown audience fails. + """ + endpoint = self.server.get_endpoint("token") + conf = endpoint.helper["urn:ietf:params:oauth:grant-type:token-exchange"].config + conf["policy"][""]["kwargs"] = {} + conf["policy"][""]["kwargs"]["audience"] = ["https://example.com"] + + resp, _state, _scope = self.process_setup() + + # ****** Token Exchange Request ********** + + req_args = { + "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", + "subject_token": resp["access_token"], + "subject_token_type": "urn:ietf:params:oauth:token-type:access_token", + "audience": ["https://unknown-audience.com/"], + } + + _te_request, _te_resp = self.do_query("token_exchange", "token", req_args, _state) + + assert set(_te_resp.keys()) == {"error", "error_description"} + assert _te_resp["error"] == "invalid_target" + assert _te_resp["error_description"] == "Unknown audience" + + def test_exchange_refresh_token_to_refresh_token(self): + """ + Test whether exchanging a refresh token to another refresh token works. + """ + resp, _state, _scope = self.process_setup( + {"refresh_token": "urn:ietf:params:oauth:token-type:refresh_token"}) + + # ****** Token Exchange Request ********** + + req_args = { + "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", + "subject_token": resp["refresh_token"], + "subject_token_type": "urn:ietf:params:oauth:token-type:refresh_token", + "requested_token_type": "urn:ietf:params:oauth:token-type:refresh_token", + } + + _te_request, _te_resp = self.do_query("token_exchange", "token", req_args, _state) + + assert set(_te_resp.keys()) == {"error", "error_description"} + + @pytest.mark.parametrize( + "scopes", + [ + ["openid", "offline_access"], + ["openid"], + ], + ) + def test_exchange_access_token_to_refresh_token(self, scopes): + + resp, _state, _scope = self.process_setup(scope=scopes) + + # ****** Token Exchange Request ********** + + req_args = { + "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", + "subject_token": resp["access_token"], + "subject_token_type": "urn:ietf:params:oauth:token-type:access_token", + "requested_token_type": "urn:ietf:params:oauth:token-type:refresh_token", + } + + _te_request, _te_resp = self.do_query("token_exchange", "token", req_args, _state) + + if "offline_access" in scopes: + assert set(_te_resp.keys()) != {"error", "error_description"} + else: + assert _te_resp["error"] == "invalid_request" + + @pytest.mark.parametrize( + "unsupported_type", + [ + "unknown", + "urn:ietf:params:oauth:token-type:id_token", + "urn:ietf:params:oauth:token-type:saml2", + "urn:ietf:params:oauth:token-type:saml1", + ], + ) + def test_unsupported_requested_token_type(self, unsupported_type): + """ + Test that requesting a token type that is unknown or unsupported fails. + """ + + resp, _state, _scope = self.process_setup() + + # ****** Token Exchange Request ********** + + req_args = { + "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", + "subject_token": resp["access_token"], + "subject_token_type": "urn:ietf:params:oauth:token-type:access_token", + "requested_token_type": unsupported_type, + "audience": ["https://example.com"], + "resource": ["https://example.com/api"], + } + + _te_request, _te_resp = self.do_query("token_exchange", "token", req_args, _state) + + assert set(_te_resp.keys()) == {"error", "error_description"} + assert _te_resp["error"] == "invalid_request" + assert _te_resp["error_description"] == "Unsupported requested token type" + + @pytest.mark.parametrize( + "unsupported_type", + [ + "unknown", + "urn:ietf:params:oauth:token-type:id_token", + "urn:ietf:params:oauth:token-type:saml2", + "urn:ietf:params:oauth:token-type:saml1", + ], + ) + def test_unsupported_subject_token_type(self, unsupported_type): + """ + Test that providing an unsupported subject token type fails. + """ + resp, _state, _scope = self.process_setup() + + # ****** Token Exchange Request ********** + + req_args = { + "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", + "subject_token": resp["access_token"], + "subject_token_type": unsupported_type, + "audience": ["https://example.com"], + "resource": ["https://example.com/api"], + } + + _te_request, _te_resp = self.do_query("token_exchange", "token", req_args, _state) + + assert set(_te_resp.keys()) == {"error", "error_description"} + assert _te_resp["error"] == "invalid_request" + assert _te_resp["error_description"] == "Subject token invalid" + + def test_unsupported_actor_token(self): + """ + Test that providing an actor token fails as it's unsupported. + """ + resp, _state, _scope = self.process_setup() + + # ****** Token Exchange Request ********** + + req_args = { + "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", + "subject_token": resp["access_token"], + "subject_token_type": "urn:ietf:params:oauth:token-type:access_token", + "actor_token": resp["access_token"], + } + + _te_request, _te_resp = self.do_query("token_exchange", "token", req_args, _state) + + assert set(_te_resp.keys()) == {"error", "error_description"} + assert _te_resp["error"] == "invalid_request" + assert _te_resp["error_description"] == "Actor token not supported" + + def test_invalid_token(self): + """ + Test that providing an invalid token fails. + """ + resp, _state, _scope = self.process_setup() + + # ****** Token Exchange Request ********** + + req_args = { + "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", + "subject_token": "invalid_token", + "subject_token_type": "urn:ietf:params:oauth:token-type:access_token", + } + + _te_request, _te_resp = self.do_query("token_exchange", "token", req_args, _state) + + assert set(_te_resp.keys()) == {"error", "error_description"} + assert _te_resp["error"] == "invalid_request" + assert _te_resp["error_description"] == "Subject token invalid"