WebSocketResponse.close code/message don't reach the client
Long story short
Following this example from the docs https://docs.aiohttp.org/en/stable/web_advanced.html?highlight=going_away#graceful-shutdown
The code and message passed to ws.close don't reach the client side and instead, a default code and message is used.
Expected behaviour
I would expect that the code and message should reach the client. If I'm wrong, the documentation could explain why this is not the case.
Actual behaviour
Default code and message are returned (code=1000, message=b'')
Steps to reproduce
The commented-out asserts fail.
import weakref
import aiohttp.http
import aiohttp.web
async def websocket_handler(request):
websocket = aiohttp.web.WebSocketResponse()
await websocket.prepare(request)
request.app["websockets"].add(websocket)
try:
async for message in websocket:
await websocket.send_json({"ok": True, "message": message.json()})
finally:
request.app["websockets"].discard(websocket)
return websocket
async def on_shutdown(app):
for websocket in set(app["websockets"]):
await websocket.close(
code=aiohttp.WSCloseCode.GOING_AWAY,
message="Server shutdown",
)
async def test_foo(aiohttp_client):
url = "/ws"
app = aiohttp.web.Application()
app["websockets"] = weakref.WeakSet()
app.router.add_get(url, websocket_handler)
app.on_shutdown.append(on_shutdown)
client = await aiohttp_client(app)
websocket = await client.ws_connect(url)
message = {"message": "hi"}
await websocket.send_json(message)
reply = await websocket.receive_json()
assert reply == {"ok": True, "message": message}
await app.shutdown()
assert websocket.closed is False
reply = await websocket.receive()
assert reply.type == aiohttp.http.WSMsgType.CLOSE
assert reply.data == aiohttp.WSCloseCode.OK
assert reply.extra == ""
# assert reply.data == aiohttp.WSCloseCode.GOING_AWAY
# assert reply.extra == "Server shutdown"
assert websocket.closed is True
Your environment
Python 3.6.7
aiohttp==3.4.4
pytest==4.0.1
pytest-aiohttp==0.3.0
GitMate.io thinks the contributor most likely able to help you is @asvetlov.
Possibly related issues are https://github.com/aio-libs/aiohttp/issues/3391 (WebSocketResponse requires closing/closed checks before sending something), https://github.com/aio-libs/aiohttp/issues/2309 (WebSocketResponse does not throw/close when connection is abruptly cut), https://github.com/aio-libs/aiohttp/issues/1043 (Automatically close unclosed responses returned by test client), https://github.com/aio-libs/aiohttp/issues/2595 (Server handler halts after client connection closed), and https://github.com/aio-libs/aiohttp/issues/694 (Async iterator never returns close message).
@JoseKilo Hi!
This tangled history, in the documentation bug.
You need to have a separate WeakSet for the on_shutdown task, or you can not remove it from app['websockets'] if you use it only for shutdown
I added app['shutdown_websockets'] and now my test works correctly.
import weakref
import aiohttp.http
import aiohttp.web
async def websocket_handler(request):
websocket = aiohttp.web.WebSocketResponse()
await websocket.prepare(request)
request.app["websockets"].add(websocket)
request.app["shutdown_websockets"].add(websocket)
try:
async for message in websocket:
await websocket.send_json({"ok": True, "message": message.json()})
finally:
request.app["websockets"].discard(websocket)
return websocket
async def on_shutdown(app):
while app['shutdown_websockets']:
websocket = app['shutdown_websockets'].pop()
await websocket.close(
code=aiohttp.WSCloseCode.GOING_AWAY,
message="Server shutdown",
)
async def test_foo(aiohttp_client):
url = "/ws"
app = aiohttp.web.Application()
app["websockets"] = weakref.WeakSet()
# need for send signal shutdown server
app["shutdown_websockets"] = weakref.WeakSet()
app.router.add_get(url, websocket_handler)
app.on_shutdown.append(on_shutdown)
client = await aiohttp_client(app)
websocket = await client.ws_connect(url)
message = {"message": "hi"}
await websocket.send_json(message)
reply = await websocket.receive_json()
assert reply == {"ok": True, "message": message}
await app.shutdown()
assert websocket.closed is False
reply = await websocket.receive()
assert reply.type == aiohttp.http.WSMsgType.CLOSE
assert reply.data == aiohttp.WSCloseCode.GOING_AWAY
assert reply.extra == "Server shutdown"
assert websocket.closed is True
@JoseKilo thought of your question.
@asvetlov @jettify @webknjaz it really turns out that you have to intervene in the logic of the iteration, to organize the proper cleaning of dead sockets and use on_shutdown.
You can use this approach.
I created async iterator RespIter to change the logic of the iteration of WebSocketResponse
This allows you to clear the app [" websockets "] before exiting the iteration loop. which will allow the use of app [" websockets "] by leaving the server in shutdown.
class RespIter(AsyncIterator):
__slots__ = ('_response', '_websockets')
def __init__(self, response, websockets):
self._response = response
self._websockets = websockets
async def __anext__(self):
try:
return await self._response.__anext__()
except StopAsyncIteration:
self._websockets.discard(self._response)
raise StopAsyncIteration
You also need to change the iteration loop and clear the app ['websockets'] only in case of an error.
async def websocket_handler(request):
websocket = aiohttp.web.WebSocketResponse()
await websocket.prepare(request)
request.app["websockets"].add(websocket)
try:
async for message in RespIter(websocket, request.app["websockets"]):
await websocket.send_json({"ok": True, "message": message.json()})
except Exception as e:
request.app["websockets"].discard(websocket)
raise e
Full code and tests:
import weakref
from collections.abc import AsyncIterator
import aiohttp.http
import aiohttp.web
class RespIter(AsyncIterator):
__slots__ = ('_response', '_websockets')
def __init__(self, response, websockets):
self._response = response
self._websockets = websockets
async def __anext__(self):
try:
return await self._response.__anext__()
except StopAsyncIteration:
self._websockets.discard(self._response)
raise StopAsyncIteration
async def websocket_handler(request):
websocket = aiohttp.web.WebSocketResponse()
await websocket.prepare(request)
request.app["websockets"].add(websocket)
try:
async for message in RespIter(websocket, request.app["websockets"]):
await websocket.send_json({"ok": True, "message": message.json()})
except Exception as e:
request.app["websockets"].discard(websocket)
raise e
async def on_shutdown(app):
while app['websockets']:
websocket = app['websockets'].pop()
await websocket.close(
code=aiohttp.WSCloseCode.GOING_AWAY,
message="Server shutdown",
)
async def test_foo(aiohttp_client):
url = "/ws"
app = aiohttp.web.Application()
app["websockets"] = weakref.WeakSet()
app.router.add_get(url, websocket_handler)
app.on_shutdown.append(on_shutdown)
client = await aiohttp_client(app)
websocket = await client.ws_connect(url)
assert len(app["websockets"]) == 1
await websocket.close()
assert len(app["websockets"]) == 0
websocket = await client.ws_connect(url)
assert len(app["websockets"]) == 1
await websocket.send_str('asdc')
reply = await websocket.receive()
assert reply.type == aiohttp.http.WSMsgType.CLOSED
assert len(app["websockets"]) == 0
websocket = await client.ws_connect(url)
assert len(app["websockets"]) == 1
message = {"message": "hi"}
await websocket.send_json(message)
reply = await websocket.receive_json()
assert reply == {"ok": True, "message": message}
await app.shutdown()
assert websocket.closed is False
reply = await websocket.receive()
assert len(app["websockets"]) == 0
assert reply.type == aiohttp.http.WSMsgType.CLOSE
assert reply.data == aiohttp.WSCloseCode.GOING_AWAY
assert reply.extra == "Server shutdown"
assert websocket.closed is True
Bump issue.
@vir-mir In you example, if websocket_handler return websocket var, test will fail. In docs handler always return WebSocketResponse. So, in my tests when i use wscat and handler without return (return None) i got
Unhandled runtime exception
Traceback (most recent call last):
File "aiohttp/web_protocol.py", line 580, in finish_response
prepare_meth = resp.prepare
AttributeError: 'NoneType' object has no attribute 'prepare'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "aiohttp/web_protocol.py", line 485, in start
resp, reset = await task
File "aiohttp/web_protocol.py", line 440, in _handle_request
reset = await self.finish_response(request, resp, start_time)
File "aiohttp/web_protocol.py", line 583, in finish_response
raise RuntimeError("Missing return " "statement on request handler")
RuntimeError: Missing return statement on request handler
Any ideas?
Bump issue.
@vir-mir In you example, if websocket_handler return websocket var, test will fail. In docs handler always return WebSocketResponse. So, in my tests when i use wscat and handler without return (return None) i got
Unhandled runtime exception Traceback (most recent call last): File "aiohttp/web_protocol.py", line 580, in finish_response prepare_meth = resp.prepare AttributeError: 'NoneType' object has no attribute 'prepare' During handling of the above exception, another exception occurred: Traceback (most recent call last): File "aiohttp/web_protocol.py", line 485, in start resp, reset = await task File "aiohttp/web_protocol.py", line 440, in _handle_request reset = await self.finish_response(request, resp, start_time) File "aiohttp/web_protocol.py", line 583, in finish_response raise RuntimeError("Missing return " "statement on request handler") RuntimeError: Missing return statement on request handlerAny ideas?
@Roman1us were you able to find solution for the above error? I am facing the same issue in my code.
This problem goes beyond custom code not reaching the client during application shutdown. I observe the same problem (aiohttp 3.8.1) when calling WebSocketResponse.close() during normal application operation, eg. when I want to break a WS connection if the client is no longer allowed to receive notifications.
The root cause seems to be WebSocketResponse.close() closing the reader (reader.feed_data(WS_CLOSING_MESSAGE, 0)) before actually sending the code itself. This causes the reader loop (async for message in websocket) to finish and the handler to exit. This causes the second call to WebSocketResponse.close() which finishes before the first call resumes. So the readers's closure "steals" the opportunity to send the final code and sends the default one.
The workaround I found isn't pretty because there is no public API to await the first close() until it finishes. Inside the handler, after the reader loop and before returning the response I added:
while not websocket.closed:
await asyncio.sleep(.1)
This ensures that the second call to close() by the reader after the WebSocketResponse is returned from the handler will omit the "stealing" part due to the if not self._closed condition. The first close() will win the race even if it resumes later than the second call.
Thanks @wodny - we've been scratching our heads on this exact issue, and the workaround you mentioned is functional, if not pretty as you said.
@wodny Thanks a lot for figuring this out, you're a savior
@bdraco Maybe one you'd be interested in? If you take the reproducer in the original post, this is based on our documentation examples, but doesn't work correctly.
There seems to be a race condition. First we call .close() in the shutdown handler, which internally will end up awaiting ._close_wait. Then when we return from the handler, that also calls .close() (with a different status code), but ._waiting is now False because we've exited the receive loop and it immediately continues with the close logic. Then the first .close() call gets to resume, but it's already closed and sent the message by then, so it returns immediately without ever sending that first close message.
I’ll add it to my queue
I'm not sure if it's necessarily the correct solution to this, but if we yield to the event loop before exiting the async for loop, then it works correctly:
async def __anext__(self) -> WSMessage:
msg = await self.receive()
if msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSING, WSMsgType.CLOSED):
await asyncio.sleep(0) # <---
raise StopAsyncIteration
return msg
Reordering breaking the receive with the feed_data to after sending the close message, but before the reader.read() calls to fix the race.
I made that change in https://github.com/aio-libs/aiohttp/pull/8680 and adapted the above test. It passes now