paho.mqtt.python
MQTT cannot open more than 340 threads
It has been a long time since the previous question on this topic.
I have read several related issues and the solutions proposed in them.
None of them solved the problem, and it seems the library does not intend to address it.
A single process can open at most 340 connections when each client runs its own library-managed thread.
Example:
loop_start()
If you manage the network thread yourself, you can open more connections.
Example:
loop_forever()
This way you can open about 1000 connections per process, and you can scale further by running several processes: roughly 1000 * CpuCount in total.
https://github.com/concurrencylabs/mqtt-locust/issues/3
https://github.com/eclipse/paho.mqtt.python/issues/238
#183 #238
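The numbers above can be reproduced with some back-of-the-envelope arithmetic. This is only a sketch under stated assumptions, none of which are confirmed in this thread: select() is limited to descriptors below 1024 (the usual FD_SETSIZE on Linux), three descriptors are already taken by stdin/stdout/stderr, a client started with loop_start() holds three descriptors (the socket plus an internal socket pair), and a client driven by loop_forever() holds one. The helper names max_clients and plan_processes are hypothetical.

```python
FD_SETSIZE = 1024  # assumed select() limit; platform dependent


def max_clients(fds_per_client: int, reserved_fds: int = 3,
                fd_setsize: int = FD_SETSIZE) -> int:
    """Estimate how many clients fit before descriptors reach fd_setsize."""
    return (fd_setsize - reserved_fds) // fds_per_client


def plan_processes(total_clients: int, per_process: int) -> list[int]:
    """Split total_clients into batches of at most per_process, one per process."""
    full, rest = divmod(total_clients, per_process)
    return [per_process] * full + ([rest] if rest else [])


# Assumption: loop_start() costs 3 descriptors per client (socket + socket pair).
threaded = max_clients(fds_per_client=3)
# Assumption: loop_forever() in a self-managed thread costs 1 descriptor per client.
manual = max_clients(fds_per_client=1)
# Spreading 2500 connections over processes of about 1000 connections each.
batches = plan_processes(2500, per_process=1000)
```

Under these assumptions the estimate gives 340 clients for the threaded case and roughly 1000 for the self-managed case, which matches the figures reported above. The real limit depends on how many descriptors the process already has open.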
Maybe it's too late, but replacing the select(..) call used in the _loop(..) method worked for us. You can override the method to use Python's selectors module instead of select. selectors picks the most efficient mechanism available on your OS.
import selectors
from contextlib import suppress

from paho.mqtt.client import MQTT_ERR_CONN_LOST, MQTT_ERR_UNKNOWN, Client


class MQTTClient(Client):
    [...]

    def _loop(self, timeout: float = 1.0) -> int:
        if timeout < 0.0:
            raise ValueError("Invalid timeout.")

        sel = selectors.DefaultSelector()
        eventmask = selectors.EVENT_READ

        # Also wait for writability when an outgoing packet is queued.
        with suppress(IndexError):
            packet = self._out_packet.popleft()
            self._out_packet.appendleft(packet)
            eventmask = selectors.EVENT_WRITE | eventmask

        # TLS sockets may hold decrypted bytes that the selector cannot see.
        pending_bytes = 0
        if hasattr(self._sock, "pending"):
            pending_bytes = self._sock.pending()
        if pending_bytes > 0:
            timeout = 0.0

        try:
            sel.register(self._sock, eventmask)
            if self._sockpairR is not None:
                sel.register(self._sockpairR, selectors.EVENT_READ)
            events = sel.select(timeout)
        except (TypeError, ValueError):
            # The socket is no longer usable, so treat the connection as lost.
            return int(MQTT_ERR_CONN_LOST)
        except Exception:
            return int(MQTT_ERR_UNKNOWN)
        finally:
            # Close on every path so early returns do not leak the selector.
            sel.close()

        socklist: list[list] = [[], []]
        for key, mask in events:
            # Test the ready mask; key.events is only the registered mask.
            if mask & selectors.EVENT_READ:
                socklist[0].append(key.fileobj)
            if mask & selectors.EVENT_WRITE:
                socklist[1].append(key.fileobj)

        if self._sock in socklist[0] or pending_bytes > 0:
            rc = self.loop_read()
            if rc or self._sock is None:
                return int(rc)

        # Drain the wake-up bytes written by other threads.
        if self._sockpairR and self._sockpairR in socklist[0]:
            with suppress(BlockingIOError):
                self._sockpairR.recv(10000)

        if self._sock in socklist[1]:
            rc = self.loop_write()
            if rc or self._sock is None:
                return int(rc)

        return int(self.loop_misc())
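For readers who have not used selectors, here is a minimal standard-library sketch of the same register/select pattern, independent of paho. It uses a local socket pair in place of a broker connection, and the helper name wait_readable is hypothetical. Note that DefaultSelector only avoids the select() descriptor limit when the platform offers epoll, kqueue, or poll; where it falls back to select, the limit remains.

```python
import selectors
import socket


def wait_readable(sock: socket.socket, timeout: float = 1.0) -> bool:
    """Return True if sock has data to read within timeout seconds."""
    # The context manager closes the selector on every exit path.
    with selectors.DefaultSelector() as sel:
        sel.register(sock, selectors.EVENT_READ)
        events = sel.select(timeout)
    # Each event is (key, mask); mask holds the events that are actually ready.
    return any(key.fileobj is sock and mask & selectors.EVENT_READ
               for key, mask in events)


reader, writer = socket.socketpair()
try:
    idle = wait_readable(reader, timeout=0.0)   # nothing has been sent yet
    writer.send(b"x")
    ready = wait_readable(reader, timeout=1.0)  # one byte is now waiting
finally:
    reader.close()
    writer.close()
```

Creating a selector per call keeps the sketch short and mirrors the override above; a long-lived client could instead keep one selector and update registrations with sel.modify().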
I'm going to close this as a duplicate (#183 & #238 and a few others). This is part of a general project to clean-up issues (which should make it simpler to identify priorities going forward).