From 80ec17a05913f9458496afc824246ed39f1be19b Mon Sep 17 00:00:00 2001 From: john Date: Fri, 4 Dec 2015 02:31:58 -0800 Subject: [PATCH 1/9] add support for cron expressions --- graphite_beacon/alerts.py | 103 ++++++++++++++++++++++++++++++++++---- requirements.txt | 3 +- tests.py | 3 ++ tox.ini | 1 + 4 files changed, 98 insertions(+), 12 deletions(-) diff --git a/graphite_beacon/alerts.py b/graphite_beacon/alerts.py index 7e3738b..4465895 100644 --- a/graphite_beacon/alerts.py +++ b/graphite_beacon/alerts.py @@ -15,6 +15,9 @@ import math from collections import deque, defaultdict from itertools import islice +from croniter import croniter +from datetime import datetime +from threading import Lock LOGGER = log.gen_log @@ -38,6 +41,76 @@ def __getitem__(self, index): return type(self)(islice(self, index.start, index.stop, index.step)) +def synchronized(lock): + """ Synchronization decorator. """ + + def wrap(f): + def newFunction(*args, **kw): + lock.acquire() + try: + return f(*args, **kw) + finally: + lock.release() + return newFunction + return wrap + + +def is_cron(expr): + """Detect if an expression is a valid cron expression.""" + return not(expr is None or len(expr.split()) < 5) + + +class CronCallback(object): + + """Callback that runs on a cron schedule.""" + lock = Lock() + + def __init__(self, callback, cron): + """Initialize a CronCallback object with the specified cron schedule and callback.""" + self.callback = callback + self.cron = croniter(cron) + self.is_running = False + self.handle = None + + @synchronized(lock) + def start(self): + """Start running.""" + if not self.is_running: + self.is_running = True + self.schedule_next_run() + + @synchronized(lock) + def stop(self): + """Stop running.""" + if self.is_running: + handle = self.handle + self.is_running = False + if handle: + ioloop.remove_timeout(handle) + self.handle = None + + @synchronized(lock) + def is_running(self): + """Is running.""" + return self.is_running + + @synchronized(lock) + def 
scheduled_run(self): + """Invoke the callback and schedule the next run.""" + if self.is_running: + self.callback() + self.schedule_next_run() + + @synchronized(lock) + def schedule_next_run(self): + """Schedule the next run of this callback.""" + if self.is_running: + now = datetime.now() + while not next or next <= now: + next = self.cron.get_next(datetime) + self.handle = ioloop.call_later((next-now).total_seconds(), self.scheduled_run) + + class AlertFabric(type): """Register alert's classes and produce an alert by source.""" @@ -109,13 +182,6 @@ def configure(self, name=None, rules=None, query=None, **options): assert query, "%s: Alert's query is invalid" % self.name self.query = query - self.interval = interval_to_graphite( - options.get('interval', self.reactor.options['interval'])) - interval = parse_interval(self.interval) - - self.time_window = interval_to_graphite( - options.get('time_window', options.get('interval', self.reactor.options['interval']))) - self.until = interval_to_graphite( options.get('until', self.reactor.options['until']) ) @@ -128,15 +194,30 @@ def configure(self, name=None, rules=None, query=None, **options): self.history_size = options.get('history_size', self.reactor.options['history_size']) self.history_size = parse_interval(self.history_size) - self.history_size = int(math.ceil(self.history_size / interval)) self.no_data = options.get('no_data', self.reactor.options['no_data']) self.loading_error = options.get('loading_error', self.reactor.options['loading_error']) - if self.reactor.options.get('debug'): - self.callback = ioloop.PeriodicCallback(self.load, 5000) + interval = options.get('interval', self.reactor.options['interval']) + + if is_cron(interval): + self.interval = interval + self.time_window = interval_to_graphite(options.get('time_window', None)) + self.time_window = interval_to_graphite( + options.get('time_window', self.reactor.options['interval'])) + if self.reactor.options.get('debug'): + self.callback = 
ioloop.PeriodicCallback(self.load, 5000) + else: + self.callback = CronCallback(self.load, interval) else: - self.callback = ioloop.PeriodicCallback(self.load, interval) + self.interval = interval_to_graphite(interval) + self.time_window = interval_to_graphite(options.get('time_window', interval)) + interval = parse_interval(self.interval) + self.history_size = int(math.ceil(self.history_size / interval)) + if self.reactor.options.get('debug'): + self.callback = ioloop.PeriodicCallback(self.load, 5000) + else: + self.callback = ioloop.PeriodicCallback(self.load, interval) def convert(self, value): """Convert self value.""" diff --git a/requirements.txt b/requirements.txt index 5d2cd63..f81d5e8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ tornado == 4.1.0 -funcparserlib==0.3.6 \ No newline at end of file +funcparserlib==0.3.6 +croniter==0.3.10 diff --git a/tests.py b/tests.py index a8c2fef..9fe2038 100644 --- a/tests.py +++ b/tests.py @@ -76,6 +76,9 @@ def test_alert(reactor): assert alert1 == alert3 assert set([alert1, alert3]) == set([alert1]) + alert4 = BaseAlert.get(reactor, name='Test', query='*', interval='*/10 09-18 * * 1-5', time_window='1h', rules=["normal: == 0"]) + assert alert4.interval == '*/10 09-18 * * 1-5' + alert = BaseAlert.get(reactor, name='Test', query='*', rules=["warning: >= 3MB"]) assert alert.rules[0]['exprs'][0]['value'] == 3145728 diff --git a/tox.ini b/tox.ini index 530b4a7..6b6df49 100644 --- a/tox.ini +++ b/tox.ini @@ -7,6 +7,7 @@ deps = mock pytest tornado + croniter [testenv:cov] deps = From e49ef9ad0672b072314f3eab4702a0565cf7ea40 Mon Sep 17 00:00:00 2001 From: john Date: Fri, 4 Dec 2015 04:52:21 -0800 Subject: [PATCH 2/9] fix cron execution --- graphite_beacon/alerts.py | 38 ++++++++++++-------------------------- tests.py | 2 ++ 2 files changed, 14 insertions(+), 26 deletions(-) diff --git a/graphite_beacon/alerts.py b/graphite_beacon/alerts.py index 4465895..23b2ca0 100644 --- a/graphite_beacon/alerts.py +++ 
b/graphite_beacon/alerts.py @@ -41,29 +41,14 @@ def __getitem__(self, index): return type(self)(islice(self, index.start, index.stop, index.step)) -def synchronized(lock): - """ Synchronization decorator. """ - - def wrap(f): - def newFunction(*args, **kw): - lock.acquire() - try: - return f(*args, **kw) - finally: - lock.release() - return newFunction - return wrap - - def is_cron(expr): """Detect if an expression is a valid cron expression.""" - return not(expr is None or len(expr.split()) < 5) + return expr is not None and len(expr.split()) >= 5 class CronCallback(object): """Callback that runs on a cron schedule.""" - lock = Lock() def __init__(self, callback, cron): """Initialize a CronCallback object with the specified cron schedule and callback.""" @@ -72,43 +57,44 @@ def __init__(self, callback, cron): self.is_running = False self.handle = None - @synchronized(lock) def start(self): """Start running.""" if not self.is_running: self.is_running = True self.schedule_next_run() - @synchronized(lock) def stop(self): """Stop running.""" if self.is_running: handle = self.handle self.is_running = False if handle: - ioloop.remove_timeout(handle) + ioloop.IOLoop.instance().remove_timeout(handle) self.handle = None - @synchronized(lock) def is_running(self): """Is running.""" return self.is_running - @synchronized(lock) def scheduled_run(self): """Invoke the callback and schedule the next run.""" if self.is_running: - self.callback() - self.schedule_next_run() + LOGGER.debug("running cron schedule") + try: + self.callback() + finally: + self.schedule_next_run() - @synchronized(lock) def schedule_next_run(self): """Schedule the next run of this callback.""" if self.is_running: now = datetime.now() - while not next or next <= now: + next = self.cron.get_next(datetime) + while next <= now: next = self.cron.get_next(datetime) - self.handle = ioloop.call_later((next-now).total_seconds(), self.scheduled_run) + LOGGER.debug("now: %s", now) + LOGGER.debug("next: %s", next) + 
self.handle = ioloop.IOLoop.instance().call_later((next-now).total_seconds(), self.scheduled_run) class AlertFabric(type): diff --git a/tests.py b/tests.py index 9fe2038..19f94ec 100644 --- a/tests.py +++ b/tests.py @@ -78,6 +78,8 @@ def test_alert(reactor): alert4 = BaseAlert.get(reactor, name='Test', query='*', interval='*/10 09-18 * * 1-5', time_window='1h', rules=["normal: == 0"]) assert alert4.interval == '*/10 09-18 * * 1-5' + alert4.start() + alert4.stop() alert = BaseAlert.get(reactor, name='Test', query='*', rules=["warning: >= 3MB"]) assert alert.rules[0]['exprs'][0]['value'] == 3145728 From cc56f282eee8c323e9e78f3deb4971c2cdf2ecd3 Mon Sep 17 00:00:00 2001 From: john Date: Fri, 4 Dec 2015 12:27:13 -0800 Subject: [PATCH 3/9] enforce time_window on cron schedules --- graphite_beacon/alerts.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/graphite_beacon/alerts.py b/graphite_beacon/alerts.py index 23b2ca0..76775a5 100644 --- a/graphite_beacon/alerts.py +++ b/graphite_beacon/alerts.py @@ -185,20 +185,23 @@ def configure(self, name=None, rules=None, query=None, **options): self.loading_error = options.get('loading_error', self.reactor.options['loading_error']) interval = options.get('interval', self.reactor.options['interval']) + time_window = options.get('time_window', None) if is_cron(interval): self.interval = interval - self.time_window = interval_to_graphite(options.get('time_window', None)) - self.time_window = interval_to_graphite( - options.get('time_window', self.reactor.options['interval'])) + assert time_window, "%s: Must supply time_window for cron scheduled alerts" % self.name + self.time_window = interval_to_graphite(time_window) if self.reactor.options.get('debug'): self.callback = ioloop.PeriodicCallback(self.load, 5000) else: self.callback = CronCallback(self.load, interval) else: self.interval = interval_to_graphite(interval) - self.time_window = interval_to_graphite(options.get('time_window', interval)) 
interval = parse_interval(self.interval) + if time_window: + self.time_window = interval_to_graphite(time_window) + else: + self.time_window = interval_to_graphite(self.interval) self.history_size = int(math.ceil(self.history_size / interval)) if self.reactor.options.get('debug'): self.callback = ioloop.PeriodicCallback(self.load, 5000) From 83282bab0d77c4d61e2e4ddce1f3be71965d888f Mon Sep 17 00:00:00 2001 From: john Date: Fri, 4 Dec 2015 15:40:36 -0800 Subject: [PATCH 4/9] more explicit check --- graphite_beacon/alerts.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/graphite_beacon/alerts.py b/graphite_beacon/alerts.py index 76775a5..1ce2b06 100644 --- a/graphite_beacon/alerts.py +++ b/graphite_beacon/alerts.py @@ -43,7 +43,7 @@ def __getitem__(self, index): def is_cron(expr): """Detect if an expression is a valid cron expression.""" - return expr is not None and len(expr.split()) >= 5 + return expr is not None and len(expr.split()) == 5 class CronCallback(object): From 23f51f5817c7ef60c73d1d89ac0bad7ba3cd888e Mon Sep 17 00:00:00 2001 From: john Date: Tue, 8 Dec 2015 13:08:57 -0800 Subject: [PATCH 5/9] fix for python 2.6 --- graphite_beacon/alerts.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/graphite_beacon/alerts.py b/graphite_beacon/alerts.py index 1ce2b06..3b9fc3a 100644 --- a/graphite_beacon/alerts.py +++ b/graphite_beacon/alerts.py @@ -94,7 +94,9 @@ def schedule_next_run(self): next = self.cron.get_next(datetime) LOGGER.debug("now: %s", now) LOGGER.debug("next: %s", next) - self.handle = ioloop.IOLoop.instance().call_later((next-now).total_seconds(), self.scheduled_run) + td = next - now + total_seconds = (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / 10**6 + self.handle = ioloop.IOLoop.instance().call_later(total_seconds, self.scheduled_run) class AlertFabric(type): From cff148fb6600f306469fb51f9a6c5f4ce171eb12 Mon Sep 17 00:00:00 2001 From: john Date: Wed, 19 Oct 2016 00:20:39 -0700 Subject: 
[PATCH 6/9] implement history alerts --- graphite_beacon/alerts.py | 74 +++++++++++++++++++++++---------------- tests.py | 33 +++++++++++------ 2 files changed, 66 insertions(+), 41 deletions(-) diff --git a/graphite_beacon/alerts.py b/graphite_beacon/alerts.py index 3b9fc3a..dd1bc43 100644 --- a/graphite_beacon/alerts.py +++ b/graphite_beacon/alerts.py @@ -16,8 +16,7 @@ from collections import deque, defaultdict from itertools import islice from croniter import croniter -from datetime import datetime -from threading import Lock +from datetime import datetime, timedelta LOGGER = log.gen_log @@ -41,11 +40,6 @@ def __getitem__(self, index): return type(self)(islice(self, index.start, index.stop, index.step)) -def is_cron(expr): - """Detect if an expression is a valid cron expression.""" - return expr is not None and len(expr.split()) == 5 - - class CronCallback(object): """Callback that runs on a cron schedule.""" @@ -53,7 +47,7 @@ class CronCallback(object): def __init__(self, callback, cron): """Initialize a CronCallback object with the specified cron schedule and callback.""" self.callback = callback - self.cron = croniter(cron) + self.cron = cron self.is_running = False self.handle = None @@ -79,7 +73,7 @@ def is_running(self): def scheduled_run(self): """Invoke the callback and schedule the next run.""" if self.is_running: - LOGGER.debug("running cron schedule") + LOGGER.debug("CronCallback: running cron schedule") try: self.callback() finally: @@ -89,12 +83,12 @@ def schedule_next_run(self): """Schedule the next run of this callback.""" if self.is_running: now = datetime.now() - next = self.cron.get_next(datetime) - while next <= now: - next = self.cron.get_next(datetime) - LOGGER.debug("now: %s", now) - LOGGER.debug("next: %s", next) - td = next - now + next_time = self.cron.get_next(datetime) + while next_time <= now: + next_time = self.cron.get_next(datetime) + LOGGER.debug("CronCallback: now: %s", now) + LOGGER.debug("CronCallback: next_time: %s", 
next_time) + td = next_time - now total_seconds = (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / 10**6 self.handle = ioloop.IOLoop.instance().call_later(total_seconds, self.scheduled_run) @@ -140,7 +134,8 @@ def __init__(self, reactor, **options): self.waiting = False self.state = {None: "normal", "waiting": "normal", "loading": "normal"} - self.history = defaultdict(lambda: sliceable_deque([], self.history_size)) + self.history = defaultdict(lambda: sliceable_deque([])) + self.history_times = defaultdict(lambda: sliceable_deque([])) LOGGER.info("Alert '%s': has inited", self) @@ -156,6 +151,10 @@ def __str__(self): """String representation.""" return "%s (%s)" % (self.name, self.interval) + def is_cron(self): + """Detect if an expression is a valid cron expression.""" + return len(self.interval.split()) in [5, 6] + def configure(self, name=None, rules=None, query=None, **options): """Configure the alert.""" self.name = name @@ -182,29 +181,34 @@ def configure(self, name=None, rules=None, query=None, **options): self.history_size = options.get('history_size', self.reactor.options['history_size']) self.history_size = parse_interval(self.history_size) + self.history_size = timedelta(milliseconds=self.history_size) self.no_data = options.get('no_data', self.reactor.options['no_data']) self.loading_error = options.get('loading_error', self.reactor.options['loading_error']) - interval = options.get('interval', self.reactor.options['interval']) + self.interval = options.get('interval', self.reactor.options['interval']) time_window = options.get('time_window', None) - if is_cron(interval): - self.interval = interval + if self.is_cron(): + try: + cron = croniter(self.interval) + except Exception as e: + """Raise error if we failed parsing the cron interval""" + LOGGER.exception(e) + raise ValueError("Invalid cron expression '%s': %s" % (self.interval, e)) assert time_window, "%s: Must supply time_window for cron scheduled alerts" % self.name 
self.time_window = interval_to_graphite(time_window) if self.reactor.options.get('debug'): self.callback = ioloop.PeriodicCallback(self.load, 5000) else: - self.callback = CronCallback(self.load, interval) + self.callback = CronCallback(self.load, cron) else: - self.interval = interval_to_graphite(interval) + self.interval = interval_to_graphite(self.interval) interval = parse_interval(self.interval) if time_window: self.time_window = interval_to_graphite(time_window) else: self.time_window = interval_to_graphite(self.interval) - self.history_size = int(math.ceil(self.history_size / interval)) if self.reactor.options.get('debug'): self.callback = ioloop.PeriodicCallback(self.load, 5000) else: @@ -233,7 +237,7 @@ def stop(self): self.callback.stop() return self - def check(self, records): + def check(self, records, now=datetime.now()): """Check current value.""" for value, target in records: LOGGER.info("%s [%s]: %s", self.name, target, value) @@ -241,20 +245,28 @@ def check(self, records): self.notify(self.no_data, value, target) continue for rule in self.rules: - if self.evaluate_rule(rule, value, target): + if self.evaluate_rule(rule, value, target, now): self.notify(rule['level'], value, target, rule=rule) break else: self.notify('normal', value, target, rule=rule) - self.history[target].append(value) - - def evaluate_rule(self, rule, value, target): + history = self.history[target] + history.append(value) + history_times = self.history_times[target] + history_times.append(now) + history_threshold = now - self.history_size + """Remove historical values older than history_size""" + while len(history_times) > 0 and history_times[0] <= history_threshold: + history.popleft() + history_times.popleft() + + def evaluate_rule(self, rule, value, target, now): """Calculate the value.""" def evaluate(expr): if expr in LOGICAL_OPERATORS.values(): return expr - rvalue = self.get_value_for_expr(expr, target) + rvalue = self.get_value_for_expr(expr, target, now) if rvalue is 
None: return False # ignore this result return expr['op'](value, rvalue) @@ -266,14 +278,16 @@ def evaluate(expr): return evaluated[0] - def get_value_for_expr(self, expr, target): + def get_value_for_expr(self, expr, target, now=datetime.now()): """I have no idea.""" if expr in LOGICAL_OPERATORS.values(): return None rvalue = expr['value'] if rvalue == HISTORICAL: history = self.history[target] - if len(history) < self.history_size: + history_times = self.history_times[target] + """Don't return a historical value if the history buffer is not full""" + if len(history_times) < 1 or history_times[0] + self.history_size > now: return None rvalue = sum(history) / float(len(history)) diff --git a/tests.py b/tests.py index fc5ecb4..04be115 100644 --- a/tests.py +++ b/tests.py @@ -4,6 +4,21 @@ import pytest import mock +from graphite_beacon.alerts import BaseAlert +from graphite_beacon.utils import parse_interval +from datetime import datetime, timedelta + +"""Make every call to check() elapse the alert interval (or 10 minutes)""" +def check(alert, records, now=datetime.now()): + if not alert.is_cron(): + alert.delta = timedelta(milliseconds=parse_interval(alert.interval)) + alert._check(records, alert.time) + alert.time = alert.time + alert.delta + +BaseAlert.time = datetime.now() +BaseAlert.delta = timedelta(minutes=10) +BaseAlert._check = BaseAlert.check +BaseAlert.check = check @pytest.fixture def reactor(): @@ -59,7 +74,7 @@ def test_public_graphite_url(): def test_alert(reactor): - from graphite_beacon.alerts import BaseAlert, GraphiteAlert, URLAlert + from graphite_beacon.alerts import GraphiteAlert, URLAlert alert1 = BaseAlert.get(reactor, name='Test', query='*', rules=["normal: == 0"]) assert alert1 @@ -86,8 +101,6 @@ def test_alert(reactor): def test_multimetrics(reactor): - from graphite_beacon.alerts import BaseAlert - alert = BaseAlert.get( reactor, name="Test", query="*", rules=[ "critical: > 100", "warning: > 50", "warning: < historical / 2"]) @@ -155,16 
+168,17 @@ def test_multimetrics(reactor): def test_multiexpressions(reactor): - from graphite_beacon.alerts import BaseAlert - alert = BaseAlert.get( reactor, name="Test", query="*", rules=["warning: > historical * 1.05 AND > 70"]) reactor.alerts = set([alert]) with mock.patch.object(reactor, 'notify'): - alert.check([ - (50, 'metric1'), (65, 'metric1'), (85, 'metric1'), (65, 'metric1'), - (68, 'metric1'), (75, 'metric1')]) + alert.check([(50, 'metric1')]) + alert.check([(65, 'metric1')]) + alert.check([(85, 'metric1')]) + alert.check([(65, 'metric1')]) + alert.check([(68, 'metric1')]) + alert.check([(75, 'metric1')]) assert reactor.notify.call_count == 1 @@ -218,8 +232,6 @@ def test_convert(): def test_parse_interval(): - from graphite_beacon.utils import parse_interval - assert parse_interval(10) == 10000.0 assert parse_interval('10') == 10000.0 assert parse_interval('15s') == 15000.0 @@ -275,7 +287,6 @@ def test_parse_rule(): def test_html_template(reactor): from graphite_beacon.handlers.smtp import SMTPHandler - from graphite_beacon.alerts import BaseAlert target = 'node.com' galert = BaseAlert.get(reactor, name='Test', query='*', rules=["normal: == 0"]) From 267a996c6c7f4d3bfb971c1e3e0c489a4e9a1bcb Mon Sep 17 00:00:00 2001 From: john Date: Wed, 19 Oct 2016 00:45:09 -0700 Subject: [PATCH 7/9] update travis config for new pip version --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index d11b035..2c4a800 100644 --- a/.travis.yml +++ b/.travis.yml @@ -19,6 +19,6 @@ script: tox after_script: - if [ $TOXENV == "cov" ]; then - pip install --quiet --use-mirrors coveralls; + pip install --quiet coveralls; coveralls; fi From 32e3c755844d3219dce05a4a24c7088a6c641a9b Mon Sep 17 00:00:00 2001 From: john Date: Wed, 19 Oct 2016 00:48:59 -0700 Subject: [PATCH 8/9] update travis config for new pip version --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 
2c4a800..8c67aa0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -13,7 +13,7 @@ branches: - master - develop -install: pip install --quiet --use-mirrors tox +install: pip install --quiet tox script: tox From f36744cecd244ebaf0522f9532d0f61458e0790a Mon Sep 17 00:00:00 2001 From: john Date: Wed, 19 Oct 2016 01:10:29 -0700 Subject: [PATCH 9/9] update readme --- README.md | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/README.md b/README.md index 0c045b6..58a7771 100644 --- a/README.md +++ b/README.md @@ -248,6 +248,32 @@ Currently two types of alerts are supported: ] ``` +##### Cron expression intervals + +In addition to checking metrics at a fixed time interval, graphite-beacon also +supports cron expressions. + +See the example below for how to check a metric every 20 minutes between 8am and 5pm on weekdays. Alerts with a cron interval must also set `time_window`: + +```js +alerts: [ + { + "name": "Cron-based alert", + // the cron expression + "interval": "*/20 8-17 * * 1-5", "time_window": "1h", + "query": "Your graphite query here", + "rules": [ + "warning: 10", + "critical: 20" + ] + } +], +``` + +For more information about cron expressions, see https://en.wikipedia.org/wiki/Cron#CRON_expression. + +To build and test cron expressions, try http://crontab.guru. + ##### Historical values graphite-beacon supports "historical" values for a rule.