cpp-httplib
SSE (server-sent events) example can hang the send_event method
Hi guys,
I have been using SSE for a while now, exactly as described in the example/ssesvr.cc file.
I noticed that my main thread sporadically hangs/stalls. After a long and painful investigation, it looks like the send_event() method can hang for some reason.
My use case: I always have at least one listener waiting for an event, and I send about 1 event per second. Over a 24 h period I can easily see a couple of successive ~5 s hangs here and there (meaning the send_event() method takes about 5 s to return).
I'm not familiar with condition variables etc., but do you know what could be done to avoid blocking like that? I'm not even sure what the root cause could be. Can sink->write take a very long time on a network failure and block the entire thing? Any help is appreciated!
Here is a reminder of the SSE implementation:
```cpp
class EventDispatcher {
public:
    EventDispatcher() {
    }

    void wait_event(DataSink *sink) {
        unique_lock<mutex> lk(m_);
        int id = id_;
        cv_.wait(lk, [&] { return cid_ == id; });
        if (sink->is_writable()) { sink->write(message_.data(), message_.size()); }
    }

    void send_event(const string &message) {
        lock_guard<mutex> lk(m_);
        cid_ = id_++;
        message_ = message;
        cv_.notify_all();
    }

private:
    mutex m_;
    condition_variable cv_;
    atomic_int id_{0};
    atomic_int cid_{-1};
    string message_;
};
```
@gh4ck3r, do you have any comments on this issue?
The case described is different from mine. I didn't experience such a stall or hang when sending messages. @vrince, you may need to provide more information, because I couldn't find a problem with the code above alone. Having seen and tested it several times too, I know the implementation is not perfect, but it's good enough as a starting point.
The only point where send_event could block is its first line, lock_guard<mutex> lk(m_). For that to happen, wait_event() must own (lock) the mutex m_ in another thread. If sink->write(...) takes some time, it keeps holding the lock for that long, because the mutex is only unlocked when wait_event() returns.
So, IMO, it could be about the message and the underlying socket. For example, if multiple messages fill up the socket's send buffer, I guess it could happen.
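To illustrate the locking point above: in the example, wait_event() still holds m_ while sink->write() runs, so a slow socket also delays send_event(). Below is a minimal sketch (not the official example) of a variant that copies the message while holding the lock and writes only after releasing it. Assumptions of this sketch: wait_event() is templated on the sink type (httplib::DataSink exposes is_writable() and write() as callable members, so it should fit), and the wait predicate is relaxed from cid_ == id to cid_ >= id, because with == a listener that only wakes up after two send_event() calls keeps waiting for an id that has already gone by. StringSink and demo_one_event are hypothetical helpers for trying it out.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <string>
#include <thread>

// Sketch of a variant of the example's EventDispatcher (not cpp-httplib API).
// Sink: any type with is_writable() and write(const char*, size_t).
class EventDispatcher {
public:
  template <typename Sink>
  void wait_event(Sink &sink) {
    std::string message;
    {
      std::unique_lock<std::mutex> lk(m_);
      const long long id = id_;
      // ">=" instead of "==": a listener that wakes up late, after more than
      // one send_event(), still proceeds (with the latest message).
      cv_.wait(lk, [&] { return cid_ >= id; });
      message = message_;  // copy while the lock is held
    }  // m_ is released here, before any socket I/O
    // A slow write now stalls only this connection's thread, not send_event().
    if (sink.is_writable()) sink.write(message.data(), message.size());
  }

  void send_event(const std::string &message) {
    std::lock_guard<std::mutex> lk(m_);
    cid_ = id_++;
    message_ = message;
    cv_.notify_all();
  }

private:
  std::mutex m_;
  std::condition_variable cv_;
  long long id_ = 0;    // next event id, guarded by m_
  long long cid_ = -1;  // id of the latest event, guarded by m_
  std::string message_;
};

// Hypothetical stand-in for httplib::DataSink, used only by the demo below.
struct StringSink {
  std::string received;
  bool is_writable() const { return true; }
  bool write(const char *data, std::size_t len) {
    received.append(data, len);
    return true;
  }
};

// Demo: send_event() only reaches listeners that are already waiting, so the
// sender keeps publishing until the listener thread has written once.
inline std::string demo_one_event() {
  EventDispatcher ed;
  StringSink sink;
  std::atomic<bool> done{false};
  std::thread listener([&] {
    ed.wait_event(sink);
    done = true;
  });
  while (!done) {
    ed.send_event("data: hello\n\n");
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
  listener.join();
  return sink.received;
}
```

In a cpp-httplib chunked content provider, the call would become `ed.wait_event(sink); return true;` (passing the DataSink by reference instead of by pointer).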
@vrince, could you make and share your case as a unit test? Next week I can run it for more than 24 hours in my environment to check whether it reproduces.
@gh4ck3r, thanks for the comment. @vrince, could you respond to @gh4ck3r's comment? Thanks!
Thanks for the interest you've put into this issue! To be honest, narrowing it down to the SSE part in our project already took a long time. I'll try to make a simple (no other dependencies) example that reproduces the behavior.
Btw, I'm using 0.10.2 on Linux. Maybe I should update to 0.11.1 first?
@vrince, I just released version 0.11.2, which includes @gh4ck3r's change for better SSE support.
OK, good to know. I'm doing my best to put together something as self-contained as possible that we can run long enough to see something hang with 0.10.2, and hopefully it will be a good enough use case to show that 0.11.2 is more robust.
OK, I tried my best to make the simplest example possible (a trivial 100 Hz event dispatcher).
I am even more confused now. I was not able to replicate the "hang" with either 0.10.2 or 0.11.2.
Running my example with a plain HTTP server, I couldn't see any problem at all (response time stays really tight, ~250 µs, over 24 h+).
But with HTTPS (OpenSSL 1.1.1), on both 0.10.2 and 0.11.2, the server refuses to accept connections after an hour or two (sometimes sooner). So there is definitely something I'm doing wrong... When I say it refuses the connection: I tried simply opening the stream from Firefox or Chrome, or from the little Python client I wrote, and nothing... I don't see the "connected to stream ..." output on cout ;( and of course no data on the client side. The send_event while loop is still running, though... So I'm not sure what to do. Any advice?
To reproduce it easily, I load my host as much as I can while the servers are running.
Server code (http_sse_main.cpp):
```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <ctime>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <sstream>
#include <thread>

#define CPPHTTPLIB_OPENSSL_SUPPORT
#include <httplib.h>

using namespace httplib;
using namespace std;

class EventDispatcher {
public:
    EventDispatcher()
    {
    }

    // Blocks until the next send_event(), then writes that message to the sink.
    void wait_event(DataSink* sink)
    {
        unique_lock<mutex> lk(m_);
        int id = id_;
        cv_.wait(lk, [&] { return cid_ == id; });
        if (sink->is_writable()) {
            sink->write(message_.data(), message_.size());
        }
    }

    // Publishes a message and wakes every waiting listener.
    void send_event(const string& message)
    {
        lock_guard<mutex> lk(m_);
        cid_ = id_++;
        message_ = message;
        cv_.notify_all();
    }

private:
    mutex m_;
    condition_variable cv_;
    atomic_int id_{0};
    atomic_int cid_{-1};
    string message_;
};

// Tells the two builds apart (this assumes only the 0.11.x header defines CPPHTTPLIB_VERSION).
#ifndef CPPHTTPLIB_VERSION
#define CPPHTTPLIB_VERSION "0.10.2"
#define DEFAULT_PORT 8010
#else
#define DEFAULT_PORT 8011
#endif

// Prints a system_clock time point as local time.
template <typename Clock, typename Duration>
std::ostream& operator<<(std::ostream& stream,
                         const std::chrono::time_point<Clock, Duration>& time_point)
{
    const time_t time = Clock::to_time_t(time_point);
    struct tm tm;
    localtime_r(&time, &tm);
    return stream << std::put_time(&tm, "%c");
}

int main(int argc, char* argv[])
{
    int port = argc == 2 ? stoi(argv[1]) : DEFAULT_PORT;
    cout << "version(" << string(CPPHTTPLIB_VERSION) << ") url(https://localhost:" << port << "/stream)" << std::endl;

    EventDispatcher ed;
    SSLServer svr("cert.pem", "key.pem");
    // One CSV file of per-minute statistics per library version.
    ofstream file(string(CPPHTTPLIB_VERSION) + ".csv");

    svr.Get("/stream", [&](const Request& /*req*/, Response& res) {
        cout << std::chrono::system_clock::now() << " connected to stream ..." << endl;
        res.set_chunked_content_provider(
            "text/event-stream",
            [&](size_t /*offset*/, DataSink& sink) {
                ed.wait_event(&sink);
                return true;
            },
            [&](bool /*success*/) {
                cout << std::chrono::system_clock::now() << " release stream" << endl;
                return;
            });
    });

    // Publisher: one event every ~10 ms (~100 Hz); logs the slowest send_event() each minute.
    thread t([&] {
        int id = 0;
        auto last = chrono::steady_clock::now();
        int64_t max_duration = -1;
        double last_count = 0;
        while (true) {
            this_thread::sleep_for(chrono::milliseconds(10));
            std::stringstream ss;
            ss << "data: " << id << "\n\n";
            auto start = chrono::steady_clock::now();
            ed.send_event(ss.str());
            max_duration = std::max(max_duration, chrono::duration_cast<chrono::microseconds>(chrono::steady_clock::now() - start).count());
            if (chrono::duration_cast<chrono::seconds>(chrono::steady_clock::now() - last).count() >= 60) {
                cout << std::chrono::system_clock::now()
                     << " last(" << id << ")"
                     << " rate(" << last_count / 60. << ")[events/s]"
                     << " max duration(" << max_duration << ")[μs]" << endl;
                file << std::chrono::system_clock::now() << "," << id << "," << last_count << "," << max_duration << endl;
                last = chrono::steady_clock::now();
                max_duration = -1;
                last_count = 0;
            }
            last_count++;
            id++;
        }
    });

    svr.listen("localhost", port);
}
```
CMakeLists.txt to build against both 0.10.2 and 0.11.2:
```cmake
cmake_minimum_required(VERSION 3.16)
set(VERSION 0.0.0)
project(http_sse VERSION ${VERSION} LANGUAGES C CXX)

find_package(OpenSSL REQUIRED)
include(FetchContent)

# Fetch both cpp-httplib releases side by side (header-only, so populating is enough).
FetchContent_Declare(httplib_0_10_2 GIT_REPOSITORY https://github.com/yhirose/cpp-httplib.git GIT_TAG v0.10.2 GIT_SHALLOW TRUE GIT_PROGRESS TRUE)
FetchContent_Populate(httplib_0_10_2)
message(${httplib_0_10_2_SOURCE_DIR})

FetchContent_Declare(httplib_0_11_2 GIT_REPOSITORY https://github.com/yhirose/cpp-httplib.git GIT_TAG v0.11.2 GIT_SHALLOW TRUE GIT_PROGRESS TRUE)
FetchContent_Populate(httplib_0_11_2)
message(${httplib_0_11_2_SOURCE_DIR})

# Same source, one executable per library version.
add_executable(http_sse_0_10_2 http_sse_main.cpp)
target_link_libraries(http_sse_0_10_2 PRIVATE pthread OpenSSL::SSL)
target_include_directories(http_sse_0_10_2 PRIVATE ${httplib_0_10_2_SOURCE_DIR})

add_executable(http_sse_0_11_2 http_sse_main.cpp)
target_link_libraries(http_sse_0_11_2 PRIVATE pthread OpenSSL::SSL)
target_include_directories(http_sse_0_11_2 PRIVATE ${httplib_0_11_2_SOURCE_DIR})
```
Client code (with occasional random pauses, to see whether a stalling client has an effect):
```python
import time
import requests
import urllib3
import threading
import click
from random import random

count = 0
last = -1


def print_count():
    # Re-arm the timer so the counters are printed every 60 s.
    threading.Timer(60.0, print_count).start()
    print("events received", count, "last", last)


print_count()


@click.command(name="sse-client")
@click.option('--url', default='https://localhost:8010/stream')
@click.option('--timeout', default=10)
def cli(url, timeout):
    global count, last
    while True:
        try:
            print('Create stream')
            stream = requests.get(url=url, stream=True, timeout=timeout, verify=False)
            print('Streaming... Press CTRL+C to terminate.')
            for line in stream.iter_lines(decode_unicode=True):
                if line.startswith("data:"):
                    payload = line.split("data: ")[1]
                    last = int(payload)
                    count += 1
                # Rarely stall the client for up to 5 s.
                if random() > 0.9999:
                    pause = random() * 5
                    print('sleeping', pause, 'seconds')
                    time.sleep(pause)
        except KeyboardInterrupt:
            break
        except requests.exceptions.ConnectionError:
            print('Connection lost. Reconnecting...')
            time.sleep(1)
        except urllib3.exceptions.TimeoutError:
            print('Connection timeout. Reconnecting...')
            time.sleep(1)
        except urllib3.exceptions.InvalidChunkLength:
            print('InvalidChunkLength. Reconnecting...')
            time.sleep(1)
        except Exception as e:
            break


if __name__ == '__main__':
    cli()
```
@vrince, did you check the TCP state of the server sockets? Probably all of them are in the "CLOSE-WAIT" state when the "refused connection" occurs. I got the following after running your snippets:
```
$ ss -t4np | grep :8011
CLOSE-WAIT 1 0 127.0.0.1:8011 127.0.0.1:53066 users:(("http_sse_0_11_2",pid=176899,fd=7))
CLOSE-WAIT 518 0 127.0.0.1:8011 127.0.0.1:60962 users:(("http_sse_0_11_2",pid=176899,fd=17))
CLOSE-WAIT 1 0 127.0.0.1:8011 127.0.0.1:53058 users:(("http_sse_0_11_2",pid=176899,fd=6))
CLOSE-WAIT 518 0 127.0.0.1:8011 127.0.0.1:60946 users:(("http_sse_0_11_2",pid=176899,fd=16))
CLOSE-WAIT 1 0 127.0.0.1:8011 127.0.0.1:60938 users:(("http_sse_0_11_2",pid=176899,fd=15))
CLOSE-WAIT 1 0 127.0.0.1:8011 127.0.0.1:60926 users:(("http_sse_0_11_2",pid=176899,fd=13))
CLOSE-WAIT 1 0 127.0.0.1:8011 127.0.0.1:60918 users:(("http_sse_0_11_2",pid=176899,fd=12))
CLOSE-WAIT 1 0 127.0.0.1:8011 127.0.0.1:53076 users:(("http_sse_0_11_2",pid=176899,fd=9))
CLOSE-WAIT 1 0 127.0.0.1:8011 127.0.0.1:53044 users:(("http_sse_0_11_2",pid=176899,fd=5))
CLOSE-WAIT 1 0 127.0.0.1:8011 127.0.0.1:60904 users:(("http_sse_0_11_2",pid=176899,fd=11))
CLOSE-WAIT 1 0 127.0.0.1:8011 127.0.0.1:53070 users:(("http_sse_0_11_2",pid=176899,fd=8))
CLOSE-WAIT 1 0 127.0.0.1:8011 127.0.0.1:60900 users:(("http_sse_0_11_2",pid=176899,fd=10))
CLOSE-WAIT 1 0 127.0.0.1:8011 127.0.0.1:60934 users:(("http_sse_0_11_2",pid=176899,fd=14))
```
IMO, it seems possible that server threads keep waiting for the next event in wait_event() even after the client has disconnected. FWIW, my own SSE server implementation has a similar skeleton to yours, but I used cv_.wait_for() instead of cv_.wait() so that I can periodically check whether the connection is still available via sink->is_writable(), which was patched in my last PR.
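A minimal sketch of the cv_.wait_for() approach described above, assuming a sink type that exposes is_writable() the way httplib::DataSink does. The helper name wait_or_disconnected and the poll interval are hypothetical, not part of cpp-httplib, and this is not @gh4ck3r's actual code. One deliberate choice: the mutex is released while is_writable() runs, since that call may query the socket and send_event() should not have to wait on it.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical helper, not part of cpp-httplib. Waits until ready() is true,
// waking every `poll` to ask the sink whether the client is still connected.
// Returns true when ready() holds, false if the sink stopped being writable.
// `lk` must be locked on entry and is locked again on return.
template <typename Sink, typename Pred>
bool wait_or_disconnected(std::condition_variable &cv,
                          std::unique_lock<std::mutex> &lk, Pred ready,
                          Sink &sink, std::chrono::milliseconds poll) {
  // wait_for() re-checks ready() before waiting, so an event that arrives
  // while the lock is released below is not missed.
  while (!cv.wait_for(lk, poll, ready)) {
    // Timed out with no event: ask the sink without holding the mutex.
    lk.unlock();
    const bool alive = sink.is_writable();
    lk.lock();
    if (!alive) return false;  // client gone: let this worker thread finish
  }
  return true;
}
```

Inside wait_event(), assuming it is changed to return bool, the call could look like `if (!wait_or_disconnected(cv_, lk, [&] { return cid_ == id; }, *sink, std::chrono::milliseconds(1000))) return false;`, and the content provider would pass that result on so the stream ends instead of parking a thread-pool worker on a dead connection.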
Hi @gh4ck3r, no, I did not check. Does it make sense for me to do so? I'm not sure I understand: is this the intended behavior, and my snippet is wrong? (Sorry, I'm really just a "user" of these things.)
Regarding your "last" PR, can you add a reference? Is it the one already merged in master, or a new one I can test?
Thank you for taking the time to help!
@gh4ck3r, did you also notice that I get different behavior with HTTP vs HTTPS? With HTTP I was absolutely unable to "break" anything, even after a couple of million events over many days at 100 Hz. HTTPS, on the other hand, produces the "refused connection" issue pretty easily (even within the first hour of running if I load my host).
@vrince, you can check my PR #1373, which is already merged. So your test already includes it.
Regarding HTTP/HTTPS, yes, I'm aware of it. I guess TLS could make the difference in connection behavior, since it also runs on top of TCP, before HTTP. I'm not a TLS expert, so I'm just guessing. But I can tell you that server sockets in the CLOSE-WAIT state should be closed by their owning thread. Otherwise, the closed sockets and their owning threads will consume the entire thread pool, so the server can't accept any more clients.
So I think it's worth checking the state of the server-side sockets when the "refused connection" happens.
Hi guys, although my question is not directly tied to the one in this thread, it's about the same SSE example, so I hope posting it here is appropriate.
Following the provided examples (keeps all the ne…), I have successfully connected to the server's event stream from a client. But when I call the send_event method on the server, it reports no error, yet the client doesn't receive any update.
```cpp
void send_event(const string &message)
{
    *env << message.data() << "\n";
    lock_guard<mutex> lk(m_);
    cid_ = id_++;
    message_ = message;
    cv_.notify_all();
}
```
This is the simple client part:
```javascript
const ev1 = new EventSource("http://192.168.120.129:1234/stream_end_event");
ev1.onmessage = function (e) {
  console.log('ev1', e.data);
};
ev1.onopen = function (e) {
  console.log('Event Src Open');
};
ev1.onerror = function (e) {
  console.log('Event src err');
};
```
Any help would be much appreciated.
Hi! Can you show how you build your message? I experienced the same frustration at some point. The string you send as the message needs to be properly formatted: each line needs to start with data: and, more importantly, the message must end with two \n. Especially if you try to view your event stream in a web browser, the message format matters a lot.
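A small helper that frames a payload the way the reply above describes, as a sketch: format_sse is a hypothetical name, not part of cpp-httplib. It also splits a multi-line payload into several data: lines, because in the SSE format every line of an event's data needs its own data: field; the final blank line is what makes the browser dispatch the event.

```cpp
#include <sstream>
#include <string>

// Hypothetical helper: wraps a payload in the SSE wire format.
// Each payload line gets its own "data: " field; a blank line ends the event.
inline std::string format_sse(const std::string &payload) {
  std::istringstream in(payload);
  std::string line;
  std::string out;
  while (std::getline(in, line)) {
    out += "data: " + line + "\n";
  }
  out += "\n";  // empty line: EventSource dispatches the event here
  return out;
}
```

With the example's dispatcher, a call could look like `ed.send_event(format_sse(std::to_string(id)));`.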
@vrince Thank you, sir. That was indeed the issue: I just added the prefix ('data: ') and the suffix ('\n\n') to the messages, and they started working as they should.
Best Regards.
@vrince, can I close this issue? If not, what is the current status?
Sure!