paho.mqtt.python
paho.mqtt.python copied to clipboard
Use Python selectors instead of select
Using Python select.select(..)
method limits the open connections to 340:
- https://github.com/eclipse/paho.mqtt.python/issues/183
- https://github.com/eclipse/paho.mqtt.python/issues/238
- https://github.com/eclipse/paho.mqtt.python/issues/499
- https://github.com/eclipse/paho.mqtt.python/issues/662
Python select docs encourages users to use selectors instead of select.
Would it be possible to change the select usage to use selectors? The following implementation worked for us:
import selectors
class Client:
[...]
def _loop(self, timeout: float = 1.0) -> int:
if timeout < 0.0:
raise ValueError("Invalid timeout.")
sel = selectors.DefaultSelector()
eventmask = selectors.EVENT_READ
with suppress(IndexError):
packet = self._out_packet.popleft()
self._out_packet.appendleft(packet)
eventmask = selectors.EVENT_WRITE | eventmask
if self._sockpairR is None:
sel.register(self._sock, eventmask)
else:
sel.register(self._sock, eventmask)
sel.register(self._sockpairR, selectors.EVENT_READ)
pending_bytes = 0
if hasattr(self._sock, "pending"):
pending_bytes = self._sock.pending()
if pending_bytes > 0:
timeout = 0.0
try:
events = sel.select(timeout)
except TypeError:
return int(MQTT_ERR_CONN_LOST)
except ValueError:
return int(MQTT_ERR_CONN_LOST)
except Exception:
return int(MQTT_ERR_UNKNOWN)
socklist: list[list] = [[], []]
for key, _event in events:
if key.events & selectors.EVENT_READ:
socklist[0].append(key.fileobj)
if key.events & selectors.EVENT_WRITE:
socklist[1].append(key.fileobj)
if self._sock in socklist[0] or pending_bytes > 0:
rc = self.loop_read()
if rc or self._sock is None:
return int(rc)
if self._sockpairR and self._sockpairR in socklist[0]:
socklist[1].insert(0, self._sock)
with suppress(BlockingIOError):
self._sockpairR.recv(10000)
if self._sock in socklist[1]:
rc = self.loop_write()
if rc or self._sock is None:
return int(rc)
sel.close()
return int(self.loop_misc())
Should the "sel.close()" be invoked before return? And i meet a error with self._sock is None at
if self._sockpairR is None:
sel.register(self._sock, eventmask)
else:
sel.register(self._sock, eventmask)
sel.register(self._sockpairR, selectors.EVENT_READ)
I tested both use cases, closing and not closing the selectors. Not closing the selectors increases CPU usage: https://docs.python.org/3/library/selectors.html#selectors.BaseSelector.close
Which error do you have with the None value?
Yesterday we had the same problem. The error raises when the MQTT client attempts to reconnect. The solution is quite easy. The old select method registers and selects the sockets using only one method. The new one uses two methods. Then, both methods (register and select) should be wrapped by the try-catch
as follows:
import selectors
class Client:
[...]
def _loop(self, timeout: float = 1.0) -> int:
if timeout < 0.0:
raise ValueError("Invalid timeout.")
sel = selectors.DefaultSelector()
eventmask = selectors.EVENT_READ
with suppress(IndexError):
packet = self._out_packet.popleft()
self._out_packet.appendleft(packet)
eventmask = selectors.EVENT_WRITE | eventmask
# used to check if there are any bytes left in the (SSL) socket
pending_bytes = 0
if hasattr(self._sock, "pending"):
pending_bytes = self._sock.pending()
# if bytes are pending do not wait in select
if pending_bytes > 0:
timeout = 0.0
try:
if self._sockpairR is None:
sel.register(self._sock, eventmask)
else:
sel.register(self._sock, eventmask)
sel.register(self._sockpairR, selectors.EVENT_READ)
events = sel.select(timeout)
except TypeError:
# Socket isn't correct type, in likelihood connection is lost
return int(MQTT_ERR_CONN_LOST)
except ValueError:
# Can occur if we just reconnected but rlist/wlist contain a -1 for
# some reason.
return int(MQTT_ERR_CONN_LOST)
except Exception:
# Note that KeyboardInterrupt, etc. can still terminate since they
# are not derived from Exception
return int(MQTT_ERR_UNKNOWN)
socklist: list[list] = [[], []]
for key, _event in events:
if key.events & selectors.EVENT_READ:
socklist[0].append(key.fileobj)
if key.events & selectors.EVENT_WRITE:
socklist[1].append(key.fileobj)
if self._sock in socklist[0] or pending_bytes > 0:
rc = self.loop_read()
if rc or self._sock is None:
return int(rc)
if self._sockpairR and self._sockpairR in socklist[0]:
# Stimulate output write even though we didn't ask for it, because
# at that point the publish or other command wasn't present.
socklist[1].insert(0, self._sock)
# Clear sockpairR - only ever a single byte written.
with suppress(BlockingIOError):
# Read many bytes at once - this allows up to 10000 calls to
# publish() inbetween calls to loop().
self._sockpairR.recv(10000)
if self._sock in socklist[1]:
rc = self.loop_write()
if rc or self._sock is None:
return int(rc)
sel.close()
return int(self.loop_misc())
Thank you very much, it solves my problem.
Any update? May I open a PR?