[Bug] std::system_error thrown during/after client->subscribe(...) when using configuration setUnAckedMessagesTimeoutMs.

Open jato-c8y opened this issue 2 years ago • 10 comments

Search before asking

  • [X] I searched in the issues and found nothing similar.

Version

Pulsar version 3.3. OS - Red Hat Enterprise Linux 8.9 and RHEL 9.3 and Linux 5.10.0-26-amd64

Minimal reproduce step

We observe the issue in a test where the client cannot connect to the server immediately on startup, but the server becomes reachable after a while. We attempt to subscribe and, if the subscribe fails, we try again. Sometimes the subscribe eventually succeeds and all is well. Sometimes the subscribe succeeds but the exception is still thrown afterwards. Sometimes the exception is thrown even before the subscribe returns.

// Rough setup. Consumer settings go in a pulsar::ConsumerConfiguration;
// clientConfig is the pulsar::ClientConfiguration.
pulsar::ConsumerConfiguration consumerConfig;
consumerConfig.setReceiverQueueSize(1000);
consumerConfig.setUnAckedMessagesTimeoutMs(10000);
consumerConfig.setConsumerType(pulsar::ConsumerType::ConsumerExclusive);
client = std::make_shared<pulsar::Client>(serviceURL, clientConfig);
pulsar::Result result = pulsar::ResultRetryable;
pulsar::Consumer consumer;
const std::vector<std::string> topics{"A topic", "Another topic"};
while (result != pulsar::ResultOk) {
   result = client->subscribe(topics, subscriberName, consumerConfig, consumer);
   if (result != pulsar::ResultOk) {
      // Wait 5s before retrying.
      std::this_thread::sleep_for(std::chrono::seconds(5));
   }
}

What did you expect to see?

No exception.

What did you see instead?

std::system_error thrown and not handled.

Anything else?

std::system_error thrown during/after client->subscribe(...) when using configuration setUnAckedMessagesTimeoutMs.

When we attempt to subscribe with a value already set for setUnAckedMessagesTimeoutMs in the configuration, we observe a std::system_error being thrown. It is neither handled nor caught in ExecutorService, and it terminates the application. setUnAckedMessagesTimeoutMs is set to 10000. As this is the minimum, we have not experimented with other values. Without setUnAckedMessagesTimeoutMs set in the configuration, no exception is seen on or after subscribe(..).

We suspect the exception is being thrown by std::recursive_mutex when trying to acquire the lock in UnAckedMessageTrackerEnabled::timeoutHandlerHelper() of pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc

  1. Please can this be investigated for the cause of the exception.
  2. Please can exceptions thrown in your library background threads be caught and not terminate the application.
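
The second request can be sketched in isolation. The following is a minimal illustration, not the library's ExecutorService: makeGuarded and demoGuarded are hypothetical names, and the thrown error merely mimics the std::system_error from the report. The wrapper passes any exception to an error callback instead of letting it escape the thread body, which is what triggers std::terminate().

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <string>
#include <system_error>
#include <utility>

// Hypothetical helper, not part of pulsar-client-cpp. Wraps `task` so that
// any exception is passed to `onError` instead of escaping. The returned
// callable is meant to be used as the body of a background thread.
inline std::function<void()> makeGuarded(
    std::function<void()> task,
    std::function<void(const std::string&)> onError) {
    return [task = std::move(task), onError = std::move(onError)] {
        try {
            task();
        } catch (const std::exception& e) {
            onError(e.what());
        } catch (...) {
            onError("unknown exception");
        }
    };
}

// Demonstration: the task throws the same exception type seen in the
// backtrace; the wrapper reports the message instead of terminating.
inline std::string demoGuarded() {
    std::string captured;
    auto guarded = makeGuarded(
        [] {
            throw std::system_error(
                std::make_error_code(std::errc::invalid_argument));
        },
        [&captured](const std::string& what) { captured = what; });
    guarded();
    return captured;
}
```

With a real worker thread, the guarded callable would be handed to the thread, for example std::thread worker(makeGuarded(task, onError)). Note that this only keeps the process alive; it does not fix whatever caused the exception.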

stderr:
terminate called after throwing an instance of 'std::system_error'
  what(): Invalid argument

backtrace:
Program terminated with signal SIGABRT, Aborted.
0 0x00007fdef2329acf in raise () from /lib64/libc.so.6
[Current thread is 1 (Thread 0x7fddd1253700 (LWP 1875868))]
0 0x00007fdef2329acf in raise () from /lib64/libc.so.6
1 0x00007fdef22fcea5 in abort () from /lib64/libc.so.6
2 0x00007fdef68e69e3 in ::coreHandler(int, siginfo_t*, void*) () from /libapclient.so.10.15
3
4 0x00007fdef2329acf in raise () from /lib64/libc.so.6
5 0x00007fdef22fcea5 in abort () from /lib64/libc.so.6
6 0x00007fdef2eea09b in __gnu_cxx::__verbose_terminate_handler() [clone .cold.1] () from /lib64/libstdc++.so.6
7 0x00007fdef2ef054c in __cxxabiv1::__terminate(void (*)()) () from /lib64/libstdc++.so.6
8 0x00007fdef2ef05a7 in std::terminate() () from /lib64/libstdc++.so.6
9 0x00007fdef2ef0808 in __cxa_throw () from /lib64/libstdc++.so.6
10 0x00007fdef2eec235 in std::__throw_system_error(int) [clone .cold.28] () from /lib64/libstdc++.so.6
11 0x00007fddd380f508 in pulsar::UnAckedMessageTrackerEnabled::timeoutHandlerHelper() () from /libconnectivity-pulsar-client.so
12 0x00007fddd380f5a9 in pulsar::UnAckedMessageTrackerEnabled::timeoutHandler() () from /libconnectivity-pulsar-client.so
13 0x00007fddd38111e2 in boost::asio::detail::wait_handler<pulsar::UnAckedMessageTrackerEnabled::timeoutHandler()::{lambda(boost::system::error_code const&) # 1}, boost::asio::any_io_executor>::do_complete(void*, boost::asio::detail::scheduler_operation*, boost::system::error_code const&, unsigned long) () from */libconnectivity-pulsar-client.so
14 0x00007fddd374ac38 in boost::asio::detail::scheduler::run(boost::system::error_code&) () from */libconnectivity-pulsar-client.so
15 0x00007fddd3743f92 in pulsar::ExecutorService::start()::{lambda() # 1}::operator()() const [clone .isra.334] () from */libconnectivity-pulsar-client.so
16 0x00007fdef2f1cb23 in execute_native_thread_routine () from /lib64/libstdc++.so.6
17 0x00007fdef26a81ca in start_thread () from /lib64/libpthread.so.0
18 0x00007fdef2314e73 in clone () from /lib64/libc.so.6

Are you willing to submit a PR?

  • [ ] I'm willing to submit a PR!

jato-c8y, Nov 30 '23 14:11

We suspect the exception is being thrown by std::recursive_mutex when trying to acquire the lock in:

You can enable the debug level logs to verify your guess. See https://github.com/apache/pulsar-client-cpp/blob/d2094828f9fe756b315ba194e1f8a69ca24ac6b4/lib/UnAckedMessageTrackerEnabled.cc#L49

BewareMyPower, Dec 01 '23 12:12

I think it should be caused by https://github.com/apache/pulsar-client-cpp/blob/d2094828f9fe756b315ba194e1f8a69ca24ac6b4/lib/UnAckedMessageTrackerEnabled.cc#L38

We should capture shared_from_this() rather than & in the lambda, so that the this pointer is still valid when the callback is called.
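
To illustrate the lifetime problem, here is a minimal sketch with hypothetical names (FakeTimer, Tracker, demoCapture), not the actual UnAckedMessageTrackerEnabled code. It captures a std::weak_ptr obtained from shared_from_this(), a closely related variant of the suggestion above: a strong capture would keep the object alive until the callback runs, while the weak capture lets the callback notice that the object is gone and return without touching its mutex.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <mutex>
#include <utility>

// Stand-in for an asynchronous timer: it only stores the callback so the
// demo can fire it later, after the owner may already be gone.
struct FakeTimer {
    std::function<void()> pending;
    void asyncWait(std::function<void()> cb) { pending = std::move(cb); }
    void fire() {
        if (pending) pending();
    }
};

// Hypothetical tracker, loosely modelled on the situation in this issue.
class Tracker : public std::enable_shared_from_this<Tracker> {
   public:
    explicit Tracker(int& handled) : handled_(handled) {}

    void start(FakeTimer& timer) {
        // Capturing `this` or `&` would leave a dangling pointer if the
        // tracker is destroyed before the timer fires. A weak_ptr lets the
        // callback detect that case and return early.
        std::weak_ptr<Tracker> weakSelf = shared_from_this();
        timer.asyncWait([weakSelf] {
            if (auto self = weakSelf.lock()) {
                self->onTimeout();
            }
        });
    }

   private:
    void onTimeout() {
        std::lock_guard<std::recursive_mutex> lock(mutex_);
        ++handled_;
    }

    std::recursive_mutex mutex_;
    int& handled_;
};

// Returns how many times the handler ran: once while the tracker is alive,
// and not again after it has been destroyed.
inline int demoCapture() {
    int handled = 0;
    FakeTimer timer;
    auto tracker = std::make_shared<Tracker>(handled);
    tracker->start(timer);
    timer.fire();     // tracker alive: handler runs
    tracker.reset();  // tracker destroyed
    timer.fire();     // expired weak_ptr: callback does nothing
    return handled;
}
```

Both variants require the object to be owned by a std::shared_ptr, since shared_from_this() is only valid in that case.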

when we are unable to connect to the server immediately on startup

Is there an easy way to simulate the case? I tried starting a consumer and then starting the Pulsar standalone locally but I cannot reproduce it.

BewareMyPower, Dec 01 '23 12:12

We suspect the exception is being thrown by std::recursive_mutex when trying to acquire the lock in:

You can enable the debug level logs to verify your guess. See

https://github.com/apache/pulsar-client-cpp/blob/d2094828f9fe756b315ba194e1f8a69ca24ac6b4/lib/UnAckedMessageTrackerEnabled.cc#L49

pulsarLogs.txt

Please find example logs attached. The line 'LOG_DEBUG("UnAckedMessageTrackerEnabled::timeoutHandlerHelper invoked for consumerPtr_ " ' is not logged right before the crash, yet the stack trace indicates that the exception is thrown from within timeoutHandlerHelper. That points to the only line above this log statement, which is https://github.com/apache/pulsar-client-cpp/blob/d2094828f9fe756b315ba194e1f8a69ca24ac6b4/lib/UnAckedMessageTrackerEnabled.cc#L48C1-L48C1

jato-c8y, Dec 01 '23 14:12

I think it should be caused by

https://github.com/apache/pulsar-client-cpp/blob/d2094828f9fe756b315ba194e1f8a69ca24ac6b4/lib/UnAckedMessageTrackerEnabled.cc#L38

We should capture shared_from_this() rather than & in the lambda, so that the this pointer is still valid when the callback is called.

when we are unable to connect to the server immediately on startup

Is there an easy way to simulate the case? I tried starting a consumer and then starting the Pulsar standalone locally but I cannot reproduce it.

I am not sure I see the link between the stack trace and the line you think could be the cause. Is the exception not being thrown from within the call on line 34? https://github.com/apache/pulsar-client-cpp/blob/d2094828f9fe756b315ba194e1f8a69ca24ac6b4/lib/UnAckedMessageTrackerEnabled.cc#L34

Unfortunately our code currently lives within our application, and I do not have a snippet or a standalone reproducer to show you. I should have mentioned that the exception is not 100% reproducible. Our test is as follows:

  1. A Pulsar server is running, along with a proxy server that we use to connect to it.
  2. We suspend the proxy server to simulate an inability to connect to Pulsar.
  3. We start our pulsar-client-cpp client and attempt to connect to the proxy server; the client.subscribe call fails.
  4. We resume the proxy server so that the client can now communicate with the Pulsar server.

The exception is seen around 20% of the time when I cycle the test.

jato-c8y, Dec 01 '23 14:12

but the stacktrace indicates it is thrown from within timeoutHandlerHelper pointing to the only line above this log which is

If the UnAckedMessageTrackerEnabled object has already been destroyed, locking the lock_ field that belonged to it is undefined behavior, which could explain the exception.

BewareMyPower, Dec 04 '23 03:12

I opened a PR that will close this issue. When you have time, you can test if that patch works.

BewareMyPower, Dec 04 '23 13:12

Thank you for the prompt investigation and PR. I have run against this patch and I do not see a crash. There is, however, a slight change in behaviour. Previously, if client.subscribe did not succeed, it would return after a while (I think 10s) with Result != ResultOk. Now client.subscribe seemingly does not return until it succeeds: I have observed it for over 60s, during which the reconnect attempts continue but the subscribe call does not return.
I can work with this, but is the change intentional, or was it always intended to behave this way?

jato-c8y, Dec 05 '23 11:12

Now client.subscribe seemingly does not return until it succeeds: I have observed it for over 60s, during which the reconnect attempts continue but the subscribe call does not return.

The current behavior is probably wrong. I will take a look soon.

BewareMyPower, Dec 05 '23 12:12

In my local env, it failed after 30s when I ran SampleConsumer without any Pulsar server running.

2023-12-05 21:02:37.643 INFO  [0x202635e00] Client:86 | Subscribing on Topic :persistent://public/default/my-topic
2023-12-05 21:02:37.643 INFO  [0x202635e00] ClientConnection:189 | [<none> -> pulsar://localhost:6650] Create ClientConnection, timeout=10000
2023-12-05 21:02:37.643 INFO  [0x202635e00] ConnectionPool:114 | Created connection for pulsar://localhost:6650-0
2023-12-05 21:02:37.644 WARN  [0x16b78b000] ClientConnection:468 | [<none> -> pulsar://localhost:6650] Failed to establish connection: Connection refused
2023-12-05 21:02:37.644 WARN  [0x16b78b000] ClientConnection:468 | [<none> -> pulsar://localhost:6650] Failed to establish connection: Connection refused
2023-12-05 21:02:37.644 INFO  [0x16b78b000] ClientConnection:1317 | [<none> -> pulsar://localhost:6650] Connection disconnected (refCnt: 2)
2023-12-05 21:02:37.644 INFO  [0x16b78b000] ConnectionPool:129 | Remove connection for pulsar://localhost:6650-0
...
2023-12-05 21:03:07.684 ERROR [0x16b78b000] ClientImpl:489 | Error Checking/Getting Partition Metadata while Subscribing on persistent://public/default/my-topic -- TimeOut
2023-12-05 21:03:07.684 INFO  [0x16b78b000] ClientConnection:266 | [<none> -> pulsar://localhost:6650] Destroyed connection to pulsar://localhost:6650-0
2023-12-05 21:03:07.684 ERROR [0x202635e00] SampleConsumer:35 | Failed to subscribe: TimeOut

The timeout that applies here is the operationTimeout (30s by default). The connection timeout only covers establishing the TCP connection, so it does not apply to a connection-refused error (ECONNREFUSED in POSIX), which is reported immediately.
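
For reference, both timeouts are set on the client configuration. The fragment below is a sketch that assumes the pulsar-client-cpp ClientConfiguration setters setOperationTimeoutSeconds and setConnectionTimeout. makeClientConfig is a hypothetical helper name, and the values are illustrative: 30 s matches the default mentioned above, and 10000 ms matches the timeout=10000 in the log.

```cpp
#include <pulsar/Client.h>

// Illustrative values only; makeClientConfig is a hypothetical helper name.
pulsar::ClientConfiguration makeClientConfig() {
    pulsar::ClientConfiguration config;
    // Upper bound for operations such as subscribe, in seconds.
    config.setOperationTimeoutSeconds(30);
    // Timeout for establishing the TCP connection, in milliseconds.
    config.setConnectionTimeout(10000);
    return config;
}
```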

@jato-sag Could you upload your client logs?

BewareMyPower, Dec 05 '23 13:12

I tried changing/reducing the operationTimeout, but I don't see any difference: subscribe still does not return. Please see the logs attached.

Also for clarity, this is the rough code snippet.

void subscribeClient() {
    pulsar::Result result = pulsar::ResultRetryable;
    while (m_running && result != pulsar::ResultOk) {
        result = client->subscribe(...);
        if (result != pulsar::ResultOk) {
            //LOG ERROR
        } else {
            //LOG SUCCESS
        }
    }
}
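
The snippet retries immediately after a failure. A variant that pauses between attempts and also gives up after a fixed number of tries could look like the sketch below. retryWithDelay and demoRetry are hypothetical names, the attempt callback stands in for the client->subscribe(...) call, and the atomic flag plays the role of m_running.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical helper, not part of pulsar-client-cpp. Calls `attempt` until
// it returns true, `running` becomes false, or `maxAttempts` is exhausted,
// sleeping for `delay` after each failed attempt.
// Returns the 1-based number of the successful attempt, or 0 on giving up.
inline int retryWithDelay(const std::function<bool()>& attempt,
                          const std::atomic<bool>& running,
                          int maxAttempts,
                          std::chrono::milliseconds delay) {
    for (int n = 1; n <= maxAttempts && running.load(); ++n) {
        if (attempt()) {
            return n;
        }
        std::this_thread::sleep_for(delay);
    }
    return 0;
}

// Demonstration with a stand-in for the subscribe call that fails twice
// and then succeeds.
inline int demoRetry() {
    std::atomic<bool> running{true};
    int calls = 0;
    return retryWithDelay([&calls] { return ++calls == 3; }, running, 10,
                          std::chrono::milliseconds(1));
}
```

In the real loop the callback would wrap the subscribe call and compare its result with pulsar::ResultOk. The delay and attempt limit are choices for the application, not values taken from this thread.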

pulsarLogsPostPatch.txt

jato-c8y, Dec 05 '23 14:12