inproc pub/sub must bind before connect with libuv
Issue description
The requirement that inproc pub/sub must bind before connect was fixed as of version 4.0, as mentioned in this comment. When I use inproc pub/sub with libuv, the bug appears again.
Environment
- libzmq version (commit hash if unreleased): 4.3.4
- OS: ubuntu 22.04 amd64
- zmq.hpp: 4.9.0
- libuv: 1.42.0
Minimal test code / Steps to reproduce the issue
1. main.cpp
#include <chrono>
#include <functional>
#include <iostream>
#include <memory>
#include <thread>
#include "uv.h"
#include "zmq.hpp"
using namespace std::placeholders;
class Job {
public:
void Run() {
std::thread(&Job::ThreadPub, this).detach(); // pub thread runs first
std::this_thread::sleep_for(std::chrono::seconds(2));
std::thread(&Job::ThreadSub, this).detach(); // sub thread runs after
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(100));
}
}
private:
zmq::context_t ctx_;
void ThreadPub() {
std::cout << "pub thread start" << std::endl;
zmq::socket_t sock(ctx_, zmq::socket_type::pub);
sock.bind("inproc://what");
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(1));
zmq::const_buffer zmq_buf("topicabcd", 9);
sock.send(zmq_buf, zmq::send_flags::none);
std::cout << "pub data" << std::endl;
}
}
static void PollCallback(uv_poll_t *handle, int status, int events) {
auto sock = static_cast<zmq::socket_t *>(handle->data);
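char data[256];  // receive storage; named differently from the zmq buffer below
while (true) {
// ZMQ_FD is edge-triggered, so keep reading until ZMQ_EVENTS reports no input
auto sock_events = sock->get(zmq::sockopt::events);
if (!(sock_events & ZMQ_POLLIN)) {
return;
}
zmq::mutable_buffer buf(data, sizeof(data));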
auto recv_result = sock->recv(buf, zmq::recv_flags::none);
std::cout << "sub msg length: " << recv_result.value().size << std::endl;
}
}
void ThreadSub() {
std::cout << "sub thread start" << std::endl;
auto sock = std::make_shared<zmq::socket_t>(ctx_, zmq::socket_type::sub);
sock->connect("inproc://what");
std::string topic = "topic";
sock->set(zmq::sockopt::subscribe, topic);
int fd = sock->get(zmq::sockopt::fd);
uv_loop_t loop;
uv_loop_init(&loop);
uv_poll_t poll_handle;
poll_handle.data = sock.get();
uv_poll_init(&loop, &poll_handle, fd);
uv_poll_start(&poll_handle, UV_READABLE, PollCallback);
uv_run(&loop, UV_RUN_DEFAULT);
}
};
int main(int argc, char *argv[]) {
Job job;
job.Run();
return 0;
}
test.tar.gz is a demo that reproduces this bug quickly. libuv.so (v1.42.0) and libzmq.a (v4.3.4) are included for compilation.
What's the actual result? (include assertion message & call stack if applicable)
Running the main.cpp above produces output like this:
# ./build/test
pub thread start
pub data
sub thread start
pub data
pub data
......
This means the sub thread receives nothing. However, if we start ThreadSub() before ThreadPub(), ThreadSub() does receive data.
I just hit this as well, with the addition that I also have to postpone my first write from the publisher; otherwise the write is dropped.
It looks like I was holding it wrong, at least in part: I was binding the publisher and connecting the subscriber instead of the other way around. The bind must still happen before the connect, however.