libzmq

File descriptor leak with ROUTER socket and HANDOVER (possible race condition in pipes)

Status: Open. mrvn opened this issue 2 years ago (0 comments).

Issue description

When a ROUTER socket has the ZMQ_ROUTER_HANDOVER socket option set, a client that connects with the same routing ID as a previous client replaces that client.
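Per the zmq_setsockopt documentation, ZMQ_ROUTER_HANDOVER only changes what happens on a duplicate routing ID: without it the ROUTER rejects the newcomer; with it the ROUTER hands the ID over to the new client and is supposed to disconnect the existing one. The toy model below spells out the two outcomes. It is a sketch with hypothetical names (integer handles stand in for pipes), not libzmq's router_t. The leak reported in this issue corresponds to the connection recorded in to_close never actually being closed.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical toy model of a ROUTER's routing table; not libzmq's router_t.
// Integer handles stand in for the per-connection pipes.
struct RoutingTableModel {
    bool handover = false;               // mirrors ZMQ_ROUTER_HANDOVER
    std::map<std::string, int> by_id;    // routing id -> connection handle
    std::vector<int> to_close;           // replaced connections that must be closed

    // Called when a peer finishes its handshake and announces `id`.
    // Returns false if the new connection is rejected.
    bool identify_peer(const std::string &id, int conn) {
        auto it = by_id.find(id);
        if (it == by_id.end()) {
            by_id.emplace(id, conn);     // first peer with this id
            return true;
        }
        if (!handover)
            return false;                // default: reject the duplicate newcomer
        to_close.push_back(it->second);  // handover: the old connection should be closed...
        it->second = conn;               // ...and the newcomer takes over the id
        return true;
    }
};
```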

The problem is that the connection to the old client is not closed until the ROUTER socket itself is closed. Even when the client closes its side of the connection, this is ignored. A server with a ROUTER socket accumulates such sockets until it reaches the per-process file descriptor limit (ulimit -n, 1024 by default on Linux).
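One way to watch the leak on a running server is to compare the process's open descriptor count with its limit. A Linux-only sketch with two hypothetical helpers (not part of libzmq): count the entries in /proc/self/fd and read the soft RLIMIT_NOFILE limit via getrlimit.

```cpp
#include <cassert>
#include <dirent.h>
#include <sys/resource.h>

// Hypothetical helper (Linux only): number of open file descriptors in this
// process, counted from /proc/self/fd. The DIR stream used for the listing
// holds one descriptor itself, so compare counts rather than trusting the
// absolute value. Returns -1 if /proc is unavailable.
long count_open_fds() {
    DIR *dir = opendir("/proc/self/fd");
    if (dir == nullptr)
        return -1;
    long count = 0;
    dirent *entry;
    while ((entry = readdir(dir)) != nullptr) {
        if (entry->d_name[0] != '.')     // skip "." and ".."
            ++count;
    }
    closedir(dir);
    return count;
}

// Hypothetical helper: the soft RLIMIT_NOFILE limit (what `ulimit -n` shows),
// or -1 if it is unlimited or cannot be read.
long fd_soft_limit() {
    rlimit rl;
    if (getrlimit(RLIMIT_NOFILE, &rl) != 0 || rl.rlim_cur == RLIM_INFINITY)
        return -1;
    return static_cast<long>(rl.rlim_cur);
}
```

Logging count_open_fds() each time a client reconnects with an already-used routing ID should show whether the count keeps climbing toward fd_soft_limit().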

This is a revival of #3238 and #3450.

I believe I have found a data race in the pipe communication; see the traces at the end.

Environment

  • libzmq version: 4.2 to 4.3.4
  • OS: Linux (Debian stable; Ubuntu Focal and Jammy)

Minimal test code / Steps to reproduce the issue

  1. tests/test_router_handover.cpp exhibits this problem. Improved test case:
void test_with_handover ()
{
    const int timeout = SETTLE_TIME;
    char my_endpoint[MAX_SOCKET_STRING];
    void *router = test_context_socket (ZMQ_ROUTER);
    bind_loopback_ipv4 (router, my_endpoint, sizeof my_endpoint);

    // Enable the handover flag
    int handover = 1;
    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (router, ZMQ_ROUTER_HANDOVER,
                                               &handover, sizeof (handover)));

    //  Create dealer called "X" and connect it to our router, add monitoring
    void *dealer_one = test_context_socket (ZMQ_DEALER);
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_socket_monitor (dealer_one, "inproc://monitor-dealer_one", ZMQ_EVENT_ALL));
    void *mon_one = test_context_socket (ZMQ_PAIR);
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_connect (mon_one, "inproc://monitor-dealer_one"));
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (mon_one, ZMQ_RCVTIMEO, &timeout, sizeof timeout));
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (dealer_one, ZMQ_ROUTING_ID, "X", 1));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dealer_one, my_endpoint));

    //  Get message from dealer to know when connection is ready
    char buffer[255];
    send_string_expect_success (dealer_one, "Hello", 0);

    recv_string_expect_success (router, "X", 0);
    recv_string_expect_success (router, "Hello", 0);

    // Now create a second dealer that uses the same routing id
    void *dealer_two = test_context_socket (ZMQ_DEALER);
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (dealer_two, ZMQ_ROUTING_ID, "X", 1));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dealer_two, my_endpoint));

    //  Get message from dealer to know when connection is ready
    send_string_expect_success (dealer_two, "Hello", 0);

    recv_string_expect_success (router, "X", 0);
    recv_string_expect_success (router, "Hello", 0);

    //  Send a message to 'X' routing id. This should be delivered
    //  to the second dealer, instead of the first because of the handover.
    send_string_expect_success (router, "X", ZMQ_SNDMORE);
    send_string_expect_success (router, "Hello", 0);

    //  Ensure that the first dealer doesn't receive the message
    //  but the second one does
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (dealer_one, ZMQ_RCVTIMEO, &timeout, sizeof timeout));
    TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (dealer_one, buffer, 255, 0));

    recv_string_expect_success (dealer_two, "Hello", 0);

    //  Now collect and check events from dealer_one socket
    fprintf(stderr, "let everything settle\n");
    msleep(10000); // let everything settle
    fprintf(stderr, "waited long enough I hope\n");
    int event = get_monitor_event_with_timeout (mon_one, NULL, NULL, SETTLE_TIME);
    if (event == ZMQ_EVENT_CONNECT_DELAYED)
        event = get_monitor_event_with_timeout (mon_one, NULL, NULL, SETTLE_TIME);
    TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_CONNECTED, event);
    expect_monitor_event_with_timeout (mon_one, ZMQ_EVENT_HANDSHAKE_SUCCEEDED, SETTLE_TIME);
    fprintf(stderr, "awaiting disconnected event\n");
    expect_monitor_event_with_timeout (mon_one, ZMQ_EVENT_DISCONNECTED, SETTLE_TIME);
    //int extra = print_events(mon_one, SETTLE_TIME, 100);
    //TEST_ASSERT_EQUAL_INT (0, extra);

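    test_context_socket_close_zero_linger (router);
    test_context_socket_close_zero_linger (dealer_one);
    test_context_socket_close_zero_linger (dealer_two);
    //  The monitor PAIR socket is a test socket too and must be closed
    test_context_socket_close_zero_linger (mon_one);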
}

What's the actual result? (include assertion message & call stack if applicable)

The peer's socket is never closed, and no ZMQ_EVENT_DISCONNECTED event appears on the monitor.

What's the expected result?

The peer's socket should be closed, and the peer's monitor should report a ZMQ_EVENT_DISCONNECTED event.
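For reference, each monitor event arrives as a two-frame message. For the version 1 monitor started by zmq_socket_monitor, the first frame is 6 bytes (a 16-bit event number followed by a 32-bit event value, in host byte order) and the second frame is the endpoint string. The sketch below decodes that first frame and mirrors the sequence the test expects from dealer_one. The helper names are hypothetical, and the event numbers are copied from zmq.h under EV_ prefixes to avoid clashing with the real macros. The reported result corresponds to a sequence that never reaches DISCONNECTED.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Event numbers copied from zmq.h (prefixed to avoid clashing with ZMQ_EVENT_*).
constexpr std::uint16_t EV_CONNECTED = 0x0001;
constexpr std::uint16_t EV_CONNECT_DELAYED = 0x0002;
constexpr std::uint16_t EV_DISCONNECTED = 0x0200;
constexpr std::uint16_t EV_HANDSHAKE_SUCCEEDED = 0x1000;

// Decode the first frame of a version 1 monitor message: a 16-bit event
// number followed by a 32-bit value, both in host byte order.
bool decode_event_frame(const unsigned char *data, std::size_t size,
                        std::uint16_t &event, std::uint32_t &value) {
    if (size != 6)
        return false;
    std::memcpy(&event, data, sizeof event);
    std::memcpy(&value, data + 2, sizeof value);
    return true;
}

// Mirror of the check at the end of the test: an optional CONNECT_DELAYED,
// then CONNECTED, HANDSHAKE_SUCCEEDED and finally DISCONNECTED.
bool dealer_one_sequence_ok(const std::vector<std::uint16_t> &events) {
    std::size_t i = 0;
    if (i < events.size() && events[i] == EV_CONNECT_DELAYED)
        ++i;
    const std::uint16_t expected[] = {EV_CONNECTED, EV_HANDSHAKE_SUCCEEDED,
                                      EV_DISCONNECTED};
    for (std::uint16_t e : expected) {
        if (i >= events.size() || events[i] != e)
            return false;
        ++i;
    }
    return true;
}
```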

=================================================================================

I added a lot of printf debugging to the code, and I now seem to have detected a data race. In pipe.cpp I have (unaltered code, apart from my commented-out debug fprintf):

    void zmq::pipe_t::flush ()
    {
        //fprintf(stderr, "%s[%p, tid = %u, peer = %p]()\n", __PRETTY_FUNCTION__, (void*)this, get_tid(), (void*)_peer);
        //  The peer does not exist anymore at this point.
        if (_state == term_ack_sent) {
            return;
        }

        if (_out_pipe && !_out_pipe->flush ()) {
            send_activate_read (_peer);
        }
    }
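The return value of _out_pipe->flush () is what drives the wakeup here: per the comments in libzmq's ypipe.hpp, flush() returns false when the reader thread is sleeping, and the caller must then wake it up, which pipe_t::flush does with send_activate_read. A simplified, mutex-based model of that contract follows; the class is hypothetical, and ypipe_t itself does this lock-free with a compare-and-swap.

```cpp
#include <cassert>
#include <deque>
#include <mutex>

// Simplified, mutex-based model of the ypipe flush()/wakeup contract.
// Hypothetical class; libzmq's ypipe_t implements this lock-free with a
// compare-and-swap. One writer thread and one reader thread are assumed.
template <typename T>
class SleepAwarePipe {
public:
    // Writer: stage an item; the reader cannot see it until flush().
    void write(const T &item) { pending_.push_back(item); }

    // Writer: publish staged items. Returns false if the reader had gone to
    // sleep, in which case the caller must wake it (pipe_t::flush sends
    // activate_read to its peer at this point).
    bool flush() {
        if (pending_.empty())
            return true;                 // nothing new: no wakeup needed
        std::lock_guard<std::mutex> lock(mutex_);
        for (const T &item : pending_)
            published_.push_back(item);
        pending_.clear();
        if (reader_asleep_) {
            reader_asleep_ = false;      // the wakeup the caller sends revives it
            return false;
        }
        return true;
    }

    // Reader: take one item, or record that the reader is going to sleep.
    bool read(T &out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (published_.empty()) {
            reader_asleep_ = true;
            return false;
        }
        out = published_.front();
        published_.pop_front();
        return true;
    }

private:
    std::mutex mutex_;
    std::deque<T> pending_;              // touched by the writer only
    std::deque<T> published_;            // shared, guarded by mutex_
    bool reader_asleep_ = false;         // guarded by mutex_
};
```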

With the unaltered pipe_t::flush () I get the following trace (the crucial pipe is 0x7f8cbc001000):

old_pipe->terminate (true);
void zmq::pipe_t::terminate(bool)[0x7f8cbc018680, tid = 3, peer = 0x7f8cbc001000](delay = true)
void zmq::pipe_t::terminate(bool)[0x7f8cbc018680, tid = 3, peer = 0x7f8cbc001000]() : active
void zmq::pipe_t::terminate(bool)[0x7f8cbc018680, tid = 3, peer = 0x7f8cbc001000]() : write delimiter
int zmq::socket_base_t::process_commands(int, bool)[0x55f6a9996a70]() : out of commands
int zmq::socket_base_t::process_commands(int, bool)[0x55f6a9996a70]()
int zmq::socket_base_t::process_commands(int, bool)[0x55f6a9996a70]() : out of commands
int zmq::socket_base_t::process_commands(int, bool)[0x55f6a9996a70]()
int zmq::socket_base_t::process_commands(int, bool)[0x55f6a9997e80]()
void zmq::object_t::process_command(const zmq::command_t&)[0x7f8cbc0190d0]() : activate_write
void zmq::object_t::process_command(const zmq::command_t&)[0x7f8cbc001000]() : pipe_term
virtual void zmq::pipe_t::process_pipe_term()[0x7f8cbc001000, tid = 2, peer = 0x7f8cbc018680]()
virtual void zmq::pipe_t::process_pipe_term()[0x7f8cbc001000, tid = 2, peer = 0x7f8cbc018680]() : active
virtual void zmq::pipe_t::process_pipe_term()[0x7f8cbc001000, tid = 2, peer = 0x7f8cbc018680]() : _state = active -> waiting_for_delimiter
void zmq::object_t::process_command(const zmq::command_t&)[0x7f8cbc001000]() : activate_read
virtual void zmq::pipe_t::process_activate_read()[0x7f8cbc001000, tid = 2, peer = 0x7f8cbc018680]()
virtual void zmq::pipe_t::process_activate_read()[0x7f8cbc001000, tid = 2, peer = 0x7f8cbc018680]() : activating sink
void zmq::pipe_t::process_delimiter()[0x7f8cbc001000, tid = 2, peer = 0x7f8cbc018680]()
void zmq::pipe_t::process_delimiter()[0x7f8cbc001000, tid = 2, peer = 0x7f8cbc018680]() : state = waiting_for_delimiter -> term_ack_sent; send_pipe_term_ack(peer)
void zmq::object_t::process_command(const zmq::command_t&)[0x7f8cbc0190d0]() : activate_read
virtual void zmq::pipe_t::process_activate_read()[0x7f8cbc0190d0, tid = 2, peer = 0x7f8cbc019220]()
virtual void zmq::pipe_t::process_activate_read()[0x7f8cbc0190d0, tid = 2, peer = 0x7f8cbc019220]() : activating sink
int zmq::socket_base_t::process_commands(int, bool)[0x55f6a9997e80]() : out of commands
let everything settle

pipe_t 0x7f8cbc001000 sends a pipe_term_ack to 0x7f8cbc018680, but that pipe never reacts.

If I uncomment that fprintf, I get:

old_pipe->terminate (true);
void zmq::pipe_t::terminate(bool)[0x7f6ce0018680, tid = 3, peer = 0x7f6ce0001000](delay = true)
void zmq::pipe_t::terminate(bool)[0x7f6ce0018680, tid = 3, peer = 0x7f6ce0001000]() : active
void zmq::pipe_t::terminate(bool)[0x7f6ce0018680, tid = 3, peer = 0x7f6ce0001000]() : write delimiter
void zmq::pipe_t::flush()[0x55da6898c5c0, tid = 2, peer = 0x55da6898c470]()
void zmq::pipe_t::flush()[0x7f6ce0018680, tid = 3, peer = 0x7f6ce0001000]()
int zmq::socket_base_t::process_commands(int, bool)[0x55da6896da70]() : out of commands
int zmq::socket_base_t::process_commands(int, bool)[0x55da6896da70]()
void zmq::pipe_t::flush()[0x7f6ce00190d0, tid = 2, peer = 0x7f6ce0019220]()
void zmq::object_t::process_command(const zmq::command_t&)[0x7f6ce00190d0]() : activate_write
void zmq::object_t::process_command(const zmq::command_t&)[0x7f6ce0001000]() : pipe_term
virtual void zmq::pipe_t::process_pipe_term()[0x7f6ce0001000, tid = 2, peer = 0x7f6ce0018680]()
virtual void zmq::pipe_t::process_pipe_term()[0x7f6ce0001000, tid = 2, peer = 0x7f6ce0018680]() : active
virtual void zmq::pipe_t::process_pipe_term()[0x7f6ce0001000, tid = 2, peer = 0x7f6ce0018680]() : _state = active -> waiting_for_delimiter
void zmq::object_t::process_command(const zmq::command_t&)[0x7f6ce0001000]() : activate_read
virtual void zmq::pipe_t::process_activate_read()[0x7f6ce0001000, tid = 2, peer = 0x7f6ce0018680]()
virtual void zmq::pipe_t::process_activate_read()[0x7f6ce0001000, tid = 2, peer = 0x7f6ce0018680]() : activating sink
void zmq::pipe_t::process_delimiter()[0x7f6ce0001000, tid = 2, peer = 0x7f6ce0018680]()
void zmq::pipe_t::process_delimiter()[0x7f6ce0001000, tid = 2, peer = 0x7f6ce0018680]() : state = waiting_for_delimiter -> term_ack_sent; send_pipe_term_ack(peer)
int zmq::socket_base_t::process_commands(int, bool)[0x55da6896da70]() : got command
void zmq::object_t::process_command(const zmq::command_t&)[0x7f6ce0019220]() : activate_read
virtual void zmq::pipe_t::process_activate_read()[0x7f6ce0019220, tid = 3, peer = 0x7f6ce00190d0]()
virtual void zmq::pipe_t::process_activate_read()[0x7f6ce0019220, tid = 3, peer = 0x7f6ce00190d0]() : activating sink
int zmq::socket_base_t::process_commands(int, bool)[0x55da6896da70]() : got command
void zmq::object_t::process_command(const zmq::command_t&)[0x7f6ce0018680]() : pipe_term_ack
virtual void zmq::pipe_t::process_pipe_term_ack()[0x7f6ce0018680, tid = 3, peer = 0x7f6ce0001000]()
virtual zmq::pipe_t::~pipe_t()[0x7f6ce0018680, tid = 3, peer = 0x7f6ce0001000]()
void zmq::object_t::process_command(const zmq::command_t&)[0x7f6ce0001000]() : pipe_term_ack
virtual void zmq::pipe_t::process_pipe_term_ack()[0x7f6ce0001000, tid = 2, peer = 0x7f6ce0018680]()
session_base_t::pipe_terminated: _pipe = 0x7f6ce0001000, _pending = false
virtual zmq::pipe_t::~pipe_t()[0x7f6ce0001000, tid = 2, peer = 0x7f6ce0018680]()

Now 0x7f6ce0018680 reacts, and both pipes are deleted as expected.
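The second trace shows the complete termination handshake: the router side (0x7f6ce0018680) sends pipe_term and writes a delimiter; the session side (0x7f6ce0001000) moves to waiting_for_delimiter, reads the delimiter, moves to term_ack_sent and sends pipe_term_ack; the router side then acks back and both pipes are deleted. A toy, single-threaded replay of that sequence follows. Only the state and command names come from pipe_t; the structure is hypothetical, and the delimiter is modeled as a queued item although libzmq sends it through the data pipe.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <vector>

// Toy replay of the pipe termination handshake seen in the traces. Only the
// state and command names come from pipe_t; the rest is hypothetical.
enum class PipeState { active, waiting_for_delimiter, term_req_sent1, term_ack_sent, deleted };
enum class Cmd { pipe_term, delimiter, pipe_term_ack };

struct ModelPipe {
    std::string name;
    PipeState state = PipeState::active;
    std::deque<Cmd> mailbox;                     // queued for this pipe's owning thread
    ModelPipe *peer = nullptr;
    std::vector<std::string> *trace = nullptr;   // shared event log

    void note(const std::string &what) { trace->push_back(name + ": " + what); }

    // Owner-initiated termination, like old_pipe->terminate (true) in the router.
    void terminate() {
        if (state != PipeState::active)
            return;
        peer->mailbox.push_back(Cmd::pipe_term);
        peer->mailbox.push_back(Cmd::delimiter); // delay=true: delimiter goes after pending data
        state = PipeState::term_req_sent1;
        note("terminate -> term_req_sent1");
    }

    // Nothing happens to a pipe until its owner drains the mailbox.
    void process_commands() {
        while (!mailbox.empty() && state != PipeState::deleted) {
            const Cmd cmd = mailbox.front();
            mailbox.pop_front();
            if (cmd == Cmd::pipe_term && state == PipeState::active) {
                state = PipeState::waiting_for_delimiter;
                note("process_pipe_term -> waiting_for_delimiter");
            } else if (cmd == Cmd::delimiter && state == PipeState::waiting_for_delimiter) {
                state = PipeState::term_ack_sent;
                peer->mailbox.push_back(Cmd::pipe_term_ack);
                note("process_delimiter -> term_ack_sent");
            } else if (cmd == Cmd::pipe_term_ack) {
                if (state == PipeState::term_req_sent1)
                    peer->mailbox.push_back(Cmd::pipe_term_ack);  // ack the peer before going away
                state = PipeState::deleted;
                note("process_pipe_term_ack -> deleted");
            }
            // Other state/command combinations are not modeled.
        }
    }
};
```

In this model, once the session side has sent its ack, the router side stays in term_req_sent1 with the ack sitting in its mailbox until its owner calls process_commands(); the first trace ends at that same point.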

This seems to indicate a data race: delaying the code with a few fprintf calls changes what the 0x7f6ce0018680 pipe sees in its mailbox. Without the fprintf delay, it goes back to sleep and never wakes up when the pipe_term_ack command arrives from pipe 0x7f6ce0001000. Or is the command lost?
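The hypothesis here is a lost wakeup: the owning thread finds its mailbox empty and goes to sleep at the same moment the command arrives, so the notification is missed. For comparison, a generic mailbox that cannot lose a wakeup is sketched below. It is a hypothetical class built on std::condition_variable, not libzmq's mailbox_t (which combines a lock-free ypipe with a signaler file descriptor); the property to look for is that the emptiness check and the decision to sleep happen under the same lock the sender takes to enqueue.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>

// Hypothetical generic mailbox; not libzmq's mailbox_t.
class Mailbox {
public:
    void send(const std::string &cmd) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push_back(cmd);
        }
        cv_.notify_one();   // wake a receiver that is already waiting
    }

    // Waits up to `timeout` for a command; returns false on timeout.
    bool recv(std::string &cmd, std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        // The predicate is checked under the lock before sleeping and after
        // every wakeup, so a command sent before we started waiting is seen
        // immediately instead of being slept through.
        if (!cv_.wait_for(lock, timeout, [this] { return !queue_.empty(); }))
            return false;
        cmd = queue_.front();
        queue_.pop_front();
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<std::string> queue_;
};
```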

Even with the fprintf delay, and with both pipes destroyed, the monitor socket still does not receive a DISCONNECTED event for the obsolete client.

mrvn, Jul 17 '22 23:07