IXWebSocket
IXWebSocket copied to clipboard
O(N^2) time complexity bug in WebsocketTransport
https://github.com/machinezone/IXWebSocket/blob/679ce519dd0d6d3919990abdee7109a5eeb99aa0/ixwebsocket/IXWebSocketTransport.cpp#L1067 https://github.com/machinezone/IXWebSocket/blob/679ce519dd0d6d3919990abdee7109a5eeb99aa0/ixwebsocket/IXWebSocketTransport.cpp#L691
The way the code is using std::vector for the receive buffer with data being added at the end and removed from the front creates a time complexity bug. When erasing at the front, vector::erase is O(N) where N is the size of the whole vec, not the size of the packet data being erased. That means that dequeuing all packets is actually O(N^2)
If for whatever reason the receive buffer grows big enough, the time to dequeue one packet can exceed the time to receive one packet at which point it's a downward spiral that cannot be recovered (the process will grind to a halt).
This is not just theoretical, I experienced it in production.
The issue can be fixed by using a different datastructure for the receive buffer, such as std::deque.
Thanks for the report, sorry I missed that. I've made a code change to use a deque that I'm trying now.
Interestingly, using a deque triggers an ASAN (AddressSanitizer) error.
15: ==2343==ERROR: AddressSanitizer: heap-buffer-overflow on address 0x6150001ee880 at pc 0x5614c25f30ca bp 0x7fa833ffc380 sp 0x7fa833ffc370
15: READ of size 1 at 0x6150001ee880 thread T3
15: #0 0x5614c25f30c9 in ix::WebSocketTransport::dispatch(ix::WebSocketTransport::PollResult, std::function<void (std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, unsigned long, bool, ix::WebSocketTransport::MessageKind)> const&) /home/runner/work/IXWebSocket/IXWebSocket/ixwebsocket/IXWebSocketTransport.cpp:497
15: #1 0x5614c2636a9c in ix::WebSocket::run() /home/runner/work/IXWebSocket/IXWebSocket/ixwebsocket/IXWebSocket.cpp:397
15: #2 0x5614c266ce13 in void std::__invoke_impl<void, void (ix::WebSocket::*)(), ix::WebSocket*>(std::__invoke_memfun_deref, void (ix::WebSocket::*&&)(), ix::WebSocket*&&) /usr/include/c++/11/bits/invoke.h:74
15: #3 0x5614c266cc9c in std::__invoke_result<void (ix::WebSocket::*)(), ix::WebSocket*>::type std::__invoke<void (ix::WebSocket::*)(), ix::WebSocket*>(void (ix::WebSocket::*&&)(), ix::WebSocket*&&) /usr/include/c++/11/bits/invoke.h:96
15: #4 0x5614c266cbfc in void std::thread::_Invoker<std::tuple<void (ix::WebSocket::*)(), ix::WebSocket*> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) /usr/include/c++/11/bits/std_thread.h:253
15: #5 0x5614c266cbb1 in std::thread::_Invoker<std::tuple<void (ix::WebSocket::*)(), ix::WebSocket*> >::operator()() /usr/include/c++/11/bits/std_thread.h:260
15: #6 0x5614c266cb91 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (ix::WebSocket::*)(), ix::WebSocket*> > >::_M_run() /usr/include/c++/11/bits/std_thread.h:211
15: #7 0x7fa839a4b2b2 (/lib/x86_64-linux-gnu/libstdc++.so.6+0xdc2b2)
15: #8 0x7fa8396d2b42 (/lib/x86_64-linux-gnu/libc.so.6+0x94b42)
15: #9 0x7fa8397649ff (/lib/x86_64-linux-gnu/libc.so.6+0x1269ff)
15:
15: 0x6150001ee880 is located 0 bytes to the right of 512-byte region [0x6150001ee680,0x6150001ee880)
15: allocated by thread T3 here:
15: #0 0x7fa83a1521c7 in operator new(unsigned long) ../../../../src/libsanitizer/asan/asan_new_delete.cpp:99
15: #1 0x5614c251b971 in __gnu_cxx::new_allocator<unsigned char>::allocate(unsigned long, void const*) /usr/include/c++/11/ext/new_allocator.h:127
15: #2 0x5614c24fcf05 in std::allocator_traits<std::allocator<unsigned char> >::allocate(std::allocator<unsigned char>&, unsigned long) /usr/include/c++/11/bits/alloc_traits.h:464
15: #3 0x5614c261f147 in std::_Deque_base<unsigned char, std::allocator<unsigned char> >::_M_allocate_node() /usr/include/c++/11/bits/stl_deque.h:561
15: #4 0x5614c262118e in std::deque<unsigned char, std::allocator<unsigned char> >::_M_new_elements_at_back(unsigned long) /usr/include/c++/11/bits/deque.tcc:919
15: #5 0x5614c261a57a in std::deque<unsigned char, std::allocator<unsigned char> >::_M_reserve_elements_at_back(unsigned long) /usr/include/c++/11/bits/stl_deque.h:2104
15: #6 0x5614c261358c in void std::deque<unsigned char, std::allocator<unsigned char> >::_M_range_insert_aux<__gnu_cxx::__normal_iterator<unsigned char*, std::vector<unsigned char, std::allocator<unsigned char> > > >(std::_Deque_iterator<unsigned char, unsigned char&, unsigned char*>, __gnu_cxx::__normal_iterator<unsigned char*, std::vector<unsigned char, std::allocator<unsigned char> > >, __gnu_cxx::__normal_iterator<unsigned char*, std::vector<unsigned char, std::allocator<unsigned char> > >, std::forward_iterator_tag) /usr/include/c++/11/bits/deque.tcc:622
15: #7 0x5614c260a987 in std::_Deque_iterator<unsigned char, unsigned char&, unsigned char*> std::deque<unsigned char, std::allocator<unsigned char> >::insert<__gnu_cxx::__normal_iterator<unsigned char*, std::vector<unsigned char, std::allocator<unsigned char> > >, void>(std::_Deque_iterator<unsigned char, unsigned char const&, unsigned char const*>, __gnu_cxx::__normal_iterator<unsigned char*, std::vector<unsigned char, std::allocator<unsigned char> > >, __gnu_cxx::__normal_iterator<unsigned char*, std::vector<unsigned char, std::allocator<unsigned char> > >) /usr/include/c++/11/bits/stl_deque.h:1691
15: #8 0x5614c25f7dcd in ix::WebSocketTransport::receiveFromSocket() /home/runner/work/IXWebSocket/IXWebSocket/ixwebsocket/IXWebSocketTransport.cpp:1108
15: #9 0x5614c25f2350 in ix::WebSocketTransport::poll() /home/runner/work/IXWebSocket/IXWebSocket/ixwebsocket/IXWebSocketTransport.cpp:370
15: #10 0x5614c2636a0b in ix::WebSocket::run() /home/runner/work/IXWebSocket/IXWebSocket/ixwebsocket/IXWebSocket.cpp:394
15: #11 0x5614c266ce13 in void std::__invoke_impl<void, void (ix::WebSocket::*)(), ix::WebSocket*>(std::__invoke_memfun_deref, void (ix::WebSocket::*&&)(), ix::WebSocket*&&) /usr/include/c++/11/bits/invoke.h:74
15: #12 0x5614c266cc9c in std::__invoke_result<void (ix::WebSocket::*)(), ix::WebSocket*>::type std::__invoke<void (ix::WebSocket::*)(), ix::WebSocket*>(void (ix::WebSocket::*&&)(), ix::WebSocket*&&) /usr/include/c++/11/bits/invoke.h:96
15: #13 0x5614c266cbfc in void std::thread::_Invoker<std::tuple<void (ix::WebSocket::*)(), ix::WebSocket*> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) /usr/include/c++/11/bits/std_thread.h:253
15: #14 0x5614c266cbb1 in std::thread::_Invoker<std::tuple<void (ix::WebSocket::*)(), ix::WebSocket*> >::operator()() /usr/include/c++/11/bits/std_thread.h:260
15: #15 0x5614c266cb91 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (ix::WebSocket::*)(), ix::WebSocket*> > >::_M_run() /usr/include/c++/11/bits/std_thread.h:211
15: #16 0x7fa839a4b2b2 (/lib/x86_64-linux-gnu/libstdc++.so.6+0xdc2b2)
15:
15: Thread T3 created by T0 here:
15: #0 0x7fa83a0f4685 in __interceptor_pthread_create ../../../../src/libsanitizer/asan/asan_interceptors.cpp:216
15: #1 0x7fa839a4b388 in std::thread::_M_start_thread(std::unique_ptr<std::thread::_State, std::default_delete<std::thread::_State> >, void (*)()) (/lib/x86_64-linux-gnu/libstdc++.so.6+0xdc388)
15: #2 0x5614c2632f1d in ix::WebSocket::start() /home/runner/work/IXWebSocket/IXWebSocket/ixwebsocket/IXWebSocket.cpp:183
15: #3 0x5614c23940ca in start /home/runner/work/IXWebSocket/IXWebSocket/test/IXWebSocketChatTest.cpp:163
15: #4 0x5614c239714c in C_A_T_C_H_T_E_S_T_0 /home/runner/work/IXWebSocket/IXWebSocket/test/IXWebSocketChatTest.cpp:255
15: #5 0x5614c23e5f28 in Catch::TestInvokerAsFunction::invoke() const /home/runner/work/IXWebSocket/IXWebSocket/test/Catch2/single_include/catch.hpp:14328
15: #6 0x5614c23e3bff in Catch::TestCase::invoke() const /home/runner/work/IXWebSocket/IXWebSocket/test/Catch2/single_include/catch.hpp:14167
15: #7 0x5614c23d5f86 in Catch::RunContext::invokeActiveTestCase() /home/runner/work/IXWebSocket/IXWebSocket/test/Catch2/single_include/catch.hpp:13027
15: #8 0x5614c23d5784 in Catch::RunContext::runCurrentTest(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&) /home/runner/work/IXWebSocket/IXWebSocket/test/Catch2/single_include/catch.hpp:13000
15: #9 0x5614c23d1eeb in Catch::RunContext::runTest(Catch::TestCase const&) /home/runner/work/IXWebSocket/IXWebSocket/test/Catch2/single_include/catch.hpp:12761
15: #10 0x5614c23da2ec in execute /home/runner/work/IXWebSocket/IXWebSocket/test/Catch2/single_include/catch.hpp:13354
15: #11 0x5614c23dcf94 in Catch::Session::runInternal() /home/runner/work/IXWebSocket/IXWebSocket/test/Catch2/single_include/catch.hpp:13560
15: #12 0x5614c23dc88c in Catch::Session::run() /home/runner/work/IXWebSocket/IXWebSocket/test/Catch2/single_include/catch.hpp:13516
15: #13 0x5614c24a3705 in int Catch::Session::run<char>(int, char const* const*) /home/runner/work/IXWebSocket/IXWebSocket/test/Catch2/single_include/catch.hpp:13238
15: #14 0x5614c24143ea in main /home/runner/work/IXWebSocket/IXWebSocket/test/test_runner.cpp:25
15: #15 0x7fa839667d8f (/lib/x86_64-linux-gnu/libc.so.6+0x29d8f)
15:
15: SUMMARY: AddressSanitizer: heap-buffer-overflow /home/runner/work/IXWebSocket/IXWebSocket/ixwebsocket/IXWebSocketTransport.cpp:497 in ix::WebSocketTransport::dispatch(ix::WebSocketTransport::PollResult, std::function<void (std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, unsigned long, bool, ix::WebSocketTransport::MessageKind)> const&)
15: Shadow bytes around the buggy address:
15: 0x0c2a80035cc0: fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa
15: 0x0c2a80035cd0: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
15: 0x0c2a80035ce0: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
15: 0x0c2a80035cf0: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
15: 0x0c2a80035d00: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
15: =>0x0c2a80035d10:[fa]fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa
15: 0x0c2a80035d20: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
15: 0x0c2a80035d30: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
15: 0x0c2a80035d40: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
15: 0x0c2a80035d50: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
15: 0x0c2a80035d60: fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa
15: Shadow byte legend (one shadow byte represents 8 application bytes):
15: Addressable: 00
15: Partially addressable: 01 02 03 04 05 06 07
15: Heap left redzone: fa
15: Freed heap region: fd
15: Stack left redzone: f1
15: Stack mid redzone: f2
15: Stack right redzone: f3
....
It's not quite enough to just replace vector with deque. The memory inside a deque isn't guaranteed to be contiguous like a vector, so any code that assumes that needs to be modified, for example https://github.com/machinezone/IXWebSocket/blob/9157873f5bafbe967bce84df3b5c2a67f929a62f/ixwebsocket/IXWebSocketTransport.cpp#L462
Hi bsergean & dbregman,
Are we sure std::deque is a good idea? It may be rather inefficient for the general case unless you use something like boost's deque, where we can configure the block size. I would say the complexity is O(N*M), where N is the buffer size and M is the number of messages.
After the code
std::string frameData(_rxbuf.begin() + ws.header_size, _rxbuf.begin() + ws.header_size + (size_t) ws.N);
where you have a full message, could you not keep an index into _rxbuf and continue processing from there? Then when you are done processing the messages, you can erase only once [0,endOfLastMessage] from _rxbuf?
Also, storing the chunks in vector<string> would be better now that we have move-semantics.
kind regards
Thorsten
From https://en.cppreference.com/w/cpp/container/deque:
Insertion or removal of elements at the end or beginning - constant O(1)
so std::deque has the right complexity characteristics.
It also has the advantage of being immediately available and largely compatible with the existing code -- almost a drop in replacement except for a few lines that need to be modified to remove the assumption of contiguous memory.
Some other solutions could be slightly more performant (e.g. optimized ring buffer) but I don't think that is of much concern here.
My concern here is not O(1) removal. My concern is that std::deque often degenerates into a random-access list. For my system, the largest block size is 16. So you will use one heap allocation for every 16 bytes of data! That is very inefficient for the general case, even though it might give you an O(1) front removal. boost::deque can customize the block size, but we probably do not want a dependency on boost.
So I recommend that we use an index into _rxbuf and do one erase in the end.
kind regards
Thorsten
16 byte blocks is quite surprising, which system is that?
In any case, I'm certainly not opposed to using an even better solution than a std::deque. Any growable ringbuffer should work well here.
I would not suggest storing chunks in vector<string> though as that would create a lot of extra allocations.
Hi dbregman,
That would be windows.
As for chunks in vector<string> vs list<string> then I don't think you are right.
Let's analyze it.
If we add N strings to a list<string>, we get N allocations inside string and N allocations inside list (assuming that the strings are so large that they require heap-allocations). So 2N allocations.
For vector<string> we get an allocation each time the vector regrows; assuming it is never possible to know in advance how many chunks we will receive, and assuming clear() actually releases memory (it doesn't), we get lg N allocations inside vector and N allocations inside string. So we have only N + lg N allocations for vector<string>.
In addition to this we then get a number of move-operations of strings when the vector needs to move elements to a larger buffer. We get O(N) move operations, but these are much faster than allocations. Overall, I will expect vector<string> to be faster. Quite quickly, we should expect the vector never to grow (because clear() reuses memory), and from there on we have only N allocations for N chunks, half the number for list.
kind regards
Thorsten
I confirmed the std lib shipped by Microsoft uses 16 byte blocks. Surprising and good to know. It seems deque should be avoided for this kind of use case if portable high performance is required (although it would still be better than the current code with the O(N^2) bug).
I thought you meant something else by chunks, so that comment was based on a misunderstanding. For vector vs list I agree vector is clearly better here assuming move semantics.
This is not just theoretical, I experienced it in production.
The issue can be fixed by using a different datastructure for the receive buffer, such as std::deque.
We're also running into this issue zeek/zeek#4440 - I'll open a PR that caps the size of _rxbuf as that seems to alleviate the issue without too much restructuring.
This is the server-side flamegraph when running a single client sending 500.000 messages as fast as possible. The client needs ~1min to complete (code below).
$ /usr/bin/time python3 client-sender.py --url ws://localhost:1234 --messages 500000
2.05user 0.42system 1:01.43elapsed 4%CPU (0avgtext+0avgdata 25140maxresident)k
0inputs+0outputs (0major+3888minor)pagefaults 0swaps
This is the client and server for testing:
import argparse
from websockets.sync.client import connect
def run(args):
    """Open ``args.clients`` connections and send 1024-byte messages on each.

    Args:
        args: Parsed options providing ``url`` (passed to ``connect``),
            ``clients`` (number of connections to open) and ``messages``
            (number of send rounds; ``0`` means keep sending forever).

    For every new connection the first received message is printed together
    with the connection index. Each round then sends the same payload once on
    every connection, and progress is printed every 5000 rounds. All opened
    connections are closed on the way out, even if connecting, receiving or
    sending raises.
    """
    clients = []
    try:
        for i in range(args.clients):
            c = connect(args.url)
            # Track the connection before reading from it so that it is still
            # closed if recv() fails.
            clients.append(c)
            print(i, c.recv())

        payload = ("A" * 1024).encode("ascii")
        sent = 0
        while args.messages == 0 or sent < args.messages:
            if sent % 5000 == 0:
                print("Sending", sent)
            for c in clients:
                c.send(payload)
            sent += 1
    finally:
        for c in clients:
            c.close()
def main():
    """Parse the command-line options and hand them to ``run``.

    Options: ``--url`` (required), ``--messages`` (int, default 0) and
    ``--clients`` (int, default 1).
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("--url", required=True)
    cli.add_argument("--messages", type=int, default=0)
    cli.add_argument("--clients", type=int, default=1)
    options = cli.parse_args()
    run(options)


# Only start when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Server:
#include "ixwebsocket/IXConnectionState.h"
#include "ixwebsocket/IXWebSocket.h"
#include "ixwebsocket/IXWebSocketMessage.h"
#include "ixwebsocket/IXWebSocketMessageType.h"
#include "ixwebsocket/IXWebSocketServer.h"
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <string>
// Per-connection bookkeeping kept in the global clients table.
struct Client
{
    // Connection state object received in the connection callback.
    std::shared_ptr<ix::ConnectionState> cs;
    // Becomes true once a Close event has been handled for this connection.
    bool closed {false};
    // Count of Message events handled for this connection.
    uint64_t msgs {0};
};
// Global clients table, keyed by the connection id (cs->getId()).
// clients_mtx is locked around every read or write of `clients` in the
// message callback installed by main().
std::mutex clients_mtx;
std::map<std::string, Client> clients;
// Test server for the reproduction: accepts WebSocket clients, greets each
// one with "Hello!" when its connection opens, and counts the messages each
// client sends. The per-client count is printed when the client closes.
//
// Listens on port 1234 unless the SERVER_PORT environment variable is set.
// Returns 0 once the server has been stopped, or 1 if it could not start
// listening.
int main()
{
    int port = 1234;
    const char* port_str = getenv("SERVER_PORT");
    if (port_str) port = atoi(port_str);

    auto server = std::make_shared<ix::WebSocketServer>(port);
    server->disablePerMessageDeflate();

    server->setOnConnectionCallback(
        [](std::weak_ptr<ix::WebSocket> wws, std::shared_ptr<ix::ConnectionState> cs)
        {
            auto ws = wws.lock();
            if (!ws)
            {
                std::cerr << "Could not acquire WS client!" << std::endl;
                return;
            }

            // Capture the weak_ptr rather than the locked shared_ptr: the
            // callback is installed on this very socket, so a strong capture
            // would make the socket own a callback that owns the socket and
            // the pair could never be released.
            ws->setOnMessageCallback(
                [wws, cs](const ix::WebSocketMessagePtr& msg)
                {
                    if (msg->type == ix::WebSocketMessageType::Open)
                    {
                        std::cerr << "Open " << cs->getId() << std::endl;

                        // Create (or reset) the table entry for this client.
                        {
                            std::lock_guard<std::mutex> lk {clients_mtx};
                            Client& client = clients[cs->getId()];
                            client = {};
                            client.cs = cs;
                        }

                        // Greet the client once its entry is registered.
                        auto ws = wws.lock();
                        if (ws) ws->sendText("Hello!");
                    }
                    else if (msg->type == ix::WebSocketMessageType::Close)
                    {
                        uint64_t msgs = 0;
                        {
                            std::lock_guard<std::mutex> lk {clients_mtx};
                            Client& client = clients[cs->getId()];
                            client.closed = true;
                            msgs = client.msgs;
                        }
                        // Report outside the lock so logging does not extend
                        // the critical section.
                        std::cerr << "Close " << cs->getId() << " " << msgs << std::endl;
                    }
                    else if (msg->type == ix::WebSocketMessageType::Message)
                    {
                        std::lock_guard<std::mutex> lk {clients_mtx};
                        ++clients[cs->getId()].msgs;
                    }
                });
        });

    if (!server->listenAndStart())
    {
        std::cerr << "Failed to listen and start" << std::endl;
        return 1;
    }

    server->wait();
    server->stop();
    return 0;
}