[BUG] `UPDATE` on a Databricks (10.4) DELTA table leads to JVM crash
The Spark executor JVM crashes when an UPDATE command is run on a Delta table on Databricks 10.4. This does not appear to reproduce on Apache Spark (3.2.1, at least).
Repro code
import org.apache.spark.sql.types._
import org.apache.spark.sql._

// Write the Delta table with the plugin disabled.
spark.conf.set("spark.rapids.sql.enabled", false)

val schema = StructType(Array(
    StructField("country", StringType),
    StructField("year", StringType),
  )
)

val corpus_rows = Array(
  Row("china", "1920"),
  Row("japan", "1920"),
  Row("china", "2021"),
)

val rdd = sc.parallelize(corpus_rows)
spark.createDataFrame(rdd, schema).repartition(1).write.mode(SaveMode.Overwrite).format("delta").save("/tmp/myth/delta/corpus")

val read_df = spark.read.format("delta").load("/tmp/myth/delta/corpus")
read_df.createOrReplaceTempView("mycorpus")

// Crashy Update: the plugin is re-enabled just before the UPDATE runs.
spark.conf.set("spark.rapids.sql.enabled", true)
sql(" UPDATE mycorpus SET year=2022 WHERE year=2023 ").show // Crash!
Crash output:
#
# A fatal error has been detected by the Java Runtime Environment:
#
# SIGSEGV (0xb) at pc=0x00007f1d3d3ae8f5, pid=24326, tid=0x00007f1bcd7ff700
#
# JRE version: OpenJDK Runtime Environment (Zulu 8.56.0.21-CA-linux64) (8.0_302-b08) (build 1.8.0_302-b08)
# Java VM: OpenJDK 64-Bit Server VM (25.302-b08 mixed mode linux-amd64 compressed oops)
# Problematic frame:
# C [libc.so.6+0x18b8f5]
#
# Core dump written. Default location: /home/ubuntu/spark-rapids/core or core.24326
#
# An error report file with more information is saved as:
# /home/ubuntu/spark-rapids/hs_err_pid24326.log
#Stage 8:> (0 + 1) / 1]
# If you would like to submit a bug report, please visit:
# http://www.azul.com/support/
#
/databricks/spark/bin/spark-shell: line 47: 24326 Aborted (core dumped) "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
Hmm. Looks like this has to do with the implied cast in the WHERE clause:
UPDATE mycorpus
SET year=2022
WHERE year=2023 ---> Should've been a STRING.
A JVM crash is not ideal.
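One way to check the implied cast without touching the Delta table is to compare the plans Spark produces for the two forms of the predicate. The following is only a sketch: it uses an in-memory DataFrame with the same rows instead of the Delta table, assumes Spark is on the classpath (as in spark-shell), and keeps the plugin disabled so nothing runs on the GPU. The names `df`, `intLiteral`, and `stringLiteral` are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell, getOrCreate() simply returns the session that already exists.
val spark = SparkSession.builder().master("local[1]").appName("implied-cast-check").getOrCreate()
import spark.implicits._

// Keep the RAPIDS plugin out of the picture; this is a no-op if it is not loaded.
spark.conf.set("spark.rapids.sql.enabled", false)

// Same rows as the repro corpus; both columns are strings.
val df = Seq(("china", "1920"), ("japan", "1920"), ("china", "2021")).toDF("country", "year")

val intLiteral = df.filter("year = 2023")      // integer literal against a string column
val stringLiteral = df.filter("year = '2023'") // string literal, same type as the column

// explain(true) prints the parsed, analyzed, optimized, and physical plans.
intLiteral.explain(true)
stringLiteral.explain(true)
```

On Spark 3.2/3.3 with default (non-ANSI) settings, I would expect the analyzed plan for `intLiteral` to contain a cast of `year` to int, and the one for `stringLiteral` to contain no cast.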
Relevant stack frames for future investigation:
Java frames: (J=compiled Java code, j=interpreted, Vv=VM code)
j ai.rapids.cudf.Table.partition(JJI[I)[J+0
j ai.rapids.cudf.Table.partition(Lai/rapids/cudf/ColumnView;I)Lai/rapids/cudf/PartitionedTable;+23
j com.nvidia.spark.rapids.GpuHashPartitioningBase.$anonfun$partitionInternalAndClose$6(Lcom/nvidia/spark/rapids/GpuHashPartitioningBase;Lai/rapids/cudf/ColumnVector;Lai/rapids/cudf/Table;)Lai/rapids/cudf/PartitionedTable;+6
j com.nvidia.spark.rapids.GpuHashPartitioningBase$$Lambda$7341.apply(Ljava/lang/Object;)Ljava/lang/Object;+12
j com.nvidia.spark.rapids.Arm.withResource(Ljava/lang/AutoCloseable;Lscala/Function1;)Ljava/lang/Object;+2
j com.nvidia.spark.rapids.Arm.withResource$(Lcom/nvidia/spark/rapids/Arm;Ljava/lang/AutoCloseable;Lscala/Function1;)Ljava/lang/Object;+3
j com.nvidia.spark.rapids.GpuHashPartitioningBase.withResource(Ljava/lang/AutoCloseable;Lscala/Function1;)Ljava/lang/Object;+3
j com.nvidia.spark.rapids.GpuHashPartitioningBase.$anonfun$partitionInternalAndClose$5(Lcom/nvidia/spark/rapids/GpuHashPartitioningBase;Lorg/apache/spark/sql/vectorized/ColumnarBatch;Lai/rapids/cudf/ColumnVector;)Lai/rapids/cudf/PartitionedTable;+12
j com.nvidia.spark.rapids.GpuHashPartitioningBase$$Lambda$7340.apply(Ljava/lang/Object;)Ljava/lang/Object;+12
j com.nvidia.spark.rapids.Arm.withResource(Ljava/lang/AutoCloseable;Lscala/Function1;)Ljava/lang/Object;+2
j com.nvidia.spark.rapids.Arm.withResource$(Lcom/nvidia/spark/rapids/Arm;Ljava/lang/AutoCloseable;Lscala/Function1;)Ljava/lang/Object;+3
j com.nvidia.spark.rapids.GpuHashPartitioningBase.withResource(Ljava/lang/AutoCloseable;Lscala/Function1;)Ljava/lang/Object;+3
j com.nvidia.spark.rapids.GpuHashPartitioningBase.$anonfun$partitionInternalAndClose$1(Lcom/nvidia/spark/rapids/GpuHashPartitioningBase;Lorg/apache/spark/sql/vectorized/ColumnarBatch;)Lai/rapids/cudf/PartitionedTable;+37
j com.nvidia.spark.rapids.GpuHashPartitioningBase$$Lambda$7331.apply(Ljava/lang/Object;)Ljava/lang/Object;+8
j com.nvidia.spark.rapids.Arm.withResource(Ljava/lang/AutoCloseable;Lscala/Function1;)Ljava/lang/Object;+2
j com.nvidia.spark.rapids.Arm.withResource$(Lcom/nvidia/spark/rapids/Arm;Ljava/lang/AutoCloseable;Lscala/Function1;)Ljava/lang/Object;+3
j com.nvidia.spark.rapids.GpuHashPartitioningBase.withResource(Ljava/lang/AutoCloseable;Lscala/Function1;)Ljava/lang/Object;+3
j com.nvidia.spark.rapids.GpuHashPartitioningBase.partitionInternalAndClose(Lorg/apache/spark/sql/vectorized/ColumnarBatch;)Lscala/Tuple2;+13
j com.nvidia.spark.rapids.GpuHashPartitioningBase.$anonfun$columnarEval$2(Lcom/nvidia/spark/rapids/GpuHashPartitioningBase;Lorg/apache/spark/sql/vectorized/ColumnarBatch;Lai/rapids/cudf/NvtxRange;)Lscala/Tuple2;+2
j com.nvidia.spark.rapids.GpuHashPartitioningBase$$Lambda$7330.apply(Ljava/lang/Object;)Ljava/lang/Object;+12
j com.nvidia.spark.rapids.Arm.withResource(Ljava/lang/AutoCloseable;Lscala/Function1;)Ljava/lang/Object;+2
j com.nvidia.spark.rapids.Arm.withResource$(Lcom/nvidia/spark/rapids/Arm;Ljava/lang/AutoCloseable;Lscala/Function1;)Ljava/lang/Object;+3
j com.nvidia.spark.rapids.GpuHashPartitioningBase.withResource(Ljava/lang/AutoCloseable;Lscala/Function1;)Ljava/lang/Object;+3
j com.nvidia.spark.rapids.GpuHashPartitioningBase.$anonfun$columnarEval$1(Lcom/nvidia/spark/rapids/GpuHashPartitioningBase;Lorg/apache/spark/sql/vectorized/ColumnarBatch;Lai/rapids/cudf/NvtxRange;)[Lscala/Tuple2;+27
j com.nvidia.spark.rapids.GpuHashPartitioningBase$$Lambda$7329.apply(Ljava/lang/Object;)Ljava/lang/Object;+12
j com.nvidia.spark.rapids.Arm.withResource(Ljava/lang/AutoCloseable;Lscala/Function1;)Ljava/lang/Object;+2
j com.nvidia.spark.rapids.Arm.withResource$(Lcom/nvidia/spark/rapids/Arm;Ljava/lang/AutoCloseable;Lscala/Function1;)Ljava/lang/Object;+3
j com.nvidia.spark.rapids.GpuHashPartitioningBase.withResource(Ljava/lang/AutoCloseable;Lscala/Function1;)Ljava/lang/Object;+3
j com.nvidia.spark.rapids.GpuHashPartitioningBase.columnarEval(Lorg/apache/spark/sql/vectorized/ColumnarBatch;)Ljava/lang/Object;+21
j org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase$.$anonfun$prepareBatchShuffleDependency$3(Lcom/nvidia/spark/rapids/GpuExpression;Lorg/apache/spark/sql/vectorized/ColumnarBatch;)Ljava/lang/Object;+2
j org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase$$$Lambda$7150.apply(Ljava/lang/Object;)Ljava/lang/Object;+8
j org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase$$anon$1.partNextBatch()V+154
j org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase$$anon$1.hasNext()Z+21
j org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(Lscala/collection/Iterator;)V+44
j org.apache.spark.shuffle.ShuffleWriteProcessor.write(Lorg/apache/spark/rdd/RDD;Lorg/apache/spark/ShuffleDependency;JLorg/apache/spark/TaskContext;Lorg/apache/spark/Partition;)Lorg/apache/spark/scheduler/MapStatus;+46
j org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(Lorg/apache/spark/scheduler/ShuffleMapTask;Lorg/apache/spark/ShuffleDependency;Lorg/apache/spark/rdd/RDD;JLorg/apache/spark/TaskContext;)Lorg/apache/spark/scheduler/MapStatus;+13
j org.apache.spark.scheduler.ShuffleMapTask$$Lambda$5245.apply()Ljava/lang/Object;+20
j com.databricks.spark.util.ExecutorFrameProfiler$.record(Ljava/lang/String;Ljava/lang/String;Lscala/Function0;)Ljava/lang/Object;+82
j org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(Lorg/apache/spark/scheduler/ShuffleMapTask;Lorg/apache/spark/TaskContext;)Lorg/apache/spark/scheduler/MapStatus;+194
j org.apache.spark.scheduler.ShuffleMapTask$$Lambda$5232.apply()Ljava/lang/Object;+8
j com.databricks.spark.util.ExecutorFrameProfiler$.record(Ljava/lang/String;Ljava/lang/String;Lscala/Function0;)Ljava/lang/Object;+82
j org.apache.spark.scheduler.ShuffleMapTask.runTask(Lorg/apache/spark/TaskContext;)Lorg/apache/spark/scheduler/MapStatus;+14
j org.apache.spark.scheduler.ShuffleMapTask.runTask(Lorg/apache/spark/TaskContext;)Ljava/lang/Object;+2
j org.apache.spark.scheduler.Task.doRunTask(JILscala/Option;)Ljava/lang/Object;+151
j org.apache.spark.scheduler.Task.$anonfun$run$1(Lorg/apache/spark/scheduler/Task;JILorg/apache/spark/metrics/MetricsSystem;Lscala/collection/immutable/Map;Lscala/Enumeration$Value;Lscala/Option;)Ljava/lang/Object;+116
j org.apache.spark.scheduler.Task$$Lambda$5187.apply()Ljava/lang/Object;+28
j org.apache.spark.scheduler.Task.run(JILorg/apache/spark/metrics/MetricsSystem;Lscala/collection/immutable/Map;Lscala/Option;Lscala/Enumeration$Value;)Ljava/lang/Object;+23
j org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Lorg/apache/spark/executor/Executor$TaskRunner;Lscala/runtime/BooleanRef;)Ljava/lang/Object;+43
j org.apache.spark.executor.Executor$TaskRunner$$Lambda$5183.apply()Ljava/lang/Object;+8
j org.apache.spark.util.Utils$.tryWithSafeFinally(Lscala/Function0;Lscala/Function0;)Ljava/lang/Object;+4
j org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Lorg/apache/spark/executor/Executor$TaskRunner;)V+965
j org.apache.spark.executor.Executor$TaskRunner$$Lambda$5162.apply$mcV$sp()V+4
J 11063 C1 scala.runtime.java8.JFunction0$mcV$sp.apply()Ljava/lang/Object; (10 bytes) @ 0x00007f1d255a9d6c [0x00007f1d255a9d00+0x6c]
j com.databricks.spark.util.ExecutorFrameProfiler$.record(Ljava/lang/String;Ljava/lang/String;Lscala/Function0;)Ljava/lang/Object;+82
j org.apache.spark.executor.Executor$TaskRunner.run()V+38
j java.util.concurrent.ThreadPoolExecutor.runWorker(Ljava/util/concurrent/ThreadPoolExecutor$Worker;)V+95
j java.util.concurrent.ThreadPoolExecutor$Worker.run()V+5
j java.lang.Thread.run()V+11
@mythrocks I tried to run the snippet above on a Databricks cluster "10.4 LTS (includes Apache Spark 3.2.1, Scala 2.12)" but I don't see any crash. Is there any special config needed?
Can you verify that you still reproduce the crash, please?
Curious: why does the stack show calls into GPU cudf code when the repro code contains spark.conf.set("spark.rapids.sql.enabled", false) to disable the GPU? Is that correct? @mythrocks
It is enabled just before running the update operation.
// Crashy Update
spark.conf.set("spark.rapids.sql.enabled", true)
sql(" UPDATE mycorpus SET year=2022 WHERE year=2023 ").show
~Ah, right. I should have enabled the plugin. Thanks for catching it. I'll correct the description.~ Edit: Nope, the script is correct, as it is.
I'm trying to repro it at my end as well.
Edit: Yep, it's still crashing:
#
# A fatal error has been detected by the Java Runtime Environment:
#
# SIGSEGV (0xb) at pc=0x00007f193ee788f5, pid=6511, tid=0x00007f17b9bff700
#
# JRE version: OpenJDK Runtime Environment (Zulu 8.56.0.21-CA-linux64) (8.0_302-b08) (build 1.8.0_302-b08)
# Java VM: OpenJDK 64-Bit Server VM (25.302-b08 mixed mode linux-amd64 compressed oops)
# Problematic frame:
# C [libc.so.6+0x18b8f5]
#
# Failed to write core dump. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again
#
# An error report file with more information is saved as:
# /home/ubuntu/work/spark-rapids/hs_err_pid6511.log
#Stage 5:> (0 + 1) / 1]
# If you would like to submit a bug report, please visit:
# http://www.azul.com/support/
#
/databricks/spark/bin/spark-shell: line 47: 6511 Aborted (core dumped) "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
Here's what I'm getting from compute-sanitizer:
========= Program hit named symbol not found (error 500) on CUDA API call to cuGetProcAddress.
========= Saved host backtrace up to driver entry point at error
========= Host Frame: [0x218540]
========= in /lib/x86_64-linux-gnu/libcuda.so.1
========= Host Frame: [0x124d44]
========= in /tmp/nvcomp1455830403426930150.so
========= Host Frame: [0x128e67]
========= in /tmp/nvcomp1455830403426930150.so
========= Host Frame: [0x128f68]
========= in /tmp/nvcomp1455830403426930150.so
========= Host Frame: [0x114df]
========= in /lib/x86_64-linux-gnu/libpthread.so.0
========= Host Frame: [0x176ca9]
========= in /tmp/nvcomp1455830403426930150.so
========= Host Frame: [0x11fb97]
========= in /tmp/nvcomp1455830403426930150.so
========= Host Frame: [0x16ab5f]
========= in /tmp/nvcomp1455830403426930150.so
========= Host Frame:__device_stub__ZN6nvcomp13unsnap_kernelEPKPKvPKmPKPvS5_P14nvcompStatus_tPm(void const * const *, unsigned long const *, void* const *, unsigned long const *, nvcompStatus_t*, unsigned long*) [0xf57cc]
========= in /tmp/nvcomp1455830403426930150.so
========= Host Frame:nvcomp::gpu_unsnap(void const * const *, unsigned long const *, void* const *, unsigned long const *, nvcompStatus_t*, unsigned long*, int, CUstream_st*) [0xf58c7]
========= in /tmp/nvcomp1455830403426930150.so
========= Host Frame:nvcompBatchedSnappyDecompressAsync [0x1090f4]
========= in /tmp/nvcomp1455830403426930150.so
========= Host Frame:cudf::io::nvcomp::batched_decompress(cudf::io::nvcomp::compression_type, cudf::device_span<cudf::device_span<unsigned char const , unsigned long=18446744073709551615> const , unsigned long=18446744073709551615>, cudf::device_span<cudf::device_span<unsigned char, unsigned long=18446744073709551615> const , unsigned long=18446744073709551615>, cudf::device_span<cudf::io::decompress_status, unsigned long=18446744073709551615>, unsigned long, unsigned long, rmm::cuda_stream_view) [0x1ccec90]
========= in /tmp/cudf6781276602044116984.so
========= Host Frame:cudf::io::detail::parquet::reader::impl::decompress_page_data(hostdevice_vector<cudf::io::parquet::gpu::ColumnChunkDesc>&, hostdevice_vector<cudf::io::parquet::gpu::PageInfo>&) [0x1d884bf]
========= in /tmp/cudf6781276602044116984.so
========= Host Frame:cudf::io::detail::parquet::reader::impl::read(std::vector<std::vector<int, std::allocator<int>>, std::allocator<std::vector<int, std::allocator<int>>>> const &) [0x1d8ab76]
========= in /tmp/cudf6781276602044116984.so
========= Host Frame:cudf::io::detail::parquet::reader::read(cudf::io::parquet_reader_options const &) [0x1d8aed4]
========= in /tmp/cudf6781276602044116984.so
========= Host Frame:cudf::io::read_parquet(cudf::io::parquet_reader_options const &, rmm::mr::device_memory_resource*) [0x1cf6260]
========= in /tmp/cudf6781276602044116984.so
========= Host Frame:Java_ai_rapids_cudf_Table_readParquet [0x14b4c35]
========= in /tmp/cudf6781276602044116984.so
========= Host Frame: [0x7fe98d018527]
========= in
I wonder if this points towards predicate pushdown in the Parquet reader.
Would that be a crash in the nvcomp code?
If so, the crash should happen right at the reading stage. So this should also crash:
val read_df = ....
read_df.show()
Does this mean you can also repro it outside Databricks? @mythrocks
I tried that, but the UPDATE command is not supported in Apache Spark, and the Delta format is not supported either.
I tried locally (non-db env) and it passed with Spark 3.3.0 + delta-2.1.0 + plugin-22.10, but I'm not sure whether it actually ran into the path that causes the crash.
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.0
      /_/
Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 1.8.0_342)
Type in expressions to have them evaluated.
Type :help for more information.
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala> import org.apache.spark.sql._
import org.apache.spark.sql._
scala>
scala> spark.conf.set("spark.rapids.sql.enabled", false)
scala>
scala> val schema = StructType(Array(
| StructField("country", StringType),
| StructField("year", StringType),
| )
| )
schema: org.apache.spark.sql.types.StructType = StructType(StructField(country,StringType,true),StructField(year,StringType,true))
scala>
scala> val corpus_rows = Array(
| Row("china", "1920"),
| Row("japan", "1920"),
| Row("china", "2021"),
| )
corpus_rows: Array[org.apache.spark.sql.Row] = Array([china,1920], [japan,1920], [china,2021])
scala>
scala> val rdd = sc.parallelize(corpus_rows)
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[0] at parallelize at <console>:30
scala>
scala> spark.createDataFrame(rdd, schema).repartition(1).write.mode(SaveMode.Overwrite).format("delta").save("/tmp/myth/delta/corpus")
scala>
scala> val read_df = spark.read.format("delta").load("/tmp/myth/delta/corpus")
read_df: org.apache.spark.sql.DataFrame = [country: string, year: string]
scala> read_df.createOrReplaceTempView("mycorpus")
scala>
scala> // Crashy Update
scala> spark.conf.set("spark.rapids.sql.enabled", true)
scala> sql(" UPDATE mycorpus SET year=2022 WHERE year=2023 ").show
22/09/09 05:15:26 WARN GpuOverrides: Can't replace any part of this plan due to: Delta Lake metadata queries are not efficient on GPU
22/09/09 05:15:26 WARN GpuOverrides: Can't replace any part of this plan due to: Delta Lake metadata queries are not efficient on GPU
22/09/09 05:15:27 WARN GpuOverrides:
*Exec <HashAggregateExec> will run on GPU
*Exec <ShuffleExchangeExec> will run on GPU
*Partitioning <HashPartitioning> will run on GPU
*Exec <HashAggregateExec> will run on GPU
!Exec <FilterExec> cannot run on GPU because not all expressions can be replaced
!Expression <ScalaUDF> UDF() cannot run on GPU because neither UDF implemented by class org.apache.spark.sql.delta.commands.UpdateCommand$$Lambda$6596/554794768 provides a GPU implementation, nor the conf `spark.rapids.sql.rowBasedUDF.enabled` is enabled
*Exec <ProjectExec> will run on GPU
*Expression <Alias> input_file_name() AS input_file_name()#766 will run on GPU
*Expression <InputFileName> input_file_name() will run on GPU
*Exec <FilterExec> will run on GPU
*Expression <And> (isnotnull(year#645) AND (cast(year#645 as int) = 2023)) will run on GPU
*Expression <IsNotNull> isnotnull(year#645) will run on GPU
*Expression <EqualTo> (cast(year#645 as int) = 2023) will run on GPU
*Expression <Cast> cast(year#645 as int) will run on GPU
*Exec <FileSourceScanExec> will run on GPU
22/09/09 05:15:27 WARN GpuOverrides:
! <LocalTableScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.LocalTableScanExec
@Expression <AttributeReference> path#778 could run on GPU
@Expression <AttributeReference> partitionValues#779 could run on GPU
@Expression <AttributeReference> size#780L could run on GPU
@Expression <AttributeReference> modificationTime#781L could run on GPU
@Expression <AttributeReference> dataChange#782 could run on GPU
@Expression <AttributeReference> stats#783 could run on GPU
@Expression <AttributeReference> tags#784 could run on GPU
22/09/09 05:15:27 WARN GpuOverrides:
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
@Partitioning <SinglePartition$> could run on GPU
*Exec <ProjectExec> will run on GPU
*Expression <Alias> cast(num_affected_rows#649L as string) AS num_affected_rows#816 will run on GPU
*Expression <Cast> cast(num_affected_rows#649L as string) will run on GPU
! <CommandResultExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.CommandResultExec
@Expression <AttributeReference> num_affected_rows#649L could run on GPU
+-----------------+
|num_affected_rows|
+-----------------+
| 0|
+-----------------+
Confirmed: the crash still persists. The following command causes the crash:
sql(" UPDATE mycorpus SET year=2022 WHERE year=2023 ").show
And as @mythrocks suggested, this command will not crash:
sql(" UPDATE mycorpus SET year=2022 WHERE year='2023' ").show
Thanks ttnghia, I will try it on DB.
Found the root cause. See https://github.com/rapidsai/cudf/issues/11700. I am trying to make a fix ...
Here is the fix https://github.com/rapidsai/cudf/pull/11702
@firestarman do you have the stacktrace where we're trying to partition an empty table? There are checks in the plugin to avoid shuffling empty tables, so I'm curious what the call stack is and how an empty table slipped through. The fix in cudf is still desired; I'm just wondering how we got here.
The stacktrace that Mithun posted above should be somewhat relevant to the partitioning issue.
I'm also curious why this is caused by a partitioning issue instead of casting, because changing the query from year=2023 into year='2023' doesn't cause any crash.
I tried locally (non-db env) and it passed with Spark3.3.0 + delta-2.1.0 + plugin-22.10.
Sorry I'm late to this party, but I can confirm that this bug is specific to Databricks (though I'm not sure why). IIRC, I don't remember seeing the problem on Apache Spark + Delta.
The stacktrace that Mithun posted above should be somewhat relevant to the partitioning issue.
Ah, thanks! I didn't look far enough back in the issue, missed that.
I'm also curious why this is caused by a partitioning issue instead of casting, because changing the query from year=2023 into year='2023' doesn't cause any crash.
I'm guessing because the subquery somehow ends up selecting nothing without it being a string to search for, and that causes an empty table to try to be partitioned. Why it ends up selecting nothing is unclear though, as I would expect the query to either fail to compile due to a type mismatch or automatically apply a cast which should have worked.
Why it ends up selecting nothing is unclear though... ends up selecting nothing without it being a string to search for...
I'll have to work out the vagaries of how UPDATE works, but note that year=2023 should select no rows in the corpus, string or otherwise:
val corpus_rows = Array(
Row("china", "1920"),
Row("japan", "1920"),
Row("china", "2021"),
)
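To make that concrete, here is a small plain-Scala model (no Spark involved) of the two predicates over the same three rows. It assumes the non-ANSI behaviour where a string that cannot be parsed as an int casts to null and so never matches; `matchesIntLiteral` and `matchesStringLiteral` are made-up helper names, not Spark APIs.

```scala
import scala.util.Try

// Same three rows as the repro corpus: (country, year), both stored as strings.
val corpusRows = Seq(("china", "1920"), ("japan", "1920"), ("china", "2021"))

// Model of WHERE year = 2023: the string is cast to int before comparing.
// An unparseable value becomes None (standing in for SQL NULL) and never matches.
def matchesIntLiteral(year: String, literal: Int): Boolean =
  Try(year.trim.toInt).toOption.contains(literal)

// Model of WHERE year = '2023': plain string equality, no cast.
def matchesStringLiteral(year: String, literal: String): Boolean =
  year == literal

val viaCast = corpusRows.filter { case (_, year) => matchesIntLiteral(year, 2023) }
val viaString = corpusRows.filter { case (_, year) => matchesStringLiteral(year, "2023") }

println(s"rows matched via cast: ${viaCast.size}, via string: ${viaString.size}")
```

Either way the filter selects nothing, so the downstream operators of the UPDATE see zero matching rows regardless of the literal's type.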
I have a case that reproduces it locally. Command:
spark-shell \
--conf spark.executor.cores=6 \
--driver-memory 40G \
--jars ${RAPIDS_JAR} \
--conf spark.driver.extraJavaOptions=-Duser.timezone=UTC \
--conf spark.executor.extraJavaOptions=-Duser.timezone=UTC \
--conf spark.sql.adaptive.enabled=false \
--conf spark.rapids.sql.explain=ALL \
--conf spark.plugins=com.nvidia.spark.SQLPlugin \
--packages io.delta:delta-core_2.12:2.0.0 \
--conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
Code
// Create the delta table in fs.
spark.read.format("delta").load("/tmp/myth/delta/corpus").createOrReplaceTempView("mycorpus")
spark.conf.set("spark.rapids.sql.rowBasedUDF.enabled", true)
sql(" select * from mycorpus ").show
+-------+----+
|country|year|
+-------+----+
| china|1920|
| japan|1920|
| china|2023|
+-------+----+
sql(" UPDATE mycorpus SET year=2021 WHERE year=2022 ") // will crash
Enable rowBasedUDF to let the whole filter exec run on the GPU, as below.
scala> sql(" UPDATE mycorpus SET year=2022 WHERE year=2021 ")
22/09/15 00:35:58 WARN GpuOverrides: Can't replace any part of this plan due to: Delta Lake metadata queries are not efficient on GPU
22/09/15 00:35:58 WARN GpuOverrides: Can't replace any part of this plan due to: Delta Lake metadata queries are not efficient on GPU
22/09/15 00:35:59 WARN GpuOverrides:
*Exec <HashAggregateExec> will run on GPU
*Exec <ShuffleExchangeExec> will run on GPU
*Partitioning <HashPartitioning> will run on GPU
*Exec <HashAggregateExec> will run on GPU
*Exec <ProjectExec> will run on GPU
*Expression <Alias> input_file_name() AS input_file_name()#746 will run on GPU
*Expression <InputFileName> input_file_name() will run on GPU
!Exec <FilterExec> cannot run on GPU because not all expressions can be replaced
!Expression <ScalaUDF> UDF() cannot run on GPU because neither UDF implemented by class org.apache.spark.sql.delta.commands.UpdateCommand$$Lambda$6614/1948441518 provides a GPU implementation, nor the conf `spark.rapids.sql.rowBasedUDF.enabled` is enabled
*Exec <FilterExec> will run on GPU
*Expression <And> (isnotnull(year#514) AND (cast(year#514 as int) = 2021)) will run on GPU
*Expression <IsNotNull> isnotnull(year#514) will run on GPU
*Expression <EqualTo> (cast(year#514 as int) = 2021) will run on GPU
*Expression <Cast> cast(year#514 as int) will run on GPU
*Exec <FileSourceScanExec> will run on GPU
Here is the plan tree that ran into this crash.
scala> sql(" UPDATE mycorpus SET year=2022 WHERE year=2021 ")
22/09/15 00:37:56 WARN GpuOverrides: Can't replace any part of this plan due to: Delta Lake metadata queries are not efficient on GPU
22/09/15 00:37:56 WARN GpuOverrides:
*Exec <HashAggregateExec> will run on GPU
*Exec <ShuffleExchangeExec> will run on GPU
*Partitioning <HashPartitioning> will run on GPU
*Exec <HashAggregateExec> will run on GPU
*Exec <ProjectExec> will run on GPU
*Expression <Alias> input_file_name() AS input_file_name()#848 will run on GPU
*Expression <InputFileName> input_file_name() will run on GPU
*Exec <FilterExec> will run on GPU
*Expression <ScalaUDF> UDF() will run on GPU
*Exec <FilterExec> will run on GPU
*Expression <And> (isnotnull(year#514) AND (cast(year#514 as int) = 2021)) will run on GPU
*Expression <IsNotNull> isnotnull(year#514) will run on GPU
*Expression <EqualTo> (cast(year#514 as int) = 2021) will run on GPU
*Expression <Cast> cast(year#514 as int) will run on GPU
*Exec <FileSourceScanExec> will run on GPU
This would be for the subquery in the update command.
If year=2023 is changed to year='2023', the query does not run into the crash path. (I added some logs and they were not printed.) There might be some optimization when the filter literal has the same type as the column.
@jlowe I believe there is a defect in the empty batches check, and made a PR https://github.com/NVIDIA/spark-rapids/pull/6564 to fix it.
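For anyone following along, the kind of guard being discussed can be sketched in plain Scala as below. This is a toy model only, not the code in either PR: `Batch`, `hashPartition`, and `partitionNonEmpty` are hypothetical names, and the real plugin works on GPU columnar batches rather than Scala collections.

```scala
// Hypothetical stand-in for a columnar batch; not the plugin's ColumnarBatch.
final case class Batch(rows: Vector[(String, String)]) {
  def numRows: Int = rows.length
}

// Hash-partition one batch by its first column into numPartitions buckets.
def hashPartition(batch: Batch, numPartitions: Int): Map[Int, Batch] = {
  require(numPartitions > 0, "numPartitions must be positive")
  batch.rows
    .groupBy { case (key, _) => java.lang.Math.floorMod(key.hashCode, numPartitions) }
    .map { case (partition, rows) => partition -> Batch(rows) }
}

// Guard: drop zero-row batches so the partitioner is never handed an empty table.
def partitionNonEmpty(batches: Iterator[Batch], numPartitions: Int): Iterator[Map[Int, Batch]] =
  batches.filter(_.numRows > 0).map(hashPartition(_, numPartitions))

// An empty batch followed by a two-row batch.
val input = Iterator(
  Batch(Vector.empty),
  Batch(Vector(("china", "1920"), ("japan", "1920")))
)
val partitioned = partitionNonEmpty(input, 4).toList
println(s"batches that reached the partitioner: ${partitioned.length}")
```

Only the two-row batch reaches `hashPartition`; the empty one is filtered out first, which is the behaviour the empty-batch check is meant to guarantee.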