Skip to content

Commit

Permalink
Feature: x509 chain verification (#152)
Browse files Browse the repository at this point in the history
* feat: added module x509 and function verify_x509_cert_chain

* fix: fixed conversion for right handling

* fix: removed breakpoint

* feat: added better logging

* fix: typo

* test: initial implementation of tests for x509

* fix: fixed typo

* Update pyeudiw/x509/verify.py

Co-authored-by: Giuseppe De Marco <giuseppe.demarco@teamdigitale.governo.it>

* Update pyeudiw/x509/verify.py

Co-authored-by: Giuseppe De Marco <giuseppe.demarco@teamdigitale.governo.it>

* fix: fixed typo

* fix: fix argument passing in verify_x509_cert_chain

* feat: added x509 support to storage module

* test: added testo for db engine

* feat: added verify_x509_anchor plus refactoring

* test: added tests for verify_x509_anchor

* fix: added dependency

* fix: typo

* feat: added optional exp in verify functions

* test: added test for None exp

* feat: initial support for x509 handling

* Update pyeudiw/storage/mongo_storage.py

Co-authored-by: Giuseppe De Marco <giuseppe.demarco@teamdigitale.governo.it>

* Update pyeudiw/storage/mongo_storage.py

Co-authored-by: Giuseppe De Marco <giuseppe.demarco@teamdigitale.governo.it>

* feat: added function get_issuer_from_x5c

* test: added test for get_issuer_from_x5c function

* fix: added typ field to protected header

* feat: implemented evaluation_method

* fix: added trust_type_map

* fix: fixed variables names

* feat: added function is_der_format

* test: test der validity with is_der_format

* fix: fixed evaluation_method reading chain elements type

* fix: removed unecessary field

* fix: renamed function

* fix: added more context to warning messages

* fix: added mappings for entity field name

---------

Co-authored-by: Giuseppe De Marco <giuseppe.demarco@teamdigitale.governo.it>
  • Loading branch information
PascalDR and Giuseppe De Marco authored Nov 14, 2023
1 parent 77b5efd commit 13a3c25
Show file tree
Hide file tree
Showing 11 changed files with 542 additions and 64 deletions.
28 changes: 23 additions & 5 deletions pyeudiw/storage/base_storage.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,25 @@
import datetime

from enum import Enum
from typing import Union

class TrustType(Enum):
X509 = 0
FEDERATION = 1

trust_type_map : dict = {
TrustType.X509 : "x509",
TrustType.FEDERATION: "federation"
}

trust_attestation_field_map : dict = {
TrustType.X509 : "x5c",
TrustType.FEDERATION: "chain"
}

trust_anchor_field_map : dict = {
TrustType.X509 : "pem",
TrustType.FEDERATION: "entity_configuration"
}

class BaseStorage(object):
def init_session(self, document_id: str, dpop_proof: dict, attestation: dict):
Expand Down Expand Up @@ -37,16 +55,16 @@ def has_trust_attestation(self, entity_id: str):
def has_trust_anchor(self, entity_id: str):
raise NotImplementedError()

def add_trust_attestation(self, entity_id: str, attestation: list[str], exp: datetime) -> str:
def add_trust_attestation(self, entity_id: str, attestation: list[str], exp: datetime, trust_type: TrustType) -> str:
raise NotImplementedError()

def add_trust_anchor(self, entity_id: str, entity_configuration: str, exp: datetime):
def add_trust_anchor(self, entity_id: str, entity_configuration: str, exp: datetime, trust_type: TrustType):
raise NotImplementedError()

def update_trust_attestation(self, entity_id: str, attestation: list[str], exp: datetime) -> str:
def update_trust_attestation(self, entity_id: str, attestation: list[str], exp: datetime, trust_type: TrustType) -> str:
raise NotImplementedError()

def update_trust_anchor(self, entity_id: str, entity_configuration: dict, exp: datetime) -> str:
def update_trust_anchor(self, entity_id: str, entity_configuration: str, exp: datetime, trust_type: TrustType) -> str:
raise NotImplementedError()

def exists_by_state_and_session_id(self, state: str, session_id: str = "") -> bool:
Expand Down
24 changes: 12 additions & 12 deletions pyeudiw/storage/db_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from datetime import datetime
from typing import Callable, Union
from pyeudiw.storage.base_cache import BaseCache, RetrieveStatus
from pyeudiw.storage.base_storage import BaseStorage
from pyeudiw.storage.base_storage import BaseStorage, TrustType
from pyeudiw.storage.exceptions import (
ChainNotExist,
StorageWriteError,
Expand Down Expand Up @@ -155,24 +155,24 @@ def has_trust_attestation(self, entity_id: str):
def has_trust_anchor(self, entity_id: str):
return self.get_anchor(entity_id)

def add_trust_attestation(self, entity_id: str, attestation: list[str], exp: datetime) -> str:
return self.write("add_trust_attestation", entity_id, attestation, exp)
def add_trust_attestation(self, entity_id: str, attestation: list[str], exp: datetime, trust_type: TrustType = TrustType.FEDERATION) -> str:
return self.write("add_trust_attestation", entity_id, attestation, exp, trust_type)

def add_trust_anchor(self, entity_id: str, entity_configuration: str, exp: datetime):
return self.write("add_trust_anchor", entity_id, entity_configuration, exp)
def add_trust_anchor(self, entity_id: str, entity_configuration: str, exp: datetime, trust_type: TrustType = TrustType.FEDERATION):
return self.write("add_trust_anchor", entity_id, entity_configuration, exp, trust_type)

def update_trust_attestation(self, entity_id: str, attestation: list[str], exp: datetime) -> str:
return self.write("update_trust_attestation", entity_id, attestation, exp)
def update_trust_attestation(self, entity_id: str, attestation: list[str], exp: datetime, trust_type: TrustType = TrustType.FEDERATION) -> str:
return self.write("update_trust_attestation", entity_id, attestation, exp, trust_type)

def add_or_update_trust_attestation(self, entity_id: str, attestation: list[str], exp: datetime) -> str:
def add_or_update_trust_attestation(self, entity_id: str, attestation: list[str], exp: datetime, trust_type: TrustType = TrustType.FEDERATION) -> str:
try:
self.get_trust_attestation(entity_id)
return self.write("update_trust_attestation", entity_id, attestation, exp)
return self.write("update_trust_attestation", entity_id, attestation, exp, trust_type)
except (EntryNotFound, ChainNotExist):
return self.write("add_trust_attestation", entity_id, attestation, exp)
return self.write("add_trust_attestation", entity_id, attestation, exp, trust_type)

def update_trust_anchor(self, entity_id: str, entity_configuration: dict, exp: datetime) -> str:
return self.write("update_trust_anchor", entity_id, entity_configuration, exp)
def update_trust_anchor(self, entity_id: str, entity_configuration: dict, exp: datetime, trust_type: TrustType = TrustType.FEDERATION) -> str:
return self.write("update_trust_anchor", entity_id, entity_configuration, exp, trust_type)

def _cache_try_retrieve(self, object_name: str, on_not_found: Callable[[], str]) -> tuple[dict, RetrieveStatus, int]:
for i, cache in enumerate(self.caches):
Expand Down
91 changes: 56 additions & 35 deletions pyeudiw/storage/mongo_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,19 @@

from pymongo.results import UpdateResult

from pyeudiw.storage.base_storage import BaseStorage
from pyeudiw.storage.base_storage import (
BaseStorage,
TrustType,
trust_type_map,
trust_attestation_field_map,
trust_anchor_field_map
)
from pyeudiw.storage.exceptions import (
ChainNotExist,
StorageEntryUpdateFailed
)
from typing import Union


class MongoStorage(BaseStorage):
def __init__(self, conf: dict, url: str, connection_params: dict = {}) -> None:
super().__init__()
Expand Down Expand Up @@ -225,34 +230,58 @@ def _add_entry(
db_collection = getattr(self, collection)
db_collection.insert_one(attestation)
return entity_id

def _update_attestation_metadata(self, entity: dict, attestation: list[str], exp: datetime, trust_type: TrustType):
trust_name = trust_type_map[trust_type]
trust_field = trust_attestation_field_map[trust_type]

trust_entity = entity.get(trust_name, {})

trust_entity[trust_field] = attestation
trust_entity["exp"] = exp

entity[trust_name] = trust_entity

return entity

def _update_anchor_metadata(self, entity: dict, attestation: list[str], exp: datetime, trust_type: TrustType):
trust_name = trust_type_map[trust_type]
trust_field = trust_anchor_field_map[trust_type]

def add_trust_attestation(self, entity_id: str, attestation: list[str], exp: datetime):
trust_entity = entity.get(trust_name, {})

trust_entity[trust_field] = attestation
trust_entity["exp"] = exp

entity[trust_name] = trust_entity

return entity

def add_trust_attestation(self, entity_id: str, attestation: list[str], exp: datetime, trust_type: TrustType):
entity = {
"entity_id": entity_id,
"federation": {
"chain": attestation,
"exp": exp
},
"federation": {},
"x509": {}
}

updated_entity = self._update_attestation_metadata(entity, attestation, exp, trust_type)

self._add_entry(
"trust_attestations", entity_id, entity, exp
"trust_attestations", entity_id, updated_entity, exp
)

def add_trust_anchor(self, entity_id: str, entity_configuration: str, exp: datetime):
entry = {
"entity_id": entity_id,
"federation": {
"entity_configuration": entity_configuration,
"exp": exp
},
"x509": {} # TODO x509
}
def add_trust_anchor(self, entity_id: str, entity_configuration: str, exp: datetime, trust_type: TrustType):
if self.has_trust_anchor(entity_id):
self.update_trust_anchor(entity_id, entity_configuration, exp)
self.update_trust_anchor(entity_id, entity_configuration, exp, trust_type)
else:
self._add_entry("trust_anchors", entity_id, entry, exp)
entity = {
"entity_id": entity_id,
"federation": {},
"x509": {}
}

updated_entity = self._update_anchor_metadata(entity, entity_configuration, exp, trust_type)
self._add_entry("trust_anchors", entity_id, updated_entity, exp)

def _update_trust_attestation(self, collection: str, entity_id: str, entity: dict, exp: datetime) -> str:
if not self._has_trust_attestation(collection, entity_id):
Expand All @@ -264,30 +293,22 @@ def _update_trust_attestation(self, collection: str, entity_id: str, entity: dic
)
return documentStatus

def update_trust_attestation(self, entity_id: str, attestation: list[str], exp: datetime) -> str:
entity = {
"federation": {
"chain": attestation,
"exp": exp
}
}
def update_trust_attestation(self, entity_id: str, attestation: list[str], exp: datetime, trust_type: TrustType) -> str:
old_entity = self._get_trust_attestation("trust_attestations", entity_id) or {}
upd_entity = self._update_attestation_metadata(old_entity, attestation, exp, trust_type)

return self._update_trust_attestation("trust_attestations", entity_id, entity, exp)
return self._update_trust_attestation("trust_attestations", entity_id, upd_entity, exp)

def update_trust_anchor(self, entity_id: str, entity_configuration: str, exp: datetime) -> str:
entity = {
"federation": {
"entity_configuration": entity_configuration,
"exp": exp
}
}
def update_trust_anchor(self, entity_id: str, entity_configuration: str, exp: datetime, trust_type: TrustType) -> str:
old_entity = self._get_trust_attestation("trust_attestations", entity_id) or {}
upd_entity = self._update_anchor_metadata(old_entity, entity_configuration, exp, trust_type)

if not self.has_trust_anchor(entity_id):
raise ChainNotExist(f"Chain with entity id {entity_id} not exist")

documentStatus = self.trust_anchors.update_one(
{"entity_id": entity_id},
{"$set": entity}
{"$set": upd_entity}
)
if not documentStatus.matched_count:
raise StorageEntryUpdateFailed(
Expand Down
2 changes: 1 addition & 1 deletion pyeudiw/tests/satosa/test_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ def test_request_endpoint(self, context):
WALLET_INSTANCE_ATTESTATION,
protected={
'trust_chain': trust_chain_wallet,
'x5c': []
'x5c': [],
}
)

Expand Down
139 changes: 139 additions & 0 deletions pyeudiw/tests/storage/test_db_engine.py
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import uuid
import pytest

from datetime import datetime
from pyeudiw.storage.db_engine import DBEngine
from pyeudiw.storage.base_storage import TrustType
from pyeudiw.storage.exceptions import StorageWriteError
from pyeudiw.tests.settings import CONFIG


Expand Down Expand Up @@ -59,3 +62,139 @@ def test_update_response_object_unexistent_id_object(self):
str(uuid.uuid4()), str(uuid.uuid4()), response_object)
except:
return

@pytest.fixture(autouse=True)
def test_insert_trusted_attestation_federation(self):
self.federation_entity_id = str(uuid.uuid4())
date = datetime.now()

replica_count = self.engine.add_trust_attestation(
self.federation_entity_id, ["a", "b", "c"], date)

assert replica_count > 0

ta = self.engine.get_trust_attestation(self.federation_entity_id)

assert ta.get("federation", None) != None
assert ta["federation"]["chain"] == ["a", "b", "c"]

@pytest.fixture(autouse=True)
def test_insert_trusted_attestation_x509(self):
self.x509_entity_id = str(uuid.uuid4())
date = datetime.now()

replica_count = self.engine.add_trust_attestation(
self.x509_entity_id, ["a", "b", "c"], date, TrustType.X509)

assert replica_count > 0

ta = self.engine.get_trust_attestation(self.x509_entity_id)

assert ta.get("x509", None) != None
assert ta["x509"]["x5c"] == ["a", "b", "c"]

def test_update_trusted_attestation_federation(self):
date = datetime.now()

replica_count = self.engine.update_trust_attestation(
self.federation_entity_id, ["a", "b", "d"], date)

assert replica_count > 0

ta = self.engine.get_trust_attestation(self.federation_entity_id)

assert ta.get("federation", None) != None
assert ta["federation"]["chain"] == ["a", "b", "d"]

def test_update_trusted_attestation_x509(self):
date = datetime.now()

replica_count = self.engine.update_trust_attestation(
self.x509_entity_id, ["a", "b", "d"], date, TrustType.X509)

assert replica_count > 0

ta = self.engine.get_trust_attestation(self.x509_entity_id)

assert ta.get("x509", None) != None
assert ta["x509"]["x5c"] == ["a", "b", "d"]

def test_update_unexistent_trusted_attestation(self):
try:
date = datetime.now()

self.engine.update_trust_attestation(
"12345", ["a", "b", "d"], date)

assert False

except StorageWriteError as e:
return

@pytest.fixture(autouse=True)
def test_insert_trusted_anchor_federation(self):
self.federation_entity_anchor_id = str(uuid.uuid4())
date = datetime.now()

replica_count = self.engine.add_trust_anchor(
self.federation_entity_anchor_id, "test123", date)

assert replica_count > 0

ta = self.engine.get_trust_anchor(self.federation_entity_anchor_id)

assert ta.get("federation", None) != None
assert ta["federation"]["entity_configuration"] == "test123"

@pytest.fixture(autouse=True)
def test_insert_trusted_anchor_x509(self):
self.x509_entity_anchor_id = str(uuid.uuid4())
date = datetime.now()

replica_count = self.engine.add_trust_anchor(
self.x509_entity_anchor_id, "test123", date, TrustType.X509)

assert replica_count > 0

ta = self.engine.get_trust_anchor(self.x509_entity_anchor_id)

assert ta.get("x509", None) != None
assert ta["x509"]["pem"] == "test123"

def test_update_trusted_anchor_federation(self):
date = datetime.now()

replica_count = self.engine.update_trust_anchor(
self.federation_entity_anchor_id, "test124", date)

assert replica_count > 0

ta = self.engine.get_trust_anchor(self.federation_entity_anchor_id)

assert ta.get("federation", None) != None
assert ta["federation"]["entity_configuration"] == "test124"

def test_update_trusted_anchor_x509(self):
date = datetime.now()

replica_count = self.engine.update_trust_anchor(
self.x509_entity_anchor_id, "test124", date, TrustType.X509)

assert replica_count > 0

ta = self.engine.get_trust_anchor(self.x509_entity_anchor_id)

assert ta.get("x509", None) != None
assert ta["x509"]["pem"] == "test124"

def test_update_unexistent_trusted_anchor(self):
try:
date = datetime.now()

self.engine.update_trust_anchor(
"12345", "test124", date, TrustType.X509)

assert False

except StorageWriteError as e:
return
Loading

0 comments on commit 13a3c25

Please sign in to comment.