rust-rdkafka
rust-rdkafka copied to clipboard
I'm seeing an issue where a `StreamConsumer` hangs indefinitely in `BaseConsumer<C>::drop()`.
I'm able to reproduce the issue fairly regularly using this sample program: https://github.com/penick/kafka_hang/blob/main/src/main.rs. The same hang also shows up in Vector's Kafka source.
Versions: v0.28.0 and v0.29.0
Also, I noticed this comment in the code:
trace!("Destroying consumer: {:?}", self.client.native_ptr()); // TODO: fix me (multiple executions ?)
I was wondering what that comment means, since it might be relevant here, but I'm not sure. After looking through the librdkafka
code, I think `rd_kafka_consumer_close()` may be able to hang
if it is called multiple times in rapid succession.
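This is the stack trace when it hangs. The hang begins when `rd_kafka_consumer_close()` (frame #17) serves a queued op through `rd_kafka_poll_cb()` (frame #16). That call invokes the Rust rebalance callback (frames #15–#14), which calls `rd_kafka_rebalance_protocol()` (frame #12). That function sends a `RD_KAFKA_OP_GET_REBALANCE_PROTOCOL` request with `timeout_ms=-1` (frame #10). The thread then blocks forever in `pthread_cond_wait` waiting for a reply, which apparently never arrives while the consumer is closing: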
Thread 9 (Thread 0x7fe4cb0a4640 (LWP 50640) "tokio-runtime-w"):
#0 __futex_abstimed_wait_common64 (private=0, cancel=true, abstime=0x0, op=393, expected=0, futex_word=0x7fe4bc034a90) at ./nptl/futex-internal.c:57
#1 __futex_abstimed_wait_common (cancel=true, private=0, abstime=0x0, clockid=0, expected=0, futex_word=0x7fe4bc034a90) at ./nptl/futex-internal.c:87
#2 __GI___futex_abstimed_wait_cancelable64 (futex_word=futex_word@entry=0x7fe4bc034a90, expected=expected@entry=0, clockid=clockid@entry=0, abstime=abstime@entry=0x0, private=private@entry=0) at ./nptl/futex-internal.c:139
#3 0x00007fe4cbf59ac1 in __pthread_cond_wait_common (abstime=0x0, clockid=0, mutex=0x7fe4bc034a40, cond=0x7fe4bc034a68) at ./nptl/pthread_cond_wait.c:503
#4 ___pthread_cond_wait (cond=0x7fe4bc034a68, mutex=0x7fe4bc034a40) at ./nptl/pthread_cond_wait.c:627
#5 0x00007fe4cbf636fd in __cnd_wait (cond=<optimized out>, mutex=<optimized out>) at ../sysdeps/pthread/cnd_wait.c:25
#6 0x00005633385fd775 in cnd_timedwait_abs (cnd=cnd@entry=0x7fe4bc034a68, mtx=mtx@entry=0x7fe4bc034a40, tspec=tspec@entry=0x7fe4cb09f550) at tinycthread_extra.c:102
#7 0x00005633385ab45f in rd_kafka_q_pop_serve (rkq=rkq@entry=0x7fe4bc034a40, timeout_us=<optimized out>, version=version@entry=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_RETURN, callback=callback@entry=0x0, opaque=opaque@entry=0x0) at rdkafka_queue.c:412
#8 0x00005633385ab4f4 in rd_kafka_q_pop (rkq=rkq@entry=0x7fe4bc034a40, timeout_us=<optimized out>, version=version@entry=0) at rdkafka_queue.c:436
#9 0x00005633385ae437 in rd_kafka_op_req0 (destq=destq@entry=0x7fe4bc01e160, recvq=recvq@entry=0x7fe4bc034a40, rko=rko@entry=0x7fe4a43de450, timeout_ms=timeout_ms@entry=-1) at rdkafka_op.c:615
#10 0x00005633385ae491 in rd_kafka_op_req (destq=0x7fe4bc01e160, rko=0x7fe4a43de450, timeout_ms=timeout_ms@entry=-1) at rdkafka_op.c:633
#11 0x00005633385ae550 in rd_kafka_op_req2 (destq=<optimized out>, type=type@entry=RD_KAFKA_OP_GET_REBALANCE_PROTOCOL) at rdkafka_op.c:648
#12 0x00005633385ecd30 in rd_kafka_rebalance_protocol (rk=<optimized out>) at rdkafka_subscription.c:194
#13 0x000056333851ba7a in rdkafka::client::NativeClient::rebalance_protocol ()
#14 0x00005633384ff0f1 in rdkafka::consumer::ConsumerContext::rebalance ()
#15 0x00005633384e936a in _ZN7rdkafka8consumer13base_consumer19native_rebalance_cb17h46bfcc0de91aff7fE.llvm.10799211045468606124 ()
#16 0x0000563338574df9 in rd_kafka_poll_cb (rk=rk@entry=0x7fe4bc016770, rkq=rkq@entry=0x7fe4bc1c22f0, rko=rko@entry=0x7fe3ac02ea50, cb_type=cb_type@entry=RD_KAFKA_Q_CB_RETURN, opaque=opaque@entry=0x0) at rdkafka.c:3703
#17 0x0000563338575273 in rd_kafka_consumer_close (rk=0x7fe4bc016770) at rdkafka.c:3242
#18 0x00005633384e960a in <rdkafka::consumer::base_consumer::BaseConsumer<C> as core::ops::drop::Drop>::drop ()
#19 0x00005633384fc01f in core::ptr::drop_in_place<rdkafka::consumer::stream_consumer::StreamConsumer<rdkafka_hang::KafkaStatisticsContext>> ()
#20 0x00005633384ffc8b in <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll ()
#21 0x00005633384fa4aa in tokio::runtime::task::core::CoreStage<T>::poll ()
#22 0x000056333850638d in tokio::runtime::task::harness::Harness<T,S>::poll ()
#23 0x000056333855e775 in std::thread::local::LocalKey<T>::with ()
#24 0x00005633385533dd in tokio::runtime::scheduler::multi_thread::worker::Context::run_task ()
#25 0x0000563338552690 in tokio::runtime::scheduler::multi_thread::worker::Context::run ()
#26 0x0000563338557947 in tokio::macros::scoped_tls::ScopedKey<T>::set ()
#27 0x0000563338552159 in tokio::runtime::scheduler::multi_thread::worker::run ()
#28 0x000056333854eff1 in <tokio::runtime::blocking::task::BlockingTask<T> as core::future::future::Future>::poll ()
#29 0x000056333853c459 in tokio::runtime::task::harness::Harness<T,S>::poll ()
#30 0x000056333854e965 in tokio::runtime::blocking::pool::Inner::run ()
#31 0x0000563338541442 in _ZN3std10sys_common9backtrace28__rust_begin_short_backtrace17hb37d7f4c32f7da34E.llvm.3147363539223746572 ()
#32 0x00005633385443ff in core::ops::function::FnOnce::call_once{{vtable.shim}} ()
#33 0x000056333879b613 in alloc::boxed::{impl#44}::call_once<(), dyn core::ops::function::FnOnce<(), Output=()>, alloc::alloc::Global> () at library/alloc/src/boxed.rs:1935
#34 alloc::boxed::{impl#44}::call_once<(), alloc::boxed::Box<dyn core::ops::function::FnOnce<(), Output=()>, alloc::alloc::Global>, alloc::alloc::Global> () at library/alloc/src/boxed.rs:1935
#35 std::sys::unix::thread::{impl#2}::new::thread_start () at library/std/src/sys/unix/thread.rs:108
#36 0x00007fe4cbf5ab43 in start_thread (arg=<optimized out>) at ./nptl/pthread_create.c:442
#37 0x00007fe4cbfeca00 in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:81