Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎇 refactor sonarqube and add JSON parsing for api export #9734

Merged
merged 26 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions docs/content/en/integrations/parsers/file/sonarqube.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,26 @@
title: "SonarQube"
toc_hide: true
---
## SonarQube Scan (Aggregates findings per cwe, title, description, file\_path.)
# SonarQube Scan
There are two ways to retrieve findings from SonarQube. You can either use the [soprasteria package](https://github.com/soprasteria/sonar-report) or the SonarQube REST API directly.
Both ways (**SonarQube REST API** and **Soprasteria**) are depicted below.

### Sample Scan Data
Sample SonarQube scans can be found [here](https://github.com/DefectDojo/django-DefectDojo/tree/master/unittests/scans/sonarqube).

## SonarQube REST API
You can retrieve the JSON directly from SonarQube if you use one of the following REST API endpoints:
- `<sonarqubeurl>/api/issues/search?projects=<projectkey>`
- `<sonarqubeurl>/api/hotspots/search?projectKey=<projectkey>`

### JSON
The REST API JSON output can be uploaded to DefectDojo with "SonarQube Scan".

### ZIP
If you have too many findings in one project, you can implement a small script to handle pagination and put all JSON files in a .zip file. This zip file can also be parsed by DefectDojo with "SonarQube Scan".

## Soprasteria
### Soprasteria SonarQube Scan (Aggregates findings per cwe, title, description, file\_path.)

The SonarQube output file can be imported in HTML format or JSON format. The JSON format is generated with the option `--save-report-json` and behaves the same as the HTML format.

Expand All @@ -12,7 +31,7 @@ To generate the report, see
Version: \>= 1.1.0
Recommended version for both formats: \>= 3.1.2

## SonarQube Scan Detailed (Import all findings from SonarQube html report.)
### Soprasteria SonarQube Scan Detailed (Import all findings from SonarQube html report.)
manuel-sommer marked this conversation as resolved.
Show resolved Hide resolved

The SonarQube output file can be imported in HTML format or JSON format. The JSON format is generated with the option `--save-report-json` and behaves the same as the HTML format.

Expand All @@ -23,5 +42,4 @@ Version: \>= 1.1.0.
Recommended version for both formats: \>= 3.1.2


### Sample Scan Data
Sample SonarQube scans can be found [here](https://github.com/DefectDojo/django-DefectDojo/tree/master/unittests/scans/sonarqube).

manuel-sommer marked this conversation as resolved.
Show resolved Hide resolved
317 changes: 24 additions & 293 deletions dojo/tools/sonarqube/parser.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import logging
import re

from django.utils.html import strip_tags
from dojo.tools.sonarqube.soprasteria_json import SonarQubeSoprasteriaJSON
from dojo.tools.sonarqube.soprasteria_html import SonarQubeSoprasteriaHTML
from dojo.tools.sonarqube.sonarqube_restapi_json import SonarQubeRESTAPIJSON
from dojo.tools.sonarqube.sonarqube_restapi_zip import SonarQubeRESTAPIZIP
from lxml import etree
import zipfile
import json

from dojo.models import Finding

logger = logging.getLogger(__name__)


Expand All @@ -24,301 +23,33 @@ def get_label_for_scan_types(self, scan_type):

def get_description_for_scan_types(self, scan_type):
if scan_type == "SonarQube Scan":
return "Aggregates findings per cwe, title, description, file_path. SonarQube output file can be imported in HTML format or JSON format. Generate with https://github.com/soprasteria/sonar-report version >= 1.1.0, recommend version >= 3.1.2"
return "Aggregates findings per cwe, title, description, file_path. SonarQube output file can be imported in HTML format or JSON format. You can get the JSON output directly if you use the SonarQube API or generate with https://github.com/soprasteria/sonar-report version >= 1.1.0, recommend version >= 3.1.2"
else:
return "Import all findings from sonarqube html report or JSON format. SonarQube output file can be imported in HTML format or JSON format. Generate with https://github.com/soprasteria/sonar-report version >= 1.1.0, recommend version >= 3.1.2"

def get_findings(self, filename, test):
if filename.name.strip().lower().endswith(".json"):
json_content = json.load(filename)
return self.get_json_items(json_content, test, self.mode)
def get_findings(self, file, test):
if file.name.endswith(".json"):
json_content = json.load(file)
if json_content.get("date") and json_content.get("projectName") and json_content.get("hotspotKeys"):
return SonarQubeSoprasteriaJSON().get_json_items(json_content, test, self.mode)
elif json_content.get("paging") and json_content.get("components"):
return SonarQubeRESTAPIJSON().get_json_items(json_content, test, self.mode)
else:
return []
if file.name.endswith(".zip"):
if str(file.__class__) == "<class '_io.TextIOWrapper'>":
input_zip = zipfile.ZipFile(file.name, 'r')
else:
input_zip = zipfile.ZipFile(file, 'r')
zipdata = {name: input_zip.read(name) for name in input_zip.namelist()}
return SonarQubeRESTAPIZIP().get_items(zipdata, test, self.mode)
else:
parser = etree.HTMLParser()
tree = etree.parse(filename, parser)
tree = etree.parse(file, parser)
if self.mode not in [None, "detailed"]:
raise ValueError(
"Internal error: Invalid mode "
+ self.mode
+ ". Expected: one of None, 'detailed'"
)

return self.get_items(tree, test, self.mode)

def get_json_items(self, json_content, test, mode):
    """Build findings from a parsed JSON report.

    Reads the ``"rules"`` and ``"issues"`` entries of ``json_content``. Each
    issue is turned into a finding through
    ``process_result_file_name_aggregated`` when ``mode`` is ``None`` and
    through ``process_result_detailed`` otherwise.

    Returns the list of findings collected for the whole report.

    Raises ValueError when an issue has a ``None`` description or message.
    """
    collected = {}
    rule_catalog = json_content["rules"]
    for entry in json_content["issues"]:
        issue_key = entry["key"]
        line_text = str(entry["line"])
        message = entry["message"]
        summary = entry["description"]
        component = entry["component"]
        severity = self.convert_sonar_severity(entry["severity"])
        rule_key = entry["rule"]

        if message is None or summary is None:
            raise ValueError(
                "Parser ValueError: can't find a title or a mitigation for vulnerability of name "
                + rule_key
            )

        try:
            # The rule's HTML description supplies the description text,
            # the references and (through the references) the CWE.
            rule_entry = rule_catalog[rule_key]
            html_root = etree.fromstring(
                rule_entry["htmlDesc"], etree.HTMLParser()
            )
            description = self.get_description(html_root)
            logger.debug(description)
            references = self.get_references(rule_key, html_root)
            cwe = self.get_cwe(references)
        except KeyError:
            # Unknown rule (or rule without "htmlDesc"): use placeholders.
            description = "No description provided"
            references = ""
            cwe = 0

        shared_args = (
            test,
            collected,
            summary,
            cwe,
            description,
            component,
            line_text,
            severity,
            message,
            references,
        )
        if mode is None:
            self.process_result_file_name_aggregated(*shared_args)
        else:
            self.process_result_detailed(*shared_args, issue_key)
    return list(collected.values())

def get_items(self, tree, test, mode):
    """Build findings from a parsed HTML report tree.

    Looks up the ``tbody`` elements of the tables inside the "detail" divs.
    Findings are only produced when exactly two are present. Each
    vulnerability row becomes a finding through
    ``process_result_file_name_aggregated`` when ``mode`` is ``None`` and
    through ``process_result_detailed`` otherwise.

    Returns the list of findings (empty when the tables are not both found).

    Raises ValueError when a row has no title or no mitigation text.
    """
    tbodies = tree.xpath(
        "/html/body/div[contains(@class,'detail')]/table/tbody"
    )
    # The vulnerabilities table is absent when no vuln are found, so only a
    # report with both tables has anything to import.
    if len(tbodies) != 2:
        return []

    def cell_text(cell):
        # Stripped text of a cell; a missing/empty text is passed through.
        return cell.text and cell.text.strip()

    # First is "Detail of the Detected Vulnerabilities",
    # second is "Known Security Rules".
    finding_rows = list(tbodies[0].iter("tr"))
    rule_rows = list(tbodies[1].xpath("tr"))

    # Walk the rules once so each vulnerability can look up its details.
    details_by_rule = {}
    for rule_row in rule_rows:
        rule_cells = list(rule_row.iter("td"))
        rule_label = list(rule_cells[0].iter("a"))[0].text.strip()
        details_by_rule[rule_label] = list(rule_cells[1].iter("details"))[0]

    collected = {}
    for row in finding_rows:
        cells = list(row.iter("td"))
        anchor_text = list(cells[0].iter("a"))[0].text
        rule_label = anchor_text and anchor_text.strip()
        severity = self.convert_sonar_severity(cell_text(cells[1]))
        file_path = cell_text(cells[2])
        line = cell_text(cells[3])
        title = cell_text(cells[4])
        mitigation = cell_text(cells[5])
        tool_key = cell_text(cells[6])

        if mitigation is None or title is None:
            raise ValueError(
                "Parser ValueError: can't find a title or a mitigation for vulnerability of name "
                + rule_label
            )

        try:
            details = details_by_rule[rule_label]
            description = self.get_description(details)
            references = self.get_references(rule_label, details)
            cwe = self.get_cwe(references)
        except KeyError:
            # No matching rule in the rules table: use placeholders.
            description = "No description provided"
            references = ""
            cwe = 0

        shared_args = (
            test,
            collected,
            title,
            cwe,
            description,
            file_path,
            line,
            severity,
            mitigation,
            references,
        )
        if mode is None:
            self.process_result_file_name_aggregated(*shared_args)
        else:
            self.process_result_detailed(*shared_args, tool_key)
    return list(collected.values())

# Process one vuln from the report for "SonarQube Scan detailed"
# Create the finding and add it into the dupes list
def process_result_detailed(
    self,
    test,
    dupes,
    vuln_title,
    vuln_cwe,
    vuln_description,
    vuln_file_path,
    vuln_line,
    vuln_severity,
    vuln_mitigation,
    vuln_references,
    vuln_key,
):
    """Create one Finding for a vulnerability and store it in ``dupes``.

    The storage key includes ``vuln_key`` (the unique id from the tool), so
    only real duplicates overwrite each other. ``dupes`` is mutated in
    place; nothing is returned.
    """
    dedupe_key = (
        f"{vuln_cwe}{vuln_title}{vuln_description}{vuln_file_path}{vuln_key}"
    )
    dupes[dedupe_key] = Finding(
        test=test,
        title=vuln_title,
        cwe=int(vuln_cwe),
        severity=vuln_severity,
        description=vuln_description,
        mitigation=vuln_mitigation,
        references=vuln_references,
        impact="No impact provided",
        file_path=vuln_file_path,
        line=vuln_line,
        unique_id_from_tool=vuln_key,
        static_finding=True,
        dynamic_finding=False,
        false_p=False,
        duplicate=False,
        out_of_scope=False,
        mitigated=None,
    )

# Process one vuln from the report for "SonarQube Scan"
# Create the finding and add it into the dupes list
# For aggregated findings:
# - the description is enriched with each finding line number
# - the mitigation (message) is concatenated with each finding's mitigation value
def process_result_file_name_aggregated(
    self,
    test,
    dupes,
    vuln_title,
    vuln_cwe,
    vuln_description,
    vuln_file_path,
    vuln_line,
    vuln_severity,
    vuln_mitigation,
    vuln_references,
):
    """Add one vulnerability to the aggregated findings held in ``dupes``.

    Vulnerabilities sharing cwe, title, description and file path collapse
    into one Finding. The first occurrence creates the Finding. Each later
    one appends its line to the description, appends its mitigation text
    and increments ``nb_occurences``. ``dupes`` is mutated in place;
    nothing is returned.
    """
    group_key = f"{vuln_cwe}{vuln_title}{vuln_description}{vuln_file_path}"
    occurrence_note = f"Line: {vuln_line}"

    if group_key in dupes:
        # Already aggregated: extend the description, the mitigation
        # (message field, which may vary for each vuln) and the counter.
        existing = dupes[group_key]
        existing.description = f"{existing.description}\n{occurrence_note}"
        existing.mitigation = "{}\n______\n{}".format(
            existing.mitigation, vuln_mitigation
        )
        existing.nb_occurences = existing.nb_occurences + 1
        return

    # No line number is set on the Finding itself: the aggregate may cover
    # vulnerabilities located on different lines.
    dupes[group_key] = Finding(
        test=test,
        title=vuln_title,
        cwe=int(vuln_cwe),
        severity=vuln_severity,
        description=vuln_description
        + "\n\n-----\nOccurences:\n"
        + occurrence_note,
        mitigation=vuln_mitigation,
        references=vuln_references,
        impact="No impact provided",
        file_path=vuln_file_path,
        static_finding=True,
        dynamic_finding=False,
        false_p=False,
        duplicate=False,
        out_of_scope=False,
        mitigated=None,
        nb_occurences=1,
    )

def convert_sonar_severity(self, sonar_severity):
    """Map a SonarQube severity label to the severity name used for findings.

    The comparison is case-insensitive: "blocker" -> "Critical",
    "critical" -> "High", "major" -> "Medium", "minor" -> "Low".
    Any other value, including ``None`` or an empty string, maps to "Info".
    """
    # A report cell without text yields None; treat it like an unknown
    # label instead of failing on ``None.lower()``.
    if not sonar_severity:
        return "Info"
    severity_by_label = {
        "blocker": "Critical",
        "critical": "High",
        "major": "Medium",
        "minor": "Low",
    }
    return severity_by_label.get(sonar_severity.lower(), "Info")

def get_description(self, vuln_details):
    """Turn a rule's details element into plain description text.

    The element is serialized, everything from the first ``<h2>See``
    heading onward is dropped, the remaining ``<h2>`` headings are marked
    with ``**`` on both sides, and all other tags are stripped.
    """
    serialized = etree.tostring(vuln_details, pretty_print=True)
    markup = serialized.decode("utf-8", errors="replace")
    # Keep only the part before the "See ..." section.
    body, _, _ = markup.partition("<h2>See")
    for heading_tag in ("<h2>", "</h2>"):
        body = str(body).replace(heading_tag, "**")
    return strip_tags(body).strip()

def get_references(self, rule_name, vuln_details):
    """Return the rule name followed by the text of every ``<a>`` element.

    Each anchor text found under ``vuln_details`` is appended on its own
    line. When there are no anchors, ``rule_name`` is returned unchanged.
    """
    anchor_lines = "".join(
        "\n" + str(anchor.text) for anchor in vuln_details.iter("a")
    )
    if not anchor_lines:
        return rule_name
    return rule_name + anchor_lines

def get_cwe(self, vuln_references):
    """Extract the first CWE number mentioned in the references text.

    Returns the digits of the first ``CWE-<number>`` occurrence
    (case-insensitive) as a string, or ``0`` when there is none.
    """
    # Require at least one digit: a bare "CWE-" would otherwise match with
    # an empty number, which callers cannot convert with int().
    found = re.search(r"CWE-([0-9]+)", vuln_references, re.IGNORECASE)
    if found:
        # Match only the first CWE!
        return found.group(1)
    return 0
return SonarQubeSoprasteriaHTML().get_items(tree, test, self.mode)
Loading
Loading