diff --git a/pyeudiw/federation/__init__.py b/pyeudiw/federation/__init__.py index e8044ceb..afdc418d 100644 --- a/pyeudiw/federation/__init__.py +++ b/pyeudiw/federation/__init__.py @@ -1,6 +1,16 @@ from pyeudiw.federation.schemas.entity_configuration import EntityStatementPayload, EntityConfigurationPayload def is_es(payload: dict) -> bool: + """ + Determines if payload dict is an Entity Statement + + :param payload: the object to determine if is an Entity Statement + :type payload: dict + + :returns: True if is an Entity Statement and False otherwise + :rtype: bool + """ + try: EntityStatementPayload(**payload) if payload["iss"] != payload["sub"]: @@ -10,6 +20,16 @@ def is_es(payload: dict) -> bool: def is_ec(payload: dict) -> bool: + """ + Determines if payload dict is an Entity Configuration + + :param payload: the object to determine if is an Entity Configuration + :type payload: dict + + :returns: True if is an Entity Configuration and False otherwise + :rtype: bool + """ + try: EntityConfigurationPayload(**payload) return True diff --git a/pyeudiw/federation/http_client.py b/pyeudiw/federation/http_client.py index 177ab666..a7cb3b04 100644 --- a/pyeudiw/federation/http_client.py +++ b/pyeudiw/federation/http_client.py @@ -3,7 +3,21 @@ import requests -async def fetch(session, url, httpc_params: dict): +async def fetch(session: dict, url: str, httpc_params: dict) -> str: + """ + Fetches the content of a URL. + + :param session: a dict representing the current session + :type session: dict + :param url: the url where fetch the content + :type url: str + :param httpc_params: parameters to perform http requests. + :type httpc_params: dict + + :returns: the response in string format + :rtype: str + """ + async with session.get(url, **httpc_params.get("connection", {})) as response: if response.status != 200: # pragma: no cover # response.raise_for_status() @@ -11,7 +25,21 @@ async def fetch(session, url, httpc_params: dict): return await response.text() -async def fetch_all(session, urls, httpc_params: dict): +async def fetch_all(session: dict, urls: list[str], httpc_params: dict) -> list[str]: + """ + Fetches the content of a list of URL. + + :param session: a dict representing the current session + :type session: dict + :param urls: the url list where fetch the content + :type urls: list[str] + :param httpc_params: parameters to perform http requests. + :type httpc_params: dict + + :returns: the list of responses in string format + :rtype: list[str] + """ + tasks = [] for url in urls: task = asyncio.create_task(fetch(session, url, httpc_params)) @@ -21,7 +49,19 @@ async def fetch_all(session, urls, httpc_params: dict): async def http_get(urls, httpc_params: dict, sync=True): + """ + Perform a GET http call. + + :param session: a dict representing the current session + :type session: dict + :param urls: the url list where fetch the content + :type urls: list[str] + :param httpc_params: parameters to perform http requests. + :type httpc_params: dict + :returns: the list of responses in string format + :rtype: list[str] + """ if sync: _conf = { 'verify': httpc_params['connection']['ssl'], diff --git a/pyeudiw/federation/statements.py b/pyeudiw/federation/statements.py index c167475e..9fd20b44 100644 --- a/pyeudiw/federation/statements.py +++ b/pyeudiw/federation/statements.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from copy import deepcopy from pyeudiw.federation.exceptions import ( UnknownKid, @@ -7,20 +9,18 @@ InvalidEntityHeader, InvalidEntityStatementPayload ) -from pyeudiw.federation.http_client import http_get from pyeudiw.federation.schemas.entity_configuration import ( EntityConfigurationHeader, EntityStatementPayload ) from pyeudiw.jwt.utils import unpad_jwt_payload, unpad_jwt_header from pyeudiw.jwt import JWSHelper +from pyeudiw.tools.utils import get_http_url from pydantic import ValidationError -import asyncio + import json import logging -import requests - try: pass @@ -32,42 +32,84 @@ logger = logging.getLogger(__name__) -def jwks_from_jwks_uri(jwks_uri: str, httpc_params: dict) -> list: - return [json.loads(asyncio.run(http_get([jwks_uri], httpc_params)))] # pragma: no cover +def jwks_from_jwks_uri(jwks_uri: str, httpc_params: dict, http_async: bool = True) -> list[dict]: + """ + Retrieves jwks from an entity uri. + :param jwks_uri: the uri where the jwks are located. + :type jwks_uri: str + :param httpc_params: parameters to perform http requests. + :type httpc_params: dict + :param http_async: if is set to True the operation will be performed in async (deafault True) + :type http_async: bool -def get_federation_jwks(jwt_payload: dict, httpc_params: dict): - return ( - jwt_payload.get("jwks", {}).get("keys", []) - ) + :returns: A list of entity jwks. + :rtype: list[dict] + """ + response = get_http_url(jwks_uri, httpc_params, http_async) + jwks = json.loads(response) -def get_http_url(urls: list, httpc_params: dict, http_async: bool = True) -> list: - if http_async: - responses = asyncio.run( - http_get(urls, httpc_params)) # pragma: no cover - else: - responses = [] - for i in urls: - res = requests.get(i, **httpc_params) # nosec - B113 - responses.append(res.content.decode()) - return responses + return [jwks] -def get_entity_statements(urls: list, httpc_params: dict) -> list: +def get_federation_jwks(jwt_payload: dict) -> list[dict]: """ - Fetches an entity statement/configuration + Returns the list of JWKS inside a JWT payload. + + :param jwt_payload: the jwt payload from where extract the JWKs. + :type jwt_payload: dict + + :returns: A list of entity jwk's keys. + :rtype: list[dict] """ - if isinstance(urls, str): - urls = [urls] # pragma: no cover + + jwks = jwt_payload.get("jwks", {}) + keys = jwks.get("keys", []) + + return keys + + +def get_entity_statements(urls: list[str] | str, httpc_params: dict, http_async: bool = True) -> list[dict]: + """ + Fetches an entity statement from the specified urls. + + :param urls: The url or a list of url where perform the GET HTTP calls + :type urls: list[str] | str + :param httpc_params: parameters to perform http requests. + :type httpc_params: dict + :param http_async: if is set to True the operation will be performed in async (deafault True) + :type http_async: bool + + :returns: A list of entity statements. + :rtype: list[dict] + """ + + urls = urls if isinstance(urls, list) else [urls] + for url in urls: logger.debug(f"Starting Entity Statement Request to {url}") - return get_http_url(urls, httpc_params) + return get_http_url(urls, httpc_params, http_async) + + +def get_entity_configurations(subjects: list[str] | str, httpc_params: dict, http_async: bool = True): + """ + Fetches an entity configuration from the specified subjects. + + :param subjects: The url or a list of url where perform the GET HTTP calls + :type subjects: list[str] | str + :param httpc_params: parameters to perform http requests. + :type httpc_params: dict + :param http_async: if is set to True the operation will be performed in async (deafault True) + :type http_async: bool + + :returns: A list of entity statements. + :rtype: list[dict] + """ + + subjects = subjects if isinstance(subjects, list) else [subjects] -def get_entity_configurations(subjects: list, httpc_params: dict): - if isinstance(subjects, str): - subjects = [subjects] urls = [] for subject in subjects: if subject[-1] != "/": @@ -75,11 +117,23 @@ def get_entity_configurations(subjects: list, httpc_params: dict): url = f"{subject}{OIDCFED_FEDERATION_WELLKNOWN_URL}" urls.append(url) logger.info(f"Starting Entity Configuration Request for {url}") - return get_http_url(urls, httpc_params) + + return get_http_url(urls, httpc_params, http_async) class TrustMark: + """The class representing a Trust Mark""" + def __init__(self, jwt: str, httpc_params: dict): + """ + Create an instance of Trust Mark + + :param jwt: the JWT containing the trust marks + :type jwt: str + :param httpc_params: parameters to perform http requests. + :type httpc_params: dict + """ + self.jwt = jwt self.header = unpad_jwt_header(jwt) self.payload = unpad_jwt_payload(jwt) @@ -93,7 +147,16 @@ def __init__(self, jwt: str, httpc_params: dict): self.issuer_entity_configuration = None self.httpc_params = httpc_params - def validate_by(self, ec) -> bool: + def validate_by(self, ec: dict) -> bool: + """ + Validates Trust Marks by an Entity Configuration + + :param ec: the entity configuration to validate by + :type ec: dict + + :returns: True if is valid otherwise False + :rtype: bool + """ try: EntityConfigurationHeader(**self.header) except ValidationError as e: @@ -116,9 +179,15 @@ def validate_by(self, ec) -> bool: return payload def validate_by_its_issuer(self) -> bool: + """ + Validates Trust Marks by it's issuer + + :returns: True if is valid otherwise False + :rtype: bool + """ if not self.issuer_entity_configuration: self.issuer_entity_configuration = get_entity_configurations( - self.iss, self.httpc_params + self.iss, self.httpc_params, False ) try: ec = EntityStatement(self.issuer_entity_configuration[0]) @@ -154,17 +223,30 @@ def __init__( self, jwt: str, httpc_params: dict, - filter_by_allowed_trust_marks: list = [], - trust_anchor_entity_conf=None, - trust_mark_issuers_entity_confs: dict = [], + filter_by_allowed_trust_marks: list[str] = [], + trust_anchor_entity_conf: 'EntityStatement' | None = None, + trust_mark_issuers_entity_confs: list[EntityStatement] = [], ): + """ + Creates EntityStatement istance + + :param jwt: the JWT containing the trust marks. + :type jwt: str + :param httpc_params: parameters to perform http requests. + :type httpc_params: dict + :param filter_by_allowed_trust_marks: allowed trust marks list. + :type filter_by_allowed_trust_marks: list[str] + :param trust_anchor_entity_conf: the trust anchor entity conf or None + :type trust_anchor_entity_conf: EntityStatement | None + :param trust_mark_issuers_entity_confs: the list containig the trust mark's entiity confs + """ self.jwt = jwt self.header = unpad_jwt_header(jwt) self.payload = unpad_jwt_payload(jwt) self.sub = self.payload["sub"] self.iss = self.payload["iss"] self.exp = self.payload["exp"] - self.jwks = get_federation_jwks(self.payload, httpc_params) + self.jwks = get_federation_jwks(self.payload) if not self.jwks or not self.jwks[0]: _msg = f"Missing jwks in the statement for {self.sub}" logger.error(_msg) @@ -196,6 +278,15 @@ def __init__( self.verified_trust_marks = [] self.is_valid = False + def update_trust_anchor_conf(self, trust_anchor_entity_conf: 'EntityStatement') -> None: + """ + Updates the internal Trust Anchor conf. + + :param trust_anchor_entity_conf: the trust anchor entity conf + :type trust_anchor_entity_conf: EntityStatement + """ + self.trust_anchor_entity_conf = trust_anchor_entity_conf + def validate_by_itself(self) -> bool: """ validates the entity configuration by it self @@ -319,12 +410,22 @@ def validate_by_allowed_trust_marks(self) -> bool: def get_superiors( self, - authority_hints: list = [], + authority_hints: list[str] = [], max_authority_hints: int = 0, - superiors_hints: list = [], + superiors_hints: list[dict] = [], ) -> dict: """ get superiors entity configurations + + :param authority_hints: the authority hint list + :type authority_hints: list[str] + :param max_authority_hints: the number of max authority hint + :type max_authority_hints: int + :param superiors_hints: the list of superior hints + :type superiors_hints: list[dict] + + :returns: a dict with the superior's entity configurations + :rtype: dict """ # apply limits if defined authority_hints = authority_hints or deepcopy( @@ -361,7 +462,7 @@ def get_superiors( if not jwts: jwts = get_entity_configurations( - authority_hints, self.httpc_params) + authority_hints, self.httpc_params, False) for jwt in jwts: try: @@ -393,6 +494,12 @@ def get_superiors( def validate_descendant_statement(self, jwt: str) -> bool: """ jwt is a descendant entity statement issued by self + + :param jwt: the JWT to validate by + :type jwt: str + + :returns: True if is valid or False otherwise + :rtype: bool """ header = unpad_jwt_header(jwt) payload = unpad_jwt_payload(jwt) @@ -425,13 +532,16 @@ def validate_descendant_statement(self, jwt: str) -> bool: self.verified_descendant_statements_as_jwt[payload["sub"]] = jwt return self.verified_descendant_statements - def validate_by_superior_statement(self, jwt: str, ec): + def validate_by_superior_statement(self, jwt: str, ec: 'EntityStatement') -> str: """ - jwt is a statement issued by a superior - ec is a superior entity configuration - - this method validates self with the jwks contained in statement - of the superior + validates self with the jwks contained in statement of the superior + :param jwt: the statement issued by a superior in form of JWT + :type jwt: str + :param ec: is a superior entity configuration + :type ec: EntityStatement + + :returns: the entity configuration subject if is valid + :rtype: str """ is_valid = None payload = {} @@ -439,7 +549,7 @@ def validate_by_superior_statement(self, jwt: str, ec): payload = unpad_jwt_payload(jwt) ec.validate_by_itself() ec.validate_descendant_statement(jwt) - _jwks = get_federation_jwks(payload, self.httpc_params) + _jwks = get_federation_jwks(payload) _kids = [i.get("kid") for i in _jwks] jwsh = JWSHelper(_jwks[_kids.index(self.header["kid"])]) @@ -471,11 +581,14 @@ def validate_by_superiors( superiors_entity_configurations: dict = {}, ) -> dict: """ - validates the entity configuration with the entity statements - issued by its superiors + validates the entity configuration with the entity statements issued by its superiors + this methods create self.verified_superiors and failed ones and self.verified_by_superiors and failed ones - this methods create self.verified_superiors and failed ones - and self.verified_by_superiors and failed ones + :param superiors_entity_configurations: an object containing the entity configurations of superiors + :type superiors_entity_configurations: dict + + :returns: an object containing the superior validations + :rtype: dict """ for ec in superiors_entity_configurations: if ec.sub in ec.verified_by_superiors: @@ -498,7 +611,7 @@ def validate_by_superiors( else: _url = f"{fetch_api_url}?sub={self.sub}" logger.info(f"Getting entity statements from {_url}") - jwts = get_entity_statements([_url], self.httpc_params) + jwts = get_entity_statements([_url], self.httpc_params, False) if not jwts: logger.error( f"Empty response for {_url}" diff --git a/pyeudiw/federation/trust_chain_builder.py b/pyeudiw/federation/trust_chain_builder.py index ad9b4319..0371d3b3 100644 --- a/pyeudiw/federation/trust_chain_builder.py +++ b/pyeudiw/federation/trust_chain_builder.py @@ -26,13 +26,6 @@ class TrustChainBuilder: """ A trust walker that fetches statements and evaluate the evaluables - - max_intermediaries means how many hops are allowed to the trust anchor - max_authority_hints means how much authority_hints to follow on each hop - - required_trust_marks means all the trsut marks needed to start a metadata discovery - at least one of the required trust marks is needed to start a metadata discovery - if this param if absent the filter won't be considered. """ def __init__( @@ -42,14 +35,37 @@ def __init__( httpc_params: dict, trust_anchor_configuration: Union[EntityStatement, str, None] = None, max_authority_hints: int = 10, - subject_configuration: EntityStatement = None, - required_trust_marks: list = [], + subject_configuration: EntityStatement | None = None, + required_trust_marks: list[dict] = [], # TODO - prefetch cache? # pre_fetched_entity_configurations = {}, # pre_fetched_statements = {}, # **kwargs, ) -> None: + """ + Initialized a TrustChainBuilder istance + + :parameter subject: represents the subject url (leaf) of the Trust Chain + :type subject: str + :parameter trust_anchor: represents the issuer url (leaf) of the Trust Chain + :type trust_anchor: str + :param httpc_params: parameters needed to perform http requests + :type httpc_params: dict + :param trust_anchor_configuration: is the entity statment configuration of Trust Anchor. + The assigned value can be an EntityStatement, a str or None. + If the value is a string it will be converted in an EntityStatement istance. + If the value is None it will be retrieved from an http request on the trust_anchor field. + :parameter max_authority_hints: the number of how many authority_hints to follow on each hop + :type max_authority_hints: int + :parameter subject_configuration: the configuration of subject + :type subject_configuration: EntityStatement + :parameter required_trust_marks: means all the trust marks needed to start a metadata discovery + at least one of the required trust marks is needed to start a metadata discovery + if this param if absent the filter won't be considered. + :type required_trust_marks: list[dict] + + """ self.subject = subject self.subject_configuration = subject_configuration @@ -64,7 +80,9 @@ def __init__( trust_anchor_configuration = EntityStatement( jwts[0], httpc_params=self.httpc_params ) - trust_anchor_configuration.subject_configuration.validate_by_itself() + + subject_configuration.update_trust_anchor_conf(trust_anchor_configuration) + subject_configuration.validate_by_itself() except Exception as e: _msg = f"Entity Configuration for {self.trust_anchor} failed: {e}" logger.error(_msg) @@ -95,8 +113,10 @@ def __init__( def apply_metadata_policy(self) -> dict: """ filters the trust path from subject to trust anchor - apply the metadata policies along the path and - returns the final metadata + apply the metadata policies along the path. + + :returns: the final metadata with policy applied + :rtype: dict """ # find the path of trust if not self.trust_path: @@ -155,23 +175,26 @@ def apply_metadata_policy(self) -> dict: ) # set exp - self.set_exp() + self._set_exp() return self.final_metadata - @property - def exp_datetime(self) -> datetime.datetime: - if self.exp: # pragma: no cover - return datetime_from_timestamp(self.exp) - - def set_exp(self) -> int: + def _set_exp(self) -> None: + """ + updates the internal exp field with the nearest + expiraton date found in the trust_path field + """ exps = [i.payload["exp"] for i in self.trust_path] if exps: self.exp = min(exps) def discovery(self) -> bool: """ - return a chain of verified statements - from the lower up to the trust anchor + discovers the chain of verified statements + from the lower up to the trust anchor and updates + the internal representation of chain. + + :returns: the validity status of the updated chain + :rtype: bool """ logger.info( f"Starting a Walk into Metadata Discovery for {self.subject}") @@ -225,6 +248,10 @@ def discovery(self) -> bool: return self.is_valid def get_trust_anchor_configuration(self) -> None: + """ + Download and updates the internal field trust_anchor_configuration + with the entity statement of trust anchor. + """ if not isinstance(self.trust_anchor, EntityStatement): logger.info( f"Get Trust Anchor Entity Configuration for {self.subject}") @@ -245,8 +272,11 @@ def get_trust_anchor_configuration(self) -> None: self._set_max_path_len() - def _set_max_path_len(self): - + def _set_max_path_len(self) -> None: + """ + Sets the internal field max_path_len with the costraint + found in trust anchor payload + """ if self.trust_anchor_configuration.payload.get("constraints", {}).get( "max_path_length" ): @@ -257,6 +287,12 @@ def _set_max_path_len(self): ) def get_subject_configuration(self) -> None: + """ + Download and updates the internal field subject_configuration + with the entity statement of leaf. + + :rtype: None + """ if not self.subject_configuration: try: jwts = get_entity_configurations( @@ -289,10 +325,22 @@ def get_subject_configuration(self) -> None: else: self.verified_trust_marks.extend(sc.verified_trust_marks) - def serialize(self): + def serialize(self) -> str: + """ + Serializes the chain in JSON format. + + :returns: the serialized chain in JSON format + :rtype: str + """ return json.dumps(self.get_trust_chain()) - def get_trust_chain(self): + def get_trust_chain(self) -> list[str]: + """ + Retrieves the leaf and the Trust Anchor entity configurations. + + :returns: the list containing the ECs + :rtype: list[str] + """ res = [] # we keep just the leaf's and TA's EC, all the intermediates EC will be dropped ta_ec: str = "" @@ -312,6 +360,13 @@ def get_trust_chain(self): return res def start(self): + """ + Retrieves the subject (leaf) configuration and starts + chain discovery. + + :returns: the list containing the ECs + :rtype: list[str] + """ try: # self.get_trust_anchor_configuration() self.get_subject_configuration() @@ -320,3 +375,9 @@ def start(self): self.is_valid = False logger.error(f"{e}") raise e + + @property + def exp_datetime(self) -> datetime.datetime: + """The exp filed converted in datetime format""" + if self.exp: # pragma: no cover + return datetime_from_timestamp(self.exp) \ No newline at end of file diff --git a/pyeudiw/federation/trust_chain_validator.py b/pyeudiw/federation/trust_chain_validator.py index eef834fc..9c00c53a 100644 --- a/pyeudiw/federation/trust_chain_validator.py +++ b/pyeudiw/federation/trust_chain_validator.py @@ -3,6 +3,7 @@ from pyeudiw.jwt import JWSHelper from pyeudiw.jwt.utils import unpad_jwt_payload, unpad_jwt_header from pyeudiw.federation import is_es +from pyeudiw.federation.policy import TrustChainPolicy from pyeudiw.federation.statements import ( get_entity_configurations, get_entity_statements @@ -17,7 +18,18 @@ logger = logging.getLogger(__name__) -def find_jwk(kid: str, jwks: list) -> dict: +def find_jwk(kid: str, jwks: list[dict]) -> dict: + """ + Find the JWK with the indicated kid in the jwks list. + + :param kid: the identifier of the jwk + :type kid: str + :param jwks: the list of jwks + :type jwks: list[dict] + + :returns: the jwk with the indicated kid or an empty dict if no jwk is found + :rtype: dict + """ if not kid: return {} for jwk in jwks: @@ -27,13 +39,25 @@ def find_jwk(kid: str, jwks: list) -> dict: class StaticTrustChainValidator: + """Helper class for Static Trust Chain validation""" def __init__( self, - static_trust_chain: list, - trust_anchor_jwks: list, + static_trust_chain: list[str], + trust_anchor_jwks: list[dict], httpc_params: dict, **kwargs, ) -> None: + + """ + Generates a new StaticTrustChainValidator istance + + :param static_trust_chain: the list of JWTs, containing the EC, componing the static tust chain + :type static_trust_chain: list[str] + :param trust_anchor_jwks: the list of trust anchor jwks + :type trust_anchor_jwks: list[dict] + :param httpc_params: parameters to perform http requests + :type httpc_params: dict + """ self.static_trust_chain = static_trust_chain self.updated_trust_chain = [] @@ -51,9 +75,45 @@ def __init__( setattr(self, k, v) def _check_expired(self, exp: int) -> bool: + """ + Checks if exp value is expired. + + :param exp: an integer that represent the timestemp to check + :type exp: int + :returns: True if exp is expired and False otherwise + :rtype: bool + """ + return exp < iat_now() + + def _validate_exp(self, exp: int) -> None: + """ + Checks if exp value is expired. + + :param exp: an integer that represent the timestemp to check + :type exp: int + + :raises TimeValidationError: if exp value is expired + """ + + if not self._check_expired(exp): + raise TimeValidationError( + "Expired validation error" + ) + + def _validate_keys(self, fed_jwks: list[dict], st_header: dict) -> None: + """ + Checks that the kid in st_header match with one JWK present + in the federation JWKs list. + + :param fed_jwks: the list of federation's JWKs + :type fed_jwks: list[dict] + :param st_header: the statement header + :type st_header: dict + + :raises KeyValidationError: if no JWK with the kid specified in feild st_header is found + """ - def _validate_keys(self, fed_jwks: list[str], st_header: dict) -> None: current_kid = st_header["kid"] validation_kid = None @@ -65,17 +125,14 @@ def _validate_keys(self, fed_jwks: list[str], st_header: dict) -> None: if not validation_kid: raise KeyValidationError(f"Kid {current_kid} not found") - def _validate_single(self, fed_jwks: list[str], header: dict, payload: dict) -> bool: - try: - self._validate_keys(fed_jwks, header) - self._validate_exp(payload["exp"]) - except Exception as e: - logger.warning(f"Warning: {e}") - return False - - return True - def validate(self) -> bool: + """ + Validates the static chain checking the validity in all jwt inside the field trust_chain. + + :returns: True if static chain is valid and False otherwise + :rtype: bool + """ + # start from the last entity statement rev_tc = [ i for i in reversed(self.trust_chain) @@ -104,9 +161,7 @@ def validate(self) -> bool: self.exp = es_payload["exp"] if self._check_expired(self.exp): - raise TimeValidationError( - "Expired validation error" - ) + return False fed_jwks = es_payload["jwks"]["keys"] @@ -133,11 +188,16 @@ def validate(self) -> bool: return True - @property - def is_valid(self) -> bool: - return self.validate() - def _retrieve_ec(self, iss: str) -> str: + """ + Retrieves the Entity configuration from an on-line source. + + :param iss: The issuer url where retrieve the entity configuration. + :type iss: str + + :returns: the entity configuration in form of JWT. + :rtype: str + """ jwt = get_entity_configurations(iss, self.httpc_params) if not jwt: raise HttpError( @@ -147,6 +207,17 @@ def _retrieve_ec(self, iss: str) -> str: return jwt[0] def _retrieve_es(self, download_url: str, iss: str) -> str: + """ + Retrieves the Entity Statement from an on-line source. + + :param download_url: The path where retrieve the entity configuration. + :type download_url: str + :param iss: The issuer url. + :type iss: str + + :returns: the entity statement in form of JWT. + :rtype: str + """ jwt = get_entity_statements(download_url, self.httpc_params) if not jwt: logger.warning( @@ -157,6 +228,15 @@ def _retrieve_es(self, download_url: str, iss: str) -> str: return jwt def _update_st(self, st: str) -> str: + """ + Updates the statement retrieving the new one using the source end_point and the sub fields of st payload. + + :param st: The statement in form of a JWT. + :type st: str + + :returns: the entity statement in form of JWT. + :rtype: str + """ payload = unpad_jwt_payload(st) iss = payload['iss'] if not is_es(payload): @@ -190,10 +270,22 @@ def _update_st(self, st: str) -> str: return jwt def set_exp(self, exp: int) -> None: + """ + Updates the self.exp field if the exp parameter is more recent than the previous one. + + :param exp: an integer that represent the timestemp to check + :type exp: int + """ if not self.exp or self.exp > exp: self.exp = exp def update(self) -> bool: + """ + Updates the statement retrieving and the exp filed and determines the validity of it. + + :returns: True if the updated chain is valid, False otherwise. + :rtype: bool + """ self.exp = 0 for st in self.static_trust_chain: jwt = self._update_st(st) @@ -204,29 +296,38 @@ def update(self) -> bool: self.updated_trust_chain.append(jwt) return self.is_valid + + @property + def is_valid(self) -> bool: + """Get the validity of chain.""" + return self.validate() @property def trust_chain(self) -> list[str]: + """Get the list of the jwt that compones the trust chain.""" return self.updated_trust_chain or self.static_trust_chain @property def is_expired(self) -> int: + """Get the status of chain expiration.""" return self._check_expired(self.exp) @property def entity_id(self) -> str: + """Get the chain's entity_id.""" chain = self.trust_chain payload = unpad_jwt_payload(chain[0]) return payload["iss"] @property def final_metadata(self) -> dict: - anchor = self.trust_anchor_jwks[-1] + """Apply the metadata and returns the final metadata.""" + anchor = self.static_trust_chain[-1] es_anchor_payload = unpad_jwt_payload(anchor) policy = es_anchor_payload.get("metadata_policy", {}) - leaf = self.trust_anchor_jwks[0] + leaf = self.static_trust_chain[0] es_leaf_payload = unpad_jwt_payload(leaf) - #return TrustChainPolicy().apply_policy(es_leaf_payload["metadata"], policy) + return TrustChainPolicy().apply_policy(es_leaf_payload["metadata"], policy) diff --git a/pyeudiw/tools/utils.py b/pyeudiw/tools/utils.py index fcd9d1a2..895dab69 100644 --- a/pyeudiw/tools/utils.py +++ b/pyeudiw/tools/utils.py @@ -1,7 +1,11 @@ import datetime import json import logging +import asyncio +import requests + from secrets import token_hex +from pyeudiw.federation.http_client import http_get logger = logging.getLogger(__name__) @@ -26,10 +30,31 @@ def datetime_from_timestamp(value) -> datetime.datetime: return make_timezone_aware(datetime.datetime.fromtimestamp(value)) -def get_http_url(url: str): - raise NotImplementedError( - f"{__name__} get_http_url is not implemented, please see federation.statements" - ) +def get_http_url(urls: list[str] | str, httpc_params: dict, http_async: bool = True) -> list[dict]: + """ + Perform an HTTP Request returning the payload of the call. + + :param urls: The url or a list of url where perform the GET HTTP calls + :type urls: list[str] | str + :param httpc_params: parameters to perform http requests. + :type httpc_params: dict + :param http_async: if is set to True the operation will be performed in async (deafault True) + :type http_async: bool + + :returns: A list of responses. + :rtype: list[dict] + """ + urls = urls if isinstance(urls, list) else [urls] + + if http_async: + responses = asyncio.run( + http_get(urls, httpc_params)) # pragma: no cover + else: + responses = [] + for i in urls: + res = requests.get(i, **httpc_params) # nosec - B113 + responses.append(res.content) + return responses def get_jwks(httpc_params: dict, metadata: dict, federation_jwks: list = []) -> dict: