[SPARK-48743][SS] MergingSessionsIterator should better handle when getStruct returns null
What changes were proposed in this pull request?
The getStruct() method used in MergingSessionsIterator.initialize can return a null value. When that happens, calling copy() on the result throws a NullPointerException.
We have seen the following exception thrown there:
java.lang.NullPointerException: <Redacted Exception Message>
at org.apache.spark.sql.execution.aggregate.MergingSessionsIterator.initialize(MergingSessionsIterator.scala:121)
at org.apache.spark.sql.execution.aggregate.MergingSessionsIterator.<init>(MergingSessionsIterator.scala:130)
at org.apache.spark.sql.execution.aggregate.MergingSessionsExec.$anonfun$doExecute$1(MergingSessionsExec.scala:93)
at org.apache.spark.sql.execution.aggregate.MergingSessionsExec.$anonfun$doExecute$1$adapted(MergingSessionsExec.scala:72)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:920)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:920)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:373)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:373)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:373)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:373)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:373)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:201)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:189)
at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:154)
at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:129)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:148)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:101)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:984)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:105)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:987)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:879)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
It is still not clear why that field can be null, but in general Spark should not throw NPEs. This PR proposes to wrap the failure in SparkException.internalError with more details, so the error carries context instead of a bare NullPointerException.
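A minimal sketch of the intended guard follows. The helper name and the parameters (`row`, `sessionIndex`, `numSessionFields`) are assumptions for illustration, not the actual code in MergingSessionsIterator.initialize:

```scala
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.InternalRow

// Hypothetical helper: copy the session struct out of a row, failing with a
// descriptive internal error instead of an NPE when the struct is null.
def copySessionStruct(
    row: InternalRow,
    sessionIndex: Int,
    numSessionFields: Int): InternalRow = {
  val session = row.getStruct(sessionIndex, numSessionFields)
  if (session == null) {
    // Previously this case surfaced as a NullPointerException from copy().
    throw SparkException.internalError(
      s"Unexpected null value for the session struct at ordinal $sessionIndex: $row")
  }
  session.copy()
}
```

The internal error still fails the task, but the message now identifies which field was unexpectedly null, which should help diagnose why it happens.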
Why are the changes needed?
Improves error handling: a SparkException.internalError with details is more actionable than a bare NullPointerException.
Does this PR introduce any user-facing change?
No
How was this patch tested?
This is a hard-to-reproduce issue. The change is defensive and should not cause any harm.
Was this patch authored or co-authored using generative AI tooling?
No
cc @HeartSaVioR @sigmod PTAL! Thank you!
Merged to master.