-
Notifications
You must be signed in to change notification settings - Fork 21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Aggregate query results (re #35) #339
Changes from all commits
4afd8a2
32c5dc6
8df71de
49e6c5d
25ecf76
d940320
db86cf3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,7 +25,7 @@ | |
from redash.utils.comparators import CaseInsensitiveComparator | ||
from redash.utils.configuration import ConfigurationContainer | ||
from redash.settings.organization import settings as org_settings | ||
from sqlalchemy import distinct, or_ | ||
from sqlalchemy import distinct, exists, or_ | ||
from sqlalchemy.dialects import postgresql | ||
from sqlalchemy.event import listens_for | ||
from sqlalchemy.ext.mutable import Mutable | ||
|
@@ -728,9 +728,9 @@ def to_dict(self): | |
def unused(cls, days=7): | ||
age_threshold = datetime.datetime.now() - datetime.timedelta(days=days) | ||
|
||
unused_results = (db.session.query(QueryResult.id).filter( | ||
Query.id == None, QueryResult.retrieved_at < age_threshold) | ||
.outerjoin(Query)) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why stop filtering out the results where the query id isn't set? (That's probably for when a query hasn't been saved yet?) There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The old `Query.id == None` filter on the outer join matched results not referenced by any query; the new `exists()` filter expresses that condition directly. [reply truncated in page capture — confirm against the PR thread] |
||
unused_results = db.session.query(QueryResult.id).filter( | ||
QueryResult.retrieved_at < age_threshold, | ||
~QueryResultSet.query.filter(QueryResultSet.result_id == QueryResult.id).exists()) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think my SQA foo is weak, could you explain what that query does? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. "Where no `QueryResultSet` row references this `QueryResult`'s id" — i.e. select stale results that are not kept in any query's result set. |
||
|
||
return unused_results | ||
|
||
|
@@ -769,9 +769,12 @@ def store_result(cls, org, data_source, query_hash, query, data, run_time, retri | |
queries = db.session.query(Query).filter( | ||
Query.query_hash == query_hash, | ||
Query.data_source == data_source) | ||
|
||
for q in queries: | ||
q.latest_query_data = query_result | ||
db.session.add(q) | ||
if q.schedule_resultset_size > 0: | ||
q.query_results.append(query_result) | ||
query_ids = [q.id for q in queries] | ||
logging.info("Updated %s queries with result (%s).", len(query_ids), query_hash) | ||
|
||
|
@@ -851,6 +854,7 @@ class Query(ChangeTrackingMixin, TimestampMixin, BelongsToOrgMixin, db.Model): | |
data_source = db.relationship(DataSource, backref='queries') | ||
latest_query_data_id = Column(db.Integer, db.ForeignKey("query_results.id"), nullable=True) | ||
latest_query_data = db.relationship(QueryResult) | ||
query_results = db.relationship("QueryResult", secondary="query_resultsets") | ||
name = Column(db.String(255)) | ||
description = Column(db.String(4096), nullable=True) | ||
query_text = Column("query", db.Text) | ||
|
@@ -866,6 +870,7 @@ class Query(ChangeTrackingMixin, TimestampMixin, BelongsToOrgMixin, db.Model): | |
schedule = Column(db.String(10), nullable=True) | ||
schedule_failures = Column(db.Integer, default=0) | ||
schedule_until = Column(db.DateTime(True), nullable=True) | ||
schedule_resultset_size = Column(db.Integer, nullable=True) | ||
visualizations = db.relationship("Visualization", cascade="all, delete-orphan") | ||
options = Column(MutableDict.as_mutable(PseudoJSON), default={}) | ||
search_vector = Column(TSVectorType('id', 'name', 'description', 'query', | ||
|
@@ -892,6 +897,7 @@ def to_dict(self, with_stats=False, with_visualizations=False, with_user=True, w | |
'query_hash': self.query_hash, | ||
'schedule': self.schedule, | ||
'schedule_until': self.schedule_until, | ||
'schedule_resultset_size': self.schedule_resultset_size, | ||
'api_key': self.api_key, | ||
'is_archived': self.is_archived, | ||
'is_draft': self.is_draft, | ||
|
@@ -1000,6 +1006,37 @@ def outdated_queries(cls): | |
|
||
return outdated_queries.values() | ||
|
||
@classmethod | ||
def delete_stale_resultsets(cls): | ||
delete_count = 0 | ||
texts = [c[0] for c in db.session.query(Query.query_text) | ||
.filter(Query.schedule_resultset_size != None).distinct()] | ||
for text in texts: | ||
queries = (Query.query.filter(Query.query_text == text, | ||
Query.schedule_resultset_size != None) | ||
.order_by(Query.schedule_resultset_size.desc())) | ||
# Multiple queries with the same text may request multiple result sets | ||
# be kept. We start with the one that keeps the most, and delete both | ||
# the unneeded bridge rows and result sets. | ||
first_query = queries.first() | ||
if first_query is not None and first_query.schedule_resultset_size: | ||
resultsets = QueryResultSet.query.filter(QueryResultSet.query_rel == first_query).order_by(QueryResultSet.result_id) | ||
resultset_count = resultsets.count() | ||
if resultset_count > first_query.schedule_resultset_size: | ||
n_to_delete = resultset_count - first_query.schedule_resultset_size | ||
r_ids = [r.result_id for r in resultsets][:n_to_delete] | ||
QueryResultSet.query.filter(QueryResultSet.result_id.in_(r_ids)).delete(synchronize_session=False) | ||
delete_count += QueryResult.query.filter(QueryResult.id.in_(r_ids)).delete(synchronize_session=False) | ||
# By this point there are no stale result sets left. | ||
# Delete unneeded bridge rows for the remaining queries. | ||
for q in queries[1:]: | ||
resultsets = db.session.query(QueryResultSet.result_id).filter(QueryResultSet.query_rel == q).order_by(QueryResultSet.result_id) | ||
n_to_delete = resultsets.count() - q.schedule_resultset_size | ||
if n_to_delete > 0: | ||
stale_r = QueryResultSet.query.filter(QueryResultSet.result_id.in_(resultsets.limit(n_to_delete).subquery())) | ||
stale_r.delete(synchronize_session=False) | ||
return delete_count | ||
|
||
@classmethod | ||
def search(cls, term, group_ids, include_drafts=False, limit=20): | ||
where = cls.is_archived == False | ||
|
@@ -1089,6 +1126,16 @@ def __repr__(self): | |
return '<Query %s: "%s">' % (self.id, self.name or 'untitled') | ||
|
||
|
||
class QueryResultSet(db.Model): | ||
query_id = Column(db.Integer, db.ForeignKey("queries.id"), | ||
primary_key=True) | ||
query_rel = db.relationship(Query) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this should just be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The suffix is to avoid collision with |
||
result_id = Column(db.Integer, db.ForeignKey("query_results.id"), | ||
primary_key=True) | ||
result = db.relationship(QueryResult) | ||
__tablename__ = 'query_resultsets' | ||
|
||
|
||
@vectorizer(db.Integer) | ||
def integer_vectorizer(column): | ||
return db.func.cast(column, db.Text) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

Was `schedule_keep_results` an unused field before? I don't see any other references to it.

There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

This migration went into master before it should have — the field was misnamed. We're going to roll back the misnamed field on stage before this PR gets merged.