[SPARK-46653][SQL] Code-gen for full outer sort merge join output row by row
What changes were proposed in this pull request?
To be consistent with the behavior when code-gen is disabled, update the code-gen for full outer sort merge join so that it outputs rows one by one. For example:
val a = Seq((1, 2), (2, 3)).toDF("a", "b")
val b = Seq((2, 5), (3, 4)).toDF("a", "c")
a.join(b, Seq("a"), "fullouter")
Generated code before this PR: https://gist.github.com/zml1206/aff18fc313a7164d6f65096a97d233eb
Generated code after this PR: https://gist.github.com/zml1206/a27350b8849951e6efac0fb6088e527f
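To make the example concrete, here is a minimal Java sketch of what a sort-merge full outer equi-join computes on the sample data. It is not Spark's implementation: `FullOuterMergeJoinSketch` and `fullOuterJoin` are hypothetical names, rows are plain `{key, value}` int pairs already sorted by key, `null` marks the missing side, and non-equi join conditions are ignored. The equal-key branch is where duplicate keys matter, because it emits the cross product of the two key groups.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, self-contained sketch of a sort-merge full outer join; not Spark code.
public class FullOuterMergeJoinSketch {
    // Both inputs are {key, value} pairs sorted by key.
    // Each output row is {key, leftValue, rightValue}; null marks a missing side.
    static List<Integer[]> fullOuterJoin(int[][] left, int[][] right) {
        List<Integer[]> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < left.length || j < right.length) {
            if (j >= right.length || (i < left.length && left[i][0] < right[j][0])) {
                // Left key has no match on the right: pad the right side with null.
                out.add(new Integer[] {left[i][0], left[i][1], null});
                i++;
            } else if (i >= left.length || right[j][0] < left[i][0]) {
                // Right key has no match on the left: pad the left side with null.
                out.add(new Integer[] {right[j][0], null, right[j][1]});
                j++;
            } else {
                // Equal keys: find the key group on both sides and emit its cross product.
                int key = left[i][0];
                int iEnd = i, jEnd = j;
                while (iEnd < left.length && left[iEnd][0] == key) iEnd++;
                while (jEnd < right.length && right[jEnd][0] == key) jEnd++;
                for (int a = i; a < iEnd; a++) {
                    for (int b = j; b < jEnd; b++) {
                        out.add(new Integer[] {key, left[a][1], right[b][1]});
                    }
                }
                i = iEnd;
                j = jEnd;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        int[][] a = {{1, 2}, {2, 3}}; // columns (a, b)
        int[][] b = {{2, 5}, {3, 4}}; // columns (a, c)
        for (Integer[] row : fullOuterJoin(a, b)) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```

Running `main` prints `[1, 2, null]`, `[2, 3, 5]` and `[3, null, 4]` on separate lines, i.e. columns a, b, c of the example join in key order.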
Why are the changes needed?
It avoids OOM. When code-gen for full outer sort merge join is enabled and the parent of SortMergeJoin cannot be code-generated, the generated code appends all output rows for the same join key to BufferedRowIterator.currentRows, which is a LinkedList. If a key has a large number of duplicate rows, this is likely to cause an executor OOM.
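The memory problem can be illustrated with a simplified Java model. This is an assumption-laden sketch, not Spark code: `BufferedGroupSketch` and its methods are hypothetical, rows are modeled as `{leftIndex, rightIndex}` pairs, and only a single join key with `leftCount` × `rightCount` duplicate matches is modeled. The `hasNext`/`next`/`append` shape loosely follows BufferedRowIterator, where `hasNext` calls `processNext` only when `currentRows` is empty.

```java
import java.util.LinkedList;

// Simplified model of the pre-PR pattern; not Spark code.
// One join key with leftCount left rows and rightCount right rows.
public class BufferedGroupSketch {
    private final LinkedList<long[]> currentRows = new LinkedList<>();
    private final int leftCount;
    private final int rightCount;
    private boolean groupDone = false;
    private int peakBuffered = 0;

    BufferedGroupSketch(int leftCount, int rightCount) {
        this.leftCount = leftCount;
        this.rightCount = rightCount;
    }

    // Like BufferedRowIterator.append: add the row to the LinkedList.
    private void append(long[] row) {
        currentRows.add(row);
        peakBuffered = Math.max(peakBuffered, currentRows.size());
    }

    // Pre-PR shape: the whole cross product of the key group is appended
    // before control returns to hasNext().
    private void processNext() {
        if (groupDone) return;
        for (int l = 0; l < leftCount; l++) {
            for (int r = 0; r < rightCount; r++) {
                append(new long[] {l, r});
            }
        }
        groupDone = true;
    }

    boolean hasNext() {
        if (currentRows.isEmpty()) processNext();
        return !currentRows.isEmpty();
    }

    long[] next() {
        return currentRows.remove();
    }

    int peakBuffered() {
        return peakBuffered;
    }

    public static void main(String[] args) {
        BufferedGroupSketch it = new BufferedGroupSketch(100, 100);
        long emitted = 0;
        while (it.hasNext()) {
            it.next();
            emitted++;
        }
        System.out.println(emitted + " rows emitted, peak buffered = " + it.peakBuffered());
    }
}
```

Running `main` prints `10000 rows emitted, peak buffered = 10000`: in this model every output row of the key group sits in the LinkedList at the same time, so memory grows with the product of the duplicate counts on both sides.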
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing UTs and a local test:
val df1 = spark.range(10000).map(_ => ("testkey", "testvalue1")).toDF("key", "value")
val df2 = spark.range(10000).map(_ => ("testkey", "testvalue2")).toDF("key", "value")
df1.join(df2, Seq("key"), "fullouter").show()
In local mode with 1 GB of driver memory, this runs out of memory before this PR:
java.lang.OutOfMemoryError: Java heap space
at java.base/java.util.LinkedList.linkLast(LinkedList.java:146)
at java.base/java.util.LinkedList.add(LinkedList.java:342)
at org.apache.spark.sql.execution.BufferedRowIterator.append(BufferedRowIterator.java:73)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.smj_consumeFullOuterJoinRow_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
After this PR, it completes successfully.
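For scale: every row of df1 and df2 has the key "testkey", so the entire join is a single key group, and the full outer join produces

$$10\,000 \times 10\,000 = 10^{8}$$

rows for that one key. On the pre-PR path described in the motivation, all of these rows are appended to BufferedRowIterator.currentRows, which is consistent with the LinkedList.linkLast frame at the top of the stack trace.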
Was this patch authored or co-authored using generative AI tooling?
No.
@cloud-fan Can you help take a look if you have time? Thanks.
cc @cloud-fan @wankunde @ulysses-you do you have any thoughts about this? Thanks.
Sorry, where is smj_leftIndex_0 reset to 0?
https://gist.github.com/zml1206/a27350b8849951e6efac0fb6088e527f#file-full_outer_sort_merge_join_codegen_after-L280-L305
https://gist.github.com/zml1206/a27350b8849951e6efac0fb6088e527f#file-full_outer_sort_merge_join_codegen_after-L126
@wankunde Whenever processNext is called, the rows already in the buffer are consumed first; only after that is findNextJoinRows called, which resets the index and the buffer and writes the newly matched rows into the buffer.
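The reset logic described in this reply can be modeled with a simplified Java sketch. It is not the generated code from the gist: `findNextJoinRows` and `leftIndex` echo names from the discussion (`smj_leftIndex_0`), but `RowByRowGroupSketch`, `fill`, `rightIndex`, and the modeling of the input as `{leftCount, rightCount}` per key are hypothetical, and extra join conditions and matched-row bookkeeping are omitted. Each `processNext` call emits at most one row; the buffers are refilled, and both indices reset to 0, only once the current key's rows are used up.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Simplified model of row-by-row output with index reset; not Spark code.
public class RowByRowGroupSketch {
    // Each entry is {leftCount, rightCount} for one join key, in key order.
    // Assumption: every key has at least one row on some side.
    private final int[][] keyGroups;
    private int groupIdx = -1;
    // Rows matching the current key; an empty side holds a single null,
    // so the full outer join pads that side with null.
    private final List<Integer> leftBuffer = new ArrayList<>();
    private final List<Integer> rightBuffer = new ArrayList<>();
    private int leftIndex = 0;
    private int rightIndex = 0;
    // Plays the role of BufferedRowIterator.currentRows.
    private final LinkedList<Integer[]> currentRows = new LinkedList<>();
    private int peakBuffered = 0;

    RowByRowGroupSketch(int[][] keyGroups) {
        this.keyGroups = keyGroups;
    }

    // Stand-in for findNextJoinRows: load the next key's rows into the
    // buffers and reset both indices to 0. Returns false at end of input.
    private boolean findNextJoinRows() {
        if (++groupIdx >= keyGroups.length) return false;
        fill(leftBuffer, keyGroups[groupIdx][0]);
        fill(rightBuffer, keyGroups[groupIdx][1]);
        leftIndex = 0;
        rightIndex = 0;
        return true;
    }

    private static void fill(List<Integer> buffer, int count) {
        buffer.clear();
        for (int v = 0; v < count; v++) buffer.add(v);
        if (count == 0) buffer.add(null);
    }

    // Emits at most one row per call.
    private void processNext() {
        // Consume the current buffers first; refill only once they are exhausted.
        if (leftIndex >= leftBuffer.size() && !findNextJoinRows()) return;
        currentRows.add(new Integer[] {leftBuffer.get(leftIndex), rightBuffer.get(rightIndex)});
        peakBuffered = Math.max(peakBuffered, currentRows.size());
        // Advance the inner (right) index; wrap around to the next left row.
        if (++rightIndex >= rightBuffer.size()) {
            rightIndex = 0;
            leftIndex++;
        }
    }

    boolean hasNext() {
        if (currentRows.isEmpty()) processNext();
        return !currentRows.isEmpty();
    }

    Integer[] next() {
        return currentRows.remove();
    }

    int peakBuffered() {
        return peakBuffered;
    }

    public static void main(String[] args) {
        // Keys like the PR example (left only, both sides, right only),
        // followed by one key with 100 x 100 duplicates.
        RowByRowGroupSketch it = new RowByRowGroupSketch(new int[][] {{1, 0}, {1, 1}, {0, 1}, {100, 100}});
        long emitted = 0;
        while (it.hasNext()) {
            it.next();
            emitted++;
        }
        System.out.println(emitted + " rows emitted, peak buffered = " + it.peakBuffered());
    }
}
```

Running `main` prints `10003 rows emitted, peak buffered = 1`: the 100 × 100 key group passes through `currentRows` one row at a time instead of being materialized as 10,000 buffered rows. In this model the per-key input rows are still buffered on each side; only the cross-product output is no longer held in memory all at once.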
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!