paho.mqtt.python icon indicating copy to clipboard operation
paho.mqtt.python copied to clipboard

Code for multiple clients - Is there any possible error that disconnects a client after a while?

Open fjpa121197 opened this issue 4 years ago • 2 comments

Hi, I'm trying to create multiple clients that connect to an IoT platform hosted on a remote Linux machine. I have already managed to get the code working with 200 clients. However, when I try to increase the number (to 1000 clients), some connections return rc=3 (how many varies from run to run) and some of the clients end up disconnecting. I don't know whether something in the code causes this behaviour (I can't tell if I'm doing something wrong).

Any suggestions with my code?

Thanks in advance

import multiprocessing
import paho.mqtt.client as mqtt
import time
import threading
import logging
import math
import thingsboard_objects as Things
import random
import datetime
import numpy as np
import sys
# Configure the root logger so records at INFO level and above are emitted.
logging.basicConfig(level=logging.INFO)

# Wall-clock time (seconds since the epoch) captured when the module is loaded.
init_time = time.time()
# Counter initialised to zero; nothing in the visible code updates it.
disconnected = 0

def Connect(client, broker, port, token, keepalive, run_forever=False):
    """Connect ``client`` to ``broker``, retrying until an attempt succeeds.

    ``token`` is installed as the MQTT username before every attempt.  Each
    attempt is preceded by a 2 second pause; a successful attempt is followed
    by a 1 second pause.  A failed attempt is logged and counted, then
    retried.  Only ``Exception`` subclasses are retried, so
    ``KeyboardInterrupt`` and ``SystemExit`` propagate to the caller instead
    of being swallowed by the retry loop.

    Args:
        client: MQTT client object providing ``username_pw_set`` and
            ``connect``.
        broker: Host name or address passed to ``client.connect``.
        port: Port passed to ``client.connect``.
        token: Value passed to ``client.username_pw_set``.
        keepalive: Keepalive interval passed to ``client.connect``.
        run_forever: Accepted for backward compatibility but not consulted;
            the function always retries until ``client.connect`` succeeds.

    Returns:
        0 once ``client.connect`` has returned without raising.
    """
    print("connecting ", client)
    badcount = 0  # number of failed connection attempts so far
    while True:
        logging.info("connecting to broker " + str(broker))
        print("Attempts ", str(badcount))
        time.sleep(2)
        try:
            client.username_pw_set(token)
            client.connect(broker, port, keepalive)
        except Exception as err:
            # Keep retrying, but leave a trace of why the attempt failed.
            badcount += 1
            logging.warning("connection attempt %d to %s failed: %r",
                            badcount, broker, err)
            continue
        time.sleep(1)
        return 0


def wait_for(client, msgType, period=1, wait_time=15, running_loop=False):
    """Poll ``client`` until the event named by ``msgType`` has been flagged.

    Recognised events are "CONNACK", "SUBACK", "MESSAGE" and "PUBACK".  An
    event counts as arrived when the matching callback attribute on the
    client is truthy and the matching flag attribute is truthy.  For
    "CONNACK" a truthy ``bad_connection_flag`` ends the wait with a failure.

    Args:
        client: Client object carrying the callback and flag attributes.
        msgType: Name of the event to wait for.
        period: Seconds to sleep between polls.
        wait_time: The wait is abandoned once more than this many polls have
            been made.
        running_loop: Stored on ``client.running_loop``.  When falsy,
            ``client.loop(.01)`` is called on every poll to process network
            traffic manually.

    Returns:
        True if the event was flagged, False on a bad connection or timeout.
    """
    # Event name -> (callback attribute, flag attribute) signalling arrival.
    watched = {
        "CONNACK": ("on_connect", "connected_flag"),
        "SUBACK": ("on_subscribe", "suback_flag"),
        "MESSAGE": ("on_message", "message_received_flag"),
        "PUBACK": ("on_publish", "puback_flag"),
    }
    client.running_loop = running_loop
    polls = 0
    while True:
        logging.info("waiting" + msgType)
        if msgType in watched:
            callback_attr, flag_attr = watched[msgType]
            # The flags are only consulted when a callback is installed.
            if getattr(client, callback_attr):
                if getattr(client, flag_attr):
                    return True
                if msgType == "CONNACK" and client.bad_connection_flag:
                    return False

        if not client.running_loop:
            client.loop(.01)  # no background loop: pump the network here
        time.sleep(period)
        polls += 1
        if polls > wait_time:
            print("return from wait loop taken too long")
            return False


def client_loop(client, broker, port, token, keepalive=600, loop_function=None,
                loop_delay=10, run_forever=False):
    """Drive one client's network loop, reconnecting whenever it is offline.

    The loop runs while ``client.run_flag`` is truthy and stops early when
    ``client.bad_connection_flag`` is set.  Whenever the client is not
    flagged as connected, ``Connect`` is called and the CONNACK is awaited
    with ``wait_for``.  Each pass then services the network with
    ``client.loop(0.01)`` and, if connected, calls ``loop_function``.

    Another thread may clear ``client.run_flag`` to request shutdown, so this
    function only ever writes ``False`` to it after the initial assignment;
    a missed CONNACK simply leads to another connection attempt on the next
    pass without overwriting a pending shutdown request.

    Args:
        client: MQTT client object carrying ``connected_flag`` and
            ``bad_connection_flag`` attributes.
        broker: Broker host name; also stored on ``client.broker``.
        port: Broker port, forwarded to ``Connect``.
        token: Access token, forwarded to ``Connect``.
        keepalive: Keepalive interval, forwarded to ``Connect``.
        loop_function: Optional callable invoked as
            ``loop_function(client, loop_delay)`` on every pass while the
            client is connected.
        loop_delay: Value handed to ``loop_function``.
        run_forever: Forwarded to ``Connect``.
    """
    client.run_flag = True
    client.broker = broker
    print("running loop ")
    client.reconnect_delay_set(min_delay=1, max_delay=12)

    while client.run_flag:  # cleared by this loop or by another thread

        if client.bad_connection_flag:
            break
        if not client.connected_flag:
            print("Connecting to " + broker)
            if Connect(client, broker, port, token, keepalive, run_forever) == -1:
                # Connect gave up: leave the loop after this pass.
                client.run_flag = False
                print("quitting loop for  broker ", broker)
            else:
                # No CONNACK is not fatal; the next pass tries again.
                # run_flag is deliberately left untouched here.
                wait_for(client, "CONNACK")

        client.loop(0.01)

        if client.connected_flag and loop_function:
            loop_function(client, loop_delay)

    time.sleep(1)
    print("disconnecting from", broker)
    if client.connected_flag:
        client.disconnect()
        client.connected_flag = False


def on_log(client, userdata, level, buf):
    """Log callback: print the message text ``buf`` to standard output.

    ``client``, ``userdata`` and ``level`` are accepted but not used.
    NOTE(review): nothing in the visible code assigns this function to a
    client's ``on_log`` attribute -- confirm whether it is still needed.
    """
    print(buf)


def on_connect(client, userdata, flags, rc):
    """Connection callback: flag success, or record a refused connection.

    The callback relies only on its arguments, never on module-level state,
    so it behaves the same in every thread and worker process.

    Args:
        client: The client the result belongs to.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code; 0 means the connection was accepted.

    Side effects:
        On success sets ``client.connected_flag`` to True.  On any other
        code prints the code, appends a line to ``bad_connections.txt`` in
        the current working directory and calls ``client.loop_stop()``.
    """
    if rc == 0:
        client.connected_flag = True
        return

    print("Bad connection Returned code=", rc)
    # The context manager closes the file even if the write fails.
    with open("bad_connections.txt", "a") as bad_log:
        bad_log.write("Bad connection Returned code=%s \n" % rc)
    client.loop_stop()


def on_disconnect(client, userdata, rc):
    """Disconnect callback: clear the connected flag and report the reason.

    Args:
        client: The client that was disconnected.
        userdata: Unused.
        rc: Disconnect result code.  0 is reported as a clean disconnect;
            any other value is reported together with the code so that
            unexpected connection losses can be told apart in the output.
    """
    client.connected_flag = False
    if rc == 0:
        print("client disconnected ok")
    else:
        print("client disconnected unexpectedly, rc=", rc)


def on_publish(client, userdata, mid):
    """Publish callback: print the message id ``mid`` it was handed.

    ``client`` and ``userdata`` are accepted but not used.
    """
    print("In on_pub callback mid= ", mid)

def pub(client, loop_delay):
    """Publish one random telemetry sample and, once an hour, a summary.

    Every call publishes a "Current"/"Pressure" sample on
    ``v1/devices/me/telemetry``.  When at least 3600 seconds have passed
    since this client's last summary, a second message with the
    "MnC"/"SdC"/"MnP"/"SdP" values is published as well.

    The summary timer is kept per client in ``client.last_summary_time``
    (created on the client's first call), so each client sends its own
    hourly summary regardless of how many clients share the process.

    Args:
        client: Object providing ``publish(topic, payload)``; arbitrary
            attributes must be assignable on it.
        loop_delay: Seconds to sleep before returning.
    """
    rmd_current = round(random.uniform(0.6, 50.0), 2)
    rmd_pressure = round(random.uniform(0.6, 50.0), 2)

    now = time.time()
    last_summary = getattr(client, "last_summary_time", None)
    if last_summary is None:
        # First call for this client: start its hourly timer now.
        last_summary = client.last_summary_time = now

    client.publish('v1/devices/me/telemetry',
                   '{"Current": "%s","Pressure": "%s","Str": "12341","Stp": "12340","AL1": "~","AL2": "~",'
                   '"AL3": "~","AL4": "~","AL5": "~","AL6": "~","AL7": "~","AL8": "~"}' % (rmd_current, rmd_pressure))

    if now - last_summary >= 3600:
        rmd_mnc = round(random.uniform(5.0, 30.0), 2)
        rmd_sdc = round(random.random(), 2)
        rmd_mnp = round(random.uniform(5.0, 30.0), 2)
        rmd_sdp = round(random.random(), 2)
        client.publish('v1/devices/me/telemetry',
                       '{"MnC": "%s", "SdC": "%s", "Str": "2554","Stp": "2554", '
                       '"MnP": "%s", "SdP": "%s"}' % (rmd_mnc, rmd_sdc, rmd_mnp, rmd_sdp))
        client.last_summary_time = time.time()

    print(datetime.datetime.now())
    time.sleep(loop_delay)

def Create_connections(n_clients, threads):
    """Create one MQTT client and one worker thread per device entry.

    Args:
        n_clients: Sequence of dicts with "name", "broker", "port" and
            "token" keys.  Each dict is extended in place with "client",
            "client_id" and "cname".
        threads: List that receives every started thread.

    The client id is "client-<name>" followed by the current time in whole
    seconds.  Each thread runs ``client_loop`` with a keepalive of 600 and
    ``pub`` as its loop function, and is started immediately.
    """
    for entry in n_clients:
        cname = "client-" + entry["name"]
        client_id = cname + str(int(time.time()))  # name + timestamp
        client = mqtt.Client(client_id)
        entry.update(client=client, client_id=client_id, cname=cname)

        client.on_connect = on_connect
        client.on_disconnect = on_disconnect
        client.on_publish = on_publish

        worker = threading.Thread(
            target=client_loop,
            args=(client, entry["broker"], entry["port"], entry["token"], 600, pub),
        )
        threads.append(worker)
        worker.start()

def main_loop(clients_loop):
    """Start all clients in ``clients_loop`` and monitor them until Ctrl-C.

    Args:
        clients_loop: Sequence of device dicts as consumed by
            ``Create_connections``.

    Every 10 seconds the active thread count is printed and each client
    whose ``connected_flag`` is falsy is reported on stdout and appended to
    ``disconnects.txt``, followed by a 1 second pause.  On
    ``KeyboardInterrupt`` every client's ``run_flag`` is cleared.  The
    function sleeps a final 10 seconds before returning.
    """
    def report_thread_count():
        print("current threads =", threading.active_count())

    # Class-level defaults so every client instance starts with both flags.
    mqtt.Client.connected_flag = False
    mqtt.Client.bad_connection_flag = False

    threads = []
    print("Creating Connections ")
    report_thread_count()
    print("Publishing ")
    Create_connections(clients_loop, threads)

    print("All clients connected ")
    report_thread_count()
    print("starting main loop")
    try:
        while True:
            time.sleep(10)
            report_thread_count()
            for entry in clients_loop:
                if entry["client"].connected_flag:
                    continue
                print("broker ", entry["broker"], " is disconnected" , entry["name"])
                with open("disconnects.txt", "a") as report:
                    report.write("broker %s is disconnected %s \n" % (entry["broker"], entry["name"]))
                time.sleep(1)

    except KeyboardInterrupt:
        print("ending")
        for entry in clients_loop:
            entry["client"].run_flag = False

    time.sleep(10)

if __name__ == '__main__':
    # Script entry point: ask which ThingsBoard installation to use, fetch
    # the device list and tokens, then run the devices in one or more
    # worker processes.

    # In case the user is using a demo version or local version of thingsboard
    things_location = input("What type of thingsboard installation are you working with (demo/local)? ")

    if things_location == "demo":
        type_install = "demo.thingsboard.io"
        # NOTE(review): assumes the demo MQTT broker is reachable on the same
        # host name as the REST API -- confirm.
        broker = type_install
        header = Things.get_credentials(things_location)
    elif things_location == "local":
        computer = input("Which computer? ")
        type_install = "cseetprj%s.essex.ac.uk:8080" % computer
        broker = "cseetprj%s.essex.ac.uk" % computer
        header = Things.get_credentials("local", type_install)
    else:
        # Nothing below can run without credentials and a host, so stop here
        # instead of failing later on an undefined name.
        print("Error: Installation not supported")
        sys.exit(1)

    my_devices = Things.get_devices_id(header, type_install)

    clients = []
    for device in my_devices:
        device_info = {"broker": broker, "port": 1883, "name": device["name"],
                       "token": Things.get_device_token(device["id"]["id"], header, type_install)}
        clients.append(device_info)

    print(len(clients))
    time.sleep(5)
    if len(clients) >= 200:
        print("Splitting devices to multiprocess")
        split_by = math.ceil(len(clients) / 250)
        split_clients = np.array_split(clients, split_by)
    else:
        # Small device lists run in a single worker process.
        split_clients = [clients]

    jobs = []
    for idx, client_portion in enumerate(split_clients):
        print("Starting process for portion %s" % (idx + 1))
        p = multiprocessing.Process(target=main_loop, args=(client_portion,))
        jobs.append(p)
        p.start()

    # Block until every worker process has finished.
    for job in jobs:
        print("Ending process")
        job.join()

Also, is there any reason or possible explanation for why, when publishing two messages (data) one after another, the second one doesn't arrive?

For example, using the 200 clients sending data every 10 seconds, the following part is sent every hour:

 client.publish('v1/devices/me/telemetry',
                       '{"Current": "%s","Pressure": "%s","Str": "12341","Stp": "12340","AL1": "~","AL2": "~",'
                       '"AL3": "~","AL4": "~","AL5": "~","AL6": "~","AL7": "~","AL8": "~"}' % (rmd_current, rmd_pressure))
        client.publish('v1/devices/me/telemetry',
                       '{"MnC": "%s", "SdC": "%s", "Str": "2554","Stp": "2554", '
                       '"MnP": "%s", "SdP": "%s"}' % (rmd_mnc, rmd_sdc, rmd_mnp, rmd_sdp))

However, I sometimes see that the second message (data) doesn't arrive for all clients, and when it does arrive, the interval is not one hour.

fjpa121197 avatar Jul 20 '20 15:07 fjpa121197

up

yxlwfds avatar Nov 18 '20 07:11 yxlwfds

Apologies for the huge delay in responding (trying to clean up old issues on this repo).

Firstly I'd appreciate it if you could confirm you are still having this issue (quite possible you have moved onto another solution or updates to the library have resolved the issue).

I get some rc=3 (this number varies) and some of the clients end up disconnecting

Can you please confirm which function is returning the error (or is it a mix?) and also the frequency that this is occurring (you would expect some connection loss over time; especially with 1000+ connections).

I'll leave it there for now - note that if there is no response in a month we will close this off.

MattBrittan avatar Jan 07 '24 03:01 MattBrittan

Closing due to age/lack of activity.

MattBrittan avatar Jul 18 '24 00:07 MattBrittan