Calling client.subscribe with a zero-length list causes TimeoutError

Open flyte opened this issue 3 years ago • 3 comments

Calling client.subscribe([]) should fail or return immediately. Instead, the call is passed on to the paho client, which happily subscribes to no topics. client.subscribe then sets up a future and waits for paho to fire its on_subscribe callback. That callback never fires, because no topics were subscribed to, so the call ends in a TimeoutError once the timeout (10 seconds by default) elapses.

Recreate with:

import asyncio

from asyncio_mqtt import Client


async def main():
    client = Client("test.mosquitto.org")

    print("Connecting...")
    await client.connect()
    print("Connected!")

    print("Subscribing...")
    await client.subscribe([])
    print("Subscribed!")

    print("Disconnecting...")
    await client.disconnect()
    print("Disconnected!")


if __name__ == "__main__":
    # asyncio.run creates the event loop, runs main(), and closes the loop.
    asyncio.run(main())
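
The wait itself can be reproduced without a broker. The sketch below is only an illustration of the mechanism, not asyncio_mqtt code: `wait_for_ack` and `demo` are made-up names, and the future stands in for the `cb_result` that no on_subscribe callback ever resolves.

```python
import asyncio


async def wait_for_ack(ack: asyncio.Future, timeout: float) -> object:
    # Same shape as the wait in subscribe(): block until the
    # acknowledgement future resolves or the timeout elapses.
    return await asyncio.wait_for(ack, timeout=timeout)


async def demo() -> str:
    loop = asyncio.get_running_loop()
    # Stands in for cb_result; nothing ever sets a result on it.
    ack = loop.create_future()
    try:
        await wait_for_ack(ack, timeout=0.05)
    except asyncio.TimeoutError:
        return "timed out"
    return "acknowledged"


outcome = asyncio.run(demo())
print(outcome)
```

With a real client the only difference is the length of the wait: the default timeout in subscribe is 10 seconds.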

The client.subscribe function for reference:

    async def subscribe(self, *args, timeout=10, **kwargs):
        result, mid = self._client.subscribe(*args, **kwargs)
        # Early out on error
        if result != mqtt.MQTT_ERR_SUCCESS:
            raise MqttCodeError(result, 'Could not subscribe to topic')
        # Create future for when the on_subscribe callback is called
        cb_result = asyncio.Future()
        with self._pending_call(mid, cb_result):
            # Wait for cb_result
            return await self._wait_for(cb_result, timeout=timeout)
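
One possible guard, sketched under assumptions rather than taken from the library: `check_topics` is a hypothetical helper, not part of asyncio_mqtt or paho, and raising ValueError is just one of the two options mentioned above (the other being an immediate return).

```python
from typing import List, Tuple, Union

# The topic shapes this sketch considers: a single topic string,
# a (topic, qos) tuple, or a list of (topic, qos) tuples.
Topic = Union[str, Tuple[str, int], List[Tuple[str, int]]]


def check_topics(topic: Topic) -> Topic:
    """Reject an empty topic list before it reaches the underlying client.

    Hypothetical helper for illustration only.
    """
    if isinstance(topic, list) and len(topic) == 0:
        raise ValueError("subscribe() needs at least one topic")
    return topic


print(check_topics([("sensors/#", 0)]))
```

subscribe could run a check like this on its first argument before calling self._client.subscribe, so the caller gets an error straight away instead of waiting out the timeout.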

flyte · Feb 24 '21 17:02

As a side note, does this callback mechanism support multiple subscriptions? Does paho call back once per subscribed topic, or once per invocation of self._client.subscribe()?

flyte · Feb 24 '21 17:02

Nice find. I never tested this case. Reminds me that we also need test cases. :))

> As a side note, does this callback mechanism support multiple subscriptions? Does paho call back once per subscribed topic, or once per invocation of self._client.subscribe()?

I'm not sure that I completely follow. :)

When we call self._client.subscribe(), paho sends a SUBSCRIBE packet. When paho receives a SUBACK packet from the server, it calls the on_subscribe callback.

There is also this interesting sentence from the MQTTv5 spec:

> When the Server receives a SUBSCRIBE packet from a Client, the Server MUST respond with a SUBACK packet.

So if everything goes as planned (e.g., no network errors), we will receive exactly one on_subscribe call for every self._client.subscribe call that we make.
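
A rough simulation of that bookkeeping, with no network involved. Everything here is a stand-in: `FakeBroker` plays the role of paho plus the server, it simply grants whatever QoS was requested, and its callback signature is simplified compared to paho's on_subscribe. The point is only that each subscribe call gets one message id and one acknowledgement, however many topics it carries.

```python
import asyncio
from typing import Dict, List, Tuple


class FakeBroker:
    """Stand-in for paho plus a server: one SUBACK per SUBSCRIBE packet."""

    def __init__(self) -> None:
        self._next_mid = 0
        self.pending: Dict[int, asyncio.Future] = {}

    def subscribe(self, topics: List[Tuple[str, int]]) -> int:
        # One SUBSCRIBE packet (and one mid) per call, regardless of topic count.
        self._next_mid += 1
        mid = self._next_mid
        granted = [qos for _, qos in topics]
        # Deliver the simulated SUBACK on a later loop iteration.
        asyncio.get_running_loop().call_soon(self._on_subscribe, mid, granted)
        return mid

    def _on_subscribe(self, mid: int, granted_qos: List[int]) -> None:
        # Resolve only the future registered under this message id.
        future = self.pending.pop(mid, None)
        if future is not None and not future.done():
            future.set_result(granted_qos)


async def subscribe(broker: FakeBroker, topics, timeout: float = 1.0):
    mid = broker.subscribe(topics)
    future = asyncio.get_running_loop().create_future()
    broker.pending[mid] = future
    return await asyncio.wait_for(future, timeout=timeout)


async def demo():
    broker = FakeBroker()
    # Two concurrent calls with overlapping topic strings.
    return await asyncio.gather(
        subscribe(broker, [("a/#", 0), ("b/#", 1)]),
        subscribe(broker, [("a/#", 2)]),
    )


results = asyncio.run(demo())
print(results)
```

Each call resolves with its own list of granted QoS values because the futures are keyed by mid, so concurrent calls do not interfere with each other.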

Does that answer your question?

frederikaalund · Feb 24 '21 20:02

Maybe I over-complicated that, sorry. :))

Yes, it's completely valid to call self._client.subscribe multiple times, even with overlapping topic strings. :+1:

frederikaalund · Feb 24 '21 20:02