python-socketio
Connection silently drops after large message
Description
It seems like when you send a big message to the websocket server, the message is not received and subsequent messages are silently dropped.
Code To Reproduce
Client:
Note that we send messages of different sizes to the server (message_large, message_small). Only the first message_small is received by the server properly; the subsequent message_large and message_small are dropped. It seems that messages with a character length exceeding 1000000 are dropped, while slightly smaller messages pass through without any delay.
import socketio
from typing import Dict, List
from time import sleep


class ClientFactory:

    @staticmethod
    def get_buffered_client(client_id: str, host: str, port: int, disconnect_buffer_size: int, rooms: List[str]):
        sio_client = socketio.Client()
        bc = BufferedClient(client_id=client_id,
                            host=host,
                            port=port,
                            disconnect_buffer_size=disconnect_buffer_size,
                            sio_client=sio_client,
                            rooms=rooms)
        bc.connect()
        return bc


class BufferedClient:

    def __init__(self, client_id: str, host: str, port: int, disconnect_buffer_size: int, sio_client: socketio.Client, rooms: List[str]):
        self._client_id = client_id
        self._host = host
        self._port = port
        self._disconnect_buffer_size = disconnect_buffer_size
        self._sio_client = sio_client
        self.rooms = rooms

    @staticmethod
    def _register_callback_funs(sio_client: socketio.Client):
        # on message event
        sio_client.on("mlgym_event", BufferedClient.on_mlgym_event_message)

    def connect(self):
        self._sio_client.connect(f"{self._host}:{self._port}", wait=True, wait_timeout=20)
        BufferedClient._register_callback_funs(self._sio_client)
        self.emit("join", {"client_id": self._client_id, "rooms": [*self.rooms, self._client_id]})

    def leave(self):
        self.emit("leave", None)

    @staticmethod
    def on_mlgym_event_message(data: Dict):
        print(data)

    def emit(self, message_key: str, message: Dict):
        self._sio_client.emit(message_key, message)


if __name__ == '__main__':
    client = ClientFactory.get_buffered_client(client_id="X",
                                               host="http://localhost",
                                               port=5000,
                                               disconnect_buffer_size=0,
                                               rooms=[])
    sleep(10)

    # Large message: the "model" field alone is 1,000,000 characters long.
    message_large = {"event_type": "checkpoint", "creation_ts": "ts"}
    payload_large = {
        "grid_search_id": "gs_id",
        "experiment_id": "e_id",
        "checkpoint_id": 1,
        "model": "m"*1000000,
        "optimizer": "o",
        "stateful_components": "s"
    }
    message_large["payload"] = payload_large

    # Small message: same structure with a one-character "model" field.
    message_small = {"event_type": "checkpoint", "creation_ts": "ts"}
    payload_small = {
        "grid_search_id": "gs_id",
        "experiment_id": "e_id",
        "checkpoint_id": 1,
        "model": "m",
        "optimizer": "o",
        "stateful_components": "s"
    }
    message_small["payload"] = payload_small

    client.emit("mlgym_event", message_small)
    print("sent 0")
    client.emit("mlgym_event", message_large)
    print("sent 1")
    client.emit("mlgym_event", message_small)
    print("sent 2")
Server:
from flask import Flask, request
from flask_socketio import SocketIO, emit, rooms, disconnect
from typing import List
from engineio.payload import Payload

Payload.max_decode_packets = 10000


class EventSubscriberIF:

    def callback(self):
        raise NotImplementedError


class WebSocketServer:

    def __init__(self, port: int, async_mode: str, app: Flask):
        self._port = port
        self._socketio = SocketIO(app, async_mode=async_mode, cors_allowed_origins=["http://localhost:5000"])
        self._client_sids = []
        self._init_call_backs()

    def emit_server_log_message(self, data):
        emit("server_log_message", data)

    @property
    def client_sids(self) -> List[str]:
        return self._client_sids

    def _init_call_backs(self):

        @self._socketio.on("join")
        def on_join(data):
            client_sid = request.sid
            self._client_sids.append(client_sid)
            if 'client_id' in data:
                client_id = data['client_id']
            else:
                client_id = "<unknown>"
            self.emit_server_log_message(f"Client {client_id} joined rooms: {rooms()}")

        @self._socketio.on("leave")
        def on_leave():
            self._client_sids.remove(request.sid)
            # TODO leave all rooms
            # leave_room(message['room'])
            self.emit_server_log_message("You are now disconnected.")
            disconnect()

        @self._socketio.on("mlgym_event")
        def on_mlgym_event(data):
            grid_search_id = data["payload"]["grid_search_id"]
            if data["event_type"] in set(["experiment_status", "job_status", "experiment_config", "evaluation_result"]):
                print("mlgym_event: " + str(data))
                # NOTE: self._room_id_to_event_storage is not defined in this snippet;
                # the reproduction only sends "checkpoint" events, so this branch is never reached.
                event_id = self._room_id_to_event_storage["mlgym_event_subscribers"].add_event(grid_search_id, data)
                emit('mlgym_event', {'event_id': event_id, 'data': data}, to="mlgym_event_subscribers")
            elif data["event_type"] == "checkpoint":
                print("received checkpoint")
            else:
                print(f"Unsupported event_type {data['event_type']}")

    def run(self, app: Flask):
        self._socketio.run(app)


if __name__ == '__main__':
    app = Flask(__name__, template_folder="template")
    app.config['SECRET_KEY'] = 'secret!'
    # thread = socketio.start_background_task(background_thread, )
    port = 5000
    async_mode = "eventlet"
    ws = WebSocketServer(port=port, async_mode=async_mode, app=app)
    ws.run(app)
The max_http_buffer_size parameter controls the largest size a message can have. This is a security feature to prevent your server from being attacked and rendered unusable. The default is (you guessed) 1,000,000 bytes.
I now see that the documentation for this option is actually out of date. It used to be that this parameter would only be used during polling, but at some point it was also extended to WebSocket as well. I will update the docs to reflect this.
Also, you said packets are dropped silently, but if you look at your logs there should be error messages there each time a packet is dropped.
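To see why message_large trips the limit while message_small does not, here is a rough sketch that estimates the encoded size of each message with the standard library only. The helper names (estimated_payload_bytes, exceeds_limit) are hypothetical, and the estimate ignores the extra framing that Socket.IO adds around the JSON body, so treat the numbers as a lower bound rather than the exact size on the wire. The 1,000,000 default is the value quoted above and may differ in your installed version.

```python
import json

# Default limit as quoted in this thread (assumption: check your installed version).
DEFAULT_MAX_HTTP_BUFFER_SIZE = 1_000_000


def estimated_payload_bytes(message: dict) -> int:
    """Return the UTF-8 size of the JSON-encoded message, excluding Socket.IO framing."""
    return len(json.dumps(message, separators=(",", ":")).encode("utf-8"))


def exceeds_limit(message: dict, limit: int = DEFAULT_MAX_HTTP_BUFFER_SIZE) -> bool:
    """Return True if the estimated size is larger than the given limit."""
    return estimated_payload_bytes(message) > limit


# Same message shapes as in the reproduction code.
base_payload = {
    "grid_search_id": "gs_id",
    "experiment_id": "e_id",
    "checkpoint_id": 1,
    "model": "m",
    "optimizer": "o",
    "stateful_components": "s",
}
message_small = {"event_type": "checkpoint", "creation_ts": "ts", "payload": base_payload}
message_large = {
    "event_type": "checkpoint",
    "creation_ts": "ts",
    "payload": {**base_payload, "model": "m" * 1_000_000},
}

for name, msg in (("message_small", message_small), ("message_large", message_large)):
    print(name, estimated_payload_bytes(msg), "over limit:", exceeds_limit(msg))
```

Because the "model" field alone is 1,000,000 characters, the surrounding keys push message_large just past the limit, which matches the observation that slightly smaller messages pass through.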
ah perfect, thanks for the hint.
If I run the code as shown above for the client and server, there is no exception raised or error message printed. Do you receive one of those?
I did not run your code yet. But I'll review this when I update the documentation.
It seems like when you send a big message to the websocket server, the message is not received and subsequent messages are silently dropped.
I think I ran into the same issue after updating flask_socketio. It took a lot of debugging to figure out that it wasn't flask_socketio's fault, but the issue was caused by its dependencies (python-socketio/python-engineio). In my case, since I was only sending big messages during my tests, all I saw was the server executing the .emit() successfully (no error raised) but the client never receiving anything.
It used to be that this parameter would only be used during polling, but at some point it was also extended to WebSocket as well.
I tried with different versions and the problem seems to have been introduced in python-engineio 4.7.1:
flask_socketio==5.3.6
python-socketio==5.9.0
-python-engineio==4.7.0
+python-engineio==4.7.1
(You can find a more comprehensive table of versions in https://github.com/overthesun/simoc/pull/443#issuecomment-1793856752.)
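When comparing versions like this, it can help to print exactly what is installed in the environment that shows the problem. The following is a small sketch using only the standard library; the function name installed_versions is hypothetical, and the package list simply mirrors the three packages named above.

```python
from importlib.metadata import PackageNotFoundError, version

# Distribution names of the packages discussed in this thread.
PACKAGES = ("flask-socketio", "python-socketio", "python-engineio")


def installed_versions(packages=PACKAGES) -> dict:
    """Map each package name to its installed version, or None if it is not installed."""
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


for name, ver in installed_versions().items():
    print(f"{name}=={ver}" if ver else f"{name}: not installed")
```

Pasting this output into a bug report makes it easier to tell which python-engineio release is actually in use.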
If I run the code as shown above for the client and server, there is no exception raised or error message printed.
Unless I'm missing something, I haven't seen any error message in the logs or anywhere else either, even after enabling engineio_logger. I would consider this a bug that needs to be fixed. If you prefer to handle it separately from the documentation issue, I can open a separate issue.
In addition, passing a different (bigger) value to max_http_buffer_size doesn't seem to fix the problem. I can open another issue for this too.
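For reference, this is roughly how the two options mentioned here (a larger max_http_buffer_size and engineio_logger) can be collected and passed to the server. It is only a sketch: debug_server_options is a hypothetical helper, the 10,000,000 value is an arbitrary example, and, as noted above, raising the limit did not resolve the problem in my setup.

```python
def debug_server_options(max_message_bytes: int) -> dict:
    """Build keyword arguments for the Socket.IO server (hypothetical helper).

    max_http_buffer_size raises the message size limit; logger and
    engineio_logger turn on Socket.IO and Engine.IO log output.
    """
    return {
        "max_http_buffer_size": max_message_bytes,
        "logger": True,
        "engineio_logger": True,
    }


options = debug_server_options(10_000_000)  # arbitrary example value
print(options)

# Usage with the server from this thread (requires Flask-SocketIO):
#   self._socketio = SocketIO(app, async_mode=async_mode, **options)
```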
@miguelgrinberg: let me know if you want me to run more tests and/or open more issues for the other problems mentioned here.
@ezio-melotti This issue is a documentation issue, and it was actually fixed long ago. This cannot be related to the problem you are reporting since it was filed long ago when python-engineio was in the 4.3.x versions, long before 4.7.0 and 4.7.1. Please write a new issue and include your logs.