Skip to content

Commit

Permalink
wip: stabilize integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Zicchio committed Dec 11, 2024
1 parent 1108639 commit 6343f47
Show file tree
Hide file tree
Showing 15 changed files with 66 additions and 51 deletions.
1 change: 1 addition & 0 deletions example/satosa/integration_test/.env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
PYEUDIW_MONGO_TEST_AUTH_INLINE=satosa:thatpassword@
39 changes: 23 additions & 16 deletions example/satosa/integration_test/commons.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
leaf_wallet_signed,
)
from pyeudiw.sd_jwt.holder import SDJWTHolder
from pyeudiw.trust.model.trust_source import TrustSourceData
from saml2_sp import saml2_request

from settings import (
Expand All @@ -51,6 +52,15 @@
"default_exp": 1024,
"key_binding": True
}
CREDENTIAL_ISSUER_TRUST_SOURCE_Dict = {
"entity_id": ISSUER_CONF["issuer"],
"policies": {},
"metadata": {},
"revoked": False,
"keys": [CREDENTIAL_ISSUER_JWK.as_dict()],
"trust_params": {}
}
CREDENTIAL_ISSUER_TRUST_SOURCE = TrustSourceData(**CREDENTIAL_ISSUER_TRUST_SOURCE_Dict)
WALLET_PRIVATE_JWK = JWK(leaf_wallet_jwk.serialize(private=True))
WALLET_PUBLIC_JWK = JWK(leaf_wallet_jwk.serialize())

Expand Down Expand Up @@ -81,7 +91,8 @@ def apply_trust_settings(db_engine_inst: DBEngine) -> DBEngine:
db_engine_inst.add_or_update_trust_attestation(
entity_id=leaf_cred["iss"],
attestation=leaf_cred_signed,
exp=datetime.datetime.now().isoformat()
exp=datetime.datetime.now().isoformat(),
trust_type=TrustType.FEDERATION
)

settings = ISSUER_CONF
Expand All @@ -90,6 +101,11 @@ def apply_trust_settings(db_engine_inst: DBEngine) -> DBEngine:
trust_type=TrustType.DIRECT_TRUST_SD_JWT_VC,
jwks=[leaf_cred_jwk_prot.serialize()]
)

db_engine_inst.add_trust_source(
trust_source=CREDENTIAL_ISSUER_TRUST_SOURCE_Dict
)

return db_engine_inst

def create_saml_auth_request() -> str:
Expand All @@ -100,27 +116,24 @@ def create_issuer_test_data() -> dict[Literal["jws"] | Literal["issuance"], str]
# create a SD-JWT signed by a trusted credential issuer
settings = ISSUER_CONF
settings["default_exp"] = 33

user_claims = _yaml_load_specification(StringIO(settings["sd_specification"]))
claims = {
"iss": settings["issuer"],
"iat": iat_now(),
"exp": exp_from_now(settings["default_exp"]) # in seconds
}
user_claims.update(claims)


issued_jwt = SDJWTIssuer(
issuer_keys=CREDENTIAL_ISSUER_JWK,
holder_key= WALLET_PUBLIC_JWK,
issuer_keys=CREDENTIAL_ISSUER_JWK.as_dict(),
holder_key=WALLET_PUBLIC_JWK.as_dict(),
extra_header_parameters={
"typ": "dc+sd-jwt",
"kid": CREDENTIAL_ISSUER_JWK.kid
},
user_claims=_yaml_load_specification(StringIO(settings["sd_specification"])),
user_claims=user_claims,
add_decoy_claims=claims.get("add_decoy_claims", True)
)

return {"jws": issued_jwt.serialized_sd_jwt, "issuance": issued_jwt.sd_jwt_issuance}


Expand All @@ -131,6 +144,7 @@ def create_holder_test_data(issued_jwt: dict[Literal["jws"] | Literal["issuance"
issued_jwt["issuance"],
serialization_format="compact",
)
holder_private_key: dict | None = WALLET_PRIVATE_JWK.as_dict() if settings.get("key_binding", False) else None
sdjwt_at_holder.create_presentation(
claims_to_disclose={
"tax_id_code": True,
Expand All @@ -140,14 +154,7 @@ def create_holder_test_data(issued_jwt: dict[Literal["jws"] | Literal["issuance"
nonce=request_nonce,
aud=request_aud,
sign_alg=DEFAULT_SIGN_KTY_TO_ALG[WALLET_PRIVATE_JWK.key.kty],
holder_key=(
key_from_jwk_dict(
WALLET_PRIVATE_JWK.key.priv_key,
kid=WALLET_PRIVATE_JWK.kid
)
if settings.get("key_binding", False)
else None
)
holder_key=holder_private_key
)

vp_token = sdjwt_at_holder.sd_jwt_presentation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from pyeudiw.jwt.utils import decode_jwt_payload

from . commons import (
from commons import (
ISSUER_CONF,
setup_test_db_engine,
apply_trust_settings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
db_engine_inst = setup_test_db_engine()
db_engine_inst = apply_trust_settings(db_engine_inst)


def _extract_request_uri(e: Exception) -> str:
request_uri: str = re.search(r'request_uri=(.*?)(?:\'|\s|$)', urllib.parse.unquote_plus(e.args[0])).group(1)
request_uri = request_uri.rstrip()
Expand Down
7 changes: 5 additions & 2 deletions example/satosa/integration_test/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
TIMEOUT_S = 10
IDP_BASEURL = "https://localhost"
RP_EID = "https://localhost/OpenID4VP"
MONGO_AUTH_INLINE = os.getenv('PYEUDIW_MONGO_TEST_AUTH_INLINE', '')
MONGO_URL_CONNECTION = f"mongodb://{MONGO_AUTH_INLINE}localhost:27017/?timeoutMS=2000"

CONFIG_DB = {
"mongo_db": {
Expand All @@ -22,12 +24,13 @@
"class": "MongoStorage",
"init_params": {
# according to Satosa-Saml2Spid demo
"url": f"mongodb://{os.getenv('PYEUDIW_MONGO_TEST_AUTH_INLINE', '')}localhost:27017/?timeoutMS=2000",
"url": MONGO_URL_CONNECTION,
"conf": {
"db_name": "eudiw",
"db_sessions_collection": "sessions",
"db_trust_attestations_collection": "trust_attestations",
"db_trust_anchors_collection": "trust_anchors"
"db_trust_anchors_collection": "trust_anchors",
"db_trust_sources_collection": "trust_sources"
},
"connection_params": {}
}
Expand Down
1 change: 1 addition & 0 deletions example/satosa/pyeudiw_backend.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ config:
db_sessions_collection: sessions
db_trust_attestations_collection: trust_attestations
db_trust_anchors_collection: trust_anchors
db_trust_sources_collection: trust_sources
data_ttl: 63072000 # 2 years
# - connection_params:

Expand Down
2 changes: 1 addition & 1 deletion pyeudiw/jwt/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ def _select_verifying_key(self, header: dict) -> JWK | None:
if unsupported_claims.intersection(header):
raise JWSVerificationError(NotImplementedError(f"self contained key extraction form header with claims {unsupported_claims} not supported yet"))
# if only one key and there is no header claim that can identitfy any key, than that MUST
# be the only valid candidate key for signatuire verification
# be the only valid candidate key for signature verification
if len(self.jwks) == 1:
return self.jwks[0]
return None
1 change: 0 additions & 1 deletion pyeudiw/openid4vp/vp_sd_jwt_vc.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ def __init__(self, token: str, verifying_keys: list[dict] | TrustedPublicKeySour
_issuer_keys: list[dict] = []
if hasattr(verifying_keys, 'get_public_keys'):
# this IF is duck typing check on TrustEvaluator / TrustedPublicKeySource
# breakpoint()
_issuer_keys = verifying_keys.get_public_keys(self.get_issuer_name())
elif isinstance(verifying_keys, list):
_issuer_keys = verifying_keys
Expand Down
1 change: 1 addition & 0 deletions pyeudiw/sd_jwt/issuer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

logger = logging.getLogger(__name__)


class SDJWTIssuer(SDJWTCommon):
DECOY_MIN_ELEMENTS = 2
DECOY_MAX_ELEMENTS = 5
Expand Down
5 changes: 4 additions & 1 deletion pyeudiw/storage/base_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,13 @@ def has_trust_anchor(self, entity_id: str) -> bool:
:rtype: bool
"""
raise NotImplementedError()

def has_trust_source(self, entity_id: str) -> bool:
raise NotImplementedError()

def _upsert_entry(self, key_label: str, collection: str, data: Union[str, dict]) -> tuple[str, dict]:
raise NotADirectoryError

def add_trust_attestation(self, entity_id: str, attestation: list[str], exp: datetime, trust_type: TrustType, jwks: dict) -> str:
"""
Add a trust attestation.
Expand Down
5 changes: 1 addition & 4 deletions pyeudiw/storage/mongo_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,7 @@ def _upsert_entry(
)

if not document_status.acknowledged:
raise StorageEntryUpdateFailed(
"Trust Anchor matched count is ZERO"
)
raise StorageEntryUpdateFailed(f"Failed to update or insert document with label {key_label}")

return document_status

Expand Down Expand Up @@ -311,7 +309,6 @@ def _update_anchor_metadata(self, entity: dict, attestation: list[str], exp: dat

entity[trust_name] = trust_entity


return entity

def add_trust_attestation(self, entity_id: str, attestation: list[str], exp: datetime, trust_type: TrustType, jwks: list[dict]) -> str:
Expand Down
2 changes: 1 addition & 1 deletion pyeudiw/tests/trust/handler/test_direct_trust.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def test_direct_trust_jwk():
trust_source = trust_handler.extract_and_update_trust_materials(random_issuer, trust_source)

obtained_jwks = trust_source.keys

mocked_issuer_jwt_vc_issuer_endpoint.stop()

assert len(obtained_jwks) == 1, f"expected 1 jwk, obtained {len(obtained_jwks)}"
Expand Down
4 changes: 2 additions & 2 deletions pyeudiw/tests/trust/mock_trust_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ def extract_and_update_trust_materials(self, issuer: str, trust_source: TrustSou
trust_source = self.get_metadata(issuer, trust_source)
trust_source.keys.append(mock_jwk)
return trust_source

class NonConformatTrustHandler:
def get_metadata(self, issuer: str, trust_source: TrustSourceData) -> dict:
return trust_source

def extract(self, issuer: str, trust_source: TrustSourceData) -> TrustSourceData:
return trust_source
return trust_source
24 changes: 12 additions & 12 deletions pyeudiw/trust/dynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ def get_policies(self, issuer: str) -> dict[str, any]:

if not trust_source.policies:
raise Exception(f"no trust evaluator can provide policies for {issuer}: searched among: {self.handlers_names}")

return trust_source.policies

def get_selfissued_jwt_header_trust_parameters(self, issuer: str) -> list[dict]:
"""
Get the trust parameters of a certain issuer according to some trust model.
Expand All @@ -160,9 +160,9 @@ def get_selfissued_jwt_header_trust_parameters(self, issuer: str) -> list[dict]:

if not trust_source.trust_params:
raise Exception(f"no trust evaluator can provide trust parameters for {issuer}: searched among: {self.handlers_names}")

return {type: param.trust_params for type, param in trust_source.trust_params.items()}

@staticmethod
def from_config(config: dict, db_engine: DBEngine) -> 'CombinedTrustEvaluator':
"""
Expand All @@ -172,7 +172,7 @@ def from_config(config: dict, db_engine: DBEngine) -> 'CombinedTrustEvaluator':
:type config: dict
:param db_engine: The database engine
:type db_engine: DBEngine
:returns: The CombinedTrustEvaluator
:rtype: CombinedTrustEvaluator
"""
Expand All @@ -181,21 +181,21 @@ def from_config(config: dict, db_engine: DBEngine) -> 'CombinedTrustEvaluator':
for handler_name, handler_config in config.items():
try:
trust_handler = dynamic_class_loader(
handler_config["module"],
handler_config["class"],
handler_config["module"],
handler_config["class"],
handler_config["config"]
)
except Exception as e:
raise TrustConfigurationError(f"invalid configuration for {handler_name}: {e}", e)

if not isinstance(trust_handler, TrustHandlerInterface):
raise TrustConfigurationError(f"class {trust_handler.__class__} does not satisfy the interface TrustEvaluator")


# TODO: check if the imported class has attributes that satisfy given interface
# if not isinstance(trust_handler, TrustHandlerInterface):
# raise TrustConfigurationError(f"class {trust_handler.__class__} does not satisfy the interface TrustEvaluator")

handlers.append(trust_handler)

if not handlers:
logger.warning("No configured trust model, using direct trust model")
handlers.append(DirectTrustSdJwtVc())

return CombinedTrustEvaluator(handlers, db_engine)

22 changes: 12 additions & 10 deletions pyeudiw/trust/handler/direct_trust_sd_jwt_vc.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
from pyeudiw.trust.handler.exception import InvalidJwkMetadataException


DEAFAULT_JWK_ENDPOINT = "/.well-known/jwt-vc-issuer"
DEAFAULT_METADATA_ENDPOINT = "/.well-known/openid-credential-issuer"
DEFAULT_SDJWTVC_METADATA_ENDPOINT = "/.well-known/jwt-vc-issuer"
DEFAULT_OPENID4VCI_METADATA_ENDPOINT = "/.well-known/openid-credential-issuer"

DEFAULT_DIRECT_TRUST_SD_JWC_VC_PARAMS = {
"connection": {
Expand All @@ -21,14 +21,17 @@
}
}


class DirectTrustSdJwtVc(TrustHandlerInterface, BaseLogger):
def __init__(
self,
httpc_params: dict = DEFAULT_DIRECT_TRUST_SD_JWC_VC_PARAMS,
jwk_endpoint: str = DEAFAULT_JWK_ENDPOINT,
metadata_endpoint: str = DEAFAULT_METADATA_ENDPOINT,
self,
httpc_params: dict | None = None,
jwk_endpoint: str = DEFAULT_SDJWTVC_METADATA_ENDPOINT,
metadata_endpoint: str = DEFAULT_OPENID4VCI_METADATA_ENDPOINT,
cache_ttl: int = 0,
) -> None:
if httpc_params is None:
httpc_params = DEFAULT_DIRECT_TRUST_SD_JWC_VC_PARAMS
self.httpc_params = httpc_params
self.jwk_endpoint = jwk_endpoint
self.metadata_endpoint = metadata_endpoint
Expand Down Expand Up @@ -81,8 +84,7 @@ def build_issuer_jwk_endpoint(issuer_id: str, well_known_path_component: str) ->
def build_issuer_metadata_endpoint(issuer: str, metadata_path_component: str) -> str:
issuer_normalized = issuer if issuer[-1] != '/' else issuer[:-1]
return issuer_normalized + metadata_path_component



def extract_and_update_trust_materials(self, issuer: str, trust_source: TrustSourceData) -> TrustSourceData:
"""
Fetches the public key of the issuer by querying a given endpoint.
Expand All @@ -107,7 +109,7 @@ def extract_and_update_trust_materials(self, issuer: str, trust_source: TrustSou

trust_source.add_keys(jwk_l)
except Exception as e:
self._log_warning("Extracting JWK" ,f"Failed to extract jwks from issuer {issuer}: {e}")
self._log_warning("Extracting JWK", f"Failed to extract jwks from issuer {issuer}: {e}")

return trust_source

Expand All @@ -126,4 +128,4 @@ def get_metadata(self, issuer: str, trust_source: TrustSourceData) -> TrustSourc
else:
trust_source.metadata = cacheable_get_http_url(self.cache_ttl, url, self.httpc_params, self.http_async_calls).json()

return trust_source
return trust_source

0 comments on commit 6343f47

Please sign in to comment.