[BUG] Data parallel training freezes due to different number of batches
Bug description
In data parallel training, we start multiple workers, each with its own dataloader initialization, and train with Horovod. After each batch update, the parameters are synced. The Merlin dataloader yields a different number of batches depending on the selected rank. As a result, some workers finish the training loop while others are still training, which causes Horovod to freeze.
import cudf
import os
import merlin.models.tf.dataset as tf_dataloader
import nvtabular as nvt
os.makedirs('./test/', exist_ok=True)
df = cudf.DataFrame({
    'col1': range(0,9000000)
})
df.to_parquet('./test/part_1.parquet')
df = cudf.DataFrame({
    'col1': range(0,10000000)
})
df.to_parquet('./test/part_2.parquet')
df = cudf.DataFrame({
    'col1': range(0,11000000)
})
df.to_parquet('./test/part_3.parquet')
df = cudf.DataFrame({
    'col1': range(0,12000000)
})
df.to_parquet('./test/part_4.parquet')
ds = nvt.Dataset('./test/*.parquet', part_size='100MB')
for i in range(4):
    train_dl = tf_dataloader.BatchedDataset(
        ds,
        batch_size = 1024*16,
        shuffle=True,
        drop_last=True,
        cat_names=['col1'],
        global_size=4,
        global_rank=i,
    )
    print(len(train_dl))
Output:
549
610
671
732
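These four numbers are consistent with each rank reading exactly one of the four files and dropping the last partial batch. The sketch below only checks that arithmetic. The one-file-per-rank mapping is an assumption inferred from the output, and `expected_batches` is a hypothetical helper, not part of the Merlin API.

```python
# Row counts of the four parquet files written in the reproduction above.
rows_per_file = [9_000_000, 10_000_000, 11_000_000, 12_000_000]
batch_size = 1024 * 16


def expected_batches(num_rows: int, batch_size: int, drop_last: bool = True) -> int:
    """Number of batches for num_rows rows (hypothetical helper, not a Merlin API)."""
    full, remainder = divmod(num_rows, batch_size)
    # With drop_last=True the trailing partial batch is discarded.
    return full if drop_last or remainder == 0 else full + 1


# Assumption: with global_size=4, rank i ends up reading only file i.
batches_per_rank = [expected_batches(n, batch_size) for n in rows_per_file]
print(batches_per_rank)  # [549, 610, 671, 732]
```

If that assumption holds, the imbalance comes from the uneven file sizes rather than from the batching itself.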
@bschifferer will try to set the seed before the dataloader, and check it out.
@jperez999 I provided an example with Merlin Models: https://github.com/NVIDIA-Merlin/models/pull/778
I added the seed_fn:
train_dl = tf_dataloader.BatchedDataset(
    train,
    batch_size = batch_size,
    shuffle=True,
    drop_last=True,
    global_size=2,
    global_rank=hvd.rank(),
    seed_fn=seed_fn
)
print(len(train_dl))
When I check the print statement, I get the following split:
[1,0]<stdout>:2
[1,0]<stdout>:0
[1,1]<stdout>:2
[1,1]<stdout>:1
[1,0]<stdout>:322
[1,1]<stdout>:104
@jperez999 @benfred @rjzamora It looks like batching isn't splitting the datasets correctly? Is this particular to multi-GPU, or does the problem also occur on a single GPU without showing an error?
I'm trying to figure out whether this has always been broken or whether it's a recent change.
This is not the correct way to use the Merlin dataloader with Horovod, and explaining why requires a lot more background. You should never create dataloaders in a for loop. When working with Horovod, follow the example in the NVTabular tests: https://github.com/NVIDIA-Merlin/NVTabular/blob/main/tests/unit/loader/test_tf_dataloader.py#L537 Notice that to use Horovod you need to launch through the horovodrun subprocess. You also need to use the supplied wrapper, which adds the variables MPI needs to run under the hood. It is located here: https://github.com/NVIDIA-Merlin/NVTabular/blob/main/examples/multi-gpu-movielens/hvd_wrapper.sh
@bschifferer can you follow up and make sure we're using Horovod correctly? We'll probably need to find a way to make it clear to our customers how to set this up properly, even if that just means highlighting the links that @jperez999 shared.
@jperez999 Is there a way to produce an equal number of batches so that the workload is balanced across workers? Although NVTabular seems to produce equal-sized batches in tf_trainer.py, the number of batches differs between workers, hence the need for hvd.join() (https://github.com/NVIDIA-Merlin/NVTabular/blob/main/examples/multi-gpu-movielens/tf_trainer.py#L142).
for batch, (examples, labels) in enumerate(train_dataset_tf):
    loss_value = training_step(examples, labels, batch == 0)
print(f"There are {batch} batches in worker {hvd.local_rank()}.")
#hvd.join()
[1,1]<stdout>:There are 548 batches in worker 1.
[1,0]<stdout>:There are 670 batches in worker 0.
Without hvd.join(), worker 1 will terminate before worker 0 and horovod will crash. Although it does work with hvd.join(), it doesn't seem ideal to have one worker sit idle while the other one keeps processing remaining batches.
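To put a number on the idle time, here is a rough sketch using the two counts printed above. It is plain arithmetic, not Horovod code, and `idle_steps_with_join` is a hypothetical helper name.

```python
# Batch counts printed by the two workers above.
batches_per_worker = {0: 670, 1: 548}


def idle_steps_with_join(counts: dict) -> dict:
    """Steps each worker spends waiting for the slowest worker (hypothetical helper)."""
    longest = max(counts.values())
    return {rank: longest - n for rank, n in counts.items()}


idle = idle_steps_with_join(batches_per_worker)
print(idle)  # {0: 0, 1: 122}
```

So with hvd.join(), worker 1 waits for roughly 122 steps while worker 0 finishes.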
One workaround I have is to repartition the dataset with something like:
train = Dataset(output_path / "train" / "*.parquet")
ddf = train.to_ddf().repartition(npartitions=hvd.size())
train = Dataset(ddf, schema=train.schema)
but I'm wondering if there is a better way to do this in the dataloader.
Edit: I ran horovodrun -np 2 sh hvd_wrapper.sh python tf_trainer.py --dir_in $BASE_DIR --batch_size 16384 using tf_trainer.py and hvd_wrapper.sh.
So I just ran this unit test: pytest tests/unit/loader/test_tf_dataloader.py::test_horovod_multigpu, and it runs as expected. There are five partitions spread across two workers, so naturally one worker gets more partitions than the other. The dataloader is designed this way. What can happen is that, depending on the batch size, each partition is sliced into many much smaller pieces. A single partition could yield 100+ batches, so if one worker has one more partition than the others, it ends up with 100 extra batches.
Remember that the split does not happen based on batches. It happens based on partitions, and those partitions are subsequently broken down into chunks of batch_size. You can try to repartition the dataset so that each worker gets the same number of partitions, but even then you would need to guarantee that the partitions are all the same size. This is a gotcha we have known about since the creation of the dataloader, because partitions are not merged across files. Let's say you have a dataset with 2 files. File one has 225 rows and your partition size is 50 rows, so that first file has 5 partitions (even though the last partition is only half full). File two has only 150 rows, so it has 3 partitions. You then find yourself in a situation where one worker gets four full partitions and the other gets three full and one partial. Now extrapolate that to a scenario where every file ends with a half-full partition, and you can see how you end up with non-full partitions littered across your dataset. At present Dask has no way to fix this issue: it can't continue one partition across files. Another thing to remember is that if you have a dataset with 10 partitions and you repartition to 1000, you may also want to change the parts_per_chunk variable so that you can create bigger chunks of data.
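The two-file example can be sketched in a few lines of plain Python. This is only an illustration: `partition_sizes` and `split_contiguously` are hypothetical helpers, and the contiguous assignment of partitions to workers is an assumption, not necessarily what the dataloader does internally.

```python
def partition_sizes(rows_per_file, part_rows):
    """Cut each file into partitions of at most part_rows rows.

    Partitions are never merged across files, so every file can end
    with a partially filled partition.
    """
    sizes = []
    for rows in rows_per_file:
        full, remainder = divmod(rows, part_rows)
        sizes.extend([part_rows] * full)
        if remainder:
            sizes.append(remainder)
    return sizes


def split_contiguously(sizes, num_workers):
    """Assumption: workers receive contiguous, near-equal counts of partitions."""
    base, extra = divmod(len(sizes), num_workers)
    assigned, start = [], 0
    for rank in range(num_workers):
        stop = start + base + (1 if rank < extra else 0)
        assigned.append(sizes[start:stop])
        start = stop
    return assigned


sizes = partition_sizes([225, 150], part_rows=50)
per_worker = split_contiguously(sizes, num_workers=2)
rows_per_worker = [sum(parts) for parts in per_worker]

print(sizes)            # [50, 50, 50, 50, 25, 50, 50, 50]
print(per_worker)       # [[50, 50, 50, 50], [25, 50, 50, 50]]
print(rows_per_worker)  # [200, 175]
```

Both workers hold four partitions, yet one sees 200 rows and the other 175, so an equal partition count alone does not give an equal batch count.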