Skip to content

Commit

Permalink
break up get_target_from_rule (#1221)
Browse files Browse the repository at this point in the history
ec2_security_group - refacter get_target_from_rule()

SUMMARY
refacter get_target_from_rule to bring down the complexity score
Builds on top of #1214
ISSUE TYPE

Feature Pull Request

COMPONENT NAME
ec2_security_group
ADDITIONAL INFORMATION

Reviewed-by: Alina Buzachis <None>
  • Loading branch information
tremble authored Nov 2, 2022
1 parent 9b569d3 commit add1f5b
Show file tree
Hide file tree
Showing 5 changed files with 332 additions and 129 deletions.
2 changes: 2 additions & 0 deletions changelogs/fragments/1221-ec2_security_group.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
minor_changes:
- ec2_security_group - refacter ``get_target_from_rule()`` (https://github.com/ansible-collections/amazon.aws/pull/1221).
277 changes: 163 additions & 114 deletions plugins/modules/ec2_security_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,6 @@
import re
from collections import namedtuple
from copy import deepcopy
from ipaddress import IPv6Network
from ipaddress import ip_network
from time import sleep

Expand Down Expand Up @@ -694,6 +693,116 @@ def validate_rule(rule):
raise SecurityGroupError(msg='Specify proto: icmp or icmpv6 when using icmp_type/icmp_code')


def _target_from_rule_with_group_id(rule, groups):
owner_id = current_account_id
FOREIGN_SECURITY_GROUP_REGEX = r'^([^/]+)/?(sg-\S+)?/(\S+)'
foreign_rule = re.match(FOREIGN_SECURITY_GROUP_REGEX, rule['group_id'])

if not foreign_rule:
return 'group', (owner_id, rule['group_id'], None), False

# this is a foreign Security Group. Since you can't fetch it you must create an instance of it
# Matches on groups like amazon-elb/sg-5a9c116a/amazon-elb-sg, amazon-elb/amazon-elb-sg,
# and peer-VPC groups like 0987654321/sg-1234567890/example
owner_id, group_id, group_name = foreign_rule.groups()
group_instance = dict(UserId=owner_id, GroupId=group_id, GroupName=group_name)
groups[group_id] = group_instance
groups[group_name] = group_instance
if group_id and group_name:
if group_name.startswith('amazon-'):
# amazon-elb and amazon-prefix rules don't need group_id specified,
group_id = None
else:
# For cross-VPC references we'll use group_id as it is more specific
group_name = None
return 'group', (owner_id, group_id, group_name), False


def _lookup_target_or_fail(client, group_name, vpc_id, groups, msg):
owner_id = current_account_id
filters = {'group-name': group_name}
if vpc_id:
filters['vpc-id'] = vpc_id

filters = ansible_dict_to_boto3_filter_list(filters)
try:
found_group = get_security_groups_with_backoff(client, Filters=filters).get('SecurityGroups', [])[0]
except (is_boto3_error_code('InvalidGroup.NotFound'), IndexError):
raise SecurityGroupError(msg=msg)
except (BotoCoreError, ClientError) as e: # pylint: disable=duplicate-except
raise SecurityGroupError(msg="Failed to get security group", e=e)

group_id = found_group['GroupId']
groups[group_id] = found_group
groups[group_name] = found_group
return 'group', (owner_id, group_id, None), False


def _create_target_from_rule(client, rule, groups, vpc_id, check_mode):
owner_id = current_account_id
# We can't create a group in check mode...
if check_mode:
return 'group', (owner_id, None, None), True

group_name = rule['group_name']

try:
created_group = _create_security_group_with_wait(client, group_name, rule['group_desc'], vpc_id)
except is_boto3_error_code('InvalidGroup.Duplicate'):
# The group exists, but didn't show up in any of our previous describe-security-groups calls
# Try searching on a filter for the name, and allow a retry window for AWS to update
# the model on their end.
fail_msg = "Could not create or use existing group '{0}' in rule {1}. " \
"Make sure the group exists and try using the group_id " \
"instead of the name".format(group_name, rule)
return _lookup_target_or_fail(client, group_name, vpc_id, groups, fail_msg)
except (BotoCoreError, ClientError) as e:
raise SecurityGroupError(msg="Failed to create security group '{0}' in rule {1}", e=e)

group_id = created_group['GroupId']
groups[group_id] = created_group
groups[group_name] = created_group

return 'group', (owner_id, group_id, None), True


def _target_from_rule_with_group_name(client, rule, name, group, groups, vpc_id, check_mode):
group_name = rule['group_name']
owner_id = current_account_id
if group_name == name:
# Simplest case, the rule references itself
group_id = group['GroupId']
groups[group_id] = group
groups[group_name] = group
return 'group', (owner_id, group_id, None), False

# Already cached groups
if group_name in groups and group.get('VpcId') and groups[group_name].get('VpcId'):
# both are VPC groups, this is ok
group_id = groups[group_name]['GroupId']
return 'group', (owner_id, group_id, None), False

if group_name in groups and not (group.get('VpcId') or groups[group_name].get('VpcId')):
# both are EC2 classic, this is ok
group_id = groups[group_name]['GroupId']
return 'group', (owner_id, group_id, None), False

# if we got here, either the target group does not exist, or there
# is a mix of EC2 classic + VPC groups. Mixing of EC2 classic + VPC
# is bad, so we have to create a new SG because no compatible group
# exists

# Without a group description we can't create a new group, try looking up the group, or fail
# with a descriptive error message
if not rule.get('group_desc', '').strip():
# retry describing the group
fail_msg = "group '{0}' not found and would be automatically created by rule {1} but " \
"no description was provided".format(group_name, rule)
return _lookup_target_or_fail(client, group_name, vpc_id, groups, fail_msg)

return _create_target_from_rule(client, rule, groups, vpc_id, check_mode)


def get_target_from_rule(module, client, rule, name, group, groups, vpc_id):
"""
Returns tuple of (target_type, target, group_created) after validating rule params.
Expand All @@ -711,101 +820,21 @@ def get_target_from_rule(module, client, rule, name, group, groups, vpc_id):
values that will be compared to current_rules (from current_ingress and
current_egress) in wait_for_rule_propagation().
"""
FOREIGN_SECURITY_GROUP_REGEX = r'^([^/]+)/?(sg-\S+)?/(\S+)'
owner_id = current_account_id
group_id = None
group_name = None
target_group_created = False

try:
validate_rule(rule)
if rule.get('group_id'):
return _target_from_rule_with_group_id(rule, groups)
elif 'group_name' in rule:
return _target_from_rule_with_group_name(client, rule, name, group, groups, vpc_id, module.check_mode)
elif 'cidr_ip' in rule:
return 'ipv4', validate_ip(module, rule['cidr_ip']), False
elif 'cidr_ipv6' in rule:
return 'ipv6', validate_ip(module, rule['cidr_ipv6']), False
elif 'ip_prefix' in rule:
return 'ip_prefix', rule['ip_prefix'], False
except SecurityGroupError as e:
e.fail(module)

if rule.get('group_id') and re.match(FOREIGN_SECURITY_GROUP_REGEX, rule['group_id']):
# this is a foreign Security Group. Since you can't fetch it you must create an instance of it
# Matches on groups like amazon-elb/sg-5a9c116a/amazon-elb-sg, amazon-elb/amazon-elb-sg,
# and peer-VPC groups like 0987654321/sg-1234567890/example
owner_id, group_id, group_name = re.match(FOREIGN_SECURITY_GROUP_REGEX, rule['group_id']).groups()
group_instance = dict(UserId=owner_id, GroupId=group_id, GroupName=group_name)
groups[group_id] = group_instance
groups[group_name] = group_instance
if group_id and group_name:
if group_name.startswith('amazon-'):
# amazon-elb and amazon-prefix rules don't need group_id specified,
group_id = None
else:
# group_id/group_name are mutually exclusive - give group_id more precedence as it is more specific
group_name = None
return 'group', (owner_id, group_id, group_name), False
elif 'group_id' in rule:
return 'group', (owner_id, rule['group_id'], None), False
elif 'group_name' in rule:
group_name = rule['group_name']
if group_name == name:
group_id = group['GroupId']
groups[group_id] = group
groups[group_name] = group
elif group_name in groups and group.get('VpcId') and groups[group_name].get('VpcId'):
# both are VPC groups, this is ok
group_id = groups[group_name]['GroupId']
elif group_name in groups and not (group.get('VpcId') or groups[group_name].get('VpcId')):
# both are EC2 classic, this is ok
group_id = groups[group_name]['GroupId']
else:
auto_group = None
filters = {'group-name': group_name}
if vpc_id:
filters['vpc-id'] = vpc_id
# if we got here, either the target group does not exist, or there
# is a mix of EC2 classic + VPC groups. Mixing of EC2 classic + VPC
# is bad, so we have to create a new SG because no compatible group
# exists
if not rule.get('group_desc', '').strip():
# retry describing the group once
try:
auto_group = get_security_groups_with_backoff(client, Filters=ansible_dict_to_boto3_filter_list(filters)).get('SecurityGroups', [])[0]
except (is_boto3_error_code('InvalidGroup.NotFound'), IndexError):
module.fail_json(msg="group %s will be automatically created by rule %s but "
"no description was provided" % (group_name, rule))
except ClientError as e: # pylint: disable=duplicate-except
module.fail_json_aws(e)
elif not module.check_mode:
params = dict(GroupName=group_name, Description=rule['group_desc'])
if vpc_id:
params['VpcId'] = vpc_id
try:
auto_group = client.create_security_group(aws_retry=True, **params)
get_waiter(
client, 'security_group_exists',
).wait(
GroupIds=[auto_group['GroupId']],
)
except is_boto3_error_code('InvalidGroup.Duplicate'):
# The group exists, but didn't show up in any of our describe-security-groups calls
# Try searching on a filter for the name, and allow a retry window for AWS to update
# the model on their end.
try:
auto_group = get_security_groups_with_backoff(client, Filters=ansible_dict_to_boto3_filter_list(filters)).get('SecurityGroups', [])[0]
except IndexError:
module.fail_json(msg="Could not create or use existing group '{0}' in rule. Make sure the group exists".format(group_name))
except ClientError as e:
module.fail_json_aws(
e,
msg="Could not create or use existing group '{0}' in rule. Make sure the group exists".format(group_name))
if auto_group is not None:
group_id = auto_group['GroupId']
groups[group_id] = auto_group
groups[group_name] = auto_group
target_group_created = True
return 'group', (owner_id, group_id, None), target_group_created
elif 'cidr_ip' in rule:
return 'ipv4', validate_ip(module, rule['cidr_ip']), False
elif 'cidr_ipv6' in rule:
return 'ipv6', validate_ip(module, rule['cidr_ipv6']), False
elif 'ip_prefix' in rule:
return 'ip_prefix', rule['ip_prefix'], False

module.fail_json(msg="Could not match target for rule", failed_rule=rule)


Expand Down Expand Up @@ -977,30 +1006,36 @@ def authorize(client, module, ip_permissions, group_id, rule_type):

def validate_ip(module, cidr_ip):
split_addr = cidr_ip.split('/')
if len(split_addr) == 2:
# this_ip is a IPv4 or IPv6 CIDR that may or may not have host bits set
# Get the network bits if IPv4, and validate if IPv6.
try:
ip = to_subnet(split_addr[0], split_addr[1])
if ip != cidr_ip:
module.warn("One of your CIDR addresses ({0}) has host bits set. To get rid of this warning, "
"check the network mask and make sure that only network bits are set: {1}.".format(
cidr_ip, ip))
except ValueError:
# to_subnet throws a ValueError on IPv6 networks, so we should be working with v6 if we get here
try:
isinstance(ip_network(to_text(cidr_ip)), IPv6Network)
ip = cidr_ip
except ValueError:
# If a host bit is set on something other than a /128, IPv6Network will throw a ValueError
# The ipv6_cidr in this case probably looks like "2001:DB8:A0B:12F0::1/64" and we just want the network bits
ip6 = to_ipv6_subnet(split_addr[0]) + "/" + split_addr[1]
if ip6 != cidr_ip:
module.warn("One of your IPv6 CIDR addresses ({0}) has host bits set. To get rid of this warning, "
"check the network mask and make sure that only network bits are set: {1}.".format(cidr_ip, ip6))
return ip6
if len(split_addr) != 2:
return cidr_ip

try:
ip = ip_network(to_text(cidr_ip))
return str(ip)
except ValueError:
# If a host bit is incorrectly set, ip_network will throw an error at us,
# we'll continue, convert the address to a CIDR AWS will accept, but issue a warning.
pass

# Try evaluating as an IPv4 network, it'll throw a ValueError if it can't parse cidr_ip as an
# IPv4 network
try:
ip = to_subnet(split_addr[0], split_addr[1])
module.warn("One of your CIDR addresses ({0}) has host bits set. To get rid of this warning, "
"check the network mask and make sure that only network bits are set: {1}.".format(cidr_ip, ip))
return ip
return cidr_ip
except ValueError:
pass

# Try again, evaluating as an IPv6 network.
try:
ip6 = to_ipv6_subnet(split_addr[0]) + "/" + split_addr[1]
module.warn("One of your IPv6 CIDR addresses ({0}) has host bits set. To get rid of this warning, "
"check the network mask and make sure that only network bits are set: {1}.".format(cidr_ip, ip6))
return ip6
except ValueError:
module.warn("Unable to parse CIDR ({0}).".format(cidr_ip))
return cidr_ip


def update_tags(client, module, group_id, current_tags, tags, purge_tags):
Expand Down Expand Up @@ -1048,6 +1083,20 @@ def update_rule_descriptions(module, client, group_id, present_ingress, named_tu
return changed


def _create_security_group_with_wait(client, name, description, vpc_id):
params = dict(GroupName=name, Description=description)
if vpc_id:
params['VpcId'] = vpc_id

created_group = client.create_security_group(aws_retry=True, **params)
get_waiter(
client, 'security_group_exists',
).wait(
GroupIds=[created_group['GroupId']],
)
return created_group


def create_security_group(client, module, name, description, vpc_id):
if not module.check_mode:
params = dict(GroupName=name, Description=description)
Expand Down
Loading

0 comments on commit add1f5b

Please sign in to comment.