Unexpectedly long P2P communication time with the MPI backend
My Code:
"""run.py:"""
#!/usr/bin/env python
import os
import sys
import time

import torch
import torch.distributed as dist
from torch.multiprocessing import Process


def run(rank, size):
    """Blocking point-to-point communication."""
    # tensor = torch.rand(50, 100)
    tensor = torch.rand(50, 80)
    processnumber = 64
    resultlist = [[0 for i in range(2)] for j in range(64)]
    tensorsize = sys.getsizeof(tensor.storage())  # approximate payload size in bytes
    traintimes = 3
    for dsetination in range(1, processnumber):
        avsendtime = 0
        avrectime = 0
        for j in range(traintimes):
            if dist.get_rank() == 0:
                # Time the blocking send from rank 0 to rank `dsetination`.
                beginsend = time.time()
                dist.send(tensor=tensor, dst=dsetination)
                endsend = time.time()
                # if dsetination == processnumber - 1:
                #     print("sendtimes is:", j, "spend time is:", endsend - beginsend,
                #           "begin time is:", beginsend, "end time is:", endsend)
                if j != 0:  # skip the first (warm-up) iteration
                    avsendtime += endsend - beginsend
            elif dist.get_rank() == dsetination:
                # Time the blocking receive of the tensor from rank 0.
                beginrec = time.time()
                dist.recv(tensor=tensor, src=0)
                endrec = time.time()
                if j != 0:  # skip the first (warm-up) iteration
                    avrectime += endrec - beginrec
                if dist.get_rank() == processnumber - 1:
                    print("receivetimes is:", j, "spend time is:", endrec - beginrec,
                          "beginre time is:", beginrec, "endre time is:", endrec)
        if dist.get_rank() == 0:
            resultlist[dsetination][0] = avsendtime / (traintimes - 1)
            rtime = avsendtime / (traintimes - 1)
            print("send to:", dsetination, "time is:", rtime,
                  "speed is:", (tensorsize / rtime) / (2 ** 30), "GB/s")
        if dist.get_rank() == dsetination:
            resultlist[dsetination][1] = avrectime / (traintimes - 1)
            rtime = avrectime / (traintimes - 1)
            print("rank is:", dsetination, "rectime is:", rtime,
                  "speed is:", (tensorsize / rtime) / (2 ** 30), "GB/s")
    # print(resultlist)
    # print('Rank ', rank, ' has data ', tensor)
    # mylist = tensor.numpy().tolist()
    # print('Rank ', dist.get_rank(), ' has data ', mylist)


# """ All-Reduce example."""
# def run(rank, size):
#     """ Simple point-to-point communication. """
#     group = dist.new_group([0, 1])
#     tensor = torch.ones(1)
#     dist.all_reduce(tensor, op=dist.reduce_op.SUM, group=group)
#     print('Rank ', rank, ' has data ', tensor[0])


def init_process(rank, size, fn, backend='mpi'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)


if __name__ == "__main__":
    # With the MPI backend, the real rank and world size come from the MPI
    # launcher, so the values passed here are only placeholders.
    init_process(0, 0, run, backend='mpi')
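For reference, the script has to be started through an MPI launcher so that the backend assigns the real rank and world size (the rank/world_size passed to init_process above are placeholders). A typical command for 64 processes looks like the one below, though the exact flags depend on the MPI installation:

    mpirun -np 64 python run.py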
I send a tensor from rank 0 to rank 1 ... rank 63, 1000 times per destination rank. I got an unacceptable result in the last iteration for rank 63, and I can't understand why that single receive takes so long?
result:
receivetimes is: 991 spend time is: 5.1021575927734375e-05 beginre time is: 1608996863.2943463 endre time is: 1608996863.2943974
receivetimes is: 992 spend time is: 5.078315734863281e-05 beginre time is: 1608996863.294423 endre time is: 1608996863.294474
receivetimes is: 993 spend time is: 5.0067901611328125e-05 beginre time is: 1608996863.2944958 endre time is: 1608996863.294546
receivetimes is: 994 spend time is: 5.078315734863281e-05 beginre time is: 1608996863.2945678 endre time is: 1608996863.2946186
receivetimes is: 995 spend time is: 5.1021575927734375e-05 beginre time is: 1608996863.2946415 endre time is: 1608996863.2946925
receivetimes is: 996 spend time is: 4.982948303222656e-05 beginre time is: 1608996863.2947164 endre time is: 1608996863.2947662
receivetimes is: 997 spend time is: 4.9591064453125e-05 beginre time is: 1608996863.2947881 endre time is: 1608996863.2948377
receivetimes is: 998 spend time is: 5.125999450683594e-05 beginre time is: 1608996863.2948594 endre time is: 1608996863.2949107
receivetimes is: 999 spend time is: 0.12546324729919434 beginre time is: 1608996863.2949336 endre time is: 1608996863.4203968
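In case it helps to rule out a measurement artifact, here is a minimal sketch (untested, not the code I ran above) of how the same transfer could be timed with an explicit dist.barrier() before every iteration and the warm-up iterations discarded. It assumes the imports and the MPI initialization from run.py; timed_transfer is just a hypothetical helper name:

    def timed_transfer(tensor, dst, iters=1000, warmup=10):
        """Average blocking send/recv time between rank 0 and rank `dst`.

        The returned value is only meaningful on rank 0 and on rank `dst`;
        all other ranks merely participate in the barrier.
        """
        times = []
        for i in range(warmup + iters):
            dist.barrier()                          # every rank enters the iteration together
            start = time.time()
            if dist.get_rank() == 0:
                dist.send(tensor=tensor, dst=dst)   # timed blocking send
            elif dist.get_rank() == dst:
                dist.recv(tensor=tensor, src=0)     # timed blocking receive
            elapsed = time.time() - start
            if i >= warmup:                         # drop warm-up iterations
                times.append(elapsed)
        return sum(times) / len(times)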