support large query with elastic search_after pagination #1299

Merged: 20 commits from tingdai:largequery into develop, Mar 1, 2023
Changes shown are from 4 of the 20 commits.

Commits
1e89e28
support large query with elastic search_after pagination
tingdai Jan 27, 2023
4b39359
change implementation by moving the pagination loop from stix-shifter…
tingdai Jan 30, 2023
8af9893
expose get_pagesize API in stix-transmission module
tingdai Feb 3, 2023
95af8c6
Merge branch 'develop' into largequery
mdazam1942 Feb 8, 2023
12f58cb
Merge branch 'opencybersecurityalliance:develop' into largequery
tingdai Feb 9, 2023
fe5ed4c
ignore get_pagesize() if not supported
tingdai Feb 9, 2023
2573ef6
make test case compatible with search_pagination() function
tingdai Feb 9, 2023
10a501f
remove the exposure of get_pagesize() API to external/upper layer ap…
tingdai Feb 14, 2023
435a316
when the search_after returns an empty list, return the metadata argume…
tingdai Feb 14, 2023
2901230
Extract max_result_window from settings before defaults, and add test…
tingdai Feb 17, 2023
707ef4f
reformat the get_pagesize() function
tingdai Feb 21, 2023
3cc4da8
Merge branch 'develop' into largequery
delliott90 Feb 28, 2023
6e16dd1
add int cast before length in case it is passed as string
tingdai Feb 28, 2023
414e6ee
Merge branch 'largequery' of https://github.com/tingdai/stix-shifter-…
tingdai Feb 28, 2023
1f00731
remove deprecated run_search() function
tingdai Feb 28, 2023
3c374e7
Merge branch 'develop' into largequery
delliott90 Mar 1, 2023
d0571da
replace run_search() with search_pagination() in testcase
tingdai Mar 1, 2023
fc21dc7
Merge branch 'largequery' of https://github.com/tingdai/stix-shifter-…
tingdai Mar 1, 2023
8783334
replace lastsort with metadata
tingdai Mar 1, 2023
0a8d238
Merge branch 'develop' into largequery
mdazam1942 Mar 1, 2023
9 changes: 9 additions & 0 deletions stix_shifter/stix_transmission/stix_transmission.py
@@ -80,6 +80,15 @@ def results_stix(self, search_id, offset, length, data_source, metadata=None):
             ErrorResponder.fill_error(return_obj, error=ex, connector=self.connector)
             return return_obj
 
+    def get_pagesize(self):
+        try:
+            return self.entry_point.get_pagesize()
+        except Exception as ex:
+            return_obj = dict()
+            ErrorResponder.fill_error(return_obj, error=ex, connector=self.connector)
+            return return_obj
+
+
     def delete(self, search_id):
         # Sends a request to the correct datasource, asking to terminate a specific query
         try:
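An upper layer reaches this through the transmission module. A minimal usage sketch (the connection and configuration values are placeholders, not taken from this PR; note that a later commit in this PR walks back exposing get_pagesize() to external layers):

    from stix_shifter.stix_transmission import stix_transmission

    # Hypothetical elastic_ecs connection details, for illustration only.
    connection = {"host": "elastic.example.com", "port": 9200, "indices": "winlogbeat-*", "options": {}}
    configuration = {"auth": {"username": "user", "password": "pass"}}

    transmission = stix_transmission.StixTransmission('elastic_ecs', connection, configuration)
    page_size = transmission.get_pagesize()  # e.g. 10000, Elasticsearch's default max_result_window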
69 changes: 64 additions & 5 deletions stix_shifter_modules/elastic_ecs/stix_transmission/api_client.py
@@ -25,9 +25,13 @@ def __init__(self, connection, configuration):
             self.indices = ",".join(self.indices)
 
         if self.indices:
-            self.endpoint = self.indices + '/' +'_search'
+            self.endpoint = self.indices + '/' + '_search'
+            self.pit_endpoint = self.indices + '/' + '_pit'
+            self.setting_endpoint = self.indices + '/' + '_settings'
         else:
             self.endpoint = '_search'
+            self.pit_endpoint = '_pit'
+            self.setting_endpoint = '_settings'
 
         if auth:
             if 'username' in auth and 'password' in auth:
@@ -46,11 +46,11 @@ def __init__(self, connection, configuration):
             cert_verify=connection.get('selfSignedCert', True),
             sni=connection.get('sni', None)
         )
 
         self.timeout = connection['options'].get('timeout')
 
     def ping_box(self):
-        return self.client.call_api(self.PING_ENDPOINT, 'GET',timeout=self.timeout)
+        return self.client.call_api(self.PING_ENDPOINT, 'GET', timeout=self.timeout)
 
     def run_search(self, query_expression, offset=None, length=DEFAULT_LIMIT):
         headers = dict()
@@ -91,11 +91,12 @@ def run_search(self, query_expression, offset=None, length=DEFAULT_LIMIT):
         data = {
             "_source": {
                 "includes": ["@timestamp", "source.*", "destination.*", "event.*", "client.*", "server.*",
-                             "host.*","network.*", "process.*", "user.*", "file.*", "url.*", "registry.*", "dns.*", "tags"]
+                             "host.*", "network.*", "process.*", "user.*", "file.*", "url.*", "registry.*", "dns.*",
+                             "tags"]
             },
             "query": {
                 "query_string": {
                     "query": query_expression
                 }
             }
         }
@@ -104,3 +104,57 @@ def run_search(self, query_expression, offset=None, length=DEFAULT_LIMIT):
         self.logger.debug("URL data: " + json.dumps(data))
 
         return self.client.call_api(endpoint, 'GET', headers, data=json.dumps(data), timeout=self.timeout)
+
+    def search_pagination(self, query_expression, lastsortvalue=None, length=DEFAULT_LIMIT):
+        headers = dict()
+        headers['Content-Type'] = 'application/json'
+        endpoint = self.endpoint
+        # add the page size to the URL
+        if length is not None:
+            endpoint = "{}?size={}".format(endpoint, length)
+
+        data = {
+            "_source": {
+                "includes": ["@timestamp", "source.*", "destination.*", "event.*", "client.*", "server.*",
+                             "host.*", "network.*", "process.*", "user.*", "file.*", "url.*", "registry.*", "dns.*",
+                             "tags"]
+            },
+            "query": {
+                "query_string": {
+                    "query": query_expression
+                }
+            },
+            "sort": [
+                {"@timestamp": "asc"},
+            ]
+        }
+
+        if lastsortvalue is not None:
+            extra_data = {
+                "search_after": lastsortvalue
+            }
+            data.update(extra_data)
+
+        self.logger.debug("URL endpoint: " + endpoint)
+        self.logger.debug("URL data: " + json.dumps(data))
+
+        return self.client.call_api(endpoint, 'GET', headers, data=json.dumps(data), timeout=self.timeout)
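Elasticsearch's search_after pagination sorts hits on a stable key and feeds the sort values of the last hit back with the next request. For a follow-up page, the body built above would look roughly like this (a sketch with illustrative values; the includes list is trimmed):

    data = {
        "_source": {"includes": ["@timestamp", "source.*"]},               # trimmed
        "query": {"query_string": {"query": "source.ip : '192.168.1.1'"}},
        "sort": [{"@timestamp": "asc"}],
        "search_after": [1674855000000],   # sort values of the previous page's last hit
    }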

+    def get_max_result_window(self):
+        # e.g. GET winlogbeat-*/_settings?include_defaults=true
+        endpoint = "{}?include_defaults=true".format(self.setting_endpoint)
+        return self.client.call_api(endpoint, 'GET', timeout=self.timeout)
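The _settings?include_defaults=true response is keyed by index name; when max_result_window has not been overridden explicitly, it shows up under defaults rather than settings (which is why the connector below reads defaults, and why a later commit in this PR extracts it from settings before defaults). A trimmed sketch of the shape, with illustrative values:

    response_json = {
        "winlogbeat-2023.01": {
            "settings": {"index": {"number_of_shards": "1"}},
            "defaults": {"index": {"max_result_window": "10000"}},
        },
        "winlogbeat-2023.02": {
            "settings": {"index": {"number_of_shards": "1"}},
            "defaults": {"index": {"max_result_window": "10000"}},
        },
    }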

+    def set_pit(self):
+        headers = dict()
+        headers['Content-Type'] = 'application/json'
+
+        # open a point in time (PIT), e.g. POST /my-index-000001/_pit?keep_alive=1m
+        endpoint = "{}?keep_alive=1m&pretty".format(self.pit_endpoint)
+
+        return self.client.call_api(endpoint, 'POST', headers, timeout=self.timeout)
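The _pit endpoint returns an id that subsequent searches would reference; in this change it is only exercised by a commented-out call in ping_connection below. A sketch of how it would pair with a search, per the Elasticsearch point-in-time API (the response interface here is an assumption):

    import json

    pit_response = api_client.set_pit()
    pit_id = json.loads(pit_response.read())["id"]   # .read() interface assumed
    body = {
        "pit": {"id": pit_id, "keep_alive": "1m"},   # the search carries the PIT instead of an index
        "sort": [{"@timestamp": "asc"}],
    }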



53 changes: 51 additions & 2 deletions stix_shifter_modules/elastic_ecs/stix_transmission/connector.py
@@ -14,6 +14,9 @@ def __init__(self, connection, configuration):
         self.api_client = APIClient(connection, configuration)
         self.logger = logger.set_logger(__name__)
         self.connector = __name__.split('.')[1]
+        self.max_result_window = 10000
+        # extract the actual max_result_window from the Elasticsearch settings
+        self.get_pagesize()
 
     def _handle_errors(self, response, return_obj):
         response_code = response.code
@@ -35,6 +35,8 @@ def ping_connection(self):
         response_txt = None
         return_obj = dict()
         try:
+            # test the pit
+            # print(self.set_point_in_time())
             response = self.api_client.ping_box()
             return self._handle_errors(response, return_obj)
         except Exception as e:
@@ -44,12 +49,39 @@ def ping_connection(self):
             else:
                 raise e
 
-    def create_results_connection(self, query, offset, length):
+    def get_pagesize(self):
+        response_txt = None
+        return_obj = dict()
+        try:
+            response = self.api_client.get_max_result_window()
+            return_obj = self._handle_errors(response, return_obj)
+            if return_obj['success']:
+                response_json = json.loads(return_obj["data"])
+                max_result_windows = set()
+                if response_json is not None:
+                    for _, item_json in response_json.items():
+                        max_res_win = item_json['defaults']['index']['max_result_window']
+                        max_result_windows.add(max_res_win)
+                    if len(max_result_windows) != 1:
+                        ErrorResponder.fill_error(return_obj, message='inconsistent max_result_window settings', connector=self.connector)
+                        self.logger.error('inconsistent max_result_window settings: ' + str(max_result_windows))
+                    self.max_result_window = int(max_result_windows.pop())
+            return self.max_result_window
+        except Exception as e:
+            if response_txt is not None:
+                ErrorResponder.fill_error(return_obj, message='unexpected exception', connector=self.connector)
+                self.logger.error('can not parse response: ' + str(response_txt))
+            else:
+                raise e
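Worked through on a sample payload, the loop above collects one max_result_window per index into a set, so homogeneous settings collapse to a single value (illustrative values):

    settings_json = {
        "winlogbeat-2023.01": {"defaults": {"index": {"max_result_window": "10000"}}},
        "winlogbeat-2023.02": {"defaults": {"index": {"max_result_window": "10000"}}},
    }
    windows = {idx["defaults"]["index"]["max_result_window"] for idx in settings_json.values()}
    assert windows == {"10000"}   # one distinct value, so max_result_window becomes 10000
    # a mixed set such as {"10000", "50000"} would trip the inconsistent-settings error instead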

+    def create_results_connection(self, query, offset, length, metadata=None):
         response_txt = None
         return_obj = dict()
 
         try:
-            response = self.api_client.run_search(query, offset, length)
+            # use the Elasticsearch search_after API: pass the last hit's sort
+            # values via the metadata argument and ignore the offset argument
+            response = self.api_client.search_pagination(query, metadata, min(length, self.max_result_window))
             return_obj = self._handle_errors(response, return_obj)
 
             if return_obj['success']:
@@ -59,6 +91,8 @@ def create_results_connection(self, query, offset, length):
self.logger.error("Total # of hits:" + str(response_json['hits']['total']))
return_obj['data'] = [record['_source'] for record in response_json["hits"]["hits"]]
self.logger.error("Total # of records: " + str(len(return_obj['data'])))
if 'sort' in response_json["hits"]["hits"][-1]:
return_obj['lastsort'] = response_json["hits"]["hits"][-1]['sort']
tingdai marked this conversation as resolved.
Show resolved Hide resolved

return return_obj
except Exception as e:
Expand All @@ -67,3 +101,18 @@ def create_results_connection(self, query, offset, length):
self.logger.error('can not parse response: ' + str(response_txt))
else:
raise e
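Putting the pieces together, a driver would page through a large result set by feeding each response's lastsort back in as the metadata argument. An illustrative loop, not part of this diff:

    metadata = None
    records = []
    while True:
        page = connector.create_results_connection(query, 0, 10000, metadata)
        if not page.get('success') or not page.get('data'):
            break
        records.extend(page['data'])
        metadata = page.get('lastsort')   # sort values of this page's last hit
        if metadata is None:
            break   # backend returned no sort values; cannot page further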



+    def set_point_in_time(self):
+        response_txt = None
+        return_obj = dict()
+        try:
+            response = self.api_client.set_pit()
+            return self._handle_errors(response, return_obj)
+        except Exception as e:
+            if response_txt is not None:
+                ErrorResponder.fill_error(return_obj, message='unexpected exception', connector=self.connector)
+                self.logger.error('can not parse response: ' + str(response_txt))
+            else:
+                raise e
stix_shifter_utils/modules/base/stix_transmission/base_connector.py
@@ -89,6 +89,21 @@ def ping_connection(self):
"""
raise NotImplementedError()

def get_pagesize(self):
"""
Creates a connection to the specified datasource to retrieve pagesize (e.g.,max_size_window)

Args:
None.

Returns:
dict: The return value.
keys:
success (bool): True or False
data (int): The pagesize result data
"""
raise NotImplementedError()

def create_results_stix_connection(self, entry_point, search_id, offset, length, data_source, metadata=None):
stats = []
if metadata:
Expand Down
10 changes: 10 additions & 0 deletions stix_shifter_utils/utils/base_entry_point.py
@@ -235,6 +235,7 @@ def setup_transmission_basic(self, connection, configuration):
         self.set_results_connector(connector)
         self.set_delete_connector(connector)
         self.set_ping_connector(connector)
+        self.set_pagesize_connector(connector)
 
     def set_query_connector(self, connector):
         if not (isinstance(connector, (BaseConnector, BaseQueryConnector)) or issubclass(connector, BaseConnector)):
@@ -291,6 +292,15 @@ def set_ping_connector(self, connector):
     def ping_connection(self):
         return self.__ping_connector.ping_connection()
 
+    def set_pagesize_connector(self, connector):
+        if not isinstance(connector, (BaseConnector, BasePingConnector)):
+            raise Exception('connector is not an instance of BaseConnector or BasePingConnector')
+        self.__pagesize_connector = connector
+
+    @transmission
+    def get_pagesize(self):
+        return self.__pagesize_connector.get_pagesize()
+
     def set_async(self, is_async):
         self.__async = is_async
