From b90f702b6397acfecc8eb4f870a8dd530df9313c Mon Sep 17 00:00:00 2001 From: blee Date: Tue, 10 Aug 2021 15:09:05 -0400 Subject: [PATCH 1/7] add tests for abilities api --- tests/api/v2/handlers/test_abilities_api.py | 76 +++++++++++++++++++ tests/conftest.py | 83 +++++++++++++++++++++ 2 files changed, 159 insertions(+) create mode 100644 tests/api/v2/handlers/test_abilities_api.py diff --git a/tests/api/v2/handlers/test_abilities_api.py b/tests/api/v2/handlers/test_abilities_api.py new file mode 100644 index 000000000..3784e5ebc --- /dev/null +++ b/tests/api/v2/handlers/test_abilities_api.py @@ -0,0 +1,76 @@ +import pytest + +from http import HTTPStatus + +from app.objects.c_ability import Ability +from app.objects.secondclass.c_executor import Executor, ExecutorSchema +from app.utility.base_service import BaseService + + +@pytest.fixture +def setup_abilities_api_test(loop, api_client, executor): + test_executor_linux = executor(name='sh', platform='linux') + test_ability = Ability(ability_id='123', name='Test Ability', executors=[test_executor_linux]) + loop.run_until_complete(BaseService.get_service('data_svc').store(test_ability)) + + +@pytest.mark.usefixtures( + "setup_abilities_api_test" +) +class TestAbilitiesApi: + async def test_get_abilities(self, api_client, api_cookies): + resp = await api_client.get('/api/v2/abilities', cookies=api_cookies) + abilities_list = await resp.json() + assert len(abilities_list) == 1 + ability_dict = abilities_list[0] + assert ability_dict['ability_id'] == '123' + assert ability_dict['name'] == 'Test Ability' + + async def test_get_ability_by_id(self, api_client, api_cookies): + resp = await api_client.get('/api/v2/abilities/123', cookies=api_cookies) + ability_dict = await resp.json() + assert ability_dict['ability_id'] == '123' + assert ability_dict['name'] == 'Test Ability' + + async def test_unauthorized_get_ability_by_id(self, api_client): + resp = await api_client.get('/api/v2/abilities/123') + assert resp.status == HTTPStatus.UNAUTHORIZED + + async def test_delete_ability_by_id(self, api_client, api_cookies): + ability_exists = await BaseService.get_service('data_svc').locate('abilities', {'ability_id': '123'}) + assert ability_exists + resp = await api_client.delete('/api/v2/abilities/123', cookies=api_cookies) + assert resp.status == HTTPStatus.NO_CONTENT + ability_exists = await BaseService.get_service('data_svc').locate('abilities', {'ability_id': '123'}) + assert not ability_exists + + async def test_create_ability(self, api_client, api_cookies, mocker, async_return): + test_executor_linux = Executor(name='sh', platform='linux', command='whoami') + payload = dict(name='new test ability', ability_id='456', tactic='collection', technique_name='collection', + technique_id='1', executors=[ExecutorSchema().dump(test_executor_linux)]) + resp = await api_client.post('/api/v2/abilities', cookies=api_cookies, json=payload) + assert resp.status == HTTPStatus.OK + ability_data = await resp.json() + assert ability_data.get('name') == "new test ability" + ability_exists = await BaseService.get_service('data_svc').locate('abilities', {'ability_id': '456'}) + assert ability_exists + + async def test_update_ability(self, api_client, api_cookies): + payload = dict(name='an updated test ability', tactic='defense-evasion') + resp = await api_client.patch('/api/v2/abilities/123', cookies=api_cookies, json=payload) + assert resp.status == HTTPStatus.OK + ability = (await BaseService.get_service('data_svc').locate('abilities', {'ability_id': '123'}))[0] + assert ability.name == payload['name'] + assert ability.tactic == payload['tactic'] + + async def test_replace_ability(self, api_client, api_cookies): + test_executor_linux = Executor(name='sh', platform='linux', command='whoami') + payload = dict(name='replaced test ability', tactic='collection', technique_name='discovery', technique_id='2', + executors=[ExecutorSchema().dump(test_executor_linux)]) + resp = await api_client.put('/api/v2/abilities/123', cookies=api_cookies, json=payload) + assert resp.status == HTTPStatus.OK + ability = await resp.json() + assert ability['name'] == payload['name'] + assert ability['technique_name'] == payload['technique_name'] + assert ability['technique_id'] == payload['technique_id'] + assert ability['tactic'] == payload['tactic'] diff --git a/tests/conftest.py b/tests/conftest.py index 9da375768..5b90eff31 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,7 @@ +import asyncio import os.path +import aiohttp_apispec import pytest import random import string @@ -7,7 +9,17 @@ import yaml from unittest import mock +from aiohttp_apispec import validation_middleware +from aiohttp import web +from pathlib import Path + +from app import version +from app.api.rest_api import RestApi +from app.api.v2.handlers.ability_api import AbilityApi +from app.api.v2.responses import apispec_request_validation_middleware, json_request_validation_middleware +from app.api.v2.security import authentication_required_middleware_factory from app.objects.c_obfuscator import Obfuscator +from app.service.auth_svc import AuthService from app.utility.base_world import BaseWorld from app.service.app_svc import AppService from app.service.data_svc import DataService @@ -243,3 +255,74 @@ def _agent_profile(paw=None, group='red', platform='linux', executors=None, priv ) return _agent_profile + + +@pytest.fixture +def api_client(loop, aiohttp_client): + def make_app(svcs): + app = web.Application( + middlewares=[ + authentication_required_middleware_factory(svcs['auth_svc']), + json_request_validation_middleware + ] + ) + AbilityApi(svcs).add_routes(app) + return app + + async def initialize(): + with open(Path(__file__).parents[1] / 'conf' / 'default.yml', 'r') as fle: + BaseWorld.apply_config('main', yaml.safe_load(fle)) + with open(Path(__file__).parents[1] / 'conf' / 'payloads.yml', 'r') as fle: + BaseWorld.apply_config('payloads', yaml.safe_load(fle)) + + app_svc = AppService(web.Application(client_max_size=5120 ** 2)) + _ = DataService() + _ = RestService() + _ = PlanningService() + _ = LearningService() + auth_svc = AuthService() + _ = ContactService() + _ = FileSvc() + _ = EventService() + services = app_svc.get_services() + os.chdir(str(Path(__file__).parents[1])) + + await app_svc.register_contacts() + await app_svc.load_plugins(['sandcat', 'ssl']) + _ = await RestApi(services).enable() + await auth_svc.apply(app_svc.application, auth_svc.get_config('users')) + await auth_svc.set_login_handlers(services) + + app_svc.register_subapp('/api/v2', make_app(svcs=services)) + aiohttp_apispec.setup_aiohttp_apispec( + app=app_svc.application, + title='CALDERA', + version=version.get_version(), + swagger_path='/api/docs', + url='/api/docs/swagger.json', + static_path='/static/swagger' + ) + app_svc.application.middlewares.append(apispec_request_validation_middleware) + app_svc.application.middlewares.append(validation_middleware) + + return app_svc.application + + app = loop.run_until_complete(initialize()) + return loop.run_until_complete(aiohttp_client(app)) + + +@pytest.fixture +def api_cookies(loop, api_client): + async def get_cookie(): + r = await api_client.post('/enter', allow_redirects=False, data=dict(username='admin', password='admin')) + return r.cookies + return loop.run_until_complete(get_cookie()) + + +@pytest.fixture +def async_return(): + def _async_return(return_param): + f = asyncio.Future() + f.set_result(return_param) + return f + return _async_return From 4e5cbb79e5ddcbe193ee0d0d65047de1f9e9f12d Mon Sep 17 00:00:00 2001 From: blee Date: Thu, 12 Aug 2021 15:57:21 -0400 Subject: [PATCH 2/7] test additional endpoints w/o authorization --- tests/api/v2/handlers/test_abilities_api.py | 31 +++++++++++++++------ 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/tests/api/v2/handlers/test_abilities_api.py b/tests/api/v2/handlers/test_abilities_api.py index 3784e5ebc..72d8293e4 100644 --- a/tests/api/v2/handlers/test_abilities_api.py +++ b/tests/api/v2/handlers/test_abilities_api.py @@ -26,6 +26,10 @@ async def test_get_abilities(self, api_client, api_cookies): assert ability_dict['ability_id'] == '123' assert ability_dict['name'] == 'Test Ability' + async def test_unauthorized_get_abilities(self, api_client): + resp = await api_client.get('/api/v2/abilities') + assert resp.status == HTTPStatus.UNAUTHORIZED + async def test_get_ability_by_id(self, api_client, api_cookies): resp = await api_client.get('/api/v2/abilities/123', cookies=api_cookies) ability_dict = await resp.json() @@ -36,14 +40,6 @@ async def test_unauthorized_get_ability_by_id(self, api_client): resp = await api_client.get('/api/v2/abilities/123') assert resp.status == HTTPStatus.UNAUTHORIZED - async def test_delete_ability_by_id(self, api_client, api_cookies): - ability_exists = await BaseService.get_service('data_svc').locate('abilities', {'ability_id': '123'}) - assert ability_exists - resp = await api_client.delete('/api/v2/abilities/123', cookies=api_cookies) - assert resp.status == HTTPStatus.NO_CONTENT - ability_exists = await BaseService.get_service('data_svc').locate('abilities', {'ability_id': '123'}) - assert not ability_exists - async def test_create_ability(self, api_client, api_cookies, mocker, async_return): test_executor_linux = Executor(name='sh', platform='linux', command='whoami') payload = dict(name='new test ability', ability_id='456', tactic='collection', technique_name='collection', @@ -55,6 +51,13 @@ async def test_create_ability(self, api_client, api_cookies, mocker, async_retur ability_exists = await BaseService.get_service('data_svc').locate('abilities', {'ability_id': '456'}) assert ability_exists + async def test_unauthorized_create_ability(self, api_client): + test_executor_linux = Executor(name='sh', platform='linux', command='whoami') + payload = dict(name='new test ability', ability_id='456', tactic='collection', technique_name='collection', + technique_id='1', executors=[ExecutorSchema().dump(test_executor_linux)]) + resp = await api_client.post('/api/v2/abilities', json=payload) + assert resp.status == HTTPStatus.UNAUTHORIZED + async def test_update_ability(self, api_client, api_cookies): payload = dict(name='an updated test ability', tactic='defense-evasion') resp = await api_client.patch('/api/v2/abilities/123', cookies=api_cookies, json=payload) @@ -63,6 +66,11 @@ async def test_update_ability(self, api_client, api_cookies): assert ability.name == payload['name'] assert ability.tactic == payload['tactic'] + async def test_unauthorized_update_ability(self, api_client): + payload = dict(name='an updated test ability', tactic='defense-evasion') + resp = await api_client.patch('/api/v2/abilities/123', json=payload) + assert resp.status == HTTPStatus.UNAUTHORIZED + async def test_replace_ability(self, api_client, api_cookies): test_executor_linux = Executor(name='sh', platform='linux', command='whoami') payload = dict(name='replaced test ability', tactic='collection', technique_name='discovery', technique_id='2', @@ -74,3 +82,10 @@ async def test_replace_ability(self, api_client, api_cookies): assert ability['technique_name'] == payload['technique_name'] assert ability['technique_id'] == payload['technique_id'] assert ability['tactic'] == payload['tactic'] + + async def test_unauthorized_repalce_ability(self, api_client): + test_executor_linux = Executor(name='sh', platform='linux', command='whoami') + payload = dict(name='replaced test ability', tactic='collection', technique_name='discovery', technique_id='2', + executors=[ExecutorSchema().dump(test_executor_linux)]) + resp = await api_client.put('/api/v2/abilities/123', json=payload) + assert resp.status == HTTPStatus.UNAUTHORIZED From 45058ad41b79421635bdc802443adbf1ca8f4b17 Mon Sep 17 00:00:00 2001 From: blee Date: Tue, 17 Aug 2021 16:12:48 -0400 Subject: [PATCH 3/7] add tests for nonexistent abilities --- tests/api/v2/handlers/test_abilities_api.py | 88 ++++++++++++++++----- 1 file changed, 68 insertions(+), 20 deletions(-) diff --git a/tests/api/v2/handlers/test_abilities_api.py b/tests/api/v2/handlers/test_abilities_api.py index 72d8293e4..000b796d1 100644 --- a/tests/api/v2/handlers/test_abilities_api.py +++ b/tests/api/v2/handlers/test_abilities_api.py @@ -8,38 +8,45 @@ @pytest.fixture -def setup_abilities_api_test(loop, api_client, executor): - test_executor_linux = executor(name='sh', platform='linux') - test_ability = Ability(ability_id='123', name='Test Ability', executors=[test_executor_linux]) - loop.run_until_complete(BaseService.get_service('data_svc').store(test_ability)) +def test_ability(loop, api_client, executor): + executor_linux = executor(name='sh', platform='linux') + ability = Ability(ability_id='123', name='Test Ability', executors=[executor_linux], + technique_name='collection', technique_id='1') + loop.run_until_complete(BaseService.get_service('data_svc').store(ability)) + return ability -@pytest.mark.usefixtures( - "setup_abilities_api_test" -) class TestAbilitiesApi: - async def test_get_abilities(self, api_client, api_cookies): + async def test_get_abilities(self, api_client, api_cookies, test_ability): resp = await api_client.get('/api/v2/abilities', cookies=api_cookies) abilities_list = await resp.json() assert len(abilities_list) == 1 ability_dict = abilities_list[0] - assert ability_dict['ability_id'] == '123' - assert ability_dict['name'] == 'Test Ability' + assert ability_dict['ability_id'] == test_ability.ability_id + assert ability_dict['name'] == test_ability.name + assert ability_dict['technique_name'] == test_ability.technique_name + assert len(ability_dict['executors']) == 1 - async def test_unauthorized_get_abilities(self, api_client): + async def test_unauthorized_get_abilities(self, api_client, test_ability): resp = await api_client.get('/api/v2/abilities') assert resp.status == HTTPStatus.UNAUTHORIZED - async def test_get_ability_by_id(self, api_client, api_cookies): + async def test_get_ability_by_id(self, api_client, api_cookies, test_ability): resp = await api_client.get('/api/v2/abilities/123', cookies=api_cookies) ability_dict = await resp.json() - assert ability_dict['ability_id'] == '123' - assert ability_dict['name'] == 'Test Ability' + assert ability_dict['ability_id'] == test_ability.ability_id + assert ability_dict['name'] == test_ability.name + assert ability_dict['technique_name'] == test_ability.technique_name + assert len(ability_dict['executors']) == 1 - async def test_unauthorized_get_ability_by_id(self, api_client): + async def test_unauthorized_get_ability_by_id(self, api_client, test_ability): resp = await api_client.get('/api/v2/abilities/123') assert resp.status == HTTPStatus.UNAUTHORIZED + async def test_get_nonexistent_ability_by_id(self, api_client, api_cookies): + resp = await api_client.get('/api/v2/abilities/123', cookies=api_cookies) + assert resp.status == HTTPStatus.NOT_FOUND + async def test_create_ability(self, api_client, api_cookies, mocker, async_return): test_executor_linux = Executor(name='sh', platform='linux', command='whoami') payload = dict(name='new test ability', ability_id='456', tactic='collection', technique_name='collection', @@ -47,7 +54,9 @@ async def test_create_ability(self, api_client, api_cookies, mocker, async_retur resp = await api_client.post('/api/v2/abilities', cookies=api_cookies, json=payload) assert resp.status == HTTPStatus.OK ability_data = await resp.json() - assert ability_data.get('name') == "new test ability" + assert ability_data.get('name') == payload['name'] + assert ability_data.get('ability_id') == payload['ability_id'] + assert ability_data.get('tactic') == payload['tactic'] ability_exists = await BaseService.get_service('data_svc').locate('abilities', {'ability_id': '456'}) assert ability_exists @@ -58,20 +67,40 @@ async def test_unauthorized_create_ability(self, api_client): resp = await api_client.post('/api/v2/abilities', json=payload) assert resp.status == HTTPStatus.UNAUTHORIZED - async def test_update_ability(self, api_client, api_cookies): + async def test_create_duplicate_ability(self, api_client, api_cookies, mocker, async_return, test_ability): + test_executor_linux = Executor(name='sh', platform='linux', command='whoami') + payload = dict(name='new test ability', ability_id='123', tactic='collection', technique_name='collection', + technique_id='1', executors=[ExecutorSchema().dump(test_executor_linux)]) + resp = await api_client.post('/api/v2/abilities', cookies=api_cookies, json=payload) + assert resp.status == HTTPStatus.BAD_REQUEST + + async def test_create_invalid_ability(self, api_client, api_cookies, mocker, async_return, test_ability): + payload = dict(name='new test ability', ability_id='123', technique_name='collection', + technique_id='1', executors=[]) + resp = await api_client.post('/api/v2/abilities', cookies=api_cookies, json=payload) + assert resp.status == HTTPStatus.BAD_REQUEST + + async def test_update_ability(self, api_client, api_cookies, test_ability): payload = dict(name='an updated test ability', tactic='defense-evasion') resp = await api_client.patch('/api/v2/abilities/123', cookies=api_cookies, json=payload) assert resp.status == HTTPStatus.OK ability = (await BaseService.get_service('data_svc').locate('abilities', {'ability_id': '123'}))[0] assert ability.name == payload['name'] assert ability.tactic == payload['tactic'] + assert ability.ability_id == test_ability.ability_id + assert ability.description == test_ability.description - async def test_unauthorized_update_ability(self, api_client): + async def test_unauthorized_update_ability(self, api_client, test_ability): payload = dict(name='an updated test ability', tactic='defense-evasion') resp = await api_client.patch('/api/v2/abilities/123', json=payload) assert resp.status == HTTPStatus.UNAUTHORIZED - async def test_replace_ability(self, api_client, api_cookies): + async def test_update_nonexistent_ability(self, api_client, api_cookies): + payload = dict(name='an updated test ability', tactic='defense-evasion') + resp = await api_client.patch('/api/v2/abilities/123', cookies=api_cookies, json=payload) + assert resp.status == HTTPStatus.NOT_FOUND + + async def test_replace_ability(self, api_client, api_cookies, test_ability): test_executor_linux = Executor(name='sh', platform='linux', command='whoami') payload = dict(name='replaced test ability', tactic='collection', technique_name='discovery', technique_id='2', executors=[ExecutorSchema().dump(test_executor_linux)]) @@ -82,10 +111,29 @@ async def test_replace_ability(self, api_client, api_cookies): assert ability['technique_name'] == payload['technique_name'] assert ability['technique_id'] == payload['technique_id'] assert ability['tactic'] == payload['tactic'] + assert ability['ability_id'] == test_ability.ability_id - async def test_unauthorized_repalce_ability(self, api_client): + async def test_unauthorized_replace_ability(self, api_client, test_ability): test_executor_linux = Executor(name='sh', platform='linux', command='whoami') payload = dict(name='replaced test ability', tactic='collection', technique_name='discovery', technique_id='2', executors=[ExecutorSchema().dump(test_executor_linux)]) resp = await api_client.put('/api/v2/abilities/123', json=payload) assert resp.status == HTTPStatus.UNAUTHORIZED + + async def test_replace_nonexistent_ability(self, api_client, api_cookies): + test_executor_linux = Executor(name='sh', platform='linux', command='whoami') + payload = dict(name='replaced test ability', tactic='collection', technique_name='discovery', technique_id='2', + executors=[ExecutorSchema().dump(test_executor_linux)]) + resp = await api_client.put('/api/v2/abilities/123', cookies=api_cookies, json=payload) + assert resp.status == HTTPStatus.OK + ability = await resp.json() + assert ability['name'] == payload['name'] + assert ability['technique_name'] == payload['technique_name'] + assert ability['technique_id'] == payload['technique_id'] + assert ability['tactic'] == payload['tactic'] + + async def test_invalid_replace_ability(self, api_client, api_cookies, test_ability): + payload = dict(name='replaced test ability', tactic='collection', technique_name='discovery', technique_id='2', + executors=[]) + resp = await api_client.put('/api/v2/abilities/123', cookies=api_cookies, json=payload) + assert resp.status == HTTPStatus.BAD_REQUEST From b30e1215eea021ab275bee80f495be24dcbe745a Mon Sep 17 00:00:00 2001 From: blee Date: Tue, 17 Aug 2021 16:42:07 -0400 Subject: [PATCH 4/7] update assertions --- tests/api/v2/handlers/test_abilities_api.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/api/v2/handlers/test_abilities_api.py b/tests/api/v2/handlers/test_abilities_api.py index 000b796d1..205231710 100644 --- a/tests/api/v2/handlers/test_abilities_api.py +++ b/tests/api/v2/handlers/test_abilities_api.py @@ -44,7 +44,7 @@ async def test_unauthorized_get_ability_by_id(self, api_client, test_ability): assert resp.status == HTTPStatus.UNAUTHORIZED async def test_get_nonexistent_ability_by_id(self, api_client, api_cookies): - resp = await api_client.get('/api/v2/abilities/123', cookies=api_cookies) + resp = await api_client.get('/api/v2/abilities/999', cookies=api_cookies) assert resp.status == HTTPStatus.NOT_FOUND async def test_create_ability(self, api_client, api_cookies, mocker, async_return): @@ -89,6 +89,8 @@ async def test_update_ability(self, api_client, api_cookies, test_ability): assert ability.tactic == payload['tactic'] assert ability.ability_id == test_ability.ability_id assert ability.description == test_ability.description + assert ability.technique_id == test_ability.technique_id + assert ability.technique_name == test_ability.technique_name async def test_unauthorized_update_ability(self, api_client, test_ability): payload = dict(name='an updated test ability', tactic='defense-evasion') @@ -97,7 +99,7 @@ async def test_unauthorized_update_ability(self, api_client, test_ability): async def test_update_nonexistent_ability(self, api_client, api_cookies): payload = dict(name='an updated test ability', tactic='defense-evasion') - resp = await api_client.patch('/api/v2/abilities/123', cookies=api_cookies, json=payload) + resp = await api_client.patch('/api/v2/abilities/999', cookies=api_cookies, json=payload) assert resp.status == HTTPStatus.NOT_FOUND async def test_replace_ability(self, api_client, api_cookies, test_ability): @@ -131,6 +133,7 @@ async def test_replace_nonexistent_ability(self, api_client, api_cookies): assert ability['technique_name'] == payload['technique_name'] assert ability['technique_id'] == payload['technique_id'] assert ability['tactic'] == payload['tactic'] + assert ability['ability_id'] == '123' async def test_invalid_replace_ability(self, api_client, api_cookies, test_ability): payload = dict(name='replaced test ability', tactic='collection', technique_name='discovery', technique_id='2', From 86c2cbfc3ea064bb5a5963d78b2b61e5c9e7e9b1 Mon Sep 17 00:00:00 2001 From: blee Date: Wed, 18 Aug 2021 15:14:38 -0400 Subject: [PATCH 5/7] rename fixtures --- tests/api/v2/handlers/test_abilities_api.py | 66 ++++++++++----------- tests/conftest.py | 6 +- 2 files changed, 36 insertions(+), 36 deletions(-) diff --git a/tests/api/v2/handlers/test_abilities_api.py b/tests/api/v2/handlers/test_abilities_api.py index 205231710..19cb70f6d 100644 --- a/tests/api/v2/handlers/test_abilities_api.py +++ b/tests/api/v2/handlers/test_abilities_api.py @@ -8,7 +8,7 @@ @pytest.fixture -def test_ability(loop, api_client, executor): +def test_ability(loop, api_v2_client, executor): executor_linux = executor(name='sh', platform='linux') ability = Ability(ability_id='123', name='Test Ability', executors=[executor_linux], technique_name='collection', technique_id='1') @@ -17,8 +17,8 @@ def test_ability(loop, api_client, executor): class TestAbilitiesApi: - async def test_get_abilities(self, api_client, api_cookies, test_ability): - resp = await api_client.get('/api/v2/abilities', cookies=api_cookies) + async def test_get_abilities(self, api_v2_client, api_cookies, test_ability): + resp = await api_v2_client.get('/api/v2/abilities', cookies=api_cookies) abilities_list = await resp.json() assert len(abilities_list) == 1 ability_dict = abilities_list[0] @@ -27,31 +27,31 @@ async def test_get_abilities(self, api_client, api_cookies, test_ability): assert ability_dict['technique_name'] == test_ability.technique_name assert len(ability_dict['executors']) == 1 - async def test_unauthorized_get_abilities(self, api_client, test_ability): - resp = await api_client.get('/api/v2/abilities') + async def test_unauthorized_get_abilities(self, api_v2_client, test_ability): + resp = await api_v2_client.get('/api/v2/abilities') assert resp.status == HTTPStatus.UNAUTHORIZED - async def test_get_ability_by_id(self, api_client, api_cookies, test_ability): - resp = await api_client.get('/api/v2/abilities/123', cookies=api_cookies) + async def test_get_ability_by_id(self, api_v2_client, api_cookies, test_ability): + resp = await api_v2_client.get('/api/v2/abilities/123', cookies=api_cookies) ability_dict = await resp.json() assert ability_dict['ability_id'] == test_ability.ability_id assert ability_dict['name'] == test_ability.name assert ability_dict['technique_name'] == test_ability.technique_name assert len(ability_dict['executors']) == 1 - async def test_unauthorized_get_ability_by_id(self, api_client, test_ability): - resp = await api_client.get('/api/v2/abilities/123') + async def test_unauthorized_get_ability_by_id(self, api_v2_client, test_ability): + resp = await api_v2_client.get('/api/v2/abilities/123') assert resp.status == HTTPStatus.UNAUTHORIZED - async def test_get_nonexistent_ability_by_id(self, api_client, api_cookies): - resp = await api_client.get('/api/v2/abilities/999', cookies=api_cookies) + async def test_get_nonexistent_ability_by_id(self, api_v2_client, api_cookies): + resp = await api_v2_client.get('/api/v2/abilities/999', cookies=api_cookies) assert resp.status == HTTPStatus.NOT_FOUND - async def test_create_ability(self, api_client, api_cookies, mocker, async_return): + async def test_create_ability(self, api_v2_client, api_cookies, mocker, async_return): test_executor_linux = Executor(name='sh', platform='linux', command='whoami') payload = dict(name='new test ability', ability_id='456', tactic='collection', technique_name='collection', technique_id='1', executors=[ExecutorSchema().dump(test_executor_linux)]) - resp = await api_client.post('/api/v2/abilities', cookies=api_cookies, json=payload) + resp = await api_v2_client.post('/api/v2/abilities', cookies=api_cookies, json=payload) assert resp.status == HTTPStatus.OK ability_data = await resp.json() assert ability_data.get('name') == payload['name'] @@ -60,29 +60,29 @@ async def test_create_ability(self, api_client, api_cookies, mocker, async_retur ability_exists = await BaseService.get_service('data_svc').locate('abilities', {'ability_id': '456'}) assert ability_exists - async def test_unauthorized_create_ability(self, api_client): + async def test_unauthorized_create_ability(self, api_v2_client): test_executor_linux = Executor(name='sh', platform='linux', command='whoami') payload = dict(name='new test ability', ability_id='456', tactic='collection', technique_name='collection', technique_id='1', executors=[ExecutorSchema().dump(test_executor_linux)]) - resp = await api_client.post('/api/v2/abilities', json=payload) + resp = await api_v2_client.post('/api/v2/abilities', json=payload) assert resp.status == HTTPStatus.UNAUTHORIZED - async def test_create_duplicate_ability(self, api_client, api_cookies, mocker, async_return, test_ability): + async def test_create_duplicate_ability(self, api_v2_client, api_cookies, mocker, async_return, test_ability): test_executor_linux = Executor(name='sh', platform='linux', command='whoami') payload = dict(name='new test ability', ability_id='123', tactic='collection', technique_name='collection', technique_id='1', executors=[ExecutorSchema().dump(test_executor_linux)]) - resp = await api_client.post('/api/v2/abilities', cookies=api_cookies, json=payload) + resp = await api_v2_client.post('/api/v2/abilities', cookies=api_cookies, json=payload) assert resp.status == HTTPStatus.BAD_REQUEST - async def test_create_invalid_ability(self, api_client, api_cookies, mocker, async_return, test_ability): + async def test_create_invalid_ability(self, api_v2_client, api_cookies, mocker, async_return, test_ability): payload = dict(name='new test ability', ability_id='123', technique_name='collection', technique_id='1', executors=[]) - resp = await api_client.post('/api/v2/abilities', cookies=api_cookies, json=payload) + resp = await api_v2_client.post('/api/v2/abilities', cookies=api_cookies, json=payload) assert resp.status == HTTPStatus.BAD_REQUEST - async def test_update_ability(self, api_client, api_cookies, test_ability): + async def test_update_ability(self, api_v2_client, api_cookies, test_ability): payload = dict(name='an updated test ability', tactic='defense-evasion') - resp = await api_client.patch('/api/v2/abilities/123', cookies=api_cookies, json=payload) + resp = await api_v2_client.patch('/api/v2/abilities/123', cookies=api_cookies, json=payload) assert resp.status == HTTPStatus.OK ability = (await BaseService.get_service('data_svc').locate('abilities', {'ability_id': '123'}))[0] assert ability.name == payload['name'] @@ -92,21 +92,21 @@ async def test_update_ability(self, api_client, api_cookies, test_ability): assert ability.technique_id == test_ability.technique_id assert ability.technique_name == test_ability.technique_name - async def test_unauthorized_update_ability(self, api_client, test_ability): + async def test_unauthorized_update_ability(self, api_v2_client, test_ability): payload = dict(name='an updated test ability', tactic='defense-evasion') - resp = await api_client.patch('/api/v2/abilities/123', json=payload) + resp = await api_v2_client.patch('/api/v2/abilities/123', json=payload) assert resp.status == HTTPStatus.UNAUTHORIZED - async def test_update_nonexistent_ability(self, api_client, api_cookies): + async def test_update_nonexistent_ability(self, api_v2_client, api_cookies): payload = dict(name='an updated test ability', tactic='defense-evasion') - resp = await api_client.patch('/api/v2/abilities/999', cookies=api_cookies, json=payload) + resp = await api_v2_client.patch('/api/v2/abilities/999', cookies=api_cookies, json=payload) assert resp.status == HTTPStatus.NOT_FOUND - async def test_replace_ability(self, api_client, api_cookies, test_ability): + async def test_replace_ability(self, api_v2_client, api_cookies, test_ability): test_executor_linux = Executor(name='sh', platform='linux', command='whoami') payload = dict(name='replaced test ability', tactic='collection', technique_name='discovery', technique_id='2', executors=[ExecutorSchema().dump(test_executor_linux)]) - resp = await api_client.put('/api/v2/abilities/123', cookies=api_cookies, json=payload) + resp = await api_v2_client.put('/api/v2/abilities/123', cookies=api_cookies, json=payload) assert resp.status == HTTPStatus.OK ability = await resp.json() assert ability['name'] == payload['name'] @@ -115,18 +115,18 @@ async def test_replace_ability(self, api_client, api_cookies, test_ability): assert ability['tactic'] == payload['tactic'] assert ability['ability_id'] == test_ability.ability_id - async def test_unauthorized_replace_ability(self, api_client, test_ability): + async def test_unauthorized_replace_ability(self, api_v2_client, test_ability): test_executor_linux = Executor(name='sh', platform='linux', command='whoami') payload = dict(name='replaced test ability', tactic='collection', technique_name='discovery', technique_id='2', executors=[ExecutorSchema().dump(test_executor_linux)]) - resp = await api_client.put('/api/v2/abilities/123', json=payload) + resp = await api_v2_client.put('/api/v2/abilities/123', json=payload) assert resp.status == HTTPStatus.UNAUTHORIZED - async def test_replace_nonexistent_ability(self, api_client, api_cookies): + async def test_replace_nonexistent_ability(self, api_v2_client, api_cookies): test_executor_linux = Executor(name='sh', platform='linux', command='whoami') payload = dict(name='replaced test ability', tactic='collection', technique_name='discovery', technique_id='2', executors=[ExecutorSchema().dump(test_executor_linux)]) - resp = await api_client.put('/api/v2/abilities/123', cookies=api_cookies, json=payload) + resp = await api_v2_client.put('/api/v2/abilities/123', cookies=api_cookies, json=payload) assert resp.status == HTTPStatus.OK ability = await resp.json() assert ability['name'] == payload['name'] @@ -135,8 +135,8 @@ async def test_replace_nonexistent_ability(self, api_client, api_cookies): assert ability['tactic'] == payload['tactic'] assert ability['ability_id'] == '123' - async def test_invalid_replace_ability(self, api_client, api_cookies, test_ability): + async def test_invalid_replace_ability(self, api_v2_client, api_cookies, test_ability): payload = dict(name='replaced test ability', tactic='collection', technique_name='discovery', technique_id='2', executors=[]) - resp = await api_client.put('/api/v2/abilities/123', cookies=api_cookies, json=payload) + resp = await api_v2_client.put('/api/v2/abilities/123', cookies=api_cookies, json=payload) assert resp.status == HTTPStatus.BAD_REQUEST diff --git a/tests/conftest.py b/tests/conftest.py index 5b90eff31..c7802edfe 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -258,7 +258,7 @@ def _agent_profile(paw=None, group='red', platform='linux', executors=None, priv @pytest.fixture -def api_client(loop, aiohttp_client): +def api_v2_client(loop, aiohttp_client): def make_app(svcs): app = web.Application( middlewares=[ @@ -312,9 +312,9 @@ async def initialize(): @pytest.fixture -def api_cookies(loop, api_client): +def api_cookies(loop, api_v2_client): async def get_cookie(): - r = await api_client.post('/enter', allow_redirects=False, data=dict(username='admin', password='admin')) + r = await api_v2_client.post('/enter', allow_redirects=False, data=dict(username='admin', password='admin')) return r.cookies return loop.run_until_complete(get_cookie()) From 6140d304bc8f84d6a7158b807370cf60d7e4c065 Mon Sep 17 00:00:00 2001 From: blee Date: Fri, 20 Aug 2021 09:44:30 -0400 Subject: [PATCH 6/7] refactor expected payloads --- tests/api/v2/handlers/test_abilities_api.py | 129 ++++++++++---------- 1 file changed, 62 insertions(+), 67 deletions(-) diff --git a/tests/api/v2/handlers/test_abilities_api.py b/tests/api/v2/handlers/test_abilities_api.py index 19cb70f6d..7da9cb1e0 100644 --- a/tests/api/v2/handlers/test_abilities_api.py +++ b/tests/api/v2/handlers/test_abilities_api.py @@ -7,11 +7,47 @@ from app.utility.base_service import BaseService +@pytest.fixture +def new_ability_payload(): + test_executor_linux = Executor(name='sh', platform='linux', command='whoami') + return {'name': 'new test ability', + 'ability_id': '456', + 'tactic': 'collection', + 'technique_name': 'collection', + 'technique_id': '1', + 'executors': [ExecutorSchema().dump(test_executor_linux)], + 'access': {}, + 'additional_info': {}, + 'buckets': ['collection'], + 'description': '', + 'privilege': '', + 'repeatable': False, + 'requirements': [], + 'singleton': False + } + + +@pytest.fixture +def updated_ability_payload(test_ability): + ability_data = test_ability.schema.dump(test_ability) + ability_data.update(dict(name='an updated test ability', tactic='defense-evasion')) + return ability_data + + +@pytest.fixture +def replaced_ability_payload(test_ability): + ability_data = test_ability.schema.dump(test_ability) + test_executor_linux = Executor(name='sh', platform='linux', command='whoami') + ability_data.update(dict(name='replaced test ability', tactic='collection', technique_name='discovery', + technique_id='2', executors=[ExecutorSchema().dump(test_executor_linux)])) + return ability_data + + @pytest.fixture def test_ability(loop, api_v2_client, executor): executor_linux = executor(name='sh', platform='linux') ability = Ability(ability_id='123', name='Test Ability', executors=[executor_linux], - technique_name='collection', technique_id='1') + technique_name='collection', technique_id='1', description='', privilege='', tactic='discovery') loop.run_until_complete(BaseService.get_service('data_svc').store(ability)) return ability @@ -22,10 +58,7 @@ async def test_get_abilities(self, api_v2_client, api_cookies, test_ability): abilities_list = await resp.json() assert len(abilities_list) == 1 ability_dict = abilities_list[0] - assert ability_dict['ability_id'] == test_ability.ability_id - assert ability_dict['name'] == test_ability.name - assert ability_dict['technique_name'] == test_ability.technique_name - assert len(ability_dict['executors']) == 1 + assert ability_dict == test_ability.schema.dump(test_ability) async def test_unauthorized_get_abilities(self, api_v2_client, test_ability): resp = await api_v2_client.get('/api/v2/abilities') @@ -34,10 +67,7 @@ async def test_unauthorized_get_abilities(self, api_v2_client, test_ability): async def test_get_ability_by_id(self, api_v2_client, api_cookies, test_ability): resp = await api_v2_client.get('/api/v2/abilities/123', cookies=api_cookies) ability_dict = await resp.json() - assert ability_dict['ability_id'] == test_ability.ability_id - assert ability_dict['name'] == test_ability.name - assert ability_dict['technique_name'] == test_ability.technique_name - assert len(ability_dict['executors']) == 1 + assert ability_dict == test_ability.schema.dump(test_ability) async def test_unauthorized_get_ability_by_id(self, api_v2_client, test_ability): resp = await api_v2_client.get('/api/v2/abilities/123') @@ -47,30 +77,20 @@ async def test_get_nonexistent_ability_by_id(self, api_v2_client, api_cookies): resp = await api_v2_client.get('/api/v2/abilities/999', cookies=api_cookies) assert resp.status == HTTPStatus.NOT_FOUND - async def test_create_ability(self, api_v2_client, api_cookies, mocker, async_return): - test_executor_linux = Executor(name='sh', platform='linux', command='whoami') - payload = dict(name='new test ability', ability_id='456', tactic='collection', technique_name='collection', - technique_id='1', executors=[ExecutorSchema().dump(test_executor_linux)]) - resp = await api_v2_client.post('/api/v2/abilities', cookies=api_cookies, json=payload) + async def test_create_ability(self, api_v2_client, api_cookies, mocker, async_return, new_ability_payload): + resp = await api_v2_client.post('/api/v2/abilities', cookies=api_cookies, json=new_ability_payload) assert resp.status == HTTPStatus.OK ability_data = await resp.json() - assert ability_data.get('name') == payload['name'] - assert ability_data.get('ability_id') == payload['ability_id'] - assert ability_data.get('tactic') == payload['tactic'] + assert ability_data == new_ability_payload ability_exists = await BaseService.get_service('data_svc').locate('abilities', {'ability_id': '456'}) assert ability_exists - async def test_unauthorized_create_ability(self, api_v2_client): - test_executor_linux = Executor(name='sh', platform='linux', command='whoami') - payload = dict(name='new test ability', ability_id='456', tactic='collection', technique_name='collection', - technique_id='1', executors=[ExecutorSchema().dump(test_executor_linux)]) - resp = await api_v2_client.post('/api/v2/abilities', json=payload) + async def test_unauthorized_create_ability(self, api_v2_client, new_ability_payload): + resp = await api_v2_client.post('/api/v2/abilities', json=new_ability_payload) assert resp.status == HTTPStatus.UNAUTHORIZED async def test_create_duplicate_ability(self, api_v2_client, api_cookies, mocker, async_return, test_ability): - test_executor_linux = Executor(name='sh', platform='linux', command='whoami') - payload = dict(name='new test ability', ability_id='123', tactic='collection', technique_name='collection', - technique_id='1', executors=[ExecutorSchema().dump(test_executor_linux)]) + payload = test_ability.schema.dump(test_ability) resp = await api_v2_client.post('/api/v2/abilities', cookies=api_cookies, json=payload) assert resp.status == HTTPStatus.BAD_REQUEST @@ -80,60 +100,35 @@ async def test_create_invalid_ability(self, api_v2_client, api_cookies, mocker, resp = await api_v2_client.post('/api/v2/abilities', cookies=api_cookies, json=payload) assert resp.status == HTTPStatus.BAD_REQUEST - async def test_update_ability(self, api_v2_client, api_cookies, test_ability): - payload = dict(name='an updated test ability', tactic='defense-evasion') - resp = await api_v2_client.patch('/api/v2/abilities/123', cookies=api_cookies, json=payload) + async def test_update_ability(self, api_v2_client, api_cookies, test_ability, updated_ability_payload): + resp = await api_v2_client.patch('/api/v2/abilities/123', cookies=api_cookies, json=updated_ability_payload) assert resp.status == HTTPStatus.OK ability = (await BaseService.get_service('data_svc').locate('abilities', {'ability_id': '123'}))[0] - assert ability.name == payload['name'] - assert ability.tactic == payload['tactic'] - assert ability.ability_id == test_ability.ability_id - assert ability.description == test_ability.description - assert ability.technique_id == test_ability.technique_id - assert ability.technique_name == test_ability.technique_name - - async def test_unauthorized_update_ability(self, api_v2_client, test_ability): - payload = dict(name='an updated test ability', tactic='defense-evasion') - resp = await api_v2_client.patch('/api/v2/abilities/123', json=payload) + assert ability.schema.dump(ability) == updated_ability_payload + + async def test_unauthorized_update_ability(self, api_v2_client, test_ability, updated_ability_payload): + resp = await api_v2_client.patch('/api/v2/abilities/123', json=updated_ability_payload) assert resp.status == HTTPStatus.UNAUTHORIZED - async def test_update_nonexistent_ability(self, api_v2_client, api_cookies): - payload = dict(name='an updated test ability', tactic='defense-evasion') - resp = await api_v2_client.patch('/api/v2/abilities/999', cookies=api_cookies, json=payload) + async def test_update_nonexistent_ability(self, api_v2_client, api_cookies, updated_ability_payload): + resp = await api_v2_client.patch('/api/v2/abilities/999', cookies=api_cookies, json=updated_ability_payload) assert resp.status == HTTPStatus.NOT_FOUND - async def test_replace_ability(self, api_v2_client, api_cookies, test_ability): - test_executor_linux = Executor(name='sh', platform='linux', command='whoami') - payload = dict(name='replaced test ability', tactic='collection', technique_name='discovery', technique_id='2', - executors=[ExecutorSchema().dump(test_executor_linux)]) - resp = await api_v2_client.put('/api/v2/abilities/123', cookies=api_cookies, json=payload) + async def test_replace_ability(self, api_v2_client, api_cookies, test_ability, replaced_ability_payload): + resp = await api_v2_client.put('/api/v2/abilities/123', cookies=api_cookies, json=replaced_ability_payload) assert resp.status == HTTPStatus.OK ability = await resp.json() - assert ability['name'] == payload['name'] - assert ability['technique_name'] == payload['technique_name'] - assert ability['technique_id'] == payload['technique_id'] - assert ability['tactic'] == payload['tactic'] - assert ability['ability_id'] == test_ability.ability_id - - async def test_unauthorized_replace_ability(self, api_v2_client, test_ability): - test_executor_linux = Executor(name='sh', platform='linux', command='whoami') - payload = dict(name='replaced test ability', tactic='collection', technique_name='discovery', technique_id='2', - executors=[ExecutorSchema().dump(test_executor_linux)]) - resp = await api_v2_client.put('/api/v2/abilities/123', json=payload) + assert ability == replaced_ability_payload + + async def test_unauthorized_replace_ability(self, api_v2_client, test_ability, replaced_ability_payload): + resp = await api_v2_client.put('/api/v2/abilities/123', json=replaced_ability_payload) assert resp.status == HTTPStatus.UNAUTHORIZED - async def test_replace_nonexistent_ability(self, api_v2_client, api_cookies): - test_executor_linux = Executor(name='sh', platform='linux', command='whoami') - payload = dict(name='replaced test ability', tactic='collection', technique_name='discovery', technique_id='2', - executors=[ExecutorSchema().dump(test_executor_linux)]) - resp = await api_v2_client.put('/api/v2/abilities/123', cookies=api_cookies, json=payload) + async def test_replace_nonexistent_ability(self, api_v2_client, api_cookies, new_ability_payload): + resp = await api_v2_client.put('/api/v2/abilities/456', cookies=api_cookies, json=new_ability_payload) assert resp.status == HTTPStatus.OK ability = await resp.json() - assert ability['name'] == payload['name'] - assert ability['technique_name'] == payload['technique_name'] - assert ability['technique_id'] == payload['technique_id'] - assert ability['tactic'] == payload['tactic'] - assert ability['ability_id'] == '123' + assert ability == new_ability_payload async def test_invalid_replace_ability(self, api_v2_client, api_cookies, test_ability): payload = dict(name='replaced test ability', tactic='collection', technique_name='discovery', technique_id='2', From d6b19b58778bfd03900542e6a3e40fd2488ef062 Mon Sep 17 00:00:00 2001 From: blee Date: Fri, 27 Aug 2021 08:48:56 -0400 Subject: [PATCH 7/7] fix duplicate imports --- tests/conftest.py | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index ed1b75d60..785420914 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,29 +1,19 @@ import asyncio import os.path - -import aiohttp_apispec import pytest import random import string import uuid import yaml import aiohttp_apispec -from unittest import mock -from aiohttp_apispec import validation_middleware -from aiohttp import web -from pathlib import Path +from unittest import mock from aiohttp_apispec import validation_middleware from aiohttp import web from pathlib import Path -from app import version -from app.api.rest_api import RestApi from app.api.v2.handlers.ability_api import AbilityApi -from app.api.v2.responses import apispec_request_validation_middleware, json_request_validation_middleware -from app.api.v2.security import authentication_required_middleware_factory from app.objects.c_obfuscator import Obfuscator -from app.service.auth_svc import AuthService from app.utility.base_world import BaseWorld from app.service.app_svc import AppService from app.service.auth_svc import AuthService