[BUG] Unique operand fails on Ray shuffle with AssertionError
Describe the bug
The Mars unique operand fails on Ray shuffle with an AssertionError raised by the check assert len(subtask_chunk_graph.result_chunks) == 1 in _get_subtask_out_info (mars/services/task/execution/ray/executor.py):
def _get_subtask_out_info(
    subtask_chunk_graph: ChunkGraph, is_mapper: bool, n_reducers: int = None
):
    # output_keys might be duplicate in chunk graph, use dict to deduplicate.
    # output_keys order should be consistent with remote `execute_subtask`,
    # dict can preserve insert order.
    output_keys = {}
    for chunk in subtask_chunk_graph.result_chunks:
        if isinstance(
            chunk.op, VirtualOperand
        ):  # FIXME(chaokunyang) no need to check this?
            continue
        elif is_mapper:
            assert (
                len(subtask_chunk_graph.result_chunks) == 1
            ), subtask_chunk_graph.result_chunks
            assert n_reducers is not None
            return set(), n_reducers
        else:
            output_keys[chunk.key] = 1
    return output_keys.keys(), len(output_keys)
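To isolate the failing path from Ray and the rest of Mars, the excerpt's control flow can be replayed on stand-in objects. This is a sketch under stated assumptions: the SimpleNamespace chunks, the local VirtualOperand class and n_reducers=2 are illustrative stand-ins, not real Mars objects. It shows that for a mapper subtask the assertion fires on the first non-virtual chunk whenever the subtask has more than one result chunk.

```python
from types import SimpleNamespace


class VirtualOperand:
    """Stand-in for Mars's VirtualOperand; only used for the isinstance check."""


def get_subtask_out_info_original(result_chunks, is_mapper, n_reducers=None):
    # Same control flow as _get_subtask_out_info, but over a plain list of chunks.
    output_keys = {}
    for chunk in result_chunks:
        if isinstance(chunk.op, VirtualOperand):
            continue
        elif is_mapper:
            # Reached on the first non-virtual chunk; fails whenever the
            # subtask has more than one result chunk.
            assert len(result_chunks) == 1, result_chunks
            assert n_reducers is not None
            return set(), n_reducers
        else:
            output_keys[chunk.key] = 1
    return output_keys.keys(), len(output_keys)


# Two reduce-stage TensorUnique outputs of one subtask (keys from the traceback).
reduce_chunks = [
    SimpleNamespace(key="f0628e398249f4071de49f3bc018f471_0", op=object()),
    SimpleNamespace(key="f0628e398249f4071de49f3bc018f471_1", op=object()),
]

try:
    get_subtask_out_info_original(reduce_chunks, is_mapper=True, n_reducers=2)
except AssertionError as exc:
    print("AssertionError:", exc)
```

The same chunks pass through without error when is_mapper=False, so the failure comes only from combining the mapper flag with multiple result chunks.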
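For unique operands, a reducer subtask may also be a mapper, and it can have outputs that are not mapper blocks (in the traceback below, the mapper subtask has two TensorUnique reduce-stage result chunks). Ray task-based shuffle should support this pattern.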
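One possible shape for supporting this pattern is sketched below. It is not Mars's actual fix. ChunkStub, its is_shuffle_mapper flag and get_subtask_out_info_sketch are hypothetical names. The sketch also assumes the remote task returns the n_reducers mapper blocks first, followed by the regular outputs; the real layout is defined by the remote execute_subtask and may differ. The idea is to decide per chunk whether it produces mapper blocks, instead of treating the whole subtask as mapper-only.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass(frozen=True)
class ChunkStub:
    """Hypothetical stand-in for a Mars chunk, carrying only what this sketch needs."""

    key: str
    is_virtual: bool = False  # plays the role of isinstance(chunk.op, VirtualOperand)
    is_shuffle_mapper: bool = False  # hypothetical flag: chunk feeds a downstream shuffle


def get_subtask_out_info_sketch(
    result_chunks: List[ChunkStub], is_mapper: bool, n_reducers: Optional[int] = None
) -> Tuple[List[str], int]:
    """Return (regular output keys, total number of objects the remote task returns)."""
    # Regular outputs, deduplicated with a dict so insertion order is preserved.
    output_keys: Dict[str, None] = {}
    has_mapper_output = False
    for chunk in result_chunks:
        if chunk.is_virtual:
            continue
        if is_mapper and chunk.is_shuffle_mapper:
            # Mapper blocks are counted via n_reducers rather than by key.
            has_mapper_output = True
        else:
            output_keys[chunk.key] = None
    if has_mapper_output:
        if n_reducers is None:
            raise ValueError("n_reducers is required when a subtask emits mapper blocks")
        # Assumed layout: n_reducers mapper blocks, then the regular outputs.
        return list(output_keys), n_reducers + len(output_keys)
    return list(output_keys), len(output_keys)


# A subtask that is a reducer (regular output) and a mapper (shuffle blocks) at once.
chunks = [ChunkStub("unique_0", is_shuffle_mapper=True), ChunkStub("unique_1")]
print(get_subtask_out_info_sketch(chunks, is_mapper=True, n_reducers=3))
# (['unique_1'], 4)
```

A single mapper chunk still yields (no keys, n_reducers), which matches the original single-chunk behavior. How a chunk is identified as a mapper in Mars itself is left open here.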
To Reproduce
To help us reproduce this bug, please provide the information below:
- Your Python version: 3.8
- The version of Mars you use: master
- Versions of crucial packages, such as numpy, scipy and pandas
- Full stack of the error.
FAILED mars/tensor/base/tests/test_base_execution.py::test_unique_execution - AssertionError: [Chunk <op=TensorUnique, stage=reduce, key=f0628e398249f4071de49f3bc018f471_0>, Chunk <o...
============================================================================= 1 failed, 1 warning in 20.37s =============================================================================
2022-08-11 20:01:26,022 ERROR (unknown file):0 -- Task exception was never retrieved
future: <Task finished name='Task-336' coro=<_wrap_awaitable() done, defined at /Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/asyncio/tasks.py:688> exception=AssertionError([Chunk <op=TensorUnique, stage=reduce, key=f0628e398249f4071de49f3bc018f471_0>, Chunk <op=TensorUnique, stage=reduce, key=f0628e398249f4071de49f3bc018f471_1>])>
Traceback (most recent call last):
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/site-packages/_pytest/runner.py", line 338, in from_call
    result: Optional[TResult] = func()
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/site-packages/_pytest/runner.py", line 259, in <lambda>
    lambda: ihook(item=item, **kwds), when=when, reraise=reraise
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/site-packages/pluggy/_hooks.py", line 265, in __call__
    return self._hookexec(self.name, self.get_hookimpls(), kwargs, firstresult)
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/site-packages/pluggy/_manager.py", line 80, in _hookexec
    return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/site-packages/pluggy/_callers.py", line 60, in _multicall
    return outcome.get_result()
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/site-packages/pluggy/_result.py", line 60, in get_result
    raise ex[1].with_traceback(ex[2])
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/site-packages/pluggy/_callers.py", line 39, in _multicall
    res = hook_impl.function(*args)
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/site-packages/_pytest/runner.py", line 174, in pytest_runtest_call
    raise e
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/site-packages/_pytest/runner.py", line 166, in pytest_runtest_call
    item.runtest()
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/site-packages/_pytest/python.py", line 1761, in runtest
    self.ihook.pytest_pyfunc_call(pyfuncitem=self)
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/site-packages/pluggy/_hooks.py", line 265, in __call__
    return self._hookexec(self.name, self.get_hookimpls(), kwargs, firstresult)
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/site-packages/pluggy/_manager.py", line 80, in _hookexec
    return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/site-packages/pluggy/_callers.py", line 60, in _multicall
    return outcome.get_result()
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/site-packages/pluggy/_result.py", line 60, in get_result
    raise ex[1].with_traceback(ex[2])
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/site-packages/pluggy/_callers.py", line 39, in _multicall
    res = hook_impl.function(*args)
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/site-packages/_pytest/python.py", line 192, in pytest_pyfunc_call
    result = testfunction(**testargs)
  File "/Users/chaokunyang/Desktop/chaokun/python/mars/mars/tensor/base/tests/test_base_execution.py", line 1013, in test_unique_execution
    res = fetch(*execute(y, inverse))
  File "/Users/chaokunyang/Desktop/chaokun/python/mars/mars/deploy/oscar/session.py", line 1890, in execute
    return session.execute(
  File "/Users/chaokunyang/Desktop/chaokun/python/mars/mars/deploy/oscar/session.py", line 1684, in execute
    execution_info: ExecutionInfo = fut.result(
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/concurrent/futures/_base.py", line 444, in result
    return self.__get_result()
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/Users/chaokunyang/Desktop/chaokun/python/mars/mars/deploy/oscar/session.py", line 1870, in _execute
    await execution_info
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/asyncio/tasks.py", line 695, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/Users/chaokunyang/Desktop/chaokun/python/mars/mars/deploy/oscar/session.py", line 105, in wait
    return await self._aio_task
  File "/Users/chaokunyang/Desktop/chaokun/python/mars/mars/deploy/oscar/session.py", line 953, in _run_in_background
    raise task_result.error.with_traceback(task_result.traceback)
  File "/Users/chaokunyang/Desktop/chaokun/python/mars/mars/services/task/supervisor/processor.py", line 369, in run
    await self._process_stage_chunk_graph(*stage_args)
  File "/Users/chaokunyang/Desktop/chaokun/python/mars/mars/services/task/supervisor/processor.py", line 247, in _process_stage_chunk_graph
    chunk_to_result = await self._executor.execute_subtask_graph(
  File "/Users/chaokunyang/Desktop/chaokun/python/mars/mars/services/task/execution/ray/executor.py", line 505, in execute_subtask_graph
    output_keys, out_count = _get_subtask_out_info(
  File "/Users/chaokunyang/Desktop/chaokun/python/mars/mars/services/task/execution/ray/executor.py", line 258, in _get_subtask_out_info
    assert (
AssertionError: [Chunk <op=TensorUnique, stage=reduce, key=f0628e398249f4071de49f3bc018f471_0>, Chunk <op=TensorUnique, stage=reduce, key=f0628e398249f4071de49f3bc018f471_1>]
- Minimized code to reproduce the error.
MARS_CI_BACKEND=ray pytest -v -s mars/tensor/base/tests/test_base_execution.py::test_unique_execution
Expected behavior
A clear and concise description of what you expected to happen.
Additional context
Add any other context about the problem here.