app.py
from re import match
from flask import (
Flask,
redirect,
render_template,
url_for,
request
)
from db import (
create_connection,
find_id_by_params,
execute_query,
insert_data
)
from crawler import (
CrawlerManager
)
from crawler_handler import (
crawler_handler
)
# Flask application object; the @app.route decorators in this file register their views on it.
app = Flask(__name__)
@app.route('/events')
def events():
    """Render the events page from the Event table, ordered by Event.id descending."""
    # NOTE(review): the LEFT JOIN below has no ON clause. On engines that accept
    # this, each Event row is paired with every Cve row, which would also inflate
    # the count shown in the header. Confirm the intended join key.
    sql = (
        "SELECT Event.event_date, Cve.name, Event.source "
        "FROM Event LEFT JOIN Cve ORDER BY Event.id DESC;"
    )
    db = create_connection()
    rows = execute_query(conn=db, query=sql)
    banner = f"{len(rows)} Events occurred during the program run"
    return render_template(
        'events.html',
        title="Events",
        header1=banner,
        events=rows,
    )
@app.route('/vulnerabilities')
def vulnerabilities():
    """Render the list page with one row per entry in the Cve table."""
    db = create_connection()
    sql = "SELECT name, cvss, belonging, cwe_id, published_date FROM Cve;"
    rows = execute_query(conn=db, query=sql)
    banner = f"{len(rows)} Vulnerabilities were crawled and parsed"
    return render_template(
        'vulnerabilities.html',
        title="Vulnerabilities",
        header1=banner,
        cves=rows,
    )
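@app.route('/cve/<cve_id>', methods=['GET', 'POST'])
def cve(cve_id: str):
    """Render the detail page for one CVE and optionally link a technique to it.

    Args:
        cve_id: CVE name taken from the URL path, e.g. ``CVE-2021-1234``.

    Returns:
        The rendered ``cve_page.html`` page, or the rendered not-found page
        when the name is malformed or is not present in the ``Cve`` table.
    """
    def not_found_page():
        # Built on demand so the template is rendered only on the failure paths.
        return render_template(
            'content.html',
            title="Page Not Found",
            header1=f"Cve {str(cve_id)} was not found in base",
            content=(
                "We apologize, this Cve is not in the database, "
                "most likely it was not processed by this mechanism "
                "due to not being in the time slot. "
                "If you are trying to phase this page on purpose, "
                "please give up, it's a silly idea"
            )
        )

    # re.match anchors only at the start, so the previous pattern accepted any
    # string that merely began with a CVE-like prefix. \Z makes the check cover
    # the whole value, [0-9] keeps it to ASCII digits, and the open-ended
    # sequence length keeps long sequence numbers valid.
    if not match(r"[Cc][Vv][Ee]-[0-9]{4}-[0-9]{2,}\Z", cve_id):
        return not_found_page()

    # Opened only after validation so malformed requests never touch the database.
    # NOTE(review): nothing in this function closes conn; confirm the db helpers do.
    conn = create_connection()

    found_cve_id = find_id_by_params(
        conn=conn,
        table_name='Cve',
        search_query={
            'name': cve_id
        }
    )
    if not found_cve_id:
        return not_found_page()

    # NOTE(review): this insert is driven by a query-string parameter and runs on
    # a plain GET; no authentication or CSRF check is visible in this function.
    # Consider limiting the write to an authenticated POST. Whether repeated
    # requests create duplicate CveTechnique rows depends on insert_data and the
    # schema; confirm.
    query_technique_id = request.args.get('technique')
    if query_technique_id:
        found_technique_id = find_id_by_params(
            conn=conn,
            table_name='MitreTechnique',
            search_query={
                'technique_id': query_technique_id
            }
        )
        # Only techniques that already exist in MitreTechnique are linked.
        if found_technique_id:
            insert_data(
                conn=conn,
                table_name='CveTechnique',
                query={
                    'cve_id': found_cve_id,
                    'technique_id': found_technique_id
                }
            )

    # Every lookup below binds the internal row id as a parameter.
    cve_info = execute_query(
        conn=conn,
        query="SELECT cvss, belonging, cwe_id, published_date, name FROM Cve WHERE id=?",
        values=[found_cve_id],
        is_one=True
    )
    descriptions = execute_query(
        conn=conn,
        query="SELECT * from Description WHERE cve_id=? ORDER BY priority ASC;",
        values=[found_cve_id]
    )
    linked_techniques = execute_query(
        conn=conn,
        query="SELECT technique_id, name FROM MitreTechnique WHERE id IN "
              "(SELECT technique_id FROM CveTechnique WHERE cve_id = ?);",
        values=[found_cve_id]
    )
    links = execute_query(
        conn=conn,
        query="SELECT uri FROM Link WHERE cve_id=?;",
        values=[found_cve_id]
    )
    return render_template(
        'cve_page.html',
        title=f"{cve_id}",
        header1=f"Vulnerability {str(cve_id)} information",
        cve_info=cve_info,
        descriptions=descriptions,
        techniques=linked_techniques,
        links=links
    )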
@app.route('/search_cve', methods=['GET'])
def search_cve():
    """Render the CVEs whose description contains the ``query`` URL parameter."""
    search_text = request.args.get('query')
    db = create_connection()
    # The user's text is bound as a parameter and the surrounding % wildcards are
    # added in SQL, so it is never spliced into the statement. LIKE metacharacters
    # (% and _) inside the text are not escaped, and a request without the
    # parameter passes None as the bound value.
    sql = (
        "SELECT name, published_date FROM Cve WHERE id IN "
        "(SELECT cve_id FROM Description WHERE value LIKE '%' || ? || '%');"
    )
    matches = execute_query(conn=db, query=sql, values=[search_text])
    return render_template(
        'cve_search.html',
        title="Search CVE",
        header1=f"Results for query: {search_text}",
        cves=matches,
    )
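@app.route('/cwe/<cwe_id>')
def cwe(cwe_id: str):
    """Render the description page for one CWE.

    Args:
        cwe_id: Numeric CWE identifier taken from the URL path (1 to 4 digits).

    Returns:
        The rendered ``content.html`` page for the CWE, or the rendered
        not-found page when the identifier is malformed or has no ``Cwe`` row.
    """
    def not_found_page():
        # Built on demand so the template is rendered only on the failure paths.
        return render_template(
            'content.html',
            title="Page Not Found",
            header1=f"CWE-{cwe_id} was not found in base",
            content="We apologize, this CWE is not in the database"
        )

    # re.match anchors only at the start, so the previous pattern accepted values
    # such as "12abc". \Z makes the 1-4 digit limit apply to the whole value, and
    # [0-9] keeps it to ASCII digits.
    if not match(r"[0-9]{1,4}\Z", cwe_id):
        return not_found_page()

    # Opened only after validation so malformed requests never touch the database.
    # NOTE(review): nothing in this function closes conn; confirm the db helpers do.
    conn = create_connection()
    cwe_row = execute_query(
        conn=conn,
        query="SELECT id, description from Cwe WHERE id=?",
        values=[cwe_id],
        is_one=True
    )
    if not cwe_row:
        return not_found_page()
    return render_template(
        'content.html',
        title=f"CWE-{str(cwe_id)}",
        header1=f"CWE-{cwe_row['id']} information",
        content=cwe_row['description']
    )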
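@app.route('/techniques/<technique_id>')
def techniques(technique_id: str):
    """Render the description page for one MITRE technique.

    Args:
        technique_id: Technique identifier taken from the URL path, e.g.
            ``T1059`` or, for a sub-technique, ``T1059.001``.

    Returns:
        The rendered ``content.html`` page for the technique, or the rendered
        not-found page when the identifier is malformed or is not present in
        the ``MitreTechnique`` table.
    """
    def not_found_page():
        # Built on demand so the template is rendered only on the failure paths.
        # The header previously said "CWE-..." here, a leftover from the CWE view.
        return render_template(
            'content.html',
            title="Page Not Found",
            header1=f"Technique {str(technique_id)} was not found in base",
            content="We apologize, this MITRE technique is not in the database"
        )

    # re.match anchors only at the start, so the previous pattern accepted any
    # value that merely began with "T" and four digits. The anchored form covers
    # the whole value and still admits the ".NNN" sub-technique suffix that the
    # prefix match used to let through.
    if not match(r"T[0-9]{4}(?:\.[0-9]{3})?\Z", technique_id):
        return not_found_page()

    # Opened only after validation so malformed requests never touch the database.
    # NOTE(review): nothing in this function closes conn; confirm the db helpers do.
    conn = create_connection()
    found_technique_id = find_id_by_params(
        conn=conn,
        table_name='MitreTechnique',
        search_query={
            'technique_id': technique_id
        }
    )
    if not found_technique_id:
        return not_found_page()

    technique = execute_query(
        conn=conn,
        query="SELECT technique_id, name, description, belonging "
              "from MitreTechnique WHERE id=?",
        values=[found_technique_id],
        is_one=True
    )
    return render_template(
        'content.html',
        title=f"{technique['name']} ({technique['technique_id']})",
        header1=f"{technique['name']} ({technique['technique_id']}) - {technique['belonging']}",
        content=technique['description']
    )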
@app.route('/')
@app.route('/about')
def about():
    """Render the static project description, served at both ``/`` and ``/about``."""
    blurb = (
        "The project is aimed at collecting "
        "and centralizing vulnerability data (CVE) "
        "from various sources. It aims to provide "
        "convenient access to vulnerability information, "
        "assisting users in quickly obtaining up-to-date "
        "data."
    )
    return render_template(
        'content.html',
        title="About",
        header1="Page about project",
        content=blurb,
    )
@app.route('/update')
def update():
    """Run the crawlers over their sources, then redirect to the events page."""
    # NOTE(review): this route starts a crawl on a plain GET and nothing in this
    # function checks who is asking. Confirm that access is restricted elsewhere
    # (authentication, network placement or rate limiting) before exposing it.
    crawl_results = CrawlerManager().crawl_sources()
    crawler_handler(sources=crawl_results)
    events_url = url_for('events')
    return redirect(events_url, code=302, Response=None)