
Event hooks #277

Open
sondrelg opened this issue Nov 8, 2021 · 14 comments
sondrelg (Contributor) commented Nov 8, 2021

Hi @samuelcolvin!

Would you be open to having more signals/event hooks added to arq?

My use case

I use this request-id middleware to attach a unique UUID to the logs generated during each request, so a request can be traced through the logs. The middleware works well for code executed by the web server, but when enqueueing jobs with arq there is no way to propagate the UUID to the logs generated by the worker.

This is something you can do with, e.g., Celery. You can use Celery's signals to pass state from your issuing process to the worker (by attaching data to the task object), like this:

from typing import Any, Dict
from uuid import uuid4

from celery import Task
from celery.signals import before_task_publish, task_prerun, task_postrun

from asgi_correlation_id.context_vars import correlation_id

header_key = 'CORRELATION_ID'


# Run on the web server (or any other process) when calling task.delay()
@before_task_publish.connect
def transfer_correlation_id(headers: Dict[str, str], **kwargs: Any) -> None:
    """
    Attach request ID to the task headers.
    """
    cid = correlation_id.get() 
    if cid:
        headers[header_key] = cid


# Run on the worker before the task is executed
@task_prerun.connect
def load_correlation_id(task: Task, **kwargs: Any) -> None:
    """
    Set request ID from the task header, if it exists.
    """
    id_value = task.request.get(header_key)
    if id_value:
        correlation_id.set(id_value)
    else:
        generated_correlation_id = uuid4().hex
        correlation_id.set(generated_correlation_id)


# Run on the worker after the task is executed
@task_postrun.connect
def cleanup(**kwargs: Any) -> None:
    """
    Clear context vars, to avoid re-using values in the next task.
    """
    correlation_id.set(None)

This seems like it would be simple to add to arq as well. Would you accept a PR for something like this?

sondrelg (Contributor, Author) commented Nov 8, 2021

I noticed after typing this out that #274 is relevant here. It adds the Celery equivalents of task_prerun and task_postrun.

To completely solve my use case, I believe I would also need a way to transfer request UUIDs from the web server to the task_prerun-equivalent hook: something that persists through job serialization/deserialization, but can be set outside of the explicit connection.enqueue_job call.

sondrelg changed the title from Signals to Event hooks on Nov 8, 2021
sondrelg (Contributor, Author) commented:
Since #274 was merged, all this would require is:

  1. An "on-enqueue"-style hook to be added
  2. A way to set state in that hook that persists to the worker when the task_prerun equivalent runs (a rough sketch follows below)
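
For illustration, those two pieces might look something like this (hypothetical names and signatures, not an existing arq API):

from uuid import uuid4

from asgi_correlation_id.context import correlation_id


# Hypothetical hook, run in the process calling enqueue_job; anything put
# into job_metadata would be serialized together with the job.
async def on_enqueue(job_metadata: dict) -> None:
    cid = correlation_id.get()
    if cid:
        job_metadata['correlation_id'] = cid


# The on_job_start hook from #274 could then restore it on the worker,
# assuming arq exposed the enqueue-side metadata through ctx.
async def on_job_start(ctx: dict) -> None:
    correlation_id.set(ctx.get('correlation_id') or uuid4().hex)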

JonasKs (Collaborator) commented Mar 5, 2022

We're also very interested in this. Happy to join you on a PR for this, @sondrelg, if the idea is accepted by @samuelcolvin. We should probably wait until #259 is finalized and aioredis 2 support is added, though.

samuelcolvin (Member) commented:

Hmm, thinking about this, I'd rather it was structured like middleware in HTTP frameworks.

I'm intending to work on a total rewrite of arq to be very close to FastAPI, but for long-running tasks; I'll include middleware in that.

I don't think there's much point in working on a PR now.

JonasKs (Collaborator) commented Mar 5, 2022

That sounds very interesting, and is what we want too! Let us know if we can do anything.

sondrelg (Contributor, Author) commented Mar 7, 2022

Just out of curiosity, how would a middleware concept work for something like event hooks, where you need things to happen at distinct points in an event's lifecycle?

Also, this isn't incredibly urgent for me, but do you have a rough timeline in mind for when you'll do the rewrite?

samuelcolvin (Member) commented:
Well, like middleware, you could do stuff before or after the job, or catch errors and do stuff with them.

The timeframe is hard to say; hopefully in the next month or two.
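
For illustration, job middleware in that spirit could look something like this (a purely hypothetical sketch; correlation_middleware and call_next are not an existing arq API):

from typing import Any, Awaitable, Callable
from uuid import uuid4

from asgi_correlation_id.context import correlation_id


# Hypothetical middleware: do stuff before the job runs, after it finishes,
# or around errors, just like HTTP middleware wraps a request.
async def correlation_middleware(ctx: dict, call_next: Callable[[dict], Awaitable[Any]]) -> Any:
    correlation_id.set(ctx.get('correlation_id') or uuid4().hex)
    try:
        return await call_next(ctx)
    finally:
        correlation_id.set(None)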

teror4uks commented:
If I understood you correctly, you could propagate a variable to a job by using the ctx dict, then set a contextvar, and also write a custom log filter or handler to read the correlation ID from the contextvar.
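
For the logging part, a plain logging.Filter is enough, no custom handler needed. A minimal sketch, assuming the correlation_id contextvar from asgi-correlation-id:

import logging

from asgi_correlation_id.context import correlation_id


class CorrelationIdLogFilter(logging.Filter):
    """Copy the current correlation ID onto every log record."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.correlation_id = correlation_id.get() or '-'
        return True


# The formatter can then reference %(correlation_id)s.
handler = logging.StreamHandler()
handler.addFilter(CorrelationIdLogFilter())
handler.setFormatter(logging.Formatter('%(levelname)s [%(correlation_id)s] %(message)s'))
logging.getLogger().addHandler(handler)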

sondrelg (Contributor, Author) commented Jul 3, 2022

Yes, that's right; but a package cannot make this the default behavior for all tasks queued with the enqueue_job method, which is what I'd like to enable.

In other words, I would like there to be a way for the asgi-correlation-id package to automatically pass this information through to the job and load it into the correct contextvar, without the user needing to do anything extra.

waza-ari commented:
I'm in the exact same situation: FastAPI, wanting to use the library mentioned above to get a correlation ID and pass it on to arq jobs. I understand that there is currently no PR being worked on, as the rewrite should solve the issue, but did you find a workaround to make this work in the meantime?

JonasKs (Collaborator) commented Dec 14, 2022

You can always pass the correlation ID into the job manually.

Worker:

from asgi_correlation_id.context import correlation_id

async def job(ctx, cid: str):
    correlation_id.set(cid)
    # business logic ..

FastAPI:

from asgi_correlation_id.context import correlation_id

async def my_view():
    await arq_redis.enqueue_job('job', cid=correlation_id.get())

(Written on my phone, so it may have some errors, but the idea should work. Remember to set up logging in your worker too.)
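
For the worker logging mentioned above, something like this should do; a sketch assuming the asgi-correlation-id package's bundled CorrelationIdFilter:

import logging.config

logging.config.dictConfig({
    'version': 1,
    'disable_existing_loggers': False,
    'filters': {
        # Shipped with asgi-correlation-id; adds %(correlation_id)s to records.
        'correlation_id': {'()': 'asgi_correlation_id.CorrelationIdFilter'},
    },
    'formatters': {
        'worker': {'format': '%(levelname)s [%(correlation_id)s] %(message)s'},
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'filters': ['correlation_id'],
            'formatter': 'worker',
        },
    },
    'root': {'handlers': ['console'], 'level': 'INFO'},
})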

waza-ari commented Dec 14, 2022

Thank you @JonasKs, that was extremely helpful. I was able to use almost exactly what you posted. I'll post my complete solution here in case anyone else is looking for it.

Worker tasks:

from asgi_correlation_id import correlation_id

async def job(ctx, cid: str):
    correlation_id.set(cid)
    # business logic ..

FastAPI:

from asgi_correlation_id import correlation_id

async def my_view():
    await arq_redis.enqueue_job('job', cid=correlation_id.get())

In addition to that, I was also using arq cron jobs, which might call additional jobs as well. I wanted them to have a correlation ID too, which was a bit more complex:

from functools import wraps
from uuid import uuid4

from arq import cron


def partial(f, *args, **kwargs):
    """
    Wrapper for cron functions to inject correlation_id.
    """

    @wraps(f)
    async def partial_function(ctx):
        return await f(*args, **kwargs)

    return partial_function


class WorkerSettings:  # pylint: disable=too-few-public-methods
    """
    Settings for the ARQ worker.
    """

    cron_jobs = [
        # Note: uuid4() is evaluated once, when this class body runs,
        # so every run of this cron job reuses the same cid.
        cron(partial(your_job, {}, cid=uuid4().hex), hour={1}, minute={12}),
        ...
    ]

JonasKs (Collaborator) commented Dec 14, 2022

For cron jobs you could also do this:

from uuid import uuid4

from asgi_correlation_id.context import correlation_id


async def set_correlation_id(ctx: dict) -> None:
    correlation_id.set(uuid4().hex)


class WorkerSettings:
    on_job_start = set_correlation_id

waza-ari commented:
I probably could, but some of the jobs can be called both as cron jobs and manually from FastAPI. That's why I thought using the on_job_start hook would not work, as then the job could no longer be called from FastAPI with its own correlation ID.
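
One way to combine the two (a sketch, untested; it relies on on_job_start running before the job itself in the same context, so a cid passed from FastAPI simply overrides the generated default):

from typing import Optional
from uuid import uuid4

from asgi_correlation_id import correlation_id


async def set_correlation_id(ctx: dict) -> None:
    # Default, used for cron runs and any job enqueued without a cid.
    correlation_id.set(uuid4().hex)


async def job(ctx, cid: Optional[str] = None):
    if cid:  # passed explicitly from FastAPI; overrides the default
        correlation_id.set(cid)
    # business logic ..


class WorkerSettings:
    on_job_start = set_correlation_id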
