redis-py
(asyncio) PubSub does not automatically reconnect
Version: 4.2.2
Platform: python 3.8 on windows
Description: Running a pub-sub task in a loop requires me to manually call `pubsub.connection.connect()`.
So:
I'm new to Python redis, having previously used hiredis and written clients for Unreal Engine with it.
So I was pleasantly surprised at how the client typically handles reconnection for you when the connection drops. However, this does not happen with the PubSub client.
Please note that I am using the `redis.asyncio` module here.
There is a retry mechanism in `PubSub._execute`, which calls `PubSub._disconnect_raise_connect` in case of failure. However, that function only calls `connect()` in case of a `TimeoutError`. If the connection drops for any other reason, for example because the server becomes unreachable for a few seconds, the connection stays in a disconnected state.
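To make the rule above concrete, here is a simplified, hypothetical model of the reconnect decision (not the redis-py source): the connection is closed on any error and reopened only for a `TimeoutError`. `FakeConnection` and `disconnect_raise_connect` are illustrative names, and the built-in `TimeoutError`/`ConnectionError` stand in for the redis exception classes.

```python
class FakeConnection:
    """Hypothetical stand-in for a connection that only tracks its state."""

    def __init__(self) -> None:
        self.is_connected = True

    def disconnect(self) -> None:
        self.is_connected = False

    def connect(self) -> None:
        self.is_connected = True


def disconnect_raise_connect(conn: FakeConnection, error: Exception) -> None:
    """Simplified model: close the connection, reconnect only on a timeout."""
    conn.disconnect()
    if not isinstance(error, TimeoutError):
        # Any other error (e.g. the server went away) is re-raised,
        # and the connection is left closed.
        raise error
    conn.connect()


conn = FakeConnection()
disconnect_raise_connect(conn, TimeoutError())
print(conn.is_connected)  # True: a timeout triggers a reconnect

try:
    disconnect_raise_connect(conn, ConnectionError("Connection closed by server."))
except ConnectionError:
    pass
print(conn.is_connected)  # False: nothing reconnects after a dropped connection
```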
In my tests, I killed and restarted the server, and `PubSub.connection` stayed in an `is_connected == False` state, with any read operation resulting in `ConnectionError("Connection closed by server.")`.
Previously, the loop looked something like this:
```python
import redis

async def loop(self):
    await self.perform_any_subscriptions()
    while True:
        try:
            await self.loop_step()
        except redis.ConnectionError:
            pass

async def loop_step(self):
    # Handle a single message, then return to the outer loop.
    async for message in self.pubsub.listen():
        await self.dispatch_message(message)
        break
```
If the connection is dropped, `listen()` will simply keep raising `ConnectionError("Connection closed by server.")`.
As a workaround, I had to do something like this for my listening loop:
```python
import redis

async def loop(self):
    while True:
        await self.pubsub.connection.retry.call_with_retry(
            self.loop_step,
            self.reconnect_if_disconnected,  # a lambda cannot hold `if`/`await`
        )

async def reconnect_if_disconnected(self, error):
    conn = self.pubsub.connection
    if isinstance(error, redis.ConnectionError) and conn and not conn.is_connected:
        await conn.connect()
```
(I use `call_with_retry` here as a handy catch-all.)
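The two-callback shape used in that workaround (one callable does the work, another runs on failure) is easy to reproduce. Below is a minimal, hypothetical `call_with_retry` helper written for illustration, not redis-py's own `Retry` class; it assumes the failure callback is a coroutine that reconnects, and it fakes the connection with a small dict so it runs without a server.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def call_with_retry(
    do: Callable[[], Awaitable[T]],
    fail: Callable[[Exception], Awaitable[None]],
    retries: int = 3,
) -> T:
    """Hypothetical helper: await do(); on ConnectionError, await fail(error) and retry."""
    failures = 0
    while True:
        try:
            return await do()
        except ConnectionError as error:
            failures += 1
            await fail(error)  # e.g. reconnect if the connection was dropped
            if failures > retries:
                raise error  # give up after `retries` extra attempts


async def demo() -> tuple:
    state = {"connected": False, "reconnects": 0}  # fake connection state

    async def loop_step() -> str:
        if not state["connected"]:
            raise ConnectionError("Connection closed by server.")
        return "message"

    async def reconnect(error: Exception) -> None:
        state["connected"] = True
        state["reconnects"] += 1

    result = await call_with_retry(loop_step, reconnect)
    return result, state["reconnects"]


print(asyncio.run(demo()))  # ('message', 1)
```

Putting the reconnect in the failure callback keeps the retry policy (how many attempts) separate from the recovery action (how to get back to a working connection).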
I'm not sure whether this is expected or a bug, but from the documentation I'm reading, it seems like `PubSub` ought to be reusable across disconnects, automatically reconnecting and restoring any previous subscriptions, etc.
UPDATE:
After PR #2148, the async `PubSub` class now has a `connect()` method, which can be used to remedy this. The top loop becomes:
```python
import redis

async def loop(self):
    await self.perform_any_subscriptions()
    while True:
        try:
            await self.pubsub.connect()  # reconnect if the connection was dropped
            await self.loop_step()
        except redis.ConnectionError:
            pass
```
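Here is a self-contained sketch of that reconnect-then-listen loop running against a fake. `FakePubSub` is a hypothetical stand-in with the same `connect()`/`listen()` shape; it drops its connection once mid-stream so the loop can be seen reconnecting and resuming. The short `asyncio.sleep` backoff before reconnecting is an addition, not part of the loop above, meant to avoid spinning while the server is down.

```python
import asyncio
from typing import AsyncIterator, List, Tuple


class FakePubSub:
    """Hypothetical stand-in for an async PubSub that drops its connection once."""

    def __init__(self, messages: List[str], drop_after: int) -> None:
        self._messages = list(messages)
        self._drop_after = drop_after  # messages delivered before the drop
        self.connected = False
        self.connects = 0

    async def connect(self) -> None:
        if not self.connected:  # no-op when already connected
            self.connected = True
            self.connects += 1

    async def listen(self) -> AsyncIterator[str]:
        while self._messages:
            if not self.connected:
                raise ConnectionError("Connection closed by server.")
            if self._drop_after == 0:
                self.connected = False  # simulate the server going away
                self._drop_after = -1  # only drop once
                raise ConnectionError("Connection closed by server.")
            self._drop_after -= 1
            yield self._messages.pop(0)


async def run(pubsub: FakePubSub, wanted: int, backoff: float = 0.01) -> List[str]:
    # Assumes the source keeps producing messages, as a real listen() would.
    received: List[str] = []
    while len(received) < wanted:
        try:
            await pubsub.connect()  # reconnect if the previous pass failed
            async for message in pubsub.listen():
                received.append(message)
                if len(received) >= wanted:
                    break
        except ConnectionError:
            await asyncio.sleep(backoff)  # brief pause before reconnecting
    return received


async def demo() -> Tuple[List[str], int]:
    pubsub = FakePubSub(["a", "b", "c"], drop_after=2)
    received = await run(pubsub, wanted=3)
    return received, pubsub.connects


print(asyncio.run(demo()))  # (['a', 'b', 'c'], 2)
```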
Previously, it was not possible to reconnect a `PubSub` object after a `ConnectionError` without issuing a `subscribe()` or `unsubscribe()` call.
Does the synchronous version auto-reconnect?
Good question. I'd guess not, because the connect logic is part of `PubSub._execute()`, which is only invoked as part of a subscribe/unsubscribe, not as part of `get_message()`. But I'll see if I can create a similar unit test for the synchronous version (although that is probably more tedious).
I finally got round to doing this. Writing the unit tests for automatic reconnection in the synchronous `PubSub` (#2256) confirms that the synchronous version does indeed auto-reconnect. This appears to be the intended behavior, so in my opinion the asyncio `PubSub` should do the same.
So, what should I do? Should I attempt to provide a PR (or extend #2256) with the corresponding fix for the async version, @agronholm? Are there any maintainers here who have an opinion on this? @chayim?
The best course of action would probably be to make a new PR that adapts the tests from #2256 to async and implements that functionality on the async side.
@kristjanvalur I agree with @agronholm; if you can provide a new PR, that would be great!
Will do. Working on getting the tests clear and indicative of the problem.
So, I spent some time making these unit tests reliable. In the process, I discovered that the issue was only present when using `pubsub.listen()` and not `pubsub.get_message()`, and that it affected both the sync and async versions of the library. PR #2281 addresses this issue.
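Since the problem showed up with `listen()` but not with `get_message()`, one possible interim workaround (a sketch, not something taken from PR #2281) is a listen-style async generator that polls `get_message()` instead. `listen_via_get_message` is a hypothetical name; the `ignore_subscribe_messages` and `timeout` keyword arguments are assumed to match `get_message()` in `redis.asyncio`, and a small hypothetical fake stands in for a real `PubSub` so the example runs without a server.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List, Optional


async def listen_via_get_message(
    pubsub: Any, timeout: float = 1.0
) -> AsyncIterator[Dict[str, Any]]:
    """Yield messages forever by polling get_message() rather than calling listen()."""
    while True:
        message: Optional[Dict[str, Any]] = await pubsub.get_message(
            ignore_subscribe_messages=True, timeout=timeout
        )
        if message is not None:  # None means nothing arrived before the timeout
            yield message


class FakePubSub:
    """Hypothetical stand-in exposing only an async get_message()."""

    def __init__(self, queued: List[Optional[Dict[str, Any]]]) -> None:
        self._queued = list(queued)

    async def get_message(
        self, ignore_subscribe_messages: bool = False, timeout: float = 0.0
    ) -> Optional[Dict[str, Any]]:
        await asyncio.sleep(0)  # yield to the event loop, like a real poll
        return self._queued.pop(0) if self._queued else None


async def demo() -> List[bytes]:
    pubsub = FakePubSub(
        [{"type": "message", "data": b"x"}, None, {"type": "message", "data": b"y"}]
    )
    data: List[bytes] = []
    async for message in listen_via_get_message(pubsub, timeout=0.01):
        data.append(message["data"])
        if len(data) == 2:
            break
    return data


print(asyncio.run(demo()))  # [b'x', b'y']
```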
Researching this, I noticed how much the async library is simply a straight copy of the synchronous library. There are many inefficiencies, such as the use of `can_read`, which is superfluous in the async library. I expect I'll make a separate PR to simplify the async code a bit while maintaining API compatibility.
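To illustrate why a separate readiness check is unnecessary with asyncio: a coroutine can simply await the read and bound the wait with `asyncio.wait_for`, rather than first asking whether data is available. This is a generic `asyncio.StreamReader` sketch, not redis-py code; `read_line_or_none` is a hypothetical helper.

```python
import asyncio
from typing import Optional, Tuple


async def read_line_or_none(
    reader: asyncio.StreamReader, timeout: float
) -> Optional[bytes]:
    """Wait up to `timeout` seconds for a line; return None if nothing arrives.

    No separate "is data available?" check is needed: the await itself
    suspends until data arrives or the timeout expires.
    """
    try:
        return await asyncio.wait_for(reader.readline(), timeout)
    except asyncio.TimeoutError:
        return None


async def demo() -> Tuple[Optional[bytes], Optional[bytes]]:
    reader = asyncio.StreamReader()
    empty = await read_line_or_none(reader, timeout=0.05)  # nothing buffered yet
    reader.feed_data(b"message\n")  # simulate data arriving from the server
    line = await read_line_or_none(reader, timeout=0.05)
    return empty, line


print(asyncio.run(demo()))  # (None, b'message\n')
```

One caveat: on timeout, `wait_for` cancels the pending read, so a real protocol parser has to make sure a cancelled read does not leave a half-consumed reply behind.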