mars
mars copied to clipboard
[BUG] Ray executor: auto-merging chunks may raise KeyError
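Describe the bug: When running on the Ray executor, `DataFrame.merge` with `auto_merge` enabled calls `auto_merge_chunks`. That function calls `RayExecutionContext.get_chunks_meta(data_keys, fields=['memory_size'], error='ignore')`. The Ray context looks up every key directly in `self._task_chunks_meta`, so it raises `KeyError` for chunks whose meta is not recorded there (here, chunks produced by an earlier `execute()` call). It does this even though `error='ignore'` was requested.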
mars/deploy/oscar/tests/test_ray_dag.py:189 (test_merge_groupby[before-None])
ray_start_regular_shared2 = RayContext(dashboard_url='', python_version='3.8.13', ray_version='1.13.0', ray_commit='e4ce38d001dbbe09cd21c497fedd03...127.0.0.1:64894', 'address': '127.0.0.1:64894', 'node_id': '987c20539d0bb8031ea7d8ddfc5783c01d5b79d143191bdb072ba21b'})
create_cluster = (<mars.deploy.oscar.local.LocalClient object at 0x31b18edc0>, {})
method = None, auto_merge = 'before'
@require_ray
@pytest.mark.parametrize("method", ["broadcast", None])
@pytest.mark.parametrize("auto_merge", ["before", "after"])
def test_merge_groupby(ray_start_regular_shared2, create_cluster, method, auto_merge):
rs = np.random.RandomState(0)
raw1 = pd.DataFrame({"a": rs.randint(3, size=100), "b": rs.rand(100)})
raw2 = pd.DataFrame({"a": rs.randint(3, size=10), "c": rs.rand(10)})
df1 = md.DataFrame(raw1, chunk_size=10).execute()
df2 = md.DataFrame(raw2, chunk_size=10).execute()
# do not trigger auto merge
df3 = df1.merge(
df2, on="a", auto_merge_threshold=8, method=method, auto_merge=auto_merge
)
df4 = df3.groupby("a").sum()
> result = df4.execute().fetch()
mars/deploy/oscar/tests/test_ray_dag.py:205:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
mars/core/entity/tileables.py:462: in execute
result = self.data.execute(session=session, **kw)
mars/core/entity/executable.py:144: in execute
return execute(self, session=session, **kw)
mars/deploy/oscar/session.py:1890: in execute
return session.execute(
mars/deploy/oscar/session.py:1684: in execute
execution_info: ExecutionInfo = fut.result(
../../.pyenv/versions/3.8.13/lib/python3.8/concurrent/futures/_base.py:444: in result
return self.__get_result()
../../.pyenv/versions/3.8.13/lib/python3.8/concurrent/futures/_base.py:389: in __get_result
raise self._exception
mars/deploy/oscar/session.py:1870: in _execute
await execution_info
mars/deploy/oscar/session.py:105: in wait
return await self._aio_task
mars/deploy/oscar/session.py:953: in _run_in_background
raise task_result.error.with_traceback(task_result.traceback)
mars/services/task/supervisor/processor.py:368: in run
async for stage_args in self._iter_stage_chunk_graph():
mars/services/task/supervisor/processor.py:158: in _iter_stage_chunk_graph
chunk_graph = await self._get_next_chunk_graph(chunk_graph_iter)
mars/services/task/supervisor/processor.py:149: in _get_next_chunk_graph
chunk_graph = await fut
mars/lib/aio/_threads.py:36: in to_thread
return await loop.run_in_executor(None, func_call)
../../.pyenv/versions/3.8.13/lib/python3.8/concurrent/futures/thread.py:57: in run
result = self.fn(*self.args, **self.kwargs)
mars/services/task/supervisor/processor.py:144: in next_chunk_graph
return next(chunk_graph_iter)
mars/services/task/supervisor/preprocessor.py:194: in tile
for chunk_graph in chunk_graph_builder.build():
mars/core/graph/builder/chunk.py:440: in build
yield from self._build()
mars/core/graph/builder/chunk.py:434: in _build
graph = next(tile_iterator)
mars/services/task/supervisor/preprocessor.py:74: in _iter_without_check
to_update_tileables = self._iter()
mars/core/graph/builder/chunk.py:317: in _iter
self._tile(
mars/core/graph/builder/chunk.py:211: in _tile
need_process = next(tile_handler)
mars/core/graph/builder/chunk.py:183: in _tile_handler
tiled_tileables = yield from handler.tile(tiled_tileables)
mars/core/entity/tileables.py:79: in tile
tiled_result = yield from tile_handler(op)
mars/dataframe/merge/merge.py:729: in tile
left = auto_merge_chunks(ctx, left)
mars/dataframe/utils.py:1355: in auto_merge_chunks
metas = ctx.get_chunks_meta(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <mars.services.task.execution.ray.context.RayExecutionContext object at 0x31c7432b0>
data_keys = ['ed0cb85eeb0149649a565523a17aee60_0', 'ef36ff532158e2c4219867243b37f2dd_0', 'd9f91608f2ca6d88396d91ebdd9ff435_0', 'dc92a54294b3a665971b5b15da6ddd0b_0', 'c7cbea6d90a45df0826bc2a267b72d15_0', 'f769e2009ccc91538652404889dcf893_0', ...]
fields = ['memory_size'], error = 'ignore'
@implements(Context.get_chunks_meta)
def get_chunks_meta(
self, data_keys: List[str], fields: List[str] = None, error="raise"
) -> List[Dict]:
result = []
# TODO(fyrestone): Support get_chunks_meta from meta service if needed.
for key in data_keys:
> chunk_meta = self._task_chunks_meta[key]
E KeyError: 'ed0cb85eeb0149649a565523a17aee60_0'
mars/services/task/execution/ray/context.py:141: KeyError
To Reproduce: To help us reproduce this bug, please provide the information below:
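- Your Python version: 3.8.13
- The version of Mars you use: latest master
- Versions of crucial packages, such as numpy, scipy and pandas: Ray 1.13.0 (as reported by `RayContext` above)
- Full stack of the error: see the traceback above.
- Minimized code to reproduce the error: see `test_merge_groupby` in `mars/deploy/oscar/tests/test_ray_dag.py` above.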
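Expected behavior: `df4.execute().fetch()` should complete successfully on the Ray executor. `get_chunks_meta` called with `error='ignore'` should not raise `KeyError` for keys that are missing from the Ray task's chunk meta.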
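Additional context: The failure shown occurs in the `test_merge_groupby[before-None]` parametrization (`auto_merge='before'`, `method=None`). It is raised from `mars/services/task/execution/ray/context.py:141`.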