mosquitto icon indicating copy to clipboard operation
mosquitto copied to clipboard

Inconsistent message latency from client to client

Open DocDriven opened this issue 3 years ago • 1 comments

I am using mosquitto 2.0.14-0 on my host system. Configuration is default. To evaluate transmission time (latency) between two clients, I wrote one sender which sends the current timestamp as message content. The sender publishes a number of messages (e.g. 1 k) utilizing a loop. The receiver takes a timestamp during arrival and outputs the difference (e.g. the transmission time)

I get inconsistent results, even with QoS 0. That means a number of things in particular:

1.) the average transmission times vary between runs. I do the test once, I get an average of ~40 ms. Immediately afterwards, re-doing the experiment yields an average of ~0.5 ms.

2.) the individual transmission times are very inconsistent when regarded in chronological order. Sometimes, there is a slight increase, sometimes big steps are taken from a low latency to a higher latency. For instance, this is an excerpt from a run:

Transmission time: 0.341849
Transmission time: 0.333402
Transmission time: 0.330654
Transmission time: 0.331768
Transmission time: 0.328704
Transmission time: 0.328263
Transmission time: 0.324812
Transmission time: 0.326584
Transmission time: 0.327864
Transmission time: 41.7938
Transmission time: 41.8305
Transmission time: 41.8376
Transmission time: 41.8308
Transmission time: 41.8342
Transmission time: 41.8378
Transmission time: 41.8425
Transmission time: 41.8433

3.) Out of curiosity, I re-ran the exact same tests with an online test broker hosted by hive-mq (broker.hivemq.com). Not only is this as fast (if not a bit faster than the local broker), but the latency is a lot more stable. Shouldn't the exact opposite be the case with everything being more unpredictable when communicating with a remote broker?

Is there an explanation for this strange behavior? Do I have to configure the broker in a specific way?

Sender source code

#include <iostream>
#include <cstdlib>
#include <string>
#include <cstring>
#include <cctype>
#include <thread>
#include <chrono>
#include <atomic>
#include "mqtt/async_client.h"

//const std::string SERVER_ADDRESS("tcp://localhost:1883");
const std::string SERVER_ADDRESS("tcp://broker.hivemq.com:1883");
const std::string CLIENT_ID("client1");
const std::string TOPIC("thisisstrange");
const int QOS(0);
const int N_MSG(1000);

int main(int argc, char* argv[])
{
    mqtt::async_client cli(SERVER_ADDRESS, CLIENT_ID);

    mqtt::connect_options connOpts;
    connOpts.set_clean_session(true);

    // Connect
    try {
        cli.connect(connOpts);
    }
    catch (const mqtt::exception& exc) {
        std::cerr << "\nERROR: Unable to connect to MQTT server: '"
            << SERVER_ADDRESS << "'" << exc << std::endl;
        return 1;
    }

    // Press 'm' to send messages or 'q' to exit
    while (true)
    {
        char c = std::tolower(std::cin.get());

        if (c == 'q') {
            break;
        }

        else if (c == 'm')
        {
            for (int i = 0; i < N_MSG; ++i)
            {
                const auto ts = std::chrono::system_clock::now().time_since_epoch() / std::chrono::nanoseconds(1);
                mqtt::message_ptr pubmsg = mqtt::make_message(TOPIC, std::to_string(ts));
                pubmsg->set_qos(QOS);
                auto tok = cli.publish(pubmsg);
            }
        }
    }

    // Disconnect
    try {
        cli.disconnect()->wait();
        std::cout << "OK" << std::endl;
    }
    catch (const mqtt::exception& exc) {
        std::cerr << exc << std::endl;
        return 1;
    }

    return 0;
}

Receiver source code

#include <iostream>
#include <cstdlib>
#include <string>
#include <cstring>
#include <cctype>
#include <thread>
#include <chrono>
#include <atomic>
#include "mqtt/async_client.h"

//const std::string SERVER_ADDRESS("tcp://localhost:1883");
const std::string SERVER_ADDRESS("tcp://broker.hivemq.com:1883");
const std::string CLIENT_ID("client2");
const std::string TOPIC("thisisstrange");
const int QOS(0);

class Callback
    : public virtual mqtt::callback
{
    // the mqtt client
    mqtt::async_client& cli_;

    void connected(const std::string& cause) override
    {
        cli_.subscribe(TOPIC, QOS);
    }

    void message_arrived(mqtt::const_message_ptr msg) override
    {
        uint64_t recv_ts = std::chrono::system_clock::now().time_since_epoch() / std::chrono::nanoseconds(1);
        uint64_t send_ts = static_cast<uint64_t>(std::stoull(msg->get_payload()));
        std::cout << "Transmission time: " << (recv_ts - send_ts) / 1e6 << std::endl;
    }

public:
    Callback(mqtt::async_client& cli)
        : cli_(cli) {}
};

int main(int argc, char* argv[])
{
    mqtt::async_client cli(SERVER_ADDRESS, CLIENT_ID);

    mqtt::connect_options connOpts;
    connOpts.set_clean_session(true);

    Callback cb(cli);
    cli.set_callback(cb);

    // Connect
    try {
        cli.connect(connOpts);
    }
    catch (const mqtt::exception& exc) {
        std::cerr << "\nERROR: Unable to connect to MQTT server: '"
            << SERVER_ADDRESS << "'" << exc << std::endl;
        return 1;
    }

    while (std::tolower(std::cin.get()) != 'q')
        ;

    // Disconnect
    try {
        cli.disconnect()->wait();
        std::cout << "OK" << std::endl;
    }
    catch (const mqtt::exception& exc) {
        std::cerr << exc << std::endl;
        return 1;
    }

    return 0;
}

DocDriven avatar May 17 '22 19:05 DocDriven

40ms sounds very much like Nagle's algorithm. Could you please try adding set_tcp_nodelay true to your config file and looking at your tests again?

ralight avatar May 23 '22 22:05 ralight