Skip to content

Commit

Permalink
Merge pull request #29 from Koed00/dev
Browse files Browse the repository at this point in the history
timeout fix
  • Loading branch information
Koed00 committed Jul 22, 2015
2 parents 4b4c64b + a26ffbf commit bbdeac5
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 16 deletions.
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ target/

dev-requirements.txt
manage.py
testq
db.sqlite3
.venv
.idea
djq
djq
node_modules
28 changes: 14 additions & 14 deletions django_q/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def dummy_close():
self.sentinel.start()
logger.info(_('Q Cluster-{} starting.').format(self.pid))
while not self.start_event.is_set():
sleep(0.2)
sleep(0.1)
return self.pid

def stop(self):
Expand Down Expand Up @@ -176,7 +176,7 @@ def spawn_pusher(self):
return self.spawn_process(pusher, self.task_queue, self.event_out, self.list_key, self.r)

def spawn_worker(self):
self.spawn_process(worker, self.task_queue, self.result_queue, Value('b', -1), self.timeout)
self.spawn_process(worker, self.task_queue, self.result_queue, Value('f', -1), self.timeout)

def spawn_monitor(self):
return self.spawn_process(monitor, self.result_queue)
Expand Down Expand Up @@ -226,31 +226,32 @@ def guard(self):
logger.info(_('Q Cluster-{} running.').format(self.parent_pid))
scheduler(list_key=self.list_key)
counter = 0
cycle = 0.5 # guard loop sleep in seconds
# Guard loop. Runs at least once
while not self.stop_event.is_set() or not counter:
# Check Workers
for p in self.pool:
# Are you alive?
if not p.is_alive() or (self.timeout and int(p.timer.value) == 0):
if not p.is_alive() or (self.timeout and p.timer.value == 0):
self.reincarnate(p)
continue
# Decrement timer if work is being done
if p.timer.value > 0:
p.timer.value -= 1
if self.timeout and p.timer.value > 0:
p.timer.value -= cycle
# Check Monitor
if not self.monitor.is_alive():
self.reincarnate(self.monitor)
# Check Pusher
if not self.pusher.is_alive():
self.reincarnate(self.pusher)
# Call scheduler once a minute (or so)
counter += 1
if counter > 120:
counter += cycle
if counter == 30:
counter = 0
scheduler(list_key=self.list_key)
# Save current status
Stat(self).save()
sleep(0.5)
sleep(cycle)
self.stop()

def stop(self):
Expand All @@ -261,7 +262,7 @@ def stop(self):
self.event_out.set()
# Wait for it to stop
while self.pusher.is_alive():
sleep(0.2)
sleep(0.1)
Stat(self).save()
# Put poison pills in the queue
for _ in range(len(self.pool)):
Expand All @@ -274,7 +275,7 @@ def stop(self):
for p in self.pool:
if not p.is_alive():
self.pool.remove(p)
sleep(0.2)
sleep(0.1)
Stat(self).save()
# Finally stop the monitor
self.result_queue.put('STOP')
Expand All @@ -286,8 +287,8 @@ def stop(self):
count = 0
if not self.timeout:
self.timeout = 30
while self.status() == Conf.STOPPING and count < self.timeout * 5:
sleep(0.2)
while self.status() == Conf.STOPPING and count < self.timeout * 10:
sleep(0.1)
Stat(self).save()
count += 1
# Final status
Expand All @@ -311,8 +312,7 @@ def pusher(task_queue, e, list_key=Conf.Q_LIST, r=redis_client):
sleep(10)
break
if task:
task = task[1]
task_queue.put(task)
task_queue.put(task[1])
logger.debug(_('queueing from {}').format(list_key))
if e.is_set():
break
Expand Down

0 comments on commit bbdeac5

Please sign in to comment.