Ability to execute logic after the response is fully streamed

Open vytas7 opened this issue 5 years ago • 9 comments

Right now, the middleware method process_response can be used to post-process responses after they are routed and handled by responders.

However, when the response body is a stream, one may want to execute logic once the stream iterable is exhausted. We are already wrapping req.stream in some cases, so we could extend this to wrap .close() with code that executes the relevant method (finalize_response?) of each middleware component added to the API.

Use cases could include cleaning up resources that were used to stream the response, such as releasing database connections back to the pool.
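To make the proposal concrete, here is a minimal sketch of the wrapping idea, independent of Falcon's internals. The class name `FinalizingIterable`, the helper `collect_finalizers`, and the `finalize_response(req, resp)` signature are all assumptions made for illustration; the thread has not settled on a name or signature, and this is not how Falcon is implemented.

```python
from functools import partial


class FinalizingIterable:
    """Wrap a WSGI response iterable; run finalizers when close() is called."""

    def __init__(self, iterable, finalizers):
        self._iterable = iterable
        self._iterator = iter(iterable)
        self._finalizers = list(finalizers)
        self._closed = False

    def __iter__(self):
        return self

    def __next__(self):
        return next(self._iterator)

    def close(self):
        # Guard against double invocation: finalizers run at most once.
        if self._closed:
            return
        self._closed = True
        try:
            # Forward close() to the wrapped iterable if it supports it.
            close = getattr(self._iterable, "close", None)
            if close is not None:
                close()
        finally:
            for finalize in self._finalizers:
                finalize()


def collect_finalizers(middleware, req, resp):
    """Bind the hypothetical finalize_response hook of each component."""
    finalizers = []
    for component in middleware:
        hook = getattr(component, "finalize_response", None)
        if hook is not None:
            finalizers.append(partial(hook, req, resp))
    return finalizers
```

In this sketch the framework would return `FinalizingIterable(body, collect_finalizers(...))` to the WSGI server, and the finalizers would run whenever the server calls `close()` on it.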

vytas7 avatar May 05 '19 22:05 vytas7

Maybe a callback attached to the response object would be better in this case? Something like after_stream_closed? Otherwise, a middleware hook that runs only when there is a streamed response could be a bit strange. Ideally, you could use process_response to add the callback when appropriate, and the application would call it if needed.
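A rough sketch of what this could look like follows. `SketchResponse` is a stand-in written for this example, not `falcon.Response`, and both `after_stream_closed` and `run_close_callbacks` are hypothetical names; only the `process_response(req, resp, resource, req_succeeded)` signature matches the existing middleware interface. The "pool" is a plain list, assumed purely for illustration.

```python
class SketchResponse:
    """Stand-in response object with a close-callback registry (hypothetical)."""

    def __init__(self):
        self.stream = None
        self.context = {}
        self._close_callbacks = []

    def after_stream_closed(self, callback):
        # Register a callable to run once the response stream is closed.
        self._close_callbacks.append(callback)

    def run_close_callbacks(self):
        # Would be invoked by the framework; each callback runs at most once.
        callbacks, self._close_callbacks = self._close_callbacks, []
        for callback in callbacks:
            callback()


class ReleaseConnectionMiddleware:
    def __init__(self, pool):
        self._pool = pool  # hypothetical pool: a list of idle connections

    def process_response(self, req, resp, resource, req_succeeded):
        connection = resp.context.pop("connection", None)
        if connection is None:
            return
        if resp.stream is None:
            # Nothing is streamed, so release the connection right away.
            self._pool.append(connection)
        else:
            # Defer the release until the stream has been closed.
            resp.after_stream_closed(lambda: self._pool.append(connection))
```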

CaselIT avatar Feb 01 '20 10:02 CaselIT

I do not think finalize_response (or whatever name we settle on) should only be called for streamed responses. For a "normal" bytestring response body, we should still run it, just after process_response. Or we could wrap the bytestring in a one-item iterable with .close() to follow the spirit of PEP 3333 more closely.
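For illustration, a one-item iterable with .close() could be as small as the sketch below. `OneItemBody` and `on_close` are made-up names. The `__len__` method is included because PEP 3333 lets a server rely on `len()` of the returned iterable, for example to derive Content-Length when the length is 1.

```python
class OneItemBody:
    """Expose a bytestring body as a one-item iterable with close()."""

    def __init__(self, body, on_close):
        self._body = body
        self._on_close = on_close

    def __iter__(self):
        # Yield the whole body as a single chunk.
        yield self._body

    def __len__(self):
        return 1

    def close(self):
        # Detach the callback first so that it runs at most once.
        on_close, self._on_close = self._on_close, None
        if on_close is not None:
            on_close()
```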

Of course, ASGI is a different story as it has more explicit flow control support from the standard itself. See also: https://github.com/falconry/falcon/issues/1581

vytas7 avatar Jul 07 '20 09:07 vytas7

> For a "normal" bytestring response body, we should still run it, just after process_response

I think it would be called after the body has been serialized, so if media is used, after the media handlers have run.

Regarding how to behave for a stream vs. a bytestring body, the idea of wrapping the bytestring in a stream is interesting, as long as it does not impact performance significantly. That would make both types of body behave in the same way, meaning that the callback would not be called by Falcon but by the WSGI server's close call, after the Falcon app has handed control over to it.

CaselIT avatar Jul 07 '20 10:07 CaselIT

~~FWIW, process_response is also called after the media handlers have run.~~ :arrow_left: as pointed out by @CaselIT below -- plainly untrue.

Re wrapping the response body in a stream, it is hard to assess performance across WSGI servers, but one could benchmark the most popular ones. Anyway, it is probably not a big deal, since we would only be wrapping if any finalize_response methods are present in the middleware stack.

> That would make both types of body behave in the same way, meaning that the callback would not be called by Falcon but by the WSGI server's close call, after the Falcon app has handed control over to it.

That's the idea; at least, it is as close to "after the response is fully streamed to the client" as WSGI can provide.

vytas7 avatar Jul 07 '20 10:07 vytas7

> FWIW, process_response is also called after the media handlers have run.

I think the process_response methods are called just before the media handlers run. https://github.com/falconry/falcon/blob/0fc91cbf028779cf0f7a8761dded149683429adb/falcon/app.py#L325-L338

CaselIT avatar Jul 07 '20 11:07 CaselIT

Doh, sorry... My bad :grimacing: Anyway, finalize_response would very obviously be executed after everything else in the request-response cycle.

vytas7 avatar Jul 07 '20 11:07 vytas7

+1 I'd love to have a way to run cleanup logic after the stream has been consumed, or after the stream raises, or the connection is closed.

I'm using the following to wrap the response stream and execute a callback when read raises or close is called[0].

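import io
from typing import Callable, Optional


class StreamWithCallback(io.BufferedIOBase):
    """Wrap a readable stream and run a callback once it is closed."""

    def __init__(self, wrapped: io.BufferedIOBase,
                 callback: Optional[Callable[[], None]] = None) -> None:
        self._wrapped = wrapped
        self._callback = callback

    def close(self) -> None:
        # Detach the wrapped stream first so that the callback runs at most
        # once, even if wrapped.close() raises or close() is called again.
        wrapped, self._wrapped = self._wrapped, None
        if wrapped is not None:
            try:
                wrapped.close()
            finally:
                if self._callback:
                    self._callback()

    def read(self, n: int = -1) -> bytes:
        try:
            return self._wrapped.read(n)
        except BaseException:
            # Reading failed: clean up before propagating the error.
            self.close()
            raise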

And I'm using it like so:

def on_get(self, req: falcon.Request, resp: falcon.Response, id: str) -> None:
    stream = self.controller.get_read_stream(id)

    def cleanup() -> None:
        # Runs once the stream is closed or a read fails.
        self.controller.remove_by_id(id)

    resp.status = falcon.HTTP_200
    resp.stream = StreamWithCallback(stream, cleanup)

[0] Most WSGI servers should call close on the stream, although I only verified this for gunicorn sync threads.
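One way to check the same behavior without a real server is to drive a tiny app through the standard library's wsgiref handler, as sketched below. `ClosingBody` and `run_once` are names made up for this example. The check only shows what wsgiref does and says nothing about gunicorn or other servers.

```python
import io
from wsgiref.handlers import SimpleHandler
from wsgiref.util import setup_testing_defaults


class ClosingBody:
    """Response iterable that records when the server calls close()."""

    def __init__(self, chunks, events):
        self._chunks = iter(chunks)
        self._events = events

    def __iter__(self):
        return self

    def __next__(self):
        return next(self._chunks)

    def close(self):
        self._events.append("closed")


def run_once(chunks):
    """Serve one request in memory; return the raw output and the events."""
    events = []

    def app(environ, start_response):
        start_response("200 OK", [("Content-Type", "text/plain")])
        return ClosingBody(chunks, events)

    environ = {}
    setup_testing_defaults(environ)  # fill in a minimal WSGI environ
    stdout = io.BytesIO()
    handler = SimpleHandler(io.BytesIO(), stdout, io.StringIO(), environ)
    handler.run(app)
    return stdout.getvalue(), events
```

Calling `run_once([b"hello ", b"world"])` returns the raw response bytes together with the recorded events, so one can inspect whether close() was invoked after the body was written.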

numberoverzero avatar Jul 07 '22 22:07 numberoverzero