Skip to content

Commit

Permalink
Merge pull request #1108 from getredash/fix-1097
Browse files Browse the repository at this point in the history
Remove potnetially concurrency not safe code form enqueue_query
  • Loading branch information
arikfr committed Jun 9, 2016
2 parents 095e759 + 7159f0b commit 8c21e91
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 7 deletions.
7 changes: 1 addition & 6 deletions redash/tasks/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def save(self, connection=None):
if l != self._get_list():
connection.zrem(l, key_name)

# TOOD: this is not thread/concurrency safe. In current code this is not an issue, but better to fix this.
def update(self, **kwargs):
self.data.update(kwargs)
self.save()
Expand Down Expand Up @@ -216,12 +217,6 @@ def enqueue_query(query, data_source, scheduled=False, metadata={}):
logging.info("[%s] Found existing job: %s", query_hash, job_id)

job = QueryTask(job_id=job_id)
tracker = QueryTaskTracker.get_by_task_id(job_id, connection=pipe)
# tracker might not exist, if it's an old job
if scheduled and tracker:
tracker.update(retries=tracker.retries+1)
elif tracker:
tracker.update(scheduled_retries=tracker.scheduled_retries+1)

if job.ready():
logging.info("[%s] job found is ready (%s), removing lock", query_hash, job.celery_status)
Expand Down
41 changes: 40 additions & 1 deletion tests/tasks/test_queries.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from tests import BaseTestCase
from redash import redis_connection
from redash.tasks.queries import QueryTaskTracker
from redash.tasks.queries import QueryTaskTracker, enqueue_query, execute_query
from unittest import TestCase
from mock import MagicMock
from collections import namedtuple
import uuid


class TestPrune(TestCase):
Expand Down Expand Up @@ -29,3 +33,38 @@ def test_removes_oldest_items_first(self):

for k in self.keys[0:50]:
self.assertFalse(redis_connection.exists(k))


FakeResult = namedtuple('FakeResult', 'id')


def gen_hash(*args, **kwargs):
return FakeResult(uuid.uuid4().hex)


class TestEnqueueTask(BaseTestCase):
def test_multiple_enqueue_of_same_query(self):
query = self.factory.create_query()
execute_query.apply_async = MagicMock(side_effect=gen_hash)

enqueue_query(query.query, query.data_source, True, {'Username': 'Arik', 'Query ID': query.id})
enqueue_query(query.query, query.data_source, True, {'Username': 'Arik', 'Query ID': query.id})
enqueue_query(query.query, query.data_source, True, {'Username': 'Arik', 'Query ID': query.id})

self.assertEqual(1, execute_query.apply_async.call_count)
self.assertEqual(1, redis_connection.zcard(QueryTaskTracker.WAITING_LIST))
self.assertEqual(0, redis_connection.zcard(QueryTaskTracker.IN_PROGRESS_LIST))
self.assertEqual(0, redis_connection.zcard(QueryTaskTracker.DONE_LIST))

def test_multiple_enqueue_of_different_query(self):
query = self.factory.create_query()
execute_query.apply_async = MagicMock(side_effect=gen_hash)

enqueue_query(query.query, query.data_source, True, {'Username': 'Arik', 'Query ID': query.id})
enqueue_query(query.query + '2', query.data_source, True, {'Username': 'Arik', 'Query ID': query.id})
enqueue_query(query.query + '3', query.data_source, True, {'Username': 'Arik', 'Query ID': query.id})

self.assertEqual(3, execute_query.apply_async.call_count)
self.assertEqual(3, redis_connection.zcard(QueryTaskTracker.WAITING_LIST))
self.assertEqual(0, redis_connection.zcard(QueryTaskTracker.IN_PROGRESS_LIST))
self.assertEqual(0, redis_connection.zcard(QueryTaskTracker.DONE_LIST))

0 comments on commit 8c21e91

Please sign in to comment.