paho.mqtt.python
Unable to make 340 connections
Hi Everyone,
I've been hitting my head against the wall because I can't make more than 340 connections to a local VerneMQ server and I have no idea why. I created a simple script to try to make 350 connections, but CONNACKs are not received after the first 340 connections are made. I have not hit a connection limit on my local machine, because if I run two instances, a total of 680 (340 x 2) connections are established.
Is anyone aware of this bizarre limit?
PS. My goal is to use this client with locustio for load testing.
My setup
- python 3.6.0
- maximum number of file descriptors, 65536
- local vernemq allows anonymous connections
- vernemq has all other defaults enabled
test_paho.py
import paho.mqtt.client as mqtt
import threading

connect_count = 0
lock = threading.Lock()

def locust_on_connect(client, userdata, flags, rc):
    global connect_count
    print(f"client: {client} connected, rc: {rc}")
    with lock:
        connect_count += 1
        print(f"connect_count: {connect_count}")

def locust_on_subscribe(client, userdata, mid, granted_qos):
    print(f"client: {client} subscribed, mid: {mid}")

def log(client, userdata, level, buf):
    print(f"[paho-log][client: {client}] {buf}")

for x in range(0, 350):
    client = mqtt.Client(transport="websockets")
    host = "localhost"  # <local server>
    port = 8888  # <websocket port>
    client.on_connect = locust_on_connect
    client.connect_async(host, port)
    client.loop_start()
    client.subscribe("/topic", 1)

input("type enter to end")
last few lines of output
connect_count: 339
connect_count: 340
Any help is greatly appreciated.
I think this is a duplicate of #183.
Each MQTT client means 3 open file descriptors: one socket to the MQTT server, plus a pair of sockets connected to each other (used internally to wake the network loop). 3 * 340 = 1020, and there you hit the 1024 limit of select.
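That arithmetic can be checked directly. Here's a quick standalone sketch (not from the thread) that opens pipes until it holds a descriptor numbered past FD_SETSIZE, then hands it to select():

```python
import os
import resource
import select

# Raise the soft fd limit so we can hold descriptors numbered past 1024
# (assumes the hard limit allows it, which it usually does on Linux).
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
if hard == resource.RLIM_INFINITY or hard >= 4096:
    resource.setrlimit(resource.RLIMIT_NOFILE, (4096, hard))

# Open pipes until one descriptor is numbered >= FD_SETSIZE (1024).
fds = []
r = 0
while r < 1024:
    r, w = os.pipe()
    fds += [r, w]

try:
    select.select([r], [], [], 0)
    status = "accepted"
except ValueError:
    # CPython refuses fds >= FD_SETSIZE before even calling select(2)
    status = "rejected"
print("fd", r, "was", status, "by select()")

for fd in fds:
    os.close(fd)
```

On a stock CPython build this raises `ValueError: filedescriptor out of range in select()` as soon as an fd reaches 1024, which matches the ~340-client ceiling at 3 fds per client.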
Thanks @joernheissler. You are right. It is due to Python being compiled with FD_SETSIZE=1024.
I've tried recompiling Python 3.6.3 after setting FD_SETSIZE=2048 in /usr/include/sys/_types/_fd_setsize.h, with no luck.
Was anyone successful in increasing FD_SETSIZE when building their own Python? Or has anyone forked a copy of paho-mqtt that uses poll() instead of select()?
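For what it's worth, poll() takes a plain list of descriptors rather than a fixed-size bitmap, so it has no FD_SETSIZE ceiling. A minimal illustration (not paho code) of watching a socket with select.poll():

```python
import select
import socket

# A connected pair of sockets: write to one end, watch the other.
a, b = socket.socketpair()

poller = select.poll()
poller.register(a, select.POLLIN)  # poll() keeps fds in a list, not a bitmap

assert poller.poll(0) == []        # nothing readable yet
b.send(b"x")
events = poller.poll(1000)         # list of (fd, eventmask) tuples
print(events)

a.close()
b.close()
```

The same registration works for arbitrarily high fd numbers, which is exactly what the select()-based loop can't handle.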
I've been using the following script to test select()
from socket import *
from select import select
s = [socket(AF_INET, SOCK_DGRAM) for i in range(2048)]
select(s, [], [], 1)
My setup:
- MacBook Pro 2017, macOS Sierra 10.12.6
I'm using asyncio. I didn't try with more than 1 connection, but I don't see why it shouldn't work.
I've been looking at this issue as well, and if it is FD_SETSIZE that's limiting us, it's not obvious how. Running lsof -a -p <pid> while the script is running shows that we're able to open more than 1024 file descriptors. The issue is that after 340 connections, the file descriptors to the MQTT server become closed. This happens both with the default paho client and with the one off your branch using asyncio. Definitely strange; not sure where to look next.
it seems like if it is FD_SETSIZE that's limiting us, it's not obvious how.
It's in the select manpage: "select() can monitor only file descriptor numbers that are less than FD_SETSIZE". select doesn't like larger FDs. I looked at the kernel code too, but there it wasn't obvious to me whether it's purely a userspace limitation or a kernel one as well.
after 340 connections, the file descriptors to the mqtt server become closed. This is both with the default paho client as well as the one off your branch using asyncio.
You're saying that with asyncio this still happens? I haven't tried yet, but I really doubt it; I would be really surprised if it were happening.
not sure where to look next
strace should prove really helpful here.
So I wrote a program which opens 1500 useless file descriptors and then 1 mqtt connection, with asyncio. Works as expected.
Got code which won't work for you?
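That program isn't posted in the thread; here's a rough reconstruction of the same experiment, with a local asyncio echo server standing in for the MQTT broker (asyncio's default Linux selector is epoll-based, so high-numbered fds are fine):

```python
import asyncio
import os
import resource

async def main():
    # Allow and open ~1500 "useless" file descriptors first.
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    if hard == resource.RLIM_INFINITY or hard >= 4096:
        resource.setrlimit(resource.RLIMIT_NOFILE, (4096, hard))
    junk = [os.pipe() for _ in range(750)]  # 1500 fds

    async def echo(reader, writer):
        writer.write(await reader.read(100))
        await writer.drain()
        writer.close()

    # A local echo server stands in for the MQTT broker here.
    server = await asyncio.start_server(echo, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    # This connection lands on a high-numbered fd, past FD_SETSIZE.
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(b"CONNECT")
    await writer.drain()
    writer.write_eof()
    reply = await reader.read(100)

    writer.close()
    server.close()
    await server.wait_closed()
    for r, w in junk:
        os.close(r)
        os.close(w)
    return reply

reply = asyncio.run(main())
print(reply)
```

With select() this connection would be rejected outright; with the epoll-backed event loop it round-trips normally.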
Hi! Just got it working on our end. I think I had basically made a silly mistake and kept calling loop_start() on the client, which ended up using select anyway. We're able to open over 340 connections (it seems to hang at around 1000 connections; looking into why), and we're now just familiarizing ourselves with asyncio to fine-tune how we spin up our workers.
Our code now basically looks like your example here, except we've put 'main' in a loop to spin up hundreds of workers. We're even directly using your AsyncioHelper class.
Thanks so much for your help!
We ran into this while doing some performance testing too. I have a patch which substitutes eventfd for select. I haven't tested on anything except Linux though.
https://github.com/kellycampbell/paho.mqtt.python/commit/f23831ee365278052a3a4bd07a6851207f25a2e3
I think eventfd is a linux-only feature.
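A portable alternative to eventfd is a connected socket pair: the network loop watches one end, and any thread writes a byte to the other end to wake the loop out of its select()/poll() sleep. A minimal sketch of that wakeup mechanism:

```python
import select
import socket

# One end is watched by the network loop; any thread can write a byte
# to the other end to wake the loop up.
wake_r, wake_w = socket.socketpair()

r, _, _ = select.select([wake_r], [], [], 0)
print("readable before wakeup:", r)         # nothing pending

wake_w.send(b"\x00")                        # poke the loop
r, _, _ = select.select([wake_r], [], [], 1)
print("readable after wakeup:", r != [])    # the loop wakes
wake_r.recv(1)                              # drain the wakeup byte

wake_r.close()
wake_w.close()
```

This works on any platform with sockets, at the cost of two extra fds per client instead of eventfd's one — which is precisely where the "3 fds per client" count earlier in the thread comes from.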
hitting the same problem... any news?
What worked for us was two things:
- Increasing the maximum number of file descriptors for the system; see: https://www.cyberciti.biz/faq/linux-increase-the-maximum-number-of-open-files/
- Increasing FD_SETSIZE in /usr/include/sys/select.h, /usr/include/bits/typesizes.h, and /usr/include/linux/posix_types.h, and then recompiling Python (this is because the select() call that Python uses while setting up the connections uses this hardcoded value).
@susfly the solution I posted in the comment from Nov 28, 2017 is what worked for us.
We can use multiprocessing instead, with each process making 340 connections. It works well.
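A hedged sketch of that batching idea: split the client list into groups of at most 340 and give each group its own process, so no single process's select() sees more than ~1020 fds. The do-nothing thread target here is a stand-in for a real paho client loop:

```python
import multiprocessing
import threading

MAX_PER_PROCESS = 340  # keep each process under the select()/FD_SETSIZE ceiling

def run_batch(batch):
    # In the real script each thread would run one paho client's network
    # loop; a do-nothing target stands in for it here.
    threads = [threading.Thread(target=lambda: None) for _ in batch]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return len(batch)

clients = list(range(1000))  # placeholder client descriptions
batches = [clients[i:i + MAX_PER_PROCESS]
           for i in range(0, len(clients), MAX_PER_PROCESS)]
with multiprocessing.Pool(len(batches)) as pool:
    counts = pool.map(run_batch, batches)
print(counts)
```

Each process gets its own 1024-entry fd table limit for select(), which is why the per-process cap matters more than the total.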
Hi, can you show me an example of using multiprocessing and threading? I'm trying to simulate a high number of devices sending data using paho-mqtt, but I cannot manage to connect 1,000 devices (my goal is to go up to 10K devices).
import paho.mqtt.client as mqtt
import time
import threading
import logging
import thingsboard_objects as Things
import random
import datetime

logging.basicConfig(level=logging.INFO)
init_time = time.time()

def Connect(client, broker, port, token, keepalive, run_forever=False):
    connflag = False
    delay = 5
    print("connecting ", client)
    badcount = 0  # counter for bad connection attempts
    while not connflag:
        logging.info("connecting to broker " + str(broker))
        # print("connecting to broker "+str(broker)+":"+str(port))
        print("Attempts ", str(badcount))
        time.sleep(2)
        try:
            client.username_pw_set(token)
            client.connect(broker, port, keepalive)
            connflag = True
        except Exception:
            client.badconnection_flag = True
            logging.info("connection failed " + str(badcount))
            badcount += 1
            if badcount >= 3 and not run_forever:
                return -1  # give up
    return 0

def wait_for(client, msgType, period=1, wait_time=20, running_loop=False):
    """Waits for a particular event; gives up after period*wait_time seconds.
    Returns True if successful, False if it fails."""
    # running_loop is True when using loop_start or loop_forever
    client.running_loop = running_loop
    wcount = 0
    while True:
        logging.info("waiting " + msgType)
        if msgType == "CONNACK":
            if client.on_connect:
                if client.connected_flag:
                    return True
                if client.bad_connection_flag:
                    return False
        if msgType == "SUBACK":
            if client.on_subscribe:
                if client.suback_flag:
                    return True
        if msgType == "MESSAGE":
            if client.on_message:
                if client.message_received_flag:
                    return True
        if msgType == "PUBACK":
            if client.on_publish:
                if client.puback_flag:
                    return True
        if not client.running_loop:
            client.loop(.01)  # check for messages manually
        time.sleep(period)
        wcount += 1
        if wcount > wait_time:
            print("return from wait loop taken too long")
            return False

def client_loop(client, broker, port, token, keepalive=300, loop_function=None,
                loop_delay=10, run_forever=False):
    """Runs a loop that will auto-reconnect and subscribe to topics.
    Pass topics as a list of tuples. You can pass a function to be
    called at set intervals determined by loop_delay.
    """
    client.run_flag = True
    client.broker = broker
    print("running loop ")
    client.reconnect_delay_set(min_delay=1, max_delay=12)
    while client.run_flag:  # loop forever
        if client.bad_connection_flag:
            break
        if not client.connected_flag:
            print("Connecting to " + broker)
            if Connect(client, broker, port, token, keepalive, run_forever) != -1:
                if not wait_for(client, "CONNACK"):
                    client.run_flag = False  # break, no CONNACK
            else:  # connect fails
                client.run_flag = False  # break
                print("quitting loop for broker ", broker)
        client.loop(0.01)
        if client.connected_flag and loop_function:  # function to call
            loop_function(client, loop_delay)  # call function
        time.sleep(1)
    print("disconnecting from", broker)
    if client.connected_flag:
        client.disconnect()
        client.connected_flag = False

def on_log(client, userdata, level, buf):
    print(buf)

# def on_message(client, userdata, message):
#     time.sleep(1)
#     print("message received", str(message.payload.decode("utf-8")))

def on_connect(client, userdata, flags, rc):
    if rc == 0:
        client.connected_flag = True  # set flag
        for c in clients:
            if client == c["client"]:
                if c["sub_topic"] != "":
                    client.subscribe(c["sub_topic"])
        print("connected OK")
    else:
        print("Bad connection Returned code=", rc)
        client.loop_stop()

def on_disconnect(client, userdata, rc):
    client.connected_flag = False  # set flag
    # print("client disconnected ok")

def on_publish(client, userdata, mid):
    print("In on_pub callback mid= ", mid)

def pub(client, loop_delay):
    global init_time
    rmd_current = round(random.uniform(0.6, 50.0), 2)
    rmd_pressure = round(random.uniform(0.6, 50.0), 2)
    if time.time() - init_time >= 3600:
        rmd_mnc = round(random.uniform(5.0, 30.0), 2)
        rmd_sdc = round(random.random(), 2)
        rmd_mnp = round(random.uniform(5.0, 30.0), 2)
        rmd_sdp = round(random.random(), 2)
        client.publish('v1/devices/me/telemetry',
                       '{"Current": "%s","Pressure": "%s","Str": "12341","Stp": "12340","AL1": "~","AL2": "~",'
                       '"AL3": "~","AL4": "~","AL5": "~","AL6": "~","AL7": "~","AL8": "~"}' % (rmd_current, rmd_pressure))
        client.publish('v1/devices/me/telemetry',
                       '{"MnC": "%s", "SdC": "%s", "Str": "2554","Stp": "2554", '
                       '"MnP": "%s", "SdP": "%s"}' % (rmd_mnc, rmd_sdc, rmd_mnp, rmd_sdp))
        init_time = time.time()
    else:
        client.publish('v1/devices/me/telemetry',
                       '{"Current": "%s","Pressure": "%s","Str": "12341","Stp": "12340","AL1": "~","AL2": "~",'
                       '"AL3": "~","AL4": "~","AL5": "~","AL6": "~","AL7": "~","AL8": "~"}' % (rmd_current, rmd_pressure))
    print(datetime.datetime.now())
    time.sleep(loop_delay)

def Create_connections():
    for i in range(n_clients):
        cname = "client" + str(i)
        t = int(time.time())
        client_id = cname + str(t)  # create unique client_id
        client = mqtt.Client(client_id)  # create new instance
        clients[i]["client"] = client
        clients[i]["client_id"] = client_id
        clients[i]["cname"] = cname
        broker = clients[i]["broker"]
        port = clients[i]["port"]
        token = clients[i]["token"]
        client.on_connect = on_connect
        client.on_disconnect = on_disconnect
        client.on_publish = on_publish
        # client.on_message = on_message
        t = threading.Thread(target=client_loop, args=(client, broker, port, token, 300, pub))
        threads.append(t)
        t.start()

if __name__ == '__main__':
    things_location = input("What type of thingsboard installation are you working with (demo/local)? ")
    if things_location == "local":
        type_install = 'cseetprj03.essex.ac.uk:8080'
        broker = 'cseetprj03.essex.ac.uk'
    else:
        type_install = broker = 'demo.thingsboard.io'
    header = Things.get_credentials(things_location)
    my_devices = Things.get_devices_id(header, type_install)
    clients = []
    for device in my_devices:
        device_info = {"broker": broker, "port": 1883, "name": device["name"],
                       "token": Things.get_device_token(device["id"]["id"], header, type_install)}
        clients.append(device_info)
    n_clients = len(clients)
    mqtt.Client.connected_flag = False  # create flag in class
    mqtt.Client.bad_connection_flag = False  # create flag in class
    threads = []
    print("Creating Connections ")
    no_threads = threading.active_count()
    print("current threads =", no_threads)
    print("Publishing ")
    Create_connections()
    print("All clients connected ")
    no_threads = threading.active_count()
    print("current threads =", no_threads)
    print("starting main loop")
    try:
        while no_threads == 1001:
            time.sleep(10)
            no_threads = threading.active_count()
            print("current threads =", no_threads)
            for c in clients:
                if not c["client"].connected_flag:
                    print("broker ", c["broker"], " is disconnected")
    except KeyboardInterrupt:
        print("ending")
        for c in clients:
            c["client"].run_flag = False
        time.sleep(10)
That is my code. Is multiprocessing needed for this, or what can I change to scale the sending of data up to 10K connections?
Thanks in advance
We can use multiprocessing instead, with each process making 340 connections. It works well.
Hi, can you share your solution?
import multiprocessing

def yourthreadscreateandstartfunc():
    for ...:  # loop over your clients
        # create thread
        # start thread

p = multiprocessing.Process(target=yourthreadscreateandstartfunc)
Each process should run fewer than 340 threads.
Hi,
Sorry for asking again, but I can't manage to start the second process with the second portion of my clients.
import multiprocessing
import paho.mqtt.client as mqtt
import time
import threading
import logging
import math
import thingsboard_objects as Things
import random
import datetime
import numpy as np

logging.basicConfig(level=logging.INFO)
init_time = time.time()

def Connect(client, broker, port, token, keepalive, run_forever=False):
    connflag = False
    delay = 5
    print("connecting ", client)
    badcount = 0  # counter for bad connection attempts
    while not connflag:
        logging.info("connecting to broker " + str(broker))
        # print("connecting to broker "+str(broker)+":"+str(port))
        print("Attempts ", str(badcount))
        time.sleep(2)
        try:
            client.username_pw_set(token)
            client.connect(broker, port, keepalive)
            connflag = True
        except Exception:
            client.badconnection_flag = True
            logging.info("connection failed " + str(badcount))
            badcount += 1
            if badcount >= 3 and not run_forever:
                return -1  # give up
    return 0

def wait_for(client, msgType, period=1, wait_time=15, running_loop=False):
    """Waits for a particular event; gives up after period*wait_time seconds.
    Returns True if successful, False if it fails."""
    # running_loop is True when using loop_start or loop_forever
    client.running_loop = running_loop
    wcount = 0
    while True:
        logging.info("waiting " + msgType)
        if msgType == "CONNACK":
            if client.on_connect:
                if client.connected_flag:
                    return True
                if client.bad_connection_flag:
                    return False
        if msgType == "SUBACK":
            if client.on_subscribe:
                if client.suback_flag:
                    return True
        if msgType == "MESSAGE":
            if client.on_message:
                if client.message_received_flag:
                    return True
        if msgType == "PUBACK":
            if client.on_publish:
                if client.puback_flag:
                    return True
        if not client.running_loop:
            client.loop(.01)  # check for messages manually
        time.sleep(period)
        wcount += 1
        if wcount > wait_time:
            print("return from wait loop taken too long")
            return False

def client_loop(client, broker, port, token, keepalive=600, loop_function=None,
                loop_delay=10, run_forever=False):
    """Runs a loop that will auto-reconnect and subscribe to topics.
    Pass topics as a list of tuples. You can pass a function to be
    called at set intervals determined by loop_delay.
    """
    client.run_flag = True
    client.broker = broker
    print("running loop ")
    client.reconnect_delay_set(min_delay=1, max_delay=12)
    while client.run_flag:  # loop forever
        if client.bad_connection_flag:
            break
        if not client.connected_flag:
            print("Connecting to " + broker)
            if Connect(client, broker, port, token, keepalive, run_forever) != -1:
                if not wait_for(client, "CONNACK"):
                    client.run_flag = True  # break no connack
            else:  # connect fails
                client.run_flag = False  # break
                print("quitting loop for broker ", broker)
        client.loop(0.01)
        if client.connected_flag and loop_function:  # function to call
            loop_function(client, loop_delay)  # call function
        time.sleep(1)
    print("disconnecting from", broker)
    if client.connected_flag:
        client.disconnect()
        client.connected_flag = False

def on_log(client, userdata, level, buf):
    print(buf)

def on_connect(client, userdata, flags, rc):
    if rc == 0:
        client.connected_flag = True  # set flag
        for c in clients:
            print("connected OK")
    else:
        print("Bad connection Returned code=", rc)
        client.loop_stop()

def on_disconnect(client, userdata, rc):
    client.connected_flag = False  # set flag
    # print("client disconnected ok")

def on_publish(client, userdata, mid):
    print("In on_pub callback mid= ", mid)

def pub(client, loop_delay):
    global init_time
    rmd_current = round(random.uniform(0.6, 50.0), 2)
    rmd_pressure = round(random.uniform(0.6, 50.0), 2)
    if time.time() - init_time >= 3600:
        rmd_mnc = round(random.uniform(5.0, 30.0), 2)
        rmd_sdc = round(random.random(), 2)
        rmd_mnp = round(random.uniform(5.0, 30.0), 2)
        rmd_sdp = round(random.random(), 2)
        client.publish('v1/devices/me/telemetry',
                       '{"Current": "%s","Pressure": "%s","Str": "12341","Stp": "12340","AL1": "~","AL2": "~",'
                       '"AL3": "~","AL4": "~","AL5": "~","AL6": "~","AL7": "~","AL8": "~"}' % (rmd_current, rmd_pressure))
        client.publish('v1/devices/me/telemetry',
                       '{"MnC": "%s", "SdC": "%s", "Str": "2554","Stp": "2554", '
                       '"MnP": "%s", "SdP": "%s"}' % (rmd_mnc, rmd_sdc, rmd_mnp, rmd_sdp))
        init_time = time.time()
    else:
        client.publish('v1/devices/me/telemetry',
                       '{"Current": "%s","Pressure": "%s","Str": "12341","Stp": "12340","AL1": "~","AL2": "~",'
                       '"AL3": "~","AL4": "~","AL5": "~","AL6": "~","AL7": "~","AL8": "~"}' % (rmd_current, rmd_pressure))
    print(datetime.datetime.now())
    time.sleep(loop_delay)

def Create_connections(n_clients, threads):
    for i in range(len(n_clients)):
        cname = "client" + n_clients[i]["name"]
        t = int(time.time())
        client_id = cname + str(t)  # create unique client_id
        client = mqtt.Client(client_id)  # create new instance
        clients[i]["client"] = client
        clients[i]["client_id"] = client_id
        clients[i]["cname"] = cname
        broker_p = clients[i]["broker"]
        port = clients[i]["port"]
        token = clients[i]["token"]
        client.on_connect = on_connect
        client.on_disconnect = on_disconnect
        client.on_publish = on_publish
        # client.on_message = on_message
        t = threading.Thread(target=client_loop, args=(client, broker_p, port, token, 600, pub))
        threads.append(t)
        t.start()

def main_loop(clients_loop):
    mqtt.Client.connected_flag = False  # create flag in class
    mqtt.Client.bad_connection_flag = False  # create flag in class
    threads = []
    print("Creating Connections ")
    no_threads = threading.active_count()
    print("current threads =", no_threads)
    print("Publishing ")
    Create_connections(clients_loop, threads)
    print("All clients connected ")
    no_threads = threading.active_count()
    print("current threads =", no_threads)
    print("starting main loop")
    try:
        while True:
            time.sleep(10)
            no_threads = threading.active_count()
            print("current threads =", no_threads)
            for c in clients_loop:
                if not c["client"].connected_flag:
                    print("broker ", c["broker"], " is disconnected", c["name"])
            time.sleep(1)
    except KeyboardInterrupt:
        print("ending")
        for c in clients:
            c["client"].run_flag = False
        time.sleep(10)

if __name__ == '__main__':
    # In case the user is using a demo version or local version of thingsboard
    things_location = input("What type of thingsboard installation are you working with (demo/local)? ")
    if things_location == "demo":
        type_install = "demo.thingsboard.io"
        header = Things.get_credentials(things_location)
    elif things_location == "local":
        computer = input("Which computer? ")
        type_install = "cseetprj%s.essex.ac.uk:8080" % computer
        broker = "cseetprj%s.essex.ac.uk" % computer
        header = Things.get_credentials("local", type_install)
    else:
        print("Error: Installation not supported")
    my_devices = Things.get_devices_id(header, type_install)
    clients = []
    for device in my_devices:
        device_info = {"broker": broker, "port": 1883, "name": device["name"],
                       "token": Things.get_device_token(device["id"]["id"], header, type_install)}
        clients.append(device_info)
    if len(clients) >= 200:
        print("Splitting devices to multiprocess")
        split_by = math.ceil(len(clients) / 250)
        split_clients = np.array_split(clients, split_by)
        jobs = []
        for idx, client_portion in enumerate(split_clients):
            print("Starting process for portion %s" % (idx + 1))
            p = multiprocessing.Process(target=main_loop, args=(client_portion,))
            jobs.append(p)
            p.start()
I cannot get past the Create_connections part: all the clients connect but don't publish after it. I think it's related to the order or position of the functions, but I don't know why. Any thoughts?
Hi, how many connections were you able to make for each process? And did you change any configuration on the machine that receives the connections?
Maybe it's your server's limit.