Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tenable Parser: Support the new "workbench" format #9804

Merged
merged 3 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 28 additions & 17 deletions dojo/tools/tenable/csv_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,22 +63,33 @@ def _format_cpe(self, val):
cpe_match = re.findall(r"cpe:/[^\n\ ]+", val)
return cpe_match if cpe_match else None

def detect_delimiter(self, content: str):
"""Detect the delimiter of the CSV file"""
if isinstance(content, bytes):
content = content.decode("utf-8")
first_line = content.split('\n')[0]
if ';' in first_line:
return ';'
else:
return ',' # default to comma if no semicolon found

def get_findings(self, filename: str, test: Test):
# Read the CSV
content = filename.read()
delimiter = self.detect_delimiter(content)
if isinstance(content, bytes):
content = content.decode("utf-8")
csv.field_size_limit(int(sys.maxsize / 10)) # the request/resp are big
reader = csv.DictReader(io.StringIO(content))
reader = csv.DictReader(io.StringIO(content), delimiter=delimiter)
dupes = {}
# Iterate over each line and create findings
for row in reader:
# title: Could come from "Name" or "Plugin Name"
title = row.get("Name", row.get("Plugin Name"))
title = row.get("Name", row.get("Plugin Name", row.get("asset.name")))
if title is None or title == "":
continue
# severity: Could come from "Severity" or "Risk"
raw_severity = row.get("Risk", "")
raw_severity = row.get("Risk", row.get("severity", ""))
if raw_severity == "":
raw_severity = row.get("Severity", "Info")
# this could actually be a int, so try to convert
Expand All @@ -89,17 +100,17 @@ def get_findings(self, filename: str, test: Test):
# convert the severity to something dojo likes
severity = self._convert_severity(raw_severity)
# Other text fields
description = row.get("Synopsis", "")
mitigation = str(row.get("Solution", "N/A"))
impact = row.get("Description", "N/A")
references = row.get("See Also", "N/A")
description = row.get("Synopsis", row.get("definition.synopsis", "N/A"))
mitigation = str(row.get("Solution", row.get("definition.solution", "N/A")))
impact = row.get("Description", row.get("definition.description", "N/A"))
references = row.get("See Also", row.get("definition.see_also", "N/A"))
# Determine if the current row has already been processed
dupe_key = (
severity
+ title
+ row.get("Host", "No host")
+ str(row.get("Port", "No port"))
+ row.get("Synopsis", "No synopsis")
+ row.get("Host", row.get("asset.host_name", "No host"))
+ str(row.get("Port", row.get("asset.port", "No port")))
+ row.get("Synopsis", row.get("definition.synopsis", "No synopsis"))
)
# Finding has not been detected in the current report. Proceed with
# parsing
Expand All @@ -123,11 +134,11 @@ def get_findings(self, filename: str, test: Test):
).clean_vector(output_prefix=True)

# Add CVSS score if present
cvssv3 = row.get("CVSSv3", "")
cvssv3 = row.get("CVSSv3", row.get("definition.cvss3.base_score", ""))
if cvssv3 != "":
find.cvssv3_score = cvssv3
# manage CPE data
detected_cpe = self._format_cpe(str(row.get("CPE", "")))
detected_cpe = self._format_cpe(str(row.get("CPE", row.get("definition.cpe", ""))))
if detected_cpe:
# FIXME support more than one CPE in Nessus CSV parser
if len(detected_cpe) > 1:
Expand Down Expand Up @@ -156,26 +167,26 @@ def get_findings(self, filename: str, test: Test):

# Determine if there is more details to be included in the
# description
plugin_output = str(row.get("Plugin Output", ""))
plugin_output = str(row.get("Plugin Output", row.get("output", "")))
if plugin_output != "":
find.description += f"\n\n{plugin_output}"
# Process any CVEs
detected_cve = self._format_cve(str(row.get("CVE", "")))
detected_cve = self._format_cve(str(row.get("CVE", row.get("definition.cve", ""))))
if detected_cve:
if isinstance(detected_cve, list):
find.unsaved_vulnerability_ids += detected_cve
else:
find.unsaved_vulnerability_ids.append(detected_cve)
# Endpoint related fields
host = row.get("Host", "")
host = row.get("Host", row.get("asset.host_name", ""))
if host == "":
host = row.get("DNS Name", "")
if host == "":
host = row.get("IP Address", "localhost")

protocol = row.get("Protocol", "")
protocol = row.get("Protocol", row.get("protocol", ""))
protocol = protocol.lower() if protocol != "" else None
port = row.get("Port", "")
port = str(row.get("Port", row.get("asset.port", "")))
if isinstance(port, str) and port in ["", "0"]:
port = None
# Update the endpoints
Expand Down
Loading
Loading