mars

[BUG] Ray executor auto merge chunk may raise KeyError

Status: Open · fyrestone opened this issue 3 years ago · 0 comments

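Describe the bug: With the Ray executor, `test_merge_groupby[before-None]` fails while tiling `DataFrame.merge`. `auto_merge_chunks` calls `RayExecutionContext.get_chunks_meta(..., fields=['memory_size'], error='ignore')`, which raises `KeyError`. The error occurs because a chunk key (e.g. `'ed0cb85eeb0149649a565523a17aee60_0'`) is not present in `self._task_chunks_meta`, even though `error='ignore'` was passed.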

mars/deploy/oscar/tests/test_ray_dag.py:189 (test_merge_groupby[before-None])
ray_start_regular_shared2 = RayContext(dashboard_url='', python_version='3.8.13', ray_version='1.13.0', ray_commit='e4ce38d001dbbe09cd21c497fedd03...127.0.0.1:64894', 'address': '127.0.0.1:64894', 'node_id': '987c20539d0bb8031ea7d8ddfc5783c01d5b79d143191bdb072ba21b'})
create_cluster = (<mars.deploy.oscar.local.LocalClient object at 0x31b18edc0>, {})
method = None, auto_merge = 'before'

    @require_ray
    @pytest.mark.parametrize("method", ["broadcast", None])
    @pytest.mark.parametrize("auto_merge", ["before", "after"])
    def test_merge_groupby(ray_start_regular_shared2, create_cluster, method, auto_merge):
        rs = np.random.RandomState(0)
        raw1 = pd.DataFrame({"a": rs.randint(3, size=100), "b": rs.rand(100)})
        raw2 = pd.DataFrame({"a": rs.randint(3, size=10), "c": rs.rand(10)})
        df1 = md.DataFrame(raw1, chunk_size=10).execute()
        df2 = md.DataFrame(raw2, chunk_size=10).execute()
        # do not trigger auto merge
        df3 = df1.merge(
            df2, on="a", auto_merge_threshold=8, method=method, auto_merge=auto_merge
        )
        df4 = df3.groupby("a").sum()
    
>       result = df4.execute().fetch()

mars/deploy/oscar/tests/test_ray_dag.py:205: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
mars/core/entity/tileables.py:462: in execute
    result = self.data.execute(session=session, **kw)
mars/core/entity/executable.py:144: in execute
    return execute(self, session=session, **kw)
mars/deploy/oscar/session.py:1890: in execute
    return session.execute(
mars/deploy/oscar/session.py:1684: in execute
    execution_info: ExecutionInfo = fut.result(
../../.pyenv/versions/3.8.13/lib/python3.8/concurrent/futures/_base.py:444: in result
    return self.__get_result()
../../.pyenv/versions/3.8.13/lib/python3.8/concurrent/futures/_base.py:389: in __get_result
    raise self._exception
mars/deploy/oscar/session.py:1870: in _execute
    await execution_info
mars/deploy/oscar/session.py:105: in wait
    return await self._aio_task
mars/deploy/oscar/session.py:953: in _run_in_background
    raise task_result.error.with_traceback(task_result.traceback)
mars/services/task/supervisor/processor.py:368: in run
    async for stage_args in self._iter_stage_chunk_graph():
mars/services/task/supervisor/processor.py:158: in _iter_stage_chunk_graph
    chunk_graph = await self._get_next_chunk_graph(chunk_graph_iter)
mars/services/task/supervisor/processor.py:149: in _get_next_chunk_graph
    chunk_graph = await fut
mars/lib/aio/_threads.py:36: in to_thread
    return await loop.run_in_executor(None, func_call)
../../.pyenv/versions/3.8.13/lib/python3.8/concurrent/futures/thread.py:57: in run
    result = self.fn(*self.args, **self.kwargs)
mars/services/task/supervisor/processor.py:144: in next_chunk_graph
    return next(chunk_graph_iter)
mars/services/task/supervisor/preprocessor.py:194: in tile
    for chunk_graph in chunk_graph_builder.build():
mars/core/graph/builder/chunk.py:440: in build
    yield from self._build()
mars/core/graph/builder/chunk.py:434: in _build
    graph = next(tile_iterator)
mars/services/task/supervisor/preprocessor.py:74: in _iter_without_check
    to_update_tileables = self._iter()
mars/core/graph/builder/chunk.py:317: in _iter
    self._tile(
mars/core/graph/builder/chunk.py:211: in _tile
    need_process = next(tile_handler)
mars/core/graph/builder/chunk.py:183: in _tile_handler
    tiled_tileables = yield from handler.tile(tiled_tileables)
mars/core/entity/tileables.py:79: in tile
    tiled_result = yield from tile_handler(op)
mars/dataframe/merge/merge.py:729: in tile
    left = auto_merge_chunks(ctx, left)
mars/dataframe/utils.py:1355: in auto_merge_chunks
    metas = ctx.get_chunks_meta(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <mars.services.task.execution.ray.context.RayExecutionContext object at 0x31c7432b0>
data_keys = ['ed0cb85eeb0149649a565523a17aee60_0', 'ef36ff532158e2c4219867243b37f2dd_0', 'd9f91608f2ca6d88396d91ebdd9ff435_0', 'dc92a54294b3a665971b5b15da6ddd0b_0', 'c7cbea6d90a45df0826bc2a267b72d15_0', 'f769e2009ccc91538652404889dcf893_0', ...]
fields = ['memory_size'], error = 'ignore'

    @implements(Context.get_chunks_meta)
    def get_chunks_meta(
        self, data_keys: List[str], fields: List[str] = None, error="raise"
    ) -> List[Dict]:
        result = []
        # TODO(fyrestone): Support get_chunks_meta from meta service if needed.
        for key in data_keys:
>           chunk_meta = self._task_chunks_meta[key]
E           KeyError: 'ed0cb85eeb0149649a565523a17aee60_0'

mars/services/task/execution/ray/context.py:141: KeyError

To Reproduce: To help us reproduce this bug, please provide the information below:

  1. Your Python version: 3.8.13
  2. The version of Mars you use: latest master
  3. Versions of crucial packages, such as numpy, scipy and pandas
  4. Full stack of the error.
  5. Minimized code to reproduce the error.

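Expected behavior: `df4.execute().fetch()` should return the merged-and-grouped result, and the test should pass. Since `get_chunks_meta` is called with `error='ignore'`, a chunk key missing from the task's chunk meta should presumably be tolerated rather than raise `KeyError`.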

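Additional context: The failing lookup is `chunk_meta = self._task_chunks_meta[key]` in `RayExecutionContext.get_chunks_meta` (mars/services/task/execution/ray/context.py:141). A `TODO(fyrestone)` comment there notes that fetching chunk meta from the meta service is not yet supported.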

— fyrestone, Aug 09 '22, 02:08