tokio-modbus

Trying to understand asynchronous client.

Open glennpierce opened this issue 5 years ago • 4 comments

Hi, I am having trouble understanding the best way to make many requests concurrently. Basically, I am trying to simulate talking to a couple of thousand Modbus devices as quickly as possible. As I don't have those devices :) I am running your tcp-server.rs example with added delays of 60-80 ms, which seems like a reasonable simulation.

My trouble is developing the client code.

I want all the requests to be asynchronous. Once they have all completed, I would store the results in a db.

Would you recommend running code like

let socket_addr = "192.168.0.222:502".parse().unwrap();
let mut ctx = sync::tcp::connect(socket_addr).unwrap();
let buff = ctx.read_input_registers(0x1000, 7).unwrap();

in many threads and having each thread send its result back through a channel? Or is there a better way built in using tokio async?
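A minimal sketch of the thread-and-channel variant described above, using only the standard library. `read_device` and `poll_all` are hypothetical names: `read_device` just sleeps to imitate a slow device, and a real version would open a connection and read registers there instead. The worker count is capped so that a few thousand devices do not mean a few thousand threads or sockets at once.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for one blocking Modbus read; a real client would
/// connect to `addr` and read registers here.
fn read_device(addr: &str) -> Result<Vec<u16>, String> {
    thread::sleep(Duration::from_millis(5)); // simulated device latency
    Ok(vec![addr.len() as u16])
}

/// Polls every address using at most `workers` threads and returns one
/// `(address, result)` pair per device, in completion order.
fn poll_all(addrs: Vec<String>, workers: usize) -> Vec<(String, Result<Vec<u16>, String>)> {
    let (tx, rx) = mpsc::channel();
    let workers = workers.max(1);
    // Split the address list into roughly one slice per worker.
    let chunk_len = ((addrs.len() + workers - 1) / workers).max(1);
    let mut handles = Vec::new();
    for chunk in addrs.chunks(chunk_len) {
        let chunk = chunk.to_vec();
        let tx = tx.clone();
        handles.push(thread::spawn(move || {
            for addr in chunk {
                let result = read_device(&addr);
                // The receiver stays alive until all senders are gone,
                // so this send cannot fail.
                tx.send((addr, result)).unwrap();
            }
        }));
    }
    // Drop the original sender so `rx` ends once every worker has finished.
    drop(tx);
    let results: Vec<_> = rx.iter().collect();
    for handle in handles {
        handle.join().unwrap();
    }
    results
}
```

Calling `poll_all(addresses, 10)` returns once every device has answered or failed, at which point the collected results could be written to the db in one go.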

Thanks

glennpierce avatar Feb 21 '20 13:02 glennpierce

I also believe there is an issue with sync::tcp::connect not closing the socket correctly.

I have the following code that eventually fails with a "too many open files" error. I have tested this with modbus-rs and it works.

extern crate chrono;
extern crate crossbeam_channel;

use std::sync::{Arc, Mutex};
use tokio_core::reactor::Core;
use futures::Future;
use tokio_modbus::prelude::*;
use systemd::daemon;

use chrono::{DateTime, NaiveDate, TimeZone, Utc};
use std::thread;

#[derive(Clone, Debug)]
struct MeterResult {
    name: String,
    ip: String,
    result: Option<u32>,
    last_dt: Option<DateTime<Utc>>
}

type MeterValue = (MeterResult, u32);

const NUMBER_OF_THREADS: usize = 10;
const NUMBER_OF_METERS: usize = 1000;

pub fn main() {
    let mut meters: Vec<MeterResult> = vec![];

    for i in 1..NUMBER_OF_METERS {
        let meter: MeterResult = MeterResult {name: format!("bi{}", i),
                                              ip: "127.0.0.1:5502".to_string(),
                                              result: None,
                                              last_dt: None
                                             };

        meters.push(meter);
    }

    let mut values = Arc::new(Mutex::new(vec![]));
    let mut chunks = meters.chunks(NUMBER_OF_THREADS);

    for chunk in chunks {

        let mut threads = vec![];

        for meter in chunk {
            threads.push(thread::spawn( {

                let clone = Arc::clone(&values);
                let m = meter.clone();
                    move || {

                        let socket_addr = "127.0.0.1:5502".parse().unwrap();
                        let mut ctx = sync::tcp::connect(socket_addr).unwrap();
                        //let data = ctx.read_holding_registers(0x16, 2).unwrap();
                        //let value : u32 = ((data[1] as u32) << 16) | (data[0] as u32);
                        let mut v = clone.lock().unwrap();
                        v.push((m.clone(), 2));
                        //ctx.disconnect();
                        //drop(ctx);
                        //drop(socket_addr);
                    }
                }));
        }

        for t in threads {
            t.join().unwrap();
        }
        println!("batch done");
    }
}

glennpierce avatar Feb 26 '20 10:02 glennpierce

For everyone encountering this issue: I once experienced a similar problem with the sync client. If you use tokio-driven libraries, you are best off using them with async/await. One solution to the first question in this issue is to use tokio::spawn to spawn one task per request and join them all at the end using the futures crate. This lets tokio decide how many threads to use, and lets it run multiple tasks concurrently on one thread, for instance while one of the slaves takes a long time to respond to a request.

DrSloth avatar Jan 31 '21 09:01 DrSloth

I can't figure out how to make concurrent async requests via one Context. The problem is that the read_* requests expect a mutable reference to the Context. I'd like to write something like this:

    let mut context = tokio_modbus::client::tcp::connect("127.0.0.1:502".parse()?).await?;
    let (a, b) = tokio::join!(
        context.read_holding_registers(1, 1),
        context.read_holding_registers(2, 1),
    );
    
    context.disconnect().await?;
    dbg!(a, b);

I've tried creating a context for every request, but the server doesn't like this. I thought about wrapping the context in Arc<RwLock>, but I don't think that's a good idea.

aleksuss avatar Dec 03 '21 12:12 aleksuss

Because pretty much everything uses &mut self, you can just use Arc<Mutex<Context>>; I would recommend the tokio Mutex.

Internal state is mutated when making any network request; for instance, Read::read on a std::net::TcpStream also takes &mut self. It is generally not possible to make concurrent/parallel requests over the same connection.

For concurrent writes to the same server you would need two contexts, but many devices can't handle parallel requests correctly.
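To illustrate the Arc<Mutex<...>> idea, here is a minimal sketch that uses only the standard library. `FakeContext` and `read_shared` are hypothetical stand-ins, not tokio-modbus types, and std::sync::Mutex with threads is used only to keep the sketch free of dependencies; in async code the tokio Mutex would take its place, with the lock awaited instead of blocking. Either way, the requests still go over the connection one at a time.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for a client context: its read method needs
/// `&mut self` because every request updates internal state.
struct FakeContext {
    requests_sent: u32,
}

impl FakeContext {
    fn read_holding_registers(&mut self, addr: u16, count: u16) -> Vec<u16> {
        self.requests_sent += 1;
        // Pretend each register simply holds its own address.
        (addr..addr + count).collect()
    }
}

/// Issues one two-register read per start address, each from its own thread.
/// All threads share a single context; the mutex lets only one request run
/// at a time.
fn read_shared(ctx: Arc<Mutex<FakeContext>>, starts: Vec<u16>) -> Vec<Vec<u16>> {
    let handles: Vec<_> = starts
        .into_iter()
        .map(|start| {
            let ctx = Arc::clone(&ctx);
            thread::spawn(move || {
                // Hold the lock for the whole request/response exchange.
                let mut guard = ctx.lock().unwrap();
                guard.read_holding_registers(start, 2)
            })
        })
        .collect();
    // Joining in spawn order keeps the results aligned with `starts`.
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

With a context created as `Arc::new(Mutex::new(FakeContext { requests_sent: 0 }))`, calling `read_shared(ctx, vec![1, 2])` gives one result per start address, while the single underlying connection only ever sees one request in flight.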

Hope that helps.

DrSloth avatar Dec 03 '21 14:12 DrSloth