Reactive API observeChannels does not propagate sharded messages
Bug Report
The observeChannels() API does not propagate sharded pub/sub messages.
Current Behavior
I was switching some existing code from classic pub/sub to sharded pub/sub. After we switched to subscribing with ssubscribe, messages suddenly stopped flowing. It turns out that the RedisPubSubAdapter<> that observeChannels creates doesn't override the smessage method, so sharded messages are silently dropped.
Input Code
The original code boils down to simply doing:
connection.reactive().observeChannels()
    .map(cm -> {
        // ... do stuff w/message here ...
    });
This still "works" from an API call perspective obviously, but never receives sharded messages.
Expected behavior/code
I would expect that sharded messages flow through this API as well for a consistent experience.
Environment
- Lettuce version(s): 6.6.0.RELEASE
- Redis version: 7.x (n/a here really)
Possible Solution
Here's the current implementation from main as of writing that creates a RedisPubSubAdapter<>:
https://github.com/redis/lettuce/blob/ec10c602890c7a10318e564335412b00cea75a82/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java#L98-L105
Could also just add an override of the smessage callback method on the RedisPubSubAdapter<> created and tunnel those through to the sink:
@Override
public void smessage(K shardChannel, V message) {
    sink.next(new ChannelMessage<>(shardChannel, message));
}
Hello @drub0y. Yes, the solution you are proposing is a possible one. However, I can see that PubSubListener and PubSubClusterListener provide default implementations of those methods by delegating to their non-sharded counterparts. So I wonder whether it is necessary at all for the adapters (Standalone and Cluster) to override those methods in the first place. CC: @tishun @atakavci
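To illustrate the point about default implementations, here is a minimal, self-contained sketch. The types `SketchListener`, `SketchAdapter`, and `ShardedDelegationSketch` are hypothetical stand-ins written for this comment, not Lettuce source. They only model the pattern described: an interface whose default `smessage` delegates to `message`, and an adapter whose empty `smessage` override hides that default.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a pub/sub listener interface (not Lettuce source).
interface SketchListener<K, V> {

    void message(K channel, V message);

    // Default: sharded messages are delegated to the non-sharded callback.
    default void smessage(K shardChannel, V message) {
        message(shardChannel, message);
    }
}

// Hypothetical stand-in for an adapter that gives every callback an empty body.
class SketchAdapter<K, V> implements SketchListener<K, V> {

    @Override
    public void message(K channel, V message) {
        // empty adapter method
    }

    @Override
    public void smessage(K shardChannel, V message) {
        // empty adapter method: this override hides the interface default
    }
}

final class ShardedDelegationSketch {

    // Listener built on the adapter, overriding only message().
    static List<String> viaAdapter() {
        List<String> received = new ArrayList<>();
        SketchListener<String, String> listener = new SketchAdapter<String, String>() {
            @Override
            public void message(String channel, String message) {
                received.add(channel + ":" + message);
            }
        };
        listener.message("news", "classic");
        listener.smessage("news", "sharded"); // dropped by the empty override
        return received;
    }

    // Listener built directly on the interface, so smessage() uses the default.
    static List<String> viaInterface() {
        List<String> received = new ArrayList<>();
        SketchListener<String, String> listener =
                (channel, message) -> received.add(channel + ":" + message);
        listener.message("news", "classic");
        listener.smessage("news", "sharded"); // delegated to message()
        return received;
    }

    public static void main(String[] args) {
        System.out.println("via adapter:   " + viaAdapter());
        System.out.println("via interface: " + viaInterface());
    }
}
```

In this model the adapter-based listener only ever sees the classic message, while the interface-based listener sees both. Whether removing the adapter overrides is acceptable in the real code base depends on callers that may rely on the current no-op behavior.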
I think this was missed when we introduced #2758, so it is missing functionality.
After discussing this with @a-TODO-rov I think there are two general ways to solve it:
- Implement another set of observeShardedMessage() methods, similar to observePatterns() and observeChannels()
- (recommended) deprecate existing observePatterns() and observeChannels() methods and implement another one that accepts a custom RedisPubSubAdapter that would allow the user to choose which methods to listen in the sink.
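A rough sketch of what the recommended option could look like from the caller's side, assuming an observe-style method that takes a factory for the listener. Everything here is hypothetical and simplified to plain Java: `SketchPubSubCallbacks`, `CustomAdapterObserveSketch`, `observe`, and the `deliver...` helpers are illustration names, and a `Consumer<String>` stands in for the reactive sink. The actual method name and signature would be decided in the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical callbacks with no-op defaults, standing in for a pub/sub adapter.
interface SketchPubSubCallbacks {

    default void message(String channel, String message) {
    }

    default void pmessage(String pattern, String channel, String message) {
    }

    default void smessage(String shardChannel, String message) {
    }
}

final class CustomAdapterObserveSketch {

    private final List<SketchPubSubCallbacks> listeners = new ArrayList<>();

    // Hypothetical observe method: the caller's factory decides which
    // callbacks are wired to the sink.
    void observe(Function<Consumer<String>, SketchPubSubCallbacks> adapterFactory,
            Consumer<String> sink) {
        listeners.add(adapterFactory.apply(sink));
    }

    // Simulates an incoming classic pub/sub message.
    void deliverMessage(String channel, String message) {
        listeners.forEach(l -> l.message(channel, message));
    }

    // Simulates an incoming sharded pub/sub message.
    void deliverShardedMessage(String shardChannel, String message) {
        listeners.forEach(l -> l.smessage(shardChannel, message));
    }

    // The caller opts in to sharded messages only.
    static List<String> demo() {
        List<String> received = new ArrayList<>();
        CustomAdapterObserveSketch connection = new CustomAdapterObserveSketch();
        connection.observe(sink -> new SketchPubSubCallbacks() {
            @Override
            public void smessage(String shardChannel, String message) {
                sink.accept(shardChannel + ":" + message);
            }
        }, received::add);
        connection.deliverMessage("news", "classic");
        connection.deliverShardedMessage("news", "sharded");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The design choice this models is that the library no longer decides which callbacks reach the sink: a caller who wants classic, pattern, and sharded messages in one stream overrides all three, and a caller who wants only one kind overrides only that one.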
Hi @tishun @a-TODO-rov !
(recommended) deprecate existing observePatterns() and observeChannels() methods and implement another one that accepts a custom RedisPubSubAdapter that would allow the user to choose which methods to listen in the sink.
Would it be okay if I created a PR following the approach you mentioned?
Thanks
Yes, @bandalgomsu feel free to contribute to this one :)