Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[VIRTS-2743] Abilities v2 API tests #2228

Merged
merged 14 commits into from
Sep 2, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 142 additions & 0 deletions tests/api/v2/handlers/test_abilities_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import pytest

from http import HTTPStatus

from app.objects.c_ability import Ability
from app.objects.secondclass.c_executor import Executor, ExecutorSchema
from app.utility.base_service import BaseService


@pytest.fixture
def test_ability(loop, api_v2_client, executor):
executor_linux = executor(name='sh', platform='linux')
ability = Ability(ability_id='123', name='Test Ability', executors=[executor_linux],
technique_name='collection', technique_id='1')
loop.run_until_complete(BaseService.get_service('data_svc').store(ability))
return ability


class TestAbilitiesApi:
async def test_get_abilities(self, api_v2_client, api_cookies, test_ability):
resp = await api_v2_client.get('/api/v2/abilities', cookies=api_cookies)
abilities_list = await resp.json()
assert len(abilities_list) == 1
ability_dict = abilities_list[0]
assert ability_dict['ability_id'] == test_ability.ability_id
bleepbop marked this conversation as resolved.
Show resolved Hide resolved
assert ability_dict['name'] == test_ability.name
assert ability_dict['technique_name'] == test_ability.technique_name
assert len(ability_dict['executors']) == 1

async def test_unauthorized_get_abilities(self, api_v2_client, test_ability):
resp = await api_v2_client.get('/api/v2/abilities')
assert resp.status == HTTPStatus.UNAUTHORIZED

async def test_get_ability_by_id(self, api_v2_client, api_cookies, test_ability):
resp = await api_v2_client.get('/api/v2/abilities/123', cookies=api_cookies)
ability_dict = await resp.json()
assert ability_dict['ability_id'] == test_ability.ability_id
assert ability_dict['name'] == test_ability.name
assert ability_dict['technique_name'] == test_ability.technique_name
assert len(ability_dict['executors']) == 1

async def test_unauthorized_get_ability_by_id(self, api_v2_client, test_ability):
resp = await api_v2_client.get('/api/v2/abilities/123')
assert resp.status == HTTPStatus.UNAUTHORIZED

async def test_get_nonexistent_ability_by_id(self, api_v2_client, api_cookies):
resp = await api_v2_client.get('/api/v2/abilities/999', cookies=api_cookies)
assert resp.status == HTTPStatus.NOT_FOUND

async def test_create_ability(self, api_v2_client, api_cookies, mocker, async_return):
test_executor_linux = Executor(name='sh', platform='linux', command='whoami')
payload = dict(name='new test ability', ability_id='456', tactic='collection', technique_name='collection',
technique_id='1', executors=[ExecutorSchema().dump(test_executor_linux)])
resp = await api_v2_client.post('/api/v2/abilities', cookies=api_cookies, json=payload)
assert resp.status == HTTPStatus.OK
ability_data = await resp.json()
assert ability_data.get('name') == payload['name']
bleepbop marked this conversation as resolved.
Show resolved Hide resolved
assert ability_data.get('ability_id') == payload['ability_id']
assert ability_data.get('tactic') == payload['tactic']
ability_exists = await BaseService.get_service('data_svc').locate('abilities', {'ability_id': '456'})
assert ability_exists

async def test_unauthorized_create_ability(self, api_v2_client):
test_executor_linux = Executor(name='sh', platform='linux', command='whoami')
payload = dict(name='new test ability', ability_id='456', tactic='collection', technique_name='collection',
technique_id='1', executors=[ExecutorSchema().dump(test_executor_linux)])
resp = await api_v2_client.post('/api/v2/abilities', json=payload)
assert resp.status == HTTPStatus.UNAUTHORIZED

async def test_create_duplicate_ability(self, api_v2_client, api_cookies, mocker, async_return, test_ability):
test_executor_linux = Executor(name='sh', platform='linux', command='whoami')
payload = dict(name='new test ability', ability_id='123', tactic='collection', technique_name='collection',
technique_id='1', executors=[ExecutorSchema().dump(test_executor_linux)])
resp = await api_v2_client.post('/api/v2/abilities', cookies=api_cookies, json=payload)
assert resp.status == HTTPStatus.BAD_REQUEST

async def test_create_invalid_ability(self, api_v2_client, api_cookies, mocker, async_return, test_ability):
payload = dict(name='new test ability', ability_id='123', technique_name='collection',
technique_id='1', executors=[])
resp = await api_v2_client.post('/api/v2/abilities', cookies=api_cookies, json=payload)
assert resp.status == HTTPStatus.BAD_REQUEST

async def test_update_ability(self, api_v2_client, api_cookies, test_ability):
payload = dict(name='an updated test ability', tactic='defense-evasion')
resp = await api_v2_client.patch('/api/v2/abilities/123', cookies=api_cookies, json=payload)
assert resp.status == HTTPStatus.OK
ability = (await BaseService.get_service('data_svc').locate('abilities', {'ability_id': '123'}))[0]
assert ability.name == payload['name']
assert ability.tactic == payload['tactic']
assert ability.ability_id == test_ability.ability_id
assert ability.description == test_ability.description
assert ability.technique_id == test_ability.technique_id
assert ability.technique_name == test_ability.technique_name

async def test_unauthorized_update_ability(self, api_v2_client, test_ability):
payload = dict(name='an updated test ability', tactic='defense-evasion')
resp = await api_v2_client.patch('/api/v2/abilities/123', json=payload)
assert resp.status == HTTPStatus.UNAUTHORIZED

async def test_update_nonexistent_ability(self, api_v2_client, api_cookies):
payload = dict(name='an updated test ability', tactic='defense-evasion')
resp = await api_v2_client.patch('/api/v2/abilities/999', cookies=api_cookies, json=payload)
assert resp.status == HTTPStatus.NOT_FOUND

async def test_replace_ability(self, api_v2_client, api_cookies, test_ability):
test_executor_linux = Executor(name='sh', platform='linux', command='whoami')
payload = dict(name='replaced test ability', tactic='collection', technique_name='discovery', technique_id='2',
executors=[ExecutorSchema().dump(test_executor_linux)])
resp = await api_v2_client.put('/api/v2/abilities/123', cookies=api_cookies, json=payload)
assert resp.status == HTTPStatus.OK
ability = await resp.json()
assert ability['name'] == payload['name']
assert ability['technique_name'] == payload['technique_name']
assert ability['technique_id'] == payload['technique_id']
assert ability['tactic'] == payload['tactic']
assert ability['ability_id'] == test_ability.ability_id

async def test_unauthorized_replace_ability(self, api_v2_client, test_ability):
test_executor_linux = Executor(name='sh', platform='linux', command='whoami')
payload = dict(name='replaced test ability', tactic='collection', technique_name='discovery', technique_id='2',
executors=[ExecutorSchema().dump(test_executor_linux)])
resp = await api_v2_client.put('/api/v2/abilities/123', json=payload)
assert resp.status == HTTPStatus.UNAUTHORIZED

async def test_replace_nonexistent_ability(self, api_v2_client, api_cookies):
test_executor_linux = Executor(name='sh', platform='linux', command='whoami')
payload = dict(name='replaced test ability', tactic='collection', technique_name='discovery', technique_id='2',
executors=[ExecutorSchema().dump(test_executor_linux)])
resp = await api_v2_client.put('/api/v2/abilities/123', cookies=api_cookies, json=payload)
bleepbop marked this conversation as resolved.
Show resolved Hide resolved
assert resp.status == HTTPStatus.OK
ability = await resp.json()
assert ability['name'] == payload['name']
assert ability['technique_name'] == payload['technique_name']
assert ability['technique_id'] == payload['technique_id']
assert ability['tactic'] == payload['tactic']
assert ability['ability_id'] == '123'

async def test_invalid_replace_ability(self, api_v2_client, api_cookies, test_ability):
payload = dict(name='replaced test ability', tactic='collection', technique_name='discovery', technique_id='2',
executors=[])
resp = await api_v2_client.put('/api/v2/abilities/123', cookies=api_cookies, json=payload)
assert resp.status == HTTPStatus.BAD_REQUEST
83 changes: 83 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,25 @@
import asyncio
import os.path

import aiohttp_apispec
import pytest
import random
import string
import uuid
import yaml
from unittest import mock

from aiohttp_apispec import validation_middleware
from aiohttp import web
from pathlib import Path

from app import version
from app.api.rest_api import RestApi
from app.api.v2.handlers.ability_api import AbilityApi
from app.api.v2.responses import apispec_request_validation_middleware, json_request_validation_middleware
from app.api.v2.security import authentication_required_middleware_factory
from app.objects.c_obfuscator import Obfuscator
from app.service.auth_svc import AuthService
from app.utility.base_world import BaseWorld
from app.service.app_svc import AppService
from app.service.data_svc import DataService
Expand Down Expand Up @@ -243,3 +255,74 @@ def _agent_profile(paw=None, group='red', platform='linux', executors=None, priv
)

return _agent_profile


@pytest.fixture
def api_v2_client(loop, aiohttp_client):
def make_app(svcs):
app = web.Application(
middlewares=[
authentication_required_middleware_factory(svcs['auth_svc']),
json_request_validation_middleware
]
)
AbilityApi(svcs).add_routes(app)
return app

async def initialize():
with open(Path(__file__).parents[1] / 'conf' / 'default.yml', 'r') as fle:
BaseWorld.apply_config('main', yaml.safe_load(fle))
with open(Path(__file__).parents[1] / 'conf' / 'payloads.yml', 'r') as fle:
BaseWorld.apply_config('payloads', yaml.safe_load(fle))

app_svc = AppService(web.Application(client_max_size=5120 ** 2))
_ = DataService()
_ = RestService()
_ = PlanningService()
_ = LearningService()
auth_svc = AuthService()
_ = ContactService()
_ = FileSvc()
_ = EventService()
services = app_svc.get_services()
os.chdir(str(Path(__file__).parents[1]))

await app_svc.register_contacts()
await app_svc.load_plugins(['sandcat', 'ssl'])
_ = await RestApi(services).enable()
await auth_svc.apply(app_svc.application, auth_svc.get_config('users'))
await auth_svc.set_login_handlers(services)

app_svc.register_subapp('/api/v2', make_app(svcs=services))
aiohttp_apispec.setup_aiohttp_apispec(
app=app_svc.application,
title='CALDERA',
version=version.get_version(),
swagger_path='/api/docs',
url='/api/docs/swagger.json',
static_path='/static/swagger'
)
app_svc.application.middlewares.append(apispec_request_validation_middleware)
app_svc.application.middlewares.append(validation_middleware)

return app_svc.application

app = loop.run_until_complete(initialize())
return loop.run_until_complete(aiohttp_client(app))


@pytest.fixture
def api_cookies(loop, api_v2_client):
async def get_cookie():
r = await api_v2_client.post('/enter', allow_redirects=False, data=dict(username='admin', password='admin'))
return r.cookies
return loop.run_until_complete(get_cookie())


@pytest.fixture
def async_return():
def _async_return(return_param):
f = asyncio.Future()
f.set_result(return_param)
return f
return _async_return