From f6cbd9817a84c2c6ef79ed7e1d467ee0b86a40be Mon Sep 17 00:00:00 2001 From: Ilan Date: Wed, 22 Jul 2015 13:11:52 +0200 Subject: [PATCH 1/2] fix for timeout --- .gitignore | 4 ++-- django_q/cluster.py | 24 ++++++++++++------------ 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/.gitignore b/.gitignore index 3ba95fdd..66079ea8 100644 --- a/.gitignore +++ b/.gitignore @@ -58,8 +58,8 @@ target/ dev-requirements.txt manage.py -testq db.sqlite3 .venv .idea -djq \ No newline at end of file +djq +node_modules \ No newline at end of file diff --git a/django_q/cluster.py b/django_q/cluster.py index c474c439..ce9e1fb8 100644 --- a/django_q/cluster.py +++ b/django_q/cluster.py @@ -72,7 +72,7 @@ def dummy_close(): self.sentinel.start() logger.info(_('Q Cluster-{} starting.').format(self.pid)) while not self.start_event.is_set(): - sleep(0.2) + sleep(0.1) return self.pid def stop(self): @@ -176,7 +176,7 @@ def spawn_pusher(self): return self.spawn_process(pusher, self.task_queue, self.event_out, self.list_key, self.r) def spawn_worker(self): - self.spawn_process(worker, self.task_queue, self.result_queue, Value('b', -1), self.timeout) + self.spawn_process(worker, self.task_queue, self.result_queue, Value('f', -1), self.timeout) def spawn_monitor(self): return self.spawn_process(monitor, self.result_queue) @@ -226,6 +226,7 @@ def guard(self): logger.info(_('Q Cluster-{} running.').format(self.parent_pid)) scheduler(list_key=self.list_key) counter = 0 + cycle = 0.5 # Guard loop. Runs at least once while not self.stop_event.is_set() or not counter: # Check Workers @@ -236,7 +237,7 @@ def guard(self): continue # Decrement timer if work is being done if p.timer.value > 0: - p.timer.value -= 1 + p.timer.value -= cycle # Check Monitor if not self.monitor.is_alive(): self.reincarnate(self.monitor) @@ -244,13 +245,13 @@ def guard(self): if not self.pusher.is_alive(): self.reincarnate(self.pusher) # Call scheduler once a minute (or so) - counter += 1 - if counter > 120: + counter += cycle + if counter == 30: counter = 0 scheduler(list_key=self.list_key) # Save current status Stat(self).save() - sleep(0.5) + sleep(cycle) self.stop() def stop(self): @@ -261,7 +262,7 @@ def stop(self): self.event_out.set() # Wait for it to stop while self.pusher.is_alive(): - sleep(0.2) + sleep(0.1) Stat(self).save() # Put poison pills in the queue for _ in range(len(self.pool)): @@ -274,7 +275,7 @@ def stop(self): for p in self.pool: if not p.is_alive(): self.pool.remove(p) - sleep(0.2) + sleep(0.1) Stat(self).save() # Finally stop the monitor self.result_queue.put('STOP') @@ -286,8 +287,8 @@ def stop(self): count = 0 if not self.timeout: self.timeout = 30 - while self.status() == Conf.STOPPING and count < self.timeout * 5: - sleep(0.2) + while self.status() == Conf.STOPPING and count < self.timeout * 10: + sleep(0.1) Stat(self).save() count += 1 # Final status @@ -311,8 +312,7 @@ def pusher(task_queue, e, list_key=Conf.Q_LIST, r=redis_client): sleep(10) break if task: - task = task[1] - task_queue.put(task) + task_queue.put(task[1]) logger.debug(_('queueing from {}').format(list_key)) if e.is_set(): break From a26ffbf03b92c05f758bb502e4b4b9e11867d30b Mon Sep 17 00:00:00 2001 From: Ilan Steemers Date: Wed, 22 Jul 2015 14:25:32 +0200 Subject: [PATCH 2/2] tuning timeout --- django_q/cluster.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/django_q/cluster.py b/django_q/cluster.py index ce9e1fb8..9f510cec 100644 --- a/django_q/cluster.py +++ b/django_q/cluster.py @@ -226,18 +226,18 @@ def guard(self): logger.info(_('Q Cluster-{} running.').format(self.parent_pid)) scheduler(list_key=self.list_key) counter = 0 - cycle = 0.5 + cycle = 0.5 # guard loop sleep in seconds # Guard loop. Runs at least once while not self.stop_event.is_set() or not counter: # Check Workers for p in self.pool: # Are you alive? - if not p.is_alive() or (self.timeout and int(p.timer.value) == 0): + if not p.is_alive() or (self.timeout and p.timer.value == 0): self.reincarnate(p) continue # Decrement timer if work is being done - if p.timer.value > 0: - p.timer.value -= cycle + if self.timeout and p.timer.value > 0: + p.timer.value -= cycle # Check Monitor if not self.monitor.is_alive(): self.reincarnate(self.monitor)