Megatron-DeepSpeed
WIP: distributed terashuf
This is solid enough that I'll go ahead and post a WIP PR. It's based on https://github.com/bigscience-workshop/Megatron-DeepSpeed/pull/60, so the diff will look noisy until that PR is merged. Most of the changes are in the distshuf file, which is linked below.
I don't know how much time I will have to polish this up, but I have a prototype MPI-enabled "terashuf". This uses an algorithm similar to terashuf, where contiguous records from the source file are shuffled in segments, and then one randomly picks leading records from the shuffled segments to put together the final shuffled file.
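The two-phase idea above can be sketched in a single process as follows. This is only an illustration, not the code in distshuf.py: the function name `segment_shuffle` and its parameters are hypothetical, and everything is held in one Python list. It also assumes that each pick chooses a segment with probability proportional to the number of records that segment still holds, which is what makes the result a uniform shuffle.

```python
import random


def segment_shuffle(records, segment_size, seed=0):
    """Shuffle `records` terashuf-style (hypothetical helper for illustration)."""
    rng = random.Random(seed)

    # Phase 1: cut the input into contiguous segments and shuffle each one.
    segments = []
    for start in range(0, len(records), segment_size):
        segment = records[start:start + segment_size]
        rng.shuffle(segment)
        segments.append(segment)

    # Phase 2: repeatedly pick a segment, weighted by how many records it
    # still holds, and take that segment's leading unread record.
    remaining = [len(s) for s in segments]
    heads = [0] * len(segments)
    total = len(records)
    shuffled = []
    while total > 0:
        pick = rng.randrange(total)
        for i, count in enumerate(remaining):
            if pick < count:
                break
            pick -= count
        shuffled.append(segments[i][heads[i]])
        heads[i] += 1
        remaining[i] -= 1
        total -= 1
    return shuffled


if __name__ == "__main__":
    print(segment_shuffle(list(range(10)), segment_size=4, seed=101))
```

Because phase 2 only ever consumes the front of each segment, the segments can be streamed from files or, as in this prototype, held in distributed memory.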
This prototype currently stores the shuffled segments in memory (rather than files), and so it requires one to be able to load the full file into distributed memory. Currently each rank reads a portion of the source file into memory, shuffles that section, and then ranks exchange lines with each other in order to write out the file in contiguous chunks.
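As a rough sketch of the "each rank reads a portion of the source file" step, the snippet below splits a file into contiguous byte ranges and assigns each line to the rank whose range contains the line's first byte. The even split, the newline-alignment rule, and the names `rank_byte_range` and `read_rank_lines` are assumptions made for illustration; distshuf.py may divide the file differently. No MPI is used here: `rank` and `num_ranks` are plain integers that would normally come from the communicator.

```python
import os


def rank_byte_range(file_size, rank, num_ranks):
    """Evenly split [0, file_size) into num_ranks contiguous byte ranges."""
    base, extra = divmod(file_size, num_ranks)
    start = rank * base + min(rank, extra)
    end = start + base + (1 if rank < extra else 0)
    return start, end


def read_rank_lines(path, rank, num_ranks):
    """Return the lines whose first byte falls inside this rank's range."""
    start, end = rank_byte_range(os.path.getsize(path), rank, num_ranks)
    lines = []
    with open(path, "rb") as f:
        if start > 0:
            # Step back one byte and read through the next newline. This
            # skips any line that began before `start` (an earlier rank
            # owns it) and leaves us at the first line starting at or
            # after `start`.
            f.seek(start - 1)
            f.readline()
        # Read every line that begins before `end`, even if it runs past it.
        while f.tell() < end:
            line = f.readline()
            if not line:
                break
            lines.append(line)
    return lines
```

With this rule every line is read by exactly one rank, so concatenating the per-rank lists in rank order reproduces the file.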
It can shuffle the oscar.jsonl file in about 10 minutes using 80 procs on 8 nodes on my system.
2021-09-01T12:19:15: 0: Wrote 1319979521428 of 1320971843503 bytes (99.92%) in 343 secs, 3668.675 MB/s
2021-09-01T12:19:20: 0: Waiting for ranks to finish ...
2021-09-01T12:19:20: Seconds to write file: 348.45524168014526
real 6m25.041s
Just posting this notice in case others need to shuffle a large JSON file in a hurry.
https://github.com/adammoody/Megatron-DeepSpeed/blob/distshuf/tools/distshuf.py
It currently requires mpi4py and an mpi4py-enabled DistData class:
https://github.com/adammoody/Megatron-DeepSpeed/blob/distshuf/megatron/data/distdata_mpi.py
I first attempted a torch.distributed version, but hit some problems. I haven't yet gone back to see whether a torch.distributed equivalent is easy.
For speed and correctness, both the input and output files must be on a parallel file system like Lustre/GPFS.
Example command:
srun -n 80 -N 8 python3 tools/distshuf.py \
--input /gpfs/path/to/oscar.jsonl \
--output /gpfs/path/to/oscarshuf.jsonl \
--seed 101
Update 2021-09-02: Took a pass at using numpy to optimize performance a bit more. The tool currently prints a timing breakdown of its major operations as it goes. For one step picked at random, the seconds spent in each operation look like:
2021-09-02T13:05:07: Wrote 972550634051 of 1320971843503 bytes (73.62%) in 156 secs, 5920.115 MB/s
2021-09-02T13:05:11: bcast 0.5990447998046875
2021-09-02T13:05:11: ident 0.46723270416259766
2021-09-02T13:05:11: pack 1.1431987285614014
2021-09-02T13:05:11: exch 6.822070360183716
2021-09-02T13:05:11: write 3.847602367401123
In each step in this particular run, each rank gathers 100_000 samples, which are each about 5000 bytes on average. This is using 320 procs on 8 nodes. So the total data being processed in each step is about 100_000 * 5000 * 320 bytes = 149 GiB. The data movement portions are pack, exchange, and write. Converting those component times in seconds to bandwidths:
2021-09-02T13:05:11: pack 1.1431987285614014 = 130 GiB/s
2021-09-02T13:05:11: exch 6.822070360183716 = 21.8 GiB/s
2021-09-02T13:05:11: write 3.847602367401123 = 38.7 GiB/s
total effective write bandwidth: 149 GiB / (1.14 + 6.82 + 3.85) sec = 12.6 GiB/s
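For anyone who wants to redo this arithmetic with timings from another run, the conversion can be reproduced with a few lines of Python. The sample count, average sample size, rank count, and timings are the ones quoted above; the variable names are just for illustration.

```python
GIB = 2**30

samples_per_rank = 100_000
avg_sample_bytes = 5000      # approximate average from this run
num_ranks = 320

# Total bytes moved in one step, and the same figure in GiB.
step_bytes = samples_per_rank * avg_sample_bytes * num_ranks
step_gib = step_bytes / GIB

# Seconds reported by the tool for the data movement operations.
timings = {
    "pack": 1.1431987285614014,
    "exch": 6.822070360183716,
    "write": 3.847602367401123,
}

# Per-operation bandwidth, and the effective bandwidth when the three
# operations run back to back.
bandwidth = {name: step_gib / secs for name, secs in timings.items()}
effective = step_gib / sum(timings.values())

print(f"data per step: {step_gib:.1f} GiB")
for name, gib_per_sec in bandwidth.items():
    print(f"{name:5s} {gib_per_sec:6.1f} GiB/s")
print(f"effective {effective:.1f} GiB/s")
```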
Based on system hardware speeds, there should be room for improvement in all of those (pack would be bottlenecked by memory bandwidth, exchange by network bandwidth, and write by file system write bandwidth). That might be worth doing for larger input files, but I'm pretty content with the current performance.