raydp
raydp.modin module to integrate Modin?
TL;DR: How does one zero-copy convert a PySpark dataframe to a Modin dataframe?
I am currently searching for a way to manipulate PySpark dataframes without materializing them as a Pandas dataframe. Since my experience with Ray has been quite good, I wonder whether it would be possible to solve this with Modin.
If yes, I think a `raydp.modin` module would be a perfect addition to this project :)
Hi @Hoeze, Spark has its own in-memory data format (called `InternalRow`). And as far as I know, the partitions of a Modin dataframe are represented as pandas DataFrames. So converting from `InternalRow` to pandas DataFrame should be the most straightforward path.
Thanks for your answer @ConeyLiu. What I'm basically asking for is the following method, integrated into RayDP:
- convert each Spark partition to a separate Pandas dataframe (zero copy). This has to happen on the Spark worker that holds the partition.
- put each dataframe into Ray's object store
- hand the list of Ray remote dataframes over to Modin
This avoids collecting all data in a single large Pandas dataframe on the host and then distributing it again with Ray.
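A rough sketch of what such a helper could look like (my own illustration, not RayDP's implementation; it assumes Ray is initialized on every Spark executor so that `ray.put` reaches the shared object store, that ObjectRefs survive Spark's serializer, and it goes through Python `Row` objects rather than Arrow, so it is not actually zero-copy):

```python
import pandas as pd
import ray

def spark_partitions_to_ray(spark_df):
    """Put each Spark partition into Ray's object store as a pandas DataFrame."""
    columns = spark_df.columns

    def put_partition(rows):
        # Runs on the Spark worker that holds this partition.
        pdf = pd.DataFrame([row.asDict() for row in rows], columns=columns)
        yield ray.put(pdf)

    # Only the small ObjectRef handles travel back to the driver, not the data.
    return spark_df.rdd.mapPartitions(put_partition).collect()
```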
We have already done this by leveraging the Arrow format. You can see https://github.com/oap-project/raydp/blob/1f4853f44476fad52b9d2fa4519c4c76974557ef/python/raydp/spark/dataset.py#L70. This saves the Spark dataframe into the Ray object store and returns a list of Ray `ObjectId`s. The only gap is to wrap those `ObjectId`s into a Modin DataFrame.
@Hoeze, Thanks for the feedback! I am curious what your entire pipeline looks like and what operations are needed after the Spark processing? We also support creating a Ray MLDataset (a parallel iterator of pandas dataframes) from the Spark dataframe: https://github.com/oap-project/raydp/blob/f833995a569df7da285b61e5eff415526538a360/python/raydp/spark/dataset.py#L31 This MLDataset can be used to transform each shard and connects with other Ray libraries like RaySGD. It does not collect all data on a single host. Would this be useful in your use case, or can only Modin meet your requirement?
> We have already done this by leveraging the Arrow format. You can see https://github.com/oap-project/raydp/blob/1f4853f44476fad52b9d2fa4519c4c76974557ef/python/raydp/spark/dataset.py#L70. This saves the Spark dataframe into the Ray object store and returns a list of Ray `ObjectId`s. The only gap is to wrap those `ObjectId`s into a Modin DataFrame.
Perfect! Then the missing gap can be filled with this: https://github.com/modin-project/modin/issues/2704
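For what it's worth, once Modin exposes such a partition API, the wrapping step could look roughly like this. The `from_partitions` helper below is from Modin's `modin.distributed.dataframe.pandas` module, which only appeared in later Modin releases, so treat the exact import path and signature as assumptions:

```python
import pandas as pd
import ray
from modin.distributed.dataframe.pandas import from_partitions

ray.init()

# Stand-ins for the per-partition ObjectRefs that RayDP returns.
refs = [ray.put(pd.DataFrame({"x": [i, i + 1]})) for i in range(4)]

# Stack the remote partitions row-wise into a single Modin DataFrame.
mdf = from_partitions(refs, axis=0)
print(mdf.shape)  # (8, 1)
```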
> @Hoeze, Thanks for the feedback! I am curious what your entire pipeline looks like and what operations are needed after the Spark processing?
In the past, I tried Spark but had a lot of issues with joins that were trivial in theory yet caused many full shuffle operations in practice.
As a workaround, I tried applying Pandas UDFs that performed the join manually.
This worked to a certain extent, but Pandas UDFs are not easy to get right, especially with the different output decorators. They were also quite buggy and often caused random `Memory was leaked by query` exceptions.
That's why I now use Spark only for basic cleanup and then directly write it to Parquet files that I can read with Pandas.
Of course, that's very annoying.
If there is now an easy way to directly convert PySpark dataframes to Python dataframes and back, one can just use the technology that best fits the task:
- It is no longer necessary to write complex Pandas UDFs
- Things are more easily debuggable, as one has direct access to the remote DataFrame partitions and everything is native Python code
Who needs Pandas UDFs when you can work seamlessly with Modin? :smile:
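To make the contrast concrete, here is a small illustration of my own (names and data are made up, not from this thread): the same lookup-table merge written first as a PySpark Pandas UDF with its explicit output schema, then as ordinary pandas code on a Modin dataframe:

```python
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").getOrCreate()
sdf = spark.createDataFrame([(1, "a"), (2, "b")], ["key", "grp"])
lookup = pd.DataFrame({"key": [1, 2], "value": [10.0, 20.0]})

# PySpark route: a Pandas UDF via applyInPandas, which needs an explicit
# output schema and a grouping column to be applicable at all.
def join_lookup(pdf: pd.DataFrame) -> pd.DataFrame:
    return pdf.merge(lookup, on="key")

udf_result = sdf.groupBy("grp").applyInPandas(
    join_lookup, schema="key long, grp string, value double"
)
udf_result.show()

# Modin route: the same merge as ordinary pandas code on a distributed frame.
import modin.pandas as mpd

mdf = mpd.DataFrame({"key": [1, 2], "grp": ["a", "b"]})
merged = mdf.merge(mpd.DataFrame(lookup), on="key")
print(merged)
```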
> We also support creating a Ray MLDataset (a parallel iterator of pandas dataframes) from the Spark dataframe: https://github.com/oap-project/raydp/blob/f833995a569df7da285b61e5eff415526538a360/python/raydp/spark/dataset.py#L31 This MLDataset can be used to transform each shard and connects with other Ray libraries like RaySGD. It does not collect all data on a single host. Would this be useful in your use case, or can only Modin meet your requirement?
I have to admit I did not look into `MLDataset` yet, but I highly appreciate Modin for its nearly complete Pandas API compatibility.
Thanks for the detailed explanation. This makes sense. We should probably add an example for this.
close as stale