[BUG] Ray executor raises KeyError when exiting stage

Open fyrestone opened this issue 3 years ago • 0 comments

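Describe the bug: RayTaskExecutor.__aexit__ raises a KeyError when a stage exits without an exception. While updating info for the result tileables, a chunk key that is present in self._task_context is missing from self._task_chunks_meta. The failure shows up in test_k_means_results[elkan-float64-dense]: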

mars/learn/cluster/tests/test_k_means.py:35 (test_k_means_results[elkan-float64-dense])
setup = <mars.deploy.oscar.session.SyncSession object at 0x16fd4b370>
representation = 'dense', dtype = <class 'numpy.float64'>, algo = 'elkan'

    @pytest.mark.skipif(KMeans is None, reason="scikit-learn not installed")
    @pytest.mark.parametrize("representation", ["dense", "sparse"])
    @pytest.mark.parametrize("dtype", [np.float32, np.float64])
    @pytest.mark.parametrize("algo", ["full", "elkan"])
    def test_k_means_results(setup, representation, dtype, algo):
        array_constr = {"dense": np.array, "sparse": sp.csr_matrix}[representation]
    
        X = array_constr([[0, 0], [0.5, 0], [0.5, 1], [1, 1]], dtype=dtype)
        sample_weight = [3, 1, 1, 3]  # will be rescaled to [1.5, 0.5, 0.5, 1.5]
        init_centers = np.array([[0, 0], [1, 1]], dtype=dtype)
    
        expected_labels = [0, 0, 1, 1]
        expected_inertia = 0.1875
        expected_centers = np.array([[0.125, 0], [0.875, 1]], dtype=dtype)
        expected_n_iter = 2
    
        kmeans = KMeans(n_clusters=2, n_init=1, init=init_centers, algorithm=algo)
>       kmeans.fit(X, sample_weight=sample_weight)

mars/learn/cluster/tests/test_k_means.py:53: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
mars/learn/cluster/_kmeans.py:925: in fit
    labels, inertia, centers, n_iter_ = kmeans_single(
mars/learn/cluster/_kmeans.py:295: in _kmeans_single_elkan
    mt.ExecutableTuple(to_runs).execute(session=session, **(run_kwargs or dict()))
mars/core/entity/executable.py:267: in execute
    ret = execute(*self, session=session, **kw)
mars/deploy/oscar/session.py:1888: in execute
    return session.execute(
mars/deploy/oscar/session.py:1682: in execute
    execution_info: ExecutionInfo = fut.result(
../../.pyenv/versions/3.8.13/lib/python3.8/concurrent/futures/_base.py:444: in result
    return self.__get_result()
../../.pyenv/versions/3.8.13/lib/python3.8/concurrent/futures/_base.py:389: in __get_result
    raise self._exception
mars/deploy/oscar/session.py:1868: in _execute
    await execution_info
../../.pyenv/versions/3.8.13/lib/python3.8/asyncio/tasks.py:695: in _wrap_awaitable
    return (yield from awaitable.__await__())
mars/deploy/oscar/session.py:105: in wait
    return await self._aio_task
mars/deploy/oscar/session.py:953: in _run_in_background
    raise task_result.error.with_traceback(task_result.traceback)
mars/services/task/supervisor/processor.py:372: in run
    await self._process_stage_chunk_graph(*stage_args)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <mars.services.task.execution.ray.executor.RayTaskExecutor object at 0x31c7c5ca0>
exc_type = None, exc_val = None, exc_tb = None

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            try:
                await self.cancel()
            except BaseException:  # noqa: E722  # nosec  # pylint: disable=bare-except
                pass
            return
    
        # Update info if no exception occurs.
        tileable_keys = []
        update_metas = []
        update_lifecycles = []
        for tileable in self._task.tileable_graph.result_tileables:
            tileable_keys.append(tileable.key)
            tileable = tileable.data if hasattr(tileable, "data") else tileable
            chunk_keys = []
            for chunk in self._tile_context[tileable].chunks:
                chunk_key = chunk.key
                chunk_keys.append(chunk_key)
                if chunk_key in self._task_context:
                    # Some tileable graph may have result chunks that not be executed,
                    # for example:
                    # r, b = cut(series, bins, retbins=True)
                    #     r_result = r.execute().fetch()
                    #     b_result = b.execute().fetch() <- This is the case
                    object_ref = self._task_context[chunk_key]
>                   chunk_meta = self._task_chunks_meta[chunk_key]
E                   KeyError: 'ee5d4ed435009bd4bcff6d87f7450fb8_2'

mars/services/task/execution/ray/executor.py:610: KeyError
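
The failure mode in the traceback can be reproduced in isolation. The following is only a minimal sketch, not the Mars implementation or a confirmed fix: the helper name collect_chunk_updates, the sample keys, and the sample values are all hypothetical stand-ins. It assumes two plain dicts that play the roles of self._task_context and self._task_chunks_meta, and shows that a key found in the first can still be absent from the second, so an unguarded lookup raises KeyError. Whether skipping such chunks is the right behavior for the Ray executor is not established by this issue.

```python
from typing import Any, Dict, List, Tuple


def collect_chunk_updates(
    chunk_keys: List[str],
    task_context: Dict[str, Any],
    task_chunks_meta: Dict[str, Any],
) -> Tuple[List[Tuple[str, Any, Any]], List[str]]:
    """Pair each chunk's object ref with its meta, tolerating missing metas.

    Hypothetical helper for illustration only; it is not part of Mars.
    """
    updates = []
    missing_meta = []
    for chunk_key in chunk_keys:
        if chunk_key not in task_context:
            # Chunk was never executed, so there is nothing to update.
            continue
        chunk_meta = task_chunks_meta.get(chunk_key)
        if chunk_meta is None:
            # Present in the context but without a meta entry:
            # this is the situation that raised KeyError in the traceback.
            missing_meta.append(chunk_key)
            continue
        updates.append((chunk_key, task_context[chunk_key], chunk_meta))
    return updates, missing_meta


# Stand-in data (assumed, not taken from the failing run).
task_context = {"a_0": "ref-a0", "a_2": "ref-a2"}
task_chunks_meta = {"a_0": {"memory_size": 16}}

# The unguarded lookup fails for the key that has no meta entry.
try:
    task_chunks_meta["a_2"]
    raised = False
except KeyError:
    raised = True

updates, missing_meta = collect_chunk_updates(
    ["a_0", "a_1", "a_2"], task_context, task_chunks_meta
)
```

Reporting the keys collected in missing_meta, rather than silently dropping them, would make it easier to tell whether the meta was never recorded or was recorded under a different key.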

fyrestone, Sep 19 '22 09:09