rsmpi icon indicating copy to clipboard operation
rsmpi copied to clipboard

Immediate send/recvs hang when the packets being sent are large

Open skailasa opened this issue 3 years ago • 16 comments

Say I have 4 MPI processes labelled: P0, P1, P2, P3. Each process potentially has packets to send to other processes, but may not.

I.e. P0 needs to send packets to P1 and P2, or

    P0->[P1, P2]

Similarly,

    P1->[P3] 
    P2 ->[]
    P3 -> [P1]

So P1 has to receive potential packets from both P0 and P3, and P3 has to receive packets from P1, and P2 from P0.

After doing an all-reduce to calculate the receive counts at each process, I'm sending the packets as follows:

    for (i, packet) in packets.iter().enumerate() {
        let partner_process = world.process_at_rank(packet_destinations[i]);
        mpi::request::scope(|scope| {
            let _sreq = WaitGuard::from(partner_process.immediate_send(scope, &packet[..]));
        });
    }

    for (i, &recv_rank) in received_packet_sources.iter().enumerate() {
        let partner_process = world.process_at_rank(recv_rank);
        mpi::request::scope(|scope| {
            let _rreq = WaitGuard::from(partner_process.immediate_receive_into(scope, &mut buffers[i][..]));
        }); 
    }

Where the sends and receives will in general be from different processes for each given process. This code works ok when the packets are quite small (<100 elements) but it hangs for larger packet sizes.

C Code for the above would be ok, there would be no wait guard, but you'd add a wait all at the end of the two for loops. How do I replicate this in Rust? Would appreciate any pointers.

skailasa avatar Jan 06 '23 16:01 skailasa

Can you look at immediate_multiple_requests.rs to see if it answers your question about scope? The issue is that your messages exceed the MPI implementation's "eager threshold" (usually an environment-tunable parameter, but your code shouldn't rely on it) and thus sends can't complete until receives are posted, but as you've written it, you can't get around to posting receives until your rank has waited on its sends.

jedbrown avatar Jan 06 '23 17:01 jedbrown

I've tried the following, and it appears to work

  mpi::request::multiple_scope(nreqs as usize, |scope, coll| {

        for (i, packet) in packets.iter().enumerate() {
            let sreq = world
                .process_at_rank(packet_destinations[i])
                .immediate_send(scope, &packet[..]);
                coll.add(sreq);
        }

        for (i, buffer) in buffers.iter_mut().enumerate() {
            let rreq = world
                .process_at_rank(received_packet_sources[i])
                .immediate_receive_into(scope, &mut buffer[..]);
            coll.add(rreq);
        }
        let mut out = vec![];
        coll.wait_all(&mut out);
        assert_eq!(out.len(), nreqs as usize);
    });

thank you for your suggestion, there's an unrelated issue though, which I think is coming from my MPI implentation (OpenMPI 4.1.4). When I run this code with the max number of processors on my machine, I get a weird system call error, but only some of the time, which may be related to this code.

  System call: unlink(2) /var/folders/6g/xsc42sxx5lvdkl0z4_tr8_kh0000gn/T//ompi.eduroam-int-dhcp-97-1-127.501/pid.78254/1/vader_segment.eduroam-int-dhcp-97-1-127.501.77210001.3

Occasionally, the code also fails with the previous setup with the error,

[eduroam-int-dhcp-97-1-127:78465] *** An error occurred in MPI_Wait
[eduroam-int-dhcp-97-1-127:78465] *** reported by process [1947140097,0]
[eduroam-int-dhcp-97-1-127:78465] *** on communicator MPI COMMUNICATOR 4 DUP FROM 3

Do you have any idea what may be causing this?

skailasa avatar Jan 09 '23 12:01 skailasa

I don't see anything obviously wrong in the code. Could you share a test case (maybe a gist of the repo you're testing)? I like to try with both Open MPI and MPICH (with error checking configured) because they do different input validation and sometimes the difference in behavior help narrow down code issues. (I'm not sure there is a code issue, but if your MPI isn't broken with other apps, then something is going wrong.)

jedbrown avatar Jan 09 '23 13:01 jedbrown

~~The second error seems to be something to do with sending packets of size 0, and attempting to insert into buffers initialized with 0 length.~~ Actually, still appearing, not sure now.

The first error I'm still not sure though, and seems to only happen a minority of the time. The code is here

The code I'm testing is here, and is run using this example

skailasa avatar Jan 09 '23 14:01 skailasa

Playing with this example more, it seems to only occur on my Arm M1 Mac (running openmpi 4.0, will try MPICH now), the code runs fine on Ubuntu 20.04 with same MPI version, with no warnings or errors at all.

skailasa avatar Jan 10 '23 12:01 skailasa

You may want to take a look at the workarounds discussed at https://github.com/open-mpi/ompi/issues/8531

hppritcha avatar Jan 12 '23 21:01 hppritcha

You may want to take a look at the workarounds discussed at open-mpi/ompi#8531

I tried the tmpdir workaround but I'm getting the same error.

skailasa avatar Jan 17 '23 10:01 skailasa

Is this issue believed to be an rsmpi issue or can it be closed?

jedbrown avatar Oct 20 '23 22:10 jedbrown

I am seeing something similar, but it could just be that I am a MPI novice. The messages I am trying to send are small (24 bytes), I am using rustv1.7.3 and trying to execute on an intel-based MBP. I have OpenMPI version 5.0.2 (Could this be my issue?). I also set the temp dir to ~/tmp. I put more details in this discussion: https://github.com/rsmpi/rsmpi/discussions/181#discussioncomment-8946494 , but here is the code that is failing:

use bincode;
#[allow(unused_imports)]
use crdts::{CmRDT, Dot, GCounter};
use mpi::request::WaitGuard;
use mpi::traits::*;
use rand::{thread_rng, Rng};
use std::{thread, time};

fn main() {
    #[allow(unused_variables)]
    // number of fake jobs to process (i.e: count up to this number)
    const NJOBS: u16 = 2000;

    // Represents the worst time job can take to finish
    let dur = time::Duration::from_millis(25);

    // Represents the probability of performing a large job:
    const P: f64 = 0.015; // 15% chance of a long running job (i.e: 2.5 seconds)
    let mut rng = thread_rng();

    // Init MPI
    let universe = mpi::initialize().unwrap();
    let world = universe.world();

    // Get the rank and size of the communicator
    let size = world.size();
    let rank = world.rank();

    // Define the processor that sends to this rank and  that receives from this rank:
    let prev_rank = (rank + size - 1) % size;
    let next_rank = (rank + 1) % size;

    // Init local replica of the count
    let mut local_counter = GCounter::new();

    println!(
        "Prev rank: {}, This rank: {}, Next rank: {}",
        prev_rank, rank, next_rank
    );
    for _n in 0..NJOBS {
        // simulate a long or short process between incrementing the count:
        if rng.gen_bool(P) {
             thread::sleep(dur);
             //println!("Running a long job on processor {}", rank);
        }

        // Declare intent to increment by creating an `Op`. Use rank as the uid of the replica
        let inc_op = local_counter.inc(rank);

        // Apply the operation to our local counter first and then send the `Op` off to others
        local_counter.apply(inc_op.clone()); // Apply the increment:

        // Serialize the local operation for sending:
        let send_op: Vec<u8> = bincode::serialize(&inc_op).unwrap();

        // For getting the most up-to-date count of the remote Dot
        let mut remote_op: Vec<u8> = Vec::new();

        // Simple point-to-point messaging in a ring configuration:
        mpi::request::scope(|scope| {
            // Receive the operation to increment the counter from the previous rank:
            let _rreq = WaitGuard::from(
                world
                    .process_at_rank(prev_rank)
                    .immediate_receive_into(scope, &mut remote_op),
            );

            // Send the change that was applied by the local counter:
            let _sreq = WaitGuard::from(
                world
                    .process_at_rank(next_rank)
                    .immediate_ready_send(scope, &send_op),
            );
        });

        // Deserialize the remote_op:
        let recv_op: Dot<mpi::Rank> = bincode::deserialize(&remote_op).unwrap();

        // Apply the change to the local counter:
        local_counter.apply(recv_op);
    }
    println!(
        "Processor {} says the count = {:?}",
        rank,
        local_counter.read()
    );
}

The error is:

❯ mpirun -np 4 --tmpdir ~/tmp  target/debug/MPI_Example
Prev rank: 1, This rank: 2, Next rank: 3
Prev rank: 3, This rank: 0, Next rank: 1
Prev rank: 0, This rank: 1, Next rank: 2
Prev rank: 2, This rank: 3, Next rank: 0
[MacBook-Pro:00000] *** An error occurred in MPI_Wait
[MacBook-Pro:00000] *** reported by process [783941633,2]
[MacBook-Pro:00000] *** on communicator MPI_COMM_WORLD
[MacBook-Pro:00000] *** MPI_ERR_TRUNCATE: message truncated
[MacBook-Pro:00000] *** MPI_ERRORS_ARE_FATAL (processes in this communicator will now abort,
[MacBook-Pro:00000] ***    and MPI will try to terminate your MPI job as well)

DaveLanday avatar Apr 12 '24 19:04 DaveLanday

In your example, the receive remote_op needs to actually have the size. In this case, that looks like vec![0; 12] instead of Vec::new(). Also, you can't use *ready_send unless you can guarantee that the receiver has already posted a matching receive. That's a race condition here, so the present code is noncompliant. I think it's okay once you fix those two things.

jedbrown avatar Apr 14 '24 06:04 jedbrown

See also #182 -- I'm inclined to make ready send unsafe in the next release.

jedbrown avatar Apr 15 '24 02:04 jedbrown

In your example, the receive remote_op needs to actually have the size. In this case, that looks like vec![0; 12] instead of Vec::new(). Also, you can't use *ready_send unless you can guarantee that the receiver has already posted a matching receive. That's a race condition here, so the present code is noncompliant. I think it's okay once you fix those two things.

Thank you for the feedback @jedbrown. I am still getting familiar with MPI in general, so I appreciate the feedback. I was trying to follow the code example provided in examples/immediate.rs.

DaveLanday avatar Apr 15 '24 15:04 DaveLanday

Yeah, that's what I figured so I added this comment. (I'll merge this with some more housekeeping bits, which will probably include making ready-mode unsafe.) https://github.com/rsmpi/rsmpi/commit/1e5eeabc7f2b7f55976d3bc8df0867a229d03425#diff-08ccf39fba6d59419fdacc6fa9fb0003071407cc2792c0a8811369aea0fc57d1R34-R36

jedbrown avatar Apr 15 '24 15:04 jedbrown

@jedbrown , I am a bit confused by some of the examples. Is there a pattern you would recommend? From my example I am simply trying to receive from the previous rank and store what is received into remote_op, and send &send_op to next_rank. Best, Dave

DaveLanday avatar Apr 15 '24 18:04 DaveLanday

Just replace immediate_ready_send with immediate_send.

jedbrown avatar Apr 15 '24 19:04 jedbrown

Oh, I am so sorry. I had done that and was trying to run a previous build on accident. Totally my fault; the code runs now!!! Thank you @jedbrown, I am going to build off of this!

Here is the final example that compiles and runs:

use bincode;
#[allow(unused_imports)]
use crdts::{CmRDT, Dot, GCounter};
use mpi::request::WaitGuard;
use mpi::traits::*;
use rand::{thread_rng, Rng};
use std::{sync::WaitTimeoutResult, thread, time};

fn main() {
    #[allow(unused_variables)]
    // number of fake jobs to process (i.e: count up to this number)
    const NJOBS: u16 = 2000;

    // Represents the worst time job can take to finish
    let dur = time::Duration::from_millis(25);

    // Represents the probability of performing a large job:
    const P: f64 = 0.015; // 1.5% chance of a long running job (i.e: 25 milliseconds)
    let mut rng = thread_rng();

    // Init MPI
    let universe = mpi::initialize().unwrap();
    let world = universe.world();

    // Get the rank and size of the communicator
    let size = world.size();
    let rank = world.rank();

    // Define the processor that sends to this rank and  that receives from this rank:
    let prev_rank = (rank + size - 1) % size;
    let next_rank = (rank + 1) % size;

    // Init local replica of the count
    let mut local_counter = GCounter::new();

    println!(
        "Prev rank: {}, This rank: {}, Next rank: {}",
        prev_rank, rank, next_rank
    );
    for _n in 0..NJOBS {
        // simulate a long or short process between incrementing the count:
        if rng.gen_bool(P) {
             thread::sleep(dur);
             // println!("Running a long job on processor {}", rank);
        }

        // Declare intent to increment by creating an `Op`. Use rank as the uid of the replica
        let inc_op = local_counter.inc(rank);

        // Apply the operation to our local counter first and then send the `Op` off to others
        local_counter.apply(inc_op.clone()); // Apply the increment:

        // Serialize the local operation for sending:
        let send_op: Vec<u8> = bincode::serialize(&inc_op).unwrap();

        // For getting the most up-to-date count of the remote Dot
        let mut remote_op = vec![0u8; 24]; // the buffer must provide a size

        // Simple point-to-point messaging in a ring configuration:
        mpi::request::scope(|scope| {
            let _rreq = WaitGuard::from(
                world
                    .process_at_rank(prev_rank)
                    .immediate_receive_into(scope, &mut remote_op),
            );
            let _sreq = WaitGuard::from(
                world
                    .process_at_rank(next_rank)
                    .immediate_send(scope, &send_op),
            );
        });

        // Deserialize the remote_op:
        let recv_op: Dot<mpi::Rank> = bincode::deserialize(&remote_op).unwrap();

        // Apply the change to the local counter:
        local_counter.apply(recv_op);
    }
    println!(
        "Processor {} says the count = {:?}",
        rank,
        local_counter.read()
    );
}

DaveLanday avatar Apr 15 '24 19:04 DaveLanday