Celery publish from celery creating separate traces #609

Open
zionsofer opened this issue Jul 28, 2021 · 8 comments
Labels
bug Something isn't working triaged

Comments

@zionsofer

Describe your environment
python==3.7.4/3.7.6
Platform==Local - MacOS(Darwin-20.5.0-x86_64-i386-64bit), Remote - Linux (many distros)
Otel release==0.22b0 (verified on OTEL main as well - same code and same behavior)

Steps to reproduce
When two Celery apps are instrumented with OpenTelemetry and a task in the first app sends a task to the second, the second worker's span is created in a separate trace instead of joining the first worker's trace.

Reproduction example:
otel_celery_first.py

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor

from celery import Celery
from celery.signals import worker_process_init


@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    trace.set_tracer_provider(TracerProvider())
    span_processor = BatchSpanProcessor(ConsoleSpanExporter())
    trace.get_tracer_provider().add_span_processor(span_processor)
    CeleryInstrumentor().instrument()


app = Celery("first", broker="amqp://localhost")


@app.task(name="first.ping")
def ping():
    print("first ping")
    second_app = Celery("second", broker="amqp://localhost")
    second_app.send_task("second.ping", queue="second")

otel_celery_second.py

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor

from celery import Celery
from celery.signals import worker_process_init


@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    trace.set_tracer_provider(TracerProvider())
    span_processor = BatchSpanProcessor(ConsoleSpanExporter())
    trace.get_tracer_provider().add_span_processor(span_processor)
    CeleryInstrumentor().instrument()


app = Celery("second", broker="amqp://localhost")


@app.task(name="second.ping")
def ping():
    print("second ping")

test_celery.py

from celery import Celery

app = Celery("first", broker="amqp://localhost")

app.send_task("first.ping", queue="first")

Running the workers:
celery -A otel_celery_first worker -n first@%h -Q first
celery -A otel_celery_second worker -n second@%h -Q second

Sending the task:
python test_celery.py

What is the expected behavior?
Two spans, with the same trace ID.

First celery worker output:

[2021-07-27 20:35:03,691: WARNING/ForkPoolWorker-8] first ping
{
    "name": "run/first.ping",
    "context": {
        "trace_id": "0x15ac5bd9daf196693bf11a2dc5973763",
        "span_id": "0x83b707a77a03780a",
        "trace_state": "[]"
    },
    "kind": "SpanKind.CONSUMER",
    "parent_id": null,
    "start_time": "2021-07-27T17:35:03.691780Z",
    "end_time": "2021-07-27T17:35:03.724748Z",
    "status": {
        "status_code": "UNSET"
    },
    "attributes": {
        "celery.action": "run",
        "celery.state": "SUCCESS",
        "messaging.conversation_id": "8bc5049e-0ad0-461f-a91b-e2e182ffd30d",
        "messaging.destination": "first",
        "celery.delivery_info": "{'exchange': '', 'routing_key': 'first', 'priority': 0, 'redelivered': False}",
        "messaging.message_id": "8bc5049e-0ad0-461f-a91b-e2e182ffd30d",
        "celery.reply_to": "bf7eae06-9d67-35ba-976c-4e58d3612c7e",
        "celery.hostname": "gen39617@Zion-MacBook-Pro",
        "celery.task_name": "first.ping"
    },
    "events": [],
    "links": [],
    "resource": {
        "telemetry.sdk.language": "python",
        "telemetry.sdk.name": "opentelemetry",
        "telemetry.sdk.version": "1.1.0",
        "service.name": "unknown_service"
    }
}

Second celery worker output:

[2021-07-27 20:35:03,731: WARNING/ForkPoolWorker-8] second ping
{
    "name": "run/second.ping",
    "context": {
        "trace_id": "0x15ac5bd9daf196693bf11a2dc5973763",
        "span_id": "0xa8ff39fb540c86cc",
        "trace_state": "[]"
    },
    "kind": "SpanKind.CONSUMER",
    "parent_id": null,
    "start_time": "2021-07-27T17:35:03.731708Z",
    "end_time": "2021-07-27T17:35:03.733191Z",
    "status": {
        "status_code": "UNSET"
    },
    "attributes": {
        "celery.action": "run",
        "celery.state": "SUCCESS",
        "messaging.conversation_id": "6dead959-c702-4aed-960d-68641bec23e4",
        "messaging.destination": "second",
        "celery.delivery_info": "{'exchange': '', 'routing_key': 'second', 'priority': 0, 'redelivered': False}",
        "messaging.message_id": "6dead959-c702-4aed-960d-68641bec23e4",
        "celery.reply_to": "dbf19329-41f0-32da-9b83-626ea38407f8",
        "celery.hostname": "gen39573@Zion-MacBook-Pro",
        "celery.task_name": "second.ping"
    },
    "events": [],
    "links": [],
    "resource": {
        "telemetry.sdk.language": "python",
        "telemetry.sdk.name": "opentelemetry",
        "telemetry.sdk.version": "1.1.0",
        "service.name": "unknown_service"
    }
}

What is the actual behavior?
Two spans, with a different trace ID for each:

First celery worker output:

[2021-07-27 20:35:03,691: WARNING/ForkPoolWorker-8] first ping
{
    "name": "run/first.ping",
    "context": {
        "trace_id": "0x15ac5bd9daf196693bf11a2dc5973763",
        "span_id": "0x83b707a77a03780a",
        "trace_state": "[]"
    },
    # same attributes
}

Second celery worker output:

[2021-07-27 20:35:03,731: WARNING/ForkPoolWorker-8] second ping
{
    "name": "run/second.ping",
    "context": {
        "trace_id": "0x2488345758db06e139f722b4bba08cb3",
        "span_id": "0xa8ff39fb540c86cc",
        "trace_state": "[]"
    },
    # same attributes
}

I would also expect an apply_async span from the first worker, but it is missing as well.

Additional context
I believe this has to do with the _trace_before_publish signal handler:

def _trace_before_publish(self, *args, **kwargs):
    task = utils.retrieve_task_from_sender(kwargs)
    task_id = utils.retrieve_task_id_from_message(kwargs)

    if task is None or task_id is None:
        return

The handler tries to retrieve the task from the Celery registry by the sender name, which is the name of the task being sent. However, the first worker does not declare the tasks it sends as part of its own registry, so the lookup returns None and the function exits prematurely. Perhaps there's a better way to handle this?
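
To illustrate the failure mode described above, here is a minimal, stdlib-only sketch. It does not use Celery or OpenTelemetry: REGISTRY, before_publish, and consumer_trace_id are hypothetical stand-ins that mimic the early return in _trace_before_publish, under the assumption that the consumer continues a trace only when the message headers carry trace context.

```python
import secrets

# Hypothetical stand-in for the sending worker's task registry:
# it only contains the tasks that this worker declares itself.
REGISTRY = {"first.ping": object()}


def before_publish(sender, headers, current_trace_id):
    """Mimic the publish hook: inject trace context only for known tasks."""
    task = REGISTRY.get(sender)  # lookup by task name, as described above
    if task is None:
        return  # early exit: nothing is injected into the headers
    headers["trace_id"] = current_trace_id


def consumer_trace_id(headers):
    """Mimic the consumer: continue the incoming trace or start a new one."""
    return headers.get("trace_id") or secrets.token_hex(16)


first_trace = secrets.token_hex(16)

# "second.ping" is sent by name only, so the registry lookup fails.
headers = {}
before_publish("second.ping", headers, first_trace)
broken = consumer_trace_id(headers)

# Once the task is present in the registry, the context is propagated.
REGISTRY["second.ping"] = object()
headers = {}
before_publish("second.ping", headers, first_trace)
fixed = consumer_trace_id(headers)

print("without registry entry, same trace:", broken == first_trace)
print("with registry entry, same trace:", fixed == first_trace)
```

In this sketch the second "worker" ends up in a fresh trace whenever the sender's registry does not know the task name, which matches the behavior reported here.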

@zionsofer zionsofer added the bug Something isn't working label Jul 28, 2021
@github-actions

This issue was marked stale due to lack of activity. It will be closed in 30 days.

@owais owais added help wanted Extra attention is needed and removed help wanted Extra attention is needed labels Aug 28, 2021
@yossicohn

Any news on this one? It is problematic for Celery usage.
Any workaround advice?

@srikanthccv
Member

I don't think there is any workaround here other than fixing the bug.

@yossicohn

yossicohn commented Nov 29, 2021

@owais, @lonewolf3739
@zionsofer and I found a workaround for this.

Workaround
We have found that adding the task definition ("second.ping") to the first service, i.e. adding the task to the first worker's task registry, enables the _trace_before_publish function to find the task in the registry.

It would suffice to add, for example:

@app.task(name="second.ping")
def ping():
    pass

Now task = utils.retrieve_task_from_sender(kwargs) can get the task, and from there it mostly works: the trace contains both spans, as expected.

If the workaround is OK, we can update the documentation, as this is a major use case for Celery: usually a worker (first service) publishes a task that is consumed by another service (second worker).

@owais another thing is the SpanKind that is created for the worker processing the task.
In the function _trace_prerun I see that SpanKind=CONSUMER is used, and it seems that SpanKind=SERVER is what should be there.
With the workaround mentioned above, the resulting Service Map does not show a node for the second Celery worker, although it should, since it is a server in the system.
Changing to SpanKind=SERVER fixes that.
WDYT?

@owais
Contributor

owais commented Dec 1, 2021

SpanKind should definitely NOT be SERVER. This looks like a limitation in the APM backend you are using.

I don't think users should have to worry about the workaround. Ideally, instrumenting with Otel should require zero code changes especially to existing non-otel related code. IMO this is a bug in the Celery instrumentation and should be fixed so it works for all users.

@blumamir
Member

IMO this is a bug in the Celery instrumentation and should be fixed so it works for all users.

@owais - I ran into this issue and am wondering what the right fix is.
I see that we currently use the task instance to store a dictionary mapping task_id to span.
Since we store the dict on the task object with setattr, we cannot support tasks that are referenced only by name (str).
I wonder if it makes sense to instead store the open spans in a celery-global dictionary (stored as a property of the instrumentation) keyed by (task_name, task_id, is_publish) in order to resolve this issue.
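
A rough sketch of what such a store could look like, using only the standard library. OpenSpanRegistry and its method names are hypothetical and are not part of the Celery instrumentation; the spans are plain strings here, and the task id is made up for illustration.

```python
from typing import Dict, Optional, Tuple

# Key layout taken from the proposal above: (task_name, task_id, is_publish).
SpanKey = Tuple[str, str, bool]


class OpenSpanRegistry:
    """Hypothetical store for open spans that does not need a task object."""

    def __init__(self) -> None:
        self._spans: Dict[SpanKey, object] = {}

    def attach(self, task_name: str, task_id: str, span: object,
               is_publish: bool = False) -> None:
        self._spans[(task_name, task_id, is_publish)] = span

    def retrieve(self, task_name: str, task_id: str,
                 is_publish: bool = False) -> Optional[object]:
        return self._spans.get((task_name, task_id, is_publish))

    def detach(self, task_name: str, task_id: str,
               is_publish: bool = False) -> Optional[object]:
        # pop() with a default avoids a KeyError when the span is already gone.
        return self._spans.pop((task_name, task_id, is_publish), None)


registry = OpenSpanRegistry()

# A task known only by name ("second.ping") works, because the key is a string.
registry.attach("second.ping", "task-id-1", span="publish-span", is_publish=True)

# The publish and run spans of the same task id do not collide.
registry.attach("second.ping", "task-id-1", span="run-span")

publish_span = registry.detach("second.ping", "task-id-1", is_publish=True)
run_span = registry.retrieve("second.ping", "task-id-1")
missing = registry.retrieve("second.ping", "unknown-id")
```

Because the key is built from strings rather than attached to a task object, a task that is only referenced by name on the sending side could still have its publish span tracked. Entries would need to be detached when the span ends so the dictionary does not grow without bound.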

@srikanthccv
Member

@blumamir You might want to check this proposed WIP fix by a contributor goatsthatcode#1. I think there are multiple Github issues created for this same problem.

@blumamir
Member

@blumamir You might want to check this proposed WIP fix by a contributor goatsthatcode#1. I think there are multiple Github issues created for this same problem.

Thanks! I commented on the PR


5 participants