From 880f0ffce601928a464a46200c14dfd4e7422d11 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 28 Jan 2020 12:31:12 -0500 Subject: [PATCH 1/5] Add a docstring to the apply method. --- celery_batches/__init__.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/celery_batches/__init__.py b/celery_batches/__init__.py index 861e09a..4e4ed1e 100644 --- a/celery_batches/__init__.py +++ b/celery_batches/__init__.py @@ -289,9 +289,17 @@ def task_message_handler(message, body, ack, reject, callbacks, **kw): return task_message_handler def apply(self, args=None, kwargs=None, *_args, **_kwargs): + """ + Execute this task locally as a batch of size 1, by blocking until the task returns. + + Arguments: + args (Tuple): positional arguments passed on to the task. + Returns: + celery.result.EagerResult: pre-evaluated result. + """ request = SimpleRequest( id=_kwargs.get("task_id", uuid()), - name="batch application", + name="batch request", args=args or (), kwargs=kwargs or {}, delivery_info=None, From c057a37a29414d86d341e1d04513b5f49669c032 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 28 Jan 2020 12:33:01 -0500 Subject: [PATCH 2/5] Support Python 2 properly. --- celery_batches/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/celery_batches/__init__.py b/celery_batches/__init__.py index 4e4ed1e..f52c5c7 100644 --- a/celery_batches/__init__.py +++ b/celery_batches/__init__.py @@ -306,7 +306,7 @@ def apply(self, args=None, kwargs=None, *_args, **_kwargs): hostname="localhost", ) - return super().apply(([request],), {}, *_args, **_kwargs) + return super(Batches, self).apply(([request],), {}, *_args, **_kwargs) def flush(self, requests): return self.apply_buffer(requests, ([SimpleRequest.from_request(r) From faa03a60edf6527609721d6aa06d4efc27f2896e Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 28 Jan 2020 12:37:00 -0500 Subject: [PATCH 3/5] Ensure the EagerResult works. --- t/integration/tasks.py | 1 + t/integration/test_batches.py | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/t/integration/tasks.py b/t/integration/tasks.py index 0928d08..4d7884c 100644 --- a/t/integration/tasks.py +++ b/t/integration/tasks.py @@ -38,6 +38,7 @@ def add(requests): result += request.args[0] Results().set(result) + return result @shared_task(base=Batches, flush_every=2, flush_interval=1) diff --git a/t/integration/test_batches.py b/t/integration/test_batches.py index 7b7f5db..c8c9149 100644 --- a/t/integration/test_batches.py +++ b/t/integration/test_batches.py @@ -53,10 +53,12 @@ def test_always_eager(): task_always_eager = app.conf.task_always_eager app.conf["task_always_eager"] = True - add.delay(1) + result = add.delay(1) app.conf["task_always_eager"] = task_always_eager + # An EagerResult that resolve to 1 should be returned. + assert result.get() == 1 assert Results().get() == 1 From ed20f7a3ab1a249fdb86435bc34dff5e18ac4606 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 28 Jan 2020 12:45:08 -0500 Subject: [PATCH 4/5] Update the changelog. --- CHANGELOG.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index ec7504e..4a4a7b5 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -8,6 +8,8 @@ next * Properly set the ``current_task`` when running Batch tasks. * Call the success signal after a successful run of the Batch task. +* Support running tasks eagerly via the ``Task.apply()`` method. This causes + the task to execute with a batch of a single item. 0.2 2018-04-20 ============== From e3f1337c30b88eddb3c594a2e9a9e564cb9cff1c Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 28 Jan 2020 12:45:13 -0500 Subject: [PATCH 5/5] Add an additional test. --- t/integration/test_batches.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/t/integration/test_batches.py b/t/integration/test_batches.py index c8c9149..a223029 100644 --- a/t/integration/test_batches.py +++ b/t/integration/test_batches.py @@ -62,6 +62,15 @@ def test_always_eager(): assert Results().get() == 1 +def test_apply(): + """The batch task runs immediately, in the same thread.""" + result = add.apply(args=(1, )) + + # An EagerResult that resolve to 1 should be returned. + assert result.get() == 1 + assert Results().get() == 1 + + def test_flush_interval(celery_worker): """The batch task runs after the flush interval has elapsed.""" add.delay(1)