diff --git a/elastalert/elastalert.py b/elastalert/elastalert.py
index 75b8d1272..64026cb60 100755
--- a/elastalert/elastalert.py
+++ b/elastalert/elastalert.py
@@ -147,6 +147,7 @@ def __init__(self, args):
         self.rule_hashes = self.rules_loader.get_hashes(self.conf, self.args.rule)
         self.starttime = self.args.start
         self.disabled_rules = []
+        self.rules_not_running = []
         self.replace_dots_in_field_names = self.conf.get('replace_dots_in_field_names', False)
         self.thread_data.alerts_sent = 0
         self.thread_data.num_hits = 0
@@ -394,6 +395,8 @@ def get_hits(self, rule, starttime, endtime, index, scroll=False):
 
         :return: A list of hits, bounded by rule['max_query_size'] (or self.max_query_size).
         """
+
+
         query = self.get_query(
             rule['filter'],
             starttime,
@@ -449,7 +452,7 @@ def get_hits(self, rule, starttime, endtime, index, scroll=False):
             # (so big that they will fill the entire terminal buffer)
             if len(str(e)) > 1024:
                 e = str(e)[:1024] + '... (%d characters removed)' % (len(str(e)) - 1024)
-            self.handle_error('Error running query: %s' % (e), {'rule': rule['name'], 'query': query})
+            self.handle_error('Error running query: %s' % (e), {'rule': rule['name'], 'query': query},rule=rule)
             return None
         hits = res['hits']['hits']
         self.thread_data.num_hits += len(hits)
@@ -500,7 +503,7 @@ def get_hits_count(self, rule, starttime, endtime, index):
             # (so big that they will fill the entire terminal buffer)
             if len(str(e)) > 1024:
                 e = str(e)[:1024] + '... (%d characters removed)' % (len(str(e)) - 1024)
-            self.handle_error('Error running count query: %s' % (e), {'rule': rule['name'], 'query': query})
+            self.handle_error('Error running count query: %s' % (e), {'rule': rule['name'], 'query': query}, rule=rule)
             return None
 
         self.thread_data.num_hits += res['hits']['total']
@@ -554,7 +557,7 @@ def get_hits_terms(self, rule, starttime, endtime, index, key, qk=None, size=Non
             # (so big that they will fill the entire terminal buffer)
             if len(str(e)) > 1024:
                 e = str(e)[:1024] + '... (%d characters removed)' % (len(str(e)) - 1024)
-            self.handle_error('Error running terms query: %s' % (e), {'rule': rule['name'], 'query': query})
+            self.handle_error('Error running terms query: %s' % (e), {'rule': rule['name'], 'query': query}, rule=rule)
             return None
 
         if 'aggregations' not in res:
@@ -590,7 +593,7 @@ def get_hits_aggregation(self, rule, starttime, endtime, index, query_key, term_
         except ElasticsearchException as e:
             if len(str(e)) > 1024:
                 e = str(e)[:1024] + '... (%d characters removed)' % (len(str(e)) - 1024)
-            self.handle_error('Error running query: %s' % (e), {'rule': rule['name']})
+            self.handle_error('Error running query: %s' % (e), {'rule': rule['name']}, rule=rule)
             return None
         if 'aggregations' not in res:
             return {}
@@ -656,7 +659,7 @@ def get_ch_data(self, rule, starttime, endtime, agg_key, freshquery,aggregation)
         except requests.exceptions.RequestException as e:
             if len(str(e)) > 1024:
                 e = str(e)[:1024] + '... (%d characters removed)' % (len(str(e)) - 1024)
-            self.handle_error('Error running query: %s' % (e), {'rule': rule['name']})
+            self.handle_error('Error running query: %s' % (e), {'rule': rule['name']}, rule=rule)
             return None,0
         res = json.loads(res.content)
         return int(res['data'][0][agg_key]), res['rows']
@@ -788,7 +791,7 @@ def get_starttime(self, rule):
                     elastalert_logger.info("Found expired previous run for %s at %s" % (rule['name'], endtime))
                     return None
         except (ElasticsearchException, KeyError) as e:
-            self.handle_error('Error querying for last run: %s' % (e), {'rule': rule['name']})
+            self.handle_error('Error querying for last run: %s' % (e), {'rule': rule['name']}, rule=rule)
 
     def set_starttime(self, rule, endtime):
         """ Given a rule and an endtime, sets the appropriate starttime for it. """
@@ -1034,7 +1037,7 @@ def run_rule(self, rule, endtime, starttime=None):
                 try:
                     enhancement.process(match)
                 except EAException as e:
-                    self.handle_error("Error running match enhancement: %s" % (e), {'rule': rule['name']})
+                    self.handle_error("Error running match enhancement: %s" % (e), {'rule': rule['name']}, rule=rule)
                 except DropMatchException:
                     continue
 
@@ -1059,7 +1062,7 @@ def run_rule(self, rule, endtime, starttime=None):
                 'hits': max(self.thread_data.num_hits, self.thread_data.cumulative_hits),
                 '@timestamp': ts_now(),
                 'time_taken': time_taken}
-        self.writeback('elastalert_status', body)
+        self.writeback('elastalert_status', body, rule)
 
         # Write metrics about the run to statsd
         if self.statsd:
@@ -1269,6 +1272,10 @@ def start(self):
             if next_run < datetime.datetime.utcnow():
                 continue
 
+            for rule in self.rules_not_running:
+                print('coming in for')
+                self.handle_error("[rule-not-running] %s is disabled and not running" % rule['name'],rule=rule)
+
             # Show disabled rules
             if self.show_disabled_rules:
                 elastalert_logger.info("Disabled rules are: %s" % (str(self.get_disabled_rules())))
@@ -1368,7 +1375,7 @@ def handle_rule_execution(self, rule):
         try:
             num_matches = self.run_rule(rule, endtime, rule.get('initial_starttime'))
         except EAException as e:
-            self.handle_error("Error running rule %s: %s" % (rule['name'], e), {'rule': rule['name']})
+            self.handle_error("Error running rule %s: %s" % (rule['name'], e), {'rule': rule['name']}, rule=rule)
         except Exception as e:
             self.handle_uncaught_exception(e, rule)
         else:
@@ -1487,7 +1494,7 @@ def send_alert(self, matches, rule, alert_time=None, retried=False):
                     except DropMatchException:
                         pass
                     except EAException as e:
-                        self.handle_error("Error running match enhancement: %s" % (e), {'rule': rule['name']})
+                        self.handle_error("Error running match enhancement: %s" % (e), {'rule': rule['name']}, rule=rule)
                 matches = valid_matches
                 if not matches:
                     return None
@@ -1509,7 +1516,7 @@ def send_alert(self, matches, rule, alert_time=None, retried=False):
             try:
                 alert.alert(matches)
             except EAException as e:
-                self.handle_error('Error while running alert %s: %s' % (alert.get_info()['type'], e), {'rule': rule['name']})
+                self.handle_error('Error while running alert %s: %s' % (alert.get_info()['type'], e), {'rule': rule['name']}, rule=rule)
                 alert_exception = str(e)
             else:
                 self.thread_data.alerts_sent += 1
@@ -1667,7 +1674,7 @@ def send_pending_alerts(self):
                                              doc_type='elastalert',
                                              id=_id)
                 except ElasticsearchException:  # TODO: Give this a more relevant exception, try:except: is evil.
-                    self.handle_error("Failed to delete alert %s at %s" % (_id, alert_time))
+                    self.handle_error("Failed to delete alert %s at %s" % (_id, alert_time), rule=rule)
 
         # Send in memory aggregated alerts
         for rule in self.rules:
@@ -1727,7 +1734,7 @@ def find_pending_aggregate_alert(self, rule, aggregation_key_value=None):
             if len(res['hits']['hits']) == 0:
                 return None
         except (KeyError, ElasticsearchException) as e:
-            self.handle_error("Error searching for pending aggregated matches: %s" % (e), {'rule_name': rule['name']})
+            self.handle_error("Error searching for pending aggregated matches: %s" % (e), {'rule_name': rule['name']}, rule=rule)
             return None
 
         return res['hits']['hits'][0]
@@ -1766,7 +1773,7 @@ def add_aggregated_alert(self, match, rule):
                 iter = croniter(rule['aggregation']['schedule'], ts_now())
                 alert_time = unix_to_dt(iter.get_next())
             except Exception as e:
-                self.handle_error("Error parsing aggregate send time Cron format %s" % (e), rule['aggregation']['schedule'])
+                self.handle_error("Error parsing aggregate send time Cron format %s" % (e), rule['aggregation']['schedule'], rule=rule)
         else:
             try:
                 if rule.get('aggregate_by_match_time', False):
@@ -1775,7 +1782,7 @@ def add_aggregated_alert(self, match, rule):
                 else:
                     alert_time = ts_now() + rule['aggregation']
             except Exception as e:
-                self.handle_error("[add_aggregated_alert]Error parsing aggregate send time format %s" % (e), rule['aggregation'])
+                self.handle_error("[add_aggregated_alert]Error parsing aggregate send time format %s" % (e), rule['aggregation'], rule=rule)
 
         rule['aggregate_alert_time'][aggregation_key_value] = alert_time
         agg_id = None
@@ -1885,7 +1892,7 @@ def is_silenced(self, rule_name):
                 return True
         return False
 
-    def handle_error(self, message, data=None):
+    def handle_error(self, message, data=None, rule=None):
         ''' Logs message at error level and writes message, data and traceback to Elasticsearch. '''
         elastalert_logger.error(message)
         body = {'message': message}
@@ -1893,15 +1900,16 @@ def handle_error(self, message, data=None, rule=None):
             body['traceback'] = tb.strip().split('\n')
         if data:
             body['data'] = data
-        self.writeback('elastalert_error', body)
+        self.writeback('elastalert_error', body, rule)
 
     def handle_uncaught_exception(self, exception, rule):
         """ Disables a rule and sends a notification. """
         elastalert_logger.error(traceback.format_exc())
-        self.handle_error('Uncaught exception running rule %s: %s' % (rule['name'], exception), {'rule': rule['name']})
+        self.handle_error('Uncaught exception running rule %s: %s' % (rule['name'], exception), {'rule': rule['name']}, rule=rule)
         if self.disable_rules_on_error:
             self.rules = [running_rule for running_rule in self.rules if running_rule['name'] != rule['name']]
             self.disabled_rules.append(rule)
+            self.rules_not_running.append(rule)
             self.scheduler.pause_job(job_id=rule['name'])
             elastalert_logger.info('Rule %s disabled', rule['name'])
         if self.notify_email:
@@ -1943,7 +1951,7 @@ def send_notification_email(self, text='', exception=None, rule=None, subject=No
             smtp = SMTP(self.smtp_host)
             smtp.sendmail(self.from_addr, recipients, email.as_string())
         except (SMTPException, error) as e:
-            self.handle_error('Error connecting to SMTP host: %s' % (e), {'email_body': email_body})
+            self.handle_error('Error connecting to SMTP host: %s' % (e), {'email_body': email_body}, rule=rule)
 
     def get_top_counts(self, rule, starttime, endtime, keys, number=None, qk=None):
         """ Counts the number of events for each unique value for each key field.
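Note on the elastalert.py changes above: handle_error() now accepts an optional rule argument and forwards it to writeback(), and every call site passes rule=rule so the Prometheus wrapper (which monkey-patches client.writeback, see the prometheus_wrapper.py diff below) can label error metrics per rule and per tenant. The following is a minimal, self-contained sketch of that labelling behaviour; the rule dict, metric names and values are illustrative placeholders, not code from this patch:

    import prometheus_client

    # Illustrative rule config; the patch reads the optional "tenant" field the
    # same way, falling back to "all" when a rule does not define one.
    rule = {"name": "example_rule", "tenant": "team-a"}
    tenant = rule["tenant"] if rule is not None and "tenant" in rule else "all"

    # Stand-in for the per-rule error counter registered in PrometheusWrapper.__init__.
    errors_per_rule = prometheus_client.Counter(
        "example_errors_per_rule", "Number of errors per rule", ["rule_name", "tenant"])
    errors_per_rule.labels(rule["name"], tenant).inc()

    # The exporter then renders a sample such as:
    # example_errors_per_rule_total{rule_name="example_rule",tenant="team-a"} 1.0
    print(prometheus_client.generate_latest().decode())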
diff --git a/elastalert/prometheus_wrapper.py b/elastalert/prometheus_wrapper.py
index e20ac0b88..50501772b 100644
--- a/elastalert/prometheus_wrapper.py
+++ b/elastalert/prometheus_wrapper.py
@@ -1,4 +1,5 @@
 import prometheus_client
+from elastalert.util import (elastalert_logger)
 
 
 class PrometheusWrapper:
@@ -14,13 +15,15 @@ def __init__(self, client):
         client.writeback = self.metrics_writeback
 
         # initialize prometheus metrics to be exposed
-        self.prom_scrapes = prometheus_client.Counter('elastalert_scrapes', 'Number of scrapes for rule', ['rule_name'])
-        self.prom_hits = prometheus_client.Counter('elastalert_hits', 'Number of hits for rule', ['rule_name'])
-        self.prom_matches = prometheus_client.Counter('elastalert_matches', 'Number of matches for rule', ['rule_name'])
-        self.prom_time_taken = prometheus_client.Counter('elastalert_time_taken', 'Time taken to evaluate rule', ['rule_name'])
-        self.prom_alerts_sent = prometheus_client.Counter('elastalert_alerts_sent', 'Number of alerts sent for rule', ['rule_name'])
-        self.prom_alerts_not_sent = prometheus_client.Counter('elastalert_alerts_not_sent', 'Number of alerts not sent', ['rule_name'])
-        self.prom_errors = prometheus_client.Counter('elastalert_errors', 'Number of errors for rule')
+        self.prom_scrapes = prometheus_client.Counter('elastalert_scrapes', 'Number of scrapes for rule', ['rule_name', 'tenant'])
+        self.prom_hits = prometheus_client.Counter('elastalert_hits', 'Number of hits for rule', ['rule_name', 'tenant'])
+        self.prom_matches = prometheus_client.Counter('elastalert_matches', 'Number of matches for rule', ['rule_name', 'tenant'])
+        self.prom_time_taken = prometheus_client.Counter('elastalert_time_taken', 'Time taken to evaluate rule', ['rule_name', 'tenant'])
+        self.prom_alerts_sent = prometheus_client.Counter('elastalert_alerts_sent', 'Number of alerts sent for rule', ['rule_name', 'tenant'])
+        self.prom_alerts_not_sent = prometheus_client.Counter('elastalert_alerts_not_sent', 'Number of alerts not sent', ['rule_name', 'tenant'])
+        self.prom_errors = prometheus_client.Counter('elastalert_errors', 'Number of errors ', ['tenant'])
+        self.prom_errors_per_rule = prometheus_client.Counter('elastalert_errors_per_rule', 'Number of errors per rule ', ['rule_name', 'tenant' ])
+        self.disabled_rules = prometheus_client.Counter('elastalert_disabled_rules', 'Metric used for presence of disabled rules', ['rule_name', 'tenant' ])
         self.prom_alerts_silenced = prometheus_client.Counter('elastalert_alerts_silenced', 'Number of silenced alerts', ['rule_name'])
 
     def start(self):
@@ -38,18 +41,31 @@ def metrics_writeback(self, doc_type, body, rule=None, match_body=None):
         res = self.writeback(doc_type, body)
 
         try:
+            tenant = "all"
+
+            if rule is not None and "tenant" in rule:
+                tenant = rule["tenant"]
             if doc_type == 'elastalert_status':
-                self.prom_hits.labels(body['rule_name']).inc(int(body['hits']))
-                self.prom_matches.labels(body['rule_name']).inc(int(body['matches']))
-                self.prom_time_taken.labels(body['rule_name']).inc(float(body['time_taken']))
+                self.prom_hits.labels(body['rule_name'], tenant).inc(int(body['hits']))
+                self.prom_matches.labels(body['rule_name'], tenant).inc(int(body['matches']))
+                self.prom_time_taken.labels(body['rule_name'], tenant).inc(float(body['time_taken']))
             elif doc_type == 'elastalert':
                 if body['alert_sent']:
-                    self.prom_alerts_sent.labels(body['rule_name']).inc()
+                    self.prom_alerts_sent.labels(body['rule_name'], tenant).inc()
                 else:
-                    self.prom_alerts_not_sent.labels(body['rule_name']).inc()
+                    self.prom_alerts_not_sent.labels(body['rule_name'], tenant).inc()
             elif doc_type == 'elastalert_error':
-                self.prom_errors.inc()
+                print('coming here to elastalert_error')
+                if rule is not None:
+                    print('coming here to rule_not_none %s' % body['message'])
+                    if '[rule-not-running]' in body['message']:
+                        print('coming here')
+                        self.disabled_rules.labels(rule['name'],tenant).inc()
+                    self.prom_errors_per_rule.labels(rule['name'],tenant).inc()
+                self.prom_errors.labels(tenant).inc()
             elif doc_type == 'silence':
                 self.prom_alerts_silenced.labels(body['rule_name']).inc()
+        except Exception as e:
+            elastalert_logger.error("Error in prometheus wrapper: %s" % e)
         finally:
             return res
\ No newline at end of file
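For orientation, a minimal sketch of the disabled-rule signal added by this patch: handle_uncaught_exception() appends the rule to rules_not_running, the start() loop then re-emits a "[rule-not-running]" error for every such rule on each pass, and metrics_writeback() translates that marker into the elastalert_disabled_rules counter. The stand-in names and values below are illustrative, not the patched ElastAlert code:

    import prometheus_client

    # Stand-ins for the counters registered in PrometheusWrapper.__init__.
    disabled_rules = prometheus_client.Counter(
        "example_disabled_rules", "Presence of disabled rules", ["rule_name", "tenant"])
    errors = prometheus_client.Counter("example_errors", "Number of errors", ["tenant"])

    def metrics_for_error(message, rule=None):
        # Mirrors the marker check done for doc_type 'elastalert_error'.
        tenant = rule.get("tenant", "all") if rule else "all"
        if rule is not None and "[rule-not-running]" in message:
            disabled_rules.labels(rule["name"], tenant).inc()
        errors.labels(tenant).inc()

    # Each pass of the start() loop re-reports every rule held in rules_not_running,
    # so the counter keeps increasing for as long as the rule stays disabled.
    rules_not_running = [{"name": "broken_rule", "tenant": "team-a"}]
    for rule in rules_not_running:
        metrics_for_error("[rule-not-running] %s is disabled and not running" % rule["name"], rule=rule)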