spring-data-redis
spring-data-redis copied to clipboard
Message streams returned by ReactiveRedisMessageListenerContainer cannot be shared
This is something I noticed shortly after opening #2229 but failed to report it back then.
On a project I worked at that time, we wanted to reduce the number of Redis topic subscriptions by employing channel multiplexing in a way like imperative RedisMessageListenerContainer
is typically used. However, our attempt to use ReactiveRedisMessageListenerContainer
in similar fashion failed due to an issue/limitation I'll try to describe here.
We created ReactiveRedisMessageListenerContainer
bean and used it to create message streams that would be exposed over HTTP endpoints, and noticed that when multiple clients subscribe to the same topic, the stream breaks (or better said stalls) for all subscribers the moment first client cancels their connection (and thus the apparently underlying stream as well). When some client connects again, the stream continues working for all the clients that kept their connection open.
The issue can be reproduced using sample-webflux
project from webflux-channel-multiplexing
branch in this repo. After starting the sample, follow the instructions from project's readme to subscribe multiple clients and then unsubscribe some.
I acknowledge that this might be an issue with how we used ReactiveRedisMessageListenerContainer
(as none of use were reactive experts), but to us it appeared that message streams returned by ReactiveRedisMessageListenerContainer
should be shareable in order to achieve a true channel multiplexing capability.
The issue here is that each subscription creates a Redis subscription that is unsubscribed when cancelling the stream:
1661324939.061675 [0 127.0.0.1:50528] "SUBSCRIBE" "sample:topics:fo"
1661324940.871447 [0 127.0.0.1:50528] "SUBSCRIBE" "sample:topics:fo"
1661324942.432204 [0 127.0.0.1:50528] "UNSUBSCRIBE" "sample:topics:fo"
1661324942.432933 [0 127.0.0.1:50528] "PING"
1661324944.344752 [0 127.0.0.1:50516] "PUBLISH" "sample:topics:fo" "{\"id\":\"dee80611-b068-4899-84ee-4ff452ca87c4\",\"type\":\"sample\"}" <--- This is never consumed.
In contrast to RedisMessageListenerContainer
, we have simplified the setup to avoid lazy subscriptions/unsubscriptions. If you have two subscribers to the same channel/pattern, the first unsubscribe cancels the subscription for the other one because of how Redis works.
To achieve a similar usage pattern you would need to share subscriptions within your controller and store processors that handle multiplexing in a e.g. Map<Topic, Flux<Message<…>>
.
It would be possible to rewrite ReactiveRedisMessageListenerContainer
to handle multiple subscribers to the same topic, but that would require a bigger rewrite.
Thanks for the quick feedback Mark.
The issue here is that each subscription creates a Redis subscription that is unsubscribed when cancelling the stream:
Hmm, but doesn't that mean that number of subscribers as returned by ReactiveRedisOperations#convertAndSend
is flawed as it always returns 1 with things set up this way? To clarify, I execute watch -n 1 "curl -s -X POST http://localhost:8080/topics/aa"
in one terminal to create a constant stream of events (and observe the number of subscribers), and then use curl -N http://localhost:8080/topics/aa
in several other terminals to simulate multiple clients subscribing to the same topic.
To achieve a similar usage pattern you would need to share subscriptions within your controller and store processors that handle multiplexing in a e.g.
Map<Topic, Flux<Message<…>>
.
I suspected something like that, but then ReactiveRedisMessageListenerContainer
isn't really a complete channel multiplexing solution (and can be especially surprising to those coming from RedisMessageListenerContainer
). These statements from javadoc also look a bit misleading:
https://github.com/spring-projects/spring-data-redis/blob/eaef5dade76b7820da23e2b04180a283cc27e8f9/src/main/java/org/springframework/data/redis/listener/ReactiveRedisMessageListenerContainer.java#L62-L64
The number of subscribers is computed from all subscribers. Typically you would have a count > 1 when using multiple connections.
channel multiplexing
is meant to subscribe to multiple channels/patterns using a single connection, not that you can subscribe to the same channel multiple times. We should improve our documentation wording to clarify that aspect.
We revisited this topic. Reactive messaging is associated with demand/back pressure, and we don't want to create a place where we cause congestion or growing queues because of shared subscriptions.
We can (and should do) add protection against unintended un-subscriptions. That is, if you have two subscribers to the same channel and one performs an unsubscribe, the other will also unsubscribe.