Celery instrument didn't propagate baggage #784

Closed
homholueng opened this issue Oct 31, 2021 · 1 comment · Fixed by #2385
Labels
bug Something isn't working

Comments

@homholueng

Describe your environment

  • Platform: Darwin-20.3.0-x86_64-i386-64bit
  • Python Version: Python 3.6.7
  • Installed Dependencies:
aiocontextvars==0.2.2
amqp==2.6.1
appnope==0.1.2
backcall==0.2.0
billiard==3.6.4.0
black==21.9b0
celery==4.4.7
click==8.0.3
contextvars==2.4
dataclasses==0.8
decorator==5.1.0
Deprecated==1.2.13
googleapis-common-protos==1.53.0
grpcio==1.41.1
immutables==0.16
importlib-metadata==4.8.1
ipython==7.0.0
ipython-genutils==0.2.0
jedi==0.18.0
kombu==4.6.11
mypy-extensions==0.4.3
opentelemetry-api==1.6.2
opentelemetry-exporter-jaeger==1.6.2
opentelemetry-exporter-jaeger-proto-grpc==1.6.2
opentelemetry-exporter-jaeger-thrift==1.6.2
opentelemetry-instrumentation==0.25b2
opentelemetry-instrumentation-celery==0.25b2
opentelemetry-sdk==1.6.2
opentelemetry-semantic-conventions==0.25b2
parso==0.8.2
pathspec==0.9.0
pexpect==4.8.0
pickleshare==0.7.5
platformdirs==2.4.0
prompt-toolkit==2.0.10
protobuf==3.19.1
ptyprocess==0.7.0
Pygments==2.10.0
pytz==2021.3
regex==2021.10.23
simplegeneric==0.8.1
six==1.16.0
thrift==0.15.0
tomli==1.2.2
traitlets==4.3.3
typed-ast==1.4.3
typing-extensions==3.10.0.2
vine==1.3.0
wcwidth==0.2.5
wrapt==1.13.2
zipp==3.6.0

Steps to reproduce

Code Sample:

from opentelemetry import trace, baggage
from opentelemetry.context import attach, detach
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor

from celery import Celery
from celery.signals import worker_process_init


@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    trace.set_tracer_provider(TracerProvider())
    span_processor = BatchSpanProcessor(ConsoleSpanExporter())
    trace.get_tracer_provider().add_span_processor(span_processor)
    CeleryInstrumentor().instrument()


app = Celery("my_tasks", broker="amqp://localhost")


@app.task
def add(x, y):
    print("### project_id: %s" % baggage.get_baggage("project_id"))
    return x + y


if __name__ == "__main__":
    trace.set_tracer_provider(TracerProvider())
    span_processor = BatchSpanProcessor(ConsoleSpanExporter())
    trace.get_tracer_provider().add_span_processor(span_processor)
    CeleryInstrumentor().instrument()
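    ctx = baggage.set_baggage("project_id", "project-1")
    token = attach(ctx)  # attach() returns a token for the previously current context
    add.delay(42, 50)
    detach(token)  # detach() expects that token, not the Context object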

  1. Start the Celery worker with the command: celery -A my_tasks worker
  2. Send a task with the command: python my_tasks.py

What is the expected behavior?

The worker should output:

[2021-10-31 16:45:38,867: WARNING/ForkPoolWorker-7] ### project_id: project-1

What is the actual behavior?

The actual worker output:

[2021-10-31 16:45:38,867: WARNING/ForkPoolWorker-7] ### project_id: None

Additional context

Clue 1

The baggage header is present in the RabbitMQ message:

image

Clue 2

Digging into the code, I found that the baggage has already been extracted from the task request:

image

Try to modify the code

Based on the two clues above, I modified the CeleryInstrumentor._trace_prerun function in opentelemetry.instrumentation.celery.__init__ as follows:

    def _trace_prerun(self, *args, **kwargs):
        task = utils.retrieve_task(kwargs)
        task_id = utils.retrieve_task_id(kwargs)

        if task is None or task_id is None:
            return

        request = task.request
        tracectx = extract(request, getter=celery_getter) or None
        
        ### add this line
        attach(tracectx)
        ### add this line

        logger.debug("prerun signal start task_id=%s", task_id)

        operation_name = f"{_TASK_RUN}/{task.name}"
        span = self._tracer.start_span(
            operation_name, context=tracectx, kind=trace.SpanKind.CONSUMER
        )

        activation = trace.use_span(span, end_on_exit=True)
        activation.__enter__()  # pylint: disable=E1101
        utils.attach_span(task, task_id, (span, activation))

With this change, the worker output is as expected:

[2021-10-31 16:55:26,745: WARNING/ForkPoolWorker-7] ### project_id: project-1
@homholueng homholueng added the bug Something isn't working label Oct 31, 2021
@homholueng homholueng changed the title Celery instrument didn't attach to context which extract from task.request Celery instrument didn't propagate baggage Oct 31, 2021
@hananto1

hananto1 commented Mar 20, 2024

This issue still persists after #1407 was merged. Does anyone have a solution for baggage not propagating with the Celery instrumentation?
