pulsar-client-cpp

[Bug] client subscribe hang

Open haolujun opened this issue 7 months ago • 3 comments

Search before asking

  • [x] I searched in the issues and found nothing similar.

Version

pulsar: 2.10.0. OS: Linux 4.19.160-0419160-generic x86_64.

Minimal reproduce step

When I close the old consumer and client, then create a new client and subscribe, the subscribe call sometimes hangs.

What did you expect to see?

The subscribe call should succeed.

What did you see instead?

` #0 0x00007f31b5565ad3 in futex_wait_cancelable (private=<optimized out>, expected=0, futex_word=0x5607e2a0af90) at ../sysdeps/unix/sysv/linux/futex-internal.h:88
#1 __pthread_cond_wait_common (abstime=0x0, mutex=0x5607e2a0af40, cond=0x5607e2a0af68) at pthread_cond_wait.c:502
#2 __pthread_cond_wait (cond=0x5607e2a0af68, mutex=0x5607e2a0af40) at pthread_cond_wait.c:655
#3 0x00007f319e5d68bc in std::condition_variable::wait(std::unique_lock<std::mutex>&) () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#4 0x00007f314ed9b173 in pulsar::Client::subscribe(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, pulsar::ConsumerConfiguration const&, pulsar::Consumer&) () from /usr/local/lib/python3.7/site-packages/xdl/dataset2/_dataset2_pyext.cpython-37m-x86_64-linux-gnu.so `
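
Frame #1 shows `abstime=0x0`, which means the condition-variable wait under `Client::subscribe` has no deadline: if whatever is supposed to signal completion never does, the caller blocks indefinitely. Below is a minimal sketch, using only the standard library, of how a caller could bound such a wait so that a hang surfaces as a timeout. `waitFor` is a hypothetical helper and not part of the Pulsar API; connecting it to an asynchronous subscribe call is left out here.

```cpp
#include <chrono>
#include <future>
#include <optional>

// Hypothetical helper (not part of the Pulsar client): wait for a result that
// some callback is expected to deliver, but give up after `timeout` instead of
// blocking forever like an untimed condition_variable::wait.
template <typename T>
std::optional<T> waitFor(std::future<T>& result, std::chrono::milliseconds timeout) {
    if (result.wait_for(timeout) != std::future_status::ready) {
        return std::nullopt;  // nothing was delivered within the deadline
    }
    return result.get();
}
```

The idea is that the completion callback fulfils a `std::promise`, and the calling thread uses `waitFor` on the matching future; an empty result then indicates the same situation that shows up as a hang in the trace above. Requires C++17 for `std::optional`.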

Anything else?

The hang happens only sometimes, after closing the old consumer and client and then creating a new client and subscribing.

Are you willing to submit a PR?

  • [ ] I'm willing to submit a PR!

haolujun avatar Apr 27 '25 03:04 haolujun

It seems that you're using the Python client. What's the client version? Can it be reproduced with the latest Python client (3.6.1)?

BewareMyPower avatar Apr 27 '25 03:04 BewareMyPower

Oh, I don't use the Python client. I use the C++ client through pybind, which is why Python shows up in the stack. The Python version is 3.7.13.

haolujun avatar Apr 27 '25 04:04 haolujun

Can it be reproduced with a simple C++ application? Which C++ client version are you using? Did you use the pre-built binaries or build from source? Installing via vcpkg is the recommended approach now.

BewareMyPower avatar Apr 27 '25 07:04 BewareMyPower

Hi @BewareMyPower, I think there might be a race condition in the client/consumer([email protected]). In our case, we close the consumer immediately after performing a seek operation. It seems there's a timing issue: when we try to create a new consumer right after closing the previous one, we occasionally hit a "consumer busy" error.

#include <chrono>
#include <iostream>
#include <string>
#include <thread>
#include <pulsar/Client.h>
using namespace std;
using namespace pulsar;

int main() {
    Client client("pulsar://localhost:6650");

    const auto topic = "persistent://public/default/my-topic-1";

    // Create producer
    Producer producer;
    const auto producerRes = client.createProducer(topic, producer);
    if (producerRes != ResultOk) {
        cout << "Failed to create producer: " << producerRes << endl;
        return 1;
    }

    // Create consumer
    Consumer consumer;
    ConsumerConfiguration consConfig;
    consConfig.setMessageListener([](Consumer &consumer, const Message &msg) {
        cout << msg.getMessageId() << endl;
    });
    std::string subName = "sub";
    const auto subscribeRes = client.subscribe(topic, subName, consConfig, consumer);
    if (subscribeRes != ResultOk) {
        cout << "Failed to subscribe: " << subscribeRes << endl;
        return 1;
    }

    // Timestamp (ms since epoch) taken before the message is sent; used as the seek target.
    const auto seekTS = chrono::duration_cast<chrono::milliseconds>(
                            chrono::system_clock::now().time_since_epoch())
                            .count();

    const std::string msgContent = "msg-content";
    const Message msg = MessageBuilder().setContent(msgContent).setProperty("x", "1").build();
    producer.send(msg);
    std::this_thread::sleep_for(std::chrono::seconds(3));

    const auto consumer1SeekRes = consumer.seek(seekTS);
    cout << "Consumer1 seek result: " << consumer1SeekRes << endl;
    if (consumer1SeekRes != ResultOk) {
        return 1;
    }

    // When the client performs the seek, the broker disconnects the consumer, and the
    // consumer then re-subscribes in the background. Closing the consumer immediately
    // after the seek can therefore race with that reconnection.
    // Uncommenting the sleep below makes the race less likely:
    // cout << "sleep_for 5s" << endl;
    // std::this_thread::sleep_for(std::chrono::seconds(5));
    const auto closeRes = consumer.close();
    cout << "Consumer1 close result: " << closeRes << endl;
    if (closeRes != ResultOk) {
        return 1;
    }

    cout << "Starting consumer2" << endl;
    Consumer consumer2;
    const auto consumer2Res = client.subscribe(topic, subName, consConfig, consumer2);
    cout << "Consumer2 subscribe result: " << consumer2Res << endl; // occasionally ConsumerBusy
    if (consumer2Res != ResultOk) {
        return 1;
    }

    const auto consumer2SeekRes = consumer2.seek(seekTS);
    cout << "Consumer2 seek result: " << consumer2SeekRes << endl;
    if (consumer2SeekRes != ResultOk) {
        return 1;
    }

    consumer2.close();
    client.close();

    return 0;
}
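
The commented-out `sleep_for` in the repro uses a fixed 5 s delay to dodge the race. A bounded polling wait is one alternative; the sketch below is an assumption-laden illustration, not something verified against the client. `waitUntil` is a hypothetical helper, and the predicate passed to it (for instance a check that the consumer reports being connected again after the seek) is also an assumption about what would be a reliable signal.

```cpp
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical helper: poll `ready` until it returns true or `timeout` elapses.
// Returns true if the condition was observed, false if the deadline passed.
bool waitUntil(const std::function<bool()>& ready,
               std::chrono::milliseconds timeout,
               std::chrono::milliseconds pollInterval) {
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    while (!ready()) {
        if (std::chrono::steady_clock::now() >= deadline) {
            return false;  // condition never became true in time
        }
        std::this_thread::sleep_for(pollInterval);
    }
    return true;
}
```

In the repro this would sit between the seek and `consumer.close()`, in place of the fixed sleep. It only narrows the window on the application side and does not address the underlying race in the client.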

nodece avatar Oct 21 '25 03:10 nodece

I cannot reproduce this issue with the code above against a standalone broker.

  • client: 3be5267ef781b9a1928c009da1ac53dc60a12a35
  • broker: 4.1.1

@nodece If you can reproduce, could you share the full logs?

BewareMyPower avatar Oct 21 '25 07:10 BewareMyPower

@BewareMyPower

Run Pulsar in standalone mode:

docker run --rm -it -p 6650:6650 apachepulsar/pulsar:3.0.7 bin/pulsar standalone

vcpkg.json:

{
  "name": "client-cpp-test",
  "version-string": "0.1.0",
  "builtin-baseline": "38d1652f152d36481f2f4e8a85c0f1e14f3769f7",
  "dependencies": [
    {
      "name": "pulsar-client-cpp",
      "version>=": "3.7.0"
    }
  ],
  "overrides": [
    {
      "name": "zlib",
      "version": "1.3.1"
    }
  ]
}

Broker log:

2025-10-21T12:55:07,827+0000 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/my-topic-1-sub] Rewind from 9:0 to 9:0
2025-10-21T12:55:07,827+0000 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO  org.apache.pulsar.broker.service.ServerCnx - [/192.168.215.1:33994] Created subscription on topic persistent://public/default/my-topic-1 / sub
2025-10-21T12:55:10,869+0000 [BookKeeperClientWorker-OrderedExecutor-7-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentSubscription - [persistent://public/default/my-topic-1][sub] Unable to find position for timestamp 1761051307828. Resetting cursor to first position 9:0 in ledger
2025-10-21T12:55:10,869+0000 [BookKeeperClientWorker-OrderedExecutor-7-0] INFO  org.apache.pulsar.broker.service.Consumer - Disconnecting consumer: Consumer{subscription=PersistentSubscription{topic=persistent://public/default/my-topic-1, name=sub}, consumerId=0, consumerName=aabe9ed0b4, address=[id: 0xf3736425, L:/192.168.215.2:6650 - R:/192.168.215.1:33994] [SR:192.168.215.1, state:Connected]}
2025-10-21T12:55:10,870+0000 [BookKeeperClientWorker-OrderedExecutor-7-0] INFO  org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=PersistentSubscription{topic=persistent://public/default/my-topic-1, name=sub}, consumerId=0, consumerName=aabe9ed0b4, address=[id: 0xf3736425, L:/192.168.215.2:6650 - R:/192.168.215.1:33994] [SR:192.168.215.1, state:Connected]}
2025-10-21T12:55:10,870+0000 [BookKeeperClientWorker-OrderedExecutor-7-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentSubscription - [persistent://public/default/my-topic-1][sub] Successfully disconnected consumers from subscription, proceeding with cursor reset
2025-10-21T12:55:10,872+0000 [BookKeeperClientWorker-OrderedExecutor-7-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/my-topic-1] Initiate reset readPosition from 9:1 to 9:0 on cursor sub
2025-10-21T12:55:10,895+0000 [BookKeeperClientWorker-OrderedExecutor-7-0] INFO  org.apache.pulsar.metadata.impl.AbstractMetadataStore - Deleting path: /ledgers/idgen/ID-10 (v. Optional.empty)
2025-10-21T12:55:10,897+0000 [BookKeeperClientWorker-OrderedExecutor-7-0] INFO  org.apache.pulsar.metadata.impl.AbstractMetadataStore - Deleted path: /ledgers/idgen/ID-10 (v. Optional.empty)
2025-10-21T12:55:10,900+0000 [BookKeeperClientWorker-OrderedExecutor-7-0] INFO  org.apache.bookkeeper.client.LedgerCreateOp - Ensemble: [192.168.215.2:38191] for ledger: 10
2025-10-21T12:55:10,907+0000 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/my-topic-1] Updated cursor sub with ledger id 10 md-position=9:-1 rd-position=9:1
2025-10-21T12:55:10,910+0000 [BookKeeperClientWorker-OrderedExecutor-5-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/my-topic-1] reset readPosition to 9:0 before current read readPosition 9:1 on cursor sub
2025-10-21T12:55:10,910+0000 [BookKeeperClientWorker-OrderedExecutor-5-0] INFO  org.apache.pulsar.broker.service.ServerCnx - [/192.168.215.1:33994] [persistent://public/default/my-topic-1][sub] Reset subscription to publish time 1761051307828
2025-10-21T12:55:10,975+0000 [pulsar-io-19-14] INFO  org.apache.pulsar.broker.service.ServerCnx - [[id: 0xf3736425, L:/192.168.215.2:6650 - R:/192.168.215.1:33994] [SR:192.168.215.1, state:Connected]] Subscribing on topic persistent://public/default/my-topic-1 / sub. consumerId: 0
2025-10-21T12:55:10,975+0000 [pulsar-io-19-14] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/my-topic-1-sub] Rewind from 9:0 to 9:0
2025-10-21T12:55:10,975+0000 [pulsar-io-19-14] INFO  org.apache.pulsar.broker.service.ServerCnx - [/192.168.215.1:33994] Created subscription on topic persistent://public/default/my-topic-1 / sub
2025-10-21T12:55:10,980+0000 [pulsar-io-19-14] INFO  org.apache.pulsar.broker.service.ServerCnx - [[id: 0xf3736425, L:/192.168.215.2:6650 - R:/192.168.215.1:33994] [SR:192.168.215.1, state:Connected]] Subscribing on topic persistent://public/default/my-topic-1 / sub. consumerId: 1
2025-10-21T12:55:10,984+0000 [pulsar-io-19-14] WARN  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/my-topic-1][sub] Consumer 1 aabe9ed0b4 already connected: Exclusive consumer is already connected
2025-10-21T12:55:10,986+0000 [pulsar-io-19-14] INFO  org.apache.pulsar.broker.service.ServerCnx - [/192.168.215.1:33994] Closing consumer: consumerId=0
2025-10-21T12:55:10,986+0000 [pulsar-io-19-14] INFO  org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=PersistentSubscription{topic=persistent://public/default/my-topic-1, name=sub}, consumerId=0, consumerName=aabe9ed0b4, address=[id: 0xf3736425, L:/192.168.215.2:6650 - R:/192.168.215.1:33994] [SR:192.168.215.1, state:Connected]}
2025-10-21T12:55:10,987+0000 [pulsar-io-19-14] INFO  org.apache.pulsar.broker.service.ServerCnx - [/192.168.215.1:33994] Closed consumer, consumerId=0
2025-10-21T12:55:10,989+0000 [pulsar-io-19-14] INFO  org.apache.pulsar.broker.service.ServerCnx - Closed connection from /192.168.215.1:33994
2025-10-21T12:55:24,064+0000 [ForkJoinPool.commonPool-worker-5] INFO  org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [21/Oct/2025:12:55:24 +0000] "GET /admin/v2/persistent/public/functions/coordinate/stats?getPreciseBacklog=false&subscriptionBacklogSize=false&getEarliestTimeInBacklog=false HTTP/1.1" 200 2711 "-" "Pulsar-Java-v3.0.7" 11
2025-10-21T12:55:24,071+0000 [ForkJoinPool.commonPool-worker-5] INFO  org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [21/Oct/2025:12:55:24 +0000] "GET /admin/v2/persistent/public/functions/coordinate/stats?getPreciseBacklog=false&subscriptionBacklogSize=false&getEarliestTimeInBacklog=false HTTP/1.1" 200 2711 "-" "Pulsar-Java-v3.0.7" 3

Client log:

2025-10-21 20:55:07.679 INFO  [0x7ff85a467340] ClientConnection:193 | [<none> -> pulsar://localhost:6650] Create ClientConnection, timeout=10000
2025-10-21 20:55:07.679 INFO  [0x7ff85a467340] ConnectionPool:124 | Created connection for pulsar://localhost:6650-pulsar://localhost:6650-0
2025-10-21 20:55:07.681 INFO  [0x70000bea0000] ClientConnection:410 | [[::1]:54758 -> [::1]:6650] Connected to broker
2025-10-21 20:55:07.692 INFO  [0x70000bea0000] HandlerBase:115 | [persistent://public/default/my-topic-1, ] Getting connection from pool
2025-10-21 20:55:07.744 INFO  [0x70000bea0000] BinaryProtoLookupService:86 | Lookup response for persistent://public/default/my-topic-1, lookup-broker-url pulsar://localhost:6650, from [[::1]:54758 -> [::1]:6650] 
2025-10-21 20:55:07.744 INFO  [0x70000bea0000] ProducerImpl:148 | Creating producer for topic:persistent://public/default/my-topic-1, producerName: on [[::1]:54758 -> [::1]:6650] 
2025-10-21 20:55:07.818 INFO  [0x70000bea0000] ProducerImpl:220 | [persistent://public/default/my-topic-1, ] Created producer on broker [[::1]:54758 -> [::1]:6650] 
2025-10-21 20:55:07.818 INFO  [0x70000bea0000] HandlerBase:138 | Finished connecting to broker after 126 ms
2025-10-21 20:55:07.819 INFO  [0x7ff85a467340] Client:86 | Subscribing on Topic :persistent://public/default/my-topic-1
2025-10-21 20:55:07.821 INFO  [0x70000bea0000] HandlerBase:115 | [persistent://public/default/my-topic-1, sub, 0] Getting connection from pool
2025-10-21 20:55:07.822 INFO  [0x70000bea0000] BinaryProtoLookupService:86 | Lookup response for persistent://public/default/my-topic-1, lookup-broker-url pulsar://localhost:6650, from [[::1]:54758 -> [::1]:6650] 
2025-10-21 20:55:07.828 INFO  [0x70000bea0000] ConsumerImpl:311 | [persistent://public/default/my-topic-1, sub, 0] Created consumer on broker [[::1]:54758 -> [::1]:6650] 
2025-10-21 20:55:07.828 INFO  [0x70000bea0000] HandlerBase:138 | Finished connecting to broker after 7 ms
(9,0,-1,0)
2025-10-21 20:55:10.846 INFO  [0x7ff85a467340] ConsumerImpl:1692 | [persistent://public/default/my-topic-1, sub, 0]  Seeking subscription to 1761051307828
2025-10-21 20:55:10.871 INFO  [0x70000bea0000] ConsumerImpl:1262 | Broker notification of Closed consumer: 0
2025-10-21 20:55:10.871 INFO  [0x70000bea0000] HandlerBase:190 | [persistent://public/default/my-topic-1, sub, 0] Schedule reconnection in 0.1 s
2025-10-21 20:55:10.911 INFO  [0x70000bea0000] ConsumerImpl:1705 | [persistent://public/default/my-topic-1, sub, 0] Seek successfully
2025-10-21 20:55:10.971 INFO  [0x70000bea0000] HandlerBase:115 | [persistent://public/default/my-topic-1, sub, 0] Getting connection from pool
2025-10-21 20:55:10.974 INFO  [0x70000bea0000] BinaryProtoLookupService:86 | Lookup response for persistent://public/default/my-topic-1, lookup-broker-url pulsar://localhost:6650, from [[::1]:54758 -> [::1]:6650] 
Consumer1 seek result: Ok
2025-10-21 20:55:10.974 INFO  [0x7ff85a467340] ConsumerImpl:1288 | [persistent://public/default/my-topic-1, sub, 0] Closing consumer for topic persistent://public/default/my-topic-1
2025-10-21 20:55:10.974 INFO  [0x7ff85a467340] ConsumerImpl:1272 | [persistent://public/default/my-topic-1, sub, 0] Closed consumer 0
Consumer1 close result: Ok
Starting consumer2
2025-10-21 20:55:10.974 INFO  [0x7ff85a467340] Client:86 | Subscribing on Topic :persistent://public/default/my-topic-1
2025-10-21 20:55:10.976 INFO  [0x70000bea0000] ConsumerImpl:311 | [persistent://public/default/my-topic-1, sub, 0] Created consumer on broker [[::1]:54758 -> [::1]:6650] 
2025-10-21 20:55:10.976 INFO  [0x70000bea0000] HandlerBase:138 | Finished connecting to broker after 4 ms
2025-10-21 20:55:10.977 INFO  [0x70000bea0000] HandlerBase:115 | [persistent://public/default/my-topic-1, sub, 1] Getting connection from pool
2025-10-21 20:55:10.979 INFO  [0x70000bea0000] BinaryProtoLookupService:86 | Lookup response for persistent://public/default/my-topic-1, lookup-broker-url pulsar://localhost:6650, from [[::1]:54758 -> [::1]:6650] 
2025-10-21 20:55:10.985 WARN  [0x70000bea0000] ClientConnection:1746 | [[::1]:54758 -> [::1]:6650] Received error response from server: ConsumerBusy (Exclusive consumer is already connected) -- req_id: 4
2025-10-21 20:55:10.985 ERROR [0x70000bea0000] ConsumerImpl:352 | [persistent://public/default/my-topic-1, sub, 1] Failed to create consumer: ConsumerBusy
Consumer2 subscribe result: ConsumerBusy
2025-10-21 20:55:10.986 WARN  [0x7ff85a467340] ConsumerImpl:168 | [persistent://public/default/my-topic-1, sub, 0] Destroyed consumer which was not properly closed
2025-10-21 20:55:10.986 INFO  [0x7ff85a467340] ConsumerImpl:176 | [persistent://public/default/my-topic-1, sub, 0] Closed consumer for race condition: 0
2025-10-21 20:55:10.986 INFO  [0x7ff85a467340] ProducerImpl:755 | Producer - [persistent://public/default/my-topic-1, standalone-0-0] , [batchMessageContainer = { BatchMessageContainer [size = 0] [bytes = 0] [maxSize = 1000] [maxBytes = 131072] [topicName = persistent://public/default/my-topic-1] [numberOfBatchesSent_ = 1] [averageBatchSize_ = 0] }]
2025-10-21 20:55:10.986 INFO  [0x7ff85a467340] ClientConnection:1334 | [[::1]:54758 -> [::1]:6650] Connection disconnected (refCnt: 1)
2025-10-21 20:55:10.986 INFO  [0x7ff85a467340] ClientConnection:282 | [[::1]:54758 -> [::1]:6650] Destroyed connection to pulsar://localhost:6650-0
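
One reading of the two logs: the seek makes the broker disconnect consumer 0, the client schedules a reconnection 0.1 s later, the application calls `close()` while that reconnection is still in flight, and the reconnection then re-attaches consumer 0 on the broker just before consumer 1 subscribes and gets `ConsumerBusy`. The toy model below replays that ordering; it is an interpretation of the logs, not a confirmed diagnosis. `ToyBroker`, `replay`, and the `cancelReconnectOnClose` switch are all hypothetical names and do not correspond to client internals.

```cpp
#include <optional>

// Toy model of an exclusive subscription: at most one consumer id is attached.
struct ToyBroker {
    std::optional<int> active;  // id of the attached consumer, if any

    bool subscribe(int consumerId) {
        if (active && *active != consumerId) {
            return false;  // models "Exclusive consumer is already connected"
        }
        active = consumerId;
        return true;
    }

    void close(int consumerId) {
        if (active == consumerId) active.reset();
    }
};

// Replays the event order read from the logs and reports whether consumer 1
// manages to subscribe. `cancelReconnectOnClose` is a hypothetical switch:
// true models a client whose close() also cancels an in-flight reconnection.
bool replay(bool cancelReconnectOnClose) {
    ToyBroker broker;
    broker.subscribe(0);           // consumer 0 is created
    broker.close(0);               // seek: broker disconnects consumer 0
    bool reconnectPending = true;  // client schedules a reconnection
    // Application calls close() here; consumer 0 is currently detached.
    if (cancelReconnectOnClose) reconnectPending = false;
    if (reconnectPending) broker.subscribe(0);  // late reconnect re-attaches 0
    return broker.subscribe(1);    // consumer 1 subscribes
}
```

With `replay(false)` consumer 1 is rejected, matching the `ConsumerBusy` result in the client log; with `replay(true)` it succeeds. Requires C++17 for `std::optional`.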

nodece avatar Oct 21 '25 12:10 nodece

Ah, that's right, it can be reproduced with the 3.7.2 client on macOS. But this issue should already be fixed on the master branch. I'm not sure yet which PR fixed it.

BewareMyPower avatar Oct 24 '25 11:10 BewareMyPower

Interesting, it can be reproduced on the master branch as well. Let me take a look.

BewareMyPower avatar Oct 24 '25 11:10 BewareMyPower