From fefd2cee7033622929a5fa8e47c2dc97ecbb217c Mon Sep 17 00:00:00 2001 From: Naor Malca Date: Wed, 26 Oct 2022 09:39:55 +0300 Subject: [PATCH 1/6] handle uninitialized tasks --- .../instrumentation/celery/__init__.py | 13 ++++-- .../instrumentation/celery/utils.py | 2 + .../tests/test_tasks.py | 46 +++++++++++++++++-- .../tests/test_utils.py | 7 +++ 4 files changed, 62 insertions(+), 6 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py index 6d79f45115..b54c7ee06a 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py @@ -183,10 +183,17 @@ def _trace_before_publish(self, *args, **kwargs): task = utils.retrieve_task_from_sender(kwargs) task_id = utils.retrieve_task_id_from_message(kwargs) - if task is None or task_id is None: + if task_id is None: return - operation_name = f"{_TASK_APPLY_ASYNC}/{task.name}" + if task is None: + # task is an anonymous task send using send_task or using canvas workflow + # Signatures() to send to a task not in the current processes dependency + # tree + task_name = kwargs.get("sender", "unknown") + else: + task_name = task.name + operation_name = f"{_TASK_APPLY_ASYNC}/{task_name}" span = self._tracer.start_span( operation_name, kind=trace.SpanKind.PRODUCER ) @@ -195,7 +202,7 @@ def _trace_before_publish(self, *args, **kwargs): if span.is_recording(): span.set_attribute(_TASK_TAG_KEY, _TASK_APPLY_ASYNC) span.set_attribute(SpanAttributes.MESSAGING_MESSAGE_ID, task_id) - span.set_attribute(_TASK_NAME_KEY, task.name) + span.set_attribute(_TASK_NAME_KEY, task_name) utils.set_attributes_from_context(span, kwargs) activation = trace.use_span(span, end_on_exit=True) diff --git a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py index 77abb89af8..6f4f9cbc3a 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py +++ b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py @@ -132,6 +132,8 @@ def attach_span(task, task_id, span, is_publish=False): NOTE: We cannot test for this well yet, because we do not run a celery worker, and cannot run `task.apply_async()` """ + if task is None: + return span_dict = getattr(task, CTX_KEY, None) if span_dict is None: span_dict = {} diff --git a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py index 6b2ee9a94c..0af786e59f 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py +++ b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py @@ -60,9 +60,7 @@ def test_task(self): }, ) - self.assertEqual( - producer.name, "apply_async/tests.celery_test_tasks.task_add" - ) + self.assertEqual(producer.name, "apply_async/tests.celery_test_tasks.task_add") self.assertEqual(producer.kind, SpanKind.PRODUCER) self.assertSpanHasAttributes( producer, @@ -77,3 +75,45 @@ def test_task(self): self.assertNotEqual(consumer.parent, producer.context) self.assertEqual(consumer.parent.span_id, producer.context.span_id) self.assertEqual(consumer.context.trace_id, producer.context.trace_id) + + +class TestCelerySignatureTask(TestBase): + def setUp(self): + super().setUp() + + def start_app(*args, **kwargs): + # Add an additional task that will not be registered with parent thread + @app.task + def hidden_task(x): + return x * 2 + + self._worker = app.Worker(app=app, pool="solo", concurrency=1) + return self._worker.start(*args, **kwargs) + + self._thread = threading.Thread(target=start_app) + self._worker = app.Worker(app=app, pool="solo", concurrency=1) + self._thread.daemon = True + self._thread.start() + + def tearDown(self): + super().tearDown() + self._worker.stop() + self._thread.join() + + def test_hidden_task(self): + # no-op since already instrumented + CeleryInstrumentor().instrument() + import ipdb + + ipdb.set_trace() + + res = app.signature("app.hidden_task", (2,)).apply_async() + while not res.ready(): + time.sleep(0.05) + + spans = self.sorted_spans(self.memory_exporter.get_finished_spans()) + self.assertEqual(len(spans), 1) + + producer = spans + + self.assertEqual(producer.name, "apply_async/app.hidden_task") diff --git a/instrumentation/opentelemetry-instrumentation-celery/tests/test_utils.py b/instrumentation/opentelemetry-instrumentation-celery/tests/test_utils.py index 8b5352fe3a..55aa3eec1e 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/tests/test_utils.py +++ b/instrumentation/opentelemetry-instrumentation-celery/tests/test_utils.py @@ -185,6 +185,13 @@ def fn_task(): utils.detach_span(fn_task, task_id) self.assertEqual(utils.retrieve_span(fn_task, task_id), (None, None)) + def test_optional_task_span_attach(self): + task_id = "7c6731af-9533-40c3-83a9-25b58f0d837f" + span = trace._Span("name", mock.Mock(spec=trace_api.SpanContext)) + + # assert this is is a no-aop + self.assertIsNone(utils.attach_span(None, task_id, span)) + def test_span_delete_empty(self): # ensure detach_span doesn't raise an exception if span is not present @self.app.task From 45ef9d0e6a0b886e87c99747ab694fc5d3eab650 Mon Sep 17 00:00:00 2001 From: Naor Malca Date: Thu, 27 Oct 2022 09:54:36 +0300 Subject: [PATCH 2/6] fix lint and add changelog --- CHANGELOG.md | 2 ++ .../opentelemetry-instrumentation-celery/tests/test_tasks.py | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 05028d04a3..41e00c9a78 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1333](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1333)) - `opentelemetry-instrumentation-asgi` Make ASGIGetter.get() compare all keys in a case insensitive manner. ([#1333](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1333)) +- `opentelemetry-instrumentation-celery` Add support for anonymous tasks. + ([#1407](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1407) ## [1.13.0-0.34b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.13.0-0.34b0) - 2022-09-26 diff --git a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py index 0af786e59f..49c074d254 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py +++ b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py @@ -60,7 +60,9 @@ def test_task(self): }, ) - self.assertEqual(producer.name, "apply_async/tests.celery_test_tasks.task_add") + self.assertEqual( + producer.name, "apply_async/tests.celery_test_tasks.task_add" + ) self.assertEqual(producer.kind, SpanKind.PRODUCER) self.assertSpanHasAttributes( producer, From 29d52dcbebd632833896ca9d308bb135b3065510 Mon Sep 17 00:00:00 2001 From: Naor Malca Date: Sat, 29 Oct 2022 19:07:48 +0300 Subject: [PATCH 3/6] fix unit test --- .../tests/test_tasks.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py index 49c074d254..baf2b7c7a1 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py +++ b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py @@ -86,8 +86,8 @@ def setUp(self): def start_app(*args, **kwargs): # Add an additional task that will not be registered with parent thread @app.task - def hidden_task(x): - return x * 2 + def hidden_task(num_a): + return num_a * 2 self._worker = app.Worker(app=app, pool="solo", concurrency=1) return self._worker.start(*args, **kwargs) @@ -105,9 +105,6 @@ def tearDown(self): def test_hidden_task(self): # no-op since already instrumented CeleryInstrumentor().instrument() - import ipdb - - ipdb.set_trace() res = app.signature("app.hidden_task", (2,)).apply_async() while not res.ready(): @@ -116,6 +113,7 @@ def test_hidden_task(self): spans = self.sorted_spans(self.memory_exporter.get_finished_spans()) self.assertEqual(len(spans), 1) - producer = spans + producer = spans[0] self.assertEqual(producer.name, "apply_async/app.hidden_task") + self.assertEqual(producer.kind, SpanKind.PRODUCER) From fd7254ff99e5974e3d3e88e7293814aad4577564 Mon Sep 17 00:00:00 2001 From: Naor Malca Date: Wed, 2 Nov 2022 17:32:23 +0200 Subject: [PATCH 4/6] fix unit test --- .../tests/test_tasks.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py index baf2b7c7a1..74d8ba234d 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py +++ b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py @@ -106,14 +106,23 @@ def test_hidden_task(self): # no-op since already instrumented CeleryInstrumentor().instrument() - res = app.signature("app.hidden_task", (2,)).apply_async() + res = app.signature("tests.test_tasks.hidden_task", (2,)).apply_async() while not res.ready(): time.sleep(0.05) spans = self.sorted_spans(self.memory_exporter.get_finished_spans()) - self.assertEqual(len(spans), 1) + self.assertEqual(len(spans), 2) - producer = spans[0] + consumer, producer = spans - self.assertEqual(producer.name, "apply_async/app.hidden_task") + self.assertEqual(consumer.name, "run/tests.test_tasks.hidden_task") + self.assertEqual(consumer.kind, SpanKind.CONSUMER) + + self.assertEqual( + producer.name, "apply_async/tests.test_tasks.hidden_task" + ) self.assertEqual(producer.kind, SpanKind.PRODUCER) + + self.assertNotEqual(consumer.parent, producer.context) + self.assertEqual(consumer.parent.span_id, producer.context.span_id) + self.assertEqual(consumer.context.trace_id, producer.context.trace_id) From 0cd6cfe283c92381ec73ba552f5c3e8696ad9a04 Mon Sep 17 00:00:00 2001 From: Naor Malca Date: Sat, 5 Nov 2022 22:33:35 +0200 Subject: [PATCH 5/6] fix unit test --- .../opentelemetry-instrumentation-celery/tests/test_tasks.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py index 74d8ba234d..1026b424e8 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py +++ b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py @@ -35,6 +35,7 @@ def tearDown(self): super().tearDown() self._worker.stop() self._thread.join() + CeleryInstrumentor().uninstrument() def test_task(self): CeleryInstrumentor().instrument() @@ -101,6 +102,7 @@ def tearDown(self): super().tearDown() self._worker.stop() self._thread.join() + CeleryInstrumentor().uninstrument() def test_hidden_task(self): # no-op since already instrumented @@ -109,7 +111,6 @@ def test_hidden_task(self): res = app.signature("tests.test_tasks.hidden_task", (2,)).apply_async() while not res.ready(): time.sleep(0.05) - spans = self.sorted_spans(self.memory_exporter.get_finished_spans()) self.assertEqual(len(spans), 2) From 83c7660a8abbfeb0c91b5e0b4fc870b03a49278e Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Sat, 29 Apr 2023 04:01:00 +0530 Subject: [PATCH 6/6] fix pipeline --- CHANGELOG.md | 5 +++-- .../opentelemetry-instrumentation-celery/tests/test_tasks.py | 3 ++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 35d27c70ee..73fc2b9aa1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1730](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1730)) - Make ASGI request span attributes available for `start_span`. ([#1762](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1762)) +- `opentelemetry-instrumentation-celery` Add support for anonymous tasks. + ([#1407](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1407) + ### Fixed @@ -199,8 +202,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1333](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1333)) - `opentelemetry-instrumentation-asgi` Make ASGIGetter.get() compare all keys in a case insensitive manner. ([#1333](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1333)) -- `opentelemetry-instrumentation-celery` Add support for anonymous tasks. - ([#1407](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1407) - Use resp.text instead of resp.body for Falcon 3 to avoid a deprecation warning. ([#1412](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1412)) diff --git a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py index 5b2624033f..47f79d7e1c 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py +++ b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py @@ -83,7 +83,7 @@ def test_task(self): self.assertNotEqual(consumer.parent, producer.context) self.assertEqual(consumer.parent.span_id, producer.context.span_id) self.assertEqual(consumer.context.trace_id, producer.context.trace_id) - + def test_uninstrument(self): CeleryInstrumentor().instrument() CeleryInstrumentor().uninstrument() @@ -99,6 +99,7 @@ def test_uninstrument(self): spans = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans), 0) + class TestCelerySignatureTask(TestBase): def setUp(self): super().setUp()