Skip to content

Commit

Permalink
report directory, code quality, docs
Browse files Browse the repository at this point in the history
adjusts to default domain reputation visualizer + analyzers urls and abstractmethods (intelowlproject#2250)

* little adjusts

* adjust

* refactored base_url in url to enable healthchecks

* adjust

* added update abstract method

* added logging of intel_owl package

adjusted containers dependencies

Fix

Signed-off-by: 0ssigeno <s.berni@certego.net>

report directory, code quality, docs

adjusts to default domain reputation visualizer + analyzers urls and abstractmethods (intelowlproject#2250)

* little adjusts

* adjust

* refactored base_url in url to enable healthchecks

* adjust

* added update abstract method

* added logging of intel_owl package

adjusted containers dependencies

Fix

Signed-off-by: 0ssigeno <s.berni@certego.net>

RED tlp
  • Loading branch information
g4ze committed Apr 11, 2024
1 parent b91ab75 commit 8522189
Show file tree
Hide file tree
Showing 50 changed files with 397 additions and 126 deletions.
23 changes: 16 additions & 7 deletions api_app/analyzers_manager/file_analyzers/blint_scan.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import logging
import os

from blint.analysis import AnalysisRunner
from django.conf import settings
Expand All @@ -10,12 +9,22 @@


class BlintAnalyzer(FileAnalyzer):
reports_dir = os.path.join(settings.MEDIA_ROOT, "reports")
"""
Wrapper for Blint static analysis tool
"""

def run(self):
def update(self) -> bool:
pass

def run(self) -> tuple:
logger.info(f"Running Blint on {self.filepath}")
if not os.path.exists(self.reports_dir):
os.makedirs(self.reports_dir)
# Blint requires a report directory
# that we create during the docker build at
# /opt/deploy/files_required/reports
reports_dir = f"{settings.MEDIA_ROOT}/reports"
analyzer = AnalysisRunner()

return analyzer.start(files=[self.filepath], reports_dir=self.reports_dir)
response = analyzer.start(files=[self.filepath], reports_dir=reports_dir)
logger.info(f"response: {response}")
if response == ([], [], []):
return "No issues found"
return response
121 changes: 121 additions & 0 deletions api_app/analyzers_manager/migrations/0078_analyzer_config_blint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
from django.db import migrations
from django.db.models.fields.related_descriptors import (
ForwardManyToOneDescriptor,
ForwardOneToOneDescriptor,
ManyToManyDescriptor,
)

plugin = {
"python_module": {
"health_check_schedule": None,
"update_schedule": None,
"module": "blint_scan.BlintAnalyzer",
"base_path": "api_app.analyzers_manager.file_analyzers",
},
"name": "Blint",
"description": "[Blint](https://github.com/owasp-dep-scan/blint) is a Binary Linter that checks the security properties and capabilities of your executables.\r\nSupported binary formats:\r\n- Android (apk, aab)\r\n- ELF (GNU, musl)\r\n- PE (exe, dll)\r\n- Mach-O (x64, arm64)",
"disabled": False,
"soft_time_limit": 60,
"routing_key": "default",
"health_check_status": True,
"type": "file",
"docker_based": False,
"maximum_tlp": "RED",
"observable_supported": [],
"supported_filetypes": [
"application/vnd.android.package-archive",
"application/vnd.microsoft.portable-executable",
"application/x-binary",
],
"run_hash": False,
"run_hash_type": "",
"not_supported_filetypes": [],
"model": "analyzers_manager.AnalyzerConfig",
}

params = []

values = []


def _get_real_obj(Model, field, value):
def _get_obj(Model, other_model, value):
if isinstance(value, dict):
real_vals = {}
for key, real_val in value.items():
real_vals[key] = _get_real_obj(other_model, key, real_val)
value = other_model.objects.get_or_create(**real_vals)[0]
# it is just the primary key serialized
else:
if isinstance(value, int):
if Model.__name__ == "PluginConfig":
value = other_model.objects.get(name=plugin["name"])
else:
value = other_model.objects.get(pk=value)
else:
value = other_model.objects.get(name=value)
return value

if (
type(getattr(Model, field))
in [ForwardManyToOneDescriptor, ForwardOneToOneDescriptor]
and value
):
other_model = getattr(Model, field).get_queryset().model
value = _get_obj(Model, other_model, value)
elif type(getattr(Model, field)) in [ManyToManyDescriptor] and value:
other_model = getattr(Model, field).rel.model
value = [_get_obj(Model, other_model, val) for val in value]
return value


def _create_object(Model, data):
mtm, no_mtm = {}, {}
for field, value in data.items():
value = _get_real_obj(Model, field, value)
if type(getattr(Model, field)) is ManyToManyDescriptor:
mtm[field] = value
else:
no_mtm[field] = value
try:
o = Model.objects.get(**no_mtm)
except Model.DoesNotExist:
o = Model(**no_mtm)
o.full_clean()
o.save()
for field, value in mtm.items():
attribute = getattr(o, field)
if value is not None:
attribute.set(value)
return False
return True


def migrate(apps, schema_editor):
Parameter = apps.get_model("api_app", "Parameter")
PluginConfig = apps.get_model("api_app", "PluginConfig")
python_path = plugin.pop("model")
Model = apps.get_model(*python_path.split("."))
if not Model.objects.filter(name=plugin["name"]).exists():
exists = _create_object(Model, plugin)
if not exists:
for param in params:
_create_object(Parameter, param)
for value in values:
_create_object(PluginConfig, value)


def reverse_migrate(apps, schema_editor):
python_path = plugin.pop("model")
Model = apps.get_model(*python_path.split("."))
Model.objects.get(name=plugin["name"]).delete()


class Migration(migrations.Migration):
atomic = False
dependencies = [
("api_app", "0062_alter_parameter_python_module"),
("analyzers_manager", "0077_analyzer_config_abusix"),
]

operations = [migrations.RunPython(migrate, reverse_migrate)]
8 changes: 6 additions & 2 deletions api_app/analyzers_manager/observable_analyzers/auth0.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,17 @@

class Auth0(classes.ObservableAnalyzer):
name: str = "Auth0"
base_url: str = "https://signals.api.auth0.com/v2.0/ip"
url: str = "https://signals.api.auth0.com/v2.0/ip"

_api_key_name: str

@classmethod
def update(cls) -> bool:
pass

def run(self):
headers = {"X-Auth-Token": self._api_key_name}
url = f"{self.base_url}/{self.observable_name}"
url = f"{self.url}/{self.observable_name}"
response = requests.get(url, headers=headers)
response.raise_for_status()

Expand Down
12 changes: 8 additions & 4 deletions api_app/analyzers_manager/observable_analyzers/binaryedge.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@


class BinaryEdge(classes.ObservableAnalyzer):
base_url: str = "https://api.binaryedge.io/v2/query/"
url: str = "https://api.binaryedge.io/v2/query/"

_api_key_name: str

@classmethod
def update(cls) -> bool:
pass

def config(self, runtime_configuration: Dict):
super().config(runtime_configuration)
self.headers = {"X-Key": self._api_key_name}
Expand All @@ -23,12 +27,12 @@ def run(self):
if self.observable_classification == self.ObservableTypes.IP:
try:
response_recent_ip_info = requests.get(
self.base_url + "ip/" + self.observable_name, headers=self.headers
self.url + "ip/" + self.observable_name, headers=self.headers
)
response_recent_ip_info.raise_for_status()

response_query_ip = requests.get(
self.base_url + "search?query=ip:" + self.observable_name,
self.url + "search?query=ip:" + self.observable_name,
headers=self.headers,
)
response_query_ip.raise_for_status()
Expand All @@ -43,7 +47,7 @@ def run(self):
elif self.observable_classification == self.ObservableTypes.DOMAIN:
try:
response_domain_report = requests.get(
self.base_url + "domains/subdomain/" + self.observable_name,
self.url + "domains/subdomain/" + self.observable_name,
headers=self.headers,
)
results = response_domain_report.json()
Expand Down
4 changes: 2 additions & 2 deletions api_app/analyzers_manager/observable_analyzers/censys.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class Censys(classes.ObservableAnalyzer):
def update(self):
pass

base_url = "https://search.censys.io/api/v2"
url = "https://search.censys.io/api/v2"

censys_analysis: str
_api_id_name: str
Expand All @@ -33,7 +33,7 @@ def run(self):
"Supported is IP"
)
response = requests.get(
self.base_url + uri,
self.url + uri,
auth=(self._api_id_name, self._api_secret_name),
headers={
"Accept": "application/json",
Expand Down
6 changes: 3 additions & 3 deletions api_app/analyzers_manager/observable_analyzers/checkphish.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@


class CheckPhish(classes.ObservableAnalyzer):
base_url: str = "https://developers.checkphish.ai/api/neo/scan"
status_url: str = base_url + "/status"
url: str = "https://developers.checkphish.ai/api/neo/scan"
status_url: str = url + "/status"

polling_tries: int
polling_time: float
Expand All @@ -25,7 +25,7 @@ def run(self):
"urlInfo": {"url": self.observable_name},
}

response = requests.post(CheckPhish.base_url, json=json_data)
response = requests.post(CheckPhish.url, json=json_data)
response.raise_for_status()

job_id = response.json().get("jobID")
Expand Down
3 changes: 2 additions & 1 deletion api_app/analyzers_manager/observable_analyzers/crowdsec.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@

class Crowdsec(ObservableAnalyzer):
_api_key_name: str
url: str = "https://cti.api.crowdsec.net"

def run(self):
headers = {
"x-api-key": self._api_key_name,
"User-Agent": f"crowdsec-intelowl/{settings.VERSION}",
}
url = f"https://cti.api.crowdsec.net/v2/smoke/{self.observable_name}"
url = f"{self.url}/v2/smoke/{self.observable_name}"
response = requests.get(url, headers=headers)
if response.status_code == 404:
result = {"not_found": True}
Expand Down
8 changes: 6 additions & 2 deletions api_app/analyzers_manager/observable_analyzers/crxcavator.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,15 @@

class CRXcavator(classes.ObservableAnalyzer):
name: str = "CRXcavator"
base_url: str = "https://api.crxcavator.io/v1/report/"
url: str = "https://api.crxcavator.io/v1/report/"

@classmethod
def update(cls) -> bool:
pass

def run(self):
try:
response = requests.get(self.base_url + self.observable_name)
response = requests.get(self.url + self.observable_name)
response.raise_for_status()
except requests.RequestException as e:
raise AnalyzerRunException(e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,14 @@


class DocGuard_Hash(classes.ObservableAnalyzer):
base_url: str = "https://api.docguard.net:8443/api/FileAnalyzing/GetByHash/"
url: str = "https://api.docguard.net:8443/api/FileAnalyzing/GetByHash/"

_api_key_name: str

@classmethod
def update(cls) -> bool:
pass

@property
def hash_type(self):
hash_lengths = {32: "md5", 64: "sha256"}
Expand All @@ -43,7 +47,7 @@ def run(self):
uri = f"{self.observable_name}"
if self.observable_classification == self.ObservableTypes.HASH:
try:
response = requests.get(self.base_url + uri, headers=headers)
response = requests.get(self.url + uri, headers=headers)
response.raise_for_status()
except requests.RequestException as e:
raise AnalyzerRunException(e)
Expand Down
4 changes: 2 additions & 2 deletions api_app/analyzers_manager/observable_analyzers/emailrep.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@


class EmailRep(classes.ObservableAnalyzer):
base_url: str = "https://emailrep.io/{}"
url: str = "https://emailrep.io/{}"

_api_key_name: str

Expand All @@ -32,7 +32,7 @@ def run(self):
f" Supported: generic"
)

url = self.base_url.format(self.observable_name)
url = self.url.format(self.observable_name)

response = requests.get(url, headers=headers)
response.raise_for_status()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,20 @@
class FileScanSearch(ObservableAnalyzer):
"""FileScan_Search analyzer"""

base_url: str = "https://www.filescan.io/api/reports/search"
url: str = "https://www.filescan.io/api/reports/search"
_api_key: str

@classmethod
def update(cls) -> bool:
pass

def run(self):
"""Runs the FileScan_Search analyzer"""
observable_name_base64 = base64.b64encode(
self.observable_name.encode()
).decode()
endpoint = "?query={input}"
url = f"{self.base_url}/{endpoint.format(input=observable_name_base64)}"
url = f"{self.url}/{endpoint.format(input=observable_name_base64)}"
try:
response = requests.get(url, headers={"X-Api-Key": self._api_key})
response.raise_for_status()
Expand Down
6 changes: 3 additions & 3 deletions api_app/analyzers_manager/observable_analyzers/ha_get.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@


class HybridAnalysisGet(ObservableAnalyzer):
base_url: str = "https://www.hybrid-analysis.com"
api_url: str = f"{base_url}/api/v2/"
sample_url: str = f"{base_url}/sample"
url: str = "https://www.hybrid-analysis.com"
api_url: str = f"{url}/api/v2/"
sample_url: str = f"{url}/sample"

_api_key_name: str

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@


class HaveIBeenPwned(classes.ObservableAnalyzer):
base_url: str = "https://haveibeenpwned.com/api/v3/breachedaccount/"
url: str = "https://haveibeenpwned.com/api/v3/breachedaccount/"

truncate_response: bool
include_unverified: bool
Expand All @@ -26,7 +26,7 @@ def run(self):
headers = {"hibp-api-key": self._api_key_name}

response = requests.get(
self.base_url + self.observable_name, params=params, headers=headers
self.url + self.observable_name, params=params, headers=headers
)
response.raise_for_status()

Expand Down
Loading

0 comments on commit 8522189

Please sign in to comment.