-
Notifications
You must be signed in to change notification settings - Fork 172
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Rule plugins simple2 #231
Open
Thorsten-Sick
wants to merge
4
commits into
OWASP:master
Choose a base branch
from
primion:rule_plugins_simple2
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Rule plugins simple2 #231
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
48906d7
Intregated example plugins for rules
Thorsten-Sick c191ac8
Implementing a first SQL injection rule
Thorsten-Sick 4d678b9
Extending basic template. Support for specific comments, CWEs, CAPECs…
Thorsten-Sick 15efac1
Fixing unit tests
Thorsten-Sick File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
""" Base class for all plugin types """ | ||
|
||
|
||
class BasePlugin(): | ||
""" Base class for plugins """ | ||
|
||
def __init__(self): | ||
raise NotImplementedError("Plugin needs an __init__ function") | ||
|
||
def get_name(self): | ||
return self.name |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
from pytm.plugins.base.base_plugin import BasePlugin | ||
|
||
|
||
class PluginThreat(): | ||
""" A threat description """ | ||
# TODO: Untangle the main code and use the Threat class from there here. | ||
|
||
cwes = [] | ||
capecs = [] | ||
ttps = [] | ||
|
||
def __init__(self, element, comment, **kwargs): | ||
|
||
self.data = {"SID": kwargs.get("SID"), | ||
"description": kwargs.get("description", ""), | ||
"condition": kwargs.get("condition", ""), | ||
"target": kwargs.get("target", []), | ||
"details": kwargs.get("details", ""), | ||
"severity": kwargs.get("severity", ""), | ||
"mitigations": kwargs.get("mitigations", ""), | ||
"example": kwargs.get("example", ""), | ||
"references": " ".join(kwargs.get("reference_list", [])), | ||
"specific_comment": kwargs.get("specific_comment", ""), | ||
|
||
"cwes": kwargs.get("cwes", []), | ||
"ttps": kwargs.get("ttps", []), | ||
"capecs": kwargs.get("capecs", []) | ||
} | ||
self.element = element | ||
self.comment = comment | ||
|
||
def to_threatfile_format(self): | ||
""" Returns data in threatfile format """ | ||
return self.data | ||
|
||
|
||
class RuleResult(): | ||
""" Can collect a large variety of detection results. Can be extended beyond threats. This is the reason there is a whole class here to collect that """ | ||
|
||
def __init__(self) -> None: | ||
self._threats = [] | ||
|
||
def add_threat(self, element, comment, **kwargs): | ||
self._threats.append(PluginThreat(element, comment, **kwargs)) | ||
|
||
def get_threats(self): | ||
return self._threats | ||
|
||
|
||
|
||
class RulePlugin(BasePlugin): | ||
""" A rule matching plugin base | ||
|
||
""" | ||
|
||
sid = None | ||
|
||
cwes = [] | ||
capecs = [] | ||
ttps = [] | ||
|
||
def __init__(self): | ||
self.result = RuleResult() | ||
self.elements = [] | ||
|
||
### Entry points | ||
|
||
def threat_check(self, elements): | ||
""" Calls the plugin function to check threats after abstracting internals away """ | ||
|
||
self.elements = elements | ||
|
||
self.threat_match() | ||
|
||
### Generic functions | ||
|
||
def get_type(self, element): | ||
""" returns a type string for an element """ | ||
|
||
# TODO: Move that to the classes | ||
if str((type(element))) == "<class 'pytm.pytm.Boundary'>": | ||
return "Boundary" | ||
if str((type(element))) == "<class 'pytm.pytm.Datastore'>": | ||
return "Datastore" | ||
if str((type(element))) == "<class 'pytm.pytm.Dataflow'>": | ||
return "Dataflow" | ||
if str((type(element))) == "<class 'pytm.pytm.Server'>": | ||
return "Server" | ||
if str((type(element))) == "<class 'pytm.pytm.Actor'>": | ||
return "Actor" | ||
|
||
def get_elements(self): | ||
return self.elements | ||
|
||
### Threat things | ||
|
||
def add_threat(self, element, comment): | ||
""" Add a threat to the results | ||
|
||
@param element: the threat is tied to | ||
@param comment: used comment for this threat | ||
|
||
""" | ||
data = {"SID": self.SID, | ||
"description": self.description, | ||
"condition": self.condition, | ||
"target": self.target, | ||
"details": self.details, | ||
"severity": self.severity, | ||
"mitigations": self.mitigations, | ||
"example": self.example, | ||
"reference_list": self.reference_list, | ||
"specific_comment": comment, | ||
|
||
"ttps": self.ttps, | ||
"capecs": self.capecs, | ||
"cwes": self.cwes, | ||
} | ||
|
||
self.result.add_threat(element, comment, **data) | ||
|
||
def get_threats(self): | ||
""" Read threats from the collection """ | ||
return self.result.get_threats() | ||
|
||
def get_id(self): | ||
return self.SID | ||
|
||
def get_description(self): | ||
return self.description | ||
|
||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
from pytm.plugins.base.rule_plugin import RulePlugin, RuleResult | ||
from pytm import DatastoreType | ||
|
||
|
||
class StrictSQLInjectionRulePlugin(RulePlugin): | ||
# Boilerplate | ||
name = "strict_sql_injection" | ||
description = "A strict SQL injection rule" | ||
|
||
SID = "EXP01" | ||
details = "A SQL datastore is connected to a web server which does not sanitize inputs. This web server can be accessed by an actor" | ||
LikelihoodOfAttack = "High" | ||
severity = "High" | ||
condition = "A SQL datastore is connected to a web server which does not sanitize inputs. This web server can be accessed by an actor" | ||
prerequisites = "" | ||
mitigations = "Sanitize input to protect the SQL server. Use PreparedStatements" | ||
example = "" | ||
reference_list = [] | ||
target = [] | ||
|
||
cwes = ["89", "1286"] | ||
capecs = ["66"] | ||
ttps = ["T1190"] | ||
|
||
def __init__(self): | ||
super().__init__() | ||
self.plugin_path = __file__ | ||
|
||
def connected_elements(self, element): | ||
""" Lists all elements connected by Dataflows to this element """ | ||
res = [] | ||
|
||
for a_dataflow in self.get_elements(): | ||
if self.get_type(a_dataflow) == "Dataflow": | ||
if a_dataflow.source == element: | ||
res.append(a_dataflow.sink) | ||
if a_dataflow.sink == element: | ||
res.append(a_dataflow.source) | ||
return res | ||
|
||
def threat_match(self): | ||
""" Specific SQL injection test. Extra specific to test the power of plugin rules. | ||
|
||
A SQL datastore is connected to a web server which does not sanitize inputs. This web server can be accessed by an actor . | ||
""" | ||
for a_database in self.get_elements(): | ||
if self.get_type(a_database) == "Datastore" and a_database.type == DatastoreType.SQL: | ||
servers_connected_to_database = self.connected_elements(a_database) | ||
for a_webserver in servers_connected_to_database: | ||
# Is connected to a web server which does not sanitize input | ||
if self.get_type(a_webserver) == "Server" and a_webserver.controls.sanitizesInput == False: | ||
users_connected_to_server = self.connected_elements(a_webserver) | ||
# Check all connections of this web server, is a user connected (="Actor") | ||
for a_user in users_connected_to_server: | ||
if self.get_type(a_user) == "Actor": | ||
self.add_threat(a_database, comment = f"The user '{a_user.name}' could run SQL injection attacks on '{a_database.name}' via '{a_webserver.name}'") | ||
|
||
|
||
|
||
|
||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could this be a function of an Element rather than in the rule definition? Seems it could be reused for other things, and taking a couple of extra parameters perhaps to be generically useful.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That would be better. I want to make changes by this feature in the core code as minimal as possible. Maybe create a separate PR for the Element upgrade ? Which parameters do you propose ?