Inconsistent message latency from client to client
I am using mosquitto 2.0.14-0 on my host system. Configuration is default. To evaluate transmission time (latency) between two clients, I wrote one sender which sends the current timestamp as message content. The sender publishes a number of messages (e.g. 1 k) utilizing a loop. The receiver takes a timestamp during arrival and outputs the difference (e.g. the transmission time)
I get inconsistent results, even with QoS 0. That means a number of things in particular:
1.) the average transmission times vary between runs. I do the test once, I get an average of ~40 ms. Immediately afterwards, re-doing the experiment yields an average of ~0.5 ms.
2.) the individual transmission times are very inconsistent when regarded in chronological order. Sometimes, there is a slight increase, sometimes big steps are taken from a low latency to a higher latency. For instance, this is an excerpt from a run:
Transmission time: 0.341849
Transmission time: 0.333402
Transmission time: 0.330654
Transmission time: 0.331768
Transmission time: 0.328704
Transmission time: 0.328263
Transmission time: 0.324812
Transmission time: 0.326584
Transmission time: 0.327864
Transmission time: 41.7938
Transmission time: 41.8305
Transmission time: 41.8376
Transmission time: 41.8308
Transmission time: 41.8342
Transmission time: 41.8378
Transmission time: 41.8425
Transmission time: 41.8433
3.) Out of curiosity, I re-ran the exact same tests with an online test broker hosted by hive-mq (broker.hivemq.com). Not only is this as fast (if not a bit faster than the local broker), but the latency is a lot more stable. Shouldn't the exact opposite be the case with everything being more unpredictable when communicating with a remote broker?
Is there an explanation for this strange behavior? Do I have to configure the broker in a specific way?
Sender source code
#include <iostream>
#include <cstdlib>
#include <string>
#include <cstring>
#include <cctype>
#include <thread>
#include <chrono>
#include <atomic>
#include "mqtt/async_client.h"
//const std::string SERVER_ADDRESS("tcp://localhost:1883");
const std::string SERVER_ADDRESS("tcp://broker.hivemq.com:1883");
const std::string CLIENT_ID("client1");
const std::string TOPIC("thisisstrange");
const int QOS(0);
const int N_MSG(1000);
int main(int argc, char* argv[])
{
mqtt::async_client cli(SERVER_ADDRESS, CLIENT_ID);
mqtt::connect_options connOpts;
connOpts.set_clean_session(true);
// Connect
try {
cli.connect(connOpts);
}
catch (const mqtt::exception& exc) {
std::cerr << "\nERROR: Unable to connect to MQTT server: '"
<< SERVER_ADDRESS << "'" << exc << std::endl;
return 1;
}
// Press 'm' to send messages or 'q' to exit
while (true)
{
char c = std::tolower(std::cin.get());
if (c == 'q') {
break;
}
else if (c == 'm')
{
for (int i = 0; i < N_MSG; ++i)
{
const auto ts = std::chrono::system_clock::now().time_since_epoch() / std::chrono::nanoseconds(1);
mqtt::message_ptr pubmsg = mqtt::make_message(TOPIC, std::to_string(ts));
pubmsg->set_qos(QOS);
auto tok = cli.publish(pubmsg);
}
}
}
// Disconnect
try {
cli.disconnect()->wait();
std::cout << "OK" << std::endl;
}
catch (const mqtt::exception& exc) {
std::cerr << exc << std::endl;
return 1;
}
return 0;
}
Receiver source code
#include <iostream>
#include <cstdlib>
#include <string>
#include <cstring>
#include <cctype>
#include <thread>
#include <chrono>
#include <atomic>
#include "mqtt/async_client.h"
//const std::string SERVER_ADDRESS("tcp://localhost:1883");
const std::string SERVER_ADDRESS("tcp://broker.hivemq.com:1883");
const std::string CLIENT_ID("client2");
const std::string TOPIC("thisisstrange");
const int QOS(0);
class Callback
: public virtual mqtt::callback
{
// the mqtt client
mqtt::async_client& cli_;
void connected(const std::string& cause) override
{
cli_.subscribe(TOPIC, QOS);
}
void message_arrived(mqtt::const_message_ptr msg) override
{
uint64_t recv_ts = std::chrono::system_clock::now().time_since_epoch() / std::chrono::nanoseconds(1);
uint64_t send_ts = static_cast<uint64_t>(std::stoull(msg->get_payload()));
std::cout << "Transmission time: " << (recv_ts - send_ts) / 1e6 << std::endl;
}
public:
Callback(mqtt::async_client& cli)
: cli_(cli) {}
};
int main(int argc, char* argv[])
{
mqtt::async_client cli(SERVER_ADDRESS, CLIENT_ID);
mqtt::connect_options connOpts;
connOpts.set_clean_session(true);
Callback cb(cli);
cli.set_callback(cb);
// Connect
try {
cli.connect(connOpts);
}
catch (const mqtt::exception& exc) {
std::cerr << "\nERROR: Unable to connect to MQTT server: '"
<< SERVER_ADDRESS << "'" << exc << std::endl;
return 1;
}
while (std::tolower(std::cin.get()) != 'q')
;
// Disconnect
try {
cli.disconnect()->wait();
std::cout << "OK" << std::endl;
}
catch (const mqtt::exception& exc) {
std::cerr << exc << std::endl;
return 1;
}
return 0;
}
40ms sounds very much like Nagle's algorithm. Could you please try adding set_tcp_nodelay true to your config file and looking at your tests again?