aiomqtt
aiomqtt copied to clipboard
Publish should not wait for confirmation
The publish()
method forces the user to wait for the message to be published. It is perfectly reasonable to publish a message with QOS > 0 when the client is disconnected. For example, on unreliable links, a loop in a thread/task is used to reconnect when the connection goes down. The publisher can continue on publishing at QOS > 0 without knowing anything about the status of the connection. This is not possible with the current publish()
implementation and puts a bigger implementation burden on the user of this library.
Thanks for opening this issue. Let me have a look. :)
It is perfectly reasonable to publish a message with QOS > 0 when the client is disconnected. For example, on unreliable links, a loop in a thread/task is used to reconnect when the connection goes down. The publisher can continue on publishing at QOS > 0 without knowing anything about the status of the connection.
I never considered that use case. Thanks for bringing it to my attention. :+1: Your question in #46 makes a lot more sense to me now.
To support this, we need to make some API-breaking changes. I'm up for it, though.
Make Client
work in "offline" mode
Old behaviour:
-
Client.publish
only works while online (connected to the server) -
Client
can only connect once and only disconnect once. You have to create a newClient
instance if you want to, e.g., reconnect to the server.
New behaviour:
-
Client.publish
works while offline (not connected to the server) -
Client
can connect to/disconnect from the server multiple times. We use the existingpaho.client.reconnect
logic to support this.
An example of the new API that these changes bring with them (based on the "Advanced example"):
async def ensure_connection(client):
reconnect_interval = 3 # [seconds]
while True:
try:
# Connect to the server
async with client:
# We pass the (connected) client to the advanced example
await advanced_example(client)
except MqttError as error:
print(f'Error "{error}". Reconnecting in {reconnect_interval} seconds.')
finally:
await asyncio.sleep(reconnect_interval)
async def post_to_topics(client, topics):
while True:
for topic in topics:
message = randrange(100)
print(f'[topic="{topic}"] Publishing message={message}')
# Note that we no longer use await here! If the client isn't connected, paho-mqtt queues the messages
# up internally.
client.publish(topic, message, qos=1)
await asyncio.sleep(2)
# There is only a single client instance. We reuse it again and again.
client = Client("test.mosquitto.org")
# Keep the client connected indefinitely
task = asyncio.create_task(ensure_connection(client))
tasks.add(task)
# Publish a random value to each of these topics
topics = (
"floors/basement/humidity",
"floors/rooftop/humidity",
"floors/rooftop/illuminance",
# 👉 Try to add more topics!
)
task = asyncio.create_task(post_to_topics(client, topics))
tasks.add(task)
Would something like the above example work for you?
As long as the publish
can still be done outside of the context manager, i.e. the async with client
, I think this design will work.
This is part of the experimental anyio-based branch. See #44.
Regarding this statement,
Client can only connect once and only disconnect once. You have to create a new Client instance if you want to, e.g., reconnect to the server.
Has anything changed in asyncio-mqtt
?
I have no problem with the following code, where I re-use the same client and connect/disconnect every second.
class Publisher:
def __init__(
self,
url,
topics
) -> None:
self.url = url
if isinstance(topics, str):
topics = [topics]
self.topics = topics
async def run(self):
i = 0
client = Client(self.url)
while True:
message = f"Message n{i}"
async with client:
for topic in self.topics:
await client.publish(topic, message.encode())
print(client._disconnected.done()) # Returns True
await asyncio.sleep(1)
i += 1
if __name__ == "__main__":
publisher = Publisher(MQTT_BROKER_ADDRESS, "some_topic")
asyncio.get_event_loop().run_until_complete(publisher.run())
Hi Robin, thanks for the comment here. :)
I have no problem with the following code, where I re-use the same client and connect/disconnect every second.
For now, asyncio_mqtt.Client
is officially a single-use context manager.
We (the asyncio-mqtt maintainers) may at some point announce that asyncio_mqtt.Client
is a reusable (but not reentrant) context manager.
It may be that asyncio_mqtt.Client
already is a reusable context manager. We (the asyncio-mqtt maintainers) just don't guarantee it yet.
Robin, if you want to, you can make a PR with a suite of tests that proves that asyncio_mqtt.Client
is reusable. With those tests in place, we can guarantee this property.
I don't think this is correct, QoS1 means could make sure that message was sent.It's not reliable to store message in memory, the only way to make sure messages sent is to store it in disk(which is too much work for a mqtt client), and send them one by one, or you MUST accept some message sending may be failed.
Making publish()
work outside the client context is very unexpected.
If the goal is to provide more convenience then e.g. a new function publish_with_buffer()
would imho make way more sense.
Curiously asking, why is making it work outside the client context so unexpected, shouldn't a client work like that being connected to the broker, publishes and reads messages when they come, but shouldn't the connection be persistent?
I might have misphrased, the unexpected part is the suggested behavior.: E.g. I call publish on a disconnected client and after the client connects the queued up publish call goes through. It's not clear at all that publish will behave like that and I even think it's reasonable to expect that this behavior is (sometimes) even undesired.
Yes @spacemanspiff2007, got your point, makes good sense to me now what you say. PS: It was misinterpretation on my end, thanks for taking out the time to re-explain.