(asyncio) PubSub does not automatically reconnect

Open kristjanvalur opened this issue 2 years ago • 8 comments

Version: 4.2.2

Platform: python 3.8 on windows

Description: Running a pub-sub task in a loop requires me to manually call pubsub.connection.connect() after a disconnect.

So:

I'm new to python redis, having previously used hiredis and written clients for UnrealEngine using it.

So, I was pleasantly surprised at how the client typically handles reconnections for you, in case there are disconnects. However, this does not happen with the PubSub client.

Please note that I am using the redis.asyncio module here.

There is a retry mechanism in PubSub._execute, which calls PubSub._disconnect_raise_connect in case of failure. However, that function only calls connect() in case of a TimeoutError.
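To make that behaviour concrete, here is a simplified model of the error handler as described above. It is not the library's source: FakeConnection is a hypothetical stand-in, and the handler is reduced to the one decision that matters here, namely reconnecting only when the error is a timeout.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a connection object; not redis-py's class."""

    def __init__(self):
        self.is_connected = True

    async def disconnect(self):
        self.is_connected = False

    async def connect(self):
        self.is_connected = True


async def disconnect_raise_connect(conn, error):
    # Models the behaviour described above: always disconnect, then
    # re-raise anything that is not a timeout. Only timeouts reconnect.
    await conn.disconnect()
    if not isinstance(error, asyncio.TimeoutError):
        raise error
    await conn.connect()


async def demo():
    cases = [
        ("timeout", asyncio.TimeoutError()),
        ("closed", ConnectionError("Connection closed by server.")),
    ]
    results = {}
    for name, error in cases:
        conn = FakeConnection()
        try:
            await disconnect_raise_connect(conn, error)
        except ConnectionError:
            pass  # the caller sees the error; the connection stays down
        results[name] = conn.is_connected
    return results


results = asyncio.run(demo())
```

In this model the timeout case ends with the connection re-established, while the "closed by server" case leaves it disconnected, which matches the symptom reported here.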

If there is a disconnect for any other reason, for example because the connection to the server disappears for a few seconds, the connection stays in a disconnected state.

In my tests, I killed and restarted the server and the PubSub.connection stayed in an is_connected==False state, with any read operations resulting in a ConnectionError("Connection closed by server.").

Previously the loop looked something like:

async def loop(self):
    await self.perform_any_subscriptions()
    while True:
        try:
            await self.loop_step()
        except redis.ConnectionError:
            pass

async def loop_step(self):
    async for message in self.pubsub.listen():
        await self.dispatch_message(message)
        break

If the connection becomes disconnected, the listen() will simply continually raise ConnectionError("Connection closed by server.").

What I had to do, as a workaround, was to do something like this, for my listening loop:

async def loop(self):
    while True:
        await self.pubsub.connection.call_with_retry(
            self.loop_step,
            self.reconnect_on_error,
        )

async def reconnect_on_error(self, error):
    # A lambda cannot contain an `if` statement or `await`, so use a coroutine method.
    conn = self.pubsub.connection
    if isinstance(error, redis.ConnectionError) and conn and not conn.is_connected:
        await conn.connect()

(use the call_with_retry as a handy catch-all system)
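For readers unfamiliar with the pattern, the following is a minimal sketch of a do/fail retry helper of the kind this workaround relies on. It is a hypothetical, dependency-free illustration and not redis-py's Retry implementation: the helper name, the `retries` and `errors` parameters, and the use of the builtin ConnectionError are all assumptions made for the example.

```python
import asyncio


async def call_with_retry(do, fail, retries=3, errors=(ConnectionError,)):
    """Hypothetical helper: await do(); on a listed error, await fail(error) and retry."""
    failures = 0
    while True:
        try:
            return await do()
        except errors as error:
            failures += 1
            # Give the caller a chance to repair state, e.g. reconnect.
            await fail(error)
            if failures > retries:
                raise


async def demo():
    state = {"connected": False, "reconnects": 0}

    async def do():
        if not state["connected"]:
            raise ConnectionError("Connection closed by server.")
        return "message"

    async def fail(error):
        # Stand-in for "await connection.connect()".
        state["connected"] = True
        state["reconnects"] += 1

    result = await call_with_retry(do, fail)
    return result, state["reconnects"]


result, reconnects = asyncio.run(demo())
```

The point of the design is that the failure callback is the one place where reconnection logic lives, so the listening step itself stays free of error handling.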

I'm not sure if this is expected, or if it is an error, but from the documentation I'm reading, it seems like PubSub ought to be re-usable across disconnects, automatically reconnecting any previous subscriptions, etc.

UPDATE:

After PR #2148, the async PubSub class now has a connect() method, which can be used to remedy this. The top loop becomes:

async def loop(self):
    await self.perform_any_subscriptions()
    while True:
        try:
            await self.pubsub.connect()
            await self.loop_step()
        except redis.ConnectionError:
            pass
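The same loop shape can be tried without a server. The sketch below uses a hypothetical FakePubSub that simulates one dropped connection; its constructor arguments, the get_message() behaviour, and the builtin ConnectionError (in place of redis.ConnectionError) are assumptions for illustration only. Unlike the loop above, it returns once the messages run out so that it terminates.

```python
import asyncio


class FakePubSub:
    """Hypothetical stand-in for the async PubSub object, for illustration only."""

    def __init__(self, messages, drop_before):
        self._messages = list(messages)
        self._drop_before = set(drop_before)  # drop the connection once before these
        self.connected = False
        self.connect_calls = 0

    async def connect(self):
        self.connect_calls += 1
        self.connected = True

    async def get_message(self):
        if not self.connected:
            raise ConnectionError("Connection closed by server.")
        if self._messages and self._messages[0] in self._drop_before:
            # Simulate the server going away just before this message.
            self._drop_before.discard(self._messages[0])
            self.connected = False
            raise ConnectionError("Connection closed by server.")
        return self._messages.pop(0) if self._messages else None


async def loop(pubsub):
    received = []
    while True:
        try:
            await pubsub.connect()  # cheap when already connected in this model
            message = await pubsub.get_message()
        except ConnectionError:
            continue  # the next iteration reconnects
        if message is None:
            return received
        received.append(message)


pubsub = FakePubSub(["a", "b", "c"], drop_before={"b"})
received = asyncio.run(loop(pubsub))
```

No message is lost in this simulation because the fake only fails before handing a message out; a real server restart may of course drop messages published while the client is disconnected.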

Previously it was not possible to reconnect a PubSub object after a ConnectionError without issuing a subscribe() or unsubscribe().

kristjanvalur avatar Apr 06 '22 10:04 kristjanvalur

Does the synchronous version auto-reconnect?

agronholm avatar May 04 '22 22:05 agronholm

Good question. I'd guess not, because the "connect" feature is part of PubSub.execute(), which only gets invoked as part of a subscribe/unsubscribe, not as part of a get_message. But I'll try to see if I can create a similar unit test for the synchronous version (although that is probably more tedious).

kristjanvalur avatar May 05 '22 08:05 kristjanvalur

Finally got round to doing this. Writing the unit tests for automatic PubSub reconnection in the synchronous PubSub (#2256) confirms that the synchronous version does indeed auto-reconnect. It would appear to be the intention that things behave in this way. And so, it is my opinion that the asyncio PubSub should do so as well.

kristjanvalur avatar Jun 27 '22 14:06 kristjanvalur

So, what should I do? Should I attempt to provide a PR (or extend #2256) to make the corresponding fix to the async version? @agronholm ? Are there any maintainers here who have an opinion on these things? @chayim ?

kristjanvalur avatar Jul 09 '22 11:07 kristjanvalur

The best course of action would probably be to make a new PR that adapts the tests from #2256 to async and implements that functionality on the async side.

agronholm avatar Jul 09 '22 12:07 agronholm

@kristjanvalur Agree with @agronholm, if you can provide a new PR that will be great!

dvora-h avatar Jul 11 '22 09:07 dvora-h

Will do. Working on getting the tests clear and indicative of the problem.

kristjanvalur avatar Jul 11 '22 11:07 kristjanvalur

So, I spent some time writing these unit tests and making them reliable. In the process, I discovered that the issue was only present when using pubsub.listen() and not pubsub.get_message(), and that it affected both the sync and async versions of the library. PR #2281 addresses this issue.

Researching this, I noticed how much of the async library is just a straight copy of the synchronous library. There are many inefficiencies, such as the use of can_read, which is superfluous in the async library. I expect I'll make a separate PR to simplify the async code a tiny bit, while maintaining API compatibility.

kristjanvalur avatar Jul 17 '22 10:07 kristjanvalur