cppzmq

Dropped first frame on failed send

Open diehard2 opened this issue 8 months ago • 3 comments

I'm experiencing an unusual issue. Under high load, the first non-routing-id frame of a message sometimes vanishes. I'm running a blocking, synchronous client that uses ZMQ_IMMEDIATE together with ZMQ_SNDTIMEO. If the message can't be sent within the timeout, send() returns false and I retry a limited number of times. Something like this:

#include <zmq.hpp>
#include <zmq_addon.hpp>

#include <atomic>
#include <cerrno>
#include <iostream>
#include <string>

bool cppzmq_client(zmq::context_t* context, bool dowhile)
{
  const int recvtimeout = 4000;
  static std::atomic<int> id{0};

  zmq::socket_t m_socket = {*context, ZMQ_DEALER};
  m_socket.set(zmq::sockopt::sndhwm, 0);
  m_socket.set(zmq::sockopt::routing_id, std::to_string(id++));
  m_socket.set(zmq::sockopt::immediate, 1);
  m_socket.set(zmq::sockopt::rcvtimeo, recvtimeout);

  // empty delimiter frame followed by three payload frames
  zmq::multipart_t message;
  message.pushstr({});
  message.push_back(zmq::message_t{std::string{"stuff"}});
  message.push_back(zmq::message_t{std::string{"more stuff"}});
  message.push_back(zmq::message_t{std::string{"even more stuff"}});

  bool succeeded = false;
  const int sendtimeout = 1000;
  m_socket.set(zmq::sockopt::sndtimeo, sendtimeout);
  m_socket.connect("ipc://@/zmq-client");
  int retry_count = 0;
  do {
    try {
      // a send timeout (EAGAIN) is reported as a false return value
      succeeded = message.send(m_socket);
    } catch (const zmq::error_t& e) {
      // only swallow interrupted or would-block errors
      if (e.num() != EINTR && e.num() != EAGAIN) {
        throw;
      }
    }
    retry_count++;
    std::cout << "Retry count: " << retry_count << std::endl;
  } while (!succeeded && retry_count < 20);

  if (!succeeded) {
    std::cerr << "Failed to send message to broker: " << zmq_strerror(zmq_errno()) << std::endl;
    return false;
  }

  zmq::multipart_t result;
  bool read_succeeded = result.recv(m_socket);
  return read_succeeded && result.peekstr(0) != "MISSING_FRAME";
}

In multipart_t::send(), each part is popped off the message before it is sent, but if the send fails the popped part is never restored to the multipart_t. The retry therefore sends the message without its first frame:

bool send(socket_ref socket, int flags = 0)
{
    flags &= ~(ZMQ_SNDMORE);
    bool more = size() > 0;
    while (more) {
        message_t message = pop();
        more = size() > 0;
#ifdef ZMQ_CPP11
        if (!socket.send(message, static_cast<send_flags>(
                                    (more ? ZMQ_SNDMORE : 0) | flags)))
            return false;
#else
        if (!socket.send(message, (more ? ZMQ_SNDMORE : 0) | flags))
            return false;
#endif
    }
    clear();
    return true;
}

Would it be possible to restore the popped message so a retry can occur?
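To make the failure mode concrete, here is a minimal self-contained model that needs no cppzmq. `FakeSocket`, `send_consuming`, `send_restoring` and `frames_delivered` are hypothetical names, and frames are modelled as `std::string` in a `std::deque`. `send_consuming` mirrors the pop-then-send loop quoted above; `send_restoring` pushes the popped frame back to the front when the send is refused. The sketch assumes, as libzmq promises, that only the first frame of a multipart message can be refused.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Hypothetical socket stand-in: the first `fail_count` sends are refused,
// the way EAGAIN would be under ZMQ_IMMEDIATE + ZMQ_SNDTIMEO.
struct FakeSocket {
    int fail_count = 0;
    std::vector<std::string> wire;  // frames that actually went out

    bool send(const std::string& frame, bool /*more*/) {
        if (fail_count > 0) {
            --fail_count;
            return false;
        }
        wire.push_back(frame);
        return true;
    }
};

using Multipart = std::deque<std::string>;

// Mirrors the quoted multipart_t::send: pop, then send. If the send fails,
// the popped frame dies with the local variable and is gone for good.
bool send_consuming(FakeSocket& s, Multipart& msg) {
    bool more = !msg.empty();
    while (more) {
        std::string frame = std::move(msg.front());
        msg.pop_front();
        more = !msg.empty();
        if (!s.send(frame, more))
            return false;
    }
    return true;
}

// Same loop, but a refused frame is pushed back to the front, so the
// message is intact for the next attempt.
bool send_restoring(FakeSocket& s, Multipart& msg) {
    bool more = !msg.empty();
    while (more) {
        std::string frame = std::move(msg.front());
        msg.pop_front();
        more = !msg.empty();
        if (!s.send(frame, more)) {
            msg.push_front(std::move(frame));
            return false;
        }
    }
    return true;
}

// Sends the four-frame message from the issue with one refused attempt
// and then retries; returns how many frames reached the wire.
std::size_t frames_delivered(bool restore) {
    FakeSocket sock;
    sock.fail_count = 1;
    Multipart msg{"", "stuff", "more stuff", "even more stuff"};
    for (int attempt = 0; attempt < 20; ++attempt) {
        bool ok = restore ? send_restoring(sock, msg) : send_consuming(sock, msg);
        if (ok)
            break;
    }
    return sock.wire.size();
}
```

With the consuming loop the retry "succeeds" but only three frames reach the wire, because the empty delimiter was lost on the refused attempt; with the restoring loop all four arrive. Restoring just the most recently popped frame is enough only under the atomicity assumption stated above.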

diehard2, May 10 '25 00:05

I would suggest using zmq::recv_multipart and zmq::send_multipart and seeing whether your problems go away. But you are right, that is clearly a bug.

gummif, May 12 '25 09:05

Thanks @gummif. I think there are some subtle issues with zmq::send_multipart as well. The rvalue reference in its signature strongly suggests I should call it as

zmq::send_multipart(m_socket, std::move(message));

in which case I can't retry since I've lost the message. However, despite the rvalue signature, I could do

zmq::send_multipart(m_socket, message);

and if I get back an empty optional I could safely retry. However, outside debug mode (where the assert in send_multipart is compiled out), it's possible to get an empty optional in the following scenario:

  1. the first frame is sent successfully
  2. sending the second frame fails

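In that case the empty result looks the same as a clean failure, even though part of the message has already gone out. I think the following would make it possible to resend gracefully: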

template<class Range>
send_result_t
send_multipart(socket_ref s, Range &msgs, send_flags flags = send_flags::none)
{
    using std::begin;
    using std::end;
    auto it = begin(msgs);
    const auto end_it = end(msgs);
    size_t msg_count = 0;
    while (it != end_it) {
        const auto next = std::next(it);
        const auto msg_flags =
          flags | (next == end_it ? send_flags::none : send_flags::sndmore);
        if (!s.send(*it, msg_flags)) {
            // zmq ensures atomic delivery of messages: if the first frame
            // fails, nothing was queued and the caller can safely retry
            if (it == begin(msgs)) {
                return {};
            }
            // a later frame failed, so the message was partially sent
            throw error_t();
        }
        ++msg_count;
        it = next;
    }
    return msg_count;
}
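The proposed rule can be exercised without a broker using a stand-in socket. In this sketch `FakeSocket` and `send_multipart_checked` are hypothetical names, the range is a `std::vector<std::string>` rather than `zmq::message_t` frames, and `std::runtime_error` stands in for `zmq::error_t`. A refusal on the first frame yields an empty optional with the message untouched; a refusal on any later frame throws.

```cpp
#include <cassert>
#include <cstddef>
#include <iterator>
#include <optional>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical socket stand-in: refuses the call whose zero-based index
// equals `fail_at` (-1 means never refuse).
struct FakeSocket {
    int fail_at = -1;
    int calls = 0;
    std::vector<std::string> wire;

    bool send(const std::string& frame, bool /*more*/) {
        if (calls++ == fail_at)
            return false;
        wire.push_back(frame);
        return true;
    }
};

// Sketch of the proposed rule: first-frame failure means nothing was
// queued, so return an empty result and let the caller retry; a later
// failure means a partial send, so throw instead.
template <class Range>
std::optional<std::size_t> send_multipart_checked(FakeSocket& s, const Range& msgs) {
    using std::begin;
    using std::end;
    auto it = begin(msgs);
    const auto end_it = end(msgs);
    std::size_t msg_count = 0;
    while (it != end_it) {
        const auto next = std::next(it);
        if (!s.send(*it, next != end_it)) {
            if (it == begin(msgs))
                return std::nullopt;
            throw std::runtime_error("multipart message partially sent");
        }
        ++msg_count;
        it = next;
    }
    return msg_count;
}
```

Returning an empty result only for a first-frame failure keeps the retry contract of the existing signature, while the throw turns the supposedly impossible partial send into a loud error instead of an ambiguous empty optional.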


diehard2, May 12 '25 16:05

The Range&& is a universal (forwarding) reference, so it accepts lvalues and rvalues alike, const or not; std::move is not required.
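A small illustration of what a forwarding reference does with each kind of argument. `binds_as_lvalue` and `send_all` are hypothetical helpers that only share the `Range&&` parameter shape with `zmq::send_multipart`; `send_all` copies frames into a vector instead of a socket.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <type_traits>
#include <vector>

// Range is deduced as T& (or const T&) for lvalue arguments and as plain T
// for rvalues, so one template accepts both without std::move.
template <class Range>
bool binds_as_lvalue(Range&&) {
    return std::is_lvalue_reference<Range>::value;
}

// Hypothetical sender with the same parameter shape: it only reads the
// frames, so an lvalue argument is left as it was.
template <class Range>
std::size_t send_all(Range&& msgs, std::vector<std::string>& wire) {
    std::size_t n = 0;
    for (const auto& frame : msgs) {
        wire.push_back(frame);
        ++n;
    }
    return n;
}
```

In real cppzmq each frame is handed to zmq_msg_send, which per the libzmq documentation nullifies a message it accepts and leaves a message untouched when the send fails. Passing an lvalue range therefore keeps the frames available for a retry after a failed attempt, not after a successful one.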

The case you mention can't happen: libzmq guarantees atomic delivery of multipart messages, so it would be a bug in the API if it did. But it might be pretty cheap to check anyway, as a sanity check.

gummif, May 12 '25 16:05