
[SPARK-40306][SQL] Support more than Integer.MAX_VALUE of the same join key

Open wankunde opened this issue 3 years ago • 2 comments

What changes were proposed in this pull request?

Support more than Integer.MAX_VALUE of the same join key.

Why are the changes needed?

For SMJ, if the number of records with the same join key in the right table is greater than Integer.MAX_VALUE, the result will be incorrect. Before the SMJ join, we put the records with the same join key into an ExternalAppendOnlyUnsafeRowArray; an overflow of ExternalAppendOnlyUnsafeRowArray.numRows may cause an OOM. During the SMJ join, an overflow of SpillableArrayIterator.startIndex may produce an incorrect result.
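A minimal sketch of the overflow described above, outside of Spark. It assumes the row counter is a plain Int (as numRows is in ExternalAppendOnlyUnsafeRowArray); the variable names here are illustrative only:

  // An Int counter at the limit silently wraps around on the next increment
  var numRows: Int = Integer.MAX_VALUE
  numRows += 1                              // wraps to Integer.MIN_VALUE
  println(numRows)                          // -2147483648

  // Widening the arithmetic to Long avoids the wrap-around
  val widened: Long = Integer.MAX_VALUE.toLong + 1
  println(widened)                          // 2147483648

Because the wrap-around is silent, the counter goes negative rather than failing fast, which is why the symptom shows up later as an OOM or a wrong join result instead of an immediate error.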

Does this PR introduce any user-facing change?

No

How was this patch tested?

Manual test.

A simple UT

  test("join with too many duplicate key") {
    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
      val duplicateKeyNumber = Integer.MAX_VALUE + 2
      val df1 = Seq(1).toDF("id")
      val df2 =
        spark.range(1, duplicateKeyNumber, 1, 1000).map(_ => 1).toDF("id")
      df1.join(df2, Seq("id"), "inner").collect()
    }
  }

For this SMJ, the SortMergeJoinScanner will append all id=1 rows into an ExternalAppendOnlyUnsafeRowArray. After appending Integer.MAX_VALUE rows, ExternalAppendOnlyUnsafeRowArray.numRows will overflow.

wankunde avatar Sep 01 '22 15:09 wankunde

Can one of the admins verify this patch?

AmplabJenkins avatar Sep 02 '22 13:09 AmplabJenkins

Can we add a test? Or at least can you describe how you tested it?

A simple UT

  test("join with too many duplicate key") {
    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
      val duplicateKeyNumber = 2L + Integer.MAX_VALUE
      val df1 = Seq(1).toDF("id")
      val df2 =
        spark.range(1, duplicateKeyNumber, 1, 1000).map(_ => 1).toDF("id")
      df1.join(df2, Seq("id"), "inner").collect()
    }
  }

For this SMJ, the SortMergeJoinScanner will append all id=1 rows into an ExternalAppendOnlyUnsafeRowArray. After appending Integer.MAX_VALUE rows, ExternalAppendOnlyUnsafeRowArray.numRows will overflow.

wankunde avatar Sep 07 '22 03:09 wankunde

@wankunde

in your UT, the variable duplicateKeyNumber is negative

scala> val duplicateKeyNumber = Integer.MAX_VALUE + 2
val duplicateKeyNumber: Int = -2147483647

zhengruifeng avatar Sep 27 '22 09:09 zhengruifeng

@zhengruifeng I'm sorry, I made a mistake; it should be val duplicateKeyNumber = 2L + Integer.MAX_VALUE

wankunde avatar Sep 30 '22 01:09 wankunde

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

github-actions[bot] avatar Jan 09 '23 00:01 github-actions[bot]