From c1dca5e2e4a12fd353c6ed965009957a1707aefe Mon Sep 17 00:00:00 2001 From: sivatarunp Date: Tue, 30 Jul 2024 14:37:28 +0530 Subject: [PATCH 1/2] changing es package version --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index f1f4ec190..149d162b9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,7 +4,7 @@ sortedcontainers==2.4.0 boto3==1.34.129 cffi==1.16.0 croniter==2.0.5 -elasticsearch==6.3.1 +elasticsearch==7.10.1 envparse==0.2.0 exotel==0.1.5 Jinja2==3.1.2 From 5b6328c23ca04ef5cb1c5be39ac02a4bdb7bbec0 Mon Sep 17 00:00:00 2001 From: sivatarunp Date: Wed, 31 Jul 2024 12:20:19 +0530 Subject: [PATCH 2/2] changing elastalert to work for OS clusters --- elastalert/elastalert.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/elastalert/elastalert.py b/elastalert/elastalert.py index 5cf6c6210..7d17d2859 100755 --- a/elastalert/elastalert.py +++ b/elastalert/elastalert.py @@ -318,7 +318,7 @@ def get_index_start(self, index, timestamp_field='@timestamp'): query = {'sort': {timestamp_field: {'order': 'asc'}}} try: res = self.thread_data.current_es.search(index=index, size=1, body=query, - _source_includes=[timestamp_field], ignore_unavailable=True) + _source_includes=[timestamp_field], ignore_unavailable=True) except ElasticsearchException as e: self.handle_error("Elasticsearch query error: %s" % (e), {'index': index, 'query': query}) return '1969-12-30T00:00:00Z' @@ -841,8 +841,8 @@ def get_starttime(self, rule): doc_type = 'elastalert_status' index = self.writeback_es.resolve_writeback_index(self.writeback_index, doc_type) #modded for elasticsearch ver 6 library compatibility - res = self.writeback_es.search(index=index, doc_type='elastalert_status', - size=1, body=query, _source_include=['endtime', 'rule_name']) + res = self.writeback_es.search(index=index, size=1, body=query, + _source_includes=['endtime', 'rule_name']) if res['hits']['hits']: endtime = ts_to_dt(res['hits']['hits'][0]['_source']['endtime']) @@ -1650,7 +1650,7 @@ def writeback(self, doc_type, body, rule=None, match_body=None): try: index = self.writeback_es.resolve_writeback_index(self.writeback_index, doc_type) - res = self.writeback_es.index(index=index,doc_type=doc_type, body=body) + res = self.writeback_es.index(index=index, body=body) return res except ElasticsearchException as e: elastalert_logger.exception("Error writing alert info to Elasticsearch: %s" % (e)) @@ -1672,7 +1672,6 @@ def find_recent_pending_alerts(self, time_limit): try: #modded for elasticsearch ver 6 library compatibility res = self.writeback_es.search(index=self.writeback_index, - doc_type='elastalert', body=query, size=1000) if res['hits']['hits']: @@ -1727,7 +1726,6 @@ def send_pending_alerts(self): # Delete it from the index try: self.writeback_es.delete(index=self.writeback_index, - doc_type='elastalert', id=_id) except ElasticsearchException: # TODO: Give this a more relevant exception, try:except: is evil. self.handle_error("Failed to delete alert %s at %s" % (_id, alert_time)) @@ -1760,13 +1758,11 @@ def get_aggregated_matches(self, _id): try: #modded for elasticsearch ver 6 library compatibility res = self.writeback_es.search(index=self.writeback_index, - doc_type='elastalert', body=query, size=self.max_aggregation) for match in res['hits']['hits']: matches.append(match['_source']) self.writeback_es.delete(index=self.writeback_index, - doc_type='elastalert', id=match['_id']) except (KeyError, ElasticsearchException) as e: self.handle_error("Error fetching aggregated matches: %s" % (e), {'id': _id}) @@ -1931,8 +1927,8 @@ def is_silenced(self, rule_name): doc_type = 'silence' index = self.writeback_es.resolve_writeback_index(self.writeback_index, doc_type) #modded for elasticsearch ver 6 library compatibility - res = self.writeback_es.search(index=index, doc_type='silence', - size=1, body=query, _source_include=['until', 'exponent']) + res = self.writeback_es.search(index=index, size=1, body=query, + _source_includes=['until', 'exponent']) except ElasticsearchException as e: self.handle_error("Error while querying for alert silence status: %s" % (e), {'rule': rule_name})