MPIX Stream Restrictions
Hi,
I would like to clarify what kinds of operations are allowed on MPIX Streams, or to get pointers to the relevant documentation. I have some code that continues to hang even when running on 1 thread, and it's unclear why. I suspect I'm missing something and operating on MPIX Streams in ways that are unintended.
What sorts of restrictions do MPIX Streams have? For example, even on 1 thread, can I post multiple MPIX Stream isend/irecvs on the same stream index? Concretely, can I launch multiple async isends on the same source stream index 0 (with potentially different destination stream indices, or the same destination stream index but different destination ranks), or launch multiple async irecvs on the same destination stream index 0? Do I need to drive progress on both the sender and receiver side, or is calling MPI_Wait on only the receiver side sufficient to ensure progress?
I understand that multiple threads cannot access the same stream index in parallel, but what other restrictions are there? The code I have hangs when using MPIX Streams, even with 1 thread per process, whereas regular MPI isend/irecvs work just fine.
The structure of my code looks like:
for (i = 0; i < 4; i++) {
    MPI_Wait on requests for iter i (no waiting for iter 0)
    MPIX_Stream_isend for iters i + 1, ..., 3
    launch MPIX_Stream_irecv for iter i + 1
}
This happens with MPICH 4.3.0 using the OFI netmod. When using UCX, it reports that too many file descriptors are in use. I am running my application on 64 MPI processes across 8 machines, so 8 processes per node. Each process reserves ~20 VCIs to use for ~20 MPIX Streams.
Thanks
I understand that multiple threads cannot access the same stream index in parallel,
That is the only restriction on using MPIX Streams. However, you also need to understand how MPI makes progress, because some of these deadlocks are due to lack of progress.
When you call MPI_Wait(req) and req is on stream i, it only progresses stream i. So if the remote process that req is waiting on depends on another message, from an Isend or Irecv that you issued on a different stream, say j, then you may have a deadlock because stream j is not making progress.
If you have a short reproducer to upload, we can take a look.
Does that mean you can post multiple asynchronous operations on the same stream, like multiple sends or multiple receives, before the prior calls finish? If so, how does MPI know which communication needs to be progressed when I call MPIX_Stream_progress(stream_idx)?
Also, if I call MPI_Waitany on an array of requests that correspond to communications on different streams, what happens in that case?
MPI_Wait progresses the stream that the request belongs to. If you wait(all/any/some) on multiple requests that cover multiple streams, MPI progresses all of those streams.
#include <iostream>
#include <fstream>
#include <string>
#include <vector>
#include <sstream>
#include <mpi.h>
#include <cassert>
#include <utility>
#include <map>

constexpr static int NUM_ELEMENTS = 512;
constexpr static int NUM_LEVELS = 4;
constexpr static int NUM_STREAMS = 8;
constexpr bool USE_STREAMS = true;

MPI_Comm stream_comm;
std::vector<double> send_messages;
std::vector<double> recv_messages;

void parse_neighbor_file(std::string filename, std::vector<std::vector<int>>& send_neighbors, std::vector<std::vector<int>>& recv_neighbors) {
    std::ifstream file(filename);
    if (!file.is_open()) {
        std::cerr << "Could not open the file!" << std::endl;
        return;
    }
    std::string line;
    while (std::getline(file, line)) {
        std::stringstream ss(line);
        std::string cell;
        std::vector<int> row;
        while (std::getline(ss, cell, ',')) {
            row.push_back(std::stoi(cell));
        }
        row.pop_back();
        if (row.size() > 0) {
            int elem_number = row[0];
            for (int i = 1; i < row.size() - 1; i++) {
                send_neighbors[elem_number].push_back(row[i]);
                recv_neighbors[row[i]].push_back(elem_number);
            }
        }
    }
    file.close();
}

void init_elements_at_level(std::vector<int>* elements_at_level) {
    for (int i = 0; i < 64; i++) {
        elements_at_level[0].push_back(i);
    }
    for (int i = 64; i < 256; i++) {
        elements_at_level[1].push_back(i);
    }
    for (int i = 256; i < 448; i++) {
        elements_at_level[2].push_back(i);
    }
    for (int i = 448; i < 512; i++) {
        elements_at_level[3].push_back(i);
    }
}

int get_mpi_tag(int dst, int src) {
    return (dst << 14) | src;
}

void async_receive(int world_size, int rank, std::vector<MPI_Request>& recv_requests, std::map<std::pair<int, int>, int> elem_pair_to_recv_idx, const std::vector<int>& elem_to_stream_mapping) {
    recv_requests.resize(elem_pair_to_recv_idx.size());
    for (auto& [elem_pair, recv_idx] : elem_pair_to_recv_idx) {
        int src = elem_pair.first;
        int dst = elem_pair.second;
        // int send_stream_idx = 0;
        // int recv_stream_idx = 0;
        int send_stream_idx = elem_to_stream_mapping[src];
        int recv_stream_idx = elem_to_stream_mapping[dst];
        int mpi_tag = get_mpi_tag(dst, src);
        if (USE_STREAMS) {
            MPIX_Stream_irecv(&recv_messages[recv_idx], 10000, MPI_DOUBLE,
                              src % world_size, mpi_tag,
                              stream_comm, send_stream_idx, recv_stream_idx,
                              &recv_requests[recv_idx]);
        } else {
            MPI_Irecv(&recv_messages[recv_idx], 1, MPI_DOUBLE,
                      src % world_size, mpi_tag,
                      MPI_COMM_WORLD, &recv_requests[recv_idx]);
        }
    }
}

void main_loop(int world_size, int rank, std::vector<int>* elements_at_level,
               const std::vector<std::vector<int>>& send_neighbors,
               const std::vector<std::vector<int>>& recv_neighbors,
               std::map<std::pair<int, int>, int>* elem_pair_to_recv_idx,
               const std::vector<int>& elem_to_stream_mapping) {
    std::vector<std::vector<MPI_Request>> send_requests(NUM_ELEMENTS);
    std::vector<MPI_Request> recv_requests[NUM_LEVELS];
    int send_idx = 0;
    for (int level = 0; level < NUM_LEVELS; level++) {
        // Call async receives for elements in level + 1
        if (level < NUM_LEVELS - 1) {
            async_receive(world_size, rank, recv_requests[level + 1], elem_pair_to_recv_idx[level + 1], elem_to_stream_mapping);
        }
        int num_wait = 0;
        while (num_wait < elem_pair_to_recv_idx[level].size()) {
            int idx;
            MPI_Waitany(recv_requests[level].size(), recv_requests[level].data(), &idx, MPI_STATUS_IGNORE);
            num_wait++;
        }
        for (int j = 0; j < elements_at_level[level].size(); j++) {
            int elem = elements_at_level[level][j];
            if (elem % world_size != rank) {
                continue;
            }
            // Send data from elem to neighbors
            for (int neigh : send_neighbors[elem]) {
                if (neigh % world_size == rank) {
                    continue;
                }
                int mpi_tag = get_mpi_tag(neigh, elem);
                // int send_stream_idx = 0;
                // int recv_stream_idx = 0;
                int send_stream_idx = elem_to_stream_mapping[elem];
                int recv_stream_idx = elem_to_stream_mapping[neigh];
                send_requests[elem].emplace_back();
                if (USE_STREAMS) {
                    MPIX_Stream_isend(&send_messages[send_idx++], 10000, MPI_DOUBLE,
                                      neigh % world_size, mpi_tag,
                                      stream_comm, send_stream_idx, recv_stream_idx,
                                      &send_requests[elem][send_requests[elem].size() - 1]);
                } else {
                    MPI_Isend(&send_messages[send_idx++], 1, MPI_DOUBLE,
                              neigh % world_size, mpi_tag,
                              MPI_COMM_WORLD, &send_requests[elem][send_requests[elem].size() - 1]);
                }
            }
        }
    }
    for (int level = 0; level < NUM_LEVELS; level++) {
        for (int j = 0; j < elements_at_level[level].size(); j++) {
            int elem = elements_at_level[level][j];
            if (elem % world_size != rank) {
                continue;
            }
            MPI_Waitall(send_requests[elem].size(), send_requests[elem].data(), MPI_STATUSES_IGNORE);
        }
    }
}

int main(int argc, char **argv) {
    int provided;
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
    assert(provided >= MPI_THREAD_MULTIPLE);
    std::string filename = "neigh.txt";
    std::vector<std::vector<int>> send_neighbors(NUM_ELEMENTS);
    std::vector<std::vector<int>> recv_neighbors(NUM_ELEMENTS);
    parse_neighbor_file(filename, send_neighbors, recv_neighbors);
    std::vector<int> elements_at_level[NUM_LEVELS];
    init_elements_at_level(elements_at_level);
    MPIX_Stream all_streams[NUM_STREAMS];
    for (int i = 0; i < NUM_STREAMS; i++) {
        MPIX_Stream_create(MPI_INFO_NULL, &all_streams[i]);
    }
    auto res = MPIX_Stream_comm_create_multiplex(MPI_COMM_WORLD, NUM_STREAMS, all_streams, &stream_comm);
    assert(res == MPI_SUCCESS);
    int world_size;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);
    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    send_messages.resize(100000);
    recv_messages.resize(100000);
    std::map<std::pair<int, int>, int> elem_pair_to_recv_idx[NUM_LEVELS];
    // std::map<int, std::pair<int, int>> recv_idx_to_zoid_pair[NUM_LEVELS];
    std::vector<MPI_Request> recv_requests[NUM_LEVELS];
    for (int level = 0; level < NUM_LEVELS; level++) {
        int recv_idx = 0;
        for (int j = 0; j < elements_at_level[level].size(); j++) {
            int elem = elements_at_level[level][j];
            if (elem % world_size != rank) {
                continue;
            }
            auto& neighbors = recv_neighbors[elem];
            for (int neigh : neighbors) {
                if (neigh % world_size == rank) {
                    continue;
                }
                // recv_idx_to_elem_pair[level][recv_idx] = {neigh, elem};
                elem_pair_to_recv_idx[level][{neigh, elem}] = recv_idx;
                recv_idx++;
            }
        }
        recv_requests[level].resize(recv_idx, MPI_REQUEST_NULL);
    }
    std::vector<int> elem_to_stream_mapping(NUM_ELEMENTS, 0);
    for (int level = 0; level < NUM_LEVELS; level++) {
        int stream_idx = 0;
        for (int j = 0; j < elements_at_level[level].size(); j++) {
            int elem = elements_at_level[level][j];
            if (elem % world_size != rank) {
                continue;
            }
            elem_to_stream_mapping[elem] = stream_idx++;
        }
    }
    MPI_Allreduce(MPI_IN_PLACE, elem_to_stream_mapping.data(), NUM_ELEMENTS, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
    main_loop(world_size, rank, elements_at_level, send_neighbors, recv_neighbors, elem_pair_to_recv_idx, elem_to_stream_mapping);
    MPI_Barrier(MPI_COMM_WORLD);
    std::cout << "rank: " << rank << " done." << std::endl;
    MPI_Comm_free(&stream_comm);
    for (int i = 0; i < NUM_STREAMS; i++) {
        MPIX_Stream_free(&all_streams[i]);
    }
    MPI_Finalize();
    return 0;
}
Above is the code snippet and I attached the file neigh.txt to define the sender/receivers for each element.
Now, on 1 worker, I still get the error I mentioned above, namely:
Assertion failed in file ./src/mpid/ch4/shm/src/../posix/posix_progress.h at line 71: MPIDI_POSIX_global.per_vci[vci].active_rreq[transaction.src_local_rank] == NULL
I assume this is because I am sending/receiving asynchronously on the same stream multiple times before prior sends/receives finish. If I change the sends to transfer just 1 element instead of 10000 elements, the code actually works just fine.
The command to compile is:
mpicxx -O3 test_mpi.cpp
and the command to run is:
mpirun -np 32 -env MPIR_CVAR_CH4_RESERVE_VCIS 8 -env MPIR_CVAR_ASYNC_PROGRESS 1 -env MPIR_CVAR_CH4_MT_MODEL lockless -env MPIR_CVAR_CH4_GLOBAL_PROGRESS 0 ./a.out
So I changed the code so that the behavior above doesn't occur, by slightly modifying how stream indices are assigned to elements: I changed how elem_to_stream_mapping is constructed. With this change, the code just hangs.
The code is identical to the snippet above except for how elem_to_stream_mapping is constructed in main: previously stream_idx restarted at 0 for each level and incremented without wrapping; now it advances round-robin modulo NUM_STREAMS across all levels:
    std::vector<int> elem_to_stream_mapping(NUM_ELEMENTS, 0);
    int stream_idx = 0;
    for (int level = 0; level < NUM_LEVELS; level++) {
        for (int j = 0; j < elements_at_level[level].size(); j++) {
            int elem = elements_at_level[level][j];
            if (elem % world_size != rank) {
                continue;
            }
            elem_to_stream_mapping[elem] = stream_idx;
            stream_idx = (stream_idx + 1) % NUM_STREAMS;
        }
    }
If the code deadlocks here, why wouldn't it deadlock when not using streams?
Assigning a distinct stream index per element also does not help with the deadlock/hanging.
I also tried assigning each elem pair that communicates with each other to a unique stream index to avoid any sort of conflicts, and that seems to hang as well.
Do you somehow need to progress the stream on both the sender and receiver ends? Currently the code only progresses streams on the receiver's end, through MPI_Waitany.