Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rule plugins simple2 #231

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion docs/basic_template.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

|Assumptions|
|-----------|
{tm.assumptions:repeat:|{{item}}|
{tm.assumptions:repeat:|{{item}}|
}

 
Expand Down Expand Up @@ -62,6 +62,15 @@ Name|Description|Classification
<p>{{item.mitigations}}</p>
<h6>References</h6>
<p>{{item.references}}</p>
<h6>Comment</h6>
<p>{{item.specific_comment}}</p>
<h6>CWEs</h6>
<p>{{item.cwes}}</p>
<h6>TTPs</h6>
<p>{{item.ttps}}</p>
<h6>CAPECs</h6>
<p>{{item.capecs}}</p>

&nbsp;
&nbsp;
&emsp;
Expand Down
11 changes: 11 additions & 0 deletions pytm/plugins/base/base_plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
""" Base class for all plugin types """


class BasePlugin():
""" Base class for plugins """

def __init__(self):
raise NotImplementedError("Plugin needs an __init__ function")

def get_name(self):
return self.name
133 changes: 133 additions & 0 deletions pytm/plugins/base/rule_plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
from pytm.plugins.base.base_plugin import BasePlugin


class PluginThreat():
""" A threat description """
# TODO: Untangle the main code and use the Threat class from there here.

cwes = []
capecs = []
ttps = []

def __init__(self, element, comment, **kwargs):

self.data = {"SID": kwargs.get("SID"),
"description": kwargs.get("description", ""),
"condition": kwargs.get("condition", ""),
"target": kwargs.get("target", []),
"details": kwargs.get("details", ""),
"severity": kwargs.get("severity", ""),
"mitigations": kwargs.get("mitigations", ""),
"example": kwargs.get("example", ""),
"references": " ".join(kwargs.get("reference_list", [])),
"specific_comment": kwargs.get("specific_comment", ""),

"cwes": kwargs.get("cwes", []),
"ttps": kwargs.get("ttps", []),
"capecs": kwargs.get("capecs", [])
}
self.element = element
self.comment = comment

def to_threatfile_format(self):
""" Returns data in threatfile format """
return self.data


class RuleResult():
""" Can collect a large variety of detection results. Can be extended beyond threats. This is the reason there is a whole class here to collect that """

def __init__(self) -> None:
self._threats = []

def add_threat(self, element, comment, **kwargs):
self._threats.append(PluginThreat(element, comment, **kwargs))

def get_threats(self):
return self._threats



class RulePlugin(BasePlugin):
""" A rule matching plugin base

"""

sid = None

cwes = []
capecs = []
ttps = []

def __init__(self):
self.result = RuleResult()
self.elements = []

### Entry points

def threat_check(self, elements):
""" Calls the plugin function to check threats after abstracting internals away """

self.elements = elements

self.threat_match()

### Generic functions

def get_type(self, element):
""" returns a type string for an element """

# TODO: Move that to the classes
if str((type(element))) == "<class 'pytm.pytm.Boundary'>":
return "Boundary"
if str((type(element))) == "<class 'pytm.pytm.Datastore'>":
return "Datastore"
if str((type(element))) == "<class 'pytm.pytm.Dataflow'>":
return "Dataflow"
if str((type(element))) == "<class 'pytm.pytm.Server'>":
return "Server"
if str((type(element))) == "<class 'pytm.pytm.Actor'>":
return "Actor"

def get_elements(self):
return self.elements

### Threat things

def add_threat(self, element, comment):
""" Add a threat to the results

@param element: the threat is tied to
@param comment: used comment for this threat

"""
data = {"SID": self.SID,
"description": self.description,
"condition": self.condition,
"target": self.target,
"details": self.details,
"severity": self.severity,
"mitigations": self.mitigations,
"example": self.example,
"reference_list": self.reference_list,
"specific_comment": comment,

"ttps": self.ttps,
"capecs": self.capecs,
"cwes": self.cwes,
}

self.result.add_threat(element, comment, **data)

def get_threats(self):
""" Read threats from the collection """
return self.result.get_threats()

def get_id(self):
return self.SID

def get_description(self):
return self.description



61 changes: 61 additions & 0 deletions pytm/plugins/rules/specific_sql_injection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from pytm.plugins.base.rule_plugin import RulePlugin, RuleResult
from pytm import DatastoreType


class StrictSQLInjectionRulePlugin(RulePlugin):
# Boilerplate
name = "strict_sql_injection"
description = "A strict SQL injection rule"

SID = "EXP01"
details = "A SQL datastore is connected to a web server which does not sanitize inputs. This web server can be accessed by an actor"
LikelihoodOfAttack = "High"
severity = "High"
condition = "A SQL datastore is connected to a web server which does not sanitize inputs. This web server can be accessed by an actor"
prerequisites = ""
mitigations = "Sanitize input to protect the SQL server. Use PreparedStatements"
example = ""
reference_list = []
target = []

cwes = ["89", "1286"]
capecs = ["66"]
ttps = ["T1190"]

def __init__(self):
super().__init__()
self.plugin_path = __file__

def connected_elements(self, element):
Copy link
Collaborator

@colesmj colesmj Jan 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be a function of an Element rather than in the rule definition? Seems it could be reused for other things, and taking a couple of extra parameters perhaps to be generically useful.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be better. I want to make changes by this feature in the core code as minimal as possible. Maybe create a separate PR for the Element upgrade ? Which parameters do you propose ?

""" Lists all elements connected by Dataflows to this element """
res = []

for a_dataflow in self.get_elements():
if self.get_type(a_dataflow) == "Dataflow":
if a_dataflow.source == element:
res.append(a_dataflow.sink)
if a_dataflow.sink == element:
res.append(a_dataflow.source)
return res

def threat_match(self):
""" Specific SQL injection test. Extra specific to test the power of plugin rules.

A SQL datastore is connected to a web server which does not sanitize inputs. This web server can be accessed by an actor .
"""
for a_database in self.get_elements():
if self.get_type(a_database) == "Datastore" and a_database.type == DatastoreType.SQL:
servers_connected_to_database = self.connected_elements(a_database)
for a_webserver in servers_connected_to_database:
# Is connected to a web server which does not sanitize input
if self.get_type(a_webserver) == "Server" and a_webserver.controls.sanitizesInput == False:
users_connected_to_server = self.connected_elements(a_webserver)
# Check all connections of this web server, is a user connected (="Actor")
for a_user in users_connected_to_server:
if self.get_type(a_user) == "Actor":
self.add_threat(a_database, comment = f"The user '{a_user.name}' could run SQL injection attacks on '{a_database.name}' via '{a_webserver.name}'")





Loading