tokio-modbus
Trying to understand asynchronous client.
Hi, I am having trouble understanding the best way to make many requests in a multi-threaded way. Basically, I am trying to simulate talking to a couple of thousand Modbus devices as quickly as possible. As I don't have those devices :) I was running your tcp-server.rs example with added delays of 60-80 ms, which seems like a reasonable simulation.
My trouble is developing the client code.
I want all the requests to be asynchronous. Once they all complete, I would store the results in a db.
Would you recommend running code like
let socket_addr = "192.168.0.222:502".parse().unwrap();
let mut ctx = sync::tcp::connect(socket_addr).unwrap();
let buff = ctx.read_input_registers(0x1000, 7).unwrap();
in many threads and having each result sent back through a channel? Or is there a better way built in using tokio async?
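The threads-plus-channel approach described above can be sketched with only the standard library; the blocking modbus read is replaced here by a dummy vector (the commented-out lines show where the real sync::tcp::connect / read_input_registers calls would go):

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    for id in 0..8u16 {
        let tx = tx.clone();
        thread::spawn(move || {
            // Stand-in for the blocking modbus read:
            //   let mut ctx = sync::tcp::connect(socket_addr).unwrap();
            //   let buff = ctx.read_input_registers(0x1000, 7).unwrap();
            let buff: Vec<u16> = vec![0; 7];
            tx.send((id, buff)).unwrap();
        });
    }
    // Drop the original sender so the receiver iteration terminates
    // once every worker thread has finished.
    drop(tx);

    let results: Vec<_> = rx.iter().collect();
    assert_eq!(results.len(), 8);
    // ...store results into the db here...
}
```

This works, but each blocking read ties up an OS thread for the full 60-80 ms round trip, which is why the async approach discussed later in this thread scales better to thousands of devices.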
Thanks
I also believe there is an issue with sync::tcp::connect not closing the socket correctly.
I have the following code that eventually fails with a "too many open files" error. I have tested the same pattern with modbus-rs and it works there.
extern crate chrono;

use std::sync::{Arc, Mutex};
use std::thread;

use chrono::{DateTime, Utc};
use tokio_modbus::prelude::*;

#[derive(Clone, Debug)]
struct MeterResult {
    name: String,
    ip: String,
    result: Option<u32>,
    last_dt: Option<DateTime<Utc>>,
}

const NUMBER_OF_THREADS: usize = 10;
const NUMBER_OF_METERS: usize = 1000;

pub fn main() {
    let mut meters: Vec<MeterResult> = vec![];
    for i in 1..NUMBER_OF_METERS {
        meters.push(MeterResult {
            name: format!("bi{}", i),
            ip: "127.0.0.1:5502".to_string(),
            result: None,
            last_dt: None,
        });
    }

    let values = Arc::new(Mutex::new(vec![]));
    for chunk in meters.chunks(NUMBER_OF_THREADS) {
        let mut threads = vec![];
        for meter in chunk {
            threads.push(thread::spawn({
                let clone = Arc::clone(&values);
                let m = meter.clone();
                move || {
                    let socket_addr = "127.0.0.1:5502".parse().unwrap();
                    // A fresh connection is opened on every iteration; the
                    // descriptors pile up until the process hits the limit.
                    let mut ctx = sync::tcp::connect(socket_addr).unwrap();
                    //let data = ctx.read_holding_registers(0x16, 2).unwrap();
                    //let value: u32 = ((data[1] as u32) << 16) | (data[0] as u32);
                    let mut v = clone.lock().unwrap();
                    v.push((m.clone(), 2));
                    //ctx.disconnect();
                    //drop(ctx);
                    //drop(socket_addr);
                }
            }));
        }
        for t in threads {
            t.join().unwrap();
        }
        println!("batch done");
    }
}
For everyone encountering this issue: I once experienced a similar problem with the sync client. If you use tokio-driven libraries, you are best off using them with async/await. One solution for the first question in this issue is to use tokio::spawn to spawn tasks and then join them at the end using the futures crate. This lets tokio decide how many threads to use and run multiple tasks concurrently on one thread if, for instance, one of the slaves takes a long time to respond.
I can't figure out how to make async requests via one Context. The problem is that the read_* requests expect a mutable reference to the Context. I'd like to write something like this:
let mut context = tokio_modbus::client::tcp::connect("127.0.0.1:502".parse()?).await?;
let (a, b) = tokio::join!(
context.read_holding_registers(1, 1),
context.read_holding_registers(2, 1),
);
context.disconnect().await?;
dbg!(a, b);
I've tried creating a context for every request, but the server doesn't like that. I thought about wrapping the context in Arc<RwLock>, but I don't think that's a good idea.
Because pretty much everything uses &mut self, you can just use Arc<Mutex<Context>>; I would recommend the tokio Mutex.
Internal state is mutated when making any networking request; reading from a std::net::TcpStream also requires it to be mutable, for instance. It is generally not possible to make concurrent/parallel requests over the same connection.
For concurrent writes to the same server you would need two contexts, but many devices can't handle parallel requests correctly.
Hope that helps.