spark
[SPARK-40306][SQL] Support more than Integer.MAX_VALUE rows with the same join key
What changes were proposed in this pull request?
Support more than Integer.MAX_VALUE rows with the same join key.
Why are the changes needed?
For sort-merge join (SMJ), if the number of right-table records with the same join key exceeds Integer.MAX_VALUE, the result will be incorrect. Before the join, the records with the same join key are buffered into an ExternalAppendOnlyUnsafeRowArray; overflow of ExternalAppendOnlyUnsafeRowArray.numRows may cause an OOM. During the join, overflow of SpillableArrayIterator.startIndex may produce an incorrect result.
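As an illustration of the first failure mode, a toy buffer with an Int row counter (illustrative names only, not the actual Spark internals) silently wraps to a negative count once appends pass Integer.MAX_VALUE:

```scala
// Toy model of a row buffer whose count is an Int. Illustrative names only;
// the real ExternalAppendOnlyUnsafeRowArray also stores rows and spills to disk.
final class ToyRowBuffer {
  private var numRows: Int = 0
  def appendBatch(n: Int): Unit = numRows += n // wraps silently on overflow
  def length: Int = numRows
}

object ToyRowBufferDemo {
  def main(args: Array[String]): Unit = {
    val buf = new ToyRowBuffer
    buf.appendBatch(Integer.MAX_VALUE)
    buf.appendBatch(2)    // one batch too many
    println(buf.length)   // -2147483647: a negative "row count"
  }
}
```

A negative count then poisons any size-based logic built on top of it, such as capacity checks or iteration bounds.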
Does this PR introduce any user-facing change?
No
How was this patch tested?
Manual test.
A simple UT:
test("join with too many duplicate key") {
  withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
    val duplicateKeyNumber = Integer.MAX_VALUE + 2
    val df1 = Seq(1).toDF("id")
    val df2 =
      spark.range(1, duplicateKeyNumber, 1, 1000).map(_ => 1).toDF("id")
    df1.join(df2, Seq("id"), "inner").collect()
  }
}
For this SMJ, the SortMergeJoinScanner will append all id=1 rows into an ExternalAppendOnlyUnsafeRowArray. After appending Integer.MAX_VALUE rows, ExternalAppendOnlyUnsafeRowArray.numRows will overflow.
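Conceptually, the fix direction is to track the row count with a Long rather than an Int. A minimal sketch with hypothetical names (the real ExternalAppendOnlyUnsafeRowArray also handles UnsafeRow storage and spilling, which this omits):

```scala
// Illustrative only: a toy append-only buffer whose row counter is a Long,
// so appending more than Integer.MAX_VALUE rows still yields a valid count.
final class LongCountedBuffer {
  private var numRows: Long = 0L

  def append(n: Long): Unit = numRows += n

  def length: Long = numRows
}

object LongCountedBufferDemo {
  def main(args: Array[String]): Unit = {
    val buf = new LongCountedBuffer
    buf.append(Integer.MAX_VALUE.toLong)
    buf.append(2L)
    println(buf.length) // 2147483649: no wraparound with a Long counter
  }
}
```

The same widening would apply to the iterator's start index, which suffers the analogous Int overflow.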
Can one of the admins verify this patch?
Can we add a test? Or at least, can you describe how you tested it?
A simple UT:
test("join with too many duplicate key") {
  withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
    val duplicateKeyNumber = 2L + Integer.MAX_VALUE
    val df1 = Seq(1).toDF("id")
    val df2 =
      spark.range(1, duplicateKeyNumber, 1, 1000).map(_ => 1).toDF("id")
    df1.join(df2, Seq("id"), "inner").collect()
  }
}
For this SMJ, the SortMergeJoinScanner will append all id=1 rows into an ExternalAppendOnlyUnsafeRowArray. After appending Integer.MAX_VALUE rows, ExternalAppendOnlyUnsafeRowArray.numRows will overflow.
@wankunde
In your UT, the variable duplicateKeyNumber is negative:
scala> val duplicateKeyNumber = Integer.MAX_VALUE + 2
val duplicateKeyNumber: Int = -2147483647
@zhengruifeng I'm sorry, I made a mistake; it should be val duplicateKeyNumber = 2L + Integer.MAX_VALUE.
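For reference, the difference between the two expressions comes from Scala/Java numeric promotion: Integer.MAX_VALUE + 2 is evaluated entirely in Int arithmetic and wraps, while a Long operand promotes the whole expression to Long arithmetic:

```scala
// Int arithmetic wraps silently past Integer.MAX_VALUE...
val wrong: Int = Integer.MAX_VALUE + 2    // -2147483647
// ...but a Long operand promotes the addition to Long arithmetic.
val right: Long = 2L + Integer.MAX_VALUE  // 2147483649L
```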
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!