[BUG] Ray executor raises KeyError in gen_subtask_graph
Describe the bug Running test_bagging_classifier on the Ray executor fails: GraphAnalyzer.gen_subtask_graph raises a KeyError in _gen_subtask_info. Full traceback:
________________ test_bagging_classifier[True-10-1.0-True-SVC] _________________
setup = <mars.deploy.oscar.session.SyncSession object at 0x33ac7eb50>
use_dataframe = True, max_samples = 10, max_features = 1.0, with_weights = True
base_estimator_cls = <class 'sklearn.svm._classes.SVC'>
    @pytest.mark.parametrize(
        "use_dataframe, max_samples, max_features, with_weights, base_estimator_cls",
        [
            (False, 10, 0.5, False, LogisticRegression),
            (True, 10, 1.0, True, SVC),
        ],
    )
    def test_bagging_classifier(
        setup, use_dataframe, max_samples, max_features, with_weights, base_estimator_cls
    ):
        rs = np.random.RandomState(0)
        raw_x, raw_y = make_classification(
            n_samples=100,
            n_features=4,
            n_informative=2,
            n_redundant=0,
            random_state=rs,
            shuffle=False,
        )
        if not use_dataframe:
            t_x = mt.tensor(raw_x, chunk_size=20)
        else:
            raw_x = pd.DataFrame(raw_x)
            t_x = md.DataFrame(raw_x, chunk_size=20)
        raw_weights = rs.random(100)
        t_y = mt.tensor(raw_y, chunk_size=20)
        t_weights = mt.tensor(raw_weights, chunk_size=20) if with_weights else None
        clf = BaggingClassifier(
            base_estimator=base_estimator_cls(),
            n_estimators=10,
            max_samples=max_samples,
            max_features=max_features,
            random_state=rs,
            warm_start=True,
        )
        clf.fit(t_x, t_y, sample_weight=t_weights)
        for _tiled, _chunk, chunk_data in _get_tileable_chunk_data(setup, clf.estimators_):
            assert len(chunk_data) == 2
            assert all(isinstance(c, base_estimator_cls) for c in chunk_data)
        if max_features < 1.0:
            assert clf.estimator_features_ is not None
        with pytest.warns(Warning):
            clf.fit(t_x, t_y, sample_weight=t_weights)
        with pytest.raises(ValueError):
            clf.n_estimators = 5
            clf.fit(t_x, t_y, sample_weight=t_weights)
        clf.n_estimators = 20
        clf.fit(t_x, t_y, sample_weight=t_weights)
        assert clf.estimators_.shape[0] == 20
        proba = clf.predict_proba(t_x)
        proba_array = proba.fetch()
        assert np.all((proba_array >= 0) & (proba_array <= 1))
        assert np.allclose(np.sum(proba_array, axis=1), 1.0)
        log_proba = clf.predict_log_proba(t_x)
        exp_log_proba_array = np.exp(log_proba.fetch())
        assert np.all((exp_log_proba_array >= 0) & (exp_log_proba_array <= 1))
        assert np.allclose(np.sum(exp_log_proba_array, axis=1), 1.0)
>       y = clf.predict(t_x)
mars/learn/ensemble/tests/test_bagging.py:274:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
mars/learn/ensemble/_bagging.py:1492: in predict
return execute(y, session=session, **(run_kwargs or dict()))
mars/deploy/oscar/session.py:1888: in execute
return session.execute(
mars/deploy/oscar/session.py:1682: in execute
execution_info: ExecutionInfo = fut.result(
../../.pyenv/versions/3.8.13/lib/python3.8/concurrent/futures/_base.py:444: in result
return self.__get_result()
../../.pyenv/versions/3.8.13/lib/python3.8/concurrent/futures/_base.py:389: in __get_result
raise self._exception
mars/deploy/oscar/session.py:1868: in _execute
await execution_info
../../.pyenv/versions/3.8.13/lib/python3.8/asyncio/tasks.py:695: in _wrap_awaitable
return (yield from awaitable.__await__())
mars/deploy/oscar/session.py:105: in wait
return await self._aio_task
mars/deploy/oscar/session.py:953: in _run_in_background
raise task_result.error.with_traceback(task_result.traceback)
mars/services/task/supervisor/processor.py:372: in run
await self._process_stage_chunk_graph(*stage_args)
mars/services/task/supervisor/processor.py:216: in _process_stage_chunk_graph
subtask_graph = await asyncio.to_thread(
mars/lib/aio/_threads.py:36: in to_thread
return await loop.run_in_executor(None, func_call)
../../.pyenv/versions/3.8.13/lib/python3.8/concurrent/futures/thread.py:57: in run
result = self.fn(*self.args, **self.kwargs)
mars/core/mode.py:77: in _inner
return func(*args, **kwargs)
mars/services/task/supervisor/tests/task_preprocessor.py:181: in analyze
subtask_graph = analyzer.gen_subtask_graph()
mars/core/mode.py:77: in _inner
return func(*args, **kwargs)
mars/services/task/analyzer/analyzer.py:466: in gen_subtask_graph
subtask, inp_subtasks, is_shuffle_proxy = self._gen_subtask_info(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <mars.services.task.analyzer.analyzer.GraphAnalyzer object at 0x33d7a9850>
chunks = [TensorChunkData <op=TensorConcatenate, key=c5bb11bc0edb3df973226e5e38faff2b_0>, TensorChunkData <op=TensorSum, stage=...agg, key=ce125e5107002a50457df480baf02bcf_0>, TensorChunkData <op=TensorSlice, key=dabeb289402d19739cc99fa887734617_0>]
chunk_to_subtask = {DataFrameChunkData <op=DataFrameDataSource, key=c7b091c709abec93f88cb961255bdd31_0>: <Subtask id=xdHHx4jnnJ0sw8K5Li6f...9462e_0>: <Subtask id=j7A8BuHPGXVw5D8A4QD25aEl results=[DataFrameDataSource(d2030277d86500b90f91ebb48da9462e_0)]>, ...}
chunk_to_bands = {TensorChunkData <op=FancyIndexingConcat, stage=reduce, key=fe84472d67b1c15aac3b0ce8e6ccab3c_0>: ('ray_virtual_address...FancyIndexingConcat, stage=reduce, key=e18dcabc2ba9703bff01dde3f2789b95_0>: ('ray_virtual_address_0:0', 'numa-0'), ...}
chunk_to_fetch_chunk = {DataFrameChunkData <op=DataFrameDataSource, key=c6d3ec8b2ef904b4b9a84d6d55bb56b4_0>: DataFrameChunkData <op=DataFrame...b091c709abec93f88cb961255bdd31_0>: DataFrameChunkData <op=DataFrameFetch, key=c7b091c709abec93f88cb961255bdd31_0>, ...}
    def _gen_subtask_info(
        self,
        chunks: List[ChunkType],
        chunk_to_subtask: Dict[ChunkType, Subtask],
        chunk_to_bands: Dict[ChunkType, BandType],
        chunk_to_fetch_chunk: Dict[ChunkType, ChunkType],
    ) -> Tuple[Subtask, List[Subtask], bool]:
        # gen subtask and its input subtasks
        chunks_set = set(chunks)
        result_chunks = []
        result_chunks_set = set()
        chunk_graph = ChunkGraph(result_chunks)
        out_of_scope_chunks = []
        chunk_to_copied = self._chunk_to_copied
        update_meta_chunks = []
        # subtask properties
        band = None
        is_virtual = None
        retryable = True
        chunk_priority = None
        expect_worker = None
        bands_specified = None
        processed = set()
        for chunk in chunks:
            if chunk in processed:
                continue
            if expect_worker is None:
                expect_worker = chunk.op.expect_worker
                bands_specified = expect_worker is not None
            else:  # pragma: no cover
                assert (
                    chunk.op.expect_worker is None
                    or expect_worker == chunk.op.expect_worker
                ), (
                    f"expect_worker {chunk.op.expect_worker} conflicts with chunks that have same color: "
                    f"{expect_worker}"
                )
            # process band
            chunk_band = chunk_to_bands.get(chunk)
            if chunk_band is not None:
                assert (
                    band is None or band == chunk_band
                ), "band conflicts with chunks that have same color"
                band = chunk_band
            # process is_virtual
            if isinstance(chunk.op, VirtualOperand):
                assert is_virtual is None, "only 1 virtual operand can exist"
                is_virtual = True
            else:
                is_virtual = False
            # process retryable
            if not chunk.op.retryable:
                retryable = False
            # process priority
            if chunk.op.priority is not None:
                assert (
                    chunk_priority is None or chunk_priority == chunk.op.priority
                ), "priority conflicts with chunks that have same color"
                chunk_priority = chunk.op.priority
            # process input chunks
            inp_chunks = []
            build_fetch_index_to_chunks = dict()
            for i, inp_chunk in enumerate(chunk.inputs):
                if inp_chunk in chunks_set:
                    inp_chunks.append(chunk_to_copied[inp_chunk])
                else:
                    build_fetch_index_to_chunks[i] = inp_chunk
                    inp_chunks.append(None)
                    if not isinstance(inp_chunk.op, Fetch):
                        out_of_scope_chunks.append(inp_chunk)
            fetch_chunks = self._gen_input_chunks(
                list(build_fetch_index_to_chunks.values()), chunk_to_fetch_chunk
            )
            for i, fetch_chunk in zip(build_fetch_index_to_chunks, fetch_chunks):
                inp_chunks[i] = fetch_chunk
            copied_op = chunk.op.copy()
            copied_op._key = chunk.op.key
            out_chunks = [
                c.data
                for c in copied_op.new_chunks(
                    inp_chunks, kws=[c.params.copy() for c in chunk.op.outputs]
                )
            ]
            for src_chunk, out_chunk in zip(chunk.op.outputs, out_chunks):
                processed.add(src_chunk)
                out_chunk._key = src_chunk.key
                chunk_graph.add_node(out_chunk)
                # cannot be copied twice
                assert src_chunk not in chunk_to_copied
                chunk_to_copied[src_chunk] = out_chunk
                if src_chunk in self._final_result_chunks_set:
                    if out_chunk not in result_chunks_set:
                        # add to result chunks
                        result_chunks.append(out_chunk)
                        # chunk is in the result chunks of full chunk graph
                        # meta need to be updated
                        update_meta_chunks.append(out_chunk)
                        result_chunks_set.add(out_chunk)
            if not is_virtual:
                # skip adding fetch chunk to chunk graph when op is virtual operand
                for c in inp_chunks:
                    if c not in chunk_graph:
                        chunk_graph.add_node(c)
                    chunk_graph.add_edge(c, out_chunk)
        # add chunks with no successors into result chunks
        result_chunks.extend(
            c
            for c in chunk_graph.iter_indep(reverse=True)
            if c not in result_chunks_set
        )
        expect_bands = (
            [self._to_band(expect_worker)]
            if bands_specified
            else ([band] if band is not None else None)
        )
        # calculate priority
        if out_of_scope_chunks:
            inp_subtasks = []
            for out_of_scope_chunk in out_of_scope_chunks:
>               copied_out_of_scope_chunk = chunk_to_copied[out_of_scope_chunk]
E KeyError: TensorChunkData <op=TensorArgmax, stage=map, key=eab3440ebb09b9b78d27ad27130f6501_0>
mars/services/task/analyzer/analyzer.py:283: KeyError
The problem is that two mapper operands end up in one colour (one subtask). Ray DAG shuffle requires one mapper per subtask, so TensorArgmax:map:ed36c and FancyIndexingDistribute:map:f6251 are assigned new colours (subtasks). However, TensorArgmax:agg:ea33c, which is the output of TensorArgmax:map:ed36c, is kept in the original colour (subtask), which is why the chunk_to_copied lookup above fails with a KeyError on the TensorArgmax map chunk.
Possible solutions:
- Assign a new colour to the mapper and all of its successors (assigning a new colour to the mapper alone is incorrect, as this case shows).
- If the mapper's successor is not a shuffle proxy, there is no need to assign a new colour to the mapper.
I think the second solution is better; a rough sketch follows.
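A minimal sketch of that second rule, assuming a generic colouring pass. The names graph, chunk_to_color, is_mapper, is_shuffle_proxy and new_color are placeholders for whatever GraphAnalyzer actually uses, not real Mars APIs:

def color_mappers(graph, chunk_to_color, is_mapper, is_shuffle_proxy, new_color):
    # Give a mapper chunk its own colour (i.e. its own subtask) only when it
    # feeds a shuffle proxy, since only then does Ray DAG shuffle need the
    # one-mapper-per-subtask guarantee.
    for chunk in graph:
        if not is_mapper(chunk):
            continue
        if any(is_shuffle_proxy(succ) for succ in graph.successors(chunk)):
            # e.g. FancyIndexingDistribute:map:f6251 feeding a shuffle proxy
            chunk_to_color[chunk] = new_color()
        # otherwise, e.g. TensorArgmax:map:ed36c whose successor is
        # TensorArgmax:agg:ea33c, keep the original colour so the map and agg
        # chunks stay in one subtask and chunk_to_copied remains consistent

With this rule the FancyIndexingDistribute mapper still gets its own subtask for the Ray DAG shuffle, while the TensorArgmax map/agg pair stays together and _gen_subtask_info no longer hits the KeyError.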

To Reproduce To help us reproduce this bug, please provide the information below:
- Your Python version
- The version of Mars you use
- Versions of crucial packages, such as numpy, scipy and pandas
- Full stack of the error.
- Minimized code to reproduce the error.
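For reference, a reproduction distilled from the failing parametrization above (use_dataframe=True, max_samples=10, max_features=1.0, with_weights=True, SVC). The import paths are assumptions based on the test's location, the Ray-backed session setup is omitted because it depends on the deployment, and it has not been verified whether the warm-start refit is strictly required to trigger the error:

import numpy as np
import pandas as pd
from sklearn.datasets import make_classification
from sklearn.svm import SVC

import mars.dataframe as md
import mars.tensor as mt
from mars.learn.ensemble import BaggingClassifier  # assumed import path

# create or connect to a Ray-backed Mars session here (omitted)

rs = np.random.RandomState(0)
raw_x, raw_y = make_classification(
    n_samples=100, n_features=4, n_informative=2, n_redundant=0,
    random_state=rs, shuffle=False,
)
t_x = md.DataFrame(pd.DataFrame(raw_x), chunk_size=20)
t_y = mt.tensor(raw_y, chunk_size=20)
t_weights = mt.tensor(rs.random(100), chunk_size=20)

clf = BaggingClassifier(
    base_estimator=SVC(), n_estimators=10, max_samples=10,
    max_features=1.0, random_state=rs, warm_start=True,
)
clf.fit(t_x, t_y, sample_weight=t_weights)
clf.n_estimators = 20
clf.fit(t_x, t_y, sample_weight=t_weights)
clf.predict(t_x)  # KeyError raised while generating the subtask graph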
Expected behavior The subtask graph is generated successfully and clf.predict(t_x) completes without error on the Ray executor.