aioredis-py
Pubsub does not auto-reconnect with get_message()
Counterpart of https://github.com/andymccurdy/redis-py/issues/1572
Run the following script:
import itertools
import traceback
import asyncio
import aioredis


async def consumer(pubsub):
    await pubsub.subscribe('test')
    while True:
        try:
            message = await pubsub.get_message(
                ignore_subscribe_messages=True,
                timeout=1.0,
            )
            if message:
                print(f'Receive: {message}')
        except asyncio.CancelledError:
            raise
        except Exception:
            traceback.print_exc()
            await asyncio.sleep(1.0)


async def producer(redis):
    for i in itertools.count():
        try:
            print(f'Publish {i}')
            await redis.publish('test', str(i))
        except Exception:
            traceback.print_exc()
        await asyncio.sleep(1.0)


loop = asyncio.get_event_loop()
redis = aioredis.from_url('redis://', decode_responses=True)
loop.create_task(consumer(redis.pubsub()))
loop.run_until_complete(producer(redis))
While running, stop the Redis server and then start it again.
Expected behavior: publish() and get_message() will fail while the server is stopped, then succeed again after the server restarts.
Observed behavior: publish() resumes working, but get_message() continues to fail with the following traceback:
Traceback (most recent call last):
  File "/Users/luhn/Code/aioredis-py/test.py", line 12, in consumer
    message = await pubsub.get_message(
  File "/Users/luhn/Code/aioredis-py/aioredis/client.py", line 4154, in get_message
    response = await self.parse_response(block=False, timeout=timeout)
  File "/Users/luhn/Code/aioredis-py/aioredis/client.py", line 4034, in parse_response
    if not block and not await conn.can_read(timeout=timeout):
  File "/Users/luhn/Code/aioredis-py/aioredis/connection.py", line 850, in can_read
    return await self._parser.can_read(timeout)
  File "/Users/luhn/Code/aioredis-py/aioredis/connection.py", line 453, in can_read
    return await self.read_from_socket(timeout=timeout, raise_on_timeout=False)
  File "/Users/luhn/Code/aioredis-py/aioredis/connection.py", line 464, in read_from_socket
    raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from None
aioredis.exceptions.ConnectionError: Connection closed by server.
This is great code, but I believe we had a discussion before about how this was not the job of redis-py or aioredis. Personally, I agree, since people may want their own custom handling for server failures.
Currently when a connection breaks, get_message() behaves differently than Redis.publish(), PubSub.subscribe(), or whatever your favorite command might be. That's why I'm reporting a bug. It should work exactly the same as everything else.
There's even a whole connect callback that will resubscribe to all subscribed topics upon reconnection, so obviously the implementation is meant to reconnect.
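To illustrate the connect-callback idea mentioned above, here is a minimal sketch using stand-in classes. `SketchConnection`, `SketchPubSub`, and every method on them are hypothetical names made up for this example. They are not the aioredis classes, and the real implementation may differ in detail.

```python
import asyncio


class SketchConnection:
    """Hypothetical stand-in for a connection; not the aioredis class."""

    def __init__(self):
        self.connected = False
        self.connect_callbacks = []
        self.sent = []  # commands "sent" to the server, recorded for illustration

    async def connect(self):
        self.connected = True
        # Run every registered callback each time a connection is established.
        for callback in self.connect_callbacks:
            await callback(self)

    async def disconnect(self):
        self.connected = False

    async def send_command(self, *args):
        # Connect lazily on first use.
        if not self.connected:
            await self.connect()
        self.sent.append(args)


class SketchPubSub:
    """Hypothetical pub/sub wrapper that replays subscriptions on connect."""

    def __init__(self, connection):
        self.connection = connection
        self.channels = set()
        connection.connect_callbacks.append(self.on_connect)

    async def on_connect(self, connection):
        # A fresh connection knows nothing about earlier subscriptions,
        # so replay every channel that was subscribed before.
        for channel in sorted(self.channels):
            connection.sent.append(("SUBSCRIBE", channel))

    async def subscribe(self, channel):
        await self.connection.send_command("SUBSCRIBE", channel)
        self.channels.add(channel)


async def demo():
    conn = SketchConnection()
    pubsub = SketchPubSub(conn)
    await pubsub.subscribe("test")
    await conn.disconnect()  # simulate losing the connection
    await conn.connect()     # reconnecting triggers on_connect
    return conn.sent
```

Running `asyncio.run(demo())` records the `SUBSCRIBE` command twice: once for the original call and once replayed by the callback after the reconnect.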
I don't see where publish has a reconnection option. subscribe needs reconnection since it's long-lived, whereas publish (which I didn't see have a reconnect option) and get_message are single-execution commands.
Maybe "reconnect" is the wrong word. All get_message needs to do is conn.disconnect() when a ConnectionError occurs. Connection will handle the rest.
I'm not sure what publish() does behind the scenes, but the effect is the same: the next call to publish() will use a new connection.
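The "disconnect on ConnectionError" suggestion can be sketched roughly as follows. This is a self-contained simulation, not the aioredis code: `FlakyConnection`, its attributes, and this standalone `get_message` function are hypothetical, and the built-in `ConnectionError` stands in for `aioredis.exceptions.ConnectionError`. The one assumption it encodes is that a disconnected connection reconnects lazily on its next use.

```python
import asyncio


class FlakyConnection:
    """Hypothetical stand-in for a connection to a server that can go down."""

    def __init__(self):
        self.server_up = True
        self.connected = False     # whether we believe we hold a socket
        self.socket_alive = False  # whether that socket is still usable

    def stop_server(self):
        self.server_up = False
        self.socket_alive = False  # the existing socket dies with the server

    def start_server(self):
        # A restart does not revive the old socket.
        self.server_up = True

    async def connect(self):
        if not self.server_up:
            raise ConnectionError("Connection refused.")
        self.connected = True
        self.socket_alive = True

    async def disconnect(self):
        self.connected = False
        self.socket_alive = False

    async def read(self):
        # Assumption: reconnect lazily when there is no current connection.
        if not self.connected:
            await self.connect()
        if not self.socket_alive:
            raise ConnectionError("Connection closed by server.")
        return "message"


async def get_message(conn):
    try:
        return await conn.read()
    except ConnectionError:
        # Drop the broken connection so the next call starts with a new one.
        await conn.disconnect()
        raise


async def demo():
    conn = FlakyConnection()
    results = [await get_message(conn)]
    conn.stop_server()
    try:
        await get_message(conn)
    except ConnectionError:
        results.append("error")
    conn.start_server()
    results.append(await get_message(conn))
    return results
```

In this simulation, removing the `disconnect()` call from the `except` branch reproduces the reported symptom: the dead socket is reused and every later call keeps raising, even after the server is back.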
FWIW, the equivalent issue+PR I opened with redis-py has been merged. https://github.com/andymccurdy/redis-py/issues/1572
@luhn apologies, I never had a chance to respond, especially after I made sense of "Maybe "reconnect" is the wrong word". Thanks for also linking to redis-py. I'll probably be skipping your PR since I'll be making a huge PR port from redis-py tomorrow.
Thanks for creating the issue though!