
[QUESTION] How to properly shut down websockets waiting forever

Open danielgtaylor opened this issue 5 years ago • 8 comments

Description

What is the proper way to shut down the server when you have a long-running websocket reading data from e.g. Redis or a DB waiting in a while True loop for new messages? For example, given this complete example:

import asyncio
from typing import AsyncGenerator

from fastapi import FastAPI
from starlette.responses import HTMLResponse
from starlette.websockets import WebSocket

app = FastAPI()

html = """
<!DOCTYPE html>
<html>
    <body>
        <script>
            var ws = new WebSocket("ws://localhost:8000/notifications");
            // Processing of messages goes here...
        </script>
    </body>
</html>
"""


@app.get("/")
async def get() -> HTMLResponse:
    return HTMLResponse(html)


async def subscribe(channel: str) -> AsyncGenerator[dict, None]:
    """Fake method which would normally go to Redis/DB and wait."""
    while True:
        await asyncio.sleep(600)
        yield {"hello": "world"}


@app.websocket("/notifications")
async def notifications_handler(websocket: WebSocket) -> None:
    await websocket.accept()

    async for msg in subscribe("*"):
        await websocket.send_json(msg)

If you run it, open http://localhost:8000/ in a browser (it'll load a blank page), and then press Ctrl+C, you get this:

$ uvicorn demo:app
INFO:     Started server process [48444]
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     127.0.0.1:54369 - "GET / HTTP/1.1" 200 OK
INFO:     ('127.0.0.1', 54371) - "WebSocket /notifications" [accepted]
^CINFO:     Shutting down
INFO:     Waiting for background tasks to complete. (CTRL+C to force quit)

It just hangs and never shuts down properly. What's the right way to handle this? The shutdown server event isn't fired until after all background tasks have completed so it's not useful for this either.

Additional context

So far I have come up with the following code, which creates an additional asyncio task that is not linked to uvicorn/starlette and so is ignored in the shutdown path, allowing a shutdown handler to be called that can close the DB server.

import asyncio
from typing import AsyncGenerator

from fastapi import FastAPI
from starlette.responses import HTMLResponse
from starlette.websockets import WebSocket

app = FastAPI()

html = """
<!DOCTYPE html>
<html>
    <body>
        <script>
            var ws = new WebSocket("ws://localhost:8000/notifications");
            // Processing of messages goes here...
        </script>
    </body>
</html>
"""


@app.get("/")
async def get() -> HTMLResponse:
    return HTMLResponse(html)


@app.on_event("shutdown")
async def shutdown() -> None:
    # Close the Redis or DB connection here
    # await redis.close()
    pass


async def subscribe(channel: str) -> AsyncGenerator[dict, None]:
    """Fake method which would normally go to Redis/DB and wait."""
    while True:
        # When using a real Redis or DB this await will return immediately
        # when the server shuts down.
        await asyncio.sleep(600)
        yield {"hello": "world"}


@app.websocket("/notifications")
async def notifications_handler(websocket: WebSocket) -> None:
    await websocket.accept()

    async def _handler() -> None:
        async for msg in subscribe("*"):
            print("Got message!", msg)
            await websocket.send_json(msg)

    # Create the handler as an unmanaged background task so the server won't
    # wait on it forever during shutdown.
    asyncio.create_task(_handler())

    # Use a long-blocking read to keep the socket alive in memory while the
    # above background task works. During shutdown this loop will exit, which
    # in turn will cause the above handler to exit as well.
    while True:
        try:
            print(await websocket.receive())
        except Exception:
            break

This is not ideal since:

  1. It creates two tasks for each incoming request
  2. It won't work for bidirectional websockets (it works for my use case of push notifications)

So what is the right way to ensure a quick and clean shutdown?
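
One way to picture the problem is as a race between the handler's work and a stop signal. The following is only a minimal sketch in plain asyncio, not FastAPI or uvicorn API: `run_until_stopped` and the `stop` event are hypothetical names, and it assumes something sets the event as soon as shutdown begins, which the stock shutdown event described above does not do early enough.

```python
import asyncio
from typing import Awaitable


async def run_until_stopped(work: Awaitable[None], stop: asyncio.Event) -> bool:
    """Run `work` until it finishes or `stop` is set, whichever comes first.

    Returns True if `work` finished on its own, False if it was cancelled.
    """
    work_task = asyncio.ensure_future(work)
    stop_task = asyncio.ensure_future(stop.wait())
    done, pending = await asyncio.wait(
        {work_task, stop_task}, return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()
    # Let the cancelled task(s) unwind, e.g. run their `finally` blocks.
    await asyncio.gather(*pending, return_exceptions=True)
    if work_task in done:
        work_task.result()  # re-raise any exception raised by `work`
        return True
    return False


async def demo() -> bool:
    stop = asyncio.Event()

    async def wait_forever() -> None:
        await asyncio.sleep(600)  # stands in for the Redis/DB wait

    # Simulate a shutdown signal arriving shortly after the handler starts.
    asyncio.get_running_loop().call_later(0.05, stop.set)
    return await run_until_stopped(wait_forever(), stop)
```

Because the whole handler body is wrapped, this shape would also cover bidirectional sockets; the open question in this issue is what sets `stop`.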

danielgtaylor avatar Nov 14 '19 06:11 danielgtaylor

I found the cleanest way is to subclass Starlette's WebSocketEndpoint.


euri10 avatar Nov 14 '19 06:11 euri10

Back on a proper PC: I think I'm doing much the same as what you're trying to do, but with RabbitMQ, in https://gitlab.com/euri10/celeryasgimon/tree/master/backend/app/main.py. There you'll find a snippet of a websocket waiting indefinitely to consume the broker's messages; it handles shutdown gracefully.

euri10 avatar Nov 14 '19 06:11 euri10

Just to add an example solution with WebSocketEndpoint (starlette) for people who find this via ~~google~~ duckduckgo

I'm relaying all messages on an asyncio.Queue to a connected client

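import asyncio

from starlette.endpoints import WebSocketEndpoint

# `state` and `iter_queue` are defined elsewhere in the app (not shown here).


class StatusSub(WebSocketEndpoint):
    async def on_connect(self, websocket, **kwargs):
        global state
        self.states = state.sub()
        await websocket.accept()
        await websocket.send_json(state.state)

        # Push queued events to the client from a separate task.
        self.task = asyncio.create_task(self.send_events(websocket))

    async def send_events(self, websocket):
        async for state in iter_queue(self.states):
            await websocket.send_json(state)

        await websocket.close()

    async def on_disconnect(self, websocket, close_code):
        # Stop the sender task once the client is gone.
        self.task.cancel()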

Personally I'd like to see an event emitted before shutdown, but that might need to be implemented in uvicorn; I'm not familiar enough with the internals to know.

EDIT: My first implementation was broken (overly eager refactor, lol).

UlisseMini avatar May 27 '20 22:05 UlisseMini

Is there any plan to add graceful websocket disconnects on shutdown into FastAPI directly?

As far as I can tell, there is currently no way for active websockets to get anything other than a 1006 status code when the app shuts down. I imagine this might be a uvicorn limitation, but it'd be great if there were a way to return 1001 when exiting gracefully.
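
For context on the two codes: in RFC 6455, 1001 means "going away", while 1006 is reserved for local reporting and is never carried in a close frame. A client reports 1006 when the connection drops without any close frame, so seeing 1001 requires the server to send a close frame before the connection goes away. A small sketch of that distinction, where the enum and helper names are made up for illustration:

```python
from enum import IntEnum


class CloseCode(IntEnum):
    NORMAL_CLOSURE = 1000
    GOING_AWAY = 1001          # e.g. server shutting down
    NO_STATUS_RECEIVED = 1005  # reserved: close frame had no code
    ABNORMAL_CLOSURE = 1006    # reserved: connection lost without a close frame
    TLS_HANDSHAKE = 1015       # reserved: TLS handshake failure


# Codes RFC 6455 reserves for reporting only; an endpoint must not
# put them in a close frame.
REPORTED_ONLY = frozenset(
    {
        CloseCode.NO_STATUS_RECEIVED,
        CloseCode.ABNORMAL_CLOSURE,
        CloseCode.TLS_HANDSHAKE,
    }
)


def is_reported_only(code: int) -> bool:
    """True if `code` can only be observed locally, never sent on the wire."""
    return code in REPORTED_ONLY
```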

cha-king avatar Dec 12 '21 01:12 cha-king

@cha-king I think you are right, this is impossible in FastAPI.

As a workaround, this is the code I use for graceful shutdown.

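from asyncio import CancelledError, Event, current_task, wait_for

from fastapi import APIRouter, WebSocket
from uvicorn import Server

router = APIRouter()
sessions = {}  # id(session) -> {"task": Task, "closed": Event}


class CustomServer(Server):
    async def shutdown(self, sockets=None):
        # Cancel every open websocket handler and give each one up to
        # 5 seconds to finish its cleanup before the server shuts down.
        for session in list(sessions.values()):
            session["task"].cancel()
            await wait_for(session["closed"].wait(), 5)

        await super().shutdown(sockets)


@router.websocket("/sock")
async def sock(sock: WebSocket):
    session = dict(task=current_task(), closed=Event())
    sessions[id(session)] = session

    try:
        ...
    except CancelledError:
        ...
    finally:
        # Tell CustomServer.shutdown() that this handler has finished.
        session["closed"].set()
        del sessions[id(session)]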

akvadrako avatar Dec 12 '21 10:12 akvadrako

Hi,

For FastAPI, this works for me when the client disconnects:

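import asyncio

from fastapi import APIRouter, WebSocket
from starlette.websockets import WebSocketState

router = APIRouter()


@router.websocket("/tst")
async def tst(websocket: WebSocket) -> None:
    async def send_data():
        # Keep pushing data for as long as the client is connected.
        while websocket.client_state == WebSocketState.CONNECTED:
            await websocket.send_text("hi")
            await asyncio.sleep(1)

    async def watch_status():
        # Reading from the socket is what notices the client's disconnect.
        while websocket.client_state == WebSocketState.CONNECTED:
            await websocket.receive()

    await websocket.accept()
    await asyncio.gather(watch_status(), send_data())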

matiuszka avatar Dec 15 '21 11:12 matiuszka

@matiuszka Thank you, but I believe with your solution clients will still see a 1006 status code when the server shuts down.

cha-king avatar Dec 15 '21 17:12 cha-king

Hi, I want to share this simple solution (it might be useful for someone):

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse

app = FastAPI()

html = """
<!DOCTYPE html>
<html>
    <body>
        <script>
            var ws = new WebSocket("ws://localhost:8000/ws");
            // Processing of messages goes here...
        </script>
    </body>
</html>
"""

active_connections_set = set()

@app.get("/")
async def get():
    return HTMLResponse(html)


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    active_connections_set.add(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Message text was: {data}")
    except WebSocketDisconnect:
        # The client disconnected; stop reading from this socket.
        pass
    finally:
        active_connections_set.discard(websocket)


@app.on_event("shutdown")
async def shutdown():
    # Iterate over a copy: handlers remove themselves from the set as they exit.
    for websocket in list(active_connections_set):
        await websocket.close(code=1001)

This piece of code works for both cases (disconnected clients and shutdown server).

hilaz87 avatar Jan 13 '23 17:01 hilaz87