libzmq icon indicating copy to clipboard operation
libzmq copied to clipboard

inproc pub/sub must bind before connect with libuv

Open aeiouaoeiuv opened this issue 3 years ago • 2 comments

Issue description

inproc pub/sub must bind before connect was fixed mentioned by this comment since version 4.0. When I am trying to use inproc pub/sub with libuv, this bug comes out again.

Environment

  • libzmq version (commit hash if unreleased): 4.3.4
  • OS: ubuntu 22.04 amd64
  • zmq.hpp: 4.9.0
  • libuv: 1.42.0

Minimal test code / Steps to reproduce the issue

1.main.cpp

#include <chrono>
#include <functional>
#include <iostream>
#include <memory>
#include <thread>

#include "uv.h"
#include "zmq.hpp"

using namespace std::placeholders;

class Job {
public:
  void Run() {
    std::thread(&Job::ThreadPub, this).detach();             // pub thread runs first
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::thread(&Job::ThreadSub, this).detach();             // sub thread runs after

    while (true) {
      std::this_thread::sleep_for(std::chrono::seconds(100));
    }
  }

private:
  zmq::context_t ctx_;

  void ThreadPub() {
    std::cout << "pub thread start" << std::endl;
    zmq::socket_t sock(ctx_, zmq::socket_type::pub);
    sock.bind("inproc://what");

    while (true) {
      std::this_thread::sleep_for(std::chrono::seconds(1));
      zmq::const_buffer zmq_buf("topicabcd", 9);
      sock.send(zmq_buf, zmq::send_flags::none);
      std::cout << "pub data" << std::endl;
    }
  }

  static void PollCallback(uv_poll_t *handle, int status, int events) {
    auto sock = static_cast<zmq::socket_t *>(handle->data);
    char buf[256];
    while (true) {
      auto events = sock->get(zmq::sockopt::events);
      if (!(events & ZMQ_POLLIN)) {
        return;
      }

      zmq::mutable_buffer buf(&buf, sizeof(buf));
      auto recv_result = sock->recv(buf, zmq::recv_flags::none);

      std::cout << "sub msg length: " << recv_result.value().size << std::endl;
    }
  }

  void ThreadSub() {
    std::cout << "sub thread start" << std::endl;
    auto sock = std::make_shared<zmq::socket_t>(ctx_, zmq::socket_type::sub);

    sock->connect("inproc://what");
    std::string topic = "topic";
    sock->set(zmq::sockopt::subscribe, topic);
    int fd = sock->get(zmq::sockopt::fd);

    uv_loop_t loop;
    uv_loop_init(&loop);
    uv_poll_t poll_handle;
    poll_handle.data = sock.get();
    uv_poll_init(&loop, &poll_handle, fd);
    uv_poll_start(&poll_handle, UV_READABLE, PollCallback);
    uv_run(&loop, UV_RUN_DEFAULT);
  }
};

int main(int argc, char *argv[]) {
  Job job;
  job.Run();

  return 0;
}

test.tar.gz is a demo can quickly test this bug. libuv.so(v1.42.0) and libzmq.a(v4.3.4) are also added for compilation.

What's the actual result? (include assertion message & call stack if applicable)

the main.cpp above runs output like this:

# ./build/test
pub thread start
pub data
sub thread start
pub data
pub data
......

That means sub thread subscribe nothing. But, if we start ThreadSub() first before ThreadPub(), ThreadSub() started to subscribe data.

aeiouaoeiuv avatar Nov 21 '22 07:11 aeiouaoeiuv

I just hit this as well with the addition that I also have to postpone my first write from the publisher as the write is otherwise dropped.

kasperisager avatar Nov 29 '24 07:11 kasperisager

It looks like I was holding it wrong, at least in part: I was binding the publisher and connecting the subscriber instead of the other way around. The bind must still happen before the connect, however.

kasperisager avatar Nov 29 '24 12:11 kasperisager