[#2596] feat(spark): Introduce Fory serializer
What changes were proposed in this pull request?
This is an experimental feature that introduces the Fory serializer as a replacement for the vanilla Spark serializer, to speed up shuffle serialization.
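For reviewers' context, a minimal sketch of the shape this takes against Spark's serializer SPI. This is illustrative only, not the PR's actual code: it assumes the renamed `org.apache.fory` package keeps the pre-rename Fury builder API (`Fory.builder()`, `buildThreadSafeFory()`), and it stubs out the stream variants.

```scala
package org.apache.spark.serializer

import java.io.{InputStream, OutputStream}
import java.nio.ByteBuffer

import scala.reflect.ClassTag

import org.apache.fory.{Fory, ThreadSafeFory}
import org.apache.fory.config.Language

// Sketch only: lives in org.apache.spark.serializer because the relocation
// flag on Serializer is private[spark] and can only be overridden from there.
class ForySerializer extends Serializer with Serializable {
  override def newInstance(): SerializerInstance = new ForySerializerInstance

  // Assumes Fory emits each record as a self-contained byte block, so the
  // sort-based shuffle may relocate serialized records without decoding them.
  override def supportsRelocationOfSerializedObjects: Boolean = true
}

class ForySerializerInstance extends SerializerInstance {
  private val fory: ThreadSafeFory = Fory.builder()
    .withLanguage(Language.JAVA)
    .requireClassRegistration(false) // accept arbitrary classes; registering is faster
    .buildThreadSafeFory()

  override def serialize[T: ClassTag](t: T): ByteBuffer =
    ByteBuffer.wrap(fory.serialize(t.asInstanceOf[AnyRef]))

  override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
    val buf = new Array[Byte](bytes.remaining())
    bytes.get(buf)
    fory.deserialize(buf).asInstanceOf[T]
  }

  override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T =
    deserialize(bytes)

  override def serializeStream(s: OutputStream): SerializationStream =
    throw new UnsupportedOperationException("elided in this sketch")

  override def deserializeStream(s: InputStream): DeserializationStream =
    throw new UnsupportedOperationException("elided in this sketch")
}
```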
Why are the changes needed?
For #2596.
Does this PR introduce any user-facing change?
Yes.
spark.rss.client.shuffle.serializer=FORY
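For example, the switch can be set programmatically like any other Spark conf entry (the config key is from this PR; everything else is standard Spark setup):

```scala
import org.apache.spark.sql.SparkSession

// Opt in to the experimental Fory shuffle serializer; leaving the key unset
// keeps the default serialization path.
val spark = SparkSession.builder()
  .appName("fory-shuffle-example")
  .config("spark.rss.client.shuffle.serializer", "FORY")
  .getOrCreate()
```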
How was this patch tested?
Unit test.
cc @chaokunyang. If you have time, could you help review this integration with Fory?
So far, this implementation hasn’t shown significant improvements. I would greatly appreciate any guidance you could provide on using Fory.
Test Results
2 731 files - 359   2 731 suites - 359   4h 10m 44s ⏱️ - 2h 38m 52s
1 112 tests - 86   1 026 ✅ - 171   1 💤 ±0   2 ❌ +2   83 🔥 +83
14 465 runs - 701   14 252 ✅ - 899   15 💤 ±0   32 ❌ +32   166 🔥 +166
For more details on these failures and errors, see this check.
Results for commit c2a7d46a. ± Comparison against base commit d5e689c3.
This pull request removes 99 and adds 13 tests. Note that renamed tests count towards both.
org.apache.spark.shuffle.DelegationRssShuffleManagerTest ‑ testCreateFallback
org.apache.spark.shuffle.DelegationRssShuffleManagerTest ‑ testCreateInDriver
org.apache.spark.shuffle.DelegationRssShuffleManagerTest ‑ testCreateInDriverDenied
org.apache.spark.shuffle.DelegationRssShuffleManagerTest ‑ testCreateInExecutor
org.apache.spark.shuffle.DelegationRssShuffleManagerTest ‑ testDefaultIncludeExcludeProperties
org.apache.spark.shuffle.DelegationRssShuffleManagerTest ‑ testExcludeProperties
org.apache.spark.shuffle.DelegationRssShuffleManagerTest ‑ testIncludeProperties
org.apache.spark.shuffle.DelegationRssShuffleManagerTest ‑ testTryAccessCluster
org.apache.spark.shuffle.FunctionUtilsTests ‑ testOnceFunction0
org.apache.spark.shuffle.RssShuffleManagerTest ‑ testCreateShuffleManagerServer
…
org.apache.spark.serializer.ForySerializerTest ‑ ForyDeserializationStream should handle stream operations after close
org.apache.spark.serializer.ForySerializerTest ‑ ForySerializationStream should handle empty stream
org.apache.spark.serializer.ForySerializerTest ‑ ForySerializationStream should handle stream operations after close
org.apache.spark.serializer.ForySerializerTest ‑ ForySerializationStream should serialize and deserialize simple objects
org.apache.spark.serializer.ForySerializerTest ‑ ForySerializer should create new instance
org.apache.spark.serializer.ForySerializerTest ‑ ForySerializer should support relocation of serialized objects
org.apache.spark.serializer.ForySerializerTest ‑ ForySerializerInstance should handle byte arrays
org.apache.spark.serializer.ForySerializerTest ‑ ForySerializerInstance should handle large strings
org.apache.spark.serializer.ForySerializerTest ‑ ForySerializerInstance should handle null values
org.apache.spark.serializer.ForySerializerTest ‑ ForySerializerInstance should serialize and deserialize simple case class
…
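For a flavor of what the new round-trip tests check, a sketch along the lines of the "serialize and deserialize simple case class" case (the test style and the Person class are illustrative, not the PR's actual test code):

```scala
import org.apache.spark.serializer.ForySerializer
import org.scalatest.funsuite.AnyFunSuite

// Top-level case class so the serializer does not capture a test-class reference.
case class Person(name: String, age: Int)

class ForySerializerRoundTripTest extends AnyFunSuite {
  test("ForySerializerInstance should serialize and deserialize simple case class") {
    val instance = new ForySerializer().newInstance()
    val original = Person("alice", 30)
    // serialize returns a ByteBuffer; deserialize restores an equal object.
    val restored = instance.deserialize[Person](instance.serialize(original))
    assert(restored === original)
  }
}
```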
Shuffle data should already be binary; is there anything that needs to be serialized?
Have you benchmarked your job to see whether there is a bottleneck in serialization?
Big thanks for your quick and patient review, @chaokunyang.
Shuffle data should already be binary; is there anything that needs to be serialized?
With vanilla Spark, each record is a Java object that is serialized into bytes before being pushed to the remote shuffle server. With Gluten/Auron/DataFusion Comet, there is no need to serialize.
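To make the distinction concrete, a small illustration (the Event class is made up): the RDD path pushes JVM objects through the configured shuffle serializer, which is where a Fory swap could matter; the SQL path exchanges UnsafeRow binary data instead.

```scala
import org.apache.spark.sql.SparkSession

case class Event(userId: Long, payload: String)

val spark = SparkSession.builder().appName("shuffle-paths").getOrCreate()
import spark.implicits._
val sc = spark.sparkContext

// RDD of JVM objects: every Event crosses the shuffle boundary through the
// configured serializer (Java/Kryo/Fory) before reaching the shuffle server.
val events = sc.parallelize(Seq(Event(1L, "a"), Event(2L, "b")))
val grouped = events.keyBy(_.userId).groupByKey()

// SQL/Dataset: rows are UnsafeRow, already a compact binary format, so the
// shuffle write is closer to a byte copy (see the discussion below).
val counted = events.toDS().groupBy("userId").count()
```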
Have you benchmarked your job to see whether there is a bottleneck in serialization?
Not yet. This PR is still in its initial phase.
Only if you are using Spark RDDs with raw Java objects will there be a serialization bottleneck. Such cases are similar to DataStream in Flink. We've observed several-fold e2e performance speedups in multiple cases.
Only if you are using Spark RDDs with raw Java objects will there be a serialization bottleneck. Such cases are similar to DataStream in Flink. We've observed several-fold e2e performance speedups in multiple cases.
Thanks for sharing. Do you mean there is no need to optimize the performance of vanilla Spark SQL shuffle serialization?
Data records in Spark SQL are already binary, so no serialization happens. I suggest benchmarking first before optimizing.
Data records in Spark SQL are already binary, so no serialization happens. I suggest benchmarking first before optimizing.
It seems that serialization is still happening. See https://github.com/apache/spark/blob/2de0248071035aa94818386c2402169f6670d2d4/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala#L57
The Product2 contains the key/value pair that will be serialized. Refer to https://github.com/apache/spark/blob/47991b074a5a277e1fb75be3a5cc207f400b0b0c/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java#L243
Spark's serialization happens during the shuffle write stage.
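Paraphrasing the cited write path in a runnable form (simplified; the real UnsafeShuffleWriter resets a reusable buffer per record and feeds the bytes to its sorter): each Product2 key/value goes through the serializer's stream, so a serializer swap does sit on this path even for SQL rows, although for UnsafeRow the "serialization" is essentially a byte copy.

```scala
import java.io.ByteArrayOutputStream

import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer

// Simplified model of UnsafeShuffleWriter#insertRecordIntoSorter: the key and
// value of each record are written through the serialization stream, and only
// the resulting bytes are handed onward.
val serInstance = new JavaSerializer(new SparkConf()).newInstance()
val serBuffer = new ByteArrayOutputStream()
val serStream = serInstance.serializeStream(serBuffer)

val records: Iterator[Product2[String, Int]] = Iterator("a" -> 1, "b" -> 2)
records.foreach { record =>
  serStream.writeKey(record._1)
  serStream.writeValue(record._2) // per-record serialization happens here
}
serStream.flush()
println(s"shuffle write produced ${serBuffer.size()} serialized bytes")
serStream.close()
```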