
WIP: distributed terashuf


This is solid enough that I'll go ahead and post a WIP PR. It's based on https://github.com/bigscience-workshop/Megatron-DeepSpeed/pull/60, so the diff will look noisy until that PR is merged. Most of the changes are in the distshuf file, which I've linked below.

I don't know how much time I will have to polish this up, but I have a prototype MPI-enabled "terashuf". It uses an algorithm similar to terashuf: contiguous records from the source file are shuffled within segments, and then leading records are picked at random from the shuffled segments to assemble the final shuffled file.
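For intuition, here is a minimal single-process sketch of that scheme in plain Python. It is only an illustration of the idea, not code from this PR; the function name and default segment count are made up.

```python
import random

def segment_shuffle(lines, num_segments=16, seed=101):
    """Toy illustration of the terashuf-style idea: shuffle contiguous
    segments of the input, then repeatedly take a leading record from a
    randomly chosen segment to assemble the final shuffled output."""
    rng = random.Random(seed)

    # Split the input into contiguous segments and shuffle each one.
    seg_size = max(1, (len(lines) + num_segments - 1) // num_segments)
    segments = [lines[i:i + seg_size] for i in range(0, len(lines), seg_size)]
    for seg in segments:
        rng.shuffle(seg)

    # Repeatedly pick a random non-empty segment and take its next record.
    out = []
    while segments:
        i = rng.randrange(len(segments))
        # The segment is already shuffled, so popping from either end works.
        out.append(segments[i].pop())
        if not segments[i]:
            del segments[i]
    return out
```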

This prototype currently stores the shuffled segments in memory (rather than in files), so the full file must fit in distributed memory. Each rank reads a portion of the source file into memory and shuffles that section, and then the ranks exchange lines with one another so that each can write out a contiguous chunk of the output file.
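A rough mpi4py sketch of that per-rank flow is below. It is not the distshuf.py implementation: it ignores lines that straddle byte-range boundaries and uses a cruder line-to-rank assignment than the real tool, so treat it only as a picture of the data movement.

```python
import random
from mpi4py import MPI

def shuffle_file(input_path, output_path, seed=101):
    """Simplified sketch: read a slice, shuffle it locally, exchange lines
    with other ranks, then write one contiguous chunk of the output."""
    comm = MPI.COMM_WORLD
    rank, size = comm.Get_rank(), comm.Get_size()
    rng = random.Random(seed + rank)

    # 1. Read this rank's byte range and split it into lines.  A real tool
    #    also has to repair lines that straddle the range boundaries.
    with open(input_path, "rb") as f:
        f.seek(0, 2)
        nbytes = f.tell()
        lo, hi = rank * nbytes // size, (rank + 1) * nbytes // size
        f.seek(lo)
        lines = f.read(hi - lo).splitlines(keepends=True)

    # 2. Shuffle the local segment in memory.
    rng.shuffle(lines)

    # 3. Scatter lines to randomly chosen ranks (personalized all-to-all),
    #    then shuffle again so the receive order is randomized too.
    outgoing = [[] for _ in range(size)]
    for line in lines:
        outgoing[rng.randrange(size)].append(line)
    lines = [line for bucket in comm.alltoall(outgoing) for line in bucket]
    rng.shuffle(lines)

    # 4. Prefix-sum the local byte counts to get this rank's file offset,
    #    then write one contiguous chunk of the shuffled output.
    nlocal = sum(len(line) for line in lines)
    offset = comm.exscan(nlocal, op=MPI.SUM) or 0
    fh = MPI.File.Open(comm, output_path, MPI.MODE_CREATE | MPI.MODE_WRONLY)
    fh.Write_at_all(offset, b"".join(lines))
    fh.Close()
```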

It can shuffle the oscar.jsonl file in about 10 minutes using 80 procs on 8 nodes on my system.

2021-09-01T12:19:15: 0: Wrote 1319979521428 of 1320971843503 bytes (99.92%) in 343 secs, 3668.675 MB/s
2021-09-01T12:19:20: 0: Waiting for ranks to finish ...
2021-09-01T12:19:20: Seconds to write file: 348.45524168014526

real	6m25.041s

Just posting this notice in case others need to shuffle a large JSONL file in a hurry.

https://github.com/adammoody/Megatron-DeepSpeed/blob/distshuf/tools/distshuf.py

It currently requires mpi4py and an mpi4py-enabled DistData class.

https://github.com/adammoody/Megatron-DeepSpeed/blob/distshuf/megatron/data/distdata_mpi.py

I first attempted a torch.distributed version, but hit some problems. I haven't yet gone back to see if a torch.dist equivalent is easy.

For speed and correctness, both the input and output files must be on a parallel file system like Lustre/GPFS.

Example command:

srun -n 80 -N 8 python3 tools/distshuf.py \
       --input /gpfs/path/to/oscar.jsonl \
       --output /gpfs/path/to/oscarshuf.jsonl \
       --seed 101

Update 2021-09-02: I took a pass at using numpy to optimize performance a bit more. The tool prints a timing breakdown of its major operations as it goes; the time in seconds for each step of one phase, picked at random, looks like this:

2021-09-02T13:05:07: Wrote 972550634051 of 1320971843503 bytes (73.62%) in 156 secs, 5920.115 MB/s
2021-09-02T13:05:11:   bcast 0.5990447998046875
2021-09-02T13:05:11:   ident 0.46723270416259766
2021-09-02T13:05:11:   pack  1.1431987285614014
2021-09-02T13:05:11:   exch  6.822070360183716
2021-09-02T13:05:11:   write 3.847602367401123
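For context, "pack" is the step where each rank bundles the lines bound for every other rank into contiguous send buffers before the exchange. A hypothetical numpy version of that step (the function name and buffer layout are mine, not taken from distshuf.py) could look like:

```python
import numpy as np

def pack_lines(lines, dest_ranks, num_ranks):
    """Hypothetical pack step: concatenate the bytes headed to each rank
    into one contiguous buffer, ordered by destination rank, plus the
    per-rank byte counts that an alltoallv-style exchange needs."""
    buckets = [[] for _ in range(num_ranks)]
    for line, dest in zip(lines, dest_ranks):
        buckets[dest].append(line)

    counts = np.array([sum(len(l) for l in b) for b in buckets], dtype=np.int64)
    payload = b"".join(b"".join(b) for b in buckets)
    sendbuf = np.frombuffer(payload, dtype=np.uint8)
    return sendbuf, counts
```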

In each step of this particular run, each rank gathers 100_000 samples averaging about 5000 bytes each, using 320 procs on 8 nodes. So the total data processed in each step is about 100_000 * 5000 * 320 bytes = 149 GiB. The data movement portions are pack, exchange, and write. Converting those component times in seconds to bandwidths:

2021-09-02T13:05:11:   pack  1.1431987285614014  = 130 GiB/s
2021-09-02T13:05:11:   exch  6.822070360183716 = 21.8 GiB/s
2021-09-02T13:05:11:   write 3.847602367401123 = 38.8 GiB/s
total effective write bandwidth: 149 GiB / (1.14 + 6.82 + 3.85) sec = 12.6 GiB/s
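If anyone wants to redo that arithmetic, it is just the per-step data volume divided by each component time; the snippet below uses the approximate figures from this run.

```python
# Approximate per-step data volume for this run.
samples_per_rank = 100_000
bytes_per_sample = 5_000      # average line size
ranks = 320
step_gib = samples_per_rank * bytes_per_sample * ranks / 2**30   # ~149 GiB

# Component times (seconds) from the breakdown above, rounded.
times = {"pack": 1.143, "exch": 6.822, "write": 3.848}
for name, secs in times.items():
    print(f"{name:5s} {step_gib / secs:6.1f} GiB/s")

print(f"total {step_gib / sum(times.values()):6.1f} GiB/s effective")
```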

Based on system hardware speeds, there should be room for improvement in all of those (pack would be bottlenecked by memory bandwidth, exchange by network bandwidth, and write by file system write bandwidth). That might be worth doing for larger input files, but I'm pretty content with the current performance.

adammoody · Sep 02 '21 20:09