python-socketio

Connection silently drops after large message

Open le1nux opened this issue 2 years ago • 3 comments

Description

It seems like when you send a big message to the websocket server, the message is not received and subsequent messages are silently dropped.

Code To Reproduce

Client:

Note that we send messages of different sizes to the server (message_large, message_small). Only the first message_small is received properly by the server; the subsequent message_large and message_small are both dropped. It seems that messages longer than 1000000 characters are dropped, while slightly smaller messages pass through without any delay.

import socketio
from typing import Dict, List
from time import sleep


class ClientFactory:

    @staticmethod
    def get_buffered_client(client_id: str, host: str, port: int, disconnect_buffer_size: int, rooms: List[str]):
        sio_client = socketio.Client()
        bc = BufferedClient(client_id=client_id,
                            host=host,
                            port=port,
                            disconnect_buffer_size=disconnect_buffer_size,
                            sio_client=sio_client,
                            rooms=rooms)
        bc.connect()
        return bc


class BufferedClient:

    def __init__(self, client_id: str, host: str, port: int, disconnect_buffer_size: int, sio_client: socketio.Client, rooms: List[str]):
        self._client_id = client_id
        self._host = host
        self._port = port
        self._disconnect_buffer_size = disconnect_buffer_size
        self._sio_client = sio_client
        self.rooms = rooms

    @staticmethod
    def _register_callback_funs(sio_client: socketio.Client):
        # on message event
        sio_client.on("mlgym_event", BufferedClient.on_mlgym_event_message)

    def connect(self):
        self._sio_client.connect(f"{self._host}:{self._port}", wait=True, wait_timeout=20)
        BufferedClient._register_callback_funs(self._sio_client)
        self.emit("join", {"client_id": self._client_id, "rooms": [*self.rooms, self._client_id]})

    def leave(self):
        self.emit("leave", None)

    @staticmethod
    def on_mlgym_event_message(data: Dict):
        print(data)

    def emit(self, message_key: str,  message: Dict):
        self._sio_client.emit(message_key, message)

if __name__ == '__main__':
    client = ClientFactory.get_buffered_client(client_id="X",
                                               host="http://localhost",
                                               port=5000,
                                               disconnect_buffer_size=0,
                                               rooms=[])
    sleep(10)
    message_large = {"event_type": "checkpoint", "creation_ts": "ts"}
    payload_large = {
        "grid_search_id": "gs_id",
        "experiment_id": "e_id",
        "checkpoint_id": 1,
        "model": "m"*1000000,
        "optimizer": "o",
        "stateful_components": "s"
    }
    message_large["payload"] = payload_large

    message_small = {"event_type": "checkpoint", "creation_ts": "ts"}
    payload_small = {
        "grid_search_id": "gs_id",
        "experiment_id": "e_id",
        "checkpoint_id": 1,
        "model": "m",
        "optimizer": "o",
        "stateful_components": "s"
    }
    message_small["payload"] = payload_small

    client.emit("mlgym_event", message_small)
    print("sent 0")

    client.emit("mlgym_event", message_large)
    print("sent 1")

    client.emit("mlgym_event", message_small)
    print("sent 2")

Server:

from flask import Flask, request
from flask_socketio import SocketIO, emit, rooms, disconnect
from typing import List
from engineio.payload import Payload

Payload.max_decode_packets = 10000


class EventSubscriberIF:

    def callback(self):
        raise NotImplementedError


class WebSocketServer:

    def __init__(self, port: int, async_mode: str, app: Flask):
        self._port = port
        self._socketio = SocketIO(app, async_mode=async_mode, cors_allowed_origins=["http://localhost:5000"])
        self._client_sids = []
        self._init_call_backs()

    def emit_server_log_message(self, data):
        emit("server_log_message", data)

    @property
    def client_sids(self) -> List[str]:
        return self._client_sids

    def _init_call_backs(self):

        @self._socketio.on("join")
        def on_join(data):
            client_sid = request.sid
            self._client_sids.append(client_sid)
            if 'client_id' in data:
                client_id = data['client_id']
            else:
                client_id = "<unknown>"
            self.emit_server_log_message(f"Client {client_id} joined rooms: {rooms()}")

        @self._socketio.on("leave")
        def on_leave():
            self._client_sids.remove(request.sid)
            # TODO  leave all rooms
            # leave_room(message['room'])
            self.emit_server_log_message("You are now disconnected.")
            disconnect()

        @self._socketio.on("mlgym_event")
        def on_mlgym_event(data):
            grid_search_id = data["payload"]["grid_search_id"]
            if data["event_type"] in set(["experiment_status", "job_status", "experiment_config", "evaluation_result"]):
                print("mlgym_event: " + str(data))
                event_id = self._room_id_to_event_storage["mlgym_event_subscribers"].add_event(grid_search_id, data)
                emit('mlgym_event', {'event_id': event_id, 'data': data}, to="mlgym_event_subscribers")
            elif data["event_type"] == "checkpoint":
                print("received checkpoint")
            else:
                print(f"Unsupported event_type {data['event_type']}")

    def run(self, app: Flask):
        self._socketio.run(app)


if __name__ == '__main__':
    app = Flask(__name__, template_folder="template")
    app.config['SECRET_KEY'] = 'secret!'

    # thread = socketio.start_background_task(background_thread, )
    port = 5000
    async_mode = "eventlet"

    ws = WebSocketServer(port=port, async_mode=async_mode, app=app)

    ws.run(app)

le1nux • Aug 23 '22 09:08

The max_http_buffer_size parameter controls the largest size a message can have. This is a security feature to prevent your server from being attacked and rendered unusable. The default is (you guessed) 1,000,000 bytes.
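To check whether a given payload is likely to hit that limit before sending it, a rough size estimate can help. The following is only a sketch using the standard library: `approx_packet_size` and `exceeds_limit` are hypothetical helper names, and the framing (a two-character type prefix followed by the JSON array `[event, data]`) is an assumption about how the event is encoded, so treat the result as an estimate rather than the exact number the server compares against.

```python
import json

# Default limit quoted in this thread, in bytes. The effective value is
# whatever the server was configured with via max_http_buffer_size.
DEFAULT_MAX_HTTP_BUFFER_SIZE = 1_000_000


def approx_packet_size(event: str, data: dict) -> int:
    """Roughly estimate the encoded size of one event packet in bytes.

    Assumption: the packet is a short type prefix ("42") followed by the
    compact JSON array [event, data]. The real framing is up to the library.
    """
    body = json.dumps([event, data], separators=(",", ":"))
    return len("42") + len(body.encode("utf-8"))


def exceeds_limit(event: str, data: dict,
                  limit: int = DEFAULT_MAX_HTTP_BUFFER_SIZE) -> bool:
    """Return True if the estimated packet size is above the limit."""
    return approx_packet_size(event, data) > limit


# Payloads shaped like the ones in the reproduction above.
small = {"event_type": "checkpoint", "payload": {"model": "m"}}
large = {"event_type": "checkpoint", "payload": {"model": "m" * 1_000_000}}

print(exceeds_limit("mlgym_event", small))  # False
print(exceeds_limit("mlgym_event", large))  # True
```

With the reproduction's payload, the "model" field alone is already 1,000,000 bytes, so the whole packet ends up above the default limit once the surrounding keys are added.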

I now see that the documentation for this option is actually out of date. It used to be that this parameter would only be used during polling, but at some point it was also extended to WebSocket as well. I will update the docs to reflect this.

Also, you said packets are dropped silently, but if you look at your logs there should be error messages there each time a packet is dropped.

miguelgrinberg • Aug 23 '22 10:08

ah perfect, thanks for the hint.

If I run the code as shown above for the client and server, there is no exception raised or error message printed. Do you receive one of those?

le1nux • Aug 23 '22 10:08

I did not run your code yet. But I'll review this when I update the documentation.

miguelgrinberg • Aug 23 '22 11:08

> It seems like when you send a big message to the websocket server, the message is not received and subsequent messages are silently dropped.

I think I ran into the same issue after updating flask_socketio. It took a lot of debugging to figure out that it wasn't flask_socketio's fault, but the issue was caused by its dependencies (python-socketio/python-engineio). In my case, since I was only sending big messages during my tests, all I saw was the server executing the .emit() successfully (no error raised) but the client never receiving anything.

> It used to be that this parameter would only be used during polling, but at some point it was also extended to WebSocket as well.

I tried with different versions and the problem seems to have been introduced in python-engineio 4.7.1:

 flask_socketio==5.3.6
 python-socketio==5.9.0
-python-engineio==4.7.0
+python-engineio==4.7.1

(You can find a more comprehensive table of versions in https://github.com/overthesun/simoc/pull/443#issuecomment-1793856752.)
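When comparing behavior across versions like this, it helps to record exactly which versions are installed in the environment under test. Here is a minimal sketch using only the standard library; `installed_versions` is a hypothetical helper name, and the package list simply mirrors the three distributions named in the diff above.

```python
from importlib import metadata

# Distributions mentioned in this thread (PyPI names).
PACKAGES = ("Flask-SocketIO", "python-socketio", "python-engineio")


def installed_versions(packages=PACKAGES):
    """Map each distribution name to its installed version, or None if absent."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


# Print the versions in a form that can be pasted into a bug report.
for name, version in installed_versions().items():
    print(f"{name}=={version}" if version else f"{name}: not installed")
```

Running this in each test environment gives a quick way to confirm which side of the 4.7.0/4.7.1 boundary a given result belongs to.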

> If I run the code as shown above for the client and server, there is no exception raised or error message printed.

Unless I'm missing something, I haven't seen any error message in the logs or anywhere else either, even after enabling engineio_logger. I would consider this a bug that needs to be fixed. If you prefer to handle it separately from the documentation issue, I can open a separate issue.

In addition, passing a different (bigger) value to max_http_buffer_size doesn't seem to fix the problem. I can open another issue for this too.

@miguelgrinberg: let me know if you want me to run more tests and/or open more issues for the other problems mentioned here.

ezio-melotti • Nov 05 '23 21:11

@ezio-melotti This issue is a documentation issue, and it was actually fixed long ago. This cannot be related to the problem you are reporting since it was filed long ago when python-engineio was in the 4.3.x versions, long before 4.7.0 and 4.7.1. Please write a new issue and include your logs.

miguelgrinberg • Nov 06 '23 19:11