Messages published once per second arrive at an inconsistent interval (1110 ms)
Hello everyone,
I am running into a strange issue when publishing messages at a one-second interval. We need to send data to an MQTT broker every second, but we cannot hold that interval: consecutive messages are about 1110 ms apart instead of 1000 ms, as seen in the message history in MQTT Explorer.
I looked for a similar problem in the existing GitHub issues and found this one: link to the issue. Based on it, I tried enabling the set_tcp_nodelay option in the config.mk file of my broker, but it had no effect.
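For readers unfamiliar with the option: as far as I understand it, set_tcp_nodelay asks the broker to set the TCP_NODELAY socket option on client connections, which disables Nagle's algorithm. The sketch below only illustrates that socket-level option on a plain POSIX socket. It is not Mosquitto's code, and the helper names `enable_tcp_nodelay` and `tcp_nodelay_enabled` are made up for this example.

```c
#include <assert.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical helper: disable Nagle's algorithm on a TCP socket.
 * Returns 0 on success, -1 on error (errno is set by setsockopt). */
int enable_tcp_nodelay(int fd)
{
    int flag = 1;
    assert(fd >= 0);
    return setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));
}

/* Hypothetical helper: read the option back.
 * Returns 1 if enabled, 0 if disabled, -1 on error. */
int tcp_nodelay_enabled(int fd)
{
    int flag = 0;
    socklen_t len = sizeof(flag);
    assert(fd >= 0);
    if (getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
        return -1;
    return flag != 0;
}
```

Calling `enable_tcp_nodelay(fd)` on a socket created with `socket(AF_INET, SOCK_STREAM, 0)` and then checking `tcp_nodelay_enabled(fd)` is a quick way to confirm that the option actually took effect on a given connection.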
Third test with 1000 messages (set_tcp_nodelay=true) (average interval between each message):
| Wireshark (ms) | MOSQUITTO Program (ms) | Processing Program (ms) |
|---|---|---|
| 1110.45213 | 1000.05901 | 0.02202 |
Fourth test with 1000 messages (set_tcp_nodelay=false) (average interval between each message):
| Wireshark (ms) | MOSQUITTO Program (ms) | Processing Program (ms) |
|---|---|---|
| 1110.43184 | 1000.07039 | 0.02803 |
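To make the numbers in the two tables easier to compare, here is a minimal sketch of how an average interval can be derived from a list of timestamps, and how the gap between the Wireshark average and the program average is obtained. The function names `mean_interval` and `wire_excess` are made up for illustration, and the sketch assumes the timestamps are already in milliseconds and sorted.

```c
#include <assert.h>
#include <stddef.h>

/* Average gap between consecutive timestamps, in the unit of the input.
 * Requires at least two samples. */
double mean_interval(const double *timestamps, size_t count)
{
    assert(count >= 2);
    /* The sum of consecutive differences telescopes to last - first. */
    return (timestamps[count - 1] - timestamps[0]) / (double)(count - 1);
}

/* Extra delay seen on the wire compared with the sender's own clock. */
double wire_excess(double wire_mean_ms, double program_mean_ms)
{
    return wire_mean_ms - program_mean_ms;
}
```

With the averages reported above, `wire_excess(1110.45213, 1000.05901)` gives about 110.39 ms for the third test and `wire_excess(1110.43184, 1000.07039)` gives about 110.36 ms for the fourth, so the gap is nearly identical with and without set_tcp_nodelay.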
Here is my C code.
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <time.h>
#include <sys/time.h>
#include <unistd.h>
#include <assert.h>
#include <mosquitto.h>
#include <client/client_shared.h>
#include <errno.h>
void my_connect_callback(struct mosquitto *mosq, void *userdata, int result)
{
// printf("%d\n", result);
if (!result)
{
fprintf(stderr, "Connect ok\n");
// mosquitto_subscribe(mosq, NULL, "#", 0);
}
else
{
fprintf(stderr, "Connect failed\n");
}
}
void my_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str)
{
printf("%s\n", str);
}
void sleepMilliseconds(unsigned long milliseconds)
{
struct timespec ts;
ts.tv_sec = milliseconds / 1000;
ts.tv_nsec = (milliseconds % 1000) * 1000000;
nanosleep(&ts, NULL);
}
void print_usage(void)
{
int major, minor, revision;
mosquitto_lib_version(&major, &minor, &revision);
printf("mosquitto_sub is a simple mqtt client that will subscribe to a single topic and print all messages it receives.\n");
printf("mosquitto_sub running on libmosquitto %d.%d.%d.\n\n", /*VERSION,*/ major, minor, revision);
printf("Usage: mosquitto_sub [-c] [-h host] [-k keepalive] [-p port] [-q qos] [-R] -t topic ...\n");
printf(" [-C msg_count] [-T filter_out]\n");
#ifdef WITH_SRV
printf(" [-A bind_address] [-S]\n");
#else
printf(" [-A bind_address]\n");
#endif
printf(" [-i id] [-I id_prefix]\n");
printf(" [-d] [-N] [--quiet] [-v]\n");
printf(" [-u username [-P password]]\n");
printf(" [--will-topic [--will-payload payload] [--will-qos qos] [--will-retain]]\n");
#ifdef WITH_TLS
printf(" [{--cafile file | --capath dir} [--cert file] [--key file]\n");
printf(" [--ciphers ciphers] [--insecure]]\n");
#ifdef WITH_TLS_PSK
printf(" [--psk hex-key --psk-identity identity [--ciphers ciphers]]\n");
#endif
#endif
#ifdef WITH_SOCKS
printf(" [--proxy socks-url]\n");
#endif
printf(" mosquitto_sub --help\n\n");
printf(" -A : bind the outgoing socket to this host/ip address. Use to control which interface\n");
printf(" the client communicates over.\n");
printf(" -c : disable 'clean session' (store subscription and pending messages when client disconnects).\n");
printf(" -C : disconnect and exit after receiving the 'msg_count' messages.\n");
printf(" -d : enable debug messages.\n");
printf(" -h : mqtt host to connect to. Defaults to localhost.\n");
printf(" -i : id to use for this client. Defaults to mosquitto_sub_ appended with the process id.\n");
printf(" -I : define the client id as id_prefix appended with the process id. Useful for when the\n");
printf(" broker is using the clientid_prefixes option.\n");
printf(" -k : keep alive in seconds for this client. Defaults to 60.\n");
printf(" -N : do not add an end of line character when printing the payload.\n");
printf(" -p : network port to connect to. Defaults to 1883.\n");
printf(" -P : provide a password (requires MQTT 3.1 broker)\n");
printf(" -q : quality of service level to use for the subscription. Defaults to 0.\n");
printf(" -R : do not print stale messages (those with retain set).\n");
#ifdef WITH_SRV
printf(" -S : use SRV lookups to determine which host to connect to.\n");
#endif
printf(" -t : mqtt topic to subscribe to. May be repeated multiple times.\n");
printf(" -T : topic string to filter out of results. May be repeated.\n");
printf(" -u : provide a username (requires MQTT 3.1 broker)\n");
printf(" -v : print published messages verbosely.\n");
printf(" -V : specify the version of the MQTT protocol to use when connecting.\n");
printf(" Can be mqttv31 or mqttv311. Defaults to mqttv31.\n");
printf(" --help : display this message.\n");
printf(" --quiet : don't print error messages.\n");
printf(" --will-payload : payload for the client Will, which is sent by the broker in case of\n");
printf(" unexpected disconnection. If not given and will-topic is set, a zero\n");
printf(" length message will be sent.\n");
printf(" --will-qos : QoS level for the client Will.\n");
printf(" --will-retain : if given, make the client Will retained.\n");
printf(" --will-topic : the topic on which to publish the client Will.\n");
#ifdef WITH_TLS
printf(" --cafile : path to a file containing trusted CA certificates to enable encrypted\n");
printf(" certificate based communication.\n");
printf(" --capath : path to a directory containing trusted CA certificates to enable encrypted\n");
printf(" communication.\n");
printf(" --cert : client certificate for authentication, if required by server.\n");
printf(" --key : client private key for authentication, if required by server.\n");
printf(" --ciphers : openssl compatible list of TLS ciphers to support.\n");
printf(" --tls-version : TLS protocol version, can be one of tlsv1.2 tlsv1.1 or tlsv1.\n");
printf(" Defaults to tlsv1.2 if available.\n");
printf(" --insecure : do not check that the server certificate hostname matches the remote\n");
printf(" hostname. Using this option means that you cannot be sure that the\n");
printf(" remote host is the server you wish to connect to and so is insecure.\n");
printf(" Do not use this option in a production environment.\n");
#ifdef WITH_TLS_PSK
printf(" --psk : pre-shared-key in hexadecimal (no leading 0x) to enable TLS-PSK mode.\n");
printf(" --psk-identity : client identity string for TLS-PSK mode.\n");
#endif
#endif
#ifdef WITH_SOCKS
printf(" --proxy : SOCKS5 proxy URL of the form:\n");
printf(" socks5h://[username[:password]@]hostname[:port]\n");
printf(" Only \"none\" and \"username\" authentication is supported.\n");
#endif
printf("\nSee http://mosquitto.org/ for more information.\n\n");
}
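/* Print the current wall-clock time in milliseconds, keeping the sub-millisecond part. */
void timestamp(void)
{
struct timeval tv;
gettimeofday(&tv, NULL);
/* Use floating-point constants so the division does not truncate to whole milliseconds. */
double time_in_mill = tv.tv_sec * 1000.0 + tv.tv_usec / 1000.0;
printf("%f\n", time_in_mill);
}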
int main(int argc, char *argv[])
{
int rc, i;
/* Mosquitto variables */
char topic[20];
bool clean_session = true;
struct mosquitto *mosq = NULL;
struct mosq_config cfg;
rc = client_config_load(&cfg, CLIENT_SUB, argc, argv);
if (rc)
{
client_config_cleanup(&cfg);
if (rc == 2)
{
/* --help */
print_usage();
}
else
{
fprintf(stderr, "\nUse 'mosquitto_sub --help' to see usage.\n");
}
return 1;
}
mosquitto_lib_init();
if (client_id_generate(&cfg))
{
return 1;
}
mosq = mosquitto_new(cfg.id, true, &cfg);
if (!mosq)
{
switch (errno)
{
case ENOMEM:
if (!cfg.quiet)
fprintf(stderr, "Error: Out of memory.\n");
break;
case EINVAL:
if (!cfg.quiet)
fprintf(stderr, "Error: Invalid id and/or clean_session.\n");
break;
}
mosquitto_lib_cleanup();
return 1;
}
if (client_opts_set(mosq, &cfg))
{
return 1;
}
if (cfg.debug)
{
mosquitto_log_callback_set(mosq, my_log_callback);
}
mosquitto_connect_callback_set(mosq, my_connect_callback);
/* No subscribe callback is registered: my_subscribe_callback is not defined, and this program only publishes. */
rc = client_connect(mosq, &cfg);
if (rc)
return rc;
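/* Bounded copy: topic[] holds at most 19 characters plus the terminator. */
snprintf(topic, sizeof(topic), "%s", cfg.topics[0]);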
mosquitto_loop_start(mosq);
for (i = 0; i < 1000; i++)
{
timestamp();
mosquitto_publish(mosq, NULL, topic, strlen(topic), topic, 0, 0);
timestamp();
sleepMilliseconds(1000);
}
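/* Disconnect, stop the network thread, and release library resources before exiting. */
mosquitto_disconnect(mosq);
mosquitto_loop_stop(mosq, false);
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return 0;
}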
This is a basic program that prints a timestamp, sends the message "test" to the "/test" topic, prints a second timestamp, and then sleeps for 1000 ms. To complete my tests, I also captured the network traffic with Wireshark using the filter tcp.port == 1883.
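As a side note on the timing loop itself: a fixed 1000 ms sleep after each publish adds the time spent publishing to every period. A minimal sketch of a loop that sleeps until absolute deadlines instead is shown below. It assumes a Linux/POSIX system with `clock_nanosleep`, and the names `timespec_add_ms` and `run_periodic` are made up for this example. Since the program-side interval already measures about 1000.06 ms, this is not expected to explain the extra delay seen in Wireshark by itself.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <time.h>

/* Advance *t by ms milliseconds, keeping tv_nsec within [0, 1e9). */
void timespec_add_ms(struct timespec *t, long ms)
{
    assert(ms >= 0);
    t->tv_sec += ms / 1000;
    t->tv_nsec += (ms % 1000) * 1000000L;
    if (t->tv_nsec >= 1000000000L) {
        t->tv_sec += 1;
        t->tv_nsec -= 1000000000L;
    }
}

/* Call tick(arg) `count` times, one call per period_ms. Deadlines are
 * absolute, so time spent inside tick() does not accumulate as drift.
 * Returns 0 on success, -1 if the clock or the sleep fails. */
int run_periodic(int count, long period_ms, void (*tick)(void *), void *arg)
{
    struct timespec next;
    if (clock_gettime(CLOCK_MONOTONIC, &next) != 0)
        return -1;
    for (int i = 0; i < count; i++) {
        tick(arg);
        timespec_add_ms(&next, period_ms);
        /* Sleep until the deadline; retry if a signal interrupts the sleep. */
        int rc;
        do {
            rc = clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &next, NULL);
        } while (rc == EINTR);
        if (rc != 0)
            return -1;
    }
    return 0;
}
```

In the program above, the `tick` callback would wrap the `timestamp()` and `mosquitto_publish()` calls, with `count` set to 1000 and `period_ms` set to 1000.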
3rd test:
On the second image, please disregard the 4585; it is my machine resetting its clock.
4th test:
Ping times to the machine are good, and the issue persists even when I publish to a broker running locally on the machine with the IP address 192.6.1.50.
Do you have any idea where these extra 110 ms come from? We have tried other technologies (Node.js and Node-RED) and do not see the issue with them; the messages are consistently sent approximately every 1000 ms.
I'm not sure if you need more information or configuration files; I will try to provide them as soon as possible.
Thanks to everyone.
hh,you can fix that