go-redis

Sharded pubsub subscriber cannot get messages from all subscribed channels

Open thx123 opened this issue 1 year ago • 9 comments

Issue tracker is used for reporting bugs and discussing new features. Please use stackoverflow for supporting issues.

Expected Behavior

Sharded pubsub subscriber should get messages from all channels that it's subscribed to.

Current Behavior

Sharded pubsub subscriber only gets messages from the shard to which the first subscribed channel belongs.

Possible Solution

Have sharded pubsub subscriber listen to all shards to which its subscribed channels belong.

Steps to Reproduce

  1. Start Redis Server with cluster mode on.
  2. Run the Go script given below.
  3. Observe that the sharded subscriber only receives messages from a subset of the channels it has subscribed to.

Context (Environment)

This happens across different environments as long as a Redis cluster is used.

Detailed Description

Code to reproduce this bug:

package main

import (
    "context"
    "flag"
    "fmt"
    "os"

    "github.com/google/logger"
    "github.com/redis/go-redis/v9"
)

const logPath = "./ssub.log"

var verbose = flag.Bool("verbose", true, "print info level logs to stdout")

func main() {
    lf, err := os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0660)
    if err != nil {
        logger.Fatalf("Failed to open log file: %v", err)
    }
    defer lf.Close()
    defer logger.Init("SsubLogger", *verbose, true, lf).Close()
    logger.Info("")

    // Create a new RedisCluster client
    rdb := redis.NewClusterClient(&redis.ClusterOptions{
        Addrs: []string{
            "localhost:6379",
        },
    })

    // Ping the Redis Cluster to ensure connectivity
    if err := rdb.Ping(context.Background()).Err(); err != nil {
        logger.Infof("Could not connect to Redis Cluster: %v\n", err)
    }
    logger.Info("RedisCluster client is ready")

    pubsub := rdb.SSubscribe(context.Background())
    defer pubsub.Close()

    // Ssubscribe to ch 1 - 9
    for i := 1; i <= 9; i++ {
        ch := fmt.Sprintf("ch%d", i)

        pubsub.SSubscribe(context.Background(), ch)
        logger.Info("Subscribed to ", ch)

        rdb.SPublish(context.Background(), ch, "Hello")
    }

    chss, _ := rdb.PubSubShardChannels(context.Background(), "ch*").Result()
    logger.Info("Currently subscribed channels: ", chss)

    for {
        // logger.Info("Waiting for sharded Redis pub-sub messages")
        // ReceiveMessage will crash when SSubscribe is used.
        // msg, err := pubsub.ReceiveMessage(context.Background())
        // if err != nil {
        //  logger.Info(err)
        // }
        msg := <-pubsub.Channel()
        logger.Infof("Received %s %s", msg.Channel, msg.Payload)
    }
}

Output:

INFO : 2024/09/25 15:35:14.774884 ssub.go:24:
INFO : 2024/09/25 15:35:14.781693 ssub.go:37: RedisCluster client is ready
INFO : 2024/09/25 15:35:14.782690 ssub.go:47: Subscribed to ch1
INFO : 2024/09/25 15:35:14.783934 ssub.go:47: Subscribed to ch2
INFO : 2024/09/25 15:35:14.784396 ssub.go:47: Subscribed to ch3
INFO : 2024/09/25 15:35:14.784838 ssub.go:47: Subscribed to ch4
INFO : 2024/09/25 15:35:14.785478 ssub.go:47: Subscribed to ch5
INFO : 2024/09/25 15:35:14.785714 ssub.go:47: Subscribed to ch6
INFO : 2024/09/25 15:35:14.785889 ssub.go:47: Subscribed to ch7
INFO : 2024/09/25 15:35:14.786079 ssub.go:47: Subscribed to ch8
INFO : 2024/09/25 15:35:14.786246 ssub.go:47: Subscribed to ch9
INFO : 2024/09/25 15:35:14.786489 ssub.go:53: Currently subscribed channels: []
INFO : 2024/09/25 15:35:14.786744 ssub.go:63: Received ch1 Hello
INFO : 2024/09/25 15:35:14.786808 ssub.go:63: Received ch2 Hello
INFO : 2024/09/25 15:35:14.887868 ssub.go:63: Received ch5 Hello
INFO : 2024/09/25 15:35:14.888170 ssub.go:63: Received ch6 Hello
INFO : 2024/09/25 15:35:14.989075 ssub.go:63: Received ch9 Hello

Possible Implementation

thx123 avatar Sep 25 '24 22:09 thx123

Facing a similar issue with sharded pubsub.

akshaykhairmode avatar Mar 25 '25 15:03 akshaykhairmode

Same here.

@ndyakov any chance you can look into it?

ThomasAlxDmy avatar May 22 '25 03:05 ThomasAlxDmy

@ThomasAlxDmy, @akshaykhairmode, @thx123 is your assumption that SSubscribe should work with channels from different slots? Mine is that it should not, based on https://redis.io/docs/latest/commands/ssubscribe/

ndyakov avatar Jun 17 '25 12:06 ndyakov

@ndyakov Thanks for your reply. I am using the same Redis client to subscribe to multiple channels via separate SSubscribe calls, which complies with the https://redis.io/docs/latest/commands/ssubscribe/ doc you mentioned above. In particular:

"A client can subscribe to channels across different slots over separate SSUBSCRIBE call."

Could you suggest a way to use a Redis client to SSubscribe to multiple channels potentially from different slots? IIUC, Redis cluster client is designed such that a user doesn't need to be aware of the slotting information of the keys or sharded pubsub channels.

thx123 avatar Oct 31 '25 21:10 thx123

Based on the documentation, I would expect that I need to make multiple SSubscribe calls (one per slot, or, if I don't know the slots, one per channel to be safe) so that I have "separate connections". As far as I remember (not in front of a laptop at the moment), this should work.

ndyakov avatar Oct 31 '25 22:10 ndyakov

Yeah, that's basically what the code in the issue description does (multiple SSubscribe calls, one per channel). However, the Redis client only gets messages from the subset of channels that belong to one shard, and misses messages from all other channels.

thx123 avatar Oct 31 '25 23:10 thx123

@thx123 this is not what this code does. Each PubSub instance represents a single connection for handling pubsub. What this code tries to do is subscribe to different channels while reusing the same PubSub object. Try calling rdb.SSubscribe, which creates a new PubSub object, once for each channel.
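One way to picture the "one PubSub per channel" suggestion is a fan-in that merges several message streams into a single loop. The sketch below is only an illustration, not go-redis code: `fanIn` is a hypothetical helper, and plain string channels stand in for the streams so it runs without a cluster. With go-redis, the inputs would presumably be the values returned by `Channel()` on each PubSub created via `rdb.SSubscribe(ctx, ch)`, as used in the script above.

```go
package main

import (
	"fmt"
	"sync"
)

// fanIn (hypothetical helper) merges any number of receive-only channels
// into one output channel. The output is closed once every input is closed.
func fanIn[T any](sources ...<-chan T) <-chan T {
	out := make(chan T)
	var wg sync.WaitGroup
	for _, src := range sources {
		wg.Add(1)
		go func(c <-chan T) {
			defer wg.Done()
			for v := range c {
				out <- v // blocks until the consumer reads
			}
		}(src)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	// Stand-ins for two per-channel message streams.
	a := make(chan string, 1)
	b := make(chan string, 1)
	a <- "ch1 Hello"
	b <- "ch2 Hello"
	close(a)
	close(b)

	// A single receive loop sees messages from both streams.
	for msg := range fanIn[string](a, b) {
		fmt.Println("Received", msg)
	}
}
```

Note that this keeps one connection per PubSub, so it does not reduce the number of connections; it only lets a single loop consume all of them.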

ndyakov avatar Nov 04 '25 12:11 ndyakov

Hi! I’m hitting the same issue and feel a bit lost. I would really appreciate it if someone could help me understand the right approach.

"Try using rdb.SSubscribe, which creates a new PubSub object per channel."

I have many channels (hence Sharded Pub/Sub), so this would create too many connections. I thought about connecting only to each shard and computing a hash slot in the app to pick the shard. But shouldn’t the library handle that automatically?

UPDATE: This is my understanding of how it should work; please tell me where I am wrong:

For example,

  • I have a 3-node cluster.
  • On SSUBSCRIBE, I compute the hash slot for each key.
  • I group keys by shard based on the hash slot.
  • I send corresponding keys only to the corresponding shard via SSUBSCRIBE.
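The first two steps above can be sketched in plain Go. This is a minimal illustration, assuming the key-to-slot rule from the Redis Cluster specification (CRC16/XMODEM of the name modulo 16384, with `{...}` hash tags) also applies to shard channel names. The helpers `crc16`, `hashSlot` and `groupBySlot` are hypothetical names, not part of go-redis. Mapping a slot to the shard that owns it still requires the cluster topology, which is left out here.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

const slotCount = 16384 // number of hash slots in a Redis Cluster

// crc16 computes CRC16/XMODEM (polynomial 0x1021, initial value 0).
func crc16(data []byte) uint16 {
	var crc uint16
	for _, b := range data {
		crc ^= uint16(b) << 8
		for i := 0; i < 8; i++ {
			if crc&0x8000 != 0 {
				crc = (crc << 1) ^ 0x1021
			} else {
				crc <<= 1
			}
		}
	}
	return crc
}

// hashSlot returns the slot for a channel name. If the name contains a
// non-empty {tag}, only the tag is hashed.
func hashSlot(channel string) int {
	if start := strings.IndexByte(channel, '{'); start >= 0 {
		if end := strings.IndexByte(channel[start+1:], '}'); end > 0 {
			channel = channel[start+1 : start+1+end]
		}
	}
	return int(crc16([]byte(channel))) % slotCount
}

// groupBySlot buckets channel names by hash slot, preserving input order
// within each bucket.
func groupBySlot(channels []string) map[int][]string {
	groups := make(map[int][]string)
	for _, ch := range channels {
		slot := hashSlot(ch)
		groups[slot] = append(groups[slot], ch)
	}
	return groups
}

func main() {
	// Same channel names as the reproduction script: ch1 .. ch9.
	var channels []string
	for i := 1; i <= 9; i++ {
		channels = append(channels, fmt.Sprintf("ch%d", i))
	}

	groups := groupBySlot(channels)
	slots := make([]int, 0, len(groups))
	for s := range groups {
		slots = append(slots, s)
	}
	sort.Ints(slots)
	for _, s := range slots {
		fmt.Printf("slot %5d: %v\n", s, groups[s])
	}
}
```

Channels that share a `{tag}` land in the same slot, so naming channels with a common tag is one way to keep a group of them on a single shard.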

UPDATE2: It seems that, in my case, I have found a simple and effective approach. If I only need to subscribe to one client, I can use SSUBSCRIBE. But if I need a list of clients (let's say 50 or more), I will have to use regular SUBSCRIBE.

UPDATE3: It seems that SSUBSCRIBE and SUBSCRIBE work independently, so this is not a solution in my case.

Although it would be great if go-redis could handle this for me.

callmedenchick avatar Nov 18 '25 21:11 callmedenchick

+1. This works great for non-sharded pub/sub, where I am able to use a single Redis client to subscribe to many different channels, but for sharded pub/sub, this no longer works, especially in public clouds like AWS ElastiCache where there is a hard limit of 65K maxclients per cluster.

thx123 avatar Nov 18 '25 21:11 thx123