Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds task chains #96

Merged
merged 8 commits into from
Oct 19, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions django_q/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ def monitor(result_queue, broker=None):
if task.get('cached', False):
save_cached(task, broker)
else:
save_task(task)
save_task(task, broker)
# log the result
if task['success']:
logger.info(_("Processed [{}]").format(task['name']))
Expand Down Expand Up @@ -386,13 +386,16 @@ def worker(task_queue, result_queue, timer, timeout=Conf.TIMEOUT):
logger.info(_('{} stopped doing work').format(name))


def save_task(task):
def save_task(task, broker):
"""
Saves the task package to Django or the cache
"""
# SAVE LIMIT < 0 : Don't save success
if not task.get('save', Conf.SAVE_LIMIT > 0) and task['success']:
return
# async next in a chain
if task.get('chain', None):
tasks.async_chain(task['chain'], group=task['group'], cached=task['cached'], sync=task['sync'], broker=broker)
# SAVE LIMIT > 0: Prune database, SAVE_LIMIT 0: No pruning
db.close_old_connections()
try:
Expand All @@ -419,15 +422,15 @@ def save_cached(task, broker):
if timeout is True:
timeout = None
try:
group = task.get('group', False)
group = task.get('group', None)
iter_count = task.get('iter_count', 0)
# if it's a group append to the group list
if group:
task_key = '{}:{}:{}'.format(broker.list_key, group, task['id'])
group_key = '{}:{}:keys'.format(broker.list_key, group)
group_list = broker.cache.get(group_key) or []
# if it's an inter group, check if we are ready
if iter_count and len(group_list) == iter_count-1:
# if it's an iter group, check if we are ready
if iter_count and len(group_list) == iter_count - 1:
group_args = '{}:{}:args'.format(broker.list_key, group)
# collate the results into a Task result
results = [signing.SignedPackage.loads(broker.cache.get(k))['result'] for k in group_list]
Expand All @@ -441,13 +444,16 @@ def save_cached(task, broker):
task['cached'] = task.pop('iter_cached', None)
save_cached(task, broker=broker)
else:
save_task(task)
save_task(task, broker)
broker.cache.delete_many(group_list)
broker.cache.delete_many([group_key, group_args])
return
# save the group list
group_list.append(task_key)
broker.cache.set(group_key, group_list)
# async next in a chain
if task.get('chain', None):
tasks.async_chain(task['chain'], group=group, cached=task['cached'], sync=task['sync'], broker=broker)
# save the task
broker.cache.set(task_key,
signing.SignedPackage.dumps(task),
Expand Down
2 changes: 1 addition & 1 deletion django_q/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def time_taken(self):
return (self.stopped - self.started).total_seconds()

def __unicode__(self):
return self.name
return u'{}'.format(self.name or self.id)

class Meta:
app_label = 'django_q'
Expand Down
202 changes: 179 additions & 23 deletions django_q/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,38 +18,33 @@ def async(func, *args, **kwargs):
"""Queue a task for the cluster."""
# get options from q_options dict or direct from kwargs
options = kwargs.pop('q_options', kwargs)
hook = options.pop('hook', None)
broker = options.pop('broker', get_broker())
sync = options.pop('sync', False)
group = options.pop('group', None)
save = options.pop('save', None)
cached = options.pop('cached', Conf.CACHED)
iter_count = options.pop('iter_count', None)
iter_cached = options.pop('iter_cached', None)
# get an id
# pop optionals
opts = {'hook': None,
'group': None,
'save': None,
'sync': None,
'cached': Conf.CACHED,
'iter_count': None,
'iter_cached': None,
'chain': None}
for key in opts:
opts[key] = options.pop(key, opts[key])
# get an id
tag = uuid()
# build the task package
task = {'id': tag[1], 'name': tag[0],
'func': func,
'args': args,
'kwargs': kwargs,
'started': timezone.now()}
# add optionals
if hook:
task['hook'] = hook
if group:
task['group'] = group
if save is not None:
task['save'] = save
if cached:
task['cached'] = cached
if iter_count:
task['iter_count'] = iter_count
if iter_cached:
task['iter_cached'] = iter_cached
# push optionals
for key in opts:
if opts[key] is not None:
task[key] = opts[key]
# sign it
pack = signing.SignedPackage.dumps(task)
if sync or Conf.SYNC:
if task.get('sync', False) or Conf.SYNC:
return _sync(pack)
# push it
broker.enqueue(pack)
Expand Down Expand Up @@ -371,7 +366,7 @@ def delete_cached(task_id, broker=None):
def queue_size(broker=None):
"""
Returns the current queue size.
Note that this doesn't count any tasks curren key = 'django_q:{}:results'.format(broker.list_key)tly being processed by workers.
Note that this doesn't count any tasks currently being processed by workers.

:param broker: optional broker
:return: current queue size
Expand All @@ -383,6 +378,9 @@ def queue_size(broker=None):


def async_iter(func, args_iter, **kwargs):
"""
async a function with iterable arguments
"""
iter_count = len(args_iter)
iter_group = uuid()[1]
# clean up the kwargs
Expand All @@ -404,6 +402,164 @@ def async_iter(func, args_iter, **kwargs):
return iter_group


def async_chain(chain, group=None, cached=Conf.CACHED, sync=Conf.SYNC, broker=None):
"""
async a chain of tasks
the chain must be in the format [(func,(args),{kwargs}),(func,(args),{kwargs})]
"""
if not group:
group = uuid()[1]
args = ()
kwargs = {}
task = chain.pop(0)
if type(task) is not tuple:
task = (task,)
if len(task) > 1:
args = task[1]
if len(task) > 2:
kwargs = task[2]
kwargs['chain'] = chain
kwargs['group'] = group
kwargs['cached'] = cached
kwargs['sync'] = sync
kwargs['broker'] = broker or get_broker()
async(task[0], *args, **kwargs)
return group


class Iter(object):
    """
    An async task with iterable arguments
    """

    def __init__(self, func=None, args=None, kwargs=None, cached=Conf.CACHED, sync=Conf.SYNC, broker=None):
        self.func = func
        self.args = args or []
        self.kwargs = kwargs or {}
        self.id = ''
        self.broker = broker or get_broker()
        self.cached = cached
        self.sync = sync
        self.started = False

    def append(self, *args):
        """
        add arguments to the set
        """
        self.args.append(args)
        # a fresh argument tuple makes any previous run stale
        self.started = False
        return self.length()

    def run(self):
        """
        Start queueing the tasks to the worker cluster
        :return: the task id
        """
        # propagate the instance options to the task package
        self.kwargs.update(cached=self.cached, sync=self.sync, broker=self.broker)
        self.id = async_iter(self.func, self.args, **self.kwargs)
        self.started = True
        return self.id

    def result(self, wait=0):
        """
        return the full list of results.
        :param int wait: how many milliseconds to wait for a result
        :return: an unsorted list of results
        """
        if not self.started:
            return None
        return result(self.id, wait=wait, cached=self.cached)

    def fetch(self, wait=0):
        """
        get the task result objects.
        :param int wait: how many milliseconds to wait for a result
        :return: an unsorted list of task objects
        """
        if not self.started:
            return None
        return fetch(self.id, wait=wait, cached=self.cached)

    def length(self):
        """
        get the length of the arguments list
        :return int: length of the argument list
        """
        return len(self.args)


class Chain(object):
    """
    A sequential chain of tasks
    """

    def __init__(self, chain=None, group=None, cached=Conf.CACHED, sync=Conf.SYNC, broker=None):
        """
        :param list chain: optional list of tasks in async_chain format
        :param group: optional group id for the chain's results
        :param cached: cache the results instead of saving to the database
        :param sync: run the chain synchronously
        :param broker: optional broker instance (added for consistency with Iter;
            defaults to the configured broker)
        """
        self.chain = chain or []
        self.group = group or ''
        self.broker = broker or get_broker()
        self.cached = cached
        self.sync = sync
        self.started = False

    def append(self, func, *args, **kwargs):
        """
        add a task to the chain
        takes the same parameters as async()
        :return int: the new length of the chain
        """
        self.chain.append((func, args, kwargs))
        # appending invalidates a previous run, so drop its results
        if self.started:
            delete_group(self.group)
            self.started = False
        return self.length()

    def run(self):
        """
        Start queueing the chain to the worker cluster
        :return: the chain's group id
        """
        # pass a copy: async_chain pops tasks off the list as it queues them
        self.group = async_chain(chain=self.chain[:], group=self.group, cached=self.cached, sync=self.sync,
                                 broker=self.broker)
        self.started = True
        return self.group

    def result(self, wait=0):
        """
        return the full list of results from the chain when it finishes. blocks until timeout.
        :param int wait: how many milliseconds to wait for a result
        :return: an unsorted list of results, or None if the chain was never run
        """
        if self.started:
            return result_group(self.group, wait=wait, count=self.length(), cached=self.cached)

    def fetch(self, failures=True, wait=0):
        """
        get the task result objects from the chain when it finishes. blocks until timeout.
        :param failures: include failed tasks
        :param int wait: how many milliseconds to wait for a result
        :return: an unsorted list of task objects, or None if the chain was never run
        """
        if self.started:
            return fetch_group(self.group, failures=failures, wait=wait, count=self.length(), cached=self.cached)

    def current(self):
        """
        get the index of the currently executing chain element
        :return int: current chain index, or None if the chain was never run
        """
        if not self.started:
            return None
        # count_group reports how many results exist for the chain's group
        return count_group(self.group, cached=self.cached)

    def length(self):
        """
        get the length of the chain
        :return int: length of the chain
        """
        return len(self.chain)


def _sync(pack):
"""Simulate a package travelling through the cluster."""
task_queue = Queue()
Expand Down
4 changes: 4 additions & 0 deletions django_q/tests/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,9 @@ def get_user_id(user):
return user.id


def hello():
    """Return the fixed greeting string used by the chain tests."""
    greeting = 'hello'
    return greeting


def result(obj):
    """Result hook: print the finished task's name and result."""
    message = 'RESULT HOOK {} : {}'.format(obj.name, obj.result)
    print(message)
51 changes: 47 additions & 4 deletions django_q/tests/test_cached.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from multiprocessing import Event, Queue, Value

import pytest
from django_q.cluster import pusher, worker, monitor

from django_q.cluster import pusher, worker, monitor
from django_q.conf import Conf
from django_q.tasks import async, result, fetch, count_group, result_group, fetch_group, delete_group, delete_cached, \
async_iter
async_iter, Chain, async_chain, Iter
from django_q.brokers import get_broker


Expand Down Expand Up @@ -96,9 +96,52 @@ def test_iter(broker):
result_t = result(t)
assert result_t is not None
task_t = fetch(t)
assert task_t. __unicode__ is not None
assert task_t.result == result_t
assert result(t2) is not None
assert result(t3) is not None
assert result(t4)[0] == 1
# test cached iter result
# test iter class
i = Iter('math.copysign', sync=True, cached=True)
i.append(1, -1)
i.append(2, -1)
i.append(3, -4)
i.append(5, 6)
assert i.started is False
assert i.length() == 4
assert i.run() is not None
assert len(i.result()) == 4
assert len(i.fetch().result) == 4
i.append(1, -7)
assert i.result() is None
i.run()
assert len(i.result()) == 5


@pytest.mark.django_db
def test_chain(broker):
    # End-to-end test of the Chain class and async_chain in sync mode.
    # Start from a clean queue and cache so the counts below are deterministic.
    broker.purge_queue()
    broker.cache.clear()
    task_chain = Chain(sync=True)
    task_chain.append('math.floor', 1)
    task_chain.append('math.copysign', 1, -1)
    task_chain.append('math.floor', 2)
    assert task_chain.length() == 3
    # current() is None until the chain has actually been run
    assert task_chain.current() is None
    task_chain.run()
    r = task_chain.result(wait=1000)
    # after a sync run, every link of the chain has completed
    assert task_chain.current() == task_chain.length()
    assert len(r) == task_chain.length()
    t = task_chain.fetch()
    assert len(t) == task_chain.length()
    # re-run the (extended) chain with cached results instead of the database
    task_chain.cached = True
    task_chain.append('math.floor', 3)
    assert task_chain.length() == 4
    task_chain.run()
    r = task_chain.result(wait=1000)
    assert task_chain.current() == task_chain.length()
    assert len(r) == task_chain.length()
    t = task_chain.fetch()
    assert len(t) == task_chain.length()
    # test single
    rid = async_chain(['django_q.tests.tasks.hello', 'django_q.tests.tasks.hello'], sync=True, cached=True)
    assert result_group(rid, cached=True) == ['hello', 'hello']
Loading