libzmq
Pub/Sub with NORM issues
I have been experimenting with the norm_engine through cppzmq and can get multiple machines communicating, but I ran into a couple of issues:
- The SUB needs to be started before the PUB; otherwise none of the messages are received. Closing the PUB halfway through its transmission and restarting it results in all messages being received. Pub/Sub is asynchronous, and the 0MQ docs state that a number of messages will be dropped if the SUB is started later, but it never seems to recover (tested with 1000+ messages over 10+ seconds).
- If I use two publishers on the same interface but with different broadcasting IPs (i.e. norm://1,interface1;224.1.2.152:5555 and norm://2,interface1;224.1.2.153:5555), a subscriber listening on norm://interface1;224.1.2.152:5555 receives from BOTH publishers. Surprisingly, this doesn't seem to happen for any other IPs I have tried (e.g. PUB on x.x.x.151 and x.x.x.152, SUB on 152 or 151).
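Since the endpoints above differ only in small details, it may help to split each string into its parts before comparing publisher and subscriber addresses. The following is a minimal sketch, assuming the layout norm://[<id>,][<interface>;]<address>:<port> seen in the endpoints in this report and IPv4 addresses only. `NormEndpoint` and `parse_norm_endpoint` are hypothetical helpers, not part of libzmq or cppzmq, and libzmq's own parsing may differ in its details.

```cpp
#include <cstdint>
#include <optional>
#include <string>

// Hypothetical helper type; not part of libzmq or cppzmq.
struct NormEndpoint {
    std::string node_id;   // optional local NORM node ID ("" if absent)
    std::string iface;     // optional interface name ("" if absent)
    std::string address;   // IPv4 address (multicast group in this report)
    std::uint16_t port = 0;
};

// Splits "norm://[<id>,][<iface>;]<addr>:<port>".
// Returns std::nullopt when the string does not match that layout.
std::optional<NormEndpoint> parse_norm_endpoint(const std::string &endpoint)
{
    const std::string scheme = "norm://";
    if (endpoint.rfind(scheme, 0) != 0)
        return std::nullopt;
    std::string rest = endpoint.substr(scheme.size());

    NormEndpoint ep;
    // Everything before ';' is "[<id>,]<iface>".
    const auto semi = rest.find(';');
    if (semi != std::string::npos) {
        const std::string prefix = rest.substr(0, semi);
        rest = rest.substr(semi + 1);
        const auto comma = prefix.find(',');
        if (comma != std::string::npos) {
            ep.node_id = prefix.substr(0, comma);
            ep.iface = prefix.substr(comma + 1);
        } else {
            ep.iface = prefix;
        }
    }

    // The port follows the last ':'.
    const auto colon = rest.rfind(':');
    if (colon == std::string::npos || colon == 0 || colon + 1 == rest.size())
        return std::nullopt;
    ep.address = rest.substr(0, colon);
    const std::string port = rest.substr(colon + 1);

    // Assumption: dotted-quad IPv4 only, so anything but digits and dots is rejected.
    if (ep.address.find_first_not_of("0123456789.") != std::string::npos)
        return std::nullopt;
    if (port.size() > 5 || port.find_first_not_of("0123456789") != std::string::npos)
        return std::nullopt;
    const int port_value = std::stoi(port);
    if (port_value < 1 || port_value > 65535)
        return std::nullopt;
    ep.port = static_cast<std::uint16_t>(port_value);
    return ep;
}
```

Comparing the `address` and `port` fields of the publisher and subscriber endpoints makes a mistyped group address (for example a comma in place of a dot) show up as a parse failure rather than as unexpected traffic.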
Environment
- libzmq version (commit hash if unreleased): 4.3.4
- OS: linux
Minimal test code / Steps to reproduce the issue
Subscriber
#include <zmq.hpp>
#include <zmq_addon.hpp>
#include <string>
#include <iostream>
#include <thread>
#include <cstdlib>
#include <chrono>
#ifdef _WIN32
#include <Windows.h>
#elif __linux__
#include <assert.h>
#include <unistd.h>
#endif
using namespace std;
int main(int argc, char *argv[])
{
try
{
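if (argc < 2) {
// argv[1] (the endpoint) is required; a bare `throw;` with no active exception calls std::terminate
cerr << "usage: zmqSub <endpoint>" << endl;
return 1;
}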
string receiveHost = argv[1];
string tag = "MasterData";
cout << "zmqSubNorm" << receiveHost << " " << tag << endl;
zmq::context_t ctx(1);
try
{
zmq::socket_t subscriber(ctx, ZMQ_SUB);
cout << "sub zmq_connect " << endl;
subscriber.connect(receiveHost.c_str());
cout << "zmq_connect, ok" << endl;
subscriber.set(zmq::sockopt::subscribe, tag.c_str());
cout << "zmq_setsockopt, ok" << endl;
for (;;)
{
cout << "\nSUB: waiting for message" << endl;
char message[256];
auto nbytes = zmq_recv(subscriber,
message,
256,
0);
assert(nbytes != -1);
cout << "SUB-" << receiveHost << "\nReceived: " << nbytes << " bytes\n"
<< string(std::begin(message), std::begin(message) + nbytes) << endl;
}
subscriber.close();
ctx.close();
}
catch (...)
{
// zmq_ctx_destroy(&ctx) would pass the address of the C++ wrapper, not the context handle
ctx.close();
throw;
}
cout << "Done" << endl;
}
catch (zmq::error_t &e)
{
printf("zmq error: %s\n", e.what());
}
catch (std::exception &e)
{
printf("exception error: %s\n", e.what());
}
catch (...)
{
printf("rest error");
}
return 0;
}
Publisher
#include <zmq.hpp>
#include <zmq_addon.hpp>
#include <string>
#include <iostream>
#include <thread>
#include <cstdlib>
#include <chrono>
#ifdef _WIN32
#include <Windows.h>
#elif __linux__
#include <assert.h>
#include <unistd.h>
#endif
using namespace std;
int main(int argc, char *argv[])
{
try
{
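if (argc < 2)
{
// argv[1] (the endpoint) is required; a bare `throw;` with no active exception calls std::terminate
cerr << "usage: zmqPub <endpoint>" << endl;
return 1;
}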
string sendHost = argv[1];
string tag = "MasterData";
cout << "PUB-ssh134: " << sendHost << " " << tag << endl;
string sentMessage = "Hello from -ssh134-";
string fullMessage = tag + sentMessage;
zmq::context_t context;
try
{
zmq::socket_t publisher(context, ZMQ_PUB);
publisher.bind(sendHost);
cout << "zmq_bind, ok" << endl;
int tick = 0;
for (int i = 0; i < 5; i++)
{
std::this_thread::sleep_for(1s);
auto toSend = fullMessage + std::to_string(i);
cout << "PUB-ssh134 " << sendHost << " rep:" << tick++ << endl;
auto result = publisher.send(zmq::message_t(toSend), zmq::send_flags::none);
cout << "Bytes sent:" << toSend.size() << "/" << result.value() << endl;
}
publisher.close();
context.close();
}
catch (...)
{
// zmq_ctx_destroy(&context) would pass the address of the C++ wrapper, not the context handle
context.close();
throw;
}
cout << "Done" << endl;
}
catch (zmq::error_t &e)
{
printf("zmq error: %s\n", e.what());
}
catch (std::exception &e)
{
printf("exception error: %s\n", e.what());
}
catch (...)
{
printf("rest error");
}
return 0;
}
Issue 1: Change the code above to send 1000+ messages and sleep for 20 ms between sends.
Terminal 1: ./zmqPub 'norm://1,interface1;224.0.6.152:8573'
After terminal 1 has started, in a second terminal: ./zmqSub 'norm://2,interface1;224.0.6.152:8573'
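To tell "nothing arrives" apart from "arrives late" or "arrives with gaps" in Issue 1, the counter that the publisher appends to each message can be tracked on the subscriber side. This is only a sketch, assuming payloads shaped like "MasterDataHello from -ssh134-<i>" as in the publisher above and in-order delivery. `trailing_counter` and `GapTracker` are hypothetical helpers, not part of libzmq or cppzmq.

```cpp
#include <cstddef>
#include <optional>
#include <string>

// Returns the decimal counter at the end of a payload such as
// "MasterDataHello from -ssh134-17", or std::nullopt if there is none.
std::optional<long> trailing_counter(const std::string &payload)
{
    std::size_t pos = payload.size();
    while (pos > 0 && payload[pos - 1] >= '0' && payload[pos - 1] <= '9')
        --pos;
    const std::size_t digits = payload.size() - pos;
    if (digits == 0 || digits > 9)  // no counter, or too long to be one
        return std::nullopt;
    return std::stol(payload.substr(pos));
}

// Tracks which counters a subscriber has seen.
// Assumption: messages arrive in increasing order; duplicates or
// reordered messages are counted as received but do not reduce `missing`.
struct GapTracker {
    std::optional<long> first;  // first counter observed
    long last = -1;             // highest counter observed so far
    long received = 0;          // number of messages observed
    long missing = 0;           // counters skipped between first and last

    void observe(long counter)
    {
        ++received;
        if (!first) {
            first = counter;
            last = counter;
            return;
        }
        if (counter > last) {
            missing += counter - last - 1;
            last = counter;
        }
    }
};
```

In the subscriber loop, each received payload would be passed through `trailing_counter` and then `observe`. A `first` of 0 would suggest the backlog was replayed from the start, a larger `first` would suggest the subscriber joined mid-stream, and `received` staying at 0 matches the behaviour described in Issue 1.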
Issue 2:
Terminal 1: ./zmqSub 'norm://1,interface1;224.0.6.152:8573'
In a new terminal: ./zmqPub 'norm://2,interface1;224.0.6.153:8573'
What's the actual result? (include assertion message & call stack if applicable)
Issue 1: messages are never received
Issue 2: messages are received
What's the expected result?
Issue 1: messages are received
Issue 2: no messages are received
@bebopagogo I believe you created the norm implementation, any ideas?
Any update on this issue?
For what it's worth, I cannot reproduce these issues with pyzmq using the latest libzmq and NORM code. Your second issue does not happen for me at all using pyzmq (I have not tested with cppzmq).
Regarding your first issue, if the pub is started before the sub, NORM's reliable transport will try to catch up on all messages (as far back as the publisher is caching), and it will try to deliver them in order. So a newly sent message will need to wait for all the old messages to be delivered before it will show up. That may take some time depending on the size of the buffer, the amount of old messages, and available link bandwidth. This seems to work for me on pyzmq using the latest libzmq/NORM, but it can take many seconds to catch up on the old messages. The buffer size is configurable in the current development snapshot of libzmq (#4541) so making that smaller may help to limit that effect.
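As a rough illustration of the catch-up delay described above, the wait can be bounded by dividing the cached backlog by the link rate. This is a back-of-the-envelope sketch, not a model of NORM itself: it assumes the publisher caches at most `buffer_bytes`, that the backlog is resent at the full link rate, and it ignores NORM's rate control, FEC overhead, and NACK timing. The function name and all parameters are hypothetical.

```cpp
#include <algorithm>
#include <cstdint>
#include <limits>

// Estimates how long a late subscriber waits before new messages show up,
// given in-order delivery of everything the publisher still has cached.
// Assumption: backlog = min(cached data, buffer size), sent at link rate.
double estimated_catch_up_seconds(std::uint64_t cached_messages,
                                  std::uint64_t message_bytes,
                                  std::uint64_t buffer_bytes,
                                  double link_bytes_per_second)
{
    if (link_bytes_per_second <= 0.0)
        return std::numeric_limits<double>::infinity();
    // The publisher cannot replay more than its buffer holds.
    const std::uint64_t backlog =
        std::min(cached_messages * message_bytes, buffer_bytes);
    return static_cast<double>(backlog) / link_bytes_per_second;
}
```

For example, 1000 cached messages of 30 bytes each with a 1,000,000-byte buffer on a 10,000 bytes/s link gives an estimate of 3 seconds. Halving the buffer below the backlog size halves the estimate, which is the effect a smaller buffer setting would be expected to have.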
@wouterz - Does the code in the development branch that @weston-nrl mentions here still have this issue for you? If so, I can look into it. I missed this message thread (last year, sorry!).
Thanks for the replies. However, I no longer work on this project and am not sure of its current status. Additionally, since it does not seem to be reproducible for weston-nrl, feel free to close this issue.