diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 000000000..c5ba7493f Binary files /dev/null and b/.DS_Store differ diff --git a/elastalert/elastalert.py b/elastalert/elastalert.py index bee8537e1..f4e81fa32 100755 --- a/elastalert/elastalert.py +++ b/elastalert/elastalert.py @@ -453,45 +453,26 @@ def get_hits(self, rule, starttime, endtime, index, scroll=False): return hits - def get_new_terms_data(self, rule, starttime, endtime, field): - new_terms = [] - - rule_inst = rule["type"] + + def get_terms_data(self,rule, starttime, endtime): + data = {} + rule_inst = rule['type'] try: - query = rule_inst.get_new_term_query(starttime,endtime,field) - request = get_msearch_query(query,rule) - res = self.thread_data.current_es.msearch(body=request) - res = res['responses'][0] - - if 'aggregations' in res: - buckets = res['aggregations']['values']['buckets'] + for field in rule['fields']: + terms, counts = rule_inst.get_terms_data(self.thread_data.current_es,starttime,endtime,field) + self.thread_data.num_hits += len(terms) + terms_counts_pair = ( terms, counts ) if type(field) == list: - for bucket in buckets: - new_terms += rule_inst.flatten_aggregation_hierarchy(bucket) - else: - new_terms = [bucket['key'] for bucket in buckets] - + data[tuple(field)] = terms_counts_pair + else: + data[field] = terms_counts_pair except ElasticsearchException as e: if len(str(e)) > 1024: e = str(e)[:1024] + '... (%d characters removed)' % (len(str(e)) - 1024) - self.handle_error('Error running new terms query: %s' % (e), {'rule': rule['name'], 'query': query}) - return [] - - return new_terms - + self.handle_error('Error running new terms query: %s' % (e), {'rule': rule['name'], 'query': rule_inst.get_new_term_query(starttime, endtime,field)}) + return {endtime: {}} - - - def get_new_terms(self,rule, starttime, endtime): - data = {} - - for field in rule['fields']: - new_terms = self.get_new_terms_data(rule,starttime,endtime,field) - self.thread_data.num_hits += len(new_terms) - if type(field) == list: - data[tuple(field)] = new_terms - else: - data[field] = new_terms + lt = rule.get('use_local_time') status_log = "Queried rule %s from %s to %s: %s / %s hits" % ( @@ -749,7 +730,7 @@ def run_query(self, rule, start=None, end=None, scroll=False): rule['scrolling_cycle'] = rule.get('scrolling_cycle', 0) + 1 index = self.get_index(rule, start, end) if isinstance(rule_inst, NewTermsRule): - data = self.get_new_terms(rule, start, end) + data = self.get_terms_data(rule, start, end) elif rule.get('use_count_query'): data = self.get_hits_count(rule, start, end, index) elif rule.get('use_terms_query'): @@ -770,7 +751,7 @@ def run_query(self, rule, start=None, end=None, scroll=False): return False elif data: if isinstance(rule_inst, NewTermsRule): - rule_inst.add_new_term_data(data) + rule_inst.add_terms_data(data) elif rule.get('use_count_query'): rule_inst.add_count_data(data) elif rule.get('use_terms_query'): diff --git a/elastalert/ruletypes.py b/elastalert/ruletypes.py index 95f0cabfc..48ea4854a 100644 --- a/elastalert/ruletypes.py +++ b/elastalert/ruletypes.py @@ -3,6 +3,7 @@ import datetime import sys import time +import itertools from sortedcontainers import SortedKeyList as sortedlist @@ -410,6 +411,104 @@ def append_middle(self, event): self.running_count += event[1] self.data.rotate(-rotation) +class TermsWindow: + + """ For each field configured in new_term rule, This term window is created and maintained. + A sliding window is maintained and count of all the existing terms are stored. 
+
+        data - Sliding window holding the queried (timestamp, terms, counts) entries, kept sorted in ascending order by timestamp
+        existing_terms - Set of the terms currently present in the window, used to look up whether an incoming term is new
+        potential_new_term_windows - Dictionary of EventWindows, one per candidate new term, used to check whether the threshold has been reached
+        count_dict - Per-term hit counts for the terms in the window; updated whenever an entry is added to or popped from the sliding window (data)
+    """
+    def __init__(self, term_window_size, ts_field, threshold, threshold_window_size, get_ts):
+        self.term_window_size = term_window_size
+        self.ts_field = ts_field
+        self.threshold = threshold
+        self.threshold_window_size = threshold_window_size
+        self.get_ts = get_ts
+
+        self.data = sortedlist(key=lambda x: x[0])  # sorted by timestamp
+        self.existing_terms = set()
+        self.potential_new_term_windows = {}
+        self.count_dict = {}
+
+    """ Add terms and their counts for a timestamp into the sliding window (data). """
+    def add(self, timestamp, terms, counts):
+        for (term, count) in zip(terms, counts):
+            if term not in self.count_dict:
+                self.count_dict[term] = 0
+            self.count_dict[term] += count
+            self.existing_terms.add(term)
+        self.data.add((timestamp, terms, counts))
+        self.resize()
+
+    """ Split the given terms and counts into already-seen and unseen terms for this timestamp. """
+    def split(self, timestamp, terms, counts):
+        unseen_terms = []
+        unseen_counts = []
+        seen_terms = []
+        seen_counts = []
+        self.resize(till=timestamp - self.term_window_size)
+        for (term, count) in zip(terms, counts):
+            if term not in self.existing_terms:
+                unseen_terms.append(term)
+                unseen_counts.append(count)
+            else:
+                seen_terms.append(term)
+                seen_counts.append(count)
+        return seen_terms, seen_counts, unseen_terms, unseen_counts
+
+    """ Update the per-term EventWindows for terms that are potentially new. """
+    def update_potential_new_term_windows(self, timestamp, unseen_terms, unseen_counts):
+        for (term, count) in zip(unseen_terms, unseen_counts):
+            event = ({self.ts_field: timestamp}, count)
+            window = self.potential_new_term_windows.setdefault(term, EventWindow(self.threshold_window_size, getTimestamp=self.get_ts))
+            window.append(event)
+
+    """ Return the potential new terms whose accumulated count has crossed the configured threshold. """
+    def extract_new_terms(self, potential_new_terms, potential_term_counts):
+        new_terms = []
+        new_counts = []
+        for (potential_new_term, potential_term_count) in zip(potential_new_terms, potential_term_counts):
+            window = self.potential_new_term_windows.get(potential_new_term)
+            if window.count() >= self.threshold:
+                new_terms.append(potential_new_term)
+                new_counts.append(potential_term_count)
+                self.potential_new_term_windows.pop(potential_new_term)
+        return new_terms, new_counts
+
+    def get_new_terms(self, timestamp, terms, counts):
+        existing_terms, existing_counts, potential_new_terms, potential_term_counts = self.split(timestamp, terms, counts)  # split incoming terms into existing and potentially new, relative to this timestamp
+        self.update_potential_new_term_windows(timestamp, potential_new_terms, potential_term_counts)  # update the potential_new_term_windows
+        new_terms, new_counts = self.extract_new_terms(potential_new_terms, potential_term_counts)  # extract and delete the terms that crossed the threshold from potential_new_term_windows
+        self.add(timestamp, existing_terms + new_terms, existing_counts + new_counts)  # add the existing terms and the new terms to the terms window
+        return new_terms, new_counts
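A minimal usage sketch of the TermsWindow class above (illustrative only, not part of this diff; it assumes this branch's elastalert.ruletypes exposes TermsWindow, and the timestamps, field name and counts are made up):

from datetime import datetime, timedelta, timezone

from elastalert.ruletypes import TermsWindow  # available once this change is applied

ts_field = '@timestamp'
window = TermsWindow(
    term_window_size=timedelta(hours=3),        # how long a term counts as "existing"
    ts_field=ts_field,
    threshold=20,                               # hits a candidate term needs before it matches
    threshold_window_size=timedelta(hours=1),   # window in which those hits must accumulate
    get_ts=lambda event: event[0][ts_field],    # same shape as new_get_event_ts(ts_field)
)

t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
window.add(t0, ['key2'], [5])  # seed the baseline term

# 'key1' is unseen, but 8 hits < threshold, so nothing is reported yet
print(window.get_new_terms(t0 + timedelta(minutes=30), ['key1'], [8]))   # ([], [])

# 8 + 15 hits inside the 1 hour threshold window crosses 20, so 'key1' is reported
print(window.get_new_terms(t0 + timedelta(minutes=40), ['key1'], [15]))  # (['key1'], [15])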
+
+
+    """ Ensure the duration of the sliding window never exceeds term_window_size.
+        All events with a timestamp earlier than 'till' are popped, and the counts of the keys in the
+        popped events are subtracted from count_dict. If a term's count reaches 0 after subtraction, it is
+        removed from count_dict and existing_terms, i.e. it has not occurred within the window duration.
+        By default, till = (last event's timestamp - term_window_size).
+    """
+    def resize(self, till=None):
+        if len(self.data) == 0:
+            return
+
+        if till is None:
+            till = self.data[-1][0] - self.term_window_size
+
+        while len(self.data) != 0 and self.data[0][0] < till:
+            timestamp, keys, counts = self.data.pop(0)
+            for i in range(len(keys)):
+                self.count_dict[keys[i]] -= counts[i]
+                if self.count_dict[keys[i]] <= 0:
+                    self.count_dict.pop(keys[i])
+                    self.existing_terms.discard(keys[i])
+
+
+
 class SpikeRule(RuleType):
     """ A rule that uses two sliding windows to compare relative event frequency. """
@@ -676,22 +775,26 @@ class NewTermsRule(RuleType):
 
     def __init__(self, rule, args=None):
         super(NewTermsRule, self).__init__(rule, args)
-        self.seen_values = {}
+        self.term_windows = {}
         self.last_updated_at = None
         self.es = kibana_adapter_client(self.rules)
+        self.ts_field = self.rules.get('timestamp_field', '@timestamp')
+        self.get_ts = new_get_event_ts(self.ts_field)
+        self.new_terms = {}
+
+        self.threshold = rule.get('threshold', 0)
 
         # terms_window_size : Default & Upperbound - 7 Days
         self.window_size = min(datetime.timedelta(**self.rules.get('terms_window_size', {'days': 7})), datetime.timedelta(**{'days': 7}))
 
-        # window_step_size : Default - 1 Days, Lowerbound: 6 hours
-        self.step = max( datetime.timedelta(**self.rules.get('window_step_size', {'days': 1})), datetime.timedelta(**{'hours': 6}) )
+        self.step = datetime.timedelta(**{'hours': 1})  # query step size used when backfilling the terms window, fixed at 1 hour
 
-        # refresh_interval : Default - 6 hours, Lowerbound: 6 hours
-        self.refresh_interval = max( datetime.timedelta(**self.rules.get('refresh_interval', {'hours': 6})), datetime.timedelta(**{'hours': 6}) )
-
-        # refresh_interval : Default - 500, Upperbound: 1000
+        # terms_size : Default - 500, Upperbound - 1000
        self.terms_size = min(self.rules.get('terms_size', 500),1000)
 
+        # threshold_window_size : Default - 1 hour, Upperbound - 2 days
+        self.threshold_window_size = min( datetime.timedelta(**self.rules.get('threshold_window_size', {'hours': 1})), datetime.timedelta(**{'days': 2}) )
+
         # Allow the use of query_key or fields
         if 'fields' not in self.rules:
             if 'query_key' not in self.rules:
@@ -713,17 +816,12 @@ def __init__(self, rule, args=None):
         if self.rules.get('use_keyword_postfix', False): # making it false by default as we won't use the keyword suffix
             elastalert_logger.warn('Warning: If query_key is a non-keyword field, you must set '
                                    'use_keyword_postfix to false, or add .keyword/.raw to your query_key.')
-
-    def should_refresh_terms(self):
-        return self.last_updated_at is None or self.last_updated_at < ( ts_now() - self.refresh_interval)
-
-    def update_terms(self,args=None):
         try:
             self.get_all_terms(args=args)
         except Exception as e:
             # Refuse to start if we cannot get existing terms
             raise EAException('Error searching for existing terms: %s' % (repr(e))).with_traceback(sys.exc_info()[2])
-
+
 
     def get_new_term_query(self,starttime,endtime,field):
@@ -771,7 +869,7 @@ def get_new_term_query(self,starttime,endtime,field):
         # For composite keys, we will 
need to perform sub-aggregations if type(field) == list: - self.seen_values.setdefault(tuple(field), []) + self.term_windows.setdefault(tuple(field), TermsWindow(self.window_size, self.ts_field , self.threshold, self.threshold_window_size, self.get_ts)) level = query['aggs'] # Iterate on each part of the composite key and add a sub aggs clause to the elastic search query for i, sub_field in enumerate(field): @@ -784,7 +882,7 @@ def get_new_term_query(self,starttime,endtime,field): level['values']['aggs'] = {'values': {'terms': copy.deepcopy(field_name)}} level = level['values']['aggs'] else: - self.seen_values.setdefault(field, []) + self.term_windows.setdefault(field, TermsWindow(self.window_size, self.ts_field , self.threshold, self.threshold_window_size, self.get_ts)) # For non-composite keys, only a single agg is needed if self.rules.get('use_keyword_postfix', False):# making it false by default as we wont use the keyword suffix field_name['field'] = add_raw_postfix(field, True) @@ -793,6 +891,32 @@ def get_new_term_query(self,starttime,endtime,field): return query + def get_terms_data(self, es, starttime, endtime, field, request_timeout= None): + terms = [] + counts = [] + query = self.get_new_term_query(starttime,endtime,field) + request = get_msearch_query(query,self.rules) + + if request_timeout == None: + res = es.msearch(body=request) + else: + res = es.msearch(body=request, request_timeout=request_timeout) + res = res['responses'][0] + + if 'aggregations' in res: + buckets = res['aggregations']['values']['buckets'] + if type(field) == list: + for bucket in buckets: + keys, doc_counts = self.flatten_aggregation_hierarchy(bucket) + terms += keys + counts += doc_counts + else: + for bucket in buckets: + terms.append(bucket['key']) + counts.append(bucket['doc_count']) + + return terms, counts + @@ -810,43 +934,18 @@ def get_all_terms(self,args): for field in self.fields: tmp_start = start - tmp_end = min(start + self.step, end) - query = self.get_new_term_query(tmp_start,tmp_end,field) - + # Query the entire time range in small chunks while tmp_start < end: - - msearch_query = get_msearch_query(query,self.rules) - - res = self.es.msearch(msearch_query,request_timeout=50) - res = res['responses'][0] - - if 'aggregations' in res: - buckets = res['aggregations']['values']['buckets'] - if type(field) == list: - # For composite keys, make the lookup based on all fields - # Make it a tuple since it can be hashed and used in dictionary lookups - for bucket in buckets: - # We need to walk down the hierarchy and obtain the value at each level - self.seen_values[tuple(field)] += self.flatten_aggregation_hierarchy(bucket) - else: - keys = [bucket['key'] for bucket in buckets] - self.seen_values[field] += keys - else: - if type(field) == list: - self.seen_values.setdefault(tuple(field), []) - else: - self.seen_values.setdefault(field, []) - if tmp_start == tmp_end: - break - tmp_start = tmp_end tmp_end = min(tmp_start + self.step, end) - query = self.get_new_term_query(tmp_start,tmp_end,field) + terms, counts = self.get_terms_data(self.es, tmp_start, tmp_end, field, request_timeout=50) + self.term_windows[self.get_lookup_key(field)].add(tmp_end,terms,counts) + tmp_start = tmp_end - for key, values in self.seen_values.items(): - if not values: - if type(key) == tuple: + for lookup_key, window in self.term_windows.items(): + if not window.existing_terms: + if type(lookup_key) == tuple: # If we don't have any results, it could either be because of the absence of any baseline data # OR it may be 
because the composite key contained a non-primitive type. Either way, give the # end-users a heads up to help them debug what might be going on. @@ -857,9 +956,8 @@ def get_all_terms(self,args): else: elastalert_logger.info('Found no values for %s' % (field)) continue - self.seen_values[key] = list(set(values)) - elastalert_logger.info('Found %s unique values for %s' % (len(set(values)), key)) - self.last_updated_at = ts_now() + elastalert_logger.info('Found %s unique values for %s' % (len(window.existing_terms), lookup_key)) + # self.last_updated_at = ts_now() def flatten_aggregation_hierarchy(self, root, hierarchy_tuple=()): """ For nested aggregations, the results come back in the following format: @@ -950,75 +1048,88 @@ def flatten_aggregation_hierarchy(self, root, hierarchy_tuple=()): A similar formatting will be performed in the add_data method and used as the basis for comparison """ - results = [] + final_keys = [] + final_counts = [] # There are more aggregation hierarchies left. Traverse them. if 'values' in root: - results += self.flatten_aggregation_hierarchy(root['values']['buckets'], hierarchy_tuple + (root['key'],)) + keys, counts = self.flatten_aggregation_hierarchy(root['values']['buckets'], hierarchy_tuple + (root['key'],)) + final_keys += keys + final_counts += counts else: # We've gotten to a sub-aggregation, which may have further sub-aggregations # See if we need to traverse further for node in root: if 'values' in node: - results += self.flatten_aggregation_hierarchy(node, hierarchy_tuple) + keys, counts = self.flatten_aggregation_hierarchy(node, hierarchy_tuple) + final_keys += keys + final_counts += counts else: - results.append(hierarchy_tuple + (node['key'],)) - return results + final_keys.append(hierarchy_tuple + (node['key'],)) + final_counts.append(node['doc_count']) + return final_keys, final_counts - def add_new_term_data(self, payload): - if self.should_refresh_terms(): - self.update_terms() + def add_terms_data(self, payload): timestamp = list(payload.keys())[0] data = payload[timestamp] for field in self.fields: - lookup_key =tuple(field) if type(field) == list else field - for value in data[lookup_key]: - if value not in self.seen_values[lookup_key]: - match = { - "field": lookup_key, - self.rules['timestamp_field']: timestamp, - 'new_value': tuple(value) if type(field) == list else value - } - self.add_match(copy.deepcopy(match)) - self.seen_values[lookup_key].append(value) - - def add_data(self, data): - for document in data: - for field in self.fields: - value = () - lookup_field = field - if type(field) == list: - # For composite keys, make the lookup based on all fields - # Make it a tuple since it can be hashed and used in dictionary lookups - lookup_field = tuple(field) - for sub_field in field: - lookup_result = lookup_es_key(document, sub_field) - if not lookup_result: - value = None - break - value += (lookup_result,) - else: - value = lookup_es_key(document, field) - if not value and self.rules.get('alert_on_missing_field'): - document['missing_field'] = lookup_field - self.add_match(copy.deepcopy(document)) - elif value: - if value not in self.seen_values[lookup_field]: - document['new_field'] = lookup_field - self.add_match(copy.deepcopy(document)) - self.seen_values[lookup_field].append(value) - - def add_terms_data(self, terms): - # With terms query, len(self.fields) is always 1 and the 0'th entry is always a string - field = self.fields[0] - for timestamp, buckets in terms.items(): - for bucket in buckets: - if bucket['doc_count']: - if 
bucket['key'] not in self.seen_values[field]: - match = {field: bucket['key'], - self.rules['timestamp_field']: timestamp, - 'new_field': field} - self.add_match(match) - self.seen_values[field].append(bucket['key']) + lookup_key = self.get_lookup_key(field) + keys, counts = data[lookup_key] + + new_terms, new_counts = self.term_windows[lookup_key].get_new_terms(timestamp, keys, counts ) + + # append and get all match keys and counts + for (new_term, new_count) in zip(new_terms, new_counts): + match = { + "field": lookup_key, + self.rules['timestamp_field']: timestamp, + "new_value": tuple(new_term) if type(new_term) == list else new_term, + "hits" : new_count + } + self.add_match(copy.deepcopy(match)) + + ### NOT USED ANYMORE ### + # def add_data(self, data): + # for document in data: + # for field in self.fields: + # value = () + # lookup_field = field + # if type(field) == list: + # # For composite keys, make the lookup based on all fields + # # Make it a tuple since it can be hashed and used in dictionary lookups + # lookup_field = tuple(field) + # for sub_field in field: + # lookup_result = lookup_es_key(document, sub_field) + # if not lookup_result: + # value = None + # break + # value += (lookup_result,) + # else: + # value = lookup_es_key(document, field) + # if not value and self.rules.get('alert_on_missing_field'): + # document['missing_field'] = lookup_field + # self.add_match(copy.deepcopy(document)) + # elif value: + # if value not in self.seen_values[lookup_field]: + # document['new_field'] = lookup_field + # self.add_match(copy.deepcopy(document)) + # self.seen_values[lookup_field].append(value) + + ### NOT USED ANYMORE ### + # def add_terms_data(self, terms): + # # With terms query, len(self.fields) is always 1 and the 0'th entry is always a string + # field = self.fields[0] + # for timestamp, buckets in terms.items(): + # for bucket in buckets: + # if bucket['doc_count']: + # if bucket['key'] not in self.seen_values[field]: + # match = {field: bucket['key'], + # self.rules['timestamp_field']: timestamp, + # 'new_field': field} + # self.add_match(match) + # self.seen_values[field].append(bucket['key']) + + def get_lookup_key(self,field): + return tuple(field) if type(field) == list else field class CardinalityRule(RuleType): diff --git a/examples/ex_flatline.yaml b/examples/ex_flatline.yaml new file mode 100644 index 000000000..70cd7033e --- /dev/null +++ b/examples/ex_flatline.yaml @@ -0,0 +1,20 @@ +name: freshemail debug rule +type: flatline +index: traces* +threshold: 3 +# use_count_query: true +timestamp_field: timestamp +timeframe: + minutes: 1 +filter: +- query: + query_string: + query: "*" +alert: +- "debug" +scan_entire_timeframe: true + +realert: + minutes: 0 +query_delay: + minutes: 3 \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index ff8a2accd..570b9c945 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,7 +9,7 @@ envparse>=0.2.0 exotel==0.1.5 Jinja2==3.1.2 jira>=3.1.1 -jsonschema>=4.4.0 +jsonschema==4.17.3 mock>=2.0.0 prison>=0.2.1 prometheus_client>=0.13.1 diff --git a/tests/rules_test.py b/tests/rules_test.py index ee9299879..05a144500 100644 --- a/tests/rules_test.py +++ b/tests/rules_test.py @@ -620,14 +620,12 @@ def record_args(*args, **kwargs): # Key1 and key2 shouldn't cause a match data = { ts_now() : { - "a": ["key1"], - "b": ["key2"] + "a": (["key1"],[1]), + "b": (["key2"], [1]) } } - rule.add_new_term_data(data) + rule.add_terms_data(data) - # 30 day default range, 1 day default step, times 2 fields - assert 
rule.es.msearch.call_count == 14 # rule.add_data([{'@timestamp': ts_now(), 'a': 'key1', 'b': 'key2'}]) @@ -636,22 +634,22 @@ def record_args(*args, **kwargs): # Neither will missing values data = { ts_now() : { - "a": ["key2"], - "b": [] + "a": (["key2"],[1]), + "b": ([],[]) } } - rule.add_new_term_data(data) + rule.add_terms_data(data) # rule.add_data([{'@timestamp': ts_now(), 'a': 'key2'}]) assert rule.matches == [] # Key3 causes an alert for field b data = { ts_now() : { - "a": ["key2"], - "b": ["key3"] + "a": (["key2"],[1]), + "b": (["key3"],[1]) } } - rule.add_new_term_data(data) + rule.add_terms_data(data) #rule.add_data([{'@timestamp': ts_now(), 'a': 'key2', 'b': 'key3'}]) assert len(rule.matches) == 1 @@ -662,11 +660,11 @@ def record_args(*args, **kwargs): # Key3 doesn't cause another alert for field b data = { ts_now() : { - "a": ["key2"], - "b": ["key3"] + "a": (["key2"],[1]), + "b": (["key3"],[1]) } } - rule.add_new_term_data(data) + rule.add_terms_data(data) # rule.add_data([{'@timestamp': ts_now(), 'a': 'key2', 'b': 'key3'}]) assert rule.matches == [] @@ -683,7 +681,7 @@ def record_args(*args, **kwargs): # "b": [] # } # } - # rule.add_new_term_data(data) + # rule.add_terms_data(data) # #rule.add_data([{'@timestamp': ts_now(), 'a': 'key2'}]) # assert len(rule.matches) == 1 # assert rule.matches[0]['missing_field'] == 'b' @@ -708,13 +706,11 @@ def test_new_term_nested_field(): # Key3 causes an alert for nested field b.c data = { ts_now() : { - "a": [], - "b.c": ["key3"] + "a": ([],[]), + "b.c": (["key3"],[1]) } } - rule.add_new_term_data(data) - - assert rule.es.msearch.call_count == 14 + rule.add_terms_data(data) # rule.add_data([{'@timestamp': ts_now(), 'b': {'c': 'key3'}}]) assert len(rule.matches) == 1 @@ -722,104 +718,65 @@ def test_new_term_nested_field(): assert rule.matches[0]['new_value'] == 'key3' rule.matches = [] -def test_new_term_refresh_interval(): +def test_new_term_window_updates(): rules = {'fields': ['a'], 'timestamp_field': '@timestamp', 'kibana_adapter_host': 'example.com', 'kibana_adapter_port': 10, 'index': 'logstash', - 'ts_to_dt': ts_to_dt, 'dt_to_ts': dt_to_ts, 'terms_window_size': {'days': 1 } } - mock_res ={'responses' : [{'aggregations': {'values': {'buckets': [{'key': 'key1', 'doc_count': 1}, + 'ts_to_dt': ts_to_dt, 'dt_to_ts': dt_to_ts, 'terms_window_size': {'hours': 3 }, 'threshold': 20, 'threshold_window_size': {'hours': 1} } + mock_res ={'responses' : [{'aggregations': {'values': {'buckets': [{'key': 'key1', 'doc_count': 5}, {'key': 'key2', 'doc_count': 5}]}}}] } - #random_test_data - data = { ts_now() : { "a": [] } } + #empty_test_data + time_pointer = ts_now() + with mock.patch('elastalert.ruletypes.kibana_adapter_client') as mock_es: mock_es.return_value = mock.Mock() mock_es.return_value.msearch.return_value = mock_res mock_es.return_value.info.return_value = {'version': {'number': '2.x.x'}} - - - # Rule with refresh_interval not set, defaulting to 6 hours - rule = NewTermsRule(rules) - - - # get_all_terms should not be called as last_updated will be less than now - 6 hours - rule.add_new_term_data(data) - assert rule.es.msearch.assert_called - mock_es.return_value.msearch.reset_mock() - - # get_all_terms should be called when last_updated is none - rule.last_updated_at = None - rule.add_new_term_data(data) - assert rule.es.msearch.assert_called_once - mock_es.return_value.msearch.reset_mock() - - # get_all_terms should not be called as last_updated will not be less than now - 6 hours - rule.last_updated_at = ts_now() - 
datetime.timedelta(**{'hours': 4}) - rule.add_new_term_data(data) - assert not rule.es.msearch.called - - # get_all_terms should be called as last_updated will be less than now - 6 hours - rule.last_updated_at = ts_now() - datetime.timedelta(**{'hours': 7}) - rule.add_new_term_data(data) - assert rule.es.msearch.assert_called_once - mock_es.return_value.msearch.reset_mock() - - # Rule with refresh_interval set to 2 hours - rules["refresh_interval"] = {'hours': 2} rule = NewTermsRule(rules) - mock_es.return_value.msearch.reset_mock() - - # get_all_terms should not be called as last_updated will not be less than now - 2 hours - rule.last_updated_at = ts_now() - datetime.timedelta(**{'hours': 1}) - rule.add_new_term_data(data) - assert not rule.es.msearch.called - - # get_all_terms should be called as last_updated will be less than now - 2 hours - rule.last_updated_at = ts_now() - datetime.timedelta(**{'hours': 3}) - rule.add_new_term_data(data) - assert rule.es.msearch.assert_called_once - - -## New implementation will never use with_terms -# def test_new_term_with_terms(): -# rules = {'fields': ['a'], -# 'timestamp_field': '@timestamp', -# 'kibana_adapter_host': 'example.com', 'kibana_adapter_port': 10, 'index': 'logstash', 'query_key': 'a', -# 'window_step_size': {'days': 2}, -# 'ts_to_dt': ts_to_dt, 'dt_to_ts': dt_to_ts} -# mock_res = {'responses' : [{'aggregations': {'values': {'buckets': [{'key': 'key1', 'doc_count': 1}, -# {'key': 'key2', 'doc_count': 5}]}}}]} + + # key 2 keeps occuring every 1 hour + for i in range(4): + time_pointer += datetime.timedelta(hours=1) + data = { time_pointer : { "a": (['key2'],[5]) } } + rule.add_terms_data(data) + + # 4 hours later, if key1 comes again, match should come + data = { time_pointer : { "a": (['key1'],[20]) } } + rule.add_terms_data(data) + assert len(rule.matches) == 1 -# with mock.patch('elastalert.ruletypes.kibana_adapter_client') as mock_es: -# mock_es.return_value = mock.Mock() -# mock_es.return_value.msearch.return_value = mock_res -# mock_es.return_value.info.return_value = {'version': {'number': '2.x.x'}} -# rule = NewTermsRule(rules) + # if key1 comes again in the next 2 hour 59 minutes, match woundnt come, as it is now in existing terms + time_pointer += datetime.timedelta(hours=2, minutes=59) + data = { time_pointer : { "a": (['key1'],[20]) } } + rule.add_terms_data(data) + assert len(rule.matches) == 1 -# # Only 4 queries because of custom step size -# assert rule.es.msearch.call_count == 4 + # 3 hours later, if same key comes. 
it will be considered new term, but since threshold isnt reached no matches + time_pointer += datetime.timedelta(hours=3, minutes=1) + data = { time_pointer : { "a": (['key1'],[1]) } } + rule.add_terms_data(data) + assert len(rule.matches) == 1 -# # Key1 and key2 shouldn't cause a match -# terms = {ts_now(): [{'key': 'key1', 'doc_count': 1}, -# {'key': 'key2', 'doc_count': 1}]} -# rule.add_terms_data(terms) -# assert rule.matches == [] -# # Key3 causes an alert for field a -# terms = {ts_now(): [{'key': 'key3', 'doc_count': 1}]} -# rule.add_terms_data(terms) -# assert len(rule.matches) == 1 -# assert rule.matches[0]['new_field'] == 'a' -# assert rule.matches[0]['a'] == 'key3' -# rule.matches = [] + #in next 30 mins, threshold is reached and match is found + time_pointer += datetime.timedelta(minutes= 30) + data = { time_pointer : { "a": (['key1'],[19]) } } + rule.add_terms_data(data) + assert len(rule.matches) == 2 -# # Key3 doesn't cause another alert -# terms = {ts_now(): [{'key': 'key3', 'doc_count': 1}]} -# rule.add_terms_data(terms) -# assert rule.matches == [] + #another new term causing match + time_pointer += datetime.timedelta(minutes= 30) + data = { time_pointer : { "a": (['key2'],[21]) } } + rule.add_terms_data(data) + assert len(rule.matches) == 3 + time_pointer += datetime.timedelta(minutes= 40) + data = { time_pointer : { "a": (['key2'],[21]) } } + rule.add_terms_data(data) + assert len(rule.matches) == 3 def test_new_term_with_composite_fields(): rules = {'fields': [['a', 'b', 'c'], ['d', 'e.f']], @@ -864,48 +821,227 @@ def test_new_term_with_composite_fields(): mock_es.return_value.info.return_value = {'version': {'number': '2.x.x'}} rule = NewTermsRule(rules) - # key3 already exists, and thus shouldn't cause a match data = { ts_now() : { - tuple(['a','b','c']): [tuple(["key1","key2","key3"])], - tuple(['d','e.f']): [] + tuple(['a','b','c']): ([tuple(["key1","key2","key3"])],[1]), + tuple(['d','e.f']): ([],[]) } } - rule.add_new_term_data(data) + rule.add_terms_data(data) assert rule.matches == [] - assert rule.es.msearch.call_count == 14 - # key5 causes an alert for composite field [a, b, c] data = { ts_now() : { - ('a', 'b', 'c'): [("key1","key2","key5")], - ('d','e.f'): [] + ('a', 'b', 'c'): ([("key1","key2","key5")],[1]), + ('d','e.f'): ([],[]) + } + } + rule.add_terms_data(data) + assert len(rule.matches) == 1 + assert rule.matches[0]['field'] == ('a', 'b', 'c') + assert rule.matches[0]['new_value'] == ("key1","key2","key5") + rule.matches = [] + + # testing same with Threshold Window and Threshold + + rules['threshold'] = 10 + rules['threshold_window_size'] = {'hours': 6} + + + with mock.patch('elastalert.ruletypes.kibana_adapter_client') as mock_es: + mock_es.return_value = mock.Mock() + mock_es.return_value.msearch.return_value = mock_res + mock_es.return_value.info.return_value = {'version': {'number': '2.x.x'}} + rule = NewTermsRule(rules) + + time_pointer = ts_now() + + # will not cause match + data = { + time_pointer : { + ('a', 'b', 'c'): ([("key1","key2","key4")],[1]), + ('d','e.f'): ([],[]) + } + } + rule.add_terms_data(data) + assert len(rule.matches) == 0 + rule.matches = [] + + # will not cause match, as threshold wont be reached + time_pointer += datetime.timedelta(hours = 1) + data = { + time_pointer : { + ('a', 'b', 'c'): ([("key1","key2","key5")],[9]), + ('d','e.f'): ([],[]) } } - rule.add_new_term_data(data) + rule.add_terms_data(data) + assert len(rule.matches) == 0 + + + # will cause match, as threshold will be reached + data = { + time_pointer : { 
+ ('a', 'b', 'c'): ([("key1","key2","key5")],[1]), + ('d','e.f'): ([],[]) + } + } + rule.add_terms_data(data) assert len(rule.matches) == 1 assert rule.matches[0]['field'] == ('a', 'b', 'c') assert rule.matches[0]['new_value'] == ("key1","key2","key5") rule.matches = [] + #test composite flatten buckets + keys,counts = rule.flatten_aggregation_hierarchy(mock_res['responses'][0]['aggregations']['values']['buckets']) + assert keys == [('key1', 'key2', 'key3'), ('key1', 'key2', 'key4')] + assert counts == [3, 2] + +def test_new_term_threshold(): + rules = {'fields': ['a'], + 'timestamp_field': '@timestamp', + 'kibana_adapter': 'example.com', 'kibana_adapter_port': 10, 'index': 'logstash', + 'ts_to_dt': ts_to_dt, 'dt_to_ts': dt_to_ts, 'terms_window_size': {'days': 10 }, + 'window_step_size' : {'hours': 1 }, 'terms_size': 10000, 'threshold': 0 } + + mock_res ={'responses' : [{'aggregations': {'values': {'buckets': [{'key': 'key1', 'doc_count': 1}]}}}] } + + with mock.patch('elastalert.ruletypes.kibana_adapter_client') as mock_es: + mock_es.return_value = mock.Mock() + mock_es.return_value.msearch.return_value = mock_res + rule = NewTermsRule(rules) + + + # introducting new value for field a, should trigger as threshold is 0 + data = { + ts_now() : { + ('a'): (["key2"],[1]) + } + } + rule.add_terms_data(data) + assert len(rule.matches) == 1 + + # changing threshold to 10 and threhold_duration to 2 hours + rules['threshold'] = 10 + rules['threshold_window_size'] = {"hours" : 2} + + # used for incrementing time + time_pointer = ts_now() + + with mock.patch('elastalert.ruletypes.kibana_adapter_client') as mock_es: + mock_es.return_value = mock.Mock() + mock_es.return_value.msearch.return_value = mock_res + rule = NewTermsRule(rules) + + # new value for field 'a' with count 8, shouldnt create a match + data = { + time_pointer : { + ('a'): (["key2"],[8]) + } + } + rule.add_terms_data(data) + assert len(rule.matches) == 0 + + + # new value for field 'a' with count 8 after 3 hours, shouldnt create a match + + time_pointer += datetime.timedelta(**{"hours":3}) + + data = { + time_pointer : { + ('a'): (["key2"],[8]) + } + } + rule.add_terms_data(data) + assert len(rule.matches) == 0 + + # new value for field a with count 2 after 10 minutes + # should create a match as the total count stored for the last 2 hours would be 10 + time_pointer += datetime.timedelta(**{"minutes":10}) + + data = { + time_pointer : { + ('a'): (["key1","key2"],[1,2]) + } + } + rule.add_terms_data(data) + assert len(rule.matches) == 1 + + # no new matches should be added, when the rule crosses the threshold the second time + + time_pointer += datetime.timedelta(**{"minutes":10}) + + data = { + time_pointer : { + ('a'): (["key2"],[20]) + } + } + rule.add_terms_data(data) + assert len(rule.matches) == 1 + def test_new_term_bounds(): rules = {'fields': ['a'], 'timestamp_field': '@timestamp', 'kibana_adapter': 'example.com', 'kibana_adapter_port': 10, 'index': 'logstash', 'ts_to_dt': ts_to_dt, 'dt_to_ts': dt_to_ts, 'terms_window_size': {'days': 10 }, - 'window_step_size' : {'hours': 1 },'refresh_interval' : {'hours': 2 }, 'terms_size': 10000 } + 'window_step_size' : {'hours': 1 }, 'terms_size': 10000, 'threshold_window_size': {"days": 3} } - rule = NewTermsRule(rules) + mock_res ={'responses' : [{'aggregations': {'values': {'buckets': [{'key': 'key1', 'doc_count': 1}, + {'key': 'key2', 'doc_count': 5}]}}}] } + + with mock.patch('elastalert.ruletypes.kibana_adapter_client') as mock_es: + mock_es.return_value = mock.Mock() + 
mock_es.return_value.msearch.return_value = mock_res + rule = NewTermsRule(rules) assert rule.window_size == datetime.timedelta(**{'days': 7}) - assert rule.step == datetime.timedelta(**{'hours': 6}) - assert rule.refresh_interval == datetime.timedelta(**{'hours': 6}) + assert rule.threshold_window_size == datetime.timedelta(**{'days': 2}) assert rule.terms_size == 1000 +## New implementation will never use with_terms +# def test_new_term_with_terms(): +# rules = {'fields': ['a'], +# 'timestamp_field': '@timestamp', +# 'kibana_adapter_host': 'example.com', 'kibana_adapter_port': 10, 'index': 'logstash', 'query_key': 'a', +# 'window_step_size': {'days': 2}, +# 'ts_to_dt': ts_to_dt, 'dt_to_ts': dt_to_ts} +# mock_res = {'responses' : [{'aggregations': {'values': {'buckets': [{'key': 'key1', 'doc_count': 1}, +# {'key': 'key2', 'doc_count': 5}]}}}]} + +# with mock.patch('elastalert.ruletypes.kibana_adapter_client') as mock_es: +# mock_es.return_value = mock.Mock() +# mock_es.return_value.msearch.return_value = mock_res +# mock_es.return_value.info.return_value = {'version': {'number': '2.x.x'}} +# rule = NewTermsRule(rules) + +# # Only 4 queries because of custom step size +# assert rule.es.msearch.call_count == 4 + +# # Key1 and key2 shouldn't cause a match +# terms = {ts_now(): [{'key': 'key1', 'doc_count': 1}, +# {'key': 'key2', 'doc_count': 1}]} +# rule.add_terms_data(terms) +# assert rule.matches == [] + +# # Key3 causes an alert for field a +# terms = {ts_now(): [{'key': 'key3', 'doc_count': 1}]} +# rule.add_terms_data(terms) +# assert len(rule.matches) == 1 +# assert rule.matches[0]['new_field'] == 'a' +# assert rule.matches[0]['a'] == 'key3' +# rule.matches = [] + +# # Key3 doesn't cause another alert +# terms = {ts_now(): [{'key': 'key3', 'doc_count': 1}]} +# rule.add_terms_data(terms) +# assert rule.matches == [] + + + def test_flatline(): events = hits(40) rules = {