
[Bug] Consumer might not subscribe the topic after keep alive timer failed

Open BewareMyPower opened this issue 1 year ago • 1 comments

Search before asking

  • [X] I searched in the issues and found nothing similar.

Version

Based on 35bf161ba25c9ea073b730e3dcdaa50c30703bcb

Minimal reproduce step

Apply the following patch, then run SampleConsumer against a Pulsar standalone that has a partitioned topic named "output" with 1 partition.

diff --git a/examples/SampleConsumer.cc b/examples/SampleConsumer.cc
index bbf210d..19734c0 100644
--- a/examples/SampleConsumer.cc
+++ b/examples/SampleConsumer.cc
@@ -18,7 +18,10 @@
  */
 #include <pulsar/Client.h>
 
+#include <atomic>
+#include <chrono>
 #include <iostream>
+#include <thread>
 
 #include "lib/LogUtils.h"
 
@@ -30,20 +33,25 @@ int main() {
     Client client("pulsar://localhost:6650");
 
     Consumer consumer;
-    Result result = client.subscribe("persistent://public/default/my-topic", "consumer-1", consumer);
-    if (result != ResultOk) {
-        LOG_ERROR("Failed to subscribe: " << result);
-        return -1;
+    std::atomic_int n{100};
+    {
+        ConsumerConfiguration conf;
+        conf.setMessageListener([&n](Consumer& consumer, Message msg) {
+            n--;
+            LOG_INFO("Received " << msg.getDataAsString() << " from " << msg.getMessageId());
+            consumer.acknowledge(msg);
+        });
+        auto result = client.subscribe("output", "sub", conf, consumer);
+        if (result != ResultOk) {
+            LOG_ERROR("Error subscribing: " << result);
+            return -1;
+        }
     }
 
-    Message msg;
-
-    while (true) {
-        consumer.receive(msg);
-        LOG_INFO("Received: " << msg << "  with payload '" << msg.getDataAsString() << "'");
-
-        consumer.acknowledge(msg);
+    while (n > 0) {
+        std::this_thread::sleep_for(std::chrono::seconds(1));
     }
+    consumer.close();
 
     client.close();
 }
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 9b78540..32a453a 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -1226,30 +1226,7 @@ void ClientConnection::handleKeepAliveTimeout() {
         return;
     }
 
-    if (havePendingPingRequest_) {
-        LOG_WARN(cnxString_ << "Forcing connection to close after keep-alive timeout");
-        close(ResultDisconnected);
-    } else {
-        // Send keep alive probe to peer
-        LOG_DEBUG(cnxString_ << "Sending ping message");
-        havePendingPingRequest_ = true;
-        sendCommand(Commands::newPing());
-
-        // If the close operation has already called the keepAliveTimer_.reset() then the use_count will
-        // be zero And we do not attempt to dereference the pointer.
-        Lock lock(mutex_);
-        if (keepAliveTimer_) {
-            keepAliveTimer_->expires_from_now(std::chrono::seconds(KeepAliveIntervalInSeconds));
-            auto weakSelf = weak_from_this();
-            keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) {
-                auto self = weakSelf.lock();
-                if (self) {
-                    self->handleKeepAliveTimeout();
-                }
-            });
-        }
-        lock.unlock();
-    }
+    close(ResultDisconnected);
 }
 
 void ClientConnection::handleConsumerStatsTimeout(const ASIO_ERROR& ec,

The patch above simulates a keep-alive timer failure: handleKeepAliveTimeout now closes the connection unconditionally instead of sending a ping first.
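To make the effect of the ClientConnection.cc change concrete, here is a minimal, self-contained sketch of the decision taken when the keep-alive timer fires, before and after the patch. It is a simplified model, not the client's real code: KeepAliveState, onTimeoutOriginal, onTimeoutPatched and expiriesUntilClose are hypothetical names, and timers, locking and network I/O are left out.

```cpp
// Simplified, hypothetical model of the keep-alive decision.
// It only mirrors the branch structure visible in the diff.
enum class Action { SendPing, CloseConnection };

struct KeepAliveState {
    bool havePendingPing = false;  // stands in for havePendingPingRequest_
};

// Original logic: send a ping on the first expiry and close only if
// that ping is still unanswered when the timer fires again.
constexpr Action onTimeoutOriginal(KeepAliveState& state) {
    if (state.havePendingPing) {
        return Action::CloseConnection;
    }
    state.havePendingPing = true;
    return Action::SendPing;
}

// Patched logic: close as soon as the timer fires, without pinging.
constexpr Action onTimeoutPatched(KeepAliveState&) {
    return Action::CloseConnection;
}

// Counts timer expiries until the connection is closed, assuming no
// pong ever clears the pending flag (an assumption of this model).
constexpr int expiriesUntilClose(bool patched) {
    KeepAliveState state{};
    for (int expiry = 1; expiry <= 10; ++expiry) {
        const Action action = patched ? onTimeoutPatched(state) : onTimeoutOriginal(state);
        if (action == Action::CloseConnection) {
            return expiry;
        }
    }
    return -1;  // not reached in this model
}
```

In this model, expiriesUntilClose(false) yields 2 and expiriesUntilClose(true) yields 1. The patched client therefore drops a healthy connection on the first timer expiry, which makes the disconnect path easy to trigger on demand.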

Then, start a Pulsar standalone 3.3.0 and create a partitioned topic named "output":

./bin/pulsar-admin topics create-partitioned-topic output -p 1

Then start SampleConsumer. The logs show that a multi-topics consumer is created:

2024-07-31 16:29:26.540 INFO  [0x16bb53000] MultiTopicsConsumerImpl:308 | Successfully Subscribed to a single partition of topic in TopicsConsumer. Partitions need to create : 0
2024-07-31 16:29:26.540 INFO  [0x16bb53000] MultiTopicsConsumerImpl:150 | Successfully Subscribed to Topics

Then, wait for 30 seconds until close is called in handleKeepAliveTimeout.

What did you expect to see?

The consumer should reconnect successfully.

What did you see instead?

Client outputs:

2024-07-31 16:29:56.207 INFO  [0x16bb53000] ClientConnection:1299 | [127.0.0.1:56207 -> 127.0.0.1:6650] Connection disconnected (refCnt: 3)
2024-07-31 16:29:56.207 INFO  [0x16bb53000] ConnectionPool:141 | Remove connection for pulsar://localhost:6650-pulsar://localhost:6650-0
2024-07-31 16:29:56.207 INFO  [0x16bb53000] HandlerBase:178 | [persistent://public/default/output-partition-0, sub, 0] Schedule reconnection in 0.1 s
2024-07-31 16:29:56.217 INFO  [0x16bb53000] ClientConnection:277 | [127.0.0.1:56207 -> 127.0.0.1:6650] Destroyed connection to pulsar://localhost:6650-0
2024-07-31 16:30:26.544 INFO  [0x16bc6b000] ClientConnection:188 | [<none> -> pulsar://localhost:6650] Create ClientConnection, timeout=10000
2024-07-31 16:30:26.545 INFO  [0x16bc6b000] ConnectionPool:124 | Created connection for pulsar://localhost:6650-pulsar://localhost:6650-0
2024-07-31 16:30:26.553 INFO  [0x16bb53000] ClientConnection:405 | [127.0.0.1:56218 -> 127.0.0.1:6650] Connected to broker

The reconnection was scheduled at 16:29:56 with a delay of 0.1 s. However, the new connection was not created until 16:30:26, about 30 seconds later.
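The size of that gap can be checked directly from the two client log timestamps. The snippet below is only an arithmetic sketch; millisOfDay and the constant names are hypothetical helpers, and the values are copied from the "Schedule reconnection in 0.1 s" and "Create ClientConnection" log lines above.

```cpp
// Converts a time of day to milliseconds since midnight.
constexpr long long millisOfDay(int hours, int minutes, int seconds, int millis) {
    return ((hours * 60LL + minutes) * 60LL + seconds) * 1000LL + millis;
}

// 16:29:56.207  HandlerBase:178       "Schedule reconnection in 0.1 s"
constexpr long long scheduledAtMs = millisOfDay(16, 29, 56, 207);
// 16:30:26.544  ClientConnection:188  "Create ClientConnection"
constexpr long long createdAtMs = millisOfDay(16, 30, 26, 544);

constexpr long long expectedDelayMs = 100;                         // the announced 0.1 s
constexpr long long actualDelayMs = createdAtMs - scheduledAtMs;   // 30337 ms
constexpr long long extraDelayMs = actualDelayMs - expectedDelayMs;  // 30237 ms
```

The connection attempt therefore happened roughly 30.2 seconds later than the scheduled 0.1 s delay would suggest.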

Here are the broker side logs:

2024-07-31T16:29:56,208+0800 [pulsar-io-28-3] INFO  org.apache.pulsar.broker.service.ServerCnx - Closed connection from /127.0.0.1:56207
2024-07-31T16:29:56,212+0800 [pulsar-io-28-3] INFO  org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=PersistentSubscription{topic=persistent://public/default/output-partition-0, name=sub}, consumerId=0, consumerName=47e3bc11dc, address=[id: 0x5c806b44, L:/127.0.0.1:6650 ! R:/127.0.0.1:56207] [SR:127.0.0.1, state:Connected]}
2024-07-31T16:30:26,562+0800 [pulsar-io-28-14] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:56218] connected with clientVersion=Pulsar-CPP-v3.6.0-pre, clientProtocolVersion=20, proxyVersion=null

Worse, although the client reconnected at 16:30:26, the consumer did not send any Subscribe request. The topic stats show no consumers on the subscription:

  "subscriptions" : {
    "sub" : {
      "type" : "Exclusive",
      "consumers" : [ ],

Anything else?

Unloading the topic does not fix this issue. It can even change the subscription type to "None":

    "sub" : {
      "type" : "None",

Are you willing to submit a PR?

  • [X] I'm willing to submit a PR!

BewareMyPower, Jul 19 '24 12:07