
skip_empty_json_files

Open venkatram-dev opened this pull request 1 year ago • 1 comment

Why are these changes needed?

Skip empty JSON files instead of raising `json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)`. See https://github.com/ray-project/ray/issues/47198.
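
The traceback in the linked issue comes from `json.load()` being handed an empty buffer in `ray/data/_internal/datasource/json_datasource.py` (see the logs below). The idea is to guard before parsing: if a file's payload is empty, produce no blocks instead of raising. A minimal, self-contained sketch of that guard, not the actual diff (the helper name is illustrative and it operates on an already-decompressed byte buffer):

```python
import io
import json
from typing import Any, Iterator


def parse_json_buffer_skip_empty(buffer: bytes) -> Iterator[Any]:
    """Parse one file's JSON payload, skipping empty payloads instead of
    letting json.load() raise 'Expecting value: line 1 column 1 (char 0)'."""
    if len(buffer) == 0:
        return  # empty file: yield no blocks rather than raising
    yield json.load(io.BytesIO(buffer))


print(list(parse_json_buffer_skip_empty(b"")))  # [] -- the empty file is skipped
print(list(parse_json_buffer_skip_empty(b'[{"col1": "value1"}]')))
# [[{'col1': 'value1'}]]
```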

Related issue number

https://github.com/ray-project/ray/issues/47198

Checks

  • [x] I've signed off every commit (by using the `-s` flag, i.e., `git commit -s`) in this PR.
  • [x] I've run scripts/format.sh to lint the changes in this PR.
  • [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
    • [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.
  • [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • [ ] Unit tests
    • [ ] Release tests
    • [ ] This PR is not tested :(

venkatram-dev avatar Aug 28 '24 05:08 venkatram-dev

Tested the changes locally.
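
For context, here is a plausible reconstruction of the local driver (t.py) that produced the logs below. The actual script isn't part of the PR, so the data directory, file contents, and read options are assumptions; only the printed messages mirror the output.

```python
# Hypothetical reconstruction of t.py; paths and file contents are assumed.
import glob
import gzip
import json
import os

import ray

DATA_DIR = os.path.expanduser("~/ray_test_venv/sample_data")  # assumed location
os.makedirs(DATA_DIR, exist_ok=True)

# Two non-empty line-delimited JSON files plus one empty gzipped file.
records = [
    {"col1": "value1", "col2": "value2", "col3": "value3"},
    {"col1": "value4", "col2": "value5", "col3": "value6"},
]
for name in ("file1.json.gz", "file2.json.gz"):
    with gzip.open(os.path.join(DATA_DIR, name), "wt") as f:
        for rec in records:
            f.write(json.dumps(rec) + "\n")
with gzip.open(os.path.join(DATA_DIR, "empty_file.json.gz"), "wt"):
    pass  # valid gzip archive whose decompressed payload is zero bytes

files = glob.glob(os.path.join(DATA_DIR, "*.json.gz"))
print("Using files:", files)

# Gzipped JSON needs the compression hint for the underlying Arrow stream.
ds = ray.data.read_json(files, arrow_open_stream_args={"compression": "gzip"})
print("Schema of the dataset:", ds.schema())
print("First few records:\n", ds.take(5))
```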

Before the change

python t.py 
2024-08-27 22:00:47,024	INFO worker.py:1783 -- Started a local Ray instance.
Using files: ['/Users/vr/ray_test_venv/sample_data/file2.json.gz', '/Users/vr/ray_test_venv/sample_data/empty_file.json.gz', '/Users/vr/ray_test_venv/sample_data/file1.json.gz']
2024-08-27 22:00:48,306	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-08-27_22-00-46_401777_16215/logs/ray-data
2024-08-27 22:00:48,306	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadJSON]
[dataset]: Run `pip install tqdm` to enable progress reporting.
Schema of the dataset: Column  Type
------  ----
col1    string
col2    string
col3    string
2024-08-27 22:00:48,538	INFO dataset.py:2409 -- Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.
2024-08-27 22:00:48,542	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-08-27_22-00-46_401777_16215/logs/ray-data
2024-08-27 22:00:48,542	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadJSON] -> LimitOperator[limit=5]
(ReadJSON->SplitBlocks(6) pid=16228) Error reading with pyarrow.json.read_json(). Falling back to native json.load(), which may be slower. PyArrow error was:
(ReadJSON->SplitBlocks(6) pid=16228) Empty JSON file
2024-08-27 22:00:48,606	ERROR worker.py:409 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::ReadJSON->SplitBlocks(6)() (pid=16228, ip=127.0.0.1)
    for b_out in map_transformer.apply_transform(iter(blocks), ctx):
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 451, in __call__
    for block in blocks:
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 392, in __call__
    for data in iter:
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 253, in __call__
    yield from self._block_fn(input, ctx)
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/planner/plan_read_op.py", line 103, in do_read
    yield from call_with_retry(
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/datasource/datasource.py", line 168, in __call__
    yield from result
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/datasource/file_based_datasource.py", line 254, in read_task_fn
    yield from read_files(read_paths)
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/datasource/file_based_datasource.py", line 220, in read_files
    for block in read_stream(f, read_path):
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/datasource/json_datasource.py", line 139, in _read_stream
    yield from self._read_with_python_json(buffer)
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/datasource/json_datasource.py", line 108, in _read_with_python_json
    parsed_json = json.load(BytesIO(buffer))
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/[email protected]/3.11.9_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/json/__init__.py", line 293, in load
    return loads(fp.read(),
           ^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/[email protected]/3.11.9_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/json/__init__.py", line 346, in loads
    return _default_decoder.decode(s)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/[email protected]/3.11.9_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/[email protected]/3.11.9_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
2024-08-27 22:00:48,820	ERROR streaming_executor_state.py:469 -- An exception was raised from a task of operator "ReadJSON->SplitBlocks(6)". Dataset execution will now abort. To ignore this exception and continue, set DataContext.max_errored_blocks.
Traceback (most recent call last):
  File "/Users/vr/ray_test_venv/t.py", line 38, in <module>
    print("First few records:\n", ds.take(5))
                                  ^^^^^^^^^^
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/dataset.py", line 2416, in take
    for row in limited_ds.iter_rows():
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/iterator.py", line 238, in _wrapped_iterator
    for batch in batch_iterable:
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/iterator.py", line 178, in _create_iterator
    for batch in iterator:
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/block_batching/iter_batches.py", line 178, in iter_batches
    next_batch = next(async_batch_iter)
                 ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/util.py", line 932, in make_async_gen
    raise next_item
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/util.py", line 909, in execute_computation
    for item in fn(thread_safe_generator):
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/block_batching/iter_batches.py", line 167, in _async_iter_batches
    yield from extract_data_from_batch(batch_iter)
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/block_batching/util.py", line 211, in extract_data_from_batch
    for batch in batch_iter:
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/block_batching/iter_batches.py", line 313, in restore_original_order
    for batch in batch_iter:
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/util.py", line 932, in make_async_gen
    raise next_item
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/util.py", line 909, in execute_computation
    for item in fn(thread_safe_generator):
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/block_batching/iter_batches.py", line 220, in threadpool_computations_format_collate
    yield from formatted_batch_iter
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/block_batching/util.py", line 159, in format_batches
    for batch in block_iter:
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/util.py", line 889, in __next__
    return next(self.it)
           ^^^^^^^^^^^^^
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/block_batching/util.py", line 118, in blocks_to_batches
    for block in block_iter:
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/block_batching/util.py", line 55, in resolve_block_refs
    for block_ref in block_ref_iter:
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/block_batching/iter_batches.py", line 288, in prefetch_batches_locally
    next_ref_bundle = next(ref_bundles)
                      ^^^^^^^^^^^^^^^^^
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/util.py", line 889, in __next__
    return next(self.it)
           ^^^^^^^^^^^^^
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/executor.py", line 37, in __next__
    return self.get_next()
           ^^^^^^^^^^^^^^^
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/legacy_compat.py", line 76, in get_next
    bundle = self._base_iterator.get_next(output_split_idx)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor.py", line 153, in get_next
    item = self._outer._output_node.get_output_blocking(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 296, in get_output_blocking
    raise self._exception
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor.py", line 232, in run
    continue_sched = self._scheduling_loop_step(self._topology)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor.py", line 287, in _scheduling_loop_step
    num_errored_blocks = process_completed_tasks(
                         ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 470, in process_completed_tasks
    raise e from None
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 437, in process_completed_tasks
    bytes_read = task.on_data_ready(
                 ^^^^^^^^^^^^^^^^^^^
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/physical_operator.py", line 105, in on_data_ready
    raise ex from None
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/physical_operator.py", line 101, in on_data_ready
    ray.get(block_ref)
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/_private/worker.py", line 2661, in get
    values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/_private/worker.py", line 871, in get_objects
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(JSONDecodeError): ray::ReadJSON->SplitBlocks(6)() (pid=16225, ip=127.0.0.1)
    for b_out in map_transformer.apply_transform(iter(blocks), ctx):
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 451, in __call__
    for block in blocks:
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 392, in __call__
    for data in iter:
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 253, in __call__
    yield from self._block_fn(input, ctx)
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/planner/plan_read_op.py", line 103, in do_read
    yield from call_with_retry(
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/datasource/datasource.py", line 168, in __call__
    yield from result
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/datasource/file_based_datasource.py", line 254, in read_task_fn
    yield from read_files(read_paths)
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/datasource/file_based_datasource.py", line 220, in read_files
    for block in read_stream(f, read_path):
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/datasource/json_datasource.py", line 139, in _read_stream
    yield from self._read_with_python_json(buffer)
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/datasource/json_datasource.py", line 108, in _read_with_python_json
    parsed_json = json.load(BytesIO(buffer))
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/[email protected]/3.11.9_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/json/__init__.py", line 293, in load
    return loads(fp.read(),
           ^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/[email protected]/3.11.9_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/json/__init__.py", line 346, in loads
    return _default_decoder.decode(s)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/[email protected]/3.11.9_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/[email protected]/3.11.9_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
(ReadJSON->SplitBlocks(6) pid=16225) Error reading with pyarrow.json.read_json(). Falling back to native json.load(), which may be slower. PyArrow error was:
(ReadJSON->SplitBlocks(6) pid=16225) Empty JSON file


After the change

python t.py
2024-08-27 22:10:03,861	INFO worker.py:1783 -- Started a local Ray instance.
Using files: ['/Users/vr/ray_test_venv/sample_data/file2.json.gz', '/Users/vr/ray_test_venv/sample_data/empty_file.json.gz', '/Users/vr/ray_test_venv/sample_data/file1.json.gz']
2024-08-27 22:10:05,156	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-08-27_22-10-03_240198_18060/logs/ray-data
2024-08-27 22:10:05,156	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadJSON]
[dataset]: Run `pip install tqdm` to enable progress reporting.
Schema of the dataset: Column  Type
------  ----
col1    string
col2    string
col3    string
2024-08-27 22:10:05,387	INFO dataset.py:2409 -- Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.
2024-08-27 22:10:05,391	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-08-27_22-10-03_240198_18060/logs/ray-data
2024-08-27 22:10:05,391	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadJSON] -> LimitOperator[limit=5]
(ReadJSON->SplitBlocks(6) pid=18074) Error reading with pyarrow.json.read_json(). Falling back to native json.load(), which may be slower. PyArrow error was:
(ReadJSON->SplitBlocks(6) pid=18074) Empty JSON file
First few records:
 [{'col1': 'value1', 'col2': 'value2', 'col3': 'value3'}, {'col1': 'value4', 'col2': 'value5', 'col3': 'value6'}, {'col1': 'value1', 'col2': 'value2', 'col3': 'value3'}, {'col1': 'value4', 'col2': 'value5', 'col3': 'value6'}]
(ReadJSON->SplitBlocks(6) pid=18073) Error reading with pyarrow.json.read_json(). Falling back to native json.load(), which may be slower. PyArrow error was:
(ReadJSON->SplitBlocks(6) pid=18073) Empty JSON file


venkatram-dev avatar Aug 28 '24 05:08 venkatram-dev

@scottjlee, please review.

venkatram-dev avatar Aug 29 '24 01:08 venkatram-dev

Could you also add a unit test in test_json.py to test the change in this PR? You can follow the logic in the reproducible example from the original issue. Thanks!

@scottjlee, added a unit test that reads from a path containing both an empty file and a non-empty file. Please check.
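
For reference, a minimal sketch of the shape such a test could take, assuming a plain pytest `tmp_path` fixture; the test actually added to test_json.py may use Ray's shared test fixtures and differ in details.

```python
# Sketch only, not the test added in this PR.
import json
import os

import ray


def test_read_json_skips_empty_files(tmp_path):
    # One regular JSON file and one zero-byte file in the same directory.
    with open(os.path.join(tmp_path, "data.json"), "w") as f:
        json.dump({"col1": 1, "col2": "a"}, f)
    open(os.path.join(tmp_path, "empty.json"), "w").close()

    # Reading the directory should succeed and simply ignore the empty file.
    ds = ray.data.read_json(str(tmp_path))
    assert ds.take_all() == [{"col1": 1, "col2": "a"}]
```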

venkatram-dev avatar Aug 30 '24 00:08 venkatram-dev