
Pub/Sub with NORM issues

Open • wouterz opened this issue on Jan 13 '22 • 5 comments

I have been experimenting with the norm_engine through cppzmq and can get multiple machines communicating, but I ran into a couple of issues:

  1. The SUB needs to be started before the PUB; otherwise none of the messages are received. Closing the PUB partway through its transmission and restarting it results in all messages being received. Pub/Sub is asynchronous, and the 0MQ docs state that some messages will be dropped if the SUB is started later, but it never seems to recover (1000+ messages over 10+ seconds).

  2. If I use two publishers on the same interface but with different multicast addresses (e.g. norm://1,interface1;224.1.2.152:5555 and norm://2,interface1;224.1.2.153:5555), a subscriber listening on norm://interface1;224.1.2.152:5555 receives from BOTH publishers. Surprisingly, this doesn't seem to happen with any other addresses I have tried (e.g. publishers on x.x.x.151 and x.x.x.152 with a subscriber on .152 or .151).
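Both issues use NORM endpoints such as norm://1,interface1;224.1.2.152:5555. Reading those strings, the syntax appears to be norm://[nodeId,][interface;]group:port. The sketch below splits an endpoint into those fields under that assumption; NormEndpoint and parseNormEndpoint are hypothetical names for illustration, not libzmq API, and libzmq's own parser may handle edge cases differently. It needs C++17.

```cpp
#include <optional>
#include <string>

// Hypothetical helper, not part of libzmq: splits an endpoint of the assumed
// form norm://[nodeId,][iface;]group:port (as used in this thread) into fields.
struct NormEndpoint {
    std::string nodeId;  // empty if the "<nodeId>," prefix is omitted
    std::string iface;   // empty if the "<iface>;" prefix is omitted
    std::string group;   // multicast group address
    std::string port;
};

std::optional<NormEndpoint> parseNormEndpoint(const std::string &uri)
{
    const std::string scheme = "norm://";
    if (uri.compare(0, scheme.size(), scheme) != 0)
        return std::nullopt;
    std::string rest = uri.substr(scheme.size());

    NormEndpoint ep;
    // Optional "<nodeId>," prefix.
    if (auto comma = rest.find(','); comma != std::string::npos) {
        ep.nodeId = rest.substr(0, comma);
        rest = rest.substr(comma + 1);
    }
    // Optional "<iface>;" prefix.
    if (auto semi = rest.find(';'); semi != std::string::npos) {
        ep.iface = rest.substr(0, semi);
        rest = rest.substr(semi + 1);
    }
    // Remaining "<group>:<port>": split on the last colon.
    auto colon = rest.rfind(':');
    if (colon == std::string::npos || colon == 0 || colon + 1 == rest.size())
        return std::nullopt;
    ep.group = rest.substr(0, colon);
    ep.port = rest.substr(colon + 1);
    return ep;
}
```

Applied to the two publisher endpoints from issue 2, it reports the same interface (interface1) and port (5555) for both; only the node ID and the multicast group (224.1.2.152 vs 224.1.2.153) differ, which is why a subscriber on the .152 group would not be expected to see the .153 publisher.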

Environment

  • libzmq version (commit hash if unreleased): 4.3.4
  • OS: Linux

Minimal test code / Steps to reproduce the issue

Subscriber


#include <zmq.hpp>
#include <zmq_addon.hpp>
#include <string>
#include <iostream>
#include <cassert>
#include <cstdio>

using namespace std;

int main(int argc, char *argv[])
{
    try
    {
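        // argv[1] (the endpoint) is required; a bare `throw;` here would call
        // std::terminate because no exception is active.
        if (argc < 2) {
            cerr << "usage: " << argv[0] << " <endpoint>" << endl;
            return 1;
        }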
        string receiveHost = argv[1];
        string tag = "MasterData";

        cout << "zmqSubNorm " << receiveHost << " " << tag << endl;

        zmq::context_t ctx(1);

        try
        {
            zmq::socket_t subscriber(ctx, ZMQ_SUB);
            cout << "sub zmq_connect " << endl;
            subscriber.connect(receiveHost.c_str());
            cout << "zmq_connect, ok" << endl;
            subscriber.set(zmq::sockopt::subscribe, tag.c_str());
            cout << "zmq_setsockopt, ok" << endl;

            for (;;)
            {
                cout << "\nSUB: waiting for message" << endl;
                // Receive into a message_t so messages longer than a fixed
                // 256-byte buffer are neither truncated nor over-read.
                zmq::message_t message;
                auto result = subscriber.recv(message, zmq::recv_flags::none);
                assert(result.has_value());
                cout << "SUB-" << receiveHost << "\nReceived: " << message.size() << " bytes\n"
                     << message.to_string() << endl;
            }

            subscriber.close();
            ctx.close();
        }
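        catch (...)
        {
            // socket_t and context_t release their libzmq handles in their
            // destructors; calling zmq_ctx_destroy(&ctx) on the C++ wrapper
            // is invalid, so just rethrow to the outer handlers.
            throw;
        }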
        cout << "Done" << endl;
    }
    catch (zmq::error_t &e)
    {
        printf("zmq error: %s\n", e.what());
    }
    catch (std::exception &e)
    {
        printf("exception error: %s\n", e.what());
    }
    catch (...)
    {
        printf("unknown error\n");
    }

    return 0;
}

Publisher

#include <zmq.hpp>
#include <zmq_addon.hpp>
#include <string>
#include <iostream>
#include <thread>
#include <chrono>
#include <cstdio>

using namespace std;

int main(int argc, char *argv[])
{
    try
    {
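        // argv[1] (the endpoint) is required; a bare `throw;` here would call
        // std::terminate because no exception is active.
        if (argc < 2)
        {
            cerr << "usage: " << argv[0] << " <endpoint>" << endl;
            return 1;
        }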
        string sendHost = argv[1];
        string tag = "MasterData";
        cout << "PUB-ssh134: " << sendHost << " " << tag << endl;
        string sentMessage = "Hello from -ssh134-";
        string fullMessage = tag + sentMessage;

        zmq::context_t context;

        try
        {
            zmq::socket_t publisher(context, ZMQ_PUB);
            publisher.bind(sendHost);
            cout << "zmq_bind, ok" << endl;

            int tick = 0;
            for (int i = 0; i < 5; i++)
            {
                std::this_thread::sleep_for(1s);
                auto toSend = fullMessage + std::to_string(i);
                cout << "PUB-ssh134 " << sendHost << "  rep:" << tick++ << endl;
                auto result = publisher.send(zmq::message_t(toSend), zmq::send_flags::none);
                cout << "Bytes sent: " << toSend.size() << "/" << result.value() << endl;
            }

            publisher.close();
            context.close();
        }
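        catch (...)
        {
            // socket_t and context_t release their libzmq handles in their
            // destructors; calling zmq_ctx_destroy(&context) on the C++
            // wrapper is invalid, so just rethrow to the outer handlers.
            throw;
        }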

        cout << "Done" << endl;
    }
    catch (zmq::error_t &e)
    {
        printf("zmq error: %s\n", e.what());
    }
    catch (std::exception &e)
    {
        printf("exception error: %s\n", e.what());
    }
    catch (...)
    {
        printf("unknown error\n");
    }

    return 0;
}

Issue 1: change the publisher code above to send 1000+ messages with a 20 ms sleep between them, then:

  • terminal 1: ./zmqPub 'norm://1,interface1;224.0.6.152:8573'
  • once the publisher is running, terminal 2: ./zmqSub 'norm://2,interface1;224.0.6.152:8573'

Issue 2:

  • terminal 1: ./zmqSub 'norm://1,interface1;224.0.6.152:8573'
  • terminal 2: ./zmqPub 'norm://2,interface1;224.0.6.153:8573'
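The issue 1 repro changes the publisher loop from 5 sends at 1 s intervals to 1000+ sends at 20 ms intervals. A minimal sketch of that loop with the count and interval as parameters is below; publishSeries is a hypothetical helper, and the transport is passed in as a callback (an assumption made here so the loop can be exercised without a live NORM socket).

```cpp
#include <chrono>
#include <cstddef>
#include <functional>
#include <string>
#include <thread>

// Hypothetical helper for the issue 1 repro: publishes `count` messages named
// prefix + index, sleeping `interval` before each send. The transport is a
// callback so the same loop can drive a cppzmq socket or a test stub.
// Returns how many sends the callback reported as successful.
std::size_t publishSeries(std::size_t count,
                          std::chrono::milliseconds interval,
                          const std::string &prefix,
                          const std::function<bool(const std::string &)> &send)
{
    std::size_t sent = 0;
    for (std::size_t i = 0; i < count; ++i) {
        std::this_thread::sleep_for(interval);
        if (send(prefix + std::to_string(i)))
            ++sent;
    }
    return sent;
}
```

With the publisher above, the callback could wrap publisher.send(zmq::message_t(m), zmq::send_flags::none) and return whether the result holds a value; calling it with a count of 1000, std::chrono::milliseconds(20), and fullMessage as the prefix matches the repro steps.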

What's the actual result? (include assertion message & call stack if applicable)

Issue 1: messages are never received

Issue 2: messages are received

What's the expected result?

Issue 1: messages are received

Issue 2: no messages are received

wouterz, Jan 13 '22

@bebopagogo I believe you created the NORM implementation; any ideas?

wouterz, Jan 13 '22

Any update on this issue?

muraliadiga, Mar 25 '22

For what it's worth, I cannot reproduce these issues with pyzmq using the latest libzmq and NORM code. Your second issue does not happen for me at all using pyzmq (I have not tested with cppzmq).

Regarding your first issue: if the pub is started before the sub, NORM's reliable transport will try to catch up on all messages (as far back as the publisher is caching) and deliver them in order. A newly sent message therefore has to wait for all the old messages to be delivered before it shows up. That may take some time, depending on the buffer size, the number of old messages, and the available link bandwidth. This seems to work for me with pyzmq using the latest libzmq/NORM, but it can take many seconds to catch up on the old messages. The buffer size is configurable in the current development snapshot of libzmq (#4541), so making it smaller may help limit that effect.
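The catch-up delay described above can be estimated with simple arithmetic: the cached backlog, capped at what the sender still buffers, divided by the spare link bandwidth. The sketch below is a back-of-envelope model under stated assumptions (in-order delivery, no protocol or FEC overhead, constant spare bandwidth), not NORM's actual repair logic; catchUpDelaySeconds is a hypothetical name. It does not show the buffer-size option from #4541, since its exact name and units are not given in this thread.

```cpp
#include <algorithm>
#include <cstddef>

// Back-of-envelope model (an assumption, not NORM's actual repair logic):
// a late-joining receiver must get the sender's cached backlog in order
// before a freshly published message can be delivered. Only data still held
// in the sender's buffer can be repaired, so the backlog is capped at the
// buffer size. Protocol overhead and FEC are ignored.
// spareBitsPerSecond must be > 0.
double catchUpDelaySeconds(std::size_t backlogBytes,
                           std::size_t senderBufferBytes,
                           double spareBitsPerSecond)
{
    const std::size_t repairable = std::min(backlogBytes, senderBufferBytes);
    return static_cast<double>(repairable) * 8.0 / spareBitsPerSecond;
}
```

With hypothetical numbers, a 1,000,000-byte backlog over 1 Mbit/s of spare capacity gives an estimate of 8 s before the first new message appears; capping the buffer at 250,000 bytes drops the estimate to 2 s, which is the effect a smaller buffer is meant to have.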

weston-nrl, Jul 20 '23

@wouterz Does the code in the development branch that @weston-nrl mentions still show this issue for you? If so, I can look into it. (I missed this thread last year, sorry!)

bebopagogo, Oct 31 '23

Thanks for the replies. However, I no longer work on this project and am not sure of its current status. Since weston-nrl cannot reproduce the issues, feel free to close this one.

wouterz, Nov 03 '23