pulsar-client-cpp
[Bug] Consumer might not subscribe to the topic after the keep-alive timer fails
Search before asking
- [X] I searched in the issues and found nothing similar.
Version
Based on 35bf161ba25c9ea073b730e3dcdaa50c30703bcb
Minimal reproduce step
Apply the following patch, then run the SampleConsumer example against a standalone broker with a partitioned topic named "output" that has 1 partition.
diff --git a/examples/SampleConsumer.cc b/examples/SampleConsumer.cc
index bbf210d..19734c0 100644
--- a/examples/SampleConsumer.cc
+++ b/examples/SampleConsumer.cc
@@ -18,7 +18,10 @@
*/
#include <pulsar/Client.h>
+#include <atomic>
+#include <chrono>
#include <iostream>
+#include <thread>
#include "lib/LogUtils.h"
@@ -30,20 +33,25 @@ int main() {
Client client("pulsar://localhost:6650");
Consumer consumer;
- Result result = client.subscribe("persistent://public/default/my-topic", "consumer-1", consumer);
- if (result != ResultOk) {
- LOG_ERROR("Failed to subscribe: " << result);
- return -1;
+ std::atomic_int n{100};
+ {
+ ConsumerConfiguration conf;
+ conf.setMessageListener([&n](Consumer& consumer, Message msg) {
+ n--;
+ LOG_INFO("Received " << msg.getDataAsString() << " from " << msg.getMessageId());
+ consumer.acknowledge(msg);
+ });
+ auto result = client.subscribe("output", "sub", conf, consumer);
+ if (result != ResultOk) {
+ LOG_ERROR("Error subscribing: " << result);
+ return -1;
+ }
}
- Message msg;
-
- while (true) {
- consumer.receive(msg);
- LOG_INFO("Received: " << msg << " with payload '" << msg.getDataAsString() << "'");
-
- consumer.acknowledge(msg);
+ while (n > 0) {
+ std::this_thread::sleep_for(std::chrono::seconds(1));
}
+ consumer.close();
client.close();
}
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 9b78540..32a453a 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -1226,30 +1226,7 @@ void ClientConnection::handleKeepAliveTimeout() {
return;
}
- if (havePendingPingRequest_) {
- LOG_WARN(cnxString_ << "Forcing connection to close after keep-alive timeout");
- close(ResultDisconnected);
- } else {
- // Send keep alive probe to peer
- LOG_DEBUG(cnxString_ << "Sending ping message");
- havePendingPingRequest_ = true;
- sendCommand(Commands::newPing());
-
- // If the close operation has already called the keepAliveTimer_.reset() then the use_count will
- // be zero And we do not attempt to dereference the pointer.
- Lock lock(mutex_);
- if (keepAliveTimer_) {
- keepAliveTimer_->expires_from_now(std::chrono::seconds(KeepAliveIntervalInSeconds));
- auto weakSelf = weak_from_this();
- keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) {
- auto self = weakSelf.lock();
- if (self) {
- self->handleKeepAliveTimeout();
- }
- });
- }
- lock.unlock();
- }
+ close(ResultDisconnected);
}
void ClientConnection::handleConsumerStatsTimeout(const ASIO_ERROR& ec,
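The patch above simulates a keep-alive failure. handleKeepAliveTimeout() now calls close(ResultDisconnected) unconditionally, instead of sending a ping and closing only when the previous ping went unanswered.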
Next, start a Pulsar 3.3.0 standalone and create a partitioned topic named "output":
./bin/pulsar-admin topics create-partitioned-topic output -p 1
Then start SampleConsumer. The logs show that a multi-topics consumer is created:
2024-07-31 16:29:26.540 INFO [0x16bb53000] MultiTopicsConsumerImpl:308 | Successfully Subscribed to a single partition of topic in TopicsConsumer. Partitions need to create : 0
2024-07-31 16:29:26.540 INFO [0x16bb53000] MultiTopicsConsumerImpl:150 | Successfully Subscribed to Topics
Then wait about 30 seconds until handleKeepAliveTimeout() calls close().
What did you expect to see?
The consumer should reconnect shortly after the scheduled delay and subscribe to the topic again.
What did you see instead?
Client output:
2024-07-31 16:29:56.207 INFO [0x16bb53000] ClientConnection:1299 | [127.0.0.1:56207 -> 127.0.0.1:6650] Connection disconnected (refCnt: 3)
2024-07-31 16:29:56.207 INFO [0x16bb53000] ConnectionPool:141 | Remove connection for pulsar://localhost:6650-pulsar://localhost:6650-0
2024-07-31 16:29:56.207 INFO [0x16bb53000] HandlerBase:178 | [persistent://public/default/output-partition-0, sub, 0] Schedule reconnection in 0.1 s
2024-07-31 16:29:56.217 INFO [0x16bb53000] ClientConnection:277 | [127.0.0.1:56207 -> 127.0.0.1:6650] Destroyed connection to pulsar://localhost:6650-0
2024-07-31 16:30:26.544 INFO [0x16bc6b000] ClientConnection:188 | [<none> -> pulsar://localhost:6650] Create ClientConnection, timeout=10000
2024-07-31 16:30:26.545 INFO [0x16bc6b000] ConnectionPool:124 | Created connection for pulsar://localhost:6650-pulsar://localhost:6650-0
2024-07-31 16:30:26.553 INFO [0x16bb53000] ClientConnection:405 | [127.0.0.1:56218 -> 127.0.0.1:6650] Connected to broker
The reconnection was scheduled at 16:29:56 with a 0.1 s delay. However, the new connection was not created until 30 seconds later, at 16:30:26.
Here are the broker-side logs:
2024-07-31T16:29:56,208+0800 [pulsar-io-28-3] INFO org.apache.pulsar.broker.service.ServerCnx - Closed connection from /127.0.0.1:56207
2024-07-31T16:29:56,212+0800 [pulsar-io-28-3] INFO org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=PersistentSubscription{topic=persistent://public/default/output-partition-0, name=sub}, consumerId=0, consumerName=47e3bc11dc, address=[id: 0x5c806b44, L:/127.0.0.1:6650 ! R:/127.0.0.1:56207] [SR:127.0.0.1, state:Connected]}
2024-07-31T16:30:26,562+0800 [pulsar-io-28-14] INFO org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:56218] connected with clientVersion=Pulsar-CPP-v3.6.0-pre, clientProtocolVersion=20, proxyVersion=null
Worse, although the consumer reconnected at 16:30:26, it never sent a Subscribe request, so the subscription has no consumers. See the topic stats:
"subscriptions" : {
"sub" : {
"type" : "Exclusive",
"consumers" : [ ],
Anything else?
Unloading the topic does not fix this issue. It can even change the subscription type to "None":
"sub" : {
"type" : "None",
Are you willing to submit a PR?
- [X] I'm willing to submit a PR!