Skip to content

Commit

Permalink
ec2_key/tests: add unit-tests (#1288) (#1381)
Browse files Browse the repository at this point in the history
[PR #1288/7e14dcf0 backport][stable-4] ec2_key/tests: add unit-tests 

This is a backport of PR #1288 as merged into main (7e14dcf).
SUMMARY


Add the unit-test coverage of the ec2_key module.
Refactor the module to reduce complexity and allow unit testing.


COMPONENT NAME

ec2_key
Latest unit-test coverage (86%): https://890cb34819f4a5031a6a-ac7bf798a2c5290001623a42300937df.ssl.cf5.rackcdn.com/1288/8f1100d82dc058131d0476ff7f82ee7f6eac5b7b/check/cloud-tox-py3/8b0e4b4/docs/coverage/plugins_modules_ec2_key_py.html

Reviewed-by: Mark Chappell
  • Loading branch information
patchback[bot] authored Feb 21, 2023
1 parent f0b5f09 commit ca26544
Show file tree
Hide file tree
Showing 3 changed files with 763 additions and 92 deletions.
3 changes: 3 additions & 0 deletions changelogs/fragments/unit-tests_test_ec2_key_only.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
---
minor_changes:
- "ec2_key - Add unit-tests coverage (https://github.com/ansible-collections/amazon.aws/pull/1288)."
238 changes: 146 additions & 92 deletions plugins/modules/ec2_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,140 +161,178 @@

from ansible_collections.amazon.aws.plugins.module_utils.core import AnsibleAWSModule
from ansible_collections.amazon.aws.plugins.module_utils.core import is_boto3_error_code
from ansible_collections.amazon.aws.plugins.module_utils.core import scrub_none_parameters
from ansible_collections.amazon.aws.plugins.module_utils.ec2 import AWSRetry
from ansible_collections.amazon.aws.plugins.module_utils.ec2 import ensure_ec2_tags
from ansible_collections.amazon.aws.plugins.module_utils.tagging import boto3_tag_specifications
from ansible_collections.amazon.aws.plugins.module_utils.tagging import boto3_tag_list_to_ansible_dict


def extract_key_data(key, key_type=None):
class Ec2KeyFailure(Exception):
def __init__(self, message=None, original_e=None):
super().__init__(message)
self.original_e = original_e
self.message = message


def _import_key_pair(ec2_client, name, key_material, tag_spec=None):
params = {
'KeyName': name,
'PublicKeyMaterial': to_bytes(key_material),
'TagSpecifications': tag_spec
}

params = scrub_none_parameters(params)

try:
key = ec2_client.import_key_pair(aws_retry=True, **params)
except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as err:
raise Ec2KeyFailure(err, "error importing key")
return key


def extract_key_data(key, key_type=None):
data = {
'name': key['KeyName'],
'fingerprint': key['KeyFingerprint'],
'id': key['KeyPairId'],
'tags': {},
'tags': boto3_tag_list_to_ansible_dict(key.get('Tags') or []),
# KeyMaterial is returned by create_key_pair, but not by describe_key_pairs
'private_key': key.get('KeyMaterial'),
# KeyType is only set by describe_key_pairs
'type': key.get('KeyType') or key_type
}
if 'Tags' in key:
data['tags'] = boto3_tag_list_to_ansible_dict(key['Tags'])
if 'KeyMaterial' in key:
data['private_key'] = key['KeyMaterial']
if 'KeyType' in key:
data['type'] = key['KeyType']
elif key_type:
data['type'] = key_type
return data


def get_key_fingerprint(module, ec2_client, key_material):

return scrub_none_parameters(data)


def get_key_fingerprint(check_mode, ec2_client, key_material):
'''
EC2's fingerprints are non-trivial to generate, so push this key
to a temporary name and make ec2 calculate the fingerprint for us.
http://blog.jbrowne.com/?p=23
https://forums.aws.amazon.com/thread.jspa?messageID=352828
'''

# find an unused name
name_in_use = True
while name_in_use:
random_name = "ansible-" + str(uuid.uuid4())
name_in_use = find_key_pair(module, ec2_client, random_name)

temp_key = _import_key_pair(module, ec2_client, random_name, key_material)
delete_key_pair(module, ec2_client, random_name, finish_task=False)
name_in_use = find_key_pair(ec2_client, random_name)
temp_key = _import_key_pair(ec2_client, random_name, key_material)
delete_key_pair(check_mode, ec2_client, random_name, finish_task=False)
return temp_key['KeyFingerprint']


def find_key_pair(module, ec2_client, name):

def find_key_pair(ec2_client, name):
try:
key = ec2_client.describe_key_pairs(aws_retry=True, KeyNames=[name])['KeyPairs'][0]
key = ec2_client.describe_key_pairs(aws_retry=True, KeyNames=[name])
except is_boto3_error_code('InvalidKeyPair.NotFound'):
return None
except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as err: # pylint: disable=duplicate-except
module.fail_json_aws(err, msg="error finding keypair")
raise Ec2KeyFailure(err, "error finding keypair")
except IndexError:
key = None
return key

return key['KeyPairs'][0]

def create_key_pair(module, ec2_client, name, key_material, force, key_type):

tags = module.params.get('tags')
purge_tags = module.params.get('purge_tags')
key = find_key_pair(module, ec2_client, name)
tag_spec = boto3_tag_specifications(tags, ['key-pair'])
changed = False
if key:
if key_material and force:
new_fingerprint = get_key_fingerprint(module, ec2_client, key_material)
if key['KeyFingerprint'] != new_fingerprint:
changed = True
if not module.check_mode:
delete_key_pair(module, ec2_client, name, finish_task=False)
key = _import_key_pair(module, ec2_client, name, key_material, tag_spec)
key_data = extract_key_data(key)
module.exit_json(changed=True, key=key_data, msg="key pair updated")
if key_type and key_type != key['KeyType']:
changed = True
if not module.check_mode:
delete_key_pair(module, ec2_client, name, finish_task=False)
key = _create_key_pair(module, ec2_client, name, tag_spec, key_type)
key_data = extract_key_data(key, key_type)
module.exit_json(changed=True, key=key_data, msg="key pair updated")
changed |= ensure_ec2_tags(ec2_client, module, key['KeyPairId'], tags=tags, purge_tags=purge_tags)
key = find_key_pair(module, ec2_client, name)
key_data = extract_key_data(key)
module.exit_json(changed=changed, key=key_data, msg="key pair already exists")
else:
# key doesn't exist, create it now
key_data = None
if not module.check_mode:
if key_material:
key = _import_key_pair(module, ec2_client, name, key_material, tag_spec)
else:
key = _create_key_pair(module, ec2_client, name, tag_spec, key_type)
key_data = extract_key_data(key, key_type)
module.exit_json(changed=True, key=key_data, msg="key pair created")
def _create_key_pair(ec2_client, name, tag_spec, key_type):
params = {
'KeyName': name,
'TagSpecifications': tag_spec,
'KeyType': key_type,
}

params = scrub_none_parameters(params)

def _create_key_pair(module, ec2_client, name, tag_spec, key_type):
params = dict(KeyName=name)
if tag_spec:
params['TagSpecifications'] = tag_spec
if key_type:
params['KeyType'] = key_type
try:
key = ec2_client.create_key_pair(aws_retry=True, **params)
except botocore.exceptions.ClientError as err:
module.fail_json_aws(err, msg="error creating key")
except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as err:
raise Ec2KeyFailure(err, "error creating key")
return key


def _import_key_pair(module, ec2_client, name, key_material, tag_spec=None):
params = dict(KeyName=name, PublicKeyMaterial=to_bytes(key_material))
if tag_spec:
params['TagSpecifications'] = tag_spec
def create_new_key_pair(ec2_client, name, key_material, key_type, tags, check_mode):
'''
key does not exist, we create new key
'''
if check_mode:
return {'changed': True, 'key': None, 'msg': 'key pair created'}

tag_spec = boto3_tag_specifications(tags, ['key-pair'])
if key_material:
key = _import_key_pair(ec2_client, name, key_material, tag_spec)
else:
key = _create_key_pair(ec2_client, name, tag_spec, key_type)
key_data = extract_key_data(key, key_type)

result = {'changed': True, 'key': key_data, 'msg': 'key pair created'}
return result


def update_key_pair_by_key_material(check_mode, ec2_client, name, key, key_material, tag_spec):
if check_mode:
return {'changed': True, 'key': None, 'msg': 'key pair updated'}
new_fingerprint = get_key_fingerprint(check_mode, ec2_client, key_material)
if key['KeyFingerprint'] != new_fingerprint:
delete_key_pair(check_mode, ec2_client, name, finish_task=False)
key = _import_key_pair(ec2_client, name, key_material, tag_spec)
key_data = extract_key_data(key)
return {'changed': True, 'key': key_data, 'msg': "key pair updated"}


def update_key_pair_by_key_type(check_mode, ec2_client, name, key_type, tag_spec):
if check_mode:
return {'changed': True, 'key': None, 'msg': 'key pair updated'}
else:
delete_key_pair(check_mode, ec2_client, name, finish_task=False)
key = _create_key_pair(ec2_client, name, tag_spec, key_type)
key_data = extract_key_data(key, key_type)
return {'changed': True, 'key': key_data, 'msg': "key pair updated"}


def _delete_key_pair(ec2_client, key_name):
try:
key = ec2_client.import_key_pair(aws_retry=True, **params)
except botocore.exceptions.ClientError as err:
module.fail_json_aws(err, msg="error importing key")
return key
ec2_client.delete_key_pair(aws_retry=True, KeyName=key_name)
except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as err:
raise Ec2KeyFailure(err, "error deleting key")


def delete_key_pair(module, ec2_client, name, finish_task=True):
def delete_key_pair(check_mode, ec2_client, name, finish_task=True):
key = find_key_pair(ec2_client, name)

key = find_key_pair(module, ec2_client, name)
if key:
if not module.check_mode:
try:
ec2_client.delete_key_pair(aws_retry=True, KeyName=name)
except botocore.exceptions.ClientError as err:
module.fail_json_aws(err, msg="error deleting key")
if key and check_mode:
result = {'changed': True, 'key': None, 'msg': 'key deleted'}
elif not key:
result = {'key': None, 'msg': 'key did not exist'}
else:
_delete_key_pair(ec2_client, name)
if not finish_task:
return
module.exit_json(changed=True, key=None, msg="key deleted")
module.exit_json(key=None, msg="key did not exist")
result = {'changed': True, 'key': None, 'msg': 'key deleted'}

return result


def handle_existing_key_pair_update(module, ec2_client, name, key):
key_material = module.params.get('key_material')
force = module.params.get('force')
key_type = module.params.get('key_type')
tags = module.params.get('tags')
purge_tags = module.params.get('purge_tags')
tag_spec = boto3_tag_specifications(tags, ['key-pair'])
check_mode = module.check_mode
if key_material and force:
result = update_key_pair_by_key_material(check_mode, ec2_client, name, key, key_material, tag_spec)
elif key_type and key_type != key['KeyType']:
result = update_key_pair_by_key_type(check_mode, ec2_client, name, key_type, tag_spec)
else:
changed = False
changed |= ensure_ec2_tags(ec2_client, module, key['KeyPairId'], tags=tags, purge_tags=purge_tags)
key = find_key_pair(ec2_client, name)
key_data = extract_key_data(key)
result = {'changed': changed, 'key': key_data, 'msg': 'key pair alreday exists'}
return result


def main():
Expand Down Expand Up @@ -330,16 +368,32 @@ def main():
name = module.params['name']
state = module.params.get('state')
key_material = module.params.get('key_material')
force = module.params.get('force')
key_type = module.params.get('key_type')
tags = module.params.get('tags')

result = {}

if key_type:
module.require_botocore_at_least('1.21.23', reason='to set the key_type for a keypair')
try:
if state == 'absent':
result = delete_key_pair(module.check_mode, ec2_client, name)

elif state == 'present':
# check if key already exists
key = find_key_pair(ec2_client, name)
if key:
result = handle_existing_key_pair_update(module, ec2_client, name, key)
else:
result = create_new_key_pair(ec2_client, name, key_material, key_type, tags, module.check_mode)

except Ec2KeyFailure as e:
if e.original_e:
module.fail_json_aws(e.original_e, e.message)
else:
module.fail_json(e.message)

if state == 'absent':
delete_key_pair(module, ec2_client, name)
elif state == 'present':
create_key_pair(module, ec2_client, name, key_material, force, key_type)
module.exit_json(**result)


if __name__ == '__main__':
Expand Down
Loading

0 comments on commit ca26544

Please sign in to comment.