[WIP][Data] Add read_iceberg API to support scalable iceberg scans to ray datasets

Open JonasJ-ap opened this issue 2 years ago • 3 comments

Why are these changes needed?

This PR adds a new API, read_iceberg, to ray.data. It leverages pyiceberg to produce the list of Parquet data files for a table and uses ray.data's Parquet Datasource to load the data.
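The approach described above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: the function names list_iceberg_data_files and read_iceberg_sketch are hypothetical, the table argument is assumed to be a pyiceberg Table (anything exposing scan() and plan_files() works), and delete files, schema evolution, and partition values are ignored.

```python
from typing import Any, List, Optional


def list_iceberg_data_files(table: Any, row_filter: Optional[Any] = None) -> List[str]:
    """Return the paths of the data files that an Iceberg table scan would read.

    `table` is assumed to behave like a pyiceberg Table: scan() returns a scan
    object whose plan_files() yields tasks, each with a `file.file_path`.
    """
    # Only pass row_filter when the caller supplied one, so the scan's own
    # default ("read everything") is left untouched.
    scan = table.scan() if row_filter is None else table.scan(row_filter=row_filter)
    return [task.file.file_path for task in scan.plan_files()]


def read_iceberg_sketch(table: Any, row_filter: Optional[Any] = None, **read_parquet_kwargs: Any):
    """Hypothetical stand-in for read_iceberg: hand the planned files to ray.data."""
    # Imported lazily so the file-listing helper can be used without Ray installed.
    import ray

    paths = list_iceberg_data_files(table, row_filter)
    return ray.data.read_parquet(paths, **read_parquet_kwargs)
```

Keeping the file listing separate from the read makes it possible to check the planned paths against a stub table without starting a Ray cluster.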

You can find more discussion of the Iceberg and Ray integration in the Iceberg Slack channel.

Related issue number

Checks

  • [x] I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • [ ] I've run scripts/format.sh to lint the changes in this PR.
  • [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
    • [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.
  • [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • [ ] Unit tests
    • [ ] Release tests
    • [ ] This PR is not tested :(

JonasJ-ap avatar Apr 10 '23 23:04 JonasJ-ap

Unfortunately, since pyiceberg does not support writes yet, we may not be able to add meaningful tests for read_iceberg inside this repo.

However, it is still possible to use external Docker containers to create an integration test environment outside the ray-project repo. I set one up in https://github.com/JonasJ-ap/icebergray, which contains a copy of read_iceberg and integration tests.

JonasJ-ap avatar Apr 26 '23 16:04 JonasJ-ap

Thank you @JonasJ-ap. This is great and provides a basic integration. We used the same ideas to do our internal integration. We basically use table scan to list the underlying files and either submit the list to the dataset read or use these files in our custom code.

blublinsky avatar May 15 '23 13:05 blublinsky

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 14 days if no further activity occurs. Thank you for your contributions.

  • If you'd like to keep this open, just leave any comment, and the stale label will be removed.

stale[bot] avatar Jun 15 '23 00:06 stale[bot]

Just tested this on existing data in Iceberg format that was ingested from Spark. It looks like the partition-related code is causing an issue; with that code removed, the read works fine.


(_execute_read_task_split pid=3522)     table = table.set_column(
(_execute_read_task_split pid=3522)   File "pyarrow/table.pxi", line 4503, in pyarrow.lib.Table.set_column
(_execute_read_task_split pid=3522)   File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
(_execute_read_task_split pid=3522)   File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
(_execute_read_task_split pid=3522) pyarrow.lib.ArrowInvalid: Invalid column index to set field.
2023-06-15 23:13:00,956	ERROR worker.py:408 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::_execute_read_task_split() (pid=3522, ip=192.168.22.174)
  File "/home/jovyan/.local/lib/python3.9/site-packages/ray/data/_internal/lazy_block_list.py", line 689, in _execute_read_task_split
    for block in blocks:
  File "/home/jovyan/.local/lib/python3.9/site-packages/ray/data/datasource/datasource.py", line 216, in __call__
    for block in result:
  File "/home/jovyan/.local/lib/python3.9/site-packages/ray/data/datasource/parquet_datasource.py", line 396, in _read_pieces
    table = table.set_column(
  File "pyarrow/table.pxi", line 4503, in pyarrow.lib.Table.set_column
  File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Invalid column index to set field.

rakeshJn avatar Jun 15 '23 23:06 rakeshJn

I believe this happens when the partition column(s) are not among the queried columns.
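A possible guard, as a rough sketch rather than the actual fix: pyarrow's Schema.get_field_index returns -1 for a missing name, which is likely what set_column rejects in the traceback above. The helper names settable_partition_columns and apply_partition_values are hypothetical, and the assumption is that partition values arrive as a dict of column name to value.

```python
from typing import Any, Dict, Iterable


def settable_partition_columns(
    column_names: Iterable[str], partition_values: Dict[str, Any]
) -> Dict[str, Any]:
    """Keep only the partition columns that are present in the projected schema."""
    present = set(column_names)
    return {col: value for col, value in partition_values.items() if col in present}


def apply_partition_values(table: Any, partition_values: Dict[str, Any]) -> Any:
    """Overwrite partition columns on a pyarrow Table, skipping unselected ones."""
    # Imported lazily so the filtering helper above has no pyarrow dependency.
    import pyarrow as pa

    selected = settable_partition_columns(table.column_names, partition_values)
    for col, value in selected.items():
        # The index is valid here because `col` is known to be in the schema.
        table = table.set_column(
            table.schema.get_field_index(col),
            col,
            pa.array([value] * len(table)),
        )
    return table
```

With a projection of ["id", "name"] and partition values {"dt": "2023-06-15"}, nothing is set and the table is returned unchanged instead of raising ArrowInvalid.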

rakeshJn avatar Jun 15 '23 23:06 rakeshJn

Thanks @blublinsky and @rakeshJn for reviewing this.

Based on the discussion in https://github.com/ray-project/deltacat/issues/116#issuecomment-1608478951, we will first contribute this basic reader to deltacat and then contribute back to ray when more integration tests are ready. Please check this PR: https://github.com/ray-project/deltacat/pull/131 (including the change suggested by @rakeshJn).

I will close this PR for now.

JonasJ-ap avatar Jun 30 '23 21:06 JonasJ-ap