Disconnect event is not propagated to the middleware while a request is processing

Open speksen-kb01 opened this issue 9 months ago • 11 comments

Hello,

We were working on an HttpDisconnectMiddleware, which would cancel the ongoing tasks when the user cancels the request. We were using this same middleware with other servers like uvicorn and hypercorn, which would trigger the exception upon cancellation, but in the case of granian it seems like the disconnect event is queued and not sent to the middleware until the task finishes.

I've created a repo for showcasing the effect, https://github.com/speksen-kb01/fastapi-disconnect-example

Is there a reason why we can't get the event while a request is being processed? If so, what do you think it is related to, and is it fixable by changing the middleware to some extent?
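
For context, here is a minimal sketch of the kind of middleware described above (a hypothetical illustration, not the code from the linked repo): it races the downstream ASGI app against a watcher for the http.disconnect event and cancels the app when the client goes away.

import asyncio


class HttpDisconnectMiddleware:
    """Cancel the downstream ASGI app as soon as an http.disconnect event arrives."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        async def watch_disconnect():
            # Keep pulling ASGI events; return once the client disconnects.
            # (Simplified: a real middleware would also forward http.request
            # messages to the app instead of consuming them here.)
            while True:
                message = await receive()
                if message["type"] == "http.disconnect":
                    return

        app_task = asyncio.create_task(self.app(scope, receive, send))
        watch_task = asyncio.create_task(watch_disconnect())
        done, pending = await asyncio.wait(
            {app_task, watch_task}, return_when=asyncio.FIRST_COMPLETED
        )
        for task in pending:
            task.cancel()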

Funding

  • You can sponsor this specific effort via a Polar.sh pledge below
  • We receive the pledge once the issue is completed & verified

speksen-kb01 avatar May 03 '24 13:05 speksen-kb01

@speksen-kb01 yes, at the moment the network I/O in Granian doesn't allow it to be notified when the connection gets closed prematurely by clients. In fact, network I/O in Granian today is eager, meaning checks on the connection state are run only when actually reading and writing: if you change the code to try writing back empty contents during those 10 seconds, you should be able to get the exception you're looking for.
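
As an illustration of that workaround, here is a hedged sketch for a raw ASGI endpoint (slow_work() below is just a stand-in for the 10-second job): periodically sending an empty body chunk makes the server touch the socket, so a closed connection surfaces as an exception in the app.

import asyncio


async def slow_work():
    await asyncio.sleep(10)  # stands in for the expensive task


async def app(scope, receive, send):
    assert scope["type"] == "http"
    await send({"type": "http.response.start", "status": 200, "headers": []})
    work = asyncio.create_task(slow_work())
    try:
        while not work.done():
            # An empty chunk produces no visible output, but the write fails
            # if the client has already disconnected.
            await send({"type": "http.response.body", "body": b"", "more_body": True})
            await asyncio.sleep(1)
        await send({"type": "http.response.body", "body": b"done", "more_body": False})
    except Exception:
        work.cancel()
        raise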

This might be changed in the future – if a valid reason to change the current behaviour exists – but it really depends on the lower-level library capabilities. I have a question indeed: can you provide a use-case for this? 'cause looking at the middleware code you provided, there's nothing special the app might benefit from, apart from getting an exception – if I got it correctly?

gi0baro avatar May 06 '24 08:05 gi0baro

Imagine that the endpoint is something that consumes a lot of resources and can be called multiple times in parallel by a single client. In order not to waste resources, we want to cancel as soon as we receive a disconnect event.

aeb-dev avatar May 06 '24 09:05 aeb-dev

@speksen-kb01 yes, at the moment the network I/O in Granian doesn't allow it to be notified when the connection gets closed prematurely by clients. [...]

I see, thanks for the explanation. As for the use case, imagine a "request chain" of services that depend on each other to finish, where each takes some time to finish its task: user -> service1 -> service2 -> service3. Let's assume the user cancels the request after each service has called the next. With the middleware, service1 would get an exception, cancelling whatever it's doing. Since service1 abruptly disconnects, service2 would also get an exception, and so on along the chain, cancelling each task. Without the middleware, if the user disconnects, the chain continues its work for naught, making db connections etc., basically continuing to hog resources.

speksen-kb01 avatar May 06 '24 11:05 speksen-kb01

Just to clarify on this: this seems not to be achievable due to how Hyper is currently designed. I'm gonna put this on hold with the "wontfix" label, but I will also add the ability to pledge this issue in case someone wants to make additional investigations on Hyper and/or work on changes with the Hyper team in order to support this.

gi0baro avatar Jun 07 '24 13:06 gi0baro

I don't really know the intricacies of the granian project, so I don't know if this could help, but I'd like to add that hyper seems to drop the request task upon a disconnect. It could be related to this issue somehow: https://github.com/tokio-rs/axum/discussions/1094#discussioncomment-2958921

speksen-kb01 avatar Jun 10 '24 12:06 speksen-kb01

This issue was closed as inactive (https://github.com/emmett-framework/granian/issues/324) but I think it relates to this one. The original author probably got jack of granian and went back to something stable like gunicorn/uwsgi. I manage multiple django projects across various organisations. In one of them we stuck with granian but put connection timeouts on the postgresql server - this fixed the issue. In another we went back to unicorn because the system admin insisted it wasn't his system's problem and didn't want to change anything if it "wasn't his fault" - not that it was, but the postgresql server timeouts would have worked there too.

Either way, it would be good if this could be resolved.

SkyHyve avatar Jun 18 '24 23:06 SkyHyve

This issue was closed as inactive (#324) but I think it relates to this one. [...] Either way, it would be good if this could be resolved.

Definitely not related, even if you're using ASGI with Django. Regarding #324, release 1.4.4 is being built right now, which also includes fixes for closing the Django response iterator.

gi0baro avatar Jun 18 '24 23:06 gi0baro

thanks for confirmation, can you please explain what the actual issue is for django though so I understand?

SkyHyve avatar Jun 18 '24 23:06 SkyHyve

thanks for confirmation, can you please explain what the actual issue is for django though so I understand?

Django uses the close method of its response classes to close used database connections (see also https://github.com/emmett-framework/granian/issues/306). So if – for any reason – Granian is not able to call that method, connections will be kept in place. 1.4.4 puts in place a more aggressive policy for calling that method, and thus should definitely fix the related issues.
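
To make the mechanism concrete, here is a hedged sketch of the server-side obligation being described (simplified illustration of the idea, not Granian's actual internals): whatever happens while streaming the response body, close() must still be invoked so Django can release its database connections.

def stream_response(body_iterable, write):
    try:
        for chunk in body_iterable:
            write(chunk)
    finally:
        # Django hooks database-connection cleanup into close(); skipping it
        # (e.g. because the client disconnected mid-stream) leaks connections.
        close = getattr(body_iterable, "close", None)
        if close is not None:
            close()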

gi0baro avatar Jun 18 '24 23:06 gi0baro

Thank you, will the --backpressure argument still be required for this to take effect? Would it be possible to please include an example usage in the releases/changelog for 1.4.4?

SkyHyve avatar Jun 18 '24 23:06 SkyHyve

Thank you, will the --backpressure argument still be required for this to take effect? Would it be possible to please include an example usage in the releases/changelog for 1.4.4?

Backpressure is still needed; the usage is the same for all the 1.4.x releases. If you have additional questions, please open up a dedicated discussion – we shouldn't pollute this unrelated issue with all this.

gi0baro avatar Jun 18 '24 23:06 gi0baro

Hi! This seems to be an important and problematic issue: the cleanup stage won't be executed if the client cancels the request.

This hurts badly in my case. I want to initialize a costly resource for each request to an SSE endpoint in a Litestar application, and shut it down once the request is done. Since cleanups are not executed after the request is cancelled, this leads to a memory leak.

import asyncio
import typing

import litestar
from litestar.response import ServerSentEvent


async def print_every_second() -> typing.NoReturn:
    while True:
        print("hi")
        await asyncio.sleep(1)


async def iter_something() -> typing.AsyncIterable[str]:
    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(print_every_second())
        yield "hi"


@litestar.get("/")
async def root() -> ServerSentEvent:
    return ServerSentEvent(iter_something())


app = litestar.Litestar([root])

In this example hi will be printed forever, even when requests are cancelled.

vrslev avatar Oct 02 '24 21:10 vrslev

@gi0baro

vrslev avatar Oct 02 '24 21:10 vrslev

Hi! This seems to be an important and problematic issue: the cleanup stage won't be executed if the client cancels the request. [...]

The example you provided doesn't seem to be related to the OP. Granian already sends disconnect events on send failures, so streamed responses should be able to catch cancellations. This smells like a Litestar issue to me; enabling debug in asyncio should probably show that Granian's task lifecycles are correct.

On another note, even if it turns out to be actually related, you can find in the previous discussion all the details on why this is on hold, and the overall state of this issue is quite clear IMO: contributions trying to address the actual OP issue are more than welcome. Mentioning me directly won't change any of this.

gi0baro avatar Oct 03 '24 09:10 gi0baro

Thanks for your answer. I didn't mean to be rude, sorry.

This issue seems to have two pieces, so I have opened an issue on the Litestar side as well: https://github.com/litestar-org/litestar/issues/3772.

  • Litestar + Uvicorn: synchronous cleanups work out of the box, but async ones are executed only when using anyio.CancelScope(shield=True) (see the sketch after this list),
  • Litestar + Granian: neither synchronous nor asynchronous cleanups are executed, with or without anyio.CancelScope(shield=True).
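
For reference, a hedged sketch of the anyio.CancelScope(shield=True) pattern mentioned above (open_resource() and its close() are hypothetical stand-ins for the costly resource): the shield lets the async cleanup finish even after the surrounding task has been cancelled.

import anyio


async def iter_with_cleanup():
    resource = await open_resource()  # hypothetical costly resource
    try:
        async for item in resource.stream():
            yield item
    finally:
        # Without the shield, this await may never complete once the
        # surrounding task has been cancelled on client disconnect.
        with anyio.CancelScope(shield=True):
            await resource.close()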

I will continue to investigate the issue with different frameworks and servers.

you can find in previous discussions all the details on why this is on hold

Yes, I understand that, I just wanted to point out that this issue may have another corner case that I highlighted.

vrslev avatar Oct 03 '24 09:10 vrslev

  • Litestar + Granian: neither synchronous nor asynchronous cleanups are executed, with or without anyio.CancelScope(shield=True).

I will continue to investigate the issue with different frameworks and servers.

So, after running your MRE locally, I can say what the issue is with Granian: you're blocking the iterator, which will never send the final ASGI message to the server, and thus you won't receive the disconnect event either (which makes sense, the client is still connected waiting for more body).

Changing your snippet with this:

async def iter_something() -> typing.AsyncIterable[str]:
    async with asyncio.TaskGroup() as task_group:
        task = task_group.create_task(print_every_second())
        yield "hi"
        task.cancel()

makes everything work as intended. Not sure if that applies to your original issue as well, but again, I don't think this is related to the OP; nor is this an issue with Granian itself.

gi0baro avatar Oct 03 '24 10:10 gi0baro

async def iter_something() -> typing.AsyncIterable[str]:
    async with asyncio.TaskGroup() as task_group:
        task = task_group.create_task(print_every_second())
        yield "hi"
        task.cancel()

This snippet basically means: yield once, then proactively cancel the task.

A more realistic example would be this:

import asyncio
import typing

import litestar
from litestar.response import ServerSentEvent


async def print_every_second() -> typing.NoReturn:
    while True:
        print("hi")
        await asyncio.sleep(1)


async def iter_something() -> typing.AsyncIterable[str]:
    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(print_every_second())
        while True:
            yield "hi"
            await asyncio.sleep(1)


@litestar.get("/")
async def root() -> ServerSentEvent:
    return ServerSentEvent(iter_something())


app = litestar.Litestar([root])

The SSE route generates data forever, and it won't be interrupted when the client cancels the request.

Uvicorn, on the other hand, would throw CancelledError into the async generator; the task group would pick it up and cancel the task by itself.
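
A hedged, server-free sketch of that behaviour (plain asyncio, reusing the same generator): cancelling the consuming task, which is what the client disconnect amounts to under Uvicorn, raises CancelledError inside the async generator, and the TaskGroup then cancels its child task.

import asyncio
import typing


async def print_every_second() -> typing.NoReturn:
    while True:
        print("hi")
        await asyncio.sleep(1)


async def iter_something() -> typing.AsyncIterable[str]:
    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(print_every_second())
        while True:
            yield "hi"
            await asyncio.sleep(1)


async def consume() -> None:
    async for _ in iter_something():
        pass


async def main() -> None:
    consumer = asyncio.create_task(consume())
    await asyncio.sleep(3)
    consumer.cancel()  # stands in for the client disconnecting
    try:
        await consumer
    except asyncio.CancelledError:
        pass
    await asyncio.sleep(2)  # no more "hi": print_every_second() was cancelled


asyncio.run(main())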

vrslev avatar Oct 03 '24 12:10 vrslev

async def iter_something() -> typing.AsyncIterable[str]:
    async with asyncio.TaskGroup() as task_group:
        task = task_group.create_task(print_every_second())
        yield "hi"
        task.cancel()

This function will exit on its own, without needing a CancelledError, whether the client cancels the request or not.

vrslev avatar Oct 03 '24 12:10 vrslev

This snippet basically means: yield once, then proactively cancel the task.

A more realistic example would be this:

[...]

The SSE route generates data forever, and it won't be interrupted when the client cancels the request.

No, it doesn't. I tested that snippet locally and it works as expected: as soon as I close the request on the client side, Granian logs this:

[WARNING] ASGI transport error: SendError { .. }

and sends the disconnect event to the receive handler as expected. Again, this is not an issue with Granian itself. Also, given your context is not the one the OP refers to, if you have anything else to add please move to a separate issue/discussion so we can stop bloating this one with unrelated messages.

gi0baro avatar Oct 03 '24 13:10 gi0baro

Also, given your context is not the one the OP refers to, if you have anything else to add please move to a separate issue/discussion so we can stop bloating this one with unrelated messages.

Sure: https://github.com/emmett-framework/granian/issues/412

vrslev avatar Oct 04 '24 11:10 vrslev