Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[VIRTS-2743] Abilities v2 API tests #2228

Merged
merged 14 commits into from
Sep 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 137 additions & 0 deletions tests/api/v2/handlers/test_abilities_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import pytest

from http import HTTPStatus

from app.objects.c_ability import Ability
from app.objects.secondclass.c_executor import Executor, ExecutorSchema
from app.utility.base_service import BaseService


@pytest.fixture
def new_ability_payload():
test_executor_linux = Executor(name='sh', platform='linux', command='whoami')
return {'name': 'new test ability',
'ability_id': '456',
'tactic': 'collection',
'technique_name': 'collection',
'technique_id': '1',
'executors': [ExecutorSchema().dump(test_executor_linux)],
'access': {},
'additional_info': {},
'buckets': ['collection'],
'description': '',
'privilege': '',
'repeatable': False,
'requirements': [],
'singleton': False
}


@pytest.fixture
def updated_ability_payload(test_ability):
ability_data = test_ability.schema.dump(test_ability)
ability_data.update(dict(name='an updated test ability', tactic='defense-evasion'))
return ability_data


@pytest.fixture
def replaced_ability_payload(test_ability):
ability_data = test_ability.schema.dump(test_ability)
test_executor_linux = Executor(name='sh', platform='linux', command='whoami')
ability_data.update(dict(name='replaced test ability', tactic='collection', technique_name='discovery',
technique_id='2', executors=[ExecutorSchema().dump(test_executor_linux)]))
return ability_data


@pytest.fixture
def test_ability(loop, api_v2_client, executor):
executor_linux = executor(name='sh', platform='linux')
ability = Ability(ability_id='123', name='Test Ability', executors=[executor_linux],
technique_name='collection', technique_id='1', description='', privilege='', tactic='discovery')
loop.run_until_complete(BaseService.get_service('data_svc').store(ability))
return ability


class TestAbilitiesApi:
async def test_get_abilities(self, api_v2_client, api_cookies, test_ability):
resp = await api_v2_client.get('/api/v2/abilities', cookies=api_cookies)
abilities_list = await resp.json()
assert len(abilities_list) == 1
ability_dict = abilities_list[0]
assert ability_dict == test_ability.schema.dump(test_ability)

async def test_unauthorized_get_abilities(self, api_v2_client, test_ability):
resp = await api_v2_client.get('/api/v2/abilities')
assert resp.status == HTTPStatus.UNAUTHORIZED

async def test_get_ability_by_id(self, api_v2_client, api_cookies, test_ability):
resp = await api_v2_client.get('/api/v2/abilities/123', cookies=api_cookies)
ability_dict = await resp.json()
assert ability_dict == test_ability.schema.dump(test_ability)

async def test_unauthorized_get_ability_by_id(self, api_v2_client, test_ability):
resp = await api_v2_client.get('/api/v2/abilities/123')
assert resp.status == HTTPStatus.UNAUTHORIZED

async def test_get_nonexistent_ability_by_id(self, api_v2_client, api_cookies):
resp = await api_v2_client.get('/api/v2/abilities/999', cookies=api_cookies)
assert resp.status == HTTPStatus.NOT_FOUND

async def test_create_ability(self, api_v2_client, api_cookies, mocker, async_return, new_ability_payload):
resp = await api_v2_client.post('/api/v2/abilities', cookies=api_cookies, json=new_ability_payload)
assert resp.status == HTTPStatus.OK
ability_data = await resp.json()
assert ability_data == new_ability_payload
ability_exists = await BaseService.get_service('data_svc').locate('abilities', {'ability_id': '456'})
assert ability_exists

async def test_unauthorized_create_ability(self, api_v2_client, new_ability_payload):
resp = await api_v2_client.post('/api/v2/abilities', json=new_ability_payload)
assert resp.status == HTTPStatus.UNAUTHORIZED

async def test_create_duplicate_ability(self, api_v2_client, api_cookies, mocker, async_return, test_ability):
payload = test_ability.schema.dump(test_ability)
resp = await api_v2_client.post('/api/v2/abilities', cookies=api_cookies, json=payload)
assert resp.status == HTTPStatus.BAD_REQUEST

async def test_create_invalid_ability(self, api_v2_client, api_cookies, mocker, async_return, test_ability):
payload = dict(name='new test ability', ability_id='123', technique_name='collection',
technique_id='1', executors=[])
resp = await api_v2_client.post('/api/v2/abilities', cookies=api_cookies, json=payload)
assert resp.status == HTTPStatus.BAD_REQUEST

async def test_update_ability(self, api_v2_client, api_cookies, test_ability, updated_ability_payload):
resp = await api_v2_client.patch('/api/v2/abilities/123', cookies=api_cookies, json=updated_ability_payload)
assert resp.status == HTTPStatus.OK
ability = (await BaseService.get_service('data_svc').locate('abilities', {'ability_id': '123'}))[0]
assert ability.schema.dump(ability) == updated_ability_payload

async def test_unauthorized_update_ability(self, api_v2_client, test_ability, updated_ability_payload):
resp = await api_v2_client.patch('/api/v2/abilities/123', json=updated_ability_payload)
assert resp.status == HTTPStatus.UNAUTHORIZED

async def test_update_nonexistent_ability(self, api_v2_client, api_cookies, updated_ability_payload):
resp = await api_v2_client.patch('/api/v2/abilities/999', cookies=api_cookies, json=updated_ability_payload)
assert resp.status == HTTPStatus.NOT_FOUND

async def test_replace_ability(self, api_v2_client, api_cookies, test_ability, replaced_ability_payload):
resp = await api_v2_client.put('/api/v2/abilities/123', cookies=api_cookies, json=replaced_ability_payload)
assert resp.status == HTTPStatus.OK
ability = await resp.json()
assert ability == replaced_ability_payload

async def test_unauthorized_replace_ability(self, api_v2_client, test_ability, replaced_ability_payload):
resp = await api_v2_client.put('/api/v2/abilities/123', json=replaced_ability_payload)
assert resp.status == HTTPStatus.UNAUTHORIZED

async def test_replace_nonexistent_ability(self, api_v2_client, api_cookies, new_ability_payload):
resp = await api_v2_client.put('/api/v2/abilities/456', cookies=api_cookies, json=new_ability_payload)
assert resp.status == HTTPStatus.OK
ability = await resp.json()
assert ability == new_ability_payload

async def test_invalid_replace_ability(self, api_v2_client, api_cookies, test_ability):
payload = dict(name='replaced test ability', tactic='collection', technique_name='discovery', technique_id='2',
executors=[])
resp = await api_v2_client.put('/api/v2/abilities/123', cookies=api_cookies, json=payload)
assert resp.status == HTTPStatus.BAD_REQUEST
4 changes: 3 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import asyncio
import os.path

import pytest
import random
import string
import uuid
import yaml
import aiohttp_apispec

from unittest import mock
from aiohttp_apispec import validation_middleware
from aiohttp import web
from pathlib import Path

from app.api.v2.handlers.ability_api import AbilityApi
from app.objects.c_obfuscator import Obfuscator
from app.utility.base_world import BaseWorld
from app.service.app_svc import AppService
Expand Down Expand Up @@ -266,6 +267,7 @@ def make_app(svcs):
json_request_validation_middleware
]
)
AbilityApi(svcs).add_routes(app)
OperationApi(svcs).add_routes(app)
return app

Expand Down