spark icon indicating copy to clipboard operation
spark copied to clipboard

[SPARK-48743][SS]MergingSessionIterator should better handle when getStruct returns null

Open WweiL opened this issue 1 year ago • 1 comments

What changes were proposed in this pull request?

The getStruct() method used in MergingSessionIterator.initialize could return a null value. When that happens, the copy() called upon it throws a NullPointerException.

We see an exception thrown there:

ava.lang.NullPointerException: <Redacted Exception Message>
	at org.apache.spark.sql.execution.aggregate.MergingSessionsIterator.initialize(MergingSessionsIterator.scala:121)
	at org.apache.spark.sql.execution.aggregate.MergingSessionsIterator.<init>(MergingSessionsIterator.scala:130)
	at org.apache.spark.sql.execution.aggregate.MergingSessionsExec.$anonfun$doExecute$1(MergingSessionsExec.scala:93)
	at org.apache.spark.sql.execution.aggregate.MergingSessionsExec.$anonfun$doExecute$1$adapted(MergingSessionsExec.scala:72)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:920)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:920)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:373)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:373)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:373)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:373)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:373)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:201)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:189)
	at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:154)
	at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:129)
	at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:148)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.Task.run(Task.scala:101)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:984)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:105)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:987)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:879)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

It is still not clear why that field could be null, but in general Spark should not throw NPEs. So this PR purposes to wrap it with SparkException.internalError with more details.

Why are the changes needed?

Improvemtns

Does this PR introduce any user-facing change?

No

How was this patch tested?

This is a hard-to repro issue. The change should not cause any harm.

Was this patch authored or co-authored using generative AI tooling?

No

WweiL avatar Jun 27 '24 22:06 WweiL

cc @HeartSaVioR @sigmod PTAL! Thank you!

WweiL avatar Jun 27 '24 22:06 WweiL

Merged to master.

HyukjinKwon avatar Jul 09 '24 10:07 HyukjinKwon