-
Notifications
You must be signed in to change notification settings - Fork 14
/
trust.py
211 lines (179 loc) · 7.21 KB
/
trust.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
import json
from satosa.context import Context
from satosa.response import Response
from pyeudiw.jwk import JWK
from pyeudiw.jwt import JWSHelper
from pyeudiw.jwt.utils import decode_jwt_header
from pyeudiw.satosa.exceptions import (DiscoveryFailedError,
NotTrustedFederationError)
from pyeudiw.storage.exceptions import EntryNotFound
from pyeudiw.tools.base_logger import BaseLogger
from pyeudiw.tools.utils import exp_from_now, iat_now
from pyeudiw.trust import TrustEvaluationHelper
from pyeudiw.trust.trust_anchors import update_trust_anchors_ecs
class BackendTrust(BaseLogger):
"""
Backend Trust class.
"""
def init_trust_resources(self) -> None:
"""
Initializes the trust resources.
"""
# TODO: adapt method to init ALL types of trust resources (if configured)
# private keys by kid
self.federations_jwks_by_kids = {
i['kid']: i for i in self.config['trust']['federation']['config']['federation_jwks']
}
# dumps public jwks
self.federation_public_jwks = [
JWK(i).public_key for i in self.config['trust']['federation']['config']['federation_jwks']
]
# we close the connection in this constructor since it must be fork safe and
# get reinitialized later on, within each fork
self.update_trust_anchors()
try:
self.get_backend_trust_chain()
except Exception as e:
self._log_critical(
"Backend Trust",
f"Cannot fetch the trust anchor configuration: {e}"
)
self.db_engine.close()
self._db_engine = None
def entity_configuration_endpoint(self, context: Context) -> Response:
"""
Entity Configuration endpoint.
:param context: The current context
:type context: Context
:return: The entity configuration
:rtype: Response
"""
if context.qs_params.get('format', '') == 'json':
return Response(
json.dumps(self.entity_configuration_as_dict),
status="200",
content="application/json"
)
return Response(
self.entity_configuration,
status="200",
content="application/entity-statement+jwt"
)
def update_trust_anchors(self):
"""
Updates the trust anchors of current instance.
"""
tas = self.config['trust']['federation']['config']['trust_anchors']
self._log_info("Trust Anchors updates", f"Trying to update: {tas}")
for ta in tas:
try:
update_trust_anchors_ecs(
db=self.db_engine,
trust_anchors=[ta],
httpc_params=self.config['network']['httpc_params']
)
except Exception as e:
self._log_warning("Trust Anchor updates",
f"{ta} update failed: {e}")
self._log_info("Trust Anchor updates", f"{ta} updated")
def get_backend_trust_chain(self) -> list[str]:
"""
Get the backend trust chain. In case something raises an Exception (e.g. faulty storage), logs a warning message
and returns an empty list.
:return: The trust chain
:rtype: list
"""
try:
trust_evaluation_helper = TrustEvaluationHelper.build_trust_chain_for_entity_id(
storage=self.db_engine,
entity_id=self.client_id,
entity_configuration=self.entity_configuration,
httpc_params=self.config['network']['httpc_params']
)
self.db_engine.add_or_update_trust_attestation(
entity_id=self.client_id,
attestation=trust_evaluation_helper.trust_chain,
exp=trust_evaluation_helper.exp
)
return trust_evaluation_helper.trust_chain
except (DiscoveryFailedError, EntryNotFound, Exception) as e:
message = (
f"Error while building trust chain for client with id: {self.client_id}. "
f"{e.__class__.__name__}: {e}"
)
self._log_warning("Trust Chain", message)
return []
def _validate_trust(self, context: Context, jws: str) -> TrustEvaluationHelper:
"""
Validates the trust of the given jws.
:param context: the request context
:type context: satosa.context.Context
:param jws: the jws to validate
:type jws: str
:raises: NotTrustedFederationError: raises an error if the trust evaluation fails.
:return: the trust evaluation helper
:rtype: TrustEvaluationHelper
"""
self._log_debug(context, "[TRUST EVALUATION] evaluating trust.")
headers = decode_jwt_header(jws)
trust_eval = TrustEvaluationHelper(
self.db_engine,
httpc_params=self.config['network']['httpc_params'],
**headers
)
try:
trust_eval.evaluation_method()
except EntryNotFound:
message = (
"[TRUST EVALUATION] not found for "
f"{trust_eval.entity_id}"
)
self._log_error(context, message)
raise NotTrustedFederationError(
f"{trust_eval.entity_id} not found for Trust evaluation."
)
except Exception as e:
message = (
"[TRUST EVALUATION] failed for "
f"{trust_eval.entity_id}: {e}"
)
self._log_error(context, message)
raise NotTrustedFederationError(
f"{trust_eval.entity_id} is not trusted."
)
return trust_eval
@property
def default_federation_private_jwk(self) -> dict:
"""Returns the default federation private jwk."""
return tuple(self.federations_jwks_by_kids.values())[0]
@property
def entity_configuration_as_dict(self) -> dict:
"""Returns the entity configuration as a dictionary."""
ec_payload = {
"exp": exp_from_now(minutes=self.default_exp),
"iat": iat_now(),
"iss": self.client_id,
"sub": self.client_id,
"jwks": {
"keys": self.federation_public_jwks
},
"metadata": {
self.config['trust']['federation']['config']["metadata_type"]: self.config['metadata'],
"federation_entity": self.config['trust']['federation']['config']['federation_entity_metadata']
},
"authority_hints": self.config['trust']['federation']['config']['authority_hints']
}
return ec_payload
@property
def entity_configuration(self) -> dict:
"""Returns the entity configuration as a JWT."""
data = self.entity_configuration_as_dict
jwshelper = JWSHelper(self.default_federation_private_jwk)
return jwshelper.sign(
protected={
"alg": self.config['trust']['federation']['config']["default_sig_alg"],
"kid": self.default_federation_private_jwk["kid"],
"typ": "entity-statement+jwt"
},
plain_dict=data
)