-
Notifications
You must be signed in to change notification settings - Fork 118
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for interval cron expressions #93
Open
jbrody1
wants to merge
11
commits into
klen:develop
Choose a base branch
from
jbrody1:develop
base: develop
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
80ec17a
add support for cron expressions
john-speakeasy e49ef9a
fix cron execution
john-speakeasy cc56f28
enforce time_window on cron schedules
john-speakeasy 83282ba
more explicit check
john-speakeasy 23f51f5
fix for python 2.6
john-speakeasy 9ac77f7
Merge remote-tracking branch 'upstream/develop' into develop
john-speakeasy cff148f
implement history alerts
john-speakeasy 86f0d1a
Merge remote-tracking branch 'upstream/develop' into develop
john-speakeasy 267a996
update travis config for new pip version
john-speakeasy 32e3c75
update travis config for new pip version
john-speakeasy f36744c
update readme
john-speakeasy File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,8 @@ | |
import math | ||
from collections import deque, defaultdict | ||
from itertools import islice | ||
from croniter import croniter | ||
from datetime import datetime, timedelta | ||
|
||
|
||
LOGGER = log.gen_log | ||
|
@@ -38,6 +40,59 @@ def __getitem__(self, index): | |
return type(self)(islice(self, index.start, index.stop, index.step)) | ||
|
||
|
||
class CronCallback(object): | ||
|
||
"""Callback that runs on a cron schedule.""" | ||
|
||
def __init__(self, callback, cron): | ||
"""Initialize a CronCallback object with the specified cron schedule and callback.""" | ||
self.callback = callback | ||
self.cron = cron | ||
self.is_running = False | ||
self.handle = None | ||
|
||
def start(self): | ||
"""Start running.""" | ||
if not self.is_running: | ||
self.is_running = True | ||
self.schedule_next_run() | ||
|
||
def stop(self): | ||
"""Stop running.""" | ||
if self.is_running: | ||
handle = self.handle | ||
self.is_running = False | ||
if handle: | ||
ioloop.IOLoop.instance().remove_timeout(handle) | ||
self.handle = None | ||
|
||
def is_running(self): | ||
"""Is running.""" | ||
return self.is_running | ||
|
||
def scheduled_run(self): | ||
"""Invoke the callback and schedule the next run.""" | ||
if self.is_running: | ||
LOGGER.debug("CronCallback: running cron schedule") | ||
try: | ||
self.callback() | ||
finally: | ||
self.schedule_next_run() | ||
|
||
def schedule_next_run(self): | ||
"""Schedule the next run of this callback.""" | ||
if self.is_running: | ||
now = datetime.now() | ||
next_time = self.cron.get_next(datetime) | ||
while next_time <= now: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you think it's worth having a limit on how far this will search before erroring/failing early? |
||
next_time = self.cron.get_next(datetime) | ||
LOGGER.debug("CronCallback: now: %s", now) | ||
LOGGER.debug("CronCallback: next_time: %s", next_time) | ||
td = next_time - now | ||
total_seconds = (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / 10**6 | ||
self.handle = ioloop.IOLoop.instance().call_later(total_seconds, self.scheduled_run) | ||
|
||
|
||
class AlertFabric(type): | ||
|
||
"""Register alert's classes and produce an alert by source.""" | ||
|
@@ -79,7 +134,8 @@ def __init__(self, reactor, **options): | |
|
||
self.waiting = False | ||
self.state = {None: "normal", "waiting": "normal", "loading": "normal"} | ||
self.history = defaultdict(lambda: sliceable_deque([], self.history_size)) | ||
self.history = defaultdict(lambda: sliceable_deque([])) | ||
self.history_times = defaultdict(lambda: sliceable_deque([])) | ||
|
||
LOGGER.info("Alert '%s': has inited", self) | ||
|
||
|
@@ -95,6 +151,10 @@ def __str__(self): | |
"""String representation.""" | ||
return "%s (%s)" % (self.name, self.interval) | ||
|
||
def is_cron(self): | ||
"""Detect if an expression is a valid cron expression.""" | ||
return len(self.interval.split()) in [5, 6] | ||
|
||
def configure(self, name=None, rules=None, query=None, **options): | ||
"""Configure the alert.""" | ||
self.name = name | ||
|
@@ -109,13 +169,6 @@ def configure(self, name=None, rules=None, query=None, **options): | |
assert query, "%s: Alert's query is invalid" % self.name | ||
self.query = query | ||
|
||
self.interval = interval_to_graphite( | ||
options.get('interval', self.reactor.options['interval'])) | ||
interval = parse_interval(self.interval) | ||
|
||
self.time_window = interval_to_graphite( | ||
options.get('time_window', options.get('interval', self.reactor.options['interval']))) | ||
|
||
self.until = interval_to_graphite( | ||
options.get('until', self.reactor.options['until']) | ||
) | ||
|
@@ -128,15 +181,38 @@ def configure(self, name=None, rules=None, query=None, **options): | |
|
||
self.history_size = options.get('history_size', self.reactor.options['history_size']) | ||
self.history_size = parse_interval(self.history_size) | ||
self.history_size = int(math.ceil(self.history_size / interval)) | ||
self.history_size = timedelta(milliseconds=self.history_size) | ||
|
||
self.no_data = options.get('no_data', self.reactor.options['no_data']) | ||
self.loading_error = options.get('loading_error', self.reactor.options['loading_error']) | ||
|
||
if self.reactor.options.get('debug'): | ||
self.callback = ioloop.PeriodicCallback(self.load, 5000) | ||
self.interval = options.get('interval', self.reactor.options['interval']) | ||
time_window = options.get('time_window', None) | ||
|
||
if self.is_cron(): | ||
try: | ||
cron = croniter(self.interval) | ||
except Exception as e: | ||
"""Raise error if we failed parsing the cron interval""" | ||
LOGGER.exception(e) | ||
raise ValueError("Invalid cron expression '%s': %s" % (self.interval, e)) | ||
assert time_window, "%s: Must supply time_window for cron scheduled alerts" % self.name | ||
self.time_window = interval_to_graphite(time_window) | ||
if self.reactor.options.get('debug'): | ||
self.callback = ioloop.PeriodicCallback(self.load, 5000) | ||
else: | ||
self.callback = CronCallback(self.load, cron) | ||
else: | ||
self.callback = ioloop.PeriodicCallback(self.load, interval) | ||
self.interval = interval_to_graphite(self.interval) | ||
interval = parse_interval(self.interval) | ||
if time_window: | ||
self.time_window = interval_to_graphite(time_window) | ||
else: | ||
self.time_window = interval_to_graphite(self.interval) | ||
if self.reactor.options.get('debug'): | ||
self.callback = ioloop.PeriodicCallback(self.load, 5000) | ||
else: | ||
self.callback = ioloop.PeriodicCallback(self.load, interval) | ||
|
||
def convert(self, value): | ||
"""Convert self value.""" | ||
|
@@ -161,28 +237,36 @@ def stop(self): | |
self.callback.stop() | ||
return self | ||
|
||
def check(self, records): | ||
def check(self, records, now=datetime.now()): | ||
"""Check current value.""" | ||
for value, target in records: | ||
LOGGER.info("%s [%s]: %s", self.name, target, value) | ||
if value is None: | ||
self.notify(self.no_data, value, target) | ||
continue | ||
for rule in self.rules: | ||
if self.evaluate_rule(rule, value, target): | ||
if self.evaluate_rule(rule, value, target, now): | ||
self.notify(rule['level'], value, target, rule=rule) | ||
break | ||
else: | ||
self.notify('normal', value, target, rule=rule) | ||
|
||
self.history[target].append(value) | ||
|
||
def evaluate_rule(self, rule, value, target): | ||
history = self.history[target] | ||
history.append(value) | ||
history_times = self.history_times[target] | ||
history_times.append(now) | ||
history_threshold = now - self.history_size | ||
"""Remove historical values older than history_size""" | ||
while len(history_times) > 0 and history_times[0] <= history_threshold: | ||
history.popleft() | ||
history_times.popleft() | ||
|
||
def evaluate_rule(self, rule, value, target, now): | ||
"""Calculate the value.""" | ||
def evaluate(expr): | ||
if expr in LOGICAL_OPERATORS.values(): | ||
return expr | ||
rvalue = self.get_value_for_expr(expr, target) | ||
rvalue = self.get_value_for_expr(expr, target, now) | ||
if rvalue is None: | ||
return False # ignore this result | ||
return expr['op'](value, rvalue) | ||
|
@@ -194,14 +278,16 @@ def evaluate(expr): | |
|
||
return evaluated[0] | ||
|
||
def get_value_for_expr(self, expr, target): | ||
def get_value_for_expr(self, expr, target, now=datetime.now()): | ||
"""I have no idea.""" | ||
if expr in LOGICAL_OPERATORS.values(): | ||
return None | ||
rvalue = expr['value'] | ||
if rvalue == HISTORICAL: | ||
history = self.history[target] | ||
if len(history) < self.history_size: | ||
history_times = self.history_times[target] | ||
"""Don't return a historical value if the history buffer is not full""" | ||
if len(history_times) < 1 or history_times[0] + self.history_size > now: | ||
return None | ||
rvalue = sum(history) / float(len(history)) | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,3 @@ | ||
tornado == 4.1.0 | ||
funcparserlib==0.3.6 | ||
croniter==0.3.10 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,7 @@ deps = | |
mock | ||
pytest | ||
tornado | ||
croniter | ||
|
||
[testenv:cov] | ||
deps = | ||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't use the same name for the function and the variable here