How to support multi-threaded parallel data preprocessing?

Open YixinSong-e opened this issue 1 year ago • 11 comments

I want to pretrain an LLM on 2T tokens using llm-foundry, but the data preprocessing before training takes too long. Is there any way to accelerate it?

YixinSong-e • Jan 14 '24 06:01

Agree, this would be very useful.

Would it be possible to implement sharding for convert_dataset_json.py? Simply add extra parameters to specify the number of shards and the index of the current shard. The script could then be run on multiple machines, targeting the same output directory. I checked the code, but I am not sure yet how to do this with MDSWriter.
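
A minimal sketch of the shard-index idea, not code from convert_dataset_json.py: each worker keeps only the input lines whose position belongs to its shard. The names `iter_shard`, `num_shards`, and `shard_index` are hypothetical. Each worker would probably still need its own output subdirectory, since independent writers pointed at one directory are likely to produce colliding shard file names.

```python
from itertools import islice
from typing import Iterable, Iterator


def iter_shard(lines: Iterable[str], num_shards: int, shard_index: int) -> Iterator[str]:
    """Return an iterator over every num_shards-th line, starting at shard_index."""
    if not 0 <= shard_index < num_shards:
        raise ValueError("shard_index must be in [0, num_shards)")
    # islice(iterable, start, stop, step) walks the input lazily, so the
    # whole file never has to fit in memory.
    return islice(lines, shard_index, None, num_shards)


# Hypothetical input: 7 JSONL lines distributed over 3 workers.
lines = [f'{{"text": "doc {i}"}}' for i in range(7)]
shards = [list(iter_shard(lines, 3, k)) for k in range(3)]
```

Because the selection depends only on the line position, every machine can read the same input file and pick its own share without any coordination.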

MFajcik • Jan 15 '24 14:01

I think the example conversion script is perhaps not very good. One thing that helps a lot is to use the Datasets .map() to batch tokenize the dataset. I'm not sure how writing to the MDS file can be parallelized, but it probably can.
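
To illustrate the batched .map() idea: with batched=True, Datasets passes the mapped function a dict of column lists instead of a single row, so the tokenizer can be called once per batch. The sketch below shows only that function shape. `toy_tokenize` is a hypothetical whitespace stand-in for a real tokenizer, and the column name "text" is an assumption.

```python
from typing import Dict, List


def toy_tokenize(texts: List[str]) -> List[List[int]]:
    """Hypothetical stand-in for a real tokenizer: one id per whitespace token."""
    vocab: Dict[str, int] = {}
    return [[vocab.setdefault(tok, len(vocab)) for tok in text.split()] for text in texts]


def tokenize_batch(batch: Dict[str, List[str]]) -> Dict[str, List[List[int]]]:
    """Map a batch of rows (dict of column lists) to a batch of token ids."""
    # One tokenizer call for the whole batch instead of one call per row.
    return {"input_ids": toy_tokenize(batch["text"])}


batch = {"text": ["hello world", "hello again world"]}
print(tokenize_batch(batch))  # {'input_ids': [[0, 1], [0, 2, 1]]}
```

With Hugging Face Datasets, a function of this shape would be passed as dataset.map(tokenize_batch, batched=True), optionally with num_proc to spread the batches over several processes.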

Also, there is a bug in tokenizers that might make it way slower than you would like - see https://github.com/huggingface/tokenizers/pull/1413.

rlrs • Jan 16 '24 09:01

The text to MDS conversion script (https://github.com/mosaicml/llm-foundry/blob/main/scripts/data_prep/convert_text_to_mds.py) is parallelized, is that what you are looking for (or at least a good starting point)?

dakinggg • Jan 16 '24 23:01

Thanks, I will look into it.

YixinSong-e • Jan 17 '24 04:01

Isn't it enough to just run the script in parallel and merge the MDS shards with this method? https://github.com/mosaicml/llm-foundry/blob/f43d1cfb1ef8f38ca90fee68b0643f45d6d5b2da/llmfoundry/utils/data_prep_utils.py#L29

Currently, I am trying it like this.

I have a large JSONL file. I used split -l to split it into as many files as there are processes. Then I call convert_dataset_json.py independently on each of these files, which gives one output folder per process; all of these folders live under a common output_root_folder.

Lastly, I hope it will be enough to just call the merge method mentioned above on output_root_folder.
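
The split-and-convert steps can be sketched in Python as follows. This is an illustration under assumptions, not the script's own interface: `split_jsonl` and `convert_all` are hypothetical helpers, and `convert_one` stands for whatever runs convert_dataset_json.py on one chunk and writes into its own folder under output_root_folder. The final merge is left to the linked helper in data_prep_utils.py.

```python
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path
from typing import Callable, List


def split_jsonl(src: Path, out_dir: Path, num_chunks: int) -> List[Path]:
    """Distribute the lines of a JSONL file round-robin over num_chunks files."""
    out_dir.mkdir(parents=True, exist_ok=True)
    paths = [out_dir / f"chunk_{i:03d}.jsonl" for i in range(num_chunks)]
    handles = [p.open("w", encoding="utf-8") for p in paths]
    try:
        with src.open(encoding="utf-8") as f:
            for n, line in enumerate(f):
                handles[n % num_chunks].write(line)
    finally:
        for h in handles:
            h.close()
    return paths


def convert_all(chunks: List[Path], output_root: Path,
                convert_one: Callable[[Path, Path], None]) -> List[Path]:
    """Run convert_one(chunk, out_folder) for each chunk in a separate process.

    convert_one must be a picklable, module-level function.
    """
    out_dirs = [output_root / c.stem for c in chunks]
    with ProcessPoolExecutor(max_workers=len(chunks)) as pool:
        # list() waits for all workers and re-raises any worker exception here.
        list(pool.map(convert_one, chunks, out_dirs))
    return out_dirs
```

Unlike split -l, the round-robin split does not need to know the line count in advance, and it produces chunks of nearly equal size in one pass over the file.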

(Will update once the process is finished.)

MFajcik • Jan 17 '24 11:01

Yes @MFajcik, that should work!

dakinggg • Jan 19 '24 20:01

It does work! Preprocessing was done in no time. Training is running right now. Thanks for the hint!

MFajcik • Jan 20 '24 11:01

I changed ConcatTokensDataset.__iter__ to this:

def __iter__(self) -> Iterable[Dict[str, bytes]]:
    buffer = []
    # self.write_batch_size = 10_000
    # Ceiling division, so no empty trailing batch is sent to the tokenizer.
    num_batches = -(-self.hf_dataset.num_rows // self.write_batch_size)
    for i in range(num_batches):
        # Slicing the dataset returns a dict of columns for the whole batch.
        batch = self.hf_dataset[
            i * self.write_batch_size : (i + 1) * self.write_batch_size
        ]
        # Tokenize the whole batch in a single call.
        encoded_batch = self.tokenizer(
            batch["text"], truncation=False, padding=False
        )
        for iids in encoded_batch["input_ids"]:
            buffer = buffer + self.bos_tokens + iids + self.eos_tokens
            while len(buffer) >= self.max_length:
                concat_sample = buffer[: self.max_length]
                buffer = buffer[self.max_length :] if self.should_wrap else []
                yield {
                    # convert to bytes to store in MDS binary format
                    "tokens": np.asarray(concat_sample).tobytes(),
                    "num_tokens": len(concat_sample),
                }

Processing 7B tokens takes around 20 hours with the original code and about 30 minutes with this change. It is not very robust, though, and it doesn't scale very well: a fast tokenizer hangs after a while on very long texts, and using more than 16 threads doesn't seem to give any further speedup.
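
The concatenation logic inside the loop can be looked at in isolation. The sketch below is a pure-Python restatement with plain lists instead of the class attributes; the function name `concat_tokens` is hypothetical. It also shows what should_wrap changes: with wrapping, leftover tokens carry over into the next sample; without it, they are dropped.

```python
from typing import Iterable, Iterator, List


def concat_tokens(docs: Iterable[List[int]], max_length: int, bos: List[int],
                  eos: List[int], should_wrap: bool = True) -> Iterator[List[int]]:
    """Pack tokenized documents into samples of exactly max_length tokens."""
    buffer: List[int] = []
    for ids in docs:
        buffer.extend(bos + ids + eos)
        while len(buffer) >= max_length:
            yield buffer[:max_length]
            # Keep the remainder for the next sample only when wrapping.
            buffer = buffer[max_length:] if should_wrap else []


docs = [[1, 2, 3, 4, 5], [6, 7]]
print(list(concat_tokens(docs, max_length=4, bos=[], eos=[0])))
# [[1, 2, 3, 4], [5, 0, 6, 7]]
print(list(concat_tokens(docs, max_length=4, bos=[], eos=[0], should_wrap=False)))
# [[1, 2, 3, 4]]
```

In both modes, tokens still sitting in the buffer when the input ends are never emitted, which matches the behavior of the loop above.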

Riccorl • Feb 13 '24 13:02

Thanks for your update! Did you modify any other files to enable multithreading?

YixinSong-e • Feb 18 '24 14:02

Yes sorry, I also removed os.environ["TOKENIZERS_PARALLELISM"] = "false" from ConcatTokensDataset.__init__.
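
For context, TOKENIZERS_PARALLELISM is an environment variable read by the Hugging Face tokenizers library, and setting it to "false" turns off its internal parallelism. An alternative to deleting the line is to set the variable explicitly. A minimal sketch, with the hypothetical helper name `enable_tokenizer_parallelism`; calling it before the tokenizer is first used is an assumption made here to be on the safe side.

```python
import os


def enable_tokenizer_parallelism() -> None:
    """Allow the fast tokenizer to parallelize batch encoding."""
    # Assumption: set this before the tokenizer is first used in the process.
    os.environ["TOKENIZERS_PARALLELISM"] = "true"


enable_tokenizer_parallelism()
print(os.environ["TOKENIZERS_PARALLELISM"])  # true
```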

Riccorl • Feb 19 '24 09:02

It helps a lot. I can process 100B tokens within 7 hours with your code! :)

YixinSong-e • Feb 19 '24 13:02