
rgw/amqp: lock the iterator to prevent operating on the same element

Open Svelar opened this issue 1 year ago • 11 comments

When the sanitizer is enabled, unittest_rgw_amqp reports:

=================================================================
==1429129==ERROR: LeakSanitizer: detected memory leaks

Direct leak of 416 byte(s) in 1 object(s) allocated from:
    #0 0xaaaab56a0008 in operator new(unsigned long) (/root/ceph/build/bin/unittest_rgw_amqp+0x1c0008) (BuildId: a20c317434e8d5f2ec33bbb71a69d81eb751c494)
    #1 0xaaaab57eecfc in amqp_new_connection /root/ceph/src/test/rgw/amqp_mock.cc:110:12
    #2 0xaaaab58095d8 in rgw::amqp::new_state(rgw::amqp::connection_t*, rgw::amqp::connection_id_t const&) /root/ceph/src/rgw/rgw_amqp.cc:373:16
    #3 0xaaaab5813c70 in rgw::amqp::Manager::run() /root/ceph/src/rgw/rgw_amqp.cc:684:18
    #4 0xaaaab5849e50 in void std::__invoke_impl<void, void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(std::__invoke_memfun_deref, void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:74:14
    #5 0xaaaab5849b48 in std::__invoke_result<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>::type std::__invoke<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:96:14
    #6 0xaaaab5849978 in void std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:259:13
    #7 0xaaaab584979c in std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::operator()() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:266:11
    #8 0xaaaab5849420 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> > >::_M_run() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:211:13
    #9 0xffffb0fb31f8  (/lib/aarch64-linux-gnu/libstdc++.so.6+0xd31f8) (BuildId: a012b2bb77110e84b266cd7425b50e57427abb02)
    #10 0xffffb0d7d5c4 in start_thread nptl/./nptl/pthread_create.c:442:8
    #11 0xffffb0de5ed8  misc/../sysdeps/unix/sysv/linux/aarch64/clone.S:79

Direct leak of 64 byte(s) in 1 object(s) allocated from:
    #0 0xaaaab5669bb8 in posix_memalign (/root/ceph/build/bin/unittest_rgw_amqp+0x189bb8) (BuildId: a20c317434e8d5f2ec33bbb71a69d81eb751c494)
    #1 0xaaaab57f5294 in boost::alignment::aligned_alloc(unsigned long, unsigned long) /root/ceph/build/boost/include/boost/align/detail/aligned_alloc_posix.hpp:26:9
    #2 0xaaaab57f4d88 in boost::alignment::aligned_allocator<boost::lockfree::queue<amqp_basic_ack_t_>::node, 64ul>::allocate(unsigned long, void const*) /root/ceph/build/boost/include/boost/align/aligned_allocator.hpp:70:19
    #3 0xaaaab57f4204 in boost::lockfree::detail::freelist_stack<boost::lockfree::queue<amqp_basic_ack_t_>::node, boost::alignment::aligned_allocator<boost::lockfree::queue<amqp_basic_ack_t_>::node, 64ul> >::freelist_stack<boost::alignment::aligned_allocator<boost::lockfree::queue<amqp_basic_ack_t_>::node, 64ul> >(boost::alignment::aligned_allocator<boost::lockfree::queue<amqp_basic_ack_t_>::node, 64ul> const&, unsigned long) /root/ceph/build/boost/include/boost/lockfree/detail/freelist.hpp:62:31
    #4 0xaaaab57f3728 in boost::lockfree::queue<amqp_basic_ack_t_>::queue(unsigned long) /root/ceph/build/boost/include/boost/lockfree/queue.hpp:234:9
    #5 0xaaaab57f2ea8 in amqp_connection_state_t_::amqp_connection_state_t_() /root/ceph/src/test/rgw/amqp_mock.cc:90:5
    #6 0xaaaab57eed04 in amqp_new_connection /root/ceph/src/test/rgw/amqp_mock.cc:110:16
    #7 0xaaaab58095d8 in rgw::amqp::new_state(rgw::amqp::connection_t*, rgw::amqp::connection_id_t const&) /root/ceph/src/rgw/rgw_amqp.cc:373:16
    #8 0xaaaab5813c70 in rgw::amqp::Manager::run() /root/ceph/src/rgw/rgw_amqp.cc:684:18
    #9 0xaaaab5849e50 in void std::__invoke_impl<void, void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(std::__invoke_memfun_deref, void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:74:14
    #10 0xaaaab5849b48 in std::__invoke_result<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>::type std::__invoke<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:96:14
    #11 0xaaaab5849978 in void std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:259:13
    #12 0xaaaab584979c in std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::operator()() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:266:11
    #13 0xaaaab5849420 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> > >::_M_run() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:211:13
    #14 0xffffb0fb31f8  (/lib/aarch64-linux-gnu/libstdc++.so.6+0xd31f8) (BuildId: a012b2bb77110e84b266cd7425b50e57427abb02)
    #15 0xffffb0d7d5c4 in start_thread nptl/./nptl/pthread_create.c:442:8
    #16 0xffffb0de5ed8  misc/../sysdeps/unix/sysv/linux/aarch64/clone.S:79

Direct leak of 64 byte(s) in 1 object(s) allocated from:
    #0 0xaaaab5669bb8 in posix_memalign (/root/ceph/build/bin/unittest_rgw_amqp+0x189bb8) (BuildId: a20c317434e8d5f2ec33bbb71a69d81eb751c494)
    #1 0xaaaab57f5294 in boost::alignment::aligned_alloc(unsigned long, unsigned long) /root/ceph/build/boost/include/boost/align/detail/aligned_alloc_posix.hpp:26:9
    #2 0xaaaab57f90bc in boost::alignment::aligned_allocator<boost::lockfree::queue<amqp_basic_nack_t_>::node, 64ul>::allocate(unsigned long, void const*) /root/ceph/build/boost/include/boost/align/aligned_allocator.hpp:70:19
    #3 0xaaaab57f8538 in boost::lockfree::detail::freelist_stack<boost::lockfree::queue<amqp_basic_nack_t_>::node, boost::alignment::aligned_allocator<boost::lockfree::queue<amqp_basic_nack_t_>::node, 64ul> >::freelist_stack<boost::alignment::aligned_allocator<boost::lockfree::queue<amqp_basic_nack_t_>::node, 64ul> >(boost::alignment::aligned_allocator<boost::lockfree::queue<amqp_basic_nack_t_>::node, 64ul> const&, unsigned long) /root/ceph/build/boost/include/boost/lockfree/detail/freelist.hpp:62:31
    #4 0xaaaab57f3a6c in boost::lockfree::queue<amqp_basic_nack_t_>::queue(unsigned long) /root/ceph/build/boost/include/boost/lockfree/queue.hpp:234:9
    #5 0xaaaab57f2eb8 in amqp_connection_state_t_::amqp_connection_state_t_() /root/ceph/src/test/rgw/amqp_mock.cc:91:5
    #6 0xaaaab57eed04 in amqp_new_connection /root/ceph/src/test/rgw/amqp_mock.cc:110:16
    #7 0xaaaab58095d8 in rgw::amqp::new_state(rgw::amqp::connection_t*, rgw::amqp::connection_id_t const&) /root/ceph/src/rgw/rgw_amqp.cc:373:16
    #8 0xaaaab5813c70 in rgw::amqp::Manager::run() /root/ceph/src/rgw/rgw_amqp.cc:684:18
    #9 0xaaaab5849e50 in void std::__invoke_impl<void, void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(std::__invoke_memfun_deref, void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:74:14
    #10 0xaaaab5849b48 in std::__invoke_result<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>::type std::__invoke<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:96:14
    #11 0xaaaab5849978 in void std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:259:13
    #12 0xaaaab584979c in std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::operator()() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:266:11
    #13 0xaaaab5849420 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> > >::_M_run() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:211:13
    #14 0xffffb0fb31f8  (/lib/aarch64-linux-gnu/libstdc++.so.6+0xd31f8) (BuildId: a012b2bb77110e84b266cd7425b50e57427abb02)
    #15 0xffffb0d7d5c4 in start_thread nptl/./nptl/pthread_create.c:442:8
    #16 0xffffb0de5ed8  misc/../sysdeps/unix/sysv/linux/aarch64/clone.S:79

Direct leak of 9 byte(s) in 1 object(s) allocated from:
    #0 0xaaaab56690a0 in malloc (/root/ceph/build/bin/unittest_rgw_amqp+0x1890a0) (BuildId: a20c317434e8d5f2ec33bbb71a69d81eb751c494)
    #1 0xaaaab57f2754 in amqp_bytes_malloc_dup /root/ceph/src/test/rgw/amqp_mock.cc:384:18
    #2 0xaaaab580b4b4 in rgw::amqp::new_state(rgw::amqp::connection_t*, rgw::amqp::connection_id_t const&) /root/ceph/src/rgw/rgw_amqp.cc:509:28
    #3 0xaaaab5813c70 in rgw::amqp::Manager::run() /root/ceph/src/rgw/rgw_amqp.cc:684:18
    #4 0xaaaab5849e50 in void std::__invoke_impl<void, void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(std::__invoke_memfun_deref, void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:74:14
    #5 0xaaaab5849b48 in std::__invoke_result<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>::type std::__invoke<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:96:14
    #6 0xaaaab5849978 in void std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:259:13
    #7 0xaaaab584979c in std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::operator()() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:266:11
    #8 0xaaaab5849420 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> > >::_M_run() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:211:13
    #9 0xffffb0fb31f8  (/lib/aarch64-linux-gnu/libstdc++.so.6+0xd31f8) (BuildId: a012b2bb77110e84b266cd7425b50e57427abb02)
    #10 0xffffb0d7d5c4 in start_thread nptl/./nptl/pthread_create.c:442:8
    #11 0xffffb0de5ed8  misc/../sysdeps/unix/sysv/linux/aarch64/clone.S:79

Indirect leak of 65536 byte(s) in 1024 object(s) allocated from:
    #0 0xaaaab5669bb8 in posix_memalign (/root/ceph/build/bin/unittest_rgw_amqp+0x189bb8) (BuildId: a20c317434e8d5f2ec33bbb71a69d81eb751c494)
    #1 0xaaaab57f5294 in boost::alignment::aligned_alloc(unsigned long, unsigned long) /root/ceph/build/boost/include/boost/align/detail/aligned_alloc_posix.hpp:26:9
    #2 0xaaaab57f4d88 in boost::alignment::aligned_allocator<boost::lockfree::queue<amqp_basic_ack_t_>::node, 64ul>::allocate(unsigned long, void const*) /root/ceph/build/boost/include/boost/align/aligned_allocator.hpp:70:19
    #3 0xaaaab57f4204 in boost::lockfree::detail::freelist_stack<boost::lockfree::queue<amqp_basic_ack_t_>::node, boost::alignment::aligned_allocator<boost::lockfree::queue<amqp_basic_ack_t_>::node, 64ul> >::freelist_stack<boost::alignment::aligned_allocator<boost::lockfree::queue<amqp_basic_ack_t_>::node, 64ul> >(boost::alignment::aligned_allocator<boost::lockfree::queue<amqp_basic_ack_t_>::node, 64ul> const&, unsigned long) /root/ceph/build/boost/include/boost/lockfree/detail/freelist.hpp:62:31
    #4 0xaaaab57f3728 in boost::lockfree::queue<amqp_basic_ack_t_>::queue(unsigned long) /root/ceph/build/boost/include/boost/lockfree/queue.hpp:234:9
    #5 0xaaaab57f2ea8 in amqp_connection_state_t_::amqp_connection_state_t_() /root/ceph/src/test/rgw/amqp_mock.cc:90:5
    #6 0xaaaab57eed04 in amqp_new_connection /root/ceph/src/test/rgw/amqp_mock.cc:110:16
    #7 0xaaaab58095d8 in rgw::amqp::new_state(rgw::amqp::connection_t*, rgw::amqp::connection_id_t const&) /root/ceph/src/rgw/rgw_amqp.cc:373:16
    #8 0xaaaab5813c70 in rgw::amqp::Manager::run() /root/ceph/src/rgw/rgw_amqp.cc:684:18
    #9 0xaaaab5849e50 in void std::__invoke_impl<void, void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(std::__invoke_memfun_deref, void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:74:14
    #10 0xaaaab5849b48 in std::__invoke_result<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>::type std::__invoke<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:96:14
    #11 0xaaaab5849978 in void std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:259:13
    #12 0xaaaab584979c in std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::operator()() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:266:11
    #13 0xaaaab5849420 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> > >::_M_run() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:211:13
    #14 0xffffb0fb31f8  (/lib/aarch64-linux-gnu/libstdc++.so.6+0xd31f8) (BuildId: a012b2bb77110e84b266cd7425b50e57427abb02)
    #15 0xffffb0d7d5c4 in start_thread nptl/./nptl/pthread_create.c:442:8
    #16 0xffffb0de5ed8  misc/../sysdeps/unix/sysv/linux/aarch64/clone.S:79

Indirect leak of 65536 byte(s) in 1024 object(s) allocated from:
    #0 0xaaaab5669bb8 in posix_memalign (/root/ceph/build/bin/unittest_rgw_amqp+0x189bb8) (BuildId: a20c317434e8d5f2ec33bbb71a69d81eb751c494)
    #1 0xaaaab57f5294 in boost::alignment::aligned_alloc(unsigned long, unsigned long) /root/ceph/build/boost/include/boost/align/detail/aligned_alloc_posix.hpp:26:9
    #2 0xaaaab57f90bc in boost::alignment::aligned_allocator<boost::lockfree::queue<amqp_basic_nack_t_>::node, 64ul>::allocate(unsigned long, void const*) /root/ceph/build/boost/include/boost/align/aligned_allocator.hpp:70:19
    #3 0xaaaab57f8538 in boost::lockfree::detail::freelist_stack<boost::lockfree::queue<amqp_basic_nack_t_>::node, boost::alignment::aligned_allocator<boost::lockfree::queue<amqp_basic_nack_t_>::node, 64ul> >::freelist_stack<boost::alignment::aligned_allocator<boost::lockfree::queue<amqp_basic_nack_t_>::node, 64ul> >(boost::alignment::aligned_allocator<boost::lockfree::queue<amqp_basic_nack_t_>::node, 64ul> const&, unsigned long) /root/ceph/build/boost/include/boost/lockfree/detail/freelist.hpp:62:31
    #4 0xaaaab57f3a6c in boost::lockfree::queue<amqp_basic_nack_t_>::queue(unsigned long) /root/ceph/build/boost/include/boost/lockfree/queue.hpp:234:9
    #5 0xaaaab57f2eb8 in amqp_connection_state_t_::amqp_connection_state_t_() /root/ceph/src/test/rgw/amqp_mock.cc:91:5
    #6 0xaaaab57eed04 in amqp_new_connection /root/ceph/src/test/rgw/amqp_mock.cc:110:16
    #7 0xaaaab58095d8 in rgw::amqp::new_state(rgw::amqp::connection_t*, rgw::amqp::connection_id_t const&) /root/ceph/src/rgw/rgw_amqp.cc:373:16
    #8 0xaaaab5813c70 in rgw::amqp::Manager::run() /root/ceph/src/rgw/rgw_amqp.cc:684:18
    #9 0xaaaab5849e50 in void std::__invoke_impl<void, void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(std::__invoke_memfun_deref, void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:74:14
    #10 0xaaaab5849b48 in std::__invoke_result<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>::type std::__invoke<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:96:14
    #11 0xaaaab5849978 in void std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:259:13
    #12 0xaaaab584979c in std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::operator()() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:266:11
    #13 0xaaaab5849420 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> > >::_M_run() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:211:13
    #14 0xffffb0fb31f8  (/lib/aarch64-linux-gnu/libstdc++.so.6+0xd31f8) (BuildId: a012b2bb77110e84b266cd7425b50e57427abb02)
    #15 0xffffb0d7d5c4 in start_thread nptl/./nptl/pthread_create.c:442:8
    #16 0xffffb0de5ed8  misc/../sysdeps/unix/sysv/linux/aarch64/clone.S:79

Indirect leak of 24 byte(s) in 1 object(s) allocated from:
    #0 0xaaaab56a0008 in operator new(unsigned long) (/root/ceph/build/bin/unittest_rgw_amqp+0x1c0008) (BuildId: a20c317434e8d5f2ec33bbb71a69d81eb751c494)
    #1 0xaaaab57eefb0 in amqp_tcp_socket_new /root/ceph/src/test/rgw/amqp_mock.cc:127:19
    #2 0xaaaab5809740 in rgw::amqp::new_state(rgw::amqp::connection_t*, rgw::amqp::connection_id_t const&) /root/ceph/src/rgw/rgw_amqp.cc:401:14
    #3 0xaaaab5813c70 in rgw::amqp::Manager::run() /root/ceph/src/rgw/rgw_amqp.cc:684:18
    #4 0xaaaab5849e50 in void std::__invoke_impl<void, void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(std::__invoke_memfun_deref, void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:74:14
    #5 0xaaaab5849b48 in std::__invoke_result<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>::type std::__invoke<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:96:14
    #6 0xaaaab5849978 in void std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:259:13
    #7 0xaaaab584979c in std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::operator()() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:266:11
    #8 0xaaaab5849420 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> > >::_M_run() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:211:13
    #9 0xffffb0fb31f8  (/lib/aarch64-linux-gnu/libstdc++.so.6+0xd31f8) (BuildId: a012b2bb77110e84b266cd7425b50e57427abb02)
    #10 0xffffb0d7d5c4 in start_thread nptl/./nptl/pthread_create.c:442:8
    #11 0xffffb0de5ed8  misc/../sysdeps/unix/sysv/linux/aarch64/clone.S:79

Indirect leak of 24 byte(s) in 1 object(s) allocated from:
    #0 0xaaaab56a0008 in operator new(unsigned long) (/root/ceph/build/bin/unittest_rgw_amqp+0x1c0008) (BuildId: a20c317434e8d5f2ec33bbb71a69d81eb751c494)
    #1 0xaaaab57f102c in amqp_queue_declare /root/ceph/src/test/rgw/amqp_mock.cc:283:18
    #2 0xaaaab580ad14 in rgw::amqp::new_state(rgw::amqp::connection_t*, rgw::amqp::connection_id_t const&) /root/ceph/src/rgw/rgw_amqp.cc:480:27
    #3 0xaaaab5813c70 in rgw::amqp::Manager::run() /root/ceph/src/rgw/rgw_amqp.cc:684:18
    #4 0xaaaab5849e50 in void std::__invoke_impl<void, void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(std::__invoke_memfun_deref, void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:74:14
    #5 0xaaaab5849b48 in std::__invoke_result<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>::type std::__invoke<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:96:14
    #6 0xaaaab5849978 in void std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:259:13
    #7 0xaaaab584979c in std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::operator()() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:266:11
    #8 0xaaaab5849420 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> > >::_M_run() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:211:13
    #9 0xffffb0fb31f8  (/lib/aarch64-linux-gnu/libstdc++.so.6+0xd31f8) (BuildId: a012b2bb77110e84b266cd7425b50e57427abb02)
    #10 0xffffb0d7d5c4 in start_thread nptl/./nptl/pthread_create.c:442:8
    #11 0xffffb0de5ed8  misc/../sysdeps/unix/sysv/linux/aarch64/clone.S:79

Indirect leak of 16 byte(s) in 1 object(s) allocated from:
    #0 0xaaaab56a0008 in operator new(unsigned long) (/root/ceph/build/bin/unittest_rgw_amqp+0x1c0008) (BuildId: a20c317434e8d5f2ec33bbb71a69d81eb751c494)
    #1 0xaaaab57f2280 in amqp_basic_consume /root/ceph/src/test/rgw/amqp_mock.cc:359:20
    #2 0xaaaab580b124 in rgw::amqp::new_state(rgw::amqp::connection_t*, rgw::amqp::connection_id_t const&) /root/ceph/src/rgw/rgw_amqp.cc:493:29
    #3 0xaaaab5813c70 in rgw::amqp::Manager::run() /root/ceph/src/rgw/rgw_amqp.cc:684:18
    #4 0xaaaab5849e50 in void std::__invoke_impl<void, void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(std::__invoke_memfun_deref, void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:74:14
    #5 0xaaaab5849b48 in std::__invoke_result<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>::type std::__invoke<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:96:14
    #6 0xaaaab5849978 in void std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:259:13
    #7 0xaaaab584979c in std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::operator()() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:266:11
    #8 0xaaaab5849420 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> > >::_M_run() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:211:13
    #9 0xffffb0fb31f8  (/lib/aarch64-linux-gnu/libstdc++.so.6+0xd31f8) (BuildId: a012b2bb77110e84b266cd7425b50e57427abb02)
    #10 0xffffb0d7d5c4 in start_thread nptl/./nptl/pthread_create.c:442:8
    #11 0xffffb0de5ed8  misc/../sysdeps/unix/sysv/linux/aarch64/clone.S:79

Indirect leak of 16 byte(s) in 1 object(s) allocated from:
    #0 0xaaaab56a0008 in operator new(unsigned long) (/root/ceph/build/bin/unittest_rgw_amqp+0x1c0008) (BuildId: a20c317434e8d5f2ec33bbb71a69d81eb751c494)
    #1 0xaaaab57f0214 in amqp_channel_open /root/ceph/src/test/rgw/amqp_mock.cc:213:23
    #2 0xaaaab5809e78 in rgw::amqp::new_state(rgw::amqp::connection_t*, rgw::amqp::connection_id_t const&) /root/ceph/src/rgw/rgw_amqp.cc:448:21
    #3 0xaaaab5813c70 in rgw::amqp::Manager::run() /root/ceph/src/rgw/rgw_amqp.cc:684:18
    #4 0xaaaab5849e50 in void std::__invoke_impl<void, void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(std::__invoke_memfun_deref, void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:74:14
    #5 0xaaaab5849b48 in std::__invoke_result<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>::type std::__invoke<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:96:14
    #6 0xaaaab5849978 in void std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:259:13
    #7 0xaaaab584979c in std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::operator()() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:266:11
    #8 0xaaaab5849420 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> > >::_M_run() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:211:13
    #9 0xffffb0fb31f8  (/lib/aarch64-linux-gnu/libstdc++.so.6+0xd31f8) (BuildId: a012b2bb77110e84b266cd7425b50e57427abb02)
    #10 0xffffb0d7d5c4 in start_thread nptl/./nptl/pthread_create.c:442:8
    #11 0xffffb0de5ed8  misc/../sysdeps/unix/sysv/linux/aarch64/clone.S:79

Indirect leak of 16 byte(s) in 1 object(s) allocated from:
    #0 0xaaaab56a0008 in operator new(unsigned long) (/root/ceph/build/bin/unittest_rgw_amqp+0x1c0008) (BuildId: a20c317434e8d5f2ec33bbb71a69d81eb751c494)
    #1 0xaaaab57f0294 in amqp_channel_open /root/ceph/src/test/rgw/amqp_mock.cc:217:21
    #2 0xaaaab580a188 in rgw::amqp::new_state(rgw::amqp::connection_t*, rgw::amqp::connection_id_t const&) /root/ceph/src/rgw/rgw_amqp.cc:453:21
    #3 0xaaaab5813c70 in rgw::amqp::Manager::run() /root/ceph/src/rgw/rgw_amqp.cc:684:18
    #4 0xaaaab5849e50 in void std::__invoke_impl<void, void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(std::__invoke_memfun_deref, void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:74:14
    #5 0xaaaab5849b48 in std::__invoke_result<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>::type std::__invoke<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:96:14
    #6 0xaaaab5849978 in void std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:259:13
    #7 0xaaaab584979c in std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::operator()() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:266:11
    #8 0xaaaab5849420 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> > >::_M_run() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:211:13
    #9 0xffffb0fb31f8  (/lib/aarch64-linux-gnu/libstdc++.so.6+0xd31f8) (BuildId: a012b2bb77110e84b266cd7425b50e57427abb02)
    #10 0xffffb0d7d5c4 in start_thread nptl/./nptl/pthread_create.c:442:8
    #11 0xffffb0de5ed8  misc/../sysdeps/unix/sysv/linux/aarch64/clone.S:79

Indirect leak of 1 byte(s) in 1 object(s) allocated from:
    #0 0xaaaab56a0008 in operator new(unsigned long) (/root/ceph/build/bin/unittest_rgw_amqp+0x1c0008) (BuildId: a20c317434e8d5f2ec33bbb71a69d81eb751c494)
    #1 0xaaaab57f1454 in amqp_confirm_select /root/ceph/src/test/rgw/amqp_mock.cc:291:20
    #2 0xaaaab580a49c in rgw::amqp::new_state(rgw::amqp::connection_t*, rgw::amqp::connection_id_t const&) /root/ceph/src/rgw/rgw_amqp.cc:458:21
    #3 0xaaaab5813c70 in rgw::amqp::Manager::run() /root/ceph/src/rgw/rgw_amqp.cc:684:18
    #4 0xaaaab5849e50 in void std::__invoke_impl<void, void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(std::__invoke_memfun_deref, void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:74:14
    #5 0xaaaab5849b48 in std::__invoke_result<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>::type std::__invoke<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:96:14
    #6 0xaaaab5849978 in void std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:259:13
    #7 0xaaaab584979c in std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::operator()() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:266:11
    #8 0xaaaab5849420 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> > >::_M_run() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:211:13
    #9 0xffffb0fb31f8  (/lib/aarch64-linux-gnu/libstdc++.so.6+0xd31f8) (BuildId: a012b2bb77110e84b266cd7425b50e57427abb02)
    #10 0xffffb0d7d5c4 in start_thread nptl/./nptl/pthread_create.c:442:8
    #11 0xffffb0de5ed8  misc/../sysdeps/unix/sysv/linux/aarch64/clone.S:79

Indirect leak of 1 byte(s) in 1 object(s) allocated from:
    #0 0xaaaab56a0008 in operator new(unsigned long) (/root/ceph/build/bin/unittest_rgw_amqp+0x1c0008) (BuildId: a20c317434e8d5f2ec33bbb71a69d81eb751c494)
    #1 0xaaaab57f0548 in amqp_exchange_declare /root/ceph/src/test/rgw/amqp_mock.cc:231:21
    #2 0xaaaab580a93c in rgw::amqp::new_state(rgw::amqp::connection_t*, rgw::amqp::connection_id_t const&) /root/ceph/src/rgw/rgw_amqp.cc:466:21
    #3 0xaaaab5813c70 in rgw::amqp::Manager::run() /root/ceph/src/rgw/rgw_amqp.cc:684:18
    #4 0xaaaab5849e50 in void std::__invoke_impl<void, void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(std::__invoke_memfun_deref, void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:74:14
    #5 0xaaaab5849b48 in std::__invoke_result<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>::type std::__invoke<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*>(void (rgw::amqp::Manager::*&&)() noexcept, rgw::amqp::Manager*&&) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:96:14
    #6 0xaaaab5849978 in void std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:259:13
    #7 0xaaaab584979c in std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> >::operator()() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:266:11
    #8 0xaaaab5849420 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (rgw::amqp::Manager::*)() noexcept, rgw::amqp::Manager*> > >::_M_run() /usr/bin/../lib/gcc/aarch64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:211:13
    #9 0xffffb0fb31f8  (/lib/aarch64-linux-gnu/libstdc++.so.6+0xd31f8) (BuildId: a012b2bb77110e84b266cd7425b50e57427abb02)
    #10 0xffffb0d7d5c4 in start_thread nptl/./nptl/pthread_create.c:442:8
    #11 0xffffb0de5ed8  misc/../sysdeps/unix/sysv/linux/aarch64/clone.S:79

SUMMARY: AddressSanitizer: 131723 byte(s) leaked in 2059 allocation(s).

To prevent multiple threads from operating on the same element at the same time, take a lock while iterating over the connections.

Fixes: https://tracker.ceph.com/issues/66266

Svelar avatar May 28 '24 03:05 Svelar

the address sanitizer gave errors regarding memory leaks in an internal object of the amqp library. it is probably a cleanup bug where we do not call the proper cleanup functions. this seems unrelated to race conditions.

regarding the race condition, these iterators are used only from the manager's thread. the only multithreaded access is between the calls in the main loop of the manager thread and the connect() call that may emplace a new connection. however, emplace does not invalidate any iterator. the only operation that may invalidate an iterator is erase(), which is called from the same thread. calling emplace() and erase() from 2 different threads may create a race condition, so the erase call should be mutex protected.
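
A minimal sketch of the locking idea described above, not the actual rgw_amqp.cc code. `Conn`, `Registry`, `mark_for_deletion` and `sweep` are hypothetical names chosen for illustration. The sketch is deliberately more conservative than "lock only erase()": it holds the mutex for the whole sweep, so the iteration itself can never overlap with an emplace() from another thread.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for a connection; not the real rgw::amqp::connection_t.
struct Conn {
  bool marked_for_deletion = false;
};

// Hypothetical registry; all names here are illustrative only.
class Registry {
  mutable std::mutex lock;  // guards every access to `connections`
  std::unordered_map<std::string, std::unique_ptr<Conn>> connections;

public:
  // May be called from any thread: the insert happens under the lock.
  bool connect(const std::string& id) {
    std::lock_guard<std::mutex> guard(lock);
    return connections.emplace(id, std::make_unique<Conn>()).second;
  }

  // Flags a connection so that the next sweep removes it.
  bool mark_for_deletion(const std::string& id) {
    std::lock_guard<std::mutex> guard(lock);
    auto it = connections.find(id);
    if (it == connections.end()) return false;
    it->second->marked_for_deletion = true;
    return true;
  }

  // Meant to be called from the manager thread only. erase() runs under the
  // same lock as emplace(), so the two can never interleave.
  std::size_t sweep() {
    std::lock_guard<std::mutex> guard(lock);
    std::size_t erased = 0;
    for (auto it = connections.begin(); it != connections.end();) {
      assert(it->second != nullptr);
      if (it->second->marked_for_deletion) {
        it = connections.erase(it);  // erase() returns the next valid iterator
        ++erased;
      } else {
        ++it;
      }
    }
    return erased;
  }

  std::size_t size() const {
    std::lock_guard<std::mutex> guard(lock);
    return connections.size();
  }
};
```

Holding the lock for the whole sweep trades some concurrency for simplicity. Whether that cost matters depends on how long each loop iteration takes in the real manager thread.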

yuvalif avatar May 28 '24 15:05 yuvalif

> the address sanitizer gave errors regarding memory leaks in an internal object of the amqp library. it is probably a cleanup bug where we do not call the proper cleanup functions. this seems unrelated to race conditions.

Partly agreed, but the ASan report is what led us to the race condition. The report is more of a symptom; we should still figure out why memory leaks here.

> regarding the race condition, these iterators are used only from the manager's thread. the only multithreaded access is between the calls in the main loop of the manager thread and the connect() call that may emplace a new connection. however, emplace does not invalidate any iterator. the only operation that may invalidate an iterator is erase(), which is called from the same thread. calling emplace() and erase() from 2 different threads may create a race condition, so the erase call should be mutex protected.

Yes. Also, from the code side, new_state should not be called from 2 different threads.

Svelar avatar May 29 '24 03:05 Svelar

@Svelar please note that a major refactoring was done for the kafka client code (rgw_kafka.cc), and we should probably perform a similar refactoring on the amqp code. it would be great if we could discuss that in one of the "RGW refactoring" meetings (if you are interested, please add it to the agenda of one of the upcoming meetings: https://pad.ceph.com/p/rgw-weekly).

yuvalif avatar May 29 '24 11:05 yuvalif

Hi @yuvalif, glad to hear that the amqp part will also be refactored. Actually, I am not familiar with the code of these modules; I opened this PR to address the problems that #56537 found. It would be great if this bug could also be covered by the RGW team's future refactoring work. Until then, could this PR serve as a temporary fix, or do you have another suggestion?

Svelar avatar May 30 '24 07:05 Svelar

jenkins test make check arm64

Svelar avatar May 30 '24 08:05 Svelar

> Hi @yuvalif, glad to hear that the amqp part will also be refactored. Actually, I am not familiar with the code of these modules; I opened this PR to address the problems that #56537 found. It would be great if this bug could also be covered by the RGW team's future refactoring work. Until then, could this PR serve as a temporary fix, or do you have another suggestion?

ok. so please add a lock only for the erase case. also, you would need to check whether the mem leak warnings go away after the fix. otherwise, there is another issue there.

yuvalif avatar May 30 '24 15:05 yuvalif

> regarding the race condition, these iterators are used only from the manager's thread. the only multithreaded access is between the calls in the main loop of the manager thread and the connect() call that may emplace a new connection. however, emplace does not invalidate any iterator. the only operation that may invalidate an iterator is erase(), which is called from the same thread. calling emplace() and erase() from 2 different threads may create a race condition, so the erase call should be mutex protected.

> ok. so please add a lock only for the erase case. also, you would need to check whether the mem leak warnings go away after the fix. otherwise, there is another issue there.

Hi @yuvalif, IMO the modifications I made are fine.

Without the lock, the main loop of the manager thread may pick up a connection that was just emplaced by another thread's connect() call (call this connection connA). Because connA's state is not yet ok, the main loop of the manager thread will call new_state on connA (https://github.com/ceph/ceph/blob/f1761d6239665b769e2de9948cdea14bfd7facd2/src/rgw/rgw_amqp.cc#L684), while the thread that called connect() will also call new_state on connA (https://github.com/ceph/ceph/blob/f1761d6239665b769e2de9948cdea14bfd7facd2/src/rgw/rgw_amqp.cc#L888), which causes a race condition.

Svelar avatar Jun 03 '24 07:06 Svelar

> > regarding the race condition, these iterators are used only from the manager's thread. the only multithreaded access is between the calls in the main loop of the manager thread and the connect() call that may emplace a new connection. however, emplace does not invalidate any iterator. the only operation that may invalidate an iterator is erase(), which is called from the same thread. calling emplace() and erase() from 2 different threads may create a race condition, so the erase call should be mutex protected. ok. so please add a lock only for the erase case. also, you would need to check whether the mem leak warnings go away after the fix. otherwise, there is another issue there.

> Hi @yuvalif, IMO the modifications I made are fine.

> Without the lock, the main loop of the manager thread may pick up a connection that was just emplaced by another thread's connect() call (call this connection connA). Because connA's state is not yet ok, the main loop of the manager thread will call new_state on connA https://github.com/ceph/ceph/blob/f1761d6239665b769e2de9948cdea14bfd7facd2/src/rgw/rgw_amqp.cc#L684

you are right, but I think that the right solution is to fully create the connection before we emplace it.

> while the thread that called connect() will also call new_state on connA, which causes a race condition. https://github.com/ceph/ceph/blob/f1761d6239665b769e2de9948cdea14bfd7facd2/src/rgw/rgw_amqp.cc#L888

multiple emplace calls are protected by a mutex in the connect() function.
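
The suggestion in this comment, fully creating the connection before it is emplaced, could look roughly like the sketch below. It is an illustration under stated assumptions, not the code that was merged: `ReadyConn`, `ReadyRegistry` and `make_ready_connection` are hypothetical names, and `make_ready_connection` only stands in for whatever setup new_state performs in the real code.

```cpp
#include <cstddef>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical stand-in for a connection; not the real rgw::amqp::connection_t.
struct ReadyConn {
  std::string broker;
  bool ready = false;
};

// Hypothetical placeholder for the setup step. It runs on the caller's
// thread, before any other thread can see the object.
std::unique_ptr<ReadyConn> make_ready_connection(const std::string& broker) {
  auto conn = std::make_unique<ReadyConn>();
  conn->broker = broker;
  conn->ready = true;
  return conn;
}

class ReadyRegistry {
  mutable std::mutex lock;  // guards every access to `connections`
  std::unordered_map<std::string, std::unique_ptr<ReadyConn>> connections;

public:
  // Build first, publish second: the map only ever holds connections that
  // are already initialized, so a loop that walks the map never has to
  // finish the setup of an entry that another thread is still working on.
  bool connect(const std::string& id, const std::string& broker) {
    auto conn = make_ready_connection(broker);  // setup runs outside the lock
    std::lock_guard<std::mutex> guard(lock);
    // If the id already exists, the freshly built connection is discarded.
    return connections.emplace(id, std::move(conn)).second;
  }

  // True when every published connection has completed its setup.
  bool all_ready() const {
    std::lock_guard<std::mutex> guard(lock);
    for (const auto& entry : connections) {
      if (!entry.second->ready) return false;
    }
    return true;
  }

  std::size_t size() const {
    std::lock_guard<std::mutex> guard(lock);
    return connections.size();
  }
};
```

One trade-off of this ordering is that two threads connecting with the same id may both pay for the setup, and the loser's connection is thrown away. Checking for the id under the lock before building would avoid the wasted work at the cost of a second lock acquisition.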

yuvalif avatar Jun 03 '24 12:06 yuvalif

@Svelar this is an important fix. are you still working on this PR?

yuvalif avatar Jul 01 '24 14:07 yuvalif

@Svelar ping?

tchaikov avatar Jul 04 '24 06:07 tchaikov

> @Svelar this is an important fix. are you still working on this PR?

Will bring a new version ASAP.

Svelar avatar Jul 04 '24 07:07 Svelar

@Svelar did you run the new code under ASAN? did it find any issues?

yuvalif avatar Jul 09 '24 09:07 yuvalif

> @Svelar did you run the new code under ASAN? did it find any issues?

I ran it hundreds of times in my local environment with ASAN on, and it seems to be working fine.

Svelar avatar Jul 10 '24 03:07 Svelar

teuthology passing: https://pulpito.ceph.com/yuvalif-2024-07-18_13:21:56-rgw:notifications-wip-yuval-66266-distro-default-smithi/

yuvalif avatar Jul 18 '24 14:07 yuvalif