Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parser for AWS Inspector2 findings #10829

Merged
merged 7 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions docs/content/en/integrations/parsers/file/aws_inspector2.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
---
title: "AWS Inspector2 Scanner"
toc_hide: true
---

### File Types
AWS Inspector2 reports can be imported in JSON format. The name "Inspector2" comes from the API of the "modern" Inspector service (`aws inspector2`), as opposed to Inspector Classic (the previous version of the service). A report can be generated like this: `aws inspector2 list-findings --filter-criteria '{"resourceId":[{"comparison":"EQUALS","value":"i-instance_id_here"}]}' --region us-east-1 > inspector2_findings.json`


This parser can help to get findings in a delegated admin account for AWS Inspector or in a standalone AWS account. The parser is developed mostly for a scenario where findings are obtained for a specific resource like an ECR image or an instance, and uploaded to a test in a DefectDojo engagement that represents a branch from a git repository.


A minimal valid json file with no findings:

```json
{
"findings": []
}
```

The detailed API response format is documented [here](https://docs.aws.amazon.com/inspector/v2/APIReference/API_Finding.html).

### Sample Scan Data
Sample AWS Inspector2 findings can be found [here](https://github.com/DefectDojo/django-DefectDojo/tree/master/unittests/scans/aws_inspector2).
2 changes: 1 addition & 1 deletion dojo/settings/.settings.dist.py.sha256sum
Original file line number Diff line number Diff line change
@@ -1 +1 @@
6a90a111e2b89eb2c400945c80ff76c64b135d78b84fdf6b09a6b83569946904
57b8bdd16694269a9ce711583e476037eb2c0cd8c7e2d814270c0318a9a8d0aa
3 changes: 3 additions & 0 deletions dojo/settings/settings.dist.py
Original file line number Diff line number Diff line change
Expand Up @@ -1283,6 +1283,7 @@ def saml2_attrib_map_format(dict):
"Kiuwan SCA Scan": ["description", "severity", "component_name", "component_version", "cwe"],
"Rapplex Scan": ["title", "endpoints", "severity"],
"AppCheck Web Application Scanner": ["title", "severity"],
"AWS Inspector2 Scan": ["title", "severity"],
Maffooch marked this conversation as resolved.
Show resolved Hide resolved
"Legitify Scan": ["title", "endpoints", "severity"],
"ThreatComposer Scan": ["title", "description"],
"Invicti Scan": ["title", "description", "severity"],
Expand Down Expand Up @@ -1350,6 +1351,7 @@ def saml2_attrib_map_format(dict):
"Wazuh": True,
"Nuclei Scan": True,
"Threagile risks report": True,
"AWS Inspector2 Scan": True,
}

# List of fields that are known to be usable in hash_code computation
Expand Down Expand Up @@ -1510,6 +1512,7 @@ def saml2_attrib_map_format(dict):
"Kiuwan SCA Scan": DEDUPE_ALGO_HASH_CODE,
"Rapplex Scan": DEDUPE_ALGO_HASH_CODE,
"AppCheck Web Application Scanner": DEDUPE_ALGO_HASH_CODE,
"AWS Inspector2 Scan": DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL_OR_HASH_CODE,
"Legitify Scan": DEDUPE_ALGO_HASH_CODE,
"ThreatComposer Scan": DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL_OR_HASH_CODE,
"Invicti Scan": DEDUPE_ALGO_HASH_CODE,
Expand Down
Empty file.
255 changes: 255 additions & 0 deletions dojo/tools/aws_inspector2/parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
import json
from datetime import UTC, datetime

from dateutil import parser as date_parser

from dojo.models import Endpoint, Finding


class AWSInspector2Parser:

"""Import AWS Inspector2 json."""

def get_scan_types(self):
return ["AWS Inspector2 Scan"]

def get_label_for_scan_types(self, scan_type):
return "AWS Inspector2 Scan"

def get_description_for_scan_types(self, scan_type):
return "AWS Inspector2 report file can be imported in JSON format (aws inspector2 list-findings)."

def get_findings(self, file, test):
tree = json.load(file)
raw_findings = tree.get("findings", None)
if not isinstance(raw_findings, list):
msg = "Incorrect Inspector2 report format"
raise TypeError(msg)
self.test = test
findings = []
for raw_finding in raw_findings:
finding = self.get_base_finding(raw_finding)
# type specific details
finding_type = raw_finding.get("type", None)
if finding_type == "PACKAGE_VULNERABILITY":
finding = self.get_package_vulnerability(finding, raw_finding)
elif finding_type == "CODE_VULNERABILITY":
finding = self.get_code_vulnerability(finding, raw_finding)
elif finding_type == "NETWORK_REACHABILITY":
finding = self.get_network_reachability(finding, raw_finding)
else:
msg = "Incorrect Inspector2 report format"
raise TypeError(msg)
# process the endpoints
finding = self.process_endpoints(finding, raw_finding)
findings.append(finding)

return findings

def get_severity(self, severity_string):
if severity_string == "UNTRIAGED":
severity_string = "Info"
return severity_string.title()

def get_base_finding(self, raw_finding: dict) -> Finding:
# basic fields
finding_id = raw_finding.get("findingArn")
title = raw_finding.get("title", "The title could not be identified...")
description = ""
if (aws_account := raw_finding.get("awsAccountId")) is not None:
description += f"**AWS Account**: {aws_account}\n"
if finding_id is not None:
description += f"**Finding ARN**: {finding_id}\n"
if (inspector_score := raw_finding.get("inspectorScore")) is not None:
description += f"Inspector score: {inspector_score}\n"
if (discovered_at := raw_finding.get("firstObservedAt")) is not None:
description += f"Discovered at: {discovered_at}\n"
if (last_seen_at := raw_finding.get("lastObservedAt")) is not None:
description += f"Last seen: {last_seen_at}\n"
if (orig_description := raw_finding.get("description")) is not None:
description += f"Original description: \n{orig_description}\n"
finding = Finding(
title=title,
test=self.test,
description=description,
severity=self.get_severity(raw_finding.get("severity", "Info")),
unique_id_from_tool=finding_id,
static_finding=True,
dynamic_finding=False,
)
# set mitigation status
if raw_finding.get("status", "ACTIVE") == "ACTIVE":
mitigated = None
is_mitigated = False
active = True
else:
is_mitigated = True
active = False
if (last_observed := raw_finding.get("lastObservedAt", None)) is not None:
mitigated = date_parser(last_observed)
else:
mitigated = datetime.now(UTC)
finding.active = active
finding.is_mitigated = is_mitigated
finding.mitigated = mitigated
# EPSS
finding.epss_score = raw_finding.get("epss", {}).get("score", None)

return finding

def get_package_vulnerability(self, finding: Finding, raw_finding: dict) -> Finding:
vulnerability_details = raw_finding.get("packageVulnerabilityDetails", {})
vulnerability_packages_descriptions = "\n".join(
[
(
f'*Vulnerable package*: {vulnerability_package.get("name", "N/A")}\n'
f'\tpackage manager: {vulnerability_package.get("packageManager", "N/A")}\n'
f'\tversion: {vulnerability_package.get("version", "N/A")}\n'
f'\tfixed version: {vulnerability_package.get("fixedInVersion", "N/A")}\n'
f'\tremediation: {vulnerability_package.get("remediation", "N/A")}\n'
)
for vulnerability_package in vulnerability_details.get("vulnerablePackages", [])
],
)
if (vulnerability_id := vulnerability_details.get("vulnerabilityId", None)) is not None:
finding.unsaved_vulnerability_ids = [vulnerability_id]
vulnerability_source = vulnerability_details.get("source")
vulnerability_source_url = vulnerability_details.get("sourceUrl")
# populate fields
if vulnerability_source is not None and vulnerability_source_url is not None:
finding.url = vulnerability_source_url
finding.description += (
"\n**Additional info**\n"
f"Vulnerability info from: {vulnerability_source} {vulnerability_source_url}\n"
"Affected packages:\n"
f"{vulnerability_packages_descriptions}\n"
)

return finding

def get_code_vulnerability(self, finding: Finding, raw_finding: dict) -> Finding:
cwes = raw_finding.get("cwes", [])
detector_id = raw_finding.get("detectorId", "N/A")
detector_name = raw_finding.get("detectorName", "N/A")
file_path_info = raw_finding.get("filePath", {})
file_name = file_path_info.get("fileName", "N/A")
file_path = file_path_info.get("filePath", "N/A")
start_line = file_path_info.get("startLine", "N/A")
end_line = file_path_info.get("endLine", "N/A")
detector_tags = ", ".join(raw_finding.get("detectorTags", []))
reference_urls = ", ".join(raw_finding.get("referenceUrls", []))
rule_id = raw_finding.get("ruleId", "N/A")
layer_arn = raw_finding.get("sourceLambdaLayerArn", "N/A")
string_cwes = ", ".join(cwes)
# populate fields
finding.cwe = cwes[0] if cwes else None
finding.file_path = f"{file_path}{file_name}"
finding.sast_source_file_path = f"{file_path}{file_name}"
finding.line = start_line
finding.sast_source_line = start_line
finding.description += (
"\n**Additional info**\n"
f"CWEs: {string_cwes}\n"
f"Vulnerability info from: {detector_id} {detector_name}\n"
f"Rule: {rule_id}\n"
f"Lines: {start_line} - {end_line}\n"
f"Tags: {detector_tags or 'N/A'}\n"
f"URLs: {reference_urls or 'N/A'}\n"
f"Lambda layer ARN: {layer_arn}\n"
)

return finding

def get_network_reachability(self, finding: Finding, raw_finding: dict) -> Finding:
network_path_info = raw_finding.get("networkPath", {})
network_path_steps = network_path_info.get("steps", [])
steps_descriptions = "\n".join(
[
f'steps:\n{step_number}: {step.get("componentId", "N/A")} {step.get("componentType", "N/A")}'
for step_number, step in enumerate(network_path_steps)
],
)
open_port_range_info = raw_finding.get("openPortRange", {})
port_range_start = open_port_range_info.get("begin", "N/A")
port_range_end = open_port_range_info.get("end", "N/A")
protocol = raw_finding.get("protocol", "N/A")
finding.description += (
"\n**Additional info**\n"
f"protocol {protocol}, port range {port_range_start} - {port_range_end}"
f"{steps_descriptions}\n"
)

return finding

def process_endpoints(self, finding: Finding, raw_finding: dict) -> Finding:
impact = []
endpoints = []
for resource_info in raw_finding.get("resources", {}):
resource_type = resource_info.get("type", None)
resource_id = resource_info.get("id", "N/A")
resource_details = resource_info.get("details", {})
endpoint_host = f"{resource_type} - {resource_id}"
if resource_type == "AWS_EC2_INSTANCE":
aws_account = raw_finding.get("awsAccountId")
resource_region = resource_info.get("region", "N/A")
endpoint_host = resource_id
ec2_instance_details = resource_details.get("awsEc2Instance", None)
if ec2_instance_details:
impact.extend(
(
f"ARN: {resource_id}",
f"Image ID: {ec2_instance_details.get('imageId', 'N/A')}",
f"IPv4 address: {ec2_instance_details.get('ipV4Addresses', 'N/A')}",
f"Subnet: {ec2_instance_details.get('subnetId', 'N/A')}",
f"VPC: {ec2_instance_details.get('vpcId', 'N/A')}",
f"Region: {resource_region}",
f"AWS Account: {aws_account}",
f"Launched at: {ec2_instance_details.get('launchedAt', 'N/A')}",
"---",
),
)
elif resource_type == "AWS_ECR_CONTAINER_IMAGE":
image_id = resource_id.split("repository/")[1].replace("sha256:", "").replace("/", "-")
endpoint_host = image_id
ecr_image_details = resource_details.get("awsEcrContainerImage", None)
if ecr_image_details:
impact.extend(
(
f"ARN: {resource_id}",
f"Registry: {ecr_image_details.get('registry', 'N/A')}",
f"Repository: {ecr_image_details.get('repositoryName', 'N/A')}",
f"Hash: {ecr_image_details.get('imageHash', 'N/A')}",
f"Author: {ecr_image_details.get('author', 'N/A')}",
f"Pushed at: {ecr_image_details.get('pushedAt', 'N/A')}",
"---",
),
)
elif resource_type == "AWS_ECR_REPOSITORY":
# no corresponding
# key present in
# https://docs.aws.amazon.com/inspector/v2/APIReference/API_ResourceDetails.html
pass
elif resource_type == "AWS_LAMBDA_FUNCTION":
lambda_id = resource_id.split("function:")[1].replace(":", "-").replace("/", "-")
endpoint_host = lambda_id
lambda_details = resource_details.get("awsLambdaFunction", None)
if lambda_details:
impact.extend(
(
f"ARN: {resource_id}",
f"Name: {lambda_details.get('functionName', 'N/A')}",
f"Version: {lambda_details.get('version', 'N/A')}",
f"Runtime: {lambda_details.get('runtime', 'N/A')}",
f"Hash: {lambda_details.get('codeSha256', 'N/A')}",
f"Pushed at: {lambda_details.get('lastModifiedAt', 'N/A')}",
),
)
else:
msg = "Incorrect Inspector2 report format"
raise TypeError(msg)
endpoints.append(Endpoint(host=endpoint_host))
finding.impact = "\n".join(impact)
finding.unsaved_endpoints = []
finding.unsaved_endpoints.extend(endpoints)

return finding
Loading
Loading