diff --git a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py index 6d79f45115..b54c7ee06a 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py @@ -183,10 +183,17 @@ def _trace_before_publish(self, *args, **kwargs): task = utils.retrieve_task_from_sender(kwargs) task_id = utils.retrieve_task_id_from_message(kwargs) - if task is None or task_id is None: + if task_id is None: return - operation_name = f"{_TASK_APPLY_ASYNC}/{task.name}" + if task is None: + # task is an anonymous task send using send_task or using canvas workflow + # Signatures() to send to a task not in the current processes dependency + # tree + task_name = kwargs.get("sender", "unknown") + else: + task_name = task.name + operation_name = f"{_TASK_APPLY_ASYNC}/{task_name}" span = self._tracer.start_span( operation_name, kind=trace.SpanKind.PRODUCER ) @@ -195,7 +202,7 @@ def _trace_before_publish(self, *args, **kwargs): if span.is_recording(): span.set_attribute(_TASK_TAG_KEY, _TASK_APPLY_ASYNC) span.set_attribute(SpanAttributes.MESSAGING_MESSAGE_ID, task_id) - span.set_attribute(_TASK_NAME_KEY, task.name) + span.set_attribute(_TASK_NAME_KEY, task_name) utils.set_attributes_from_context(span, kwargs) activation = trace.use_span(span, end_on_exit=True) diff --git a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py index 77abb89af8..6f4f9cbc3a 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py +++ b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py @@ -132,6 +132,8 @@ def attach_span(task, task_id, span, is_publish=False): NOTE: We cannot test for this well yet, because we do not run a celery worker, and cannot run `task.apply_async()` """ + if task is None: + return span_dict = getattr(task, CTX_KEY, None) if span_dict is None: span_dict = {} diff --git a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py index 6b2ee9a94c..0af786e59f 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py +++ b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py @@ -60,9 +60,7 @@ def test_task(self): }, ) - self.assertEqual( - producer.name, "apply_async/tests.celery_test_tasks.task_add" - ) + self.assertEqual(producer.name, "apply_async/tests.celery_test_tasks.task_add") self.assertEqual(producer.kind, SpanKind.PRODUCER) self.assertSpanHasAttributes( producer, @@ -77,3 +75,45 @@ def test_task(self): self.assertNotEqual(consumer.parent, producer.context) self.assertEqual(consumer.parent.span_id, producer.context.span_id) self.assertEqual(consumer.context.trace_id, producer.context.trace_id) + + +class TestCelerySignatureTask(TestBase): + def setUp(self): + super().setUp() + + def start_app(*args, **kwargs): + # Add an additional task that will not be registered with parent thread + @app.task + def hidden_task(x): + return x * 2 + + self._worker = app.Worker(app=app, pool="solo", concurrency=1) + return self._worker.start(*args, **kwargs) + + self._thread = threading.Thread(target=start_app) + self._worker = app.Worker(app=app, pool="solo", concurrency=1) + self._thread.daemon = True + self._thread.start() + + def tearDown(self): + super().tearDown() + self._worker.stop() + self._thread.join() + + def test_hidden_task(self): + # no-op since already instrumented + CeleryInstrumentor().instrument() + import ipdb + + ipdb.set_trace() + + res = app.signature("app.hidden_task", (2,)).apply_async() + while not res.ready(): + time.sleep(0.05) + + spans = self.sorted_spans(self.memory_exporter.get_finished_spans()) + self.assertEqual(len(spans), 1) + + producer = spans + + self.assertEqual(producer.name, "apply_async/app.hidden_task") diff --git a/instrumentation/opentelemetry-instrumentation-celery/tests/test_utils.py b/instrumentation/opentelemetry-instrumentation-celery/tests/test_utils.py index 8b5352fe3a..55aa3eec1e 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/tests/test_utils.py +++ b/instrumentation/opentelemetry-instrumentation-celery/tests/test_utils.py @@ -185,6 +185,13 @@ def fn_task(): utils.detach_span(fn_task, task_id) self.assertEqual(utils.retrieve_span(fn_task, task_id), (None, None)) + def test_optional_task_span_attach(self): + task_id = "7c6731af-9533-40c3-83a9-25b58f0d837f" + span = trace._Span("name", mock.Mock(spec=trace_api.SpanContext)) + + # assert this is is a no-aop + self.assertIsNone(utils.attach_span(None, task_id, span)) + def test_span_delete_empty(self): # ensure detach_span doesn't raise an exception if span is not present @self.app.task