aiomqtt
Race condition with entering client context and receiving QoS > 0 queued messages.
I was trying to receive messages that were published while the client was offline. MQTT delivers those if the following conditions are fulfilled:
- Fixed client ID (as you've done)
- Always connect with clean_session=False
- Subscriptions must be made with QoS>0
- Messages published must have QoS>0
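As a rough way to reason about the checklist above, here is a small sketch of the rule as I understand it from MQTT 3.1.1: a message is delivered at the lower of the publish QoS and the subscription QoS, and the broker only keeps messages for an offline client if the session is persistent. The function name and parameters are made up for illustration, and a broker may be configured to queue more than this.

```python
def is_queued_while_offline(
    same_client_id: bool,
    clean_session: bool,
    subscribe_qos: int,
    publish_qos: int,
) -> bool:
    """Hypothetical helper: would the broker hold this message for an offline client?"""
    # Without a fixed client ID the broker cannot match the old session.
    if not same_client_id:
        return False
    # clean_session=True discards the session (and its queue) on connect.
    if clean_session:
        return False
    # The effective QoS is the lower of the two; QoS 0 is not queued by default.
    return min(subscribe_qos, publish_qos) > 0
```

For example, a QoS 1 subscription combined with a QoS 0 publish is not queued, which is why both sides need QoS > 0.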
Now, this works when I use the simple example:
import asyncio
from asyncio_mqtt import Client, MqttError
from conf import *
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
async def main():
    async with Client(
        MQTTHOST,
        MQTTPORT,
        username=MQTTUSER,
        password=MQTTPWD,
        client_id=MQTTID,
        clean_session=False,
    ) as client:
        async with client.filtered_messages("ame_test/data/#") as messages:
            await client.subscribe("ame_test/data/#", qos=1)
            async for message in messages:
                print(message.payload.decode())
asyncio.run(main())
This receives the messages that were published with QoS > 0 while the script was not running. The issue was that I wasn't getting the messages in my larger program using this library. After a LOT of debugging, trying things, and banging my head against the desk, I found out what the issue is: if there is any delay between entering the client context manager and entering the filtered_messages or unfiltered_messages context manager, the messages don't arrive. This seems to be because the paho client initially has an empty on_message handler.
Here's the exact same code as above, with an asyncio.sleep added to simulate the delay that was caused by other processes in my program. This does not receive the queued QoS > 0 messages.
import asyncio
from asyncio_mqtt import Client, MqttError
from conf import *
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
async def main():
    async with Client(
        MQTTHOST,
        MQTTPORT,
        username=MQTTUSER,
        password=MQTTPWD,
        client_id=MQTTID,
        clean_session=False,
    ) as client:
        await asyncio.sleep(1)
        async with client.filtered_messages("ame_test/data/#") as messages:
            await client.subscribe("ame_test/data/#", qos=1)
            async for message in messages:
                print(message.payload.decode())
asyncio.run(main())
Note: sometimes the first few queued messages don't arrive in the first version either. I might have 5 queued and only get 3 or 4.
Hi laundmo. Thanks for opening this issue. Let me have a look. :)
I really appreciate the detailed and overall thorough bug report here. Good job. 👍
The issue was that I wasn't getting the messages in my larger program using this library. After a LOT of debugging, trying things, and banging my head against the desk, I found out what the issue is: if there is any delay between entering the client context manager and entering the filtered_messages or unfiltered_messages context manager, the messages don't arrive. This seems to be because the paho client initially has an empty on_message handler.
Sorry to hear about your frustration with asyncio-mqtt. I'm really glad that you kept going on and found the underlying issue. Let's try to solve it together.
The order of events (according to the current asyncio-mqtt recommendations):
1. We create an asyncio_mqtt.Client object.
2. We call Client.__aenter__. Internally, this calls paho's connect method.
3. We call Client.filtered_messages (or unfiltered_messages). Internally, this sets the on_message callback in paho.
As you already figured out, this order of events is troublesome. Specifically, we may lose messages between (2) and (3). This is what you experienced in your application.
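To make the window between (2) and (3) concrete, here is a toy simulation of the race. It does not use paho or asyncio-mqtt at all: FakeCallbackClient is a made-up stand-in that, like a callback-based client, hands each incoming message to whatever on_message happens to be set at that moment and drops it otherwise.

```python
import asyncio


class FakeCallbackClient:
    """Toy stand-in for a callback-based MQTT client (not the real paho API)."""

    def __init__(self, queued):
        self.on_message = None        # no handler registered yet
        self._queued = list(queued)   # messages the broker held while we were offline

    async def connect(self):
        # The "broker" flushes its queue as soon as the event loop gets a turn.
        asyncio.get_running_loop().call_soon(self._flush)

    def _flush(self):
        for payload in self._queued:
            if self.on_message is not None:
                self.on_message(payload)
            # Otherwise the message is silently dropped.
        self._queued.clear()


async def connect_then_register():
    received = []
    client = FakeCallbackClient(["m1", "m2", "m3"])
    await client.connect()                # step (2)
    await asyncio.sleep(0.01)             # any delay that yields to the event loop
    client.on_message = received.append   # step (3), too late
    await asyncio.sleep(0.01)
    return received


async def register_then_connect():
    received = []
    client = FakeCallbackClient(["m1", "m2", "m3"])
    client.on_message = received.append   # step (3) first
    await client.connect()                # then step (2)
    await asyncio.sleep(0.01)
    return received
```

Running both coroutines with asyncio.run, the first one returns an empty list and the second one returns all three messages. The real timing depends on the network and on paho's internals, so treat this only as an illustration of the ordering problem.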
There are multiple solutions to this problem. I'll mention a couple below.
Change the asyncio-mqtt recommendation
In order to switch events (2) and (3) around, we change the asyncio-mqtt recommendation to something like this (adapted from your example code):
async def main():
    # Create client object (1)
    client = Client(
        MQTTHOST,
        MQTTPORT,
        username=MQTTUSER,
        password=MQTTPWD,
        client_id=MQTTID,
        clean_session=False,
    )
    # Add message handler
    messages = client.filtered_messages("ame_test/data/#")
    # Register message handler (3) and connect the client (2)
    async with messages, client:
        await asyncio.sleep(1)  # We can now sleep for as long as we want since we fixed the race condition
        await client.subscribe("ame_test/data/#", qos=1)
        async for message in messages:
            print(message.payload.decode())
Pros: No change to the asyncio-mqtt API
Cons: Unintuitive?
Open issue in paho.mqtt
Basically, forward this issue upstream. Persuade paho.mqtt to add support for the (1), (2), (3) order of events. That is, paho buffers the messages up internally until we set on_message.
Pros: No change to asyncio-mqtt at all
Cons: This may take a long time to materialize (if ever). Maybe the paho.mqtt developers don't want to support this use case.
Buffer up messages on asyncio-mqtt
Basically, set a dummy on_message handler in asyncio_mqtt.Client.__init__. Buffer up any messages that we receive before the user calls (un)filtered_messages and release them at that point.
Pros: No change to the asyncio-mqtt API
Cons: This goes beyond the "a simple wrapper around paho.mqtt" idea. Paho has many flaws. This is just one of them. If we start to work around these flaws, the work never ends.
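For reference, the buffering idea could look roughly like the sketch below. This is not how asyncio-mqtt is implemented; BufferingDispatcher and set_handler are hypothetical names. The only thing borrowed from paho is the on_message callback signature (client, userdata, message).

```python
from collections import deque


class BufferingDispatcher:
    """Hypothetical helper: hold messages until a real handler is registered."""

    def __init__(self, maxlen=1000):
        # Bounded backlog; once full, the oldest buffered message is discarded.
        self._backlog = deque(maxlen=maxlen)
        self._handler = None

    def on_message(self, client, userdata, message):
        # Same signature as paho's on_message callback.
        if self._handler is None:
            self._backlog.append(message)
        else:
            self._handler(message)

    def set_handler(self, handler):
        # Register the real handler and replay the backlog in arrival order.
        self._handler = handler
        while self._backlog:
            handler(self._backlog.popleft())
```

The dispatcher's on_message would be assigned to the paho client before connect is called, so nothing arrives while no callback is set. The bounded deque is one possible design choice; an unbounded buffer avoids dropping messages but can grow without limit if no handler is ever registered.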
There are probably other solutions as well. I specifically tried to maneuver around breaking changes to the asyncio-mqtt API.
Personally, I lean to the "Change the asyncio-mqtt recommendation" solution. We can achieve that quickly. What do you think? Do you have any other suggestions?
Thanks again for the thorough and detailed bug report. :) Keep it up. 👍
~Frederik
Option 1: I'm not really sure about this; changing the recommended order like this makes the code less readable.
Option 2: I consider it very unlikely that paho will change anything, since their callback-based API intuitively leads to a structure where this won't be an issue: create client object, assign callbacks, connect.
Option 3: I can understand not wanting to queue messages due to the complexity involved.
my idea:
When a client instance is made and the context manager entered, don't call paho connect just yet. Wait until the user has set and started reading at least one message handler, then connect the paho client. That way, the handler has to already exist when the connection is made.
Pros:
- No breaking change to the asyncio-mqtt API, only a minor one to add an argument to Client.__init__ which enables this feature.
- Intuitive, when the flag is set.
Cons:
- Can only define a single handler before connection.
The con could be mitigated by allowing the user to manually connect once all the handlers are defined, which would require the argument to have a 3rd value to stop automatic connection and let the user connect, though at that point using a context manager might be moot and the user might as well use connect and disconnect manually.
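A minimal sketch of the manual-connect variant, assuming a made-up wrapper class: DeferredConnectClient, auto_connect, and add_handler are illustration names only and are not part of asyncio-mqtt. The connect callable is injected so the example runs without a broker.

```python
import asyncio


class DeferredConnectClient:
    """Hypothetical wrapper: entering the context does not have to connect."""

    def __init__(self, connect, auto_connect=True):
        self._connect = connect            # coroutine function that opens the connection
        self._auto_connect = auto_connect
        self.connected = False
        self.handlers = []

    async def __aenter__(self):
        if self._auto_connect:
            await self.connect()
        return self

    async def __aexit__(self, *exc_info):
        self.connected = False

    def add_handler(self, handler):
        self.handlers.append(handler)

    async def connect(self):
        # Idempotent, so an explicit call after auto-connect is harmless.
        if not self.connected:
            await self._connect()
            self.connected = True


async def demo():
    events = []

    async def fake_connect():
        events.append("connect")

    async with DeferredConnectClient(fake_connect, auto_connect=False) as client:
        client.add_handler(lambda message: None)
        events.append("handler")
        await client.connect()             # connect only after handlers exist
    return events
```

Here demo() records the handler registration before the connection, which is the ordering the proposal is after. With auto_connect=True the wrapper behaves like the current context manager.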
When a client instance is made and the context manager entered, don't call paho connect just yet. Wait until the user has set and started reading at least one message handler, then connect the paho client. That way, the handler has to already exist when the connection is made.
I like this idea. Though I don't like the con that you mention:
Can only define a single handler before connection.
That makes this keyword argument very special-purpose. I'd like to avoid that. It's a rabbit hole (difficult to maintain).
I like your mitigation idea:
The con could be mitigated by allowing the user to manually connect once all the handlers are defined, which would require the argument to have a 3rd value to stop automatic connection and let the user connect, though at that point using a context manager might be moot and the user might as well use connect and disconnect manually.
You propose something like this, right (correct me if I'm wrong):
async with Client(..., skip_connect_on_init=True) as client:  # Don't connect right away
    async with client.filtered_messages("ame_test/data/#") as messages:
        await client.subscribe("ame_test/data/#", qos=1)
        await client.connect()  # Connect manually after we set up all message handlers
        async for message in messages:
            print(message.payload.decode())
Yes, the last example should allow for receiving all QoS messages. If that's a change you're willing to accept, I could PR that. It shouldn't be too large of a change.
Hereby approved. 👍 I'll try to find the time to review your PR. If I don't respond right away, then just ping me.
Sorry about the month-long wait time on this issue.
Change the asyncio-mqtt recommendation
In order to switch events (2) and (3) around, we change the asyncio-mqtt recommendation to something like this (adapted from your example code):
async def main():
    # Create client object (1)
    client = Client(
        MQTTHOST,
        MQTTPORT,
        username=MQTTUSER,
        password=MQTTPWD,
        client_id=MQTTID,
        clean_session=False,
    )
    # Add message handler
    messages = client.filtered_messages("ame_test/data/#")
    # Register message handler (3) and connect the client (2)
    async with messages, client:
        await asyncio.sleep(1)  # We can now sleep for as long as we want since we fixed the race condition
        await client.subscribe("ame_test/data/#", qos=1)
        async for message in messages:
            print(message.payload.decode())
Pros: No change to the asyncio-mqtt API
Cons: Unintuitive?
Hi @frederikaalund
I'm having trouble making it work as per your suggestion. The following code raises an exception:
from asyncio_mqtt import Client
import asyncio
async def coro():
    client = Client('localhost', client_id='cliente', clean_session=False)
    messages = client.filtered_messages("topic")
    async with messages, client:
        await client.subscribe("topic", qos=1)
        async for message in messages:
            print(message.payload.decode())
asyncio.run(coro())
The exception is:
async for message in messages:
TypeError: 'async for' requires an object with __aiter__ method, got _AsyncGeneratorContextManager
Whatever I do, I can't seem to overcome this error. Can you point out what I am doing wrong?
Please excuse me for reviving this old thread; I thought this would be the most relevant place to post this question. Last but not least, thank you for this library and your support. Cheers! German
Sorry, I just found the other issue and this comment (https://github.com/sbtinstruments/asyncio-mqtt/issues/109#issuecomment-1067953122) and got it working without losing any messages:
from asyncio_mqtt import Client
import asyncio
async def coro():
    client = Client('localhost', client_id='cliente', clean_session=False)
    async with client.filtered_messages("topic") as messages:
        async with client:
            await client.subscribe("topic", qos=1)
            async for message in messages:
                print(message.payload.decode())
asyncio.run(coro())
Thanks again! German
I encountered the same problem. Of the proposed solutions, I support buffering up messages in asyncio-mqtt. It's a flaw in paho rather than a bug, but as a user I think that correctness is more important than simplicity.
Considering this, it seems the API might have to change. That looks like a lot of work, and a breaking API change might be painful.
async def main():
    # Create client object (1)
    client = Client(
        MQTTHOST,
        MQTTPORT,
        username=MQTTUSER,
        password=MQTTPWD,
        client_id=MQTTID,
        clean_session=False,
    )
    # Add message handler
    messages = client.filtered_messages("ame_test/data/#")
    # Register message handler (3) and connect the client (2)
    async with messages, client:
        await asyncio.sleep(1)  # We can now sleep for as long as we want since we fixed the race condition
        await client.subscribe("ame_test/data/#", qos=1)
        async for message in messages:
            print(message.payload.decode())
there is a small mistake,
async def main():
    # Create client object (1)
    client = Client(
        MQTTHOST,
        MQTTPORT,
        username=MQTTUSER,
        password=MQTTPWD,
        client_id=MQTTID,
        clean_session=False,
    )
    # Register message handler (3) and connect the client (2)
    async with client.filtered_messages("ame_test/data/#") as messages, client:
        await asyncio.sleep(1)  # We can now sleep for as long as we want since we fixed the race condition
        await client.subscribe("ame_test/data/#", qos=1)
        async for message in messages:
            print(message.payload.decode())
this should fix it and it's less nested.
there is a small mistake,
Thanks for chipping in! :+1: Though maybe I'm missing something but the two code snippets seem semantically identical to me. What's the intended "fix"?
just a syntax error. I tried the async with messages, client version and got the exception TypeError: 'async for' requires an object with __aiter__ method, got _AsyncGeneratorContextManager
```python
async with messages as msg_iter:  # messages is a context manager and msg_iter is an async iterable
    pass
```
You must use "as" with client.messages() to get what __aenter__ returns. That is different from "with client", because client.__aenter__ returns the client itself.
just a syntax error, I tried the async with messages, client got exception TypeError: 'async for' requires an object with __aiter__ method, got _AsyncGeneratorContextManager
That makes sense! I forgot about that level of indirection. Thanks for the correction. :+1:
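For anyone else tripping over this, here is a small standalone demonstration of that level of indirection. It uses only the standard library; messages() below is a made-up stand-in built with contextlib.asynccontextmanager, not the asyncio-mqtt method.

```python
import asyncio
from contextlib import asynccontextmanager


async def _numbers():
    for n in (1, 2, 3):
        yield n


@asynccontextmanager
async def messages():
    # Whatever is yielded here is what "as" binds to in "async with".
    yield _numbers()


async def main():
    cm = messages()
    # The context manager object itself cannot be iterated with "async for".
    iterable_before_enter = hasattr(cm, "__aiter__")
    async with cm as msg_iter:
        # msg_iter is the async generator returned by __aenter__.
        got = [n async for n in msg_iter]
    return iterable_before_enter, got
```

asyncio.run(main()) returns (False, [1, 2, 3]): the object returned by messages() has no __aiter__, while the value bound by "as" does.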
This is a very interesting discussion 👍 In 2.0.0 we removed filtered_messages(), unfiltered_messages(), and messages() in favor of a single client-wide queue. Incidentally, this means that we now set paho's _on_message callback already inside the client's __init__ method, which was one of the possible solutions discussed here.
This example does not lose messages, even though we sleep before entering the messages generator:
import asyncio
import aiomqtt
async def main():
    client = aiomqtt.Client(
        hostname="test.mosquitto.org",
        identifier="example",
        clean_session=False,
    )
    # Initial subscription with QoS=1
    async with client:
        await client.subscribe("foo", qos=1)
    # Publish messages with QoS=1 and unrelated client
    async with aiomqtt.Client("test.mosquitto.org") as publisher:
        await publisher.publish("foo", 1, qos=1)
        await publisher.publish("foo", 2, qos=1)
        await publisher.publish("foo", 3, qos=1)
    # Receive messages with the initial client after reconnection and sleep
    async with client:
        await asyncio.sleep(2)
        async for message in client.messages:
            print(message.payload)
asyncio.run(main())
Let me know if that solves this issue! 😊
If you notice that this issue is not solved after all, please reopen!