diff --git a/client/app/components/queries/schedule-dialog.html b/client/app/components/queries/schedule-dialog.html
index f9344238a1..b6f8030073 100644
--- a/client/app/components/queries/schedule-dialog.html
+++ b/client/app/components/queries/schedule-dialog.html
@@ -19,4 +19,7 @@
Refresh Schedule
Stop scheduling at date/time (format yyyy-MM-ddTHH:mm:ss, like 2016-12-28T14:57:00):
+
diff --git a/client/app/components/queries/schedule-dialog.js b/client/app/components/queries/schedule-dialog.js
index fdfa2dd259..667e211008 100644
--- a/client/app/components/queries/schedule-dialog.js
+++ b/client/app/components/queries/schedule-dialog.js
@@ -135,11 +135,22 @@ function scheduleUntil() {
};
}
+function scheduleKeepResults() {
+ return {
+ restrict: 'E',
+ scope: {
+ query: '=',
+ saveQuery: '=',
+ },
+ template: '',
+ };
+}
+
const ScheduleForm = {
controller() {
this.query = this.resolve.query;
this.saveQuery = this.resolve.saveQuery;
-
+ this.isIncremental = false;
if (this.query.hasDailySchedule()) {
this.refreshType = 'daily';
} else {
@@ -158,5 +169,6 @@ export default function init(ngModule) {
ngModule.directive('queryTimePicker', queryTimePicker);
ngModule.directive('queryRefreshSelect', queryRefreshSelect);
ngModule.directive('scheduleUntil', scheduleUntil);
+ ngModule.directive('scheduleKeepResults', scheduleKeepResults);
ngModule.component('scheduleDialog', ScheduleForm);
}
diff --git a/client/app/pages/queries/view.js b/client/app/pages/queries/view.js
index cfdee26b27..b3cfc1b972 100644
--- a/client/app/pages/queries/view.js
+++ b/client/app/pages/queries/view.js
@@ -184,6 +184,7 @@ function QueryViewCtrl(
} else {
request = pick($scope.query, [
'schedule',
+ 'schedule_keep_results',
'query',
'id',
'description',
diff --git a/client/app/services/query-result.js b/client/app/services/query-result.js
index 1625921671..9f15277e56 100644
--- a/client/app/services/query-result.js
+++ b/client/app/services/query-result.js
@@ -54,6 +54,7 @@ function addPointToSeries(point, seriesCollection, seriesName) {
function QueryResultService($resource, $timeout, $q) {
const QueryResultResource = $resource('api/query_results/:id', { id: '@id' }, { post: { method: 'POST' } });
+ const QueryAggregateResultResource = $resource('api/queries/:id/aggregate_results', { id: '@id' });
const Job = $resource('api/jobs/:id', { id: '@id' });
const statuses = {
1: 'waiting',
@@ -421,6 +422,15 @@ function QueryResultService($resource, $timeout, $q) {
return queryResult;
}
+ static getAggregate(queryId) {
+ const queryResult = new QueryResult();
+
+ QueryAggregateResultResource.get({ id: queryId }, (response) => {
+ queryResult.update(response);
+ });
+
+ return queryResult;
+ }
loadResult(tryCount) {
QueryResultResource.get(
{ id: this.job.query_result_id },
diff --git a/client/app/services/query.js b/client/app/services/query.js
index 935b8cba1b..79d70d45b2 100644
--- a/client/app/services/query.js
+++ b/client/app/services/query.js
@@ -298,7 +298,11 @@ function QueryResource($resource, $http, $q, $location, currentUser, QueryResult
this.latest_query_data_id = null;
}
- if (this.latest_query_data && maxAge !== 0) {
+ if (this.schedule_keep_results) {
+ if (!this.queryResult) {
+ this.queryResult = QueryResult.getAggregate(this.id);
+ }
+ } else if (this.latest_query_data && maxAge !== 0) {
if (!this.queryResult) {
this.queryResult = new QueryResult({
query_result: this.latest_query_data,
diff --git a/migrations/versions/2a2b3b58464e_.py b/migrations/versions/2a2b3b58464e_.py
new file mode 100644
index 0000000000..f0cee7cdbd
--- /dev/null
+++ b/migrations/versions/2a2b3b58464e_.py
@@ -0,0 +1,32 @@
+"""incremental query results aggregation
+
+Revision ID: 2a2b3b58464e
+Revises: 15041b7085fe
+Create Date: 2018-02-16 19:28:38.931253
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = '2a2b3b58464e'
+down_revision = '15041b7085fe'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+ op.create_table('query_resultsets',
+ sa.Column('query_id', sa.Integer(), nullable=False),
+ sa.Column('result_id', sa.Integer(), nullable=False),
+ sa.ForeignKeyConstraint(['query_id'], ['queries.id'], ),
+ sa.ForeignKeyConstraint(['result_id'], ['query_results.id'], ),
+ sa.PrimaryKeyConstraint('query_id', 'result_id')
+ )
+ op.add_column(u'queries', sa.Column('schedule_keep_results', sa.Integer(), nullable=True))
+
+
+def downgrade():
+ op.drop_column(u'queries', 'schedule_keep_results')
+ op.drop_table('query_resultsets')
diff --git a/redash/handlers/api.py b/redash/handlers/api.py
index 9c2dd83361..1d8f054c0e 100644
--- a/redash/handlers/api.py
+++ b/redash/handlers/api.py
@@ -10,7 +10,7 @@
from redash.handlers.data_sources import DataSourceTypeListResource, DataSourceListResource, DataSourceSchemaResource, DataSourceResource, DataSourcePauseResource, DataSourceTestResource, DataSourceVersionResource
from redash.handlers.events import EventsResource
from redash.handlers.queries import QueryForkResource, QueryRefreshResource, QueryListResource, QueryRecentResource, QuerySearchResource, QueryResource, MyQueriesResource, QueryVersionListResource, ChangeResource
-from redash.handlers.query_results import QueryResultListResource, QueryResultResource, JobResource
+from redash.handlers.query_results import QueryResultListResource, QueryResultResource, JobResource, QueryResultSetResource
from redash.handlers.users import UserResource, UserListResource, UserInviteResource, UserResetPasswordResource
from redash.handlers.visualizations import VisualizationListResource
from redash.handlers.visualizations import VisualizationResource
@@ -76,6 +76,7 @@ def json_representation(data, code, headers=None):
api.add_org_resource(QueryRefreshResource, '/api/queries/<query_id>/refresh', endpoint='query_refresh')
api.add_org_resource(QueryResource, '/api/queries/<query_id>', endpoint='query')
api.add_org_resource(QueryForkResource, '/api/queries/<query_id>/fork', endpoint='query_fork')
+api.add_org_resource(QueryResultSetResource, '/api/queries/<query_id>/aggregate_results', endpoint='query_aggregate_results')
api.add_org_resource(QueryVersionListResource, '/api/queries/<query_id>/version', endpoint='query_versions')
api.add_org_resource(ChangeResource, '/api/changes/<change_id>', endpoint='changes')
diff --git a/redash/handlers/query_results.py b/redash/handlers/query_results.py
index 9c2e1f33d9..545365bab9 100644
--- a/redash/handlers/query_results.py
+++ b/redash/handlers/query_results.py
@@ -132,6 +132,31 @@ def post(self):
ONE_YEAR = 60 * 60 * 24 * 365.25
+class QueryResultSetResource(BaseResource):
+ @require_permission('view_query')
+ def get(self, query_id=None, filetype='json'):
+ query = get_object_or_404(models.Query.get_by_id_and_org, query_id, self.current_org)
+ if not query.schedule_keep_results:
+ abort(404, message="query does not keep multiple results")
+
+ # Synthesize a result set from the last N results.
+ total = len(query.query_results)
+ offset = max(total - query.schedule_keep_results, 0)
+ results = [qr.to_dict() for qr in query.query_results[offset:offset + total]]
+ if not results:
+ aggregate_result = {}
+ else:
+ aggregate_result = results[0].copy()
+ aggregate_result['data'] = {'columns': results[0]['data']['columns'],
+ 'rows': []}
+ for r in results:
+ aggregate_result['data']['rows'].extend(r['data']['rows'])
+
+ data = json.dumps({'query_result': aggregate_result}, cls=utils.JSONEncoder)
+ headers = {'Content-Type': "application/json"}
+ return make_response(data, 200, headers)
+
+
class QueryResultResource(BaseResource):
@staticmethod
def add_cors_headers(headers):
@@ -194,7 +219,7 @@ def get(self, query_id=None, query_result_id=None, filetype='json'):
query_result = run_query_sync(query.data_source, parameter_values, query.to_dict()['query'], max_age=max_age)
elif query.latest_query_data_id is not None:
query_result = get_object_or_404(models.QueryResult.get_by_id_and_org, query.latest_query_data_id, self.current_org)
-
+
if query is not None and query_result is not None and self.current_user.is_api_user():
if query.query_hash != query_result.query_hash:
abort(404, message='No cached result found for this query.')
diff --git a/redash/models.py b/redash/models.py
index 34bf11ba75..77ed970b5e 100644
--- a/redash/models.py
+++ b/redash/models.py
@@ -25,7 +25,7 @@
from redash.utils.comparators import CaseInsensitiveComparator
from redash.utils.configuration import ConfigurationContainer
from redash.settings.organization import settings as org_settings
-from sqlalchemy import distinct, or_
+from sqlalchemy import distinct, exists, or_
from sqlalchemy.dialects import postgresql
from sqlalchemy.event import listens_for
from sqlalchemy.ext.mutable import Mutable
@@ -728,9 +728,9 @@ def to_dict(self):
def unused(cls, days=7):
age_threshold = datetime.datetime.now() - datetime.timedelta(days=days)
- unused_results = (db.session.query(QueryResult.id).filter(
- Query.id == None, QueryResult.retrieved_at < age_threshold)
- .outerjoin(Query))
+ unused_results = db.session.query(QueryResult.id).filter(
+ QueryResult.retrieved_at < age_threshold,
+ ~QueryResultSet.query.filter(QueryResultSet.result_id == QueryResult.id).exists())
return unused_results
@@ -769,9 +769,13 @@ def store_result(cls, org, data_source, query_hash, query, data, run_time, retri
queries = db.session.query(Query).filter(
Query.query_hash == query_hash,
Query.data_source == data_source)
+
for q in queries:
q.latest_query_data = query_result
db.session.add(q)
+            if q.schedule_keep_results:
+                q.query_results.append(query_result)
query_ids = [q.id for q in queries]
logging.info("Updated %s queries with result (%s).", len(query_ids), query_hash)
@@ -851,6 +855,7 @@ class Query(ChangeTrackingMixin, TimestampMixin, BelongsToOrgMixin, db.Model):
data_source = db.relationship(DataSource, backref='queries')
latest_query_data_id = Column(db.Integer, db.ForeignKey("query_results.id"), nullable=True)
latest_query_data = db.relationship(QueryResult)
+ query_results = db.relationship("QueryResult", secondary="query_resultsets")
name = Column(db.String(255))
description = Column(db.String(4096), nullable=True)
query_text = Column("query", db.Text)
@@ -866,6 +871,7 @@ class Query(ChangeTrackingMixin, TimestampMixin, BelongsToOrgMixin, db.Model):
schedule = Column(db.String(10), nullable=True)
schedule_failures = Column(db.Integer, default=0)
schedule_until = Column(db.DateTime(True), nullable=True)
+ schedule_keep_results = Column(db.Integer, nullable=True)
visualizations = db.relationship("Visualization", cascade="all, delete-orphan")
options = Column(MutableDict.as_mutable(PseudoJSON), default={})
search_vector = Column(TSVectorType('id', 'name', 'description', 'query',
@@ -892,6 +898,7 @@ def to_dict(self, with_stats=False, with_visualizations=False, with_user=True, w
'query_hash': self.query_hash,
'schedule': self.schedule,
'schedule_until': self.schedule_until,
+ 'schedule_keep_results': self.schedule_keep_results,
'api_key': self.api_key,
'is_archived': self.is_archived,
'is_draft': self.is_draft,
@@ -1000,6 +1007,29 @@ def outdated_queries(cls):
return outdated_queries.values()
+ @classmethod
+ def delete_stale_resultsets(cls):
+ delete_count = 0
+ queries = Query.query.filter(Query.schedule_keep_results != None).order_by(Query.schedule_keep_results.desc())
+ if queries.first() and queries[0].schedule_keep_results:
+ resultsets = QueryResultSet.query.filter(QueryResultSet.query_rel == queries[0]).order_by(QueryResultSet.result_id)
+ c = resultsets.count()
+ if c > queries[0].schedule_keep_results:
+ n_to_delete = c - queries[0].schedule_keep_results
+ r_ids = [r.result_id for r in resultsets][:n_to_delete]
+                delete_count = QueryResultSet.query.filter(QueryResultSet.result_id.in_(r_ids)).delete(synchronize_session=False)
+                QueryResult.query.filter(QueryResult.id.in_(r_ids)).delete(synchronize_session=False)
+ for q in queries[1:]:
+ resultsets = db.session.query(QueryResultSet.result_id).filter(QueryResultSet.query_rel == q).order_by(QueryResultSet.result_id)
+ n_to_delete = resultsets.count() - q.schedule_keep_results
+ if n_to_delete > 0:
+ stale_r = QueryResultSet.query.filter(QueryResultSet.result_id.in_(resultsets.limit(n_to_delete).subquery()))
+                n = stale_r.delete(synchronize_session=False)
+                delete_count += n
+ return delete_count
+
@classmethod
def search(cls, term, group_ids, include_drafts=False, limit=20):
where = cls.is_archived == False
@@ -1089,6 +1119,16 @@ def __repr__(self):
return '<Query %s: "%s">' % (self.id, self.name or 'untitled')
+class QueryResultSet(db.Model):
+ query_id = Column(db.Integer, db.ForeignKey("queries.id"),
+ primary_key=True)
+ query_rel = db.relationship(Query)
+ result_id = Column(db.Integer, db.ForeignKey("query_results.id"),
+ primary_key=True)
+ result = db.relationship(QueryResult)
+ __tablename__ = 'query_resultsets'
+
+
@vectorizer(db.Integer)
def integer_vectorizer(column):
return db.func.cast(column, db.Text)
diff --git a/redash/tasks/queries.py b/redash/tasks/queries.py
index c2c4322fc3..391a9b2fae 100644
--- a/redash/tasks/queries.py
+++ b/redash/tasks/queries.py
@@ -355,6 +355,7 @@ def cleanup_query_results():
deleted_count = models.QueryResult.query.filter(
models.QueryResult.id.in_(unused_query_results.subquery())
).delete(synchronize_session=False)
+ deleted_count += models.Query.delete_stale_resultsets()
models.db.session.commit()
logger.info("Deleted %d unused query results.", deleted_count)
diff --git a/tests/factories.py b/tests/factories.py
index 6d6f77e628..a051719d26 100644
--- a/tests/factories.py
+++ b/tests/factories.py
@@ -109,7 +109,9 @@ def __call__(self):
query_hash=gen_query_hash('SELECT 1'),
data_source=data_source_factory.create,
org_id=1)
-
+query_resultset_factory = ModelFactory(redash.models.QueryResultSet,
+ query_rel=query_factory.create,
+ result=query_result_factory.create)
visualization_factory = ModelFactory(redash.models.Visualization,
type='CHART',
query_rel=query_factory.create,
@@ -295,6 +297,9 @@ def create_query_result(self, **kwargs):
return query_result_factory.create(**args)
+ def create_query_resultset(self, **kwargs):
+ return query_resultset_factory.create(**kwargs)
+
def create_visualization(self, **kwargs):
args = {
'query_rel': self.create_query()
diff --git a/tests/handlers/test_queries.py b/tests/handlers/test_queries.py
index 4027267c21..22d6b6c42c 100644
--- a/tests/handlers/test_queries.py
+++ b/tests/handlers/test_queries.py
@@ -1,3 +1,5 @@
+import json
+
from tests import BaseTestCase
from redash import models
from redash.models import db
@@ -226,3 +228,80 @@ def test_get(self):
rv2 = self.make_request('get', '/api/changes/' + str(ch2.id))
self.assertEqual(rv2.status_code, 200)
self.assertEqual(rv2.json['change']['name']['current'], 'version B')
+
+
+class AggregateResultsTests(BaseTestCase):
+ def test_aggregate(self):
+ qtxt = "SELECT x FROM mytable;"
+ q = self.factory.create_query(query_text=qtxt, schedule_keep_results=3)
+ qr0 = self.factory.create_query_result(
+ query_text=qtxt,
+ data = json.dumps({'columns': ['name', 'color'],
+ 'rows': [{'name': 'eve', 'color': 'grue'},
+ {'name': 'mallory', 'color': 'bleen'}]}))
+ qr1 = self.factory.create_query_result(
+ query_text=qtxt,
+ data = json.dumps({'columns': ['name', 'color'],
+ 'rows': [{'name': 'bob', 'color': 'green'},
+ {'name': 'fred', 'color': 'blue'}]}))
+ qr2 = self.factory.create_query_result(
+ query_text=qtxt,
+ data = json.dumps({'columns': ['name', 'color'],
+ 'rows': [{'name': 'alice', 'color': 'red'},
+ {'name': 'eddie', 'color': 'orange'}]}))
+ qr3 = self.factory.create_query_result(
+ query_text=qtxt,
+ data = json.dumps({'columns': ['name', 'color'],
+ 'rows': [{'name': 'dave', 'color': 'yellow'},
+ {'name': 'carol', 'color': 'taupe'}]}))
+ for qr in (qr0, qr1, qr2, qr3):
+ self.factory.create_query_resultset(query_rel=q, result=qr)
+ rv = self.make_request('get', '/api/queries/{}/aggregate_results'.format(q.id))
+ self.assertEqual(rv.status_code, 200)
+ self.assertEqual(rv.json['query_result']['data'],
+ {'columns': ['name', 'color'],
+ 'rows': [
+ {'name': 'bob', 'color': 'green'},
+ {'name': 'fred', 'color': 'blue'},
+ {'name': 'alice', 'color': 'red'},
+ {'name': 'eddie', 'color': 'orange'},
+ {'name': 'dave', 'color': 'yellow'},
+ {'name': 'carol', 'color': 'taupe'}
+ ]})
+
+ def test_underfilled_aggregate(self):
+ qtxt = "SELECT x FROM mytable;"
+ q = self.factory.create_query(query_text=qtxt, schedule_keep_results=3)
+ qr1 = self.factory.create_query_result(
+ query_text=qtxt,
+ data = json.dumps({'columns': ['name', 'color'],
+ 'rows': [{'name': 'bob', 'color': 'green'},
+ {'name': 'fred', 'color': 'blue'}]}))
+ qr2 = self.factory.create_query_result(
+ query_text=qtxt,
+ data = json.dumps({'columns': ['name', 'color'],
+ 'rows': [{'name': 'alice', 'color': 'red'},
+ {'name': 'eddie', 'color': 'orange'}]}))
+ for qr in (qr1, qr2):
+ self.factory.create_query_resultset(query_rel=q, result=qr)
+ rv = self.make_request('get', '/api/queries/{}/aggregate_results'.format(q.id))
+ self.assertEqual(rv.status_code, 200)
+ self.assertEqual(rv.json['query_result']['data'],
+ {'columns': ['name', 'color'],
+ 'rows': [
+ {'name': 'bob', 'color': 'green'},
+ {'name': 'fred', 'color': 'blue'},
+ {'name': 'alice', 'color': 'red'},
+ {'name': 'eddie', 'color': 'orange'}
+ ]})
+
+ def test_no_aggregate(self):
+ qtxt = "SELECT x FROM mytable;"
+ q = self.factory.create_query(query_text=qtxt)
+ qr0 = self.factory.create_query_result(
+ query_text=qtxt,
+ data = json.dumps({'columns': ['name', 'color'],
+ 'rows': [{'name': 'eve', 'color': 'grue'},
+ {'name': 'mallory', 'color': 'bleen'}]}))
+ rv = self.make_request('get', '/api/queries/{}/aggregate_results'.format(q.id))
+ self.assertEqual(rv.status_code, 404)
diff --git a/tests/test_models.py b/tests/test_models.py
index 0c318b12f6..7aec6a4124 100644
--- a/tests/test_models.py
+++ b/tests/test_models.py
@@ -335,22 +335,49 @@ def test_get_latest_returns_the_last_cached_result_for_negative_ttl(self):
class TestUnusedQueryResults(BaseTestCase):
def test_returns_only_unused_query_results(self):
two_weeks_ago = utcnow() - datetime.timedelta(days=14)
- qr = self.factory.create_query_result()
- query = self.factory.create_query(latest_query_data=qr)
+ qt = "SELECT 1"
+ qr = self.factory.create_query_result(query_text=qt)
+ query = self.factory.create_query(query_text=qt, latest_query_data=qr)
+ unused_qr = self.factory.create_query_result(query_text=qt, retrieved_at=two_weeks_ago)
db.session.flush()
- unused_qr = self.factory.create_query_result(retrieved_at=two_weeks_ago)
self.assertIn((unused_qr.id,), models.QueryResult.unused())
self.assertNotIn((qr.id,), list(models.QueryResult.unused()))
def test_returns_only_over_a_week_old_results(self):
two_weeks_ago = utcnow() - datetime.timedelta(days=14)
- unused_qr = self.factory.create_query_result(retrieved_at=two_weeks_ago)
+ qt = "SELECT 1"
+ unused_qr = self.factory.create_query_result(query_text=qt, retrieved_at=two_weeks_ago)
db.session.flush()
- new_unused_qr = self.factory.create_query_result()
-
+ new_unused_qr = self.factory.create_query_result(query_text=qt)
self.assertIn((unused_qr.id,), models.QueryResult.unused())
self.assertNotIn((new_unused_qr.id,), models.QueryResult.unused())
+ def test_doesnt_return_live_incremental_results(self):
+ two_weeks_ago = utcnow() - datetime.timedelta(days=14)
+ qt = "SELECT 1"
+ qrs = [self.factory.create_query_result(query_text=qt, retrieved_at=two_weeks_ago)
+ for _ in range(5)]
+ q = self.factory.create_query(query_text=qt, latest_query_data=qrs[0],
+ schedule_keep_results=3)
+ for qr in qrs:
+ self.factory.create_query_resultset(query_rel=q, result=qr)
+ db.session.flush()
+ self.assertEqual([], list(models.QueryResult.unused()))
+
+ def test_deletes_stale_resultsets(self):
+ qt = "SELECT 17"
+ query = self.factory.create_query(query_text=qt, schedule_keep_results=5)
+ for _ in range(10):
+ r = self.factory.create_query_result(query_text=qt)
+ self.factory.create_query_resultset(query_rel=query, result=r)
+ query2 = self.factory.create_query(query_text=qt, schedule_keep_results=3)
+        for _ in range(10):
+            r = self.factory.create_query_result(query_text=qt)
+            self.factory.create_query_resultset(query_rel=query2, result=r)
+ db.session.flush()
+ self.assertEqual(models.Query.delete_stale_resultsets(), 12)
+ self.assertEqual(models.QueryResultSet.query.count(), 8)
+
class TestQueryAll(BaseTestCase):
def test_returns_only_queries_in_given_groups(self):
@@ -469,6 +496,7 @@ def test_doesnt_update_queries_with_different_data_source(self):
self.assertNotEqual(query3.latest_query_data, query_result)
+
class TestEvents(BaseTestCase):
def raw_event(self):
timestamp = 1411778709.791