Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregate query results (re #35) #339

Merged
merged 7 commits into from
Mar 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions client/app/components/queries/schedule-dialog.html
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,7 @@ <h4 class="modal-title">Refresh Schedule</h4>
Stop scheduling at date/time (format yyyy-MM-ddTHH:mm:ss, like 2016-12-28T14:57:00):
<schedule-until query="$ctrl.query" save-query="$ctrl.saveQuery"></schedule-until>
</label>
<label>
Number of query results to keep <schedule-keep-results query="$ctrl.query" save-query="$ctrl.saveQuery"></schedule-keep-results>
</label>
</div>
13 changes: 12 additions & 1 deletion client/app/components/queries/schedule-dialog.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,21 @@ function scheduleUntil() {
};
}

/**
 * Directive factory for the <schedule-keep-results> element.
 *
 * Renders a numeric input bound to `query.schedule_resultset_size`. The input
 * calls `saveQuery()` whenever its value changes and is disabled while the
 * query has no schedule.
 *
 * @returns {Object} AngularJS directive definition object.
 */
function scheduleKeepResults() {
  // Both attributes are passed through from the enclosing schedule dialog.
  const scope = {
    query: '=',
    saveQuery: '=',
  };
  const template = '<input type="number" class="form-control" ng-model="query.schedule_resultset_size" ng-change="saveQuery()" ng-disabled="!query.schedule">';

  return { restrict: 'E', scope, template };
}

const ScheduleForm = {
controller() {
this.query = this.resolve.query;
this.saveQuery = this.resolve.saveQuery;

if (this.query.hasDailySchedule()) {
this.refreshType = 'daily';
} else {
Expand All @@ -124,5 +134,6 @@ export default function init(ngModule) {
ngModule.directive('queryTimePicker', queryTimePicker);
ngModule.directive('queryRefreshSelect', queryRefreshSelect);
ngModule.directive('scheduleUntil', scheduleUntil);
ngModule.directive('scheduleKeepResults', scheduleKeepResults);
ngModule.component('scheduleDialog', ScheduleForm);
}
1 change: 0 additions & 1 deletion client/app/pages/alerts-list/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ const stateClass = {

class AlertsListCtrl {
constructor(Alert) {

this.showEmptyState = false;
this.showList = false;

Expand Down
1 change: 1 addition & 0 deletions client/app/pages/queries/view.js
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ function QueryViewCtrl(
} else {
request = pick($scope.query, [
'schedule',
'schedule_resultset_size',
'query',
'id',
'description',
Expand Down
10 changes: 10 additions & 0 deletions client/app/services/query-result.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ function addPointToSeries(point, seriesCollection, seriesName) {

function QueryResultService($resource, $timeout, $q) {
const QueryResultResource = $resource('api/query_results/:id', { id: '@id' }, { post: { method: 'POST' } });
const QueryResultSetResource = $resource('api/queries/:id/resultset', { id: '@id' });
const Job = $resource('api/jobs/:id', { id: '@id' });
const statuses = {
1: 'waiting',
Expand Down Expand Up @@ -421,6 +422,15 @@ function QueryResultService($resource, $timeout, $q) {
return queryResult;
}

static getResultSet(queryId) {
const queryResult = new QueryResult();

QueryResultSetResource.get({ id: queryId }, (response) => {
queryResult.update(response);
});

return queryResult;
}
loadResult(tryCount) {
QueryResultResource.get(
{ id: this.job.query_result_id },
Expand Down
6 changes: 5 additions & 1 deletion client/app/services/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,11 @@ function QueryResource($resource, $http, $q, $location, currentUser, QueryResult
this.latest_query_data_id = null;
}

if (this.latest_query_data && maxAge !== 0) {
if (this.schedule_resultset_size) {
if (!this.queryResult) {
this.queryResult = QueryResult.getResultSet(this.id);
}
} else if (this.latest_query_data && maxAge !== 0) {
if (!this.queryResult) {
this.queryResult = new QueryResult({
query_result: this.latest_query_data,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
"""incremental query results aggregation
"""Incremental query results aggregation

Revision ID: 2a2b3b58464e
Revision ID: 9d7678c47452
Revises: 15041b7085fe
Create Date: 2018-02-16 19:28:38.931253
Create Date: 2018-03-08 04:36:12.802199

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '2a2b3b58464e'
revision = '9d7678c47452'
down_revision = '15041b7085fe'
branch_labels = None
depends_on = None
Expand All @@ -24,9 +24,11 @@ def upgrade():
sa.ForeignKeyConstraint(['result_id'], ['query_results.id'], ),
sa.PrimaryKeyConstraint('query_id', 'result_id')
)
op.add_column(u'queries', sa.Column('schedule_keep_results', sa.Integer(), nullable=True))

op.add_column(u'queries', sa.Column('schedule_resultset_size', sa.Integer(), nullable=True))
1

def downgrade():
op.drop_column(u'queries', 'schedule_keep_results')
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was schedule_keep_results an unused field before? I don't see any other references to it.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This migration went into master before it should have -- the field was misnamed. We're going to roll back the misnamed field on stage before this PR gets merged.

# ### commands auto generated by Alembic - please adjust! ###
op.drop_column(u'queries', 'schedule_resultset_size')
op.drop_table('query_resultsets')
# ### end Alembic commands ###
3 changes: 2 additions & 1 deletion redash/handlers/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from redash.handlers.data_sources import DataSourceTypeListResource, DataSourceListResource, DataSourceSchemaResource, DataSourceResource, DataSourcePauseResource, DataSourceTestResource, DataSourceVersionResource
from redash.handlers.events import EventsResource
from redash.handlers.queries import QueryForkResource, QueryRefreshResource, QueryListResource, QueryRecentResource, QuerySearchResource, QueryResource, MyQueriesResource, QueryVersionListResource, ChangeResource
from redash.handlers.query_results import QueryResultListResource, QueryResultResource, JobResource
from redash.handlers.query_results import QueryResultListResource, QueryResultResource, JobResource, QueryResultSetResource
from redash.handlers.users import UserResource, UserListResource, UserInviteResource, UserResetPasswordResource
from redash.handlers.visualizations import VisualizationListResource
from redash.handlers.visualizations import VisualizationResource
Expand Down Expand Up @@ -76,6 +76,7 @@ def json_representation(data, code, headers=None):
api.add_org_resource(QueryRefreshResource, '/api/queries/<query_id>/refresh', endpoint='query_refresh')
api.add_org_resource(QueryResource, '/api/queries/<query_id>', endpoint='query')
api.add_org_resource(QueryForkResource, '/api/queries/<query_id>/fork', endpoint='query_fork')
api.add_org_resource(QueryResultSetResource, '/api/queries/<query_id>/resultset', endpoint='query_aggregate_results')
api.add_org_resource(QueryVersionListResource, '/api/queries/<query_id>/version', endpoint='query_versions')
api.add_org_resource(ChangeResource, '/api/changes/<change_id>', endpoint='changes')

Expand Down
3 changes: 3 additions & 0 deletions redash/handlers/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ def post(self):
:<json string description:
:<json string schedule: Schedule interval, in seconds, for repeated execution of this query
:<json string schedule_until: Time in ISO format to stop scheduling this query (may be null to run indefinitely)
:<json number schedule_resultset_size: Number of result sets to keep (null to keep only one)
:<json object options: Query options

.. _query-response-label:
Expand Down Expand Up @@ -134,6 +135,8 @@ def post(self):
query_def['data_source'] = data_source
query_def['org'] = self.current_org
query_def['is_draft'] = True
if query_def.get('schedule_resultset_size') == 1:
query_def['schedule_resultset_size'] = None
query = models.Query.create(**query_def)
query.record_changes(changed_by=self.current_user)
models.db.session.add(query)
Expand Down
29 changes: 28 additions & 1 deletion redash/handlers/query_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,33 @@ def post(self):
ONE_YEAR = 60 * 60 * 24 * 365.25


class QueryResultSetResource(BaseResource):
    """Serve a query's retained results merged into one synthetic result."""

    @require_permission('view_query')
    def get(self, query_id=None, filetype='json'):
        """Return the newest retained results of a query as one JSON result.

        The query is looked up with ``get_object_or_404``. A 404 is also
        returned when the query's ``schedule_resultset_size`` is unset or
        zero. ``filetype`` is accepted but not consulted here; the response
        is always JSON of the form ``{"query_result": {...}}``.
        """
        query = get_object_or_404(models.Query.get_by_id_and_org, query_id, self.current_org)
        if not query.schedule_resultset_size:
            abort(404, message="query does not keep multiple results")

        # The query_results relationship declares no ordering, so sort by id
        # explicitly; otherwise "the last N" would depend on whatever order
        # the database happens to return the rows in.
        retained = sorted(query.query_results, key=lambda qr: qr.id)

        # Synthesize a result set from the last N results.
        offset = max(len(retained) - query.schedule_resultset_size, 0)
        results = [qr.to_dict() for qr in retained[offset:]]
        if not results:
            aggregate_result = {}
        else:
            # Start a synthetic data set from the oldest kept result's
            # metadata and columns, with no rows yet...
            aggregate_result = results[0].copy()
            aggregate_result['data'] = {'columns': results[0]['data']['columns'],
                                        'rows': []}
            # ...then append the rows of every kept result, oldest first.
            for r in results:
                aggregate_result['data']['rows'].extend(r['data']['rows'])

        data = json.dumps({'query_result': aggregate_result}, cls=utils.JSONEncoder)
        headers = {'Content-Type': "application/json"}
        return make_response(data, 200, headers)


class QueryResultResource(BaseResource):
@staticmethod
def add_cors_headers(headers):
Expand Down Expand Up @@ -194,7 +221,7 @@ def get(self, query_id=None, query_result_id=None, filetype='json'):
query_result = run_query_sync(query.data_source, parameter_values, query.to_dict()['query'], max_age=max_age)
elif query.latest_query_data_id is not None:
query_result = get_object_or_404(models.QueryResult.get_by_id_and_org, query.latest_query_data_id, self.current_org)

if query is not None and query_result is not None and self.current_user.is_api_user():
if query.query_hash != query_result.query_hash:
abort(404, message='No cached result found for this query.')
Expand Down
55 changes: 51 additions & 4 deletions redash/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from redash.utils.comparators import CaseInsensitiveComparator
from redash.utils.configuration import ConfigurationContainer
from redash.settings.organization import settings as org_settings
from sqlalchemy import distinct, or_
from sqlalchemy import distinct, exists, or_
from sqlalchemy.dialects import postgresql
from sqlalchemy.event import listens_for
from sqlalchemy.ext.mutable import Mutable
Expand Down Expand Up @@ -728,9 +728,9 @@ def to_dict(self):
def unused(cls, days=7):
age_threshold = datetime.datetime.now() - datetime.timedelta(days=days)

unused_results = (db.session.query(QueryResult.id).filter(
Query.id == None, QueryResult.retrieved_at < age_threshold)
.outerjoin(Query))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why stop filtering out the results where the query id isn't set? (that's probably for when a query hasn't been saved yet?)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The outerjoin joins on Query.latest_query_data_id, so this deletes results over the age threshold that aren't referenced by any Query rows. We don't want that now that there can be multiple results for a single Query.

unused_results = db.session.query(QueryResult.id).filter(
QueryResult.retrieved_at < age_threshold,
~QueryResultSet.query.filter(QueryResultSet.result_id == QueryResult.id).exists())
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think my SQLAlchemy-fu is weak; could you explain what that query does?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"where no QueryResultSet rows refer to this QueryResult row"


return unused_results

Expand Down Expand Up @@ -769,9 +769,12 @@ def store_result(cls, org, data_source, query_hash, query, data, run_time, retri
queries = db.session.query(Query).filter(
Query.query_hash == query_hash,
Query.data_source == data_source)

for q in queries:
q.latest_query_data = query_result
db.session.add(q)
if q.schedule_resultset_size > 0:
q.query_results.append(query_result)
query_ids = [q.id for q in queries]
logging.info("Updated %s queries with result (%s).", len(query_ids), query_hash)

Expand Down Expand Up @@ -851,6 +854,7 @@ class Query(ChangeTrackingMixin, TimestampMixin, BelongsToOrgMixin, db.Model):
data_source = db.relationship(DataSource, backref='queries')
latest_query_data_id = Column(db.Integer, db.ForeignKey("query_results.id"), nullable=True)
latest_query_data = db.relationship(QueryResult)
query_results = db.relationship("QueryResult", secondary="query_resultsets")
name = Column(db.String(255))
description = Column(db.String(4096), nullable=True)
query_text = Column("query", db.Text)
Expand All @@ -866,6 +870,7 @@ class Query(ChangeTrackingMixin, TimestampMixin, BelongsToOrgMixin, db.Model):
schedule = Column(db.String(10), nullable=True)
schedule_failures = Column(db.Integer, default=0)
schedule_until = Column(db.DateTime(True), nullable=True)
schedule_resultset_size = Column(db.Integer, nullable=True)
visualizations = db.relationship("Visualization", cascade="all, delete-orphan")
options = Column(MutableDict.as_mutable(PseudoJSON), default={})
search_vector = Column(TSVectorType('id', 'name', 'description', 'query',
Expand All @@ -892,6 +897,7 @@ def to_dict(self, with_stats=False, with_visualizations=False, with_user=True, w
'query_hash': self.query_hash,
'schedule': self.schedule,
'schedule_until': self.schedule_until,
'schedule_resultset_size': self.schedule_resultset_size,
'api_key': self.api_key,
'is_archived': self.is_archived,
'is_draft': self.is_draft,
Expand Down Expand Up @@ -1000,6 +1006,37 @@ def outdated_queries(cls):

return outdated_queries.values()

@classmethod
def delete_stale_resultsets(cls):
    """Trim retained results down to each query's ``schedule_resultset_size``.

    Queries that keep a result set are grouped by identical query text.
    Within a group, the query that keeps the most results decides which
    QueryResult rows are deleted outright; every other query in the group
    only loses its own surplus bridge rows.

    Returns the number of QueryResult rows deleted.
    """
    delete_count = 0
    # NOTE: ``!= None`` is deliberate; SQLAlchemy turns it into IS NOT NULL.
    texts = [c[0] for c in db.session.query(Query.query_text)
             .filter(Query.schedule_resultset_size != None).distinct()]
    for text in texts:
        queries = (Query.query.filter(Query.query_text == text,
                                      Query.schedule_resultset_size != None)
                   .order_by(Query.schedule_resultset_size.desc()))
        # Multiple queries with the same text may request multiple result sets
        # be kept. We start with the one that keeps the most, and delete both
        # the unneeded bridge rows and result sets.
        first_query = queries.first()
        if first_query is not None and first_query.schedule_resultset_size:
            resultsets = (QueryResultSet.query
                          .filter(QueryResultSet.query_rel == first_query)
                          .order_by(QueryResultSet.result_id))
            resultset_count = resultsets.count()
            if resultset_count > first_query.schedule_resultset_size:
                n_to_delete = resultset_count - first_query.schedule_resultset_size
                # Only fetch the oldest surplus ids instead of every bridge row.
                r_ids = [r.result_id for r in resultsets.limit(n_to_delete)]
                # These results are about to be deleted, so drop every bridge
                # row pointing at them, whichever query it belongs to.
                QueryResultSet.query.filter(QueryResultSet.result_id.in_(r_ids)).delete(synchronize_session=False)
                delete_count += QueryResult.query.filter(QueryResult.id.in_(r_ids)).delete(synchronize_session=False)
        # By this point there are no stale result sets left.
        # Delete unneeded bridge rows for the remaining queries.
        for q in queries[1:]:
            resultsets = (db.session.query(QueryResultSet.result_id)
                          .filter(QueryResultSet.query_rel == q)
                          .order_by(QueryResultSet.result_id))
            n_to_delete = resultsets.count() - q.schedule_resultset_size
            if n_to_delete > 0:
                # Restrict the delete to this query's own bridge rows: the same
                # results may still be kept by a query with a larger
                # schedule_resultset_size, and its rows must survive.
                stale_r = QueryResultSet.query.filter(
                    QueryResultSet.query_id == q.id,
                    QueryResultSet.result_id.in_(resultsets.limit(n_to_delete).subquery()))
                stale_r.delete(synchronize_session=False)
    return delete_count

@classmethod
def search(cls, term, group_ids, include_drafts=False, limit=20):
where = cls.is_archived == False
Expand Down Expand Up @@ -1089,6 +1126,16 @@ def __repr__(self):
return '<Query %s: "%s">' % (self.id, self.name or 'untitled')


class QueryResultSet(db.Model):
query_id = Column(db.Integer, db.ForeignKey("queries.id"),
primary_key=True)
query_rel = db.relationship(Query)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should just be query, no need for the extra suffix.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The suffix is to avoid collision with db.Model.query.

result_id = Column(db.Integer, db.ForeignKey("query_results.id"),
primary_key=True)
result = db.relationship(QueryResult)
__tablename__ = 'query_resultsets'


@vectorizer(db.Integer)
def integer_vectorizer(column):
return db.func.cast(column, db.Text)
Expand Down
1 change: 1 addition & 0 deletions redash/tasks/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ def cleanup_query_results():
deleted_count = models.QueryResult.query.filter(
models.QueryResult.id.in_(unused_query_results.subquery())
).delete(synchronize_session=False)
deleted_count += models.Query.delete_stale_resultsets()
models.db.session.commit()
logger.info("Deleted %d unused query results.", deleted_count)

Expand Down
7 changes: 6 additions & 1 deletion tests/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ def __call__(self):
query_hash=gen_query_hash('SELECT 1'),
data_source=data_source_factory.create,
org_id=1)

query_resultset_factory = ModelFactory(redash.models.QueryResultSet,
query_rel=query_factory.create,
result=query_result_factory.create)
visualization_factory = ModelFactory(redash.models.Visualization,
type='CHART',
query_rel=query_factory.create,
Expand Down Expand Up @@ -295,6 +297,9 @@ def create_query_result(self, **kwargs):

return query_result_factory.create(**args)

def create_query_resultset(self, **kwargs):
    """Build a QueryResultSet through the module-level factory.

    All keyword arguments are forwarded unchanged to
    ``query_resultset_factory.create``.
    """
    resultset = query_resultset_factory.create(**kwargs)
    return resultset

def create_visualization(self, **kwargs):
args = {
'query_rel': self.create_query()
Expand Down
Loading