redis-plus-plus

Create Subscriber with a connection in connection pool

Open sewenew opened this issue 3 years ago • 3 comments

So far, we always create a Subscriber with a new connection. It might be useful to create a Subscriber with a connection from the connection pool, since creating a new connection has a cost.

However, it's not easy to implement. When the Subscriber is destructed, we need to release the connection back to the connection pool. Before we return the connection, we need to unsubscribe from all channels and patterns that were subscribed to, and wait for all the responses from Redis. To do that, we might also need to save the channels and patterns we subscribed to, so that we can count how many unsubscribe messages we need to wait for.

In short, we need to change a lot of code to implement this feature.
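
To illustrate the bookkeeping described above, here is a minimal sketch. SubscriptionTracker and its members are hypothetical names and are not part of redis-plus-plus. It only records what has been subscribed to and reports how many unsubscribe confirmations would have to be read before the connection could go back to the pool, assuming Redis sends one confirmation per channel or pattern that gets unsubscribed.

```cpp
#include <cstddef>
#include <set>
#include <string>

// Hypothetical helper, not part of redis-plus-plus: remembers the channels
// and patterns a subscriber has subscribed to.
class SubscriptionTracker {
public:
    // Call these alongside the matching Subscriber calls.
    void subscribe(const std::string &channel) { channels_.insert(channel); }
    void psubscribe(const std::string &pattern) { patterns_.insert(pattern); }
    void unsubscribe(const std::string &channel) { channels_.erase(channel); }
    void punsubscribe(const std::string &pattern) { patterns_.erase(pattern); }

    // Number of unsubscribe confirmations to wait for when everything
    // still tracked is unsubscribed: one per channel plus one per pattern.
    std::size_t pending_unsubscribe_replies() const {
        return channels_.size() + patterns_.size();
    }

    // True when nothing is subscribed, i.e. no cleanup traffic is needed.
    bool empty() const { return channels_.empty() && patterns_.empty(); }

private:
    std::set<std::string> channels_;  // sets ignore duplicate subscriptions
    std::set<std::string> patterns_;
};
```

A pooled Subscriber could consult pending_unsubscribe_replies() in its cleanup path and consume that many replies before releasing the connection. When empty() is true, the sketch assumes no unsubscribe command is sent at all, because unsubscribing with nothing subscribed still produces a reply that this count does not cover.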

sewenew avatar Aug 17 '20 15:08 sewenew

sub = r.subscriber(); // this creates a new connection
sub.psubscribe("abc-*");

while (true) {
    try {
        sub.consume();
    } catch (const TimeoutError &e) {  // catch socket timeout
        continue;
    } catch (const ReplyError &e) {
        logger->warn("sub.consume() ReplyError: " + string(e.what()));
    } catch (const std::exception &e) {
        logger->error("sub.consume(): " + string(e.what()));
        try {
            sub = r.subscriber(); // this creates a new connection
            sub.psubscribe("abc-*");
        } catch (const std::exception &e) {
            logger->error("failed to re-sub: " + string(e.what()));
        }
    }
}

When I run the code above and the publisher's throughput is high, the consumer throws "Failed to get reply: Server closed the connection" (this is reproducible). Based on the docs, I need to destroy the current subscriber and create a new one.

However, in the code above, the new subscriber never receives any messages after it is created. What's the right way to do this? Thanks!

qd452 avatar Oct 15 '20 07:10 qd452

@qd452 Your code seems to be incomplete: it's missing the callback part. You need to register a callback on the subscriber before calling psubscribe. I've modified your code; please give it a try:

sub = r.subscriber(); // this creates a new connection

// HERE, WE NEED TO REGISTER A CALLBACK, AND THE CALLBACK WILL BE CALLED WHEN CONSUMING MESSAGES
sub.on_pmessage([](std::string pattern, std::string channel, std::string msg) {
    cout << msg << endl;
});

sub.psubscribe("abc-*");

while (true) {
    try {
        sub.consume();
    } catch (const TimeoutError &e) {  // catch socket timeout
        continue;
    } catch (const ReplyError &e) {
        logger->warn("sub.consume() ReplyError: " + string(e.what()));
    } catch (const std::exception &e) {
        logger->error("sub.consume(): " + string(e.what()));
        try {
            sub = r.subscriber(); // this creates a new connection

            // SIMILARLY, WHEN YOU CREATE A NEW SUBSCRIBER, YOU NEED TO REGISTER CALLBACKS BEFORE SUBSCRIBING
            sub.on_pmessage([](std::string pattern, std::string channel, std::string msg) {
                cout << msg << endl;
            });

            sub.psubscribe("abc-*");
        } catch (const std::exception &e) {
            logger->error("failed to re-sub: " + string(e.what()));
        }
    }
}

Regards

sewenew avatar Oct 15 '20 15:10 sewenew

Thanks for the reply. I tested it and it works now!

qd452 avatar Oct 30 '20 06:10 qd452