diff --git a/microsoft-defender/src/openbas_microsoft_defender.py b/microsoft-defender/src/openbas_microsoft_defender.py index 8921119..e4f822e 100644 --- a/microsoft-defender/src/openbas_microsoft_defender.py +++ b/microsoft-defender/src/openbas_microsoft_defender.py @@ -77,6 +77,7 @@ def __init__(self): # TODO Command line # self.relevant_signatures_types = ["process_name", "command_line", "file_name", "hostname", "ipv4_address"] self.relevant_signatures_types = [ + "parent_process_name", "process_name", "file_name", "hostname", @@ -102,6 +103,18 @@ def _extract_process_names(self, alert): process_names.append(evidence.file_details.file_name) return process_names + def _extract_parent_process_name(self, alert): + parent_process_names = [] + for evidence in alert.evidence: + if ( + hasattr(evidence, "parent_process_image_file") + and evidence.parent_process_image_file + ): + parent_process_names.append( + evidence.parent_process_image_file.file_name + ) + return parent_process_names + def _extract_command_lines(self, alert): command_lines = [] for evidence in alert.evidence: @@ -134,7 +147,7 @@ def _extract_ip_addresses(self, alert): def _is_prevented(self, alert): for evidence in alert.evidence: if evidence.odata_type == "#microsoft.graph.security.processEvidence": - if evidence.detection_status in [ + if evidence.detection_status.lower() in [ "prevented", "remediated", "blocked", @@ -174,6 +187,12 @@ def _match_alert(self, endpoint, alert, expectation): "data": self._extract_process_names(alert), "score": 80, } + elif type == "parent_process_name": + alert_data[type] = { + "type": "fuzzy", + "data": self._extract_parent_process_name(alert), + "score": 80, + } elif type == "command_line": alert_data[type] = { "type": "fuzzy", @@ -269,7 +288,9 @@ async def _process_alerts(self, graph_client): ) for i in range(len(alerts.value)): alert = alerts.value[i] - alert_date = parse(str(alert.created_date_time)).astimezone(pytz.UTC) + alert_date = parse(str(alert.last_update_date_time)).astimezone( + pytz.UTC + ) if alert_date > limit_date: result = self._match_alert(endpoint, alert, expectation) if result is not False: diff --git a/microsoft-sentinel/src/openbas_microsoft_sentinel.py b/microsoft-sentinel/src/openbas_microsoft_sentinel.py index 41debee..3a9c4ac 100644 --- a/microsoft-sentinel/src/openbas_microsoft_sentinel.py +++ b/microsoft-sentinel/src/openbas_microsoft_sentinel.py @@ -93,6 +93,7 @@ def __init__(self): # Initialize signatures helper self.relevant_signatures_types = [ + "parent_process_name", "process_name", "command_line", "file_name", @@ -122,6 +123,21 @@ def _extract_process_names(self, columns_index, alert): process_names.append(entity["Name"]) return process_names + def _extract_parent_process_name(self, columns_index, alert): + parent_process_names = [] + entities = json.loads(alert[columns_index["Entities"]]) + for entity in entities: + if "Type" in entity and entity["Type"] == "process": + if "ParentProcess" in entity and "ImageFile" in entity["ParentProcess"]: + if ( + "ImageFile" in entity["ParentProcess"] + and "Name" in entity["ParentProcess"]["ImageFile"] + ): + parent_process_names.append( + entity["ParentProcess"]["ImageFile"]["Name"] + ) + return parent_process_names + def _extract_command_lines(self, columns_index, alert): command_lines = [] entities = json.loads(alert[columns_index["Entities"]]) @@ -161,17 +177,14 @@ def _extract_ip_addresses(self, columns_index, alert): return ip_addresses def _is_prevented(self, columns_index, alert): - extended_properties = json.loads(alert[columns_index["ExtendedProperties"]]) - if "Action" in extended_properties and extended_properties["Action"] in [ - "blocked", - "quarantine", - "remove", - ]: - return True - return False + prevented_keywords = ["blocked", "quarantine", "remove", "prevented"] + alert_name = alert[columns_index["AlertName"]].strip().lower() + result_alert_name = any( + prevented_keyword in alert_name for prevented_keyword in prevented_keywords + ) + return result_alert_name def _match_alert(self, endpoint, columns_index, alert, expectation): - print(alert) self.helper.collector_logger.info( "Trying to match alert " + str(alert[columns_index["SystemAlertId"]]) @@ -201,6 +214,12 @@ def _match_alert(self, endpoint, columns_index, alert, expectation): "data": self._extract_process_names(columns_index, alert), "score": 80, } + if type == "parent_process_name": + alert_data[type] = { + "type": "fuzzy", + "data": self._extract_parent_process_name(columns_index, alert), + "score": 80, + } elif type == "command_line": alert_data[type] = { "type": "fuzzy", @@ -306,7 +325,6 @@ def _process_alerts(self): alert_date = parse( str(alert[columns_index["TimeGenerated"]]) ).astimezone(pytz.UTC) - print(alert) if alert_date > limit_date: result = self._match_alert( endpoint, columns_index, alert, expectation