Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scan Multiple AWS accounts via AssumeRole #172

Merged
merged 10 commits into from
Apr 7, 2021
47 changes: 47 additions & 0 deletions cloudsplaining/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,50 @@
def change_log_level(log_level):
""""Change log level of module logger"""
logger.setLevel(log_level)


def set_stream_logger(name="cloudsplaining", level=logging.DEBUG, format_string=None): # pylint: disable=redefined-outer-name
"""
Add a stream handler for the given name and level to the logging module.
By default, this logs all cloudsplaining messages to ``stdout``.
>>> import cloudsplaining
>>> cloudsplaining.set_stream_logger('cloudsplaining.scan', logging.INFO)
:type name: string
:param name: Log name
:type level: int
:param level: Logging level, e.g. ``logging.INFO``
:type format_string: str
:param format_string: Log message format
"""
# remove existing handlers. since NullHandler is added by default
handlers = logging.getLogger(name).handlers
for handler in handlers: # pylint: disable=redefined-outer-name
logging.getLogger(name).removeHandler(handler)
if format_string is None:
format_string = "%(asctime)s %(name)s [%(levelname)s] %(message)s"
logger = logging.getLogger(name) # pylint: disable=redefined-outer-name
logger.setLevel(level)
handler = logging.StreamHandler() # pylint: disable=redefined-outer-name
handler.setLevel(level)
formatter = logging.Formatter(format_string) # pylint: disable=redefined-outer-name
handler.setFormatter(formatter)
logger.addHandler(handler)


def set_log_level(verbose):
"""
Set Log Level based on click's count argument.

Default log level to critical; otherwise, set to: warning for -v, info for -vv, debug for -vvv

:param verbose: integer for verbosity count.
:return:
"""
if verbose == 1:
set_stream_logger(level=getattr(logging, "WARNING"))
elif verbose == 2:
set_stream_logger(level=getattr(logging, "INFO"))
elif verbose >= 3:
set_stream_logger(level=getattr(logging, "DEBUG"))
else:
set_stream_logger(level=getattr(logging, "CRITICAL"))
1 change: 1 addition & 0 deletions cloudsplaining/bin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def cloudsplaining():


cloudsplaining.add_command(command.create_exclusions_file.create_exclusions_file)
cloudsplaining.add_command(command.create_multi_account_config_file.create_multi_account_config_file)
cloudsplaining.add_command(command.expand_policy.expand_policy)
cloudsplaining.add_command(command.scan.scan)
cloudsplaining.add_command(command.scan_policy_file.scan_policy_file)
Expand Down
1 change: 1 addition & 0 deletions cloudsplaining/command/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# pylint: disable=missing-module-docstring
from cloudsplaining.command import create_exclusions_file
from cloudsplaining.command import create_multi_account_config_file
from cloudsplaining.command import expand_policy
from cloudsplaining.command import download
from cloudsplaining.command import scan
Expand Down
56 changes: 56 additions & 0 deletions cloudsplaining/command/create_multi_account_config_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
"""
Create YML Template files for the exclusions template command.
This way, users don't have to remember exactly how to phrase the yaml files, since this command creates it for them.
"""
# Copyright (c) 2020, salesforce.com, inc.
# All rights reserved.
# Licensed under the BSD 3-Clause license.
# For full license text, see the LICENSE file in the repo root
# or https://opensource.org/licenses/BSD-3-Clause
import os
from pathlib import Path
import logging
import click
from cloudsplaining.shared.constants import EXCLUSIONS_TEMPLATE
from cloudsplaining import change_log_level

logger = logging.getLogger(__name__)


@click.command(
context_settings=dict(max_content_width=160),
short_help="Creates a YML file to be used for multi-account scanning",
)
@click.option(
"--output-file",
type=click.Path(exists=False),
default=os.path.join(os.getcwd(), "multi-account-config.yml"),
required=True,
help="Relative path to output file where we want to store the multi account config template.",
)
@click.option(
"--verbose",
"-v",
type=click.Choice(
["critical", "error", "warning", "info", "debug"], case_sensitive=False
),
)
def create_multi_account_config_file(output_file, verbose):
"""
Creates a YML file to be used as a multi-account config template, so users can scan many different accounts.
"""
if verbose:
log_level = getattr(logging, verbose.upper())
change_log_level(log_level)

filename = Path(output_file).resolve()
with open(filename, "a") as file_obj:
for line in EXCLUSIONS_TEMPLATE:
file_obj.write(line)
print(f"Multi-account config file written to: {filename}")
print(
"Make sure you edit the multi-account-config.yml file and then run the scan-multi-account command, as shown below."
)
print(
"\tcloudsplaining scan-multi-account --exclusions-file exclusions.yml --config-file multi-account-config.yml -o ./"
)
205 changes: 205 additions & 0 deletions cloudsplaining/command/scan_multi_account.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
"""Scan multiple AWS accounts via AssumeRole"""
import logging
import os
import json
import yaml
import click
import boto3
from click_option_group import optgroup, RequiredMutuallyExclusiveOptionGroup
from cloudsplaining.shared.constants import EXCLUSIONS_FILE
from cloudsplaining.command.download import get_account_authorization_details
from cloudsplaining import set_log_level
from cloudsplaining.shared.exclusions import Exclusions, DEFAULT_EXCLUSIONS
from cloudsplaining.shared import utils, aws_login
from cloudsplaining.shared.validation import check_authorization_details_schema
from cloudsplaining.scan.authorization_details import AuthorizationDetails
from cloudsplaining.output.report import HTMLReport

logger = logging.getLogger(__name__)


@click.command(
short_help="Scan multiple AWS Accounts using a config file"
)
@click.option(
"--config-file",
"-c",
type=click.Path(exists=True),
required=True,
help="Path of the multi-account config file",
)
@click.option(
"--profile",
"-p",
type=str,
required=False,
help="Specify the AWS IAM profile.",
envvar="AWS_PROFILE"
)
@click.option(
"--role-name",
"-r",
type=str,
required=True,
help="The name of the IAM role to assume in target accounts. Must be the same name in all target accounts."
)
@click.option(
"--exclusions-file",
help="A yaml file containing a list of policy names to exclude from the scan.",
type=click.Path(exists=True),
required=False,
default=EXCLUSIONS_FILE,
)
@optgroup.group(
"Output Target Options",
cls=RequiredMutuallyExclusiveOptionGroup,
help="",
)
@optgroup.option(
"--output-directory",
"-o",
"output_directory",
required=False,
type=click.Path(exists=True),
# default=os.getcwd(),
help="Output directory. Supply this or --bucket.",
)
@optgroup.option(
"--bucket",
"-b",
"save_bucket",
type=str,
help="The S3 bucket to save the results. Supply this or --output-directory."
# TODO: Validate that this
)
@optgroup.group(
"Other Options",
help="",
)
@optgroup.option(
"--write-data-file",
is_flag=True,
required=False,
default=False,
help="Save the cloudsplaining JSON-formatted data results."
)
@click.option(
"-v",
"--verbose",
"verbosity",
count=True,
)
def scan_multi_account(config_file: str, profile: str, role_name: str, exclusions_file: str, output_directory: str, save_bucket: str, write_data_file: bool, verbosity: int):
"""Scan multiple accounts via AssumeRole"""
set_log_level(verbosity)

# Read the config file from the user
multi_account_config = MultiAccountConfig(config_file=config_file, role_name=role_name)

# Get the exclusions file
exclusions = get_exclusions(exclusions_file=exclusions_file)

# TODO: Speed improvements? Multithreading? idk.
for target_account_name, target_account_id in multi_account_config.accounts.items():
results = scan_account(target_account_id=target_account_id, target_role_name=role_name, exclusions=exclusions, profile=profile)
html_report = HTMLReport(
account_id=target_account_id,
account_name=target_account_name,
results=results,
minimize=True,
)
rendered_report = html_report.get_html_report()
if save_bucket:
logger.info("Saving the report to an S3 bucket!")
s3 = boto3.resource('s3')
# Write the HTML file
output_file = f"{target_account_name}.html"
s3.Object(save_bucket, output_file).put(ACL='bucket-owner-full-control', Body=rendered_report)
# Write the JSON data file
if write_data_file:
output_file = f"{target_account_name}.json"
body = json.dumps(
results,
sort_keys=True,
default=str,
indent=4
)
s3.Object(save_bucket, output_file).put(ACL='bucket-owner-full-control', Body=body)
logger.info(f"Wrote results to {save_bucket}/{output_file}")
else:
logger.info("Saving the report to a local folder")
# Write the JSON data file
if write_data_file:
results_data_file = os.path.join(output_directory, f"{target_account_name}.json")
results_data_filepath = utils.write_results_data_file(results, results_data_file)
logger.info(f"Wrote results to {results_data_filepath}")
# Write the HTML file
html_output_file = os.path.join(output_directory, f"{target_account_name}.html")
if os.path.exists(html_output_file):
os.remove(html_output_file)
with open(html_output_file, "w") as f:
f.write(rendered_report)


def scan_account(target_account_id: str, target_role_name: str, exclusions: Exclusions, profile: str = None):
"""Scan a target account in one shot"""
authorization_details = download_account_authorization_details(
target_account_id=target_account_id, target_role_name=target_role_name, profile=profile
)
check_authorization_details_schema(authorization_details)
authorization_details = AuthorizationDetails(authorization_details, exclusions)
results = authorization_details.results
return results


def download_account_authorization_details(target_account_id: str, target_role_name: str, profile: str = None) -> dict:
"""Download the account authorization details from a target account"""
aws_access_key_id, aws_secret_access_key, aws_session_token = aws_login.get_target_account_credentials(
target_account_id=target_account_id,
target_account_role_name=target_role_name,
profile=profile
)
session_data = dict(
region_name="us-east-1",
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
aws_session_token=aws_session_token
)
include_non_default_policy_versions = False
authorization_details = get_account_authorization_details(session_data, include_non_default_policy_versions)
return authorization_details


class MultiAccountConfig:
"""Handle the YAML file that parses the Multiaccount config"""
def __init__(self, config_file: str, role_name: str):
self.config_file = config_file
self.config = self._config()
self.role_name = role_name
self.accounts = self._accounts()

def _config(self) -> dict:
with open(self.config_file, "r") as yaml_file:
config_cfg = yaml.safe_load(yaml_file)
return config_cfg

def _accounts(self) -> dict:
accounts = self.config.get("accounts", None)
if not accounts:
raise Exception("Please supply a list of accounts in the multi-account config file")
return accounts


def get_exclusions(exclusions_file: str = None) -> Exclusions:
"""Get the exclusions configuration from a file"""
# Get the exclusions configuration
if exclusions_file:
with open(exclusions_file, "r") as yaml_file:
try:
exclusions_cfg = yaml.safe_load(yaml_file)
except yaml.YAMLError as exc:
logger.critical(exc)
exclusions = Exclusions(exclusions_cfg)
else:
exclusions = DEFAULT_EXCLUSIONS
return exclusions
70 changes: 70 additions & 0 deletions cloudsplaining/shared/aws_login.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""AWS Login utilities"""
import os
import logging
import boto3
from botocore.config import Config
logger = logging.getLogger(__name__)


def get_boto3_client(service: str, profile: str = None, region="us-east-1") -> boto3.Session.client:
"""Get a boto3 client for a given service"""
logging.getLogger('botocore').setLevel(logging.CRITICAL)
session_data = {"region_name": region}
if profile:
session_data["profile_name"] = profile
session = boto3.Session(**session_data)

config = Config(connect_timeout=5, retries={"max_attempts": 10})
if os.environ.get('LOCALSTACK_ENDPOINT_URL'):
client = session.client(service, config=config, endpoint_url=os.environ.get('LOCALSTACK_ENDPOINT_URL'))
else:
client = session.client(service, config=config, endpoint_url=os.environ.get('LOCALSTACK_ENDPOINT_URL'))
logger.debug(f"{client.meta.endpoint_url} in {client.meta.region_name}: boto3 client login successful")
return client


def get_current_account_id(sts_client: boto3.Session.client) -> str:
"""Get the current account ID"""
response = sts_client.get_caller_identity()
current_account_id = response.get("Account")
return current_account_id


def get_available_regions(service: str):
"""AWS exposes their list of regions as an API. Gather the list."""
regions = boto3.session.Session().get_available_regions(service)
logger.debug("The service %s does not have available regions. Returning us-east-1 as default")
if not regions:
regions = ["us-east-1"]
return regions


def get_target_account_credentials(target_account_role_name: str, target_account_id: str,
role_session_name: str = "HotDogsAreSandwiches", profile: str = None):
"""
Get a boto3 client for a given AWS service

:param profile:
:param role_session_name: AssumeRole session name
:param target_account_role_name: The name of the target account role
:param target_account_id: The target account ID
:return:
"""
default_region = "us-east-1"
session_data = {"region_name": default_region}
if profile:
session_data["profile_name"] = profile
session = boto3.Session(**session_data)
config = Config(connect_timeout=5, retries={"max_attempts": 10})
sts_client = session.client('sts', config=config)

acct_b = sts_client.assume_role(
RoleArn=f"arn:aws:iam::{target_account_role_name}:role/{target_account_id}",
RoleSessionName=role_session_name
)

aws_access_key_id = acct_b['Credentials']['AccessKeyId']
aws_secret_access_key = acct_b['Credentials']['SecretAccessKey']
aws_session_token = acct_b['Credentials']['SessionToken']

return aws_access_key_id, aws_secret_access_key, aws_session_token
Loading