aiomqtt
Improve message routing
This proposes a simpler way to filter messages and structure message handling.
Unsorted points that I considered:
- This lets us split message handling into multiple files via multiple routers (without circular imports)
- This lets us use the client inside a handler function, to e.g. publish a message back in a request/response fashion
- We can dynamically subscribe and unsubscribe
- The values of wildcards (`+`/`#`) of the topic filter are automatically available as `*args` in the handler function
- We still only have a single message queue (easier for newcomers; concurrency could be implemented as shown below, optionally with a priority queue)
- We can still pass a non-default queue to the client to prioritize the handling of certain messages
- We still have the flexibility to skip the routers and handle messages directly. Routers are a natural next step: once an application gets too complex, we can add them iteratively
- We are still flexible enough to process messages concurrently in an individual way (i.e. our message loop is still transparent)
- It's a non-breaking change
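To make the wildcard point concrete, here is a rough sketch of how the values of `+` and `#` could be pulled out of a topic. The function name `extract_wildcards` is hypothetical and this is not the code from this PR; it also ignores `$`-prefixed topics and shared subscriptions.

```python
from typing import Optional, Tuple


def extract_wildcards(topic_filter: str, topic: str) -> Optional[Tuple[str, ...]]:
    """Return the values matched by `+` and `#`, or None if the topic does not match."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    values = []
    for index, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: captures all remaining levels (possibly none).
            values.append("/".join(topic_levels[index:]))
            return tuple(values)
        if index >= len(topic_levels):
            return None  # The topic has fewer levels than the filter.
        if level == "+":
            # Single-level wildcard: captures exactly one level.
            values.append(topic_levels[index])
        elif level != topic_levels[index]:
            return None  # A literal level differs.
    # Without a trailing `#`, filter and topic need the same number of levels.
    if len(filter_levels) != len(topic_levels):
        return None
    return tuple(values)


print(extract_wildcards("humidity/+/inside/#", "humidity/kitchen/inside/sensor/1"))
```

The returned tuple is what a handler would receive as `*args`, in the order the wildcards appear in the filter.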
The interface looks like this:

    import asyncio
    import aiomqtt

    router = aiomqtt.Router()

    @router.match("humidity/+/inside/#")
    async def handle(message, client, *args):  # wildcard values are extracted into *args
        print(message.payload)

    async def main():
        async with aiomqtt.Client("test.mosquitto.org", routers=[router]) as client:
            await client.subscribe("humidity/#")
            async for message in client.messages:
                await client.route(message)

    asyncio.run(main())

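For readers who want to see the dispatch idea without a broker, here is a self-contained sketch that mimics the interface above. `MiniRouter`, `FakeMessage` and `FakeClient` are hypothetical stand-ins and not the implementation in this PR. Two simplifications to note: `route` lives on the router and takes the client as an argument, and the regex translation does not cover every matching rule of the MQTT specification (e.g. `#` also matching the parent level).

```python
import asyncio
import re
from dataclasses import dataclass, field


@dataclass
class FakeMessage:
    """Hypothetical stand-in for a received message."""
    topic: str
    payload: bytes


@dataclass
class FakeClient:
    """Hypothetical stand-in for the client; it only records publishes."""
    published: list = field(default_factory=list)

    async def publish(self, topic, payload):
        self.published.append((topic, payload))


class MiniRouter:
    """Hypothetical router that maps topic filters to async handlers."""

    def __init__(self):
        self._routes = []  # list of (compiled pattern, handler)

    def match(self, topic_filter):
        # Simplified translation: `+` captures one level, `#` captures the rest.
        pattern = "/".join(
            "([^/]*)" if level == "+" else "(.*)" if level == "#" else re.escape(level)
            for level in topic_filter.split("/")
        )

        def decorator(handler):
            self._routes.append((re.compile(pattern), handler))
            return handler

        return decorator

    async def route(self, message, client):
        # Call every handler whose filter matches, passing wildcard values on.
        for pattern, handler in self._routes:
            found = pattern.fullmatch(message.topic)
            if found:
                await handler(message, client, *found.groups())


router = MiniRouter()


@router.match("humidity/+/inside/#")
async def respond(message, client, room, rest):
    # Request/response style: publish a reply from inside the handler.
    await client.publish(f"ack/{room}", message.payload)


async def demo():
    client = FakeClient()
    await router.route(FakeMessage("humidity/kitchen/inside/1", b"0.4"), client)
    await router.route(FakeMessage("temperature/kitchen", b"21"), client)
    return client.published


published = asyncio.run(demo())
print(published)  # → [('ack/kitchen', b'0.4')]
```

Because handlers are registered on a router object rather than on the client, each module can define its own router and the application only has to collect them in one place.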
We can process messages concurrently, e.g. like this:

    import asyncio
    import aiomqtt

    router = aiomqtt.Router()

    @router.match("humidity/+/inside/#")
    async def handle(message, client, *args):  # wildcard values are extracted into *args
        print(message.payload)

    async def work(client):
        async for message in client._messages():
            await client.route(message)

    async def main():
        async with aiomqtt.Client("test.mosquitto.org", routers=[router]) as client:
            await client.subscribe("humidity/#")
            async with asyncio.TaskGroup() as tg:  # requires Python 3.11+
                tg.create_task(work(client))
                tg.create_task(work(client))

    asyncio.run(main())

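The optional prioritization mentioned in the list can be illustrated with the standard library alone. In this sketch two workers drain an `asyncio.PriorityQueue`; the `priority_for` rule and the topic names are made up for illustration, and how such a queue is handed to the client is deliberately left out.

```python
import asyncio
import itertools

# Tie-breaker so that entries with equal priority keep their arrival order.
_counter = itertools.count()


def priority_for(topic: str) -> int:
    """Hypothetical rule: lower numbers are handled first."""
    return 0 if topic.startswith("alarm/") else 1


async def worker(queue, handled):
    while True:
        _priority, _order, topic = await queue.get()
        handled.append(topic)  # a real worker would route the message here
        queue.task_done()


async def demo():
    queue = asyncio.PriorityQueue()
    for topic in ["humidity/kitchen", "alarm/smoke", "humidity/bath", "alarm/flood"]:
        queue.put_nowait((priority_for(topic), next(_counter), topic))
    handled = []
    workers = [asyncio.create_task(worker(queue, handled)) for _ in range(2)]
    await queue.join()  # wait until every queued entry has been handled
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return handled


handled = asyncio.run(demo())
print(handled)
```

The alarm topics come out first even though they were queued later, and the counter keeps the order stable within each priority level.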
Glad to hear feedback on this 🙂