
[BUG] `spark.sql.parquet.outputTimestampType` is not considered during read/write parquet for nested types containing timestamp

Open ttnghia opened this issue 3 years ago • 9 comments

It looks like the option spark.sql.parquet.outputTimestampType is not considered anywhere in the plugin code. As a result, when the integration test (test_cache_columnar) is given a timestamp of MILLISECONDS type, the test fails:

[2022-08-11T20:50:03.492Z] cpu = datetime.datetime(3124, 11, 11, 7, 44, 42, 992000)
[2022-08-11T20:50:03.492Z] gpu = datetime.datetime(3124, 11, 11, 7, 44, 42, 992028)

From the test output, the read/write apparently defaults to TIMESTAMP_MICROS regardless of the type specified by outputTimestampType.
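For context, this is exactly what TIMESTAMP_MILLIS is supposed to do on the CPU: sub-millisecond digits are truncated during the write, which is why the cpu value above ends in 992000 while the gpu value keeps 992028. A minimal CPU-only sketch of that behavior (the scratch path and value are illustrative, not taken from the test):

import java.sql.Timestamp
import spark.implicits._

// With TIMESTAMP_MILLIS, plain CPU Spark stores milliseconds in the file, so the
// microsecond digits are dropped on the round trip.
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")
val ts = Seq(Timestamp.valueOf("3124-11-11 07:44:42.992028")).toDF("t")
ts.write.mode("overwrite").parquet("/tmp/ts_millis.parquet")
spark.read.parquet("/tmp/ts_millis.parquet").show(false)
// expected: 3124-11-11 07:44:42.992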

Related: tests failed in https://github.com/NVIDIA/spark-rapids/pull/6286:

  • test_cache_columnar
  • test_timestamp_write_round_trip_*

As such, when this issue is fixed, these tests should also be updated.

ttnghia avatar Aug 11 '22 21:08 ttnghia

It seems this bug happens when the Parquet write falls back to the CPU for an input of Array(Timestamp). I tried inserting a throw new Exception into the plugin's Parquet write code, but no exception was thrown.
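One way to check for a CPU fallback without inserting exceptions is the plugin's explain output (the same config the Scala test later in this thread also sets); a sketch, with df standing in for the Array(Timestamp) DataFrame from the repro in the next comment:

// With explain enabled, GpuOverrides logs which execs will run on the GPU. A CPU
// fallback of the write would show up as a "!" / "cannot run on GPU" entry for
// DataWritingCommandExec instead of the "*Exec <DataWritingCommandExec> will run
// on GPU" line seen in the repro output below.
spark.conf.set("spark.rapids.sql.explain", "ALL")
df.coalesce(1).write.mode("overwrite").parquet("/tmp/timestamp.parquet")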

ttnghia avatar Aug 11 '22 23:08 ttnghia

This is the code to reproduce:

scala> import java.sql.Timestamp
import java.sql.Timestamp

scala> val df = Seq(Seq(Timestamp.valueOf("4456-03-21 19:12:30.001070"), Timestamp.valueOf("2015-01-01 02:11:05.123456"))).toDF("v")
df: org.apache.spark.sql.DataFrame = [v: array<timestamp>]

scala> spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED")

scala> spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "CORRECTED")

scala> spark.conf.set("spark.sql.legacy.parquet.int96RebaseModeInRead", "CORRECTED")

scala> spark.conf.set("spark.sql.legacy.parquet.int96RebaseModeInWrite", "CORRECTED")

scala> spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")

scala> df.coalesce(1).write.mode("overwrite").parquet("/tmp/timestamp.parquet")
22/08/11 23:29:59 WARN GpuOverrides: 
*Exec <DataWritingCommandExec> will run on GPU
  *Output <InsertIntoHadoopFsRelationCommand> will run on GPU
  *Exec <CoalesceExec> will run on GPU
    ! <LocalTableScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.LocalTableScanExec
      @Expression <AttributeReference> v#4 could run on GPU


scala> val df2 = spark.read.parquet("/tmp/timestamp.parquet")
df2: org.apache.spark.sql.DataFrame = [v: array<timestamp>]

scala> df2.show(false)
22/08/11 23:30:26 WARN GpuOverrides: 
*Exec <CollectLimitExec> will run on GPU
  *Partitioning <SinglePartition$> will run on GPU
  *Exec <ProjectExec> will run on GPU
    *Expression <Alias> cast(v#7 as string) AS v#10 will run on GPU
      *Expression <Cast> cast(v#7 as string) will run on GPU
    *Exec <FileSourceScanExec> will run on GPU

+-------------------------------------------------------+
|v                                                      |
+-------------------------------------------------------+
|[4456-03-21 19:12:30.00107, 2015-01-01 02:11:05.123456]|
+-------------------------------------------------------+

Obviously the spark.sql.parquet.outputTimestampType value is not passed to the CPU side when falling back.
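One way to confirm which physical timestamp type the write actually produced is to dump the Parquet footer of a part file written by the repro above; a sketch (the exact schema printout depends on the parquet-mr version, but it should show TIMESTAMP(MILLIS,...) vs TIMESTAMP(MICROS,...) for the array element):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

// Pick any part file under the output directory written above and print its
// schema from the footer.
val hadoopConf = new Configuration()
val dir = new Path("/tmp/timestamp.parquet")
val part = dir.getFileSystem(hadoopConf).listStatus(dir)
  .map(_.getPath).find(_.getName.startsWith("part-")).get
val reader = ParquetFileReader.open(HadoopInputFile.fromPath(part, hadoopConf))
println(reader.getFooter.getFileMetaData.getSchema)
reader.close()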

ttnghia avatar Aug 11 '22 23:08 ttnghia

The code to handle this is here

https://github.com/NVIDIA/spark-rapids/blob/2b9fa502caac77b451e8a6dc84842a3b182592c8/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala#L304-L350

It should work; I'm not 100% sure what is happening that causes it to fail.

revans2 avatar Aug 12 '22 14:08 revans2

That is the plugin code. Here the issue is that the write falls back to the CPU (that code is not called at all), but the configuration is not passed to the CPU.

ttnghia avatar Aug 12 '22 14:08 ttnghia

I created a Scala test with the example and got the same results, and the debugger shows that the write is happening on the GPU (there is no reason for it to fall back to the CPU). It hits breakpoints in GpuFileFormatWriter#write and GpuParquetFileFormat#prepareWrite.

test("nghia") {
  withGpuSparkSession(spark => {
    import spark.implicits._
    val df = Seq(Seq(Timestamp.valueOf("4456-03-21 19:12:30.001070"),
      Timestamp.valueOf("2015-01-01 02:11:05.123456"))).toDF("v")

    spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED")
    spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "CORRECTED")
    spark.conf.set("spark.sql.legacy.parquet.int96RebaseModeInRead", "CORRECTED")
    spark.conf.set("spark.sql.legacy.parquet.int96RebaseModeInWrite", "CORRECTED")
    spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")
    spark.conf.set("spark.rapids.sql.explain", "ALL")

    df.coalesce(1).write.mode("overwrite").parquet("/tmp/timestamp.parquet")

    val df2 = spark.read.parquet("/tmp/timestamp.parquet")
    df2.collect().foreach(println)
  })
}

Produces:

[WrappedArray(4456-03-21 19:12:30.00107, 2015-01-01 02:11:05.123456)]

andygrove avatar Aug 12 '22 14:08 andygrove

Write on GPU and column has type Timestamp(Microsecond):

+----------------------------------------------------------+
| v                                                        |
+----------------------------------------------------------+
| [4456-03-21 19:12:30.001070, 2015-01-01 02:11:05.123456] |
+----------------------------------------------------------+

Write on CPU and column has type Timestamp(Millisecond):

+----------------------------------------------------+
| v                                                  |
+----------------------------------------------------+
| [4456-03-21 19:12:30.001, 2015-01-01 02:11:05.123] |
+----------------------------------------------------+

So the bug is in the GPU parquet writing.
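For anyone reproducing this comparison outside the plugin's test harness, toggling spark.rapids.sql.enabled at runtime is one way to force the same write onto the CPU and then the GPU; a sketch, with df being the Array(Timestamp) DataFrame from the earlier repro and arbitrary output paths:

spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")

// Force the write onto the CPU, then re-enable the plugin for a GPU write.
spark.conf.set("spark.rapids.sql.enabled", "false")
df.coalesce(1).write.mode("overwrite").parquet("/tmp/ts_cpu.parquet")

spark.conf.set("spark.rapids.sql.enabled", "true")
df.coalesce(1).write.mode("overwrite").parquet("/tmp/ts_gpu.parquet")

// Reading both back should reproduce the tables above: the CPU file truncates to
// milliseconds, the GPU file keeps microseconds.
spark.read.parquet("/tmp/ts_cpu.parquet").show(false)
spark.read.parquet("/tmp/ts_gpu.parquet").show(false)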

andygrove avatar Aug 12 '22 15:08 andygrove

Oh, then I was wrong. I had inserted the throw exception into the millisecond-type write code path and didn't see it executed. So the GPU write was actually executed, but the config was somehow dropped before reaching the write function.

ttnghia avatar Aug 12 '22 15:08 ttnghia

I think I've found the problem. Look at this line: https://github.com/NVIDIA/spark-rapids/blob/2b9fa502caac77b451e8a6dc84842a3b182592c8/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala#L308

So the input data type is checked and various cases are considered, but only when the column's own type matches (e.g. a top-level TimestampType). When the input is not a top-level timestamp but a nested type containing timestamps, such as Array(Timestamp), no conversion happens.
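To illustrate why a match on the column's own type misses this repro: ArrayType(TimestampType) contains timestamps but is not TimestampType itself, so a top-level case never fires for it. A small hypothetical helper (not plugin code) that walks a Spark DataType recursively makes the gap explicit:

import org.apache.spark.sql.types._

// Hypothetical helper, for illustration only: does this type contain a timestamp
// anywhere, including inside arrays, maps, and structs?
def containsTimestamp(dt: DataType): Boolean = dt match {
  case TimestampType => true
  case ArrayType(elem, _) => containsTimestamp(elem)
  case MapType(kt, vt, _) => containsTimestamp(kt) || containsTimestamp(vt)
  case StructType(fields) => fields.exists(f => containsTimestamp(f.dataType))
  case _ => false
}

containsTimestamp(ArrayType(TimestampType)) // true, yet a `case TimestampType` at
                                            // the top level sees only ArrayType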

ttnghia avatar Aug 12 '22 16:08 ttnghia

Fixing this would be complicated IMO as we may need to consider various nested types and do the conversion appropriately.

ttnghia avatar Aug 12 '22 17:08 ttnghia