Skip to content

Commit

Permalink
Merge pull request #298 from Balletie/master
Browse files Browse the repository at this point in the history
Add option for acknowledging failed tasks (globally and per-task)
  • Loading branch information
Koed00 authored Mar 13, 2018
2 parents 65afb63 + 03abbc9 commit d5e430b
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 8 deletions.
10 changes: 5 additions & 5 deletions django_q/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,12 +324,12 @@ def monitor(result_queue, broker=None):
save_cached(task, broker)
else:
save_task(task, broker)
# acknowledge and log the result
# acknowledge result
ack_id = task.pop('ack_id', False)
if ack_id and (task['success'] or task.get('ack_failure', False)):
broker.acknowledge(ack_id)
# log the result
if task['success']:
# acknowledge
ack_id = task.pop('ack_id', False)
if ack_id:
broker.acknowledge(ack_id)
# log success
logger.info(_("Processed [{}]").format(task['name']))
else:
Expand Down
5 changes: 5 additions & 0 deletions django_q/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ class Conf(object):
# Number of seconds to wait for a worker to finish.
TIMEOUT = conf.get('timeout', None)

# Whether to acknowledge unsuccessful tasks.
# This causes failed tasks to be considered delivered, thereby removing them from
# the task queue. Defaults to False.
ACK_FAILURES = conf.get('ack_failures', False)

# Number of seconds to wait for acknowledgement before retrying a task
# Only works with brokers that guarantee delivery. Defaults to 60 seconds.
RETRY = conf.get('retry', 60)
Expand Down
4 changes: 3 additions & 1 deletion django_q/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
def async(func, *args, **kwargs):
"""Queue a task for the cluster."""
keywords = kwargs.copy()
opt_keys = ('hook', 'group', 'save', 'sync', 'cached', 'iter_count', 'iter_cached', 'chain', 'broker')
opt_keys = ('hook', 'group', 'save', 'sync', 'cached', 'ack_failure', 'iter_count', 'iter_cached', 'chain', 'broker')
q_options = keywords.pop('q_options', {})
# get an id
tag = uuid()
Expand All @@ -42,6 +42,8 @@ def async(func, *args, **kwargs):
task['cached'] = Conf.CACHED
if 'sync' not in task and Conf.SYNC:
task['sync'] = Conf.SYNC
if 'ack_failure' not in task and Conf.ACK_FAILURES:
task['ack_failure'] = Conf.ACK_FAILURES
# finalize
task['kwargs'] = keywords
task['started'] = timezone.now()
Expand Down
53 changes: 52 additions & 1 deletion django_q/tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from django_q.models import Task, Success
from django_q.conf import Conf
from django_q.status import Stat
from django_q.brokers import get_broker
from django_q.brokers import get_broker, Broker
from django_q.tests.tasks import multiply
from django_q.queues import Queue

Expand Down Expand Up @@ -379,6 +379,57 @@ def test_update_failed(broker):
assert saved_task.success is True
assert saved_task.result == 'result'

@pytest.mark.django_db
def test_acknowledge_failure_override():
class VerifyAckMockBroker(Broker):
def __init__(self, *args, **kwargs):
super(VerifyAckMockBroker, self).__init__(*args, **kwargs)
self.acknowledgements = {}

def acknowledge(self, task_id):
count = self.acknowledgements.get(task_id, 0)
self.acknowledgements[task_id] = count + 1

tag = uuid()
task_fail_ack = {'id': tag[1],
'name': tag[0],
'ack_id': 'test_fail_ack_id',
'ack_failure': True,
'func': 'math.copysign',
'args': (1, -1),
'kwargs': {},
'started': timezone.now(),
'stopped': timezone.now(),
'success': False,
'result': None}

tag = uuid()
task_fail_no_ack = task_fail_ack.copy()
task_fail_no_ack.update({'id': tag[1],
'name': tag[0],
'ack_id': 'test_fail_no_ack_id'})
del task_fail_no_ack['ack_failure']

tag = uuid()
task_success_ack = task_fail_ack.copy()
task_success_ack.update({'id': tag[1],
'name': tag[0],
'ack_id': 'test_success_ack_id',
'success': True,})
del task_success_ack['ack_failure']

result_queue = Queue()
result_queue.put(task_fail_ack)
result_queue.put(task_fail_no_ack)
result_queue.put(task_success_ack)
result_queue.put('STOP')
broker = VerifyAckMockBroker(list_key='key')

monitor(result_queue, broker)

assert broker.acknowledgements.get('test_fail_ack_id') == 1
assert broker.acknowledgements.get('test_fail_no_ack_id') is None
assert broker.acknowledgements.get('test_success_ack_id') == 1

@pytest.mark.django_db
def assert_result(task):
Expand Down
6 changes: 5 additions & 1 deletion docs/architecture.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@ Broker
The broker collects task packages from the django instances and queues them for pick up by a cluster.
If the broker supports message receipts, it will keep a copy of the tasks around until a cluster acknowledges the processing of the task.
Otherwise it is put back in the queue after a timeout period. This ensure at-least-once delivery.
Note that even if the task errors when processed by the cluster, this is considered a successful delivery.
Most failed deliveries will be the result of a worker or the cluster crashing before the task was saved.

.. note::
When the :ref:`ack_failures` option is set to ``False`` (the default), a task is
considered a failed delivery when it raises an ``Exception``. Set
this option to ``True`` to acknowledge failed tasks as successful.

Pusher
""""""

Expand Down
7 changes: 7 additions & 0 deletions docs/configure.rst
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ timeout
The number of seconds a worker is allowed to spend on a task before it's terminated. Defaults to ``None``, meaning it will never time out.
Set this to something that makes sense for your project. Can be overridden for individual tasks.

.. _ack_failures:

ack_failures
~~~~~~~~~~~~

When set to ``True``, also acknowledge unsuccessful tasks. This causes failed tasks to be considered as successful deliveries, thereby removing them from the task queue. Can also be set per-task by passing the ``ack_failure`` option to :func:`async`. Defaults to ``False``.

.. _retry:

retry
Expand Down
5 changes: 5 additions & 0 deletions docs/tasks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ timeout
"""""""
Overrides the cluster's timeout setting for this task.

ack_failure
"""""""""""
Overrides the cluster's :ref:`ack_failures` setting for this task.

sync
""""
Simulates a task execution synchronously. Useful for testing.
Expand Down Expand Up @@ -244,6 +248,7 @@ Reference
:param str group: An optional group identifier
:param int timeout: Overrides global cluster :ref:`timeout`.
:param bool save: Overrides global save setting for this task.
:param bool ack_failure: Overrides the global :ref:`ack_failures` setting for this task.
:param bool sync: If set to True, async will simulate a task execution
:param cached: Output the result to the cache backend. Bool or timeout in seconds
:param broker: Optional broker connection from :func:`brokers.get_broker`
Expand Down

0 comments on commit d5e430b

Please sign in to comment.