[BUG] Pipeline Dataloader Sampler: `shuffle=False`
Describe the bug
When I read the source code of building the dataloader in PipelineEngine. I find shuffle=False in the sampler. Code:
sampler = torch.utils.data.distributed.DistributedSampler(dataset,
num_replicas=self.dp_world_size,
rank=self.mpu.get_data_parallel_rank(),
shuffle=False)
I want to know why you set shuffle to False, not True.
The code is in deepspeed/runtime/pipe/engine.py, Pipeline Engine class, def _build_data_iter.
deepspeed version: 0.12.4
To Reproduce Steps to reproduce the behavior:
- Go to '...'
- Click on '....'
- Scroll down to '....'
- See error
Expected behavior A clear and concise description of what you expected to happen.
ds_report output
Please run ds_report to give us details about your setup.
Screenshots If applicable, add screenshots to help explain your problem.
System info (please complete the following information):
- OS: [e.g. Ubuntu 18.04]
- GPU count and types [e.g. two machines with x8 A100s each]
- Interconnects (if applicable) [e.g., two machines connected with 100 Gbps IB]
- Python version
- Any other relevant info about your setup
Launcher context
Are you launching your experiment with the deepspeed launcher, MPI, or something else?
Docker context Are you using a specific docker image that you can share?
Additional context Add any other context about the problem here.
It's unclear to me why this is the default, and confusing that there is no documentation of this behavior. Shuffle should probably be a parameter of the deepspeed config file.
After struggling with a bug all morning, I believe I now understand why the shuffle must be set to false here.
Let's first take a look at how the engine loads data when pipeline parallelism is enabled:
def _exec_load_micro_batch(self, buffer_id):
if self.wall_clock_breakdown():
self.timers(BATCH_INPUT_TIMER).start()
batch = self._next_batch()
if self.is_first_stage():
loaded = None
if torch.is_tensor(batch[0]):
loaded = batch[0].clone().to(self.device).detach()
if self._config.pipeline['activation_checkpoint_interval'] > 0 and self._config.pipeline[
'use_reentrant']:
loaded.requires_grad = loaded.is_floating_point()
else:
assert isinstance(batch[0], (tuple, list))
# Assume list or tuple
loaded = []
for x in batch[0]:
assert torch.is_tensor(x)
mine = x.clone().detach().to(self.device)
if self._config.pipeline['activation_checkpoint_interval'] > 0 and self._config.pipeline[
'use_reentrant']:
mine.requires_grad = mine.is_floating_point()
loaded.append(mine)
loaded = tuple(loaded)
self.pipe_buffers['inputs'][buffer_id] = loaded
if self.is_last_stage():
loaded = batch[1]
if torch.is_tensor(batch[1]):
loaded = batch[1].to(self.device)
# XXX: torch 1.6.0 DataLoader will auto convert tuple to list
elif isinstance(batch[1], (tuple, list)):
loaded = []
for x in batch[1]:
assert torch.is_tensor(x)
x = [x.to](http://x.to/)(self.device).detach()
loaded.append(x)
loaded = tuple(loaded)
self.pipe_buffers['labels'][buffer_id] = loaded
From the above code snippet, we can observe that both the first and last stages independently fetch data from the DataLoader. The first stage retains the inputs (i.e., batch[0]), while the last stage retains the labels (i.e., batch[1]). If shuffle is set to true, it becomes impossible to ensure that the inputs and labels remain consistent.
P.S. What's the correct behavior if we want to enable data shuffling in DeepSpeed.
@xianshunw No, mismatching wil not appear. The batch is a tuple, batch[0] is the input of the model and batch[1] is the label for computing loss. This is the protocol of DeepSpeed Pipeline Parallel.
@xianshunw @avicooper1 Setting shuffle=True will not cause anything error in my experiment. I am just curious why it is False for default in Pipeline Dataloader Sampler.
As we pass a data parallel rank to the loader, I think shuffling should properly work. Feel free to submit a PR to set shuffle=True as the default.
@tohtana @Coobiw In addition to Pipeline Dataloader Samler, deepspeed runtime engine dataloader also has similar problems, the code is in https://github.com/microsoft/DeepSpeed/blob/master/deepspeed/runtime/engine.py#L1777, do you think we should submit a PR to set shuffle=True as the default for them?
I'm not sure why it is also set to False but I agree that shuffle=True is better.
@xianshunw @avicooper1 Setting
shuffle=Truewill not cause anything error in my experiment. I am just curious why it isFalsefor default in Pipeline Dataloader Sampler.
Apologies, it was my fault. The issue arose when I manually configured the dataloader. It turned out that the problem was due to incorrect initialization of the dataloader
I'm not sure why it is also set to False but I agree that
shuffle=Trueis better.
Making it configurable is better.
@xianshunw Yeah, I've used the following sampler for my custom dataloader:
sampler = torch.utils.data.distributed.DistributedSampler(
datasets['train'],
num_replicas=engine.dp_world_size,
rank=engine.mpu.get_data_parallel_rank(),
shuffle=True
)
There is no problem.
Thanks @xianshunw and @Coobiw - we will work on making it configurable, but at least with the current unit tests, the linked PR seems to hang with shuffle=True, so we will need to debug this first as well.
Creating #6950 to track adding the config option.