diff --git a/client/app/pages/queries/query.html b/client/app/pages/queries/query.html
index d95b0b76db..6cf10c5216 100644
--- a/client/app/pages/queries/query.html
+++ b/client/app/pages/queries/query.html
@@ -207,7 +207,7 @@
- Query in queue…
+ Query in queue (waiting on {{queryResult.queueStatus}} tasks)…
diff --git a/client/app/services/query-result.js b/client/app/services/query-result.js
index 06fa301c9b..5a2ad38812 100644
--- a/client/app/services/query-result.js
+++ b/client/app/services/query-result.js
@@ -50,7 +50,7 @@ function addPointToSeries(point, seriesCollection, seriesName) {
seriesCollection[seriesName].data.push(point);
}
-function QueryResultService($resource, $timeout, $q, QueryResultError) {
+function QueryResultService($http, $resource, $timeout, $q, QueryResultError) {
const QueryResultResource = $resource('api/query_results/:id', { id: '@id' }, { post: { method: 'POST' } });
const Job = $resource('api/jobs/:id', { id: '@id' });
const statuses = {
@@ -89,6 +89,7 @@ function QueryResultService($resource, $timeout, $q, QueryResultError) {
// extended status flags
this.isLoadingResult = false;
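+  // Tasks currently queued for this job's data source, as reported by the queue status endpoint; null while the job is not waiting.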
+ this.queueStatus = null;
if (props) {
this.update(props);
@@ -519,6 +520,18 @@ function QueryResultService($resource, $timeout, $q, QueryResultError) {
);
}
+  refreshQueueStatus(dataSourceId) {
+    // While the job is still waiting in the queue, poll its queue status every 10 seconds.
+    if (this.getStatus() === 'waiting') {
+      $http.get(`/api/queue_status/${this.job.id}?data_source=${dataSourceId}`).then((response) => {
+        this.queueStatus = response.data.num_tasks;
+        $timeout(() => this.refreshQueueStatus(dataSourceId), 10000);
+      });
+    } else {
+      this.queueStatus = null;
+    }
+  }
+
getLink(queryId, fileType, apiKey) {
let link = `api/queries/${queryId}/results/${this.getId()}.${fileType}`;
if (apiKey) {
@@ -576,6 +589,7 @@ function QueryResultService($resource, $timeout, $q, QueryResultError) {
if ('job' in response) {
queryResult.refreshStatus(query);
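+          // Also start polling the queue status so the UI can show how many tasks are queued for this data source.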
+ queryResult.refreshQueueStatus(dataSourceId);
}
},
(error) => {
diff --git a/redash/handlers/api.py b/redash/handlers/api.py
index 94da6aa859..dc4478c89e 100644
--- a/redash/handlers/api.py
+++ b/redash/handlers/api.py
@@ -39,7 +39,8 @@
from redash.handlers.query_results import (JobResource,
QueryResultDropdownResource,
QueryResultListResource,
- QueryResultResource)
+ QueryResultResource,
+ QueueStatusResource)
from redash.handlers.query_snippets import (QuerySnippetListResource,
QuerySnippetResource)
from redash.handlers.settings import OrganizationSettings
@@ -128,7 +129,7 @@ def json_representation(data, code, headers=None):
'/api/queries/<query_id>/results/<query_result_id>.<filetype>',
endpoint='query_result')
api.add_org_resource(JobResource, '/api/jobs/<job_id>', endpoint='job')
-
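+# Queue status for a single job: how many tasks are reserved or waiting in its data source's queue.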
+api.add_org_resource(QueueStatusResource, '/api/queue_status/<job_id>', endpoint='job_queue_status')
api.add_org_resource(UserListResource, '/api/users', endpoint='users')
api.add_org_resource(UserResource, '/api/users/<user_id>', endpoint='user')
api.add_org_resource(UserInviteResource, '/api/users/<user_id>/invite', endpoint='user_invite')
diff --git a/redash/handlers/query_results.py b/redash/handlers/query_results.py
index 548b41173a..3062648de6 100644
--- a/redash/handlers/query_results.py
+++ b/redash/handlers/query_results.py
@@ -12,6 +12,8 @@
from redash.tasks.queries import enqueue_query
from redash.utils import (collect_parameters_from_request, gen_query_hash, json_dumps, utcnow, to_filename)
from redash.utils.parameterized_query import ParameterizedQuery, InvalidParameterError, dropdown_values
+from redash.monitor import parse_tasks, get_waiting_in_queue
+from redash.worker import celery
def error_response(message):
@@ -332,3 +334,15 @@ def delete(self, job_id):
"""
job = QueryTask(job_id=job_id)
job.cancel()
+
+
+class QueueStatusResource(BaseResource):
+    def get(self, job_id):
+        # Report how many tasks sit in this job's queue: tasks already reserved by
+        # workers for the data source's queue plus tasks still waiting in that queue.
+        data_source_id = request.args.get('data_source')
+        data_source = models.DataSource.get_by_id_and_org(data_source_id, self.current_org)
+        reserved = [t for t in parse_tasks(celery.control.inspect().reserved(), 'reserved')
+                    if t['queue'] == data_source.queue_name]
+        tasks = reserved + get_waiting_in_queue(data_source.queue_name)
+        return {'queue_name': data_source.queue_name, 'num_tasks': len(tasks)}
diff --git a/redash/tasks/queries.py b/redash/tasks/queries.py
index a83ab3242e..fdd98ddc79 100644
--- a/redash/tasks/queries.py
+++ b/redash/tasks/queries.py
@@ -12,6 +12,7 @@
from redash.query_runner import InterruptException
from redash.tasks.alerts import check_alerts_for_query
from redash.utils import gen_query_hash, json_dumps, json_loads, utcnow, mustache_render
+from redash.monitor import get_waiting_in_queue
from redash.worker import celery
logger = get_task_logger(__name__)
diff --git a/tests/handlers/test_query_results.py b/tests/handlers/test_query_results.py
index 455cb81704..93945d732f 100644
--- a/tests/handlers/test_query_results.py
+++ b/tests/handlers/test_query_results.py
@@ -1,8 +1,8 @@
+import mock
from tests import BaseTestCase
from redash.models import db
-from redash.utils import json_dumps
-
+from redash.utils import gen_query_hash, json_dumps
class TestQueryResultsCacheHeaders(BaseTestCase):
def test_uses_cache_headers_for_specific_result(self):
@@ -38,7 +38,6 @@ def test_get_existing_result(self):
self.assertEquals(query_result.id, rv.json['query_result']['id'])
def test_execute_new_query(self):
- query_result = self.factory.create_query_result()
query = self.factory.create_query()
rv = self.make_request('post', '/api/query_results',
@@ -50,6 +49,46 @@ def test_execute_new_query(self):
self.assertNotIn('query_result', rv.json)
self.assertIn('job', rv.json)
+    def test_queue_length(self):
+        query = self.factory.create_query()
+        tasks = []
+
+        # Stand-in for get_waiting_in_queue: report whatever the fake
+        # enqueue_query below has recorded as waiting so far.
+        def fake_all(*a, **kw):
+            return tasks
+
+        # Wrap the real enqueue_query so that every enqueued job is also
+        # recorded as a task waiting in the 'queries' queue.
+        def enqueue_query(query, *a, **kw):
+            from redash.tasks.queries import enqueue_query
+            job = enqueue_query(query, *a, **kw)
+            tasks.append(dict(
+                state='waiting_in_queue',
+                task_name='test task',
+                worker=None,
+                worker_pid=None,
+                start_time=None,
+                task_id=job.id,
+                queue='queries',
+            ))
+            return job
+
+        patch_all = mock.patch('redash.tasks.queries.get_waiting_in_queue', fake_all)
+        patch_enqueue_query = mock.patch('redash.handlers.query_results.enqueue_query',
+                                         enqueue_query)
+        with patch_all, patch_enqueue_query:
+            rv0 = self.make_request('post', '/api/query_results',
+                                    data={'data_source_id': self.factory.data_source.id,
+                                          'query': query.query_text,
+                                          'max_age': 0})
+            rv1 = self.make_request('post', '/api/query_results',
+                                    data={'data_source_id': self.factory.data_source.id,
+                                          'query': query.query_text,
+                                          'max_age': 0})
+            rv2 = self.make_request('post', '/api/query_results',
+                                    data={'data_source_id': self.factory.data_source.id,
+                                          'query': query.query_text,
+                                          'max_age': 0})
+
+        # Each later submission should see one more task ahead of it in the queue.
+        self.assertEquals(rv0.json['job']['queue_length'], 0)
+        self.assertEquals(rv1.json['job']['queue_length'], 1)
+        self.assertEquals(rv2.json['job']['queue_length'], 2)
+
+
def test_execute_query_without_access(self):
group = self.factory.create_group()
db.session.commit()