[BUG] unique operand failed on ray shuffle with AssertionError

Open • chaokunyang opened this issue 1 year ago • 1 comment

Describe the bug

The Mars `unique` operand fails under Ray shuffle with an `AssertionError` raised by the check `assert len(subtask_chunk_graph.result_chunks) == 1` in `_get_subtask_out_info`:

```python
def _get_subtask_out_info(
    subtask_chunk_graph: ChunkGraph, is_mapper: bool, n_reducers: int = None
):
    # output_keys might be duplicate in chunk graph, use dict to deduplicate.
    # output_keys order should be consistent with remote `execute_subtask`,
    # dict can preserve insert order.
    output_keys = {}
    for chunk in subtask_chunk_graph.result_chunks:
        if isinstance(
            chunk.op, VirtualOperand
        ):  # FIXME(chaokunyang) no need to check this?
            continue
        elif is_mapper:
            assert (
                len(subtask_chunk_graph.result_chunks) == 1
            ), subtask_chunk_graph.result_chunks
            assert n_reducers is not None
            return set(), n_reducers
        else:
            output_keys[chunk.key] = 1
    return output_keys.keys(), len(output_keys)
```
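To make the failure concrete without a Mars install, the sketch below re-implements the quoted control flow against minimal stand-in classes. `FakeOp`, `FakeChunk`, `FakeChunkGraph` and `get_subtask_out_info` are hypothetical names introduced for illustration, not Mars APIs, and the `virtual` flag stands in for the `VirtualOperand` check. With two `TensorUnique` reduce chunks in one subtask, as in the traceback in this issue, the mapper branch trips the same assertion.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass(frozen=True)
class FakeOp:
    # Hypothetical stand-in for a Mars operand; not the Mars API.
    name: str
    stage: str
    virtual: bool = False  # plays the role of isinstance(op, VirtualOperand)


@dataclass(frozen=True)
class FakeChunk:
    # Hypothetical stand-in for a Mars chunk: an operand plus a key.
    op: FakeOp
    key: str


@dataclass
class FakeChunkGraph:
    # Hypothetical stand-in exposing only `result_chunks`.
    result_chunks: List[FakeChunk] = field(default_factory=list)


def get_subtask_out_info(graph, is_mapper, n_reducers=None):
    """Same control flow as the quoted `_get_subtask_out_info`."""
    output_keys = {}  # dict: deduplicates keys and keeps insertion order
    for chunk in graph.result_chunks:
        if chunk.op.virtual:
            continue
        elif is_mapper:
            # The check that fails for the TensorUnique reduce chunks.
            assert len(graph.result_chunks) == 1, graph.result_chunks
            assert n_reducers is not None
            return set(), n_reducers
        else:
            output_keys[chunk.key] = 1
    return output_keys.keys(), len(output_keys)


if __name__ == "__main__":
    unique_op = FakeOp("TensorUnique", "reduce")
    graph = FakeChunkGraph(
        [
            FakeChunk(unique_op, "f0628e398249f4071de49f3bc018f471_0"),
            FakeChunk(unique_op, "f0628e398249f4071de49f3bc018f471_1"),
        ]
    )
    # Treated as a plain reducer, both keys are reported as outputs.
    keys, count = get_subtask_out_info(graph, is_mapper=False)
    print(list(keys), count)
    # Treated as a mapper, the single-result-chunk assertion fires.
    try:
        get_subtask_out_info(graph, is_mapper=True, n_reducers=2)
    except AssertionError as exc:
        print("AssertionError:", exc)
```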

For unique operands, a reducer may also be a mapper, and its subtask can have outputs that are not mapper blocks. Ray task-based shuffle should support this pattern.

[Attached image: graphviz (4)]
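One possible direction for supporting a reducer that is also a mapper, sketched here only as an assumption and not as the Mars fix: instead of asserting a single result chunk, count return slots per chunk role. The helper `get_mixed_out_info`, its `is_mapper_block`/`is_virtual` predicates, the `Chunk` record and the slot layout (each distinct mapper block takes `n_reducers` slots, then one slot per distinct ordinary output key) are all hypothetical; a real layout would have to match what the remote `execute_subtask` actually returns.

```python
from collections import namedtuple
from typing import Callable, Hashable, Iterable, List, Optional, Tuple


def get_mixed_out_info(
    result_chunks: Iterable,
    is_mapper_block: Callable[[object], bool],
    is_virtual: Callable[[object], bool],
    n_reducers: Optional[int] = None,
) -> Tuple[List[Hashable], int]:
    """Hypothetical out-info for a subtask that is both a reducer and a mapper.

    Assumed return layout: each distinct mapper block occupies `n_reducers`
    slots (one per reducer), followed by one slot per distinct ordinary
    output key, in first-seen order.
    """
    mapper_keys = {}  # dicts deduplicate while preserving insertion order
    output_keys = {}
    for chunk in result_chunks:
        if is_virtual(chunk):
            continue
        if is_mapper_block(chunk):
            mapper_keys[chunk.key] = None
        else:
            output_keys[chunk.key] = None
    if mapper_keys and n_reducers is None:
        raise ValueError("n_reducers is required when a mapper block is present")
    n_shuffle_slots = len(mapper_keys) * (n_reducers or 0)
    keys = list(output_keys)
    return keys, n_shuffle_slots + len(keys)


# Usage with a hypothetical chunk record: role is "map", "out" or "virtual".
Chunk = namedtuple("Chunk", ["key", "role"])
chunks = [Chunk("a", "map"), Chunk("b", "out"), Chunk("b", "out"), Chunk("v", "virtual")]
keys, count = get_mixed_out_info(
    chunks,
    is_mapper_block=lambda c: c.role == "map",
    is_virtual=lambda c: c.role == "virtual",
    n_reducers=3,
)
print(keys, count)  # ['b'] 4
```

Counting slots per role keeps the deduplication and ordering guarantees of the original helper while no longer assuming a mapper subtask has exactly one result chunk.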

To Reproduce

To help us reproduce this bug, please provide the information below:

  1. Your Python version: 3.8
  2. The version of Mars you use: master
  3. Versions of crucial packages, such as numpy, scipy and pandas
  4. Full stack of the error.
FAILED mars/tensor/base/tests/test_base_execution.py::test_unique_execution - AssertionError: [Chunk <op=TensorUnique, stage=reduce, key=f0628e398249f4071de49f3bc018f471_0>, Chunk <o...
============================================================================= 1 failed, 1 warning in 20.37s =============================================================================
2022-08-11 20:01:26,022 ERROR (unknown file):0 -- Task exception was never retrieved
future: <Task finished name='Task-336' coro=<_wrap_awaitable() done, defined at /Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/asyncio/tasks.py:688> exception=AssertionError([Chunk <op=TensorUnique, stage=reduce, key=f0628e398249f4071de49f3bc018f471_0>, Chunk <op=TensorUnique, stage=reduce, key=f0628e398249f4071de49f3bc018f471_1>])>
Traceback (most recent call last):
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/site-packages/_pytest/runner.py", line 338, in from_call
    result: Optional[TResult] = func()
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/site-packages/_pytest/runner.py", line 259, in <lambda>
    lambda: ihook(item=item, **kwds), when=when, reraise=reraise
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/site-packages/pluggy/_hooks.py", line 265, in __call__
    return self._hookexec(self.name, self.get_hookimpls(), kwargs, firstresult)
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/site-packages/pluggy/_manager.py", line 80, in _hookexec
    return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/site-packages/pluggy/_callers.py", line 60, in _multicall
    return outcome.get_result()
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/site-packages/pluggy/_result.py", line 60, in get_result
    raise ex[1].with_traceback(ex[2])
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/site-packages/pluggy/_callers.py", line 39, in _multicall
    res = hook_impl.function(*args)
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/site-packages/_pytest/runner.py", line 174, in pytest_runtest_call
    raise e
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/site-packages/_pytest/runner.py", line 166, in pytest_runtest_call
    item.runtest()
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/site-packages/_pytest/python.py", line 1761, in runtest
    self.ihook.pytest_pyfunc_call(pyfuncitem=self)
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/site-packages/pluggy/_hooks.py", line 265, in __call__
    return self._hookexec(self.name, self.get_hookimpls(), kwargs, firstresult)
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/site-packages/pluggy/_manager.py", line 80, in _hookexec
    return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/site-packages/pluggy/_callers.py", line 60, in _multicall
    return outcome.get_result()
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/site-packages/pluggy/_result.py", line 60, in get_result
    raise ex[1].with_traceback(ex[2])
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/site-packages/pluggy/_callers.py", line 39, in _multicall
    res = hook_impl.function(*args)
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/site-packages/_pytest/python.py", line 192, in pytest_pyfunc_call
    result = testfunction(**testargs)
  File "/Users/chaokunyang/Desktop/chaokun/python/mars/mars/tensor/base/tests/test_base_execution.py", line 1013, in test_unique_execution
    res = fetch(*execute(y, inverse))
  File "/Users/chaokunyang/Desktop/chaokun/python/mars/mars/deploy/oscar/session.py", line 1890, in execute
    return session.execute(
  File "/Users/chaokunyang/Desktop/chaokun/python/mars/mars/deploy/oscar/session.py", line 1684, in execute
    execution_info: ExecutionInfo = fut.result(
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/concurrent/futures/_base.py", line 444, in result
    return self.__get_result()
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/Users/chaokunyang/Desktop/chaokun/python/mars/mars/deploy/oscar/session.py", line 1870, in _execute
    await execution_info
  File "/Users/chaokunyang/anaconda3/envs/mars3.8/lib/python3.8/asyncio/tasks.py", line 695, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/Users/chaokunyang/Desktop/chaokun/python/mars/mars/deploy/oscar/session.py", line 105, in wait
    return await self._aio_task
  File "/Users/chaokunyang/Desktop/chaokun/python/mars/mars/deploy/oscar/session.py", line 953, in _run_in_background
    raise task_result.error.with_traceback(task_result.traceback)
  File "/Users/chaokunyang/Desktop/chaokun/python/mars/mars/services/task/supervisor/processor.py", line 369, in run
    await self._process_stage_chunk_graph(*stage_args)
  File "/Users/chaokunyang/Desktop/chaokun/python/mars/mars/services/task/supervisor/processor.py", line 247, in _process_stage_chunk_graph
    chunk_to_result = await self._executor.execute_subtask_graph(
  File "/Users/chaokunyang/Desktop/chaokun/python/mars/mars/services/task/execution/ray/executor.py", line 505, in execute_subtask_graph
    output_keys, out_count = _get_subtask_out_info(
  File "/Users/chaokunyang/Desktop/chaokun/python/mars/mars/services/task/execution/ray/executor.py", line 258, in _get_subtask_out_info
    assert (
AssertionError: [Chunk <op=TensorUnique, stage=reduce, key=f0628e398249f4071de49f3bc018f471_0>, Chunk <op=TensorUnique, stage=reduce, key=f0628e398249f4071de49f3bc018f471_1>]
  5. Minimized code to reproduce the error:

```shell
MARS_CI_BACKEND=ray pytest -v -s mars/tensor/base/tests/test_base_execution.py::test_unique_execution
```

Posted by chaokunyang on Aug 11 '22 12:08