aedes icon indicating copy to clipboard operation
aedes copied to clipboard

Subscriptions not effective on first connection

Open albertocaponedev opened this issue 4 years ago • 14 comments

Hi everybody, I'm new to mqtt so I'm sorry if the question is silly.

Broker settings (over TLS): Concurrency: 100 Emitter: redis emitter Persistence: mongo

Client (mqtt.js) Clean session: false Attempts to subscribe a pair of topics on connect with QoS 2.

I encounter this problem: if I connect the client to the broker, with clean session to false, subscription aren't (apparently) effective i.e.:

  1. the callback on the subscriptions client event doesn't fire (nonetheless they show up in their own collection on mongo as expected)

  2. if another client publishes on those subscriptions messages aren't delivered

But... if the guilty client disconnects and reconnects it gets every message it should have, as if subs had always been working.

If I use the aedes default emitter the problem does not exist, it does exist with redis and mongo emitters. The problem ceases to exist even if the client connects with clean session true (but obviously I lose persistence and that's not what I want).

What is happening?

albertocaponedev avatar Mar 26 '21 23:03 albertocaponedev

But... if the guilty client disconnects and reconnects it gets every message it should have, as if subs had always been working.

That's what clean: false is used for. It creates a persistent session that allows the client to receive messages while he was offline if he disconnect and reconnect. Most times you will not need clean set to false but to true. I suggest to firstly read something about MQTT foundamentals

robertsLando avatar Mar 28 '21 09:03 robertsLando

Sure. But I would like it to receive messages even on first connection without having to disconnect and reconnect to make subscriptions effective. And I can obtain this behaviour if I use the embedded emitter.

albertocaponedev avatar Mar 28 '21 09:03 albertocaponedev

You can do this using retained flag on messages published. This is well explained in mqtt foundamentals

robertsLando avatar Mar 28 '21 10:03 robertsLando

Maybe I have not been clear enough. I'll try again.

On its first connection my mqtt client: a) tries to subscribe to its topics, b) its subscripitions get recorded on mongo as everything went good, c) but the callback on Client#subscribe does not fire, d) and messages won't be delivered until my client disconnects and reconnects.

Now, If I understood what you said, this is intendend to be the correct behaviour. And this surprises me: I knew that persistence purpose is to supply offline clients with their otherwise lost packets on reconnection, nonetheless I thought that persistence delivery would have been effective instantly from the very same moment of the first subscription and not just upon reconnection.

Then to my understanding retaining messages is useless in my case because either:

  1. it's client first connection so, retained or not, messages won't be delivered on a session clean false client first connection
  2. client disconnected and reconnected, therefore subscriptions are effective and persistence is in place, so client will get every message it needs.

albertocaponedev avatar Mar 28 '21 13:03 albertocaponedev

ok now it is more clear, the problem essentially is:

d) and messages won't be delivered until my client disconnects and reconnects.

This is strange, are you able to provide a piece of code that reproduce the problem?

robertsLando avatar Mar 29 '21 07:03 robertsLando

I'll be working on it, however so far I can tell you, as a side note, that the ghosting Client#subscribe callback fires at last, for example, on broker closed event.

albertocaponedev avatar Mar 29 '21 14:03 albertocaponedev

Ok, this code replicates the issue. I found that it happens only if my-subscriber subscribes to an array (or to an object) of topics. If I go with a single string everything works as expected, and its callback (the callback in Client#subscribe) fires without delay. Am I doing something wrong?

package.json

{
    "aedes": "^0.45.0",
    "aedes-persistence-mongodb": "^8.1.3",
    "mqemitter-mongodb": "^7.0.4",
    "mqtt": "^4.2.6",
    "net": "^1.0.2"
}

Broker

const Aedes = require('aedes');
const aedesPersistenceMongoDB = require('aedes-persistence-mongodb');
const mongodb = require('mqemitter-mongodb');


const brokerConfiguration = {
    concurrency: 100,
    mq: mongodb({
        url: 'mongodb://debuggingUser:[email protected]:27017/debugging'
    }),
    persistence: aedesPersistenceMongoDB({
        url: 'mongodb://debuggingUser:[email protected]:27017/debugging',
        ttl: {                          
            packets: {
                incoming: 86400,
                outgoing: 86400,
                will: 86400,
                retained: 86400
            },
            subscriptions: -1,
        },
    })
};


const broker = Aedes(brokerConfiguration);

const server = require('net').createServer(broker.handle);




server.listen(1883, () => {
    console.log(`Server started and listening on port ${1883}`);

});

broker.on('client', (client) => {
    console.log(`${client.id} has connected!`);
});

Subscriber

const mqtt = require('mqtt');
const clientId = "my-subscriber";



const options = {
    clientId: clientId,
    clean: false
};



const client = mqtt.connect('mqtt://127.0.0.1:1883', options);




let subscribeTo = [`receive/${clientId}/command`, `receive/${clientId}/firmware`];  
/* Subscribing to a single topic resolves the issue */



client.on("connect", (connack) => {

    console.log(`${client.options.clientId} has connected.`);

    client.subscribe(subscribeTo, { qos: 2 }, (err, granted) => {
        console.log(err, granted);
    });


    /* Subscribing twice works
    subscribeTo.forEach( sub => {
            client.subscribe(sub, { qos: 2 }, (err, granted) => {
                console.log(err, granted);
            });
        });
   */
   
});

client.on('message', (topic, message, packet) => {
    console.log("A message arrived...");
});

client.on('close', () => {
    console.log(`${clientId} has disconnected.`);
});

Publisher


const mqtt = require('mqtt');
const clientId = "my-publisher";



const options = {
    clientId: clientId,
    clean: true
};


const client = mqtt.connect('mqtt://127.0.0.1:1883', options);

const publishTo = "receive/my-subscriber/command";

client.on("connect", (connack) => {

    console.log(`${client.options.clientId} has connected.`);

    client.publish(publishTo, "hello", { qos: 2 }, (err) => {
        if(err) console.log(err);
        else console.log("my-publisher published a message!");
    });

});



client.on('close', () => {
    console.log(`${clientId} has disconnected.`);
});

albertocaponedev avatar Mar 29 '21 18:03 albertocaponedev

what's the content of granted in the failed subscribe?

robertsLando avatar Mar 30 '21 12:03 robertsLando

Client#subscribe doesn't fail, it freezes. So this question has not an answer. However if I close the broker via Aedes#close, Client#subscribe finally calls back and granted is 2, as expected.

albertocaponedev avatar Mar 30 '21 13:03 albertocaponedev

Are you using aedes 0.45.0? If so could you try with 0.44.2? It may be a regression introduced by #584.

@gnought @mcollina Thoughts?

robertsLando avatar Mar 30 '21 13:03 robertsLando

I tried. Nothing changed.

albertocaponedev avatar Mar 30 '21 13:03 albertocaponedev

Same happens to me, only when closing subscription and and subscribing again i see the retained publish i make. So essentially when I am subscribed and I publish a message nothing shows up. Any tip? I have async methods inside authorizators should I make them synchronous?

joanoliete avatar Nov 03 '21 14:11 joanoliete

It's not safe to have them async, we don't handle any rejection in case it happens. BTW should be ok if you remember to call the callback

robertsLando avatar Nov 03 '21 15:11 robertsLando

Could be because we are not iterating the subscriptions array: https://github.com/moscajs/aedes/blob/main/lib/handlers/subscribe.js#L77

robertsLando avatar Dec 03 '21 16:12 robertsLando