
Middleware feature

Open medihack opened this issue 10 months ago • 15 comments

In #1262 and #1289 we mentioned that a middleware feature would be helpful in some scenarios. I have already started to implement it in the middleware branch, but I would like to discuss the implementation (@ewjoachim). I use an implementation similar to the one Django uses for its middleware. There are two API alternatives I would like to discuss.

  1. The way I implemented it currently:
ProcessTask: TypeAlias = Callable[[tasks.Task, job_context.JobContext], Awaitable[Any]]
Middleware: TypeAlias = Callable[
    [ProcessTask],
    Callable[[tasks.Task, job_context.JobContext], Awaitable[Any]],
]

def simple_middleware(process_task: ProcessTask):
    async def middleware(task: tasks.Task, context: job_context.JobContext):
        # Do something before the task is processed
        result = await process_task(task, context)
        # Do something after the task is processed
        return result
    return middleware
  2. An alternative (maybe better) way:
ProcessJob: TypeAlias = Callable[[job_context.JobContext], Awaitable[Any]]
Middleware: TypeAlias = Callable[
    [ProcessJob],
    Callable[[job_context.JobContext], Awaitable[Any]],
]

def simple_middleware(process_job: ProcessJob):
    async def middleware(context: job_context.JobContext):
        # Do something before the job is processed
        result = await process_job(context)
        # Do something after the job is processed
        return result
    return middleware
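To make the composition concrete, here is a runnable sketch of how factories in the second shape would chain, with `job_context.JobContext` replaced by a stand-in (this is illustration only, not procrastinate's actual API):

```python
import asyncio
from typing import Any, Awaitable, Callable

JobContext = Any  # stand-in for job_context.JobContext (illustration only)
ProcessJob = Callable[[JobContext], Awaitable[Any]]

calls: list[str] = []

def logging_middleware(process_job: ProcessJob) -> ProcessJob:
    async def middleware(context: JobContext):
        calls.append("log-before")
        result = await process_job(context)
        calls.append("log-after")
        return result
    return middleware

def metrics_middleware(process_job: ProcessJob) -> ProcessJob:
    async def middleware(context: JobContext):
        calls.append("metrics-before")
        result = await process_job(context)
        calls.append("metrics-after")
        return result
    return middleware

async def real_process_job(context: JobContext):
    calls.append("job")
    return "done"

# As in Django, the factory applied last becomes the outermost layer.
wrapped = metrics_middleware(logging_middleware(real_process_job))
result = asyncio.run(wrapped(None))
```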

After some thought, 2) is better because process_job also wraps the part where the job status is saved back to the db. This is especially important if we allow the user to raise a StopWorker exception (https://github.com/procrastinate-org/procrastinate/issues/1262#issuecomment-2568467649). One thing that bothers me a bit is that in both scenarios, the context was already created, and thereby a start_timestamp is already set. So when the user adds some pause in the middleware (e.g. for #1289) before the job is processed, this pause will be included in the overall job duration. I'm not sure if this is what we want or not.
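The timing concern can be demonstrated with a small sketch (`FakeContext` is a stand-in; the real `JobContext` sets its start timestamp when it is created, before any middleware runs):

```python
import asyncio
import time

class FakeContext:
    """Stand-in for JobContext: start_timestamp is set at creation time."""
    def __init__(self):
        self.start_timestamp = time.monotonic()

def pausing_middleware(process_job):
    async def middleware(context):
        await asyncio.sleep(0.05)  # e.g. a rate-limit pause before the job runs
        return await process_job(context)
    return middleware

async def process_job(context):
    # The "job duration" as measured from the pre-existing start_timestamp
    return time.monotonic() - context.start_timestamp

async def main():
    context = FakeContext()  # timestamp already set before middleware runs
    return await pausing_middleware(process_job)(context)

duration = asyncio.run(main())
# The pause in the middleware is counted toward the measured job duration.
```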

medihack avatar Jan 18 '25 13:01 medihack

One thing that bothers me a bit is that in both scenarios, the context was already created, and thereby a start_timestamp is already set

Aha, you assume that the process_job function of the lowest middleware is the real process_job, but we could have our own "last middleware", that sets this right before the task actually starts.
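A sketch of that framework-owned "last middleware" idea, assuming the context is an immutable dataclass so a re-stamped copy can be created with `dataclasses.replace` (the names here are hypothetical, not procrastinate's API):

```python
import asyncio
import dataclasses
import time

@dataclasses.dataclass(frozen=True)
class FakeContext:
    """Stand-in for an immutable JobContext."""
    start_timestamp: float = 0.0

def timestamp_middleware(process_job):
    """Hypothetical innermost, framework-owned middleware: re-stamps the
    context right before the job actually starts, so pauses in user
    middleware are excluded from the job duration."""
    async def middleware(context):
        fresh = dataclasses.replace(context, start_timestamp=time.monotonic())
        return await process_job(fresh)
    return middleware

async def process_job(context):
    return context.start_timestamp

before = time.monotonic()
stamped = asyncio.run(timestamp_middleware(process_job)(FakeContext()))
```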

There's a 3rd alternative (not saying it's better, but I think it's simpler):


async def middleware(process_job: ProcessJob, context: job_context.JobContext):
    # Do something before the job is processed
    result = await process_job(context)
    # Do something after the job is processed
    return result

(Your implementation highlights the fact that we will always have the same process_job, while that's less visible in mine.)

The 3rd implementation puts the focus on the idea that the middleware could modify the job_context (well, it's immutable, but it could pass along a different instance). I'm still trying to think through whether that's a good idea.

A 4th alternative could be:

async def middleware(process_job: ..., context: job_context.JobContext):
    # Do something before the job is processed
    result = await process_job()
    # Do something after the job is processed
    return result

(note that not much value would be lost if we didn't bother returning the job's result)

(Or even, 5th:

async def middleware(process_job: Awaitable, context: job_context.JobContext):
    # Do something before the job is processed
    result = await process_job
    # Do something after the job is processed
    return result

but in this case, in the event one would choose not to execute the task, they'd get a warning about an awaitable never being awaited)
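For what it's worth, the warning in the 5th alternative can be avoided by explicitly closing the coroutine when deciding not to run it; here's a sketch (the `skip` condition and dict-based context are hypothetical):

```python
import asyncio

async def job():
    return "result"

async def skip_middleware(process_job, context):
    """5th-alternative sketch: process_job is an already-created coroutine.
    If we decide not to run it, closing it avoids the
    'coroutine ... was never awaited' RuntimeWarning."""
    if context.get("skip"):       # hypothetical skip condition
        process_job.close()       # explicitly discard the coroutine
        return None
    return await process_job

skipped = asyncio.run(skip_middleware(job(), {"skip": True}))
ran = asyncio.run(skip_middleware(job(), {"skip": False}))
```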

I think my favorite one might be the 3rd, but I'm open to discussion if you think the 2nd (your preferred?) is better.

ewjoachim avatar Jan 18 '25 14:01 ewjoachim

One thing that bothers me a bit is that in both scenarios, the context was already created, and thereby a start_timestamp is already set

Aha, you assume that the process_job function of the lowest middleware is the real process_job, but we could have our own "last middleware", that sets this right before the task actually starts.

So, you mean that the context we pass to the middleware is not the same context we internally use to process the task? We re-create the context in our process_job function and then use that one?

There's a 3rd alternative (not saying it's better, but I think it's simpler):

async def middleware(process_job: ProcessJob, context: job_context.JobContext):
    # Do something before the job is processed
    result = await process_job(context)
    # Do something after the job is processed
    return result

(Your implementation highlights the fact that we will always have the same process_job, while that's less visible in mine.)

Yes, I have taken the Django middleware as a model (https://docs.djangoproject.com/en/5.1/topics/http/middleware/#writing-your-own-middleware) and thought it would be a good fit as our process_job is, like the get_response function in Django, always the same one. But I don't have a strong opinion about that.

If we recreate the context and don't use the one passed to the middleware, the 4th alternative may be better because it clarifies that the context can't be changed (or, more precisely, that changing it would not affect anything).

medihack avatar Jan 18 '25 16:01 medihack

I am not sure that rate limiting is best solved by middleware. By the time the middleware runs, the job is already fetched and in a doing status. One could probably achieve some sort of rate limiting through middleware, but I don't think that would be efficient, or practical for rate limiting across multiple workers.

If all we want with the middleware is to inject logic before/after the task, I would lean towards what @ewjoachim labels option 3. I would just name the function differently:

async def middleware(next: ProcessJob, context: job_context.JobContext):
    # Do something before the job is processed
    result = await next(context)
    # Do something after the job is processed
    return result

I would be interested to see a few examples of useful middleware.

onlyann avatar Jan 27 '25 08:01 onlyann

I would be interested to see a few examples of useful middleware.

Things we currently do by monkey patching procrastinate that this could be useful for:

  • Wrap task execution in our own structured logging/OpenTelemetry spans/other instrumentation
  • Wrap the task in a common dependency injection container

I could also envision us using middleware to wrap certain tasks in an asyncio timeout.
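The timeout use case is easy to sketch in the option-3 shape; a minimal example using `asyncio.wait_for` (timeout value and job names are illustrative only):

```python
import asyncio

async def timeout_middleware(process_job, context):
    """Option-3-shaped middleware that bounds the job's runtime."""
    try:
        # 0.05 s is just for illustration; a real timeout would likely
        # come from the task definition or the context.
        return await asyncio.wait_for(process_job(context), timeout=0.05)
    except asyncio.TimeoutError:
        return "timed out"

async def slow_job(context):
    await asyncio.sleep(1)
    return "done"

async def fast_job(context):
    return "done"

slow_result = asyncio.run(timeout_middleware(slow_job, None))
fast_result = asyncio.run(timeout_middleware(fast_job, None))
```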

ashleyheath avatar Jan 27 '25 10:01 ashleyheath

I am not sure that rate limiting is best solved by middleware.

Yes, trying different implementations, I came to the same conclusion.

I even wonder whether a middleware feature is flexible enough, or whether we should instead implement several callback hooks the user can plug into, e.g.:

  • before_job_fetched
  • after_job_fetched
  • after_job_processed

Or both, a middleware that wraps the job processing and a before_job_fetched callback.
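Roughly, the hook alternative would look like this; a sketch with an invented `FakeWorker`, where the hook names match the list above but everything else is hypothetical:

```python
import asyncio

class FakeWorker:
    """Sketch of the callback-hook alternative: the worker exposes named
    hooks and invokes them at fixed points in its processing loop."""
    def __init__(self):
        self.hooks = {
            "before_job_fetched": [],
            "after_job_fetched": [],
            "after_job_processed": [],
        }
        self.events = []

    def on(self, name, callback):
        self.hooks[name].append(callback)

    async def _fire(self, name, *args):
        for callback in self.hooks[name]:
            await callback(*args)

    async def run_one(self, job):
        await self._fire("before_job_fetched")
        # (fetching would happen here)
        await self._fire("after_job_fetched", job)
        self.events.append(f"processed {job}")
        await self._fire("after_job_processed", job)

worker = FakeWorker()

async def log_fetch(job):
    worker.events.append(f"fetched {job}")

worker.on("after_job_fetched", log_fetch)
asyncio.run(worker.run_one("job-1"))
```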

medihack avatar Jan 27 '25 14:01 medihack

My preference for rate limiting is to make it a distinct feature that is implemented at the DB level.

I will try something out and report back so we have something to discuss.

In the meantime, the middleware feature could be limited to chaining functions as part of task processing, which would help with examples shared above.

onlyann avatar Jan 27 '25 20:01 onlyann

My preference for rate limiting is to make it a distinct feature that is implemented at the DB level.

Yes, a dedicated rate-limiting feature probably makes the most sense. And then provide a rate limit per task?

medihack avatar Jan 28 '25 09:01 medihack

In the meantime, the middleware feature could be limited to chaining functions as part of task processing, which would help with examples shared above.

@onlyann my only concern with the chaining approach is that it will make it harder to use python's native context managers, which in my experience is a large part of the usefulness of middleware-style patterns.

ashleyheath avatar Jan 28 '25 12:01 ashleyheath

@onlyann my only concern with the chaining approach is that it will make it harder to use python's native context managers, which in my experience is a large part of the usefulness of middleware-style patterns.

Context managers from contextlib?

Can you please give me an example where it wouldn't work with middleware?

onlyann avatar Jan 28 '25 20:01 onlyann

Sorry, I might have misinterpreted the above conversation - it seemed that the discussion had turned to implementing distinct before/after callbacks rather than a single middleware interface (i.e. your original sketch here). If that's still the plan then ignore me 😄

ashleyheath avatar Jan 28 '25 21:01 ashleyheath

After thinking about it even more, I will implement a middleware that wraps the worker._process_job. My hook plans are on hold, at least for now. I think @onlyann is right that a dedicated rate-limiting feature is better than using a hook for that.

medihack avatar Jan 28 '25 21:01 medihack

After thinking about it even more, I will implement a middleware that wraps the worker._process_job. My hook plans are on hold, at least for now. I think @onlyann is right that a dedicated rate-limiting feature is better than using a hook for that.

If this is what is wrapped, there will be no ability for a middleware to modify the outcome of a task.

onlyann avatar Jan 28 '25 22:01 onlyann

After thinking about it even more, I will implement a middleware that wraps the worker._process_job. My hook plans are on hold, at least for now. I think @onlyann is right that a dedicated rate-limiting feature is better than using a hook for that.

If this is what is wrapped, there will be no ability for a middleware to modify the outcome of a task.

What would you suggest? Wrap the ensure_async?

medihack avatar Jan 28 '25 22:01 medihack

After thinking about it even more, I will implement a middleware that wraps the worker._process_job. My hook plans are on hold, at least for now. I think @onlyann is right that a dedicated rate-limiting feature is better than using a hook for that.

If this is what is wrapped, there will be no ability for a middleware to modify the outcome of a task.

What would you suggest? Wrap the ensure_async?

Yes. That would be similar to the hand-rolled middleware from the docs.

onlyann avatar Jan 28 '25 22:01 onlyann

Yes. That would be similar to the hand rolled middleware from the docs.

The main reason why I wrap _process_job instead of ensure_async is that an exception can be more easily raised to stop the worker. But now I think raising an exception is not the way to go, as the job status depends too much on where that exception is raised (before or after calling the provided process_job function). A better alternative could be to pass the worker instance itself so that worker.stop() can be called from within the middleware. And then we can also wrap the ensure_async instead of process_job.
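A sketch of the worker-passing idea, with a stand-in worker whose `stop()` just flips a flag (everything here is hypothetical, since the registration API isn't decided yet):

```python
import asyncio

class FakeWorker:
    """Stand-in worker. Passing the worker (or a stop callable) into the
    middleware lets it request a clean shutdown without raising, so the
    current job's status is still saved normally."""
    def __init__(self):
        self.stop_requested = False

    def stop(self):
        self.stop_requested = True

def make_middleware(worker):
    def middleware_factory(process_job):
        async def middleware(context):
            result = await process_job(context)
            if result == "fatal":   # hypothetical stop condition
                worker.stop()       # requested after the job completes
            return result
        return middleware
    return middleware_factory

async def process_job(context):
    return "fatal"

worker = FakeWorker()
wrapped = make_middleware(worker)(process_job)
result = asyncio.run(wrapped(None))
```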

medihack avatar Feb 01 '25 22:02 medihack