Skip to content

Commit

Permalink
merged pasquale branch
Browse files Browse the repository at this point in the history
  • Loading branch information
peppelinux committed Nov 14, 2024
1 parent 6bcfaa3 commit 3af829f
Show file tree
Hide file tree
Showing 43 changed files with 1,431 additions and 547 deletions.
71 changes: 39 additions & 32 deletions .github/workflows/python-app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,37 +65,44 @@ jobs:
python -m pip install "Pillow>=10.0.0,<10.1" "device_detector>=5.0,<6" "satosa>=8.4,<8.6" "jinja2>=3.0,<4" "pymongo>=4.4.1,<4.5" aiohttp
python -m pip install git+https://github.com/peppelinux/pyMDOC-CBOR.git
- name: Lint with flake8
run: |
# stop the build if there are Python syntax errors or undefined names
flake8 pyeudiw --count --select=E9,F63,F7,F82 --show-source --statistics
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
flake8 pyeudiw --count --exit-zero --statistics --max-line-length 160
- name: Tests
run: |
# pytest --cov=pyeudiw --cov-fail-under=90 pyeudiw
pytest --cov=pyeudiw pyeudiw
coverage report -m --skip-covered
- name: Bandit Security Scan
run: |
bandit -r -x pyeudiw/tests* pyeudiw/*
- name: Lint with html linter
run: |
echo -e '\nHTML:'
readarray -d '' array < <(find $SRC example -name "*.html" -print0)
echo "Running linter on (${#array[@]}): "
printf '\t- %s\n' "${array[@]}"
echo "Linter output:"
- name: Lint with flake8
run: |
# stop the build if there are Python syntax errors or undefined names
flake8 pyeudiw --count --select=E9,F63,F7,F82 --show-source --statistics
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
flake8 pyeudiw --count --exit-zero --statistics --max-line-length 160
- name: Tests
run: |
# pytest --cov=pyeudiw --cov-fail-under=90 pyeudiw
pytest --cov=pyeudiw pyeudiw
coverage report -m --skip-covered
# - name: Integration Tests
# run: |
# sudo apt-get install xmlsec1
# cd example/satosa/integration_test/
# python -m pip install -r requirements_test.txt
# python cross_device_integration_test.py
# python same_device_integration_test.py
- name: Bandit Security Scan
run: |
bandit -r -x pyeudiw/tests* pyeudiw/*
- name: Lint with html linter
run: |
echo -e '\nHTML:'
readarray -d '' array < <(find $SRC example -name "*.html" -print0)
echo "Running linter on (${#array[@]}): "
printf '\t- %s\n' "${array[@]}"
echo "Linter output:"
for file in "${array[@]}"
do
echo -e "\n$file:"
html_lint.py "$file" | awk -v path="file://$PWD/$file:" '$0=path$0' | sed -e 's/: /:\n\t/';
done
for file in "${array[@]}"
do
echo -e "\n$file:"
html_lint.py "$file" | awk -v path="file://$PWD/$file:" '$0=path$0' | sed -e 's/: /:\n\t/';
done
# block if the html linter fails
#for file in "${array[@]}"
#do
#errors=$(html_lint.py "$file" | grep -c 'Error')
#if [ "$errors" -gt 0 ]; then exit 1; fi;
#done
# block if the html linter fails
#for file in "${array[@]}"
#do
#errors=$(html_lint.py "$file" | grep -c 'Error')
#if [ "$errors" -gt 0 ]; then exit 1; fi;
#done
13 changes: 11 additions & 2 deletions pyeudiw/jwk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,15 @@ def as_dict(self) -> dict:
:rtype: dict
"""
return self.jwk

def as_public_dict(self) -> dict:
"""
Returns the public key in format of dict.
:returns: The public key in form of dict.
:rtype: dict
"""
return self.public_key

def __repr__(self):
# private part!
Expand Down Expand Up @@ -173,8 +182,8 @@ def find_jwk_by_kid(kid: str, jwks: list[dict], as_dict: bool = True) -> dict |
if not kid:
raise InvalidKid("Kid cannot be empty")
for jwk in jwks:
valid_jwk = jwk.get("kid", None)
if valid_jwk and kid == valid_jwk:
jwk_kid = jwk.get("kid", None)
if jwk_kid and kid == jwk_kid:
return jwk if as_dict else JWK(jwk)

raise KidNotFoundError(f"Key with Kid {kid} not found")
66 changes: 0 additions & 66 deletions pyeudiw/jwk/schema.py

This file was deleted.

3 changes: 3 additions & 0 deletions pyeudiw/jwt/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,6 @@ class JWSVerificationError(Exception):

class JWEEncryptionError(Exception):
pass

class JWTDecodeError(Exception):
pass
13 changes: 6 additions & 7 deletions pyeudiw/jwt/parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from pyeudiw.jwt.utils import is_jwt_format
from pyeudiw.x509.verify import get_public_key_from_x509_chain
from pyeudiw.jwt.utils import decode_jwt_header, decode_jwt_payload

KeyIdentifier_T = str

Expand Down Expand Up @@ -48,18 +49,16 @@ def unsafe_parse_jws(token: str) -> DecodedJwt:
"""
if not is_jwt_format(token):
raise ValueError(f"unable to parse {token}: not a jwt")
b64header, b64payload, signature, *_ = token.split(".")
head = {}
payload = {}

try:
head = _unsafe_decode_part(b64header)
payload = _unsafe_decode_part(b64payload)
head = decode_jwt_header(token)
payload = decode_jwt_payload(token)
signature = token.split(".")[2]
except Exception as e:
raise ValueError(f"unable to decode JWS part: {e}")
return DecodedJwt(token, head, payload, signature=signature)



def extract_key_identifier(token_header: dict) -> ECKey | RSAKey | OKPKey | SYMKey | dict | KeyIdentifier_T:
"""
Extracts the key identifier from the JWT header.
Expand All @@ -69,7 +68,7 @@ def extract_key_identifier(token_header: dict) -> ECKey | RSAKey | OKPKey | SYM
if "kid" in token_header.keys():
return KeyIdentifier_T(token_header["kid"])
if "trust_chain" in token_header.keys():
return get_public_key_from_trust_chain(token_header["kid"])
return get_public_key_from_trust_chain(token_header["trust_chain"])
if "x5c" in token_header.keys():
return get_public_key_from_x509_chain(token_header["x5c"])
raise ValueError(f"unable to infer identifying key from token head: searched among keys {token_header.keys()}")
68 changes: 20 additions & 48 deletions pyeudiw/jwt/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import Dict

from pyeudiw.jwk import find_jwk_by_kid
from pyeudiw.jwt.exceptions import JWTInvalidElementPosition
from pyeudiw.jwt.exceptions import JWTInvalidElementPosition, JWTDecodeError

# jwt regexp pattern is non terminating, hence it match jwt, sd-jwt and sd-jwt with kb
JWT_REGEXP = r'^[_\w\-]+\.[_\w\-]+\.[_\w\-]+'
Expand All @@ -24,17 +24,29 @@ def decode_jwt_element(jwt: str, position: int) -> dict:
:returns: a dict with the content of the decoded section.
:rtype: dict
"""
if position > 1 or position < 0:
if position < 0:
raise JWTInvalidElementPosition(
f"Cannot accept negative position {position}")

if position > 2:
raise JWTInvalidElementPosition(
f"Cannot accept position greater than 2 {position}")

splitted_jwt = jwt.split(".")

if (len(splitted_jwt) - 1) < position:
raise JWTInvalidElementPosition(
f"JWT has no element in position {position}")

if isinstance(jwt, bytes):
jwt = jwt.decode()
try:
if isinstance(jwt, bytes):
jwt = jwt.decode()

b = jwt.split(".")[position]
padded = f"{b}{'=' * divmod(len(b), 4)[1]}"
data = json.loads(base64.urlsafe_b64decode(padded))
return data
b64_data = jwt.split(".")[position]
data = json.loads(base64_urldecode(b64_data))
return data
except Exception as e:
raise JWTDecodeError(f"Unable to decode JWT element: {e}")


def decode_jwt_header(jwt: str) -> dict:
Expand Down Expand Up @@ -63,30 +75,6 @@ def decode_jwt_payload(jwt: str) -> dict:
return decode_jwt_element(jwt, position=1)


def get_jwk_from_jwt(jwt: str, provider_jwks: Dict[str, dict]) -> dict:
"""
Find the JWK inside the provider JWKs with the kid
specified in jwt header.
:param jwt: a string that represents the jwt.
:type jwt: str
:param provider_jwks: a dictionary that contains one or more JWKs with the KID as the key.
:type provider_jwks: Dict[str, dict]
:raises InvalidKid: if kid is None.
:raises KidNotFoundError: if kid is not in jwks list.
:returns: the jwk as dict.
:rtype: dict
"""
head = decode_jwt_header(jwt)
kid = head["kid"]
if isinstance(provider_jwks, dict) and provider_jwks.get('keys'):
provider_jwks = provider_jwks['keys']

return find_jwk_by_kid(kid, provider_jwks)


def is_jwt_format(jwt: str) -> bool:
"""
Check if a string is in JWT format.
Expand Down Expand Up @@ -124,22 +112,6 @@ def is_jwe_format(jwt: str):
return True


def is_jws_format(jwt: str):
"""
Check if a string is in JWS format.
:param jwt: a string that represents the jwt.
:type jwt: str
:returns: True if the string is a JWS, False otherwise.
:rtype: bool
"""
if not is_jwt_format(jwt):
return False

return not is_jwe_format(jwt)


def base64_urlencode(v: bytes) -> str:
"""Urlsafe base64 encoding without padding symbols
Expand Down
21 changes: 14 additions & 7 deletions pyeudiw/jwt/verification.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

from cryptojwt.jwk import JWK


def verify_jws_with_key(jws: str, key: JWK) -> None:
"""
:raises JWSVerificationError: is signature verification fails for *any* reason
Expand All @@ -17,16 +16,24 @@ def verify_jws_with_key(jws: str, key: JWK) -> None:
except Exception as e:
raise JWSVerificationError(f"error during signature verification: {e}", e)

def is_jwt_expired(token: str) -> bool:
"""
Check if a jwt is expired.
:param token: a string that represents the jwt.
:type token: str
:returns: True if the token is expired, False otherwise.
:rtype: bool
"""

token_payload = decode_jwt_payload(token)

def is_payload_expired(token_payload: dict) -> bool:
exp = token_payload.get("exp", None)
if not exp:
return True
if exp < iat_now():
elif exp < iat_now():
return True
return False



def is_jwt_expired(token: str) -> bool:
payalod = decode_jwt_payload(token)
return is_payload_expired(payalod)
5 changes: 3 additions & 2 deletions pyeudiw/openid4vp/vp_sd_jwt_vc.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ def verify_challenge(self) -> None:
challenge : VerifierChallenge = {}
challenge["aud"] = self.verifier_id
challenge["nonce"] = self.verifier_nonce

try:
self.sdjwt.verify_holder_kb_jwt(challenge)
except (UnsupportedSdAlg, InvalidKeyBinding):
raise InvalidVPKeyBinding
except (UnsupportedSdAlg, InvalidKeyBinding) as e:
raise InvalidVPKeyBinding(f"{e}")
4 changes: 2 additions & 2 deletions pyeudiw/satosa/default/openid4vp_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from pyeudiw.storage.exceptions import StorageWriteError
from pyeudiw.tools.mobile import is_smartphone
from pyeudiw.tools.utils import iat_now
from pyeudiw.trust.dynamic import CombinedTrustEvaluator, dynamic_trust_evaluators_loader
from pyeudiw.trust.dynamic import CombinedTrustEvaluator

from ..interfaces.openid4vp_backend import OpenID4VPBackendInterface

Expand Down Expand Up @@ -95,7 +95,7 @@ def __init__(

self.response_code_helper = ResponseCodeSource(self.config["response_code"]["sym_key"])
trust_configuration = self.config.get("trust", {})
self.trust_evaluator = CombinedTrustEvaluator(dynamic_trust_evaluators_loader(trust_configuration), self.db_engine)
self.trust_evaluator = CombinedTrustEvaluator.from_config(trust_configuration, self.db_engine)
self.init_trust_resources() # Questo carica risorse, metadata endpoint (sotto formate di attributi con pattern *_endpoint) etc, che satosa deve pubblicare

def register_endpoints(self) -> list[tuple[str, Callable[[Context], Response]]]:
Expand Down
Loading

0 comments on commit 3af829f

Please sign in to comment.