From 4ceee42be97cfca475addc7b5702eb8bc8726726 Mon Sep 17 00:00:00 2001 From: Ilan Steemers Date: Mon, 28 Sep 2015 10:42:08 +0200 Subject: [PATCH 1/3] #76 resets stale db connection on every scheduler ping --- django_q/cluster.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/django_q/cluster.py b/django_q/cluster.py index d5ffdeba..a250cf04 100644 --- a/django_q/cluster.py +++ b/django_q/cluster.py @@ -117,8 +117,8 @@ def __init__(self, stop_event, start_event, broker=None, timeout=Conf.TIMEOUT, s self.task_queue = Queue(maxsize=Conf.QUEUE_LIMIT) if Conf.QUEUE_LIMIT else Queue() self.result_queue = Queue() self.event_out = Event() - self.monitor = Process() - self.pusher = Process() + self.monitor = None + self.pusher = None if start: self.start() @@ -163,7 +163,7 @@ def spawn_monitor(self): def reincarnate(self, process): """ :param process: the process to reincarnate - :type process: Process + :type process: Process or None """ if process == self.monitor: self.monitor = self.spawn_monitor() @@ -276,7 +276,7 @@ def stop(self): def pusher(task_queue, event, broker=None): """ - Pulls tasks of the Redis List and puts them in the task queue + Pulls tasks of the broker and puts them in the task queue :type task_queue: multiprocessing.Queue :type event: multiprocessing.Event """ @@ -412,6 +412,8 @@ def scheduler(broker=None): """ if not broker: broker = get_broker() + # reset stale db connections + db.close_old_connections() try: for s in Schedule.objects.exclude(repeats=0).filter(next_run__lt=timezone.now()): args = () From 7075a91d3744c8007e978efb70daa8b18d6107a8 Mon Sep 17 00:00:00 2001 From: Ilan Steemers Date: Mon, 28 Sep 2015 11:42:48 +0200 Subject: [PATCH 2/3] #76 adds connection checked with timeout Adds a check for old connections in both the workers and the monitor, every DB_TIMEOUT seconds --- django_q/cluster.py | 12 ++++++++++++ django_q/conf.py | 3 +++ django_q/tests/test_cluster.py | 2 ++ 3 files changed, 17 insertions(+) 
diff --git a/django_q/cluster.py b/django_q/cluster.py index a250cf04..0f057ecb 100644 --- a/django_q/cluster.py +++ b/django_q/cluster.py @@ -319,10 +319,17 @@ def monitor(result_queue, broker=None): name = current_process().name logger.info(_("{} monitoring at {}").format(name, current_process().pid)) db.close_old_connections() + connection_timer = timezone.now() for task in iter(result_queue.get, 'STOP'): + # check db connection timeout + if (timezone.now() - connection_timer).total_seconds() >= Conf.DB_TIMEOUT: + db.close_old_connections() + connection_timer = timezone.now() + # acknowledge ack_id = task.pop('ack_id', False) if ack_id: broker.acknowledge(ack_id) + # save the result save_task(task) if task['success']: logger.info(_("Processed [{}]").format(task['name'])) @@ -341,6 +348,7 @@ def worker(task_queue, result_queue, timer, timeout=Conf.TIMEOUT): name = current_process().name logger.info(_('{} ready for work at {}').format(name, current_process().pid)) db.close_old_connections() + connection_timer = timezone.now() task_count = 0 # Start reading the task queue for task in iter(task_queue.get, 'STOP'): @@ -360,6 +368,10 @@ def worker(task_queue, result_queue, timer, timeout=Conf.TIMEOUT): result = (e, False) # We're still going if not result: + # check db connection timeout + if (timezone.now() - connection_timer).total_seconds() >= Conf.DB_TIMEOUT: + db.close_old_connections() + connection_timer = timezone.now() # execute the payload timer.value = task['kwargs'].pop('timeout', timeout or 0) # Busy try: diff --git a/django_q/conf.py b/django_q/conf.py index 9ca2146c..f45d8537 100644 --- a/django_q/conf.py +++ b/django_q/conf.py @@ -64,6 +64,9 @@ class Conf(object): # Failures are always saved SAVE_LIMIT = conf.get('save_limit', 250) + # Sets the time in seconds to check for stale database connections + DB_TIMEOUT = conf.get('db_timeout', 60) + # Disable the scheduler SCHEDULER = conf.get('scheduler', True) diff --git a/django_q/tests/test_cluster.py 
b/django_q/tests/test_cluster.py index 37dc79ab..75eec79d 100644 --- a/django_q/tests/test_cluster.py +++ b/django_q/tests/test_cluster.py @@ -113,6 +113,7 @@ def test_cluster(broker): @pytest.mark.django_db def test_async(broker, admin_user): broker.list_key = 'cluster_test:q' + Conf.DB_TIMEOUT = 0 broker.delete_queue() a = async('django_q.tests.tasks.count_letters', DEFAULT_WORDLIST, hook='django_q.tests.test_cluster.assert_result', broker=broker) @@ -237,6 +238,7 @@ def test_async(broker, admin_user): assert fetch(k, 100) is None assert result(k, 100) is None broker.delete_queue() + Conf.DB_TIMEOUT = 60 @pytest.mark.django_db From 765c84301fa8af4d38b3456ae4b67d4b8d4bb8f2 Mon Sep 17 00:00:00 2001 From: Ilan Steemers Date: Mon, 28 Sep 2015 12:48:11 +0200 Subject: [PATCH 3/3] #76 checks for stale connections always It turns out that checking stale connections on a timer takes between 1-2 times as long as just checking them always. This also has the benefit of catching timeouts that happen between timer loops. 
--- django_q/cluster.py | 15 ++------------- django_q/conf.py | 3 --- django_q/tests/test_cluster.py | 2 -- 3 files changed, 2 insertions(+), 18 deletions(-) diff --git a/django_q/cluster.py b/django_q/cluster.py index 0f057ecb..f0d8d686 100644 --- a/django_q/cluster.py +++ b/django_q/cluster.py @@ -318,13 +318,7 @@ def monitor(result_queue, broker=None): broker = get_broker() name = current_process().name logger.info(_("{} monitoring at {}").format(name, current_process().pid)) - db.close_old_connections() - connection_timer = timezone.now() for task in iter(result_queue.get, 'STOP'): - # check db connection timeout - if (timezone.now() - connection_timer).total_seconds() >= Conf.DB_TIMEOUT: - db.close_old_connections() - connection_timer = timezone.now() # acknowledge ack_id = task.pop('ack_id', False) if ack_id: @@ -347,8 +341,6 @@ def worker(task_queue, result_queue, timer, timeout=Conf.TIMEOUT): """ name = current_process().name logger.info(_('{} ready for work at {}').format(name, current_process().pid)) - db.close_old_connections() - connection_timer = timezone.now() task_count = 0 # Start reading the task queue for task in iter(task_queue.get, 'STOP'): @@ -368,10 +360,7 @@ def worker(task_queue, result_queue, timer, timeout=Conf.TIMEOUT): result = (e, False) # We're still going if not result: - # check db connection timeout - if (timezone.now() - connection_timer).total_seconds() >= Conf.DB_TIMEOUT: - db.close_old_connections() - connection_timer = timezone.now() + db.close_old_connections() # execute the payload timer.value = task['kwargs'].pop('timeout', timeout or 0) # Busy try: @@ -400,6 +389,7 @@ def save_task(task): if not task.get('save', Conf.SAVE_LIMIT > 0) and task['success']: return # SAVE LIMIT > 0: Prune database, SAVE_LIMIT 0: No pruning + db.close_old_connections() try: if task['success'] and 0 < Conf.SAVE_LIMIT <= Success.objects.count(): Success.objects.last().delete() @@ -424,7 +414,6 @@ def scheduler(broker=None): """ if not broker: 
broker = get_broker() - # reset stale db connections db.close_old_connections() try: for s in Schedule.objects.exclude(repeats=0).filter(next_run__lt=timezone.now()): diff --git a/django_q/conf.py b/django_q/conf.py index f45d8537..9ca2146c 100644 --- a/django_q/conf.py +++ b/django_q/conf.py @@ -64,9 +64,6 @@ class Conf(object): # Failures are always saved SAVE_LIMIT = conf.get('save_limit', 250) - # Sets the time in seconds to check for stale database connections - DB_TIMEOUT = conf.get('db_timeout', 60) - # Disable the scheduler SCHEDULER = conf.get('scheduler', True) diff --git a/django_q/tests/test_cluster.py b/django_q/tests/test_cluster.py index 75eec79d..37dc79ab 100644 --- a/django_q/tests/test_cluster.py +++ b/django_q/tests/test_cluster.py @@ -113,7 +113,6 @@ def test_cluster(broker): @pytest.mark.django_db def test_async(broker, admin_user): broker.list_key = 'cluster_test:q' - Conf.DB_TIMEOUT = 0 broker.delete_queue() a = async('django_q.tests.tasks.count_letters', DEFAULT_WORDLIST, hook='django_q.tests.test_cluster.assert_result', broker=broker) @@ -238,7 +237,6 @@ def test_async(broker, admin_user): assert fetch(k, 100) is None assert result(k, 100) is None broker.delete_queue() - Conf.DB_TIMEOUT = 60 @pytest.mark.django_db