mars
mars copied to clipboard
Support `get_chunks_meta` in `RayExecutionContext`
Currently, `RayExecutionContext.get_chunks_meta` is not supported. As a result, any operand that relies on this API fails during tiling, for example when calling `DataFrame.groupby(...).apply(...)`:
df = md.DataFrame(mt.random.rand(300, 4, chunk_size=100), columns=list("abcd"))
df["a"], df["b"] = (df["a"] * 5).astype(int), (df["b"] * 2).astype(int)
df.groupby(["a", "b"]).apply(lambda pdf: pdf.sum()).execute()
This fails with the following error:
================================================================================== FAILURES ==================================================================================
________________________________________________________________________________ test_shuffle ________________________________________________________________________________
ray_start_regular_shared2 = RayContext(dashboard_url='127.0.0.1:8265', python_version='3.8.2', ray_version='1.12.0', ray_commit='f18fc31c756299095...127.0.0.1:55710', 'address': '127.0.0.1:55710', 'node_id': '38787319e06bc89f95d7600524069ed4dfba256068c917c261fe697f'})
create_cluster = (<mars.deploy.oscar.local.LocalClient object at 0x7fb22aaf38b0>, {})
@require_ray
@pytest.mark.asyncio
async def test_shuffle(ray_start_regular_shared2, create_cluster):
df = md.DataFrame(mt.random.rand(300, 4, chunk_size=100), columns=list("abcd"))
# `describe` contains multiple shuffle.
df.describe().execute()
arr = np.random.RandomState(0).rand(31, 27)
t1 = mt.tensor(arr, chunk_size=10).reshape(27, 31)
t1.op.extra_params["_reshape_with_shuffle"] = True
np.testing.assert_almost_equal(arr.reshape(27, 31), t1.to_numpy())
np.testing.assert_equal(mt.bincount(mt.arange(5, 10)).to_numpy(), np.bincount(np.arange(5, 10)))
# `RayExecutionContext.get_chunk_meta` not supported, skip dataframe.groupby
df["a"], df["b"] = (df["a"] * 5).astype(int), (df["b"] * 2).astype(int)
> df.groupby(["a", "b"]).apply(lambda pdf: pdf.sum()).execute()
mars/deploy/oscar/tests/test_ray_dag.py:147:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
mars/core/entity/tileables.py:462: in execute
result = self.data.execute(session=session, **kw)
mars/core/entity/executable.py:144: in execute
return execute(self, session=session, **kw)
mars/deploy/oscar/session.py:1855: in execute
return session.execute(
mars/deploy/oscar/session.py:1649: in execute
execution_info: ExecutionInfo = fut.result(
../../../../../opt/anaconda3/envs/mars-py3.8-dev/lib/python3.8/concurrent/futures/_base.py:439: in result
return self.__get_result()
../../../../../opt/anaconda3/envs/mars-py3.8-dev/lib/python3.8/concurrent/futures/_base.py:388: in __get_result
raise self._exception
mars/deploy/oscar/session.py:1835: in _execute
await execution_info
mars/deploy/oscar/session.py:105: in wait
return await self._aio_task
mars/deploy/oscar/session.py:953: in _run_in_background
raise task_result.error.with_traceback(task_result.traceback)
mars/services/task/supervisor/processor.py:364: in run
async for stage_args in self._iter_stage_chunk_graph():
mars/services/task/supervisor/processor.py:158: in _iter_stage_chunk_graph
chunk_graph = await self._get_next_chunk_graph(chunk_graph_iter)
mars/services/task/supervisor/processor.py:149: in _get_next_chunk_graph
chunk_graph = await fut
mars/lib/aio/_threads.py:36: in to_thread
return await loop.run_in_executor(None, func_call)
../../../../../opt/anaconda3/envs/mars-py3.8-dev/lib/python3.8/concurrent/futures/thread.py:57: in run
result = self.fn(*self.args, **self.kwargs)
mars/services/task/supervisor/processor.py:144: in next_chunk_graph
return next(chunk_graph_iter)
mars/services/task/supervisor/preprocessor.py:194: in tile
for chunk_graph in chunk_graph_builder.build():
mars/core/graph/builder/chunk.py:440: in build
yield from self._build()
mars/core/graph/builder/chunk.py:434: in _build
graph = next(tile_iterator)
mars/services/task/supervisor/preprocessor.py:74: in _iter_without_check
to_update_tileables = self._iter()
mars/core/graph/builder/chunk.py:317: in _iter
self._tile(
mars/core/graph/builder/chunk.py:211: in _tile
need_process = next(tile_handler)
mars/core/graph/builder/chunk.py:183: in _tile_handler
tiled_tileables = yield from handler.tile(tiled_tileables)
mars/core/entity/tileables.py:79: in tile
tiled_result = yield from tile_handler(op)
mars/dataframe/groupby/apply.py:151: in tile
return [auto_merge_chunks(get_context(), ret)]
mars/dataframe/utils.py:1333: in auto_merge_chunks
metas = ctx.get_chunks_meta(
mars/services/context.py:188: in get_chunks_meta
return self._call(self._get_chunks_meta(data_keys, fields=fields, error=error))
mars/services/context.py:84: in _call
return fut.result()
../../../../../opt/anaconda3/envs/mars-py3.8-dev/lib/python3.8/concurrent/futures/_base.py:439: in result
return self.__get_result()
../../../../../opt/anaconda3/envs/mars-py3.8-dev/lib/python3.8/concurrent/futures/_base.py:388: in __get_result
raise self._exception
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <mars.services.task.execution.ray.context.RayExecutionContext object at 0x7fb22b3485e0>
data_keys = ['9f92dcd8196d32f25e43e33ba1f56e02_0', '223590f1093c414359f466c42a698006_0', 'dc80798f45b8ed8bb358a7b39b6d8170_0'], fields = ['memory_size'], error = 'ignore'
async def _get_chunks_meta(
self, data_keys: List[str], fields: List[str] = None, error: str = "raise"
) -> List[Dict]:
# get chunks meta
get_metas = []
for data_key in data_keys:
meta = self._meta_api.get_chunk_meta.delay(
data_key, fields=["bands"], error=error
)
get_metas.append(meta)
metas = await self._meta_api.get_chunk_meta.batch(*get_metas)
api_to_keys_calls = defaultdict(lambda: (list(), list()))
for data_key, meta in zip(data_keys, metas):
> addr = meta["bands"][0][0]
E TypeError: 'NoneType' object is not subscriptable
mars/services/context.py:145: TypeError
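The failure happens in `_get_chunks_meta`, which is called with `error='ignore'`. The meta service returns `None` for these chunk keys, apparently because the Ray backend does not record their `bands` meta. `meta["bands"]` then raises `TypeError`. We need to support `get_chunks_meta` for the Ray task backend.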