cppzmq icon indicating copy to clipboard operation
cppzmq copied to clipboard

socket monitor behavior

Open raymanfx opened this issue 4 years ago • 4 comments

I'm interested in knowing about peer disconnect events for ZMQ pair/pair and req/rep sockets. Consider the following example code:

#include <iostream>
#include <thread>

#include <zmq.hpp>

int main(int argc, char *argv[]) {
    zmq::context_t ctx(1);
    zmq::socket_t s1(ctx, zmq::socket_type::rep);
    zmq::socket_t s2(ctx, zmq::socket_type::req);
    zmq::monitor_t monitor;
    zmq::socket_t monitor_socket(ctx, zmq::socket_type::pair);
    bool main_loop = true;

    monitor.init(s1, "inproc://events");
    monitor_socket.connect("inproc://events");

    std::thread monitor_thread([&]() {
        // monitor the control socket
        while (main_loop) {
            zmq_event_t event;
            zmq::message_t event_msg;
            size_t event_msg_len;

            try {
                event_msg_len = monitor_socket.recv(event_msg).value_or(0);
                std::cout << "Event Message Size: " << event_msg_len << std::endl;
                if (event_msg_len != sizeof(event)) {
                    continue;
                }

                auto data = reinterpret_cast<std::byte*>(event_msg.data());
                std::memcpy(&event, data, event_msg_len);

                switch (event.event) {
                case ZMQ_EVENT_DISCONNECTED:
                    std::cout << "Event: Disconnected" << std::endl;
                    break;
                case ZMQ_EVENT_CLOSED:
                    std::cout << "Event: Closed" << std::endl;
                    break;
                }
            } catch (...) {
                // swallow
            }
        }
    });

    s1.bind("tcp://*:8088");
    s2.connect("tcp://localhost:8088");

    int a = 7;
    zmq::message_t msg(&a, sizeof(int));
    s2.send(msg);
    s1.recv(msg);
    a = *(reinterpret_cast<int*>(msg.data()));
    std::cout << "Received int: " << a << std::endl;

    s2.close();

    std::this_thread::sleep_for(std::chrono::milliseconds(1000));

    main_loop = false;
    if (monitor_thread.joinable()) {
        monitor_thread.join();
    }

    return 0;
}

I can't seem to get any events from on the s1 socket (which should be the "master", as in server). In fact, I am never seeing any events pop up at all.

I'm on Fedora 31 with the latest updates installed, cppzmq is at version: 4.5.0-1.fc31. What am I doing wrong here?

raymanfx avatar Feb 15 '20 23:02 raymanfx

Sorry there is no proper documentation or example for this.

I can give some basic hints, which hopefully are helpful to you: You don't need monitor_socket, which is doing what already zmq::monitor_t is doing. You need to subclass zmq::monitor_t and override to on_event_* methods and then run the monitor_thread simply like in https://github.com/zeromq/cppzmq/blob/master/tests/monitor.cpp#L128-L132

sigiesec avatar Feb 19 '20 11:02 sigiesec

I see. Unfortunately I needed a solution, so I went with a custom heartbeat monitoring for my (camera) streams which works well.

The first attempt modeled in the (simplified) example code I posted was actually inspired by the official C example: http://api.zeromq.org/4-1:zmq-socket-monitor.

From your answer I conclude it is not possible to just "do the same" with the C++ API? Like nstantiating the monitor and connecting from a PAIR socket with the inproc transport?

raymanfx avatar Feb 19 '20 15:02 raymanfx

You can do it. In that case do not use zmq::monitor_t, but call the C zmq_socket_monitor function, and then you can use a zmq::socket_t again to connect your monitor client.

sigiesec avatar Feb 21 '20 10:02 sigiesec

Is it possible to create and use a monitor socket on different threads?

ghost avatar Dec 13 '20 07:12 ghost