
Redis pubsub task cancellation raises ConnectionError

Open cosminacho opened this issue 1 year ago • 4 comments

Version: 4.5.4

Platform: Windows / Linux - Python 3.10.10

Description: Cancelling a pubsub task that is listening for messages raises a ConnectionError. This looks like an internal library problem: I can't catch the exception inside the redis_subscribe function, and it is raised only after CancelledError has been raised inside the task.

Is this the intended behaviour? If so, what is an elegant work-around?

This code used to work in previous versions of redis (4.5.3 and below).

redis_client is a global variable, initialized in the server's startup event. The redis_subscribe function is meant to be called inside endpoints that feed messages over websockets or server-sent events, so the task needs to be cancelled when, for example, a client disconnects from the server.

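import asyncio

from redis.asyncio import Redis

redis_client = Redis()


async def redis_subscribe():
    async with redis_client.pubsub() as pubsub:
        await pubsub.subscribe("channel")  # subscribe() is a coroutine
        async for message in pubsub.listen():
            print(message)


async def main():
    task = asyncio.create_task(redis_subscribe())
    await asyncio.sleep(5)
    task.cancel()  # e.g. when a client disconnects
    await asyncio.sleep(5)  # leave time for the error below to surface


asyncio.run(main())
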
Task exception was never retrieved
future: <Task finished name='Task-27' coro=<Retry.call_with_retry() done, defined at redis/asyncio/retry.py:46> exception=ConnectionError('Connection closed by server.')>
Traceback (most recent call last):
  File "redis/asyncio/retry.py", line 62, in call_with_retry
    await fail(error)
  File "redis/asyncio/client.py", line 779, in _disconnect_raise_connect
    raise error
  File "redis/asyncio/retry.py", line 59, in call_with_retry
    return await do()
  File "redis/asyncio/client.py", line 784, in _try_execute
    return await command(*arg, **kwargs)
  File "redis/asyncio/connection.py", line 840, in read_response
    response = await self._parser.read_response(
  File "redis/asyncio/connection.py", line 413, in read_response
    await self.read_from_socket()
  File "redis/asyncio/connection.py", line 396, in read_from_socket
    raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from None
redis.exceptions.ConnectionError: Connection closed by server.

cosminacho avatar Apr 21 '23 09:04 cosminacho

See https://github.com/redis/redis-py/pull/2666#issuecomment-1514452720 for an explanation of why this happens. PR https://github.com/redis/redis-py/pull/2719 could potentially fix this.
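
The traceback above names a task running Retry.call_with_retry() rather than the user's own task, which suggests the read runs in an inner task that survives the cancellation. The sketch below reproduces that pattern with plain asyncio and no Redis. It assumes the read is protected with asyncio.shield; read_response and listen here are hypothetical stand-ins, not the library's code.

```python
import asyncio
import gc

loop_errors = []  # exceptions the event loop reports for tasks nobody awaited


async def read_response():
    # Stand-in for a socket read that fails once the connection is closed.
    await asyncio.sleep(0.05)
    raise ConnectionError("Connection closed by server.")


async def listen():
    # Assumed shape: the read is shielded, so cancelling listen()
    # does not cancel the inner read task.
    await asyncio.shield(read_response())


async def main():
    loop = asyncio.get_running_loop()
    loop.set_exception_handler(
        lambda _loop, ctx: loop_errors.append(ctx.get("exception"))
    )

    task = asyncio.create_task(listen())
    await asyncio.sleep(0.01)
    task.cancel()  # cancels listen(), but the shielded read keeps running
    try:
        await task
    except asyncio.CancelledError:
        pass  # this is the only exception the caller can catch

    await asyncio.sleep(0.1)  # the inner read fails with nobody awaiting it
    gc.collect()  # make sure the orphaned task is finalized and reported


asyncio.run(main())
```

The caller only ever sees CancelledError. The ConnectionError belongs to the inner task, so it cannot be caught from inside the cancelled coroutine and is reported by the event loop instead.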

agronholm avatar Apr 22 '23 19:04 agronholm

@agronholm I don't think #2719 will fix it because it fixes the sync connection and this issue is in the async connection (maybe #2695 will fix it).

dvora-h avatar May 01 '23 08:05 dvora-h

@cosminacho Can you confirm that the issue is solved by #2695?

dvora-h avatar Jul 16 '23 22:07 dvora-h

I am still seeing this issue in 5.0.2.

schutztj avatar Mar 07 '24 07:03 schutztj
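
One possible work-around, sketched under assumptions and not verified against 4.5.4 or 5.x: stop the listener cooperatively instead of cancelling it, so that no read is interrupted mid-flight. The helper below polls with get_message(ignore_subscribe_messages=True, timeout=...), which redis.asyncio's PubSub provides. The names listen_until_stopped, stop, handle and poll_timeout are hypothetical.

```python
import asyncio


async def listen_until_stopped(pubsub, stop, handle, poll_timeout=1.0):
    """Poll `pubsub` until `stop` is set, passing each message to `handle`.

    `pubsub` is any object with an awaitable
    get_message(ignore_subscribe_messages=..., timeout=...) method,
    such as a subscribed redis.asyncio PubSub instance.
    """
    while not stop.is_set():
        # Returns None if no message arrives within poll_timeout seconds.
        message = await pubsub.get_message(
            ignore_subscribe_messages=True, timeout=poll_timeout
        )
        if message is not None:
            handle(message)
```

In the endpoint, this would run inside `async with redis_client.pubsub() as pubsub:` after `await pubsub.subscribe("channel")`. When the client disconnects, call stop.set() on the asyncio.Event and await the task instead of calling task.cancel(). The trade-off is that shutdown can take up to poll_timeout seconds.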