redis-plus-plus
redis-plus-plus copied to clipboard
Create Subscriber with a connection in connection pool
So far, we always create a Subscriber
with a new connection. It might be useful to create a Subscriber
with a connection in connection pool, e.g. create a new connection is a cost.
However, it's not easy to implement. Because when Subscriber
is destructed, we need to release the connection back to the connection pool, and before we return the connection, we need to unsubscribe all channels and patterns that subscribed before, and wait for all responses from Redis. In this case, we might also need to save channels and patterns that we subscribed before, so that we can count how many unsubscribed messages we need to wait.
In a word, we need to changed lots of stuff to implement this feature.
sub = r.subscriber(); // this create a new connection
sub.psubscribe("abc-*");
while(true) {
try {
sub.consume();
} catch (const TimeoutError &e) { // catch socket timeout
continue;
} catch (const ReplyError &e) {
logger->warn("sub.consume() ReplyError: " + string(e.what()));
} catch (const std::exception &e) {
logger->error("sub.consume(): " +
string(e.what()));
try {
sub = r.subscriber(); // this create a new connection
sub.psubscribe("abc-*");
} catch (const std::exception &e) {
logger->error("failed to re-sub: " + string(e.what()));
}
}
}
When I'm running the above code, whenever the publish side throughput is quite high, the consumer will throw Failed to get reply: Server closed the connection
(this is reproducible). And based on the docs, I need to destroy the current subscriber
and create a new one.
However in my above code, after the new sub
is created, it never received the message anymore. May I know what's the right way to do it? thanks!
@qd452 It seems that your code is not complete, you missed the callback part, i.e. you need to register a callback for subscriber, before calling psubscribe
. I modified your code, and you can take a try:
sub = r.subscriber(); // this create a new connection
// HERE, WE NEED TO REGISTER A CALLBACK, AND THE CALLBACK WILL BE CALLED WHEN CONSUMING MESSAGES
sub.on__pmessage([](std::string pattern, std::string channel, std::string msg) {
cout << msg << endl;
});
sub.psubscribe("abc-*");
while(true) {
try {
sub.consume();
} catch (const TimeoutError &e) { // catch socket timeout
continue;
} catch (const ReplyError &e) {
logger->warn("sub.consume() ReplyError: " + string(e.what()));
} catch (const std::exception &e) {
logger->error("sub.consume(): " +
string(e.what()));
try {
sub = r.subscriber(); // this create a new connection
// SIMILARLY, WHEN YOU CRAETE A NEW SUBSCRIBER, YOU NEED TO REGISTER CALLBACKS BEFORE SUBSCRIBING
sub.on__pmessage([](std::string pattern, std::string channel, std::string msg) {
cout << msg << endl;
});
sub.psubscribe("abc-*");
} catch (const std::exception &e) {
logger->error("failed to re-sub: " + string(e.what()));
}
}
}
Regards
@qd452 It seems that your code is not complete, you missed the callback part, i.e. you need to register a callback for subscriber, before calling
psubscribe
. I modified your code, and you can take a try:sub = r.subscriber(); // this create a new connection // HERE, WE NEED TO REGISTER A CALLBACK, AND THE CALLBACK WILL BE CALLED WHEN CONSUMING MESSAGES sub.on__pmessage([](std::string pattern, std::string channel, std::string msg) { cout << msg << endl; }); sub.psubscribe("abc-*"); while(true) { try { sub.consume(); } catch (const TimeoutError &e) { // catch socket timeout continue; } catch (const ReplyError &e) { logger->warn("sub.consume() ReplyError: " + string(e.what())); } catch (const std::exception &e) { logger->error("sub.consume(): " + string(e.what())); try { sub = r.subscriber(); // this create a new connection // SIMILARLY, WHEN YOU CRAETE A NEW SUBSCRIBER, YOU NEED TO REGISTER CALLBACKS BEFORE SUBSCRIBING sub.on__pmessage([](std::string pattern, std::string channel, std::string msg) { cout << msg << endl; }); sub.psubscribe("abc-*"); } catch (const std::exception &e) { logger->error("failed to re-sub: " + string(e.what())); } } }
Regards
thanks for the reply, I tested and works now!