Event hooks #277
Comments
I noticed after typing this out that #274 is relevant here. It adds the equivalents of some of Celery's signals. To completely solve my use case, I believe I would also need a way to transfer request UUIDs from the web server to the worker.
Since #274 was merged, all this would require is:
We're also very interested in this. Happy to join you on a PR for this, @sondrelg, if the idea is accepted by @samuelcolvin. We should probably wait until #259 is finalized and aioredis 2 support is added, though.
Humm, thinking about this, I'd rather it was structured like middleware in HTTP frameworks. I'm intending to work on a total rewrite of arq to be very close to FastAPI, but for long-running tasks, and I'll include middleware in that. I don't think there's much point in working on a PR now.
That sounds very interesting, and is what we want too! Let us know if we can do anything.
Just out of curiosity, how will a middleware concept work for something like event hooks, where you need things to happen in distinct places in an event's lifecycle? Also, this isn't incredibly urgent for me, but do you have a rough timeline in mind for when you'll do the rewrite?
Well, like middleware, you could do stuff before or after the job, or catch errors and do stuff with them. The timeframe is hard to say; hopefully in the next month or two.
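As a rough illustration of that middleware idea (this is not an existing or planned arq API, just the pattern from HTTP frameworks applied to a job coroutine):

```python
# Hypothetical sketch only -- arq does not expose this; it just shows the
# "before / after / on-error" places a middleware wrapper gives you.
import logging

logger = logging.getLogger(__name__)


def logging_middleware(job):
    async def wrapped(ctx, *args, **kwargs):
        logger.info('before job')           # e.g. set a correlation ID here
        try:
            result = await job(ctx, *args, **kwargs)
        except Exception:
            logger.exception('job failed')  # the "error hook" place
            raise
        logger.info('after job')            # the "after job" place
        return result

    return wrapped
```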
If I understood you correctly, you could propagate a variable to a job by passing it as a keyword argument when enqueueing.
Yes, that's right; but a package cannot make this the default behavior for all tasks queued with `enqueue_job`. In other words, I would like there to be a way for the package to propagate the ID automatically, without every call site having to pass it explicitly.
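To make the distinction concrete, here is a rough sketch (a hypothetical application-side helper, not part of arq or asgi-correlation-id) of what every application currently has to wire up itself; a package-level hook would make this unnecessary:

```python
# Hypothetical helper: each application has to route its enqueues through
# something like this today, because a third-party package cannot hook in.
from asgi_correlation_id.context import correlation_id


async def enqueue_with_cid(arq_redis, function_name: str, *args, **kwargs):
    # Attach the current request's correlation ID unless the caller set one.
    kwargs.setdefault('cid', correlation_id.get())
    return await arq_redis.enqueue_job(function_name, *args, **kwargs)
```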
I'm in the exact same situation: FastAPI, wanting to use the library mentioned above to get a correlation ID and pass it to arq jobs. I understand that there is currently no PR being worked on, as the rewrite should solve the issue, but did you find a workaround to make this work in the meantime?
You can always pass the correlation ID into the job manually.

Worker:

```python
from asgi_correlation_id.context import correlation_id


async def job(ctx, cid: str):
    correlation_id.set(cid)
    # business logic ...
```

FastAPI:

```python
from asgi_correlation_id.context import correlation_id


async def my_view():
    await arq_redis.enqueue_job('job', cid=correlation_id.get())
```

(Written on my phone, so it may have some errors, but the idea should work. Remember to set up logging in your worker too.)
Thank you @JonasKs, that was extremely helpful. I was basically able to use almost exactly what you posted. I'll post my complete solution here in case anyone else is looking for it.

Worker Tasks:

```python
from asgi_correlation_id import correlation_id


async def job(ctx, cid: str):
    correlation_id.set(cid)
    # business logic ...
```

FastAPI:

```python
from asgi_correlation_id import correlation_id


async def my_view():
    await arq_redis.enqueue_job('job', cid=correlation_id.get())
```

In addition to that, I was also using arq cron jobs, which might call additional jobs as well. I wanted them to have a correlation ID too, which was a bit more complex:

```python
from functools import wraps
from uuid import uuid4

from arq import cron


def partial(f, *args, **kwargs):
    """
    Wrapper for cron functions to inject correlation_id.
    """
    @wraps(f)
    async def partial_function(ctx):
        return await f(*args, **kwargs)

    return partial_function


class WorkerSettings:  # pylint: disable=too-few-public-methods
    """
    Settings for the ARQ worker.
    """
    cron_jobs = [
        # your_job is the application's own task coroutine
        cron(partial(your_job, {}, cid=uuid4().hex), hour={1}, minute={12}),
        ...
    ]
```
For cron jobs you could also do this:

```python
from uuid import uuid4
from asgi_correlation_id import correlation_id


async def set_correlation_id(ctx: 'Ctx') -> None:
    # Runs before every job via the on_job_start hook
    correlation_id.set(uuid4().hex)


class WorkerSettings:
    on_job_start = set_correlation_id
```
I probably could, but some of the jobs can be called both as cron and manually from FastAPI; that's why I thought using the partial wrapper was a better fit for my case.
Hi @samuelcolvin!
Would you be open to having more signals/event hooks added to arq?
My use case
I use this request-id middleware to attach UUIDs to logs, so that each request has a unique ID. The middleware works well for code executed by the web server, but when enqueueing jobs with arq, there is no way to propagate the UUID to the logs generated by the worker.

This is something you can do with, e.g., Celery: you can use Celery's signals to pass state from your issuing process to the worker (by attaching data to the task object), like this:
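The original snippet wasn't preserved here, but the general shape, using Celery's `before_task_publish` and `task_prerun` signals together with the asgi-correlation-id contextvar, would be something like:

```python
# Sketch of the Celery approach; assumes the asgi-correlation-id contextvar
# and Celery's standard signals.
from asgi_correlation_id.context import correlation_id
from celery.signals import before_task_publish, task_prerun


@before_task_publish.connect
def transfer_correlation_id(headers=None, **kwargs):
    # Runs in the web process: copy the request's ID into the task headers.
    headers['CORRELATION_ID'] = correlation_id.get()


@task_prerun.connect
def load_correlation_id(task=None, **kwargs):
    # Runs in the worker, just before the task body: restore the ID.
    correlation_id.set(task.request.get('CORRELATION_ID'))
```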
This seems like it would be simple to add to ARQ as well. Would you accept a PR for something like this?