[Bug] client subscribe hang
Search before asking
- [x] I searched in the issues and found nothing similar.
Version
pulsar: 2.10.0. OS: Linux 4.19.160-0419160-generic x86_64.
Minimal reproduce step
When I close the old consumer and client, then create a new client and subscribe, the subscribe call sometimes hangs.
What did you expect to see?
The subscribe call should succeed.
What did you see instead?
`
#0 0x00007f31b5565ad3 in futex_wait_cancelable (private=
#1 __pthread_cond_wait_common (abstime=0x0, mutex=0x5607e2a0af40, cond=0x5607e2a0af68) at pthread_cond_wait.c:502
#2 __pthread_cond_wait (cond=0x5607e2a0af68, mutex=0x5607e2a0af40) at pthread_cond_wait.c:655
#3 0x00007f319e5d68bc in std::condition_variable::wait(std::unique_lock<std::mutex>&) () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#4 0x00007f314ed9b173 in pulsar::Client::subscribe(std::__cxx11::basic_string<char, std::char_traits
Anything else?
As above: the hang is intermittent and happens after closing the old consumer and client, then subscribing with a new client.
Are you willing to submit a PR?
- [ ] I'm willing to submit a PR!
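Frame #1 of the backtrace shows `abstime=0x0`, so the blocking `subscribe` is parked in an untimed condition-variable wait and cannot return if the completion never arrives. As a debugging aid, here is a minimal sketch of a bounded wait around a callback-based operation. `waitWithTimeout` is a hypothetical helper, not part of the Pulsar client, and wiring it to the client's asynchronous subscribe call is an assumption that is not shown here.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <memory>
#include <optional>
#include <utility>

// Hypothetical helper: starts an asynchronous operation and waits at most
// `timeout` for its completion callback. `start` receives the callback and
// is expected to invoke it once with the result.
template <typename T, typename StartFn>
std::optional<T> waitWithTimeout(StartFn start, std::chrono::milliseconds timeout) {
    assert(timeout.count() >= 0);
    // The shared_ptr keeps the promise alive if the callback fires after
    // this function has already given up and returned.
    auto promise = std::make_shared<std::promise<T>>();
    auto future = promise->get_future();
    start([promise](T value) { promise->set_value(std::move(value)); });
    if (future.wait_for(timeout) != std::future_status::ready) {
        return std::nullopt;  // the callback did not fire before the deadline
    }
    return future.get();
}
```

An empty result then means "no completion within the deadline", which turns a silent hang into something that can be logged and retried.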
It seems that you're using Python clients. What's the client version? Could it be reproduced with the latest Python client (3.6.1)?
Oh, I don't use the Python client. I use the C++ client through pybind, which is why Python frames show up in the stack. The Python version is 3.7.13.
Could it be reproduced with a simple C++ application? What's the C++ client version you're using? Are you using the pre-built binaries, or did you build from source? Installing via vcpkg is the recommended approach now.
Hi @BewareMyPower, I think there might be a race condition in the client/consumer([email protected]). In our case, we close the consumer immediately after performing a seek operation. It seems there's a timing issue: when we try to create a new consumer right after closing the previous one, we occasionally hit a "consumer busy" error.
#include <chrono>
#include <iostream>
#include <string>
#include <thread>

#include <pulsar/Client.h>

using namespace std;
using namespace pulsar;

int main() {
    Client client("pulsar://localhost:6650");
    const auto topic = "persistent://public/default/my-topic-1";

    // Create producer
    Producer producer;
    const auto producerRes = client.createProducer(topic, producer);
    if (producerRes != ResultOk) {
        cout << "Failed to create producer: " << producerRes << endl;
        return 1;
    }

    // Create consumer
    Consumer consumer;
    ConsumerConfiguration consConfig;
    consConfig.setMessageListener([](Consumer &consumer, const Message &msg) {
        cout << msg.getMessageId() << endl;
    });
    std::string subName = "sub";
    const auto subscribeRes = client.subscribe(topic, subName, consConfig, consumer);
    if (subscribeRes != ResultOk) {
        cout << "Failed to subscribe: " << subscribeRes << endl;
        return 1;
    }

    // Record the current time (ms since epoch) to seek back to later.
    const auto seekTS = chrono::duration_cast<chrono::milliseconds>(
                            chrono::system_clock::now().time_since_epoch())
                            .count();

    const Message msg = MessageBuilder().setContent("content").setProperty("x", "1").build();
    producer.send(msg);
    std::this_thread::sleep_for(std::chrono::seconds(3));

    const auto consumer1SeekRes = consumer.seek(seekTS);
    cout << "Consumer1 seek result: " << consumer1SeekRes << endl;
    if (consumer1SeekRes != ResultOk) {
        return 1;
    }

    // When the client performs the seek operation, the broker disconnects the consumer,
    // and then the consumer re-subscribes in the background.
    // If we close the consumer immediately after the seek operation, there may be a race condition.
    // Uncomment the sleep_for below to make the race less likely.
    // cout << "sleep_for 5s" << endl;
    // std::this_thread::sleep_for(std::chrono::seconds(5));
    const auto closeRes = consumer.close();
    cout << "Consumer1 close result: " << closeRes << endl;
    if (closeRes != ResultOk) {
        return 1;
    }

    cout << "Starting consumer2" << endl;
    Consumer consumer2;
    const auto consumer2Res = client.subscribe(topic, subName, consConfig, consumer2);
    cout << "Consumer2 subscribe result: " << consumer2Res << endl;  // occasionally ConsumerBusy
    if (consumer2Res != ResultOk) {
        return 1;
    }

    const auto consumer2SeekRes = consumer2.seek(seekTS);
    cout << "Consumer2 seek result: " << consumer2SeekRes << endl;
    if (consumer2SeekRes != ResultOk) {
        return 1;
    }

    consumer2.close();
    client.close();
    return 0;
}
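The commented-out `sleep_for` in the reproducer hides the race with a fixed 5 s delay. A possible alternative, sketched here under assumptions, is to poll a readiness condition with a deadline before calling `close()`. `waitUntil` is a hypothetical helper. Using a connectivity check on the consumer (something like `isConnected()`) as the predicate is an assumption, and the thread does not confirm that doing so fully avoids the race.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical helper: polls `ready` until it returns true or `timeout`
// elapses. Returns true if the condition was met, false on timeout.
inline bool waitUntil(const std::function<bool()>& ready,
                      std::chrono::milliseconds timeout,
                      std::chrono::milliseconds pollInterval) {
    assert(pollInterval.count() > 0);
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    while (!ready()) {
        if (std::chrono::steady_clock::now() >= deadline) {
            return false;  // gave up: condition still false at the deadline
        }
        std::this_thread::sleep_for(pollInterval);
    }
    return true;
}
```

Compared with a fixed sleep, this returns as soon as the condition holds and reports a timeout explicitly instead of proceeding blindly.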
I cannot reproduce this issue with the code above against a standalone broker.
- client: 3be5267ef781b9a1928c009da1ac53dc60a12a35
- broker: 4.1.1
@nodece If you can reproduce, could you share the full logs?
@BewareMyPower
Run Pulsar in standalone mode:
docker run --rm -it -p 6650:6650 apachepulsar/pulsar:3.0.7 bin/pulsar standalone
vcpkg.json:
{
"name": "client-cpp-test",
"version-string": "0.1.0",
"builtin-baseline": "38d1652f152d36481f2f4e8a85c0f1e14f3769f7",
"dependencies": [
{
"name": "pulsar-client-cpp",
"version>=": "3.7.0"
}
],
"overrides": [
{
"name": "zlib",
"version": "1.3.1"
}
]
}
Broker log:
2025-10-21T12:55:07,827+0000 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/my-topic-1-sub] Rewind from 9:0 to 9:0
2025-10-21T12:55:07,827+0000 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.pulsar.broker.service.ServerCnx - [/192.168.215.1:33994] Created subscription on topic persistent://public/default/my-topic-1 / sub
2025-10-21T12:55:10,869+0000 [BookKeeperClientWorker-OrderedExecutor-7-0] INFO org.apache.pulsar.broker.service.persistent.PersistentSubscription - [persistent://public/default/my-topic-1][sub] Unable to find position for timestamp 1761051307828. Resetting cursor to first position 9:0 in ledger
2025-10-21T12:55:10,869+0000 [BookKeeperClientWorker-OrderedExecutor-7-0] INFO org.apache.pulsar.broker.service.Consumer - Disconnecting consumer: Consumer{subscription=PersistentSubscription{topic=persistent://public/default/my-topic-1, name=sub}, consumerId=0, consumerName=aabe9ed0b4, address=[id: 0xf3736425, L:/192.168.215.2:6650 - R:/192.168.215.1:33994] [SR:192.168.215.1, state:Connected]}
2025-10-21T12:55:10,870+0000 [BookKeeperClientWorker-OrderedExecutor-7-0] INFO org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=PersistentSubscription{topic=persistent://public/default/my-topic-1, name=sub}, consumerId=0, consumerName=aabe9ed0b4, address=[id: 0xf3736425, L:/192.168.215.2:6650 - R:/192.168.215.1:33994] [SR:192.168.215.1, state:Connected]}
2025-10-21T12:55:10,870+0000 [BookKeeperClientWorker-OrderedExecutor-7-0] INFO org.apache.pulsar.broker.service.persistent.PersistentSubscription - [persistent://public/default/my-topic-1][sub] Successfully disconnected consumers from subscription, proceeding with cursor reset
2025-10-21T12:55:10,872+0000 [BookKeeperClientWorker-OrderedExecutor-7-0] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/my-topic-1] Initiate reset readPosition from 9:1 to 9:0 on cursor sub
2025-10-21T12:55:10,895+0000 [BookKeeperClientWorker-OrderedExecutor-7-0] INFO org.apache.pulsar.metadata.impl.AbstractMetadataStore - Deleting path: /ledgers/idgen/ID-10 (v. Optional.empty)
2025-10-21T12:55:10,897+0000 [BookKeeperClientWorker-OrderedExecutor-7-0] INFO org.apache.pulsar.metadata.impl.AbstractMetadataStore - Deleted path: /ledgers/idgen/ID-10 (v. Optional.empty)
2025-10-21T12:55:10,900+0000 [BookKeeperClientWorker-OrderedExecutor-7-0] INFO org.apache.bookkeeper.client.LedgerCreateOp - Ensemble: [192.168.215.2:38191] for ledger: 10
2025-10-21T12:55:10,907+0000 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/my-topic-1] Updated cursor sub with ledger id 10 md-position=9:-1 rd-position=9:1
2025-10-21T12:55:10,910+0000 [BookKeeperClientWorker-OrderedExecutor-5-0] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/my-topic-1] reset readPosition to 9:0 before current read readPosition 9:1 on cursor sub
2025-10-21T12:55:10,910+0000 [BookKeeperClientWorker-OrderedExecutor-5-0] INFO org.apache.pulsar.broker.service.ServerCnx - [/192.168.215.1:33994] [persistent://public/default/my-topic-1][sub] Reset subscription to publish time 1761051307828
2025-10-21T12:55:10,975+0000 [pulsar-io-19-14] INFO org.apache.pulsar.broker.service.ServerCnx - [[id: 0xf3736425, L:/192.168.215.2:6650 - R:/192.168.215.1:33994] [SR:192.168.215.1, state:Connected]] Subscribing on topic persistent://public/default/my-topic-1 / sub. consumerId: 0
2025-10-21T12:55:10,975+0000 [pulsar-io-19-14] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/my-topic-1-sub] Rewind from 9:0 to 9:0
2025-10-21T12:55:10,975+0000 [pulsar-io-19-14] INFO org.apache.pulsar.broker.service.ServerCnx - [/192.168.215.1:33994] Created subscription on topic persistent://public/default/my-topic-1 / sub
2025-10-21T12:55:10,980+0000 [pulsar-io-19-14] INFO org.apache.pulsar.broker.service.ServerCnx - [[id: 0xf3736425, L:/192.168.215.2:6650 - R:/192.168.215.1:33994] [SR:192.168.215.1, state:Connected]] Subscribing on topic persistent://public/default/my-topic-1 / sub. consumerId: 1
2025-10-21T12:55:10,984+0000 [pulsar-io-19-14] WARN org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/my-topic-1][sub] Consumer 1 aabe9ed0b4 already connected: Exclusive consumer is already connected
2025-10-21T12:55:10,986+0000 [pulsar-io-19-14] INFO org.apache.pulsar.broker.service.ServerCnx - [/192.168.215.1:33994] Closing consumer: consumerId=0
2025-10-21T12:55:10,986+0000 [pulsar-io-19-14] INFO org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=PersistentSubscription{topic=persistent://public/default/my-topic-1, name=sub}, consumerId=0, consumerName=aabe9ed0b4, address=[id: 0xf3736425, L:/192.168.215.2:6650 - R:/192.168.215.1:33994] [SR:192.168.215.1, state:Connected]}
2025-10-21T12:55:10,987+0000 [pulsar-io-19-14] INFO org.apache.pulsar.broker.service.ServerCnx - [/192.168.215.1:33994] Closed consumer, consumerId=0
2025-10-21T12:55:10,989+0000 [pulsar-io-19-14] INFO org.apache.pulsar.broker.service.ServerCnx - Closed connection from /192.168.215.1:33994
2025-10-21T12:55:24,064+0000 [ForkJoinPool.commonPool-worker-5] INFO org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [21/Oct/2025:12:55:24 +0000] "GET /admin/v2/persistent/public/functions/coordinate/stats?getPreciseBacklog=false&subscriptionBacklogSize=false&getEarliestTimeInBacklog=false HTTP/1.1" 200 2711 "-" "Pulsar-Java-v3.0.7" 11
2025-10-21T12:55:24,071+0000 [ForkJoinPool.commonPool-worker-5] INFO org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [21/Oct/2025:12:55:24 +0000] "GET /admin/v2/persistent/public/functions/coordinate/stats?getPreciseBacklog=false&subscriptionBacklogSize=false&getEarliestTimeInBacklog=false HTTP/1.1" 200 2711 "-" "Pulsar-Java-v3.0.7" 3
Client log:
2025-10-21 20:55:07.679 INFO [0x7ff85a467340] ClientConnection:193 | [<none> -> pulsar://localhost:6650] Create ClientConnection, timeout=10000
2025-10-21 20:55:07.679 INFO [0x7ff85a467340] ConnectionPool:124 | Created connection for pulsar://localhost:6650-pulsar://localhost:6650-0
2025-10-21 20:55:07.681 INFO [0x70000bea0000] ClientConnection:410 | [[::1]:54758 -> [::1]:6650] Connected to broker
2025-10-21 20:55:07.692 INFO [0x70000bea0000] HandlerBase:115 | [persistent://public/default/my-topic-1, ] Getting connection from pool
2025-10-21 20:55:07.744 INFO [0x70000bea0000] BinaryProtoLookupService:86 | Lookup response for persistent://public/default/my-topic-1, lookup-broker-url pulsar://localhost:6650, from [[::1]:54758 -> [::1]:6650]
2025-10-21 20:55:07.744 INFO [0x70000bea0000] ProducerImpl:148 | Creating producer for topic:persistent://public/default/my-topic-1, producerName: on [[::1]:54758 -> [::1]:6650]
2025-10-21 20:55:07.818 INFO [0x70000bea0000] ProducerImpl:220 | [persistent://public/default/my-topic-1, ] Created producer on broker [[::1]:54758 -> [::1]:6650]
2025-10-21 20:55:07.818 INFO [0x70000bea0000] HandlerBase:138 | Finished connecting to broker after 126 ms
2025-10-21 20:55:07.819 INFO [0x7ff85a467340] Client:86 | Subscribing on Topic :persistent://public/default/my-topic-1
2025-10-21 20:55:07.821 INFO [0x70000bea0000] HandlerBase:115 | [persistent://public/default/my-topic-1, sub, 0] Getting connection from pool
2025-10-21 20:55:07.822 INFO [0x70000bea0000] BinaryProtoLookupService:86 | Lookup response for persistent://public/default/my-topic-1, lookup-broker-url pulsar://localhost:6650, from [[::1]:54758 -> [::1]:6650]
2025-10-21 20:55:07.828 INFO [0x70000bea0000] ConsumerImpl:311 | [persistent://public/default/my-topic-1, sub, 0] Created consumer on broker [[::1]:54758 -> [::1]:6650]
2025-10-21 20:55:07.828 INFO [0x70000bea0000] HandlerBase:138 | Finished connecting to broker after 7 ms
(9,0,-1,0)
2025-10-21 20:55:10.846 INFO [0x7ff85a467340] ConsumerImpl:1692 | [persistent://public/default/my-topic-1, sub, 0] Seeking subscription to 1761051307828
2025-10-21 20:55:10.871 INFO [0x70000bea0000] ConsumerImpl:1262 | Broker notification of Closed consumer: 0
2025-10-21 20:55:10.871 INFO [0x70000bea0000] HandlerBase:190 | [persistent://public/default/my-topic-1, sub, 0] Schedule reconnection in 0.1 s
2025-10-21 20:55:10.911 INFO [0x70000bea0000] ConsumerImpl:1705 | [persistent://public/default/my-topic-1, sub, 0] Seek successfully
2025-10-21 20:55:10.971 INFO [0x70000bea0000] HandlerBase:115 | [persistent://public/default/my-topic-1, sub, 0] Getting connection from pool
2025-10-21 20:55:10.974 INFO [0x70000bea0000] BinaryProtoLookupService:86 | Lookup response for persistent://public/default/my-topic-1, lookup-broker-url pulsar://localhost:6650, from [[::1]:54758 -> [::1]:6650]
Consumer1 seek result: Ok
2025-10-21 20:55:10.974 INFO [0x7ff85a467340] ConsumerImpl:1288 | [persistent://public/default/my-topic-1, sub, 0] Closing consumer for topic persistent://public/default/my-topic-1
2025-10-21 20:55:10.974 INFO [0x7ff85a467340] ConsumerImpl:1272 | [persistent://public/default/my-topic-1, sub, 0] Closed consumer 0
Consumer1 close result: Ok
Starting consumer2
2025-10-21 20:55:10.974 INFO [0x7ff85a467340] Client:86 | Subscribing on Topic :persistent://public/default/my-topic-1
2025-10-21 20:55:10.976 INFO [0x70000bea0000] ConsumerImpl:311 | [persistent://public/default/my-topic-1, sub, 0] Created consumer on broker [[::1]:54758 -> [::1]:6650]
2025-10-21 20:55:10.976 INFO [0x70000bea0000] HandlerBase:138 | Finished connecting to broker after 4 ms
2025-10-21 20:55:10.977 INFO [0x70000bea0000] HandlerBase:115 | [persistent://public/default/my-topic-1, sub, 1] Getting connection from pool
2025-10-21 20:55:10.979 INFO [0x70000bea0000] BinaryProtoLookupService:86 | Lookup response for persistent://public/default/my-topic-1, lookup-broker-url pulsar://localhost:6650, from [[::1]:54758 -> [::1]:6650]
2025-10-21 20:55:10.985 WARN [0x70000bea0000] ClientConnection:1746 | [[::1]:54758 -> [::1]:6650] Received error response from server: ConsumerBusy (Exclusive consumer is already connected) -- req_id: 4
2025-10-21 20:55:10.985 ERROR [0x70000bea0000] ConsumerImpl:352 | [persistent://public/default/my-topic-1, sub, 1] Failed to create consumer: ConsumerBusy
Consumer2 subscribe result: ConsumerBusy
2025-10-21 20:55:10.986 WARN [0x7ff85a467340] ConsumerImpl:168 | [persistent://public/default/my-topic-1, sub, 0] Destroyed consumer which was not properly closed
2025-10-21 20:55:10.986 INFO [0x7ff85a467340] ConsumerImpl:176 | [persistent://public/default/my-topic-1, sub, 0] Closed consumer for race condition: 0
2025-10-21 20:55:10.986 INFO [0x7ff85a467340] ProducerImpl:755 | Producer - [persistent://public/default/my-topic-1, standalone-0-0] , [batchMessageContainer = { BatchMessageContainer [size = 0] [bytes = 0] [maxSize = 1000] [maxBytes = 131072] [topicName = persistent://public/default/my-topic-1] [numberOfBatchesSent_ = 1] [averageBatchSize_ = 0] }]
2025-10-21 20:55:10.986 INFO [0x7ff85a467340] ClientConnection:1334 | [[::1]:54758 -> [::1]:6650] Connection disconnected (refCnt: 1)
2025-10-21 20:55:10.986 INFO [0x7ff85a467340] ClientConnection:282 | [[::1]:54758 -> [::1]:6650] Destroyed connection to pulsar://localhost:6650-0
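One reading of the two logs above: the client reports "Closed consumer 0" at 20:55:10.974, the background re-subscribe for consumer 0 succeeds on the broker right after (10.975 to 10.976), and consumer 1 is then rejected with ConsumerBusy before the broker closes consumer 0 at 10.986. The toy model below replays that ordering. `ToyBroker` and `replayRace` are hypothetical names for illustration, and this is a sketch of the event order only, not the client's or the broker's actual code.

```cpp
#include <cassert>
#include <optional>
#include <string>

// Toy model of an exclusive subscription: the broker tracks at most one owner.
class ToyBroker {
public:
    // Returns "Ok" or "ConsumerBusy", mirroring the results seen in the logs.
    std::string subscribe(int consumerId) {
        if (owner_ && *owner_ != consumerId) {
            return "ConsumerBusy";
        }
        owner_ = consumerId;
        return "Ok";
    }

    // Broker-initiated disconnect (as during seek) or an explicit close command.
    void remove(int consumerId) {
        if (owner_ && *owner_ == consumerId) {
            owner_.reset();
        }
    }

private:
    std::optional<int> owner_;
};

// Replays the event order from the logs and returns what consumer 1 receives.
// `closeReachesBroker` models whether a close command for consumer 0 arrives
// after its background re-subscribe and before consumer 1 subscribes.
inline std::string replayRace(bool closeReachesBroker) {
    ToyBroker broker;
    const std::string first = broker.subscribe(0);  // consumer 0 subscribes
    assert(first == "Ok");
    (void)first;
    broker.remove(0);     // seek: broker disconnects consumer 0
    // The client-side close() completes here, while the reconnect is in flight.
    broker.subscribe(0);  // background re-subscribe of consumer 0 lands
    if (closeReachesBroker) {
        broker.remove(0);
    }
    return broker.subscribe(1);  // consumer 1 subscribes
}
```

In this model, `replayRace(false)` corresponds to the failing run in the logs, and `replayRace(true)` to the ordering in which the stale consumer is removed before the new subscribe arrives.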
Ah, that's right, it can be reproduced with the 3.7.2 client on macOS. But this issue should have been fixed in the master branch. I'm not sure which PR fixed it, though.
Interesting, it can be reproduced on the master branch as well. Let me take a look.