From 1e89e283be2134a5d417301e0f872d4c63eb0f63 Mon Sep 17 00:00:00 2001 From: tingdai Date: Fri, 27 Jan 2023 10:26:43 -0500 Subject: [PATCH 01/13] support large query with elastic search_after pagination --- .../stix_transmission/api_client.py | 71 +++++++++++++++++-- .../stix_transmission/connector.py | 65 +++++++++++++++++ 2 files changed, 131 insertions(+), 5 deletions(-) diff --git a/stix_shifter_modules/elastic_ecs/stix_transmission/api_client.py b/stix_shifter_modules/elastic_ecs/stix_transmission/api_client.py index 20559d18b..a781acfd6 100644 --- a/stix_shifter_modules/elastic_ecs/stix_transmission/api_client.py +++ b/stix_shifter_modules/elastic_ecs/stix_transmission/api_client.py @@ -25,9 +25,13 @@ def __init__(self, connection, configuration): self.indices = ",".join(self.indices) if self.indices: - self.endpoint = self.indices + '/' +'_search' + self.endpoint = self.indices + '/' + '_search' + self.pit_endpoint = self.indices + '/' + '_pit' + self.setting_endpoint = self.indices + '/' + '_settings' else: self.endpoint = '_search' + self.pit_endpoint = '_pit' + self.setting_endpoint = '_settings' if auth: if 'username' in auth and 'password' in auth: @@ -46,11 +50,11 @@ def __init__(self, connection, configuration): cert_verify=connection.get('selfSignedCert', True), sni=connection.get('sni', None) ) - + self.timeout = connection['options'].get('timeout') def ping_box(self): - return self.client.call_api(self.PING_ENDPOINT, 'GET',timeout=self.timeout) + return self.client.call_api(self.PING_ENDPOINT, 'GET', timeout=self.timeout) def run_search(self, query_expression, offset=None, length=DEFAULT_LIMIT): headers = dict() @@ -91,11 +95,12 @@ def run_search(self, query_expression, offset=None, length=DEFAULT_LIMIT): data = { "_source": { "includes": ["@timestamp", "source.*", "destination.*", "event.*", "client.*", "server.*", - "host.*","network.*", "process.*", "user.*", "file.*", "url.*", "registry.*", "dns.*", "tags"] + "host.*", "network.*", "process.*", "user.*", "file.*", "url.*", "registry.*", "dns.*", + "tags"] }, "query": { "query_string": { - "query": query_expression + "query": query_expression } } } @@ -104,3 +109,59 @@ def run_search(self, query_expression, offset=None, length=DEFAULT_LIMIT): self.logger.debug("URL data: " + json.dumps(data)) return self.client.call_api(endpoint, 'GET', headers, data=json.dumps(data), timeout=self.timeout) + + def search_pagination(self, query_expression, lastsortvalue=None, length=DEFAULT_LIMIT): + headers = dict() + headers['Content-Type'] = 'application/json' + endpoint = self.endpoint + # add size value + if length is not None: + endpoint = "{}?size={}".format(endpoint, length) + + data = { + "_source": { + "includes": ["@timestamp", "source.*", "destination.*", "event.*", "client.*", "server.*", + "host.*", "network.*", "process.*", "user.*", "file.*", "url.*", "registry.*", "dns.*", + "tags"] + }, + "query": { + "query_string": { + "query": query_expression + } + }, + "sort": [ + {"@timestamp": "asc"}, + ] + } + + if not (lastsortvalue is None): + extra_data = { + "search_after": [ + lastsortvalue + ] + } + data.update(extra_data) + + self.logger.debug("URL endpoint: " + endpoint) + self.logger.debug("URL data: " + json.dumps(data)) + + return self.client.call_api(endpoint, 'GET', headers, data=json.dumps(data), timeout=self.timeout) + + def get_max_result_window(self): + # GET winlogbeat-*/_settings?include_defaults=true + endpoint = self.setting_endpoint + endpoint = "{}?include_defaults=true".format(endpoint) + return self.client.call_api(endpoint, 'GET', timeout=self.timeout) + + def set_pit(self): + headers = dict() + headers['Content-Type'] = 'application/json' + + # GET PIT + # POST /my-index-000001/_pit?keep_alive=1m + endpoint = "{}?keep_alive=1m&pretty".format(self.pit_endpoint) + + return self.client.call_api(endpoint, 'POST', headers, timeout=self.timeout) + + + diff --git a/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py b/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py index 81b44e54a..e5293f8e9 100644 --- a/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py +++ b/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py @@ -14,6 +14,7 @@ def __init__(self, connection, configuration): self.api_client = APIClient(connection, configuration) self.logger = logger.set_logger(__name__) self.connector = __name__.split('.')[1] + self.max_result_window = 10000 def _handle_errors(self, response, return_obj): response_code = response.code @@ -35,6 +36,8 @@ def ping_connection(self): response_txt = None return_obj = dict() try: + # test the pit + #print(self.set_point_in_time()) response = self.api_client.ping_box() return self._handle_errors(response, return_obj) except Exception as e: @@ -44,11 +47,60 @@ def ping_connection(self): else: raise e + def settings_connection(self): + response_txt = None + return_obj = dict() + try: + response = self.api_client.get_max_result_window() + return_obj = self._handle_errors(response, return_obj) + if (return_obj['success']): + response_json = json.loads(return_obj["data"]) + max_result_windows = set() + if not (response_json is None): + for _, item_json in response_json.items(): + max_res_win = item_json['defaults']['index']['max_result_window'] + max_result_windows.add(max_res_win) + if len(max_result_windows) != 1: + ErrorResponder.fill_error(max_result_windows, message='inconsistent max_result_window settings', connector=self.connector) + self.logger.error('inconsistent max_result_window settings: ' + str(max_result_windows)) + self.max_result_window = int(max_result_windows.pop()) + except Exception as e: + if response_txt is not None: + ErrorResponder.fill_error(return_obj, message='unexpected exception', connector=self.connector) + self.logger.error('can not parse response: ' + str(response_txt)) + else: + raise e + def create_results_connection(self, query, offset, length): response_txt = None return_obj = dict() try: + # offset with -1 to indicate using search after API in ElasticSearch + if offset == -1: + # extract the max_result_window from elasticsearch + self.settings_connection() + return_objs = {'success': True, 'data': []} + while length > 0: + lastsortvalue = None + response = self.api_client.search_pagination(query, lastsortvalue, min(length, self.max_result_window)) + return_obj = self._handle_errors(response, return_obj) + if (return_obj['success']): + response_json = json.loads(return_obj["data"]) + if response_json['hits']: + # and (response_json['hits']['total']['value'] >= 0 or response_json['hits']['total'] >= 0): + self.logger.error("Total # of hits:" + str(response_json['hits']['total'])) + return_obj['data'] = [record['_source'] for record in response_json["hits"]["hits"]] + return_objs['data'].extend(return_obj['data']) + self.logger.error("Total # of records: " + str(len(return_obj['data']))) + lastsortvalue = response_json["hits"]["hits"][-1]['sort'] + length -= self.max_result_window + else: + return return_obj #return the faulty iteration result + else: + break + return return_objs + response = self.api_client.run_search(query, offset, length) return_obj = self._handle_errors(response, return_obj) @@ -67,3 +119,16 @@ def create_results_connection(self, query, offset, length): self.logger.error('can not parse response: ' + str(response_txt)) else: raise e + + def set_point_in_time(self): + response_txt = None + return_obj = dict() + try: + response = self.api_client.set_pit() + return self._handle_errors(response, return_obj) + except Exception as e: + if response_txt is not None: + ErrorResponder.fill_error(return_obj, message='unexpected exception', connector=self.connector) + self.logger.error('can not parse response: ' + str(response_txt)) + else: + raise e From 4b39359f7bd356e45254244e259124dd32cce7e9 Mon Sep 17 00:00:00 2001 From: tingdai Date: Mon, 30 Jan 2023 14:41:01 -0500 Subject: [PATCH 02/13] change implementation by moving the pagination loop from stix-shifter to upper applications(e.g., Kestrel) --- .../stix_transmission/connector.py | 39 +++++++------------ 1 file changed, 13 insertions(+), 26 deletions(-) diff --git a/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py b/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py index e5293f8e9..7cac37814 100644 --- a/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py +++ b/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py @@ -15,6 +15,8 @@ def __init__(self, connection, configuration): self.logger = logger.set_logger(__name__) self.connector = __name__.split('.')[1] self.max_result_window = 10000 + # extract the max_result_window from elasticsearch + self.settings_connection() def _handle_errors(self, response, return_obj): response_code = response.code @@ -71,38 +73,19 @@ def settings_connection(self): else: raise e - def create_results_connection(self, query, offset, length): + def create_results_connection(self, query, offset, length, metadata=None): response_txt = None return_obj = dict() try: # offset with -1 to indicate using search after API in ElasticSearch if offset == -1: - # extract the max_result_window from elasticsearch - self.settings_connection() - return_objs = {'success': True, 'data': []} - while length > 0: - lastsortvalue = None - response = self.api_client.search_pagination(query, lastsortvalue, min(length, self.max_result_window)) - return_obj = self._handle_errors(response, return_obj) - if (return_obj['success']): - response_json = json.loads(return_obj["data"]) - if response_json['hits']: - # and (response_json['hits']['total']['value'] >= 0 or response_json['hits']['total'] >= 0): - self.logger.error("Total # of hits:" + str(response_json['hits']['total'])) - return_obj['data'] = [record['_source'] for record in response_json["hits"]["hits"]] - return_objs['data'].extend(return_obj['data']) - self.logger.error("Total # of records: " + str(len(return_obj['data']))) - lastsortvalue = response_json["hits"]["hits"][-1]['sort'] - length -= self.max_result_window - else: - return return_obj #return the faulty iteration result - else: - break - return return_objs - - response = self.api_client.run_search(query, offset, length) - return_obj = self._handle_errors(response, return_obj) + # pass the last searched value in metadata argument + response = self.api_client.search_pagination(query, metadata, min(length, self.max_result_window)) + return_obj = self._handle_errors(response, return_obj) + else: + response = self.api_client.run_search(query, offset, length) + return_obj = self._handle_errors(response, return_obj) if (return_obj['success']): response_json = json.loads(return_obj["data"]) @@ -111,6 +94,8 @@ def create_results_connection(self, query, offset, length): self.logger.error("Total # of hits:" + str(response_json['hits']['total'])) return_obj['data'] = [record['_source'] for record in response_json["hits"]["hits"]] self.logger.error("Total # of records: " + str(len(return_obj['data']))) + if 'sort' in response_json["hits"]["hits"][-1]: + return_obj['lastsort'] = response_json["hits"]["hits"][-1]['sort'] return return_obj except Exception as e: @@ -120,6 +105,8 @@ def create_results_connection(self, query, offset, length): else: raise e + + def set_point_in_time(self): response_txt = None return_obj = dict() From 8af9893983e14072d20fc4432ab823badfd886e1 Mon Sep 17 00:00:00 2001 From: tingdai Date: Fri, 3 Feb 2023 12:41:37 -0500 Subject: [PATCH 03/13] expose get_pagesize API in stix-transmission module --- .../stix_transmission/stix_transmission.py | 9 +++++++++ .../elastic_ecs/stix_transmission/api_client.py | 4 +--- .../elastic_ecs/stix_transmission/connector.py | 17 +++++++---------- .../base/stix_transmission/base_connector.py | 15 +++++++++++++++ stix_shifter_utils/utils/base_entry_point.py | 10 ++++++++++ 5 files changed, 42 insertions(+), 13 deletions(-) diff --git a/stix_shifter/stix_transmission/stix_transmission.py b/stix_shifter/stix_transmission/stix_transmission.py index 4ba703826..f3ff9b78b 100644 --- a/stix_shifter/stix_transmission/stix_transmission.py +++ b/stix_shifter/stix_transmission/stix_transmission.py @@ -80,6 +80,15 @@ def results_stix(self, search_id, offset, length, data_source, metadata=None): ErrorResponder.fill_error(return_obj, error=ex, connector=self.connector) return return_obj + def get_pagesize(self): + try: + return self.entry_point.get_pagesize() + except Exception as ex: + max_result_window = None + ErrorResponder.fill_error(max_result_window, error=ex, connector=self.connector) + return max_result_window + + def delete(self, search_id): # Sends a request to the correct datasource, asking to terminate a specific query try: diff --git a/stix_shifter_modules/elastic_ecs/stix_transmission/api_client.py b/stix_shifter_modules/elastic_ecs/stix_transmission/api_client.py index a781acfd6..940b281cd 100644 --- a/stix_shifter_modules/elastic_ecs/stix_transmission/api_client.py +++ b/stix_shifter_modules/elastic_ecs/stix_transmission/api_client.py @@ -136,9 +136,7 @@ def search_pagination(self, query_expression, lastsortvalue=None, length=DEFAULT if not (lastsortvalue is None): extra_data = { - "search_after": [ - lastsortvalue - ] + "search_after": lastsortvalue } data.update(extra_data) diff --git a/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py b/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py index 7cac37814..d5bca2073 100644 --- a/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py +++ b/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py @@ -16,7 +16,7 @@ def __init__(self, connection, configuration): self.connector = __name__.split('.')[1] self.max_result_window = 10000 # extract the max_result_window from elasticsearch - self.settings_connection() + self.get_pagesize() def _handle_errors(self, response, return_obj): response_code = response.code @@ -49,7 +49,7 @@ def ping_connection(self): else: raise e - def settings_connection(self): + def get_pagesize(self): response_txt = None return_obj = dict() try: @@ -66,6 +66,7 @@ def settings_connection(self): ErrorResponder.fill_error(max_result_windows, message='inconsistent max_result_window settings', connector=self.connector) self.logger.error('inconsistent max_result_window settings: ' + str(max_result_windows)) self.max_result_window = int(max_result_windows.pop()) + return self.max_result_window except Exception as e: if response_txt is not None: ErrorResponder.fill_error(return_obj, message='unexpected exception', connector=self.connector) @@ -78,14 +79,10 @@ def create_results_connection(self, query, offset, length, metadata=None): return_obj = dict() try: - # offset with -1 to indicate using search after API in ElasticSearch - if offset == -1: - # pass the last searched value in metadata argument - response = self.api_client.search_pagination(query, metadata, min(length, self.max_result_window)) - return_obj = self._handle_errors(response, return_obj) - else: - response = self.api_client.run_search(query, offset, length) - return_obj = self._handle_errors(response, return_obj) + # using search after API in ElasticSearch + # pass the last searched value in metadata argument, ignore offset argument + response = self.api_client.search_pagination(query, metadata, min(length, self.max_result_window)) + return_obj = self._handle_errors(response, return_obj) if (return_obj['success']): response_json = json.loads(return_obj["data"]) diff --git a/stix_shifter_utils/modules/base/stix_transmission/base_connector.py b/stix_shifter_utils/modules/base/stix_transmission/base_connector.py index a85b9cc6d..e6bc9c658 100644 --- a/stix_shifter_utils/modules/base/stix_transmission/base_connector.py +++ b/stix_shifter_utils/modules/base/stix_transmission/base_connector.py @@ -89,6 +89,21 @@ def ping_connection(self): """ raise NotImplementedError() + def get_pagesize(self): + """ + Creates a connection to the specified datasource to retrieve pagesize (e.g.,max_size_window) + + Args: + None. + + Returns: + dict: The return value. + keys: + success (bool): True or False + data (int): The pagesize result data + """ + raise NotImplementedError() + def create_results_stix_connection(self, entry_point, search_id, offset, length, data_source, metadata=None): stats = [] if metadata: diff --git a/stix_shifter_utils/utils/base_entry_point.py b/stix_shifter_utils/utils/base_entry_point.py index 1c9703a73..d66e64bac 100644 --- a/stix_shifter_utils/utils/base_entry_point.py +++ b/stix_shifter_utils/utils/base_entry_point.py @@ -235,6 +235,7 @@ def setup_transmission_basic(self, connection, configuration): self.set_results_connector(connector) self.set_delete_connector(connector) self.set_ping_connector(connector) + self.set_pagesize_connector(connector) def set_query_connector(self, connector): if not (isinstance(connector, (BaseConnector, BaseQueryConnector)) or issubclass(connector, BaseConnector)): @@ -291,6 +292,15 @@ def set_ping_connector(self, connector): def ping_connection(self): return self.__ping_connector.ping_connection() + def set_pagesize_connector(self, connector): + if not isinstance(connector, (BaseConnector, BasePingConnector)): + raise Exception('connector is not instance of BaseConnector or BasePingConnector') + self.__pagesize_connector = connector + + @transmission + def get_pagesize(self): + return self.__pagesize_connector.get_pagesize() + def set_async(self, is_async): self.__async = is_async From fe5ed4c38ecfe5defabe112d5c3c843b331d3563 Mon Sep 17 00:00:00 2001 From: tingdai Date: Thu, 9 Feb 2023 13:23:06 -0500 Subject: [PATCH 04/13] ignore get_pagesize() if not supported --- .../elastic_ecs/stix_transmission/connector.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py b/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py index d5bca2073..eb3ca02f0 100644 --- a/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py +++ b/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py @@ -16,7 +16,10 @@ def __init__(self, connection, configuration): self.connector = __name__.split('.')[1] self.max_result_window = 10000 # extract the max_result_window from elasticsearch - self.get_pagesize() + try: + self.get_pagesize() + except Exception as e: + pass def _handle_errors(self, response, return_obj): response_code = response.code From 2573ef6d539ea7d76bf030d738dd8918a5bc489e Mon Sep 17 00:00:00 2001 From: tingdai Date: Thu, 9 Feb 2023 13:55:07 -0500 Subject: [PATCH 05/13] make testcast compatibale with search_pagination() function --- .../tests/stix_transmission/test_elastic_ecs.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/stix_shifter_modules/elastic_ecs/tests/stix_transmission/test_elastic_ecs.py b/stix_shifter_modules/elastic_ecs/tests/stix_transmission/test_elastic_ecs.py index df7ce83a7..512c4be76 100644 --- a/stix_shifter_modules/elastic_ecs/tests/stix_transmission/test_elastic_ecs.py +++ b/stix_shifter_modules/elastic_ecs/tests/stix_transmission/test_elastic_ecs.py @@ -103,7 +103,7 @@ def test_query_response(self, mock_api_client): assert 'search_id' in query_response assert query_response['search_id'] == query - @patch('stix_shifter_modules.elastic_ecs.stix_transmission.api_client.APIClient.run_search', + @patch('stix_shifter_modules.elastic_ecs.stix_transmission.api_client.APIClient.search_pagination', autospec=True) def test_results_response(self, mock_results_response, mock_api_client): mock_api_client.return_value = None @@ -136,7 +136,8 @@ def test_results_response(self, mock_results_response, mock_api_client): "category": "network_traffic", "dataset": "dns" } - } + }, + "sort": [1555072867] } ] } @@ -166,6 +167,8 @@ def test_results_response(self, mock_results_response, mock_api_client): assert results_response['success'] assert 'data' in results_response assert len(results_response['data']) > 0 + assert len(results_response['lastsort']) >= 1 + @patch('stix_shifter_modules.elastic_ecs.stix_transmission.api_client.APIClient.run_search', autospec=True) @@ -197,7 +200,7 @@ def test_results_response_exception(self, mock_results_response, mock_api_client assert results_response['success'] is False - @patch('stix_shifter_modules.elastic_ecs.stix_transmission.api_client.APIClient.run_search', + @patch('stix_shifter_modules.elastic_ecs.stix_transmission.api_client.APIClient.search_pagination', autospec=True) def test_query_flow(self, mock_results_response, mock_api_client): mock_api_client.return_value = None @@ -230,7 +233,8 @@ def test_query_flow(self, mock_results_response, mock_api_client): "category": "network_traffic", "dataset": "dns" } - } + }, + "sort": [1555072867] } ] } @@ -266,3 +270,4 @@ def test_query_flow(self, mock_results_response, mock_api_client): assert results_response is not None assert 'data' in results_response assert len(results_response['data']) > 0 + assert len(results_response['lastsort']) >= 1 From 10a501f1e8230763bf92e42e434e813d799d69c2 Mon Sep 17 00:00:00 2001 From: tingdai Date: Tue, 14 Feb 2023 14:57:22 -0500 Subject: [PATCH 06/13] remove the exposion of get_pagesize() API to external/upper layler applications --- .../stix_transmission/stix_transmission.py | 9 --------- .../base/stix_transmission/base_connector.py | 15 --------------- stix_shifter_utils/utils/base_entry_point.py | 10 ---------- 3 files changed, 34 deletions(-) diff --git a/stix_shifter/stix_transmission/stix_transmission.py b/stix_shifter/stix_transmission/stix_transmission.py index f3ff9b78b..4ba703826 100644 --- a/stix_shifter/stix_transmission/stix_transmission.py +++ b/stix_shifter/stix_transmission/stix_transmission.py @@ -80,15 +80,6 @@ def results_stix(self, search_id, offset, length, data_source, metadata=None): ErrorResponder.fill_error(return_obj, error=ex, connector=self.connector) return return_obj - def get_pagesize(self): - try: - return self.entry_point.get_pagesize() - except Exception as ex: - max_result_window = None - ErrorResponder.fill_error(max_result_window, error=ex, connector=self.connector) - return max_result_window - - def delete(self, search_id): # Sends a request to the correct datasource, asking to terminate a specific query try: diff --git a/stix_shifter_utils/modules/base/stix_transmission/base_connector.py b/stix_shifter_utils/modules/base/stix_transmission/base_connector.py index e6bc9c658..a85b9cc6d 100644 --- a/stix_shifter_utils/modules/base/stix_transmission/base_connector.py +++ b/stix_shifter_utils/modules/base/stix_transmission/base_connector.py @@ -89,21 +89,6 @@ def ping_connection(self): """ raise NotImplementedError() - def get_pagesize(self): - """ - Creates a connection to the specified datasource to retrieve pagesize (e.g.,max_size_window) - - Args: - None. - - Returns: - dict: The return value. - keys: - success (bool): True or False - data (int): The pagesize result data - """ - raise NotImplementedError() - def create_results_stix_connection(self, entry_point, search_id, offset, length, data_source, metadata=None): stats = [] if metadata: diff --git a/stix_shifter_utils/utils/base_entry_point.py b/stix_shifter_utils/utils/base_entry_point.py index d66e64bac..1c9703a73 100644 --- a/stix_shifter_utils/utils/base_entry_point.py +++ b/stix_shifter_utils/utils/base_entry_point.py @@ -235,7 +235,6 @@ def setup_transmission_basic(self, connection, configuration): self.set_results_connector(connector) self.set_delete_connector(connector) self.set_ping_connector(connector) - self.set_pagesize_connector(connector) def set_query_connector(self, connector): if not (isinstance(connector, (BaseConnector, BaseQueryConnector)) or issubclass(connector, BaseConnector)): @@ -292,15 +291,6 @@ def set_ping_connector(self, connector): def ping_connection(self): return self.__ping_connector.ping_connection() - def set_pagesize_connector(self, connector): - if not isinstance(connector, (BaseConnector, BasePingConnector)): - raise Exception('connector is not instance of BaseConnector or BasePingConnector') - self.__pagesize_connector = connector - - @transmission - def get_pagesize(self): - return self.__pagesize_connector.get_pagesize() - def set_async(self, is_async): self.__async = is_async From 435a31689eb905251b580be5f3517e716a3e8bcb Mon Sep 17 00:00:00 2001 From: tingdai Date: Tue, 14 Feb 2023 16:05:31 -0500 Subject: [PATCH 07/13] when the search_after returns empty list, returns the metadata argument as lastsort value. --- .../elastic_ecs/stix_transmission/connector.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py b/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py index eb3ca02f0..10f957250 100644 --- a/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py +++ b/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py @@ -94,9 +94,10 @@ def create_results_connection(self, query, offset, length, metadata=None): self.logger.error("Total # of hits:" + str(response_json['hits']['total'])) return_obj['data'] = [record['_source'] for record in response_json["hits"]["hits"]] self.logger.error("Total # of records: " + str(len(return_obj['data']))) - if 'sort' in response_json["hits"]["hits"][-1]: + if len(response_json["hits"]["hits"]) == 0: + return_obj['lastsort'] = metadata + elif 'sort' in response_json["hits"]["hits"][-1]: return_obj['lastsort'] = response_json["hits"]["hits"][-1]['sort'] - return return_obj except Exception as e: if response_txt is not None: From 29012308691222a30f4bc28e30019ce9a8b778f1 Mon Sep 17 00:00:00 2001 From: tingdai Date: Fri, 17 Feb 2023 10:18:16 -0500 Subject: [PATCH 08/13] Extract max_result_window from settings before defaults, and add testcase --- .../stix_transmission/connector.py | 21 +++++--- .../stix_transmission/test_elastic_ecs.py | 51 +++++++++++++++++++ 2 files changed, 64 insertions(+), 8 deletions(-) diff --git a/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py b/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py index 10f957250..febd96687 100644 --- a/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py +++ b/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py @@ -60,15 +60,20 @@ def get_pagesize(self): return_obj = self._handle_errors(response, return_obj) if (return_obj['success']): response_json = json.loads(return_obj["data"]) - max_result_windows = set() + max_result_windows = list() if not (response_json is None): - for _, item_json in response_json.items(): - max_res_win = item_json['defaults']['index']['max_result_window'] - max_result_windows.add(max_res_win) - if len(max_result_windows) != 1: - ErrorResponder.fill_error(max_result_windows, message='inconsistent max_result_window settings', connector=self.connector) - self.logger.error('inconsistent max_result_window settings: ' + str(max_result_windows)) - self.max_result_window = int(max_result_windows.pop()) + for index, item_json in response_json.items(): + if 'max_result_window' in item_json['settings']['index']: + max_res_win = item_json['settings']['index']['max_result_window'] + elif 'max_result_window' in item_json['defaults']['index']: + max_res_win = item_json['defaults']['index']['max_result_window'] + else: + ErrorResponder.fill_error(item_json, + message='max_result_window is not set in index: ' + str(index), + connector=self.connector) + self.logger.error('max_result_window is not set in index: ' + str(index)) + max_result_windows.append(int(max_res_win)) + self.max_result_window = sorted(max_result_windows)[0] #return the smallest max_return_window in indices return self.max_result_window except Exception as e: if response_txt is not None: diff --git a/stix_shifter_modules/elastic_ecs/tests/stix_transmission/test_elastic_ecs.py b/stix_shifter_modules/elastic_ecs/tests/stix_transmission/test_elastic_ecs.py index 512c4be76..f9a3f0db4 100644 --- a/stix_shifter_modules/elastic_ecs/tests/stix_transmission/test_elastic_ecs.py +++ b/stix_shifter_modules/elastic_ecs/tests/stix_transmission/test_elastic_ecs.py @@ -271,3 +271,54 @@ def test_query_flow(self, mock_results_response, mock_api_client): assert 'data' in results_response assert len(results_response['data']) > 0 assert len(results_response['lastsort']) >= 1 + + + @patch('stix_shifter_modules.elastic_ecs.stix_transmission.api_client.APIClient.get_max_result_window', + autospec=True) + def test_pagesize(self, mock_results_response, mock_api_client): + mock_api_client.return_value = None + mocked_return_value = """ { + "index1": { + "settings": { + "index": { + "creation_date": "1676581113776" + } + }, + "defaults": { + "index": { + "max_result_window": "20000" + } + } + }, + "index2": { + "settings": { + "index": { + "max_result_window": "30000", + "creation_date": "1676580367477" + } + }, + "defaults": { + "index": { + } + } + } + } """ + mock_results_response.return_value = ElasticEcsMockResponse(200, mocked_return_value) + + config = { + "auth": { + "username": "bla", + "password": "bla" + } + } + connection = { + "host": "hostbla", + "port": 8080, + "selfSignedCert": "cert", + "indices": "index1,index2" + } + + transmission = stix_transmission.StixTransmission('elastic_ecs', connection, config) + + assert transmission.entry_point._BaseEntryPoint__results_connector.max_result_window == 20000 + From 707ef4fa2872f2418c5ea9475fdcb558c9e4bf94 Mon Sep 17 00:00:00 2001 From: tingdai Date: Tue, 21 Feb 2023 14:14:52 -0500 Subject: [PATCH 09/13] reformat thet get_pagesize() function --- .../elastic_ecs/stix_transmission/connector.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py b/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py index febd96687..408c4a842 100644 --- a/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py +++ b/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py @@ -17,7 +17,7 @@ def __init__(self, connection, configuration): self.max_result_window = 10000 # extract the max_result_window from elasticsearch try: - self.get_pagesize() + self.max_result_window = self.get_pagesize() except Exception as e: pass @@ -73,8 +73,8 @@ def get_pagesize(self): connector=self.connector) self.logger.error('max_result_window is not set in index: ' + str(index)) max_result_windows.append(int(max_res_win)) - self.max_result_window = sorted(max_result_windows)[0] #return the smallest max_return_window in indices - return self.max_result_window + max_result_window = sorted(max_result_windows)[0] #return the smallest max_return_window in indices + return max_result_window except Exception as e: if response_txt is not None: ErrorResponder.fill_error(return_obj, message='unexpected exception', connector=self.connector) From 6e16dd1cd8f5494fd8f2ee51338ad6f38923a4bc Mon Sep 17 00:00:00 2001 From: tingdai Date: Tue, 28 Feb 2023 14:38:05 -0500 Subject: [PATCH 10/13] add int cast before length in case it is passed as string --- stix_shifter_modules/elastic_ecs/stix_transmission/connector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py b/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py index 408c4a842..23019e9ca 100644 --- a/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py +++ b/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py @@ -89,7 +89,7 @@ def create_results_connection(self, query, offset, length, metadata=None): try: # using search after API in ElasticSearch # pass the last searched value in metadata argument, ignore offset argument - response = self.api_client.search_pagination(query, metadata, min(length, self.max_result_window)) + response = self.api_client.search_pagination(query, metadata, min(int(length), self.max_result_window)) return_obj = self._handle_errors(response, return_obj) if (return_obj['success']): From 1f007316ae8fa428b25263785c75788e0e1ae194 Mon Sep 17 00:00:00 2001 From: tingdai Date: Tue, 28 Feb 2023 14:41:56 -0500 Subject: [PATCH 11/13] remove depricated run_search() function --- .../stix_transmission/api_client.py | 54 ------------------- 1 file changed, 54 deletions(-) diff --git a/stix_shifter_modules/elastic_ecs/stix_transmission/api_client.py b/stix_shifter_modules/elastic_ecs/stix_transmission/api_client.py index 940b281cd..c8b79409d 100644 --- a/stix_shifter_modules/elastic_ecs/stix_transmission/api_client.py +++ b/stix_shifter_modules/elastic_ecs/stix_transmission/api_client.py @@ -56,60 +56,6 @@ def __init__(self, connection, configuration): def ping_box(self): return self.client.call_api(self.PING_ENDPOINT, 'GET', timeout=self.timeout) - def run_search(self, query_expression, offset=None, length=DEFAULT_LIMIT): - headers = dict() - headers['Content-Type'] = 'application/json' - - endpoint = self.endpoint - - uri_search = False # For testing and debugging two ways of _search API methods - - # URI Search - if uri_search: - if query_expression is not None: - # update/add size value - if length is not None: - if re.search(r"&size=\d+", query_expression): - query_expression = re.sub(r"(?<=&size=)\d+", str(length), query_expression) - else: - query_expression = '{}&size={}'.format(query_expression, length) - - # add offset to query expression - if offset is not None: - query_expression = '{}&from={}'.format(query_expression, offset) - - # addition of QueryString to API END point - endpoint = endpoint + '?q=' + query_expression - - return self.client.call_api(endpoint, 'GET', headers, timeout=self.timeout) - # Request body search - else: - # add size value - if length is not None: - endpoint = "{}?size={}".format(endpoint, length) - - # add offset value - if offset is not None: - endpoint = "{}&from={}".format(endpoint, offset) - - data = { - "_source": { - "includes": ["@timestamp", "source.*", "destination.*", "event.*", "client.*", "server.*", - "host.*", "network.*", "process.*", "user.*", "file.*", "url.*", "registry.*", "dns.*", - "tags"] - }, - "query": { - "query_string": { - "query": query_expression - } - } - } - - self.logger.debug("URL endpoint: " + endpoint) - self.logger.debug("URL data: " + json.dumps(data)) - - return self.client.call_api(endpoint, 'GET', headers, data=json.dumps(data), timeout=self.timeout) - def search_pagination(self, query_expression, lastsortvalue=None, length=DEFAULT_LIMIT): headers = dict() headers['Content-Type'] = 'application/json' From d0571da25dfe8d461326f9aff6fa1d0d921460cb Mon Sep 17 00:00:00 2001 From: tingdai Date: Wed, 1 Mar 2023 11:49:52 -0500 Subject: [PATCH 12/13] replace run_search() with search_pagination() in testcase --- .../elastic_ecs/tests/stix_transmission/test_elastic_ecs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stix_shifter_modules/elastic_ecs/tests/stix_transmission/test_elastic_ecs.py b/stix_shifter_modules/elastic_ecs/tests/stix_transmission/test_elastic_ecs.py index f9a3f0db4..b24ac9571 100644 --- a/stix_shifter_modules/elastic_ecs/tests/stix_transmission/test_elastic_ecs.py +++ b/stix_shifter_modules/elastic_ecs/tests/stix_transmission/test_elastic_ecs.py @@ -170,7 +170,7 @@ def test_results_response(self, mock_results_response, mock_api_client): assert len(results_response['lastsort']) >= 1 - @patch('stix_shifter_modules.elastic_ecs.stix_transmission.api_client.APIClient.run_search', + @patch('stix_shifter_modules.elastic_ecs.stix_transmission.api_client.APIClient.search_pagination', autospec=True) def test_results_response_exception(self, mock_results_response, mock_api_client): mock_api_client.return_value = None From 87833349d3ebe89bce70d4df1063a907a53b670c Mon Sep 17 00:00:00 2001 From: tingdai Date: Wed, 1 Mar 2023 14:01:15 -0500 Subject: [PATCH 13/13] replace lastsort with metadata --- .../elastic_ecs/stix_transmission/connector.py | 4 ++-- .../elastic_ecs/tests/stix_transmission/test_elastic_ecs.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py b/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py index 23019e9ca..ec874c9cb 100644 --- a/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py +++ b/stix_shifter_modules/elastic_ecs/stix_transmission/connector.py @@ -100,9 +100,9 @@ def create_results_connection(self, query, offset, length, metadata=None): return_obj['data'] = [record['_source'] for record in response_json["hits"]["hits"]] self.logger.error("Total # of records: " + str(len(return_obj['data']))) if len(response_json["hits"]["hits"]) == 0: - return_obj['lastsort'] = metadata + return_obj['metadata'] = metadata elif 'sort' in response_json["hits"]["hits"][-1]: - return_obj['lastsort'] = response_json["hits"]["hits"][-1]['sort'] + return_obj['metadata'] = response_json["hits"]["hits"][-1]['sort'] return return_obj except Exception as e: if response_txt is not None: diff --git a/stix_shifter_modules/elastic_ecs/tests/stix_transmission/test_elastic_ecs.py b/stix_shifter_modules/elastic_ecs/tests/stix_transmission/test_elastic_ecs.py index b24ac9571..fab295011 100644 --- a/stix_shifter_modules/elastic_ecs/tests/stix_transmission/test_elastic_ecs.py +++ b/stix_shifter_modules/elastic_ecs/tests/stix_transmission/test_elastic_ecs.py @@ -167,7 +167,7 @@ def test_results_response(self, mock_results_response, mock_api_client): assert results_response['success'] assert 'data' in results_response assert len(results_response['data']) > 0 - assert len(results_response['lastsort']) >= 1 + assert len(results_response['metadata']) >= 1 @patch('stix_shifter_modules.elastic_ecs.stix_transmission.api_client.APIClient.search_pagination', @@ -270,7 +270,7 @@ def test_query_flow(self, mock_results_response, mock_api_client): assert results_response is not None assert 'data' in results_response assert len(results_response['data']) > 0 - assert len(results_response['lastsort']) >= 1 + assert len(results_response['metadata']) >= 1 @patch('stix_shifter_modules.elastic_ecs.stix_transmission.api_client.APIClient.get_max_result_window',