PSubscribe selects only one node in Cluster mode

Open dzvancuks opened this issue 2 years ago • 1 comments

PSubscribe subscribes on a single slot, and therefore on a single node. The keys that will match a pattern cannot be predicted beforehand, so the subscription should be made on all nodes.

Expected Behavior

Since PSubscribe creates a subscription based on a pattern, the resulting channel should receive changes from any cluster node.

Current Behavior

In cluster mode the client tries to pick a node by computing a slot from the pattern string itself. This can never work, because the pattern characters * and ? stand in for different characters in the actual key names, and those names hash to different slots.

https://github.com/redis/go-redis/blob/master/osscluster.go#L1663

Possible Solution

  1. The cluster-mode client should create a subscription on every master node.
  2. Aggregate the output of all connections into one channel.
  3. When the cluster is scaled or slots are resharded, the connections should be updated. In the end, the output channel should hide all cluster implementation details.

Steps to Reproduce

  1. Create a cluster.
  2. Turn on keyspace notifications by sending config set notify-keyspace-events KEA to every node.
  3. Create a PubSub channel by pattern:
ch := client.PSubscribe(context.TODO(), "__keyspace@0__:MyKeyPattern*").Channel()
  4. Set data:
Set MyKeyPattern123 123
Set MyKeyPattern456 456

The two keys are expected to land in different slots. Whether those slots are served by different nodes depends on your cluster setup.

Context (Environment)

The application tries to reduce traffic to the DB: it triggers a value update only when a key matching a specific pattern is added to or removed from the DB. This works with a regular single-node client but fails in cluster mode.

Detailed Description

Possible Implementation

dzvancuks, Nov 02 '23 16:11

There is a workaround for the cluster-mode PSubscribe issue using ForEachMaster. Its callback receives a regular client that holds a direct connection to each master node: https://github.com/redis/go-redis/blob/master/redis.go#L708 . This produces multiple channels, which are then aggregated into a single output channel.

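	var (
		mu       sync.Mutex // ForEachMaster runs the callback concurrently per node; needs "sync"
		channels []<-chan *redis.Message
	)
	err := buff.client.ForEachMaster(context.TODO(), func(ctx context.Context, client *redis.Client) error {
		// Subscribe directly on this master node through its regular client.
		ch := client.PSubscribe(ctx, "__keyspace@0__:MyKeyPattern*").Channel()
		mu.Lock()
		channels = append(channels, ch)
		mu.Unlock()
		return nil
	})
	if err != nil {
		return err // assumes the enclosing function returns an error
	}
	buff.pubSubCh = make(chan *redis.Message, len(channels)) // buffered to reduce blocking on write
	for _, ch := range channels {
		go func(c <-chan *redis.Message) {
			// Forward every message from this node into the shared output channel.
			for message := range c {
				buff.pubSubCh <- message
			}
		}(ch)
	}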

It would be nice if the Cluster client did this under the hood. As a go-redis user, I would assume this is how it works, given that it provides a PSubscribe interface: https://github.com/redis/go-redis/blob/master/osscluster.go#L1703

dzvancuks, Nov 03 '23 09:11