aiomqtt icon indicating copy to clipboard operation
aiomqtt copied to clipboard

Improve message routing

Open empicano opened this issue 8 months ago • 1 comments

This proposes a simpler way to filter messages and structure message handling.

Unsorted points that I considered:

  • This lets us split message handling into multiple files via multiple routers (without circular imports)
  • This lets us use the client inside a handler function, to e.g. publish a message back in a request/response fashion
  • We can dynamically subscribe and unsubscribe
  • The values of wildcards (+/#) of the topic filter are automatically available as *args in the handler function
  • We still only have a single message queue (easier for newcomers, concurrency could be implemented as shown below, optionally with priority queue)
  • We can still pass a non-default queue to the client to prioritize the handling of certain messages
  • We still have flexibility to not use the routers, but handle the messages directly. Routers are a natural development, once the application gets too complex we can iteratively add them
  • We are still flexible enough to process messages concurrently in an individual way (i.e. our message loop is still transparent)
  • It's a non-breaking change

The interface looks like this:

router = aiomqtt.Router()


@router.match("humidity/+/inside/#")
async def handle(message, client, *args):  # automatically extract wildcards into *args
    print(message.payload)


async with aiomqtt.Client("test.mosquitto.org", routers=[router]) as client:
    await client.subscribe("humidity/#")
    async for message in client.messages:
        await client.route(message)

Where we can process messages concurrently e.g. like this:

router = aiomqtt.Router()


@router.match("humidity/+/inside/#")
async def handle(message, client, *args):  # automatically extract wildcards into *args
    print(message.payload)


async def work(client):
    async for message in client._messages():
        await client.route(message)


async with aiomqtt.Client("test.mosquitto.org", routers=[router]) as client:
    await client.subscribe("humidity/#")
    async with asyncio.TaskGroup() as tg:
        tg.create_task(work(client))
        tg.create_task(work(client))

Glad to hear feedback on this 🙂

empicano avatar Jun 02 '24 00:06 empicano