rgw/amqp: lock the iterator to prevent operating same element
When sanitizer is enabled, unittest_rgw_amqp shows,
=================================================================
==1429129==ERROR: LeakSanitizer: detected memory leaks
Direct leak of 416 byte(s) in 1 object(s) allocated from:
#0 0xaaaab56a0008 in operator new(unsigned long) (/root/ceph/build/bin/unittest_rgw_amqp+0x1c0008) (BuildId: a20c317434e8d5f2ec33bbb71a69d81eb751c494)
#1 0xaaaab57eecfc in amqp_new_connection /root/ceph/src/test/rgw/amqp_mock.cc:110:12
#2 0xaaaab58095d8 in rgw::amqp::new_state(rgw::amqp::connection_t*, rgw::amqp::connection_id_t const&) /root/ceph/src/rgw/rgw_amqp.cc:373:16
#3 0xaaaab5813c70 in rgw::amqp::Manager::run() /root/ceph/src/rgw/rgw_amqp.cc:684:18
#4 0xaaaab5849e50 in void std::__invoke_impl<void, void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(std::__invoke_memfun_deref, void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:74:14
#5 0xaaaab5849b48 in std::__invoke_result<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>::type std::__invoke<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:96:14
#6 0xaaaab5849978 in void std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:259:13
#7 0xaaaab584979c in std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::operator()() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:266:11
#8 0xaaaab5849420 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> > >::_M_run() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:211:13
#9 0xffffb0fb31f8 (/lib/aarch64-linux-gnu/libstdc++.so.6+0xd31f8) (BuildId: a012b2bb77110e84b266cd7425b50e57427abb02)
#10 0xffffb0d7d5c4 in start_thread nptl/./nptl/pthread_create.c:442:8
#11 0xffffb0de5ed8 misc/../sysdeps/unix/sysv/linux/aarch64/clone.S:79
Direct leak of 64 byte(s) in 1 object(s) allocated from:
#0 0xaaaab5669bb8 in posix_memalign (/root/ceph/build/bin/unittest_rgw_amqp+0x189bb8) (BuildId: a20c317434e8d5f2ec33bbb71a69d81eb751c494)
#1 0xaaaab57f5294 in boost::alignment::aligned_alloc(unsigned long, unsigned long) /root/ceph/build/boost/include/boost/align/detail/aligned_alloc_posix.hpp:26:9
#2 0xaaaab57f4d88 in boost::alignment::aligned_allocator<boost::lockfree::queue<amqp_basic_ack_t_>::node, 64ul>::allocate(unsigned long, void const*) /root/ceph/build/boost/include/boost/align/aligned_allocator.hpp:70:19
#3 0xaaaab57f4204 in boost::lockfree::detail::freelist_stack<boost::lockfree::queue<amqp_basic_ack_t_>::node, boost::alignment::aligned_allocator<boost::lockfree::queue<amqp_basic_ack_t_>::node, 64ul> >::freelist_stack<boost::alignment::aligned_allocator<boost::lockfree::queue<amqp_basic_ack_t_>::node, 64ul> >(boost::alignment::aligned_allocator<boost::lockfree::queue<amqp_basic_ack_t_>::node, 64ul> const&, unsigned long) /root/ceph/build/boost/include/boost/lockfree/detail/freelist.hpp:62:31
#4 0xaaaab57f3728 in boost::lockfree::queue<amqp_basic_ack_t_>::queue(unsigned long) /root/ceph/build/boost/include/boost/lockfree/queue.hpp:234:9
#5 0xaaaab57f2ea8 in amqp_connection_state_t_::amqp_connection_state_t_() /root/ceph/src/test/rgw/amqp_mock.cc:90:5
#6 0xaaaab57eed04 in amqp_new_connection /root/ceph/src/test/rgw/amqp_mock.cc:110:16
#7 0xaaaab58095d8 in rgw::amqp::new_state(rgw::amqp::connection_t*, rgw::amqp::connection_id_t const&) /root/ceph/src/rgw/rgw_amqp.cc:373:16
#8 0xaaaab5813c70 in rgw::amqp::Manager::run() /root/ceph/src/rgw/rgw_amqp.cc:684:18
#9 0xaaaab5849e50 in void std::__invoke_impl<void, void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(std::__invoke_memfun_deref, void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:74:14
#10 0xaaaab5849b48 in std::__invoke_result<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>::type std::__invoke<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:96:14
#11 0xaaaab5849978 in void std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:259:13
#12 0xaaaab584979c in std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::operator()() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:266:11
#13 0xaaaab5849420 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> > >::_M_run() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:211:13
#14 0xffffb0fb31f8 (/lib/aarch64-linux-gnu/libstdc++.so.6+0xd31f8) (BuildId: a012b2bb77110e84b266cd7425b50e57427abb02)
#15 0xffffb0d7d5c4 in start_thread nptl/./nptl/pthread_create.c:442:8
#16 0xffffb0de5ed8 misc/../sysdeps/unix/sysv/linux/aarch64/clone.S:79
Direct leak of 64 byte(s) in 1 object(s) allocated from:
#0 0xaaaab5669bb8 in posix_memalign (/root/ceph/build/bin/unittest_rgw_amqp+0x189bb8) (BuildId: a20c317434e8d5f2ec33bbb71a69d81eb751c494)
#1 0xaaaab57f5294 in boost::alignment::aligned_alloc(unsigned long, unsigned long) /root/ceph/build/boost/include/boost/align/detail/aligned_alloc_posix.hpp:26:9
#2 0xaaaab57f90bc in boost::alignment::aligned_allocator<boost::lockfree::queue<amqp_basic_nack_t_>::node, 64ul>::allocate(unsigned long, void const*) /root/ceph/build/boost/include/boost/align/aligned_allocator.hpp:70:19
#3 0xaaaab57f8538 in boost::lockfree::detail::freelist_stack<boost::lockfree::queue<amqp_basic_nack_t_>::node, boost::alignment::aligned_allocator<boost::lockfree::queue<amqp_basic_nack_t_>::node, 64ul> >::freelist_stack<boost::alignment::aligned_allocator<boost::lockfree::queue<amqp_basic_nack_t_>::node, 64ul> >(boost::alignment::aligned_allocator<boost::lockfree::queue<amqp_basic_nack_t_>::node, 64ul> const&, unsigned long) /root/ceph/build/boost/include/boost/lockfree/detail/freelist.hpp:62:31
#4 0xaaaab57f3a6c in boost::lockfree::queue<amqp_basic_nack_t_>::queue(unsigned long) /root/ceph/build/boost/include/boost/lockfree/queue.hpp:234:9
#5 0xaaaab57f2eb8 in amqp_connection_state_t_::amqp_connection_state_t_() /root/ceph/src/test/rgw/amqp_mock.cc:91:5
#6 0xaaaab57eed04 in amqp_new_connection /root/ceph/src/test/rgw/amqp_mock.cc:110:16
#7 0xaaaab58095d8 in rgw::amqp::new_state(rgw::amqp::connection_t*, rgw::amqp::connection_id_t const&) /root/ceph/src/rgw/rgw_amqp.cc:373:16
#8 0xaaaab5813c70 in rgw::amqp::Manager::run() /root/ceph/src/rgw/rgw_amqp.cc:684:18
#9 0xaaaab5849e50 in void std::__invoke_impl<void, void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(std::__invoke_memfun_deref, void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:74:14
#10 0xaaaab5849b48 in std::__invoke_result<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>::type std::__invoke<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:96:14
#11 0xaaaab5849978 in void std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:259:13
#12 0xaaaab584979c in std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::operator()() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:266:11
#13 0xaaaab5849420 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> > >::_M_run() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:211:13
#14 0xffffb0fb31f8 (/lib/aarch64-linux-gnu/libstdc++.so.6+0xd31f8) (BuildId: a012b2bb77110e84b266cd7425b50e57427abb02)
#15 0xffffb0d7d5c4 in start_thread nptl/./nptl/pthread_create.c:442:8
#16 0xffffb0de5ed8 misc/../sysdeps/unix/sysv/linux/aarch64/clone.S:79
Direct leak of 9 byte(s) in 1 object(s) allocated from:
#0 0xaaaab56690a0 in malloc (/root/ceph/build/bin/unittest_rgw_amqp+0x1890a0) (BuildId: a20c317434e8d5f2ec33bbb71a69d81eb751c494)
#1 0xaaaab57f2754 in amqp_bytes_malloc_dup /root/ceph/src/test/rgw/amqp_mock.cc:384:18
#2 0xaaaab580b4b4 in rgw::amqp::new_state(rgw::amqp::connection_t*, rgw::amqp::connection_id_t const&) /root/ceph/src/rgw/rgw_amqp.cc:509:28
#3 0xaaaab5813c70 in rgw::amqp::Manager::run() /root/ceph/src/rgw/rgw_amqp.cc:684:18
#4 0xaaaab5849e50 in void std::__invoke_impl<void, void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(std::__invoke_memfun_deref, void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:74:14
#5 0xaaaab5849b48 in std::__invoke_result<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>::type std::__invoke<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:96:14
#6 0xaaaab5849978 in void std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:259:13
#7 0xaaaab584979c in std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::operator()() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:266:11
#8 0xaaaab5849420 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> > >::_M_run() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:211:13
#9 0xffffb0fb31f8 (/lib/aarch64-linux-gnu/libstdc++.so.6+0xd31f8) (BuildId: a012b2bb77110e84b266cd7425b50e57427abb02)
#10 0xffffb0d7d5c4 in start_thread nptl/./nptl/pthread_create.c:442:8
#11 0xffffb0de5ed8 misc/../sysdeps/unix/sysv/linux/aarch64/clone.S:79
Indirect leak of 65536 byte(s) in 1024 object(s) allocated from:
#0 0xaaaab5669bb8 in posix_memalign (/root/ceph/build/bin/unittest_rgw_amqp+0x189bb8) (BuildId: a20c317434e8d5f2ec33bbb71a69d81eb751c494)
#1 0xaaaab57f5294 in boost::alignment::aligned_alloc(unsigned long, unsigned long) /root/ceph/build/boost/include/boost/align/detail/aligned_alloc_posix.hpp:26:9
#2 0xaaaab57f4d88 in boost::alignment::aligned_allocator<boost::lockfree::queue<amqp_basic_ack_t_>::node, 64ul>::allocate(unsigned long, void const*) /root/ceph/build/boost/include/boost/align/aligned_allocator.hpp:70:19
#3 0xaaaab57f4204 in boost::lockfree::detail::freelist_stack<boost::lockfree::queue<amqp_basic_ack_t_>::node, boost::alignment::aligned_allocator<boost::lockfree::queue<amqp_basic_ack_t_>::node, 64ul> >::freelist_stack<boost::alignment::aligned_allocator<boost::lockfree::queue<amqp_basic_ack_t_>::node, 64ul> >(boost::alignment::aligned_allocator<boost::lockfree::queue<amqp_basic_ack_t_>::node, 64ul> const&, unsigned long) /root/ceph/build/boost/include/boost/lockfree/detail/freelist.hpp:62:31
#4 0xaaaab57f3728 in boost::lockfree::queue<amqp_basic_ack_t_>::queue(unsigned long) /root/ceph/build/boost/include/boost/lockfree/queue.hpp:234:9
#5 0xaaaab57f2ea8 in amqp_connection_state_t_::amqp_connection_state_t_() /root/ceph/src/test/rgw/amqp_mock.cc:90:5
#6 0xaaaab57eed04 in amqp_new_connection /root/ceph/src/test/rgw/amqp_mock.cc:110:16
#7 0xaaaab58095d8 in rgw::amqp::new_state(rgw::amqp::connection_t*, rgw::amqp::connection_id_t const&) /root/ceph/src/rgw/rgw_amqp.cc:373:16
#8 0xaaaab5813c70 in rgw::amqp::Manager::run() /root/ceph/src/rgw/rgw_amqp.cc:684:18
#9 0xaaaab5849e50 in void std::__invoke_impl<void, void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(std::__invoke_memfun_deref, void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:74:14
#10 0xaaaab5849b48 in std::__invoke_result<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>::type std::__invoke<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:96:14
#11 0xaaaab5849978 in void std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:259:13
#12 0xaaaab584979c in std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::operator()() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:266:11
#13 0xaaaab5849420 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> > >::_M_run() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:211:13
#14 0xffffb0fb31f8 (/lib/aarch64-linux-gnu/libstdc++.so.6+0xd31f8) (BuildId: a012b2bb77110e84b266cd7425b50e57427abb02)
#15 0xffffb0d7d5c4 in start_thread nptl/./nptl/pthread_create.c:442:8
#16 0xffffb0de5ed8 misc/../sysdeps/unix/sysv/linux/aarch64/clone.S:79
Indirect leak of 65536 byte(s) in 1024 object(s) allocated from:
#0 0xaaaab5669bb8 in posix_memalign (/root/ceph/build/bin/unittest_rgw_amqp+0x189bb8) (BuildId: a20c317434e8d5f2ec33bbb71a69d81eb751c494)
#1 0xaaaab57f5294 in boost::alignment::aligned_alloc(unsigned long, unsigned long) /root/ceph/build/boost/include/boost/align/detail/aligned_alloc_posix.hpp:26:9
#2 0xaaaab57f90bc in boost::alignment::aligned_allocator<boost::lockfree::queue<amqp_basic_nack_t_>::node, 64ul>::allocate(unsigned long, void const*) /root/ceph/build/boost/include/boost/align/aligned_allocator.hpp:70:19
#3 0xaaaab57f8538 in boost::lockfree::detail::freelist_stack<boost::lockfree::queue<amqp_basic_nack_t_>::node, boost::alignment::aligned_allocator<boost::lockfree::queue<amqp_basic_nack_t_>::node, 64ul> >::freelist_stack<boost::alignment::aligned_allocator<boost::lockfree::queue<amqp_basic_nack_t_>::node, 64ul> >(boost::alignment::aligned_allocator<boost::lockfree::queue<amqp_basic_nack_t_>::node, 64ul> const&, unsigned long) /root/ceph/build/boost/include/boost/lockfree/detail/freelist.hpp:62:31
#4 0xaaaab57f3a6c in boost::lockfree::queue<amqp_basic_nack_t_>::queue(unsigned long) /root/ceph/build/boost/include/boost/lockfree/queue.hpp:234:9
#5 0xaaaab57f2eb8 in amqp_connection_state_t_::amqp_connection_state_t_() /root/ceph/src/test/rgw/amqp_mock.cc:91:5
#6 0xaaaab57eed04 in amqp_new_connection /root/ceph/src/test/rgw/amqp_mock.cc:110:16
#7 0xaaaab58095d8 in rgw::amqp::new_state(rgw::amqp::connection_t*, rgw::amqp::connection_id_t const&) /root/ceph/src/rgw/rgw_amqp.cc:373:16
#8 0xaaaab5813c70 in rgw::amqp::Manager::run() /root/ceph/src/rgw/rgw_amqp.cc:684:18
#9 0xaaaab5849e50 in void std::__invoke_impl<void, void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(std::__invoke_memfun_deref, void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:74:14
#10 0xaaaab5849b48 in std::__invoke_result<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>::type std::__invoke<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:96:14
#11 0xaaaab5849978 in void std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:259:13
#12 0xaaaab584979c in std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::operator()() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:266:11
#13 0xaaaab5849420 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> > >::_M_run() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:211:13
#14 0xffffb0fb31f8 (/lib/aarch64-linux-gnu/libstdc++.so.6+0xd31f8) (BuildId: a012b2bb77110e84b266cd7425b50e57427abb02)
#15 0xffffb0d7d5c4 in start_thread nptl/./nptl/pthread_create.c:442:8
#16 0xffffb0de5ed8 misc/../sysdeps/unix/sysv/linux/aarch64/clone.S:79
Indirect leak of 24 byte(s) in 1 object(s) allocated from:
#0 0xaaaab56a0008 in operator new(unsigned long) (/root/ceph/build/bin/unittest_rgw_amqp+0x1c0008) (BuildId: a20c317434e8d5f2ec33bbb71a69d81eb751c494)
#1 0xaaaab57eefb0 in amqp_tcp_socket_new /root/ceph/src/test/rgw/amqp_mock.cc:127:19
#2 0xaaaab5809740 in rgw::amqp::new_state(rgw::amqp::connection_t*, rgw::amqp::connection_id_t const&) /root/ceph/src/rgw/rgw_amqp.cc:401:14
#3 0xaaaab5813c70 in rgw::amqp::Manager::run() /root/ceph/src/rgw/rgw_amqp.cc:684:18
#4 0xaaaab5849e50 in void std::__invoke_impl<void, void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(std::__invoke_memfun_deref, void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:74:14
#5 0xaaaab5849b48 in std::__invoke_result<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>::type std::__invoke<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:96:14
#6 0xaaaab5849978 in void std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:259:13
#7 0xaaaab584979c in std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::operator()() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:266:11
#8 0xaaaab5849420 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> > >::_M_run() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:211:13
#9 0xffffb0fb31f8 (/lib/aarch64-linux-gnu/libstdc++.so.6+0xd31f8) (BuildId: a012b2bb77110e84b266cd7425b50e57427abb02)
#10 0xffffb0d7d5c4 in start_thread nptl/./nptl/pthread_create.c:442:8
#11 0xffffb0de5ed8 misc/../sysdeps/unix/sysv/linux/aarch64/clone.S:79
Indirect leak of 24 byte(s) in 1 object(s) allocated from:
#0 0xaaaab56a0008 in operator new(unsigned long) (/root/ceph/build/bin/unittest_rgw_amqp+0x1c0008) (BuildId: a20c317434e8d5f2ec33bbb71a69d81eb751c494)
#1 0xaaaab57f102c in amqp_queue_declare /root/ceph/src/test/rgw/amqp_mock.cc:283:18
#2 0xaaaab580ad14 in rgw::amqp::new_state(rgw::amqp::connection_t*, rgw::amqp::connection_id_t const&) /root/ceph/src/rgw/rgw_amqp.cc:480:27
#3 0xaaaab5813c70 in rgw::amqp::Manager::run() /root/ceph/src/rgw/rgw_amqp.cc:684:18
#4 0xaaaab5849e50 in void std::__invoke_impl<void, void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(std::__invoke_memfun_deref, void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:74:14
#5 0xaaaab5849b48 in std::__invoke_result<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>::type std::__invoke<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:96:14
#6 0xaaaab5849978 in void std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:259:13
#7 0xaaaab584979c in std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::operator()() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:266:11
#8 0xaaaab5849420 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> > >::_M_run() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:211:13
#9 0xffffb0fb31f8 (/lib/aarch64-linux-gnu/libstdc++.so.6+0xd31f8) (BuildId: a012b2bb77110e84b266cd7425b50e57427abb02)
#10 0xffffb0d7d5c4 in start_thread nptl/./nptl/pthread_create.c:442:8
#11 0xffffb0de5ed8 misc/../sysdeps/unix/sysv/linux/aarch64/clone.S:79
Indirect leak of 16 byte(s) in 1 object(s) allocated from:
#0 0xaaaab56a0008 in operator new(unsigned long) (/root/ceph/build/bin/unittest_rgw_amqp+0x1c0008) (BuildId: a20c317434e8d5f2ec33bbb71a69d81eb751c494)
#1 0xaaaab57f2280 in amqp_basic_consume /root/ceph/src/test/rgw/amqp_mock.cc:359:20
#2 0xaaaab580b124 in rgw::amqp::new_state(rgw::amqp::connection_t*, rgw::amqp::connection_id_t const&) /root/ceph/src/rgw/rgw_amqp.cc:493:29
#3 0xaaaab5813c70 in rgw::amqp::Manager::run() /root/ceph/src/rgw/rgw_amqp.cc:684:18
#4 0xaaaab5849e50 in void std::__invoke_impl<void, void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(std::__invoke_memfun_deref, void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:74:14
#5 0xaaaab5849b48 in std::__invoke_result<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>::type std::__invoke<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:96:14
#6 0xaaaab5849978 in void std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:259:13
#7 0xaaaab584979c in std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::operator()() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:266:11
#8 0xaaaab5849420 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> > >::_M_run() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:211:13
#9 0xffffb0fb31f8 (/lib/aarch64-linux-gnu/libstdc++.so.6+0xd31f8) (BuildId: a012b2bb77110e84b266cd7425b50e57427abb02)
#10 0xffffb0d7d5c4 in start_thread nptl/./nptl/pthread_create.c:442:8
#11 0xffffb0de5ed8 misc/../sysdeps/unix/sysv/linux/aarch64/clone.S:79
Indirect leak of 16 byte(s) in 1 object(s) allocated from:
#0 0xaaaab56a0008 in operator new(unsigned long) (/root/ceph/build/bin/unittest_rgw_amqp+0x1c0008) (BuildId: a20c317434e8d5f2ec33bbb71a69d81eb751c494)
#1 0xaaaab57f0214 in amqp_channel_open /root/ceph/src/test/rgw/amqp_mock.cc:213:23
#2 0xaaaab5809e78 in rgw::amqp::new_state(rgw::amqp::connection_t*, rgw::amqp::connection_id_t const&) /root/ceph/src/rgw/rgw_amqp.cc:448:21
#3 0xaaaab5813c70 in rgw::amqp::Manager::run() /root/ceph/src/rgw/rgw_amqp.cc:684:18
#4 0xaaaab5849e50 in void std::__invoke_impl<void, void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(std::__invoke_memfun_deref, void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:74:14
#5 0xaaaab5849b48 in std::__invoke_result<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>::type std::__invoke<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:96:14
#6 0xaaaab5849978 in void std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:259:13
#7 0xaaaab584979c in std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::operator()() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:266:11
#8 0xaaaab5849420 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> > >::_M_run() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:211:13
#9 0xffffb0fb31f8 (/lib/aarch64-linux-gnu/libstdc++.so.6+0xd31f8) (BuildId: a012b2bb77110e84b266cd7425b50e57427abb02)
#10 0xffffb0d7d5c4 in start_thread nptl/./nptl/pthread_create.c:442:8
#11 0xffffb0de5ed8 misc/../sysdeps/unix/sysv/linux/aarch64/clone.S:79
Indirect leak of 16 byte(s) in 1 object(s) allocated from:
#0 0xaaaab56a0008 in operator new(unsigned long) (/root/ceph/build/bin/unittest_rgw_amqp+0x1c0008) (BuildId: a20c317434e8d5f2ec33bbb71a69d81eb751c494)
#1 0xaaaab57f0294 in amqp_channel_open /root/ceph/src/test/rgw/amqp_mock.cc:217:21
#2 0xaaaab580a188 in rgw::amqp::new_state(rgw::amqp::connection_t*, rgw::amqp::connection_id_t const&) /root/ceph/src/rgw/rgw_amqp.cc:453:21
#3 0xaaaab5813c70 in rgw::amqp::Manager::run() /root/ceph/src/rgw/rgw_amqp.cc:684:18
#4 0xaaaab5849e50 in void std::__invoke_impl<void, void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(std::__invoke_memfun_deref, void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:74:14
#5 0xaaaab5849b48 in std::__invoke_result<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>::type std::__invoke<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:96:14
#6 0xaaaab5849978 in void std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:259:13
#7 0xaaaab584979c in std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::operator()() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:266:11
#8 0xaaaab5849420 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> > >::_M_run() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:211:13
#9 0xffffb0fb31f8 (/lib/aarch64-linux-gnu/libstdc++.so.6+0xd31f8) (BuildId: a012b2bb77110e84b266cd7425b50e57427abb02)
#10 0xffffb0d7d5c4 in start_thread nptl/./nptl/pthread_create.c:442:8
#11 0xffffb0de5ed8 misc/../sysdeps/unix/sysv/linux/aarch64/clone.S:79
Indirect leak of 1 byte(s) in 1 object(s) allocated from:
#0 0xaaaab56a0008 in operator new(unsigned long) (/root/ceph/build/bin/unittest_rgw_amqp+0x1c0008) (BuildId: a20c317434e8d5f2ec33bbb71a69d81eb751c494)
#1 0xaaaab57f1454 in amqp_confirm_select /root/ceph/src/test/rgw/amqp_mock.cc:291:20
#2 0xaaaab580a49c in rgw::amqp::new_state(rgw::amqp::connection_t*, rgw::amqp::connection_id_t const&) /root/ceph/src/rgw/rgw_amqp.cc:458:21
#3 0xaaaab5813c70 in rgw::amqp::Manager::run() /root/ceph/src/rgw/rgw_amqp.cc:684:18
#4 0xaaaab5849e50 in void std::__invoke_impl<void, void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(std::__invoke_memfun_deref, void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:74:14
#5 0xaaaab5849b48 in std::__invoke_result<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>::type std::__invoke<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:96:14
#6 0xaaaab5849978 in void std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:259:13
#7 0xaaaab584979c in std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::operator()() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:266:11
#8 0xaaaab5849420 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> > >::_M_run() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:211:13
#9 0xffffb0fb31f8 (/lib/aarch64-linux-gnu/libstdc++.so.6+0xd31f8) (BuildId: a012b2bb77110e84b266cd7425b50e57427abb02)
#10 0xffffb0d7d5c4 in start_thread nptl/./nptl/pthread_create.c:442:8
#11 0xffffb0de5ed8 misc/../sysdeps/unix/sysv/linux/aarch64/clone.S:79
Indirect leak of 1 byte(s) in 1 object(s) allocated from:
#0 0xaaaab56a0008 in operator new(unsigned long) (/root/ceph/build/bin/unittest_rgw_amqp+0x1c0008) (BuildId: a20c317434e8d5f2ec33bbb71a69d81eb751c494)
#1 0xaaaab57f0548 in amqp_exchange_declare /root/ceph/src/test/rgw/amqp_mock.cc:231:21
#2 0xaaaab580a93c in rgw::amqp::new_state(rgw::amqp::connection_t*, rgw::amqp::connection_id_t const&) /root/ceph/src/rgw/rgw_amqp.cc:466:21
#3 0xaaaab5813c70 in rgw::amqp::Manager::run() /root/ceph/src/rgw/rgw_amqp.cc:684:18
#4 0xaaaab5849e50 in void std::__invoke_impl<void, void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(std::__invoke_memfun_deref, void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:74:14
#5 0xaaaab5849b48 in std::__invoke_result<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>::type std::__invoke<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:96:14
#6 0xaaaab5849978 in void std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:259:13
#7 0xaaaab584979c in std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::operator()() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:266:11
#8 0xaaaab5849420 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> > >::_M_run() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:211:13
#9 0xffffb0fb31f8 (/lib/aarch64-linux-gnu/libstdc++.so.6+0xd31f8) (BuildId: a012b2bb77110e84b266cd7425b50e57427abb02)
#10 0xffffb0d7d5c4 in start_thread nptl/./nptl/pthread_create.c:442:8
#11 0xffffb0de5ed8 misc/../sysdeps/unix/sysv/linux/aarch64/clone.S:79
SUMMARY: AddressSanitizer: 131723 byte(s) leaked in 2059 allocation(s).
So to prevent multiple threads from operating the same element at the same time, add the lock to iterator.
Fixes: https://tracker.ceph.com/issues/66266
Contribution Guidelines
-
To sign and title your commits, please refer to Submitting Patches to Ceph.
-
If you are submitting a fix for a stable branch (e.g. "quincy"), please refer to Submitting Patches to Ceph - Backports for the proper workflow.
-
When filling out the below checklist, you may click boxes directly in the GitHub web UI. When entering or editing the entire PR message in the GitHub web UI editor, you may also select a checklist item by adding an
xbetween the brackets:[x]. Spaces and capitalization matter when checking off items this way.
Checklist
- Tracker (select at least one)
- [ ] References tracker ticket
- [ ] Very recent bug; references commit where it was introduced
- [ ] New feature (ticket optional)
- [ ] Doc update (no ticket needed)
- [ ] Code cleanup (no ticket needed)
- Component impact
- [ ] Affects Dashboard, opened tracker ticket
- [ ] Affects Orchestrator, opened tracker ticket
- [ ] No impact that needs to be tracked
- Documentation (select at least one)
- [ ] Updates relevant documentation
- [ ] No doc update is appropriate
- Tests (select at least one)
- [ ] Includes unit test(s)
- [ ] Includes integration test(s)
- [ ] Includes bug reproducer
- [ ] No tests
Show available Jenkins commands
jenkins retest this pleasejenkins test classic perfjenkins test crimson perfjenkins test signedjenkins test make checkjenkins test make check arm64jenkins test submodulesjenkins test dashboardjenkins test dashboard cephadmjenkins test apijenkins test docsjenkins render docsjenkins test ceph-volume alljenkins test ceph-volume toxjenkins test windowsjenkins test rook e2e
the address sanitizer gived errors regarding memory leaks in the internal object in the amqp library. it is probaly a cleanup bug where we do not call the proper cleanup functions. this seems unrelated to race conditions.
regarding the race condition, the usage of these iterator is done from the managers thread. the only multithreaded access is between the calls in the main loop of the manager thread and the connect() call that may emplace a new connection.
however, emplace does not invalidate any iterator. the only operation that may invalidate an iterator is erase() which is called from the same thread.
calling emplace() and erase() from 2 different threads may create a race condition, so, the erase call should be mutex protected.
the address sanitizer gived errors regarding memory leaks in the internal object in the amqp library. it is probaly a cleanup bug where we do not call the proper cleanup functions. this seems unrelated to race conditions.
Partly agreed, but what Asan reported led to why we need to fix race condition. Asan shows more like a phenomenon, we should figure out why memory leaks here.
regarding the race condition, the usage of these iterator is done from the managers thread. the only multithreaded access is between the calls in the main loop of the manager thread and the
connect()call that may emplace a new connection. however, emplace does not invalidate any iterator. the only operation that may invalidate an iterator iserase()which is called from the same thread. callingemplace()anderase()from 2 different threads may create a race condition, so, the erase call should be mutex protected.
Yeh, as well from code side, new_state should not be called from 2 different threads.
@Svelar please note that was major refactoring done for the kafka client code (rgw_kafka.cc). And, we should probably perform similar refactoring on the amqp code. would be great if we can discuss that in one of the "RGW refactoring" meeting (if you are interesyed please add to the agenda: https://pad.ceph.com/p/rgw-weekly in one of the upcoming meetings).
Hi @yuvalif, glad to hear that amqp part would also be refactored. Actually I am not familiar with these modules' code, the reason why I opened this PR is to address problems which #56537 found. It is great if this bug could also be covered in RGW team's further refactor work. Before that, this PR might be a temp resolution or any suggestion?
jenkins test make check arm64
Hi @yuvalif, glad to hear that amqp part would also be refactored. Actually I am not familiar with these modules' code, the reason why I opened this PR is to address problems which #56537 found. It is great if this bug could also be covered in RGW team's further refactor work. Before that, this PR might be a temp resolution or any suggestion?
ok. so please add a lock only for the erase case. also, you would need to check if the mem leak warning go away after the fix. otherwise, there is another issue there.
regarding the race condition, the usage of these iterator is done from the managers thread. the only multithreaded access is between the calls in the main loop of the manager thread and the connect() call that may emplace a new connection. however, emplace does not invalidate any iterator. the only operation that may invalidate an iterator is erase() which is called from the same thread. calling emplace() and erase() from 2 different threads may create a race condition, so, the erase call should be mutex protected.
ok. so please add a lock only for the erase case. also, you would need to check if the mem leak warning go away after the fix. otherwise, there is another issue there.
Hi @yuvalif , IMO the modifications I made is fine.
Without lock, the main loop of the manager thread may get the connection which was just emplaced by another thread's connect call (mark this connection as connA). Because connA's state is not ok, the main loop of the manager thread will call new_state on the connA https://github.com/ceph/ceph/blob/f1761d6239665b769e2de9948cdea14bfd7facd2/src/rgw/rgw_amqp.cc#L684, as well another thread that called connect will call new_state on the connA, which cause race condition. https://github.com/ceph/ceph/blob/f1761d6239665b769e2de9948cdea14bfd7facd2/src/rgw/rgw_amqp.cc#L888.
regarding the race condition, the usage of these iterator is done from the managers thread. the only multithreaded access is between the calls in the main loop of the manager thread and the connect() call that may emplace a new connection. however, emplace does not invalidate any iterator. the only operation that may invalidate an iterator is erase() which is called from the same thread. calling emplace() and erase() from 2 different threads may create a race condition, so, the erase call should be mutex protected. ok. so please add a lock only for the erase case. also, you would need to check if the mem leak warning go away after the fix. otherwise, there is another issue there.
Hi @yuvalif , IMO the modifications I made is fine.
Without lock, the main loop of the manager thread may get the connection which was just emplaced by another thread's
connectcall (mark this connection as connA). Because connA's state is not ok, the main loop of the manager thread will callnew_stateon the connA
you are right, but I think that right the solution is to fully create the connection before we emplace
https://github.com/ceph/ceph/blob/f1761d6239665b769e2de9948cdea14bfd7facd2/src/rgw/rgw_amqp.cc#L684
, as well another thread that called
connectwill callnew_stateon the connA, which cause race condition. https://github.com/ceph/ceph/blob/f1761d6239665b769e2de9948cdea14bfd7facd2/src/rgw/rgw_amqp.cc#L888.
multiple emplacies are protected by a mutex in the connect() function
@Svelar this is an important fix. are you still working on this PR?
@Svelar ping?
@Svelar this is an important fix. are you still working on this PR?
Will bring a new version ASAP.
@Svelar did you run the new code under ASAN? did it find any issues?
@Svelar did you run the new code under ASAN? did it find any issues?
I ran hundreds time in my local environment with ASAN on, seems like working fine.
teuthology passing: https://pulpito.ceph.com/yuvalif-2024-07-18_13:21:56-rgw:notifications-wip-yuval-66266-distro-default-smithi/