-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathag_generation.py
326 lines (277 loc) · 16.7 KB
/
ag_generation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
import os
import subprocess

from signatures.mappings import macro_inv, micro, micro2macro, verbose_micro
def _translate(label, root=False):
    """
    Builds the human-readable text shown for a node in the attack graph.

    @param label: the label of the node (`mcat|mserv` or `mcat|mserv|state_ID`, where `mcat` is a key of
                  the `verbose_micro` dictionary, e.g. `DATA_DELIVERY|http|36`)
    @param root: falsy for a regular node; for a root node pass a truthy value (the callers pass the victim
                 identifier), whose string form is prepended as 'Victim: <root>\n'
    @return: the readable label: verbose category, then the service on a new line, then ' | ID: <state_ID>'
             when the label carries a state ID
    """
    fields = label.split("|")
    pieces = []
    if root:
        pieces.append('Victim: ' + str(root) + '\n')
    # `str.split` always yields at least one field, so the category is always present
    pieces.append(verbose_micro[fields[0]])
    if len(fields) >= 2:
        pieces.append("\n" + fields[1])
    if len(fields) >= 3:
        pieces.append(" | ID: " + fields[2])
    return "".join(pieces)
def _get_objective_nodes(state_sequences, obj_only=False):
"""
Gets the objectives from the state sequences. An objective is defined as `mcat|mserv` (by default),
with `mcat` having a high severity.
@param state_sequences: the state sequences per attacker-victim pair
@param obj_only: whether to use only `mcat` (instead of `mcat|mserv`)
@return: a list of the found objective nodes
"""
objectives = set()
for episodes in state_sequences.values(): # Iterate over all episodes and collect the objective nodes
for epi in episodes:
if len(str(epi[2])) == 3: # If high-severity, then include it
mcat = micro[epi[2]].split('.')[1]
if obj_only: # Experiment: only mcat or mcat + mserv?
vert_name = mcat
else:
vert_name = mcat + '|' + epi[4]
objectives.add(vert_name)
return list(objectives)
def _is_severe_sink(vname, sev_sinks):
"""
Checks whether a node is a severe sink (i.e. a sink that has a medium or high severity).
@param vname: the name of the node (for low-severity nodes: `mcat|mserv`, for other nodes `mcat|mserv|stateID`)
@param sev_sinks: the list of IDs of the severe sinks
@return: True if the node is a severe sink, False otherwise
"""
for sink in sev_sinks:
if vname.split("|")[-1] == sink:
return True
return False
def _make_vertex_info(episode, in_main_model):
    """
    Builds the node information tuple for the given episode.
    The format of the node information: (vert_name, start_time, end_time, signs, timestamps)

    The node name is `mcat|mserv`, followed by `|<state_ID>` unless the `mcat` ID (`episode[2]`) has a single
    digit, or by `|Sink` when the state ID (`episode[3]`) is not in the main model.

    @param episode: the episode corresponding to the current node; indices 0 and 1 hold the start and end time,
                    2 the `mcat` ID, 3 the state ID, 4 the service, 5 the signatures and 6 the timestamps
    @param in_main_model: the state IDs that are present in the main model
    @return: the tuple with the node information, as defined above
    """
    begin = round(episode[0] / 1.0)
    finish = round(episode[1] / 1.0)
    category = micro[episode[2]].split('.')[1]
    signatures, stamps = episode[5], episode[6]

    if episode[3] not in in_main_model:  # TODO: this branch is never taken, the state is always in the model
        suffix = '|Sink'
    elif len(str(episode[2])) == 1:  # Single-digit mcat ID: the name carries no state ID
        suffix = ''
    else:
        suffix = '|' + str(episode[3])

    return (category + '|' + episode[4] + suffix, begin, finish, signatures, stamps)
def _get_attack_attempts(state_sequences, victim, objective, in_main_model):
"""
Gets the attack attempts for a given victim and objective from the given state sequences (per attacker-victim pair).
@param state_sequences: the state sequences per attacker-victim pair
@param victim: the IP of the given victim
@param objective: the given objective (`mcat|mserv`)
@param in_main_model: the state IDs that are present in the main model
@return: a list of the attack attempts and a list of the observed objective variants for a given (victim, objective)
"""
team_attack_attempts = dict()
observed_obj = set() # Variants of current objective
for att, episodes in state_sequences.items(): # Iterate over (a,v): [episode, episode, episode]
if victim != att.split("->")[1]: # If it's not the right victim, then don't process further
continue
vertices = []
for epi in episodes:
vertices.append(_make_vertex_info(epi, in_main_model))
# If the objective is never reached, don't process further
if not sum([True if objective.split("|")[:2] == v[0].split("|")[:2] else False for v in vertices]):
continue
# If it's an episode sequence targeting the requested victim and obtaining the requested objective
attempts = []
sub_attempt = []
for vertex in vertices: # Cut each attempt until the requested objective
sub_attempt.append(vertex) # Add the vertex in path
if objective.split("|")[:2] == vertex[0].split("|")[:2]: # If it's the objective
if len(sub_attempt) <= 1: # If only a single node, reject
sub_attempt = []
continue
attempts.append(sub_attempt)
sub_attempt = []
observed_obj.add(vertex[0])
continue
team_attacker = att.split('->')[0] # Team + attacker
if team_attacker not in team_attack_attempts.keys():
team_attack_attempts[team_attacker] = []
team_attack_attempts[team_attacker].extend(attempts)
# team_attack_attempts[team_attacker] = sorted(team_attack_attempts[team_attacker], key=lambda item: item[1])
return team_attack_attempts, observed_obj
def _print_unique_attempts(team_attacker, attempts):
"""
For a given attacker, print the path info (i.e. total paths, unique paths, the longest and the shortest paths).
@param team_attacker: the IP of the given attacker (prepended with the team ID, e.g. 't0-10.0.254.30')
@param attempts: the attack attempts for the given attacker
"""
paths = [''.join([action[0] for action in attempt]) for attempt in attempts]
unique_paths = len(set(paths)) # Count exactly unique attempts
longest_path = max([len(attempt) for attempt in attempts], default=0)
shortest_path = min([len(attempt) for attempt in attempts], default=0)
print('Attacker {}, total paths: {}, unique paths: {}, longest path: {}, shortest path: {}'
.format(team_attacker, len(attempts), unique_paths, longest_path, shortest_path))
def _create_edge_line(vid, vname1, vname2, ts1, ts2, color, attacker):
    """
    Builds the dot-file line for the edge between two consecutive nodes of an attempt.

    @param vid: the index of the edge within the attempt (the first edge, index 0, gets an HTML-like label
                that also shows the attacker)
    @param vname1: the name of the first node (tail node)
    @param vname2: the name of the second node (head node)
    @param ts1: the timestamps of the first episode; index 1 (its end) is used and must support `strftime`
    @param ts2: the timestamps of the second episode; index 0 (its start) is used and must support `strftime`
    @param color: the color to be used for this edge
    @param attacker: the IP of the attacker for which this edge is created
    @return: the line defining the given edge (edge name, i.e. "tail -> head", color, label)
    """
    time_format = "%d/%m/%y, %H:%M:%S"
    prev_end, next_start = ts1[1], ts2[0]
    end_prev = prev_end.strftime(time_format)
    start_next = next_start.strftime(time_format)
    gap_seconds = round((next_start - prev_end).total_seconds())
    edge = '"{}" -> "{}"'.format(_translate(vname1), _translate(vname2))

    if vid != 0:  # Every transition but the first one gets a plain-text label
        plain = 'start_next: {}\ngap: {}sec\nend_prev: {}'.format(start_next, gap_seconds, end_prev)
        return '{} [ label="{}"][ fontcolor="{}" color={}]'.format(edge, plain, color, color)

    # First transition: HTML-like label, which additionally names the attacker
    html = ('<<font color="{}"> start_next: {}<br/>gap: {}sec<br/>end_prev: {}</font>'
            '<br/><font color="{}"><b>Attacker: {}</b></font>>')
    html = html.format(color, start_next, gap_seconds, end_prev, color, attacker)
    return '{} [ color={}] [label={}]'.format(edge, color, html)
def _print_simplicity(ag_name, lines):
"""
Computes and prints the number of nodes and edges, and the simplicity for the given attack graph.
@param ag_name: the name of the given attack graph
@param lines: the lines in the dot file for a given attack graph
"""
vertices, edges = 0, 0
for line in lines: # Count vertices and edges
if '->' in line[1]:
edges += 1
elif 'shape=' in line[1]:
vertices += 1
print('{}: # vert: {}, # edges: {}, simplicity: {}'.format(ag_name, vertices, edges, vertices / float(edges)))
# Step 7: Create AGs per victim per objective (14 Nov)
def make_attack_graphs(state_sequences, sev_sinks, datafile, dir_name, save_ag=True):
    """
    Creates the attack graphs (one per victim per objective) based on the given state sequences.

    For every (victim, objective) pair with at least one attack attempt, the lines of a graphviz dot file are
    built: the root (victim) node, the observed objective variants, the nodes and edges of every attempt per
    attacker, and finally the shape and the signature tooltip of every node. If `save_ag` is set, the dot
    file is written into `dir_name` and rendered to png and svg with the graphviz `dot` executable.

    @param state_sequences: the previously created state sequences (per attacker-victim pair)
    @param sev_sinks: the set of severe sinks (i.e. sinks with a medium or high severity)
    @param datafile: the name of the file with the traces (used as a prefix for the file name of the attack graph)
    @param dir_name: the name of the directory where the attack graphs should be saved
    @param save_ag: whether to save the attack graphs (i.e. create dot, png and svg files with the attack graphs)
    """
    tcols = {'t0': 'maroon', 't1': 'orange', 't2': 'darkgreen', 't3': 'blue', 't4': 'magenta',
             't5': 'purple', 't6': 'brown', 't7': 'tomato', 't8': 'turquoise', 't9': 'skyblue'}
    default_color = 'black'  # Used for teams without an entry in `tcols`, instead of failing with a KeyError

    if save_ag:
        try:
            # An already existing directory is fine (e.g. on a re-run), and missing parents are created
            os.makedirs(dir_name, exist_ok=True)
        except FileExistsError:  # The path exists, but it is not a directory
            print("Can't create directory here")
        else:
            print("Successfully created directory for AGs")

    # The shape of a node is looked up by the index of its macro category
    shapes = ['oval', 'oval', 'oval', 'box', 'box', 'box', 'box', 'hexagon', 'hexagon', 'hexagon', 'hexagon', 'hexagon']
    in_main_model = {episode[3] for sequence in state_sequences.values() for episode in sequence}
    total_victims = {attacker_victim.split('->')[1] for attacker_victim in state_sequences}  # Victim IPs
    objectives = _get_objective_nodes(state_sequences)

    for victim in total_victims:
        print('!!! Rendering AGs for Victim', victim)
        for objective in objectives:
            print('\t!!!! Objective', objective)
            # Get the variants of current objective and attempts per team
            team_attack_attempts, observed_obj = _get_attack_attempts(state_sequences, victim, objective,
                                                                      in_main_model)

            # If no team has obtained this objective or has targeted this victim, don't generate its AG
            if not any(team_attack_attempts.values()):
                continue

            ag_name = objective.replace('|', '').replace('_', '').replace('-', '').replace('(', '').replace(')', '')
            # Every entry is a tuple; only the text at index 1 is written to the dot file
            lines = [(0, 'digraph ' + ag_name + ' {'),
                     (0, 'rankdir="BT"; \n graph [ nodesep="0.1", ranksep="0.02"] \n node [ fontname=Arial, ' +
                      'fontsize=24, penwidth=3]; \n edge [ fontname=Arial, fontsize=20,penwidth=5 ];')]
            root_node = _translate(objective, root=victim)
            lines.append((0, '"' + root_node + '" [shape=doubleoctagon, style=filled, fillcolor=salmon];'))
            lines.append((0, '{ rank = max; "' + root_node + '"}'))

            # For each variant of objective, add a link to the root node, and determine if it's sink
            for obj_variant in list(observed_obj):
                variant_label = _translate(obj_variant)
                lines.append((0, '"' + variant_label + '" -> "' + root_node + '"'))
                if _is_severe_sink(obj_variant, sev_sinks):
                    lines.append((0, '"' + variant_label + '" [style="filled,dotted", fillcolor= salmon]'))
                else:
                    lines.append((0, '"' + variant_label + '" [style=filled, fillcolor= salmon]'))

            # All obj variants have the same rank
            lines.append((0, '{ rank=same; "' + '" "'.join([_translate(obj) for obj in observed_obj]) + '"}'))

            node_signatures = dict()
            already_addressed = set()  # State IDs of the severe sinks that already got their dotted style
            for team_attacker, attempts in team_attack_attempts.items():  # For each attacker that obtains the objective
                color = tcols.get(team_attacker.split('-')[0], default_color)  # Team color
                # _print_unique_attempts(team_attacker, attempts)

                for attempt in attempts:
                    # Record signatures
                    for action in attempt:  # action = (vert_name, start_time, end_time, signs, timestamps)
                        node_signatures.setdefault(action[0], set()).update(action[3])

                    # Create nodes for the AG
                    for vid, action in enumerate(attempt):  # Iterate over each action in an attempt
                        vname = action[0]
                        label = _translate(vname)
                        if vid == 0:  # If first action
                            if 'Sink' in vname:  # If sink, make dotted TODO: this check is always False
                                lines.append((0, '"' + label + '" [style="dotted,filled", fillcolor= yellow]'))
                            elif _is_severe_sink(vname, sev_sinks):  # If med- or high-sev sink, make dotted
                                lines.append((0, '"' + label + '" [style="dotted,filled", fillcolor= yellow]'))
                                state_id = _state_id_of(vname)
                                if state_id is not None:
                                    already_addressed.add(state_id)
                            else:  # Else, normal starting node
                                lines.append((0, '"' + label + '" [style=filled, fillcolor= yellow]'))
                        elif 'Sink' in vname:  # For other actions TODO: this check is always false
                            # Take all AG lines so far, and if it was ever defined before, redefine it to be dotted
                            already_dotted = any(
                                (label in line[1]) and ('dotted' in line[1]) and ('->' not in line[1])
                                for line in lines)
                            if already_dotted:
                                continue
                            partial = '"' + label + '" [style="dotted'  # Redefine here
                            if not any(partial in line[1] for line in lines):
                                lines.append((0, partial + '"]'))

                    # Create edges (transitions) for the AG
                    bigram = zip(attempt, attempt[1:])  # Make bigrams (sliding window of 2)
                    for vid, ((vname1, time1, _, _, ts1), (vname2, _, _, _, ts2)) in enumerate(bigram):
                        edge_line = _create_edge_line(vid, vname1, vname2, ts1, ts2, color,
                                                      team_attacker.split('-')[1])
                        lines.append((time1, edge_line))

            # Go over all vertices again and define their shapes + make high-sev sink states dotted
            for vname, signatures in node_signatures.items():
                label = _translate(vname)
                mcat = vname.split('|')[0]
                mcat = macro_inv[micro2macro['MicroAttackStage.' + mcat]]
                shape = shapes[mcat]
                # If it's oval, we don't do anything because it is not a high-sev sink
                if shape == shapes[0] or _state_id_of(vname) in already_addressed:
                    lines.append((0, '"' + label + '" [shape=' + shape + ']'))
                elif _is_severe_sink(vname, sev_sinks):
                    lines.append((0, '"' + label + '" [style="dotted", shape=' + shape + ']'))
                else:
                    lines.append((0, '"' + label + '" [shape=' + shape + ']'))
                # Add a tooltip with signatures
                lines.append((1, '"' + label + '"' + ' [tooltip="' + "\n".join(signatures) + '"]'))
            lines.append((1000, '}'))

            print('\t Created')
            # _print_simplicity(ag_name, lines)
            if save_ag:
                out_filename = datafile + '-attack-graph-for-victim-' + victim + '-' + ag_name
                out_filepath = dir_name + '/' + out_filename
                with open(out_filepath + '.dot', 'w') as outfile:
                    for line in lines:
                        outfile.write(str(line[1]) + '\n')
                _render_dot_file(out_filepath)


def _state_id_of(vname):
    """
    Gets the state ID (the third `|`-separated field) of a node name.

    @param vname: the name of the node (`mcat|mserv` or `mcat|mserv|stateID`)
    @return: the state ID, or None if the name has no third field
    """
    fields = vname.split('|')
    return fields[2] if len(fields) > 2 else None


def _render_dot_file(out_filepath):
    """
    Renders `<out_filepath>.dot` to `<out_filepath>.png` and `<out_filepath>.svg` with the graphviz `dot` executable.

    The arguments are passed as a list (no shell), so paths with spaces or shell metacharacters are handled
    correctly. Rendering is best-effort: the exit status of `dot` is ignored, and a failure to start `dot`
    (e.g. graphviz is not installed) is only reported.

    @param out_filepath: the path of the dot file without the '.dot' extension
    """
    dot_file = out_filepath + '.dot'
    for image_format in ('png', 'svg'):
        command = ['dot', '-T' + image_format, dot_file, '-o', out_filepath + '.' + image_format]
        try:
            subprocess.run(command, check=False)
        except OSError as err:
            print("Can't run graphviz 'dot': {}".format(err))
            return