datafusion-comet
datafusion-comet copied to clipboard
Add option to replace SortMergeJoin with ShuffleHashJoin
What is the problem the feature request solves?
Other Spark accelerators, such as Spark RAPIDS and Apache Gluten, replace SortMergeJoin with ShuffleHashJoin for improved performance. We should evaluate this approach for Comet.
Spark RAPIDS
val ENABLE_REPLACE_SORTMERGEJOIN = conf("spark.rapids.sql.replaceSortMergeJoin.enabled")
.doc("Allow replacing sortMergeJoin with HashJoin")
.booleanConf
.createWithDefault(true)
Apache Gluten
val COLUMNAR_FPRCE_SHUFFLED_HASH_JOIN_ENABLED =
buildConf("spark.gluten.sql.columnar.forceShuffledHashJoin")
.internal()
.booleanConf
.createWithDefault(true)
/**
* If force ShuffledHashJoin, convert [[SortMergeJoinExec]] to [[ShuffledHashJoinExec]]. There is no
* need to select a smaller table as buildSide here, it will be reselected when offloading.
*/
object RewriteJoin extends RewriteSingleNode with JoinSelectionHelper {
Describe the potential solution
No response
Additional context
No response
It sounds reasonable. The vectorized implementation of SMJ looks inefficient in DataFusion. I'm not sure if there is any optimized algorithm for SMJ in vectorized execution. If not, using SHJ to replace SMJ will be good for performance.