cpp-httplib

SSE (server-sent events) example can hang the send_event method

Open vrince opened this issue 2 years ago • 13 comments

Hi guys,

I have been using SSE for a while now, exactly as described in the example/ssesvr.cc file.

I noticed that my main thread sporadically hangs/stalls. After a long and painful investigation, it looks like the send_event() method can hang for some reason.

My use case: I always have at least one listener waiting for an event, and I send about one event per second. Over a 24 h period I can easily see a couple of successive ~5 s hangs here and there (meaning the send_event() method takes roughly 5 s to return).

I'm not familiar with condition variables etc., but do you know what could be done to avoid this kind of blocking? I'm not even sure what the root cause could be. Can sink->write() take a very long time on a network failure and block everything? Any help is appreciated!

Here is a reminder of the SSE implementation:

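#include <atomic>
#include <condition_variable>
#include <mutex>
#include <string>

#include <httplib.h>

using namespace httplib;
using namespace std;

class EventDispatcher {
public:
  EventDispatcher() {
  }

  // Called from each client's content provider: blocks until the next event.
  void wait_event(DataSink *sink) {
    unique_lock<mutex> lk(m_);
    int id = id_;
    cv_.wait(lk, [&] { return cid_ == id; });
    // m_ is still held here, so while this write is in progress,
    // send_event() in another thread has to wait for the lock.
    if (sink->is_writable()) { sink->write(message_.data(), message_.size()); }
  }

  // Called by the producer: publishes a message and wakes every waiter.
  void send_event(const string &message) {
    lock_guard<mutex> lk(m_);
    cid_ = id_++;
    message_ = message;
    cv_.notify_all();
  }

private:
  mutex m_;
  condition_variable cv_;
  atomic_int id_{0};
  atomic_int cid_{-1};
  string message_;
};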

vrince, Sep 07 '22

@gh4ck3r, do you have any comments on this issue?

yhirose, Sep 07 '22

The case described is different from mine; I never experienced such a stall or hang when sending messages. @vrince, you may need to provide more information, because I couldn't find a problem in the code above alone. Having looked at and tested it several times, I know the implementation is not perfect, but it's good enough as a starting point.

The only place send_event() could block is its first line, lock_guard<mutex> lk(m_). For that to happen, wait_event() must own (lock) the mutex m_ in another thread. If sink->write(...) takes a while, wait_event() holds the lock that much longer, because the mutex is only released when the function returns.
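A minimal sketch of one way to keep a slow write from blocking send_event() (standard library only, not the upstream example): copy the message while holding m_, release the lock, and only then write. `Sink` is a hypothetical stand-in for httplib::DataSink with just the `write` and `is_writable` members the snippet uses. The predicate is also changed from `cid_ == id` to `cid_ >= id`: with `==`, a waiter that does not reacquire m_ before a second send_event() sees cid_ move past its id and never wakes again; with `>=` it wakes and sends the newest message instead.

```cpp
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <string>

// Hypothetical stand-in for httplib::DataSink (only the members used here).
struct Sink {
  std::function<bool(const char *, std::size_t)> write;
  std::function<bool()> is_writable;
};

class EventDispatcher {
public:
  // Blocks until the next event, then writes it with m_ released, so a slow
  // socket write cannot make send_event() wait on the mutex.
  bool wait_event(Sink *sink) {
    std::string msg;
    {
      std::unique_lock<std::mutex> lk(m_);
      const int id = id_;
      // >= rather than ==: still wakes if several events were sent meanwhile.
      cv_.wait(lk, [&] { return cid_ >= id; });
      msg = message_;  // copy under the lock
    }                  // m_ is released here, before any I/O
    if (!sink->is_writable()) return false;
    return sink->write(msg.data(), msg.size());
  }

  void send_event(const std::string &message) {
    std::lock_guard<std::mutex> lk(m_);
    cid_ = id_++;
    message_ = message;
    cv_.notify_all();
  }

private:
  std::mutex m_;
  std::condition_variable cv_;
  int id_ = 0;    // guarded by m_
  int cid_ = -1;  // guarded by m_
  std::string message_;
};
```

With the real DataSink the body should carry over unchanged; returning its result from the content provider (instead of always `true`) lets a failed write end the stream.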

So, IMO, it could be related to the message and the underlying socket. For example, if multiple messages fill up the socket's send buffer, I guess it could happen.
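To illustrate the send-buffer theory, here is a small POSIX sketch (Linux assumed, not cpp-httplib code) using a local socketpair whose peer never reads. Once the kernel buffers are full, a non-blocking write() fails with EAGAIN; on a blocking socket the same write would simply stall, which is what sink->write would do while wait_event() still holds m_. If I read httplib.h correctly, cpp-httplib bounds such waits with its write timeout (CPPHTTPLIB_WRITE_TIMEOUT_SECOND, 5 seconds by default, adjustable with Server::set_write_timeout()), which is at least consistent with the ~5 s stalls reported in the original post.

```cpp
#include <cerrno>
#include <cstddef>
#include <fcntl.h>
#include <sys/socket.h>
#include <unistd.h>

// Writes into one end of a socketpair whose peer never reads and returns the
// number of bytes the kernel accepted before write() would have blocked.
// Returns -1 if the socketpair cannot be created or an unexpected error occurs.
long bytes_until_would_block() {
  int fds[2];
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) return -1;
  // Non-blocking, so we observe EAGAIN instead of hanging like a blocking write.
  fcntl(fds[0], F_SETFL, fcntl(fds[0], F_GETFL) | O_NONBLOCK);

  const char chunk[1024] = {};
  long total = 0;
  for (;;) {
    ssize_t n = write(fds[0], chunk, sizeof chunk);
    if (n > 0) {
      total += n;
      continue;
    }
    if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) break;  // buffer full
    total = -1;  // unexpected error
    break;
  }
  close(fds[0]);
  close(fds[1]);
  return total;
}
```

The exact number of bytes depends on kernel buffer settings; the point is only that it is finite, so a client that stops reading eventually makes the server's writes wait.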

gh4ck3r, Sep 07 '22

@vrince, could you turn your case into a unit test and share it? I can run it for more than 24 hours in my environment next week to check reproducibility.

gh4ck3r, Sep 07 '22

@gh4ck3r, thanks for the comment. @vrince, could you respond to @gh4ck3r's comment? Thanks!

yhirose, Sep 09 '22

Thanks for the interest you guys are putting into this issue! To be honest, narrowing it down to the SSE part of our project already took a really long time. I'll try to make a simple example (no other dependencies) that reproduces the behaviour.

vrince, Sep 11 '22

Btw I'm using 0.10.2 on Linux. Maybe I should update to 0.11.1 first?

vrince, Sep 12 '22

@vrince, I just released version 0.11.2, which includes @gh4ck3r's change for better SSE support.

yhirose, Sep 12 '22

OK, good to know. I'm doing my best to build something as self-contained as possible that we can run long enough to see something hang with 0.10.2, and hopefully it will be a good enough test case to show that 0.11.2 is more robust.

vrince, Sep 12 '22

OK, I tried my best to write the simplest possible example (a trivial event dispatcher at 100 Hz).

I am even more confused now. I was not able to replicate the "hang" with either 0.10.2 or 0.11.2.

Running my example with a plain HTTP server, I could not see any problem (response time stays really tight, ~250 µs, over 24 h+).

But using HTTPS (OpenSSL 1.1.1), with both 0.10.2 and 0.11.2, the server refuses to accept connections after an hour or two (sometimes sooner). So there is definitely something I am doing wrong... By "refuses the connection" I mean that when I open the stream from Firefox, Chrome, or the little Python client I wrote, nothing happens: I don't see the "connected to stream ..." output ;( and of course no data arrives on the client side. The send_event while loop is still running, though. So I'm not sure what to do. Any advice?

To reproduce it more easily, I load my host as much as I can while the servers are running.

Server code (http_sse_main.cpp):

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <ctime>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <sstream>
#include <thread>

#define CPPHTTPLIB_OPENSSL_SUPPORT

#include <httplib.h>

using namespace httplib;
using namespace std;

class EventDispatcher {
public:
    EventDispatcher()
    {
    }

    void wait_event(DataSink* sink)
    {
        unique_lock<mutex> lk(m_);
        int id = id_;
        cv_.wait(lk, [&] { return cid_ == id; });
        if (sink->is_writable()) {
            sink->write(message_.data(), message_.size());
        }
    }

    void send_event(const string& message)
    {
        lock_guard<mutex> lk(m_);
        cid_ = id_++;
        message_ = message;
        cv_.notify_all();
    }

private:
    mutex m_;
    condition_variable cv_;
    atomic_int id_{0};
    atomic_int cid_{-1};
    string message_;
};

#ifndef CPPHTTPLIB_VERSION
#define CPPHTTPLIB_VERSION "0.10.2"
#define DEFAULT_PORT 8010
#else
#define DEFAULT_PORT 8011
#endif

template <typename Clock, typename Duration>
std::ostream& operator<<(std::ostream& stream,
                         const std::chrono::time_point<Clock, Duration>& time_point)
{
    const time_t time = Clock::to_time_t(time_point);
    struct tm tm;
    localtime_r(&time, &tm);
    return stream << std::put_time(&tm, "%c");
}

int main(int argc, char* argv[])
{
    int port = argc == 2 ? stoi(argv[1]) : DEFAULT_PORT;

    cout << "version(" << string(CPPHTTPLIB_VERSION) << ") url(https://localhost:" << port << "/stream)" << std::endl;

    EventDispatcher ed;
    SSLServer svr("cert.pem", "key.pem");

    ofstream file(string(CPPHTTPLIB_VERSION) + ".csv");

    svr.Get("/stream", [&](const Request& /*req*/, Response& res) {
        cout << std::chrono::system_clock::now() << " connected to stream ..." << endl;
        res.set_chunked_content_provider(
            "text/event-stream",
            [&](size_t /*offset*/, DataSink& sink) {
                ed.wait_event(&sink);
                return true;
            },
            [&](bool /*success*/) {
                cout << std::chrono::system_clock::now() << " release stream" << endl;
                return;
            });
    });

    thread t([&] {
        int id = 0;
        auto last = chrono::steady_clock::now();
        int64_t max_duration = -1;
        double last_count = 0;
        while (true) {
            this_thread::sleep_for(chrono::milliseconds(10));
            std::stringstream ss;
            ss << "data: " << id << "\n\n";
            auto start = chrono::steady_clock::now();
            ed.send_event(ss.str());
            max_duration = std::max(max_duration, chrono::duration_cast<chrono::microseconds>(chrono::steady_clock::now() - start).count());
            if (chrono::duration_cast<chrono::seconds>(chrono::steady_clock::now() - last).count() >= 60) {
                cout << std::chrono::system_clock::now()
                     << " last(" << id << ")"
                     << " rate(" << last_count / 60. << ")[events/s]"
                     << " max duration(" << max_duration << ")[μs]" << endl;
                file << std::chrono::system_clock::now() << "," << id << "," << last_count << "," << max_duration << endl;
                last = chrono::steady_clock::now();
                max_duration = -1;
                last_count = 0;
            }
            last_count++;
            id++;
        }
    });

    svr.listen("localhost", port);
}

CMakeLists.txt to build against both 0.10.2 and 0.11.2:

cmake_minimum_required(VERSION 3.16)

set(VERSION 0.0.0)
project(http_sse VERSION ${VERSION} LANGUAGES C CXX)

find_package(OpenSSL REQUIRED)

include(FetchContent)

FetchContent_Declare(httplib_0_10_2 GIT_REPOSITORY https://github.com/yhirose/cpp-httplib.git GIT_TAG v0.10.2 GIT_SHALLOW TRUE GIT_PROGRESS TRUE)
FetchContent_Populate(httplib_0_10_2)

message(${httplib_0_10_2_SOURCE_DIR})

FetchContent_Declare(httplib_0_11_2 GIT_REPOSITORY https://github.com/yhirose/cpp-httplib.git GIT_TAG v0.11.2 GIT_SHALLOW TRUE GIT_PROGRESS TRUE)
FetchContent_Populate(httplib_0_11_2)

message(${httplib_0_11_2_SOURCE_DIR})

add_executable(http_sse_0_10_2 http_sse_main.cpp)
target_link_libraries(http_sse_0_10_2 PRIVATE pthread OpenSSL::SSL)
target_include_directories(http_sse_0_10_2 PRIVATE ${httplib_0_10_2_SOURCE_DIR})

add_executable(http_sse_0_11_2 http_sse_main.cpp)
target_link_libraries(http_sse_0_11_2 PRIVATE pthread OpenSSL::SSL)
target_include_directories(http_sse_0_11_2 PRIVATE ${httplib_0_11_2_SOURCE_DIR})

Client code (with occasional random pauses, to see whether a client-side stall has an effect):

import time
import requests
import urllib3
import threading
import click
from random import random

count = 0
last = -1

def print_count():
  threading.Timer(60.0, print_count).start()
  print("events received", count, "last", last)

print_count()

@click.command(name="sse-client")
@click.option('--url', default='https://localhost:8010/stream')
@click.option('--timeout', default=10)
def cli(url, timeout):
    global count, last
    while True:
        try:
            print('Create stream')
            stream = requests.get(url=url, stream=True, timeout=timeout, verify=False)
            print('Streaming... Press CTRL+C to terminate.')
            for line in stream.iter_lines(decode_unicode=True):
                if line.startswith("data:"):
                    payload = line.split("data: ")[1]
                    last = int(payload)
                    count += 1
                if random() > 0.9999:
                    pause = random()*5
                    print('sleeping', pause, 'seconds')
                    time.sleep(pause)

        except KeyboardInterrupt:
            break

        except requests.exceptions.ConnectionError:
            print('Connection lost. Reconnecting...')
            time.sleep(1)

        except urllib3.exceptions.TimeoutError:
            print('Connection timeout. Reconnecting...')
            time.sleep(1)

        except urllib3.exceptions.InvalidChunkLength:
            print('InvalidChunkLength. Reconnecting...')
            time.sleep(1)

        except Exception as e:
            break


if __name__ == '__main__':
    cli()

vrince, Sep 15 '22

@vrince, did you check the TCP state of the server sockets? Probably all of them are in the "CLOSE-WAIT" state when the "refuse connection" occurs. I got the following after running your snippets:

$ ss -t4np | grep :8011
CLOSE-WAIT   1        0              127.0.0.1:8011            127.0.0.1:53066   users:(("http_sse_0_11_2",pid=176899,fd=7))         
CLOSE-WAIT   518      0              127.0.0.1:8011            127.0.0.1:60962   users:(("http_sse_0_11_2",pid=176899,fd=17))        
CLOSE-WAIT   1        0              127.0.0.1:8011            127.0.0.1:53058   users:(("http_sse_0_11_2",pid=176899,fd=6))         
CLOSE-WAIT   518      0              127.0.0.1:8011            127.0.0.1:60946   users:(("http_sse_0_11_2",pid=176899,fd=16))        
CLOSE-WAIT   1        0              127.0.0.1:8011            127.0.0.1:60938   users:(("http_sse_0_11_2",pid=176899,fd=15))        
CLOSE-WAIT   1        0              127.0.0.1:8011            127.0.0.1:60926   users:(("http_sse_0_11_2",pid=176899,fd=13))        
CLOSE-WAIT   1        0              127.0.0.1:8011            127.0.0.1:60918   users:(("http_sse_0_11_2",pid=176899,fd=12))        
CLOSE-WAIT   1        0              127.0.0.1:8011            127.0.0.1:53076   users:(("http_sse_0_11_2",pid=176899,fd=9))         
CLOSE-WAIT   1        0              127.0.0.1:8011            127.0.0.1:53044   users:(("http_sse_0_11_2",pid=176899,fd=5))         
CLOSE-WAIT   1        0              127.0.0.1:8011            127.0.0.1:60904   users:(("http_sse_0_11_2",pid=176899,fd=11))        
CLOSE-WAIT   1        0              127.0.0.1:8011            127.0.0.1:53070   users:(("http_sse_0_11_2",pid=176899,fd=8))         
CLOSE-WAIT   1        0              127.0.0.1:8011            127.0.0.1:60900   users:(("http_sse_0_11_2",pid=176899,fd=10))        
CLOSE-WAIT   1        0              127.0.0.1:8011            127.0.0.1:60934   users:(("http_sse_0_11_2",pid=176899,fd=14)) 
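If ss isn't handy, the same check can be done from code on Linux. This is a minimal sketch with a hypothetical function name, assuming the /proc/net/tcp layout (local address as hex IP:port, state in the fourth column, where 08 means CLOSE_WAIT). It only covers IPv4; IPv6 sockets are listed in /proc/net/tcp6.

```cpp
#include <cstddef>
#include <istream>
#include <sstream>
#include <string>

// Counts sockets in CLOSE_WAIT (state "08") whose local port is `port`,
// reading text in the /proc/net/tcp format (one header line, then entries).
int count_close_wait(std::istream &in, unsigned long port) {
  std::string line;
  std::getline(in, line);  // skip the header line
  int count = 0;
  while (std::getline(in, line)) {
    std::istringstream fields(line);
    std::string slot, local, remote, state;
    if (!(fields >> slot >> local >> remote >> state)) continue;
    const std::size_t colon = local.find(':');
    if (colon == std::string::npos) continue;
    // The port after the colon is hexadecimal, e.g. 1F4B == 8011.
    const unsigned long local_port = std::stoul(local.substr(colon + 1), nullptr, 16);
    if (local_port == port && state == "08") ++count;
  }
  return count;
}
```

Usage: open `std::ifstream f("/proc/net/tcp");` and call `count_close_wait(f, 8011)` periodically; a count that keeps growing matches the pattern in the ss output above.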

IMO, it seems possible that server threads keep waiting for the next "event" in wait_event() even after the client has disconnected. FWIW, my own SSE server implementation has a skeleton similar to yours, but I used cv_.wait_for() instead of cv_.wait() so that I can periodically check whether the connection is still available via sink->is_writable(), which was patched in my last PR.
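A minimal sketch of the wait_for() approach described above (not gh4ck3r's actual code, which isn't shown in the thread). The waiter wakes up every `poll` interval, and if the sink is no longer writable it returns false, so the content provider can return false and the server can release the connection. `Sink` is a hypothetical stand-in for httplib::DataSink. The lock is dropped around is_writable() because, in the real library, that check can itself wait on the socket.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <string>

// Hypothetical stand-in for httplib::DataSink (only the members used here).
struct Sink {
  std::function<bool(const char *, std::size_t)> write;
  std::function<bool()> is_writable;
};

class EventDispatcher {
public:
  // Returns false if the client went away while waiting, or if the write fails.
  bool wait_event(Sink *sink, std::chrono::milliseconds poll) {
    std::unique_lock<std::mutex> lk(m_);
    const int id = id_;
    // wait_for returns the predicate's value: false means the interval expired.
    while (!cv_.wait_for(lk, poll, [&] { return cid_ >= id; })) {
      lk.unlock();  // the writability check may block; don't hold m_
      const bool alive = sink->is_writable();
      lk.lock();
      if (!alive) return false;  // disconnected: stop waiting
    }
    std::string msg = message_;
    lk.unlock();  // do not hold m_ during socket I/O
    return sink->write(msg.data(), msg.size());
  }

  void send_event(const std::string &message) {
    std::lock_guard<std::mutex> lk(m_);
    cid_ = id_++;
    message_ = message;
    cv_.notify_all();
  }

private:
  std::mutex m_;
  std::condition_variable cv_;
  int id_ = 0;    // guarded by m_
  int cid_ = -1;  // guarded by m_
  std::string message_;
};
```

Using `>=` in the predicate matters here: an event sent while the lock is dropped for the writability check is still picked up on the next loop iteration. The poll interval is an arbitrary trade-off between how quickly a dead client is noticed and how often the waiter wakes up.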

gh4ck3r, Sep 22 '22

Hi @gh4ck3r, no, I did not check. Would it make sense for me to do so? I'm not sure I understand, then: is this intended behavior, and is my snippet wrong? (Sorry, I'm really just a "user" of these things.)

Regarding your "last" PR, can you add a reference? Is it the one already merged in master, or a new one I can test?

Thank you for taking the time to help!

vrince, Sep 30 '22

@gh4ck3r, also, did you notice that I get different behavior with HTTP vs HTTPS? With HTTP I was absolutely unable to "break" anything, even after a couple of million events over many days at 100 Hz. HTTPS, on the other hand, produces the "refuse connection" issue pretty easily (even within the first hour of running if I load my host).

vrince, Sep 30 '22

@vrince, you can check my PR #1373, which is already merged, so your test already includes it.

Regarding HTTP vs HTTPS, yes, I'm aware of it. I guess TLS could make the connection behave differently, since it also runs on top of TCP, below HTTP. I'm not a TLS expert, so this is just a guess. But I can tell you that server sockets in the CLOSE-WAIT state must be closed by their owning thread. Otherwise, those half-closed sockets and their owning threads will consume the entire thread pool, and the server can't accept any more clients.
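To make the thread-pool point concrete, here is a toy model in plain C++ (not cpp-httplib's own ThreadPool): a fixed pool of workers where every "connection" handler blocks indefinitely, the way a wait_event() without a writability check can. Once all workers are stuck, a newly queued handler never starts, which from the client's side looks like a hanging or refused connection. The pool size of 2 is arbitrary; as far as I know, cpp-httplib sizes its pool via CPPHTTPLIB_THREAD_POOL_COUNT.

```cpp
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Toy fixed-size pool: N workers pull tasks from a shared FIFO queue.
class ToyPool {
public:
  explicit ToyPool(std::size_t n) {
    for (std::size_t i = 0; i < n; ++i)
      workers_.emplace_back([this] {
        for (;;) {
          std::function<void()> task;
          {
            std::unique_lock<std::mutex> lk(m_);
            cv_.wait(lk, [this] { return stop_ || !tasks_.empty(); });
            if (tasks_.empty()) return;  // stop_ is set and no work is left
            task = std::move(tasks_.front());
            tasks_.pop();
          }
          task();  // a task that never returns occupies this worker for good
        }
      });
  }

  // Drains the remaining queue, then joins every worker.
  ~ToyPool() {
    {
      std::lock_guard<std::mutex> lk(m_);
      stop_ = true;
    }
    cv_.notify_all();
    for (auto &w : workers_) w.join();
  }

  void enqueue(std::function<void()> f) {
    {
      std::lock_guard<std::mutex> lk(m_);
      tasks_.push(std::move(f));
    }
    cv_.notify_one();
  }

private:
  std::vector<std::thread> workers_;
  std::queue<std::function<void()>> tasks_;
  std::mutex m_;
  std::condition_variable cv_;
  bool stop_ = false;
};
```

Enqueue two handlers that wait forever and then a third "new client" handler: the third one does not run until one of the first two returns, no matter how long you wait.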

So I think it's worth checking the state of the server-side sockets when the "refuse connection" happens.

gh4ck3r, Oct 04 '22

Hi guys, although my question is not directly tied to the one in this thread, it concerns the same SSE example, so I hope posting it here is appropriate.

Following the provided examples (keeps all the ne…), I have successfully connected to the server's event stream from a client. But when I call the send_event method on the server, no error is shown, yet the client does not receive any update.

void send_event(const string &message)
{
  *env << message.data() << "\n";
  lock_guard<mutex> lk(m_);
  cid_ = id_++;
  message_ = message;
  cv_.notify_all();
}

This is the simple client part:

const ev1 = new EventSource("http://192.168.120.129:1234/stream_end_event");
ev1.onmessage = function (e) {
    console.log('ev1', e.data);
}
ev1.onopen = function (e) {
    console.log('Event Src Open');
}
// onerror, not a second onmessage (which would replace the handler above)
ev1.onerror = function (e) {
    console.log('Event src err');
}

Any help would be much appreciated.

hnguyen48206, Feb 27 '23

Hi! Can you show how you build your message? I experienced the same frustration at some point. The string you send as a message needs to be properly formatted: each line needs to start with data: and, more importantly, the message must end with two \n. Especially if you try to view your event stream in a web browser, the message format matters a lot.
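A small helper along those lines, as a sketch (the function name is hypothetical): in the SSE format each line of the payload goes on its own `data: ` line, and an empty line (hence the trailing `\n\n`) ends the event. Splitting on `\n` keeps a multi-line payload as a single event; the browser's EventSource joins the data lines back together with newlines.

```cpp
#include <cstddef>
#include <string>

// Formats `payload` as one Server-Sent Events message: one "data: " line per
// payload line, terminated by an empty line that dispatches the event.
std::string make_sse_event(const std::string &payload) {
  std::string out;
  std::size_t start = 0;
  for (;;) {
    const std::size_t nl = payload.find('\n', start);
    out += "data: ";
    out += payload.substr(start, nl == std::string::npos ? std::string::npos : nl - start);
    out += '\n';
    if (nl == std::string::npos) break;
    start = nl + 1;
  }
  out += '\n';  // empty line: end of event
  return out;
}
```

For example, `make_sse_event("a\nb")` returns `"data: a\ndata: b\n\n"`, which can be passed straight to `ed.send_event(...)`.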

vrince, Feb 27 '23

@vrince Thank you, sir. That was indeed the issue: I just added the prefix ('data: ') and postfix ('\n\n') to the messages, and they started working as they should.

Best Regards.

hnguyen48206, Feb 28 '23

@vrince can I close this issue? If not, what is the current status?

yhirose, Mar 11 '23

Sure!

vrince, Mar 11 '23