
[BUG] Ray executor run learn shuffle raises KeyError

Open fyrestone opened this issue 3 years ago • 0 comments

Describe the bug

Many test cases fail to run in the Ray DAG executor. After digging into the problem, I found that there may be some bugs when running Mars learn with the Ray executor.

mars/learn/ensemble/tests/test_bagging.py:62 (test_bagging_sample_execution[False-10-1.0-False-False])
setup = <mars.deploy.oscar.session.SyncSession object at 0x28d81f370>
use_dataframe = False, max_samples = 10, max_features = 1.0, with_labels = False
with_weights = False

    @pytest.mark.parametrize(
        "use_dataframe, max_samples, max_features, with_labels, with_weights",
        [
            (False, 10, 1.0, False, False),
            (False, 10, 0.5, True, True),
            (True, 10, 1.0, False, False),
            (True, 10, 0.5, True, True),
        ],
    )
    def test_bagging_sample_execution(
        setup, use_dataframe, max_samples, max_features, with_labels, with_weights
    ):
        rs = np.random.RandomState(0)
    
        raw_data = rs.randint(100, size=(100, 50))
        if not use_dataframe:
            t = mt.tensor(raw_data, chunk_size=20)
        else:
            raw_data = pd.DataFrame(raw_data)
            t = md.DataFrame(raw_data, chunk_size=20)
    
        raw_labels = rs.choice([0, 1, 2], size=100)
        raw_weights = rs.random(100)
        labels = mt.tensor(raw_labels, chunk_size=20) if with_labels else None
        weights = mt.tensor(raw_weights, chunk_size=20) if with_weights else None
    
        sample_op = BaggingSample(
            n_estimators=10,
            max_samples=max_samples,
            max_features=max_features,
            random_state=rs,
        )
>       result_tuple = execute(*sample_op(t, labels, weights))

mars/learn/ensemble/tests/test_bagging.py:95: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
mars/deploy/oscar/session.py:1888: in execute
    return session.execute(
mars/deploy/oscar/session.py:1682: in execute
    execution_info: ExecutionInfo = fut.result(
../../.pyenv/versions/3.8.13/lib/python3.8/concurrent/futures/_base.py:444: in result
    return self.__get_result()
../../.pyenv/versions/3.8.13/lib/python3.8/concurrent/futures/_base.py:389: in __get_result
    raise self._exception
mars/deploy/oscar/session.py:1868: in _execute
    await execution_info
../../.pyenv/versions/3.8.13/lib/python3.8/asyncio/tasks.py:695: in _wrap_awaitable
    return (yield from awaitable.__await__())
mars/deploy/oscar/session.py:105: in wait
    return await self._aio_task
mars/deploy/oscar/session.py:953: in _run_in_background
    raise task_result.error.with_traceback(task_result.traceback)
mars/services/task/supervisor/processor.py:372: in run
    await self._process_stage_chunk_graph(*stage_args)
mars/services/task/supervisor/processor.py:250: in _process_stage_chunk_graph
    chunk_to_result = await self._executor.execute_subtask_graph(
mars/services/task/execution/ray/executor.py:553: in execute_subtask_graph
    meta_list = await asyncio.gather(*output_meta_object_refs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

awaitable = ObjectRef(71b133a11e1c461cffffffffffffffffffffffff0100000001000000)

    @types.coroutine
    def _wrap_awaitable(awaitable):
        """Helper for asyncio.ensure_future().
    
        Wraps awaitable (an object with __await__) into a coroutine
        that will later be wrapped in a Task by ensure_future().
        """
>       return (yield from awaitable.__await__())
E       ray.exceptions.RayTaskError(KeyError): ray::execute_subtask() (pid=46396, ip=127.0.0.1)
E         File "/home/admin/mars/mars/services/task/execution/ray/executor.py", line 187, in execute_subtask
E           execute(context, chunk.op)
E         File "/home/admin/mars/mars/core/operand/core.py", line 491, in execute
E           result = executor(results, op)
E         File "/home/admin/mars/mars/learn/ensemble/_bagging.py", line 641, in execute
E           cls._execute_reduce(ctx, op)
E         File "/home/admin/mars/mars/learn/ensemble/_bagging.py", line 580, in _execute_reduce
E           chunk_data = ctx[input_key, out_data.index][-1]
E       KeyError: ('fa0ee28981ff176e32e51d5de745cca0_0', (3, 0))

../../.pyenv/versions/3.8.13/lib/python3.8/asyncio/tasks.py:695: RayTaskError(KeyError)
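To make the failing lookup concrete, here is a simplified, hypothetical model of a shuffle context in plain Python. It assumes, without checking it against the Mars source, that the map stage stores each piece under a tuple key `(map_chunk_key, reducer_index)` and that the reduce stage reads it back the way `_execute_reduce` does at `_bagging.py` line 580 (`ctx[input_key, out_data.index]`). `run_map` and `run_reduce` are illustrative names, not Mars APIs. If the executor hands a reducer a context that lacks one of those tuple keys, the lookup fails with the same kind of `KeyError` as in the traceback.

```python
from typing import Dict, List, Tuple

# Hypothetical key layout: (map chunk key, reducer index), mirroring the
# `(input_key, out_data.index)` lookup in the traceback.
ShuffleKey = Tuple[str, Tuple[int, ...]]
Context = Dict[ShuffleKey, List[int]]


def run_map(ctx: Context, map_key: str, data: List[int], n_reducers: int) -> None:
    # Each mapper splits its data round-robin and stores one piece per
    # reducer under a (map_key, (reducer_index, 0)) tuple key.
    for i in range(n_reducers):
        ctx[(map_key, (i, 0))] = data[i::n_reducers]


def run_reduce(ctx: Context, map_keys: List[str], reducer_index: Tuple[int, ...]) -> List[int]:
    # The reducer gathers its piece from every mapper. `ctx[a, b]` is the
    # same as `ctx[(a, b)]`, so a missing tuple key raises KeyError.
    result: List[int] = []
    for map_key in map_keys:
        result.extend(ctx[map_key, reducer_index])
    return result


full: Context = {}
run_map(full, "chunk_a", list(range(8)), n_reducers=4)
print(run_reduce(full, ["chunk_a"], (3, 0)))  # [3, 7]

# Simulate an executor that only forwards the pieces for reducers 0-2.
partial = {k: v for k, v in full.items() if k[1] != (3, 0)}
try:
    run_reduce(partial, ["chunk_a"], (3, 0))
except KeyError as e:
    print("KeyError:", e)  # KeyError: ('chunk_a', (3, 0))
```

Under this model, the question for the Ray executor is whether the reducer subtask receives the mapper outputs under the tuple keys the reduce operand expects.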

The graph is simple:

[Image: the graph]

To Reproduce

To help us reproduce this bug, please provide the information below:

  1. Your Python version
  2. The version of Mars you use
  3. Versions of crucial packages, such as numpy, scipy and pandas
  4. Full stack trace of the error.
  5. Minimal code to reproduce the error.
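Items 1 to 3 can be gathered with a small helper such as the sketch below, which uses only the standard library (`importlib.metadata`, available from Python 3.8). It assumes Mars is installed under its PyPI distribution name `pymars`; `ray` is included because the failure is specific to the Ray executor. `collect_versions` is a hypothetical helper, not part of Mars.

```python
import platform
from importlib import metadata
from typing import Dict, Iterable, Optional

# PyPI distribution names (assumption: Mars is installed as "pymars").
PACKAGES = ("pymars", "numpy", "scipy", "pandas", "ray")


def collect_versions(packages: Iterable[str] = PACKAGES) -> Dict[str, Optional[str]]:
    """Return the Python version plus {package: version, or None if not installed}."""
    versions: Dict[str, Optional[str]] = {"python": platform.python_version()}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


if __name__ == "__main__":
    for name, version in collect_versions().items():
        print(f"{name}: {version or 'not installed'}")
```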

Expected behavior

A clear and concise description of what you expected to happen.

Additional context

Add any other context about the problem here.

fyrestone, Sep 19 '22 07:09