paho.mqtt.python
Timeout subscribe.simple
Is there any way to set a timeout on subscribe.simple? Thanks
No, not as currently implemented in paho. I've made a function that mimics subscribe.simple with a timeout:
import threading
from typing import Any, Optional

from paho.mqtt.client import Client


def subscribe(
    topics: str | list[str],
    hostname: str,
    timeout: Optional[float] = None,
    **mqtt_kwargs,
):
    """
    Modeled closely after the paho version, this also includes a timeout.
    Note that this _does_ disconnect after receiving a single message.
    Returns that message, or None if the timeout expires first.
    """
    lock: Optional[threading.Lock]

    def on_connect(client, userdata, flags, rc):
        client.subscribe(userdata["topics"])

    def on_message(client, userdata, message):
        userdata["messages"] = message
        client.disconnect()
        # Wake the waiting caller; the locked() check avoids a RuntimeError
        # if a second message is delivered before the disconnect completes.
        lock = userdata["lock"]
        if lock is not None and lock.locked():
            lock.release()

    # Test against None so that timeout=0 still takes the timeout path.
    if timeout is not None:
        lock = threading.Lock()
    else:
        lock = None

    topics = [topics] if isinstance(topics, str) else topics
    # Pop qos once so every topic gets the same value.
    qos = mqtt_kwargs.pop("qos", 0)
    userdata: dict[str, Any] = {
        "topics": [(topic, qos) for topic in topics],
        "messages": None,
        "lock": lock,
    }

    # Written for paho-mqtt 1.x; 2.x also expects a callback_api_version argument.
    client = Client(userdata=userdata)
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(hostname)

    if timeout is None:
        # Blocks until on_message disconnects the client.
        client.loop_forever()
    else:
        assert lock is not None
        lock.acquire()
        client.loop_start()
        # Returns when on_message releases the lock or the timeout expires.
        lock.acquire(timeout=timeout)
        client.loop_stop()
        client.disconnect()

    return userdata["messages"]
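The same wait-with-timeout idea can also be written with threading.Event instead of a lock that is acquired twice. The sketch below is only an illustration and is not part of paho: wait_for_first, fake_source, start and stop are hypothetical names, and the timer-based source stands in for the client's background network loop so the example runs without a broker.

```python
import threading
from typing import Any, Callable, Optional


def wait_for_first(
    start: Callable[[Callable[[Any], None]], None],
    stop: Callable[[], None],
    timeout: Optional[float] = None,
) -> Optional[Any]:
    """Return the first delivered message, or None if the timeout expires.

    `start` and `stop` are hypothetical stand-ins for starting and stopping
    a background network loop (loop_start/loop_stop in the function above).
    """
    received = threading.Event()
    result = {"message": None}

    def deliver(message: Any) -> None:
        # Keep only the first message. Event.set() may be called repeatedly,
        # unlike Lock.release(), which raises on an unlocked lock.
        if not received.is_set():
            result["message"] = message
            received.set()

    start(deliver)
    try:
        received.wait(timeout)  # timeout=None blocks until a message arrives
    finally:
        stop()
    return result["message"]


def fake_source(delay: float):
    """Hypothetical message source: delivers "hello" after `delay` seconds."""
    timers = []

    def start(deliver: Callable[[Any], None]) -> None:
        timer = threading.Timer(delay, deliver, args=("hello",))
        timer.daemon = True
        timer.start()
        timers.append(timer)

    def stop() -> None:
        for timer in timers:
            timer.cancel()

    return start, stop


print(wait_for_first(*fake_source(0.05), timeout=1.0))  # prints: hello
print(wait_for_first(*fake_source(5.0), timeout=0.1))   # prints: None
```

With a real client, on_message would play the role of deliver, and the caller would distinguish a timeout from a message by checking for None.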
It would be great to have this implemented somehow. Without a timeout, the current implementation is useless in most practical cases.
A timeout on publish.simple has also been requested; ref #404