disconnect detection broken when running with trio

Open jakkdl opened this issue 1 year ago • 7 comments

The problem

Abort/disconnect events, raised when a request is aborted before it finishes, do not appear to propagate to the running application when running with trio as the backend. It does work on asyncio.

I'm currently trying to dig into the codebase and figure out what could be causing this, and can write a PR if/when I figure out a fix.

Repro (trio)

Setup server

> pip install -r requirements.txt
> python main.py
[2024-04-04 14:56:30 +0200] [242832] [INFO] Running on http://127.0.0.1:8080 (CTRL + C to quit)

On a different terminal:

> curl http://127.0.0.1:8080

or even with http2

> curl http://127.0.0.1:8080 --http2

Wait for a few secs, and abort the connection (CTRL + C). Notice that on the server side, it is not able to detect the disconnection and keeps on looping:

[2024-04-04 14:56:30 +0200] [242832] [INFO] Running on http://127.0.0.1:8080 (CTRL + C to quit)
request initialized
Checking for disconnection...
Checking for disconnection...
Checking for disconnection...
Checking for disconnection...
[2024-04-04 15:18:01 +0200] [243610] [INFO] 127.0.0.1:54280 - - [04/Apr/2024:15:18:01 +0200] "GET / 1.1" - - "-" "curl/8.7.1" <=== printed on disconnect with later versions of hypercorn
Checking for disconnection... <=== but the app keeps looping and printing
Checking for disconnection...
Checking for disconnection...

Code

main.py

from fastapi import FastAPI, Request
from hypercorn.config import Config
from hypercorn.trio import serve
import trio

app = FastAPI()


@app.get("/")
async def hello(request: Request) -> None:
    print("request initialized")
    while True:
        await trio.sleep(1)
        print("Checking for disconnection...")
        if await request.is_disconnected():
            print("This is never printed(!)")
            break


if __name__ == "__main__":
    config = Config()
    config.accesslog = "-"
    config.bind = ["localhost:8080"]
    config.loglevel = "DEBUG"

    trio.run(serve, app, config)

requirements.txt

anyio==3.7.1  # also tested with 4.3.0
fastapi==0.100.0 # also tested with 0.110.1
hypercorn==0.14.4  # or master/, or 0.16.0
starlette==0.27.0 # also tested with 0.37.2
trio==0.22.2 # also tested with 0.25.0

working example with asyncio

Setup server

> pip install -r requirements.txt
> python main.py
[2024-03-27 13:26:30 -0700] [89671] [INFO] Running on http://127.0.0.1:8000 (CTRL + C to quit)

On a different terminal:

> curl http://127.0.0.1:8080

Wait for a few secs, and abort the connection (CTRL + C). On the server side it does detect the disconnect and kills the loop.

[2024-04-04 15:46:24 +0200] [244990] [INFO] Running on http://127.0.0.1:8080 (CTRL + C to quit)
request initialized
Checking for disconnection...
Checking for disconnection...
Checking for disconnection...
[2024-04-04 15:46:33 +0200] [244990] [INFO] 127.0.0.1:45102 - - [04/Apr/2024:15:46:33 +0200] "GET / 1.1" - - "-" "curl/8.7.1"
Checking for disconnection...
This is never printed on trio
[2024-04-04 15:46:33 +0200] [244990] [INFO] 127.0.0.1:45102 - - [04/Apr/2024:15:46:33 +0200] "GET / 1.1" 200 4 "-" "curl/8.7.1"
^C

Code

main.py (identical, except replacing trio with asyncio)

from fastapi import FastAPI, Request
from hypercorn.config import Config
from hypercorn.asyncio import serve
import asyncio

app = FastAPI()


@app.get("/")
async def hello(request: Request) -> None:
    print("request initialized")
    while True:
        await asyncio.sleep(1)
        print("Checking for disconnection...")
        if await request.is_disconnected():
            print("This is never printed on trio")
            break


if __name__ == "__main__":
    config = Config()
    config.accesslog = "-"
    config.bind = ["localhost:8080"]
    config.loglevel = "DEBUG"

    asyncio.run(serve(app, config))

requirements.txt

fastapi
hypercorn
starlette

system info

tested on linux, in clean virtualenvs and tox, on python 3.9, 3.11 and 3.12

jakkdl avatar Apr 04 '24 13:04 jakkdl

@davidbrochart Given that it seems to be a trio/asyncio problem I found #208 and subsequently tried out your fork https://github.com/davidbrochart/anycorn (the fix-tests branch). The repro fails there as well, both with trio and asyncio, so whatever magic is currently making it work on asyncio seems to have been lost.

jakkdl avatar Apr 04 '24 14:04 jakkdl

After a lot of digging I'm pretty sure this is actually a bug in starlette and will open an issue there instead.

jakkdl avatar Apr 10 '24 14:04 jakkdl

Starlette has not escalated the discussion about this problem to a proper issue, but I have made several comments in there about my research around the problem: https://github.com/encode/starlette/discussions/2094

Hypercorn could work around it by creating a wrapper object for receiving that sidesteps trio checkpointing at the beginning of MemoryReceiveChannel.receive. This wrapper object would be used in src/hypercorn/trio/task_group.TaskGroup.spawn_app

class ReceiveWrapper:
    def __init__(self, receive_channel: trio.MemoryReceiveChannel[ASGIReceiveEvent]) -> None:
        self._receive_channel = receive_channel

    # starlette requires that a receiver does not checkpoint before attempting
    # to return the first event, which `trio.MemoryReceiveChannel.receive` does.
    async def receive(self) -> ASGIReceiveEvent:
        try:
            return self._receive_channel.receive_nowait()
        except trio.WouldBlock:
            pass
        await trio.lowlevel.checkpoint()
        return await self._receive_channel.receive()


class TaskGroup:
    def __init__(self) -> None:
        self._nursery: Optional[trio._core._run.Nursery] = None
        self._nursery_manager: Optional[trio._core._run.NurseryManager] = None

    async def spawn_app(
        self,
        app: AppWrapper,
        config: Config,
        scope: Scope,
        send: Callable[[Optional[ASGISendEvent]], Awaitable[None]],
    ) -> Callable[[ASGIReceiveEvent], Awaitable[None]]:
        app_send_channel, app_receive_channel = trio.open_memory_channel[ASGIReceiveEvent](
            config.max_app_queue_size
        )

        self._nursery.start_soon(
            _handle,
            app,
            config,
            scope,
            ReceiveWrapper(app_receive_channel).receive,
            send,
            trio.to_thread.run_sync,
            trio.from_thread.run,
        )
        return app_send_channel.send

But the ASGI spec certainly doesn't require that receive callables avoid checkpointing before attempting to return a value, and I don't know whether any other ASGI frameworks have similar issues, so I could totally see the maintainers of this project not wanting to implement a framework-specific workaround. If it would be welcome, though, I can make a PR @pgjones

jakkdl avatar Apr 11 '24 14:04 jakkdl

@jakkdl I opened https://github.com/davidbrochart/anycorn/pull/8, if using anycorn could be an option for you.

davidbrochart avatar Apr 11 '24 14:04 davidbrochart

> @jakkdl I opened davidbrochart/anycorn#8, if using anycorn could be an option for you.

thanks! I'll let the-people-that-makes-those-decisions know :)

jakkdl avatar Apr 11 '24 15:04 jakkdl

I'm thinking I need to alter the documentation to discourage trying to detect disconnections and rather ensure that the right context managers/finally clauses are used to cleanup. Quart will cancel the request handling task on disconnect so the is_disconnected method seems odd to me.
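
To illustrate the cleanup-based approach, here is a minimal sketch using only the standard library. It assumes the server cancels the handler task on disconnect (as described for Quart above); the `handler`, `main`, and `events` names are hypothetical, and `task.cancel()` merely stands in for whatever the server does when the client goes away.

```python
import asyncio

# Records what the handler did, so the order of events can be inspected.
events: list[str] = []


async def handler() -> None:
    # Hypothetical request handler: acquire a resource, then work until done.
    events.append("acquired")
    try:
        while True:
            await asyncio.sleep(0.01)
            events.append("working")
    finally:
        # Runs when the task is cancelled, so no disconnect polling is needed.
        events.append("released")


async def main() -> None:
    task = asyncio.create_task(handler())
    await asyncio.sleep(0.05)
    # Stand-in for the server cancelling the handler on client disconnect.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass


asyncio.run(main())
print(events[0], events[-1])
```

The handler never asks whether the client is still connected; the `finally` clause releases the resource whenever the task is torn down.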

pgjones avatar May 28 '24 20:05 pgjones

> I'm thinking I need to alter the documentation to discourage trying to detect disconnections and rather ensure that the right context managers/finally clauses are used to cleanup. Quart will cancel the request handling task on disconnect so the is_disconnected method seems odd to me.

Yeah, after attempting to write several potential fixes for starlette, I mostly settled on the current interface being very bad and there being better ways of handling it. If the disconnect occurs while receiving data with .body()/.json()/.stream(), you get a ClientDisconnect that you can handle, and when sending the response you should use a StreamingResponse, which spawns a TaskGroup that handles disconnects.
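
As a rough sketch of the receiving side, the snippet below reads a request body from a bare ASGI `receive` callable and raises when an `http.disconnect` message arrives mid-body. The `http.request` and `http.disconnect` message types come from the ASGI spec; `ClientDisconnected`, `read_body`, `make_receive`, and `demo` are hypothetical names for illustration and are not Starlette's implementation.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

Message = Dict[str, Any]
Receive = Callable[[], Awaitable[Message]]


class ClientDisconnected(Exception):
    """Hypothetical stand-in for a framework's disconnect exception."""


async def read_body(receive: Receive) -> bytes:
    # Collect body chunks until the final one, or fail on disconnect.
    chunks: List[bytes] = []
    while True:
        message = await receive()
        if message["type"] == "http.disconnect":
            raise ClientDisconnected()
        if message["type"] == "http.request":
            chunks.append(message.get("body", b""))
            if not message.get("more_body", False):
                return b"".join(chunks)


def make_receive(messages: List[Message]) -> Receive:
    # Hypothetical helper: replay a fixed list of ASGI messages in order.
    pending = list(messages)

    async def receive() -> Message:
        return pending.pop(0)

    return receive


async def demo() -> str:
    receive = make_receive(
        [
            {"type": "http.request", "body": b"par", "more_body": True},
            {"type": "http.disconnect"},
        ]
    )
    try:
        await read_body(receive)
    except ClientDisconnected:
        return "client went away mid-upload"
    return "body complete"


result = asyncio.run(demo())
print(result)
```

Here the disconnect surfaces as an exception at the point where the handler is already waiting on the client, so there is nothing to poll.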

jakkdl avatar May 29 '24 11:05 jakkdl