Expanding streaming capabilities
Some ideas for features that could be useful when working with large datasets in streaming mode.
filter for IterableDataset
Adding filtering to streaming datasets would be useful in several scenarios:
- filter a dataset with many languages for a subset of languages
- filter a dataset for specific licenses
- other custom logic to get a subset

The only way to achieve this at the moment is, I think, to write a custom loading script and implement the filters there.
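For context, here is a minimal sketch of the manual workaround without a built-in filter, consuming the stream in plain Python (the dataset name and the lang field are placeholders):
from datasets import load_dataset

ds = load_dataset("some_large_dataset", streaming=True, split="train")
# plain generator, so downstream code loses the IterableDataset API (map, shuffle, take, ...)
french_only = (example for example in ds if example["lang"] == "fr")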
IterableDataset to Dataset conversion
In combination with the above filter, a functionality to "play" the whole stream would be useful. The motivation is that one might often filter the dataset down to a manageable size for experimentation. In that case streaming mode is no longer necessary, as the filtered dataset is small enough, and it would be useful to be able to play through the whole stream to create a normal Dataset with all its benefits.
ds = load_dataset("some_large_dataset", streaming=True)
ds_filter = ds.filter(lambda x: x["lang"] == "fr")
ds_filter = ds_filter.stream() # here the `IterableDataset` is converted to a `Dataset`
Naturally, this could be expanded with stream(n=1000), which creates a Dataset with the first n elements, similar to take.
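As an illustration of what such a conversion could do under the hood, here is a hedged sketch that materializes the first n streamed examples into a regular Dataset using itertools.islice and Dataset.from_dict (the dataset name is a placeholder, and this is not an existing API):
from itertools import islice
from datasets import Dataset, load_dataset

ds = load_dataset("some_large_dataset", streaming=True, split="train")
first_n = list(islice(ds, 1000))  # pull the first 1000 examples out of the stream
# build a map-style Dataset from the collected examples
small_ds = Dataset.from_dict({key: [ex[key] for ex in first_n] for key in first_n[0]})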
Stream to the Hub
While streaming allows using a dataset as is, without saving the whole dataset on the local machine, it is currently not possible to process a dataset and add it to the hub. The only way to do this is by downloading the full dataset and saving the processed dataset again before pushing it to the hub. The API could look something like:
ds = load_dataset("some_large_dataset", streaming=True)
ds_filter = ds.filter(some_filter_func)
ds_processed = ds_filter.map(some_processing_func)
ds_processed.push_to_hub("new_better_dataset", batch_size=100_000)
Under the hood this could be done by processing and aggregating batch_size elements and then pushing that batch as a single file to the hub. With this functionality one could process and create TB-scale datasets while only requiring enough local disk space for batch_size elements.
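To make the idea concrete, here is a rough sketch of the batching logic (not an existing API): aggregate batch_size examples, write them to a Parquet shard, and upload each shard with huggingface_hub; the repo id, shard naming and batch size are placeholders:
import os
from itertools import islice
from datasets import Dataset, load_dataset
from huggingface_hub import HfApi

api = HfApi()
ds = load_dataset("some_large_dataset", streaming=True, split="train")
stream = iter(ds)
shard_idx = 0
while True:
    batch = list(islice(stream, 100_000))  # aggregate batch_size examples in memory
    if not batch:
        break
    shard = Dataset.from_dict({key: [ex[key] for ex in batch] for key in batch[0]})
    shard_path = f"shard-{shard_idx:05d}.parquet"
    shard.to_parquet(shard_path)
    api.upload_file(  # push the shard as a single file to the dataset repo
        path_or_fileobj=shard_path,
        path_in_repo=f"data/{shard_path}",
        repo_id="username/new_better_dataset",
        repo_type="dataset",
    )
    os.remove(shard_path)  # keep local disk usage bounded to one shard at a time
    shard_idx += 1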
cc @lhoestq @albertvillanova
Related to: https://github.com/huggingface/datasets/issues/3444
Cool! filter will be very useful. There could be a filter that you can apply on a streaming dataset:
load_dataset(..., streaming=True).filter(lambda x: x["lang"] == "sw")
Otherwise, if you want to apply a filter on the source files that are going to be used for streaming, the logic has to be implemented directly in the dataset script, or if there's no dataset script, this can be done with pattern matching:
load_dataset(..., lang="sw") # if the dataset script supports this parameter
load_dataset(..., data_files="data/lang=sw/*") # if there's no dataset script, but only data files
Here are also some additional API ideas to convert from an iterable to a map-style dataset:
on_disk_dataset = streaming_dataset.to_disk()
on_disk_dataset = streaming_dataset.to_disk(path="path/to/my/dataset/dir")
in_memory_dataset = streaming_dataset.take(100).to_memory() # to experiment without having to write files
Finally, regarding push_to_hub, we can replace batch_size by shard_size (same API as for on-disk datasets). The default is 500MB per file.
Let me know what you think!
Regarding conversion, I'd also ask for some kind of equivalent to save_to_disk for an IterableDataset.
Similarly to the streaming-to-hub idea, my use case would be to define a sequence of dataset transforms via .map(), using an IterableDataset as the input (so processing could start without doing the whole download up-front), but streaming the resultant processed dataset just to disk.
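As a rough sketch of that workflow (assuming a datasets version that has Dataset.from_generator; the dataset name and processing function are placeholders):
from datasets import Dataset, load_dataset

def some_processing_func(example):
    # placeholder transform
    return example

streaming_ds = load_dataset("some_large_dataset", streaming=True, split="train")
processed = streaming_ds.map(some_processing_func)  # applied lazily while streaming
on_disk = Dataset.from_generator(processed.__iter__)  # drains the stream to an Arrow cache on disk
on_disk.save_to_disk("path/to/my/dataset/dir")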
That makes sense @athewsey, thanks for the suggestion :)
Maybe instead of to_disk we could simply have save_to_disk:
streaming_dataset.save_to_disk("path/to/my/dataset/dir")
on_disk_dataset = load_from_disk("path/to/my/dataset/dir")
in_memory_dataset = Dataset.from_list(list(streaming_dataset.take(100))) # to experiment without having to write files
Any updates on this?
So far, IterableDataset.filter() and Dataset.to_iterable_dataset() are implemented.
Still missing: IterableDataset.push_to_hub() - though there is a hack to write to disk and then push to the hub using
ds_on_disk = Dataset.from_generator(streaming_ds.__iter__) # stream to disk
ds_on_disk.push_to_hub(...)
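For completeness, a quick usage sketch of the two pieces that are implemented (dataset names and the lang field are placeholders):
from datasets import load_dataset

# IterableDataset.filter(): filter while streaming
streaming_ds = load_dataset("some_large_dataset", streaming=True, split="train")
filtered = streaming_ds.filter(lambda x: x["lang"] == "fr")

# Dataset.to_iterable_dataset(): convert a map-style Dataset into an IterableDataset
regular_ds = load_dataset("some_small_dataset", split="train")
iterable_ds = regular_ds.to_iterable_dataset(num_shards=4)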