-
Notifications
You must be signed in to change notification settings - Fork 28
/
policy.py
355 lines (297 loc) · 11.8 KB
/
policy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
__author__ = "Roland Hedberg"
__license__ = "Apache 2.0"
__version__ = ""
import logging
logger = logging.getLogger(__name__)
class PolicyError(Exception):
pass
def combine_subset_of(s1, s2): # pragma: no cover
return list(set(s1).intersection(set(s2)))
def combine_superset_of(s1, s2): # pragma: no cover
return list(set(s1).intersection(set(s2)))
def combine_one_of(s1, s2): # pragma: no cover
return list(set(s1).intersection(set(s2)))
def combine_add(s1, s2): # pragma: no cover
if isinstance(s1, list):
set1 = set(s1)
else:
set1 = {s1}
if isinstance(s2, list):
set2 = set(s2)
else:
set2 = {s2}
return list(set1.union(set2))
POLICY_FUNCTIONS = {
"subset_of",
"superset_of",
"one_of",
"add",
"value",
"default",
"essential",
}
OP2FUNC = {
"subset_of": combine_subset_of,
"superset_of": combine_superset_of,
"one_of": combine_one_of,
"add": combine_add,
}
def do_sub_one_super_add(superior, child, policy): # pragma: no cover
if policy in superior and policy in child:
comb = OP2FUNC[policy](superior[policy], child[policy])
if comb:
return comb
else:
raise PolicyError("Value sets doesn't overlap")
elif policy in superior:
return superior[policy]
elif policy in child:
return child[policy]
def do_value(superior, child, policy): # pragma: no cover
if policy in superior and policy in child:
if superior[policy] == child[policy]:
return superior[policy]
else:
raise PolicyError("Not allowed to combine values")
elif policy in superior:
return superior[policy]
elif policy in child:
return child[policy]
def do_default(superior, child, policy): # pragma: no cover
# A child's default can not override a superiors
if policy in superior and policy in child:
if superior["default"] == child["default"]:
return superior["default"]
else:
raise PolicyError("Not allowed to change default")
elif policy in superior:
return superior[policy]
elif policy in child:
return child[policy]
def do_essential(superior, child, policy): # pragma: no cover
# essential: an child can make it True if a superior has states False
# but not the other way around
if policy in superior and policy in child:
if not superior[policy] and child["essential"]:
return True
else:
return superior[policy]
elif policy in superior:
return superior[policy]
elif policy in child: # Not in superior is the same as essential=True
return True
DO_POLICY = {
"superset_of": do_sub_one_super_add,
"subset_of": do_sub_one_super_add,
"one_of": do_sub_one_super_add,
"add": do_sub_one_super_add,
"value": do_value,
"default": do_default,
"essential": do_essential,
}
def combine_claim_policy(superior, child): # pragma: no cover
"""
Combine policy rules.
Applying the child policy can only make the combined policy more restrictive.
:param superior: Superior policy
:param child: Intermediates policy
"""
# weed out everything I don't recognize
superior_set = set(superior).intersection(POLICY_FUNCTIONS)
child_set = set(child).intersection(POLICY_FUNCTIONS)
if "value" in superior_set: # An exact value can not be restricted.
if child_set:
if "essential" in child_set:
if len(child_set) == 1:
return {"value": superior["value"], "essential": child["essential"]}
else:
raise PolicyError(
"value can only be combined with essential, not {}".format(
child_set
)
)
elif "value" in child_set:
if child["value"] != superior["value"]: # Not OK
raise PolicyError("Child can not set another value then superior")
else:
return superior
else:
raise PolicyError(
"Not allowed combination of policies: {} + {}".format(
superior, child
)
)
return superior
else:
if "essential" in superior_set and "essential" in child_set:
# can only go from False to True
if (
superior["essential"] != child["essential"]
and child["essential"] is False
):
raise PolicyError("Essential can not go from True to False")
comb_policy = superior_set.union(child_set)
if "one_of" in comb_policy:
if "subset_of" in comb_policy or "superset_of" in comb_policy:
raise PolicyError(
"one_of can not be combined with subset_of/superset_of"
)
rule = {}
for policy in comb_policy:
rule[policy] = DO_POLICY[policy](superior, child, policy)
if comb_policy == {"superset_of", "subset_of"}:
# make sure the subset_of is a superset of superset_of.
if set(rule["superset_of"]).difference(set(rule["subset_of"])):
raise PolicyError("superset_of not a super set of subset_of")
elif comb_policy == {"superset_of", "subset_of", "default"}:
# make sure the subset_of is a superset of superset_of.
if set(rule["superset_of"]).difference(set(rule["subset_of"])):
raise PolicyError("superset_of not a super set of subset_of")
if set(rule["default"]).difference(set(rule["subset_of"])):
raise PolicyError("default not a sub set of subset_of")
if set(rule["superset_of"]).difference(set(rule["default"])):
raise PolicyError("default not a super set of subset_of")
elif comb_policy == {"subset_of", "default"}:
if set(rule["default"]).difference(set(rule["subset_of"])):
raise PolicyError("default not a sub set of subset_of")
elif comb_policy == {"superset_of", "default"}:
if set(rule["superset_of"]).difference(set(rule["default"])):
raise PolicyError("default not a super set of subset_of")
elif comb_policy == {"one_of", "default"}:
if isinstance(rule["default"], list):
if set(rule["default"]).difference(set(rule["one_of"])):
raise PolicyError("default not a super set of one_of")
else:
if {rule["default"]}.difference(set(rule["one_of"])):
raise PolicyError("default not a super set of one_of")
return rule
def combine_policy(superior, child):
res = {}
sup_set = set(superior.keys())
chi_set = set(child.keys())
for claim in set(sup_set).intersection(chi_set):
res[claim] = combine_claim_policy(superior[claim], child[claim])
for claim in sup_set.difference(chi_set):
res[claim] = superior[claim]
for claim in chi_set.difference(sup_set):
res[claim] = child[claim]
return res
def gather_policies(chain, entity_type):
"""
Gather and combine all the metadata policies that are defined in the trust chain
:param chain: A list of Entity Statements
:return: The combined metadata policy
"""
try:
combined_policy = chain[0]["metadata_policy"][entity_type]
except KeyError:
combined_policy = {}
for es in chain[1:]:
try:
child = es["metadata_policy"][entity_type]
except KeyError:
pass
else:
combined_policy = combine_policy(combined_policy, child)
return combined_policy
def union(val1, val2):
if isinstance(val1, list):
base = set(val1)
else:
base = {val1}
if isinstance(val2, list):
ext = set(val2)
else:
ext = {val2}
return base.union(ext)
def apply_policy(metadata, policy):
"""
Apply a metadata policy to a metadata statement.
The order is value, add, default and then the checks subset_of/superset_of and one_of
:param metadata: A metadata statement
:param policy: A metadata policy
:return: A metadata statement that adheres to a metadata policy
"""
metadata_set = set(metadata.keys())
policy_set = set(policy.keys())
# Metadata claims that there exists a policy for
for claim in metadata_set.intersection(policy_set):
if "value" in policy[claim]: # value overrides everything
metadata[claim] = policy[claim]["value"]
else:
if "one_of" in policy[claim]:
# The is for claims that can have only one value
if isinstance(metadata[claim], list): # Should not be but ...
_claim = [
c for c in metadata[claim] if c in policy[claim]["one_of"]
]
if _claim:
metadata[claim] = _claim[0]
else:
raise PolicyError(
"{}: None of {} among {}".format(
claim, metadata[claim], policy[claim]["one_of"]
)
)
else:
if metadata[claim] in policy[claim]["one_of"]:
pass
else:
raise PolicyError(
"{} not among {}".format(
metadata[claim], policy[claim]["one_of"]
)
)
else:
# The following is for claims that can have lists of values
if "add" in policy[claim]:
metadata[claim] = list(union(metadata[claim], policy[claim]["add"]))
if "subset_of" in policy[claim]:
_val = set(policy[claim]["subset_of"]).intersection(
set(metadata[claim])
)
if _val:
metadata[claim] = list(_val)
else:
raise PolicyError(
"{} not subset of {}".format(
metadata[claim], policy[claim]["subset_of"]
)
)
if "superset_of" in policy[claim]:
if set(policy[claim]["superset_of"]).difference(
set(metadata[claim])
):
raise PolicyError(
"{} not superset of {}".format(
metadata[claim], policy[claim]["superset_of"]
)
)
else:
pass
# In policy but not in metadata
for claim in policy_set.difference(metadata_set):
if "value" in policy[claim]:
metadata[claim] = policy[claim]["value"]
elif "add" in policy[claim]:
metadata[claim] = policy[claim]["add"]
elif "default" in policy[claim]:
metadata[claim] = policy[claim]["default"]
if claim not in metadata:
if "essential" in policy[claim] and policy[claim]["essential"]:
raise PolicyError("Essential claim '{}' missing".format(claim))
# All that are in metadata but not in policy should just remain
return metadata
def diff2policy(new, old):
res = {}
for claim in set(new).intersection(set(old)):
if new[claim] == old[claim]:
continue
else:
res[claim] = {"value": new[claim]}
for claim in set(new).difference(set(old)):
if claim in ["contacts"]:
res[claim] = {"add": new[claim]}
else:
res[claim] = {"value": new[claim]}
return res