[BUG] Test on Ray cluster: TypeError: __init__() missing 1 required positional argument: 'pid'

Open guodashun opened this issue 3 years ago • 5 comments

Describe the bug: When I run the example code from the Mars documentation, I get the error below. My code looks like this:

import ray
import time

import mars
import mars.tensor as mt
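import mars.dataframe as md

# Seconds to wait between connection attempts. The original snippet never
# defined this constant; 5 is an assumed value.
RAY_WORKER_DELAY = 5

def ray_init():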
    while True:
        try:
            print(ray.init(address='xx.xx.xx.xx:9999',
                           log_to_driver=True,
                           ignore_reinit_error=True,
                           ))
            break
        except ConnectionError:
            print("Ray head is not ready yet, retry")
            time.sleep(RAY_WORKER_DELAY)

ray_init()
cluster = mars.new_cluster_in_ray(worker_num=2, worker_mem=2 * 1024 ** 3)
mt.random.RandomState(0).rand(1000_000000, 5).sum().execute()
df = md.DataFrame(
    mt.random.rand(1000_0000, 4, chunk_size=500_0000),
    columns=list('abcd'))
print(df.sum().execute())
print(df.describe().execute())
# Convert mars dataframe to ray dataset
ds = md.to_ray_dataset(df)
print(ds.schema(), ds.count())
ds.filter(lambda row: row["a"] > 0.5).show(5)
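The `ray_init()` loop above retries indefinitely. A bounded variant is sketched below as an illustration only: the helper name `connect_with_retry`, its `max_attempts`/`delay` parameters, and the injectable `connect` and `sleep` callables are hypothetical, chosen so the retry logic can be exercised without a live Ray cluster.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def connect_with_retry(
    connect: Callable[[], T],
    max_attempts: int = 10,
    delay: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `connect` until it succeeds, retrying only on ConnectionError.

    Unlike the open-ended loop above, this gives up after `max_attempts`
    and re-raises the last ConnectionError; other exceptions propagate at once.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    attempt = 0
    while True:
        attempt += 1
        try:
            return connect()
        except ConnectionError:
            if attempt >= max_attempts:
                raise  # out of attempts: surface the connection error
            print(f"Ray head is not ready yet, retry {attempt}/{max_attempts}")
            sleep(delay)
```

With Ray, the call would look like `connect_with_retry(lambda: ray.init(address='xx.xx.xx.xx:9999', log_to_driver=True, ignore_reinit_error=True))`, assuming (as the original loop does) that `ray.init` raises `ConnectionError` while the head node is unreachable.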

To Reproduce: To help us reproduce this bug, please provide the information below:

  1. Your Python version: 3.8.13
  2. The version of Mars you use: 0.9.0
  3. Versions of crucial packages, such as numpy, scipy and pandas:
     i. numpy: 1.23.0
     ii. ray: 1.13.0
     iii. scipy: 1.8.1
     iv. pandas: 1.4.2
  4. Full stack of the error.
(RaySubPool pid=27115) 2022-07-28 10:58:23,066  ERROR serialization.py:342 -- __init__() missing 1 required positional argument: 'pid'
(RaySubPool pid=27115) Traceback (most recent call last):
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 340, in deserialize_objects
(RaySubPool pid=27115)     obj = self._deserialize_object(data, metadata, object_ref)
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/oscar/backends/ray/communication.py", line 169, in _deserialize_object
(RaySubPool pid=27115)     value = _ray_deserialize_object(self, data, metadata, object_ref)
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 237, in _deserialize_object
(RaySubPool pid=27115)     return self._deserialize_msgpack_data(data, metadata_fields)
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 192, in _deserialize_msgpack_data
(RaySubPool pid=27115)     python_objects = self._deserialize_pickle5_data(pickle5_data)
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 182, in _deserialize_pickle5_data
(RaySubPool pid=27115)     obj = pickle.loads(in_band)
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/lib/tblib/pickling_support.py", line 26, in unpickle_exception
(RaySubPool pid=27115)     inst = func(*args)
(RaySubPool pid=27115) TypeError: __init__() missing 1 required positional argument: 'pid'
(RaySubPool pid=27115) Get object ObjectRef(a9e08a2346149af818688fe9862d1fe4520f05550900000001000000) from ray://ray-cluster-1658977074/2/0 failed, got exception System error: __init__() missing 1 required positional argument: 'pid'
(RaySubPool pid=27115) traceback: Traceback (most recent call last):
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 340, in deserialize_objects
(RaySubPool pid=27115)     obj = self._deserialize_object(data, metadata, object_ref)
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/oscar/backends/ray/communication.py", line 169, in _deserialize_object
(RaySubPool pid=27115)     value = _ray_deserialize_object(self, data, metadata, object_ref)
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 237, in _deserialize_object
(RaySubPool pid=27115)     return self._deserialize_msgpack_data(data, metadata_fields)
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 192, in _deserialize_msgpack_data
(RaySubPool pid=27115)     python_objects = self._deserialize_pickle5_data(pickle5_data)
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 182, in _deserialize_pickle5_data
(RaySubPool pid=27115)     obj = pickle.loads(in_band)
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/lib/tblib/pickling_support.py", line 26, in unpickle_exception
(RaySubPool pid=27115)     inst = func(*args)
(RaySubPool pid=27115) TypeError: __init__() missing 1 required positional argument: 'pid'
(RaySubPool pid=27115) .
(RaySubPool pid=27115) Traceback (most recent call last):
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/oscar/backends/ray/communication.py", line 285, in handle_task
(RaySubPool pid=27115)     result = await object_ref
(RaySubPool pid=27115) ray.exceptions.RaySystemError: System error: __init__() missing 1 required positional argument: 'pid'
(RaySubPool pid=27115) traceback: Traceback (most recent call last):
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 340, in deserialize_objects
(RaySubPool pid=27115)     obj = self._deserialize_object(data, metadata, object_ref)
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/oscar/backends/ray/communication.py", line 169, in _deserialize_object
(RaySubPool pid=27115)     value = _ray_deserialize_object(self, data, metadata, object_ref)
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 237, in _deserialize_object
(RaySubPool pid=27115)     return self._deserialize_msgpack_data(data, metadata_fields)
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 192, in _deserialize_msgpack_data
(RaySubPool pid=27115)     python_objects = self._deserialize_pickle5_data(pickle5_data)
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 182, in _deserialize_pickle5_data
(RaySubPool pid=27115)     obj = pickle.loads(in_band)
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/lib/tblib/pickling_support.py", line 26, in unpickle_exception
(RaySubPool pid=27115)     inst = func(*args)
(RaySubPool pid=27115) TypeError: __init__() missing 1 required positional argument: 'pid'

  5. Minimized code to reproduce the error.
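For items 2 and 3 above, the installed versions can be collected in one go with the standard library's `importlib.metadata` (Python 3.8+). This is a sketch under assumptions: the distribution names are assumed to match their PyPI names (Mars is assumed to be published as `pymars`), and a package that is not installed is reported as `not installed` instead of raising.

```python
import platform
from importlib.metadata import PackageNotFoundError, version

# Distribution names as published on PyPI; "pymars" is assumed to be Mars.
PACKAGES = ["pymars", "numpy", "ray", "scipy", "pandas"]


def collect_versions(packages=PACKAGES):
    """Return a dict mapping "python" and each distribution name to a version string."""
    report = {"python": platform.python_version()}
    for name in packages:
        try:
            report[name] = version(name)
        except PackageNotFoundError:
            report[name] = "not installed"
    return report


if __name__ == "__main__":
    for name, ver in collect_versions().items():
        print(f"{name}: {ver}")
```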

Expected behavior: A clear and concise description of what you expected to happen.

Additional context: I found that #2860 has a similar error message.

guodashun · Jul 28 '22 03:07

@chaokunyang @wjsi It seems that tblib has a compatibility issue with RayError?
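One plausible mechanism consistent with this traceback can be reproduced without Ray or tblib. By default an exception pickles as its class plus `self.args`, so a subclass whose `__init__` takes an extra required argument but passes only the message to `super().__init__()` cannot be rebuilt on the receiving side. The traceback shows tblib's `unpickle_exception` making the same kind of `func(*args)` call. The class names below are hypothetical stand-ins, not Ray's actual exception classes.

```python
import pickle


class WorkerError(Exception):
    """Hypothetical exception: `pid` is required but not stored in self.args."""

    def __init__(self, message, pid):
        super().__init__(message)  # self.args == (message,) only
        self.pid = pid


class PicklableWorkerError(Exception):
    """Same interface, but tells pickle how to call __init__ again."""

    def __init__(self, message, pid):
        super().__init__(message)
        self.pid = pid

    def __reduce__(self):
        # Rebuild with both positional arguments instead of just self.args.
        return (type(self), (self.args[0], self.pid))


def roundtrip(exc):
    """Pickle and unpickle an exception, as a cross-process transfer would."""
    return pickle.loads(pickle.dumps(exc))


if __name__ == "__main__":
    try:
        roundtrip(WorkerError("worker died", 27115))
    except TypeError as e:
        print("unpickling failed:", e)  # ...missing 1 required positional argument: 'pid'

    restored = roundtrip(PicklableWorkerError("worker died", 27115))
    print(restored.pid, restored)  # 27115 worker died
```

Pickling succeeds in both cases; only unpickling fails, which matches the error surfacing in `pickle.loads` in the log above.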

fyrestone · Jul 28 '22 04:07

Could you try this?

import ray
import time

import mars
import mars.tensor as mt
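import mars.dataframe as md

# Seconds to wait between connection attempts. The original snippet never
# defined this constant; 5 is an assumed value.
RAY_WORKER_DELAY = 5

def ray_init():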
    while True:
        try:
            print(ray.init(address='xx.xx.xx.xx:9999',
                           log_to_driver=True,
                           ignore_reinit_error=True,
                           ))
            break
        except ConnectionError:
            print("Ray head is not ready yet, retry")
            time.sleep(RAY_WORKER_DELAY)

ray_init()
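# Instead of starting a separate Mars cluster with new_cluster_in_ray,
# create a session that uses the Ray backend.
# cluster = mars.new_cluster_in_ray(worker_num=2, worker_mem=2 * 1024 ** 3)
mars.new_session(backend="ray")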
mt.random.RandomState(0).rand(1000_000000, 5).sum().execute()
df = md.DataFrame(
    mt.random.rand(1000_0000, 4, chunk_size=500_0000),
    columns=list('abcd'))
print(df.sum().execute())
print(df.describe().execute())
# Convert mars dataframe to ray dataset
ds = md.to_ray_dataset(df)
print(ds.schema(), ds.count())
ds.filter(lambda row: row["a"] > 0.5).show(5)

fyrestone · Jul 28 '22 04:07

@fyrestone Thanks for the reply. I've tried the new code and the 'pid' error seems to be fixed, but another error is raised at print(df.describe().execute()):

Traceback (most recent call last):
  File "test_in_cluster.py", line 86, in <module>
    print(df.describe().execute())
  File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/core/entity/tileables.py", line 462, in execute
    result = self.data.execute(session=session, **kw)
  File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/core/entity/executable.py", line 144, in execute
    return execute(self, session=session, **kw)
  File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/deploy/oscar/session.py", line 1890, in execute
    return session.execute(
  File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/deploy/oscar/session.py", line 1684, in execute
    execution_info: ExecutionInfo = fut.result(
  File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/concurrent/futures/_base.py", line 444, in result
    return self.__get_result()
  File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/deploy/oscar/session.py", line 1870, in _execute
    await execution_info
  File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/deploy/oscar/session.py", line 105, in wait
    return await self._aio_task
  File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/deploy/oscar/session.py", line 953, in _run_in_background
    raise task_result.error.with_traceback(task_result.traceback)
  File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/services/task/supervisor/processor.py", line 365, in run
    await self._process_stage_chunk_graph(*stage_args)
  File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/services/task/supervisor/processor.py", line 243, in _process_stage_chunk_graph
    chunk_to_result = await self._executor.execute_subtask_graph(
  File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/services/task/execution/ray/executor.py", line 366, in execute_subtask_graph
    output_keys = self._get_subtask_output_keys(subtask_chunk_graph)
  File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/services/task/execution/ray/executor.py", line 550, in _get_subtask_output_keys
    raise NotImplementedError(
NotImplementedError: The shuffle operands are not supported by the ray executor.

What confuses me is that the initial code I provided ran this line successfully, with the following output:

                  a             b             c             d
count  1.000000e+07  1.000000e+07  1.000000e+07  1.000000e+07
mean   5.000693e-01  4.999776e-01  5.000117e-01  5.001075e-01
std    2.887084e-01  2.886315e-01  2.886625e-01  2.887464e-01
min    5.400421e-08  3.773959e-08  2.246664e-08  3.552102e-08
25%    2.501076e-01  2.499848e-01  2.500975e-01  2.500542e-01
50%    5.001653e-01  5.001005e-01  5.000041e-01  5.001642e-01
75%    7.502280e-01  7.499122e-01  7.499833e-01  7.501780e-01
max    9.999998e-01  1.000000e+00  9.999999e-01  1.000000e+00
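As a quick plausibility check on the table above: `mt.random.rand` draws from the uniform distribution on [0, 1), whose mean is 1/2, standard deviation is 1/√12 ≈ 0.2887, and quartiles are 0.25, 0.5 and 0.75, in line with the reported values. The stdlib sketch below reproduces those figures locally on a small sample; it does not use Mars, and the helper name `uniform_summary` is hypothetical.

```python
import math
import random


def uniform_summary(n, seed=0):
    """Mean, population std and approximate quartiles of n draws from U[0, 1)."""
    rng = random.Random(seed)
    xs = sorted(rng.random() for _ in range(n))
    mean = sum(xs) / n
    std = math.sqrt(sum((x - mean) ** 2 for x in xs) / n)
    # Simple index-based quartiles; pandas interpolates linearly, which
    # makes no visible difference at this sample size.
    q = {p: xs[min(n - 1, int(p * n))] for p in (0.25, 0.5, 0.75)}
    return mean, std, q


if __name__ == "__main__":
    mean, std, q = uniform_summary(200_000)
    print(f"mean {mean:.4f} (theory 0.5)")
    print(f"std  {std:.4f} (theory {1 / math.sqrt(12):.4f})")
    print("quartiles", {p: round(v, 4) for p, v in q.items()})
```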

guodashun · Jul 28 '22 04:07

It should work with the latest master. This feature is not included in previous releases, so the documentation has not been updated yet.

fyrestone · Jul 28 '22 05:07

@fyrestone Cool! The latest master works well!

guodashun · Jul 28 '22 06:07