data icon indicating copy to clipboard operation
data copied to clipboard

`FullSync` gives distributed not initialized error for old `DataLoader`

Open rxqy opened this issue 2 years ago • 5 comments

🐛 Describe the bug

Hi, here we are training in webdataset format with torchdata. Everything else works fine without fullsync, just the training hangs at the last iteration. So I tried to apply fullsync to my datapipe, then it would give the following error:

RuntimeError: Caught RuntimeError in DataLoader worker process 0.
  File "/root/miniconda3/lib/python3.8/site-packages/torch/utils/data/datapipes/_hook_iterator.py", line 173, in wrap_generator
    response = gen.send(None)
  File "/root/miniconda3/lib/python3.8/site-packages/torchdata/datapipes/iter/util/prefetch.py", line 173, in __iter__
    raise RuntimeError("Torch Distributed is required to be initialized")
RuntimeError: Torch Distributed is required to be initialized
This exception is thrown by __iter__ of FullSyncIterDataPipe()

My training scirpt:

def build_torchdata_loader(rootdir):
    import torch.distributed as ddist
    # both true
    print("dist: ", ddist.is_available(), ddist.is_initialized())
    assert ddist.is_initialized()

    tar_files = list(glob.glob('{}/*.tar'.format(rootdir)))

    train_dp = dp.iter.FileLister(tar_files).shuffle().sharding_filter()
    
    # the last iter hangs
    # train_dp = dp.iter.FileOpener(train_dp, mode="rb").load_from_tar().webdataset()
    # dist not init error
    train_dp = dp.iter.FileOpener(train_dp, mode="rb").load_from_tar().webdataset().fullsync()

    train_dp = train_dp.shuffle(buffer_size=128)
    train_dp = train_dp.map(postprocess_func)

    train_loader = DataLoader(train_dp, batch_size=128, num_workers=8, pin_memory=True)
    return train_loader

Also, I'm not sure if this is the correct way to use torchdata with dataLoader and ddp. I only managed to find the colab tutorial with dataloader2 ddp. I'm wondering if it is possible to also provide one for the original dataloader.

Many thanks in advance!

Versions

Versions of relevant libraries: [pip3] numpy==1.23.5 [pip3] torch==1.13.1 [pip3] torchdata==0.5.1 [pip3] torchvision==0.14.1

rxqy avatar Apr 04 '23 10:04 rxqy

It seems like there is an issue with import. If you run this, do you see an error?

from torchdata.datapipes.iter import IterableWrapper


dp = IterableWrapper(list(range(5)))
dp2 = dp.fullsync()

NivekT avatar Apr 04 '23 18:04 NivekT

Hi @NivekT , the above scipt works fine. And I did a little digging.

This works and gives the correct results with

python -m torch.distributed.launch --nproc_per_node=2 --master_port 29100 fullsync.py
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torchdata.datapipes.iter import IterableWrapper
from torch.utils.data import DataLoader

if __name__ == '__main__':
    dist.init_process_group(backend="nccl", init_method="env://")
    rank, world_size = dist.get_rank(), dist.get_world_size()

    dp = IterableWrapper(list(range(5))).sharding_filter()
    torch.utils.data.graph_settings.apply_sharding(dp, world_size, rank)
    dp = dp.fullsync()

    for _, x in enumerate(dp):
        print("rank={}, world_size={}, x={}".format(rank, world_size, x))

But this doesn't and gives the following error.

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torchdata.datapipes.iter import IterableWrapper
from torch.utils.data import DataLoader

if __name__ == '__main__':
    dist.init_process_group(backend="nccl", init_method="env://")
    rank, world_size = dist.get_rank(), dist.get_world_size()

    dp = IterableWrapper(list(range(5))).sharding_filter()
    torch.utils.data.graph_settings.apply_sharding(dp, world_size, rank)
    dp = dp.fullsync()

    dl = DataLoader(dp, batch_size=2, num_workers=2, pin_memory=True)
    for _, x in enumerate(dl):
        print("rank={}, world_size={}, x={}".format(rank, world_size, x))
WARNING:torch.distributed.run:
*****************************************
Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed. 
*****************************************
Traceback (most recent call last):
  File "fullsync.py", line 20, in <module>
    for _, x in enumerate(dl):
  File "/root/miniconda3/lib/python3.8/site-packages/torch/utils/data/dataloader.py", line 628, in __next__
    data = self._next_data()
  File "/root/miniconda3/lib/python3.8/site-packages/torch/utils/data/dataloader.py", line 1333, in _next_data
    return self._process_data(data)
  File "/root/miniconda3/lib/python3.8/site-packages/torch/utils/data/dataloader.py", line 1359, in _process_data
    data.reraise()
  File "/root/miniconda3/lib/python3.8/site-packages/torch/_utils.py", line 543, in reraise
    raise exception
MemoryError: Caught MemoryError in DataLoader worker process 0.
Original Traceback (most recent call last):
  File "/root/miniconda3/lib/python3.8/site-packages/torch/utils/data/_utils/worker.py", line 302, in _worker_loop
    data = fetcher.fetch(index)
  File "/root/miniconda3/lib/python3.8/site-packages/torch/utils/data/_utils/fetch.py", line 34, in fetch
    data.append(next(self.dataset_iter))
  File "/root/miniconda3/lib/python3.8/site-packages/torch/utils/data/datapipes/_hook_iterator.py", line 173, in wrap_generator
    response = gen.send(None)
  File "/root/miniconda3/lib/python3.8/site-packages/torch/utils/data/datapipes/datapipe.py", line 351, in __iter__
    yield from self._datapipe
  File "/root/miniconda3/lib/python3.8/site-packages/torch/utils/data/datapipes/_hook_iterator.py", line 173, in wrap_generator
    response = gen.send(None)
  File "/root/miniconda3/lib/python3.8/site-packages/torchdata/datapipes/iter/util/prefetch.py", line 174, in __iter__
    self._process_group = dist.new_group(backend="gloo")
  File "/root/miniconda3/lib/python3.8/site-packages/torch/distributed/distributed_c10d.py", line 3335, in new_group
    pg = _new_process_group_helper(
  File "/root/miniconda3/lib/python3.8/site-packages/torch/distributed/distributed_c10d.py", line 862, in _new_process_group_helper
    pg = ProcessGroupGloo(prefix_store, group_rank, group_size, timeout=timeout)
MemoryError: std::bad_alloc
This exception is thrown by __iter__ of FullSyncIterDataPipe()

rxqy avatar Apr 06 '23 02:04 rxqy

Switch to dataloader2 and it works fine. Closing this issue.

rxqy avatar Apr 10 '23 02:04 rxqy

Let's keep this open as we should still take a look into old DataLoader to either support or explicitly disallow this use case.

NivekT avatar Apr 10 '23 18:04 NivekT

The problem is fullsync is sent to worker process in DataLoader and worker processes are not aware of distributed processes. But, we keep it in the main process at the end when DataLoader2 is used.

We can keep this issue open but I am reluctant to fix it for DataLoader as it makes DataLoader even more complicated

ejguan avatar Apr 17 '23 16:04 ejguan