Skip to content

Commit

Permalink
Address review feedback
Browse files Browse the repository at this point in the history
Use the VERSION attribute exposed by Billiard to decide whether to
import ExceptionWithTraceback.

Add a test for a failing task and check that the exceptions' type
and message are preserved.
  • Loading branch information
unflxw committed Jul 13, 2023
1 parent 6166d1f commit 63a8376
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,6 @@ def add(x, y):
from billiard.einfo import ExceptionInfo
from celery import signals # pylint: disable=no-name-in-module

try:
from billiard.einfo import ( # pylint: disable=no-name-in-module
ExceptionWithTraceback,
)
except ImportError:
ExceptionWithTraceback = None

from opentelemetry import trace
from opentelemetry.instrumentation.celery import utils
from opentelemetry.instrumentation.celery.package import _instruments
Expand All @@ -84,6 +77,8 @@ def add(x, y):
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace.status import Status, StatusCode

ExceptionWithTraceback = utils.import_exception_with_traceback()

logger = logging.getLogger(__name__)

# Task operations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import logging

from celery import registry # pylint: disable=no-name-in-module
from billiard import VERSION

from opentelemetry.semconv.trace import SpanAttributes

Expand Down Expand Up @@ -46,6 +47,12 @@
"state",
)

def import_exception_with_traceback():
if VERSION >= (4, 0, 1):
from billiard.einfo import ExceptionWithTraceback
return ExceptionWithTraceback

return None

# pylint:disable=too-many-branches
def set_attributes_from_context(span, context):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,13 @@ class Config:
app = Celery(broker="memory:///")
app.config_from_object(Config)

class CustomError(Exception):
pass

@app.task
def task_add(num_a, num_b):
return num_a + num_b

@app.task
def task_raises():
raise CustomError("The task failed!")
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.test.test_base import TestBase
from opentelemetry.trace import SpanKind
from opentelemetry.trace import SpanKind, StatusCode

from .celery_test_tasks import app, task_add
from .celery_test_tasks import app, task_add, task_raises


class TestCeleryInstrumentation(TestBase):
Expand Down Expand Up @@ -66,6 +66,10 @@ def test_task(self):
},
)

self.assertEqual(consumer.status.status_code, StatusCode.UNSET)

self.assertEqual(0, len(span.events))

self.assertEqual(
producer.name, "apply_async/tests.celery_test_tasks.task_add"
)
Expand All @@ -84,6 +88,68 @@ def test_task(self):
self.assertEqual(consumer.parent.span_id, producer.context.span_id)
self.assertEqual(consumer.context.trace_id, producer.context.trace_id)

def test_task_raises(self):
CeleryInstrumentor().instrument()

result = task_raises.delay()

timeout = time.time() + 60 * 1 # 1 minutes from now
while not result.ready():
if time.time() > timeout:
break
time.sleep(0.05)

spans = self.sorted_spans(self.memory_exporter.get_finished_spans())
self.assertEqual(len(spans), 2)

consumer, producer = spans

self.assertEqual(consumer.name, "run/tests.celery_test_tasks.task_raises")
self.assertEqual(consumer.kind, SpanKind.CONSUMER)
self.assertSpanHasAttributes(
consumer,
{
"celery.action": "run",
"celery.state": "FAILURE",
SpanAttributes.MESSAGING_DESTINATION: "celery",
"celery.task_name": "tests.celery_test_tasks.task_raises",
},
)

self.assertEqual(consumer.status.status_code, StatusCode.ERROR)

self.assertEqual(1, len(span.events))
event = span.events[0]

self.assertIn(SpanAttributes.EXCEPTION_STACKTRACE, event.attributes)
self.assertEqual(
event.attributes[SpanAttributes.EXCEPTION_TYPE],
"CustomError"
)

self.assertEqual(
event.attributes[SpanAttributes.EXCEPTION_TYPE],
"The task failed!"
)

self.assertEqual(
producer.name, "apply_async/tests.celery_test_tasks.task_raises"
)
self.assertEqual(producer.kind, SpanKind.PRODUCER)
self.assertSpanHasAttributes(
producer,
{
"celery.action": "apply_async",
"celery.task_name": "tests.celery_test_tasks.task_raises",
SpanAttributes.MESSAGING_DESTINATION_KIND: "queue",
SpanAttributes.MESSAGING_DESTINATION: "celery",
},
)

self.assertNotEqual(consumer.parent, producer.context)
self.assertEqual(consumer.parent.span_id, producer.context.span_id)
self.assertEqual(consumer.context.trace_id, producer.context.trace_id)

def test_uninstrument(self):
CeleryInstrumentor().instrument()
CeleryInstrumentor().uninstrument()
Expand Down

0 comments on commit 63a8376

Please sign in to comment.