
Using CeleryInstrumentor and linking Celery task trace back to requested trace #1002

Closed
alucarddelta opened this issue Mar 16, 2022 · 14 comments · Fixed by #1407 · May be fixed by goatsthatcode/opentelemetry-python-contrib#1

Comments

@alucarddelta

Hi,

I have a FastAPI frontend (with FastAPIInstrumentor set up) that sends tasks to a Celery backend using CeleryInstrumentor. However, I am having issues linking the traces together to see an end-to-end request.

When I view the trace information in Jaeger, the trace appears as two separate services, an API service and a Celery service. There is no information linking the two together to show one full service trace.

However, I am able to partially correct this. If I send some context information manually to the Celery task, I can then attach the context to the span inside the worker. This works up to a point: while all child spans are linked back to the API, the main CeleryInstrumentor parent span is not updated and is therefore not linked.

Am I missing something that would link, or at least update, the CeleryInstrumentor parent span back to the API spans?

API code that sends the Celery task:

TRACER = set_traceing(__name__) # sets TracerProvider(Resource)
CELERY = Celery("tasks",
                broker="redis://localhost:6379",
                backend="redis://localhost:6379"
                )
CELERY.conf.task_default_queue = "request-controller-2-default"

def send_service_request_to_celery(obj_in, celery_task: str) -> AsyncResult:
    with TRACER.start_as_current_span("send_service_request_to_celery") as span:
        tracer_context = span.get_span_context()
        tracer_context_obj = schema.ContextBase(trace_id= tracer_context.trace_id, span_id= tracer_context.span_id, is_remote=True)

        try:
            celery_out: AsyncResult = CELERY.send_task(
                name=celery_task,
                task_id=service_request_article.service_task_id,
                args=[service_request_article.dict(),
                tracer_context_obj.dict()],
            )
        
        except KombuError as error:
            message = (
                f"Failed to send service {obj_in.service_operation} request to request controller via redis."
                f" Service id: {obj_in.service_id}"
            )

Celery task:

from celery import Celery
from celery.signals import worker_process_init

from opentelemetry import trace, context
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import NonRecordingSpan, SpanContext, TraceFlags

from time import sleep
from typing import Union

from app.core.tracer import set_traceing

RequestsInstrumentor().instrument()

TRACER = set_traceing(__name__)
CELERY = Celery("tasks",
                broker="redis://localhost:6379",
                backend="redis://localhost:6379")
CELERY.conf.task_default_queue = "request-controller-2-default"

@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
   
    # Configure the tracer to export traces to Jaeger
    jaeger_exporter = JaegerExporter(
        agent_host_name="localhost",
        agent_port=6831,
    )
    span_processor = BatchSpanProcessor(jaeger_exporter)
    trace.get_tracer_provider().add_span_processor(span_processor)
    
    CeleryInstrumentor().instrument()

@CELERY.task(name="processing_task", bind=True, track_started=True)
def broadband_service_request(self, service_obj_in, tracer_context, **kwargs) -> Union[dict, str]:
    ctx = trace.set_span_in_context(NonRecordingSpan(SpanContext(
        trace_id=tracer_context['trace_id'],
        span_id=tracer_context['span_id'],
        is_remote=False,
        trace_flags=TraceFlags(0x01) )))
    context.attach(ctx)
    with TRACER.start_as_current_span("processing_task_span"):
        print(service_obj_in)
        sleep(1)
        with TRACER.start_as_current_span("some_work"):
            sleep(5)
        with TRACER.start_as_current_span("some_more_work"):
            sleep(5)

[Screenshots: Jaeger search result; unlinked CeleryInstrumentor span; linked spans with updated child context]

@srikanthccv
Member

Did you instrument the Celery program that is creating the tasks?

@alucarddelta
Author

@srikanthccv Yes, in my API I do call CeleryInstrumentor; I have this located in the main.py file that launches the FastAPI application.

from fastapi import FastAPI

from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.sdk.trace.export import BatchSpanProcessor

from app.core.config import SETTINGS
from app.core.tracer import set_traceing
from app.api.api_v1.api import api_router
from app.api.api_v1.metadata import metadata

app = FastAPI(
    title=SETTINGS.PROJECT_NAME_SHORT,
    description=SETTINGS.PROJECT_DESCRIPTION,
    version=SETTINGS.PROJECT_VERSION,
    openapi_tags=metadata,
    docs_url="/",
    openapi_url="/api/v1/openapi.json",
)

TRACER = set_traceing(__name__)

jaeger_exporter = JaegerExporter(
    agent_host_name="localhost",
    agent_port=6831,
)

span_processor = BatchSpanProcessor(jaeger_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)

CeleryInstrumentor().instrument()
FastAPIInstrumentor.instrument_app(app, excluded_urls="/api/v1/openapi.json")

app.include_router(api_router, prefix="/api/v1")

@srikanthccv
Member

Ideally, trace context propagation should happen automatically. If you can share a repo with a minimal reproducible application, that would greatly help in debugging the issue.

@goatsthatcode

Hey, I'm running into this issue as well. I tried to reproduce it in a sample app I just made (that closely follows the dependency versions of everything) and I'm unable to reproduce it there, but I am experiencing a similar issue in the original codebase I based this off of.

The issue in my case, though, is that the CeleryInstrumentor().instrument() call is not instrumenting the Celery client (it does instrument the worker), which is why the spans are not included under the same trace ID.

I will update the above example if I am able to reproduce my issue locally here. Any ideas or suggestions would be greatly appreciated :)

@alucarddelta
Author

alucarddelta commented Apr 1, 2022

I have created a project demonstrating the issue.
https://github.com/alucarddelta/issue_opentelm_celery_linking

Test1 - FastAPI -> Celery Worker -> FastAPI
Test2 - FastAPI -> Fast API (checks that FastAPI instrument is working)
Test3 - FastAPI -> Celery Worker -> same Celery Worker (different task) -> FastAPI
Test4 - FastAPI -> Celery Worker -> different Celery Worker -> FastAPI

The app1 FastAPI notes have details of each of the issues observed.

@alucarddelta
Author

Example of the same Task 1, where App1 is not linking to the "run/no_modification" task and /test1_b does have it linked, but with a disconnected span.


@alucarddelta
Author

alucarddelta commented Apr 5, 2022

Inside the worker, I'm able to call set_attribute on the CeleryInstrumentor span, so the span does appear to be created and accessible as expected. But the issue of linking back to the FastAPI spans via CeleryInstrumentor remains.

@CELERY.task(name="test_task", bind=True, track_started=True)
def test_task(self: Task, obj_in, **kwargs):
    current_span = trace.get_current_span()
    current_span.set_attribute("var", obj_in["some_var"])

@goatsthatcode

@alucarddelta I was able to identify and reproduce this issue in my own example code above, and looking over your example code I think we are experiencing the same issue. (You are using send_task, I'm using signature, but I think the net effect is the same based on your example.)

What I see happening is that the worker tasks need to be discoverable by the calling service (e.g. FastAPI), since the instrumentation needs the task object itself based on this line.

One of the _trace_before_publish() checks pulls the string id of the task being sent and then pulls the task object itself from the task registry. It looks like in both examples the tasks are not imported or shared between modules, and are only called through the broker by string identifier.

I believe this is a bug, since Celery does support this use case (calling unknown tasks by string identifier), but maybe there is a way to stub the registration as a workaround for now; a rough sketch of that idea is below. I can update here if I find more.
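
A rough sketch of that kind of stub, assuming the client-side Celery app is the CELERY instance from the API example above and the worker task is named "processing_task" (names taken from this thread; this is an idea to try, not a confirmed fix):

# Hypothetical workaround: register a no-op stub under the same task name on
# the client, so the name resolves in CELERY.tasks when _trace_before_publish
# looks the task up. The worker still executes its own real implementation.
@CELERY.task(name="processing_task")
def processing_task_stub(*args, **kwargs):
    raise NotImplementedError("this stub is never executed on the client")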

@srikanthccv
Member

@goatsthatcode I think I also hit a wall with similar findings. I can't recall the specific details exactly, as it has been some time since I looked into this issue, but I was stuck at this point: celery/celery#7189.

@blumamir
Member

I am able to reproduce the issue with this minimalistic snippet:

from celery import Celery, group, signature

BROKER_URL = 'redis://localhost:6379/0'
app = Celery('tasks', broker=BROKER_URL)

@app.task
def foo():
    return 'hello foo'

group([signature("tasks.bar"), foo.s()])()

When I instrument and run this code, I get only one "publish" span for the "foo" task. The task published as signature("tasks.bar") does not generate any "publish" span and does not inject any propagation context into the headers.

@goatsthatcode

goatsthatcode commented Apr 13, 2022


This is a great example. I'm going to update the above PR to try to adapt this into a unit test, since I wasn't sure it would reproduce the issue with the app being initialized in the same module namespace. I agree with the idea that sending a request to a task that doesn't even exist should still create a publish span, since that is what is happening in the code (even though in this case we would never expect the task to actually complete).

@danielloganking

danielloganking commented Oct 11, 2022

I have recently run into this behavior and think there may be an additional wrinkle. Specifically, I have found that, due to the way CeleryInstrumentor registers the span context propagation on the before_task_publish signal, the _trace_before_publish() callable is only available in contexts where CeleryInstrumentor().instrument() has actually been executed, i.e. not on the client if you follow the documented usage of instrumenting on the worker_process_init signal. Interestingly, calling CeleryInstrumentor().instrument() on the FastAPI client then "resolves" the problem, because the client also has the callable registered and therefore it gets executed.

This can be confirmed by starting both the client and worker in a REPL and executing

>>> import celery
>>> any('_trace_before_publish' in str(r) for r in celery.signals.before_task_publish.receivers)

The client instance (after startup) will return False, while the worker instance returns True. Once I added the instrumentation call both to the app startup path of the FastAPI client and to the worker (via the suggested worker_process_init signal), the trace contexts were properly joined.
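
A minimal sketch of that setup, assuming the FastAPI client and the Celery worker run in separate processes (the init_celery_tracing name just mirrors the worker example earlier in this thread):

# Client side (FastAPI startup path): instrument Celery here as well, so the
# before_task_publish receiver (_trace_before_publish) is registered in the
# process that publishes tasks.
from opentelemetry.instrumentation.celery import CeleryInstrumentor

CeleryInstrumentor().instrument()

# Worker side: instrument each worker process, as documented.
from celery.signals import worker_process_init

@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    CeleryInstrumentor().instrument()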

@inbarpatashnik

inbarpatashnik commented May 16, 2023

It's exactly the solution we were looking for!! @naormalca
It worked for us🤗
Is it known when the new version with this fix is going to be released? We are really looking forward to it!
@srikanthccv

@MrMegaMango

I am on opentelemetry-instrumentation-celery==0.48b0 and Celery creates a separate trace:

@background_app.task
@tracer.start_as_current_span("trace-leftout-along-with-celery")
def foo(context_carrier=None):
    if context_carrier is None:
        context_carrier = {}
    ctx = TraceContextTextMapPropagator().extract(carrier=context_carrier)
    with tracer.start_as_current_span("span-in-my-main-trace", context=ctx):
        bar()
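
For what it's worth, the extract() above can only link the task back to the caller's trace if the producer injects the current context into the carrier when it sends the task. A minimal sketch of that producer side, assuming foo is the task above:

from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

# Hypothetical producer side: inject the current trace context into the
# carrier dict that foo() receives, so extract() in the task can restore it.
carrier = {}
TraceContextTextMapPropagator().inject(carrier)  # writes the "traceparent" entry
foo.apply_async(kwargs={"context_carrier": carrier})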
