
raydp.modin module to integrate Modin?

Open Hoeze opened this issue 4 years ago • 6 comments

TL;DR: How does one zero-copy convert a PySpark dataframe to a Modin dataframe?

I am currently searching for a way to manipulate PySpark dataframes without materializing them as a Pandas dataframe. Since my experience with Ray has been quite good, I wonder whether it would be possible to solve this with Modin.

If yes, I think a raydp.modin module would be a perfect addition to this project :)

Hoeze avatar Feb 05 '21 18:02 Hoeze

Hi @Hoeze, Spark has its own in-memory data format (called InternalRow). As far as I know, the partitions of a Modin dataframe are represented as pandas DataFrames, so converting from InternalRow to pandas DataFrame should be the most straightforward path.

ConeyLiu avatar Feb 07 '21 03:02 ConeyLiu

Thanks for your answer @ConeyLiu. What I'm basically asking for is the following method, integrated into RayDP:

  1. Convert each Spark partition to a separate Pandas dataframe (zero-copy). This has to happen on the Spark worker that holds the partition.
  2. Put each dataframe into Ray's object store.
  3. Hand the list of Ray remote dataframes over to Modin.

This avoids collecting all data in a single large Pandas dataframe on the driver and then re-distributing it with Ray.
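A minimal sketch of what such a helper could look like, assuming Modin's `from_partitions` API and a plain `mapPartitions` conversion (the `collect()` here is a simplification: a real implementation would push each partition into the object store directly from the Spark executors):

```python
# Hypothetical sketch only; a real raydp.modin helper would write
# partitions into Ray's object store from the executors themselves.
import pandas as pd
import ray
from modin.distributed.dataframe.pandas import from_partitions

def spark_to_modin(spark_df):
    # 1. Convert each Spark partition to a separate pandas DataFrame
    #    on the worker that holds the partition.
    def partition_to_pandas(rows):
        yield pd.DataFrame([row.asDict() for row in rows])

    # Simplification: collect() moves the partitions through the driver,
    # which is exactly what the proposed integration would avoid.
    pandas_partitions = spark_df.rdd.mapPartitions(partition_to_pandas).collect()

    # 2. Put each partition into Ray's object store.
    refs = [ray.put(part) for part in pandas_partitions]

    # 3. Hand the list of remote partitions over to Modin.
    return from_partitions(refs, axis=0)
```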

Hoeze avatar Feb 07 '21 03:02 Hoeze

We have already done this by leveraging the Arrow format. You can see https://github.com/oap-project/raydp/blob/1f4853f44476fad52b9d2fa4519c4c76974557ef/python/raydp/spark/dataset.py#L70. This saves the Spark dataframe into the Ray object store and returns a list of Ray ObjectIds. The only gap is to wrap those ObjectIds into a Modin DataFrame.

ConeyLiu avatar Feb 07 '21 04:02 ConeyLiu

@Hoeze, Thanks for the feedback! I am curious what your entire pipeline looks like and what operations are needed after the Spark processing? We also support creating a Ray MLDataset (a parallel iterator of pandas dataframes) from the Spark dataframe: https://github.com/oap-project/raydp/blob/f833995a569df7da285b61e5eff415526538a360/python/raydp/spark/dataset.py#L31 This MLDataset can be used to transform each shard and to connect with other Ray libraries like RaySGD, and it does not collect all data on a single host. Will this be useful in your use case, or can only Modin meet your requirement?

carsonwang avatar Feb 07 '21 04:02 carsonwang

> We have already done this by leveraging the Arrow format. You can see https://github.com/oap-project/raydp/blob/1f4853f44476fad52b9d2fa4519c4c76974557ef/python/raydp/spark/dataset.py#L70. This saves the Spark dataframe into the Ray object store and returns a list of Ray ObjectIds. The only gap is to wrap those ObjectIds into a Modin DataFrame.

Perfect! Then the remaining gap can be filled with this: https://github.com/modin-project/modin/issues/2704
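Assuming that `from_partitions` API lands, the glue on the RayDP side could be as small as this (`save_to_ray` stands in for the RayDP internal linked above; its exact name and signature are an assumption here):

```python
# Sketch of the remaining gap: wrap RayDP's ObjectIds into a Modin dataframe.
from modin.distributed.dataframe.pandas import from_partitions

object_ids = save_to_ray(spark_df)  # hypothetical: one ObjectId per partition
modin_df = from_partitions(object_ids, axis=0)
modin_df.head()  # from here on, a regular Modin dataframe
```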

> @Hoeze, Thanks for the feedback! I am curious what your entire pipeline looks like and what operations are needed after the Spark processing?

In the past, I tried Spark but had a lot of issues with joins that were trivial in theory yet caused many full shuffle operations in practice. As a workaround, I tried applying Pandas UDFs that performed the join manually. This worked to a certain extent, but Pandas UDFs are not easy to get right, especially with the different output decorators. They were also quite buggy and often caused random `Memory was leaked by query` exceptions. That's why I now use Spark only for basic cleanup and then write the result directly to Parquet files that I can read with Pandas. Of course, that's very annoying.
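For context, the kind of manual join via a Pandas UDF meant here looks roughly like the following (names and data are illustrative, and it assumes Spark >= 3.0 for `applyInPandas`); note that the declared schema has to match the returned columns exactly, which is one of the things that is easy to get wrong:

```python
# Illustrative only: a manual join implemented as a grouped-map Pandas UDF.
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["key", "val"])
# A small lookup table captured by the UDF's closure.
lookup = pd.DataFrame({"key": [1, 2], "extra": ["x", "y"]})

def join_with_lookup(pdf: pd.DataFrame) -> pd.DataFrame:
    # The returned columns must match the schema string declared below.
    return pdf.merge(lookup, on="key")

result = df.groupBy("key").applyInPandas(
    join_with_lookup, schema="key long, val string, extra string"
)
result.show()
```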

If there is now an easy way to directly convert back and forth between PySpark and Modin dataframes, one can just use whichever technology fits the task best:

  • It is no longer necessary to write complex Pandas UDFs.
  • Things are more easily debuggable, since one has direct access to the remote DataFrame partitions and everything is native Python code.

Who needs Pandas UDFs when you can work seamlessly with Modin? :smile:
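With a conversion like the hypothetical `spark_to_modin` sketched earlier in the thread, the same join becomes ordinary dataframe code:

```python
# Sketch: the manual-join UDF replaced by plain Modin operations.
import modin.pandas as mpd

mdf = spark_to_modin(df)  # hypothetical helper from the earlier sketch
lookup_mdf = mpd.DataFrame({"key": [1, 2], "extra": ["x", "y"]})
result = mdf.merge(lookup_mdf, on="key")  # no UDFs, no schema strings
```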

> We also support creating a Ray MLDataset (a parallel iterator of pandas dataframes) from the Spark dataframe: https://github.com/oap-project/raydp/blob/f833995a569df7da285b61e5eff415526538a360/python/raydp/spark/dataset.py#L31 This MLDataset can be used to transform each shard and to connect with other Ray libraries like RaySGD, and it does not collect all data on a single host. Will this be useful in your use case, or can only Modin meet your requirement?

I have to admit I have not looked into MLDataset yet, but I highly appreciate Modin for its nearly complete Pandas API compatibility.

Hoeze avatar Feb 07 '21 14:02 Hoeze

Thanks for the detailed explanation. This makes sense. We should probably add an example for this.

carsonwang avatar Feb 08 '21 05:02 carsonwang

Closing as stale.

kira-lin avatar Apr 14 '23 08:04 kira-lin