mars
[BUG] Ray executor run learn shuffle raises KeyError
Describe the bug
Many test cases fail when run with the Ray DAG executor. After digging into the problem, I found that there appear to be bugs in running Mars learn operations with the Ray executor.
mars/learn/ensemble/tests/test_bagging.py:62 (test_bagging_sample_execution[False-10-1.0-False-False])
setup = <mars.deploy.oscar.session.SyncSession object at 0x28d81f370>
use_dataframe = False, max_samples = 10, max_features = 1.0, with_labels = False
with_weights = False
    @pytest.mark.parametrize(
        "use_dataframe, max_samples, max_features, with_labels, with_weights",
        [
            (False, 10, 1.0, False, False),
            (False, 10, 0.5, True, True),
            (True, 10, 1.0, False, False),
            (True, 10, 0.5, True, True),
        ],
    )
    def test_bagging_sample_execution(
        setup, use_dataframe, max_samples, max_features, with_labels, with_weights
    ):
        rs = np.random.RandomState(0)
        raw_data = rs.randint(100, size=(100, 50))
        if not use_dataframe:
            t = mt.tensor(raw_data, chunk_size=20)
        else:
            raw_data = pd.DataFrame(raw_data)
            t = md.DataFrame(raw_data, chunk_size=20)
        raw_labels = rs.choice([0, 1, 2], size=100)
        raw_weights = rs.random(100)
        labels = mt.tensor(raw_labels, chunk_size=20) if with_labels else None
        weights = mt.tensor(raw_weights, chunk_size=20) if with_weights else None
        sample_op = BaggingSample(
            n_estimators=10,
            max_samples=max_samples,
            max_features=max_features,
            random_state=rs,
        )
>       result_tuple = execute(*sample_op(t, labels, weights))
mars/learn/ensemble/tests/test_bagging.py:95:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
mars/deploy/oscar/session.py:1888: in execute
    return session.execute(
mars/deploy/oscar/session.py:1682: in execute
    execution_info: ExecutionInfo = fut.result(
../../.pyenv/versions/3.8.13/lib/python3.8/concurrent/futures/_base.py:444: in result
    return self.__get_result()
../../.pyenv/versions/3.8.13/lib/python3.8/concurrent/futures/_base.py:389: in __get_result
    raise self._exception
mars/deploy/oscar/session.py:1868: in _execute
    await execution_info
../../.pyenv/versions/3.8.13/lib/python3.8/asyncio/tasks.py:695: in _wrap_awaitable
    return (yield from awaitable.__await__())
mars/deploy/oscar/session.py:105: in wait
    return await self._aio_task
mars/deploy/oscar/session.py:953: in _run_in_background
    raise task_result.error.with_traceback(task_result.traceback)
mars/services/task/supervisor/processor.py:372: in run
    await self._process_stage_chunk_graph(*stage_args)
mars/services/task/supervisor/processor.py:250: in _process_stage_chunk_graph
    chunk_to_result = await self._executor.execute_subtask_graph(
mars/services/task/execution/ray/executor.py:553: in execute_subtask_graph
    meta_list = await asyncio.gather(*output_meta_object_refs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
awaitable = ObjectRef(71b133a11e1c461cffffffffffffffffffffffff0100000001000000)
    @types.coroutine
    def _wrap_awaitable(awaitable):
        """Helper for asyncio.ensure_future().
        Wraps awaitable (an object with __await__) into a coroutine
        that will later be wrapped in a Task by ensure_future().
        """
>       return (yield from awaitable.__await__())
E ray.exceptions.RayTaskError(KeyError): ray::execute_subtask() (pid=46396, ip=127.0.0.1)
E File "/home/admin/mars/mars/services/task/execution/ray/executor.py", line 187, in execute_subtask
E execute(context, chunk.op)
E File "/home/admin/mars/mars/core/operand/core.py", line 491, in execute
E result = executor(results, op)
E File "/home/admin/mars/mars/learn/ensemble/_bagging.py", line 641, in execute
E cls._execute_reduce(ctx, op)
E File "/home/admin/mars/mars/learn/ensemble/_bagging.py", line 580, in _execute_reduce
E chunk_data = ctx[input_key, out_data.index][-1]
E KeyError: ('fa0ee28981ff176e32e51d5de745cca0_0', (3, 0))
../../.pyenv/versions/3.8.13/lib/python3.8/asyncio/tasks.py:695: RayTaskError(KeyError)
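The failing lookup in the traceback above has the shape `ctx[input_key, out_data.index][-1]`. I have not confirmed the root cause, so the following is only a toy sketch of that failure mode, not Mars code: a plain dict stands in for the execution context, and the names `execute_reduce`, `missing_keys`, `mapper_a` and `mapper_b` are hypothetical. It shows how a reducer raises `KeyError` when one mapper output for its index is not present in the context.

```python
# Toy illustration only: a plain dict stands in for the execution context.
def execute_reduce(ctx, input_keys, reducer_index):
    """Collect the last element of each mapper output destined for one reducer."""
    collected = []
    for input_key in input_keys:
        # Same lookup shape as the failing line in _bagging.py from the traceback.
        chunk_data = ctx[input_key, reducer_index][-1]
        collected.append(chunk_data)
    return collected


def missing_keys(ctx, input_keys, reducer_index):
    """Hypothetical debugging helper: list the (key, index) pairs absent from ctx."""
    return [
        (input_key, reducer_index)
        for input_key in input_keys
        if (input_key, reducer_index) not in ctx
    ]


# Two hypothetical mapper outputs; only mapper_a produced data for reducer (3, 0).
ctx = {
    ("mapper_a", (3, 0)): ("meta", [1, 2, 3]),
    ("mapper_b", (2, 0)): ("meta", [4, 5]),
}
input_keys = ["mapper_a", "mapper_b"]

print(missing_keys(ctx, input_keys, (3, 0)))
try:
    execute_reduce(ctx, input_keys, (3, 0))
except KeyError as exc:
    print("KeyError:", exc.args[0])
```

Listing the missing `(key, index)` pairs before the reduce step, as `missing_keys` does here, may help tell whether the mapper output was never produced or was stored under a different key.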
The graph is simple:
