From eb477d35cc27893ae4099098ea2d4dfc3bc2b546 Mon Sep 17 00:00:00 2001
From: Allen Short
Date: Thu, 24 Jan 2019 17:32:09 -0600
Subject: [PATCH] Show number of queries ahead in queue when executing (#786)

---
 client/app/pages/queries/query.html  |  1 +
 client/app/services/query-result.js  | 22 +++++++++++
 redash/handlers/api.py               |  3 +-
 redash/handlers/query_results.py     | 16 ++++++++
 tests/handlers/test_query_results.py | 56 ++++++++++++++++++++++++++--
 5 files changed, 94 insertions(+), 4 deletions(-)

diff --git a/client/app/pages/queries/query.html b/client/app/pages/queries/query.html
index e3175dc320..bf82ef5284 100644
--- a/client/app/pages/queries/query.html
+++ b/client/app/pages/queries/query.html
@@ -214,6 +214,7 @@
                 Query in queue…
+                (waiting on {{queryResult.queueStatus}} )
diff --git a/client/app/services/query-result.js b/client/app/services/query-result.js
index a267cee050..6ec8f68601 100644
--- a/client/app/services/query-result.js
+++ b/client/app/services/query-result.js
@@ -74,6 +74,7 @@ function QueryResultService($resource, $timeout, $q, QueryResultError, Auth) {
 
       // extended status flags
       this.isLoadingResult = false;
+      this.queueStatus = null;
 
       if (props) {
         this.update(props);
@@ -367,6 +368,26 @@ function QueryResultService($resource, $timeout, $q, QueryResultError, Auth) {
       );
     }
 
+    refreshQueueStatus(dataSourceId) {
+      if (this.getStatus() === 'waiting') {
+        const actions = {
+          get: { method: 'GET', cache: false, isArray: false },
+        };
+        $resource('api/jobs/:id/data_source/:dataSourceId/status', { id: '@id', dataSourceId: '@dataSourceId' }, actions).get(
+          {
+            id: this.job.id,
+            dataSourceId,
+          },
+          (statusResponse) => {
+            $timeout(() => this.refreshQueueStatus(dataSourceId), 10000);
+            this.queueStatus = statusResponse.data.num_tasks;
+          },
+        );
+      } else {
+        this.queueStatus = null;
+      }
+    }
+
     getLink(queryId, fileType, apiKey) {
       let link = `api/queries/${queryId}/results/${this.getId()}.${fileType}`;
       if (apiKey) {
@@ -424,6 +445,7 @@ function QueryResultService($resource, $timeout, $q, QueryResultError, Auth) {
 
           if ('job' in response) {
             queryResult.refreshStatus(query, parameters);
+            queryResult.refreshQueueStatus(dataSourceId);
           }
         },
         (error) => {
diff --git a/redash/handlers/api.py b/redash/handlers/api.py
index 786de7622b..90ab2172e6 100644
--- a/redash/handlers/api.py
+++ b/redash/handlers/api.py
@@ -39,6 +39,7 @@
                                       QueryTagsResource,
                                       QueryRegenerateApiKeyResource)
 from redash.handlers.query_results import (JobResource,
+                                           JobStatusResource,
                                            QueryResultDropdownResource,
                                            QueryDropdownsResource,
                                            QueryResultListResource,
@@ -140,7 +141,7 @@ def json_representation(data, code, headers=None):
                      '/api/jobs/<job_id>',
                      '/api/queries/<query_id>/jobs/<job_id>',
                      endpoint='job')
-
+api.add_org_resource(JobStatusResource, '/api/jobs/<job_id>/data_source/<data_source_id>/status', endpoint='job_queue_status')
 api.add_org_resource(UserListResource, '/api/users', endpoint='users')
 api.add_org_resource(UserResource, '/api/users/<user_id>', endpoint='user')
 api.add_org_resource(UserInviteResource, '/api/users/<user_id>/invite', endpoint='user_invite')
diff --git a/redash/handlers/query_results.py b/redash/handlers/query_results.py
index 5fdf8e7314..43e5e766e8 100644
--- a/redash/handlers/query_results.py
+++ b/redash/handlers/query_results.py
@@ -14,6 +14,8 @@
 from redash.models.parameterized_query import (ParameterizedQuery, InvalidParameterError,
                                                 QueryDetachedFromDataSourceError, dropdown_values)
 from redash.serializers import serialize_query_result, serialize_query_result_to_csv, serialize_query_result_to_xlsx
+from redash.monitor import parse_tasks, get_waiting_in_queue
+from redash.worker import celery
 
 
 def error_response(message, http_status=400):
@@ -326,3 +328,17 @@ def delete(self, job_id):
         """
         job = QueryTask(job_id=job_id)
         job.cancel()
+
+
+class JobStatusResource(BaseResource):
+    def get(self, job_id, data_source_id):
+        job = QueryTask(job_id=job_id)
+        job._async_result._get_task_meta()
+        data_source = models.DataSource.get_by_id_and_org(data_source_id, self.current_org)
+        reserved_tasks = [
+            task
+            for task in parse_tasks(celery.control.inspect().reserved(), 'reserved')
+            if task['queue'] == data_source.queue_name
+        ]
+        tasks = reserved_tasks + get_waiting_in_queue(data_source.queue_name)
+        return {'queue_name': data_source.queue_name, 'num_tasks': len(tasks)}
diff --git a/tests/handlers/test_query_results.py b/tests/handlers/test_query_results.py
index e54e12648c..8c149341c2 100644
--- a/tests/handlers/test_query_results.py
+++ b/tests/handlers/test_query_results.py
@@ -1,8 +1,9 @@
+import mock
 from tests import BaseTestCase
 
 from redash.models import db
-from redash.utils import json_dumps
 from redash.handlers.query_results import error_messages
+from redash.utils import json_dumps
 
 
 class TestQueryResultsCacheHeaders(BaseTestCase):
@@ -39,7 +40,6 @@ def test_get_existing_result(self):
         self.assertEquals(query_result.id, rv.json['query_result']['id'])
 
     def test_execute_new_query(self):
-        query_result = self.factory.create_query_result()
         query = self.factory.create_query()
 
         rv = self.make_request('post', '/api/query_results',
@@ -51,6 +51,56 @@
         self.assertNotIn('query_result', rv.json)
         self.assertIn('job', rv.json)
 
+    def test_queue_length(self):
+        query = self.factory.create_query()
+        tasks = []
+
+        def fake_all(*a, **kw):
+            return tasks
+
+        def enqueue_query(query, *a, **kw):
+            from redash.tasks.queries import enqueue_query
+            job = enqueue_query(query, *a, **kw)
+            tasks.append(dict(
+                state='waiting_in_queue',
+                task_name='test task',
+                worker=None,
+                worker_pid=None,
+                start_time=None,
+                task_id=job.id,
+                queue='queries',
+            ))
+            return job
+
+        patch_all = mock.patch('redash.handlers.query_results.get_waiting_in_queue', fake_all)
+        patch_parse_tasks = mock.patch('redash.handlers.query_results.parse_tasks', lambda *_: [])
+        patch_enqueue_query = mock.patch('redash.handlers.query_results.enqueue_query',
+                                         enqueue_query)
+        db.session.commit()
+        with patch_all, patch_enqueue_query, patch_parse_tasks:
+            job0 = self.make_request('post', '/api/query_results',
+                                     data={'data_source_id': self.factory.data_source.id,
+                                           'query': query.query_text,
+                                           'max_age': 0})
+            rv0 = self.make_request('get', '/api/jobs/{}/data_source/{}/status'.format(
+                job0.json['job']['id'], self.factory.data_source.id))
+            job1 = self.make_request('post', '/api/query_results',
+                                     data={'data_source_id': self.factory.data_source.id,
+                                           'query': query.query_text,
+                                           'max_age': 0})
+            rv1 = self.make_request('get', '/api/jobs/{}/data_source/{}/status'.format(
+                job1.json['job']['id'], self.factory.data_source.id))
+            job2 = self.make_request('post', '/api/query_results',
+                                     data={'data_source_id': self.factory.data_source.id,
+                                           'query': query.query_text,
+                                           'max_age': 0})
+            rv2 = self.make_request('get', '/api/jobs/{}/data_source/{}/status'.format(
+                job2.json['job']['id'], self.factory.data_source.id))
+
+        self.assertEquals(rv0.json['num_tasks'], 1)
+        self.assertEquals(rv1.json['num_tasks'], 2)
+        self.assertEquals(rv2.json['num_tasks'], 3)
+
     def test_execute_query_without_access(self):
         group = self.factory.create_group()
         db.session.commit()
@@ -144,7 +194,7 @@ def test_execute_new_query(self):
         self.assertEquals(rv.status_code, 200)
         self.assertIn('job', rv.json)
 
-    
+
     def test_execute_but_has_no_access_to_data_source(self):
         ds = self.factory.create_data_source(group=self.factory.create_group())
         query = self.factory.create_query(data_source=ds)
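
Note on the server-side count: JobStatusResource adds the data source's reserved Celery tasks to whatever is still waiting in its broker queue and reports the total as num_tasks. The snippet below is a rough, hypothetical illustration of the "reserved" half of that count using plain Celery inspection; the broker URL and queue name are placeholders, and in Redash itself the parse_tasks and get_waiting_in_queue helpers from redash.monitor do the real work.

from celery import Celery

# Placeholder broker URL -- Redash configures its own Celery app in redash.worker.
app = Celery(broker='redis://localhost:6379/0')


def count_reserved_tasks(queue_name):
    """Count tasks that workers have prefetched (reserved) but not yet started for one queue.

    This approximates the reserved_tasks list comprehension in JobStatusResource; the
    "waiting" half of the count comes from the broker queue itself (get_waiting_in_queue).
    """
    reserved = app.control.inspect().reserved() or {}  # {worker_name: [task_info, ...]}
    return sum(
        1
        for tasks in reserved.values()
        for task in tasks
        if (task.get('delivery_info') or {}).get('routing_key') == queue_name
    )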
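
The new endpoint can also be exercised outside the Angular client. Below is a minimal sketch using requests; REDASH_URL, API_KEY, and the job/data source ids are placeholders rather than values from this patch, and the 10-second sleep simply mirrors the $timeout interval used in query-result.js.

import time

import requests

# Placeholder values -- substitute a real Redash host, user API key, job id and data source id.
REDASH_URL = 'http://localhost:5000'
API_KEY = 'your-user-api-key'


def queue_length(job_id, data_source_id):
    """Fetch the queue status for a waiting job from the endpoint added by this patch."""
    url = '{}/api/jobs/{}/data_source/{}/status'.format(REDASH_URL, job_id, data_source_id)
    response = requests.get(url, headers={'Authorization': 'Key {}'.format(API_KEY)})
    response.raise_for_status()
    return response.json()['num_tasks']  # response body: {"queue_name": ..., "num_tasks": ...}


if __name__ == '__main__':
    # Poll a few times, the same way the frontend re-checks every 10 seconds while waiting.
    for _ in range(5):
        print('waiting on {} queued task(s)'.format(queue_length('some-job-id', 1)))
        time.sleep(10)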