Skip to content

Commit

Permalink
used requests for api queries, formatting and removed redundant varia…
Browse files Browse the repository at this point in the history
…bles
  • Loading branch information
nadia31415 committed Feb 19, 2025
1 parent 1470eaa commit 22f60c7
Showing 1 changed file with 46 additions and 89 deletions.
135 changes: 46 additions & 89 deletions dxapp_compliance/dxapp_queries.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import base64
import json
import logging
import requests
import os
from datetime import datetime
# Fastcore extends the python standard library to allow for the use of ghapi.
Expand Down Expand Up @@ -909,7 +910,7 @@ def get_latest_commit_date(self, organisation_name, repo_name, token):

def get_security_advisories(self, organisation_name, repo_name, token):
"""
Checks if security advisories have been enabled for the GitHub repo using GitHub CLI.
Checks if security advisories have been enabled for the GitHub repo using GitHub API.
Parameters
----------
Expand All @@ -922,60 +923,43 @@ def get_security_advisories(self, organisation_name, repo_name, token):
Returns
-------
status_code (str):
Status of configuration (e.g., "attached").
dependabot_alerts_status (str):
Dependabot status alerts (e.g., "enabled" or "disabled").
dependabot_alerts_setting (str):
If dependabot alerts are set or not (e.g., "set" or "not_set").
advisories_info (str):
Info on status of dependabot alerts.
"""

# Set GitHub CLI command
cmd = [
"gh", "api",
"-H", "Accept: application/vnd.github+json",
"-H", f"Authorization: token {token}",
"-H", "X-GitHub-Api-Version: 2022-11-28",
f"/repos/{organisation_name}/{repo_name}/code-security-configuration"
]
url = f"https://api.github.com/repos/{organisation_name}/{repo_name}/code-security-configuration"
headers = {
"Accept": "application/vnd.github+json",
"Authorization": f"token {token}",
"X-GitHub-Api-Version": "2022-11-28"
}

try:
result = subprocess.run(cmd, capture_output=True, text=True, check=True)

# Parse JSON output
security_config = json.loads(result.stdout)

# Extract statuses
status_code = security_config.get("status")
dependabot_alerts_status = security_config.get("configuration", {}).get("dependabot_alerts", "disabled")
dependabot_alerts_setting = security_config.get("configuration", {}).get("dependabot_security_updates", "not_set")
advisories_info = (
f"Dependabot alerts are {dependabot_alerts_status} and "
f"security updates are {dependabot_alerts_setting}."
)
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()

security_config = response.json()

except subprocess.CalledProcessError as e:
print(f"Error checking security advisories for {repo_name}: {e.stderr}")
status_code = "error"
dependabot_alerts_status = security_config.get("configuration", {}).get("dependabot_alerts")
dependabot_alerts_setting = security_config.get("configuration", {}).get("dependabot_security_updates")

except requests.RequestException as e:
print(f"Error checking security advisories for {repo_name}: {str(e)}")
dependabot_alerts_status = "disabled"
dependabot_alerts_setting = "not_set"
advisories_info = f"Error: {e.stderr}"

except json.JSONDecodeError as e:
print(f"Error parsing JSON response: {str(e)}")
status_code = "error"
dependabot_alerts_status = "disabled"
dependabot_alerts_setting = "not_set"
advisories_info = f"Error parsing JSON response: {str(e)}"

return status_code, dependabot_alerts_status, dependabot_alerts_setting, advisories_info
return dependabot_alerts_status, dependabot_alerts_setting

def check_requirements_file_in_python_app(self, organisation_name, repo_name, token):
"""
Checks if 'requirements.txt' exists in a Python GitHub repo
using GitHub CLI.
Checks if 'requirements.txt' exists in a Python GitHub repo using GitHub API.
Parameters
----------
Expand All @@ -986,75 +970,48 @@ def check_requirements_file_in_python_app(self, organisation_name, repo_name, to
Returns
-------
file_exists (bool): Shows if 'requirements.txt' exists in repo
status_code (int): HTTP status code returned by API.
file_info (str): Information message about file search.
"""

# Check language of repo
lang_cmd = [
"gh", "api",
"-H", "Accept: application/vnd.github+json",
"-H", f"Authorization: token {token}",
f"/repos/{organisation_name}/{repo_name}/languages"
]
headers = {
"Accept": "application/vnd.github+json",
"Authorization": f"token {token}"
}

try:
# Run GitHub CLI to get language for repo
lang_result = subprocess.run(lang_cmd, capture_output=True, text=True)
status_code = lang_result.returncode

if status_code != 200:
return False, status_code, f"Error checking repo language: {lang_result.stderr}"
# Check language of repo
lang_url = f"https://api.github.com/repos/{organisation_name}/{repo_name}/languages"
lang_response = requests.get(lang_url, headers=headers, timeout=10)
lang_response.raise_for_status()

# Get language output
languages = json.loads(lang_result.stdout)
languages = lang_response.json()

# Check Python is main language
# Check if Python is the main language
if not languages or 'Python' not in languages:
return False, 200, "The repository is not primarily a Python application."
return False

# List contents of repo (root directory)
list_cmd = [
"gh", "api",
"-H", "Accept: application/vnd.github+json",
"-H", f"Authorization: token {token}",
f"/repos/{organisation_name}/{repo_name}/contents"
]

list_result = subprocess.run(list_cmd, capture_output=True, text=True)
status_code = list_result.returncode

if status_code == 200:
contents = json.loads(list_result.stdout)

# Search 'requirements.txt' witout case-sensitivity
requirements_files = [
item for item in contents
if item['type'] == 'file' and 'requirements.txt' in item['name'].lower()
]

if requirements_files:
file_exists = True
file_info = f"Found {len(requirements_files)} requirements.txt file(s): {', '.join(file['name'] for file in requirements_files)}"
else:
file_exists = False
file_info = "No requirements.txt file found."
contents_url = f"https://api.github.com/repos/{organisation_name}/{repo_name}/contents"
contents_response = requests.get(contents_url, headers=headers)
contents_response.raise_for_status()

else:
file_exists = False
file_info = f"Error with status code {status_code}: {list_result.stderr}"
contents = contents_response.json()

except subprocess.CalledProcessError as e:
status_code = e.returncode
# Search for 'requirements.txt' without case-sensitivity
file_exists = any(
item['type'] == 'file' and 'requirements.txt' in item['name'].lower()
for item in contents
)

except requests.RequestException as e:
print(f"Error checking file: {str(e)}")
file_exists = False
file_info = f"Error checking file: {e.stderr}"

except json.JSONDecodeError as e:
status_code = 404
print(f"Error parsing API response: {str(e)}")
file_exists = False
file_info = "Error parsing API response."

return file_exists, status_code, file_info
return file_exists


def orchestrate_app_compliance(self, list_apps, list_of_json_contents):
"""
Expand Down Expand Up @@ -1095,7 +1052,7 @@ def orchestrate_app_compliance(self, list_apps, list_of_json_contents):
# Check compliance
df_repo, df_repo_details = self.check_file_compliance(app, dxapp_contents)

# Append additional info
# Append security status and requirements file status
df_repo['dependabot_alerts_status'] = dependabot_alerts_status
df_repo['dependabot_alerts_setting'] = dependabot_alerts_setting
df_repo['requirements_file_exists'] = file_exists
Expand Down

0 comments on commit 22f60c7

Please sign in to comment.