PubSub: another Receive() with hooks
We use short-lived pubsub channels. The typical case is:
- subscribe to a channel
- check a key in Redis or wait for a message on the channel
- unsubscribe from the channel
Sometimes this cycle is very short.
Problem
If using the first pattern from the docs via client.Receive(), I can't guarantee that a subsequent UNSUBSCRIBE call will be sent strictly after SUBSCRIBE. A possible situation:
- do client.Receive() with SUBSCRIBE in another goroutine
- do something, and when conditions are met decide to unsubscribe from the channel
- do client.Do() with the UNSUBSCRIBE command
- the goroutine with SUBSCRIBE wakes up too late, registers the subscribe handler in internal structures forever, and the handler never receives the unsubscribe message
If using the alternative pattern with a dedicated client, I have another problem: too many open connections, one connection per channel. We have hundreds or even thousands of channels concurrently, and no connection pool can handle this or ensure predictable behavior. The dedicated-client pattern I mean is sketched below.
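A rough sketch of that pattern, based on the documented SetPubSubHooks usage (channel name and handler body are placeholders). Every Dedicate() call pins its own connection, so one subscribed channel means one extra connection:

c, cancel := client.Dedicate()
defer cancel()

// wait receives an error when the hooks stop being effective,
// e.g. the connection is lost or cancel() is called
wait := c.SetPubSubHooks(rueidis.PubSubHooks{
	OnMessage: func(m rueidis.PubSubMessage) {
		fmt.Printf("received message: %+v\n", m)
	},
})
if err := c.Do(ctx, c.B().Ssubscribe().Channel("chan:{123}").Build()).Error(); err != nil {
	panic(err)
}
<-wait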
Reproducing the problem
package main

import (
	"context"
	"flag"
	"fmt"
	"time"

	"github.com/redis/rueidis"
)

func main() {
	var redisAddr = flag.String("redis-addr", "", "redis address")
	flag.Parse()

	client, err := rueidis.NewClient(rueidis.ClientOption{
		InitAddress:  []string{*redisAddr},
		DisableCache: true,
	})
	if err != nil {
		panic(err)
	}
	defer client.Close()

	ctx := context.Background()
	chName := "chan:{123}"
	subscribeDoneCh := make(chan struct{})

	go func() {
		// emulate goroutine startup lag: in the real case it is only a few
		// nanoseconds, but it is exaggerated here to make the race reproducible
		time.Sleep(1000 * time.Millisecond)
		fmt.Printf("Initiate subscribe to %s\n", chName)
		err := client.Receive(ctx, client.B().Ssubscribe().Channel(chName).Build(), func(msg rueidis.PubSubMessage) {
			fmt.Printf("received message: %+v\n", msg)
		})
		fmt.Println("Subscribe done:", err)
		subscribeDoneCh <- struct{}{}
	}()

	fmt.Printf("Unsubscribe from %s\n", chName)
	err = client.Do(ctx, client.B().Sunsubscribe().Channel(chName).Build()).NonRedisError()
	if err != nil {
		panic(err)
	}
	err = client.Do(ctx, client.B().Spublish().Channel(chName).Message("test").Build()).NonRedisError()
	if err != nil {
		panic(err)
	}

	fmt.Println("Wait subscriber is done")
	<-subscribeDoneCh
}
This code gets stuck on
<-subscribeDoneCh
Solutions
I think client.Receive() could have an option to wait for subscribe success and post that knowledge somewhere around here, pipe.go#L691:
if ch, cancel := sb.Subscribe(args); ch != nil {
	defer cancel()
	if err := p.Do(ctx, subscribe).Error(); err != nil {
		return err
	}
	// HERE
	subscribeDoneChan <- true
Or another option: a new version of client.Receive() with an additional callback function that receives a PubSubSubscription when the (s|p)subscribe response arrives from Redis. For example:
subscribeSuccessCh := make(chan bool)

go func() {
	err := client.ReceiveWithConfirmation(
		ctx,
		client.B().Ssubscribe().Channel(chName).Build(),
		func(msg rueidis.PubSubMessage) {
			fmt.Printf("received message: %+v\n", msg)
		},
		func(sub rueidis.PubSubSubscription) {
			if sub.Kind == "ssubscribe" {
				subscribeSuccessCh <- true
			}
		},
	)
	fmt.Println("Subscribe done:", err)
	subscribeDoneCh <- struct{}{}
}()

<-subscribeSuccessCh
// do something useful

fmt.Printf("Unsubscribe from %s\n", chName)
err = client.Do(ctx, client.B().Sunsubscribe().Channel(chName).Build()).NonRedisError()
if err != nil {
	panic(err)
}
I will try to make a PR with a new additional method.
Hi @DriverX,
Thank you for raising the issue. I do think the current Receive lacks a confirmation mechanism. However, given that Receive retries the connection automatically, the confirmation mechanism is hard to design. Callbacks are a good idea. We can re-use the PubSubHooks struct for passing callbacks.
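For reference, the PubSubHooks struct mentioned above looks roughly like this (simplified sketch of the existing rueidis definition):

// PubSubHooks can be registered on a dedicated connection via SetPubSubHooks.
type PubSubHooks struct {
	// OnMessage is called when a pubsub message is received.
	OnMessage func(m PubSubMessage)
	// OnSubscription is called when a subscribe/unsubscribe
	// confirmation is received from the server.
	OnSubscription func(s PubSubSubscription)
}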
Does the possible memory leak in the title refer to the too many connections issue?
> Does the possible memory leak in the title refer to the too many connections issue?
I wrote about the memory leak based on this point:
> the goroutine with SUBSCRIBE wakes up too late, registers the subscribe handler in internal structures forever, and the handler never receives the unsubscribe message
UNSUBSCRIBE is done before the SUBSCRIBE command is sent. I think in this case client.Receive() stores the subscriber's internal data and callback function for the whole lifetime of the connection. This is a potential memory leak on both sides: Redis and the application.
And again, for a better understanding of my case =) In the real case this matters to us and may lead to a race condition. For example:
I have 2 apps:
- app_executor - receives and executes a task, stores the result in Redis, and publishes a notification via pubsub
- app_runner - sends a task to app_executor and waits for the notification from pubsub
And these apps work with 2 Redis clusters:
- redis_store - for storing data
- redis_pubsub - for notifications
Now some pseudocode
executor
task = queue.get()
result = executor.execute(task)
redis_store.set("task_result:" + task.id, result)
redis_pubsub.publish("task_result_channel:" + task.id, "1")
runner
queue.put(task)
// channel returns only after a successful `SUBSCRIBE` response from Redis, otherwise it throws an exception
channel = redis_pubsub.subscribe("task_result_channel:" + task.id)
result = redis_store.get("task_result:" + task.id)
if result == null {
	msg = channel.wait()
	result = redis_store.get("task_result:" + task.id)
}
// send `UNSUBSCRIBE` to redis_pubsub
channel.unsubscribe()
This code is race condition safe:
- executor: store result -> publish to channel
- runner: subscribe -> check store -> wait channel -> get stored result
But imagine if redis_pubsub.subscribe returns the channel before the SUBSCRIBE call is actually done, which is what client.Receive() does. The sequence of the race:
- runner: channel = redis_pubsub.subscribe("task_result_channel:" + task.id)
- runner: result = redis_store.get("task_result:" + task.id) - returns null
- executor: redis_store.set("task_result:" + task.id, result)
- executor: redis_pubsub.publish("task_result_channel:" + task.id, "1") - the message disappears because there are no subscribers for the channel at that moment
- runner: finally the SUBSCRIBE command is sent to Redis and we receive the subscribe response
- runner: msg = channel.wait() - waits forever :(
> We can re-use the PubSubHooks struct for passing callbacks.
Sounds good!
Hi @DriverX,
Thank you for your detailed explanation. If you are using Redis > 6, I think there is a simpler solution by leveraging the invalidation notifications of client-side caching, with no pubsub cluster needed:
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/redis/rueidis"
)

func main() {
	var mu sync.Mutex
	channels := make(map[string]chan struct{})

	client, _ := rueidis.NewClient(rueidis.ClientOption{
		InitAddress: []string{"127.0.0.1:6379"}, // replace with your Redis address
		OnInvalidations: func(messages []rueidis.RedisMessage) {
			mu.Lock()
			defer mu.Unlock()
			for _, message := range messages {
				key, _ := message.ToString()
				if ch, ok := channels[key]; ok {
					delete(channels, key)
					close(ch)
				}
			}
		},
	})

	key := "task_result:ooxx"

RETRY:
	ch := make(chan struct{})
	mu.Lock()
	channels[key] = ch
	mu.Unlock()

	result, err := client.DoCache(context.Background(), client.B().Get().Key(key).Cache(), time.Second).ToString()
	if rueidis.IsRedisNil(err) {
		<-ch
		result, err = client.DoCache(context.Background(), client.B().Get().Key(key).Cache(), time.Second).ToString()
	} else {
		mu.Lock()
		delete(channels, key)
		mu.Unlock()
	}
	if rueidis.IsRedisNil(err) {
		goto RETRY
	}
	if err != nil {
		panic(err)
	}
	fmt.Println(result)
}
> If you are using Redis > 6, I think there is a simpler solution by leveraging the invalidation notifications of client-side caching, with no pubsub cluster needed:
We use Redis 7.0 in production, and this is a very interesting suggestion; I will think about it. But for now I would like to keep the pubsub scheme, because a big piece of our infrastructure is based on this pattern and, unfortunately, not all clients support client-side caching.
I ran into a similar problem when trying to use Redis pubsub to implement the NATS request feature from nats.io.
E.g. service A wants to send a message to service B and then wait for service B's reply.
In service B: step 1, subscribe to a channel for receiving messages from service A; step 2, wait for a message, and after the message is received and processed, send a reply message to service A on another channel.
In service A: step 1, subscribe to a channel for receiving the reply message from service B, and process the reply message if received; step 2, send a message to service B.
In service A, I now need to ensure that step 2 happens after step 1. But client.Receive is blocking, so I cannot guarantee that.
Can we pass a function to client.Receive so that it runs the function after it has subscribed to the channel?
Hi @TommyLeng,
We don’t have an ETA on this feature, but I do hope we add a ReceiveHooks or extend the existing Receive to accept a second callback that is triggered whenever a subscription is confirmed. If we do that, one thing to be aware of is that the confirmation callback will be called more than once, due to the auto-recovery mechanism of Receive in the case of disconnection.
Talking about the case of disconnection, it is also the reason why using Redis PubSub to implement asynchronous RPC is generally not recommended: messages can be lost in that case. A recommended way is to use BLPOP and LPUSH instead.
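A minimal sketch of that BLPOP/LPUSH request-reply flow (the key names, timeout, and requestReply helper are only illustrative assumptions, not an existing API):

// requestReply pushes a task onto a shared queue and then blocks on a
// per-task reply list until the responder LPUSHes the result there.
func requestReply(ctx context.Context, client rueidis.Client, taskID, payload string) (string, error) {
	replyKey := "task_reply:" + taskID // hypothetical per-task reply list
	if err := client.Do(ctx, client.B().Lpush().Key("task_queue").Element(payload).Build()).Error(); err != nil {
		return "", err
	}
	// BLPOP blocks for up to 30 seconds; on timeout it returns a redis nil error.
	// Unlike a pubsub message, the reply is not lost if the requester connects late.
	reply, err := client.Do(ctx, client.B().Blpop().Key(replyKey).Timeout(30).Build()).AsStrSlice()
	if err != nil {
		return "", err
	}
	return reply[1], nil // reply[0] is the key name, reply[1] is the value
}

// The responder side simply pushes the result when the task is done:
// client.Do(ctx, client.B().Lpush().Key("task_reply:"+taskID).Element(result).Build())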
Hi @DriverX, are you still tackling this issue? If not, I’d be happy to jump in.
Hi @DriverX, @TommyLeng,
In order not to break the current interface, I plan to introduce rueidis.WithOnSubscriptionHook, allowing you to carry a confirmation hook with the ctx.
Example Usage
Use rueidis.WithOnSubscriptionHook when you need to observe subscribe / unsubscribe confirmations that the server sends. The hook can be triggered multiple times because the client may automatically reconnect and resubscribe.
ctx := rueidis.WithOnSubscriptionHook(context.Background(), func(s rueidis.PubSubSubscription) {
	// This hook runs in the pipeline goroutine. If you need to perform
	// heavy work or invoke additional commands, do it in another
	// goroutine to avoid blocking the pipeline, for example:
	// go func() {
	//     // long work or client.Do(...)
	// }()
	fmt.Printf("%s %s (count %d)\n", s.Kind, s.Channel, s.Count)
})
err := client.Receive(ctx, client.B().Subscribe().Channel("news").Build(), func(m rueidis.PubSubMessage) {
	// ...
})
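With this, the subscribe-then-unsubscribe ordering from the original report could, for example, be guarded like the following sketch (the confirmed channel, sync.Once, and handleMsg are my own placeholders):

confirmed := make(chan struct{})
var once sync.Once
ctx := rueidis.WithOnSubscriptionHook(context.Background(), func(s rueidis.PubSubSubscription) {
	if s.Kind == "ssubscribe" {
		once.Do(func() { close(confirmed) }) // the hook may fire again after reconnects
	}
})
go func() {
	// handleMsg is a placeholder message callback
	_ = client.Receive(ctx, client.B().Ssubscribe().Channel(chName).Build(), handleMsg)
}()
<-confirmed
// the subscription is now confirmed by the server, so a subsequent
// SUNSUBSCRIBE is guaranteed to be sent after the SSUBSCRIBE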
See https://github.com/redis/rueidis/pull/846 for more details. Any feedback is appreciated.