diff --git a/example/satosa/integration_test/.env.example b/example/satosa/integration_test/.env.example new file mode 100644 index 00000000..0a29c2ad --- /dev/null +++ b/example/satosa/integration_test/.env.example @@ -0,0 +1 @@ +PYEUDIW_MONGO_TEST_AUTH_INLINE=satosa:thatpassword@ \ No newline at end of file diff --git a/example/satosa/integration_test/commons.py b/example/satosa/integration_test/commons.py index a1494855..d77ec536 100644 --- a/example/satosa/integration_test/commons.py +++ b/example/satosa/integration_test/commons.py @@ -26,6 +26,7 @@ leaf_wallet_signed, ) from pyeudiw.sd_jwt.holder import SDJWTHolder +from pyeudiw.trust.model.trust_source import TrustSourceData from saml2_sp import saml2_request from settings import ( @@ -51,6 +52,15 @@ "default_exp": 1024, "key_binding": True } +CREDENTIAL_ISSUER_TRUST_SOURCE_Dict = { + "entity_id": ISSUER_CONF["issuer"], + "policies": {}, + "metadata": {}, + "revoked": False, + "keys": [CREDENTIAL_ISSUER_JWK.as_dict()], + "trust_params": {} +} +CREDENTIAL_ISSUER_TRUST_SOURCE = TrustSourceData(**CREDENTIAL_ISSUER_TRUST_SOURCE_Dict) WALLET_PRIVATE_JWK = JWK(leaf_wallet_jwk.serialize(private=True)) WALLET_PUBLIC_JWK = JWK(leaf_wallet_jwk.serialize()) @@ -81,7 +91,8 @@ def apply_trust_settings(db_engine_inst: DBEngine) -> DBEngine: db_engine_inst.add_or_update_trust_attestation( entity_id=leaf_cred["iss"], attestation=leaf_cred_signed, - exp=datetime.datetime.now().isoformat() + exp=datetime.datetime.now().isoformat(), + trust_type=TrustType.FEDERATION ) settings = ISSUER_CONF @@ -90,6 +101,11 @@ def apply_trust_settings(db_engine_inst: DBEngine) -> DBEngine: trust_type=TrustType.DIRECT_TRUST_SD_JWT_VC, jwks=[leaf_cred_jwk_prot.serialize()] ) + + db_engine_inst.add_trust_source( + trust_source=CREDENTIAL_ISSUER_TRUST_SOURCE_Dict + ) + return db_engine_inst def create_saml_auth_request() -> str: @@ -100,7 +116,7 @@ def create_issuer_test_data() -> dict[Literal["jws"] | Literal["issuance"], str] # create a SD-JWT signed by a trusted credential issuer settings = ISSUER_CONF settings["default_exp"] = 33 - + user_claims = _yaml_load_specification(StringIO(settings["sd_specification"])) claims = { "iss": settings["issuer"], @@ -108,19 +124,16 @@ def create_issuer_test_data() -> dict[Literal["jws"] | Literal["issuance"], str] "exp": exp_from_now(settings["default_exp"]) # in seconds } user_claims.update(claims) - - issued_jwt = SDJWTIssuer( - issuer_keys=CREDENTIAL_ISSUER_JWK, - holder_key= WALLET_PUBLIC_JWK, + issuer_keys=CREDENTIAL_ISSUER_JWK.as_dict(), + holder_key=WALLET_PUBLIC_JWK.as_dict(), extra_header_parameters={ "typ": "dc+sd-jwt", "kid": CREDENTIAL_ISSUER_JWK.kid }, - user_claims=_yaml_load_specification(StringIO(settings["sd_specification"])), + user_claims=user_claims, add_decoy_claims=claims.get("add_decoy_claims", True) ) - return {"jws": issued_jwt.serialized_sd_jwt, "issuance": issued_jwt.sd_jwt_issuance} @@ -131,6 +144,7 @@ def create_holder_test_data(issued_jwt: dict[Literal["jws"] | Literal["issuance" issued_jwt["issuance"], serialization_format="compact", ) + holder_private_key: dict | None = WALLET_PRIVATE_JWK.as_dict() if settings.get("key_binding", False) else None sdjwt_at_holder.create_presentation( claims_to_disclose={ "tax_id_code": True, @@ -140,14 +154,7 @@ def create_holder_test_data(issued_jwt: dict[Literal["jws"] | Literal["issuance" nonce=request_nonce, aud=request_aud, sign_alg=DEFAULT_SIGN_KTY_TO_ALG[WALLET_PRIVATE_JWK.key.kty], - holder_key=( - key_from_jwk_dict( - WALLET_PRIVATE_JWK.key.priv_key, - kid=WALLET_PRIVATE_JWK.kid - ) - if settings.get("key_binding", False) - else None - ) + holder_key=holder_private_key ) vp_token = sdjwt_at_holder.sd_jwt_presentation diff --git a/example/satosa/integration_test/cross_device_integration_test.py b/example/satosa/integration_test/cross_device_integration_test.py index 2d8fbd79..2d30e558 100644 --- a/example/satosa/integration_test/cross_device_integration_test.py +++ b/example/satosa/integration_test/cross_device_integration_test.py @@ -7,7 +7,7 @@ from pyeudiw.jwt.utils import decode_jwt_payload -from . commons import ( +from commons import ( ISSUER_CONF, setup_test_db_engine, apply_trust_settings, diff --git a/example/satosa/integration_test/same_device_integration_test.py b/example/satosa/integration_test/same_device_integration_test.py index 3c22e670..a7afe358 100644 --- a/example/satosa/integration_test/same_device_integration_test.py +++ b/example/satosa/integration_test/same_device_integration_test.py @@ -21,6 +21,7 @@ db_engine_inst = setup_test_db_engine() db_engine_inst = apply_trust_settings(db_engine_inst) + def _extract_request_uri(e: Exception) -> str: request_uri: str = re.search(r'request_uri=(.*?)(?:\'|\s|$)', urllib.parse.unquote_plus(e.args[0])).group(1) request_uri = request_uri.rstrip() diff --git a/example/satosa/integration_test/settings.py b/example/satosa/integration_test/settings.py index d58147c0..082693d2 100644 --- a/example/satosa/integration_test/settings.py +++ b/example/satosa/integration_test/settings.py @@ -14,6 +14,8 @@ TIMEOUT_S = 10 IDP_BASEURL = "https://localhost" RP_EID = "https://localhost/OpenID4VP" +MONGO_AUTH_INLINE = os.getenv('PYEUDIW_MONGO_TEST_AUTH_INLINE', '') +MONGO_URL_CONNECTION = f"mongodb://{MONGO_AUTH_INLINE}localhost:27017/?timeoutMS=2000" CONFIG_DB = { "mongo_db": { @@ -22,12 +24,13 @@ "class": "MongoStorage", "init_params": { # according to Satosa-Saml2Spid demo - "url": f"mongodb://{os.getenv('PYEUDIW_MONGO_TEST_AUTH_INLINE', '')}localhost:27017/?timeoutMS=2000", + "url": MONGO_URL_CONNECTION, "conf": { "db_name": "eudiw", "db_sessions_collection": "sessions", "db_trust_attestations_collection": "trust_attestations", - "db_trust_anchors_collection": "trust_anchors" + "db_trust_anchors_collection": "trust_anchors", + "db_trust_sources_collection": "trust_sources" }, "connection_params": {} } diff --git a/example/satosa/pyeudiw_backend.yaml b/example/satosa/pyeudiw_backend.yaml index 0f602dcc..57f7fa48 100644 --- a/example/satosa/pyeudiw_backend.yaml +++ b/example/satosa/pyeudiw_backend.yaml @@ -161,6 +161,7 @@ config: db_sessions_collection: sessions db_trust_attestations_collection: trust_attestations db_trust_anchors_collection: trust_anchors + db_trust_sources_collection: trust_sources data_ttl: 63072000 # 2 years # - connection_params: diff --git a/pyeudiw/jwt/helper.py b/pyeudiw/jwt/helper.py index 6ba77643..f897713e 100644 --- a/pyeudiw/jwt/helper.py +++ b/pyeudiw/jwt/helper.py @@ -355,7 +355,7 @@ def _select_verifying_key(self, header: dict) -> JWK | None: if unsupported_claims.intersection(header): raise JWSVerificationError(NotImplementedError(f"self contained key extraction form header with claims {unsupported_claims} not supported yet")) # if only one key and there is no header claim that can identitfy any key, than that MUST - # be the only valid candidate key for signatuire verification + # be the only valid candidate key for signature verification if len(self.jwks) == 1: return self.jwks[0] return None diff --git a/pyeudiw/openid4vp/vp_sd_jwt_vc.py b/pyeudiw/openid4vp/vp_sd_jwt_vc.py index 4a96ba53..4ff9b0c4 100644 --- a/pyeudiw/openid4vp/vp_sd_jwt_vc.py +++ b/pyeudiw/openid4vp/vp_sd_jwt_vc.py @@ -24,7 +24,6 @@ def __init__(self, token: str, verifying_keys: list[dict] | TrustedPublicKeySour _issuer_keys: list[dict] = [] if hasattr(verifying_keys, 'get_public_keys'): # this IF is duck typing check on TrustEvaluator / TrustedPublicKeySource - # breakpoint() _issuer_keys = verifying_keys.get_public_keys(self.get_issuer_name()) elif isinstance(verifying_keys, list): _issuer_keys = verifying_keys diff --git a/pyeudiw/sd_jwt/issuer.py b/pyeudiw/sd_jwt/issuer.py index 644e809d..28b7fb65 100644 --- a/pyeudiw/sd_jwt/issuer.py +++ b/pyeudiw/sd_jwt/issuer.py @@ -22,6 +22,7 @@ logger = logging.getLogger(__name__) + class SDJWTIssuer(SDJWTCommon): DECOY_MIN_ELEMENTS = 2 DECOY_MAX_ELEMENTS = 5 diff --git a/pyeudiw/storage/base_storage.py b/pyeudiw/storage/base_storage.py index 89423cfa..d2b7703a 100644 --- a/pyeudiw/storage/base_storage.py +++ b/pyeudiw/storage/base_storage.py @@ -171,10 +171,13 @@ def has_trust_anchor(self, entity_id: str) -> bool: :rtype: bool """ raise NotImplementedError() - + def has_trust_source(self, entity_id: str) -> bool: raise NotImplementedError() + def _upsert_entry(self, key_label: str, collection: str, data: Union[str, dict]) -> tuple[str, dict]: + raise NotADirectoryError + def add_trust_attestation(self, entity_id: str, attestation: list[str], exp: datetime, trust_type: TrustType, jwks: dict) -> str: """ Add a trust attestation. diff --git a/pyeudiw/storage/mongo_storage.py b/pyeudiw/storage/mongo_storage.py index 77dd6747..4ec1ca5b 100644 --- a/pyeudiw/storage/mongo_storage.py +++ b/pyeudiw/storage/mongo_storage.py @@ -273,9 +273,7 @@ def _upsert_entry( ) if not document_status.acknowledged: - raise StorageEntryUpdateFailed( - "Trust Anchor matched count is ZERO" - ) + raise StorageEntryUpdateFailed(f"Failed to update or insert document with label {key_label}") return document_status @@ -311,7 +309,6 @@ def _update_anchor_metadata(self, entity: dict, attestation: list[str], exp: dat entity[trust_name] = trust_entity - return entity def add_trust_attestation(self, entity_id: str, attestation: list[str], exp: datetime, trust_type: TrustType, jwks: list[dict]) -> str: diff --git a/pyeudiw/tests/trust/handler/test_direct_trust.py b/pyeudiw/tests/trust/handler/test_direct_trust.py index ad94157d..f9c3db86 100644 --- a/pyeudiw/tests/trust/handler/test_direct_trust.py +++ b/pyeudiw/tests/trust/handler/test_direct_trust.py @@ -115,7 +115,7 @@ def test_direct_trust_jwk(): trust_source = trust_handler.extract_and_update_trust_materials(random_issuer, trust_source) obtained_jwks = trust_source.keys - + mocked_issuer_jwt_vc_issuer_endpoint.stop() assert len(obtained_jwks) == 1, f"expected 1 jwk, obtained {len(obtained_jwks)}" diff --git a/pyeudiw/tests/trust/mock_trust_handler.py b/pyeudiw/tests/trust/mock_trust_handler.py index b9e49908..4db90a9d 100644 --- a/pyeudiw/tests/trust/mock_trust_handler.py +++ b/pyeudiw/tests/trust/mock_trust_handler.py @@ -24,10 +24,10 @@ def extract_and_update_trust_materials(self, issuer: str, trust_source: TrustSou trust_source = self.get_metadata(issuer, trust_source) trust_source.keys.append(mock_jwk) return trust_source - + class NonConformatTrustHandler: def get_metadata(self, issuer: str, trust_source: TrustSourceData) -> dict: return trust_source def extract(self, issuer: str, trust_source: TrustSourceData) -> TrustSourceData: - return trust_source \ No newline at end of file + return trust_source diff --git a/pyeudiw/trust/dynamic.py b/pyeudiw/trust/dynamic.py index 618d4b5b..bc0ceeb0 100644 --- a/pyeudiw/trust/dynamic.py +++ b/pyeudiw/trust/dynamic.py @@ -143,9 +143,9 @@ def get_policies(self, issuer: str) -> dict[str, any]: if not trust_source.policies: raise Exception(f"no trust evaluator can provide policies for {issuer}: searched among: {self.handlers_names}") - + return trust_source.policies - + def get_selfissued_jwt_header_trust_parameters(self, issuer: str) -> list[dict]: """ Get the trust parameters of a certain issuer according to some trust model. @@ -160,9 +160,9 @@ def get_selfissued_jwt_header_trust_parameters(self, issuer: str) -> list[dict]: if not trust_source.trust_params: raise Exception(f"no trust evaluator can provide trust parameters for {issuer}: searched among: {self.handlers_names}") - + return {type: param.trust_params for type, param in trust_source.trust_params.items()} - + @staticmethod def from_config(config: dict, db_engine: DBEngine) -> 'CombinedTrustEvaluator': """ @@ -172,7 +172,7 @@ def from_config(config: dict, db_engine: DBEngine) -> 'CombinedTrustEvaluator': :type config: dict :param db_engine: The database engine :type db_engine: DBEngine - + :returns: The CombinedTrustEvaluator :rtype: CombinedTrustEvaluator """ @@ -181,16 +181,17 @@ def from_config(config: dict, db_engine: DBEngine) -> 'CombinedTrustEvaluator': for handler_name, handler_config in config.items(): try: trust_handler = dynamic_class_loader( - handler_config["module"], - handler_config["class"], + handler_config["module"], + handler_config["class"], handler_config["config"] ) except Exception as e: raise TrustConfigurationError(f"invalid configuration for {handler_name}: {e}", e) - - if not isinstance(trust_handler, TrustHandlerInterface): - raise TrustConfigurationError(f"class {trust_handler.__class__} does not satisfy the interface TrustEvaluator") - + + # TODO: check if the imported class has attributes that satisfy given interface + # if not isinstance(trust_handler, TrustHandlerInterface): + # raise TrustConfigurationError(f"class {trust_handler.__class__} does not satisfy the interface TrustEvaluator") + handlers.append(trust_handler) if not handlers: @@ -198,4 +199,3 @@ def from_config(config: dict, db_engine: DBEngine) -> 'CombinedTrustEvaluator': handlers.append(DirectTrustSdJwtVc()) return CombinedTrustEvaluator(handlers, db_engine) - diff --git a/pyeudiw/trust/handler/direct_trust_sd_jwt_vc.py b/pyeudiw/trust/handler/direct_trust_sd_jwt_vc.py index cdd56968..7e7511bf 100644 --- a/pyeudiw/trust/handler/direct_trust_sd_jwt_vc.py +++ b/pyeudiw/trust/handler/direct_trust_sd_jwt_vc.py @@ -9,8 +9,8 @@ from pyeudiw.trust.handler.exception import InvalidJwkMetadataException -DEAFAULT_JWK_ENDPOINT = "/.well-known/jwt-vc-issuer" -DEAFAULT_METADATA_ENDPOINT = "/.well-known/openid-credential-issuer" +DEFAULT_SDJWTVC_METADATA_ENDPOINT = "/.well-known/jwt-vc-issuer" +DEFAULT_OPENID4VCI_METADATA_ENDPOINT = "/.well-known/openid-credential-issuer" DEFAULT_DIRECT_TRUST_SD_JWC_VC_PARAMS = { "connection": { @@ -21,14 +21,17 @@ } } + class DirectTrustSdJwtVc(TrustHandlerInterface, BaseLogger): def __init__( - self, - httpc_params: dict = DEFAULT_DIRECT_TRUST_SD_JWC_VC_PARAMS, - jwk_endpoint: str = DEAFAULT_JWK_ENDPOINT, - metadata_endpoint: str = DEAFAULT_METADATA_ENDPOINT, + self, + httpc_params: dict | None = None, + jwk_endpoint: str = DEFAULT_SDJWTVC_METADATA_ENDPOINT, + metadata_endpoint: str = DEFAULT_OPENID4VCI_METADATA_ENDPOINT, cache_ttl: int = 0, ) -> None: + if httpc_params is None: + httpc_params = DEFAULT_DIRECT_TRUST_SD_JWC_VC_PARAMS self.httpc_params = httpc_params self.jwk_endpoint = jwk_endpoint self.metadata_endpoint = metadata_endpoint @@ -81,8 +84,7 @@ def build_issuer_jwk_endpoint(issuer_id: str, well_known_path_component: str) -> def build_issuer_metadata_endpoint(issuer: str, metadata_path_component: str) -> str: issuer_normalized = issuer if issuer[-1] != '/' else issuer[:-1] return issuer_normalized + metadata_path_component - - + def extract_and_update_trust_materials(self, issuer: str, trust_source: TrustSourceData) -> TrustSourceData: """ Fetches the public key of the issuer by querying a given endpoint. @@ -107,7 +109,7 @@ def extract_and_update_trust_materials(self, issuer: str, trust_source: TrustSou trust_source.add_keys(jwk_l) except Exception as e: - self._log_warning("Extracting JWK" ,f"Failed to extract jwks from issuer {issuer}: {e}") + self._log_warning("Extracting JWK", f"Failed to extract jwks from issuer {issuer}: {e}") return trust_source @@ -126,4 +128,4 @@ def get_metadata(self, issuer: str, trust_source: TrustSourceData) -> TrustSourc else: trust_source.metadata = cacheable_get_http_url(self.cache_ttl, url, self.httpc_params, self.http_async_calls).json() - return trust_source \ No newline at end of file + return trust_source