Immediate send/recvs hang when the packets being sent are large
Say I have 4 MPI processes labelled P0, P1, P2, P3. Each process potentially has packets to send to other processes, but may not. For example, P0 needs to send packets to P1 and P2:

P0 -> [P1, P2]

Similarly,

P1 -> [P3]
P2 -> []
P3 -> [P1]

So P1 has to receive packets from both P0 and P3, P3 has to receive packets from P1, and P2 from P0.
After doing an all-reduce to calculate the receive counts at each process, I'm sending the packets as follows:
```rust
for (i, packet) in packets.iter().enumerate() {
    let partner_process = world.process_at_rank(packet_destinations[i]);
    mpi::request::scope(|scope| {
        let _sreq = WaitGuard::from(partner_process.immediate_send(scope, &packet[..]));
    });
}

for (i, &recv_rank) in received_packet_sources.iter().enumerate() {
    let partner_process = world.process_at_rank(recv_rank);
    mpi::request::scope(|scope| {
        let _rreq = WaitGuard::from(partner_process.immediate_receive_into(scope, &mut buffers[i][..]));
    });
}
```
The sends and receives will in general involve different partner processes for each given process. This code works fine when the packets are quite small (<100 elements), but it hangs for larger packet sizes.

The equivalent C code would be fine: there would be no wait guard, and you'd add an MPI_Waitall at the end of the two for loops. How do I replicate this in Rust? I'd appreciate any pointers.
Can you look at `immediate_multiple_requests.rs` to see if it answers your question about scope? The issue is that your messages exceed the MPI implementation's "eager threshold" (usually an environment-tunable parameter, but your code shouldn't rely on it), so the sends can't complete until the matching receives are posted; but as you've written it, your rank can't get around to posting receives until it has finished waiting on its sends.
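To make that concrete, here is a minimal two-rank reproduction sketch (not your code; the 1 MiB payload is just an arbitrary size chosen to be well above any eager threshold) of why waiting on each send before posting any receive deadlocks:

```rust
// Sketch: run with exactly 2 ranks. Each rank sends a large message to the
// other and only afterwards posts its receive, mirroring the loops above.
use mpi::request::WaitGuard;
use mpi::traits::*;

fn main() {
    let universe = mpi::initialize().unwrap();
    let world = universe.world();
    let peer = world.process_at_rank(1 - world.rank()); // assumes exactly 2 ranks
    let payload = vec![1u8; 1 << 20]; // large enough to force the rendezvous protocol
    let mut incoming = vec![0u8; 1 << 20];

    // The WaitGuard blocks in MPI_Wait when it is dropped at the end of this
    // scope, but a rendezvous-protocol send cannot complete until the peer
    // posts its receive, and the peer is stuck in the same wait: deadlock.
    mpi::request::scope(|scope| {
        let _sreq = WaitGuard::from(peer.immediate_send(scope, &payload[..]));
    });

    // Never reached once the messages exceed the eager threshold.
    mpi::request::scope(|scope| {
        let _rreq = WaitGuard::from(peer.immediate_receive_into(scope, &mut incoming[..]));
    });
}
```

Posting every receive and send before waiting on any of them (the MPI_Waitall pattern you'd use in C) breaks that cycle.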
I've tried the following, and it appears to work
```rust
mpi::request::multiple_scope(nreqs as usize, |scope, coll| {
    for (i, packet) in packets.iter().enumerate() {
        let sreq = world
            .process_at_rank(packet_destinations[i])
            .immediate_send(scope, &packet[..]);
        coll.add(sreq);
    }
    for (i, buffer) in buffers.iter_mut().enumerate() {
        let rreq = world
            .process_at_rank(received_packet_sources[i])
            .immediate_receive_into(scope, &mut buffer[..]);
        coll.add(rreq);
    }
    let mut out = vec![];
    coll.wait_all(&mut out);
    assert_eq!(out.len(), nreqs as usize);
});
```
Thank you for your suggestion. There's an unrelated issue though, which I think is coming from my MPI implementation (OpenMPI 4.1.4). When I run this code with the max number of processors on my machine, I occasionally get a weird system call error, which may be related to this code:

```
System call: unlink(2) /var/folders/6g/xsc42sxx5lvdkl0z4_tr8_kh0000gn/T//ompi.eduroam-int-dhcp-97-1-127.501/pid.78254/1/vader_segment.eduroam-int-dhcp-97-1-127.501.77210001.3
```
Occasionally, the code also fails with the previous setup, with the following error:

```
[eduroam-int-dhcp-97-1-127:78465] *** An error occurred in MPI_Wait
[eduroam-int-dhcp-97-1-127:78465] *** reported by process [1947140097,0]
[eduroam-int-dhcp-97-1-127:78465] *** on communicator MPI COMMUNICATOR 4 DUP FROM 3
```
Do you have any idea what may be causing this?
I don't see anything obviously wrong in the code. Could you share a test case (maybe a gist of the repo you're testing)? I like to try with both Open MPI and MPICH (with error checking configured) because they do different input validation, and sometimes the difference in behavior helps narrow down code issues. (I'm not sure there is a code issue, but if your MPI isn't broken with other apps, then something is going wrong.)
~~The second error seems to be something to do with sending packets of size 0, and attempting to insert into buffers initialized with 0 length.~~ Actually, it's still appearing, so I'm not sure now.

The first error I'm still not sure about; it seems to only happen a minority of the time. The code is here

Playing with this example more, it seems to only occur on my Arm M1 Mac (running OpenMPI 4.0; will try MPICH now). The code runs fine on Ubuntu 20.04 with the same MPI version, with no warnings or errors at all.
You may want to take a look at the workarounds discussed at https://github.com/open-mpi/ompi/issues/8531
I tried the tmpdir workaround but I'm getting the same error.
Is this issue believed to be an rsmpi issue or can it be closed?
I am seeing something similar, but it could just be that I am an MPI novice. The messages I am trying to send are small (24 bytes), I am using rust v1.7.3, and I'm trying to execute on an Intel-based MBP. I have OpenMPI version 5.0.2 (could this be my issue?). I also set the temp dir to ~/tmp. I put more details in this discussion: https://github.com/rsmpi/rsmpi/discussions/181#discussioncomment-8946494, but here is the code that is failing:
```rust
use bincode;
#[allow(unused_imports)]
use crdts::{CmRDT, Dot, GCounter};
use mpi::request::WaitGuard;
use mpi::traits::*;
use rand::{thread_rng, Rng};
use std::{thread, time};

fn main() {
    #[allow(unused_variables)]
    // number of fake jobs to process (i.e: count up to this number)
    const NJOBS: u16 = 2000;
    // Represents the worst time a job can take to finish
    let dur = time::Duration::from_millis(25);
    // Represents the probability of performing a large job:
    const P: f64 = 0.015; // 1.5% chance of a long running job (i.e: 25 milliseconds)
    let mut rng = thread_rng();

    // Init MPI
    let universe = mpi::initialize().unwrap();
    let world = universe.world();
    // Get the rank and size of the communicator
    let size = world.size();
    let rank = world.rank();
    // Define the processor that sends to this rank and that receives from this rank:
    let prev_rank = (rank + size - 1) % size;
    let next_rank = (rank + 1) % size;

    // Init local replica of the count
    let mut local_counter = GCounter::new();
    println!(
        "Prev rank: {}, This rank: {}, Next rank: {}",
        prev_rank, rank, next_rank
    );

    for _n in 0..NJOBS {
        // simulate a long or short process between incrementing the count:
        if rng.gen_bool(P) {
            thread::sleep(dur);
            //println!("Running a long job on processor {}", rank);
        }
        // Declare intent to increment by creating an `Op`. Use rank as the uid of the replica
        let inc_op = local_counter.inc(rank);
        // Apply the operation to our local counter first and then send the `Op` off to others
        local_counter.apply(inc_op.clone()); // Apply the increment:
        // Serialize the local operation for sending:
        let send_op: Vec<u8> = bincode::serialize(&inc_op).unwrap();
        // For getting the most up-to-date count of the remote Dot
        let mut remote_op: Vec<u8> = Vec::new();
        // Simple point-to-point messaging in a ring configuration:
        mpi::request::scope(|scope| {
            // Receive the operation to increment the counter from the previous rank:
            let _rreq = WaitGuard::from(
                world
                    .process_at_rank(prev_rank)
                    .immediate_receive_into(scope, &mut remote_op),
            );
            // Send the change that was applied by the local counter:
            let _sreq = WaitGuard::from(
                world
                    .process_at_rank(next_rank)
                    .immediate_ready_send(scope, &send_op),
            );
        });
        // Deserialize the remote_op:
        let recv_op: Dot<mpi::Rank> = bincode::deserialize(&remote_op).unwrap();
        // Apply the change to the local counter:
        local_counter.apply(recv_op);
    }

    println!(
        "Processor {} says the count = {:?}",
        rank,
        local_counter.read()
    );
}
```
The error is:
```
❯ mpirun -np 4 --tmpdir ~/tmp target/debug/MPI_Example
Prev rank: 1, This rank: 2, Next rank: 3
Prev rank: 3, This rank: 0, Next rank: 1
Prev rank: 0, This rank: 1, Next rank: 2
Prev rank: 2, This rank: 3, Next rank: 0
[MacBook-Pro:00000] *** An error occurred in MPI_Wait
[MacBook-Pro:00000] *** reported by process [783941633,2]
[MacBook-Pro:00000] *** on communicator MPI_COMM_WORLD
[MacBook-Pro:00000] *** MPI_ERR_TRUNCATE: message truncated
[MacBook-Pro:00000] *** MPI_ERRORS_ARE_FATAL (processes in this communicator will now abort,
[MacBook-Pro:00000] *** and MPI will try to terminate your MPI job as well)
```
In your example, the receive buffer `remote_op` needs to actually have the size of the incoming message. In this case, that looks like `vec![0; 12]` instead of `Vec::new()`. Also, you can't use `*ready_send` unless you can guarantee that the receiver has already posted a matching receive. That's a race condition here, so the present code is noncompliant. I think it's okay once you fix those two things.
See also #182 -- I'm inclined to make ready send unsafe in the next release.
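Concretely, a minimal sketch of those two changes inside your loop (I've used 24 bytes below since that's the message size you mentioned; size the buffer to whatever `bincode::serialize` actually produces):

```rust
// The receive buffer must already have a sufficient length;
// MPI_ERR_TRUNCATE means the incoming message was larger than the buffer.
let mut remote_op = vec![0u8; 24];

mpi::request::scope(|scope| {
    let _rreq = WaitGuard::from(
        world
            .process_at_rank(prev_rank)
            .immediate_receive_into(scope, &mut remote_op),
    );
    // Standard-mode send: unlike ready mode, it does not require that the
    // matching receive has already been posted on the destination rank.
    let _sreq = WaitGuard::from(
        world
            .process_at_rank(next_rank)
            .immediate_send(scope, &send_op),
    );
});
```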
Thank you for the feedback @jedbrown; I am still getting familiar with MPI in general, so I appreciate it. I was trying to follow the code example provided in `examples/immediate.rs`.
Yeah, that's what I figured so I added this comment. (I'll merge this with some more housekeeping bits, which will probably include making ready-mode unsafe.)
https://github.com/rsmpi/rsmpi/commit/1e5eeabc7f2b7f55976d3bc8df0867a229d03425#diff-08ccf39fba6d59419fdacc6fa9fb0003071407cc2792c0a8811369aea0fc57d1R34-R36
@jedbrown, I am a bit confused by some of the examples. Is there a pattern you would recommend? From my example, I am simply trying to receive from the previous rank and store what is received into `remote_op`, and send `&send_op` to `next_rank`. Best, Dave
Just replace `immediate_ready_send` with `immediate_send`.
Oh, I am so sorry. I had done that and was trying to run a previous build by accident. Totally my fault; the code runs now!!! Thank you @jedbrown, I am going to build off of this!
Here is the final example that compiles and runs:
```rust
use bincode;
#[allow(unused_imports)]
use crdts::{CmRDT, Dot, GCounter};
use mpi::request::WaitGuard;
use mpi::traits::*;
use rand::{thread_rng, Rng};
use std::{sync::WaitTimeoutResult, thread, time};

fn main() {
    #[allow(unused_variables)]
    // number of fake jobs to process (i.e: count up to this number)
    const NJOBS: u16 = 2000;
    // Represents the worst time a job can take to finish
    let dur = time::Duration::from_millis(25);
    // Represents the probability of performing a large job:
    const P: f64 = 0.015; // 1.5% chance of a long running job (i.e: 25 milliseconds)
    let mut rng = thread_rng();

    // Init MPI
    let universe = mpi::initialize().unwrap();
    let world = universe.world();
    // Get the rank and size of the communicator
    let size = world.size();
    let rank = world.rank();
    // Define the processor that sends to this rank and that receives from this rank:
    let prev_rank = (rank + size - 1) % size;
    let next_rank = (rank + 1) % size;

    // Init local replica of the count
    let mut local_counter = GCounter::new();
    println!(
        "Prev rank: {}, This rank: {}, Next rank: {}",
        prev_rank, rank, next_rank
    );

    for _n in 0..NJOBS {
        // simulate a long or short process between incrementing the count:
        if rng.gen_bool(P) {
            thread::sleep(dur);
            // println!("Running a long job on processor {}", rank);
        }
        // Declare intent to increment by creating an `Op`. Use rank as the uid of the replica
        let inc_op = local_counter.inc(rank);
        // Apply the operation to our local counter first and then send the `Op` off to others
        local_counter.apply(inc_op.clone()); // Apply the increment:
        // Serialize the local operation for sending:
        let send_op: Vec<u8> = bincode::serialize(&inc_op).unwrap();
        // For getting the most up-to-date count of the remote Dot
        let mut remote_op = vec![0u8; 24]; // the buffer must provide a size
        // Simple point-to-point messaging in a ring configuration:
        mpi::request::scope(|scope| {
            let _rreq = WaitGuard::from(
                world
                    .process_at_rank(prev_rank)
                    .immediate_receive_into(scope, &mut remote_op),
            );
            let _sreq = WaitGuard::from(
                world
                    .process_at_rank(next_rank)
                    .immediate_send(scope, &send_op),
            );
        });
        // Deserialize the remote_op:
        let recv_op: Dot<mpi::Rank> = bincode::deserialize(&remote_op).unwrap();
        // Apply the change to the local counter:
        local_counter.apply(recv_op);
    }

    println!(
        "Processor {} says the count = {:?}",
        rank,
        local_counter.read()
    );
}
```