cppzmq
Poll monitoring sockets
I would like to be able to use zmq::poll() for normal ZMQ sockets and monitoring ZMQ sockets, to handle them all within a single thread. However, this isn't currently possible with the zmq::monitor_t class due to the blocking nature of the zmq::monitor_t::monitor() method and the lack of an accessor for the socket that is monitoring socketPtr.
Could we augment the zmq::monitor_t class with the functionality to allow polling? Perhaps something like the following would work (apologies in advance for syntax)...

    class monitor_t {
        // ...
        // Conversions so that the monitor can be placed in a pollitem_t
        inline operator void* () ZMQ_NOTHROW
        {
            return monitorPtr;
        }
        inline operator void const* () const ZMQ_NOTHROW
        {
            return monitorPtr;
        }
        // Non-blocking setup: register the monitor and connect the PAIR socket
        void monitor_start(socket_t &socket, const char *addr_, int events = ZMQ_EVENT_ALL)
        {
            int rc = zmq_socket_monitor(socket.ptr, addr_, events);
            if (rc != 0)
                throw error_t ();
            socketPtr = socket.ptr;
            monitorPtr = zmq_socket (socket.ctxptr, ZMQ_PAIR);
            assert (monitorPtr);
            rc = zmq_connect (monitorPtr, addr_);
            assert (rc == 0);
            on_monitor_started();
        }
        // Handle a single event; returns false once the context is terminated
        bool monitor_poll()
        {
            zmq_msg_t eventMsg;
            zmq_msg_init (&eventMsg);
            // receive on the monitoring socket (the original sketch used an undeclared `s`)
            int rc = zmq_msg_recv (&eventMsg, monitorPtr, 0);
            if (rc == -1 && zmq_errno() == ETERM)
                return false;
            assert (rc != -1);
            // ...
        }
        // Existing blocking interface, now built from the two pieces above
        void monitor(socket_t &socket, const char *addr_, int events = ZMQ_EVENT_ALL)
        {
            monitor_start(socket, addr_, events);
            bool keep_looping = monitor_poll();
            while (keep_looping) {
                keep_looping = monitor_poll();
            }
        }
        // ...
    private:
        void* socketPtr;
        void* monitorPtr;
    };

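For context on the elided part of monitor_poll(): the first frame of a monitor message carries a 16-bit event number followed by a 32-bit event value, as documented for zmq_socket_monitor in libzmq 4.x. Below is a minimal sketch of decoding such a frame, written without libzmq so it compiles on its own. The monitor_event struct and decode_monitor_event() are hypothetical names for illustration only and are not part of cppzmq.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>

// Hypothetical plain struct for this sketch; not part of cppzmq.
struct monitor_event {
    std::uint16_t id;    // event number (for example ZMQ_EVENT_CONNECTED)
    std::uint32_t value; // event-specific value (for example an fd or errno)
};

// Decode the first frame of a monitor message, assuming the libzmq 4.x
// layout: 2 bytes of event number, then 4 bytes of event value.
// Returns false if the buffer is too short to hold both fields.
inline bool decode_monitor_event(const void *data, std::size_t size,
                                 monitor_event &out)
{
    const std::size_t needed = sizeof(out.id) + sizeof(out.value);
    if (data == nullptr || size < needed)
        return false;

    // memcpy avoids unaligned reads from the raw message buffer.
    const unsigned char *bytes = static_cast<const unsigned char *>(data);
    std::memcpy(&out.id, bytes, sizeof(out.id));
    std::memcpy(&out.value, bytes + sizeof(out.id), sizeof(out.value));
    return true;
}
```

In a real monitor_poll() the buffer and size would come from zmq_msg_data() and zmq_msg_size() on the received message, and the message would be closed with zmq_msg_close() afterwards.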
This would allow a usage similar to the following...

    void Worker(zmq::socket_t* raw_zmq_data_sock)
    {
        std::unique_ptr<zmq::socket_t> zmq_data_sock(raw_zmq_data_sock);
        std::unique_ptr<zmq::monitor_t> zmq_monitor_sock(new zmq::monitor_t);
        // same name as the method proposed in the class above
        zmq_monitor_sock->monitor_start(*zmq_data_sock, "inproc://abc");
        std::vector<zmq::pollitem_t> poll_items = {
            { (void*)(*zmq_data_sock), 0, ZMQ_POLLIN | ZMQ_POLLERR, 0, },
            { (void*)(*zmq_monitor_sock), 0, ZMQ_POLLIN | ZMQ_POLLERR, 0, },
        };
        while (true) {
            poll_items[0].revents = 0;
            poll_items[1].revents = 0;
            int items = zmq::poll(poll_items);
            // ...
            if (poll_items[1].revents) {
                // a monitor event is pending, so this call will not block
                zmq_monitor_sock->monitor_poll();
            }
        }
        // ...
    }

I can put a pull request together if it sounds like a reasonable idea.
I have just split the blocking function without breaking the existing interface: https://github.com/zeromq/cppzmq/commit/b8385630bc535c73b8fc8faea14bd9f80c1fbed5. Now you can use monitor_t in a non-blocking way in the same thread as the monitored socket, or any other socket.
The socket is currently not public, but I have a use case similar to the one you describe and might adapt it to be able to poll on the monitor and another socket in one poll item. Before I do this, I would like to measure whether this has a performance advantage over having 2 poll items.
Implicit conversion, as you suggest, is IMHO not how it should be done in C++, since C++ should be type-safe and strongly typed. Also, I do not think that storing the sockets as void pointers, as is currently done, is the best idea, so I think they should be typed.
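To illustrate the type-safety point, here is a minimal sketch of a typed handle that never converts to or from void* implicitly. The socket_handle class and its raw() accessor are hypothetical names for this example; this is not the cppzmq API, only one way such a wrapper could look.

```cpp
#include <type_traits>

// Hypothetical typed handle for this sketch. It wraps the raw pointer that
// the C API expects and owns nothing.
class socket_handle {
public:
    explicit socket_handle(void *raw) noexcept : raw_(raw) {}

    // Named accessor: the caller has to ask for the raw pointer explicitly.
    void *raw() const noexcept { return raw_; }

    // Explicit conversion that only answers "is this handle set?".
    explicit operator bool() const noexcept { return raw_ != nullptr; }

private:
    void *raw_;
};

// Neither direction converts silently, unlike an implicit operator void*().
static_assert(!std::is_convertible<socket_handle, void *>::value,
              "handle must not decay to void* implicitly");
static_assert(!std::is_convertible<void *, socket_handle>::value,
              "void* must not become a handle implicitly");
```

With a type like this, passing a handle where a raw pointer is expected fails to compile unless raw() is called, which makes the hand-off to the C API visible at the call site.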
Hey,
I'm also missing this feature. Why not make monitor_t inherit from socket_t?
IMHO, this would be a better RAII design (see #381), and it would force the user to configure the monitor in the constructor instead of using the init() method.
> I'm also missing this feature. Why not make monitor_t inherit from socket_t?
I think it is not necessary; it would be sufficient if a method were added to monitor_t that provides access to its _monitor_socket, IIUC. Composition should generally be preferred over inheritance.
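A minimal sketch of the composition approach, assuming a stand-in socket type so that it compiles without libzmq. The names fake_socket, fake_monitor and monitor_socket() are hypothetical and only illustrate the shape of such an accessor; they are not the cppzmq interface.

```cpp
#include <string>
#include <utility>

// Stand-in for zmq::socket_t so the sketch compiles without libzmq.
class fake_socket {
public:
    explicit fake_socket(std::string endpoint)
        : endpoint_(std::move(endpoint)) {}
    const std::string &endpoint() const { return endpoint_; }

private:
    std::string endpoint_;
};

// The monitor *has* a socket (composition) instead of *being* one.
class fake_monitor {
public:
    explicit fake_monitor(std::string endpoint)
        : monitor_socket_(std::move(endpoint)) {}

    // Hypothetical accessor: exposes the inner socket so it can be polled,
    // without exposing the rest of the socket interface on the monitor.
    fake_socket &monitor_socket() noexcept { return monitor_socket_; }
    const fake_socket &monitor_socket() const noexcept { return monitor_socket_; }

private:
    fake_socket monitor_socket_;
};
```

The design choice here is that a monitor cannot be passed to code expecting a general socket by accident; callers that need to poll it ask for the inner socket explicitly.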
> IMHO, this would be a better RAII design (see #381), and force the user to configure the monitor in the constructor instead of using init() method.
I agree that the init method is not nice; however, bear in mind that any changes need to be backwards-compatible. It is conceivable to add a new class under a different name that is designed without an init method and declare monitor_t deprecated.
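As a rough sketch of that migration path, the standard [[deprecated]] attribute (C++14) can mark the old two-step class while a new class takes its configuration in the constructor. Both class names below (legacy_monitor, scoped_monitor) are hypothetical stand-ins and do not exist in cppzmq.

```cpp
#include <string>
#include <utility>

// Hypothetical stand-in for the existing class: two-step setup via init().
// It stays available, so old code still builds, but compilers warn on use.
class [[deprecated("use scoped_monitor instead")]] legacy_monitor {
public:
    void init(std::string addr)
    {
        addr_ = std::move(addr);
        ready_ = true;
    }
    bool ready() const { return ready_; }

private:
    std::string addr_;
    bool ready_ = false;
};

// Hypothetical replacement: fully configured by its constructor, so an
// object of this type can never be observed in an uninitialised state.
class scoped_monitor {
public:
    explicit scoped_monitor(std::string addr) : addr_(std::move(addr)) {}
    const std::string &address() const { return addr_; }

private:
    std::string addr_;
};
```

Existing callers keep compiling and only see a deprecation warning, which is what keeps the change backwards-compatible.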
I added a pull request (#612) to expose the monitor socket so that it can be used with an active poller from the hppaddon.cpp. The check_event() method was split to prevent polling for the same event twice. The API stays fully backward compatible.