Skip to content

Commit

Permalink
Tenable Parser: Support the new "workbench" format (#9804)
Browse files Browse the repository at this point in the history
* Tenable Parser: Support the new "workbench" format

* Update test_tenable_parser.py
  • Loading branch information
FelixHernandez authored Mar 27, 2024
1 parent 22c7783 commit caad884
Show file tree
Hide file tree
Showing 3 changed files with 1,698 additions and 17 deletions.
45 changes: 28 additions & 17 deletions dojo/tools/tenable/csv_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,22 +63,33 @@ def _format_cpe(self, val):
cpe_match = re.findall(r"cpe:/[^\n\ ]+", val)
return cpe_match if cpe_match else None

def detect_delimiter(self, content: str):
"""Detect the delimiter of the CSV file"""
if isinstance(content, bytes):
content = content.decode("utf-8")
first_line = content.split('\n')[0]
if ';' in first_line:
return ';'
else:
return ',' # default to comma if no semicolon found

def get_findings(self, filename: str, test: Test):
# Read the CSV
content = filename.read()
delimiter = self.detect_delimiter(content)
if isinstance(content, bytes):
content = content.decode("utf-8")
csv.field_size_limit(int(sys.maxsize / 10)) # the request/resp are big
reader = csv.DictReader(io.StringIO(content))
reader = csv.DictReader(io.StringIO(content), delimiter=delimiter)
dupes = {}
# Iterate over each line and create findings
for row in reader:
# title: Could come from "Name" or "Plugin Name"
title = row.get("Name", row.get("Plugin Name"))
title = row.get("Name", row.get("Plugin Name", row.get("asset.name")))
if title is None or title == "":
continue
# severity: Could come from "Severity" or "Risk"
raw_severity = row.get("Risk", "")
raw_severity = row.get("Risk", row.get("severity", ""))
if raw_severity == "":
raw_severity = row.get("Severity", "Info")
# this could actually be a int, so try to convert
Expand All @@ -89,17 +100,17 @@ def get_findings(self, filename: str, test: Test):
# convert the severity to something dojo likes
severity = self._convert_severity(raw_severity)
# Other text fields
description = row.get("Synopsis", "")
mitigation = str(row.get("Solution", "N/A"))
impact = row.get("Description", "N/A")
references = row.get("See Also", "N/A")
description = row.get("Synopsis", row.get("definition.synopsis", "N/A"))
mitigation = str(row.get("Solution", row.get("definition.solution", "N/A")))
impact = row.get("Description", row.get("definition.description", "N/A"))
references = row.get("See Also", row.get("definition.see_also", "N/A"))
# Determine if the current row has already been processed
dupe_key = (
severity
+ title
+ row.get("Host", "No host")
+ str(row.get("Port", "No port"))
+ row.get("Synopsis", "No synopsis")
+ row.get("Host", row.get("asset.host_name", "No host"))
+ str(row.get("Port", row.get("asset.port", "No port")))
+ row.get("Synopsis", row.get("definition.synopsis", "No synopsis"))
)
# Finding has not been detected in the current report. Proceed with
# parsing
Expand All @@ -123,11 +134,11 @@ def get_findings(self, filename: str, test: Test):
).clean_vector(output_prefix=True)

# Add CVSS score if present
cvssv3 = row.get("CVSSv3", "")
cvssv3 = row.get("CVSSv3", row.get("definition.cvss3.base_score", ""))
if cvssv3 != "":
find.cvssv3_score = cvssv3
# manage CPE data
detected_cpe = self._format_cpe(str(row.get("CPE", "")))
detected_cpe = self._format_cpe(str(row.get("CPE", row.get("definition.cpe", ""))))
if detected_cpe:
# FIXME support more than one CPE in Nessus CSV parser
if len(detected_cpe) > 1:
Expand Down Expand Up @@ -156,26 +167,26 @@ def get_findings(self, filename: str, test: Test):

# Determine if there is more details to be included in the
# description
plugin_output = str(row.get("Plugin Output", ""))
plugin_output = str(row.get("Plugin Output", row.get("output", "")))
if plugin_output != "":
find.description += f"\n\n{plugin_output}"
# Process any CVEs
detected_cve = self._format_cve(str(row.get("CVE", "")))
detected_cve = self._format_cve(str(row.get("CVE", row.get("definition.cve", ""))))
if detected_cve:
if isinstance(detected_cve, list):
find.unsaved_vulnerability_ids += detected_cve
else:
find.unsaved_vulnerability_ids.append(detected_cve)
# Endpoint related fields
host = row.get("Host", "")
host = row.get("Host", row.get("asset.host_name", ""))
if host == "":
host = row.get("DNS Name", "")
if host == "":
host = row.get("IP Address", "localhost")

protocol = row.get("Protocol", "")
protocol = row.get("Protocol", row.get("protocol", ""))
protocol = protocol.lower() if protocol != "" else None
port = row.get("Port", "")
port = str(row.get("Port", row.get("asset.port", "")))
if isinstance(port, str) and port in ["", "0"]:
port = None
# Update the endpoints
Expand Down
Loading

0 comments on commit caad884

Please sign in to comment.