[WIP] Prototype splitting blocks to ensure requested read parallelism is always met
Why are these changes needed?
Today, the number of initial blocks of a dataset is limited to the number of input files of the datasource, regardless of the requested parallelism. This is problematic because increasing the number of blocks then requires a repartition() call, which is not always practical in the streaming setting.
This PR inserts a streaming SplitBlocks operator that is fused with read tasks in this case to allow for arbitrarily high requested parallelism (up to number of individual records) without needing a blocking repartition.
Before:
ray.data.read_parquet([list, of, 100, parquet, files], parallelism=2000)
# -> num_blocks = 100
After:
ray.data.read_parquet([list, of, 100, parquet, files], parallelism=2000)
# -> num_blocks = 2000
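To illustrate the arithmetic behind the before/after numbers, here is a minimal pure-Python sketch of streaming block splitting. It is not the code in this PR: `splits_per_read_task` and `split_block` are hypothetical names, a "block" is modeled as a plain list of rows, and the even per-task split factor is an assumption.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def splits_per_read_task(num_read_tasks: int, parallelism: int) -> int:
    """Hypothetical helper: how many output blocks each read task should emit."""
    if num_read_tasks <= 0:
        raise ValueError("num_read_tasks must be positive")
    # Ceiling division, so the total block count is at least `parallelism`.
    return max(1, -(-parallelism // num_read_tasks))


def split_block(rows: Sequence[T], k: int) -> Iterator[List[T]]:
    """Yield up to k contiguous, near-equal chunks of `rows`, one at a time.

    The chunk count is capped at len(rows), mirroring the "up to number of
    individual records" limit described above. An empty input yields nothing.
    """
    if not rows:
        return
    k = max(1, min(k, len(rows)))
    base, extra = divmod(len(rows), k)
    start = 0
    for i in range(k):
        size = base + (1 if i < extra else 0)
        yield list(rows[start:start + size])
        start += size


# 100 "files" of 50 rows each, requested parallelism of 2000.
files = [list(range(50)) for _ in range(100)]
k = splits_per_read_task(len(files), 2000)
blocks = [block for rows in files for block in split_block(rows, k)]
```

Because `split_block` is a generator, each chunk can be handed downstream as soon as it is produced, which is the property that avoids a blocking repartition in this sketch.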
Limitations:
- Until https://github.com/ray-project/ray/pull/36071 merges and is integrated with Ray Data, downstream operators of the read may still block until the entire file is read, even if the read would produce multiple blocks.
- The SplitBlocks operator cannot be fused with downstream Map stages, since it is changing the physical partitioning of the stream. If we fused it, then the parallelism increase would not be realized as we could not split the read output to multiple processes.
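The fusion limitation can be made concrete with a toy scheduling model. This is a sketch under stated assumptions, not Ray Data's planner: a "task" is just a zero-argument callable, and `read_and_split`, `plan_unfused`, and `plan_fused` are hypothetical names introduced for illustration.

```python
from typing import Callable, List

Block = List[int]


def read_and_split(file_rows: Block, k: int) -> List[Block]:
    """Stand-in for a read task fused with a split: one task, up to k blocks."""
    if not file_rows:
        return []
    size = -(-len(file_rows) // k)  # ceiling division
    return [file_rows[i:i + size] for i in range(0, len(file_rows), size)]


def plan_unfused(files: List[Block], k: int, fn: Callable[[Block], int]) -> List[Callable[[], int]]:
    """Split is a stage boundary: every output block becomes its own map task."""
    blocks = [b for f in files for b in read_and_split(f, k)]
    return [lambda b=b: fn(b) for b in blocks]


def plan_fused(files: List[Block], k: int, fn: Callable[[Block], int]) -> List[Callable[[], List[int]]]:
    """Map fused into the read: one task per file processes all of its blocks."""
    return [lambda f=f: [fn(b) for b in read_and_split(f, k)] for f in files]


files = [list(range(10)) for _ in range(4)]
unfused_tasks = plan_unfused(files, 5, sum)  # one task per block
fused_tasks = plan_fused(files, 5, sum)      # one task per file
```

In this model both plans compute the same results, but the fused plan exposes only as many independently schedulable tasks as there are files, so the extra blocks cannot be spread across more processes.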
Related issue number
Closes https://github.com/ray-project/ray/issues/31501
Checks
- [ ] I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
- [ ] I've run scripts/format.sh to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.
- [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [ ] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(
This is ready for review.
Updated.
Merging so we can test in master. Let's discuss the future of split blocks as an operator separately.