diff --git a/client/app/components/queries/ScheduleDialog.jsx b/client/app/components/queries/ScheduleDialog.jsx
index ffdbb6ccb5..8972f3e4c3 100644
--- a/client/app/components/queries/ScheduleDialog.jsx
+++ b/client/app/components/queries/ScheduleDialog.jsx
@@ -3,6 +3,7 @@ import PropTypes from 'prop-types';
import Modal from 'antd/lib/modal';
import DatePicker from 'antd/lib/date-picker';
import TimePicker from 'antd/lib/time-picker';
+import InputNumber from 'antd/lib/input-number';
import Select from 'antd/lib/select';
import Radio from 'antd/lib/radio';
import { capitalize, clone, isEqual } from 'lodash';
@@ -60,10 +61,12 @@ class ScheduleDialog extends React.Component {
schedule: RefreshScheduleType,
refreshOptions: PropTypes.arrayOf(PropTypes.number).isRequired,
dialog: DialogPropType.isRequired,
+ resultsetSize: PropTypes.number,
};
static defaultProps = {
schedule: RefreshScheduleDefault,
+ resultsetSize: 1,
};
state = this.getState();
@@ -81,6 +84,7 @@ class ScheduleDialog extends React.Component {
interval,
dayOfWeek: day ? WEEKDAYS_SHORT[WEEKDAYS_FULL.indexOf(day)] : null,
newSchedule,
+ resultsetSize: this.props.resultsetSize,
};
}
@@ -175,15 +179,19 @@ class ScheduleDialog extends React.Component {
this.setScheduleUntil(null, date);
};
+  setResultsetSize = (resultsetSize) => {
+    this.setState({ resultsetSize });
+  };
+
save() {
const { newSchedule } = this.state;
// save if changed
-    if (!isEqual(newSchedule, this.props.schedule)) {
+    if (!isEqual(newSchedule, this.props.schedule) ||
+        this.state.resultsetSize !== this.props.resultsetSize) {
if (newSchedule.interval) {
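+        // Resolve the dialog with a [schedule, resultsetSize] pair so the caller can persist both.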
- this.props.dialog.close(clone(newSchedule));
+ this.props.dialog.close([clone(newSchedule), this.state.resultsetSize]);
} else {
- this.props.dialog.close(null);
+ this.props.dialog.close([null, this.state.resultsetSize]);
}
}
this.props.dialog.dismiss();
@@ -197,6 +205,7 @@ class ScheduleDialog extends React.Component {
hour,
seconds,
newSchedule: { until },
+ resultsetSize,
} = this.state;
return (
@@ -266,6 +275,14 @@ class ScheduleDialog extends React.Component {
) : null}
+          <section>
+            Number of query results to keep{' '}
+            <InputNumber
+              min={1}
+              value={resultsetSize}
+              onChange={this.setResultsetSize}
+            />
+          </section>
);
}
diff --git a/client/app/pages/queries/view.js b/client/app/pages/queries/view.js
index 88870fef38..ca9a699a56 100644
--- a/client/app/pages/queries/view.js
+++ b/client/app/pages/queries/view.js
@@ -230,6 +230,7 @@ function QueryViewCtrl(
} else {
request = pick($scope.query, [
'schedule',
+ 'schedule_resultset_size',
'query',
'id',
'description',
@@ -486,8 +487,10 @@ function QueryViewCtrl(
ScheduleDialog.showModal({
schedule: $scope.query.schedule,
refreshOptions: $scope.refreshOptions,
- }).result.then((schedule) => {
+ resultsetSize: $scope.query.schedule_resultset_size,
+ }).result.then(([schedule, resultsetSize]) => {
$scope.query.schedule = schedule;
+ $scope.query.schedule_resultset_size = resultsetSize;
$scope.saveQuery();
});
};
diff --git a/client/app/services/query-result.js b/client/app/services/query-result.js
index b4d6dbcdec..f46c0570f8 100644
--- a/client/app/services/query-result.js
+++ b/client/app/services/query-result.js
@@ -37,6 +37,7 @@ function getColumnFriendlyName(column) {
function QueryResultService($resource, $timeout, $q, QueryResultError, Auth) {
const QueryResultResource = $resource('api/query_results/:id', { id: '@id' }, { post: { method: 'POST' } });
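+  // Resource backing /api/queries/:id/resultset, which serves a query's retained result sets.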
+ const QueryResultSetResource = $resource('api/queries/:id/resultset', { id: '@id' });
const Job = $resource('api/jobs/:id', { id: '@id' });
const JobWithApiKey = $resource('api/queries/:queryId/jobs/:id', { queryId: '@queryId', id: '@id' });
const statuses = {
@@ -295,6 +296,16 @@ function QueryResultService($resource, $timeout, $q, QueryResultError, Auth) {
return queryResult;
}
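+  // Fetch the aggregated resultset kept for a scheduled query and wrap it in a
+  // QueryResult, mirroring the shape of the regular query-result responses.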
+ static getResultSet(queryId) {
+ const queryResult = new QueryResult();
+
+ QueryResultSetResource.get({ id: queryId }, (response) => {
+ queryResult.update(response);
+ });
+
+ return queryResult;
+ }
+
loadLatestCachedResult(queryId, parameters) {
$resource('api/queries/:id/results', { id: '@queryId' }, { post: { method: 'POST' } })
.post({ queryId, parameters },
diff --git a/client/app/services/query.js b/client/app/services/query.js
index 6606de1547..ad5209d3fe 100644
--- a/client/app/services/query.js
+++ b/client/app/services/query.js
@@ -507,7 +507,11 @@ function QueryResource(
this.latest_query_data_id = null;
}
- if (this.latest_query_data && maxAge !== 0) {
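+    // Queries configured to keep more than one result set load the aggregated
+    // resultset endpoint instead of the single latest cached result.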
+ if (this.schedule_resultset_size > 1) {
+ if (!this.queryResult) {
+ this.queryResult = QueryResult.getResultSet(this.id);
+ }
+ } else if (this.latest_query_data && maxAge !== 0) {
if (!this.queryResult) {
this.queryResult = new QueryResult({
query_result: this.latest_query_data,
diff --git a/migrations/versions/2ba47e9812b1_.py b/migrations/versions/2ba47e9812b1_.py
new file mode 100644
index 0000000000..93d0f59268
--- /dev/null
+++ b/migrations/versions/2ba47e9812b1_.py
@@ -0,0 +1,24 @@
+"""empty message
+
+Revision ID: 2ba47e9812b1
+Revises: 71477dadd6ef, 9d7678c47452
+Create Date: 2018-07-25 16:09:54.769289
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = '2ba47e9812b1'
+down_revision = ('71477dadd6ef', '9d7678c47452', )
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+ pass
+
+
+def downgrade():
+ pass
diff --git a/migrations/versions/9d7678c47452_.py b/migrations/versions/9d7678c47452_.py
new file mode 100644
index 0000000000..d351153c87
--- /dev/null
+++ b/migrations/versions/9d7678c47452_.py
@@ -0,0 +1,34 @@
+"""Incremental query results aggregation
+
+Revision ID: 9d7678c47452
+Revises: 15041b7085fe
+Create Date: 2018-03-08 04:36:12.802199
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = '9d7678c47452'
+down_revision = '15041b7085fe'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+ op.create_table('query_resultsets',
+ sa.Column('query_id', sa.Integer(), nullable=False),
+ sa.Column('result_id', sa.Integer(), nullable=False),
+ sa.ForeignKeyConstraint(['query_id'], ['queries.id'], ),
+ sa.ForeignKeyConstraint(['result_id'], ['query_results.id'], ),
+ sa.PrimaryKeyConstraint('query_id', 'result_id')
+ )
+ op.add_column(u'queries', sa.Column('schedule_resultset_size', sa.Integer(), nullable=True))
+
+
+def downgrade():
+ # ### commands auto generated by Alembic - please adjust! ###
+ op.drop_column(u'queries', 'schedule_resultset_size')
+ op.drop_table('query_resultsets')
+ # ### end Alembic commands ###
diff --git a/redash/handlers/api.py b/redash/handlers/api.py
index 688f600adf..7e124497c0 100644
--- a/redash/handlers/api.py
+++ b/redash/handlers/api.py
@@ -42,6 +42,7 @@
QueryResultDropdownResource,
QueryDropdownsResource,
QueryResultListResource,
+ QueryResultSetResource,
QueryResultResource)
from redash.handlers.query_snippets import (QuerySnippetListResource,
QuerySnippetResource)
@@ -117,6 +118,7 @@ def json_representation(data, code, headers=None):
api.add_org_resource(QueryRefreshResource, '/api/queries/<query_id>/refresh', endpoint='query_refresh')
api.add_org_resource(QueryResource, '/api/queries/<query_id>', endpoint='query')
api.add_org_resource(QueryForkResource, '/api/queries/<query_id>/fork', endpoint='query_fork')
+api.add_org_resource(QueryResultSetResource, '/api/queries/<query_id>/resultset', endpoint='query_aggregate_results')
api.add_org_resource(QueryVersionListResource, '/api/queries/<query_id>/version', endpoint='query_versions')
api.add_org_resource(ChangeResource, '/api/changes/<change_id>', endpoint='changes')
diff --git a/redash/handlers/queries.py b/redash/handlers/queries.py
index 7ff91eba45..bc6bb88eab 100644
--- a/redash/handlers/queries.py
+++ b/redash/handlers/queries.py
@@ -196,6 +196,7 @@ def post(self):
+    :<json number schedule_resultset_size: Number of result sets to keep for this query
diff --git a/redash/models.py b/redash/models.py
--- a/redash/models.py
+++ b/redash/models.py
@@ ... @@ def update_latest_result(cls, query_result):
     for q in queries:
         q.latest_query_data = query_result
         db.session.add(q)
+        if q.schedule_resultset_size > 0:
+            q.query_results.append(query_result)
query_ids = [q.id for q in queries]
logging.info("Updated %s queries with result (%s).", len(query_ids), query_hash)
@@ -406,6 +409,7 @@ class Query(ChangeTrackingMixin, TimestampMixin, BelongsToOrgMixin, db.Model):
data_source = db.relationship(DataSource, backref='queries')
latest_query_data_id = Column(db.Integer, db.ForeignKey("query_results.id"), nullable=True)
latest_query_data = db.relationship(QueryResult)
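+    # Historical results retained for this query, bridged via query_resultsets.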
+ query_results = db.relationship("QueryResult", secondary="query_resultsets")
name = Column(db.String(255))
description = Column(db.String(4096), nullable=True)
query_text = Column("query", db.Text)
@@ -420,6 +424,7 @@ class Query(ChangeTrackingMixin, TimestampMixin, BelongsToOrgMixin, db.Model):
is_draft = Column(db.Boolean, default=True, index=True)
schedule = Column(MutableDict.as_mutable(PseudoJSON), nullable=True)
schedule_failures = Column(db.Integer, default=0)
+ schedule_resultset_size = Column(db.Integer, nullable=True)
visualizations = db.relationship("Visualization", cascade="all, delete-orphan")
options = Column(MutableDict.as_mutable(PseudoJSON), default={})
search_vector = Column(TSVectorType('id', 'name', 'description', 'query',
@@ -600,6 +605,37 @@ def search(cls, term, group_ids, user_id=None, include_drafts=False,
# sort the result using the weight as defined in the search vector column
return all_queries.search(term, sort=True).limit(limit)
+ @classmethod
+ def delete_stale_resultsets(cls):
+ delete_count = 0
+ texts = [c[0] for c in db.session.query(Query.query_text)
+ .filter(Query.schedule_resultset_size != None).distinct()]
+ for text in texts:
+ queries = (Query.query.filter(Query.query_text == text,
+ Query.schedule_resultset_size != None)
+ .order_by(Query.schedule_resultset_size.desc()))
+ # Multiple queries with the same text may request multiple result sets
+ # be kept. We start with the one that keeps the most, and delete both
+ # the unneeded bridge rows and result sets.
+ first_query = queries.first()
+ if first_query is not None and first_query.schedule_resultset_size:
+ resultsets = QueryResultSet.query.filter(QueryResultSet.query_rel == first_query).order_by(QueryResultSet.result_id)
+ resultset_count = resultsets.count()
+ if resultset_count > first_query.schedule_resultset_size:
+ n_to_delete = resultset_count - first_query.schedule_resultset_size
+ r_ids = [r.result_id for r in resultsets][:n_to_delete]
+ QueryResultSet.query.filter(QueryResultSet.result_id.in_(r_ids)).delete(synchronize_session=False)
+ delete_count += QueryResult.query.filter(QueryResult.id.in_(r_ids)).delete(synchronize_session=False)
+ # By this point there are no stale result sets left.
+ # Delete unneeded bridge rows for the remaining queries.
+ for q in queries[1:]:
+ resultsets = db.session.query(QueryResultSet.result_id).filter(QueryResultSet.query_rel == q).order_by(QueryResultSet.result_id)
+ n_to_delete = resultsets.count() - q.schedule_resultset_size
+ if n_to_delete > 0:
+ stale_r = QueryResultSet.query.filter(QueryResultSet.result_id.in_(resultsets.limit(n_to_delete).subquery()))
+ stale_r.delete(synchronize_session=False)
+ return delete_count
+
@classmethod
def search_by_user(cls, term, user, limit=None):
return cls.by_user(user).search(term, sort=True).limit(limit)
@@ -693,6 +729,16 @@ def parameterized(self):
return ParameterizedQuery(self.query_text, self.parameters)
+class QueryResultSet(db.Model):
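+    """Bridge table associating a query with its retained historical results."""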
+ query_id = Column(db.Integer, db.ForeignKey("queries.id"),
+ primary_key=True)
+ query_rel = db.relationship(Query)
+ result_id = Column(db.Integer, db.ForeignKey("query_results.id"),
+ primary_key=True)
+ result = db.relationship(QueryResult)
+ __tablename__ = 'query_resultsets'
+
+
@listens_for(Query.query_text, 'set')
def gen_query_hash(target, val, oldval, initiator):
target.query_hash = utils.gen_query_hash(val)
diff --git a/redash/serializers.py b/redash/serializers.py
index 81e1df38d2..82146634a4 100644
--- a/redash/serializers.py
+++ b/redash/serializers.py
@@ -92,6 +92,7 @@ def serialize_query(query, with_stats=False, with_visualizations=False, with_use
'query': query.query_text,
'query_hash': query.query_hash,
'schedule': query.schedule,
+ 'schedule_resultset_size': query.schedule_resultset_size,
'api_key': query.api_key,
'is_archived': query.is_archived,
'is_draft': query.is_draft,
diff --git a/redash/tasks/queries.py b/redash/tasks/queries.py
index 6002ccd275..c666d5eeef 100644
--- a/redash/tasks/queries.py
+++ b/redash/tasks/queries.py
@@ -224,6 +224,7 @@ def cleanup_query_results():
deleted_count = models.QueryResult.query.filter(
models.QueryResult.id.in_(unused_query_results.subquery())
).delete(synchronize_session=False)
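+    # Also prune retained result sets that exceed each query's schedule_resultset_size.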
+ deleted_count += models.Query.delete_stale_resultsets()
models.db.session.commit()
logger.info("Deleted %d unused query results.", deleted_count)
diff --git a/tests/factories.py b/tests/factories.py
index 2c82e186da..06feb3480c 100644
--- a/tests/factories.py
+++ b/tests/factories.py
@@ -111,7 +111,9 @@ def __call__(self):
query_hash=gen_query_hash('SELECT 1'),
data_source=data_source_factory.create,
org_id=1)
-
+query_resultset_factory = ModelFactory(redash.models.QueryResultSet,
+ query_rel=query_factory.create,
+ result=query_result_factory.create)
visualization_factory = ModelFactory(redash.models.Visualization,
type='CHART',
query_rel=query_factory.create,
@@ -297,6 +299,9 @@ def create_query_result(self, **kwargs):
return query_result_factory.create(**args)
+ def create_query_resultset(self, **kwargs):
+ return query_resultset_factory.create(**kwargs)
+
def create_visualization(self, **kwargs):
args = {
'query_rel': self.create_query()
diff --git a/tests/handlers/test_queries.py b/tests/handlers/test_queries.py
index 1c2485db2f..448c37c940 100644
--- a/tests/handlers/test_queries.py
+++ b/tests/handlers/test_queries.py
@@ -1,3 +1,5 @@
+import json
+
from tests import BaseTestCase
from redash import models
from redash.models import db
@@ -432,3 +434,81 @@ def test_get(self):
rv2 = self.make_request('get', '/api/changes/' + str(ch2.id))
self.assertEqual(rv2.status_code, 200)
self.assertEqual(rv2.json['change']['name']['current'], 'version B')
+
+
+class AggregateResultsTests(BaseTestCase):
+ def test_aggregate(self):
+ qtxt = "SELECT x FROM mytable;"
+ q = self.factory.create_query(query_text=qtxt, schedule_resultset_size=3)
+ qr0 = self.factory.create_query_result(
+ query_text=qtxt,
+ data=json.dumps({'columns': ['name', 'color'],
+ 'rows': [{'name': 'eve', 'color': 'grue'},
+ {'name': 'mallory', 'color': 'bleen'}]}))
+ qr1 = self.factory.create_query_result(
+ query_text=qtxt,
+ data=json.dumps({'columns': ['name', 'color'],
+ 'rows': [{'name': 'bob', 'color': 'green'},
+ {'name': 'fred', 'color': 'blue'}]}))
+ qr2 = self.factory.create_query_result(
+ query_text=qtxt,
+ data=json.dumps({'columns': ['name', 'color'],
+ 'rows': [{'name': 'alice', 'color': 'red'},
+ {'name': 'eddie', 'color': 'orange'}]}))
+ qr3 = self.factory.create_query_result(
+ query_text=qtxt,
+ data=json.dumps({'columns': ['name', 'color'],
+ 'rows': [{'name': 'dave', 'color': 'yellow'},
+ {'name': 'carol', 'color': 'taupe'}]}))
+ for qr in (qr0, qr1, qr2, qr3):
+ self.factory.create_query_resultset(query_rel=q, result=qr)
+ rv = self.make_request('get', '/api/queries/{}/resultset'.format(q.id))
+ self.assertEqual(rv.status_code, 200)
+ self.assertEqual(rv.json['query_result']['data'],
+ {'columns': ['name', 'color'],
+ 'rows': [
+ {'name': 'bob', 'color': 'green'},
+ {'name': 'fred', 'color': 'blue'},
+ {'name': 'alice', 'color': 'red'},
+ {'name': 'eddie', 'color': 'orange'},
+ {'name': 'dave', 'color': 'yellow'},
+ {'name': 'carol', 'color': 'taupe'}
+ ]})
+
+ def test_underfilled_aggregate(self):
+ qtxt = "SELECT x FROM mytable;"
+ q = self.factory.create_query(query_text=qtxt,
+ schedule_resultset_size=3)
+ qr1 = self.factory.create_query_result(
+ query_text=qtxt,
+ data=json.dumps({'columns': ['name', 'color'],
+ 'rows': [{'name': 'bob', 'color': 'green'},
+ {'name': 'fred', 'color': 'blue'}]}))
+ qr2 = self.factory.create_query_result(
+ query_text=qtxt,
+ data=json.dumps({'columns': ['name', 'color'],
+ 'rows': [{'name': 'alice', 'color': 'red'},
+ {'name': 'eddie', 'color': 'orange'}]}))
+ for qr in (qr1, qr2):
+ self.factory.create_query_resultset(query_rel=q, result=qr)
+ rv = self.make_request('get', '/api/queries/{}/resultset'.format(q.id))
+ self.assertEqual(rv.status_code, 200)
+ self.assertEqual(rv.json['query_result']['data'],
+ {'columns': ['name', 'color'],
+ 'rows': [
+ {'name': 'bob', 'color': 'green'},
+ {'name': 'fred', 'color': 'blue'},
+ {'name': 'alice', 'color': 'red'},
+ {'name': 'eddie', 'color': 'orange'}
+ ]})
+
+ def test_no_aggregate(self):
+ qtxt = "SELECT x FROM mytable;"
+ q = self.factory.create_query(query_text=qtxt)
+ self.factory.create_query_result(
+ query_text=qtxt,
+ data=json.dumps({'columns': ['name', 'color'],
+ 'rows': [{'name': 'eve', 'color': 'grue'},
+ {'name': 'mallory', 'color': 'bleen'}]}))
+ rv = self.make_request('get', '/api/queries/{}/resultset'.format(q.id))
+ self.assertEqual(rv.status_code, 404)
diff --git a/tests/test_models.py b/tests/test_models.py
index 465d6bf65e..357cc245e5 100644
--- a/tests/test_models.py
+++ b/tests/test_models.py
@@ -336,7 +336,8 @@ def test_deletes_alerts(self):
class TestUnusedQueryResults(BaseTestCase):
def test_returns_only_unused_query_results(self):
two_weeks_ago = utcnow() - datetime.timedelta(days=14)
- qr = self.factory.create_query_result()
+ qt = "SELECT 1"
+ qr = self.factory.create_query_result(query_text=qt, retrieved_at=two_weeks_ago)
self.factory.create_query(latest_query_data=qr)
db.session.flush()
unused_qr = self.factory.create_query_result(retrieved_at=two_weeks_ago)
@@ -345,13 +346,65 @@ def test_returns_only_unused_query_results(self):
def test_returns_only_over_a_week_old_results(self):
two_weeks_ago = utcnow() - datetime.timedelta(days=14)
- unused_qr = self.factory.create_query_result(retrieved_at=two_weeks_ago)
+ qt = "SELECT 1"
+ unused_qr = self.factory.create_query_result(query_text=qt, retrieved_at=two_weeks_ago)
db.session.flush()
- new_unused_qr = self.factory.create_query_result()
+ new_unused_qr = self.factory.create_query_result(query_text=qt)
self.assertIn(unused_qr, list(models.QueryResult.unused()))
self.assertNotIn(new_unused_qr, list(models.QueryResult.unused()))
+ def test_doesnt_return_live_incremental_results(self):
+ two_weeks_ago = utcnow() - datetime.timedelta(days=14)
+ qt = "SELECT 1"
+ qrs = [self.factory.create_query_result(query_text=qt, retrieved_at=two_weeks_ago)
+ for _ in range(5)]
+ q = self.factory.create_query(query_text=qt, latest_query_data=qrs[0],
+ schedule_resultset_size=3)
+ for qr in qrs:
+ self.factory.create_query_resultset(query_rel=q, result=qr)
+ db.session.flush()
+ self.assertEqual([], list(models.QueryResult.unused()))
+
+ def test_deletes_stale_resultsets(self):
+ qt = "SELECT 17"
+ query = self.factory.create_query(query_text=qt,
+ schedule_resultset_size=5)
+ for _ in range(10):
+ r = self.factory.create_query_result(query_text=qt)
+ self.factory.create_query_resultset(query_rel=query, result=r)
+ qt2 = "SELECT 100"
+ query2 = self.factory.create_query(query_text=qt2, schedule_resultset_size=5)
+ for _ in range(10):
+ r = self.factory.create_query_result(query_text=qt2)
+ self.factory.create_query_resultset(query_rel=query2, result=r)
+ db.session.flush()
+ self.assertEqual(models.QueryResultSet.query.count(), 20)
+ self.assertEqual(models.Query.delete_stale_resultsets(), 10)
+ self.assertEqual(models.QueryResultSet.query.count(), 10)
+
+ def test_deletes_stale_resultsets_with_dupe_queries(self):
+ qt = "SELECT 17"
+ query = self.factory.create_query(query_text=qt,
+ schedule_resultset_size=5)
+ for _ in range(10):
+ r = self.factory.create_query_result(query_text=qt)
+ self.factory.create_query_resultset(query_rel=query, result=r)
+ query2 = self.factory.create_query(query_text=qt,
+ schedule_resultset_size=3)
+        for _ in range(10):
+            r = self.factory.create_query_result(query_text=qt)
+            self.factory.create_query_resultset(query_rel=query2, result=r)
+ qt2 = "SELECT 100"
+ query3 = self.factory.create_query(query_text=qt2, schedule_resultset_size=5)
+ for _ in range(10):
+ r = self.factory.create_query_result(query_text=qt2)
+ self.factory.create_query_resultset(query_rel=query3, result=r)
+ db.session.flush()
+ self.assertEqual(models.QueryResultSet.query.count(), 30)
+ self.assertEqual(models.Query.delete_stale_resultsets(), 10)
+ self.assertEqual(models.QueryResultSet.query.count(), 13)
+
class TestQueryAll(BaseTestCase):
def test_returns_only_queries_in_given_groups(self):