mars

Support get_chunk_meta in RayExecutionContext

Status: Open · opened by chaokunyang 3 years ago · 0 comments

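Currently, `RayExecutionContext.get_chunks_meta` is not supported. As a result, any operand that relies on this API fails during tiling. For example, `DataFrame.groupby(...).apply(...)` fails because its tiling calls `auto_merge_chunks`, which queries chunk metadata: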

df = md.DataFrame(mt.random.rand(300, 4, chunk_size=100), columns=list("abcd"))
df["a"], df["b"] = (df["a"] * 5).astype(int), (df["b"] * 2).astype(int)
df.groupby(["a", "b"]).apply(lambda pdf: pdf.sum()).execute()

Running this fails with the following error:

================================================================================== FAILURES ==================================================================================
________________________________________________________________________________ test_shuffle ________________________________________________________________________________

ray_start_regular_shared2 = RayContext(dashboard_url='127.0.0.1:8265', python_version='3.8.2', ray_version='1.12.0', ray_commit='f18fc31c756299095...127.0.0.1:55710', 'address': '127.0.0.1:55710', 'node_id': '38787319e06bc89f95d7600524069ed4dfba256068c917c261fe697f'})
create_cluster = (<mars.deploy.oscar.local.LocalClient object at 0x7fb22aaf38b0>, {})

    @require_ray
    @pytest.mark.asyncio
    async def test_shuffle(ray_start_regular_shared2, create_cluster):
        df = md.DataFrame(mt.random.rand(300, 4, chunk_size=100), columns=list("abcd"))
        # `describe` contains multiple shuffle.
        df.describe().execute()
    
        arr = np.random.RandomState(0).rand(31, 27)
        t1 = mt.tensor(arr, chunk_size=10).reshape(27, 31)
        t1.op.extra_params["_reshape_with_shuffle"] = True
        np.testing.assert_almost_equal(arr.reshape(27, 31), t1.to_numpy())
    
        np.testing.assert_equal(mt.bincount(mt.arange(5, 10)).to_numpy(), np.bincount(np.arange(5, 10)))
    
        # `RayExecutionContext.get_chunk_meta` not supported, skip dataframe.groupby
        df["a"], df["b"] = (df["a"] * 5).astype(int), (df["b"] * 2).astype(int)
>       df.groupby(["a", "b"]).apply(lambda pdf: pdf.sum()).execute()

mars/deploy/oscar/tests/test_ray_dag.py:147: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
mars/core/entity/tileables.py:462: in execute
    result = self.data.execute(session=session, **kw)
mars/core/entity/executable.py:144: in execute
    return execute(self, session=session, **kw)
mars/deploy/oscar/session.py:1855: in execute
    return session.execute(
mars/deploy/oscar/session.py:1649: in execute
    execution_info: ExecutionInfo = fut.result(
../../../../../opt/anaconda3/envs/mars-py3.8-dev/lib/python3.8/concurrent/futures/_base.py:439: in result
    return self.__get_result()
../../../../../opt/anaconda3/envs/mars-py3.8-dev/lib/python3.8/concurrent/futures/_base.py:388: in __get_result
    raise self._exception
mars/deploy/oscar/session.py:1835: in _execute
    await execution_info
mars/deploy/oscar/session.py:105: in wait
    return await self._aio_task
mars/deploy/oscar/session.py:953: in _run_in_background
    raise task_result.error.with_traceback(task_result.traceback)
mars/services/task/supervisor/processor.py:364: in run
    async for stage_args in self._iter_stage_chunk_graph():
mars/services/task/supervisor/processor.py:158: in _iter_stage_chunk_graph
    chunk_graph = await self._get_next_chunk_graph(chunk_graph_iter)
mars/services/task/supervisor/processor.py:149: in _get_next_chunk_graph
    chunk_graph = await fut
mars/lib/aio/_threads.py:36: in to_thread
    return await loop.run_in_executor(None, func_call)
../../../../../opt/anaconda3/envs/mars-py3.8-dev/lib/python3.8/concurrent/futures/thread.py:57: in run
    result = self.fn(*self.args, **self.kwargs)
mars/services/task/supervisor/processor.py:144: in next_chunk_graph
    return next(chunk_graph_iter)
mars/services/task/supervisor/preprocessor.py:194: in tile
    for chunk_graph in chunk_graph_builder.build():
mars/core/graph/builder/chunk.py:440: in build
    yield from self._build()
mars/core/graph/builder/chunk.py:434: in _build
    graph = next(tile_iterator)
mars/services/task/supervisor/preprocessor.py:74: in _iter_without_check
    to_update_tileables = self._iter()
mars/core/graph/builder/chunk.py:317: in _iter
    self._tile(
mars/core/graph/builder/chunk.py:211: in _tile
    need_process = next(tile_handler)
mars/core/graph/builder/chunk.py:183: in _tile_handler
    tiled_tileables = yield from handler.tile(tiled_tileables)
mars/core/entity/tileables.py:79: in tile
    tiled_result = yield from tile_handler(op)
mars/dataframe/groupby/apply.py:151: in tile
    return [auto_merge_chunks(get_context(), ret)]
mars/dataframe/utils.py:1333: in auto_merge_chunks
    metas = ctx.get_chunks_meta(
mars/services/context.py:188: in get_chunks_meta
    return self._call(self._get_chunks_meta(data_keys, fields=fields, error=error))
mars/services/context.py:84: in _call
    return fut.result()
../../../../../opt/anaconda3/envs/mars-py3.8-dev/lib/python3.8/concurrent/futures/_base.py:439: in result
    return self.__get_result()
../../../../../opt/anaconda3/envs/mars-py3.8-dev/lib/python3.8/concurrent/futures/_base.py:388: in __get_result
    raise self._exception
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <mars.services.task.execution.ray.context.RayExecutionContext object at 0x7fb22b3485e0>
data_keys = ['9f92dcd8196d32f25e43e33ba1f56e02_0', '223590f1093c414359f466c42a698006_0', 'dc80798f45b8ed8bb358a7b39b6d8170_0'], fields = ['memory_size'], error = 'ignore'

    async def _get_chunks_meta(
        self, data_keys: List[str], fields: List[str] = None, error: str = "raise"
    ) -> List[Dict]:
        # get chunks meta
        get_metas = []
        for data_key in data_keys:
            meta = self._meta_api.get_chunk_meta.delay(
                data_key, fields=["bands"], error=error
            )
            get_metas.append(meta)
        metas = await self._meta_api.get_chunk_meta.batch(*get_metas)
        api_to_keys_calls = defaultdict(lambda: (list(), list()))
        for data_key, meta in zip(data_keys, metas):
>           addr = meta["bands"][0][0]
E           TypeError: 'NoneType' object is not subscriptable

mars/services/context.py:145: TypeError

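We need to support chunk-meta queries (`get_chunks_meta`) in the Ray task backend. Currently `meta_api.get_chunk_meta` returns `None` for these chunk keys, so indexing `meta["bands"]` raises `TypeError`.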

chaokunyang avatar May 17 '22 03:05 chaokunyang