aiomqtt icon indicating copy to clipboard operation
aiomqtt copied to clipboard

More than one subscription, routing it to a specific handler

Open skinkie opened this issue 7 months ago • 18 comments

The example denotes the following.

async with Client("test.mosquitto.org") as client:
    async with client.messages() as messages:
        await client.subscribe("humidity/#")
        async for message in messages:
            print(message.payload)

If a user wants to also test for temperature:

async with Client("test.mosquitto.org") as client:
    async with client.messages() as messages:
        await client.subscribe("humidity/#")
        await client.subscribe("temperature/#")
        async for message in messages:
            if message.topic.startswith('humidity'):
                print(message.payload)
            elif message.topic.startswith('temperature'):
                print(message.payload)

Is there any way to do this more elegantly? Maybe even in a concurrent way?

Pseudo:

async with Client("test.mosquitto.org") as client:
    async with client.messages() as messages:
        await client.subscribe("humidity/#")
        async for message in messages:
            print(message.payload)

    async with client.messages() as messages:
        await client.subscribe("temperature/#")
        async for message in messages:
            print(message.payload)

skinkie avatar Dec 21 '23 22:12 skinkie

Hi there Stefan,

I'll respond to your question of handling incoming messages concurrently. We can also happily discuss how to make aiomqtt more elegant if you have specific ideas!

The second admonition in the "Message queue" section of our docs shows a small example of how to handle each message in a separate coroutine. However, as far as I understand your pseudocode, you'd like to have one coroutine per subscription instead of per message.

We'll need to dive a bit into asyncio for this 😋 The idea is to implement a "distributor" that sorts the incoming messages into different asyncio queues, which are then processed concurrently (but sequentially inside a subscription). Here's a minimal working example:

import asyncio
import aiomqtt


async def fast_producer(client: aiomqtt.Client):
    while True:
        await asyncio.sleep(0.2)
        await client.publish("fast", "fast")


async def fast_consumer():
    while True:
        message = await fast_queue.get()
        print(f"Fast consumer received: {message.payload}")


async def slow_producer(client: aiomqtt.Client):
    while True:
        await asyncio.sleep(2)
        await client.publish("slow", "slow")


async def slow_consumer():
    while True:
        message = await slow_queue.get()
        print(f"Slow consumer received: {message.payload}")


fast_queue = asyncio.Queue()
slow_queue = asyncio.Queue()


async def distributor(client: aiomqtt.Client):
    async with client.messages() as messages:
        await client.subscribe("fast")
        await client.subscribe("slow")
        # Sort messages into the appropriate queues
        async for message in messages:
            if message.topic.matches("fast"):
                fast_queue.put_nowait(message)
            elif message.topic.matches("slow"):
                slow_queue.put_nowait(message)


async def main():
    async with aiomqtt.Client("test.mosquitto.org") as client:
        # Use a task group to manage and await all tasks
        async with asyncio.TaskGroup() as tg:
            tg.create_task(fast_producer(client))
            tg.create_task(fast_consumer())
            tg.create_task(slow_producer(client))
            tg.create_task(slow_consumer())
            tg.create_task(distributor(client))


asyncio.run(main())

Does that make sense? 🙂 Issue #250 is similar and could have some more context.

empicano avatar Dec 23 '23 22:12 empicano

@empicano what I would love to avoid is the if .. elif-construction, and apply a more functional (or for example with aiogram filter/decorator) approach. I mean, if you create the distributor you showed in the example, you will still get the second variant. For this specific project (a telegram bot) I don't really need an extremely high performance coroutine. In a different project I discussed with the author of flashmq, what would be the best way to have very big (far bigger than regular burst of the retained messages), his suggestion was to create separate subscriptions like sharding the load. I wonder if there is a way to avoid 'multiple clients' and have this handled under the hood.

skinkie avatar Dec 23 '23 22:12 skinkie

We had aiomqtt.filtered_messages before (in fact, this is still in the code, but deprecated now). This turned out to be more complex and less understandable than it should be. Simple if statements are easy and flexible. You can read a bit more about this design decision in this PR.

I agree with you however that aiomqtt is not yet as elegant as it could be.

empicano avatar Dec 23 '23 23:12 empicano

@empicano was there a change?

skinkie avatar Jan 23 '24 08:01 skinkie

Hi Stefan,

I should have commented something before closing, sorry about that. I'm currently weeding out issues to get an overview of what to work on.

Do you have specific changes to aiomqtt that you still want to discuss?

empicano avatar Jan 23 '24 16:01 empicano

If you say "just solve it with if-then-else" that is fine for me. But I wish some way of routing would exists based on on the semantics that is allowed making subscriptions (the + #). Ideally a bit bigger than that so a single subscription could be used as well, with matching happening inside of the application. Might even be more elegant that the actual subscription would not be required at all, and the annotations figuring out the 'best' subscription(s).

skinkie avatar Jan 23 '24 16:01 skinkie

You probably know this, but the Topic.matches method can match wildcards as well, e.g. Topic.matches("foo/+/bar/#").

I'm happy to discuss possible improvements to the interface. What matters most to me that it's intuitive. Could you provide some (pseudo-)code of how your ideal interface would look like?

empicano avatar Jan 23 '24 17:01 empicano

You probably know this, but the Topic.matches method can match wildcards as well, e.g. Topic.matches("foo/+/bar/#").

Did not know this :-) I literally redo all those things manually so it would be already an improvement.

I'm happy to discuss possible improvements to the interface. What matters most to me that it's intuitive. Could you provide some (pseudo-)code of how your ideal interface would look like?

@subscription(topic="/temperature/#')
async def handleTemperature(message: Message):
    print(message.payload)

@subscription(topic="/humidity/{room}/#') # Analogue to aiohttp's @routes.post('/{room}/something.xml')
async def handleHumidity(message: Message, room: str):
    print(message.payload)

With the above it may become obvious that creating two subscription would be the best towards the server, and a subscription on '/#' wasteful.

skinkie avatar Jan 23 '24 17:01 skinkie

Interesting, this seems similar to how fastapi-mqtt is designed, and in general many web frameworks.

Some thoughts on this:

  • This design pulls subscribing, receiving, and publishing messages out of the client's context manager. This means that the client must reconnect and recover automatically (which it currently does not). We have discussed automatic reconnection before, and it's still something I'd like to see in aiomqtt.
  • This design assumes that the MQTT client should run from the beginning to the end of the execution, like a web framework. This is not always the case. You might have a sensor that shuts down the MQTT connection for a few hours to save battery power, but still continues measuring. Also, clients should be able to subscribe and unsubscribe from topics dynamically.
  • I like how this would parse wildcards into variables (the humidity/{room} part). It would be cool to add this if it fits well somewhere.

I guess what could work is to pass a handler when calling subscribe and assigning the messages to the correct handler under the hood instead of letting the user do that with if/else (pseudocode):

async def handle_temperature(message):
    print(message.payload)

async def handle_humidity(message, room):
    print(message.payload)

async with Client("test.mosquitto.org") as client:
    await client.subscribe("temperature/#", handle_temperature)
    await client.subscribe("humidity/{room}/#", handle_humidity)
    # Block here or do something else such that we don't exit the context manager

However, this would be a big change to the interface that I'm not sure is worth it. Some more intricate ways of prioritizing and handling messages concurrently would for example be more difficult to achieve. I believe that one of aiomqtt's strengths is that it's very flexible. The if/else design is simple, but I'm reluctant to move away from it, because it's so intuitive and flexible.

@frederikaalund, what do you think about this? 😊

empicano avatar Jan 24 '24 20:01 empicano