aiomqtt icon indicating copy to clipboard operation
aiomqtt copied to clipboard

How to reconnect inside the Client context manager

Open mirko opened this issue 1 year ago • 9 comments

I read the docs and examples and tickets about reconnection.

My problem is: I don't want to wrap everything in a while-loop and re-execute everything just because of a connection issue, but potentially re-exec a failed call - while the messages are being iterated over in its own asyncio task.

Let's start - this is what I gathered from the docs, and essentially works:

import asyncio
import aiomqtt

async def msg_loop(mqtt_client):
    try:
        async for msg in mqtt_client.messages:
            print("[M]", msg)
    except aiomqtt.exceptions.MqttError as exc:
        print("[W]", "Connection issue while waiting on MQTT bus, waiting for reconnect..", exc)
        await asyncio.sleep(5)


async def main():
    print("[*]", "Connecting to MQTT broker", end='\r')
    mqtt_client = aiomqtt.Client("test.mosquitto.org", 1883)

    #asyncio.create_task(connect(mqtt_client))

    while True:
        try:
            async with mqtt_client:
                await mqtt_client.subscribe("tpc/sub")
                tsk_msg_loop = asyncio.create_task(msg_loop(mqtt_client))

                print("[>]")

                print("[1]", "Step 1..")
                await mqtt_client.publish("Foo")

                print("[2]", "Step 2..")
                await mqtt_client.publish("Bar")

                print("[N]", "Step N..")
                await mqtt_client.publish("End")

                print("[X]", "Done")

                tsk_msg_loop.cancel()

                break

        except aiomqtt.exceptions.MqttError as exc:
            print("[W]", "Connection error", exc)
            continue

asyncio.run(main())

The problem is: whenever there's a connection issue during a publish call, the whole code clock gets executed again. This is not what I want. I want to e.g. decide, whether to try again, to skip, or do whatever.

As far as I understand, I do have to do all that within the same context, so I cant just wrap every publish call around a(nother) async with mqtt_client.

I was also trying to put the (re)connect routine into its own task, which again doesn't work, because then the client context is not available in main() anymore. I'm losing hair over this seemingly simple issue and would very much appreciate some pointer on how to achieve that, as I can't see the wood for the trees anymore.

Thanks in advance!

mirko avatar Sep 08 '24 13:09 mirko