-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #2228 from mitre/bleepbop/VIRTS-2743/abilities-api…
…-tests [VIRTS-2743] Abilities v2 API tests
- Loading branch information
Showing
2 changed files
with
140 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
import pytest | ||
|
||
from http import HTTPStatus | ||
|
||
from app.objects.c_ability import Ability | ||
from app.objects.secondclass.c_executor import Executor, ExecutorSchema | ||
from app.utility.base_service import BaseService | ||
|
||
|
||
@pytest.fixture | ||
def new_ability_payload(): | ||
test_executor_linux = Executor(name='sh', platform='linux', command='whoami') | ||
return {'name': 'new test ability', | ||
'ability_id': '456', | ||
'tactic': 'collection', | ||
'technique_name': 'collection', | ||
'technique_id': '1', | ||
'executors': [ExecutorSchema().dump(test_executor_linux)], | ||
'access': {}, | ||
'additional_info': {}, | ||
'buckets': ['collection'], | ||
'description': '', | ||
'privilege': '', | ||
'repeatable': False, | ||
'requirements': [], | ||
'singleton': False | ||
} | ||
|
||
|
||
@pytest.fixture | ||
def updated_ability_payload(test_ability): | ||
ability_data = test_ability.schema.dump(test_ability) | ||
ability_data.update(dict(name='an updated test ability', tactic='defense-evasion')) | ||
return ability_data | ||
|
||
|
||
@pytest.fixture | ||
def replaced_ability_payload(test_ability): | ||
ability_data = test_ability.schema.dump(test_ability) | ||
test_executor_linux = Executor(name='sh', platform='linux', command='whoami') | ||
ability_data.update(dict(name='replaced test ability', tactic='collection', technique_name='discovery', | ||
technique_id='2', executors=[ExecutorSchema().dump(test_executor_linux)])) | ||
return ability_data | ||
|
||
|
||
@pytest.fixture | ||
def test_ability(loop, api_v2_client, executor): | ||
executor_linux = executor(name='sh', platform='linux') | ||
ability = Ability(ability_id='123', name='Test Ability', executors=[executor_linux], | ||
technique_name='collection', technique_id='1', description='', privilege='', tactic='discovery') | ||
loop.run_until_complete(BaseService.get_service('data_svc').store(ability)) | ||
return ability | ||
|
||
|
||
class TestAbilitiesApi: | ||
async def test_get_abilities(self, api_v2_client, api_cookies, test_ability): | ||
resp = await api_v2_client.get('/api/v2/abilities', cookies=api_cookies) | ||
abilities_list = await resp.json() | ||
assert len(abilities_list) == 1 | ||
ability_dict = abilities_list[0] | ||
assert ability_dict == test_ability.schema.dump(test_ability) | ||
|
||
async def test_unauthorized_get_abilities(self, api_v2_client, test_ability): | ||
resp = await api_v2_client.get('/api/v2/abilities') | ||
assert resp.status == HTTPStatus.UNAUTHORIZED | ||
|
||
async def test_get_ability_by_id(self, api_v2_client, api_cookies, test_ability): | ||
resp = await api_v2_client.get('/api/v2/abilities/123', cookies=api_cookies) | ||
ability_dict = await resp.json() | ||
assert ability_dict == test_ability.schema.dump(test_ability) | ||
|
||
async def test_unauthorized_get_ability_by_id(self, api_v2_client, test_ability): | ||
resp = await api_v2_client.get('/api/v2/abilities/123') | ||
assert resp.status == HTTPStatus.UNAUTHORIZED | ||
|
||
async def test_get_nonexistent_ability_by_id(self, api_v2_client, api_cookies): | ||
resp = await api_v2_client.get('/api/v2/abilities/999', cookies=api_cookies) | ||
assert resp.status == HTTPStatus.NOT_FOUND | ||
|
||
async def test_create_ability(self, api_v2_client, api_cookies, mocker, async_return, new_ability_payload): | ||
resp = await api_v2_client.post('/api/v2/abilities', cookies=api_cookies, json=new_ability_payload) | ||
assert resp.status == HTTPStatus.OK | ||
ability_data = await resp.json() | ||
assert ability_data == new_ability_payload | ||
ability_exists = await BaseService.get_service('data_svc').locate('abilities', {'ability_id': '456'}) | ||
assert ability_exists | ||
|
||
async def test_unauthorized_create_ability(self, api_v2_client, new_ability_payload): | ||
resp = await api_v2_client.post('/api/v2/abilities', json=new_ability_payload) | ||
assert resp.status == HTTPStatus.UNAUTHORIZED | ||
|
||
async def test_create_duplicate_ability(self, api_v2_client, api_cookies, mocker, async_return, test_ability): | ||
payload = test_ability.schema.dump(test_ability) | ||
resp = await api_v2_client.post('/api/v2/abilities', cookies=api_cookies, json=payload) | ||
assert resp.status == HTTPStatus.BAD_REQUEST | ||
|
||
async def test_create_invalid_ability(self, api_v2_client, api_cookies, mocker, async_return, test_ability): | ||
payload = dict(name='new test ability', ability_id='123', technique_name='collection', | ||
technique_id='1', executors=[]) | ||
resp = await api_v2_client.post('/api/v2/abilities', cookies=api_cookies, json=payload) | ||
assert resp.status == HTTPStatus.BAD_REQUEST | ||
|
||
async def test_update_ability(self, api_v2_client, api_cookies, test_ability, updated_ability_payload): | ||
resp = await api_v2_client.patch('/api/v2/abilities/123', cookies=api_cookies, json=updated_ability_payload) | ||
assert resp.status == HTTPStatus.OK | ||
ability = (await BaseService.get_service('data_svc').locate('abilities', {'ability_id': '123'}))[0] | ||
assert ability.schema.dump(ability) == updated_ability_payload | ||
|
||
async def test_unauthorized_update_ability(self, api_v2_client, test_ability, updated_ability_payload): | ||
resp = await api_v2_client.patch('/api/v2/abilities/123', json=updated_ability_payload) | ||
assert resp.status == HTTPStatus.UNAUTHORIZED | ||
|
||
async def test_update_nonexistent_ability(self, api_v2_client, api_cookies, updated_ability_payload): | ||
resp = await api_v2_client.patch('/api/v2/abilities/999', cookies=api_cookies, json=updated_ability_payload) | ||
assert resp.status == HTTPStatus.NOT_FOUND | ||
|
||
async def test_replace_ability(self, api_v2_client, api_cookies, test_ability, replaced_ability_payload): | ||
resp = await api_v2_client.put('/api/v2/abilities/123', cookies=api_cookies, json=replaced_ability_payload) | ||
assert resp.status == HTTPStatus.OK | ||
ability = await resp.json() | ||
assert ability == replaced_ability_payload | ||
|
||
async def test_unauthorized_replace_ability(self, api_v2_client, test_ability, replaced_ability_payload): | ||
resp = await api_v2_client.put('/api/v2/abilities/123', json=replaced_ability_payload) | ||
assert resp.status == HTTPStatus.UNAUTHORIZED | ||
|
||
async def test_replace_nonexistent_ability(self, api_v2_client, api_cookies, new_ability_payload): | ||
resp = await api_v2_client.put('/api/v2/abilities/456', cookies=api_cookies, json=new_ability_payload) | ||
assert resp.status == HTTPStatus.OK | ||
ability = await resp.json() | ||
assert ability == new_ability_payload | ||
|
||
async def test_invalid_replace_ability(self, api_v2_client, api_cookies, test_ability): | ||
payload = dict(name='replaced test ability', tactic='collection', technique_name='discovery', technique_id='2', | ||
executors=[]) | ||
resp = await api_v2_client.put('/api/v2/abilities/123', cookies=api_cookies, json=payload) | ||
assert resp.status == HTTPStatus.BAD_REQUEST |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters