Skip to content

Commit

Permalink
fix: storage layer optionality (#283)
Browse files Browse the repository at this point in the history
* fix: storage layer optionality

* Apply suggestions from code review

---------

Co-authored-by: Giuseppe De Marco <giuseppe.demarco@teamdigitale.governo.it>
  • Loading branch information
Zicchio and Giuseppe De Marco authored Oct 23, 2024
1 parent 13bb766 commit 2eb59f0
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 27 deletions.
7 changes: 4 additions & 3 deletions example/satosa/integration_test/commons.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,10 @@ def apply_trust_settings(db_engine_inst: DBEngine) -> DBEngine:

settings = ISSUER_CONF
db_engine_inst.add_or_update_trust_attestation(
entity_id=settings["issuer"],
trust_type=TrustType.DIRECT_TRUST_SD_JWT_VC,
jwks=leaf_cred_jwk_prot.serialize())
entity_id=settings["issuer"],
trust_type=TrustType.DIRECT_TRUST_SD_JWT_VC,
jwks=[leaf_cred_jwk_prot.serialize()]
)
return db_engine_inst

def create_saml_auth_request() -> str:
Expand Down
6 changes: 3 additions & 3 deletions pyeudiw/storage/db_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def has_trust_attestation(self, entity_id: str) -> bool:
def has_trust_anchor(self, entity_id: str) -> bool:
return self.get_trust_anchor(entity_id) is not None

def add_trust_attestation(self, entity_id: str, attestation: list[str] = [], exp: datetime = None, trust_type: TrustType = TrustType.FEDERATION, jwks: dict = None) -> str:
def add_trust_attestation(self, entity_id: str, attestation: list[str] = [], exp: datetime = None, trust_type: TrustType = TrustType.FEDERATION, jwks: list[dict] = []) -> str:
return self.write("add_trust_attestation", entity_id, attestation, exp, trust_type, jwks)

def add_trust_attestation_metadata(self, entity_id: str, metadat_type: str, metadata: dict) -> str:
Expand All @@ -165,10 +165,10 @@ def add_trust_attestation_metadata(self, entity_id: str, metadat_type: str, meta
def add_trust_anchor(self, entity_id: str, entity_configuration: str, exp: datetime, trust_type: TrustType = TrustType.FEDERATION) -> str:
return self.write("add_trust_anchor", entity_id, entity_configuration, exp, trust_type)

def update_trust_attestation(self, entity_id: str, attestation: list[str] = [], exp: datetime = None, trust_type: TrustType = TrustType.FEDERATION, jwks: dict = None) -> str:
def update_trust_attestation(self, entity_id: str, attestation: list[str] = [], exp: datetime = None, trust_type: TrustType = TrustType.FEDERATION, jwks: list[dict] = []) -> str:
return self.write("update_trust_attestation", entity_id, attestation, exp, trust_type, jwks)

def add_or_update_trust_attestation(self, entity_id: str, attestation: list[str] = [], exp: datetime = None, trust_type: TrustType = TrustType.FEDERATION, jwks: dict = None) -> str:
def add_or_update_trust_attestation(self, entity_id: str, attestation: list[str] = [], exp: datetime = None, trust_type: TrustType = TrustType.FEDERATION, jwks: list[dict] = []) -> str:
try:
self.get_trust_attestation(entity_id)
return self.write("update_trust_attestation", entity_id, attestation, exp, trust_type, jwks)
Expand Down
18 changes: 11 additions & 7 deletions pyeudiw/storage/mongo_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,15 +252,18 @@ def _add_entry(
db_collection.insert_one(attestation)
return entity_id

def _update_attestation_metadata(self, entity: dict, attestation: list[str], exp: datetime, trust_type: TrustType, jwks: dict):
def _update_attestation_metadata(self, entity: dict, attestation: list[str], exp: datetime, trust_type: TrustType, jwks: list[dict]):
trust_name = trust_type_map[trust_type]
trust_field = trust_attestation_field_map.get(trust_type, None)

trust_entity = entity.get(trust_name, {})

if trust_field and attestation: trust_entity[trust_field] = attestation
if exp: trust_entity["exp"] = exp
if jwks: trust_entity["jwks"] = jwks
if trust_field and attestation:
trust_entity[trust_field] = attestation
if exp:
trust_entity["exp"] = exp
if jwks:
trust_entity["jwks"] = jwks

entity[trust_name] = trust_entity

Expand All @@ -272,14 +275,15 @@ def _update_anchor_metadata(self, entity: dict, attestation: list[str], exp: dat

trust_entity = entity.get(trust_name, {})

if trust_field and attestation: trust_entity[trust_field] = attestation
if trust_field and attestation:
trust_entity[trust_field] = attestation
trust_entity["exp"] = exp

entity[trust_name] = trust_entity

return entity

def add_trust_attestation(self, entity_id: str, attestation: list[str], exp: datetime, trust_type: TrustType, jwks: dict) -> str:
def add_trust_attestation(self, entity_id: str, attestation: list[str], exp: datetime, trust_type: TrustType, jwks: list[dict]) -> str:
entity = {
"entity_id": entity_id,
"federation": {},
Expand Down Expand Up @@ -331,7 +335,7 @@ def _update_trust_attestation(self, collection: str, entity_id: str, entity: dic
)
return documentStatus

def update_trust_attestation(self, entity_id: str, attestation: list[str], exp: datetime, trust_type: TrustType, jwks: dict) -> str:
def update_trust_attestation(self, entity_id: str, attestation: list[str], exp: datetime, trust_type: TrustType, jwks: list[dict]) -> str:
old_entity = self._get_trust_attestation(
"trust_attestations", entity_id) or {}
upd_entity = self._update_attestation_metadata(
Expand Down
3 changes: 2 additions & 1 deletion pyeudiw/tests/satosa/test_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ def create_backend(self):
db_engine_inst.add_or_update_trust_attestation(
entity_id=CREDENTIAL_ISSUER_ENTITY_ID,
trust_type=TrustType.DIRECT_TRUST_SD_JWT_VC,
jwks=issuer_jwk)
jwks=[issuer_jwk]
)

self.backend = OpenID4VPBackend(
Mock(), INTERNAL_ATTRIBUTES, CONFIG, BASE_URL, "name")
Expand Down
3 changes: 1 addition & 2 deletions pyeudiw/tests/trust/test_dynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,4 @@ def test_combined_trust_evaluator():
"direct_trust_sd_jwt_vc": DirectTrustSdJwtVc(**DEFAULT_DIRECT_TRUST_SD_JWC_VC_PARAMS)
}
combined = CombinedTrustEvaluator(evaluators)
# TODO: re-enable when fixed
# assert MockTrustEvaluator.mock_jwk in combined.get_public_keys("mock_issuer")
assert MockTrustEvaluator.mock_jwk in combined.get_public_keys("mock_issuer")
36 changes: 25 additions & 11 deletions pyeudiw/trust/dynamic.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import sys
from typing import Any, Optional
from typing import Optional

if float(f"{sys.version_info.major}.{sys.version_info.minor}") >= 3.12:
from typing import TypedDict
Expand All @@ -18,6 +18,7 @@

TrustModuleConfiguration_T = TypedDict("_DynamicTrustConfiguration", {"module": str, "class": str, "config": dict})


def dynamic_trust_evaluators_loader(trust_config: dict[str, TrustModuleConfiguration_T]) -> dict[str, TrustEvaluator]: # type: ignore
"""Load a dynamically importable/configurable set of TrustEvaluators,
identified by the trust model they refer to.
Expand Down Expand Up @@ -60,9 +61,19 @@ def __init__(self, trust_evaluators: dict[str, TrustEvaluator], storage: Optiona

def _get_trust_identifier_names(self) -> str:
return f'[{",".join(self.trust_evaluators.keys())}]'

def _get_public_keys_from_storage(self, eval_identifier: str, issuer: str) -> dict | None:
# note: keys are serialized as jwks

def _get_public_keys_from_storage(self, eval_identifier: str, issuer: str) -> list[dict] | None:
"""
Search public key for trust model 'eval_identifier' in the storage layer (if any). If the storage
layer fails or does not exists, None is returned.
Public keys are intended to be serialized as jwks (jwk set) in the storage layer.
:returns: a JWKS dictionary if the keys are found in the storage, or None if storage lookup fails
:rtype: dict | None
"""
if not self.storage:
return None

if trust_attestation := self.storage.get_trust_attestation(issuer):
if trust_entity := trust_attestation.get(eval_identifier, None):
if trust_entity_jwks := trust_entity.get("jwks", None):
Expand All @@ -71,16 +82,19 @@ def _get_public_keys_from_storage(self, eval_identifier: str, issuer: str) -> di
# with mongodb we use ttl integrated in the engine
return new_pks
return None

def _get_public_keys(self, eval_identifier: str, eval_instance: TrustEvaluator, issuer: str) -> dict:

def _get_public_keys(self, eval_identifier: str, eval_instance: TrustEvaluator, issuer: str) -> list[dict]:
new_pks: list = []
try:
new_pks = eval_instance.get_public_keys(issuer)
self.storage.add_or_update_trust_attestation(issuer, trust_type=TrustType(eval_identifier), jwks=new_pks)
except:
if self.storage:
self.storage.add_or_update_trust_attestation(issuer, trust_type=TrustType(eval_identifier), jwks=new_pks)
except Exception:
new_pks = self._get_public_keys_from_storage(eval_identifier, issuer)

if new_pks: return new_pks
else: raise Exception
if new_pks:
return new_pks
raise Exception(f"unable to find any public key with trust model {eval_identifier}")

def get_public_keys(self, issuer: str) -> list[dict]:
"""
Expand All @@ -97,7 +111,7 @@ def get_public_keys(self, issuer: str) -> list[dict]:
self._log_warning(f"failed to find any key of issuer {issuer} with model {eval_identifier}: {eval_instance.__class__.__name__}", e)
continue
if new_pks:
pks.append(new_pks)
pks.extend(new_pks)
if not pks:
raise Exception(f"no trust evaluator can provide cyptographic material for {issuer}: searched among: {self._get_trust_identifier_names()}")
return pks
Expand Down

0 comments on commit 2eb59f0

Please sign in to comment.