datasets
datasets copied to clipboard
AutoSharding IterableDataset's when num_workers > 1
Feature request
Minimal Example
import torch
from datasets import IterableDataset
d = IterableDataset.from_file(<file_name>)
dl = torch.utils.data.dataloader.DataLoader(d,num_workers=3)
for sample in dl:
print(sample)
Warning: Too many dataloader workers: 2 (max is dataset.n_shards=1). Stopping 1 dataloader workers. To parallelize data loading, we give each process some shards (or data sources) to process. Therefore it's unnecessary to have a number of workers greater than dataset.n_shards=1. To enable more parallelism, please split the dataset in more files than 1.
Expected Behavior: Dataset is sharded each cpu uses subset (contiguously - so you can do checkpoint loading/saving)
Motivation
I have a lot of unused cpu's and would like to be able to shard iterable datasets with pytorch's dataloader when num_workers > 1. This is for a very large single file. I am aware that we can use the split_dataset_by_node
to ensure that each node (for distributed) gets different shards, but we should extend it so that this also continues for multiple workers.
Your contribution
If someone points me to what needs to change, I can create a PR.
For this to be possible, we would have to switch from the "Streaming" Arrow format to the "Random Access" (IPC/Feather) format, which allows reading arbitrary record batches (explained here). We could then use these batches to construct shards.
@lhoestq @albertvillanova Do you think this use case is worth the switch? Also, we currently shard files, not inner row groups/chunks. Should we also support sharding row groups (e.g. if the number of input files is 1)?
PS: I don't expect significant speed-up for local, uncompressed Arrow files.
Alternatively we could support multiprocessing map for iterable datasets and let the user do the CPU intensive task there ?
This way it would work on arrow data but also on any iterable dataset
For this to be possible, we would have to switch from the "Streaming" Arrow format to the "Random Access" (IPC/Feather) format, which allows reading arbitrary record batches (explained here). We could then use these batches to construct shards.
@lhoestq @albertvillanova Do you think this use case is worth the switch? Also, we currently shard files, not inner row groups/chunks. Should we also support sharding row groups (e.g. if the number of input files is 1)?
PS: I don't expect significant speed-up for local, uncompressed Arrow files.
Could you explain why you'd need to change the arrow format?
When we use streaming datasets we simply determine the number of worker shards and then add some modulo logic at the appropriate place. Worst case scenario, you'd skip streaming entries according to the number of shards.
For PyTorch, I'd be happy to provide an implementation or a sketch thereof, if you point me toward what the testing requirements would be for such a PR.
Could you explain why you'd need to change the arrow format?
This way workers have random access to the location of the file where its dataset subset starts. Currently we're using the Arrow streaming format which doesn't include the metadata of the record batches offsets. This is needed here to efficiently split a dataset made of one single file.
Could you explain why you'd need to change the arrow format?
This way workers have random access to the location of the file where its dataset subset starts. Currently we're using the Arrow streaming format which doesn't include the metadata of the record batches offsets. This is needed here to efficiently split a dataset made of one single file.
I guess I don't understand why you'd need to subset the dataset in the first place. It seems sufficient to figure out how to offset or skip rows.
For instance, using pyArrow, you could use RecordBatchStreamReader to zero-copy iterate over records with read_next_batch and then only initiate the next step for records modulo worker shard. That's one way to do it, where of course you'd need to account for gpu sharding as well.
Otherwise, how did you implement worker/node/GPU sharding for iterable/streaming data where you do not have index information or prior splits (e.g. files)?
For instance, using pyArrow, you could use RecordBatchStreamReader to zero-copy iterate over records with read_next_batch and then only initiate the next step for records modulo worker shard.
That works indeed ! And what we meant is that you can make it even faster to instantiate. Indeed using RecordBatchStreamReader you need to get the list of all the record batches in each worker, whereas you could just get the list of record batches per worker if you use the record batches locations in the Arrow IPC file footer. This would be especially appreciated to have a fast instantiation in case you have tens of thousands of Arrow files for example.
Any recent updates on this ?
I would also appreciate this feature