Reactive API observeChannels does not propagate sharded messages

Open drub0y opened this issue 1 month ago • 4 comments

Bug Report

The observeChannels() API does not propagate sharded pub/sub messages.

Current Behavior

I was switching some existing code from classic pub/sub to sharded pub/sub. After we switched to subscribing with ssubscribe, messages suddenly stopped flowing. It turns out that the RedisPubSubAdapter<> that observeChannels creates doesn't override the smessage method, so sharded messages are simply dropped on the floor.

Input Code

The original code boils down to simply doing:

connection.reactive().observeChannels()
 .map(cm -> {
    // ... do stuff w/message here ... 
 });

This still "works" in the sense that the API call succeeds, but the resulting stream never receives sharded messages.

Expected behavior/code

I would expect that sharded messages flow through this API as well for a consistent experience.

Environment

  • Lettuce version(s): 6.6.0.RELEASE
  • Redis version: 7.x (n/a here really)

Possible Solution

Here's the current implementation from main as of writing that creates a RedisPubSubAdapter<>:

https://github.com/redis/lettuce/blob/ec10c602890c7a10318e564335412b00cea75a82/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java#L98-L105

One option is to add an override of the smessage callback method on the RedisPubSubAdapter<> that is created there and tunnel those messages through to the sink:

@Override
public void smessage(K shardChannel, V message) {
    sink.next(new ChannelMessage<>(shardChannel, message));
}
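
To illustrate why the missing override matters, here is a minimal, self-contained sketch. It does not use Lettuce itself: Adapter is a hypothetical stand-in for an adapter whose callbacks are all no-ops, and a plain List plays the role of the sink. It only shows that a listener overriding message() alone silently discards anything delivered through smessage(), and that adding the override forwards it.

```java
import java.util.ArrayList;
import java.util.List;

public class ShardedDropSketch {

    // Hypothetical stand-in for an adapter with no-op callbacks (not the Lettuce class).
    static class Adapter<K, V> {
        public void message(K channel, V message) { }
        public void smessage(K shardChannel, V message) { }
    }

    // Listener shaped like the one described in the report: only message() reaches the sink.
    static Adapter<String, String> channelOnly(List<String> sink) {
        return new Adapter<String, String>() {
            @Override
            public void message(String channel, String message) {
                sink.add(channel + ":" + message);
            }
        };
    }

    // Same listener with the proposed smessage() override added.
    static Adapter<String, String> withSharded(List<String> sink) {
        return new Adapter<String, String>() {
            @Override
            public void message(String channel, String message) {
                sink.add(channel + ":" + message);
            }

            @Override
            public void smessage(String shardChannel, String message) {
                sink.add(shardChannel + ":" + message);
            }
        };
    }

    // Delivers one classic and one sharded message and returns what the sink saw.
    static List<String> deliver(boolean forwardSharded) {
        List<String> sink = new ArrayList<>();
        Adapter<String, String> listener = forwardSharded ? withSharded(sink) : channelOnly(sink);
        listener.message("news", "classic");
        listener.smessage("news", "sharded");
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(deliver(false)); // [news:classic]
        System.out.println(deliver(true));  // [news:classic, news:sharded]
    }
}
```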

Oct 30 '25 23:10 drub0y

Hello @drub0y. Yes, the solution you are proposing is a possible one. However, I can see that PubSubListener and PubSubClusterListener provide default implementations of those methods by delegating to their non-sharded counterparts. So I wonder whether it is necessary at all for the adapters (Standalone and Cluster) to override those methods in the first place. CC: @tishun @atakavci
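
A rough sketch of the point about default implementations, using hypothetical types rather than the real Lettuce classes: Listener assumes an interface whose smessage() defaults to calling message(), and NoOpAdapter assumes an adapter that overrides both callbacks with empty bodies. Under those assumptions, the adapter's empty override hides the interface default, which is why a sharded message never reaches message().

```java
import java.util.ArrayList;
import java.util.List;

public class DefaultDelegationSketch {

    // Hypothetical listener interface: the sharded callback delegates to the classic one by default.
    interface Listener<K, V> {
        void message(K channel, V message);

        default void smessage(K shardChannel, V message) {
            message(shardChannel, message);
        }
    }

    // Hypothetical adapter: overriding smessage() with a no-op hides the interface default.
    static class NoOpAdapter<K, V> implements Listener<K, V> {
        @Override
        public void message(K channel, V message) { }

        @Override
        public void smessage(K shardChannel, V message) { }
    }

    // Implementing the interface directly: smessage() falls through to message().
    static List<String> viaInterfaceDefault() {
        List<String> sink = new ArrayList<>();
        Listener<String, String> listener = new Listener<String, String>() {
            @Override
            public void message(String channel, String message) {
                sink.add(channel + ":" + message);
            }
        };
        listener.smessage("news", "sharded");
        return sink;
    }

    // Extending the adapter and overriding only message(): smessage() stays a no-op.
    static List<String> viaAdapter() {
        List<String> sink = new ArrayList<>();
        Listener<String, String> listener = new NoOpAdapter<String, String>() {
            @Override
            public void message(String channel, String message) {
                sink.add(channel + ":" + message);
            }
        };
        listener.smessage("news", "sharded");
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(viaInterfaceDefault()); // [news:sharded]
        System.out.println(viaAdapter());          // []
    }
}
```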

Oct 31 '25 10:10 a-TODO-rov

I think this was missed when we introduced #2758, so it is missing functionality.

After discussing this with @a-TODO-rov I think there are two general ways to solve it:

  1. Implement another set of observeShardedMessage() methods, similar to observePatterns() and observeChannels()
  2. (recommended) Deprecate the existing observePatterns() and observeChannels() methods and implement a new one that accepts a custom RedisPubSubAdapter, letting the user choose which callbacks are forwarded to the sink.
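
As a rough illustration of what option 2 could look like, here is a self-contained sketch. Everything in it is hypothetical: FakeConnection, its observe() method, and the factory signature are invented for the example and are not an existing or planned Lettuce API. The idea shown is only that the caller supplies the adapter and thereby decides which callbacks write to the sink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class CustomAdapterObserveSketch {

    // Hypothetical adapter with no-op callbacks (stand-in, not the Lettuce class).
    static class Adapter {
        public void message(String channel, String message) { }
        public void smessage(String shardChannel, String message) { }
    }

    // Hypothetical connection that dispatches published messages to registered adapters.
    static class FakeConnection {
        private final List<Adapter> listeners = new ArrayList<>();

        // Hypothetical observe(): the caller's factory wires the chosen callbacks to the sink.
        void observe(Function<Consumer<String>, Adapter> adapterFactory, Consumer<String> sink) {
            listeners.add(adapterFactory.apply(sink));
        }

        void publishClassic(String channel, String message) {
            for (Adapter a : listeners) a.message(channel, message);
        }

        void publishSharded(String shardChannel, String message) {
            for (Adapter a : listeners) a.smessage(shardChannel, message);
        }
    }

    // A caller that opts in to sharded messages only.
    static List<String> shardedOnly() {
        List<String> received = new ArrayList<>();
        FakeConnection connection = new FakeConnection();
        connection.observe(sink -> new Adapter() {
            @Override
            public void smessage(String shardChannel, String message) {
                sink.accept(shardChannel + ":" + message);
            }
        }, received::add);
        connection.publishClassic("news", "classic");
        connection.publishSharded("news", "sharded");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(shardedOnly()); // [news:sharded]
    }
}
```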

Oct 31 '25 12:10 tishun

Hi @tishun @a-TODO-rov !

(recommended) deprecate existing observePatterns() and observeChannels() methods and implement another one that accepts a custom RedisPubSubAdapter that would allow the user to choose which methods to listen in the sink.

Would it be okay if I created a PR following the approach you mentioned?

Thanks

Nov 01 '25 03:11 bandalgomsu

Yes, @bandalgomsu feel free to contribute to this one :)

Nov 01 '25 06:11 a-TODO-rov