Skip to content

Commit

Permalink
New term optimisation (#26)
Browse files Browse the repository at this point in the history
* New Term Optimisation - Threshold Feature enabled, removed use of refresh interval, Sliding Terms Window

* Test case updates




---------

Co-authored-by: RashmiRam <rashmi.ramanathan@freshworks.com>
  • Loading branch information
ajaywk7 and RashmiRam authored Sep 4, 2023
1 parent 71f9916 commit a8b62c7
Show file tree
Hide file tree
Showing 5 changed files with 503 additions and 255 deletions.
Binary file added .DS_Store
Binary file not shown.
51 changes: 16 additions & 35 deletions elastalert/elastalert.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,45 +456,26 @@ def get_hits(self, rule, starttime, endtime, index, scroll=False):

return hits

def get_new_terms_data(self, rule, starttime, endtime, field):
new_terms = []

rule_inst = rule["type"]

def get_terms_data(self,rule, starttime, endtime):
data = {}
rule_inst = rule['type']
try:
query = rule_inst.get_new_term_query(starttime,endtime,field)
request = get_msearch_query(query,rule)
res = self.thread_data.current_es.msearch(body=request)
res = res['responses'][0]

if 'aggregations' in res:
buckets = res['aggregations']['values']['buckets']
for field in rule['fields']:
terms, counts = rule_inst.get_terms_data(self.thread_data.current_es,starttime,endtime,field)
self.thread_data.num_hits += len(terms)
terms_counts_pair = ( terms, counts )
if type(field) == list:
for bucket in buckets:
new_terms += rule_inst.flatten_aggregation_hierarchy(bucket)
else:
new_terms = [bucket['key'] for bucket in buckets]

data[tuple(field)] = terms_counts_pair
else:
data[field] = terms_counts_pair
except ElasticsearchException as e:
if len(str(e)) > 1024:
e = str(e)[:1024] + '... (%d characters removed)' % (len(str(e)) - 1024)
self.handle_error('Error running new terms query: %s' % (e), {'rule': rule['name'], 'query': query})
return []

return new_terms

self.handle_error('Error running new terms query: %s' % (e), {'rule': rule['name'], 'query': rule_inst.get_new_term_query(starttime, endtime,field)})
return {endtime: {}}



def get_new_terms(self,rule, starttime, endtime):
data = {}

for field in rule['fields']:
new_terms = self.get_new_terms_data(rule,starttime,endtime,field)
self.thread_data.num_hits += len(new_terms)
if type(field) == list:
data[tuple(field)] = new_terms
else:
data[field] = new_terms


lt = rule.get('use_local_time')
status_log = "Queried rule %s from %s to %s: %s / %s hits" % (
Expand Down Expand Up @@ -752,7 +733,7 @@ def run_query(self, rule, start=None, end=None, scroll=False):
rule['scrolling_cycle'] = rule.get('scrolling_cycle', 0) + 1
index = self.get_index(rule, start, end)
if isinstance(rule_inst, NewTermsRule):
data = self.get_new_terms(rule, start, end)
data = self.get_terms_data(rule, start, end)
elif rule.get('use_count_query'):
data = self.get_hits_count(rule, start, end, index)
elif rule.get('use_terms_query'):
Expand All @@ -773,7 +754,7 @@ def run_query(self, rule, start=None, end=None, scroll=False):
return False
elif data:
if isinstance(rule_inst, NewTermsRule):
rule_inst.add_new_term_data(data)
rule_inst.add_terms_data(data)
elif rule.get('use_count_query'):
rule_inst.add_count_data(data)
elif rule.get('use_terms_query'):
Expand Down
Loading

0 comments on commit a8b62c7

Please sign in to comment.