Why does SPSCVarQueueOPT guarantee thread safety?

Open JackYoung opened this issue 1 year ago • 2 comments

I see that the SPSCVarQueueOPT used in the library does not use atomics when pushing and popping. Is thread safety still guaranteed?

JackYoung, Sep 13 '22 08:09

I was wondering the same thing. A helgrind report shows that there may be a race condition:

valgrind --tool=helgrind --read-var-info=yes ./fmtlog_threadsafe

I ran a test with fmtlog::startPollingThread and several threads generating logs, and found this:

==49069== Possible data race during read of size 4 at 0x5F86498 by thread #2
==49069== Locks held: none
==49069==    at 0xE05904: fmtlogT<0>::SPSCVarQueueOPT::pop() (fmtlog.h:220)
==49069==    by 0xE1C00D: fmtlogDetailT<0>::poll(bool) (fmtlog-inl.h:455)
==49069==    by 0xE1CFF4: fmtlogDetailT<0>::startPollingThread(long)::{lambda()#1}::operator()() const (fmtlog-inl.h:330)
==49069==    by 0xE55E72: void std::__invoke_impl<void, fmtlogDetailT<0>::startPollingThread(long)::{lambda()#1}>(std::__invoke_other, fmtlogDetailT<0>::startPollingThread(long)::{lambda()#1}&&) (invoke.h:60)
==49069==    by 0xE55D7B: std::__invoke_result<fmtlogDetailT<0>::startPollingThread(long)::{lambda()#1}>::type std::__invoke<fmtlogDetailT<0>::startPollingThread(long)::{lambda()#1}>(std::__invoke_result&&, (fmtlogDetailT<0>::startPollingThread(long)::{lambda()#1}&&)...) (invoke.h:95)
==49069==    by 0xE55C5B: void std::thread::_Invoker<std::tuple<fmtlogDetailT<0>::startPollingThread(long)::{lambda()#1}> >::_M_invoke<0ul>(std::_Index_tuple<0ul>) (thread:244)
==49069==    by 0xE55B25: std::thread::_Invoker<std::tuple<fmtlogDetailT<0>::startPollingThread(long)::{lambda()#1}> >::operator()() (thread:251)
==49069==    by 0xE55455: std::thread::_State_impl<std::thread::_Invoker<std::tuple<fmtlogDetailT<0>::startPollingThread(long)::{lambda()#1}> > >::_M_run() (thread:195)
==49069==    by 0x4CC2DE3: ??? (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.28)
==49069==    by 0x4842B1A: ??? (in /usr/lib/x86_64-linux-gnu/valgrind/vgpreload_helgrind-amd64-linux.so)
==49069==    by 0x4BCB608: start_thread (pthread_create.c:477)
==49069==    by 0x5059162: clone (clone.S:95)
==49069== 
==49069== This conflicts with a previous write of size 4 by thread #1
==49069== Locks held: none
==49069==    at 0xE05870: fmtlogT<0>::SPSCVarQueueOPT::alloc(unsigned int) (fmtlog.h:205)
==49069==    by 0xE05300: fmtlogT<0>::allocMsg(unsigned int) (fmtlog-inl.h:516)
==49069==    by 0xF3A0F5: void fmtlogT<0>::log<char const*, char const*>(unsigned int&, long, char const*, fmtlogT<0>::LogLevel, fmt::v8::basic_format_string<char, fmt::v8::type_identity<fmtlogdetail::UnrefPtr<std::remove_cv<std::remove_reference<char const*>::type>::type>::type>::type, fmt::v8::type_identity<fmtlogdetail::UnrefPtr<std::remove_cv<std::remove_reference<char const*>::type>::type>::type>::type>, char const*&&, char const*&&) (fmtlog.h:637)
==49069==    by 0xF37C59: hlp::configureParserMappings(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&) (hlp.cpp:76)
==49069==    by 0xE75059: cmd::test(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, int, int, bool, std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > > const&, char, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&) (cmdTest.cpp:55)
==49069==    by 0xDDFBD8: main (main.cpp:227)
==49069==  Address 0x5f86498 is 152 bytes inside a block of size 1,048,960 alloc'd
==49069==    at 0x483F3E0: memalign (in /usr/lib/x86_64-linux-gnu/valgrind/vgpreload_helgrind-amd64-linux.so)
==49069==    by 0x4C96BF5: operator new(unsigned long, std::align_val_t) (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.28)
==49069==    by 0xE1B997: fmtlogDetailT<0>::preallocate() (fmtlog-inl.h:281)
==49069==    by 0xE04B47: fmtlogT<0>::preallocate() (fmtlog-inl.h:527)
==49069==    by 0xE052E5: fmtlogT<0>::allocMsg(unsigned int) (fmtlog-inl.h:515)
==49069==    by 0xF3A0F5: void fmtlogT<0>::log<char const*, char const*>(unsigned int&, long, char const*, fmtlogT<0>::LogLevel, fmt::v8::basic_format_string<char, fmt::v8::type_identity<fmtlogdetail::UnrefPtr<std::remove_cv<std::remove_reference<char const*>::type>::type>::type>::type, fmt::v8::type_identity<fmtlogdetail::UnrefPtr<std::remove_cv<std::remove_reference<char const*>::type>::type>::type>::type>, char const*&&, char const*&&) (fmtlog.h:637)
==49069==    by 0xF37C59: hlp::configureParserMappings(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&) (hlp.cpp:76)
==49069==    by 0xE75059: cmd::test(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, int, int, bool, std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > > const&, char, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&) (cmdTest.cpp:55)
==49069==    by 0xDDFBD8: main (main.cpp:227)
==49069==  Block was alloc'd by thread #1
==49069== 

juliancnn, Sep 19 '22 17:09

The same race can also be observed with ThreadSanitizer:

WARNING: ThreadSanitizer: data race (pid=92208)
  Read of size 4 at 0x7f6f05f24940 by thread T1:
    #0 fmtlogT<0>::SPSCVarQueueOPT::front() /root/repos/wazuh/src/engine/build/_deps/fmtlog-src/fmtlog.h:210 (main+0xd085e3)
    #1 fmtlogDetailT<0>::poll(bool) /root/repos/wazuh/src/engine/build/_deps/fmtlog-src/fmtlog-inl.h:456 (main+0xd29bea)
    #2 fmtlogDetailT<0>::startPollingThread(long)::{lambda()#1}::operator()() const /root/repos/wazuh/src/engine/build/_deps/fmtlog-src/fmtlog-inl.h:330 (main+0xd2ad94)
    #3 void std::__invoke_impl<void, fmtlogDetailT<0>::startPollingThread(long)::{lambda()#1}>(std::__invoke_other, fmtlogDetailT<0>::startPollingThread(long)::{lambda()#1}&&) /usr/include/c++/9/bits/invoke.h:60 (main+0xd81976)
    #4 std::__invoke_result<fmtlogDetailT<0>::startPollingThread(long)::{lambda()#1}>::type std::__invoke<fmtlogDetailT<0>::startPollingThread(long)::{lambda()#1}>(std::__invoke_result&&, (fmtlogDetailT<0>::startPollingThread(long)::{lambda()#1}&&)...) /usr/include/c++/9/bits/invoke.h:95 (main+0xd817cf)
    #5 void std::thread::_Invoker<std::tuple<fmtlogDetailT<0>::startPollingThread(long)::{lambda()#1}> >::_M_invoke<0ul>(std::_Index_tuple<0ul>) /usr/include/c++/9/thread:244 (main+0xd815bc)
    #6 std::thread::_Invoker<std::tuple<fmtlogDetailT<0>::startPollingThread(long)::{lambda()#1}> >::operator()() /usr/include/c++/9/thread:251 (main+0xd81368)
    #7 std::thread::_State_impl<std::thread::_Invoker<std::tuple<fmtlogDetailT<0>::startPollingThread(long)::{lambda()#1}> > >::_M_run() /usr/include/c++/9/thread:195 (main+0xd80960)
    #8 <null> <null> (libstdc++.so.6+0xd6de3)

  Previous write of size 4 at 0x7f6f05f24940 by main thread:
    #0 fmtlogT<0>::SPSCVarQueueOPT::alloc(unsigned int) /root/repos/wazuh/src/engine/build/_deps/fmtlog-src/fmtlog.h:205 (main+0xd08571)
    #1 fmtlogT<0>::allocMsg(unsigned int) /root/repos/wazuh/src/engine/build/_deps/fmtlog-src/fmtlog-inl.h:516 (main+0xd07a81)
    #2 void fmtlogT<0>::log<char const*, char const*>(unsigned int&, long, char const*, fmtlogT<0>::LogLevel, fmt::v8::basic_format_string<char, fmt::v8::type_identity<fmtlogdetail::UnrefPtr<std::remove_cv<std::remove_reference<char const*>::type>::type>::type>::type, fmt::v8::type_identity<fmtlogdetail::UnrefPtr<std::remove_cv<std::remove_reference<char const*>::type>::type>::type>::type>, char const*&&, char const*&&) <null> (main+0xeb7422)
    #3 hlp::configureParserMappings(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&) /root/repos/wazuh/src/engine/source/hlp/src/hlp.cpp:76 (main+0xeb4116)
    #4 cmd::run(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, int, int, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, int) /root/repos/wazuh/src/engine/source/cmds/src/cmdRun.cpp:93 (main+0xda0f68)
    #5 main /root/repos/wazuh/src/engine/source/main.cpp:205 (main+0xcdaf95)

  Location is heap block of size 1048960 at 0x7f6f05f24000 allocated by main thread:
    #0 operator new(unsigned long, std::align_val_t) ../../../../src/libsanitizer/tsan/tsan_new_delete.cpp:88 (libtsan.so.0+0x8c2d5)
    #1 fmtlogDetailT<0>::preallocate() /root/repos/wazuh/src/engine/build/_deps/fmtlog-src/fmtlog-inl.h:281 (main+0xd293ba)
    #2 fmtlogT<0>::preallocate() /root/repos/wazuh/src/engine/build/_deps/fmtlog-src/fmtlog-inl.h:527 (main+0xd06e2f)
    #3 fmtlogT<0>::allocMsg(unsigned int) /root/repos/wazuh/src/engine/build/_deps/fmtlog-src/fmtlog-inl.h:515 (main+0xd07a4b)
    #4 void fmtlogT<0>::log<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&>(unsigned int&, long, char const*, fmtlogT<0>::LogLevel, fmt::v8::basic_format_string<char, fmt::v8::type_identity<fmtlogdetail::UnrefPtr<std::remove_cv<std::remove_reference<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&>::type>::type>::type>::type>, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&) /root/repos/wazuh/src/engine/build/_deps/fmtlog-src/fmtlog.h:637 (main+0xda3c19)
    #5 engineserver::endpoints::TCPEndpoint::TCPEndpoint(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, moodycamel::BlockingConcurrentQueue<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, moodycamel::ConcurrentQueueDefaultTraits>&) /root/repos/wazuh/src/engine/source/server/endpoints/tcpEndpoint.cpp:136 (main+0x18f66b6)
    #6 std::_MakeUniq<engineserver::endpoints::TCPEndpoint>::__single_object std::make_unique<engineserver::endpoints::TCPEndpoint, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, moodycamel::BlockingConcurrentQueue<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, moodycamel::ConcurrentQueueDefaultTraits>&>(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, moodycamel::BlockingConcurrentQueue<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, moodycamel::ConcurrentQueueDefaultTraits>&) /usr/include/c++/9/bits/unique_ptr.h:857 (main+0x18ecbd6)
    #7 engineserver::EngineServer::createEndpoint(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, moodycamel::BlockingConcurrentQueue<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, moodycamel::ConcurrentQueueDefaultTraits>&) const /root/repos/wazuh/src/engine/source/server/engineServer.cpp:61 (main+0x18ec066)
    #8 engineserver::EngineServer::EngineServer(std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > > const&, unsigned long) /root/repos/wazuh/src/engine/source/server/engineServer.cpp:41 (main+0x18ebd54)
    #9 cmd::run(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, int, int, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, int) /root/repos/wazuh/src/engine/source/cmds/src/cmdRun.cpp:78 (main+0xda0dc7)
    #10 main /root/repos/wazuh/src/engine/source/main.cpp:205 (main+0xcdaf95)

  Thread T1 (tid=92210, running) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:962 (libtsan.so.0+0x5ea79)
    #1 std::thread::_M_start_thread(std::unique_ptr<std::thread::_State, std::default_delete<std::thread::_State> >, void (*)()) <null> (libstdc++.so.6+0xd70a8)
    #2 fmtlogDetailT<0>::startPollingThread(long) /root/repos/wazuh/src/engine/build/_deps/fmtlog-src/fmtlog-inl.h:327 (main+0xd2af0b)
    #3 fmtlogT<0>::startPollingThread(long) /root/repos/wazuh/src/engine/build/_deps/fmtlog-src/fmtlog-inl.h:605 (main+0xd07612)
    #4 loggingInit /root/repos/wazuh/src/engine/source/base/logging/logging.hpp:43 (main+0xd9ee51)
    #5 cmd::run(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, int, int, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, int) /root/repos/wazuh/src/engine/source/cmds/src/cmdRun.cpp:74 (main+0xda0d06)
    #6 main /root/repos/wazuh/src/engine/source/main.cpp:205 (main+0xcdaf95)

SUMMARY: ThreadSanitizer: data race /root/repos/wazuh/src/engine/build/_deps/fmtlog-src/fmtlog.h:210 in fmtlogT<0>::SPSCVarQueueOPT::front()

juliancnn, Sep 19 '22 20:09

fmtlog uses an SPSC (single-producer, single-consumer) queue for logging, so communication is only ever between two parties, which makes thread safety easier to achieve.

MengRao, Sep 29 '22 03:09
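
For readers comparing the reports above with the SPSC argument: under the C++ memory model, an unsynchronized non-atomic write and read of the same location from two threads is a data race, and tools such as helgrind and ThreadSanitizer flag it regardless of how many producers and consumers there are. Below is a minimal sketch of how an SPSC hand-off can be expressed with acquire/release atomics so that such tools can see the synchronization. It is not fmtlog's SPSCVarQueueOPT: the names SpscRing, try_push, try_pop and run_spsc_demo are hypothetical, and it uses fixed-size slots rather than variable-length messages.

```cpp
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <thread>

// Hypothetical illustration only; this is NOT fmtlog's SPSCVarQueueOPT.
// Exactly one thread may call try_push and exactly one may call try_pop.
template <typename T, std::size_t Capacity>
class SpscRing {
    static_assert((Capacity & (Capacity - 1)) == 0,
                  "Capacity must be a power of two");

public:
    // Producer side: returns false when the ring is full.
    bool try_push(const T& value) {
        const std::size_t tail = tail_.load(std::memory_order_relaxed);
        const std::size_t head = head_.load(std::memory_order_acquire);
        if (tail - head == Capacity) return false;
        slots_[tail & (Capacity - 1)] = value;
        // Release: the slot write above becomes visible before the new tail.
        tail_.store(tail + 1, std::memory_order_release);
        return true;
    }

    // Consumer side: returns false when the ring is empty.
    bool try_pop(T& out) {
        const std::size_t head = head_.load(std::memory_order_relaxed);
        // Acquire: pairs with the producer's release store on tail_.
        const std::size_t tail = tail_.load(std::memory_order_acquire);
        if (head == tail) return false;
        out = slots_[head & (Capacity - 1)];
        // Release: tells the producer the slot may be reused.
        head_.store(head + 1, std::memory_order_release);
        return true;
    }

private:
    T slots_[Capacity];
    alignas(64) std::atomic<std::size_t> head_{0};  // advanced by the consumer
    alignas(64) std::atomic<std::size_t> tail_{0};  // advanced by the producer
};

// Pushes 0..count-1 from the calling thread and sums them on a second thread.
std::uint64_t run_spsc_demo(std::uint64_t count) {
    SpscRing<std::uint64_t, 1024> ring;
    std::uint64_t sum = 0;
    std::thread consumer([&] {
        std::uint64_t received = 0;
        std::uint64_t value = 0;
        while (received < count) {
            if (ring.try_pop(value)) {
                sum += value;
                ++received;
            } else {
                std::this_thread::yield();
            }
        }
    });
    for (std::uint64_t i = 0; i < count; ++i) {
        while (!ring.try_push(i)) std::this_thread::yield();
    }
    consumer.join();  // join orders the consumer's writes to sum before the read
    return sum;
}
```

Calling run_spsc_demo(1000) from main should return 499500, the sum of 0 through 999. Having only two parties is what lets each index have a single writer, so plain load/store atomics suffice and no compare-and-swap loop is needed; the atomics themselves are still what makes the hand-off well-defined in standard C++.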