aiomqtt
aiomqtt copied to clipboard
More than one subscription, routing it to a specific handler
The example denotes the following.
async with Client("test.mosquitto.org") as client:
async with client.messages() as messages:
await client.subscribe("humidity/#")
async for message in messages:
print(message.payload)
If a user wants to also test for temperature:
async with Client("test.mosquitto.org") as client:
async with client.messages() as messages:
await client.subscribe("humidity/#")
await client.subscribe("temperature/#")
async for message in messages:
if message.topic.startswith('humidity'):
print(message.payload)
elif message.topic.startswith('temperature'):
print(message.payload)
Is there any way to do this more elegantly? Maybe even in a concurrent way?
Pseudo:
async with Client("test.mosquitto.org") as client:
async with client.messages() as messages:
await client.subscribe("humidity/#")
async for message in messages:
print(message.payload)
async with client.messages() as messages:
await client.subscribe("temperature/#")
async for message in messages:
print(message.payload)
Hi there Stefan,
I'll respond to your question of handling incoming messages concurrently. We can also happily discuss how to make aiomqtt more elegant if you have specific ideas!
The second admonition in the "Message queue" section of our docs shows a small example of how to handle each message in a separate coroutine. However, as far as I understand your pseudocode, you'd like to have one coroutine per subscription instead of per message.
We'll need to dive a bit into asyncio for this 😋 The idea is to implement a "distributor" that sorts the incoming messages into different asyncio queues, which are then processed concurrently (but sequentially inside a subscription). Here's a minimal working example:
import asyncio
import aiomqtt
async def fast_producer(client: aiomqtt.Client):
while True:
await asyncio.sleep(0.2)
await client.publish("fast", "fast")
async def fast_consumer():
while True:
message = await fast_queue.get()
print(f"Fast consumer received: {message.payload}")
async def slow_producer(client: aiomqtt.Client):
while True:
await asyncio.sleep(2)
await client.publish("slow", "slow")
async def slow_consumer():
while True:
message = await slow_queue.get()
print(f"Slow consumer received: {message.payload}")
fast_queue = asyncio.Queue()
slow_queue = asyncio.Queue()
async def distributor(client: aiomqtt.Client):
async with client.messages() as messages:
await client.subscribe("fast")
await client.subscribe("slow")
# Sort messages into the appropriate queues
async for message in messages:
if message.topic.matches("fast"):
fast_queue.put_nowait(message)
elif message.topic.matches("slow"):
slow_queue.put_nowait(message)
async def main():
async with aiomqtt.Client("test.mosquitto.org") as client:
# Use a task group to manage and await all tasks
async with asyncio.TaskGroup() as tg:
tg.create_task(fast_producer(client))
tg.create_task(fast_consumer())
tg.create_task(slow_producer(client))
tg.create_task(slow_consumer())
tg.create_task(distributor(client))
asyncio.run(main())
Does that make sense? 🙂 Issue #250 is similar and could have some more context.
@empicano what I would love to avoid is the if .. elif
-construction, and apply a more functional (or for example with aiogram filter/decorator) approach. I mean, if you create the distributor you showed in the example, you will still get the second variant. For this specific project (a telegram bot) I don't really need an extremely high performance coroutine. In a different project I discussed with the author of flashmq, what would be the best way to have very big (far bigger than regular burst of the retained messages), his suggestion was to create separate subscriptions like sharding the load. I wonder if there is a way to avoid 'multiple clients' and have this handled under the hood.
We had aiomqtt.filtered_messages
before (in fact, this is still in the code, but deprecated now). This turned out to be more complex and less understandable than it should be. Simple if
statements are easy and flexible. You can read a bit more about this design decision in this PR.
I agree with you however that aiomqtt is not yet as elegant as it could be.
@empicano was there a change?
Hi Stefan,
I should have commented something before closing, sorry about that. I'm currently weeding out issues to get an overview of what to work on.
Do you have specific changes to aiomqtt that you still want to discuss?
If you say "just solve it with if-then-else" that is fine for me. But I wish some way of routing would exists based on on the semantics that is allowed making subscriptions (the + #). Ideally a bit bigger than that so a single subscription could be used as well, with matching happening inside of the application. Might even be more elegant that the actual subscription would not be required at all, and the annotations figuring out the 'best' subscription(s).
You probably know this, but the Topic.matches
method can match wildcards as well, e.g. Topic.matches("foo/+/bar/#")
.
I'm happy to discuss possible improvements to the interface. What matters most to me that it's intuitive. Could you provide some (pseudo-)code of how your ideal interface would look like?
You probably know this, but the
Topic.matches
method can match wildcards as well, e.g.Topic.matches("foo/+/bar/#")
.
Did not know this :-) I literally redo all those things manually so it would be already an improvement.
I'm happy to discuss possible improvements to the interface. What matters most to me that it's intuitive. Could you provide some (pseudo-)code of how your ideal interface would look like?
@subscription(topic="/temperature/#')
async def handleTemperature(message: Message):
print(message.payload)
@subscription(topic="/humidity/{room}/#') # Analogue to aiohttp's @routes.post('/{room}/something.xml')
async def handleHumidity(message: Message, room: str):
print(message.payload)
With the above it may become obvious that creating two subscription would be the best towards the server, and a subscription on '/#' wasteful.
Interesting, this seems similar to how fastapi-mqtt is designed, and in general many web frameworks.
Some thoughts on this:
- This design pulls subscribing, receiving, and publishing messages out of the client's context manager. This means that the client must reconnect and recover automatically (which it currently does not). We have discussed automatic reconnection before, and it's still something I'd like to see in aiomqtt.
- This design assumes that the MQTT client should run from the beginning to the end of the execution, like a web framework. This is not always the case. You might have a sensor that shuts down the MQTT connection for a few hours to save battery power, but still continues measuring. Also, clients should be able to subscribe and unsubscribe from topics dynamically.
- I like how this would parse wildcards into variables (the
humidity/{room}
part). It would be cool to add this if it fits well somewhere.
I guess what could work is to pass a handler when calling subscribe and assigning the messages to the correct handler under the hood instead of letting the user do that with if/else
(pseudocode):
async def handle_temperature(message):
print(message.payload)
async def handle_humidity(message, room):
print(message.payload)
async with Client("test.mosquitto.org") as client:
await client.subscribe("temperature/#", handle_temperature)
await client.subscribe("humidity/{room}/#", handle_humidity)
# Block here or do something else such that we don't exit the context manager
However, this would be a big change to the interface that I'm not sure is worth it. Some more intricate ways of prioritizing and handling messages concurrently would for example be more difficult to achieve. I believe that one of aiomqtt's strengths is that it's very flexible. The if/else
design is simple, but I'm reluctant to move away from it, because it's so intuitive and flexible.
@frederikaalund, what do you think about this? 😊