mosquitto icon indicating copy to clipboard operation
mosquitto copied to clipboard

Inconsistent timing of messages clocked to the second (1110 ms)

Open PepeMax opened this issue 1 year ago • 1 comments

Hello everyone,

I am encountering a very strange issue when sending messages at a one-second interval. To elaborate, we need to send data every second to an MQTT broker, but we have noticed that we are unable to achieve this one-second interval; there is a deviation of 1110 ms between each message, a deviation observed through the message history in MQTT Explorer.

I tried to find a similar issue in the existing GitHub issues and found this one: link to the issue. I attempted to enable the set_tcp_nodelay option in the config.mk file of my broker, but it had no effect.

Third test with 1000 messages (set_tcp_nodelay=true) (average interval between each message):

Wireshark MOSQUITTO Program (ms) Processing Program (ms)
1110.45213 1000.05901 0.02202

Fourth test with 1000 messages (set_tcp_nodelay=false) (average interval between each message):

Wireshark MOSQUITTO Program (ms) Processing Program (ms)
1110.43184 1000.07039 0.02803

Here is my C code.

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <time.h>
#include <sys/time.h>
#include <unistd.h>
#include <assert.h>

#include <mosquitto.h>
#include <client/client_shared.h>
#include <errno.h>

void my_connect_callback(struct mosquitto *mosq, void *userdata, int result)
{
    int i;
    // printf("%d\n", result);
    if (!result)
    {
        fprintf(stderr, "Connect ok\n");
        // mosquitto_subscribe(mosq, NULL, "#", 0);
    }
    else
    {
        fprintf(stderr, "Connect failed\n");
    }
}


void my_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str)
{
    printf("%s\n", str);
}

void sleepMilliseconds(unsigned long milliseconds)
{
    struct timespec ts;
    ts.tv_sec = milliseconds / 1000;
    ts.tv_nsec = (milliseconds % 1000) * 1000000;
    nanosleep(&ts, NULL);
}

void print_usage(void)
{
    int major, minor, revision;

    mosquitto_lib_version(&major, &minor, &revision);
    printf("mosquitto_sub is a simple mqtt client that will subscribe to a single topic and print all messages it receives.\n");
    printf("mosquitto_sub running on libmosquitto %d.%d.%d.\n\n", /*VERSION,*/ major, minor, revision);
    printf("Usage: mosquitto_sub [-c] [-h host] [-k keepalive] [-p port] [-q qos] [-R] -t topic ...\n");
    printf("                     [-C msg_count] [-T filter_out]\n");
#ifdef WITH_SRV
    printf("                     [-A bind_address] [-S]\n");
#else
    printf("                     [-A bind_address]\n");
#endif
    printf("                     [-i id] [-I id_prefix]\n");
    printf("                     [-d] [-N] [--quiet] [-v]\n");
    printf("                     [-u username [-P password]]\n");
    printf("                     [--will-topic [--will-payload payload] [--will-qos qos] [--will-retain]]\n");
#ifdef WITH_TLS
    printf("                     [{--cafile file | --capath dir} [--cert file] [--key file]\n");
    printf("                      [--ciphers ciphers] [--insecure]]\n");
#ifdef WITH_TLS_PSK
    printf("                     [--psk hex-key --psk-identity identity [--ciphers ciphers]]\n");
#endif
#endif
#ifdef WITH_SOCKS
    printf("                     [--proxy socks-url]\n");
#endif
    printf("       mosquitto_sub --help\n\n");
    printf(" -A : bind the outgoing socket to this host/ip address. Use to control which interface\n");
    printf("      the client communicates over.\n");
    printf(" -c : disable 'clean session' (store subscription and pending messages when client disconnects).\n");
    printf(" -C : disconnect and exit after receiving the 'msg_count' messages.\n");
    printf(" -d : enable debug messages.\n");
    printf(" -h : mqtt host to connect to. Defaults to localhost.\n");
    printf(" -i : id to use for this client. Defaults to mosquitto_sub_ appended with the process id.\n");
    printf(" -I : define the client id as id_prefix appended with the process id. Useful for when the\n");
    printf("      broker is using the clientid_prefixes option.\n");
    printf(" -k : keep alive in seconds for this client. Defaults to 60.\n");
    printf(" -N : do not add an end of line character when printing the payload.\n");
    printf(" -p : network port to connect to. Defaults to 1883.\n");
    printf(" -P : provide a password (requires MQTT 3.1 broker)\n");
    printf(" -q : quality of service level to use for the subscription. Defaults to 0.\n");
    printf(" -R : do not print stale messages (those with retain set).\n");
#ifdef WITH_SRV
    printf(" -S : use SRV lookups to determine which host to connect to.\n");
#endif
    printf(" -t : mqtt topic to subscribe to. May be repeated multiple times.\n");
    printf(" -T : topic string to filter out of results. May be repeated.\n");
    printf(" -u : provide a username (requires MQTT 3.1 broker)\n");
    printf(" -v : print published messages verbosely.\n");
    printf(" -V : specify the version of the MQTT protocol to use when connecting.\n");
    printf("      Can be mqttv31 or mqttv311. Defaults to mqttv31.\n");
    printf(" --help : display this message.\n");
    printf(" --quiet : don't print error messages.\n");
    printf(" --will-payload : payload for the client Will, which is sent by the broker in case of\n");
    printf("                  unexpected disconnection. If not given and will-topic is set, a zero\n");
    printf("                  length message will be sent.\n");
    printf(" --will-qos : QoS level for the client Will.\n");
    printf(" --will-retain : if given, make the client Will retained.\n");
    printf(" --will-topic : the topic on which to publish the client Will.\n");
#ifdef WITH_TLS
    printf(" --cafile : path to a file containing trusted CA certificates to enable encrypted\n");
    printf("            certificate based communication.\n");
    printf(" --capath : path to a directory containing trusted CA certificates to enable encrypted\n");
    printf("            communication.\n");
    printf(" --cert : client certificate for authentication, if required by server.\n");
    printf(" --key : client private key for authentication, if required by server.\n");
    printf(" --ciphers : openssl compatible list of TLS ciphers to support.\n");
    printf(" --tls-version : TLS protocol version, can be one of tlsv1.2 tlsv1.1 or tlsv1.\n");
    printf("                 Defaults to tlsv1.2 if available.\n");
    printf(" --insecure : do not check that the server certificate hostname matches the remote\n");
    printf("              hostname. Using this option means that you cannot be sure that the\n");
    printf("              remote host is the server you wish to connect to and so is insecure.\n");
    printf("              Do not use this option in a production environment.\n");
#ifdef WITH_TLS_PSK
    printf(" --psk : pre-shared-key in hexadecimal (no leading 0x) to enable TLS-PSK mode.\n");
    printf(" --psk-identity : client identity string for TLS-PSK mode.\n");
#endif
#endif
#ifdef WITH_SOCKS
    printf(" --proxy : SOCKS5 proxy URL of the form:\n");
    printf("           socks5h://[username[:password]@]hostname[:port]\n");
    printf("           Only \"none\" and \"username\" authentication is supported.\n");
#endif
    printf("\nSee http://mosquitto.org/ for more information.\n\n");
}

#include <time.h>
void timestamp()
{
    struct timeval tv;
    gettimeofday(&tv, NULL);

    double time_in_mill = (tv.tv_sec) * 1000 + (tv.tv_usec) / 1000; // convert tv_sec & tv_usec to millisecond
    printf("%f\n", time_in_mill);
}

int main(int argc, char *argv[])
{
    int rc, i;

    /* les variables Mosquitto */
    char topic[20];
    bool clean_session = true;
    struct mosquitto *mosq = NULL;
    struct mosq_config cfg;

    rc = client_config_load(&cfg, CLIENT_SUB, argc, argv);
    if (rc)
    {
        client_config_cleanup(&cfg);
        if (rc == 2)
        {
            /* --help */
            print_usage();
        }
        else
        {
            fprintf(stderr, "\nUse 'mosquitto_sub --help' to see usage.\n");
        }
        return 1;
    }

    mosquitto_lib_init();

    if (client_id_generate(&cfg))
    {
        return 1;
    }

    mosq = mosquitto_new(cfg.id, true, &cfg);
    if (!mosq)
    {
        switch (errno)
        {
        case ENOMEM:
            if (!cfg.quiet)
                fprintf(stderr, "Error: Out of memory.\n");
            break;
        case EINVAL:
            if (!cfg.quiet)
                fprintf(stderr, "Error: Invalid id and/or clean_session.\n");
            break;
        }
        mosquitto_lib_cleanup();
        return 1;
    }
    if (client_opts_set(mosq, &cfg))
    {
        return 1;
    }

    if (cfg.debug)
    {
        mosquitto_log_callback_set(mosq, my_log_callback);
    }
    mosquitto_connect_callback_set(mosq, my_connect_callback);
    mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);

    rc = client_connect(mosq, &cfg);
    if (rc)
        return rc;

    strcpy(topic, cfg.topics[0]);

    mosquitto_loop_start(mosq);

    for (i = 0; i < 1000; i++)
    {
        timestamp();
        mosquitto_publish(mosq, NULL, topic, strlen(topic), topic, 0, 0);
        timestamp();
        sleepMilliseconds(1000);
    }
}

This is a basic program that prints the timestamp, sends the message "test" to the "/test" topic, then reprints the timestamp and sleeps for 1000 ms. To complete my tests, I also captured network traffic using Wireshark with a filter tcp.port = 1883.

3rd test:

Image Image On the second image, please disregard the 4585; it is my machine resetting its clock.

4th test:

Image Image

image image The machine pings are good, and the issue persists even when I try to publish messages to the broker located locally on the machine with the IP address 192.6.1.50.

Do you have any idea why I have these extra 110ms? We have tried with other technologies (Node.js and Node-RED), and we don't encounter any issues; the messages are consistently sent every approximately 1000ms.

I'm not sure if you need more information or configuration files; I will try to provide them as soon as possible.

Thanks to everyone.

PepeMax avatar Jan 03 '24 09:01 PepeMax

hh,you can fix that

dongqiceo avatar Jan 30 '24 08:01 dongqiceo