
TimeoutError AsyncRedisManager with redis+sentinel

Open bl4ckwind opened this issue 7 months ago • 3 comments

Describe the bug

When trying to use an AsyncRedisManager with a Redis Sentinel config, a TimeoutError occurs.

To Reproduce

import socketio
from fastapi import FastAPI
from socketio import AsyncServer
from socketio import AsyncRedisManager

app = FastAPI()

url = (
    "redis+sentinel://"
    "h1.net:26379,"
    "h2.net:26379,"
    "h3.net:26379"
    "/0/mymaster"
)

mgr = AsyncRedisManager(
    url,
    redis_options={
        "username": "...",
        "password": "...",
        "ssl": True,
        "sentinel_kwargs": {"ssl": True},
        "socket_timeout": 5,
        "socket_connect_timeout": 5,
    },
)
sio = AsyncServer(async_mode="asgi", cors_allowed_origins="*", client_manager=mgr)
app = socketio.ASGIApp(sio, app)


@sio.event
async def connect(sid, environ):
    print(f"Client connected: {sid}")
    await sio.emit("welcome", {"msg": "connected"}, to=sid)


@sio.event
async def ping(sid, data):
    print(f"Ping received: {data}")
    await sio.emit("pong", {"data": data}, to=sid)

Running the server with:

uvicorn test:app --host 0.0.0.0 --port 5000

fails after 5 seconds with:

Cannot receive from redis... retrying in 1 secs

With added trace:

Traceback (most recent call last):
  File "C:\Users\u.virtualenvs\proj-FiwSyrnc\Lib\site-packages\redis\asyncio\connection.py", line 541, in read_response
    response = await self._parser.read_response(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\u.virtualenvs\proj-FiwSyrnc\Lib\site-packages\redis_parsers\resp2.py", line 82, in read_response
    response = await self._read_response(disable_decoding=disable_decoding)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\u.virtualenvs\proj-FiwSyrnc\Lib\site-packages\redis_parsers\resp2.py", line 90, in _read_response
    raw = await self._readline()
          ^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\u.virtualenvs\proj-FiwSyrnc\Lib\site-packages\redis_parsers\base.py", line 219, in _readline
    data = await self._stream.readline()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\u\AppData\Local\Programs\Python\Python311\Lib\asyncio\streams.py", line 563, in readline
    line = await self.readuntil(sep)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\u\AppData\Local\Programs\Python\Python311\Lib\asyncio\streams.py", line 655, in readuntil
    await self._wait_for_data('readuntil')
  File "C:\Users\u\AppData\Local\Programs\Python\Python311\Lib\asyncio\streams.py", line 540, in _wait_for_data
    await self._waiter
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\u.virtualenvs\proj-FiwSyrnc\Lib\site-packages\redis\asyncio\connection.py", line 540, in read_response
    async with async_timeout(read_timeout):
  File "C:\Users\u\AppData\Local\Programs\Python\Python311\Lib\asyncio\timeouts.py", line 115, in __aexit__
    raise TimeoutError from exc_val
TimeoutError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\u.virtualenvs\proj-FiwSyrnc\Lib\site-packages\socketio\async_redis_manager.py", line 110, in _redis_listen_with_retries
    async for message in self.pubsub.listen():
  File "C:\Users\u.virtualenvs\proj-FiwSyrnc\Lib\site-packages\redis\asyncio\client.py", line 1035, in listen
    response = await self.handle_message(await self.parse_response(block=True))
                                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\u.virtualenvs\proj-FiwSyrnc\Lib\site-packages\redis\asyncio\client.py", line 922, in parse_response
    response = await self._execute(
               ^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\u.virtualenvs\proj-FiwSyrnc\Lib\site-packages\redis\asyncio\client.py", line 902, in _execute
    return await conn.retry.call_with_retry(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\u.virtualenvs\proj-FiwSyrnc\Lib\site-packages\redis\asyncio\retry.py", line 62, in call_with_retry
    await fail(error)
  File "C:\Users\u.virtualenvs\proj-FiwSyrnc\Lib\site-packages\redis\asyncio\client.py", line 891, in _disconnect_raise_connect
    raise error
  File "C:\Users\u.virtualenvs\proj-FiwSyrnc\Lib\site-packages\redis\asyncio\retry.py", line 59, in call_with_retry
    return await do()
           ^^^^^^^^^^
  File "C:\Users\u.virtualenvs\proj-FiwSyrnc\Lib\site-packages\redis\asyncio\sentinel.py", line 74, in read_response
    return await super().read_response(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\u.virtualenvs\proj-FiwSyrnc\Lib\site-packages\redis\asyncio\connection.py", line 559, in read_response
    raise TimeoutError(f"Timeout reading from {host_error}")
redis.exceptions.TimeoutError: Timeout reading from h2.net:6379

The Redis Sentinel connection works in isolation:

from redis.asyncio.sentinel import Sentinel
redis_options={'username': '...', 'password': '...', 'ssl': True, 'sentinel_kwargs': {'ssl': True}, 'socket_timeout': 1, 'db': 0}
sentinel = Sentinel([("h1.net", 26379),("h2", 26379),("h3", 26379)], **redis_options)
print(sentinel.__dict__)
r = sentinel.master_for("mymaster") 

bl4ckwind Apr 24 '25 09:04

@bl4ckwind Are you 100% sure that the test you claim works is equivalent to what is done inside the Socket.IO server? The code that you shared at the bottom of your message configures the Redis Sentinel and stops there, without creating or using a Redis instance. But the stack trace that you shared above shows a timeout error when talking to one of your Redis nodes, which suggests that the first part, where the connection to the Sentinel node is made, was also successful.

As far as I can see, the connection arguments in your example are almost identical to the arguments that are sent by the Socket.IO server, so to me it looks like at least one of your Redis nodes (the one you have running on h2.net) is timing out.

miguelgrinberg Apr 27 '25 16:04

import asyncio
from redis.asyncio.sentinel import Sentinel
redis_options = {'username': '...', 'password': '...', 'ssl': True, 'sentinel_kwargs': {'ssl': True}, 'socket_timeout': 1, 'db': 0}

async def main():
    sentinel = Sentinel([("h1.net", 26379), ("h2", 26379), ("h3", 26379)], **redis_options)
    r = sentinel.master_for("mymaster")
    response = await r.set("key", "value", ex=1)  # write through the current master

asyncio.run(main())

This works just fine.

This edit to async_redis_manager.py also works:


    async def _test(self):
        # Test write through the Sentinel-resolved master
        await self.redis.set("key", "val", ex=1)

    def _redis_connect(self):
        if not self.redis_url.startswith('redis+sentinel://'):
            self.redis = aioredis.Redis.from_url(self.redis_url,
                                                 **self.redis_options)
        else:
            sentinels, service_name, connection_kwargs = \
                parse_redis_sentinel_url(self.redis_url)

            kwargs = self.redis_options
            kwargs.update(connection_kwargs)

            sentinel = Sentinel(sentinels, **kwargs)
            self.redis = sentinel.master_for(service_name or self.channel)
            # Get the event loop and schedule the test write on it
            loop = asyncio.get_event_loop()
            loop.create_task(self._test())
            
        self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
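
For context, the else branch above depends on the URL being split into a list of sentinel addresses, a service name, and extra connection keyword arguments. The following is a simplified sketch of that split, not python-socketio's actual parse_redis_sentinel_url. The function name split_sentinel_url is hypothetical, and credentials embedded in the URL are simply ignored here.

```python
from urllib.parse import urlsplit


def split_sentinel_url(url):
    """Split redis+sentinel://host:port,host:port/db/service into its parts."""
    parts = urlsplit(url)
    if parts.scheme != "redis+sentinel":
        raise ValueError("expected a redis+sentinel:// URL")

    # Drop any "user:password@" prefix, then split the comma-separated hosts.
    hosts = parts.netloc.rsplit("@", 1)[-1]
    sentinels = []
    for host_port in hosts.split(","):
        host, port = host_port.rsplit(":", 1)
        sentinels.append((host, int(port)))

    # The path is expected to look like "/<db>/<service_name>".
    segments = parts.path.split("/")
    kwargs = {}
    if len(segments) > 1 and segments[1]:
        kwargs["db"] = int(segments[1])
    service_name = segments[2] if len(segments) > 2 and segments[2] else None
    return sentinels, service_name, kwargs


url = "redis+sentinel://h1.net:26379,h2.net:26379,h3.net:26379/0/mymaster"
print(split_sentinel_url(url))
```

With the URL from the report, this yields the three sentinel addresses on port 26379, the service name mymaster, and db 0. Note that the port in the error message (6379) belongs to the Redis node that Sentinel resolved, not to a sentinel.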

But I missed an important detail: the timeout only occurs when I connect with my test client, and it happens AFTER the client disconnects.

import asyncio
import socketio

# Create a Socket.IO client instance
sio = socketio.AsyncClient()


@sio.event
async def connect():
    print("Connected to server")


@sio.event
async def disconnect():
    print("Disconnected from server")


@sio.event
async def welcome(data):
    print(f"Received welcome message: {data}")


@sio.event
async def pong(data):
    print(f"Received pong: {data}")


async def main():
    # Connect to the server
    await sio.connect("http://localhost:5000")

    try:
        # Send a ping event
        print("Sending ping...")
        await sio.emit("ping", "test message")

        # Wait a bit to receive responses
        await asyncio.sleep(2)

        # Disconnect
        await sio.disconnect()
    except Exception as e:
        print(f"Error: {e}")


if __name__ == "__main__":
    asyncio.run(main())

Client:

Received welcome message: {'msg': 'connected'}
Connected to server
Sending ping...
Received pong: {'data': 'test message'}
Disconnected from server

Server output, with messages printed from async def _redis_listen_with_retries(self):

INFO: Started server process [19576]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:5000 (Press CTRL+C to quit)
ROFL
INFO: 127.0.0.1:50983 - "GET /socket.io/?transport=polling&EIO=4&t=1745830187.7274513 HTTP/1.1" 200 OK
INFO: ('127.0.0.1', 50983) - "WebSocket /socket.io/?transport=websocket&EIO=4&sid=5DM7RjGG-v3BArAWAAAA&t=1745830187.7328672" [accepted]
INFO: connection open
Client connected: wfyobAQGVswZX55zAAAB
MESSAGE {'type': 'message', 'pattern': None, 'channel': b'socketio', 'data': b'\x80\x04\x95\xb6\x00\x00\x00\x00\x00\x00\x00}\x94(\x8c\x06method\x94\x8c\x04emit\x94\x8c\x05event\x94\x8c\x07welcome\x94\x8c\x04data\x94}\x94\x8c\x03msg\x94\x8c\tconnected\x94s\x8c\tnamespace\x94\x8c\x01/\x94\x8c\x04room\x94\x8c\x14wfyobAQGVswZX55zAAAB\x94\x8c\x08skip_sid\x94N\x8c\x08callback\x94N\x8c\x07host_id\x94\x8c 277dfab116eb4ab68123eddbfa89cae2\x94u.'}
Ping received: test message
MESSAGE {'type': 'message', 'pattern': None, 'channel': b'socketio', 'data': b'\x80\x04\x95\xb2\x00\x00\x00\x00\x00\x00\x00}\x94(\x8c\x06method\x94\x8c\x04emit\x94\x8c\x05event\x94\x8c\x04pong\x94\x8c\x04data\x94}\x94h\x05\x8c\x0ctest message\x94s\x8c\tnamespace\x94\x8c\x01/\x94\x8c\x04room\x94\x8c\x14wfyobAQGVswZX55zAAAB\x94\x8c\x08skip_sid\x94N\x8c\x08callback\x94N\x8c\x07host_id\x94\x8c 277dfab116eb4ab68123eddbfa89cae2\x94u.'}
INFO: connection closed
Cannot receive from redis... retrying in 1 secs
Traceback (most recent call last):
  File "C:\Users\u.virtualenvs\proj-FiwSyrnc\Lib\site-packages\redis\asyncio\connection.py", line 541, in read_response
    response = await self._parser.read_response(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\u.virtualenvs\proj-FiwSyrnc\Lib\site-packages\redis_parsers\resp2.py", line 82, in read_response
    response = await self._read_response(disable_decoding=disable_decoding)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\u.virtualenvs\proj-FiwSyrnc\Lib\site-packages\redis_parsers\resp2.py", line 90, in _read_response
    raw = await self._readline()
          ^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\u.virtualenvs\proj-FiwSyrnc\Lib\site-packages\redis_parsers\base.py", line 219, in _readline
    data = await self._stream.readline()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\u\AppData\Local\Programs\Python\Python311\Lib\asyncio\streams.py", line 563, in readline
    line = await self.readuntil(sep)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\u\AppData\Local\Programs\Python\Python311\Lib\asyncio\streams.py", line 655, in readuntil
    await self._wait_for_data('readuntil')
  File "C:\Users\u\AppData\Local\Programs\Python\Python311\Lib\asyncio\streams.py", line 540, in _wait_for_data
    await self._waiter
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\u.virtualenvs\proj-FiwSyrnc\Lib\site-packages\redis\asyncio\connection.py", line 540, in read_response
    async with async_timeout(read_timeout):
  File "C:\Users\u\AppData\Local\Programs\Python\Python311\Lib\asyncio\timeouts.py", line 115, in __aexit__
    raise TimeoutError from exc_val
TimeoutError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\u.virtualenvs\proj-FiwSyrnc\Lib\site-packages\socketio\async_redis_manager.py", line 110, in _redis_listen_with_retries
    async for message in self.pubsub.listen():
  File "C:\Users\u.virtualenvs\proj-FiwSyrnc\Lib\site-packages\redis\asyncio\client.py", line 1035, in listen
    response = await self.handle_message(await self.parse_response(block=True))
                                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\u.virtualenvs\proj-FiwSyrnc\Lib\site-packages\redis\asyncio\client.py", line 922, in parse_response
    response = await self._execute(
               ^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\u.virtualenvs\proj-FiwSyrnc\Lib\site-packages\redis\asyncio\client.py", line 902, in _execute
    return await conn.retry.call_with_retry(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\u.virtualenvs\proj-FiwSyrnc\Lib\site-packages\redis\asyncio\retry.py", line 62, in call_with_retry
    await fail(error)
  File "C:\Users\u.virtualenvs\proj-FiwSyrnc\Lib\site-packages\redis\asyncio\client.py", line 891, in _disconnect_raise_connect
    raise error
  File "C:\Users\u.virtualenvs\proj-FiwSyrnc\Lib\site-packages\redis\asyncio\retry.py", line 59, in call_with_retry
    return await do()
           ^^^^^^^^^^
  File "C:\Users\u.virtualenvs\proj-FiwSyrnc\Lib\site-packages\redis\asyncio\sentinel.py", line 74, in read_response
    return await super().read_response(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\u.virtualenvs\proj-FiwSyrnc\Lib\site-packages\redis\asyncio\connection.py", line 559, in read_response
    raise TimeoutError(f"Timeout reading from {host_error}")
redis.exceptions.TimeoutError: Timeout reading from h2.net:6379

bl4ckwind Apr 28 '25 09:04

@bl4ckwind I still fail to see how the disconnect event is related to this. This is basically a timeout error generated by your h2.net Redis node, right? Or am I misinterpreting something?

There is one suspicious thing that I see in your stack trace, which is that there is a CancelledError at the top of it. Not sure what that's about, to be honest. From the Socket.IO server side, a client disconnecting has no influence on the Redis connection, which is used server-wide, so I guess I need help understanding how a disconnection can end up causing this, if these two events are in fact related.

miguelgrinberg May 11 '25 16:05