IXWebSocket icon indicating copy to clipboard operation
IXWebSocket copied to clipboard

Callbacks for Message after Close if sending fails

Open awelzel opened this issue 7 months ago • 3 comments

This was reported on zeek/zeek/issues/4475. Without going too much into details, a standalone reproducer contains:

  • A ws client connecting to a server
  • The server spawning a thread sending messages to the client as fast a possible
  • The client also sending messages as fast as possible, the server needing some time to process these (100us sleep).
  • The client hanging up without properly sending a CLOSE frame (os._exit(0) in Python).

The observation is that Message callbacks are invoked after a Close callback. That's not great if one models state handling with the assumption that there won't be messages after a Close callback (e.g. due to state cleanup during Close processing).

My understanding of the scenario:

  • The send() on the client's socket detects an error condition, closes the socket and switches into CLOSED state, thereby invoking the callback.
  • There is still some data buffered. Either in _rxbuf, or on the socket. The receiving thread spawned process these messages and invokes the Message callbacks.
  • This is confirmed by TSAN sometimes reporting a data-race between sender and receiver threads on the socket.

I'm thinking a fix could be to move responsibility of invoking the CLOSED callback to the receiving thread when in the server setup. Any attempts to fix this so far have ended up a bit hairy, however.

The alternative could be to discard anything on the socket or pending in _rxbuf if a send() fails, but that seems suboptimal due to discarding messages that have actually made it to the server.


Server code:

run-threaded.cpp.txt

Client code:

send-recv.py.txt

Unexpected output:

Client-side:
$ python3 send-recv.py --url ws://localhost:1234 --send-messages 0 --hard-exit-after 10000 

Server-side output
$ ./build/stress/run-threaded                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            
Open 0                                                                                                                                                                                                                                                                                                                        
Close 0 1011 Internal error                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  
Thread[139881410963136] error after 10248 sends (0)                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          
Thread[139881410963136] exit (0)                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             
Got message, but client was closed client=0                                                                                                                    
Got message, but client was closed client=0                                                                                                                                                                                                                                                                                   
Got message, but client was closed client=0                                                                                                                                                                                                                                                                                   
Got message, but client was closed client=0                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  
Got message, but client was closed client=0                                                                                                                                                                                                                                                                                   
Got message, but client was closed client=0                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  
Got message, but client was closed client=0                                                                                                                                                                                                                                                                                   
Got message, but client was closed client=0 

Output of TSAN data-race:

$ ./build/stress/run-threaded 
Open 0
==================
WARNING: ThreadSanitizer: data race (pid=2107780)
  Write of size 8 at 0x72b0000000c0 by thread T5 (mutexes: write M0, write M1, write M2, write M3):
    #0 close ..<...>/tsan_interceptors_posix.cpp:1818 (libtsan.so.2+0x54741) (BuildId: 38097064631f7912bd33117a9c83d08b42e15571)
    #1 ix::Socket::closeSocket(int) <...>/IXSocket.cpp:298 (run-threaded+0x45bce) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #2 ix::Socket::close() <...>/IXSocket.cpp:239 (run-threaded+0x45c40) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #3 ix::WebSocketTransport::closeSocket() <...>/IXWebSocketTransport.cpp:1166 (run-threaded+0x1bafd) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #4 ix::WebSocketTransport::sendOnSocket() <...>/IXWebSocketTransport.cpp:1084 (run-threaded+0x1c9d0) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #5 bool ix::WebSocketTransport::sendFragment<ix::IXWebSocketSendData::IXWebSocketSendData_const_iterator<char> >(ix::WebSocketTransport::wsheader_type::opcode_type, bool, ix::IXWebSocketSendData::IXWebSocketSendData_const_iterator<char>, ix::IXWebSocketSendData::IXWebSocketSendData_const_iterator<char>, bool) <...>/IXWebSocketTransport.cpp:1033 (run-threaded+0x2d81f) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #6 ix::WebSocketTransport::sendData(ix::WebSocketTransport::wsheader_type::opcode_type, ix::IXWebSocketSendData const&, bool, std::function<bool (int, int)> const&) <...>/IXWebSocketTransport.cpp:877 (run-threaded+0x23882) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #7 ix::WebSocketTransport::sendText(ix::IXWebSocketSendData const&, std::function<bool (int, int)> const&) <...>/IXWebSocketTransport.cpp:1062 (run-threaded+0x23ce7) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #8 ix::WebSocket::sendMessage(ix::IXWebSocketSendData const&, ix::SendMessageKind, std::function<bool (int, int)> const&) <...>/IXWebSocket.cpp:557 (run-threaded+0x31faa) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #9 ix::WebSocket::sendText(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::function<bool (int, int)> const&) <...>/IXWebSocket.cpp:523 (run-threaded+0x3258b) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #10 sender(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>) <...>/run-threaded.cpp:45 (run-threaded+0x12fd7) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #11 void std::__invoke_impl<void, void (*)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState> >(std::__invoke_other, void (*&&)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>&&, std::shared_ptr<ix::ConnectionState>&&) <...>/invoke.h:61 (run-threaded+0x15cb9) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #12 std::__invoke_result<void (*)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState> >::type std::__invoke<void (*)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState> >(void (*&&)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>&&, std::shared_ptr<ix::ConnectionState>&&) <...>/invoke.h:96 (run-threaded+0x15cb9)
    #13 void std::thread::_Invoker<std::tuple<void (*)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState> > >::_M_invoke<0ul, 1ul, 2ul>(std::_Index_tuple<0ul, 1ul, 2ul>) <...>/std_thread.h:292 (run-threaded+0x15cb9)
    #14 std::thread::_Invoker<std::tuple<void (*)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState> > >::operator()() <...>/std_thread.h:299 (run-threaded+0x15cb9)
    #15 std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (*)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState> > > >::_M_run() <...>/std_thread.h:244 (run-threaded+0x15cb9)
    #16 <null> <null> (libstdc++.so.6+0xecdb3) (BuildId: ca77dae775ec87540acd7218fa990c40d1c94ab1)

  Previous read of size 8 at 0x72b0000000c0 by thread T4:
    #0 recv ..<...>/sanitizer_common_interceptors.inc:6718 (libtsan.so.2+0x7ac9d) (BuildId: 38097064631f7912bd33117a9c83d08b42e15571)
    #1 recv <...>/socket2.h:38 (run-threaded+0x45232) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #2 ix::Socket::recv(void*, unsigned long) <...>/IXSocket.cpp:265 (run-threaded+0x45232)
    #3 ix::WebSocketTransport::receiveFromSocket() <...>/IXWebSocketTransport.cpp:1116 (run-threaded+0x1bc12) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #4 ix::WebSocketTransport::poll() <...>/IXWebSocketTransport.cpp:376 (run-threaded+0x25167) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #5 ix::WebSocket::run() <...>/IXWebSocket.cpp:398 (run-threaded+0x4197a) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #6 ix::WebSocketServer::handleUpgrade(std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState>, std::shared_ptr<ix::HttpRequest>) <...>/IXWebSocketServer.cpp:153 (run-threaded+0x2200f) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #7 ix::WebSocketServer::handleConnection(std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState>) <...>/IXWebSocketServer.cpp:85 (run-threaded+0x22a18) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #8 void std::__invoke_impl<void, void (ix::SocketServer::*)(std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState>), ix::SocketServer*, std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState> >(std::__invoke_memfun_deref, void (ix::SocketServer::*&&)(std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState>), ix::SocketServer*&&, std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >&&, std::shared_ptr<ix::ConnectionState>&&) <...>/invoke.h:74 (run-threaded+0x5b0b4) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #9 std::__invoke_result<void (ix::SocketServer::*)(std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState>), ix::SocketServer*, std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState> >::type std::__invoke<void (ix::SocketServer::*)(std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState>), ix::SocketServer*, std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState> >(void (ix::SocketServer::*&&)(std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState>), ix::SocketServer*&&, std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >&&, std::shared_ptr<ix::ConnectionState>&&) <...>/invoke.h:96 (run-threaded+0x5b0b4)
    #10 void std::thread::_Invoker<std::tuple<void (ix::SocketServer::*)(std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState>), ix::SocketServer*, std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState> > >::_M_invoke<0ul, 1ul, 2ul, 3ul>(std::_Index_tuple<0ul, 1ul, 2ul, 3ul>) <...>/std_thread.h:292 (run-threaded+0x5b0b4)
    #11 std::thread::_Invoker<std::tuple<void (ix::SocketServer::*)(std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState>), ix::SocketServer*, std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState> > >::operator()() <...>/std_thread.h:299 (run-threaded+0x5b0b4)
    #12 std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (ix::SocketServer::*)(std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState>), ix::SocketServer*, std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState> > > >::_M_run() <...>/std_thread.h:244 (run-threaded+0x5b0b4)
    #13 <null> <null> (libstdc++.so.6+0xecdb3) (BuildId: ca77dae775ec87540acd7218fa990c40d1c94ab1)

  Location is file descriptor 6 created by thread T2 at:
    #0 accept ..<...>/sanitizer_common_interceptors.inc:2993 (libtsan.so.2+0x6937a) (BuildId: 38097064631f7912bd33117a9c83d08b42e15571)
    #1 ix::SocketServer::run() <...>/IXSocketServer.cpp:315 (run-threaded+0x54d8f) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #2 void std::__invoke_impl<void, void (ix::SocketServer::*)(), ix::SocketServer*>(std::__invoke_memfun_deref, void (ix::SocketServer::*&&)(), ix::SocketServer*&&) <...>/invoke.h:74 (run-threaded+0x5a801) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #3 std::__invoke_result<void (ix::SocketServer::*)(), ix::SocketServer*>::type std::__invoke<void (ix::SocketServer::*)(), ix::SocketServer*>(void (ix::SocketServer::*&&)(), ix::SocketServer*&&) <...>/invoke.h:96 (run-threaded+0x5a801)
    #4 void std::thread::_Invoker<std::tuple<void (ix::SocketServer::*)(), ix::SocketServer*> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) <...>/std_thread.h:292 (run-threaded+0x5a801)
    #5 std::thread::_Invoker<std::tuple<void (ix::SocketServer::*)(), ix::SocketServer*> >::operator()() <...>/std_thread.h:299 (run-threaded+0x5a801)
    #6 std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (ix::SocketServer::*)(), ix::SocketServer*> > >::_M_run() <...>/std_thread.h:244 (run-threaded+0x5a801)
    #7 <null> <null> (libstdc++.so.6+0xecdb3) (BuildId: ca77dae775ec87540acd7218fa990c40d1c94ab1)

  Mutex M0 (0x7268000004b8) created at:
    #0 pthread_mutex_lock ..<...>/tsan_interceptors_posix.cpp:1341 (libtsan.so.2+0x59a13) (BuildId: 38097064631f7912bd33117a9c83d08b42e15571)
    #1 __gthread_mutex_lock <...>/gthr-default.h:749 (run-threaded+0x31f7b) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #2 std::mutex::lock() <...>/std_mutex.h:113 (run-threaded+0x31f7b)
    #3 std::lock_guard<std::mutex>::lock_guard(std::mutex&) <...>/std_mutex.h:249 (run-threaded+0x31f7b)
    #4 ix::WebSocket::sendMessage(ix::IXWebSocketSendData const&, ix::SendMessageKind, std::function<bool (int, int)> const&) <...>/IXWebSocket.cpp:550 (run-threaded+0x31f7b)
    #5 ix::WebSocket::sendText(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::function<bool (int, int)> const&) <...>/IXWebSocket.cpp:523 (run-threaded+0x3258b) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #6 sender(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>) <...>/run-threaded.cpp:45 (run-threaded+0x12fd7) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #7 void std::__invoke_impl<void, void (*)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState> >(std::__invoke_other, void (*&&)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>&&, std::shared_ptr<ix::ConnectionState>&&) <...>/invoke.h:61 (run-threaded+0x15cb9) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #8 std::__invoke_result<void (*)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState> >::type std::__invoke<void (*)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState> >(void (*&&)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>&&, std::shared_ptr<ix::ConnectionState>&&) <...>/invoke.h:96 (run-threaded+0x15cb9)
    #9 void std::thread::_Invoker<std::tuple<void (*)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState> > >::_M_invoke<0ul, 1ul, 2ul>(std::_Index_tuple<0ul, 1ul, 2ul>) <...>/std_thread.h:292 (run-threaded+0x15cb9)
    #10 std::thread::_Invoker<std::tuple<void (*)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState> > >::operator()() <...>/std_thread.h:299 (run-threaded+0x15cb9)
    #11 std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (*)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState> > > >::_M_run() <...>/std_thread.h:244 (run-threaded+0x15cb9)
    #12 <null> <null> (libstdc++.so.6+0xecdb3) (BuildId: ca77dae775ec87540acd7218fa990c40d1c94ab1)

  Mutex M1 (0x726800000088) created at:
    #0 pthread_mutex_lock ..<...>/tsan_interceptors_posix.cpp:1341 (libtsan.so.2+0x59a13) (BuildId: 38097064631f7912bd33117a9c83d08b42e15571)
    #1 __gthread_mutex_lock <...>/gthr-default.h:749 (run-threaded+0x23618) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #2 std::mutex::lock() <...>/std_mutex.h:113 (run-threaded+0x23618)
    #3 std::lock_guard<std::mutex>::lock_guard(std::mutex&) <...>/std_mutex.h:249 (run-threaded+0x23618)
    #4 ix::WebSocketTransport::sendData(ix::WebSocketTransport::wsheader_type::opcode_type, ix::IXWebSocketSendData const&, bool, std::function<bool (int, int)> const&) <...>/IXWebSocketTransport.cpp:868 (run-threaded+0x23618)
    #5 ix::WebSocketTransport::sendText(ix::IXWebSocketSendData const&, std::function<bool (int, int)> const&) <...>/IXWebSocketTransport.cpp:1062 (run-threaded+0x23ce7) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #6 ix::WebSocket::sendMessage(ix::IXWebSocketSendData const&, ix::SendMessageKind, std::function<bool (int, int)> const&) <...>/IXWebSocket.cpp:557 (run-threaded+0x31faa) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #7 ix::WebSocket::sendText(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::function<bool (int, int)> const&) <...>/IXWebSocket.cpp:523 (run-threaded+0x3258b) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #8 sender(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>) <...>/run-threaded.cpp:45 (run-threaded+0x12fd7) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #9 void std::__invoke_impl<void, void (*)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState> >(std::__invoke_other, void (*&&)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>&&, std::shared_ptr<ix::ConnectionState>&&) <...>/invoke.h:61 (run-threaded+0x15cb9) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #10 std::__invoke_result<void (*)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState> >::type std::__invoke<void (*)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState> >(void (*&&)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>&&, std::shared_ptr<ix::ConnectionState>&&) <...>/invoke.h:96 (run-threaded+0x15cb9)
    #11 void std::thread::_Invoker<std::tuple<void (*)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState> > >::_M_invoke<0ul, 1ul, 2ul>(std::_Index_tuple<0ul, 1ul, 2ul>) <...>/std_thread.h:292 (run-threaded+0x15cb9)
    #12 std::thread::_Invoker<std::tuple<void (*)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState> > >::operator()() <...>/std_thread.h:299 (run-threaded+0x15cb9)
    #13 std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (*)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState> > > >::_M_run() <...>/std_thread.h:244 (run-threaded+0x15cb9)
    #14 <null> <null> (libstdc++.so.6+0xecdb3) (BuildId: ca77dae775ec87540acd7218fa990c40d1c94ab1)

  Mutex M2 (0x7268000000d8) created at:
    #0 pthread_mutex_lock ..<...>/tsan_interceptors_posix.cpp:1341 (libtsan.so.2+0x59a13) (BuildId: 38097064631f7912bd33117a9c83d08b42e15571)
    #1 __gthread_mutex_lock <...>/gthr-default.h:749 (run-threaded+0x1c3c8) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #2 std::mutex::lock() <...>/std_mutex.h:113 (run-threaded+0x1c3c8)
    #3 std::lock_guard<std::mutex>::lock_guard(std::mutex&) <...>/std_mutex.h:249 (run-threaded+0x1c3c8)
    #4 ix::WebSocketTransport::connectToSocket(std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, int, bool, std::shared_ptr<ix::HttpRequest>) <...>/IXWebSocketTransport.cpp:177 (run-threaded+0x1c3c8)
    #5 ix::WebSocket::connectToSocket(std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, int, bool, std::shared_ptr<ix::HttpRequest>) <...>/IXWebSocket.cpp:268 (run-threaded+0x387e4) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #6 ix::WebSocketServer::handleUpgrade(std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState>, std::shared_ptr<ix::HttpRequest>) <...>/IXWebSocketServer.cpp:148 (run-threaded+0x21f9d) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #7 ix::WebSocketServer::handleConnection(std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState>) <...>/IXWebSocketServer.cpp:85 (run-threaded+0x22a18) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #8 void std::__invoke_impl<void, void (ix::SocketServer::*)(std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState>), ix::SocketServer*, std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState> >(std::__invoke_memfun_deref, void (ix::SocketServer::*&&)(std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState>), ix::SocketServer*&&, std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >&&, std::shared_ptr<ix::ConnectionState>&&) <...>/invoke.h:74 (run-threaded+0x5b0b4) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #9 std::__invoke_result<void (ix::SocketServer::*)(std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState>), ix::SocketServer*, std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState> >::type std::__invoke<void (ix::SocketServer::*)(std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState>), ix::SocketServer*, std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState> >(void (ix::SocketServer::*&&)(std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState>), ix::SocketServer*&&, std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >&&, std::shared_ptr<ix::ConnectionState>&&) <...>/invoke.h:96 (run-threaded+0x5b0b4)
    #10 void std::thread::_Invoker<std::tuple<void (ix::SocketServer::*)(std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState>), ix::SocketServer*, std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState> > >::_M_invoke<0ul, 1ul, 2ul, 3ul>(std::_Index_tuple<0ul, 1ul, 2ul, 3ul>) <...>/std_thread.h:292 (run-threaded+0x5b0b4)
    #11 std::thread::_Invoker<std::tuple<void (ix::SocketServer::*)(std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState>), ix::SocketServer*, std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState> > >::operator()() <...>/std_thread.h:299 (run-threaded+0x5b0b4)
    #12 std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (ix::SocketServer::*)(std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState>), ix::SocketServer*, std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState> > > >::_M_run() <...>/std_thread.h:244 (run-threaded+0x5b0b4)
    #13 <null> <null> (libstdc++.so.6+0xecdb3) (BuildId: ca77dae775ec87540acd7218fa990c40d1c94ab1)

  Mutex M3 (0x721000002010) created at:
    #0 pthread_mutex_lock ..<...>/tsan_interceptors_posix.cpp:1341 (libtsan.so.2+0x59a13) (BuildId: 38097064631f7912bd33117a9c83d08b42e15571)
    #1 __gthread_mutex_lock <...>/gthr-default.h:749 (run-threaded+0x45c12) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #2 std::mutex::lock() <...>/std_mutex.h:113 (run-threaded+0x45c12)
    #3 std::lock_guard<std::mutex>::lock_guard(std::mutex&) <...>/std_mutex.h:249 (run-threaded+0x45c12)
    #4 ix::Socket::close() <...>/IXSocket.cpp:235 (run-threaded+0x45c12)
    #5 ix::WebSocketTransport::closeSocket() <...>/IXWebSocketTransport.cpp:1166 (run-threaded+0x1bafd) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #6 ix::WebSocketTransport::sendOnSocket() <...>/IXWebSocketTransport.cpp:1084 (run-threaded+0x1c9d0) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #7 bool ix::WebSocketTransport::sendFragment<ix::IXWebSocketSendData::IXWebSocketSendData_const_iterator<char> >(ix::WebSocketTransport::wsheader_type::opcode_type, bool, ix::IXWebSocketSendData::IXWebSocketSendData_const_iterator<char>, ix::IXWebSocketSendData::IXWebSocketSendData_const_iterator<char>, bool) <...>/IXWebSocketTransport.cpp:1033 (run-threaded+0x2d81f) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #8 ix::WebSocketTransport::sendData(ix::WebSocketTransport::wsheader_type::opcode_type, ix::IXWebSocketSendData const&, bool, std::function<bool (int, int)> const&) <...>/IXWebSocketTransport.cpp:877 (run-threaded+0x23882) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #9 ix::WebSocketTransport::sendText(ix::IXWebSocketSendData const&, std::function<bool (int, int)> const&) <...>/IXWebSocketTransport.cpp:1062 (run-threaded+0x23ce7) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #10 ix::WebSocket::sendMessage(ix::IXWebSocketSendData const&, ix::SendMessageKind, std::function<bool (int, int)> const&) <...>/IXWebSocket.cpp:557 (run-threaded+0x31faa) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #11 ix::WebSocket::sendText(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::function<bool (int, int)> const&) <...>/IXWebSocket.cpp:523 (run-threaded+0x3258b) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #12 sender(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>) <...>/run-threaded.cpp:45 (run-threaded+0x12fd7) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #13 void std::__invoke_impl<void, void (*)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState> >(std::__invoke_other, void (*&&)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>&&, std::shared_ptr<ix::ConnectionState>&&) <...>/invoke.h:61 (run-threaded+0x15cb9) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #14 std::__invoke_result<void (*)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState> >::type std::__invoke<void (*)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState> >(void (*&&)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>&&, std::shared_ptr<ix::ConnectionState>&&) <...>/invoke.h:96 (run-threaded+0x15cb9)
    #15 void std::thread::_Invoker<std::tuple<void (*)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState> > >::_M_invoke<0ul, 1ul, 2ul>(std::_Index_tuple<0ul, 1ul, 2ul>) <...>/std_thread.h:292 (run-threaded+0x15cb9)
    #16 std::thread::_Invoker<std::tuple<void (*)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState> > >::operator()() <...>/std_thread.h:299 (run-threaded+0x15cb9)
    #17 std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (*)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState> > > >::_M_run() <...>/std_thread.h:244 (run-threaded+0x15cb9)
    #18 <null> <null> (libstdc++.so.6+0xecdb3) (BuildId: ca77dae775ec87540acd7218fa990c40d1c94ab1)

  Thread T5 (tid=2107794, running) created by thread T4 at:
    #0 pthread_create ..<...>/tsan_interceptors_posix.cpp:1022 (libtsan.so.2+0x5ac1a) (BuildId: 38097064631f7912bd33117a9c83d08b42e15571)
    #1 std::thread::_M_start_thread(std::unique_ptr<std::thread::_State, std::default_delete<std::thread::_State> >, void (*)()) <null> (libstdc++.so.6+0xeceb0) (BuildId: ca77dae775ec87540acd7218fa990c40d1c94ab1)
    #2 std::function<void (std::unique_ptr<ix::WebSocketMessage, std::default_delete<ix::WebSocketMessage> > const&)>::operator()(std::unique_ptr<ix::WebSocketMessage, std::default_delete<ix::WebSocketMessage> > const&) const <...>/std_function.h:591 (run-threaded+0x38cdf) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #3 ix::WebSocket::connectToSocket(std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, int, bool, std::shared_ptr<ix::HttpRequest>) <...>/IXWebSocket.cpp:274 (run-threaded+0x38cdf)
    #4 ix::WebSocketServer::handleUpgrade(std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState>, std::shared_ptr<ix::HttpRequest>) <...>/IXWebSocketServer.cpp:148 (run-threaded+0x21f9d) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #5 ix::WebSocketServer::handleConnection(std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState>) <...>/IXWebSocketServer.cpp:85 (run-threaded+0x22a18) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #6 void std::__invoke_impl<void, void (ix::SocketServer::*)(std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState>), ix::SocketServer*, std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState> >(std::__invoke_memfun_deref, void (ix::SocketServer::*&&)(std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState>), ix::SocketServer*&&, std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >&&, std::shared_ptr<ix::ConnectionState>&&) <...>/invoke.h:74 (run-threaded+0x5b0b4) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #7 std::__invoke_result<void (ix::SocketServer::*)(std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState>), ix::SocketServer*, std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState> >::type std::__invoke<void (ix::SocketServer::*)(std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState>), ix::SocketServer*, std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState> >(void (ix::SocketServer::*&&)(std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState>), ix::SocketServer*&&, std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >&&, std::shared_ptr<ix::ConnectionState>&&) <...>/invoke.h:96 (run-threaded+0x5b0b4)
    #8 void std::thread::_Invoker<std::tuple<void (ix::SocketServer::*)(std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState>), ix::SocketServer*, std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState> > >::_M_invoke<0ul, 1ul, 2ul, 3ul>(std::_Index_tuple<0ul, 1ul, 2ul, 3ul>) <...>/std_thread.h:292 (run-threaded+0x5b0b4)
    #9 std::thread::_Invoker<std::tuple<void (ix::SocketServer::*)(std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState>), ix::SocketServer*, std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState> > >::operator()() <...>/std_thread.h:299 (run-threaded+0x5b0b4)
    #10 std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (ix::SocketServer::*)(std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState>), ix::SocketServer*, std::unique_ptr<ix::Socket, std::default_delete<ix::Socket> >, std::shared_ptr<ix::ConnectionState> > > >::_M_run() <...>/std_thread.h:244 (run-threaded+0x5b0b4)
    #11 <null> <null> (libstdc++.so.6+0xecdb3) (BuildId: ca77dae775ec87540acd7218fa990c40d1c94ab1)

  Thread T4 'Srv:ws:0' (tid=2107793, running) created by thread T2 at:
    #0 pthread_create ..<...>/tsan_interceptors_posix.cpp:1022 (libtsan.so.2+0x5ac1a) (BuildId: 38097064631f7912bd33117a9c83d08b42e15571)
    #1 std::thread::_M_start_thread(std::unique_ptr<std::thread::_State, std::default_delete<std::thread::_State> >, void (*)()) <null> (libstdc++.so.6+0xeceb0) (BuildId: ca77dae775ec87540acd7218fa990c40d1c94ab1)
    #2 void std::__invoke_impl<void, void (ix::SocketServer::*)(), ix::SocketServer*>(std::__invoke_memfun_deref, void (ix::SocketServer::*&&)(), ix::SocketServer*&&) <...>/invoke.h:74 (run-threaded+0x5a801) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #3 std::__invoke_result<void (ix::SocketServer::*)(), ix::SocketServer*>::type std::__invoke<void (ix::SocketServer::*)(), ix::SocketServer*>(void (ix::SocketServer::*&&)(), ix::SocketServer*&&) <...>/invoke.h:96 (run-threaded+0x5a801)
    #4 void std::thread::_Invoker<std::tuple<void (ix::SocketServer::*)(), ix::SocketServer*> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) <...>/std_thread.h:292 (run-threaded+0x5a801)
    #5 std::thread::_Invoker<std::tuple<void (ix::SocketServer::*)(), ix::SocketServer*> >::operator()() <...>/std_thread.h:299 (run-threaded+0x5a801)
    #6 std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (ix::SocketServer::*)(), ix::SocketServer*> > >::_M_run() <...>/std_thread.h:244 (run-threaded+0x5a801)
    #7 <null> <null> (libstdc++.so.6+0xecdb3) (BuildId: ca77dae775ec87540acd7218fa990c40d1c94ab1)

  Thread T2 'Srv:ac:1234' (tid=2107783, running) created by main thread at:
    #0 pthread_create ..<...>/tsan_interceptors_posix.cpp:1022 (libtsan.so.2+0x5ac1a) (BuildId: 38097064631f7912bd33117a9c83d08b42e15571)
    #1 std::thread::_M_start_thread(std::unique_ptr<std::thread::_State, std::default_delete<std::thread::_State> >, void (*)()) <null> (libstdc++.so.6+0xeceb0) (BuildId: ca77dae775ec87540acd7218fa990c40d1c94ab1)
    #2 ix::WebSocketServer::listenAndStart() <...>/IXWebSocketServer.cpp:226 (run-threaded+0x1ada7) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)
    #3 main <...>/run-threaded.cpp:125 (run-threaded+0x11e84) (BuildId: 3607eddec6a0910918d8f839d1c38505f6cab55e)

SUMMARY: ThreadSanitizer: data race <...>/IXSocket.cpp:298 in ix::Socket::closeSocket(int)

awelzel avatar May 21 '25 08:05 awelzel

I'm thinking a fix could be to move responsibility of invoking the CLOSED callback to the receiving thread when in the server setup. Any attempts to fix this so far have ended up a bit hairy, however.

@bsergean - would be glad to hear if you have input on the scenario / use-case. Maybe you have a clever idea for a fix?

awelzel avatar May 21 '25 09:05 awelzel

The observation is that Message callbacks are invoked after a Close callback Can we check that explicitly, and prevent it ? If the state of the connection is now "closed", we refuse to invoke the callback (or only permit it to send the close message, but not for ping/pong/messages ?

bsergean avatar May 22 '25 15:05 bsergean

https://github.com/machinezone/IXWebSocket/pull/552

Would that work ?

bsergean avatar May 22 '25 19:05 bsergean