-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
7 changed files
with
187 additions
and
77 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,17 +1,7 @@ | ||
# BEGIN | ||
""" | ||
This package provides functions for interacting with the Rapid7 InsightVM API. | ||
This package provides functions for interacting with different security tools and their APIs. | ||
""" | ||
|
||
from .rapid7 import * | ||
|
||
__all__ = [ | ||
"load_r7_platform_api_credentials", | ||
"get_platform_api_headers", | ||
"load_r7_isvm_api_credentials", | ||
"get_isvm_access_token", | ||
"search_asset_isvm", | ||
"get_asset_isvm", | ||
"get_assets_isvm" | ||
] | ||
# END | ||
import src.rapid7 | ||
import src.paloalto | ||
import src.client |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
""" | ||
TODO: Update Summary Docstring | ||
""" | ||
|
||
from src.rapid7 import Rapid7 | ||
from src.paloalto import PaloAlto |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
import src.paloalto.cortex_xdr |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,36 +1,7 @@ | ||
""" | ||
This module provides functions for authenticating and working with the Palo Alto Cortex XDR API. | ||
Functions: | ||
- load_xdr_api_credentials: loads the XDR API credentials from a configuration file | ||
- generate_advanced_authentication: generates an advanced authentication token for the XDR API | ||
- unisolate_endpoint: unisolates an endpoint in the XDR API | ||
- get_endpoint_quarantine_status: gets the quarantine status of an endpoint in the XDR API | ||
- quarantine_endpoint: quarantines an endpoint in the XDR API | ||
- unquarantine_endpoint: unquarantines an endpoint in the XDR API | ||
- get_endpoint_network_details: gets the network details of an endpoint in the XDR API | ||
""" | ||
|
||
from .api_pa_xdr_auth import ( | ||
load_xdr_api_credentials, | ||
generate_advanced_authentication | ||
) | ||
|
||
from .api_pa_xdr import ( | ||
unisolate_endpoint, | ||
get_endpoint_quarantine_status, | ||
quarantine_endpoint, | ||
unquarantine_endpoint, | ||
get_endpoint_network_details, | ||
) | ||
from .api_pa_xdr_auth import * | ||
|
||
__all__ = [ | ||
"load_xdr_api_credentials", | ||
"generate_advanced_authentication", | ||
"unisolate_endpoint", | ||
"get_endpoint_quarantine_status", | ||
"quarantine_endpoint", | ||
"unquarantine_endpoint", | ||
"get_endpoint_network_details", | ||
"api_pa_xdr" | ||
] | ||
from .api_pa_xdr import * |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
""" | ||
TODO: Add docstring | ||
""" | ||
|
||
import collections | ||
from typing import Any, Tuple | ||
import logging | ||
import urllib3 | ||
import requests | ||
from src.rapid7.api_r7_auth_class import R7_ISVM_Auth | ||
|
||
# Set up logging | ||
logging.basicConfig(filename="api_r7_api.log", level=logging.ERROR) | ||
|
||
|
||
class R7_ISVM_Api: | ||
def __init__( | ||
self, auth: R7_ISVM_Auth, fqdn: str, api_name: str, timeout: Tuple[int, int] | ||
) -> None: | ||
self.auth = auth | ||
self.fqdn = fqdn | ||
self.api_name = api_name | ||
self.timeout = timeout | ||
|
||
def _get_api_url(self, call_name: str) -> str: | ||
""" | ||
Returns the API URL. | ||
Returns: | ||
A string containing the API URL. | ||
""" | ||
return f"{self.auth.isvm_base_url}/api/3/{self.api_name}/{call_name}" | ||
|
||
def _call( | ||
self, | ||
call_name: str, | ||
method: str = "post", | ||
params: dict = None, | ||
json_value: object = None, | ||
header_params=None, | ||
) -> requests.Response: | ||
""" | ||
Calls the API with the specified parameters. | ||
Args: | ||
call_name: A string containing the name of the API call to make. | ||
method: A string containing the HTTP method to use for the API call (default is "post"). | ||
params: A dictionary containing the query parameters to include in the API call (default is None). | ||
json_value: An object containing the JSON data to include in the API call (default is None). | ||
header_params: A dictionary containing additional headers to include in the API call (default is None). | ||
Returns: | ||
A requests.Response object containing the API response. | ||
""" | ||
if header_params is None: | ||
header_params = {} | ||
if params is None: | ||
params = {} | ||
if json_value is None: | ||
json_value = {} | ||
url = self._get_api_url(call_name) | ||
headers = self.auth.get_isvm_encoded_auth_header() | ||
self.extend_dict(headers, header_params) | ||
|
||
return self._execute_call( | ||
url=url, | ||
method=method, | ||
params=params, | ||
json_value=json_value, | ||
headers=headers, | ||
) | ||
|
||
def _execute_call( | ||
self, | ||
url: str, | ||
method: str, | ||
params: dict = None, | ||
json_value: object = None, | ||
headers: dict = None, | ||
) -> requests.Response: | ||
""" | ||
Executes the API call. | ||
Returns: | ||
A requests.Response object containing the API response. | ||
""" | ||
response = None | ||
if method == "get": | ||
response = requests.get( | ||
url, headers=headers, params=params, timeout=self.timeout | ||
) | ||
elif method == "post": | ||
response = requests.post( | ||
url, headers=headers, json=json_value, timeout=self.timeout | ||
) | ||
elif method == "put": | ||
response = requests.put( | ||
url, headers=headers, json=json_value, timeout=self.timeout | ||
) | ||
elif method == "delete": | ||
response = requests.delete(url, headers=headers, timeout=self.timeout) | ||
if response is not None: | ||
response.raise_for_status() | ||
else: | ||
response = requests.Response() | ||
return response | ||
|
||
@staticmethod | ||
def extend_dict(*args): | ||
""" | ||
Extends a dictionary with the key-value pairs from one or more dictionaries. | ||
Args: | ||
*args: One or more dictionaries to extend. | ||
Returns: | ||
A dictionary containing the extended key-value pairs. | ||
""" | ||
if args is not None: | ||
if type(args[0]) is collections.OrderedDict: | ||
result = collections.OrderedDict() | ||
else: | ||
result = {} | ||
for arg in args: | ||
result.update(arg) | ||
return result | ||
return {} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
""" | ||
This file contains the R7_ISVM_Auth class for handling authentication with the InsightVM API. | ||
""" | ||
import os | ||
import logging | ||
import base64 | ||
import dotenv | ||
|
||
# Load environment variables from .env file | ||
dotenv.load_dotenv() | ||
|
||
# Set up logging | ||
logging.basicConfig(filename='api_r7_auth.log', level=logging.INFO) | ||
|
||
class R7_ISVM_Auth: | ||
""" | ||
A class for handling authentication with the InsightVM API. | ||
Attributes: | ||
isvm_api_username (str): The InsightVM API username. | ||
isvm_api_password (str): The InsightVM API password. | ||
isvm_base_url (str): The InsightVM API base URL. | ||
""" | ||
|
||
def __init__(self) -> None: | ||
""" | ||
Initializes the R7Auth class by loading the necessary environment variables and checking for missing credentials. | ||
""" | ||
self.isvm_api_username = os.environ.get('INSIGHTVM_API_USERNAME') | ||
self.isvm_api_password = os.environ.get('INSIGHTVM_API_PASSWORD') | ||
self.isvm_base_url = os.environ.get('INSIGHTVM_BASE_URL') | ||
|
||
if not self.isvm_api_username or not self.isvm_api_password or not self.isvm_base_url: | ||
logging.error("Missing ISVM API credentials or BASE URL. Please check .env file.") | ||
raise ValueError("Missing ISVM API credentials or BASE URL. Please check .env file.") | ||
|
||
def get_isvm_encoded_auth_header(self) -> dict[str, str]: | ||
""" | ||
Returns the Authorization header with the Base64 encoded hash of the username and password. | ||
Returns: | ||
A dictionary containing the Authorization header. | ||
""" | ||
auth_string = f"{self.isvm_api_username}:{self.isvm_api_password}" | ||
encoded_auth_string = base64.b64encode(auth_string.encode()).decode() | ||
auth_headers = {"Authorization": f"Basic {encoded_auth_string}"} | ||
return auth_headers |