Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding the new NIST CVE Tracking feature for 3.x release. #365

Open
wants to merge 4 commits into
base: next3.0
Choose a base branch
from

Conversation

bminnix
Copy link

@bminnix bminnix commented Aug 8, 2024

Closes: #80

This issue was marked as closed, but then briefly discussed in #81 that is open regarding last_modified_date (or similar). See summary below.

What's Changed

Summary:

New "NIST - Software CVE Search" Job

image

Job Results

image

@bminnix bminnix force-pushed the app-nist-cve-sync3.0 branch 3 times, most recently from 9e9de39 to 7cf3d04 Compare August 10, 2024 06:31
@bminnix bminnix changed the title Adding the new NIST CVE Tracking feature to the next3.0 base. Adding the new NIST CVE Tracking feature for 3.x release. Aug 10, 2024
nautobot_device_lifecycle_mgmt/jobs/cve_tracking.py Outdated Show resolved Hide resolved
"""Initializing job with extra options."""
super().__init__()
self.nist_api_key = getenv("NIST_API_KEY")
self.sleep_timer = 0.75
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be configurable? Feels a bit arbitrary.


# Set session attributes for retries
self.session = Session()
retries = Retry(
Copy link
Contributor

@progala progala Aug 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be configurable?

nautobot_device_lifecycle_mgmt/jobs/cve_tracking.py Outdated Show resolved Hide resolved
nautobot_device_lifecycle_mgmt/jobs/cve_tracking.py Outdated Show resolved Hide resolved
nautobot_device_lifecycle_mgmt/jobs/cve_tracking.py Outdated Show resolved Hide resolved
nautobot_device_lifecycle_mgmt/jobs/cve_tracking.py Outdated Show resolved Hide resolved
"""Converts CVE info into DLC Model compatibility."""
cve = cve_json

# cve_base = cve["vulnerabilities"][0]['cve']
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still needed?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the last commit resolved this.

nautobot_device_lifecycle_mgmt/jobs/cve_tracking.py Outdated Show resolved Hide resolved

class Migration(migrations.Migration):
dependencies = [
("nautobot_device_lifecycle_mgmt", "0030_delete_models_migrated_to_core"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll need to rename/regenerate this. I had to pull in migration for Nautobot 2.3 compatibility into next3.0

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has been regenerated.

bminnix and others added 3 commits August 12, 2024 15:15
Creating a batched commit with several of the syntax update recommendations.

Co-authored-by: Przemek Rogala <progala@progala.net>
def __init__(self):
"""Initializing job with extra options."""
super().__init__()
self.nist_api_key = getenv("NAUTOBOT_DLM_NIST_API_KEY")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about this again. We should move this to the plugin config and set the default in app init. Then we can do:

self.nist_api_key = PLUGIN_CFG["NAUTOBOT_DLM_NIST_API_KEY"]

Also, we should guard for this being empty and return early with error.

Comment on lines +94 to +97
self.headers = {"ContentType": "application/json", "apiKey": self.nist_api_key}

# Set session attributes for retries
self.session = Session()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We always send the same headers so let's set them when we init our Session object.

Suggested change
self.headers = {"ContentType": "application/json", "apiKey": self.nist_api_key}
# Set session attributes for retries
self.session = Session()
headers = {"ContentType": "application/json", "apiKey": self.nist_api_key}
# Set session attributes for retries
self.session = Session()
self.session.headers.update(headers)

Comment on lines +176 to +177
@staticmethod
def create_cpe_software_search_urls(vendor: str, platform: str, version: str) -> list:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this method is needed. We are making call to the external code, we could move it to the caller method.

self.associate_software_to_cve(software.id, matching_dlc_cve.id)
if str(cve_info["modified_date"][0:10]) != str(matching_dlc_cve.last_modified_date):
self.update_cve(matching_dlc_cve, cve_info)
continue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not needed.

Suggested change
continue

Comment on lines +161 to +164
def associate_software_to_cve(self, software_id, cve_id):
"""A function to associate software to a CVE."""
cve = CVELCM.objects.get(id=cve_id)
software = SoftwareVersion.objects.get(id=software_id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have those objects in the method that's calling this one. Perhaps we could get rid of this method and do the association directly in the calling method.


self.logger.info("Created New CVEs.", extra={"grouping": "CVE Creation"})

def get_cve_info(self, cpe_software_search_urls: list, software_id=None) -> dict:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have SoftwareVersion object in the calling method so it's probably better to use that instead of passing software_id around.

extra={"object": SoftwareVersion.objects.get(id=software_id), "grouping": "CVE Creation"},
)
cve_list = [cve["cve"] for cve in result["vulnerabilities"]]
dlc_cves = [cve.name for cve in CVELCM.objects.all()]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
dlc_cves = [cve.name for cve in CVELCM.objects.all()]
dlc_cves = CVELCM.objects.values_list("name", flat=True)

Comment on lines +253 to +264
if cve_list:
for cve in cve_list:
cve_name = cve["id"]
if cve_name.startswith("CVE"):
if cve_name not in dlc_cves:
processed_cve_info["new"].update({cve_name: self.prep_cve_for_dlc(cve)})
else:
processed_cve_info["existing"].update({cve_name: self.prep_cve_for_dlc(cve)})
self.logger.info(
"Prepared %s CVE for creation." % len(processed_cve_info["new"]),
extra={"object": SoftwareVersion.objects.get(id=software_id), "grouping": "CVE Creation"},
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if cve_list:
for cve in cve_list:
cve_name = cve["id"]
if cve_name.startswith("CVE"):
if cve_name not in dlc_cves:
processed_cve_info["new"].update({cve_name: self.prep_cve_for_dlc(cve)})
else:
processed_cve_info["existing"].update({cve_name: self.prep_cve_for_dlc(cve)})
self.logger.info(
"Prepared %s CVE for creation." % len(processed_cve_info["new"]),
extra={"object": SoftwareVersion.objects.get(id=software_id), "grouping": "CVE Creation"},
)
if not cve_list:
return processed_cve_info
for cve in cve_list:
cve_name = cve["id"]
if not cve_name.startswith("CVE"):
continue
if cve_name not in dlc_cves:
processed_cve_info["new"].update({cve_name: self.prep_cve_for_dlc(cve)})
else:
processed_cve_info["existing"].update({cve_name: self.prep_cve_for_dlc(cve)})
self.logger.info(
"Prepared %s CVE for creation." % len(processed_cve_info["new"]),
extra={"object": SoftwareVersion.objects.get(id=software_id), "grouping": "CVE Creation"},
)

dict: Dictionary of returned results if successful.
"""
try:
result = self.session.get(url, headers=self.headers)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll set headers when we init session.

Suggested change
result = self.session.get(url, headers=self.headers)
result = self.session.get(url)

"The NIST Service is currently unavailable. Status Code: %s. Try running the job again later.", code
)

return result.json()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need try..except in case JSON decoding fails?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants