[SPARK-46653][SQL] Code-gen for full outer sort merge join output row by row

Open zml1206 opened this issue 1 year ago • 5 comments

What changes were proposed in this pull request?

To be consistent with the behavior when code-gen is disabled, update the code-gen for full outer sort merge join so that it outputs results row by row. For example:

val a = Seq((1, 2), (2, 3)).toDF("a", "b")
val b = Seq((2, 5), (3, 4)).toDF("a", "c")
a.join(b, Seq("a"), "fullouter")

Generated code before this PR: https://gist.github.com/zml1206/aff18fc313a7164d6f65096a97d233eb
Generated code after this PR: https://gist.github.com/zml1206/a27350b8849951e6efac0fb6088e527f

Why are the changes needed?

Avoid OOM. When code-gen for full outer sort merge join is enabled and the parent of SortMergeJoin does not support codegen, the join appends all output rows for the same key to BufferedRowIterator.currentRows, which is a LinkedList. If there are a large number of duplicate keys, this is likely to cause an executor OOM.
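The difference between the two output strategies can be sketched outside Spark. The snippet below is a minimal model, not Spark's generated code: `OutputStrategies`, `bufferedPeak`, and `rowByRow` are hypothetical names introduced only for illustration. It assumes that one duplicate key with m left rows and n right rows produces m × n joined rows, so the buffering variant holds all of them at once, while the row-by-row variant yields them lazily.

```scala
// Hypothetical model of the two output strategies; not Spark's generated code.
object OutputStrategies {
  // Buffer everything: every joined row for one key is appended to a
  // LinkedList before the consumer sees any of them.
  def bufferedPeak(left: Seq[String], right: Seq[String]): Int = {
    val currentRows = new java.util.LinkedList[(String, String)]()
    for (l <- left; r <- right) currentRows.add((l, r))
    currentRows.size // number of rows held in memory at the same time
  }

  // Row by row: a lazy iterator produces one joined row per next() call,
  // so no joined rows accumulate between calls.
  def rowByRow(left: Seq[String], right: Seq[String]): Iterator[(String, String)] =
    for (l <- left.iterator; r <- right.iterator) yield (l, r)
}
```

With 10,000 rows per side for a single key, the buffering variant would hold 10,000 × 10,000 = 100,000,000 joined rows at once, which is the situation the local test in this PR reproduces.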

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing UTs and a local test:

val df1 = spark.range(10000).map(_ => ("testkey", "testvalue1")).toDF("key", "value")
val df2 = spark.range(10000).map(_ => ("testkey", "testvalue2")).toDF("key", "value")
df1.join(df2, Seq("key"), "fullouter").show()

Run in local mode with 1G of driver memory. Before this PR, the query fails with OOM:

java.lang.OutOfMemoryError: Java heap space
	at java.base/java.util.LinkedList.linkLast(LinkedList.java:146)
	at java.base/java.util.LinkedList.add(LinkedList.java:342)
	at org.apache.spark.sql.execution.BufferedRowIterator.append(BufferedRowIterator.java:73)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.smj_consumeFullOuterJoinRow_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)

After this PR, the query runs OK.

Was this patch authored or co-authored using generative AI tooling?

No.

zml1206 avatar Jan 10 '24 05:01 zml1206

@cloud-fan Can you help take a look if you have time? Thanks.

zml1206 avatar Jan 12 '24 02:01 zml1206

cc @cloud-fan @wankunde @ulysses-you Do you have any thoughts on this? Thanks.

zml1206 avatar Jan 23 '24 10:01 zml1206

Sorry, where is smj_leftIndex_0 reset to 0?

https://gist.github.com/zml1206/a27350b8849951e6efac0fb6088e527f#file-full_outer_sort_merge_join_codegen_after-L280-L305

wankunde avatar Jan 25 '24 02:01 wankunde

> Sorry, where is smj_leftIndex_0 reset to 0?
>
> https://gist.github.com/zml1206/a27350b8849951e6efac0fb6088e527f#file-full_outer_sort_merge_join_codegen_after-L280-L305

https://gist.github.com/zml1206/a27350b8849951e6efac0fb6088e527f#file-full_outer_sort_merge_join_codegen_after-L126

zml1206 avatar Jan 25 '24 02:01 zml1206

Whenever processNext is called, the buffer is consumed first. Only then is findNextJoinRows called to reset the index and the buffer, match the next rows, and write them into the buffer. @wankunde
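As a rough illustration of that control flow, here is a simplified sketch. It is not the generated code from the gist: `RowByRowEmitter` is a hypothetical class, its `processNext` and `findNextJoinRows` only mirror the names used in this discussion, and each `Seq[String]` stands in for the joined rows of one key.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical simplification; names mirror the discussion, not Spark's code.
final class RowByRowEmitter(groups: Iterator[Seq[String]]) {
  private val buffer = ArrayBuffer.empty[String] // rows matched for the current key
  private var index = 0                          // next buffered row to emit

  // Stand-in for findNextJoinRows: reset index and buffer, then load the
  // next non-empty group. Returns false when the input is exhausted.
  private def findNextJoinRows(): Boolean = {
    index = 0
    buffer.clear()
    while (buffer.isEmpty && groups.hasNext) buffer ++= groups.next()
    buffer.nonEmpty
  }

  // Stand-in for processNext: drain the buffer first, one row per call,
  // and refill only once the buffer is fully consumed.
  def processNext(): Option[String] = {
    if (index >= buffer.length && !findNextJoinRows()) None
    else {
      val row = buffer(index)
      index += 1
      Some(row)
    }
  }
}
```

In this model the index returns to 0 only inside the refill step, which runs after every buffered row has been emitted.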

zml1206 avatar Jan 25 '24 02:01 zml1206

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

github-actions[bot] avatar Jun 28 '24 00:06 github-actions[bot]