paho.mqtt.python

mqtt cannot open more than 340 threads

Open jiajintao2021 opened this issue 2 years ago • 2 comments

It has been a long time since the previous question. I have read through several related issues and their proposed solutions, but found no fix; it seems the library does not intend to solve this problem. A single process can open at most about 340 connections when each client uses loop_start(). If you manage the threads yourself and call loop_forever() in each one, you can open about 1000 connections per process. Running several processes scales this further, to roughly 1000 * CpuCount connections.
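A rough back-of-the-envelope sketch of where figures like 340 and 1000 may come from. Everything here is an assumption rather than something confirmed in this thread: that the limit comes from select() accepting only descriptors below 1024 (a typical FD_SETSIZE on Linux), that stdin, stdout and stderr already occupy 3 descriptors, and that a client started with loop_start() holds 3 descriptors (its socket plus an internal socket pair) while a loop_forever() client holds 1. The function name and parameters are hypothetical.

```python
def estimate_max_clients(
    fd_limit: int = 1024,      # assumed select() descriptor cap (FD_SETSIZE)
    reserved_fds: int = 3,     # assumed: stdin, stdout, stderr
    fds_per_client: int = 3,   # assumed: socket + internal socket pair
) -> int:
    """Estimate how many clients fit under the descriptor cap in one process."""
    return (fd_limit - reserved_fds) // fds_per_client


# Assumed loop_start() case: 3 descriptors per client.
print(estimate_max_clients())                   # 340
# Assumed loop_forever() case: 1 descriptor per client.
print(estimate_max_clients(fds_per_client=1))   # 1021
```

If these assumptions hold, the numbers line up with the observed limits of about 340 and about 1000 connections per process.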

jiajintao2021 • May 27 '22 07:05

https://github.com/concurrencylabs/mqtt-locust/issues/3
https://github.com/eclipse/paho.mqtt.python/issues/238

jiajintao2021 • May 27 '22 07:05

#183 #238

jiajintao2021 • May 27 '22 07:05

Maybe it's too late, but replacing the select(..) call used in the _loop(..) method worked for us. You can override the method to use Python's selectors module instead of select. The selectors module picks the most efficient mechanism available on your OS.

import selectors
from contextlib import suppress

from paho.mqtt.client import MQTT_ERR_CONN_LOST, MQTT_ERR_UNKNOWN, Client


class MQTTClient(Client):
    # [...]
    def _loop(self, timeout: float = 1.0) -> int:
        if timeout < 0.0:
            raise ValueError("Invalid timeout.")

        # Always watch for incoming data; also watch for writability
        # when at least one outgoing packet is queued.
        eventmask = selectors.EVENT_READ
        with suppress(IndexError):
            packet = self._out_packet.popleft()
            self._out_packet.appendleft(packet)
            eventmask |= selectors.EVENT_WRITE

        # Bytes already buffered by an SSL socket do not wake the selector,
        # so do not block when some are pending.
        pending_bytes = 0
        if hasattr(self._sock, "pending"):
            pending_bytes = self._sock.pending()
        if pending_bytes > 0:
            timeout = 0.0

        # The context manager closes the selector on every return path.
        with selectors.DefaultSelector() as sel:
            try:
                sel.register(self._sock, eventmask)
                # _sockpairR wakes the loop early, e.g. on publish().
                if self._sockpairR is not None:
                    sel.register(self._sockpairR, selectors.EVENT_READ)
                events = sel.select(timeout)
            except (TypeError, ValueError):
                # Invalid or closed socket: treat as a lost connection.
                return int(MQTT_ERR_CONN_LOST)
            except Exception:
                return int(MQTT_ERR_UNKNOWN)

        # Use the ready mask returned by select(), not key.events,
        # which only holds the mask the socket was registered with.
        readable = [key.fileobj for key, ready in events if ready & selectors.EVENT_READ]
        writable = [key.fileobj for key, ready in events if ready & selectors.EVENT_WRITE]

        if self._sock in readable or pending_bytes > 0:
            rc = self.loop_read()
            if rc or self._sock is None:
                return int(rc)

        if self._sockpairR and self._sockpairR in readable:
            # Drain the wake-up bytes.
            with suppress(BlockingIOError):
                self._sockpairR.recv(10000)

        if self._sock in writable:
            rc = self.loop_write()
            if rc or self._sock is None:
                return int(rc)

        return int(self.loop_misc())
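
One detail worth checking when adapting a loop to selectors: select() returns (key, events) pairs, where the second element is the mask of events that are actually ready, while key.events only holds the mask the socket was registered with. Below is a minimal standard-library sketch of that difference. It does not use paho at all; a local socket pair stands in for the MQTT socket, and the function name ready_masks is hypothetical.

```python
import selectors
import socket


def ready_masks():
    """Return (registered_mask, ready_before_send, ready_after_send)."""
    a, b = socket.socketpair()  # stand-in for the MQTT socket and its peer
    try:
        with selectors.DefaultSelector() as sel:
            key = sel.register(a, selectors.EVENT_READ | selectors.EVENT_WRITE)
            registered = key.events

            # Nothing has been sent yet, so `a` is writable but not readable.
            before = sel.select(timeout=1.0)[0][1]

            # Watch for reads only, then make one byte available.
            sel.modify(a, selectors.EVENT_READ)
            b.send(b"x")
            after = sel.select(timeout=1.0)[0][1]
        return registered, before, after
    finally:
        a.close()
        b.close()


registered, before, after = ready_masks()
print(registered == (selectors.EVENT_READ | selectors.EVENT_WRITE))
print(before == selectors.EVENT_WRITE)
print(after == selectors.EVENT_READ)
```

Testing key.events would report the socket as readable in the first step even though no data has arrived, so the ready mask is the safer value to branch on.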

j04n-f • Dec 24 '22 15:12

I'm going to close this as a duplicate of #183, #238, and a few others. This is part of a general project to clean up issues, which should make it simpler to identify priorities going forward.

MattBrittan • Dec 07 '23 20:12