paho.mqtt.python

Unable to make 340 connections

Open liquidharmonic opened this issue 7 years ago • 19 comments

Hi Everyone,

I've been hitting my head against the wall because I can't make more than 340 connections to a local vernemq server and I have no idea why. I created a simple script to try to make 350 connections, but CONNACKs are not received after the first 340 connections are made. I have not reached the connection limit on my local machine, because if I run two instances a total of 680 (340 x 2) connections are established.

Is anyone aware of this bizarre limit?

PS. My goal is to use this client with locustio for load testing.

My setup

  • python 3.6.0
  • maximum number of file descriptors, 65536
  • local vernemq allows anonymous connections
  • vernemq has all other defaults enabled

test_paho.py

import paho.mqtt.client as mqtt
import threading

connect_count = 0
lock = threading.Lock()

def locust_on_connect(client, userdata, flags, rc):
    # paho calls on_connect with (client, userdata, flags, rc), in that order
    global connect_count
    print(f"client: {client} connected, rc: {rc}")
    with lock:
        connect_count += 1
        print(f"connect_count: {connect_count}")

def locust_on_subscribe(client, userdata, mid, granted_qos):
    print(f"client: {client} subscribed, mid: {mid}")

def log(client, userdata, level, buf):
    print(f"[paho-log][client: {client}] {buf}")

for x in range(0, 350):
    client = mqtt.Client(transport="websockets")

    host = "localhost"  # <local server>
    port = 8888  # <websocket port>

    client.on_connect = locust_on_connect
    client.connect_async(host, port)
    client.loop_start()

    client.subscribe("/topic", 1)


input("type enter to end")

last few lines of output

connect_count: 339
connect_count: 340

Any help is greatly appreciated.

liquidharmonic avatar Oct 12 '17 19:10 liquidharmonic

I think duplicate of #183.

Each mqtt client means 3 open file descriptors: one to the mqtt server, and a pair of sockets connected to each other. 3 * 340 = 1020. And there you hit the 1024 limit of select.
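The arithmetic above can be checked with a minimal sketch. It assumes FD_SETSIZE is 1024 and that the interpreter already holds three descriptors (stdin, stdout, stderr); both values are assumptions, and the exact cut-off depends on what else the process has open. The name max_clients is only illustrative.

```python
# Rough descriptor budget for a select()-based client loop.
FD_SETSIZE = 1024      # assumed compile-time limit of select()
FDS_PER_CLIENT = 3     # one socket to the broker plus an internal socket pair
RESERVED_FDS = 3       # assumption: stdin, stdout and stderr are already open


def max_clients(fd_setsize=FD_SETSIZE, per_client=FDS_PER_CLIENT, reserved=RESERVED_FDS):
    """Return how many clients fit before a descriptor number reaches fd_setsize."""
    return (fd_setsize - reserved) // per_client


if __name__ == "__main__":
    print(max_clients())  # 340
```

Under these assumptions the 341st client would need a descriptor numbered 1024 or higher, which select() cannot monitor.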

joernheissler avatar Oct 13 '17 07:10 joernheissler

Thanks @joernheissler. You are right. It is due to python being compiled with FD_SETSIZE=1024. I've tried recompiling python 3.6.3 with /usr/include/sys/_types/_fd_setsize.h setting FD_SETSIZE=2048 with no luck. Was anyone successful in increasing the FD_SETSIZE when building their own python? Or has anyone forked a copy of paho-mqtt with poll() instead of select()?

I've been using the following script to test select()

from socket import *
from select import select
s = [socket(AF_INET, SOCK_DGRAM) for i in range(2048)]
select(s, [], [], 1)

My setup:

  • MacBook Pro 2017, macOS Sierra 10.12.6
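On the poll() question: as a hedged illustration only (this is not how paho-mqtt is implemented), the standard selectors module picks epoll, kqueue or poll where the platform offers them, and those mechanisms are not bound by FD_SETSIZE. The helper name ready_readers is hypothetical.

```python
import selectors
import socket


def ready_readers(socks, timeout=1.0):
    """Return the sockets from `socks` that have data waiting to be read."""
    # DefaultSelector chooses the most efficient mechanism available and only
    # falls back to select() when nothing better exists on the platform.
    with selectors.DefaultSelector() as sel:
        for s in socks:
            sel.register(s, selectors.EVENT_READ)
        return [key.fileobj for key, _events in sel.select(timeout)]


if __name__ == "__main__":
    pairs = [socket.socketpair() for _ in range(8)]
    pairs[3][1].send(b"x")  # make exactly one socket readable
    ready = ready_readers([a for a, _ in pairs])
    print(len(ready))
    for a, b in pairs:
        a.close()
        b.close()
```

The same call works with thousands of sockets as long as the per-process open file limit allows them to be created.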

liquidharmonic avatar Oct 13 '17 20:10 liquidharmonic

I'm using asyncio. I didn't try with more than 1 connection, but I don't see why it shouldn't work.

joernheissler avatar Oct 14 '17 09:10 joernheissler

I've been looking at this issue as well, and it seems like if it is FD_SETSIZE that's limiting us, it's not obvious how. Using lsof -a -p <pid> while the script is running shows that we're able to open more than 1024 file descriptors. The issue is that after 340 connections, the file descriptors to the mqtt server become closed. This is both with the default paho client as well as the one off your branch using asyncio. Definitely strange, not sure where to look next.
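A similar check can be made from inside the process instead of with lsof. This is a small sketch that assumes a Unix-like system exposing /dev/fd (Linux and macOS do); the function name open_fd_count is hypothetical.

```python
import os
import socket


def open_fd_count():
    """Count the descriptors currently open in this process via /dev/fd."""
    # The listing itself briefly opens one descriptor, so treat the number as
    # approximate and compare differences rather than absolute values.
    return len(os.listdir("/dev/fd"))


if __name__ == "__main__":
    before = open_fd_count()
    a, b = socket.socketpair()  # two new descriptors
    print(open_fd_count() - before)
    a.close()
    b.close()
```

Logging this count while connections are being created shows whether descriptors keep being allocated or start being closed again.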

it seems like if it is FD_SETSIZE that's limiting us, it's not obvious how.

It's in the select manpage. select doesn't like larger FDs. I looked at the kernel code too, but there it wasn't obvious to me if it's purely a userspace limitation or kernel too. "select() can monitor only file descriptors numbers that are less than FD_SETSIZE"

after 340 connections, the file descriptors to the mqtt server become closed. This is both with the default paho client as well as the one off your branch using asyncio.

You're saying that with asyncio this still happens? I haven't tried yet, but I really doubt it. I would be really surprised if it were happening.

not sure where to look next

strace should prove really helpful here.

joernheissler avatar Oct 17 '17 19:10 joernheissler

So I wrote a program which opens 1500 useless file descriptors and then 1 mqtt connection, with asyncio. Works as expected.

Got code which won't work for you?

joernheissler avatar Oct 17 '17 21:10 joernheissler

Hi! Just got it working on our end. I think I had basically made a silly mistake and kept calling loop_start() in the client, which ended up using select anyway. We're able to open over 340 connections (it seems to hang at around 1000 connections, and we're looking at why), and we're now just familiarizing ourselves enough with asyncio to fine-tune how we spin up our workers.

Our code now basically looks like your example here, except we've put 'main' in a loop to spin up hundreds of workers. We're even directly using your AsyncioHelper class.

Thanks so much for your help!

We ran into this while doing some performance testing too. I have a patch which substitutes eventfd instead of select. I haven't tested on anything except linux though.

https://github.com/kellycampbell/paho.mqtt.python/commit/f23831ee365278052a3a4bd07a6851207f25a2e3

kellycampbell avatar Nov 29 '17 02:11 kellycampbell

I think eventfd is a linux-only feature.

joernheissler avatar Nov 29 '17 07:11 joernheissler

hitting the same problem... any news?

susfly avatar Mar 07 '19 12:03 susfly

What worked for us was 2 things.

  1. Changing file descriptors for the system, see: https://www.cyberciti.biz/faq/linux-increase-the-maximum-number-of-open-files/
  2. Increasing the FD_SETSIZE in /usr/include/sys/select.h, /usr/include/bits/typesizes.h, /usr/include/linux/posix_types.h and then recompiling python (this is because the select() call that python uses while setting up the connections uses this hardcoded value).
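For the first step, the per-process limit can also be inspected and raised from Python with the standard resource module (Unix only). This is a sketch, not the procedure from the linked article; raise_nofile_soft_limit is a hypothetical helper, and it cannot exceed the hard limit set by the system.

```python
import resource


def raise_nofile_soft_limit(target):
    """Try to raise the soft open-file limit to `target`; return the resulting soft limit."""
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    if hard != resource.RLIM_INFINITY:
        target = min(target, hard)  # an unprivileged process cannot go above the hard limit
    if soft != resource.RLIM_INFINITY and target > soft:
        try:
            resource.setrlimit(resource.RLIMIT_NOFILE, (target, hard))
        except (ValueError, OSError):
            pass  # some platforms reject the request; keep the current limit
    return resource.getrlimit(resource.RLIMIT_NOFILE)[0]


if __name__ == "__main__":
    print(raise_nofile_soft_limit(65536))
```

Raising this limit does not change FD_SETSIZE, which is why the second step was still needed for select().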

@susfly the solution I posted in the comment from Nov 28, 2017 is what worked for us.

kellycampbell avatar Mar 07 '19 22:03 kellycampbell

We can use multiprocessing instead. Each process makes up to 340 connections, and it works well.
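A minimal sketch of that idea, assuming the clients are identified by a simple list and that 340 per process is the ceiling observed earlier in this thread. The names chunk, worker and launch are hypothetical, and worker is only a placeholder for code that would create the MQTT clients.

```python
import multiprocessing

MAX_PER_PROCESS = 340  # ceiling observed in this thread for select()-based loops


def chunk(items, size=MAX_PER_PROCESS):
    """Split `items` into consecutive groups of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def worker(client_ids):
    # Placeholder: a real worker would create one MQTT client per id here.
    print(f"process handling {len(client_ids)} clients")


def launch(client_ids):
    """Start one process per group, wait for all of them, return the process count."""
    procs = [multiprocessing.Process(target=worker, args=(group,))
             for group in chunk(client_ids)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return len(procs)


if __name__ == "__main__":
    launch(list(range(1000)))
```

With 1000 ids this starts three processes handling 340, 340 and 320 clients.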

susfly avatar Mar 08 '19 02:03 susfly

Hi, can you show me an example of using multiprocessing and threading? I'm trying to simulate a high number of devices sending data, and I'm using paho-mqtt. However, I cannot manage to connect 1,000 devices (my goal is to go up to 10K devices).

import paho.mqtt.client as mqtt
import time
import threading
import logging
import thingsboard_objects as Things
import random
import datetime
logging.basicConfig(level=logging.INFO)


init_time = time.time()


def Connect(client, broker, port, token, keepalive, run_forever=False):
    connflag = False
    delay = 5
    print("connecting ",client)
    badcount = 0  # counter for bad connection attempts
    while not connflag:
        logging.info("connecting to broker " + str(broker))
        # print("connecting to broker "+str(broker)+":"+str(port))
        print("Attempts ", str(badcount))
        time.sleep(2)
        try:
            client.username_pw_set(token)
            client.connect(broker, port, keepalive)
            connflag = True

        except Exception:
            logging.info("connection failed " + str(badcount))
            badcount += 1
            if badcount >= 3 and not run_forever:
                client.bad_connection_flag = True  # same flag name that client_loop checks
                return -1  # give up

    return 0


def wait_for(client, msgType, period=1, wait_time=20, running_loop=False):
    """Will wait for a particular event; gives up after period*wait_time seconds
(20 by default). Returns True if successful, False if it fails."""
    # running loop is true when using loop_start or loop_forever
    client.running_loop = running_loop  #
    wcount = 0
    while True:
        logging.info("waiting" + msgType)
        if msgType == "CONNACK":
            if client.on_connect:
                if client.connected_flag:
                    return True
                if client.bad_connection_flag:  #
                    return False

        if msgType == "SUBACK":
            if client.on_subscribe:
                if client.suback_flag:
                    return True
        if msgType == "MESSAGE":
            if client.on_message:
                if client.message_received_flag:
                    return True
        if msgType == "PUBACK":
            if client.on_publish:
                if client.puback_flag:
                    return True

        if not client.running_loop:
            client.loop(.01)  # check for messages manually
        time.sleep(period)
        wcount += 1
        if wcount > wait_time:
            print("return from wait loop taken too long")
            return False
    return True


def client_loop(client, broker, port, token, keepalive=300, loop_function=None,
                loop_delay=10, run_forever=False):
    """runs a loop that will auto reconnect and subscribe to topics
    pass topics as a list of tuples. You can pass a function to be
    called at set intervals determined by the loop_delay
    """
    client.run_flag = True
    client.broker = broker
    print("running loop ")
    client.reconnect_delay_set(min_delay=1, max_delay=12)

    while client.run_flag:  # loop forever

        if client.bad_connection_flag:
            break
        if not client.connected_flag:
            print("Connecting to " + broker)
            if Connect(client, broker, port, token, keepalive, run_forever) != -1:
                if not wait_for(client, "CONNACK"):
                    client.run_flag = False  # break no connack
            else:  # connect fails
                client.run_flag = False  # break
                print("quitting loop for  broker ", broker)

        client.loop(0.01)

        if client.connected_flag and loop_function:  # function to call
            loop_function(client, loop_delay)  # call function

    time.sleep(1)
    print("disconnecting from", broker)
    if client.connected_flag:
        client.disconnect()
        client.connected_flag = False


def on_log(client, userdata, level, buf):
    print(buf)


#def on_message(client, userdata, message):
#    time.sleep(1)
#    print("message received", str(message.payload.decode("utf-8")))


def on_connect(client, userdata, flags, rc):
    if rc == 0:
        client.connected_flag = True  # set flag
        print("connected OK")
        for c in clients:
            # entries may lack "client" or "sub_topic", so look them up with .get()
            if client == c.get("client") and c.get("sub_topic"):
                client.subscribe(c["sub_topic"])
    else:
        print("Bad connection Returned code=", rc)
        client.loop_stop()


def on_disconnect(client, userdata, rc):
    client.connected_flag = False  # set flag
    # print("client disconnected ok")


def on_publish(client, userdata, mid):
    print("In on_pub callback mid= ", mid)


def pub(client, loop_delay):

    rmd_current = round(random.uniform(0.6, 50.0), 2)
    rmd_pressure = round(random.uniform(0.6, 50.0), 2)
    global init_time
    if time.time() - init_time >= 3600:
        rmd_mnc = round(random.uniform(5.0, 30.0), 2)
        rmd_sdc = round(random.random(), 2)
        rmd_mnp = round(random.uniform(5.0, 30.0), 2)
        rmd_sdp = round(random.random(), 2)

        client.publish('v1/devices/me/telemetry',
                       '{"Current": "%s","Pressure": "%s","Str": "12341","Stp": "12340","AL1": "~","AL2": "~",'
                       '"AL3": "~","AL4": "~","AL5": "~","AL6": "~","AL7": "~","AL8": "~"}' % (rmd_current, rmd_pressure))
        client.publish('v1/devices/me/telemetry',
                       '{"MnC": "%s", "SdC": "%s", "Str": "2554","Stp": "2554", '
                       '"MnP": "%s", "SdP": "%s"}' % (rmd_mnc, rmd_sdc, rmd_mnp, rmd_sdp))

        init_time = time.time()
    else:
        client.publish('v1/devices/me/telemetry',
                       '{"Current": "%s","Pressure": "%s","Str": "12341","Stp": "12340","AL1": "~","AL2": "~",'
                       '"AL3": "~","AL4": "~","AL5": "~","AL6": "~","AL7": "~","AL8": "~"}' % (rmd_current, rmd_pressure))
    print(datetime.datetime.now())
    time.sleep(loop_delay)
    pass


def Create_connections():
    for i in range(n_clients):
        cname = "client" + str(i)
        t = int(time.time())
        client_id = cname + str(t)  # create unique client_id
        client = mqtt.Client(client_id)  # create new instance
        clients[i]["client"] = client
        clients[i]["client_id"] = client_id
        clients[i]["cname"] = cname
        broker = clients[i]["broker"]
        port = clients[i]["port"]
        token = clients[i]["token"]
        client.on_connect = on_connect
        client.on_disconnect = on_disconnect
        client.on_publish = on_publish
        #client.on_message = on_message
        t = threading.Thread(target=client_loop, args=(client, broker, port, token, 300, pub))
        threads.append(t)
        t.start()


if __name__ == '__main__':

    things_location = input("What type of thingsboard installation are you working with (demo/local)? ")

    if things_location == "local":
        type_install = 'cseetprj03.essex.ac.uk:8080'
        broker = 'cseetprj03.essex.ac.uk'
    else:
        type_install = broker = 'demo.thingsboard.io'

    header = Things.get_credentials(things_location)
    my_devices = Things.get_devices_id(header, type_install)

    clients = []
    for device in my_devices:
        device_info = {"broker": broker, "port": 1883, "name": device["name"],
                       "token": Things.get_device_token(device["id"]["id"], header, type_install)}
        clients.append(device_info)

    n_clients = len(clients)
    mqtt.Client.connected_flag = False  # create flag in class
    mqtt.Client.bad_connection_flag = False  # create flag in class

    threads = []
    print("Creating Connections ")
    no_threads = threading.active_count()
    print("current threads =", no_threads)
    print("Publishing ")
    Create_connections()

    print("All clients connected ")
    no_threads = threading.active_count()
    print("current threads =", no_threads)
    print("starting main loop")
    try:
        while no_threads > 1:  # keep monitoring while any client thread is alive
            time.sleep(10)
            no_threads = threading.active_count()
            print("current threads =", no_threads)
            for c in clients:
                if not c["client"].connected_flag:
                    print("broker ", c["broker"], " is disconnected")

    except KeyboardInterrupt:
        print("ending")
        for c in clients:
            c["client"].run_flag = False
    time.sleep(10)

That is my code. Is multiprocessing needed for this, or what can I change to be able to scale the sending of data up to 10K connections?

Thanks in advance

fjpa121197 avatar Jul 09 '20 14:07 fjpa121197

We can use multiprocessing instead. Each process makes up to 340 connections, and it works well.

Hi, can you share your solution?

fjpa121197 avatar Jul 09 '20 15:07 fjpa121197

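import multiprocessing
import threading

def run_client():
    pass  # placeholder: create one MQTT client here and run its loop

def yourthreadscreateandstartfunc(n_threads=300):  # 300 is an arbitrary value below 340
    for _ in range(n_threads):
        threading.Thread(target=run_client).start()

if __name__ == "__main__":
    p = multiprocessing.Process(target=yourthreadscreateandstartfunc)
    p.start()
    p.join()

Each process should run fewer than 340 threads.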

susfly avatar Jul 14 '20 00:07 susfly

Hi,

Sorry for asking again, but I can't manage to start the second process with the second portion of my clients.

import multiprocessing
import paho.mqtt.client as mqtt
import time
import threading
import logging
import math
import thingsboard_objects as Things
import random
import datetime
import numpy as np
logging.basicConfig(level=logging.INFO)

init_time = time.time()

def Connect(client, broker, port, token, keepalive, run_forever=False):
    connflag = False
    delay = 5
    print("connecting ",client)
    badcount = 0  # counter for bad connection attempts
    while not connflag:
        logging.info("connecting to broker " + str(broker))
        # print("connecting to broker "+str(broker)+":"+str(port))
        print("Attempts ", str(badcount))
        time.sleep(2)
        try:
            client.username_pw_set(token)
            client.connect(broker, port, keepalive)
            connflag = True

        except Exception:
            logging.info("connection failed " + str(badcount))
            badcount += 1
            if badcount >= 3 and not run_forever:
                client.bad_connection_flag = True  # same flag name that client_loop checks
                return -1  # give up

    return 0


def wait_for(client, msgType, period=1, wait_time=15, running_loop=False):
    """Will wait for a particular event; gives up after period*wait_time seconds
(15 by default). Returns True if successful, False if it fails."""
    # running loop is true when using loop_start or loop_forever
    client.running_loop = running_loop  #
    wcount = 0
    while True:
        logging.info("waiting" + msgType)
        if msgType == "CONNACK":
            if client.on_connect:
                if client.connected_flag:
                    return True
                if client.bad_connection_flag:  #
                    return False

        if msgType == "SUBACK":
            if client.on_subscribe:
                if client.suback_flag:
                    return True
        if msgType == "MESSAGE":
            if client.on_message:
                if client.message_received_flag:
                    return True
        if msgType == "PUBACK":
            if client.on_publish:
                if client.puback_flag:
                    return True

        if not client.running_loop:
            client.loop(.01)  # check for messages manually
        time.sleep(period)
        wcount += 1
        if wcount > wait_time:
            print("return from wait loop taken too long")
            return False
    return True


def client_loop(client, broker, port, token, keepalive=600, loop_function=None,
                loop_delay=10, run_forever=False):
    """runs a loop that will auto reconnect and subscribe to topics
    pass topics as a list of tuples. You can pass a function to be
    called at set intervals determined by the loop_delay
    """
    client.run_flag = True
    client.broker = broker
    print("running loop ")
    client.reconnect_delay_set(min_delay=1, max_delay=12)

    while client.run_flag:  # loop forever

        if client.bad_connection_flag:
            break
        if not client.connected_flag:
            print("Connecting to " + broker)
            if Connect(client, broker, port, token, keepalive, run_forever) != -1:
                if not wait_for(client, "CONNACK"):
                    client.run_flag = True  # break no connack
            else:  # connect fails
                client.run_flag = False  # break
                print("quitting loop for  broker ", broker)

        client.loop(0.01)

        if client.connected_flag and loop_function:  # function to call
            loop_function(client, loop_delay)  # call function

    time.sleep(1)
    print("disconnecting from", broker)
    if client.connected_flag:
        client.disconnect()
        client.connected_flag = False


def on_log(client, userdata, level, buf):
    print(buf)


def on_connect(client, userdata, flags, rc):
    if rc == 0:
        client.connected_flag = True  # set flag
        print("connected OK")
    else:
        print("Bad connection Returned code=", rc)
        client.loop_stop()


def on_disconnect(client, userdata, rc):
    client.connected_flag = False  # set flag
    # print("client disconnected ok")


def on_publish(client, userdata, mid):
    print("In on_pub callback mid= ", mid)

def pub(client, loop_delay):

    rmd_current = round(random.uniform(0.6, 50.0), 2)
    rmd_pressure = round(random.uniform(0.6, 50.0), 2)
    global init_time
    if time.time() - init_time >= 3600:
        rmd_mnc = round(random.uniform(5.0, 30.0), 2)
        rmd_sdc = round(random.random(), 2)
        rmd_mnp = round(random.uniform(5.0, 30.0), 2)
        rmd_sdp = round(random.random(), 2)

        client.publish('v1/devices/me/telemetry',
                       '{"Current": "%s","Pressure": "%s","Str": "12341","Stp": "12340","AL1": "~","AL2": "~",'
                       '"AL3": "~","AL4": "~","AL5": "~","AL6": "~","AL7": "~","AL8": "~"}' % (rmd_current, rmd_pressure))
        client.publish('v1/devices/me/telemetry',
                       '{"MnC": "%s", "SdC": "%s", "Str": "2554","Stp": "2554", '
                       '"MnP": "%s", "SdP": "%s"}' % (rmd_mnc, rmd_sdc, rmd_mnp, rmd_sdp))

        init_time = time.time()
    else:
        client.publish('v1/devices/me/telemetry',
                       '{"Current": "%s","Pressure": "%s","Str": "12341","Stp": "12340","AL1": "~","AL2": "~",'
                       '"AL3": "~","AL4": "~","AL5": "~","AL6": "~","AL7": "~","AL8": "~"}' % (rmd_current, rmd_pressure))
    print(datetime.datetime.now())
    time.sleep(loop_delay)

def Create_connections(n_clients, threads):
    # n_clients is this process's portion of the device list, so index it
    # directly instead of the global `clients`, which holds every device.
    for i in range(len(n_clients)):
        cname = "client" + n_clients[i]["name"]
        t = int(time.time())
        client_id = cname + str(t)  # create unique client_id
        client = mqtt.Client(client_id)  # create new instance
        n_clients[i]["client"] = client
        n_clients[i]["client_id"] = client_id
        n_clients[i]["cname"] = cname
        broker_p = n_clients[i]["broker"]
        port = n_clients[i]["port"]
        token = n_clients[i]["token"]
        client.on_connect = on_connect
        client.on_disconnect = on_disconnect
        client.on_publish = on_publish
        #client.on_message = on_message
        t = threading.Thread(target=client_loop, args=(client, broker_p, port, token, 600, pub))
        threads.append(t)
        t.start()

def main_loop(clients_loop):

    mqtt.Client.connected_flag = False  # create flag in class
    mqtt.Client.bad_connection_flag = False  # create flag in class

    threads = []
    print("Creating Connections ")
    no_threads = threading.active_count()
    print("current threads =", no_threads)
    print("Publishing ")
    Create_connections(clients_loop, threads)

    print("All clients connected ")
    no_threads = threading.active_count()
    print("current threads =", no_threads)
    print("starting main loop")
    try:
        while True:
            time.sleep(10)
            no_threads = threading.active_count()
            print("current threads =", no_threads)
            for c in clients_loop:
                if not c["client"].connected_flag:
                    print("broker ", c["broker"], " is disconnected" , c["name"])
                    time.sleep(1)

    except KeyboardInterrupt:
        print("ending")
        for c in clients_loop:  # stop only the clients owned by this process
            c["client"].run_flag = False
        
    time.sleep(10)

if __name__ == '__main__':

    # In case the user is using a demo version or local version of thingsboard
    things_location = input("What type of thingsboard installation are you working with (demo/local)? ")

    if things_location == "demo":
        type_install = broker = "demo.thingsboard.io"  # the demo host is also the MQTT broker
        header = Things.get_credentials(things_location)
    elif things_location == "local":
        computer = input("Which computer? ")
        type_install = "cseetprj%s.essex.ac.uk:8080" % computer
        broker = "cseetprj%s.essex.ac.uk" % computer
        header = Things.get_credentials("local", type_install)
    else:
        raise SystemExit("Error: Installation not supported")

    my_devices = Things.get_devices_id(header, type_install)

    clients = []
    for device in my_devices:
        device_info = {"broker": broker, "port": 1883, "name": device["name"],
                       "token": Things.get_device_token(device["id"]["id"], header, type_install)}
        clients.append(device_info)

    if len(clients) >= 200:
        print("Splitting devices to multiprocess")
        split_by = math.ceil(len(clients) / 250)
        split_clients = np.array_split(clients, split_by)
    else:
        split_clients = [clients]  # small device lists run in a single process

    jobs = []
    for idx, client_portion in enumerate(split_clients):
        print("Starting process for portion %s" % (idx + 1))
        p = multiprocessing.Process(target=main_loop, args = (client_portion,))
        jobs.append(p)
        p.start()

I cannot get past the Create_connections part: all the clients connect but don't publish afterwards. I think it is related to the order or position of the functions, but I don't know why. Any thoughts?

fjpa121197 avatar Jul 14 '20 10:07 fjpa121197

Hi, how many connections were you able to make for each process? And did you change any configuration on the machine that receives the connections?

fjpa121197 avatar Jul 18 '20 11:07 fjpa121197

It may be a limit on your server.

yxlwfds avatar Nov 18 '20 10:11 yxlwfds