raydp
Can I use spark.createDataFrame() with a list of ObjectRef from various remote Ray workers?
I am trying to see if I can create a Spark DataFrame from a list of ObjectRefs produced by several remote Ray tasks that have put large objects into their local plasma stores. I know that Spark can take a Pandas dataframe and turn it into a Spark dataframe. However, I don't want the Spark driver to collect all the large objects from Ray workers on remote nodes, because that can easily cause the Spark driver to run out of memory. But it turns out that ObjectRef is not one of the supported data types in spark.createDataFrame(). Is there any workaround?
Here is the code I was trying:
import ray
import pandas as pd
import numpy as np
ray.shutdown()
ray.init()
@ray.remote
def create_small_dataframe(i):
    pdf = pd.DataFrame(data=np.random.randint(5*i, size=(3, 4)))
    pdf_ref = ray.put(pdf)
    return pdf_ref
# The driver program uses ray.remote so that the remote tasks/actors can
# use ray.put() to create and store partitions of the dataframe locally in
# their plasma stores. The ObjectRefs of these partitions are returned
# to the driver.
obj_ref1 = ray.get(create_small_dataframe.remote(1))
obj_ref2 = ray.get(create_small_dataframe.remote(2))
# print(ray.get(obj_ref1))
# print(ray.get(obj_ref2))
# The driver then collects all obj_refs into a list on the driver
obj_ref_list = [obj_ref1, obj_ref2]
# Now, the driver can call SparkSession.createDataFrame
import raydp
raydp.stop_spark()
spark = raydp.init_spark('dataframe_with_obj_ref',
                         num_executors=2,
                         executor_cores=2,
                         executor_memory='1G')
from pyspark.sql.types import BinaryType, StructType, StructField
from ray._raylet import ObjectRef
schema = StructType([StructField('Pandas_df_ref', BinaryType(), True)])
sdf = spark.createDataFrame(obj_ref_list, schema)
And the following are the error messages:
TypeError Traceback (most recent call last)
<ipython-input-3-2ad94baefe18> in <module>
30 schema = StructType([StructField('Pandas_df_ref', BinaryType(), True)])
31
---> 32 sdf = spark.createDataFrame(obj_ref_list, schema)
33
34
/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
603 return super(SparkSession, self).createDataFrame(
604 data, schema, samplingRatio, verifySchema)
--> 605 return self._create_dataframe(data, schema, samplingRatio, verifySchema)
606
607 def _create_dataframe(self, data, schema, samplingRatio, verifySchema):
/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/session.py in _create_dataframe(self, data, schema, samplingRatio, verifySchema)
628 rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
629 else:
--> 630 rdd, schema = self._createFromLocal(map(prepare, data), schema)
631 jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
632 jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/session.py in _createFromLocal(self, data, schema)
446 # make sure data could consumed multiple times
447 if not isinstance(data, list):
--> 448 data = list(data)
449
450 if schema is None or isinstance(schema, (list, tuple)):
/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/session.py in prepare(obj)
610
611 def prepare(obj):
--> 612 verify_func(obj)
613 return obj
614 elif isinstance(schema, DataType):
/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/types.py in verify(obj)
1406 def verify(obj):
1407 if not verify_nullability(obj):
-> 1408 verify_value(obj)
1409
1410 return verify
/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/types.py in verify_struct(obj)
1393 verifier(d.get(f))
1394 else:
-> 1395 raise TypeError(new_msg("StructType can not accept object %r in type %s"
1396 % (obj, type(obj))))
1397 verify_value = verify_struct
TypeError: StructType can not accept object ObjectRef(a67dc375e60ddd1affffffffffffffffffffffff0100000002000000) in type <class 'ray._raylet.ObjectRef'>
Hi @klwuibm, the feature is not supported yet. @kira-lin is working to support it.
You might want to take a look at ray.experimental.data, which we are currently adding support for. But it requires ray-nightly, so for now you can probably use our MLDataset. You need to use ray.cloudpickle.dumps(obj_ref) when you want to put it into a dataframe, and ray.cloudpickle.loads before you ray.get. Besides, what is returned from create_small_dataframe.remote() is a ref to a ref to the data; you could just return pdf instead.
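A minimal sketch of that last point (illustrative only; make_df is a hypothetical name): returning the dataframe directly means the task's ObjectRef already points at the data, with no nested ray.put() ref involved.
import ray
import pandas as pd

ray.init()

@ray.remote
def make_df():
    # Returning the dataframe is enough: Ray stores the return value in the
    # object store. An explicit ray.put() inside the task would only add a
    # second level of indirection (an ObjectRef to an ObjectRef).
    return pd.DataFrame({'a': [1, 2, 3]})

ref = make_df.remote()   # ObjectRef[pd.DataFrame]
df = ray.get(ref)        # the dataframe itself, not another ObjectRef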
@kira-lin thanks for the comments. I wonder if you could clarify two questions. (1) Is there any connection between MLDataset and ray.cloudpickle.dumps()? I assume they are not related. (2) Regarding your suggestion of serializing obj_ref with ray.cloudpickle.dumps() before calling spark.createDataFrame(): do you mean that I should store the Pandas dataframe created by a remote task in a file on disk? That is, are you suggesting that I write the remotely created Pandas dataframes to disk, and then Spark reads them back to create the Spark dataframe?
- Yes, they are not related. I was just suggesting that you could try our MLDataset; if you want to solve this problem directly, you should use cloudpickle.
- No. cloudpickle.dumps just serializes the reference and returns a bytes object, which you can then store in the dataframe with a binary-type schema. cloudpickle.loads loads the reference back from those bytes; that is how you can use ray.get to get the data again.
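To make the round trip concrete, here is a minimal sketch of that suggestion (standalone, not the raydp flow; the names are illustrative):
import ray
import pandas as pd

ray.init()

# Serialize a reference to a dataframe held in the Ray object store.
ref = ray.put(pd.DataFrame({'a': [1, 2, 3]}))
ref_bytes = ray.cloudpickle.dumps(ref)        # plain bytes, fits a BinaryType column

# Later, in any process connected to the same Ray cluster:
restored_ref = ray.cloudpickle.loads(ref_bytes)
df = ray.get(restored_ref)                    # the original dataframe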
@kira-lin Thanks very much, I appreciate the help. Here is what I did, but it is still not working as I expected.
import cloudpickle
@ray.remote
def create_small_dataframe(i):
    pdf = pd.DataFrame(data=np.random.randint(5*i, size=(3, 4)))
    return ray.cloudpickle.dumps(pdf)
# The driver program uses ray.remote so that the remote tasks/actors can
# create partitions of the dataframe locally. The serialized partitions
# are returned to the driver.
obj_ref1 = ray.get(create_small_dataframe.remote(1))
obj_ref2 = ray.get(create_small_dataframe.remote(2))
print(obj_ref1)
#print(ray.get(obj_ref2))
#obj_ref_hex1 = ray.cloudpickle.dumps(obj_ref1).hex()
#obj_ref_hex2 = ray.cloudpickle.dumps(obj_ref2).hex()
# The driver then collects all obj_refs into a list on the driver
obj_ref_list = [obj_ref1, obj_ref2]
# Now, the driver can call SparkSession.createDataFrame
import raydp
raydp.stop_spark()
spark = raydp.init_spark('dataframe_with_obj_ref',
                         num_executors=2,
                         executor_cores=2,
                         executor_memory='1G')
from pyspark.sql.types import BinaryType, StructType, StructField
schema = StructType([StructField('Pandas_df_ref', BinaryType(), True)])
sdf = spark.createDataFrame(obj_ref_list, schema)
The error messages are:
b'\x80\x05\x95\x89\x02\x00\x00\x00\x00\x00\x00\x8c\x11pandas.core.frame\x94\x8c\tDataFrame\x94\x93\x94)\x81\x94}\x94(\x8c\x04_mgr\x94\x8c\x1epandas.core.internals.managers\x94\x8c\x0cBlockManager\x94\x93\x94)\x81\x94(]\x94(\x8c\x18pandas.core.indexes.base\x94\x8c\n_new_Index\x94\x93\x94\x8c\x19pandas.core.indexes.range\x94\x8c\nRangeIndex\x94\x93\x94}\x94(\x8c\x04name\x94N\x8c\x05start\x94K\x00\x8c\x04stop\x94K\x04\x8c\x04step\x94K\x01u\x86\x94R\x94h\rh\x10}\x94(h\x12Nh\x13K\x00h\x14K\x03h\x15K\x01u\x86\x94R\x94e]\x94\x8c\x12numpy.core.numeric\x94\x8c\x0b_frombuffer\x94\x93\x94(\x96`\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x94\x8c\x05numpy\x94\x8c\x05dtype\x94\x93\x94\x8c\x02i8\x94\x89\x88\x87\x94R\x94(K\x03\x8c\x01<\x94NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00t\x94bK\x04K\x03\x86\x94\x8c\x01F\x94t\x94R\x94a]\x94h\rh\x10}\x94(h\x12Nh\x13K\x00h\x14K\x04h\x15K\x01u\x86\x94R\x94a}\x94\x8c\x060.14.1\x94}\x94(\x8c\x04axes\x94h\n\x8c\x06blocks\x94]\x94}\x94(\x8c\x06values\x94h+\x8c\x08mgr_locs\x94\x8c\x08builtins\x94\x8c\x05slice\x94\x93\x94K\x00K\x04K\x01\x87\x94R\x94uaust\x94b\x8c\x04_typ\x94\x8c\tdataframe\x94\x8c\t_metadata\x94]\x94\x8c\x05attrs\x94}\x94ub.'
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-35-ff5408f1769a> in <module>
33 schema = StructType([StructField('Pandas_df_ref', BinaryType(), True)])
34
---> 35 sdf = spark.createDataFrame(obj_ref_list, schema)
36
37
/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
603 return super(SparkSession, self).createDataFrame(
604 data, schema, samplingRatio, verifySchema)
--> 605 return self._create_dataframe(data, schema, samplingRatio, verifySchema)
606
607 def _create_dataframe(self, data, schema, samplingRatio, verifySchema):
/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/session.py in _create_dataframe(self, data, schema, samplingRatio, verifySchema)
628 rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
629 else:
--> 630 rdd, schema = self._createFromLocal(map(prepare, data), schema)
631 jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
632 jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/session.py in _createFromLocal(self, data, schema)
446 # make sure data could consumed multiple times
447 if not isinstance(data, list):
--> 448 data = list(data)
449
450 if schema is None or isinstance(schema, (list, tuple)):
/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/session.py in prepare(obj)
610
611 def prepare(obj):
--> 612 verify_func(obj)
613 return obj
614 elif isinstance(schema, DataType):
/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/types.py in verify(obj)
1406 def verify(obj):
1407 if not verify_nullability(obj):
-> 1408 verify_value(obj)
1409
1410 return verify
/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/types.py in verify_struct(obj)
1393 verifier(d.get(f))
1394 else:
-> 1395 raise TypeError(new_msg("StructType can not accept object %r in type %s"
1396 % (obj, type(obj))))
1397 verify_value = verify_struct
TypeError: StructType can not accept object b'\x80\x05\x95\x89\x02\x00\x00\x00\x00\x00\x00\x8c\x11pandas.core.frame\x94\x8c\tDataFrame\x94\x93\x94)\x81\x94}\x94(\x8c\x04_mgr\x94\x8c\x1epandas.core.internals.managers\x94\x8c\x0cBlockManager\x94\x93\x94)\x81\x94(]\x94(\x8c\x18pandas.core.indexes.base\x94\x8c\n_new_Index\x94\x93\x94\x8c\x19pandas.core.indexes.range\x94\x8c\nRangeIndex\x94\x93\x94}\x94(\x8c\x04name\x94N\x8c\x05start\x94K\x00\x8c\x04stop\x94K\x04\x8c\x04step\x94K\x01u\x86\x94R\x94h\rh\x10}\x94(h\x12Nh\x13K\x00h\x14K\x03h\x15K\x01u\x86\x94R\x94e]\x94\x8c\x12numpy.core.numeric\x94\x8c\x0b_frombuffer\x94\x93\x94(\x96`\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x94\x8c\x05numpy\x94\x8c\x05dtype\x94\x93\x94\x8c\x02i8\x94\x89\x88\x87\x94R\x94(K\x03\x8c\x01<\x94NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00t\x94bK\x04K\x03\x86\x94\x8c\x01F\x94t\x94R\x94a]\x94h\rh\x10}\x94(h\x12Nh\x13K\x00h\x14K\x04h\x15K\x01u\x86\x94R\x94a}\x94\x8c\x060.14.1\x94}\x94(\x8c\x04axes\x94h\n\x8c\x06blocks\x94]\x94}\x94(\x8c\x06values\x94h+\x8c\x08mgr_locs\x94\x8c\x08builtins\x94\x8c\x05slice\x94\x93\x94K\x00K\x04K\x01\x87\x94R\x94uaust\x94b\x8c\x04_typ\x94\x8c\tdataframe\x94\x8c\t_metadata\x94]\x94\x8c\x05attrs\x94}\x94ub.' in type <class 'bytes'>
What you want to do is probably this:
import ray
import numpy as np
import pandas as pd

ray.init()

@ray.remote
def create_small_dataframe(i):
    return pd.DataFrame(data=np.random.randint(5*i, size=(3, 4)))

# these are ObjectRef[pd.DataFrame]
obj_ref1 = create_small_dataframe.remote(1)
obj_ref2 = create_small_dataframe.remote(2)

# use cloudpickle to serialize them
ser_obj_ref1 = ray.cloudpickle.dumps(obj_ref1)
ser_obj_ref2 = ray.cloudpickle.dumps(obj_ref2)
obj_refs = [[ser_obj_ref1], [ser_obj_ref2]]

# start spark
import raydp
raydp.stop_spark()
spark = raydp.init_spark('dataframe_with_obj_ref',
                         num_executors=2,
                         executor_cores=2,
                         executor_memory='1G')

# create the dataframe
from pyspark.sql.types import BinaryType, StructType, StructField
schema = StructType([StructField('Pandas_df_ref', BinaryType(), True)])
sdf = spark.createDataFrame(obj_refs, schema)

raydp.stop_spark()
ray.shutdown()
What do you want to do with this dataframe? You can check out this PR #166 since it's very similar.
@kira-lin thanks very much. I will take a look at PR #166. The idea is to pass various Pandas dataframes (created by remote functions) to a Spark dataframe via ObjectRef, without moving large chunks of data from the worker nodes to the driver node just so the driver can execute spark.createDataFrame(). If the data is large, the driver node can run out of memory. My understanding is that only the Spark driver can execute spark.createDataFrame(), and the data must be in the driver's main memory.
Oh yes, I see. Just FYI, in ray-nightly a feature similar to this is under development in ray.experimental.data. If you don't need it to be a Spark dataframe, you might just use their Ray Dataset; it has a from_pandas function. You can also use to_spark after from_pandas, but to_spark has not been merged yet.
@kira-lin Thanks for your help in getting Spark to successfully create a dataframe from a list of serialized ObjectRefs. After that, on the Spark side, I would like to get the original dataframes back via ray.get(ray.cloudpickle.loads()). Here is what I am trying after sdf = spark.createDataFrame(obj_refs, schema), but I ran into some difficulties. Any help would be greatly appreciated.
# create the dataframe
from pyspark.sql.types import BinaryType, StructType, StructField
schema = StructType([StructField('Pandas_df_ref', BinaryType(), True)])
sdf = spark.createDataFrame(obj_refs, schema)
sdf.show()
# Convert the Spark dataframe into an RDD, then use flatMap to expand each row
# holding a serialized ObjectRef into the rows of the original dataframe.
myrdd = sdf.rdd.flatMap(lambda x: ray.get(ray.cloudpickle.loads(x['Pandas_df_ref'])))
mydf = myrdd.toDF(['column-1', 'column-2', 'column-3', 'column-4'])
mydf.show()
raydp.stop_spark()
ray.shutdown()
I got an error that puzzles me: it seems to indicate that ray is not started. I don't think I have stopped ray at that point. Here are some of the error messages.
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 5, 192.168.1.6, executor 0): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/anaconda3/lib/python3.8/site-packages/pyspark/worker.py", line 605, in main
process()
File "/opt/anaconda3/lib/python3.8/site-packages/pyspark/worker.py", line 597, in process
serializer.dump_stream(out_iter, outfile)
File "/opt/anaconda3/lib/python3.8/site-packages/pyspark/serializers.py", line 271, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/opt/anaconda3/lib/python3.8/site-packages/pyspark/rdd.py", line 1440, in takeUpToNumLeft
yield next(iterator)
File "/opt/anaconda3/lib/python3.8/site-packages/pyspark/util.py", line 107, in wrapper
return f(*args, **kwargs)
File "<ipython-input-8-d07d992c915a>", line 46, in <lambda>
File "/opt/anaconda3/lib/python3.8/site-packages/ray/serialization.py", line 45, in _object_ref_deserializer
worker.check_connected()
File "/opt/anaconda3/lib/python3.8/site-packages/ray/worker.py", line 199, in check_connected
raise RaySystemError("Ray has not been started yet. You can "
ray.exceptions.RaySystemError: System error: Ray has not been started yet. You can start Ray with 'ray.init()'.
The Python function in flatMap is executed by PySpark workers, and these are not controlled by raydp. Therefore those processes are not connected to Ray, hence the exception. You need to connect to Ray with ray.client().connect() or ray.init(address='auto') before you use ray.get in the function.
Besides, if you see many "connecting to ray" messages after you do so, just disable them by setting the log level to warn. I am also investigating this problem.
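A sketch of that suggestion, run inside the function executed by the PySpark workers (the logging_level argument is an assumption about how to lower the log level; ray.client().connect() is the alternative mentioned above):
import logging
import ray

# Connect this worker process to the already-running Ray cluster, keeping
# Ray's own logging at WARNING so the repeated "Connecting to existing Ray
# cluster" INFO lines are suppressed.
if not ray.is_initialized():
    ray.init(address='auto', ignore_reinit_error=True,
             logging_level=logging.WARNING)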
@kira-lin I think the problem is that a PySpark executor doesn't have access to Ray, even if I do a ray.init(address='auto', ignore_reinit_error=True) before
myrdd = sdf.rdd.flatMap(lambda x: ray.get(ray.cloudpickle.loads(x['Pandas_df_ref'])))
It fails again on the next step:
mydf = myrdd.toDF(['column-1', 'column-2', 'column-3', 'column-4'])
I think that, due to lazy evaluation in Spark, the flatMap() is not actually executed until myrdd.toDF(). However, even if I add another ray.init(address='auto', ignore_reinit_error=True) before myrdd.toDF(), it still fails in the same way.
My question is: Is there a way in RayDP to pass Ray access from the PySpark driver to the PySpark workers during Spark initialization? Or does only the Spark driver have access to Ray?
I think that, to support good integration between Ray and Spark, the PySpark workers need to be able to access Ray, especially for accessing data in the local plasma store. That way we can pass ObjectRefs efficiently and seamlessly between Ray and Spark. Thoughts?
Have you tried this:
def map_func(x):
    # command for executors to connect to ray cluster
    # ray.init will also work
    ray.client().connect()
    # actual work using ray
    ray.get(ray.cloudpickle.loads(x['Pandas_df_ref']))

myrdd = sdf.rdd.flatMap(map_func)
A normal process can access Ray if it connects to the Ray cluster via ray.init or ray.client().connect(). In RayDP, the Spark executor in Java is spawned as a Ray actor process, so it can access Ray. But for the Spark Python executors we did not do the same thing. We'll consider whether to add this feature.
@kira-lin Many thanks again. I tried your suggestion of initializing Ray in map_func(). I had to add a return to map_func(), however. Here is what I did:
obj_ref_schema = StructType([StructField('Pandas_df_ref', BinaryType(), True)])
sdf = spark.createDataFrame(data=obj_ref_list, schema=obj_ref_schema)
sdf.show()

def map_func(x):
    # command for executors to connect to ray cluster
    # ray.init will also work
    import pandas as pd
    ray.init(address='auto', ignore_reinit_error=True)
    # actual work using ray
    return ray.get(ray.cloudpickle.loads(x['Pandas_df_ref']))

myrdd = sdf.rdd.flatMap(lambda x: map_func(x))

for x in myrdd.collect():
    print(x)

mydf = myrdd.toDF(['column-1', 'column-2', 'column-3', 'column-4'])
mydf.show()
However, I found that the original Pandas dataframes didn't seem to be returned to the RDD by the flatMap() function. I tried to print out what is in myrdd, and it looks weird. Here is what I got:
ObjectRef(a67dc375e60ddd1affffffffffffffffffffffff0100000001000000)
type of obj_ref1: <class 'ray._raylet.ObjectRef'>
type of obj_ref_list: <class 'list'>
+--------------------+
| Pandas_df_ref|
+--------------------+
|[80 05 95 A5 00 0...|
|[80 05 95 A5 00 0...|
+--------------------+
(raylet) 2021-07-22 10:58:54,399 INFO worker.py:640 -- Connecting to existing Ray cluster at address: 9.74.11.232:43600
(raylet) 2021-07-22 10:58:54,400 INFO worker.py:640 -- Connecting to existing Ray cluster at address: 9.74.11.232:43600
column-1
column-2
column-3
column-4
column-1
column-2
column-3
column-4
(raylet) 2021-07-22 10:58:55,492 INFO worker.py:640 -- Connecting to existing Ray cluster at address: 9.74.11.232:43600
(raylet) (raylet) 2021-07-22 10:58:55,492 INFO worker.py:640 -- Connecting to existing Ray cluster at address: 9.74.11.232:43600
(raylet) (raylet) 2021-07-22 10:58:55,492 INFO worker.py:640 -- Connecting to existing Ray cluster at address: 9.74.11.232:43600
(raylet) (raylet) (raylet) 2021-07-22 10:58:55,492 INFO worker.py:640 -- Connecting to existing Ray cluster at address: 9.74.11.232:43600
(raylet) (raylet) (raylet) 2021-07-22 10:58:55,492 INFO worker.py:640 -- Connecting to existing Ray cluster at address: 9.74.11.232:43600
(raylet) (raylet) (raylet) 2021-07-22 10:58:55,492 INFO worker.py:640 -- Connecting to existing Ray cluster at address: 9.74.11.232:43600
(raylet) (raylet) (raylet) 2021-07-22 10:58:55,492 INFO worker.py:640 -- Connecting to existing Ray cluster at address: 9.74.11.232:43600
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-3-d0cf4dc6d7da> in <module>
58 print(x)
59
---> 60 mydf = myrdd.toDF(['column-1', 'column-2', 'column-3', 'column-4'])
61
62 mydf.show()
/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/session.py in toDF(self, schema, sampleRatio)
59 [Row(name=u'Alice', age=1)]
60 """
---> 61 return sparkSession.createDataFrame(self, schema, sampleRatio)
62
63 RDD.toDF = toDF
/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
603 return super(SparkSession, self).createDataFrame(
604 data, schema, samplingRatio, verifySchema)
--> 605 return self._create_dataframe(data, schema, samplingRatio, verifySchema)
606
607 def _create_dataframe(self, data, schema, samplingRatio, verifySchema):
/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/session.py in _create_dataframe(self, data, schema, samplingRatio, verifySchema)
626
627 if isinstance(data, RDD):
--> 628 rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
629 else:
630 rdd, schema = self._createFromLocal(map(prepare, data), schema)
/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/session.py in _createFromRDD(self, rdd, schema, samplingRatio)
423 """
424 if schema is None or isinstance(schema, (list, tuple)):
--> 425 struct = self._inferSchema(rdd, samplingRatio, names=schema)
426 converter = _create_converter(struct)
427 rdd = rdd.map(converter)
/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/session.py in _inferSchema(self, rdd, samplingRatio, names)
403
404 if samplingRatio is None:
--> 405 schema = _infer_schema(first, names=names)
406 if _has_nulltype(schema):
407 for row in rdd.take(100)[1:]:
/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/types.py in _infer_schema(row, names)
1065
1066 else:
-> 1067 raise TypeError("Can not infer schema for type: %s" % type(row))
1068
1069 fields = [StructField(k, _infer_type(v), True) for k, v in items]
TypeError: Can not infer schema for type: <class 'str'>
You can try to use sdf.mapInPandas instead of rdd.flatMap. Here is a doc. This step is quite similar to the to_spark function in the previously mentioned PR (Arrow tables and pandas dataframes are convertible), so you can refer to it.
Here is what I tried:
obj_ref_schema = StructType([StructField('Pandas_df_ref', BinaryType(), True)])
sdf = spark.createDataFrame(data=obj_ref_list, schema=obj_ref_schema)
sdf.show()

def map_func(x):
    # command for executors to connect to ray cluster
    # ray.init will also work
    import pandas as pd
    ray.init(address='auto', ignore_reinit_error=True)
    # actual work using ray
    yield ray.get(ray.cloudpickle.loads(x['Pandas_df_ref']))

mySchema = StructType([StructField('column-1', IntegerType(), True),
                       StructField('column-2', IntegerType(), True),
                       StructField('column-3', IntegerType(), True),
                       StructField('column-4', IntegerType(), True)])

mydf = sdf.mapInPandas(map_func, mySchema)
mydf.show()
And here is what I got:
ObjectRef(753565f917242c11ffffffffffffffffffffffff0100000001000000)
type of obj_ref1: <class 'ray._raylet.ObjectRef'>
type of obj_ref_list: <class 'list'>
+--------------------+
| Pandas_df_ref|
+--------------------+
|[80 05 95 A5 00 0...|
|[80 05 95 A5 00 0...|
+--------------------+
(raylet) 2021-07-23 10:43:43,660 INFO worker.py:640 -- Connecting to existing Ray cluster at address: 192.168.1.6:39551
(raylet) 2021-07-23 10:43:43,660 INFO worker.py:640 -- Connecting to existing Ray cluster at address: 192.168.1.6:39551
(raylet) 2021-07-23 10:43:44,537 INFO worker.py:640 -- Connecting to existing Ray cluster at address: 192.168.1.6:39551
(raylet) 2021-07-23 10:43:44,537 INFO worker.py:640 -- Connecting to existing Ray cluster at address: 192.168.1.6:39551
(raylet) 2021-07-23 10:43:45,638 INFO worker.py:640 -- Connecting to existing Ray cluster at address: 192.168.1.6:39551
(raylet) 2021-07-23 10:43:45,638 INFO worker.py:640 -- Connecting to existing Ray cluster at address: 192.168.1.6:39551
(raylet) 2021-07-23 10:43:46,504 INFO worker.py:640 -- Connecting to existing Ray cluster at address: 192.168.1.6:39551
(raylet) 2021-07-23 10:43:46,504 INFO worker.py:640 -- Connecting to existing Ray cluster at address: 192.168.1.6:39551
---------------------------------------------------------------------------
PythonException Traceback (most recent call last)
<ipython-input-12-86ea798eccd3> in <module>
61 mydf = sdf.mapInPandas(map_func, mySchema)
62
---> 63 mydf.show()
64
65
/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
438 """
439 if isinstance(truncate, bool) and truncate:
--> 440 print(self._jdf.showString(n, 20, vertical))
441 else:
442 print(self._jdf.showString(n, int(truncate), vertical))
/opt/anaconda3/lib/python3.8/site-packages/py4j/java_gateway.py in __call__(self, *args)
1302
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
1306
/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
132 # Hide where the exception came from that shows a non-Pythonic
133 # JVM exception message.
--> 134 raise_from(converted)
135 else:
136 raise
/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/utils.py in raise_from(e)
PythonException:
An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
File "/opt/anaconda3/lib/python3.8/site-packages/pyspark/worker.py", line 605, in main
process()
File "/opt/anaconda3/lib/python3.8/site-packages/pyspark/worker.py", line 597, in process
serializer.dump_stream(out_iter, outfile)
File "/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/pandas/serializers.py", line 258, in dump_stream
return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream)
File "/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/pandas/serializers.py", line 88, in dump_stream
for batch in iterator:
File "/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/pandas/serializers.py", line 251, in init_stream_yield_batches
for series in iterator:
File "/opt/anaconda3/lib/python3.8/site-packages/pyspark/worker.py", line 359, in func
for result_batch, result_type in result_iter:
File "<ipython-input-12-86ea798eccd3>", line 54, in map_func
TypeError: 'map' object is not subscriptable
Notice that the x passed to map_func is an iterator of pandas dataframes. If that is not clear, please search for mapInPandas in the doc. The function should look like this one in the PR:
def _convert_blocks_to_dataframe(blocks):
    # connect to ray
    if not ray.is_initialized():
        ray.client().connect()
    for block in blocks:
        dfs = []
        for b in block["ref"]:
            ref = ray.cloudpickle.loads(b)
            data = ray.get(ref)
            dfs.append(data.to_pandas())
        yield pd.concat(dfs)
@kira-lin thanks. I think I got it to work in my map_func() with the following:
def map_func(blocks):
    # command for executors to connect to ray cluster
    # ray.init will also work
    import pandas as pd
    if not ray.is_initialized():
        ray.init(address='auto', ignore_reinit_error=True)
    # actual work using ray
    for block in blocks:
        dfs = []
        for b in block['Pandas_df_ref']:
            data = ray.get(ray.cloudpickle.loads(b))
            dfs.append(data)
        yield pd.concat(dfs)
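Presumably this map_func is wired up the same way as in the earlier attempt, reusing the sdf and mySchema defined above:
mydf = sdf.mapInPandas(map_func, mySchema)
mydf.show()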
The only difference is that I didn't call data.to_pandas() as you did. I think that is because you use a Dataset as an intermediary, while I put the Pandas dataframe into Ray directly from the remote task. I wonder what implications I might encounter without using a Dataset as the intermediary. One potential issue would be garbage collection: the Pandas dataframe stored in the local plasma store by a remote task might no longer be available when the Spark mapInPandas() function is executed. Are there any others?
More importantly, in your pull request, do you know whether data locality is observed when your _convert_blocks_to_dataframe(blocks) is executed by mapInPandas()? That is, does data = ray.get(ref) inside _convert_blocks_to_dataframe(blocks) get all of its data from the local plasma store, or is it possible that this ray.get() needs to go to many remote plasma stores?
My original motivation is to avoid moving big data from one node to another when converting a Pandas dataframe into a Spark dataframe, by passing only the ray.ObjectRef to the Spark driver. This matters especially when all the Pandas dataframes are big and have been stored in the local plasma stores of different nodes. I certainly would like the execution of sdf.mapInPandas() to involve mostly local plasma data when map_func() runs. I think you probably want the same for your create_spark_dataframe_from_ray().
I wonder what implications I might encounter without using Dataset as the intermediary.
I don't think there is a big difference. We need to use to_pandas because the data stored in a Ray Dataset is an Arrow table; if you store a pandas dataframe, that is also fine. As long as the owner of your dataframe in plasma is alive and the objects have not gone out of scope, they should be available. For this part you can refer to the Ray docs.
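A minimal sketch of that scoping point, assuming the driver created the ObjectRefs and is therefore their owner (names reuse the earlier snippets):
# Keep the original ObjectRefs referenced on the driver for as long as Spark
# may still ray.get() their deserialized copies; once this list goes out of
# scope, Ray is free to reclaim the objects from plasma.
refs = [create_small_dataframe.remote(i) for i in range(2)]
ser_refs = [[ray.cloudpickle.dumps(r)] for r in refs]
sdf = spark.createDataFrame(ser_refs, schema)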
Namely, is data = ray.get(ref) inside the _convert_blocks_to_dataframe(blocks) getting all its data in the local plasma store? Or is it possible that this ray.get() might need to go to many remote plasma stores?
You are right, it is possible that data would be fetched from remote nodes in this implementation. If we instead used remote tasks with ObjectRefs as their arguments, Ray would try to schedule them based on locality, but PySpark workers are not aware of this information, so there is no locality. This is not ready for huge datasets, and we are also looking into better ways to implement this, maybe a Spark DataSource.
Closing as stale. This has been fixed by implementing getPreferredLocations.