Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for interval cron expressions #93

Open
wants to merge 11 commits into
base: develop
Choose a base branch
from
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ branches:
- master
- develop

install: pip install --quiet --use-mirrors tox
install: pip install --quiet tox

script: tox

after_script:
- if [ $TOXENV == "cov" ]; then
pip install --quiet --use-mirrors coveralls;
pip install --quiet coveralls;
coveralls;
fi
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,32 @@ Currently two types of alerts are supported:
]
```

##### Cron expression intervals

In addition to checking metrics at a fixed time interval, graphite-beacon also
supports cron expressions.

See the below example for how to check a metric every 20 minutes between 8am and 5pm on weekdays:

```js
alerts: [
{
"name": "Cron-based alert",
// the cron expression
"interval": "*/20 8-17 * * 1-5",
"query": "Your graphite query here",
"rules": [
"warning: 10",
"critical: 20"
]
}
],
```

For more information about cron expressions, see https://en.wikipedia.org/wiki/Cron#CRON_expression.

To build and test cron expressions, try http://crontab.guru.

##### Historical values

graphite-beacon supports "historical" values for a rule.
Expand Down
126 changes: 106 additions & 20 deletions graphite_beacon/alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import math
from collections import deque, defaultdict
from itertools import islice
from croniter import croniter
from datetime import datetime, timedelta


LOGGER = log.gen_log
Expand All @@ -38,6 +40,59 @@ def __getitem__(self, index):
return type(self)(islice(self, index.start, index.stop, index.step))


class CronCallback(object):

"""Callback that runs on a cron schedule."""

def __init__(self, callback, cron):
"""Initialize a CronCallback object with the specified cron schedule and callback."""
self.callback = callback
self.cron = cron
self.is_running = False
self.handle = None

def start(self):
"""Start running."""
if not self.is_running:
self.is_running = True
self.schedule_next_run()

def stop(self):
"""Stop running."""
if self.is_running:
handle = self.handle
self.is_running = False
if handle:
ioloop.IOLoop.instance().remove_timeout(handle)
self.handle = None

def is_running(self):
"""Is running."""
return self.is_running
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't use the same name for the function and the variable here


def scheduled_run(self):
"""Invoke the callback and schedule the next run."""
if self.is_running:
LOGGER.debug("CronCallback: running cron schedule")
try:
self.callback()
finally:
self.schedule_next_run()

def schedule_next_run(self):
"""Schedule the next run of this callback."""
if self.is_running:
now = datetime.now()
next_time = self.cron.get_next(datetime)
while next_time <= now:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think it's worth having a limit on how far this will search before erroring/failing early?

next_time = self.cron.get_next(datetime)
LOGGER.debug("CronCallback: now: %s", now)
LOGGER.debug("CronCallback: next_time: %s", next_time)
td = next_time - now
total_seconds = (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / 10**6
self.handle = ioloop.IOLoop.instance().call_later(total_seconds, self.scheduled_run)


class AlertFabric(type):

"""Register alert's classes and produce an alert by source."""
Expand Down Expand Up @@ -79,7 +134,8 @@ def __init__(self, reactor, **options):

self.waiting = False
self.state = {None: "normal", "waiting": "normal", "loading": "normal"}
self.history = defaultdict(lambda: sliceable_deque([], self.history_size))
self.history = defaultdict(lambda: sliceable_deque([]))
self.history_times = defaultdict(lambda: sliceable_deque([]))

LOGGER.info("Alert '%s': has inited", self)

Expand All @@ -95,6 +151,10 @@ def __str__(self):
"""String representation."""
return "%s (%s)" % (self.name, self.interval)

def is_cron(self):
"""Detect if an expression is a valid cron expression."""
return len(self.interval.split()) in [5, 6]

def configure(self, name=None, rules=None, query=None, **options):
"""Configure the alert."""
self.name = name
Expand All @@ -109,13 +169,6 @@ def configure(self, name=None, rules=None, query=None, **options):
assert query, "%s: Alert's query is invalid" % self.name
self.query = query

self.interval = interval_to_graphite(
options.get('interval', self.reactor.options['interval']))
interval = parse_interval(self.interval)

self.time_window = interval_to_graphite(
options.get('time_window', options.get('interval', self.reactor.options['interval'])))

self.until = interval_to_graphite(
options.get('until', self.reactor.options['until'])
)
Expand All @@ -128,15 +181,38 @@ def configure(self, name=None, rules=None, query=None, **options):

self.history_size = options.get('history_size', self.reactor.options['history_size'])
self.history_size = parse_interval(self.history_size)
self.history_size = int(math.ceil(self.history_size / interval))
self.history_size = timedelta(milliseconds=self.history_size)

self.no_data = options.get('no_data', self.reactor.options['no_data'])
self.loading_error = options.get('loading_error', self.reactor.options['loading_error'])

if self.reactor.options.get('debug'):
self.callback = ioloop.PeriodicCallback(self.load, 5000)
self.interval = options.get('interval', self.reactor.options['interval'])
time_window = options.get('time_window', None)

if self.is_cron():
try:
cron = croniter(self.interval)
except Exception as e:
"""Raise error if we failed parsing the cron interval"""
LOGGER.exception(e)
raise ValueError("Invalid cron expression '%s': %s" % (self.interval, e))
assert time_window, "%s: Must supply time_window for cron scheduled alerts" % self.name
self.time_window = interval_to_graphite(time_window)
if self.reactor.options.get('debug'):
self.callback = ioloop.PeriodicCallback(self.load, 5000)
else:
self.callback = CronCallback(self.load, cron)
else:
self.callback = ioloop.PeriodicCallback(self.load, interval)
self.interval = interval_to_graphite(self.interval)
interval = parse_interval(self.interval)
if time_window:
self.time_window = interval_to_graphite(time_window)
else:
self.time_window = interval_to_graphite(self.interval)
if self.reactor.options.get('debug'):
self.callback = ioloop.PeriodicCallback(self.load, 5000)
else:
self.callback = ioloop.PeriodicCallback(self.load, interval)

def convert(self, value):
"""Convert self value."""
Expand All @@ -161,28 +237,36 @@ def stop(self):
self.callback.stop()
return self

def check(self, records):
def check(self, records, now=datetime.now()):
"""Check current value."""
for value, target in records:
LOGGER.info("%s [%s]: %s", self.name, target, value)
if value is None:
self.notify(self.no_data, value, target)
continue
for rule in self.rules:
if self.evaluate_rule(rule, value, target):
if self.evaluate_rule(rule, value, target, now):
self.notify(rule['level'], value, target, rule=rule)
break
else:
self.notify('normal', value, target, rule=rule)

self.history[target].append(value)

def evaluate_rule(self, rule, value, target):
history = self.history[target]
history.append(value)
history_times = self.history_times[target]
history_times.append(now)
history_threshold = now - self.history_size
"""Remove historical values older than history_size"""
while len(history_times) > 0 and history_times[0] <= history_threshold:
history.popleft()
history_times.popleft()

def evaluate_rule(self, rule, value, target, now):
"""Calculate the value."""
def evaluate(expr):
if expr in LOGICAL_OPERATORS.values():
return expr
rvalue = self.get_value_for_expr(expr, target)
rvalue = self.get_value_for_expr(expr, target, now)
if rvalue is None:
return False # ignore this result
return expr['op'](value, rvalue)
Expand All @@ -194,14 +278,16 @@ def evaluate(expr):

return evaluated[0]

def get_value_for_expr(self, expr, target):
def get_value_for_expr(self, expr, target, now=datetime.now()):
"""I have no idea."""
if expr in LOGICAL_OPERATORS.values():
return None
rvalue = expr['value']
if rvalue == HISTORICAL:
history = self.history[target]
if len(history) < self.history_size:
history_times = self.history_times[target]
"""Don't return a historical value if the history buffer is not full"""
if len(history_times) < 1 or history_times[0] + self.history_size > now:
return None
rvalue = sum(history) / float(len(history))

Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
tornado == 4.1.0
funcparserlib==0.3.6
croniter==0.3.10
38 changes: 27 additions & 11 deletions tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,21 @@
import pytest
import mock

from graphite_beacon.alerts import BaseAlert
from graphite_beacon.utils import parse_interval
from datetime import datetime, timedelta

"""Make every call to check() elapse the alert interval (or 10 minutes)"""
def check(alert, records, now=datetime.now()):
if not alert.is_cron():
alert.delta = timedelta(milliseconds=parse_interval(alert.interval))
alert._check(records, alert.time)
alert.time = alert.time + alert.delta

BaseAlert.time = datetime.now()
BaseAlert.delta = timedelta(minutes=10)
BaseAlert._check = BaseAlert.check
BaseAlert.check = check

@pytest.fixture
def reactor():
Expand Down Expand Up @@ -59,7 +74,7 @@ def test_public_graphite_url():


def test_alert(reactor):
from graphite_beacon.alerts import BaseAlert, GraphiteAlert, URLAlert
from graphite_beacon.alerts import GraphiteAlert, URLAlert

alert1 = BaseAlert.get(reactor, name='Test', query='*', rules=["normal: == 0"])
assert alert1
Expand All @@ -76,13 +91,16 @@ def test_alert(reactor):
assert alert1 == alert3
assert set([alert1, alert3]) == set([alert1])

alert4 = BaseAlert.get(reactor, name='Test', query='*', interval='*/10 09-18 * * 1-5', time_window='1h', rules=["normal: == 0"])
assert alert4.interval == '*/10 09-18 * * 1-5'
alert4.start()
alert4.stop()

alert = BaseAlert.get(reactor, name='Test', query='*', rules=["warning: >= 3MB"])
assert alert.rules[0]['exprs'][0]['value'] == 3145728


def test_multimetrics(reactor):
from graphite_beacon.alerts import BaseAlert

alert = BaseAlert.get(
reactor, name="Test", query="*", rules=[
"critical: > 100", "warning: > 50", "warning: < historical / 2"])
Expand Down Expand Up @@ -150,16 +168,17 @@ def test_multimetrics(reactor):


def test_multiexpressions(reactor):
from graphite_beacon.alerts import BaseAlert

alert = BaseAlert.get(
reactor, name="Test", query="*", rules=["warning: > historical * 1.05 AND > 70"])
reactor.alerts = set([alert])

with mock.patch.object(reactor, 'notify'):
alert.check([
(50, 'metric1'), (65, 'metric1'), (85, 'metric1'), (65, 'metric1'),
(68, 'metric1'), (75, 'metric1')])
alert.check([(50, 'metric1')])
alert.check([(65, 'metric1')])
alert.check([(85, 'metric1')])
alert.check([(65, 'metric1')])
alert.check([(68, 'metric1')])
alert.check([(75, 'metric1')])

assert reactor.notify.call_count == 1

Expand Down Expand Up @@ -213,8 +232,6 @@ def test_convert():


def test_parse_interval():
from graphite_beacon.utils import parse_interval

assert parse_interval(10) == 10000.0
assert parse_interval('10') == 10000.0
assert parse_interval('15s') == 15000.0
Expand Down Expand Up @@ -270,7 +287,6 @@ def test_parse_rule():

def test_html_template(reactor):
from graphite_beacon.handlers.smtp import SMTPHandler
from graphite_beacon.alerts import BaseAlert

target = 'node.com'
galert = BaseAlert.get(reactor, name='Test', query='*', rules=["normal: == 0"])
Expand Down
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ deps =
mock
pytest
tornado
croniter

[testenv:cov]
deps =
Expand Down