[BUG] GPU JSON reader fails to read the JSON string of an empty body
Describe the bug
The GPU JSON reader cannot read a JSON string with an empty body, {}, while CPU Spark reads it successfully.
Steps/Code to reproduce bug
There are two sub-cases, and the GPU read fails with a different error in each.
$ cat no-body.json
{}
- Read without specifying a schema. The first run below is plain CPU Spark; the second run has the plugin enabled and fails:
scala> spark.read.json("/data/tmp/no-body.json").show
++
||
++
||
++
scala> spark.read.json("/data/tmp/no-body.json").show
23/01/30 02:37:39 WARN GpuOverrides:
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
@Partitioning <SinglePartition$> could run on GPU
*Exec <FileSourceScanExec> will run on GPU
23/01/30 02:37:39 ERROR Executor: Exception in task 0.0 in stage 9.0 (TID 9)
java.lang.UnsupportedOperationException: empty.min
at scala.collection.TraversableOnce.min(TraversableOnce.scala:227)
at scala.collection.TraversableOnce.min$(TraversableOnce.scala:225)
at org.apache.spark.sql.types.StructType.min(StructType.scala:102)
at com.nvidia.spark.rapids.GpuTextBasedPartitionReader.readToTable(GpuTextBasedPartitionReader.scala:299)
......
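For context, calling min on an empty Scala collection always throws this exception, and StructType is a Seq[StructField], so GpuTextBasedPartitionReader hits the same failure when it reduces an empty schema with min:

scala> Seq.empty[Int].min
java.lang.UnsupportedOperationException: empty.min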
- Read with a specified schema. Again, the CPU run succeeds and the GPU run fails:
scala> spark.read.schema("a int").json("/data/tmp/no-body.json").show
+----+
| a|
+----+
|null|
+----+
scala> spark.read.schema("a int").json("/data/tmp/no-body.json").show
23/01/30 02:40:04 WARN GpuOverrides:
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
@Partitioning <SinglePartition$> could run on GPU
*Exec <ProjectExec> will run on GPU
*Expression <Alias> cast(a#46 as string) AS a#49 will run on GPU
*Expression <Cast> cast(a#46 as string) will run on GPU
*Exec <FileSourceScanExec> will run on GPU
23/01/30 02:40:04 ERROR Executor: Exception in task 0.0 in stage 10.0 (TID 10)
java.io.IOException: Error when processing file [path: file:///data/tmp/no-body.json, range: 0-3, partition values: [empty row]]
at org.apache.spark.sql.catalyst.json.rapids.JsonPartitionReader.$anonfun$readToTable$1(GpuJsonScan.scala:290)
......
Caused by: ai.rapids.cudf.CudfException: CUDF failure at: /home/jenkins/agent/workspace/jenkins-spark-rapids-jni_nightly-pre_release-106-cuda11/thirdparty/cudf/cpp/src/io/json/reader_impl.cu:639: Error determining column names.
at ai.rapids.cudf.Table.readJSON(Native Method)
at ai.rapids.cudf.Table.readJSON(Table.java:1049)
at org.apache.spark.sql.catalyst.json.rapids.JsonPartitionReader.$anonfun$readToTable$1(GpuJsonScan.scala:287)
Expected behavior
The GPU JSON reader should handle this case the same way Spark does.
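A minimal sketch of the parity check we want to hold, toggling the plugin via the spark.rapids.sql.enabled config in a plain spark-shell (illustrative, not the repo's integration-test harness):

scala> spark.conf.set("spark.rapids.sql.enabled", "false")
scala> val cpuRows = spark.read.json("/data/tmp/no-body.json").count  // CPU: one empty row
scala> spark.conf.set("spark.rapids.sql.enabled", "true")
scala> val gpuRows = spark.read.json("/data/tmp/no-body.json").count  // currently throws
scala> assert(cpuRows == gpuRows)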
Additional context
cudf Python has fixed the second sub-case by switching its JSON engine to the new reader, so the JNI should make the same switch when creating the read options. This needs thorough testing to make sure the new JSON reader introduces no regressions.
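On the JNI side, a purely hypothetical sketch of that switch; withLegacyJsonReader is an invented flag name for illustration, and the actual cudf Java API for selecting the engine may look different:

scala> // Hypothetical: flag name invented for illustration, not the real cudf Java API
scala> val opts = ai.rapids.cudf.JSONOptions.builder().withLegacyJsonReader(false).build()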
After fixing this, we need to enable the tests xfailed in https://github.com/NVIDIA/spark-rapids/pull/7447.
We should switch to the new JSON reader, per issue #7518.
I just re-tested this, and it is still an issue even after switching to the new engine.
scala> spark.read.json("no-body.json").show
24/01/17 00:02:09 WARN GpuOverrides:
!Exec <FileSourceScanExec> cannot run on GPU because unsupported file format: org.apache.spark.sql.execution.datasources.text.TextFileFormat
24/01/17 00:02:09 WARN GpuOverrides:
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
@Partitioning <SinglePartition$> could run on GPU
*Exec <FileSourceScanExec> will run on GPU
24/01/17 00:02:09 ERROR Executor: Exception in task 0.0 in stage 6.0 (TID 6)
java.lang.UnsupportedOperationException: empty.min
at scala.collection.TraversableOnce.min(TraversableOnce.scala:227)
at scala.collection.TraversableOnce.min$(TraversableOnce.scala:225)
at org.apache.spark.sql.types.StructType.min(StructType.scala:102)
at com.nvidia.spark.rapids.GpuTextBasedPartitionReader.readToTable(GpuTextBasedPartitionReader.scala:298)
This only seems to be an issue for a JSON file that contains nothing but empty entries. If there is at least one non-empty row, we match Spark.
$ cat with-body.json
{}
{ "a": 4 }
scala> spark.read.json("with-body.json").show
24/01/17 00:26:26 WARN GpuOverrides:
!Exec <FileSourceScanExec> cannot run on GPU because unsupported file format: org.apache.spark.sql.execution.datasources.text.TextFileFormat
24/01/17 00:26:26 WARN GpuOverrides:
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
@Partitioning <SinglePartition$> could run on GPU
*Exec <ProjectExec> will run on GPU
*Expression <Alias> cast(a#22L as string) AS a#25 will run on GPU
*Expression <Cast> cast(a#22L as string) will run on GPU
!Exec <FileSourceScanExec> cannot run on GPU because JSON input and output has been disabled. To enable set spark.rapids.sql.format.json.enabled to true
+----+
| a|
+----+
|null|
| 4|
+----+
The failures are happening in two places. If you don't provide a schema, then schema discovery returns an empty schema. cuDF does not accept this, so we try to make one up by pulling something out of the dataSchema, which is also empty, and that results in the empty.min crash shown above.
If we do provide a schema, then we run into a null pointer exception when trying to read the data.
spark.read.schema("a string").json("./no-body.json").show
...
Caused by: java.lang.NullPointerException
at ai.rapids.cudf.TableWithMeta.getColumnNames(TableWithMeta.java:132)
at ai.rapids.cudf.Table.gatherJSONColumns(Table.java:1211)
at ai.rapids.cudf.Table.readJSON(Table.java:1373)
at org.apache.spark.sql.catalyst.json.rapids.JsonPartitionReader$.$anonfun$readToTable$2(GpuJsonScan.scala:325)
at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
at org.apache.spark.sql.catalyst.json.rapids.JsonPartitionReader$.$anonfun$readToTable$1(GpuJsonScan.scala:323)
at com.nvidia.spark.rapids.RmmRapidsRetryIterator$AutoCloseableAttemptSpliterator.next(RmmRapidsRetryIterator.scala:477)
at com.nvidia.spark.rapids.RmmRapidsRetryIterator$RmmRapidsRetryIterator.next(RmmRapidsRetryIterator.scala:613)
at com.nvidia.spark.rapids.RmmRapidsRetryIterator$RmmRapidsRetryAutoCloseableIterator.next(RmmRapidsRetryIterator.scala:517)
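The NPE is thrown inside TableWithMeta.getColumnNames, so the metadata for an all-empty file evidently carries no column names at all. Conceptually the fix is to tolerate that and report zero columns; here is a minimal sketch with an illustrative helper name (the real fix belongs in the cudf Java/JNI layer):

// Sketch only: treat missing column-name metadata as an empty column list
// instead of letting a null propagate into a NullPointerException.
def normalizeColumnNames(raw: Array[String]): Array[String] =
  Option(raw).getOrElse(Array.empty[String])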
We should not be trying to use the dataSchema when the readDataSchema is empty. Even if that read succeeded, it could give us the wrong data, because the only time readDataSchema is empty while dataSchema is not is when we have partition columns.
In the short term I think we just need to fall back to the CPU when the readDataSchema is empty, and concentrate on fixing the null pointer exception.
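A minimal sketch of that short-term guard, assuming a willNotWorkOnGpu-style fallback hook like the plugin's metas expose (signature simplified, not the exact plugin API):

import org.apache.spark.sql.types.StructType

// Sketch only: refuse the GPU path up front when there are no columns to read,
// so we never reach the crashing readToTable call for an all-empty JSON file.
def tagEmptySchemaFallback(readDataSchema: StructType,
    willNotWorkOnGpu: String => Unit): Unit = {
  if (readDataSchema.isEmpty) {
    willNotWorkOnGpu("GPU JSON reader does not support an empty read schema")
  }
}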
@res-life are you still planning on working on this?
No, I'm now focusing on get-json-object issues; someone else is welcome to take this.