arq
Event hooks
Hi @samuelcolvin!
Would you be open to having more signals/event hooks added to arq?
My use case
I use this request-id middleware to attach UUIDs to logs, so that each request has a unique ID. The middleware works well for code executed by the web server, but when enqueueing jobs with arq, there is no way to propagate the UUID to the logs generated by the worker.
This is something you can do with, e.g., Celery. You can use Celery's signals to pass state from your issuing process to the worker (by attaching data to the task object), like this:
from typing import Any, Dict
from uuid import uuid4

from celery import Task
from celery.signals import before_task_publish, task_prerun, task_postrun

from asgi_correlation_id.context_vars import correlation_id

header_key = 'CORRELATION_ID'


# Run on the web server (or any other process) when calling task.delay()
@before_task_publish.connect
def transfer_correlation_id(headers: Dict[str, str], **kwargs: Any) -> None:
    """
    Attach request ID to the task headers.
    """
    cid = correlation_id.get()
    if cid:
        headers[header_key] = cid


# Run on the worker before the task is executed
@task_prerun.connect
def load_correlation_id(task: Task, **kwargs: Any) -> None:
    """
    Set request ID from the task header, if it exists.
    """
    id_value = task.request.get(header_key)
    if id_value:
        correlation_id.set(id_value)
    else:
        generated_correlation_id = uuid4().hex
        correlation_id.set(generated_correlation_id)


# Run on the worker after the task is executed
@task_postrun.connect
def cleanup(**kwargs: Any) -> None:
    """
    Clear context vars, to avoid re-using values in the next task.
    """
    correlation_id.set(None)
This seems like it would be simple to add to arq as well. Would you accept a PR for something like this?
After typing this out, I noticed that https://github.com/samuelcolvin/arq/pull/274 is relevant here. It adds the Celery equivalents of task_prerun and task_postrun.
To completely solve my use case, I believe I would also need a way to transfer request UUIDs from the web server to the task_prerun hook: something that persists through job serialization/deserialization but can be set explicitly, outside of the connection.enqueue_job call.
Since #274 was merged, all this would require is:
- an "on-enqueue"-style hook to be added
- a way to set state in that hook that persists to the worker when task_prerun runs
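To make that concrete, here is a rough sketch of the kind of API I'm imagining. None of this exists in arq today: the on_enqueue hook, the headers dict, and both signatures are entirely hypothetical.

from typing import Any, Dict
from uuid import uuid4

from asgi_correlation_id.context_vars import correlation_id


# Hypothetical: would run in the enqueueing process, mirroring Celery's
# before_task_publish; 'headers' would be serialized with the job and
# handed to the worker-side hooks
async def on_enqueue(headers: Dict[str, Any]) -> None:
    cid = correlation_id.get()
    if cid:
        headers['correlation_id'] = cid


# Hypothetical: would run on the worker before the job executes,
# mirroring Celery's task_prerun
async def on_job_prerun(headers: Dict[str, Any]) -> None:
    correlation_id.set(headers.get('correlation_id') or uuid4().hex)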
We're also very interested in this. Happy to join you on a PR for this, @sondrelg, if the idea is accepted by @samuelcolvin. We should probably wait until #259 is finalized and aioredis 2 support is added, though.
Hmm, thinking about this, I'd rather it was structured like middleware in HTTP frameworks.
I'm intending to work on a total rewrite of arq to be very close to FastAPI, but for long-running tasks, and I'll include middleware in that.
I don't think there's much point in working on a PR now.
That sounds very interesting, and is what we want too! Let us know if we can do anything.
Just out of curiosity, how will a middleware concept work for something like event hooks, where you need things to happen at distinct places in an event's lifecycle?
Also, this isn't incredibly urgent for me, but do you have a rough timeline in mind for when you'll do the rewrite?
Well, like middleware, you could do stuff before or after the job, or catch errors and do stuff with them.
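Something roughly like this, perhaps (purely illustrative; 'call_next' and 'job.headers' are invented names showing the shape, not a planned API):

import logging
from uuid import uuid4

from asgi_correlation_id.context_vars import correlation_id

logger = logging.getLogger(__name__)


# Illustrative sketch only, not a real arq API
async def correlation_id_middleware(job, call_next):
    # "before the job": pick up a propagated ID, or generate a fresh one
    correlation_id.set(job.headers.get('correlation_id') or uuid4().hex)
    try:
        # run the job itself (or the next middleware in the chain)
        return await call_next(job)
    except Exception:
        # "catch errors and do stuff with them"
        logger.exception('job failed')
        raise
    finally:
        # "after the job": clear state so it doesn't leak into the next job
        correlation_id.set(None)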
The timeframe is hard to say; hopefully in the next month or two.
If I understood you correctly, you could propagate a variable to a job by passing it through the ctx dict and then setting a contextvar, and also write a custom LogHandler that reads the correlation ID from the contextvar.
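For the logging side, a plain logging.Filter is enough. A minimal sketch, assuming a contextvar like the one asgi-correlation-id exposes (the filter and variable names here are my own):

import logging
from contextvars import ContextVar
from typing import Optional

# stand-in for the contextvar exposed by asgi-correlation-id
correlation_id: ContextVar[Optional[str]] = ContextVar('correlation_id', default=None)


class CorrelationIdFilter(logging.Filter):
    """Copy the current correlation ID onto every log record."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.correlation_id = correlation_id.get() or '-'
        return True


# Attach the filter to a handler and reference %(correlation_id)s
# in the format string
handler = logging.StreamHandler()
handler.addFilter(CorrelationIdFilter())
handler.setFormatter(logging.Formatter('%(levelname)s [%(correlation_id)s] %(message)s'))
logging.getLogger().addHandler(handler)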
Yes, that's right; but a package cannot make this the default behavior for all tasks queued with the enqueue_job method call, which is what I'd like to enable.
In other words, I would like there to be a way for the asgi-correlation-id package to automatically pass this information through to the job and load it into the correct contextvar, without the user needing to do anything extra.
I'm in the exact same situation: FastAPI, wanting to use the library mentioned above to get a correlation ID and pass it to arq jobs. I understand that there is currently no PR being worked on, since the rewrite should solve the issue, but did you find a workaround to make this work in the meantime?
You can always pass the correlation ID into the job manually.
worker:

from asgi_correlation_id.context import correlation_id


# arq passes the context dict as the first argument to job functions
async def job(ctx: dict, cid: str) -> None:
    correlation_id.set(cid)
    # business logic ...

FastAPI:

from asgi_correlation_id.context import correlation_id


async def my_view():
    await arq_redis.enqueue_job('job', cid=correlation_id.get())
(Written on my phone, so it may have some errors, but the idea should work. Remember to set up logging in your worker too.)
Thank you @JonasKs, that was extremely helpful. I was able to use almost exactly what you posted. I'll post my complete solution here in case anyone else is looking for it.
Worker tasks:

from asgi_correlation_id import correlation_id


async def job(ctx: dict, cid: str) -> None:
    correlation_id.set(cid)
    # business logic ...

FastAPI:

from asgi_correlation_id import correlation_id


async def my_view():
    await arq_redis.enqueue_job('job', cid=correlation_id.get())
In addition to that, I was also using arq cron jobs, which might call additional jobs as well. I wanted them to have a correlation ID too, which was a bit more complex:
from functools import wraps
from uuid import uuid4

from arq import cron


def partial(f, *args, **kwargs):
    """
    Wrapper for cron functions to inject correlation_id.
    """

    @wraps(f)
    async def partial_function(ctx):
        return await f(*args, **kwargs)

    return partial_function


class WorkerSettings:  # pylint: disable=too-few-public-methods
    """
    Settings for the ARQ worker.
    """

    cron_jobs = [
        # note: uuid4().hex is evaluated once, when WorkerSettings is
        # defined, so every run of this cron job reuses the same ID
        cron(partial(your_job, {}, cid=uuid4().hex), hour={1}, minute={12}),
        ...
    ]
For cron jobs you could also do this:
from uuid import uuid4

from asgi_correlation_id import correlation_id


async def set_correlation_id(ctx: dict) -> None:
    correlation_id.set(uuid4().hex)


class WorkerSettings:
    on_job_start = set_correlation_id
I probably could, but some of the jobs can be called both as cron jobs and manually from FastAPI. That's why I thought the on_job_start hook would not work: the job could then no longer be called from FastAPI.
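For what it's worth, the two approaches can be combined: let on_job_start set a fresh ID as a fallback, and have jobs that receive an explicit cid from FastAPI overwrite it. A sketch along those lines (set_fallback_correlation_id is a made-up name; everything else follows the snippets above):

from typing import Optional
from uuid import uuid4

from asgi_correlation_id import correlation_id


async def set_fallback_correlation_id(ctx: dict) -> None:
    # every job starts with a fresh ID by default ...
    correlation_id.set(uuid4().hex)


async def job(ctx: dict, cid: Optional[str] = None) -> None:
    # ... and jobs enqueued from FastAPI overwrite it with the propagated one
    if cid:
        correlation_id.set(cid)
    # business logic ...


class WorkerSettings:
    functions = [job]
    on_job_start = set_fallback_correlation_id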