mars
mars copied to clipboard
[BUG] Ray shuffle: `iter_mapper_data_with_index` returns incorrect values in the reduce call
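**Describe the bug** With the Ray executor, `test_reset_index_execution` fails in its "Test Unknown shape" case. The reduce step of `DataFrameIndexAlign` (`execute_reduce` in `mars/dataframe/align.py`) expects every input chunk index to be a tuple (it evaluates `idx[0]`). It receives an int instead and raises `TypeError: 'int' object is not subscriptable`.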
mars/dataframe/indexing/tests/test_indexing_execution.py:727 (test_reset_index_execution)
setup = <mars.deploy.oscar.session.SyncSession object at 0x14c308f40>
def test_reset_index_execution(setup):
data = pd.DataFrame(
[("bird", 389.0), ("bird", 24.0), ("mammal", 80.5), ("mammal", np.nan)],
index=["falcon", "parrot", "lion", "monkey"],
columns=("class", "max_speed"),
)
df = md.DataFrame(data)
df2 = df.reset_index()
result = df2.execute().fetch()
expected = data.reset_index()
pd.testing.assert_frame_equal(result, expected)
df = md.DataFrame(data, chunk_size=2)
df2 = df.reset_index()
result = df2.execute().fetch()
expected = data.reset_index()
pd.testing.assert_frame_equal(result, expected)
df = md.DataFrame(data, chunk_size=1)
df2 = df.reset_index(drop=True)
result = df2.execute().fetch()
expected = data.reset_index(drop=True)
pd.testing.assert_frame_equal(result, expected)
index = pd.MultiIndex.from_tuples(
[
("bird", "falcon"),
("bird", "parrot"),
("mammal", "lion"),
("mammal", "monkey"),
],
names=["class", "name"],
)
data = pd.DataFrame(
[("bird", 389.0), ("bird", 24.0), ("mammal", 80.5), ("mammal", np.nan)],
index=index,
columns=("type", "max_speed"),
)
df = md.DataFrame(data, chunk_size=1)
df2 = df.reset_index(level="class")
result = df2.execute().fetch()
expected = data.reset_index(level="class")
pd.testing.assert_frame_equal(result, expected)
columns = pd.MultiIndex.from_tuples([("speed", "max"), ("species", "type")])
data.columns = columns
df = md.DataFrame(data, chunk_size=2)
df2 = df.reset_index(level="class", col_level=1, col_fill="species")
result = df2.execute().fetch()
expected = data.reset_index(level="class", col_level=1, col_fill="species")
pd.testing.assert_frame_equal(result, expected)
df = md.DataFrame(data, chunk_size=3)
df.reset_index(level="class", col_level=1, col_fill="species", inplace=True)
result = df.execute().fetch()
expected = data.reset_index(level="class", col_level=1, col_fill="species")
pd.testing.assert_frame_equal(result, expected)
# Test Series
s = pd.Series(
[1, 2, 3, 4], name="foo", index=pd.Index(["a", "b", "c", "d"], name="idx")
)
series = md.Series(s)
s2 = series.reset_index(name="bar")
result = s2.execute().fetch()
expected = s.reset_index(name="bar")
pd.testing.assert_frame_equal(result, expected)
series = md.Series(s, chunk_size=2)
s2 = series.reset_index(drop=True)
result = s2.execute().fetch()
expected = s.reset_index(drop=True)
pd.testing.assert_series_equal(result, expected)
# Test Unknown shape
data1 = pd.DataFrame(np.random.rand(10, 3), index=[0, 10, 2, 3, 4, 5, 6, 7, 8, 9])
df1 = md.DataFrame(data1, chunk_size=5)
data2 = pd.DataFrame(np.random.rand(10, 3), index=[11, 1, 2, 5, 7, 6, 8, 9, 10, 3])
df2 = md.DataFrame(data2, chunk_size=6)
df = (df1 + df2).reset_index(incremental_index=True)
> result = df.execute().fetch()
mars/dataframe/indexing/tests/test_indexing_execution.py:810:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
mars/core/entity/tileables.py:462: in execute
result = self.data.execute(session=session, **kw)
mars/core/entity/executable.py:144: in execute
return execute(self, session=session, **kw)
mars/deploy/oscar/session.py:1890: in execute
return session.execute(
mars/deploy/oscar/session.py:1684: in execute
execution_info: ExecutionInfo = fut.result(
../../.pyenv/versions/3.8.7/lib/python3.8/concurrent/futures/_base.py:439: in result
return self.__get_result()
../../.pyenv/versions/3.8.7/lib/python3.8/concurrent/futures/_base.py:388: in __get_result
raise self._exception
mars/deploy/oscar/session.py:1870: in _execute
await execution_info
../../.pyenv/versions/3.8.7/lib/python3.8/asyncio/tasks.py:695: in _wrap_awaitable
return (yield from awaitable.__await__())
mars/deploy/oscar/session.py:105: in wait
return await self._aio_task
mars/deploy/oscar/session.py:953: in _run_in_background
raise task_result.error.with_traceback(task_result.traceback)
mars/services/task/supervisor/processor.py:369: in run
await self._process_stage_chunk_graph(*stage_args)
mars/services/task/supervisor/processor.py:247: in _process_stage_chunk_graph
chunk_to_result = await self._executor.execute_subtask_graph(
mars/services/task/execution/ray/executor.py:538: in execute_subtask_graph
meta_list = await asyncio.gather(*output_meta_object_refs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
awaitable = ObjectRef(8915caf77059302bffffffffffffffffffffffff0100000001000000)
@types.coroutine
def _wrap_awaitable(awaitable):
"""Helper for asyncio.ensure_future().
Wraps awaitable (an object with __await__) into a coroutine
that will later be wrapped in a Task by ensure_future().
"""
> return (yield from awaitable.__await__())
E ray.exceptions.RayTaskError(TypeError): ray::execute_subtask() (pid=32948, ip=127.0.0.1)
E At least one of the input arguments for this task could not be computed:
E ray.exceptions.RayTaskError: ray::execute_subtask() (pid=32945, ip=127.0.0.1)
E File "/Users/po/Work/mars/mars/services/task/execution/ray/executor.py", line 180, in execute_subtask
E execute(context, chunk.op)
E File "/Users/po/Work/mars/mars/core/operand/core.py", line 491, in execute
E result = executor(results, op)
E File "/Users/po/Work/mars/mars/dataframe/align.py", line 321, in execute
E cls.execute_reduce(ctx, op)
E File "/Users/po/Work/mars/mars/dataframe/align.py", line 298, in execute_reduce
E row_idxes = sorted({idx[0] for idx in input_idx_to_df})
E File "/Users/po/Work/mars/mars/dataframe/align.py", line 298, in <setcomp>
E row_idxes = sorted({idx[0] for idx in input_idx_to_df})
E TypeError: 'int' object is not subscriptable
../../.pyenv/versions/3.8.7/lib/python3.8/asyncio/tasks.py:695: RayTaskError(TypeError)
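**To Reproduce** Run `test_reset_index_execution` in `mars/dataframe/indexing/tests/test_indexing_execution.py` with the Ray task executor. The failure occurs at `result = df.execute().fetch()` for `(df1 + df2).reset_index(incremental_index=True)`. The traceback paths show Python 3.8.7. Other environment details: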
- Your Python version
- The version of Mars you use
- Versions of crucial packages, such as numpy, scipy and pandas
- Full stack of the error.
- Minimized code to reproduce the error.
**Expected behavior** Under the Ray executor, `df.execute().fetch()` should succeed and match the pandas result, as it does for the earlier cases in the same test.
**Additional context** This is tricky: in the following graph, the index of the `DataFrameIndexAlign` reduce chunk is a tuple.
(graph not included in this text)
A tuple chunk index breaks an assumption in the Ray shuffle, which mocks a chunk index with an int:
(code snippet not included in this text)