diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml index e876d934..5d9982b3 100644 --- a/.github/workflows/python-app.yml +++ b/.github/workflows/python-app.yml @@ -65,6 +65,7 @@ jobs: python -m pip install -e . python -m pip install "Pillow>=10.0.0,<10.1" "device_detector>=5.0,<6" "satosa>=8.4,<8.6" "jinja2>=3.0,<4" "pymongo>=4.4.1,<4.5" aiohttp python -m pip install git+https://github.com/openwallet-foundation-labs/sd-jwt-python.git + python -m pip install git+https://github.com/peppelinux/pyMDOC-CBOR.git - name: Lint with flake8 run: | diff --git a/pyeudiw/openid4vp/utils.py b/pyeudiw/openid4vp/utils.py index 6ddc7e21..75d9da8e 100644 --- a/pyeudiw/openid4vp/utils.py +++ b/pyeudiw/openid4vp/utils.py @@ -1,4 +1,5 @@ from pyeudiw.openid4vp.vp import Vp +from pyeudiw.openid4vp.vp_mdoc_cbor import VpMDocCbor from pyeudiw.openid4vp.vp_sd_jwt import VpSdJwt from pyeudiw.jwt.utils import decode_jwt_header from pyeudiw.openid4vp.exceptions import VPFormatNotSupported @@ -21,6 +22,7 @@ def vp_parser(jwt: str) -> Vp: if headers["typ"].lower() == "jwt": return VpSdJwt(jwt) - - raise VPFormatNotSupported( - "VP Digital credentials type not implemented yet: {_typ}") + elif headers["typ"].lower() == "mdoc_cbor": + return VpMDocCbor(jwt) + + raise VPFormatNotSupported("VP Digital credentials type not implemented yet: {_typ}") diff --git a/pyeudiw/openid4vp/vp.py b/pyeudiw/openid4vp/vp.py index b2378f94..b104bdb3 100644 --- a/pyeudiw/openid4vp/vp.py +++ b/pyeudiw/openid4vp/vp.py @@ -1,36 +1,11 @@ -from pyeudiw.openid4vp.exceptions import InvalidVPToken from pyeudiw.tools.base_logger import BaseLogger -from pyeudiw.jwt.utils import is_jwt_format, decode_jwt_header, decode_jwt_payload - class Vp(BaseLogger): """Class for Verifiable Presentation istance.""" - - def __init__(self, jwt: str) -> None: - """ - Generates a VP istance. - - :param jwt: a string that represents the jwt. - :type jwt: str - - :raises InvalidVPToken: if the jwt field's value is not a JWT. - """ - - if not is_jwt_format(jwt): - raise InvalidVPToken("VP is not in JWT format.") - - self.headers = decode_jwt_header(jwt) - self.jwt = jwt - self.payload = decode_jwt_payload(jwt) - - self.credential_headers: dict = {} - self.credential_payload: dict = {} - - self.parse_digital_credential() - - self.disclosed_user_attributes: dict = {} - self._credential_jwks: list[dict] = [] - + + def parse_digital_credential(self) -> None: + raise NotImplementedError + def _detect_vp_type(self) -> str: """ Detects and return the type of verifiable presentation. @@ -38,31 +13,8 @@ def _detect_vp_type(self) -> str: :returns: the type of VP. :rtype: str """ - return self.headers["typ"].lower() - - def get_credential_jwks(self) -> list[dict]: - """ - Returns the credential JWKs. - - :returns: the list containing credential's JWKs. - :rtype: list[dict] - """ - if not self.credential_jwks: - return {} - return self.credential_jwks - - def parse_digital_credential(self) -> None: raise NotImplementedError - def set_credential_jwks(self, credential_jwks: list[dict]) -> None: - """ - Set the credential JWKs for the current istance. - - :param credential_jwks: a list containing the credential's JWKs. - :type credential_jwks: list[dict] - """ - self._credential_jwks = credential_jwks - def check_revocation(self): """ Check if the VP is revoked. @@ -78,15 +30,3 @@ def verify( **kwargs ) -> bool: raise NotImplementedError - - @property - def credential_jwks(self) -> list[dict]: - """Returns the credential JWKs""" - return self._credential_jwks - - @property - def credential_issuer(self) -> str: - """Returns the credential issuer""" - if not self.credential_payload.get('iss', None): - self.parse_digital_credential() - return self.credential_payload.get('iss', None) diff --git a/pyeudiw/openid4vp/vp_mdoc_cbor.py b/pyeudiw/openid4vp/vp_mdoc_cbor.py new file mode 100644 index 00000000..a545b70b --- /dev/null +++ b/pyeudiw/openid4vp/vp_mdoc_cbor.py @@ -0,0 +1,17 @@ +from pyeudiw.openid4vp.vp import Vp +from pymdoccbor.mdoc.verifier import MdocCbor + +class VpMDocCbor(Vp): + def __init__(self, data: str) -> None: + self.data = data + self.mdoc = MdocCbor() + self.parse_digital_credential() + + def parse_digital_credential(self) -> None: + self.mdoc.load(data=self.data) + + def verify(self, **kwargs) -> bool: + return self.mdoc.verify() + + def _detect_vp_type(self) -> str: + return "mdoc_cbor" \ No newline at end of file diff --git a/pyeudiw/openid4vp/vp_sd_jwt.py b/pyeudiw/openid4vp/vp_sd_jwt.py index 81472d07..16b091e7 100644 --- a/pyeudiw/openid4vp/vp_sd_jwt.py +++ b/pyeudiw/openid4vp/vp_sd_jwt.py @@ -1,19 +1,19 @@ from typing import Dict from pyeudiw.jwk import JWK from pyeudiw.jwt import JWSHelper -from pyeudiw.jwt.utils import decode_jwt_header, decode_jwt_payload +from pyeudiw.jwt.utils import decode_jwt_header, decode_jwt_payload, is_jwt_format from pyeudiw.sd_jwt import verify_sd_jwt from pyeudiw.jwk.exceptions import KidNotFoundError from pyeudiw.openid4vp.vp import Vp - +from pyeudiw.openid4vp.exceptions import InvalidVPToken class VpSdJwt(Vp): """Class for SD-JWT Format""" def __init__(self, jwt: str): """ - Generates a VpSdJwt istance + Generates a VP instance. :param jwt: a string that represents the jwt. :type jwt: str @@ -21,7 +21,20 @@ def __init__(self, jwt: str): :raises InvalidVPToken: if the jwt field's value is not a JWT. """ - super().__init__(jwt) + if not is_jwt_format(jwt): + raise InvalidVPToken("VP is not in JWT format.") + + self.headers = decode_jwt_header(jwt) + self.jwt = jwt + self.payload = decode_jwt_payload(jwt) + self.data = jwt + self.credential_headers: dict = {} + self.credential_payload: dict = {} + + self.parse_digital_credential() + + self.disclosed_user_attributes: dict = {} + self._credential_jwks: list[dict] = [] def parse_digital_credential(self) -> None: """ @@ -83,3 +96,42 @@ def verify( result.update(result['verified_claims'].get('claims', {})) return True + + def get_credential_jwks(self) -> list[dict]: + """ + Returns the credential JWKs. + + :returns: the list containing credential's JWKs. + :rtype: list[dict] + """ + return self.credential_jwks or {} + + def set_credential_jwks(self, credential_jwks: list[dict]) -> None: + """ + Set the credential JWKs for the current istance. + + :param credential_jwks: a list containing the credential's JWKs. + :type credential_jwks: list[dict] + """ + self._credential_jwks = credential_jwks + + def _detect_vp_type(self) -> str: + """ + Detects and return the type of verifiable presentation. + + :returns: the type of VP. + :rtype: str + """ + return self.headers.get("typ", "").lower() + + @property + def credential_jwks(self) -> list[dict]: + """Returns the credential JWKs""" + return self._credential_jwks + + @property + def credential_issuer(self) -> str: + """Returns the credential issuer""" + if not self.credential_payload.get('iss', None): + self.parse_digital_credential() + return self.credential_payload.get('iss', None) \ No newline at end of file diff --git a/pyeudiw/satosa/default/request_handler.py b/pyeudiw/satosa/default/request_handler.py index a44b3121..b92d1966 100644 --- a/pyeudiw/satosa/default/request_handler.py +++ b/pyeudiw/satosa/default/request_handler.py @@ -40,10 +40,11 @@ def _handle_credential_trust(self, context: Context, vp: Vp) -> bool: return self._handle_400(context, f"Trust Evaluation failed for {tchelper.entity_id}") # TODO: generalyze also for x509 - credential_jwks = tchelper.get_trusted_jwks( - metadata_type='openid_credential_issuer' - ) - vp.set_credential_jwks(credential_jwks) + if isinstance(vp, VpSdJwt): + credential_jwks = tchelper.get_trusted_jwks( + metadata_type='openid_credential_issuer' + ) + vp.set_credential_jwks(credential_jwks) except InvalidVPToken: return self._handle_400(context, f"Cannot validate VP: {vp.jwt}") except ValidationError as e: @@ -140,7 +141,6 @@ def request_endpoint(self, context: Context, *args: tuple) -> Redirect | JsonRes self._handle_credential_trust(context, vp) # the trust is established to the credential issuer, then we can get the disclosed user attributes - # TODO - what if the credential is different from sd-jwt? -> generalyze within Vp class try: if isinstance(vp, VpSdJwt): @@ -151,7 +151,7 @@ def request_endpoint(self, context: Context, *args: tuple) -> Redirect | JsonRes else: vp.verify() except Exception as e: - return self._handle_400(context, f"VP SD-JWT validation error: {e}") + return self._handle_400(context, f"VP validation error with {self.data}: {e}") # vp.result attributes_by_issuers[vp.credential_issuer] = vp.disclosed_user_attributes diff --git a/pyeudiw/trust/trust_anchors.py b/pyeudiw/trust/trust_anchors.py index 3f405c79..33e0cb61 100644 --- a/pyeudiw/trust/trust_anchors.py +++ b/pyeudiw/trust/trust_anchors.py @@ -8,7 +8,7 @@ logger = logging.getLogger(__name__) -def update_trust_anchors_ecs(trust_anchors: list, db: DBEngine, httpc_params: dict) -> None: +def update_trust_anchors_ecs(trust_anchors: list[str], db: DBEngine, httpc_params: dict) -> None: """ Update the trust anchors entity configurations. diff --git a/setup.py b/setup.py index e218d356..6d89055f 100644 --- a/setup.py +++ b/setup.py @@ -53,6 +53,7 @@ def readme(): "pymongo>=4.4.1,<4.5", "requests>=2.2,<2.4", "sd-jwt", + "pymdoccbor @ git+https://github.com/peppelinux/pyMDOC-CBOR.git" ], "federation": [ "asyncio>=4,<4.1",