[BUG] Test on ray cluster: TypeError: __init__() missing 1 required positional argument: 'pid'
Describe the bug: When I run the example code from the Mars documentation, I get the error message below. My code looks like this:
import ray
import time
import mars
import mars.tensor as mt
import mars.dataframe as md

# Seconds to wait between connection attempts (assumed value; not defined in the original report)
RAY_WORKER_DELAY = 5

def ray_init():
    # Keep retrying until the Ray head node accepts the connection
    while True:
        try:
            print(ray.init(address='xx.xx.xx.xx:9999',
                           log_to_driver=True,
                           ignore_reinit_error=True,
                           ))
            break
        except ConnectionError:
            print("Ray head is not ready yet, retry")
            time.sleep(RAY_WORKER_DELAY)

ray_init()
cluster = mars.new_cluster_in_ray(worker_num=2, worker_mem=2 * 1024 ** 3)
mt.random.RandomState(0).rand(1000_000000, 5).sum().execute()
df = md.DataFrame(
    mt.random.rand(1000_0000, 4, chunk_size=500_0000),
    columns=list('abcd'))
print(df.sum().execute())
print(df.describe().execute())
# Convert mars dataframe to ray dataset
ds = md.to_ray_dataset(df)
print(ds.schema(), ds.count())
ds.filter(lambda row: row["a"] > 0.5).show(5)
To Reproduce: To help us reproduce this bug, please provide the information below:
- Your Python version: 3.8.13
- The version of Mars you use: 0.9.0
- Versions of crucial packages, such as numpy, scipy and pandas: numpy 1.23.0, ray 1.13.0, scipy 1.8.1, pandas 1.4.2
- Full stack of the error.
(RaySubPool pid=27115) 2022-07-28 10:58:23,066 ERROR serialization.py:342 -- __init__() missing 1 required positional argument: 'pid'
(RaySubPool pid=27115) Traceback (most recent call last):
(RaySubPool pid=27115) File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 340, in deserialize_objects
(RaySubPool pid=27115) obj = self._deserialize_object(data, metadata, object_ref)
(RaySubPool pid=27115) File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/oscar/backends/ray/communication.py", line 169, in _deserialize_object
(RaySubPool pid=27115) value = _ray_deserialize_object(self, data, metadata, object_ref)
(RaySubPool pid=27115) File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 237, in _deserialize_object
(RaySubPool pid=27115) return self._deserialize_msgpack_data(data, metadata_fields)
(RaySubPool pid=27115) File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 192, in _deserialize_msgpack_data
(RaySubPool pid=27115) python_objects = self._deserialize_pickle5_data(pickle5_data)
(RaySubPool pid=27115) File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 182, in _deserialize_pickle5_data
(RaySubPool pid=27115) obj = pickle.loads(in_band)
(RaySubPool pid=27115) File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/lib/tblib/pickling_support.py", line 26, in unpickle_exception
(RaySubPool pid=27115) inst = func(*args)
(RaySubPool pid=27115) TypeError: __init__() missing 1 required positional argument: 'pid'
(RaySubPool pid=27115) Get object ObjectRef(a9e08a2346149af818688fe9862d1fe4520f05550900000001000000) from ray://ray-cluster-1658977074/2/0 failed, got exception System error: __init__() missing 1 required positional argument: 'pid'
(RaySubPool pid=27115) traceback: Traceback (most recent call last):
(RaySubPool pid=27115) File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 340, in deserialize_objects
(RaySubPool pid=27115) obj = self._deserialize_object(data, metadata, object_ref)
(RaySubPool pid=27115) File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/oscar/backends/ray/communication.py", line 169, in _deserialize_object
(RaySubPool pid=27115) value = _ray_deserialize_object(self, data, metadata, object_ref)
(RaySubPool pid=27115) File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 237, in _deserialize_object
(RaySubPool pid=27115) return self._deserialize_msgpack_data(data, metadata_fields)
(RaySubPool pid=27115) File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 192, in _deserialize_msgpack_data
(RaySubPool pid=27115) python_objects = self._deserialize_pickle5_data(pickle5_data)
(RaySubPool pid=27115) File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 182, in _deserialize_pickle5_data
(RaySubPool pid=27115) obj = pickle.loads(in_band)
(RaySubPool pid=27115) File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/lib/tblib/pickling_support.py", line 26, in unpickle_exception
(RaySubPool pid=27115) inst = func(*args)
(RaySubPool pid=27115) TypeError: __init__() missing 1 required positional argument: 'pid'
(RaySubPool pid=27115) .
(RaySubPool pid=27115) Traceback (most recent call last):
(RaySubPool pid=27115) File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/oscar/backends/ray/communication.py", line 285, in handle_task
(RaySubPool pid=27115) result = await object_ref
(RaySubPool pid=27115) ray.exceptions.RaySystemError: System error: __init__() missing 1 required positional argument: 'pid'
(RaySubPool pid=27115) traceback: Traceback (most recent call last):
(RaySubPool pid=27115) File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 340, in deserialize_objects
(RaySubPool pid=27115) obj = self._deserialize_object(data, metadata, object_ref)
(RaySubPool pid=27115) File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/oscar/backends/ray/communication.py", line 169, in _deserialize_object
(RaySubPool pid=27115) value = _ray_deserialize_object(self, data, metadata, object_ref)
(RaySubPool pid=27115) File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 237, in _deserialize_object
(RaySubPool pid=27115) return self._deserialize_msgpack_data(data, metadata_fields)
(RaySubPool pid=27115) File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 192, in _deserialize_msgpack_data
(RaySubPool pid=27115) python_objects = self._deserialize_pickle5_data(pickle5_data)
(RaySubPool pid=27115) File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 182, in _deserialize_pickle5_data
(RaySubPool pid=27115) obj = pickle.loads(in_band)
(RaySubPool pid=27115) File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/lib/tblib/pickling_support.py", line 26, in unpickle_exception
(RaySubPool pid=27115) inst = func(*args)
(RaySubPool pid=27115) TypeError: __init__() missing 1 required positional argument: 'pid'
- Minimized code to reproduce the error.
Additional context: Issue #2860 reports a similar error message.
@chaokunyang @wjsi It seems that tblib has a compatibility issue with RayError?
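For readers unfamiliar with this failure mode, here is a minimal sketch of one common way such a `TypeError` can appear while an exception is being rebuilt. It is not a confirmed diagnosis of this issue. `WorkerDiedError`, `ReducibleWorkerDiedError`, and `rebuild` are hypothetical names invented for illustration; they are not Ray, Mars, or tblib APIs. The sketch relies only on standard Python behavior: by default an exception's `__reduce__` records `self.args`, so an `__init__` that requires more arguments than it forwards to `Exception.__init__` cannot be called again from that record.

```python
class WorkerDiedError(Exception):
    """Hypothetical exception used for illustration; not a Ray or Mars class."""

    def __init__(self, message, pid):
        # Only `message` reaches BaseException, so self.args == (message,).
        super().__init__(message)
        self.pid = pid


class ReducibleWorkerDiedError(WorkerDiedError):
    """Same exception, but it records every argument __init__ needs."""

    def __reduce__(self):
        return (type(self), (self.args[0], self.pid))


def rebuild(exc):
    # Re-create the exception the way the `inst = func(*args)` step in the
    # traceback does: call the callable from __reduce__ with its recorded args.
    func, args = exc.__reduce__()[:2]
    return func(*args)


try:
    rebuild(WorkerDiedError("worker exited", 27115))
    failure = None
except TypeError as err:
    # The default record lacks `pid`, so __init__ cannot be called again.
    failure = str(err)

restored = rebuild(ReducibleWorkerDiedError("worker exited", 27115))
print(failure)
print(restored.pid)
```

Under these assumptions, the first call fails with a "missing 1 required positional argument" message, while the subclass that overrides `__reduce__` is rebuilt with its `pid` intact.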
Could you try this?
import ray
import time
import mars
import mars.tensor as mt
import mars.dataframe as md

# Seconds to wait between connection attempts (assumed value; not defined in the original report)
RAY_WORKER_DELAY = 5

def ray_init():
    # Keep retrying until the Ray head node accepts the connection
    while True:
        try:
            print(ray.init(address='xx.xx.xx.xx:9999',
                           log_to_driver=True,
                           ignore_reinit_error=True,
                           ))
            break
        except ConnectionError:
            print("Ray head is not ready yet, retry")
            time.sleep(RAY_WORKER_DELAY)

ray_init()
# cluster = mars.new_cluster_in_ray(worker_num=2, worker_mem=2 * 1024 ** 3)
mars.new_session(backend="ray")
mt.random.RandomState(0).rand(1000_000000, 5).sum().execute()
df = md.DataFrame(
    mt.random.rand(1000_0000, 4, chunk_size=500_0000),
    columns=list('abcd'))
print(df.sum().execute())
print(df.describe().execute())
# Convert mars dataframe to ray dataset
ds = md.to_ray_dataset(df)
print(ds.schema(), ds.count())
ds.filter(lambda row: row["a"] > 0.5).show(5)
@fyrestone Thanks for the reply.
I've tried the new code and the 'pid' error seems to be fixed.
But another error is raised at print(df.describe().execute()):
Traceback (most recent call last):
File "test_in_cluster.py", line 86, in <module>
print(df.describe().execute())
File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/core/entity/tileables.py", line 462, in execute
result = self.data.execute(session=session, **kw)
File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/core/entity/executable.py", line 144, in execute
return execute(self, session=session, **kw)
File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/deploy/oscar/session.py", line 1890, in execute
return session.execute(
File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/deploy/oscar/session.py", line 1684, in execute
execution_info: ExecutionInfo = fut.result(
File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/concurrent/futures/_base.py", line 444, in result
return self.__get_result()
File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
raise self._exception
File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/deploy/oscar/session.py", line 1870, in _execute
await execution_info
File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/deploy/oscar/session.py", line 105, in wait
return await self._aio_task
File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/deploy/oscar/session.py", line 953, in _run_in_background
raise task_result.error.with_traceback(task_result.traceback)
File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/services/task/supervisor/processor.py", line 365, in run
await self._process_stage_chunk_graph(*stage_args)
File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/services/task/supervisor/processor.py", line 243, in _process_stage_chunk_graph
chunk_to_result = await self._executor.execute_subtask_graph(
File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/services/task/execution/ray/executor.py", line 366, in execute_subtask_graph
output_keys = self._get_subtask_output_keys(subtask_chunk_graph)
File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/services/task/execution/ray/executor.py", line 550, in _get_subtask_output_keys
raise NotImplementedError(
NotImplementedError: The shuffle operands are not supported by the ray executor.
What confuses me is that the code I originally posted runs this line successfully and produces the following output:
                  a             b             c             d
count  1.000000e+07  1.000000e+07  1.000000e+07  1.000000e+07
mean   5.000693e-01  4.999776e-01  5.000117e-01  5.001075e-01
std    2.887084e-01  2.886315e-01  2.886625e-01  2.887464e-01
min    5.400421e-08  3.773959e-08  2.246664e-08  3.552102e-08
25%    2.501076e-01  2.499848e-01  2.500975e-01  2.500542e-01
50%    5.001653e-01  5.001005e-01  5.000041e-01  5.001642e-01
75%    7.502280e-01  7.499122e-01  7.499833e-01  7.501780e-01
max    9.999998e-01  1.000000e+00  9.999999e-01  1.000000e+00
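As a quick sanity check that this output is plausible, the sketch below compares the reported statistics for column "a" with the theoretical values for a uniform distribution. It assumes that `mt.random.rand` samples uniformly from [0, 1), as `numpy.random.rand` does. The `reported` values are copied from the table above, and the variable names are illustrative only.

```python
import math

# Statistics for column "a", copied from the describe() output above.
reported = {
    "mean": 5.000693e-01,
    "std": 2.887084e-01,
    "25%": 2.501076e-01,
    "50%": 5.001653e-01,
    "75%": 7.502280e-01,
}

# Theoretical values for a uniform distribution on [0, 1):
# mean 1/2, variance 1/12, quantile q located at q.
expected = {
    "mean": 0.5,
    "std": math.sqrt(1 / 12),
    "25%": 0.25,
    "50%": 0.5,
    "75%": 0.75,
}

# Largest absolute deviation between reported and theoretical values.
max_abs_error = max(abs(reported[key] - expected[key]) for key in expected)
print(f"largest deviation: {max_abs_error:.2e}")
```

With ten million samples, deviations of this small size are consistent with ordinary sampling noise, which suggests the original cluster setup computed `describe()` correctly.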
The latest master should work. The feature is not included in any previous release, so the documentation has not been updated yet.
@fyrestone Cool! The latest master works well!