Skip to content

Commit

Permalink
Aggregate query results (re #35)
Browse files Browse the repository at this point in the history
  • Loading branch information
Allen Short committed Feb 28, 2018
1 parent ebc3d9f commit 9164ce4
Show file tree
Hide file tree
Showing 13 changed files with 256 additions and 15 deletions.
3 changes: 3 additions & 0 deletions client/app/components/queries/schedule-dialog.html
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,7 @@ <h4 class="modal-title">Refresh Schedule</h4>
Stop scheduling at date/time (format yyyy-MM-ddTHH:mm:ss, like 2016-12-28T14:57:00):
<schedule-until query="$ctrl.query" save-query="$ctrl.saveQuery"></schedule-until>
</label>
<label>
Number of result sets to keep <schedule-keep-results query="$ctrl.query" save-query="$ctrl.saveQuery"></schedule-keep-results>
</label>
</div>
14 changes: 13 additions & 1 deletion client/app/components/queries/schedule-dialog.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,22 @@ function scheduleUntil() {
};
}

function scheduleKeepResults() {
  // Element directive: a numeric input two-way bound to
  // query.schedule_keep_results that persists the query on every change.
  const definition = {
    restrict: 'E',
    scope: {
      query: '=',
      saveQuery: '=',
    },
    template: '<input type="number" class="form-control" ng-model="query.schedule_keep_results" ng-change="saveQuery()">',
  };
  return definition;
}

const ScheduleForm = {
controller() {
this.query = this.resolve.query;
this.saveQuery = this.resolve.saveQuery;

this.isIncremental = false;
if (this.query.hasDailySchedule()) {
this.refreshType = 'daily';
} else {
Expand All @@ -158,5 +169,6 @@ export default function init(ngModule) {
ngModule.directive('queryTimePicker', queryTimePicker);
ngModule.directive('queryRefreshSelect', queryRefreshSelect);
ngModule.directive('scheduleUntil', scheduleUntil);
ngModule.directive('scheduleKeepResults', scheduleKeepResults);
ngModule.component('scheduleDialog', ScheduleForm);
}
1 change: 1 addition & 0 deletions client/app/pages/queries/view.js
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ function QueryViewCtrl(
} else {
request = pick($scope.query, [
'schedule',
'schedule_keep_results',
'query',
'id',
'description',
Expand Down
10 changes: 10 additions & 0 deletions client/app/services/query-result.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ function addPointToSeries(point, seriesCollection, seriesName) {

function QueryResultService($resource, $timeout, $q) {
const QueryResultResource = $resource('api/query_results/:id', { id: '@id' }, { post: { method: 'POST' } });
const QueryAggregateResultResource = $resource('api/queries/:id/aggregate_results', { id: '@id' });
const Job = $resource('api/jobs/:id', { id: '@id' });
const statuses = {
1: 'waiting',
Expand Down Expand Up @@ -421,6 +422,15 @@ function QueryResultService($resource, $timeout, $q) {
return queryResult;
}

// Build a QueryResult backed by the aggregate_results endpoint, which
// concatenates the rows of the stored result sets for a scheduled query.
static getAggregate(queryId) {
  const queryResult = new QueryResult();

  // Fetch is asynchronous: callers get the (initially empty) QueryResult
  // immediately, and it is filled in when the response arrives.
  QueryAggregateResultResource.get({ id: queryId }, (response) => {
    queryResult.update(response);
  });

  return queryResult;
}
loadResult(tryCount) {
QueryResultResource.get(
{ id: this.job.query_result_id },
Expand Down
6 changes: 5 additions & 1 deletion client/app/services/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,11 @@ function QueryResource($resource, $http, $q, $location, currentUser, QueryResult
this.latest_query_data_id = null;
}

if (this.latest_query_data && maxAge !== 0) {
if (this.schedule_keep_results) {
if (!this.queryResult) {
this.queryResult = QueryResult.getAggregate(this.id);
}
} else if (this.latest_query_data && maxAge !== 0) {
if (!this.queryResult) {
this.queryResult = new QueryResult({
query_result: this.latest_query_data,
Expand Down
32 changes: 32 additions & 0 deletions migrations/versions/2a2b3b58464e_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""incremental query results aggregation
Revision ID: 2a2b3b58464e
Revises: 15041b7085fe
Create Date: 2018-02-16 19:28:38.931253
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '2a2b3b58464e'
down_revision = '15041b7085fe'
branch_labels = None
depends_on = None


def upgrade():
    # Association table linking a query to each stored result it retains;
    # the composite primary key allows one row per (query, result) pair.
    op.create_table('query_resultsets',
    sa.Column('query_id', sa.Integer(), nullable=False),
    sa.Column('result_id', sa.Integer(), nullable=False),
    sa.ForeignKeyConstraint(['query_id'], ['queries.id'], ),
    sa.ForeignKeyConstraint(['result_id'], ['query_results.id'], ),
    sa.PrimaryKeyConstraint('query_id', 'result_id')
    )
    # How many past result sets a scheduled query keeps; NULL means the
    # aggregation feature is off for that query (existing behaviour).
    op.add_column(u'queries', sa.Column('schedule_keep_results', sa.Integer(), nullable=True))


def downgrade():
    # Undo upgrade(): remove the retention setting and the association table.
    op.drop_column(u'queries', 'schedule_keep_results')
    op.drop_table('query_resultsets')
3 changes: 2 additions & 1 deletion redash/handlers/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from redash.handlers.data_sources import DataSourceTypeListResource, DataSourceListResource, DataSourceSchemaResource, DataSourceResource, DataSourcePauseResource, DataSourceTestResource, DataSourceVersionResource
from redash.handlers.events import EventsResource
from redash.handlers.queries import QueryForkResource, QueryRefreshResource, QueryListResource, QueryRecentResource, QuerySearchResource, QueryResource, MyQueriesResource, QueryVersionListResource, ChangeResource
from redash.handlers.query_results import QueryResultListResource, QueryResultResource, JobResource
from redash.handlers.query_results import QueryResultListResource, QueryResultResource, JobResource, QueryResultSetResource
from redash.handlers.users import UserResource, UserListResource, UserInviteResource, UserResetPasswordResource
from redash.handlers.visualizations import VisualizationListResource
from redash.handlers.visualizations import VisualizationResource
Expand Down Expand Up @@ -76,6 +76,7 @@ def json_representation(data, code, headers=None):
api.add_org_resource(QueryRefreshResource, '/api/queries/<query_id>/refresh', endpoint='query_refresh')
api.add_org_resource(QueryResource, '/api/queries/<query_id>', endpoint='query')
api.add_org_resource(QueryForkResource, '/api/queries/<query_id>/fork', endpoint='query_fork')
api.add_org_resource(QueryResultSetResource, '/api/queries/<query_id>/aggregate_results', endpoint='query_aggregate_results')
api.add_org_resource(QueryVersionListResource, '/api/queries/<query_id>/version', endpoint='query_versions')
api.add_org_resource(ChangeResource, '/api/changes/<change_id>', endpoint='changes')

Expand Down
27 changes: 26 additions & 1 deletion redash/handlers/query_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,31 @@ def post(self):
ONE_YEAR = 60 * 60 * 24 * 365.25


class QueryResultSetResource(BaseResource):
    @require_permission('view_query')
    def get(self, query_id=None, filetype='json'):
        """Return a synthetic result set aggregating a query's stored results.

        Concatenates the rows of the last `schedule_keep_results` stored
        result sets for the query. Responds 404 if the query does not keep
        multiple results.
        """
        query = get_object_or_404(models.Query.get_by_id_and_org, query_id, self.current_org)
        if not query.schedule_keep_results:
            abort(404, message="query does not keep multiple results")

        # Take only the last `schedule_keep_results` entries. (The previous
        # upper bound `offset + total` overshot the list and worked only
        # because Python clamps slice bounds.)
        # NOTE(review): assumes query.query_results is in insertion order --
        # the relationship declares no explicit order_by; confirm.
        total = len(query.query_results)
        offset = max(total - query.schedule_keep_results, 0)
        results = [qr.to_dict() for qr in query.query_results[offset:]]
        if not results:
            aggregate_result = {}
        else:
            # Reuse the metadata of the oldest retained result, but replace
            # its data with the concatenation of all retained rows.
            aggregate_result = results[0].copy()
            aggregate_result['data'] = {'columns': results[0]['data']['columns'],
                                        'rows': []}
            for r in results:
                aggregate_result['data']['rows'].extend(r['data']['rows'])

        data = json.dumps({'query_result': aggregate_result}, cls=utils.JSONEncoder)
        headers = {'Content-Type': "application/json"}
        return make_response(data, 200, headers)


class QueryResultResource(BaseResource):
@staticmethod
def add_cors_headers(headers):
Expand Down Expand Up @@ -194,7 +219,7 @@ def get(self, query_id=None, query_result_id=None, filetype='json'):
query_result = run_query_sync(query.data_source, parameter_values, query.to_dict()['query'], max_age=max_age)
elif query.latest_query_data_id is not None:
query_result = get_object_or_404(models.QueryResult.get_by_id_and_org, query.latest_query_data_id, self.current_org)

if query is not None and query_result is not None and self.current_user.is_api_user():
if query.query_hash != query_result.query_hash:
abort(404, message='No cached result found for this query.')
Expand Down
48 changes: 44 additions & 4 deletions redash/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from redash.utils.comparators import CaseInsensitiveComparator
from redash.utils.configuration import ConfigurationContainer
from redash.settings.organization import settings as org_settings
from sqlalchemy import distinct, or_
from sqlalchemy import distinct, exists, or_
from sqlalchemy.dialects import postgresql
from sqlalchemy.event import listens_for
from sqlalchemy.ext.mutable import Mutable
Expand Down Expand Up @@ -728,9 +728,9 @@ def to_dict(self):
def unused(cls, days=7):
age_threshold = datetime.datetime.now() - datetime.timedelta(days=days)

unused_results = (db.session.query(QueryResult.id).filter(
Query.id == None, QueryResult.retrieved_at < age_threshold)
.outerjoin(Query))
unused_results = db.session.query(QueryResult.id).filter(
QueryResult.retrieved_at < age_threshold,
~QueryResultSet.query.filter(QueryResultSet.result_id == QueryResult.id).exists())

return unused_results

Expand Down Expand Up @@ -769,9 +769,13 @@ def store_result(cls, org, data_source, query_hash, query, data, run_time, retri
queries = db.session.query(Query).filter(
Query.query_hash == query_hash,
Query.data_source == data_source)

for q in queries:
q.latest_query_data = query_result
db.session.add(q)
if q.schedule_keep_results > 0:

q.query_results.append(query_result)
query_ids = [q.id for q in queries]
logging.info("Updated %s queries with result (%s).", len(query_ids), query_hash)

Expand Down Expand Up @@ -851,6 +855,7 @@ class Query(ChangeTrackingMixin, TimestampMixin, BelongsToOrgMixin, db.Model):
data_source = db.relationship(DataSource, backref='queries')
latest_query_data_id = Column(db.Integer, db.ForeignKey("query_results.id"), nullable=True)
latest_query_data = db.relationship(QueryResult)
query_results = db.relationship("QueryResult", secondary="query_resultsets")
name = Column(db.String(255))
description = Column(db.String(4096), nullable=True)
query_text = Column("query", db.Text)
Expand All @@ -866,6 +871,7 @@ class Query(ChangeTrackingMixin, TimestampMixin, BelongsToOrgMixin, db.Model):
schedule = Column(db.String(10), nullable=True)
schedule_failures = Column(db.Integer, default=0)
schedule_until = Column(db.DateTime(True), nullable=True)
schedule_keep_results = Column(db.Integer, nullable=True)
visualizations = db.relationship("Visualization", cascade="all, delete-orphan")
options = Column(MutableDict.as_mutable(PseudoJSON), default={})
search_vector = Column(TSVectorType('id', 'name', 'description', 'query',
Expand All @@ -892,6 +898,7 @@ def to_dict(self, with_stats=False, with_visualizations=False, with_user=True, w
'query_hash': self.query_hash,
'schedule': self.schedule,
'schedule_until': self.schedule_until,
'schedule_keep_results': self.schedule_keep_results,
'api_key': self.api_key,
'is_archived': self.is_archived,
'is_draft': self.is_draft,
Expand Down Expand Up @@ -1000,6 +1007,29 @@ def outdated_queries(cls):

return outdated_queries.values()

@classmethod
def delete_stale_resultsets(cls):
    """Trim stored result sets down to each query's schedule_keep_results.

    Returns the number of QueryResultSet rows deleted. For the query with
    the largest retention the underlying QueryResult rows are deleted too;
    other queries' orphaned results are left to the regular unused-results
    cleanup task.
    """
    delete_count = 0
    queries = Query.query.filter(Query.schedule_keep_results != None).order_by(Query.schedule_keep_results.desc())
    # Evaluate the query once instead of re-running it via queries[0].
    top = queries.first()
    if top is not None and top.schedule_keep_results:
        resultsets = QueryResultSet.query.filter(QueryResultSet.query_rel == top).order_by(QueryResultSet.result_id)
        total = resultsets.count()
        if total > top.schedule_keep_results:
            n_to_delete = total - top.schedule_keep_results
            # Oldest rows first (lowest result ids) are the stale ones.
            r_ids = [r.result_id for r in resultsets][:n_to_delete]
            delete_count = QueryResultSet.query.filter(QueryResultSet.result_id.in_(r_ids)).delete(synchronize_session=False)
            QueryResult.query.filter(QueryResult.id.in_(r_ids)).delete(synchronize_session=False)
    for q in queries[1:]:
        resultsets = db.session.query(QueryResultSet.result_id).filter(QueryResultSet.query_rel == q).order_by(QueryResultSet.result_id)
        n_to_delete = resultsets.count() - q.schedule_keep_results
        if n_to_delete > 0:
            stale_r = QueryResultSet.query.filter(QueryResultSet.result_id.in_(resultsets.limit(n_to_delete).subquery()))
            delete_count += stale_r.delete(synchronize_session=False)
    return delete_count

@classmethod
def search(cls, term, group_ids, include_drafts=False, limit=20):
where = cls.is_archived == False
Expand Down Expand Up @@ -1089,6 +1119,16 @@ def __repr__(self):
return '<Query %s: "%s">' % (self.id, self.name or 'untitled')


class QueryResultSet(db.Model):
    """Association between a Query and each QueryResult it has retained."""
    __tablename__ = 'query_resultsets'

    # Composite primary key: one row per (query, result) pair.
    query_id = Column(db.Integer, db.ForeignKey("queries.id"), primary_key=True)
    result_id = Column(db.Integer, db.ForeignKey("query_results.id"), primary_key=True)

    query_rel = db.relationship(Query)
    result = db.relationship(QueryResult)


@vectorizer(db.Integer)
def integer_vectorizer(column):
return db.func.cast(column, db.Text)
Expand Down
1 change: 1 addition & 0 deletions redash/tasks/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ def cleanup_query_results():
deleted_count = models.QueryResult.query.filter(
models.QueryResult.id.in_(unused_query_results.subquery())
).delete(synchronize_session=False)
deleted_count += models.Query.delete_stale_resultsets()
models.db.session.commit()
logger.info("Deleted %d unused query results.", deleted_count)

Expand Down
7 changes: 6 additions & 1 deletion tests/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ def __call__(self):
query_hash=gen_query_hash('SELECT 1'),
data_source=data_source_factory.create,
org_id=1)

query_resultset_factory = ModelFactory(redash.models.QueryResultSet,
query_rel=query_factory.create,
result=query_result_factory.create)
visualization_factory = ModelFactory(redash.models.Visualization,
type='CHART',
query_rel=query_factory.create,
Expand Down Expand Up @@ -295,6 +297,9 @@ def create_query_result(self, **kwargs):

return query_result_factory.create(**args)

def create_query_resultset(self, **kwargs):
    # Link a query to a retained result via the query_resultsets table;
    # pass query_rel/result to override the factory defaults.
    return query_resultset_factory.create(**kwargs)

def create_visualization(self, **kwargs):
args = {
'query_rel': self.create_query()
Expand Down
79 changes: 79 additions & 0 deletions tests/handlers/test_queries.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import json

from tests import BaseTestCase
from redash import models
from redash.models import db
Expand Down Expand Up @@ -226,3 +228,80 @@ def test_get(self):
rv2 = self.make_request('get', '/api/changes/' + str(ch2.id))
self.assertEqual(rv2.status_code, 200)
self.assertEqual(rv2.json['change']['name']['current'], 'version B')


class AggregateResultsTests(BaseTestCase):
    """Tests for GET /api/queries/<id>/aggregate_results."""

    QTXT = "SELECT x FROM mytable;"

    def _result(self, rows):
        # Store one query result using the shared name/color column layout.
        # (Also fixes PEP8: no spaces around '=' in keyword arguments.)
        return self.factory.create_query_result(
            query_text=self.QTXT,
            data=json.dumps({'columns': ['name', 'color'], 'rows': rows}))

    def test_aggregate(self):
        # Four results linked but keep=3: the oldest must be dropped from
        # the aggregate.
        q = self.factory.create_query(query_text=self.QTXT,
                                      schedule_keep_results=3)
        qr0 = self._result([{'name': 'eve', 'color': 'grue'},
                            {'name': 'mallory', 'color': 'bleen'}])
        qr1 = self._result([{'name': 'bob', 'color': 'green'},
                            {'name': 'fred', 'color': 'blue'}])
        qr2 = self._result([{'name': 'alice', 'color': 'red'},
                            {'name': 'eddie', 'color': 'orange'}])
        qr3 = self._result([{'name': 'dave', 'color': 'yellow'},
                            {'name': 'carol', 'color': 'taupe'}])
        for qr in (qr0, qr1, qr2, qr3):
            self.factory.create_query_resultset(query_rel=q, result=qr)
        rv = self.make_request('get', '/api/queries/{}/aggregate_results'.format(q.id))
        self.assertEqual(rv.status_code, 200)
        self.assertEqual(rv.json['query_result']['data'],
                         {'columns': ['name', 'color'],
                          'rows': [
                              {'name': 'bob', 'color': 'green'},
                              {'name': 'fred', 'color': 'blue'},
                              {'name': 'alice', 'color': 'red'},
                              {'name': 'eddie', 'color': 'orange'},
                              {'name': 'dave', 'color': 'yellow'},
                              {'name': 'carol', 'color': 'taupe'}
                          ]})

    def test_underfilled_aggregate(self):
        # Fewer stored results than keep=3: all rows are returned.
        q = self.factory.create_query(query_text=self.QTXT,
                                      schedule_keep_results=3)
        qr1 = self._result([{'name': 'bob', 'color': 'green'},
                            {'name': 'fred', 'color': 'blue'}])
        qr2 = self._result([{'name': 'alice', 'color': 'red'},
                            {'name': 'eddie', 'color': 'orange'}])
        for qr in (qr1, qr2):
            self.factory.create_query_resultset(query_rel=q, result=qr)
        rv = self.make_request('get', '/api/queries/{}/aggregate_results'.format(q.id))
        self.assertEqual(rv.status_code, 200)
        self.assertEqual(rv.json['query_result']['data'],
                         {'columns': ['name', 'color'],
                          'rows': [
                              {'name': 'bob', 'color': 'green'},
                              {'name': 'fred', 'color': 'blue'},
                              {'name': 'alice', 'color': 'red'},
                              {'name': 'eddie', 'color': 'orange'}
                          ]})

    def test_no_aggregate(self):
        # A query without schedule_keep_results must 404 on this endpoint,
        # even if an (unlinked) result exists.
        q = self.factory.create_query(query_text=self.QTXT)
        self._result([{'name': 'eve', 'color': 'grue'},
                      {'name': 'mallory', 'color': 'bleen'}])
        rv = self.make_request('get', '/api/queries/{}/aggregate_results'.format(q.id))
        self.assertEqual(rv.status_code, 404)
Loading

0 comments on commit 9164ce4

Please sign in to comment.