Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds stale db connection check before every transaction #77

Merged
merged 3 commits into from
Sep 28, 2015
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions django_q/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ def __init__(self, stop_event, start_event, broker=None, timeout=Conf.TIMEOUT, s
self.task_queue = Queue(maxsize=Conf.QUEUE_LIMIT) if Conf.QUEUE_LIMIT else Queue()
self.result_queue = Queue()
self.event_out = Event()
self.monitor = Process()
self.pusher = Process()
self.monitor = None
self.pusher = None
if start:
self.start()

Expand Down Expand Up @@ -163,7 +163,7 @@ def spawn_monitor(self):
def reincarnate(self, process):
"""
:param process: the process to reincarnate
:type process: Process
:type process: Process or None
"""
if process == self.monitor:
self.monitor = self.spawn_monitor()
Expand Down Expand Up @@ -276,7 +276,7 @@ def stop(self):

def pusher(task_queue, event, broker=None):
"""
Pulls tasks of the Redis List and puts them in the task queue
Pulls tasks of the broker and puts them in the task queue
:type task_queue: multiprocessing.Queue
:type event: multiprocessing.Event
"""
Expand Down Expand Up @@ -318,11 +318,12 @@ def monitor(result_queue, broker=None):
broker = get_broker()
name = current_process().name
logger.info(_("{} monitoring at {}").format(name, current_process().pid))
db.close_old_connections()
for task in iter(result_queue.get, 'STOP'):
# acknowledge
ack_id = task.pop('ack_id', False)
if ack_id:
broker.acknowledge(ack_id)
# save the result
save_task(task)
if task['success']:
logger.info(_("Processed [{}]").format(task['name']))
Expand All @@ -340,7 +341,6 @@ def worker(task_queue, result_queue, timer, timeout=Conf.TIMEOUT):
"""
name = current_process().name
logger.info(_('{} ready for work at {}').format(name, current_process().pid))
db.close_old_connections()
task_count = 0
# Start reading the task queue
for task in iter(task_queue.get, 'STOP'):
Expand All @@ -360,6 +360,7 @@ def worker(task_queue, result_queue, timer, timeout=Conf.TIMEOUT):
result = (e, False)
# We're still going
if not result:
db.close_old_connections()
# execute the payload
timer.value = task['kwargs'].pop('timeout', timeout or 0) # Busy
try:
Expand Down Expand Up @@ -388,6 +389,7 @@ def save_task(task):
if not task.get('save', Conf.SAVE_LIMIT > 0) and task['success']:
return
# SAVE LIMIT > 0: Prune database, SAVE_LIMIT 0: No pruning
db.close_old_connections()
try:
if task['success'] and 0 < Conf.SAVE_LIMIT <= Success.objects.count():
Success.objects.last().delete()
Expand All @@ -412,6 +414,7 @@ def scheduler(broker=None):
"""
if not broker:
broker = get_broker()
db.close_old_connections()
try:
for s in Schedule.objects.exclude(repeats=0).filter(next_run__lt=timezone.now()):
args = ()
Expand Down