
Message streams returned by ReactiveRedisMessageListenerContainer cannot be shared

Open vpavic opened this issue 2 years ago • 3 comments

This is something I noticed shortly after opening #2229 but failed to report it back then.

On a project I was working on at that time, we wanted to reduce the number of Redis topic subscriptions by employing channel multiplexing, similar to how the imperative RedisMessageListenerContainer is typically used. However, our attempt to use ReactiveRedisMessageListenerContainer in a similar fashion failed due to an issue/limitation I'll try to describe here.

We created a ReactiveRedisMessageListenerContainer bean and used it to create message streams that were exposed over HTTP endpoints. We noticed that when multiple clients subscribe to the same topic, the stream breaks (or, better said, stalls) for all subscribers the moment the first client cancels its connection (and thus, apparently, the underlying stream as well). As soon as any client connects again, the stream resumes for all the clients that kept their connections open.

The issue can be reproduced using the sample-webflux project from the webflux-channel-multiplexing branch in this repo. After starting the sample, follow the instructions in the project's readme to subscribe multiple clients and then unsubscribe some of them.

I acknowledge that this might be an issue with how we used ReactiveRedisMessageListenerContainer (as none of us were reactive experts), but to us it appeared that message streams returned by ReactiveRedisMessageListenerContainer should be shareable in order to achieve a true channel multiplexing capability.

vpavic avatar Aug 23 '22 20:08 vpavic

The issue here is that each subscription creates a Redis subscription that is unsubscribed when cancelling the stream:

1661324939.061675 [0 127.0.0.1:50528] "SUBSCRIBE" "sample:topics:fo"
1661324940.871447 [0 127.0.0.1:50528] "SUBSCRIBE" "sample:topics:fo"
1661324942.432204 [0 127.0.0.1:50528] "UNSUBSCRIBE" "sample:topics:fo"
1661324942.432933 [0 127.0.0.1:50528] "PING"
1661324944.344752 [0 127.0.0.1:50516] "PUBLISH" "sample:topics:fo" "{\"id\":\"dee80611-b068-4899-84ee-4ff452ca87c4\",\"type\":\"sample\"}" <--- This is never consumed.

In contrast to RedisMessageListenerContainer, we have simplified the setup to avoid lazy subscriptions/unsubscriptions. If you have two subscribers to the same channel/pattern, the first unsubscribe cancels the subscription for the other one because of how Redis works.
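
The effect seen in the MONITOR output above can be illustrated with a toy model. This is only a sketch, assuming that Redis tracks subscriptions as a set of channels per connection, so a repeated SUBSCRIBE on the same connection adds nothing and a single UNSUBSCRIBE removes the channel for everyone sharing that connection. The class name SimulatedConnection is made up for illustration and is not part of Spring Data Redis or any Redis client.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of the subscription state kept for ONE connection (hypothetical, not a real client). */
final class SimulatedConnection {
    private final Set<String> channels = new HashSet<>();

    /** SUBSCRIBE: a repeated subscribe to the same channel leaves the set unchanged. */
    void subscribe(String channel) {
        channels.add(channel);
    }

    /** UNSUBSCRIBE: removes the channel, no matter how many callers subscribed to it. */
    void unsubscribe(String channel) {
        channels.remove(channel);
    }

    /** Whether a message published to this channel would reach this connection. */
    boolean receives(String channel) {
        return channels.contains(channel);
    }
}
```

Replaying the sequence from the log (two subscribes, one unsubscribe) against this model leaves receives("sample:topics:fo") returning false, which corresponds to the PUBLISH that is never consumed.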

To achieve a similar usage pattern, you would need to share subscriptions within your controller and store the processors that handle multiplexing in, e.g., a Map<Topic, Flux<Message<…>>>.
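
The bookkeeping behind such a map can be sketched in plain JDK code. This is not how ReactiveRedisMessageListenerContainer works and it does not use Reactor; the Upstream interface and the SharedSubscriptions class are hypothetical names, with Upstream standing in for the single real Redis subscription per channel. In a Reactor application the map values would more likely be shared Flux instances, and this sketch ignores demand and back pressure entirely.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical stand-in for the one real Redis subscription per channel. */
interface Upstream {
    void subscribe(String channel);
    void unsubscribe(String channel);
}

/** Fans one upstream subscription per channel out to many local listeners. */
final class SharedSubscriptions {
    private final Map<String, List<Consumer<String>>> listeners = new ConcurrentHashMap<>();
    private final Upstream upstream;

    SharedSubscriptions(Upstream upstream) {
        this.upstream = upstream;
    }

    /** Registers a listener and returns a handle that cancels only this listener. */
    Runnable subscribe(String channel, Consumer<String> listener) {
        listeners.compute(channel, (ch, list) -> {
            if (list == null) {
                list = new CopyOnWriteArrayList<>();
                upstream.subscribe(ch); // first local listener: subscribe upstream once
            }
            list.add(listener);
            return list;
        });
        return () -> listeners.computeIfPresent(channel, (ch, list) -> {
            list.remove(listener);
            if (list.isEmpty()) {
                upstream.unsubscribe(ch); // last local listener gone: unsubscribe upstream
                return null;              // returning null drops the map entry
            }
            return list;
        });
    }

    /** Called when the upstream delivers a message for a channel. */
    void dispatch(String channel, String message) {
        listeners.getOrDefault(channel, List.of()).forEach(l -> l.accept(message));
    }
}
```

With two listeners on the same channel, running the first cancel handle leaves the upstream subscription in place and the second listener keeps receiving messages; the upstream unsubscribe happens only when the last handle is run.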

It would be possible to rewrite ReactiveRedisMessageListenerContainer to handle multiple subscribers to the same topic, but that would require a bigger rewrite.

mp911de avatar Aug 24 '22 07:08 mp911de

Thanks for the quick feedback Mark.

The issue here is that each subscription creates a Redis subscription that is unsubscribed when cancelling the stream:

Hmm, but doesn't that mean that the number of subscribers returned by ReactiveRedisOperations#convertAndSend is flawed, as it always returns 1 with things set up this way? To clarify, I execute watch -n 1 "curl -s -X POST http://localhost:8080/topics/aa" in one terminal to create a constant stream of events (and observe the number of subscribers), and then use curl -N http://localhost:8080/topics/aa in several other terminals to simulate multiple clients subscribing to the same topic.

To achieve a similar usage pattern, you would need to share subscriptions within your controller and store the processors that handle multiplexing in, e.g., a Map<Topic, Flux<Message<…>>>.

I suspected something like that, but then ReactiveRedisMessageListenerContainer isn't really a complete channel multiplexing solution (and can be especially surprising to those coming from RedisMessageListenerContainer). These statements from the Javadoc also look a bit misleading: https://github.com/spring-projects/spring-data-redis/blob/eaef5dade76b7820da23e2b04180a283cc27e8f9/src/main/java/org/springframework/data/redis/listener/ReactiveRedisMessageListenerContainer.java#L62-L64

vpavic avatar Aug 24 '22 07:08 vpavic

The number of subscribers is computed from all subscribers. Typically you would have a count > 1 when using multiple connections.

channel multiplexing

is meant to subscribe to multiple channels/patterns using a single connection, not that you can subscribe to the same channel multiple times. We should improve our documentation wording to clarify that aspect.

mp911de avatar Aug 24 '22 08:08 mp911de

We revisited this topic. Reactive messaging is associated with demand/back pressure, and we don't want to create a place where we cause congestion or growing queues because of shared subscriptions.

We can (and should) add protection against unintended unsubscriptions. Currently, if you have two subscribers to the same channel and one of them unsubscribes, the other one is unsubscribed as well.

mp911de avatar Nov 29 '22 10:11 mp911de