paho.mqtt.python

Use Python selectors instead of select

Open j04n-f opened this issue 2 years ago • 5 comments

Using Python's select.select(..) method limits the number of open connections to 340:

  • https://github.com/eclipse/paho.mqtt.python/issues/183
  • https://github.com/eclipse/paho.mqtt.python/issues/238
  • https://github.com/eclipse/paho.mqtt.python/issues/499
  • https://github.com/eclipse/paho.mqtt.python/issues/662

The Python select docs encourage users to use selectors instead of select.
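For readers who have not used the module, here is a minimal standalone sketch of the selectors API, independent of paho. The helper name `wait_readable` is hypothetical and exists only for illustration.

```python
import selectors
import socket


def wait_readable(socks, timeout=1.0):
    """Return the sockets in `socks` that become readable within `timeout` seconds."""
    # DefaultSelector picks the most efficient mechanism the platform offers
    # (for example epoll or kqueue) and falls back to select() otherwise.
    with selectors.DefaultSelector() as sel:
        for s in socks:
            sel.register(s, selectors.EVENT_READ)
        # select() returns (key, ready_mask) pairs; key.fileobj is the
        # object that was passed to register().
        return [key.fileobj for key, _mask in sel.select(timeout)]


if __name__ == "__main__":
    a, b = socket.socketpair()
    try:
        b.send(b"x")                      # makes `a` readable
        print(wait_readable([a, b]) == [a])
    finally:
        a.close()
        b.close()
```

Note that on platforms where only select() is available, DefaultSelector still uses it, so the descriptor limit would remain there.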

Would it be possible to change the select usage to use selectors? The following implementation worked for us:

import selectors
from contextlib import suppress

class Client:
    [...]
    def _loop(self, timeout: float = 1.0) -> int:
        if timeout < 0.0:
            raise ValueError("Invalid timeout.")

        sel = selectors.DefaultSelector()

        eventmask = selectors.EVENT_READ

        with suppress(IndexError):
            packet = self._out_packet.popleft()
            self._out_packet.appendleft(packet)
            eventmask = selectors.EVENT_WRITE | eventmask

        if self._sockpairR is None:
            sel.register(self._sock, eventmask)
        else:
            sel.register(self._sock, eventmask)
            sel.register(self._sockpairR, selectors.EVENT_READ)

        pending_bytes = 0
        if hasattr(self._sock, "pending"):
            pending_bytes = self._sock.pending()

        if pending_bytes > 0:
            timeout = 0.0

        try:
            events = sel.select(timeout)
        except TypeError:
            return int(MQTT_ERR_CONN_LOST)
        except ValueError:
            return int(MQTT_ERR_CONN_LOST)
        except Exception:
            return int(MQTT_ERR_UNKNOWN)

        socklist: list[list] = [[], []]

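        # select() yields (key, ready_mask); key.events is only the mask
        # that was registered, so test the returned mask instead.
        for key, event in events:
            if event & selectors.EVENT_READ:
                socklist[0].append(key.fileobj)

            if event & selectors.EVENT_WRITE:
                socklist[1].append(key.fileobj)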

        if self._sock in socklist[0] or pending_bytes > 0:
            rc = self.loop_read()
            if rc or self._sock is None:
                return int(rc)

        if self._sockpairR and self._sockpairR in socklist[0]:
            socklist[1].insert(0, self._sock)

            with suppress(BlockingIOError):
                self._sockpairR.recv(10000)

        if self._sock in socklist[1]:
            rc = self.loop_write()
            if rc or self._sock is None:
                return int(rc)

        sel.close()

        return int(self.loop_misc())
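One detail of the selectors API that is easy to miss: select() returns (key, events) pairs, where the second element is the set of events that are actually ready, while key.events is the mask that was passed to register(). The following sketch, with a hypothetical helper `ready_masks`, shows the two side by side for a socket registered for both reading and writing.

```python
import selectors
import socket


def ready_masks(sock):
    """Return (registered_mask, ready_mask) for `sock`, or None if nothing is ready."""
    with selectors.DefaultSelector() as sel:
        sel.register(sock, selectors.EVENT_READ | selectors.EVENT_WRITE)
        for key, mask in sel.select(timeout=0):
            # key.events: what we asked for; mask: what is ready right now.
            return key.events, mask
    return None


if __name__ == "__main__":
    a, b = socket.socketpair()
    try:
        registered, ready = ready_masks(a)
        # Nothing has been sent to `a`, so it is writable but not readable.
        print(bool(registered & selectors.EVENT_READ))  # registered for read
        print(bool(ready & selectors.EVENT_READ))       # but not ready to read
    finally:
        a.close()
        b.close()
```

Dispatching on the returned mask rather than on key.events avoids calling the read path for a socket that is only writable.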

j04n-f, Dec 24 '22 16:12

Should "sel.close()" be invoked before each return? I also hit an error when self._sock is None at:

        if self._sockpairR is None:
            sel.register(self._sock, eventmask)
        else:
            sel.register(self._sock, eventmask)
            sel.register(self._sockpairR, selectors.EVENT_READ)

lincoln310, Jan 18 '23 03:01

I tested both use cases, closing and not closing the selectors. Not closing the selectors increases CPU usage: https://docs.python.org/3/library/selectors.html#selectors.BaseSelector.close
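Since `_loop` has several early returns, one way to make sure the selector is closed on every path is to use it as a context manager, which BaseSelector supports. This is only a sketch under that assumption, not the library's implementation; `poll_once` is a hypothetical helper.

```python
import selectors
import socket


def poll_once(sock, timeout=0.0):
    """Return True if `sock` is readable within `timeout` seconds."""
    # Leaving the `with` block calls sel.close(), including on early returns.
    with selectors.DefaultSelector() as sel:
        try:
            sel.register(sock, selectors.EVENT_READ)
            events = sel.select(timeout)
        except (TypeError, ValueError):
            # Early return: the selector is still closed by the `with` block.
            return False
        return bool(events)


if __name__ == "__main__":
    a, b = socket.socketpair()
    try:
        print(poll_once(a))        # nothing to read yet
        b.send(b"x")
        print(poll_once(a, 1.0))   # data is waiting
    finally:
        a.close()
        b.close()
```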

Which error do you have with the None value?

j04n-f, Jan 26 '23 10:01

Yesterday we had the same problem. The error is raised when the MQTT client attempts to reconnect. The fix is straightforward. The old select call registers and waits on the sockets in a single call, whereas the selectors API splits this into two calls (register and select). Both calls should therefore be wrapped in the try/except, as follows:

import selectors
from contextlib import suppress

class Client:
    [...]
    def _loop(self, timeout: float = 1.0) -> int:
        if timeout < 0.0:
            raise ValueError("Invalid timeout.")

        sel = selectors.DefaultSelector()

        eventmask = selectors.EVENT_READ

        with suppress(IndexError):
            packet = self._out_packet.popleft()
            self._out_packet.appendleft(packet)
            eventmask = selectors.EVENT_WRITE | eventmask

        # used to check if there are any bytes left in the (SSL) socket
        pending_bytes = 0
        if hasattr(self._sock, "pending"):
            pending_bytes = self._sock.pending()

        # if bytes are pending do not wait in select
        if pending_bytes > 0:
            timeout = 0.0

        try:
            if self._sockpairR is None:
                sel.register(self._sock, eventmask)
            else:
                sel.register(self._sock, eventmask)
                sel.register(self._sockpairR, selectors.EVENT_READ)

            events = sel.select(timeout)

        except TypeError:
            # Socket isn't correct type, in likelihood connection is lost
            return int(MQTT_ERR_CONN_LOST)
        except ValueError:
            # Can occur if we just reconnected and the socket is None or its
            # file descriptor is -1 (already closed).
            return int(MQTT_ERR_CONN_LOST)
        except Exception:
            # Note that KeyboardInterrupt, etc. can still terminate since they
            # are not derived from Exception
            return int(MQTT_ERR_UNKNOWN)

        socklist: list[list] = [[], []]

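        # select() yields (key, ready_mask); key.events is only the mask
        # that was registered, so test the returned mask instead.
        for key, event in events:
            if event & selectors.EVENT_READ:
                socklist[0].append(key.fileobj)

            if event & selectors.EVENT_WRITE:
                socklist[1].append(key.fileobj)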

        if self._sock in socklist[0] or pending_bytes > 0:
            rc = self.loop_read()
            if rc or self._sock is None:
                return int(rc)

        if self._sockpairR and self._sockpairR in socklist[0]:
            # Stimulate output write even though we didn't ask for it, because
            # at that point the publish or other command wasn't present.
            socklist[1].insert(0, self._sock)
            # Clear sockpairR - only ever a single byte written.
            with suppress(BlockingIOError):
                # Read many bytes at once - this allows up to 10000 calls to
                # publish() inbetween calls to loop().
                self._sockpairR.recv(10000)

        if self._sock in socklist[1]:
            rc = self.loop_write()
            if rc or self._sock is None:
                return int(rc)

        sel.close()

        return int(self.loop_misc())
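To illustrate why register() needs to sit inside the try block: the selectors documentation states that register() raises ValueError for an invalid file descriptor, and in CPython this includes None and an already closed socket. A small sketch, using a hypothetical helper `try_register`:

```python
import selectors
import socket


def try_register(fileobj):
    """Return 'ok', or the name of the exception raised by register()."""
    with selectors.DefaultSelector() as sel:
        try:
            sel.register(fileobj, selectors.EVENT_READ)
        except (TypeError, ValueError) as exc:
            return type(exc).__name__
        return "ok"


if __name__ == "__main__":
    closed = socket.socket()
    closed.close()                 # fileno() is now -1

    a, b = socket.socketpair()
    try:
        print(try_register(None))    # no fileno() at all
        print(try_register(closed))  # invalid descriptor
        print(try_register(a))       # a live socket registers fine
    finally:
        a.close()
        b.close()
```

During a reconnect, self._sock can be in either of the first two states, which is when the unguarded register() call fails.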

j04n-f, Mar 28 '23 11:03

Thank you very much, this solves my problem.

JXingK, Aug 15 '23 09:08

Any update? May I open a PR?

j04n-f, Sep 05 '23 14:09