Skip to content

Commit

Permalink
Merge pull request #18 from percipient/eager-clean-ups
Browse files Browse the repository at this point in the history
Minor clean-ups to eager code path (PR #16)
  • Loading branch information
clokep authored Jan 28, 2020
2 parents be817fd + e3f1337 commit 4ce26b9
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 3 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ next

* Properly set the ``current_task`` when running Batch tasks.
* Call the success signal after a successful run of the Batch task.
* Support running tasks eagerly via the ``Task.apply()`` method. This causes
the task to execute with a batch of a single item.

0.2 2018-04-20
==============
Expand Down
12 changes: 10 additions & 2 deletions celery_batches/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,16 +289,24 @@ def task_message_handler(message, body, ack, reject, callbacks, **kw):
return task_message_handler

def apply(self, args=None, kwargs=None, *_args, **_kwargs):
"""
Execute this task locally as a batch of size 1, by blocking until the task returns.
Arguments:
args (Tuple): positional arguments passed on to the task.
Returns:
celery.result.EagerResult: pre-evaluated result.
"""
request = SimpleRequest(
id=_kwargs.get("task_id", uuid()),
name="batch application",
name="batch request",
args=args or (),
kwargs=kwargs or {},
delivery_info=None,
hostname="localhost",
)

return super().apply(([request],), {}, *_args, **_kwargs)
return super(Batches, self).apply(([request],), {}, *_args, **_kwargs)

def flush(self, requests):
return self.apply_buffer(requests, ([SimpleRequest.from_request(r)
Expand Down
1 change: 1 addition & 0 deletions t/integration/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def add(requests):
result += request.args[0]

Results().set(result)
return result


@shared_task(base=Batches, flush_every=2, flush_interval=1)
Expand Down
13 changes: 12 additions & 1 deletion t/integration/test_batches.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,21 @@ def test_always_eager():
task_always_eager = app.conf.task_always_eager
app.conf["task_always_eager"] = True

add.delay(1)
result = add.delay(1)

app.conf["task_always_eager"] = task_always_eager

# An EagerResult that resolve to 1 should be returned.
assert result.get() == 1
assert Results().get() == 1


def test_apply():
"""The batch task runs immediately, in the same thread."""
result = add.apply(args=(1, ))

# An EagerResult that resolve to 1 should be returned.
assert result.get() == 1
assert Results().get() == 1


Expand Down

0 comments on commit 4ce26b9

Please sign in to comment.