[BUG] `UPDATE` on a Databricks (10.4) DELTA table leads to JVM crash

Open mythrocks opened this issue 3 years ago • 2 comments

The Spark Executor JVM crashes when an UPDATE command is run on a Delta table on Databricks 10.4. This does not appear to break on Apache Spark (3.2.1, at least).

Repro code

import org.apache.spark.sql.types._
import org.apache.spark.sql._

spark.conf.set("spark.rapids.sql.enabled", false)

val schema = StructType(Array(
      StructField("country", StringType),
      StructField("year", StringType),
      )
    )

val corpus_rows = Array(
    Row("china", "1920"),
    Row("japan", "1920"),
    Row("china", "2021"),
)

val rdd = sc.parallelize(corpus_rows)

spark.createDataFrame(rdd, schema).repartition(1).write.mode(SaveMode.Overwrite).format("delta").save("/tmp/myth/delta/corpus")

val read_df = spark.read.format("delta").load("/tmp/myth/delta/corpus")
read_df.createOrReplaceTempView("mycorpus")

// Crashy Update
spark.conf.set("spark.rapids.sql.enabled", true)
sql(" UPDATE mycorpus SET year=2022 WHERE year=2023 ").show // Crash!

Crash output:

#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x00007f1d3d3ae8f5, pid=24326, tid=0x00007f1bcd7ff700
#
# JRE version: OpenJDK Runtime Environment (Zulu 8.56.0.21-CA-linux64) (8.0_302-b08) (build 1.8.0_302-b08)
# Java VM: OpenJDK 64-Bit Server VM (25.302-b08 mixed mode linux-amd64 compressed oops)
# Problematic frame:
# C  [libc.so.6+0x18b8f5]
#
# Core dump written. Default location: /home/ubuntu/spark-rapids/core or core.24326
#
# An error report file with more information is saved as:
# /home/ubuntu/spark-rapids/hs_err_pid24326.log
#Stage 8:>                                                          (0 + 1) / 1]
# If you would like to submit a bug report, please visit:
#   http://www.azul.com/support/
#
/databricks/spark/bin/spark-shell: line 47: 24326 Aborted                 (core dumped) "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"

mythrocks avatar Aug 09 '22 22:08 mythrocks

Hmm. Looks like this has to do with the implicit cast in the WHERE clause:

UPDATE mycorpus 
SET year=2022 
WHERE year=2023 ---> Should've been a STRING.

A JVM crash is not ideal.
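
For comparison, the variant that keeps the literal a string (matching the STRING column, so no cast is planned) does not crash, as confirmed later in this thread:

// String literal matches the column type, so no implicit cast is inserted.
sql(" UPDATE mycorpus SET year=2022 WHERE year='2023' ").show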

mythrocks avatar Aug 09 '22 22:08 mythrocks

Relevant stack frames for future investigation:

Java frames: (J=compiled Java code, j=interpreted, Vv=VM code)
j  ai.rapids.cudf.Table.partition(JJI[I)[J+0
j  ai.rapids.cudf.Table.partition(Lai/rapids/cudf/ColumnView;I)Lai/rapids/cudf/PartitionedTable;+23
j  com.nvidia.spark.rapids.GpuHashPartitioningBase.$anonfun$partitionInternalAndClose$6(Lcom/nvidia/spark/rapids/GpuHashPartitioningBase;Lai/rapids/cudf/ColumnVector;Lai/rapids/cudf/Table;)Lai/rapids/cudf/PartitionedTable;+6
j  com.nvidia.spark.rapids.GpuHashPartitioningBase$$Lambda$7341.apply(Ljava/lang/Object;)Ljava/lang/Object;+12
j  com.nvidia.spark.rapids.Arm.withResource(Ljava/lang/AutoCloseable;Lscala/Function1;)Ljava/lang/Object;+2
j  com.nvidia.spark.rapids.Arm.withResource$(Lcom/nvidia/spark/rapids/Arm;Ljava/lang/AutoCloseable;Lscala/Function1;)Ljava/lang/Object;+3
j  com.nvidia.spark.rapids.GpuHashPartitioningBase.withResource(Ljava/lang/AutoCloseable;Lscala/Function1;)Ljava/lang/Object;+3
j  com.nvidia.spark.rapids.GpuHashPartitioningBase.$anonfun$partitionInternalAndClose$5(Lcom/nvidia/spark/rapids/GpuHashPartitioningBase;Lorg/apache/spark/sql/vectorized/ColumnarBatch;Lai/rapids/cudf/ColumnVector;)Lai/rapids/cudf/PartitionedTable;+12
j  com.nvidia.spark.rapids.GpuHashPartitioningBase$$Lambda$7340.apply(Ljava/lang/Object;)Ljava/lang/Object;+12
j  com.nvidia.spark.rapids.Arm.withResource(Ljava/lang/AutoCloseable;Lscala/Function1;)Ljava/lang/Object;+2
j  com.nvidia.spark.rapids.Arm.withResource$(Lcom/nvidia/spark/rapids/Arm;Ljava/lang/AutoCloseable;Lscala/Function1;)Ljava/lang/Object;+3
j  com.nvidia.spark.rapids.GpuHashPartitioningBase.withResource(Ljava/lang/AutoCloseable;Lscala/Function1;)Ljava/lang/Object;+3
j  com.nvidia.spark.rapids.GpuHashPartitioningBase.$anonfun$partitionInternalAndClose$1(Lcom/nvidia/spark/rapids/GpuHashPartitioningBase;Lorg/apache/spark/sql/vectorized/ColumnarBatch;)Lai/rapids/cudf/PartitionedTable;+37
j  com.nvidia.spark.rapids.GpuHashPartitioningBase$$Lambda$7331.apply(Ljava/lang/Object;)Ljava/lang/Object;+8
j  com.nvidia.spark.rapids.Arm.withResource(Ljava/lang/AutoCloseable;Lscala/Function1;)Ljava/lang/Object;+2
j  com.nvidia.spark.rapids.Arm.withResource$(Lcom/nvidia/spark/rapids/Arm;Ljava/lang/AutoCloseable;Lscala/Function1;)Ljava/lang/Object;+3
j  com.nvidia.spark.rapids.GpuHashPartitioningBase.withResource(Ljava/lang/AutoCloseable;Lscala/Function1;)Ljava/lang/Object;+3
j  com.nvidia.spark.rapids.GpuHashPartitioningBase.partitionInternalAndClose(Lorg/apache/spark/sql/vectorized/ColumnarBatch;)Lscala/Tuple2;+13
j  com.nvidia.spark.rapids.GpuHashPartitioningBase.$anonfun$columnarEval$2(Lcom/nvidia/spark/rapids/GpuHashPartitioningBase;Lorg/apache/spark/sql/vectorized/ColumnarBatch;Lai/rapids/cudf/NvtxRange;)Lscala/Tuple2;+2
j  com.nvidia.spark.rapids.GpuHashPartitioningBase$$Lambda$7330.apply(Ljava/lang/Object;)Ljava/lang/Object;+12
j  com.nvidia.spark.rapids.Arm.withResource(Ljava/lang/AutoCloseable;Lscala/Function1;)Ljava/lang/Object;+2
j  com.nvidia.spark.rapids.Arm.withResource$(Lcom/nvidia/spark/rapids/Arm;Ljava/lang/AutoCloseable;Lscala/Function1;)Ljava/lang/Object;+3
j  com.nvidia.spark.rapids.GpuHashPartitioningBase.withResource(Ljava/lang/AutoCloseable;Lscala/Function1;)Ljava/lang/Object;+3
j  com.nvidia.spark.rapids.GpuHashPartitioningBase.$anonfun$columnarEval$1(Lcom/nvidia/spark/rapids/GpuHashPartitioningBase;Lorg/apache/spark/sql/vectorized/ColumnarBatch;Lai/rapids/cudf/NvtxRange;)[Lscala/Tuple2;+27
j  com.nvidia.spark.rapids.GpuHashPartitioningBase$$Lambda$7329.apply(Ljava/lang/Object;)Ljava/lang/Object;+12
j  com.nvidia.spark.rapids.Arm.withResource(Ljava/lang/AutoCloseable;Lscala/Function1;)Ljava/lang/Object;+2
j  com.nvidia.spark.rapids.Arm.withResource$(Lcom/nvidia/spark/rapids/Arm;Ljava/lang/AutoCloseable;Lscala/Function1;)Ljava/lang/Object;+3
j  com.nvidia.spark.rapids.GpuHashPartitioningBase.withResource(Ljava/lang/AutoCloseable;Lscala/Function1;)Ljava/lang/Object;+3
j  com.nvidia.spark.rapids.GpuHashPartitioningBase.columnarEval(Lorg/apache/spark/sql/vectorized/ColumnarBatch;)Ljava/lang/Object;+21
j  org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase$.$anonfun$prepareBatchShuffleDependency$3(Lcom/nvidia/spark/rapids/GpuExpression;Lorg/apache/spark/sql/vectorized/ColumnarBatch;)Ljava/lang/Object;+2
j  org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase$$$Lambda$7150.apply(Ljava/lang/Object;)Ljava/lang/Object;+8
j  org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase$$anon$1.partNextBatch()V+154
j  org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase$$anon$1.hasNext()Z+21
j  org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(Lscala/collection/Iterator;)V+44
j  org.apache.spark.shuffle.ShuffleWriteProcessor.write(Lorg/apache/spark/rdd/RDD;Lorg/apache/spark/ShuffleDependency;JLorg/apache/spark/TaskContext;Lorg/apache/spark/Partition;)Lorg/apache/spark/scheduler/MapStatus;+46
j  org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(Lorg/apache/spark/scheduler/ShuffleMapTask;Lorg/apache/spark/ShuffleDependency;Lorg/apache/spark/rdd/RDD;JLorg/apache/spark/TaskContext;)Lorg/apache/spark/scheduler/MapStatus;+13
j  org.apache.spark.scheduler.ShuffleMapTask$$Lambda$5245.apply()Ljava/lang/Object;+20
j  com.databricks.spark.util.ExecutorFrameProfiler$.record(Ljava/lang/String;Ljava/lang/String;Lscala/Function0;)Ljava/lang/Object;+82
j  org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(Lorg/apache/spark/scheduler/ShuffleMapTask;Lorg/apache/spark/TaskContext;)Lorg/apache/spark/scheduler/MapStatus;+194
j  org.apache.spark.scheduler.ShuffleMapTask$$Lambda$5232.apply()Ljava/lang/Object;+8
j  com.databricks.spark.util.ExecutorFrameProfiler$.record(Ljava/lang/String;Ljava/lang/String;Lscala/Function0;)Ljava/lang/Object;+82
j  org.apache.spark.scheduler.ShuffleMapTask.runTask(Lorg/apache/spark/TaskContext;)Lorg/apache/spark/scheduler/MapStatus;+14
j  org.apache.spark.scheduler.ShuffleMapTask.runTask(Lorg/apache/spark/TaskContext;)Ljava/lang/Object;+2
j  org.apache.spark.scheduler.Task.doRunTask(JILscala/Option;)Ljava/lang/Object;+151
j  org.apache.spark.scheduler.Task.$anonfun$run$1(Lorg/apache/spark/scheduler/Task;JILorg/apache/spark/metrics/MetricsSystem;Lscala/collection/immutable/Map;Lscala/Enumeration$Value;Lscala/Option;)Ljava/lang/Object;+116
j  org.apache.spark.scheduler.Task$$Lambda$5187.apply()Ljava/lang/Object;+28
j  org.apache.spark.scheduler.Task.run(JILorg/apache/spark/metrics/MetricsSystem;Lscala/collection/immutable/Map;Lscala/Option;Lscala/Enumeration$Value;)Ljava/lang/Object;+23
j  org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Lorg/apache/spark/executor/Executor$TaskRunner;Lscala/runtime/BooleanRef;)Ljava/lang/Object;+43
j  org.apache.spark.executor.Executor$TaskRunner$$Lambda$5183.apply()Ljava/lang/Object;+8
j  org.apache.spark.util.Utils$.tryWithSafeFinally(Lscala/Function0;Lscala/Function0;)Ljava/lang/Object;+4
j  org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Lorg/apache/spark/executor/Executor$TaskRunner;)V+965
j  org.apache.spark.executor.Executor$TaskRunner$$Lambda$5162.apply$mcV$sp()V+4
J 11063 C1 scala.runtime.java8.JFunction0$mcV$sp.apply()Ljava/lang/Object; (10 bytes) @ 0x00007f1d255a9d6c [0x00007f1d255a9d00+0x6c]
j  com.databricks.spark.util.ExecutorFrameProfiler$.record(Ljava/lang/String;Ljava/lang/String;Lscala/Function0;)Ljava/lang/Object;+82
j  org.apache.spark.executor.Executor$TaskRunner.run()V+38
j  java.util.concurrent.ThreadPoolExecutor.runWorker(Ljava/util/concurrent/ThreadPoolExecutor$Worker;)V+95
j  java.util.concurrent.ThreadPoolExecutor$Worker.run()V+5
j  java.lang.Thread.run()V+11

mythrocks avatar Aug 09 '22 22:08 mythrocks

@mythrocks I tried to run the snippet above on a Databricks cluster "10.4 LTS (includes Apache Spark 3.2.1, Scala 2.12)" but didn't see any crash. Is there any special config needed?

Can you verify that you still reproduce the crash, please?

ttnghia avatar Sep 07 '22 22:09 ttnghia

Curious: why does the stack show calls into GPU cudf code when the repro code contains spark.conf.set("spark.rapids.sql.enabled", false) to disable the GPU? Is that correct? @mythrocks

GaryShen2008 avatar Sep 09 '22 01:09 GaryShen2008

Curious: why does the stack show calls into GPU cudf code when the repro code contains spark.conf.set("spark.rapids.sql.enabled", false) to disable the GPU? Is that correct? @mythrocks

It is enabled just before running the update operation.

// Crashy Update
spark.conf.set("spark.rapids.sql.enabled", true)
sql(" UPDATE mycorpus SET year=2022 WHERE year=2023 ").show

firestarman avatar Sep 09 '22 02:09 firestarman

~Ah, right. I should have enabled the plugin. Thanks for catching it. I'll correct the description.~ Edit: Nope, the script is correct, as it is.

I'm trying to repro it at my end as well.

Edit: Yep, it's still crashing:

#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x00007f193ee788f5, pid=6511, tid=0x00007f17b9bff700
#
# JRE version: OpenJDK Runtime Environment (Zulu 8.56.0.21-CA-linux64) (8.0_302-b08) (build 1.8.0_302-b08)
# Java VM: OpenJDK 64-Bit Server VM (25.302-b08 mixed mode linux-amd64 compressed oops)
# Problematic frame:
# C  [libc.so.6+0x18b8f5]
#
# Failed to write core dump. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again
#
# An error report file with more information is saved as:
# /home/ubuntu/work/spark-rapids/hs_err_pid6511.log
#Stage 5:>                                                          (0 + 1) / 1]
# If you would like to submit a bug report, please visit:
#   http://www.azul.com/support/
#
/databricks/spark/bin/spark-shell: line 47:  6511 Aborted                 (core dumped) "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"

mythrocks avatar Sep 09 '22 03:09 mythrocks

Here's what I'm getting from compute-sanitizer:

========= Program hit named symbol not found (error 500) on CUDA API call to cuGetProcAddress.
=========     Saved host backtrace up to driver entry point at error
=========     Host Frame: [0x218540]
=========                in /lib/x86_64-linux-gnu/libcuda.so.1
=========     Host Frame: [0x124d44]
=========                in /tmp/nvcomp1455830403426930150.so
=========     Host Frame: [0x128e67]
=========                in /tmp/nvcomp1455830403426930150.so
=========     Host Frame: [0x128f68]
=========                in /tmp/nvcomp1455830403426930150.so
=========     Host Frame: [0x114df]
=========                in /lib/x86_64-linux-gnu/libpthread.so.0
=========     Host Frame: [0x176ca9]
=========                in /tmp/nvcomp1455830403426930150.so
=========     Host Frame: [0x11fb97]
=========                in /tmp/nvcomp1455830403426930150.so
=========     Host Frame: [0x16ab5f]
=========                in /tmp/nvcomp1455830403426930150.so
=========     Host Frame:__device_stub__ZN6nvcomp13unsnap_kernelEPKPKvPKmPKPvS5_P14nvcompStatus_tPm(void const * const *, unsigned long const *, void* const *, unsigned long const *, nvcompStatus_t*, unsigned long*) [0xf57cc]
=========                in /tmp/nvcomp1455830403426930150.so
=========     Host Frame:nvcomp::gpu_unsnap(void const * const *, unsigned long const *, void* const *, unsigned long const *, nvcompStatus_t*, unsigned long*, int, CUstream_st*) [0xf58c7]
=========                in /tmp/nvcomp1455830403426930150.so
=========     Host Frame:nvcompBatchedSnappyDecompressAsync [0x1090f4]
=========                in /tmp/nvcomp1455830403426930150.so
=========     Host Frame:cudf::io::nvcomp::batched_decompress(cudf::io::nvcomp::compression_type, cudf::device_span<cudf::device_span<unsigned char const , unsigned long=18446744073709551615> const , unsigned long=18446744073709551615>, cudf::device_span<cudf::device_span<unsigned char, unsigned long=18446744073709551615> const , unsigned long=18446744073709551615>, cudf::device_span<cudf::io::decompress_status, unsigned long=18446744073709551615>, unsigned long, unsigned long, rmm::cuda_stream_view) [0x1ccec90]
=========                in /tmp/cudf6781276602044116984.so
=========     Host Frame:cudf::io::detail::parquet::reader::impl::decompress_page_data(hostdevice_vector<cudf::io::parquet::gpu::ColumnChunkDesc>&, hostdevice_vector<cudf::io::parquet::gpu::PageInfo>&) [0x1d884bf]
=========                in /tmp/cudf6781276602044116984.so
=========     Host Frame:cudf::io::detail::parquet::reader::impl::read(std::vector<std::vector<int, std::allocator<int>>, std::allocator<std::vector<int, std::allocator<int>>>> const &) [0x1d8ab76]
=========                in /tmp/cudf6781276602044116984.so
=========     Host Frame:cudf::io::detail::parquet::reader::read(cudf::io::parquet_reader_options const &) [0x1d8aed4]
=========                in /tmp/cudf6781276602044116984.so
=========     Host Frame:cudf::io::read_parquet(cudf::io::parquet_reader_options const &, rmm::mr::device_memory_resource*) [0x1cf6260]
=========                in /tmp/cudf6781276602044116984.so
=========     Host Frame:Java_ai_rapids_cudf_Table_readParquet [0x14b4c35]
=========                in /tmp/cudf6781276602044116984.so
=========     Host Frame: [0x7fe98d018527]
=========                in

I wonder if this points towards predicate pushdown in the Parquet reader.
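
If predicate pushdown is the suspect, one quick (unverified) experiment would be to disable Spark's Parquet filter pushdown, or to make the plugin fall back to the CPU for Parquet reads, and re-run the UPDATE. This is only a diagnostic sketch; the conf names below are to the best of my knowledge:

// Diagnostic only, not a fix: disable Spark's Parquet filter pushdown ...
spark.conf.set("spark.sql.parquet.filterPushdown", false)
// ... or have the plugin fall back to the CPU for Parquet reads.
spark.conf.set("spark.rapids.sql.format.parquet.read.enabled", false)
sql(" UPDATE mycorpus SET year=2022 WHERE year=2023 ").show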

mythrocks avatar Sep 09 '22 04:09 mythrocks

Does that mean the crash is in the nvcomp code?

If so, the crash should happen right at the reading stage. So this should also crash:

val read_df = ....
read_df.show()
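
Concretely, reusing the path from the repro above (a sketch; not verified on the Databricks cluster here):

// Same Delta read as in the repro; if the crash were in the nvcomp/Parquet
// decode path, simply materializing the table should trigger it as well.
val read_df = spark.read.format("delta").load("/tmp/myth/delta/corpus")
read_df.show()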

ttnghia avatar Sep 09 '22 04:09 ttnghia

~Ah, right. I should have enabled the plugin. Thanks for catching it. I'll correct the description.~ Edit: Nope, the script is correct, as it is.

I'm trying to repro it at my end as well.

Edit: Yep, it's still crashing:

Does this mean you can also repro it outside databricks ? @mythrocks

firestarman avatar Sep 09 '22 05:09 firestarman

I tried that, but the UPDATE command is not supported in Apache Spark; the delta format is not supported either.

ttnghia avatar Sep 09 '22 05:09 ttnghia

I tried locally (non-db env) and it passed with Spark3.3.0 + delta-2.1.0 + plugin-22.10. But I'm not sure if it actually ran into the path that causes the crash.

      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.0
      /_/
         
Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 1.8.0_342)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> import org.apache.spark.sql._
import org.apache.spark.sql._

scala> 

scala> spark.conf.set("spark.rapids.sql.enabled", false)

scala> 

scala> val schema = StructType(Array(
     |       StructField("country", StringType),
     |       StructField("year", StringType),
     |       )
     |     )
schema: org.apache.spark.sql.types.StructType = StructType(StructField(country,StringType,true),StructField(year,StringType,true))

scala> 

scala> val corpus_rows = Array(
     |     Row("china", "1920"),
     |     Row("japan", "1920"),
     |     Row("china", "2021"),
     | )
corpus_rows: Array[org.apache.spark.sql.Row] = Array([china,1920], [japan,1920], [china,2021])

scala> 

scala> val rdd = sc.parallelize(corpus_rows)
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[0] at parallelize at <console>:30

scala> 

scala> spark.createDataFrame(rdd, schema).repartition(1).write.mode(SaveMode.Overwrite).format("delta").save("/tmp/myth/delta/corpus")
                                                                                
scala> 

scala> val read_df = spark.read.format("delta").load("/tmp/myth/delta/corpus")
read_df: org.apache.spark.sql.DataFrame = [country: string, year: string]

scala> read_df.createOrReplaceTempView("mycorpus")

scala> 

scala> // Crashy Update

scala> spark.conf.set("spark.rapids.sql.enabled", true)

scala> sql(" UPDATE mycorpus SET year=2022 WHERE year=2023 ").show
22/09/09 05:15:26 WARN GpuOverrides: Can't replace any part of this plan due to: Delta Lake metadata queries are not efficient on GPU
22/09/09 05:15:26 WARN GpuOverrides: Can't replace any part of this plan due to: Delta Lake metadata queries are not efficient on GPU
22/09/09 05:15:27 WARN GpuOverrides: 
*Exec <HashAggregateExec> will run on GPU
  *Exec <ShuffleExchangeExec> will run on GPU
    *Partitioning <HashPartitioning> will run on GPU
    *Exec <HashAggregateExec> will run on GPU
      !Exec <FilterExec> cannot run on GPU because not all expressions can be replaced
        !Expression <ScalaUDF> UDF() cannot run on GPU because neither UDF implemented by class org.apache.spark.sql.delta.commands.UpdateCommand$$Lambda$6596/554794768 provides a GPU implementation, nor the conf `spark.rapids.sql.rowBasedUDF.enabled` is enabled
        *Exec <ProjectExec> will run on GPU
          *Expression <Alias> input_file_name() AS input_file_name()#766 will run on GPU
            *Expression <InputFileName> input_file_name() will run on GPU
          *Exec <FilterExec> will run on GPU
            *Expression <And> (isnotnull(year#645) AND (cast(year#645 as int) = 2023)) will run on GPU
              *Expression <IsNotNull> isnotnull(year#645) will run on GPU
              *Expression <EqualTo> (cast(year#645 as int) = 2023) will run on GPU
                *Expression <Cast> cast(year#645 as int) will run on GPU
            *Exec <FileSourceScanExec> will run on GPU

22/09/09 05:15:27 WARN GpuOverrides: 
! <LocalTableScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.LocalTableScanExec
  @Expression <AttributeReference> path#778 could run on GPU
  @Expression <AttributeReference> partitionValues#779 could run on GPU
  @Expression <AttributeReference> size#780L could run on GPU
  @Expression <AttributeReference> modificationTime#781L could run on GPU
  @Expression <AttributeReference> dataChange#782 could run on GPU
  @Expression <AttributeReference> stats#783 could run on GPU
  @Expression <AttributeReference> tags#784 could run on GPU

22/09/09 05:15:27 WARN GpuOverrides: 
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
  @Partitioning <SinglePartition$> could run on GPU
  *Exec <ProjectExec> will run on GPU
    *Expression <Alias> cast(num_affected_rows#649L as string) AS num_affected_rows#816 will run on GPU
      *Expression <Cast> cast(num_affected_rows#649L as string) will run on GPU
    ! <CommandResultExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.CommandResultExec
      @Expression <AttributeReference> num_affected_rows#649L could run on GPU

+-----------------+
|num_affected_rows|
+-----------------+
|                0|
+-----------------+


firestarman avatar Sep 09 '22 05:09 firestarman

Confirmed: the crash still persists. The following command causes a crash:

sql(" UPDATE mycorpus SET year=2022 WHERE year=2023 ").show

And as @mythrocks suggested, this command will not crash:

sql(" UPDATE mycorpus SET year=2022 WHERE year='2023' ").show

ttnghia avatar Sep 09 '22 23:09 ttnghia

Thanks ttnghia, I will try it on DB.

firestarman avatar Sep 13 '22 08:09 firestarman

Found the root cause. See https://github.com/rapidsai/cudf/issues/11700. I am trying to make a fix ...

firestarman avatar Sep 14 '22 07:09 firestarman

Here is the fix https://github.com/rapidsai/cudf/pull/11702

firestarman avatar Sep 14 '22 13:09 firestarman

@firestarman do you have the stacktrace where we're trying to partition an empty table? There are checks in the plugin that avoid trying to shuffle empty tables, so I'm curious what the call stack is and how we allowed an empty table to slip through. The fix in cudf is still desired; I'm just wondering how we got here.

jlowe avatar Sep 14 '22 15:09 jlowe

The stacktrace that Mithun posted above should be somewhat relevant to the partitioning issue.

I'm also curious why this is caused by a partitioning issue instead of casting, because changing the query from year=2023 into year='2023' doesn't cause any crash.

ttnghia avatar Sep 14 '22 17:09 ttnghia

I tried locally (non-db env) and it passed with Spark3.3.0 + delta-2.1.0 + plugin-22.10.

Sorry I'm late to this party, but I can confirm that this bug is specific to Databricks (though I'm not sure why). IIRC, I didn't see the problem on Apache Spark + Delta.

mythrocks avatar Sep 14 '22 17:09 mythrocks

The stacktrace that Mithun posted above should be somewhat relevant to the partitioning issue.

Ah, thanks! I didn't look far enough back in the issue, missed that.

I'm also curious why this is caused by a partitioning issue instead of casting, because changing the query from year=2023 into year='2023' doesn't cause any crash.

I'm guessing because the subquery somehow ends up selecting nothing without it being a string to search for, and that causes an empty table to try to be partitioned. Why it ends up selecting nothing is unclear though, as I would expect the query to either fail to compile due to a type mismatch or automatically apply a cast which should have worked.
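
For reference, a rough standalone sketch of that failure mode at the cudf Java level. The Table.partition(ColumnView, int) signature is taken from the crash stack above; the empty-column construction and the partition count are assumptions, and running it needs a GPU plus the cudf native libraries:

import ai.rapids.cudf.{ColumnVector, Table}

// Hypothetical reproducer for partitioning a zero-row table (see rapidsai/cudf#11700).
// ColumnVector.fromStrings()/fromInts() with no arguments are assumed to build
// zero-row columns; Table.partition(ColumnView, int) matches the stack trace above.
val emptyData  = ColumnVector.fromStrings()   // zero-row STRING data column
val emptyParts = ColumnVector.fromInts()      // zero-row INT32 partition map
val table = new Table(emptyData)
try {
  // The call suspected to crash on empty input before the cudf fix.
  val partitioned = table.partition(emptyParts, 200)
  partitioned.getTable.close()
} finally {
  table.close()
  emptyData.close()
  emptyParts.close()
}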

jlowe avatar Sep 14 '22 17:09 jlowe

Why it ends up selecting nothing is unclear though... ends up selecting nothing without it being a string to search for...

I'll have to work out the vagaries of how UPDATE works, but note that year=2023 should select no rows in the corpus, string or otherwise:

val corpus_rows = Array(
    Row("china", "1920"),
    Row("japan", "1920"),
    Row("china", "2021"),
)
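
A quick (hypothetical) way to double-check that is to apply the UPDATE's WHERE predicate directly:

// The planned predicate (per the GpuOverrides output earlier) casts the STRING
// column to INT; either way, no row in this corpus has year 2023.
spark.read.format("delta").load("/tmp/myth/delta/corpus")
  .where("cast(year as int) = 2023")
  .count()   // expected: 0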

mythrocks avatar Sep 14 '22 18:09 mythrocks

I tried locally (non-db env) and it passed with Spark3.3.0 + delta-2.1.0 + plugin-22.10.

Sorry I'm late to this party, but I can confirm that this bug is specific to Databricks (though I'm not sure why). IIRC, I didn't see the problem on Apache Spark + Delta.

I got a case to reproduce it locally. Command

spark-shell \
    --conf spark.executor.cores=6 \
    --driver-memory 40G \
    --jars ${RAPIDS_JAR} \
    --conf spark.driver.extraJavaOptions=-Duser.timezone=UTC \
    --conf spark.executor.extraJavaOptions=-Duser.timezone=UTC \
    --conf spark.sql.adaptive.enabled=false \
    --conf spark.rapids.sql.explain=ALL \
    --conf spark.plugins=com.nvidia.spark.SQLPlugin \
    --packages io.delta:delta-core_2.12:2.0.0 \
    --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
    --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog

Code

// Create the delta table in fs.

spark.read.format("delta").load("/tmp/myth/delta/corpus").createOrReplaceTempView("mycorpus")
spark.conf.set("spark.rapids.sql.rowBasedUDF.enabled", true)
sql(" select * from mycorpus ").show

+-------+----+
|country|year|
+-------+----+
|  china|1920|
|  japan|1920|
|  china|2023|
+-------+----+

sql(" UPDATE mycorpus SET year=2021 WHERE year=2022 ") // will crash

Enable the rowBasedUDF conf to let the whole filter exec run on GPU, as below.

scala> sql(" UPDATE mycorpus SET year=2022 WHERE year=2021 ")
22/09/15 00:35:58 WARN GpuOverrides: Can't replace any part of this plan due to: Delta Lake metadata queries are not efficient on GPU
22/09/15 00:35:58 WARN GpuOverrides: Can't replace any part of this plan due to: Delta Lake metadata queries are not efficient on GPU
22/09/15 00:35:59 WARN GpuOverrides:
*Exec <HashAggregateExec> will run on GPU
  *Exec <ShuffleExchangeExec> will run on GPU
    *Partitioning <HashPartitioning> will run on GPU
    *Exec <HashAggregateExec> will run on GPU
      *Exec <ProjectExec> will run on GPU
        *Expression <Alias> input_file_name() AS input_file_name()#746 will run on GPU
          *Expression <InputFileName> input_file_name() will run on GPU
        !Exec <FilterExec> cannot run on GPU because not all expressions can be replaced
          !Expression <ScalaUDF> UDF() cannot run on GPU because neither UDF implemented by class org.apache.spark.sql.delta.commands.UpdateCommand$$Lambda$6614/1948441518 provides a GPU implementation, nor the conf `spark.rapids.sql.rowBasedUDF.enabled` is enabled
          *Exec <FilterExec> will run on GPU
            *Expression <And> (isnotnull(year#514) AND (cast(year#514 as int) = 2021)) will run on GPU
              *Expression <IsNotNull> isnotnull(year#514) will run on GPU
              *Expression <EqualTo> (cast(year#514 as int) = 2021) will run on GPU
                *Expression <Cast> cast(year#514 as int) will run on GPU
            *Exec <FileSourceScanExec> will run on GPU

firestarman avatar Sep 15 '22 00:09 firestarman

The stacktrace that Mithun posted above should be somewhat relevant to the partitioning issue.

Ah, thanks! I didn't look far enough back in the issue, missed that.

I'm also curious why this is caused by a partitioning issue instead of casting, because changing the query from year=2023 into year='2023' doesn't cause any crash.

I'm guessing because the subquery somehow ends up selecting nothing without it being a string to search for, and that causes an empty table to try to be partitioned. Why it ends up selecting nothing is unclear though, as I would expect the query to either fail to compile due to a type mismatch or automatically apply a cast which should have worked.

Here is the plan tree that ran into this crash.

scala> sql(" UPDATE mycorpus SET year=2022 WHERE year=2021 ")
22/09/15 00:37:56 WARN GpuOverrides: Can't replace any part of this plan due to: Delta Lake metadata queries are not efficient on GPU
22/09/15 00:37:56 WARN GpuOverrides:
*Exec <HashAggregateExec> will run on GPU
  *Exec <ShuffleExchangeExec> will run on GPU
    *Partitioning <HashPartitioning> will run on GPU
    *Exec <HashAggregateExec> will run on GPU
      *Exec <ProjectExec> will run on GPU
        *Expression <Alias> input_file_name() AS input_file_name()#848 will run on GPU
          *Expression <InputFileName> input_file_name() will run on GPU
        *Exec <FilterExec> will run on GPU
          *Expression <ScalaUDF> UDF() will run on GPU
          *Exec <FilterExec> will run on GPU
            *Expression <And> (isnotnull(year#514) AND (cast(year#514 as int) = 2021)) will run on GPU
              *Expression <IsNotNull> isnotnull(year#514) will run on GPU
              *Expression <EqualTo> (cast(year#514 as int) = 2021) will run on GPU
                *Expression <Cast> cast(year#514 as int) will run on GPU
            *Exec <FileSourceScanExec> will run on GPU

This would be for the subquery in the UPDATE command.

If we change year=2023 to year='2023', the query does not run into the crash path. (I added some logs and they were not printed out.) There might be some optimization when the given filter literal has the same type as the column.

firestarman avatar Sep 15 '22 01:09 firestarman

@firestarman do you have the stacktrace where we're trying to partition an empty table? There's checks in the plugin where we have avoided trying to shuffle empty tables, curious what the call stack is and how we allowed an empty table to slip through. The fix in cudf is still desired, just wondering how we got here.

@jlowe I believe there is a defect in the empty-batch check, and I made PR https://github.com/NVIDIA/spark-rapids/pull/6564 to fix it.
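
The gist of such a guard, as a rough illustration only (not the actual code in that PR): skip the native partition call when a batch has no rows.

import org.apache.spark.sql.vectorized.ColumnarBatch

// Illustration of the idea: a zero-row (or zero-column) batch should be
// short-circuited rather than handed to cudf's Table.partition.
def isPartitionable(batch: ColumnarBatch): Boolean =
  batch.numCols() > 0 && batch.numRows() > 0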

firestarman avatar Sep 15 '22 03:09 firestarman