ray
skip_empty_json_files
Why are these changes needed?
Fixes https://github.com/ray-project/ray/issues/47198. Skip empty files when reading JSON instead of raising `json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)`.
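A minimal sketch of the idea, not the actual patch: check whether the buffer is empty before handing it to `json.load()`. The helper name `parse_json_buffer` and its list-normalizing behavior are assumptions made for illustration; only the `json.load(BytesIO(buffer))` call comes from the traceback further down.

```python
import json
from io import BytesIO
from typing import Any, List


def parse_json_buffer(buffer: bytes) -> List[Any]:
    """Hypothetical helper: return parsed rows, or [] for an empty buffer."""
    # An empty (or whitespace-only) file contains no JSON value, so
    # json.load() would raise "Expecting value: line 1 column 1 (char 0)".
    # Skip it instead of raising.
    if not buffer.strip():
        return []
    parsed = json.load(BytesIO(buffer))
    # Normalize a single top-level object to a one-row list (an assumption
    # of this sketch, not necessarily what Ray does).
    return parsed if isinstance(parsed, list) else [parsed]
```

With this guard, an empty file contributes zero rows and the remaining files are read as usual.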
Related issue number
https://github.com/ray-project/ray/issues/47198
Checks
- [x] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [x] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
    - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
- [ ] Unit tests
- [ ] Release tests
- [ ] This PR is not tested :(
Tested the changes locally.
Before the change
python t.py
2024-08-27 22:00:47,024 INFO worker.py:1783 -- Started a local Ray instance.
Using files: ['/Users/vr/ray_test_venv/sample_data/file2.json.gz', '/Users/vr/ray_test_venv/sample_data/empty_file.json.gz', '/Users/vr/ray_test_venv/sample_data/file1.json.gz']
2024-08-27 22:00:48,306 INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-08-27_22-00-46_401777_16215/logs/ray-data
2024-08-27 22:00:48,306 INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadJSON]
[dataset]: Run `pip install tqdm` to enable progress reporting.
Schema of the dataset: Column Type
------ ----
col1 string
col2 string
col3 string
2024-08-27 22:00:48,538 INFO dataset.py:2409 -- Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.
2024-08-27 22:00:48,542 INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-08-27_22-00-46_401777_16215/logs/ray-data
2024-08-27 22:00:48,542 INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadJSON] -> LimitOperator[limit=5]
(ReadJSON->SplitBlocks(6) pid=16228) Error reading with pyarrow.json.read_json(). Falling back to native json.load(), which may be slower. PyArrow error was:
(ReadJSON->SplitBlocks(6) pid=16228) Empty JSON file
2024-08-27 22:00:48,606 ERROR worker.py:409 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::ReadJSON->SplitBlocks(6)() (pid=16228, ip=127.0.0.1)
for b_out in map_transformer.apply_transform(iter(blocks), ctx):
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 451, in __call__
for block in blocks:
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 392, in __call__
for data in iter:
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 253, in __call__
yield from self._block_fn(input, ctx)
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/planner/plan_read_op.py", line 103, in do_read
yield from call_with_retry(
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/datasource/datasource.py", line 168, in __call__
yield from result
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/datasource/file_based_datasource.py", line 254, in read_task_fn
yield from read_files(read_paths)
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/datasource/file_based_datasource.py", line 220, in read_files
for block in read_stream(f, read_path):
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/datasource/json_datasource.py", line 139, in _read_stream
yield from self._read_with_python_json(buffer)
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/datasource/json_datasource.py", line 108, in _read_with_python_json
parsed_json = json.load(BytesIO(buffer))
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/Cellar/[email protected]/3.11.9_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/json/__init__.py", line 293, in load
return loads(fp.read(),
^^^^^^^^^^^^^^^^
File "/opt/homebrew/Cellar/[email protected]/3.11.9_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/json/__init__.py", line 346, in loads
return _default_decoder.decode(s)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/Cellar/[email protected]/3.11.9_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/json/decoder.py", line 337, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/Cellar/[email protected]/3.11.9_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/json/decoder.py", line 355, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
2024-08-27 22:00:48,820 ERROR streaming_executor_state.py:469 -- An exception was raised from a task of operator "ReadJSON->SplitBlocks(6)". Dataset execution will now abort. To ignore this exception and continue, set DataContext.max_errored_blocks.
Traceback (most recent call last):
File "/Users/vr/ray_test_venv/t.py", line 38, in <module>
print("First few records:\n", ds.take(5))
^^^^^^^^^^
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/dataset.py", line 2416, in take
for row in limited_ds.iter_rows():
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/iterator.py", line 238, in _wrapped_iterator
for batch in batch_iterable:
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/iterator.py", line 178, in _create_iterator
for batch in iterator:
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/block_batching/iter_batches.py", line 178, in iter_batches
next_batch = next(async_batch_iter)
^^^^^^^^^^^^^^^^^^^^^^
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/util.py", line 932, in make_async_gen
raise next_item
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/util.py", line 909, in execute_computation
for item in fn(thread_safe_generator):
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/block_batching/iter_batches.py", line 167, in _async_iter_batches
yield from extract_data_from_batch(batch_iter)
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/block_batching/util.py", line 211, in extract_data_from_batch
for batch in batch_iter:
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/block_batching/iter_batches.py", line 313, in restore_original_order
for batch in batch_iter:
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/util.py", line 932, in make_async_gen
raise next_item
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/util.py", line 909, in execute_computation
for item in fn(thread_safe_generator):
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/block_batching/iter_batches.py", line 220, in threadpool_computations_format_collate
yield from formatted_batch_iter
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/block_batching/util.py", line 159, in format_batches
for batch in block_iter:
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/util.py", line 889, in __next__
return next(self.it)
^^^^^^^^^^^^^
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/block_batching/util.py", line 118, in blocks_to_batches
for block in block_iter:
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/block_batching/util.py", line 55, in resolve_block_refs
for block_ref in block_ref_iter:
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/block_batching/iter_batches.py", line 288, in prefetch_batches_locally
next_ref_bundle = next(ref_bundles)
^^^^^^^^^^^^^^^^^
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/util.py", line 889, in __next__
return next(self.it)
^^^^^^^^^^^^^
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/executor.py", line 37, in __next__
return self.get_next()
^^^^^^^^^^^^^^^
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/legacy_compat.py", line 76, in get_next
bundle = self._base_iterator.get_next(output_split_idx)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor.py", line 153, in get_next
item = self._outer._output_node.get_output_blocking(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 296, in get_output_blocking
raise self._exception
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor.py", line 232, in run
continue_sched = self._scheduling_loop_step(self._topology)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor.py", line 287, in _scheduling_loop_step
num_errored_blocks = process_completed_tasks(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 470, in process_completed_tasks
raise e from None
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 437, in process_completed_tasks
bytes_read = task.on_data_ready(
^^^^^^^^^^^^^^^^^^^
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/physical_operator.py", line 105, in on_data_ready
raise ex from None
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/physical_operator.py", line 101, in on_data_ready
ray.get(block_ref)
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/_private/worker.py", line 2661, in get
values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/_private/worker.py", line 871, in get_objects
raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(JSONDecodeError): ray::ReadJSON->SplitBlocks(6)() (pid=16225, ip=127.0.0.1)
for b_out in map_transformer.apply_transform(iter(blocks), ctx):
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 451, in __call__
for block in blocks:
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 392, in __call__
for data in iter:
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 253, in __call__
yield from self._block_fn(input, ctx)
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/planner/plan_read_op.py", line 103, in do_read
yield from call_with_retry(
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/datasource/datasource.py", line 168, in __call__
yield from result
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/datasource/file_based_datasource.py", line 254, in read_task_fn
yield from read_files(read_paths)
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/datasource/file_based_datasource.py", line 220, in read_files
for block in read_stream(f, read_path):
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/datasource/json_datasource.py", line 139, in _read_stream
yield from self._read_with_python_json(buffer)
File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/datasource/json_datasource.py", line 108, in _read_with_python_json
parsed_json = json.load(BytesIO(buffer))
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/Cellar/[email protected]/3.11.9_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/json/__init__.py", line 293, in load
return loads(fp.read(),
^^^^^^^^^^^^^^^^
File "/opt/homebrew/Cellar/[email protected]/3.11.9_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/json/__init__.py", line 346, in loads
return _default_decoder.decode(s)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/Cellar/[email protected]/3.11.9_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/json/decoder.py", line 337, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/Cellar/[email protected]/3.11.9_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/json/decoder.py", line 355, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
(ReadJSON->SplitBlocks(6) pid=16225) Error reading with pyarrow.json.read_json(). Falling back to native json.load(), which may be slower. PyArrow error was:
(ReadJSON->SplitBlocks(6) pid=16225) Empty JSON file
After the change
python t.py
2024-08-27 22:10:03,861 INFO worker.py:1783 -- Started a local Ray instance.
Using files: ['/Users/vr/ray_test_venv/sample_data/file2.json.gz', '/Users/vr/ray_test_venv/sample_data/empty_file.json.gz', '/Users/vr/ray_test_venv/sample_data/file1.json.gz']
2024-08-27 22:10:05,156 INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-08-27_22-10-03_240198_18060/logs/ray-data
2024-08-27 22:10:05,156 INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadJSON]
[dataset]: Run `pip install tqdm` to enable progress reporting.
Schema of the dataset: Column Type
------ ----
col1 string
col2 string
col3 string
2024-08-27 22:10:05,387 INFO dataset.py:2409 -- Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.
2024-08-27 22:10:05,391 INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-08-27_22-10-03_240198_18060/logs/ray-data
2024-08-27 22:10:05,391 INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadJSON] -> LimitOperator[limit=5]
(ReadJSON->SplitBlocks(6) pid=18074) Error reading with pyarrow.json.read_json(). Falling back to native json.load(), which may be slower. PyArrow error was:
(ReadJSON->SplitBlocks(6) pid=18074) Empty JSON file
First few records:
[{'col1': 'value1', 'col2': 'value2', 'col3': 'value3'}, {'col1': 'value4', 'col2': 'value5', 'col3': 'value6'}, {'col1': 'value1', 'col2': 'value2', 'col3': 'value3'}, {'col1': 'value4', 'col2': 'value5', 'col3': 'value6'}]
(ReadJSON->SplitBlocks(6) pid=18073) Error reading with pyarrow.json.read_json(). Falling back to native json.load(), which may be slower. PyArrow error was:
(ReadJSON->SplitBlocks(6) pid=18073) Empty JSON file
@scottjlee, please review.
Could you also add a unit test in `test_json.py` to test the change in this PR? You can follow the logic in the reproducible example from the original issue. Thanks!
@scottjlee, added a unit test that reads from a path containing both an empty file and a non-empty file. Please check.
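For readers without the diff at hand, the shape of such a test might look roughly like the sketch below. It is standard-library only and does not call Ray: `read_json_dir` is a hypothetical stand-in for the reader under test, and the file names simply mirror the sample data in the logs above. The real test in `test_json.py` may differ.

```python
import gzip
import json
import tempfile
from pathlib import Path
from typing import Dict, List


def read_json_dir(directory: Path) -> List[Dict]:
    """Hypothetical stand-in for the reader under test: skips empty files."""
    rows: List[Dict] = []
    for path in sorted(directory.glob("*.json.gz")):
        with gzip.open(path, "rb") as f:
            data = f.read()
        if not data.strip():
            # Empty file: contribute no rows instead of raising.
            continue
        parsed = json.loads(data)
        rows.extend(parsed if isinstance(parsed, list) else [parsed])
    return rows


def test_read_dir_with_empty_file() -> None:
    with tempfile.TemporaryDirectory() as tmp:
        tmp_path = Path(tmp)
        records = [{"col1": "value1", "col2": "value2", "col3": "value3"}]
        # One non-empty gzipped JSON file...
        with gzip.open(tmp_path / "file1.json.gz", "wb") as f:
            f.write(json.dumps(records).encode())
        # ...and one gzipped file with no content at all.
        with gzip.open(tmp_path / "empty_file.json.gz", "wb"):
            pass
        # Only the rows from the non-empty file should come back.
        assert read_json_dir(tmp_path) == records
```

The key property being checked is that the empty file neither raises nor changes the rows returned from the other files.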