[BUG] Pipeline Dataloader Sampler: `shuffle=False`

Open Coobiw opened this issue 1 year ago • 10 comments

Describe the bug While reading the source code that builds the dataloader in PipelineEngine, I found that the sampler is created with shuffle=False. Code:

sampler = torch.utils.data.distributed.DistributedSampler(dataset,
                                                          num_replicas=self.dp_world_size,
                                                          rank=self.mpu.get_data_parallel_rank(),
                                                          shuffle=False)

I would like to know why shuffle is set to False rather than True. The code is in deepspeed/runtime/pipe/engine.py, in the _build_data_iter method of the PipelineEngine class.

deepspeed version: 0.12.4

Coobiw avatar Jun 05 '24 11:06 Coobiw

It's unclear to me why this is the default, and confusing that there is no documentation of this behavior. Shuffle should probably be a parameter of the deepspeed config file.

avicooper1 avatar Aug 16 '24 19:08 avicooper1

After struggling with a bug all morning, I believe I now understand why the shuffle must be set to false here.

Let's first take a look at how the engine loads data when pipeline parallelism is enabled:

def _exec_load_micro_batch(self, buffer_id):
    if self.wall_clock_breakdown():
        self.timers(BATCH_INPUT_TIMER).start()

    batch = self._next_batch()

    if self.is_first_stage():
        loaded = None
        if torch.is_tensor(batch[0]):
            loaded = batch[0].clone().to(self.device).detach()
            if self._config.pipeline['activation_checkpoint_interval'] > 0 and self._config.pipeline[
                    'use_reentrant']:
                loaded.requires_grad = loaded.is_floating_point()
        else:
            assert isinstance(batch[0], (tuple, list))
            # Assume list or tuple
            loaded = []
            for x in batch[0]:
                assert torch.is_tensor(x)
                mine = x.clone().detach().to(self.device)
                if self._config.pipeline['activation_checkpoint_interval'] > 0 and self._config.pipeline[
                        'use_reentrant']:
                    mine.requires_grad = mine.is_floating_point()
                loaded.append(mine)
            loaded = tuple(loaded)

        self.pipe_buffers['inputs'][buffer_id] = loaded

    if self.is_last_stage():
        loaded = batch[1]
        if torch.is_tensor(batch[1]):
            loaded = batch[1].to(self.device)
        # XXX: torch 1.6.0 DataLoader will auto convert tuple to list
        elif isinstance(batch[1], (tuple, list)):
            loaded = []
            for x in batch[1]:
                assert torch.is_tensor(x)
                x = x.to(self.device).detach()
                loaded.append(x)
            loaded = tuple(loaded)

        self.pipe_buffers['labels'][buffer_id] = loaded

From the above code snippet, we can observe that both the first and last stages independently fetch data from the DataLoader. The first stage retains the inputs (i.e., batch[0]), while the last stage retains the labels (i.e., batch[1]). If shuffle is set to true, it becomes impossible to ensure that the inputs and labels remain consistent.

P.S. What is the correct way to enable data shuffling in DeepSpeed?

xianshunw avatar Sep 06 '24 07:09 xianshunw

@xianshunw No, a mismatch will not occur. The batch is a tuple: batch[0] is the model input and batch[1] is the label used to compute the loss. This is the protocol of DeepSpeed pipeline parallelism.

Coobiw avatar Sep 08 '24 17:09 Coobiw

@xianshunw @avicooper1 Setting shuffle=True does not cause any error in my experiments. I am just curious why it is False by default in the Pipeline Dataloader Sampler.

Coobiw avatar Sep 08 '24 17:09 Coobiw

Since we pass a data parallel rank to the loader, I think shuffling should work properly. Feel free to submit a PR to set shuffle=True as the default.
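
The reasoning can be checked outside DeepSpeed with a small sketch. It assumes that the first and last stage of one pipeline each build their own DistributedSampler with the same data parallel rank, seed, and epoch; the toy dataset and the stage_indices helper are illustrative names, not DeepSpeed code.

```python
import torch
from torch.utils.data import TensorDataset
from torch.utils.data.distributed import DistributedSampler

# Toy dataset of (input, label) pairs; label i belongs to input i.
dataset = TensorDataset(torch.arange(8), torch.arange(8) * 10)


def stage_indices(dp_rank, dp_world_size=2, epoch=0):
    """Hypothetical helper: indices a stage draws for its data parallel rank."""
    # Passing num_replicas and rank explicitly means no process group is needed.
    sampler = DistributedSampler(dataset,
                                 num_replicas=dp_world_size,
                                 rank=dp_rank,
                                 shuffle=True,
                                 seed=0)
    sampler.set_epoch(epoch)
    return list(sampler)


# Two stages of the same pipeline share a data parallel rank, and each
# builds an equivalent sampler independently (here: two separate calls).
first_stage = stage_indices(dp_rank=0)
last_stage = stage_indices(dp_rank=0)
# A different data parallel replica gets the remaining samples.
other_replica = stage_indices(dp_rank=1)

print(first_stage == last_stage)
print(set(first_stage).isdisjoint(other_replica))
```

The shuffled order is a function of the seed, the epoch, and the rank only, so the inputs kept by the first stage and the labels kept by the last stage come from the same sample indices as long as those three values agree.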

tohtana avatar Sep 09 '24 21:09 tohtana

@tohtana @Coobiw Besides the Pipeline Dataloader Sampler, the dataloader in the DeepSpeed runtime engine has the same behavior; the code is at https://github.com/microsoft/DeepSpeed/blob/master/deepspeed/runtime/engine.py#L1777. Do you think we should submit a PR to set shuffle=True as the default for both?

ranzhejiang avatar Sep 11 '24 03:09 ranzhejiang

I'm not sure why it is also set to False but I agree that shuffle=True is better.

tohtana avatar Sep 13 '24 00:09 tohtana

> @xianshunw @avicooper1 Setting shuffle=True does not cause any error in my experiments. I am just curious why it is False by default in the Pipeline Dataloader Sampler.

Apologies, it was my fault. The issue arose when I manually configured the dataloader; it turned out that I had initialized the dataloader incorrectly.

xianshunw avatar Sep 13 '24 02:09 xianshunw

> I'm not sure why it is also set to False but I agree that shuffle=True is better.

Making it configurable is better.

xianshunw avatar Sep 13 '24 02:09 xianshunw

@xianshunw Yeah, I've used the following sampler for my custom dataloader:

sampler = torch.utils.data.distributed.DistributedSampler(
    datasets['train'],
    num_replicas=engine.dp_world_size,
    rank=engine.mpu.get_data_parallel_rank(),
    shuffle=True
)

There is no problem.
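
One thing to keep in mind with a shuffled DistributedSampler: PyTorch derives the order from the seed plus the epoch, so set_epoch should be called at the start of each epoch or every epoch replays the same order. Below is a minimal sketch of that behavior; the toy dataset, the hard-coded num_replicas and rank (stand-ins for engine.dp_world_size and engine.mpu.get_data_parallel_rank()), and the epoch_order helper are assumptions for illustration only.

```python
import torch
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.distributed import DistributedSampler

# Toy (input, label) pairs; label i is always 10 * input i.
dataset = TensorDataset(torch.arange(16), torch.arange(16) * 10)

# Hard-coded stand-ins for the engine's data parallel world size and rank.
sampler = DistributedSampler(dataset, num_replicas=2, rank=0, shuffle=True)
loader = DataLoader(dataset, batch_size=4, sampler=sampler)


def epoch_order(epoch):
    """Hypothetical helper: the (input, label) pairs this rank sees in one epoch."""
    sampler.set_epoch(epoch)  # the permutation is seeded with seed + epoch
    pairs = []
    for inputs, labels in loader:
        pairs.extend(zip(inputs.tolist(), labels.tolist()))
    return pairs


epoch0 = epoch_order(0)
epoch1 = epoch_order(1)
epoch0_again = epoch_order(0)

print(epoch0)
print(epoch1)
# Replaying epoch 0 reproduces its order exactly.
print(epoch0 == epoch0_again)
```

Inputs and labels stay paired in every batch because the sampler only chooses which dataset indices to read; it never reorders the two fields independently.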

Coobiw avatar Sep 13 '24 02:09 Coobiw

Thanks @xianshunw and @Coobiw - we will work on making it configurable, but at least with the current unit tests, the linked PR seems to hang with shuffle=True, so we will need to debug this first as well.

loadams avatar Jan 06 '25 22:01 loadams

Creating #6950 to track adding the config option.

loadams avatar Jan 14 '25 23:01 loadams