pulsar-client-python
bad_weak_ptr will be thrown if client.close() is not called after ResultInterrupted is caught
Reproduce:
import pulsar

client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('my-topic', "my-sub")
while True:
    try:
        msg = consumer.receive()
        consumer.acknowledge(msg)
    except Exception as e:
        print("Exception: {}".format(type(e)))
        break
Press Ctrl+C and you will see:
^CException: <class '_pulsar.Interrupted'>
2023-03-09 22:32:39.870 WARN [139780853032768] ConsumerImpl:126 | [persistent://public/default/my-topic, my-sub, 0] Destroyed consumer which was not properly closed
2023-03-09 22:32:39.870 INFO [139780853032768] ConsumerImpl:134 | [persistent://public/default/my-topic, my-sub, 0] Closed consumer for race condition: 0
terminate called after throwing an instance of 'std::bad_weak_ptr'
what(): bad_weak_ptr
Aborted
After enabling debug-level logging, you will see:
2023-03-09 22:33:41.191 DEBUG [140614849333056] ConsumerImpl:120 | [persistent://public/default/my-topic, my-sub, 0] ~ConsumerImpl
2023-03-09 22:33:41.191 WARN [140614849333056] ConsumerImpl:126 | [persistent://public/default/my-topic, my-sub, 0] Destroyed consumer which was not properly closed
2023-03-09 22:33:41.191 INFO [140614849333056] ConsumerImpl:134 | [persistent://public/default/my-topic, my-sub, 0] Closed consumer for race condition: 0
2023-03-09 22:33:41.191 DEBUG [140614849333056] AckGroupingTrackerEnabled:111 | Reference to the HandlerBase is not valid.
terminate called after throwing an instance of 'std::bad_weak_ptr'
what(): bad_weak_ptr
2023-03-09 22:33:41.193 DEBUG [140614821328640] ClientConnection:814 | [127.0.0.1:39724 -> 127.0.0.1:6650] Handling incoming command: SUCCESS
2023-03-09 22:33:41.193 DEBUG [140614821328640] ClientConnection:904 | [127.0.0.1:39724 -> 127.0.0.1:6650] Received success response from server. req_id: 1
Aborted
The stack:
#7 0x00007ffff68be5aa in _Unwind_Resume () from /lib/x86_64-linux-gnu/libgcc_s.so.1
#8 0x00007ffff6ba7a99 in pulsar::ConsumerImpl::shutdown() [clone .cold] ()
from /usr/local/lib/python3.8/dist-packages/pulsar_client.libs/libpulsar-9425ac2e.so
#9 0x00007ffff6c98d74 in pulsar::ConsumerImpl::~ConsumerImpl() () from /usr/local/lib/python3.8/dist-packages/pulsar_client.libs/libpulsar-9425ac2e.so
#10 0x00007ffff7524bfa in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() ()
from /usr/local/lib/python3.8/dist-packages/_pulsar.cpython-38-x86_64-linux-gnu.so
#11 0x00007ffff755b28c in pybind11::class_<pulsar::Consumer>::dealloc(pybind11::detail::value_and_holder&) ()
from /usr/local/lib/python3.8/dist-packages/_pulsar.cpython-38-x86_64-linux-gnu.so
#12 0x00007ffff753240e in pybind11::detail::clear_instance(_object*) () from /usr/local/lib/python3.8/dist-packages/_pulsar.cpython-38-x86_64-linux-gnu.so
#13 0x00007ffff753305f in pybind11_object_dealloc () from /usr/local/lib/python3.8/dist-packages/_pulsar.cpython-38-x86_64-linux-gnu.so
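As the title states, the abort only shows up when client.close() is not called after the interrupt is caught. A minimal sketch of that workaround follows. The helper name consume_until_interrupted is hypothetical, and it assumes that closing the client explicitly in a finally block is enough to keep the consumer from being destroyed unclosed at interpreter exit. It has not been verified against every client version.

```python
def consume_until_interrupted(client, topic, subscription):
    """Receive and acknowledge messages until receive() raises.

    `client` is expected to behave like pulsar.Client. The helper name
    and structure are illustrative, not part of the pulsar API.
    """
    consumer = client.subscribe(topic, subscription)
    try:
        while True:
            msg = consumer.receive()
            consumer.acknowledge(msg)
    except Exception as e:
        # Ctrl+C surfaces here as _pulsar.Interrupted, as in the report.
        print("Exception: {}".format(type(e)))
    finally:
        # Close explicitly so the consumer is not left to be destroyed
        # unclosed when the interpreter shuts down.
        client.close()
```

With the real library this would be called as consume_until_interrupted(pulsar.Client('pulsar://localhost:6650'), 'my-topic', 'my-sub'), using the same service URL, topic, and subscription as the reproduction script.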
Summary
This issue is hard to avoid because it happens when the destructor of a std::shared_ptr is invoked from libpulsar.so after the Python interpreter has already unloaded libpulsar.so.
Analysis
We can see the crash happened in ConsumerImpl::shutdown:
#10 std::shared_ptr<pulsar::ClientImpl>::~shared_ptr (this=<optimized out>, __in_chrg=<optimized out>) at /usr/include/c++/9/bits/shared_ptr.h:103
#11 pulsar::ConsumerImpl::shutdown (this=<optimized out>) at /home/xyz/github.com/apache/pulsar-client-cpp/lib/ConsumerImpl.cc:1267
#12 0x00007ffff7123a5e in pulsar::ConsumerImpl::~ConsumerImpl (this=0x7ffff0003250, __in_chrg=<optimized out>)
at /home/xyz/github.com/apache/pulsar-client-cpp/lib/ConsumerImpl.cc:164
https://github.com/apache/pulsar-client-cpp/blob/1dad87bb3b804d2aa8542ac48e4c35228ac2f1bf/lib/ConsumerImpl.cc#L1267
auto client = client_.lock();
Even after I commented out the client_.lock() related code, the crash still happened, this time at the Message object created in failPendingReceiveCallback, which is called from shutdown:
#10 std::shared_ptr<pulsar::MessageImpl>::~shared_ptr (this=<optimized out>, __in_chrg=<optimized out>) at /usr/include/c++/9/bits/shared_ptr.h:103
#11 pulsar::Message::~Message (this=<optimized out>, __in_chrg=<optimized out>) at /home/xyz/github.com/apache/pulsar-client-cpp/include/pulsar/Message.h:43
#12 pulsar::ConsumerImpl::failPendingReceiveCallback (this=<optimized out>) at /home/xyz/github.com/apache/pulsar-client-cpp/lib/ConsumerImpl.cc:622
#13 0x00007ffff7122cd0 in pulsar::ConsumerImpl::shutdown (this=0x7ffff0003250) at /home/xyz/github.com/apache/pulsar-client-cpp/lib/ConsumerImpl.cc:1274
https://github.com/apache/pulsar-client-cpp/blob/1dad87bb3b804d2aa8542ac48e4c35228ac2f1bf/lib/ConsumerImpl.cc#L622
Message msg;