Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New term optimisation #26

Merged
Merged 12 commits on Sep 4, 2023 (the source and target branch names were not captured in this extract).
Merged
Binary file added .DS_Store
Binary file not shown.
51 changes: 16 additions & 35 deletions elastalert/elastalert.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,45 +453,26 @@ def get_hits(self, rule, starttime, endtime, index, scroll=False):

return hits

def get_new_terms_data(self, rule, starttime, endtime, field):
new_terms = []

rule_inst = rule["type"]

def get_terms_data(self,rule, starttime, endtime):
data = {}
rule_inst = rule['type']
try:
query = rule_inst.get_new_term_query(starttime,endtime,field)
request = get_msearch_query(query,rule)
res = self.thread_data.current_es.msearch(body=request)
res = res['responses'][0]

if 'aggregations' in res:
buckets = res['aggregations']['values']['buckets']
for field in rule['fields']:
terms, counts = rule_inst.get_terms_data(self.thread_data.current_es,starttime,endtime,field)
self.thread_data.num_hits += len(terms)
terms_counts_pair = ( terms, counts )
if type(field) == list:
for bucket in buckets:
new_terms += rule_inst.flatten_aggregation_hierarchy(bucket)
else:
new_terms = [bucket['key'] for bucket in buckets]

data[tuple(field)] = terms_counts_pair
else:
data[field] = terms_counts_pair
except ElasticsearchException as e:
if len(str(e)) > 1024:
e = str(e)[:1024] + '... (%d characters removed)' % (len(str(e)) - 1024)
self.handle_error('Error running new terms query: %s' % (e), {'rule': rule['name'], 'query': query})
return []

return new_terms

self.handle_error('Error running new terms query: %s' % (e), {'rule': rule['name'], 'query': rule_inst.get_new_term_query(starttime, endtime,field)})
return {endtime: {}}



def get_new_terms(self,rule, starttime, endtime):
data = {}

for field in rule['fields']:
new_terms = self.get_new_terms_data(rule,starttime,endtime,field)
self.thread_data.num_hits += len(new_terms)
if type(field) == list:
data[tuple(field)] = new_terms
else:
data[field] = new_terms


lt = rule.get('use_local_time')
status_log = "Queried rule %s from %s to %s: %s / %s hits" % (
Expand Down Expand Up @@ -749,7 +730,7 @@ def run_query(self, rule, start=None, end=None, scroll=False):
rule['scrolling_cycle'] = rule.get('scrolling_cycle', 0) + 1
index = self.get_index(rule, start, end)
if isinstance(rule_inst, NewTermsRule):
data = self.get_new_terms(rule, start, end)
data = self.get_terms_data(rule, start, end)
elif rule.get('use_count_query'):
data = self.get_hits_count(rule, start, end, index)
elif rule.get('use_terms_query'):
Expand All @@ -770,7 +751,7 @@ def run_query(self, rule, start=None, end=None, scroll=False):
return False
elif data:
if isinstance(rule_inst, NewTermsRule):
rule_inst.add_new_term_data(data)
rule_inst.add_terms_data(data)
elif rule.get('use_count_query'):
rule_inst.add_count_data(data)
elif rule.get('use_terms_query'):
Expand Down
Loading