"""
Add new scheduling data.

Revision ID: 640888ce445d
Revises: 71477dadd6ef
Create Date: 2018-09-21 19:35:58.578796
"""

import json
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table

from redash.models import MutableDict, PseudoJSON


# revision identifiers, used by Alembic.
revision = '640888ce445d'
down_revision = '71477dadd6ef'
branch_labels = None
depends_on = None

# Number of seconds in one day; the interval assigned to legacy "HH:MM" schedules.
_ONE_DAY = 86400


def _schedule_from_legacy(old_schedule):
    """Convert a legacy string schedule into the new dict representation.

    Legacy values are either ``None`` (unscheduled), a string of digits
    (refresh interval in seconds) or an "HH:MM" time of day (daily refresh).
    """
    schedule_json = {
        'interval': None,
        'until': None,
        'day_of_week': None,
        'time': None
    }

    if old_schedule is None:
        return schedule_json

    if ":" in old_schedule:
        schedule_json['interval'] = _ONE_DAY
        schedule_json['time'] = old_schedule
    elif old_schedule.isdigit():
        # Store a number so the value has the same type as the daily
        # interval written by the branch above.
        schedule_json['interval'] = int(old_schedule)
    else:
        # Unrecognised value: keep it verbatim rather than guessing.
        schedule_json['interval'] = old_schedule

    return schedule_json


def _legacy_from_schedule(schedule):
    """Convert a dict schedule back into the legacy string representation.

    Returns ``None`` for unscheduled queries (empty dict or no interval),
    the "HH:MM" time for schedules of at most one day that carry a time of
    day, and the interval in seconds (as a string) otherwise.
    """
    if not schedule:
        # Archived queries store an empty dict.
        return None

    interval = schedule.get('interval')
    if interval is None:
        return None

    time_of_day = schedule.get('time')
    if time_of_day and int(interval) <= _ONE_DAY:
        return time_of_day

    # Plain interval schedules (and multi-day schedules, which the legacy
    # format cannot pin to a time of day) fall back to the interval itself.
    return str(interval)


def upgrade():
    """Replace the string ``queries.schedule`` column with a JSON dict column."""
    # Copy "schedule" column into "old_schedule" column
    op.add_column('queries', sa.Column('old_schedule', sa.String(length=10), nullable=True))

    queries = table(
        'queries',
        sa.Column('schedule', sa.String(length=10)),
        sa.Column('old_schedule', sa.String(length=10)))

    op.execute(
        queries
        .update()
        .values({'old_schedule': queries.c.schedule}))

    # Recreate "schedule" column as a dict type
    op.drop_column('queries', 'schedule')
    op.add_column('queries', sa.Column('schedule', MutableDict.as_mutable(PseudoJSON), nullable=False, server_default=json.dumps({})))

    # Move over values from old_schedule
    queries = table(
        'queries',
        sa.Column('id', sa.Integer, primary_key=True),
        sa.Column('schedule', MutableDict.as_mutable(PseudoJSON)),
        sa.Column('old_schedule', sa.String(length=10)))

    conn = op.get_bind()
    for query in conn.execute(queries.select()):
        schedule_json = _schedule_from_legacy(query.old_schedule)

        conn.execute(
            queries
            .update()
            .where(queries.c.id == query.id)
            .values(schedule=MutableDict(schedule_json)))

    op.drop_column('queries', 'old_schedule')


def downgrade():
    """Restore the string ``queries.schedule`` column from the JSON dict column."""
    # Copy the dict "schedule" column into a temporary "old_schedule" column
    op.add_column('queries', sa.Column('old_schedule', MutableDict.as_mutable(PseudoJSON), nullable=False, server_default=json.dumps({})))

    queries = table(
        'queries',
        sa.Column('schedule', MutableDict.as_mutable(PseudoJSON)),
        sa.Column('old_schedule', MutableDict.as_mutable(PseudoJSON)))

    op.execute(
        queries
        .update()
        .values({'old_schedule': queries.c.schedule}))

    # Recreate "schedule" column as the legacy string type
    op.drop_column('queries', 'schedule')
    op.add_column('queries', sa.Column('schedule', sa.String(length=10), nullable=True))

    queries = table(
        'queries',
        sa.Column('id', sa.Integer, primary_key=True),
        sa.Column('schedule', sa.String(length=10)),
        sa.Column('old_schedule', MutableDict.as_mutable(PseudoJSON)))

    conn = op.get_bind()
    for query in conn.execute(queries.select()):
        schedule_value = _legacy_from_schedule(query.old_schedule)

        conn.execute(
            queries
            .update()
            .where(queries.c.id == query.id)
            .values(schedule=schedule_value))

    op.drop_column('queries', 'old_schedule')
def should_schedule_next(previous_iteration, now, interval, time=None, day_of_week=None, failures=0):
    """Tell whether a scheduled query is due for another execution.

    :param previous_iteration: datetime of the previous execution.
    :param now: current datetime, compared against the computed next run.
    :param interval: seconds between executions (int or numeric string).
    :param time: optional "HH:MM" time of day the query should run at.
    :param day_of_week: optional weekday name as found in
        ``calendar.day_name`` (e.g. "Tuesday") the query should run on.
    :param failures: number of failed executions; a non-zero value delays
        the next run by ``2 ** failures`` minutes.
    :return: True when ``now`` is later than the next scheduled run.
    """
    # if time exists then interval > 23 hours (82800s)
    # if day_of_week exists then interval > 6 days (518400s)
    if time is None:
        ttl = int(interval)
        next_iteration = previous_iteration + datetime.timedelta(seconds=ttl)
    else:
        hour, minute = time.split(':')
        hour, minute = int(hour), int(minute)

        # The following logic is needed for cases like the following:
        # - The query scheduled to run at 23:59.
        # - The scheduler wakes up at 00:01.
        # - Using naive implementation of comparing timestamps, it will skip the execution.
        normalized_previous_iteration = previous_iteration.replace(hour=hour, minute=minute)

        if normalized_previous_iteration > previous_iteration:
            previous_iteration = normalized_previous_iteration - datetime.timedelta(days=1)

        # Whole days between runs; floor division keeps this an integer
        # regardless of the interpreter's division semantics.
        days_delay = int(interval) // (60 * 60 * 24)

        days_to_add = 0
        if day_of_week is not None:
            # Measure the weekday offset from the (possibly shifted) base
            # date that the delay is added to; using the unshifted date here
            # would land the next run one weekday early.
            # NOTE: calendar.day_name follows the current locale.
            target_weekday = list(calendar.day_name).index(day_of_week)
            days_to_add = target_weekday - previous_iteration.weekday()

        next_iteration = (
            previous_iteration + datetime.timedelta(days=days_delay + days_to_add)
        ).replace(hour=hour, minute=minute)

    if failures:
        # Exponential backoff after failed executions.
        next_iteration += datetime.timedelta(minutes=2**failures)
    return now > next_iteration
= {} for vis in self.visualizations: for w in vis.widgets: @@ -1020,7 +1032,7 @@ def by_user(cls, user): def outdated_queries(cls): queries = (db.session.query(Query) .options(joinedload(Query.latest_query_data).load_only('retrieved_at')) - .filter(Query.schedule != None) + .filter(Query.schedule != {}) .order_by(Query.id)) now = utils.utcnow() @@ -1028,6 +1040,13 @@ def outdated_queries(cls): scheduled_queries_executions.refresh() for query in queries: + schedule_until = pytz.utc.localize(datetime.datetime.strptime( + query.schedule['until'], '%Y-%m-%d')) if query.schedule['until'] else None + if (query.schedule['interval'] == None or ( + schedule_until != None and ( + schedule_until <= now))): + continue + if query.latest_query_data: retrieved_at = query.latest_query_data.retrieved_at else: @@ -1035,7 +1054,8 @@ def outdated_queries(cls): retrieved_at = scheduled_queries_executions.get(query.id) or retrieved_at - if should_schedule_next(retrieved_at, now, query.schedule, query.schedule_failures): + if should_schedule_next(retrieved_at, now, query.schedule['interval'], query.schedule['time'], + query.schedule['day_of_week'], query.schedule_failures): key = "{}:{}".format(query.query_hash, query.data_source_id) outdated_queries[key] = query diff --git a/tests/factories.py b/tests/factories.py index 0b56ac016d..73a1c21bd5 100644 --- a/tests/factories.py +++ b/tests/factories.py @@ -75,7 +75,7 @@ def __call__(self): user=user_factory.create, is_archived=False, is_draft=False, - schedule=None, + schedule={}, data_source=data_source_factory.create, org_id=1) @@ -86,7 +86,7 @@ def __call__(self): user=user_factory.create, is_archived=False, is_draft=False, - schedule=None, + schedule={}, data_source=data_source_factory.create, org_id=1) diff --git a/tests/handlers/test_queries.py b/tests/handlers/test_queries.py index 8e2352553e..d4219365e2 100644 --- a/tests/handlers/test_queries.py +++ b/tests/handlers/test_queries.py @@ -168,7 +168,7 @@ def test_create_query(self): 
query_data = { 'name': 'Testing', 'query': 'SELECT 1', - 'schedule': "3600", + 'schedule': {"interval": "3600"}, 'data_source_id': self.factory.data_source.id } diff --git a/tests/tasks/test_queries.py b/tests/tasks/test_queries.py index 501e0a49b2..17680a00a0 100644 --- a/tests/tasks/test_queries.py +++ b/tests/tasks/test_queries.py @@ -94,7 +94,7 @@ def test_success_scheduled(self): """ cm = mock.patch("celery.app.task.Context.delivery_info", {'routing_key': 'test'}) - q = self.factory.create_query(query_text="SELECT 1, 2", schedule=300) + q = self.factory.create_query(query_text="SELECT 1, 2", schedule={"interval": 300}) with cm, mock.patch.object(PostgreSQL, "run_query") as qr: qr.return_value = ([1, 2], None) result_id = execute_query( @@ -112,7 +112,7 @@ def test_failure_scheduled(self): """ cm = mock.patch("celery.app.task.Context.delivery_info", {'routing_key': 'test'}) - q = self.factory.create_query(query_text="SELECT 1, 2", schedule=300) + q = self.factory.create_query(query_text="SELECT 1, 2", schedule={"interval": 300}) with cm, mock.patch.object(PostgreSQL, "run_query") as qr: qr.side_effect = ValueError("broken") with self.assertRaises(QueryExecutionError): @@ -132,7 +132,7 @@ def test_success_after_failure(self): """ cm = mock.patch("celery.app.task.Context.delivery_info", {'routing_key': 'test'}) - q = self.factory.create_query(query_text="SELECT 1, 2", schedule=300) + q = self.factory.create_query(query_text="SELECT 1, 2", schedule={"interval": 300}) with cm, mock.patch.object(PostgreSQL, "run_query") as qr: qr.side_effect = ValueError("broken") with self.assertRaises(QueryExecutionError): diff --git a/tests/test_models.py b/tests/test_models.py index 5ccf6e4af0..314040694c 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -1,4 +1,5 @@ #encoding: utf8 +import calendar import datetime import json from unittest import TestCase @@ -32,58 +33,117 @@ class ShouldScheduleNextTest(TestCase): def 
test_interval_schedule_that_needs_reschedule(self): now = utcnow() two_hours_ago = now - datetime.timedelta(hours=2) - self.assertTrue(models.should_schedule_next(two_hours_ago, now, "3600", - 0)) + self.assertTrue(models.should_schedule_next(two_hours_ago, now, "3600")) def test_interval_schedule_that_doesnt_need_reschedule(self): now = utcnow() half_an_hour_ago = now - datetime.timedelta(minutes=30) - self.assertFalse(models.should_schedule_next(half_an_hour_ago, now, - "3600", 0)) + self.assertFalse(models.should_schedule_next(half_an_hour_ago, now, "3600")) def test_exact_time_that_needs_reschedule(self): now = utcnow() yesterday = now - datetime.timedelta(days=1) scheduled_datetime = now - datetime.timedelta(hours=3) scheduled_time = "{:02d}:00".format(scheduled_datetime.hour) - self.assertTrue(models.should_schedule_next(yesterday, now, - scheduled_time, 0)) + self.assertTrue(models.should_schedule_next(yesterday, now, "86400", + scheduled_time)) def test_exact_time_that_doesnt_need_reschedule(self): now = date_parse("2015-10-16 20:10") yesterday = date_parse("2015-10-15 23:07") schedule = "23:00" - self.assertFalse(models.should_schedule_next(yesterday, now, schedule, - 0)) + self.assertFalse(models.should_schedule_next(yesterday, now, "86400", schedule)) def test_exact_time_with_day_change(self): now = utcnow().replace(hour=0, minute=1) previous = (now - datetime.timedelta(days=2)).replace(hour=23, minute=59) schedule = "23:59".format(now.hour + 3) - self.assertTrue(models.should_schedule_next(previous, now, schedule, - 0)) + self.assertTrue(models.should_schedule_next(previous, now, "86400", schedule)) + + def test_exact_time_every_x_days_that_needs_reschedule(self): + now = utcnow() + four_days_ago = now - datetime.timedelta(days=4) + three_day_interval = "259200" + scheduled_datetime = now - datetime.timedelta(hours=3) + scheduled_time = "{:02d}:00".format(scheduled_datetime.hour) + self.assertTrue(models.should_schedule_next(four_days_ago, now, 
three_day_interval, + scheduled_time)) + + def test_exact_time_every_x_days_that_doesnt_need_reschedule(self): + now = utcnow() + four_days_ago = now - datetime.timedelta(days=2) + three_day_interval = "259200" + scheduled_datetime = now - datetime.timedelta(hours=3) + scheduled_time = "{:02d}:00".format(scheduled_datetime.hour) + self.assertFalse(models.should_schedule_next(four_days_ago, now, three_day_interval, + scheduled_time)) + + def test_exact_time_every_x_days_with_day_change(self): + now = utcnow().replace(hour=23, minute=59) + previous = (now - datetime.timedelta(days=2)).replace(hour=0, minute=1) + schedule = "23:58" + three_day_interval = "259200" + self.assertTrue(models.should_schedule_next(previous, now, three_day_interval, schedule)) + + def test_exact_time_every_x_weeks_that_needs_reschedule(self): + # Setup: + # + # 1) The query should run every 3 weeks on Tuesday + # 2) The last time it ran was 3 weeks ago from this week's Thursday + # 3) It is now Wednesday of this week + # + # Expectation: Even though less than 3 weeks have passed since the + # last run 3 weeks ago on Thursday, it's overdue since + # it should be running on Tuesdays. 
+ this_thursday = utcnow() + datetime.timedelta(days=list(calendar.day_name).index("Thursday") - utcnow().weekday()) + three_weeks_ago = this_thursday - datetime.timedelta(weeks=3) + now = this_thursday - datetime.timedelta(days=1) + three_week_interval = "1814400" + scheduled_datetime = now - datetime.timedelta(hours=3) + scheduled_time = "{:02d}:00".format(scheduled_datetime.hour) + self.assertTrue(models.should_schedule_next(three_weeks_ago, now, three_week_interval, + scheduled_time, "Tuesday")) + + def test_exact_time_every_x_weeks_that_doesnt_need_reschedule(self): + # Setup: + # + # 1) The query should run every 3 weeks on Thurday + # 2) The last time it ran was 3 weeks ago from this week's Tuesday + # 3) It is now Wednesday of this week + # + # Expectation: Even though more than 3 weeks have passed since the + # last run 3 weeks ago on Tuesday, it's not overdue since + # it should be running on Thursdays. + this_tuesday = utcnow() + datetime.timedelta(days=list(calendar.day_name).index("Tuesday") - utcnow().weekday()) + three_weeks_ago = this_tuesday - datetime.timedelta(weeks=3) + now = this_tuesday + datetime.timedelta(days=1) + three_week_interval = "1814400" + scheduled_datetime = now - datetime.timedelta(hours=3) + scheduled_time = "{:02d}:00".format(scheduled_datetime.hour) + self.assertFalse(models.should_schedule_next(three_weeks_ago, now, three_week_interval, + scheduled_time, "Thursday")) def test_backoff(self): now = utcnow() two_hours_ago = now - datetime.timedelta(hours=2) self.assertTrue(models.should_schedule_next(two_hours_ago, now, "3600", - 5)) + failures=5)) self.assertFalse(models.should_schedule_next(two_hours_ago, now, - "3600", 10)) + "3600", failures=10)) class QueryOutdatedQueriesTest(BaseTestCase): # TODO: this test can be refactored to use mock version of should_schedule_next to simplify it. 
def test_outdated_queries_skips_unscheduled_queries(self): - query = self.factory.create_query(schedule=None) + query = self.factory.create_query(schedule={'interval':None, 'time': None, 'until':None, 'day_of_week':None}) queries = models.Query.outdated_queries() self.assertNotIn(query, queries) def test_outdated_queries_works_with_ttl_based_schedule(self): two_hours_ago = utcnow() - datetime.timedelta(hours=2) - query = self.factory.create_query(schedule="3600") + query = self.factory.create_query(schedule={'interval':'3600', 'time': None, 'until':None, 'day_of_week':None}) query_result = self.factory.create_query_result(query=query.query_text, retrieved_at=two_hours_ago) query.latest_query_data = query_result @@ -92,7 +152,7 @@ def test_outdated_queries_works_with_ttl_based_schedule(self): def test_outdated_queries_works_scheduled_queries_tracker(self): two_hours_ago = datetime.datetime.now() - datetime.timedelta(hours=2) - query = self.factory.create_query(schedule="3600") + query = self.factory.create_query(schedule={'interval':'3600', 'time': None, 'until':None, 'day_of_week':None}) query_result = self.factory.create_query_result(query=query, retrieved_at=two_hours_ago) query.latest_query_data = query_result @@ -103,7 +163,7 @@ def test_outdated_queries_works_scheduled_queries_tracker(self): def test_skips_fresh_queries(self): half_an_hour_ago = utcnow() - datetime.timedelta(minutes=30) - query = self.factory.create_query(schedule="3600") + query = self.factory.create_query(schedule={'interval':'3600', 'time': None, 'until':None, 'day_of_week':None}) query_result = self.factory.create_query_result(query=query.query_text, retrieved_at=half_an_hour_ago) query.latest_query_data = query_result @@ -112,7 +172,7 @@ def test_skips_fresh_queries(self): def test_outdated_queries_works_with_specific_time_schedule(self): half_an_hour_ago = utcnow() - datetime.timedelta(minutes=30) - query = self.factory.create_query(schedule=half_an_hour_ago.strftime('%H:%M')) + query = 
self.factory.create_query(schedule={'interval':'86400', 'time':half_an_hour_ago.strftime('%H:%M'), 'until':None, 'day_of_week':None}) query_result = self.factory.create_query_result(query=query.query_text, retrieved_at=half_an_hour_ago - datetime.timedelta(days=1)) query.latest_query_data = query_result @@ -124,9 +184,9 @@ def test_enqueues_query_only_once(self): Only one query per data source with the same text will be reported by Query.outdated_queries(). """ - query = self.factory.create_query(schedule="60") + query = self.factory.create_query(schedule={'interval':'60', 'until':None, 'time': None, 'day_of_week':None}) query2 = self.factory.create_query( - schedule="60", query_text=query.query_text, + schedule={'interval':'60', 'until':None, 'time': None, 'day_of_week':None}, query_text=query.query_text, query_hash=query.query_hash) retrieved_at = utcnow() - datetime.timedelta(minutes=10) query_result = self.factory.create_query_result( @@ -143,9 +203,9 @@ def test_enqueues_query_with_correct_data_source(self): Query.outdated_queries() even if they have the same query text. """ query = self.factory.create_query( - schedule="60", data_source=self.factory.create_data_source()) + schedule={'interval':'60', 'until':None, 'time': None, 'day_of_week':None}, data_source=self.factory.create_data_source()) query2 = self.factory.create_query( - schedule="60", query_text=query.query_text, + schedule={'interval':'60', 'until':None, 'time': None, 'day_of_week':None}, query_text=query.query_text, query_hash=query.query_hash) retrieved_at = utcnow() - datetime.timedelta(minutes=10) query_result = self.factory.create_query_result( @@ -162,9 +222,9 @@ def test_enqueues_only_for_relevant_data_source(self): If multiple queries with the same text exist, only ones that are scheduled to be refreshed are reported by Query.outdated_queries(). 
""" - query = self.factory.create_query(schedule="60") + query = self.factory.create_query(schedule={'interval':'60', 'until':None, 'time': None, 'day_of_week':None}) query2 = self.factory.create_query( - schedule="3600", query_text=query.query_text, + schedule={'interval':'3600', 'until':None, 'time': None, 'day_of_week':None}, query_text=query.query_text, query_hash=query.query_hash) retrieved_at = utcnow() - datetime.timedelta(minutes=10) query_result = self.factory.create_query_result( @@ -180,7 +240,7 @@ def test_failure_extends_schedule(self): Execution failures recorded for a query result in exponential backoff for scheduling future execution. """ - query = self.factory.create_query(schedule="60", schedule_failures=4) + query = self.factory.create_query(schedule={'interval':'60', 'until':None, 'time': None, 'day_of_week':None}, schedule_failures=4) retrieved_at = utcnow() - datetime.timedelta(minutes=16) query_result = self.factory.create_query_result( retrieved_at=retrieved_at, query_text=query.query_text, @@ -192,6 +252,34 @@ def test_failure_extends_schedule(self): query_result.retrieved_at = utcnow() - datetime.timedelta(minutes=17) self.assertEqual(list(models.Query.outdated_queries()), [query]) + def test_schedule_until_after(self): + """ + Queries with non-null ``schedule['until']`` are not reported by + Query.outdated_queries() after the given time is past. 
+ """ + one_day_ago = (utcnow() - datetime.timedelta(days=1)).strftime("%Y-%m-%d") + two_hours_ago = utcnow() - datetime.timedelta(hours=2) + query = self.factory.create_query(schedule={'interval':'3600', 'until':one_day_ago, 'time':None, 'day_of_week':None}) + query_result = self.factory.create_query_result(query=query.query_text, retrieved_at=two_hours_ago) + query.latest_query_data = query_result + + queries = models.Query.outdated_queries() + self.assertNotIn(query, queries) + + def test_schedule_until_before(self): + """ + Queries with non-null ``schedule['until']`` are reported by + Query.outdated_queries() before the given time is past. + """ + one_day_from_now = (utcnow() + datetime.timedelta(days=1)).strftime("%Y-%m-%d") + two_hours_ago = utcnow() - datetime.timedelta(hours=2) + query = self.factory.create_query(schedule={'interval':'3600', 'until':one_day_from_now, 'time': None, 'day_of_week':None}) + query_result = self.factory.create_query_result(query=query.query_text, retrieved_at=two_hours_ago) + query.latest_query_data = query_result + + queries = models.Query.outdated_queries() + self.assertIn(query, queries) + class QueryArchiveTest(BaseTestCase): def setUp(self): @@ -205,7 +293,7 @@ def test_archive_query_sets_flag(self): self.assertEquals(query.is_archived, True) def test_archived_query_doesnt_return_in_all(self): - query = self.factory.create_query(schedule="1") + query = self.factory.create_query(schedule={'interval':'1', 'until':None, 'time': None, 'day_of_week':None}) yesterday = utcnow() - datetime.timedelta(days=1) query_result, _ = models.QueryResult.store_result( query.org_id, query.data_source, query.query_hash, query.query_text, @@ -230,11 +318,11 @@ def test_removes_associated_widgets_from_dashboards(self): self.assertEqual(db.session.query(models.Widget).get(widget.id), None) def test_removes_scheduling(self): - query = self.factory.create_query(schedule="1") + query = self.factory.create_query(schedule={'interval':'1', 'until':None, 
'time': None, 'day_of_week':None}) query.archive() - self.assertEqual(None, query.schedule) + self.assertEqual({}, query.schedule) def test_deletes_alerts(self): subscription = self.factory.create_alert_subscription()