
Race condition with entering client context and receiving QoS > 0 queued messages.

Open laundmo opened this issue 3 years ago • 7 comments

I was trying to receive messages published while the client is offline, which MQTT does if the following conditions are fulfilled:

  • Fixed client ID (as you've done)
  • Always connect with clean_session=False
  • Subscriptions must be made with QoS>0
  • Messages published must have QoS>0

Now, this works when i use the simple example:

import asyncio
from asyncio_mqtt import Client, MqttError
from conf import *

asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

async def main():
    async with Client(
        MQTTHOST,
        MQTTPORT,
        username=MQTTUSER,
        password=MQTTPWD,
        client_id=MQTTID,
        clean_session=False,
    ) as client:
        async with client.filtered_messages("ame_test/data/#") as messages:
            await client.subscribe("ame_test/data/#", qos=1)
            async for message in messages:
                print(message.payload.decode())

asyncio.run(main())

This is receiving the messages that were published with QoS > 0 while the script was not running. The issue was that i wasn't getting the messages with my larger program using this library. After a LOT of debugging, trying things, and banging my head against the desk, i found out what the issue is: if there is any delay between entering the client context manager and entering the filtered_messages or unfiltered_messages context manager, the messages don't arrive. This seems to be because the paho client initially has an empty on_message handler.

Here's the exact same code as above, with an asyncio.sleep added to simulate the delay that was caused by other processes in my program. This does not receive the queued QoS > 0 messages.

import asyncio
from asyncio_mqtt import Client, MqttError
from conf import *

asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

async def main():
    async with Client(
        MQTTHOST,
        MQTTPORT,
        username=MQTTUSER,
        password=MQTTPWD,
        client_id=MQTTID,
        clean_session=False,
    ) as client:
        await asyncio.sleep(1)
        async with client.filtered_messages("ame_test/data/#") as messages:
            await client.subscribe("ame_test/data/#", qos=1)
            async for message in messages:
                print(message.payload.decode())

asyncio.run(main())

Note: sometimes the first few queued messages don't arrive in the first version either. I might have 5 queued and only get 3 or 4.

laundmo avatar Dec 10 '21 10:12 laundmo

Hi laundmo. Thanks for opening this issue. Let me have a look. :)

I really appreciate the detailed and overall thorough bug report here. Good job. 👍

The issue was that i wasn't getting the messages with my larger program using this library. After a LOT of debugging, trying things, and banging my head against the desk, i found out what the issue is: if there is any delay between entering the client context manager and entering the filtered_messages or unfiltered_messages context manager, the messages don't arrive. This seems to be because the paho client initially has an empty on_message handler.

Sorry to hear about your frustration with asyncio-mqtt. I'm really glad that you kept going and found the underlying issue. Let's try to solve it together.

The order of events (according to the current asyncio-mqtt recommendations):

  1. We create an asyncio_mqtt.Client object.
  2. We call Client.__aenter__. Internally, this calls paho's connect method.
  3. We call Client.filtered_messages (or unfiltered_messages). Internally, this sets the on_message callback in paho.

As you already figured out, this order of events is troublesome. Specifically, we may lose messages that arrive between (2) and (3), because paho has no on_message callback to hand them to yet. This is what you experienced in your application.
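To make the ordering problem concrete, here is a minimal sketch that simulates it without any network. ToyBroker and ToyClient are hypothetical stand-ins invented for this illustration; they are not part of paho or asyncio-mqtt, and the assumption is simply that a message delivered while no handler is set is dropped.

```python
from typing import Callable, Optional


class ToyBroker:
    """Hypothetical stand-in for a broker holding queued QoS > 0 messages."""

    def __init__(self, queued: list[str]) -> None:
        self.queued = list(queued)


class ToyClient:
    """Hypothetical stand-in for a callback-based client such as paho's."""

    def __init__(self, broker: ToyBroker) -> None:
        self.broker = broker
        self.on_message: Optional[Callable[[str], None]] = None

    def connect(self) -> None:
        # On reconnect, the broker immediately delivers the queued messages.
        while self.broker.queued:
            message = self.broker.queued.pop(0)
            if self.on_message is not None:
                self.on_message(message)
            # With no handler set, the message is silently dropped.


def connect_then_register() -> list[str]:
    """Order (1), (2), (3): connect first, set the handler afterwards."""
    received: list[str] = []
    client = ToyClient(ToyBroker(["a", "b", "c"]))
    client.connect()
    client.on_message = received.append
    return received


def register_then_connect() -> list[str]:
    """Order (1), (3), (2): set the handler first, then connect."""
    received: list[str] = []
    client = ToyClient(ToyBroker(["a", "b", "c"]))
    client.on_message = received.append
    client.connect()
    return received


print(connect_then_register())  # []
print(register_then_connect())  # ['a', 'b', 'c']
```

In the real library the delivery happens on a background network loop, so the loss is timing dependent rather than guaranteed, which would explain why only some of the queued messages go missing when there is no explicit sleep.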

There are multiple solutions to this problem. I'll mention a couple below.

Change the asyncio-mqtt recommendation

In order to switch events (2) and (3) around, we change the asyncio-mqtt recommendation to something like this (adapted from your example code):

async def main():
    # Create client object (1)
    client = Client(
        MQTTHOST,
        MQTTPORT,
        username=MQTTUSER,
        password=MQTTPWD,
        client_id=MQTTID,
        clean_session=False,
    )
    # Add message handler
    messages = client.filtered_messages("ame_test/data/#")
    # Register message handler (3) and connect the client (2)
    async with messages, client:
        await asyncio.sleep(1)  # We can now sleep for as long as we want since we fixed the race condition
        await client.subscribe("ame_test/data/#", qos=1)
        async for message in messages:
            print(message.payload.decode())

Pros: No change to the asyncio-mqtt API
Cons: Unintuitive?

Open issue in paho.mqtt

Basically, forward this issue upstream. Persuade paho.mqtt to add support for the (1), (2), (3) order of events. That is, paho buffers the messages up internally until we set on_message.

Pros: No change to asyncio-mqtt at all
Cons: This may take a long time to materialize (if ever). Maybe the paho.mqtt developers don't want to support this use case.

Buffer up messages on asyncio-mqtt

Basically, set a dummy on_message handler in asyncio_mqtt.Client.__init__. Buffer up any messages that we receive before the user registers a handler, and release them when the user calls (un)filtered_messages.

Pros: No change to the asyncio-mqtt API
Cons: This goes beyond the "a simple wrapper around paho.mqtt" idea. Paho has many flaws. This is just one of them. If we start to work around these flaws, the work never ends.
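As a rough sketch of the buffering idea, under the assumption that messages are plain strings: BufferingDispatcher is a hypothetical helper made up for this illustration, not something asyncio-mqtt provides. Its on_message method plays the role of the dummy handler that would be installed up front.

```python
from collections import deque
from typing import Callable, Deque, Optional


class BufferingDispatcher:
    """Hypothetical helper: holds messages until a real handler is attached."""

    def __init__(self) -> None:
        self._buffer: Deque[str] = deque()
        self._handler: Optional[Callable[[str], None]] = None

    def on_message(self, message: str) -> None:
        # This method would be installed as the client's callback up front.
        if self._handler is None:
            self._buffer.append(message)
        else:
            self._handler(message)

    def set_handler(self, handler: Callable[[str], None]) -> None:
        # Release buffered messages in arrival order, then go live.
        while self._buffer:
            handler(self._buffer.popleft())
        self._handler = handler


def buffering_demo() -> list[str]:
    received: list[str] = []
    dispatcher = BufferingDispatcher()
    dispatcher.on_message("early-1")  # arrives before any handler exists
    dispatcher.on_message("early-2")
    dispatcher.set_handler(received.append)
    dispatcher.on_message("late-1")  # arrives after the handler is attached
    return received


print(buffering_demo())  # ['early-1', 'early-2', 'late-1']
```

One design question this leaves open is the buffer size. The sketch buffers without limit; a bounded deque (the maxlen argument) would cap memory at the cost of dropping the oldest messages.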


There are probably other solutions as well. I specifically tried to maneuver around breaking changes to the asyncio-mqtt API.

Personally, I lean to the "Change the asyncio-mqtt recommendation" solution. We can achieve that quickly. What do you think? Do you have any other suggestions?

Thanks again for the thorough and detailed bug report. :) Keep it up. 👍

~Frederik

frederikaalund avatar Dec 11 '21 19:12 frederikaalund

Option 1: I'm not really sure about this, changing the recommended order like this makes the code less readable.

Option 2: i consider it very unlikely paho will change anything, since their callback based API intuitively leads to a structure where this won't be an issue: create client object, assign callbacks, connect.

Option 3: i can understand not wanting to queue messages due to the complexity involved.

my idea:

When a client instance is made and the context manager entered, don't call paho connect just yet. Wait until the user has set and started reading at least one message handler, then connect the paho client. That way, the handler has to already exist when the connection is made.

Pros:

  • No breaking change to the asyncio-mqtt API, only a minor one to add an argument to Client.__init__ which enables this feature.
  • Intuitive, when flag is set.

Cons:

  • Can only define a single handler before connection.

The con could be mitigated by allowing the user to manually connect once all the handlers are defined, which would require the argument to have a 3rd value to stop automatic connection and let the user connect, though at that point using a context manager might be moot and the user might as well use connect and disconnect manually.
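A minimal sketch of how the deferred connect and its manual variant could behave, again as a network-free simulation. LazyConnectClient and its auto_connect flag are hypothetical names chosen for this illustration; nothing like them exists in asyncio-mqtt.

```python
from typing import Callable, List, Tuple


class LazyConnectClient:
    """Hypothetical client that delays connecting until a handler exists."""

    def __init__(self, queued: List[str], auto_connect: bool = True) -> None:
        # Messages the broker would deliver right after connecting.
        self._queued = list(queued)
        self._handlers: List[Callable[[str], None]] = []
        self._auto_connect = auto_connect
        self.connected = False

    def add_handler(self, handler: Callable[[str], None]) -> None:
        self._handlers.append(handler)
        # In automatic mode, the first handler triggers the connection.
        if self._auto_connect and not self.connected:
            self.connect()

    def connect(self) -> None:
        self.connected = True
        for message in self._queued:
            for handler in self._handlers:
                handler(message)
        self._queued.clear()


def lazy_connect_demo(auto_connect: bool) -> Tuple[List[str], List[str]]:
    first: List[str] = []
    second: List[str] = []
    client = LazyConnectClient(["a", "b"], auto_connect=auto_connect)
    client.add_handler(first.append)
    client.add_handler(second.append)
    if not client.connected:
        client.connect()  # manual mode: connect once all handlers exist
    return first, second


print(lazy_connect_demo(True))   # (['a', 'b'], [])
print(lazy_connect_demo(False))  # (['a', 'b'], ['a', 'b'])
```

The first call shows the con listed above: with automatic connection, only the first handler sees the queued messages. The second call shows the mitigation, where the user connects manually after registering every handler.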

laundmo avatar Dec 13 '21 08:12 laundmo

When a client instance is made and the context manager entered, don't call paho connect just yet. Wait until the user has set and started reading at least one message handler, then connect the paho client. That way, the handler has to already exist when the connection is made.

I like this idea. Though I don't like the con that you mention:

Can only define a single handler before connection.

That makes this keyword argument very special-purpose. I'd like to avoid that. It's a rabbit hole (difficult to maintain).

I like your mitigation idea:

The con could be mitigated by allowing the user to manually connect once all the handlers are defined, which would require the argument to have a 3rd value to stop automatic connection and let the user connect, though at that point using a context manager might be moot and the user might as well use connect and disconnect manually.

You propose something like this, right (correct me if I'm wrong):


async with Client(..., skip_connect_on_init=True) as client:  # Don't connect right away
    async with client.filtered_messages("ame_test/data/#") as messages:
        await client.subscribe("ame_test/data/#", qos=1)
        await client.connect()  # Connect manually after we set up all message handlers
        async for message in messages:
            print(message.payload.decode())

frederikaalund avatar Jan 16 '22 10:01 frederikaalund

Yes, the last example should allow for receiving all QoS > 0 messages. If that's a change you're willing to accept, i could PR that. It shouldn't be too large of a change.

laundmo avatar Jan 16 '22 10:01 laundmo

Hereby approved. 👍 I'll try to find the time to review your PR. If I don't respond right away, then just ping me.

Sorry about the month-long wait time on this issue.

frederikaalund avatar Jan 16 '22 12:01 frederikaalund

Change the asyncio-mqtt recommendation

In order to switch events (2) and (3) around, we change the asyncio-mqtt recommendation to something like this (adapted from your example code):

async def main():
    # Create client object (1)
    client = Client(
        MQTTHOST,
        MQTTPORT,
        username=MQTTUSER,
        password=MQTTPWD,
        client_id=MQTTID,
        clean_session=False,
    )
    # Add message handler
    messages = client.filtered_messages("ame_test/data/#")
    # Register message handler (3) and connect the client (2)
    async with messages, client:
        await asyncio.sleep(1)  # We can now sleep for as long as we want since we fixed the race condition
        await client.subscribe("ame_test/data/#", qos=1)
        async for message in messages:
            print(message.payload.decode())

Pros: No change to the asyncio-mqtt API
Cons: Unintuitive?

Hi @frederikaalund

I'm having trouble making it work as per your suggestion. The following code raises an exception:

from asyncio_mqtt import Client
import asyncio

async def coro():
    client = Client('localhost', client_id='cliente', clean_session=False)
    messages = client.filtered_messages("topic")
    async with messages, client:
        await client.subscribe("topic", qos=1)
        async for message in messages:
            print(message.payload.decode())

asyncio.run(coro())

The exception is:

    async for message in messages:
TypeError: 'async for' requires an object with __aiter__ method, got _AsyncGeneratorContextManager

Whatever I do, I can't seem to overcome this error. Can you point out what I'm doing wrong?

Please excuse me for reviving this old thread, I thought this would be the most relevant place to post this question. Last but not least, thank you for this library and your support. Cheers! German

germanh1982 avatar Aug 18 '22 13:08 germanh1982

Sorry, I just found the other issue and this comment (https://github.com/sbtinstruments/asyncio-mqtt/issues/109#issuecomment-1067953122) and got it working without losing any messages:

from asyncio_mqtt import Client
import asyncio

async def coro():
    client = Client('localhost', client_id='cliente', clean_session=False)
    async with client.filtered_messages("topic") as messages:
        async with client:
            await client.subscribe("topic", qos=1)
            async for message in messages:
                print(message.payload.decode())

asyncio.run(coro())

Thanks again! German

germanh1982 avatar Aug 18 '22 13:08 germanh1982

I encountered the same problem. Of these solutions, I support buffering up messages in asyncio-mqtt. Anyway, it's just a flaw in paho, not a bug. As a user, I think correctness is more important than simplicity.

Considering this, it seems the API might have to change. That seems to mean a lot of work, and a breaking API change might be painful.

YAGregor avatar Jan 12 '23 06:01 YAGregor

async def main():
    # Create client object (1)
    client = Client(
        MQTTHOST,
        MQTTPORT,
        username=MQTTUSER,
        password=MQTTPWD,
        client_id=MQTTID,
        clean_session=False,
    )
    # Add message handler
    messages = client.filtered_messages("ame_test/data/#")
    # Register message handler (3) and connect the client (2)
    async with messages, client:
        await asyncio.sleep(1)  # We can now sleep for as long as we want since we fixed the race condition
        await client.subscribe("ame_test/data/#", qos=1)
        async for message in messages:
            print(message.payload.decode())

there is a small mistake,

async def main():
    # Create client object (1)
    client = Client(
        MQTTHOST,
        MQTTPORT,
        username=MQTTUSER,
        password=MQTTPWD,
        client_id=MQTTID,
        clean_session=False,
    )
    # Register message handler (3) and connect the client (2)
    async with client.filtered_messages("ame_test/data/#") as messages, client:
        await asyncio.sleep(1)  # We can now sleep for as long as we want since we fixed the race condition
        await client.subscribe("ame_test/data/#", qos=1)
        async for message in messages:
            print(message.payload.decode())

this should fix it and it's less nested.

YAGregor avatar Jan 12 '23 09:01 YAGregor

there is a small mistake,

Thanks for chipping in! :+1: Though maybe I'm missing something but the two code snippets seem semantically identical to me. What's the intended "fix"?

frederikaalund avatar Jan 13 '23 08:01 frederikaalund

there is a small mistake,

Thanks for chipping in! +1 Though maybe I'm missing something but the two code snippets seem semantically identical to me. What's the intended "fix"?

Just a syntax error. I tried the async with messages, client version and got the exception TypeError: 'async for' requires an object with __aiter__ method, got _AsyncGeneratorContextManager


```python
async with messages as msg_iter:  # messages is a context manager and msg_iter is an async iterable
    pass
```

You must use "as" with client.messages() to get what __aenter__ returns. That is different from "async with client", because client.__aenter__ returns the client itself.
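The distinction can be reproduced with the standard library alone. In this sketch, toy_messages is a hypothetical stand-in built with contextlib.asynccontextmanager, on the assumption (suggested by the _AsyncGeneratorContextManager name in the traceback) that the library's messages helpers were built the same way.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def toy_messages():
    """Hypothetical stand-in for a messages context manager."""

    async def generate():
        for payload in ("one", "two"):
            yield payload

    # The value yielded here is what "as" binds inside "async with".
    yield generate()


async def iterate_manager_directly() -> str:
    manager = toy_messages()
    async with manager:
        try:
            async for _ in manager:  # iterating the manager itself fails
                pass
        except TypeError as error:
            return type(error).__name__
    return "no error"


async def iterate_bound_value() -> list[str]:
    # "as messages" binds the async iterable returned by __aenter__.
    async with toy_messages() as messages:
        return [payload async for payload in messages]


print(asyncio.run(iterate_manager_directly()))  # TypeError
print(asyncio.run(iterate_bound_value()))  # ['one', 'two']
```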

YAGregor avatar Jan 16 '23 06:01 YAGregor

just a syntax error, I tried the async with messages, client got exception TypeError: 'async for' requires an object with __aiter__ method, got _AsyncGeneratorContextManager

That makes sense! I forgot about that level of indirection. Thanks for the correction. :+1:

frederikaalund avatar Jan 16 '23 06:01 frederikaalund

This is a very interesting discussion 👍 In 2.0.0 we removed filtered_messages(), unfiltered_messages(), and messages() in favor of a single client-wide queue. Incidentally, this means that we now set paho's _on_message callback already inside the client's __init__ method, which was one of the possible solutions discussed here.

This example does not lose messages, even though we sleep before entering the messages generator:

import asyncio
import aiomqtt


async def main():
    client = aiomqtt.Client(
        hostname="test.mosquitto.org",
        identifier="example",
        clean_session=False,
    )
    # Initial subscription with QoS=1
    async with client:
        await client.subscribe("foo", qos=1)
    # Publish messages with QoS=1 and unrelated client
    async with aiomqtt.Client("test.mosquitto.org") as publisher:
        await publisher.publish("foo", 1, qos=1)
        await publisher.publish("foo", 2, qos=1)
        await publisher.publish("foo", 3, qos=1)
    # Receive messages with the initial client after reconnection and sleep
    async with client:
        await asyncio.sleep(2)
        async for message in client.messages:
            print(message.payload)


asyncio.run(main())

Let me know if that solves this issue! 😊

empicano avatar Jan 19 '24 01:01 empicano

If you notice that this issue is not solved after all, please reopen!

empicano avatar Feb 04 '24 00:02 empicano