From fdf8a6534dc7e81ec2cc8ec97027be09b5cbaca7 Mon Sep 17 00:00:00 2001 From: Noemi <45180344+unflxw@users.noreply.github.com> Date: Thu, 15 Jun 2023 10:46:19 +0200 Subject: [PATCH 1/5] Unwrap `ExceptionInfo` and `ExceptionWithTraceback` Instead of reporting the `ExceptionInfo` and `ExceptionWithTraceback` wrappers raised by Celery, report the exceptions that they wrap. This ensures that the exception in the OpenTelemetry span has a type and traceback that are meaningful and relevant to the developer. --- CHANGELOG.md | 1 + .../instrumentation/celery/__init__.py | 21 +++++++++++++++++++ .../tests/celery/test_celery_functional.py | 2 +- 3 files changed, 23 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f80b18eaa..423c191b48 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1889](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1889)) - Fixed union typing error not compatible with Python 3.7 introduced in `opentelemetry-util-http`, fix tests introduced by patch related to sanitize method for wsgi ([#1913](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1913)) +- `opentelemetry-instrumentation-celery` Unwrap Celery's `ExceptionInfo` errors and report the actual exception that was raised. ([#1863](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1863)) ### Added diff --git a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py index a216765fb0..3c77fdbaa2 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py @@ -64,6 +64,12 @@ def add(x, y): from typing import Collection, Iterable from celery import signals # pylint: disable=no-name-in-module +from billiard.einfo import ExceptionInfo + +try: + from billiard.einfo import ExceptionWithTraceback # pylint: disable=no-name-in-module +except ImportError: + ExceptionWithTraceback = None from opentelemetry import trace from opentelemetry.instrumentation.celery import utils @@ -271,6 +277,21 @@ def _trace_failure(*args, **kwargs): return if ex is not None: + # Unwrap the actual exception wrapped by billiard's + # `ExceptionInfo` and `ExceptionWithTraceback`. + if ( + isinstance(ex, ExceptionInfo) + and ex.exception is not None + ): + ex = ex.exception + + if ( + ExceptionWithTraceback is not None + and isinstance(ex, ExceptionWithTraceback) + and ex.exc is not None + ): + ex = ex.exc + status_kwargs["description"] = str(ex) span.record_exception(ex) span.set_status(Status(**status_kwargs)) diff --git a/tests/opentelemetry-docker-tests/tests/celery/test_celery_functional.py b/tests/opentelemetry-docker-tests/tests/celery/test_celery_functional.py index f284272c5d..ebe121fc27 100644 --- a/tests/opentelemetry-docker-tests/tests/celery/test_celery_functional.py +++ b/tests/opentelemetry-docker-tests/tests/celery/test_celery_functional.py @@ -279,7 +279,7 @@ def fn_exception(): assert len(span.events) == 1 event = span.events[0] assert event.name == "exception" - assert event.attributes[SpanAttributes.EXCEPTION_TYPE] == "ExceptionInfo" + assert event.attributes[SpanAttributes.EXCEPTION_TYPE] == "Exception" assert SpanAttributes.EXCEPTION_MESSAGE in event.attributes assert ( span.attributes.get(SpanAttributes.MESSAGING_MESSAGE_ID) From 08bd6e03bff6ee05628e48e3cfb23ffa79e625fd Mon Sep 17 00:00:00 2001 From: Noemi <45180344+unflxw@users.noreply.github.com> Date: Thu, 15 Jun 2023 10:48:39 +0200 Subject: [PATCH 2/5] Fix typo The exception is expected, not excepted. Well, I guess it is also excepted, because it's an exception, but you get what I mean. --- .../tests/celery/test_celery_functional.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/opentelemetry-docker-tests/tests/celery/test_celery_functional.py b/tests/opentelemetry-docker-tests/tests/celery/test_celery_functional.py index ebe121fc27..0b9cb3dac5 100644 --- a/tests/opentelemetry-docker-tests/tests/celery/test_celery_functional.py +++ b/tests/opentelemetry-docker-tests/tests/celery/test_celery_functional.py @@ -420,7 +420,7 @@ def run(self): assert "Task class is failing" in span.status.description -def test_class_task_exception_excepted(celery_app, memory_exporter): +def test_class_task_exception_expected(celery_app, memory_exporter): class BaseTask(celery_app.Task): throws = (MyException,) From 0293c4e98857b2a6b6d5598ff3e1f650733dfc01 Mon Sep 17 00:00:00 2001 From: Noemi <45180344+unflxw@users.noreply.github.com> Date: Mon, 19 Jun 2023 13:15:21 +0200 Subject: [PATCH 3/5] Reformat file with `black` Reformat the `__init__.py` file in the Celery instrumentation using `black`, fixing a CI linter error. --- .../opentelemetry/instrumentation/celery/__init__.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py index 3c77fdbaa2..5537c55d09 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py @@ -63,11 +63,13 @@ def add(x, y): from timeit import default_timer from typing import Collection, Iterable -from celery import signals # pylint: disable=no-name-in-module from billiard.einfo import ExceptionInfo +from celery import signals # pylint: disable=no-name-in-module try: - from billiard.einfo import ExceptionWithTraceback # pylint: disable=no-name-in-module + from billiard.einfo import ( # pylint: disable=no-name-in-module + ExceptionWithTraceback, + ) except ImportError: ExceptionWithTraceback = None @@ -279,10 +281,7 @@ def _trace_failure(*args, **kwargs): if ex is not None: # Unwrap the actual exception wrapped by billiard's # `ExceptionInfo` and `ExceptionWithTraceback`. - if ( - isinstance(ex, ExceptionInfo) - and ex.exception is not None - ): + if isinstance(ex, ExceptionInfo) and ex.exception is not None: ex = ex.exception if ( From d09933a7ca19b10edab0b49d254d5552bc8ec2b9 Mon Sep 17 00:00:00 2001 From: Noemi <45180344+unflxw@users.noreply.github.com> Date: Thu, 13 Jul 2023 13:21:46 +0200 Subject: [PATCH 4/5] Address review feedback Use the VERSION attribute exposed by Billiard to decide whether to import ExceptionWithTraceback. Add a test for a failing task and check that the exceptions' type and message are preserved. --- .../instrumentation/celery/__init__.py | 9 +-- .../instrumentation/celery/utils.py | 10 +++ .../tests/celery_test_tasks.py | 9 +++ .../tests/test_tasks.py | 72 ++++++++++++++++++- 4 files changed, 91 insertions(+), 9 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py index 5537c55d09..d4e53ff875 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py @@ -66,13 +66,6 @@ def add(x, y): from billiard.einfo import ExceptionInfo from celery import signals # pylint: disable=no-name-in-module -try: - from billiard.einfo import ( # pylint: disable=no-name-in-module - ExceptionWithTraceback, - ) -except ImportError: - ExceptionWithTraceback = None - from opentelemetry import trace from opentelemetry.instrumentation.celery import utils from opentelemetry.instrumentation.celery.package import _instruments @@ -84,6 +77,8 @@ def add(x, y): from opentelemetry.semconv.trace import SpanAttributes from opentelemetry.trace.status import Status, StatusCode +ExceptionWithTraceback = utils.import_exception_with_traceback() + logger = logging.getLogger(__name__) # Task operations diff --git a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py index 6f4f9cbc3a..acf949d551 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py +++ b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py @@ -15,6 +15,7 @@ import logging from celery import registry # pylint: disable=no-name-in-module +from billiard import VERSION from opentelemetry.semconv.trace import SpanAttributes @@ -47,6 +48,15 @@ ) +def import_exception_with_traceback(): + if VERSION >= (4, 0, 1): + from billiard.einfo import ExceptionWithTraceback + + return ExceptionWithTraceback + + return None + + # pylint:disable=too-many-branches def set_attributes_from_context(span, context): """Helper to extract meta values from a Celery Context""" diff --git a/instrumentation/opentelemetry-instrumentation-celery/tests/celery_test_tasks.py b/instrumentation/opentelemetry-instrumentation-celery/tests/celery_test_tasks.py index d9660412f0..9ac78f6d8b 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/tests/celery_test_tasks.py +++ b/instrumentation/opentelemetry-instrumentation-celery/tests/celery_test_tasks.py @@ -24,6 +24,15 @@ class Config: app.config_from_object(Config) +class CustomError(Exception): + pass + + @app.task def task_add(num_a, num_b): return num_a + num_b + + +@app.task +def task_raises(): + raise CustomError("The task failed!") diff --git a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py index 47f79d7e1c..ed4dbb5b1d 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py +++ b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py @@ -18,9 +18,9 @@ from opentelemetry.instrumentation.celery import CeleryInstrumentor from opentelemetry.semconv.trace import SpanAttributes from opentelemetry.test.test_base import TestBase -from opentelemetry.trace import SpanKind +from opentelemetry.trace import SpanKind, StatusCode -from .celery_test_tasks import app, task_add +from .celery_test_tasks import app, task_add, task_raises class TestCeleryInstrumentation(TestBase): @@ -66,6 +66,10 @@ def test_task(self): }, ) + self.assertEqual(consumer.status.status_code, StatusCode.UNSET) + + self.assertEqual(0, len(consumer.events)) + self.assertEqual( producer.name, "apply_async/tests.celery_test_tasks.task_add" ) @@ -84,6 +88,70 @@ def test_task(self): self.assertEqual(consumer.parent.span_id, producer.context.span_id) self.assertEqual(consumer.context.trace_id, producer.context.trace_id) + def test_task_raises(self): + CeleryInstrumentor().instrument() + + result = task_raises.delay() + + timeout = time.time() + 60 * 1 # 1 minutes from now + while not result.ready(): + if time.time() > timeout: + break + time.sleep(0.05) + + spans = self.sorted_spans(self.memory_exporter.get_finished_spans()) + self.assertEqual(len(spans), 2) + + consumer, producer = spans + + self.assertEqual( + consumer.name, "run/tests.celery_test_tasks.task_raises" + ) + self.assertEqual(consumer.kind, SpanKind.CONSUMER) + self.assertSpanHasAttributes( + consumer, + { + "celery.action": "run", + "celery.state": "FAILURE", + SpanAttributes.MESSAGING_DESTINATION: "celery", + "celery.task_name": "tests.celery_test_tasks.task_raises", + }, + ) + + self.assertEqual(consumer.status.status_code, StatusCode.ERROR) + + self.assertEqual(1, len(consumer.events)) + event = consumer.events[0] + + self.assertIn(SpanAttributes.EXCEPTION_STACKTRACE, event.attributes) + + self.assertEqual( + event.attributes[SpanAttributes.EXCEPTION_TYPE], "CustomError" + ) + + self.assertEqual( + event.attributes[SpanAttributes.EXCEPTION_MESSAGE], + "The task failed!", + ) + + self.assertEqual( + producer.name, "apply_async/tests.celery_test_tasks.task_raises" + ) + self.assertEqual(producer.kind, SpanKind.PRODUCER) + self.assertSpanHasAttributes( + producer, + { + "celery.action": "apply_async", + "celery.task_name": "tests.celery_test_tasks.task_raises", + SpanAttributes.MESSAGING_DESTINATION_KIND: "queue", + SpanAttributes.MESSAGING_DESTINATION: "celery", + }, + ) + + self.assertNotEqual(consumer.parent, producer.context) + self.assertEqual(consumer.parent.span_id, producer.context.span_id) + self.assertEqual(consumer.context.trace_id, producer.context.trace_id) + def test_uninstrument(self): CeleryInstrumentor().instrument() CeleryInstrumentor().uninstrument() From fb909ed000c74f2f67a22457bf5c94c7c25eb72c Mon Sep 17 00:00:00 2001 From: Noemi <45180344+unflxw@users.noreply.github.com> Date: Fri, 1 Sep 2023 17:31:04 +0200 Subject: [PATCH 5/5] Amend ExceptionWithTraceback import --- .../src/opentelemetry/instrumentation/celery/__init__.py | 7 ++++++- .../src/opentelemetry/instrumentation/celery/utils.py | 9 --------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py index d4e53ff875..bb83a5c192 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py @@ -76,8 +76,13 @@ def add(x, y): from opentelemetry.propagators.textmap import Getter from opentelemetry.semconv.trace import SpanAttributes from opentelemetry.trace.status import Status, StatusCode +from billiard import VERSION -ExceptionWithTraceback = utils.import_exception_with_traceback() + +if VERSION >= (4, 0, 1): + from billiard.einfo import ExceptionWithTraceback +else: + ExceptionWithTraceback = None logger = logging.getLogger(__name__) diff --git a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py index acf949d551..f92c5e03c8 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py +++ b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py @@ -48,15 +48,6 @@ ) -def import_exception_with_traceback(): - if VERSION >= (4, 0, 1): - from billiard.einfo import ExceptionWithTraceback - - return ExceptionWithTraceback - - return None - - # pylint:disable=too-many-branches def set_attributes_from_context(span, context): """Helper to extract meta values from a Celery Context"""