-
Notifications
You must be signed in to change notification settings - Fork 792
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #1503 from guardicore/629/ship-db-with-attack-miti…
…gations Ship database with attack mitigations
- Loading branch information
Showing
20 changed files
with
606 additions
and
356 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,3 @@ | ||
[submodule "monkey/monkey_island/cc/services/attack/attack_data"] | ||
path = monkey/monkey_island/cc/services/attack/attack_data | ||
url = https://github.com/guardicore/cti | ||
[submodule "docs/themes/learn"] | ||
path = docs/themes/learn | ||
url = https://github.com/guardicode/hugo-theme-learn.git |
65 changes: 65 additions & 0 deletions
65
deployment_scripts/dump_attack_mitigations/attack_mitigations.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
from typing import Dict | ||
|
||
from mongoengine import Document, EmbeddedDocument, EmbeddedDocumentField, ListField, StringField | ||
from stix2 import AttackPattern, CourseOfAction | ||
|
||
|
||
class Mitigation(EmbeddedDocument): | ||
name = StringField(required=True) | ||
description = StringField(required=True) | ||
url = StringField() | ||
|
||
@staticmethod | ||
def get_from_stix2_data(mitigation: CourseOfAction): | ||
name = mitigation["name"] | ||
description = mitigation["description"] | ||
url = get_stix2_external_reference_url(mitigation) | ||
return Mitigation(name=name, description=description, url=url) | ||
|
||
|
||
class AttackMitigations(Document): | ||
technique_id = StringField(required=True, primary_key=True) | ||
mitigations = ListField(EmbeddedDocumentField("Mitigation")) | ||
|
||
def add_mitigation(self, mitigation: CourseOfAction): | ||
mitigation_external_ref_id = get_stix2_external_reference_id(mitigation) | ||
if mitigation_external_ref_id.startswith("M"): | ||
self.mitigations.append(Mitigation.get_from_stix2_data(mitigation)) | ||
|
||
def add_no_mitigations_info(self, mitigation: CourseOfAction): | ||
mitigation_external_ref_id = get_stix2_external_reference_id(mitigation) | ||
if mitigation_external_ref_id.startswith("T") and len(self.mitigations) == 0: | ||
mitigation_mongo_object = Mitigation.get_from_stix2_data(mitigation) | ||
mitigation_mongo_object["description"] = mitigation_mongo_object[ | ||
"description" | ||
].splitlines()[0] | ||
mitigation_mongo_object["url"] = "" | ||
self.mitigations.append(mitigation_mongo_object) | ||
|
||
@staticmethod | ||
def dict_from_stix2_attack_patterns(stix2_dict: Dict[str, AttackPattern]): | ||
return { | ||
key: AttackMitigations.mitigations_from_attack_pattern(attack_pattern) | ||
for key, attack_pattern in stix2_dict.items() | ||
} | ||
|
||
@staticmethod | ||
def mitigations_from_attack_pattern(attack_pattern: AttackPattern): | ||
return AttackMitigations( | ||
technique_id=get_stix2_external_reference_id(attack_pattern), | ||
mitigations=[], | ||
) | ||
|
||
|
||
def get_stix2_external_reference_url(stix2_data) -> str: | ||
for reference in stix2_data["external_references"]: | ||
if "url" in reference: | ||
return reference["url"] | ||
return "" | ||
|
||
|
||
def get_stix2_external_reference_id(stix2_data) -> str: | ||
for reference in stix2_data["external_references"]: | ||
if reference["source_name"] == "mitre-attack" and "external_id" in reference: | ||
return reference["external_id"] | ||
return "" |
184 changes: 184 additions & 0 deletions
184
deployment_scripts/dump_attack_mitigations/dump_attack_mitigations.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,184 @@ | ||
import argparse | ||
import json | ||
import subprocess | ||
import time | ||
from pathlib import Path | ||
from typing import Dict, List | ||
|
||
import mongoengine | ||
import pymongo | ||
from attack_mitigations import AttackMitigations | ||
from bson import json_util | ||
from stix2 import AttackPattern, CourseOfAction, FileSystemSource, Filter | ||
|
||
COLLECTION_NAME = "attack_mitigations" | ||
|
||
|
||
def main(): | ||
args = parse_args() | ||
|
||
set_default_mongo_connection(args.database_name, args.mongo_host, args.mongo_port) | ||
|
||
mongo_client = pymongo.MongoClient(host=args.mongo_host, port=args.mongo_port) | ||
database = mongo_client.get_database(args.database_name) | ||
|
||
clean_collection(database) | ||
populate_attack_mitigations(database, Path(args.cti_repo)) | ||
dump_attack_mitigations(database, Path(args.cti_repo), Path(args.dump_file_path)) | ||
|
||
|
||
def parse_args(): | ||
parser = argparse.ArgumentParser( | ||
description="Export attack mitigations from a database", | ||
formatter_class=argparse.ArgumentDefaultsHelpFormatter, | ||
) | ||
parser.add_argument( | ||
"--mongo_host", default="localhost", help="URL for mongo database.", required=False | ||
) | ||
parser.add_argument( | ||
"--mongo-port", | ||
action="store", | ||
default=27017, | ||
type=int, | ||
help="Port for mongo database.", | ||
required=False, | ||
) | ||
parser.add_argument( | ||
"--database-name", | ||
action="store", | ||
default="monkeyisland", | ||
help="Database name inside of mongo.", | ||
required=False, | ||
) | ||
parser.add_argument( | ||
"--cti-repo", | ||
action="store", | ||
default="attack_mitigations", | ||
help="The path to the Cyber Threat Intelligence Repository.", | ||
required=True, | ||
) | ||
parser.add_argument( | ||
"--dump-file-path", | ||
action="store", | ||
default="./attack_mitigations.json", | ||
help="A file path where the database dump will be saved.", | ||
required=False, | ||
) | ||
|
||
return parser.parse_args() | ||
|
||
|
||
def set_default_mongo_connection(database_name: str, host: str, port: int): | ||
mongoengine.connect(db=database_name, host=host, port=port) | ||
|
||
|
||
def clean_collection(database: pymongo.database.Database): | ||
if collection_exists(database, COLLECTION_NAME): | ||
database.drop_collection(COLLECTION_NAME) | ||
|
||
|
||
def collection_exists(database: pymongo.database.Database, collection_name: str) -> bool: | ||
return collection_name in database.list_collection_names() | ||
|
||
|
||
def populate_attack_mitigations(database: pymongo.database.Database, cti_repo: Path): | ||
database.create_collection(COLLECTION_NAME) | ||
attack_data_path = cti_repo / "enterprise-attack" | ||
|
||
stix2_mitigations = get_all_mitigations(attack_data_path) | ||
mongo_mitigations = AttackMitigations.dict_from_stix2_attack_patterns( | ||
get_all_attack_techniques(attack_data_path) | ||
) | ||
mitigation_technique_relationships = get_technique_and_mitigation_relationships( | ||
attack_data_path | ||
) | ||
for relationship in mitigation_technique_relationships: | ||
mongo_mitigations[relationship["target_ref"]].add_mitigation( | ||
stix2_mitigations[relationship["source_ref"]] | ||
) | ||
for relationship in mitigation_technique_relationships: | ||
mongo_mitigations[relationship["target_ref"]].add_no_mitigations_info( | ||
stix2_mitigations[relationship["source_ref"]] | ||
) | ||
for key, mongo_object in mongo_mitigations.items(): | ||
mongo_object.save() | ||
|
||
|
||
def get_all_mitigations(attack_data_path: Path) -> Dict[str, CourseOfAction]: | ||
file_system = FileSystemSource(attack_data_path) | ||
mitigation_filter = [Filter("type", "=", "course-of-action")] | ||
all_mitigations = file_system.query(mitigation_filter) | ||
all_mitigations = {mitigation["id"]: mitigation for mitigation in all_mitigations} | ||
return all_mitigations | ||
|
||
|
||
def get_all_attack_techniques(attack_data_path: Path) -> Dict[str, AttackPattern]: | ||
file_system = FileSystemSource(attack_data_path) | ||
technique_filter = [Filter("type", "=", "attack-pattern")] | ||
all_techniques = file_system.query(technique_filter) | ||
all_techniques = {technique["id"]: technique for technique in all_techniques} | ||
return all_techniques | ||
|
||
|
||
def get_technique_and_mitigation_relationships(attack_data_path: Path) -> List[CourseOfAction]: | ||
file_system = FileSystemSource(attack_data_path) | ||
technique_filter = [ | ||
Filter("type", "=", "relationship"), | ||
Filter("relationship_type", "=", "mitigates"), | ||
] | ||
all_techniques = file_system.query(technique_filter) | ||
return all_techniques | ||
|
||
|
||
def dump_attack_mitigations( | ||
database: pymongo.database.Database, cti_repo: Path, dump_file_path: Path | ||
): | ||
if not collection_exists(database, COLLECTION_NAME): | ||
raise Exception(f"Could not find collection: {COLLECTION_NAME}") | ||
|
||
metadata = get_metadata(cti_repo) | ||
data = get_data_from_database(database) | ||
|
||
json_output = f'{{"metadata":{json.dumps(metadata)},"data":{json_util.dumps(data)}}}' | ||
|
||
with open(dump_file_path, "wb") as jsonfile: | ||
jsonfile.write(json_output.encode()) | ||
|
||
|
||
def get_metadata(cti_repo: Path) -> dict: | ||
timestamp = str(time.time()) | ||
commit_hash = get_commit_hash(cti_repo) | ||
origin_url = get_origin_url(cti_repo) | ||
|
||
return {"timestamp": timestamp, "commit_hash": commit_hash, "origin_url": origin_url} | ||
|
||
|
||
def get_commit_hash(cti_repo: Path) -> str: | ||
return run_command(["git", "rev-parse", "--short", "HEAD"], cti_repo).strip() | ||
|
||
|
||
def get_origin_url(cti_repo: Path) -> str: | ||
return run_command(["git", "remote", "get-url", "origin"], cti_repo).strip() | ||
|
||
|
||
def run_command(cmd: List, cwd: Path = None) -> str: | ||
cp = subprocess.run(cmd, capture_output=True, cwd=cwd, encoding="utf-8") | ||
|
||
if cp.returncode != 0: | ||
raise Exception( | ||
f"Error running command -- Command: {cmd} -- Return Code: {cp.returncode} -- stderr: " | ||
f"{cp.stderr}" | ||
) | ||
|
||
return cp.stdout | ||
|
||
|
||
def get_data_from_database(database: pymongo.database.Database) -> pymongo.cursor.Cursor: | ||
collection = database.get_collection(COLLECTION_NAME) | ||
collection_contents = collection.find() | ||
|
||
return collection_contents | ||
|
||
|
||
if __name__ == "__main__": | ||
main() |
13 changes: 13 additions & 0 deletions
13
deployment_scripts/dump_attack_mitigations/requirements.txt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
antlr4-python3-runtime==4.8 | ||
certifi==2021.5.30 | ||
charset-normalizer==2.0.6 | ||
idna==3.2 | ||
mongoengine==0.23.1 | ||
pymongo==3.12.0 | ||
pytz==2021.1 | ||
requests==2.26.0 | ||
simplejson==3.17.5 | ||
six==1.16.0 | ||
stix2==3.0.1 | ||
stix2-patterns==1.3.2 | ||
urllib3==1.26.7 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
--- | ||
title: "MITRE ATT&CK Mitigations" | ||
date: 2021-09-30T08:18:37+03:00 | ||
draft: true | ||
weight: 10 | ||
--- | ||
|
||
{{% notice info %}} | ||
Check out [the documentation for the MITRE ATT&CK techniques as well]({{< ref "/reports/mitre" >}}). | ||
{{% /notice %}} | ||
|
||
## Summary | ||
|
||
Attack Mitigations are presented in MITRE ATT&CK report. They appear next to | ||
descriptions of attack techniques and suggest steps that can be taken to reduce | ||
the risk of that particular technique being successful in a network. They also | ||
provide links for further reading on https://attack.mitre.org/ | ||
|
||
The Infection Monkey is shipped with pre-processed information about MITRE | ||
ATT&CK mitigations located at | ||
`monkey/monkey_island/cc/setup/mongo/attack_mitigations.json`. This may need to | ||
be periodically updated as the MITRE ATT&CK framework evolves. | ||
|
||
|
||
## Updating the MITRE ATT&CK mitigations data | ||
1. Clone the [MITRE Cyber Threat Intelligence | ||
Repository](https://github.com/mitre/cti) or the [Guardicore | ||
fork](https://github.com/guardicore/cti): | ||
``` | ||
$ CTI_REPO=$PWD/cti | ||
$ git clone <REPO> $CTI_REPO | ||
``` | ||
2. Start a MongoDB v4.2 server. | ||
3. Run the script to generate the `attack_mitigations.json` file: | ||
``` | ||
$ cd monkey/deployment_scripts/dump_attack_mitigations | ||
$ pip install -r requirements.txt | ||
$ python dump_attack_mitigations.py --cti-repo $CTI_REPO --dump-file-path ../../monkey/monkey_island/cc/setup/mongo/attack_mitigations.json | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.