Event hooks

Open · sondrelg opened this issue 3 years ago · 14 comments

Hi @samuelcolvin!

Would you be open to having more signals/event hooks added to arq?

My use case

I use this request-id middleware to attach UUIDs to logs, so that each request gets a unique ID. The middleware works well for code executed by the web server, but when enqueueing jobs with arq, there is no way to propagate the UUID to the logs generated by the worker.

This is something you can do with, e.g., Celery. You can use Celery's signals to pass state from your issuing process to the worker (by attaching data to the task object), like this:

from typing import Any, Dict
from uuid import uuid4

from celery import Task
from celery.signals import before_task_publish, task_prerun, task_postrun

from asgi_correlation_id.context_vars import correlation_id

header_key = 'CORRELATION_ID'


# Run on the web server (or any other process) when calling task.delay()
@before_task_publish.connect
def transfer_correlation_id(headers: Dict[str, str], **kwargs: Any) -> None:
    """
    Attach request ID to the task headers.
    """
    cid = correlation_id.get() 
    if cid:
        headers[header_key] = cid


# Run on the worker before the task is executed
@task_prerun.connect
def load_correlation_id(task: Task, **kwargs: Any) -> None:
    """
    Set request ID from the task header, if it exists.
    """
    id_value = task.request.get(header_key)
    if id_value:
        correlation_id.set(id_value)
    else:
        generated_correlation_id = uuid4().hex
        correlation_id.set(generated_correlation_id)


# Run on the worker after the task is executed
@task_postrun.connect
def cleanup(**kwargs: Any) -> None:
    """
    Clear context vars, to avoid re-using values in the next task.
    """
    correlation_id.set(None)

This seems like it would be simple to add to arq as well. Would you accept a PR for something like this?

sondrelg · Nov 08 '21

I noticed after typing this out that https://github.com/samuelcolvin/arq/pull/274 is relevant here. It adds the Celery equivalents of task_prerun and task_postrun.

To completely solve my use case, I believe I would also need a way to transfer request UUIDs from the web server to the task_prerun hook: something that persists through job serialization/deserialization, but that can be set outside of the explicit connection.enqueue_job call.

sondrelg · Nov 08 '21

Since #274 was merged, all this would require is:

  1. An "on-enqueue"-style hook to be added
  2. A way to set state in that hook that persists to the worker when the task_prerun equivalent runs (sketched below)
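
A minimal sketch of what that could look like (the hook names on_enqueue and on_job_prerun and the metadata parameter are invented for illustration; none of this exists in arq):

from uuid import uuid4

from asgi_correlation_id import correlation_id


async def on_enqueue(metadata: dict) -> None:
    # Hypothetical hook: runs in the enqueueing process. Anything put
    # into `metadata` would be serialized and stored alongside the job.
    metadata['correlation_id'] = correlation_id.get() or uuid4().hex


async def on_job_prerun(ctx: dict, metadata: dict) -> None:
    # Hypothetical hook: runs on the worker just before the job executes.
    correlation_id.set(metadata['correlation_id'])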

sondrelg · Jan 27 '22

We're also very interested in this. Happy to join you on a PR for this, @sondrelg, if the idea is accepted by @samuelcolvin. We should probably wait until #259 is finalized/aioredis 2 support is added, though.

JonasKs · Mar 05 '22

Hmm, thinking about this, I'd rather it was structured like middleware in HTTP frameworks.

I'm intending to work on a total rewrite of arq to be very close to FastAPI, but for long-running tasks; I'll include middleware in that.

I don't think there's much point in working on a PR now.

samuelcolvin · Mar 05 '22

That sounds very interesting, and is what we want too! Let us know if we can do anything.

JonasKs · Mar 05 '22

Just out of curiosity, how will a middleware concept work for something like event hooks, where you need things to happen at distinct places in an event's lifecycle?

Also, this isn't incredibly urgent for me, but do you have a rough timeline in mind for when you'll do the rewrite?

sondrelg · Mar 07 '22

Well, like middleware, you could do stuff before or after the job, or catch errors and do stuff with them.

The timeframe is hard to say; hopefully in the next month or two.
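
As an illustration of the middleware idea (the API below is invented; arq exposes no such interface), a job middleware could look roughly like this:

from uuid import uuid4

from asgi_correlation_id import correlation_id


async def correlation_middleware(ctx: dict, call_next):
    # Hypothetical middleware: wraps each job the way HTTP middleware
    # wraps each request.
    correlation_id.set(uuid4().hex)
    try:
        return await call_next(ctx)  # run the job (or the next middleware)
    finally:
        correlation_id.set(None)  # don't leak state into the next job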

samuelcolvin · Mar 07 '22

If I understood you correctly, you could propagate a variable to a job by using the ctx dict and then setting a contextvar, and also write a custom log handler to read the correlation ID from the contextvar.
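
A minimal sketch of that suggestion, assuming the asgi-correlation-id contextvar and using a logging filter rather than a handler (the filter and job below are illustrative, not arq or package API):

import logging
from typing import Optional
from uuid import uuid4

from asgi_correlation_id import correlation_id


class CorrelationIdFilter(logging.Filter):
    """Copy the correlation ID from the contextvar onto each log record."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.correlation_id = correlation_id.get() or '-'
        return True


async def my_job(ctx: dict, cid: Optional[str] = None) -> None:
    # arq passes the ctx dict as the first argument to every job;
    # `cid` comes from the caller via enqueue_job.
    correlation_id.set(cid or uuid4().hex)
    logging.getLogger(__name__).info('job started')

Attach the filter to the worker's log handler and include %(correlation_id)s in the format string to get the ID into each line.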

teror4uks · Jul 01 '22

Yes, that's right, but a package cannot make this the default behavior for all tasks queued with the enqueue_job method, which is what I'd like to enable.

In other words, I would like there to be a way for the asgi-correlation-id package to automatically pass this information through to the job and load it into the correct contextvar, without the user needing to do anything extra.

sondrelg · Jul 03 '22

I'm in the exact same situation: FastAPI, wanting to use the library mentioned above to get a correlation ID, and wanting to pass it to arq jobs. I understand that there is currently no PR being worked on, since the rewrite should solve the issue, but did you find a workaround to make this work in the meantime?

waza-ari · Dec 14 '22

You can always pass the correlation ID into the job manually.

Worker:

from asgi_correlation_id.context import correlation_id

async def job(ctx, cid: str):
    # arq passes the worker's ctx dict as the first argument
    correlation_id.set(cid)
    # business logic ..

FastAPI:

from asgi_correlation_id.context import correlation_id

async def my_view():
    await arq_redis.enqueue_job('job', cid=correlation_id.get())

(Written on my phone, so it may have some errors, but the idea should work. Remember to set up logging in your worker too.)

JonasKs · Dec 14 '22

Thank you @JonasKs, that was extremely helpful. I was able to use almost exactly what you posted. I'll post my complete solution here in case anyone else is looking for it.

Worker Tasks

from asgi_correlation_id import correlation_id

async def job(ctx, cid: str):
    # arq passes the worker's ctx dict as the first argument
    correlation_id.set(cid)
    # business logic ..

FastAPI:

from asgi_correlation_id import correlation_id

async def my_view():
    await arq_redis.enqueue_job('job', cid=correlation_id.get())

In addition to that, I was also using arq cron jobs, which might call additional jobs as well. I wanted them to have a correlation ID too, which was a bit more complex:

from functools import wraps
from uuid import uuid4

from arq import cron

def partial(f, *args, **kwargs):
    """
    Wrapper for cron functions to inject correlation_id
    """

    @wraps(f)
    async def partial_function(ctx):
        return await f(*args, **kwargs)

    return partial_function

class WorkerSettings:  # pylint: disable=too-few-public-methods
    """
    Settings for the ARQ worker.
    """

    cron_jobs = [
        # Note: uuid4().hex is evaluated once, when this class body runs,
        # so every run of this cron job reuses the same correlation ID
        cron(partial(your_job, {}, cid=uuid4().hex), hour={1}, minute={12}),
        ...
    ]

waza-ari · Dec 14 '22

For cron jobs you could also do this:


from uuid import uuid4

from asgi_correlation_id import correlation_id


async def set_correlation_id(ctx: dict) -> None:
    correlation_id.set(uuid4().hex)


class WorkerSettings:
    on_job_start = set_correlation_id

JonasKs · Dec 14 '22

I probably could, but some of the jobs can be called both as cron jobs and manually from FastAPI. That's why I thought using the on_job_start hook wouldn't work: the job could then no longer be called from FastAPI.
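
One possible way to reconcile the two paths (a sketch; the optional cid parameter is just a convention, not arq API): let on_job_start set a fallback ID for every job, and let jobs that are also enqueued from FastAPI accept an optional cid that overrides it.

from typing import Optional
from uuid import uuid4

from asgi_correlation_id import correlation_id


async def set_correlation_id(ctx: dict) -> None:
    # Fallback: every job, cron or enqueued, starts with some ID
    correlation_id.set(uuid4().hex)


async def job(ctx: dict, cid: Optional[str] = None) -> None:
    if cid:  # an explicit ID passed from FastAPI wins over the fallback
        correlation_id.set(cid)
    # business logic ..


class WorkerSettings:
    on_job_start = set_correlation_id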

waza-ari · Dec 14 '22