`FullSync` gives "distributed not initialized" error with the old `DataLoader`
🐛 Describe the bug
Hi, we are training on data in webdataset format with torchdata. Without fullsync everything works, except that training hangs at the last iteration. So I tried applying fullsync to my datapipe, and then it gives the following error:
RuntimeError: Caught RuntimeError in DataLoader worker process 0.
File "/root/miniconda3/lib/python3.8/site-packages/torch/utils/data/datapipes/_hook_iterator.py", line 173, in wrap_generator
response = gen.send(None)
File "/root/miniconda3/lib/python3.8/site-packages/torchdata/datapipes/iter/util/prefetch.py", line 173, in __iter__
raise RuntimeError("Torch Distributed is required to be initialized")
RuntimeError: Torch Distributed is required to be initialized
This exception is thrown by __iter__ of FullSyncIterDataPipe()
My training script:
import glob
import torchdata.datapipes as dp
from torch.utils.data import DataLoader

def build_torchdata_loader(rootdir):
    import torch.distributed as ddist
    # both true
    print("dist: ", ddist.is_available(), ddist.is_initialized())
    assert ddist.is_initialized()
    tar_files = list(glob.glob('{}/*.tar'.format(rootdir)))
    train_dp = dp.iter.FileLister(tar_files).shuffle().sharding_filter()
    # the last iter hangs
    # train_dp = dp.iter.FileOpener(train_dp, mode="rb").load_from_tar().webdataset()
    # dist not init error
    train_dp = dp.iter.FileOpener(train_dp, mode="rb").load_from_tar().webdataset().fullsync()
    train_dp = train_dp.shuffle(buffer_size=128)
    train_dp = train_dp.map(postprocess_func)  # postprocess_func defined elsewhere
    train_loader = DataLoader(train_dp, batch_size=128, num_workers=8, pin_memory=True)
    return train_loader
Also, I'm not sure whether this is the correct way to use torchdata with DataLoader and DDP. I only managed to find the Colab tutorial that uses DataLoader2 with DDP. I'm wondering if it would be possible to also provide one for the original DataLoader.
Many thanks in advance!
Versions
Versions of relevant libraries:
[pip3] numpy==1.23.5
[pip3] torch==1.13.1
[pip3] torchdata==0.5.1
[pip3] torchvision==0.14.1
It seems like there may be an issue with the import. If you run this, do you see an error?
from torchdata.datapipes.iter import IterableWrapper
dp = IterableWrapper(list(range(5)))
dp2 = dp.fullsync()
Hi @NivekT, the above script works fine. I did a little digging.
This works and gives the correct results with
python -m torch.distributed.launch --nproc_per_node=2 --master_port 29100 fullsync.py
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torchdata.datapipes.iter import IterableWrapper
from torch.utils.data import DataLoader

if __name__ == '__main__':
    dist.init_process_group(backend="nccl", init_method="env://")
    rank, world_size = dist.get_rank(), dist.get_world_size()
    dp = IterableWrapper(list(range(5))).sharding_filter()
    torch.utils.data.graph_settings.apply_sharding(dp, world_size, rank)
    dp = dp.fullsync()
    for _, x in enumerate(dp):
        print("rank={}, world_size={}, x={}".format(rank, world_size, x))
But this doesn't, and gives the following error:
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torchdata.datapipes.iter import IterableWrapper
from torch.utils.data import DataLoader

if __name__ == '__main__':
    dist.init_process_group(backend="nccl", init_method="env://")
    rank, world_size = dist.get_rank(), dist.get_world_size()
    dp = IterableWrapper(list(range(5))).sharding_filter()
    torch.utils.data.graph_settings.apply_sharding(dp, world_size, rank)
    dp = dp.fullsync()
    dl = DataLoader(dp, batch_size=2, num_workers=2, pin_memory=True)
    for _, x in enumerate(dl):
        print("rank={}, world_size={}, x={}".format(rank, world_size, x))
WARNING:torch.distributed.run:
*****************************************
Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed.
*****************************************
Traceback (most recent call last):
File "fullsync.py", line 20, in <module>
for _, x in enumerate(dl):
File "/root/miniconda3/lib/python3.8/site-packages/torch/utils/data/dataloader.py", line 628, in __next__
data = self._next_data()
File "/root/miniconda3/lib/python3.8/site-packages/torch/utils/data/dataloader.py", line 1333, in _next_data
return self._process_data(data)
File "/root/miniconda3/lib/python3.8/site-packages/torch/utils/data/dataloader.py", line 1359, in _process_data
data.reraise()
File "/root/miniconda3/lib/python3.8/site-packages/torch/_utils.py", line 543, in reraise
raise exception
MemoryError: Caught MemoryError in DataLoader worker process 0.
Original Traceback (most recent call last):
File "/root/miniconda3/lib/python3.8/site-packages/torch/utils/data/_utils/worker.py", line 302, in _worker_loop
data = fetcher.fetch(index)
File "/root/miniconda3/lib/python3.8/site-packages/torch/utils/data/_utils/fetch.py", line 34, in fetch
data.append(next(self.dataset_iter))
File "/root/miniconda3/lib/python3.8/site-packages/torch/utils/data/datapipes/_hook_iterator.py", line 173, in wrap_generator
response = gen.send(None)
File "/root/miniconda3/lib/python3.8/site-packages/torch/utils/data/datapipes/datapipe.py", line 351, in __iter__
yield from self._datapipe
File "/root/miniconda3/lib/python3.8/site-packages/torch/utils/data/datapipes/_hook_iterator.py", line 173, in wrap_generator
response = gen.send(None)
File "/root/miniconda3/lib/python3.8/site-packages/torchdata/datapipes/iter/util/prefetch.py", line 174, in __iter__
self._process_group = dist.new_group(backend="gloo")
File "/root/miniconda3/lib/python3.8/site-packages/torch/distributed/distributed_c10d.py", line 3335, in new_group
pg = _new_process_group_helper(
File "/root/miniconda3/lib/python3.8/site-packages/torch/distributed/distributed_c10d.py", line 862, in _new_process_group_helper
pg = ProcessGroupGloo(prefix_store, group_rank, group_size, timeout=timeout)
MemoryError: std::bad_alloc
This exception is thrown by __iter__ of FullSyncIterDataPipe()
Switching to DataLoader2 works fine. Closing this issue.
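For reference, a rough sketch of what that DataLoader2 setup can look like, assuming torchdata 0.5.x's DistributedReadingService (as in the DDP tutorial); this is illustrative and not the exact training script from this thread:

import torch.distributed as dist
from torchdata.dataloader2 import DataLoader2, DistributedReadingService
from torchdata.datapipes.iter import IterableWrapper

if __name__ == '__main__':
    dist.init_process_group(backend="nccl", init_method="env://")
    rank = dist.get_rank()
    # sharding_filter marks where the graph is split across ranks;
    # DistributedReadingService applies the sharding and keeps the
    # cross-rank synchronization in the main process, so no explicit
    # .fullsync() should be needed here.
    dp = IterableWrapper(list(range(5))).sharding_filter()
    rs = DistributedReadingService()
    dl = DataLoader2(dp, reading_service=rs)
    for x in dl:
        print("rank={}, x={}".format(rank, x))
    dl.shutdown()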
Let's keep this open, as we should still take a look at the old DataLoader to either support or explicitly disallow this use case.
The problem is that fullsync is sent to the worker processes by DataLoader, and the worker processes are not aware of the distributed process group. With DataLoader2, we keep it in the main process at the end of the pipeline.
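As an illustration of that point (a sketch, not an officially supported workaround): with the old DataLoader, num_workers=0 keeps the whole datapipe graph, including FullSyncIterDataPipe, in the main process where the default process group exists, so the second script above should run when launched the same way:

import torch
import torch.distributed as dist
from torch.utils.data import DataLoader
from torchdata.datapipes.iter import IterableWrapper

if __name__ == '__main__':
    dist.init_process_group(backend="nccl", init_method="env://")
    rank, world_size = dist.get_rank(), dist.get_world_size()
    dp = IterableWrapper(list(range(5))).sharding_filter()
    torch.utils.data.graph_settings.apply_sharding(dp, world_size, rank)
    dp = dp.fullsync()
    # num_workers=0: the datapipe (including FullSync) is iterated in the
    # main process, which is aware of the initialized process group.
    dl = DataLoader(dp, batch_size=2, num_workers=0, pin_memory=True)
    for _, x in enumerate(dl):
        print("rank={}, world_size={}, x={}".format(rank, world_size, x))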
We can keep this issue open, but I am reluctant to fix it for DataLoader as it would make DataLoader even more complicated.