WebSocketResponse.close code/message don't reach the client

Open JoseKilo opened this issue 7 years ago • 8 comments

Long story short

Following this example from the docs https://docs.aiohttp.org/en/stable/web_advanced.html?highlight=going_away#graceful-shutdown

The code and message passed to ws.close don't reach the client side; a default code and message are used instead.

Expected behaviour

I would expect that the code and message should reach the client. If I'm wrong, the documentation could explain why this is not the case.

Actual behaviour

Default code and message are returned (code=1000, message=b'')

Steps to reproduce

The commented-out asserts fail.

import weakref

import aiohttp.http
import aiohttp.web


async def websocket_handler(request):
    websocket = aiohttp.web.WebSocketResponse()
    await websocket.prepare(request)
    request.app["websockets"].add(websocket)

    try:
        async for message in websocket:
            await websocket.send_json({"ok": True, "message": message.json()})
    finally:
        request.app["websockets"].discard(websocket)

    return websocket


async def on_shutdown(app):
    for websocket in set(app["websockets"]):
        await websocket.close(
            code=aiohttp.WSCloseCode.GOING_AWAY,
            message="Server shutdown",
        )


async def test_foo(aiohttp_client):
    url = "/ws"
    app = aiohttp.web.Application()
    app["websockets"] = weakref.WeakSet()
    app.router.add_get(url, websocket_handler)
    app.on_shutdown.append(on_shutdown)

    client = await aiohttp_client(app)

    websocket = await client.ws_connect(url)

    message = {"message": "hi"}
    await websocket.send_json(message)
    reply = await websocket.receive_json()
    assert reply == {"ok": True, "message": message}

    await app.shutdown()

    assert websocket.closed is False

    reply = await websocket.receive()

    assert reply.type == aiohttp.http.WSMsgType.CLOSE
    assert reply.data == aiohttp.WSCloseCode.OK
    assert reply.extra == ""
    # assert reply.data == aiohttp.WSCloseCode.GOING_AWAY
    # assert reply.extra == "Server shutdown"
    assert websocket.closed is True

Your environment

Python 3.6.7

aiohttp==3.4.4
pytest==4.0.1
pytest-aiohttp==0.3.0

JoseKilo avatar Dec 12 '18 09:12 JoseKilo

GitMate.io thinks the contributor most likely able to help you is @asvetlov.

Possibly related issues are https://github.com/aio-libs/aiohttp/issues/3391 (WebSocketResponse requires closing/closed checks before sending something), https://github.com/aio-libs/aiohttp/issues/2309 (WebSocketResponse does not throw/close when connection is abruptly cut), https://github.com/aio-libs/aiohttp/issues/1043 (Automatically close unclosed responses returned by test client), https://github.com/aio-libs/aiohttp/issues/2595 (Server handler halts after client connection closed), and https://github.com/aio-libs/aiohttp/issues/694 (Async iterator never returns close message).

aio-libs-bot avatar Dec 12 '18 09:12 aio-libs-bot

@JoseKilo Hi!

This is a tangled story, and it looks like a documentation bug.

You need a separate WeakSet for the on_shutdown task. Alternatively, if you use app['websockets'] only for shutdown, you can simply not remove the socket from it in the handler.

I added app['shutdown_websockets'] and now the test works correctly.

import weakref

import aiohttp.http
import aiohttp.web


async def websocket_handler(request):
    websocket = aiohttp.web.WebSocketResponse()
    await websocket.prepare(request)
    request.app["websockets"].add(websocket)

    request.app["shutdown_websockets"].add(websocket)

    try:
        async for message in websocket:
            await websocket.send_json({"ok": True, "message": message.json()})
    finally:
        request.app["websockets"].discard(websocket)

    return websocket


async def on_shutdown(app):
    while app['shutdown_websockets']:
        websocket = app['shutdown_websockets'].pop()
        await websocket.close(
            code=aiohttp.WSCloseCode.GOING_AWAY,
            message="Server shutdown",
        )


async def test_foo(aiohttp_client):
    url = "/ws"
    app = aiohttp.web.Application()
    app["websockets"] = weakref.WeakSet()

    # needed so that on_shutdown can still signal server shutdown to the sockets
    app["shutdown_websockets"] = weakref.WeakSet()

    app.router.add_get(url, websocket_handler)
    app.on_shutdown.append(on_shutdown)

    client = await aiohttp_client(app)

    websocket = await client.ws_connect(url)

    message = {"message": "hi"}
    await websocket.send_json(message)
    reply = await websocket.receive_json()
    assert reply == {"ok": True, "message": message}

    await app.shutdown()

    assert websocket.closed is False

    reply = await websocket.receive()

    assert reply.type == aiohttp.http.WSMsgType.CLOSE
    assert reply.data == aiohttp.WSCloseCode.GOING_AWAY
    assert reply.extra == "Server shutdown"

    assert websocket.closed is True

vir-mir avatar Jan 06 '19 05:01 vir-mir

@JoseKilo I thought about your question.

@asvetlov @jettify @webknjaz it turns out that you really have to intervene in the iteration logic to clean up dead sockets properly and still use on_shutdown.

You can use this approach. I created an async iterator, RespIter, to change how WebSocketResponse is iterated.

This lets you clear app["websockets"] just before the iteration loop exits, so app["websockets"] can still be used while the server is shutting down.

class RespIter(AsyncIterator):
    __slots__ = ('_response', '_websockets')

    def __init__(self, response, websockets):
        self._response = response
        self._websockets = websockets

    async def __anext__(self):
        try:
            return await self._response.__anext__()
        except StopAsyncIteration:
            # Iteration is over: forget the socket, then re-raise unchanged.
            self._websockets.discard(self._response)
            raise

You also need to change the iteration loop so that the handler clears app['websockets'] only when an error occurs.

async def websocket_handler(request):
    websocket = aiohttp.web.WebSocketResponse()
    await websocket.prepare(request)

    request.app["websockets"].add(websocket)

    try:
        async for message in RespIter(websocket, request.app["websockets"]):
            await websocket.send_json({"ok": True, "message": message.json()})
    except Exception:
        request.app["websockets"].discard(websocket)
        raise

Full code and tests:

import weakref
from collections.abc import AsyncIterator

import aiohttp.http
import aiohttp.web


class RespIter(AsyncIterator):
    __slots__ = ('_response', '_websockets')

    def __init__(self, response, websockets):
        self._response = response
        self._websockets = websockets

    async def __anext__(self):
        try:
            return await self._response.__anext__()
        except StopAsyncIteration:
            # Iteration is over: forget the socket, then re-raise unchanged.
            self._websockets.discard(self._response)
            raise


async def websocket_handler(request):
    websocket = aiohttp.web.WebSocketResponse()
    await websocket.prepare(request)

    request.app["websockets"].add(websocket)

    try:
        async for message in RespIter(websocket, request.app["websockets"]):
            await websocket.send_json({"ok": True, "message": message.json()})
    except Exception:
        request.app["websockets"].discard(websocket)
        raise


async def on_shutdown(app):
    while app['websockets']:
        websocket = app['websockets'].pop()
        await websocket.close(
            code=aiohttp.WSCloseCode.GOING_AWAY,
            message="Server shutdown",
        )


async def test_foo(aiohttp_client):
    url = "/ws"
    app = aiohttp.web.Application()
    app["websockets"] = weakref.WeakSet()

    app.router.add_get(url, websocket_handler)
    app.on_shutdown.append(on_shutdown)

    client = await aiohttp_client(app)

    websocket = await client.ws_connect(url)
    assert len(app["websockets"]) == 1
    await websocket.close()
    assert len(app["websockets"]) == 0

    websocket = await client.ws_connect(url)
    assert len(app["websockets"]) == 1
    await websocket.send_str('asdc')
    reply = await websocket.receive()
    assert reply.type == aiohttp.http.WSMsgType.CLOSED
    assert len(app["websockets"]) == 0

    websocket = await client.ws_connect(url)
    assert len(app["websockets"]) == 1
    message = {"message": "hi"}
    await websocket.send_json(message)
    reply = await websocket.receive_json()
    assert reply == {"ok": True, "message": message}

    await app.shutdown()

    assert websocket.closed is False

    reply = await websocket.receive()

    assert len(app["websockets"]) == 0

    assert reply.type == aiohttp.http.WSMsgType.CLOSE
    assert reply.data == aiohttp.WSCloseCode.GOING_AWAY
    assert reply.extra == "Server shutdown"

    assert websocket.closed is True

vir-mir avatar Jan 07 '19 10:01 vir-mir

Bump issue.

@vir-mir In your example, the test fails if websocket_handler returns the websocket variable. In the docs, the handler always returns the WebSocketResponse. In my tests with wscat and a handler that has no return statement (so it returns None), I got:

Unhandled runtime exception
Traceback (most recent call last):
  File "aiohttp/web_protocol.py", line 580, in finish_response
    prepare_meth = resp.prepare
AttributeError: 'NoneType' object has no attribute 'prepare'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "aiohttp/web_protocol.py", line 485, in start
    resp, reset = await task
  File "aiohttp/web_protocol.py", line 440, in _handle_request
    reset = await self.finish_response(request, resp, start_time)
  File "aiohttp/web_protocol.py", line 583, in finish_response
    raise RuntimeError("Missing return " "statement on request handler")
RuntimeError: Missing return statement on request handler

Any ideas?

Roman1us avatar Dec 30 '20 13:12 Roman1us

@Roman1us were you able to find solution for the above error? I am facing the same issue in my code.

PARTHAVPATEL28 avatar Apr 09 '21 02:04 PARTHAVPATEL28

This problem goes beyond the custom code not reaching the client during application shutdown. I observe the same problem (aiohttp 3.8.1) when calling WebSocketResponse.close() during normal application operation, e.g. when I want to break a WS connection because the client is no longer allowed to receive notifications.

The root cause seems to be that WebSocketResponse.close() closes the reader (reader.feed_data(WS_CLOSING_MESSAGE, 0)) before actually sending the code itself. This causes the reader loop (async for message in websocket) to finish and the handler to exit, which triggers a second call to WebSocketResponse.close() that finishes before the first call resumes. So the reader's closure "steals" the opportunity to send the final code, and the default one is sent instead.

The workaround I found isn't pretty because there is no public API to await the first close() until it finishes. Inside the handler, after the reader loop and before returning the response I added:

    while not websocket.closed:
        await asyncio.sleep(.1)

This ensures that the second call to close() by the reader after the WebSocketResponse is returned from the handler will omit the "stealing" part due to the if not self._closed condition. The first close() will win the race even if it resumes later than the second call.
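
As a sketch only, the polling loop above could be wrapped in a small helper with an upper bound on the wait, so that a handler cannot hang forever if the first close() never completes. The helper name wait_until_closed and its poll and timeout parameters are assumptions made for illustration; the only thing it relies on from aiohttp is the closed attribute of WebSocketResponse.

```python
import asyncio
import time


async def wait_until_closed(websocket, poll=0.1, timeout=10.0):
    """Poll ``websocket.closed`` until it is True or ``timeout`` seconds pass.

    Hypothetical helper: it only reads the ``closed`` attribute, so any
    object exposing that attribute works. Returns the final closed state.
    """
    deadline = time.monotonic() + timeout
    while not websocket.closed and time.monotonic() < deadline:
        # Sleeping yields to the event loop, so a close() call that is
        # already pending elsewhere gets a chance to resume and finish.
        await asyncio.sleep(poll)
    return websocket.closed
```

In a handler this would be awaited after the async for loop and before return websocket.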

wodny avatar Dec 30 '21 11:12 wodny

Thanks @wodny - we've been scratching our heads on this exact issue, and the workaround you mentioned is functional, if not pretty as you said.

AirbornePorcine avatar Jan 11 '22 17:01 AirbornePorcine

@wodny Thanks a lot for figuring this out, you're a savior

eLvErDe avatar Nov 08 '23 14:11 eLvErDe

@bdraco Maybe one you'd be interested in? The reproducer in the original post is based on our documentation examples, but it doesn't work correctly.

There seems to be a race condition. First we call .close() in the shutdown handler, which internally will end up awaiting ._close_wait. Then when we return from the handler, that also calls .close() (with a different status code), but ._waiting is now False because we've exited the receive loop and it immediately continues with the close logic. Then the first .close() call gets to resume, but it's already closed and sent the message by then, so it returns immediately without ever sending that first close message.

Dreamsorcerer avatar Aug 11 '24 00:08 Dreamsorcerer

I’ll add it to my queue

bdraco avatar Aug 11 '24 01:08 bdraco

I'm not sure if it's necessarily the correct solution to this, but if we yield to the event loop before exiting the async for loop, then it works correctly:

    async def __anext__(self) -> WSMessage:
        msg = await self.receive()
        if msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSING, WSMsgType.CLOSED):
            await asyncio.sleep(0)  # <---
            raise StopAsyncIteration
        return msg
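
Why a single asyncio.sleep(0) can change the outcome may be easier to see in isolation. The sketch below uses only asyncio; first_closer and handler are hypothetical stand-ins for the pending close() call and the request handler, not aiohttp code. Resolving a future only schedules its waiter, so without the extra yield the code that resolved it keeps running and finishes first.

```python
import asyncio


async def run(yield_before_exit):
    """Return the order in which the two hypothetical parties proceed."""
    events = []
    woken = asyncio.get_running_loop().create_future()

    async def first_closer():
        # Stand-in for the first close() call waiting for the reader to stop.
        await woken
        events.append("first close resumes")

    async def handler():
        # Resolving the future does not run first_closer immediately;
        # it only schedules it for a later event loop iteration.
        woken.set_result(None)
        if yield_before_exit:
            await asyncio.sleep(0)  # give the scheduled waiter its turn
        events.append("handler exits")

    task = asyncio.create_task(first_closer())
    await asyncio.sleep(0)  # let first_closer start and block on the future
    await handler()
    await task
    return events
```

Calling asyncio.run(run(False)) and asyncio.run(run(True)) and comparing the two lists shows the order flipping.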

Dreamsorcerer avatar Aug 11 '24 11:08 Dreamsorcerer

Reordering so that breaking the receive with feed_data happens after sending the close message, but before the reader.read() calls, fixes the race.

I made that change in https://github.com/aio-libs/aiohttp/pull/8680 and adapted the above test. It passes now
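
A toy model, using only asyncio, may help to see why the ordering matters. ToySocket, handler and simulate are hypothetical names and this is not aiohttp's implementation: it only mimics the two orderings discussed in this thread (wake the reader first, or record the close code first) and reports which close code the peer would have seen.

```python
import asyncio


class ToySocket:
    """Toy stand-in for a server-side WebSocket (not aiohttp code)."""

    def __init__(self, send_first):
        self.send_first = send_first
        self.closed = False
        self.sent_close_code = None  # the code the peer would receive
        self._inbox = asyncio.Queue()
        self._waiting = None  # future that exists while receive() is pending

    async def receive(self):
        self._waiting = asyncio.get_running_loop().create_future()
        try:
            return await self._inbox.get()
        finally:
            waiter, self._waiting = self._waiting, None
            waiter.set_result(True)  # only schedules whoever awaits it

    def _send_close(self, code):
        # Only the first caller gets to pick the close code.
        if not self.closed:
            self.closed = True
            self.sent_close_code = code

    async def close(self, code=1000):
        if self.send_first:
            self._send_close(code)  # reordered: send before waking the reader
        if self._waiting is not None:
            self._inbox.put_nowait("CLOSING")  # break the pending receive()
            await self._waiting  # the handler runs while this call is suspended
        self._send_close(code)  # no-op if a close code was already sent


async def handler(sock):
    while await sock.receive() != "CLOSING":
        pass
    await sock.close()  # default code, as happens once a handler returns


async def simulate(send_first):
    sock = ToySocket(send_first)
    task = asyncio.create_task(handler(sock))
    await asyncio.sleep(0)  # let the handler block in receive()
    await sock.close(code=1001)  # the shutdown-style close with a custom code
    await task
    return sock.sent_close_code
```

Running asyncio.run(simulate(send_first=False)) and asyncio.run(simulate(send_first=True)) contrasts the two orderings in this simplified setting.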

bdraco avatar Aug 11 '24 12:08 bdraco