streaming icon indicating copy to clipboard operation
streaming copied to clipboard

Cannot Load MDS Dataset

Open naston opened this issue 1 year ago • 41 comments

Issue

I am trying to load an already written .mds version of the C4 dataset. The dataset was written out in a manner similar to the demos provided on this repository. I have used the first code box below to test this data loader and it works, however, when I run the function build_text_dataloader from tge second code box I get an error on line 269 resulting in the stack traces shown below.

Environment

  • OS: [Ubuntu 20.04]
  • Hardware (GPU, or instance type): [8xA100]

Code

Testing Code

from omegaconf import OmegaConf as om
from itertools import islice
import sys
import os

sys.path.append(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))

from src.text_data import build_tokenizer, build_text_dataloader

# Helpful to test if your dataloader is working locally
# Run `python data.py  --local_path [local] [--remote_path remote, optional]` and verify that batches are printed out
if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--tokenizer',
                        type=str,
                        default='bert-base-uncased',
                        help='the name of the tokenizer to use')
    parser.add_argument('--local_path',
                        type=str,
                        default='./data/c4/',
                        help='the path to the local copy of the dataset')
    parser.add_argument(
        '--remote_path',
        type=str,
        default=None,
        help='the path to the remote copy to stream from (optional)')
    parser.add_argument('--split',
                        type=str,
                        default='train_small',
                        help='which split of the dataset to use')
    parser.add_argument('--max_seq_len',
                        type=int,
                        default=128,
                        help='max sequence length to test')

    args = parser.parse_args()

    if args.remote_path is not None:
        print(
            f'Reading {args.split} split from {args.local_path} <- streamed from <- {args.remote_path}'
        )
    else:
        print(f'Reading {args.split} split from {args.local_path}')

    cfg = {
        'name': 'text',
        'dataset': {
            'local': args.local_path,
            'remote': args.remote_path,
            'split': args.split,
            'shuffle': True, # to match small dataset test
            'max_seq_len': args.max_seq_len,
            'mlm_probability': 0.3
            # 'keep_zip': True,  # in case we need compressed files after testing # to match small dataset test
        },
        'drop_last': True, # to match small dataset test
        'num_workers': 8, # to match small dataset test
    }
    cfg = om.create(cfg)
    device_batch_size = 2

    tokenizer_cfg = {'name': args.tokenizer, 'kwargs': {}}
    tokenizer_cfg['kwargs'] = {'model_max_length': args.max_seq_len}
    tokenizer_cfg = om.create(tokenizer_cfg)
    tokenizer = build_tokenizer(tokenizer_cfg)

    loader = build_text_dataloader(cfg, tokenizer, device_batch_size)
    tokenizer = loader.dataset.tokenizer  # type: ignore
    for batch_ix, batch in enumerate(islice(loader, 5)):
        print('\n')
        print('#' * 20, f'Batch {batch_ix}', '#' * 20)
        for k, v in batch.items():
            print(k, v.shape, v.dtype)
        for sample_ix, token_sample in enumerate(batch['input_ids']):
            print('-' * 20, f' Sample {sample_ix} ', '-' * 20)
            print(tokenizer.decode(token_sample))

Execution (Where error occurs)

import os
from itertools import islice
from typing import Any, Callable, Dict, List, Optional, Sequence, Union

import numpy as np
import torch
import transformers
from omegaconf import DictConfig
from omegaconf import OmegaConf as om
from streaming import Stream, StreamingDataset
from torch.utils.data import DataLoader
from transformers import (AutoTokenizer, PreTrainedTokenizer,
                          PreTrainedTokenizerFast)

Tokenizer = Union[PreTrainedTokenizer, PreTrainedTokenizerFast]


def build_tokenizer(om_tokenizer_config: DictConfig,) -> Tokenizer:
    os.environ['TRANSFORMERS_NO_ADVISORY_WARNINGS'] = '1'
    os.environ['TOKENIZERS_PARALLELISM'] = 'false'

    resolved_om_tokenizer_config = om.to_container(om_tokenizer_config,
                                                   resolve=True)
    tokenizer_kwargs = resolved_om_tokenizer_config.get(  # type: ignore
        'kwargs', {})
    tokenizer_name = resolved_om_tokenizer_config['name']  # type: ignore
    tokenizer = AutoTokenizer.from_pretrained(tokenizer_name,
                                              **tokenizer_kwargs)

    # HuggingFace does not respect the model_max_length kwarg, and overrides it with
    # min(kwargs['model_max_length'], original_config['model_max_length']), so we
    # explicitly set it here
    tokenizer.model_max_length = tokenizer_kwargs.get(
        'model_max_length',
        int(1e30),
    )

    return tokenizer


class StreamingTextDataset(StreamingDataset):
    """Generic text dataset using MosaicML's StreamingDataset.

    Args:
        tokenizer (Tokenizer): HuggingFace tokenizer to
            tokenize samples.
        max_seq_len (int): The max sequence length of each sample.
        streams (Sequence[Stream], optional): One or more Streams to stream/cache samples from,
            which may be upsampled or downsampled. StreamingDataset uses either ``streams`` or
            ``remote``/``local``. Defaults to ``None``.
        remote (str, optional): Remote path or directory to download the dataset from. If ``None``,
            its data must exist locally. StreamingDataset uses either ``streams`` or
            ``remote``/``local``. Defaults to ``None``.
        local (str, optional): Local working directory to download shards to. This is where shards
            are cached while they are being used. Uses a temp directory if not set.
            StreamingDataset uses either ``streams`` or ``remote``/``local``. Defaults to ``None``.
        split (str, optional): Which dataset split to use, if any. If provided, we stream from/to
            the ``split`` subdirs of  ``remote`` and ``local``. Defaults to ``None``.
        download_retry (int): Number of download re-attempts before giving up. Defaults to ``2``.
        download_timeout (float): Number of seconds to wait for a shard to download before raising
            an exception. Defaults to ``60``.
        validate_hash (str, optional): Optional hash or checksum algorithm to use to validate
            shards. Defaults to ``None``.
        keep_zip (bool): Whether to keep or delete the compressed form when decompressing
            downloaded shards. If ``False``, keep iff remote is local or no remote. Defaults to
            `False``.
        keep_raw (bool): Whether to keep or delete the decompressed form (or only form)
            of shards after all their samples have been yielded this epoch. If ``False``, keep iff
            remote is local or no remote and no compression. Defaults to ``True``.
        samples_per_epoch (int, optional): Provide this field iff you are weighting sub-datasets
            proportionally. Defaults to ``None``.
        predownload (int, optional): Target number of samples ahead to download the shards of while
            iterating. Defaults to ``100_000``.
        partition_algo (str): Which partitioning algorithm to use. Defaults to ``orig``.
        num_canonical_nodes (int, optional): Canonical number of nodes for shuffling with
            resumption. Defaults to ``None``, which is interpreted as the number of nodes of the
            initial run.
        batch_size (int, optional): Batch size of its DataLoader, which affects how the dataset is
            partitioned over the workers. Defaults to ``None``.
        shuffle (bool): Whether to iterate over the samples in randomized order. Defaults to
            ``False``.
        shuffle_algo (str): Which shuffling algorithm to use. Defaults to ``py1s``.
        shuffle_seed (int): Seed for Deterministic data shuffling. Defaults to ``9176``.
    """

    def __init__(self,
                 tokenizer: Tokenizer,
                 max_seq_len: int,
                 streams: Optional[Sequence[Stream]] = None,
                 remote: Optional[str] = None,
                 local: Optional[str] = None,
                 split: Optional[str] = None,
                 download_retry: int = 2,
                 download_timeout: float = 60,
                 validate_hash: Optional[str] = None,
                 keep_zip: bool = False,
                 keep_raw: bool = True,
                 samples_per_epoch: Optional[int] = None,
                 predownload: int = 100_000,
                 partition_algo: str = 'orig',
                 num_canonical_nodes: Optional[int] = None,
                 batch_size: Optional[int] = None,
                 shuffle: bool = False,
                 shuffle_algo: str = 'py1s',
                 shuffle_seed: int = 9176,
                 **kwargs: Dict[str, Any]):

        group_method = kwargs.pop('group_method', None)
        if group_method is not None:
            raise NotImplementedError(
                'group_method is deprecated and has been removed.\nTo ' +
                'concatenate, use the --concat_tokens ' +
                'argument when creating your MDS dataset with concat_c4.py')

        if kwargs is not None and len(kwargs) > 0:
            raise ValueError(
                f'StreamingTextDataset() got an unexpected keyword argument: {kwargs}'
            )

        if local is not None and (remote is None or (local == remote)):
            if os.path.isdir(local):
                contents = set(os.listdir(local))
                if split not in contents:
                    raise ValueError(
                        f'local directory {local} does not contain split {split}'
                    )

        # Build Dataset
        super().__init__(
            streams=streams,
            remote=remote,
            local=local,
            split=split,
            download_retry=download_retry,
            download_timeout=download_timeout,
            validate_hash=validate_hash,
            keep_zip=keep_zip,
            # keep_raw=keep_raw,
            # samples_per_epoch=samples_per_epoch,
            predownload=predownload,
            partition_algo=partition_algo,
            num_canonical_nodes=num_canonical_nodes,
            batch_size=batch_size,
            shuffle=shuffle,
            shuffle_algo=shuffle_algo,
            shuffle_seed=shuffle_seed,
        )
        self.tokenizer = tokenizer
        self.max_seq_len = max_seq_len

    # How to tokenize a text sample to a token sample
    def _tokenize(self, text_sample):
        if self.tokenizer._pad_token is None:
            # Some tokenizers (e.g. GPT2 tokenizer) have no padding token which causes bugs
            raise RuntimeError(
                'If tokenizing on-the-fly, tokenizer must have a pad_token_id')

        return self.tokenizer(text_sample['text'],
                              truncation=True,
                              padding='max_length',
                              max_length=self.max_seq_len)

    def _read_binary_tokenized_sample(self, sample):
        return torch.from_numpy(
            np.frombuffer(sample['tokens'],
                          dtype=np.int64)[:self.max_seq_len].copy())

    # How to process a sample
    def __getitem__(self, idx: int) -> Union[Dict[str, Any], torch.Tensor]:
        sample = super().__getitem__(idx)
        if 'text' in sample:
            token_sample = self._tokenize(sample)
        elif 'tokens' in sample:
            token_sample = self._read_binary_tokenized_sample(sample)
        else:
            raise RuntimeError(
                'StreamingTextDataset needs samples to have a `text` or `tokens` column'
            )
        return token_sample


class ConcatenatedSequenceCollatorWrapper:
    """Collator wrapper to add sequence_id to batch."""

    def __init__(self,
                 base_collator: Callable,
                 eos_token_id: Optional[int] = None,
                 bos_token_id: Optional[int] = None):
        self.base_collator = base_collator
        if (eos_token_id is None) and (bos_token_id is None):
            raise ValueError(
                'Must supply a value for either eos_token_id or bos_token_id, but got None for both.'
            )
        if (eos_token_id is not None) and (bos_token_id is not None):
            raise ValueError(
                'Cannot use *both* EOS and BOS tokens for detecting sequence boundaries. ' +\
                'Please supply `eos_token_id` if sequences end with an EOS token, or use ' +\
                '`bos_token_id` if sequences start with a BOS token.'
            )
        if eos_token_id is None:
            self.split_token_id = bos_token_id
            self.bos_mode = True
        else:
            self.split_token_id = eos_token_id
            self.bos_mode = False

    def __call__(self, examples: List[Any]) -> Dict[str, torch.Tensor]:
        batch = self.base_collator(examples)
        batch['sequence_id'] = self.get_sequence_id_from_batch(batch)
        return batch

    def get_sequence_id_from_batch(
            self, batch: Dict[str, torch.Tensor]) -> torch.Tensor:
        assert self.split_token_id is not None
        is_separator = torch.eq(batch['input_ids'], self.split_token_id)
        cumulative_sep = torch.cumsum(is_separator,
                                      dim=1).to(batch['input_ids'].dtype)
        # If separator token is bos, we're already done
        if self.bos_mode:
            return cumulative_sep

        # If separator token is eos, right shift 1 space
        left_zeros = cumulative_sep.new_zeros((cumulative_sep.shape[0], 1))
        return torch.cat([left_zeros, cumulative_sep[:, :-1]], dim=1)


def build_text_dataloader(
    cfg: DictConfig,
    tokenizer: Tokenizer,
    device_batch_size: int,
):
    assert cfg.name == 'text', f'Tried to build text dataloader with cfg.name={cfg.name}'
    if cfg.dataset.get('group_method', None) is not None:
        raise NotImplementedError(
            'group_method is deprecated and has been removed.\nTo ' +
            'concatenate, use the --concat_tokens ' +
            'argument when creating your MDS dataset with convert_dataset.py')

    # build streams
    streams_dict = cfg.dataset.get('streams', None)
    streams = None
    if streams_dict is not None:
        streams = []
        for _, stream in streams_dict.items():
            streams.append(
                Stream(
                    remote=stream.get('remote', None) or
                    cfg.dataset.get('remote', None),
                    local=stream.get('local', None) or
                    cfg.dataset.get('local', None),
                    split=stream.get('split', None) or
                    cfg.dataset.get('split', None),
                    proportion=stream.get('proportion', None),
                    repeat=stream.get('repeat', None),
                    samples=stream.get('samples', None),
                    download_retry=stream.get('download_retry', None) or
                    cfg.dataset.get('download_retry', 2),
                    download_timeout=stream.get('download_timeout', None) or
                    cfg.dataset.get('download_timeout', 60),
                    validate_hash=stream.get('validate_hash', None) or
                    cfg.dataset.get('validate_hash', None),
                    keep_zip=stream.get('keep_zip', None) or
                    cfg.dataset.get('keep_zip', False),
                    keep_raw=stream.get('keep_raw', None) or
                    cfg.dataset.get('keep_raw', True),
                ))

    # build dataset potentially with streams
    dataset = StreamingTextDataset(
        tokenizer=tokenizer,
        max_seq_len=cfg.dataset.max_seq_len,
        streams=streams,
        remote=cfg.dataset.get('remote', None),
        local=cfg.dataset.get('local', None),
        split=cfg.dataset.get('split', None),
        download_retry=cfg.dataset.get('download_retry', 2),
        download_timeout=cfg.dataset.get('download_timeout', 60),
        validate_hash=cfg.dataset.get('validate_hash', None),
        keep_zip=cfg.dataset.get('keep_zip', False),
        keep_raw=cfg.dataset.get('keep_raw', True),
        samples_per_epoch=cfg.dataset.get('samples_per_epoch', None),
        predownload=cfg.dataset.get('predownload', 100_000),
        partition_algo=cfg.dataset.get('partition_algo', 'orig'),
        num_canonical_nodes=cfg.dataset.get('num_canonical_nodes', 128),
        batch_size=device_batch_size,
        shuffle=cfg.dataset.get('shuffle', False),
        shuffle_algo=cfg.dataset.get('shuffle_algo', 'py1s'),
        shuffle_seed=cfg.dataset.get('shuffle_seed', 9176),
    )

    mlm_probability = cfg.dataset.get('mlm_probability', None)
    collate_fn = transformers.DataCollatorForLanguageModeling(
        tokenizer=dataset.tokenizer,
        mlm=mlm_probability is not None,
        mlm_probability=mlm_probability)

    eos_token_id = cfg.dataset.get('eos_token_id')
    bos_token_id = cfg.dataset.get('bos_token_id')
    if (eos_token_id is not None) or (bos_token_id is not None):
        # Note: Will raise an error if both are non-None
        collate_fn = ConcatenatedSequenceCollatorWrapper(
            base_collator=collate_fn,
            eos_token_id=eos_token_id,
            bos_token_id=bos_token_id)

    return DataLoader(
        dataset,
        collate_fn=collate_fn,
        batch_size=device_batch_size,
        drop_last=cfg.drop_last,
        num_workers=cfg.num_workers,
        pin_memory=cfg.get('pin_memory', True),
        prefetch_factor=cfg.get('prefetch_factor', 2),
        persistent_workers=cfg.get('persistent_workers', True),
        timeout=cfg.get('timeout', 0),
    )

Stack Trace

First Error

[rank7]: Traceback (most recent call last): [rank7]:   File "/home/ubuntu/foundation-c4-mds/ProjectOrion/train.py", line 242, in [rank7]:     main(cfg) [rank7]:   File "/home/ubuntu/foundation-c4-mds/ProjectOrion/train.py", line 153, in main [rank7]:     train_loader = build_dataloader( [rank7]:   File "/home/ubuntu/foundation-c4-mds/ProjectOrion/train.py", line 124, in build_dataloader [rank7]:     return text_data_module.build_text_dataloader(cfg, tokenizer, [rank7]:   File "/home/ubuntu/foundation-c4-mds/ProjectOrion/src/text_data.py", line 422, in build_text_dataloader [rank7]:     dataset = StreamingC4(local='./data/c4',split='train_small',batch_size=device_batch_size,tokenizer_name='bert-base-uncased',max_seq_len=128,group_method='truncate') [rank7]:   File "/home/ubuntu/foundation-c4-mds/ProjectOrion/src/text_data.py", line 132, in init [rank7]:     super().init(remote=remote, [rank7]:   File "/home/ubuntu/.local/lib/python3.10/site-packages/streaming/base/dataset.py", line 527, in init [rank7]:     self._shm_prefix_int, self._locals_shm = get_shm_prefix(streams_local, streams_remote, [rank7]:   File "/home/ubuntu/.local/lib/python3.10/site-packages/streaming/base/shared/prefix.py", line 243, in get_shm_prefix [rank7]:     raise RuntimeError(f'Internal error: shared memory registered does not match ' + [rank7]: RuntimeError: Internal error: shared memory registered does not match local leader as streams_local or prefix_int not match. local leader: [] and 0. expected: ['/home/ubuntu/foundation-c4-mds/ProjectOrion/data/c4/train_small'] and 9.

Second Error

[rank3]: Traceback (most recent call last): [rank3]:   File "/home/ubuntu/.local/lib/python3.10/site-packages/streaming/base/shared/prefix.py", line 235, in get_shm_prefix [rank3]:     shm = SharedMemory(name, False) [rank3]:   File "/home/ubuntu/.local/lib/python3.10/site-packages/streaming/base/shared/memory.py", line 49, in init [rank3]:     shm = BuiltinSharedMemory(name, create, size) [rank3]:   File "/usr/lib/python3.10/multiprocessing/shared_memory.py", line 104, in init [rank3]:     self._fd = _posixshmem.shm_open( [rank3]: FileNotFoundError: [Errno 2] No such file or directory: '/000009_locals'

[rank3]: During handling of the above exception, another exception occurred:

[rank3]: Traceback (most recent call last): [rank3]:   File "/home/ubuntu/foundation-c4-mds/ProjectOrion/train.py", line 242, in [rank3]:     main(cfg) [rank3]:   File "/home/ubuntu/foundation-c4-mds/ProjectOrion/train.py", line 153, in main [rank3]:     train_loader = build_dataloader( [rank3]:   File "/home/ubuntu/foundation-c4-mds/ProjectOrion/train.py", line 124, in build_dataloader [rank3]:     return text_data_module.build_text_dataloader(cfg, tokenizer, [rank3]:   File "/home/ubuntu/foundation-c4-mds/ProjectOrion/src/text_data.py", line 422, in build_text_dataloader [rank3]:     dataset = StreamingC4(local='./data/c4',split='train_small',batch_size=device_batch_size,tokenizer_name='bert-base-uncased',max_seq_len=128,group_method='truncate') [rank3]:   File "/home/ubuntu/foundation-c4-mds/ProjectOrion/src/text_data.py", line 132, in init [rank3]:     super().init(remote=remote, [rank3]:   File "/home/ubuntu/.local/lib/python3.10/site-packages/streaming/base/dataset.py", line 527, in init [rank3]:     self._shm_prefix_int, self._locals_shm = get_shm_prefix(streams_local, streams_remote, [rank3]:   File "/home/ubuntu/.local/lib/python3.10/site-packages/streaming/base/shared/prefix.py", line 237, in get_shm_prefix [rank3]:     raise RuntimeError(f'Internal error: shared memory prefix={prefix_int} was not ' + [rank3]: RuntimeError: Internal error: shared memory prefix=9 was not registered by local leader. This may be because you specified different local parameters from different ranks.

naston avatar Nov 11 '24 01:11 naston

@XiaohanZhangCMU this might be related to your PR https://github.com/mosaicml/streaming/pull/813?

ethantang-db avatar Nov 11 '24 17:11 ethantang-db

@naston Thanks for raising this. Which streaming version are you using? If you're using v0.9.1, can you try downgrading to v0.9.0 and see if this solves the issue?

@XiaohanZhangCMU if that's the case we should yank, revert, re-release.

snarayan21 avatar Nov 11 '24 18:11 snarayan21

I will look into downgrading as I am v0.9.1. As I am testing more I see it specifically occur when I run my program using the mosaicml composer library with something like composer train.py ./config.yaml. When I run the code using python train.py ./config.yaml I receive no error.

naston avatar Nov 11 '24 19:11 naston

The downgrade seems to have fixed the problem in my local single GPU machine. I will confirm this on my multi-GPU environment when I get a chance and update this thread.

naston avatar Nov 11 '24 19:11 naston

Hey @naston I am trying to reproduce but couldnt. I got a value error instead of the runtime error. ValueError: In the absence of local dataset, path to remote dataset must be provided

Also, I noticed that you use local as the path to your dataset, that's not what streaming desires.

Can you set

remote = './data/c4', local=None

and try again?

I am also surprised it is different behaviors with launcher (composer) or without. I think you should always use composer as the launcher, because streamingdataset needs certain env vars to be set up for it to recognize the worker infos.

XiaohanZhangCMU avatar Nov 11 '24 21:11 XiaohanZhangCMU

Downgrading to v0.9.0 and running streaming.base.util.clean_stale_shared_memory() fixed my issue. I saw no change in the error when using remote vs. local. Should I still be using remote for a locally downloaded dataset? Any insight you could provide on why that is would be helpful.

naston avatar Nov 11 '24 21:11 naston

AFAIK, if you are using a remote path for a dataset, it is better to just specify the remote and don't specify local as we can then do custom logic on the local folder.

It is also possible that when you use the past version of streaming, it might have left some left over files. We did make a change on how to recognize those files in the 0.9.1 release so that might have caused some issues. Can you try to reproduce the issue again with 0.9.1 after cleaning the stale shared memory?

ethantang-db avatar Nov 11 '24 23:11 ethantang-db

I will test again on the new version when I get a chance. That may not be until this weekend but I will report back when I get the chance.

naston avatar Nov 12 '24 01:11 naston

Regarding:

Also, I noticed that you use local as the path to your dataset, that's not what streaming desires.

@XiaohanZhangCMU @naston If the dataset is on local disk, you should specify the path to local. Otherwise remote is fine.

Thanks for testing this @naston, if this issue persists with the new version we will rollback.

snarayan21 avatar Nov 12 '24 17:11 snarayan21

If the dataset is on local disk, you should specify the path to local. Otherwise remote is fine.

Last time I used version 0.8.1 with my dataset stored on a local disk, I set the path to local, and everything worked fine initially. However, after some time, the streaming encountered missing data (one or two items may have been deleted previously) which seemed to trigger a cleanup operation that treated local as the cache directory. This led to the unintended deletion of a lot of data. I'm not sure if this is a bug, or just intented.

wizyoung avatar Nov 15 '24 18:11 wizyoung

@wizyoung That does sound like unintended behavior. If you have a repro, can you open a separate issue please?

snarayan21 avatar Nov 15 '24 18:11 snarayan21

@snarayan21 Okay, I will try to reproduce this issue with the latest version next week and then provide more details in a new issue report.

wizyoung avatar Nov 15 '24 18:11 wizyoung

@wizyoung if you do manage to reproduce it, can you open up a new issue so we can track it better? This sounds pretty different than what the author of this issue is experiencing

ethantang-db avatar Nov 15 '24 19:11 ethantang-db

@naston wonder if you got a chance to reproduce this issue over the weekend?

ethantang-db avatar Nov 18 '24 18:11 ethantang-db

No. I was delayed due to some flash attention issues I was managing. Hopefully this weekend will yield results.

naston avatar Nov 21 '24 07:11 naston

I still get the same issue on v0.9.1 that does not show up with v0.9.0:

[rank7]: [rank7]: Traceback (most recent call last):
[rank7]: [rank7]:   File "/home/ubuntu/.local/lib/python3.10/site-packages/streaming/base/shared/prefix.py", line 235, in get_shm_prefix
[rank7]: [rank7]:     shm = SharedMemory(name, False)
[rank7]: [rank7]:   File "/home/ubuntu/.local/lib/python3.10/site-packages/streaming/base/shared/memory.py", line 49, in __init__
[rank7]: [rank7]:     shm = BuiltinSharedMemory(name, create, size)
[rank7]: [rank7]:   File "/usr/lib/python3.10/multiprocessing/shared_memory.py", line 104, in __init__
[rank7]: [rank7]:     self._fd = _posixshmem.shm_open(
[rank7]: [rank7]: FileNotFoundError: [Errno 2] No such file or directory: '/000000_locals'

Environment info:

# python==3.10
einops==0.8.0
torch==2.4.0 torchvision==0.19.0 torchaudio==2.4.0 --index-url https://download.pytorch.org/whl/cu124
mosaicml[nlp,tensorboard]==0.26.1
mosaicml-streaming==0.9.1
omegaconf==2.3.0
transformers==4.46.2
packaging
ninja
loguru
numpy==1.24.0
python-dotenv
flash_attn==2.6.3

I had to reset my machine in order to downgrade.

naston avatar Nov 27 '24 19:11 naston

I see, ok, I will try to reproduce this on my machine. Just want to confirm the following:

  1. were you running this in a clean v env?
  2. can you also check there's no outstanding shared memory objects on the machine? IE if there's anything under /dev/shm on linux.

ethantang-db avatar Dec 02 '24 16:12 ethantang-db

I am using Lambda cloud canned machines yes the env is clean and on reset/startup there are no shared memory objects on the machine.

naston avatar Dec 02 '24 20:12 naston

@naston is this still an issue for you?

and @ethantang-db @XiaohanZhangCMU , any luck repro-ing?

snarayan21 avatar Jan 04 '25 05:01 snarayan21

Happy New Year, team!

We're running into a similar issue, when using DDP on 1 Node of 8xH100s and across several nodes. We get this error message across several ranks:

[rank6]: Traceback (most recent call last):
[rank6]:   File "/home/sahil/.cache/pypoetry/virtualenvs/linum-v2-e1r7WG8o-py3.10/lib/python3.10/site-packages/streaming/base/shared/prefix.py", line 235, in get_shm_prefix
[rank6]:     shm = SharedMemory(name, False)
[rank6]:   File "/home/sahil/.cache/pypoetry/virtualenvs/linum-v2-e1r7WG8o-py3.10/lib/python3.10/site-packages/streaming/base/shared/memory.py", line 49, in __init__
[rank6]:     shm = BuiltinSharedMemory(name, create, size)
[rank6]:   File "/opt/conda/lib/python3.10/multiprocessing/shared_memory.py", line 103, in __init__
[rank6]:     self._fd = _posixshmem.shm_open(
[rank6]: FileNotFoundError: [Errno 2] No such file or directory: '/000001_locals'

[rank6]: During handling of the above exception, another exception occurred:
[rank6]: Traceback (most recent call last):
[rank6]:   File "/home/sahil/composer_train.py", line 172, in <module>
[rank6]:     main(config)
[rank6]:   File "/home/sahil/composer_train.py", line 43, in main
[rank6]:     train_dataloader, val_dataloader, model = create_dataloader_and_model(config=config)
[rank6]:   File "/home/sahil/composer_train.py", line 96, in create_dataloader_and_model
[rank6]:     train_dataloader = create_mds_dataloader(
[rank6]:   File "/home/sahil/data/mosaicml/mds_tc_latents_dataset.py", line 169, in create_mds_dataloader
[rank6]:     dataset = StreamingDataset(streams=streams, **config_dict)
[rank6]:   File "/home/sahil/.cache/pypoetry/virtualenvs/linum-v2-e1r7WG8o-py3.10/lib/python3.10/site-packages/streaming/base/dataset.py", line 521, in __init__
[rank6]:     self._shm_prefix_int, self._locals_shm = get_shm_prefix(streams_local, streams_remote,
[rank6]:   File "/home/sahil/.cache/pypoetry/virtualenvs/linum-v2-e1r7WG8o-py3.10/lib/python3.10/site-packages/streaming/base/shared/prefix.py", line 237, in get_shm_prefix
[rank6]:     raise RuntimeError(f'Internal error: shared memory prefix={prefix_int} was not ' +
[rank6]: RuntimeError: Internal error: shared memory prefix=1 was not registered by local leader. This may be because you specified different ``local`` parameters from different ranks.
----------End global rank 6 STDERR----------

We create the StreamingDataset out of a list of Streams, where we specify the remote and a local folder on our nvme drive to store the cache files. We prefer this over using the default behavior where local=None because our root disk is much smaller and slower (not-SSD).

StreamingDataset(streams=[Stream(remote='s3://dataset-1', local='/scratch/training_data/dataset-1'), ...]

When we run it as composer train.py --config <path_to_config> (i.e., with a single GPU) we have no issues. We get this issue when use composer -n 8 train.py --config <path_to_config> (i.e. with 8 GPUs on a single node).

Environment:

torch==2.4.1+cuda121
mosaicml==0.28.0
mosiacml-streaming==0.10.0

We've tried a few of the things mentioned in this thread, but none seem to fix the problem:

  • Downgrading Streaming Dataset -- We downgraded from 0.10.0 to 0.9.0.
  • Trying Different Launchers -- We tried using torchrun w/ the 8 GPUs on 1 Node.
  • Get rid of local path from Stream config -- As I mentioned, we want to specify the local to save to our nvme drive -- but for the sake of debugging, we tried using local=None.
  • Cleared the shared memory at /dev/shm -- We tried deleting everything in shared memory, calling the streaming.base.util.clean_stale_shared_memory(), and re-running with composer launcher.

Is there any additional information that I can provide to help support this investigation? And do ya'll have any other suggestions / quick things to try that we could test out?

schopra8 avatar Jan 07 '25 19:01 schopra8

@naston is this still an issue for you?

and @ethantang-db @XiaohanZhangCMU , any luck repro-ing?

Yes, I am still having this issue. However, downgrading to 0.9.0 fixes the issue for me.

naston avatar Jan 07 '25 21:01 naston

One thing we've noticed in debugging this further, within

https://github.com/mosaicml/streaming/blob/0b2227f552d76cf359e1364876167f46a021f293/streaming/base/shared/prefix.py#L218-L240

On rank zero we see both of these being triggered, which is unexpected -- https://github.com/mosaicml/streaming/blob/0b2227f552d76cf359e1364876167f46a021f293/streaming/base/shared/prefix.py#L225 https://github.com/mosaicml/streaming/blob/0b2227f552d76cf359e1364876167f46a021f293/streaming/base/shared/prefix.py#L235

I think we're expecting rank 0 to be the local leader, create the files, hit the barrier, and then allow the other ranks to access the files.

schopra8 avatar Jan 07 '25 21:01 schopra8

Turns out ^ was a red herring -- this is expected behavior.

The below function tries to create SharedMemory objects with create=False on all the ranks to see whether the shared memory files already exist. That's why we were seeing two invocations to SharedMemory on rank 0. It's not executing lines 225 and lines 235 on rank 0.

https://github.com/mosaicml/streaming/blob/0b2227f552d76cf359e1364876167f46a021f293/streaming/base/shared/prefix.py#L213-L216

schopra8 avatar Jan 07 '25 22:01 schopra8

@schopra8 can you confirm 0.9.0 also having the issue? These shared memory issues are likely created by PR 813, so hypothetically downgrading to 0.9.0 should resolve them.

XiaohanZhangCMU avatar Jan 07 '25 22:01 XiaohanZhangCMU

Confirming, when we downgrade to 0.9.0 we get a bunch of NCCL errors --

[rank6]: [rank6]:   File "/home/sahil/.cache/pypoetry/virtualenvs/linum-v2-e1r7WG8o-py3.10/lib/python3.10/site-packages/streaming/base/dataset.py", line 529, in __ini
t__
[rank6]: [rank6]:     self._shm_prefix_int, self._locals_shm = get_shm_prefix(streams_local, streams_remote,
[rank6]: [rank6]:   File "/home/sahil/.cache/pypoetry/virtualenvs/linum-v2-e1r7WG8o-py3.10/lib/python3.10/site-packages/streaming/base/shared/prefix.py", line 196, in
 get_shm_prefix
[rank6]: [rank6]:     dist.barrier()
[rank6]: [rank6]:   File "/home/sahil/.cache/pypoetry/virtualenvs/linum-v2-e1r7WG8o-py3.10/lib/python3.10/site-packages/torch/distributed/c10d_logger.py", line 79, in
 wrapper
[rank6]: [rank6]:     return func(*args, **kwargs)
[rank6]: [rank6]:   File "/home/sahil/.cache/pypoetry/virtualenvs/linum-v2-e1r7WG8o-py3.10/lib/python3.10/site-packages/torch/distributed/distributed_c10d.py", line 3
936, in barrier
[rank6]: [rank6]:     work = default_pg.barrier(opts=opts)
[rank6]: [rank6]: torch.distributed.DistBackendError: [6] is setting up NCCL communicator and retrieving ncclUniqueId from [0] via c10d key-value store by key '0', bu
t store->get('0') got error: Connection reset by peer

schopra8 avatar Jan 07 '25 22:01 schopra8

Looking back, I realize that I'm missing part of the traceback with 0.10.0. I've edited my earlier message to reflect this -- but here it as well for posterity --

[rank6]: Traceback (most recent call last):
[rank6]:   File "/home/sahil/.cache/pypoetry/virtualenvs/linum-v2-e1r7WG8o-py3.10/lib/python3.10/site-packages/streaming/base/shared/prefix.py", line 235, in get_shm_prefix
[rank6]:     shm = SharedMemory(name, False)
[rank6]:   File "/home/sahil/.cache/pypoetry/virtualenvs/linum-v2-e1r7WG8o-py3.10/lib/python3.10/site-packages/streaming/base/shared/memory.py", line 49, in __init__
[rank6]:     shm = BuiltinSharedMemory(name, create, size)
[rank6]:   File "/opt/conda/lib/python3.10/multiprocessing/shared_memory.py", line 103, in __init__
[rank6]:     self._fd = _posixshmem.shm_open(
[rank6]: FileNotFoundError: [Errno 2] No such file or directory: '/000001_locals'

Rank 6 is looking for 000001_locals and cannot open it, even though I see it is created by rank 0 and can be found at /dev/shm/000001_locals.

schopra8 avatar Jan 07 '25 22:01 schopra8

@XiaohanZhangCMU Looks like mosaicml-streamng==0.9.0 fails with torch == 2.4.1 but works for torch == 2.5.1. Training seems to be working on single node 8xH100s after downgrading mosaicml-streaming and upgrading torch.

One thing to note -- is that we get this warning --

[W107 22:46:05.913257855 ProcessGroupNCCL.cpp:4115] [PG ID 1 PG GUID 0 Rank 0]  using GPU 0 to perform barrier as devices used by this process are currently unknown. This can potentially cause a hang if this rank to GPU mapping is incorrect. Specify device_ids in barrier() to force use of a particular device,or call init_process_group() with a device_id

Thought I'd post it, in case helps with further debuggin.

schopra8 avatar Jan 07 '25 22:01 schopra8

Spoke too soon -- training worked once, but we tried again with 0.9.0 streaming and 2.5.1 torch -- and we're getting the same NCCL timeout issues at dist.barrier() --

https://github.com/mosaicml/streaming/blob/a7ed9de1d352e4c6b477a7c9d473e387703da2d5/streaming/base/shared/prefix.py#L196

schopra8 avatar Jan 07 '25 22:01 schopra8

@schopra8 based on the warning you're getting, I'm suspecting that it's possible you may not be setting up distributed training correctly. Composer's -n argument (see here) sets the number of processes to launch per rank, and so I'm suspecting that 8 processes may be getting launched on rank 0. Can you verify the output of torch.cuda.device_count()? This may be causing the shared mem access issues you're seeing.

@naston Would you be able to verify that your training script also works on single GPU? And that you are correctly configuring distributed training?

snarayan21 avatar Jan 07 '25 23:01 snarayan21

@snarayan21 Confirmed I ran torch.cuda.device_count() and got 8 back.

But I think you're right about the underlying issue. When I look at nvidia-smi I see 8 processes on rank 0, in addition to the processes on ranks 1-7:

Screenshot 2025-01-07 at 3 08 19 PM

schopra8 avatar Jan 07 '25 23:01 schopra8