[WIP][Data] Add read_iceberg API to support scalable Iceberg scans into Ray Datasets
Why are these changes needed?
This PR adds a new read_iceberg API to ray.data. It leverages pyiceberg to list an Iceberg table's Parquet data files and uses ray.data's Parquet datasource to load the data.
You can find more discussion on the Iceberg & Ray integration in the Iceberg Slack channel.
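For illustration, here is a minimal sketch of that approach, assuming a pyiceberg catalog is already configured; the catalog name, table identifier, and function name are placeholders, and this is not the PR's actual implementation:

```python
import ray
from pyiceberg.catalog import load_catalog


def read_iceberg_sketch(table_identifier: str, catalog_name: str = "default"):
    # Resolve the Iceberg catalog (pyiceberg reads its configuration from
    # ~/.pyiceberg.yaml or environment variables).
    catalog = load_catalog(catalog_name)
    table = catalog.load_table(table_identifier)  # e.g. "db.events"

    # Plan a full-table scan and collect the Parquet data file paths.
    paths = [task.file.file_path for task in table.scan().plan_files()]

    # Delegate the actual reading to ray.data's existing Parquet datasource.
    return ray.data.read_parquet(paths)


ds = read_iceberg_sketch("db.events")
```

The key design point is the split of responsibilities: pyiceberg handles catalog/metadata resolution and file planning, while Ray Data handles the distributed Parquet reads.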
Related issue number
Checks
- [x] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
- [ ] Unit tests
- [ ] Release tests
- [ ] This PR is not tested :(
Unfortunately, since pyiceberg does not support writes yet, we may not be able to add meaningful tests for read_iceberg.
However, it is still possible to use external Docker containers to create an integration test environment outside the ray-project repo. I set up one at https://github.com/JonasJ-ap/icebergray, which contains a copy of read_iceberg and its integration tests.
Thank you @JonasJ-ap. This is great and provides a basic integration. We used the same ideas for our internal integration: we basically use a table scan to list the underlying files and then either submit the list to the dataset read or use those files in our custom code.
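To illustrate that pattern (the table name, column names, and the filter below are placeholders), a row filter can be pushed into the pyiceberg scan so that only matching data files are listed, and the resulting paths can then go into a Ray Data read or into custom code:

```python
import ray
from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import EqualTo

table = load_catalog("default").load_table("db.events")

# List only the data files that can match the filter (Iceberg prunes by
# partition values and column metrics at planning time).
tasks = table.scan(row_filter=EqualTo("event_date", "2023-06-15")).plan_files()
paths = [task.file.file_path for task in tasks]

# Option 1: submit the file list to a dataset read.
ds = ray.data.read_parquet(paths)


# Option 2: use the files in custom code, e.g. one Ray task per file.
@ray.remote
def count_rows(path: str) -> int:
    import pyarrow.parquet as pq

    return pq.read_table(path).num_rows


row_counts = ray.get([count_rows.remote(p) for p in paths])
```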
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 14 days if no further activity occurs. Thank you for your contributions.
- If you'd like to keep this open, just leave any comment, and the stale label will be removed.
Just tested this on existing Iceberg data that was ingested from Spark. It looks like the partition-related code is causing an issue; removing it works fine.
(_execute_read_task_split pid=3522) table = table.set_column(
(_execute_read_task_split pid=3522) File "pyarrow/table.pxi", line 4503, in pyarrow.lib.Table.set_column
(_execute_read_task_split pid=3522) File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
(_execute_read_task_split pid=3522) File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
(_execute_read_task_split pid=3522) pyarrow.lib.ArrowInvalid: Invalid column index to set field.
2023-06-15 23:13:00,956 ERROR worker.py:408 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::_execute_read_task_split() (pid=3522, ip=192.168.22.174)
File "/home/jovyan/.local/lib/python3.9/site-packages/ray/data/_internal/lazy_block_list.py", line 689, in _execute_read_task_split
for block in blocks:
File "/home/jovyan/.local/lib/python3.9/site-packages/ray/data/datasource/datasource.py", line 216, in __call__
for block in result:
File "/home/jovyan/.local/lib/python3.9/site-packages/ray/data/datasource/parquet_datasource.py", line 396, in _read_pieces
table = table.set_column(
File "pyarrow/table.pxi", line 4503, in pyarrow.lib.Table.set_column
File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Invalid column index to set field.
I believe this happens when the partition column(s) are not among the queried columns.
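That diagnosis can be reproduced with a small pyarrow-only example, assuming the read path injects the partition value via Table.set_column at an index computed from the full table schema while the queried columns were pruned (column names below are made up):

```python
import pyarrow as pa

full = pa.table({"user_id": [1, 2], "value": [10, 20], "event_date": ["a", "b"]})
pruned = full.select(["value"])  # partition column not among the queried columns

# Index 2 is valid for the full schema but out of range for the pruned table,
# so set_column fails with the same error as in the traceback above.
try:
    pruned.set_column(2, "event_date", pa.array(["a", "b"]))
except pa.ArrowInvalid as err:
    print(err)  # Invalid column index to set field.
```

Including the partition column(s) among the queried columns, or skipping the partition-injection step when they are pruned, avoids this path, which matches the observation that removing the partition-related code works.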
Thanks @blublinsky and @rakeshJn for reviewing this.
Based on the discussion in https://github.com/ray-project/deltacat/issues/116#issuecomment-1608478951, we will first contribute this basic reader to deltacat and then contribute it back to Ray when more integration tests are ready. Please check this PR: https://github.com/ray-project/deltacat/pull/131 (including the change suggested by @rakeshJn).
I will close this PR for now.