Elastalert instrumentation upgrade #19

Open · wants to merge 5 commits into base: fw_2.9.0_migration
46 changes: 27 additions & 19 deletions elastalert/elastalert.py
@@ -147,6 +147,7 @@ def __init__(self, args):
self.rule_hashes = self.rules_loader.get_hashes(self.conf, self.args.rule)
self.starttime = self.args.start
self.disabled_rules = []
self.rules_not_running = []
self.replace_dots_in_field_names = self.conf.get('replace_dots_in_field_names', False)
self.thread_data.alerts_sent = 0
self.thread_data.num_hits = 0
@@ -394,6 +395,8 @@ def get_hits(self, rule, starttime, endtime, index, scroll=False):
:return: A list of hits, bounded by rule['max_query_size'] (or self.max_query_size).
"""



query = self.get_query(
rule['filter'],
starttime,
@@ -449,7 +452,7 @@ def get_hits(self, rule, starttime, endtime, index, scroll=False):
# (so big that they will fill the entire terminal buffer)
if len(str(e)) > 1024:
e = str(e)[:1024] + '... (%d characters removed)' % (len(str(e)) - 1024)
self.handle_error('Error running query: %s' % (e), {'rule': rule['name'], 'query': query})
self.handle_error('Error running query: %s' % (e), {'rule': rule['name'], 'query': query}, rule=rule)
return None
hits = res['hits']['hits']
self.thread_data.num_hits += len(hits)
@@ -500,7 +503,7 @@ def get_hits_count(self, rule, starttime, endtime, index):
# (so big that they will fill the entire terminal buffer)
if len(str(e)) > 1024:
e = str(e)[:1024] + '... (%d characters removed)' % (len(str(e)) - 1024)
self.handle_error('Error running count query: %s' % (e), {'rule': rule['name'], 'query': query})
self.handle_error('Error running count query: %s' % (e), {'rule': rule['name'], 'query': query}, rule=rule)
return None

self.thread_data.num_hits += res['hits']['total']
@@ -554,7 +557,7 @@ def get_hits_terms(self, rule, starttime, endtime, index, key, qk=None, size=Non
# (so big that they will fill the entire terminal buffer)
if len(str(e)) > 1024:
e = str(e)[:1024] + '... (%d characters removed)' % (len(str(e)) - 1024)
self.handle_error('Error running terms query: %s' % (e), {'rule': rule['name'], 'query': query})
self.handle_error('Error running terms query: %s' % (e), {'rule': rule['name'], 'query': query}, rule=rule)
return None

if 'aggregations' not in res:
@@ -590,7 +593,7 @@ def get_hits_aggregation(self, rule, starttime, endtime, index, query_key, term_
except ElasticsearchException as e:
if len(str(e)) > 1024:
e = str(e)[:1024] + '... (%d characters removed)' % (len(str(e)) - 1024)
self.handle_error('Error running query: %s' % (e), {'rule': rule['name']})
self.handle_error('Error running query: %s' % (e), {'rule': rule['name']}, rule=rule)
return None
if 'aggregations' not in res:
return {}
@@ -656,7 +659,7 @@ def get_ch_data(self, rule, starttime, endtime, agg_key, freshquery,aggregation)
except requests.exceptions.RequestException as e:
if len(str(e)) > 1024:
e = str(e)[:1024] + '... (%d characters removed)' % (len(str(e)) - 1024)
self.handle_error('Error running query: %s' % (e), {'rule': rule['name']})
self.handle_error('Error running query: %s' % (e), {'rule': rule['name']}, rule=rule)
return None,0
res = json.loads(res.content)
return int(res['data'][0][agg_key]), res['rows']
@@ -788,7 +791,7 @@ def get_starttime(self, rule):
elastalert_logger.info("Found expired previous run for %s at %s" % (rule['name'], endtime))
return None
except (ElasticsearchException, KeyError) as e:
self.handle_error('Error querying for last run: %s' % (e), {'rule': rule['name']})
self.handle_error('Error querying for last run: %s' % (e), {'rule': rule['name']}, rule=rule)

def set_starttime(self, rule, endtime):
""" Given a rule and an endtime, sets the appropriate starttime for it. """
@@ -1034,7 +1037,7 @@ def run_rule(self, rule, endtime, starttime=None):
try:
enhancement.process(match)
except EAException as e:
self.handle_error("Error running match enhancement: %s" % (e), {'rule': rule['name']})
self.handle_error("Error running match enhancement: %s" % (e), {'rule': rule['name']}, rule=rule)
except DropMatchException:
continue

'hits': max(self.thread_data.num_hits, self.thread_data.cumulative_hits),
'@timestamp': ts_now(),
'time_taken': time_taken}
self.writeback('elastalert_status', body)
self.writeback('elastalert_status', body, rule)

# Write metrics about the run to statsd
if self.statsd:
@@ -1269,6 +1272,10 @@ def start(self):
if next_run < datetime.datetime.utcnow():
continue

for rule in self.rules_not_running:
self.handle_error("[rule-not-running] %s is disabled and not running" % rule['name'], rule=rule)

# Show disabled rules
if self.show_disabled_rules:
elastalert_logger.info("Disabled rules are: %s" % (str(self.get_disabled_rules())))
@@ -1368,7 +1375,7 @@ def handle_rule_execution(self, rule):
try:
num_matches = self.run_rule(rule, endtime, rule.get('initial_starttime'))
except EAException as e:
self.handle_error("Error running rule %s: %s" % (rule['name'], e), {'rule': rule['name']})
self.handle_error("Error running rule %s: %s" % (rule['name'], e), {'rule': rule['name']}, rule=rule)
except Exception as e:
self.handle_uncaught_exception(e, rule)
else:
@@ -1487,7 +1494,7 @@ def send_alert(self, matches, rule, alert_time=None, retried=False):
except DropMatchException:
pass
except EAException as e:
self.handle_error("Error running match enhancement: %s" % (e), {'rule': rule['name']})
self.handle_error("Error running match enhancement: %s" % (e), {'rule': rule['name']}, rule=rule)
matches = valid_matches
if not matches:
return None
try:
alert.alert(matches)
except EAException as e:
self.handle_error('Error while running alert %s: %s' % (alert.get_info()['type'], e), {'rule': rule['name']})
self.handle_error('Error while running alert %s: %s' % (alert.get_info()['type'], e), {'rule': rule['name']}, rule=rule)
alert_exception = str(e)
else:
self.thread_data.alerts_sent += 1
@@ -1667,7 +1674,7 @@ def send_pending_alerts(self):
doc_type='elastalert',
id=_id)
except ElasticsearchException: # TODO: Give this a more relevant exception, try:except: is evil.
self.handle_error("Failed to delete alert %s at %s" % (_id, alert_time))
self.handle_error("Failed to delete alert %s at %s" % (_id, alert_time), rule=rule)

# Send in memory aggregated alerts
for rule in self.rules:
@@ -1727,7 +1734,7 @@ def find_pending_aggregate_alert(self, rule, aggregation_key_value=None):
if len(res['hits']['hits']) == 0:
return None
except (KeyError, ElasticsearchException) as e:
self.handle_error("Error searching for pending aggregated matches: %s" % (e), {'rule_name': rule['name']})
self.handle_error("Error searching for pending aggregated matches: %s" % (e), {'rule_name': rule['name']}, rule=rule)
return None

return res['hits']['hits'][0]
@@ -1766,7 +1773,7 @@ def add_aggregated_alert(self, match, rule):
iter = croniter(rule['aggregation']['schedule'], ts_now())
alert_time = unix_to_dt(iter.get_next())
except Exception as e:
self.handle_error("Error parsing aggregate send time Cron format %s" % (e), rule['aggregation']['schedule'])
self.handle_error("Error parsing aggregate send time Cron format %s" % (e), rule['aggregation']['schedule'], rule=rule)
else:
try:
if rule.get('aggregate_by_match_time', False):
else:
alert_time = ts_now() + rule['aggregation']
except Exception as e:
self.handle_error("[add_aggregated_alert]Error parsing aggregate send time format %s" % (e), rule['aggregation'])
self.handle_error("[add_aggregated_alert]Error parsing aggregate send time format %s" % (e), rule['aggregation'], rule=rule)

rule['aggregate_alert_time'][aggregation_key_value] = alert_time
agg_id = None
@@ -1885,23 +1892,24 @@ def is_silenced(self, rule_name):
return True
return False

def handle_error(self, message, data=None):
def handle_error(self, message, data=None, rule=None):
''' Logs message at error level and writes message, data and traceback to Elasticsearch. '''
elastalert_logger.error(message)
body = {'message': message}
tb = traceback.format_exc()
body['traceback'] = tb.strip().split('\n')
if data:
body['data'] = data
self.writeback('elastalert_error', body)
self.writeback('elastalert_error', body, rule)

def handle_uncaught_exception(self, exception, rule):
""" Disables a rule and sends a notification. """
elastalert_logger.error(traceback.format_exc())
self.handle_error('Uncaught exception running rule %s: %s' % (rule['name'], exception), {'rule': rule['name']})
self.handle_error('Uncaught exception running rule %s: %s' % (rule['name'], exception), {'rule': rule['name']}, rule=rule)
if self.disable_rules_on_error:
self.rules = [running_rule for running_rule in self.rules if running_rule['name'] != rule['name']]
self.disabled_rules.append(rule)
self.rules_not_running.append(rule)
self.scheduler.pause_job(job_id=rule['name'])
elastalert_logger.info('Rule %s disabled', rule['name'])
if self.notify_email:
@@ -1943,7 +1951,7 @@ def send_notification_email(self, text='', exception=None, rule=None, subject=No
smtp = SMTP(self.smtp_host)
smtp.sendmail(self.from_addr, recipients, email.as_string())
except (SMTPException, error) as e:
self.handle_error('Error connecting to SMTP host: %s' % (e), {'email_body': email_body})
self.handle_error('Error connecting to SMTP host: %s' % (e), {'email_body': email_body}, rule=rule)

def get_top_counts(self, rule, starttime, endtime, keys, number=None, qk=None):
""" Counts the number of events for each unique value for each key field.
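A note on the elastalert.py changes as a whole: every handle_error call now forwards the rule that triggered it, and writeback receives that rule as a third argument, so the Prometheus wrapper (which monkey-patches writeback) can attribute errors to a specific rule and tenant. The sketch below illustrates that flow with a simplified stand-in class and an invented rule dict, not the real ElastAlert implementation:

# Illustrative sketch only: FakeClient is a stand-in, not the actual
# ElastAlert client; it shows the rule dict travelling from handle_error()
# into writeback(), where a metrics wrapper can read rule['tenant'].

class FakeClient:
    def writeback(self, doc_type, body, rule=None, match_body=None):
        # Stand-in for the Elasticsearch writeback; a wrapper replacing this
        # method can inspect `rule` to label its metrics.
        tenant = rule.get('tenant', 'all') if rule else 'all'
        print('writeback:', doc_type, 'tenant=%s' % tenant, body['message'])

    def handle_error(self, message, data=None, rule=None):
        body = {'message': message}
        if data:
            body['data'] = data
        # The new `rule` argument is passed straight through to writeback.
        self.writeback('elastalert_error', body, rule)


client = FakeClient()
client.handle_error('Error running query: timeout',
                    data={'rule': 'cpu_spike'},
                    rule={'name': 'cpu_spike', 'tenant': 'team-a'})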
42 changes: 29 additions & 13 deletions elastalert/prometheus_wrapper.py
@@ -1,4 +1,5 @@
import prometheus_client
from elastalert.util import elastalert_logger


class PrometheusWrapper:
@@ -14,13 +15,15 @@ def __init__(self, client):
client.writeback = self.metrics_writeback

# initialize prometheus metrics to be exposed
self.prom_scrapes = prometheus_client.Counter('elastalert_scrapes', 'Number of scrapes for rule', ['rule_name'])
self.prom_hits = prometheus_client.Counter('elastalert_hits', 'Number of hits for rule', ['rule_name'])
self.prom_matches = prometheus_client.Counter('elastalert_matches', 'Number of matches for rule', ['rule_name'])
self.prom_time_taken = prometheus_client.Counter('elastalert_time_taken', 'Time taken to evaluate rule', ['rule_name'])
self.prom_alerts_sent = prometheus_client.Counter('elastalert_alerts_sent', 'Number of alerts sent for rule', ['rule_name'])
self.prom_alerts_not_sent = prometheus_client.Counter('elastalert_alerts_not_sent', 'Number of alerts not sent', ['rule_name'])
self.prom_errors = prometheus_client.Counter('elastalert_errors', 'Number of errors for rule')
self.prom_scrapes = prometheus_client.Counter('elastalert_scrapes', 'Number of scrapes for rule', ['rule_name', 'tenant'])
self.prom_hits = prometheus_client.Counter('elastalert_hits', 'Number of hits for rule', ['rule_name', 'tenant'])
self.prom_matches = prometheus_client.Counter('elastalert_matches', 'Number of matches for rule', ['rule_name', 'tenant'])
self.prom_time_taken = prometheus_client.Counter('elastalert_time_taken', 'Time taken to evaluate rule', ['rule_name', 'tenant'])
self.prom_alerts_sent = prometheus_client.Counter('elastalert_alerts_sent', 'Number of alerts sent for rule', ['rule_name', 'tenant'])
self.prom_alerts_not_sent = prometheus_client.Counter('elastalert_alerts_not_sent', 'Number of alerts not sent', ['rule_name', 'tenant'])
self.prom_errors = prometheus_client.Counter('elastalert_errors', 'Number of errors', ['tenant'])
self.prom_errors_per_rule = prometheus_client.Counter('elastalert_errors_per_rule', 'Number of errors per rule', ['rule_name', 'tenant'])
self.disabled_rules = prometheus_client.Counter('elastalert_disabled_rules', 'Metric used for presence of disabled rules', ['rule_name', 'tenant'])
self.prom_alerts_silenced = prometheus_client.Counter('elastalert_alerts_silenced', 'Number of silenced alerts', ['rule_name'])

def start(self):
@@ -38,18 +41,31 @@ def metrics_writeback(self, doc_type, body, rule=None, match_body=None):

res = self.writeback(doc_type, body)
try:
tenant = "all"

if rule is not None and "tenant" in rule:
tenant = rule["tenant"]
if doc_type == 'elastalert_status':
self.prom_hits.labels(body['rule_name']).inc(int(body['hits']))
self.prom_matches.labels(body['rule_name']).inc(int(body['matches']))
self.prom_time_taken.labels(body['rule_name']).inc(float(body['time_taken']))
self.prom_hits.labels(body['rule_name'], tenant).inc(int(body['hits']))
self.prom_matches.labels(body['rule_name'], tenant).inc(int(body['matches']))
self.prom_time_taken.labels(body['rule_name'], tenant).inc(float(body['time_taken']))
elif doc_type == 'elastalert':
if body['alert_sent']:
self.prom_alerts_sent.labels(body['rule_name']).inc()
self.prom_alerts_sent.labels(body['rule_name'], tenant).inc()
else:
self.prom_alerts_not_sent.labels(body['rule_name']).inc()
self.prom_alerts_not_sent.labels(body['rule_name'], tenant).inc()
elif doc_type == 'elastalert_error':
self.prom_errors.inc()
if rule is not None:
if '[rule-not-running]' in body['message']:
self.disabled_rules.labels(rule['name'], tenant).inc()
self.prom_errors_per_rule.labels(rule['name'], tenant).inc()
self.prom_errors.labels(tenant).inc()
elif doc_type == 'silence':
self.prom_alerts_silenced.labels(body['rule_name']).inc()
except Exception as e:
elastalert_logger.error("Error in prometheus wrapper: %s" % e)
finally:
return res
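For a quick sanity check of the new tenant label outside ElastAlert, the counters can be exercised directly with prometheus_client. The metric and label names below mirror the ones added in this diff; the sample rule dict and registry setup are invented for illustration:

# Standalone check of a tenant-labelled counter using only prometheus_client.
# Metric/label names follow the wrapper above; the sample rule is made up.
import prometheus_client
from prometheus_client import generate_latest

registry = prometheus_client.CollectorRegistry()
errors_per_rule = prometheus_client.Counter(
    'elastalert_errors_per_rule', 'Number of errors per rule',
    ['rule_name', 'tenant'], registry=registry)

rule = {'name': 'cpu_spike', 'tenant': 'team-a'}  # illustrative rule
tenant = rule.get('tenant', 'all')                # same fallback as metrics_writeback
errors_per_rule.labels(rule['name'], tenant).inc()

# The exposition output includes a line like:
# elastalert_errors_per_rule_total{rule_name="cpu_spike",tenant="team-a"} 1.0
print(generate_latest(registry).decode())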