diff --git a/src/__init__.py b/src/__init__.py index a6bc3d1..253f166 100644 --- a/src/__init__.py +++ b/src/__init__.py @@ -1,17 +1,7 @@ -# BEGIN """ -This package provides functions for interacting with the Rapid7 InsightVM API. +This package provides functions for interacting with different security tools and their APIs. """ -from .rapid7 import * - -__all__ = [ - "load_r7_platform_api_credentials", - "get_platform_api_headers", - "load_r7_isvm_api_credentials", - "get_isvm_access_token", - "search_asset_isvm", - "get_asset_isvm", - "get_assets_isvm" -] -# END +import src.rapid7 +import src.paloalto +import src.client diff --git a/src/client.py b/src/client.py new file mode 100644 index 0000000..aa5c252 --- /dev/null +++ b/src/client.py @@ -0,0 +1,6 @@ +""" +TODO: Update Summary Docstring +""" + +from src.rapid7 import Rapid7 +from src.paloalto import PaloAlto diff --git a/src/paloalto/__init__.py b/src/paloalto/__init__.py new file mode 100644 index 0000000..d754e3d --- /dev/null +++ b/src/paloalto/__init__.py @@ -0,0 +1 @@ +import src.paloalto.cortex_xdr diff --git a/src/paloalto/cortex_xdr/__init__.py b/src/paloalto/cortex_xdr/__init__.py index 376d764..c559f06 100644 --- a/src/paloalto/cortex_xdr/__init__.py +++ b/src/paloalto/cortex_xdr/__init__.py @@ -1,36 +1,7 @@ """ This module provides functions for authenticating and working with the Palo Alto Cortex XDR API. - -Functions: -- load_xdr_api_credentials: loads the XDR API credentials from a configuration file -- generate_advanced_authentication: generates an advanced authentication token for the XDR API -- unisolate_endpoint: unisolates an endpoint in the XDR API -- get_endpoint_quarantine_status: gets the quarantine status of an endpoint in the XDR API -- quarantine_endpoint: quarantines an endpoint in the XDR API -- unquarantine_endpoint: unquarantines an endpoint in the XDR API -- get_endpoint_network_details: gets the network details of an endpoint in the XDR API """ -from .api_pa_xdr_auth import ( - load_xdr_api_credentials, - generate_advanced_authentication -) - -from .api_pa_xdr import ( - unisolate_endpoint, - get_endpoint_quarantine_status, - quarantine_endpoint, - unquarantine_endpoint, - get_endpoint_network_details, -) +from .api_pa_xdr_auth import * -__all__ = [ - "load_xdr_api_credentials", - "generate_advanced_authentication", - "unisolate_endpoint", - "get_endpoint_quarantine_status", - "quarantine_endpoint", - "unquarantine_endpoint", - "get_endpoint_network_details", - "api_pa_xdr" -] +from .api_pa_xdr import * diff --git a/src/rapid7/api_r7_api.py b/src/rapid7/api_r7_api.py new file mode 100644 index 0000000..72b39e8 --- /dev/null +++ b/src/rapid7/api_r7_api.py @@ -0,0 +1,127 @@ +""" +TODO: Add docstring +""" + +import collections +from typing import Any, Tuple +import logging +import urllib3 +import requests +from src.rapid7.api_r7_auth_class import R7_ISVM_Auth + +# Set up logging +logging.basicConfig(filename="api_r7_api.log", level=logging.ERROR) + + +class R7_ISVM_Api: + def __init__( + self, auth: R7_ISVM_Auth, fqdn: str, api_name: str, timeout: Tuple[int, int] + ) -> None: + self.auth = auth + self.fqdn = fqdn + self.api_name = api_name + self.timeout = timeout + + def _get_api_url(self, call_name: str) -> str: + """ + Returns the API URL. + + Returns: + A string containing the API URL. + """ + return f"{self.auth.isvm_base_url}/api/3/{self.api_name}/{call_name}" + + def _call( + self, + call_name: str, + method: str = "post", + params: dict = None, + json_value: object = None, + header_params=None, + ) -> requests.Response: + """ + Calls the API with the specified parameters. + + Args: + call_name: A string containing the name of the API call to make. + method: A string containing the HTTP method to use for the API call (default is "post"). + params: A dictionary containing the query parameters to include in the API call (default is None). + json_value: An object containing the JSON data to include in the API call (default is None). + header_params: A dictionary containing additional headers to include in the API call (default is None). + + Returns: + A requests.Response object containing the API response. + """ + if header_params is None: + header_params = {} + if params is None: + params = {} + if json_value is None: + json_value = {} + url = self._get_api_url(call_name) + headers = self.auth.get_isvm_encoded_auth_header() + self.extend_dict(headers, header_params) + + return self._execute_call( + url=url, + method=method, + params=params, + json_value=json_value, + headers=headers, + ) + + def _execute_call( + self, + url: str, + method: str, + params: dict = None, + json_value: object = None, + headers: dict = None, + ) -> requests.Response: + """ + Executes the API call. + + Returns: + A requests.Response object containing the API response. + """ + response = None + if method == "get": + response = requests.get( + url, headers=headers, params=params, timeout=self.timeout + ) + elif method == "post": + response = requests.post( + url, headers=headers, json=json_value, timeout=self.timeout + ) + elif method == "put": + response = requests.put( + url, headers=headers, json=json_value, timeout=self.timeout + ) + elif method == "delete": + response = requests.delete(url, headers=headers, timeout=self.timeout) + if response is not None: + response.raise_for_status() + else: + response = requests.Response() + return response + + @staticmethod + def extend_dict(*args): + """ + Extends a dictionary with the key-value pairs from one or more dictionaries. + + Args: + *args: One or more dictionaries to extend. + + Returns: + A dictionary containing the extended key-value pairs. + """ + if args is not None: + if type(args[0]) is collections.OrderedDict: + result = collections.OrderedDict() + else: + result = {} + for arg in args: + result.update(arg) + return result + return {} diff --git a/src/rapid7/api_r7_auth.py b/src/rapid7/api_r7_auth.py index a3e9cc4..10fe84b 100644 --- a/src/rapid7/api_r7_auth.py +++ b/src/rapid7/api_r7_auth.py @@ -69,7 +69,6 @@ def get_platform_api_headers(): return platform_headers - def load_r7_isvm_api_credentials(): """ Loads the Rapid7 InsightVM API credentials from environment variables. @@ -89,37 +88,6 @@ def load_r7_isvm_api_credentials(): return isvm_api_username, isvm_api_password, isvm_base_url -def get_isvm_2fa_access_token(): - """ - Generates an access token for the Rapid7 InsightVM API. - - Returns: - The access token as a string. - """ - isvm_api_username, isvm_api_password, _ = load_r7_isvm_api_credentials() - - # Construct the URL for the access token endpoint - token_url = f"{os.getenv('INSIGHTVM_BASE_URL')}/api/3/access-tokens" - - # Make a POST request to the access token endpoint to generate a new access token - response = requests.post( - token_url, - auth=HTTPBasicAuth(isvm_api_username, isvm_api_password), - headers={"Accept": "application/json"}, - timeout=10, # Add a timeout argument to avoid hanging indefinitely - verify=False # Ignore SSL errors - ) - - # Check if the request was successful - if response.status_code != 201: - logging.error("Failed to generate access token.") - raise ValueError("Failed to generate access token.") - - # Extract the access token from the response - access_token = response.json()["token"] - - return access_token - def get_isvm_basic_auth_header(): """ Returns the Authorization header with the Base64 encoded hash of the username and password. diff --git a/src/rapid7/api_r7_auth_class.py b/src/rapid7/api_r7_auth_class.py new file mode 100644 index 0000000..167c3a3 --- /dev/null +++ b/src/rapid7/api_r7_auth_class.py @@ -0,0 +1,47 @@ +""" +This file contains the R7_ISVM_Auth class for handling authentication with the InsightVM API. +""" +import os +import logging +import base64 +import dotenv + +# Load environment variables from .env file +dotenv.load_dotenv() + +# Set up logging +logging.basicConfig(filename='api_r7_auth.log', level=logging.INFO) + +class R7_ISVM_Auth: + """ + A class for handling authentication with the InsightVM API. + + Attributes: + isvm_api_username (str): The InsightVM API username. + isvm_api_password (str): The InsightVM API password. + isvm_base_url (str): The InsightVM API base URL. + """ + + def __init__(self) -> None: + """ + Initializes the R7Auth class by loading the necessary environment variables and checking for missing credentials. + """ + self.isvm_api_username = os.environ.get('INSIGHTVM_API_USERNAME') + self.isvm_api_password = os.environ.get('INSIGHTVM_API_PASSWORD') + self.isvm_base_url = os.environ.get('INSIGHTVM_BASE_URL') + + if not self.isvm_api_username or not self.isvm_api_password or not self.isvm_base_url: + logging.error("Missing ISVM API credentials or BASE URL. Please check .env file.") + raise ValueError("Missing ISVM API credentials or BASE URL. Please check .env file.") + + def get_isvm_encoded_auth_header(self) -> dict[str, str]: + """ + Returns the Authorization header with the Base64 encoded hash of the username and password. + + Returns: + A dictionary containing the Authorization header. + """ + auth_string = f"{self.isvm_api_username}:{self.isvm_api_password}" + encoded_auth_string = base64.b64encode(auth_string.encode()).decode() + auth_headers = {"Authorization": f"Basic {encoded_auth_string}"} + return auth_headers