Skip to content

Commit

Permalink
Do not create a span when task is triggered by Celery Beat (#2510)
Browse files Browse the repository at this point in the history
We create a span for submitting a Celery task for execution (when apply_async() is called). In cases where web frameworks are calling apply_async() this is fine, because the web framework created a transaction where the span is attached.

When Celery Beat wakes up and is calling apply_async() this is not good, because there is no transaction and then the span ID of the newly created span will be given to the task as parent_span_id leading to orphaned transactions.

So in case apply_async() is called by Celery Beat, we do not create a span for submitting the task for execution.
  • Loading branch information
antonpirker authored Nov 16, 2023
1 parent 5a6b5d4 commit 0c9803a
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 5 deletions.
34 changes: 30 additions & 4 deletions sentry_sdk/integrations/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from typing import TypeVar
from typing import Union

from sentry_sdk.tracing import Span
from sentry_sdk._types import EventProcessor, Event, Hint, ExcInfo

F = TypeVar("F", bound=Callable[..., Any])
Expand Down Expand Up @@ -133,6 +134,16 @@ def _now_seconds_since_epoch():
return time.time()


class NoOpMgr:
def __enter__(self):
# type: () -> None
return None

def __exit__(self, exc_type, exc_value, traceback):
# type: (Any, Any, Any) -> None
return None


def _wrap_apply_async(f):
# type: (F) -> F
@wraps(f)
Expand All @@ -154,11 +165,26 @@ def apply_async(*args, **kwargs):
if not propagate_traces:
return f(*args, **kwargs)

with hub.start_span(
op=OP.QUEUE_SUBMIT_CELERY, description=args[0].name
) as span:
try:
task_started_from_beat = args[1][0] == "BEAT"
except IndexError:
task_started_from_beat = False

task = args[0]

span_mgr = (
hub.start_span(op=OP.QUEUE_SUBMIT_CELERY, description=task.name)
if not task_started_from_beat
else NoOpMgr()
) # type: Union[Span, NoOpMgr]

with span_mgr as span:
with capture_internal_exceptions():
headers = dict(hub.iter_trace_propagation_headers(span))
headers = (
dict(hub.iter_trace_propagation_headers(span))
if span is not None
else {}
)
if integration.monitor_beat_tasks:
headers.update(
{
Expand Down
40 changes: 39 additions & 1 deletion tests/integrations/celery/test_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
import pytest

from sentry_sdk import Hub, configure_scope, start_transaction, get_current_span
from sentry_sdk.integrations.celery import CeleryIntegration, _get_headers
from sentry_sdk.integrations.celery import (
CeleryIntegration,
_get_headers,
_wrap_apply_async,
)

from sentry_sdk._compat import text_type

Expand Down Expand Up @@ -555,3 +559,37 @@ def dummy_task(self, message):
headers={"sentry-propagate-traces": False},
).get()
assert transaction_trace_id != task_transaction_id


def test_apply_async_manually_span(sentry_init):
sentry_init(
integrations=[CeleryIntegration()],
)

def dummy_function(*args, **kwargs):
headers = kwargs.get("headers")
assert "sentry-trace" in headers
assert "baggage" in headers

wrapped = _wrap_apply_async(dummy_function)
wrapped(mock.MagicMock(), (), headers={})


def test_apply_async_from_beat_no_span(sentry_init):
sentry_init(
integrations=[CeleryIntegration()],
)

def dummy_function(*args, **kwargs):
headers = kwargs.get("headers")
assert "sentry-trace" not in headers
assert "baggage" not in headers

wrapped = _wrap_apply_async(dummy_function)
wrapped(
mock.MagicMock(),
[
"BEAT",
],
headers={},
)

0 comments on commit 0c9803a

Please sign in to comment.