From 50d54ab352a1610244e0337761375262d1a9d767 Mon Sep 17 00:00:00 2001 From: Andrey Fedorov Date: Sun, 1 Dec 2019 16:03:37 -0800 Subject: [PATCH 1/8] [dc] refactor and debug aws collect --- migrations/v1_9_0-v1_9_1.md | 49 + src/connectors/aws_collect.py | 1674 ++++++++++------------ src/connectors/tests/test_aws_collect.py | 142 ++ src/connectors/utils.py | 4 + src/pytest.ini | 2 +- src/setup.py | 5 +- 6 files changed, 941 insertions(+), 935 deletions(-) create mode 100644 src/connectors/tests/test_aws_collect.py diff --git a/migrations/v1_9_0-v1_9_1.md b/migrations/v1_9_0-v1_9_1.md index cacfb9d7a..99e0e674d 100644 --- a/migrations/v1_9_0-v1_9_1.md +++ b/migrations/v1_9_0-v1_9_1.md @@ -14,3 +14,52 @@ AS ' ' ; ~~~ + +## Update AWS Collect landing tables + +Note: the below commands refer to default table names, please add the custom connection name if you set it. + +There were more fixes to this connector than we anticipated, so we recommend simply re-creating these connections. +Please reach out if you're not sure how to go about this. If you absolutely must retain the data that was collected +earlier, the following migration should help: + +~~~ +ALTER TABLE data.aws_collect_grants + RENAME TO aws_collect_iam_get_credential_report +; +ALTER TABLE data.aws_collect_ec2_describe_instances + ADD COLUMN requester_id STRING +; +ALTER TABLE data.aws_collect_iam_list_users + ADD COLUMN tags VARIANT +; +ALTER TABLE data.aws_collect_iam_list_users + ADD COLUMN permissions_boundary VARIANT +; +ALTER TABLE data.aws_collect_iam_list_policies + ADD COLUMN description STRING +; +ALTER TABLE data.aws_collect_s3_get_bucket_acl + DROP COLUMN owner +; +ALTER TABLE data.aws_collect_iam_list_policies + RENAME COLUMN arn TO policy_arn +; +ALTER TABLE data.aws_collect_iam_list_virtual_mfa_devices + ADD COLUMN enable_date TIMESTAMP_NTZ +; +ALTER TABLE data.aws_collect_cloudtrail_describe_trails + ADD COLUMN has_insight_selectors BOOLEAN +; +ALTER TABLE data.aws_collect_cloudtrail_get_event_selectors + ADD COLUMN exclude_management_event_sources VARIANT +; +ALTER TABLE data.aws_collect_kms_get_key_rotation_status + ADD COLUMN key_arn STRING +; +ALTER TABLE data.aws_collect_s3_get_bucket_policy + ADD COLUMN policy_json_parsed VARIANT +; +~~~ + +This does not migrate some of the table column types, but should get you to a working state. diff --git a/src/connectors/aws_collect.py b/src/connectors/aws_collect.py index 1fc534077..180104294 100644 --- a/src/connectors/aws_collect.py +++ b/src/connectors/aws_collect.py @@ -2,18 +2,21 @@ Load Inventory and Configuration of all accounts in your Org using auditor Roles """ -from botocore.exceptions import ClientError +import asyncio +import aioboto3 +from botocore.exceptions import BotoCoreError, ClientError, DataNotFoundError +from collections import defaultdict, namedtuple import csv from dateutil.parser import parse as parse_date +import json import fire import io -from typing import Dict, List, Generator +from typing import Tuple, List, Generator from runners.helpers.dbconfig import ROLE as SA_ROLE -from connectors.utils import sts_assume_role, qmap_mp, updated, yaml_dump +from connectors.utils import sts_assume_role, qmap_mp, updated, yaml_dump, bytes_to_str from runners.helpers import db, log -from runners.utils import groups_of AUDIT_ASSUMER_ARN = 'arn:aws:iam::1234567890987:role/audit-assumer' @@ -55,412 +58,46 @@ }, ] -AWS_API_METHODS = { - 'organizations.list_accounts': { - 'response': { - 'Accounts': { - 'Id': 'id', - 'Arn': 'arn', - 'Email': 'email', - 'Name': 'name', - 'Status': 'status', - 'JoinedMethod': 'joined_method', - 'JoinedTimestamp': 'joined_timestamp', - } - } - }, - 'ec2.describe_instances': { - 'response': { - 'Reservations': { - 'Groups': 'groups', - 'Instances': 'instances', - 'OwnerId': 'owner_id', - 'ReservationId': 'reservation_id', - } - } - }, - 'ec2.describe_security_groups': { - 'response': { - 'SecurityGroups': { - 'Description': 'description', - 'GroupName': 'group_name', - 'IpPermissions': 'ip_permissions', - 'OwnerId': 'owner_id', - 'GroupId': 'group_id', - 'IpPermissionsEgress': 'ip_permissions_egress', - 'Tags': 'tags', - 'VpcId': 'vpc_id', - } - } - }, - 'config.describe_configuration_recorders': { - 'response': { - 'ConfigurationRecorders': { - 'name': 'name', - 'roleARN': 'role_arn', - 'recordingGroup': 'recording_group', - } - } - }, - 'kms.list_keys': { - 'response': {'Keys': {'KeyId': 'key_id', 'KeyArn': 'key_arn'}}, - 'children': [ - {'method': 'kms.get_key_rotation_status', 'params': {'KeyId': 'key_id'}} - ], - }, - 'kms.get_key_rotation_status': { - 'response': {'KeyRotationEnabled': 'key_rotation_enabled'} - }, - 'cloudtrail.get_event_selectors': { - 'response': { - 'TrailARN': 'trail_arn', - 'EventSelectors': { - 'ReadWriteType': 'read_write_type', - 'IncludeManagementEvents': 'include_management_events', - 'DataResources': 'data_resources', - }, - } - }, - 'cloudtrail.describe_trails': { - 'response': { - 'trailList': { - 'Name': 'name', - 'S3BucketName': 's3_bucket_name', - 'S3KeyPrefix': 's3_key_prefix', - 'SnsTopicName': 'sns_topic_name', - 'SnsTopicARN': 'sns_topic_arn', - 'IncludeGlobalServiceEvents': 'include_global_service_events', - 'IsMultiRegionTrail': 'is_multi_region_trail', - 'HomeRegion': 'home_region', - 'TrailARN': 'trail_arn', - 'LogFileValidationEnabled': 'log_file_validation_enabled', - 'CloudWatchLogsLogGroupArn': 'cloud_watch_logs_log_group_arn', - 'CloudWatchLogsRoleArn': 'cloud_watch_logs_role_arn', - 'KmsKeyId': 'kms_key_id', - 'HasCustomEventSelectors': 'has_custom_event_selectors', - 'IsOrganizationTrail': 'is_organization_trail', - } - }, - 'children': [ - {'method': 'cloudtrail.get_trail_status', 'params': {'Name': 'trail_arn'}}, - { - 'method': 'cloudtrail.get_event_selectors', - 'params': {'TrailName': 'trail_arn'}, - }, - ], - }, - 'cloudtrail.get_trail_status': { - 'response': { - 'IsLogging': 'is_logging', - 'LatestDeliveryError': 'latest_delivery_error', - 'LatestNotificationError': 'latest_notification_error', - 'LatestDeliveryTime': 'latest_delivery_time', - 'LatestNotificationTime': 'latest_notification_time', - 'StartLoggingTime': 'start_logging_time', - 'StopLoggingTime': 'stop_logging_time', - 'LatestCloudWatchLogsDeliveryError': 'latest_cloud_watch_logs_delivery_error', - 'LatestCloudWatchLogsDeliveryTime': 'latest_cloud_watch_logs_delivery_time', - 'LatestDigestDeliveryTime': 'latest_digest_delivery_time', - 'LatestDigestDeliveryError': 'latest_digest_delivery_error', - 'LatestDeliveryAttemptTime': 'latest_delivery_attempt_time', - 'LatestNotificationAttemptTime': 'latest_notification_attempt_time', - 'LatestNotificationAttemptSucceeded': 'latest_notification_attempt_succeeded', - 'LatestDeliveryAttemptSucceeded': 'latest_delivery_attempt_succeeded', - 'TimeLoggingStarted': 'time_logging_started', - 'TimeLoggingStopped': 'time_logging_stopped', - } - }, - 'iam.list_users': { - 'response': { - 'Users': { - 'Arn': 'arn', - 'Path': 'path', - 'CreateDate': 'create_date', - 'UserId': 'user_id', - 'UserName': 'user_name', - 'PasswordLastUsed': 'password_last_used', - } - }, - 'children': [ - { - 'methods': [ - 'iam.list_groups_for_user', - 'iam.list_access_keys', - 'iam.get_login_profile', - 'iam.list_mfa_devices', - 'iam.list_user_policies', - 'iam.list_attached_user_policies', - ], - 'params': {'UserName': 'user_name'}, - } - ], - }, - 'iam.list_groups_for_user': { - 'response': { - 'Groups': { - 'Arn': 'arn', - 'Path': 'path', - 'UserName': 'user_name', - 'CreateDate': 'create_date', - 'GroupId': 'group_id', - 'GroupName': 'group_name', - } - } - }, - 'iam.list_policies': { - 'response': { - 'Policies': { - 'Arn': 'arn', - 'Path': 'path', - 'PolicyName': 'policy_name', - 'CreateDate': 'create_date', - 'UpdateDate': 'update_date', - 'AttachmentCount': 'attachment_count', - 'IsAttachable': 'is_attachable', - 'PolicyId': 'policy_id', - 'DefaultVersionId': 'default_version_id', - 'PermissionsBoundaryUsageCount': 'permissions_boundary_usage_count', - } - }, - 'children': [ - { - 'method': 'iam.get_policy_version', - 'params': {'PolicyArn': 'arn', 'VersionId': 'default_version_id'}, - }, - {'method': 'iam.list_entities_for_policy', 'params': {'PolicyArn': 'arn'}}, - ], - }, - 'iam.list_access_keys': { - 'response': { - 'AccessKeyMetadata': { - 'CreateDate': 'create_date', - 'UserName': 'user_name', - 'Status': 'status', - 'AccessKeyId': 'access_key_id', - } - } - }, - 'iam.get_login_profile': { - 'response': { - 'LoginProfile': { - 'UserName': 'user_name', - 'CreateDate': 'create_date', - 'PasswordResetRequired': 'password_reset_required', - } - } - }, - 'iam.list_mfa_devices': { - 'response': { - 'MFADevices': { - 'UserName': 'user_name', - 'SerialNumber': 'serial_number', - 'EnableDate': 'enable_date', - } - } - }, - 'iam.list_attached_user_policies': { - 'response': { - 'AttachedPolicies': { - 'UserName': 'user_name', - 'PolicyName': 'policy_name', - 'PolicyArn': 'policy_arn', - } - } - }, - 'iam.list_user_policies': { - 'response': { - 'PolicyNames': { - 'AccountID': 'account_id', - 'UserName': 'user_name', - 'PolicyName': 'policy_name', - } - } - }, - 'iam.list_account_aliases': { - 'response': {'AccountAliases': {'AccountAliase': 'account_alias'}} - }, - 'iam.get_account_password_policy': { - 'response': { - 'PasswordPolicy': { - 'AllowUsersToChangePassword': 'allow_users_to_change_password', - 'RequireLowercaseCharacters': 'require_lowercase_characters', - 'RequireUppercaseCharacters': 'require_uppercase_characters', - 'MinimumPasswordLength': 'minimum_password_length', - 'MaxPasswordAge': 'max_password_age', - 'PasswordReusePrevention': 'password_reuse_prevention', - 'RequireNumbers': 'require_numbers', - 'RequireSymbols': 'require_symbols', - 'HardExpiry': 'hard_expiry', - 'ExpirePasswords': 'expire_passwords', - } - } - }, - 'iam.generate_credential_report': { - 'response': {'State': 'state', 'Description': 'description'} - }, - 'iam.get_credential_report': { - 'response': { - 'Content': ('csv', 'content'), - 'ReportFormat': 'report_format', - 'GeneratedTime': 'generated_time', - } - }, - 'iam.list_virtual_mfa_devices': { - 'response': { - 'VirtualMFADevices': { - 'SerialNumber': 'serial_number', - 'Base32StringSeed': 'base32_string_seed', - 'QRCodePNG': 'qr_code_png', - 'User': 'user', - } - } - }, - 'iam.get_account_summary': { - 'response': { - 'SummaryMap': { - 'UsersQuota': 'users_quota', - 'GroupsPerUserQuota': 'groups_per_user_quota', - 'AttachedPoliciesPerGroupQuota': 'attached_policies_per_group_quota', - 'PoliciesQuota': 'policies_quota', - 'GroupsQuota': 'groups_quota', - 'InstanceProfiles': 'instance_profiles', - 'SigningCertificatesPerUserQuota': 'signing_certificates_per_user_quota', - 'PolicySizeQuota': 'policy_size_quota', - 'PolicyVersionsInUseQuota': 'policy_versions_in_use_quota', - 'RolePolicySizeQuota': 'role_policy_size_quota', - 'AccountSigningCertificatesPresent': 'account_signing_certificates_present', - 'Users': 'users', - 'ServerCertificatesQuota': 'server_certificates_quota', - 'ServerCertificates': 'server_certificates', - 'AssumeRolePolicySizeQuota': 'assume_role_policy_size_quota', - 'Groups': 'groups', - 'MFADevicesInUse': 'mfa_devices_in_use', - 'RolesQuota': 'roles_quota', - 'VersionsPerPolicyQuota': 'versions_per_policy_quota', - 'AccountAccessKeysPresent': 'account_access_keys_present', - 'Roles': 'roles', - 'AccountMFAEnabled': 'account_mfa_enabled', - 'MFADevices': 'mfa_devices', - 'Policies': 'policies', - 'GroupPolicySizeQuota': 'group_policy_size_quota', - 'InstanceProfilesQuota': 'instance_profiles_quota', - 'AccessKeysPerUserQuota': 'access_keys_per_user_quota', - 'AttachedPoliciesPerRoleQuota': 'attached_policies_per_role_quota', - 'PolicyVersionsInUse': 'policy_versions_in_use', - 'Providers': 'providers', - 'AttachedPoliciesPerUserQuota': 'attached_policies_per_user_quota', - 'UserPolicySizeQuota': 'user_policy_size_quota', - 'GlobalEndpointTokenVersion': 'global_endpoint_token_version', - } - } - }, - 's3.list_buckets': { - 'response': { - 'Buckets': {'Name': 'bucket_name', 'CreationDate': 'bucket_creation_date'}, - 'Owner': {'DisplayName': 'owner_display_name', 'ID': 'owner_id'}, - }, - 'children': [ - { - 'methods': [ - 's3.get_bucket_acl', - 's3.get_bucket_policy', - 's3.get_bucket_logging', - ], - 'params': {'Bucket': 'bucket_name'}, - } - ], - }, - 's3.get_bucket_acl': { - 'response': { - 'Owner': {'DisplayName': 'owner_display_name', 'ID': 'owner_id'}, - 'Grants': {'Grantee': 'grants_grantee', 'Permission': 'grants_permission'}, - } - }, - 's3.get_bucket_policy': {'response': {'Policy': 'policy'}}, - 's3.get_bucket_logging': { - 'response': { - 'LoggingEnabled': { - 'TargetBucket': 'target_bucket', - 'TargetGrants': 'target_grants', - 'TargetPrefix': 'target_prefix', - } - } - }, - 'iam.list_entities_for_policy': { - 'response': { - 'PolicyGroups': { - 'PolicyArn': 'policy_arn', - 'GroupName': 'group_name', - 'GroupId': 'group_id', - }, - 'PolicyUsers': { - 'PolicyArn': 'policy_arn', - 'UserName': 'user_name', - 'UserId': 'user_id', - }, - 'PolicyRoles': { - 'PolicyArn': 'policy_arn', - 'RoleName': 'role_name', - 'RoleId': 'role_id', - }, - } - }, - 'iam.get_policy_version': { - 'response': { - 'PolicyVersion': { - 'PolicyArn': 'policy_arn', - 'VersionId': 'version_id', - 'CreateDate': 'create_date', - 'Document': 'document', - 'IsDefaultVersion': 'is_default_version', - } - } - }, +CollectTask = namedtuple('CollectTask', ['account_id', 'method', 'args']) +DBEntry = namedtuple('DBEntry', ['entity']) + +ParsedCol = namedtuple('ParsedCol', ['type', 'colname', 'parsed_colname']) + +PARSERS = { + 'csv': lambda v: [dict(x) for x in csv.DictReader(io.StringIO(v))], + 'json': json.loads, } + LANDING_TABLE_COLUMNS = [ - ('recorded_at', 'TIMESTAMP_NTZ'), - ('id', 'VARCHAR'), - ('arn', 'VARCHAR'), - ('email', 'VARCHAR'), - ('name', 'VARCHAR'), - ('status', 'VARCHAR'), - ('joined_method', 'VARCHAR'), + ('recorded_at', 'TIMESTAMP_LTZ'), + ('id', 'STRING'), + ('arn', 'STRING'), + ('email', 'STRING'), + ('name', 'STRING'), + ('status', 'STRING'), + ('joined_method', 'STRING'), ('joined_timestamp', 'TIMESTAMP_NTZ'), ] SUPPLEMENTARY_TABLES = { + # https://docs.aws.amazon.com/cli/latest/reference/iam/generate-credential-report.html#output 'iam_generate_credential_report': [ - ('recorded_at', 'TIMESTAMP_NTZ'), - ('account_id', 'VARCHAR'), - ('state', 'VARCHAR'), - ('description', 'VARCHAR'), - ], - 'iam_get_account_password_policy': [ - ('recorded_at', 'TIMESTAMP_NTZ'), - ('account_id', 'VARCHAR'), - ('allow_users_to_change_password', 'VARCHAR'), - ('require_lowercase_characters', 'VARCHAR'), - ('require_uppercase_characters', 'VARCHAR'), - ('minimum_password_length', 'NUMBER'), - ('password_reuse_prevention', 'NUMBER'), - ('max_password_age', 'NUMBER'), - ('require_numbers', 'VARCHAR'), - ('require_symbols', 'VARCHAR'), - ('hard_expiry', 'VARCHAR'), - ('expire_passwords', 'VARCHAR'), + ('recorded_at', 'TIMESTAMP_LTZ'), + ('account_id', 'STRING'), + ('state', 'STRING'), + ('description', 'STRING'), ], - 'iam_list_access_keys': [ - ('recorded_at', 'TIMESTAMP_NTZ'), - ('account_id', 'VARCHAR'), - ('user_name', 'VARCHAR'), - ('status', 'VARCHAR'), - ('create_date', 'TIMESTAMP_NTZ'), - ('access_key_id', 'VARCHAR'), + # https://docs.aws.amazon.com/cli/latest/reference/iam/list-account-aliases.html#output + 'iam_list_account_aliases': [ + ('recorded_at', 'TIMESTAMP_LTZ'), + ('account_id', 'STRING'), + ('account_alias', 'STRING'), ], + # https://docs.aws.amazon.com/cli/latest/reference/iam/get-account-summary.html#output 'iam_get_account_summary': [ - ('recorded_at', 'TIMESTAMP_NTZ'), - ('account_id', 'VARCHAR'), + ('recorded_at', 'TIMESTAMP_LTZ'), + ('account_id', 'STRING'), ('users_quota', 'NUMBER'), ('groups_per_user_quota', 'NUMBER'), ('attached_policies_per_group_quota', 'NUMBER'), @@ -495,157 +132,226 @@ ('user_policy_size_quota', 'NUMBER'), ('global_endpoint_token_version', 'NUMBER'), ], - 'GRANTS': [ - ('recorded_at', 'TIMESTAMP_NTZ'), - ('account_id', 'VARCHAR'), - ('generated_time', 'VARCHAR'), - ('report_format', 'VARCHAR'), - ('content', 'VARCHAR'), + # https://docs.aws.amazon.com/cli/latest/reference/iam/get-account-password-policy.html#output + 'iam_get_account_password_policy': [ + ('recorded_at', 'TIMESTAMP_LTZ'), + ('account_id', 'STRING'), + ('minimum_password_length', 'NUMBER'), + ('require_symbols', 'BOOLEAN'), + ('require_numbers', 'BOOLEAN'), + ('require_uppercase_characters', 'BOOLEAN'), + ('require_lowercase_characters', 'BOOLEAN'), + ('allow_users_to_change_password', 'BOOLEAN'), + ('expire_passwords', 'BOOLEAN'), + ('max_password_age', 'NUMBER'), + ('password_reuse_prevention', 'NUMBER'), + ('hard_expiry', 'BOOLEAN'), + ], + # https://docs.aws.amazon.com/cli/latest/reference/ec2/describe-instances.html#output + 'ec2_describe_instances': [ + ('recorded_at', 'TIMESTAMP_LTZ'), + ('account_id', 'STRING'), + ('groups', 'VARIANT'), + ('instances', 'VARIANT'), + ('owner_id', 'STRING'), + ('requester_id', 'STRING'), + ('reservation_id', 'STRING'), + ], + # https://docs.aws.amazon.com/cli/latest/reference/ec2/describe-security-groups.html#output + 'ec2_describe_security_groups': [ + ('recorded_at', 'TIMESTAMP_LTZ'), + ('account_id', 'STRING'), + ('description', 'STRING'), + ('group_name', 'STRING'), + ('ip_permissions', 'VARIANT'), + ('owner_id', 'STRING'), + ('group_id', 'STRING'), + ('ip_permissions_egress', 'VARIANT'), + ('tags', 'VARIANT'), + ('vpc_id', 'STRING'), + ], + # https://docs.aws.amazon.com/cli/latest/reference/configservice/describe-configuration-recorders.html#output + 'config_describe_configuration_recorders': [ + ('recorded_at', 'TIMESTAMP_LTZ'), + ('account_id', 'STRING'), + ('name', 'STRING'), + ('role_arn', 'STRING'), + ('recording_group', 'VARIANT'), + ], + # https://docs.aws.amazon.com/cli/latest/reference/iam/get-credential-report.html#output + 'iam_get_credential_report': [ + ('recorded_at', 'TIMESTAMP_LTZ'), + ('account_id', 'STRING'), + ('generated_time', 'TIMESTAMP_LTZ'), + ('report_format', 'STRING'), + ('content', 'STRING'), ('content_csv_parsed', 'VARIANT'), ], - 'iam_get_login_profile': [ - ('recorded_at', 'TIMESTAMP_NTZ'), - ('account_id', 'VARCHAR'), - ('user_name', 'VARCHAR'), - ('create_date', 'TIMESTAMP_NTZ'), - ('password_reset_required', 'VARCHAR'), + # https://docs.aws.amazon.com/cli/latest/reference/kms/list-keys.html#output + 'kms_list_keys': [ + ('recorded_at', 'TIMESTAMP_LTZ'), + ('account_id', 'STRING'), + ('key_id', 'STRING'), + ('key_arn', 'STRING'), ], - 'iam_get_policy_version': [ - ('recorded_at', 'TIMESTAMP_NTZ'), - ('account_id', 'VARCHAR'), - ('policy_arn', 'VARCHAR'), - ('version_id', 'VARCHAR'), - ('create_date', 'TIMESTAMP_NTZ'), - ('document', 'VARIANT'), - ('is_default_version', 'VARCHAR'), + # https://docs.aws.amazon.com/cli/latest/reference/kms/get-key-rotation-status.html#output + 'kms_get_key_rotation_status': [ + ('recorded_at', 'TIMESTAMP_LTZ'), + ('account_id', 'STRING'), + ('key_arn', 'STRING'), + ('key_rotation_enabled', 'BOOLEAN'), ], - 'iam_list_access_keys': [ - ('recorded_at', 'TIMESTAMP_NTZ'), - ('account_id', 'VARCHAR'), - ('user_name', 'VARCHAR'), - ('status', 'VARCHAR'), - ('create_date', 'TIMESTAMP_NTZ'), - ('access_key_id', 'VARCHAR'), + # https://docs.aws.amazon.com/cli/latest/reference/iam/list-users.html#output + 'iam_list_users': [ + ('recorded_at', 'TIMESTAMP_LTZ'), + ('account_id', 'STRING'), + ('path', 'STRING'), + ('user_name', 'STRING'), + ('user_id', 'STRING'), + ('arn', 'STRING'), + ('create_date', 'TIMESTAMP_LTZ'), + ('password_last_used', 'TIMESTAMP_LTZ'), + ('permissions_boundary', 'VARIANT'), + ('tags', 'VARIANT'), ], - 'iam_list_attached_user_policies': [ - ('recorded_at', 'TIMESTAMP_NTZ'), - ('account_id', 'VARCHAR'), - ('user_name', 'VARCHAR'), - ('policy_name', 'VARCHAR'), - ('policy_arn', 'VARCHAR'), + # https://docs.aws.amazon.com/cli/latest/reference/iam/get-login-profile.html#output + 'iam_get_login_profile': [ + ('recorded_at', 'TIMESTAMP_LTZ'), + ('account_id', 'STRING'), + ('user_name', 'STRING'), + ('create_date', 'TIMESTAMP_LTZ'), + ('password_reset_required', 'BOOLEAN'), ], - 'iam_list_entities_for_policy': [ - ('recorded_at', 'TIMESTAMP_NTZ'), - ('account_id', 'VARCHAR'), - ('policy_arn', 'VARCHAR'), - ('group_id', 'VARCHAR'), - ('group_name', 'VARCHAR'), - ('user_id', 'VARCHAR'), - ('user_name', 'VARCHAR'), - ('role_id', 'VARCHAR'), - ('role_name', 'VARCHAR'), + # https://docs.aws.amazon.com/cli/latest/reference/iam/list-mfa-devices.html#output + 'iam_list_mfa_devices': [ + ('recorded_at', 'TIMESTAMP_LTZ'), + ('account_id', 'STRING'), + ('user_name', 'STRING'), + ('serial_number', 'STRING'), + ('enable_date', 'TIMESTAMP_LTZ'), + ], + # https://docs.aws.amazon.com/cli/latest/reference/iam/list-access-keys.html#output + 'iam_list_access_keys': [ + ('recorded_at', 'TIMESTAMP_LTZ'), + ('account_id', 'STRING'), + ('user_name', 'STRING'), + ('access_key_id', 'STRING'), + ('status', 'STRING'), + ('create_date', 'TIMESTAMP_LTZ'), ], + # https://docs.aws.amazon.com/cli/latest/reference/iam/list-groups-for-user.html#output 'iam_list_groups_for_user': [ - ('recorded_at', 'TIMESTAMP_NTZ'), - ('account_id', 'VARCHAR'), - ('user_name', 'VARCHAR'), - ('path', 'VARCHAR'), - ('create_date', 'TIMESTAMP_NTZ'), - ('group_id', 'VARCHAR'), - ('arn', 'VARCHAR'), - ('group_name', 'VARCHAR'), + ('recorded_at', 'TIMESTAMP_LTZ'), + ('account_id', 'STRING'), + ('user_name', 'STRING'), + ('path', 'STRING'), + ('group_name', 'STRING'), + ('group_id', 'STRING'), + ('arn', 'STRING'), + ('create_date', 'TIMESTAMP_LTZ'), ], - 'iam_list_mfa_devices': [ - ('recorded_at', 'TIMESTAMP_NTZ'), - ('account_id', 'VARCHAR'), - ('user_name', 'VARCHAR'), - ('serial_number', 'VARCHAR'), - ('enable_date', 'TIMESTAMP_NTZ'), + # https://docs.aws.amazon.com/cli/latest/reference/iam/list-user-policies.html#output + 'iam_list_user_policies': [ + ('recorded_at', 'TIMESTAMP_LTZ'), + ('account_id', 'STRING'), + ('user_name', 'STRING'), + ('policy_name', 'STRING'), + ], + # https://docs.aws.amazon.com/cli/latest/reference/iam/list-attached-user-policies.html#output + 'iam_list_attached_user_policies': [ + ('recorded_at', 'TIMESTAMP_LTZ'), + ('account_id', 'STRING'), + ('user_name', 'STRING'), + ('policy_name', 'STRING'), + ('policy_arn', 'STRING'), ], + # https://docs.aws.amazon.com/cli/latest/reference/iam/list-policies.html#output 'iam_list_policies': [ - ('recorded_at', 'TIMESTAMP_NTZ'), - ('account_id', 'VARCHAR'), - ('policy_name', 'VARCHAR'), - ('create_date', 'TIMESTAMP_NTZ'), + ('recorded_at', 'TIMESTAMP_LTZ'), + ('account_id', 'STRING'), + ('policy_name', 'STRING'), + ('policy_id', 'STRING'), + ('arn', 'STRING'), + ('path', 'STRING'), + ('default_version_id', 'STRING'), ('attachment_count', 'NUMBER'), - ('is_attachable', 'VARCHAR'), - ('policy_id', 'VARCHAR'), - ('default_version_id', 'VARCHAR'), - ('path', 'VARCHAR'), - ('arn', 'VARCHAR'), - ('update_date', 'TIMESTAMP_NTZ'), ('permissions_boundary_usage_count', 'NUMBER'), + ('is_attachable', 'BOOLEAN'), + ('description', 'STRING'), + ('create_date', 'TIMESTAMP_LTZ'), + ('update_date', 'TIMESTAMP_LTZ'), ], - 'iam_list_user_policies': [ - ('recorded_at', 'TIMESTAMP_NTZ'), - ('account_id', 'VARCHAR'), - ('user_name', 'VARCHAR'), - ('policy_name', 'VARCHAR'), + # https://docs.aws.amazon.com/cli/latest/reference/iam/get-policy-version.html#output + 'iam_get_policy_version': [ + ('recorded_at', 'TIMESTAMP_LTZ'), + ('account_id', 'STRING'), + ('policy_arn', 'STRING'), + ('document', 'STRING'), + ('version_id', 'STRING'), + ('is_default_version', 'BOOLEAN'), + ('create_date', 'TIMESTAMP_LTZ'), ], - 'iam_list_users': [ - ('recorded_at', 'TIMESTAMP_NTZ'), - ('account_id', 'VARCHAR'), - ('user_name', 'VARCHAR'), - ('password_last_used', 'TIMESTAMP_NTZ'), - ('create_date', 'TIMESTAMP_NTZ'), - ('user_id', 'VARCHAR'), - ('path', 'VARCHAR'), - ('arn', 'VARCHAR'), + # https://docs.aws.amazon.com/cli/latest/reference/iam/list-entities-for-policy.html#output + 'iam_list_entities_for_policy': [ + ('recorded_at', 'TIMESTAMP_LTZ'), + ('account_id', 'STRING'), + ('policy_arn', 'STRING'), + ('group_id', 'STRING'), + ('group_name', 'STRING'), + ('user_id', 'STRING'), + ('user_name', 'STRING'), + ('role_id', 'STRING'), + ('role_name', 'STRING'), ], + # https://docs.aws.amazon.com/cli/latest/reference/iam/list-virtual-mfa-devices.html#output 'iam_list_virtual_mfa_devices': [ - ('recorded_at', 'TIMESTAMP_NTZ'), - ('account_id', 'VARCHAR'), - ('serial_number', 'VARCHAR'), - ('base32_string_seed', 'VARCHAR'), - ('qr_code_png', 'VARCHAR'), + ('recorded_at', 'TIMESTAMP_LTZ'), + ('account_id', 'STRING'), + ('serial_number', 'STRING'), + ('base32_string_seed', 'STRING'), + ('qr_code_png', 'STRING'), ('user', 'VARIANT'), + ('enable_date', 'TIMESTAMP_LTZ'), ], + # https://docs.aws.amazon.com/cli/latest/reference/s3api/list-buckets.html#output 's3_list_buckets': [ - ('recorded_at', 'TIMESTAMP_NTZ'), - ('account_id', 'VARCHAR'), - ('bucket_name', 'VARCHAR'), - ('bucket_creation_date', 'VARCHAR'), - ('owner_display_name', 'VARCHAR'), - ('owner_id', 'VARCHAR'), + ('recorded_at', 'TIMESTAMP_LTZ'), + ('account_id', 'STRING'), + ('bucket_name', 'STRING'), + ('bucket_creation_date', 'TIMESTAMP_LTZ'), + ('owner_display_name', 'STRING'), + ('owner_id', 'STRING'), ], + # https://docs.aws.amazon.com/cli/latest/reference/s3api/get-bucket-acl.html#output 's3_get_bucket_acl': [ - ('recorded_at', 'TIMESTAMP_NTZ'), - ('account_id', 'VARCHAR'), - ('bucket', 'VARCHAR'), - ('owner', 'VARCHAR'), - ('grants_grantee', 'VARIANT'), - ('grants_permission', 'VARCHAR'), - ('owner_display_name', 'VARCHAR'), - ('owner_id', 'VARCHAR'), + ('recorded_at', 'TIMESTAMP_LTZ'), + ('account_id', 'STRING'), + ('bucket', 'STRING'), + ('grants_grantee', 'STRING'), + ('grants_permission', 'STRING'), + ('owner_display_name', 'STRING'), + ('owner_id', 'STRING'), ], + # https://docs.aws.amazon.com/cli/latest/reference/s3api/get-bucket-policy.html#output 's3_get_bucket_policy': [ - ('recorded_at', 'TIMESTAMP_NTZ'), + ('recorded_at', 'TIMESTAMP_LTZ'), ('account_id', 'STRING'), ('bucket', 'STRING'), ('policy', 'STRING'), + ('policy_json_parsed', 'VARIANT'), ], + # https://docs.aws.amazon.com/cli/latest/reference/s3api/get-bucket-logging.html#output 's3_get_bucket_logging': [ - ('recorded_at', 'TIMESTAMP_NTZ'), + ('recorded_at', 'TIMESTAMP_LTZ'), ('account_id', 'STRING'), ('bucket', 'STRING'), ('target_bucket', 'STRING'), ('target_grants', 'VARIANT'), ('target_prefix', 'STRING'), ], - 'ec2_describe_security_groups': [ - ('recorded_at', 'TIMESTAMP_NTZ'), - ('account_id', 'STRING'), - ('description', 'STRING'), - ('group_name', 'STRING'), - ('ip_permissions', 'VARIANT'), - ('owner_id', 'STRING'), - ('group_id', 'STRING'), - ('ip_permissions_egress', 'VARIANT'), - ('tags', 'VARIANT'), - ('vpc_id', 'STRING'), - ], + # https://docs.aws.amazon.com/cli/latest/reference/cloudtrail/describe-trails.html#output 'cloudtrail_describe_trails': [ - ('recorded_at', 'TIMESTAMP_NTZ'), + ('recorded_at', 'TIMESTAMP_LTZ'), ('account_id', 'STRING'), ('name', 'STRING'), ('s3_bucket_name', 'STRING'), @@ -661,22 +367,24 @@ ('cloud_watch_logs_role_arn', 'STRING'), ('kms_key_id', 'STRING'), ('has_custom_event_selectors', 'BOOLEAN'), + ('has_insight_selectors', 'BOOLEAN'), ('is_organization_trail', 'BOOLEAN'), ], + # https://docs.aws.amazon.com/cli/latest/reference/cloudtrail/get-trail-status.html#output 'cloudtrail_get_trail_status': [ - ('recorded_at', 'TIMESTAMP_NTZ'), + ('recorded_at', 'TIMESTAMP_LTZ'), ('account_id', 'STRING'), ('trail_arn', 'STRING'), ('is_logging', 'BOOLEAN'), ('latest_delivery_error', 'STRING'), ('latest_notification_error', 'STRING'), - ('latest_delivery_time', 'TIMESTAMP'), - ('latest_notification_time', 'TIMESTAMP'), - ('start_logging_time', 'TIMESTAMP'), - ('stop_logging_time', 'TIMESTAMP'), + ('latest_delivery_time', 'TIMESTAMP_NTZ'), + ('latest_notification_time', 'TIMESTAMP_NTZ'), + ('start_logging_time', 'TIMESTAMP_NTZ'), + ('stop_logging_time', 'TIMESTAMP_NTZ'), ('latest_cloud_watch_logs_delivery_error', 'STRING'), - ('latest_cloud_watch_logs_delivery_time', 'TIMESTAMP'), - ('latest_digest_delivery_time', 'TIMESTAMP'), + ('latest_cloud_watch_logs_delivery_time', 'TIMESTAMP_NTZ'), + ('latest_digest_delivery_time', 'TIMESTAMP_NTZ'), ('latest_digest_delivery_error', 'STRING'), ('latest_delivery_attempt_time', 'STRING'), ('latest_notification_attempt_time', 'STRING'), @@ -685,46 +393,408 @@ ('time_logging_started', 'STRING'), ('time_logging_stopped', 'STRING'), ], + # https://docs.aws.amazon.com/cli/latest/reference/cloudtrail/get-event-selectors.html#output 'cloudtrail_get_event_selectors': [ - ('recorded_at', 'TIMESTAMP_NTZ'), + ('recorded_at', 'TIMESTAMP_LTZ'), ('account_id', 'STRING'), ('trail_arn', 'STRING'), ('read_write_type', 'STRING'), ('include_management_events', 'BOOLEAN'), ('data_resources', 'VARIANT'), + ('exclude_management_event_sources', 'VARIANT'), ], - 'kms_list_keys': [ - ('recorded_at', 'TIMESTAMP_NTZ'), - ('account_id', 'STRING'), - ('key_id', 'STRING'), - ('key_arn', 'STRING'), - ], - 'kms_get_key_rotation_status': [ - ('recorded_at', 'TIMESTAMP_NTZ'), - ('account_id', 'STRING'), - ('key_id', 'STRING'), - ('key_rotation_enabled', 'BOOLEAN'), - ], - 'config_describe_configuration_recorders': [ - ('recorded_at', 'TIMESTAMP_NTZ'), - ('account_id', 'STRING'), - ('name', 'STRING'), - ('role_arn', 'STRING'), - ('recording_group', 'VARIANT'), - ], - 'ec2_describe_instances': [ - ('recorded_at', 'TIMESTAMP_NTZ'), - ('account_id', 'STRING'), - ('groups', 'VARIANT'), - ('instances', 'VARIANT'), - ('owner_id', 'STRING'), - ('reservation_id', 'STRING'), - ], - 'iam_list_account_aliases': [ - ('recorded_at', 'TIMESTAMP_NTZ'), - ('account_id', 'STRING'), - ('account_alias', 'STRING'), - ], +} + +AWS_API_METHOD_COLUMNS = { + 'organizations.list_accounts': { + 'response': { + 'Accounts': [ + { + 'Id': 'id', + 'Arn': 'arn', + 'Email': 'email', + 'Name': 'name', + 'Status': 'status', + 'JoinedMethod': 'joined_method', + 'JoinedTimestamp': 'joined_timestamp', + } + ] + } + }, + 'iam.list_account_aliases': {'response': {'AccountAliases': ['account_alias']}}, + 'iam.get_account_summary': { + 'response': { + 'SummaryMap': { + 'GroupPolicySizeQuota': 'group_policy_size_quota', + 'InstanceProfilesQuota': 'instance_profiles_quota', + 'Policies': 'policies', + 'GroupsPerUserQuota': 'groups_per_user_quota', + 'InstanceProfiles': 'instance_profiles', + 'AttachedPoliciesPerUserQuota': 'attached_policies_per_user_quota', + 'Users': 'users', + 'PoliciesQuota': 'policies_quota', + 'Providers': 'providers', + 'AccountMFAEnabled': 'account_mfa_enabled', + 'AccessKeysPerUserQuota': 'access_keys_per_user_quota', + 'AssumeRolePolicySizeQuota': 'assume_role_policy_size_quota', + 'PolicyVersionsInUseQuota': 'policy_versions_in_use_quota', + 'GlobalEndpointTokenVersion': 'global_endpoint_token_version', + 'VersionsPerPolicyQuota': 'versions_per_policy_quota', + 'AttachedPoliciesPerGroupQuota': 'attached_policies_per_group_quota', + 'PolicySizeQuota': 'policy_size_quota', + 'Groups': 'groups', + 'AccountSigningCertificatesPresent': 'account_signing_certificates_present', + 'UsersQuota': 'users_quota', + 'ServerCertificatesQuota': 'server_certificates_quota', + 'MFADevices': 'mfa_devices', + 'UserPolicySizeQuota': 'user_policy_size_quota', + 'PolicyVersionsInUse': 'policy_versions_in_use', + 'ServerCertificates': 'server_certificates', + 'Roles': 'roles', + 'RolesQuota': 'roles_quota', + 'SigningCertificatesPerUserQuota': 'signing_certificates_per_user_quota', + 'MFADevicesInUse': 'mfa_devices_in_use', + 'RolePolicySizeQuota': 'role_policy_size_quota', + 'AttachedPoliciesPerRoleQuota': 'attached_policies_per_role_quota', + 'AccountAccessKeysPresent': 'account_access_keys_present', + 'GroupsQuota': 'groups_quota', + } + } + }, + 'iam.get_account_password_policy': { + 'response': { + 'PasswordPolicy': { + 'MinimumPasswordLength': 'minimum_password_length', + 'RequireSymbols': 'require_symbols', + 'RequireNumbers': 'require_numbers', + 'RequireUppercaseCharacters': 'require_uppercase_characters', + 'RequireLowercaseCharacters': 'require_lowercase_characters', + 'AllowUsersToChangePassword': 'allow_users_to_change_password', + 'ExpirePasswords': 'expire_passwords', + 'MaxPasswordAge': 'max_password_age', + 'PasswordReusePrevention': 'password_reuse_prevention', + 'HardExpiry': 'hard_expiry', + } + } + }, + 'ec2.describe_instances': { + 'response': { + 'Reservations': [ + { + 'Groups': 'groups', + 'Instances': 'instances', + 'OwnerId': 'owner_id', + 'RequesterId': 'requester_id', + 'ReservationId': 'reservation_id', + } + ] + } + }, + 'ec2.describe_security_groups': { + 'response': { + 'SecurityGroups': [ + { + 'Description': 'description', + 'GroupName': 'group_name', + 'IpPermissions': 'ip_permissions', + 'OwnerId': 'owner_id', + 'GroupId': 'group_id', + 'IpPermissionsEgress': 'ip_permissions_egress', + 'Tags': 'tags', + 'VpcId': 'vpc_id', + } + ] + } + }, + 'config.describe_configuration_recorders': { + 'response': { + 'ConfigurationRecorders': [ + { + 'name': 'name', + 'roleARN': 'role_arn', + 'recordingGroup': 'recording_group', + } + ] + } + }, + 'kms.list_keys': { + 'response': {'Keys': [{'KeyId': 'key_id', 'KeyArn': 'key_arn'}]}, + 'children': [ + {'method': 'kms.get_key_rotation_status', 'args': {'KeyId': 'key_arn'}} + ], + }, + 'kms.get_key_rotation_status': { + 'params': {'KeyId': 'key_arn'}, + 'response': {'KeyRotationEnabled': 'key_rotation_enabled'}, + }, + 'iam.generate_credential_report': { + 'response': {'State': 'state', 'Description': 'description'} + }, + 'iam.get_credential_report': { + 'response': { + 'Content': ParsedCol('csv', 'content', 'content_csv_parsed'), + 'ReportFormat': 'report_format', + 'GeneratedTime': 'generated_time', + } + }, + 'iam.list_users': { + 'response': { + 'Users': [ + { + 'Arn': 'arn', + 'Path': 'path', + 'CreateDate': 'create_date', + 'UserId': 'user_id', + 'UserName': 'user_name', + 'PasswordLastUsed': 'password_last_used', + 'PermissionsBoundary': 'permissions_boundary', + 'Tags': 'tags', + } + ] + }, + 'children': [ + { + 'methods': [ + 'iam.get_login_profile', + 'iam.list_mfa_devices', + 'iam.list_access_keys', + 'iam.list_groups_for_user', + 'iam.list_user_policies', + 'iam.list_attached_user_policies', + ], + 'args': {'UserName': 'user_name'}, + } + ], + }, + 'iam.list_groups_for_user': { + 'params': {'UserName': 'user_name'}, + 'response': { + 'Groups': [ + { + 'Path': 'path', + 'GroupName': 'group_name', + 'GroupId': 'group_id', + 'Arn': 'arn', + 'CreateDate': 'create_date', + } + ] + }, + }, + 'iam.list_access_keys': { + 'params': {'UserName': 'user_name'}, + 'response': { + 'AccessKeyMetadata': [ + { + 'UserName': 'user_name', + 'AccessKeyId': 'access_key_id', + 'Status': 'status', + 'CreateDate': 'create_date', + } + ] + }, + }, + 'iam.get_login_profile': { + 'params': {'UserName': 'user_name'}, + 'response': { + 'LoginProfile': { + 'UserName': 'user_name', + 'CreateDate': 'create_date', + 'PasswordResetRequired': 'password_reset_required', + } + }, + }, + 'iam.list_mfa_devices': { + 'params': {'UserName': 'user_name'}, + 'response': { + 'MFADevices': [ + { + 'UserName': 'user_name', + 'SerialNumber': 'serial_number', + 'EnableDate': 'enable_date', + } + ] + }, + }, + 'iam.list_user_policies': { + 'params': {'UserName': 'user_name'}, + 'response': {'PolicyNames': ['policy_name']}, + }, + 'iam.list_attached_user_policies': { + 'params': {'UserName': 'user_name'}, + 'response': { + 'AttachedPolicies': [ + {'PolicyName': 'policy_name', 'PolicyArn': 'policy_arn'} + ] + }, + }, + 'iam.list_policies': { + 'response': { + 'Policies': [ + { + 'PolicyName': 'policy_name', + 'PolicyId': 'policy_id', + 'Arn': 'arn', + 'Path': 'path', + 'DefaultVersionId': 'default_version_id', + 'AttachmentCount': 'attachment_count', + 'PermissionsBoundaryUsageCount': 'permissions_boundary_usage_count', + 'IsAttachable': 'is_attachable', + 'Description': 'description', + 'CreateDate': 'create_date', + 'UpdateDate': 'update_date', + } + ] + }, + 'children': [ + { + 'method': 'iam.get_policy_version', + 'args': {'PolicyArn': 'arn', 'VersionId': 'default_version_id'}, + }, + {'method': 'iam.list_entities_for_policy', 'args': {'PolicyArn': 'arn'}}, + ], + }, + 'iam.get_policy_version': { + 'params': {'PolicyArn': 'policy_arn'}, + 'response': { + 'PolicyVersion': { + 'Document': 'document', + 'VersionId': 'version_id', + 'CreateDate': 'create_date', + 'IsDefaultVersion': 'is_default_version', + } + }, + }, + 'iam.list_entities_for_policy': { + 'params': {'PolicyArn': 'policy_arn'}, + 'response': { + 'PolicyGroups': [{'GroupName': 'group_name', 'GroupId': 'group_id'}], + 'PolicyUsers': [{'UserName': 'user_name', 'UserId': 'user_id'}], + 'PolicyRoles': [{'RoleName': 'role_name', 'RoleId': 'role_id'}], + }, + }, + 'iam.list_virtual_mfa_devices': { + 'response': { + 'VirtualMFADevices': [ + { + 'SerialNumber': 'serial_number', + 'Base32StringSeed': 'base32_string_seed', + 'QRCodePNG': 'qr_code_png', + 'User': 'user', + 'EnableDate': 'enable_date', + } + ] + } + }, + 's3.list_buckets': { + 'response': { + 'Buckets': [ + {'Name': 'bucket_name', 'CreationDate': 'bucket_creation_date'} + ], + 'Owner': {'DisplayName': 'owner_display_name', 'ID': 'owner_id'}, + }, + 'children': [ + { + 'methods': [ + 's3.get_bucket_acl', + 's3.get_bucket_policy', + 's3.get_bucket_logging', + ], + 'args': {'Bucket': 'bucket_name'}, + } + ], + }, + 's3.get_bucket_acl': { + 'params': {'Bucket': 'bucket'}, + 'response': { + 'Owner': {'DisplayName': 'owner_display_name', 'ID': 'owner_id'}, + 'Grants': [ + {'Grantee': 'grants_grantee', 'Permission': 'grants_permission'} + ], + }, + }, + 's3.get_bucket_policy': { + 'params': {'Bucket': 'bucket'}, + 'response': {'Policy': ParsedCol('json', 'policy', 'policy_json_parsed')}, + }, + 's3.get_bucket_logging': { + 'params': {'Bucket': 'bucket'}, + 'response': { + 'LoggingEnabled': { + 'TargetBucket': 'target_bucket', + 'TargetGrants': 'target_grants', + 'TargetPrefix': 'target_prefix', + } + }, + }, + 'cloudtrail.describe_trails': { + 'response': { + 'trailList': [ + { + 'Name': 'name', + 'S3BucketName': 's3_bucket_name', + 'S3KeyPrefix': 's3_key_prefix', + 'SnsTopicName': 'sns_topic_name', + 'SnsTopicARN': 'sns_topic_arn', + 'IncludeGlobalServiceEvents': 'include_global_service_events', + 'IsMultiRegionTrail': 'is_multi_region_trail', + 'HomeRegion': 'home_region', + 'TrailARN': 'trail_arn', + 'LogFileValidationEnabled': 'log_file_validation_enabled', + 'CloudWatchLogsLogGroupArn': 'cloud_watch_logs_log_group_arn', + 'CloudWatchLogsRoleArn': 'cloud_watch_logs_role_arn', + 'KmsKeyId': 'kms_key_id', + 'HasCustomEventSelectors': 'has_custom_event_selectors', + 'HasInsightSelectors': 'has_insight_selectors', + 'IsOrganizationTrail': 'is_organization_trail', + } + ] + }, + 'children': [ + { + 'method': 'cloudtrail.get_trail_status', + 'args': {'TrailName': 'trail_arn'}, + }, + { + 'method': 'cloudtrail.get_event_selectors', + 'args': {'TrailName': 'trail_arn'}, + }, + ], + }, + 'cloudtrail.get_trail_status': { + 'params': {'Name': 'trail_arn'}, + 'response': { + 'IsLogging': 'is_logging', + 'LatestDeliveryError': 'latest_delivery_error', + 'LatestNotificationError': 'latest_notification_error', + 'LatestDeliveryTime': 'latest_delivery_time', + 'LatestNotificationTime': 'latest_notification_time', + 'StartLoggingTime': 'start_logging_time', + 'StopLoggingTime': 'stop_logging_time', + 'LatestCloudWatchLogsDeliveryError': 'latest_cloud_watch_logs_delivery_error', + 'LatestCloudWatchLogsDeliveryTime': 'latest_cloud_watch_logs_delivery_time', + 'LatestDigestDeliveryTime': 'latest_digest_delivery_time', + 'LatestDigestDeliveryError': 'latest_digest_delivery_error', + 'LatestDeliveryAttemptTime': 'latest_delivery_attempt_time', + 'LatestNotificationAttemptTime': 'latest_notification_attempt_time', + 'LatestNotificationAttemptSucceeded': 'latest_notification_attempt_succeeded', + 'LatestDeliveryAttemptSucceeded': 'latest_delivery_attempt_succeeded', + 'TimeLoggingStarted': 'time_logging_started', + 'TimeLoggingStopped': 'time_logging_stopped', + }, + }, + 'cloudtrail.get_event_selectors': { + 'response': { + 'TrailARN': 'trail_arn', + 'EventSelectors': [ + { + 'ReadWriteType': 'read_write_type', + 'IncludeManagementEvents': 'include_management_events', + 'DataResources': 'data_resources', + 'ExcludeManagementEventSources': 'exclude_management_event_sources', + } + ], + } + }, } @@ -746,7 +816,7 @@ def connect(connection_name, options): master_reader_arn=master_reader_arn, audit_reader_role=audit_reader_role, reader_eid=reader_eid, - collect_apis='all' + collect_apis='all', ) db.create_table(name=landing_table, cols=LANDING_TABLE_COLUMNS, comment=comment) @@ -763,392 +833,130 @@ def connect(connection_name, options): } -def aws_collect(client, method, params=None): - if params is None: - params = {} - - k2c = AWS_API_METHODS[method]['response'] - ent_keys = k2c.keys() # we'll be expecting response to have these keys - - client_name, method_name = method.split('.', 1) - - pages = ( - client.get_paginator(method_name).paginate(**params) - if client.can_paginate(method_name) - else None - ) - - NotFoundExceptions = tuple( - filter( - None, - [ - ClientError, - getattr(client.exceptions, 'AccessDeniedException', None), - getattr(client.exceptions, 'NoSuchEntityException', None), - ], - ) - ) - - # this is a bit crufty, may not be necessary now that the for loop below - # handles string values as {ent_key: cols} & {ent_key: ents} - inline_object_response = all(type(v) in (str, tuple) for k, v in k2c.items()) - - try: - if pages is None: - pages = [getattr(client, method_name)(**params)] - except NotFoundExceptions as e: - blank = None if inline_object_response else {} - pages = [updated({ent: blank for ent in ent_keys}, e.response)] - - for page in pages: - if inline_object_response: - # e.g. *-credential-report methods - def to_str(x): - return x.decode() if type(x) is bytes else x +def process_response_lists(coldict, page): + for response_key, colname in coldict.items(): + response_value = page.get(response_key) - parsers = { - 'csv': lambda v: [dict(x) for x in csv.DictReader(io.StringIO(v))] - } + if type(colname) is list: + for x in response_value or []: + yield process_response_items(colname[0], x, {}) - yield updated( - {v: to_str(page.get(k)) for k, v in k2c.items() if type(v) is str}, - { - f'{t[1]}_{t[0]}_parsed': parsers[t[0]](to_str(page.get(k))) - for k, t in k2c.items() - if type(t) is tuple - }, - {t[1]: to_str(page.get(k)) for k, t in k2c.items() if type(t) is tuple}, - recorded_at=parse_date(page['ResponseMetadata']['HTTPHeaders']['date']), - ) - continue + if type(colname) is dict: + yield from process_response_lists(colname, response_value) - for ent_key in ent_keys: - ents = page.get(ent_key, {}) - cols = k2c[ent_key] - cols = cols if type(cols) is dict else {ent_key: cols} - cols = updated({'ResponseHeaderDate': 'recorded_at'}, cols) - # treat singular entities from get_* like list with one ent. - ents = [ents] if type(ents) is dict else ents - ents = [{ent_key: ents}] if type(ents) is str else ents +def process_response_items(coldict, page, db_entry=None): + if db_entry is None: + db_entry = {} - for ent in ents: - # ents = {"PolicyNames": ["p1"]} -> [{"PolicyName": "p1"}] - if type(ent) is str and ent_key.endswith('s'): - ent = {ent_key[:-1]: ent} + if type(coldict) is str: + db_entry[coldict] = page - ent['ResponseHeaderDate'] = parse_date( - page['ResponseMetadata']['HTTPHeaders']['date'] - ) + elif type(coldict) is ParsedCol: + parse = PARSERS[coldict.type] + db_entry[coldict.colname] = bytes_to_str(page) + db_entry[coldict.parsed_colname] = parse(bytes_to_str(page)) - yield {v: ent.get(k) for k, v in cols.items()} + elif type(coldict) is dict: + for response_key, colname in coldict.items(): + response_value = page.get(response_key) + db_entry.update(process_response_items(colname, response_value)) + return db_entry -def load_aws_iam( - account_id, method, params, add_task -) -> Generator[Dict[str, List[dict]], None, None]: - account_arn = f'arn:aws:iam::{account_id}:role/{AUDIT_READER_ROLE}' - account_info = {'account_id': account_id} - client_name, method = method.split('.', 1) +def process_aws_response(task, page): + response_coldict = AWS_API_METHOD_COLUMNS[task.method]['response'] + children_list = AWS_API_METHOD_COLUMNS[task.method].get('children', []) + params = AWS_API_METHOD_COLUMNS[task.method].get('params', {}) - try: - session = sts_assume_role( - src_role_arn=AUDIT_ASSUMER_ARN, - dest_role_arn=account_arn, - dest_external_id=READER_EID, - ) + base_entity = {} + if task.account_id: + base_entity['account_id'] = task.account_id - except ClientError as e: - # record missing auditor role as empty account summary - yield { - f'{client_name}.{method}': [ - updated( - account_info, - recorded_at=parse_date( - e.response['ResponseMetadata']['HTTPHeaders']['date'] - ), - ) - ] - } + if isinstance(page, BotoCoreError): + yield DBEntry(base_entity) return - client = session.client(client_name) - - if method == 'list_virtual_mfa_devices': - virtual_mfa_devices = [ - updated(u, account_info) - for u in aws_collect(client, 'iam.list_virtual_mfa_devices') - ] - yield {'iam.list_virtual_mfa_devices': virtual_mfa_devices} - - if method == 'list_account_aliases': - account_aliases = [ - updated(u, account_info) - for u in aws_collect(client, 'iam.list_account_aliases') - ] - yield {'iam.list_account_aliases': account_aliases} - - if method == 'describe_instances': - reservations = [ - updated(u, account_info) - for u in aws_collect(client, 'ec2.describe_instances') - ] - yield {'ec2.describe_instances': reservations} - - if method == 'describe_configuration_recorders': - config_recorders = [ - updated(u, account_info) - for u in aws_collect(client, 'config.describe_configuration_recorders') - ] - yield {'config.describe_configuration_recorders': config_recorders} - - if method == 'describe_security_groups': - security_groups = [ - updated(u, account_info) - for u in aws_collect(client, 'ec2.describe_security_groups') - ] - yield {'ec2.describe_security_groups': security_groups} - - if method == 'list_keys': - keys = [updated(u, account_info) for u in aws_collect(client, 'kms.list_keys')] - yield {'kms.list_keys': keys} - for key in keys: - params = {'KeyId': key['key_id']} - yield { - 'kms.get_key_rotation_status': [ - updated(u, account_info, {'key_id': params['KeyId']}) - for u in aws_collect(client, 'kms.get_key_rotation_status', params) - ] - } - - if method == 'describe_trails': - trails = [ - updated(u, account_info) - for u in aws_collect(client, 'cloudtrail.describe_trails') - ] - yield {'cloudtrail.describe_trails': trails} - for trail in trails: - params = {'Name': trail['trail_arn']} - yield { - 'cloudtrail.get_trail_status': [ - updated(u, account_info, {'trail_arn': params['Name']}) - for u in aws_collect(client, 'cloudtrail.get_trail_status', params) - ] - } - params = {'TrailName': trail['trail_arn']} - yield { - 'cloudtrail.get_event_selectors': [ - updated(u, account_info, {'trail_arn': params['TrailName']}) - for u in aws_collect( - client, 'cloudtrail.get_event_selectors', params - ) - ] - } - - if method == 'get_trail_status': - yield { - 'cloudtrail.get_trail_status': [ - updated(u, account_info, {'name': params['Name']}) - for u in aws_collect(client, 'cloudtrail.get_trail_status', params) - ] - } + base_entity.update({v: task.args[k] for k, v in params.items()}) + base_entity.update(process_response_items(response_coldict, page)) - if method == 'get_event_selectors': - yield { - 'cloudtrail.get_event_selectors': [ - updated(u, account_info, {'name': params['TrailName']}) - for u in aws_collect(client, 'cloudtrail.get_event_selectors', params) - ] - } + if 'ResponseMetadata' in page: + base_entity['recorded_at'] = parse_date( + page['ResponseMetadata']['HTTPHeaders']['date'] + ) - if method == 'list_buckets': - buckets = [ - updated(u, account_info) for u in aws_collect(client, 's3.list_buckets') - ] - yield {'s3.list_buckets': buckets} - for bucket_group in groups_of(1000, buckets): - add_task( - { - 'account_id': account_id, - 'methods': [ - 's3.get_bucket_acl', - 's3.get_bucket_policy', - 's3.get_bucket_logging', - ], - 'params': [ - {'Bucket': bucket['bucket_name']} - for bucket in bucket_group - if 'bucket_name' in bucket - ], - } - ) + iterated_entries = list(process_response_lists(response_coldict, page)) - if method == 'get_bucket_policy': - yield { - 's3.get_bucket_policy': [ - updated(u, account_info, {'bucket': params['Bucket']}) - for u in aws_collect(client, 's3.get_bucket_policy', params) - ] - } + for entry in iterated_entries or [base_entity]: + db_entry = DBEntry(updated(base_entity.copy(), entry)) + yield db_entry + for child in children_list: + for method in child.get('methods', [child.get('method')]): + req_args = child.get('args', {}) + if any(v not in db_entry.entity for v in req_args.values()): + continue + args = {k: db_entry.entity[v] for k, v in req_args.items()} + yield CollectTask(task.account_id, method, args) - if method == 'get_bucket_acl': - yield { - 's3.get_bucket_acl': [ - updated(u, account_info, {'bucket': params['Bucket']}) - for u in aws_collect(client, 's3.get_bucket_acl', params) - ] - } - if method == 'get_bucket_logging': - yield { - 's3.get_bucket_logging': [ - updated(u, account_info, {'bucket': params['Bucket']}) - for u in aws_collect(client, 's3.get_bucket_logging', params) - ] - } +def load_task_response(client, task): + args = task.args or {} - if method == 'get_account_password_policy': - yield { - 'iam.get_account_password_policy': [ - updated(u, account_info) - for u in aws_collect(client, 'iam.get_account_password_policy') - ] - } + client_name, method_name = task.method.split('.', 1) - if method == 'generate_credential_report': - yield { - 'iam.generate_credential_report': [ - updated(u, account_info) - for u in aws_collect(client, 'iam.generate_credential_report') - ] - } - add_task({'account_id': account_id, 'methods': 'iam.get_credential_report'}) + pages = ( + client.get_paginator(method_name).paginate(**args) + if client.can_paginate(method_name) + else None + ) - if method == 'get_credential_report': - yield { - 'iam.get_credential_report': [ - updated(u, account_info) - for u in aws_collect(client, 'iam.get_credential_report') - ] - } + if pages is None: + try: + pages = [getattr(client, method_name)(**args)] - if method == 'list_users': - users = [ - updated(u, account_info) for u in aws_collect(client, 'iam.list_users') - ] - yield {'iam.list_users': users} - for user_group in groups_of(1000, users): - add_task( - { - 'account_id': account_id, - 'methods': [ - 'iam.list_groups_for_user', - 'iam.list_access_keys', - 'iam.get_login_profile', - 'iam.list_mfa_devices', - 'iam.list_user_policies', - 'iam.list_attached_user_policies', - ], - 'params': [{'UserName': user['user_name']} for user in user_group], - } - ) + except (ClientError, DataNotFoundError) as e: + pages = [e] - if method == 'list_groups_for_user': - yield { - 'iam.list_groups_for_user': [ - updated(group, account_info, {'user_name': params['UserName']}) - for group in aws_collect(client, 'iam.list_groups_for_user', params) - ] - } + for page in pages: + yield from process_aws_response(task, page) - if method == 'list_access_keys': - yield { - 'iam.list_access_keys': [ - updated(access_key, account_info, {'user_name': params['UserName']}) - for access_key in aws_collect(client, 'iam.list_access_keys', params) - ] - } - if method == 'get_login_profile': - yield { - 'iam.get_login_profile': [ - updated(login_profile, account_info, {'user_name': params['UserName']}) - for login_profile in aws_collect( - client, 'iam.get_login_profile', params - ) - ] - } +def process_task(task, add_task) -> Generator[Tuple[str, dict], None, None]: + account_arn = f'arn:aws:iam::{task.account_id}:role/{AUDIT_READER_ROLE}' + account_info = {'account_id': task.account_id} - if method == 'list_mfa_devices': - yield { - 'iam.list_mfa_devices': [ - updated(mfa_device, account_info, {'user_name': params['UserName']}) - for mfa_device in aws_collect(client, 'iam.list_mfa_devices', params) - ] - } + client_name, method_name = task.method.split('.', 1) - if method == 'list_attached_user_policies': - yield { - 'iam.list_attached_user_policies': [ - updated(user_policy, account_info, {'user_name': params['UserName']}) - for user_policy in aws_collect( - client, 'iam.list_attached_user_policies', params - ) - ] - } + try: + session = sts_assume_role( + src_role_arn=AUDIT_ASSUMER_ARN, + dest_role_arn=account_arn, + dest_external_id=READER_EID, + ) - if method == 'list_user_policies': - yield { - 'iam.list_user_policies': [ - updated(user_policy, account_info, {'user_name': params['UserName']}) - for user_policy in aws_collect(client, 'iam.list_user_policies', params) - ] - } + except ClientError as e: + # record missing auditor role as empty account summary + yield ( + task.method, + updated( + account_info, + recorded_at=parse_date( + e.response['ResponseMetadata']['HTTPHeaders']['date'] + ), + ), + ) + return - if method == 'list_policies': - policies = [ - updated(u, account_info) for u in aws_collect(client, 'iam.list_policies') - ] - yield {'iam.list_policies': policies} - for policy_group in groups_of(1000, policies): - add_task( - { - 'account_id': account_id, - 'method': 'iam.get_policy_version', - 'params': [ - { - 'PolicyArn': policy['arn'], - 'VersionId': policy['default_version_id'], - } - for policy in policy_group - ], - } - ) - add_task( - { - 'account_id': account_id, - 'method': 'iam.list_entities_for_policy', - 'params': [{'PolicyArn': policy['arn']} for policy in policy_group], - } - ) - if method == 'get_policy_version': - yield { - 'iam.get_policy_version': [ - updated(version, account_info, {'policy_arn': params['PolicyArn']}) - for version in aws_collect(client, 'iam.get_policy_version', params) - ] - } + client = session.client(client_name) - if method == 'list_entities_for_policy': - yield { - 'iam.list_entities_for_policy': [ - updated(entity, account_info, {'policy_arn': params['PolicyArn']}) - for entity in aws_collect( - client, 'iam.list_entities_for_policy', params - ) - ] - } + for response in load_task_response(client, task): + if type(response) is DBEntry: + yield (task.method, response.entity) + elif type(response) is CollectTask: + add_task(response) + else: + log.info('log response', response) def insert_list(name, values, table_name=None): @@ -1160,18 +968,13 @@ def insert_list(name, values, table_name=None): def aws_collect_task(task, add_task=None): log.info(f'processing {task}') - account_id = task['account_id'] - methods = task.get('methods') or [task['method']] - params = task.get('params', {}) - if type(params) is dict: - params = [params] + result_lists = defaultdict(list) + for k, v in process_task(task, add_task): + result_lists[k].append(v) - for param in params: - for method in methods: - for lists in load_aws_iam(account_id, method, param, add_task): - for name, values in lists.items(): - response = insert_list(name, values) - log.info(f'finished {response}') + for name, vs in result_lists.items(): + response = insert_list(name, vs) + log.info(f'finished {response}') def ingest(table_name, options): @@ -1190,7 +993,12 @@ def ingest(table_name, options): dest_external_id=READER_EID, ).client('organizations') - accounts = [a for a in aws_collect(org_client, 'organizations.list_accounts')] + accounts = [ + a.entity + for a in load_task_response( + org_client, CollectTask(None, 'organizations.list_accounts', {}) + ) + ] insert_list( 'organizations.list_accounts', accounts, table_name=f'data.{table_name}' ) @@ -1199,29 +1007,31 @@ def ingest(table_name, options): 32, aws_collect_task, [ - {'method': method, 'account_id': a['id']} + CollectTask(a['id'], method, {}) for a in accounts for method in [ + 'iam.generate_credential_report', + 'iam.list_account_aliases', 'iam.get_account_summary', 'iam.get_account_password_policy', + 'ec2.describe_instances', + 'ec2.describe_security_groups', + 'config.describe_configuration_recorders', + 'iam.get_credential_report', + 'kms.list_keys', 'iam.list_users', 'iam.list_policies', - 'iam.list_account_aliases', - 's3.list_buckets', - 'iam.generate_credential_report', - 'iam.get_credential_report', 'iam.list_virtual_mfa_devices', - 'ec2.describe_security_groups', + 's3.list_buckets', 'cloudtrail.describe_trails', - 'kms.list_keys', - 'config.describe_configuration_recorders', - 'ec2.describe_instances', ] ], ) -def main(table_name, audit_assumer_ARN, master_reader_ARN, reader_eid, audit_reader_role): +def main( + table_name, audit_assumer_ARN, master_reader_ARN, reader_eid, audit_reader_role +): print( ingest( table_name, diff --git a/src/connectors/tests/test_aws_collect.py b/src/connectors/tests/test_aws_collect.py new file mode 100644 index 000000000..202d828a3 --- /dev/null +++ b/src/connectors/tests/test_aws_collect.py @@ -0,0 +1,142 @@ +from botocore.exceptions import BotoCoreError + +from collections import namedtuple + +from connectors.aws_collect import process_aws_response, DBEntry, CollectTask + + +Sample = namedtuple('Sample', ['task', 'response', 'entities', 'subrequests']) + + +TEST_DATA_REQUEST_RESPONSE = [ + # e.g. error + Sample( + CollectTask('1', 'iam.list_account_aliases', {}), + BotoCoreError(), + [DBEntry({'account_id': '1'})], + [], + ), + # e.g. list-of-entities response + Sample( + CollectTask('1', 'kms.list_keys', {}), + { + "Keys": [ + {'KeyId': 'id1', 'KeyArn': 'arn1'}, + {'KeyId': 'id2', 'KeyArn': 'arn2'}, + ] + }, + [ + DBEntry({'account_id': '1', "key_id": "id1", "key_arn": "arn1"}), + DBEntry({'account_id': '1', "key_id": "id2", "key_arn": "arn2"}), + ], + [ + CollectTask('1', 'kms.get_key_rotation_status', {'KeyId': 'arn1'}), + CollectTask('1', 'kms.get_key_rotation_status', {'KeyId': 'arn2'}), + ], + ), + # e.g. list-of-strings response + Sample( + CollectTask('1', 'iam.list_account_aliases', {}), + {'AccountAliases': ['one', 'two']}, + [ + DBEntry({'account_id': '1', 'account_alias': 'one'}), + DBEntry({'account_id': '1', 'account_alias': 'two'}), + ], + [], + ), + # e.g. single-entity response with custom parser + Sample( + CollectTask('1', 'iam.get_credential_report', {}), + { + 'Content': 'col1,col2\nval11,val12\nval21,val22', + 'ReportFormat': 'csv', + 'GeneratedTime': '2019-11-30T12:13:14Z', + }, + [ + DBEntry( + { + 'account_id': '1', + 'generated_time': '2019-11-30T12:13:14Z', + 'report_format': 'csv', + 'content': 'col1,col2\nval11,val12\nval21,val22', + 'content_csv_parsed': [ + {'col1': 'val11', 'col2': 'val12'}, + {'col1': 'val21', 'col2': 'val22'}, + ], + } + ) + ], + [], + ), + # e.g. repeat-field list-of-entities response + Sample( + CollectTask('1', 's3.list_buckets', {}), + { + 'Owner': {'DisplayName': 'dn1', 'ID': 'oid1'}, + 'Buckets': [ + {'Name': 'name1', 'CreationDate': 'date1'}, + {'Name': 'name2', 'CreationDate': 'date2'}, + ], + }, + [ + DBEntry( + { + 'account_id': '1', + 'owner_display_name': 'dn1', + 'owner_id': 'oid1', + 'bucket_name': 'name1', + 'bucket_creation_date': 'date1', + } + ), + DBEntry( + { + 'account_id': '1', + 'owner_display_name': 'dn1', + 'owner_id': 'oid1', + 'bucket_name': 'name2', + 'bucket_creation_date': 'date2', + } + ), + ], + [ + CollectTask( + account_id='1', method='s3.get_bucket_acl', args={'Bucket': 'name1'} + ), + CollectTask( + account_id='1', method='s3.get_bucket_policy', args={'Bucket': 'name1'} + ), + CollectTask( + account_id='1', method='s3.get_bucket_logging', args={'Bucket': 'name1'} + ), + CollectTask( + account_id='1', method='s3.get_bucket_acl', args={'Bucket': 'name2'} + ), + CollectTask( + account_id='1', method='s3.get_bucket_policy', args={'Bucket': 'name2'} + ), + CollectTask( + account_id='1', method='s3.get_bucket_logging', args={'Bucket': 'name2'} + ), + ], + ), + # e.g. with parameter + Sample( + CollectTask('1', 'kms.get_key_rotation_status', {'KeyId': 'arn1'}), + {'KeyRotationEnabled': True}, + [DBEntry({'account_id': '1', 'key_arn': 'arn1', 'key_rotation_enabled': True})], + [], + ), +] + + +def test_process_aws_response(): + for sample in TEST_DATA_REQUEST_RESPONSE: + db_entries = [] + child_requests = [] + for r in process_aws_response(sample.task, sample.response): + if type(r) is DBEntry: + db_entries.append(r) + elif type(r) is CollectTask: + child_requests.append(r) + assert db_entries == sample.entities + assert child_requests == sample.subrequests diff --git a/src/connectors/utils.py b/src/connectors/utils.py index 49bc43207..7616c48ed 100644 --- a/src/connectors/utils.py +++ b/src/connectors/utils.py @@ -77,6 +77,10 @@ def yaml_dump(**kwargs): return yaml.dump(kwargs, default_flow_style=False, explicit_start=True) +def bytes_to_str(x): + return x.decode() if type(x) is bytes else x + + def create_metadata_table(table, cols, addition): db.create_table(table, cols, ifnotexists=True) db.execute(f"GRANT INSERT, SELECT ON {table} TO ROLE {SA_ROLE}") diff --git a/src/pytest.ini b/src/pytest.ini index 90c237804..7bbac45dc 100644 --- a/src/pytest.ini +++ b/src/pytest.ini @@ -1,2 +1,2 @@ [pytest] -python_files = runners/tests/*.py +python_files = */tests/*.py diff --git a/src/setup.py b/src/setup.py index ea9d21b9f..fe6720e73 100644 --- a/src/setup.py +++ b/src/setup.py @@ -7,10 +7,11 @@ include_package_data=True, install_requires=[ 'aiohttp[speedups]', + 'aioboto3==6.4.1', 'fire==0.1.3', 'jira==2.0.0', 'PyYAML==4.2b1', - 'snowflake-connector-python==2.0.4', + 'snowflake-connector-python==1.9.1', 'snowflake-sqlalchemy==1.1.2', 'pandas==0.24.1', 'pybrake==0.4.0', @@ -28,7 +29,7 @@ 'azure-storage-common==2.1.0', 'google-api-python-client==1.7.10', 'pyTenable==0.3.22', - 'boto3==1.10.28', + 'boto3==1.9.253', 'twilio==6.29.4', ], ) From c89d17937fba22fe822fcd3333267626845f1702 Mon Sep 17 00:00:00 2001 From: Andrey Fedorov Date: Tue, 3 Dec 2019 08:53:40 -0800 Subject: [PATCH 2/8] switch to aio --- src/connectors/aws_collect.py | 197 +++++++++++++++++++--------------- src/connectors/utils.py | 31 ++++++ src/runners/helpers/db.py | 14 +++ 3 files changed, 156 insertions(+), 86 deletions(-) diff --git a/src/connectors/aws_collect.py b/src/connectors/aws_collect.py index 180104294..f0eddc9fe 100644 --- a/src/connectors/aws_collect.py +++ b/src/connectors/aws_collect.py @@ -3,19 +3,22 @@ """ import asyncio -import aioboto3 -from botocore.exceptions import BotoCoreError, ClientError, DataNotFoundError +from botocore.exceptions import ( + # BotoCoreError, + ClientError, + DataNotFoundError, +) from collections import defaultdict, namedtuple import csv from dateutil.parser import parse as parse_date import json import fire import io -from typing import Tuple, List, Generator +from typing import Tuple, Generator from runners.helpers.dbconfig import ROLE as SA_ROLE -from connectors.utils import sts_assume_role, qmap_mp, updated, yaml_dump, bytes_to_str +from connectors.utils import aio_sts_assume_role, updated, yaml_dump, bytes_to_str from runners.helpers import db, log @@ -24,6 +27,9 @@ AUDIT_READER_ROLE = 'audit-reader' READER_EID = '' +_SESSION_CACHE = {} +_TASKS_PER_SECOND = 100 + CONNECTION_OPTIONS = [ { 'type': 'str', @@ -874,7 +880,7 @@ def process_aws_response(task, page): if task.account_id: base_entity['account_id'] = task.account_id - if isinstance(page, BotoCoreError): + if isinstance(page, Exception): yield DBEntry(base_entity) return @@ -900,40 +906,51 @@ def process_aws_response(task, page): yield CollectTask(task.account_id, method, args) -def load_task_response(client, task): +async def load_task_response(client, task): args = task.args or {} client_name, method_name = task.method.split('.', 1) - pages = ( - client.get_paginator(method_name).paginate(**args) - if client.can_paginate(method_name) - else None - ) - - if pages is None: - try: - pages = [getattr(client, method_name)(**args)] - - except (ClientError, DataNotFoundError) as e: - pages = [e] + try: + if client.can_paginate(method_name): + async for page in client.get_paginator(method_name).paginate(**args): + for x in process_aws_response(task, page): + yield x + else: + for x in process_aws_response( + task, await getattr(client, method_name)(**args) + ): + yield x - for page in pages: - yield from process_aws_response(task, page) + except (ClientError, DataNotFoundError) as e: + for x in process_aws_response(task, e): + yield x -def process_task(task, add_task) -> Generator[Tuple[str, dict], None, None]: +async def process_task(task, add_task) -> Generator[Tuple[str, dict], None, None]: account_arn = f'arn:aws:iam::{task.account_id}:role/{AUDIT_READER_ROLE}' account_info = {'account_id': task.account_id} client_name, method_name = task.method.split('.', 1) try: - session = sts_assume_role( - src_role_arn=AUDIT_ASSUMER_ARN, - dest_role_arn=account_arn, - dest_external_id=READER_EID, + session = _SESSION_CACHE[account_arn] = ( + _SESSION_CACHE[account_arn] + if account_arn in _SESSION_CACHE + else await aio_sts_assume_role( + src_role_arn=AUDIT_ASSUMER_ARN, + dest_role_arn=account_arn, + dest_external_id=READER_EID, + ) ) + async with session.client(client_name) as client: + async for response in load_task_response(client, task): + if type(response) is DBEntry: + yield (task.method, response.entity) + elif type(response) is CollectTask: + add_task(response) + else: + log.info('log response', response) except ClientError as e: # record missing auditor role as empty account summary @@ -946,17 +963,6 @@ def process_task(task, add_task) -> Generator[Tuple[str, dict], None, None]: ), ), ) - return - - client = session.client(client_name) - - for response in load_task_response(client, task): - if type(response) is DBEntry: - yield (task.method, response.entity) - elif type(response) is CollectTask: - add_task(response) - else: - log.info('log response', response) def insert_list(name, values, table_name=None): @@ -966,18 +972,17 @@ def insert_list(name, values, table_name=None): return db.insert(table_name, values) -def aws_collect_task(task, add_task=None): +async def aws_collect_task(task, wait=0.0, add_task=None): log.info(f'processing {task}') result_lists = defaultdict(list) - for k, v in process_task(task, add_task): + if wait: + await asyncio.sleep(wait) + async for k, v in process_task(task, add_task): result_lists[k].append(v) + return result_lists - for name, vs in result_lists.items(): - response = insert_list(name, vs) - log.info(f'finished {response}') - -def ingest(table_name, options): +async def aioingest(table_name, options): global AUDIT_ASSUMER_ARN global MASTER_READER_ARN global AUDIT_READER_ROLE @@ -987,61 +992,81 @@ def ingest(table_name, options): AUDIT_READER_ROLE = options.get('audit_reader_role', '') READER_EID = options.get('reader_eid', '') - org_client = sts_assume_role( + session = await aio_sts_assume_role( src_role_arn=AUDIT_ASSUMER_ARN, dest_role_arn=MASTER_READER_ARN, dest_external_id=READER_EID, - ).client('organizations') + ) + + async with session.client('organizations') as org_client: + accounts = [ + a.entity + async for a in load_task_response( + org_client, CollectTask(None, 'organizations.list_accounts', {}) + ) + ] - accounts = [ - a.entity - for a in load_task_response( - org_client, CollectTask(None, 'organizations.list_accounts', {}) - ) - ] insert_list( 'organizations.list_accounts', accounts, table_name=f'data.{table_name}' ) if options.get('collect_apis') == 'all': - qmap_mp( - 32, - aws_collect_task, - [ - CollectTask(a['id'], method, {}) - for a in accounts - for method in [ - 'iam.generate_credential_report', - 'iam.list_account_aliases', - 'iam.get_account_summary', - 'iam.get_account_password_policy', - 'ec2.describe_instances', - 'ec2.describe_security_groups', - 'config.describe_configuration_recorders', - 'iam.get_credential_report', - 'kms.list_keys', - 'iam.list_users', - 'iam.list_policies', - 'iam.list_virtual_mfa_devices', - 's3.list_buckets', - 'cloudtrail.describe_trails', - ] - ], - ) + collection_tasks = [ + CollectTask(a['id'], method, {}) + for a in accounts + for method in [ + # 'iam.generate_credential_report', + 'iam.list_account_aliases', + 'iam.get_account_summary', + 'iam.get_account_password_policy', + 'ec2.describe_instances', + 'ec2.describe_security_groups', + 'config.describe_configuration_recorders', + # 'iam.get_credential_report', + 'kms.list_keys', + 'iam.list_users', + 'iam.list_policies', + 'iam.list_virtual_mfa_devices', + 's3.list_buckets', + 'cloudtrail.describe_trails', + ] + ] + + def add_task(t): + collection_tasks.append(t) + + while collection_tasks: + coroutines = [ + aws_collect_task( + t, wait=(float(i) / _TASKS_PER_SECOND), add_task=add_task + ) + for i, t in enumerate(collection_tasks) + ] + collection_tasks = [] + + all_results = defaultdict(list) + for result_lists in await asyncio.gather(*coroutines): + for k, vs in result_lists.items(): + all_results[k] += vs + for name, vs in all_results.items(): + response = insert_list(name, vs) + log.info(f'finished {name} {response}') + + +def ingest(table_name, options): + return asyncio.get_event_loop().run_until_complete(aioingest(table_name, options)) def main( - table_name, audit_assumer_ARN, master_reader_ARN, reader_eid, audit_reader_role + table_name, audit_assumer_arn, master_reader_arn, reader_eid, audit_reader_role ): - print( - ingest( - table_name, - { - 'audit_assumer_ARN': audit_assumer_ARN, - 'master_reader_ARN': master_reader_ARN, - 'reader_eid': reader_eid, - 'audit_reader_role': audit_reader_role, - }, - ) + ingest( + table_name, + { + 'audit_assumer_arn': audit_assumer_arn, + 'master_reader_arn': master_reader_arn, + 'reader_eid': reader_eid, + 'audit_reader_role': audit_reader_role, + }, ) diff --git a/src/connectors/utils.py b/src/connectors/utils.py index 7616c48ed..3253186b7 100644 --- a/src/connectors/utils.py +++ b/src/connectors/utils.py @@ -1,3 +1,4 @@ +import aioboto3 import boto3 import random import yaml @@ -73,6 +74,36 @@ def sts_assume_role(src_role_arn, dest_role_arn, dest_external_id=None): ) +async def aio_sts_assume_role(src_role_arn, dest_role_arn, dest_external_id=None): + session_name = ''.join(random.choice('0123456789ABCDEF') for i in range(16)) + async with aioboto3.client('sts') as sts: + src_role = await sts.assume_role( + RoleArn=src_role_arn, RoleSessionName=session_name + ) + async with aioboto3.Session( + aws_access_key_id=src_role['Credentials']['AccessKeyId'], + aws_secret_access_key=src_role['Credentials']['SecretAccessKey'], + aws_session_token=src_role['Credentials']['SessionToken'], + ).client('sts') as sts_client: + sts_role = await ( + sts_client.assume_role( + RoleArn=dest_role_arn, + RoleSessionName=session_name, + ExternalId=dest_external_id, + ) + if dest_external_id + else sts_client.assume_role( + RoleArn=dest_role_arn, RoleSessionName=session_name + ) + ) + + return aioboto3.Session( + aws_access_key_id=sts_role['Credentials']['AccessKeyId'], + aws_secret_access_key=sts_role['Credentials']['SecretAccessKey'], + aws_session_token=sts_role['Credentials']['SessionToken'], + ) + + def yaml_dump(**kwargs): return yaml.dump(kwargs, default_flow_style=False, explicit_start=True) diff --git a/src/runners/helpers/db.py b/src/runners/helpers/db.py index 1128390dc..4bf62e05c 100644 --- a/src/runners/helpers/db.py +++ b/src/runners/helpers/db.py @@ -334,6 +334,20 @@ def determine_cols(values: List[dict]) -> Tuple[List[str], List[str]]: def insert(table, values, overwrite=False, select="", columns=[], dryrun=False): + num_rows_inserted = 0 + # snowflake limits the number of rows inserted in a single statement: + # snowflake.connector.errors.ProgrammingError: 001795 (42601): + # SQL compilation error: error line 3 at position 158 + # maximum number of expressions in a list exceeded, + # expected at most 16,384, got 169,667 + for group in utils.groups_of(16384, values): + num_rows_inserted += do_insert( + table, values, overwrite, select, columns, dryrun + )['number of rows inserted'] + return {'number of rows inserted': num_rows_inserted} + + +def do_insert(table, values, overwrite=False, select="", columns=[], dryrun=False): if len(values) == 0: return {'number of rows inserted': 0} From f54f97cb5ef19a609b5aec03d0da4069ea0d25f7 Mon Sep 17 00:00:00 2001 From: Andrey Fedorov Date: Tue, 3 Dec 2019 09:58:01 -0800 Subject: [PATCH 3/8] fix mypy --- src/connectors/aws_collect.py | 6 +++--- src/mypy.ini | 3 +++ src/webui/backend/webui/app.py | 9 +++++++-- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/connectors/aws_collect.py b/src/connectors/aws_collect.py index f0eddc9fe..e4be6cb54 100644 --- a/src/connectors/aws_collect.py +++ b/src/connectors/aws_collect.py @@ -14,7 +14,7 @@ import json import fire import io -from typing import Tuple, Generator +from typing import Tuple, AsyncGenerator from runners.helpers.dbconfig import ROLE as SA_ROLE @@ -27,7 +27,7 @@ AUDIT_READER_ROLE = 'audit-reader' READER_EID = '' -_SESSION_CACHE = {} +_SESSION_CACHE: dict = {} _TASKS_PER_SECOND = 100 CONNECTION_OPTIONS = [ @@ -927,7 +927,7 @@ async def load_task_response(client, task): yield x -async def process_task(task, add_task) -> Generator[Tuple[str, dict], None, None]: +async def process_task(task, add_task) -> AsyncGenerator[Tuple[str, dict], None]: account_arn = f'arn:aws:iam::{task.account_id}:role/{AUDIT_READER_ROLE}' account_info = {'account_id': task.account_id} diff --git a/src/mypy.ini b/src/mypy.ini index 7ca5cac09..177c40c55 100644 --- a/src/mypy.ini +++ b/src/mypy.ini @@ -69,3 +69,6 @@ ignore_missing_imports = True [mypy-aiohttp.*] ignore_missing_imports = True + +[mypy-aioboto3.*] +ignore_missing_imports = True diff --git a/src/webui/backend/webui/app.py b/src/webui/backend/webui/app.py index a8da502b4..2558ce462 100644 --- a/src/webui/backend/webui/app.py +++ b/src/webui/backend/webui/app.py @@ -37,8 +37,13 @@ def get_send_file_max_age(self, name): app.config.from_object(config.FlaskConfig) # type: ignore app.debug = config.DEBUG -# clear db cache persisting between requests -app.teardown_request(lambda _: setattr(db.CACHE, db.CONNECTION, None)) + +def clear_cache(request): + "clear db cache persisting between requests" + setattr(db.CACHE, db.CONNECTION, None) + + +app.teardown_request(clear_cache) app.register_blueprint(app_views) app.register_blueprint(data_api, url_prefix='/api/sa/data') From 40c93aedcc61968914b6001e82927cb54418fa54 Mon Sep 17 00:00:00 2001 From: Andrey Fedorov Date: Tue, 3 Dec 2019 19:44:29 -0800 Subject: [PATCH 4/8] bugfixes --- migrations/v1_9_0-v1_9_1.md | 3 --- src/connectors/aws_collect.py | 25 +++++++++++++++---------- src/runners/helpers/db.py | 2 +- 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/migrations/v1_9_0-v1_9_1.md b/migrations/v1_9_0-v1_9_1.md index 99e0e674d..a1d05c858 100644 --- a/migrations/v1_9_0-v1_9_1.md +++ b/migrations/v1_9_0-v1_9_1.md @@ -42,9 +42,6 @@ ALTER TABLE data.aws_collect_iam_list_policies ALTER TABLE data.aws_collect_s3_get_bucket_acl DROP COLUMN owner ; -ALTER TABLE data.aws_collect_iam_list_policies - RENAME COLUMN arn TO policy_arn -; ALTER TABLE data.aws_collect_iam_list_virtual_mfa_devices ADD COLUMN enable_date TIMESTAMP_NTZ ; diff --git a/src/connectors/aws_collect.py b/src/connectors/aws_collect.py index e4be6cb54..a8526dbc3 100644 --- a/src/connectors/aws_collect.py +++ b/src/connectors/aws_collect.py @@ -28,7 +28,8 @@ READER_EID = '' _SESSION_CACHE: dict = {} -_TASKS_PER_SECOND = 100 +_REQUEST_PACE_PER_SECOND = 100 +_MAX_BATCH_SIZE = 500 CONNECTION_OPTIONS = [ { @@ -758,7 +759,7 @@ 'children': [ { 'method': 'cloudtrail.get_trail_status', - 'args': {'TrailName': 'trail_arn'}, + 'args': {'Name': 'trail_arn'}, }, { 'method': 'cloudtrail.get_event_selectors', @@ -865,8 +866,9 @@ def process_response_items(coldict, page, db_entry=None): elif type(coldict) is dict: for response_key, colname in coldict.items(): - response_value = page.get(response_key) - db_entry.update(process_response_items(colname, response_value)) + if page: # e.g. sometimes get_login_profile returns None + response_value = page.get(response_key) + db_entry.update(process_response_items(colname, response_value)) return db_entry @@ -880,11 +882,12 @@ def process_aws_response(task, page): if task.account_id: base_entity['account_id'] = task.account_id + base_entity.update({v: task.args[k] for k, v in params.items()}) + if isinstance(page, Exception): yield DBEntry(base_entity) return - base_entity.update({v: task.args[k] for k, v in params.items()}) base_entity.update(process_response_items(response_coldict, page)) if 'ResponseMetadata' in page: @@ -973,10 +976,11 @@ def insert_list(name, values, table_name=None): async def aws_collect_task(task, wait=0.0, add_task=None): - log.info(f'processing {task}') - result_lists = defaultdict(list) if wait: await asyncio.sleep(wait) + + log.info(f'processing {task}') + result_lists = defaultdict(list) async for k, v in process_task(task, add_task): result_lists[k].append(v) return result_lists @@ -1037,11 +1041,12 @@ def add_task(t): while collection_tasks: coroutines = [ aws_collect_task( - t, wait=(float(i) / _TASKS_PER_SECOND), add_task=add_task + t, wait=(float(i) / _REQUEST_PACE_PER_SECOND), add_task=add_task ) - for i, t in enumerate(collection_tasks) + for i, t in enumerate(collection_tasks[:_MAX_BATCH_SIZE]) ] - collection_tasks = [] + del collection_tasks[:_MAX_BATCH_SIZE] + log.info(f'progress: starting {len(coroutines)}, queued {len(collection_tasks)}') all_results = defaultdict(list) for result_lists in await asyncio.gather(*coroutines): diff --git a/src/runners/helpers/db.py b/src/runners/helpers/db.py index 4bf62e05c..4327dacb9 100644 --- a/src/runners/helpers/db.py +++ b/src/runners/helpers/db.py @@ -342,7 +342,7 @@ def insert(table, values, overwrite=False, select="", columns=[], dryrun=False): # expected at most 16,384, got 169,667 for group in utils.groups_of(16384, values): num_rows_inserted += do_insert( - table, values, overwrite, select, columns, dryrun + table, group, overwrite, select, columns, dryrun )['number of rows inserted'] return {'number of rows inserted': num_rows_inserted} From 9990b1303a3730c9e6287c0333cbf729f732135d Mon Sep 17 00:00:00 2001 From: Andrey Fedorov Date: Tue, 3 Dec 2019 22:06:56 -0800 Subject: [PATCH 5/8] bugfixes --- src/connectors/aws_collect.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/connectors/aws_collect.py b/src/connectors/aws_collect.py index a8526dbc3..41c148f48 100644 --- a/src/connectors/aws_collect.py +++ b/src/connectors/aws_collect.py @@ -849,7 +849,8 @@ def process_response_lists(coldict, page): yield process_response_items(colname[0], x, {}) if type(colname) is dict: - yield from process_response_lists(colname, response_value) + if response_value: + yield from process_response_lists(colname, response_value) def process_response_items(coldict, page, db_entry=None): @@ -884,7 +885,7 @@ def process_aws_response(task, page): base_entity.update({v: task.args[k] for k, v in params.items()}) - if isinstance(page, Exception): + if isinstance(page, (Exception, type(None))): yield DBEntry(base_entity) return From 23e404f454e7d492ea54093c7d9622db3625f33c Mon Sep 17 00:00:00 2001 From: Andrey Fedorov Date: Wed, 4 Dec 2019 09:49:19 -0800 Subject: [PATCH 6/8] finialize --- src/connectors/aws_collect.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/connectors/aws_collect.py b/src/connectors/aws_collect.py index 41c148f48..a053903f8 100644 --- a/src/connectors/aws_collect.py +++ b/src/connectors/aws_collect.py @@ -980,7 +980,7 @@ async def aws_collect_task(task, wait=0.0, add_task=None): if wait: await asyncio.sleep(wait) - log.info(f'processing {task}') + # log.info(f'processing {task}') result_lists = defaultdict(list) async for k, v in process_task(task, add_task): result_lists[k].append(v) @@ -1017,23 +1017,23 @@ async def aioingest(table_name, options): if options.get('collect_apis') == 'all': collection_tasks = [ CollectTask(a['id'], method, {}) - for a in accounts for method in [ - # 'iam.generate_credential_report', + 'iam.generate_credential_report', 'iam.list_account_aliases', 'iam.get_account_summary', 'iam.get_account_password_policy', 'ec2.describe_instances', 'ec2.describe_security_groups', 'config.describe_configuration_recorders', - # 'iam.get_credential_report', 'kms.list_keys', 'iam.list_users', 'iam.list_policies', 'iam.list_virtual_mfa_devices', 's3.list_buckets', 'cloudtrail.describe_trails', + 'iam.get_credential_report', ] + for a in accounts ] def add_task(t): From 5cdad29388cedb3122f72a3c559c0d704cc1f2f0 Mon Sep 17 00:00:00 2001 From: Andrey Fedorov Date: Thu, 5 Dec 2019 13:35:22 -0800 Subject: [PATCH 7/8] run once every 3 hours --- src/connectors/aws_collect.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/connectors/aws_collect.py b/src/connectors/aws_collect.py index a053903f8..7f3a2f92b 100644 --- a/src/connectors/aws_collect.py +++ b/src/connectors/aws_collect.py @@ -10,6 +10,7 @@ ) from collections import defaultdict, namedtuple import csv +from datetime import datetime from dateutil.parser import parse as parse_date import json import fire @@ -1039,6 +1040,7 @@ async def aioingest(table_name, options): def add_task(t): collection_tasks.append(t) + num_entires = 0 while collection_tasks: coroutines = [ aws_collect_task( @@ -1055,11 +1057,19 @@ def add_task(t): all_results[k] += vs for name, vs in all_results.items(): response = insert_list(name, vs) + num_entires += len(vs) log.info(f'finished {name} {response}') + return num_entries + + return 0 def ingest(table_name, options): - return asyncio.get_event_loop().run_until_complete(aioingest(table_name, options)) + now = datetime.now(). + if (now.hour % 3 == 0 and now.minute < 15): + return asyncio.get_event_loop().run_until_complete(aioingest(table_name, options)) + else: + log.info('not time yett') def main( From 49bbb7521a5b65329e95ae58ddaaff418939694a Mon Sep 17 00:00:00 2001 From: Andrey Fedorov Date: Thu, 5 Dec 2019 13:36:54 -0800 Subject: [PATCH 8/8] typo --- src/connectors/aws_collect.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connectors/aws_collect.py b/src/connectors/aws_collect.py index 7f3a2f92b..dfb009e08 100644 --- a/src/connectors/aws_collect.py +++ b/src/connectors/aws_collect.py @@ -1065,7 +1065,7 @@ def add_task(t): return 0 def ingest(table_name, options): - now = datetime.now(). + now = datetime.now() if (now.hour % 3 == 0 and now.minute < 15): return asyncio.get_event_loop().run_until_complete(aioingest(table_name, options)) else: