From fa7f86b39958b7ae3767e47a40617687ea35e531 Mon Sep 17 00:00:00 2001 From: Allen Short Date: Thu, 24 Jan 2019 17:32:09 -0600 Subject: [PATCH] Show number of queries ahead in queue when executing --- client/app/pages/queries/query.html | 2 +- client/app/services/query-result.js | 16 +++++++++- redash/handlers/api.py | 5 ++-- redash/handlers/query_results.py | 14 +++++++++ redash/tasks/queries.py | 1 + tests/handlers/test_query_results.py | 45 ++++++++++++++++++++++++++-- 6 files changed, 76 insertions(+), 7 deletions(-) diff --git a/client/app/pages/queries/query.html b/client/app/pages/queries/query.html index d95b0b76db..6cf10c5216 100644 --- a/client/app/pages/queries/query.html +++ b/client/app/pages/queries/query.html @@ -207,7 +207,7 @@

- Query in queue… + Query in queue (waiting on {{queryResult.queueStatus}} ) diff --git a/client/app/services/query-result.js b/client/app/services/query-result.js index 06fa301c9b..5a2ad38812 100644 --- a/client/app/services/query-result.js +++ b/client/app/services/query-result.js @@ -50,7 +50,7 @@ function addPointToSeries(point, seriesCollection, seriesName) { seriesCollection[seriesName].data.push(point); } -function QueryResultService($resource, $timeout, $q, QueryResultError) { +function QueryResultService($http, $resource, $timeout, $q, QueryResultError) { const QueryResultResource = $resource('api/query_results/:id', { id: '@id' }, { post: { method: 'POST' } }); const Job = $resource('api/jobs/:id', { id: '@id' }); const statuses = { @@ -89,6 +89,7 @@ function QueryResultService($resource, $timeout, $q, QueryResultError) { // extended status flags this.isLoadingResult = false; + this.queueStatus = null; if (props) { this.update(props); @@ -519,6 +520,18 @@ function QueryResultService($resource, $timeout, $q, QueryResultError) { ); } + refreshQueueStatus(dataSourceId) { + if (this.getStatus() === 'waiting') { + const p = $http.get(`/api/queue_status/${this.job.id}?data_source=${dataSourceId}`); + p.then((response) => { + $timeout(() => this.refreshQueueStatus(dataSourceId), 10000); + this.queueStatus = response.data.num_tasks; + }); + } else { + this.queueStatus = null; + } + } + getLink(queryId, fileType, apiKey) { let link = `api/queries/${queryId}/results/${this.getId()}.${fileType}`; if (apiKey) { @@ -576,6 +589,7 @@ function QueryResultService($resource, $timeout, $q, QueryResultError) { if ('job' in response) { queryResult.refreshStatus(query); + queryResult.refreshQueueStatus(dataSourceId); } }, (error) => { diff --git a/redash/handlers/api.py b/redash/handlers/api.py index 94da6aa859..dc4478c89e 100644 --- a/redash/handlers/api.py +++ b/redash/handlers/api.py @@ -39,7 +39,8 @@ from redash.handlers.query_results import (JobResource, QueryResultDropdownResource, QueryResultListResource, - QueryResultResource) + QueryResultResource, + QueueStatusResource) from redash.handlers.query_snippets import (QuerySnippetListResource, QuerySnippetResource) from redash.handlers.settings import OrganizationSettings @@ -128,7 +129,7 @@ def json_representation(data, code, headers=None): '/api/queries//results/.', endpoint='query_result') api.add_org_resource(JobResource, '/api/jobs/', endpoint='job') - +api.add_org_resource(QueueStatusResource, '/api/queue_status/', endpoint='2job_queue_status') api.add_org_resource(UserListResource, '/api/users', endpoint='users') api.add_org_resource(UserResource, '/api/users/', endpoint='user') api.add_org_resource(UserInviteResource, '/api/users//invite', endpoint='user_invite') diff --git a/redash/handlers/query_results.py b/redash/handlers/query_results.py index 548b41173a..3062648de6 100644 --- a/redash/handlers/query_results.py +++ b/redash/handlers/query_results.py @@ -12,6 +12,8 @@ from redash.tasks.queries import enqueue_query from redash.utils import (collect_parameters_from_request, gen_query_hash, json_dumps, utcnow, to_filename) from redash.utils.parameterized_query import ParameterizedQuery, InvalidParameterError, dropdown_values +from redash.monitor import parse_tasks, get_waiting_in_queue +from redash.worker import celery def error_response(message): @@ -332,3 +334,15 @@ def delete(self, job_id): """ job = QueryTask(job_id=job_id) job.cancel() + + +class QueueStatusResource(BaseResource): + def get(self, job_id): + job = QueryTask(job_id=job_id) + task_info = job._async_result._get_task_meta() + + data_source_id = request.args.get('data_source') + data_source = models.DataSource.get_by_id_and_org(data_source_id, self.current_org) + tasks = ([t for t in parse_tasks(celery.control.inspect().reserved(), 'reserved') if t['queue'] == data_source.queue_name] + + get_waiting_in_queue(data_source.queue_name)) + return {'queue_name': data_source.queue_name, 'num_tasks': len(tasks)} diff --git a/redash/tasks/queries.py b/redash/tasks/queries.py index a83ab3242e..fdd98ddc79 100644 --- a/redash/tasks/queries.py +++ b/redash/tasks/queries.py @@ -12,6 +12,7 @@ from redash.query_runner import InterruptException from redash.tasks.alerts import check_alerts_for_query from redash.utils import gen_query_hash, json_dumps, json_loads, utcnow, mustache_render +from redash.monitor import get_waiting_in_queue from redash.worker import celery logger = get_task_logger(__name__) diff --git a/tests/handlers/test_query_results.py b/tests/handlers/test_query_results.py index 455cb81704..93945d732f 100644 --- a/tests/handlers/test_query_results.py +++ b/tests/handlers/test_query_results.py @@ -1,8 +1,8 @@ +import mock from tests import BaseTestCase from redash.models import db -from redash.utils import json_dumps - +from redash.utils import gen_query_hash, json_dumps class TestQueryResultsCacheHeaders(BaseTestCase): def test_uses_cache_headers_for_specific_result(self): @@ -38,7 +38,6 @@ def test_get_existing_result(self): self.assertEquals(query_result.id, rv.json['query_result']['id']) def test_execute_new_query(self): - query_result = self.factory.create_query_result() query = self.factory.create_query() rv = self.make_request('post', '/api/query_results', @@ -50,6 +49,46 @@ def test_execute_new_query(self): self.assertNotIn('query_result', rv.json) self.assertIn('job', rv.json) + def test_queue_length(self): + query = self.factory.create_query() + tasks = [] + def fake_all(*a, **kw): + return tasks + def enqueue_query(query, *a, **kw): + from redash.tasks.queries import enqueue_query + job = enqueue_query(query, *a, **kw) + tasks.append(dict( + state='waiting_in_queue', + task_name='test task', + worker=None, + worker_pid=None, + start_time=None, + task_id=job.id, + queue='queries', + )) + return job + patch_all = mock.patch('redash.tasks.queries.get_waiting_in_queue', fake_all) + patch_enqueue_query = mock.patch('redash.handlers.query_results.enqueue_query', + enqueue_query) + with patch_all, patch_enqueue_query: + rv0 = self.make_request('post', '/api/query_results', + data={'data_source_id': self.factory.data_source.id, + 'query': query.query_text, + 'max_age': 0}) + rv1 = self.make_request('post', '/api/query_results', + data={'data_source_id': self.factory.data_source.id, + 'query': query.query_text, + 'max_age': 0}) + rv2 = self.make_request('post', '/api/query_results', + data={'data_source_id': self.factory.data_source.id, + 'query': query.query_text, + 'max_age': 0}) + + self.assertEquals(rv0.json['job']['queue_length'], 0) + self.assertEquals(rv1.json['job']['queue_length'], 1) + self.assertEquals(rv2.json['job']['queue_length'], 2) + + def test_execute_query_without_access(self): group = self.factory.create_group() db.session.commit()