diff --git a/stix_shifter_modules/abuseipdb/__init__.py b/stix_shifter_modules/abuseipdb/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/stix_shifter_modules/abuseipdb/configuration/config.json b/stix_shifter_modules/abuseipdb/configuration/config.json new file mode 100644 index 000000000..1dd71034b --- /dev/null +++ b/stix_shifter_modules/abuseipdb/configuration/config.json @@ -0,0 +1,105 @@ +{ + "connection": { + "type": { + "id": "AbuseIPDB_Connector", + "displayName": "AbuseIPDB", + "description": "Determine whether an IP was reported or not as malicious by AbuseIPDB." + }, + "options": { + "type": "fields", + "concurrent": { + "default": 4, + "min": 1, + "max": 100, + "type": "number", + "previous": "connection.maxConcurrentSearches" + }, + "result_limit": { + "default": 10000, + "min": 1, + "max": 500000, + "type": "number", + "previous": "connection.resultSizeLimit", + "hidden": true + }, + "time_range": { + "default": 5, + "min": 1, + "max": 10000, + "type": "number", + "previous": "connection.timerange", + "nullable": true, + "hidden": true + }, + "timeout": { + "default": 30, + "min": 1, + "max": 60, + "type": "number", + "previous": "connection.timeoutLimit" + } + }, + "help": { + "default": "www.ibm.com", + "type": "link" + }, + "namespace":{ + "type": "text", + "default": "9d4bedaf-d351-4f50-930f-f8eb121e5bae", + "hidden": true + }, + "host": { + "type": "text", + "default": "", + "hidden": true + }, + "port": { + "default": 443, + "type": "number", + "min": 1, + "max": 65535, + "hidden": true + } + }, + "configuration": { + "auth": { + "type" : "fields", + "key":{ + "type":"password" + } + }, + "rateLimit": { + "type": "fields", + "rateLimit": { + "default": 1000, + "type": "number", + "hidden": true + }, + "rateUnit": { + "type": "text", + "default": "Day", + "hidden": true + } + }, + "cacheDuration": { + "type": "fields", + "cacheDuration": { + "default": 10, + "type": "number", + "hidden": true + }, + "unit": { + "default": "Minute", + "type": "text", + "hidden": true + } + }, + "dataTypeList": { + "type": "fields", + "ip": { + "type": "checkbox", + "default": true + } + } + } +} \ No newline at end of file diff --git a/stix_shifter_modules/abuseipdb/configuration/lang_en.json b/stix_shifter_modules/abuseipdb/configuration/lang_en.json new file mode 100644 index 000000000..416e23dc3 --- /dev/null +++ b/stix_shifter_modules/abuseipdb/configuration/lang_en.json @@ -0,0 +1,61 @@ +{ + "connection": { + "options": { + "concurrent": { + "label": "Concurrent Search Limit", + "description": "The number of simultaneous connections that can be made between the host and the data source. Valid input range is {{min}} to {{max}}." + }, + "search_timeout": { + "label": "Query Search Timeout Limit", + "description": "The limit on how long the query will run, in minutes, on the data source." + } + }, + "host": { + "label": "Management IP address or Hostname", + "placeholder": "192.168.1.10", + "description": "Specify the OCP Cluster hostname or the XForce API host URL" + }, + "port": { + "label": "Host Port", + "description": "Set the port number that is associated with the Host name or IP" + }, + "namespace": { + "label": "The UUID Namespace to generate unique ", + "description": "Supply a UUID to generate deterministic UUIDs for the resulting STIX bundle" + } + }, + "configuration": { + "auth": { + "key": { + "label": "Key", + "description": "The APIKey for AbuseIPDB Threat Feed" + } + }, + "rateLimit": { + "rateLimit": { + "label": "Rate Limit", + "description": "The number of queries allowed by AbuseIPDB" + }, + "rateUnit": { + "label": "Rate Unit", + "description": "The rate unit for rate limit in [seconds, minutes, days, months, years ...]" + } + }, + "cacheDuration": { + "cacheDuration": { + "label": "Cache Duration", + "description": "How long should we cache the results of the STIX Bundle execution?" + }, + "unit": { + "label": "Rate Unit", + "description": "The unit for cache in [seconds, minutes, days, months, years ...]" + } + }, + "dataTypeList": { + "ip": { + "label": "IP Address", + "description": "Whether IP Address lookup queries are supported by AbuseIPDB based on the User's API Provisioning" + } + } + } +} \ No newline at end of file diff --git a/stix_shifter_modules/abuseipdb/entry_point.py b/stix_shifter_modules/abuseipdb/entry_point.py new file mode 100644 index 000000000..4f3cdf834 --- /dev/null +++ b/stix_shifter_modules/abuseipdb/entry_point.py @@ -0,0 +1,40 @@ +from stix_shifter_utils.utils.base_entry_point import BaseEntryPoint +from stix_shifter_utils.modules.base.stix_transmission.base_sync_connector import BaseSyncConnector +from .stix_transmission.ping_connector import PingConnector +from .stix_transmission.delete_connector import DeleteConnector +from .stix_transmission.results_connector import ResultsConnector +from .stix_transmission.api_client import APIClient +from .stix_translation.query_translator import QueryTranslator +from .stix_translation.results_translator import ResultsTranslator +import os + +class EntryPoint(BaseEntryPoint): + + def __init__(self, connection={}, configuration={}, options={}): + super().__init__(connection, configuration, options) + self.set_async(False) + if connection: + api_client = APIClient(connection, configuration) + base_sync_connector = BaseSyncConnector() + ping_connector = PingConnector(api_client) + query_connector = base_sync_connector + status_connector = base_sync_connector + results_connector = ResultsConnector(api_client) + delete_connector = DeleteConnector(api_client) + + self.set_results_connector(results_connector) + self.set_status_connector(status_connector) + self.set_delete_connector(delete_connector) + self.set_query_connector(query_connector) + self.set_ping_connector(ping_connector) + + # Use default translation setup with default dialect otherwise... + # self.setup_translation_simple(dialect_default='default') + + basepath = os.path.dirname(__file__) + filepath = os.path.abspath(os.path.join(basepath, "stix_translation")) + + dialect = 'default' + query_translator = QueryTranslator(options, dialect, filepath) + results_translator = ResultsTranslator(options, dialect, filepath) + self.add_dialect(dialect, query_translator=query_translator, results_translator=results_translator, default=True) \ No newline at end of file diff --git a/stix_shifter_modules/abuseipdb/requirements.txt b/stix_shifter_modules/abuseipdb/requirements.txt new file mode 100644 index 000000000..4336eaa01 --- /dev/null +++ b/stix_shifter_modules/abuseipdb/requirements.txt @@ -0,0 +1 @@ +uuid==1.30 \ No newline at end of file diff --git a/stix_shifter_modules/abuseipdb/stix_translation/__init__.py b/stix_shifter_modules/abuseipdb/stix_translation/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/stix_shifter_modules/abuseipdb/stix_translation/json/from_stix_map.json b/stix_shifter_modules/abuseipdb/stix_translation/json/from_stix_map.json new file mode 100644 index 000000000..ee5b48298 --- /dev/null +++ b/stix_shifter_modules/abuseipdb/stix_translation/json/from_stix_map.json @@ -0,0 +1,12 @@ +{ + "ipv4-addr": { + "fields": { + "value":["SourceIpV4", "DestinationIpV4"] + } + }, + "ipv6-addr": { + "fields":{ + "value":["SourceIpV6", "DestinationIpV6"] + } + } +} \ No newline at end of file diff --git a/stix_shifter_modules/abuseipdb/stix_translation/json/operators.json b/stix_shifter_modules/abuseipdb/stix_translation/json/operators.json new file mode 100644 index 000000000..03ff0cbd4 --- /dev/null +++ b/stix_shifter_modules/abuseipdb/stix_translation/json/operators.json @@ -0,0 +1,16 @@ +{ + "ComparisonExpressionOperators.And": "AND", + "ComparisonExpressionOperators.Or": "OR", + "ComparisonComparators.GreaterThan": ">", + "ComparisonComparators.GreaterThanOrEqual": ">=", + "ComparisonComparators.LessThan": "<", + "ComparisonComparators.LessThanOrEqual": "<=", + "ComparisonComparators.Equal": "=", + "ComparisonComparators.NotEqual": "!=", + "ComparisonComparators.Like": "=", + "ComparisonComparators.In": "IN", + "ComparisonComparators.Matches": "CONTAINS", + "ComparisonComparators.IsSubSet": "insubnet", + "ObservationOperators.Or": "OR", + "ObservationOperators.And": "AND" +} \ No newline at end of file diff --git a/stix_shifter_modules/abuseipdb/stix_translation/json/to_stix_map.json b/stix_shifter_modules/abuseipdb/stix_translation/json/to_stix_map.json new file mode 100644 index 000000000..077404aaa --- /dev/null +++ b/stix_shifter_modules/abuseipdb/stix_translation/json/to_stix_map.json @@ -0,0 +1,3 @@ +{ + +} \ No newline at end of file diff --git a/stix_shifter_modules/abuseipdb/stix_translation/query_constructor.py b/stix_shifter_modules/abuseipdb/stix_translation/query_constructor.py new file mode 100644 index 000000000..c5cb3b011 --- /dev/null +++ b/stix_shifter_modules/abuseipdb/stix_translation/query_constructor.py @@ -0,0 +1,226 @@ +from stix_shifter_utils.stix_translation.src.patterns.pattern_objects import ObservationExpression, ComparisonExpression, \ + ComparisonExpressionOperators, ComparisonComparators, Pattern, \ + CombinedComparisonExpression, CombinedObservationExpression, ObservationOperators +from stix_shifter_utils.stix_translation.src.utils.transformers import TimestampToMilliseconds +from stix_shifter_utils.stix_translation.src.json_to_stix import observable +import logging +import re + +# Source and destination reference mapping for ip and mac addresses. +# Change the keys to match the data source fields. The value array indicates the possible data type that can come into from field. +REFERENCE_DATA_TYPES = {"SourceIpV4": ["ipv4", "ipv4_cidr"], + "SourceIpV6": ["ipv6"], + "DestinationIpV4": ["ipv4", "ipv4_cidr"], + "DestinationIpV6": ["ipv6"]} + +logger = logging.getLogger(__name__) +data = "" +dataType = "" + +class QueryStringPatternTranslator: + # Change comparator values to match with supported data source operators + def __init__(self, pattern: Pattern, data_model_mapper): + self.dmm = data_model_mapper + self.comparator_lookup = self.dmm.map_comparator() + self.pattern = pattern + self.translated = self.parse_expression(pattern) + + @staticmethod + def _format_set(values) -> str: + gen = values.element_iterator() + return "({})".format(' OR '.join([QueryStringPatternTranslator._escape_value(value) for value in gen])) + + @staticmethod + def _format_match(value) -> str: + raw = QueryStringPatternTranslator._escape_value(value) + if raw[0] == "^": + raw = raw[1:] + else: + raw = ".*" + raw + if raw[-1] == "$": + raw = raw[0:-1] + else: + raw = raw + ".*" + return "\'{}\'".format(raw) + + @staticmethod + def _format_equality(value) -> str: + return '\'{}\''.format(value) + + @staticmethod + def _format_like(value) -> str: + value = "'%{value}%'".format(value=value) + return QueryStringPatternTranslator._escape_value(value) + + @staticmethod + def _escape_value(value, comparator=None) -> str: + if isinstance(value, str): + return '{}'.format(value.replace('\\', '\\\\').replace('\"', '\\"').replace('(', '\\(').replace(')', '\\)')) + else: + return value + + @staticmethod + def _negate_comparison(comparison_string): + return "NOT ({})".format(comparison_string) + + @staticmethod + def _check_value_type(value): + value = str(value) + for key, pattern in observable.REGEX.items(): + if key != 'date' and bool(re.search(pattern, value)): + return key + return None + + @staticmethod + def _parse_reference(self, stix_field, value_type, mapped_field, value, comparator): + if value_type not in REFERENCE_DATA_TYPES["{}".format(mapped_field)]: + return None + else: + return "{mapped_field} {comparator} {value}".format( + mapped_field=mapped_field, comparator=comparator, value=value) + + @staticmethod + def _parse_mapped_fields(self, expression, value, comparator, stix_field, mapped_fields_array): + comparison_string = "" + is_reference_value = self._is_reference_value(stix_field) + # Need to use expression.value to match against regex since the passed-in value has already been formated. + value_type = self._check_value_type(expression.value) if is_reference_value else None + mapped_fields_count = 1 if is_reference_value else len(mapped_fields_array) + + for mapped_field in mapped_fields_array: + if is_reference_value: + parsed_reference = self._parse_reference(self, stix_field, value_type, mapped_field, value, comparator) + if not parsed_reference: + continue + comparison_string += parsed_reference + else: + comparison_string += "{mapped_field} {comparator} {value}".format(mapped_field=mapped_field, comparator=comparator, value=value) + + if (mapped_fields_count > 1): + comparison_string += " OR " + mapped_fields_count -= 1 + return comparison_string + + @staticmethod + def _is_reference_value(stix_field): + return stix_field == 'src_ref.value' or stix_field == 'dst_ref.value' + + @staticmethod + def _lookup_comparison_operator(self, expression_operator): + if str(expression_operator) not in self.comparator_lookup: + raise NotImplementedError("Comparison operator {} unsupported for Dummy connector".format(expression_operator.name)) + return self.comparator_lookup[str(expression_operator)] + + def _parse_expression(self, expression, qualifier=None) -> str: + if isinstance(expression, ComparisonExpression): # Base Case + # Resolve STIX Object Path to a field in the target Data Model + stix_object, stix_field = expression.object_path.split(':') + mapped_fields_array = self.dmm.map_field(stix_object, stix_field) + comparator = self._lookup_comparison_operator(self, expression.comparator) + if stix_field == 'start' or stix_field == 'end': + transformer = TimestampToMilliseconds() + expression.value = transformer.transform(expression.value) + # Some values are formatted differently based on how they're being compared + if expression.comparator == ComparisonComparators.Matches: # needs forward slashes + value = self._format_match(expression.value) + # should be (x, y, z, ...) + elif expression.comparator == ComparisonComparators.In: + value = self._format_set(expression.value) + elif expression.comparator == ComparisonComparators.Equal or expression.comparator == ComparisonComparators.NotEqual: + # Should be in single-quotes + value = self._format_equality(expression.value) + # '%' -> '*' wildcard, '_' -> '?' single wildcard + elif expression.comparator == ComparisonComparators.Like: + value = self._format_like(expression.value) + else: + value = self._escape_value(expression.value) + + get_data_source_query(stix_field=stix_field, stix_object=stix_object, value=value) + + comparison_string = self._parse_mapped_fields(self, expression, value, comparator, stix_field, mapped_fields_array) + if(len(mapped_fields_array) > 1 and not self._is_reference_value(stix_field)): + # More than one data source field maps to the STIX attribute, so group comparisons together. + grouped_comparison_string = "(" + comparison_string + ")" + comparison_string = grouped_comparison_string + + if expression.negated: + comparison_string = self._negate_comparison(comparison_string) + if qualifier is not None: + return "{} {}".format(comparison_string, qualifier) + else: + return "{}".format(comparison_string) + + elif isinstance(expression, CombinedComparisonExpression): + operator = self._lookup_comparison_operator(self, expression.operator) + expression_01 = self._parse_expression(expression.expr1) + expression_02 = self._parse_expression(expression.expr2) + if not expression_01 or not expression_02: + return '' + if isinstance(expression.expr1, CombinedComparisonExpression): + expression_01 = "({})".format(expression_01) + if isinstance(expression.expr2, CombinedComparisonExpression): + expression_02 = "({})".format(expression_02) + query_string = "{} {} {}".format(expression_01, operator, expression_02) + if qualifier is not None: + return "{} {}".format(query_string, qualifier) + else: + return "{}".format(query_string) + elif isinstance(expression, ObservationExpression): + return self._parse_expression(expression.comparison_expression, qualifier) + elif hasattr(expression, 'qualifier') and hasattr(expression, 'observation_expression'): + if isinstance(expression.observation_expression, CombinedObservationExpression): + operator = self._lookup_comparison_operator(self, expression.observation_expression.operator) + expression_01 = self._parse_expression(expression.observation_expression.expr1) + # qualifier only needs to be passed into the parse expression once since it will be the same for both expressions + expression_02 = self._parse_expression(expression.observation_expression.expr2, expression.qualifier) + return "{} {} {}".format(expression_01, operator, expression_02) + else: + return self._parse_expression(expression.observation_expression.comparison_expression, expression.qualifier) + elif isinstance(expression, CombinedObservationExpression): + operator = self._lookup_comparison_operator(self, expression.operator) + expression_01 = self._parse_expression(expression.expr1) + expression_02 = self._parse_expression(expression.expr2) + if expression_01 and expression_02: + return "({}) {} ({})".format(expression_01, operator, expression_02) + elif expression_01: + return "{}".format(expression_01) + elif expression_02: + return "{}".format(expression_02) + else: + return '' + elif isinstance(expression, Pattern): + return "{expr}".format(expr=self._parse_expression(expression.expression)) + else: + raise RuntimeError("Unknown Recursion Case for expression={}, type(expression)={}".format( + expression, type(expression))) + + def parse_expression(self, pattern: Pattern): + return self._parse_expression(pattern) + + +def translate_pattern(pattern: Pattern, data_model_mapping, options): + # Query result limit and time range can be passed into the QueryStringPatternTranslator if supported by the data source. + query = QueryStringPatternTranslator(pattern, data_model_mapping).translated + # Add space around START STOP qualifiers + query = re.sub("START", "START ", query) + query = re.sub("STOP", " STOP ", query) + + translated_query = {"data": data, "dataType": dataType} + return [str(translated_query)] + +def get_data_source_query(stix_field, stix_object, value): + global data, dataType + dataType = get_data_type(stix_object, stix_field) # can ipv4-addr, ipv6-addr, url, domain, hash + data = value.replace("'", "") + +def get_data_type(stix_object, stix_field): + if "ipv4" in stix_object or "ipv6" in stix_object: + return "ip" + elif "url" in stix_object: + return "url" + elif "domain-name" in stix_object: + return "domain" + elif "file" in stix_object and "hashes" in stix_field: + return "hash" + else: + return "Unsupported Data Type" \ No newline at end of file diff --git a/stix_shifter_modules/abuseipdb/stix_translation/query_translator.py b/stix_shifter_modules/abuseipdb/stix_translation/query_translator.py new file mode 100644 index 000000000..575372d07 --- /dev/null +++ b/stix_shifter_modules/abuseipdb/stix_translation/query_translator.py @@ -0,0 +1,26 @@ +import logging + +from stix_shifter_utils.modules.base.stix_translation.base_query_translator import BaseQueryTranslator +from . import query_constructor + +logger = logging.getLogger(__name__) + + +class QueryTranslator(BaseQueryTranslator): + + def transform_antlr(self, data, antlr_parsing_object): + """ + Transforms STIX pattern into a different query format. Based on a mapping file + :param antlr_parsing_object: Antlr parsing objects for the STIX pattern + :type antlr_parsing_object: object + :param mapping: The mapping file path to use as instructions on how to transform the given STIX query into another format. This should default to something if one isn't passed in + :type mapping: str (filepath) + :return: transformed query string + :rtype: str + """ + + logger.info("Converting STIX2 Pattern to data source query") + + query_string = query_constructor.translate_pattern( + antlr_parsing_object, self, self.options) + return query_string diff --git a/stix_shifter_modules/abuseipdb/stix_translation/results_translator.py b/stix_shifter_modules/abuseipdb/stix_translation/results_translator.py new file mode 100644 index 000000000..321f6a59d --- /dev/null +++ b/stix_shifter_modules/abuseipdb/stix_translation/results_translator.py @@ -0,0 +1,253 @@ +from stix_shifter_utils.modules.base.stix_translation.base_results_translator import BaseResultTranslator +from . import sdo_translator +import json +from ipaddress import ip_network, IPv4Network, IPv6Network +import uuid +from urllib.parse import urlparse +# from stix_shifter_utils.normalization.normalization_helper import create_attributes, evaluate_attribute_type + + +def create_attributes(attribute_fields, data): + threat_attribute_report = [] + if isinstance(attribute_fields, dict): + for attribute, value in attribute_fields.items(): + if attribute in data: + attribute_dict = {} + attribute_dict['attribute_name'] = value + attribute_dict['attribute_value'] = str(data[attribute]) + attribute_dict['attribute_type'] = evaluate_attribute_type(data[attribute]) + threat_attribute_report.append(attribute_dict) if attribute_dict['attribute_type'] is not None else '' + elif type(attribute_fields) is str: + attribute_dict = {} + attribute_dict['attribute_name'] = attribute_fields + attribute_dict['attribute_value'] = str(data) + attribute_dict['attribute_type'] = evaluate_attribute_type(data) + threat_attribute_report.append(attribute_dict) if attribute_dict['attribute_type'] is not None else '' + return threat_attribute_report + + +def evaluate_attribute_type(attribute): + # supported types = string, number, uri, ip, lat_lng + attribute_type = None + if isinstance(attribute, bool): + attribute_type = 'string' if attribute is True else None + elif isinstance(attribute, (int, float, complex)): + attribute_type = 'number' + if isinstance(attribute, (str)): + attribute_type = 'string' + if uri_validator(attribute): + attribute_type = 'uri' + try: + if isinstance(ip_network(attribute), (IPv4Network, IPv6Network)): + attribute_type = 'ip' + except ValueError: + pass + + return attribute_type + + +def uri_validator(x): + result = urlparse(x) + return all([result.scheme, result.netloc]) + + +class ResultsTranslator(BaseResultTranslator): + + # Get indicator_types value + def get_indicator_types(self, value): + if (value>=0 and value<=25): + return 'benign' + elif (value>=26 and value<=74): + return 'anomalous-activity' + elif (value>=75 and value<=100): + return 'malicious-activity' + # if key/value pair is not included in the report + elif value is None: + return 'unknown' + else: + return 'unknown' + + def get_ip_address(self, ip): + return 'ipv4' if isinstance(ip_network(ip), IPv4Network) else 'ipv6' + + #Get permalink from the report + def get_external_reference_from_json(self, data): + ext_data = data['external_reference'] + if ext_data.get('url') == '' or ext_data.get('url') == 'N/A': + return None + url = data['external_reference'] + external_reference = {"external_references":[url]} + return external_reference + + #return optional fields for indicators such as name, description, indicator-type, kill-chain-phases etc + def get_optional_values(self, value): + indicator_types_value = self.get_indicator_types(value) + indicator_types = {"indicator_types": [indicator_types_value]} if indicator_types_value is not None else None + return indicator_types + + # Get required pattern field from the report, the pattern is a combination of data and dataType fields in the Analyzer result JSON + def get_pattern_from_json(self, data): + pattern_type, pattern_value = data['dataType'], data['data'] + pattern = self.evaluate_pattern(pattern_type, pattern_value) + pattern = {"pattern": pattern} + return pattern + + def evaluate_pattern(self, pattern_type, value): + pattern = [] + # "dataTypeList": ["file", "hash", "domain", "ip", "url", "unknown"], + if pattern_type == 'ip': + pattern_type = self.get_ip_address(value) + pattern = "["+ pattern_type + "-addr:value='" + value + "']" + return pattern + + return pattern + + def get_threat_score(self, threat_score): + BENIGN_SCORE_MIN = 0 + BENIGN_SCORE_MAX = 9 + UNKNOWN_SCORE_MIN = 10 + SUSPICIOUS_SCORE_MIN = 30 + SUSPICIOUS_SCORE_MAX = 69 + MALICIOUS_SCORE_MIN = 70 + MALICIOUS_SCORE_MAX = 100 + + if threat_score is None: + return {"threat_score": UNKNOWN_SCORE_MIN} + + # Declare constants + + range_min = 0 + range_max = 100 + + if(threat_score>=0 and threat_score<=25): + tis_range_min = BENIGN_SCORE_MIN + tis_range_max = BENIGN_SCORE_MAX + tis_score = tis_range_min + (tis_range_max-tis_range_min) * 1.0 * (threat_score-range_min) / (range_max - range_min) + elif (threat_score>=26 and threat_score<=74): + tis_range_min = SUSPICIOUS_SCORE_MIN + tis_range_max = SUSPICIOUS_SCORE_MAX + tis_score = tis_range_min + (tis_range_max-tis_range_min) * 1.0 * (threat_score-range_min) / (range_max - range_min) + elif (threat_score>=75 and threat_score<=100): + tis_range_min = MALICIOUS_SCORE_MIN + tis_range_max = MALICIOUS_SCORE_MAX + tis_score = tis_range_min + (tis_range_max-tis_range_min) * 1.0 * (threat_score-range_min) / (range_max - range_min) + else: + tis_score = UNKNOWN_SCORE_MIN + + if tis_score > range_max: + tis_score = range_max + + threat_score = {"threat_score": round(tis_score, 1)} + + return threat_score + + def get_description(self, score): + indicator_type = self.get_indicator_types(score) + description = "IP is following into " + indicator_type + " category" + return {"description": description} + + + def create_indicator_object(self, *properties): + indicator_object = {} + for prop in properties: + if prop is not None: + for key, value in prop.items(): + indicator_object[key] = value + return indicator_object + + + def get_threat_report(self, data): + report = {'x_ibm_original_threat_feed_data': {'full': data['report']}} + return report + + def get_threat_attributes(self, data): + threat_attribute_report = [] + if 'report' in data: + full_report = data['report'][0] + full_fields = { + 'ipAddress': 'IP Address', + 'isPublic': 'Is Public', + 'isWhitelisted': 'Is Whitelisted', + 'totalReports': 'Total Reports', + 'numDistinctUsers': 'Number of Distinct Users', + 'lastReportedAt': 'Last Reported At', + 'usageType': 'Usage Type', + 'domain': 'Domain' + } + + for attribute_keys, attribute_value in full_fields.items(): + if attribute_keys in full_report: + threat_attribute_report += create_attributes(attribute_value, full_report[attribute_keys]) + + + return {'threat_attributes' : threat_attribute_report} if threat_attribute_report else None + + def translate_results(self, data_source, data): + """ + Translates JSON data into STIX results based on a mapping file + :param data: JSON formatted data to translate into STIX format + :type data: str + :param mapping: The mapping file path to use as instructions on how to translate the given JSON data to STIX. + Defaults the path to whatever is passed into the constructor for JSONToSTIX (This should be the to_stix_map.json in the module's json directory) + :type mapping: str (filepath) + :return: STIX formatted results + :rtype: str + """ + + json_data = data[0] + + ANALYZER_NAME = "AbuseIPDB_Connector" + data_source['id'] = ANALYZER_NAME + + # Add Namespace + try: + uuid.UUID(json_data["namespace"]) + except ValueError: + raise ValueError("Namespace is not valid UUID") + NAMESPACE = json_data["namespace"] + data_source['name'] = ANALYZER_NAME + pattern = self.get_pattern_from_json(json_data) + external_reference = self.get_external_reference_from_json(json_data) + indicator_types = self.get_optional_values(json_data['report'][0]['abuseConfidenceScore']) + description = self.get_description(json_data['report'][0]['abuseConfidenceScore']) + threat_score = self.get_threat_score(json_data['report'][0]['abuseConfidenceScore']) + threat_attributes = self.get_threat_attributes(json_data) + indicator_name = {'name': json_data['data']} + indicator_object = self.create_indicator_object(pattern, external_reference, indicator_types, description, indicator_name) + + #json_data['report']['full'] = json_data['report'] + + + # Create STIX Bundle and add SDOs + sdo_translator_object = sdo_translator.SdoTranslator(self.options) + stix_bundle = sdo_translator_object.create_stix_bundle() + + + # Add Identity SDO + identity_object = sdo_translator_object.create_identity_sdo(data_source, NAMESPACE) + stix_bundle['objects'] += identity_object + + # Add extension-definition SDO + toplevel_properties = ['x_ibm_original_threat_feed_data', 'threat_score'] + if (threat_attributes): + toplevel_properties.append('threat_attributes') + # extension property to get the attribute list from the enrich info + nested_properties = [] + + extension_object = sdo_translator_object.create_extension_sdo(identity_object[0], NAMESPACE, nested_properties, toplevel_properties=toplevel_properties) + stix_bundle['objects'] += extension_object + + # Add Indicator SDO + extension_id = extension_object[0]['id'] + identity_id = identity_object[0]['id'] + report = self.get_threat_report(json_data) + + nested_indicator = [] + top_indicator = [threat_score, report] + if (threat_attributes): + top_indicator.append(threat_attributes) + + indicator_stix_object = sdo_translator_object.create_indicator_sdo(indicator_object, identity_id, extension_id, nested_indicator, top_indicator) + stix_bundle['objects'] += indicator_stix_object + + return stix_bundle \ No newline at end of file diff --git a/stix_shifter_modules/abuseipdb/stix_translation/sdo_translator.py b/stix_shifter_modules/abuseipdb/stix_translation/sdo_translator.py new file mode 100644 index 000000000..966751ed2 --- /dev/null +++ b/stix_shifter_modules/abuseipdb/stix_translation/sdo_translator.py @@ -0,0 +1,21 @@ +from stix_shifter_utils.normalization.BaseNormalization import BaseNormalization + +class SdoTranslator(BaseNormalization): + def create_sighting_sdo(self, sighting_object, indicator_id): + return super().create_sighting_sdo(sighting_object, indicator_id) + + def create_infrastructure_object_sdo(self, infrastructure_object, enriched_ioc, indicator_id): + return super().create_infrastructure_object_sdo(infrastructure_object, enriched_ioc, indicator_id) + + def create_malware_sdo(self, malware_object, indicator_id, enriched_ioc): + return super().create_malware_sdo(malware_object, indicator_id, enriched_ioc) + + def create_identity_sdo(self, data_source, namespace): + return super().create_identity_sdo(data_source, namespace) + + def create_extension_sdo(self, identity_object, namespace, nested_properties, toplevel_properties): + # Create an extension-definition object to be used in conjunction with STIX Indicator object + return super().create_extension_sdo(identity_object, namespace, nested_properties, toplevel_properties) + + def create_indicator_sdo(self, indicator_object, identity_id, extension_id, nested_properties, top_properties=None): + return super().create_indicator_sdo(indicator_object, identity_id, extension_id, nested_properties, top_properties) diff --git a/stix_shifter_modules/abuseipdb/stix_transmission/__init__.py b/stix_shifter_modules/abuseipdb/stix_transmission/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/stix_shifter_modules/abuseipdb/stix_transmission/api_client.py b/stix_shifter_modules/abuseipdb/stix_transmission/api_client.py new file mode 100644 index 000000000..d079f96c8 --- /dev/null +++ b/stix_shifter_modules/abuseipdb/stix_transmission/api_client.py @@ -0,0 +1,66 @@ +from stix_shifter_utils.stix_transmission.utils.RestApiClientAsync import RestApiClientAsync +import json + + +class APIClient(): + + def __init__(self, connection, configuration): + headers = dict() + auth = configuration.get('auth') + headers['Key'] = auth.get('key') + headers['Accept'] = "application/json" + host = "api.abuseipdb.com" + self.namespace = connection.get('namespace') + self.client = RestApiClientAsync(host=host, port=None, headers=headers) + + async def ping_abuseipdb(self): + # Pings the data source + response = await query_ip(self, "118.25.6.39") + return response + + async def get_search_results(self, query_expression): + # Queries the data source + # extract the data + self.data_type = query_expression['dataType'] + self.data = query_expression['data'] + if self.data_type == 'ip': + rep = await query_ip(self, self.data) + return rep, self.namespace + else: + return {"code": 401, "error": "IoC Type not supported"}, self.namespace + + async def delete_search(self, search_id): + # Optional since this may not be supported by the data source API + # Delete the search + return {"code": 200, "success": True} + +async def query_ip(self, data): + + response_data = dict() + endpoint_uri = f'api/v2/check?ipAddress={data}' + try: + response = await self.client.call_api(endpoint_uri, 'GET') + json_data = json.loads(response.read().decode('utf-8')) + + if response and response.code == 200 and 'data' in json_data: + response_data["code"] = response.code + response_data["data"] = [json_data['data']] + return response_data + elif 'errors' in json_data: + if len(json_data['errors']) > 0: + error_data = json_data['errors'][0] + response_data['error'] = error_data['detail'] + response_data['code'] = error_data['status'] + else: + response_data['error'] = 'API Access error' + response_data['code'] = response.code + return response_data + else: + response_data['error'] = 'API Access error' + response_data['code'] = response.code + return response_data + except Exception as e: + response_data["code"] = 500 + response_data["error"] = "OS error: {0}".format(e) + + return response_data diff --git a/stix_shifter_modules/abuseipdb/stix_transmission/delete_connector.py b/stix_shifter_modules/abuseipdb/stix_transmission/delete_connector.py new file mode 100644 index 000000000..111f8a786 --- /dev/null +++ b/stix_shifter_modules/abuseipdb/stix_transmission/delete_connector.py @@ -0,0 +1,24 @@ +from stix_shifter_utils.modules.base.stix_transmission.base_delete_connector import BaseDeleteConnector +from stix_shifter_utils.utils.error_response import ErrorResponder +from stix_shifter_utils.utils import logger + +class DeleteConnector(BaseDeleteConnector): + def __init__(self, api_client): + self.api_client = api_client + self.logger = logger.set_logger(__name__) + + async def delete_query_connection(self, search_id): + try: + response_dict = await self.api_client.delete_search(search_id) + response_code = response_dict["code"] + + # Construct a response object + return_obj = dict() + if response_code == 200: + return_obj['success'] = response_dict['success'] + else: + ErrorResponder.fill_error(return_obj, response_dict, ['message']) + return return_obj + except Exception as err: + self.logger.error('error when deleting search {}:'.format(err)) + raise diff --git a/stix_shifter_modules/abuseipdb/stix_transmission/error_mapper.py b/stix_shifter_modules/abuseipdb/stix_transmission/error_mapper.py new file mode 100644 index 000000000..f7bb6bc76 --- /dev/null +++ b/stix_shifter_modules/abuseipdb/stix_transmission/error_mapper.py @@ -0,0 +1,52 @@ +from stix_shifter_utils.utils.error_mapper_base import ErrorMapperBase +from stix_shifter_utils.utils.error_response import ErrorCode +from stix_shifter_utils.utils import logger + +error_mapping = { + # These are only examples. Change the keys to reflect the error codes that come back from the data source API. + # search does not exist + 1002: ErrorCode.TRANSMISSION_SEARCH_DOES_NOT_EXISTS, + # The search cannot be created. The requested search ID that was provided in the query expression is already in use. + # Please use a unique search ID (or allow one to be generated). + 1004: ErrorCode.TRANSMISSION_MODULE_DEFAULT_ERROR.value, + # A request parameter is not valid + 1005: ErrorCode.TRANSMISSION_INVALID_PARAMETER, + # The server might be temporarily unavailable or offline. Please try again later. + 1010: ErrorCode.TRANSMISSION_REMOTE_SYSTEM_IS_UNAVAILABLE, + # An error occurred during the attempt + 1020: ErrorCode.TRANSMISSION_MODULE_DEFAULT_ERROR.value, + #error in query + 2000: ErrorCode.TRANSMISSION_QUERY_PARSING_ERROR, + + 400: ErrorCode.TRANSMISSION_QUERY_PARSING_ERROR, + + 403: ErrorCode. TRANSMISSION_FORBIDDEN, + + 401: ErrorCode.TRANSMISSION_AUTH_CREDENTIALS, + + 422: ErrorCode.TRANSMISSION_QUERY_LOGICAL_ERROR +} + + +class ErrorMapper(): + + DEFAULT_ERROR = ErrorCode.TRANSMISSION_MODULE_DEFAULT_ERROR + logger = logger.set_logger(__name__) + + @staticmethod + def set_error_code(json_data, return_obj): + code = None + try: + code = int(json_data['code']) + except Exception: + pass + + error_code = ErrorMapper.DEFAULT_ERROR + + if code in error_mapping: + error_code = error_mapping[code] + + if error_code == ErrorMapper.DEFAULT_ERROR: + ErrorMapper.logger.error("failed to map: " + str(json_data)) + + ErrorMapperBase.set_error_code(return_obj, error_code) diff --git a/stix_shifter_modules/abuseipdb/stix_transmission/ping_connector.py b/stix_shifter_modules/abuseipdb/stix_transmission/ping_connector.py new file mode 100644 index 000000000..7b71c8d48 --- /dev/null +++ b/stix_shifter_modules/abuseipdb/stix_transmission/ping_connector.py @@ -0,0 +1,26 @@ +from stix_shifter_utils.modules.base.stix_transmission.base_ping_connector import BasePingConnector +from stix_shifter_utils.utils.error_response import ErrorResponder +from stix_shifter_utils.utils import logger + +class PingConnector(BasePingConnector): + def __init__(self, api_client): + self.api_client = api_client + self.logger = logger.set_logger(__name__) + self.connector = __name__.split('.')[1] + + async def ping_connection(self): + try: + response_dict = await self.api_client.ping_abuseipdb() + response_code = response_dict['code'] + return_obj = dict() + if response_code == 200: + return_obj['success'] = True + return_obj['code'] = response_code + return_obj['connector'] = self.connector + else: + ErrorResponder.fill_error(return_obj, response_dict, ['message'], connector=self.connector) + self.logger.error(return_obj) + return return_obj + except Exception as err: + self.logger.error('error when pinging datasource {}:'.format(err)) + raise diff --git a/stix_shifter_modules/abuseipdb/stix_transmission/results_connector.py b/stix_shifter_modules/abuseipdb/stix_transmission/results_connector.py new file mode 100644 index 000000000..4824b6dde --- /dev/null +++ b/stix_shifter_modules/abuseipdb/stix_transmission/results_connector.py @@ -0,0 +1,37 @@ +from stix_shifter_utils.modules.base.stix_transmission.base_json_results_connector import BaseJsonResultsConnector +from stix_shifter_utils.utils.error_response import ErrorResponder +from stix_shifter_utils.utils import logger +import json + +class ResultsConnector(BaseJsonResultsConnector): + def __init__(self, api_client): + self.api_client = api_client + self.logger = logger.set_logger(__name__) + + def permalink(self, input): + url = "https://www.abuseipdb.com/check/"+input['data'] + + return {"source_name":"AbuseIPDB_Connector","url":url} + + async def create_results_connection(self, query_data, offset, length): + try: + query_data = query_data.replace('\'', "\"") + query_json = json.loads(query_data) + response, namespace = await self.api_client.get_search_results(query_json) + response_code = response['code'] + return_obj = dict() + if response_code == 200: + response['report'] = response['data'] + response['data'] = query_json['data'] + response['dataType'] = query_json['dataType'] + response['external_reference'] = self.permalink(response) + response['namespace'] = namespace + return_obj['success'] = True + return_obj['data'] = [response] + else: + ErrorResponder.fill_error(return_obj, response) + return_obj['error'] = response['error'] + return return_obj + except Exception as err: + self.logger.error('error when creating search: {}'.format(err)) + raise diff --git a/stix_shifter_modules/abuseipdb/tests/stix_translation/json/to_stix_map.json b/stix_shifter_modules/abuseipdb/tests/stix_translation/json/to_stix_map.json new file mode 100644 index 000000000..0e0dcd235 --- /dev/null +++ b/stix_shifter_modules/abuseipdb/tests/stix_translation/json/to_stix_map.json @@ -0,0 +1,3 @@ +{ + +} \ No newline at end of file diff --git a/stix_shifter_modules/abuseipdb/tests/stix_translation/test_abuseipdb_json_to_stix.py b/stix_shifter_modules/abuseipdb/tests/stix_translation/test_abuseipdb_json_to_stix.py new file mode 100644 index 000000000..61eb14699 --- /dev/null +++ b/stix_shifter_modules/abuseipdb/tests/stix_translation/test_abuseipdb_json_to_stix.py @@ -0,0 +1,161 @@ +import json +import unittest +from functools import wraps +from stix_shifter_modules.abuseipdb.entry_point import EntryPoint + +MODULE = "abuseipdb" +DATA_SOURCE = {"type": "identity", "id": "identity--000810fd-722d-52a0-bb9e-cd852e6ba394", "name": "AbuseIPDB_Connector", + "identity_class": "system"} +options = {'stix_validator':True} +entry_point = EntryPoint(options=options) +ip_value = "194.147.78.155" +extension_types = ["toplevel-property-extension"] +extension_properties = ["x_ibm_original_threat_feed_data","threat_score","threat_attributes"] +query_pattern = "[ipv4-addr:value='"+ip_value+"']" +transmitQueryData = { + "data": [ + { + "code": 200, + "success": True, + "report": [{ + "ipAddress": "194.147.78.155", + "isPublic": True, + "ipVersion": 4, + "isWhitelisted": '', + "abuseConfidenceScore": 0, + "countryCode": "RU", + "usageType": "Data Center/Web Hosting/Transit", + "isp": "LIR LLC", + "domain": "lir.am", + "hostnames": [ + "free.ds" + ], + "totalReports": 0, + "numDistinctUsers": 0, + "lastReportedAt": '' + }], + "data": ip_value, + "dataType": "ip", + "namespace": "8bf42ea1-e30d-41a2-a3ee-1aec759cf409", + "external_reference": { + "source_name": "AbuseIPDP_1_0", + "url": "https://www.abuseipdb.com/check/194.147.78.155" + } + } + ] + } + +class TestAbuseipdbResultsToStix(unittest.TestCase): + """ + class to perform unit test case for translate results + """ + + def __init__(self,*args, **kwargs): + super(TestAbuseipdbResultsToStix, self).__init__(*args, **kwargs) + self.result_translator = entry_point.create_default_results_translator(dialect='default') + self.result_bundle = self.result_translator.translate_results(data_source=DATA_SOURCE, data=transmitQueryData['data']) + self.result_bundle_objects = self.result_bundle['objects'] + self.extension_property_names = [] + + @staticmethod + def exists(obj, chain): + """ + Check if the nested keys exist in the dictionary or not + """ + _key = chain.pop(0) + if _key in obj: + return TestAbuseipdbResultsToStix.exists(obj[_key], chain) if chain else obj[_key] + + @staticmethod + def get_first(itr, constraint): + return next( + (obj for obj in itr if constraint(obj)), + None + ) + + @staticmethod + def get_first_of_type(itr, typ): + return TestAbuseipdbResultsToStix.get_first(itr, lambda o: type(o) == dict and o.get('type') == typ) + + def get_extension_property_keys(self, obj): + for k, v in obj.items(): + if isinstance(v,dict) and not "key" in v: + self.get_extension_property_keys(v) + else: + self.extension_property_names.append(v["key"]) + return self.extension_property_names + + def check_stix_bundle_type(func): + """ + decorator function to convert the data source query result into stix bundle + """ + @wraps(func) + def wrapper_func(self, *args, **kwargs): + assert self.result_bundle['type'] == 'bundle' + return func(self, *args, **kwargs) + return wrapper_func + + @check_stix_bundle_type + def test_stix_identity_prop(self): + """ + to test the identity stix object properties + """ + stix_identity = TestAbuseipdbResultsToStix.get_first_of_type(self.result_bundle_objects, DATA_SOURCE['type']) + assert 'type' in stix_identity and stix_identity['type'] == DATA_SOURCE['type'] + assert 'name' in stix_identity and stix_identity['name'] == DATA_SOURCE['name'] + assert 'identity_class' in stix_identity and stix_identity['identity_class'] == DATA_SOURCE['identity_class'] + + @check_stix_bundle_type + def test_stix_extension_prop(self): + """ + to test the extension stix object properties + """ + sdo_type = 'extension-definition' + stix_extension = TestAbuseipdbResultsToStix.get_first_of_type(self.result_bundle_objects, sdo_type) + assert 'type' in stix_extension and stix_extension['type'] == sdo_type + assert 'name' in stix_extension + assert 'version' in stix_extension + assert 'extension_types' in stix_extension and stix_extension['extension_types'] == extension_types + assert 'extension_properties' in stix_extension and stix_extension['extension_properties'] == extension_properties + + @check_stix_bundle_type + def test_stix_indicator_prop(self): + """ + to test the indicator stix object properties + """ + sdo_type = 'indicator' + stix_indicator = TestAbuseipdbResultsToStix.get_first_of_type(self.result_bundle_objects, sdo_type) + assert 'type' in stix_indicator and stix_indicator['type'] == sdo_type + assert 'pattern' in stix_indicator and stix_indicator['pattern'] == query_pattern + assert 'valid_from' in stix_indicator + assert 'created' in stix_indicator + assert 'modified' in stix_indicator + assert 'indicator_types' in stix_indicator and len(stix_indicator['indicator_types']) == 1 \ + and stix_indicator['indicator_types'][0] == 'benign' + assert 'external_references' in stix_indicator and len(stix_indicator['external_references']) > 0 + assert 'url' in stix_indicator['external_references'][0] + assert stix_indicator['external_references'][0]['url'] == 'https://www.abuseipdb.com/check/194.147.78.155' + assert 'threat_score' in stix_indicator + assert 'threat_attributes' in stix_indicator + + @check_stix_bundle_type + def test_stix_indicator_extensions_prop(self): + """ + to test the indicator stix object extensions properties + """ + stix_indicator = TestAbuseipdbResultsToStix.get_first_of_type( + self.result_bundle_objects, 'indicator') + + assert 'x_ibm_original_threat_feed_data' in stix_indicator + extension_property = extension_properties[0] + property_name1 = "x_ibm_original_threat_feed_data.full" + is_exist = TestAbuseipdbResultsToStix.exists( + stix_indicator, property_name1.split(".")) + assert is_exist is not None + assert stix_indicator[extension_property]["full"][0] + property_name2 = "threat_attributes" + is_exist1 = TestAbuseipdbResultsToStix.exists( + stix_indicator, property_name2.split(".")) + assert is_exist1 is not None + assert stix_indicator['threat_attributes'] + diff --git a/stix_shifter_modules/abuseipdb/tests/stix_translation/test_abuseipdb_stix_to_query.py b/stix_shifter_modules/abuseipdb/tests/stix_translation/test_abuseipdb_stix_to_query.py new file mode 100644 index 000000000..46af63d26 --- /dev/null +++ b/stix_shifter_modules/abuseipdb/tests/stix_translation/test_abuseipdb_stix_to_query.py @@ -0,0 +1,111 @@ +import unittest +from stix_shifter.stix_translation import stix_translation + +translation = stix_translation.StixTranslation() +MODULE = 'abuseipdb' + +def _test_query_assertions(query, queries): + assert isinstance(query, dict) is True + assert 'queries' in query + assert query['queries'] == [queries] + + +class TestAbuseipdbStixToQuery(unittest.TestCase, object): + + @staticmethod + def get_query_translation_result(stix_pattern, options={}): + return translation.translate(MODULE, 'query', MODULE, stix_pattern, options) + + def test_ipv4_query(self): + stix_pattern = "[ipv4-addr:value='194.147.78.155']" + query = TestAbuseipdbStixToQuery.get_query_translation_result(stix_pattern) + queries = "{'data': '194.147.78.155', 'dataType': 'ip'}" + _test_query_assertions(query, queries) + + def test_ipv6_query(self): + stix_pattern = "[ipv6-addr:value = '3001:0:0:0:0:0:0:2']" + query = TestAbuseipdbStixToQuery.get_query_translation_result(stix_pattern) + queries = "{'data': '3001:0:0:0:0:0:0:2', 'dataType': 'ip'}" + _test_query_assertions(query, queries) + + def test_multi_ipv4_expression_query(self): + stix_pattern = "([ipv4-addr:value = '194.147.78.155'] OR [ipv4-addr:value = '198.51.100.10'])" + query = TestAbuseipdbStixToQuery.get_query_translation_result(stix_pattern) + queries = "{'data': '198.51.100.10', 'dataType': 'ip'}" + _test_query_assertions(query, queries) + + def test_multi_expression_query(self): + stix_pattern = "[domain-name:value='test.com' OR ipv4-addr:value='194.147.78.155']" + query = TestAbuseipdbStixToQuery.get_query_translation_result(stix_pattern) + queries = "{'data': '194.147.78.155', 'dataType': 'ip'}" + _test_query_assertions(query, queries) + + def test_not_comp_exp(self): + """ + Test with NOT operator + :return: + """ + stix_pattern = "[ipv4-addr:value NOT = '172.31.60.104'] START t'2020-05-01T08:43:10.003Z' " \ + "STOP t'2020-10-30T10:43:10.003Z'" + query = TestAbuseipdbStixToQuery.get_query_translation_result(stix_pattern) + queries = "{'data': '172.31.60.104', 'dataType': 'ip'}" + _test_query_assertions(query, queries) + + def test_in_comp_exp(self): + """ + Test with IN operator + """ + stix_pattern = "[ipv4-addr:value IN ('172.31.60.104','94.147.78.155')]" + query = TestAbuseipdbStixToQuery.get_query_translation_result(stix_pattern) + queries = "{'data': '(172.31.60.104 OR 94.147.78.155)', 'dataType': 'ip'}" + _test_query_assertions(query, queries) + + def test_one_obser_is_super_set_operator_network(self): + """ + to test single observation with an un-supported operator + """ + stix_pattern = "[ipv4-addr:value ISSUPERSET '172.217.0.0/24'] " \ + "START t'2019-04-10T08:43:10.003Z' STOP t'2019-04-23T10:43:10.003Z'" + query = TestAbuseipdbStixToQuery.get_query_translation_result(stix_pattern) + assert query['success'] is False + assert query['code'] == 'mapping_error' + + def test_like_comp_exp(self): + """ + Test with LIKE operator + """ + stix_pattern = "[ipv4-addr:value LIKE '172.31.60.104'] START t'2020-10-01T08:43:10.003Z' " \ + "STOP t'2020-10-30T10:43:10.003Z'" + query = TestAbuseipdbStixToQuery.get_query_translation_result(stix_pattern) + queries = "{'data': '%172.31.60.104%', 'dataType': 'ip'}" + _test_query_assertions(query, queries) + + def test_matches_comp_exp(self): + """ + Test with MATCHES operator + :return: + """ + stix_pattern = "[ipv4-addr:value MATCHES '\\\\d+'] START t'2020-10-01T08:43:10.003Z' STOP " \ + "t'2020-10-30T10:43:10.003Z'" + query = TestAbuseipdbStixToQuery.get_query_translation_result(stix_pattern) + queries = "{'data': '.*\\\\\\\\d+.*', 'dataType': 'ip'}" + _test_query_assertions(query, queries) + def test_not_comp_exp(self): + """ + Test with NOT operator + :return: + """ + stix_pattern = "[ipv4-addr:value NOT = '172.31.60.104'] START t'2020-05-01T08:43:10.003Z' " \ + "STOP t'2020-10-30T10:43:10.003Z'" + query = TestAbuseipdbStixToQuery.get_query_translation_result(stix_pattern) + queries = "{'data': '172.31.60.104', 'dataType': 'ip'}" + _test_query_assertions(query, queries) + + def test_in_comp_exp(self): + """ + Test with IN operator + """ + stix_pattern = "[ipv4-addr:value IN ('172.31.60.104','94.147.78.155')]" + query = TestAbuseipdbStixToQuery.get_query_translation_result(stix_pattern) + queries = "{'data': '(172.31.60.104 OR 94.147.78.155)', 'dataType': 'ip'}" + _test_query_assertions(query, queries) \ No newline at end of file diff --git a/stix_shifter_modules/abuseipdb/tests/stix_transmission/test_abuseipdb_transmit.py b/stix_shifter_modules/abuseipdb/tests/stix_transmission/test_abuseipdb_transmit.py new file mode 100644 index 000000000..1bd6a1955 --- /dev/null +++ b/stix_shifter_modules/abuseipdb/tests/stix_transmission/test_abuseipdb_transmit.py @@ -0,0 +1,205 @@ +from stix_shifter.stix_transmission import stix_transmission +from unittest.mock import patch +from collections import namedtuple +import unittest + +MODULE_NAME = "abuseipdb" +namespace = "8bf42ea1-e30d-41a2-a3ee-1aec759cf409" + +SAMPLE_DATA = '{"data": "34.102.136.180", "dataType": "ip"}' + +DATA = { + "data": + [ + { + "ipAddress": "34.102.136.180", + "isPublic": True, + "ipVersion": 4, + "isWhitelisted": False, + "abuseConfidenceScore": 42, + "countryCode": "US", + "usageType": "Data Center/Web Hosting/Transit", + "isp": "Google LLC", + "domain": "google.com", + "hostnames": [ + "180.136.102.34.bc.googleusercontent.com" + ], + "totalReports": 5, + "numDistinctUsers": 5, + "lastReportedAt": "2022-03-07T15:24:47+00:00" + } + ], + "code": 200, +} + +connection = { + "namespace":namespace +} +config = { + "auth": { + "key": "k" + } +} +Response = namedtuple('Response', ['data', 'response_code']) + + + +class MockHttpResponse: + def __init__(self, string): + self.string = string + + def decode(self, string): + return self.string + +class AbuseIPDBHttpResponse: + def __init__(self, obj, response_code): + self.code = response_code + self.object = obj + + def read(self): + return self.object + + +@patch('stix_shifter_modules.abuseipdb.stix_transmission.api_client.APIClient.__init__', autospec=True) +class TestAbuseIPDBConnection(unittest.TestCase, object): + @patch('stix_shifter_modules.abuseipdb.stix_transmission.api_client.APIClient.ping_abuseipdb') + def test_abuseipdb_ping(self, mock_ping_response, mock_api_client): + mock_api_client.return_value = None + mock_ping_response.return_value = {'success': 'true', 'code': 200} + + transmission = stix_transmission.StixTransmission( + MODULE_NAME, connection, config) + ping_response = transmission.ping() + + assert ping_response is not None + assert ping_response['success'] + + @patch('stix_shifter_modules.abuseipdb.stix_transmission.api_client.APIClient.ping_abuseipdb') + def test_abuseipdb_ping_exception(self, mock_ping_response, mock_api_client): + response = MockHttpResponse('/exception') + mock_api_client.return_value = None + mock_ping_response.return_value = AbuseIPDBHttpResponse(response, 400) + mock_ping_response.side_effect = Exception('an error occured retriving ping information') + + transmission = stix_transmission.StixTransmission(MODULE_NAME, connection, config) + ping_response = transmission.ping() + print(ping_response) + assert ping_response is not None + assert ping_response['success'] is False + + @patch('stix_shifter_modules.abuseipdb.stix_transmission.api_client.APIClient.get_search_results', autospec=True) + def test_abuseipdb_results(self, mock_result_connection, mock_api_client): + + mock_api_client.return_value = None + mock_result_connection.return_value = DATA.copy(), namespace + + transmission = stix_transmission.StixTransmission( + MODULE_NAME, connection, config) + query_response = transmission.query(SAMPLE_DATA) + + assert query_response is not None + assert 'search_id' in query_response + assert query_response['search_id'] == SAMPLE_DATA + + search_results_response = transmission.results( + query_response['search_id'], 0, 9) + assert 'success' in search_results_response and search_results_response['success'] is True + assert 'report' in search_results_response['data'][0] + report = search_results_response['data'][0]['report'] + assert 'abuseConfidenceScore' in report[0] + assert 'ipAddress' in report[0] + + @patch('stix_shifter_modules.abuseipdb.stix_transmission.api_client.APIClient.get_search_results', autospec=True) + def test_abuseipdb_results_error(self, mock_result_connection, mock_api_client): + mock_api_client.return_value = None + mock_data = DATA = { + "error": "Invalid", + "success": False, + "code": 400 + } + mock_result_connection.return_value = mock_data, namespace + mock_result_connection.side_effect = Exception('an error occured retriving ping information') + transmission = stix_transmission.StixTransmission(MODULE_NAME, connection, config) + query_response = transmission.query(SAMPLE_DATA) + + assert query_response is not None + assert 'search_id' in query_response + assert query_response['search_id'] == SAMPLE_DATA + + search_results_response = transmission.results(query_response['search_id'], 0, 9) + + assert 'success' in search_results_response + assert search_results_response['success'] is False + assert 'code' in search_results_response, search_results_response['code'] == 'invalid_query' + + def test_abuseipdb_status(self, mock_api_client): + mock_api_client.return_value = None + transmission = stix_transmission.StixTransmission(MODULE_NAME, connection, config) + query_response = transmission.status(SAMPLE_DATA) + assert query_response is not None + assert 'success' in query_response, query_response['success'] is True + assert 'status' in query_response, query_response['status'] == 'COMPLETED' + assert 'progress' in query_response, query_response['progress'] == 100 + + @patch('stix_shifter_utils.modules.base.stix_transmission.base_sync_connector.BaseSyncConnector.create_status_connection', autospec=True) + def test_abuseipdb_status_exception(self, mock_status_response, mock_api_client): + error_msg = 'an error occured while checking the status' + mock_api_client.return_value = None + mock_status_response.return_value = {'status':'FAILED', 'success':False} + mock_status_response.side_effect = Exception(error_msg) + transmission = stix_transmission.StixTransmission(MODULE_NAME, connection, config) + query_response = transmission.status(SAMPLE_DATA) + assert query_response is not None + assert 'success' in query_response, query_response['success'] is False + assert 'error' in query_response, query_response['error'] == error_msg + + @patch('stix_shifter_utils.modules.base.stix_transmission.base_sync_connector.BaseSyncConnector.create_query_connection', autospec=True) + def test_abuseipdb_query_exception(self, mock_query_response, mock_api_client): + error_msg = 'cannot create a query connection' + mock_api_client.return_value = None + mock_query_response.return_value = {'search_id':'', 'success':False} + mock_query_response.side_effect = Exception(error_msg) + transmission = stix_transmission.StixTransmission(MODULE_NAME, connection, config) + query_response = transmission.query(SAMPLE_DATA) + assert query_response is not None + assert 'success' in query_response, query_response['success'] is False + assert 'error' in query_response, query_response['error'] == error_msg + + def test_abuseipdb_is_async_query(self, mock_api_client): + mock_api_client.return_value = None + transmission = stix_transmission.StixTransmission("abc", connection, config) + is_async_result = transmission.is_async() + assert 'success' in is_async_result + assert is_async_result['success'] is False + assert 'code' in is_async_result, is_async_result['code'] == 'unknown' + + @patch('stix_shifter_utils.utils.base_entry_point.BaseEntryPoint.is_async', autospec=True) + def test_abuseipdb_is_async_query_exception(self, mock_async_response, mock_api_client): + error_msg = 'an error occured while checking the if the query is async' + mock_api_client.return_value = None + mock_async_response.return_value = False + mock_async_response.side_effect = Exception(error_msg) + transmission = stix_transmission.StixTransmission(MODULE_NAME, connection, config) + query_response = transmission.is_async() + assert query_response is not None + assert 'success' in query_response, query_response['success'] is False + assert 'error' in query_response, query_response['error'] == error_msg + + def test_delete_query(self, mock_api_client): + mock_api_client.return_value = None + transmission = stix_transmission.StixTransmission(MODULE_NAME, connection, config) + query_response = transmission.delete(SAMPLE_DATA) + assert query_response is not None + assert 'success' in query_response + assert query_response['success'] is True + + @patch('stix_shifter_utils.utils.base_entry_point.BaseEntryPoint.delete_query_connection', autospec=True) + def test_delete_query_exception(self, mock_delete_response, mock_api_client): + error_msg = 'an error occured while checking the if the query is deleted' + mock_api_client.return_value = None + mock_delete_response.return_value = False + mock_delete_response.side_effect = Exception(error_msg) + transmission = stix_transmission.StixTransmission(MODULE_NAME, connection, config) + query_response = transmission.delete("") + assert 'success' in query_response, query_response['success'] is False + assert 'error' in query_response, query_response['error'] == error_msg