Skip to content

Commit

Permalink
Closes #187: Add finer-grained scheduling - backend.
Browse files Browse the repository at this point in the history
  • Loading branch information
Marina Samuel authored and ranbena committed Jan 1, 2019
1 parent 63f38b7 commit 4906068
Show file tree
Hide file tree
Showing 6 changed files with 257 additions and 42 deletions.
107 changes: 107 additions & 0 deletions migrations/versions/640888ce445d_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
"""
Add new scheduling data.
Revision ID: 640888ce445d
Revises: 71477dadd6ef
Create Date: 2018-09-21 19:35:58.578796
"""

import json
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table

from redash.models import MutableDict, PseudoJSON


# revision identifiers, used by Alembic.
revision = '640888ce445d'        # this migration's id
down_revision = '71477dadd6ef'   # migration this one applies on top of
branch_labels = None
depends_on = None


def upgrade():
    """Migrate queries.schedule from a legacy string to a JSON dict.

    The legacy "schedule" column held either an interval in seconds
    (e.g. "3600") or a daily time of day ("HH:MM").  The new column is a
    JSON-serialized dict with keys: interval, until, day_of_week, time.
    """
    # Preserve the legacy values in a temporary "old_schedule" column.
    op.add_column('queries', sa.Column('old_schedule', sa.String(length=10), nullable=True))

    queries = table(
        'queries',
        sa.Column('schedule', sa.String(length=10)),
        sa.Column('old_schedule', sa.String(length=10)))

    op.execute(
        queries
        .update()
        .values({'old_schedule': queries.c.schedule}))

    # Recreate "schedule" column as a dict type.
    op.drop_column('queries', 'schedule')
    op.add_column('queries', sa.Column('schedule', MutableDict.as_mutable(PseudoJSON), nullable=False, server_default=json.dumps({})))

    # Translate each legacy value into the new dict shape.
    queries = table(
        'queries',
        sa.Column('id', sa.Integer, primary_key=True),
        sa.Column('schedule', MutableDict.as_mutable(PseudoJSON)),
        sa.Column('old_schedule', sa.String(length=10)))

    conn = op.get_bind()
    for query in conn.execute(queries.select()):
        schedule_json = {
            'interval': None,
            'until': None,
            'day_of_week': None,
            'time': None
        }

        # Truthiness (not "is not None") so an empty legacy string is
        # treated as "no schedule" instead of crashing int('') below.
        if query.old_schedule:
            if ":" in query.old_schedule:
                # "HH:MM" meant "daily at this time" -> 24h interval.
                schedule_json['interval'] = 86400
                schedule_json['time'] = query.old_schedule
            else:
                # Store the interval as an int, consistent with the 86400
                # written by the time-of-day branch above; consumers
                # (should_schedule_next, downgrade's comparison) expect a
                # numeric interval.
                schedule_json['interval'] = int(query.old_schedule)

        conn.execute(
            queries
            .update()
            .where(queries.c.id == query.id)
            .values(schedule=MutableDict(schedule_json)))

    op.drop_column('queries', 'old_schedule')

def downgrade():
    """Revert queries.schedule from the JSON dict back to the legacy string.

    Legacy format: "HH:MM" for daily time-of-day schedules, otherwise the
    interval in seconds.
    """
    # Stash the dict values in a temporary column.
    op.add_column('queries', sa.Column('old_schedule', MutableDict.as_mutable(PseudoJSON), nullable=False, server_default=json.dumps({})))

    queries = table(
        'queries',
        sa.Column('schedule', MutableDict.as_mutable(PseudoJSON)),
        sa.Column('old_schedule', MutableDict.as_mutable(PseudoJSON)))

    op.execute(
        queries
        .update()
        .values({'old_schedule': queries.c.schedule}))

    # Recreate "schedule" as the legacy string column.
    op.drop_column('queries', 'schedule')
    op.add_column('queries', sa.Column('schedule', sa.String(length=10), nullable=True))

    queries = table(
        'queries',
        sa.Column('id', sa.Integer, primary_key=True),
        sa.Column('schedule', sa.String(length=10)),
        sa.Column('old_schedule', MutableDict.as_mutable(PseudoJSON)))

    conn = op.get_bind()
    for query in conn.execute(queries.select()):
        # Only fall back to the time-of-day value when one was actually
        # recorded.  Comparing the interval against 86400 misfires for
        # sub-daily intervals (e.g. 3600) that carry no "time" value and
        # raises a TypeError on Python 3 when interval is a string.
        schedule_value = query.old_schedule.get('interval')
        if query.old_schedule.get('time'):
            schedule_value = query.old_schedule['time']

        if schedule_value is not None:
            # The legacy column is a string; normalize numeric intervals.
            schedule_value = str(schedule_value)

        conn.execute(
            queries
            .update()
            .where(queries.c.id == query.id)
            .values(schedule=schedule_value))

    op.drop_column('queries', 'old_schedule')
38 changes: 29 additions & 9 deletions redash/models.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import cStringIO
import csv
import datetime
import calendar
import functools
import hashlib
import itertools
import logging
import time
import pytz
from functools import reduce

from six import python_2_unicode_compatible, string_types, text_type
Expand Down Expand Up @@ -857,23 +859,33 @@ def make_excel_content(self):
return s.getvalue()


def should_schedule_next(previous_iteration, now, schedule, failures):
if schedule.isdigit():
ttl = int(schedule)
def should_schedule_next(previous_iteration, now, interval, time=None, day_of_week=None, failures=0):
    """Decide whether a scheduled query is due for another execution.

    :param previous_iteration: datetime of the last execution.
    :param interval: schedule interval in seconds (int or numeric string).
    :param time: optional "HH:MM" string; when set, interval is expected
        to be > 23 hours (82800s).
    :param day_of_week: optional weekday name (e.g. "Monday"); when set,
        interval is expected to be > 6 days (518400s).
    :param failures: consecutive failure count; adds exponential back-off.
    :returns: True when `now` is past the computed next iteration.
    """
    if time is None:
        # Simple interval schedule: next run is a fixed delay after the last.
        next_iteration = previous_iteration + datetime.timedelta(seconds=int(interval))
    else:
        hour, minute = time.split(':')
        hour, minute = int(hour), int(minute)

        # The following logic is needed for cases like the following:
        # - The query scheduled to run at 23:59.
        # - The scheduler wakes up at 00:01.
        # - Using naive implementation of comparing timestamps, it will skip the execution.
        normalized_previous_iteration = previous_iteration.replace(hour=hour, minute=minute)

        if normalized_previous_iteration > previous_iteration:
            previous_iteration = normalized_previous_iteration - datetime.timedelta(days=1)

        # Whole days between runs.  Floor division keeps this an int on
        # both Python 2 and 3; plain "/" becomes a float on Python 3 and
        # the fractional day can shift the computed date.
        days_delay = int(interval) // (60 * 60 * 24)

        days_to_add = 0
        if day_of_week is not None:
            # Offset from the previous run's weekday to the requested one.
            days_to_add = list(calendar.day_name).index(day_of_week) - normalized_previous_iteration.weekday()

        next_iteration = (previous_iteration + datetime.timedelta(days=days_delay) +
                          datetime.timedelta(days=days_to_add)).replace(hour=hour, minute=minute)
    if failures:
        # Exponential back-off after consecutive failures: 2**failures minutes.
        next_iteration += datetime.timedelta(minutes=2**failures)
    return now > next_iteration
Expand Down Expand Up @@ -901,7 +913,7 @@ class Query(ChangeTrackingMixin, TimestampMixin, BelongsToOrgMixin, db.Model):
foreign_keys=[last_modified_by_id])
is_archived = Column(db.Boolean, default=False, index=True)
is_draft = Column(db.Boolean, default=True, index=True)
schedule = Column(db.String(10), nullable=True)
schedule = Column(MutableDict.as_mutable(PseudoJSON), nullable=True)
schedule_failures = Column(db.Integer, default=0)
visualizations = db.relationship("Visualization", cascade="all, delete-orphan")
options = Column(MutableDict.as_mutable(PseudoJSON), default={})
Expand All @@ -923,7 +935,7 @@ class Query(ChangeTrackingMixin, TimestampMixin, BelongsToOrgMixin, db.Model):
def archive(self, user=None):
db.session.add(self)
self.is_archived = True
self.schedule = None
self.schedule = {}

for vis in self.visualizations:
for w in vis.widgets:
Expand Down Expand Up @@ -1026,22 +1038,30 @@ def by_user(cls, user):
def outdated_queries(cls):
queries = (db.session.query(Query)
.options(joinedload(Query.latest_query_data).load_only('retrieved_at'))
.filter(Query.schedule != None)
.filter(Query.schedule != {})
.order_by(Query.id))

now = utils.utcnow()
outdated_queries = {}
scheduled_queries_executions.refresh()

for query in queries:
schedule_until = pytz.utc.localize(datetime.datetime.strptime(
query.schedule['until'], '%Y-%m-%d')) if query.schedule['until'] else None
if (query.schedule['interval'] == None or (
schedule_until != None and (
schedule_until <= now))):
continue

if query.latest_query_data:
retrieved_at = query.latest_query_data.retrieved_at
else:
retrieved_at = now

retrieved_at = scheduled_queries_executions.get(query.id) or retrieved_at

if should_schedule_next(retrieved_at, now, query.schedule, query.schedule_failures):
if should_schedule_next(retrieved_at, now, query.schedule['interval'], query.schedule['time'],
query.schedule['day_of_week'], query.schedule_failures):
key = "{}:{}".format(query.query_hash, query.data_source_id)
outdated_queries[key] = query

Expand Down
4 changes: 2 additions & 2 deletions tests/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def __call__(self):
user=user_factory.create,
is_archived=False,
is_draft=False,
schedule=None,
schedule={},
data_source=data_source_factory.create,
org_id=1)

Expand All @@ -86,7 +86,7 @@ def __call__(self):
user=user_factory.create,
is_archived=False,
is_draft=False,
schedule=None,
schedule={},
data_source=data_source_factory.create,
org_id=1)

Expand Down
2 changes: 1 addition & 1 deletion tests/handlers/test_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def test_create_query(self):
query_data = {
'name': 'Testing',
'query': 'SELECT 1',
'schedule': "3600",
'schedule': {"interval": "3600"},
'data_source_id': self.factory.data_source.id
}

Expand Down
6 changes: 3 additions & 3 deletions tests/tasks/test_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def test_success_scheduled(self):
"""
cm = mock.patch("celery.app.task.Context.delivery_info",
{'routing_key': 'test'})
q = self.factory.create_query(query_text="SELECT 1, 2", schedule=300)
q = self.factory.create_query(query_text="SELECT 1, 2", schedule={"interval": 300})
with cm, mock.patch.object(PostgreSQL, "run_query") as qr:
qr.return_value = ([1, 2], None)
result_id = execute_query(
Expand All @@ -112,7 +112,7 @@ def test_failure_scheduled(self):
"""
cm = mock.patch("celery.app.task.Context.delivery_info",
{'routing_key': 'test'})
q = self.factory.create_query(query_text="SELECT 1, 2", schedule=300)
q = self.factory.create_query(query_text="SELECT 1, 2", schedule={"interval": 300})
with cm, mock.patch.object(PostgreSQL, "run_query") as qr:
qr.side_effect = ValueError("broken")
with self.assertRaises(QueryExecutionError):
Expand All @@ -132,7 +132,7 @@ def test_success_after_failure(self):
"""
cm = mock.patch("celery.app.task.Context.delivery_info",
{'routing_key': 'test'})
q = self.factory.create_query(query_text="SELECT 1, 2", schedule=300)
q = self.factory.create_query(query_text="SELECT 1, 2", schedule={"interval": 300})
with cm, mock.patch.object(PostgreSQL, "run_query") as qr:
qr.side_effect = ValueError("broken")
with self.assertRaises(QueryExecutionError):
Expand Down
Loading

0 comments on commit 4906068

Please sign in to comment.